Skip to content

Commit 981053e

Browse files
DPL: adjusting DataRelayer interface
DataRelayer now uses the new version of the MessageSet API, which will allow switching the message store in MessageSet transparently
1 parent efcd24c commit 981053e

5 files changed

Lines changed: 198 additions & 166 deletions

File tree

Framework/Core/include/Framework/DataRelayer.h

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -88,20 +88,18 @@ class DataRelayer
8888

8989
/// This is to relay a whole set of FairMQMessages, all of which are part
9090
/// of the same set of split parts.
91-
/// @a firstHeader is the first message of such set
92-
/// @a restOfParts is a pointer to the rest of the messages
93-
/// @a restSize is how many messages are there in restOfParts
94-
/// is the header which is common across all subsequent elements.
91+
/// @a rawHeader raw header pointer
92+
/// @a messages pointer to array of messages
93+
/// @a nMessages size of the array
94+
/// @a nPayloads number of payloads in the message sequence, default is 1
95+
/// which is the standard header-payload message pair, in this
96+
/// case nMessages / 2 pairs will be inserted and considered
97+
/// separate parts
9598
/// Notice that we expect that the header is an O2 Header Stack
96-
RelayChoice relay(std::unique_ptr<FairMQMessage>& firstHeader,
97-
std::unique_ptr<FairMQMessage>* restOfParts,
98-
size_t restSize);
99-
100-
/// This is used to ask for relaying a given (header,payload) pair.
101-
/// Notice that we expect that the header is an O2 Header Stack
102-
/// with a DataProcessingHeader inside so that we can assess time.
103-
RelayChoice relay(std::unique_ptr<FairMQMessage>& header,
104-
std::unique_ptr<FairMQMessage>& payload);
99+
RelayChoice relay(void const* rawHeader,
100+
std::unique_ptr<FairMQMessage>* messages,
101+
size_t nMessages,
102+
size_t nPayloads = 1);
105103

