UpNext uses a stream-per-function model: each registered task gets its own Redis Stream, and a worker only consumes from the streams of the tasks it registers. This means you can split work across multiple worker processes simply by registering different tasks on different workers.
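To make the routing rule concrete, here is a toy in-memory model of stream-per-function consumption. It is only a sketch: ToyBroker and ToyWorker are hypothetical names, plain Python deques stand in for Redis Streams, and none of this reflects UpNext's actual internals.

```python
from collections import defaultdict, deque


class ToyBroker:
    """In-memory stand-in for Redis: one FIFO stream per task name."""

    def __init__(self):
        self.streams = defaultdict(deque)

    def submit(self, task_name, payload):
        # A job is appended to the stream named after its task.
        self.streams[task_name].append(payload)


class ToyWorker:
    """Consumes only from the streams of the tasks it registered."""

    def __init__(self, name, broker):
        self.name = name
        self.broker = broker
        self.handlers = {}

    def task(self, func):
        # Registering a task subscribes this worker to that task's stream.
        self.handlers[func.__name__] = func
        return func

    def drain(self):
        """Run every pending job for registered tasks and return the results."""
        results = []
        for task_name, handler in self.handlers.items():
            stream = self.broker.streams[task_name]
            while stream:
                results.append(handler(**stream.popleft()))
        return results


broker = ToyBroker()
fast = ToyWorker("fast-worker", broker)
heavy = ToyWorker("heavy-worker", broker)


@fast.task
def send_email(to, subject):
    return f"email to {to}"


@heavy.task
def process_video(video_id):
    return f"video {video_id}"


broker.submit("send_email", {"to": "a@example.com", "subject": "hi"})
broker.submit("process_video", {"video_id": "v1"})

fast_results = fast.drain()  # only touches the send_email stream
pending_for_heavy = len(broker.streams["process_video"])  # job is still waiting
heavy_results = heavy.drain()
```

The fast worker never sees the process_video job, because it never registered that task; the job waits in its own stream until a worker that did register it comes along.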

Why dedicated workers?

A single worker running all tasks can become a bottleneck when your workloads have different requirements:
  • A slow video processing job shouldn’t block a fast email send
  • CPU-intensive report generation needs different concurrency than lightweight webhooks
  • Long-running imports need longer timeouts than quick cache updates
Dedicated workers solve this with process-level isolation, so no priority queues are needed.
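The blocking effect described above can be illustrated with a small, idealized queue simulation. This is a sketch under stated assumptions, not a measurement of UpNext: finish_times is a hypothetical helper, all jobs arrive at time zero in list order, and the durations are made-up numbers.

```python
import heapq


def finish_times(jobs, concurrency):
    """Simulate a FIFO queue served by `concurrency` slots.

    jobs: list of (name, duration_seconds) in arrival order, all arriving at t=0.
    Returns a dict mapping each job name to the time it finishes.
    """
    slots = [0.0] * concurrency  # time at which each slot becomes free
    heapq.heapify(slots)
    finished = {}
    for name, duration in jobs:
        start = heapq.heappop(slots)  # take the earliest free slot
        end = start + duration
        finished[name] = end
        heapq.heappush(slots, end)
    return finished


# One shared worker with 2 slots: two videos are queued ahead of an email.
shared = finish_times(
    [("video-1", 600), ("video-2", 600), ("email", 1)], concurrency=2
)

# Dedicated workers: the email has its own queue and its own slots.
heavy_only = finish_times([("video-1", 600), ("video-2", 600)], concurrency=2)
fast_only = finish_times([("email", 1)], concurrency=50)

print(shared["email"])     # 601.0
print(fast_only["email"])  # 1.0
```

In the shared case the email waits behind both videos even though it needs only one second of work; with its own worker it finishes immediately, and the videos are no slower.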

Define separate workers

Create a worker for each workload category, each registering only its own tasks:
fast_worker.py
import upnext

worker = upnext.Worker("fast-worker", concurrency=50)

@worker.task(timeout=30)
async def send_email(to: str, subject: str):
    ...

@worker.task(timeout=10)
async def update_cache(key: str):
    ...
heavy_worker.py
import upnext

worker = upnext.Worker("heavy-worker", concurrency=2)

@worker.task(timeout=3600)
async def process_video(video_id: str):
    ...

@worker.task(timeout=1800)
async def generate_report(report_id: str):
    ...

Run as separate processes

Start each worker in its own process:
# Terminal 1 — many lightweight tasks in parallel
upnext run fast_worker.py

# Terminal 2 — few heavy tasks with more resources
upnext run heavy_worker.py
Scale each tier independently by running multiple instances:
# 3 fast worker instances
upnext run fast_worker.py &
upnext run fast_worker.py &
upnext run fast_worker.py &

# 1 heavy worker instance
upnext run heavy_worker.py

Cross-worker task submission

Tasks in one worker can submit jobs to tasks registered on another worker. The jobs are routed automatically through Redis Streams, so the submitting worker does not need to process them itself.
fast_worker.py
# Added to fast_worker.py, where `worker` is already defined
from heavy_worker import process_video

@worker.task()
async def handle_upload(file_id: str):
    # This submits to heavy_worker's task
    await process_video.submit(video_id=file_id)
Importing process_video gives you the task handle. UpNext then routes the job through Redis to whichever worker registered that function.
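The idea that a handle's submit() only enqueues work, and never runs it in the caller, can be sketched with the standard library alone. Everything here is an assumption made for illustration: TaskHandle, the task decorator, STREAMS, and heavy_worker_step are hypothetical names, and a dict of deques stands in for Redis.

```python
import asyncio
from collections import defaultdict, deque

STREAMS = defaultdict(deque)  # stand-in for Redis: stream name -> pending jobs


class TaskHandle:
    """Hypothetical handle: wraps a function; submit() only enqueues a job."""

    def __init__(self, func):
        self.func = func
        self.stream = func.__name__

    async def submit(self, **kwargs):
        # The caller never runs func; it only appends to the task's stream.
        STREAMS[self.stream].append(kwargs)


def task(func):
    return TaskHandle(func)


@task
async def process_video(video_id):
    return f"processed {video_id}"


async def handle_upload(file_id):
    # Runs in the "fast" process: hands the job off and returns right away.
    await process_video.submit(video_id=file_id)
    return "queued"


async def heavy_worker_step():
    # Runs in the "heavy" process: pops one job and actually executes it.
    kwargs = STREAMS["process_video"].popleft()
    return await process_video.func(**kwargs)


status = asyncio.run(handle_upload("f42"))
pending = len(STREAMS["process_video"])
result = asyncio.run(heavy_worker_step())
```

After handle_upload returns, the job is still pending; it runs only when the consumer side pulls it from the stream, which is the separation the import-and-submit pattern relies on.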

Autodiscovery for larger projects

For projects with many task modules, use autodiscover to automatically import and register all tasks in a package:
myapp/
  fast_worker.py
  heavy_worker.py
  tasks/
    __init__.py
    emails.py       # @fast_worker.task() — discovered automatically
    cache.py        # @fast_worker.task() — discovered automatically
  jobs/
    __init__.py
    video.py        # @heavy_worker.task() — discovered automatically
    reports.py      # @heavy_worker.task() — discovered automatically
fast_worker.py
import upnext

worker = upnext.Worker(
    "fast-worker",
    concurrency=50,
    autodiscover_packages=["myapp.tasks"],
)
Or call autodiscover() explicitly:
worker = upnext.Worker("fast-worker", concurrency=50)
worker.autodiscover("myapp.tasks")
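Because Python decorators run at import time, importing a module is enough to execute its @worker.task() registrations. One common way to import everything under a package uses pkgutil and importlib, as in the sketch below. This is not necessarily how UpNext implements autodiscover; import_submodules is a hypothetical helper written for illustration.

```python
import importlib
import pkgutil


def import_submodules(package_name):
    """Import every module under a package and return the imported names."""
    package = importlib.import_module(package_name)
    imported = []
    # walk_packages yields each submodule and recurses into subpackages.
    for info in pkgutil.walk_packages(
        package.__path__, prefix=package.__name__ + "."
    ):
        importlib.import_module(info.name)  # decorators in the module run here
        imported.append(info.name)
    return sorted(imported)
```

With the layout above, calling import_submodules("myapp.tasks") would import myapp.tasks.cache and myapp.tasks.emails, provided myapp is importable from the current path.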

When to use dedicated workers

Scenario                               | Solution
Fast tasks blocked by slow ones        | Separate workers with different concurrency
Tasks need different timeouts          | Each worker’s tasks configure their own timeouts
CPU-heavy vs I/O-heavy workloads       | Separate processes with different resource limits
Different scaling requirements         | Scale each worker tier independently
Need workload isolation in production  | Deploy as separate containers/services
You don’t need dedicated workers for everything. A single worker running all tasks works well for small to medium workloads. Split into dedicated workers when your workloads have clearly different characteristics.