From ab189520d4595c4a5e2fe36259b318f651428e45 Mon Sep 17 00:00:00 2001 From: Daniel Miller Date: Mon, 1 Jun 2026 10:11:22 -0400 Subject: [PATCH] feat(cli): add Temporal + LangGraph agent template and example MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a `temporal-langgraph` init template (registered in the Temporal submenu of `agentex init`) plus a runnable tutorial example, filling the gap left by the existing `temporal-openai-agents` / `temporal-pydantic-ai` templates — LangGraph previously only had `default-langgraph` and `sync-langgraph`. Uses the official Temporal LangGraph plugin (`temporalio.contrib.langgraph`, shipped in `temporalio[langgraph]>=1.27.0`): each LangGraph node runs as a durable Temporal activity (the LLM/`agent` node) or inline in the workflow (the `tools` node). Temporal is the runtime; LangGraph is the agent framework. Verified the plugin's behavior locally (Temporal dev server + real LLM) and encoded the working shape into the template: - agent node `execute_in="activity"`, tools node `execute_in="workflow"` (tool-node-as-activity is broken in the experimental plugin — AIMessage doesn't survive the activity boundary); - async router + async tools (sync callables hit run_in_executor, which Temporal's workflow loop forbids); - AgentexWorker's UnsandboxedWorkflowRunner makes langchain imports safe. Showcases nodes-as-activities, human-in-the-loop via LangGraph `interrupt` resumed by a Temporal `provide_approval` signal + `Command(resume=...)`, multi-turn memory on the workflow, graph-visualization queries (mermaid/ascii), and tracing. Example: examples/.../10_temporal/130_langgraph (full agent + tests). Tests: - tests/lib/cli/test_init_templates.py — renders every init template and asserts valid, substituted Python (catches .j2 regressions), with focused coverage of the new template (nodes-as-activities, HIL, queries, deps). - example tests/test_graph_temporal.py — hermetic graph + HIL coverage with a stub model, plus a live end-to-end run through the real Temporal plugin (skipped without LITELLM_API_KEY); tests/test_agent.py — live integration. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../10_temporal/130_langgraph/.dockerignore | 43 +++ .../10_temporal/130_langgraph/.env.example | 13 + .../10_temporal/130_langgraph/Dockerfile | 43 +++ .../10_temporal/130_langgraph/README.md | 58 ++++ .../10_temporal/130_langgraph/dev.ipynb | 126 +++++++++ .../130_langgraph/environments.yaml | 64 +++++ .../10_temporal/130_langgraph/manifest.yaml | 128 +++++++++ .../130_langgraph/project/__init__.py | 0 .../10_temporal/130_langgraph/project/acp.py | 42 +++ .../130_langgraph/project/graph.py | 79 ++++++ .../130_langgraph/project/run_worker.py | 50 ++++ .../130_langgraph/project/tools.py | 20 ++ .../130_langgraph/project/workflow.py | 83 ++++++ .../10_temporal/130_langgraph/pyproject.toml | 42 +++ .../130_langgraph/tests/test_agent.py | 127 +++++++++ .../tests/test_graph_temporal.py | 105 +++++++ src/agentex/lib/adk/__init__.py | 2 + .../lib/adk/_modules/_langgraph_messages.py | 85 ++++++ src/agentex/lib/cli/commands/init.py | 3 + .../temporal-langgraph/.dockerignore.j2 | 43 +++ .../temporal-langgraph/.env.example.j2 | 13 + .../temporal-langgraph/Dockerfile-uv.j2 | 55 ++++ .../temporal-langgraph/Dockerfile.j2 | 48 ++++ .../templates/temporal-langgraph/README.md.j2 | 121 ++++++++ .../templates/temporal-langgraph/dev.ipynb.j2 | 126 +++++++++ .../temporal-langgraph/environments.yaml.j2 | 64 +++++ .../temporal-langgraph/manifest.yaml.j2 | 140 ++++++++++ .../temporal-langgraph/project/acp.py.j2 | 42 +++ .../temporal-langgraph/project/graph.py.j2 | 165 +++++++++++ .../project/run_worker.py.j2 | 50 ++++ .../temporal-langgraph/project/tools.py.j2 | 57 ++++ .../temporal-langgraph/project/workflow.py.j2 | 259 ++++++++++++++++++ .../temporal-langgraph/pyproject.toml.j2 | 42 +++ .../temporal-langgraph/requirements.txt.j2 | 18 ++ .../temporal-langgraph/test_agent.py.j2 | 147 ++++++++++ tests/lib/cli/test_init_templates.py | 139 ++++++++++ 36 files changed, 2642 insertions(+) create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/.dockerignore create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/.env.example create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/Dockerfile create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/README.md create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/dev.ipynb create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/environments.yaml create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/manifest.yaml create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/project/__init__.py create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/project/acp.py create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/project/graph.py create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/project/run_worker.py create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/project/tools.py create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/pyproject.toml create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/tests/test_agent.py create mode 100644 examples/tutorials/10_async/10_temporal/130_langgraph/tests/test_graph_temporal.py create mode 100644 src/agentex/lib/adk/_modules/_langgraph_messages.py create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/.dockerignore.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/.env.example.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/Dockerfile-uv.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/Dockerfile.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/README.md.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/dev.ipynb.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/environments.yaml.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/manifest.yaml.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/project/acp.py.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/project/graph.py.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/project/run_worker.py.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/project/tools.py.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/project/workflow.py.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/pyproject.toml.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/requirements.txt.j2 create mode 100644 src/agentex/lib/cli/templates/temporal-langgraph/test_agent.py.j2 create mode 100644 tests/lib/cli/test_init_templates.py diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/.dockerignore b/examples/tutorials/10_async/10_temporal/130_langgraph/.dockerignore new file mode 100644 index 000000000..c4f7a8b4b --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/.dockerignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/.env.example b/examples/tutorials/10_async/10_temporal/130_langgraph/.env.example new file mode 100644 index 000000000..ab1a5790f --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/.env.example @@ -0,0 +1,13 @@ +# at130-langgraph - Environment Variables +# Copy this file to .env and fill in the values + +# API key for your LLM provider +LITELLM_API_KEY= + +# LLM base URL (optional - override to use a different provider) +# OPENAI_BASE_URL= + +# SGP Configuration (optional - for tracing) +# SGP_API_KEY= +# SGP_ACCOUNT_ID= +# SGP_CLIENT_BASE_URL= \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/Dockerfile b/examples/tutorials/10_async/10_temporal/130_langgraph/Dockerfile new file mode 100644 index 000000000..8a125ac72 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/Dockerfile @@ -0,0 +1,43 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +COPY 10_async/10_temporal/130_langgraph/pyproject.toml /app/130_langgraph/pyproject.toml +COPY 10_async/10_temporal/130_langgraph/README.md /app/130_langgraph/README.md + +WORKDIR /app/130_langgraph + +COPY 10_async/10_temporal/130_langgraph/project /app/130_langgraph/project +COPY 10_async/10_temporal/130_langgraph/tests /app/130_langgraph/tests +COPY test_utils /app/test_utils + +RUN uv pip install --system .[dev] + +ENV PYTHONPATH=/app + +ENV AGENT_NAME=at130-langgraph + +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] + +# When we deploy the worker, we will replace the CMD with the following +# CMD ["python", "-m", "run_worker"] diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/README.md b/examples/tutorials/10_async/10_temporal/130_langgraph/README.md new file mode 100644 index 000000000..61ccaf66a --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/README.md @@ -0,0 +1,58 @@ +# at130-langgraph — AgentEx Temporal + LangGraph + +A minimal Temporal-backed [LangGraph](https://langchain-ai.github.io/langgraph/) +agent. It uses the official [`temporalio.contrib.langgraph`](https://docs.temporal.io/develop/python/integrations/langgraph) +plugin so each LangGraph node runs as a durable **Temporal activity** (the LLM +`agent` node) or inline in the **workflow** (the `tools` node) — set per node +with `execute_in`. *Temporal is the runtime; LangGraph is the agent framework.* + +> The Temporal LangGraph plugin is currently **experimental**. + +## The graph + +``` +START → agent → (tool calls?) → tools → agent + → (no tool calls?) → END +``` + +- `agent` (`execute_in="activity"`): the LLM call — a retried, observable Temporal activity. +- `tools` (`execute_in="workflow"`): runs the tool calls inline in the workflow. + +The router and tools are `async` so LangGraph awaits them directly (a sync +callable is offloaded via `run_in_executor`, which Temporal workflows forbid). + +## Project structure + +``` +130_langgraph/ +├── project/ +│ ├── acp.py # Thin async ACP server; registers the LangGraphPlugin +│ ├── workflow.py # Runs the graph each turn; keeps multi-turn memory +│ ├── graph.py # LangGraph graph; nodes tagged execute_in activity/workflow +│ └── tools.py # Async tool(s) +└── run_worker.py is project/run_worker.py +``` + +## Running + +```bash +agentex agents run --manifest manifest.yaml +``` + +Open the Temporal UI at http://localhost:8080 to watch the workflow and the +`agent` activity execute. Use `dev.ipynb` to create a task and send messages. + +## Adding tools + +Define an **async** `@tool` in `project/tools.py` and add it to `TOOLS`. The +model is bound with `TOOLS` and the tool node runs them by name. + +For a fuller version with human-in-the-loop approval and graph-introspection +queries, scaffold the `temporal-langgraph` template via `agentex init`. + +## Tests + +- `tests/test_graph_temporal.py` — hermetic ReAct-loop test with a stub model, + plus a live end-to-end run through the real Temporal plugin (skipped unless + `LITELLM_API_KEY` is set). +- `tests/test_agent.py` — live integration against a running agent. diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/dev.ipynb b/examples/tutorials/10_async/10_temporal/130_langgraph/dev.ipynb new file mode 100644 index 000000000..5320daac7 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/dev.ipynb @@ -0,0 +1,126 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "36834357", + "metadata": {}, + "outputs": [], + "source": [ + "from agentex import Agentex\n", + "\n", + "client = Agentex(base_url=\"http://localhost:5003\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d1c309d6", + "metadata": {}, + "outputs": [], + "source": [ + "AGENT_NAME = \"at130-langgraph\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9f6e6ef0", + "metadata": {}, + "outputs": [], + "source": [ + "# (REQUIRED) Create a new task. For Async agents, you must create a task for messages to be associated with.\n", + "import uuid\n", + "\n", + "rpc_response = client.agents.create_task(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"name\": f\"{str(uuid.uuid4())[:8]}-task\",\n", + " \"params\": {}\n", + " }\n", + ")\n", + "\n", + "task = rpc_response.result\n", + "print(task)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b03b0d37", + "metadata": {}, + "outputs": [], + "source": [ + "# Send an event to the agent\n", + "\n", + "# The response is expected to be a list of TaskMessage objects, which is a union of the following types:\n", + "# - TextContent: A message with just text content \n", + "# - DataContent: A message with JSON-serializable data content\n", + "# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool\n", + "# - ToolResponseContent: A message with a tool response, which contains response object from a tool call in its content\n", + "\n", + "# When processing the message/send response, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "rpc_response = client.agents.send_event(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"task_id\": task.id,\n", + " }\n", + ")\n", + "\n", + "event = rpc_response.result\n", + "print(event)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a6927cc0", + "metadata": {}, + "outputs": [], + "source": [ + "# Subscribe to the async task messages produced by the agent\n", + "from agentex.lib.utils.dev_tools import subscribe_to_async_task_messages\n", + "\n", + "task_messages = subscribe_to_async_task_messages(\n", + " client=client,\n", + " task=task, \n", + " only_after_timestamp=event.created_at, \n", + " print_messages=True,\n", + " rich_print=True,\n", + " timeout=5,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4864e354", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/environments.yaml b/examples/tutorials/10_async/10_temporal/130_langgraph/environments.yaml new file mode 100644 index 000000000..d54d8e5ff --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/environments.yaml @@ -0,0 +1,64 @@ +# Agent Environment Configuration +# ------------------------------ +# This file defines environment-specific settings for your agent. +# This DIFFERS from the manifest.yaml file in that it is used to program things that are ONLY per environment. + +# ********** EXAMPLE ********** +# schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +# environments: +# dev: +# auth: +# principal: +# user_id: "1234567890" +# user_name: "John Doe" +# user_email: "john.doe@example.com" +# user_role: "admin" +# user_permissions: "read, write, delete" +# helm_overrides: # This is used to override the global helm values.yaml file in the agentex-agent helm charts +# replicas: 3 +# resources: +# requests: +# cpu: "1000m" +# memory: "2Gi" +# limits: +# cpu: "2000m" +# memory: "4Gi" +# env: +# - name: LOG_LEVEL +# value: "DEBUG" +# - name: ENVIRONMENT +# value: "staging" +# +# kubernetes: +# # OPTIONAL - Otherwise it will be derived from separately. However, this can be used to override the derived +# # namespace and deploy it with in the same namespace that already exists for a separate agent. +# namespace: "team-at130-langgraph" +# ********** END EXAMPLE ********** + +schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +environments: + dev: + auth: + principal: + user_id: # TODO: Fill in + account_id: # TODO: Fill in + helm_overrides: + # This is used to override the global helm values.yaml file in the agentex-agent helm charts + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + temporal-worker: + enabled: true + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/manifest.yaml b/examples/tutorials/10_async/10_temporal/130_langgraph/manifest.yaml new file mode 100644 index 000000000..d1f5960b1 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/manifest.yaml @@ -0,0 +1,128 @@ +# Agent Manifest Configuration +# --------------------------- +# This file defines how your agent should be built and deployed. + +# Build Configuration +# ------------------ +# The build config defines what gets packaged into your agent's Docker image. +# This same configuration is used whether building locally or remotely. +# +# When building: +# 1. All files from include_paths are collected into a build context +# 2. The context is filtered by dockerignore rules +# 3. The Dockerfile uses this context to build your agent's image +# 4. The image is pushed to a registry and used to run your agent +build: + context: + # Build from the tutorials root so shared test_utils are available. + root: ../../../ + include_paths: + - 10_async/10_temporal/130_langgraph + - test_utils + dockerfile: 10_async/10_temporal/130_langgraph/Dockerfile + dockerignore: 10_async/10_temporal/130_langgraph/.dockerignore + + +# Local Development Configuration +# ----------------------------- +# Only used when running the agent locally +local_development: + agent: + port: 8000 # Port where your local ACP server is running + host_address: host.docker.internal # Host address for Docker networking (host.docker.internal for Docker, localhost for direct) + + # File paths for local development (relative to this manifest.yaml) + paths: + # Path to ACP server file + # Examples: + # project/acp.py (standard) + # src/server.py (custom structure) + # ../shared/acp.py (shared across projects) + # /absolute/path/acp.py (absolute path) + acp: project/acp.py + + # Path to temporal worker file + # Examples: + # project/run_worker.py (standard) + # workers/temporal.py (custom structure) + # ../shared/worker.py (shared across projects) + worker: project/run_worker.py + + +# Agent Configuration +# ----------------- +agent: + # Type of agent - either sync or async + acp_type: async + + # Unique name for your agent + # Used for task routing and monitoring + name: at130-langgraph + + # Description of what your agent does + # Helps with documentation and discovery + description: "A Temporal-backed LangGraph agent whose nodes run as Temporal activities" + + # Temporal workflow configuration + # This enables your agent to run as a Temporal workflow for long-running tasks + temporal: + enabled: true + workflows: + # Name of the workflow class + # Must match the @workflow.defn name in your workflow.py + - name: at130-langgraph + + # Queue name for task distribution + # Used by Temporal to route tasks to your agent + # Convention: _task_queue + queue_name: at130_langgraph_queue + + # Optional: Health check port for temporal worker + # Defaults to 80 if not specified + # health_check_port: 80 + + # Optional: Credentials mapping + # Maps Kubernetes secrets to environment variables + # Common credentials include: + credentials: + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + # - env_var_name: LITELLM_API_KEY + # secret_name: litellm-api-key + # secret_key: api-key + + # Optional: Set Environment variables for running your agent locally as well + # as for deployment later on + env: {} + # LITELLM_API_KEY: "" + # OPENAI_BASE_URL: "" + # OPENAI_ORG_ID: "" + + +# Deployment Configuration +# ----------------------- +# Configuration for deploying your agent to Kubernetes clusters +deployment: + # Container image configuration + image: + repository: "" # Update with your container registry + tag: "latest" # Default tag, should be versioned in production + + imagePullSecrets: [] # Update with your image pull secret name + # - name: my-registry-secret + + # Global deployment settings that apply to all clusters + # These can be overridden in cluster-specific environments (environments.yaml) + global: + # Default replica count + replicaCount: 1 + + # Default resource requirements + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/project/__init__.py b/examples/tutorials/10_async/10_temporal/130_langgraph/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/project/acp.py b/examples/tutorials/10_async/10_temporal/130_langgraph/project/acp.py new file mode 100644 index 000000000..c01f8831c --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/project/acp.py @@ -0,0 +1,42 @@ +"""ACP server for the Temporal LangGraph agent. + +This file is intentionally thin. When ``acp_type="async"`` is combined with +``TemporalACPConfig(type="temporal", ...)``, FastACP auto-wires: + + HTTP task/create → @workflow.run on the workflow class + HTTP task/event/send → @workflow.signal(SignalName.RECEIVE_EVENT) + HTTP task/cancel → workflow cancellation via the Temporal client + +so we don't define any handlers here. The agent logic lives in +``project/workflow.py`` (the runtime) and ``project/graph.py`` (the LangGraph +graph whose nodes run as Temporal activities), executed by the Temporal worker +(``project/run_worker.py``), not by this HTTP process. + +The ``LangGraphPlugin`` is registered here too so the Temporal client started +by FastACP shares the same graph registry as the worker. +""" + +from __future__ import annotations + +import os + +from dotenv import load_dotenv + +load_dotenv() + +from temporalio.contrib.langgraph import LangGraphPlugin + +from project.graph import GRAPH_NAME, build_graph +from agentex.lib.types.fastacp import TemporalACPConfig +from agentex.lib.sdk.fastacp.fastacp import FastACP + +acp = FastACP.create( + acp_type="async", + config=TemporalACPConfig( + # When deployed to the cluster, the Temporal address is set automatically. + # Locally we point at the Temporal service from docker compose. + type="temporal", + temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[LangGraphPlugin(graphs={GRAPH_NAME: build_graph()})], + ), +) \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/project/graph.py b/examples/tutorials/10_async/10_temporal/130_langgraph/project/graph.py new file mode 100644 index 000000000..8d8de92d1 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/project/graph.py @@ -0,0 +1,79 @@ +"""LangGraph graph for at130-langgraph — nodes run as Temporal activities. + +The ``temporalio.contrib.langgraph`` plugin runs each node where its +``execute_in`` metadata says: the LLM ``agent`` node as a durable Temporal +**activity**, the ``tools`` node inline in the **workflow**. + + START → agent → (tool calls?) → tools → agent + → (no tool calls?) → END + +The router and tools are ``async`` so LangGraph awaits them directly — a sync +callable would be offloaded via ``run_in_executor``, which Temporal's workflow +event loop does not support. +""" + +from __future__ import annotations + +import os +from typing import Any, Annotated +from datetime import datetime, timedelta + +_litellm_key = os.environ.get("LITELLM_API_KEY") +if _litellm_key: + os.environ.setdefault("OPENAI_API_KEY", _litellm_key) + +from typing_extensions import TypedDict + +from langgraph.graph import END, START, StateGraph +from langchain_openai import ChatOpenAI +from langgraph.prebuilt import ToolNode +from langchain_core.messages import SystemMessage +from langgraph.graph.message import add_messages + +from project.tools import TOOLS + +# Name this graph is registered under in the LangGraphPlugin (acp.py / run_worker.py). +GRAPH_NAME = "at130-langgraph" +MODEL_NAME = "gpt-4o" +SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools. + +Current date and time: {timestamp} + +Be concise and use tools when they help answer the question.""" + + +class AgentState(TypedDict): + messages: Annotated[list[Any], add_messages] + + +async def agent_node(state: AgentState) -> dict[str, Any]: + """The 'agent' node — one LLM call. Runs as a durable Temporal activity.""" + llm = ChatOpenAI(model=MODEL_NAME).bind_tools(TOOLS) + messages = state["messages"] + if not messages or not isinstance(messages[0], SystemMessage): + system = SystemMessage( + content=SYSTEM_PROMPT.format(timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + ) + messages = [system, *messages] + return {"messages": [await llm.ainvoke(messages)]} + + +async def route_after_agent(state: AgentState) -> str: + """Go to the tools node if the model requested tools, else finish (async router).""" + last = state["messages"][-1] + return "tools" if getattr(last, "tool_calls", None) else END + + +def build_graph() -> StateGraph: + """Build the agent graph; the LLM node runs as an activity, tools in the workflow.""" + builder = StateGraph(AgentState) + builder.add_node( + "agent", + agent_node, + metadata={"execute_in": "activity", "start_to_close_timeout": timedelta(minutes=5)}, + ) + builder.add_node("tools", ToolNode(TOOLS), metadata={"execute_in": "workflow"}) + builder.add_edge(START, "agent") + builder.add_conditional_edges("agent", route_after_agent, {"tools": "tools", END: END}) + builder.add_edge("tools", "agent") + return builder diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/project/run_worker.py b/examples/tutorials/10_async/10_temporal/130_langgraph/project/run_worker.py new file mode 100644 index 000000000..7040f560b --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/project/run_worker.py @@ -0,0 +1,50 @@ +"""Temporal worker for at130-langgraph. + +Run as a separate long-lived process alongside the ACP HTTP server. The +worker polls Temporal for workflow + activity tasks and executes them. + +The ``LangGraphPlugin`` is given the graph registry (``{ GRAPH_NAME: graph }``). +At runtime it turns the graph's ``execute_in="activity"`` nodes into Temporal +activities and registers them on the worker automatically — so we don't have +to enumerate node activities by hand. +""" + +import asyncio + +from temporalio.contrib.langgraph import LangGraphPlugin + +from project.graph import GRAPH_NAME, build_graph +from project.workflow import At130LanggraphWorkflow +from agentex.lib.utils.debug import setup_debug_if_enabled +from agentex.lib.utils.logging import make_logger +from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.temporal.activities import get_all_activities +from agentex.lib.core.temporal.workers.worker import AgentexWorker + +environment_variables = EnvironmentVariables.refresh() +logger = make_logger(__name__) + + +async def main(): + setup_debug_if_enabled() + + task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE + if task_queue_name is None: + raise ValueError("WORKFLOW_TASK_QUEUE is not set") + + # AgentexWorker runs workflows with an unsandboxed runner, so importing + # langchain/langgraph inside the workflow + nodes is fine. The LangGraph + # plugin registers the graph's activity-nodes for us. + worker = AgentexWorker( + task_queue=task_queue_name, + plugins=[LangGraphPlugin(graphs={GRAPH_NAME: build_graph()})], + ) + + await worker.run( + activities=get_all_activities(), + workflow=At130LanggraphWorkflow, + ) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/project/tools.py b/examples/tutorials/10_async/10_temporal/130_langgraph/project/tools.py new file mode 100644 index 000000000..20b7185ee --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/project/tools.py @@ -0,0 +1,20 @@ +"""Tools for the LangGraph agent. + +Tools are ``async`` so the in-workflow tool node can await them directly +(a sync tool would be offloaded via ``run_in_executor``, which Temporal's +workflow event loop does not allow). +""" + +from __future__ import annotations + +from langchain_core.tools import tool + + +@tool +async def get_weather(city: str) -> str: + """Get the current weather for a city.""" + # TODO: replace with a real weather API call. + return f"The weather in {city} is sunny and 72°F" + + +TOOLS = [get_weather] diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py b/examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py new file mode 100644 index 000000000..a50670251 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py @@ -0,0 +1,83 @@ +"""Temporal workflow for at130-langgraph — Temporal as the LangGraph runtime. + +Each turn the workflow runs the LangGraph graph (``project/graph.py``) via the +``temporalio.contrib.langgraph`` plugin. The plugin runs the LLM ``agent`` node +as a durable Temporal activity and the ``tools`` node inline in the workflow. + +Multi-turn memory is kept on the workflow instance (``self._messages``) — it's +durable and replay-safe for free, so no checkpoint database is needed. +""" + +from __future__ import annotations + +import json +from typing import Any + +from temporalio import workflow +from temporalio.contrib.langgraph import graph as lg_graph + +from agentex.lib import adk +from project.graph import GRAPH_NAME +from agentex.lib.adk import emit_langgraph_messages +from agentex.protocol.acp import SendEventParams, CreateTaskParams +from agentex.lib.utils.logging import make_logger +from agentex.types.text_content import TextContent +from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.temporal.types.workflow import SignalName +from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow + +environment_variables = EnvironmentVariables.refresh() + +if environment_variables.WORKFLOW_NAME is None: + raise ValueError("Environment variable WORKFLOW_NAME is not set") +if environment_variables.AGENT_NAME is None: + raise ValueError("Environment variable AGENT_NAME is not set") + +logger = make_logger(__name__) + + +@workflow.defn(name=environment_variables.WORKFLOW_NAME) +class At130LanggraphWorkflow(BaseWorkflow): + """Runs the LangGraph agent each turn; its nodes run as Temporal activities.""" + + def __init__(self) -> None: + super().__init__(display_name=environment_variables.AGENT_NAME) + self._complete_task = False + self._messages: list[Any] = [] + self._emitted = 0 + + @workflow.signal(name=SignalName.RECEIVE_EVENT) + async def on_task_event_send(self, params: SendEventParams) -> None: + """Echo the user's message, run the graph, surface the new messages.""" + await adk.messages.create(task_id=params.task.id, content=params.event.content) + self._messages.append({"role": "user", "content": params.event.content.content}) + + compiled = lg_graph(GRAPH_NAME).compile() + result = await compiled.ainvoke({"messages": self._messages}) + self._messages = result["messages"] + + # Surface the messages this turn produced (tool calls, results, final + # text) to the AgentEx UI. The SDK helper does the LangGraph→AgentEx + # message conversion. + await emit_langgraph_messages(self._messages[self._emitted:], params.task.id) + self._emitted = len(self._messages) + + @workflow.signal + async def complete_task_signal(self) -> None: + self._complete_task = True + + @workflow.run + async def on_task_create(self, params: CreateTaskParams) -> str: + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=( + f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n\n" + "Send me a message and I'll respond using a LangGraph agent whose nodes " + "run as durable Temporal activities." + ), + ), + ) + await workflow.wait_condition(lambda: self._complete_task, timeout=None) + return "Task completed" diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/pyproject.toml b/examples/tutorials/10_async/10_temporal/130_langgraph/pyproject.toml new file mode 100644 index 000000000..e22905de4 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/pyproject.toml @@ -0,0 +1,42 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "at130-langgraph" +version = "0.1.0" +description = "A Temporal-backed LangGraph agent whose nodes run as Temporal activities" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + # Temporal with the LangGraph plugin (temporalio.contrib.langgraph), + # which runs LangGraph nodes as Temporal activities. Needs >=1.27.0. + "temporalio[langgraph]>=1.27.0", + "langchain-openai", + "langchain-core", + "grandalf", + "python-dotenv", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", + "black", + "isort", + "flake8", + "debugpy>=1.8.15", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/130_langgraph/tests/test_agent.py new file mode 100644 index 000000000..b798f568f --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/tests/test_agent.py @@ -0,0 +1,127 @@ +"""Integration tests for the Temporal + LangGraph agent (live agent required). + +These drive a *running* agent over the AgentEx API and verify that: +- the agent sends a welcome message on task creation, +- a weather question triggers a tool_request / tool_response round-trip + (proving the LLM node ran as a Temporal activity and the tool node ran), +- the final answer reflects the tool output. + +For fast, network-free coverage of the graph + human-in-the-loop logic, see +``test_graph_temporal.py``. + +To run: +1. Start the agent (worker + ACP server): ``agentex agents run --manifest manifest.yaml`` +2. Set AGENTEX_API_BASE_URL if not using the default +3. ``pytest tests/test_agent.py -v`` +""" + +import os +import uuid + +import pytest +import pytest_asyncio +from test_utils.async_utils import ( + poll_messages, + send_event_and_poll_yielding, +) + +from agentex import AsyncAgentex +from agentex.types.task_message import TaskMessage +from agentex.types.agent_rpc_params import ParamsCreateTaskRequest + +AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") +AGENT_NAME = os.environ.get("AGENT_NAME", "at130-langgraph") + + +@pytest_asyncio.fixture +async def client(): + client = AsyncAgentex(base_url=AGENTEX_API_BASE_URL) + yield client + await client.close() + + +@pytest.fixture +def agent_name(): + return AGENT_NAME + + +@pytest_asyncio.fixture +async def agent_id(client, agent_name): + agents = await client.agents.list() + for agent in agents: + if agent.name == agent_name: + return agent.id + raise ValueError(f"Agent with name {agent_name} not found.") + + +class TestNonStreamingEvents: + """The Temporal-backed LangGraph agent responds and uses tools.""" + + @pytest.mark.asyncio + async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): + """Create a task, ask about weather, verify the tool round-trip.""" + task_response = await client.agents.create_task( + agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex) + ) + task = task_response.result + assert task is not None + + # Wait for the welcome message from on_task_create + task_creation_found = False + async for message in poll_messages( + client=client, task_id=task.id, timeout=30, sleep_interval=1.0 + ): + assert isinstance(message, TaskMessage) + if ( + message.content + and message.content.type == "text" + and message.content.author == "agent" + ): + task_creation_found = True + break + assert task_creation_found, "Task creation welcome message not found" + + # Ask about weather — the agent (LangGraph node, as a Temporal activity) + # should call get_weather. + seen_tool_request = False + seen_tool_response = False + final_message = None + async for message in send_event_and_poll_yielding( + client=client, + agent_id=agent_id, + task_id=task.id, + user_message="What is the weather in San Francisco? Use your tool.", + timeout=60, + sleep_interval=1.0, + ): + assert isinstance(message, TaskMessage) + + if message.content and message.content.type == "tool_request": + seen_tool_request = True + if message.content and message.content.type == "tool_response": + seen_tool_response = True + + if ( + message.content + and message.content.type == "text" + and message.content.author == "agent" + ): + final_message = message + content_length = len(getattr(message.content, "content", "") or "") + if getattr(message, "streaming_status", None) in (None, "DONE") and content_length > 0: + if seen_tool_response: + break + + assert seen_tool_request, "Expected a tool_request (agent calling get_weather)" + assert seen_tool_response, "Expected a tool_response (get_weather result)" + assert final_message is not None, "Expected a final agent text message" + final_text = ( + getattr(final_message.content, "content", None) if final_message.content else None + ) + assert isinstance(final_text, str) and len(final_text) > 0 + # get_weather always returns "72°F" — the response should mention it. + assert "72" in final_text, "Expected weather response to mention 72°F" + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/tests/test_graph_temporal.py b/examples/tutorials/10_async/10_temporal/130_langgraph/tests/test_graph_temporal.py new file mode 100644 index 000000000..485b896f6 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/tests/test_graph_temporal.py @@ -0,0 +1,105 @@ +"""Tests for the Temporal + LangGraph agent's graph. + +Two layers: + +1. ``TestGraphLogic`` — hermetic, no network. Compiles the actual shipped + graph (``project/graph.py``) with a deterministic stub model and runs the + ReAct loop (agent → tools → agent) to completion. + +2. ``TestTemporalPlugin`` — end-to-end through the real Temporal LangGraph + plugin on a local Temporal server, proving the LLM node runs as an activity + and the tool node in the workflow. Needs a real model, so it is skipped + unless ``LITELLM_API_KEY`` (or ``OPENAI_API_KEY``) is set. + +Run from the agent's own (uv) environment: pytest tests/test_graph_temporal.py -v +""" + +from __future__ import annotations + +import os +import uuid + +import pytest + +pytest.importorskip("langgraph") +pytest.importorskip("temporalio.contrib.langgraph") + +import project.graph as graph_module +from temporalio import workflow +from project.graph import GRAPH_NAME, build_graph +from langchain_core.messages import AIMessage, ToolMessage +from temporalio.contrib.langgraph import graph as lg_graph + + +@workflow.defn +class _DriverWorkflow: + """Module-level driver workflow (Temporal forbids local workflow classes).""" + + @workflow.run + async def run(self, message: str) -> str: + compiled = lg_graph(GRAPH_NAME).compile() + result = await compiled.ainvoke({"messages": [{"role": "user", "content": message}]}) + return result["messages"][-1].content + + +class _StubModel: + """Deterministic stand-in for ``ChatOpenAI(...).bind_tools(...)``. + + First call → emit a tool call for ``get_weather``; once a ToolMessage is in + the history → emit a plain text answer. Drives the full ReAct loop offline. + """ + + def bind_tools(self, _tools): + return self + + async def ainvoke(self, messages): + if any(isinstance(m, ToolMessage) for m in messages): + return AIMessage(content="All done — the tool has run.") + return AIMessage( + content="", + tool_calls=[{"id": "call_1", "name": "get_weather", "args": {"city": "Denver"}}], + ) + + +class TestGraphLogic: + """Hermetic test of the ReAct loop, no network.""" + + @pytest.mark.asyncio + async def test_react_loop_runs_tool(self, monkeypatch): + monkeypatch.setattr(graph_module, "ChatOpenAI", lambda *_a, **_k: _StubModel()) + compiled = build_graph().compile() + result = await compiled.ainvoke({"messages": [{"role": "user", "content": "go"}]}) + + tool_outputs = [m.content for m in result["messages"] if isinstance(m, ToolMessage)] + assert any("sunny" in o for o in tool_outputs) + assert "done" in result["messages"][-1].content.lower() + + +@pytest.mark.skipif( + not (os.environ.get("LITELLM_API_KEY") or os.environ.get("OPENAI_API_KEY")), + reason="needs a real model (set LITELLM_API_KEY) for the live Temporal run", +) +class TestTemporalPlugin: + """End-to-end through the real Temporal LangGraph plugin on a local server.""" + + @pytest.mark.asyncio + async def test_nodes_run_as_activities_via_plugin(self): + from temporalio.worker import Worker, UnsandboxedWorkflowRunner + from temporalio.testing import WorkflowEnvironment + from temporalio.contrib.langgraph import LangGraphPlugin + + plugin = LangGraphPlugin(graphs={GRAPH_NAME: build_graph()}) + async with await WorkflowEnvironment.start_local(plugins=[plugin]) as env: + async with Worker( + env.client, + task_queue="tq", + workflows=[_DriverWorkflow], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + out = await env.client.execute_workflow( + _DriverWorkflow.run, + "What's the weather in Denver? Use the get_weather tool.", + id=f"wf-{uuid.uuid4()}", + task_queue="tq", + ) + assert "denver" in out.lower() diff --git a/src/agentex/lib/adk/__init__.py b/src/agentex/lib/adk/__init__.py index cbff5a3fe..a08131260 100644 --- a/src/agentex/lib/adk/__init__.py +++ b/src/agentex/lib/adk/__init__.py @@ -8,6 +8,7 @@ from agentex.lib.adk._modules.checkpointer import create_checkpointer from agentex.lib.adk._modules._langgraph_tracing import create_langgraph_tracing_handler from agentex.lib.adk._modules._langgraph_async import stream_langgraph_events +from agentex.lib.adk._modules._langgraph_messages import emit_langgraph_messages from agentex.lib.adk._modules._langgraph_sync import convert_langgraph_to_agentex_events from agentex.lib.adk._modules._pydantic_ai_async import stream_pydantic_ai_events from agentex.lib.adk._modules._pydantic_ai_sync import convert_pydantic_ai_to_agentex_events @@ -47,6 +48,7 @@ "create_checkpointer", "create_langgraph_tracing_handler", "stream_langgraph_events", + "emit_langgraph_messages", "convert_langgraph_to_agentex_events", # Pydantic AI "stream_pydantic_ai_events", diff --git a/src/agentex/lib/adk/_modules/_langgraph_messages.py b/src/agentex/lib/adk/_modules/_langgraph_messages.py new file mode 100644 index 000000000..c8856755b --- /dev/null +++ b/src/agentex/lib/adk/_modules/_langgraph_messages.py @@ -0,0 +1,85 @@ +"""Emit finished LangGraph messages as Agentex task messages. + +This is the non-streaming counterpart to ``stream_langgraph_events``. Use it +when you run a LangGraph graph with ``ainvoke`` (for example a Temporal-backed +agent using the LangGraph plugin, where streaming deltas aren't available) and +want to surface the resulting messages to the Agentex UI after the fact. + +It maps LangGraph/LangChain message objects to Agentex content types: + +- ``AIMessage`` tool calls → ``ToolRequestContent`` (one per call) +- ``AIMessage`` text content → ``TextContent`` +- ``ToolMessage`` → ``ToolResponseContent`` + +Pass only the messages produced this turn (e.g. ``messages[already_emitted:]``) +so each message is surfaced exactly once across a multi-turn conversation. +""" + +from __future__ import annotations + +from typing import Any + + +async def emit_langgraph_messages(messages: list[Any], task_id: str) -> str: + """Create Agentex messages for a list of LangGraph messages. + + Args: + messages: LangGraph/LangChain message objects to surface — typically + the new messages a turn produced. + task_id: The Agentex task to create messages on. + + Returns: + The last assistant text emitted (useful as a span/turn output), or "". + """ + # Lazy imports so langchain isn't required at module load time. + from langchain_core.messages import AIMessage, ToolMessage + + from agentex.lib import adk + from agentex.types.text_content import TextContent + from agentex.types.tool_request_content import ToolRequestContent + from agentex.types.tool_response_content import ToolResponseContent + + final_text = "" + for message in messages: + if isinstance(message, AIMessage): + for tool_call in message.tool_calls or []: + await adk.messages.create( + task_id=task_id, + content=ToolRequestContent( + author="agent", + tool_call_id=tool_call["id"], + name=tool_call["name"], + arguments=tool_call["args"], + ), + ) + # ``content`` may be a plain string (OpenAI) or a list of content + # blocks (Anthropic/Claude via LangChain, e.g. + # ``[{"type": "text", "text": "..."}]``). Extract and join the text + # so the response is visible regardless of the underlying model. + if isinstance(message.content, str): + text = message.content + else: + text = "".join( + block.get("text", "") if isinstance(block, dict) else str(block) + for block in message.content + if not isinstance(block, dict) or block.get("type") == "text" + ) + if text: + final_text = text + await adk.messages.create( + task_id=task_id, + content=TextContent(author="agent", content=text, format="markdown"), + ) + elif isinstance(message, ToolMessage): + await adk.messages.create( + task_id=task_id, + content=ToolResponseContent( + author="agent", + tool_call_id=message.tool_call_id, + name=message.name or "unknown", + content=message.content + if isinstance(message.content, str) + else str(message.content), + ), + ) + return final_text diff --git a/src/agentex/lib/cli/commands/init.py b/src/agentex/lib/cli/commands/init.py index 69b18e8e7..307a5d0e8 100644 --- a/src/agentex/lib/cli/commands/init.py +++ b/src/agentex/lib/cli/commands/init.py @@ -25,6 +25,7 @@ class TemplateType(str, Enum): TEMPORAL = "temporal" TEMPORAL_OPENAI_AGENTS = "temporal-openai-agents" TEMPORAL_PYDANTIC_AI = "temporal-pydantic-ai" + TEMPORAL_LANGGRAPH = "temporal-langgraph" DEFAULT = "default" DEFAULT_LANGGRAPH = "default-langgraph" DEFAULT_PYDANTIC_AI = "default-pydantic-ai" @@ -64,6 +65,7 @@ def create_project_structure( TemplateType.TEMPORAL: ["acp.py", "workflow.py", "run_worker.py"], TemplateType.TEMPORAL_OPENAI_AGENTS: ["acp.py", "workflow.py", "run_worker.py", "activities.py"], TemplateType.TEMPORAL_PYDANTIC_AI: ["acp.py", "workflow.py", "run_worker.py", "agent.py", "tools.py"], + TemplateType.TEMPORAL_LANGGRAPH: ["acp.py", "workflow.py", "run_worker.py", "graph.py", "tools.py"], TemplateType.DEFAULT: ["acp.py"], TemplateType.DEFAULT_LANGGRAPH: ["acp.py", "graph.py", "tools.py"], TemplateType.DEFAULT_PYDANTIC_AI: ["acp.py", "agent.py", "tools.py"], @@ -195,6 +197,7 @@ def validate_agent_name(text: str) -> bool | str: {"name": "Basic Temporal", "value": TemplateType.TEMPORAL}, {"name": "Temporal + OpenAI Agents SDK (Recommended)", "value": TemplateType.TEMPORAL_OPENAI_AGENTS}, {"name": "Temporal + Pydantic AI", "value": TemplateType.TEMPORAL_PYDANTIC_AI}, + {"name": "Temporal + LangGraph", "value": TemplateType.TEMPORAL_LANGGRAPH}, ], ).ask() if not template_type: diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/.dockerignore.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/.dockerignore.j2 new file mode 100644 index 000000000..c2d7fca4d --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/.dockerignore.j2 @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/.env.example.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/.env.example.j2 new file mode 100644 index 000000000..015f49ef7 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/.env.example.j2 @@ -0,0 +1,13 @@ +# {{ agent_name }} - Environment Variables +# Copy this file to .env and fill in the values + +# API key for your LLM provider +LITELLM_API_KEY= + +# LLM base URL (optional - override to use a different provider) +# OPENAI_BASE_URL= + +# SGP Configuration (optional - for tracing) +# SGP_API_KEY= +# SGP_ACCOUNT_ID= +# SGP_CLIENT_BASE_URL= diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/Dockerfile-uv.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/Dockerfile-uv.j2 new file mode 100644 index 000000000..2a3f1108b --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/Dockerfile-uv.j2 @@ -0,0 +1,55 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/** + +# Install tctl (Temporal CLI) +RUN curl -L https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_arm64.tar.gz -o /tmp/tctl.tar.gz && \ + tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \ + chmod +x /usr/local/bin/tctl && \ + rm /tmp/tctl.tar.gz + +ENV UV_COMPILE_BYTECODE=1 +ENV UV_LINK_MODE=copy +ENV UV_HTTP_TIMEOUT=1000 + +WORKDIR /app/{{ project_path_from_build_root }} + +# Copy dependency files for layer caching +COPY {{ project_path_from_build_root }}/pyproject.toml {{ project_path_from_build_root }}/uv.lock ./ + +# Install dependencies (without project itself, for layer caching) +RUN --mount=type=cache,target=/root/.cache/uv \ + uv sync --locked --no-install-project --no-dev + +# Copy the project code +COPY {{ project_path_from_build_root }}/project ./project + +# Install the project +RUN --mount=type=cache,target=/root/.cache/uv \ + uv sync --locked --no-dev + +ENV PATH="/app/{{ project_path_from_build_root }}/.venv/bin:$PATH" + +# Run the ACP server using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] + +# When we deploy the worker, we will replace the CMD with the following +# CMD ["python", "-m", "run_worker"] \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/Dockerfile.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/Dockerfile.j2 new file mode 100644 index 000000000..ba47485a9 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/Dockerfile.j2 @@ -0,0 +1,48 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# Install tctl (Temporal CLI) +RUN curl -L https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_arm64.tar.gz -o /tmp/tctl.tar.gz && \ + tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \ + chmod +x /usr/local/bin/tctl && \ + rm /tmp/tctl.tar.gz + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy just the requirements file to optimize caching +COPY {{ project_path_from_build_root }}/requirements.txt /app/{{ project_path_from_build_root }}/requirements.txt + +WORKDIR /app/{{ project_path_from_build_root }} + +# Install the required Python packages +RUN uv pip install --system -r requirements.txt + +# Copy the project code +COPY {{ project_path_from_build_root }}/project /app/{{ project_path_from_build_root }}/project + +# Run the ACP server using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] + +# When we deploy the worker, we will replace the CMD with the following +# CMD ["python", "-m", "run_worker"] \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/README.md.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/README.md.j2 new file mode 100644 index 000000000..e8af5a90b --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/README.md.j2 @@ -0,0 +1,121 @@ +# {{ agent_name }} — AgentEx Temporal + LangGraph + +A starter template for building AI agents with AgentEx, [LangGraph](https://langchain-ai.github.io/langgraph/), +and Temporal — where **Temporal is the runtime and LangGraph is the agent framework**. + +It uses the official [`temporalio.contrib.langgraph`](https://docs.temporal.io/develop/python/integrations/langgraph) +plugin: each LangGraph node runs either as a durable **Temporal activity** or +inline in the **workflow**, configured per node with `execute_in`. You get +per-node durability, automatic retries, and full visibility in the Temporal UI +— without LangGraph's own runtime or an external checkpoint database. + +> The Temporal LangGraph plugin is currently **experimental**; its API may change. + +## What's in the box + +- **Nodes as activities** — the LLM (`agent`) node runs as a retried, observable + Temporal activity; the `tools` node runs in the workflow (see below). +- **Human-in-the-loop** — approval-gated tools raise a LangGraph `interrupt`; + the workflow pauses on a Temporal signal (`provide_approval`) until a human + approves or rejects, then resumes. +- **Live introspection via Temporal queries** — `get_status`, + `get_pending_approval`, `get_graph_state`, and `get_graph_mermaid` / + `get_graph_ascii` to render the agent graph while it runs. +- **Multi-turn memory** — the running message list is kept on the workflow + instance, durable for free. +- **Tracing/observability** — a per-turn span shipped to SGP/AgentEx. + +## The agent graph + +``` +START --> agent --> (tool calls?) --> tools --> agent + --> (no tool calls?) --> END +``` + +`project/graph.py` defines this graph. The `agent` node is marked +`execute_in="activity"`; the `tools` node is `execute_in="workflow"`. Query +`get_graph_mermaid` at runtime to see it rendered. + +### Why the tools node runs in the workflow + +The `tools` node runs inline in the workflow (not as an activity) for two +reasons: the `AIMessage` with tool calls stays intact without crossing an +activity boundary, and LangGraph `interrupt` (used for human approval) must run +where the workflow can pause on a Temporal signal. For long-running or heavily +side-effecting tools, move that work into its own `execute_in="activity"` node. +The router and tools are `async` so LangGraph awaits them directly (sync +callables are offloaded via `run_in_executor`, which Temporal workflows forbid). + +## Project structure + +``` +{{ project_name }}/ +├── project/ +│ ├── __init__.py +│ ├── acp.py # Thin async ACP server; registers the LangGraphPlugin +│ ├── workflow.py # Temporal runtime: runs the graph, HIL, queries, memory +│ ├── graph.py # LangGraph graph; nodes tagged execute_in activity/workflow +│ ├── tools.py # Async tool definitions + approval set +│ └── run_worker.py # Temporal worker; registers the LangGraphPlugin +├── Dockerfile +├── manifest.yaml +├── environments.yaml +├── dev.ipynb +{% if use_uv %}└── pyproject.toml{% else %}└── requirements.txt{% endif %} +``` + +## Running the agent + +```bash +{% if use_uv %}agentex uv sync +source .venv/bin/activate{% else %}pip install -r requirements.txt{% endif %} + +# Start the agent (ACP server + Temporal worker) +agentex agents run --manifest manifest.yaml +``` + +The agent starts on port 8000. Open the Temporal UI at http://localhost:8080 to +watch workflows and activities execute. Use `dev.ipynb` to create a task and +send messages. + +## Human-in-the-loop + +Tools listed in `TOOLS_REQUIRING_APPROVAL` (in `project/tools.py`) raise a +LangGraph `interrupt` before they run. The workflow surfaces the pending call +(queryable via `get_pending_approval`) and waits — durably, for as long as it +takes — for a `provide_approval` signal carrying the decision: + +```python +# decision: {"approved": true, "approver": "daniel", "reason": "looks good"} +``` + +If rejected, the rejection is fed back to the model so it can adjust. + +## Adding tools + +1. Define an **async** `@tool` function in `project/tools.py` and add it to `TOOLS`. +2. (Optional) add its name to `TOOLS_REQUIRING_APPROVAL` to gate it behind + human approval. + +The model is bound with `TOOLS` and the tool node looks them up by name, so no +other wiring is needed. + +## Configuration + +Tune the model in `project/graph.py` (`MODEL_NAME`) and the system prompt +(`SYSTEM_PROMPT`). Per-node activity timeouts and retry policies live in the +node `metadata` in `build_graph()`. + +## Environment variables + +Create a `.env` file (see `.env.example`): + +```bash +LITELLM_API_KEY=your-litellm-key # copied to OPENAI_API_KEY automatically +# OPENAI_BASE_URL= # optional: point at a different provider +# SGP_API_KEY= # optional: tracing +# SGP_ACCOUNT_ID= # optional: tracing +# SGP_CLIENT_BASE_URL= # optional: tracing +``` + +Happy building with Temporal + LangGraph! diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/dev.ipynb.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/dev.ipynb.j2 new file mode 100644 index 000000000..d3a68303f --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/dev.ipynb.j2 @@ -0,0 +1,126 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "36834357", + "metadata": {}, + "outputs": [], + "source": [ + "from agentex import Agentex\n", + "\n", + "client = Agentex(base_url=\"http://localhost:5003\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d1c309d6", + "metadata": {}, + "outputs": [], + "source": [ + "AGENT_NAME = \"{{ agent_name }}\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9f6e6ef0", + "metadata": {}, + "outputs": [], + "source": [ + "# (REQUIRED) Create a new task. For Async agents, you must create a task for messages to be associated with.\n", + "import uuid\n", + "\n", + "rpc_response = client.agents.create_task(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"name\": f\"{str(uuid.uuid4())[:8]}-task\",\n", + " \"params\": {}\n", + " }\n", + ")\n", + "\n", + "task = rpc_response.result\n", + "print(task)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b03b0d37", + "metadata": {}, + "outputs": [], + "source": [ + "# Send an event to the agent\n", + "\n", + "# The response is expected to be a list of TaskMessage objects, which is a union of the following types:\n", + "# - TextContent: A message with just text content \n", + "# - DataContent: A message with JSON-serializable data content\n", + "# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool\n", + "# - ToolResponseContent: A message with a tool response, which contains response object from a tool call in its content\n", + "\n", + "# When processing the message/send response, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "rpc_response = client.agents.send_event(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"task_id\": task.id,\n", + " }\n", + ")\n", + "\n", + "event = rpc_response.result\n", + "print(event)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a6927cc0", + "metadata": {}, + "outputs": [], + "source": [ + "# Subscribe to the async task messages produced by the agent\n", + "from agentex.lib.utils.dev_tools import subscribe_to_async_task_messages\n", + "\n", + "task_messages = subscribe_to_async_task_messages(\n", + " client=client,\n", + " task=task, \n", + " only_after_timestamp=event.created_at, \n", + " print_messages=True,\n", + " rich_print=True,\n", + " timeout=5,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4864e354", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/environments.yaml.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/environments.yaml.j2 new file mode 100644 index 000000000..a3df5e228 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/environments.yaml.j2 @@ -0,0 +1,64 @@ +# Agent Environment Configuration +# ------------------------------ +# This file defines environment-specific settings for your agent. +# This DIFFERS from the manifest.yaml file in that it is used to program things that are ONLY per environment. + +# ********** EXAMPLE ********** +# schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +# environments: +# dev: +# auth: +# principal: +# user_id: "1234567890" +# user_name: "John Doe" +# user_email: "john.doe@example.com" +# user_role: "admin" +# user_permissions: "read, write, delete" +# helm_overrides: # This is used to override the global helm values.yaml file in the agentex-agent helm charts +# replicas: 3 +# resources: +# requests: +# cpu: "1000m" +# memory: "2Gi" +# limits: +# cpu: "2000m" +# memory: "4Gi" +# env: +# - name: LOG_LEVEL +# value: "DEBUG" +# - name: ENVIRONMENT +# value: "staging" +# +# kubernetes: +# # OPTIONAL - Otherwise it will be derived from separately. However, this can be used to override the derived +# # namespace and deploy it with in the same namespace that already exists for a separate agent. +# namespace: "team-{{agent_name}}" +# ********** END EXAMPLE ********** + +schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +environments: + dev: + auth: + principal: + user_id: # TODO: Fill in + account_id: # TODO: Fill in + helm_overrides: + # This is used to override the global helm values.yaml file in the agentex-agent helm charts + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + temporal-worker: + enabled: true + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/manifest.yaml.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/manifest.yaml.j2 new file mode 100644 index 000000000..18cffd54a --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/manifest.yaml.j2 @@ -0,0 +1,140 @@ +# Agent Manifest Configuration +# --------------------------- +# This file defines how your agent should be built and deployed. + +# Build Configuration +# ------------------ +# The build config defines what gets packaged into your agent's Docker image. +# This same configuration is used whether building locally or remotely. +# +# When building: +# 1. All files from include_paths are collected into a build context +# 2. The context is filtered by dockerignore rules +# 3. The Dockerfile uses this context to build your agent's image +# 4. The image is pushed to a registry and used to run your agent +build: + context: + # Root directory for the build context + root: ../ # Keep this as the default root + + # Paths to include in the Docker build context + # Must include: + # - Your agent's directory (your custom agent code) + # These paths are collected and sent to the Docker daemon for building + include_paths: + - {{ project_path_from_build_root }} + + # Path to your agent's Dockerfile + # This defines how your agent's image is built from the context + # Relative to the root directory + dockerfile: {{ project_path_from_build_root }}/Dockerfile + + # Path to your agent's .dockerignore + # Filters unnecessary files from the build context + # Helps keep build context small and builds fast + dockerignore: {{ project_path_from_build_root }}/.dockerignore + + +# Local Development Configuration +# ----------------------------- +# Only used when running the agent locally +local_development: + agent: + port: 8000 # Port where your local ACP server is running + host_address: host.docker.internal # Host address for Docker networking (host.docker.internal for Docker, localhost for direct) + + # File paths for local development (relative to this manifest.yaml) + paths: + # Path to ACP server file + # Examples: + # project/acp.py (standard) + # src/server.py (custom structure) + # ../shared/acp.py (shared across projects) + # /absolute/path/acp.py (absolute path) + acp: project/acp.py + + # Path to temporal worker file + # Examples: + # project/run_worker.py (standard) + # workers/temporal.py (custom structure) + # ../shared/worker.py (shared across projects) + worker: project/run_worker.py + + +# Agent Configuration +# ----------------- +agent: + # Type of agent - either sync or async + acp_type: async + + # Unique name for your agent + # Used for task routing and monitoring + name: {{ agent_name }} + + # Description of what your agent does + # Helps with documentation and discovery + description: "{{ description }}" + + # Temporal workflow configuration + # This enables your agent to run as a Temporal workflow for long-running tasks + temporal: + enabled: true + workflows: + # Name of the workflow class + # Must match the @workflow.defn name in your workflow.py + - name: {{ workflow_name }} + + # Queue name for task distribution + # Used by Temporal to route tasks to your agent + # Convention: _task_queue + queue_name: {{ queue_name }} + + # Optional: Health check port for temporal worker + # Defaults to 80 if not specified + # health_check_port: 80 + + # Optional: Credentials mapping + # Maps Kubernetes secrets to environment variables + # Common credentials include: + credentials: + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + # - env_var_name: LITELLM_API_KEY + # secret_name: litellm-api-key + # secret_key: api-key + + # Optional: Set Environment variables for running your agent locally as well + # as for deployment later on + env: {} + # LITELLM_API_KEY: "" + # OPENAI_BASE_URL: "" + # OPENAI_ORG_ID: "" + + +# Deployment Configuration +# ----------------------- +# Configuration for deploying your agent to Kubernetes clusters +deployment: + # Container image configuration + image: + repository: "" # Update with your container registry + tag: "latest" # Default tag, should be versioned in production + + imagePullSecrets: [] # Update with your image pull secret name + # - name: my-registry-secret + + # Global deployment settings that apply to all clusters + # These can be overridden in cluster-specific environments (environments.yaml) + global: + # Default replica count + replicaCount: 1 + + # Default resource requirements + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/project/acp.py.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/project/acp.py.j2 new file mode 100644 index 000000000..c01f8831c --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/project/acp.py.j2 @@ -0,0 +1,42 @@ +"""ACP server for the Temporal LangGraph agent. + +This file is intentionally thin. When ``acp_type="async"`` is combined with +``TemporalACPConfig(type="temporal", ...)``, FastACP auto-wires: + + HTTP task/create → @workflow.run on the workflow class + HTTP task/event/send → @workflow.signal(SignalName.RECEIVE_EVENT) + HTTP task/cancel → workflow cancellation via the Temporal client + +so we don't define any handlers here. The agent logic lives in +``project/workflow.py`` (the runtime) and ``project/graph.py`` (the LangGraph +graph whose nodes run as Temporal activities), executed by the Temporal worker +(``project/run_worker.py``), not by this HTTP process. + +The ``LangGraphPlugin`` is registered here too so the Temporal client started +by FastACP shares the same graph registry as the worker. +""" + +from __future__ import annotations + +import os + +from dotenv import load_dotenv + +load_dotenv() + +from temporalio.contrib.langgraph import LangGraphPlugin + +from project.graph import GRAPH_NAME, build_graph +from agentex.lib.types.fastacp import TemporalACPConfig +from agentex.lib.sdk.fastacp.fastacp import FastACP + +acp = FastACP.create( + acp_type="async", + config=TemporalACPConfig( + # When deployed to the cluster, the Temporal address is set automatically. + # Locally we point at the Temporal service from docker compose. + type="temporal", + temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[LangGraphPlugin(graphs={GRAPH_NAME: build_graph()})], + ), +) \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/project/graph.py.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/project/graph.py.j2 new file mode 100644 index 000000000..feb8051bb --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/project/graph.py.j2 @@ -0,0 +1,165 @@ +"""LangGraph graph for {{ agent_name }} — nodes run as Temporal activities. + +This is the LangGraph half of the integration. The ``temporalio.contrib.langgraph`` +plugin executes this graph's nodes durably: each node's ``execute_in`` metadata +says whether it runs as a Temporal **activity** or inline in the **workflow**. + + START → agent → (tool calls?) → tools → agent + → (no tool calls?) → END + +- ``agent`` (``execute_in="activity"``): the LLM call. Runs as its own durable, + retried Temporal activity — visible in the Temporal UI. +- ``tools`` (``execute_in="workflow"``): executes tool calls and hosts the + human-in-the-loop gate. It runs inline in the workflow because (a) the + ``AIMessage`` with tool calls stays intact without crossing an activity + boundary, and (b) LangGraph ``interrupt`` (used for approvals) needs to run + where the workflow can pause on a Temporal signal. + +Why these shapes: +- The router (``route_after_agent``) and tools are **async** so LangGraph + awaits them directly; sync callables would be offloaded via + ``run_in_executor``, which Temporal's workflow event loop does not support. +- Tool execution as a workflow node keeps things simple for this template. + For long-running or heavily side-effecting tools, move that work into its + own activity (e.g. mark a dedicated tool node ``execute_in="activity"``). +""" + +from __future__ import annotations + +import os +from typing import Any, Annotated +from datetime import datetime, timedelta + +# Copy the LiteLLM proxy key to OPENAI_API_KEY so langchain-openai authenticates +# against the Scale LiteLLM proxy when one is configured. This runs in the +# worker process (where the agent activity executes). +_litellm_key = os.environ.get("LITELLM_API_KEY") +if _litellm_key: + os.environ.setdefault("OPENAI_API_KEY", _litellm_key) + +from typing_extensions import TypedDict + +from langgraph.graph import END, START, StateGraph +from langgraph.types import interrupt +from langchain_openai import ChatOpenAI +from temporalio.common import RetryPolicy +from langchain_core.messages import ToolMessage, SystemMessage +from langgraph.graph.message import add_messages + +from project.tools import TOOLS, TOOLS_BY_NAME, TOOLS_REQUIRING_APPROVAL + +# The name this graph is registered under in the LangGraphPlugin. The workflow +# retrieves it with ``graph(GRAPH_NAME)``; acp.py and run_worker.py register it. +GRAPH_NAME = "{{ agent_name }}" + +# Swap for any LangChain-supported chat model id, e.g. "gpt-4o", "o3-mini". +MODEL_NAME = "gpt-4o" + +SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools. + +Current date and time: {timestamp} + +Guidelines: +- Be concise and helpful +- Use tools when they would help answer the user's question +- If you're unsure, ask clarifying questions +- Always provide accurate information +""" + + +class AgentState(TypedDict): + """State schema for the agent graph.""" + + messages: Annotated[list[Any], add_messages] + + +async def agent_node(state: AgentState) -> dict[str, Any]: + """The 'agent' node — one LLM call. Runs as a durable Temporal activity.""" + llm = ChatOpenAI(model=MODEL_NAME).bind_tools(TOOLS) + messages = state["messages"] + if not messages or not isinstance(messages[0], SystemMessage): + system = SystemMessage( + content=SYSTEM_PROMPT.format(timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + ) + messages = [system, *messages] + return {"messages": [await llm.ainvoke(messages)]} + + +async def tools_node(state: AgentState) -> dict[str, Any]: + """The 'tools' node — executes tool calls, with a human-approval gate. + + Runs inline in the workflow. For tools in ``TOOLS_REQUIRING_APPROVAL`` it + raises a LangGraph ``interrupt`` carrying the pending call; the workflow + pauses on a Temporal signal until a human approves or rejects, then resumes. + """ + last_message = state["messages"][-1] + tool_messages: list[ToolMessage] = [] + + for tool_call in last_message.tool_calls: + name = tool_call["name"] + + tool = TOOLS_BY_NAME.get(name) + if tool is None: + # The model hallucinated a tool that isn't registered — tell it so + # it can recover, rather than crashing the workflow. + tool_messages.append( + ToolMessage(content=f"Error: unknown tool {name!r}", tool_call_id=tool_call["id"], name=name) + ) + continue + + if name in TOOLS_REQUIRING_APPROVAL: + # interrupt() pauses the graph; the workflow resumes it with the + # human's decision via Command(resume=...). Durable: it can wait + # minutes, hours, or days and survive worker restarts. + decision = interrupt( + {"tool_call_id": tool_call["id"], "name": name, "args": tool_call["args"]} + ) + if not decision.get("approved"): + rejection = ( + f"Tool call rejected by {decision.get('approver', 'human')}: " + f"{decision.get('reason', 'no reason given')}" + ) + tool_messages.append( + ToolMessage(content=rejection, tool_call_id=tool_call["id"], name=name) + ) + continue + + result = await tool.ainvoke(tool_call["args"]) + tool_messages.append( + ToolMessage(content=str(result), tool_call_id=tool_call["id"], name=name) + ) + + return {"messages": tool_messages} + + +async def route_after_agent(state: AgentState) -> str: + """Route to the tools node when the model requested tools, else finish. + + Async so LangGraph awaits it directly in the workflow (a sync router would + be offloaded via run_in_executor, unsupported in Temporal workflows). + """ + last_message = state["messages"][-1] + return "tools" if getattr(last_message, "tool_calls", None) else END + + +def build_graph() -> StateGraph: + """Build the agent graph with per-node Temporal execution metadata. + + Registered with the ``LangGraphPlugin`` in acp.py / run_worker.py, and used + by the workflow's visualization queries. + """ + builder = StateGraph(AgentState) + builder.add_node( + "agent", + agent_node, + metadata={ + "execute_in": "activity", + "start_to_close_timeout": timedelta(minutes=5), + "retry_policy": RetryPolicy(maximum_attempts=3), + }, + ) + builder.add_node("tools", tools_node, metadata={"execute_in": "workflow"}) + builder.add_edge(START, "agent") + builder.add_conditional_edges("agent", route_after_agent, {"tools": "tools", END: END}) + builder.add_edge("tools", "agent") + return builder \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/project/run_worker.py.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/project/run_worker.py.j2 new file mode 100644 index 000000000..9dc45a4a0 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/project/run_worker.py.j2 @@ -0,0 +1,50 @@ +"""Temporal worker for {{ agent_name }}. + +Run as a separate long-lived process alongside the ACP HTTP server. The +worker polls Temporal for workflow + activity tasks and executes them. + +The ``LangGraphPlugin`` is given the graph registry (``{ GRAPH_NAME: graph }``). +At runtime it turns the graph's ``execute_in="activity"`` nodes into Temporal +activities and registers them on the worker automatically — so we don't have +to enumerate node activities by hand. +""" + +import asyncio + +from temporalio.contrib.langgraph import LangGraphPlugin + +from project.graph import GRAPH_NAME, build_graph +from project.workflow import {{ workflow_class }} +from agentex.lib.utils.debug import setup_debug_if_enabled +from agentex.lib.utils.logging import make_logger +from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.temporal.activities import get_all_activities +from agentex.lib.core.temporal.workers.worker import AgentexWorker + +environment_variables = EnvironmentVariables.refresh() +logger = make_logger(__name__) + + +async def main(): + setup_debug_if_enabled() + + task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE + if task_queue_name is None: + raise ValueError("WORKFLOW_TASK_QUEUE is not set") + + # AgentexWorker runs workflows with an unsandboxed runner, so importing + # langchain/langgraph inside the workflow + nodes is fine. The LangGraph + # plugin registers the graph's activity-nodes for us. + worker = AgentexWorker( + task_queue=task_queue_name, + plugins=[LangGraphPlugin(graphs={GRAPH_NAME: build_graph()})], + ) + + await worker.run( + activities=get_all_activities(), + workflow={{ workflow_class }}, + ) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/project/tools.py.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/project/tools.py.j2 new file mode 100644 index 000000000..35660ad9b --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/project/tools.py.j2 @@ -0,0 +1,57 @@ +"""Tool definitions for the LangGraph + Temporal agent. + +Each tool is an async LangChain ``@tool``. They're run by the ``tools`` node +(see ``project/graph.py``), which the Temporal LangGraph plugin executes +inside the workflow. Tools must be ``async`` so the in-workflow node awaits +them directly rather than offloading to a thread executor (which Temporal's +workflow event loop does not allow). + +``TOOLS`` is the single source of truth: it's bound to the model (so the LLM +knows the schemas) and looked up by name when the tool node runs. + +``TOOLS_REQUIRING_APPROVAL`` marks tools that pause for human approval before +they run — the tool node raises a LangGraph ``interrupt`` for those, which the +workflow surfaces and resolves via a Temporal signal (human-in-the-loop). +""" + +from __future__ import annotations + +from langchain_core.tools import tool + + +@tool +async def get_weather(city: str) -> str: + """Get the current weather for a city. + + Args: + city: The name of the city to get weather for. + + Returns: + A string describing the weather conditions. + """ + # TODO: Replace with a real weather API call. + return f"The weather in {city} is sunny and 72°F" + + +@tool +async def send_notification(recipient: str, message: str) -> str: + """Send a notification to a recipient. Requires human approval before sending. + + Args: + recipient: Who to notify. + message: The message body to send. + + Returns: + A confirmation string. + """ + # TODO: Replace with a real side-effecting integration (email, Slack, ...). + return f"Notification sent to {recipient}: {message!r}" + + +# All tools available to the agent. Bound to the model and looked up by name +# when the tool node runs. +TOOLS = [get_weather, send_notification] +TOOLS_BY_NAME = {t.name: t for t in TOOLS} + +# Tools in this set pause for a human-approval signal before they run. +TOOLS_REQUIRING_APPROVAL = {"send_notification"} diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/project/workflow.py.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/project/workflow.py.j2 new file mode 100644 index 000000000..d1621fb8c --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/project/workflow.py.j2 @@ -0,0 +1,259 @@ +"""Temporal workflow for {{ agent_name }} — Temporal as the LangGraph runtime. + +*Temporal replaces the runtime; LangGraph is the agent framework.* This +workflow is that runtime. Each turn it runs the LangGraph graph defined in +``project/graph.py`` via the ``temporalio.contrib.langgraph`` plugin, which +executes the graph's nodes as durable Temporal activities (the ``agent``/LLM +node) or inline in the workflow (the ``tools`` node). + +Showcased here: + +- **Nodes as activities** — the plugin runs the LLM node as a retried, + observable Temporal activity (see ``execute_in`` metadata in graph.py). +- **Human-in-the-loop** — when the graph raises a LangGraph ``interrupt`` for + an approval-gated tool, the workflow pauses on a Temporal signal + (``provide_approval``) and resumes with the human's decision. +- **Live introspection via Temporal queries** — status, the pending approval, + and a Mermaid/ASCII rendering of the agent graph, queryable while it runs. +- **Multi-turn memory** — the running message list is kept on the workflow + instance; durable and replay-safe for free, so no checkpoint DB is needed. +- **Tracing** — a per-turn span shipped to SGP/AgentEx. +""" + +from __future__ import annotations + +import os +import json +from typing import Any + +# LangGraph plugin helper: retrieves the graph registered under GRAPH_NAME. +import langgraph.checkpoint.memory +from temporalio import workflow +from langgraph.types import Command +from temporalio.contrib.langgraph import graph as lg_graph + +from agentex.lib import adk +from project.graph import GRAPH_NAME, build_graph +from agentex.lib.adk import emit_langgraph_messages +from agentex.protocol.acp import SendEventParams, CreateTaskParams +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger +from agentex.types.text_content import TextContent +from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.temporal.types.workflow import SignalName +from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config + +# Register the SGP tracing exporter (spans also reach the AgentEx backend via +# the default processor that is lazy-initialised on first span). +SGP_API_KEY = os.environ.get("SGP_API_KEY", "") +SGP_ACCOUNT_ID = os.environ.get("SGP_ACCOUNT_ID", "") +if SGP_API_KEY and SGP_ACCOUNT_ID: + add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=SGP_API_KEY, + sgp_account_id=SGP_ACCOUNT_ID, + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), + ) + ) + +environment_variables = EnvironmentVariables.refresh() + +if environment_variables.WORKFLOW_NAME is None: + raise ValueError("Environment variable WORKFLOW_NAME is not set") +if environment_variables.AGENT_NAME is None: + raise ValueError("Environment variable AGENT_NAME is not set") + +logger = make_logger(__name__) + + +@workflow.defn(name=environment_variables.WORKFLOW_NAME) +class {{ workflow_class }}(BaseWorkflow): + """Durable runtime that runs the LangGraph agent via the Temporal plugin.""" + + def __init__(self) -> None: + super().__init__(display_name=environment_variables.AGENT_NAME) + self._complete_task = False + self._turn_number = 0 + # Running conversation, as LangGraph message objects. Durable: Temporal + # replays the activity results that produced it, so it survives crashes. + self._messages: list[Any] = [] + # How many messages have already been surfaced to the AgentEx UI. + self._emitted = 0 + self._status = "idle" + self._pending_approval: dict[str, Any] | None = None + self._approval_response: dict[str, Any] | None = None + self._viz_graph: Any = None + + # ------------------------------------------------------------------ # + # Signals + # ------------------------------------------------------------------ # + + @workflow.signal(name=SignalName.RECEIVE_EVENT) + async def on_task_event_send(self, params: SendEventParams) -> None: + """Handle a new user message: echo it, then run the agent graph durably.""" + logger.info(f"Received task event for task {params.task.id}") + self._turn_number += 1 + user_text = params.event.content.content + + # Echo the user's message so it shows up as a chat bubble. + await adk.messages.create(task_id=params.task.id, content=params.event.content) + self._messages.append({"role": "user", "content": user_text}) + + async with adk.tracing.span( + trace_id=params.task.id, + task_id=params.task.id, + name=f"Turn {self._turn_number}", + input={"message": user_text}, + ) as span: + final_text = await self._run_graph(params.task.id) + if span: + span.output = {"final_output": final_text} + + @workflow.signal + async def provide_approval(self, response: dict[str, Any]) -> None: + """Provide a human approval decision for a pending tool call. + + Args: + response: ``{"approved": bool, "reason": str, "approver": str}``. + """ + logger.info(f"Received approval response: {response}") + self._approval_response = response + + @workflow.signal + async def complete_task_signal(self) -> None: + """Gracefully end the task/workflow.""" + logger.info("Received complete_task signal") + self._complete_task = True + + # ------------------------------------------------------------------ # + # Agent turn — run the LangGraph graph, pausing for approvals. + # ------------------------------------------------------------------ # + + async def _run_graph(self, task_id: str) -> str: + """Run one turn of the graph, handling any human-approval interrupts.""" + # A fresh in-memory checkpointer per turn: it only needs to persist the + # interrupt/resume state within this turn. Temporal provides durability; + # cross-turn memory lives in self._messages. + compiled = lg_graph(GRAPH_NAME).compile( + checkpointer=langgraph.checkpoint.memory.InMemorySaver() + ) + config = {"configurable": {"thread_id": f"{task_id}-{self._turn_number}"}} + + self._status = "processing" + result = await compiled.ainvoke({"messages": self._messages}, config=config) + + # The graph pauses (interrupt) whenever an approval-gated tool is called. + while result.get("__interrupt__"): + interrupt_value = result["__interrupt__"][0].value + decision = await self._await_human_approval(task_id, interrupt_value) + result = await compiled.ainvoke(Command(resume=decision), config=config) + + self._messages = result["messages"] + # Surface the messages this turn produced (tool calls, results, final + # text) to the AgentEx UI. The SDK helper does the LangGraph→AgentEx + # message conversion and returns the final assistant text. + final_text = await emit_langgraph_messages(self._messages[self._emitted:], task_id) + self._emitted = len(self._messages) + self._status = "completed" + return final_text + + async def _await_human_approval(self, task_id: str, pending: dict[str, Any]) -> dict[str, Any]: + """Pause until a ``provide_approval`` signal arrives, then return the decision.""" + self._pending_approval = pending + self._approval_response = None + self._status = "waiting_for_approval" + + await adk.messages.create( + task_id=task_id, + content=TextContent( + author="agent", + content=( + f"⏸️ Waiting for human approval to run **{pending.get('name')}** " + f"with `{json.dumps(pending.get('args', {}))}`.\n\n" + "Send a `provide_approval` signal, e.g. " + '`{"approved": true, "approver": "you"}`.' + ), + ), + ) + + await workflow.wait_condition(lambda: self._approval_response is not None) + + decision = self._approval_response or {"approved": False} + self._pending_approval = None + self._approval_response = None + self._status = "processing" + return decision + + # ------------------------------------------------------------------ # + # Queries — inspect the running agent live from the Temporal UI/client. + # ------------------------------------------------------------------ # + + @workflow.query + def get_status(self) -> str: + """Current status: idle | processing | waiting_for_approval | completed.""" + return self._status + + @workflow.query + def get_pending_approval(self) -> dict[str, Any] | None: + """The tool call currently awaiting human approval, if any.""" + return self._pending_approval + + @workflow.query + def get_graph_state(self) -> dict[str, Any]: + """A snapshot of the agent loop's progress.""" + return { + "turn_number": self._turn_number, + "message_count": len(self._messages), + "status": self._status, + "pending_approval": self._pending_approval, + "completed": self._complete_task, + } + + @workflow.query + def get_graph_mermaid(self) -> str: + """Mermaid diagram of the agent graph (renders in GitHub/Notion).""" + try: + return self._visualization_graph().get_graph().draw_mermaid() + except Exception as exc: # pragma: no cover - visualization is best-effort + return f"Could not render graph: {exc}" + + @workflow.query + def get_graph_ascii(self) -> str: + """ASCII-art diagram of the agent graph (requires the `grandalf` package).""" + try: + return self._visualization_graph().get_graph().draw_ascii() + except ImportError: + return "ASCII rendering requires the 'grandalf' package. Try get_graph_mermaid instead." + except Exception as exc: # pragma: no cover - visualization is best-effort + return f"Could not render graph: {exc}" + + def _visualization_graph(self): + """Lazily build + cache the compiled graph used purely for rendering.""" + if self._viz_graph is None: + self._viz_graph = build_graph().compile() + return self._viz_graph + + # ------------------------------------------------------------------ # + # Entry point + # ------------------------------------------------------------------ # + + @workflow.run + async def on_task_create(self, params: CreateTaskParams) -> str: + """Keep the conversation alive, handling incoming message/approval signals.""" + logger.info(f"Task created: {params.task.id}") + + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=( + f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n\n" + "Send me a message and I'll respond using a LangGraph agent whose nodes " + "run as durable Temporal activities." + ), + ), + ) + + await workflow.wait_condition(lambda: self._complete_task, timeout=None) + return "Task completed" \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/pyproject.toml.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/pyproject.toml.j2 new file mode 100644 index 000000000..125ce704c --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/pyproject.toml.j2 @@ -0,0 +1,42 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "{{ project_name }}" +version = "0.1.0" +description = "{{ description }}" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + # Temporal with the LangGraph plugin (temporalio.contrib.langgraph), + # which runs LangGraph nodes as Temporal activities. Needs >=1.27.0. + "temporalio[langgraph]>=1.27.0", + "langchain-openai", + "langchain-core", + "grandalf", + "python-dotenv", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", + "black", + "isort", + "flake8", + "debugpy>=1.8.15", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/requirements.txt.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/requirements.txt.j2 new file mode 100644 index 000000000..a499fc17c --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/requirements.txt.j2 @@ -0,0 +1,18 @@ +# Agentex SDK +agentex-sdk + +# Scale GenAI Platform Python SDK +scale-gp + +# Temporal with the LangGraph plugin (temporalio.contrib.langgraph). +# The plugin runs LangGraph nodes as Temporal activities; needs >=1.27.0. +temporalio[langgraph]>=1.27.0 + +# LangChain model + tools +langchain-openai +langchain-core + +# Optional: enables get_graph_ascii() ASCII graph rendering +grandalf + +python-dotenv diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/test_agent.py.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/test_agent.py.j2 new file mode 100644 index 000000000..2d28e44d4 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/test_agent.py.j2 @@ -0,0 +1,147 @@ +""" +Sample tests for AgentEx ACP agent. + +This test suite demonstrates how to test the main AgentEx API functions: +- Non-streaming event sending and polling +- Streaming event sending + +To run these tests: +1. Make sure the agent is running (via docker-compose or `agentex agents run`) +2. Set the AGENTEX_API_BASE_URL environment variable if not using default +3. Run: pytest test_agent.py -v + +Configuration: +- AGENTEX_API_BASE_URL: Base URL for the AgentEx server (default: http://localhost:5003) +- AGENT_NAME: Name of the agent to test (default: {{ agent_name }}) +""" + +import os +import uuid +import asyncio +import pytest +import pytest_asyncio +from agentex import AsyncAgentex +from agentex.types import TaskMessage +from agentex.types.agent_rpc_params import ParamsCreateTaskRequest +from agentex.types.text_content_param import TextContentParam +from test_utils.async_utils import ( + poll_for_agent_response, + send_event_and_poll_yielding, + stream_agent_response, + validate_text_in_response, + poll_messages, +) + + +# Configuration from environment variables +AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") +AGENT_NAME = os.environ.get("AGENT_NAME", "{{ agent_name }}") + + +@pytest_asyncio.fixture +async def client(): + """Create an AsyncAgentex client instance for testing.""" + client = AsyncAgentex(base_url=AGENTEX_API_BASE_URL) + yield client + await client.close() + + +@pytest.fixture +def agent_name(): + """Return the agent name for testing.""" + return AGENT_NAME + + +@pytest_asyncio.fixture +async def agent_id(client, agent_name): + """Retrieve the agent ID based on the agent name.""" + agents = await client.agents.list() + for agent in agents: + if agent.name == agent_name: + return agent.id + raise ValueError(f"Agent with name {agent_name} not found.") + + +class TestNonStreamingEvents: + """Test non-streaming event sending and polling.""" + + @pytest.mark.asyncio + async def test_send_event_and_poll(self, client: AsyncAgentex, agent_name: str, agent_id: str): + """Test sending an event and polling for the response.""" + # TODO: Create a task for this conversation + # task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) + # task = task_response.result + # assert task is not None + + # TODO: Poll for the initial task creation message (if your agent sends one) + # async for message in poll_messages( + # client=client, + # task_id=task.id, + # timeout=30, + # sleep_interval=1.0, + # ): + # assert isinstance(message, TaskMessage) + # if message.content and message.content.type == "text" and message.content.author == "agent": + # # Check for your expected initial message + # assert "expected initial text" in message.content.content + # break + + # TODO: Send an event and poll for response using the yielding helper function + # user_message = "Your test message here" + # async for message in send_event_and_poll_yielding( + # client=client, + # agent_id=agent_id, + # task_id=task.id, + # user_message=user_message, + # timeout=30, + # sleep_interval=1.0, + # ): + # assert isinstance(message, TaskMessage) + # if message.content and message.content.type == "text" and message.content.author == "agent": + # # Check for your expected response + # assert "expected response text" in message.content.content + # break + pass + + +class TestStreamingEvents: + """Test streaming event sending.""" + + @pytest.mark.asyncio + async def test_send_event_and_stream(self, client: AsyncAgentex, agent_name: str, agent_id: str): + """Test sending an event and streaming the response.""" + # TODO: Create a task for this conversation + # task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) + # task = task_response.result + # assert task is not None + + # user_message = "Your test message here" + + # # Collect events from stream + # all_events = [] + + # async def collect_stream_events(): + # async for event in stream_agent_response( + # client=client, + # task_id=task.id, + # timeout=30, + # ): + # all_events.append(event) + + # # Start streaming task + # stream_task = asyncio.create_task(collect_stream_events()) + + # # Send the event + # event_content = TextContentParam(type="text", author="user", content=user_message) + # await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content}) + + # # Wait for streaming to complete + # await stream_task + + # # TODO: Add your validation here + # assert len(all_events) > 0, "No events received in streaming response" + pass + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/lib/cli/test_init_templates.py b/tests/lib/cli/test_init_templates.py new file mode 100644 index 000000000..ec809cbbf --- /dev/null +++ b/tests/lib/cli/test_init_templates.py @@ -0,0 +1,139 @@ +"""Tests for the `agentex init` project templates. + +These render the Jinja templates the way the CLI does and assert that: + +- every template type's declared project files exist and render, +- rendered Python parses (catches `.j2` syntax/templating regressions), +- the agent-specific context (names, workflow class) is substituted in, +- the Temporal + LangGraph template is fully wired (enum, file map, root files). + +The Temporal + LangGraph template is the focus, but the parametrized smoke +test covers every template so a broken `.j2` anywhere is caught early. +""" + +from __future__ import annotations + +import ast +from pathlib import Path + +import pytest + +from agentex.lib.cli.commands.init import ( + TemplateType, + get_project_context, + create_project_structure, +) + + +def _context(template_type: TemplateType, use_uv: bool = True) -> dict: + """Build the same render context the CLI assembles from user answers.""" + answers = { + "template_type": template_type, + "project_path": ".", + "agent_name": "my-agent", + "agent_directory_name": "my-agent", + "description": "An Agentex agent", + "use_uv": use_uv, + } + context = get_project_context(answers, Path("."), Path("../../")) + context["template_type"] = template_type.value + context["use_uv"] = use_uv + return context + + +def _render_project(tmp_path: Path, template_type: TemplateType, use_uv: bool = True) -> Path: + context = _context(template_type, use_uv=use_uv) + create_project_structure(tmp_path, context, template_type, use_uv=use_uv) + return tmp_path / context["project_name"] + + +@pytest.mark.parametrize("template_type", list(TemplateType)) +def test_all_templates_render_to_valid_python(tmp_path: Path, template_type: TemplateType): + """Every template renders, and every rendered .py file is syntactically valid.""" + project_dir = _render_project(tmp_path, template_type) + + py_files = list(project_dir.rglob("*.py")) + assert py_files, f"{template_type.value} produced no Python files" + + for py_file in py_files: + source = py_file.read_text() + # Raises SyntaxError if a rendered template is broken. + ast.parse(source, filename=str(py_file)) + + +class TestTemporalLangGraphTemplate: + """Focused coverage for the new Temporal + LangGraph template.""" + + template_type = TemplateType.TEMPORAL_LANGGRAPH + + def test_enum_and_value(self): + assert TemplateType.TEMPORAL_LANGGRAPH.value == "temporal-langgraph" + + def test_expected_project_files_exist(self, tmp_path: Path): + project_dir = _render_project(tmp_path, self.template_type) + project_pkg = project_dir / "project" + for filename in ( + "acp.py", + "workflow.py", + "run_worker.py", + "graph.py", + "tools.py", + "__init__.py", + ): + assert (project_pkg / filename).is_file(), f"missing project/{filename}" + + def test_expected_root_files_exist(self, tmp_path: Path): + project_dir = _render_project(tmp_path, self.template_type) + for filename in ( + "manifest.yaml", + "README.md", + "environments.yaml", + ".env.example", + ".dockerignore", + "Dockerfile", + "dev.ipynb", + "pyproject.toml", + ): + assert (project_dir / filename).is_file(), f"missing {filename}" + + def test_workflow_class_substituted(self, tmp_path: Path): + project_dir = _render_project(tmp_path, self.template_type) + workflow_src = (project_dir / "project" / "workflow.py").read_text() + # agent_name "my-agent" -> workflow class "MyAgentWorkflow" + assert "class MyAgentWorkflow(BaseWorkflow):" in workflow_src + assert "{{" not in workflow_src, "unrendered Jinja left in workflow.py" + + def test_nodes_run_via_langgraph_plugin(self, tmp_path: Path): + """The defining trait: nodes run as Temporal activities via the plugin.""" + project_dir = _render_project(tmp_path, self.template_type) + graph_src = (project_dir / "project" / "graph.py").read_text() + # The agent (LLM) node is an activity; the tools node runs in-workflow. + assert '"execute_in": "activity"' in graph_src + assert '"execute_in": "workflow"' in graph_src + + # Both the worker and the ACP register the LangGraph plugin. + worker_src = (project_dir / "project" / "run_worker.py").read_text() + acp_src = (project_dir / "project" / "acp.py").read_text() + assert "LangGraphPlugin" in worker_src + assert "LangGraphPlugin" in acp_src + + def test_human_in_the_loop_and_queries_present(self, tmp_path: Path): + project_dir = _render_project(tmp_path, self.template_type) + workflow_src = (project_dir / "project" / "workflow.py").read_text() + graph_src = (project_dir / "project" / "graph.py").read_text() + # HIL: graph raises a langgraph interrupt; workflow resumes via signal + Command. + assert "interrupt(" in graph_src + assert "TOOLS_REQUIRING_APPROVAL" in graph_src + assert "def provide_approval" in workflow_src + assert "Command(resume=" in workflow_src + assert "wait_condition" in workflow_src + # Graph-visualization / introspection queries + for query in ("get_status", "get_graph_mermaid", "get_graph_ascii", "get_graph_state"): + assert query in workflow_src, f"missing query {query}" + + def test_requirements_include_langgraph_plugin_and_temporal(self, tmp_path: Path): + # requirements.txt only renders in the non-uv variant + project_dir = _render_project(tmp_path, self.template_type, use_uv=False) + requirements = (project_dir / "requirements.txt").read_text() + assert "temporalio[langgraph]>=1.27.0" in requirements + assert "langchain-openai" in requirements