
feat(amber): Re-enable operator reconfiguration in Amber #4220

Merged
Yicong-Huang merged 23 commits into main from shengquan-add-reconfigration on Apr 25, 2026

Conversation

@shengquan-ni

@shengquan-ni shengquan-ni commented Feb 15, 2026

Contributor

What changes were proposed in this PR?

Per the discussion in #4016, we decided to bring the operator reconfiguration feature back to the Amber engine. This PR includes only the backend changes for this feature, but it is enabled on both the Java and Python sides.

Since the code for the Fries Algorithm is still in the codebase, this feature is relatively straightforward to implement and maintain going forward.

This PR allows source operators to be included in the reconfiguration scope (MCS), but it does not allow source operators themselves to be modified. First, under the current iterator-based interface, the state of a source operator is fully encapsulated within its iterator. Reading or manipulating the iterator state is already very difficult in both Scala and Python. Second, even if we could access the state, it would still be hard for users to clearly define the expected state transition semantics—e.g., whether to preserve the old state, reset it, or partially transfer it to the new operator.

Due to the reasons above, we disable reconfiguration of source operators for now. If clear use cases emerge in the future, we can revisit this design decision.
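
To illustrate the first point, here is a minimal Python sketch. It is not Texera's source-operator API: `csv_like_source`, `new_source_logic`, and the sample rows are hypothetical. A generator-based source keeps its read position inside the suspended frame, so a naive swap to new logic has no clean way to resume where the old one stopped.

```python
from typing import Iterator, List


def csv_like_source(rows: List[str]) -> Iterator[str]:
    """Hypothetical source: yields rows one at a time."""
    for row in rows:
        # The loop position lives inside the suspended generator frame.
        yield row.upper()


def new_source_logic(rows: List[str]) -> Iterator[str]:
    """Hypothetical replacement logic submitted mid-run."""
    for row in rows:
        yield row.lower()


rows = ["A1", "B2", "C3"]
old = csv_like_source(rows)
consumed = [next(old), next(old)]  # two rows already emitted downstream

# The old iterator exposes no public "position" to hand over, so a naive
# swap restarts from the first row and re-emits rows that were already sent.
replacement = new_source_logic(rows)
after_swap = list(replacement)
```

Whether the replacement should restart, continue from the third row, or do something else is exactly the state-transition question the description leaves open.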

Any related issues, documentation, discussions?

See #4016.

How was this PR tested?

Introduced unit tests for this feature.

This PR also updates the Scala CI to install Python dependencies, since our e2e tests use Python UDFs.

Was this PR authored or co-authored using generative AI tooling?

No

@chenlica chenlica requested a review from zuozhiw February 15, 2026 17:41
@chenlica

Contributor

Thanks, @shengquan-ni. Can we get rid of the restriction of "not including source operators"? This restriction could suggest some part of our design is not elegant. We can discuss the complexity of removing this restriction. If needed, we can schedule a call to discuss.

I am adding @zuozhiw so that he can review this PR as well.

@github-actions github-actions Bot added the ci (changes related to CI) label Feb 15, 2026
@shengquan-ni

Contributor Author

> Thanks, @shengquan-ni. Can we get rid of the restriction of "not including source operators"? This restriction could suggest some part of our design is not elegant. We can discuss the complexity of removing this restriction. If needed, we can schedule a call to discuss.
>
> I am adding @zuozhiw so that he can review this PR as well.

Earlier, I mentioned that the MCS could not include source operators due to certain ECM constraints. However, I have since identified a simple solution to address that issue, so this restriction has now been removed.

That said, reconfiguring source operators themselves is still not supported. I’ve updated the PR description to explain the reasoning behind this decision.

We can further discuss this if needed.

@chenlica

Contributor

Thank you, @shengquan-ni .

@Yicong-Huang @zuozhiw and @aglinxinyuan : can you chime in?

Copilot AI left a comment

Contributor


Pull request overview

Re-enables Amber’s operator reconfiguration (backend) by adding new RPCs and a Fries-based reconfiguration planner, plus e2e coverage to validate reconfiguration behavior across Java and Python operators while disallowing source-operator modification.

Changes:

  • Add controller/worker RPC support for ReconfigureWorkflow and UpdateExecutor, including Fries MCS component computation.
  • Implement controller-side orchestration of reconfiguration (direct update for single-op scope; ECM alignment for multi-op scope).
  • Add e2e tests covering Python UDF reconfiguration, Java operator reconfiguration, and source-operator constraints; update CI to install Python deps for e2e.
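
The second bullet can be pictured as a small dispatch decision. The following is a rough sketch only, assuming each Fries component is represented as a set of operator ids; `Plan`, `plan_component`, and the strategy labels are hypothetical names, not identifiers from the PR.

```python
from dataclasses import dataclass
from typing import FrozenSet, Set


@dataclass(frozen=True)
class Plan:
    strategy: str            # "direct" or "ecm-aligned" (labels invented here)
    targets: FrozenSet[str]  # operators whose executors get replaced


def plan_component(component: Set[str], sources: Set[str], to_update: Set[str]) -> Plan:
    """Pick how to apply updates for one reconfiguration component."""
    # Source operators may sit inside the scope but must not be modified.
    illegal = to_update & sources
    if illegal:
        raise ValueError(f"source operators cannot be reconfigured: {sorted(illegal)}")
    targets = frozenset(component & to_update)
    if len(component) == 1:
        # Single-operator scope: send the update straight to its workers.
        return Plan("direct", targets)
    # Multi-operator scope: align the update with an in-band marker (ECM).
    return Plan("ecm-aligned", targets)
```

For example, `plan_component({"src", "udf1", "udf2"}, {"src"}, {"udf1", "udf2"})` yields the aligned strategy with the source in scope but not among the targets.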

Reviewed changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| `common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala` | Adds Python source-UDF test operator and adjusts Python UDF settings used by tests. |
| `amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala` | New e2e suite exercising workflow reconfiguration scenarios. |
| `amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala` | Refactors Fries logic to output reconfiguration components for new request types. |
| `amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala` | New worker RPC handler to apply executor updates. |
| `amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala` | Refactors initialization to reuse shared executor setup logic. |
| `amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala` | Adds shared setupExecutor and wires in UpdateExecutorHandler. |
| `amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala` | New controller RPC handler implementing orchestration/alignment logic for reconfiguration. |
| `amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala` | Mixes in the new ReconfigurationHandler. |
| `amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py` | Regenerates Python RPC stubs/messages for new/updated services and requests. |
| `amber/src/main/python/core/runnables/network_receiver.py` | Adjusts is_control handling to stabilize hashing/queue behavior. |
| `amber/src/main/python/core/runnables/main_loop.py` | Extends control-draining loop to also handle ECM elements. |
| `amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py` | Wires in the Python UpdateExecutorHandler. |
| `amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py` | Implements Python-side executor update on UpdateExecutorRequest. |
| `amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto` | Adds UpdateExecutor RPC to WorkerService. |
| `amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto` | Adds ReconfigureWorkflow RPC to ControllerService. |
| `amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto` | Replaces ModifyLogicRequest with WorkflowReconfigureRequest and updates related messages. |
| `.github/workflows/github-action-build.yml` | Installs Python dependencies in CI to support Python UDF e2e tests. |


Comment thread amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala Outdated
@Yicong-Huang

Contributor

I asked Copilot to do a first round of review. @shengquan-ni, please check whether the comments are valid.

…LogicSpec.scala

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com>
@Yicong-Huang

Contributor

> Earlier, I mentioned that the MCS could not include source operators due to certain ECM constraints. However, I have since identified a simple solution to address that issue, so this restriction has now been removed.
>
> That said, reconfiguring source operators themselves is still not supported. I’ve updated the PR description to explain the reasoning behind this decision.

I am actually fine with excluding source operators. On the technical side, it might be hard: our source implementation is a process function over a single special kickoff tuple. The entire state of a source operator is therefore tied to that one tuple, which makes it hard to split up and transfer. On the use-case side, reconfiguring a source operator is less useful: if you need to change the source (e.g., read another CSV, or fetch from another API/DB), you likely need to rerun the entire workflow.


  message WorkflowReconfigureRequest{
- ModifyLogicRequest reconfiguration = 1 [(scalapb.field).no_box = true];
+ repeated UpdateExecutorRequest reconfiguration = 1;

Contributor


might be a good idea to update the name to match

@Yicong-Huang

Contributor

I will work on this PR directly in the next 1 or 2 days.

Comment thread .github/workflows/github-action-build.yml
Comment thread amber/src/main/python/core/runnables/network_receiver.py Outdated
Comment thread amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala Outdated
Comment thread amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala Outdated
- Rename ModifyLogicSpec to ReconfigurationSpec to match the renamed
  WorkflowReconfigureRequest / reconfigureWorkflow naming.
- Drop a redundant Thread.sleep(400) after resumeWorkflow; the
  subsequent Await on the completion promise is the proper sync point.
- Replace `if not is_control: is_control = False` with `bool(...)`
  cast so the lazy-set is unambiguous regardless of None/falsy inputs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
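
The last bullet of the commit message above concerns how a possibly-missing flag is normalized. A small sketch of the difference, using hypothetical helper names (`normalize_old`, `normalize_new`) rather than the actual code in `network_receiver.py`:

```python
def normalize_old(is_control):
    # Earlier form: falsy inputs (None, 0, "") collapse to False,
    # but truthy non-bool inputs such as 1 pass through unchanged.
    if not is_control:
        is_control = False
    return is_control


def normalize_new(is_control):
    # bool(...) always yields a real bool, whatever was passed in.
    return bool(is_control)
```

With the cast, every caller sees exactly `True` or `False`, so the value's type no longer depends on what the sender happened to pass.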
@Yicong-Huang Yicong-Huang merged commit e635dd0 into main Apr 25, 2026
11 checks passed
@Yicong-Huang Yicong-Huang deleted the shengquan-add-reconfigration branch April 25, 2026 23:30
@chenlica

Contributor

@Yicong-Huang and @shengquan-ni Great to merge this PR! Should we include the Fries paper in the description?

Yicong-Huang added a commit to Yicong-Huang/texera that referenced this pull request Apr 26, 2026
…ow completes

apache#4220 wired the engine path but never reported per-worker completion to the
client, so completedReconfigurations stayed empty, the diff handler in
ExecutionReconfigurationService never fired, and the frontend never received
ModifyLogicCompletedEvent — pause→modify→resume applied the new logic but the
UI never confirmed it.

ReconfigurationHandler now wraps each worker's updateExecutor future with
sendToClient(UpdateExecutorCompleted(worker)) on success (both single-op and
multi-op Fries components). ExecutionReconfigurationService re-enables the
client.registerCallback that records the worker into the reconfiguration
store, which is what the existing diff handler watches.

Wrapped the callback registration and the workflow-dependent diff handler in
protected seams so the new web-service unit test can construct the service
without a live AmberClient or Workflow. Added a test that drives the
completion path via onWorkerReconfigured and asserts the store updates with
Set semantics (idempotent on duplicate worker completion).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Yicong-Huang added a commit to Yicong-Huang/texera that referenced this pull request Apr 27, 2026
Follow-up to apache#4220, restoring the full reconfiguration flow end-to-end.
Three coupled gaps:

**1. Web-service entrypoint never dispatched.**
`ExecutionReconfigurationService.performReconfigurationOnResume` was
still throwing `"reconfiguration is tentatively disabled."` and the body
that calls `controllerInterface.reconfigureWorkflow` was commented out.
Restored, adapted to the current proto shape — `UpdateExecutorRequest`
now carries `(targetOpId, newExecInitInfo)` directly, no proto-Any
boxing — and resets `ExecutionReconfigurationStore` with a fresh
`currentReconfigId` after dispatch. `StateTransferFunc` is dropped
because the new request schema doesn't carry it.

**2. Engine never reported per-worker completion.**
`ReconfigurationHandler` collected the worker `updateExecutor` futures
but only returned `EmptyReturn` when they all finished — no
`UpdateExecutorCompleted(worker)` events were ever sent to the client.
Without those, `ExecutionReconfigurationService.completedReconfigurations`
stayed empty, the diff handler never fired, and the frontend never saw
`ModifyLogicCompletedEvent`. Each per-worker future is now wrapped with
`sendToClient(UpdateExecutorCompleted(worker))` in both Fries-component
branches, and the web-service callback is re-enabled. The multi-op
branch is also restructured so per-worker futures are added with `++=`
instead of the original `+=` that put an `Iterable` into the
`ArrayBuffer` as a single element.
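
The `+=` versus `++=` mix-up has a close Python analogue in `list.append` versus `list.extend`. The sketch below shows only that analogue (the PR's code is Scala); `worker_futures` is a hypothetical helper returning strings that stand in for futures.

```python
from typing import List


def worker_futures(op_id: str, workers: int) -> List[str]:
    """Stand-in for the per-worker futures of one operator."""
    return [f"{op_id}-worker-{i}" for i in range(workers)]


buggy: list = []
fixed: list = []
for op in ("udf1", "udf2"):
    buggy.append(worker_futures(op, 2))  # like `+=`: one nested element per operator
    fixed.extend(worker_futures(op, 2))  # like `++=`: one element per worker
```

Anything that later waits on the elements of `buggy` sees two nested collections rather than four futures, which is consistent with the futures being effectively ignored.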

**3. ECM-embedded ControlInvocation reply was misrouted.**
When a `ControlInvocation` travels in-band inside an ECM along a data
channel between two workers, the receiving worker was replying along
the (swapped) data channel — i.e. back to the upstream worker — rather
than to the actor that originated the call (the controller). The
upstream worker logged a "received unknown ControlReturn" warning and
dropped the reply, so the controller's `Future.collect` on the
per-worker futures never resolved. This was masked before because the
old multi-op `+=` bug effectively ignored those futures; with #2 above
fixing the collection, the routing bug surfaced and hung
`ReconfigurationSpec`'s source-propagation test for 2h41m before CI
killed it.

Both worker implementations now use `command.context.sender / .receiver`
— set at invocation time by `mkContext` — instead of the network
channel's from/to. For normal RPC over a control channel they're
equivalent, so the non-ECM path is unaffected.
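
The routing change can be sketched as choosing the reply target from the invocation context instead of from the channel the message arrived on. This is an illustrative model only: `Context`, `Channel`, and the two functions are hypothetical and do not mirror the engine's classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Context:
    sender: str    # actor that originated the call
    receiver: str  # actor expected to handle it


@dataclass(frozen=True)
class Channel:
    from_actor: str
    to_actor: str


def reply_target_by_channel(channel: Channel) -> str:
    # Old behaviour: answer along the swapped network channel.
    return channel.from_actor


def reply_target_by_context(ctx: Context) -> str:
    # New behaviour: answer the actor recorded when the call was made.
    return ctx.sender


# The controller invokes worker B, but the call rides inside an ECM from worker A to B.
ctx = Context(sender="CONTROLLER", receiver="worker-B")
ecm_channel = Channel(from_actor="worker-A", to_actor="worker-B")
control_channel = Channel(from_actor="CONTROLLER", to_actor="worker-B")
```

On `control_channel` both functions agree, which matches the claim that the non-ECM path is unaffected; on `ecm_channel` only the context-based version reaches the controller.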

ReconfigurationSpec's bare `Await.result(...)` calls also gain
1-minute timeouts so future hangs surface in a bounded window instead
of running out the GHA 6-hour limit.
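
For readers more familiar with Python, the same idea of bounding a blocking wait can be sketched with `concurrent.futures`. This is an analogue of the Scala `Await.result` change, not the test code itself; `await_with_timeout` is a hypothetical helper.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeout


def await_with_timeout(fut: Future, seconds: float) -> str:
    """Return the result, or a marker string if the future never completes."""
    try:
        return fut.result(timeout=seconds)
    except FutureTimeout:
        return "timed out"


never_done: Future = Future()  # nobody will ever complete this one
done: Future = Future()
done.set_result("completed")
```

A hung future then fails the test within the chosen window instead of blocking until the CI job limit.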

### Tests

`ExecutionReconfigurationServiceSpec` (new) covers:
- empty pending list → no dispatch, store unchanged;
- non-empty list → one dispatch carrying the right
  `(targetOpId, newExecInitInfo)` pairs, store reset with a fresh
  `currentReconfigId`;
- consecutive resumes get distinct `reconfigurationId`s;
- worker completion (`onWorkerReconfigured`) updates
  `completedReconfigurations` with Set semantics (idempotent on
  duplicates).

The test uses three protected seams (`dispatch`,
`registerWorkerCompletionCallback`, `registerCompletionDiffHandler`) so
the service can be constructed without a live `AmberClient` or
`Workflow`.

End-to-end engine path is covered by `ReconfigurationSpec` from apache#4220;
all five tests including the source-propagation case now pass locally
in ~1 minute.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Yicong-Huang added a commit that referenced this pull request Apr 27, 2026
…4531)

### What changes were proposed in this PR?

Follow-up to #4220, restoring the full reconfiguration flow end-to-end.
Two coupled gaps:

**1. Web-service entrypoint never dispatched.**
`ExecutionReconfigurationService.performReconfigurationOnResume` was
still throwing `"reconfiguration is tentatively disabled."` and the body
that calls `controllerInterface.reconfigureWorkflow` was commented out.
Restored, adapted to the current proto shape
(`UpdateExecutorRequest(targetOpId, newExecInitInfo)` — no more
proto-Any boxing). Resets `ExecutionReconfigurationStore` with a fresh
`currentReconfigId` after dispatch.

`StateTransferFunc` is dropped — the new request schema doesn't carry
it.

**2. Engine never reported per-worker completion.**
`ReconfigurationHandler` collected the worker `updateExecutor` futures
but only returned `EmptyReturn` when they all finished — no
`UpdateExecutorCompleted(worker)` events were ever sent to the client.
Without those,
`ExecutionReconfigurationService.completedReconfigurations` stayed
empty, the diff handler never fired, and the frontend never saw
`ModifyLogicCompletedEvent`. The `UpdateExecutorCompleted` case class
was effectively dead code.

Each per-worker future is now wrapped with
`sendToClient(UpdateExecutorCompleted(worker))` in both Fries-component
branches (single-op and multi-op). The web-service
`client.registerCallback[UpdateExecutorCompleted]` is re-enabled to
advance `completedReconfigurations` on receipt.
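
The per-worker wrapping described above can be sketched with Python futures. This is a simplified model under stated assumptions: `report_on_completion` and the tuple-shaped event are hypothetical stand-ins for the Scala `sendToClient(UpdateExecutorCompleted(worker))` call.

```python
from concurrent.futures import Future


def report_on_completion(worker: str, fut: Future, send_to_client) -> Future:
    """Attach a per-worker completion notification to an update future."""
    def _notify(done: Future) -> None:
        # Only report workers whose update actually succeeded.
        if done.exception() is None:
            send_to_client(("UpdateExecutorCompleted", worker))
    fut.add_done_callback(_notify)
    return fut


events: list = []
futures = {w: Future() for w in ("worker-0", "worker-1")}
for w, f in futures.items():
    report_on_completion(w, f, events.append)

futures["worker-0"].set_result(None)
futures["worker-1"].set_exception(RuntimeError("update failed"))
```

Only the successful worker produces an event, so a client-side store fed by these events advances one worker at a time.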

### Any related issues, documentation, discussions?

Follow-up to #4220. See discussion #4016.

### How was this PR tested?

`ExecutionReconfigurationServiceSpec` (new) covers:
- empty pending list → no dispatch, store unchanged;
- non-empty list → one dispatch carrying the right `(targetOpId,
newExecInitInfo)` pairs, store reset with a fresh `currentReconfigId`;
- consecutive resumes get distinct `reconfigurationId`s;
- worker completion (`onWorkerReconfigured`) updates
`completedReconfigurations` with Set semantics (idempotent on
duplicates).

The test uses three protected seams (`dispatch`,
`registerWorkerCompletionCallback`, `registerCompletionDiffHandler`) so
the service can be constructed without a live `AmberClient` or
`Workflow`.
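
A rough Python sketch of the seam pattern and the Set semantics the spec checks. The real service is Scala; the class, method, and id names here are hypothetical, and only two of the three seams are modelled.

```python
from typing import List, Optional, Set, Tuple


class ReconfigurationService:
    """Toy service with overridable hooks so tests need no live client."""

    def __init__(self) -> None:
        self.completed: Set[str] = set()
        self._next_id = 0
        self.current_reconfig_id: Optional[str] = None
        self._register_worker_completion_callback()

    # Seams: production code would talk to a real client here.
    def _dispatch(self, requests: List[Tuple[str, str]]) -> None:
        raise NotImplementedError("needs a live client")

    def _register_worker_completion_callback(self) -> None:
        raise NotImplementedError("needs a live client")

    # Logic under test.
    def perform_on_resume(self, pending: List[Tuple[str, str]]) -> None:
        if not pending:
            return  # nothing to send, store untouched
        self._dispatch(pending)
        self._next_id += 1
        self.current_reconfig_id = f"reconfig-{self._next_id}"
        self.completed = set()

    def on_worker_reconfigured(self, worker: str) -> None:
        self.completed.add(worker)  # set: duplicates are harmless


class TestableService(ReconfigurationService):
    def __init__(self) -> None:
        self.dispatched: list = []
        super().__init__()

    def _dispatch(self, requests: List[Tuple[str, str]]) -> None:
        self.dispatched.append(list(requests))

    def _register_worker_completion_callback(self) -> None:
        pass  # no client to register with in tests
```

Overriding the hooks in a subclass keeps the production logic untouched while letting a unit test record dispatches and drive completions by hand.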

End-to-end engine path is covered by `ReconfigurationSpec` from #4220;
the new `sendToClient` calls are no-ops when no callback is registered,
so existing assertions are unaffected.

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-7)

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Yicong-Huang added a commit that referenced this pull request Apr 28, 2026
…4546)

### What changes were proposed in this PR?

Marks `ReconfigurationSpec`'s `"Engine should propagate reconfiguration
through a source operator in workflow"` case as `ignore` to unblock CI.
The test consistently hangs at the 1-minute `Await` because the UDF
stops making progress after processing the `EndChannel` ECM in the
multi-worker (Python source -> Python UDF) propagation path.

The other four cases in the spec (single-op python UDF reconfigure, java
operator reconfigure, source-as-target rejection, two-UDF chain) still
run and pass.

A code comment is left at the test pointing at the symptom and the
condition for re-enabling.

### Any related issues, documentation, discussions?

Put out the fire in #4545. Investigation is in parallel.

Follow-up to #4220 (which added the test) and #4531 (which restored the
web-service entrypoint but did not fix this hang).

### How was this PR tested?

`sbt "WorkflowExecutionService/testOnly *ReconfigurationSpec"` locally —
4 cases pass, the 5th is now skipped (was previously timing out at 1
min).

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-7)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
yangzhang75 pushed a commit to yangzhang75/texera that referenced this pull request Jun 22, 2026
Signed-off-by: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com>
Signed-off-by: Shengquan Ni <13672781+shengquan-ni@users.noreply.github.com>
yangzhang75 pushed a commit to yangzhang75/texera that referenced this pull request Jun 22, 2026
…pache#4531)

yangzhang75 pushed a commit to yangzhang75/texera that referenced this pull request Jun 22, 2026
…pache#4546)


4 participants