fix(execution-service): surface init-time fatal errors to the websocket #5781
When execution initialization fails, WorkflowService.initExecutionService caught the error and recorded it into the execution metadata store via errorHandler, but never pushed it to errorSubject -- so connected frontend clients saw no error, especially for failures during WorkflowExecutionService construction (before the execution is published to subscribers). Push a WorkflowErrorEvent carrying the recorded fatal errors to errorSubject after errorHandler runs, so init-time failures surface in the UI. Extracted the push into reportFatalErrorsToSubscribers and added WorkflowServiceSpec (red/green: pins the event reaches connect() subscribers carrying the errors).
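The catch-arm behavior described above can be sketched in plain Scala. This is a minimal illustration, not the Texera implementation: `Subject`, `StateStore`, and `WorkflowServiceSketch` are hypothetical stand-ins (the real code pushes to an `errorSubject` and reads the execution metadata store), and the two case classes carry only the fields needed here.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical stand-ins; the real event and error types carry more fields.
final case class WorkflowFatalError(message: String)
final case class WorkflowErrorEvent(fatalErrors: Seq[WorkflowFatalError])

// Minimal hot subject: each event goes to the listeners subscribed at that moment.
final class Subject[T] {
  private val listeners = ListBuffer.empty[T => Unit]
  def subscribe(listener: T => Unit): Unit = { listeners += listener; () }
  def onNext(event: T): Unit = listeners.foreach(_(event))
}

// Stand-in for the execution metadata store: it only records fatal errors.
final class StateStore {
  private val errors = ListBuffer.empty[WorkflowFatalError]
  def record(e: Throwable): Unit = {
    errors += WorkflowFatalError(String.valueOf(e.getMessage)); ()
  }
  def fatalErrors: Seq[WorkflowFatalError] = errors.toList
}

final class WorkflowServiceSketch(store: StateStore) {
  val errorSubject = new Subject[WorkflowErrorEvent]

  // Recording alone is invisible to connected clients.
  private def errorHandler(e: Throwable): Unit = store.record(e)

  // Pushes whatever the store has recorded to workflow-level subscribers.
  def reportFatalErrorsToSubscribers(): Unit =
    errorSubject.onNext(WorkflowErrorEvent(store.fatalErrors))

  // `construct` stands in for building and starting the execution service.
  def initExecutionService(construct: () => Unit): Unit =
    try construct()
    catch {
      case NonFatal(e) =>
        errorHandler(e)                  // record first ...
        reportFatalErrorsToSubscribers() // ... then surface to subscribers
    }
}
```

Keeping the push in its own method is what lets a test subscribe, call it directly, and inspect the delivered event without touching a database.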
Pull request overview
This PR ensures workflow execution init-time failures are surfaced to connected frontend clients by pushing recorded fatal errors to the workflow-level websocket channel (instead of only persisting/logging them when initialization fails before the execution is published).
Changes:
- In `WorkflowService.initExecutionService`, after recording a fatal error via `errorHandler`, also emit a `WorkflowErrorEvent` to websocket subscribers.
- Extract the websocket emission into `reportFatalErrorsToSubscribers` for easier unit testing.
- Add `WorkflowServiceSpec` to verify the emitted `WorkflowErrorEvent` contains the fatal errors recorded in the `ExecutionStateStore`.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala | Emits WorkflowErrorEvent with recorded fatal errors on init-time failure via a new helper method. |
| amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala | Adds unit tests that subscribe via connect() and validate reportFatalErrorsToSubscribers emits error events containing the store’s fatal errors. |
| | config | throughput (tuples/sec) | MB/s | latency p50/p95/p99 | max Δ latest / 7d |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 394 | 0.241 | 24,899/38,708/38,708 us | 🔴 +19.6% / 🔴 +10.7% |
| ⚪ | bs=100 sw=10 sl=64 | 936 | 0.572 | 104,567/123,632/123,632 us | ⚪ within ±5% / 🟢 -11.6% |
| 🟢 | bs=1000 sw=10 sl=64 | 1,105 | 0.675 | 897,648/962,014/962,014 us | 🟢 -5.8% / 🟢 -7.7% |
Baseline details
Latest main 0eb8427 from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 394 tuples/sec | 459 tuples/sec | 410.82 tuples/sec | -14.2% | -4.1% |
| bs=10 sw=10 sl=64 | MB/s | 0.241 MB/s | 0.28 MB/s | 0.251 MB/s | -13.9% | -3.9% |
| bs=10 sw=10 sl=64 | p50 | 24,899 us | 21,462 us | 23,785 us | +16.0% | +4.7% |
| bs=10 sw=10 sl=64 | p95 | 38,708 us | 32,357 us | 34,980 us | +19.6% | +10.7% |
| bs=10 sw=10 sl=64 | p99 | 38,708 us | 32,357 us | 34,980 us | +19.6% | +10.7% |
| bs=100 sw=10 sl=64 | throughput | 936 tuples/sec | 979 tuples/sec | 891.94 tuples/sec | -4.4% | +4.9% |
| bs=100 sw=10 sl=64 | MB/s | 0.572 MB/s | 0.598 MB/s | 0.544 MB/s | -4.3% | +5.1% |
| bs=100 sw=10 sl=64 | p50 | 104,567 us | 102,021 us | 112,277 us | +2.5% | -6.9% |
| bs=100 sw=10 sl=64 | p95 | 123,632 us | 118,401 us | 139,802 us | +4.4% | -11.6% |
| bs=100 sw=10 sl=64 | p99 | 123,632 us | 118,401 us | 139,802 us | +4.4% | -11.6% |
| bs=1000 sw=10 sl=64 | throughput | 1,105 tuples/sec | 1,116 tuples/sec | 1,041 tuples/sec | -1.0% | +6.1% |
| bs=1000 sw=10 sl=64 | MB/s | 0.675 MB/s | 0.681 MB/s | 0.635 MB/s | -0.9% | +6.2% |
| bs=1000 sw=10 sl=64 | p50 | 897,648 us | 885,454 us | 972,714 us | +1.4% | -7.7% |
| bs=1000 sw=10 sl=64 | p95 | 962,014 us | 1,021,361 us | 1,023,057 us | -5.8% | -6.0% |
| bs=1000 sw=10 sl=64 | p99 | 962,014 us | 1,021,361 us | 1,023,057 us | -5.8% | -6.0% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,507.48,200,128000,394,0.241,24899.10,38707.66,38707.66
1,100,10,64,20,2135.79,2000,1280000,936,0.572,104567.01,123631.75,123631.75
2,1000,10,64,20,18096.22,20000,12800000,1105,0.675,897648.42,962013.94,962013.94
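The derived columns in the tables above can be recomputed from the raw CSV. The sketch below is a hedged reconstruction, not the benchmark's own code: `BenchmarkMath` and its method names are hypothetical, and the divisor of 1024 × 1024 for MB/s is an assumption that happens to reproduce the reported values.

```scala
object BenchmarkMath {
  // Tuples per second from a row's total_tuples and total_ms.
  def tuplesPerSec(totalTuples: Long, totalMs: Double): Double =
    totalTuples / (totalMs / 1000.0)

  // Assumption: "MB" means 1024 * 1024 bytes, which matches the reported numbers.
  def mbPerSec(totalBytes: Long, totalMs: Double): Double =
    totalBytes / (totalMs / 1000.0) / (1024.0 * 1024.0)

  // Relative change of the PR value against a baseline, in percent.
  def deltaPercent(pr: Double, baseline: Double): Double =
    (pr - baseline) / baseline * 100.0
}
```

For the first CSV row, `tuplesPerSec(200, 507.48)` rounds to 394 and `deltaPercent(38708, 32357)` is about +19.6, matching the p95 change against latest main. Note the sign convention: a positive delta is a regression for latency but an improvement for throughput.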
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #5781 +/- ##
============================================
+ Coverage 54.55% 54.63% +0.07%
- Complexity 2914 3075 +161
============================================
Files 1109 1122 +13
Lines 42823 44298 +1475
Branches 4607 4750 +143
============================================
+ Hits 23362 24200 +838
- Misses 18097 18700 +603
- Partials 1364 1398 +34
…al errors Address Copilot review: the assertions used contain/contain-allOf, which would pass even if extra fatal errors leaked into the event. Tighten both to theSameElementsAs so the test pins the exact contract (no more, no less).
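The difference between the two assertion strengths can be shown without ScalaTest. The helpers below are hypothetical plain-Scala stand-ins for what a `contain allOf` style check and a `contain theSameElementsAs` style check each accept; they are not the matchers themselves.

```scala
object AssertionStrength {
  // Loose check: every expected element appears in actual; extras go unnoticed.
  def containsAll[A](actual: Seq[A], expected: Seq[A]): Boolean =
    expected.forall(e => actual.contains(e))

  // Strict check: same elements with the same multiplicities, in any order.
  // Seq.diff is a multiset difference, so leftovers on either side fail the check.
  def sameElements[A](actual: Seq[A], expected: Seq[A]): Boolean =
    actual.diff(expected).isEmpty && expected.diff(actual).isEmpty
}
```

With `actual = Seq("e1", "e2", "leaked")` and `expected = Seq("e1", "e2")`, the loose check passes while the strict one fails, which is the leak the tightened test is meant to catch.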
Yicong-Huang
left a comment
A question: such failures should go through the state store, so why doesn't the existing logic work for this case?
…is published Per review (Yicong): route fatal errors through the state-store path wherever it works. Once the execution is published via executionService.onNext, the metadata store's diff handler already emits a WorkflowErrorEvent that connectToExecution forwards, so the errorSubject push was redundant there (double-emit). Gate it to the pre-publish window -- the WorkflowExecutionService-constructor failure case, where no diff-handler emitter and no websocket subscriber exist yet, so the state-store path cannot reach the frontend.
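The gating can be sketched as a flag that flips at the publish point. This is an illustrative model only: `PrePublishGate`, `deliveryPath`, and the returned labels are hypothetical, and the flag assignment stands in for the `executionService.onNext` call.

```scala
import scala.util.control.NonFatal

object PrePublishGate {
  // Returns which path delivers an init failure to the frontend, or None on success.
  def deliveryPath(construct: () => Unit, execute: () => Unit): Option[String] = {
    var executionPublished = false
    try {
      construct()               // WorkflowExecutionService construction
      executionPublished = true // stands in for executionService.onNext(...)
      execute()                 // executeWorkflow()
      None
    } catch {
      case NonFatal(_) =>
        // The error is recorded in the store in both cases; only the delivery path differs.
        if (executionPublished) Some("diffHandler") // store diff handler already emits
        else Some("errorSubject")                   // nothing else can reach the UI yet
    }
  }
}
```

A constructor failure yields `Some("errorSubject")`, while a failure in the execute step yields `Some("diffHandler")`, so each failure is emitted exactly once.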
Yicong-Huang
left a comment
LGTM. Please record refactor plan.
another nit: change |
Done — updated the PR title to |
…et (#5781)

### What changes were proposed in this PR?

When workflow execution initialization fails, the error was recorded into the execution metadata store but never pushed to the websocket, so connected frontend clients saw nothing, particularly for failures during `WorkflowExecutionService` construction, which happens *before* the execution is published to subscribers.

`WorkflowService.initExecutionService`'s catch arm now, after `errorHandler(e)` records the fatal error, pushes a `WorkflowErrorEvent` (carrying the recorded fatal errors) to `errorSubject` (the workflow-level channel that `connect()` subscribers listen on), so init-time failures surface in the UI.

| init failure | before | after |
|---|---|---|
| during `WorkflowExecutionService` construction (pre-publish) | logged + stored, invisible to the UI | `WorkflowErrorEvent` delivered to the frontend |
| during `executeWorkflow()` | recorded; UI delivery depended on subscription timing | `WorkflowErrorEvent` delivered to the frontend |

The push is extracted into a small `reportFatalErrorsToSubscribers` method so it can be unit-tested without a database (the init path itself is DB-bound).

### Any related issues, documentation, discussions?

Resolves #5782. Discovered while splitting #5700 (loop operators) into smaller PRs; this fix is independent of that feature and applies to `main` on its own.

### How was this PR tested?

New `WorkflowServiceSpec` (TDD, red → green): pins that `reportFatalErrorsToSubscribers` delivers a `WorkflowErrorEvent` to a `connect()` subscriber carrying exactly the fatal errors recorded in the execution state store (single error, and all errors when several are present). `sbt "WorkflowExecutionService/testOnly *WorkflowServiceSpec"` passes (2/2); scalafmt + scalafix clean.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.

(backported from commit 1c580e5)
Backport to |
…rkflowExecutionService (apache#5922)

### What changes were proposed in this PR?

Consolidates execution init-error reporting into a **single site owned by `WorkflowExecutionService`**, replacing the two-phase split apache#5781 introduced (per @Yicong-Huang's suggestion, tracked in apache#5921).

- `WorkflowExecutionService` now registers its `fatalErrors → WorkflowErrorEvent` (and state) diff handler as the **first** construction action. Construction itself does no external work and cannot throw: it only assigns `workflowSettings`, creates a `WebsocketInput` (a `PublishSubject`), and registers the handler. All throwing work lives in `executeWorkflow()`, which runs **after** `executionService.onNext(...)` publishes the execution, so any failure there is recorded by `errorHandler` into the metadata store and surfaced by the handler that `connectToExecution` forwards.
- `WorkflowService.initExecutionService` drops the pre-publish `errorSubject` fallback: the `executionPublished` gating and `reportFatalErrorsToSubscribers` are gone, and the catch is simply `errorHandler(e)`. The now-unused `errorSubject` field and its `connect()` subscription are removed.
- `WorkflowServiceSpec` is removed because it only tested the deleted `reportFatalErrorsToSubscribers`; the surfacing behavior is exercised by the integration/e2e suites.

Net: a single reporting path (the metadata-store diff handler), with no change to the init-error surfacing apache#5781 added. Construction is provably side-effect-free, so the pre-publish window no longer has a failure mode.

### Any related issues, documentation, discussions?

Resolves apache#5921, the follow-up refactor agreed during the apache#5781 review.

### How was this PR tested?

`scalafmtCheckAll` clean. This is a behavior-preserving refactor of init-error reporting: the single reporting path is the metadata-store diff handler that already surfaced post-publish errors, and construction is side-effect-free so no error can escape it. The full compile, scalafix, and the integration/e2e suites that exercise init-error surfacing run in CI.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
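The single-site design from the refactor can be modelled as a store that notifies diff handlers, with the handler registered during construction. All names here (`MetadataStore`, `ExecutionServiceSketch`, `SingleSiteReporting`) are hypothetical, errors are reduced to plain strings, and the real diff-handler API is not shown in this thread, so treat this as a sketch of the idea only.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical store: calls each handler with (old, new) fatal errors on every update.
final class MetadataStore {
  private var fatalErrors: List[String] = Nil
  private val handlers = ListBuffer.empty[(List[String], List[String]) => Unit]
  def registerDiffHandler(h: (List[String], List[String]) => Unit): Unit = {
    handlers += h; ()
  }
  def addFatalError(msg: String): Unit = {
    val old = fatalErrors
    fatalErrors = old :+ msg
    handlers.foreach(h => h(old, fatalErrors))
  }
}

final class ExecutionServiceSketch(store: MetadataStore, emit: List[String] => Unit) {
  // First construction action: register the handler. Nothing here can throw.
  store.registerDiffHandler((old, updated) => if (old != updated) emit(updated))

  // All throwing work lives here, after the execution has been published.
  def executeWorkflow(work: () => Unit): Unit = work()
}

object SingleSiteReporting {
  // Returns the error events a subscriber would have received.
  def run(work: () => Unit): List[List[String]] = {
    val store = new MetadataStore
    val events = ListBuffer.empty[List[String]]
    val service = new ExecutionServiceSketch(store, e => { events += e; () })
    try service.executeWorkflow(work)
    catch {
      // Stands in for errorHandler(e): recording is the only step the catch needs.
      case NonFatal(e) => store.addFatalError(String.valueOf(e.getMessage))
    }
    events.toList
  }
}
```

Because the handler exists before any throwing work runs, recording the error is enough to surface it, which is why the catch arm no longer needs a separate push.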
What changes were proposed in this PR?
When workflow execution initialization fails, the error was recorded into the execution metadata store but never pushed to the websocket, so connected frontend clients saw nothing, particularly for failures during `WorkflowExecutionService` construction, which happens before the execution is published to subscribers.
`WorkflowService.initExecutionService`'s catch arm now, after `errorHandler(e)` records the fatal error, pushes a `WorkflowErrorEvent` (carrying the recorded fatal errors) to `errorSubject` (the workflow-level channel that `connect()` subscribers listen on), so init-time failures surface in the UI.
| init failure | before | after |
|---|---|---|
| during `WorkflowExecutionService` construction (pre-publish) | logged + stored, invisible to the UI | `WorkflowErrorEvent` delivered to the frontend |
| during `executeWorkflow()` | recorded; UI delivery depended on subscription timing | `WorkflowErrorEvent` delivered to the frontend |
The push is extracted into a small `reportFatalErrorsToSubscribers` method so it can be unit-tested without a database (the init path itself is DB-bound).
Any related issues, documentation, discussions?
Resolves #5782. Discovered while splitting #5700 (loop operators) into smaller PRs; this fix is independent of that feature and applies to `main` on its own.
How was this PR tested?
New `WorkflowServiceSpec` (TDD, red → green): pins that `reportFatalErrorsToSubscribers` delivers a `WorkflowErrorEvent` to a `connect()` subscriber carrying exactly the fatal errors recorded in the execution state store (single error, and all errors when several are present). `sbt "WorkflowExecutionService/testOnly *WorkflowServiceSpec"` passes (2/2); scalafmt + scalafix clean.
Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.