Compare commits

...

4 Commits

Author SHA1 Message Date
ab1e2eb430 inform on max configuration 2026-05-31 02:12:54 -04:00
d2ef124369 automatic failover 2026-05-31 02:01:37 -04:00
925e37938b round robin 2026-05-31 01:57:52 -04:00
8e2e24d0e3 depend on pi defaults, and global yaml 2026-05-31 01:35:52 -04:00
6 changed files with 766 additions and 410 deletions

View File

@@ -98,16 +98,46 @@ tasks:
## Configuration ## Configuration
Create `.ralpi/config.yaml`: Create config files. Both are optional:
| Scope | Path |
|-------|------|
| **Global** | `~/.pi/ralpi/config.yaml` |
| **Project** | `./.ralpi/config.yaml` |
```yaml ```yaml
maxRetries: 3 execution:
retryDelayMs: 5000 maxParallel: 3 # ralpi-level concurrency only
timeoutMs: 1800000 models: # round-robin in <provider>/<model> format
maxParallel: 3 - google/gemini-3.5-flash # 1st and 3rd task in parallel
- openai/gpt-5.5 # 2nd task in parallel
prompts:
projectContext: "Additional context for all tasks" projectContext: "Additional context for all tasks"
``` ```
> ralpi deliberately does **not** set timeouts or retries — those are inherited
> from Pi's own settings. Tasks run until they complete or Pi's own flow stops them.
>
> `execution.models` uses slot-aware round-robin: with 3 models and 2 concurrent
> tasks, only the first two models are used. The third model is only touched when
> a third concurrent task starts. Freed model slots are reused before new ones
> are allocated.
>
> **Automatic failover**: if a provider/API is unreachable (rate limit, 503, etc.),
> the task automatically cycles to the next model in the list without counting it
> as a task failure. Each model is tried once before the task is marked as failed.
The keys mirror the nested structure of `RalpiConfig` in `src/types.ts`.
### Precedence (highest wins)
| Priority | Source |
|----------|--------|
| **1st** | In-memory overrides (`model`, `thinkingLevel` from parent Pi session) |
| **2nd** | `./.ralpi/config.yaml` — project-level |
| **3rd** | `~/.pi/ralpi/config.yaml` — global, shared across projects |
| **4th** | `DEFAULT_CONFIG` in `src/types.ts` |
### Task-Level Timeout ### Task-Level Timeout
You can set a timeout for individual tasks using a meta block in the task file: You can set a timeout for individual tasks using a meta block in the task file:

View File

