Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from opentelemetry.sdk.metrics._internal.measurement import Measurement
from opentelemetry.sdk.metrics._internal.point import DataPointT
from opentelemetry.sdk.metrics._internal.view import View
from opentelemetry.sdk.util import get_dict_as_key
from opentelemetry.util.types import AttributesAsKey

_logger = getLogger(__name__)

Expand All @@ -32,7 +34,7 @@ def __init__(
):
self._view = view
self._instrument = instrument
self._attributes_aggregation: dict[frozenset, _Aggregation] = {}
self._attributes_aggregation: dict[AttributesAsKey, _Aggregation] = {}
self._lock = Lock()
self._instrument_class_aggregation = instrument_class_aggregation
self._name = self._view._name or self._instrument.name
Expand Down Expand Up @@ -98,7 +100,7 @@ def consume_measurement(
else:
attributes = {}

aggr_key = frozenset(attributes.items())
aggr_key = get_dict_as_key(attributes)

if aggr_key not in self._attributes_aggregation:
with self._lock:
Expand Down
5 changes: 4 additions & 1 deletion opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ def get_dict_as_key(labels):
sorted(
map(
lambda kv: (
(kv[0], tuple(kv[1])) if isinstance(kv[1], list) else kv
(kv[0], tuple(kv[1]))
if isinstance(kv[1], Sequence)
and not isinstance(kv[1], (str, bytes))
else kv
),
labels.items(),
)
Expand Down
64 changes: 56 additions & 8 deletions opentelemetry-sdk/tests/metrics/test_view_instrument_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
LastValueAggregation,
View,
)
from opentelemetry.sdk.util import get_dict_as_key


def generalized_reservoir_factory(
Expand Down Expand Up @@ -101,7 +102,7 @@ def test_consume_measurement(self):
)
self.assertEqual(
view_instrument_match._attributes_aggregation,
{frozenset([("c", "d")]): self.mock_created_aggregation},
{get_dict_as_key({"c": "d"}): self.mock_created_aggregation},
)

view_instrument_match.consume_measurement(
Expand All @@ -117,8 +118,8 @@ def test_consume_measurement(self):
self.assertEqual(
view_instrument_match._attributes_aggregation,
{
frozenset(): self.mock_created_aggregation,
frozenset([("c", "d")]): self.mock_created_aggregation,
get_dict_as_key({}): self.mock_created_aggregation,
get_dict_as_key({"c": "d"}): self.mock_created_aggregation,
},
)

Expand Down Expand Up @@ -147,8 +148,8 @@ def test_consume_measurement(self):
self.assertEqual(
view_instrument_match._attributes_aggregation,
{
frozenset(
[("c", "d"), ("f", "g")]
get_dict_as_key(
{"c": "d", "f": "g"}
): self.mock_created_aggregation
},
)
Expand Down Expand Up @@ -178,7 +179,7 @@ def test_consume_measurement(self):
)
self.assertEqual(
view_instrument_match._attributes_aggregation,
{frozenset({}): self.mock_created_aggregation},
{get_dict_as_key({}): self.mock_created_aggregation},
)

# Test that a drop aggregation is handled in the same way as any
Expand Down Expand Up @@ -207,7 +208,7 @@ def test_consume_measurement(self):
)
)
self.assertIsInstance(
view_instrument_match._attributes_aggregation[frozenset({})],
view_instrument_match._attributes_aggregation[get_dict_as_key({})],
_DropAggregation,
)

Expand Down Expand Up @@ -297,6 +298,53 @@ def test_consume_measurement_attributes_are_copied(self):
self.assertEqual(len(number_data_points), 1)
self.assertEqual(number_data_points[0].attributes, {"key": "original"})

def test_consume_measurement_with_sequence_attributes(self):
instrument1 = _Counter(
"instrument1",
Mock(),
Mock(),
description="description",
unit="unit",
)
instrument1.instrumentation_scope = self.mock_instrumentation_scope
view_instrument_match = _ViewInstrumentMatch(
view=View(
instrument_name="instrument1",
name="name",
aggregation=DefaultAggregation(),
),
instrument=instrument1,
instrument_class_aggregation=MagicMock(
**{"__getitem__.return_value": DefaultAggregation()}
),
)

attributes = {
"list_attr": ["value"],
"tuple_attr": ("another", "value"),
}
view_instrument_match.consume_measurement(
Measurement(
value=1,
time_unix_nano=time_ns(),
instrument=instrument1,
context=Context(),
attributes=attributes,
)
)

self.assertIn(
get_dict_as_key(attributes),
view_instrument_match._attributes_aggregation,
)

number_data_points = view_instrument_match.collect(
AggregationTemporality.CUMULATIVE, 0
)
number_data_points = list(number_data_points)
self.assertEqual(len(number_data_points), 1)
self.assertEqual(number_data_points[0].attributes, attributes)

@patch(
"opentelemetry.sdk.metrics._internal._view_instrument_match.time_ns",
side_effect=[0, 1, 2],
Expand Down Expand Up @@ -515,7 +563,7 @@ def test_setting_aggregation(self):

self.assertIsInstance(
view_instrument_match._attributes_aggregation[
frozenset({("c", "d")})
get_dict_as_key({"c": "d"})
],
_LastValueAggregation,
)
Expand Down
Loading