feat(execution): add PhysicalOp.requiresMaterializedExecution, consumed by the scheduler#5720
Conversation
…submission Add a general LogicalOp.requiresMaterializedExecution flag (default false) and WorkflowExecutionService.validateExecutionMode, which rejects a non-MATERIALIZED submission when any operator requires materialized execution -- keyed off the flag, not a specific operator class. Default false keeps all existing behavior; operators opt in by overriding the flag. Split out of the loop-operators PR (apache#5700) to keep that PR reviewable; the loop operators set the flag.
There was a problem hiding this comment.
Pull request overview
Adds an operator-declared capability flag to enforce execution-mode constraints at workflow submission time, preventing workflows that require materialization from being run in PIPELINED mode (avoiding UI/engine mode mismatches).
Changes:
- Added
LogicalOp.requiresMaterializedExecution: Boolean = falsefor operators to declare a MATERIALIZED-only requirement. - Introduced
WorkflowExecutionService.validateExecutionMode(operators, settings)to reject incompatible submissions with an actionable error message. - Added ScalaTest coverage for the default flag value and validate/reject/accept paths.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala | Adds requiresMaterializedExecution flag (default false) for operator-declared mode requirements. |
| amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala | Implements and invokes submission-time execution mode validation keyed off the new flag. |
| amber/src/test/scala/org/apache/texera/web/service/WorkflowExecutionServiceSpec.scala | Adds unit tests covering the flag default and validation behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #5720 +/- ##
============================================
- Coverage 53.96% 53.91% -0.06%
- Complexity 2782 2783 +1
============================================
Files 1100 1099 -1
Lines 42588 42567 -21
Branches 4583 4578 -5
============================================
- Hits 22984 22949 -35
- Misses 18268 18285 +17
+ Partials 1336 1333 -3
*This pull request uses carry forward flags. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
…e#5720) The requiresMaterializedExecution flag + WorkflowExecutionService.validateExecutionMode are split out to apache#5720. loop-feb keeps the flag (set on the loop operators via LoopOpDesc) and the submission-time validation call; drop the loop-based WorkflowExecutionServiceSpec (the split adds a stub-based one on main) and move the loop-flag coverage into LoopStart/EndOpDescSpec.
|
| config | throughput | MB/s | latency | max Δ latest / 7d | |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 417 | 0.254 | 24,096/28,749/28,749 us | 🟢 -7.6% / 🟢 -17.8% |
| 🔴 | bs=100 sw=10 sl=64 | 947 | 0.578 | 103,416/135,293/135,293 us | 🔴 +5.9% / 🟢 -7.9% |
| ⚪ | bs=1000 sw=10 sl=64 | 1,107 | 0.676 | 906,134/959,592/959,592 us | ⚪ within ±5% / 🟢 -6.8% |
Baseline details
Latest main 44df4f7 from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 417 tuples/sec | 445 tuples/sec | 410.82 tuples/sec | -6.3% | +1.5% |
| bs=10 sw=10 sl=64 | MB/s | 0.254 MB/s | 0.272 MB/s | 0.251 MB/s | -6.6% | +1.3% |
| bs=10 sw=10 sl=64 | p50 | 24,096 us | 23,608 us | 23,785 us | +2.1% | +1.3% |
| bs=10 sw=10 sl=64 | p95 | 28,749 us | 31,112 us | 34,980 us | -7.6% | -17.8% |
| bs=10 sw=10 sl=64 | p99 | 28,749 us | 31,112 us | 34,980 us | -7.6% | -17.8% |
| bs=100 sw=10 sl=64 | throughput | 947 tuples/sec | 961 tuples/sec | 891.94 tuples/sec | -1.5% | +6.2% |
| bs=100 sw=10 sl=64 | MB/s | 0.578 MB/s | 0.586 MB/s | 0.544 MB/s | -1.4% | +6.2% |
| bs=100 sw=10 sl=64 | p50 | 103,416 us | 105,553 us | 112,277 us | -2.0% | -7.9% |
| bs=100 sw=10 sl=64 | p95 | 135,293 us | 127,787 us | 139,802 us | +5.9% | -3.2% |
| bs=100 sw=10 sl=64 | p99 | 135,293 us | 127,787 us | 139,802 us | +5.9% | -3.2% |
| bs=1000 sw=10 sl=64 | throughput | 1,107 tuples/sec | 1,116 tuples/sec | 1,041 tuples/sec | -0.8% | +6.3% |
| bs=1000 sw=10 sl=64 | MB/s | 0.676 MB/s | 0.681 MB/s | 0.635 MB/s | -0.7% | +6.4% |
| bs=1000 sw=10 sl=64 | p50 | 906,134 us | 892,761 us | 972,714 us | +1.5% | -6.8% |
| bs=1000 sw=10 sl=64 | p95 | 959,592 us | 937,723 us | 1,023,057 us | +2.3% | -6.2% |
| bs=1000 sw=10 sl=64 | p99 | 959,592 us | 937,723 us | 1,023,057 us | +2.3% | -6.2% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,480.08,200,128000,417,0.254,24095.81,28748.53,28748.53
1,100,10,64,20,2111.39,2000,1280000,947,0.578,103415.58,135292.91,135292.91
2,1000,10,64,20,18062.18,20000,12800000,1107,0.676,906133.86,959591.51,959591.51|
had a discussion with @aglinxinyuan, two comments:
would suffice. Note that this simple consumer can help reviewer to understand how this property will be used. |
… + consume in scheduler Per @Yicong-Huang's review on apache#5720: - It is a physical-execution property, so move the flag from LogicalOp to PhysicalOp (+ a withRequiresMaterializedExecution builder). - Give it a real consumer in the scheduler: CostBasedScheduleGenerator forces a fully-materialized schedule when any physical op requires it, instead of validating at submission. Drops WorkflowExecutionService.validateExecutionMode and its spec. Default false -> dormant and behavior-preserving (no operator requires it yet); the loop operators set it on their PhysicalOp.
|
Addressed both points in
Default |
…ion on PhysicalOp apache#5720 was redesigned: the flag moved LogicalOp -> PhysicalOp and the consumer moved from WorkflowExecutionService.validateExecutionMode to the scheduler (CostBasedScheduleGenerator). Match that here: - mechanism files (PhysicalOp flag + builder, CostBasedScheduleGenerator consumer, WorkflowCoreTypesSpec test) brought in line with apache#5720; WorkflowExecutionService reverted (validateExecutionMode dropped). - LoopOpDesc.getPhysicalOp sets requiresMaterializedExecution on the physical op (Loop Start and Loop End); dropped the LogicalOp-level override. - LoopStart/EndOpDescSpec assert the flag via getPhysicalOp.
Yicong-Huang
left a comment
There was a problem hiding this comment.
LGTM, left one comment inline
…onsumer Address @Yicong-Huang's review on apache#5720: extract the schedule-mode decision in CostBasedScheduleGenerator into a testable `effectiveExecutionMode` helper (forces MATERIALIZED when any physical op requires it, otherwise the requested mode) and add CostBasedScheduleGeneratorSpec cases covering both branches.
…rom apache#5720 apache#5720 extracted the schedule-mode decision into CostBasedScheduleGenerator.effectiveExecutionMode and added test coverage. Sync the helper refactor + tests here so the files match apache#5720 (loop ops set the flag, so the helper forces a materialized schedule for loop workflows).
|
Thanks. Let's get @Xiao-zhen-Liu's approval before merging |
|
@Xiao-zhen-Liu Please chime in. |
|
@Xiao-zhen-Liu please chime in. this PR is getting old. |
Xiao-zhen-Liu
left a comment
There was a problem hiding this comment.
LGTM in general, left a few minor comments.
…omment Address review (Xiao-zhen-Liu): - Reword the PhysicalOp.requiresMaterializedExecution doc to say the scheduler runs the WHOLE workflow fully materialized when any op sets it (not just that op's region boundaries), and note this is the minimal correct behavior for loops today, with per-op narrowing as a possible future optimization. - Drop the inline comment in createRegionDAG that duplicated the effectiveExecutionMode helper scaladoc and the field doc.
Automated Reviewer SuggestionsBased on the
|
…erialization) Keep loop-feb's PhysicalOp + CostBasedScheduleGenerator identical to apache#5720 after its review cleanup (reworded requiresMaterializedExecution doc + dropped the duplicate inline comment), so they drop cleanly when apache#5720 merges.
…ed by the scheduler (apache#5720) ### What changes were proposed in this PR? Lets an operator declare it can only run under a fully-materialized schedule, and has the scheduler honor it: - `PhysicalOp` gains `requiresMaterializedExecution: Boolean = false` (+ a `withRequiresMaterializedExecution` builder). It is a physical-execution property, so it lives on the physical op. - `CostBasedScheduleGenerator` consumes it: when any physical op requires materialized execution it forces a fully-materialized schedule regardless of the requested execution mode; otherwise the existing PIPELINED/MATERIALIZED logic runs unchanged. Default `false` ⇒ dormant and behavior-preserving: no operator requires it yet, so the scheduler's effective mode is unchanged today. The loop operators set the flag on their physical op. ### Any related issues, documentation, discussions? Resolves apache#5719 (sub-issue of apache#4442 "Introduce for loop"). Split out of apache#5700. Reflects the review discussion with @Yicong-Huang: the property belongs on `PhysicalOp`, and it is consumed by the scheduler. ### How was this PR tested? `WorkflowCoreTypesSpec` covers the `PhysicalOp.requiresMaterializedExecution` default + builder. `WorkflowExecutionService/Test/compile`, `scalafixAll --check`, and `scalafmtCheckAll` pass locally. The scheduler consumer is exercised end-to-end by the loop integration tests once the loop operators (which set the flag) land. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
What changes were proposed in this PR?
Lets an operator declare it can only run under a fully-materialized schedule, and has the scheduler honor it:
PhysicalOpgainsrequiresMaterializedExecution: Boolean = false(+ awithRequiresMaterializedExecutionbuilder). It is a physical-execution property, so it lives on the physical op.CostBasedScheduleGeneratorconsumes it: when any physical op requires materialized execution it forces a fully-materialized schedule regardless of the requested execution mode; otherwise the existing PIPELINED/MATERIALIZED logic runs unchanged.Default
false⇒ dormant and behavior-preserving: no operator requires it yet, so the scheduler's effective mode is unchanged today. The loop operators set the flag on their physical op.Any related issues, documentation, discussions?
Resolves #5719 (sub-issue of #4442 "Introduce for loop"). Split out of #5700. Reflects the review discussion with @Yicong-Huang: the property belongs on
PhysicalOp, and it is consumed by the scheduler.How was this PR tested?
WorkflowCoreTypesSpeccovers thePhysicalOp.requiresMaterializedExecutiondefault + builder.WorkflowExecutionService/Test/compile,scalafixAll --check, andscalafmtCheckAllpass locally. The scheduler consumer is exercised end-to-end by the loop integration tests once the loop operators (which set the flag) land.Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.