Skip to content

Commit f70f87e

Browse files
DPL: Implementing support for multiple payloads per header
This is supporting an upcoming extension of the O2 data model to allow not only header-payload pairs but also sequences of payloads preceded by a single header message. Such a sequence can be indicated by setting splitPayloadIndex == splitPayloadParts with splitPayloadParts > 1. A change in the version of DataHeader will be discussed, although there is no change in the members of the header. There is no change to the InputRecord API; only the payload size of the current payload is provided in the DataRef object. It needs to be discussed which payload size is going to be provided in the data header in case of a sequence with multiple payloads. Changes alongside (most of them meanwhile moved to separate commits): - make MessageSet the owner of FairMQMessage objects in a linear vector, which enhances performance also for split payload sequences in header-payload format - generalizing DataRelayer to relay an array of messages - storing the payload message size in the DataRef. Extending and cleaning up DataRelayer test and benchmark: - allocating header stack through transport allocator and avoiding copy - adding test for split parts in the header-payload scheme - adding test for split parts in the payload-sequence scheme. During the development a performance regression has been observed when using o2::pmr::getMessage; this needs to be investigated.
1 parent 97572bd commit f70f87e

5 files changed

Lines changed: 160 additions & 18 deletions

File tree

Framework/Core/include/Framework/MessageSet.h

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,19 @@ struct MessageSet {
3838
};
3939
// linear storage of messages
4040
std::vector<FairMQMessagePtr> messages;
41-
// message map descibes O2 messages consisting of a header message and
42-
// payload message(s), index descibes position in the linear storage
41+
// message map describes O2 messages consisting of a header message and
42+
// payload message(s), index describes position in the linear storage
4343
std::vector<Index> messageMap;
44-
// pair map describes all messages in header-payload pairs and where
45-
// in the message index the an associated header and payload can be found
46-
std::vector<std::pair<size_t, size_t>> pairMap;
44+
// pair map describes all messages in one sequence of header-payload pairs and
45+
// where in the message index the associated header and payload can be found
46+
struct PairMapping {
47+
PairMapping(size_t partId, size_t payloadId) : partIndex(partId), payloadIndex(payloadId) {}
48+
// O2 message where the pair is located in
49+
size_t partIndex = 0;
50+
// payload index within the O2 message
51+
size_t payloadIndex = 0;
52+
};
53+
std::vector<PairMapping> pairMap;
4754

