feat(amber): Re-enable operator reconfiguration in Amber#4220
Conversation
|
Thanks, @shengquan-ni. Can we get rid of the restriction of "not including source operators"? This restriction could suggest some part of our design is not elegant. We can discuss the complexity of removing this restriction. If needed, we can schedule a call to discuss. I am adding @zuozhiw so that he can review this PR as well. |
Earlier, I mentioned that the MCS could not include source operators due to certain ECM constraints. However, I have since identified a simple solution to address that issue, so this restriction has now been removed. That said, reconfiguring source operators themselves is still not supported. I’ve updated the PR description to explain the reasoning behind this decision. We can further discuss this if needed. |
|
Thank you, @shengquan-ni . @Yicong-Huang @zuozhiw and @aglinxinyuan : can you chime in? |
There was a problem hiding this comment.
Pull request overview
Re-enables Amber’s operator reconfiguration (backend) by adding new RPCs and a Fries-based reconfiguration planner, plus e2e coverage to validate reconfiguration behavior across Java and Python operators while disallowing source-operator modification.
Changes:
- Add controller/worker RPC support for
ReconfigureWorkflowandUpdateExecutor, including Fries MCS component computation. - Implement controller-side orchestration of reconfiguration (direct update for single-op scope; ECM alignment for multi-op scope).
- Add e2e tests covering Python UDF reconfiguration, Java operator reconfiguration, and source-operator constraints; update CI to install Python deps for e2e.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala | Adds Python source-UDF test operator and adjusts Python UDF settings used by tests. |
| amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala | New e2e suite exercising workflow reconfiguration scenarios. |
| amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala | Refactors Fries logic to output reconfiguration components for new request types. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala | New worker RPC handler to apply executor updates. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala | Refactors initialization to reuse shared executor setup logic. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala | Adds shared setupExecutor and wires in UpdateExecutorHandler. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala | New controller RPC handler implementing orchestration/alignment logic for reconfiguration. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala | Mixes in the new ReconfigurationHandler. |
| amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/init.py | Regenerates Python RPC stubs/messages for new/updated services and requests. |
| amber/src/main/python/core/runnables/network_receiver.py | Adjusts is_control handling to stabilize hashing/queue behavior. |
| amber/src/main/python/core/runnables/main_loop.py | Extends control-draining loop to also handle ECM elements. |
| amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py | Wires in the Python UpdateExecutorHandler. |
| amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py | Implements Python-side executor update on UpdateExecutorRequest. |
| amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto | Adds UpdateExecutor RPC to WorkerService. |
| amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto | Adds ReconfigureWorkflow RPC to ControllerService. |
| amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto | Replaces ModifyLogicRequest with WorkflowReconfigureRequest and updates related messages. |
| .github/workflows/github-action-build.yml | Installs Python dependencies in CI to support Python UDF e2e tests. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
I asked Copilot to do a first round of review @shengquan-ni please see if comments are valid. |
…LogicSpec.scala Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com>
I am actually fine with excluding source operators. On the technical side it might be hard. Our implementation of source is a process function of one single special kick off tuple. This means the entire state of a source operator is associated with this single special tuple, which is hard to be broken and transferred. On the use case side, reconfigure a source operator is less useful: if you need to change the source (e.g., read another csv, fetch from another api/db), it's likely you need to rerun the entire workflow. |
|
|
||
| message WorkflowReconfigureRequest{ | ||
| ModifyLogicRequest reconfiguration = 1 [(scalapb.field).no_box = true]; | ||
| repeated UpdateExecutorRequest reconfiguration = 1; |
There was a problem hiding this comment.
might be a good idea to update the name to match
|
I will work on this PR directly in the next 1 or 2 days. |
- Rename ModifyLogicSpec to ReconfigurationSpec to match the renamed WorkflowReconfigureRequest / reconfigureWorkflow naming. - Drop a redundant Thread.sleep(400) after resumeWorkflow; the subsequent Await on the completion promise is the proper sync point. - Replace `if not is_control: is_control = False` with `bool(...)` cast so the lazy-set is unambiguous regardless of None/falsy inputs. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
@Yicong-Huang and @shengquan-ni Great to merge this PR! Should we include the Fries paper in the description? |
…ow completes apache#4220 wired the engine path but never reported per-worker completion to the client, so completedReconfigurations stayed empty, the diff handler in ExecutionReconfigurationService never fired, and the frontend never received ModifyLogicCompletedEvent — pause→modify→resume applied the new logic but the UI never confirmed it. ReconfigurationHandler now wraps each worker's updateExecutor future with sendToClient(UpdateExecutorCompleted(worker)) on success (both single-op and multi-op Fries components). ExecutionReconfigurationService re-enables the client.registerCallback that records the worker into the reconfiguration store, which is what the existing diff handler watches. Wrapped the callback registration and the workflow-dependent diff handler in protected seams so the new web-service unit test can construct the service without a live AmberClient or Workflow. Added a test that drives the completion path via onWorkerReconfigured and asserts the store updates with Set semantics (idempotent on duplicate worker completion). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Follow-up to apache#4220, restoring the full reconfiguration flow end-to-end. Three coupled gaps: **1. Web-service entrypoint never dispatched.** `ExecutionReconfigurationService.performReconfigurationOnResume` was still throwing `"reconfiguration is tentatively disabled."` and the body that calls `controllerInterface.reconfigureWorkflow` was commented out. Restored, adapted to the current proto shape — `UpdateExecutorRequest` now carries `(targetOpId, newExecInitInfo)` directly, no proto-Any boxing — and resets `ExecutionReconfigurationStore` with a fresh `currentReconfigId` after dispatch. `StateTransferFunc` is dropped because the new request schema doesn't carry it. **2. Engine never reported per-worker completion.** `ReconfigurationHandler` collected the worker `updateExecutor` futures but only returned `EmptyReturn` when they all finished — no `UpdateExecutorCompleted(worker)` events were ever sent to the client. Without those, `ExecutionReconfigurationService.completedReconfigurations` stayed empty, the diff handler never fired, and the frontend never saw `ModifyLogicCompletedEvent`. Each per-worker future is now wrapped with `sendToClient(UpdateExecutorCompleted(worker))` in both Fries-component branches, and the web-service callback is re-enabled. The multi-op branch is also restructured so per-worker futures are added with `++=` instead of the original `+=` that put an `Iterable` into the `ArrayBuffer` as a single element. **3. ECM-embedded ControlInvocation reply was misrouted.** When a `ControlInvocation` travels in-band inside an ECM along a data channel between two workers, the receiving worker was replying along the (swapped) data channel — i.e. back to the upstream worker — rather than to the actor that originated the call (the controller). The upstream worker logged a "received unknown ControlReturn" warning and dropped the reply, so the controller's `Future.collect` on the per-worker futures never resolved. This was masked before because the old multi-op `+=` bug effectively ignored those futures; with #2 above fixing the collection, the routing bug surfaced and hung `ReconfigurationSpec`'s source-propagation test for 2h41m before CI killed it. Both worker implementations now use `command.context.sender / .receiver` — set at invocation time by `mkContext` — instead of the network channel's from/to. For normal RPC over a control channel they're equivalent, so the non-ECM path is unaffected. ReconfigurationSpec's bare `Await.result(...)` calls also gain 1-minute timeouts so future hangs surface in a bounded window instead of running out the GHA 6-hour limit. ### Tests `ExecutionReconfigurationServiceSpec` (new) covers: - empty pending list → no dispatch, store unchanged; - non-empty list → one dispatch carrying the right `(targetOpId, newExecInitInfo)` pairs, store reset with a fresh `currentReconfigId`; - consecutive resumes get distinct `reconfigurationId`s; - worker completion (`onWorkerReconfigured`) updates `completedReconfigurations` with Set semantics (idempotent on duplicates). The test uses three protected seams (`dispatch`, `registerWorkerCompletionCallback`, `registerCompletionDiffHandler`) so the service can be constructed without a live `AmberClient` or `Workflow`. End-to-end engine path is covered by `ReconfigurationSpec` from apache#4220; all five tests including the source-propagation case now pass locally in ~1 minute. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Follow-up to apache#4220, restoring the full reconfiguration flow end-to-end. Three coupled gaps: **1. Web-service entrypoint never dispatched.** `ExecutionReconfigurationService.performReconfigurationOnResume` was still throwing `"reconfiguration is tentatively disabled."` and the body that calls `controllerInterface.reconfigureWorkflow` was commented out. Restored, adapted to the current proto shape — `UpdateExecutorRequest` now carries `(targetOpId, newExecInitInfo)` directly, no proto-Any boxing — and resets `ExecutionReconfigurationStore` with a fresh `currentReconfigId` after dispatch. `StateTransferFunc` is dropped because the new request schema doesn't carry it. **2. Engine never reported per-worker completion.** `ReconfigurationHandler` collected the worker `updateExecutor` futures but only returned `EmptyReturn` when they all finished — no `UpdateExecutorCompleted(worker)` events were ever sent to the client. Without those, `ExecutionReconfigurationService.completedReconfigurations` stayed empty, the diff handler never fired, and the frontend never saw `ModifyLogicCompletedEvent`. Each per-worker future is now wrapped with `sendToClient(UpdateExecutorCompleted(worker))` in both Fries-component branches, and the web-service callback is re-enabled. The multi-op branch is also restructured so per-worker futures are added with `++=` instead of the original `+=` that put an `Iterable` into the `ArrayBuffer` as a single element. **3. ECM-embedded ControlInvocation reply was misrouted.** When a `ControlInvocation` travels in-band inside an ECM along a data channel between two workers, the receiving worker was replying along the (swapped) data channel — i.e. back to the upstream worker — rather than to the actor that originated the call (the controller). The upstream worker logged a "received unknown ControlReturn" warning and dropped the reply, so the controller's `Future.collect` on the per-worker futures never resolved. This was masked before because the old multi-op `+=` bug effectively ignored those futures; with #2 above fixing the collection, the routing bug surfaced and hung `ReconfigurationSpec`'s source-propagation test for 2h41m before CI killed it. Both worker implementations now use `command.context.sender / .receiver` — set at invocation time by `mkContext` — instead of the network channel's from/to. For normal RPC over a control channel they're equivalent, so the non-ECM path is unaffected. ReconfigurationSpec's bare `Await.result(...)` calls also gain 1-minute timeouts so future hangs surface in a bounded window instead of running out the GHA 6-hour limit. ### Tests `ExecutionReconfigurationServiceSpec` (new) covers: - empty pending list → no dispatch, store unchanged; - non-empty list → one dispatch carrying the right `(targetOpId, newExecInitInfo)` pairs, store reset with a fresh `currentReconfigId`; - consecutive resumes get distinct `reconfigurationId`s; - worker completion (`onWorkerReconfigured`) updates `completedReconfigurations` with Set semantics (idempotent on duplicates). The test uses three protected seams (`dispatch`, `registerWorkerCompletionCallback`, `registerCompletionDiffHandler`) so the service can be constructed without a live `AmberClient` or `Workflow`. End-to-end engine path is covered by `ReconfigurationSpec` from apache#4220; all five tests including the source-propagation case now pass locally in ~1 minute. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Follow-up to apache#4220, restoring the full reconfiguration flow end-to-end. Three coupled gaps: **1. Web-service entrypoint never dispatched.** `ExecutionReconfigurationService.performReconfigurationOnResume` was still throwing `"reconfiguration is tentatively disabled."` and the body that calls `controllerInterface.reconfigureWorkflow` was commented out. Restored, adapted to the current proto shape — `UpdateExecutorRequest` now carries `(targetOpId, newExecInitInfo)` directly, no proto-Any boxing — and resets `ExecutionReconfigurationStore` with a fresh `currentReconfigId` after dispatch. `StateTransferFunc` is dropped because the new request schema doesn't carry it. **2. Engine never reported per-worker completion.** `ReconfigurationHandler` collected the worker `updateExecutor` futures but only returned `EmptyReturn` when they all finished — no `UpdateExecutorCompleted(worker)` events were ever sent to the client. Without those, `ExecutionReconfigurationService.completedReconfigurations` stayed empty, the diff handler never fired, and the frontend never saw `ModifyLogicCompletedEvent`. Each per-worker future is now wrapped with `sendToClient(UpdateExecutorCompleted(worker))` in both Fries-component branches, and the web-service callback is re-enabled. The multi-op branch is also restructured so per-worker futures are added with `++=` instead of the original `+=` that put an `Iterable` into the `ArrayBuffer` as a single element. **3. ECM-embedded ControlInvocation reply was misrouted.** When a `ControlInvocation` travels in-band inside an ECM along a data channel between two workers, the receiving worker was replying along the (swapped) data channel — i.e. back to the upstream worker — rather than to the actor that originated the call (the controller). The upstream worker logged a "received unknown ControlReturn" warning and dropped the reply, so the controller's `Future.collect` on the per-worker futures never resolved. This was masked before because the old multi-op `+=` bug effectively ignored those futures; with #2 above fixing the collection, the routing bug surfaced and hung `ReconfigurationSpec`'s source-propagation test for 2h41m before CI killed it. Both worker implementations now use `command.context.sender / .receiver` — set at invocation time by `mkContext` — instead of the network channel's from/to. For normal RPC over a control channel they're equivalent, so the non-ECM path is unaffected. ReconfigurationSpec's bare `Await.result(...)` calls also gain 1-minute timeouts so future hangs surface in a bounded window instead of running out the GHA 6-hour limit. ### Tests `ExecutionReconfigurationServiceSpec` (new) covers: - empty pending list → no dispatch, store unchanged; - non-empty list → one dispatch carrying the right `(targetOpId, newExecInitInfo)` pairs, store reset with a fresh `currentReconfigId`; - consecutive resumes get distinct `reconfigurationId`s; - worker completion (`onWorkerReconfigured`) updates `completedReconfigurations` with Set semantics (idempotent on duplicates). The test uses three protected seams (`dispatch`, `registerWorkerCompletionCallback`, `registerCompletionDiffHandler`) so the service can be constructed without a live `AmberClient` or `Workflow`. End-to-end engine path is covered by `ReconfigurationSpec` from apache#4220; all five tests including the source-propagation case now pass locally in ~1 minute. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…4531) ### What changes were proposed in this PR? Follow-up to #4220, restoring the full reconfiguration flow end-to-end. Two coupled gaps: **1. Web-service entrypoint never dispatched.** `ExecutionReconfigurationService.performReconfigurationOnResume` was still throwing `"reconfiguration is tentatively disabled."` and the body that calls `controllerInterface.reconfigureWorkflow` was commented out. Restored, adapted to the current proto shape (`UpdateExecutorRequest(targetOpId, newExecInitInfo)` — no more proto-Any boxing). Resets `ExecutionReconfigurationStore` with a fresh `currentReconfigId` after dispatch. `StateTransferFunc` is dropped — the new request schema doesn't carry it. **2. Engine never reported per-worker completion.** `ReconfigurationHandler` collected the worker `updateExecutor` futures but only returned `EmptyReturn` when they all finished — no `UpdateExecutorCompleted(worker)` events were ever sent to the client. Without those, `ExecutionReconfigurationService.completedReconfigurations` stayed empty, the diff handler never fired, and the frontend never saw `ModifyLogicCompletedEvent`. The `UpdateExecutorCompleted` case class was effectively dead code. Each per-worker future is now wrapped with `sendToClient(UpdateExecutorCompleted(worker))` in both Fries-component branches (single-op and multi-op). The web-service `client.registerCallback[UpdateExecutorCompleted]` is re-enabled to advance `completedReconfigurations` on receipt. ### Any related issues, documentation, discussions? Follow-up to #4220. See discussion #4016. ### How was this PR tested? `ExecutionReconfigurationServiceSpec` (new) covers: - empty pending list → no dispatch, store unchanged; - non-empty list → one dispatch carrying the right `(targetOpId, newExecInitInfo)` pairs, store reset with a fresh `currentReconfigId`; - consecutive resumes get distinct `reconfigurationId`s; - worker completion (`onWorkerReconfigured`) updates `completedReconfigurations` with Set semantics (idempotent on duplicates). The test uses three protected seams (`dispatch`, `registerWorkerCompletionCallback`, `registerCompletionDiffHandler`) so the service can be constructed without a live `AmberClient` or `Workflow`. End-to-end engine path is covered by `ReconfigurationSpec` from #4220; the new `sendToClient` calls are no-ops when no callback is registered, so existing assertions are unaffected. ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (claude-opus-4-7) --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Follow-up to apache#4220, restoring the full reconfiguration flow end-to-end. Three coupled gaps: **1. Web-service entrypoint never dispatched.** `ExecutionReconfigurationService.performReconfigurationOnResume` was still throwing `"reconfiguration is tentatively disabled."` and the body that calls `controllerInterface.reconfigureWorkflow` was commented out. Restored, adapted to the current proto shape — `UpdateExecutorRequest` now carries `(targetOpId, newExecInitInfo)` directly, no proto-Any boxing — and resets `ExecutionReconfigurationStore` with a fresh `currentReconfigId` after dispatch. `StateTransferFunc` is dropped because the new request schema doesn't carry it. **2. Engine never reported per-worker completion.** `ReconfigurationHandler` collected the worker `updateExecutor` futures but only returned `EmptyReturn` when they all finished — no `UpdateExecutorCompleted(worker)` events were ever sent to the client. Without those, `ExecutionReconfigurationService.completedReconfigurations` stayed empty, the diff handler never fired, and the frontend never saw `ModifyLogicCompletedEvent`. Each per-worker future is now wrapped with `sendToClient(UpdateExecutorCompleted(worker))` in both Fries-component branches, and the web-service callback is re-enabled. The multi-op branch is also restructured so per-worker futures are added with `++=` instead of the original `+=` that put an `Iterable` into the `ArrayBuffer` as a single element. **3. ECM-embedded ControlInvocation reply was misrouted.** When a `ControlInvocation` travels in-band inside an ECM along a data channel between two workers, the receiving worker was replying along the (swapped) data channel — i.e. back to the upstream worker — rather than to the actor that originated the call (the controller). The upstream worker logged a "received unknown ControlReturn" warning and dropped the reply, so the controller's `Future.collect` on the per-worker futures never resolved. This was masked before because the old multi-op `+=` bug effectively ignored those futures; with #2 above fixing the collection, the routing bug surfaced and hung `ReconfigurationSpec`'s source-propagation test for 2h41m before CI killed it. Both worker implementations now use `command.context.sender / .receiver` — set at invocation time by `mkContext` — instead of the network channel's from/to. For normal RPC over a control channel they're equivalent, so the non-ECM path is unaffected. ReconfigurationSpec's bare `Await.result(...)` calls also gain 1-minute timeouts so future hangs surface in a bounded window instead of running out the GHA 6-hour limit. ### Tests `ExecutionReconfigurationServiceSpec` (new) covers: - empty pending list → no dispatch, store unchanged; - non-empty list → one dispatch carrying the right `(targetOpId, newExecInitInfo)` pairs, store reset with a fresh `currentReconfigId`; - consecutive resumes get distinct `reconfigurationId`s; - worker completion (`onWorkerReconfigured`) updates `completedReconfigurations` with Set semantics (idempotent on duplicates). The test uses three protected seams (`dispatch`, `registerWorkerCompletionCallback`, `registerCompletionDiffHandler`) so the service can be constructed without a live `AmberClient` or `Workflow`. End-to-end engine path is covered by `ReconfigurationSpec` from apache#4220; all five tests including the source-propagation case now pass locally in ~1 minute. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…4546) ### What changes were proposed in this PR? Marks `ReconfigurationSpec`'s `"Engine should propagate reconfiguration through a source operator in workflow"` case as `ignore` to unblock CI. The test consistently hangs at the 1-minute `Await` because the UDF stops making progress after processing the `EndChannel` ECM in the multi-worker (Python source -> Python UDF) propagation path. The other four cases in the spec (single-op python UDF reconfigure, java operator reconfigure, source-as-target rejection, two-UDF chain) still run and pass. A code comment is left at the test pointing at the symptom and the condition for re-enabling. ### Any related issues, documentation, discussions? Put out the fire in #4545. Investigation is in parallel. Follow-up to #4220 (which added the test) and #4531 (which restored the web-service entrypoint but did not fix this hang). ### How was this PR tested? `sbt "WorkflowExecutionService/testOnly *ReconfigurationSpec"` locally — 4 cases pass, the 5th is now skipped (was previously timing out at 1 min). ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (claude-opus-4-7) Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
### What changes were proposed in this PR? Per the discussion in apache#4016, we decided to bring the operator reconfiguration feature back to the Amber engine. This PR includes only the backend changes for this feature, but it is enabled on both the Java and Python sides. Since the code for the Fries Algorithm is still in the codebase, this feature is relatively straightforward to implement and maintain going forward. This PR allows source operators to be included in the reconfiguration scope (MCS), but it does not allow source operators themselves to be modified. First, under the current iterator-based interface, the state of a source operator is fully encapsulated within its iterator. Reading or manipulating the iterator state is already very difficult in both Scala and Python. Second, even if we could access the state, it would still be hard for users to clearly define the expected state transition semantics—e.g., whether to preserve the old state, reset it, or partially transfer it to the new operator. Due to the reasons above, we disable reconfiguration of source operators for now. If clear use cases emerge in the future, we can revisit this design decision. ### Any related issues, documentation, discussions? See apache#4016. ### How was this PR tested? Introduced unit tests for this feature. This PR also updates scala CI to install python dependencies as we are using Python UDFs in our e2e tests. ### Was this PR authored or co-authored using generative AI tooling? No --------- Signed-off-by: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Signed-off-by: Shengquan Ni <13672781+shengquan-ni@users.noreply.github.com>
…pache#4531) ### What changes were proposed in this PR? Follow-up to apache#4220, restoring the full reconfiguration flow end-to-end. Two coupled gaps: **1. Web-service entrypoint never dispatched.** `ExecutionReconfigurationService.performReconfigurationOnResume` was still throwing `"reconfiguration is tentatively disabled."` and the body that calls `controllerInterface.reconfigureWorkflow` was commented out. Restored, adapted to the current proto shape (`UpdateExecutorRequest(targetOpId, newExecInitInfo)` — no more proto-Any boxing). Resets `ExecutionReconfigurationStore` with a fresh `currentReconfigId` after dispatch. `StateTransferFunc` is dropped — the new request schema doesn't carry it. **2. Engine never reported per-worker completion.** `ReconfigurationHandler` collected the worker `updateExecutor` futures but only returned `EmptyReturn` when they all finished — no `UpdateExecutorCompleted(worker)` events were ever sent to the client. Without those, `ExecutionReconfigurationService.completedReconfigurations` stayed empty, the diff handler never fired, and the frontend never saw `ModifyLogicCompletedEvent`. The `UpdateExecutorCompleted` case class was effectively dead code. Each per-worker future is now wrapped with `sendToClient(UpdateExecutorCompleted(worker))` in both Fries-component branches (single-op and multi-op). The web-service `client.registerCallback[UpdateExecutorCompleted]` is re-enabled to advance `completedReconfigurations` on receipt. ### Any related issues, documentation, discussions? Follow-up to apache#4220. See discussion apache#4016. ### How was this PR tested? `ExecutionReconfigurationServiceSpec` (new) covers: - empty pending list → no dispatch, store unchanged; - non-empty list → one dispatch carrying the right `(targetOpId, newExecInitInfo)` pairs, store reset with a fresh `currentReconfigId`; - consecutive resumes get distinct `reconfigurationId`s; - worker completion (`onWorkerReconfigured`) updates `completedReconfigurations` with Set semantics (idempotent on duplicates). The test uses three protected seams (`dispatch`, `registerWorkerCompletionCallback`, `registerCompletionDiffHandler`) so the service can be constructed without a live `AmberClient` or `Workflow`. End-to-end engine path is covered by `ReconfigurationSpec` from apache#4220; the new `sendToClient` calls are no-ops when no callback is registered, so existing assertions are unaffected. ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (claude-opus-4-7) --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…pache#4546) ### What changes were proposed in this PR? Marks `ReconfigurationSpec`'s `"Engine should propagate reconfiguration through a source operator in workflow"` case as `ignore` to unblock CI. The test consistently hangs at the 1-minute `Await` because the UDF stops making progress after processing the `EndChannel` ECM in the multi-worker (Python source -> Python UDF) propagation path. The other four cases in the spec (single-op python UDF reconfigure, java operator reconfigure, source-as-target rejection, two-UDF chain) still run and pass. A code comment is left at the test pointing at the symptom and the condition for re-enabling. ### Any related issues, documentation, discussions? Put out the fire in apache#4545. Investigation is in parallel. Follow-up to apache#4220 (which added the test) and apache#4531 (which restored the web-service entrypoint but did not fix this hang). ### How was this PR tested? `sbt "WorkflowExecutionService/testOnly *ReconfigurationSpec"` locally — 4 cases pass, the 5th is now skipped (was previously timing out at 1 min). ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (claude-opus-4-7) Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
What changes were proposed in this PR?
Per the discussion in #4016, we decided to bring the operator reconfiguration feature back to the Amber engine. This PR includes only the backend changes for this feature, but it is enabled on both the Java and Python sides.
Since the code for the Fries Algorithm is still in the codebase, this feature is relatively straightforward to implement and maintain going forward.
This PR allows source operators to be included in the reconfiguration scope (MCS), but it does not allow source operators themselves to be modified. First, under the current iterator-based interface, the state of a source operator is fully encapsulated within its iterator. Reading or manipulating the iterator state is already very difficult in both Scala and Python. Second, even if we could access the state, it would still be hard for users to clearly define the expected state transition semantics—e.g., whether to preserve the old state, reset it, or partially transfer it to the new operator.
Due to the reasons above, we disable reconfiguration of source operators for now. If clear use cases emerge in the future, we can revisit this design decision.
Any related issues, documentation, discussions?
See #4016.
How was this PR tested?
Introduced unit tests for this feature.
This PR also updates scala CI to install python dependencies as we are using Python UDFs in our e2e tests.
Was this PR authored or co-authored using generative AI tooling?
No