import upnext

worker = upnext.Worker("my-worker", concurrency=10)

@worker.task(retries=3, timeout=30.0)
async def process_order(order_id: str, items: list[str]) -> dict:
    return {"order_id": order_id, "status": "completed"}

@worker.cron("0 9 * * *")
async def daily_report():
    return {"generated": True}

order_placed = worker.event("order.placed")

@order_placed.on
async def send_confirmation(order_id: str):
    return {"sent": True}
A Worker listens on a Redis queue and executes jobs. You define all three job types — tasks, events, and cron jobs — on a worker instance.

Create a worker

Instantiate upnext.Worker to create a worker:
worker = upnext.Worker(
    "my-worker",        # Name (used for identification in dashboard)
    concurrency=10,     # Max concurrent jobs (default: 2)
)
A worker manages:
  • A Redis-backed queue for receiving jobs
  • A job processor that executes task, event, and cron functions
  • Cron schedulers for recurring work
  • Event routers for pub/sub handlers

Worker options

Parameter      Type                      Default         Description
name           str                       auto-generated  Worker name for dashboard identification
concurrency    int                       2               Maximum concurrent job executions
sync_executor  SyncExecutor              THREAD          Executor for sync tasks: THREAD or PROCESS
redis_url      str                       None            Redis URL (falls back to UPNEXT_REDIS_URL env var)
queue_config   WorkerQueueConfig | None  None            Optional per-worker queue overrides (otherwise uses UPNEXT_QUEUE_* env vars)
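The concurrency cap bounds how many jobs a worker executes at once. Conceptually it behaves like an asyncio.Semaphore gating job execution; the sketch below is illustrative only and is not UpNext's actual scheduler.

```python
import asyncio

async def run_with_concurrency(jobs, concurrency: int):
    """Run coroutine factories, allowing at most `concurrency` at a time."""
    sem = asyncio.Semaphore(concurrency)
    peak = 0     # highest number of jobs observed running simultaneously
    running = 0

    async def guarded(job):
        nonlocal peak, running
        async with sem:  # blocks while `concurrency` jobs are already active
            running += 1
            peak = max(peak, running)
            try:
                return await job()
            finally:
                running -= 1

    results = await asyncio.gather(*(guarded(j) for j in jobs))
    return results, peak

async def main():
    async def job():
        await asyncio.sleep(0.01)
        return "done"

    # 20 queued jobs, but never more than 5 in flight at once.
    results, peak = await run_with_concurrency([job] * 20, concurrency=5)
    print(len(results), peak)

asyncio.run(main())
```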

Queue tuning

Queue behavior is tuned with UPNEXT_QUEUE_* environment variables. These values are read when the worker initializes.
Variable                        Default  Description
UPNEXT_QUEUE_BATCH_SIZE         4        Number of jobs fetched/flushed per batch
UPNEXT_QUEUE_INBOX_SIZE         4        In-memory fetch buffer size (minimum is batch size)
UPNEXT_QUEUE_OUTBOX_SIZE        10000    In-memory completion buffer size
UPNEXT_QUEUE_FLUSH_INTERVAL_MS  5.0      Max delay before flushing completed jobs
UPNEXT_QUEUE_CLAIM_TIMEOUT_MS   30000    Idle time before stale jobs are reclaimable
UPNEXT_QUEUE_JOB_TTL_SECONDS    86400    TTL for job payload/index/dedup keys
UPNEXT_QUEUE_STREAM_MAXLEN      0        Redis stream trim cap (0 means unbounded)
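The table implies a simple resolution order: an explicit UPNEXT_QUEUE_* variable wins, otherwise the built-in default applies, and the inbox is never smaller than the batch size. A minimal sketch of that lookup (illustrative only, not the library's actual config loader):

```python
import os

# Defaults taken from the table above; each value is parsed to the
# type of its default (int or float).
QUEUE_DEFAULTS = {
    "UPNEXT_QUEUE_BATCH_SIZE": 4,
    "UPNEXT_QUEUE_INBOX_SIZE": 4,
    "UPNEXT_QUEUE_OUTBOX_SIZE": 10000,
    "UPNEXT_QUEUE_FLUSH_INTERVAL_MS": 5.0,
    "UPNEXT_QUEUE_CLAIM_TIMEOUT_MS": 30000,
    "UPNEXT_QUEUE_JOB_TTL_SECONDS": 86400,
    "UPNEXT_QUEUE_STREAM_MAXLEN": 0,
}

def resolve_queue_settings(env=os.environ):
    """Return queue settings, preferring environment overrides over defaults."""
    settings = {}
    for key, default in QUEUE_DEFAULTS.items():
        raw = env.get(key)
        settings[key] = type(default)(raw) if raw is not None else default
    # Per the table, the inbox can never be smaller than the batch size.
    settings["UPNEXT_QUEUE_INBOX_SIZE"] = max(
        settings["UPNEXT_QUEUE_INBOX_SIZE"], settings["UPNEXT_QUEUE_BATCH_SIZE"]
    )
    return settings

# Override one knob; everything else keeps its default.
cfg = resolve_queue_settings({"UPNEXT_QUEUE_BATCH_SIZE": "32"})
print(cfg["UPNEXT_QUEUE_BATCH_SIZE"], cfg["UPNEXT_QUEUE_INBOX_SIZE"])  # 32 32
```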
You can also override queue tuning per worker in code:
worker = upnext.Worker(
    "my-worker",
    queue_config=upnext.WorkerQueueConfig(batch_size=32, inbox_size=32),
)
Or set the same tuning with environment variables when launching:
UPNEXT_QUEUE_BATCH_SIZE=200 \
UPNEXT_QUEUE_INBOX_SIZE=2000 \
UPNEXT_QUEUE_OUTBOX_SIZE=20000 \
UPNEXT_QUEUE_FLUSH_INTERVAL_MS=20 \
UPNEXT_QUEUE_STREAM_MAXLEN=200000 \
UPNEXT_REDIS_URL=redis://localhost:6379 \
upnext run service.py

Run a worker

Workers are started with upnext.run() or the CLI:
# In code
upnext.run(worker)
# From the CLI
upnext run service.py
See the Run Services guide for running workers alongside APIs, using --only to start specific components, and more.

Scaling

Run multiple worker instances pointing at the same Redis — UpNext distributes jobs automatically.
# Terminal 1
upnext run service.py

# Terminal 2 (same code, same Redis)
upnext run service.py
Under the hood, UpNext uses Redis Streams consumer groups. Delivery is at-least-once, not exactly-once: if a worker crashes after claiming a job but before ACK/finalize, another worker can reclaim and execute it. Use idempotency keys for duplicate-sensitive submissions (see Submit and Wait).

Each instance manages its own concurrency independently:
# Each instance processes up to 50 jobs at a time
worker = upnext.Worker("my-worker", concurrency=50)
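Because delivery is at-least-once, the same job can reach a handler twice after a crash and reclaim. A dedup store keyed by idempotency key makes the side effect run once. Below is a minimal in-memory sketch; a real deployment would use an atomic shared check such as Redis SET NX with a TTL rather than this single-process set.

```python
processed: set[str] = set()            # idempotency keys already applied
order_totals: dict[str, int] = {}      # the side effect we must not double-apply

def process_order(idempotency_key: str, order_id: str, amount: int) -> bool:
    """Apply the order once; redeliveries with the same key are no-ops."""
    if idempotency_key in processed:   # duplicate delivery: skip side effects
        return False
    processed.add(idempotency_key)
    order_totals[order_id] = order_totals.get(order_id, 0) + amount
    return True

# First delivery applies; a reclaimed duplicate does not double-charge.
print(process_order("order-42-v1", "order-42", 100))  # True
print(process_order("order-42-v1", "order-42", 100))  # False
print(order_totals["order-42"])                       # 100
```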
Per-function controls like rate_limit and max_concurrency are enforced across the entire cluster, not per-instance — so max_concurrency=10 means 10 total, regardless of how many workers are running.
See the Docker Compose guide for scaling workers with container replicas.
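A cluster-wide cap like max_concurrency=10 requires a counter shared by every instance, not a per-process semaphore. The sketch below shows the acquire/release logic against one shared counter, with a lock standing in for Redis atomicity; it is illustrative and not UpNext's implementation.

```python
import threading

class ClusterSlotLimiter:
    """Shared counter capping total in-flight jobs across all instances."""

    def __init__(self, max_concurrency: int):
        self.max_concurrency = max_concurrency
        self._in_flight = 0
        self._lock = threading.Lock()  # stands in for an atomic Redis op

    def try_acquire(self) -> bool:
        with self._lock:
            if self._in_flight >= self.max_concurrency:
                return False           # cluster at capacity; job stays queued
            self._in_flight += 1
            return True

    def release(self) -> None:
        with self._lock:
            self._in_flight -= 1

# Two "instances" sharing one limiter get 10 slots total, not 10 each.
limiter = ClusterSlotLimiter(max_concurrency=10)
grants = sum(limiter.try_acquire() for _ in range(15))
print(grants)  # 10 — the five extra attempts are refused until a slot frees
```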

Job types

Workers execute three kinds of jobs. Each has its own decorator and behavior:

Tasks

One-off background jobs with retries, timeouts, and result handling.

Events

Pub/sub handlers that trigger when an event is published.

Cron Jobs

Recurring jobs on a schedule with standard cron syntax.
Tasks can submit other tasks, creating parent-child relationships that form multi-step workflows. UpNext automatically tracks the full execution tree — see Workflows.