Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Detectors/ITSMFT/ITS/workflow/src/its-reco-workflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,33 @@
#include "ITStracking/TrackingConfigParam.h"
#include "ITStracking/Configuration.h"
#include "DetectorsRaw/HBFUtilsInitializer.h"
#include "Framework/CallbacksPolicy.h"
#include "Framework/CallbackService.h"
#include "Framework/ConfigContext.h"

#include "GPUO2Interface.h"
#include "GPUReconstruction.h"
#include "GPUChainITS.h"
#include <vector>

using namespace o2::framework;

// ------------------------------------------------------------------

void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
{
policies.push_back(CallbacksPolicy{
[](DeviceSpec const& spec, ConfigContext const& context) -> bool {
// auto filename = context.options().get<std::string>("filename");
return false;
},
[](CallbackService& service, InitContext& context) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ktf
seems to work as expected except 1 problem: the filename for the tf-info vector comes via workflow global options, i.e. ConfigContext, as you correctly have it in the 1st lambda. Can the second lambda also have ConfigContext supplied (InitContext does not hurt but useless in this particular case).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought InitContext actually had also the workflow option. I will try to fix that.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me check, did not know it is supposed to pass the global options.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ktf I've checked, global options are not accessible via InitContext.options()

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's supposed to pass all the options which apply to a given DataProcessorSpec. I do not remember if workflow options are considered "global".

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, in the o2-global-track-cluster-reader --track-types ITS --cluster-types ITS --hbfutils-config o2_tfidinfo.root
the particular reader devices don't see the option --hbfutils-config.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ktf Looks like there is more serious problem: the customize(std::vector<o2::framework::CallbacksPolicy>& policies) defined for particular workflow, checks all devices of the global merged workflow. So, to make it work currently I need to pass the option (filename with TF header infos) to all these workflows. I.e. since both o2-global-track-cluster-reader and o2-its-reco-workflow have this customize, I need to call the workflow as

o2-global-track-cluster-reader --track-types ITS --cluster-types ITS --hbfutils-config o2_tfidinfo.root ... | \
o2-its-reco-workflow --clusters-from-upstream  --hbfutils-config o2_tfidinfo.root ...

otherwise the o2-its-reco-workflow checks the devices of the o2-global-track-cluster-reader against its own hbfutils-config option (by default pointing to `o2simdigitizerworkflow_configuration.ini).

// std::vector<uint64_t> enums = readEnumerations(context.options().get<std::string>("filename"));
service.set(CallbackService::Id::NewTimeslice, [/*enums*/](o2::header::DataHeader& dh) {
});
}});
}

// we need to add workflow options before including Framework/runDataProcessing
void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
{
Expand Down
3 changes: 2 additions & 1 deletion Framework/Core/include/Framework/CallbacksPolicy.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ namespace o2::framework
{

struct CallbackService;
struct InitContext;

struct CallbacksPolicy {
using CallbacksCustomization = std::function<void(CallbackService&)>;
using CallbacksCustomization = std::function<void(CallbackService&, InitContext&)>;
DeviceMatcher matcher = nullptr;
CallbacksCustomization policy = nullptr;
static std::vector<CallbacksPolicy> createDefaultPolicies();
Expand Down
3 changes: 2 additions & 1 deletion Framework/Core/include/Framework/DataProcessorMatchers.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ namespace o2::framework

struct DataProcessorSpec;
struct DeviceSpec;
struct ConfigContext;

using DataProcessorMatcher = std::function<bool(DataProcessorSpec const& spec)>;
using DeviceMatcher = std::function<bool(DeviceSpec const& spec)>;
using DeviceMatcher = std::function<bool(DeviceSpec const& spec, ConfigContext const& context)>;

/// A set of helper to build policies that need to
/// be applied based on some DataProcessorSpec property
Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/src/CallbacksPolicy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ namespace o2::framework
CallbacksPolicy epnProcessReporting()
{
return {
.matcher = [](DeviceSpec const&) -> bool {
.matcher = [](DeviceSpec const&, ConfigContext const& context) -> bool {
/// FIXME:
return getenv("DDS_SESSION_ID") != nullptr; },
.policy = [](CallbackService& callbacks) -> void {
.policy = [](CallbackService& callbacks, InitContext& context) -> void {
callbacks.set(CallbackService::Id::PreProcessing, [](ServiceRegistry& registry, int op) {
auto& info = registry.get<TimingInfo>();
LOGP(info, "Processing timeslice:{}, tfCounter:{}, firstTFOrbit:{}, action:{}",
Expand Down
3 changes: 2 additions & 1 deletion Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ void DataProcessingDevice::Init()

// Invoke the callback policy for this device.
if (mSpec.callbacksPolicy.policy != nullptr) {
mSpec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>());
InitContext initContext{*mConfigRegistry, mServiceRegistry};
mSpec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>(), initContext);
}
}

Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/src/DataProcessorMatchers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ DataProcessorMatcher DataProcessorMatchers::matchByName(char const* name_)

DeviceMatcher DeviceMatchers::matchByName(char const* name_)
{
return [name = std::string(name_)](DeviceSpec const& spec) {
return [name = std::string(name_)](DeviceSpec const& spec, ConfigContext const&) {
return spec.name == name;
};
}
Expand Down
3 changes: 2 additions & 1 deletion Framework/Core/src/DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(const WorkflowSpec& workf
std::vector<DeviceSpec>& devices,
ResourceManager& resourceManager,
std::string const& uniqueWorkflowId,
ConfigContext const& configContext,
bool optimizeTopology,
unsigned short resourcesMonitoringInterval,
std::string const& channelPrefix)
Expand Down Expand Up @@ -902,7 +903,7 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(const WorkflowSpec& workf
}
}
for (auto& policy : callbacksPolicies) {
if (policy.matcher(device) == true) {
if (policy.matcher(device, configContext) == true) {
device.callbacksPolicy = policy;
break;
}
Expand Down
5 changes: 4 additions & 1 deletion Framework/Core/src/DeviceSpecHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ namespace o2::framework
{
struct InputChannelSpec;
struct OutputChannelSpec;
struct ConfigContext;

struct DeviceSpecHelpers {
/// Helper to convert from an abstract dataflow specification, @a workflow,
Expand All @@ -54,6 +55,7 @@ struct DeviceSpecHelpers {
std::vector<DeviceSpec>& devices,
ResourceManager& resourceManager,
std::string const& uniqueWorkflowId,
ConfigContext const& configContext,
bool optimizeTopology = false,
unsigned short resourcesMonitoringInterval = 0,
std::string const& channelPrefix = "");
Expand All @@ -66,6 +68,7 @@ struct DeviceSpecHelpers {
std::vector<DeviceSpec>& devices,
ResourceManager& resourceManager,
std::string const& uniqueWorkflowId,
ConfigContext const& configContext,
bool optimizeTopology = false,
unsigned short resourcesMonitoringInterval = 0,
std::string const& channelPrefix = "")
Expand All @@ -74,7 +77,7 @@ struct DeviceSpecHelpers {
std::vector<ResourcePolicy> resourcePolicies = ResourcePolicy::createDefaultPolicies();
dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies,
dispatchPolicies, resourcePolicies, callbacksPolicies, devices,
resourceManager, uniqueWorkflowId, optimizeTopology,
resourceManager, uniqueWorkflowId, configContext, optimizeTopology,
resourcesMonitoringInterval, channelPrefix);
}

Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
runningWorkflow.devices,
*resourceManager,
driverInfo.uniqueWorkflowId,
*driverInfo.configContext,
!varmap["no-IPC"].as<bool>(),
driverInfo.resourcesMonitoringInterval,
varmap["channel-prefix"].as<std::string>());
Expand Down
14 changes: 7 additions & 7 deletions Framework/Core/test/test_DeviceSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec1)
BOOST_CHECK_EQUAL(offers[0].startPort, 22000);
BOOST_CHECK_EQUAL(offers[0].rangeSize, 1000);

DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
BOOST_REQUIRE_EQUAL(devices.size(), 2);
BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1);
BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind);
Expand Down Expand Up @@ -96,7 +96,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec1PushPull)
std::vector<DeviceSpec> devices;
std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
SimpleResourceManager rm(resources);
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
BOOST_CHECK_EQUAL(devices.size(), 2);
BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1);
BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind);
Expand Down Expand Up @@ -142,7 +142,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec2)

std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
SimpleResourceManager rm(resources);
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
BOOST_CHECK_EQUAL(devices.size(), 2);
BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1);
BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind);
Expand Down Expand Up @@ -186,7 +186,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec3)

std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
SimpleResourceManager rm(resources);
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
BOOST_CHECK_EQUAL(devices.size(), 3);
BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 2);
BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind);
Expand Down Expand Up @@ -236,7 +236,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec4)
std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
SimpleResourceManager rm(resources);

DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
BOOST_CHECK_EQUAL(devices.size(), 4);
BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 2);
BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind);
Expand Down Expand Up @@ -307,7 +307,7 @@ BOOST_AUTO_TEST_CASE(TestTopologyForwarding)

std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
SimpleResourceManager rm(resources);
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
BOOST_CHECK_EQUAL(devices.size(), 3);
BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1);
BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind);
Expand Down Expand Up @@ -587,7 +587,7 @@ BOOST_AUTO_TEST_CASE(TestTopologyLayeredTimePipeline)
auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
SimpleResourceManager rm(resources);
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
BOOST_CHECK_EQUAL(devices.size(), 6);
BOOST_CHECK_EQUAL(devices[0].id, "A");
BOOST_CHECK_EQUAL(devices[1].id, "B_t0");
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/test/test_DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ BOOST_AUTO_TEST_CASE(test_prepareArguments)
CompletionPolicy::createDefaultPolicies(),
callbacksPolicies,
deviceSpecs,
*rm, "workflow-id");
*rm, "workflow-id", *configContext);

// Now doing the test cases
CheckMatrix matrix;
Expand Down
6 changes: 5 additions & 1 deletion Framework/Core/test/test_FrameworkDataFlowToDDS.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
#include "Framework/DeviceSpec.h"
#include "Framework/ProcessingContext.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/ConfigContext.h"
#include "Framework/ConfigParamStore.h"
#include "Framework/ConfigParamRegistry.h"

#include <chrono>
#include <sstream>
#include <thread>
#include <memory>

using namespace o2::framework;

Expand Down Expand Up @@ -92,7 +96,7 @@ BOOST_AUTO_TEST_CASE(TestDDS)
SimpleResourceManager rm(resources);
auto completionPolicies = CompletionPolicy::createDefaultPolicies();
auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", true);
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext, true);
std::vector<DeviceControl> controls;
std::vector<DeviceExecution> executions;
controls.resize(devices.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ BOOST_AUTO_TEST_CASE(TestO2ControlDump)
SimpleResourceManager rm(resources);
auto completionPolicies = CompletionPolicy::createDefaultPolicies();
auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", true);
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext, true);
std::vector<DeviceControl> controls;
std::vector<DeviceExecution> executions;
controls.resize(devices.size());
Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/test/test_Graphviz.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ BOOST_AUTO_TEST_CASE(TestGraphviz)
auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
std::vector<ComputingResource> resources = {ComputingResourceHelpers::getLocalhostResource()};
SimpleResourceManager rm(resources);
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
str.str("");
GraphvizHelpers::dumpDeviceSpec2Graphviz(str, devices);
lineByLineComparision(str.str(), R"EXPECTED(digraph structs {
Expand Down Expand Up @@ -143,7 +143,7 @@ BOOST_AUTO_TEST_CASE(TestGraphvizWithPipeline)
auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
std::vector<ComputingResource> resources = {ComputingResourceHelpers::getLocalhostResource()};
SimpleResourceManager rm(resources);
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
str.str("");
GraphvizHelpers::dumpDeviceSpec2Graphviz(str, devices);
lineByLineComparision(str.str(), R"EXPECTED(digraph structs {
Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/test/test_TimeParallelPipelining.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ BOOST_AUTO_TEST_CASE(TimePipeliningSimple)
auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
std::vector<ComputingResource> resources = {ComputingResourceHelpers::getLocalhostResource()};
SimpleResourceManager rm(resources);
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
BOOST_REQUIRE_EQUAL(devices.size(), 4);
auto& producer = devices[0];
auto& layer0Consumer0 = devices[1];
Expand Down Expand Up @@ -114,7 +114,7 @@ BOOST_AUTO_TEST_CASE(TimePipeliningFull)
auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
std::vector<ComputingResource> resources = {ComputingResourceHelpers::getLocalhostResource()};
SimpleResourceManager rm(resources);
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id");
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
BOOST_REQUIRE_EQUAL(devices.size(), 7);
auto& producer = devices[0];
auto& layer0Consumer0 = devices[1];
Expand Down
2 changes: 1 addition & 1 deletion Framework/TestWorkflows/src/o2DiamondWorkflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void customize(std::vector<CallbacksPolicy>& policies)
{
policies.push_back(CallbacksPolicy{
.matcher = DeviceMatchers::matchByName("A"),
.policy = [](CallbackService& service) {
.policy = [](CallbackService& service, InitContext&) {
service.set(CallbackService::Id::Start, []() { LOG(INFO) << "invoked at start"; });
}});
}
Expand Down