diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 2a9f9799..9f6fdffc 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -11,3 +11,6 @@ /*nexus/ @temporalio/nexus /tests/nexus*/ @temporalio/nexus /tests/*nexus/ @temporalio/nexus + +/google_genai_plugin/ @temporalio/sdk @temporalio/ai-sdk +/tests/google_genai_plugin/ @temporalio/sdk @temporalio/ai-sdk diff --git a/README.md b/README.md index 4d7091bc..77d674b9 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [external_storage](external_storage) - Offload large payloads to S3-compatible object storage, plus a codec server for the Web UI and CLI. * [external_storage_redis](external_storage_redis) - Redis driver for external storage * [gevent_async](gevent_async) - Combine gevent and Temporal. +* [google_genai_plugin](google_genai_plugin) - Run the Google Gemini SDK inside durable Temporal workflows (generate_content, tools/AFC, streaming, chat, structured output, MCP, files, interactions, agents, Vertex AI). * [hello_standalone_activity](hello_standalone_activity) - Use activities without using a workflow. * [langchain](langchain) - Orchestrate workflows for LangChain. * [langgraph_plugin](langgraph_plugin) - Run LangGraph workflows as durable Temporal workflows (Graph API and Functional API). diff --git a/google_genai_plugin/README.md b/google_genai_plugin/README.md new file mode 100644 index 00000000..ce250a75 --- /dev/null +++ b/google_genai_plugin/README.md @@ -0,0 +1,82 @@ +# Google GenAI Samples + +These samples demonstrate the [Temporal Google GenAI plugin](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/google_genai), which runs the [Google Gemini SDK](https://googleapis.github.io/python-genai/) inside Temporal Workflows. Workflows construct a `TemporalAsyncClient`, and every Gemini API call — `generate_content`, tool calls, streaming, files, interactions, agents — runs as a Temporal Activity. You get durable execution, Temporal-managed retries and timeouts, and your credentials never enter the workflow or its event history. + +## Samples + +| Sample | Description | +|--------|-------------| +| [hello_world](hello_world) | Minimal `generate_content` call. Start here. | +| [tools](tools) | Automatic function calling: an `activity_as_tool`-wrapped activity and a plain workflow-method tool on one call. | +| [streaming](streaming) | Forward `generate_content_stream` chunks to an external subscriber via `streaming_topic` + `WorkflowStream`. | +| [chat](chat) | Multi-turn conversation with `client.chats`. | +| [structured_output](structured_output) | Typed JSON output via `response_schema` and a Pydantic model. | +| [mcp](mcp) | Give Gemini an MCP server's tools via `TemporalMcpClientSession`. | +| [files](files) | Upload a file with `client.files` and reference it in a call. *(needs a live API key)* | +| [interactions](interactions) | Stateful server-side conversations via `client.interactions`. *(needs a live API key)* | +| [agents](agents) | Managed-agent CRUD via `client.agents`. *(needs a live API key)* | +| [vertex_ai](vertex_ai) | The hello-world flow against Vertex AI (`vertexai=True`). *(needs GCP credentials)* | + +## Prerequisites + +1. Install dependencies: + + ```bash + uv sync --group google-genai + ``` + + > The `google-genai` extra of `temporalio` is shipping in an upcoming release. Until then, install the SDK from the source checkout: + > + > ```bash + > uv pip install -e ../sdk-python --extra google-genai --extra pydantic + > ``` + +2. Configure credentials. Most samples use the Gemini Developer API and read an API key from the environment: + + ```bash + export GOOGLE_API_KEY=... + ``` + + The [vertex_ai](vertex_ai) sample instead uses Vertex AI with Google Cloud Application Default Credentials — see its README. You can authenticate with `gcloud auth application-default login` and set `GOOGLE_CLOUD_PROJECT` (and optionally `GOOGLE_CLOUD_LOCATION`). + +3. Start a [Temporal dev server](https://docs.temporal.io/cli#start-dev-server): + + ```bash + temporal server start-dev + ``` + +## Running a Sample + +Each sample has two scripts. Start the Worker first, then the Workflow starter in a separate terminal: + +```bash +# Terminal 1: start the Worker +uv run google_genai_plugin//run_worker.py + +# Terminal 2: start the Workflow +uv run google_genai_plugin//run_workflow.py +``` + +For example, to run the tools sample: + +```bash +# Terminal 1 +uv run google_genai_plugin/tools/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/tools/run_workflow.py +``` + +## Key Features Demonstrated + +- **Durable API calls** — every Gemini call runs as an activity with configurable timeouts and retries; no credentials enter workflow history. +- **Automatic function calling** — the SDK's AFC loop runs in-workflow; tools can be durable activities (`activity_as_tool`) or plain workflow methods. +- **Streaming** — forward model chunks live to external subscribers via `WorkflowStream`. +- **Structured output** — Pydantic-typed results through the plugin's Pydantic data converter. +- **MCP integration** — register MCP servers on the worker; tool calls dispatched through per-server activities. +- **Full API surface** — chat, the Files API, the Interactions API, managed agents, and Vertex AI. + +## Related + +- [Temporal Google GenAI plugin docs](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/google_genai) +- [Google Gemini SDK (`google-genai`)](https://googleapis.github.io/python-genai/) diff --git a/google_genai_plugin/__init__.py b/google_genai_plugin/__init__.py new file mode 100644 index 00000000..4cd47986 --- /dev/null +++ b/google_genai_plugin/__init__.py @@ -0,0 +1 @@ +"""Temporal Google GenAI plugin samples.""" diff --git a/google_genai_plugin/agents/README.md b/google_genai_plugin/agents/README.md new file mode 100644 index 00000000..dc7dd741 --- /dev/null +++ b/google_genai_plugin/agents/README.md @@ -0,0 +1,35 @@ +# Managed Agents + +Managed agents (`client.agents`) are server-side resources you can create, fetch, +list, and delete. This sample runs the full CRUD cycle, each operation as a +Temporal activity. + +> **Requires a live Gemini API key.** The Agents API talks to a real backend that +> the plugin's test server does not mock, so this sample has no automated test — +> run it against a real `GOOGLE_API_KEY`. + +## What This Sample Demonstrates + +- `client.agents.create(id=..., system_instruction=...)` +- `client.agents.get(id)`, `client.agents.list(page_size=...)`, `client.agents.delete(id)` + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/agents/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/agents/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | `AgentsWorkflow` — create, get, list, delete a managed agent | +| `run_worker.py` | Registers `GoogleGenAIPlugin`, starts the worker | +| `run_workflow.py` | Executes the workflow and prints the result | diff --git a/google_genai_plugin/agents/__init__.py b/google_genai_plugin/agents/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/agents/run_worker.py b/google_genai_plugin/agents/run_worker.py new file mode 100644 index 00000000..b57d18de --- /dev/null +++ b/google_genai_plugin/agents/run_worker.py @@ -0,0 +1,35 @@ +"""Worker for the agents sample.""" + +# @@@SNIPSTART python-google-genai-agents-worker +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.agents.workflow import AgentsWorkflow + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-agents", + workflows=[AgentsWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/agents/run_workflow.py b/google_genai_plugin/agents/run_workflow.py new file mode 100644 index 00000000..c0ce643b --- /dev/null +++ b/google_genai_plugin/agents/run_workflow.py @@ -0,0 +1,27 @@ +"""Start the agents workflow.""" + +# @@@SNIPSTART python-google-genai-agents-run-workflow +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.agents.workflow import AgentsWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + AgentsWorkflow.run, + "samples-demo-agent", + id="google-genai-agents", + task_queue="google-genai-agents", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/agents/workflow.py b/google_genai_plugin/agents/workflow.py new file mode 100644 index 00000000..0210fa95 --- /dev/null +++ b/google_genai_plugin/agents/workflow.py @@ -0,0 +1,35 @@ +"""Managed agents CRUD via client.agents. + +Managed agents are server-side resources you create, fetch, list, and delete. +Each operation runs as a Temporal activity. +""" + +# @@@SNIPSTART python-google-genai-agents-workflow +from typing import Any + +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient + + +@workflow.defn +class AgentsWorkflow: + @workflow.run + async def run(self, agent_id: str) -> dict[str, Any]: + client = TemporalAsyncClient() + + created = await client.agents.create( + id=agent_id, + system_instruction="You are a helpful assistant.", + ) + fetched = await client.agents.get(agent_id) + listing = await client.agents.list(page_size=10) + await client.agents.delete(agent_id) + + return { + "created_id": created.id, + "fetched_id": fetched.id, + "listed_ids": [a.id for a in (listing.agents or [])], + } + + +# @@@SNIPEND diff --git a/google_genai_plugin/chat/README.md b/google_genai_plugin/chat/README.md new file mode 100644 index 00000000..c2099573 --- /dev/null +++ b/google_genai_plugin/chat/README.md @@ -0,0 +1,31 @@ +# Chat + +A multi-turn conversation using `client.chats`. The chat session carries history +across turns, and each `send_message` call runs as a durable Temporal activity. + +## What This Sample Demonstrates + +- Creating a chat session with `client.chats.create(...)` +- Sending multiple turns with `await chat.send_message(...)` +- Conversation state persisting across durable activity calls + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/chat/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/chat/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | `ChatWorkflow` — sends a list of prompts over one chat session | +| `run_worker.py` | Registers `GoogleGenAIPlugin`, starts the worker | +| `run_workflow.py` | Executes the workflow and prints each turn's reply | diff --git a/google_genai_plugin/chat/__init__.py b/google_genai_plugin/chat/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/chat/run_worker.py b/google_genai_plugin/chat/run_worker.py new file mode 100644 index 00000000..60bc1313 --- /dev/null +++ b/google_genai_plugin/chat/run_worker.py @@ -0,0 +1,35 @@ +"""Worker for the chat sample.""" + +# @@@SNIPSTART python-google-genai-chat-worker +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.chat.workflow import ChatWorkflow + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-chat", + workflows=[ChatWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/chat/run_workflow.py b/google_genai_plugin/chat/run_workflow.py new file mode 100644 index 00000000..90904221 --- /dev/null +++ b/google_genai_plugin/chat/run_workflow.py @@ -0,0 +1,31 @@ +"""Start the chat workflow with a multi-turn conversation.""" + +# @@@SNIPSTART python-google-genai-chat-run-workflow +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.chat.workflow import ChatWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + ChatWorkflow.run, + [ + "My favorite color is teal. Remember that.", + "What is my favorite color?", + ], + id="google-genai-chat", + task_queue="google-genai-chat", + ) + + for turn, reply in enumerate(result, start=1): + print(f"Turn {turn}: {reply}") + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/chat/workflow.py b/google_genai_plugin/chat/workflow.py new file mode 100644 index 00000000..666d4c8a --- /dev/null +++ b/google_genai_plugin/chat/workflow.py @@ -0,0 +1,26 @@ +"""Multi-turn chat using client.chats. + +A chat session keeps conversation history across turns. Each ``send_message`` +call runs as a durable Temporal activity, and the SDK threads prior turns into +each request automatically. +""" + +# @@@SNIPSTART python-google-genai-chat-workflow +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient + + +@workflow.defn +class ChatWorkflow: + @workflow.run + async def run(self, prompts: list[str]) -> list[str]: + client = TemporalAsyncClient() + chat = client.chats.create(model="gemini-2.5-flash") + replies: list[str] = [] + for prompt in prompts: + response = await chat.send_message(prompt) + replies.append(response.text or "") + return replies + + +# @@@SNIPEND diff --git a/google_genai_plugin/files/README.md b/google_genai_plugin/files/README.md new file mode 100644 index 00000000..02d8c474 --- /dev/null +++ b/google_genai_plugin/files/README.md @@ -0,0 +1,38 @@ +# Files + +Upload a file with the Gemini Files API, then ask the model about it. +`client.files.upload` runs as a Temporal activity on the worker (the file is +read there), and the returned file handle is referenced in a `generate_content` +call. + +> **Requires a live Gemini API key.** The Files API talks to a real backend that +> the plugin's test server does not mock, so this sample has no automated test — +> run it against a real `GOOGLE_API_KEY`. The file path is resolved on the +> worker, so `sample.txt` must be reachable by the worker process. + +## What This Sample Demonstrates + +- `client.files.upload(file=..., config=UploadFileConfig(...))` as a durable activity +- Referencing the uploaded file handle in `generate_content` `contents` + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/files/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/files/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `sample.txt` | The document uploaded and summarized | +| `workflow.py` | `FilesWorkflow` — uploads a file, then summarizes it | +| `run_worker.py` | Registers `GoogleGenAIPlugin`, starts the worker | +| `run_workflow.py` | Executes the workflow with the sample file path | diff --git a/google_genai_plugin/files/__init__.py b/google_genai_plugin/files/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/files/run_worker.py b/google_genai_plugin/files/run_worker.py new file mode 100644 index 00000000..d425597f --- /dev/null +++ b/google_genai_plugin/files/run_worker.py @@ -0,0 +1,35 @@ +"""Worker for the files sample.""" + +# @@@SNIPSTART python-google-genai-files-worker +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.files.workflow import FilesWorkflow + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-files", + workflows=[FilesWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/files/run_workflow.py b/google_genai_plugin/files/run_workflow.py new file mode 100644 index 00000000..3af2f309 --- /dev/null +++ b/google_genai_plugin/files/run_workflow.py @@ -0,0 +1,34 @@ +"""Start the files workflow. + +The file is read on the worker, so ``sample.txt`` must be on a path the worker +process can access (here it ships alongside this sample). +""" + +# @@@SNIPSTART python-google-genai-files-run-workflow +import asyncio +import os +from pathlib import Path + +from temporalio.client import Client + +from google_genai_plugin.files.workflow import FilesWorkflow + +SAMPLE_FILE = str(Path(__file__).parent / "sample.txt") + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + FilesWorkflow.run, + args=[SAMPLE_FILE, "Summarize this document in one sentence."], + id="google-genai-files", + task_queue="google-genai-files", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/files/sample.txt b/google_genai_plugin/files/sample.txt new file mode 100644 index 00000000..d192614a --- /dev/null +++ b/google_genai_plugin/files/sample.txt @@ -0,0 +1,6 @@ +Temporal is a durable execution platform. Workflows written with the Temporal +SDK run as ordinary code, but their state is persisted by the Temporal service, +so they survive process crashes, machine restarts, and deploys. Activities +encapsulate side effects like network calls; Temporal handles their retries and +timeouts. The Google GenAI plugin builds on this by running every Gemini API +call as a durable activity. diff --git a/google_genai_plugin/files/workflow.py b/google_genai_plugin/files/workflow.py new file mode 100644 index 00000000..098a9111 --- /dev/null +++ b/google_genai_plugin/files/workflow.py @@ -0,0 +1,33 @@ +"""Upload a file with the Files API, then ask Gemini about it. + +``client.files.upload`` runs as an activity on the worker — the file is read +there, not in the workflow — and the returned file handle is then referenced in +a ``generate_content`` call. +""" + +# @@@SNIPSTART python-google-genai-files-workflow +from typing import cast + +from google.genai import types +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient + + +@workflow.defn +class FilesWorkflow: + @workflow.run + async def run(self, file_path: str, prompt: str) -> str: + client = TemporalAsyncClient() + uploaded = await client.files.upload( + file=file_path, + config=types.UploadFileConfig(mime_type="text/plain"), + ) + contents = cast(types.ContentListUnion, [prompt, uploaded]) + response = await client.models.generate_content( + model="gemini-2.5-flash", + contents=contents, + ) + return response.text or "" + + +# @@@SNIPEND diff --git a/google_genai_plugin/hello_world/README.md b/google_genai_plugin/hello_world/README.md new file mode 100644 index 00000000..5c29cd55 --- /dev/null +++ b/google_genai_plugin/hello_world/README.md @@ -0,0 +1,33 @@ +# Hello World + +The simplest Google GenAI + Temporal sample: one `generate_content` call. The +call runs as a Temporal activity, so it gets durable retries, timeouts, and +crash recovery, and the Gemini credentials never enter the workflow. + +## What This Sample Demonstrates + +- Wiring `GoogleGenAIPlugin` onto the worker with a real `genai.Client` +- Constructing a `TemporalAsyncClient` inside a `@workflow.defn` +- Calling `client.models.generate_content(...)` durably + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server (`temporal server start-dev`). See the +[suite README](../README.md) for details. + +```bash +# Terminal 1 +uv run google_genai_plugin/hello_world/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/hello_world/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | `HelloWorldWorkflow` with a single `generate_content` call | +| `run_worker.py` | Registers `GoogleGenAIPlugin`, starts the worker | +| `run_workflow.py` | Executes the workflow and prints the result | diff --git a/google_genai_plugin/hello_world/__init__.py b/google_genai_plugin/hello_world/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/hello_world/run_worker.py b/google_genai_plugin/hello_world/run_worker.py new file mode 100644 index 00000000..3c925d3d --- /dev/null +++ b/google_genai_plugin/hello_world/run_worker.py @@ -0,0 +1,36 @@ +"""Worker for the hello world sample.""" + +# @@@SNIPSTART python-google-genai-hello-world-worker +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.hello_world.workflow import HelloWorldWorkflow + + +async def main() -> None: + # The real genai.Client (with credentials) lives only on the worker. + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-hello-world", + workflows=[HelloWorldWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/hello_world/run_workflow.py b/google_genai_plugin/hello_world/run_workflow.py new file mode 100644 index 00000000..457a2699 --- /dev/null +++ b/google_genai_plugin/hello_world/run_workflow.py @@ -0,0 +1,27 @@ +"""Start the hello world workflow.""" + +# @@@SNIPSTART python-google-genai-hello-world-run-workflow +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.hello_world.workflow import HelloWorldWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + HelloWorldWorkflow.run, + "Write a haiku about durable execution.", + id="google-genai-hello-world", + task_queue="google-genai-hello-world", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/hello_world/workflow.py b/google_genai_plugin/hello_world/workflow.py new file mode 100644 index 00000000..d962e91a --- /dev/null +++ b/google_genai_plugin/hello_world/workflow.py @@ -0,0 +1,25 @@ +"""Minimal Temporal + Google GenAI workflow: one prompt, one response. + +Every Gemini API call made through ``TemporalAsyncClient`` runs as a durable +Temporal activity, so it gets retries, timeouts, and crash recovery for free — +and no credentials ever enter the workflow. +""" + +# @@@SNIPSTART python-google-genai-hello-world-workflow +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient + + +@workflow.defn +class HelloWorldWorkflow: + @workflow.run + async def run(self, prompt: str) -> str: + client = TemporalAsyncClient() + response = await client.models.generate_content( + model="gemini-2.5-flash", + contents=prompt, + ) + return response.text or "" + + +# @@@SNIPEND diff --git a/google_genai_plugin/interactions/README.md b/google_genai_plugin/interactions/README.md new file mode 100644 index 00000000..f3116731 --- /dev/null +++ b/google_genai_plugin/interactions/README.md @@ -0,0 +1,40 @@ +# Interactions + +The Interactions API (`client.interactions`) is a server-managed, stateful +conversation API: state lives on Google's backend, addressed by an interaction +id. This sample creates an interaction, fetches it, and deletes it — each +operation running as a Temporal activity. + +> **Requires a live Gemini API key.** The Interactions API talks to a real +> backend that the plugin's test server does not mock, so this sample has no +> automated test — run it against a real `GOOGLE_API_KEY`. + +Note: unlike `client.models`, the Interactions API has no automatic function +calling. To use tools, declare them as `{"type": "function", ...}` dicts and +drive the tool loop yourself. + +## What This Sample Demonstrates + +- `client.interactions.create(model=..., input=...)` as a durable activity +- `client.interactions.get(id)` and `client.interactions.delete(id)` + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/interactions/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/interactions/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | `InteractionsWorkflow` — create, get, delete an interaction | +| `run_worker.py` | Registers `GoogleGenAIPlugin`, starts the worker | +| `run_workflow.py` | Executes the workflow and prints the result | diff --git a/google_genai_plugin/interactions/__init__.py b/google_genai_plugin/interactions/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/interactions/run_worker.py b/google_genai_plugin/interactions/run_worker.py new file mode 100644 index 00000000..2ce79f97 --- /dev/null +++ b/google_genai_plugin/interactions/run_worker.py @@ -0,0 +1,35 @@ +"""Worker for the interactions sample.""" + +# @@@SNIPSTART python-google-genai-interactions-worker +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.interactions.workflow import InteractionsWorkflow + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-interactions", + workflows=[InteractionsWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/interactions/run_workflow.py b/google_genai_plugin/interactions/run_workflow.py new file mode 100644 index 00000000..63cb6b2d --- /dev/null +++ b/google_genai_plugin/interactions/run_workflow.py @@ -0,0 +1,27 @@ +"""Start the interactions workflow.""" + +# @@@SNIPSTART python-google-genai-interactions-run-workflow +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.interactions.workflow import InteractionsWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + InteractionsWorkflow.run, + "What is durable execution?", + id="google-genai-interactions", + task_queue="google-genai-interactions", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/interactions/workflow.py b/google_genai_plugin/interactions/workflow.py new file mode 100644 index 00000000..3cd8ceab --- /dev/null +++ b/google_genai_plugin/interactions/workflow.py @@ -0,0 +1,32 @@ +"""Stateful server-side conversations via the Interactions API. + +``client.interactions`` is a server-managed API: the conversation state lives on +Google's backend, addressed by an interaction id. Each operation — create, get, +delete — runs as a Temporal activity. Unlike ``client.models``, this API has no +automatic function calling. +""" + +# @@@SNIPSTART python-google-genai-interactions-workflow +from typing import Any + +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient + + +@workflow.defn +class InteractionsWorkflow: + @workflow.run + async def run(self, prompt: str) -> dict[str, Any]: + client = TemporalAsyncClient() + + interaction = await client.interactions.create( + model="gemini-2.5-flash", + input=prompt, + ) + fetched = await client.interactions.get(interaction.id) + await client.interactions.delete(interaction.id) + + return {"id": interaction.id, "status": str(fetched.status)} + + +# @@@SNIPEND diff --git a/google_genai_plugin/mcp/README.md b/google_genai_plugin/mcp/README.md new file mode 100644 index 00000000..0806e93e --- /dev/null +++ b/google_genai_plugin/mcp/README.md @@ -0,0 +1,37 @@ +# MCP + +Give Gemini access to an [MCP](https://modelcontextprotocol.io/) server's tools. +The worker launches the `echo_mcp_server.py` stdio server and registers it with +the plugin under the name `echo`. Inside the workflow, a +`TemporalMcpClientSession("echo")` is passed as a tool — Gemini's AFC loop +discovers and calls its tools, and `list_tools` / `call_tool` run as Temporal +activities against a pooled worker-side connection. + +## What This Sample Demonstrates + +- Registering an MCP server with `GoogleGenAIPlugin(mcp_servers={...})` +- Passing `TemporalMcpClientSession(name)` as a `generate_content` tool +- `cache_tools=True` to discover the tool list once and reuse it (replay-safe) +- MCP tool calls dispatched through per-server Temporal activities + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/mcp/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/mcp/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `echo_mcp_server.py` | A minimal `FastMCP` stdio server exposing an `echo` tool | +| `workflow.py` | `McpWorkflow` — passes `TemporalMcpClientSession("echo")` as a tool | +| `run_worker.py` | Registers the `echo` MCP server with the plugin, starts the worker | +| `run_workflow.py` | Executes the workflow and prints the result | diff --git a/google_genai_plugin/mcp/__init__.py b/google_genai_plugin/mcp/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/mcp/echo_mcp_server.py b/google_genai_plugin/mcp/echo_mcp_server.py new file mode 100644 index 00000000..d75f82cd --- /dev/null +++ b/google_genai_plugin/mcp/echo_mcp_server.py @@ -0,0 +1,15 @@ +"""A minimal stdio MCP server used by the MCP sample.""" + +from mcp.server.fastmcp import FastMCP + +mcp = FastMCP("echo-server") + + +@mcp.tool() +def echo(message: str) -> str: + """Return the input message unchanged.""" + return message + + +if __name__ == "__main__": + mcp.run() diff --git a/google_genai_plugin/mcp/run_worker.py b/google_genai_plugin/mcp/run_worker.py new file mode 100644 index 00000000..30d05bf4 --- /dev/null +++ b/google_genai_plugin/mcp/run_worker.py @@ -0,0 +1,58 @@ +"""Worker for the MCP sample. + +Registers an ``echo`` MCP server (the ``echo_mcp_server.py`` stdio script) with +the plugin. The plugin opens a pooled MCP connection on the worker and runs +``list_tools`` / ``call_tool`` as activities. +""" + +# @@@SNIPSTART python-google-genai-mcp-worker +import asyncio +import os +import sys +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from pathlib import Path + +from google import genai +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.mcp.workflow import McpWorkflow + +ECHO_SERVER = str(Path(__file__).parent / "echo_mcp_server.py") + + +@asynccontextmanager +async def echo_session() -> AsyncIterator[ClientSession]: + """Yield a connected, initialized session to the stdio echo MCP server.""" + params = StdioServerParameters(command=sys.executable, args=[ECHO_SERVER]) + async with stdio_client(params) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + yield session + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client, mcp_servers={"echo": echo_session}) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-mcp", + workflows=[McpWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/mcp/run_workflow.py b/google_genai_plugin/mcp/run_workflow.py new file mode 100644 index 00000000..6c82158a --- /dev/null +++ b/google_genai_plugin/mcp/run_workflow.py @@ -0,0 +1,27 @@ +"""Start the MCP workflow.""" + +# @@@SNIPSTART python-google-genai-mcp-run-workflow +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.mcp.workflow import McpWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + McpWorkflow.run, + "Use the echo tool to echo back the phrase: durable execution.", + id="google-genai-mcp", + task_queue="google-genai-mcp", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/mcp/workflow.py b/google_genai_plugin/mcp/workflow.py new file mode 100644 index 00000000..7cd5e5d2 --- /dev/null +++ b/google_genai_plugin/mcp/workflow.py @@ -0,0 +1,41 @@ +"""Use an MCP server's tools from a Gemini call via TemporalMcpClientSession. + +The worker registers an ``echo`` MCP server with the plugin. Inside the +workflow, ``TemporalMcpClientSession("echo")`` is passed as a tool; Gemini's AFC +loop discovers and calls the MCP tools, with ``list_tools`` / ``call_tool`` +running as Temporal activities against a pooled worker-side connection. +""" + +# @@@SNIPSTART python-google-genai-mcp-workflow +from datetime import timedelta + +from google.genai import types +from temporalio import workflow +from temporalio.contrib.google_genai import ( + TemporalAsyncClient, + TemporalMcpClientSession, +) +from temporalio.workflow import ActivityConfig + + +@workflow.defn +class McpWorkflow: + @workflow.run + async def run(self, prompt: str) -> str: + client = TemporalAsyncClient() + session = TemporalMcpClientSession( + "echo", + cache_tools=True, + activity_config=ActivityConfig( + start_to_close_timeout=timedelta(seconds=30), + ), + ) + response = await client.models.generate_content( + model="gemini-2.5-flash", + contents=prompt, + config=types.GenerateContentConfig(tools=[session]), + ) + return response.text or "" + + +# @@@SNIPEND diff --git a/google_genai_plugin/streaming/README.md b/google_genai_plugin/streaming/README.md new file mode 100644 index 00000000..2e25a1b7 --- /dev/null +++ b/google_genai_plugin/streaming/README.md @@ -0,0 +1,35 @@ +# Streaming + +Forward Gemini model output to an external subscriber in real time. +`TemporalAsyncClient(streaming_topic="gemini")` publishes each +`generate_content_stream` chunk onto a workflow-hosted `WorkflowStream` as it +arrives. A subscriber connects with `WorkflowStreamClient` and reads the topic +live while the workflow runs durably. + +## What This Sample Demonstrates + +- `TemporalAsyncClient(streaming_topic=...)` publishing chunks to a topic +- Hosting a `WorkflowStream` in `@workflow.init` (required for streaming) +- Consuming the stream externally via `WorkflowStreamClient.subscribe(...)` +- Holding the workflow open on a signal so the subscriber can drain the stream + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/streaming/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/streaming/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | `StreamingWorkflow` — streams chunks to the `gemini` topic | +| `run_worker.py` | Registers `GoogleGenAIPlugin`, starts the worker | +| `run_workflow.py` | Starts the workflow and consumes the stream live | diff --git a/google_genai_plugin/streaming/__init__.py b/google_genai_plugin/streaming/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/streaming/run_worker.py b/google_genai_plugin/streaming/run_worker.py new file mode 100644 index 00000000..5d85f86e --- /dev/null +++ b/google_genai_plugin/streaming/run_worker.py @@ -0,0 +1,35 @@ +"""Worker for the streaming sample.""" + +# @@@SNIPSTART python-google-genai-streaming-worker +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.streaming.workflow import StreamingWorkflow + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-streaming", + workflows=[StreamingWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/streaming/run_workflow.py b/google_genai_plugin/streaming/run_workflow.py new file mode 100644 index 00000000..5e8c6704 --- /dev/null +++ b/google_genai_plugin/streaming/run_workflow.py @@ -0,0 +1,49 @@ +"""Start the streaming workflow and consume model chunks live.""" + +# @@@SNIPSTART python-google-genai-streaming-run-workflow +import asyncio +import os +from datetime import timedelta + +from google.genai import types +from temporalio.client import Client +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from google_genai_plugin.streaming.workflow import StreamingWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + workflow_id = "google-genai-streaming" + + handle = await client.start_workflow( + StreamingWorkflow.run, + "Count from 1 to 5, one number per sentence.", + id=workflow_id, + task_queue="google-genai-streaming", + ) + + # Subscribe to the "gemini" topic and print chunks as the model produces them. + stream = WorkflowStreamClient.create(client, workflow_id) + async for item in stream.subscribe( + ["gemini"], + from_offset=0, + result_type=types.GenerateContentResponse, + poll_cooldown=timedelta(milliseconds=50), + ): + chunk: types.GenerateContentResponse = item.data + if chunk.text: + print(chunk.text, end="", flush=True) + if chunk.candidates and chunk.candidates[0].finish_reason: + print() + break + + # Release the workflow now that we've consumed the stream. + await handle.signal(StreamingWorkflow.finish) + result = await handle.result() + print(f"Final result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/streaming/workflow.py b/google_genai_plugin/streaming/workflow.py new file mode 100644 index 00000000..90c766b2 --- /dev/null +++ b/google_genai_plugin/streaming/workflow.py @@ -0,0 +1,41 @@ +"""Stream Gemini output to an external subscriber via WorkflowStream. + +``TemporalAsyncClient(streaming_topic="gemini")`` publishes each +``generate_content_stream`` chunk onto a workflow-hosted ``WorkflowStream`` as +it arrives, so external consumers can watch the model produce text in real time +while the workflow runs durably. The workflow holds itself open on a ``finish`` +signal so a subscriber can reliably read the stream before the run completes. +""" + +# @@@SNIPSTART python-google-genai-streaming-workflow +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient +from temporalio.contrib.workflow_streams import WorkflowStream + + +@workflow.defn +class StreamingWorkflow: + @workflow.init + def __init__(self, prompt: str) -> None: + # Hosting a WorkflowStream is required when streaming_topic is set. + self.stream = WorkflowStream() + self._done = False + + @workflow.run + async def run(self, prompt: str) -> str: + client = TemporalAsyncClient(streaming_topic="gemini") + chunks: list[str] = [] + async for chunk in await client.models.generate_content_stream( + model="gemini-2.5-flash", + contents=prompt, + ): + chunks.append(chunk.text or "") + await workflow.wait_condition(lambda: self._done) + return "".join(chunks) + + @workflow.signal + def finish(self) -> None: + self._done = True + + +# @@@SNIPEND diff --git a/google_genai_plugin/structured_output/README.md b/google_genai_plugin/structured_output/README.md new file mode 100644 index 00000000..db520e68 --- /dev/null +++ b/google_genai_plugin/structured_output/README.md @@ -0,0 +1,32 @@ +# Structured Output + +Get typed JSON back from Gemini by passing a Pydantic model as `response_schema`. +The plugin installs a `PydanticPayloadConverter`, so the model serializes cleanly +through Temporal payloads — the workflow returns a real `Recipe` instance. + +## What This Sample Demonstrates + +- `GenerateContentConfig(response_mime_type="application/json", response_schema=Recipe)` +- Reading the parsed model from `response.parsed` +- Returning a Pydantic model as a workflow result via the plugin's Pydantic converter + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/structured_output/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/structured_output/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | The `Recipe` model and `StructuredOutputWorkflow` | +| `run_worker.py` | Registers `GoogleGenAIPlugin`, starts the worker | +| `run_workflow.py` | Executes the workflow and prints the typed recipe | diff --git a/google_genai_plugin/structured_output/__init__.py b/google_genai_plugin/structured_output/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/structured_output/run_worker.py b/google_genai_plugin/structured_output/run_worker.py new file mode 100644 index 00000000..aa1e15fb --- /dev/null +++ b/google_genai_plugin/structured_output/run_worker.py @@ -0,0 +1,35 @@ +"""Worker for the structured output sample.""" + +# @@@SNIPSTART python-google-genai-structured-output-worker +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.structured_output.workflow import StructuredOutputWorkflow + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-structured-output", + workflows=[StructuredOutputWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/structured_output/run_workflow.py b/google_genai_plugin/structured_output/run_workflow.py new file mode 100644 index 00000000..431ff797 --- /dev/null +++ b/google_genai_plugin/structured_output/run_workflow.py @@ -0,0 +1,31 @@ +"""Start the structured output workflow.""" + +# @@@SNIPSTART python-google-genai-structured-output-run-workflow +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.structured_output.workflow import StructuredOutputWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + recipe = await client.execute_workflow( + StructuredOutputWorkflow.run, + "Give me a simple recipe for avocado toast.", + id="google-genai-structured-output", + task_queue="google-genai-structured-output", + ) + + print(f"Recipe: {recipe.name}") + print(f"Ingredients: {', '.join(recipe.ingredients)}") + print("Steps:") + for i, step in enumerate(recipe.steps, start=1): + print(f" {i}. {step}") + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/structured_output/workflow.py b/google_genai_plugin/structured_output/workflow.py new file mode 100644 index 00000000..5383cdae --- /dev/null +++ b/google_genai_plugin/structured_output/workflow.py @@ -0,0 +1,40 @@ +"""Typed JSON output via response_schema + a Pydantic model. + +The plugin installs a ``PydanticPayloadConverter``, so a Pydantic model flows +through Temporal payloads cleanly. Passing the model as ``response_schema`` +makes Gemini return matching JSON, which the SDK parses into the model on +``response.parsed``. +""" + +# @@@SNIPSTART python-google-genai-structured-output-workflow +from google.genai import types +from pydantic import BaseModel +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient + + +class Recipe(BaseModel): + name: str + ingredients: list[str] + steps: list[str] + + +@workflow.defn +class StructuredOutputWorkflow: + @workflow.run + async def run(self, prompt: str) -> Recipe: + client = TemporalAsyncClient() + response = await client.models.generate_content( + model="gemini-2.5-flash", + contents=prompt, + config=types.GenerateContentConfig( + response_mime_type="application/json", + response_schema=Recipe, + ), + ) + recipe = response.parsed + assert isinstance(recipe, Recipe) + return recipe + + +# @@@SNIPEND diff --git a/google_genai_plugin/tools/README.md b/google_genai_plugin/tools/README.md new file mode 100644 index 00000000..d0c10921 --- /dev/null +++ b/google_genai_plugin/tools/README.md @@ -0,0 +1,39 @@ +# Tools + +Two tool surfaces wired into one Gemini `generate_content` call, both driven by +the SDK's automatic function-calling (AFC) loop: + +| Pattern | When to use it | +|---------|----------------| +| `@activity.defn` wrapped via `activity_as_tool` | Anything with I/O or non-determinism — runs as a durable activity. | +| Plain workflow method | Pure, deterministic logic — runs in-workflow with no activity dispatch. | + +A single prompt exercises both: the model calls `get_weather` (an activity), +then `recommend_activity` (a workflow method). + +## What This Sample Demonstrates + +- `activity_as_tool` carrying per-tool `ActivityConfig` (timeouts, retries) +- Passing a plain workflow method as a tool alongside an activity-backed one +- The AFC loop running inside the workflow, dispatching tool calls durably + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/tools/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/tools/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | `get_weather` activity, `recommend_activity` method, and `ToolsWorkflow` | +| `run_worker.py` | Registers `GoogleGenAIPlugin` + the `get_weather` activity | +| `run_workflow.py` | Executes the workflow and prints the result | diff --git a/google_genai_plugin/tools/__init__.py b/google_genai_plugin/tools/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/tools/run_worker.py b/google_genai_plugin/tools/run_worker.py new file mode 100644 index 00000000..897830a7 --- /dev/null +++ b/google_genai_plugin/tools/run_worker.py @@ -0,0 +1,36 @@ +"""Worker for the tools sample.""" + +# @@@SNIPSTART python-google-genai-tools-worker +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.tools.workflow import ToolsWorkflow, get_weather + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-tools", + workflows=[ToolsWorkflow], + activities=[get_weather], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/tools/run_workflow.py b/google_genai_plugin/tools/run_workflow.py new file mode 100644 index 00000000..4c57a03f --- /dev/null +++ b/google_genai_plugin/tools/run_workflow.py @@ -0,0 +1,27 @@ +"""Start the tools workflow.""" + +# @@@SNIPSTART python-google-genai-tools-run-workflow +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.tools.workflow import ToolsWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + ToolsWorkflow.run, + "What's the weather in Tokyo, and what should I do there?", + id="google-genai-tools", + task_queue="google-genai-tools", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/tools/workflow.py b/google_genai_plugin/tools/workflow.py new file mode 100644 index 00000000..10f4d68f --- /dev/null +++ b/google_genai_plugin/tools/workflow.py @@ -0,0 +1,57 @@ +"""Two tool surfaces on one Gemini call, both driven by automatic function calling. + +1. ``@activity.defn get_weather`` wrapped via ``activity_as_tool`` — runs as a + durable Temporal activity. Use this for I/O or non-deterministic work. +2. ``recommend_activity`` — a plain workflow method passed directly as a tool. + It runs deterministically in-workflow with no activity dispatch. + +Gemini's automatic function-calling (AFC) loop runs inside the workflow and +invokes both as needed. +""" + +# @@@SNIPSTART python-google-genai-tools-workflow +from datetime import timedelta + +from google.genai import types +from temporalio import activity, workflow +from temporalio.contrib.google_genai import TemporalAsyncClient, activity_as_tool +from temporalio.workflow import ActivityConfig + + +@activity.defn +async def get_weather(city: str) -> str: + """Look up the current weather for a city.""" + # Stub — replace with a real HTTP call in production. + return f"It's 72F and sunny in {city}." + + +@workflow.defn +class ToolsWorkflow: + @workflow.run + async def run(self, prompt: str) -> str: + client = TemporalAsyncClient() + response = await client.models.generate_content( + model="gemini-2.5-flash", + contents=prompt, + config=types.GenerateContentConfig( + tools=[ + activity_as_tool( + get_weather, + activity_config=ActivityConfig( + start_to_close_timeout=timedelta(seconds=30), + ), + ), + self.recommend_activity, + ], + ), + ) + return response.text or "" + + async def recommend_activity(self, weather: str) -> str: + """Recommend something to do given a weather description.""" + if "sunny" in weather.lower(): + return "Go for a hike." + return "Visit a museum." + + +# @@@SNIPEND diff --git a/google_genai_plugin/vertex_ai/README.md b/google_genai_plugin/vertex_ai/README.md new file mode 100644 index 00000000..b0ead608 --- /dev/null +++ b/google_genai_plugin/vertex_ai/README.md @@ -0,0 +1,47 @@ +# Vertex AI + +The same hello-world flow as [`hello_world`](../hello_world), but pointed at +**Vertex AI** instead of the Gemini Developer API. The only difference is +configuration: both the worker's `genai.Client` and the workflow's +`TemporalAsyncClient` set `vertexai=True` with a Google Cloud project and +location. The `vertexai` setting must match on both sides. + +> **Requires Google Cloud credentials**, not a Gemini API key. Authenticate with +> Application Default Credentials (`gcloud auth application-default login`) or a +> service-account key (`GOOGLE_APPLICATION_CREDENTIALS`). This sample has no +> automated test. + +## Configuration + +| Variable | Description | +|----------|-------------| +| `GOOGLE_CLOUD_PROJECT` | Your Google Cloud project ID (required) | +| `GOOGLE_CLOUD_LOCATION` | Region, e.g. `us-central1` (defaults to `us-central1`) | + +## What This Sample Demonstrates + +- `genai.Client(vertexai=True, project=..., location=...)` on the worker +- `TemporalAsyncClient(vertexai=True, project=..., location=...)` in the workflow +- Passing project/location as workflow arguments to keep the workflow deterministic + +## Running the Sample + +Prerequisites: install dependencies, configure GCP credentials, set +`GOOGLE_CLOUD_PROJECT`, and start a Temporal dev server. See the +[suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/vertex_ai/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/vertex_ai/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | `VertexAIWorkflow` — generate_content via Vertex AI | +| `run_worker.py` | Registers a Vertex-configured `GoogleGenAIPlugin` | +| `run_workflow.py` | Reads project/location from env and executes the workflow | diff --git a/google_genai_plugin/vertex_ai/__init__.py b/google_genai_plugin/vertex_ai/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/vertex_ai/run_worker.py b/google_genai_plugin/vertex_ai/run_worker.py new file mode 100644 index 00000000..fe7d50a8 --- /dev/null +++ b/google_genai_plugin/vertex_ai/run_worker.py @@ -0,0 +1,44 @@ +"""Worker for the Vertex AI sample. + +Uses ``genai.Client(vertexai=True, ...)`` with Application Default Credentials +(no API key). Run ``gcloud auth application-default login`` first, or set +``GOOGLE_APPLICATION_CREDENTIALS`` to a service-account key file. +""" + +# @@@SNIPSTART python-google-genai-vertex-ai-worker +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.vertex_ai.workflow import VertexAIWorkflow + + +async def main() -> None: + genai_client = genai.Client( + vertexai=True, + project=os.environ["GOOGLE_CLOUD_PROJECT"], + location=os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1"), + ) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-vertex-ai", + workflows=[VertexAIWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/vertex_ai/run_workflow.py b/google_genai_plugin/vertex_ai/run_workflow.py new file mode 100644 index 00000000..3805a6a4 --- /dev/null +++ b/google_genai_plugin/vertex_ai/run_workflow.py @@ -0,0 +1,30 @@ +"""Start the Vertex AI workflow.""" + +# @@@SNIPSTART python-google-genai-vertex-ai-run-workflow +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.vertex_ai.workflow import VertexAIWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + project = os.environ["GOOGLE_CLOUD_PROJECT"] + location = os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1") + + result = await client.execute_workflow( + VertexAIWorkflow.run, + args=["Write a haiku about durable execution.", project, location], + id="google-genai-vertex-ai", + task_queue="google-genai-vertex-ai", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/vertex_ai/workflow.py b/google_genai_plugin/vertex_ai/workflow.py new file mode 100644 index 00000000..33db84aa --- /dev/null +++ b/google_genai_plugin/vertex_ai/workflow.py @@ -0,0 +1,31 @@ +"""Hello world against Vertex AI instead of the Gemini Developer API. + +The only difference from the basic sample is configuration: both the workflow's +``TemporalAsyncClient`` and the worker's ``genai.Client`` use ``vertexai=True`` +with a Google Cloud project and location. The project and location are passed in +as workflow arguments (read from the environment by the starter) to keep the +workflow deterministic. +""" + +# @@@SNIPSTART python-google-genai-vertex-ai-workflow +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient + + +@workflow.defn +class VertexAIWorkflow: + @workflow.run + async def run(self, prompt: str, project: str, location: str) -> str: + client = TemporalAsyncClient( + vertexai=True, + project=project, + location=location, + ) + response = await client.models.generate_content( + model="gemini-2.5-flash", + contents=prompt, + ) + return response.text or "" + + +# @@@SNIPEND diff --git a/pyproject.toml b/pyproject.toml index a148f865..d175a4a3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,10 @@ external-storage = [ ] external-storage-redis = ["redis>=5.0.0,<8"] gevent = ["gevent>=25.4.2 ; python_version >= '3.8'"] +google-genai = [ + "mcp>=1.0.0", + "temporalio[google-genai,pydantic]>=1.28.0", +] langsmith-tracing = [ "openai>=1.4.0", "langsmith>=0.7.0", @@ -106,6 +110,7 @@ packages = [ "external_storage", "external_storage_redis", "gevent_async", + "google_genai_plugin", "hello", "langgraph_plugin", "langsmith_tracing", diff --git a/tests/google_genai_plugin/__init__.py b/tests/google_genai_plugin/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/google_genai_plugin/chat_test.py b/tests/google_genai_plugin/chat_test.py new file mode 100644 index 00000000..d1e61654 --- /dev/null +++ b/tests/google_genai_plugin/chat_test.py @@ -0,0 +1,43 @@ +import uuid + +from temporalio.client import Client +from temporalio.contrib.google_genai.testing import GeminiTestServer, text_response +from temporalio.worker import Worker + +from google_genai_plugin.chat.workflow import ChatWorkflow + + +async def test_chat(client: Client) -> None: + server = GeminiTestServer( + [ + text_response("Got it — your favorite color is teal."), + text_response("Your favorite color is teal."), + ] + ) + + config = client.config() + config["plugins"] = [*config["plugins"], server.plugin()] + client = Client(**config) + + task_queue = f"google-genai-chat-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[ChatWorkflow], + max_cached_workflows=0, + ): + result = await client.execute_workflow( + ChatWorkflow.run, + [ + "My favorite color is teal. Remember that.", + "What is my favorite color?", + ], + id=f"google-genai-chat-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result == [ + "Got it — your favorite color is teal.", + "Your favorite color is teal.", + ] + assert len(server.requests) == 2 diff --git a/tests/google_genai_plugin/hello_world_test.py b/tests/google_genai_plugin/hello_world_test.py new file mode 100644 index 00000000..c7ffda1f --- /dev/null +++ b/tests/google_genai_plugin/hello_world_test.py @@ -0,0 +1,32 @@ +import uuid + +from temporalio.client import Client +from temporalio.contrib.google_genai.testing import GeminiTestServer, text_response +from temporalio.worker import Worker + +from google_genai_plugin.hello_world.workflow import HelloWorldWorkflow + + +async def test_hello_world(client: Client) -> None: + server = GeminiTestServer([text_response("A haiku, for you.")]) + + config = client.config() + config["plugins"] = [*config["plugins"], server.plugin()] + client = Client(**config) + + task_queue = f"google-genai-hello-world-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[HelloWorldWorkflow], + max_cached_workflows=0, + ): + result = await client.execute_workflow( + HelloWorldWorkflow.run, + "Write a haiku.", + id=f"google-genai-hello-world-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result == "A haiku, for you." + assert len(server.requests) == 1 diff --git a/tests/google_genai_plugin/mcp_test.py b/tests/google_genai_plugin/mcp_test.py new file mode 100644 index 00000000..87ff50ac --- /dev/null +++ b/tests/google_genai_plugin/mcp_test.py @@ -0,0 +1,81 @@ +import sys +import uuid +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Any + +from google import genai +from google.genai.types import HttpResponse as SdkHttpResponse +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.contrib.google_genai.testing import ( + function_call_response, + text_response, +) +from temporalio.worker import Worker + +from google_genai_plugin.mcp.workflow import McpWorkflow + +ECHO_SERVER = str( + Path(__file__).parents[2] / "google_genai_plugin" / "mcp" / "echo_mcp_server.py" +) + + +@asynccontextmanager +async def _echo_session() -> AsyncIterator[ClientSession]: + params = StdioServerParameters(command=sys.executable, args=[ECHO_SERVER]) + async with stdio_client(params) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + yield session + + +def _mcp_plugin(responses: list[str]) -> GoogleGenAIPlugin: + """A plugin with scripted model HTTP plus a real echo MCP server. + + Mirrors what ``GeminiTestServer.plugin()`` does for the model HTTP layer, + but also registers an MCP server (which ``GeminiTestServer`` does not), so + the MCP ``list_tools`` / ``call_tool`` activities run for real. + """ + genai_client = genai.Client(api_key="fake-test-key") + index = {"i": 0} + + async def fake_async_request(*_args: Any, **_kwargs: Any) -> SdkHttpResponse: + body = responses[index["i"]] + index["i"] += 1 + return SdkHttpResponse(headers={"content-type": "application/json"}, body=body) + + genai_client._api_client.async_request = fake_async_request # type: ignore[assignment] + return GoogleGenAIPlugin(genai_client, mcp_servers={"echo": _echo_session}) + + +async def test_mcp(client: Client) -> None: + plugin = _mcp_plugin( + [ + function_call_response("echo", {"message": "durable execution"}), + text_response("The echo tool returned: durable execution"), + ] + ) + + config = client.config() + config["plugins"] = [*config["plugins"], plugin] + client = Client(**config) + + task_queue = f"google-genai-mcp-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[McpWorkflow], + max_cached_workflows=0, + ): + result = await client.execute_workflow( + McpWorkflow.run, + "Use the echo tool to echo back the phrase: durable execution.", + id=f"google-genai-mcp-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert "durable execution" in result diff --git a/tests/google_genai_plugin/streaming_test.py b/tests/google_genai_plugin/streaming_test.py new file mode 100644 index 00000000..512d4ca3 --- /dev/null +++ b/tests/google_genai_plugin/streaming_test.py @@ -0,0 +1,53 @@ +import uuid +from datetime import timedelta + +from google.genai import types +from temporalio.client import Client +from temporalio.contrib.google_genai.testing import GeminiTestServer, text_response +from temporalio.contrib.workflow_streams import WorkflowStreamClient +from temporalio.worker import Worker + +from google_genai_plugin.streaming.workflow import StreamingWorkflow + + +async def test_streaming_publishes_to_workflow_stream(client: Client) -> None: + server = GeminiTestServer([text_response("Hello from Gemini stream")]) + + config = client.config() + config["plugins"] = [*config["plugins"], server.plugin()] + client = Client(**config) + + task_queue = f"google-genai-streaming-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[StreamingWorkflow], + max_cached_workflows=0, + ): + wf_id = f"google-genai-streaming-{uuid.uuid4()}" + handle = await client.start_workflow( + StreamingWorkflow.run, + "say hi", + id=wf_id, + task_queue=task_queue, + execution_timeout=timedelta(seconds=15), + ) + + # Consume the published chunk from the "gemini" topic. + stream = WorkflowStreamClient.create(client, wf_id) + received: list[types.GenerateContentResponse] = [] + async for item in stream.subscribe( + ["gemini"], + from_offset=0, + result_type=types.GenerateContentResponse, + poll_cooldown=timedelta(milliseconds=20), + ): + received.append(item.data) + break # one scripted chunk + + await handle.signal(StreamingWorkflow.finish) + result = await handle.result() + + assert result == "Hello from Gemini stream" + assert len(received) == 1 + assert received[0].text == "Hello from Gemini stream" diff --git a/tests/google_genai_plugin/structured_output_test.py b/tests/google_genai_plugin/structured_output_test.py new file mode 100644 index 00000000..ce328bdf --- /dev/null +++ b/tests/google_genai_plugin/structured_output_test.py @@ -0,0 +1,45 @@ +import json +import uuid + +from temporalio.client import Client +from temporalio.contrib.google_genai.testing import GeminiTestServer, text_response +from temporalio.worker import Worker + +from google_genai_plugin.structured_output.workflow import ( + Recipe, + StructuredOutputWorkflow, +) + + +async def test_structured_output(client: Client) -> None: + recipe_json = json.dumps( + { + "name": "Avocado Toast", + "ingredients": ["bread", "avocado", "salt"], + "steps": ["Toast the bread.", "Mash the avocado on top.", "Season."], + } + ) + server = GeminiTestServer([text_response(recipe_json)]) + + config = client.config() + config["plugins"] = [*config["plugins"], server.plugin()] + client = Client(**config) + + task_queue = f"google-genai-structured-output-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[StructuredOutputWorkflow], + max_cached_workflows=0, + ): + result = await client.execute_workflow( + StructuredOutputWorkflow.run, + "Give me a simple recipe for avocado toast.", + id=f"google-genai-structured-output-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert isinstance(result, Recipe) + assert result.name == "Avocado Toast" + assert result.ingredients == ["bread", "avocado", "salt"] + assert len(result.steps) == 3 diff --git a/tests/google_genai_plugin/tools_test.py b/tests/google_genai_plugin/tools_test.py new file mode 100644 index 00000000..0a6d7923 --- /dev/null +++ b/tests/google_genai_plugin/tools_test.py @@ -0,0 +1,46 @@ +import uuid + +from temporalio.client import Client +from temporalio.contrib.google_genai.testing import ( + GeminiTestServer, + function_call_response, + text_response, +) +from temporalio.worker import Worker + +from google_genai_plugin.tools.workflow import ToolsWorkflow, get_weather + + +async def test_tools(client: Client) -> None: + server = GeminiTestServer( + [ + function_call_response("get_weather", {"city": "Tokyo"}), + function_call_response( + "recommend_activity", {"weather": "It's 72F and sunny in Tokyo."} + ), + text_response("It's sunny in Tokyo — go for a hike!"), + ] + ) + + config = client.config() + config["plugins"] = [*config["plugins"], server.plugin()] + client = Client(**config) + + task_queue = f"google-genai-tools-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[ToolsWorkflow], + activities=[get_weather], + max_cached_workflows=0, + ): + result = await client.execute_workflow( + ToolsWorkflow.run, + "What's the weather in Tokyo, and what should I do there?", + id=f"google-genai-tools-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result == "It's sunny in Tokyo — go for a hike!" + # One model turn per response: two tool calls and a final text answer. + assert len(server.requests) == 3