refactor(execution-service): consolidate init-error reporting into WorkflowExecutionService #5922
…rkflowExecutionService

Per apache#5921 (follow-up to apache#5781): make error reporting a single site owned by WorkflowExecutionService instead of the two-phase split in WorkflowService.initExecutionService.

- WorkflowExecutionService registers its fatalErrors -> WorkflowErrorEvent diff handler as the FIRST construction action. Construction does no external work and cannot throw (workflowSettings assignment, WebsocketInput creation, handler registration); all throwing work is in executeWorkflow(), which runs after the execution is published, so its failures surface through this same handler.
- WorkflowService.initExecutionService drops the pre-publish errorSubject fallback (reportFatalErrorsToSubscribers + the executionPublished gating); the catch is simply errorHandler(e). Removes the now-unused errorSubject field and its connect() subscription.
- Remove WorkflowServiceSpec (it only tested the removed reportFatalErrorsToSubscribers); the behavior is exercised by the integration/e2e suites.

Resolves apache#5921.
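The ordering argument in this commit message can be illustrated with a minimal sketch. Every name below (`ExecMetadata`, `MetadataStore`, `ExecutionSketch`, `HandlerFirstDemo`) is a hypothetical stand-in written for illustration, not Texera's actual classes or API; the sketch only assumes a store whose updates are diffed by registered handlers.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins; none of these are Texera's real classes.
final case class ExecMetadata(state: String = "UNINITIALIZED", fatalErrors: List[String] = Nil)

sealed trait OutEvent
final case class ErrorEvent(errors: List[String]) extends OutEvent
final case class StateEvent(state: String) extends OutEvent

// Minimal state store: every update is diffed by the registered handlers.
final class MetadataStore {
  private var current = ExecMetadata()
  private val handlers = ListBuffer.empty[(ExecMetadata, ExecMetadata) => List[OutEvent]]
  val emitted: ListBuffer[OutEvent] = ListBuffer.empty

  def registerDiffHandler(h: (ExecMetadata, ExecMetadata) => List[OutEvent]): Unit =
    handlers += h

  def update(f: ExecMetadata => ExecMetadata): Unit = {
    val old = current
    current = f(old)
    handlers.foreach(h => emitted ++= h(old, current))
  }
}

final class ExecutionSketch(store: MetadataStore) {
  // First construction action: register the handler, so an emitter exists
  // before any work that can throw.
  store.registerDiffHandler { (old, now) =>
    val out = ListBuffer.empty[OutEvent]
    if (now.fatalErrors != old.fatalErrors) out += ErrorEvent(now.fatalErrors)
    if (now.state != old.state) out += StateEvent(now.state)
    out.toList
  }

  // All throwing work lives here, after construction.
  def executeWorkflow(): Unit = throw new IllegalStateException("compile failed")
}

object HandlerFirstDemo {
  def run(): List[OutEvent] = {
    val store = new MetadataStore
    val exec = new ExecutionSketch(store)
    try exec.executeWorkflow()
    catch {
      case e: Exception =>
        // Stand-in for errorHandler(e): record the failure in the store.
        store.update(m => m.copy(state = "FAILED", fatalErrors = m.fatalErrors :+ e.getMessage))
    }
    store.emitted.toList
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Because the handler is attached before `executeWorkflow()` can run, the single `catch` only has to record the error; emission follows from the store update.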
Pull request overview
This PR refactors Amber’s workflow execution initialization error surfacing to use a single reporting path owned by WorkflowExecutionService (the execution metadata-store diff handler), removing the pre-publish websocket fallback previously implemented in WorkflowService.
Changes:
- Removed `WorkflowService`’s pre-publish `errorSubject` fallback and simplified init-time exception handling to rely on the metadata-store diff handler.
- Reordered `WorkflowExecutionService` construction to register the fatal-error/state diff handler before other initialization steps.
- Deleted `WorkflowServiceSpec`, which only covered the removed fallback helper.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala | Removes errorSubject + pre-publish fallback logic; relies on metadata-store diff handler for fatal error surfacing. |
| amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala | Registers metadata-store diff handler as the first constructor action to ensure fatal error/state events always have an emitter. |
| amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala | Removes unit tests tied to the deleted fallback method. |
| | config | throughput (tuples/sec) | MB/s | latency p50/p95/p99 | max Δ latest / 7d |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 426 | 0.26 | 24,060/30,872/30,872 us | 🔴 +12.9% / 🔴 +101.3% |
| 🔴 | bs=100 sw=10 sl=64 | 907 | 0.554 | 107,931/153,813/153,813 us | 🟢 -7.7% / 🔴 +39.1% |
| ⚪ | bs=1000 sw=10 sl=64 | 1,120 | 0.683 | 899,255/952,462/952,462 us | ⚪ within ±5% / 🟢 +14.3% |
Baseline details
Latest main 16a3ddf from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 426 tuples/sec | 456 tuples/sec | 757.16 tuples/sec | -6.6% | -43.7% |
| bs=10 sw=10 sl=64 | MB/s | 0.26 MB/s | 0.279 MB/s | 0.462 MB/s | -6.8% | -43.7% |
| bs=10 sw=10 sl=64 | p50 | 24,060 us | 21,301 us | 12,971 us | +12.9% | +85.5% |
| bs=10 sw=10 sl=64 | p95 | 30,872 us | 33,174 us | 15,333 us | -6.9% | +101.3% |
| bs=10 sw=10 sl=64 | p99 | 30,872 us | 33,174 us | 18,732 us | -6.9% | +64.8% |
| bs=100 sw=10 sl=64 | throughput | 907 tuples/sec | 943 tuples/sec | 957.66 tuples/sec | -3.8% | -5.3% |
| bs=100 sw=10 sl=64 | MB/s | 0.554 MB/s | 0.575 MB/s | 0.585 MB/s | -3.7% | -5.2% |
| bs=100 sw=10 sl=64 | p50 | 107,931 us | 101,067 us | 103,982 us | +6.8% | +3.8% |
| bs=100 sw=10 sl=64 | p95 | 153,813 us | 166,581 us | 110,583 us | -7.7% | +39.1% |
| bs=100 sw=10 sl=64 | p99 | 153,813 us | 166,581 us | 118,369 us | -7.7% | +29.9% |
| bs=1000 sw=10 sl=64 | throughput | 1,120 tuples/sec | 1,108 tuples/sec | 979.6 tuples/sec | +1.1% | +14.3% |
| bs=1000 sw=10 sl=64 | MB/s | 0.683 MB/s | 0.676 MB/s | 0.598 MB/s | +1.0% | +14.2% |
| bs=1000 sw=10 sl=64 | p50 | 899,255 us | 900,555 us | 1,024,553 us | -0.1% | -12.2% |
| bs=1000 sw=10 sl=64 | p95 | 952,462 us | 972,396 us | 1,063,789 us | -2.0% | -10.5% |
| bs=1000 sw=10 sl=64 | p99 | 952,462 us | 972,396 us | 1,096,239 us | -2.0% | -13.1% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,469.53,200,128000,426,0.260,24059.55,30872.20,30872.20
1,100,10,64,20,2204.62,2000,1280000,907,0.554,107930.87,153813.25,153813.25
2,1000,10,64,20,17860.97,20000,12800000,1120,0.683,899255.00,952462.43,952462.43
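The reported throughput, MB/s, and delta figures can be re-derived from the raw CSV and the baseline table. The sketch below is an independent check, not the benchmark's own code; in particular, the 1024 × 1024 divisor for MB/s is inferred from the numbers matching, and the object and function names are hypothetical.

```scala
object BenchDerive {
  // Tuples processed per second of wall time.
  def tuplesPerSec(totalTuples: Long, totalMs: Double): Double =
    totalTuples / (totalMs / 1000.0)

  // Bytes per second expressed in units of 1024 * 1024 bytes.
  // Assumption: this divisor is inferred from the CSV values, not from the benchmark source.
  def mbPerSec(totalBytes: Long, totalMs: Double): Double =
    totalBytes / (totalMs / 1000.0) / (1024.0 * 1024.0)

  // Relative change of the PR value against a baseline, in percent.
  def pctDelta(pr: Double, baseline: Double): Double =
    (pr - baseline) / baseline * 100.0

  def main(args: Array[String]): Unit = {
    // Row 0 of the raw CSV: 200 tuples, 128000 bytes, 469.53 ms.
    println(math.round(tuplesPerSec(200L, 469.53)))           // compare with tuples_per_sec
    println(mbPerSec(128000L, 469.53))                        // compare with mb_per_sec
    // bs=10 throughput: PR 426 vs latest main 456 vs 7d avg 757.16.
    println(pctDelta(426.0, 456.0))
    println(pctDelta(426.0, 757.16))
  }
}
```

Note that the throughput drop against latest main (-6.6%) is much smaller than against the 7-day average (-43.7%), which suggests the baseline on main had already shifted before this PR.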
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #5922 +/- ##
============================================
- Coverage 56.64% 56.62% -0.02%
+ Complexity 3028 3017 -11
============================================
Files 1124 1124
Lines 43298 43291 -7
Branches 4667 4666 -1
============================================
- Hits 24525 24513 -12
+ Misses 17335 17333 -2
- Partials 1438 1445 +7
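The coverage percentages follow from the hit and line counts in the diff above. The sketch below is a simple consistency check, assuming coverage is hits divided by lines and that lines equal hits plus misses plus partials; `Totals` and `CoverageCheck` are hypothetical names.

```scala
object CoverageCheck {
  final case class Totals(hits: Int, misses: Int, partials: Int) {
    // Assumption: every line is exactly one of hit, miss, or partial.
    def lines: Int = hits + misses + partials
    def coveragePct: Double = hits.toDouble / lines * 100.0
  }

  val base: Totals = Totals(hits = 24525, misses = 17335, partials = 1438) // main
  val pr: Totals = Totals(hits = 24513, misses = 17333, partials = 1445)   // #5922

  // Round to two decimals, as the report displays.
  def round2(x: Double): Double = math.round(x * 100.0) / 100.0

  def main(args: Array[String]): Unit = {
    println(s"main: ${base.lines} lines, ${round2(base.coveragePct)}%")
    println(s"PR:   ${pr.lines} lines, ${round2(pr.coveragePct)}%")
  }
}
```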
Need test coverage ;)
Address review (Copilot + @Yicong-Huang): deleting WorkflowServiceSpec left the single-reporting-site invariant untested. Add WorkflowExecutionServiceSpec:

- a recorded fatal error surfaces as a WorkflowErrorEvent through the metadata-store diff handler registered at construction (the regression guard for init-error surfacing);
- a state change emits a WorkflowStateEvent (handler's other branch).

The construction-unused controllerConfig/resultService are passed as null on purpose, so a future change that does external work during construction (which would reopen the pre-publish gap) fails this test.
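The null-argument guard described above can be shown in isolation. This is a minimal sketch with hypothetical classes (`ControllerConfigLike`, `LazyService`, `EagerService`), not the actual spec: a constructor that never touches its dependency tolerates `null`, while one that uses it during construction throws.

```scala
import scala.util.Try

// Hypothetical dependency; stands in for something like a controller config.
trait ControllerConfigLike { def workerCount: Int }

// Construction never touches the dependency, so null is safe here.
final class LazyService(cfg: ControllerConfigLike) {
  def start(): Int = cfg.workerCount // dependency used only after construction
}

// Construction reads the dependency, so null fails immediately.
final class EagerService(cfg: ControllerConfigLike) {
  private val workers: Int = cfg.workerCount
  def start(): Int = workers
}

object NullGuardDemo {
  // Try captures the NullPointerException thrown by EagerService's constructor.
  val lazyOk: Boolean = Try(new LazyService(null)).isSuccess
  val eagerOk: Boolean = Try(new EagerService(null)).isSuccess

  def main(args: Array[String]): Unit =
    println(s"lazy construction ok: $lazyOk, eager construction ok: $eagerOk")
}
```

The trade-off is that the guard only catches construction-time work that dereferences the nulled dependencies; work that throws for other reasons would need its own test.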
Added test coverage in
Yicong-Huang
left a comment
LGTM, left comments inline
…trim comments

Address review feedback on apache#5922:

- Simplify the init-error comments in WorkflowService / WorkflowExecutionService to drop the alternative-design framing (single reporting site / pre-publish fallback), keeping only the mechanism.
- Add a regression test asserting fatal errors recorded at successive phases (mirroring WorkflowService's errorHandler, which the service invokes at the compile / runtime-creation / startWorkflow phases) all surface as WorkflowErrorEvents through the construction-registered diff handler.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…rkflowExecutionService (apache#5922)

### What changes were proposed in this PR?

Consolidates execution init-error reporting into a **single site owned by `WorkflowExecutionService`**, replacing the two-phase split apache#5781 introduced (per @Yicong-Huang's suggestion, tracked in apache#5921).

- `WorkflowExecutionService` now registers its `fatalErrors → WorkflowErrorEvent` (and state) diff handler as the **first** construction action. Construction itself does no external work and cannot throw: it only assigns `workflowSettings`, creates a `WebsocketInput` (a `PublishSubject`), and registers the handler. All throwing work lives in `executeWorkflow()`, which runs **after** `executionService.onNext(...)` publishes the execution, so any failure there is recorded by `errorHandler` into the metadata store and surfaced by the handler that `connectToExecution` forwards.
- `WorkflowService.initExecutionService` drops the pre-publish `errorSubject` fallback: the `executionPublished` gating and `reportFatalErrorsToSubscribers` are gone, and the catch is simply `errorHandler(e)`. The now-unused `errorSubject` field and its `connect()` subscription are removed.
- `WorkflowServiceSpec` is removed because it only tested the deleted `reportFatalErrorsToSubscribers`; the surfacing behavior is exercised by the integration/e2e suites.

Net: a single reporting path (the metadata-store diff handler), with no change to the init-error surfacing apache#5781 added. Construction is provably side-effect-free, so the pre-publish window no longer has a failure mode.

### Any related issues, documentation, discussions?

Resolves apache#5921, the follow-up refactor agreed during the apache#5781 review.

### How was this PR tested?

`scalafmtCheckAll` clean. This is a behavior-preserving refactor of init-error reporting: the single reporting path is the metadata-store diff handler that already surfaced post-publish errors, and construction is side-effect-free so no error can escape it. The full compile, scalafix, and the integration/e2e suites that exercise init-error surfacing run in CI.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
Reworks the loop back-edge write address per the apache#5900 review decision: the URI is constant per-execution config, not per-iteration data, so it no longer rides State rows / the StateFrame envelope (the merged 3-column format has no loop_start_state_uri). Instead the scheduler resolves it and ships it to workers at setup:

- PhysicalOp gains a compile-time `isLoopStart` marker (set by LoopStartOpDesc, mirroring requiresMaterializedExecution).
- WorkflowExecutionCoordinator derives {LoopStart logical op id -> state URI of its single input port} from the final resource-allocated schedule; the two single-port/single-reader guards that lived in the Python worker's _compute_loop_start_id move controller-side.
- InitializeExecutorRequest gains `map<string,string> loopStartStateUris` (field 4), sent to every worker each region (re-)run; the Python handler stores it on Context. No static LoopStart<->LoopEnd pairing is needed: a Loop End selects the entry by the loop_start_id carried on the StateFrame it consumes (nested loops work unchanged).
- MainLoop: _compute_loop_start_id shrinks to the id parse; _jump_to_loop_start looks the URI up from the injected config and fails loudly (before the jump RPC) if it is missing; the inner-LoopStart nested pass-through gate keys on frame.loop_start_id instead of the removed URI field.
- LoopIntegrationSpec adopts the per-suite id isolation from apache#5888 (specId 5), and WorkflowService drops a leftover errorSubject call removed on main by apache#5922.

Tests: reworked test_main_loop.py to the 3-arg emit convention and the config-driven jump (plus a new missing-URI fail-loud case), added InitializeExecutorHandler coverage for the new field, pinned isLoopStart true/false in the Loop Start/End descriptor specs.
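The config-driven lookup with a fail-loud miss can be sketched as follows. The commit places this logic in the Python worker's MainLoop; the Scala below is only an illustration of the same idea, and `LoopStartLookup`, `resolveStateUri`, and the example ids and URIs are all hypothetical.

```scala
import scala.util.Try

object LoopStartLookup {
  // Resolve the state URI for a loop start id from the per-execution config map.
  // Fails loudly on a missing entry instead of continuing with no write address.
  def resolveStateUri(loopStartStateUris: Map[String, String], loopStartId: String): String =
    loopStartStateUris.getOrElse(
      loopStartId,
      throw new IllegalStateException(s"no state URI configured for loop start '$loopStartId'")
    )

  def main(args: Array[String]): Unit = {
    // Hypothetical ids and URIs, for illustration only.
    val uris = Map("loop-start-A" -> "state://example/a", "loop-start-B" -> "state://example/b")
    println(resolveStateUri(uris, "loop-start-B"))
    println(Try(resolveStateUri(uris, "loop-start-C")).isFailure)
  }
}
```

Keying the map by the loop start id carried on the consumed frame means no static start/end pairing has to be computed, which is why nested loops need no special handling in this scheme.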
What changes were proposed in this PR?
Consolidates execution init-error reporting into a single site owned by `WorkflowExecutionService`, replacing the two-phase split #5781 introduced (per @Yicong-Huang's suggestion, tracked in #5921).
- `WorkflowExecutionService` now registers its `fatalErrors → WorkflowErrorEvent` (and state) diff handler as the first construction action. Construction itself does no external work and cannot throw: it only assigns `workflowSettings`, creates a `WebsocketInput` (a `PublishSubject`), and registers the handler. All throwing work lives in `executeWorkflow()`, which runs after `executionService.onNext(...)` publishes the execution, so any failure there is recorded by `errorHandler` into the metadata store and surfaced by the handler that `connectToExecution` forwards.
- `WorkflowService.initExecutionService` drops the pre-publish `errorSubject` fallback: the `executionPublished` gating and `reportFatalErrorsToSubscribers` are gone, and the catch is simply `errorHandler(e)`. The now-unused `errorSubject` field and its `connect()` subscription are removed.
- `WorkflowServiceSpec` is removed because it only tested the deleted `reportFatalErrorsToSubscribers`; the surfacing behavior is exercised by the integration/e2e suites.
Net: a single reporting path (the metadata-store diff handler), with no change to the init-error surfacing #5781 added. Construction is provably side-effect-free, so the pre-publish window no longer has a failure mode.
Any related issues, documentation, discussions?
Resolves #5921 — the follow-up refactor agreed during the #5781 review.
How was this PR tested?
`scalafmtCheckAll` clean. This is a behavior-preserving refactor of init-error reporting: the single reporting path is the metadata-store diff handler that already surfaced post-publish errors, and construction is side-effect-free so no error can escape it. The full compile, scalafix, and the integration/e2e suites that exercise init-error surfacing run in CI.
Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.