Worker-side IOperatorExecutor Creation#1772

Closed
shengquan-ni wants to merge 9 commits into master from shengquan-operator-remote-creation

Conversation

@shengquan-ni shengquan-ni commented Dec 13, 2022

Contributor

This PR fixes #1005.

  1. For native operators, this PR moves operator creation to the worker side by changing the parameter from IOperatorExecutor to a lambda function () => IOperatorExecutor, so that only the lambda function is serialized.
  2. For Python operators, this PR makes PythonWorkflowWorker initialize the operator directly by sending InitializeOperatorLogic to the Python side. Previously, the controller sent this control message before starting the workflow, and its payload was retrieved from the IOperatorExecutor. Since creation now moves to the worker side, this logic needs to be updated.
  3. Previously, we also used IOperatorExecutor on the controller side to determine whether a worker is a Python worker. This PR changes that by passing the Python operator information from the logical plan, so we no longer rely on IOperatorExecutor.
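
A minimal sketch of the idea in item 1, under stated assumptions: `OperatorExecutor`, `UpperCaseExecutor`, and `Worker` are hypothetical stand-ins, not the actual Texera classes, and the factory takes a worker index only to mirror the `opFn(workerIdx)` call that appears in the diff. The point it illustrates is that the controller side holds only a function value, and the executor instance is constructed when the worker invokes it.

```scala
object WorkerSideCreationSketch {
  // Hypothetical stand-in for IOperatorExecutor.
  trait OperatorExecutor {
    def processTuple(t: String): String
  }

  // Counts how many executor instances have been constructed so far.
  var constructed: Int = 0

  class UpperCaseExecutor extends OperatorExecutor {
    constructed += 1
    def processTuple(t: String): String = t.toUpperCase
  }

  // Hypothetical worker: it receives a factory, not an executor instance.
  class Worker(workerIdx: Int, opFn: Int => OperatorExecutor) {
    // The executor is created here, on the worker side.
    val operatorExecutor: OperatorExecutor = opFn(workerIdx)
  }

  def main(args: Array[String]): Unit = {
    // "Controller side": only the function value exists at this point.
    val opFn: Int => OperatorExecutor = _ => new UpperCaseExecutor
    println(s"constructed before worker starts: $constructed")
    val worker = new Worker(0, opFn)
    println(s"constructed after worker starts: $constructed")
    println(worker.operatorExecutor.processTuple("hello"))
  }
}
```

Whether the function value itself serializes cleanly depends on what it captures, which this sketch does not address.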

Yicong-Huang commented Dec 13, 2022

Contributor

@shengquan-ni I see many changes related to initializing Python workers. Can you elaborate in the PR description?

@Yicong-Huang Yicong-Huang left a comment

Contributor

It is not very clear to me what the new life cycle of an operator exec is. My understanding is that an operator would be 1) initialized with a function/lambda/Python code, 2) opened, 3) used to process tuples, 4) notified of end of stream (on finish), and 5) closed. An operator may also be updated during execution (i.e., UpdateOperatorLogic for Python, and potentially also for Java).

Can you elaborate in the PR description and code comments on how these life cycle stages are managed in the new design? For example, when an operator is initialized with a function, it is not clear how the operator can later update that function.
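
To make the stages listed above concrete, here is a rough sketch of them as an explicit state machine. Everything in it (`ManagedOperator`, the `State` objects, `updateLogic`) is a hypothetical illustration, not code from this PR; it only shows one way an operator initialized from a function could still swap its logic while open.

```scala
object OperatorLifeCycleSketch {
  sealed trait State
  case object Created extends State
  case object Initialized extends State
  case object Opened extends State
  case object Finished extends State
  case object Closed extends State

  // Hypothetical wrapper that enforces the order of the life cycle stages.
  class ManagedOperator(initialLogic: String => String) {
    private var logic: String => String = identity
    private var _state: State = Created
    def state: State = _state

    def initialize(): Unit = {
      require(_state == Created, s"cannot initialize in state ${_state}")
      logic = initialLogic
      _state = Initialized
    }

    def open(): Unit = {
      require(_state == Initialized, s"cannot open in state ${_state}")
      _state = Opened
    }

    def processTuple(t: String): String = {
      require(_state == Opened, s"cannot process in state ${_state}")
      logic(t)
    }

    // Replaces the logic mid-execution, similar in spirit to UpdateOperatorLogic.
    def updateLogic(newLogic: String => String): Unit = {
      require(_state == Opened, s"cannot update in state ${_state}")
      logic = newLogic
    }

    def onFinish(): Unit = {
      require(_state == Opened, s"cannot finish in state ${_state}")
      _state = Finished
    }

    def close(): Unit = {
      require(_state == Finished, s"cannot close in state ${_state}")
      _state = Closed
    }
  }
}
```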

Comment on lines +62 to +73
pythonProxyClient.enqueueCommand(
  ControlInvocation(
    IgnoreReply,
    InitializeOperatorLogic(
      operatorExecutor.asInstanceOf[PythonUDFOpExecV2].getCode,
      allUpstreamLinkIds.toArray,
      operatorExecutor.isInstanceOf[ISourceOperatorExecutor],
      operatorExecutor.asInstanceOf[PythonUDFOpExecV2].getOutputSchema
    )
  ),
  SELF
)

Contributor

This change looks a bit odd to me. During the construction of PythonWorkflowWorker, the proxy client and proxy server have not been constructed and the connections have not been established yet. Is it a good idea to enqueue a command at this point? Can we do it after all the components are initialized?

@shengquan-ni shengquan-ni Dec 28, 2022

Contributor Author

I think it is a minor concern because this message will stay in the queue if the proxy is not initialized yet. But I can move it to the preStart() call if you want.

Contributor

I think it may be better to move it after the proxy layer gets initialized. I am not sure if preStart is the correct location either.
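
For readers following this thread, a small sketch of the buffering behavior that the reply above assumes: commands enqueued before the connection exists wait in a queue and are flushed once the proxy is ready. `ProxyClient`, `onConnected`, and `sent` are hypothetical names for illustration; this is not the actual PythonProxyClient implementation.

```scala
import scala.collection.mutable

object BufferedCommandSketch {
  // Hypothetical proxy client: commands wait in a queue until the connection is ready.
  class ProxyClient {
    private val pending = mutable.Queue.empty[String]
    private var connected = false
    // Records commands that have actually been "sent", in order.
    val sent = mutable.ArrayBuffer.empty[String]

    def enqueueCommand(cmd: String): Unit = {
      pending.enqueue(cmd)
      if (connected) flush()
    }

    // Called once the proxy layer has finished initializing.
    def onConnected(): Unit = {
      connected = true
      flush()
    }

    private def flush(): Unit =
      while (pending.nonEmpty) sent += pending.dequeue()
  }
}
```

Even if buffering makes the early enqueue safe, enqueueing after initialization keeps the ordering explicit rather than implied by the queue.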

allUpstreamLinkIds: Set[LinkIdentity],
supportFaultTolerance: Boolean
) extends WorkflowActor(actorId, parentNetworkCommunicationActorRef, supportFaultTolerance) {
lazy val operatorExecutor: IOperatorExecutor = opFn(workerIdx)

Contributor

This could be a life cycle question that is fundamental to this PR. When exactly is the operatorExecutor initialized? Since you are using lazy val here, the operator exec instance will be implicitly initialized when it is first used. In the current implementation, the first use is when the operator exec is opened, so you are effectively deferring initialization until right before opening.

The behavior could change as we change the implementation. Would it be better to explicitly initialize an operator instead of depending on implementation?

Contributor Author

We are using a DI framework called MacWire, which recommends using lazy val. But it's a minor thing; I can change it to val if needed.
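
The timing difference discussed in this thread can be seen in a small standalone sketch. `Executor`, `LazyWorker`, `EagerWorker`, and `log` are hypothetical names used only for illustration: with `lazy val` the executor is constructed on first access, while with `val` it is constructed when the worker itself is constructed.

```scala
import scala.collection.mutable

object LazyValSketch {
  // Records construction and open events in the order they happen.
  val log = mutable.ArrayBuffer.empty[String]

  class Executor(name: String) {
    log += s"created $name"
    def open(): Unit = { log += s"opened $name" }
  }

  // Construction is deferred until operatorExecutor is first accessed.
  class LazyWorker {
    lazy val operatorExecutor: Executor = new Executor("lazy")
  }

  // Construction happens as part of constructing the worker.
  class EagerWorker {
    val operatorExecutor: Executor = new Executor("eager")
  }
}
```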

Comment on lines +105 to +109
if (o.operatorInfo.operatorGroupName == OperatorGroupConstants.UDF_GROUP) {
  // Currently, all UDFs are python operators.
  // TODO: change this if we add UDFs for other languages.
  pythonOperators.add(amberOperator)
}

Contributor

I don't think using the group name is a good way to determine whether an operator is a Python operator. We might easily move Python operators into different groups. For example, we will introduce "native Python operators" such as the RedditSource operator, which is written in Python but belongs to the source operator group. Users will drag it in and use it like all other native Python operators.

Contributor Author

The reason for this change is that we no longer want any IOperatorExecutor on the controller side. Previously we checked whether the IOperatorExecutor was a PythonUDFOpExecV2, but now we need another way of identifying Python operators. I know this is not a good way to determine it, which is why I left a TODO there. We can refine it later.

Contributor

We have some "native Python" executors, for example the Reddit search operator and the lambda expression operator. They will have a Java OpDesc but be executed with a PythonOpExec, and they are unlikely to be put under the OperatorGroupConstants.UDF_GROUP group. So I think we must find a better way to identify them now.


def getLink(linkID: LinkIdentity): LinkStrategy = idToLink(linkID)

def getPythonWorkers: Iterable[ActorVirtualIdentity] =

@Yicong-Huang Yicong-Huang Dec 16, 2022

Contributor

I like the new filtering predicates you added, but can we keep this method? It is convenient when I want to send control messages to all Python workers.

Contributor Author

As I said, the old one is not available anymore because we don't have the IOperatorExecutor on the controller side.

Contributor

Checking IOperatorExecutor is just one way to implement getPythonWorkers. I think it's better to keep the getPythonWorkers method and replace its implementation with another one (but not one based on the operator group).


class Workflow(
workflowId: WorkflowIdentity,
pythonOperators: mutable.Set[OpExecConfig],

@Yicong-Huang Yicong-Huang Dec 16, 2022

Contributor

I think this is redundant information. operatorToOpExecConfig should already contain all OpExecConfigs, and whether an operator is a Python operator should be determined from operatorToOpExecConfig. I suggest we do not pass pythonOperators separately.

@shengquan-ni shengquan-ni Dec 28, 2022

Contributor Author

By looking only at OpExecConfig, we cannot tell whether an operator is a Python operator. Previously we checked the IOperatorExecutor, but now we can't. So this change is necessary.

Contributor

Then isn't it better to add some information to OpExecConfig?
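
One possible shape for that suggestion, as a sketch only: the `OpExecConfig` below is a simplified, hypothetical stand-in with an assumed `isPythonOperator` field and string worker IDs, not the real class. It shows how `getPythonWorkers` could be derived from `operatorToOpExecConfig` without a separate `pythonOperators` set and without relying on the operator group.

```scala
object PythonFlagSketch {
  // Hypothetical, simplified stand-in for OpExecConfig carrying an explicit flag.
  final case class OpExecConfig(
      id: String,
      isPythonOperator: Boolean, // assumed field, not in the original class
      workers: List[String]
  )

  class Workflow(operatorToOpExecConfig: Map[String, OpExecConfig]) {
    // Derived from the configs, so no separate pythonOperators set is passed in.
    def getPythonWorkers: Iterable[String] =
      operatorToOpExecConfig.values.filter(_.isPythonOperator).flatMap(_.workers)
  }
}
```

With this shape, a native Python operator placed in the source group would still be picked up, because the flag is set where the config is built rather than inferred from the group name.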

@shengquan-ni

Contributor Author

@Yicong-Huang the following diagram illustrates what's changed in this PR.
[Image: Untitled Diagram-Page-1 drawio (9)]

@Yicong-Huang

Contributor

@Yicong-Huang the following diagram illustrates what's changed in this PR.
[Image: Untitled Diagram-Page-1 drawio (9)]

Could you please edit the PR description to include this diagram? I think it is useful to readers.

@shengquan-ni

Contributor Author

The change in #1807 also implements worker-side operator creation, so I am closing this one.

@aglinxinyuan aglinxinyuan deleted the shengquan-operator-remote-creation branch September 6, 2025 00:55

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Create OperatorExecutor on Worker side

2 participants