This commit is contained in:
2026-05-31 16:48:41 -04:00
parent 9ce89325fd
commit 53bac1976a

View File

@@ -559,13 +559,20 @@ async function executeBatchParallel(
requestBatchRender(); requestBatchRender();
}, 100); }, 100);
const results: Array<{ task: Task; result: Promise<any> }> = []; // Semaphore-based concurrency control:
// Start up to maxParallel tasks immediately. When ANY task completes,
// start the next pending task. This ensures slots fill as soon as they
// open, instead of blocking on the oldest task (FIFO pattern).
const pending = [...tasks];
const running = new Set<Promise<void>>();
for (const task of tasks) { /** Start the next pending task if a slot is available. */
const kick = (): void => {
while (running.size < maxParallel && pending.length > 0) {
const task = pending.shift()!;
const assignedModel = roundRobin?.assign(task.id); const assignedModel = roundRobin?.assign(task.id);
results.push({
task, const p = executeTask(
result: executeTask(
task, task,
project, project,
config, config,
@@ -577,13 +584,15 @@ async function executeBatchParallel(
assignedModel, assignedModel,
roundRobin, roundRobin,
requestBatchRender, requestBatchRender,
).catch((error) => { )
.catch((error) => {
// Safety net: one task failure should never crash the batch. // Safety net: one task failure should never crash the batch.
// executeTask already marks failed and notifies, but catch as // executeTask already marks failed and notifies, but catch as
// a last resort so the error doesn't propagate and crash pi. // a last resort so the error doesn't propagate and crash pi.
roundRobin?.release(task.id); roundRobin?.release(task.id);
requestBatchRender(); requestBatchRender();
const errorMsg = error instanceof Error ? error.message : String(error); const errorMsg =
error instanceof Error ? error.message : String(error);
progress.markFailed(task.id, errorMsg); progress.markFailed(task.id, errorMsg);
// Auto-update the PRD source file checkbox // Auto-update the PRD source file checkbox
try { try {
@@ -593,19 +602,25 @@ async function executeBatchParallel(
} }
sendChatMessage?.(`${task.id} · ${task.title}${errorMsg}`); sendChatMessage?.(`${task.id} · ${task.title}${errorMsg}`);
ctx.ui.notify(`Task ${task.id} failed: ${errorMsg}`, "error"); ctx.ui.notify(`Task ${task.id} failed: ${errorMsg}`, "error");
}), })
.finally(() => {
// Remove from running set and start next pending task
running.delete(p);
requestBatchRender();
kick();
}); });
// Limit concurrency running.add(p);
if (results.length >= maxParallel) {
const first = results.shift();
if (first) await first.result;
}
} }
};
// Wait for remaining tasks // Kick off initial batch of tasks (up to maxParallel)
for (const { result } of results) { kick();
await result;
// Wait for all tasks to complete (kick() adds new promises to `running`
// when completed tasks free up slots, so we iterate until the set is empty).
while (running.size > 0) {
await Promise.race(running);
} }
clearInterval(spinnerTimer); clearInterval(spinnerTimer);