Skip to content

Tasks

A task is an async function decorated with @task. Tasks handle the side-effecting work inside a step, calling APIs, writing to databases, sending messages. Tasks run in the same process as the step that calls them. There is no remote dispatch and no separate execution environment. You call a task like any other async function and it executes right there.

Task inputs and outputs are serialized so the engine can record them to the workflow history. The engine records each task's outcome so during the replay, completed tasks are skipped and their recorded result is returned immediately.

Basic Usage

import asyncio
from grctl.worker.task import task
from grctl.worker.context import Context
from grctl.models import Directive
from grctl.workflow import Workflow

order = Workflow(workflow_type="ProcessOrder")

@task
async def charge_payment(amount: float) -> str:
    # Call your payment API here
    return "txn_abc123"

@task
async def send_confirmation(txn_id: str, email: str) -> None:
    # Send confirmation email
    pass

@order.start()
async def start(ctx: Context, amount: float, email: str) -> Directive:
    txn_id = await charge_payment(amount)
    await send_confirmation(txn_id, email)
    return ctx.next.complete({"txn_id": txn_id})

Tasks are plain async functions. No ctx, no special return type. Call them with await like any other coroutine.

Replay Behavior

When a worker picks up a workflow after a crash, it re-executes the step from the beginning. Tasks that already completed are not re-executed — the engine looks up the recorded outcome and returns it immediately.

First execution:                  Re-execution after crash:
    charge_payment()  →  runs         charge_payment()  →  skipped, returns "txn_abc123"
    send_confirmation() →  runs       send_confirmation() →  skipped, returns None

If a task failed terminally on the first execution, the same error is re-raised on re-execution without calling the function again.

Retries

Pass a RetryPolicy to retry a task on failure:

from grctl.models.directive import RetryPolicy

@task(retry_policy=RetryPolicy(
    max_attempts=5,
    initial_delay_ms=500,
    backoff_multiplier=2.0,
    max_delay_ms=30_000,
))
async def call_external_api(url: str) -> dict:
    # This will be retried up to 5 times on failure
    ...

Retries happen within the same step execution. Each failed attempt is recorded before the next one starts, so if the worker crashes mid-retry the next worker picks up from the correct attempt count.

Retryable and Non-Retryable Errors

Control which error types trigger a retry:

# Only retry these specific errors
@task(retry_policy=RetryPolicy(
    max_attempts=3,
    retryable_errors=["TimeoutError", "ConnectionError"],
))
async def fetch_data(url: str) -> dict:
    ...

# Retry everything except these errors
@task(retry_policy=RetryPolicy(
    max_attempts=3,
    non_retryable_errors=["ValueError", "AuthenticationError"],
))
async def process_record(record_id: str) -> dict:
    ...

Error matching uses the exception class name (not the full module path).

If neither retryable_errors nor non_retryable_errors is set, all errors are retried up to max_attempts.

Error Handling

When a task raises an exception, what happens next depends on whether the task has a retry policy.

No retry policy: the exception propagates immediately to the step handler.

Retry policy exhausted: after all attempts fail, the final exception propagates to the step handler.

In both cases, you can catch the exception with a normal try/except in the step:

@order.step()
async def charge(ctx: Context) -> Directive:
    amount = await ctx.store.get("amount")
    order_id = await ctx.store.get("order_id")

    try:
        txn_id = await charge_payment(amount)
    except PaymentDeclinedError:
        await notify_customer(order_id, "payment_failed")
        return ctx.next.complete({"status": "payment_failed"})

    return ctx.next.complete({"txn_id": txn_id, "status": "charged"})

Use ctx.next.fail(error) to explicitly fail the workflow with structured error details:

from grctl.models import ErrorDetails

@order.step()
async def charge(ctx: Context) -> Directive:
    amount = await ctx.store.get("amount")

    try:
        txn_id = await charge_payment(amount)
    except Exception as e:
        return ctx.next.fail(ErrorDetails(
            type=type(e).__name__,
            message=f"Payment failed: {e}",
        ))

    return ctx.next.complete({"txn_id": txn_id})

If the exception goes uncaught, it propagates out of the step handler and the workflow fails with a StepFailed error.

Compensating Actions

When a task fails partway through a step that already completed other tasks, you need to undo the completed work. Do this in the except block before returning a directive:

@order.step()
async def fulfill(ctx: Context) -> Directive:
    order_id = await ctx.store.get("order_id")

    reservation_id = await reserve_inventory(order_id)

    try:
        await charge_payment(order_id)
    except PaymentDeclinedError:
        await release_inventory(reservation_id)  # undo the reservation
        return ctx.next.complete({"status": "payment_failed"})

    return ctx.next.complete({"status": "fulfilled"})

Compensating tasks are recorded to history like any other task, so they won't re-run on replay.

Parallel Tasks

Tasks run in-process, so any standard asyncio concurrency primitive works. asyncio.gather(), asyncio.TaskGroup, asyncio.wait(), manual asyncio.create_task(), etc.

@order.step()
async def notify(ctx: Context) -> Directive:
    order_id = await ctx.store.get("order_id")
    email = await ctx.store.get("email")

    # gather
    await asyncio.gather(
        send_email(email, order_id),
        update_crm(order_id),
        log_analytics(order_id),
    )

    # or TaskGroup for structured concurrency
    async with asyncio.TaskGroup() as tg:
        tg.create_task(send_email(email, order_id))
        tg.create_task(update_crm(order_id))

    return ctx.next.complete("done")

Each task is recorded independently, so replay correctly skips only the ones that completed.

Reference

@task Decorator

Form Description
@task No retries. Single attempt.
@task(retry_policy=...) Retry on failure according to policy.

RetryPolicy

Field Type Default Description
max_attempts int | None 1 Total attempts including the first.
initial_delay_ms int | None 100 Delay before the second attempt, in milliseconds.
backoff_multiplier float | None 2.0 Multiplier applied to delay after each attempt.
max_delay_ms int | None 5000 Upper bound on delay, in milliseconds.
jitter float | None 0.0 Random fraction added to delay (0.01.0).
retryable_errors list[str] | None None Only retry errors whose class name is in this list.
non_retryable_errors list[str] | None None Never retry errors whose class name is in this list.