Skip to content

Commit 3f2ac97

Browse files
authored
DPL: allow CallbacksPolicy to have access to command line options (#7534)
1 parent 505138f commit 3f2ac97

16 files changed

Lines changed: 53 additions & 23 deletions

Detectors/ITSMFT/ITS/workflow/src/its-reco-workflow.cxx

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,33 @@
1414
#include "ITStracking/TrackingConfigParam.h"
1515
#include "ITStracking/Configuration.h"
1616
#include "DetectorsRaw/HBFUtilsInitializer.h"
17+
#include "Framework/CallbacksPolicy.h"
18+
#include "Framework/CallbackService.h"
19+
#include "Framework/ConfigContext.h"
1720

1821
#include "GPUO2Interface.h"
1922
#include "GPUReconstruction.h"
2023
#include "GPUChainITS.h"
24+
#include <vector>
2125

2226
using namespace o2::framework;
2327

2428
// ------------------------------------------------------------------
2529

30+
void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
31+
{
32+
policies.push_back(CallbacksPolicy{
33+
[](DeviceSpec const& spec, ConfigContext const& context) -> bool {
34+
// auto filename = context.options().get<std::string>("filename");
35+
return false;
36+
},
37+
[](CallbackService& service, InitContext& context) {
38+
// std::vector<uint64_t> enums = readEnumerations(context.options().get<std::string>("filename"));
39+
service.set(CallbackService::Id::NewTimeslice, [/*enums*/](o2::header::DataHeader& dh) {
40+
});
41+
}});
42+
}
43+
2644
// we need to add workflow options before including Framework/runDataProcessing
2745
void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
2846
{

Framework/Core/include/Framework/CallbacksPolicy.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ namespace o2::framework
2020
{
2121

2222
struct CallbackService;
23+
struct InitContext;
2324

2425
struct CallbacksPolicy {
25-
using CallbacksCustomization = std::function<void(CallbackService&)>;
26+
using CallbacksCustomization = std::function<void(CallbackService&, InitContext&)>;
2627
DeviceMatcher matcher = nullptr;
2728
CallbacksCustomization policy = nullptr;
2829
static std::vector<CallbacksPolicy> createDefaultPolicies();

Framework/Core/include/Framework/DataProcessorMatchers.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@ namespace o2::framework
1717

1818
struct DataProcessorSpec;
1919
struct DeviceSpec;
20+
struct ConfigContext;
2021

2122
using DataProcessorMatcher = std::function<bool(DataProcessorSpec const& spec)>;
22-
using DeviceMatcher = std::function<bool(DeviceSpec const& spec)>;
23+
using DeviceMatcher = std::function<bool(DeviceSpec const& spec, ConfigContext const& context)>;
2324

2425
/// A set of helper to build policies that need to
2526
/// be applied based on some DataProcessorSpec property

Framework/Core/src/CallbacksPolicy.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ namespace o2::framework
2525
CallbacksPolicy epnProcessReporting()
2626
{
2727
return {
28-
.matcher = [](DeviceSpec const&) -> bool {
28+
.matcher = [](DeviceSpec const&, ConfigContext const& context) -> bool {
2929
/// FIXME:
3030
return getenv("DDS_SESSION_ID") != nullptr; },
31-
.policy = [](CallbackService& callbacks) -> void {
31+
.policy = [](CallbackService& callbacks, InitContext& context) -> void {
3232
callbacks.set(CallbackService::Id::PreProcessing, [](ServiceRegistry& registry, int op) {
3333
auto& info = registry.get<TimingInfo>();
3434
LOGP(info, "Processing timeslice:{}, tfCounter:{}, firstTFOrbit:{}, action:{}",

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,8 @@ void DataProcessingDevice::Init()
307307

308308
// Invoke the callback policy for this device.
309309
if (mSpec.callbacksPolicy.policy != nullptr) {
310-
mSpec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>());
310+
InitContext initContext{*mConfigRegistry, mServiceRegistry};
311+
mSpec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>(), initContext);
311312
}
312313
}
313314

Framework/Core/src/DataProcessorMatchers.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ DataProcessorMatcher DataProcessorMatchers::matchByName(char const* name_)
2424

2525
DeviceMatcher DeviceMatchers::matchByName(char const* name_)
2626
{
27-
return [name = std::string(name_)](DeviceSpec const& spec) {
27+
return [name = std::string(name_)](DeviceSpec const& spec, ConfigContext const&) {
2828
return spec.name == name;
2929
};
3030
}

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -826,6 +826,7 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(const WorkflowSpec& workf
826826
std::vector<DeviceSpec>& devices,
827827
ResourceManager& resourceManager,
828828
std::string const& uniqueWorkflowId,
829+
ConfigContext const& configContext,
829830
bool optimizeTopology,
830831
unsigned short resourcesMonitoringInterval,
831832
std::string const& channelPrefix)
@@ -902,7 +903,7 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(const WorkflowSpec& workf
902903
}
903904
}
904905
for (auto& policy : callbacksPolicies) {
905-
if (policy.matcher(device) == true) {
906+
if (policy.matcher(device, configContext) == true) {
906907
device.callbacksPolicy = policy;
907908
break;
908909
}

Framework/Core/src/DeviceSpecHelpers.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ namespace o2::framework
4040
{
4141
struct InputChannelSpec;
4242
struct OutputChannelSpec;
43+
struct ConfigContext;
4344

4445
struct DeviceSpecHelpers {
4546
/// Helper to convert from an abstract dataflow specification, @a workflow,
@@ -54,6 +55,7 @@ struct DeviceSpecHelpers {
5455
std::vector<DeviceSpec>& devices,
5556
ResourceManager& resourceManager,
5657
std::string const& uniqueWorkflowId,
58+
ConfigContext const& configContext,
5759
bool optimizeTopology = false,
5860
unsigned short resourcesMonitoringInterval = 0,
5961
std::string const& channelPrefix = "");
@@ -66,6 +68,7 @@ struct DeviceSpecHelpers {
6668
std::vector<DeviceSpec>& devices,
6769
ResourceManager& resourceManager,
6870
std::string const& uniqueWorkflowId,
71+
ConfigContext const& configContext,
6972
bool optimizeTopology = false,
7073
unsigned short resourcesMonitoringInterval = 0,
7174
std::string const& channelPrefix = "")
@@ -74,7 +77,7 @@ struct DeviceSpecHelpers {
7477
std::vector<ResourcePolicy> resourcePolicies = ResourcePolicy::createDefaultPolicies();
7578
dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies,
7679
dispatchPolicies, resourcePolicies, callbacksPolicies, devices,
77-
resourceManager, uniqueWorkflowId, optimizeTopology,
80+
resourceManager, uniqueWorkflowId, configContext, optimizeTopology,
7881
resourcesMonitoringInterval, channelPrefix);
7982
}
8083

Framework/Core/src/runDataProcessing.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1496,6 +1496,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
14961496
runningWorkflow.devices,
14971497
*resourceManager,
14981498
driverInfo.uniqueWorkflowId,
1499+
*driverInfo.configContext,
14991500
!varmap["no-IPC"].as<bool>(),
15001501
driverInfo.resourcesMonitoringInterval,
15011502
varmap["channel-prefix"].as<std::string>());

Framework/Core/test/test_DeviceSpec.cxx

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec1)
5959
BOOST_CHECK_EQUAL(offers[0].startPort, 22000);
6060
BOOST_CHECK_EQUAL(offers[0].rangeSize, 1000);
6161

62-
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
62+
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
6363
BOOST_REQUIRE_EQUAL(devices.size(), 2);
6464
BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1);
6565
BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind);
@@ -96,7 +96,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec1PushPull)
9696
std::vector<DeviceSpec> devices;
9797
std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
9898
SimpleResourceManager rm(resources);
99-
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
99+
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
100100
BOOST_CHECK_EQUAL(devices.size(), 2);
101101
BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1);
102102
BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind);
@@ -142,7 +142,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec2)
142142

143143
std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
144144
SimpleResourceManager rm(resources);
145-
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
145+
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
146146
BOOST_CHECK_EQUAL(devices.size(), 2);
147147
BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1);
148148
BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind);
@@ -186,7 +186,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec3)
186186

187187
std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
188188
SimpleResourceManager rm(resources);
189-
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
189+
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
190190
BOOST_CHECK_EQUAL(devices.size(), 3);
191191
BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 2);
192192
BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind);
@@ -236,7 +236,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec4)
236236
std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
237237
SimpleResourceManager rm(resources);
238238

239-
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
239+
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
240240
BOOST_CHECK_EQUAL(devices.size(), 4);
241241
BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 2);
242242
BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind);
@@ -307,7 +307,7 @@ BOOST_AUTO_TEST_CASE(TestTopologyForwarding)
307307

308308
std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
309309
SimpleResourceManager rm(resources);
310-
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
310+
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
311311
BOOST_CHECK_EQUAL(devices.size(), 3);
312312
BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1);
313313
BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind);
@@ -587,7 +587,7 @@ BOOST_AUTO_TEST_CASE(TestTopologyLayeredTimePipeline)
587587
auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
588588
std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
589589
SimpleResourceManager rm(resources);
590-
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
590+
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
591591
BOOST_CHECK_EQUAL(devices.size(), 6);
592592
BOOST_CHECK_EQUAL(devices[0].id, "A");
593593
BOOST_CHECK_EQUAL(devices[1].id, "B_t0");

0 commit comments

Comments
 (0)