
Client

The Client is how external code starts workflows and interacts with them. It connects to the server over NATS and returns handles you can use to send events or wait for results.

Connecting

Create a Connection first, then pass it to the Client:

from grctl.nats.connection import Connection
from grctl.client.client import Client

connection = await Connection.connect()
client = Client(connection=connection)

Connection.connect() reads the server address from the GRCTL_NATS_SERVERS environment variable. The default is nats://localhost:4225. You can also pass servers explicitly:

connection = await Connection.connect(servers=["nats://prod-server:4225"])
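The lookup order described above can be pictured with a small standalone sketch. `resolve_servers` is a hypothetical helper, not part of grctl. Two points are assumptions: that an explicit `servers` argument takes precedence over the environment variable, and that the variable may hold several comma-separated URLs. The library's actual parsing may differ.

```python
import os

DEFAULT_SERVER = "nats://localhost:4225"  # default stated in the text above


def resolve_servers(servers=None, env=None):
    """Hypothetical helper: explicit argument, then env var, then default."""
    env = os.environ if env is None else env
    if servers:
        return list(servers)
    raw = env.get("GRCTL_NATS_SERVERS", "")
    # Assumption: multiple URLs are comma-separated in the variable.
    parsed = [url.strip() for url in raw.split(",") if url.strip()]
    return parsed or [DEFAULT_SERVER]
```

Passing `env` explicitly keeps the helper easy to test without touching the real process environment.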

Connection is a singleton — calling connect() a second time returns the same instance.
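A minimal sketch of what singleton behavior usually implies, using a stand-in class rather than the real Connection. `FakeConnection` and its caching logic are illustrative assumptions. If the real class behaves similarly, arguments passed on a later connect() call would have no effect, because the first instance is returned unchanged.

```python
import asyncio


class FakeConnection:
    """Stand-in for Connection, used only to illustrate singleton behavior."""

    _instance = None

    def __init__(self, servers):
        self.servers = servers

    @classmethod
    async def connect(cls, servers=None):
        # The first call creates the instance; later calls return it unchanged.
        if cls._instance is None:
            cls._instance = cls(servers or ["nats://localhost:4225"])
        return cls._instance


async def main():
    first = await FakeConnection.connect()
    # Assumption being illustrated: these servers are ignored on a second call.
    second = await FakeConnection.connect(servers=["nats://other:4225"])
    return first is second, second.servers
```

In practice this suggests deciding on the server list once, at application startup, before any other code calls connect().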

Starting a Workflow and Waiting for the Result

Use run_workflow() to start a workflow and block until it completes:

from datetime import timedelta

result = await client.run_workflow(
    workflow_type="ProcessOrder",
    workflow_id="order-abc-123",
    workflow_input={"order_id": "abc-123", "amount": 49.99},
    workflow_timeout=timedelta(minutes=5),
)
print(result)  # the value passed to ctx.next.complete(...)

run_workflow() raises if the workflow fails, times out, or is cancelled:

import asyncio

from grctl.models.errors import WorkflowError

try:
    result = await client.run_workflow(...)
except WorkflowError as e:
    print(f"Workflow failed: {e}")
except TimeoutError:
    print("Workflow timed out")
except asyncio.CancelledError:
    print("Workflow was cancelled")
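When several call sites need the same handling, the three outcomes can be folded into one helper. The sketch below is self-contained: `WorkflowError` is redefined locally as a stand-in for the grctl class, and `run_and_classify` is a hypothetical name, not a library function.

```python
import asyncio


class WorkflowError(Exception):
    """Stand-in for grctl.models.errors.WorkflowError so the sketch runs alone."""


async def run_and_classify(run):
    """Await a zero-argument coroutine function and label the outcome."""
    try:
        return "completed", await run()
    except WorkflowError as e:
        return "failed", str(e)
    except TimeoutError:
        return "timed_out", None
    except asyncio.CancelledError:
        # CancelledError derives from BaseException (Python 3.8+), so a broad
        # `except Exception` would not catch it; it needs its own clause.
        return "cancelled", None
```

With a real client, the call might look like `await run_and_classify(lambda: client.run_workflow(...))`. Note that catching CancelledError also swallows cancellation of the calling task itself, so re-raising may be the better choice in code that can be cancelled from outside.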

Getting a Handle

Use start_workflow() when you need to send events to the workflow or don't want to block on the result:

handle = await client.start_workflow(
    workflow_type="ProcessOrder",
    workflow_id="order-abc-123",
    workflow_input={"order_id": "abc-123", "amount": 49.99},
    workflow_timeout=timedelta(minutes=30),
)

The workflow starts immediately. start_workflow() returns as soon as the start command is sent — it does not wait for the workflow to finish.

Sending Events

Use handle.send() to push events into a running workflow:

await handle.send("approve")
await handle.send("update_quantity", payload={"item_id": "sku-1", "qty": 3})

See Events for how the workflow receives and processes them.

Waiting for the Result

handle.future is an asyncio.Future that resolves when the workflow completes:

# Wait indefinitely
result = await handle.future

# Wait with a timeout
result = await asyncio.wait_for(handle.future, timeout=60)

The future resolves to the value passed to ctx.next.complete(result) inside the workflow. It raises WorkflowError, TimeoutError, or asyncio.CancelledError on failure.
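One asyncio detail is worth keeping in mind here: asyncio.wait_for() cancels the awaitable it was given when the timeout expires. If the caller only wants to stop waiting locally and check again later, wrapping the future in asyncio.shield() leaves it pending. The sketch below uses a plain asyncio.Future as a stand-in for handle.future. `wait_without_cancelling` is a hypothetical helper, and how grctl reacts to a cancelled handle.future is not covered here.

```python
import asyncio


async def wait_without_cancelling(future, timeout):
    """Wait up to `timeout` seconds; leave `future` pending if time runs out."""
    try:
        # shield() keeps wait_for's cancellation from reaching `future` itself.
        return True, await asyncio.wait_for(asyncio.shield(future), timeout)
    except asyncio.TimeoutError:
        return False, None


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()  # plain Future standing in for handle.future

    done, _ = await wait_without_cancelling(future, timeout=0.01)
    still_usable = not future.cancelled()

    future.set_result("order shipped")  # simulate the workflow completing later
    return done, still_usable, await future
```

Without the shield, the same timeout would leave the future in the cancelled state, and a later `await` on it would raise asyncio.CancelledError.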

Sending events then waiting

handle = await client.start_workflow(
    workflow_type="GreetEvents",
    workflow_id="greet-session-1",
    workflow_input={"name": "Cem"},
    workflow_timeout=timedelta(seconds=30),
)

await handle.send("greet")
await handle.send("farewell")

result = await asyncio.wait_for(handle.future, timeout=30)

Getting a Handle for an Existing Workflow

Use get_handle() to get a handle to a workflow that is already running. You only need the workflow_id:

handle = await client.get_handle(workflow_id="order-abc-123")

# Send events, run queries, or wait for the result
await handle.send("approve")
result = await handle.future

This is useful when a different part of your system needs to interact with a workflow it didn't start, for example a webhook handler that sends an approval event to a running order workflow.
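A webhook handler of that kind might look like the sketch below. It relies only on get_handle(workflow_id=...) and handle.send(...) as shown above. The handler name, the shape of the webhook body, and the `FakeClient`/`FakeHandle` stand-ins (included so the example runs without a server) are all hypothetical.

```python
import asyncio


async def handle_approval_webhook(client, body):
    """Hypothetical webhook handler: forward an approval to a running workflow."""
    workflow_id = body["workflow_id"]  # assumed field in the webhook payload
    handle = await client.get_handle(workflow_id=workflow_id)
    await handle.send("approve")
    return {"status": "accepted", "workflow_id": workflow_id}


class FakeHandle:
    """Recording stand-in for a workflow handle, for local testing only."""

    def __init__(self, log, workflow_id):
        self._log = log
        self._workflow_id = workflow_id

    async def send(self, event_name, payload=None):
        self._log.append((self._workflow_id, event_name, payload))


class FakeClient:
    """Stand-in for Client that records every event sent through its handles."""

    def __init__(self):
        self.sent = []

    async def get_handle(self, workflow_id):
        return FakeHandle(self.sent, workflow_id)
```

The handler returns as soon as the event is sent and does not await handle.future, which keeps the webhook response fast regardless of how long the workflow runs.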

See Workflow ID and Runs for how workflow identity and deduplication work.

Reference

Client

| Method | Description |
| --- | --- |
| Client(connection) | Create a client using an existing Connection. |
| await client.run_workflow(...) | Start a workflow and wait for its result. |
| await client.start_workflow(...) | Start a workflow and return a handle. |
| await client.get_handle(workflow_id) | Get a handle to an existing workflow. |

run_workflow() / start_workflow() Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| workflow_type | str | | The workflow_type string of the workflow to run. |
| workflow_id | str | | Stable caller-supplied identifier. Used for deduplication. |
| workflow_input | Any \| None | None | Input passed to the workflow's start handler as keyword arguments. |
| workflow_timeout | timedelta \| None | None | Maximum duration before the workflow is failed with TimeoutError. |
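Because workflow_id is used for deduplication, it is usually derived from a business key rather than generated randomly. A minimal sketch, assuming an order ID is the natural key; `order_workflow_id` is a hypothetical helper, and the exact deduplication semantics are described in Workflow ID and Runs.

```python
def order_workflow_id(order_id: str) -> str:
    """Hypothetical helper: derive a stable workflow ID from a business key."""
    # The same order always maps to the same ID, so a retried request
    # reuses the identifier instead of minting a new one.
    return f"order-{order_id}"
```

A random ID (for example from uuid.uuid4()) would differ on every retry, so the server would have nothing to deduplicate against.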

WorkflowHandle

| Member | Type | Description |
| --- | --- | --- |
| handle.future | asyncio.Future | Resolves to the workflow result. Raises on failure. |
| handle.run_info | RunInfo | Metadata: run_info.id (run ID), run_info.wf_id (workflow ID), run_info.wf_type. |
| await handle.send(event_name, payload) | | Send an event to the workflow. |

Connection.connect()

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| servers | list[str] \| None | None | NATS server URLs. When omitted, they are read from the GRCTL_NATS_SERVERS environment variable, falling back to nats://localhost:4225. |