4855
MessageSet()
4956
: messages(), messageMap(), pairMap()
@@ -161,13 +168,13 @@ struct MessageSet {
161168

162169
FairMQMessagePtr const& associatedHeader(size_t pos) const
163170
{
164-
return messages[messageMap[pairMap[pos].first].position];
171+
return messages[messageMap[pairMap[pos].partIndex].position];
165172
}
166173

167174
FairMQMessagePtr const& associatedPayload(size_t pos) const
168175
{
169-
auto partIndex = pairMap[pos].first;
170-
auto payloadIndex = pairMap[pos].second;
176+
auto partIndex = pairMap[pos].partIndex;
177+
auto payloadIndex = pairMap[pos].payloadIndex;
171178
return messages[messageMap[partIndex].position + payloadIndex + 1];
172179
}
173180
};

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -947,7 +947,7 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
947947
LOGP(error, "Header is not a DataHeader?");
948948
continue;
949949
}
950-
if (dh->payloadSize != parts.At(pi + 1)->GetSize()) {
950+
if (dh->payloadSize > parts.At(pi + 1)->GetSize()) {
951951
insertInputInfo(pi, 0, InputType::Invalid);
952952
LOGP(error, "DataHeader payloadSize mismatch");
953953
continue;
@@ -960,7 +960,12 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
960960
LOGP(error, "Header stack does not contain DataProcessingHeader");
961961
continue;
962962
}
963-
{
963+
if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
964+
// this is indicating a sequence of payloads following the header
965+
// FIXME: we will probably also set the DataHeader version
966+
insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data);
967+
pi += dh->splitPayloadParts - 1;
968+
} else {
964969
// We can set the type for the next splitPayloadParts
965970
// because we are guaranteed they are all the same.
966971
// If splitPayloadParts = 0, we assume that means there is only one (header, payload)
@@ -1001,7 +1006,11 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
10011006
auto headerIndex = input.position;
10021007
auto nMessages = 0;
10031008
auto nPayloadsPerHeader = 0;
1004-
{
1009+
if (input.size > 2) {
1010+
// header and multiple payload sequence
1011+
nMessages = input.size;
1012+
nPayloadsPerHeader = nMessages - 1;
1013+
} else {
10051014
// multiple header-payload pairs
10061015
auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
10071016
nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
@@ -1213,6 +1222,9 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
12131222
auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
12141223
assert(record.size() == currentSetOfInputs.size());
12151224
for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
1225+
// assuming that for timer inputs we do have exactly one PartRef object
1226+
// in the MessageSet, multiple PartRef Objects are only possible for either
1227+
// split payload messages or wildcard matchers, both for data inputs
12161228
DataRef input = record.getByPos(ii);
12171229
if (input.spec->lifetime != Lifetime::Timer) {
12181230
continue;
@@ -1327,7 +1339,8 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
13271339
}
13281340
// We need to find the forward route only for the first
13291341
// part of a split payload. All the others will use the same.
1330-
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1) {
1342+
// but always check if we have a sequence of multiple payloads
1343+
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 1) {
13311344
cachedForwardingChoice = -1;
13321345
for (size_t fi = 0; fi < spec->forwards.size(); fi++) {
13331346
auto& forward = spec->forwards[fi];

Framework/Core/src/DataRelayer.cxx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,7 @@ DataRelayer::RelayChoice
292292
cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING;
293293
// TODO: make sure that multiple parts can only be added within the same call of
294294
// DataRelayer::relay
295-
assert(nPayloads == 1);
296-
assert(nMessages % 2 == 0);
295+
assert(nPayloads > 0);
297296
for (size_t mi = 0; mi < nMessages; ++mi) {
298297
assert(mi + nPayloads < nMessages);
299298
target.add([&messages, &mi](size_t i) -> FairMQMessagePtr& { return messages[mi + i]; }, nPayloads + 1);

Framework/Core/test/test_DataRelayer.cxx

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,3 +611,123 @@ BOOST_AUTO_TEST_CASE(SplitParts)
611611
BOOST_CHECK_NE(header2.get(), nullptr);
612612
BOOST_CHECK_NE(payload2.get(), nullptr);
613613
}
614+
615+
BOOST_AUTO_TEST_CASE(SplitPayloadPairs)
616+
{
617+
Monitoring metrics;
618+
InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
619+
620+
std::vector<InputRoute> inputs = {
621+
InputRoute{spec1, 0, "Fake1", 0},
622+
};
623+
624+
std::vector<ForwardRoute> forwards;
625+
TimesliceIndex index{1};
626+
627+
auto policy = CompletionPolicyHelpers::consumeWhenAny();
628+
DataRelayer relayer(policy, inputs, metrics, index);
629+
relayer.setPipelineLength(4);
630+
631+
DataHeader dh{"CLUSTERS", "TPC", 0};
632+
633+
auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq");
634+
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
635+
size_t timeslice = 0;
636+
637+
const int nSplitParts = 100;
638+
std::vector<std::unique_ptr<FairMQMessage>> splitParts;
639+
splitParts.reserve(2 * nSplitParts);
640+
641+
for (size_t i = 0; i < nSplitParts; ++i) {
642+
dh.splitPayloadIndex = i;
643+
dh.splitPayloadParts = nSplitParts;
644+
645+
FairMQMessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}});
646+
FairMQMessagePtr payload = transport->CreateMessage(100);
647+
648+
splitParts.emplace_back(std::move(header));
649+
splitParts.emplace_back(std::move(payload));
650+
}
651+
BOOST_REQUIRE_EQUAL(splitParts.size(), 2 * nSplitParts);
652+
653+
relayer.relay(splitParts[0]->GetData(), splitParts.data(), splitParts.size());
654+
std::vector<RecordAction> ready;
655+
relayer.getReadyToProcess(ready);
656+
BOOST_REQUIRE_EQUAL(ready.size(), 1);
657+
BOOST_REQUIRE_EQUAL(ready[0].op, CompletionPolicy::CompletionOp::Consume);
658+
auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
659+
// we have one input route and thus one message set containing pairs for all
660+
// payloads
661+
BOOST_REQUIRE_EQUAL(messageSet.size(), 1);
662+
BOOST_CHECK_EQUAL(messageSet[0].size(), nSplitParts);
663+
BOOST_CHECK_EQUAL(messageSet[0].getNumberOfPayloads(0), 1);
664+
}
665+
666+
BOOST_AUTO_TEST_CASE(SplitPayloadSequence)
667+
{
668+
Monitoring metrics;
669+
InputSpec spec1{"clusters", "TST", "COUNTER"};
670+
671+
std::vector<InputRoute> inputs = {
672+
InputRoute{spec1, 0, "Fake1", 0},
673+
};
674+
675+
std::vector<ForwardRoute> forwards;
676+
TimesliceIndex index{1};
677+
678+
auto policy = CompletionPolicyHelpers::consumeWhenAny();
679+
DataRelayer relayer(policy, inputs, metrics, index);
680+
relayer.setPipelineLength(4);
681+
682+
auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq");
683+
size_t timeslice = 0;
684+
685+
std::vector<size_t> sequenceSize;
686+
size_t nTotalPayloads = 0;
687+
688+
auto createSequence = [&nTotalPayloads, &timeslice, &sequenceSize, &transport, &relayer](size_t nPayloads) -> void {
689+
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
690+
std::vector<std::unique_ptr<FairMQMessage>> messages;
691+
messages.reserve(nPayloads + 1);
692+
DataHeader dh{"COUNTER", "TST", 0};
693+
694+
// one header with index set to the number of split parts indicates sequence
695+
// of payloads without additional headers
696+
dh.splitPayloadIndex = nPayloads;
697+
dh.splitPayloadParts = nPayloads;
698+
FairMQMessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}});
699+
messages.emplace_back(std::move(header));
700+
701+
for (size_t i = 0; i < nPayloads; ++i) {
702+
messages.emplace_back(transport->CreateMessage(100));
703+
*(reinterpret_cast<size_t*>(messages.back()->GetData())) = nTotalPayloads;
704+
++nTotalPayloads;
705+
}
706+
BOOST_CHECK_EQUAL(messages.size(), nPayloads + 1);
707+
relayer.relay(messages[0]->GetData(), messages.data(), messages.size(), nPayloads);
708+
sequenceSize.emplace_back(nPayloads);
709+
};
710+
createSequence(100);
711+
createSequence(1);
712+
createSequence(42);
713+
714+
std::vector<RecordAction> ready;
715+
relayer.getReadyToProcess(ready);
716+
BOOST_REQUIRE_EQUAL(ready.size(), 1);
717+
BOOST_REQUIRE_EQUAL(ready[0].op, CompletionPolicy::CompletionOp::Consume);
718+
auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
719+
// we have one input route
720+
BOOST_REQUIRE_EQUAL(messageSet.size(), 1);
721+
// one message set containing number of added sequences of messages
722+
BOOST_REQUIRE_EQUAL(messageSet[0].size(), sequenceSize.size());
723+
size_t counter = 0;
724+
for (auto seqid = 0; seqid < sequenceSize.size(); ++seqid) {
725+
BOOST_CHECK_EQUAL(messageSet[0].getNumberOfPayloads(seqid), sequenceSize[seqid]);
726+
for (auto pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) {
727+
BOOST_REQUIRE(messageSet[0].payload(seqid, pi));
728+
auto const* data = messageSet[0].payload(seqid, pi)->GetData();
729+
BOOST_CHECK_EQUAL(*(reinterpret_cast<size_t const*>(data)), counter);
730+
++counter;
731+
}
732+
}
733+
}

