Skip to content

Commit 2618d2e

Browse files
DPL: Adjusting the InputSpan backend
InputRecord is providing all payloads as header payload pairs, even if there are multiple payloads per header. The InputSpan backend is using a specific API of messageSet to access all payloads in header-payload pairs This is following the original O2 data model. As an extension, the API allows to access also the payloads of a sequence with multiple payloads per header in header-payload pairs.
1 parent 94ef6f6 commit 2618d2e

2 files changed

Lines changed: 84 additions & 28 deletions

File tree

Framework/Core/include/Framework/MessageSet.h

Lines changed: 68 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,30 +21,44 @@ namespace o2
2121
namespace framework
2222
{
2323

24-
/// A set of associated inflight messages.
24+
/// A set of inflight messages.
25+
/// The messages are stored in a linear vector. Originally, an O2 message was
26+
/// comprised of a header-payload pair which makes indexing of pairs in the
27+
/// storage simple. To support O2 messages with multiple payloads in a future
28+
/// update of the data model, a message index is needed to store position in the
29+
/// linear storage and number of messages.
30+
/// DPL InputRecord API is providing refs of header-payload pairs, the original
31+
/// O2 message model. For this purpose, also the pair index is filled and can
32+
/// be used to access header and payload associated with a pair
2533
struct MessageSet {
2634
struct Index {
2735
Index(size_t p, size_t s) : position(p), size(s) {}
2836
size_t position = 0;
2937
size_t size = 0;
3038
};
39+
// linear storage of messages
3140
std::vector<FairMQMessagePtr> messages;
32-
std::vector<Index> index;
41+
// message map descibes O2 messages consisting of a header message and
42+
// payload message(s), index descibes position in the linear storage
43+
std::vector<Index> messageMap;
44+
// pair map describes all messages in header-payload pairs and where
45+
// in the message index the an associated header and payload can be found
46+
std::vector<std::pair<size_t, size_t>> pairMap;
3347

3448
MessageSet()
35-
: messages(), index()
49+
: messages(), messageMap(), pairMap()
3650
{
3751
}
3852

3953
template <typename F>
4054
MessageSet(F getter, size_t size)
41-
: messages(), index()
55+
: messages(), messageMap(), pairMap()
4256
{
4357
add(std::forward<F>(getter), size);
4458
}
4559

4660
MessageSet(MessageSet&& other)
47-
: messages(std::move(other.messages)), index(std::move(other.index))
61+
: messages(std::move(other.messages)), messageMap(std::move(other.messageMap)), pairMap(std::move(other.pairMap))
4862
{
4963
other.clear();
5064
}
@@ -55,72 +69,106 @@ struct MessageSet {
5569
return *this;
5670
}
5771
messages = std::move(other.messages);
58-
index = std::move(other.index);
72+
messageMap = std::move(other.messageMap);
73+
pairMap = std::move(other.pairMap);
5974
other.clear();
6075
return *this;
6176
}
6277

78+
/// get number of in-flight O2 messages
6379
size_t size() const
6480
{
65-
return index.size();
81+
return messageMap.size();
6682
}
6783

68-
size_t getNumberOfPayloads(size_t part) const
84+
/// get number of header-payload pairs
85+
size_t getNumberOfPairs() const
6986
{
70-
return index[part].size;
87+
return pairMap.size();
7188
}
7289

90+
/// get number of payloads for an in-flight message
91+
size_t getNumberOfPayloads(size_t mi) const
92+
{
93+
return messageMap[mi].size;
94+
}
95+
96+
/// clear the set
7397
void clear()
7498
{
7599
messages.clear();
76-
index.clear();
100+
messageMap.clear();
101+
pairMap.clear();
77102
}
78103

79104
// this is more or less legacy
105+
// PartRef has been earlier used to store fixed header-payload pairs
106+
// reset the set and store content of the part ref
80107
void reset(PartRef&& ref)
81108
{
82109
clear();
83110
add(std::move(ref));
84111
}
85112

113+
// this is more or less legacy
114+
// PartRef has been earlier used to store fixed header-payload pairs
115+
// add content of the part ref
86116
void add(PartRef&& ref)
87117
{
88-
index.emplace_back(messages.size(), 1);
118+
pairMap.emplace_back(messageMap.size(), 0);
119+
messageMap.emplace_back(messages.size(), 1);
89120
messages.emplace_back(std::move(ref.header));
90121
messages.emplace_back(std::move(ref.payload));
91122
}
92123

124+
/// add an O2 message
93125
template <typename F>
94126
void add(F getter, size_t size)
95127
{
96-
index.emplace_back(messages.size(), size - 1);
128+
auto partid = messageMap.size();
129+
messageMap.emplace_back(messages.size(), size - 1);
97130
for (size_t i = 0; i < size; ++i) {
131+
if (i > 0) {
132+
pairMap.emplace_back(partid, i - 1);
133+
}
98134
messages.emplace_back(std::move(getter(i)));
99135
}
100136
}
101137

102138
FairMQMessagePtr& header(size_t partIndex)
103139
{
104-
return messages[index[partIndex].position];
140+
return messages[messageMap[partIndex].position];
105141
}
106142

107143
FairMQMessagePtr& payload(size_t partIndex, size_t payloadIndex = 0)
108144
{
109-
assert(partIndex < index.size());
110-
assert(index[partIndex].position + payloadIndex + 1 < messages.size());
111-
return messages[index[partIndex].position + payloadIndex + 1];
145+
assert(partIndex < messageMap.size());
146+
assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size());
147+
return messages[messageMap[partIndex].position + payloadIndex + 1];
112148
}
113149

