From bd8b5c709eef0793168a06eea1f9bfca55157ceb Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 4 Nov 2021 08:36:07 +0100 Subject: [PATCH] DPL: using linear message store for the DataRelayer cache --- Framework/Core/include/Framework/MessageSet.h | 96 ++++++------------- Framework/Core/src/DataRelayer.cxx | 2 +- 2 files changed, 32 insertions(+), 66 deletions(-) diff --git a/Framework/Core/include/Framework/MessageSet.h b/Framework/Core/include/Framework/MessageSet.h index 7093486c6c888..c41bb46bfaefb 100644 --- a/Framework/Core/include/Framework/MessageSet.h +++ b/Framework/Core/include/Framework/MessageSet.h @@ -23,22 +23,28 @@ namespace framework /// A set of associated inflight messages. struct MessageSet { - std::vector parts; + struct Index { + Index(size_t p, size_t s) : position(p), size(s) {} + size_t position = 0; + size_t size = 0; + }; + std::vector messages; + std::vector index; MessageSet() - : parts() + : messages(), index() { } template - MessageSet(F&& getter, size_t size) - : parts() + MessageSet(F getter, size_t size) + : messages(), index() { add(std::forward(getter), size); } MessageSet(MessageSet&& other) - : parts(std::move(other.parts)) + : messages(std::move(other.messages)), index(std::move(other.index)) { other.clear(); } @@ -48,25 +54,26 @@ struct MessageSet { if (&other == this) { return *this; } - parts = std::move(other.parts); + messages = std::move(other.messages); + index = std::move(other.index); other.clear(); return *this; } size_t size() const { - return parts.size(); + return index.size(); } size_t getNumberOfPayloads(size_t part) const { - // this is for upcoming change of message store - return 1; + return index[part].size; } void clear() { - parts.clear(); + messages.clear(); + index.clear(); } // this is more or less legacy @@ -78,83 +85,42 @@ struct MessageSet { void add(PartRef&& ref) { - parts.emplace_back(std::move(ref)); + index.emplace_back(messages.size(), 1); + messages.emplace_back(std::move(ref.header)); + messages.emplace_back(std::move(ref.payload)); } template void add(F getter, size_t size) { + index.emplace_back(messages.size(), size - 1); for (size_t i = 0; i < size; ++i) { - PartRef ref{std::move(getter(i)), std::move(getter(i + 1))}; - parts.emplace_back(std::move(ref)); - ++i; + messages.emplace_back(std::move(getter(i))); } } FairMQMessagePtr& header(size_t partIndex) { - assert(partIndex < parts.size()); - return parts[partIndex].header; + return messages[index[partIndex].position]; } FairMQMessagePtr& payload(size_t partIndex, size_t payloadIndex = 0) { - assert(partIndex < parts.size()); - // payload index will be supported in linear message store - assert(payloadIndex == 0); - return parts[partIndex].payload; + assert(partIndex < index.size()); + assert(index[partIndex].position + payloadIndex + 1 < messages.size()); + return messages[index[partIndex].position + payloadIndex + 1]; } FairMQMessagePtr const& header(size_t partIndex) const { - assert(partIndex < parts.size()); - return parts[partIndex].header; + return messages[index[partIndex].position]; } - FairMQMessagePtr const& payload(size_t partIndex) const + FairMQMessagePtr const& payload(size_t partIndex, size_t payloadIndex = 0) const { - assert(partIndex < parts.size()); - return parts[partIndex].payload; - } - - PartRef& operator[](size_t index) - { - return parts[index]; - } - - PartRef const& operator[](size_t index) const - { - return parts[index]; - } - - PartRef& at(size_t index) - { - return parts.at(index); - } - - PartRef const& at(size_t index) const - { - return parts.at(index); - } - - decltype(auto) begin() - { - return parts.begin(); - } - - decltype(auto) begin() const - { - return parts.begin(); - } - - decltype(auto) end() - { - return parts.end(); - } - - decltype(auto) end() const - { - return parts.end(); + assert(partIndex < index.size()); + assert(index[partIndex].position + payloadIndex + 1 < messages.size()); + return messages[index[partIndex].position + payloadIndex + 1]; } }; diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 086411dd5ff0c..a6be13960ee3b 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -593,7 +593,7 @@ std::vector DataRelayer::getInputsForTimeslice(Timesl // FIXME: what happens when we have enough timeslices to hit the invalid one? auto invalidateCacheFor = [&numInputTypes, &cachedStateMetrics = mCachedStateMetrics, &index, &cache](TimesliceSlot s) { for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) { - assert(std::accumulate(cache[ai].begin(), cache[ai].end(), true, [](bool result, auto const& element) { return result && element.header.get() == nullptr && element.payload.get() == nullptr; })); + assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; })); cache[ai].clear(); } index.markAsInvalid(s);