Framework/Core/test/test_VariablePayloadSequenceWorkflow.cxx

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
107107
outputs.make<int>(OutputRef{"allocator", 0}) = counter;
108108

109109
if (channelName.empty()) {
110-
OutputSpec const query{"TST", "PAIR", 0};
110+
OutputSpec const query{"TST", "SEQUENCE", 0};
111111
auto outputRoutes = rds.spec().outputs;
112112
for (auto& route : outputRoutes) {
113113
if (DataSpecUtils::match(route.matcher, query)) {
@@ -161,7 +161,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
161161
}
162162
};
163163

164-
//createSequence(attributes->distrib(attributes->gen), DataHeader{"SEQUENCE", "TST", 0});
164+
createSequence(attributes->distrib(attributes->gen), DataHeader{"SEQUENCE", "TST", 0});
165165
createPairs(counter + 1, DataHeader{"PAIR", "TST", 0});
166166

167167
// using utility from ExternalFairMQDeviceProxy
@@ -178,6 +178,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
178178
workflow.emplace_back(DataProcessorSpec{"producer",
179179
{InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}},
180180
{OutputSpec{{"pair"}, "TST", "PAIR", 0, Lifetime::Timeframe},
181+
OutputSpec{{"sequence"}, "TST", "SEQUENCE", 0, Lifetime::Timeframe},
181182
OutputSpec{{"allocator"}, "TST", "ALLOCATOR", 0, Lifetime::Timeframe}},
182183
AlgorithmSpec{adaptStateless(producerCallback)},
183184
{ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"period of timer"}}}});
@@ -223,7 +224,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
223224
}
224225
};
225226

