From 12bc724fc505530eb6ab2854cdf07b077ba53b5b Mon Sep 17 00:00:00 2001 From: Brian Strauch Date: Thu, 4 Jun 2026 15:27:13 -0700 Subject: [PATCH 1/2] Add LangGraph workflow streams sample Demonstrate the LangGraph plugin's Workflow Streams support: a node emits live tokens via get_stream_writer() (routed by the plugin's streaming_topic), and the workflow publishes coarse astream progress to its own topic. A single client subscribes to all topics and demultiplexes on item.topic. Bumps the langgraph group to temporalio>=1.28.0 (where workflow streams ship) and drops the now-obsolete langsmith<0.7.34 constraint, which was specific to the 1.27.2 langsmith patch. Co-Authored-By: Claude Opus 4.8 (1M context) --- langgraph_plugin/README.md | 2 + .../graph_api/streaming/__init__.py | 0 .../graph_api/streaming/run_worker.py | 35 ++++++++ .../graph_api/streaming/run_workflow.py | 44 +++++++++ .../graph_api/streaming/workflow.py | 90 +++++++++++++++++++ pyproject.toml | 5 +- tests/langgraph_plugin/streaming_test.py | 63 +++++++++++++ uv.lock | 28 +++--- 8 files changed, 248 insertions(+), 19 deletions(-) create mode 100644 langgraph_plugin/graph_api/streaming/__init__.py create mode 100644 langgraph_plugin/graph_api/streaming/run_worker.py create mode 100644 langgraph_plugin/graph_api/streaming/run_workflow.py create mode 100644 langgraph_plugin/graph_api/streaming/workflow.py create mode 100644 tests/langgraph_plugin/streaming_test.py diff --git a/langgraph_plugin/README.md b/langgraph_plugin/README.md index 8691d178..a242e943 100644 --- a/langgraph_plugin/README.md +++ b/langgraph_plugin/README.md @@ -16,6 +16,7 @@ Samples are organized by API style: | **Continue-as-new** | [graph_api/continue_as_new](graph_api/continue_as_new) | [functional_api/continue_as_new](functional_api/continue_as_new) | Multi-stage data pipeline that uses `continue-as-new` with task result caching so previously-completed stages are not re-executed. | | **ReAct Agent** | [graph_api/react_agent](graph_api/react_agent) | [functional_api/react_agent](functional_api/react_agent) | Tool-calling agent loop. Graph API uses conditional edges; Functional API uses a `while` loop. | | **Control Flow** | -- | [functional_api/control_flow](functional_api/control_flow) | Demonstrates parallel task execution, `for` loops, and `if/else` branching -- patterns that are natural in the Functional API. | +| **Streaming** | [graph_api/streaming](graph_api/streaming) | -- | Streams live output from a running workflow via [Workflow Streams](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/workflow_streams): node tokens via `get_stream_writer()` + `streaming_topic`, plus workflow-side `astream` progress published with `WorkflowStream.topic().publish()`. | | **LangSmith Tracing** | [graph_api/langsmith_tracing](graph_api/langsmith_tracing) | [functional_api/langsmith_tracing](functional_api/langsmith_tracing) | Combines `LangGraphPlugin` with Temporal's `LangSmithPlugin` for durable execution + full observability of LLM calls. Requires API keys. | ## Prerequisites @@ -67,6 +68,7 @@ uv run langgraph_plugin//langsmith_tracing/main.py - **Continue-as-new with caching** -- `cache()` captures completed task results; passing the cache to the next execution avoids re-running them. - **Conditional routing** -- Graph API's `add_conditional_edges` and Functional API's native `if/else`/`while` for agent loops. - **Parallel execution** -- Functional API launches multiple tasks concurrently by creating futures before awaiting them. +- **Streaming** -- Workflow Streams expose a durable, offset-addressed event channel that external clients subscribe to while the workflow is still running. Nodes emit fine-grained tokens via `get_stream_writer()` (routed by the plugin's `streaming_topic`), and the workflow can publish coarse `astream` progress to its own topic. ## Related diff --git a/langgraph_plugin/graph_api/streaming/__init__.py b/langgraph_plugin/graph_api/streaming/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/langgraph_plugin/graph_api/streaming/run_worker.py b/langgraph_plugin/graph_api/streaming/run_worker.py new file mode 100644 index 00000000..46e0090e --- /dev/null +++ b/langgraph_plugin/graph_api/streaming/run_worker.py @@ -0,0 +1,35 @@ +"""Worker for the streaming sample (Graph API).""" + +import asyncio +import os + +from temporalio.client import Client +from temporalio.contrib.langgraph import LangGraphPlugin +from temporalio.worker import Worker + +from langgraph_plugin.graph_api.streaming.workflow import ( + StreamingWorkflow, + make_streaming_graph, +) + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + # streaming_topic routes node get_stream_writer() output onto the "tokens" topic. + plugin = LangGraphPlugin( + graphs={"streaming": make_streaming_graph()}, + streaming_topic="tokens", + ) + + worker = Worker( + client, + task_queue="langgraph-streaming", + workflows=[StreamingWorkflow], + plugins=[plugin], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/langgraph_plugin/graph_api/streaming/run_workflow.py b/langgraph_plugin/graph_api/streaming/run_workflow.py new file mode 100644 index 00000000..94881cca --- /dev/null +++ b/langgraph_plugin/graph_api/streaming/run_workflow.py @@ -0,0 +1,44 @@ +"""Start the streaming workflow and subscribe to its Workflow Stream (Graph API).""" + +import asyncio +import os +from datetime import timedelta + +from temporalio.client import Client +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from langgraph_plugin.graph_api.streaming.workflow import StreamingWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + handle = await client.start_workflow( + StreamingWorkflow.run, + "a brave robot", + id="streaming-workflow", + task_queue="langgraph-streaming", + ) + + # Subscribe to all topics on the workflow's stream and demultiplex on topic. + ws = WorkflowStreamClient.create(client, handle.id) + async for item in ws.subscribe( + from_offset=0, + result_type=dict, + poll_cooldown=timedelta(milliseconds=50), + ): + if item.topic == "tokens": + print(item.data["token"], end="", flush=True) + elif item.topic == "progress": + if item.data.get("done"): + # Let the workflow know we are done consuming so it can complete. + await handle.signal(StreamingWorkflow.ack_stream) + break + print(f"\n[progress] {item.data}") + + result = await handle.result() + print(f"\n\nFinal result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/langgraph_plugin/graph_api/streaming/workflow.py b/langgraph_plugin/graph_api/streaming/workflow.py new file mode 100644 index 00000000..a698b0d6 --- /dev/null +++ b/langgraph_plugin/graph_api/streaming/workflow.py @@ -0,0 +1,90 @@ +"""Streaming with the LangGraph Graph API and Temporal Workflow Streams. + +A workflow's :class:`WorkflowStream` is a durable, offset-addressed event channel +external clients can subscribe to while the workflow is still running. This sample +demonstrates both ways the LangGraph plugin produces stream items: + +- **Node token streaming** -- the ``write_story`` node calls LangGraph's + ``get_stream_writer()`` to emit fine-grained tokens. The plugin is configured with + ``streaming_topic="tokens"`` (see ``run_worker.py``), which routes those writes onto + the ``"tokens"`` topic. +- **Workflow-side ``astream`` publish** -- the workflow drives the graph with + ``app.astream(...)`` and publishes each node-completion chunk onto a ``"progress"`` + topic it owns. + +A single client subscribes to all topics and demultiplexes on ``item.topic``. +""" + +from datetime import timedelta + +from langgraph.config import get_stream_writer +from langgraph.graph import START, StateGraph +from temporalio import workflow +from temporalio.contrib.langgraph import graph as temporal_graph +from temporalio.contrib.workflow_streams import WorkflowStream +from typing_extensions import TypedDict + + +class State(TypedDict): + topic: str + story: str + + +async def outline(state: State) -> dict[str, str]: + """Produce a short opening line. Runs first so ``astream`` emits an early chunk.""" + return {"story": f"A story about {state['topic']}:"} + + +async def write_story(state: State) -> dict[str, str]: + """Write the story, emitting each word as a token via the stream writer.""" + writer = get_stream_writer() + words = f"{state['story']} Once upon a time, there was {state['topic']}.".split() + for word in words: + writer({"token": word + " "}) + return {"story": " ".join(words)} + + +def make_streaming_graph() -> StateGraph: + g = StateGraph(State) + activity_metadata = { + "execute_in": "activity", + "start_to_close_timeout": timedelta(seconds=10), + } + g.add_node("outline", outline, metadata=activity_metadata) + g.add_node("write_story", write_story, metadata=activity_metadata) + g.add_edge(START, "outline") + g.add_edge("outline", "write_story") + return g + + +@workflow.defn +class StreamingWorkflow: + def __init__(self) -> None: + # WorkflowStream must be constructed during workflow initialization. + self.stream = WorkflowStream() + self._stream_acked = False + + @workflow.signal + def ack_stream(self) -> None: + """Signalled by the client once it has finished consuming the stream.""" + self._stream_acked = True + + @workflow.run + async def run(self, topic: str) -> str: + app = temporal_graph("streaming").compile() + progress = self.stream.topic("progress") + + story = "" + async for chunk in app.astream({"topic": topic, "story": ""}): + # Each chunk is {node_name: {state updates}}. Forward it as progress. + progress.publish(chunk) + for node_update in chunk.values(): + if "story" in node_update: + story = node_update["story"] + + progress.publish({"done": True}) + + # The stream disappears when the workflow completes, so wait until the + # client acknowledges it has finished consuming before returning. + await workflow.wait_condition(lambda: self._stream_acked) + return story diff --git a/pyproject.toml b/pyproject.toml index 4c4151a5..34242445 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,7 @@ langgraph = [ "langgraph>=1.1.3", "langchain>=0.3.0", "langchain-anthropic>=0.3.0", - "temporalio[langgraph,langsmith]>=1.27.0", + "temporalio[langgraph,langsmith]>=1.28.0", ] nexus = ["nexus-rpc>=1.1.0,<2"] open-telemetry = [ @@ -71,9 +71,6 @@ cloud-export-to-parquet = [ [tool.uv] constraint-dependencies = [ - # langsmith 0.7.34 changed its aio_to_thread signature; temporalio.contrib.langsmith - # 1.27.2 still patches the older signature, causing workflow task retries to hang CI. - "langsmith<0.7.34", # yarl 1.24.0 was published without an sdist and only has cp310 wheels, so it cannot # install on the Python 3.14 CI jobs. "yarl!=1.24.0", diff --git a/tests/langgraph_plugin/streaming_test.py b/tests/langgraph_plugin/streaming_test.py new file mode 100644 index 00000000..b96acc8a --- /dev/null +++ b/tests/langgraph_plugin/streaming_test.py @@ -0,0 +1,63 @@ +import uuid +from datetime import timedelta +from typing import Any + +from temporalio.client import Client +from temporalio.contrib.langgraph import LangGraphPlugin +from temporalio.contrib.workflow_streams import WorkflowStreamClient +from temporalio.worker import Worker + +from langgraph_plugin.graph_api.streaming.workflow import ( + StreamingWorkflow, + make_streaming_graph, +) + + +async def test_streaming_graph_api(client: Client) -> None: + task_queue = f"streaming-test-{uuid.uuid4()}" + plugin = LangGraphPlugin( + graphs={"streaming": make_streaming_graph()}, + streaming_topic="tokens", + ) + + async with Worker( + client, + task_queue=task_queue, + workflows=[StreamingWorkflow], + plugins=[plugin], + ): + handle = await client.start_workflow( + StreamingWorkflow.run, + "a brave robot", + id=f"streaming-{uuid.uuid4()}", + task_queue=task_queue, + ) + + ws = WorkflowStreamClient.create(client, handle.id) + tokens: list[dict[str, Any]] = [] + progress: list[dict[str, Any]] = [] + async for item in ws.subscribe( + from_offset=0, + result_type=dict, + poll_cooldown=timedelta(milliseconds=10), + ): + if item.topic == "tokens": + tokens.append(item.data) + elif item.topic == "progress": + if item.data.get("done"): + await handle.signal(StreamingWorkflow.ack_stream) + break + progress.append(item.data) + + result = await handle.result() + + # Tokens reassemble into the final story. + assert tokens, "expected at least one token" + assert all("token" in t for t in tokens) + assembled = "".join(t["token"] for t in tokens).strip() + assert assembled == result + + # Workflow-side astream publish: one chunk per node, in order. + assert [list(chunk)[0] for chunk in progress] == ["outline", "write_story"] + assert result == progress[-1]["write_story"]["story"] + assert "a brave robot" in result diff --git a/uv.lock b/uv.lock index f172ac54..826b08c9 100644 --- a/uv.lock +++ b/uv.lock @@ -11,10 +11,7 @@ resolution-markers = [ ] [manifest] -constraints = [ - { name = "langsmith", specifier = "<0.7.34" }, - { name = "yarl", specifier = "!=1.24.0" }, -] +constraints = [{ name = "yarl", specifier = "!=1.24.0" }] [[package]] name = "aioboto3" @@ -1580,7 +1577,7 @@ wheels = [ [[package]] name = "langsmith" -version = "0.7.33" +version = "0.8.9" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "httpx" }, @@ -1590,12 +1587,13 @@ dependencies = [ { name = "requests" }, { name = "requests-toolbelt" }, { name = "uuid-utils" }, + { name = "websockets" }, { name = "xxhash" }, { name = "zstandard" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/6f/75/1ee27b3510bf5b1b569b9695c9466c256caab45885bd569c0c67720236ad/langsmith-0.7.33.tar.gz", hash = "sha256:fa2d81ad6e8374a81fda9291894f6fcae714e55fbf11a0b07578e3cd4b1ea384", size = 1186298, upload-time = "2026-04-20T16:17:54.583Z" } +sdist = { url = "https://files.pythonhosted.org/packages/e4/dd/f4c8a12987318e505b10760d30c3c2d45e8dc87ba8f47a004c753a9e7b35/langsmith-0.8.9.tar.gz", hash = "sha256:f16e37fcd5a8a2d4db30eae0e399a866a65ce5cc86218825c59409ed57a3bf53", size = 4428684, upload-time = "2026-06-03T17:56:09.448Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/f4/76/53033db34ffccd25d62c32b23b9468f7228b455da6976e1c420ae31555c4/langsmith-0.7.33-py3-none-any.whl", hash = "sha256:5b535b991d52d3b664ebb8dc6f95afcf8d0acb42e062ac45a54a6a4820139f20", size = 378981, upload-time = "2026-04-20T16:17:52.503Z" }, + { url = "https://files.pythonhosted.org/packages/b5/2f/a701663c9fb4d9630448622a684bc372b4905b9a6dbe2297d55a70fde04e/langsmith-0.8.9-py3-none-any.whl", hash = "sha256:c9519cabc75568d088df045710d1b86eae9780c91054528b2aa7e6cb1fc80c52", size = 403165, upload-time = "2026-06-03T17:56:07.226Z" }, ] [[package]] @@ -3688,7 +3686,7 @@ wheels = [ [[package]] name = "temporalio" -version = "1.27.2" +version = "1.28.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "nexus-rpc" }, @@ -3697,13 +3695,13 @@ dependencies = [ { name = "types-protobuf" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/ca/62/2bc1a9ad29382a3a99f088907ef2024a94420cfef340be1b33026c632828/temporalio-1.27.2.tar.gz", hash = "sha256:633bf2379492f3db1e887d1e64fdac00d9c2ddc3e9382b831d5af68256912e92", size = 2503041, upload-time = "2026-05-14T02:17:57.565Z" } +sdist = { url = "https://files.pythonhosted.org/packages/d3/04/8e7cd6a203ee40700a8d3d34bca6f1da3a6083888fa5654bc05514b633fa/temporalio-1.28.0.tar.gz", hash = "sha256:eb390ee968204a9f8fda91544d6f03497a7614acbfcc9862b5bd08a2d26edb04", size = 2619977, upload-time = "2026-06-04T17:22:07.52Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/64/85/9da14f9fbdfae95435d29353bb1c55891581ad6b23c86ca56e72d83035ed/temporalio-1.27.2-cp310-abi3-macosx_10_12_x86_64.whl", hash = "sha256:860f706380faafec8f183f9194d0883c8033a4211c5d19c2c962c45b06cf99e9", size = 14602829, upload-time = "2026-05-14T02:17:45.624Z" }, - { url = "https://files.pythonhosted.org/packages/24/51/b7437991e71eea082dc53222da11f064974917cd59063ba57e13e5895fbc/temporalio-1.27.2-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:a8dc0c680e351f3132809861888d8326dbd5030dd4e570663597e7d4768d9502", size = 13997680, upload-time = "2026-05-14T02:17:53.968Z" }, - { url = "https://files.pythonhosted.org/packages/8c/5d/358065040e6f0cedbf669acd333622999eec737ff868ca7829d727b77746/temporalio-1.27.2-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:805f3de4d193dec52e040e41dbfc9ab44be0206d2e81142ceefaf7b7208058d1", size = 14252199, upload-time = "2026-05-14T02:17:36.972Z" }, - { url = "https://files.pythonhosted.org/packages/72/8a/85d2eab07c3e23fc1124203e76857c69ab9b22d8ccebad0835e294edb754/temporalio-1.27.2-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5bc996cb501b8a918f50037ccee6facb05bb70984acada4c2a3e01f5e7957a38", size = 14779945, upload-time = "2026-05-14T02:18:05.513Z" }, - { url = "https://files.pythonhosted.org/packages/67/81/c9b08609e2a92ecf62c97c59cabfa0608337c8d5cc9941eed5d9a7778840/temporalio-1.27.2-cp310-abi3-win_amd64.whl", hash = "sha256:62a84ae9a60c17932971e4ca3b0f3cd6f32f173b8183e759989376503fb95af6", size = 14981897, upload-time = "2026-05-14T02:17:27.333Z" }, + { url = "https://files.pythonhosted.org/packages/fb/9b/9b260f50a369ed21daad04bc58d31ce47fd7ee640d40ce9eb94115ffc6d5/temporalio-1.28.0-cp310-abi3-macosx_10_12_x86_64.whl", hash = "sha256:544e48028d83ffda51d6e0cdb1bf27babc868b3f63adb0e1613efcbbaab197a3", size = 14767177, upload-time = "2026-06-04T17:21:55.761Z" }, + { url = "https://files.pythonhosted.org/packages/e7/1f/80d7bde35f723a5871fa0f2aa01d0715a8c0dc610e15943ae0e8b0f50bc6/temporalio-1.28.0-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:737f3d9c470514ed0e4922ebe00ef4a186c82343e195d26b9c485e0bfcc4f14d", size = 14223876, upload-time = "2026-06-04T17:21:58.344Z" }, + { url = "https://files.pythonhosted.org/packages/34/7e/517cdff2710935105a38b58539c7d4f8959ec6241953d51bf482fedbc721/temporalio-1.28.0-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1b8ecc38c2cdae5efe8b127b1cbe726e9c92b10bc506753f1074957984fc6d7d", size = 14473526, upload-time = "2026-06-04T17:22:00.663Z" }, + { url = "https://files.pythonhosted.org/packages/5b/8c/518ec97457e50d67caabc40b44946b7feca0cbce20adb0bb651e7f6a7900/temporalio-1.28.0-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:17a9993342d4ba4ae0c4b37a95e8f2aa488379d0917a637d8fdd5f4332aad3e2", size = 14940291, upload-time = "2026-06-04T17:22:02.939Z" }, + { url = "https://files.pythonhosted.org/packages/d8/79/b7fe353287f15d501145aeff266e565e1fae05cce2875d0fde6ca4397aca/temporalio-1.28.0-cp310-abi3-win_amd64.whl", hash = "sha256:41381cbd68d1206c55750147118de3962bcc79229a61035296f3c0af44a3d006", size = 15245109, upload-time = "2026-06-04T17:22:05.365Z" }, ] [package.optional-dependencies] @@ -3858,7 +3856,7 @@ langgraph = [ { name = "langchain", specifier = ">=0.3.0" }, { name = "langchain-anthropic", specifier = ">=0.3.0" }, { name = "langgraph", specifier = ">=1.1.3" }, - { name = "temporalio", extras = ["langgraph", "langsmith"], specifier = ">=1.27.0" }, + { name = "temporalio", extras = ["langgraph", "langsmith"], specifier = ">=1.28.0" }, ] langsmith-tracing = [ { name = "langsmith", specifier = ">=0.7.0" }, From 63e53b4d80729c3b954e074a5bae52dfc7f30dc3 Mon Sep 17 00:00:00 2001 From: Brian Strauch Date: Thu, 4 Jun 2026 15:27:34 -0700 Subject: [PATCH 2/2] Add langgraph_plugin code owners Co-Authored-By: Claude Opus 4.8 (1M context) --- .github/CODEOWNERS | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 0f4298b7..a2b75832 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1,2 @@ * @temporalio/sdk +/langgraph_plugin/ @temporalio/sdk @temporalio/ai-sdk