Perf/prepared write v2 scalar memo#4020

Closed
d-v-b wants to merge 30 commits into
zarr-developers:mainfrom
d-v-b:perf/prepared-write-v2-scalar-memo

@d-v-b d-v-b commented May 30, 2026

[Description of PR]

TODO:

  • Add unit tests and/or doctests in docstrings
  • Add docstrings and API docs for any new/modified user-facing classes and functions
  • New/modified features documented in docs/user-guide/*.md
  • Changes documented as a new file in changes/
  • GitHub Actions have all passed
  • Test coverage is 100% (Codecov passes)

d-v-b and others added 30 commits April 7, 2026 10:38
`PreparedWrite` models a set of per-chunk changes that would be applied to a stored chunk. `SupportsChunkPacking`
is a protocol for array -> bytes codecs that can use `PreparedWrite` objects to update an existing chunk.
Adds a SupportsSetRange protocol to zarr.abc.store for stores that
allow overwriting a byte range within an existing value. Implementations
are added for LocalStore (using file-handle seek+write) and MemoryStore
(in-memory bytearray slice assignment).

This is the prerequisite for the partial-shard write fast path in
ShardingCodec, which can patch individual inner-chunk slots without
rewriting the entire shard blob when the inner codec chain is fixed-size.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
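
The byte-range write described above can be sketched with a toy store. This is a minimal illustration, not the code on this branch: `SupportsSetRangeSketch`, `ToyMemoryStore`, and the `set_range(key, value, start)` signature are assumptions made for the example, and the real protocol in `zarr.abc.store` may look different.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SupportsSetRangeSketch(Protocol):
    """Hypothetical stand-in for the protocol; the real signature may differ."""

    def set_range(self, key: str, value: bytes, start: int) -> None: ...


class ToyMemoryStore:
    """Toy in-memory store that keeps each value as a mutable bytearray."""

    def __init__(self) -> None:
        self._data: dict[str, bytearray] = {}

    def set(self, key: str, value: bytes) -> None:
        self._data[key] = bytearray(value)

    def get(self, key: str) -> bytes:
        return bytes(self._data[key])

    def set_range(self, key: str, value: bytes, start: int) -> None:
        buf = self._data[key]  # raises KeyError if the value does not exist yet
        end = start + len(value)
        if start < 0 or end > len(buf):
            raise ValueError("byte range falls outside the existing value")
        buf[start:end] = value  # same-length slice assignment, no resize


store = ToyMemoryStore()
store.set("c/0", b"\x00" * 8)
store.set_range("c/0", b"\xff\xff", start=2)  # overwrite bytes 2 and 3 only
```

A file-backed variant would open the existing file for in-place update, seek to `start`, and write the same bytes.
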
V2Codec, BytesCodec, BloscCodec, etc. previously only implemented the
async _decode_single / _encode_single methods. Add their sync
counterparts (_decode_sync / _encode_sync) so that the upcoming
SyncCodecPipeline can dispatch through them without spinning up an
event loop.

For codecs that wrap external compressors (numcodecs.Zstd, numcodecs.Blosc,
the V2 fallback chain), the sync versions just call the underlying
compressor's blocking API directly instead of routing through
asyncio.to_thread.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
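
As a rough sketch of the sync/async pairing described above: the sync methods call the blocking compressor directly, and the async methods wrap them in `asyncio.to_thread`. `ToyZlibCodec` is hypothetical, and `zlib` stands in for the numcodecs compressors; the real codec classes take different arguments.

```python
import asyncio
import zlib


class ToyZlibCodec:
    """Hypothetical codec; zlib stands in for an external compressor."""

    def __init__(self, level: int = 1) -> None:
        self.level = level

    # Sync entry points: call the blocking API directly, no event loop needed.
    def _encode_sync(self, chunk: bytes) -> bytes:
        return zlib.compress(chunk, self.level)

    def _decode_sync(self, data: bytes) -> bytes:
        return zlib.decompress(data)

    # Async entry points: offload the same blocking work to a worker thread.
    async def _encode_single(self, chunk: bytes) -> bytes:
        return await asyncio.to_thread(self._encode_sync, chunk)

    async def _decode_single(self, data: bytes) -> bytes:
        return await asyncio.to_thread(self._decode_sync, data)


async def roundtrip_async(codec: ToyZlibCodec, chunk: bytes) -> bytes:
    return await codec._decode_single(await codec._encode_single(chunk))


codec = ToyZlibCodec()
payload = b"zarr" * 1024
via_sync = codec._decode_sync(codec._encode_sync(payload))
via_async = asyncio.run(roundtrip_async(codec, payload))
```
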
…arallelism

Adds SyncCodecPipeline alongside BatchedCodecPipeline. The new pipeline
runs codecs through their sync entry points (_decode_sync / _encode_sync)
and dispatches per-chunk work to a module-level thread pool sized by
the codec_pipeline.max_workers config (default = os.cpu_count()).

Each chunk's full lifecycle (fetch + decode + scatter for reads;
get-existing + merge + encode + set/delete for writes) runs as one
pool task — overlapping IO of one chunk with compute of another.
Scatter into the shared output buffer is thread-safe because chunks
have non-overlapping output selections.

The async wrappers (read/write) detect SupportsGetSync/SupportsSetSync
stores and dispatch to the sync fast path, passing the configured
max_workers. Other stores fall through to the async path, which still
uses asyncio.concurrent_map at async.concurrency.

Notes on perf:
- Default (None → cpu_count) is tuned for chunks ≥ ~512 KB.
- Small chunks (≤ 64 KB) regress 1.5-3x because pool dispatch overhead
  (~30-50 µs/task) dominates per-chunk work. Workaround:
  zarr.config.set({"codec_pipeline.max_workers": 1}).
- For large chunks on local/memory stores, IO+compute parallelism
  yields 1.7-2.5x over BatchedCodecPipeline on direct-API reads and
  ~2.5x on roundtrip.

ChunkTransform encapsulates the sync codec chain. It caches resolved
ArraySpecs across calls with the same chunk_spec — combined with the
constant-ArraySpec optimization in indexing, hot-path overhead is
minimized.

Includes test scaffolding for the new pipeline (test_sync_codec_pipeline)
and config plumbing for the max_workers key.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
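
The per-chunk lifecycle described above (fetch, decode, scatter as one pool task) can be illustrated with the standard library alone. This is a sketch under simplifying assumptions: the store is a plain dict, chunks are fixed-length byte strings, and the pool is created per call rather than at module level as the commit message describes. `read_chunk` and `read_all` are names invented for the example.

```python
import os
import zlib
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

CHUNK = 4  # bytes per decoded chunk in this toy layout

# Toy "store": chunk index -> compressed bytes.
store = {i: zlib.compress(bytes([i]) * CHUNK) for i in range(8)}


def read_chunk(i: int, out: bytearray) -> None:
    """Fetch, decode, and scatter one chunk as a single pool task."""
    raw = store[i]                              # fetch
    decoded = zlib.decompress(raw)              # decode
    out[i * CHUNK:(i + 1) * CHUNK] = decoded    # scatter into a disjoint slice


def read_all(max_workers: Optional[int] = None) -> bytes:
    out = bytearray(CHUNK * len(store))
    workers = max_workers or os.cpu_count() or 1
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # list() drains the iterator so worker exceptions surface here.
        list(pool.map(lambda i: read_chunk(i, out), store))
    return bytes(out)


result = read_all()
```

Each task writes to its own slice of `out`, which is why no lock is needed in this sketch. Passing `max_workers=1` mirrors the small-chunk workaround noted above.
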
Adds _encode_partial_sync and _decode_partial_sync to ShardingCodec.
For fixed-size inner codec chains and stores that implement
SupportsSetRange, partial writes patch individual inner-chunk slots
in-place instead of rewriting the whole shard:

  - Reads existing shard index (one byte-range get).
  - For each affected inner chunk: decodes the slot, merges the new
    region, re-encodes.
  - Writes each modified slot at its deterministic byte offset, then
    rewrites just the index.

For variable-size inner codecs (e.g. with compression) or stores that
don't support byte-range writes, falls through to a full-shard rewrite
matching BatchedCodecPipeline semantics.

The partial-decode path computes a ReadPlan from the shard index and
issues one byte-range get per overlapping chunk, decoding only what
the read selection touches.

Both paths are dispatched from SyncCodecPipeline via the existing
supports_partial_decode / supports_partial_encode protocol checks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
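
To make the "deterministic byte offset" idea concrete, here is a small sketch that patches one slot of a toy shard in place. It assumes a 2-D, power-of-two grid of inner chunks, a fixed encoded size per slot, and a particular bit interleaving for the Morton rank; all names (`morton_rank_2d`, `slot_offset`, `patch_slot`, `SLOT_NBYTES`) are hypothetical, and the index rewrite step is omitted.

```python
SLOT_NBYTES = 4  # fixed encoded size of every inner chunk (assumption)


def morton_rank_2d(row: int, col: int, bits: int = 16) -> int:
    """Interleave the bits of (row, col) into a Z-order rank, col bit lowest."""
    rank = 0
    for b in range(bits):
        rank |= ((col >> b) & 1) << (2 * b)
        rank |= ((row >> b) & 1) << (2 * b + 1)
    return rank


def slot_offset(row: int, col: int) -> int:
    """Byte offset of a slot: depends only on its rank, not on other slots."""
    return morton_rank_2d(row, col) * SLOT_NBYTES


def patch_slot(shard: bytearray, row: int, col: int, encoded: bytes) -> None:
    """Overwrite one inner-chunk slot without touching the rest of the shard."""
    if len(encoded) != SLOT_NBYTES:
        raise ValueError("fast path requires fixed-size encoded chunks")
    start = slot_offset(row, col)
    shard[start:start + SLOT_NBYTES] = encoded


# A shard holding a 2x2 grid of inner chunks, initially all zero bytes.
shard = bytearray(SLOT_NBYTES * 4)
patch_slot(shard, row=1, col=0, encoded=b"\x01\x02\x03\x04")
```

With a variable-size inner codec the length check would fail, which corresponds to the fall-through to a full-shard rewrite described above.
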
Two new test files:

  test_codec_invariants — asserts contract-level properties that every
  codec / shard / buffer combination must satisfy: round-trip exactness,
  prototype propagation, fill-value handling, all-empty shard handling.

  test_pipeline_parity — exhaustive matrix asserting that
  SyncCodecPipeline and BatchedCodecPipeline produce semantically
  identical results across codec configs, layouts (including
  nested sharding), write sequences, and write_empty_chunks settings.
  Three checks per cell:
    1. Same array contents on read.
    2. Same set of store keys after writes.
    3. Each pipeline reads the other's output identically (catches
       layout-divergence bugs).

These tests pinned the design throughout the SyncCodecPipeline +
partial-shard development.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
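
The three parity checks listed above might look roughly like the following for a single matrix cell. The two toy pipelines and `check_parity` are invented for illustration; the real tests compare `SyncCodecPipeline` and `BatchedCodecPipeline` through zarr arrays, not dicts.

```python
import zlib


class ToyPipelineA:
    """Toy pipeline that writes chunks one at a time."""

    def write(self, store: dict, chunks: dict) -> None:
        for key, data in chunks.items():
            store[key] = zlib.compress(data)

    def read(self, store: dict) -> dict:
        return {key: zlib.decompress(raw) for key, raw in store.items()}


class ToyPipelineB(ToyPipelineA):
    """Toy pipeline that writes all chunks in one batch."""

    def write(self, store: dict, chunks: dict) -> None:
        store.update({key: zlib.compress(data) for key, data in chunks.items()})


def check_parity(a, b, chunks: dict) -> bool:
    store_a: dict = {}
    store_b: dict = {}
    a.write(store_a, chunks)
    b.write(store_b, chunks)
    # 1. Same array contents on read.
    same_contents = a.read(store_a) == b.read(store_b) == chunks
    # 2. Same set of store keys after writes.
    same_keys = store_a.keys() == store_b.keys()
    # 3. Each pipeline reads the other's output identically.
    cross_read = a.read(store_b) == chunks and b.read(store_a) == chunks
    return same_contents and same_keys and cross_read


parity_ok = check_parity(
    ToyPipelineA(), ToyPipelineB(), {"c/0": b"abc", "c/1": b"def"}
)
```
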
Adds .gitignore entries for .claude/, CLAUDE.md, and docs/superpowers/
so local IDE/agent planning artifacts don't get committed by accident.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Both were exported from zarr.abc.codec.__all__ but never referenced
by either codec pipeline or any test. Artifacts of an earlier design
iteration superseded by the current SyncCodecPipeline.

Also remove now-unused imports of `dataclass` and `ChunkProjection`
that were only needed by the deleted symbols.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Both tests/test_phased_codec_pipeline.py and tests/test_pipeline_benchmark.py
import PhasedCodecPipeline, which no longer exists in src/. Each failed at
collection. The benchmarking intent of test_pipeline_benchmark.py is replaced
by extensions to tests/benchmarks/test_e2e.py later in this branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The new name describes what the pipeline does (fuses fetch+decode+scatter
into one task per chunk) rather than the implementation detail of using
sync codec entry points. The name also stays accurate when this pipeline
gains a remote-store / async fast path in a future change.

Mechanical rename across the class, register_pipeline call, dotted-path
strings used by zarr.config, isinstance checks, parametrize values, and
docstrings. tests/test_sync_pipeline.py renamed to tests/test_fused_pipeline.py.

Nothing on this branch is released, so no deprecation alias is needed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The BatchedCodecPipeline and FusedCodecPipeline classes had identical
copies of _merge_chunk_array (one method, one staticmethod). Extract
once as a module-level free function and call from both. No new base
class or mixin is introduced.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Both BatchedCodecPipeline.read_batch (non-partial-decode branch) and
FusedCodecPipeline.read (async fallback) duplicate the same sequence:
concurrent_map(get) -> pipeline.decode -> scatter into out. Lift to a
module-level free function and call from both.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Both BatchedCodecPipeline.write_batch (non-partial-encode branch) and
FusedCodecPipeline.write (async fallback) duplicate the same sequence:
read existing bytes -> decode -> merge -> encode -> set/delete. Lift to
a module-level free function and call from both.

After this change, neither pipeline class carries _merge_chunk_array,
nor the duplicated read/write fallback bodies. Each class is reduced to
its constructor, fast-path methods, and thin async dispatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a `pipeline` fixture with values ["batched", "fused"] that swaps
codec_pipeline.path for the duration of each benchmark. Both
test_write_array and test_read_array now produce one benchmark cell
per (compression x layout x store x pipeline). CodSpeed will report
comparable numbers for both pipelines on the same workloads.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds `latency in {0, 0.001, 0.05, 0.2}` and a bench_store fixture that
wraps the underlying memory store in zarr.testing.store.LatencyStore
when latency > 0. Local-store cells skip nonzero latency — adding
synthetic latency on top of a real filesystem double-counts and is not
the intended measurement.

Combined with the pipeline parameter, the matrix now produces
comparable benchmark numbers for {Batched, Fused} x {0, 1ms, 50ms, 200ms}
on memory-shaped operations. The numbers are a signal under one simple
model of remote latency, not absolute predictions of S3 behavior.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
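
A latency-wrapping store of the kind described above can be approximated as follows. This is only a sketch: `ToyAsyncStore`, `ToyLatencyStore`, and `make_bench_store` are hypothetical and are not the `zarr.testing.store.LatencyStore` API; the point is that the wrapper is applied only when `latency > 0`.

```python
import asyncio
import time


class ToyAsyncStore:
    """Toy async key-value store backed by a dict."""

    def __init__(self) -> None:
        self._data: dict[str, bytes] = {}

    async def get(self, key: str) -> bytes:
        return self._data[key]

    async def set(self, key: str, value: bytes) -> None:
        self._data[key] = value


class ToyLatencyStore:
    """Hypothetical wrapper that sleeps before delegating each operation."""

    def __init__(self, inner: ToyAsyncStore, latency: float) -> None:
        self._inner = inner
        self._latency = latency

    async def get(self, key: str) -> bytes:
        await asyncio.sleep(self._latency)
        return await self._inner.get(key)

    async def set(self, key: str, value: bytes) -> None:
        await asyncio.sleep(self._latency)
        await self._inner.set(key, value)


def make_bench_store(latency: float):
    """Wrap the memory store only when synthetic latency is requested."""
    inner = ToyAsyncStore()
    return ToyLatencyStore(inner, latency) if latency > 0 else inner


async def roundtrip(store) -> bytes:
    await store.set("c/0", b"data")
    return await store.get("c/0")


def timed_roundtrip(latency: float):
    store = make_bench_store(latency)
    start = time.perf_counter()
    value = asyncio.run(roundtrip(store))
    return value, time.perf_counter() - start
```
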
Commit 7f45aba (which converted _decode_single -> _decode_sync) dropped
two explanatory comment blocks from the dtype-handling branches in
V2Codec.decode. Both comments document non-obvious WHY:

- The TypeError catch is for chunks whose stored dtype doesn't match
  the array spec dtype (e.g. string dtype vs object array).
- The elif branch fires when filters were tampered with: an object array
  needs an object codec in the filter chain to be read correctly.

These were removed as drive-by cleanup during the sync-method rename
without intent to delete the substance. Restoring verbatim.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add docstring substance and a couple of inline notes to the new sync
methods on ShardingCodec that landed on this branch. Concretely:

- _decode_sync / _encode_sync: explain how each relates to the async
  counterpart and the partial-* variants, and why inner chunks are
  iterated in Morton order on the encode path.
- _encode_shard_dict_sync: explain the two-pass offset shift in the
  index-at-start branch (offsets are written relative to the data
  section, then bumped by len(index_bytes)) and the MAX_UINT_64
  empty-chunk sentinel that must not be touched.
- _encode_partial_sync byte-range path: explain WHY morton-rank
  determines byte offset deterministically (fixed-size inner chunks =
  every slot at a stable offset regardless of which others are present);
  this is the load-bearing invariant for the byte-range fast path.
- _decode_partial_sync: docstring now lists the two sub-paths
  (full-shard fetch vs. index-then-byte-ranges) and the reason the
  full-shard branch exists (one round trip beats N+1 small ones).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
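
The two-pass offset shift for the index-at-start layout can be sketched like this. The example assumes an index made of little-endian `(offset, nbytes)` uint64 pairs with no checksum, which is a simplification; `encode_shard_index_at_start` is a hypothetical name, not the method on `ShardingCodec`.

```python
from __future__ import annotations

import struct

MAX_UINT_64 = 2**64 - 1  # sentinel marking an empty inner chunk


def encode_shard_index_at_start(chunks: list[bytes | None]) -> bytes:
    """Build index + data, with the index placed before the data section."""
    # Pass 1: record offsets relative to the start of the data section.
    entries = []
    cursor = 0
    for chunk in chunks:
        if chunk is None:
            entries.append((MAX_UINT_64, MAX_UINT_64))
        else:
            entries.append((cursor, len(chunk)))
            cursor += len(chunk)

    # The index size is known up front: two uint64 values per inner chunk.
    index_nbytes = 16 * len(entries)

    # Pass 2: bump real offsets by the index size; sentinels stay untouched.
    shifted = [
        (off, n) if off == MAX_UINT_64 else (off + index_nbytes, n)
        for off, n in entries
    ]
    index_bytes = b"".join(struct.pack("<QQ", off, n) for off, n in shifted)
    data = b"".join(chunk for chunk in chunks if chunk is not None)
    return index_bytes + data


shard = encode_shard_index_at_start([b"ab", None, b"cde"])
```

Shifting the sentinel would overflow the uint64 and, worse, turn an "empty" marker into an apparently valid offset, which is why the second pass skips it.
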
Docstrings added on this branch used RST-style ``literal`` markup
(double-backticks). Convert to Markdown-style `literal` (single
backticks) so the docstrings render correctly in Markdown-aware
viewers without needing a separate RST-to-Markdown step.

Two cases worth calling out:

- src/zarr/core/codec_pipeline.py and src/zarr/codecs/sharding.py:
  every ``literal`` in these files came in on this branch, so the
  conversion is global within those files.
- src/zarr/abc/store.py and src/zarr/core/array.py: only docstrings
  added on this branch are converted; pre-existing RST-style
  literals from main are left alone (out of scope).

Also converted one .. note:: directive in
src/zarr/core/array.py (the regular_chunk_array_spec helper) to
a Markdown blockquote, since that directive was added on this branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
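
For illustration, a conversion like the one described above could be done with a single regular expression. This is a sketch of one possible approach, not necessarily how the commit was produced; `rst_literals_to_markdown` is a hypothetical helper, and it deliberately ignores literals that span multiple lines.

```python
import re

# Match ``literal`` where the literal contains no backticks or newlines.
_RST_LITERAL = re.compile(r"``([^`\n]+)``")


def rst_literals_to_markdown(docstring: str) -> str:
    """Rewrite RST-style ``literal`` markup as Markdown-style `literal`."""
    return _RST_LITERAL.sub(r"`\1`", docstring)


converted = rst_literals_to_markdown(
    "Calls ``set_range`` when the store is a ``LocalStore``."
)
```
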
In ShardingCodec._encode_partial_sync's full-shard-rewrite loop, a scalar
broadcast value produces byte-for-byte identical results for every complete
inner chunk (same fill, same empty-check, same encoded bytes). Compute that
outcome once and reuse it across all complete chunks instead of re-merging,
re-checking write_empty_chunks, and re-encoding tens of thousands of identical
chunks. Incomplete edge chunks still merge against their own data individually.

Target case (fused, memory, chunks=100/shards=1M, no compression):
write 92.26ms -> 21.59ms (4.3x). Pipeline parity (byte-identical to batched)
and 956 tests pass under the fused pipeline; adversarial partial-overwrite/
edge/compression/2D/aliasing checks pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
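
The memoization described above can be sketched with a toy shard represented as a dict of encoded inner chunks. This is an illustration under simplifying assumptions: chunks are 1-D byte strings, `zlib` stands in for the inner codec chain, the `write_empty_chunks` check is left out, and `write_scalar` with its `complete` / `partial` arguments is a hypothetical interface.

```python
import zlib

encode_calls = 0  # counts encodes so the saving is observable


def encode(chunk: bytes) -> bytes:
    global encode_calls
    encode_calls += 1
    return zlib.compress(chunk)


def write_scalar(shard: dict, value: int, chunk_len: int,
                 complete, partial: dict) -> None:
    """Write a scalar byte value into a shard.

    complete: ids of inner chunks fully covered by the selection.
    partial:  id -> (start, stop) for chunks only partly covered.
    """
    memo = None
    for cid in complete:
        if memo is None:
            # Every complete chunk encodes to the same bytes: do it once.
            memo = encode(bytes([value]) * chunk_len)
        shard[cid] = memo  # bytes are immutable, so sharing is safe

    for cid, (start, stop) in partial.items():
        # Edge chunks still merge against their own existing data.
        if cid in shard:
            existing = bytearray(zlib.decompress(shard[cid]))
        else:
            existing = bytearray(chunk_len)  # zero fill value (assumption)
        existing[start:stop] = bytes([value]) * (stop - start)
        shard[cid] = encode(bytes(existing))


shard: dict = {}
write_scalar(shard, value=7, chunk_len=4,
             complete=range(1000), partial={1000: (0, 2)})
```

Here 1001 inner chunks are written with two encode calls: one shared by all complete chunks and one for the edge chunk.
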

d-v-b commented May 30, 2026

this was supposed to be opened against my branch. closing!

@d-v-b d-v-b closed this May 30, 2026
@d-v-b d-v-b deleted the perf/prepared-write-v2-scalar-memo branch May 30, 2026 07:19