Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
* @temporalio/sdk
/langgraph_plugin/ @temporalio/sdk @temporalio/ai-sdk
2 changes: 2 additions & 0 deletions langgraph_plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Samples are organized by API style:
| **Continue-as-new** | [graph_api/continue_as_new](graph_api/continue_as_new) | [functional_api/continue_as_new](functional_api/continue_as_new) | Multi-stage data pipeline that uses `continue-as-new` with task result caching so previously-completed stages are not re-executed. |
| **ReAct Agent** | [graph_api/react_agent](graph_api/react_agent) | [functional_api/react_agent](functional_api/react_agent) | Tool-calling agent loop. Graph API uses conditional edges; Functional API uses a `while` loop. |
| **Control Flow** | -- | [functional_api/control_flow](functional_api/control_flow) | Demonstrates parallel task execution, `for` loops, and `if/else` branching -- patterns that are natural in the Functional API. |
| **Streaming** | [graph_api/streaming](graph_api/streaming) | -- | Streams live output from a running workflow via [Workflow Streams](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/workflow_streams): node tokens via `get_stream_writer()` + `streaming_topic`, plus workflow-side `astream` progress published with `WorkflowStream.topic().publish()`. |
| **LangSmith Tracing** | [graph_api/langsmith_tracing](graph_api/langsmith_tracing) | [functional_api/langsmith_tracing](functional_api/langsmith_tracing) | Combines `LangGraphPlugin` with Temporal's `LangSmithPlugin` for durable execution + full observability of LLM calls. Requires API keys. |

## Prerequisites
Expand Down Expand Up @@ -67,6 +68,7 @@ uv run langgraph_plugin/<api>/langsmith_tracing/main.py
- **Continue-as-new with caching** -- `cache()` captures completed task results; passing the cache to the next execution avoids re-running them.
- **Conditional routing** -- Graph API's `add_conditional_edges` and Functional API's native `if/else`/`while` for agent loops.
- **Parallel execution** -- Functional API launches multiple tasks concurrently by creating futures before awaiting them.
- **Streaming** -- Workflow Streams expose a durable, offset-addressed event channel that external clients subscribe to while the workflow is still running. Nodes emit fine-grained tokens via `get_stream_writer()` (routed by the plugin's `streaming_topic`), and the workflow can publish coarse `astream` progress to its own topic.

## Related

Expand Down
Empty file.
35 changes: 35 additions & 0 deletions langgraph_plugin/graph_api/streaming/run_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Worker for the streaming sample (Graph API)."""

import asyncio
import os

from temporalio.client import Client
from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker

from langgraph_plugin.graph_api.streaming.workflow import (
StreamingWorkflow,
make_streaming_graph,
)


async def main() -> None:
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))
# streaming_topic routes node get_stream_writer() output onto the "tokens" topic.
plugin = LangGraphPlugin(
graphs={"streaming": make_streaming_graph()},
streaming_topic="tokens",
)

worker = Worker(
client,
task_queue="langgraph-streaming",
workflows=[StreamingWorkflow],
plugins=[plugin],
)
print("Worker started. Ctrl+C to exit.")
await worker.run()


if __name__ == "__main__":
asyncio.run(main())
44 changes: 44 additions & 0 deletions langgraph_plugin/graph_api/streaming/run_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Start the streaming workflow and subscribe to its Workflow Stream (Graph API)."""

import asyncio
import os
from datetime import timedelta

from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from langgraph_plugin.graph_api.streaming.workflow import StreamingWorkflow


async def main() -> None:
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))

handle = await client.start_workflow(
StreamingWorkflow.run,
"a brave robot",
id="streaming-workflow",
task_queue="langgraph-streaming",
)

# Subscribe to all topics on the workflow's stream and demultiplex on topic.
ws = WorkflowStreamClient.create(client, handle.id)
async for item in ws.subscribe(
from_offset=0,
result_type=dict,
poll_cooldown=timedelta(milliseconds=50),
):
if item.topic == "tokens":
print(item.data["token"], end="", flush=True)
elif item.topic == "progress":
if item.data.get("done"):
# Let the workflow know we are done consuming so it can complete.
await handle.signal(StreamingWorkflow.ack_stream)
break
print(f"\n[progress] {item.data}")

result = await handle.result()
print(f"\n\nFinal result: {result}")


if __name__ == "__main__":
asyncio.run(main())
90 changes: 90 additions & 0 deletions langgraph_plugin/graph_api/streaming/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Streaming with the LangGraph Graph API and Temporal Workflow Streams.

A workflow's :class:`WorkflowStream` is a durable, offset-addressed event channel
external clients can subscribe to while the workflow is still running. This sample
demonstrates both ways the LangGraph plugin produces stream items:

- **Node token streaming** -- the ``write_story`` node calls LangGraph's
``get_stream_writer()`` to emit fine-grained tokens. The plugin is configured with
``streaming_topic="tokens"`` (see ``run_worker.py``), which routes those writes onto
the ``"tokens"`` topic.
- **Workflow-side ``astream`` publish** -- the workflow drives the graph with
``app.astream(...)`` and publishes each node-completion chunk onto a ``"progress"``
topic it owns.

