fix(python-worker): coerce integral floats to int for INT/LONG fields #6053
eugenegujing wants to merge 4 commits into
Problem: pandas-based Python operators (e.g. Sort via TableOperator) build a DataFrame from input tuples. When an INT/LONG column contains nulls, pandas promotes the whole column to float64 because an int column cannot hold NaN, so 119 becomes 119.0. On output, the worker's strict schema validation in Tuple.finalize() then fails with "TypeError: Unmatched type for field 'weight', expected AttributeType.INT, got 119.0 (<class 'float'>) instead." This crashed every workflow whose CSV had an integer column with at least one missing value, forcing users to manually insert a Type Casting operator.

Fix (Tuple.cast_to_schema only; validate_schema unchanged): when the target type is INT or LONG and the value is a float (including np.float64) with a zero fractional part, cast it back to int, but only when the result is provably the original integer:
- INT window: Arrow int32 capacity [-2^31, 2^31 - 1]. int32 values are always exactly representable in float64, so capacity is the only constraint.
- LONG window: the float64 exact-integer range [-(2^53) + 1, 2^53 - 1] instead of int64 capacity. Above 2^53 float64 rounds, so the received float may already be a corrupted rendition of the original integer; coercing it would turn a loud validation error into silent data corruption.
- The endpoint 2^53 is excluded because it is ambiguous: 2^53 + 1 also rounds to float 2^53.
- The range check compares the converted int, not floats, to avoid rounding at the endpoints.
- Non-integral, infinite, and out-of-window floats are left untouched so validate_schema() still rejects them: lossy coercion must never happen silently.
- An out-of-window integral float additionally logs an actionable warning suggesting a cast to STRING or DOUBLE (or LONG for large integers in an INT field).
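The coercion rule described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's actual code: `coerce_integral_float`, `WINDOWS`, and the string type tags `"INT"` / `"LONG"` are hypothetical stand-ins for the logic in `Tuple.cast_to_schema()` and the worker's `AttributeType` values, and the NaN -> None handling and the warning log are deliberately left out.

```python
import math
from typing import Any

# Hypothetical windows mirroring the ranges named in the commit message.
WINDOWS = {
    "INT": (-(2**31), 2**31 - 1),       # Arrow int32 capacity
    "LONG": (-(2**53) + 1, 2**53 - 1),  # float64 exact-integer range, 2^53 excluded
}


def coerce_integral_float(value: Any, target: str) -> Any:
    """Return an int if `value` is a float that provably encodes one, else `value` unchanged."""
    window = WINDOWS.get(target)
    # np.float64 subclasses Python's float, so this isinstance check covers it too.
    if window is None or not isinstance(value, float):
        return value  # other target types and non-float values are not touched
    if not math.isfinite(value) or not value.is_integer():
        return value  # NaN, infinities, and fractional floats are left for validation
    as_int = int(value)  # exact, because the float is finite and integral
    low, high = window
    # Compare the converted int, not floats, so the endpoints are never rounded.
    return as_int if low <= as_int <= high else value
```

Under these assumptions, `coerce_integral_float(119.0, "INT")` returns the int `119`, while `float(2**53)` stays a float for a LONG field: since `float(2**53 + 1) == float(2**53)`, the original integer cannot be recovered with certainty, so the value is left for validation to reject.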
Also fixes a pre-existing stale-variable bug exposed by restructuring the if-chain: a NaN destined for a BINARY field was first set to None and then re-pickled from the stale local variable, producing pickled-NaN bytes instead of None. NaN in a BINARY field now correctly finalizes to None (guarded by a dedicated test).

Tests: 34 new cases in test_tuple.py:
- coercion cases incl. int32 and float64-exact-window boundaries and np.float64
- rejection of non-integral / infinite / out-of-window floats, plus the out-of-window warning
- NaN/None handling; DOUBLE and STRING fields stay untouched
- coercion pinned into cast_to_schema rather than validate_schema
- an integration-style test reproducing the full pipeline (Table.from_tuple_likes -> float64 promotion -> as_tuples -> finalize)

Fixes apache#5935
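The stale-variable bug mentioned in this commit message follows a common pattern, which the sketch below tries to illustrate. It is a hypothetical reconstruction, not the worker's code: `cast_field_buggy` and `cast_field_fixed` are invented names, and the real `cast_to_schema()` loop operates on tuple fields rather than on a single value.

```python
import math
import pickle
from typing import Any


def _is_nan(value: Any) -> bool:
    return isinstance(value, float) and math.isnan(value)


def cast_field_buggy(value: Any, is_binary: bool) -> Any:
    """Two independent ifs: the second one reads the stale `value`."""
    result = value
    if _is_nan(value):
        result = None
    if is_binary and not isinstance(value, bytes):
        result = pickle.dumps(value)  # stale read: `value` is still NaN here
    return result


def cast_field_fixed(value: Any, is_binary: bool) -> Any:
    """Mutually exclusive branches: a null value is never pickled."""
    if value is None or _is_nan(value):
        return None
    if is_binary and not isinstance(value, bytes):
        return pickle.dumps(value)
    return value
```

With the buggy variant, a NaN headed for a BINARY field comes back as pickled-NaN bytes; with the restructured variant it comes back as `None`, which matches the behavior the commit message describes.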
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@ Coverage Diff @@
## main #6053 +/- ##
============================================
+ Coverage 56.62% 56.76% +0.13%
Complexity 3019 3019
============================================
Files 1122 1122
Lines 43200 43367 +167
Branches 4648 4648
============================================
+ Hits 24464 24618 +154
- Misses 17299 17312 +13
Partials 1437 1437
This pull request uses carry forward flags.
| | config | throughput (tuples/sec) | MB/s | latency (p50/p95/p99) | max Δ latest / 7d |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 429 | 0.262 | 23,793/31,450/31,450 us | 🔴 +17.0% / 🔴 +105.7% |
| 🔴 | bs=100 sw=10 sl=64 | 961 | 0.587 | 103,611/133,675/133,675 us | 🔴 +9.3% / 🔴 +23.3% |
| ⚪ | bs=1000 sw=10 sl=64 | 1,106 | 0.675 | 908,370/937,910/937,910 us | ⚪ within ±5% / 🟢 -12.3% |
Baseline details
Latest main d861036 from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 429 tuples/sec | 486 tuples/sec | 770.95 tuples/sec | -11.7% | -44.4% |
| bs=10 sw=10 sl=64 | MB/s | 0.262 MB/s | 0.296 MB/s | 0.471 MB/s | -11.5% | -44.3% |
| bs=10 sw=10 sl=64 | p50 | 23,793 us | 20,328 us | 12,775 us | +17.0% | +86.2% |
| bs=10 sw=10 sl=64 | p95 | 31,450 us | 28,841 us | 15,286 us | +9.0% | +105.7% |
| bs=10 sw=10 sl=64 | p99 | 31,450 us | 28,841 us | 18,795 us | +9.0% | +67.3% |
| bs=100 sw=10 sl=64 | throughput | 961 tuples/sec | 1,001 tuples/sec | 976.93 tuples/sec | -4.0% | -1.6% |
| bs=100 sw=10 sl=64 | MB/s | 0.587 MB/s | 0.611 MB/s | 0.596 MB/s | -3.9% | -1.6% |
| bs=100 sw=10 sl=64 | p50 | 103,611 us | 100,902 us | 102,557 us | +2.7% | +1.0% |
| bs=100 sw=10 sl=64 | p95 | 133,675 us | 122,344 us | 108,383 us | +9.3% | +23.3% |
| bs=100 sw=10 sl=64 | p99 | 133,675 us | 122,344 us | 115,249 us | +9.3% | +16.0% |
| bs=1000 sw=10 sl=64 | throughput | 1,106 tuples/sec | 1,094 tuples/sec | 1,009 tuples/sec | +1.1% | +9.6% |
| bs=1000 sw=10 sl=64 | MB/s | 0.675 MB/s | 0.668 MB/s | 0.616 MB/s | +1.0% | +9.6% |
| bs=1000 sw=10 sl=64 | p50 | 908,370 us | 914,222 us | 997,695 us | -0.6% | -9.0% |
| bs=1000 sw=10 sl=64 | p95 | 937,910 us | 953,786 us | 1,036,731 us | -1.7% | -9.5% |
| bs=1000 sw=10 sl=64 | p99 | 937,910 us | 953,786 us | 1,069,334 us | -1.7% | -12.3% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,466.16,200,128000,429,0.262,23792.83,31450.06,31450.06
1,100,10,64,20,2080.67,2000,1280000,961,0.587,103610.80,133675.48,133675.48
2,1000,10,64,20,18081.22,20000,12800000,1106,0.675,908369.62,937910.06,937910.06
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
/request-review @aicam
Yicong-Huang left a comment
Thanks, please see inline comments
- test_tuple.py: use loguru's `logger` directly instead of aliasing it.
- Move INTEGRAL_TYPE_RANGES next to the other per-AttributeType maps in models/schema/attribute_type.py; import it into tuple.py.
- Document that the int-column-with-nulls -> float64 promotion is inherent to numpy-backed columns (pandas' default) rather than a specific pandas version, verified on the pinned pandas 2.2.3.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
- test_tuple.py: use loguru's `logger` directly instead of aliasing it.
- Move INTEGRAL_TYPE_RANGES next to the other per-AttributeType maps in models/schema/attribute_type.py; import it into tuple.py.
12f0241 to d345bee
- tuple.py: label out pandas version
What changes were proposed in this PR?
Problem. Pandas-based Python operators (e.g. Sort via `TableOperator`) build a DataFrame from input tuples. When an INT/LONG column contains nulls, pandas promotes the whole column to float64 because an int column cannot hold NaN, so `119` becomes `119.0`. On output, the worker's strict schema validation in `Tuple.finalize()` then fails with `TypeError: Unmatched type for field 'weight', expected AttributeType.INT, got 119.0 (<class 'float'>) instead.` This crashed every workflow whose CSV had an integer column with at least one missing value, and the only workaround was manually inserting a Type Casting operator for each affected column.

Why fix it in the Python worker (option (a) of the issue). The CSV schema inference is correct (an all-integer column with nulls is INTEGER; the JVM side handles null ints fine), and a UI per-column override (option (c)) would not remove the crash. The type contract is broken by pandas at the Python-worker boundary, so the fix belongs at that boundary's single chokepoint: `Tuple.cast_to_schema()`, which already performs safe casts (NaN -> None, object -> pickled bytes) right before `validate_schema()`.

The fix (in `Tuple.cast_to_schema()` only; `validate_schema()` is unchanged): when the target type is INT or LONG and the value is a float (including `np.float64`) with a zero fractional part, cast it back to int, but only when the result is provably the original integer:

- INT window: Arrow int32 capacity `[-2^31, 2^31 - 1]`. int32 values are always exactly representable in float64, so capacity is the only constraint.
- LONG window: the float64 exact-integer range `[-(2^53) + 1, 2^53 - 1]` instead of int64 capacity. Above 2^53, float64 rounds, so the received float may already be a corrupted rendition of the original integer; coercing it would turn a loud validation error into silent data corruption. The endpoint 2^53 itself is excluded because it is ambiguous (`2^53 + 1` also rounds to float `2^53`).
- Non-integral, infinite, and out-of-window floats are left untouched so `validate_schema()` still rejects them: lossy coercion must never happen silently. An out-of-window integral float additionally logs an actionable warning suggesting a cast to STRING or DOUBLE (or LONG for large integers in an INT field).

Deliberate behavior change for reviewers to note. Restructuring the if-chain in `cast_to_schema()` also fixes a pre-existing stale-variable bug: a NaN destined for a BINARY field was first set to None and then re-pickled from the stale local variable, producing pickled-NaN bytes instead of None. NaN in a BINARY field now correctly finalizes to None (guarded by a dedicated test).

The changed logic in `core/models/tuple.py`. A new module-level constant defines the safely coercible window per integral type. `cast_to_schema()`'s per-field loop is restructured from two independent `if`s into mutually exclusive branches (null handling / integral-float coercion / BINARY pickling), which both hosts the new coercion and eliminates the stale-variable read described above.

The outer per-field `try/except` (keep the value unchanged if a cast fails, continue with the next field) is preserved, and `validate_schema()` is untouched, so anything the coercion deliberately skips still fails validation loudly.

Any related issues, documentation, discussions?
Fixes #5935
How was this PR tested?
TDD: the tests were written first and confirmed to reproduce the crash (red), then the fix turned them green.
- `amber/src/test/python/core/models/test_tuple.py` (59 total in the file, all passing): coercion cases including the int32 and float64-exact-window boundaries and `np.float64`; rejection of non-integral / infinite / out-of-window floats; the out-of-window warning; NaN/None handling; DOUBLE and STRING fields staying untouched; tests pinning the coercion into `cast_to_schema` rather than `validate_schema`; and an integration-style test reproducing the full pipeline (`Table.from_tuple_likes` -> float64 promotion -> `as_tuples` -> `finalize`).
- `cd amber && pytest -m "not integration"`: all pass.
- `ruff check` and `ruff format --check` clean on both changed files.
- `sbt "scalafixAll --check"` and `sbt scalafmtCheckAll` pass.
- `AMBER_TEST_FILTER=skip-integration sbt test`: the full suite passes, with 0 failed and 0 aborted (WorkflowCore 1570, amber 1076, all other service modules green). Run against a clean iceberg catalog, matching how CI provisions one per run.
- `Table` -> `finalize` code path the worker uses.

Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Fable 5)