Once you’ve defined a task with @worker.task, you can submit it for background execution and optionally wait for the result.

Submit (fire and forget)

Use .submit() to enqueue a job and get back a Future:
future = await process_order.submit(order_id="123", items=["A", "B"])
print(future.job_id)  # "abc-123-def"
The call returns immediately. The worker picks up the job asynchronously.

Get the result later

Use the Future to wait for the result when you need it:
future = await process_order.submit(order_id="123", items=["A", "B"])

# ... do other work ...

result = await future.result(timeout=30.0)
print(result.value)    # {"order_id": "123", "status": "completed"}
print(result.job_id)   # "abc-123-def"
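To make the submit-then-wait flow concrete, here is a minimal conceptual sketch built only on the standard library's asyncio. It is not how upnext is implemented: the `submit` helper, the `worker` coroutine, and the in-process queue are hypothetical stand-ins that only illustrate the idea of enqueuing a job, getting a future back immediately, and awaiting it later with a timeout.

```python
import asyncio


async def worker(queue: asyncio.Queue) -> None:
    # Hypothetical stand-in for a worker: pull jobs and resolve their futures.
    while True:
        func, kwargs, fut = await queue.get()
        try:
            result = func(**kwargs)
        except Exception as exc:
            if not fut.done():
                fut.set_exception(exc)
        else:
            if not fut.done():
                fut.set_result(result)
        finally:
            queue.task_done()


def submit(queue: asyncio.Queue, func, **kwargs) -> asyncio.Future:
    # Enqueue the job and return right away; nothing has run yet.
    fut = asyncio.get_running_loop().create_future()
    queue.put_nowait((func, kwargs, fut))
    return fut


def process_order(order_id: str, items: list) -> dict:
    # Plain function used as the "task" in this sketch.
    return {"order_id": order_id, "status": "completed"}


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    worker_task = asyncio.create_task(worker(queue))

    fut = submit(queue, process_order, order_id="123", items=["A", "B"])
    pending_at_submit = not fut.done()  # the job has only been queued

    # ... do other work ...

    value = await asyncio.wait_for(fut, timeout=30.0)
    worker_task.cancel()
    return pending_at_submit, value
```

In this model the future is still pending when `submit` returns, which mirrors the "returns immediately" behaviour described above; the value only becomes available once the worker has processed the job.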

Get just the value

Use future.value() when you only need the task's return value:
future = await process_order.submit(order_id="123", items=["A", "B"])
value = await future.value(timeout=30.0)
print(value)  # {"order_id": "123", "status": "completed"}
future.value() raises the same errors as future.result() (TaskExecutionError, TaskTimeoutError, and TimeoutError). Both future.value() and result.value are typed from your task’s return annotation.

Cancel a job

Call .cancel() on the Future to cancel a submitted job:
cancelled = await future.cancel()

Wait (submit + block)

Use .wait() to submit and immediately wait for the result:
result = await process_order.wait(order_id="123", items=["A", "B"])
print(result.value)  # {"order_id": "123", "status": "completed"}
You can set a custom wait timeout:
result = await process_order.wait(
    order_id="123",
    items=["A", "B"],
    wait_timeout=60.0,
)

TaskResult

On success, .wait() and future.result() return a TaskResult. On failure, they raise a TaskExecutionError (or a TaskTimeoutError for timeouts), so you never receive a failed TaskResult directly.
from upnext import TaskExecutionError

try:
    result = await process_order.wait(order_id="123", items=["A", "B"])
    print(result.value)  # Only reached on success
except TaskExecutionError as e:
    print(e.error)            # Error message
    print(e.task_result.attempts)  # Access the full TaskResult via the exception
Property       Type              Description
value          T                 Return value from the task
job_id         str               Unique job identifier
function       str               Stable function key
function_name  str               Human-readable function name
status         str               "complete" (always, since failures raise)
ok             bool              True (always, since failures raise)
started_at     datetime | None   When execution started
completed_at   datetime | None   When execution finished
attempts       int               Number of attempts (including retries)
parent_id      str | None        Parent job ID (see Workflows)
root_id        str               Root job ID in the execution tree (see Workflows)
See the Error Handling guide for details on catching TaskExecutionError and TaskTimeoutError.
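Because started_at and completed_at are typed datetime | None, code that derives a duration from them should handle the None case. The sketch below uses a hypothetical stand-in dataclass, `TaskResultSketch`, that mirrors the properties listed above; it is not the library's TaskResult class, and the `duration` helper is an illustrative assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass
class TaskResultSketch(Generic[T]):
    # Hypothetical stand-in mirroring the documented properties.
    value: T
    job_id: str
    function: str
    function_name: str
    root_id: str
    status: str = "complete"
    ok: bool = True
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    attempts: int = 1
    parent_id: Optional[str] = None


def duration(result: TaskResultSketch) -> Optional[timedelta]:
    # Either timestamp may be missing, so return None instead of failing.
    if result.started_at is None or result.completed_at is None:
        return None
    return result.completed_at - result.started_at


start = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
example = TaskResultSketch(
    value={"order_id": "123", "status": "completed"},
    job_id="abc-123-def",
    function="process_order",
    function_name="process_order",
    root_id="abc-123-def",
    started_at=start,
    completed_at=start + timedelta(seconds=2),
)
```

The same None check applies to any object exposing these two attributes, so a helper like this should work with a real result as long as the attribute names match the table.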

Sync variants

All submission methods have sync equivalents for use outside of async contexts:
# Submit from sync code
future = process_order.submit_sync(order_id="123", items=["A", "B"])

# Submit and wait from sync code
result = process_order.wait_sync(order_id="123", items=["A", "B"])
Sync methods cannot be called from within an already-running event loop. Use the async variants inside tasks and API handlers.
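If a piece of code can be reached from both sync and async callers, one way to decide which variant is safe is to check for a running event loop first. This is a small sketch using only the standard library; `running_in_event_loop` is a hypothetical helper name, not part of upnext.

```python
import asyncio


def running_in_event_loop() -> bool:
    # asyncio.get_running_loop() raises RuntimeError when no loop is running
    # in the current thread.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def check_inside_loop() -> bool:
    # Called from a coroutine, so a loop is necessarily running here.
    return running_in_event_loop()
```

When the helper returns True, use the async variants (for example `await process_order.submit(...)`); when it returns False, the sync variants are the ones to call.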

Idempotent submission

Use .submit_idempotent() to prevent duplicate jobs with the same key:
future = await process_order.submit_idempotent(
    "order-123",  # Idempotency key
    order_id="123",
    items=["A", "B"],
)
If a job with the same idempotency key is already queued or running, you’ll get the existing job’s Future instead of creating a new one.
# Async wait variant
result = await process_order.wait_idempotent(
    "order-123",
    order_id="123",
    items=["A", "B"],
)

# Sync variants
future = process_order.submit_idempotent_sync("order-123", order_id="123", items=["A", "B"])
result = process_order.wait_idempotent_sync("order-123", order_id="123", items=["A", "B"])
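The deduplication rule described above can be modelled with a dictionary keyed by idempotency key. The following is a conceptual in-memory sketch, not the library's implementation: `IdempotentQueueSketch` and its methods are hypothetical names, and releasing the key when the job finishes is an assumption, since the text only specifies the behaviour while a job is queued or running.

```python
import itertools
from typing import Dict


class IdempotentQueueSketch:
    """In-memory model of key-based deduplication (illustrative only)."""

    def __init__(self) -> None:
        self._active: Dict[str, str] = {}  # idempotency key -> job id
        self._ids = itertools.count(1)

    def submit_idempotent(self, key: str) -> str:
        # A job with this key is still queued or running: reuse it.
        existing = self._active.get(key)
        if existing is not None:
            return existing
        # Otherwise create a new job and remember it under the key.
        job_id = f"job-{next(self._ids)}"
        self._active[key] = job_id
        return job_id

    def finish(self, key: str) -> None:
        # Assumption: the key is released once the job is no longer active.
        self._active.pop(key, None)


queue = IdempotentQueueSketch()
first = queue.submit_idempotent("order-123")
second = queue.submit_idempotent("order-123")  # same job while still active
queue.finish("order-123")
third = queue.submit_idempotent("order-123")   # new job under this sketch's assumption
```

Choosing a key derived from the business entity, such as the order ID used here, means that repeated submissions for the same order collapse into one job while it is in flight.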

Direct execution

Call a task handle directly to execute it inline (useful for testing):
# Runs the function directly, not via the queue
result = process_order(order_id="123", items=["A", "B"])