fix(workflow-operator): no null padding in reservoir sampling #5606
|
👋 Thanks for your first contribution to Texera, @suyashj1231! You can drive common housekeeping tasks just by leaving a comment. Type the command on its own line.
For the full contribution flow, see CONTRIBUTING.md. |
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@ Coverage Diff @@
## main #5606 +/- ##
==========================================
Coverage 52.86% 52.87%
- Complexity 2510 2522 +12
==========================================
Files 1075 1071 -4
Lines 41665 41414 -251
Branches 4495 4460 -35
==========================================
- Hits 22027 21896 -131
+ Misses 18338 18229 -109
+ Partials 1300 1289 -11
*This pull request uses carry forward flags.
|
|
@xuang7 shall we backport this fix to v1.2? |
Yicong-Huang
left a comment
LGTM, thanks @suyashj1231!
|
| | config | throughput (tuples/sec) | MB/s | latency p50/p95/p99 | max Δ latest / 7d |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 356 | 0.217 | 24,233/40,913/40,913 us | 🔴 +19.6% / 🔴 +14.0% |
| 🔴 | bs=100 sw=10 sl=64 | 802 | 0.489 | 125,608/141,217/141,217 us | 🔴 +17.0% / 🔴 +11.5% |
| 🔴 | bs=1000 sw=10 sl=64 | 917 | 0.559 | 1,100,271/1,130,579/1,130,579 us | 🔴 +19.0% / 🔴 +12.7% |
Baseline details
Latest main 227cbd7 from 2026-06-12T19:07:43.071Z
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 356 tuples/sec | 422.14 tuples/sec | 398.78 tuples/sec | -15.7% | -10.7% |
| bs=10 sw=10 sl=64 | MB/s | 0.217 MB/s | 0.258 MB/s | 0.243 MB/s | -15.8% | -10.8% |
| bs=10 sw=10 sl=64 | p50 | 24,233 us | 22,135 us | 24,396 us | +9.5% | -0.7% |
| bs=10 sw=10 sl=64 | p95 | 40,913 us | 34,198 us | 35,892 us | +19.6% | +14.0% |
| bs=10 sw=10 sl=64 | p99 | 40,913 us | 34,198 us | 35,892 us | +19.6% | +14.0% |
| bs=100 sw=10 sl=64 | throughput | 802 tuples/sec | 894.49 tuples/sec | 886.25 tuples/sec | -10.3% | -9.5% |
| bs=100 sw=10 sl=64 | MB/s | 0.489 MB/s | 0.546 MB/s | 0.541 MB/s | -10.4% | -9.6% |
| bs=100 sw=10 sl=64 | p50 | 125,608 us | 107,334 us | 112,613 us | +17.0% | +11.5% |
| bs=100 sw=10 sl=64 | p95 | 141,217 us | 160,327 us | 135,197 us | -11.9% | +4.5% |
| bs=100 sw=10 sl=64 | p99 | 141,217 us | 160,327 us | 135,197 us | -11.9% | +4.5% |
| bs=1000 sw=10 sl=64 | throughput | 917 tuples/sec | 1,077 tuples/sec | 1,029 tuples/sec | -14.9% | -10.9% |
| bs=1000 sw=10 sl=64 | MB/s | 0.559 MB/s | 0.657 MB/s | 0.628 MB/s | -15.0% | -11.0% |
| bs=1000 sw=10 sl=64 | p50 | 1,100,271 us | 928,059 us | 976,698 us | +18.6% | +12.7% |
| bs=1000 sw=10 sl=64 | p95 | 1,130,579 us | 950,455 us | 1,032,282 us | +19.0% | +9.5% |
| bs=1000 sw=10 sl=64 | p99 | 1,130,579 us | 950,455 us | 1,032,282 us | +19.0% | +9.5% |
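The Δ columns appear to be the relative change of the PR value against each baseline, rounded to one decimal place. A minimal sketch under that assumption (the object and helper names are illustrative and not part of the benchmark tooling):

```scala
object BenchmarkDelta {
  // Relative change of the PR value against a baseline, in percent.
  def deltaPercent(pr: Double, baseline: Double): Double =
    (pr - baseline) / baseline * 100.0

  // Round to one decimal place, matching the precision shown in the table.
  def round1(x: Double): Double = math.round(x * 10.0) / 10.0

  def main(args: Array[String]): Unit = {
    // bs=10 throughput: PR 356 vs latest main 422.14 tuples/sec
    println(round1(deltaPercent(356.0, 422.14))) // -15.7
    // bs=10 p95 latency: PR 40,913 us vs latest main 34,198 us
    println(round1(deltaPercent(40913.0, 34198.0))) // 19.6
  }
}
```

For throughput a negative Δ is a regression, while for latency a positive Δ is a regression, which is why both rows above are flagged red.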
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,561.58,200,128000,356,0.217,24232.85,40913.07,40913.07
1,100,10,64,20,2495.13,2000,1280000,802,0.489,125608.21,141216.54,141216.54
2,1000,10,64,20,21820.77,20000,12800000,917,0.559,1100270.64,1130578.84,1130578.84

### What changes were proposed in this PR?

`ReservoirSamplingOpExec` allocates a fixed-size reservoir of length `count` (the per-worker share of `k`). When a worker receives fewer tuples than `count`, only the first `n` slots are filled, but `onFinish` returned the whole array, yielding `count - n` trailing `null` entries. The nulls are currently swallowed by a distant null-guard in `DataProcessor`, so the bug is latent, but the operator violates the "do not emit null tuples" contract and breaks if that guard is ever narrowed or bypassed.

```
Before: input < k -> onFinish emits [t0 .. tn-1, null, ..., null] (engine guard hides them)
After:  input < k -> onFinish emits [t0 .. tn-1]                  (no nulls emitted at all)
```

The fix emits only the filled prefix:

```scala
override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator.take(n)
```

`take(n)` is a no-op when `n >= count` (input ≥ k), so the sampled output is unchanged in the normal case.

### Any related issues, documentation, discussions?

Closes #5592

### How was this PR tested?

Added three regression cases to `ReservoirSamplingOpExecSpec`:

| Case | Asserts |
| --- | --- |
| `input size < k` | only the received tuples are emitted, in order, no nulls |
| empty input | `onFinish` emits nothing |
| skewed partitioning (`k=10`, 3 workers, worker 0 gets 2 tuples) | no null padding for an under-filled worker share |

All three fail against the old `reservoir.iterator` and pass with `reservoir.iterator.take(n)`; the 9 pre-existing cases stay green (TDD red → green verified by stashing the source fix).

```
sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.reservoirsampling.ReservoirSamplingOpExecSpec"
# Tests: succeeded 12, failed 0, canceled 0, ignored 0, pending 0
```

`sbt WorkflowOperator/scalafixAll` and `sbt WorkflowOperator/scalafmtAll` produce no further diff.

### Was this PR authored or co-authored using generative AI tooling?

Yes, partially. I (Suyash Jain) worked on this PR together with Claude Code as a pair-programming assistant. I reviewed the final diff, ran the spec locally, and verified the red → green behavior of the new regression tests myself before opening the PR.

Generated-by: Claude Code (Claude Opus 4.7)

(backported from commit d5f5e12)

Co-authored-by: Xuan Gu <162244362+xuang7@users.noreply.github.com>
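To make the null-padding behavior described above concrete, here is a small self-contained sketch. `ReservoirSketch` is a hypothetical stand-in that uses plain strings as tuples; it is not Texera's `ReservoirSamplingOpExec`, and only the field names `reservoir`, `count`, and `n` are borrowed from the PR text.

```scala
import scala.util.Random

// Hypothetical stand-in for the operator, using String as the tuple type.
final class ReservoirSketch(count: Int, rng: Random) {
  // Fixed-size reservoir; unfilled slots hold null, as with any JVM reference array.
  private val reservoir = new Array[String](count)
  private var n = 0 // number of tuples received so far

  def process(t: String): Unit = {
    if (n < count) {
      reservoir(n) = t // fill phase: take the first `count` tuples as-is
    } else {
      val j = rng.nextInt(n + 1) // uniform in [0, n]
      if (j < count) reservoir(j) = t // replace a slot with probability count / (n + 1)
    }
    n += 1
  }

  // Old behavior: returns every slot, including unfilled (null) ones.
  def onFinishPadded: Iterator[String] = reservoir.iterator

  // Fixed behavior: returns only the filled prefix.
  def onFinish: Iterator[String] = reservoir.iterator.take(n)
}

object ReservoirSketchDemo {
  def main(args: Array[String]): Unit = {
    val op = new ReservoirSketch(count = 4, rng = new Random(42))
    Seq("t0", "t1").foreach(op.process)
    println(op.onFinishPadded.toList) // List(t0, t1, null, null)
    println(op.onFinish.toList)       // List(t0, t1)
  }
}
```

Because `Iterator.take` simply stops at the end of the underlying iterator, calling it with `n` larger than `count` returns all `count` slots, which matches the claim that the normal case is unchanged.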
|
Backport to |
What changes were proposed in this PR?
`ReservoirSamplingOpExec` allocates a fixed-size reservoir of length `count` (the per-worker share of `k`). When a worker receives fewer tuples than `count`, only the first `n` slots are filled, but `onFinish` returned the whole array, yielding `count - n` trailing `null` entries. The nulls are currently swallowed by a distant null-guard in `DataProcessor`, so the bug is latent, but the operator violates the "do not emit null tuples" contract and breaks if that guard is ever narrowed or bypassed.
The fix emits only the filled prefix: `onFinish` now returns `reservoir.iterator.take(n)` instead of `reservoir.iterator`.
`take(n)` is a no-op when `n >= count` (input ≥ k), so the sampled output is unchanged in the normal case.
Any related issues, documentation, discussions?
Closes #5592
How was this PR tested?
Added three regression cases to `ReservoirSamplingOpExecSpec`:
| Case | Asserts |
| --- | --- |
| `input size < k` | only the received tuples are emitted, in order, no nulls |
| empty input | `onFinish` emits nothing |
| skewed partitioning (`k=10`, 3 workers, worker 0 gets 2 tuples) | no null padding for an under-filled worker share |
All three fail against the old `reservoir.iterator` and pass with `reservoir.iterator.take(n)`; the 9 pre-existing cases stay green (TDD red → green verified by stashing the source fix).
`sbt WorkflowOperator/scalafixAll` and `sbt WorkflowOperator/scalafmtAll` produce no further diff.
Was this PR authored or co-authored using generative AI tooling?
Yes, partially. I (Suyash Jain) worked on this PR together with Claude Code as a pair-programming assistant. I reviewed the final diff, ran the spec locally, and verified the red → green behavior of the new regression tests myself before opening the PR.
Generated-by: Claude Code (Claude Opus 4.7)
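The skewed-partitioning case can be worked through as plain arithmetic. The sketch below assumes `k` is split as evenly as possible across workers, with the first `k % workers` workers receiving one extra slot; the PR does not state how the per-worker share is actually computed, so `shares` is a hypothetical helper, as is `emitted`.

```scala
object SkewedShareSketch {
  // Assumed split: even division, remainder handed to the lowest-indexed workers.
  def shares(k: Int, workers: Int): Seq[Int] =
    (0 until workers).map(i => k / workers + (if (i < k % workers) 1 else 0))

  // Tuples a worker emits once only the filled prefix is returned.
  def emitted(share: Int, received: Int): Int = math.min(share, received)

  def main(args: Array[String]): Unit = {
    val perWorker = shares(k = 10, workers = 3)
    println(perWorker) // Vector(4, 3, 3)

    // Worker 0 receives only 2 tuples against its share.
    val out = emitted(perWorker(0), received = 2)
    println(out)                // 2
    println(perWorker(0) - out) // 2, the null slots the old code would have emitted
  }
}
```

Under this assumed split, worker 0 previously emitted its full share with trailing nulls, whereas after the fix it emits exactly the 2 tuples it received.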