refactor(test): extract shared run-and-read e2e harness into TestUtils#5712
Conversation
Several amber e2e specs repeated the same run-to-COMPLETED + open-iceberg results boilerplate. Add two reusable, loop/state-agnostic helpers to TestUtils -- readMaterializedResults and runWorkflowAndReadResults -- and refactor DataProcessingSpec.executeWorkflow to use them. Split out of the loop-operators PR (apache#5700) to keep that PR reviewable; the harness is independent test infrastructure.
There was a problem hiding this comment.
Pull request overview
This PR refactors Amber e2e test code by extracting the repeated “run workflow → await COMPLETED/FatalError → open RESULT docs → extract tuples” harness into reusable helpers on TestUtils, and updates DataProcessingSpec to use the shared harness.
Changes:
- Added
readMaterializedResultshelper to resolve/open external RESULT documents and apply an extraction function. - Added
runWorkflowAndReadResultshelper to run a workflow toCOMPLETED(or surfaceFatalError) and then read materialized results. - Refactored
DataProcessingSpec.executeWorkflow(and part ofTestUtils.shouldReconfigure) to use the new shared helper(s).
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala | Introduces shared run-and-read helpers and refactors existing e2e harness usage to reduce duplication. |
| amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala | Switches executeWorkflow to use the shared runWorkflowAndReadResults helper and removes duplicated boilerplate/imports. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
| config | throughput | MB/s | latency | max Δ latest / 7d | |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 412 | 0.251 | 23,542/32,948/32,948 us | 🔴 +6.3% / 🟢 -5.8% |
| ⚪ | bs=100 sw=10 sl=64 | 808 | 0.493 | 121,081/155,589/155,589 us | ⚪ within ±5% / 🔴 +11.3% |
| ⚪ | bs=1000 sw=10 sl=64 | 938 | 0.573 | 1,065,098/1,108,800/1,108,800 us | ⚪ within ±5% / 🔴 -9.9% |
Baseline details
Latest main 94170ae from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 412 tuples/sec | 436 tuples/sec | 410.82 tuples/sec | -5.5% | +0.3% |
| bs=10 sw=10 sl=64 | MB/s | 0.251 MB/s | 0.266 MB/s | 0.251 MB/s | -5.6% | +0.1% |
| bs=10 sw=10 sl=64 | p50 | 23,542 us | 22,148 us | 23,785 us | +6.3% | -1.0% |
| bs=10 sw=10 sl=64 | p95 | 32,948 us | 31,702 us | 34,980 us | +3.9% | -5.8% |
| bs=10 sw=10 sl=64 | p99 | 32,948 us | 31,702 us | 34,980 us | +3.9% | -5.8% |
| bs=100 sw=10 sl=64 | throughput | 808 tuples/sec | 842 tuples/sec | 891.94 tuples/sec | -4.0% | -9.4% |
| bs=100 sw=10 sl=64 | MB/s | 0.493 MB/s | 0.514 MB/s | 0.544 MB/s | -4.1% | -9.4% |
| bs=100 sw=10 sl=64 | p50 | 121,081 us | 117,164 us | 112,277 us | +3.3% | +7.8% |
| bs=100 sw=10 sl=64 | p95 | 155,589 us | 150,337 us | 139,802 us | +3.5% | +11.3% |
| bs=100 sw=10 sl=64 | p99 | 155,589 us | 150,337 us | 139,802 us | +3.5% | +11.3% |
| bs=1000 sw=10 sl=64 | throughput | 938 tuples/sec | 941 tuples/sec | 1,041 tuples/sec | -0.3% | -9.9% |
| bs=1000 sw=10 sl=64 | MB/s | 0.573 MB/s | 0.574 MB/s | 0.635 MB/s | -0.2% | -9.8% |
| bs=1000 sw=10 sl=64 | p50 | 1,065,098 us | 1,059,251 us | 972,714 us | +0.6% | +9.5% |
| bs=1000 sw=10 sl=64 | p95 | 1,108,800 us | 1,106,974 us | 1,023,057 us | +0.2% | +8.4% |
| bs=1000 sw=10 sl=64 | p99 | 1,108,800 us | 1,106,974 us | 1,023,057 us | +0.2% | +8.4% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,485.55,200,128000,412,0.251,23541.91,32948.36,32948.36
1,100,10,64,20,2476.00,2000,1280000,808,0.493,121080.50,155588.55,155588.55
2,1000,10,64,20,21321.13,20000,12800000,938,0.573,1065097.99,1108799.72,1108799.72
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #5712 +/- ##
============================================
+ Coverage 52.87% 52.89% +0.01%
- Complexity 2623 2629 +6
============================================
Files 1090 1090
Lines 42188 42188
Branches 4531 4531
============================================
+ Hits 22308 22314 +6
+ Misses 18572 18566 -6
Partials 1308 1308
*This pull request uses carry forward flags. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
Address Copilot review on apache#5712. These gaps were inherited verbatim from the original DataProcessingSpec.executeWorkflow; fix them once in the now-shared harness: - carry the result in the Promise (drop the mutable var) - wire AmberClient's errorHandler to fail the Promise - use updateIfEmpty on every terminal path so a second/late event can't throw inside a callback and get silently swallowed; a read failure in the COMPLETED callback now fails the Promise (via Try) instead of hanging - run client.shutdown() in a finally so a timeout or error can't leak actors
Yicong-Huang
left a comment
There was a problem hiding this comment.
LGTM, left inline comments
Address @Yicong-Huang's review on apache#5712: - Add TestUtils.runWorkflowAndReadTerminalResults(system, workflow) and a readMaterializedResults(workflow) overload, both defaulting to the workflow's terminal operators + List[Tuple] extraction, so callers don't repeat that boilerplate. DataProcessingSpec.executeWorkflow and TestUtils.shouldReconfigure now use them. - Drop the specific spec names (DataProcessingSpec/LoopIntegrationSpec) from the runWorkflowAndReadResults docstring.
apache#5712) ### What changes were proposed in this PR? Extracts the repeated "run a workflow and read its materialized results" boilerplate from the amber e2e specs into two reusable helpers on `TestUtils`: - `readMaterializedResults(executionId, operatorIds, extract)` — resolve + open each operator's external RESULT document and apply `extract` to the opened `VirtualDocument[Tuple]` (operators with no materialized output are skipped). - `runWorkflowAndReadResults(system, workflow, operatorIds, extract, completionTimeout)` — run a workflow to `COMPLETED` (a `FatalError` aborts and surfaces as the awaited exception), then read results via `readMaterializedResults`. `DataProcessingSpec.executeWorkflow` now calls the shared harness instead of its own inline copy. The helpers are loop/state-agnostic — they only use existing core APIs (`DocumentFactory`, `VirtualDocument[Tuple]`, `AmberClient`, `ExecutionStateUpdate`, `FatalError`), so other e2e specs can adopt them too. ### Any related issues, documentation, discussions? Resolves apache#5711 (sub-issue of apache#4442 "Introduce for loop"). Split out of apache#5700 to keep that PR reviewable, per @Xiao-zhen-Liu's [review](apache#4206 (review)). ### How was this PR tested? Behavior-preserving refactor of existing e2e test infrastructure. `WorkflowExecutionService/Test/compile` and `WorkflowExecutionService/scalafmtCheckAll` pass locally. The `@IntegrationTest` specs that exercise the harness (e.g. `DataProcessingSpec`) run in CI — they spawn Python workers and can't run on Windows. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
What changes were proposed in this PR?
Extracts the repeated "run a workflow and read its materialized results" boilerplate from the amber e2e specs into two reusable helpers on
TestUtils:readMaterializedResults(executionId, operatorIds, extract)— resolve + open each operator's external RESULT document and applyextractto the openedVirtualDocument[Tuple](operators with no materialized output are skipped).runWorkflowAndReadResults(system, workflow, operatorIds, extract, completionTimeout)— run a workflow toCOMPLETED(aFatalErroraborts and surfaces as the awaited exception), then read results viareadMaterializedResults.DataProcessingSpec.executeWorkflownow calls the shared harness instead of its own inline copy. The helpers are loop/state-agnostic — they only use existing core APIs (DocumentFactory,VirtualDocument[Tuple],AmberClient,ExecutionStateUpdate,FatalError), so other e2e specs can adopt them too.Any related issues, documentation, discussions?
Resolves #5711 (sub-issue of #4442 "Introduce for loop"). Split out of #5700 to keep that PR reviewable, per @Xiao-zhen-Liu's review.
How was this PR tested?
Behavior-preserving refactor of existing e2e test infrastructure.
WorkflowExecutionService/Test/compileandWorkflowExecutionService/scalafmtCheckAllpass locally. The@IntegrationTestspecs that exercise the harness (e.g.DataProcessingSpec) run in CI — they spawn Python workers and can't run on Windows.Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.