feat: add loop operators #4206
@Xiao-zhen-Liu, please review the PR again.
Suggestion: consider splitting this PR
This is now 38 files / +2280 −94. It's mostly tests (good), but it bundles four separate pieces of work, and one of them — the change to how operator state is stored and sent — affects every stateful operator, not just loops. That part is hard to review alongside the loop feature, because approving the loops also means approving a change to all state handling.
This is only a suggestion. At a minimum I'd split it in two: the state-format change on its own, and the loop feature on top. If you want to go further, it splits cleanly into four:
PR 1 Worker-id helper (~48 lines) independent
get_operator_id + test (replaces the worker-name string parsing)
PR 2 Generalize how state is stored and sent (~250 lines)
4-column State schema (Scala + Python) + bytes-in-JSON +
StateFrame + network sender/receiver + materialization reader
PR 3 Reuse output storage on a region re-run (~30 lines)
RegionExecutionCoordinator skip-create + a flag named for the
behavior + its test
PR 4 Loop operators (~700 + tests) builds on 1-3
LoopStart/EndOpDesc, operator.py loop classes, main_loop logic,
execution-mode validation, reset_storage, icons, tests
If you only do the two-way split, the state-format change is the part to split out: state written before this PR has the old one-column schema, so it needs backward-compatible reading and a Scala-side round-trip test — neither of which is here yet.
One process note first. Several resolved threads (11, 17, 19, 21, 22) were closed with replies pointing to fix commits (e6bea518f2, e281c61b4c, ca9e5ce8cc, 1848ce00fb, 540b7ba274, bbec98282e) that aren't in the branch (070a844). The code they describe isn't there — it looks lost in a rebase or never pushed. So those threads read as done, but the code still has the original issue. Could you re-push or confirm? I've re-opened the specifics inline.
How to split, given the history can't be rebased apart (300+ commits, many merges): branch off main and copy over one group of files at a time — the four pieces touch almost entirely separate files.
Addresses PR #4206 review feedback (#4206 (comment)): an earlier reply claimed reset_storage had been renamed to reset_output_storage, documented, and guarded, but that work was lost when the branch was rebased -- the method was back to a bare reset_storage with no docstring and no checks, and the reason truncation is safe lived only in the PR description.
Re-apply on the consolidated branch:
* Rename reset_storage -> reset_output_storage (matching main_loop's caller and the two __init__/set_up comments that referenced the intended name).
* Add a docstring: what it does (drop+recreate the result AND state tables, reopen the writers), that it's called only by a Loop End worker once per iteration, and -- the rationale that previously lived only in the PR description -- WHY truncating live storage is safe: loops run in MATERIALIZED mode, so downstream operators don't read this output until the loop region completes, so no reader can observe an intermediate truncation.
* Add the two previously-implicit precondition guards: exactly one output port, and the storage writer already set up (_storage_uri_base populated). Both raise a clear RuntimeError instead of silently resetting the wrong port or dereferencing None.
* test_output_manager.py: new TestResetOutputStorage covering the happy path (close -> recreate result+state docs -> reopen writer) and both guard failures, with iceberg/thread collaborators mocked.
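The two precondition guards described in this commit can be sketched roughly as follows. This is a minimal illustration, not the project's code: `OutputManagerSketch`, its constructor arguments, and `reset_calls` are hypothetical stand-ins, and the real method also closes the writers, recreates the iceberg documents, and reopens the writers.

```python
class OutputManagerSketch:
    """Hypothetical stand-in for OutputManager; only the guard logic is modelled."""

    def __init__(self, output_ports, storage_uri_base=None):
        self._ports = list(output_ports)
        # Assumed to stay None until the storage writer has been set up.
        self._storage_uri_base = storage_uri_base
        self.reset_calls = []  # records which port was reset (illustration only)

    def reset_output_storage(self):
        """Reset the single output port's storage, failing loudly on misuse."""
        if len(self._ports) != 1:
            raise RuntimeError(
                "reset_output_storage expects exactly one output port, "
                f"found {len(self._ports)}"
            )
        if self._storage_uri_base is None:
            raise RuntimeError(
                "reset_output_storage called before the storage writer was set up"
            )
        port = self._ports[0]
        # Real code would drop and recreate the result and state tables here.
        self.reset_calls.append(port)
        return port
```

Raising in both cases means a misconfigured worker stops with a readable message instead of resetting the wrong port or dereferencing None.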
✅ No material benchmark regressions detected
🟢 13 better · 🔴 0 worse · ⚪ 2 noise (<±5%) · 0 without baseline
Baseline details: Latest main
Raw CSV:
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,451.00,200,128000,443,0.271,22281.94,33193.83,33193.83
1,100,10,64,20,2129.84,2000,1280000,939,0.573,104393.57,124887.02,124887.02
2,1000,10,64,20,18220.65,20000,12800000,1098,0.670,905235.65,984613.93,984613.93
…ermination
Addresses PR #4206 review feedback (#4206 (comment)): both E2E loop tests only checked the workflow reached COMPLETED, so a counter bug that still terminated (e.g. off-by-one) would have passed.
Each test now asserts the terminal LoopEnd's cumulative output-tuple count, captured from ExecutionStatsUpdate (keyed by logical op id; delivered before COMPLETED; the worker persists across the JumpToOperatorRegion re-executions so its output statistic accumulates across iterations). LoopEnd is an identity pass-through on data, so by conservation that count equals the number of rows that flowed through the loop -- i.e. the iteration count.
* Single loop: assert terminal LoopEnd output == 3 (i advances 0,1,2, stops at 3).
* Nested loop: the outer LoopStart now emits the WHOLE table (output = "table") instead of one row, so the inner loop genuinely iterates over 3 rows and the inner body runs 3 x 3 = 9 times (matching the Nested.Loop.json demo). Assert the terminal outer LoopEnd output == 9. The previous operators (outer output = table.iloc[i]) would have run the inner body only once per outer iteration -- 3 total, not the 9 the comment/PR description claimed.
runToCompletion -> runAndGetOutputCounts, returning the per-operator output-count map so each test asserts the count it cares about.
…tion
Addresses PR #4206 review feedback (#4206 (comment)): the PhysicalOp flag was named after the operator (isLoopEnd) rather than the behavior the scheduler actually checks -- 'keep this operator's output storage across a region re-run.' (This rename was made earlier but lost when the branch was rebased.)
* PhysicalOp: isLoopEnd -> reusesOutputStorageOnReExecution (+ doc comment); withIsLoopEnd -> withReusesOutputStorageOnReExecution.
* RegionExecutionCoordinator: the skip-recreate guard checks reusesOutputStorageOnReExecution; local val + comment reworded to the behavior.
* LoopEndOpDesc sets .withReusesOutputStorageOnReExecution(true).
* Loop specs + mixin updated to the new name.
Now any future operator that must preserve its output across a region re-execution can set the flag without a LoopEnd-specific misnomer.
Verified: WorkflowCore + WorkflowOperator compile and all 29 LoopStart/LoopEnd op-desc specs pass. (The amber module's local compile is blocked by an unrelated pre-existing JOOQ issue -- PveManager references the virtual_environments table from #5577, absent from the un-migrated local DB; CI compiles it against a fresh schema.)
…nCoordinator
Addresses the remaining half of PR #4206 review feedback (#4206 (comment)): the skip-create branch (reuse an existing output document on region re-run instead of clobbering it) was untested. (The rename half landed in 3d4f15b.)
Extract the create-or-reuse decision out of the private createOutputPortStorageObjects into a pure companion method RegionExecutionCoordinator.provisionOutputDocument(uri, reuseExistingStorage, documentExists, createDocument) with the storage operations injected, so the four cases can be unit-tested without an iceberg backend or a live region.
New RegionOutputProvisioningSpec pins:
* reuse + existing document -> NOT recreated (createDocument not called), so accumulated loop output survives the re-run;
* reuse + no document yet -> created (first iteration);
* no-reuse + existing -> recreated/overwritten (fresh per run);
* no-reuse + none -> created;
* no-reuse short-circuits documentExists (always recreate, never probe).
Verified the production change compiles (the only remaining amber compile errors are the pre-existing PveManager/virtual_environments JOOQ issue from #5577, unrelated to this change and resolved by a migrated DB in CI); the new spec is a pure ScalaTest unit with no iceberg/actor dependency.
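The create-or-reuse decision pinned by the spec above can be illustrated with a small Python transliteration. The actual method is Scala; the snake_case names and the boolean return value here are assumptions made for the sketch, and the two storage operations are injected as plain callables, mirroring the description in the commit message.

```python
from typing import Callable


def provision_output_document(
    uri: str,
    reuse_existing_storage: bool,
    document_exists: Callable[[str], bool],
    create_document: Callable[[str], None],
) -> bool:
    """Return True if the document was (re)created, False if it was reused.

    The return value is an assumption of this sketch, added so the outcome
    is easy to observe in a test.
    """
    # `and` short-circuits: without reuse, document_exists is never probed.
    if reuse_existing_storage and document_exists(uri):
        return False  # keep accumulated output from earlier iterations
    create_document(uri)  # first iteration, or a fresh document per run
    return True
```

Because both storage operations are parameters, each case in the list can be checked with recording stubs and no storage backend.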
… a class
Addresses PR #4206 review feedback (#4206 (comment)): the execution-mode check used isInstanceOf[LoopStartOpDesc], which (1) tied a generic service to one operator class, (2) only matched LoopStart -- a plan with a LoopEnd but no LoopStart would skip the check -- and (3) had no test. (This was done earlier but lost when the branch was rebased.)
* Add LogicalOp.requiresMaterializedExecution (default false); LoopStart and LoopEnd both override it to true.
* Extract WorkflowExecutionService.validateExecutionMode(operators, settings) -- a pure, testable method that throws an IllegalArgumentException with an actionable message when the plan needs MATERIALIZED but the request asked for something else. Keyed off the flag, so the service no longer imports or matches LoopStartOpDesc, and Loop Start OR Loop End on its own triggers it.
* The constructor delegates to it (still fail-loud, per the earlier decision).
* New WorkflowExecutionServiceSpec: the flag is set on both loop ops and unset on a non-loop op (SleepOpDesc); validateExecutionMode rejects a loop+PIPELINED plan (incl. a LoopEnd-only plan), and accepts loop+MATERIALIZED, non-loop+PIPELINED, and an empty plan.
Verified locally: 6 new tests pass, 29 LoopStart/LoopEnd op-desc specs pass, amber compiles.
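A flag-keyed check of this kind might look like the sketch below. It is written in Python for illustration only: the real method is Scala and takes a settings object, whereas here the mode is a plain string, `Op` is a hypothetical stand-in for LogicalOp, and `ValueError` stands in for IllegalArgumentException.

```python
from dataclasses import dataclass


@dataclass
class Op:
    """Hypothetical stand-in for a logical operator."""
    name: str
    requires_materialized_execution: bool = False


def validate_execution_mode(operators, execution_mode):
    """Raise if any operator needs MATERIALIZED but another mode was requested."""
    offenders = [op.name for op in operators if op.requires_materialized_execution]
    if offenders and execution_mode != "MATERIALIZED":
        raise ValueError(
            f"Operators {offenders} require MATERIALIZED execution, but "
            f"{execution_mode} was requested. Switch the mode and re-run."
        )
```

Keying the check off a per-operator flag means a plan containing only a Loop End is rejected too, which the class-based check missed.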
…tate path
Addresses PR #4206 review feedback (#4206 (comment)): the loop state path still (1) read the parent's private field via the mangled name self._TableOperator__table_data[port] -- silently breaking if TableOperator is renamed -- and (2) moved the table with pickle (dumps in LoopStart.produce_state_on_finish, loads in LoopEnd.run_update), an RCE surface for data that lives in iceberg. (Fixed earlier but lost in a force-rebase.)
* Add TableOperator._buffered_table(port): inside the class self.__table_data resolves via normal name mangling, so subclasses read the buffer without the mangled prefix and a rename stays transparent. LoopStart.produce_state_on_finish uses it.
* Replace pickle with Apache Arrow IPC (structured, typed, no callable payload): new table_to_ipc_bytes / table_from_ipc_bytes in table.py (pyarrow.ipc). produce_state_on_finish encodes; run_update decodes. Wire shape (bytes in state["table"]) is unchanged; only the format.
* Tests: TestBufferedTableAccessor pins the accessor; the produce-state test now asserts the bytes parse as an Arrow IPC stream and round-trip back to the same tuples (would fail if pickle returned); the matching-branch tests feed Arrow bytes. 14 tests pass.
Verified: 14 loop-operator tests pass; ruff clean. (The 1 failure in test_tuple.py::test_hash is a pre-existing Windows OSError in tuple.py hashing, unrelated to this change.)
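The accessor half of this change relies on Python's private name mangling. The sketch below shows the mechanism with simplified stand-in classes; the `buffer` helper and the list-of-rows storage are assumptions for illustration, not the real TableOperator internals.

```python
class TableOperator:
    """Simplified stand-in; the real class buffers tables per input port."""

    def __init__(self):
        # Inside the class body, `__table_data` is stored under the mangled
        # attribute name `_TableOperator__table_data`.
        self.__table_data = {}

    def buffer(self, port, row):
        # Hypothetical helper used only to fill the buffer in this sketch.
        self.__table_data.setdefault(port, []).append(row)

    def _buffered_table(self, port):
        # Mangling is resolved here, in the defining class, so renaming the
        # class never requires touching subclasses.
        return self.__table_data[port]


class LoopStart(TableOperator):
    def rows_via_accessor(self, port):
        return self._buffered_table(port)

    def rows_via_mangled_name(self, port):
        # Works today, but hard-codes the parent's class name.
        return self._TableOperator__table_data[port]
```

Both methods return the same buffer, but only the mangled-name version would raise AttributeError if TableOperator were renamed.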
…l site
Addresses PR #4206 review feedback (#4206 (comment)): the reviewer read the reset_output_storage() call in process_input_state against the (stale) PR description and was unsure when it fires.
The code is correct -- reset fires once per iteration for every LoopEnd on its matching-loop consume (loop_counter == 0); the nested pass-through (loop_counter > 0) is forwarded and returned in _process_state_frame before reaching this call, so it never resets.
Add a comment at the call site stating this, matching the reset_output_storage docstring. No behavior change. (The PR description, which had the wrong claim, is corrected separately.)
Addresses PR #4206 review feedback (#4206 (comment)): eval_condition read self._loop_table / self.state, which only run_update assigns. MainLoop.complete() calls condition() on every LoopEnd, so a LoopEnd that finished without consuming a matching state -- an inner LoopEnd that only forwarded outer-loop pass-through state, or a loop with no matching-branch consume -- raised AttributeError. (The _StubLoopEnd test stub masked it by pre-seeding self.state.)
* LoopEndOperator.__init__ now initializes self.state = {}, self._loop_table = None, and self._consumed_state = False (the class previously had no __init__; the generated ProcessLoopEndOperator has no __init__/open either, so it inherits this).
* run_update sets self._consumed_state = True after the consume.
* eval_condition returns False when nothing has been consumed -- the loop never iterated here, so don't fire the back-edge. Bare field init alone wasn't enough: the user condition (e.g. ) would then NameError on undefined loop variables.
Tests: unmask _StubLoopEnd (drop its self.state = {} so it mirrors the generated operator and exercises the base __init__); add test_condition_returns_false_before_any_state_is_consumed (the reviewer scenario) and test_consumed_flag_flips_after_run_update.
16 loop + 33 main_loop/output_manager tests pass; no behavior change on the normal consume path.
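The guard described above can be sketched as follows. `LoopEndSketch` is a hypothetical, heavily simplified class: the field names follow the commit message, but the constructor argument, the `run_update` signature, and the use of `eval` on a condition string are assumptions made so the example is self-contained.

```python
class LoopEndSketch:
    """Hypothetical, simplified Loop End; not the generated operator."""

    def __init__(self, condition_expr):
        self.condition_expr = condition_expr
        # Initialised up front so eval_condition never hits AttributeError.
        self.state = {}
        self._loop_table = None
        self._consumed_state = False

    def run_update(self, state, table):
        # Consume a matching state (the loop_counter == 0 branch).
        self.state = dict(state)
        self._loop_table = table
        self._consumed_state = True

    def eval_condition(self):
        # Nothing consumed: the loop never iterated here, so do not loop back.
        # Without this check the expression would hit NameError on `i`.
        if not self._consumed_state:
            return False
        namespace = dict(self.state, table=self._loop_table)
        return bool(eval(self.condition_expr, {}, namespace))
```

For example, a Loop End built with the condition `i < len(table)` returns False before any state arrives, and only evaluates the expression once `run_update` has supplied `i` and the table.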
…hing the worker thread
complete() evaluates a Loop End's user-supplied condition() on the main loop thread, before close()/COMPLETED and outside DataProcessor's guarded executor session. A typo or undefined name in the condition propagated through run()'s logger.catch(reraise=True) and silently killed the worker thread, so the controller never learned of the error.
Guard the call so a bad condition is reported the same way a UDF error is: record it on the exception manager, queue an ERROR console message, flush it, then enter EXCEPTION_PAUSE, skipping the loop-back edge and completion.
Extract the console-message build out of DataProcessor._report_exception into a shared ConsoleMessageManager.report_exception so the data path and this main-loop path report identically.
Add a MainLoop.complete() test: a Loop End whose condition() raises is reported and paused, with no loop-back and no completion.
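The control flow of the guarded call might be pictured like this. It is a sketch under assumptions: the four callables and the returned state strings are hypothetical, and the ordering on the success path (loop-back if the condition holds, then completion) is inferred from the commit message rather than taken from the code.

```python
def complete(condition, report_exception, jump_to_loop_start, finish):
    """Evaluate a Loop End condition without letting an error kill the thread."""
    try:
        keep_looping = condition()
    except Exception as exc:  # e.g. NameError from a typo in the condition
        # Report it the same way a UDF error would be reported, then pause.
        report_exception(exc)
        return "EXCEPTION_PAUSE"  # no loop-back, no completion
    if keep_looping:
        jump_to_loop_start()
    finish()
    return "COMPLETED"
```

The important property is that the error path returns before either the loop-back or the completion step runs, so the controller sees a reported error instead of a silently dead worker.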
…tion
A Loop End's materialized output should accumulate the results of all of its own iterations; for a nested loop, the inner Loop End should accumulate only within the current outer iteration and drop its rows when the outer loop advances. That per-outer-iteration reset was never wired up:
- reset_output_storage() was called from process_input_state, which only runs on the matching consume (loop_counter == 0), not the outer pass-through (loop_counter > 0) where the "outer advanced" signal arrives.
- Worse, it sat under `if output_state is not None`, and a Loop End's generated process_state returns None (produce_state_on_finish is not overridden either), so output_state is always None for a Loop End -- the reset block was dead.
The inner Loop End therefore accumulated across all outer iterations (9 rows in the 3x3 nested case instead of 3).
Move the reset to the inner pass-through branch in _process_state_frame (loop_counter > 0). The input reader replays all states before any data each region execution, so the result/state tables still hold the previous outer iteration's rows when the outer boundary state passes through, and clearing there makes each outer iteration accumulate from empty. It fires exactly once per outer iteration because the inner LoopStart's output (and thus this pass-through) is recreated on every inner back-edge -- each loop operator is its own region, so the inner LoopStart's region does not carry reusesOutputStorageOnReExecution and its output is recreated per inner iteration. A single / outermost Loop End never sees loop_counter > 0, so it never resets and keeps all of its iterations.
The Scala controller side (reusesOutputStorageOnReExecution) is unchanged; it provides the base per-loop accumulation that this reset carves the nested exception out of.
Remove the dead consume-path reset call, and correct the reset_output_storage docstring, the call-site comment, and the RegionExecutionCoordinator comment to describe the actual (inner-Loop-End, per-outer-iteration) behavior.
Tests:
- Unit (test_main_loop): the inner pass-through (loop_counter > 0) triggers reset_output_storage once and does not invoke the operator; the consume path (loop_counter == 0, single loop) and a LoopStart pass-through never reset.
- Integration (LoopIntegrationSpec): assert the materialized result row counts -- single loop = 3 (accumulate); nested inner Loop End = 3, not 9 (resets per outer iteration); nested outer Loop End = 9. The pre-existing cumulative output-tuple counts cannot distinguish accumulate from reset.
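The difference between accumulating and resetting can be checked with a toy model of the inner Loop End's materialized table. This is only an arithmetic illustration of the 3 versus 9 row counts quoted above; the function name and the plain list standing in for the iceberg result table are assumptions.

```python
def inner_loop_end_row_count(outer_iterations, inner_iterations, reset_per_outer):
    """Count rows left in the inner Loop End's table after a nested run."""
    inner_table = []  # stands in for the inner Loop End's result table
    for _ in range(outer_iterations):
        if reset_per_outer:
            # Models reset_output_storage firing on the outer pass-through.
            inner_table.clear()
        for inner in range(inner_iterations):
            # Each inner iteration appends one row to the materialized output.
            inner_table.append(inner)
    return len(inner_table)
```

With three outer and three inner iterations, the reset variant leaves 3 rows (the last outer iteration only), while the variant without a reset leaves 9, which is the behavior the commit describes as the bug.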
The materialized-result assertions added in e61681d failed deterministically in the amber-integration CI job (both LoopIntegrationSpec tests, on repeated attempts), while the workflow itself completed with the correct iteration counts and no FatalError. The read was the problem: it ran in the test body after client.shutdown() and threw a NoSuchElementException when an operator's external RESULT uri was absent.
Restructure the read to match the working DataProcessingSpec pattern:
- read each operator's materialized RESULT row count inside the ExecutionStateUpdate(COMPLETED) callback, while the engine and storage singletons are still alive;
- tolerate operators whose external RESULT uri is absent (or whose iceberg document fails to open) by omitting them instead of throwing;
- log the resolved per-operator materialized row counts, throughput counts, and any operators missing a RESULT uri, so a URI/count mismatch is visible in the CI output.
Assertions are unchanged in intent (single LoopEnd = 3; nested outer = 9, inner = 3 after the per-outer-iteration reset) but now report expected-vs-actual.
This is verified in the amber-integration CI job; the integration test cannot run locally on Windows (Python workers use signal.SIGKILL).
…roughput stats
CI diagnostics (amber-integration on 735bd7b) showed the materialized RESULT row counts are exactly right -- single LoopEnd = 3, nested outer LoopEnd = 9, nested inner LoopEnd = 3 (reset per outer iteration) -- while the cumulative ExecutionStatsUpdate output counts were 1 and 3. So the long-standing failure was the throughput-based iteration-count assertions (counts == 3 / == 9, added in 962b6c1), not the materialized read.
They rest on a false premise: that a loop operator's worker persists across JumpToOperatorRegion re-executions and accumulates its output stat. It does not -- each iteration spawns a fresh worker actor, so the per-logical-op statistic reflects only the final iteration's worker and never sums to the iteration count. The iceberg result, by contrast, persists across iterations (reusesOutputStorageOnReExecution), so it is the reliable signal.
Drop the throughput assertions (and the now-unused ExecutionStatsUpdate capture) and assert on the materialized row counts instead, which correctly verify both accumulation (single = 3, outer = 9) and the inner LoopEnd's per-outer-iteration reset (inner = 3, not 9). Termination is still checked by the bounded Await on completion.
…t flag
A /simplify pass over the loop changes in operator.py:
- Extract `_strip_reserved(state)` -- the `{k: v ... if k not in
_RESERVED_STATE_KEYS}` comprehension was copy-pasted three times
(produce_state_on_finish + twice in run_update).
- Extract `_eval_loop_expr(expr, state, table)` -- LoopStart.eval_output and
LoopEnd.eval_condition were near-identical "exec the expression as `output`
in a throwaway namespace" evaluators.
- Drop LoopEndOperator._consumed_state: it was redundant with _loop_table,
which is None exactly until run_update consumes a matching state. condition()
now short-circuits on `self._loop_table is None`.
Behavior unchanged; test_loop_operators / test_main_loop / test_output_manager
stay green (the one unit test that pinned the flag now pins the equivalent
_loop_table state). Net fewer lines and no duplicated logic.
The "run a workflow to COMPLETED, then read each operator's materialized RESULT document" skeleton was copy-pasted three times: DataProcessingSpec.executeWorkflow, TestUtils.shouldReconfigure, and LoopIntegrationSpec's materialized-row-count helper.
Extract two shared helpers in TestUtils:
- readMaterializedResults(executionId, operatorIds, extract): resolve each operator's external RESULT uri, open the document, apply extract; omit operators with no uri.
- runWorkflowAndReadResults(system, workflow, operatorIds, extract, timeout): run to COMPLETED (FatalError aborts via the completion await), then read.
Route all three callers through them:
- DataProcessingSpec.executeWorkflow -> runWorkflowAndReadResults(..., _.get().toList)
- LoopIntegrationSpec -> runWorkflowAndReadResults(..., _.getCount) (keyed by OperatorIdentity now, not the id string)
- shouldReconfigure -> readMaterializedResults for its read block (it keeps its own pause/reconfigure/resume run loop)
Drops the now-dead imports from both specs. Net ~50 fewer lines and one copy of the harness instead of three.
Verified locally: DataProcessingSpec + ReconfigurationSpec (18 tests) pass via the shared helpers; LoopIntegrationSpec uses the identical harness and is verified in the amber-integration CI job.
…it, trim docstrings
Line-count reduction pass over the loop code (no behavior change):
- Extract an abstract LoopOpDesc base for LoopStartOpDesc / LoopEndOpDesc: the shared getPhysicalOp / operatorInfo / requiresMaterializedExecution (and its comment) move to the base; each leaf keeps only its @JsonProperty fields, name/description, the generated Python template, and -- for LoopEnd -- reusesOutputStorage = true.
- output_manager.set_up_port_storage_writer: fold the identical result/state writer setup into a local start_writer helper; close_port_storage_writers loops over both registries instead of four hand-rolled loops.
- main_loop: extract _emit_batches for the duplicated DataElement -> output queue block (process_input_tuple + _emit_and_save_state).
- operator.py: condense the LoopStart / LoopEnd class docstrings (drop the method-by-method Lifecycle narration the inline comments already cover, fold the duplicated "Reserved names" section into a one-line _RESERVED_STATE_KEYS pointer); keep the subclass contract and the reviewer-driven invariants.
Verified: test_loop_operators + test_main_loop + test_output_manager (51) pass, ruff clean; LoopStartOpDescSpec + LoopEndOpDescSpec (29) pass, scalafmt clean.
…ent calls
More line-count cleanup (the deferred items from the prior review; no behavior change):
- test_main_loop: hoist the inline `StubLoopEnd(condition -> False)` (repeated across 4 tests) to a module-level `_FalseLoopEnd`, and the duplicated `_Reader`/`_R` materialization-reader stub (3 copies) to a module-level `_MatReader`.
- test_loop_operators: extract `_ipc_one_row()` for the one-row Arrow-IPC table literal built inline in 4 LoopEnd tests.
- RegionExecutionCoordinator.createOutputPortStorageObjects: provision the result and state documents in one `Seq(...).foreach` instead of two near-identical `provisionOutputDocument` calls.
Verified: test_loop_operators + test_main_loop + test_output_manager (51) pass, ruff clean; amber Test/compile + scalafmt clean.
Superseded by #5700, which re-opens this from my fork (aglinxinyuan/texera) to satisfy the requirement that contributions come from a fork rather than a branch on the main repo. The code is identical and the labels/CI carry over. Continuing on #5700; the full review discussion here remains for reference. Thanks!
What changes were proposed in this PR?
Adds two new operators, Loop Start and Loop End, that let users write a for-loop inside a visual workflow. The loop iterates over rows of a pandas table. The user supplies four small Python snippets:
- `initialization`: `i = 0`
- `output`: `table.iloc[i]` (the row passed into the loop body each iteration)
- `update`: `i += 1`
- `condition`: `i < len(table)` (keep looping while this is true)
Operators placed between Loop Start and Loop End make up the loop body and run once per iteration. When `condition` returns true, the runtime starts another iteration; when it returns false, downstream operators run on the accumulated output.
How an iteration works
The arrow from Loop End back to Loop Start is not an edge in the workflow graph, so the region DAG stays acyclic. The loop-back is done with two separate steps when an iteration ends:
- Send `jump_to_operator_region`, asking the controller to schedule the Loop Start region one more time.
- Write the loop state (`i`, any accumulators, and the pickled table) into the iceberg table that Loop Start reads from at the start of every iteration.
The newly scheduled Loop Start region then picks up that state and runs the next iteration. The "iceberg table Loop Start reads from" is the same cross-region state channel introduced in #4490; this PR reuses it as the back-edge for loops.
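The interplay of the four snippets and the re-scheduling step can be simulated in a few lines of plain Python. This is a toy model, not the runtime: a list stands in for the pandas table (so the output snippet is written `table[i]` rather than `table.iloc[i]`), a dict stands in for the iceberg state table, and `run_loop` and `body` are hypothetical names.

```python
def run_loop(table, initialization, output, update, condition, body):
    """Simulate Loop Start / Loop End with user snippets given as strings."""
    state = {}  # stands in for the state that Loop Start reads back
    exec(initialization, {}, state)  # Loop Start, first execution only
    accumulated = []  # stands in for Loop End's materialized output
    schedule_loop_start = True
    while schedule_loop_start:  # each pass models one scheduled region run
        row = eval(output, {}, dict(state, table=table))  # Loop Start emits
        accumulated.append(body(row))  # loop body runs once per iteration
        exec(update, {}, state)  # Loop End applies the update snippet
        # Loop End evaluates the condition; True models the jump request.
        schedule_loop_start = bool(eval(condition, {}, dict(state, table=table)))
    return accumulated
```

Calling `run_loop([1, 2, 3], "i = 0", "table[i]", "i += 1", "i < len(table)", lambda r: r * 10)` walks the three rows once each, with `i` carried between iterations only through `state`, much as the real loop carries it through the state table.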
What changed
- `LoopStartOpDesc.scala`, `LoopEndOpDesc.scala`: `initialization` / `output` / `update` / `condition` expressions
- `core/models/operator.py` (`LoopStartOperator`, `LoopEndOperator`): `loop_counter` and the state dict
- `RegionExecutionCoordinator.scala`
- `PhysicalOp.scala`: `reusesOutputStorageOnReExecution: Boolean` + `withReusesOutputStorageOnReExecution`
- `MainLoop._attach_loop_start_id`, `_jump_to_loop_start`
- `OutputManager.reset_output_storage`: `reset_output_storage` exists only for the inner Loop End of a nested loop: it fires once per outer iteration, on the outer-loop pass-through (`loop_counter > 0`) in `_process_state_frame`, dropping and recreating that Loop End's result/state iceberg tables so the new outer iteration accumulates from empty instead of concatenating across outer iterations. A single / outermost Loop End never sees `loop_counter > 0`, so it never resets. Safe because loops run MATERIALIZED: downstream doesn't read until the loop region completes.
- `WorkflowExecutionService.scala`: throws an `IllegalArgumentException` with an actionable message when the plan needs MATERIALIZED but the request asked for something else. `WorkflowService` catches it and routes it through `errorHandler` → `WorkflowFatalError` → the existing diff handler, so the frontend sees a clear "switch the mode and re-run" error instead of the UI and the engine silently disagreeing.
- `LoopStart.png`, `LoopEnd.png`
Nested loops
Each state carries a `loop_counter` integer marking which loop's iteration the state belongs to. This is what keeps an inner Loop End from accidentally consuming an outer loop's state.
- `LoopStart.process_state`: (… `LoopStartStateURI` is set), `loop_counter += 1` and pass it through.
- `LoopEnd.process_state`: if `loop_counter > 0`, decrement and pass it through (this state belongs to an outer loop).
- `LoopEnd`: if `loop_counter == 0`, the state is mine: run `update`, evaluate `condition`, jump back to my paired Loop Start.
So when two loops are nested, the outer loop's state walks through the inner Loop Start (+1) and the inner Loop End (−1) untouched, arrives at the outer Loop End at `loop_counter == 0`, and only there is it consumed.
Any related issues, documentation, discussions?
Closes #4442. Builds on #4490 (cross-region state materialization) and #5085 (`DocumentFactory.documentExists`).
How was this PR tested?
- `LoopStartOpDescSpec.scala` / `LoopEndOpDescSpec.scala`: code-gen output, ports, `isLoopEnd` flag.
- `test_loop_operators.py`: runtime base classes: flat-loop matching branch (runs `update` / `condition`) and nested-loop pass-through (`loop_counter` +/− symmetric across an outer × inner traversal).
- `sbt scalafmtCheckAll scalafixAll --check` and `ruff check` clean.
Manual workflows
Input for both is a 3-row table from `TextInput("1\n2\n3")`. Each loop's condition is `i < len(table)`.
- `TextInput → LoopStart → LoopEnd`
- `TextInput → OuterLoopStart → InnerLoopStart → InnerLoopEnd → OuterLoopEnd`
Demo:

Basic Loop:
Nested Loop:

Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.7 in compliance with ASF.