Skip to content

fix(amber): Python internal marker replay during reconfiguration#4547

Merged
Yicong-Huang merged 2 commits into
apache:mainfrom
Yicong-Huang:codex/issue-4545-root-cause
Apr 28, 2026
Merged

fix(amber): Python internal marker replay during reconfiguration#4547
Yicong-Huang merged 2 commits into
apache:mainfrom
Yicong-Huang:codex/issue-4545-root-cause

Conversation

@Yicong-Huang

@Yicong-Huang Yicong-Huang commented Apr 28, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this PR?

This PR fixes the Python reconfiguration hang reported in #4545 and explicitly re-enables the regression test that was temporarily ignored in #4546.

The regression came from #4424 (ef66190f22), which changed the lifetime of Python current_internal_marker. After that change, get_internal_marker() stopped consuming the marker on read, and the main loop deferred cleanup until after replaying internal channel markers.

For Python source operators, that allowed an internal EndChannel marker to remain visible across the pause and reconfiguration window. When the reconfiguration ECM was processed, the stale marker could be observed and replayed again, which corrupted end-of-stream handling and caused the workflow to hang.

This PR restores the expected one-time consumption behavior by:

  • making get_internal_marker() consume current_internal_marker when it is read
  • removing the extra delayed cleanup after replaying internal channel markers in the Python main loop
  • changing should propagate reconfiguration through a source operator in workflow in ReconfigurationSpec from ignore back to a normal enabled test

Any related issues, documentation, discussions?

Fixes #4545.

Regression introduced by #4424.
Re-enables the temporary test disable from #4546 after fixing the underlying lifecycle bug.

How was this PR tested?

Tested with existing Scala tests using Java 11:

  • WorkflowExecutionService/testOnly org.apache.texera.amber.engine.e2e.ReconfigurationSpec

This run included the re-enabled should propagate reconfiguration through a source operator in workflow case. The full ReconfigurationSpec passed on the rebased branch (5/5 passed, 0 ignored).

Was this PR authored or co-authored using generative AI tooling?

Generated-by: OpenAI Codex (GPT-5)

@Yicong-Huang Yicong-Huang changed the title [codex] Fix Python internal marker replay during reconfiguration fix(amber): Python internal marker replay during reconfiguration Apr 28, 2026
@Yicong-Huang Yicong-Huang force-pushed the codex/issue-4545-root-cause branch from c7da7a5 to 05ba9f4 Compare April 28, 2026 07:21
@Yicong-Huang Yicong-Huang marked this pull request as ready for review April 28, 2026 07:35
@Yicong-Huang Yicong-Huang self-assigned this Apr 28, 2026

@aglinxinyuan aglinxinyuan left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think the comment in #4424 (comment) is valid?

@Yicong-Huang

Copy link
Copy Markdown
Contributor Author

Do you think the comment in #4424 (comment) is valid?

I replied there. I think the comment is not valid, and the true issue in $4424 is not about switch_context.

@Yicong-Huang Yicong-Huang merged commit 02018cd into apache:main Apr 28, 2026
15 checks passed
aglinxinyuan added a commit that referenced this pull request Apr 30, 2026
### What changes were proposed in this PR?

Restores reliable state-output emission for Python operators after the
#4552 revert. After this PR, both per-input-state outputs
(`Operator.process_state(...)`) and the end-of-input-port output
(`Operator.produce_state_on_finish(...)`) reach downstream channels.

`MainLoop.process_input_state` previously did two `_switch_context()`
calls with the read of `current_output_state` in between. The executor
only writes that field during the *second* switch — so `MainLoop` always
captured the previous cycle's value, and the finish-state set on
`EndChannel` ended up in `current_output_state` after `MainLoop` had
returned, never to be read again. This PR collapses the read to a single
switch + read-after, drops the duplicate post-init and end-of-body
switches in `DataProcessor.run`, and makes the run-loop's input dispatch
peek-then-consume so `current_internal_marker` keeps the atomic
single-consume semantics whose absence was the root cause of #4545.

<details>
<summary>History — third attempt at this fix</summary>

