Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 1 addition & 18 deletions Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,12 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, uint64_t startTime
/// multipart ensemble.
InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, uint64_t step);

/// @struct DPLModelAdapterConfig
/// Configuration object for dplModelAdaptor
struct DPLModelAdapterConfig {
/// throw runtime error if an input message is not matched by filter rules
bool throwOnUnmatchedInputs = true;
/// do all kinds of consistency checks
bool paranoid = false;
/// blindly forward on one channel
bool blindForward = false;
};

/// This is to be used when the input data is already formatted like DPL
/// expects it, i.e. with the DataProcessingHeader in the header stack
/// The list of specs is used as a filter list, all incoming data matching an entry
/// in the list will be send through the corresponding channel
InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& specs = {{header::gDataOriginAny, header::gDataDescriptionAny}},
DPLModelAdapterConfig config = DPLModelAdapterConfig{});

/// legacy function
inline InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& specs, bool throwOnUnmatchedInputs)
{
return dplModelAdaptor(specs, DPLModelAdapterConfig{throwOnUnmatchedInputs});
}
bool throwOnUnmatchedInputs = true);

/// The default connection method for the custom source
static auto gDefaultConverter = incrementalConverter(OutputSpec{"TST", "TEST", 0}, 0, 1);
Expand Down
88 changes: 37 additions & 51 deletions Framework/Core/src/ExternalFairMQDeviceProxy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,9 @@ InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime,
};
}

InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPLModelAdapterConfig config)
InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, bool throwOnUnmatchedInputs)
{
bool throwOnUnmatchedInputs = config.throwOnUnmatchedInputs;
// structure to hold information on the unmatched data and print a warning at cleanup
// structure to hald information on the unmatch ed data and print a warning at cleanup
class DroppedDataSpecs
{
public:
Expand All @@ -210,8 +209,7 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPL
void warning() const
{
if (not descriptions.empty()) {
LOG(warning) << "Some input data could not be matched by filter rules to output specs\n"
<< "Active rules: " << descriptions << "\n"
LOG(warning) << "Some input data are not matched by filter rules " << descriptions << "\n"
<< "DROPPING OF THESE MESSAGES HAS BEEN ENABLED BY CONFIGURATION";
}
}
Expand All @@ -223,9 +221,11 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPL
return [filterSpecs = std::move(filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared<DroppedDataSpecs>()](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
std::unordered_map<std::string, FairMQParts> outputs;
std::vector<std::string> unmatchedDescriptions;
int lastSplitPartIndex = -1;
std::string channelNameForSplitParts;
static int64_t dplCounter = -1;
dplCounter++;
for (size_t msgidx = 0; msgidx < parts.Size(); msgidx += 2) {
for (size_t msgidx = 0; msgidx < parts.Size() / 2; ++msgidx) {
const auto dh = o2::header::get<DataHeader*>(parts.At(msgidx * 2)->GetData());
if (!dh) {
LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing";
Expand All @@ -241,63 +241,49 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPL

OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification};
LOG(debug) << "processing " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " time slice " << dph->startTime << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts;
size_t finalBlockIndex = 0;
std::string channelName = "";

bool indexDone = false;
for (auto const& spec : filterSpecs) {
// filter on the specified OutputSpecs, the default value is a ConcreteDataTypeMatcher with origin and description 'any'
if (DataSpecUtils::match(spec, OutputSpec{{header::gDataOriginAny, header::gDataDescriptionAny}}) ||
DataSpecUtils::match(spec, query)) {
channelName = channelRetriever(query, dph->startTime);
auto channelName = channelRetriever(query, dph->startTime);
if (channelName.empty()) {
LOG(warning) << "can not find matching channel, not able to adopt " << DataSpecUtils::describe(query);
break;
}
// the checks for consistency of split payload parts are of informative nature
// forwarding happens independently
if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limits<decltype(dh->splitPayloadParts)>::max()) {
if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) {
LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)";
} else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) {
LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex
<< " of " << dh->splitPayloadParts;
} else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) {
LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex
<< ", matching " << channelName << ", expecting " << channelNameForSplitParts;
}
lastSplitPartIndex = dh->splitPayloadIndex;
channelNameForSplitParts = channelName;
if (lastSplitPartIndex + 1 == dh->splitPayloadParts) {
lastSplitPartIndex = -1;
channelNameForSplitParts = "";
}
} else if (lastSplitPartIndex != -1) {
LOG(warning) << "found incomplete or unordered split parts, expecting part " << lastSplitPartIndex + 1
<< " but got a new data block";
}
outputs[channelName].AddPart(std::move(parts.At(msgidx * 2)));
outputs[channelName].AddPart(std::move(parts.At(msgidx * 2 + 1)));
LOG(debug) << "associating part with index " << msgidx << " to channel " << channelName << " (" << outputs[channelName].Size() << ")";
indexDone = true;
break;
}
}
if (!channelName.empty()) {
if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
// this is indicating a sequence of payloads following the header
// FIXME: we will probably also set the DataHeader version
finalBlockIndex = msgidx + dh->splitPayloadParts + 1;
} else {
// We can consider the next splitPayloadParts as one block of messages pairs
// because we are guaranteed they are all the same.
// If splitPayloadParts = 0, we assume that means there is only one (header, payload)
// pair.
finalBlockIndex = msgidx + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
}
assert(finalBlockIndex >= msgidx + 2);
if (finalBlockIndex > parts.Size()) {
// TODO error handling
//LOGP(error, "DataHeader::splitPayloadParts invalid");
continue;
}

// the checks for consistency of split payload parts are of informative nature
// forwarding happens independently
//if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limits<decltype(dh->splitPayloadParts)>::max()) {
// if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) {
// LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)";
// } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) {
// LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex
// << " of " << dh->splitPayloadParts;
// } else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) {
// LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex
// << ", matching " << channelName << ", expecting " << channelNameForSplitParts;
// }
//}
LOGP(debug, "associating {} part(s) at index {} to channel {} ({})", finalBlockIndex - msgidx, msgidx, channelName, outputs[channelName].Size());
for (; msgidx < finalBlockIndex; ++msgidx) {
outputs[channelName].AddPart(std::move(parts.At(msgidx)));
}
msgidx -= 2;
}
if (finalBlockIndex == 0 && !DataSpecUtils::match(query, "DPL", "EOS", 0)) {
if (indexDone == false && !DataSpecUtils::match(query, "DPL", "EOS", 0)) {
unmatchedDescriptions.emplace_back(DataSpecUtils::describe(query));
}
} // end of loop over parts

}
for (auto& [channelName, channelParts] : outputs) {
if (channelParts.Size() == 0) {
continue;
Expand Down
Loading