The Context gives you access to the current job’s execution state. Use it to report progress, save recovery checkpoints, and send structured logs to the dashboard.

import upnext

@worker.task(retries=3, timeout=60.0)
async def process_data(items: list[str]) -> dict:
    ctx = upnext.get_current_context()

    for i, item in enumerate(items):
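        # ... do the real work for this item here ...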
        ctx.set_progress((i + 1) / len(items) * 100, f"Processing {item}")
        ctx.checkpoint({"last_index": i})

    ctx.send_log("info", "All items processed")
    return {"processed": len(items)}

Get the context

Call upnext.get_current_context() from inside any task, cron job, or event handler:
ctx = upnext.get_current_context()
This only works inside a running job. Calling it outside a task raises a RuntimeError.
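If a helper might run both inside and outside a job, you can guard the call. A minimal sketch, relying only on the documented RuntimeError behavior (report is a hypothetical helper name):
def report(message: str) -> None:
    try:
        ctx = upnext.get_current_context()
    except RuntimeError:
        return  # not inside a running job, so nothing to report
    ctx.send_log("info", message)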

Progress updates

Report progress to the dashboard with set_progress():
ctx.set_progress(50, "Halfway done")
ctx.set_progress(0.75, "Three quarters")  # 0.0-1.0 also works
ctx.set_progress(100, "Complete")
Progress values can be:
  • 0 to 100 — treated as a percentage
  • 0.0 to 1.0 — automatically converted to a percentage
The optional message string is shown alongside the progress bar in the dashboard.
Progress updates are automatically coalesced to avoid flooding Redis. Small increments within 200ms are batched together.
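For illustration only, the batching described above amounts to keeping the newest update per window and flushing at most once per interval. This is a sketch of the idea, not upnext’s actual implementation:
import time

class ProgressCoalescer:
    # keeps only the newest update within each 200ms window (illustrative)
    def __init__(self, interval: float = 0.2):
        self.interval = interval
        self.last_flush = 0.0
        self.pending = None

    def update(self, percent: float, message: str) -> None:
        self.pending = (percent, message)
        now = time.monotonic()
        if now - self.last_flush >= self.interval:
            self.flush(now)

    def flush(self, now: float) -> None:
        if self.pending is not None:
            # the real client would write the pending value to Redis here
            self.last_flush = now
            self.pending = None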

Checkpoints

Save intermediate state for crash recovery with checkpoint():
@worker.task(retries=3)
async def process_batch(items: list[dict]) -> dict:
    ctx = upnext.get_current_context()
    processed = 0

    for i, item in enumerate(items):
        await do_work(item)
        processed += 1
        ctx.checkpoint({"last_index": i, "processed": processed})

    return {"total": processed}
If the worker crashes mid-execution and the task retries, your checkpoint state is available for recovery logic.
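This page doesn’t cover reading a checkpoint back. The sketch below shows the recovery pattern, assuming a hypothetical ctx.get_checkpoint() accessor that returns the last saved dict (or None on a first attempt); check the API reference for the actual read API:
@worker.task(retries=3)
async def process_batch(items: list[dict]) -> dict:
    ctx = upnext.get_current_context()

    # get_checkpoint() is a hypothetical name used for illustration only
    state = ctx.get_checkpoint() if ctx.attempt > 1 else None
    start = state["last_index"] + 1 if state else 0

    for i in range(start, len(items)):
        await do_work(items[i])
        ctx.checkpoint({"last_index": i})

    return {"total": len(items)}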

Structured logs

Send log entries that appear in the job’s log viewer in the dashboard:
ctx.send_log("info", "Starting data import")
ctx.send_log("warning", "Skipping invalid record", record_id="abc")
ctx.send_log("error", "Failed to connect", host="db.example.com")
Log levels: debug, info, warning, error. Extra keyword arguments are included as structured data.

Context properties

Property        Type          Description
job_id          str           Unique job identifier
job_key         str           Queue key for this job
parent_id       str | None    Parent job ID (see Workflows)
root_id         str           Root job ID in the execution tree (see Workflows)
attempt         int           Current attempt number (1-indexed)
max_attempts    int           Total allowed attempts
function        str           Stable function key
function_name   str           Human-readable function name
is_cancelled    bool          Whether cancellation was requested
log             Logger        Python logger for this job
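These properties make retry-aware behavior easy. For example, a task can log attempt metadata and switch strategy on its final try. An illustrative sketch (fetch_fast and fetch_via_fallback are placeholder helpers):
@worker.task(retries=3)
async def fetch_report(url: str) -> dict:
    ctx = upnext.get_current_context()
    ctx.log.info("job %s, attempt %d of %d", ctx.job_id, ctx.attempt, ctx.max_attempts)

    if ctx.attempt == ctx.max_attempts:
        # last allowed attempt: take the slower but more reliable path
        return await fetch_via_fallback(url)
    return await fetch_fast(url)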

Cooperative cancellation

Check is_cancelled in long-running tasks to exit early when a cancellation is requested:
@worker.task(timeout=300.0)
async def long_running_task():
    ctx = upnext.get_current_context()

    for i in range(1000):
        if ctx.is_cancelled:
            # exit cleanly instead of waiting for the timeout to kill the job
            return {"stopped_at": i}
        await do_work(i)

    return {"stopped_at": 1000}

Sync safety

All context methods (set_progress, checkpoint, send_log) work in both async and sync tasks. They detect the execution environment automatically — no special handling needed.
@worker.task
def sync_task():
    ctx = upnext.get_current_context()
    ctx.set_progress(50, "Works in sync tasks too")

Next: Artifacts

Attach outputs like JSON, images, and files to jobs.