From fd7be769dc9b05703c3a635edd9beaf403191c55 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Tue, 2 Nov 2021 11:00:01 +0100 Subject: [PATCH] DPL: adjusting DataRelayer interface DataRelayer is using new version of MessageSet API which will allow to switch message store in MessageSet transparently --- .../Core/include/Framework/DataRelayer.h | 24 ++- Framework/Core/src/DataProcessingDevice.cxx | 50 +++--- Framework/Core/src/DataRelayer.cxx | 93 +++++------ Framework/Core/test/benchmark_DataRelayer.cxx | 45 ++++-- Framework/Core/test/test_DataRelayer.cxx | 152 ++++++++++-------- 5 files changed, 198 insertions(+), 166 deletions(-) diff --git a/Framework/Core/include/Framework/DataRelayer.h b/Framework/Core/include/Framework/DataRelayer.h index e318f5d1354da..05365e7bd961d 100644 --- a/Framework/Core/include/Framework/DataRelayer.h +++ b/Framework/Core/include/Framework/DataRelayer.h @@ -88,20 +88,18 @@ class DataRelayer /// This is to relay a whole set of FairMQMessages, all which are part /// of the same set of split parts. - /// @a firstHeader is the first message of such set - /// @a restOfParts is a pointer to the rest of the messages - /// @a restSize is how many messages are there in restOfParts - /// is the header which is common across all subsequent elements. + /// @a rawHeader raw header pointer + /// @a messages pointer to array of messages + /// @a nMessages size of the array + /// @a nPayloads number of payploads in the message sequence, default is 1 + /// which is the standard header-payload message pair, in this + /// case nMessages / 2 pairs will be inserted and considered + /// separate parts /// Notice that we expect that the header is an O2 Header Stack - RelayChoice relay(std::unique_ptr& firstHeader, - std::unique_ptr* restOfParts, - size_t restSize); - - /// This is used to ask for relaying a given (header,payload) pair. - /// Notice that we expect that the header is an O2 Header Stack - /// with a DataProcessingHeader inside so that we can assess time. - RelayChoice relay(std::unique_ptr& header, - std::unique_ptr& payload); + RelayChoice relay(void const* rawHeader, + std::unique_ptr* messages, + size_t nMessages, + size_t nPayloads = 1); /// @returns the actions ready to be performed. void getReadyToProcess(std::vector& completed); diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 5a7fd5723b05a..e896f3554b693 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -987,12 +987,19 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne switch (input.type) { case InputType::Data: { auto headerIndex = input.position; - auto payloadIndex = headerIndex + 1; - assert(payloadIndex < parts.Size()); - auto dh = o2::header::get(parts.At(headerIndex)->GetData()); - auto relayed = relayer.relay(parts.At(headerIndex), - &parts.At(payloadIndex), dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 0); - ii += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0; + auto nMessages = 0; + auto nPayloadsPerHeader = 0; + { + // multiple header-payload pairs + auto dh = o2::header::get(parts.At(headerIndex)->GetData()); + nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2; + nPayloadsPerHeader = 1; + ii += (nMessages / 2) - 1; + } + auto relayed = relayer.relay(parts.At(headerIndex)->GetData(), + &parts.At(headerIndex), + nMessages, + nPayloadsPerHeader); switch (relayed) { case DataRelayer::Backpressured: if (info.normalOpsNotified == true && info.backpressureNotified == false) { @@ -1142,8 +1149,8 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context, auto getter = [¤tSetOfInputs](size_t i, size_t partindex) -> DataRef { if (currentSetOfInputs[i].size() > partindex) { return DataRef{nullptr, - static_cast(currentSetOfInputs[i].at(partindex).header->GetData()), - static_cast(currentSetOfInputs[i].at(partindex).payload->GetData())}; + static_cast(currentSetOfInputs[i].header(partindex)->GetData()), + static_cast(currentSetOfInputs[i].payload(partindex)->GetData())}; } return DataRef{nullptr, nullptr, nullptr}; }; @@ -1261,9 +1268,9 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context, int cachedForwardingChoice = -1; for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) { - auto& part = currentSetOfInputs[ii][pi]; - auto& header = part.header; - auto& payload = part.payload; + auto& messageSet = currentSetOfInputs[ii]; + auto const& header = messageSet.header(pi); + if (header.get() == nullptr) { // FIXME: this should not happen, however it's actually harmless and // we can simply discard it for the moment. @@ -1271,12 +1278,12 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context, continue; } - auto fdph = o2::header::get(header.get()->GetData()); + auto fdph = o2::header::get(header->GetData()); if (fdph == nullptr) { LOG(ERROR) << "Data is missing DataProcessingHeader"; continue; } - auto fdh = o2::header::get(header.get()->GetData()); + auto fdh = o2::header::get(header->GetData()); if (fdh == nullptr) { LOG(ERROR) << "Data is missing DataHeader"; continue; @@ -1303,15 +1310,19 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context, if (copy) { auto&& newHeader = header->GetTransport()->CreateMessage(); - auto&& newPayload = header->GetTransport()->CreateMessage(); newHeader->Copy(*header); - newPayload->Copy(*payload); - forwardedParts[cachedForwardingChoice].AddPart(std::move(newHeader)); - forwardedParts[cachedForwardingChoice].AddPart(std::move(newPayload)); + + for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) { + auto&& newPayload = header->GetTransport()->CreateMessage(); + newPayload->Copy(*messageSet.payload(pi, payloadIndex)); + forwardedParts[cachedForwardingChoice].AddPart(std::move(newPayload)); + } } else { - forwardedParts[cachedForwardingChoice].AddPart(std::move(header)); - forwardedParts[cachedForwardingChoice].AddPart(std::move(payload)); + forwardedParts[cachedForwardingChoice].AddPart(std::move(messageSet.header(pi))); + for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) { + forwardedParts[cachedForwardingChoice].AddPart(std::move(messageSet.payload(pi, payloadIndex))); + } } } } @@ -1319,7 +1330,6 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context, if (forwardedParts[fi].Size() == 0) { continue; } - assert(forwardedParts[fi].Size() % 2 == 0); // in DPL we are using subchannel 0 only device->Send(forwardedParts[fi], spec->forwards[fi].channel, 0); } diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 1e1f4b45683d3..086411dd5ff0c 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -126,10 +126,10 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector 0 && part[0].header != nullptr) { + if (part.size() > 0 && part.header(0) != nullptr) { continue; } - if (part.size() > 0 && part[0].payload != nullptr) { + if (part.size() > 0 && part.payload(0) != nullptr) { continue; } // We check that the cell can actually be expired. @@ -145,16 +145,14 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector const& matchers, std::vector const& index, VariableContext& context) @@ -203,19 +201,13 @@ void sendVariableContextMetrics(VariableContext& context, TimesliceSlot slot, } DataRelayer::RelayChoice - DataRelayer::relay(std::unique_ptr& header, - std::unique_ptr& payload) -{ - return relay(header, &payload, 1); -} - -DataRelayer::RelayChoice - DataRelayer::relay(std::unique_ptr& firstPart, - std::unique_ptr* restOfParts, - size_t restOfPartsSize) + DataRelayer::relay(void const* rawHeader, + std::unique_ptr* messages, + size_t nMessages, + size_t nPayloads) { std::scoped_lock lock(mMutex); - DataProcessingHeader const* dph = o2::header::get(firstPart->GetData()); + DataProcessingHeader const* dph = o2::header::get(rawHeader); // STATE HOLDING VARIABLES // This is the class level state of the relaying. If we start supporting // multithreading this will have to be made thread safe before we can invoke @@ -238,12 +230,12 @@ DataRelayer::RelayChoice // become more complicated when we will start supporting ranges. auto getInputTimeslice = [&matchers = mInputMatchers, &distinctRoutes = mDistinctRoutesIndex, - &firstPart, + &rawHeader, &index](VariableContext& context) -> std::tuple { /// FIXME: for the moment we only use the first context and reset /// between one invokation and the other. - auto input = matchToContext(firstPart->GetData(), matchers, distinctRoutes, context); + auto input = matchToContext(rawHeader, matchers, distinctRoutes, context); if (input == INVALID_INPUT) { return { @@ -285,24 +277,24 @@ DataRelayer::RelayChoice }; // Actually save the header / payload in the slot - auto saveInSlot = [&firstPart, - &cachedStateMetrics = mCachedStateMetrics, - &restOfParts, - &restOfPartsSize, + auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics, + &messages, + &nMessages, + &nPayloads, &cache, &numInputTypes, &metrics](TimesliceId timeslice, int input, TimesliceSlot slot) { auto cacheIdx = numInputTypes * slot.index + input; - std::vector& parts = cache[cacheIdx].parts; + MessageSet& target = cache[cacheIdx]; cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING; // TODO: make sure that multiple parts can only be added within the same call of // DataRelayer::relay - PartRef entry{std::move(firstPart), std::move(restOfParts[0])}; - parts.emplace_back(std::move(entry)); - auto rest = restOfParts + 1; - for (size_t pi = 0; pi < (restOfPartsSize - 1) / 2; ++pi) { - PartRef entry{std::move(rest[pi * 2]), std::move(rest[pi * 2 + 1])}; - parts.emplace_back(std::move(entry)); + assert(nPayloads == 1); + assert(nMessages % 2 == 0); + for (size_t mi = 0; mi < nMessages; ++mi) { + assert(mi + nPayloads < nMessages); + target.add([&messages, &mi](size_t i) -> FairMQMessagePtr& { return messages[mi + i]; }, nPayloads + 1); + mi += nPayloads; } }; @@ -390,9 +382,10 @@ DataRelayer::RelayChoice VariableContext pristineContext; std::tie(input, timeslice) = getInputTimeslice(pristineContext); - auto DataHeaderInfo = [&firstPart]() { + auto DataHeaderInfo = [&rawHeader]() { std::string error; - const auto* dh = o2::header::get(firstPart->GetData()); + // extract header from message model + const auto* dh = o2::header::get(rawHeader); if (dh) { error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification); } else { @@ -405,10 +398,8 @@ DataRelayer::RelayChoice LOG(ERROR) << "Could not match incoming data to any input route: " << DataHeaderInfo(); mStats.malformedInputs++; mStats.droppedIncomingMessages++; - firstPart.reset(nullptr); - for (size_t pi = 0; pi < restOfPartsSize; ++pi) { - auto& payload = restOfParts[pi]; - payload.reset(nullptr); + for (size_t pi = 0; pi < nMessages; ++pi) { + messages[pi].reset(nullptr); } return Invalid; } @@ -417,10 +408,8 @@ DataRelayer::RelayChoice LOG(ERROR) << "Could not determine the timeslice for input: " << DataHeaderInfo(); mStats.malformedInputs++; mStats.droppedIncomingMessages++; - firstPart.reset(nullptr); - for (size_t pi = 0; pi < restOfPartsSize; ++pi) { - auto& payload = restOfParts[pi]; - payload.reset(nullptr); + for (size_t pi = 0; pi < nMessages; ++pi) { + messages[pi].reset(nullptr); } return Invalid; } @@ -447,10 +436,8 @@ DataRelayer::RelayChoice LOG(WARNING) << "Incoming data is invalid, not relaying."; mStats.malformedInputs++; mStats.droppedIncomingMessages++; - firstPart.reset(nullptr); - for (size_t pi = 0; pi < restOfPartsSize; ++pi) { - auto& payload = restOfParts[pi]; - payload.reset(nullptr); + for (size_t pi = 0; pi < nMessages; ++pi) { + messages[pi].reset(nullptr); } return Invalid; case TimesliceIndex::ActionTaken::ReplaceUnused: @@ -518,11 +505,12 @@ void DataRelayer::getReadyToProcess(std::vector& comp continue; } auto partial = getPartialRecord(li); + // TODO: get the data ref from message model auto getter = [&partial](size_t idx, size_t part) { - if (partial[idx].size() > 0 && partial[idx].at(part).header && partial[idx].at(part).payload) { + if (partial[idx].size() > 0 && partial[idx].header(part) && partial[idx].payload(part)) { return DataRef{nullptr, - reinterpret_cast(partial[idx].at(part).header->GetData()), - reinterpret_cast(partial[idx].at(part).payload->GetData())}; + reinterpret_cast(partial[idx].header(part)->GetData()), + reinterpret_cast(partial[idx].payload(part)->GetData())}; } return DataRef{}; }; @@ -657,6 +645,9 @@ void DataRelayer::publishMetrics() std::scoped_lock lock(mMutex); auto numInputTypes = mDistinctRoutesIndex.size(); + // FIXME: many of the DataRelayer function rely on allocated cache, so its + // maybe misleading to have the allocation in a function primarily for + // metrics publishing, do better in setPipelineLength? mCache.resize(numInputTypes * mTimesliceIndex.size()); mMetrics.send({(int)numInputTypes, "data_relayer/h", Verbosity::Debug}); mMetrics.send({(int)mTimesliceIndex.size(), "data_relayer/w", Verbosity::Debug}); diff --git a/Framework/Core/test/benchmark_DataRelayer.cxx b/Framework/Core/test/benchmark_DataRelayer.cxx index 1bc215ac015b8..0f1cef6196542 100644 --- a/Framework/Core/test/benchmark_DataRelayer.cxx +++ b/Framework/Core/test/benchmark_DataRelayer.cxx @@ -19,6 +19,8 @@ #include #include #include +#include +#include using Monitoring = o2::monitoring::Monitoring; using namespace o2::framework; @@ -98,12 +100,15 @@ static void BM_RelaySingleSlot(benchmark::State& state) for (auto _ : state) { // FIXME: Understand why pausing the timer makes it slower.. //state.PauseTiming(); - FairMQMessagePtr header = transport->CreateMessage(stack.size()); - FairMQMessagePtr payload = transport->CreateMessage(1000); + std::array messages; + messages[0] = transport->CreateMessage(stack.size()); + messages[1] = transport->CreateMessage(1000); + FairMQMessagePtr& header = messages[0]; + FairMQMessagePtr& payload = messages[1]; memcpy(header->GetData(), stack.data(), stack.size()); //state.ResumeTiming(); - relayer.relay(header, payload); + relayer.relay(header->GetData(), messages.data(), messages.size()); std::vector ready; relayer.getReadyToProcess(ready); assert(ready.size() == 1); @@ -150,13 +155,16 @@ static void BM_RelayMultipleSlots(benchmark::State& state) DataProcessingHeader dph{timeslice++, 1}; Stack stack{dh, dph}; - FairMQMessagePtr header = transport->CreateMessage(stack.size()); - FairMQMessagePtr payload = transport->CreateMessage(1000); + std::array messages; + messages[0] = transport->CreateMessage(stack.size()); + messages[1] = transport->CreateMessage(1000); + FairMQMessagePtr& header = messages[0]; + FairMQMessagePtr& payload = messages[1]; memcpy(header->GetData(), stack.data(), stack.size()); //state.ResumeTiming(); - relayer.relay(header, payload); + relayer.relay(header->GetData(), messages.data(), messages.size()); std::vector ready; relayer.getReadyToProcess(ready); assert(ready.size() == 1); @@ -210,27 +218,32 @@ static void BM_RelayMultipleRoutes(benchmark::State& state) DataProcessingHeader dph1{timeslice, 1}; Stack stack1{dh1, dph1}; - FairMQMessagePtr header1 = transport->CreateMessage(stack1.size()); - FairMQMessagePtr payload1 = transport->CreateMessage(1000); + std::array messages; + messages[0] = transport->CreateMessage(stack1.size()); + messages[1] = transport->CreateMessage(1000); + FairMQMessagePtr& header1 = messages[0]; + FairMQMessagePtr& payload1 = messages[1]; memcpy(header1->GetData(), stack1.data(), stack1.size()); DataProcessingHeader dph2{timeslice, 1}; Stack stack2{dh2, dph2}; - FairMQMessagePtr header2 = transport->CreateMessage(stack2.size()); - FairMQMessagePtr payload2 = transport->CreateMessage(1000); + messages[2] = transport->CreateMessage(stack2.size()); + messages[3] = transport->CreateMessage(1000); + FairMQMessagePtr& header2 = messages[2]; + FairMQMessagePtr& payload2 = messages[3]; memcpy(header2->GetData(), stack2.data(), stack2.size()); //state.ResumeTiming(); - relayer.relay(header1, payload1); + relayer.relay(header1->GetData(), &messages[0], 2); std::vector ready; relayer.getReadyToProcess(ready); assert(ready.size() == 1); assert(ready[0].op == CompletionPolicy::CompletionOp::Consume); - relayer.relay(header2, payload2); + relayer.relay(header2->GetData(), &messages[2], 2); ready.clear(); relayer.getReadyToProcess(ready); assert(ready.size() == 1); @@ -275,12 +288,14 @@ static void BM_RelaySplitParts(benchmark::State& state) for (auto _ : state) { // FIXME: Understand why pausing the timer makes it slower.. state.PauseTiming(); + const int nSplitParts = 10; std::vector> splitParts; + splitParts.reserve(nSplitParts); - for (size_t i = 0; i < 100; ++i) { + for (size_t i = 0; i < nSplitParts; ++i) { DataProcessingHeader dph1{timeslice, 1}; dh1.splitPayloadIndex = i; - dh1.splitPayloadParts = 10; + dh1.splitPayloadParts = nSplitParts; Stack stack1{dh1, dph1}; FairMQMessagePtr header1 = transport->CreateMessage(stack1.size()); @@ -292,7 +307,7 @@ static void BM_RelaySplitParts(benchmark::State& state) } state.ResumeTiming(); - relayer.relay(splitParts[0], &splitParts[1], splitParts.size() - 1); + relayer.relay(splitParts[0]->GetData(), splitParts.data(), splitParts.size()); std::vector ready; relayer.getReadyToProcess(ready); assert(ready.size() == 1); diff --git a/Framework/Core/test/test_DataRelayer.cxx b/Framework/Core/test/test_DataRelayer.cxx index 9c54830a48b38..29cbae472be97 100644 --- a/Framework/Core/test/test_DataRelayer.cxx +++ b/Framework/Core/test/test_DataRelayer.cxx @@ -16,6 +16,7 @@ #include #include "Headers/DataHeader.h" #include "Headers/Stack.h" +#include "MemoryResources/MemoryResources.h" #include "Framework/CompletionPolicyHelpers.h" #include "Framework/DataRelayer.h" #include "../src/DataRelayerHelpers.h" @@ -23,7 +24,8 @@ #include "Framework/WorkflowSpec.h" #include #include -#include +#include +#include using Monitoring = o2::monitoring::Monitoring; using namespace o2::framework; @@ -58,12 +60,14 @@ BOOST_AUTO_TEST_CASE(TestNoWait) dh.splitPayloadParts = 1; DataProcessingHeader dph{0, 1}; - Stack stack{dh, dph}; auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq"); - FairMQMessagePtr header = transport->CreateMessage(stack.size()); - FairMQMessagePtr payload = transport->CreateMessage(1000); - memcpy(header->GetData(), stack.data(), stack.size()); - relayer.relay(header, payload); + std::array messages; + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, dph}); + messages[1] = transport->CreateMessage(1000); + FairMQMessagePtr& header = messages[0]; + FairMQMessagePtr& payload = messages[1]; + relayer.relay(header->GetData(), messages.data(), messages.size()); std::vector ready; relayer.getReadyToProcess(ready); BOOST_REQUIRE_EQUAL(ready.size(), 1); @@ -103,12 +107,14 @@ BOOST_AUTO_TEST_CASE(TestNoWaitMatcher) dh.splitPayloadParts = 1; DataProcessingHeader dph{0, 1}; - Stack stack{dh, dph}; auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq"); - FairMQMessagePtr header = transport->CreateMessage(stack.size()); - FairMQMessagePtr payload = transport->CreateMessage(1000); - memcpy(header->GetData(), stack.data(), stack.size()); - relayer.relay(header, payload); + std::array messages; + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, dph}); + messages[1] = transport->CreateMessage(1000); + FairMQMessagePtr& header = messages[0]; + FairMQMessagePtr& payload = messages[1]; + relayer.relay(header->GetData(), messages.data(), messages.size()); std::vector ready; relayer.getReadyToProcess(ready); BOOST_REQUIRE_EQUAL(ready.size(), 1); @@ -151,14 +157,15 @@ BOOST_AUTO_TEST_CASE(TestRelay) relayer.setPipelineLength(4); auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq"); - - auto createMessage = [&transport, &relayer](DataHeader& dh, size_t time) { - DataProcessingHeader dph{time, 1}; - Stack stack{dh, dph}; - FairMQMessagePtr header = transport->CreateMessage(stack.size()); - FairMQMessagePtr payload = transport->CreateMessage(1000); - memcpy(header->GetData(), stack.data(), stack.size()); - relayer.relay(header, payload); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + + auto createMessage = [&transport, &channelAlloc, &relayer](DataHeader& dh, size_t time) { + std::array messages; + messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{time, 1}}); + messages[1] = transport->CreateMessage(1000); + FairMQMessagePtr& header = messages[0]; + FairMQMessagePtr& payload = messages[1]; + relayer.relay(header->GetData(), messages.data(), messages.size()); BOOST_CHECK_EQUAL(header.get(), nullptr); BOOST_CHECK_EQUAL(payload.get(), nullptr); }; @@ -228,14 +235,15 @@ BOOST_AUTO_TEST_CASE(TestRelayBug) relayer.setPipelineLength(3); auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq"); - - auto createMessage = [&transport, &relayer](DataHeader& dh, size_t time) { - DataProcessingHeader dph{time, 1}; - Stack stack{dh, dph}; - FairMQMessagePtr header = transport->CreateMessage(stack.size()); - FairMQMessagePtr payload = transport->CreateMessage(1000); - memcpy(header->GetData(), stack.data(), stack.size()); - relayer.relay(header, payload); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + + auto createMessage = [&transport, &channelAlloc, &relayer](DataHeader& dh, size_t time) { + std::array messages; + messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{time, 1}}); + messages[1] = transport->CreateMessage(1000); + FairMQMessagePtr& header = messages[0]; + FairMQMessagePtr& payload = messages[1]; + relayer.relay(header->GetData(), messages.data(), messages.size()); BOOST_CHECK_EQUAL(header.get(), nullptr); BOOST_CHECK_EQUAL(payload.get(), nullptr); }; @@ -318,12 +326,14 @@ BOOST_AUTO_TEST_CASE(TestCache) DataProcessingHeader dph{0, 1}; auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq"); - auto createMessage = [&transport, &relayer, &dh](const DataProcessingHeader& h) { - Stack stack{dh, h}; - FairMQMessagePtr header = transport->CreateMessage(stack.size()); - FairMQMessagePtr payload = transport->CreateMessage(1000); - memcpy(header->GetData(), stack.data(), stack.size()); - auto res = relayer.relay(header, payload); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + auto createMessage = [&transport, &channelAlloc, &relayer, &dh](auto const& h) { + std::array messages; + messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, h}); + messages[1] = transport->CreateMessage(1000); + FairMQMessagePtr& header = messages[0]; + FairMQMessagePtr& payload = messages[1]; + auto res = relayer.relay(header->GetData(), messages.data(), messages.size()); BOOST_REQUIRE(res != DataRelayer::RelayChoice::WillRelay || header.get() == nullptr); BOOST_REQUIRE(res != DataRelayer::RelayChoice::WillRelay || payload.get() == nullptr); BOOST_REQUIRE(res != DataRelayer::RelayChoice::Backpressured || header.get() != nullptr); @@ -397,12 +407,14 @@ BOOST_AUTO_TEST_CASE(TestPolicies) dh2.splitPayloadParts = 1; auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq"); - auto createMessage = [&transport, &relayer](DataHeader const& dh, DataProcessingHeader const& h) { - Stack stack{dh, h}; - FairMQMessagePtr header = transport->CreateMessage(stack.size()); - FairMQMessagePtr payload = transport->CreateMessage(1000); - memcpy(header->GetData(), stack.data(), stack.size()); - return relayer.relay(header, payload); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + auto createMessage = [&transport, &channelAlloc, &relayer](auto const& dh, auto const& h) { + std::array messages; + messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, h}); + messages[1] = transport->CreateMessage(1000); + FairMQMessagePtr& header = messages[0]; + FairMQMessagePtr& payload = messages[1]; + return relayer.relay(header->GetData(), messages.data(), messages.size()); }; // This fills the cache, and then empties it. @@ -465,12 +477,14 @@ BOOST_AUTO_TEST_CASE(TestClear) dh2.splitPayloadParts = 1; auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq"); - auto createMessage = [&transport, &relayer](DataHeader const& dh, DataProcessingHeader const& h) { - Stack stack{dh, h}; - FairMQMessagePtr header = transport->CreateMessage(stack.size()); - FairMQMessagePtr payload = transport->CreateMessage(1000); - memcpy(header->GetData(), stack.data(), stack.size()); - return relayer.relay(header, payload); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + auto createMessage = [&transport, &channelAlloc, &relayer](auto const& dh, auto const& h) { + std::array messages; + messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, h}); + messages[1] = transport->CreateMessage(1000); + FairMQMessagePtr& header = messages[0]; + FairMQMessagePtr& payload = messages[1]; + return relayer.relay(header->GetData(), messages.data(), messages.size()); }; // This fills the cache, and then empties it. @@ -520,20 +534,22 @@ BOOST_AUTO_TEST_CASE(TestTooMany) dh2.splitPayloadParts = 1; auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq"); - - Stack s1{dh1, DataProcessingHeader{0, 1}}; - FairMQMessagePtr header = transport->CreateMessage(s1.size()); - FairMQMessagePtr payload = transport->CreateMessage(1000); - memcpy(header->GetData(), s1.data(), s1.size()); - relayer.relay(header, payload); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + + std::array messages; + messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{0, 1}}); + messages[1] = transport->CreateMessage(1000); + FairMQMessagePtr& header = messages[0]; + FairMQMessagePtr& payload = messages[1]; + relayer.relay(header->GetData(), &messages[0], 2); BOOST_CHECK_EQUAL(header.get(), nullptr); BOOST_CHECK_EQUAL(payload.get(), nullptr); // This fills the cache, and then waits. - Stack s2{dh1, DataProcessingHeader{1, 1}}; - FairMQMessagePtr header2 = transport->CreateMessage(s2.size()); - FairMQMessagePtr payload2 = transport->CreateMessage(1000); - memcpy(header2->GetData(), s2.data(), s2.size()); - auto action = relayer.relay(header2, payload2); + messages[2] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{1, 1}}); + messages[3] = transport->CreateMessage(1000); + FairMQMessagePtr& header2 = messages[2]; + FairMQMessagePtr& payload2 = messages[3]; + auto action = relayer.relay(header2->GetData(), &messages[2], 2); BOOST_CHECK_EQUAL(action, DataRelayer::Backpressured); BOOST_CHECK_NE(header2.get(), nullptr); BOOST_CHECK_NE(payload2.get(), nullptr); @@ -575,20 +591,22 @@ BOOST_AUTO_TEST_CASE(SplitParts) dh2.splitPayloadParts = 1; auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq"); - - Stack s1{dh1, DataProcessingHeader{0, 1}}; - FairMQMessagePtr header = transport->CreateMessage(s1.size()); - FairMQMessagePtr payload = transport->CreateMessage(1000); - memcpy(header->GetData(), s1.data(), s1.size()); - relayer.relay(header, payload); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + + std::array messages; + messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{0, 1}}); + messages[1] = transport->CreateMessage(1000); + FairMQMessagePtr& header = messages[0]; + FairMQMessagePtr& payload = messages[1]; + relayer.relay(header->GetData(), &messages[0], 2); BOOST_CHECK_EQUAL(header.get(), nullptr); BOOST_CHECK_EQUAL(payload.get(), nullptr); // This fills the cache, and then waits. - Stack s2{dh1, DataProcessingHeader{1, 1}}; - FairMQMessagePtr header2 = transport->CreateMessage(s2.size()); - FairMQMessagePtr payload2 = transport->CreateMessage(1000); - memcpy(header2->GetData(), s2.data(), s2.size()); - auto action = relayer.relay(header2, payload2); + messages[2] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{1, 1}}); + messages[3] = transport->CreateMessage(1000); + FairMQMessagePtr& header2 = messages[2]; + FairMQMessagePtr& payload2 = messages[3]; + auto action = relayer.relay(header2->GetData(), &messages[2], 2); BOOST_CHECK_EQUAL(action, DataRelayer::Backpressured); BOOST_CHECK_NE(header2.get(), nullptr); BOOST_CHECK_NE(payload2.get(), nullptr);