From 53bac1976a23ad6b8443182fbfa8558e7e7fb87e Mon Sep 17 00:00:00 2001 From: Michael Freno Date: Sun, 31 May 2026 16:48:41 -0400 Subject: [PATCH] kick! --- src/executor.ts | 79 +++++++++++++++++++++++++++++-------------------- 1 file changed, 47 insertions(+), 32 deletions(-) diff --git a/src/executor.ts b/src/executor.ts index 2697e28..37071c4 100644 --- a/src/executor.ts +++ b/src/executor.ts @@ -559,13 +559,20 @@ async function executeBatchParallel( requestBatchRender(); }, 100); - const results: Array<{ task: Task; result: Promise }> = []; + // Semaphore-based concurrency control: + // Start up to maxParallel tasks immediately. When ANY task completes, + // start the next pending task. This ensures slots fill as soon as they + // open, instead of blocking on the oldest task (FIFO pattern). + const pending = [...tasks]; + const running = new Set>(); - for (const task of tasks) { - const assignedModel = roundRobin?.assign(task.id); - results.push({ - task, - result: executeTask( + /** Start the next pending task if a slot is available. */ + const kick = (): void => { + while (running.size < maxParallel && pending.length > 0) { + const task = pending.shift()!; + const assignedModel = roundRobin?.assign(task.id); + + const p = executeTask( task, project, config, @@ -577,35 +584,43 @@ async function executeBatchParallel( assignedModel, roundRobin, requestBatchRender, - ).catch((error) => { - // Safety net: one task failure should never crash the batch. - // executeTask already marks failed and notifies, but catch as - // a last resort so the error doesn't propagate and crash pi. - roundRobin?.release(task.id); - requestBatchRender(); - const errorMsg = error instanceof Error ? error.message : String(error); - progress.markFailed(task.id, errorMsg); - // Auto-update the PRD source file checkbox - try { - updateTaskInFile(project.sourcePath, task.id, "failed"); - } catch { - // Best-effort - } - sendChatMessage?.(`✗ ${task.id} · ${task.title} — ${errorMsg}`); - ctx.ui.notify(`Task ${task.id} failed: ${errorMsg}`, "error"); - }), - }); + ) + .catch((error) => { + // Safety net: one task failure should never crash the batch. + // executeTask already marks failed and notifies, but catch as + // a last resort so the error doesn't propagate and crash pi. + roundRobin?.release(task.id); + requestBatchRender(); + const errorMsg = + error instanceof Error ? error.message : String(error); + progress.markFailed(task.id, errorMsg); + // Auto-update the PRD source file checkbox + try { + updateTaskInFile(project.sourcePath, task.id, "failed"); + } catch { + // Best-effort + } + sendChatMessage?.(`✗ ${task.id} · ${task.title} — ${errorMsg}`); + ctx.ui.notify(`Task ${task.id} failed: ${errorMsg}`, "error"); + }) + .finally(() => { + // Remove from running set and start next pending task + running.delete(p); + requestBatchRender(); + kick(); + }); - // Limit concurrency - if (results.length >= maxParallel) { - const first = results.shift(); - if (first) await first.result; + running.add(p); } - } + }; - // Wait for remaining tasks - for (const { result } of results) { - await result; + // Kick off initial batch of tasks (up to maxParallel) + kick(); + + // Wait for all tasks to complete (kick() adds new promises to `running` + // when completed tasks free up slots, so we iterate until the set is empty). + while (running.size > 0) { + await Promise.race(running); } clearInterval(spinnerTimer);