Skip to content

Workflows

A workflow is a definition that describes a sequence of steps and how to transition between them. You define workflows using a decorator-based API similar to FastAPI route registration.

Defining a Workflow

Create a Workflow instance and register handlers with decorators:

from grctl.workflow import Workflow
from grctl.worker.context import Context
from grctl.models import Directive

order = Workflow(workflow_type="ProcessOrder")

@order.start()
async def start(ctx: Context, order_id: str) -> Directive:
    ctx.store.put("order_id", order_id)
    return ctx.next.step(process)

@order.step()
async def process(ctx: Context) -> Directive:
    order_id = await ctx.store.get("order_id")
    # ... do work ...
    return ctx.next.complete({"order_id": order_id, "status": "done"})

The workflow_type string identifies this workflow across the system. Clients use it to start workflows, workers use it to route step assignments.

Workflow ID and Runs

Every workflow instance is identified by a workflow_id, a stable, caller-supplied string like an order number or request ID. When a client starts a workflow, it provides this ID. The server rejects the request if a workflow with that ID is already running, which makes workflow starts idempotent.

Each time a workflow runs, the system generates a run_id — a unique identifier for that specific execution. A single workflow_id can have multiple runs over its lifetime, but only one can be active at a time.

Concept Set by Purpose
workflow_id Caller Stable identity. Used for deduplication and to get a handle to an existing workflow.
run_id System Identifies a specific execution. Generated automatically on each start.

Use workflow_id to get a handle to a running workflow and interact with it — send events, run queries, or wait for the result. See Client for details.

Start Handler

Every workflow needs exactly one start handler. It runs once when the workflow is first created, receives the workflow input as function arguments, and returns a directive that determines what happens next.

Not every workflow needs multiple steps. If the workflow is simple, the start handler alone can do all the work and complete directly:

@order.start()
async def start(ctx: Context, order_id: str, amount: float) -> Directive:
    result = await charge_payment(order_id, amount)
    return ctx.next.complete(result)

Use separate steps when you need to split a workflow into distinct logical phases. For example, validate, then charge, then ship. Each step is a persistence boundary, so they're useful when you want progress saved between phases.

@order.start()
async def start(ctx: Context, order_id: str, amount: float) -> Directive:
    # Initialize workflow state
    ctx.store.put("order_id", order_id)
    ctx.store.put("amount", amount)

    # Transition to next step
    return ctx.next.step(validate)

The start handler's parameters (after ctx) are populated from the workflow_input dict passed by the client. If the client sends {"order_id": "abc", "amount": 49.99}, those become the order_id and amount arguments.

Steps

Steps are the building blocks of a workflow. Each step is a persistence boundary. When a step completes, its result is saved before the next step begins. Only one step runs at a time per workflow. See How It Works for details on how steps are dispatched to workers.

@order.step()
async def validate(ctx: Context) -> Directive:
    order_id = await ctx.store.get("order_id")
    # ... validate order ...
    return ctx.next.step(charge)

@order.step()
async def charge(ctx: Context) -> Directive:
    amount = await ctx.store.get("amount")
    # ... charge payment ...
    return ctx.next.complete("charged")

Every step handler receives a Context and returns a Directive. The directive tells the engine what to do next: run another step, wait for an event, complete, or fail.

Step Timeouts

Steps have a default timeout of 10 seconds. Override it per step:

from datetime import timedelta

@order.step(timeout=timedelta(seconds=60))
async def long_running_step(ctx: Context) -> Directive:
    # This step has 60 seconds to complete
    ...

If a step exceeds its timeout, the engine cancels it and fails the workflow.

Looping Steps

A step can transition back to itself, creating a loop. This is useful for processing items in batches with saving state between each iteration:

counter = Workflow(workflow_type="Counter")

@counter.start()
async def start(ctx: Context, target: int) -> Directive:
    ctx.store.put("count", 0)
    ctx.store.put("target", target)
    return ctx.next.step(increment)

@counter.step()
async def increment(ctx: Context) -> Directive:
    count = await ctx.store.get("count")
    target = await ctx.store.get("target")

    # Do a batch of work
    for _ in range(10):
        count += 1

    ctx.store.put("count", count)

    if count >= target:
        return ctx.next.complete(count)

    # Loop back — state is persisted between iterations
    return ctx.next.step(increment)

Each loop iteration is a separate step execution with its own persisted state, so progress is never lost even if the worker crashes mid-loop.

Step Transitions

Every handler returns a directive via ctx.next that tells the engine what to do next:

Method Description
ctx.next.step(fn) Transition to the given step function
ctx.next.wait_for_event() Pause and wait for external events. See Events for details
ctx.next.complete(result) Complete the workflow with a result
ctx.next.fail(error) Fail the workflow with an error

Events

Use @workflow.event() to register event handlers on a workflow. Event handlers run when an external system sends an event to a running workflow via a workflow handle. They can mutate state and control transitions just like steps.

@order.event()
async def cancel(ctx: Context) -> Directive:
    ctx.store.put("status", "cancelled")
    return ctx.next.complete("cancelled")

Important

Events can arrive at any time, including while a step is running. When that happens, the event is saved to an inbox rather than processed immediately. The workflow only handles events when it transitions to WaitEvent state. At that point, the server pulls one waiting event from the inbox and dispatches it as a step. Once that step completes, the handler's ctx.next determines what happens next; transition to another step, wait for more events, or complete.

Events are covered in depth in Events.

Reference

Workflow

Method Parameters Description
Workflow(workflow_type) workflow_type: str Create a workflow definition
@workflow.start() Register the start handler (exactly one)
@workflow.step() timeout: timedelta \| None, retry_policy: RetryPolicy \| None Register a step handler
@workflow.event() name: str \| None Register an event handler
@workflow.query() name: str \| None Register a query handler

Handler Signatures

Handler Signature Returns
Start async def fn(ctx: Context, **input) -> Directive Directive
Step async def fn(ctx: Context) -> Directive Directive
Event async def fn(ctx: Context, [payload]) -> Directive Directive
Query async def fn(ctx: Context) -> Any Any value

Step Defaults

Setting Default
timeout 10 seconds
retry_policy None (no retries, single attempt)