test(workflow-operator): add unit test coverage for scan source executors #6076
…tors Targets Codecov-missed lines in the file-reading source executors, all driven by local temp files (no live services): ParallelCSVScanSourceOpExec (43), JSONLScanSourceOpExec (23), CSVOldScanSourceOpExec (21), ArrowSourceOpExec (30) — open/produceTuple/close, worker partitioning, offset/limit, short-row padding, blank-line discard, and the Arrow open-failure path.
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@ Coverage Diff @@
## main #6076 +/- ##
============================================
+ Coverage 57.21% 57.48% +0.27%
- Complexity 3103 3139 +36
============================================
Files 1130 1130
Lines 43825 43825
Branches 4747 4747
============================================
+ Hits 25075 25194 +119
+ Misses 17313 17189 -124
- Partials 1437 1442 +5
This pull request uses carry forward flags.
Pull request overview
This PR adds Scala unit tests in common/workflow-operator to increase coverage for four file-reading scan source executors (CSV, legacy CSV, JSONL, Arrow) without modifying production code. The tests are primarily temp-file driven and exercise executor open/produceTuple/close paths plus partitioning/limit behavior.
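The temp-file-driven setup mentioned above can be sketched roughly as follows. This is a minimal illustration that uses only the JDK; `TempFileFixtureSketch` and `withTempFile` are hypothetical names, and the actual specs may build their fixtures differently before handing the URI to the executor.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}

object TempFileFixtureSketch {

  /** Writes `content` to a fresh temp file, passes its file: URI string to `body`,
    * and deletes the file afterwards even if `body` throws.
    * (Hypothetical helper, not taken from the PR.)
    */
  def withTempFile[A](suffix: String, content: String)(body: String => A): A = {
    val path: Path = Files.createTempFile("scan-source-spec-", suffix)
    try {
      Files.write(path, content.getBytes(UTF_8))
      // Path.toUri yields a file: URI, the form the PR says the specs feed to the executors.
      body(path.toUri.toString)
    } finally {
      Files.deleteIfExists(path)
    }
  }
}
```

A spec could then wrap each case in `withTempFile(".csv", "id,name\n1,Ann\n") { uri => ... }` so that no fixture outlives its test.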
Changes:
- Add ParallelCSVScanSourceOpExecSpec covering header handling, null-padding, blank-line discard, and multi-worker byte-range partitioning.
- Add JSONLScanSourceOpExecSpec covering basic JSONL ingestion, worker partitioning, and row limiting.
- Add CSVOldScanSourceOpExecSpec and ArrowSourceOpExecSpec covering schema/iteration/offset+limit/error wrapping and close-before-open behavior.
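To make the multi-worker byte-range partitioning concrete, here is a minimal sketch of one common scheme: each worker owns every line whose first byte falls inside its byte range, and a worker that starts mid-row skips the leading partial line. This is an assumption about the general technique, not the code of ParallelCSVScanSourceOpExec; `ByteRangePartitionSketch` and `linesFor` are hypothetical names.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object ByteRangePartitionSketch {
  private val NewLine: Byte = '\n'.toByte

  /** Lines owned by worker `idx` out of `total` workers (hypothetical helper). */
  def linesFor(data: Array[Byte], idx: Int, total: Int): Seq[String] = {
    val start = (data.length.toLong * idx / total).toInt
    // The last worker reads to the end of the file; the others stop at the next range start.
    val end =
      if (idx == total - 1) data.length
      else (data.length.toLong * (idx + 1) / total).toInt

    var pos = start
    // A range that begins mid-row skips ahead to just past the next newline;
    // the previous worker reads that row to completion.
    if (pos > 0 && data(pos - 1) != NewLine) {
      while (pos < data.length && data(pos) != NewLine) pos += 1
      pos += 1
    }

    val out = Seq.newBuilder[String]
    // Emit every line that starts before `end`, even if it finishes after it.
    while (pos < end && pos < data.length) {
      var eol = pos
      while (eol < data.length && data(eol) != NewLine) eol += 1
      out += new String(data, pos, eol - pos, UTF_8)
      pos = eol + 1
    }
    out.result()
  }
}
```

With rows of uneven width, such as `"a,1\nbbbbbbbb,2\nc,3\n"`, the midpoint of the file falls inside the second row, so the second of two workers has to take the skip branch.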
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpExecSpec.scala | New JSONL executor unit tests for reading, partitioning, and limit behavior. |
| common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpExecSpec.scala | New legacy CSV executor unit tests for header/no-header reading, limit, and safe close. |
| common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExecSpec.scala | New parallel CSV executor unit tests for parsing edge cases and multi-worker byte partitioning. |
| common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpExecSpec.scala | New Arrow executor unit tests for open/produceTuple iteration, offset+limit, error wrapping, and safe close. |
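The row-level behaviors named in this table (null padding of short rows, blank-line discard, and offset+limit) can be illustrated with a small sketch. It models missing values as `None` rather than `null` and treats empty cells as missing; both are simplifying assumptions, and `RowHandlingSketch`, `parseRow`, and `window` are hypothetical names rather than executor APIs.

```scala
object RowHandlingSketch {
  type Row = Seq[Option[String]]

  /** Pads a short row with None up to `width`; yields None for a row with no values at all
    * (for example a blank line), which signals that the row should be discarded.
    */
  def parseRow(line: String, width: Int): Option[Row] = {
    // limit = -1 keeps trailing empty cells so ",," still splits into three cells.
    val cells: Row = line.split(",", -1).toSeq.map(c => Some(c).filter(_.nonEmpty))
    val padded = cells.take(width).padTo(width, None)
    if (padded.forall(_.isEmpty)) None else Some(padded)
  }

  /** Skips `offset` rows, then emits at most `limit` rows (everything when `limit` is None). */
  def window[A](rows: Iterator[A], offset: Int, limit: Option[Int]): Iterator[A] = {
    val skipped = rows.drop(offset)
    limit.fold(skipped)(n => skipped.take(n))
  }
}
```

For instance, `parseRow("1,Ann", 3)` pads the third column with `None`, while `parseRow("", 3)` returns `None` so the caller can drop the line.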
| | config | throughput (tuples/sec) | MB/s | latency p50/p95/p99 | max Δ latest / 7d |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 390 | 0.238 | 25,166/30,115/30,115 us | 🟢 -16.9% / 🔴 +97.0% |
| ⚪ | bs=100 sw=10 sl=64 | 798 | 0.487 | 123,286/150,325/150,325 us | ⚪ within ±5% / 🔴 +38.7% |
| ⚪ | bs=1000 sw=10 sl=64 | 913 | 0.557 | 1,098,136/1,128,440/1,128,440 us | ⚪ within ±5% / 🔴 +10.1% |
Baseline details
Latest main a53d95a from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 390 tuples/sec | 405 tuples/sec | 770.95 tuples/sec | -3.7% | -49.4% |
| bs=10 sw=10 sl=64 | MB/s | 0.238 MB/s | 0.247 MB/s | 0.471 MB/s | -3.6% | -49.4% |
| bs=10 sw=10 sl=64 | p50 | 25,166 us | 22,127 us | 12,775 us | +13.7% | +97.0% |
| bs=10 sw=10 sl=64 | p95 | 30,115 us | 36,251 us | 15,286 us | -16.9% | +97.0% |
| bs=10 sw=10 sl=64 | p99 | 30,115 us | 36,251 us | 18,795 us | -16.9% | +60.2% |
| bs=100 sw=10 sl=64 | throughput | 798 tuples/sec | 814 tuples/sec | 976.93 tuples/sec | -2.0% | -18.3% |
| bs=100 sw=10 sl=64 | MB/s | 0.487 MB/s | 0.497 MB/s | 0.596 MB/s | -2.0% | -18.3% |
| bs=100 sw=10 sl=64 | p50 | 123,286 us | 121,673 us | 102,557 us | +1.3% | +20.2% |
| bs=100 sw=10 sl=64 | p95 | 150,325 us | 147,390 us | 108,383 us | +2.0% | +38.7% |
| bs=100 sw=10 sl=64 | p99 | 150,325 us | 147,390 us | 115,249 us | +2.0% | +30.4% |
| bs=1000 sw=10 sl=64 | throughput | 913 tuples/sec | 918 tuples/sec | 1,009 tuples/sec | -0.5% | -9.5% |
| bs=1000 sw=10 sl=64 | MB/s | 0.557 MB/s | 0.561 MB/s | 0.616 MB/s | -0.7% | -9.6% |
| bs=1000 sw=10 sl=64 | p50 | 1,098,136 us | 1,091,196 us | 997,695 us | +0.6% | +10.1% |
| bs=1000 sw=10 sl=64 | p95 | 1,128,440 us | 1,145,592 us | 1,036,731 us | -1.5% | +8.8% |
| bs=1000 sw=10 sl=64 | p99 | 1,128,440 us | 1,145,592 us | 1,069,334 us | -1.5% | +5.5% |
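As a cross-check, the reported figures appear to follow from the raw totals in the CSV below: throughput is tuples divided by elapsed seconds, the MB/s column matches a divisor of 1024 × 1024 bytes, and each Δ is the relative change against the baseline. The sketch below reproduces that arithmetic; the formulas are inferred from the numbers rather than taken from the benchmark's source, and `BenchmarkMathSketch` is a hypothetical name.

```scala
object BenchmarkMathSketch {

  /** Tuples per second from a tuple count and elapsed milliseconds. */
  def tuplesPerSec(totalTuples: Long, totalMs: Double): Double =
    totalTuples / (totalMs / 1000.0)

  /** Megabytes per second, assuming 1 MB = 1024 * 1024 bytes (inferred from the reported values). */
  def mbPerSec(totalBytes: Long, totalMs: Double): Double =
    totalBytes / (totalMs / 1000.0) / (1024.0 * 1024.0)

  /** Relative change of the PR value against a baseline, in percent. */
  def pctDelta(pr: Double, baseline: Double): Double =
    (pr - baseline) / baseline * 100.0

  def main(args: Array[String]): Unit = {
    // First raw CSV row: 200 tuples, 128000 bytes, 512.96 ms.
    println(f"${tuplesPerSec(200, 512.96)}%.0f tuples/sec")
    println(f"${mbPerSec(128000, 512.96)}%.3f MB/s")
    // Throughput for bs=10: PR 390 vs latest main 405.
    println(f"${pctDelta(390, 405)}%.1f%%")
  }
}
```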
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,512.96,200,128000,390,0.238,25166.46,30115.24,30115.24
1,100,10,64,20,2506.31,2000,1280000,798,0.487,123286.39,150324.86,150324.86
2,1000,10,64,20,21908.36,20000,12800000,913,0.557,1098135.77,1128440.04,1128440.04
- JSONL: write keys name-then-id so the sorted-field-order assertion actually validates reordering
- ParallelCSV: use uneven row widths so the two-worker split lands mid-row, deterministically exercising the leading-partial-line skip
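The reasoning behind the JSONL fixture change can be shown with a toy model: if the fixture already lists its keys in sorted order, a sorted-order assertion passes whether or not the reader sorts, so it proves nothing. The two "readers" below are hypothetical stand-ins over key names only, not the executor's code.

```scala
object SortedFieldOrderSketch {

  /** Stand-in for a reader that orders attribute names alphabetically. */
  def sortingReader(keysInFileOrder: Seq[String]): Seq[String] = keysInFileOrder.sorted

  /** Stand-in for a buggy reader that keeps the order found in the file. */
  def nonSortingReader(keysInFileOrder: Seq[String]): Seq[String] = keysInFileOrder

  /** True when a sorted-order assertion on this fixture would fail for the buggy reader. */
  def fixtureDetectsMissingSort(keysInFileOrder: Seq[String]): Boolean =
    nonSortingReader(keysInFileOrder) != sortingReader(keysInFileOrder)
}
```

A fixture written as id-then-name cannot tell the two readers apart, whereas name-then-id can, which is why the key order in the test data matters.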
What changes were proposed in this PR?
Add unit test coverage for the four file-reading scan source executors, selected from the Codecov report. No production-code changes. Every test is driven by a local temp file (file: URI → ReadonlyLocalFileDocument); no live DB/network/S3/Iceberg.
- ParallelCSVScanSourceOpExec.scala: open byte-range partitioning (both endOffset branches + mid-file partial-line skip + header skip), produceTuple happy path, short-row null padding, all-null (blank) line discard, close
- JSONLScanSourceOpExec.scala: produceTuple, open line counting + worker partitioning (both ternary branches), row limit, close
- CSVOldScanSourceOpExec.scala: open (header vs no-header start offset), produceTuple + limit, close null-guard before open
- ArrowSourceOpExec.scala: open success + error-wrapping path, produceTuple batch iteration + offset/limit, close no-op before open

New specs live in each executor's own package so their private[...] constructors are reachable, mirroring the existing FileScanSourceOpExecSpec temp-file/URI pattern.
Any related issues, documentation, discussions?
Follow-up to the review feedback on #6043: prioritize tests that fill uncovered code paths.
How was this PR tested?
- sbt "WorkflowOperator/testOnly *ParallelCSVScanSourceOpExecSpec *JSONLScanSourceOpExecSpec *CSVOldScanSourceOpExecSpec *ArrowSourceOpExecSpec": 17 tests, all green
- sbt "WorkflowOperator/Test/scalafmtCheck" and sbt "WorkflowOperator/scalafixAll --check": clean

Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.8 [1M context])