diff --git a/Detectors/ITSMFT/ITS/workflow/src/its-reco-workflow.cxx b/Detectors/ITSMFT/ITS/workflow/src/its-reco-workflow.cxx index 5ff5ea26030c0..636cbcb8c9520 100644 --- a/Detectors/ITSMFT/ITS/workflow/src/its-reco-workflow.cxx +++ b/Detectors/ITSMFT/ITS/workflow/src/its-reco-workflow.cxx @@ -14,15 +14,33 @@ #include "ITStracking/TrackingConfigParam.h" #include "ITStracking/Configuration.h" #include "DetectorsRaw/HBFUtilsInitializer.h" +#include "Framework/CallbacksPolicy.h" +#include "Framework/CallbackService.h" +#include "Framework/ConfigContext.h" #include "GPUO2Interface.h" #include "GPUReconstruction.h" #include "GPUChainITS.h" +#include using namespace o2::framework; // ------------------------------------------------------------------ +void customize(std::vector& policies) +{ + policies.push_back(CallbacksPolicy{ + [](DeviceSpec const& spec, ConfigContext const& context) -> bool { + // auto filename = context.options().get("filename"); + return false; + }, + [](CallbackService& service, InitContext& context) { + // std::vector enums = readEnumerations(context.options().get("filename")); + service.set(CallbackService::Id::NewTimeslice, [/*enums*/](o2::header::DataHeader& dh) { + }); + }}); +} + // we need to add workflow options before including Framework/runDataProcessing void customize(std::vector& workflowOptions) { diff --git a/Framework/Core/include/Framework/CallbacksPolicy.h b/Framework/Core/include/Framework/CallbacksPolicy.h index 0d8186f45b974..9a0960b1cb4b5 100644 --- a/Framework/Core/include/Framework/CallbacksPolicy.h +++ b/Framework/Core/include/Framework/CallbacksPolicy.h @@ -20,9 +20,10 @@ namespace o2::framework { struct CallbackService; +struct InitContext; struct CallbacksPolicy { - using CallbacksCustomization = std::function; + using CallbacksCustomization = std::function; DeviceMatcher matcher = nullptr; CallbacksCustomization policy = nullptr; static std::vector createDefaultPolicies(); diff --git a/Framework/Core/include/Framework/DataProcessorMatchers.h b/Framework/Core/include/Framework/DataProcessorMatchers.h index 0f0da56b00256..3a55776c8e754 100644 --- a/Framework/Core/include/Framework/DataProcessorMatchers.h +++ b/Framework/Core/include/Framework/DataProcessorMatchers.h @@ -17,9 +17,10 @@ namespace o2::framework struct DataProcessorSpec; struct DeviceSpec; +struct ConfigContext; using DataProcessorMatcher = std::function; -using DeviceMatcher = std::function; +using DeviceMatcher = std::function; /// A set of helper to build policies that need to /// be applied based on some DataProcessorSpec property diff --git a/Framework/Core/src/CallbacksPolicy.cxx b/Framework/Core/src/CallbacksPolicy.cxx index f098d147240f1..b675878560a01 100644 --- a/Framework/Core/src/CallbacksPolicy.cxx +++ b/Framework/Core/src/CallbacksPolicy.cxx @@ -25,10 +25,10 @@ namespace o2::framework CallbacksPolicy epnProcessReporting() { return { - .matcher = [](DeviceSpec const&) -> bool { + .matcher = [](DeviceSpec const&, ConfigContext const& context) -> bool { /// FIXME: return getenv("DDS_SESSION_ID") != nullptr; }, - .policy = [](CallbackService& callbacks) -> void { + .policy = [](CallbackService& callbacks, InitContext& context) -> void { callbacks.set(CallbackService::Id::PreProcessing, [](ServiceRegistry& registry, int op) { auto& info = registry.get(); LOGP(info, "Processing timeslice:{}, tfCounter:{}, firstTFOrbit:{}, action:{}", diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index e896f3554b693..9a1bd811f64fa 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -307,7 +307,8 @@ void DataProcessingDevice::Init() // Invoke the callback policy for this device. if (mSpec.callbacksPolicy.policy != nullptr) { - mSpec.callbacksPolicy.policy(mServiceRegistry.get()); + InitContext initContext{*mConfigRegistry, mServiceRegistry}; + mSpec.callbacksPolicy.policy(mServiceRegistry.get(), initContext); } } diff --git a/Framework/Core/src/DataProcessorMatchers.cxx b/Framework/Core/src/DataProcessorMatchers.cxx index 08108593d0f89..491978fb5aad9 100644 --- a/Framework/Core/src/DataProcessorMatchers.cxx +++ b/Framework/Core/src/DataProcessorMatchers.cxx @@ -24,7 +24,7 @@ DataProcessorMatcher DataProcessorMatchers::matchByName(char const* name_) DeviceMatcher DeviceMatchers::matchByName(char const* name_) { - return [name = std::string(name_)](DeviceSpec const& spec) { + return [name = std::string(name_)](DeviceSpec const& spec, ConfigContext const&) { return spec.name == name; }; } diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index 8766274c56035..9e6114a524204 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -826,6 +826,7 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(const WorkflowSpec& workf std::vector& devices, ResourceManager& resourceManager, std::string const& uniqueWorkflowId, + ConfigContext const& configContext, bool optimizeTopology, unsigned short resourcesMonitoringInterval, std::string const& channelPrefix) @@ -902,7 +903,7 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(const WorkflowSpec& workf } } for (auto& policy : callbacksPolicies) { - if (policy.matcher(device) == true) { + if (policy.matcher(device, configContext) == true) { device.callbacksPolicy = policy; break; } diff --git a/Framework/Core/src/DeviceSpecHelpers.h b/Framework/Core/src/DeviceSpecHelpers.h index caed1963ff5bc..a6781728cf92e 100644 --- a/Framework/Core/src/DeviceSpecHelpers.h +++ b/Framework/Core/src/DeviceSpecHelpers.h @@ -40,6 +40,7 @@ namespace o2::framework { struct InputChannelSpec; struct OutputChannelSpec; +struct ConfigContext; struct DeviceSpecHelpers { /// Helper to convert from an abstract dataflow specification, @a workflow, @@ -54,6 +55,7 @@ struct DeviceSpecHelpers { std::vector& devices, ResourceManager& resourceManager, std::string const& uniqueWorkflowId, + ConfigContext const& configContext, bool optimizeTopology = false, unsigned short resourcesMonitoringInterval = 0, std::string const& channelPrefix = ""); @@ -66,6 +68,7 @@ struct DeviceSpecHelpers { std::vector& devices, ResourceManager& resourceManager, std::string const& uniqueWorkflowId, + ConfigContext const& configContext, bool optimizeTopology = false, unsigned short resourcesMonitoringInterval = 0, std::string const& channelPrefix = "") @@ -74,7 +77,7 @@ struct DeviceSpecHelpers { std::vector resourcePolicies = ResourcePolicy::createDefaultPolicies(); dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, dispatchPolicies, resourcePolicies, callbacksPolicies, devices, - resourceManager, uniqueWorkflowId, optimizeTopology, + resourceManager, uniqueWorkflowId, configContext, optimizeTopology, resourcesMonitoringInterval, channelPrefix); } diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 342895e589ba1..935518d9f3c73 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1496,6 +1496,7 @@ int runStateMachine(DataProcessorSpecs const& workflow, runningWorkflow.devices, *resourceManager, driverInfo.uniqueWorkflowId, + *driverInfo.configContext, !varmap["no-IPC"].as(), driverInfo.resourcesMonitoringInterval, varmap["channel-prefix"].as()); diff --git a/Framework/Core/test/test_DeviceSpec.cxx b/Framework/Core/test/test_DeviceSpec.cxx index d853510ed6f12..306b93058f6a5 100644 --- a/Framework/Core/test/test_DeviceSpec.cxx +++ b/Framework/Core/test/test_DeviceSpec.cxx @@ -59,7 +59,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec1) BOOST_CHECK_EQUAL(offers[0].startPort, 22000); BOOST_CHECK_EQUAL(offers[0].rangeSize, 1000); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id"); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext); BOOST_REQUIRE_EQUAL(devices.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -96,7 +96,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec1PushPull) std::vector devices; std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id"); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext); BOOST_CHECK_EQUAL(devices.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -142,7 +142,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec2) std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id"); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext); BOOST_CHECK_EQUAL(devices.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -186,7 +186,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec3) std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id"); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext); BOOST_CHECK_EQUAL(devices.size(), 3); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -236,7 +236,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec4) std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id"); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext); BOOST_CHECK_EQUAL(devices.size(), 4); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -307,7 +307,7 @@ BOOST_AUTO_TEST_CASE(TestTopologyForwarding) std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id"); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext); BOOST_CHECK_EQUAL(devices.size(), 3); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -587,7 +587,7 @@ BOOST_AUTO_TEST_CASE(TestTopologyLayeredTimePipeline) auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies(); std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id"); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext); BOOST_CHECK_EQUAL(devices.size(), 6); BOOST_CHECK_EQUAL(devices[0].id, "A"); BOOST_CHECK_EQUAL(devices[1].id, "B_t0"); diff --git a/Framework/Core/test/test_DeviceSpecHelpers.cxx b/Framework/Core/test/test_DeviceSpecHelpers.cxx index 939b67cce3998..6d3ee9022256f 100644 --- a/Framework/Core/test/test_DeviceSpecHelpers.cxx +++ b/Framework/Core/test/test_DeviceSpecHelpers.cxx @@ -141,7 +141,7 @@ BOOST_AUTO_TEST_CASE(test_prepareArguments) CompletionPolicy::createDefaultPolicies(), callbacksPolicies, deviceSpecs, - *rm, "workflow-id"); + *rm, "workflow-id", *configContext); // Now doing the test cases CheckMatrix matrix; diff --git a/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx b/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx index 5095a9542c5f9..b53978dcd3f4b 100644 --- a/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx +++ b/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx @@ -23,10 +23,14 @@ #include "Framework/DeviceSpec.h" #include "Framework/ProcessingContext.h" #include "Framework/WorkflowSpec.h" +#include "Framework/ConfigContext.h" +#include "Framework/ConfigParamStore.h" +#include "Framework/ConfigParamRegistry.h" #include #include #include +#include using namespace o2::framework; @@ -92,7 +96,7 @@ BOOST_AUTO_TEST_CASE(TestDDS) SimpleResourceManager rm(resources); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies(); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", true); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext, true); std::vector controls; std::vector executions; controls.resize(devices.size()); diff --git a/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx b/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx index 34da1cd78c7bb..8d7bc3b888ef8 100644 --- a/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx +++ b/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx @@ -419,7 +419,7 @@ BOOST_AUTO_TEST_CASE(TestO2ControlDump) SimpleResourceManager rm(resources); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies(); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", true); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext, true); std::vector controls; std::vector executions; controls.resize(devices.size()); diff --git a/Framework/Core/test/test_Graphviz.cxx b/Framework/Core/test/test_Graphviz.cxx index b60e43e49dd51..2d5b00c64ce04 100644 --- a/Framework/Core/test/test_Graphviz.cxx +++ b/Framework/Core/test/test_Graphviz.cxx @@ -103,7 +103,7 @@ BOOST_AUTO_TEST_CASE(TestGraphviz) auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies(); std::vector resources = {ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id"); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext); str.str(""); GraphvizHelpers::dumpDeviceSpec2Graphviz(str, devices); lineByLineComparision(str.str(), R"EXPECTED(digraph structs { @@ -143,7 +143,7 @@ BOOST_AUTO_TEST_CASE(TestGraphvizWithPipeline) auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies(); std::vector resources = {ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id"); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext); str.str(""); GraphvizHelpers::dumpDeviceSpec2Graphviz(str, devices); lineByLineComparision(str.str(), R"EXPECTED(digraph structs { diff --git a/Framework/Core/test/test_TimeParallelPipelining.cxx b/Framework/Core/test/test_TimeParallelPipelining.cxx index 4f16e9bcc349f..b345550bdbd67 100644 --- a/Framework/Core/test/test_TimeParallelPipelining.cxx +++ b/Framework/Core/test/test_TimeParallelPipelining.cxx @@ -60,7 +60,7 @@ BOOST_AUTO_TEST_CASE(TimePipeliningSimple) auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies(); std::vector resources = {ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id"); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext); BOOST_REQUIRE_EQUAL(devices.size(), 4); auto& producer = devices[0]; auto& layer0Consumer0 = devices[1]; @@ -114,7 +114,7 @@ BOOST_AUTO_TEST_CASE(TimePipeliningFull) auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies(); std::vector resources = {ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id"); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext); BOOST_REQUIRE_EQUAL(devices.size(), 7); auto& producer = devices[0]; auto& layer0Consumer0 = devices[1]; diff --git a/Framework/TestWorkflows/src/o2DiamondWorkflow.cxx b/Framework/TestWorkflows/src/o2DiamondWorkflow.cxx index 3e0da1120479f..e87a0542cfdcc 100644 --- a/Framework/TestWorkflows/src/o2DiamondWorkflow.cxx +++ b/Framework/TestWorkflows/src/o2DiamondWorkflow.cxx @@ -39,7 +39,7 @@ void customize(std::vector& policies) { policies.push_back(CallbacksPolicy{ .matcher = DeviceMatchers::matchByName("A"), - .policy = [](CallbackService& service) { + .policy = [](CallbackService& service, InitContext&) { service.set(CallbackService::Id::Start, []() { LOG(INFO) << "invoked at start"; }); }}); }