106104
/// @returns the actions ready to be performed.
107105
void getReadyToProcess(std::vector<RecordAction>& completed);

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -987,12 +987,19 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
987987
switch (input.type) {
988988
case InputType::Data: {
989989
auto headerIndex = input.position;
990-
auto payloadIndex = headerIndex + 1;
991-
assert(payloadIndex < parts.Size());
992-
auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
993-
auto relayed = relayer.relay(parts.At(headerIndex),
994-
&parts.At(payloadIndex), dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 0);
995-
ii += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
990+
auto nMessages = 0;
991+
auto nPayloadsPerHeader = 0;
992+
{
993+
// multiple header-payload pairs
994+
auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
995+
nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
996+
nPayloadsPerHeader = 1;
997+
ii += (nMessages / 2) - 1;
998+
}
999+
auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
1000+
&parts.At(headerIndex),
1001+
nMessages,
1002+
nPayloadsPerHeader);
9961003
switch (relayed) {
9971004
case DataRelayer::Backpressured:
9981005
if (info.normalOpsNotified == true && info.backpressureNotified == false) {
@@ -1142,8 +1149,8 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
11421149
auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
11431150
if (currentSetOfInputs[i].size() > partindex) {
11441151
return DataRef{nullptr,
1145-
static_cast<char const*>(currentSetOfInputs[i].at(partindex).header->GetData()),
1146-
static_cast<char const*>(currentSetOfInputs[i].at(partindex).payload->GetData())};
1152+
static_cast<char const*>(currentSetOfInputs[i].header(partindex)->GetData()),
1153+
static_cast<char const*>(currentSetOfInputs[i].payload(partindex)->GetData())};
11471154
}
11481155
return DataRef{nullptr, nullptr, nullptr};
11491156
};
@@ -1261,22 +1268,22 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
12611268
int cachedForwardingChoice = -1;
12621269

12631270
for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
1264-
auto& part = currentSetOfInputs[ii][pi];
1265-
auto& header = part.header;
1266-
auto& payload = part.payload;
1271+
auto& messageSet = currentSetOfInputs[ii];
1272+
auto const& header = messageSet.header(pi);
1273+
12671274
if (header.get() == nullptr) {
12681275
// FIXME: this should not happen, however it's actually harmless and
12691276
// we can simply discard it for the moment.
12701277
// LOG(ERROR) << "Missing header! " << dh->dataDescription;
12711278
continue;
12721279
}
12731280

1274-
auto fdph = o2::header::get<DataProcessingHeader*>(header.get()->GetData());
1281+
auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
12751282
if (fdph == nullptr) {
12761283
LOG(ERROR) << "Data is missing DataProcessingHeader";
12771284
continue;
12781285
}
1279-
auto fdh = o2::header::get<DataHeader*>(header.get()->GetData());
1286+
auto fdh = o2::header::get<DataHeader*>(header->GetData());
12801287
if (fdh == nullptr) {
12811288
LOG(ERROR) << "Data is missing DataHeader";
12821289
continue;
@@ -1303,23 +1310,26 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
13031310

13041311
if (copy) {
13051312
auto&& newHeader = header->GetTransport()->CreateMessage();
1306-
auto&& newPayload = header->GetTransport()->CreateMessage();
13071313
newHeader->Copy(*header);
1308-
newPayload->Copy(*payload);
1309-
13101314
forwardedParts[cachedForwardingChoice].AddPart(std::move(newHeader));
1311-
forwardedParts[cachedForwardingChoice].AddPart(std::move(newPayload));
1315+
1316+
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
1317+
auto&& newPayload = header->GetTransport()->CreateMessage();
1318+
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
1319+
forwardedParts[cachedForwardingChoice].AddPart(std::move(newPayload));
1320+
}
13121321
} else {
1313-
forwardedParts[cachedForwardingChoice].AddPart(std::move(header));
1314-
forwardedParts[cachedForwardingChoice].AddPart(std::move(payload));
1322+
forwardedParts[cachedForwardingChoice].AddPart(std::move(messageSet.header(pi)));
1323+
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
1324+
forwardedParts[cachedForwardingChoice].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
1325+
}
13151326
}
13161327
}
13171328
}
13181329
for (size_t fi = 0; fi < spec->forwards.size(); fi++) {
13191330
if (forwardedParts[fi].Size() == 0) {
13201331
continue;
13211332
}
1322-
assert(forwardedParts[fi].Size() % 2 == 0);
13231333
// in DPL we are using subchannel 0 only
13241334
device->Send(forwardedParts[fi], spec->forwards[fi].channel, 0);
13251335
}

Framework/Core/src/DataRelayer.cxx

