
feat: [DNM] Async instant generation fixes over 0.14.1 #18767

Open
danny0405 wants to merge 4 commits into apache:release-0.14.1 from danny0405:uber-fixes


@danny0405
Contributor

Describe the issue this Pull Request addresses

Summary and Changelog

Impact

Risk Level

Documentation Update

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

danny0405 and others added 4 commits May 18, 2026 13:59

The new instant time generation uses RPC requests to coordinate the creation of new instants.
Each write task sends an RPC request to the coordinator for the instant time; the coordinator
uses a global lock to guard access from multiple tasks.

Now one checkpoint id corresponds to one instant.

Basic workflow:

* write task sends request: current ckp id
* write task expects response: the instant time
* coordinator mapping of checkpoint to instant: ckp-id → {instant → {write-task-id → write meta event}}

Note that the ckp id used in the request is the last known id rather than the current checkpoint: if a task is restored
from a state of the current job, it is the state checkpoint id; otherwise it is -1 for a fresh job.

(cherry picked from commit e34a7ab)
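
For readers following the flow described in the commit message, here is a minimal sketch of the coordinator side, under stated assumptions. The class name `InstantCoordinatorSketch`, its methods, the `instant-N` naming, and the use of `String` for the write meta event are all hypothetical and are not taken from the Hudi code base. It only illustrates a global lock guarding a ckp-id → {instant → {write-task-id → write meta event}} mapping, with one instant per checkpoint id.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of the coordinator; not the actual Hudi implementation. */
final class InstantCoordinatorSketch {
  // Global lock guarding access from multiple write tasks.
  private final ReentrantLock lock = new ReentrantLock();
  // ckp-id -> instant time (one checkpoint id corresponds to one instant).
  private final Map<Long, String> instantByCkp = new HashMap<>();
  // ckp-id -> {instant -> {write-task-id -> write meta event}}
  private final Map<Long, Map<String, Map<Integer, String>>> events = new HashMap<>();
  // Placeholder instant generator (assumption); real instants are timestamps.
  private long nextInstant = 1;

  /** Handles a task request: returns the instant for the given last known ckp id (-1 for a fresh job). */
  String requestInstant(long lastKnownCkpId) {
    lock.lock();
    try {
      String instant = instantByCkp.get(lastKnownCkpId);
      if (instant == null) {
        // First request for this checkpoint id: create the instant once.
        instant = "instant-" + nextInstant++;
        instantByCkp.put(lastKnownCkpId, instant);
      }
      return instant;
    } finally {
      lock.unlock();
    }
  }

  /** Buffers a write meta event from one task under the instant of its checkpoint. */
  void bufferEvent(long ckpId, int taskId, String writeMetaEvent) {
    lock.lock();
    try {
      String instant = instantByCkp.get(ckpId);
      if (instant == null) {
        throw new IllegalStateException("No instant requested for checkpoint " + ckpId);
      }
      events.computeIfAbsent(ckpId, k -> new HashMap<>())
          .computeIfAbsent(instant, k -> new HashMap<>())
          .put(taskId, writeMetaEvent);
    } finally {
      lock.unlock();
    }
  }

  /** Returns how many task events are buffered for a checkpoint id. */
  int bufferedEventCount(long ckpId) {
    lock.lock();
    try {
      Map<String, Map<Integer, String>> byInstant = events.get(ckpId);
      if (byInstant == null) {
        return 0;
      }
      int count = 0;
      for (Map<Integer, String> perTask : byInstant.values()) {
        count += perTask.size();
      }
      return count;
    } finally {
      lock.unlock();
    }
  }
}
```

In this sketch, two tasks that send the same ckp id receive the same instant, and a later ckp id yields a new one, which matches the "one checkpoint id corresponds to one instant" statement above.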
@hudi-bot
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@nsivabalan changed the title from "feat: [DNM] Uber fixes" to "feat: [DNM] Async instant generation fixes over 0.14.1" on May 18, 2026
@hudi-agent (Contributor) left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR (chunk 2 of 2) is part of the async instant generation overhaul: it replaces the bootstrap-event-driven instant initialization with an explicit request/response pattern (Correspondent) and reworks the related test harness around per-checkpoint instant requests. The production code surface is small here (thread-factory plumbing, a forced LAZY cleaning policy under non-blocking instant generation, and removing the fixed parallelism on the dummy sink); most of the diff is test-harness restructuring around the new flow.

One thing worth a look in the inline comments is the silent policy override in FlinkWriteClients. There are also a few small nits below: a dead local variable, a copy-paste doc error, and some naming and type-consistency issues. Overall the refactoring is clean and the new abstractions (Correspondent, EventBuffers, CommitGuard) are well-structured. Please take a look; after that, this should be ready for a Hudi committer or PMC member to take it from here.

// open the function and ingest data
this.pipeline.openFunction();
this.ckpMetadata = CkpMetadata.getInstance(conf);
HoodieWriteConfig writeConfig = this.pipeline.getCoordinator().getWriteClient().getConfig();

🤖 nit: writeConfig is declared but never used — it looks like dead code left over from replacing the ckpMetadata assignment. Could you either wire it into the rest of the method or remove it?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

/**
* Asserts the data files are empty.
*/
public TestHarness assertEmptyBaseFiles() {

🤖 nit: the Javadoc (line 503) is a copy-paste from assertEmptyDataFiles: it says "data files" instead of "base files". The assertion message on the next line has the same copy-paste issue ("No data files should have been created" → "No base files should have been created").

- AI-generated; verify before applying. React 👍/👎 to flag quality.

this.taskID = taskID;
}

public Long getCheckpointId() {

🤖 nit: could you change the return type to primitive long here? The underlying field is long and setCheckpointId also takes long, so returning boxed Long implies the value could be null when it never can be. Keeping it primitive stays consistent with getTaskID() and avoids any misleading nullable signal to callers.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
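
To make the suggestion above concrete, here is a small sketch of what the accessor could look like with a primitive return type. The class name `CheckpointEventSketch` and the `int` type of `taskID` are assumptions for illustration; only the `long` field, `setCheckpointId(long)`, and `getTaskID()` are mentioned in the comment.

```java
/** Hypothetical event class; only the members named in the review comment are modeled. */
final class CheckpointEventSketch {
  private final int taskID;   // assumed to be an int
  private long checkpointId;  // primitive field, so it can never be null

  CheckpointEventSketch(int taskID, long checkpointId) {
    this.taskID = taskID;
    this.checkpointId = checkpointId;
  }

  int getTaskID() {
    return taskID;
  }

  // Primitive return type: matches the field and the setter, and avoids
  // signalling to callers that the value might be null.
  long getCheckpointId() {
    return checkpointId;
  }

  void setCheckpointId(long checkpointId) {
    this.checkpointId = checkpointId;
  }
}
```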


private Thread t;

public ExplicitClassloaderThreadFactory(

🤖 nit: the single-character field name t makes the guard check if (t != null) in newThread() harder to parse at a glance — it's not immediately obvious what t represents. Renaming it to thread would make the intent self-explanatory.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
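
As an illustration of the rename, here is a sketch of a single-thread factory with the field called `thread`. The class name, constructor parameters, and the choice to throw `IllegalStateException` when a second thread is requested are assumptions; the diff context above only shows the field and the start of the constructor, so the real guard may behave differently.

```java
import java.util.concurrent.ThreadFactory;

/** Hypothetical sketch; the real ExplicitClassloaderThreadFactory may differ. */
final class SingleThreadClassloaderFactorySketch implements ThreadFactory {
  private final String threadName;
  private final ClassLoader classLoader;
  // Renamed from `t`: the one thread this factory has handed out, or null.
  private Thread thread;

  SingleThreadClassloaderFactorySketch(String threadName, ClassLoader classLoader) {
    this.threadName = threadName;
    this.classLoader = classLoader;
  }

  @Override
  public synchronized Thread newThread(Runnable runnable) {
    if (thread != null) {
      // Assumed behavior: the factory backs a single-thread executor, so a
      // second request means the first thread died unexpectedly.
      throw new IllegalStateException("Thread '" + threadName + "' was already created");
    }
    thread = new Thread(runnable, threadName);
    thread.setContextClassLoader(classLoader);
    return thread;
  }
}
```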


🤖 Line 268: Could we log a WARN when this overrides a user-set policy? isBlockingInstantGeneration is false for MOR upserts without CDC (a very common case), so most users who explicitly configured EAGER will silently get LAZY here with no indication. A one-line WARN when the existing value differs from LAZY would make this much easier to debug.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
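
A minimal sketch of the suggested warning, under stated assumptions: the `CleaningPolicy` enum, the `resolve` method, and the use of `java.util.logging` are hypothetical stand-ins so that the example is self-contained. They are not the actual Hudi configuration types or the logging framework used in FlinkWriteClients.

```java
import java.util.logging.Logger;

/** Hypothetical sketch of warning on a silent policy override. */
final class CleaningPolicyOverrideSketch {
  /** Stand-in enum (assumption); models only the two values named in the comment. */
  enum CleaningPolicy { EAGER, LAZY }

  private static final Logger LOG =
      Logger.getLogger(CleaningPolicyOverrideSketch.class.getName());

  /**
   * Returns the policy that takes effect. Under non-blocking instant
   * generation the policy is forced to LAZY, and a warning is logged when
   * that differs from the configured value.
   */
  static CleaningPolicy resolve(CleaningPolicy configured, boolean blockingInstantGeneration) {
    if (blockingInstantGeneration) {
      return configured;
    }
    if (configured != CleaningPolicy.LAZY) {
      LOG.warning("Cleaning policy " + configured
          + " is overridden to LAZY because instant generation is non-blocking");
    }
    return CleaningPolicy.LAZY;
  }
}
```

With this shape, a user who set EAGER and runs with non-blocking instant generation gets one log line explaining why LAZY is in effect, while users already on LAZY see nothing.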
