Skip to content

Commit b3cc676

Browse files
DPL: Extending initial parsing of input messages
This is in preparation for support of split payloads with pruned headers
1 parent a5dbb75 commit b3cc676

1 file changed

Lines changed: 56 additions & 32 deletions

File tree

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 56 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -886,63 +886,86 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
886886
SourceInfo
887887
};
888888

889+
struct InputInfo {
890+
InputInfo(size_t p, size_t s, InputType t)
891+
: position(p), size(s), type(t)
892+
{
893+
}
894+
size_t position;
895+
size_t size;
896+
InputType type;
897+
};
898+
889899
// This is how we validate inputs. I.e. we try to enforce the O2 Data model
890900
// and we do a few stats. We bind parts as a lambda captured variable, rather
891901
// than an input, because we do not want the outer loop actually be exposed
892902
// to the implementation details of the messaging layer.
893903
auto getInputTypes = [&stats = context.registry->get<DataProcessingStats>(),
894-
&info, &context]() -> std::optional<std::vector<InputType>> {
904+
&info, &context]() -> std::optional<std::vector<InputInfo>> {
895905
auto& parts = info.parts;
896906
stats.inputParts = parts.Size();
897907

898908
TracyPlot("messages received", (int64_t)parts.Size());
899-
if (parts.Size() % 2) {
900-
return std::nullopt;
901-
}
902-
std::vector<InputType> results(parts.Size() / 2, InputType::Invalid);
909+
std::vector<InputInfo> results;
910+
// we can reserve the upper limit
911+
results.reserve(parts.Size() / 2);
912+
size_t nTotalPayloads = 0;
913+
914+
auto insertInputInfo = [&results, &nTotalPayloads](size_t position, size_t length, InputType type) {
915+
results.emplace_back(position, length, type);
916+
if (type != InputType::Invalid && length > 1) {
917+
nTotalPayloads += length - 1;
918+
}
919+
};
903920

904-
for (size_t hi = 0; hi < parts.Size() / 2; ++hi) {
905-
auto pi = hi * 2;
921+
for (size_t pi = 0; pi < parts.Size(); pi += 2) {
906922
auto sih = o2::header::get<SourceInfoHeader*>(parts.At(pi)->GetData());
907923
if (sih) {
908924
info.state = sih->state;
909-
results[hi] = InputType::SourceInfo;
925+
insertInputInfo(pi, 2, InputType::SourceInfo);
910926
*context.wasActive = true;
911927
continue;
912928
}
913929
auto dh = o2::header::get<DataHeader*>(parts.At(pi)->GetData());
914930
if (!dh) {
915-
results[hi] = InputType::Invalid;
931+
insertInputInfo(pi, 0, InputType::Invalid);
916932
LOGP(error, "Header is not a DataHeader?");
917933
continue;
918934
}
919935
if (dh->payloadSize != parts.At(pi + 1)->GetSize()) {
920-
results[hi] = InputType::Invalid;
936+
insertInputInfo(pi, 0, InputType::Invalid);
921937
LOGP(error, "DataHeader payloadSize mismatch");
922938
continue;
923939
}
924940
TracyPlot("payload size", (int64_t)dh->payloadSize);
925941
auto dph = o2::header::get<DataProcessingHeader*>(parts.At(pi)->GetData());
926942
TracyAlloc(parts.At(pi + 1)->GetData(), parts.At(pi + 1)->GetSize());
927943
if (!dph) {
928-
results[hi] = InputType::Invalid;
944+
insertInputInfo(pi, 2, InputType::Invalid);
929945
LOGP(error, "Header stack does not contain DataProcessingHeader");
930946
continue;
931947
}
932-
// We can set the type for the next splitPayloadParts
933-
// because we are guaranteed they are all the same.
934-
// If splitPayloadParts = 0, we assume that means there is only one (header, payload)
935-
// pair.
936-
size_t finalSplitPayloadIndex = hi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1);
937-
if (finalSplitPayloadIndex > results.size()) {
938-
LOGP(error, "DataHeader::splitPayloadParts invalid");
939-
results[hi] = InputType::Invalid;
940-
continue;
941-
}
942-
for (; hi < finalSplitPayloadIndex; ++hi) {
943-
results[hi] = InputType::Data;
948+
{
949+
// We can set the type for the next splitPayloadParts
950+
// because we are guaranteed they are all the same.
951+
// If splitPayloadParts = 0, we assume that means there is only one (header, payload)
952+
// pair.
953+
size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
954+
if (finalSplitPayloadIndex > parts.Size()) {
955+
LOGP(error, "DataHeader::splitPayloadParts invalid");
956+
insertInputInfo(pi, 0, InputType::Invalid);
957+
continue;
958+
}
959+
insertInputInfo(pi, 2, InputType::Data);
960+
for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
961+
insertInputInfo(pi + 2, 2, InputType::Data);
962+
}
944963
}
945-
hi = finalSplitPayloadIndex - 1;
964+
}
965+
assert(std::accumulate(results.begin(), results.end(), 0, [](size_t const& count, auto const& element) -> size_t { return count + element.size; }));
966+
if (results.size() + nTotalPayloads != parts.Size()) {
967+
LOG(ERROR) << "inconsistent number of inputs extracted";
968+
return std::nullopt;
946969
}
947970
return results;
948971
};
@@ -951,21 +974,22 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
951974
registry.get<DataProcessingStats>().errorCount++;
952975
};
953976

954-
auto handleValidMessages = [&info, &context = context, &relayer = *context.relayer, &reportError](std::vector<InputType> const& types) {
977+
auto handleValidMessages = [&info, &context = context, &relayer = *context.relayer, &reportError](std::vector<InputInfo> const& inputInfos) {
955978
static WaitBackpressurePolicy policy;
956979
auto& parts = info.parts;
957980
// We relay execution to make sure we have a complete set of parts
958981
// available.
959-
for (size_t pi = 0; pi < (parts.Size() / 2); ++pi) {
960-
switch (types[pi]) {
982+
for (auto ii = 0; ii < inputInfos.size(); ++ii) {
983+
auto const& input = inputInfos[ii];
984+
switch (input.type) {
961985
case InputType::Data: {
962-
auto headerIndex = 2 * pi;
963-
auto payloadIndex = 2 * pi + 1;
986+
auto headerIndex = input.position;
987+
auto payloadIndex = headerIndex + 1;
964988
assert(payloadIndex < parts.Size());
965989
auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
966990
auto relayed = relayer.relay(parts.At(headerIndex),
967991
&parts.At(payloadIndex), dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 0);
968-
pi += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
992+
ii += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
969993
switch (relayed) {
970994
case DataRelayer::Backpressured:
971995
if (info.normalOpsNotified == true && info.backpressureNotified == false) {
@@ -988,8 +1012,8 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
9881012
} break;
9891013
case InputType::SourceInfo: {
9901014
*context.wasActive = true;
991-
auto headerIndex = 2 * pi;
992-
auto payloadIndex = 2 * pi + 1;
1015+
auto headerIndex = input.position;
1016+
auto payloadIndex = input.position + 1;
9931017
assert(payloadIndex < parts.Size());
9941018
auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
9951019
// FIXME: the message with the end of stream cannot contain

0 commit comments

Comments
 (0)