Skip to content

Context

Every workflow handler receives a Context as its first argument. It provides access to workflow state storage, step transitions, deterministic utilities, and logging.

from grctl.worker.context import Context
from grctl.models import Directive

@order.step()
async def process(ctx: Context) -> Directive:
    order_id = await ctx.store.get("order_id")
    # ... do work ...
    return ctx.next.complete({"order_id": order_id})

Store

ctx.store is a key-value store scoped to the workflow run. Use it to pass state between steps.

@order.start()
async def start(ctx: Context, order_id: str, amount: float) -> Directive:
    ctx.store.put("order_id", order_id)
    ctx.store.put("amount", amount)
    return ctx.next.step(validate)

@order.step()
async def validate(ctx: Context) -> Directive:
    order_id = await ctx.store.get("order_id", str)  # raises KeyError if not set
    amount = await ctx.store.get("amount", float)
    return ctx.next.step(charge)

Atomicity and Buffering

The Store provides an all-or-nothing guarantee for state changes:

  • Writes are buffered: ctx.store.put() writes to local memory immediately but does not hit the server during step execution.
  • Atomic flush: Pending updates are flushed only when the handler returns a Directive. The server saves the state changes, history events, and the next step assignment in a single atomic transaction.
  • Consistency: This ensures that state never drifts from history. If a step's store updates are visible, its history is too. If a worker crashes mid-step, no partial state is ever saved, and the next worker will replay the step from the last clean state.

Store API

Method Description
ctx.store.put(key, value) Write a value. Buffered until step completes.
await ctx.store.get(key) Read a value. Returns None if not found.

Values can be any msgpack-serializable type: strings, numbers, lists, dicts, booleans, None.

Step Transitions

ctx.next is a builder that constructs the Directive returned from a handler. Each method produces a directive that tells the engine what to do after the current step completes.

Step Boundaries

A Directive marks a transaction boundary. Ground Control only persists your changes and moves to the next phase when a directive is returned.

return ctx.next.step(next_step_fn)           # run another step
return ctx.next.wait_for_event()             # pause, wait for an event
return ctx.next.complete(result)             # finish the workflow
return ctx.next.fail(error)                  # fail the workflow

See Workflows for the full transition table and Events for wait_for_event details.

Deterministic Functions

Workflow code re-executes from the beginning when a worker picks up a run after a crash. Any non-deterministic calls, getting the current time, generating a UUID, sampling a random number, must go through ctx so the engine can record the value on first execution and replay the same value on re-execution.

Do not use datetime.now(), uuid.uuid4(), or random.random() directly inside handlers or tasks. Use the ctx equivalents instead.

ctx.now()

Returns the current UTC time as a datetime. On re-execution, returns the recorded value from the first execution.

from datetime import datetime

@order.step()
async def record_timestamp(ctx: Context) -> Directive:
    created_at = await ctx.now()  # datetime, UTC
    ctx.store.put("created_at", created_at.isoformat())
    return ctx.next.complete("done")

ctx.random()

Returns a float in [0.0, 1.0). On re-execution, returns the recorded value.

@order.step()
async def assign_variant(ctx: Context) -> Directive:
    roll = await ctx.random()
    variant = "A" if roll < 0.5 else "B"
    ctx.store.put("variant", variant)
    return ctx.next.step(run_experiment)

ctx.uuid4()

Returns a uuid.UUID. On re-execution, returns the recorded value.

@order.step()
async def create_record(ctx: Context) -> Directive:
    record_id = await ctx.uuid4()
    ctx.store.put("record_id", str(record_id))
    return ctx.next.step(save_record)

ctx.sleep(duration)

Pauses execution for the given duration. On re-execution, returns immediately — the sleep is not repeated.

from datetime import timedelta

@order.step()
async def wait_and_retry(ctx: Context) -> Directive:
    await ctx.sleep(timedelta(minutes=5))
    return ctx.next.step(retry_payment)

ctx.sleep is appropriate for short delays inside a step. For pausing a workflow until an external event occurs, use ctx.next.wait_for_event() instead.

Logger

ctx.logger is a standard Python logger that is replay-aware. Log calls are suppressed during re-execution so that log output is not duplicated.

@order.step()
async def process(ctx: Context) -> Directive:
    ctx.logger.info("Processing order %s", await ctx.store.get("order_id"))
    return ctx.next.complete("done")

The logger supports the standard logging interface: debug, info, warning, error, exception.

Run Info

ctx.run exposes metadata about the current workflow run. Useful for logging or passing the workflow ID to external systems.

@order.step()
async def log_run(ctx: Context) -> Directive:
    ctx.logger.info("run_id=%s wf_id=%s", ctx.run.id, ctx.run.wf_id)
    return ctx.next.complete("done")
Field Type Description
ctx.run.id str Unique run ID (ULID)
ctx.run.wf_id str Workflow ID (caller-supplied, used for deduplication)
ctx.run.wf_type str Workflow type string

Child Workflows

ctx.start() launches a child workflow and returns a WorkflowHandle. ctx.send_to_parent() emits an event to the parent workflow. Both are covered in Child Workflows.

Reference

Context Properties

Property Type Description
ctx.store Store Key-value store for this run
ctx.next NextBuilder Step transition builder
ctx.logger ReplayAwareLogger Replay-aware logger
ctx.run RunInfo Metadata about the current run

Deterministic Functions

Method Returns Description
await ctx.now() datetime Current UTC time, recorded on first call
await ctx.random() float Random float in [0.0, 1.0), recorded on first call
await ctx.uuid4() uuid.UUID Random UUID v4, recorded on first call
await ctx.sleep(duration) None Sleep for timedelta; skipped on re-execution