Once you’ve defined a task with @worker.task, you can submit it for background execution and optionally wait for the result.
Submit (fire and forget)
Use .submit() to enqueue a job and get back a Future:
future = await process_order.submit(order_id="123", items=["A", "B"])
print(future.job_id) # "abc-123-def"
The call returns immediately. The worker picks up the job asynchronously.
Get the result later
Use the Future to wait for the result when you need it:
future = await process_order.submit(order_id="123", items=["A", "B"])
# ... do other work ...
result = await future.result(timeout=30.0)
print(result.value) # {"order_id": "123", "status": "completed"}
print(result.job_id) # "abc-123-def"
Get just the value
Use Future.value() when you only need the task return value:
future = await process_order.submit(order_id="123", items=["A", "B"])
value = await future.value(timeout=30.0)
print(value) # {"order_id": "123", "status": "completed"}
future.value() raises the same errors as future.result() (TaskExecutionError, TaskTimeoutError, and TimeoutError).
Both future.value() and result.value are typed from your task’s return annotation.
Cancel a job
cancelled = await future.cancel()
Wait (submit + block)
Use .wait() to submit and immediately wait for the result:
result = await process_order.wait(order_id="123", items=["A", "B"])
print(result.value) # {"order_id": "123", "status": "completed"}
You can set a custom wait timeout:
result = await process_order.wait(
order_id="123",
items=["A", "B"],
wait_timeout=60.0,
)
TaskResult
On success, .wait() and future.result() return a TaskResult. On failure, they raise a TaskExecutionError (or TaskTimeoutError for timeouts) — you never get a failed TaskResult back directly.
from upnext import TaskExecutionError
try:
result = await process_order.wait(order_id="123", items=["A", "B"])
print(result.value) # Only reached on success
except TaskExecutionError as e:
print(e.error) # Error message
print(e.task_result.attempts) # Access the full TaskResult via the exception
| Property | Type | Description |
|---|
value | T | Return value from the task |
job_id | str | Unique job identifier |
function | str | Stable function key |
function_name | str | Human-readable function name |
status | str | "complete" (always, since failures raise) |
ok | bool | True (always, since failures raise) |
started_at | datetime | None | When execution started |
completed_at | datetime | None | When execution finished |
attempts | int | Number of attempts (including retries) |
parent_id | str | None | Parent job ID (see Workflows) |
root_id | str | Root job ID in the execution tree (see Workflows) |
Sync variants
All submission methods have sync equivalents for use outside of async contexts:
# Submit from sync code
future = process_order.submit_sync(order_id="123", items=["A", "B"])
# Submit and wait from sync code
result = process_order.wait_sync(order_id="123", items=["A", "B"])
Sync methods cannot be called from within an already-running event loop. Use the async variants inside tasks and API handlers.
Idempotent submission
Use .submit_idempotent() to prevent duplicate jobs with the same key:
future = await process_order.submit_idempotent(
"order-123", # Idempotency key
order_id="123",
items=["A", "B"],
)
If a job with the same idempotency key is already queued or running, you’ll get the existing job’s Future instead of creating a new one.
# Async wait variant
result = await process_order.wait_idempotent(
"order-123",
order_id="123",
items=["A", "B"],
)
# Sync variants
future = process_order.submit_idempotent_sync("order-123", order_id="123", items=["A", "B"])
result = process_order.wait_idempotent_sync("order-123", order_id="123", items=["A", "B"])
Direct execution
Call a task handle directly to execute it inline (useful for testing):
# Runs the function directly, not via the queue
result = process_order(order_id="123", items=["A", "B"])