From f70f87e0e94d59ee019bb42ab5a43e7111bd987b Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Tue, 14 Sep 2021 12:39:08 +0200 Subject: [PATCH 1/2] DPL: Implementing support for multiple payloads per header This is supporting an upcoming extension of the O2 data model to allow not only header-payload pairs but also sequences of payloads preceded by a single header message. Such a sequence can be indicated by setting splitPayloadIndex == splitpayloadParts with splitpayloadParts > 1. A change in the version of DataHeader will be discussed, although there is no change in the members of the header. There is no change on the InputRecord API, only the payload size of the current payload is provided in the DataRef object. It needs to be discussed, which payload size is going to be provided in the data header in case of a sequence with multiple payloads. Changes alongside (most of them meanwhile moved to separate commits): - make MessageSet the owner of FairMQMessage objects in a linear vector, enhances performance also for split payload sequences in header-paiload format - generalizing DataRelayer to relay array of messages - storing the payload message size in the DataRef Extending and cleaning up DataRelayer test and benchmark - allocating header stack through transport allocator and avoiding copy - adding test for split parts in the header-payload scheme - adding test for split parts in payloads sequence scheme During the development a performance regression has been observed when using o2::pmr::getMessage, needs to be investigated. --- Framework/Core/include/Framework/MessageSet.h | 23 ++-- Framework/Core/src/DataProcessingDevice.cxx | 21 ++- Framework/Core/src/DataRelayer.cxx | 3 +- Framework/Core/test/test_DataRelayer.cxx | 120 ++++++++++++++++++ .../test_VariablePayloadSequenceWorkflow.cxx | 11 +- 5 files changed, 160 insertions(+), 18 deletions(-) diff --git a/Framework/Core/include/Framework/MessageSet.h b/Framework/Core/include/Framework/MessageSet.h index 7c28181d6d9bb..3f848280d1c84 100644 --- a/Framework/Core/include/Framework/MessageSet.h +++ b/Framework/Core/include/Framework/MessageSet.h @@ -38,12 +38,19 @@ struct MessageSet { }; // linear storage of messages std::vector messages; - // message map descibes O2 messages consisting of a header message and - // payload message(s), index descibes position in the linear storage + // message map describes O2 messages consisting of a header message and + // payload message(s), index describes position in the linear storage std::vector messageMap; - // pair map describes all messages in header-payload pairs and where - // in the message index the an associated header and payload can be found - std::vector> pairMap; + // pair map describes all messages in one sequence of header-payload pairs and + // where in the message index the associated header and payload can be found + struct PairMapping { + PairMapping(size_t partId, size_t payloadId) : partIndex(partId), payloadIndex(payloadId) {} + // O2 message where the pair is located in + size_t partIndex = 0; + // payload index within the O2 message + size_t payloadIndex = 0; + }; + std::vector pairMap; MessageSet() : messages(), messageMap(), pairMap() @@ -161,13 +168,13 @@ struct MessageSet { FairMQMessagePtr const& associatedHeader(size_t pos) const { - return messages[messageMap[pairMap[pos].first].position]; + return messages[messageMap[pairMap[pos].partIndex].position]; } FairMQMessagePtr const& associatedPayload(size_t pos) const { - auto partIndex = pairMap[pos].first; - auto payloadIndex = pairMap[pos].second; + auto partIndex = pairMap[pos].partIndex; + auto payloadIndex = pairMap[pos].payloadIndex; return messages[messageMap[partIndex].position + payloadIndex + 1]; } }; diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index faf5ee2c712b9..dc4860e58c366 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -947,7 +947,7 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne LOGP(error, "Header is not a DataHeader?"); continue; } - if (dh->payloadSize != parts.At(pi + 1)->GetSize()) { + if (dh->payloadSize > parts.At(pi + 1)->GetSize()) { insertInputInfo(pi, 0, InputType::Invalid); LOGP(error, "DataHeader payloadSize mismatch"); continue; @@ -960,7 +960,12 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne LOGP(error, "Header stack does not contain DataProcessingHeader"); continue; } - { + if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) { + // this is indicating a sequence of payloads following the header + // FIXME: we will probably also set the DataHeader version + insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data); + pi += dh->splitPayloadParts - 1; + } else { // We can set the type for the next splitPayloadParts // because we are guaranteed they are all the same. // If splitPayloadParts = 0, we assume that means there is only one (header, payload) @@ -1001,7 +1006,11 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne auto headerIndex = input.position; auto nMessages = 0; auto nPayloadsPerHeader = 0; - { + if (input.size > 2) { + // header and multiple payload sequence + nMessages = input.size; + nPayloadsPerHeader = nMessages - 1; + } else { // multiple header-payload pairs auto dh = o2::header::get(parts.At(headerIndex)->GetData()); nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2; @@ -1213,6 +1222,9 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context, auto cleanTimers = [¤tSetOfInputs](TimesliceSlot slot, InputRecord& record) { assert(record.size() == currentSetOfInputs.size()); for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) { + // assuming that for timer inputs we do have exactly one PartRef object + // in the MessageSet, multiple PartRef Objects are only possible for either + // split payload messages of wildcard matchers, both for data inputs DataRef input = record.getByPos(ii); if (input.spec->lifetime != Lifetime::Timer) { continue; @@ -1327,7 +1339,8 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context, } // We need to find the forward route only for the first // part of a split payload. All the others will use the same. - if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1) { + // but always check if we have a sequence of multiple payloads + if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 1) { cachedForwardingChoice = -1; for (size_t fi = 0; fi < spec->forwards.size(); fi++) { auto& forward = spec->forwards[fi]; diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 37cfd19feaa6d..74451893f698d 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -292,8 +292,7 @@ DataRelayer::RelayChoice cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING; // TODO: make sure that multiple parts can only be added within the same call of // DataRelayer::relay - assert(nPayloads == 1); - assert(nMessages % 2 == 0); + assert(nPayloads > 0); for (size_t mi = 0; mi < nMessages; ++mi) { assert(mi + nPayloads < nMessages); target.add([&messages, &mi](size_t i) -> FairMQMessagePtr& { return messages[mi + i]; }, nPayloads + 1); diff --git a/Framework/Core/test/test_DataRelayer.cxx b/Framework/Core/test/test_DataRelayer.cxx index 9792824cf525e..368720ce090d5 100644 --- a/Framework/Core/test/test_DataRelayer.cxx +++ b/Framework/Core/test/test_DataRelayer.cxx @@ -611,3 +611,123 @@ BOOST_AUTO_TEST_CASE(SplitParts) BOOST_CHECK_NE(header2.get(), nullptr); BOOST_CHECK_NE(payload2.get(), nullptr); } + +BOOST_AUTO_TEST_CASE(SplitPayloadPairs) +{ + Monitoring metrics; + InputSpec spec1{"clusters", "TPC", "CLUSTERS"}; + + std::vector inputs = { + InputRoute{spec1, 0, "Fake1", 0}, + }; + + std::vector forwards; + TimesliceIndex index{1}; + + auto policy = CompletionPolicyHelpers::consumeWhenAny(); + DataRelayer relayer(policy, inputs, metrics, index); + relayer.setPipelineLength(4); + + DataHeader dh{"CLUSTERS", "TPC", 0}; + + auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq"); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + size_t timeslice = 0; + + const int nSplitParts = 100; + std::vector> splitParts; + splitParts.reserve(2 * nSplitParts); + + for (size_t i = 0; i < nSplitParts; ++i) { + dh.splitPayloadIndex = i; + dh.splitPayloadParts = nSplitParts; + + FairMQMessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}}); + FairMQMessagePtr payload = transport->CreateMessage(100); + + splitParts.emplace_back(std::move(header)); + splitParts.emplace_back(std::move(payload)); + } + BOOST_REQUIRE_EQUAL(splitParts.size(), 2 * nSplitParts); + + relayer.relay(splitParts[0]->GetData(), splitParts.data(), splitParts.size()); + std::vector ready; + relayer.getReadyToProcess(ready); + BOOST_REQUIRE_EQUAL(ready.size(), 1); + BOOST_REQUIRE_EQUAL(ready[0].op, CompletionPolicy::CompletionOp::Consume); + auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot); + // we have one input route and thus one message set containing pairs for all + // payloads + BOOST_REQUIRE_EQUAL(messageSet.size(), 1); + BOOST_CHECK_EQUAL(messageSet[0].size(), nSplitParts); + BOOST_CHECK_EQUAL(messageSet[0].getNumberOfPayloads(0), 1); +} + +BOOST_AUTO_TEST_CASE(SplitPayloadSequence) +{ + Monitoring metrics; + InputSpec spec1{"clusters", "TST", "COUNTER"}; + + std::vector inputs = { + InputRoute{spec1, 0, "Fake1", 0}, + }; + + std::vector forwards; + TimesliceIndex index{1}; + + auto policy = CompletionPolicyHelpers::consumeWhenAny(); + DataRelayer relayer(policy, inputs, metrics, index); + relayer.setPipelineLength(4); + + auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq"); + size_t timeslice = 0; + + std::vector sequenceSize; + size_t nTotalPayloads = 0; + + auto createSequence = [&nTotalPayloads, ×lice, &sequenceSize, &transport, &relayer](size_t nPayloads) -> void { + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + std::vector> messages; + messages.reserve(nPayloads + 1); + DataHeader dh{"COUNTER", "TST", 0}; + + // one header with index set to the number of split parts indicates sequence + // of payloads without additional headers + dh.splitPayloadIndex = nPayloads; + dh.splitPayloadParts = nPayloads; + FairMQMessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}}); + messages.emplace_back(std::move(header)); + + for (size_t i = 0; i < nPayloads; ++i) { + messages.emplace_back(transport->CreateMessage(100)); + *(reinterpret_cast(messages.back()->GetData())) = nTotalPayloads; + ++nTotalPayloads; + } + BOOST_CHECK_EQUAL(messages.size(), nPayloads + 1); + relayer.relay(messages[0]->GetData(), messages.data(), messages.size(), nPayloads); + sequenceSize.emplace_back(nPayloads); + }; + createSequence(100); + createSequence(1); + createSequence(42); + + std::vector ready; + relayer.getReadyToProcess(ready); + BOOST_REQUIRE_EQUAL(ready.size(), 1); + BOOST_REQUIRE_EQUAL(ready[0].op, CompletionPolicy::CompletionOp::Consume); + auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot); + // we have one input route + BOOST_REQUIRE_EQUAL(messageSet.size(), 1); + // one message set containing number of added sequences of messages + BOOST_REQUIRE_EQUAL(messageSet[0].size(), sequenceSize.size()); + size_t counter = 0; + for (auto seqid = 0; seqid < sequenceSize.size(); ++seqid) { + BOOST_CHECK_EQUAL(messageSet[0].getNumberOfPayloads(seqid), sequenceSize[seqid]); + for (auto pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) { + BOOST_REQUIRE(messageSet[0].payload(seqid, pi)); + auto const* data = messageSet[0].payload(seqid, pi)->GetData(); + BOOST_CHECK_EQUAL(*(reinterpret_cast(data)), counter); + ++counter; + } + } +} diff --git a/Framework/Core/test/test_VariablePayloadSequenceWorkflow.cxx b/Framework/Core/test/test_VariablePayloadSequenceWorkflow.cxx index aea9413ca298f..fccde45d07b9f 100644 --- a/Framework/Core/test/test_VariablePayloadSequenceWorkflow.cxx +++ b/Framework/Core/test/test_VariablePayloadSequenceWorkflow.cxx @@ -107,7 +107,7 @@ std::vector defineDataProcessing(ConfigContext const& config) outputs.make(OutputRef{"allocator", 0}) = counter; if (channelName.empty()) { - OutputSpec const query{"TST", "PAIR", 0}; + OutputSpec const query{"TST", "SEQUENCE", 0}; auto outputRoutes = rds.spec().outputs; for (auto& route : outputRoutes) { if (DataSpecUtils::match(route.matcher, query)) { @@ -161,7 +161,7 @@ std::vector defineDataProcessing(ConfigContext const& config) } }; - //createSequence(attributes->distrib(attributes->gen), DataHeader{"SEQUENCE", "TST", 0}); + createSequence(attributes->distrib(attributes->gen), DataHeader{"SEQUENCE", "TST", 0}); createPairs(counter + 1, DataHeader{"PAIR", "TST", 0}); // using utility from ExternalFairMQDeviceProxy @@ -178,6 +178,7 @@ std::vector defineDataProcessing(ConfigContext const& config) workflow.emplace_back(DataProcessorSpec{"producer", {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}}, {OutputSpec{{"pair"}, "TST", "PAIR", 0, Lifetime::Timeframe}, + OutputSpec{{"sequence"}, "TST", "SEQUENCE", 0, Lifetime::Timeframe}, OutputSpec{{"allocator"}, "TST", "ALLOCATOR", 0, Lifetime::Timeframe}}, AlgorithmSpec{adaptStateless(producerCallback)}, {ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"period of timer"}}}}); @@ -223,7 +224,7 @@ std::vector defineDataProcessing(ConfigContext const& config) } }; - auto createCounters = [](RawDeviceService& rds) -> std::shared_ptr { + auto createCounters = [](RawDeviceService const& rds) -> std::shared_ptr { auto counters = std::make_shared(); ConsumerCounters& c = *counters; for (auto const& channelSpec : rds.spec().inputChannels) { @@ -248,7 +249,7 @@ std::vector defineDataProcessing(ConfigContext const& config) ////////////////////////////////////////////////////////////////////////////////////////////////////////////// // the consumer process connects to the producer // - auto consumerInit = [createCounters, checkCounters, inputChecker](RawDeviceService& rds, CallbackService& callbacks) { + auto consumerInit = [createCounters, checkCounters, inputChecker](RawDeviceService const& rds, CallbackService& callbacks) { auto counters = createCounters(rds); callbacks.set(CallbackService::Id::Stop, [counters, checkCounters]() { ASSERT_ERROR(checkCounters(counters)); @@ -267,6 +268,7 @@ std::vector defineDataProcessing(ConfigContext const& config) workflow.emplace_back(DataProcessorSpec{"consumer", {InputSpec{"pairin", "TST", "PAIR", 0, Lifetime::Timeframe}, + InputSpec{"sequencein", "TST", "SEQUENCE", 0, Lifetime::Timeframe}, InputSpec{"dpldefault", "TST", "ALLOCATOR", 0, Lifetime::Timeframe}}, {}, AlgorithmSpec{adaptStateful(consumerInit)}}); @@ -276,6 +278,7 @@ std::vector defineDataProcessing(ConfigContext const& config) // workflow.emplace_back(DataProcessorSpec{"spectator", {InputSpec{"pairin", "TST", "PAIR", 0, Lifetime::Timeframe}, + InputSpec{"sequencein", "TST", "SEQUENCE", 0, Lifetime::Timeframe}, InputSpec{"dpldefault", "TST", "ALLOCATOR", 0, Lifetime::Timeframe}}, {}, AlgorithmSpec{adaptStateful(consumerInit)}}); From 97ea08744c082f459ea1158004b8eae75eb9cbcb Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Fri, 22 Oct 2021 11:50:48 +0200 Subject: [PATCH 2/2] Appearently, DPL callback adapters do noy yet support const parameters Using e.g. RawDeviceService const& in the init callback leads to an exception which is difficult to understand and it comes from the adaptStateful converters (_ZNK2o29framework15ServiceRegistry3getEjmNS0_11ServiceKindEPKc+0x2b9)[0x56376fa3f8b1] --- Framework/Core/test/test_VariablePayloadSequenceWorkflow.cxx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Framework/Core/test/test_VariablePayloadSequenceWorkflow.cxx b/Framework/Core/test/test_VariablePayloadSequenceWorkflow.cxx index fccde45d07b9f..8190336ff3d8d 100644 --- a/Framework/Core/test/test_VariablePayloadSequenceWorkflow.cxx +++ b/Framework/Core/test/test_VariablePayloadSequenceWorkflow.cxx @@ -224,7 +224,7 @@ std::vector defineDataProcessing(ConfigContext const& config) } }; - auto createCounters = [](RawDeviceService const& rds) -> std::shared_ptr { + auto createCounters = [](RawDeviceService& rds) -> std::shared_ptr { auto counters = std::make_shared(); ConsumerCounters& c = *counters; for (auto const& channelSpec : rds.spec().inputChannels) { @@ -249,7 +249,7 @@ std::vector defineDataProcessing(ConfigContext const& config) ////////////////////////////////////////////////////////////////////////////////////////////////////////////// // the consumer process connects to the producer // - auto consumerInit = [createCounters, checkCounters, inputChecker](RawDeviceService const& rds, CallbackService& callbacks) { + auto consumerInit = [createCounters, checkCounters, inputChecker](RawDeviceService& rds, CallbackService& callbacks) { auto counters = createCounters(rds); callbacks.set(CallbackService::Id::Stop, [counters, checkCounters]() { ASSERT_ERROR(checkCounters(counters));