@worker.task
async def step(segment: int, step: int) -> dict:
    """Report completion for one step and echo back its coordinates.

    This task calls no other tasks: it sets its own progress to 100 with a
    "done" message and returns the arguments it was given.

    Args:
        segment: Number of the segment this step belongs to.
        step: Number of this step within the segment.

    Returns:
        A dict with the ``segment`` and ``step`` values passed in.
    """
    payload = {"segment": segment, "step": step}
    upnext.get_current_context().set_progress(
        100, f"Segment {segment}, step {step} done"
    )
    return payload
@worker.task
async def segment(segment: int, steps: int = 3) -> dict:
    """Run the steps of one segment one after another and collect their values.

    For each step number from 1 to ``steps`` (inclusive), progress is
    reported first and then ``step.wait(...)`` is awaited, so the steps run
    strictly in sequence.

    Args:
        segment: Number of the segment being processed; forwarded to every
            step.
        steps: How many steps to run. When it is 0 or negative the loop body
            never executes and the returned list is empty.

    Returns:
        A dict with the ``segment`` number and, under ``"steps"``, the
        ``.value`` of each awaited step result in execution order.
    """
    context = upnext.get_current_context()
    outputs = []
    for number in range(1, steps + 1):
        # Progress is published before the step is awaited, so the final
        # iteration reports 100 while its step is still pending.
        percent = number / steps * 100
        context.set_progress(percent, f"Step {number}/{steps}")
        outcome = await step.wait(segment=segment, step=number)
        outputs.append(outcome.value)
    return {"segment": segment, "steps": outputs}
@worker.task
async def flow(segments: int = 3) -> dict:
    """Run the segment tasks one after another and gather their values.

    For each segment number from 1 to ``segments`` (inclusive), progress is
    reported first and then ``segment.wait(...)`` is awaited, so the
    segments run strictly in sequence. Only the segment number is passed, so
    each segment task uses its own default for any other parameter.

    Args:
        segments: How many segments to run. When it is 0 or negative the
            loop body never executes and the returned list is empty.

    Returns:
        A dict whose ``"segments"`` entry lists the ``.value`` of each
        awaited segment result in execution order.
    """
    context = upnext.get_current_context()
    gathered = []
    for number in range(1, segments + 1):
        # Progress is published before the segment is awaited, so the final
        # iteration reports 100 while its segment is still pending.
        percent = number / segments * 100
        context.set_progress(percent, f"Segment {number}/{segments}")
        outcome = await segment.wait(segment=number)
        gathered.append(outcome.value)
    return {"segments": gathered}