From c70ca6fd3e9b45a1d35575d023943c9cb6fafe04 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 24 Nov 2021 10:42:01 +0100 Subject: [PATCH 1/3] DPL raw proxy: support for sequence of payloads/pruned headers This is also simplifying the message processing loop to avoid repeated query matching for split payload message pairs. Extending configuration of DPL injector function, but new configuration features like `paranoid` checking of message consistency still need to be implemented. Time consuming consistency checks are will be optional. --- .../Framework/ExternalFairMQDeviceProxy.h | 19 +++- .../Core/src/ExternalFairMQDeviceProxy.cxx | 88 +++++++++++-------- 2 files changed, 69 insertions(+), 38 deletions(-) diff --git a/Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h b/Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h index 06c41c1c73373..65033743c96ea 100644 --- a/Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h +++ b/Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h @@ -56,12 +56,29 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, uint64_t startTime /// multipart ensemble. InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, uint64_t step); +/// @struct DPLModelAdapterConfig +/// Configuration object for dplModelAdaptor +struct DPLModelAdapterConfig { + /// throw runtime error if an input message is not matched by filter rules + bool throwOnUnmatchedInputs = true; + /// do all kinds of consistency checks + bool paranoid = false; + /// blindly forward on one channel + bool blindForward = false; +}; + /// This is to be used when the input data is already formatted like DPL /// expects it, i.e. with the DataProcessingHeader in the header stack /// The list of specs is used as a filter list, all incoming data matching an entry /// in the list will be send through the corresponding channel InjectorFunction dplModelAdaptor(std::vector const& specs = {{header::gDataOriginAny, header::gDataDescriptionAny}}, - bool throwOnUnmatchedInputs = true); + DPLModelAdapterConfig config = DPLModelAdapterConfig{}); + +/// legacy function +inline InjectorFunction dplModelAdaptor(std::vector const& specs, bool throwOnUnmatchedInputs) +{ + return dplModelAdaptor(specs, DPLModelAdapterConfig{throwOnUnmatchedInputs}); +} /// The default connection method for the custom source static auto gDefaultConverter = incrementalConverter(OutputSpec{"TST", "TEST", 0}, 0, 1); diff --git a/Framework/Core/src/ExternalFairMQDeviceProxy.cxx b/Framework/Core/src/ExternalFairMQDeviceProxy.cxx index 09a52dd9a73b7..771b273630bd4 100644 --- a/Framework/Core/src/ExternalFairMQDeviceProxy.cxx +++ b/Framework/Core/src/ExternalFairMQDeviceProxy.cxx @@ -184,9 +184,10 @@ InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, }; } -InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, bool throwOnUnmatchedInputs) +InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, DPLModelAdapterConfig config) { - // structure to hald information on the unmatch ed data and print a warning at cleanup + bool throwOnUnmatchedInputs = config.throwOnUnmatchedInputs; + // structure to hold information on the unmatched data and print a warning at cleanup class DroppedDataSpecs { public: @@ -209,7 +210,8 @@ InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, boo void warning() const { if (not descriptions.empty()) { - LOG(warning) << "Some input data are not matched by filter rules " << descriptions << "\n" + LOG(warning) << "Some input data could not be matched by filter rules to output specs\n" + << "Active rules: " << descriptions << "\n" << "DROPPING OF THESE MESSAGES HAS BEEN ENABLED BY CONFIGURATION"; } } @@ -221,11 +223,9 @@ InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, boo return [filterSpecs = std::move(filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared()](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) { std::unordered_map outputs; std::vector unmatchedDescriptions; - int lastSplitPartIndex = -1; - std::string channelNameForSplitParts; static int64_t dplCounter = -1; dplCounter++; - for (size_t msgidx = 0; msgidx < parts.Size() / 2; ++msgidx) { + for (size_t msgidx = 0; msgidx < parts.Size(); msgidx += 2) { const auto dh = o2::header::get(parts.At(msgidx * 2)->GetData()); if (!dh) { LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing"; @@ -241,49 +241,63 @@ InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, boo OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification}; LOG(debug) << "processing " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " time slice " << dph->startTime << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts; - bool indexDone = false; + size_t finalBlockIndex = 0; + std::string channelName = ""; + for (auto const& spec : filterSpecs) { // filter on the specified OutputSpecs, the default value is a ConcreteDataTypeMatcher with origin and description 'any' if (DataSpecUtils::match(spec, OutputSpec{{header::gDataOriginAny, header::gDataDescriptionAny}}) || DataSpecUtils::match(spec, query)) { - auto channelName = channelRetriever(query, dph->startTime); + channelName = channelRetriever(query, dph->startTime); if (channelName.empty()) { LOG(warning) << "can not find matching channel, not able to adopt " << DataSpecUtils::describe(query); - break; - } - // the checks for consistency of split payload parts are of informative nature - // forwarding happens independently - if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limitssplitPayloadParts)>::max()) { - if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) { - LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)"; - } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) { - LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex - << " of " << dh->splitPayloadParts; - } else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) { - LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex - << ", matching " << channelName << ", expecting " << channelNameForSplitParts; - } - lastSplitPartIndex = dh->splitPayloadIndex; - channelNameForSplitParts = channelName; - if (lastSplitPartIndex + 1 == dh->splitPayloadParts) { - lastSplitPartIndex = -1; - channelNameForSplitParts = ""; - } - } else if (lastSplitPartIndex != -1) { - LOG(warning) << "found incomplete or unordered split parts, expecting part " << lastSplitPartIndex + 1 - << " but got a new data block"; } - outputs[channelName].AddPart(std::move(parts.At(msgidx * 2))); - outputs[channelName].AddPart(std::move(parts.At(msgidx * 2 + 1))); - LOG(debug) << "associating part with index " << msgidx << " to channel " << channelName << " (" << outputs[channelName].Size() << ")"; - indexDone = true; break; } } - if (indexDone == false && !DataSpecUtils::match(query, "DPL", "EOS", 0)) { + if (!channelName.empty()) { + if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) { + // this is indicating a sequence of payloads following the header + // FIXME: we will probably also set the DataHeader version + finalBlockIndex = msgidx + dh->splitPayloadParts + 1; + } else { + // We can consider the next splitPayloadParts as one block of messages pairs + // because we are guaranteed they are all the same. + // If splitPayloadParts = 0, we assume that means there is only one (header, payload) + // pair. + finalBlockIndex = msgidx + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2; + } + assert(finalBlockIndex >= msgidx + 2); + if (finalBlockIndex > parts.Size()) { + // TODO error handling + //LOGP(error, "DataHeader::splitPayloadParts invalid"); + continue; + } + + // the checks for consistency of split payload parts are of informative nature + // forwarding happens independently + //if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limitssplitPayloadParts)>::max()) { + // if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) { + // LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)"; + // } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) { + // LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex + // << " of " << dh->splitPayloadParts; + // } else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) { + // LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex + // << ", matching " << channelName << ", expecting " << channelNameForSplitParts; + // } + //} + LOGP(debug, "associating {} part(s) at index {} to channel {} ({})", finalBlockIndex - msgidx, msgidx, channelName, outputs[channelName].Size()); + for (; msgidx < finalBlockIndex; ++msgidx) { + outputs[channelName].AddPart(std::move(parts.At(msgidx))); + } + msgidx -= 2; + } + if (finalBlockIndex == 0 && !DataSpecUtils::match(query, "DPL", "EOS", 0)) { unmatchedDescriptions.emplace_back(DataSpecUtils::describe(query)); } - } + } // end of loop over parts + for (auto& [channelName, channelParts] : outputs) { if (channelParts.Size() == 0) { continue; From b384b470e9d9a5bf30bdfd747d4bcb599fcc7955 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 5 Jan 2022 16:21:40 +0100 Subject: [PATCH 2/3] Including messages with multiple payloads in unit test Having now a flexible message generation to test both split payload pair messages as well as payload sequences with one preceding header. Randomized test packages and enhanced check on the consumer side. Message mode is randomly selected per event. --- .../test_ExternalFairMQDeviceWorkflow.cxx | 353 ++++++++++++++++-- 1 file changed, 312 insertions(+), 41 deletions(-) diff --git a/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx b/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx index f552f093add36..cb8c6dbb12c3d 100644 --- a/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx +++ b/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx @@ -15,14 +15,52 @@ using namespace o2::framework; #include "Framework/AlgorithmSpec.h" #include "Framework/DataProcessorSpec.h" #include "Framework/ChannelSpec.h" +#include "Framework/DeviceSpec.h" #include "Framework/DataSpecUtils.h" +#include "Framework/SourceInfoHeader.h" #include "Framework/ExternalFairMQDeviceProxy.h" #include "Framework/ControlService.h" #include "Framework/CallbackService.h" +#include "Framework/RawDeviceService.h" #include "Framework/Logger.h" +#include "Framework/InputRecordWalker.h" #include "Headers/DataHeader.h" #include "fairmq/FairMQDevice.h" +namespace test_config +{ +enum struct ProxyMode { + All, + SkipOutput, + OnlyOutput, // also excludes checker + NoProxies, +}; +} + +namespace test_header +{ +struct MsgModeHeader : public o2::header::BaseHeader { + enum struct MsgMode { + Pair, + Sequence, + }; + + static constexpr uint32_t sVersion = 1; + static constexpr o2::header::HeaderType sHeaderType = "MsgMode"; + MsgModeHeader(MsgMode _mode, size_t nParts) + : BaseHeader(sizeof(MsgModeHeader), sHeaderType, o2::header::gSerializationMethodNone, sVersion), mode(_mode), nPayloadParts(nParts) + { + } + + MsgMode mode; + size_t nPayloadParts; +}; +} // namespace test_header +std::istream& operator>>(std::istream& in, enum test_config::ProxyMode& val); +std::ostream& operator<<(std::ostream& out, const enum test_config::ProxyMode& val); +std::istream& operator>>(std::istream& in, enum test_header::MsgModeHeader::MsgMode val); +std::ostream& operator<<(std::ostream& out, const enum test_header::MsgModeHeader::MsgMode val); + // we need to add workflow options before including Framework/runDataProcessing void customize(std::vector& workflowOptions) { @@ -34,18 +72,37 @@ void customize(std::vector& workflowOptions) "number-of-events,n", VariantType::Int, 10, {"number of events to process"}}); workflowOptions.push_back( ConfigParamSpec{ - "output-proxy-only", VariantType::Bool, false, {"create only the workflow up to output proxy"}}); + "proxy-mode", VariantType::String, "all", {"proxy mode: all, skip-output, only-output, skip-all"}}); } #include "Framework/runDataProcessing.h" +using namespace o2::framework; +using DataHeader = o2::header::DataHeader; +using Stack = o2::header::Stack; + #define ASSERT_ERROR(condition) \ if ((condition) == false) { \ LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \ } +template +T readConfig(ConfigContext const& config, const char* key) +{ + auto p = config.options().get(key); + std::stringstream cs(p); + T val; + cs >> val; + if (cs.fail()) { + throw std::runtime_error("invalid configuration parameter '" + p + "' for key " + key); + } + return val; +} + std::vector defineDataProcessing(ConfigContext const& config) { + using ProxyMode = test_config::ProxyMode; + auto proxyMode = readConfig(config, "proxy-mode"); std::string defaultTransportConfig = config.options().get("default-transport"); int nRolls = config.options().get("number-of-events"); if (defaultTransportConfig == "zeromq") { @@ -58,27 +115,9 @@ std::vector defineDataProcessing(ConfigContext const& config) std::vector workflow; ////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // a producer process steered by a timer + // configuration of the out-of-band proxy channel // - // the compute callback of the producer - auto producerCallback = [nRolls, counter = std::make_shared()](DataAllocator& outputs, ControlService& control) { - outputs.make(OutputRef{"data", 0}) = *counter; - if (++(*counter) >= nRolls) { - // send the end of stream signal, this is transferred by the proxies - // and allows to properly terminate downstream devices - control.endOfStream(); - } - }; - - workflow.emplace_back(DataProcessorSpec{"producer", - {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}}, - {OutputSpec{{"data"}, "TST", "DATA", 0, Lifetime::Timeframe}}, - AlgorithmSpec{adaptStateless(producerCallback)}, - {ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"period of timer"}}}}); - - ////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // the dpl sink proxy process - + // used either in the output proxy ('dpl-sink') or as a direct channel of the producer // use the OutputChannelSpec as a tool to create the default configuration for the out-of-band channel OutputChannelSpec externalChannelSpec; // Note: the name is hardcoded for now @@ -88,6 +127,9 @@ std::vector defineDataProcessing(ConfigContext const& config) externalChannelSpec.hostname = "localhost"; externalChannelSpec.port = 42042; externalChannelSpec.listeners = 0; + externalChannelSpec.rateLogging = 10; + externalChannelSpec.sendBufferSize = 1; + externalChannelSpec.recvBufferSize = 1; if (!defaultTransportConfig.empty()) { if (defaultTransportConfig == "zeromq") { externalChannelSpec.protocol = ChannelProtocol::Network; @@ -101,11 +143,142 @@ std::vector defineDataProcessing(ConfigContext const& config) channelConfig += ",transport=" + defaultTransportConfig; } + ////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // a producer process steered by a timer + // + auto producerInitCallback = [nRolls, proxyMode, externalChannelSpec](CallbackService& callbacks, RawDeviceService& rds) { + srand(getpid()); + auto channelName = std::make_shared(); + auto producerChannelInit = [channelName, outputRoutes = rds.spec().outputs]() { + // find the output channel name, we expect all output messages to be + // sent over the same channel + if (channelName->empty()) { + OutputSpec const query{"TST", "DATA", 0}; + for (auto& route : outputRoutes) { + if (DataSpecUtils::match(route.matcher, query)) { + *channelName = route.channel; + break; + } + } + } + ASSERT_ERROR(channelName->length() > 0); + }; + if (proxyMode == ProxyMode::SkipOutput) { + *channelName = externalChannelSpec.name; + } else { + callbacks.set(CallbackService::Id::Start, producerChannelInit); + } + // the compute callback of the producer + auto producerCallback = [nRolls, channelName, proxyMode, counter = std::make_shared()](DataAllocator& outputs, ControlService& control, RawDeviceService& rds) { + int data = *counter; + //outputs.make(OutputRef{"data", 0}) = data; + + FairMQDevice& device = *(rds.device()); + auto transport = device.GetChannel(*channelName, 0).Transport(); + auto channelAlloc = o2::pmr::getTransportAllocator(transport); + + DataProcessingHeader dph{*counter, 0}; + + auto msgMode = rand() % 2 ? test_header::MsgModeHeader::MsgMode::Pair : test_header::MsgModeHeader::MsgMode::Sequence; + size_t nPayloads = rand() % 10 + 1; + + test_header::MsgModeHeader mmh{msgMode, nPayloads}; + FairMQParts messages; + auto insertHeader = [&dph, &mmh, &channelAlloc, &messages](DataHeader const& dh) -> void { + FairMQMessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, dph, mmh}); + messages.AddPart(std::move(header)); + }; + auto insertPayload = [&transport, &messages, &data](size_t size) -> void { + FairMQMessagePtr payload = transport->CreateMessage(size); + memcpy(payload->GetData(), &data, sizeof(data)); + messages.AddPart(std::move(payload)); + }; + auto createSequence = [&insertHeader, &insertPayload, &data](size_t nPayloads, DataHeader dh) -> void { + // one header with index set to the number of split parts indicates sequence + // of payloads without additional headers + dh.payloadSize = sizeof(data); + dh.payloadSerializationMethod = o2::header::gSerializationMethodNone; + dh.splitPayloadIndex = nPayloads; + dh.splitPayloadParts = nPayloads; + insertHeader(dh); + + for (size_t i = 0; i < nPayloads; ++i) { + insertPayload(dh.payloadSize); + } + }; + + auto createPairs = [&insertHeader, &insertPayload, &data](size_t nPayloads, DataHeader dh) -> void { + // one header with index set to the number of split parts indicates sequence + // of payloads without additional headers + dh.payloadSize = sizeof(data); + dh.payloadSerializationMethod = o2::header::gSerializationMethodNone; + dh.splitPayloadIndex = 0; + dh.splitPayloadParts = nPayloads; + for (size_t i = 0; i < nPayloads; ++i) { + dh.splitPayloadIndex = i; + insertHeader(dh); + insertPayload(dh.payloadSize); + } + }; + + if (msgMode == test_header::MsgModeHeader::MsgMode::Pair) { + createPairs(nPayloads, DataHeader{"DATA", "TST", 0}); + } else { + createSequence(nPayloads, DataHeader{"DATA", "TST", 0}); + } + // using utility from ExternalFairMQDeviceProxy + o2::framework::sendOnChannel(device, messages, *channelName); + + if (++(*counter) >= nRolls) { + // send the end of stream signal, this is transferred by the proxies + // and allows to properly terminate downstream devices + control.endOfStream(); + if (proxyMode == ProxyMode::SkipOutput) { + // since we are sending on the bare channel, also the EOS message needs to be created. + DataHeader dhEOS; + dhEOS.dataOrigin = "DPL"; + dhEOS.dataDescription = "EOS"; + dhEOS.subSpecification = 0; + dhEOS.payloadSize = 0; + dhEOS.payloadSerializationMethod = o2::header::gSerializationMethodNone; + dhEOS.tfCounter = 0; + dhEOS.firstTForbit = 0; + SourceInfoHeader sih; + sih.state = InputChannelState::Completed; + auto headerMessage = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dhEOS, dph, sih}); + FairMQParts out; + out.AddPart(std::move(headerMessage)); + // add empty payload message + out.AddPart(std::move(device.NewMessageFor(*channelName, 0, 0))); + o2::framework::sendOnChannel(device, out, *channelName); + } + } + }; + return adaptStateless(producerCallback); + }; + + workflow.emplace_back(DataProcessorSpec{"producer", + {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}}, + {OutputSpec{{"data"}, "TST", "DATA", 0, Lifetime::Timeframe}}, + AlgorithmSpec{adaptStateful(producerInitCallback)}, + {ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"period of timer"}}}}); + + if (proxyMode == ProxyMode::SkipOutput) { + // create the out-of-band channel in the producer if the output proxy is bypassed + const char* d = strdup(channelConfig.c_str()); + workflow.back().options.push_back(ConfigParamSpec{"channel-config", VariantType::String, d, {"proxy channel of producer"}}); + } + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // the dpl sink proxy process + Inputs sinkInputs = {InputSpec{"external", "TST", "DATA", 0, Lifetime::Timeframe}}; auto channelSelector = [](InputSpec const&, const std::unordered_map>&) -> std::string { return "downstream"; }; - workflow.emplace_back(std::move(specifyFairMQDeviceMultiOutputProxy("dpl-sink", sinkInputs, channelConfig.c_str(), channelSelector))); + if (proxyMode == ProxyMode::All || proxyMode == ProxyMode::OnlyOutput) { + workflow.emplace_back(std::move(specifyFairMQDeviceMultiOutputProxy("dpl-sink", sinkInputs, channelConfig.c_str(), channelSelector))); + } ////////////////////////////////////////////////////////////////////////////////////////////////////////////// // a simple checker process subscribing to the output of the input proxy @@ -113,8 +286,25 @@ std::vector defineDataProcessing(ConfigContext const& config) // the compute callback of the checker auto counter = std::make_shared(0); auto checkerCallback = [counter](InputRecord& inputs, ControlService& control) { - LOG(debug) << "got inputs " << inputs.size(); - ASSERT_ERROR(inputs.get("datain") == *counter); + auto const* dh = DataRefUtils::getHeader(inputs.get("datain")); + auto const* mmh = DataRefUtils::getHeader(inputs.get("datain")); + ASSERT_ERROR(dh != nullptr); + ASSERT_ERROR(mmh != nullptr); + LOGP(info, "{} input slots(s), data {}, parts {}, mode {}", inputs.size(), inputs.get("datain"), mmh->nPayloadParts, mmh->mode); + if (mmh->mode == test_header::MsgModeHeader::MsgMode::Pair) { + ASSERT_ERROR(dh->splitPayloadParts == mmh->nPayloadParts); + ASSERT_ERROR(dh->splitPayloadIndex == 0); + } else { + ASSERT_ERROR(dh->splitPayloadParts == mmh->nPayloadParts); + ASSERT_ERROR(dh->splitPayloadIndex == mmh->nPayloadParts); + } + size_t nPayloads = 0; + for (auto const& ref : InputRecordWalker(inputs)) { + auto data = inputs.get(ref); + ASSERT_ERROR(data == *counter); + ++nPayloads; + } + ASSERT_ERROR(nPayloads == mmh->nPayloadParts); ++(*counter); }; auto checkCounter = [counter, nRolls](EndOfStreamContext&) { @@ -129,10 +319,25 @@ std::vector defineDataProcessing(ConfigContext const& config) }; // the checker process connects to the proxy - workflow.emplace_back(DataProcessorSpec{"checker", - {InputSpec{"datain", "PRX", "DATA", 0, Lifetime::Timeframe}}, - {}, - AlgorithmSpec{adaptStateful(checkerInit)}}); + Inputs checkerInputs; + if (proxyMode != ProxyMode::All) { + checkerInputs.emplace_back(InputSpec{"datain", ConcreteDataTypeMatcher{"TST", "DATA"}, Lifetime::Timeframe}); + //for (unsigned int i = 0; i < pState->nChannels; i++) { + // checkerInputs.emplace_back(InputSpec{{"datain"}, "TST", "DATA", i, Lifetime::Timeframe}); + //} + } else { + checkerInputs.emplace_back(InputSpec{"datain", ConcreteDataTypeMatcher{"PRX", "DATA"}, Lifetime::Timeframe}); + //for (unsigned int i = 0; i < pState->nChannels; i++) { + // checkerInputs.emplace_back(InputSpec{{"datain"}, "PRX", "DATA", i, Lifetime::Timeframe}); + //} + } + if (proxyMode != ProxyMode::OnlyOutput) { + // the checker is not added if the input proxy is skipped + workflow.emplace_back(DataProcessorSpec{"checker", + std::move(checkerInputs), + {}, + AlgorithmSpec{adaptStateful(checkerInit)}}); + } ////////////////////////////////////////////////////////////////////////////////////////////////////////////// // the input proxy process @@ -195,19 +400,85 @@ std::vector defineDataProcessing(ConfigContext const& config) channelConfig += ",transport=" + defaultTransportConfig; } - // Note: in order to make the DPL output proxy and an input proxy working in the same - // workflow, we use different data description - Outputs inputProxyOutputs = {OutputSpec{"PRX", "DATA", 0, Lifetime::Timeframe}}; - workflow.emplace_back(specifyExternalFairMQDeviceProxy( - "input-proxy", - std::move(inputProxyOutputs), - channelConfig.c_str(), - converter)); - - if (config.options().get("output-proxy-only")) { - // remove the input proxy and checker from the workflow - workflow.pop_back(); - workflow.pop_back(); + if (proxyMode == ProxyMode::All) { + // Note: in order to make the DPL output proxy and an input proxy working in the same + // workflow, we use different data description + Outputs inputProxyOutputs = {OutputSpec{ConcreteDataTypeMatcher{"PRX", "DATA"}, Lifetime::Timeframe}}; + workflow.emplace_back(specifyExternalFairMQDeviceProxy( + "input-proxy", + std::move(inputProxyOutputs), + channelConfig.c_str(), + converter)); + } else if (proxyMode == ProxyMode::SkipOutput) { + Outputs inputProxyOutputs = {OutputSpec{ConcreteDataTypeMatcher{"TST", "DATA"}, Lifetime::Timeframe}}; + // we use the same specs as filters in the dpl adaptor + auto filterSpecs = inputProxyOutputs; + workflow.emplace_back(specifyExternalFairMQDeviceProxy( + "input-proxy", + std::move(inputProxyOutputs), + channelConfig.c_str(), + o2::framework::dplModelAdaptor(filterSpecs, true))); } + return workflow; } + +std::istream& operator>>(std::istream& in, enum test_config::ProxyMode& val) +{ + std::string token; + in >> token; + if (token == "all" || token == "a") { + val = test_config::ProxyMode::All; + } else if (token == "skip-output") { + val = test_config::ProxyMode::SkipOutput; + } else if (token == "only-output") { + val = test_config::ProxyMode::OnlyOutput; + } else if (token == "skip-all" || token == "skip-proxies") { + val = test_config::ProxyMode::NoProxies; + } else { + in.setstate(std::ios_base::failbit); + } + return in; +} + +std::ostream& operator<<(std::ostream& out, const enum test_config::ProxyMode& val) +{ + if (val == test_config::ProxyMode::All) { + out << "all"; + } else if (val == test_config::ProxyMode::SkipOutput) { + out << "skip-output"; + } else if (val == test_config::ProxyMode::OnlyOutput) { + out << "only-output"; + } else if (val == test_config::ProxyMode::NoProxies) { + out << "skip-all"; + } else { + out.setstate(std::ios_base::failbit); + } + return out; +} + +std::istream& operator>>(std::istream& in, enum test_header::MsgModeHeader::MsgMode& val) +{ + std::string token; + in >> token; + if (token == "pair") { + val = test_header::MsgModeHeader::MsgMode::Pair; + } else if (token == "sequence") { + val = test_header::MsgModeHeader::MsgMode::Sequence; + } else { + in.setstate(std::ios_base::failbit); + } + return in; +} + +std::ostream& operator<<(std::ostream& out, const enum test_header::MsgModeHeader::MsgMode& val) +{ + if (val == test_header::MsgModeHeader::MsgMode::Pair) { + out << "pair"; + } else if (val == test_header::MsgModeHeader::MsgMode::Sequence) { + out << "sequence"; + } else { + out.setstate(std::ios_base::failbit); + } + return out; +} From 71707de02b40016043e714003de264b18ba5cbf4 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 6 Jan 2022 08:56:45 +0100 Subject: [PATCH 3/3] Making message mode 'skip-output' the default The output proxy is not yet supporting forwarding of neither message pairs nor sequences, this needs to be investigated. Looks like not all parts are forwarded. The output proxy is based on message forwarding in the core framework DataProcessingDevice [480102:checker]: [08:58:09][ERROR] inconsistent number of inputs extracted [480102:checker]: [08:58:09][ERROR] inconsistent number of inputs extracted [480102:checker]: [08:58:09][ERROR] Header is not a DataHeader? [480102:checker]: [08:58:09][INFO] 1 input slots(s), data 0, parts 6, mode 1 [480102:checker]: [08:58:09][FATAL] Test condition "data == *counter" failed --- Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx b/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx index cb8c6dbb12c3d..46f3c18377978 100644 --- a/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx +++ b/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx @@ -72,7 +72,7 @@ void customize(std::vector& workflowOptions) "number-of-events,n", VariantType::Int, 10, {"number of events to process"}}); workflowOptions.push_back( ConfigParamSpec{ - "proxy-mode", VariantType::String, "all", {"proxy mode: all, skip-output, only-output, skip-all"}}); + "proxy-mode", VariantType::String, "skip-output", {"proxy mode: all, skip-output, only-output, skip-all"}}); } #include "Framework/runDataProcessing.h"