diff --git a/Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h b/Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h index 65033743c96ea..06c41c1c73373 100644 --- a/Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h +++ b/Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h @@ -56,29 +56,12 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, uint64_t startTime /// multipart ensemble. InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, uint64_t step); -/// @struct DPLModelAdapterConfig -/// Configuration object for dplModelAdaptor -struct DPLModelAdapterConfig { - /// throw runtime error if an input message is not matched by filter rules - bool throwOnUnmatchedInputs = true; - /// do all kinds of consistency checks - bool paranoid = false; - /// blindly forward on one channel - bool blindForward = false; -}; - /// This is to be used when the input data is already formatted like DPL /// expects it, i.e. with the DataProcessingHeader in the header stack /// The list of specs is used as a filter list, all incoming data matching an entry /// in the list will be send through the corresponding channel InjectorFunction dplModelAdaptor(std::vector const& specs = {{header::gDataOriginAny, header::gDataDescriptionAny}}, - DPLModelAdapterConfig config = DPLModelAdapterConfig{}); - -/// legacy function -inline InjectorFunction dplModelAdaptor(std::vector const& specs, bool throwOnUnmatchedInputs) -{ - return dplModelAdaptor(specs, DPLModelAdapterConfig{throwOnUnmatchedInputs}); -} + bool throwOnUnmatchedInputs = true); /// The default connection method for the custom source static auto gDefaultConverter = incrementalConverter(OutputSpec{"TST", "TEST", 0}, 0, 1); diff --git a/Framework/Core/src/ExternalFairMQDeviceProxy.cxx b/Framework/Core/src/ExternalFairMQDeviceProxy.cxx index 771b273630bd4..09a52dd9a73b7 100644 --- a/Framework/Core/src/ExternalFairMQDeviceProxy.cxx +++ b/Framework/Core/src/ExternalFairMQDeviceProxy.cxx @@ -184,10 +184,9 @@ InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, }; } -InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, DPLModelAdapterConfig config) +InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, bool throwOnUnmatchedInputs) { - bool throwOnUnmatchedInputs = config.throwOnUnmatchedInputs; - // structure to hold information on the unmatched data and print a warning at cleanup + // structure to hald information on the unmatch ed data and print a warning at cleanup class DroppedDataSpecs { public: @@ -210,8 +209,7 @@ InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, DPL void warning() const { if (not descriptions.empty()) { - LOG(warning) << "Some input data could not be matched by filter rules to output specs\n" - << "Active rules: " << descriptions << "\n" + LOG(warning) << "Some input data are not matched by filter rules " << descriptions << "\n" << "DROPPING OF THESE MESSAGES HAS BEEN ENABLED BY CONFIGURATION"; } } @@ -223,9 +221,11 @@ InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, DPL return [filterSpecs = std::move(filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared()](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) { std::unordered_map outputs; std::vector unmatchedDescriptions; + int lastSplitPartIndex = -1; + std::string channelNameForSplitParts; static int64_t dplCounter = -1; dplCounter++; - for (size_t msgidx = 0; msgidx < parts.Size(); msgidx += 2) { + for (size_t msgidx = 0; msgidx < parts.Size() / 2; ++msgidx) { const auto dh = o2::header::get(parts.At(msgidx * 2)->GetData()); if (!dh) { LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing"; @@ -241,63 +241,49 @@ InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, DPL OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification}; LOG(debug) << "processing " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " time slice " << dph->startTime << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts; - size_t finalBlockIndex = 0; - std::string channelName = ""; - + bool indexDone = false; for (auto const& spec : filterSpecs) { // filter on the specified OutputSpecs, the default value is a ConcreteDataTypeMatcher with origin and description 'any' if (DataSpecUtils::match(spec, OutputSpec{{header::gDataOriginAny, header::gDataDescriptionAny}}) || DataSpecUtils::match(spec, query)) { - channelName = channelRetriever(query, dph->startTime); + auto channelName = channelRetriever(query, dph->startTime); if (channelName.empty()) { LOG(warning) << "can not find matching channel, not able to adopt " << DataSpecUtils::describe(query); + break; + } + // the checks for consistency of split payload parts are of informative nature + // forwarding happens independently + if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limitssplitPayloadParts)>::max()) { + if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) { + LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)"; + } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) { + LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex + << " of " << dh->splitPayloadParts; + } else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) { + LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex + << ", matching " << channelName << ", expecting " << channelNameForSplitParts; + } + lastSplitPartIndex = dh->splitPayloadIndex; + channelNameForSplitParts = channelName; + if (lastSplitPartIndex + 1 == dh->splitPayloadParts) { + lastSplitPartIndex = -1; + channelNameForSplitParts = ""; + } + } else if (lastSplitPartIndex != -1) { + LOG(warning) << "found incomplete or unordered split parts, expecting part " << lastSplitPartIndex + 1 + << " but got a new data block"; } + outputs[channelName].AddPart(std::move(parts.At(msgidx * 2))); + outputs[channelName].AddPart(std::move(parts.At(msgidx * 2 + 1))); + LOG(debug) << "associating part with index " << msgidx << " to channel " << channelName << " (" << outputs[channelName].Size() << ")"; + indexDone = true; break; } } - if (!channelName.empty()) { - if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) { - // this is indicating a sequence of payloads following the header - // FIXME: we will probably also set the DataHeader version - finalBlockIndex = msgidx + dh->splitPayloadParts + 1; - } else { - // We can consider the next splitPayloadParts as one block of messages pairs - // because we are guaranteed they are all the same. - // If splitPayloadParts = 0, we assume that means there is only one (header, payload) - // pair. - finalBlockIndex = msgidx + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2; - } - assert(finalBlockIndex >= msgidx + 2); - if (finalBlockIndex > parts.Size()) { - // TODO error handling - //LOGP(error, "DataHeader::splitPayloadParts invalid"); - continue; - } - - // the checks for consistency of split payload parts are of informative nature - // forwarding happens independently - //if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limitssplitPayloadParts)>::max()) { - // if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) { - // LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)"; - // } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) { - // LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex - // << " of " << dh->splitPayloadParts; - // } else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) { - // LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex - // << ", matching " << channelName << ", expecting " << channelNameForSplitParts; - // } - //} - LOGP(debug, "associating {} part(s) at index {} to channel {} ({})", finalBlockIndex - msgidx, msgidx, channelName, outputs[channelName].Size()); - for (; msgidx < finalBlockIndex; ++msgidx) { - outputs[channelName].AddPart(std::move(parts.At(msgidx))); - } - msgidx -= 2; - } - if (finalBlockIndex == 0 && !DataSpecUtils::match(query, "DPL", "EOS", 0)) { + if (indexDone == false && !DataSpecUtils::match(query, "DPL", "EOS", 0)) { unmatchedDescriptions.emplace_back(DataSpecUtils::describe(query)); } - } // end of loop over parts - + } for (auto& [channelName, channelParts] : outputs) { if (channelParts.Size() == 0) { continue; diff --git a/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx b/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx index 46f3c18377978..f552f093add36 100644 --- a/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx +++ b/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx @@ -15,52 +15,14 @@ using namespace o2::framework; #include "Framework/AlgorithmSpec.h" #include "Framework/DataProcessorSpec.h" #include "Framework/ChannelSpec.h" -#include "Framework/DeviceSpec.h" #include "Framework/DataSpecUtils.h" -#include "Framework/SourceInfoHeader.h" #include "Framework/ExternalFairMQDeviceProxy.h" #include "Framework/ControlService.h" #include "Framework/CallbackService.h" -#include "Framework/RawDeviceService.h" #include "Framework/Logger.h" -#include "Framework/InputRecordWalker.h" #include "Headers/DataHeader.h" #include "fairmq/FairMQDevice.h" -namespace test_config -{ -enum struct ProxyMode { - All, - SkipOutput, - OnlyOutput, // also excludes checker - NoProxies, -}; -} - -namespace test_header -{ -struct MsgModeHeader : public o2::header::BaseHeader { - enum struct MsgMode { - Pair, - Sequence, - }; - - static constexpr uint32_t sVersion = 1; - static constexpr o2::header::HeaderType sHeaderType = "MsgMode"; - MsgModeHeader(MsgMode _mode, size_t nParts) - : BaseHeader(sizeof(MsgModeHeader), sHeaderType, o2::header::gSerializationMethodNone, sVersion), mode(_mode), nPayloadParts(nParts) - { - } - - MsgMode mode; - size_t nPayloadParts; -}; -} // namespace test_header -std::istream& operator>>(std::istream& in, enum test_config::ProxyMode& val); -std::ostream& operator<<(std::ostream& out, const enum test_config::ProxyMode& val); -std::istream& operator>>(std::istream& in, enum test_header::MsgModeHeader::MsgMode val); -std::ostream& operator<<(std::ostream& out, const enum test_header::MsgModeHeader::MsgMode val); - // we need to add workflow options before including Framework/runDataProcessing void customize(std::vector& workflowOptions) { @@ -72,37 +34,18 @@ void customize(std::vector& workflowOptions) "number-of-events,n", VariantType::Int, 10, {"number of events to process"}}); workflowOptions.push_back( ConfigParamSpec{ - "proxy-mode", VariantType::String, "skip-output", {"proxy mode: all, skip-output, only-output, skip-all"}}); + "output-proxy-only", VariantType::Bool, false, {"create only the workflow up to output proxy"}}); } #include "Framework/runDataProcessing.h" -using namespace o2::framework; -using DataHeader = o2::header::DataHeader; -using Stack = o2::header::Stack; - #define ASSERT_ERROR(condition) \ if ((condition) == false) { \ LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \ } -template -T readConfig(ConfigContext const& config, const char* key) -{ - auto p = config.options().get(key); - std::stringstream cs(p); - T val; - cs >> val; - if (cs.fail()) { - throw std::runtime_error("invalid configuration parameter '" + p + "' for key " + key); - } - return val; -} - std::vector defineDataProcessing(ConfigContext const& config) { - using ProxyMode = test_config::ProxyMode; - auto proxyMode = readConfig(config, "proxy-mode"); std::string defaultTransportConfig = config.options().get("default-transport"); int nRolls = config.options().get("number-of-events"); if (defaultTransportConfig == "zeromq") { @@ -115,9 +58,27 @@ std::vector defineDataProcessing(ConfigContext const& config) std::vector workflow; ////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // configuration of the out-of-band proxy channel + // a producer process steered by a timer // - // used either in the output proxy ('dpl-sink') or as a direct channel of the producer + // the compute callback of the producer + auto producerCallback = [nRolls, counter = std::make_shared()](DataAllocator& outputs, ControlService& control) { + outputs.make(OutputRef{"data", 0}) = *counter; + if (++(*counter) >= nRolls) { + // send the end of stream signal, this is transferred by the proxies + // and allows to properly terminate downstream devices + control.endOfStream(); + } + }; + + workflow.emplace_back(DataProcessorSpec{"producer", + {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}}, + {OutputSpec{{"data"}, "TST", "DATA", 0, Lifetime::Timeframe}}, + AlgorithmSpec{adaptStateless(producerCallback)}, + {ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"period of timer"}}}}); + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // the dpl sink proxy process + // use the OutputChannelSpec as a tool to create the default configuration for the out-of-band channel OutputChannelSpec externalChannelSpec; // Note: the name is hardcoded for now @@ -127,9 +88,6 @@ std::vector defineDataProcessing(ConfigContext const& config) externalChannelSpec.hostname = "localhost"; externalChannelSpec.port = 42042; externalChannelSpec.listeners = 0; - externalChannelSpec.rateLogging = 10; - externalChannelSpec.sendBufferSize = 1; - externalChannelSpec.recvBufferSize = 1; if (!defaultTransportConfig.empty()) { if (defaultTransportConfig == "zeromq") { externalChannelSpec.protocol = ChannelProtocol::Network; @@ -143,142 +101,11 @@ std::vector defineDataProcessing(ConfigContext const& config) channelConfig += ",transport=" + defaultTransportConfig; } - ////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // a producer process steered by a timer - // - auto producerInitCallback = [nRolls, proxyMode, externalChannelSpec](CallbackService& callbacks, RawDeviceService& rds) { - srand(getpid()); - auto channelName = std::make_shared(); - auto producerChannelInit = [channelName, outputRoutes = rds.spec().outputs]() { - // find the output channel name, we expect all output messages to be - // sent over the same channel - if (channelName->empty()) { - OutputSpec const query{"TST", "DATA", 0}; - for (auto& route : outputRoutes) { - if (DataSpecUtils::match(route.matcher, query)) { - *channelName = route.channel; - break; - } - } - } - ASSERT_ERROR(channelName->length() > 0); - }; - if (proxyMode == ProxyMode::SkipOutput) { - *channelName = externalChannelSpec.name; - } else { - callbacks.set(CallbackService::Id::Start, producerChannelInit); - } - // the compute callback of the producer - auto producerCallback = [nRolls, channelName, proxyMode, counter = std::make_shared()](DataAllocator& outputs, ControlService& control, RawDeviceService& rds) { - int data = *counter; - //outputs.make(OutputRef{"data", 0}) = data; - - FairMQDevice& device = *(rds.device()); - auto transport = device.GetChannel(*channelName, 0).Transport(); - auto channelAlloc = o2::pmr::getTransportAllocator(transport); - - DataProcessingHeader dph{*counter, 0}; - - auto msgMode = rand() % 2 ? test_header::MsgModeHeader::MsgMode::Pair : test_header::MsgModeHeader::MsgMode::Sequence; - size_t nPayloads = rand() % 10 + 1; - - test_header::MsgModeHeader mmh{msgMode, nPayloads}; - FairMQParts messages; - auto insertHeader = [&dph, &mmh, &channelAlloc, &messages](DataHeader const& dh) -> void { - FairMQMessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, dph, mmh}); - messages.AddPart(std::move(header)); - }; - auto insertPayload = [&transport, &messages, &data](size_t size) -> void { - FairMQMessagePtr payload = transport->CreateMessage(size); - memcpy(payload->GetData(), &data, sizeof(data)); - messages.AddPart(std::move(payload)); - }; - auto createSequence = [&insertHeader, &insertPayload, &data](size_t nPayloads, DataHeader dh) -> void { - // one header with index set to the number of split parts indicates sequence - // of payloads without additional headers - dh.payloadSize = sizeof(data); - dh.payloadSerializationMethod = o2::header::gSerializationMethodNone; - dh.splitPayloadIndex = nPayloads; - dh.splitPayloadParts = nPayloads; - insertHeader(dh); - - for (size_t i = 0; i < nPayloads; ++i) { - insertPayload(dh.payloadSize); - } - }; - - auto createPairs = [&insertHeader, &insertPayload, &data](size_t nPayloads, DataHeader dh) -> void { - // one header with index set to the number of split parts indicates sequence - // of payloads without additional headers - dh.payloadSize = sizeof(data); - dh.payloadSerializationMethod = o2::header::gSerializationMethodNone; - dh.splitPayloadIndex = 0; - dh.splitPayloadParts = nPayloads; - for (size_t i = 0; i < nPayloads; ++i) { - dh.splitPayloadIndex = i; - insertHeader(dh); - insertPayload(dh.payloadSize); - } - }; - - if (msgMode == test_header::MsgModeHeader::MsgMode::Pair) { - createPairs(nPayloads, DataHeader{"DATA", "TST", 0}); - } else { - createSequence(nPayloads, DataHeader{"DATA", "TST", 0}); - } - // using utility from ExternalFairMQDeviceProxy - o2::framework::sendOnChannel(device, messages, *channelName); - - if (++(*counter) >= nRolls) { - // send the end of stream signal, this is transferred by the proxies - // and allows to properly terminate downstream devices - control.endOfStream(); - if (proxyMode == ProxyMode::SkipOutput) { - // since we are sending on the bare channel, also the EOS message needs to be created. - DataHeader dhEOS; - dhEOS.dataOrigin = "DPL"; - dhEOS.dataDescription = "EOS"; - dhEOS.subSpecification = 0; - dhEOS.payloadSize = 0; - dhEOS.payloadSerializationMethod = o2::header::gSerializationMethodNone; - dhEOS.tfCounter = 0; - dhEOS.firstTForbit = 0; - SourceInfoHeader sih; - sih.state = InputChannelState::Completed; - auto headerMessage = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dhEOS, dph, sih}); - FairMQParts out; - out.AddPart(std::move(headerMessage)); - // add empty payload message - out.AddPart(std::move(device.NewMessageFor(*channelName, 0, 0))); - o2::framework::sendOnChannel(device, out, *channelName); - } - } - }; - return adaptStateless(producerCallback); - }; - - workflow.emplace_back(DataProcessorSpec{"producer", - {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}}, - {OutputSpec{{"data"}, "TST", "DATA", 0, Lifetime::Timeframe}}, - AlgorithmSpec{adaptStateful(producerInitCallback)}, - {ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"period of timer"}}}}); - - if (proxyMode == ProxyMode::SkipOutput) { - // create the out-of-band channel in the producer if the output proxy is bypassed - const char* d = strdup(channelConfig.c_str()); - workflow.back().options.push_back(ConfigParamSpec{"channel-config", VariantType::String, d, {"proxy channel of producer"}}); - } - - ////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // the dpl sink proxy process - Inputs sinkInputs = {InputSpec{"external", "TST", "DATA", 0, Lifetime::Timeframe}}; auto channelSelector = [](InputSpec const&, const std::unordered_map>&) -> std::string { return "downstream"; }; - if (proxyMode == ProxyMode::All || proxyMode == ProxyMode::OnlyOutput) { - workflow.emplace_back(std::move(specifyFairMQDeviceMultiOutputProxy("dpl-sink", sinkInputs, channelConfig.c_str(), channelSelector))); - } + workflow.emplace_back(std::move(specifyFairMQDeviceMultiOutputProxy("dpl-sink", sinkInputs, channelConfig.c_str(), channelSelector))); ////////////////////////////////////////////////////////////////////////////////////////////////////////////// // a simple checker process subscribing to the output of the input proxy @@ -286,25 +113,8 @@ std::vector defineDataProcessing(ConfigContext const& config) // the compute callback of the checker auto counter = std::make_shared(0); auto checkerCallback = [counter](InputRecord& inputs, ControlService& control) { - auto const* dh = DataRefUtils::getHeader(inputs.get("datain")); - auto const* mmh = DataRefUtils::getHeader(inputs.get("datain")); - ASSERT_ERROR(dh != nullptr); - ASSERT_ERROR(mmh != nullptr); - LOGP(info, "{} input slots(s), data {}, parts {}, mode {}", inputs.size(), inputs.get("datain"), mmh->nPayloadParts, mmh->mode); - if (mmh->mode == test_header::MsgModeHeader::MsgMode::Pair) { - ASSERT_ERROR(dh->splitPayloadParts == mmh->nPayloadParts); - ASSERT_ERROR(dh->splitPayloadIndex == 0); - } else { - ASSERT_ERROR(dh->splitPayloadParts == mmh->nPayloadParts); - ASSERT_ERROR(dh->splitPayloadIndex == mmh->nPayloadParts); - } - size_t nPayloads = 0; - for (auto const& ref : InputRecordWalker(inputs)) { - auto data = inputs.get(ref); - ASSERT_ERROR(data == *counter); - ++nPayloads; - } - ASSERT_ERROR(nPayloads == mmh->nPayloadParts); + LOG(debug) << "got inputs " << inputs.size(); + ASSERT_ERROR(inputs.get("datain") == *counter); ++(*counter); }; auto checkCounter = [counter, nRolls](EndOfStreamContext&) { @@ -319,25 +129,10 @@ std::vector defineDataProcessing(ConfigContext const& config) }; // the checker process connects to the proxy - Inputs checkerInputs; - if (proxyMode != ProxyMode::All) { - checkerInputs.emplace_back(InputSpec{"datain", ConcreteDataTypeMatcher{"TST", "DATA"}, Lifetime::Timeframe}); - //for (unsigned int i = 0; i < pState->nChannels; i++) { - // checkerInputs.emplace_back(InputSpec{{"datain"}, "TST", "DATA", i, Lifetime::Timeframe}); - //} - } else { - checkerInputs.emplace_back(InputSpec{"datain", ConcreteDataTypeMatcher{"PRX", "DATA"}, Lifetime::Timeframe}); - //for (unsigned int i = 0; i < pState->nChannels; i++) { - // checkerInputs.emplace_back(InputSpec{{"datain"}, "PRX", "DATA", i, Lifetime::Timeframe}); - //} - } - if (proxyMode != ProxyMode::OnlyOutput) { - // the checker is not added if the input proxy is skipped - workflow.emplace_back(DataProcessorSpec{"checker", - std::move(checkerInputs), - {}, - AlgorithmSpec{adaptStateful(checkerInit)}}); - } + workflow.emplace_back(DataProcessorSpec{"checker", + {InputSpec{"datain", "PRX", "DATA", 0, Lifetime::Timeframe}}, + {}, + AlgorithmSpec{adaptStateful(checkerInit)}}); ////////////////////////////////////////////////////////////////////////////////////////////////////////////// // the input proxy process @@ -400,85 +195,19 @@ std::vector defineDataProcessing(ConfigContext const& config) channelConfig += ",transport=" + defaultTransportConfig; } - if (proxyMode == ProxyMode::All) { - // Note: in order to make the DPL output proxy and an input proxy working in the same - // workflow, we use different data description - Outputs inputProxyOutputs = {OutputSpec{ConcreteDataTypeMatcher{"PRX", "DATA"}, Lifetime::Timeframe}}; - workflow.emplace_back(specifyExternalFairMQDeviceProxy( - "input-proxy", - std::move(inputProxyOutputs), - channelConfig.c_str(), - converter)); - } else if (proxyMode == ProxyMode::SkipOutput) { - Outputs inputProxyOutputs = {OutputSpec{ConcreteDataTypeMatcher{"TST", "DATA"}, Lifetime::Timeframe}}; - // we use the same specs as filters in the dpl adaptor - auto filterSpecs = inputProxyOutputs; - workflow.emplace_back(specifyExternalFairMQDeviceProxy( - "input-proxy", - std::move(inputProxyOutputs), - channelConfig.c_str(), - o2::framework::dplModelAdaptor(filterSpecs, true))); - } + // Note: in order to make the DPL output proxy and an input proxy working in the same + // workflow, we use different data description + Outputs inputProxyOutputs = {OutputSpec{"PRX", "DATA", 0, Lifetime::Timeframe}}; + workflow.emplace_back(specifyExternalFairMQDeviceProxy( + "input-proxy", + std::move(inputProxyOutputs), + channelConfig.c_str(), + converter)); - return workflow; -} - -std::istream& operator>>(std::istream& in, enum test_config::ProxyMode& val) -{ - std::string token; - in >> token; - if (token == "all" || token == "a") { - val = test_config::ProxyMode::All; - } else if (token == "skip-output") { - val = test_config::ProxyMode::SkipOutput; - } else if (token == "only-output") { - val = test_config::ProxyMode::OnlyOutput; - } else if (token == "skip-all" || token == "skip-proxies") { - val = test_config::ProxyMode::NoProxies; - } else { - in.setstate(std::ios_base::failbit); - } - return in; -} - -std::ostream& operator<<(std::ostream& out, const enum test_config::ProxyMode& val) -{ - if (val == test_config::ProxyMode::All) { - out << "all"; - } else if (val == test_config::ProxyMode::SkipOutput) { - out << "skip-output"; - } else if (val == test_config::ProxyMode::OnlyOutput) { - out << "only-output"; - } else if (val == test_config::ProxyMode::NoProxies) { - out << "skip-all"; - } else { - out.setstate(std::ios_base::failbit); + if (config.options().get("output-proxy-only")) { + // remove the input proxy and checker from the workflow + workflow.pop_back(); + workflow.pop_back(); } - return out; -} - -std::istream& operator>>(std::istream& in, enum test_header::MsgModeHeader::MsgMode& val) -{ - std::string token; - in >> token; - if (token == "pair") { - val = test_header::MsgModeHeader::MsgMode::Pair; - } else if (token == "sequence") { - val = test_header::MsgModeHeader::MsgMode::Sequence; - } else { - in.setstate(std::ios_base::failbit); - } - return in; -} - -std::ostream& operator<<(std::ostream& out, const enum test_header::MsgModeHeader::MsgMode& val) -{ - if (val == test_header::MsgModeHeader::MsgMode::Pair) { - out << "pair"; - } else if (val == test_header::MsgModeHeader::MsgMode::Sequence) { - out << "sequence"; - } else { - out.setstate(std::ios_base::failbit); - } - return out; + return workflow; }