114150
FairMQMessagePtr const& header(size_t partIndex) const
115151
{
116-
return messages[index[partIndex].position];
152+
return messages[messageMap[partIndex].position];
117153
}
118154

119155
FairMQMessagePtr const& payload(size_t partIndex, size_t payloadIndex = 0) const
120156
{
121-
assert(partIndex < index.size());
122-
assert(index[partIndex].position + payloadIndex + 1 < messages.size());
123-
return messages[index[partIndex].position + payloadIndex + 1];
157+
assert(partIndex < messageMap.size());
158+
assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size());
159+
return messages[messageMap[partIndex].position + payloadIndex + 1];
160+
}
161+
162+
FairMQMessagePtr const& associatedHeader(size_t pos) const
163+
{
164+
return messages[messageMap[pairMap[pos].first].position];
165+
}
166+
167+
FairMQMessagePtr const& associatedPayload(size_t pos) const
168+
{
169+
auto partIndex = pairMap[pos].first;
170+
auto payloadIndex = pairMap[pos].second;
171+
return messages[messageMap[partIndex].position + payloadIndex + 1];
124172
}
125173
};
126174

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,18 +1163,26 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
11631163
currentSetOfInputs = relayer->consumeExistingInputsForTimeslice(slot);
11641164
}
11651165
auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
1166-
if (currentSetOfInputs[i].size() > partindex) {
1167-
auto header = currentSetOfInputs[i].header(partindex).get();
1168-
auto payload = currentSetOfInputs[i].payload(partindex).get();
1169-
return DataRef{nullptr,
1170-
static_cast<char const*>(header->GetData()),
1171-
static_cast<char const*>(payload ? payload->GetData() : nullptr),
1172-
payload ? payload->GetSize() : 0};
1166+
if (currentSetOfInputs[i].getNumberOfPairs() > partindex) {
1167+
const char* headerptr = nullptr;
1168+
const char* payloadptr = nullptr;
1169+
size_t payloadSize = 0;
1170+
// - each input can have multiple parts
1171+
// - "part" denotes a sequence of messages belonging together, the first message of the
1172+
// sequence is the header message
1173+
// - each part has one or more payload messages
1174+
// - InputRecord provides all payloads as header-payload pairs
1175+
auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
1176+
auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
1177+
headerptr = static_cast<char const*>(headerMsg->GetData());
1178+
payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
1179+
payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
1180+
return DataRef{nullptr, headerptr, payloadptr, payloadSize};
11731181
}
11741182
return DataRef{};
11751183
};
11761184
auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
1177-
return currentSetOfInputs[i].size();
1185+
return currentSetOfInputs[i].getNumberOfPairs();
11781186
};
11791187
return InputSpan{getter, nofPartsGetter, currentSetOfInputs.size()};
11801188
};

0 commit comments

Comments
 (0)