Skip to content

Commit a15d92f

Browse files
DPL: Extending initial parsing of input messages
This is in preparation for support of split payloads with pruned headers
1 parent 0b1ec43 commit a15d92f

1 file changed

Lines changed: 56 additions & 32 deletions

File tree

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 56 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -889,63 +889,86 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
889889
SourceInfo
890890
};
891891

892+
struct InputInfo {
893+
InputInfo(size_t p, size_t s, InputType t)
894+
: position(p), size(s), type(t)
895+
{
896+
}
897+
size_t position;
898+
size_t size;
899+
InputType type;
900+
};
901+
892902
// This is how we validate inputs. I.e. we try to enforce the O2 Data model
893903
// and we do a few stats. We bind parts as a lambda captured variable, rather
894904
// than an input, because we do not want the outer loop actually be exposed
895905
// to the implementation details of the messaging layer.
896906
auto getInputTypes = [&stats = context.registry->get<DataProcessingStats>(),
897-
&info, &context]() -> std::optional<std::vector<InputType>> {
907+
&info, &context]() -> std::optional<std::vector<InputInfo>> {
898908
auto& parts = info.parts;
899909
stats.inputParts = parts.Size();
900910

901911
TracyPlot("messages received", (int64_t)parts.Size());
902-
if (parts.Size() % 2) {
903-
return std::nullopt;
904-
}
905-
std::vector<InputType> results(parts.Size() / 2, InputType::Invalid);
912+
std::vector<InputInfo> results;
913+
// we can reserve the upper limit
914+
results.reserve(parts.Size() / 2);
915+
size_t nTotalPayloads = 0;
916+
917+
auto insertInputInfo = [&results, &nTotalPayloads](size_t position, size_t length, InputType type) {
918+
results.emplace_back(position, length, type);
919+
if (type != InputType::Invalid && length > 1) {
920+
nTotalPayloads += length - 1;
921+
}
922+
};
906923

907-
for (size_t hi = 0; hi < parts.Size() / 2; ++hi) {
908-
auto pi = hi * 2;
924+
for (size_t pi = 0; pi < parts.Size(); pi += 2) {
909925
auto sih = o2::header::get<SourceInfoHeader*>(parts.At(pi)->GetData());
910926
if (sih) {
911927
info.state = sih->state;
912-
results[hi] = InputType::SourceInfo;
928+
insertInputInfo(pi, 2, InputType::SourceInfo);
913929
*context.wasActive = true;
914930
continue;
915931
}
916932
auto dh = o2::header::get<DataHeader*>(parts.At(pi)->GetData());
917933
if (!dh) {
918-
results[hi] = InputType::Invalid;
934+
insertInputInfo(pi, 0, InputType::Invalid);
919935
LOGP(error, "Header is not a DataHeader?");
920936
continue;
921937
}
922938
if (dh->payloadSize != parts.At(pi + 1)->GetSize()) {
923-
results[hi] = InputType::Invalid;
939+
insertInputInfo(pi, 0, InputType::Invalid);
924940
LOGP(error, "DataHeader payloadSize mismatch");
925941
continue;
926942
}
927943
TracyPlot("payload size", (int64_t)dh->payloadSize);
928944
auto dph = o2::header::get<DataProcessingHeader*>(parts.At(pi)->GetData());
929945
TracyAlloc(parts.At(pi + 1)->GetData(), parts.At(pi + 1)->GetSize());
930946
if (!dph) {
931-
results[hi] = InputType::Invalid;
947+
insertInputInfo(pi, 2, InputType::Invalid);
932948
LOGP(error, "Header stack does not contain DataProcessingHeader");
933949
continue;
934950
}
935-
// We can set the type for the next splitPayloadParts
936-
// because we are guaranteed they are all the same.
937-
// If splitPayloadParts = 0, we assume that means there is only one (header, payload)
938-
// pair.
939-
size_t finalSplitPayloadIndex = hi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1);
940-
if (finalSplitPayloadIndex > results.size()) {
941-
LOGP(error, "DataHeader::splitPayloadParts invalid");
942-
results[hi] = InputType::Invalid;
943-
continue;
944-
}
945-
for (; hi < finalSplitPayloadIndex; ++hi) {
946-
results[hi] = InputType::Data;
951+
{
952+
// We can set the type for the next splitPayloadParts
953+
// because we are guaranteed they are all the same.
954+
// If splitPayloadParts = 0, we assume that means there is only one (header, payload)
955+
// pair.
956+
size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
957+
if (finalSplitPayloadIndex > parts.Size()) {
958+
LOGP(error, "DataHeader::splitPayloadParts invalid");
959+
insertInputInfo(pi, 0, InputType::Invalid);
960+
continue;
961+
}
962+
insertInputInfo(pi, 2, InputType::Data);
963+
for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
964+
insertInputInfo(pi + 2, 2, InputType::Data);
965+
}
947966
}
948-
hi = finalSplitPayloadIndex - 1;
967+
}
968+
assert(std::accumulate(results.begin(), results.end(), 0, [](size_t const& count, auto const& element) -> size_t { return count + element.size; }));
969+
if (results.size() + nTotalPayloads != parts.Size()) {
970+
LOG(ERROR) << "inconsistent number of inputs extracted";
971+
return std::nullopt;
949972
}
950973
return results;
951974
};
@@ -954,21 +977,22 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
954977
registry.get<DataProcessingStats>().errorCount++;
955978
};
956979

957-
auto handleValidMessages = [&info, &context = context, &relayer = *context.relayer, &reportError](std::vector<InputType> const& types) {
980+
auto handleValidMessages = [&info, &context = context, &relayer = *context.relayer, &reportError](std::vector<InputInfo> const& inputInfos) {
958981
static WaitBackpressurePolicy policy;
959982
auto& parts = info.parts;
960983
// We relay execution to make sure we have a complete set of parts
961984
// available.
962-
for (size_t pi = 0; pi < (parts.Size() / 2); ++pi) {
963-
switch (types[pi]) {
985+
for (auto ii = 0; ii < inputInfos.size(); ++ii) {
986+
auto const& input = inputInfos[ii];
987+
switch (input.type) {
964988
case InputType::Data: {
965-
auto headerIndex = 2 * pi;
966-
auto payloadIndex = 2 * pi + 1;
989+
auto headerIndex = input.position;
990+
auto payloadIndex = headerIndex + 1;
967991
assert(payloadIndex < parts.Size());
968992
auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
969993
auto relayed = relayer.relay(parts.At(headerIndex),
970994
&parts.At(payloadIndex), dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 0);
971-
pi += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
995+
ii += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
972996
switch (relayed) {
973997
case DataRelayer::Backpressured:
974998
if (info.normalOpsNotified == true && info.backpressureNotified == false) {
@@ -991,8 +1015,8 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
9911015
} break;
9921016
case InputType::SourceInfo: {
9931017
*context.wasActive = true;
994-
auto headerIndex = 2 * pi;
995-
auto payloadIndex = 2 * pi + 1;
1018+
auto headerIndex = input.position;
1019+
auto payloadIndex = input.position + 1;
9961020
assert(payloadIndex < parts.Size());
9971021
auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
9981022
// FIXME: the message with the end of stream cannot contain

0 commit comments

Comments
 (0)