diff --git a/Framework/Core/include/Framework/MessageSet.h b/Framework/Core/include/Framework/MessageSet.h index 7c28181d6d9bb..3f848280d1c84 100644 --- a/Framework/Core/include/Framework/MessageSet.h +++ b/Framework/Core/include/Framework/MessageSet.h @@ -38,12 +38,19 @@ struct MessageSet { }; // linear storage of messages std::vector messages; - // message map descibes O2 messages consisting of a header message and - // payload message(s), index descibes position in the linear storage + // message map describes O2 messages consisting of a header message and + // payload message(s), index describes position in the linear storage std::vector messageMap; - // pair map describes all messages in header-payload pairs and where - // in the message index the an associated header and payload can be found - std::vector> pairMap; + // pair map describes all messages in one sequence of header-payload pairs and + // where in the message index the associated header and payload can be found + struct PairMapping { + PairMapping(size_t partId, size_t payloadId) : partIndex(partId), payloadIndex(payloadId) {} + // O2 message where the pair is located in + size_t partIndex = 0; + // payload index within the O2 message + size_t payloadIndex = 0; + }; + std::vector pairMap; MessageSet() : messages(), messageMap(), pairMap() @@ -161,13 +168,13 @@ struct MessageSet { FairMQMessagePtr const& associatedHeader(size_t pos) const { - return messages[messageMap[pairMap[pos].first].position]; + return messages[messageMap[pairMap[pos].partIndex].position]; } FairMQMessagePtr const& associatedPayload(size_t pos) const { - auto partIndex = pairMap[pos].first; - auto payloadIndex = pairMap[pos].second; + auto partIndex = pairMap[pos].partIndex; + auto payloadIndex = pairMap[pos].payloadIndex; return messages[messageMap[partIndex].position + payloadIndex + 1]; } }; diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index faf5ee2c712b9..dc4860e58c366 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -947,7 +947,7 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne LOGP(error, "Header is not a DataHeader?"); continue; } - if (dh->payloadSize != parts.At(pi + 1)->GetSize()) { + if (dh->payloadSize > parts.At(pi + 1)->GetSize()) { insertInputInfo(pi, 0, InputType::Invalid); LOGP(error, "DataHeader payloadSize mismatch"); continue; @@ -960,7 +960,12 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne LOGP(error, "Header stack does not contain DataProcessingHeader"); continue; } - { + if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) { + // this is indicating a sequence of payloads following the header + // FIXME: we will probably also set the DataHeader version + insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data); + pi += dh->splitPayloadParts - 1; + } else { // We can set the type for the next splitPayloadParts // because we are guaranteed they are all the same. // If splitPayloadParts = 0, we assume that means there is only one (header, payload) @@ -1001,7 +1006,11 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne auto headerIndex = input.position; auto nMessages = 0; auto nPayloadsPerHeader = 0; - { + if (input.size > 2) { + // header and multiple payload sequence + nMessages = input.size; + nPayloadsPerHeader = nMessages - 1; + } else { // multiple header-payload pairs auto dh = o2::header::get(parts.At(headerIndex)->GetData()); nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2; @@ -1213,6 +1222,9 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context, auto cleanTimers = [¤tSetOfInputs](TimesliceSlot slot, InputRecord& record) { assert(record.size() == currentSetOfInputs.size()); for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) { + // assuming that for timer inputs we do have exactly one PartRef object + // in the MessageSet, multiple PartRef Objects are only possible for either + // split payload messages of wildcard matchers, both for data inputs DataRef input = record.getByPos(ii); if (input.spec->lifetime != Lifetime::Timer) { continue; @@ -1327,7 +1339,8 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context, } // We need to find the forward route only for the first // part of a split payload. All the others will use the same. - if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1) { + // but always check if we have a sequence of multiple payloads + if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 1) { cachedForwardingChoice = -1; for (size_t fi = 0; fi < spec->forwards.size(); fi++) { auto& forward = spec->forwards[fi]; diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 37cfd19feaa6d..74451893f698d 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -292,8 +292,7 @@ DataRelayer::RelayChoice cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING; // TODO: make sure that multiple parts can only be added within the same call of // DataRelayer::relay - assert(nPayloads == 1); - assert(nMessages % 2 == 0); + assert(nPayloads > 0); for (size_t mi = 0; mi < nMessages; ++mi) { assert(mi + nPayloads < nMessages); target.add([&messages, &mi](size_t i) -> FairMQMessagePtr& { return messages[mi + i]; }, nPayloads + 1); diff --git a/Framework/Core/test/test_DataRelayer.cxx b/Framework/Core/test/test_DataRelayer.cxx index 9792824cf525e..368720ce090d5 100644 --- a/Framework/Core/test/test_DataRelayer.cxx +++ b/Framework/Core/test/test_DataRelayer.cxx @@ -611,3 +611,123 @@ BOOST_AUTO_TEST_CASE(SplitParts) BOOST_CHECK_NE(header2.get(), nullptr); BOOST_CHECK_NE(payload2.get(), nullptr); } + +BOOST_AUTO_TEST_CASE(SplitPayloadPairs) +{ + Monitoring metrics; + InputSpec spec1{"clusters", "TPC", "CLUSTERS"}; + + std::vector inputs = { + InputRoute{spec1, 0, "Fake1", 0}, + }; + + std::vector forwards; + TimesliceIndex index{1}; + + auto policy = CompletionPolicyHelpers::consumeWhenAny(); + DataRelayer relayer(policy, inputs, metrics, index); + relayer.setPipelineLength(4); + + DataHeader dh{"CLUSTERS", "TPC", 0}; + + auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq"); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + size_t timeslice = 0; + + const int nSplitParts = 100; + std::vector> splitParts; + splitParts.reserve(2 * nSplitParts); + + for (size_t i = 0; i < nSplitParts; ++i) { + dh.splitPayloadIndex = i; + dh.splitPayloadParts = nSplitParts; + + FairMQMessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}}); + FairMQMessagePtr payload = transport->CreateMessage(100); + + splitParts.emplace_back(std::move(header)); + splitParts.emplace_back(std::move(payload)); + } + BOOST_REQUIRE_EQUAL(splitParts.size(), 2 * nSplitParts); + + relayer.relay(splitParts[0]->GetData(), splitParts.data(), splitParts.size()); + std::vector ready; + relayer.getReadyToProcess(ready); + BOOST_REQUIRE_EQUAL(ready.size(), 1); + BOOST_REQUIRE_EQUAL(ready[0].op, CompletionPolicy::CompletionOp::Consume); + auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot); + // we have one input route and thus one message set containing pairs for all + // payloads + BOOST_REQUIRE_EQUAL(messageSet.size(), 1); + BOOST_CHECK_EQUAL(messageSet[0].size(), nSplitParts); + BOOST_CHECK_EQUAL(messageSet[0].getNumberOfPayloads(0), 1); +} + +BOOST_AUTO_TEST_CASE(SplitPayloadSequence) +{ + Monitoring metrics; + InputSpec spec1{"clusters", "TST", "COUNTER"}; + + std::vector inputs = { + InputRoute{spec1, 0, "Fake1", 0}, + }; + + std::vector forwards; + TimesliceIndex index{1}; + + auto policy = CompletionPolicyHelpers::consumeWhenAny(); + DataRelayer relayer(policy, inputs, metrics, index); + relayer.setPipelineLength(4); + + auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq"); + size_t timeslice = 0; + + std::vector sequenceSize; + size_t nTotalPayloads = 0; + + auto createSequence = [&nTotalPayloads, ×lice, &sequenceSize, &transport, &relayer](size_t nPayloads) -> void { + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + std::vector> messages; + messages.reserve(nPayloads + 1); + DataHeader dh{"COUNTER", "TST", 0}; + + // one header with index set to the number of split parts indicates sequence + // of payloads without additional headers + dh.splitPayloadIndex = nPayloads; + dh.splitPayloadParts = nPayloads; + FairMQMessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}}); + messages.emplace_back(std::move(header)); + + for (size_t i = 0; i < nPayloads; ++i) { + messages.emplace_back(transport->CreateMessage(100)); + *(reinterpret_cast(messages.back()->GetData())) = nTotalPayloads; + ++nTotalPayloads; + } + BOOST_CHECK_EQUAL(messages.size(), nPayloads + 1); + relayer.relay(messages[0]->GetData(), messages.data(), messages.size(), nPayloads); + sequenceSize.emplace_back(nPayloads); + }; + createSequence(100); + createSequence(1); + createSequence(42); + + std::vector ready; + relayer.getReadyToProcess(ready); + BOOST_REQUIRE_EQUAL(ready.size(), 1); + BOOST_REQUIRE_EQUAL(ready[0].op, CompletionPolicy::CompletionOp::Consume); + auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot); + // we have one input route + BOOST_REQUIRE_EQUAL(messageSet.size(), 1); + // one message set containing number of added sequences of messages + BOOST_REQUIRE_EQUAL(messageSet[0].size(), sequenceSize.size()); + size_t counter = 0; + for (auto seqid = 0; seqid < sequenceSize.size(); ++seqid) { + BOOST_CHECK_EQUAL(messageSet[0].getNumberOfPayloads(seqid), sequenceSize[seqid]); + for (auto pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) { + BOOST_REQUIRE(messageSet[0].payload(seqid, pi)); + auto const* data = messageSet[0].payload(seqid, pi)->GetData(); + BOOST_CHECK_EQUAL(*(reinterpret_cast(data)), counter); + ++counter; + } + } +} diff --git a/Framework/Core/test/test_VariablePayloadSequenceWorkflow.cxx b/Framework/Core/test/test_VariablePayloadSequenceWorkflow.cxx index aea9413ca298f..8190336ff3d8d 100644 --- a/Framework/Core/test/test_VariablePayloadSequenceWorkflow.cxx +++ b/Framework/Core/test/test_VariablePayloadSequenceWorkflow.cxx @@ -107,7 +107,7 @@ std::vector defineDataProcessing(ConfigContext const& config) outputs.make(OutputRef{"allocator", 0}) = counter; if (channelName.empty()) { - OutputSpec const query{"TST", "PAIR", 0}; + OutputSpec const query{"TST", "SEQUENCE", 0}; auto outputRoutes = rds.spec().outputs; for (auto& route : outputRoutes) { if (DataSpecUtils::match(route.matcher, query)) { @@ -161,7 +161,7 @@ std::vector defineDataProcessing(ConfigContext const& config) } }; - //createSequence(attributes->distrib(attributes->gen), DataHeader{"SEQUENCE", "TST", 0}); + createSequence(attributes->distrib(attributes->gen), DataHeader{"SEQUENCE", "TST", 0}); createPairs(counter + 1, DataHeader{"PAIR", "TST", 0}); // using utility from ExternalFairMQDeviceProxy @@ -178,6 +178,7 @@ std::vector defineDataProcessing(ConfigContext const& config) workflow.emplace_back(DataProcessorSpec{"producer", {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}}, {OutputSpec{{"pair"}, "TST", "PAIR", 0, Lifetime::Timeframe}, + OutputSpec{{"sequence"}, "TST", "SEQUENCE", 0, Lifetime::Timeframe}, OutputSpec{{"allocator"}, "TST", "ALLOCATOR", 0, Lifetime::Timeframe}}, AlgorithmSpec{adaptStateless(producerCallback)}, {ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"period of timer"}}}}); @@ -267,6 +268,7 @@ std::vector defineDataProcessing(ConfigContext const& config) workflow.emplace_back(DataProcessorSpec{"consumer", {InputSpec{"pairin", "TST", "PAIR", 0, Lifetime::Timeframe}, + InputSpec{"sequencein", "TST", "SEQUENCE", 0, Lifetime::Timeframe}, InputSpec{"dpldefault", "TST", "ALLOCATOR", 0, Lifetime::Timeframe}}, {}, AlgorithmSpec{adaptStateful(consumerInit)}}); @@ -276,6 +278,7 @@ std::vector defineDataProcessing(ConfigContext const& config) // workflow.emplace_back(DataProcessorSpec{"spectator", {InputSpec{"pairin", "TST", "PAIR", 0, Lifetime::Timeframe}, + InputSpec{"sequencein", "TST", "SEQUENCE", 0, Lifetime::Timeframe}, InputSpec{"dpldefault", "TST", "ALLOCATOR", 0, Lifetime::Timeframe}}, {}, AlgorithmSpec{adaptStateful(consumerInit)}});