Worker-side IOperatorExecutor Creation#1772
Conversation
|
@shengquan-ni I see many changes regarding initializing python workers, can you elaborate in the PR description? |
Yicong-Huang
left a comment
There was a problem hiding this comment.
It is not very clear to me what is the new life cycle of an operator exec. My understanding is that an operator would be 1) initialized with a function/lambda/python code, 2) opened, 3) process tuples, 4) process end of stream (on finish), 5) closed. An operator may also be updated during execution (i.e., UpdateOperatorLogic for python, and potentially also for java).
Can you elaborate more in the PR description and code comments, to mention how are those life cycles be managed in the new design? For example, it is not clear when an operator is initialized with a function, how can an operator update its function?
| pythonProxyClient.enqueueCommand( | ||
| ControlInvocation( | ||
| IgnoreReply, | ||
| InitializeOperatorLogic( | ||
| operatorExecutor.asInstanceOf[PythonUDFOpExecV2].getCode, | ||
| allUpstreamLinkIds.toArray, | ||
| operatorExecutor.isInstanceOf[ISourceOperatorExecutor], | ||
| operatorExecutor.asInstanceOf[PythonUDFOpExecV2].getOutputSchema | ||
| ) | ||
| ), | ||
| SELF | ||
| ) |
There was a problem hiding this comment.
this change is a bit weird to me. During the construction of PythonWorkflowWorker, the proxy client and proxy server have not been constructed and the connections have not been established yet. Is it a good idea to Enqueue a command now? Can we do it after all the components are initialized?
There was a problem hiding this comment.
I think it is a minor concern because this message will stay in the queue if the proxy is not initialized yet. But I can move it to preStart() call if you want.
There was a problem hiding this comment.
I think it may be better to move it after the proxy layer gets initialized. I am not sure if preStart is the correct location either.
| allUpstreamLinkIds: Set[LinkIdentity], | ||
| supportFaultTolerance: Boolean | ||
| ) extends WorkflowActor(actorId, parentNetworkCommunicationActorRef, supportFaultTolerance) { | ||
| lazy val operatorExecutor: IOperatorExecutor = opFn(workerIdx) |
There was a problem hiding this comment.
this could be a life cycle question that is fundamental to this PR. When exactly is the operatorExecutor initialized? Here you are using lazy val, it seems the operator exec instance will be implicitly initialized when it is first used. In the current implementation, the first usage is when the operator exec is opened. You are effectively pushing initializing right before opening.
The behavior could change as we change the implementation. Would it be better to explicitly initialize an operator instead of depending on implementation?
There was a problem hiding this comment.
We are using a DI framework called macwire. It recommends using lazy val. But it's a minor thing, I can change it to val if needed.
| if (o.operatorInfo.operatorGroupName == OperatorGroupConstants.UDF_GROUP) { | ||
| // Currently, all UDFs are python operators. | ||
| // TODO: change this if we add UDFs for other languages. | ||
| pythonOperators.add(amberOperator) | ||
| } |
There was a problem hiding this comment.
I don't feel using the group name is a good way to determine if it is a python operator. We might easily move python operators into different groups. For example, we will introduce "native Python Operator", such as the RedditSource operator, which is written in Python but belongs to the source operator group, user will drag the operator and use it as all other native python operators.
There was a problem hiding this comment.
So the reason for having this change is now we don't want to have any IOperatorExecutor on the controller side. Previously we check if IOperatorExecutor is a PythonUDFOpExecV2, but now we need another way of identifying python operators. I know this is not a good way to determine, so that's why I left a TODO there. We can refine it later.
There was a problem hiding this comment.
we are having some "native python" executors, for example reddit search operator and lambda expression operator, they will have a java OpDesc but executed with a PythonOpExec. they are likely not to be put under the OperatorGroupConstants.UDF_GROUP group. So I think we must find a better way to determine them now.
|
|
||
| def getLink(linkID: LinkIdentity): LinkStrategy = idToLink(linkID) | ||
|
|
||
| def getPythonWorkers: Iterable[ActorVirtualIdentity] = |
There was a problem hiding this comment.
I like the new filtering predicates you added. but can we keep this method? it is convenient if I want to send some control messages to all python workers.
There was a problem hiding this comment.
As I said, the old one is not available anymore because we don't have the IOperatorExecutor on the controller side.
There was a problem hiding this comment.
IOperatorExecutor is just one implementation for getPythonWorkers. I think it's better to keep the getPythonWorkers method but we can replace it with another implementation (but not through the operator group).
|
|
||
| class Workflow( | ||
| workflowId: WorkflowIdentity, | ||
| pythonOperators: mutable.Set[OpExecConfig], |
There was a problem hiding this comment.
I think this is a redundant information. operatorToOpExecConfig should already contain all OpExecConfigs. And whether it is python or not should be determined from operatorToOpExecConfig. I suggest we do not pass pythonOperators again.
There was a problem hiding this comment.
By only looking at OpExecConfig, we cannot say whether an operator is a python operator or not. Previously we check the IOperatorExecutor but now we can't. So this change is necessary.
There was a problem hiding this comment.
then isn't it better to add some information to OpExecConfig?
|
@Yicong-Huang the following diagram illustrates what's changed in this PR. |
Could you please edit the PR description to include this diagram? I think it is useful to readers. |
|
The change in #1807 is also doing worker-side operator creation. Close this one. |

This PR fixes #1005.
IOperatorExecutorto a lambda function() => IOperatorExecutorso that only the lambda function is serialized.PythonWorkflowWorkerdirectly initialize the operator by sendingInitializeOperatorLogicto the python side. Earlier this control message is sent by the controller before starting the workflow. The payload of this control message is retrieved from theIOperatorExecutor. Since now we are moving the creation to the worker side, this logic needs to be updated.IOperatorExecutoron the controller side to determine if a worker is a python worker, this PR changed this part by passing the python operator information from the logical plan so we do not rely onIOperatorExecutoranymore.