Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 68 additions & 20 deletions Framework/Core/include/Framework/MessageSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,44 @@ namespace o2
namespace framework
{

/// A set of associated inflight messages.
/// A set of inflight messages.
/// The messages are stored in a linear vector. Originally, an O2 message was
/// comprised of a header-payload pair which makes indexing of pairs in the
/// storage simple. To support O2 messages with multiple payloads in a future
/// update of the data model, a message index is needed to store position in the
/// linear storage and number of messages.
/// DPL InputRecord API is providing refs of header-payload pairs, the original
/// O2 message model. For this purpose, also the pair index is filled and can
/// be used to access header and payload associated with a pair
struct MessageSet {
struct Index {
Index(size_t p, size_t s) : position(p), size(s) {}
size_t position = 0;
size_t size = 0;
};
// linear storage of messages
std::vector<FairMQMessagePtr> messages;
std::vector<Index> index;
// message map descibes O2 messages consisting of a header message and
// payload message(s), index descibes position in the linear storage
std::vector<Index> messageMap;
// pair map describes all messages in header-payload pairs and where
// in the message index the an associated header and payload can be found
std::vector<std::pair<size_t, size_t>> pairMap;

MessageSet()
: messages(), index()
: messages(), messageMap(), pairMap()
{
}

template <typename F>
MessageSet(F getter, size_t size)
: messages(), index()
: messages(), messageMap(), pairMap()
{
add(std::forward<F>(getter), size);
}

MessageSet(MessageSet&& other)
: messages(std::move(other.messages)), index(std::move(other.index))
: messages(std::move(other.messages)), messageMap(std::move(other.messageMap)), pairMap(std::move(other.pairMap))
{
other.clear();
}
Expand All @@ -55,72 +69,106 @@ struct MessageSet {
return *this;
}
messages = std::move(other.messages);
index = std::move(other.index);
messageMap = std::move(other.messageMap);
pairMap = std::move(other.pairMap);
other.clear();
return *this;
}

/// get number of in-flight O2 messages
size_t size() const
{
return index.size();
return messageMap.size();
}

size_t getNumberOfPayloads(size_t part) const
/// get number of header-payload pairs
size_t getNumberOfPairs() const
{
return index[part].size;
return pairMap.size();
}

/// get number of payloads for an in-flight message
size_t getNumberOfPayloads(size_t mi) const
{
return messageMap[mi].size;
}

/// clear the set
void clear()
{
messages.clear();
index.clear();
messageMap.clear();
pairMap.clear();
}

// this is more or less legacy
// PartRef has been earlier used to store fixed header-payload pairs
// reset the set and store content of the part ref
void reset(PartRef&& ref)
{
clear();
add(std::move(ref));
}

// this is more or less legacy
// PartRef has been earlier used to store fixed header-payload pairs
// add content of the part ref
void add(PartRef&& ref)
{
index.emplace_back(messages.size(), 1);
pairMap.emplace_back(messageMap.size(), 0);
messageMap.emplace_back(messages.size(), 1);
messages.emplace_back(std::move(ref.header));
messages.emplace_back(std::move(ref.payload));
}

/// add an O2 message
template <typename F>
void add(F getter, size_t size)
{
index.emplace_back(messages.size(), size - 1);
auto partid = messageMap.size();
messageMap.emplace_back(messages.size(), size - 1);
for (size_t i = 0; i < size; ++i) {
if (i > 0) {
pairMap.emplace_back(partid, i - 1);
}
messages.emplace_back(std::move(getter(i)));
}
}

FairMQMessagePtr& header(size_t partIndex)
{
return messages[index[partIndex].position];
return messages[messageMap[partIndex].position];
}

FairMQMessagePtr& payload(size_t partIndex, size_t payloadIndex = 0)
{
assert(partIndex < index.size());
assert(index[partIndex].position + payloadIndex + 1 < messages.size());
return messages[index[partIndex].position + payloadIndex + 1];
assert(partIndex < messageMap.size());
assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size());
return messages[messageMap[partIndex].position + payloadIndex + 1];
}

FairMQMessagePtr const& header(size_t partIndex) const
{
return messages[index[partIndex].position];
return messages[messageMap[partIndex].position];
}

FairMQMessagePtr const& payload(size_t partIndex, size_t payloadIndex = 0) const
{
assert(partIndex < index.size());
assert(index[partIndex].position + payloadIndex + 1 < messages.size());
return messages[index[partIndex].position + payloadIndex + 1];
assert(partIndex < messageMap.size());
assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size());
return messages[messageMap[partIndex].position + payloadIndex + 1];
}

FairMQMessagePtr const& associatedHeader(size_t pos) const
{
return messages[messageMap[pairMap[pos].first].position];
}

FairMQMessagePtr const& associatedPayload(size_t pos) const
{
auto partIndex = pairMap[pos].first;
auto payloadIndex = pairMap[pos].second;
return messages[messageMap[partIndex].position + payloadIndex + 1];
}
};

Expand Down
24 changes: 16 additions & 8 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1163,18 +1163,26 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
currentSetOfInputs = relayer->consumeExistingInputsForTimeslice(slot);
}
auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
if (currentSetOfInputs[i].size() > partindex) {
auto header = currentSetOfInputs[i].header(partindex).get();
auto payload = currentSetOfInputs[i].payload(partindex).get();
return DataRef{nullptr,
static_cast<char const*>(header->GetData()),
static_cast<char const*>(payload ? payload->GetData() : nullptr),
payload ? payload->GetSize() : 0};
if (currentSetOfInputs[i].getNumberOfPairs() > partindex) {
const char* headerptr = nullptr;
const char* payloadptr = nullptr;
size_t payloadSize = 0;
// - each input can have multiple parts
// - "part" denotes a sequence of messages belonging together, the first message of the
// sequence is the header message
// - each part has one or more payload messages
// - InputRecord provides all payloads as header-payload pairs
auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
headerptr = static_cast<char const*>(headerMsg->GetData());
payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
return DataRef{nullptr, headerptr, payloadptr, payloadSize};
}
return DataRef{};
};
auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
return currentSetOfInputs[i].size();
return currentSetOfInputs[i].getNumberOfPairs();
};
return InputSpan{getter, nofPartsGetter, currentSetOfInputs.size()};
};
Expand Down