UpNext provides automatic retries, timeout enforcement, and a dead-letter queue for jobs that exhaust all attempts.

Retries

Configure retries on any task:
@worker.task(retries=3)
async def flaky_task():
    # Will be retried up to 3 times on failure
    ...
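When every attempt fails, the caller sees a single TaskExecutionError rather than one error per attempt. A caller-side sketch (assuming flaky_task takes no arguments; the error fields are covered under "Error types" below):
from upnext import TaskExecutionError

try:
    result = await flaky_task.wait()
except TaskExecutionError as e:
    # attempts comes from the TaskResult attached to the error
    print(f"Gave up after {e.task_result.attempts} attempts: {e.error}")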

Retry delay and backoff

Control the wait time between retries:
@worker.task(
    retries=5,
    retry_delay=2.0,      # 2 seconds before first retry
    retry_backoff=2.0,     # Double the delay each time
)
async def api_call():
    # Retry delays: 2s, 4s, 8s, 16s, 32s
    ...
Parameter       Default   Description
retries         0         Maximum retry attempts
retry_delay     1.0       Seconds before the first retry
retry_backoff   2.0       Multiplier applied after each retry
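As the api_call example shows, the delay before the n-th retry is retry_delay * retry_backoff ** (n - 1). A standalone helper to sanity-check a schedule before deploying it (plain Python, not part of the UpNext API):
def retry_schedule(retries: int, delay: float, backoff: float) -> list[float]:
    # Delay before retry n (1-based) is delay * backoff ** (n - 1)
    return [delay * backoff**n for n in range(retries)]

print(retry_schedule(5, 2.0, 2.0))  # [2.0, 4.0, 8.0, 16.0, 32.0]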

Check attempt number

Inside a task, check which attempt you’re on:
import upnext

@worker.task(retries=3)
async def resilient_task():
    ctx = upnext.get_current_context()
    print(f"Attempt {ctx.attempt} of {ctx.max_attempts}")
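The f-string above suggests attempt is 1-based and reaches max_attempts on the final try. Assuming that, you can switch strategy once you're out of further retries; a sketch (fetch_primary and fetch_fallback are hypothetical helpers):
import upnext

@worker.task(retries=3)
async def fetch_with_fallback():
    ctx = upnext.get_current_context()
    if ctx.attempt < ctx.max_attempts:
        return await fetch_primary()   # normal path
    return await fetch_fallback()      # last attempt: use the backup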

Timeouts

Set a maximum execution time for a task:
@worker.task(timeout=30.0)  # 30 seconds
async def bounded_task():
    ...
The default timeout is 30 minutes. If a task exceeds its timeout, it’s terminated and a TaskTimeoutError is raised.
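timeout composes with the retry parameters above. A sketch that bounds each attempt to 10 seconds (this assumes a timed-out attempt counts as a failed attempt and is retried like any other failure, which is worth verifying against your UpNext version):
@worker.task(retries=2, timeout=10.0, retry_delay=1.0)
async def bounded_api_call():
    # Each attempt is terminated after 10s; the job fails for good
    # once the initial run and both retries have been exhausted.
    ...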

Handle timeout errors

When waiting for a result, you can catch timeout-specific errors:
from upnext import TaskTimeoutError

try:
    result = await slow_task.wait(data="large")
except TaskTimeoutError as e:
    print(f"Job {e.job_id} timed out: {e.error}")

Error types

UpNext raises specific exceptions when tasks fail:

TaskExecutionError

Raised when a task finishes with a failed or cancelled status:
from upnext import TaskExecutionError

try:
    result = await risky_task.wait(x=1)
except TaskExecutionError as e:
    print(e.job_id)     # Job that failed
    print(e.status)     # "failed" or "cancelled"
    print(e.error)      # Error message
    print(e.task_result)  # Full TaskResult object

TaskTimeoutError

A subclass of TaskExecutionError, raised specifically when a task exceeds its timeout:
from upnext import TaskTimeoutError

try:
    result = await slow_task.wait(data="large")
except TaskTimeoutError as e:
    print(f"Timed out: {e.error}")

Catching errors

Both .wait() and future.result() always raise on failure — you never get a TaskResult with ok=False. Use try/except to handle errors:
from upnext import TaskExecutionError, TaskTimeoutError

try:
    result = await risky_task.wait(x=1)
    print(f"Success: {result.value}")
except TaskTimeoutError as e:
    print(f"Timed out: {e.error}")
except TaskExecutionError as e:
    print(f"Failed: {e.error}")
    print(f"Status: {e.status}")
    print(f"Attempts: {e.task_result.attempts}")
In workflows, this means a child failure bubbles up to the parent as an exception, giving you full control over recovery logic. To collect errors without raising, use gather with return_exceptions=True:
results = await upnext.gather(
    task_a.wait(x=1),
    task_b.wait(x=2),
    return_exceptions=True,
)

for result in results:
    if isinstance(result, TaskExecutionError):
        print(f"Task failed: {result.error}")
    else:
        print(f"Task succeeded: {result.value}")

Dead-letter queue

Jobs that exhaust all retry attempts are moved to the dead-letter queue (DLQ). You can browse and manage DLQ entries from the dashboard, or via the server’s REST API at /api/v1/dlq. The DLQ stores:
  • The original job payload
  • The error that caused the final failure
  • All attempt metadata
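A minimal sketch of listing DLQ entries over that REST API, using only the /api/v1/dlq path mentioned above (the base URL and the response shape are assumptions; check your server's API reference for the actual schema):
import json
import urllib.request

BASE = "http://localhost:8000"  # assumed; point at your UpNext server

with urllib.request.urlopen(f"{BASE}/api/v1/dlq") as resp:
    entries = json.load(resp)

for entry in entries:  # schema is server-defined; inspect before relying on fields
    print(entry)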

Lifecycle hooks

Attach callbacks to specific task events:
async def on_task_failure():
    # Send an alert, log to external service, etc.
    ...

@worker.task(
    retries=3,
    on_failure=on_task_failure,
    on_retry=lambda: print("Retrying..."),
)
async def monitored_task():
    ...
Available hooks: on_start, on_success, on_failure, on_retry, on_complete.
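A common pattern is wiring these hooks to standard logging. A sketch using the zero-argument callback shape shown above (whether hooks receive arguments such as the job context may differ in your version; check the hooks reference):
import logging

logger = logging.getLogger("tasks")

async def alert_on_failure():
    logger.error("monitored_task failed")  # e.g. also page an on-call here

@worker.task(
    retries=3,
    on_start=lambda: logger.info("monitored_task started"),
    on_retry=lambda: logger.warning("monitored_task retrying"),
    on_failure=alert_on_failure,
    on_complete=lambda: logger.info("monitored_task finished"),
)
async def monitored_task():
    ...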