UpNext provides utilities for running multiple tasks in parallel. These work with both Future objects and regular awaitables, and are the building blocks for multi-step workflows.

## gather — run all, wait for all

Submit multiple tasks and wait for all of them to complete:
```python
import upnext

user_future = await fetch_user.submit(user_id="123")
orders_future = await fetch_orders.submit(user_id="123")
notifications_future = await fetch_notifications.submit(user_id="123")

user, orders, notifications = await upnext.gather(
    user_future,
    orders_future,
    notifications_future,
)
```
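The `fetch_*` tasks above are assumed to be defined elsewhere. For context, a minimal sketch of one such definition might look like this (the `upnext.Worker()` setup is a hypothetical assumption; only the `@worker.task` decorator is referenced later on this page):

```python
import upnext

worker = upnext.Worker()  # hypothetical setup; match your application's worker

@worker.task
async def fetch_user(user_id: str) -> dict:
    # Fetch and return the user record (placeholder body).
    ...
```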
`gather` also works with regular awaitables:
```python
import asyncio
import upnext

total, ping = await upnext.gather(
    asyncio.sleep(0, result=42),
    asyncio.sleep(0, result="ok"),
)
```
When you pass `Future` objects, `gather` returns the task values, the same as calling `await future.value()` on each future.
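For instance, the three-future `gather` call above is equivalent to awaiting each value by hand (using only the `future.value()` call just described):

```python
# Equivalent to the gather call above, awaiting one future at a time.
user = await user_future.value()
orders = await orders_future.value()
notifications = await notifications_future.value()
```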
When you pass `task.wait(...)` coroutines, `gather` returns `TaskResult` objects, because those awaitables resolve to `TaskResult`:
```python
results = await upnext.gather(
    task_a.wait(x=1),
    task_b.wait(x=2),
)
print(results[0].value)
```

## Handle errors gracefully

By default, `gather` raises on the first error. Set `return_exceptions=True` to collect errors as results instead:

```python
results = await upnext.gather(
    task_a.wait(x=1),
    task_b.wait(x=2),
    task_c.wait(x=3),
    return_exceptions=True,
)

for result in results:
    if isinstance(result, Exception):
        print(f"Task failed: {result}")
    else:
        print(f"Task succeeded: {result}")

## map_tasks — fan out with concurrency control

Apply a task to a list of inputs with bounded concurrency:
```python
results = await upnext.map_tasks(
    process_item,  # TaskHandle from @worker.task
    [{"item_id": i} for i in range(1000)],
    concurrency=10,
)
```
This runs at most 10 jobs concurrently and waits for all of them to finish. Each dict in the input list is unpacked as keyword arguments to the task; a rough equivalent is sketched below.
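Conceptually, the bounded fan-out behaves like the following sketch (a hypothetical approximation built from `asyncio.Semaphore` and the `task.wait(...)` call shown earlier, not the library's actual internals):

```python
import asyncio

async def map_tasks_sketch(task, inputs, concurrency=10):
    # Allow at most `concurrency` in-flight executions at once.
    sem = asyncio.Semaphore(concurrency)

    async def run_one(kwargs):
        async with sem:
            return await task.wait(**kwargs)

    return await asyncio.gather(*(run_one(kw) for kw in inputs))
```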

### Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `task` | `TaskHandle` | required | The task to execute |
| `inputs` | `list[dict]` | required | List of keyword-argument dicts |
| `concurrency` | `int` | `10` | Maximum concurrent executions |

## first_completed — race multiple tasks

Return the first result and cancel the rest:
```python
result = await upnext.first_completed(
    fetch_from_provider_a.wait(query="data"),
    fetch_from_provider_b.wait(query="data"),
    fetch_from_provider_c.wait(query="data"),
)
```
Useful for racing multiple providers, fallback strategies, or timeout patterns. An overall timeout caps how long the race is allowed to run:

```python
result = await upnext.first_completed(
    slow_task.wait(x=1),
    fast_task.wait(x=1),
    timeout=10.0,  # Overall timeout
)
```

## submit_many — fire and forget in bulk

Submit multiple jobs without waiting for results:
```python
futures = await upnext.submit_many(
    process_item,
    [{"item_id": i} for i in range(1000)],
)

# futures is a list of Future objects.
# Collect results later when needed (via future.value(),
# consistent with the gather section above):
for future in futures:
    result = await future.value()
```
Unlike `map_tasks`, `submit_many` returns immediately with a list of `Future` objects. Use it when you want to submit all jobs upfront and collect results later (or not at all).
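Since the returned futures are ordinary `Future` objects, you can also hand them to `gather` (described above) to collect everything in one await:

```python
# Collect all 1000 results at once instead of looping.
results = await upnext.gather(*futures)
```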

## Workflows

All of these primitives work inside nested tasks. When a parent task uses `gather`, `map_tasks`, or `first_completed` to coordinate child tasks, UpNext automatically tracks the parent-child lineage through the entire execution tree, as in the sketch below.
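Here is a hypothetical parent task that fans out to two of the tasks from the `gather` example (the `sync_account` name and `@worker.task` setup are assumptions for illustration):

```python
@worker.task  # hypothetical parent task
async def sync_account(user_id: str):
    # Child tasks launched here are tracked as children of sync_account.
    user_res, orders_res = await upnext.gather(
        fetch_user.wait(user_id=user_id),
        fetch_orders.wait(user_id=user_id),
    )
    return {"user": user_res.value, "orders": orders_res.value}
```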

See the Workflows page to learn about nested tasks, lineage tracking, failure propagation, and multi-level workflow patterns.