Skip to main content
import upnext

worker = upnext.Worker("my-worker")

@worker.task(retries=3, timeout=30.0)
async def process_order(order_id: str, items: list[str]) -> dict:
    ctx = upnext.get_current_context()
    ctx.set_progress(50, "Halfway done")
    return {"order_id": order_id, "status": "completed"}
A task is any Python function — async or sync — decorated with @worker.task. Tasks are the most common job type. They run once when submitted and return a result.

Define a task

Register a task with the @worker.task decorator:
@worker.task
async def simple_task(name: str) -> str:
    return f"Hello, {name}!"
Tasks can be async or sync. Sync tasks run in a thread pool (or process pool) automatically.
@worker.task
def sync_task(data: list[str]) -> int:
    # Runs in a thread pool — no need for async
    return len(data)

Task options

ParameterTypeDefaultDescription
retriesint0Number of retry attempts on failure
retry_delayfloat1.0Seconds to wait before first retry
retry_backofffloat2.0Multiplier applied to delay after each retry
timeoutfloat1800Maximum execution time in seconds (30 min default)
namestrNoneCustom display name (defaults to function name)
rate_limitstrNoneRate limit string (e.g. "10/minute")
max_concurrencyintNoneMax concurrent executions of this specific task
cache_keystrNoneCache key template for result caching
cache_ttlintNoneCache TTL in seconds

Submit a task

Once a task is registered, you can submit it for background execution:
# Submit and get a Future (non-blocking)
future = await process_order.submit(order_id="123", items=["A", "B"])
print(future.job_id)  # Unique job ID
value = await future.value()  # {"order_id": "123", "status": "completed"}

# Submit and wait for the result (blocking)
result = await process_order.wait(order_id="123", items=["A", "B"])
print(result.value)   # {"order_id": "123", "status": "completed"}
See the Submit & Wait guide for details on Futures, TaskResults, and sync/async patterns.

Lifecycle hooks

You can attach callbacks to task lifecycle events:
@worker.task(
    on_start=lambda: print("Starting"),
    on_success=lambda: print("Done!"),
    on_failure=lambda: print("Failed"),
    on_retry=lambda: print("Retrying..."),
    on_complete=lambda: print("Complete (success or failure)"),
)
async def my_task():
    ...

Next: Workflows

Compose tasks into multi-step pipelines with automatic lineage tracking.