fix width exceeding, release prep
This commit is contained in:
149
src/executor.ts
149
src/executor.ts
@@ -1,3 +1,4 @@
|
||||
import { truncateToWidth } from "@earendil-works/pi-tui";
|
||||
import * as path from "node:path";
|
||||
import type { Task, Project, Reflection, ToolUsage } from "./types";
|
||||
import type { RalpiConfig } from "./types";
|
||||
@@ -83,6 +84,27 @@ class ModelRoundRobin {
|
||||
this.assignments.delete(taskId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Advance a task to the next model slot without going through freed slots.
|
||||
* Used for model failover — when the current model is down, skip to the
|
||||
* next one instead of re-assigning the same freed index.
|
||||
*/
|
||||
advance(taskId: string): unknown {
|
||||
const currentIndex = this.assignments.get(taskId);
|
||||
if (currentIndex === undefined) {
|
||||
// No current assignment — fresh assign (fallback, shouldn't happen)
|
||||
return this.assign(taskId);
|
||||
}
|
||||
// If this index was freed (e.g. from an earlier release call that raced),
|
||||
// remove it from freeSlots so it's not handed out to another task.
|
||||
const freeIdx = this.freeSlots.indexOf(currentIndex);
|
||||
if (freeIdx !== -1) this.freeSlots.splice(freeIdx, 1);
|
||||
// Advance to the next index (circular)
|
||||
const nextIndex = (currentIndex + 1) % this.models.length;
|
||||
this.assignments.set(taskId, nextIndex);
|
||||
return this.models[nextIndex];
|
||||
}
|
||||
}
|
||||
|
||||
/** Shared state for parallel-batch widget. Each running task writes its
|
||||
@@ -162,9 +184,13 @@ export async function runTask(
|
||||
} else {
|
||||
// Build widget lines from current state. Live widgets can't expand/collapse
|
||||
// like chat messages, so we always truncate to MAX_COLLAPSED recent calls.
|
||||
const buildLines = (t: typeof ctx.ui.theme): string[] => {
|
||||
const truncateWidth = 74; // Account for widget container padding
|
||||
const buildLines = (t: typeof ctx.ui.theme, width?: number): string[] => {
|
||||
const effectiveWidth = width
|
||||
? Math.min(width, truncateWidth)
|
||||
: truncateWidth;
|
||||
const frame = t.fg("accent", SPINNER_FRAMES[frameIndex]);
|
||||
const lines = [`${frame} ${taskHeader}`];
|
||||
const lines = [truncateToWidth(`${frame} ${taskHeader}`, effectiveWidth)];
|
||||
|
||||
if (toolCalls.length > 0) {
|
||||
if (toolCalls.length <= MAX_COLLAPSED) {
|
||||
@@ -173,18 +199,27 @@ export async function runTask(
|
||||
const isLast = i === toolCalls.length - 1;
|
||||
const branch = isLast ? " └── " : " ├── ";
|
||||
const tag = t.fg("accent", `[${entry.name}]`);
|
||||
lines.push(`${branch}${tag} ${entry.label}`);
|
||||
lines.push(
|
||||
truncateToWidth(`${branch}${tag} ${entry.label}`, effectiveWidth),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
const shown = toolCalls.slice(-MAX_COLLAPSED);
|
||||
const remaining = toolCalls.length - shown.length;
|
||||
lines.push(t.fg("dim", ` ├── …${remaining} earlier`));
|
||||
lines.push(
|
||||
truncateToWidth(
|
||||
t.fg("dim", ` ├── …${remaining} earlier`),
|
||||
effectiveWidth,
|
||||
),
|
||||
);
|
||||
for (let i = 0; i < shown.length; i++) {
|
||||
const entry = shown[i];
|
||||
const isLast = i === shown.length - 1;
|
||||
const branch = isLast ? " └── " : " ├── ";
|
||||
const tag = t.fg("accent", `[${entry.name}]`);
|
||||
lines.push(`${branch}${tag} ${entry.label}`);
|
||||
lines.push(
|
||||
truncateToWidth(`${branch}${tag} ${entry.label}`, effectiveWidth),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -194,7 +229,7 @@ export async function runTask(
|
||||
ctx.ui.setWidget(widgetKey, (tui, t) => {
|
||||
widgetTui = tui;
|
||||
return {
|
||||
render: () => buildLines(t),
|
||||
render: (width?: number) => buildLines(t, width),
|
||||
invalidate: () => widgetTui?.requestRender(),
|
||||
};
|
||||
});
|
||||
@@ -378,19 +413,29 @@ export async function executeBatch(
|
||||
|
||||
// Execute sequentially
|
||||
for (const task of tasks) {
|
||||
const model = roundRobin?.assign(task.id);
|
||||
await executeTask(
|
||||
task,
|
||||
project,
|
||||
config,
|
||||
progress,
|
||||
ctx,
|
||||
sendChatMessage,
|
||||
projectDir,
|
||||
undefined,
|
||||
model,
|
||||
roundRobin,
|
||||
);
|
||||
try {
|
||||
const model = roundRobin?.assign(task.id);
|
||||
await executeTask(
|
||||
task,
|
||||
project,
|
||||
config,
|
||||
progress,
|
||||
ctx,
|
||||
sendChatMessage,
|
||||
projectDir,
|
||||
undefined,
|
||||
model,
|
||||
roundRobin,
|
||||
);
|
||||
} catch (error) {
|
||||
// Task failed — stop the batch. Dependent tasks are blocked by
|
||||
// the DAG layer (getBlockedTasks) so they won't appear in this batch.
|
||||
roundRobin?.release(task.id);
|
||||
const errorMsg = error instanceof Error ? error.message : String(error);
|
||||
progress.markFailed(task.id, errorMsg);
|
||||
ctx.ui.notify(`Task ${task.id} failed: ${errorMsg}`, "error");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -414,7 +459,11 @@ async function executeBatchParallel(
|
||||
const widgetKey = `ralpi-parallel-${Date.now()}`;
|
||||
let widgetTui: { requestRender(): void } | null = null;
|
||||
|
||||
const buildBatchLines = (t: typeof ctx.ui.theme): string[] => {
|
||||
const buildBatchLines = (
|
||||
t: typeof ctx.ui.theme,
|
||||
width?: number,
|
||||
): string[] => {
|
||||
const effectiveWidth = width || 74;
|
||||
const lines: string[] = [];
|
||||
const sortedIds = Array.from(sharedState.keys()).sort();
|
||||
|
||||
@@ -425,7 +474,9 @@ async function executeBatchParallel(
|
||||
? "✓"
|
||||
: "✗"
|
||||
: t.fg("accent", SPINNER_FRAMES[entry.frameIndex]);
|
||||
lines.push(`${frame} ${entry.taskHeader}`);
|
||||
lines.push(
|
||||
truncateToWidth(`${frame} ${entry.taskHeader}`, effectiveWidth),
|
||||
);
|
||||
|
||||
if (entry.toolCalls.length > 0) {
|
||||
if (entry.toolCalls.length <= MAX_COLLAPSED) {
|
||||
@@ -434,18 +485,27 @@ async function executeBatchParallel(
|
||||
const isLast = i === entry.toolCalls.length - 1;
|
||||
const branch = isLast ? " └── " : " ├── ";
|
||||
const tag = t.fg("accent", `[${tc.name}]`);
|
||||
lines.push(`${branch}${tag} ${tc.label}`);
|
||||
lines.push(
|
||||
truncateToWidth(`${branch}${tag} ${tc.label}`, effectiveWidth),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
const shown = entry.toolCalls.slice(-MAX_COLLAPSED);
|
||||
const remaining = entry.toolCalls.length - shown.length;
|
||||
lines.push(t.fg("dim", ` ├── …${remaining} earlier`));
|
||||
lines.push(
|
||||
truncateToWidth(
|
||||
t.fg("dim", ` ├── …${remaining} earlier`),
|
||||
effectiveWidth,
|
||||
),
|
||||
);
|
||||
for (let i = 0; i < shown.length; i++) {
|
||||
const tc = shown[i];
|
||||
const isLast = i === shown.length - 1;
|
||||
const branch = isLast ? " └── " : " ├── ";
|
||||
const tag = t.fg("accent", `[${tc.name}]`);
|
||||
lines.push(`${branch}${tag} ${tc.label}`);
|
||||
lines.push(
|
||||
truncateToWidth(`${branch}${tag} ${tc.label}`, effectiveWidth),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -456,7 +516,7 @@ async function executeBatchParallel(
|
||||
ctx.ui.setWidget(widgetKey, (tui, t) => {
|
||||
widgetTui = tui;
|
||||
return {
|
||||
render: () => buildBatchLines(t),
|
||||
render: (width?: number) => buildBatchLines(t, width),
|
||||
invalidate: () => widgetTui?.requestRender(),
|
||||
};
|
||||
});
|
||||
@@ -488,7 +548,15 @@ async function executeBatchParallel(
|
||||
sharedState,
|
||||
assignedModel,
|
||||
roundRobin,
|
||||
),
|
||||
).catch((error) => {
|
||||
// Safety net: one task failure should never crash the batch.
|
||||
// executeTask already marks failed and notifies, but catch as
|
||||
// a last resort so the error doesn't propagate and crash pi.
|
||||
roundRobin?.release(task.id);
|
||||
const errorMsg = error instanceof Error ? error.message : String(error);
|
||||
progress.markFailed(task.id, errorMsg);
|
||||
ctx.ui.notify(`Task ${task.id} failed: ${errorMsg}`, "error");
|
||||
}),
|
||||
});
|
||||
|
||||
// Limit concurrency
|
||||
@@ -531,9 +599,11 @@ async function executeTask(
|
||||
let currentModel: unknown = assignedModel ?? config.model;
|
||||
|
||||
while (modelAttempt < maxModelAttempts) {
|
||||
// Get the next model from round-robin (on first try, use the pre-assigned model)
|
||||
// On subsequent model attempts, advance to the next model.
|
||||
// Uses advance() instead of assign() so we don't get stuck on
|
||||
// the same freed slot when the current model is down.
|
||||
if (modelAttempt > 0 && roundRobin) {
|
||||
currentModel = roundRobin.assign(task.id);
|
||||
currentModel = roundRobin.advance(task.id);
|
||||
}
|
||||
|
||||
let retries = 0;
|
||||
@@ -584,7 +654,9 @@ async function executeTask(
|
||||
// Agent session failed (provider error).
|
||||
// If we have more models, cycle immediately — don't waste retries.
|
||||
if (roundRobin && modelAttempt < maxModelAttempts - 1) {
|
||||
roundRobin.release(task.id);
|
||||
// Don't release — advance() already handles the transition.
|
||||
// release() would put the slot in freeSlots, then assign()
|
||||
// would pick it right back up, getting stuck on the same model.
|
||||
modelAttempt++;
|
||||
ctx.ui.notify(
|
||||
`Task ${task.id}: model failed, trying next (${modelAttempt + 1}/${maxModelAttempts}): ${result.error}`,
|
||||
@@ -607,13 +679,20 @@ async function executeTask(
|
||||
} else {
|
||||
// Max retries exceeded
|
||||
progress.markFailed(task.id, result.error || "Unknown error");
|
||||
throw new Error(`Task ${task.id} failed: ${result.error}`);
|
||||
ctx.ui.notify(
|
||||
`Task ${task.id} failed after ${maxRetries} retries: ${
|
||||
result.error || "Unknown error"
|
||||
}`,
|
||||
"error",
|
||||
);
|
||||
return;
|
||||
}
|
||||
} catch (error) {
|
||||
roundRobin?.release(task.id);
|
||||
const errorMsg = error instanceof Error ? error.message : String(error);
|
||||
progress.markFailed(task.id, errorMsg);
|
||||
throw error;
|
||||
ctx.ui.notify(`Task ${task.id} failed: ${errorMsg}`, "error");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -621,9 +700,13 @@ async function executeTask(
|
||||
modelAttempt++;
|
||||
}
|
||||
|
||||
// All models exhausted
|
||||
// All models exhausted — release the slot
|
||||
roundRobin?.release(task.id);
|
||||
progress.markFailed(task.id, "All configured models exhausted");
|
||||
throw new Error(`Task ${task.id} failed: all configured models exhausted`);
|
||||
ctx.ui.notify(
|
||||
`Task ${task.id} failed: all configured models exhausted`,
|
||||
"error",
|
||||
);
|
||||
}
|
||||
|
||||
// ─── Save Reflection to File ────────────────────────────────────────────────
|
||||
|
||||
Reference in New Issue
Block a user