Skip to content

feat: add loop operators#4206

Closed
aglinxinyuan wants to merge 305 commits into
mainfrom
xinyuan-loop-feb
Closed

feat: add loop operators#4206
aglinxinyuan wants to merge 305 commits into
mainfrom
xinyuan-loop-feb

Conversation

@aglinxinyuan

@aglinxinyuan aglinxinyuan commented Feb 11, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this PR?

Adds two new operators — Loop Start and Loop End — that let users write a for-loop inside a visual workflow. The loop iterates over rows of a pandas table. The user supplies four small Python snippets:

Field Where Example
initialization Loop Start i = 0
output Loop Start table.iloc[i] — the row passed into the loop body each iteration
update Loop End i += 1
condition Loop End i < len(table) — keep looping while this is true

Operators placed between Loop Start and Loop End make up the loop body and run once per iteration. When condition returns true, the runtime starts another iteration; when it returns false, downstream operators run on the accumulated output.

How an iteration works

   Upstream Table
        │
        ▼
   ┌──────────┐   loop variables: row i,   ┌──────────┐               ┌─────────┐
   │ Loop     ├───counters, accumulators ─►│   loop   ├──────────────►│  Loop   │
   │ Start    │   (the loop's "state")     │   body   │               │   End   │
   └──────────┘                            └──────────┘               └────┬────┘
        ▲                                                                  │
        │  (1) DCM: "schedule the Loop Start region again"                 │
        │  (2) write the next iteration's state (i, accumulators, table)   │
        │      to the iceberg table that Loop Start reads its input from   │
        └──────────────────────────────────────────────────────────────────┘
                                when condition() == True

The arrow from Loop End back to Loop Start is not an edge in the workflow graph — the region DAG stays acyclic. The loop-back is done with two separate steps when an iteration ends:

  1. Loop End sends a DCM (Direct Control Message — Texera's worker→controller control-channel message; it does not flow along data edges, so it doesn't break the acyclic DAG) named jump_to_operator_region, asking the controller to schedule the Loop Start region one more time.
  2. Loop End writes the updated state — a dict with i, any accumulators, and the pickled table — into the iceberg table that Loop Start reads from at the start of every iteration.

The newly scheduled Loop Start region then picks up that state and runs the next iteration. The "iceberg table Loop Start reads from" is the same cross-region state channel introduced in #4490; this PR reuses it as the back-edge for loops.

What changed

Area File Purpose
Operator definitions LoopStartOpDesc.scala, LoopEndOpDesc.scala Code-gen the Python operator from the user's initialization / output / update / condition expressions
Operator runtime base core/models/operator.pyLoopStartOperator, LoopEndOperator Python superclasses the generated code extends; manage loop_counter and the state dict
Scheduler RegionExecutionCoordinator.scala Don't recreate Loop End's iceberg output between iterations (the output accumulates the body's rows; recreating it would erase what we just wrote)
Output-storage reuse flag PhysicalOp.scalareusesOutputStorageOnReExecution: Boolean + withReusesOutputStorageOnReExecution A flag the scheduler checks (instead of string-matching the operator id) to skip recreating an operator's output on region re-execution; Loop End sets it so a loop's own iterations accumulate
Worker runtime MainLoop._attach_loop_start_id, _jump_to_loop_start On Loop End completion: send the DCM and write the next state
Worker output OutputManager.reset_output_storage A Loop End's materialized output accumulates the results of all of its own iterations (a single / outermost loop keeps everything — the scheduler reuses its output doc across re-runs and the writer appends). reset_output_storage exists only for the inner Loop End of a nested loop: it fires once per outer iteration, on the outer-loop pass-through (loop_counter > 0) in _process_state_frame, dropping and recreating that Loop End's result/state iceberg tables so the new outer iteration accumulates from empty instead of concatenating across outer iterations. A single / outermost Loop End never sees loop_counter > 0, so it never resets. Safe because loops run MATERIALIZED — downstream doesn't read until the loop region completes.
Execution mode WorkflowExecutionService.scala Reject a loop workflow that is submitted with a non-MATERIALIZED execution mode by throwing an IllegalArgumentException with an actionable message. WorkflowService catches it and routes it through errorHandlerWorkflowFatalError → the existing diff handler, so the frontend sees a clear "switch the mode and re-run" error instead of the UI and the engine silently disagreeing.
Frontend LoopStart.png, LoopEnd.png Operator icons

Nested loops

Each state carries a loop_counter integer marking which loop's iteration the state belongs to. This is what keeps an inner Loop End from accidentally consuming an outer loop's state.

Operator Rule
Inner LoopStart.process_state If the state already came from a Loop Start (LoopStartStateURI is set), loop_counter += 1 and pass it through.
Inner LoopEnd.process_state If loop_counter > 0, decrement and pass it through (this state belongs to an outer loop).
Matching LoopEnd If loop_counter == 0, the state is mine: run update, evaluate condition, jump back to my paired Loop Start.

So when two loops are nested, the outer loop's state walks through the inner Loop Start (+1) and the inner Loop End (−1) untouched, arrives at the outer Loop End at loop_counter == 0, and only there is it consumed.

Any related issues, documentation, discussions?

Closes #4442. Builds on #4490 (cross-region state materialization) and #5085 (DocumentFactory.documentExists).

How was this PR tested?

  • LoopStartOpDescSpec.scala / LoopEndOpDescSpec.scala — code-gen output, ports, isLoopEnd flag.
  • test_loop_operators.py — runtime base classes: flat-loop matching branch (runs update / condition) and nested-loop pass-through (loop_counter +/− symmetric across an outer × inner traversal).
  • sbt scalafmtCheckAll scalafixAll --check and ruff check clean.

Manual workflows

Input for both is a 3-row table from TextInput("1\n2\n3"). Each loop's condition is i < len(table).

Workflow Topology Expected
Loop.json TextInput → LoopStart → LoopEnd 3 iterations, workflow terminates.
Nested Loop.json TextInput → OuterLoopStart → InnerLoopStart → InnerLoopEnd → OuterLoopEnd Outer runs 3 times, inner runs 3 times per outer iteration = 9 total inner iterations. Workflow terminates.

Demo:
Basic Loop:
loop

Nested Loop:
nested

Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.7 in compliance with ASF.

@github-actions github-actions Bot added engine frontend Changes related to the frontend GUI common labels Feb 11, 2026
@aglinxinyuan aglinxinyuan reopened this Feb 11, 2026
@aglinxinyuan aglinxinyuan reopened this Feb 11, 2026
@aglinxinyuan aglinxinyuan reopened this Feb 12, 2026
@aglinxinyuan aglinxinyuan reopened this Feb 14, 2026
@aglinxinyuan aglinxinyuan reopened this Feb 14, 2026
@aglinxinyuan aglinxinyuan changed the base branch from main to xiaozhen-sync-region-kill February 14, 2026 00:30
@aglinxinyuan aglinxinyuan self-assigned this Feb 28, 2026
@aglinxinyuan aglinxinyuan changed the base branch from xiaozhen-sync-region-kill to xinyuan-state-only April 19, 2026 08:28
@aglinxinyuan aglinxinyuan changed the base branch from xinyuan-state-only to xiaozhen-sync-region-kill April 20, 2026 07:59
@aglinxinyuan aglinxinyuan changed the base branch from xiaozhen-sync-region-kill to xinyuan-state-only April 20, 2026 08:49
@aglinxinyuan aglinxinyuan changed the base branch from xinyuan-state-only to xiaozhen-sync-region-kill April 20, 2026 08:52
@aglinxinyuan aglinxinyuan changed the base branch from xiaozhen-sync-region-kill to xinyuan-state-only April 20, 2026 08:55
@aglinxinyuan aglinxinyuan force-pushed the xinyuan-loop-feb branch 2 times, most recently from 473d756 to 45a51e1 Compare April 20, 2026 09:09
@github-actions github-actions Bot added the dependencies Pull requests that update a dependency file label Apr 22, 2026
@aglinxinyuan

Copy link
Copy Markdown
Contributor Author

@Xiao-zhen-Liu, please review the PR again.

@Xiao-zhen-Liu Xiao-zhen-Liu left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: consider splitting this PR

This is now 38 files / +2280 −94. It's mostly tests (good), but it bundles four separate pieces of work, and one of them — the change to how operator state is stored and sent — affects every stateful operator, not just loops. That part is hard to review alongside the loop feature, because approving the loops also means approving a change to all state handling.

This is only a suggestion. At a minimum I'd split it in two: the state-format change on its own, and the loop feature on top. If you want to go further, it splits cleanly into four:

PR 1  Worker-id helper            (~48 lines)   independent
      get_operator_id + test (replaces the worker-name string parsing)

PR 2  Generalize how state is stored and sent  (~250 lines)
      4-column State schema (Scala + Python) + bytes-in-JSON +
      StateFrame + network sender/receiver + materialization reader

PR 3  Reuse output storage on a region re-run  (~30 lines)
      RegionExecutionCoordinator skip-create + a flag named for the
      behavior + its test

PR 4  Loop operators              (~700 + tests)  builds on 1-3
      LoopStart/EndOpDesc, operator.py loop classes, main_loop logic,
      execution-mode validation, reset_storage, icons, tests

If you only do the two-way split, the state-format change is the part to split out: state written before this PR has the old one-column schema, so it needs backward-compatible reading and a Scala-side round-trip test — neither of which is here yet.

One process note first. Several resolved threads (11, 17, 19, 21, 22) were closed with replies pointing to fix commits (e6bea518f2, e281c61b4c, ca9e5ce8cc, 1848ce00fb, 540b7ba274, bbec98282e) that aren't in the branch (070a844). The code they describe isn't there — it looks lost in a rebase or never pushed. So those threads read as done, but the code still has the original issue. Could you re-push or confirm? I've re-opened the specifics inline.

How to split, given the history can't be rebased apart (300+ commits, many merges): branch off main and copy over one group of files at a time — the four pieces touch almost entirely separate files.

Comment thread amber/src/main/python/core/architecture/packaging/output_manager.py Outdated
Comment thread amber/src/main/python/core/architecture/packaging/output_manager.py
Comment thread amber/src/main/python/core/models/operator.py Outdated
Comment thread amber/src/main/python/core/models/operator.py
Comment thread amber/src/main/python/core/runnables/main_loop.py Outdated
Comment thread amber/src/main/python/core/runnables/main_loop.py Outdated
Comment thread amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala Outdated
Addresses PR #4206 review feedback
(#4206 (comment)): an
earlier reply claimed reset_storage had been renamed to
reset_output_storage, documented, and guarded, but that work was lost
when the branch was rebased -- the method was back to a bare
reset_storage with no docstring and no checks, and the reason
truncation is safe lived only in the PR description.

Re-apply on the consolidated branch:
* Rename reset_storage -> reset_output_storage (matching main_loop's
  caller and the two __init__/set_up comments that referenced the
  intended name).
* Add a docstring: what it does (drop+recreate the result AND state
  tables, reopen the writers), that it's called only by a Loop End
  worker once per iteration, and -- the rationale that previously
  lived only in the PR description -- WHY truncating live storage is
  safe: loops run in MATERIALIZED mode, so downstream operators don't
  read this output until the loop region completes, so no reader can
  observe an intermediate truncation.
* Add the two previously-implicit precondition guards: exactly one
  output port, and the storage writer already set up
  (_storage_uri_base populated). Both raise a clear RuntimeError
  instead of silently resetting the wrong port or dereferencing None.
* test_output_manager.py: new TestResetOutputStorage covering the
  happy path (close -> recreate result+state docs -> reopen writer)
  and both guard failures, with iceberg/thread collaborators mocked.
@github-actions

github-actions Bot commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

✅ No material benchmark regressions detected

🟢 13 better · 🔴 0 worse · ⚪ 2 noise (<±5%) · 0 without baseline

CI benchmark results are noisy; treat <±5% as noise unless repeated.

Dashboard · Run

config throughput MB/s latency max Δ latest / 7d
🟢 bs=10 sw=10 sl=64 443 0.271 22,282/33,194/33,194 us 🟢 +12.3% / 🟢 +8.1%
🟢 bs=100 sw=10 sl=64 939 0.573 104,394/124,887/124,887 us 🟢 -13.9% / 🟢 -10.5%
🟢 bs=1000 sw=10 sl=64 1,098 0.67 905,236/984,614/984,614 us 🟢 +15.2% / 🟢 -6.6%
Baseline details

Latest main a044287 from 2026-06-13T22:20:02.070Z

config metric PR latest main 7d avg Δ latest Δ 7d
bs=10 sw=10 sl=64 throughput 443 tuples/sec 395.36 tuples/sec 410.7 tuples/sec +12.0% +7.9%
bs=10 sw=10 sl=64 MB/s 0.271 MB/s 0.241 MB/s 0.251 MB/s +12.3% +8.1%
bs=10 sw=10 sl=64 p50 22,282 us 24,519 us 23,798 us -9.1% -6.4%
bs=10 sw=10 sl=64 p95 33,194 us 34,935 us 35,169 us -5.0% -5.6%
bs=10 sw=10 sl=64 p99 33,194 us 34,935 us 35,169 us -5.0% -5.6%
bs=100 sw=10 sl=64 throughput 939 tuples/sec 833.83 tuples/sec 894.84 tuples/sec +12.6% +4.9%
bs=100 sw=10 sl=64 MB/s 0.573 MB/s 0.509 MB/s 0.546 MB/s +12.6% +4.9%
bs=100 sw=10 sl=64 p50 104,394 us 120,315 us 111,887 us -13.2% -6.7%
bs=100 sw=10 sl=64 p95 124,887 us 145,017 us 139,602 us -13.9% -10.5%
bs=100 sw=10 sl=64 p99 124,887 us 145,017 us 139,602 us -13.9% -10.5%
bs=1000 sw=10 sl=64 throughput 1,098 tuples/sec 953.24 tuples/sec 1,045 tuples/sec +15.2% +5.1%
bs=1000 sw=10 sl=64 MB/s 0.67 MB/s 0.582 MB/s 0.638 MB/s +15.2% +5.1%
bs=1000 sw=10 sl=64 p50 905,236 us 1,052,108 us 969,370 us -14.0% -6.6%
bs=1000 sw=10 sl=64 p95 984,614 us 1,094,010 us 1,019,271 us -10.0% -3.4%
bs=1000 sw=10 sl=64 p99 984,614 us 1,094,010 us 1,019,271 us -10.0% -3.4%
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,451.00,200,128000,443,0.271,22281.94,33193.83,33193.83
1,100,10,64,20,2129.84,2000,1280000,939,0.573,104393.57,124887.02,124887.02
2,1000,10,64,20,18220.65,20000,12800000,1098,0.670,905235.65,984613.93,984613.93

…ermination

Addresses PR #4206 review feedback
(#4206 (comment)): both
E2E loop tests only checked the workflow reached COMPLETED, so a counter
bug that still terminated (e.g. off-by-one) would have passed.

Each test now asserts the terminal LoopEnd's cumulative output-tuple
count, captured from ExecutionStatsUpdate (keyed by logical op id;
delivered before COMPLETED; the worker persists across the
JumpToOperatorRegion re-executions so its output statistic accumulates
across iterations). LoopEnd is an identity pass-through on data, so by
conservation that count equals the number of rows that flowed through
the loop -- i.e. the iteration count.

* Single loop: assert terminal LoopEnd output == 3 (i advances 0,1,2,
  stops at 3).
* Nested loop: the outer LoopStart now emits the WHOLE table (output =
  "table") instead of one row, so the inner loop genuinely iterates
  over 3 rows and the inner body runs 3 x 3 = 9 times (matching the
  Nested.Loop.json demo). Assert the terminal outer LoopEnd output == 9.
  The previous operators (outer output = table.iloc[i]) would have run
  the inner body only once per outer iteration -- 3 total, not the 9 the
  comment/PR description claimed.

runToCompletion -> runAndGetOutputCounts, returning the per-operator
output-count map so each test asserts the count it cares about.
…tion

Addresses PR #4206 review feedback
(#4206 (comment)): the
PhysicalOp flag was named after the operator (isLoopEnd) rather than the
behavior the scheduler actually checks -- 'keep this operator's output
storage across a region re-run.' (This rename was made earlier but lost
when the branch was rebased.)

* PhysicalOp: isLoopEnd -> reusesOutputStorageOnReExecution (+ doc
  comment); withIsLoopEnd -> withReusesOutputStorageOnReExecution.
* RegionExecutionCoordinator: the skip-recreate guard checks
  reusesOutputStorageOnReExecution; local val + comment reworded to the
  behavior.
* LoopEndOpDesc sets .withReusesOutputStorageOnReExecution(true).
* Loop specs + mixin updated to the new name.

Now any future operator that must preserve its output across a region
re-execution can set the flag without a LoopEnd-specific misnomer.

Verified: WorkflowCore + WorkflowOperator compile and all 29 LoopStart/
LoopEnd op-desc specs pass. (The amber module's local compile is blocked
by an unrelated pre-existing JOOQ issue -- PveManager references the
virtual_environments table from #5577, absent from the un-migrated local
DB; CI compiles it against a fresh schema.)
…nCoordinator

Addresses the remaining half of PR #4206 review feedback
(#4206 (comment)): the
skip-create branch (reuse an existing output document on region re-run
instead of clobbering it) was untested. (The rename half landed in
3d4f15b.)

Extract the create-or-reuse decision out of the private
createOutputPortStorageObjects into a pure companion method
RegionExecutionCoordinator.provisionOutputDocument(uri,
reuseExistingStorage, documentExists, createDocument) with the storage
operations injected, so the four cases can be unit-tested without an
iceberg backend or a live region.

New RegionOutputProvisioningSpec pins:
* reuse + existing document  -> NOT recreated (createDocument not called),
  so accumulated loop output survives the re-run;
* reuse + no document yet     -> created (first iteration);
* no-reuse + existing         -> recreated/overwritten (fresh per run);
* no-reuse + none             -> created;
* no-reuse short-circuits documentExists (always recreate, never probe).

Verified the production change compiles (the only remaining amber
compile errors are the pre-existing PveManager/virtual_environments JOOQ
issue from #5577, unrelated to this change and resolved by a migrated DB
in CI); the new spec is a pure ScalaTest unit with no iceberg/actor
dependency.
… a class

Addresses PR #4206 review feedback
(#4206 (comment)): the
execution-mode check used isInstanceOf[LoopStartOpDesc], which (1) tied a
generic service to one operator class, (2) only matched LoopStart -- a
plan with a LoopEnd but no LoopStart would skip the check -- and (3) had
no test. (This was done earlier but lost when the branch was rebased.)

* Add LogicalOp.requiresMaterializedExecution (default false); LoopStart
  and LoopEnd both override it to true.
* Extract WorkflowExecutionService.validateExecutionMode(operators,
  settings) -- a pure, testable method that throws an
  IllegalArgumentException with an actionable message when the plan needs
  MATERIALIZED but the request asked for something else. Keyed off the
  flag, so the service no longer imports or matches LoopStartOpDesc, and
  Loop Start OR Loop End on its own triggers it.
* The constructor delegates to it (still fail-loud, per the earlier
  decision).
* New WorkflowExecutionServiceSpec: the flag is set on both loop ops and
  unset on a non-loop op (SleepOpDesc); validateExecutionMode rejects a
  loop+PIPELINED plan (incl. a LoopEnd-only plan), and accepts
  loop+MATERIALIZED, non-loop+PIPELINED, and an empty plan.

Verified locally: 6 new tests pass, 29 LoopStart/LoopEnd op-desc specs
pass, amber compiles.
…tate path

Addresses PR #4206 review feedback
(#4206 (comment)): the
loop state path still (1) read the parent's private field via the mangled
name self._TableOperator__table_data[port] -- silently breaking if
TableOperator is renamed -- and (2) moved the table with pickle
(dumps in LoopStart.produce_state_on_finish, loads in LoopEnd.run_update),
an RCE surface for data that lives in iceberg. (Fixed earlier but lost in
a force-rebase.)

* Add TableOperator._buffered_table(port): inside the class
  self.__table_data resolves via normal name mangling, so subclasses read
  the buffer without the mangled prefix and a rename stays transparent.
  LoopStart.produce_state_on_finish uses it.
* Replace pickle with Apache Arrow IPC (structured, typed, no callable
  payload): new table_to_ipc_bytes / table_from_ipc_bytes in table.py
  (pyarrow.ipc). produce_state_on_finish encodes; run_update decodes.
  Wire shape (bytes in state["table"]) is unchanged; only the format.
* Tests: TestBufferedTableAccessor pins the accessor; the produce-state
  test now asserts the bytes parse as an Arrow IPC stream and round-trip
  back to the same tuples (would fail if pickle returned); the matching-
  branch tests feed Arrow bytes. 14 tests pass.

Verified: 14 loop-operator tests pass; ruff clean. (The 1 failure in
test_tuple.py::test_hash is a pre-existing Windows OSError in tuple.py
hashing, unrelated to this change.)
…l site

Addresses PR #4206 review feedback
(#4206 (comment)): the
reviewer read the reset_output_storage() call in process_input_state
against the (stale) PR description and was unsure when it fires.

The code is correct -- reset fires once per iteration for every LoopEnd
on its matching-loop consume (loop_counter == 0); the nested pass-through
(loop_counter > 0) is forwarded and returned in _process_state_frame
before reaching this call, so it never resets. Add a comment at the call
site stating this, matching the reset_output_storage docstring. No
behavior change. (The PR description, which had the wrong claim, is
corrected separately.)
Addresses PR #4206 review feedback
(#4206 (comment)):
eval_condition read self._loop_table / self.state, which only run_update
assigns. MainLoop.complete() calls condition() on every LoopEnd, so a
LoopEnd that finished without consuming a matching state -- an inner
LoopEnd that only forwarded outer-loop pass-through state, or a loop with
no matching-branch consume -- raised AttributeError. (The _StubLoopEnd
test stub masked it by pre-seeding self.state.)

* LoopEndOperator.__init__ now initializes self.state = {},
  self._loop_table = None, and self._consumed_state = False (the class
  previously had no __init__; the generated ProcessLoopEndOperator has no
  __init__/open either, so it inherits this).
* run_update sets self._consumed_state = True after the consume.
* eval_condition returns False when nothing has been consumed -- the loop
  never iterated here, so don't fire the back-edge. Bare field init alone
  wasn't enough: the user condition (e.g. ) would then
  NameError on undefined loop variables.

Tests: unmask _StubLoopEnd (drop its self.state = {} so it mirrors the
generated operator and exercises the base __init__); add
test_condition_returns_false_before_any_state_is_consumed (the reviewer
scenario) and test_consumed_flag_flips_after_run_update. 16 loop +
33 main_loop/output_manager tests pass; no behavior change on the normal
consume path.
…hing the worker thread

complete() evaluates a Loop End's user-supplied condition() on the main
loop thread, before close()/COMPLETED and outside DataProcessor's guarded
executor session. A typo or undefined name in the condition propagated
through run()'s logger.catch(reraise=True) and silently killed the worker
thread, so the controller never learned of the error.

Guard the call so a bad condition is reported the same way a UDF error is:
record it on the exception manager, queue an ERROR console message, flush
it, then enter EXCEPTION_PAUSE, skipping the loop-back edge and completion.

Extract the console-message build out of DataProcessor._report_exception
into a shared ConsoleMessageManager.report_exception so the data path and
this main-loop path report identically.

Add a MainLoop.complete() test: a Loop End whose condition() raises is
reported and paused, with no loop-back and no completion.
…tion

A Loop End's materialized output should accumulate the results of all of
its own iterations; for a nested loop, the inner Loop End should accumulate
only within the current outer iteration and drop its rows when the outer
loop advances. That per-outer-iteration reset was never wired up:

- reset_output_storage() was called from process_input_state, which only
  runs on the matching consume (loop_counter == 0), not the outer
  pass-through (loop_counter > 0) where the "outer advanced" signal arrives.
- Worse, it sat under `if output_state is not None`, and a Loop End's
  generated process_state returns None (produce_state_on_finish is not
  overridden either), so output_state is always None for a Loop End -- the
  reset block was dead. The inner Loop End therefore accumulated across all
  outer iterations (9 rows in the 3x3 nested case instead of 3).

Move the reset to the inner pass-through branch in _process_state_frame
(loop_counter > 0). The input reader replays all states before any data
each region execution, so the result/state tables still hold the previous
outer iteration's rows when the outer boundary state passes through, and
clearing there makes each outer iteration accumulate from empty. It fires
exactly once per outer iteration because the inner LoopStart's output (and
thus this pass-through) is recreated on every inner back-edge -- each loop
operator is its own region, so the inner LoopStart's region does not carry
reusesOutputStorageOnReExecution and its output is recreated per inner
iteration. A single / outermost Loop End never sees loop_counter > 0, so it
never resets and keeps all of its iterations. The Scala controller side
(reusesOutputStorageOnReExecution) is unchanged; it provides the base
per-loop accumulation that this reset carves the nested exception out of.

Remove the dead consume-path reset call, and correct the reset_output_storage
docstring, the call-site comment, and the RegionExecutionCoordinator comment
to describe the actual (inner-Loop-End, per-outer-iteration) behavior.

Tests:
- Unit (test_main_loop): the inner pass-through (loop_counter > 0) triggers
  reset_output_storage once and does not invoke the operator; the consume
  path (loop_counter == 0, single loop) and a LoopStart pass-through never
  reset.
- Integration (LoopIntegrationSpec): assert the materialized result row
  counts -- single loop = 3 (accumulate); nested inner Loop End = 3, not 9
  (resets per outer iteration); nested outer Loop End = 9. The pre-existing
  cumulative output-tuple counts cannot distinguish accumulate from reset.
The materialized-result assertions added in e61681d failed
deterministically in the amber-integration CI job (both LoopIntegrationSpec
tests, on repeated attempts), while the workflow itself completed with the
correct iteration counts and no FatalError. The read was the problem: it ran
in the test body after client.shutdown() and threw a NoSuchElementException
when an operator's external RESULT uri was absent.

Restructure the read to match the working DataProcessingSpec pattern:
- read each operator's materialized RESULT row count inside the
  ExecutionStateUpdate(COMPLETED) callback, while the engine and storage
  singletons are still alive;
- tolerate operators whose external RESULT uri is absent (or whose iceberg
  document fails to open) by omitting them instead of throwing;
- log the resolved per-operator materialized row counts, throughput counts,
  and any operators missing a RESULT uri, so a URI/count mismatch is visible
  in the CI output.

Assertions are unchanged in intent (single LoopEnd = 3; nested outer = 9,
inner = 3 after the per-outer-iteration reset) but now report expected-vs-
actual. This is verified in the amber-integration CI job; the integration
test cannot run locally on Windows (Python workers use signal.SIGKILL).
…roughput stats

CI diagnostics (amber-integration on 735bd7b) showed the materialized
RESULT row counts are exactly right -- single LoopEnd = 3, nested outer
LoopEnd = 9, nested inner LoopEnd = 3 (reset per outer iteration) -- while
the cumulative ExecutionStatsUpdate output counts were 1 and 3.

So the long-standing failure was the throughput-based iteration-count
assertions (counts == 3 / == 9, added in 962b6c1), not the materialized
read. They rest on a false premise: that a loop operator's worker persists
across JumpToOperatorRegion re-executions and accumulates its output stat. It
does not -- each iteration spawns a fresh worker actor, so the per-logical-op
statistic reflects only the final iteration's worker and never sums to the
iteration count. The iceberg result, by contrast, persists across iterations
(reusesOutputStorageOnReExecution), so it is the reliable signal.

Drop the throughput assertions (and the now-unused ExecutionStatsUpdate
capture) and assert on the materialized row counts instead, which correctly
verify both accumulation (single = 3, outer = 9) and the inner LoopEnd's
per-outer-iteration reset (inner = 3, not 9). Termination is still checked by
the bounded Await on completion.
…t flag

A /simplify pass over the loop changes in operator.py:

- Extract `_strip_reserved(state)` -- the `{k: v ... if k not in
  _RESERVED_STATE_KEYS}` comprehension was copy-pasted three times
  (produce_state_on_finish + twice in run_update).
- Extract `_eval_loop_expr(expr, state, table)` -- LoopStart.eval_output and
  LoopEnd.eval_condition were near-identical "exec the expression as `output`
  in a throwaway namespace" evaluators.
- Drop LoopEndOperator._consumed_state: it was redundant with _loop_table,
  which is None exactly until run_update consumes a matching state. condition()
  now short-circuits on `self._loop_table is None`.

Behavior unchanged; test_loop_operators / test_main_loop / test_output_manager
stay green (the one unit test that pinned the flag now pins the equivalent
_loop_table state). Net fewer lines and no duplicated logic.
The "run a workflow to COMPLETED, then read each operator's materialized
RESULT document" skeleton was copy-pasted three times: DataProcessingSpec
.executeWorkflow, TestUtils.shouldReconfigure, and LoopIntegrationSpec's
materialized-row-count helper.

Extract two shared helpers in TestUtils:
- readMaterializedResults(executionId, operatorIds, extract): resolve each
  operator's external RESULT uri, open the document, apply extract; omit
  operators with no uri.
- runWorkflowAndReadResults(system, workflow, operatorIds, extract, timeout):
  run to COMPLETED (FatalError aborts via the completion await), then read.

Route all three callers through them:
- DataProcessingSpec.executeWorkflow -> runWorkflowAndReadResults(..., _.get().toList)
- LoopIntegrationSpec -> runWorkflowAndReadResults(..., _.getCount) (keyed by
  OperatorIdentity now, not the id string)
- shouldReconfigure -> readMaterializedResults for its read block (it keeps
  its own pause/reconfigure/resume run loop)

Drops the now-dead imports from both specs. Net ~50 fewer lines and one copy
of the harness instead of three.

Verified locally: DataProcessingSpec + ReconfigurationSpec (18 tests) pass via
the shared helpers; LoopIntegrationSpec uses the identical harness and is
verified in the amber-integration CI job.
…it, trim docstrings

Line-count reduction pass over the loop code (no behavior change):

- Extract an abstract LoopOpDesc base for LoopStartOpDesc / LoopEndOpDesc: the
  shared getPhysicalOp / operatorInfo / requiresMaterializedExecution (and its
  comment) move to the base; each leaf keeps only its @JsonProperty fields,
  name/description, the generated Python template, and -- for LoopEnd --
  reusesOutputStorage = true.
- output_manager.set_up_port_storage_writer: fold the identical result/state
  writer setup into a local start_writer helper; close_port_storage_writers
  loops over both registries instead of four hand-rolled loops.
- main_loop: extract _emit_batches for the duplicated DataElement -> output
  queue block (process_input_tuple + _emit_and_save_state).
- operator.py: condense the LoopStart / LoopEnd class docstrings (drop the
  method-by-method Lifecycle narration the inline comments already cover, fold
  the duplicated "Reserved names" section into a one-line _RESERVED_STATE_KEYS
  pointer); keep the subclass contract and the reviewer-driven invariants.

Verified: test_loop_operators + test_main_loop + test_output_manager (51) pass,
ruff clean; LoopStartOpDescSpec + LoopEndOpDescSpec (29) pass, scalafmt clean.
…ent calls

More line-count cleanup (the deferred items from the prior review; no behavior
change):

- test_main_loop: hoist the inline `StubLoopEnd(condition -> False)` (repeated
  across 4 tests) to a module-level `_FalseLoopEnd`, and the duplicated
  `_Reader`/`_R` materialization-reader stub (3 copies) to a module-level
  `_MatReader`.
- test_loop_operators: extract `_ipc_one_row()` for the one-row Arrow-IPC table
  literal built inline in 4 LoopEnd tests.
- RegionExecutionCoordinator.createOutputPortStorageObjects: provision the
  result and state documents in one `Seq(...).foreach` instead of two
  near-identical `provisionOutputDocument` calls.

Verified: test_loop_operators + test_main_loop + test_output_manager (51) pass,
ruff clean; amber Test/compile + scalafmt clean.
@aglinxinyuan

Copy link
Copy Markdown
Contributor Author

Superseded by #5700, which re-opens this from my fork (aglinxinyuan/texera) to satisfy the requirement that contributions come from a fork rather than a branch on the main repo. The code is identical and the labels/CI carry over. Continuing on #5700 — the full review discussion here remains for reference. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Introduce for loop

6 participants