Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions Framework/Core/include/Framework/DataRelayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,18 @@ class DataRelayer

/// This is to relay a whole set of FairMQMessages, all of which are part
/// of the same set of split parts.
/// @a firstHeader is the first message of such set
/// @a restOfParts is a pointer to the rest of the messages
/// @a restSize is how many messages are there in restOfParts
/// is the header which is common across all subsequent elements.
/// @a rawHeader raw header pointer
/// @a messages pointer to array of messages
/// @a nMessages size of the array
/// @a nPayloads number of payloads in the message sequence, default is 1
/// which is the standard header-payload message pair, in this
/// case nMessages / 2 pairs will be inserted and considered
/// separate parts
/// Notice that we expect that the header is an O2 Header Stack
RelayChoice relay(std::unique_ptr<FairMQMessage>& firstHeader,
std::unique_ptr<FairMQMessage>* restOfParts,
size_t restSize);

/// This is used to ask for relaying a given (header,payload) pair.
/// Notice that we expect that the header is an O2 Header Stack
/// with a DataProcessingHeader inside so that we can assess time.
RelayChoice relay(std::unique_ptr<FairMQMessage>& header,
std::unique_ptr<FairMQMessage>& payload);
RelayChoice relay(void const* rawHeader,
std::unique_ptr<FairMQMessage>* messages,
size_t nMessages,
size_t nPayloads = 1);

