feat: [DNM] Async instant generation fixes over 0.14.1#18767
Conversation
) The new instant time generation utilizes RPC request to coordinate creation of new instants. Each write task will send an RPC request to the coordinator for the instant time, the coordinator uses a global lock to guard the access from multiple tasks. Now one checkpoint id corresponds to one instant. Basic work flow: * write task send request: current ckp id * write task expected response: the instant time * coordinator mappings of checkpoint and instant: ckp-id → {instant → {write-task-id → write meta event}} Note that the ckp id used in the request is the last known id instead of the current checkpoint, if a task is restored from a state of the current job, it is the state checkpoint id, otherwise -1 for fresh new job. (cherry picked from commit e34a7ab)
) The new instant time generation utilizes RPC request to coordinate creation of new instants. Each write task will send an RPC request to the coordinator for the instant time, the coordinator uses a global lock to guard the access from multiple tasks. Now one checkpoint id corresponds to one instant. Basic work flow: * write task send request: current ckp id * write task expected response: the instant time * coordinator mappings of checkpoint and instant: ckp-id → {instant → {write-task-id → write meta event}} Note that the ckp id used in the request is the last known id instead of the current checkpoint, if a task is restored from a state of the current job, it is the state checkpoint id, otherwise -1 for fresh new job. (cherry picked from commit e34a7ab)
) The new instant time generation utilizes RPC request to coordinate creation of new instants. Each write task will send an RPC request to the coordinator for the instant time, the coordinator uses a global lock to guard the access from multiple tasks. Now one checkpoint id corresponds to one instant. Basic work flow: * write task send request: current ckp id * write task expected response: the instant time * coordinator mappings of checkpoint and instant: ckp-id → {instant → {write-task-id → write meta event}} Note that the ckp id used in the request is the last known id instead of the current checkpoint, if a task is restored from a state of the current job, it is the state checkpoint id, otherwise -1 for fresh new job. (cherry picked from commit e34a7ab)
…ite path (apache#18264) (cherry picked from commit 69e24ea)
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR (chunk 2 of 2) is part of the async instant generation overhaul, replacing the bootstrap-event-driven instant initialization with an explicit request/response pattern (Correspondent) and reworking the related test harness around per-checkpoint instant requests. The production code surface is small here (thread-factory plumbing, a forced LAZY cleaning policy under non-blocking instant generation, and removing the fixed parallelism on the dummy sink); most of the diff is test-harness restructuring around the new flow. One thing worth a look in the inline comments around the silent policy override in FlinkWriteClients. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small readability issues below — a dead local variable and a copy-paste doc error. a few small naming and type-consistency nits below; overall the refactoring is clean and the new abstractions (Correspondent, EventBuffers, CommitGuard) are well-structured.
| // open the function and ingest data | ||
| this.pipeline.openFunction(); | ||
| this.ckpMetadata = CkpMetadata.getInstance(conf); | ||
| HoodieWriteConfig writeConfig = this.pipeline.getCoordinator().getWriteClient().getConfig(); |
There was a problem hiding this comment.
🤖 nit: writeConfig is declared but never used — it looks like dead code left over from replacing the ckpMetadata assignment. Could you either wire it into the rest of the method or remove it?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| /** | ||
| * Asserts the data files are empty. | ||
| */ | ||
| public TestHarness assertEmptyBaseFiles() { |
There was a problem hiding this comment.
🤖 nit: the Javadoc (line 503) is a copy-paste from assertEmptyDataFiles — it says "data files" instead of "base files". The assertion message on the next line has the same copy-paste issue ("No data files should have been created" → "No base files should have been created").
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| this.taskID = taskID; | ||
| } | ||
|
|
||
| public Long getCheckpointId() { |
There was a problem hiding this comment.
🤖 nit: could you change the return type to primitive long here? The underlying field is long and setCheckpointId also takes long, so returning boxed Long implies the value could be null when it never can be. Keeping it primitive stays consistent with getTaskID() and avoids any misleading nullable signal to callers.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
|
|
||
| private Thread t; | ||
|
|
||
| public ExplicitClassloaderThreadFactory( |
There was a problem hiding this comment.
🤖 nit: the single-character field name t makes the guard check if (t != null) in newThread() harder to parse at a glance — it's not immediately obvious what t represents. Renaming it to thread would make the intent self-explanatory.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
There was a problem hiding this comment.
🤖 Line 268: Could we log a WARN when this overrides a user-set policy? isBlockingInstantGeneration is false for MOR upserts without CDC (a very common case), so most users who explicitly configured EAGER will silently get LAZY here with no indication. A one-line WARN when the existing value differs from LAZY would make this much easier to debug.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Describe the issue this Pull Request addresses
Summary and Changelog
Impact
Risk Level
Documentation Update
Contributor's checklist