import upnext

worker = upnext.Worker("my-worker")

order_placed = worker.event("order.placed")

@order_placed.on
async def send_confirmation(order_id: str, user_id: str):
    return {"confirmation_sent": True}

# Later, fire the event (from inside an async function)
await order_placed.send(order_id="123", user_id="user-1")
Events let you decouple parts of your application with a pub/sub pattern. Define named events, attach handlers, and fire them from anywhere: tasks, API endpoints, or other handlers.

Define an event

Create an event with worker.event():
order_placed = worker.event("order.placed")
user_registered = worker.event("user.registered")
The event name is a string identifier. Use dot-notation to organize events by domain.

Attach handlers

Use the @event.on decorator to register a handler:
@order_placed.on
async def send_confirmation(order_id: str, user_id: str):
    # This runs as a background job when the event fires
    return {"sent": True}
You can attach multiple handlers to the same event. Each handler runs as a separate background job.
@order_placed.on
async def send_confirmation(order_id: str, **kwargs):
    ...

@order_placed.on(retries=2)
async def update_inventory(items: list[str] | None = None, **kwargs):
    ...
Use **kwargs in your handler signature to accept any additional event data without breaking if new fields are added later.
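
The effect of **kwargs can be seen with plain Python, without any worker running. The sketch below is only an illustration: strict_handler, tolerant_handler, and the coupon field are hypothetical names, and the handlers are called directly rather than through upnext. How upnext itself reports a mismatched signature is not covered here.

```python
import asyncio


async def strict_handler(order_id: str, user_id: str) -> dict:
    # Accepts exactly the fields that exist today.
    return {"order_id": order_id}


async def tolerant_handler(order_id: str, **kwargs) -> dict:
    # Extra fields are collected in kwargs instead of raising an error.
    return {"order_id": order_id, "ignored": sorted(kwargs)}


# A payload that has gained a new field ("coupon") since the handlers were written.
payload = {"order_id": "123", "user_id": "user-1", "coupon": "SPRING"}

tolerant_result = asyncio.run(tolerant_handler(**payload))

try:
    asyncio.run(strict_handler(**payload))
    strict_error = None
except TypeError as exc:
    # Python rejects the unexpected keyword before the coroutine even starts.
    strict_error = str(exc)
```

The tolerant handler keeps working when the payload grows, while the strict one fails on the first unknown field.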

Handler options

The @event.on decorator accepts the same options as @worker.task:
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| retries | int | 0 | Retry attempts on failure |
| retry_delay | float | 1.0 | Seconds before first retry |
| retry_backoff | float | 2.0 | Backoff multiplier |
| timeout | float | 1800 | Max execution time in seconds |
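
To get a feel for how retry_delay and retry_backoff might interact, here is a small sketch of a delay schedule. It assumes the common exponential scheme, where the wait before retry n is retry_delay * retry_backoff ** (n - 1). The exact formula upnext applies is not stated on this page, and retry_delays is a hypothetical helper, not part of the library.

```python
def retry_delays(
    retries: int = 0,
    retry_delay: float = 1.0,
    retry_backoff: float = 2.0,
) -> list[float]:
    """Return the assumed wait (in seconds) before each retry attempt."""
    # Assumption: each wait is the previous one multiplied by retry_backoff.
    return [retry_delay * retry_backoff**attempt for attempt in range(retries)]


# Three retries with the default delay and multiplier from the table.
schedule = retry_delays(retries=3)
```

Under that assumption, three retries with the defaults would wait 1, 2, and 4 seconds, and the default retries=0 gives an empty schedule.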

Fire events

Send an event with keyword arguments that get passed to all handlers:
# From an API endpoint
@api.post("/orders")
async def create_order(order: dict):
    await order_placed.send(
        order_id="123",
        user_id="user-1",
        items=["Widget A", "Widget B"],
    )
    return {"status": "submitted"}

# From inside a task
@worker.task
async def process_signup(email: str):
    await user_registered.send(user_id="user-1", email=email)

Typed events

When you decorate a handler, the returned object is a TypedEvent with your handler’s parameter types. This gives you IDE autocomplete on .send():
@order_placed.on
async def send_confirmation(order_id: str, user_id: str):
    ...

# send_confirmation.send() knows it needs order_id and user_id
await send_confirmation.send(order_id="123", user_id="user-1")
You can also use the untyped event handle directly to send to all handlers:
await order_placed.send(order_id="123", user_id="user-1")