/// @returns the actions ready to be performed.
void getReadyToProcess(std::vector<RecordAction>& completed);
Expand Down
50 changes: 30 additions & 20 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -987,12 +987,19 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
switch (input.type) {
case InputType::Data: {
auto headerIndex = input.position;
auto payloadIndex = headerIndex + 1;
assert(payloadIndex < parts.Size());
auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
auto relayed = relayer.relay(parts.At(headerIndex),
&parts.At(payloadIndex), dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 0);
ii += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
auto nMessages = 0;
auto nPayloadsPerHeader = 0;
{
// multiple header-payload pairs
auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
nPayloadsPerHeader = 1;
ii += (nMessages / 2) - 1;
}
auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
&parts.At(headerIndex),
nMessages,
nPayloadsPerHeader);
switch (relayed) {
case DataRelayer::Backpressured:
if (info.normalOpsNotified == true && info.backpressureNotified == false) {
Expand Down Expand Up @@ -1142,8 +1149,8 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
if (currentSetOfInputs[i].size() > partindex) {
return DataRef{nullptr,
static_cast<char const*>(currentSetOfInputs[i].at(partindex).header->GetData()),
static_cast<char const*>(currentSetOfInputs[i].at(partindex).payload->GetData())};
static_cast<char const*>(currentSetOfInputs[i].header(partindex)->GetData()),
static_cast<char const*>(currentSetOfInputs[i].payload(partindex)->GetData())};
}
return DataRef{nullptr, nullptr, nullptr};
};
Expand Down Expand Up @@ -1261,22 +1268,22 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
int cachedForwardingChoice = -1;

for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
auto& part = currentSetOfInputs[ii][pi];
auto& header = part.header;
auto& payload = part.payload;
auto& messageSet = currentSetOfInputs[ii];
auto const& header = messageSet.header(pi);

if (header.get() == nullptr) {
// FIXME: this should not happen, however it's actually harmless and
// we can simply discard it for the moment.
// LOG(ERROR) << "Missing header! " << dh->dataDescription;
continue;
}

auto fdph = o2::header::get<DataProcessingHeader*>(header.get()->GetData());
auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
if (fdph == nullptr) {
LOG(ERROR) << "Data is missing DataProcessingHeader";
continue;
}
auto fdh = o2::header::get<DataHeader*>(header.get()->GetData());
auto fdh = o2::header::get<DataHeader*>(header->GetData());
if (fdh == nullptr) {
LOG(ERROR) << "Data is missing DataHeader";
continue;
Expand All @@ -1303,23 +1310,26 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,

if (copy) {
auto&& newHeader = header->GetTransport()->CreateMessage();
auto&& newPayload = header->GetTransport()->CreateMessage();
newHeader->Copy(*header);
newPayload->Copy(*payload);

forwardedParts[cachedForwardingChoice].AddPart(std::move(newHeader));
forwardedParts[cachedForwardingChoice].AddPart(std::move(newPayload));

for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
auto&& newPayload = header->GetTransport()->CreateMessage();
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
forwardedParts[cachedForwardingChoice].AddPart(std::move(newPayload));
}
} else {
forwardedParts[cachedForwardingChoice].AddPart(std::move(header));
forwardedParts[cachedForwardingChoice].AddPart(std::move(payload));
forwardedParts[cachedForwardingChoice].AddPart(std::move(messageSet.header(pi)));
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
forwardedParts[cachedForwardingChoice].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
}
}
}
}
for (size_t fi = 0; fi < spec->forwards.size(); fi++) {
if (forwardedParts[fi].Size() == 0) {
continue;
}
assert(forwardedParts[fi].Size() % 2 == 0);
// in DPL we are using subchannel 0 only
device->Send(forwardedParts[fi], spec->forwards[fi].channel, 0);
}
Expand Down
93 changes: 42 additions & 51 deletions Framework/Core/src/DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
// We check that no data is already there for the given cell
// it is enough to check the first element
auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
if (part.size() > 0 && part[0].header != nullptr) {
if (part.size() > 0 && part.header(0) != nullptr) {
continue;
}
if (part.size() > 0 && part[0].payload != nullptr) {
if (part.size() > 0 && part.payload(0) != nullptr) {
continue;
}
// We check that the cell can actually be expired.
Expand All @@ -145,16 +145,14 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira

assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
assert(expirator.handler);
// expired, so we create one entry
if (part.size() == 0) {
part.parts.resize(1);
}
expirator.handler(services, part[0], variables);
PartRef newRef;
expirator.handler(services, newRef, variables);
part.reset(std::move(newRef));
activity.expiredSlots++;

mTimesliceIndex.markAsDirty(slot, true);
assert(part[0].header != nullptr);
assert(part[0].payload != nullptr);
assert(part.header(0) != nullptr);
assert(part.payload(0) != nullptr);
}
}
return activity;
Expand All @@ -163,7 +161,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
/// This does the mapping between a route and an InputSpec. The
/// reason why these might differ is that when you have time pipelining
/// you have one route per timeslice, even if the type is the same.
size_t matchToContext(void* data,
size_t matchToContext(void const* data,
std::vector<DataDescriptorMatcher> const& matchers,
std::vector<size_t> const& index,
VariableContext& context)
Expand Down Expand Up @@ -203,19 +201,13 @@ void sendVariableContextMetrics(VariableContext& context, TimesliceSlot slot,
}

DataRelayer::RelayChoice
DataRelayer::relay(std::unique_ptr<FairMQMessage>& header,
std::unique_ptr<FairMQMessage>& payload)
{
return relay(header, &payload, 1);
}

DataRelayer::RelayChoice
DataRelayer::relay(std::unique_ptr<FairMQMessage>& firstPart,
std::unique_ptr<FairMQMessage>* restOfParts,
size_t restOfPartsSize)
DataRelayer::relay(void const* rawHeader,
std::unique_ptr<FairMQMessage>* messages,
size_t nMessages,
size_t nPayloads)
{
std::scoped_lock<LockableBase(std::recursive_mutex)> lock(mMutex);
DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(firstPart->GetData());
DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(rawHeader);
// STATE HOLDING VARIABLES
// This is the class level state of the relaying. If we start supporting
// multithreading this will have to be made thread safe before we can invoke
Expand All @@ -238,12 +230,12 @@ DataRelayer::RelayChoice
// become more complicated when we will start supporting ranges.
auto getInputTimeslice = [&matchers = mInputMatchers,
&distinctRoutes = mDistinctRoutesIndex,
&firstPart,
&rawHeader,
&index](VariableContext& context)
-> std::tuple<int, TimesliceId> {
/// FIXME: for the moment we only use the first context and reset
/// between one invocation and the other.
auto input = matchToContext(firstPart->GetData(), matchers, distinctRoutes, context);
auto input = matchToContext(rawHeader, matchers, distinctRoutes, context);

if (input == INVALID_INPUT) {
return {
Expand Down Expand Up @@ -285,24 +277,24 @@ DataRelayer::RelayChoice
};

// Actually save the header / payload in the slot
auto saveInSlot = [&firstPart,
&cachedStateMetrics = mCachedStateMetrics,
&restOfParts,
&restOfPartsSize,
auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
&messages,
&nMessages,
&nPayloads,
&cache,
&numInputTypes,
&metrics](TimesliceId timeslice, int input, TimesliceSlot slot) {
auto cacheIdx = numInputTypes * slot.index + input;
std::vector<PartRef>& parts = cache[cacheIdx].parts;
MessageSet& target = cache[cacheIdx];
cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING;
// TODO: make sure that multiple parts can only be added within the same call of
// DataRelayer::relay
PartRef entry{std::move(firstPart), std::move(restOfParts[0])};
parts.emplace_back(std::move(entry));
auto rest = restOfParts + 1;
for (size_t pi = 0; pi < (restOfPartsSize - 1) / 2; ++pi) {
PartRef entry{std::move(rest[pi * 2]), std::move(rest[pi * 2 + 1])};
parts.emplace_back(std::move(entry));
assert(nPayloads == 1);
assert(nMessages % 2 == 0);
for (size_t mi = 0; mi < nMessages; ++mi) {
assert(mi + nPayloads < nMessages);
target.add([&messages, &mi](size_t i) -> FairMQMessagePtr& { return messages[mi + i]; }, nPayloads + 1);
mi += nPayloads;
}
};

Expand Down Expand Up @@ -390,9 +382,10 @@ DataRelayer::RelayChoice
VariableContext pristineContext;
std::tie(input, timeslice) = getInputTimeslice(pristineContext);

auto DataHeaderInfo = [&firstPart]() {
auto DataHeaderInfo = [&rawHeader]() {
std::string error;
const auto* dh = o2::header::get<o2::header::DataHeader*>(firstPart->GetData());
// extract header from message model
const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
if (dh) {
error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
} else {
Expand All @@ -405,10 +398,8 @@ DataRelayer::RelayChoice
LOG(ERROR) << "Could not match incoming data to any input route: " << DataHeaderInfo();
mStats.malformedInputs++;
mStats.droppedIncomingMessages++;
firstPart.reset(nullptr);
for (size_t pi = 0; pi < restOfPartsSize; ++pi) {
auto& payload = restOfParts[pi];
payload.reset(nullptr);
for (size_t pi = 0; pi < nMessages; ++pi) {
messages[pi].reset(nullptr);
}
return Invalid;
}
Expand All @@ -417,10 +408,8 @@ DataRelayer::RelayChoice
LOG(ERROR) << "Could not determine the timeslice for input: " << DataHeaderInfo();
mStats.malformedInputs++;
mStats.droppedIncomingMessages++;
firstPart.reset(nullptr);
for (size_t pi = 0; pi < restOfPartsSize; ++pi) {
auto& payload = restOfParts[pi];
payload.reset(nullptr);
for (size_t pi = 0; pi < nMessages; ++pi) {
messages[pi].reset(nullptr);
}
return Invalid;
}
Expand All @@ -447,10 +436,8 @@ DataRelayer::RelayChoice
LOG(WARNING) << "Incoming data is invalid, not relaying.";
mStats.malformedInputs++;
mStats.droppedIncomingMessages++;
firstPart.reset(nullptr);
for (size_t pi = 0; pi < restOfPartsSize; ++pi) {
auto& payload = restOfParts[pi];
payload.reset(nullptr);
for (size_t pi = 0; pi < nMessages; ++pi) {
messages[pi].reset(nullptr);
}
return Invalid;
case TimesliceIndex::ActionTaken::ReplaceUnused:
Expand Down Expand Up @@ -518,11 +505,12 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
continue;
}
auto partial = getPartialRecord(li);
// TODO: get the data ref from message model
auto getter = [&partial](size_t idx, size_t part) {
if (partial[idx].size() > 0 && partial[idx].at(part).header && partial[idx].at(part).payload) {
if (partial[idx].size() > 0 && partial[idx].header(part) && partial[idx].payload(part)) {
return DataRef{nullptr,
reinterpret_cast<const char*>(partial[idx].at(part).header->GetData()),
reinterpret_cast<const char*>(partial[idx].at(part).payload->GetData())};
reinterpret_cast<const char*>(partial[idx].header(part)->GetData()),
reinterpret_cast<const char*>(partial[idx].payload(part)->GetData())};
}
return DataRef{};
};
Expand Down Expand Up @@ -657,6 +645,9 @@ void DataRelayer::publishMetrics()
std::scoped_lock<LockableBase(std::recursive_mutex)> lock(mMutex);

auto numInputTypes = mDistinctRoutesIndex.size();
// FIXME: many of the DataRelayer functions rely on the allocated cache, so
// it may be misleading to have the allocation in a function primarily for
// metrics publishing; would it be better done in setPipelineLength?
mCache.resize(numInputTypes * mTimesliceIndex.size());
mMetrics.send({(int)numInputTypes, "data_relayer/h", Verbosity::Debug});
mMetrics.send({(int)mTimesliceIndex.size(), "data_relayer/w", Verbosity::Debug});
Expand Down
Loading