@@ -66,9 +66,10 @@ async function selectExecutionMode(
ctx: ExtensionContext, ctx: ExtensionContext,
project: import("./src/types").Project, project: import("./src/types").Project,
taskFile: string, taskFile: string,
config: import("./src/types").RalpiConfig,
): Promise<ExecutionMode> { ): Promise<ExecutionMode> {
const mode = await ctx.ui.select("Execution mode for this run?", [ const mode = await ctx.ui.select("Execution mode for this run?", [
"Parallel (where dependencies allow)", `Parallel (where dependencies allow)-[${config.execution.maxParallel} max]`,
"Sequential (one at a time)", "Sequential (one at a time)",
]); ]);
const isParallel = mode?.startsWith("Parallel") ?? false; const isParallel = mode?.startsWith("Parallel") ?? false;
@@ -117,7 +118,7 @@ async function executePlanBatches(
plan: ReturnType<typeof buildPlanByMode>, plan: ReturnType<typeof buildPlanByMode>,
project: Parameters<typeof buildExecutionPlan>[0], project: Parameters<typeof buildExecutionPlan>[0],
taskFile: string, taskFile: string,
config: import("./src/types").ralpiConfig, config: import("./src/types").RalpiConfig,
progress: ProgressTracker, progress: ProgressTracker,
ctx: ExtensionContext, ctx: ExtensionContext,
mode: ExecutionMode, mode: ExecutionMode,
@@ -220,6 +221,16 @@ export default function ralpiLoopExtension(pi: ExtensionAPI): void {
}, },
); );
// Register the extension's prompts/ directory so Pi discovers @task-manager
pi.on("resources_discover", async (_event, _ctx) => {
const promptsDir = fs.existsSync(path.resolve(__dirname, "prompts"))
? path.resolve(__dirname, "prompts")
: path.resolve(__dirname, "..", "prompts");
return {
promptPaths: [promptsDir],
};
});
pi.registerCommand("ralpi", { pi.registerCommand("ralpi", {
description: description:
"Execute tasks from a task file using DAG-based dependency resolution", "Execute tasks from a task file using DAG-based dependency resolution",
@@ -248,21 +259,45 @@ export default function ralpiLoopExtension(pi: ExtensionAPI): void {
return handlePlan(ctx, parts); return handlePlan(ctx, parts);
} }
if (looksLikePath(parts[0])) { if (looksLikePath(parts[0])) {
return handleRun(ctx, parts, sendProgress); return handleRun(
ctx,
parts,
sendProgress,
ctx.model,
pi.getThinkingLevel(),
);
} }
const command = parts[0]; const command = parts[0];
switch (command) { switch (command) {
case "run": case "run":
return handleRun(ctx, parts.slice(1), sendProgress); return handleRun(
ctx,
parts.slice(1),
sendProgress,
ctx.model,
pi.getThinkingLevel(),
);
case "plan": case "plan":
return handlePlan(ctx, parts.slice(1)); return handlePlan(ctx, parts.slice(1));
case "status": case "status":
return handleStatus(ctx, parts.slice(1)); return handleStatus(ctx, parts.slice(1));
case "resume": case "resume":
return handleResume(ctx, parts.slice(1), sendProgress); return handleResume(
ctx,
parts.slice(1),
sendProgress,
ctx.model,
pi.getThinkingLevel(),
);
case "next": case "next":
return handleNext(ctx, parts.slice(1), sendProgress); return handleNext(
ctx,
parts.slice(1),
sendProgress,
ctx.model,
pi.getThinkingLevel(),
);
case "reset": case "reset":
return handleReset(ctx, parts.slice(1)); return handleReset(ctx, parts.slice(1));
default: { default: {
@@ -316,6 +351,8 @@ async function handleRun(
ctx: ExtensionContext, ctx: ExtensionContext,
args: string[], args: string[],
sendChatMessage?: SendChatMessage, sendChatMessage?: SendChatMessage,
parentModel?: unknown,
parentThinkingLevel?: unknown,
): Promise<void> { ): Promise<void> {
const taskFile = resolveTaskArg(args[0] || "README.md", process.cwd()); const taskFile = resolveTaskArg(args[0] || "README.md", process.cwd());
@@ -323,7 +360,13 @@ async function handleRun(
// auto-resume instead of starting fresh // auto-resume instead of starting fresh
const existingProgress = findProgressFile(process.cwd(), taskFile); const existingProgress = findProgressFile(process.cwd(), taskFile);
if (existingProgress) { if (existingProgress) {
return handleResume(ctx, args.slice(0, 1), sendChatMessage); return handleResume(
ctx,
args.slice(0, 1),
sendChatMessage,
parentModel,
parentThinkingLevel,
);
} }
// No existing progress for this task — check for any progress at all // No existing progress for this task — check for any progress at all
@@ -336,7 +379,13 @@ async function handleRun(
); );
if (shouldResume?.startsWith("Yes")) { if (shouldResume?.startsWith("Yes")) {
return handleResume(ctx, [], sendChatMessage); return handleResume(
ctx,
[],
sendChatMessage,
parentModel,
parentThinkingLevel,
);
} }
} }
@@ -346,16 +395,12 @@ async function handleRun(
const project = parseTaskFile(taskFile); const project = parseTaskFile(taskFile);
const config = loadConfig(projectDir); const config = loadConfig(projectDir);
config.model = parentModel ?? ctx.model;
config.thinkingLevel = parentThinkingLevel;
const progress = new ProgressTracker(projectDir, taskFile); const progress = new ProgressTracker(projectDir, taskFile);
// Set initial status
ctx.ui.setStatus(
"ralpi",
`Starting ${project.tasks.length} tasks from ${path.basename(taskFile)}`,
);
const completed = buildCompletedSet(progress, project); const completed = buildCompletedSet(progress, project);
const mode = await selectExecutionMode(ctx, project, taskFile); const mode = await selectExecutionMode(ctx, project, taskFile, config);
const plan = buildPlanByMode(mode, project, completed); const plan = buildPlanByMode(mode, project, completed);
// Show execution plan before starting so user can see batch breakdown // Show execution plan before starting so user can see batch breakdown
@@ -434,6 +479,8 @@ async function handleResume(
ctx: ExtensionContext, ctx: ExtensionContext,
args: string[], args: string[],
sendChatMessage?: SendChatMessage, sendChatMessage?: SendChatMessage,
parentModel?: unknown,
parentThinkingLevel?: unknown,
): Promise<void> { ): Promise<void> {
let taskFile: string; let taskFile: string;
let projectDir: string; let projectDir: string;
@@ -473,15 +520,14 @@ async function handleResume(
); );
} }
const config = loadConfig(projectDir); const config = loadConfig(projectDir);
config.model = parentModel ?? ctx.model;
config.thinkingLevel = parentThinkingLevel;
const progress = new ProgressTracker(projectDir, taskFile, found.prdKey); const progress = new ProgressTracker(projectDir, taskFile, found.prdKey);
progress.setPaused(false); progress.setPaused(false);
// Set resume status
ctx.ui.setStatus("ralpi", `Resuming from ${path.basename(taskFile)}`);
const completed = buildCompletedSet(progress, project); const completed = buildCompletedSet(progress, project);
const mode = await selectExecutionMode(ctx, project, taskFile); const mode = await selectExecutionMode(ctx, project, taskFile, config);
const plan = buildPlanByMode(mode, project, completed); const plan = buildPlanByMode(mode, project, completed);
await executePlanBatches( await executePlanBatches(
@@ -505,6 +551,8 @@ async function handleNext(
ctx: ExtensionContext, ctx: ExtensionContext,
args: string[], args: string[],
sendChatMessage?: SendChatMessage, sendChatMessage?: SendChatMessage,
parentModel?: unknown,
parentThinkingLevel?: unknown,
): Promise<void> { ): Promise<void> {
let taskFile: string; let taskFile: string;
let projectDir: string; let projectDir: string;
@@ -540,6 +588,8 @@ async function handleNext(
); );
} }
const config = loadConfig(projectDir); const config = loadConfig(projectDir);
config.model = parentModel ?? ctx.model;
config.thinkingLevel = parentThinkingLevel;
const progress = new ProgressTracker(projectDir, taskFile, found?.prdKey); const progress = new ProgressTracker(projectDir, taskFile, found?.prdKey);
const completed = buildCompletedSet(progress, project); const completed = buildCompletedSet(progress, project);

View File

@@ -1,7 +1,7 @@
{ {
"name": "ralpi-loop", "name": "ralpi",
"version": "1.0.0", "version": "0.1.0",
"description": "Execute tasks from task files using DAG-based dependency resolution with persistent progress tracking", "description": "Execute tasks from task files/PRD's using DAG-based dependency resolution with persistent progress tracking",
"main": "dist/index.js", "main": "dist/index.js",
"keywords": [ "keywords": [
"pi-package", "pi-package",

View File

@@ -5,7 +5,6 @@ import type { ProgressTracker } from "./progress";
import type { ExtensionContext } from "@earendil-works/pi-coding-agent"; import type { ExtensionContext } from "@earendil-works/pi-coding-agent";
import { buildTaskPrompt } from "./prompts"; import { buildTaskPrompt } from "./prompts";
import { extractReflection } from "./reflection"; import { extractReflection } from "./reflection";
import { WidgetBatcher } from "./widget-batcher";
import { import {
runAgentSession, runAgentSession,
writeFileSafe, writeFileSafe,
@@ -26,6 +25,78 @@ export interface ToolCallEntry {
label: string; label: string;
} }
// ─── Widget Expand/Collapse ───────────────────────────────────────────────
/** Max tool calls shown in a live widget before truncating. Widgets don't
* support message-style Ctrl+O expansion (that's only for chat-history
* messages rendered by registerMessageRenderer). */
const MAX_COLLAPSED = 3;
const SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"];
// ─── Model Round-Robin ─────────────────────────────────────────────────────
/**
* Round-robin model assignment with slot reuse.
*
* With models [A, B, C] and 2 concurrent tasks, only A and B are used.
* Model C is only touched when a third concurrent task starts.
* Freed slots are reused before new slots are allocated.
*/
class ModelRoundRobin {
private models: unknown[];
private freeSlots: number[];
private nextIndex = 0;
private assignments = new Map<string, number>();
constructor(models: unknown[]) {
this.models = models;
this.freeSlots = [];
}
get length(): number {
return this.models.length;
}
assign(taskId: string): unknown {
let index: number;
if (this.freeSlots.length > 0) {
// Reuse a freed model slot first
index = this.freeSlots.shift()!;
} else if (this.nextIndex < this.models.length) {
// Allocate a new slot
index = this.nextIndex++;
} else {
// All models in use — wrap around
index = this.nextIndex % this.models.length;
this.nextIndex++;
}
this.assignments.set(taskId, index);
return this.models[index];
}
release(taskId: string): void {
const index = this.assignments.get(taskId);
if (index !== undefined) {
this.freeSlots.push(index);
this.freeSlots.sort((a, b) => a - b);
this.assignments.delete(taskId);
}
}
}
/** Shared state for parallel-batch widget. Each running task writes its
* tool calls and spinner frame; the batch widget reads them in task-ID order. */
interface ParallelWidgetEntry {
taskHeader: string;
frameIndex: number;
done: boolean;
success: boolean;
toolCalls: ToolCallEntry[];
}
type ParallelWidgetState = Map<string, ParallelWidgetEntry>;
// ─── Run Single Task ──────────────────────────────────────────────────────── // ─── Run Single Task ────────────────────────────────────────────────────────
/** /**
@@ -40,7 +111,8 @@ export async function runTask(
ctx: ExtensionContext, ctx: ExtensionContext,
sendChatMessage?: SendChatMessage, sendChatMessage?: SendChatMessage,
projectDir: string = project.sourceDir, projectDir: string = project.sourceDir,
batcher?: WidgetBatcher, parallelState?: ParallelWidgetState,
assignedModel?: unknown,
): Promise<{ ): Promise<{
success: boolean; success: boolean;
reflection?: Reflection; reflection?: Reflection;
@@ -68,59 +140,76 @@ export async function runTask(
const promptFile = path.join(ralpiDir, `prompt-${startMs}.md`); const promptFile = path.join(ralpiDir, `prompt-${startMs}.md`);
writeFileSafe(promptFile, prompt); writeFileSafe(promptFile, prompt);
// Footer shows just the task title (no batch prefix)
ctx.ui.setStatus("ralpi", task.title);
const taskHeader = `${task.id} · ${task.title}`; const taskHeader = `${task.id} · ${task.title}`;
// Live progress widget above the editor — animated spinner + tool call tree // When running in parallel, all tasks share a single widget so ordering
// Using setWidget instead of setWorkingMessage because the working message area // is deterministic (sorted by task ID). In sequential mode each task gets
// is only visible during parent agent streaming, not during extension command execution. // its own widget.
// Widget key is unique per task so parallel tasks each get their own widget. const isParallel = !!parallelState;
const widgetKey = `ralpi-task-${task.id}`; const widgetKey = `ralpi-task-${task.id}`;
const SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"];
let frameIndex = 0; let frameIndex = 0;
const theme = ctx.ui.theme;
const MAX_COLLAPSED = 3;
const toolCalls: ToolCallEntry[] = []; const toolCalls: ToolCallEntry[] = [];
let widgetTui: { requestRender(): void } | null = null;
const updateWidget = () => { if (isParallel) {
const frame = theme.fg("accent", SPINNER_FRAMES[frameIndex]); parallelState!.set(task.id, {
taskHeader,
frameIndex: 0,
done: false,
success: false,
toolCalls: [],
});
} else {
// Build widget lines from current state. Live widgets can't expand/collapse
// like chat messages, so we always truncate to MAX_COLLAPSED recent calls.
const buildLines = (t: typeof ctx.ui.theme): string[] => {
const frame = t.fg("accent", SPINNER_FRAMES[frameIndex]);
const lines = [`${frame} ${taskHeader}`]; const lines = [`${frame} ${taskHeader}`];
if (toolCalls.length > 0) { if (toolCalls.length > 0) {
if (toolCalls.length <= MAX_COLLAPSED) {
for (let i = 0; i < toolCalls.length; i++) {
const entry = toolCalls[i];
const isLast = i === toolCalls.length - 1;
const branch = isLast ? " └── " : " ├── ";
const tag = t.fg("accent", `[${entry.name}]`);
lines.push(`${branch}${tag} ${entry.label}`);
}
} else {
const shown = toolCalls.slice(-MAX_COLLAPSED); const shown = toolCalls.slice(-MAX_COLLAPSED);
const remaining = toolCalls.length - shown.length; const remaining = toolCalls.length - shown.length;
lines.push(t.fg("dim", ` ├── …${remaining} earlier`));
if (remaining > 0) {
lines.push(theme.fg("dim", ` ├── ${remaining} more`));
}
for (let i = 0; i < shown.length; i++) { for (let i = 0; i < shown.length; i++) {
const entry = shown[i]; const entry = shown[i];
const isLast = i === shown.length - 1; const isLast = i === shown.length - 1;
const branch = isLast ? " └── " : " ├── "; const branch = isLast ? " └── " : " ├── ";
const tag = theme.fg("accent", `[${entry.name}]`); const tag = t.fg("accent", `[${entry.name}]`);
lines.push(`${branch}${tag} ${entry.label}`); lines.push(`${branch}${tag} ${entry.label}`);
} }
} }
if (batcher) {
batcher.schedule(widgetKey, lines);
} else {
ctx.ui.setWidget(widgetKey, lines);
} }
return lines;
}; };
// Smooth spinner animation at 100ms intervals ctx.ui.setWidget(widgetKey, (tui, t) => {
const spinnerTimer = setInterval(() => { widgetTui = tui;
frameIndex = (frameIndex + 1) % SPINNER_FRAMES.length; return {
updateWidget(); render: () => buildLines(t),
}, 100); invalidate: () => widgetTui?.requestRender(),
};
});
}
// Initial display const requestRender = () => widgetTui?.requestRender();
updateWidget();
// Spinner animation (sequential only — parallel uses a single batch timer)
let spinnerTimer: NodeJS.Timeout | undefined;
if (!isParallel) {
spinnerTimer = setInterval(() => {
frameIndex = (frameIndex + 1) % SPINNER_FRAMES.length;
requestRender();
}, 100);
}
// Use task-level timeout if set, otherwise fall back to config // Use task-level timeout if set, otherwise fall back to config
const timeoutMs = task.timeoutMs ?? config.execution.timeoutMs; const timeoutMs = task.timeoutMs ?? config.execution.timeoutMs;
@@ -142,23 +231,34 @@ export async function runTask(
name: event.toolName, name: event.toolName,
label, label,
}); });
updateWidget(); if (isParallel) {
const entry = parallelState!.get(task.id);
if (entry) {
entry.toolCalls.push({ name: event.toolName, label });
}
}
requestRender();
} }
}, },
undefined, // no abort signal undefined, // no abort signal
sessionFilePath, // stream events to file sessionFilePath, // stream events to file
assignedModel ?? config.model,
config.thinkingLevel,
); );
const durationMs = Date.now() - startMs; const durationMs = Date.now() - startMs;
// Clear progress widget and status after task finishes // Clear progress widget and status after task finishes
clearInterval(spinnerTimer); if (spinnerTimer) clearInterval(spinnerTimer);
if (batcher) { if (isParallel) {
batcher.scheduleRemove(widgetKey); const entry = parallelState!.get(task.id);
if (entry) {
entry.done = true;
entry.success = output.success;
}
} else { } else {
ctx.ui.setWidget(widgetKey, undefined); ctx.ui.setWidget(widgetKey, undefined);
} }
ctx.ui.setStatus("ralpi", undefined);
if (!output.success) { if (!output.success) {
sendChatMessage?.(`${taskHeader}${output.error}`); sendChatMessage?.(`${taskHeader}${output.error}`);
@@ -227,6 +327,37 @@ export async function executeBatch(
); );
} }
// Set up model round-robin if configured.
// Config entries are "<provider>/<model>" strings — resolve via modelRegistry.
let roundRobin: ModelRoundRobin | null = null;
if (config.execution.models.length > 0) {
const resolvedModels: unknown[] = [];
for (const entry of config.execution.models) {
const slashIdx = entry.indexOf("/");
if (slashIdx === -1) {
ctx.ui.notify(
`ralpi config: skipping model "${entry}" — expected <provider>/<model> format`,
"warning",
);
continue;
}
const provider = entry.slice(0, slashIdx);
const modelId = entry.slice(slashIdx + 1);
const resolved = ctx.modelRegistry?.find(provider, modelId);
if (resolved) {
resolvedModels.push(resolved);
} else {
ctx.ui.notify(
`ralpi config: model "${entry}" not found in registry — skipping`,
"warning",
);
}
}
if (resolvedModels.length > 0) {
roundRobin = new ModelRoundRobin(resolvedModels);
}
}
// Check if we should run parallel // Check if we should run parallel
const shouldParallel = const shouldParallel =
options?.parallel && tasks.length > 1 && config.execution.maxParallel > 0; options?.parallel && tasks.length > 1 && config.execution.maxParallel > 0;
@@ -240,12 +371,14 @@ export async function executeBatch(
ctx, ctx,
sendChatMessage, sendChatMessage,
projectDir, projectDir,
roundRobin,
); );
return; return;
} }
// Execute sequentially // Execute sequentially
for (const task of tasks) { for (const task of tasks) {
const model = roundRobin?.assign(task.id);
await executeTask( await executeTask(
task, task,
project, project,
@@ -254,6 +387,9 @@ export async function executeBatch(
ctx, ctx,
sendChatMessage, sendChatMessage,
projectDir, projectDir,
undefined,
model,
roundRobin,
); );
} }
} }
@@ -269,12 +405,76 @@ async function executeBatchParallel(
ctx: ExtensionContext, ctx: ExtensionContext,
sendChatMessage?: SendChatMessage, sendChatMessage?: SendChatMessage,
projectDir?: string, projectDir?: string,
roundRobin?: ModelRoundRobin | null,
): Promise<void> { ): Promise<void> {
const maxParallel = config.execution.maxParallel; const maxParallel = config.execution.maxParallel;
const batcher = new WidgetBatcher(ctx); const sharedState: ParallelWidgetState = new Map();
// Register a single batch widget that renders ALL parallel tasks in ID order.
const widgetKey = `ralpi-parallel-${Date.now()}`;
let widgetTui: { requestRender(): void } | null = null;
const buildBatchLines = (t: typeof ctx.ui.theme): string[] => {
const lines: string[] = [];
const sortedIds = Array.from(sharedState.keys()).sort();
for (const id of sortedIds) {
const entry = sharedState.get(id)!;
const frame = entry.done
? entry.success
? "✓"
: "✗"
: t.fg("accent", SPINNER_FRAMES[entry.frameIndex]);
lines.push(`${frame} ${entry.taskHeader}`);
if (entry.toolCalls.length > 0) {
if (entry.toolCalls.length <= MAX_COLLAPSED) {
for (let i = 0; i < entry.toolCalls.length; i++) {
const tc = entry.toolCalls[i];
const isLast = i === entry.toolCalls.length - 1;
const branch = isLast ? " └── " : " ├── ";
const tag = t.fg("accent", `[${tc.name}]`);
lines.push(`${branch}${tag} ${tc.label}`);
}
} else {
const shown = entry.toolCalls.slice(-MAX_COLLAPSED);
const remaining = entry.toolCalls.length - shown.length;
lines.push(t.fg("dim", ` ├── …${remaining} earlier`));
for (let i = 0; i < shown.length; i++) {
const tc = shown[i];
const isLast = i === shown.length - 1;
const branch = isLast ? " └── " : " ├── ";
const tag = t.fg("accent", `[${tc.name}]`);
lines.push(`${branch}${tag} ${tc.label}`);
}
}
}
}
return lines;
};
ctx.ui.setWidget(widgetKey, (tui, t) => {
widgetTui = tui;
return {
render: () => buildBatchLines(t),
invalidate: () => widgetTui?.requestRender(),
};
});
// Single spinner timer drives all tasks in the batch
const spinnerTimer = setInterval(() => {
for (const entry of sharedState.values()) {
if (!entry.done) {
entry.frameIndex = (entry.frameIndex + 1) % SPINNER_FRAMES.length;
}
}
widgetTui?.requestRender();
}, 100);
const results: Array<{ task: Task; result: Promise<any> }> = []; const results: Array<{ task: Task; result: Promise<any> }> = [];
for (const task of tasks) { for (const task of tasks) {
const assignedModel = roundRobin?.assign(task.id);
results.push({ results.push({
task, task,
result: executeTask( result: executeTask(
@@ -285,7 +485,9 @@ async function executeBatchParallel(
ctx, ctx,
sendChatMessage, sendChatMessage,
projectDir, projectDir,
batcher, sharedState,
assignedModel,
roundRobin,
), ),
}); });
@@ -301,8 +503,8 @@ async function executeBatchParallel(
await result; await result;
} }
// Flush and stop the batcher after all tasks complete clearInterval(spinnerTimer);
batcher.stop(); ctx.ui.setWidget(widgetKey, undefined);
} }
// ─── Execute Single Task with Retry ────────────────────────────────────────── // ─── Execute Single Task with Retry ──────────────────────────────────────────
@@ -315,11 +517,26 @@ async function executeTask(
ctx: ExtensionContext, ctx: ExtensionContext,
sendChatMessage?: SendChatMessage, sendChatMessage?: SendChatMessage,
projectDir: string = project.sourceDir, projectDir: string = project.sourceDir,
batcher?: WidgetBatcher, parallelState?: ParallelWidgetState,
assignedModel?: unknown,
roundRobin?: ModelRoundRobin | null,
): Promise<void> { ): Promise<void> {
const maxRetries = config.execution.maxRetries; const maxRetries = config.execution.maxRetries;
let retries = 0;
// Model failover: when a provider/API is down, cycle through available models.
// result.success === false always means an agent-session failure (API error,
// provider unreachable, etc.), not a task-work error.
const maxModelAttempts = roundRobin ? roundRobin.length : 1;
let modelAttempt = 0;
let currentModel: unknown = assignedModel ?? config.model;
while (modelAttempt < maxModelAttempts) {
// Get the next model from round-robin (on first try, use the pre-assigned model)
if (modelAttempt > 0 && roundRobin) {
currentModel = roundRobin.assign(task.id);
}
let retries = 0;
while (retries <= maxRetries) { while (retries <= maxRetries) {
try { try {
// Mark as in progress // Mark as in progress
@@ -339,7 +556,8 @@ async function executeTask(
ctx, ctx,
sendChatMessage, sendChatMessage,
projectDir, projectDir,
batcher, parallelState,
currentModel,
); );
if (result.success) { if (result.success) {
@@ -359,10 +577,23 @@ async function executeTask(
result.commitMessages, result.commitMessages,
result.commitSummary, result.commitSummary,
); );
roundRobin?.release(task.id);
return; return;
} }
// Task failed, check if we should retry // Agent session failed (provider error).
// If we have more models, cycle immediately — don't waste retries.
if (roundRobin && modelAttempt < maxModelAttempts - 1) {
roundRobin.release(task.id);
modelAttempt++;
ctx.ui.notify(
`Task ${task.id}: model failed, trying next (${modelAttempt + 1}/${maxModelAttempts}): ${result.error}`,
"warning",
);
break; // exit retry loop, cycle to next model
}
// No more models — use normal retry logic
if (retries < maxRetries) { if (retries < maxRetries) {
retries = progress.incrementRetry(task.id); retries = progress.incrementRetry(task.id);
ctx.ui.notify( ctx.ui.notify(
@@ -379,11 +610,20 @@ async function executeTask(
throw new Error(`Task ${task.id} failed: ${result.error}`); throw new Error(`Task ${task.id} failed: ${result.error}`);
} }
} catch (error) { } catch (error) {
roundRobin?.release(task.id);
const errorMsg = error instanceof Error ? error.message : String(error); const errorMsg = error instanceof Error ? error.message : String(error);
progress.markFailed(task.id, errorMsg); progress.markFailed(task.id, errorMsg);
throw error; throw error;
} }
} }
// If we broke out (model cycling), continue the outer loop
modelAttempt++;
}
// All models exhausted
progress.markFailed(task.id, "All configured models exhausted");
throw new Error(`Task ${task.id} failed: all configured models exhausted`);
} }
// ─── Save Reflection to File ──────────────────────────────────────────────── // ─── Save Reflection to File ────────────────────────────────────────────────

View File

@@ -153,6 +153,8 @@ export interface RalpiConfig {
timeoutMs: number; timeoutMs: number;
/** Maximum parallel tasks (0 = unlimited) */ /** Maximum parallel tasks (0 = unlimited) */
maxParallel: number; maxParallel: number;
/** Round-robin model list for parallel tasks (empty = inherit parent model) */
models: string[];
}; };
prompts: { prompts: {
/** Additional context injected into every task prompt */ /** Additional context injected into every task prompt */
@@ -160,6 +162,10 @@ export interface RalpiConfig {
/** Custom prompt suffix for reflection extraction */ /** Custom prompt suffix for reflection extraction */
reflectionPrompt: string; reflectionPrompt: string;
}; };
/** Parent session model to inherit in child agent sessions */
model?: unknown;
/** Parent session thinking level to inherit in child agent sessions */
thinkingLevel?: unknown;
} }
export const DEFAULT_CONFIG: RalpiConfig = { export const DEFAULT_CONFIG: RalpiConfig = {
@@ -168,10 +174,11 @@ export const DEFAULT_CONFIG: RalpiConfig = {
reflectionsDir: ".ralpi/reflections", reflectionsDir: ".ralpi/reflections",
}, },
execution: { execution: {
maxRetries: 3, maxRetries: 0,
retryDelayMs: 5000, retryDelayMs: 0,
timeoutMs: 30 * 60 * 1000, // 30 minutes timeoutMs: 0, // 0 = inherit Pi's own defaults (no ralpi-level timeout)
maxParallel: 3, maxParallel: 3,
models: [],
}, },
prompts: { prompts: {
projectContext: "", projectContext: "",

View File

@@ -82,32 +82,35 @@ export function findProgressFile(
// ─── Config ────────────────────────────────────────────────────────────────── // ─── Config ──────────────────────────────────────────────────────────────────
function parseSimpleYaml(content: string): Record<string, any> { /** Try to use the `yaml` package (real dependency in package.json).
* Falls back to a flat key:value parser when unavailable. */
const parseSimpleYaml: (content: string) => Record<string, any> = (() => {
try {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { parse } = require("yaml");
return (content: string) => parse(content) ?? {};
} catch {
return (content: string) => {
const result: Record<string, any> = {}; const result: Record<string, any> = {};
const lines = content.split("\n"); for (const line of content.split("\n")) {
for (const line of lines) {
const trimmed = line.trim(); const trimmed = line.trim();
if (!trimmed || trimmed.startsWith("#")) continue; if (!trimmed || trimmed.startsWith("#")) continue;
const match = trimmed.match(/^([^:]+):\s*(.*)$/);
const match = trimmed.match(/^([^:]+):\s*(.+)$/);
if (match) { if (match) {
const key = match[1].trim(); const value = match[2].trim();
let value: string | boolean | number = match[2].trim(); if (value === "true") result[match[1].trim()] = true;
else if (value === "false") result[match[1].trim()] = false;
// Parse booleans else if (/^\d+$/.test(value))
if (value === "true") value = true; result[match[1].trim()] = parseInt(value, 10);
else if (value === "false") value = false; else if (/^\d+\.\d+$/.test(value))
// Parse numbers result[match[1].trim()] = parseFloat(value);
else if (/^\d+$/.test(value)) value = parseInt(value, 10); else result[match[1].trim()] = value;
else if (/^\d+\.\d+$/.test(value)) value = parseFloat(value);
result[key] = value;
} }
} }
return result; return result;
};
} }
})();
/** /**
* Deep merge configuration objects * Deep merge configuration objects
@@ -129,25 +132,44 @@ function mergeConfig(
return result as RalpiConfig; return result as RalpiConfig;
} }
/** Path to the global ralpi config under the user's Pi home directory. */
const GLOBAL_CONFIG_PATH = path.join(
process.env.HOME || "/tmp",
".pi",
"ralpi",
"config.yaml",
);
/** /**
* Load configuration from .ralpi/config.yaml or return defaults * Load and merge config from global and project sources.
*
* Precedence (highest wins):
* 1. Project-level: `<projectDir>/.ralpi/config.yaml`
* 2. Global: `~/.pi/ralpi/config.yaml`
* 3. `DEFAULT_CONFIG` in `src/types.ts`
*/ */
export function loadConfig(projectDir: string): RalpiConfig { export function loadConfig(projectDir: string): RalpiConfig {
const configPath = path.join(projectDir, ".ralpi", "config.yaml"); // Start with defaults
const merged: RalpiConfig = { ...DEFAULT_CONFIG };
// Return defaults silently when config file does not exist // Layer 1: global config (~/.pi/ralpi/config.yaml)
if (!fs.existsSync(configPath)) { tryLoadConfigFile(GLOBAL_CONFIG_PATH, merged);
return { ...DEFAULT_CONFIG };
}
// Layer 2: project config (.ralpi/config.yaml) — overrides global
tryLoadConfigFile(path.join(projectDir, ".ralpi", "config.yaml"), merged);
return merged;
/** Attempt to load a single config file and merge into `acc` in place. */
function tryLoadConfigFile(filePath: string, acc: RalpiConfig): void {
if (!fs.existsSync(filePath)) return;
try { try {
const content = fs.readFileSync(configPath, "utf-8"); const content = fs.readFileSync(filePath, "utf-8");
// Simple YAML parsing (key: value format) const parsed = parseSimpleYaml(content);
const config = parseSimpleYaml(content); Object.assign(acc, mergeConfig(acc, parsed));
return mergeConfig(DEFAULT_CONFIG, config);
} catch { } catch {
// Malformed config — fall back to defaults silently // Malformed config — skip silently
return { ...DEFAULT_CONFIG }; }
} }
} }
@@ -339,6 +361,8 @@ export async function runAgentSession(
onEvent?: (event: AgentSessionEvent) => void, onEvent?: (event: AgentSessionEvent) => void,
signal?: AbortSignal, signal?: AbortSignal,
sessionFile?: string, sessionFile?: string,
model?: unknown,
thinkingLevel?: unknown,
): Promise<{ ): Promise<{
success: boolean; success: boolean;
text: string; text: string;
@@ -361,10 +385,13 @@ export async function runAgentSession(
? fs.createWriteStream(sessionFile, { flags: "a" }) ? fs.createWriteStream(sessionFile, { flags: "a" })
: null; : null;
// Wire timeout via abort signal // Wire timeout via abort signal (only when set; 0 means inherit Pi's defaults)
const timeoutHandle = setTimeout(() => { let timeoutHandle: NodeJS.Timeout | null = null;
if (timeoutMs > 0) {
timeoutHandle = setTimeout(() => {
if (sessionRef?.session) sessionRef.session.agent.abort(); if (sessionRef?.session) sessionRef.session.agent.abort();
}, timeoutMs); }, timeoutMs);
}
const sessionRef: { const sessionRef: {
session?: Awaited<ReturnType<typeof createAgentSession>>["session"]; session?: Awaited<ReturnType<typeof createAgentSession>>["session"];
@@ -387,6 +414,8 @@ export async function runAgentSession(
sessionManager: SessionManager.inMemory(), sessionManager: SessionManager.inMemory(),
resourceLoader: loader, resourceLoader: loader,
tools: ["read", "bash", "edit", "write", "grep", "find", "ls"], tools: ["read", "bash", "edit", "write", "grep", "find", "ls"],
model: model as any,
thinkingLevel: thinkingLevel as any,
}); });
sessionRef.session = result.session; sessionRef.session = result.session;
@@ -437,7 +466,7 @@ export async function runAgentSession(
unsubscribe(); unsubscribe();
result.session.dispose(); result.session.dispose();
signal?.removeEventListener("abort", abortHandler); signal?.removeEventListener("abort", abortHandler);
clearTimeout(timeoutHandle); if (timeoutHandle) clearTimeout(timeoutHandle);
// Flush and close the event stream before returning // Flush and close the event stream before returning
if (eventStream) { if (eventStream) {
@@ -463,7 +492,7 @@ export async function runAgentSession(
events: [], // streamed to file events: [], // streamed to file
}; };
} catch (error) { } catch (error) {
clearTimeout(timeoutHandle); if (timeoutHandle) clearTimeout(timeoutHandle);
if (eventStream && !eventStream.destroyed) { if (eventStream && !eventStream.destroyed) {
eventStream.end(); eventStream.end();
} }