Skip to content

feat(amber): supporting consistent operator stats retrieval #3557

Merged
shengquan-ni merged 9 commits into
masterfrom
shengquan-consistent-stats
Jul 12, 2025
Merged

feat(amber): supporting consistent operator stats retrieval #3557
shengquan-ni merged 9 commits into
masterfrom
shengquan-consistent-stats

Conversation

@shengquan-ni

@shengquan-ni shengquan-ni commented Jul 11, 2025

Copy link
Copy Markdown
Contributor

This PR introduces consistent operator retrieval in QueryStats by adopting a reverse-topological strategy.

Problem

Previously, we retrieved operator statistics by sending direct control messages to each worker independently. This approach could result in inconsistent snapshots: for example, a downstream operator might appear as completed while its upstream is still running, which is logically impossible in a pipelined execution.

Consistent Retrieval via Reverse-Topological Order

We now compute a layered topological order of operators, where operators in the same layer have the same rank, and retrieve stats layer by layer in reverse topological order. Only after all relevant stats are retrieved do we update the execution state and send the update to the frontend. This guarantees that downstream stats are not visible before their upstream stats are available, maintaining a consistent global view.

This approach increases the time complexity to O(# of layers), compared to the previous O(1) method. Therefore, we should avoid issuing stats queries too frequently, as doing so may flood the control message queue and delay other control operations.

SubDAG Query and Race Condition Mitigation

In some scenarios—such as querying the stats of a completed worker—we now query the entire subDAG rooted at that operator to ensure upstream context is also retrieved. This avoids inconsistencies in localized queries.

However, this introduces the possibility of a race condition:

  1. A global query stats request is fired.
  2. Operator A's stats are retrieved at timestamp T₀.
  3. A subDAG stats query (including operator A) is fired.
  4. Operator A's stats are retrieved again at timestamp T₁.
  5. The subDAG query finishes and updates the execution state with timestamp T₁.
  6. The earlier global query finishes and overwrites A's stats with older data from T₀.

To prevent this, we now attach a nanosecond-level timestamp to each execution state update, and only allow updates with newer timestamps to overwrite the existing state.

@shengquan-ni shengquan-ni self-assigned this Jul 11, 2025
@aglinxinyuan

Copy link
Copy Markdown
Contributor

Need more testing. Will do it offline.

@shengquan-ni shengquan-ni changed the title feat(engine): supporting consistent operator stats retrieval feat(amer): supporting consistent operator stats retrieval Jul 12, 2025
@shengquan-ni shengquan-ni changed the title feat(amer): supporting consistent operator stats retrieval feat(amber): supporting consistent operator stats retrieval Jul 12, 2025
@shengquan-ni shengquan-ni merged commit 875708d into master Jul 12, 2025
11 checks passed
@shengquan-ni shengquan-ni deleted the shengquan-consistent-stats branch July 12, 2025 21:07
SarahAsad23 pushed a commit to madisonmlin/texera that referenced this pull request May 20, 2026
)

This PR introduces consistent operator retrieval in `QueryStats` by
adopting a reverse-topological strategy.

## Problem
Previously, we retrieved operator statistics by sending direct control
messages to each worker independently. This approach could result in
inconsistent snapshots: for example, a downstream operator might appear
as completed while its upstream is still running, which is logically
impossible in a pipelined execution.

## Consistent Retrieval via Reverse-Topological Order
We now compute a layered topological order of operators, where operators
in the same layer have the same rank, and retrieve stats layer by layer
in reverse topological order. Only after all relevant stats are
retrieved do we update the execution state and send the update to the
frontend. This guarantees that downstream stats are not visible before
their upstream stats are available, maintaining a consistent global
view.

This approach increases the time complexity to O(# of layers), compared
to the previous O(1) method. Therefore, we should avoid issuing stats
queries too frequently, as doing so may flood the control message queue
and delay other control operations.

## SubDAG Query and Race Condition Mitigation
In some scenarios—such as querying the stats of a completed worker—we
now query the entire subDAG rooted at that operator to ensure upstream
context is also retrieved. This avoids inconsistencies in localized
queries.

However, this introduces the possibility of a race condition:

1. A global query stats request is fired.
2. Operator A's stats are retrieved at timestamp T₀.
3. A subDAG stats query (including operator A) is fired.
4. Operator A's stats are retrieved again at timestamp T₁.
5. The subDAG query finishes and updates the execution state with
timestamp T₁.
6. The earlier global query finishes and overwrites A's stats with older
data from T₀.

To prevent this, we now attach a **nanosecond-level** timestamp to each
execution state update, and only allow updates with newer timestamps to
overwrite the existing state.
yangzhang75 pushed a commit to yangzhang75/texera that referenced this pull request Jun 22, 2026
)

This PR introduces consistent operator retrieval in `QueryStats` by
adopting a reverse-topological strategy.

## Problem
Previously, we retrieved operator statistics by sending direct control
messages to each worker independently. This approach could result in
inconsistent snapshots: for example, a downstream operator might appear
as completed while its upstream is still running, which is logically
impossible in a pipelined execution.

## Consistent Retrieval via Reverse-Topological Order
We now compute a layered topological order of operators, where operators
in the same layer have the same rank, and retrieve stats layer by layer
in reverse topological order. Only after all relevant stats are
retrieved do we update the execution state and send the update to the
frontend. This guarantees that downstream stats are not visible before
their upstream stats are available, maintaining a consistent global
view.

This approach increases the time complexity to O(# of layers), compared
to the previous O(1) method. Therefore, we should avoid issuing stats
queries too frequently, as doing so may flood the control message queue
and delay other control operations.

## SubDAG Query and Race Condition Mitigation
In some scenarios—such as querying the stats of a completed worker—we
now query the entire subDAG rooted at that operator to ensure upstream
context is also retrieved. This avoids inconsistencies in localized
queries.

However, this introduces the possibility of a race condition:

1. A global query stats request is fired.
2. Operator A's stats are retrieved at timestamp T₀.
3. A subDAG stats query (including operator A) is fired.
4. Operator A's stats are retrieved again at timestamp T₁.
5. The subDAG query finishes and updates the execution state with
timestamp T₁.
6. The earlier global query finishes and overwrites A's stats with older
data from T₀.

To prevent this, we now attach a **nanosecond-level** timestamp to each
execution state update, and only allow updates with newer timestamps to
overwrite the existing state.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants