Skip to content

fix(workflow-operator): no null padding in reservoir sampling#5606

Merged
Yicong-Huang merged 3 commits into
apache:mainfrom
suyashj1231:fix/reservoir-sampling-null-tuples
Jun 12, 2026
Merged

fix(workflow-operator): no null padding in reservoir sampling#5606
Yicong-Huang merged 3 commits into
apache:mainfrom
suyashj1231:fix/reservoir-sampling-null-tuples

Conversation

@suyashj1231

@suyashj1231 suyashj1231 commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this PR?

ReservoirSamplingOpExec allocates a fixed-size reservoir of length count (the per-worker share of k). When a worker receives fewer tuples than count, only the first n slots are filled, but onFinish returned the whole array, yielding count - n trailing null entries. The nulls are currently swallowed by a distant null-guard in DataProcessor, so the bug is latent — but the operator violates the "do not emit null tuples" contract and breaks if that guard is ever narrowed or bypassed.

Before:  input < k  ->  onFinish emits [t0 .. tn-1, null, ..., null]  (engine guard hides them)
After:   input < k  ->  onFinish emits [t0 .. tn-1]                   (no nulls emitted at all)

The fix emits only the filled prefix:

override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator.take(n)

take(n) is a no-op when n >= count (input ≥ k), so the sampled output is unchanged in the normal case.

Any related issues, documentation, discussions?

Closes #5592

How was this PR tested?

Added three regression cases to ReservoirSamplingOpExecSpec:

Case Asserts
input size < k only the received tuples are emitted, in order, no nulls
empty input onFinish emits nothing
skewed partitioning (k=10, 3 workers, worker 0 gets 2 tuples) no null padding for an under-filled worker share

All three fail against the old reservoir.iterator and pass with reservoir.iterator.take(n); the 9 pre-existing cases stay green (TDD red → green verified by stashing the source fix).

sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.reservoirsampling.ReservoirSamplingOpExecSpec"
# Tests: succeeded 12, failed 0, canceled 0, ignored 0, pending 0

sbt WorkflowOperator/scalafixAll and sbt WorkflowOperator/scalafmtAll produce no further diff.

Was this PR authored or co-authored using generative AI tooling?

Yes, partially. I (Suyash Jain) worked on this PR together with Claude Code as a pair-programming assistant. I reviewed the final diff, ran the spec locally, and verified the red → green behavior of the new regression tests myself before opening the PR.

Generated-by: Claude Code (Claude Opus 4.7)

@github-actions

Copy link
Copy Markdown
Contributor

👋 Thanks for your first contribution to Texera, @suyashj1231!

You can drive common housekeeping tasks just by leaving a comment. Type the command on its own line.

On issues

Command What it does
/take Assign the issue to yourself (self-claim it)
/untake Remove yourself as assignee

To find unclaimed work, search is:issue is:open no:assignee — there's no "triage" label; the search filter is the triage state.

Linking sub-issues

Command Where to run it What it does
/sub-issue #12 #13 On the parent Links #12 and #13 as children of this issue
/unsub-issue #12 #13 On the parent Unlinks those children
/parent-issue #5 On the child Sets #5 as this issue's parent
/unparent-issue On the child Removes this issue's parent (auto-detected)
/unparent-issue #5 On the child Removes parent #5 explicitly

You can write references as #12 or bare 12. Cross-repo references like owner/repo#12 aren't supported and are ignored.

On pull requests (author only)

Command What it does
/request-review @user [@user ...] Request reviews from those users
/unrequest-review @user [@user ...] Cancel those review requests

You can mention teams as @org/team, and @copilot works too. Only the PR author can use these commands.

Note: Commands must match exactly — /take this won't work, only /take. Bots are ignored, and you can't self-link an issue or set an issue as its own parent.

For the full contribution flow, see CONTRIBUTING.md.

@xuang7 xuang7 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! LGTM.

@codecov-commenter

codecov-commenter commented Jun 10, 2026

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 52.87%. Comparing base (227cbd7) to head (dd83a20).
✅ All tests successful. No failed tests found.

Additional details and impacted files
@@            Coverage Diff             @@
##               main    #5606    +/-   ##
==========================================
  Coverage     52.86%   52.87%            
- Complexity     2510     2522    +12     
==========================================
  Files          1075     1071     -4     
  Lines         41665    41414   -251     
  Branches       4495     4460    -35     
==========================================
- Hits          22027    21896   -131     
+ Misses        18338    18229   -109     
+ Partials       1300     1289    -11     
Flag Coverage Δ *Carryforward flag
access-control-service 64.61% <ø> (ø)
agent-service 34.36% <ø> (ø) Carriedforward from 8001e4c
amber 53.69% <100.00%> (+0.06%) ⬆️
computing-unit-managing-service 1.65% <ø> (ø)
config-service 58.57% <ø> (+1.85%) ⬆️
file-service 57.06% <ø> (ø)
frontend 47.17% <ø> (-0.18%) ⬇️ Carriedforward from 8001e4c
pyamber 90.72% <ø> (ø) Carriedforward from 8001e4c
python 90.75% <ø> (ø) Carriedforward from 8001e4c
workflow-compiling-service 58.69% <ø> (ø)

*This pull request uses carry forward flags. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@Yicong-Huang

Copy link
Copy Markdown
Contributor

@xuang7 shall we backport this fix to v1.2?

@Yicong-Huang Yicong-Huang left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @suyashj1231!

@xuang7 xuang7 added the release/v1.2 back porting to release/v1.2 label Jun 10, 2026
@xuang7 xuang7 enabled auto-merge June 12, 2026 19:12
@github-actions

Copy link
Copy Markdown
Contributor

⚠️ Benchmark changes need a look

🟢 2 better · 🔴 13 worse · ⚪ 0 noise (<±5%) · 0 without baseline

CI benchmark results are noisy; treat <±5% as noise unless repeated.

Dashboard · Run

config throughput MB/s latency max Δ latest / 7d
🔴 bs=10 sw=10 sl=64 356 0.217 24,233/40,913/40,913 us 🔴 +19.6% / 🔴 +14.0%
🔴 bs=100 sw=10 sl=64 802 0.489 125,608/141,217/141,217 us 🔴 +17.0% / 🔴 +11.5%
🔴 bs=1000 sw=10 sl=64 917 0.559 1,100,271/1,130,579/1,130,579 us 🔴 +19.0% / 🔴 +12.7%
Baseline details

Latest main 227cbd7 from 2026-06-12T19:07:43.071Z

config metric PR latest main 7d avg Δ latest Δ 7d
bs=10 sw=10 sl=64 throughput 356 tuples/sec 422.14 tuples/sec 398.78 tuples/sec -15.7% -10.7%
bs=10 sw=10 sl=64 MB/s 0.217 MB/s 0.258 MB/s 0.243 MB/s -15.8% -10.8%
bs=10 sw=10 sl=64 p50 24,233 us 22,135 us 24,396 us +9.5% -0.7%
bs=10 sw=10 sl=64 p95 40,913 us 34,198 us 35,892 us +19.6% +14.0%
bs=10 sw=10 sl=64 p99 40,913 us 34,198 us 35,892 us +19.6% +14.0%
bs=100 sw=10 sl=64 throughput 802 tuples/sec 894.49 tuples/sec 886.25 tuples/sec -10.3% -9.5%
bs=100 sw=10 sl=64 MB/s 0.489 MB/s 0.546 MB/s 0.541 MB/s -10.4% -9.6%
bs=100 sw=10 sl=64 p50 125,608 us 107,334 us 112,613 us +17.0% +11.5%
bs=100 sw=10 sl=64 p95 141,217 us 160,327 us 135,197 us -11.9% +4.5%
bs=100 sw=10 sl=64 p99 141,217 us 160,327 us 135,197 us -11.9% +4.5%
bs=1000 sw=10 sl=64 throughput 917 tuples/sec 1,077 tuples/sec 1,029 tuples/sec -14.9% -10.9%
bs=1000 sw=10 sl=64 MB/s 0.559 MB/s 0.657 MB/s 0.628 MB/s -15.0% -11.0%
bs=1000 sw=10 sl=64 p50 1,100,271 us 928,059 us 976,698 us +18.6% +12.7%
bs=1000 sw=10 sl=64 p95 1,130,579 us 950,455 us 1,032,282 us +19.0% +9.5%
bs=1000 sw=10 sl=64 p99 1,130,579 us 950,455 us 1,032,282 us +19.0% +9.5%
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,561.58,200,128000,356,0.217,24232.85,40913.07,40913.07
1,100,10,64,20,2495.13,2000,1280000,802,0.489,125608.21,141216.54,141216.54
2,1000,10,64,20,21820.77,20000,12800000,917,0.559,1100270.64,1130578.84,1130578.84

@xuang7 xuang7 disabled auto-merge June 12, 2026 19:16
@xuang7 xuang7 added this pull request to the merge queue Jun 12, 2026
@github-merge-queue github-merge-queue Bot removed this pull request from the merge queue due to failed status checks Jun 12, 2026
@Yicong-Huang Yicong-Huang added this pull request to the merge queue Jun 12, 2026
Merged via the queue into apache:main with commit d5f5e12 Jun 12, 2026
30 checks passed
Yicong-Huang pushed a commit that referenced this pull request Jun 12, 2026
### What changes were proposed in this PR?

`ReservoirSamplingOpExec` allocates a fixed-size reservoir of length
`count` (the per-worker share of `k`). When a worker receives fewer
tuples than `count`, only the first `n` slots are filled, but `onFinish`
returned the whole array, yielding `count - n` trailing `null` entries.
The nulls are currently swallowed by a distant null-guard in
`DataProcessor`, so the bug is latent — but the operator violates the
"do not emit null tuples" contract and breaks if that guard is ever
narrowed or bypassed.

```
Before:  input < k  ->  onFinish emits [t0 .. tn-1, null, ..., null]  (engine guard hides them)
After:   input < k  ->  onFinish emits [t0 .. tn-1]                   (no nulls emitted at all)
```

The fix emits only the filled prefix:

```scala
override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator.take(n)
```

`take(n)` is a no-op when `n >= count` (input ≥ k), so the sampled
output is unchanged in the normal case.

### Any related issues, documentation, discussions?

Closes #5592

### How was this PR tested?

Added three regression cases to `ReservoirSamplingOpExecSpec`:

| Case | Asserts |
| --- | --- |
| `input size < k` | only the received tuples are emitted, in order, no
nulls |
| empty input | `onFinish` emits nothing |
| skewed partitioning (`k=10`, 3 workers, worker 0 gets 2 tuples) | no
null padding for an under-filled worker share |

All three fail against the old `reservoir.iterator` and pass with
`reservoir.iterator.take(n)`; the 9 pre-existing cases stay green (TDD
red → green verified by stashing the source fix).

```
sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.reservoirsampling.ReservoirSamplingOpExecSpec"
# Tests: succeeded 12, failed 0, canceled 0, ignored 0, pending 0
```

`sbt WorkflowOperator/scalafixAll` and `sbt
WorkflowOperator/scalafmtAll` produce no further diff.

### Was this PR authored or co-authored using generative AI tooling?

Yes, partially. I (Suyash Jain) worked on this PR together with Claude
Code as a pair-programming assistant. I reviewed the final diff, ran the
spec locally, and verified the red → green behavior of the new
regression tests myself before opening the PR.

Generated-by: Claude Code (Claude Opus 4.7)

(backported from commit d5f5e12)

Co-authored-by: Xuan Gu <162244362+xuang7@users.noreply.github.com>
@github-actions

Copy link
Copy Markdown
Contributor

Backport to release/v1.2 succeeded as b876b47. Run

yangzhang75 pushed a commit to yangzhang75/texera that referenced this pull request Jun 22, 2026
…#5606)

### What changes were proposed in this PR?

`ReservoirSamplingOpExec` allocates a fixed-size reservoir of length
`count` (the per-worker share of `k`). When a worker receives fewer
tuples than `count`, only the first `n` slots are filled, but `onFinish`
returned the whole array, yielding `count - n` trailing `null` entries.
The nulls are currently swallowed by a distant null-guard in
`DataProcessor`, so the bug is latent — but the operator violates the
"do not emit null tuples" contract and breaks if that guard is ever
narrowed or bypassed.

```
Before:  input < k  ->  onFinish emits [t0 .. tn-1, null, ..., null]  (engine guard hides them)
After:   input < k  ->  onFinish emits [t0 .. tn-1]                   (no nulls emitted at all)
```

The fix emits only the filled prefix:

```scala
override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator.take(n)
```

`take(n)` is a no-op when `n >= count` (input ≥ k), so the sampled
output is unchanged in the normal case.

### Any related issues, documentation, discussions?

Closes apache#5592

### How was this PR tested?

Added three regression cases to `ReservoirSamplingOpExecSpec`:

| Case | Asserts |
| --- | --- |
| `input size < k` | only the received tuples are emitted, in order, no
nulls |
| empty input | `onFinish` emits nothing |
| skewed partitioning (`k=10`, 3 workers, worker 0 gets 2 tuples) | no
null padding for an under-filled worker share |

All three fail against the old `reservoir.iterator` and pass with
`reservoir.iterator.take(n)`; the 9 pre-existing cases stay green (TDD
red → green verified by stashing the source fix).

```
sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.reservoirsampling.ReservoirSamplingOpExecSpec"
# Tests: succeeded 12, failed 0, canceled 0, ignored 0, pending 0
```

`sbt WorkflowOperator/scalafixAll` and `sbt
WorkflowOperator/scalafmtAll` produce no further diff.

### Was this PR authored or co-authored using generative AI tooling?

Yes, partially. I (Suyash Jain) worked on this PR together with Claude
Code as a pair-programming assistant. I reviewed the final diff, ran the
spec locally, and verified the red → green behavior of the new
regression tests myself before opening the PR.

Generated-by: Claude Code (Claude Opus 4.7)

Co-authored-by: Xuan Gu <162244362+xuang7@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

common fix release/v1.2 back porting to release/v1.2

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Harden ReservoirSamplingOpExec.onFinish to not emit null tuples when input size < k

4 participants