feat(scheduling): reuse output storage across region re-executions #5707

Merged
aglinxinyuan merged 15 commits into apache:main from aglinxinyuan:loop-reuse-output-storage on Jun 17, 2026

Conversation

@aglinxinyuan aglinxinyuan commented Jun 14, 2026

What changes were proposed in this PR?

Adds an opt-in mechanism for an output port to reuse its storage when the owning operator's region re-executes, instead of recreating the document each time. Dormant and behavior-preserving — no operator sets the flag in this PR.

  • OutputPort gains a reuseStorage: Boolean proto field (alongside blocking / mode). It marks a port whose output accumulates across region re-executions — e.g. a Loop End port whose result builds up over the iterations of its own loop.
  • DocumentFactory.createOrReuseDocument(uri, schema, reuseExisting, …) is the create-or-reuse decision: when reuse is requested and a document already exists it opens and returns that one; otherwise it creates a fresh one. It always returns the document, so the call site does not branch.
  • RegionExecutionCoordinator reads each output port's reuseStorage flag while provisioning that port's result/state documents and routes through createOrReuseDocument.

| port flag | region re-run behavior |
|---|---|
| false (every operator today) | recreate output/state documents (unchanged) |
| true (set by Loop End in the loop PR) | keep and reopen the existing documents |
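
The create-or-reuse decision described above can be sketched as follows. This is a minimal illustration, not the actual `DocumentFactory` code: `Doc`, `Store`, and `createOrReuse` are hypothetical stand-ins, the schema parameter is omitted, and an in-memory map replaces the real storage backend.

```scala
import java.net.URI
import scala.collection.mutable

object CreateOrReuseSketch {
  // Hypothetical stand-in for a stored document; not Texera's VirtualDocument.
  final class Doc(val uri: URI) {
    val rows: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty
  }

  // Hypothetical in-memory store keyed by URI, standing in for the real backend.
  final class Store {
    private val docs = mutable.Map.empty[URI, Doc]
    def exists(uri: URI): Boolean = docs.contains(uri)
    def open(uri: URI): Doc = docs(uri)
    def create(uri: URI): Doc = {
      val doc = new Doc(uri) // replaces any previous document at this URI
      docs(uri) = doc
      doc
    }
  }

  // Reuse only when requested AND a document already exists; otherwise create.
  // The document is always returned, so the caller does not branch.
  def createOrReuse(store: Store, uri: URI, reuseExisting: Boolean): Doc =
    if (reuseExisting && store.exists(uri)) store.open(uri) else store.create(uri)
}
```

With `reuseExisting = true`, a second call for the same URI returns the document that already holds earlier rows; with `false`, the call always yields a fresh, empty document.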

A runtime guard in RegionExecutionCoordinator asserts no port sets reuseStorage for now: the flag activates only with the loop operators, which are not yet on main. The guard keeps the dormant reuse path from being silently exercised before its consumer exists, and is removed when the loop operators land.
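
A guard of this kind might look roughly like the sketch below. It assumes the check is written with Scala's standard `require`; `PortSketch` and `assertNoReuse` are hypothetical names used only for illustration, and the real `OutputPort` is a proto-generated message with more fields.

```scala
object ReuseGuardSketch {
  // Hypothetical, trimmed-down port model; only the flag matters here.
  final case class PortSketch(name: String, reuseStorage: Boolean = false)

  // Fails fast with IllegalArgumentException if any port opts into reuse.
  def assertNoReuse(ports: Seq[PortSketch]): Unit =
    ports.foreach { p =>
      require(!p.reuseStorage, s"port ${p.name} sets reuseStorage, which is not supported yet")
    }
}
```

Failing at provisioning time makes an accidental enablement visible immediately, rather than letting documents silently persist across re-executions.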

Any related issues, documentation, discussions?

Resolves #5709 (sub-issue of #4442 "Introduce for loop"). Split out of #5700 to keep that PR reviewable, per @Xiao-zhen-Liu's review.

How was this PR tested?

  • DocumentFactorySpec — pins the create-or-reuse decision (the reuse × exists matrix plus the "no-reuse never probes existence" short-circuit) with injected document stubs, no iceberg backend.
  • OutputPortReuseFlagSpec — guards that no registered operator enables reuseStorage on any output port.
  • WorkflowCore / WorkflowOperator / WorkflowExecutionService compile; scalafmt + scalafix clean.
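
As a rough picture of what pinning the reuse × exists matrix means, the sketch below runs all four combinations against an injected existence probe and counts how often the probe is evaluated. The names (`decide`, `runMatrix`, `Outcome`) are hypothetical and this is not the code of `DocumentFactorySpec`; it only illustrates the short-circuit property.

```scala
object DecisionMatrixSketch {
  // Records which injected operation would run, so a test can pin the decision.
  sealed trait Outcome
  case object Opened extends Outcome
  case object Created extends Outcome

  // `exists` is a by-name parameter: it is evaluated only when
  // `reuseExisting` is true, because && short-circuits.
  def decide(reuseExisting: Boolean, exists: => Boolean): Outcome =
    if (reuseExisting && exists) Opened else Created

  // Runs the full reuse x exists matrix and counts existence probes per case.
  def runMatrix(): Seq[(Boolean, Boolean, Outcome, Int)] =
    for {
      reuse <- Seq(false, true)
      present <- Seq(false, true)
    } yield {
      var probes = 0
      val outcome = decide(reuse, { probes += 1; present })
      (reuse, present, outcome, probes)
    }
}
```

Only the (reuse = true, exists = true) case opens an existing document, and the two no-reuse cases never touch the probe.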

Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.

Add a `reusesOutputStorageOnReExecution` flag to `PhysicalOp` (default false)
plus a `withReusesOutputStorageOnReExecution` builder. When set,
`RegionExecutionCoordinator` reuses an operator's existing iceberg output and
state documents on a region re-run instead of recreating them, via a new pure
`provisionOutputDocument` decision function unit-tested by
`RegionOutputProvisioningSpec`.

The flag is named for the behavior the scheduler checks, not the operator that
sets it, so any future operator needing output accumulated across region
re-executions can opt in. No operator sets it yet, so the path is dormant and
behavior-preserving.

Split out of the loop-operators PR (apache#5700) to keep that PR
reviewable; the loop feature will set the flag on Loop End.

codecov-commenter commented Jun 14, 2026

Codecov Report

❌ Patch coverage is 25.00000% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 52.96%. Comparing base (b7fe06e) to head (fe81369).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...he/texera/amber/core/storage/DocumentFactory.scala | 25.00% | 3 Missing ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #5707      +/-   ##
============================================
- Coverage     53.18%   52.96%   -0.23%     
+ Complexity     2659     2647      -12     
============================================
  Files          1095     1094       -1     
  Lines         42374    42343      -31     
  Branches       4559     4557       -2     
============================================
- Hits          22535    22425     -110     
- Misses        18509    18604      +95     
+ Partials       1330     1314      -16     

| Flag | Coverage Δ | *Carryforward flag |
|---|---|---|
| access-control-service | 70.44% <ø> (ø) | |
| agent-service | 34.36% <ø> (ø) | Carriedforward from 3242ac9 |
| amber | 53.12% <25.00%> (-0.40%) ⬇️ | |
| computing-unit-managing-service | 1.65% <ø> (ø) | |
| config-service | 56.71% <ø> (ø) | |
| file-service | 57.06% <ø> (ø) | |
| frontend | 47.93% <ø> (-0.07%) ⬇️ | Carriedforward from 3242ac9 |
| pyamber | 89.84% <ø> (-0.29%) ⬇️ | Carriedforward from 3242ac9 |
| python | 90.80% <ø> (ø) | Carriedforward from 3242ac9 |
| workflow-compiling-service | 58.69% <ø> (ø) | |

*This pull request uses carry forward flags.

Copilot AI left a comment

Pull request overview

This PR introduces an opt-in mechanism for operators to preserve their output/state storage across region re-executions (e.g., loop iterations), by adding a flag on PhysicalOp and updating the region scheduler’s output-document provisioning logic to conditionally reuse existing documents. It also adds focused unit tests for the new create-vs-reuse decision function.

Changes:

  • Add reusesOutputStorageOnReExecution: Boolean and a withReusesOutputStorageOnReExecution builder to PhysicalOp.
  • Extract output document provisioning decision into RegionExecutionCoordinator.provisionOutputDocument and use it when provisioning per-output-port result/state documents.
  • Add RegionOutputProvisioningSpec unit tests covering the reuse×exists matrix and the non-reuse short-circuit.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

File Description
common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala Adds a new operator-level flag + builder to signal output-storage reuse across region re-executions.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala Adds a pure provisioning decision function and uses it to create-or-reuse result/state documents per output port based on the owning operator’s flag.
amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionOutputProvisioningSpec.scala Introduces unit tests validating the provisioning decision logic without needing an Iceberg backend.

github-actions Bot commented Jun 14, 2026

⚠️ Benchmark changes need a look

🟢 2 better · 🔴 4 worse · ⚪ 9 noise (<±5%) · 0 without baseline

Compared against main b7fe06e benchmarked on this same runner, so the delta is largely free of cross-runner hardware noise. The "7d avg" column still reflects the gh-pages dashboard. Treat <±5% as noise unless repeated.

| config | throughput | MB/s | latency | max Δ latest / 7d |
|---|---|---|---|---|
| 🔴 bs=10 sw=10 sl=64 | 423 | 0.258 | 21,278/37,235/37,235 us | 🔴 +17.7% / 🟢 -10.5% |
| bs=100 sw=10 sl=64 | 955 | 0.583 | 104,697/131,056/131,056 us | ⚪ within ±5% / 🟢 +7.1% |
| 🟢 bs=1000 sw=10 sl=64 | 1,127 | 0.688 | 898,715/937,408/937,408 us | 🟢 -5.6% / 🟢 -8.4% |

Baseline details

Latest main b7fe06e from same runner

| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 423 tuples/sec | 451 tuples/sec | 410.82 tuples/sec | -6.2% | +3.0% |
| bs=10 sw=10 sl=64 | MB/s | 0.258 MB/s | 0.275 MB/s | 0.251 MB/s | -6.2% | +2.9% |
| bs=10 sw=10 sl=64 | p50 | 21,278 us | 21,592 us | 23,785 us | -1.5% | -10.5% |
| bs=10 sw=10 sl=64 | p95 | 37,235 us | 31,640 us | 34,980 us | +17.7% | +6.4% |
| bs=10 sw=10 sl=64 | p99 | 37,235 us | 31,640 us | 34,980 us | +17.7% | +6.4% |
| bs=100 sw=10 sl=64 | throughput | 955 tuples/sec | 963 tuples/sec | 891.94 tuples/sec | -0.8% | +7.1% |
| bs=100 sw=10 sl=64 | MB/s | 0.583 MB/s | 0.588 MB/s | 0.544 MB/s | -0.9% | +7.1% |
| bs=100 sw=10 sl=64 | p50 | 104,697 us | 102,143 us | 112,277 us | +2.5% | -6.8% |
| bs=100 sw=10 sl=64 | p95 | 131,056 us | 126,908 us | 139,802 us | +3.3% | -6.3% |
| bs=100 sw=10 sl=64 | p99 | 131,056 us | 126,908 us | 139,802 us | +3.3% | -6.3% |
| bs=1000 sw=10 sl=64 | throughput | 1,127 tuples/sec | 1,093 tuples/sec | 1,041 tuples/sec | +3.1% | +8.3% |
| bs=1000 sw=10 sl=64 | MB/s | 0.688 MB/s | 0.667 MB/s | 0.635 MB/s | +3.1% | +8.3% |
| bs=1000 sw=10 sl=64 | p50 | 898,715 us | 912,360 us | 972,714 us | -1.5% | -7.6% |
| bs=1000 sw=10 sl=64 | p95 | 937,408 us | 992,709 us | 1,023,057 us | -5.6% | -8.4% |
| bs=1000 sw=10 sl=64 | p99 | 937,408 us | 992,709 us | 1,023,057 us | -5.6% | -8.4% |

Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,473.01,200,128000,423,0.258,21278.28,37235.25,37235.25
1,100,10,64,20,2094.76,2000,1280000,955,0.583,104697.14,131056.37,131056.37
2,1000,10,64,20,17753.34,20000,12800000,1127,0.688,898715.39,937408.04,937408.04

Add a PhysicalOp builder test alongside the existing withParallelizable case,
exercising the previously-uncovered `this.copy(...)` line that Codecov flagged
on apache#5707 (patch coverage 85.71%, 1 missing line). Asserts the
default false, the flipped value, and immutability of the original instance.

@Yicong-Huang Yicong-Huang left a comment

Left comments inline!

…utputPort

Address @Yicong-Huang's review on apache#5707:
- Move the create-or-reuse decision out of RegionExecutionCoordinator into
  DocumentFactory.createOrReuseDocument -- it is storage-layer logic, not
  scheduling.
- Move the reuse flag off PhysicalOp onto the OutputPort proto, alongside the
  existing per-port `blocking`/`mode`; storage behavior is port-specific. The
  coordinator now reads it per output port and maintains no reuse state itself.
- Relocate the unit test to DocumentFactorySpec.

Per-port differentiation is still required (answering the "why not reuse for
all?" question): the loop back-edge re-executes LoopStart and every loop-body
operator on the same event as LoopEnd, but only LoopEnd accumulates -- the
others must recreate a fresh document each iteration.
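
To make the per-port argument concrete, here is a small simulation under stated assumptions: every port in the loop is provisioned once per iteration and then writes one row. `Port`, `run`, and the port ids are hypothetical; real provisioning goes through the document storage layer, not an in-memory map.

```scala
import scala.collection.mutable

object LoopIterationSketch {
  // Hypothetical port model: only the reuse flag matters here.
  final case class Port(id: String, reuseStorage: Boolean)

  // Provisions every port once per iteration, then appends one row to it.
  // Returns the rows each port holds after the last iteration.
  def run(ports: Seq[Port], iterations: Int): Map[String, List[String]] = {
    val storage = mutable.Map.empty[String, mutable.ListBuffer[String]]
    for (i <- 1 to iterations; port <- ports) {
      val doc =
        if (port.reuseStorage && storage.contains(port.id)) storage(port.id) // reopen
        else {
          val fresh = mutable.ListBuffer.empty[String] // recreate from scratch
          storage(port.id) = fresh
          fresh
        }
      doc += s"iter-$i"
    }
    storage.map { case (id, rows) => id -> rows.toList }.toMap
  }
}
```

After three iterations, a port with `reuseStorage = true` holds rows from all three, while every other port holds only the last iteration's row. A single global "reuse everything" switch could not express that difference.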
aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 15, 2026
…ith apache#5707)

apache#5707 redesigned the reuse mechanism per review: the flag moved from
PhysicalOp.reusesOutputStorageOnReExecution onto OutputPort.reusesOutputStorage,
and the create-or-reuse decision moved out of RegionExecutionCoordinator into
DocumentFactory.createOrReuseDocument. Apply the same change here so loop-feb
stays internally consistent and rebases cleanly once apache#5707 lands:

  - mechanism files (workflow.proto, DocumentFactory, RegionExecutionCoordinator,
    PhysicalOp, DocumentFactorySpec) brought in line with apache#5707; drop the old
    RegionOutputProvisioningSpec.
  - LoopOpDesc.getPhysicalOp now sets reusesOutputStorage on the operator's
    output port (true for Loop End) instead of the removed PhysicalOp builder.
  - LoopStart/EndOpDescSpec assert the port flag; comment references updated.
Moving the create-or-reuse decision out of RegionExecutionCoordinator (into
DocumentFactory) removed the only use of `java.net.URI` here; scalafix
RemoveUnused flagged the leftover import in CI. Drop it.
aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 15, 2026
Same lint fix as apache#5707: moving the create-or-reuse decision into
DocumentFactory removed the only use of `java.net.URI` in
RegionExecutionCoordinator; scalafix RemoveUnused flagged the leftover import
in the amber Lint CI step.
…torage

Per @Yicong-Huang's review on apache#5707: add a sanity check that
iterates every registered operator (OperatorMetadataGenerator.operatorTypeMap)
and asserts no output port sets reusesOutputStorage. No operator needs it yet
-- Loop End, the only one that will, is not on main -- so this catches an
unexpected/accidental enablement. To be updated to allow Loop End's output
port (and only it) when the loop operators land.
aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 15, 2026
…guard

Follow-up to the apache#5707 guard (@Yicong-Huang). Declare the reusesOutputStorage
flag on LoopOpDesc's output port in operatorInfo (alongside where blocking/mode
live) instead of mapping it in getPhysicalOp, so it is declarative and the
cross-operator guard can see it. Add OutputPortReuseFlagSpec -- the
LoopEnd-allowing form of the apache#5707 guard: only Loop End may enable the flag;
every other operator's output ports must have it false.
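
The two forms of the guard (all-false on main, Loop-End-allowing on the loop branch) can be thought of as one check with a different allow-list. The sketch below assumes a simplified registry that maps an operator type name to the `reuseStorage` flags of its output ports; the `Registry` type, `violations`, and the operator names are hypothetical and do not reflect the shape of `OperatorMetadataGenerator.operatorTypeMap`.

```scala
object ReuseFlagGuardSketch {
  // Hypothetical registry shape: operator type name -> reuseStorage flag per output port.
  type Registry = Map[String, Seq[Boolean]]

  // Returns the operators that enable reuseStorage without being allow-listed.
  def violations(registry: Registry, allowed: Set[String]): Set[String] =
    registry.collect {
      case (opType, flags) if flags.contains(true) && !allowed.contains(opType) => opType
    }.toSet
}
```

An empty allow-list corresponds to the all-false guard; passing a set containing only the Loop End operator corresponds to the relaxed form, where a spec would assert that `violations` is empty.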

@Yicong-Huang Yicong-Huang left a comment

Left minor comments inline.

… the flag

Address @Yicong-Huang's review on apache#5707:
- DocumentFactory.createOrReuseDocument now returns the VirtualDocument (opened
  when reused, created otherwise) instead of a Boolean, so the call site need
  not branch on create-vs-reuse.
- RegionExecutionCoordinator adds a runtime require() that no output port sets
  reusesOutputStorage -- a production guard, since the flag only activates with
  the loop operators (not on main). Remove/relax it when introducing them.
aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 15, 2026
apache#5707

apache#5707 changed DocumentFactory.createOrReuseDocument to return the
VirtualDocument (opened when reused, created otherwise) instead of a Boolean.
Sync that here. loop-feb deliberately omits apache#5707's production require-guard on
reusesOutputStorage, since the loop operators legitimately set the flag.

@Yicong-Huang Yicong-Huang left a comment

Please also update the doc and PR description to reflect the final code. I think it is better to avoid mentioning alternative solutions that were reviewed and then replaced.

…mments

Address review nits on apache#5707:
- rename the proto field OutputPort.reusesOutputStorage -> reuseStorage
  (it already lives on OutputPort, so the "Output" qualifier is redundant)
- drop the "Either way the caller gets the document..." sentence from the
  createOrReuseDocument scaladoc
- reword RegionExecutionCoordinator comments to describe the final behavior
  instead of contrasting alternatives ("rather than")
@aglinxinyuan

Also refreshed the PR description to match the final code (OutputPort.reuseStorage + DocumentFactory.createOrReuseDocument) and dropped the alternative-solution wording, per your note on the approval. Thanks for the review!

aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 16, 2026
 review polish

Keep loop-feb consistent with the apache#5707 review changes: proto field
OutputPort.reusesOutputStorage -> reuseStorage, the createOrReuseDocument
scaladoc, and the RegionExecutionCoordinator comment wording. Also rename
the LoopOpDesc/LoopEndOpDesc reuseStorage toggle and the spec assertions
to match the new field name.
@aglinxinyuan aglinxinyuan enabled auto-merge June 16, 2026 22:40
@aglinxinyuan aglinxinyuan added this pull request to the merge queue Jun 16, 2026
@github-merge-queue github-merge-queue Bot removed this pull request from the merge queue due to failed status checks Jun 16, 2026
@aglinxinyuan aglinxinyuan added this pull request to the merge queue Jun 16, 2026
@aglinxinyuan aglinxinyuan removed this pull request from the merge queue due to a manual request Jun 16, 2026
@aglinxinyuan aglinxinyuan added this pull request to the merge queue Jun 17, 2026
Merged via the queue into apache:main with commit da99a35 Jun 17, 2026
33 of 35 checks passed
@aglinxinyuan aglinxinyuan deleted the loop-reuse-output-storage branch June 17, 2026 00:23
aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 17, 2026
apache#5707 (reuse output storage) is now on main, so its files drop out of
loop-feb's diff (proto reuseStorage field, DocumentFactory.createOrReuseDocument
+ spec). Conflict resolutions:

- RegionExecutionCoordinator: keep loop-feb's version WITHOUT the
  require(!reuseStorage) production guard -- the loop operators legitimately
  set the flag, so the guard (correct for apache#5707 in isolation) must not fire here.
- OutputPortReuseFlagSpec: keep loop-feb's LoopEnd-allowing guard (only Loop
  End may set reuseStorage), superseding main's all-false form from apache#5707.
- test_state_materialization_e2e.py: keep loop-feb's 4-column-State version
  (loop_counter/loop_start_id/loop_start_state_uri as their own columns).
  NOTE: this reverts main's apache#5682 class-based-fixture refactor of that test;
  re-apply that structure on top of the 4-column semantics in the PR2 state PR.
yangzhang75 pushed a commit to yangzhang75/texera that referenced this pull request Jun 22, 2026
…pache#5707)

### What changes were proposed in this PR?

Adds an opt-in mechanism for an output port to **reuse** its storage
when the owning operator's region re-executes, instead of recreating the
document each time. Dormant and behavior-preserving — no operator sets
the flag in this PR.

- `OutputPort` gains a `reuseStorage: Boolean` proto field (alongside
`blocking` / `mode`). It marks a port whose output accumulates across
region re-executions — e.g. a Loop End port whose result builds up over
the iterations of its own loop.
- `DocumentFactory.createOrReuseDocument(uri, schema, reuseExisting, …)`
is the create-or-reuse decision: when reuse is requested and a document
already exists it opens and returns that one; otherwise it creates a
fresh one. It always returns the document, so the call site does not
branch.
- `RegionExecutionCoordinator` reads each output port's `reuseStorage`
flag while provisioning that port's result/state documents and routes
through `createOrReuseDocument`.

| port flag | region re-run behavior |
|---|---|
| `false` (every operator today) | recreate output/state documents (unchanged) |
| `true` (set by Loop End in the loop PR) | keep and reopen the existing documents |

A runtime guard in `RegionExecutionCoordinator` asserts no port sets
`reuseStorage` for now: the flag activates only with the loop operators,
which are not yet on `main`. The guard keeps the dormant reuse path from
being silently exercised before its consumer exists, and is removed when
the loop operators land.

### Any related issues, documentation, discussions?

Resolves apache#5709 (sub-issue of apache#4442 "Introduce for loop"). Split out of
apache#5700 to keep that PR reviewable, per @Xiao-zhen-Liu's
[review](apache#4206 (review)).

### How was this PR tested?

- `DocumentFactorySpec` — pins the create-or-reuse decision (the reuse ×
exists matrix plus the "no-reuse never probes existence" short-circuit)
with injected document stubs, no iceberg backend.
- `OutputPortReuseFlagSpec` — guards that no registered operator enables
`reuseStorage` on any output port.
- `WorkflowCore` / `WorkflowOperator` / `WorkflowExecutionService`
compile; scalafmt + scalafix clean.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.
Development

Successfully merging this pull request may close these issues.

Reuse an operator's output storage across region re-executions

4 participants