- #4421 reported that a Python operator could process its first state
input but not its second.
- PR #4424 added three `_switch_context()` calls to keep `MainLoop` and
`DataProcessor` in sync, closed #4421, but changed
`current_internal_marker` lifetime and broke the source-propagation case
in `ReconfigurationSpec` (#4545).
- PR #4547 tried to restore atomic marker consumption on top of #4424
and re-enabled the source-propagation case in `ReconfigurationSpec`. CI
continued to fail.
- PR #4552 reverted #4424 outright as a stop-gap. State-processing is
back to its pre-#4424 broken state — see #4559.

</details>

### Any related issues, documentation, discussions?

Fixes #4559. Follow-up to #4421 / #4424 / #4545 / #4547 / #4552.

### How was this PR tested?

Existing `core/runnables/test_main_loop.py` tests pass unchanged. Added
three new tests:

- `test_process_state_can_emit_multiple_states` — stub-level coverage of
the #4421 "second state not processed" scenario.
- `test_main_loop_thread_can_process_state` — full real-thread coverage
of state DataElements and `produce_state_on_finish` on `EndChannel`.
Times out on plain `main` (#4559); passes on this branch.
- `test_main_loop_thread_can_process_state_after_tuple` — coverage for
the mixed `tuple → state` input sequence.

`ReconfigurationSpec`'s source-propagation case (re-enabled in #4547)
should be re-run on this branch to confirm the new handshake does not
re-introduce #4545.

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Anthropic Claude Opus 4.7
@Yicong-Huang Yicong-Huang deleted the codex/issue-4545-root-cause branch June 15, 2026 06:46
yangzhang75 pushed a commit to yangzhang75/texera that referenced this pull request Jun 22, 2026
…che#4547)

### What changes were proposed in this PR?
This PR fixes the Python reconfiguration hang reported in apache#4545 and
explicitly re-enables the regression test that was temporarily ignored
in apache#4546.

The regression came from `apache#4424` (`ef66190f22`), which changed the
lifetime of Python `current_internal_marker`. After that change,
`get_internal_marker()` stopped consuming the marker on read, and the
main loop deferred cleanup until after replaying internal channel
markers.

For Python source operators, that allowed an internal `EndChannel`
marker to remain visible across the pause and reconfiguration window.
When the reconfiguration ECM was processed, the stale marker could be
observed and replayed again, which corrupted end-of-stream handling and
caused the workflow to hang.

This PR restores the expected one-time consumption behavior by:
- making `get_internal_marker()` consume `current_internal_marker` when
it is read
- removing the extra delayed cleanup after replaying internal channel
markers in the Python main loop
- changing `should propagate reconfiguration through a source operator
in workflow` in `ReconfigurationSpec` from `ignore` back to a normal
enabled test

### Any related issues, documentation, discussions?
Fixes apache#4545.

Regression introduced by apache#4424.
Re-enables the temporary test disable from apache#4546 after fixing the
underlying lifecycle bug.

### How was this PR tested?
Tested with existing Scala tests using Java 11:
- `WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.e2e.ReconfigurationSpec`

This run included the re-enabled `should propagate reconfiguration
through a source operator in workflow` case. The full
`ReconfigurationSpec` passed on the rebased branch (`5/5` passed, `0`
ignored).

### Was this PR authored or co-authored using generative AI tooling?
Generated-by: OpenAI Codex (GPT-5)
yangzhang75 pushed a commit to yangzhang75/texera that referenced this pull request Jun 22, 2026
### What changes were proposed in this PR?

Restores reliable state-output emission for Python operators after the
apache#4552 revert. After this PR, both per-input-state outputs
(`Operator.process_state(...)`) and the end-of-input-port output
(`Operator.produce_state_on_finish(...)`) reach downstream channels.

`MainLoop.process_input_state` previously did two `_switch_context()`
calls with the read of `current_output_state` in between. The executor
only writes that field during the *second* switch — so `MainLoop` always
captured the previous cycle's value, and the finish-state set on
`EndChannel` ended up in `current_output_state` after `MainLoop` had
returned, never to be read again. This PR collapses the read to a single
switch + read-after, drops the duplicate post-init and end-of-body
switches in `DataProcessor.run`, and makes the run-loop's input dispatch
peek-then-consume so `current_internal_marker` keeps the atomic
single-consume semantics whose absence was the root cause of apache#4545.

<details>
<summary>History — third attempt at this fix</summary>

- apache#4421 reported that a Python operator could process its first state
input but not its second.
- PR apache#4424 added three `_switch_context()` calls to keep `MainLoop` and
`DataProcessor` in sync, closed apache#4421, but changed
`current_internal_marker` lifetime and broke the source-propagation case
in `ReconfigurationSpec` (apache#4545).
- PR apache#4547 tried to restore atomic marker consumption on top of apache#4424
and re-enabled the source-propagation case in `ReconfigurationSpec`. CI
continued to fail.
- PR apache#4552 reverted apache#4424 outright as a stop-gap. State-processing is
back to its pre-apache#4424 broken state — see apache#4559.

</details>

### Any related issues, documentation, discussions?

Fixes apache#4559. Follow-up to apache#4421 / apache#4424 / apache#4545 / apache#4547 / apache#4552.

### How was this PR tested?

Existing `core/runnables/test_main_loop.py` tests pass unchanged. Added
three new tests:

- `test_process_state_can_emit_multiple_states` — stub-level coverage of
the apache#4421 "second state not processed" scenario.
- `test_main_loop_thread_can_process_state` — full real-thread coverage
of state DataElements and `produce_state_on_finish` on `EndChannel`.
Times out on plain `main` (apache#4559); passes on this branch.
- `test_main_loop_thread_can_process_state_after_tuple` — coverage for
the mixed `tuple → state` input sequence.

`ReconfigurationSpec`'s source-propagation case (re-enabled in apache#4547)
should be re-run on this branch to confirm the new handshake does not
re-introduce apache#4545.

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Anthropic Claude Opus 4.7
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

ReconfigurationSpec: source-propagation case hangs after UDF processes EndChannel

2 participants