226-
auto createCounters = [](RawDeviceService& rds) -> std::shared_ptr<ConsumerCounters> {
227+
auto createCounters = [](RawDeviceService const& rds) -> std::shared_ptr<ConsumerCounters> {
227228
auto counters = std::make_shared<ConsumerCounters>();
228229
ConsumerCounters& c = *counters;
229230
for (auto const& channelSpec : rds.spec().inputChannels) {
@@ -248,7 +249,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
248249
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
249250
// the consumer process connects to the producer
250251
//
251-
auto consumerInit = [createCounters, checkCounters, inputChecker](RawDeviceService& rds, CallbackService& callbacks) {
252+
auto consumerInit = [createCounters, checkCounters, inputChecker](RawDeviceService const& rds, CallbackService& callbacks) {
252253
auto counters = createCounters(rds);
253254
callbacks.set(CallbackService::Id::Stop, [counters, checkCounters]() {
254255
ASSERT_ERROR(checkCounters(counters));
@@ -267,6 +268,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
267268

268269
workflow.emplace_back(DataProcessorSpec{"consumer",
269270
{InputSpec{"pairin", "TST", "PAIR", 0, Lifetime::Timeframe},
271+
InputSpec{"sequencein", "TST", "SEQUENCE", 0, Lifetime::Timeframe},
270272
InputSpec{"dpldefault", "TST", "ALLOCATOR", 0, Lifetime::Timeframe}},
271273
{},
272274
AlgorithmSpec{adaptStateful(consumerInit)}});
@@ -276,6 +278,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
276278
//
277279
workflow.emplace_back(DataProcessorSpec{"spectator",
278280
{InputSpec{"pairin", "TST", "PAIR", 0, Lifetime::Timeframe},
281+
InputSpec{"sequencein", "TST", "SEQUENCE", 0, Lifetime::Timeframe},
279282
InputSpec{"dpldefault", "TST", "ALLOCATOR", 0, Lifetime::Timeframe}},
280283
{},
281284
AlgorithmSpec{adaptStateful(consumerInit)}});

0 commit comments

Comments
 (0)