From 16767246de91b45ebc9e23032d372fbae8e618be Mon Sep 17 00:00:00 2001 From: Brian Strauch Date: Thu, 18 Jun 2026 11:44:22 -0700 Subject: [PATCH 1/2] Add Google GenAI plugin samples Add a google_genai_plugin/ sample suite for temporalio.contrib.google_genai, mirroring the strands_plugin/ layout (one feature per sub-directory, each with workflow.py / run_worker.py / run_workflow.py / README.md). Samples cover every major plugin feature: - hello_world: generate_content - tools: automatic function calling (activity_as_tool + plain workflow-method tool) - streaming: generate_content_stream + streaming_topic/WorkflowStream - chat: multi-turn client.chats - structured_output: response_schema + Pydantic - mcp: TemporalMcpClientSession with a local echo MCP server - files: client.files.upload (live API) - interactions: client.interactions stateful API (live API) - agents: client.agents CRUD (live API) - vertex_ai: vertexai=True configuration (GCP credentials) Tests under tests/google_genai_plugin/ use the plugin's GeminiTestServer to run the model-layer samples offline; the mcp test additionally registers a real echo MCP server. files/interactions/agents/vertex_ai are runnable-only (require live credentials) and documented as such. Register the suite in pyproject.toml (google-genai dependency group + wheel package), the root README, and CODEOWNERS. Co-Authored-By: Claude Opus 4.8 (1M context) --- .github/CODEOWNERS | 3 + README.md | 1 + google_genai_plugin/README.md | 82 +++++++++++++++++++ google_genai_plugin/__init__.py | 1 + google_genai_plugin/agents/README.md | 35 ++++++++ google_genai_plugin/agents/__init__.py | 0 google_genai_plugin/agents/run_worker.py | 33 ++++++++ google_genai_plugin/agents/run_workflow.py | 25 ++++++ google_genai_plugin/agents/workflow.py | 31 +++++++ google_genai_plugin/chat/README.md | 31 +++++++ google_genai_plugin/chat/__init__.py | 0 google_genai_plugin/chat/run_worker.py | 33 ++++++++ google_genai_plugin/chat/run_workflow.py | 29 +++++++ google_genai_plugin/chat/workflow.py | 22 +++++ google_genai_plugin/files/README.md | 38 +++++++++ google_genai_plugin/files/__init__.py | 0 google_genai_plugin/files/run_worker.py | 33 ++++++++ google_genai_plugin/files/run_workflow.py | 32 ++++++++ google_genai_plugin/files/sample.txt | 6 ++ google_genai_plugin/files/workflow.py | 29 +++++++ google_genai_plugin/hello_world/README.md | 33 ++++++++ google_genai_plugin/hello_world/__init__.py | 0 google_genai_plugin/hello_world/run_worker.py | 34 ++++++++ .../hello_world/run_workflow.py | 25 ++++++ google_genai_plugin/hello_world/workflow.py | 21 +++++ google_genai_plugin/interactions/README.md | 40 +++++++++ google_genai_plugin/interactions/__init__.py | 0 .../interactions/run_worker.py | 33 ++++++++ .../interactions/run_workflow.py | 25 ++++++ google_genai_plugin/interactions/workflow.py | 28 +++++++ google_genai_plugin/mcp/README.md | 37 +++++++++ google_genai_plugin/mcp/__init__.py | 0 google_genai_plugin/mcp/echo_mcp_server.py | 15 ++++ google_genai_plugin/mcp/run_worker.py | 56 +++++++++++++ google_genai_plugin/mcp/run_workflow.py | 25 ++++++ google_genai_plugin/mcp/workflow.py | 37 +++++++++ google_genai_plugin/streaming/README.md | 35 ++++++++ google_genai_plugin/streaming/__init__.py | 0 google_genai_plugin/streaming/run_worker.py | 33 ++++++++ google_genai_plugin/streaming/run_workflow.py | 47 +++++++++++ google_genai_plugin/streaming/workflow.py | 37 +++++++++ .../structured_output/README.md | 32 ++++++++ .../structured_output/__init__.py | 0 .../structured_output/run_worker.py | 33 ++++++++ .../structured_output/run_workflow.py | 29 +++++++ .../structured_output/workflow.py | 36 ++++++++ google_genai_plugin/tools/README.md | 39 +++++++++ google_genai_plugin/tools/__init__.py | 0 google_genai_plugin/tools/run_worker.py | 34 ++++++++ google_genai_plugin/tools/run_workflow.py | 25 ++++++ google_genai_plugin/tools/workflow.py | 53 ++++++++++++ google_genai_plugin/vertex_ai/README.md | 47 +++++++++++ google_genai_plugin/vertex_ai/__init__.py | 0 google_genai_plugin/vertex_ai/run_worker.py | 42 ++++++++++ google_genai_plugin/vertex_ai/run_workflow.py | 28 +++++++ google_genai_plugin/vertex_ai/workflow.py | 27 ++++++ pyproject.toml | 5 ++ tests/google_genai_plugin/__init__.py | 0 tests/google_genai_plugin/chat_test.py | 43 ++++++++++ tests/google_genai_plugin/hello_world_test.py | 32 ++++++++ tests/google_genai_plugin/mcp_test.py | 81 ++++++++++++++++++ tests/google_genai_plugin/streaming_test.py | 53 ++++++++++++ .../structured_output_test.py | 45 ++++++++++ tests/google_genai_plugin/tools_test.py | 46 +++++++++++ 64 files changed, 1755 insertions(+) create mode 100644 google_genai_plugin/README.md create mode 100644 google_genai_plugin/__init__.py create mode 100644 google_genai_plugin/agents/README.md create mode 100644 google_genai_plugin/agents/__init__.py create mode 100644 google_genai_plugin/agents/run_worker.py create mode 100644 google_genai_plugin/agents/run_workflow.py create mode 100644 google_genai_plugin/agents/workflow.py create mode 100644 google_genai_plugin/chat/README.md create mode 100644 google_genai_plugin/chat/__init__.py create mode 100644 google_genai_plugin/chat/run_worker.py create mode 100644 google_genai_plugin/chat/run_workflow.py create mode 100644 google_genai_plugin/chat/workflow.py create mode 100644 google_genai_plugin/files/README.md create mode 100644 google_genai_plugin/files/__init__.py create mode 100644 google_genai_plugin/files/run_worker.py create mode 100644 google_genai_plugin/files/run_workflow.py create mode 100644 google_genai_plugin/files/sample.txt create mode 100644 google_genai_plugin/files/workflow.py create mode 100644 google_genai_plugin/hello_world/README.md create mode 100644 google_genai_plugin/hello_world/__init__.py create mode 100644 google_genai_plugin/hello_world/run_worker.py create mode 100644 google_genai_plugin/hello_world/run_workflow.py create mode 100644 google_genai_plugin/hello_world/workflow.py create mode 100644 google_genai_plugin/interactions/README.md create mode 100644 google_genai_plugin/interactions/__init__.py create mode 100644 google_genai_plugin/interactions/run_worker.py create mode 100644 google_genai_plugin/interactions/run_workflow.py create mode 100644 google_genai_plugin/interactions/workflow.py create mode 100644 google_genai_plugin/mcp/README.md create mode 100644 google_genai_plugin/mcp/__init__.py create mode 100644 google_genai_plugin/mcp/echo_mcp_server.py create mode 100644 google_genai_plugin/mcp/run_worker.py create mode 100644 google_genai_plugin/mcp/run_workflow.py create mode 100644 google_genai_plugin/mcp/workflow.py create mode 100644 google_genai_plugin/streaming/README.md create mode 100644 google_genai_plugin/streaming/__init__.py create mode 100644 google_genai_plugin/streaming/run_worker.py create mode 100644 google_genai_plugin/streaming/run_workflow.py create mode 100644 google_genai_plugin/streaming/workflow.py create mode 100644 google_genai_plugin/structured_output/README.md create mode 100644 google_genai_plugin/structured_output/__init__.py create mode 100644 google_genai_plugin/structured_output/run_worker.py create mode 100644 google_genai_plugin/structured_output/run_workflow.py create mode 100644 google_genai_plugin/structured_output/workflow.py create mode 100644 google_genai_plugin/tools/README.md create mode 100644 google_genai_plugin/tools/__init__.py create mode 100644 google_genai_plugin/tools/run_worker.py create mode 100644 google_genai_plugin/tools/run_workflow.py create mode 100644 google_genai_plugin/tools/workflow.py create mode 100644 google_genai_plugin/vertex_ai/README.md create mode 100644 google_genai_plugin/vertex_ai/__init__.py create mode 100644 google_genai_plugin/vertex_ai/run_worker.py create mode 100644 google_genai_plugin/vertex_ai/run_workflow.py create mode 100644 google_genai_plugin/vertex_ai/workflow.py create mode 100644 tests/google_genai_plugin/__init__.py create mode 100644 tests/google_genai_plugin/chat_test.py create mode 100644 tests/google_genai_plugin/hello_world_test.py create mode 100644 tests/google_genai_plugin/mcp_test.py create mode 100644 tests/google_genai_plugin/streaming_test.py create mode 100644 tests/google_genai_plugin/structured_output_test.py create mode 100644 tests/google_genai_plugin/tools_test.py diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 2a9f9799..9f6fdffc 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -11,3 +11,6 @@ /*nexus/ @temporalio/nexus /tests/nexus*/ @temporalio/nexus /tests/*nexus/ @temporalio/nexus + +/google_genai_plugin/ @temporalio/sdk @temporalio/ai-sdk +/tests/google_genai_plugin/ @temporalio/sdk @temporalio/ai-sdk diff --git a/README.md b/README.md index 4d7091bc..77d674b9 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [external_storage](external_storage) - Offload large payloads to S3-compatible object storage, plus a codec server for the Web UI and CLI. * [external_storage_redis](external_storage_redis) - Redis driver for external storage * [gevent_async](gevent_async) - Combine gevent and Temporal. +* [google_genai_plugin](google_genai_plugin) - Run the Google Gemini SDK inside durable Temporal workflows (generate_content, tools/AFC, streaming, chat, structured output, MCP, files, interactions, agents, Vertex AI). * [hello_standalone_activity](hello_standalone_activity) - Use activities without using a workflow. * [langchain](langchain) - Orchestrate workflows for LangChain. * [langgraph_plugin](langgraph_plugin) - Run LangGraph workflows as durable Temporal workflows (Graph API and Functional API). diff --git a/google_genai_plugin/README.md b/google_genai_plugin/README.md new file mode 100644 index 00000000..ce250a75 --- /dev/null +++ b/google_genai_plugin/README.md @@ -0,0 +1,82 @@ +# Google GenAI Samples + +These samples demonstrate the [Temporal Google GenAI plugin](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/google_genai), which runs the [Google Gemini SDK](https://googleapis.github.io/python-genai/) inside Temporal Workflows. Workflows construct a `TemporalAsyncClient`, and every Gemini API call — `generate_content`, tool calls, streaming, files, interactions, agents — runs as a Temporal Activity. You get durable execution, Temporal-managed retries and timeouts, and your credentials never enter the workflow or its event history. + +## Samples + +| Sample | Description | +|--------|-------------| +| [hello_world](hello_world) | Minimal `generate_content` call. Start here. | +| [tools](tools) | Automatic function calling: an `activity_as_tool`-wrapped activity and a plain workflow-method tool on one call. | +| [streaming](streaming) | Forward `generate_content_stream` chunks to an external subscriber via `streaming_topic` + `WorkflowStream`. | +| [chat](chat) | Multi-turn conversation with `client.chats`. | +| [structured_output](structured_output) | Typed JSON output via `response_schema` and a Pydantic model. | +| [mcp](mcp) | Give Gemini an MCP server's tools via `TemporalMcpClientSession`. | +| [files](files) | Upload a file with `client.files` and reference it in a call. *(needs a live API key)* | +| [interactions](interactions) | Stateful server-side conversations via `client.interactions`. *(needs a live API key)* | +| [agents](agents) | Managed-agent CRUD via `client.agents`. *(needs a live API key)* | +| [vertex_ai](vertex_ai) | The hello-world flow against Vertex AI (`vertexai=True`). *(needs GCP credentials)* | + +## Prerequisites + +1. Install dependencies: + + ```bash + uv sync --group google-genai + ``` + + > The `google-genai` extra of `temporalio` is shipping in an upcoming release. Until then, install the SDK from the source checkout: + > + > ```bash + > uv pip install -e ../sdk-python --extra google-genai --extra pydantic + > ``` + +2. Configure credentials. Most samples use the Gemini Developer API and read an API key from the environment: + + ```bash + export GOOGLE_API_KEY=... + ``` + + The [vertex_ai](vertex_ai) sample instead uses Vertex AI with Google Cloud Application Default Credentials — see its README. You can authenticate with `gcloud auth application-default login` and set `GOOGLE_CLOUD_PROJECT` (and optionally `GOOGLE_CLOUD_LOCATION`). + +3. Start a [Temporal dev server](https://docs.temporal.io/cli#start-dev-server): + + ```bash + temporal server start-dev + ``` + +## Running a Sample + +Each sample has two scripts. Start the Worker first, then the Workflow starter in a separate terminal: + +```bash +# Terminal 1: start the Worker +uv run google_genai_plugin//run_worker.py + +# Terminal 2: start the Workflow +uv run google_genai_plugin//run_workflow.py +``` + +For example, to run the tools sample: + +```bash +# Terminal 1 +uv run google_genai_plugin/tools/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/tools/run_workflow.py +``` + +## Key Features Demonstrated + +- **Durable API calls** — every Gemini call runs as an activity with configurable timeouts and retries; no credentials enter workflow history. +- **Automatic function calling** — the SDK's AFC loop runs in-workflow; tools can be durable activities (`activity_as_tool`) or plain workflow methods. +- **Streaming** — forward model chunks live to external subscribers via `WorkflowStream`. +- **Structured output** — Pydantic-typed results through the plugin's Pydantic data converter. +- **MCP integration** — register MCP servers on the worker; tool calls dispatched through per-server activities. +- **Full API surface** — chat, the Files API, the Interactions API, managed agents, and Vertex AI. + +## Related + +- [Temporal Google GenAI plugin docs](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/google_genai) +- [Google Gemini SDK (`google-genai`)](https://googleapis.github.io/python-genai/) diff --git a/google_genai_plugin/__init__.py b/google_genai_plugin/__init__.py new file mode 100644 index 00000000..4cd47986 --- /dev/null +++ b/google_genai_plugin/__init__.py @@ -0,0 +1 @@ +"""Temporal Google GenAI plugin samples.""" diff --git a/google_genai_plugin/agents/README.md b/google_genai_plugin/agents/README.md new file mode 100644 index 00000000..dc7dd741 --- /dev/null +++ b/google_genai_plugin/agents/README.md @@ -0,0 +1,35 @@ +# Managed Agents + +Managed agents (`client.agents`) are server-side resources you can create, fetch, +list, and delete. This sample runs the full CRUD cycle, each operation as a +Temporal activity. + +> **Requires a live Gemini API key.** The Agents API talks to a real backend that +> the plugin's test server does not mock, so this sample has no automated test — +> run it against a real `GOOGLE_API_KEY`. + +## What This Sample Demonstrates + +- `client.agents.create(id=..., system_instruction=...)` +- `client.agents.get(id)`, `client.agents.list(page_size=...)`, `client.agents.delete(id)` + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/agents/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/agents/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | `AgentsWorkflow` — create, get, list, delete a managed agent | +| `run_worker.py` | Registers `GoogleGenAIPlugin`, starts the worker | +| `run_workflow.py` | Executes the workflow and prints the result | diff --git a/google_genai_plugin/agents/__init__.py b/google_genai_plugin/agents/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/agents/run_worker.py b/google_genai_plugin/agents/run_worker.py new file mode 100644 index 00000000..51d5ccad --- /dev/null +++ b/google_genai_plugin/agents/run_worker.py @@ -0,0 +1,33 @@ +"""Worker for the agents sample.""" + +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.agents.workflow import AgentsWorkflow + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-agents", + workflows=[AgentsWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/agents/run_workflow.py b/google_genai_plugin/agents/run_workflow.py new file mode 100644 index 00000000..a1d73e31 --- /dev/null +++ b/google_genai_plugin/agents/run_workflow.py @@ -0,0 +1,25 @@ +"""Start the agents workflow.""" + +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.agents.workflow import AgentsWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + AgentsWorkflow.run, + "samples-demo-agent", + id="google-genai-agents", + task_queue="google-genai-agents", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/agents/workflow.py b/google_genai_plugin/agents/workflow.py new file mode 100644 index 00000000..eaada308 --- /dev/null +++ b/google_genai_plugin/agents/workflow.py @@ -0,0 +1,31 @@ +"""Managed agents CRUD via client.agents. + +Managed agents are server-side resources you create, fetch, list, and delete. +Each operation runs as a Temporal activity. +""" + +from typing import Any + +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient + + +@workflow.defn +class AgentsWorkflow: + @workflow.run + async def run(self, agent_id: str) -> dict[str, Any]: + client = TemporalAsyncClient() + + created = await client.agents.create( + id=agent_id, + system_instruction="You are a helpful assistant.", + ) + fetched = await client.agents.get(agent_id) + listing = await client.agents.list(page_size=10) + await client.agents.delete(agent_id) + + return { + "created_id": created.id, + "fetched_id": fetched.id, + "listed_ids": [a.id for a in (listing.agents or [])], + } diff --git a/google_genai_plugin/chat/README.md b/google_genai_plugin/chat/README.md new file mode 100644 index 00000000..c2099573 --- /dev/null +++ b/google_genai_plugin/chat/README.md @@ -0,0 +1,31 @@ +# Chat + +A multi-turn conversation using `client.chats`. The chat session carries history +across turns, and each `send_message` call runs as a durable Temporal activity. + +## What This Sample Demonstrates + +- Creating a chat session with `client.chats.create(...)` +- Sending multiple turns with `await chat.send_message(...)` +- Conversation state persisting across durable activity calls + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/chat/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/chat/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | `ChatWorkflow` — sends a list of prompts over one chat session | +| `run_worker.py` | Registers `GoogleGenAIPlugin`, starts the worker | +| `run_workflow.py` | Executes the workflow and prints each turn's reply | diff --git a/google_genai_plugin/chat/__init__.py b/google_genai_plugin/chat/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/chat/run_worker.py b/google_genai_plugin/chat/run_worker.py new file mode 100644 index 00000000..42728c07 --- /dev/null +++ b/google_genai_plugin/chat/run_worker.py @@ -0,0 +1,33 @@ +"""Worker for the chat sample.""" + +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.chat.workflow import ChatWorkflow + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-chat", + workflows=[ChatWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/chat/run_workflow.py b/google_genai_plugin/chat/run_workflow.py new file mode 100644 index 00000000..0563552a --- /dev/null +++ b/google_genai_plugin/chat/run_workflow.py @@ -0,0 +1,29 @@ +"""Start the chat workflow with a multi-turn conversation.""" + +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.chat.workflow import ChatWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + ChatWorkflow.run, + [ + "My favorite color is teal. Remember that.", + "What is my favorite color?", + ], + id="google-genai-chat", + task_queue="google-genai-chat", + ) + + for turn, reply in enumerate(result, start=1): + print(f"Turn {turn}: {reply}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/chat/workflow.py b/google_genai_plugin/chat/workflow.py new file mode 100644 index 00000000..4531b96d --- /dev/null +++ b/google_genai_plugin/chat/workflow.py @@ -0,0 +1,22 @@ +"""Multi-turn chat using client.chats. + +A chat session keeps conversation history across turns. Each ``send_message`` +call runs as a durable Temporal activity, and the SDK threads prior turns into +each request automatically. +""" + +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient + + +@workflow.defn +class ChatWorkflow: + @workflow.run + async def run(self, prompts: list[str]) -> list[str]: + client = TemporalAsyncClient() + chat = client.chats.create(model="gemini-2.5-flash") + replies: list[str] = [] + for prompt in prompts: + response = await chat.send_message(prompt) + replies.append(response.text or "") + return replies diff --git a/google_genai_plugin/files/README.md b/google_genai_plugin/files/README.md new file mode 100644 index 00000000..02d8c474 --- /dev/null +++ b/google_genai_plugin/files/README.md @@ -0,0 +1,38 @@ +# Files + +Upload a file with the Gemini Files API, then ask the model about it. +`client.files.upload` runs as a Temporal activity on the worker (the file is +read there), and the returned file handle is referenced in a `generate_content` +call. + +> **Requires a live Gemini API key.** The Files API talks to a real backend that +> the plugin's test server does not mock, so this sample has no automated test — +> run it against a real `GOOGLE_API_KEY`. The file path is resolved on the +> worker, so `sample.txt` must be reachable by the worker process. + +## What This Sample Demonstrates + +- `client.files.upload(file=..., config=UploadFileConfig(...))` as a durable activity +- Referencing the uploaded file handle in `generate_content` `contents` + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/files/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/files/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `sample.txt` | The document uploaded and summarized | +| `workflow.py` | `FilesWorkflow` — uploads a file, then summarizes it | +| `run_worker.py` | Registers `GoogleGenAIPlugin`, starts the worker | +| `run_workflow.py` | Executes the workflow with the sample file path | diff --git a/google_genai_plugin/files/__init__.py b/google_genai_plugin/files/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/files/run_worker.py b/google_genai_plugin/files/run_worker.py new file mode 100644 index 00000000..9ac089ad --- /dev/null +++ b/google_genai_plugin/files/run_worker.py @@ -0,0 +1,33 @@ +"""Worker for the files sample.""" + +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.files.workflow import FilesWorkflow + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-files", + workflows=[FilesWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/files/run_workflow.py b/google_genai_plugin/files/run_workflow.py new file mode 100644 index 00000000..8a8ef689 --- /dev/null +++ b/google_genai_plugin/files/run_workflow.py @@ -0,0 +1,32 @@ +"""Start the files workflow. + +The file is read on the worker, so ``sample.txt`` must be on a path the worker +process can access (here it ships alongside this sample). +""" + +import asyncio +import os +from pathlib import Path + +from temporalio.client import Client + +from google_genai_plugin.files.workflow import FilesWorkflow + +SAMPLE_FILE = str(Path(__file__).parent / "sample.txt") + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + FilesWorkflow.run, + args=[SAMPLE_FILE, "Summarize this document in one sentence."], + id="google-genai-files", + task_queue="google-genai-files", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/files/sample.txt b/google_genai_plugin/files/sample.txt new file mode 100644 index 00000000..d192614a --- /dev/null +++ b/google_genai_plugin/files/sample.txt @@ -0,0 +1,6 @@ +Temporal is a durable execution platform. Workflows written with the Temporal +SDK run as ordinary code, but their state is persisted by the Temporal service, +so they survive process crashes, machine restarts, and deploys. Activities +encapsulate side effects like network calls; Temporal handles their retries and +timeouts. The Google GenAI plugin builds on this by running every Gemini API +call as a durable activity. diff --git a/google_genai_plugin/files/workflow.py b/google_genai_plugin/files/workflow.py new file mode 100644 index 00000000..86c07884 --- /dev/null +++ b/google_genai_plugin/files/workflow.py @@ -0,0 +1,29 @@ +"""Upload a file with the Files API, then ask Gemini about it. + +``client.files.upload`` runs as an activity on the worker — the file is read +there, not in the workflow — and the returned file handle is then referenced in +a ``generate_content`` call. +""" + +from typing import cast + +from google.genai import types +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient + + +@workflow.defn +class FilesWorkflow: + @workflow.run + async def run(self, file_path: str, prompt: str) -> str: + client = TemporalAsyncClient() + uploaded = await client.files.upload( + file=file_path, + config=types.UploadFileConfig(mime_type="text/plain"), + ) + contents = cast(types.ContentListUnion, [prompt, uploaded]) + response = await client.models.generate_content( + model="gemini-2.5-flash", + contents=contents, + ) + return response.text or "" diff --git a/google_genai_plugin/hello_world/README.md b/google_genai_plugin/hello_world/README.md new file mode 100644 index 00000000..5c29cd55 --- /dev/null +++ b/google_genai_plugin/hello_world/README.md @@ -0,0 +1,33 @@ +# Hello World + +The simplest Google GenAI + Temporal sample: one `generate_content` call. The +call runs as a Temporal activity, so it gets durable retries, timeouts, and +crash recovery, and the Gemini credentials never enter the workflow. + +## What This Sample Demonstrates + +- Wiring `GoogleGenAIPlugin` onto the worker with a real `genai.Client` +- Constructing a `TemporalAsyncClient` inside a `@workflow.defn` +- Calling `client.models.generate_content(...)` durably + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server (`temporal server start-dev`). See the +[suite README](../README.md) for details. + +```bash +# Terminal 1 +uv run google_genai_plugin/hello_world/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/hello_world/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | `HelloWorldWorkflow` with a single `generate_content` call | +| `run_worker.py` | Registers `GoogleGenAIPlugin`, starts the worker | +| `run_workflow.py` | Executes the workflow and prints the result | diff --git a/google_genai_plugin/hello_world/__init__.py b/google_genai_plugin/hello_world/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/hello_world/run_worker.py b/google_genai_plugin/hello_world/run_worker.py new file mode 100644 index 00000000..1a8678e0 --- /dev/null +++ b/google_genai_plugin/hello_world/run_worker.py @@ -0,0 +1,34 @@ +"""Worker for the hello world sample.""" + +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.hello_world.workflow import HelloWorldWorkflow + + +async def main() -> None: + # The real genai.Client (with credentials) lives only on the worker. + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-hello-world", + workflows=[HelloWorldWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/hello_world/run_workflow.py b/google_genai_plugin/hello_world/run_workflow.py new file mode 100644 index 00000000..8a1ece99 --- /dev/null +++ b/google_genai_plugin/hello_world/run_workflow.py @@ -0,0 +1,25 @@ +"""Start the hello world workflow.""" + +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.hello_world.workflow import HelloWorldWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + HelloWorldWorkflow.run, + "Write a haiku about durable execution.", + id="google-genai-hello-world", + task_queue="google-genai-hello-world", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/hello_world/workflow.py b/google_genai_plugin/hello_world/workflow.py new file mode 100644 index 00000000..4f31d0a7 --- /dev/null +++ b/google_genai_plugin/hello_world/workflow.py @@ -0,0 +1,21 @@ +"""Minimal Temporal + Google GenAI workflow: one prompt, one response. + +Every Gemini API call made through ``TemporalAsyncClient`` runs as a durable +Temporal activity, so it gets retries, timeouts, and crash recovery for free — +and no credentials ever enter the workflow. +""" + +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient + + +@workflow.defn +class HelloWorldWorkflow: + @workflow.run + async def run(self, prompt: str) -> str: + client = TemporalAsyncClient() + response = await client.models.generate_content( + model="gemini-2.5-flash", + contents=prompt, + ) + return response.text or "" diff --git a/google_genai_plugin/interactions/README.md b/google_genai_plugin/interactions/README.md new file mode 100644 index 00000000..f3116731 --- /dev/null +++ b/google_genai_plugin/interactions/README.md @@ -0,0 +1,40 @@ +# Interactions + +The Interactions API (`client.interactions`) is a server-managed, stateful +conversation API: state lives on Google's backend, addressed by an interaction +id. This sample creates an interaction, fetches it, and deletes it — each +operation running as a Temporal activity. + +> **Requires a live Gemini API key.** The Interactions API talks to a real +> backend that the plugin's test server does not mock, so this sample has no +> automated test — run it against a real `GOOGLE_API_KEY`. + +Note: unlike `client.models`, the Interactions API has no automatic function +calling. To use tools, declare them as `{"type": "function", ...}` dicts and +drive the tool loop yourself. + +## What This Sample Demonstrates + +- `client.interactions.create(model=..., input=...)` as a durable activity +- `client.interactions.get(id)` and `client.interactions.delete(id)` + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/interactions/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/interactions/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | `InteractionsWorkflow` — create, get, delete an interaction | +| `run_worker.py` | Registers `GoogleGenAIPlugin`, starts the worker | +| `run_workflow.py` | Executes the workflow and prints the result | diff --git a/google_genai_plugin/interactions/__init__.py b/google_genai_plugin/interactions/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/interactions/run_worker.py b/google_genai_plugin/interactions/run_worker.py new file mode 100644 index 00000000..3249056f --- /dev/null +++ b/google_genai_plugin/interactions/run_worker.py @@ -0,0 +1,33 @@ +"""Worker for the interactions sample.""" + +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.interactions.workflow import InteractionsWorkflow + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-interactions", + workflows=[InteractionsWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/interactions/run_workflow.py b/google_genai_plugin/interactions/run_workflow.py new file mode 100644 index 00000000..c02bd82a --- /dev/null +++ b/google_genai_plugin/interactions/run_workflow.py @@ -0,0 +1,25 @@ +"""Start the interactions workflow.""" + +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.interactions.workflow import InteractionsWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + InteractionsWorkflow.run, + "What is durable execution?", + id="google-genai-interactions", + task_queue="google-genai-interactions", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/interactions/workflow.py b/google_genai_plugin/interactions/workflow.py new file mode 100644 index 00000000..2767ea60 --- /dev/null +++ b/google_genai_plugin/interactions/workflow.py @@ -0,0 +1,28 @@ +"""Stateful server-side conversations via the Interactions API. + +``client.interactions`` is a server-managed API: the conversation state lives on +Google's backend, addressed by an interaction id. Each operation — create, get, +delete — runs as a Temporal activity. Unlike ``client.models``, this API has no +automatic function calling. +""" + +from typing import Any + +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient + + +@workflow.defn +class InteractionsWorkflow: + @workflow.run + async def run(self, prompt: str) -> dict[str, Any]: + client = TemporalAsyncClient() + + interaction = await client.interactions.create( + model="gemini-2.5-flash", + input=prompt, + ) + fetched = await client.interactions.get(interaction.id) + await client.interactions.delete(interaction.id) + + return {"id": interaction.id, "status": str(fetched.status)} diff --git a/google_genai_plugin/mcp/README.md b/google_genai_plugin/mcp/README.md new file mode 100644 index 00000000..0806e93e --- /dev/null +++ b/google_genai_plugin/mcp/README.md @@ -0,0 +1,37 @@ +# MCP + +Give Gemini access to an [MCP](https://modelcontextprotocol.io/) server's tools. +The worker launches the `echo_mcp_server.py` stdio server and registers it with +the plugin under the name `echo`. Inside the workflow, a +`TemporalMcpClientSession("echo")` is passed as a tool — Gemini's AFC loop +discovers and calls its tools, and `list_tools` / `call_tool` run as Temporal +activities against a pooled worker-side connection. + +## What This Sample Demonstrates + +- Registering an MCP server with `GoogleGenAIPlugin(mcp_servers={...})` +- Passing `TemporalMcpClientSession(name)` as a `generate_content` tool +- `cache_tools=True` to discover the tool list once and reuse it (replay-safe) +- MCP tool calls dispatched through per-server Temporal activities + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/mcp/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/mcp/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `echo_mcp_server.py` | A minimal `FastMCP` stdio server exposing an `echo` tool | +| `workflow.py` | `McpWorkflow` — passes `TemporalMcpClientSession("echo")` as a tool | +| `run_worker.py` | Registers the `echo` MCP server with the plugin, starts the worker | +| `run_workflow.py` | Executes the workflow and prints the result | diff --git a/google_genai_plugin/mcp/__init__.py b/google_genai_plugin/mcp/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/mcp/echo_mcp_server.py b/google_genai_plugin/mcp/echo_mcp_server.py new file mode 100644 index 00000000..d75f82cd --- /dev/null +++ b/google_genai_plugin/mcp/echo_mcp_server.py @@ -0,0 +1,15 @@ +"""A minimal stdio MCP server used by the MCP sample.""" + +from mcp.server.fastmcp import FastMCP + +mcp = FastMCP("echo-server") + + +@mcp.tool() +def echo(message: str) -> str: + """Return the input message unchanged.""" + return message + + +if __name__ == "__main__": + mcp.run() diff --git a/google_genai_plugin/mcp/run_worker.py b/google_genai_plugin/mcp/run_worker.py new file mode 100644 index 00000000..e2d86de5 --- /dev/null +++ b/google_genai_plugin/mcp/run_worker.py @@ -0,0 +1,56 @@ +"""Worker for the MCP sample. + +Registers an ``echo`` MCP server (the ``echo_mcp_server.py`` stdio script) with +the plugin. The plugin opens a pooled MCP connection on the worker and runs +``list_tools`` / ``call_tool`` as activities. +""" + +import asyncio +import os +import sys +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from pathlib import Path + +from google import genai +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.mcp.workflow import McpWorkflow + +ECHO_SERVER = str(Path(__file__).parent / "echo_mcp_server.py") + + +@asynccontextmanager +async def echo_session() -> AsyncIterator[ClientSession]: + """Yield a connected, initialized session to the stdio echo MCP server.""" + params = StdioServerParameters(command=sys.executable, args=[ECHO_SERVER]) + async with stdio_client(params) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + yield session + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client, mcp_servers={"echo": echo_session}) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-mcp", + workflows=[McpWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/mcp/run_workflow.py b/google_genai_plugin/mcp/run_workflow.py new file mode 100644 index 00000000..b0418d55 --- /dev/null +++ b/google_genai_plugin/mcp/run_workflow.py @@ -0,0 +1,25 @@ +"""Start the MCP workflow.""" + +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.mcp.workflow import McpWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + McpWorkflow.run, + "Use the echo tool to echo back the phrase: durable execution.", + id="google-genai-mcp", + task_queue="google-genai-mcp", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/mcp/workflow.py b/google_genai_plugin/mcp/workflow.py new file mode 100644 index 00000000..466c2207 --- /dev/null +++ b/google_genai_plugin/mcp/workflow.py @@ -0,0 +1,37 @@ +"""Use an MCP server's tools from a Gemini call via TemporalMcpClientSession. + +The worker registers an ``echo`` MCP server with the plugin. Inside the +workflow, ``TemporalMcpClientSession("echo")`` is passed as a tool; Gemini's AFC +loop discovers and calls the MCP tools, with ``list_tools`` / ``call_tool`` +running as Temporal activities against a pooled worker-side connection. +""" + +from datetime import timedelta + +from google.genai import types +from temporalio import workflow +from temporalio.contrib.google_genai import ( + TemporalAsyncClient, + TemporalMcpClientSession, +) +from temporalio.workflow import ActivityConfig + + +@workflow.defn +class McpWorkflow: + @workflow.run + async def run(self, prompt: str) -> str: + client = TemporalAsyncClient() + session = TemporalMcpClientSession( + "echo", + cache_tools=True, + activity_config=ActivityConfig( + start_to_close_timeout=timedelta(seconds=30), + ), + ) + response = await client.models.generate_content( + model="gemini-2.5-flash", + contents=prompt, + config=types.GenerateContentConfig(tools=[session]), + ) + return response.text or "" diff --git a/google_genai_plugin/streaming/README.md b/google_genai_plugin/streaming/README.md new file mode 100644 index 00000000..2e25a1b7 --- /dev/null +++ b/google_genai_plugin/streaming/README.md @@ -0,0 +1,35 @@ +# Streaming + +Forward Gemini model output to an external subscriber in real time. +`TemporalAsyncClient(streaming_topic="gemini")` publishes each +`generate_content_stream` chunk onto a workflow-hosted `WorkflowStream` as it +arrives. A subscriber connects with `WorkflowStreamClient` and reads the topic +live while the workflow runs durably. + +## What This Sample Demonstrates + +- `TemporalAsyncClient(streaming_topic=...)` publishing chunks to a topic +- Hosting a `WorkflowStream` in `@workflow.init` (required for streaming) +- Consuming the stream externally via `WorkflowStreamClient.subscribe(...)` +- Holding the workflow open on a signal so the subscriber can drain the stream + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/streaming/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/streaming/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | `StreamingWorkflow` — streams chunks to the `gemini` topic | +| `run_worker.py` | Registers `GoogleGenAIPlugin`, starts the worker | +| `run_workflow.py` | Starts the workflow and consumes the stream live | diff --git a/google_genai_plugin/streaming/__init__.py b/google_genai_plugin/streaming/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/streaming/run_worker.py b/google_genai_plugin/streaming/run_worker.py new file mode 100644 index 00000000..0f7579b6 --- /dev/null +++ b/google_genai_plugin/streaming/run_worker.py @@ -0,0 +1,33 @@ +"""Worker for the streaming sample.""" + +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.streaming.workflow import StreamingWorkflow + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-streaming", + workflows=[StreamingWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/streaming/run_workflow.py b/google_genai_plugin/streaming/run_workflow.py new file mode 100644 index 00000000..09957835 --- /dev/null +++ b/google_genai_plugin/streaming/run_workflow.py @@ -0,0 +1,47 @@ +"""Start the streaming workflow and consume model chunks live.""" + +import asyncio +import os +from datetime import timedelta + +from google.genai import types +from temporalio.client import Client +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from google_genai_plugin.streaming.workflow import StreamingWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + workflow_id = "google-genai-streaming" + + handle = await client.start_workflow( + StreamingWorkflow.run, + "Count from 1 to 5, one number per sentence.", + id=workflow_id, + task_queue="google-genai-streaming", + ) + + # Subscribe to the "gemini" topic and print chunks as the model produces them. + stream = WorkflowStreamClient.create(client, workflow_id) + async for item in stream.subscribe( + ["gemini"], + from_offset=0, + result_type=types.GenerateContentResponse, + poll_cooldown=timedelta(milliseconds=50), + ): + chunk: types.GenerateContentResponse = item.data + if chunk.text: + print(chunk.text, end="", flush=True) + if chunk.candidates and chunk.candidates[0].finish_reason: + print() + break + + # Release the workflow now that we've consumed the stream. + await handle.signal(StreamingWorkflow.finish) + result = await handle.result() + print(f"Final result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/streaming/workflow.py b/google_genai_plugin/streaming/workflow.py new file mode 100644 index 00000000..2ea7c22e --- /dev/null +++ b/google_genai_plugin/streaming/workflow.py @@ -0,0 +1,37 @@ +"""Stream Gemini output to an external subscriber via WorkflowStream. + +``TemporalAsyncClient(streaming_topic="gemini")`` publishes each +``generate_content_stream`` chunk onto a workflow-hosted ``WorkflowStream`` as +it arrives, so external consumers can watch the model produce text in real time +while the workflow runs durably. The workflow holds itself open on a ``finish`` +signal so a subscriber can reliably read the stream before the run completes. +""" + +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient +from temporalio.contrib.workflow_streams import WorkflowStream + + +@workflow.defn +class StreamingWorkflow: + @workflow.init + def __init__(self, prompt: str) -> None: + # Hosting a WorkflowStream is required when streaming_topic is set. + self.stream = WorkflowStream() + self._done = False + + @workflow.run + async def run(self, prompt: str) -> str: + client = TemporalAsyncClient(streaming_topic="gemini") + chunks: list[str] = [] + async for chunk in await client.models.generate_content_stream( + model="gemini-2.5-flash", + contents=prompt, + ): + chunks.append(chunk.text or "") + await workflow.wait_condition(lambda: self._done) + return "".join(chunks) + + @workflow.signal + def finish(self) -> None: + self._done = True diff --git a/google_genai_plugin/structured_output/README.md b/google_genai_plugin/structured_output/README.md new file mode 100644 index 00000000..db520e68 --- /dev/null +++ b/google_genai_plugin/structured_output/README.md @@ -0,0 +1,32 @@ +# Structured Output + +Get typed JSON back from Gemini by passing a Pydantic model as `response_schema`. +The plugin installs a `PydanticPayloadConverter`, so the model serializes cleanly +through Temporal payloads — the workflow returns a real `Recipe` instance. + +## What This Sample Demonstrates + +- `GenerateContentConfig(response_mime_type="application/json", response_schema=Recipe)` +- Reading the parsed model from `response.parsed` +- Returning a Pydantic model as a workflow result via the plugin's Pydantic converter + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/structured_output/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/structured_output/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | The `Recipe` model and `StructuredOutputWorkflow` | +| `run_worker.py` | Registers `GoogleGenAIPlugin`, starts the worker | +| `run_workflow.py` | Executes the workflow and prints the typed recipe | diff --git a/google_genai_plugin/structured_output/__init__.py b/google_genai_plugin/structured_output/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/structured_output/run_worker.py b/google_genai_plugin/structured_output/run_worker.py new file mode 100644 index 00000000..364dda20 --- /dev/null +++ b/google_genai_plugin/structured_output/run_worker.py @@ -0,0 +1,33 @@ +"""Worker for the structured output sample.""" + +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.structured_output.workflow import StructuredOutputWorkflow + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-structured-output", + workflows=[StructuredOutputWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/structured_output/run_workflow.py b/google_genai_plugin/structured_output/run_workflow.py new file mode 100644 index 00000000..ef9b9df0 --- /dev/null +++ b/google_genai_plugin/structured_output/run_workflow.py @@ -0,0 +1,29 @@ +"""Start the structured output workflow.""" + +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.structured_output.workflow import StructuredOutputWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + recipe = await client.execute_workflow( + StructuredOutputWorkflow.run, + "Give me a simple recipe for avocado toast.", + id="google-genai-structured-output", + task_queue="google-genai-structured-output", + ) + + print(f"Recipe: {recipe.name}") + print(f"Ingredients: {', '.join(recipe.ingredients)}") + print("Steps:") + for i, step in enumerate(recipe.steps, start=1): + print(f" {i}. {step}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/structured_output/workflow.py b/google_genai_plugin/structured_output/workflow.py new file mode 100644 index 00000000..9b0c3330 --- /dev/null +++ b/google_genai_plugin/structured_output/workflow.py @@ -0,0 +1,36 @@ +"""Typed JSON output via response_schema + a Pydantic model. + +The plugin installs a ``PydanticPayloadConverter``, so a Pydantic model flows +through Temporal payloads cleanly. Passing the model as ``response_schema`` +makes Gemini return matching JSON, which the SDK parses into the model on +``response.parsed``. +""" + +from google.genai import types +from pydantic import BaseModel +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient + + +class Recipe(BaseModel): + name: str + ingredients: list[str] + steps: list[str] + + +@workflow.defn +class StructuredOutputWorkflow: + @workflow.run + async def run(self, prompt: str) -> Recipe: + client = TemporalAsyncClient() + response = await client.models.generate_content( + model="gemini-2.5-flash", + contents=prompt, + config=types.GenerateContentConfig( + response_mime_type="application/json", + response_schema=Recipe, + ), + ) + recipe = response.parsed + assert isinstance(recipe, Recipe) + return recipe diff --git a/google_genai_plugin/tools/README.md b/google_genai_plugin/tools/README.md new file mode 100644 index 00000000..d0c10921 --- /dev/null +++ b/google_genai_plugin/tools/README.md @@ -0,0 +1,39 @@ +# Tools + +Two tool surfaces wired into one Gemini `generate_content` call, both driven by +the SDK's automatic function-calling (AFC) loop: + +| Pattern | When to use it | +|---------|----------------| +| `@activity.defn` wrapped via `activity_as_tool` | Anything with I/O or non-determinism — runs as a durable activity. | +| Plain workflow method | Pure, deterministic logic — runs in-workflow with no activity dispatch. | + +A single prompt exercises both: the model calls `get_weather` (an activity), +then `recommend_activity` (a workflow method). + +## What This Sample Demonstrates + +- `activity_as_tool` carrying per-tool `ActivityConfig` (timeouts, retries) +- Passing a plain workflow method as a tool alongside an activity-backed one +- The AFC loop running inside the workflow, dispatching tool calls durably + +## Running the Sample + +Prerequisites: install dependencies, set `GOOGLE_API_KEY`, and start a Temporal +dev server. See the [suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/tools/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/tools/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | `get_weather` activity, `recommend_activity` method, and `ToolsWorkflow` | +| `run_worker.py` | Registers `GoogleGenAIPlugin` + the `get_weather` activity | +| `run_workflow.py` | Executes the workflow and prints the result | diff --git a/google_genai_plugin/tools/__init__.py b/google_genai_plugin/tools/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/tools/run_worker.py b/google_genai_plugin/tools/run_worker.py new file mode 100644 index 00000000..2dfaad87 --- /dev/null +++ b/google_genai_plugin/tools/run_worker.py @@ -0,0 +1,34 @@ +"""Worker for the tools sample.""" + +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.tools.workflow import ToolsWorkflow, get_weather + + +async def main() -> None: + genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-tools", + workflows=[ToolsWorkflow], + activities=[get_weather], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/tools/run_workflow.py b/google_genai_plugin/tools/run_workflow.py new file mode 100644 index 00000000..0c17e0d5 --- /dev/null +++ b/google_genai_plugin/tools/run_workflow.py @@ -0,0 +1,25 @@ +"""Start the tools workflow.""" + +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.tools.workflow import ToolsWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + ToolsWorkflow.run, + "What's the weather in Tokyo, and what should I do there?", + id="google-genai-tools", + task_queue="google-genai-tools", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/tools/workflow.py b/google_genai_plugin/tools/workflow.py new file mode 100644 index 00000000..0589362b --- /dev/null +++ b/google_genai_plugin/tools/workflow.py @@ -0,0 +1,53 @@ +"""Two tool surfaces on one Gemini call, both driven by automatic function calling. + +1. ``@activity.defn get_weather`` wrapped via ``activity_as_tool`` — runs as a + durable Temporal activity. Use this for I/O or non-deterministic work. +2. ``recommend_activity`` — a plain workflow method passed directly as a tool. + It runs deterministically in-workflow with no activity dispatch. + +Gemini's automatic function-calling (AFC) loop runs inside the workflow and +invokes both as needed. +""" + +from datetime import timedelta + +from google.genai import types +from temporalio import activity, workflow +from temporalio.contrib.google_genai import TemporalAsyncClient, activity_as_tool +from temporalio.workflow import ActivityConfig + + +@activity.defn +async def get_weather(city: str) -> str: + """Look up the current weather for a city.""" + # Stub — replace with a real HTTP call in production. + return f"It's 72F and sunny in {city}." + + +@workflow.defn +class ToolsWorkflow: + @workflow.run + async def run(self, prompt: str) -> str: + client = TemporalAsyncClient() + response = await client.models.generate_content( + model="gemini-2.5-flash", + contents=prompt, + config=types.GenerateContentConfig( + tools=[ + activity_as_tool( + get_weather, + activity_config=ActivityConfig( + start_to_close_timeout=timedelta(seconds=30), + ), + ), + self.recommend_activity, + ], + ), + ) + return response.text or "" + + async def recommend_activity(self, weather: str) -> str: + """Recommend something to do given a weather description.""" + if "sunny" in weather.lower(): + return "Go for a hike." + return "Visit a museum." diff --git a/google_genai_plugin/vertex_ai/README.md b/google_genai_plugin/vertex_ai/README.md new file mode 100644 index 00000000..b0ead608 --- /dev/null +++ b/google_genai_plugin/vertex_ai/README.md @@ -0,0 +1,47 @@ +# Vertex AI + +The same hello-world flow as [`hello_world`](../hello_world), but pointed at +**Vertex AI** instead of the Gemini Developer API. The only difference is +configuration: both the worker's `genai.Client` and the workflow's +`TemporalAsyncClient` set `vertexai=True` with a Google Cloud project and +location. The `vertexai` setting must match on both sides. + +> **Requires Google Cloud credentials**, not a Gemini API key. Authenticate with +> Application Default Credentials (`gcloud auth application-default login`) or a +> service-account key (`GOOGLE_APPLICATION_CREDENTIALS`). This sample has no +> automated test. + +## Configuration + +| Variable | Description | +|----------|-------------| +| `GOOGLE_CLOUD_PROJECT` | Your Google Cloud project ID (required) | +| `GOOGLE_CLOUD_LOCATION` | Region, e.g. `us-central1` (defaults to `us-central1`) | + +## What This Sample Demonstrates + +- `genai.Client(vertexai=True, project=..., location=...)` on the worker +- `TemporalAsyncClient(vertexai=True, project=..., location=...)` in the workflow +- Passing project/location as workflow arguments to keep the workflow deterministic + +## Running the Sample + +Prerequisites: install dependencies, configure GCP credentials, set +`GOOGLE_CLOUD_PROJECT`, and start a Temporal dev server. See the +[suite README](../README.md). + +```bash +# Terminal 1 +uv run google_genai_plugin/vertex_ai/run_worker.py + +# Terminal 2 +uv run google_genai_plugin/vertex_ai/run_workflow.py +``` + +## Files + +| File | Description | +|------|-------------| +| `workflow.py` | `VertexAIWorkflow` — generate_content via Vertex AI | +| `run_worker.py` | Registers a Vertex-configured `GoogleGenAIPlugin` | +| `run_workflow.py` | Reads project/location from env and executes the workflow | diff --git a/google_genai_plugin/vertex_ai/__init__.py b/google_genai_plugin/vertex_ai/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_genai_plugin/vertex_ai/run_worker.py b/google_genai_plugin/vertex_ai/run_worker.py new file mode 100644 index 00000000..625275fd --- /dev/null +++ b/google_genai_plugin/vertex_ai/run_worker.py @@ -0,0 +1,42 @@ +"""Worker for the Vertex AI sample. + +Uses ``genai.Client(vertexai=True, ...)`` with Application Default Credentials +(no API key). Run ``gcloud auth application-default login`` first, or set +``GOOGLE_APPLICATION_CREDENTIALS`` to a service-account key file. +""" + +import asyncio +import os + +from google import genai +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.worker import Worker + +from google_genai_plugin.vertex_ai.workflow import VertexAIWorkflow + + +async def main() -> None: + genai_client = genai.Client( + vertexai=True, + project=os.environ["GOOGLE_CLOUD_PROJECT"], + location=os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1"), + ) + plugin = GoogleGenAIPlugin(genai_client) + + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="google-genai-vertex-ai", + workflows=[VertexAIWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/vertex_ai/run_workflow.py b/google_genai_plugin/vertex_ai/run_workflow.py new file mode 100644 index 00000000..e72dadfc --- /dev/null +++ b/google_genai_plugin/vertex_ai/run_workflow.py @@ -0,0 +1,28 @@ +"""Start the Vertex AI workflow.""" + +import asyncio +import os + +from temporalio.client import Client + +from google_genai_plugin.vertex_ai.workflow import VertexAIWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + project = os.environ["GOOGLE_CLOUD_PROJECT"] + location = os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1") + + result = await client.execute_workflow( + VertexAIWorkflow.run, + args=["Write a haiku about durable execution.", project, location], + id="google-genai-vertex-ai", + task_queue="google-genai-vertex-ai", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_genai_plugin/vertex_ai/workflow.py b/google_genai_plugin/vertex_ai/workflow.py new file mode 100644 index 00000000..2a11f5fa --- /dev/null +++ b/google_genai_plugin/vertex_ai/workflow.py @@ -0,0 +1,27 @@ +"""Hello world against Vertex AI instead of the Gemini Developer API. + +The only difference from the basic sample is configuration: both the workflow's +``TemporalAsyncClient`` and the worker's ``genai.Client`` use ``vertexai=True`` +with a Google Cloud project and location. The project and location are passed in +as workflow arguments (read from the environment by the starter) to keep the +workflow deterministic. +""" + +from temporalio import workflow +from temporalio.contrib.google_genai import TemporalAsyncClient + + +@workflow.defn +class VertexAIWorkflow: + @workflow.run + async def run(self, prompt: str, project: str, location: str) -> str: + client = TemporalAsyncClient( + vertexai=True, + project=project, + location=location, + ) + response = await client.models.generate_content( + model="gemini-2.5-flash", + contents=prompt, + ) + return response.text or "" diff --git a/pyproject.toml b/pyproject.toml index a148f865..d175a4a3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,10 @@ external-storage = [ ] external-storage-redis = ["redis>=5.0.0,<8"] gevent = ["gevent>=25.4.2 ; python_version >= '3.8'"] +google-genai = [ + "mcp>=1.0.0", + "temporalio[google-genai,pydantic]>=1.28.0", +] langsmith-tracing = [ "openai>=1.4.0", "langsmith>=0.7.0", @@ -106,6 +110,7 @@ packages = [ "external_storage", "external_storage_redis", "gevent_async", + "google_genai_plugin", "hello", "langgraph_plugin", "langsmith_tracing", diff --git a/tests/google_genai_plugin/__init__.py b/tests/google_genai_plugin/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/google_genai_plugin/chat_test.py b/tests/google_genai_plugin/chat_test.py new file mode 100644 index 00000000..d1e61654 --- /dev/null +++ b/tests/google_genai_plugin/chat_test.py @@ -0,0 +1,43 @@ +import uuid + +from temporalio.client import Client +from temporalio.contrib.google_genai.testing import GeminiTestServer, text_response +from temporalio.worker import Worker + +from google_genai_plugin.chat.workflow import ChatWorkflow + + +async def test_chat(client: Client) -> None: + server = GeminiTestServer( + [ + text_response("Got it — your favorite color is teal."), + text_response("Your favorite color is teal."), + ] + ) + + config = client.config() + config["plugins"] = [*config["plugins"], server.plugin()] + client = Client(**config) + + task_queue = f"google-genai-chat-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[ChatWorkflow], + max_cached_workflows=0, + ): + result = await client.execute_workflow( + ChatWorkflow.run, + [ + "My favorite color is teal. Remember that.", + "What is my favorite color?", + ], + id=f"google-genai-chat-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result == [ + "Got it — your favorite color is teal.", + "Your favorite color is teal.", + ] + assert len(server.requests) == 2 diff --git a/tests/google_genai_plugin/hello_world_test.py b/tests/google_genai_plugin/hello_world_test.py new file mode 100644 index 00000000..c7ffda1f --- /dev/null +++ b/tests/google_genai_plugin/hello_world_test.py @@ -0,0 +1,32 @@ +import uuid + +from temporalio.client import Client +from temporalio.contrib.google_genai.testing import GeminiTestServer, text_response +from temporalio.worker import Worker + +from google_genai_plugin.hello_world.workflow import HelloWorldWorkflow + + +async def test_hello_world(client: Client) -> None: + server = GeminiTestServer([text_response("A haiku, for you.")]) + + config = client.config() + config["plugins"] = [*config["plugins"], server.plugin()] + client = Client(**config) + + task_queue = f"google-genai-hello-world-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[HelloWorldWorkflow], + max_cached_workflows=0, + ): + result = await client.execute_workflow( + HelloWorldWorkflow.run, + "Write a haiku.", + id=f"google-genai-hello-world-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result == "A haiku, for you." + assert len(server.requests) == 1 diff --git a/tests/google_genai_plugin/mcp_test.py b/tests/google_genai_plugin/mcp_test.py new file mode 100644 index 00000000..87ff50ac --- /dev/null +++ b/tests/google_genai_plugin/mcp_test.py @@ -0,0 +1,81 @@ +import sys +import uuid +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Any + +from google import genai +from google.genai.types import HttpResponse as SdkHttpResponse +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client +from temporalio.client import Client +from temporalio.contrib.google_genai import GoogleGenAIPlugin +from temporalio.contrib.google_genai.testing import ( + function_call_response, + text_response, +) +from temporalio.worker import Worker + +from google_genai_plugin.mcp.workflow import McpWorkflow + +ECHO_SERVER = str( + Path(__file__).parents[2] / "google_genai_plugin" / "mcp" / "echo_mcp_server.py" +) + + +@asynccontextmanager +async def _echo_session() -> AsyncIterator[ClientSession]: + params = StdioServerParameters(command=sys.executable, args=[ECHO_SERVER]) + async with stdio_client(params) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + yield session + + +def _mcp_plugin(responses: list[str]) -> GoogleGenAIPlugin: + """A plugin with scripted model HTTP plus a real echo MCP server. + + Mirrors what ``GeminiTestServer.plugin()`` does for the model HTTP layer, + but also registers an MCP server (which ``GeminiTestServer`` does not), so + the MCP ``list_tools`` / ``call_tool`` activities run for real. + """ + genai_client = genai.Client(api_key="fake-test-key") + index = {"i": 0} + + async def fake_async_request(*_args: Any, **_kwargs: Any) -> SdkHttpResponse: + body = responses[index["i"]] + index["i"] += 1 + return SdkHttpResponse(headers={"content-type": "application/json"}, body=body) + + genai_client._api_client.async_request = fake_async_request # type: ignore[assignment] + return GoogleGenAIPlugin(genai_client, mcp_servers={"echo": _echo_session}) + + +async def test_mcp(client: Client) -> None: + plugin = _mcp_plugin( + [ + function_call_response("echo", {"message": "durable execution"}), + text_response("The echo tool returned: durable execution"), + ] + ) + + config = client.config() + config["plugins"] = [*config["plugins"], plugin] + client = Client(**config) + + task_queue = f"google-genai-mcp-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[McpWorkflow], + max_cached_workflows=0, + ): + result = await client.execute_workflow( + McpWorkflow.run, + "Use the echo tool to echo back the phrase: durable execution.", + id=f"google-genai-mcp-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert "durable execution" in result diff --git a/tests/google_genai_plugin/streaming_test.py b/tests/google_genai_plugin/streaming_test.py new file mode 100644 index 00000000..512d4ca3 --- /dev/null +++ b/tests/google_genai_plugin/streaming_test.py @@ -0,0 +1,53 @@ +import uuid +from datetime import timedelta + +from google.genai import types +from temporalio.client import Client +from temporalio.contrib.google_genai.testing import GeminiTestServer, text_response +from temporalio.contrib.workflow_streams import WorkflowStreamClient +from temporalio.worker import Worker + +from google_genai_plugin.streaming.workflow import StreamingWorkflow + + +async def test_streaming_publishes_to_workflow_stream(client: Client) -> None: + server = GeminiTestServer([text_response("Hello from Gemini stream")]) + + config = client.config() + config["plugins"] = [*config["plugins"], server.plugin()] + client = Client(**config) + + task_queue = f"google-genai-streaming-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[StreamingWorkflow], + max_cached_workflows=0, + ): + wf_id = f"google-genai-streaming-{uuid.uuid4()}" + handle = await client.start_workflow( + StreamingWorkflow.run, + "say hi", + id=wf_id, + task_queue=task_queue, + execution_timeout=timedelta(seconds=15), + ) + + # Consume the published chunk from the "gemini" topic. + stream = WorkflowStreamClient.create(client, wf_id) + received: list[types.GenerateContentResponse] = [] + async for item in stream.subscribe( + ["gemini"], + from_offset=0, + result_type=types.GenerateContentResponse, + poll_cooldown=timedelta(milliseconds=20), + ): + received.append(item.data) + break # one scripted chunk + + await handle.signal(StreamingWorkflow.finish) + result = await handle.result() + + assert result == "Hello from Gemini stream" + assert len(received) == 1 + assert received[0].text == "Hello from Gemini stream" diff --git a/tests/google_genai_plugin/structured_output_test.py b/tests/google_genai_plugin/structured_output_test.py new file mode 100644 index 00000000..ce328bdf --- /dev/null +++ b/tests/google_genai_plugin/structured_output_test.py @@ -0,0 +1,45 @@ +import json +import uuid + +from temporalio.client import Client +from temporalio.contrib.google_genai.testing import GeminiTestServer, text_response +from temporalio.worker import Worker + +from google_genai_plugin.structured_output.workflow import ( + Recipe, + StructuredOutputWorkflow, +) + + +async def test_structured_output(client: Client) -> None: + recipe_json = json.dumps( + { + "name": "Avocado Toast", + "ingredients": ["bread", "avocado", "salt"], + "steps": ["Toast the bread.", "Mash the avocado on top.", "Season."], + } + ) + server = GeminiTestServer([text_response(recipe_json)]) + + config = client.config() + config["plugins"] = [*config["plugins"], server.plugin()] + client = Client(**config) + + task_queue = f"google-genai-structured-output-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[StructuredOutputWorkflow], + max_cached_workflows=0, + ): + result = await client.execute_workflow( + StructuredOutputWorkflow.run, + "Give me a simple recipe for avocado toast.", + id=f"google-genai-structured-output-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert isinstance(result, Recipe) + assert result.name == "Avocado Toast" + assert result.ingredients == ["bread", "avocado", "salt"] + assert len(result.steps) == 3 diff --git a/tests/google_genai_plugin/tools_test.py b/tests/google_genai_plugin/tools_test.py new file mode 100644 index 00000000..0a6d7923 --- /dev/null +++ b/tests/google_genai_plugin/tools_test.py @@ -0,0 +1,46 @@ +import uuid + +from temporalio.client import Client +from temporalio.contrib.google_genai.testing import ( + GeminiTestServer, + function_call_response, + text_response, +) +from temporalio.worker import Worker + +from google_genai_plugin.tools.workflow import ToolsWorkflow, get_weather + + +async def test_tools(client: Client) -> None: + server = GeminiTestServer( + [ + function_call_response("get_weather", {"city": "Tokyo"}), + function_call_response( + "recommend_activity", {"weather": "It's 72F and sunny in Tokyo."} + ), + text_response("It's sunny in Tokyo — go for a hike!"), + ] + ) + + config = client.config() + config["plugins"] = [*config["plugins"], server.plugin()] + client = Client(**config) + + task_queue = f"google-genai-tools-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[ToolsWorkflow], + activities=[get_weather], + max_cached_workflows=0, + ): + result = await client.execute_workflow( + ToolsWorkflow.run, + "What's the weather in Tokyo, and what should I do there?", + id=f"google-genai-tools-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result == "It's sunny in Tokyo — go for a hike!" + # One model turn per response: two tool calls and a final text answer. + assert len(server.requests) == 3 From a7794d0cb57caf0d775cf05475591f3452acd916 Mon Sep 17 00:00:00 2001 From: Brian Strauch Date: Thu, 18 Jun 2026 12:07:54 -0700 Subject: [PATCH 2/2] Add SNIPSTART/SNIPEND annotations to google_genai samples Wrap the workflow.py, run_worker.py, and run_workflow.py bodies of each sample in @@@SNIPSTART/@@@SNIPEND markers (python-google-genai--) so the code can be embedded in docs, matching the strands_plugin convention. Co-Authored-By: Claude Opus 4.8 (1M context) --- google_genai_plugin/agents/run_worker.py | 2 ++ google_genai_plugin/agents/run_workflow.py | 2 ++ google_genai_plugin/agents/workflow.py | 4 ++++ google_genai_plugin/chat/run_worker.py | 2 ++ google_genai_plugin/chat/run_workflow.py | 2 ++ google_genai_plugin/chat/workflow.py | 4 ++++ google_genai_plugin/files/run_worker.py | 2 ++ google_genai_plugin/files/run_workflow.py | 2 ++ google_genai_plugin/files/workflow.py | 4 ++++ google_genai_plugin/hello_world/run_worker.py | 2 ++ google_genai_plugin/hello_world/run_workflow.py | 2 ++ google_genai_plugin/hello_world/workflow.py | 4 ++++ google_genai_plugin/interactions/run_worker.py | 2 ++ google_genai_plugin/interactions/run_workflow.py | 2 ++ google_genai_plugin/interactions/workflow.py | 4 ++++ google_genai_plugin/mcp/run_worker.py | 2 ++ google_genai_plugin/mcp/run_workflow.py | 2 ++ google_genai_plugin/mcp/workflow.py | 4 ++++ google_genai_plugin/streaming/run_worker.py | 2 ++ google_genai_plugin/streaming/run_workflow.py | 2 ++ google_genai_plugin/streaming/workflow.py | 4 ++++ google_genai_plugin/structured_output/run_worker.py | 2 ++ google_genai_plugin/structured_output/run_workflow.py | 2 ++ google_genai_plugin/structured_output/workflow.py | 4 ++++ google_genai_plugin/tools/run_worker.py | 2 ++ google_genai_plugin/tools/run_workflow.py | 2 ++ google_genai_plugin/tools/workflow.py | 4 ++++ google_genai_plugin/vertex_ai/run_worker.py | 2 ++ google_genai_plugin/vertex_ai/run_workflow.py | 2 ++ google_genai_plugin/vertex_ai/workflow.py | 4 ++++ 30 files changed, 80 insertions(+) diff --git a/google_genai_plugin/agents/run_worker.py b/google_genai_plugin/agents/run_worker.py index 51d5ccad..b57d18de 100644 --- a/google_genai_plugin/agents/run_worker.py +++ b/google_genai_plugin/agents/run_worker.py @@ -1,5 +1,6 @@ """Worker for the agents sample.""" +# @@@SNIPSTART python-google-genai-agents-worker import asyncio import os @@ -31,3 +32,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/agents/run_workflow.py b/google_genai_plugin/agents/run_workflow.py index a1d73e31..c0ce643b 100644 --- a/google_genai_plugin/agents/run_workflow.py +++ b/google_genai_plugin/agents/run_workflow.py @@ -1,5 +1,6 @@ """Start the agents workflow.""" +# @@@SNIPSTART python-google-genai-agents-run-workflow import asyncio import os @@ -23,3 +24,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/agents/workflow.py b/google_genai_plugin/agents/workflow.py index eaada308..0210fa95 100644 --- a/google_genai_plugin/agents/workflow.py +++ b/google_genai_plugin/agents/workflow.py @@ -4,6 +4,7 @@ Each operation runs as a Temporal activity. """ +# @@@SNIPSTART python-google-genai-agents-workflow from typing import Any from temporalio import workflow @@ -29,3 +30,6 @@ async def run(self, agent_id: str) -> dict[str, Any]: "fetched_id": fetched.id, "listed_ids": [a.id for a in (listing.agents or [])], } + + +# @@@SNIPEND diff --git a/google_genai_plugin/chat/run_worker.py b/google_genai_plugin/chat/run_worker.py index 42728c07..60bc1313 100644 --- a/google_genai_plugin/chat/run_worker.py +++ b/google_genai_plugin/chat/run_worker.py @@ -1,5 +1,6 @@ """Worker for the chat sample.""" +# @@@SNIPSTART python-google-genai-chat-worker import asyncio import os @@ -31,3 +32,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/chat/run_workflow.py b/google_genai_plugin/chat/run_workflow.py index 0563552a..90904221 100644 --- a/google_genai_plugin/chat/run_workflow.py +++ b/google_genai_plugin/chat/run_workflow.py @@ -1,5 +1,6 @@ """Start the chat workflow with a multi-turn conversation.""" +# @@@SNIPSTART python-google-genai-chat-run-workflow import asyncio import os @@ -27,3 +28,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/chat/workflow.py b/google_genai_plugin/chat/workflow.py index 4531b96d..666d4c8a 100644 --- a/google_genai_plugin/chat/workflow.py +++ b/google_genai_plugin/chat/workflow.py @@ -5,6 +5,7 @@ each request automatically. """ +# @@@SNIPSTART python-google-genai-chat-workflow from temporalio import workflow from temporalio.contrib.google_genai import TemporalAsyncClient @@ -20,3 +21,6 @@ async def run(self, prompts: list[str]) -> list[str]: response = await chat.send_message(prompt) replies.append(response.text or "") return replies + + +# @@@SNIPEND diff --git a/google_genai_plugin/files/run_worker.py b/google_genai_plugin/files/run_worker.py index 9ac089ad..d425597f 100644 --- a/google_genai_plugin/files/run_worker.py +++ b/google_genai_plugin/files/run_worker.py @@ -1,5 +1,6 @@ """Worker for the files sample.""" +# @@@SNIPSTART python-google-genai-files-worker import asyncio import os @@ -31,3 +32,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/files/run_workflow.py b/google_genai_plugin/files/run_workflow.py index 8a8ef689..3af2f309 100644 --- a/google_genai_plugin/files/run_workflow.py +++ b/google_genai_plugin/files/run_workflow.py @@ -4,6 +4,7 @@ process can access (here it ships alongside this sample). """ +# @@@SNIPSTART python-google-genai-files-run-workflow import asyncio import os from pathlib import Path @@ -30,3 +31,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/files/workflow.py b/google_genai_plugin/files/workflow.py index 86c07884..098a9111 100644 --- a/google_genai_plugin/files/workflow.py +++ b/google_genai_plugin/files/workflow.py @@ -5,6 +5,7 @@ a ``generate_content`` call. """ +# @@@SNIPSTART python-google-genai-files-workflow from typing import cast from google.genai import types @@ -27,3 +28,6 @@ async def run(self, file_path: str, prompt: str) -> str: contents=contents, ) return response.text or "" + + +# @@@SNIPEND diff --git a/google_genai_plugin/hello_world/run_worker.py b/google_genai_plugin/hello_world/run_worker.py index 1a8678e0..3c925d3d 100644 --- a/google_genai_plugin/hello_world/run_worker.py +++ b/google_genai_plugin/hello_world/run_worker.py @@ -1,5 +1,6 @@ """Worker for the hello world sample.""" +# @@@SNIPSTART python-google-genai-hello-world-worker import asyncio import os @@ -32,3 +33,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/hello_world/run_workflow.py b/google_genai_plugin/hello_world/run_workflow.py index 8a1ece99..457a2699 100644 --- a/google_genai_plugin/hello_world/run_workflow.py +++ b/google_genai_plugin/hello_world/run_workflow.py @@ -1,5 +1,6 @@ """Start the hello world workflow.""" +# @@@SNIPSTART python-google-genai-hello-world-run-workflow import asyncio import os @@ -23,3 +24,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/hello_world/workflow.py b/google_genai_plugin/hello_world/workflow.py index 4f31d0a7..d962e91a 100644 --- a/google_genai_plugin/hello_world/workflow.py +++ b/google_genai_plugin/hello_world/workflow.py @@ -5,6 +5,7 @@ and no credentials ever enter the workflow. """ +# @@@SNIPSTART python-google-genai-hello-world-workflow from temporalio import workflow from temporalio.contrib.google_genai import TemporalAsyncClient @@ -19,3 +20,6 @@ async def run(self, prompt: str) -> str: contents=prompt, ) return response.text or "" + + +# @@@SNIPEND diff --git a/google_genai_plugin/interactions/run_worker.py b/google_genai_plugin/interactions/run_worker.py index 3249056f..2ce79f97 100644 --- a/google_genai_plugin/interactions/run_worker.py +++ b/google_genai_plugin/interactions/run_worker.py @@ -1,5 +1,6 @@ """Worker for the interactions sample.""" +# @@@SNIPSTART python-google-genai-interactions-worker import asyncio import os @@ -31,3 +32,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/interactions/run_workflow.py b/google_genai_plugin/interactions/run_workflow.py index c02bd82a..63cb6b2d 100644 --- a/google_genai_plugin/interactions/run_workflow.py +++ b/google_genai_plugin/interactions/run_workflow.py @@ -1,5 +1,6 @@ """Start the interactions workflow.""" +# @@@SNIPSTART python-google-genai-interactions-run-workflow import asyncio import os @@ -23,3 +24,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/interactions/workflow.py b/google_genai_plugin/interactions/workflow.py index 2767ea60..3cd8ceab 100644 --- a/google_genai_plugin/interactions/workflow.py +++ b/google_genai_plugin/interactions/workflow.py @@ -6,6 +6,7 @@ automatic function calling. """ +# @@@SNIPSTART python-google-genai-interactions-workflow from typing import Any from temporalio import workflow @@ -26,3 +27,6 @@ async def run(self, prompt: str) -> dict[str, Any]: await client.interactions.delete(interaction.id) return {"id": interaction.id, "status": str(fetched.status)} + + +# @@@SNIPEND diff --git a/google_genai_plugin/mcp/run_worker.py b/google_genai_plugin/mcp/run_worker.py index e2d86de5..30d05bf4 100644 --- a/google_genai_plugin/mcp/run_worker.py +++ b/google_genai_plugin/mcp/run_worker.py @@ -5,6 +5,7 @@ ``list_tools`` / ``call_tool`` as activities. """ +# @@@SNIPSTART python-google-genai-mcp-worker import asyncio import os import sys @@ -54,3 +55,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/mcp/run_workflow.py b/google_genai_plugin/mcp/run_workflow.py index b0418d55..6c82158a 100644 --- a/google_genai_plugin/mcp/run_workflow.py +++ b/google_genai_plugin/mcp/run_workflow.py @@ -1,5 +1,6 @@ """Start the MCP workflow.""" +# @@@SNIPSTART python-google-genai-mcp-run-workflow import asyncio import os @@ -23,3 +24,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/mcp/workflow.py b/google_genai_plugin/mcp/workflow.py index 466c2207..7cd5e5d2 100644 --- a/google_genai_plugin/mcp/workflow.py +++ b/google_genai_plugin/mcp/workflow.py @@ -6,6 +6,7 @@ running as Temporal activities against a pooled worker-side connection. """ +# @@@SNIPSTART python-google-genai-mcp-workflow from datetime import timedelta from google.genai import types @@ -35,3 +36,6 @@ async def run(self, prompt: str) -> str: config=types.GenerateContentConfig(tools=[session]), ) return response.text or "" + + +# @@@SNIPEND diff --git a/google_genai_plugin/streaming/run_worker.py b/google_genai_plugin/streaming/run_worker.py index 0f7579b6..5d85f86e 100644 --- a/google_genai_plugin/streaming/run_worker.py +++ b/google_genai_plugin/streaming/run_worker.py @@ -1,5 +1,6 @@ """Worker for the streaming sample.""" +# @@@SNIPSTART python-google-genai-streaming-worker import asyncio import os @@ -31,3 +32,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/streaming/run_workflow.py b/google_genai_plugin/streaming/run_workflow.py index 09957835..5e8c6704 100644 --- a/google_genai_plugin/streaming/run_workflow.py +++ b/google_genai_plugin/streaming/run_workflow.py @@ -1,5 +1,6 @@ """Start the streaming workflow and consume model chunks live.""" +# @@@SNIPSTART python-google-genai-streaming-run-workflow import asyncio import os from datetime import timedelta @@ -45,3 +46,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/streaming/workflow.py b/google_genai_plugin/streaming/workflow.py index 2ea7c22e..90c766b2 100644 --- a/google_genai_plugin/streaming/workflow.py +++ b/google_genai_plugin/streaming/workflow.py @@ -7,6 +7,7 @@ signal so a subscriber can reliably read the stream before the run completes. """ +# @@@SNIPSTART python-google-genai-streaming-workflow from temporalio import workflow from temporalio.contrib.google_genai import TemporalAsyncClient from temporalio.contrib.workflow_streams import WorkflowStream @@ -35,3 +36,6 @@ async def run(self, prompt: str) -> str: @workflow.signal def finish(self) -> None: self._done = True + + +# @@@SNIPEND diff --git a/google_genai_plugin/structured_output/run_worker.py b/google_genai_plugin/structured_output/run_worker.py index 364dda20..aa1e15fb 100644 --- a/google_genai_plugin/structured_output/run_worker.py +++ b/google_genai_plugin/structured_output/run_worker.py @@ -1,5 +1,6 @@ """Worker for the structured output sample.""" +# @@@SNIPSTART python-google-genai-structured-output-worker import asyncio import os @@ -31,3 +32,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/structured_output/run_workflow.py b/google_genai_plugin/structured_output/run_workflow.py index ef9b9df0..431ff797 100644 --- a/google_genai_plugin/structured_output/run_workflow.py +++ b/google_genai_plugin/structured_output/run_workflow.py @@ -1,5 +1,6 @@ """Start the structured output workflow.""" +# @@@SNIPSTART python-google-genai-structured-output-run-workflow import asyncio import os @@ -27,3 +28,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/structured_output/workflow.py b/google_genai_plugin/structured_output/workflow.py index 9b0c3330..5383cdae 100644 --- a/google_genai_plugin/structured_output/workflow.py +++ b/google_genai_plugin/structured_output/workflow.py @@ -6,6 +6,7 @@ ``response.parsed``. """ +# @@@SNIPSTART python-google-genai-structured-output-workflow from google.genai import types from pydantic import BaseModel from temporalio import workflow @@ -34,3 +35,6 @@ async def run(self, prompt: str) -> Recipe: recipe = response.parsed assert isinstance(recipe, Recipe) return recipe + + +# @@@SNIPEND diff --git a/google_genai_plugin/tools/run_worker.py b/google_genai_plugin/tools/run_worker.py index 2dfaad87..897830a7 100644 --- a/google_genai_plugin/tools/run_worker.py +++ b/google_genai_plugin/tools/run_worker.py @@ -1,5 +1,6 @@ """Worker for the tools sample.""" +# @@@SNIPSTART python-google-genai-tools-worker import asyncio import os @@ -32,3 +33,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/tools/run_workflow.py b/google_genai_plugin/tools/run_workflow.py index 0c17e0d5..4c57a03f 100644 --- a/google_genai_plugin/tools/run_workflow.py +++ b/google_genai_plugin/tools/run_workflow.py @@ -1,5 +1,6 @@ """Start the tools workflow.""" +# @@@SNIPSTART python-google-genai-tools-run-workflow import asyncio import os @@ -23,3 +24,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/tools/workflow.py b/google_genai_plugin/tools/workflow.py index 0589362b..10f4d68f 100644 --- a/google_genai_plugin/tools/workflow.py +++ b/google_genai_plugin/tools/workflow.py @@ -9,6 +9,7 @@ invokes both as needed. """ +# @@@SNIPSTART python-google-genai-tools-workflow from datetime import timedelta from google.genai import types @@ -51,3 +52,6 @@ async def recommend_activity(self, weather: str) -> str: if "sunny" in weather.lower(): return "Go for a hike." return "Visit a museum." + + +# @@@SNIPEND diff --git a/google_genai_plugin/vertex_ai/run_worker.py b/google_genai_plugin/vertex_ai/run_worker.py index 625275fd..fe7d50a8 100644 --- a/google_genai_plugin/vertex_ai/run_worker.py +++ b/google_genai_plugin/vertex_ai/run_worker.py @@ -5,6 +5,7 @@ ``GOOGLE_APPLICATION_CREDENTIALS`` to a service-account key file. """ +# @@@SNIPSTART python-google-genai-vertex-ai-worker import asyncio import os @@ -40,3 +41,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/vertex_ai/run_workflow.py b/google_genai_plugin/vertex_ai/run_workflow.py index e72dadfc..3805a6a4 100644 --- a/google_genai_plugin/vertex_ai/run_workflow.py +++ b/google_genai_plugin/vertex_ai/run_workflow.py @@ -1,5 +1,6 @@ """Start the Vertex AI workflow.""" +# @@@SNIPSTART python-google-genai-vertex-ai-run-workflow import asyncio import os @@ -26,3 +27,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) +# @@@SNIPEND diff --git a/google_genai_plugin/vertex_ai/workflow.py b/google_genai_plugin/vertex_ai/workflow.py index 2a11f5fa..33db84aa 100644 --- a/google_genai_plugin/vertex_ai/workflow.py +++ b/google_genai_plugin/vertex_ai/workflow.py @@ -7,6 +7,7 @@ workflow deterministic. """ +# @@@SNIPSTART python-google-genai-vertex-ai-workflow from temporalio import workflow from temporalio.contrib.google_genai import TemporalAsyncClient @@ -25,3 +26,6 @@ async def run(self, prompt: str, project: str, location: str) -> str: contents=prompt, ) return response.text or "" + + +# @@@SNIPEND