diff --git a/Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h b/Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h index 06c41c1c73373..65033743c96ea 100644 --- a/Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h +++ b/Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h @@ -56,12 +56,29 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, uint64_t startTime /// multipart ensemble. InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, uint64_t step); +/// @struct DPLModelAdapterConfig +/// Configuration object for dplModelAdaptor +struct DPLModelAdapterConfig { + /// throw runtime error if an input message is not matched by filter rules + bool throwOnUnmatchedInputs = true; + /// do all kinds of consistency checks + bool paranoid = false; + /// blindly forward on one channel + bool blindForward = false; +}; + /// This is to be used when the input data is already formatted like DPL /// expects it, i.e. with the DataProcessingHeader in the header stack /// The list of specs is used as a filter list, all incoming data matching an entry /// in the list will be send through the corresponding channel InjectorFunction dplModelAdaptor(std::vector const& specs = {{header::gDataOriginAny, header::gDataDescriptionAny}}, - bool throwOnUnmatchedInputs = true); + DPLModelAdapterConfig config = DPLModelAdapterConfig{}); + +/// legacy function +inline InjectorFunction dplModelAdaptor(std::vector const& specs, bool throwOnUnmatchedInputs) +{ + return dplModelAdaptor(specs, DPLModelAdapterConfig{throwOnUnmatchedInputs}); +} /// The default connection method for the custom source static auto gDefaultConverter = incrementalConverter(OutputSpec{"TST", "TEST", 0}, 0, 1); diff --git a/Framework/Core/src/ExternalFairMQDeviceProxy.cxx b/Framework/Core/src/ExternalFairMQDeviceProxy.cxx index 09a52dd9a73b7..771b273630bd4 100644 --- a/Framework/Core/src/ExternalFairMQDeviceProxy.cxx +++ b/Framework/Core/src/ExternalFairMQDeviceProxy.cxx @@ -184,9 +184,10 @@ InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, }; } -InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, bool throwOnUnmatchedInputs) +InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, DPLModelAdapterConfig config) { - // structure to hald information on the unmatch ed data and print a warning at cleanup + bool throwOnUnmatchedInputs = config.throwOnUnmatchedInputs; + // structure to hold information on the unmatched data and print a warning at cleanup class DroppedDataSpecs { public: @@ -209,7 +210,8 @@ InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, boo void warning() const { if (not descriptions.empty()) { - LOG(warning) << "Some input data are not matched by filter rules " << descriptions << "\n" + LOG(warning) << "Some input data could not be matched by filter rules to output specs\n" + << "Active rules: " << descriptions << "\n" << "DROPPING OF THESE MESSAGES HAS BEEN ENABLED BY CONFIGURATION"; } } @@ -221,11 +223,9 @@ InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, boo return [filterSpecs = std::move(filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared()](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) { std::unordered_map outputs; std::vector unmatchedDescriptions; - int lastSplitPartIndex = -1; - std::string channelNameForSplitParts; static int64_t dplCounter = -1; dplCounter++; - for (size_t msgidx = 0; msgidx < parts.Size() / 2; ++msgidx) { + for (size_t msgidx = 0; msgidx < parts.Size(); msgidx += 2) { const auto dh = o2::header::get(parts.At(msgidx * 2)->GetData()); if (!dh) { LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing"; @@ -241,49 +241,63 @@ InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, boo OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification}; LOG(debug) << "processing " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " time slice " << dph->startTime << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts; - bool indexDone = false; + size_t finalBlockIndex = 0; + std::string channelName = ""; + for (auto const& spec : filterSpecs) { // filter on the specified OutputSpecs, the default value is a ConcreteDataTypeMatcher with origin and description 'any' if (DataSpecUtils::match(spec, OutputSpec{{header::gDataOriginAny, header::gDataDescriptionAny}}) || DataSpecUtils::match(spec, query)) { - auto channelName = channelRetriever(query, dph->startTime); + channelName = channelRetriever(query, dph->startTime); if (channelName.empty()) { LOG(warning) << "can not find matching channel, not able to adopt " << DataSpecUtils::describe(query); - break; - } - // the checks for consistency of split payload parts are of informative nature - // forwarding happens independently - if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limitssplitPayloadParts)>::max()) { - if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) { - LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)"; - } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) { - LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex - << " of " << dh->splitPayloadParts; - } else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) { - LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex - << ", matching " << channelName << ", expecting " << channelNameForSplitParts; - } - lastSplitPartIndex = dh->splitPayloadIndex; - channelNameForSplitParts = channelName; - if (lastSplitPartIndex + 1 == dh->splitPayloadParts) { - lastSplitPartIndex = -1; - channelNameForSplitParts = ""; - } - } else if (lastSplitPartIndex != -1) { - LOG(warning) << "found incomplete or unordered split parts, expecting part " << lastSplitPartIndex + 1 - << " but got a new data block"; } - outputs[channelName].AddPart(std::move(parts.At(msgidx * 2))); - outputs[channelName].AddPart(std::move(parts.At(msgidx * 2 + 1))); - LOG(debug) << "associating part with index " << msgidx << " to channel " << channelName << " (" << outputs[channelName].Size() << ")"; - indexDone = true; break; } } - if (indexDone == false && !DataSpecUtils::match(query, "DPL", "EOS", 0)) { + if (!channelName.empty()) { + if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) { + // this is indicating a sequence of payloads following the header + // FIXME: we will probably also set the DataHeader version + finalBlockIndex = msgidx + dh->splitPayloadParts + 1; + } else { + // We can consider the next splitPayloadParts as one block of messages pairs + // because we are guaranteed they are all the same. + // If splitPayloadParts = 0, we assume that means there is only one (header, payload) + // pair. + finalBlockIndex = msgidx + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2; + } + assert(finalBlockIndex >= msgidx + 2); + if (finalBlockIndex > parts.Size()) { + // TODO error handling + //LOGP(error, "DataHeader::splitPayloadParts invalid"); + continue; + } + + // the checks for consistency of split payload parts are of informative nature + // forwarding happens independently + //if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limitssplitPayloadParts)>::max()) { + // if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) { + // LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)"; + // } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) { + // LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex + // << " of " << dh->splitPayloadParts; + // } else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) { + // LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex + // << ", matching " << channelName << ", expecting " << channelNameForSplitParts; + // } + //} + LOGP(debug, "associating {} part(s) at index {} to channel {} ({})", finalBlockIndex - msgidx, msgidx, channelName, outputs[channelName].Size()); + for (; msgidx < finalBlockIndex; ++msgidx) { + outputs[channelName].AddPart(std::move(parts.At(msgidx))); + } + msgidx -= 2; + } + if (finalBlockIndex == 0 && !DataSpecUtils::match(query, "DPL", "EOS", 0)) { unmatchedDescriptions.emplace_back(DataSpecUtils::describe(query)); } - } + } // end of loop over parts + for (auto& [channelName, channelParts] : outputs) { if (channelParts.Size() == 0) { continue; diff --git a/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx b/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx index f552f093add36..46f3c18377978 100644 --- a/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx +++ b/Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx @@ -15,14 +15,52 @@ using namespace o2::framework; #include "Framework/AlgorithmSpec.h" #include "Framework/DataProcessorSpec.h" #include "Framework/ChannelSpec.h" +#include "Framework/DeviceSpec.h" #include "Framework/DataSpecUtils.h" +#include "Framework/SourceInfoHeader.h" #include "Framework/ExternalFairMQDeviceProxy.h" #include "Framework/ControlService.h" #include "Framework/CallbackService.h" +#include "Framework/RawDeviceService.h" #include "Framework/Logger.h" +#include "Framework/InputRecordWalker.h" #include "Headers/DataHeader.h" #include "fairmq/FairMQDevice.h" +namespace test_config +{ +enum struct ProxyMode { + All, + SkipOutput, + OnlyOutput, // also excludes checker + NoProxies, +}; +} + +namespace test_header +{ +struct MsgModeHeader : public o2::header::BaseHeader { + enum struct MsgMode { + Pair, + Sequence, + }; + + static constexpr uint32_t sVersion = 1; + static constexpr o2::header::HeaderType sHeaderType = "MsgMode"; + MsgModeHeader(MsgMode _mode, size_t nParts) + : BaseHeader(sizeof(MsgModeHeader), sHeaderType, o2::header::gSerializationMethodNone, sVersion), mode(_mode), nPayloadParts(nParts) + { + } + + MsgMode mode; + size_t nPayloadParts; +}; +} // namespace test_header +std::istream& operator>>(std::istream& in, enum test_config::ProxyMode& val); +std::ostream& operator<<(std::ostream& out, const enum test_config::ProxyMode& val); +std::istream& operator>>(std::istream& in, enum test_header::MsgModeHeader::MsgMode val); +std::ostream& operator<<(std::ostream& out, const enum test_header::MsgModeHeader::MsgMode val); + // we need to add workflow options before including Framework/runDataProcessing void customize(std::vector& workflowOptions) { @@ -34,18 +72,37 @@ void customize(std::vector& workflowOptions) "number-of-events,n", VariantType::Int, 10, {"number of events to process"}}); workflowOptions.push_back( ConfigParamSpec{ - "output-proxy-only", VariantType::Bool, false, {"create only the workflow up to output proxy"}}); + "proxy-mode", VariantType::String, "skip-output", {"proxy mode: all, skip-output, only-output, skip-all"}}); } #include "Framework/runDataProcessing.h" +using namespace o2::framework; +using DataHeader = o2::header::DataHeader; +using Stack = o2::header::Stack; + #define ASSERT_ERROR(condition) \ if ((condition) == false) { \ LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \ } +template +T readConfig(ConfigContext const& config, const char* key) +{ + auto p = config.options().get(key); + std::stringstream cs(p); + T val; + cs >> val; + if (cs.fail()) { + throw std::runtime_error("invalid configuration parameter '" + p + "' for key " + key); + } + return val; +} + std::vector defineDataProcessing(ConfigContext const& config) { + using ProxyMode = test_config::ProxyMode; + auto proxyMode = readConfig(config, "proxy-mode"); std::string defaultTransportConfig = config.options().get("default-transport"); int nRolls = config.options().get("number-of-events"); if (defaultTransportConfig == "zeromq") { @@ -58,27 +115,9 @@ std::vector defineDataProcessing(ConfigContext const& config) std::vector workflow; ////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // a producer process steered by a timer + // configuration of the out-of-band proxy channel // - // the compute callback of the producer - auto producerCallback = [nRolls, counter = std::make_shared()](DataAllocator& outputs, ControlService& control) { - outputs.make(OutputRef{"data", 0}) = *counter; - if (++(*counter) >= nRolls) { - // send the end of stream signal, this is transferred by the proxies - // and allows to properly terminate downstream devices - control.endOfStream(); - } - }; - - workflow.emplace_back(DataProcessorSpec{"producer", - {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}}, - {OutputSpec{{"data"}, "TST", "DATA", 0, Lifetime::Timeframe}}, - AlgorithmSpec{adaptStateless(producerCallback)}, - {ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"period of timer"}}}}); - - ////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // the dpl sink proxy process - + // used either in the output proxy ('dpl-sink') or as a direct channel of the producer // use the OutputChannelSpec as a tool to create the default configuration for the out-of-band channel OutputChannelSpec externalChannelSpec; // Note: the name is hardcoded for now @@ -88,6 +127,9 @@ std::vector defineDataProcessing(ConfigContext const& config) externalChannelSpec.hostname = "localhost"; externalChannelSpec.port = 42042; externalChannelSpec.listeners = 0; + externalChannelSpec.rateLogging = 10; + externalChannelSpec.sendBufferSize = 1; + externalChannelSpec.recvBufferSize = 1; if (!defaultTransportConfig.empty()) { if (defaultTransportConfig == "zeromq") { externalChannelSpec.protocol = ChannelProtocol::Network; @@ -101,11 +143,142 @@ std::vector defineDataProcessing(ConfigContext const& config) channelConfig += ",transport=" + defaultTransportConfig; } + ////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // a producer process steered by a timer + // + auto producerInitCallback = [nRolls, proxyMode, externalChannelSpec](CallbackService& callbacks, RawDeviceService& rds) { + srand(getpid()); + auto channelName = std::make_shared(); + auto producerChannelInit = [channelName, outputRoutes = rds.spec().outputs]() { + // find the output channel name, we expect all output messages to be + // sent over the same channel + if (channelName->empty()) { + OutputSpec const query{"TST", "DATA", 0}; + for (auto& route : outputRoutes) { + if (DataSpecUtils::match(route.matcher, query)) { + *channelName = route.channel; + break; + } + } + } + ASSERT_ERROR(channelName->length() > 0); + }; + if (proxyMode == ProxyMode::SkipOutput) { + *channelName = externalChannelSpec.name; + } else { + callbacks.set(CallbackService::Id::Start, producerChannelInit); + } + // the compute callback of the producer + auto producerCallback = [nRolls, channelName, proxyMode, counter = std::make_shared()](DataAllocator& outputs, ControlService& control, RawDeviceService& rds) { + int data = *counter; + //outputs.make(OutputRef{"data", 0}) = data; + + FairMQDevice& device = *(rds.device()); + auto transport = device.GetChannel(*channelName, 0).Transport(); + auto channelAlloc = o2::pmr::getTransportAllocator(transport); + + DataProcessingHeader dph{*counter, 0}; + + auto msgMode = rand() % 2 ? test_header::MsgModeHeader::MsgMode::Pair : test_header::MsgModeHeader::MsgMode::Sequence; + size_t nPayloads = rand() % 10 + 1; + + test_header::MsgModeHeader mmh{msgMode, nPayloads}; + FairMQParts messages; + auto insertHeader = [&dph, &mmh, &channelAlloc, &messages](DataHeader const& dh) -> void { + FairMQMessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, dph, mmh}); + messages.AddPart(std::move(header)); + }; + auto insertPayload = [&transport, &messages, &data](size_t size) -> void { + FairMQMessagePtr payload = transport->CreateMessage(size); + memcpy(payload->GetData(), &data, sizeof(data)); + messages.AddPart(std::move(payload)); + }; + auto createSequence = [&insertHeader, &insertPayload, &data](size_t nPayloads, DataHeader dh) -> void { + // one header with index set to the number of split parts indicates sequence + // of payloads without additional headers + dh.payloadSize = sizeof(data); + dh.payloadSerializationMethod = o2::header::gSerializationMethodNone; + dh.splitPayloadIndex = nPayloads; + dh.splitPayloadParts = nPayloads; + insertHeader(dh); + + for (size_t i = 0; i < nPayloads; ++i) { + insertPayload(dh.payloadSize); + } + }; + + auto createPairs = [&insertHeader, &insertPayload, &data](size_t nPayloads, DataHeader dh) -> void { + // one header with index set to the number of split parts indicates sequence + // of payloads without additional headers + dh.payloadSize = sizeof(data); + dh.payloadSerializationMethod = o2::header::gSerializationMethodNone; + dh.splitPayloadIndex = 0; + dh.splitPayloadParts = nPayloads; + for (size_t i = 0; i < nPayloads; ++i) { + dh.splitPayloadIndex = i; + insertHeader(dh); + insertPayload(dh.payloadSize); + } + }; + + if (msgMode == test_header::MsgModeHeader::MsgMode::Pair) { + createPairs(nPayloads, DataHeader{"DATA", "TST", 0}); + } else { + createSequence(nPayloads, DataHeader{"DATA", "TST", 0}); + } + // using utility from ExternalFairMQDeviceProxy + o2::framework::sendOnChannel(device, messages, *channelName); + + if (++(*counter) >= nRolls) { + // send the end of stream signal, this is transferred by the proxies + // and allows to properly terminate downstream devices + control.endOfStream(); + if (proxyMode == ProxyMode::SkipOutput) { + // since we are sending on the bare channel, also the EOS message needs to be created. + DataHeader dhEOS; + dhEOS.dataOrigin = "DPL"; + dhEOS.dataDescription = "EOS"; + dhEOS.subSpecification = 0; + dhEOS.payloadSize = 0; + dhEOS.payloadSerializationMethod = o2::header::gSerializationMethodNone; + dhEOS.tfCounter = 0; + dhEOS.firstTForbit = 0; + SourceInfoHeader sih; + sih.state = InputChannelState::Completed; + auto headerMessage = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dhEOS, dph, sih}); + FairMQParts out; + out.AddPart(std::move(headerMessage)); + // add empty payload message + out.AddPart(std::move(device.NewMessageFor(*channelName, 0, 0))); + o2::framework::sendOnChannel(device, out, *channelName); + } + } + }; + return adaptStateless(producerCallback); + }; + + workflow.emplace_back(DataProcessorSpec{"producer", + {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}}, + {OutputSpec{{"data"}, "TST", "DATA", 0, Lifetime::Timeframe}}, + AlgorithmSpec{adaptStateful(producerInitCallback)}, + {ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"period of timer"}}}}); + + if (proxyMode == ProxyMode::SkipOutput) { + // create the out-of-band channel in the producer if the output proxy is bypassed + const char* d = strdup(channelConfig.c_str()); + workflow.back().options.push_back(ConfigParamSpec{"channel-config", VariantType::String, d, {"proxy channel of producer"}}); + } + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // the dpl sink proxy process + Inputs sinkInputs = {InputSpec{"external", "TST", "DATA", 0, Lifetime::Timeframe}}; auto channelSelector = [](InputSpec const&, const std::unordered_map>&) -> std::string { return "downstream"; }; - workflow.emplace_back(std::move(specifyFairMQDeviceMultiOutputProxy("dpl-sink", sinkInputs, channelConfig.c_str(), channelSelector))); + if (proxyMode == ProxyMode::All || proxyMode == ProxyMode::OnlyOutput) { + workflow.emplace_back(std::move(specifyFairMQDeviceMultiOutputProxy("dpl-sink", sinkInputs, channelConfig.c_str(), channelSelector))); + } ////////////////////////////////////////////////////////////////////////////////////////////////////////////// // a simple checker process subscribing to the output of the input proxy @@ -113,8 +286,25 @@ std::vector defineDataProcessing(ConfigContext const& config) // the compute callback of the checker auto counter = std::make_shared(0); auto checkerCallback = [counter](InputRecord& inputs, ControlService& control) { - LOG(debug) << "got inputs " << inputs.size(); - ASSERT_ERROR(inputs.get("datain") == *counter); + auto const* dh = DataRefUtils::getHeader(inputs.get("datain")); + auto const* mmh = DataRefUtils::getHeader(inputs.get("datain")); + ASSERT_ERROR(dh != nullptr); + ASSERT_ERROR(mmh != nullptr); + LOGP(info, "{} input slots(s), data {}, parts {}, mode {}", inputs.size(), inputs.get("datain"), mmh->nPayloadParts, mmh->mode); + if (mmh->mode == test_header::MsgModeHeader::MsgMode::Pair) { + ASSERT_ERROR(dh->splitPayloadParts == mmh->nPayloadParts); + ASSERT_ERROR(dh->splitPayloadIndex == 0); + } else { + ASSERT_ERROR(dh->splitPayloadParts == mmh->nPayloadParts); + ASSERT_ERROR(dh->splitPayloadIndex == mmh->nPayloadParts); + } + size_t nPayloads = 0; + for (auto const& ref : InputRecordWalker(inputs)) { + auto data = inputs.get(ref); + ASSERT_ERROR(data == *counter); + ++nPayloads; + } + ASSERT_ERROR(nPayloads == mmh->nPayloadParts); ++(*counter); }; auto checkCounter = [counter, nRolls](EndOfStreamContext&) { @@ -129,10 +319,25 @@ std::vector defineDataProcessing(ConfigContext const& config) }; // the checker process connects to the proxy - workflow.emplace_back(DataProcessorSpec{"checker", - {InputSpec{"datain", "PRX", "DATA", 0, Lifetime::Timeframe}}, - {}, - AlgorithmSpec{adaptStateful(checkerInit)}}); + Inputs checkerInputs; + if (proxyMode != ProxyMode::All) { + checkerInputs.emplace_back(InputSpec{"datain", ConcreteDataTypeMatcher{"TST", "DATA"}, Lifetime::Timeframe}); + //for (unsigned int i = 0; i < pState->nChannels; i++) { + // checkerInputs.emplace_back(InputSpec{{"datain"}, "TST", "DATA", i, Lifetime::Timeframe}); + //} + } else { + checkerInputs.emplace_back(InputSpec{"datain", ConcreteDataTypeMatcher{"PRX", "DATA"}, Lifetime::Timeframe}); + //for (unsigned int i = 0; i < pState->nChannels; i++) { + // checkerInputs.emplace_back(InputSpec{{"datain"}, "PRX", "DATA", i, Lifetime::Timeframe}); + //} + } + if (proxyMode != ProxyMode::OnlyOutput) { + // the checker is not added if the input proxy is skipped + workflow.emplace_back(DataProcessorSpec{"checker", + std::move(checkerInputs), + {}, + AlgorithmSpec{adaptStateful(checkerInit)}}); + } ////////////////////////////////////////////////////////////////////////////////////////////////////////////// // the input proxy process @@ -195,19 +400,85 @@ std::vector defineDataProcessing(ConfigContext const& config) channelConfig += ",transport=" + defaultTransportConfig; } - // Note: in order to make the DPL output proxy and an input proxy working in the same - // workflow, we use different data description - Outputs inputProxyOutputs = {OutputSpec{"PRX", "DATA", 0, Lifetime::Timeframe}}; - workflow.emplace_back(specifyExternalFairMQDeviceProxy( - "input-proxy", - std::move(inputProxyOutputs), - channelConfig.c_str(), - converter)); - - if (config.options().get("output-proxy-only")) { - // remove the input proxy and checker from the workflow - workflow.pop_back(); - workflow.pop_back(); + if (proxyMode == ProxyMode::All) { + // Note: in order to make the DPL output proxy and an input proxy working in the same + // workflow, we use different data description + Outputs inputProxyOutputs = {OutputSpec{ConcreteDataTypeMatcher{"PRX", "DATA"}, Lifetime::Timeframe}}; + workflow.emplace_back(specifyExternalFairMQDeviceProxy( + "input-proxy", + std::move(inputProxyOutputs), + channelConfig.c_str(), + converter)); + } else if (proxyMode == ProxyMode::SkipOutput) { + Outputs inputProxyOutputs = {OutputSpec{ConcreteDataTypeMatcher{"TST", "DATA"}, Lifetime::Timeframe}}; + // we use the same specs as filters in the dpl adaptor + auto filterSpecs = inputProxyOutputs; + workflow.emplace_back(specifyExternalFairMQDeviceProxy( + "input-proxy", + std::move(inputProxyOutputs), + channelConfig.c_str(), + o2::framework::dplModelAdaptor(filterSpecs, true))); } + return workflow; } + +std::istream& operator>>(std::istream& in, enum test_config::ProxyMode& val) +{ + std::string token; + in >> token; + if (token == "all" || token == "a") { + val = test_config::ProxyMode::All; + } else if (token == "skip-output") { + val = test_config::ProxyMode::SkipOutput; + } else if (token == "only-output") { + val = test_config::ProxyMode::OnlyOutput; + } else if (token == "skip-all" || token == "skip-proxies") { + val = test_config::ProxyMode::NoProxies; + } else { + in.setstate(std::ios_base::failbit); + } + return in; +} + +std::ostream& operator<<(std::ostream& out, const enum test_config::ProxyMode& val) +{ + if (val == test_config::ProxyMode::All) { + out << "all"; + } else if (val == test_config::ProxyMode::SkipOutput) { + out << "skip-output"; + } else if (val == test_config::ProxyMode::OnlyOutput) { + out << "only-output"; + } else if (val == test_config::ProxyMode::NoProxies) { + out << "skip-all"; + } else { + out.setstate(std::ios_base::failbit); + } + return out; +} + +std::istream& operator>>(std::istream& in, enum test_header::MsgModeHeader::MsgMode& val) +{ + std::string token; + in >> token; + if (token == "pair") { + val = test_header::MsgModeHeader::MsgMode::Pair; + } else if (token == "sequence") { + val = test_header::MsgModeHeader::MsgMode::Sequence; + } else { + in.setstate(std::ios_base::failbit); + } + return in; +} + +std::ostream& operator<<(std::ostream& out, const enum test_header::MsgModeHeader::MsgMode& val) +{ + if (val == test_header::MsgModeHeader::MsgMode::Pair) { + out << "pair"; + } else if (val == test_header::MsgModeHeader::MsgMode::Sequence) { + out << "sequence"; + } else { + out.setstate(std::ios_base::failbit); + } + return out; +}