Lines changed: 42 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,10 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
126126
// We check that no data is already there for the given cell
127127
// it is enough to check the first element
128128
auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
129-
if (part.size() > 0 && part[0].header != nullptr) {
129+
if (part.size() > 0 && part.header(0) != nullptr) {
130130
continue;
131131
}
132-
if (part.size() > 0 && part[0].payload != nullptr) {
132+
if (part.size() > 0 && part.payload(0) != nullptr) {
133133
continue;
134134
}
135135
// We check that the cell can actually be expired.
@@ -145,16 +145,14 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
145145

146146
assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
147147
assert(expirator.handler);
148-
// expired, so we create one entry
149-
if (part.size() == 0) {
150-
part.parts.resize(1);
151-
}
152-
expirator.handler(services, part[0], variables);
148+
PartRef newRef;
149+
expirator.handler(services, newRef, variables);
150+
part.reset(std::move(newRef));
153151
activity.expiredSlots++;
154152

155153
mTimesliceIndex.markAsDirty(slot, true);
156-
assert(part[0].header != nullptr);
157-
assert(part[0].payload != nullptr);
154+
assert(part.header(0) != nullptr);
155+
assert(part.payload(0) != nullptr);
158156
}
159157
}
160158
return activity;
@@ -163,7 +161,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
163161
/// This does the mapping between a route and a InputSpec. The
164162
/// reason why these might differ is that when you have time pipelining
165163
/// you have one route per timeslice, even if the type is the same.
166-
size_t matchToContext(void* data,
164+
size_t matchToContext(void const* data,
167165
std::vector<DataDescriptorMatcher> const& matchers,
168166
std::vector<size_t> const& index,
169167
VariableContext& context)
@@ -203,19 +201,13 @@ void sendVariableContextMetrics(VariableContext& context, TimesliceSlot slot,
203201
}
204202

205203
DataRelayer::RelayChoice
206-
DataRelayer::relay(std::unique_ptr<FairMQMessage>& header,
207-
std::unique_ptr<FairMQMessage>& payload)
208-
{
209-
return relay(header, &payload, 1);
210-
}
211-
212-
DataRelayer::RelayChoice
213-
DataRelayer::relay(std::unique_ptr<FairMQMessage>& firstPart,
214-
std::unique_ptr<FairMQMessage>* restOfParts,
215-
size_t restOfPartsSize)
204+
DataRelayer::relay(void const* rawHeader,
205+
std::unique_ptr<FairMQMessage>* messages,
206+
size_t nMessages,
207+
size_t nPayloads)
216208
{
217209
std::scoped_lock<LockableBase(std::recursive_mutex)> lock(mMutex);
218-
DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(firstPart->GetData());
210+
DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(rawHeader);
219211
// STATE HOLDING VARIABLES
220212
// This is the class level state of the relaying. If we start supporting
221213
// multithreading this will have to be made thread safe before we can invoke
@@ -238,12 +230,12 @@ DataRelayer::RelayChoice
238230
// become more complicated when we will start supporting ranges.
239231
auto getInputTimeslice = [&matchers = mInputMatchers,
240232
&distinctRoutes = mDistinctRoutesIndex,
241-
&firstPart,
233+
&rawHeader,
242234
&index](VariableContext& context)
243235
-> std::tuple<int, TimesliceId> {
244236
/// FIXME: for the moment we only use the first context and reset
245237
/// between one invocation and the other.
246-
auto input = matchToContext(firstPart->GetData(), matchers, distinctRoutes, context);
238+
auto input = matchToContext(rawHeader, matchers, distinctRoutes, context);
247239

248240
if (input == INVALID_INPUT) {
249241
return {
@@ -285,24 +277,24 @@ DataRelayer::RelayChoice
285277
};
286278

287279
// Actually save the header / payload in the slot
288-
auto saveInSlot = [&firstPart,
289-
&cachedStateMetrics = mCachedStateMetrics,
290-
&restOfParts,
291-
&restOfPartsSize,
280+
auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
281+
&messages,
282+
&nMessages,
283+
&nPayloads,
292284
&cache,
293285
&numInputTypes,
294286
&metrics](TimesliceId timeslice, int input, TimesliceSlot slot) {
295287
auto cacheIdx = numInputTypes * slot.index + input;
296-
std::vector<PartRef>& parts = cache[cacheIdx].parts;
288+
MessageSet& target = cache[cacheIdx];
297289
cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING;
298290
// TODO: make sure that multiple parts can only be added within the same call of
299291
// DataRelayer::relay
300-
PartRef entry{std::move(firstPart), std::move(restOfParts[0])};
301-
parts.emplace_back(std::move(entry));
302-
auto rest = restOfParts + 1;
303-
for (size_t pi = 0; pi < (restOfPartsSize - 1) / 2; ++pi) {
304-
PartRef entry{std::move(rest[pi * 2]), std::move(rest[pi * 2 + 1])};
305-
parts.emplace_back(std::move(entry));
292+
assert(nPayloads == 1);
293+
assert(nMessages % 2 == 0);
294+
for (size_t mi = 0; mi < nMessages; ++mi) {
295+
assert(mi + nPayloads < nMessages);
296+
target.add([&messages, &mi](size_t i) -> FairMQMessagePtr& { return messages[mi + i]; }, nPayloads + 1);
297+
mi += nPayloads;
306298
}
307299
};
308300

@@ -390,9 +382,10 @@ DataRelayer::RelayChoice
390382
VariableContext pristineContext;
391383
std::tie(input, timeslice) = getInputTimeslice(pristineContext);
392384

393-
auto DataHeaderInfo = [&firstPart]() {
385+
auto DataHeaderInfo = [&rawHeader]() {
394386
std::string error;
395-
const auto* dh = o2::header::get<o2::header::DataHeader*>(firstPart->GetData());
387+
// extract header from message model
388+
const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
396389
if (dh) {
397390
error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
398391
} else {
@@ -405,10 +398,8 @@ DataRelayer::RelayChoice
405398
LOG(ERROR) << "Could not match incoming data to any input route: " << DataHeaderInfo();
406399
mStats.malformedInputs++;
407400
mStats.droppedIncomingMessages++;
408-
firstPart.reset(nullptr);
409-
for (size_t pi = 0; pi < restOfPartsSize; ++pi) {
410-
auto& payload = restOfParts[pi];
411-
payload.reset(nullptr);
401+
for (size_t pi = 0; pi < nMessages; ++pi) {
402+
messages[pi].reset(nullptr);
412403
}
413404
return Invalid;
414405
}
@@ -417,10 +408,8 @@ DataRelayer::RelayChoice
417408
LOG(ERROR) << "Could not determine the timeslice for input: " << DataHeaderInfo();
418409
mStats.malformedInputs++;
419410
mStats.droppedIncomingMessages++;
420-
firstPart.reset(nullptr);
421-
for (size_t pi = 0; pi < restOfPartsSize; ++pi) {
422-
auto& payload = restOfParts[pi];
423-
payload.reset(nullptr);
411+
for (size_t pi = 0; pi < nMessages; ++pi) {
412+
messages[pi].reset(nullptr);
424413
}
425414
return Invalid;
426415
}
@@ -447,10 +436,8 @@ DataRelayer::RelayChoice
447436
LOG(WARNING) << "Incoming data is invalid, not relaying.";
448437
mStats.malformedInputs++;
449438
mStats.droppedIncomingMessages++;
450-
firstPart.reset(nullptr);
451-
for (size_t pi = 0; pi < restOfPartsSize; ++pi) {
452-
auto& payload = restOfParts[pi];
453-
payload.reset(nullptr);
439+
for (size_t pi = 0; pi < nMessages; ++pi) {
440+
messages[pi].reset(nullptr);
454441
}
455442
return Invalid;
456443
case TimesliceIndex::ActionTaken::ReplaceUnused:
@@ -518,11 +505,12 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
518505
continue;
519506
}
520507
auto partial = getPartialRecord(li);
508+
// TODO: get the data ref from message model
521509
auto getter = [&partial](size_t idx, size_t part) {
522-
if (partial[idx].size() > 0 && partial[idx].at(part).header && partial[idx].at(part).payload) {
510+
if (partial[idx].size() > 0 && partial[idx].header(part) && partial[idx].payload(part)) {
523511
return DataRef{nullptr,
524-
reinterpret_cast<const char*>(partial[idx].at(part).header->GetData()),
525-
reinterpret_cast<const char*>(partial[idx].at(part).payload->GetData())};
512+
reinterpret_cast<const char*>(partial[idx].header(part)->GetData()),
513+
reinterpret_cast<const char*>(partial[idx].payload(part)->GetData())};
526514
}
527515
return DataRef{};
528516
};
@@ -657,6 +645,9 @@ void DataRelayer::publishMetrics()
657645
std::scoped_lock<LockableBase(std::recursive_mutex)> lock(mMutex);
658646

659647
auto numInputTypes = mDistinctRoutesIndex.size();
648+
// FIXME: many of the DataRelayer functions rely on the allocated cache, so it
649+
// may be misleading to have the allocation in a function primarily for
650+
// metrics publishing, do better in setPipelineLength?
660651
mCache.resize(numInputTypes * mTimesliceIndex.size());
661652
mMetrics.send({(int)numInputTypes, "data_relayer/h", Verbosity::Debug});
662653
mMetrics.send({(int)mTimesliceIndex.size(), "data_relayer/w", Verbosity::Debug});

0 commit comments

Comments
 (0)