import upnext

worker = upnext.Worker("my-worker", concurrency=10)

@worker.task(retries=3, timeout=30.0)
async def process_order(order_id: str, items: list[str]) -> dict:
    return {"order_id": order_id, "status": "completed"}

@worker.cron("0 9 * * *")  # runs every day at 09:00
async def daily_report():
    return {"generated": True}

order_placed = worker.event("order.placed")

@order_placed.on
async def send_confirmation(order_id: str):
    return {"sent": True}
A Worker listens on a Redis queue and executes jobs. You define all three job types — tasks, events, and cron jobs — on a worker instance.

Create a worker

Instantiate upnext.Worker to create a worker:
worker = upnext.Worker(
    "my-worker",        # Name (used for identification in dashboard)
    concurrency=10,     # Max concurrent jobs (default: 2)
)
A worker manages:
  • A Redis-backed queue for receiving jobs
  • A job processor that executes task, event, and cron functions
  • Cron schedulers for recurring work
  • Event routers for pub/sub handlers

Worker options

Parameter        Type           Default         Description
name             str            auto-generated  Worker name for dashboard identification
concurrency      int            2               Maximum concurrent job executions
prefetch         int            None            Jobs to prefetch from queue (auto-tuned by default)
sync_executor    SyncExecutor   THREAD          Executor for sync tasks: THREAD or PROCESS
sync_pool_size   int            None            Size of the sync executor pool
redis_url        str            None            Redis URL (falls back to UPNEXT_REDIS_URL env var)
secrets          list[str]      []              Secret names to fetch on startup (requires server)
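Putting several of these options together might look like the sketch below. The parameter names come from the table above; the specific values and the worker name are illustrative, not recommendations:

```python
import upnext

worker = upnext.Worker(
    "billing-worker",                       # illustrative name
    concurrency=25,                         # up to 25 jobs in flight per instance
    prefetch=50,                            # override the auto-tuned prefetch depth
    redis_url="redis://localhost:6379/0",   # or set UPNEXT_REDIS_URL instead
)
```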

Run a worker

Workers are started with upnext.run() or the CLI:
# In code
upnext.run(worker)
# From the CLI
upnext run service.py
See the Run Services guide for running workers alongside APIs, using --only to start specific components, and more.
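For instance, --only (mentioned above) lets one process start a single component; the argument format here is an assumption, so check the Run Services guide for the canonical syntax:

```shell
# Start only the worker component from service.py
# ("my-worker" as the --only argument is illustrative)
upnext run service.py --only my-worker
```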

Scaling

Run multiple worker instances pointing at the same Redis — UpNext distributes jobs automatically.
# Terminal 1
upnext run service.py

# Terminal 2 (same code, same Redis)
upnext run service.py
Under the hood, UpNext uses Redis Streams consumer groups. Each worker instance joins the same consumer group and claims jobs via XREADGROUP, so no two workers process the same job. If a worker crashes, its pending jobs are automatically recovered by other instances. Each instance manages its own concurrency independently:
# Each instance processes up to 50 jobs at a time
worker = upnext.Worker("my-worker", concurrency=50)
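The claim-and-recover behavior described above maps onto standard Redis Streams commands. A rough redis-cli illustration, with made-up stream and group names (UpNext's actual key names will differ):

```shell
# Each worker instance reads new jobs for the shared consumer group;
# ">" means "entries never delivered to any consumer in this group".
XREADGROUP GROUP upnext-workers worker-1 COUNT 10 BLOCK 5000 STREAMS jobs >

# Entries read but not yet acknowledged sit in the group's pending list,
# so a surviving instance can inspect and claim them after a crash.
XPENDING jobs upnext-workers
XAUTOCLAIM jobs upnext-workers worker-2 60000 0
```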
Per-function controls like rate_limit and max_concurrency are enforced across the entire cluster, not per-instance — so max_concurrency=10 means 10 total, regardless of how many workers are running.
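As a sketch, a task capped cluster-wide might look like this. The rate_limit and max_concurrency names come from the paragraph above, but passing them as decorator keyword arguments is an assumption:

```python
import upnext

# Each instance runs up to 50 jobs at a time...
worker = upnext.Worker("my-worker", concurrency=50)

# ...but max_concurrency is enforced across all instances combined:
# at most 10 resize_image jobs run cluster-wide, however many workers exist.
# (Decorator signature is assumed, not confirmed by this page.)
@worker.task(max_concurrency=10)
async def resize_image(image_id: str) -> dict:
    return {"image_id": image_id, "resized": True}
```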
See the Docker Compose guide for scaling workers with container replicas.

Job types

Workers execute three kinds of jobs: tasks (@worker.task), events (@worker.event(...).on), and cron jobs (@worker.cron). Each has its own decorator and behavior. Tasks can submit other tasks, creating parent-child relationships that form multi-step workflows; UpNext automatically tracks the full execution tree. See Workflows.
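A parent task submitting a child task might look like the sketch below. The .submit() method name is hypothetical; the Workflows guide documents the real submission API:

```python
import upnext

worker = upnext.Worker("my-worker")

@worker.task()
async def charge_card(order_id: str) -> dict:
    return {"order_id": order_id, "charged": True}

@worker.task()
async def process_order(order_id: str) -> dict:
    # Submitting a task from inside another task creates a parent-child
    # link in the execution tree. `.submit()` is a hypothetical name here.
    await charge_card.submit(order_id)
    return {"order_id": order_id, "status": "completed"}
```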