Skip to content

Commit b4ea5d0

Browse files
DPL: using linear message store for the DataRelayer cache
1 parent 3f2ac97 commit b4ea5d0

2 files changed

Lines changed: 32 additions & 66 deletions

File tree

Framework/Core/include/Framework/MessageSet.h

Lines changed: 31 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,28 @@ namespace framework
2323

2424
/// A set of associated inflight messages.
2525
struct MessageSet {
26-
std::vector<PartRef> parts;
26+
struct Index {
27+
Index(size_t p, size_t s) : position(p), size(s) {}
28+
size_t position = 0;
29+
size_t size = 0;
30+
};
31+
std::vector<FairMQMessagePtr> messages;
32+
std::vector<Index> index;
2733

2834
MessageSet()
29-
: parts()
35+
: messages(), index()
3036
{
3137
}
3238

3339
template <typename F>
34-
MessageSet(F&& getter, size_t size)
35-
: parts()
40+
MessageSet(F getter, size_t size)
41+
: messages(), index()
3642
{
3743
add(std::forward<F>(getter), size);
3844
}
3945

4046
MessageSet(MessageSet&& other)
41-
: parts(std::move(other.parts))
47+
: messages(std::move(other.messages)), index(std::move(other.index))
4248
{
4349
other.clear();
4450
}
@@ -48,25 +54,26 @@ struct MessageSet {
4854
if (&other == this) {
4955
return *this;
5056
}
51-
parts = std::move(other.parts);
57+
messages = std::move(other.messages);
58+
index = std::move(other.index);
5259
other.clear();
5360
return *this;
5461
}
5562

5663
size_t size() const
5764
{
58-
return parts.size();
65+
return index.size();
5966
}
6067

6168
size_t getNumberOfPayloads(size_t part) const
6269
{
63-
// this is for upcoming change of message store
64-
return 1;
70+
return index[part].size;
6571
}
6672

6773
void clear()
6874
{
69-
parts.clear();
75+
messages.clear();
76+
index.clear();
7077
}
7178

7279
// this is more or less legacy
@@ -78,83 +85,42 @@ struct MessageSet {
7885

7986
void add(PartRef&& ref)
8087
{
81-
parts.emplace_back(std::move(ref));
88+
index.emplace_back(messages.size(), 1);
89+
messages.emplace_back(std::move(ref.header));
90+
messages.emplace_back(std::move(ref.payload));
8291
}
8392

8493
template <typename F>
8594
void add(F getter, size_t size)
8695
{
96+
index.emplace_back(messages.size(), size - 1);
8797
for (size_t i = 0; i < size; ++i) {
88-
PartRef ref{std::move(getter(i)), std::move(getter(i + 1))};
89-
parts.emplace_back(std::move(ref));
90-
++i;
98+
messages.emplace_back(std::move(getter(i)));
9199
}
92100
}
93101

94102
FairMQMessagePtr& header(size_t partIndex)
95103
{
96-
assert(partIndex < parts.size());
97-
return parts[partIndex].header;
104+
return messages[index[partIndex].position];
98105
}
99106

100107
FairMQMessagePtr& payload(size_t partIndex, size_t payloadIndex = 0)
101108
{
102-
assert(partIndex < parts.size());
103-
// payload index will be supported in linear message store
104-
assert(payloadIndex == 0);
105-
return parts[partIndex].payload;
109+
assert(partIndex < index.size());
110+
assert(index[partIndex].position + payloadIndex + 1 < messages.size());
111+
return messages[index[partIndex].position + payloadIndex + 1];
106112
}
107113

108114
FairMQMessagePtr const& header(size_t partIndex) const
109115
{
110-
assert(partIndex < parts.size());
111-
return parts[partIndex].header;
116+
return messages[index[partIndex].position];
112117
}
113118

114-
FairMQMessagePtr const& payload(size_t partIndex) const
119+
FairMQMessagePtr const& payload(size_t partIndex, size_t payloadIndex = 0) const
115120
{
116-
assert(partIndex < parts.size());
117-
return parts[partIndex].payload;
118-
}
119-
120-
PartRef& operator[](size_t index)
121-
{
122-
return parts[index];
123-
}
124-
125-
PartRef const& operator[](size_t index) const
126-
{
127-
return parts[index];
128-
}
129-
130-
PartRef& at(size_t index)
131-
{
132-
return parts.at(index);
133-
}
134-
135-
PartRef const& at(size_t index) const
136-
{
137-
return parts.at(index);
138-
}
139-
140-
decltype(auto) begin()
141-
{
142-
return parts.begin();
143-
}
144-
145-
decltype(auto) begin() const
146-
{
147-
return parts.begin();
148-
}
149-
150-
decltype(auto) end()
151-
{
152-
return parts.end();
153-
}
154-
155-
decltype(auto) end() const
156-
{
157-
return parts.end();
121+
assert(partIndex < index.size());
122+
assert(index[partIndex].position + payloadIndex + 1 < messages.size());
123+
return messages[index[partIndex].position + payloadIndex + 1];
158124
}
159125
};
160126

Framework/Core/src/DataRelayer.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,7 @@ std::vector<o2::framework::MessageSet> DataRelayer::getInputsForTimeslice(Timesl
593593
// FIXME: what happens when we have enough timeslices to hit the invalid one?
594594
auto invalidateCacheFor = [&numInputTypes, &cachedStateMetrics = mCachedStateMetrics, &index, &cache](TimesliceSlot s) {
595595
for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
596-
assert(std::accumulate(cache[ai].begin(), cache[ai].end(), true, [](bool result, auto const& element) { return result && element.header.get() == nullptr && element.payload.get() == nullptr; }));
596+
assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; }));
597597
cache[ai].clear();
598598
}
599599
index.markAsInvalid(s);

0 commit comments

Comments
 (0)