UpNext uses a stream-per-function model: each registered task gets its own Redis Stream, and a worker only consumes from the streams of the tasks it registers. This means you can split work across multiple worker processes simply by registering different tasks on different workers.
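Because every task maps to its own stream, per-task backlogs are separately visible in Redis. Here is a minimal sketch using the redis-py client, borrowing task names from the examples below; the `upnext:<task name>` key scheme is an assumption for illustration, not UpNext's documented naming:

```python
# A quick look at per-task backlogs with redis-py. The "upnext:<task name>"
# key naming is a hypothetical illustration; UpNext's actual stream naming
# may differ.
import redis

r = redis.Redis()

# One stream per registered task, so each backlog is visible on its own:
print(r.xlen("upnext:send_email"))     # pending jobs for the email task
print(r.xlen("upnext:process_video"))  # pending jobs for the video task
```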
## Why dedicated workers?
A single worker running all tasks can become a bottleneck when your workloads have different requirements:
- A slow video processing job shouldn’t block a fast email send
- CPU-intensive report generation needs different concurrency than lightweight webhooks
- Long-running imports need longer timeouts than quick cache updates
Dedicated workers solve this with process-level isolation — no priority queues needed.
## Define separate workers
Create a worker for each workload category, each registering only its own tasks:
```python
# fast_worker.py
import upnext

worker = upnext.Worker("fast-worker", concurrency=50)

@worker.task(timeout=30)
async def send_email(to: str, subject: str):
    ...

@worker.task(timeout=10)
async def update_cache(key: str):
    ...
```
```python
# heavy_worker.py
import upnext

worker = upnext.Worker("heavy-worker", concurrency=2)

@worker.task(timeout=3600)
async def process_video(video_id: str):
    ...

@worker.task(timeout=1800)
async def generate_report(report_id: str):
    ...
```
## Run as separate processes
Each worker runs in its own process, scaling independently:
```bash
# Terminal 1 — many lightweight tasks in parallel
upnext run fast_worker.py

# Terminal 2 — few heavy tasks with more resources
upnext run heavy_worker.py
```
Scale each tier independently by running multiple instances:
```bash
# 3 fast worker instances
upnext run fast_worker.py &
upnext run fast_worker.py &
upnext run fast_worker.py &

# 1 heavy worker instance
upnext run heavy_worker.py
```
## Cross-worker task submission
Tasks in one worker can submit jobs to tasks registered on another worker. Jobs are routed automatically through Redis Streams, so the submitting worker never executes them.
```python
# fast_worker.py
from heavy_worker import process_video

@worker.task()
async def handle_upload(file_id: str):
    # Submits to heavy-worker's task; the job is routed through Redis
    # and never runs on this worker.
    await process_video.submit(video_id=file_id)
```
Importing process_video gives you the task handle — UpNext routes the job through Redis to whichever worker registered that function.
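Task handles are ordinary objects, so normal Python patterns apply. Here is a sketch of fanning out several heavy jobs from one fast-worker task; the batch task and report id are hypothetical, but the handles and `submit()` calls reuse only what is shown above:

```python
# fast_worker.py — a hypothetical fan-out task reusing the handles above
from heavy_worker import generate_report, process_video

@worker.task()
async def handle_batch(file_ids: list[str]):
    # Queue one heavy video job per file; none of them run on this worker.
    for file_id in file_ids:
        await process_video.submit(video_id=file_id)
    # Then queue a single summary report (hypothetical report id).
    await generate_report.submit(report_id="batch-summary")
```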
## Autodiscovery for larger projects
For projects with many task modules, use autodiscover to automatically import and register all tasks in a package:
```
myapp/
├── fast_worker.py
├── heavy_worker.py
├── tasks/
│   ├── __init__.py
│   ├── emails.py    # @fast_worker.task() — discovered automatically
│   └── cache.py     # @fast_worker.task() — discovered automatically
└── jobs/
    ├── __init__.py
    ├── video.py     # @heavy_worker.task() — discovered automatically
    └── reports.py   # @heavy_worker.task() — discovered automatically
```
Pass the package to scan when constructing the worker:

```python
import upnext

worker = upnext.Worker(
    "fast-worker",
    concurrency=50,
    autodiscover_packages=["myapp.tasks"],
)
```
Or call `autodiscover()` explicitly:

```python
worker = upnext.Worker("fast-worker", concurrency=50)
worker.autodiscover("myapp.tasks")
```
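The heavy worker points at its own package the same way, assuming the `myapp/jobs` layout above:

```python
# heavy_worker.py — mirrors the fast worker, discovering tasks in myapp.jobs
import upnext

worker = upnext.Worker(
    "heavy-worker",
    concurrency=2,
    autodiscover_packages=["myapp.jobs"],
)
```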
## When to use dedicated workers
| Scenario | Solution |
|---|---|
| Fast tasks blocked by slow ones | Separate workers with different concurrency |
| Tasks need different timeouts | Each worker’s tasks configure their own timeouts |
| CPU-heavy vs I/O-heavy workloads | Separate processes with different resource limits |
| Different scaling requirements | Scale each worker tier independently |
| Need workload isolation in production | Deploy as separate containers/services |
You don’t need to use dedicated workers for everything. A single worker running all tasks works well for small to medium workloads. Split into dedicated workers when you have clearly different workload characteristics.