-
Notifications
You must be signed in to change notification settings - Fork 510
DPL raw proxy: Supporting message sequences/pruned headers #7914
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -184,9 +184,10 @@ InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, | |
| }; | ||
| } | ||
|
|
||
| InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, bool throwOnUnmatchedInputs) | ||
| InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPLModelAdapterConfig config) | ||
| { | ||
| // structure to hald information on the unmatch ed data and print a warning at cleanup | ||
| bool throwOnUnmatchedInputs = config.throwOnUnmatchedInputs; | ||
| // structure to hold information on the unmatched data and print a warning at cleanup | ||
| class DroppedDataSpecs | ||
| { | ||
| public: | ||
|
|
@@ -209,7 +210,8 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, boo | |
| void warning() const | ||
| { | ||
| if (not descriptions.empty()) { | ||
| LOG(warning) << "Some input data are not matched by filter rules " << descriptions << "\n" | ||
| LOG(warning) << "Some input data could not be matched by filter rules to output specs\n" | ||
| << "Active rules: " << descriptions << "\n" | ||
| << "DROPPING OF THESE MESSAGES HAS BEEN ENABLED BY CONFIGURATION"; | ||
| } | ||
| } | ||
|
|
@@ -221,11 +223,9 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, boo | |
| return [filterSpecs = std::move(filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared<DroppedDataSpecs>()](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) { | ||
| std::unordered_map<std::string, FairMQParts> outputs; | ||
| std::vector<std::string> unmatchedDescriptions; | ||
| int lastSplitPartIndex = -1; | ||
| std::string channelNameForSplitParts; | ||
| static int64_t dplCounter = -1; | ||
| dplCounter++; | ||
| for (size_t msgidx = 0; msgidx < parts.Size() / 2; ++msgidx) { | ||
| for (size_t msgidx = 0; msgidx < parts.Size(); msgidx += 2) { | ||
| const auto dh = o2::header::get<DataHeader*>(parts.At(msgidx * 2)->GetData()); | ||
| if (!dh) { | ||
| LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing"; | ||
|
|
@@ -241,49 +241,63 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, boo | |
|
|
||
| OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification}; | ||
| LOG(debug) << "processing " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " time slice " << dph->startTime << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts; | ||
| bool indexDone = false; | ||
| size_t finalBlockIndex = 0; | ||
| std::string channelName = ""; | ||
|
|
||
| for (auto const& spec : filterSpecs) { | ||
| // filter on the specified OutputSpecs, the default value is a ConcreteDataTypeMatcher with origin and description 'any' | ||
| if (DataSpecUtils::match(spec, OutputSpec{{header::gDataOriginAny, header::gDataDescriptionAny}}) || | ||
| DataSpecUtils::match(spec, query)) { | ||
| auto channelName = channelRetriever(query, dph->startTime); | ||
| channelName = channelRetriever(query, dph->startTime); | ||
| if (channelName.empty()) { | ||
| LOG(warning) << "can not find matching channel, not able to adopt " << DataSpecUtils::describe(query); | ||
| break; | ||
| } | ||
| // the checks for consistency of split payload parts are of informative nature | ||
| // forwarding happens independently | ||
| if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limits<decltype(dh->splitPayloadParts)>::max()) { | ||
| if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) { | ||
| LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)"; | ||
| } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) { | ||
| LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex | ||
| << " of " << dh->splitPayloadParts; | ||
| } else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) { | ||
| LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex | ||
| << ", matching " << channelName << ", expecting " << channelNameForSplitParts; | ||
| } | ||
| lastSplitPartIndex = dh->splitPayloadIndex; | ||
| channelNameForSplitParts = channelName; | ||
| if (lastSplitPartIndex + 1 == dh->splitPayloadParts) { | ||
| lastSplitPartIndex = -1; | ||
| channelNameForSplitParts = ""; | ||
| } | ||
| } else if (lastSplitPartIndex != -1) { | ||
| LOG(warning) << "found incomplete or unordered split parts, expecting part " << lastSplitPartIndex + 1 | ||
| << " but got a new data block"; | ||
| } | ||
| outputs[channelName].AddPart(std::move(parts.At(msgidx * 2))); | ||
| outputs[channelName].AddPart(std::move(parts.At(msgidx * 2 + 1))); | ||
| LOG(debug) << "associating part with index " << msgidx << " to channel " << channelName << " (" << outputs[channelName].Size() << ")"; | ||
| indexDone = true; | ||
| break; | ||
| } | ||
| } | ||
| if (indexDone == false && !DataSpecUtils::match(query, "DPL", "EOS", 0)) { | ||
| if (!channelName.empty()) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would invert the logic and have a
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah, yeah, can revise the logic. Actually I did not want to continue in the loop because the unmatched specs are logged. But I will combine this, there is room for cleanup in the logic. |
||
| if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) { | ||
| // this is indicating a sequence of payloads following the header | ||
| // FIXME: we will probably also set the DataHeader version | ||
| finalBlockIndex = msgidx + dh->splitPayloadParts + 1; | ||
| } else { | ||
| // We can consider the next splitPayloadParts as one block of messages pairs | ||
| // because we are guaranteed they are all the same. | ||
| // If splitPayloadParts = 0, we assume that means there is only one (header, payload) | ||
| // pair. | ||
| finalBlockIndex = msgidx + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2; | ||
| } | ||
| assert(finalBlockIndex >= msgidx + 2); | ||
| if (finalBlockIndex > parts.Size()) { | ||
| // TODO error handling | ||
| //LOGP(error, "DataHeader::splitPayloadParts invalid"); | ||
| continue; | ||
| } | ||
|
|
||
| // the checks for consistency of split payload parts are of informative nature | ||
| // forwarding happens independently | ||
| //if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limits<decltype(dh->splitPayloadParts)>::max()) { | ||
| // if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) { | ||
| // LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)"; | ||
| // } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) { | ||
| // LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex | ||
| // << " of " << dh->splitPayloadParts; | ||
| // } else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) { | ||
| // LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex | ||
| // << ", matching " << channelName << ", expecting " << channelNameForSplitParts; | ||
| // } | ||
| //} | ||
| LOGP(debug, "associating {} part(s) at index {} to channel {} ({})", finalBlockIndex - msgidx, msgidx, channelName, outputs[channelName].Size()); | ||
| for (; msgidx < finalBlockIndex; ++msgidx) { | ||
| outputs[channelName].AddPart(std::move(parts.At(msgidx))); | ||
| } | ||
| msgidx -= 2; | ||
| } | ||
| if (finalBlockIndex == 0 && !DataSpecUtils::match(query, "DPL", "EOS", 0)) { | ||
| unmatchedDescriptions.emplace_back(DataSpecUtils::describe(query)); | ||
| } | ||
| } | ||
| } // end of loop over parts | ||
|
|
||
| for (auto& [channelName, channelParts] : outputs) { | ||
| if (channelParts.Size() == 0) { | ||
| continue; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.