A single client subscribes to all topics and demultiplexes on ``item.topic``.
"""

from datetime import timedelta

from langgraph.config import get_stream_writer
from langgraph.graph import START, StateGraph
from temporalio import workflow
from temporalio.contrib.langgraph import graph as temporal_graph
from temporalio.contrib.workflow_streams import WorkflowStream
from typing_extensions import TypedDict


class State(TypedDict):
topic: str
story: str


async def outline(state: State) -> dict[str, str]:
"""Produce a short opening line. Runs first so ``astream`` emits an early chunk."""
return {"story": f"A story about {state['topic']}:"}


async def write_story(state: State) -> dict[str, str]:
"""Write the story, emitting each word as a token via the stream writer."""
writer = get_stream_writer()
words = f"{state['story']} Once upon a time, there was {state['topic']}.".split()
for word in words:
writer({"token": word + " "})
return {"story": " ".join(words)}


def make_streaming_graph() -> StateGraph:
g = StateGraph(State)
activity_metadata = {
"execute_in": "activity",
"start_to_close_timeout": timedelta(seconds=10),
}
g.add_node("outline", outline, metadata=activity_metadata)
g.add_node("write_story", write_story, metadata=activity_metadata)
g.add_edge(START, "outline")
g.add_edge("outline", "write_story")
return g


@workflow.defn
class StreamingWorkflow:
def __init__(self) -> None:
# WorkflowStream must be constructed during workflow initialization.
self.stream = WorkflowStream()
self._stream_acked = False

@workflow.signal
def ack_stream(self) -> None:
"""Signalled by the client once it has finished consuming the stream."""
self._stream_acked = True

@workflow.run
async def run(self, topic: str) -> str:
app = temporal_graph("streaming").compile()
progress = self.stream.topic("progress")

story = ""
async for chunk in app.astream({"topic": topic, "story": ""}):
# Each chunk is {node_name: {state updates}}. Forward it as progress.
progress.publish(chunk)
for node_update in chunk.values():
if "story" in node_update:
story = node_update["story"]

progress.publish({"done": True})

# The stream disappears when the workflow completes, so wait until the
# client acknowledges it has finished consuming before returning.
await workflow.wait_condition(lambda: self._stream_acked)
return story
5 changes: 1 addition & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ langgraph = [
"langgraph>=1.1.3",
"langchain>=0.3.0",
"langchain-anthropic>=0.3.0",
"temporalio[langgraph,langsmith]>=1.27.0",
"temporalio[langgraph,langsmith]>=1.28.0",
]
nexus = ["nexus-rpc>=1.1.0,<2"]
open-telemetry = [
Expand All @@ -71,9 +71,6 @@ cloud-export-to-parquet = [

[tool.uv]
constraint-dependencies = [
# langsmith 0.7.34 changed its aio_to_thread signature; temporalio.contrib.langsmith
# 1.27.2 still patches the older signature, causing workflow task retries to hang CI.
"langsmith<0.7.34",
# yarl 1.24.0 was published without an sdist and only has cp310 wheels, so it cannot
# install on the Python 3.14 CI jobs.
"yarl!=1.24.0",
Expand Down
63 changes: 63 additions & 0 deletions tests/langgraph_plugin/streaming_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import uuid
from datetime import timedelta
from typing import Any

from temporalio.client import Client
from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.contrib.workflow_streams import WorkflowStreamClient
from temporalio.worker import Worker

from langgraph_plugin.graph_api.streaming.workflow import (
StreamingWorkflow,
make_streaming_graph,
)


async def test_streaming_graph_api(client: Client) -> None:
task_queue = f"streaming-test-{uuid.uuid4()}"
plugin = LangGraphPlugin(
graphs={"streaming": make_streaming_graph()},
streaming_topic="tokens",
)

async with Worker(
client,
task_queue=task_queue,
workflows=[StreamingWorkflow],
plugins=[plugin],
):
handle = await client.start_workflow(
StreamingWorkflow.run,
"a brave robot",
id=f"streaming-{uuid.uuid4()}",
task_queue=task_queue,
)

ws = WorkflowStreamClient.create(client, handle.id)
tokens: list[dict[str, Any]] = []
progress: list[dict[str, Any]] = []
async for item in ws.subscribe(
from_offset=0,
result_type=dict,
poll_cooldown=timedelta(milliseconds=10),
):
if item.topic == "tokens":
tokens.append(item.data)
elif item.topic == "progress":
if item.data.get("done"):
await handle.signal(StreamingWorkflow.ack_stream)
break
progress.append(item.data)

result = await handle.result()

# Tokens reassemble into the final story.
assert tokens, "expected at least one token"
assert all("token" in t for t in tokens)
assembled = "".join(t["token"] for t in tokens).strip()
assert assembled == result

# Workflow-side astream publish: one chunk per node, in order.
assert [list(chunk)[0] for chunk in progress] == ["outline", "write_story"]
assert result == progress[-1]["write_story"]["story"]
assert "a brave robot" in result
28 changes: 13 additions & 15 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading