Compare commits
4 Commits
ead5d9be3a
...
ab1e2eb430
| Author | SHA1 | Date | |
|---|---|---|---|
| ab1e2eb430 | |||
| d2ef124369 | |||
| 925e37938b | |||
| 8e2e24d0e3 |
40
README.md
40
README.md
@@ -98,16 +98,46 @@ tasks:
|
||||
|
||||
## Configuration
|
||||
|
||||
Create `.ralpi/config.yaml`:
|
||||
Create config files. Both are optional:
|
||||
|
||||
| Scope | Path |
|
||||
|-------|------|
|
||||
| **Global** | `~/.pi/ralpi/config.yaml` |
|
||||
| **Project** | `./.ralpi/config.yaml` |
|
||||
|
||||
```yaml
|
||||
maxRetries: 3
|
||||
retryDelayMs: 5000
|
||||
timeoutMs: 1800000
|
||||
maxParallel: 3
|
||||
execution:
|
||||
maxParallel: 3 # ralpi-level concurrency only
|
||||
models: # round-robin in <provider>/<model> format
|
||||
- google/gemini-3.5-flash # 1st and 3rd task in parallel
|
||||
- openai/gpt-5.5 # 2nd task in parallel
|
||||
prompts:
|
||||
projectContext: "Additional context for all tasks"
|
||||
```
|
||||
|
||||
> ralpi deliberately does **not** set timeouts or retries — those are inherited
|
||||
> from Pi's own settings. Tasks run until they complete or Pi's own flow stops them.
|
||||
>
|
||||
> `execution.models` uses slot-aware round-robin: with 3 models and 2 concurrent
|
||||
> tasks, only the first two models are used. The third model is only touched when
|
||||
> a third concurrent task starts. Freed model slots are reused before new ones
|
||||
> are allocated.
|
||||
>
|
||||
> **Automatic failover**: if a provider/API is unreachable (rate limit, 503, etc.),
|
||||
> the task automatically cycles to the next model in the list without counting it
|
||||
> as a task failure. Each model is tried once before the task is marked as failed.
|
||||
|
||||
The keys mirror the nested structure of `RalpiConfig` in `src/types.ts`.
|
||||
|
||||
### Precedence (highest wins)
|
||||
|
||||
| Priority | Source |
|
||||
|----------|--------|
|
||||
| **1st** | In-memory overrides (`model`, `thinkingLevel` from parent Pi session) |
|
||||
| **2nd** | `./.ralpi/config.yaml` — project-level |
|
||||
| **3rd** | `~/.pi/ralpi/config.yaml` — global, shared across projects |
|
||||
| **4th** | `DEFAULT_CONFIG` in `src/types.ts` |
|
||||
|
||||
### Task-Level Timeout
|
||||
|
||||
You can set a timeout for individual tasks using a meta block in the task file:
|
||||
|
||||
88
index.ts
88
index.ts
@@ -66,9 +66,10 @@ async function selectExecutionMode(
|
||||
ctx: ExtensionContext,
|
||||
project: import("./src/types").Project,
|
||||
taskFile: string,
|
||||
config: import("./src/types").RalpiConfig,
|
||||
): Promise<ExecutionMode> {
|
||||
const mode = await ctx.ui.select("Execution mode for this run?", [
|
||||
"Parallel (where dependencies allow)",
|
||||
`Parallel (where dependencies allow)-[${config.execution.maxParallel} max]`,
|
||||
"Sequential (one at a time)",
|
||||
]);
|
||||
const isParallel = mode?.startsWith("Parallel") ?? false;
|
||||
@@ -117,7 +118,7 @@ async function executePlanBatches(
|
||||
plan: ReturnType<typeof buildPlanByMode>,
|
||||
project: Parameters<typeof buildExecutionPlan>[0],
|
||||
taskFile: string,
|
||||
config: import("./src/types").ralpiConfig,
|
||||
config: import("./src/types").RalpiConfig,
|
||||
progress: ProgressTracker,
|
||||
ctx: ExtensionContext,
|
||||
mode: ExecutionMode,
|
||||
@@ -220,6 +221,16 @@ export default function ralpiLoopExtension(pi: ExtensionAPI): void {
|
||||
},
|
||||
);
|
||||
|
||||
// Register the extension's prompts/ directory so Pi discovers @task-manager
|
||||
pi.on("resources_discover", async (_event, _ctx) => {
|
||||
const promptsDir = fs.existsSync(path.resolve(__dirname, "prompts"))
|
||||
? path.resolve(__dirname, "prompts")
|
||||
: path.resolve(__dirname, "..", "prompts");
|
||||
return {
|
||||
promptPaths: [promptsDir],
|
||||
};
|
||||
});
|
||||
|
||||
pi.registerCommand("ralpi", {
|
||||
description:
|
||||
"Execute tasks from a task file using DAG-based dependency resolution",
|
||||
@@ -248,21 +259,45 @@ export default function ralpiLoopExtension(pi: ExtensionAPI): void {
|
||||
return handlePlan(ctx, parts);
|
||||
}
|
||||
if (looksLikePath(parts[0])) {
|
||||
return handleRun(ctx, parts, sendProgress);
|
||||
return handleRun(
|
||||
ctx,
|
||||
parts,
|
||||
sendProgress,
|
||||
ctx.model,
|
||||
pi.getThinkingLevel(),
|
||||
);
|
||||
}
|
||||
|
||||
const command = parts[0];
|
||||
switch (command) {
|
||||
case "run":
|
||||
return handleRun(ctx, parts.slice(1), sendProgress);
|
||||
return handleRun(
|
||||
ctx,
|
||||
parts.slice(1),
|
||||
sendProgress,
|
||||
ctx.model,
|
||||
pi.getThinkingLevel(),
|
||||
);
|
||||
case "plan":
|
||||
return handlePlan(ctx, parts.slice(1));
|
||||
case "status":
|
||||
return handleStatus(ctx, parts.slice(1));
|
||||
case "resume":
|
||||
return handleResume(ctx, parts.slice(1), sendProgress);
|
||||
return handleResume(
|
||||
ctx,
|
||||
parts.slice(1),
|
||||
sendProgress,
|
||||
ctx.model,
|
||||
pi.getThinkingLevel(),
|
||||
);
|
||||
case "next":
|
||||
return handleNext(ctx, parts.slice(1), sendProgress);
|
||||
return handleNext(
|
||||
ctx,
|
||||
parts.slice(1),
|
||||
sendProgress,
|
||||
ctx.model,
|
||||
pi.getThinkingLevel(),
|
||||
);
|
||||
case "reset":
|
||||
return handleReset(ctx, parts.slice(1));
|
||||
default: {
|
||||
@@ -316,6 +351,8 @@ async function handleRun(
|
||||
ctx: ExtensionContext,
|
||||
args: string[],
|
||||
sendChatMessage?: SendChatMessage,
|
||||
parentModel?: unknown,
|
||||
parentThinkingLevel?: unknown,
|
||||
): Promise<void> {
|
||||
const taskFile = resolveTaskArg(args[0] || "README.md", process.cwd());
|
||||
|
||||
@@ -323,7 +360,13 @@ async function handleRun(
|
||||
// auto-resume instead of starting fresh
|
||||
const existingProgress = findProgressFile(process.cwd(), taskFile);
|
||||
if (existingProgress) {
|
||||
return handleResume(ctx, args.slice(0, 1), sendChatMessage);
|
||||
return handleResume(
|
||||
ctx,
|
||||
args.slice(0, 1),
|
||||
sendChatMessage,
|
||||
parentModel,
|
||||
parentThinkingLevel,
|
||||
);
|
||||
}
|
||||
|
||||
// No existing progress for this task — check for any progress at all
|
||||
@@ -336,7 +379,13 @@ async function handleRun(
|
||||
);
|
||||
|
||||
if (shouldResume?.startsWith("Yes")) {
|
||||
return handleResume(ctx, [], sendChatMessage);
|
||||
return handleResume(
|
||||
ctx,
|
||||
[],
|
||||
sendChatMessage,
|
||||
parentModel,
|
||||
parentThinkingLevel,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -346,16 +395,12 @@ async function handleRun(
|
||||
|
||||
const project = parseTaskFile(taskFile);
|
||||
const config = loadConfig(projectDir);
|
||||
config.model = parentModel ?? ctx.model;
|
||||
config.thinkingLevel = parentThinkingLevel;
|
||||
const progress = new ProgressTracker(projectDir, taskFile);
|
||||
|
||||
// Set initial status
|
||||
ctx.ui.setStatus(
|
||||
"ralpi",
|
||||
`Starting ${project.tasks.length} tasks from ${path.basename(taskFile)}`,
|
||||
);
|
||||
|
||||
const completed = buildCompletedSet(progress, project);
|
||||
const mode = await selectExecutionMode(ctx, project, taskFile);
|
||||
const mode = await selectExecutionMode(ctx, project, taskFile, config);
|
||||
const plan = buildPlanByMode(mode, project, completed);
|
||||
|
||||
// Show execution plan before starting so user can see batch breakdown
|
||||
@@ -434,6 +479,8 @@ async function handleResume(
|
||||
ctx: ExtensionContext,
|
||||
args: string[],
|
||||
sendChatMessage?: SendChatMessage,
|
||||
parentModel?: unknown,
|
||||
parentThinkingLevel?: unknown,
|
||||
): Promise<void> {
|
||||
let taskFile: string;
|
||||
let projectDir: string;
|
||||
@@ -473,15 +520,14 @@ async function handleResume(
|
||||
);
|
||||
}
|
||||
const config = loadConfig(projectDir);
|
||||
config.model = parentModel ?? ctx.model;
|
||||
config.thinkingLevel = parentThinkingLevel;
|
||||
const progress = new ProgressTracker(projectDir, taskFile, found.prdKey);
|
||||
|
||||
progress.setPaused(false);
|
||||
|
||||
// Set resume status
|
||||
ctx.ui.setStatus("ralpi", `Resuming from ${path.basename(taskFile)}`);
|
||||
|
||||
const completed = buildCompletedSet(progress, project);
|
||||
const mode = await selectExecutionMode(ctx, project, taskFile);
|
||||
const mode = await selectExecutionMode(ctx, project, taskFile, config);
|
||||
const plan = buildPlanByMode(mode, project, completed);
|
||||
|
||||
await executePlanBatches(
|
||||
@@ -505,6 +551,8 @@ async function handleNext(
|
||||
ctx: ExtensionContext,
|
||||
args: string[],
|
||||
sendChatMessage?: SendChatMessage,
|
||||
parentModel?: unknown,
|
||||
parentThinkingLevel?: unknown,
|
||||
): Promise<void> {
|
||||
let taskFile: string;
|
||||
let projectDir: string;
|
||||
@@ -540,6 +588,8 @@ async function handleNext(
|
||||
);
|
||||
}
|
||||
const config = loadConfig(projectDir);
|
||||
config.model = parentModel ?? ctx.model;
|
||||
config.thinkingLevel = parentThinkingLevel;
|
||||
const progress = new ProgressTracker(projectDir, taskFile, found?.prdKey);
|
||||
|
||||
const completed = buildCompletedSet(progress, project);
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"name": "ralpi-loop",
|
||||
"version": "1.0.0",
|
||||
"description": "Execute tasks from task files using DAG-based dependency resolution with persistent progress tracking",
|
||||
"name": "ralpi",
|
||||
"version": "0.1.0",
|
||||
"description": "Execute tasks from task files/PRD's using DAG-based dependency resolution with persistent progress tracking",
|
||||
"main": "dist/index.js",
|
||||
"keywords": [
|
||||
"pi-package",
|
||||
|
||||
332
src/executor.ts
332
src/executor.ts
@@ -5,7 +5,6 @@ import type { ProgressTracker } from "./progress";
|
||||
import type { ExtensionContext } from "@earendil-works/pi-coding-agent";
|
||||
import { buildTaskPrompt } from "./prompts";
|
||||
import { extractReflection } from "./reflection";
|
||||
import { WidgetBatcher } from "./widget-batcher";
|
||||
import {
|
||||
runAgentSession,
|
||||
writeFileSafe,
|
||||
@@ -26,6 +25,78 @@ export interface ToolCallEntry {
|
||||
label: string;
|
||||
}
|
||||
|
||||
// ─── Widget Expand/Collapse ───────────────────────────────────────────────
|
||||
|
||||
/** Max tool calls shown in a live widget before truncating. Widgets don't
|
||||
* support message-style Ctrl+O expansion (that's only for chat-history
|
||||
* messages rendered by registerMessageRenderer). */
|
||||
const MAX_COLLAPSED = 3;
|
||||
|
||||
const SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"];
|
||||
|
||||
// ─── Model Round-Robin ─────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Round-robin model assignment with slot reuse.
|
||||
*
|
||||
* With models [A, B, C] and 2 concurrent tasks, only A and B are used.
|
||||
* Model C is only touched when a third concurrent task starts.
|
||||
* Freed slots are reused before new slots are allocated.
|
||||
*/
|
||||
class ModelRoundRobin {
|
||||
private models: unknown[];
|
||||
private freeSlots: number[];
|
||||
private nextIndex = 0;
|
||||
private assignments = new Map<string, number>();
|
||||
|
||||
constructor(models: unknown[]) {
|
||||
this.models = models;
|
||||
this.freeSlots = [];
|
||||
}
|
||||
|
||||
get length(): number {
|
||||
return this.models.length;
|
||||
}
|
||||
|
||||
assign(taskId: string): unknown {
|
||||
let index: number;
|
||||
if (this.freeSlots.length > 0) {
|
||||
// Reuse a freed model slot first
|
||||
index = this.freeSlots.shift()!;
|
||||
} else if (this.nextIndex < this.models.length) {
|
||||
// Allocate a new slot
|
||||
index = this.nextIndex++;
|
||||
} else {
|
||||
// All models in use — wrap around
|
||||
index = this.nextIndex % this.models.length;
|
||||
this.nextIndex++;
|
||||
}
|
||||
this.assignments.set(taskId, index);
|
||||
return this.models[index];
|
||||
}
|
||||
|
||||
release(taskId: string): void {
|
||||
const index = this.assignments.get(taskId);
|
||||
if (index !== undefined) {
|
||||
this.freeSlots.push(index);
|
||||
this.freeSlots.sort((a, b) => a - b);
|
||||
this.assignments.delete(taskId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Shared state for parallel-batch widget. Each running task writes its
|
||||
* tool calls and spinner frame; the batch widget reads them in task-ID order. */
|
||||
interface ParallelWidgetEntry {
|
||||
taskHeader: string;
|
||||
frameIndex: number;
|
||||
done: boolean;
|
||||
success: boolean;
|
||||
toolCalls: ToolCallEntry[];
|
||||
}
|
||||
|
||||
type ParallelWidgetState = Map<string, ParallelWidgetEntry>;
|
||||
|
||||
// ─── Run Single Task ────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
@@ -40,7 +111,8 @@ export async function runTask(
|
||||
ctx: ExtensionContext,
|
||||
sendChatMessage?: SendChatMessage,
|
||||
projectDir: string = project.sourceDir,
|
||||
batcher?: WidgetBatcher,
|
||||
parallelState?: ParallelWidgetState,
|
||||
assignedModel?: unknown,
|
||||
): Promise<{
|
||||
success: boolean;
|
||||
reflection?: Reflection;
|
||||
@@ -68,59 +140,76 @@ export async function runTask(
|
||||
const promptFile = path.join(ralpiDir, `prompt-${startMs}.md`);
|
||||
writeFileSafe(promptFile, prompt);
|
||||
|
||||
// Footer shows just the task title (no batch prefix)
|
||||
ctx.ui.setStatus("ralpi", task.title);
|
||||
|
||||
const taskHeader = `${task.id} · ${task.title}`;
|
||||
|
||||
// Live progress widget above the editor — animated spinner + tool call tree
|
||||
// Using setWidget instead of setWorkingMessage because the working message area
|
||||
// is only visible during parent agent streaming, not during extension command execution.
|
||||
// Widget key is unique per task so parallel tasks each get their own widget.
|
||||
// When running in parallel, all tasks share a single widget so ordering
|
||||
// is deterministic (sorted by task ID). In sequential mode each task gets
|
||||
// its own widget.
|
||||
const isParallel = !!parallelState;
|
||||
const widgetKey = `ralpi-task-${task.id}`;
|
||||
const SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"];
|
||||
let frameIndex = 0;
|
||||
const theme = ctx.ui.theme;
|
||||
const MAX_COLLAPSED = 3;
|
||||
|
||||
const toolCalls: ToolCallEntry[] = [];
|
||||
let widgetTui: { requestRender(): void } | null = null;
|
||||
|
||||
const updateWidget = () => {
|
||||
const frame = theme.fg("accent", SPINNER_FRAMES[frameIndex]);
|
||||
if (isParallel) {
|
||||
parallelState!.set(task.id, {
|
||||
taskHeader,
|
||||
frameIndex: 0,
|
||||
done: false,
|
||||
success: false,
|
||||
toolCalls: [],
|
||||
});
|
||||
} else {
|
||||
// Build widget lines from current state. Live widgets can't expand/collapse
|
||||
// like chat messages, so we always truncate to MAX_COLLAPSED recent calls.
|
||||
const buildLines = (t: typeof ctx.ui.theme): string[] => {
|
||||
const frame = t.fg("accent", SPINNER_FRAMES[frameIndex]);
|
||||
const lines = [`${frame} ${taskHeader}`];
|
||||
|
||||
if (toolCalls.length > 0) {
|
||||
if (toolCalls.length <= MAX_COLLAPSED) {
|
||||
for (let i = 0; i < toolCalls.length; i++) {
|
||||
const entry = toolCalls[i];
|
||||
const isLast = i === toolCalls.length - 1;
|
||||
const branch = isLast ? " └── " : " ├── ";
|
||||
const tag = t.fg("accent", `[${entry.name}]`);
|
||||
lines.push(`${branch}${tag} ${entry.label}`);
|
||||
}
|
||||
} else {
|
||||
const shown = toolCalls.slice(-MAX_COLLAPSED);
|
||||
const remaining = toolCalls.length - shown.length;
|
||||
|
||||
if (remaining > 0) {
|
||||
lines.push(theme.fg("dim", ` ├── ${remaining} more`));
|
||||
}
|
||||
|
||||
lines.push(t.fg("dim", ` ├── …${remaining} earlier`));
|
||||
for (let i = 0; i < shown.length; i++) {
|
||||
const entry = shown[i];
|
||||
const isLast = i === shown.length - 1;
|
||||
const branch = isLast ? " └── " : " ├── ";
|
||||
const tag = theme.fg("accent", `[${entry.name}]`);
|
||||
const tag = t.fg("accent", `[${entry.name}]`);
|
||||
lines.push(`${branch}${tag} ${entry.label}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (batcher) {
|
||||
batcher.schedule(widgetKey, lines);
|
||||
} else {
|
||||
ctx.ui.setWidget(widgetKey, lines);
|
||||
}
|
||||
return lines;
|
||||
};
|
||||
|
||||
// Smooth spinner animation at 100ms intervals
|
||||
const spinnerTimer = setInterval(() => {
|
||||
frameIndex = (frameIndex + 1) % SPINNER_FRAMES.length;
|
||||
updateWidget();
|
||||
}, 100);
|
||||
ctx.ui.setWidget(widgetKey, (tui, t) => {
|
||||
widgetTui = tui;
|
||||
return {
|
||||
render: () => buildLines(t),
|
||||
invalidate: () => widgetTui?.requestRender(),
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
// Initial display
|
||||
updateWidget();
|
||||
const requestRender = () => widgetTui?.requestRender();
|
||||
|
||||
// Spinner animation (sequential only — parallel uses a single batch timer)
|
||||
let spinnerTimer: NodeJS.Timeout | undefined;
|
||||
if (!isParallel) {
|
||||
spinnerTimer = setInterval(() => {
|
||||
frameIndex = (frameIndex + 1) % SPINNER_FRAMES.length;
|
||||
requestRender();
|
||||
}, 100);
|
||||
}
|
||||
|
||||
// Use task-level timeout if set, otherwise fall back to config
|
||||
const timeoutMs = task.timeoutMs ?? config.execution.timeoutMs;
|
||||
@@ -142,23 +231,34 @@ export async function runTask(
|
||||
name: event.toolName,
|
||||
label,
|
||||
});
|
||||
updateWidget();
|
||||
if (isParallel) {
|
||||
const entry = parallelState!.get(task.id);
|
||||
if (entry) {
|
||||
entry.toolCalls.push({ name: event.toolName, label });
|
||||
}
|
||||
}
|
||||
requestRender();
|
||||
}
|
||||
},
|
||||
undefined, // no abort signal
|
||||
sessionFilePath, // stream events to file
|
||||
assignedModel ?? config.model,
|
||||
config.thinkingLevel,
|
||||
);
|
||||
|
||||
const durationMs = Date.now() - startMs;
|
||||
|
||||
// Clear progress widget and status after task finishes
|
||||
clearInterval(spinnerTimer);
|
||||
if (batcher) {
|
||||
batcher.scheduleRemove(widgetKey);
|
||||
if (spinnerTimer) clearInterval(spinnerTimer);
|
||||
if (isParallel) {
|
||||
const entry = parallelState!.get(task.id);
|
||||
if (entry) {
|
||||
entry.done = true;
|
||||
entry.success = output.success;
|
||||
}
|
||||
} else {
|
||||
ctx.ui.setWidget(widgetKey, undefined);
|
||||
}
|
||||
ctx.ui.setStatus("ralpi", undefined);
|
||||
|
||||
if (!output.success) {
|
||||
sendChatMessage?.(`✗ ${taskHeader} — ${output.error}`);
|
||||
@@ -227,6 +327,37 @@ export async function executeBatch(
|
||||
);
|
||||
}
|
||||
|
||||
// Set up model round-robin if configured.
|
||||
// Config entries are "<provider>/<model>" strings — resolve via modelRegistry.
|
||||
let roundRobin: ModelRoundRobin | null = null;
|
||||
if (config.execution.models.length > 0) {
|
||||
const resolvedModels: unknown[] = [];
|
||||
for (const entry of config.execution.models) {
|
||||
const slashIdx = entry.indexOf("/");
|
||||
if (slashIdx === -1) {
|
||||
ctx.ui.notify(
|
||||
`ralpi config: skipping model "${entry}" — expected <provider>/<model> format`,
|
||||
"warning",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
const provider = entry.slice(0, slashIdx);
|
||||
const modelId = entry.slice(slashIdx + 1);
|
||||
const resolved = ctx.modelRegistry?.find(provider, modelId);
|
||||
if (resolved) {
|
||||
resolvedModels.push(resolved);
|
||||
} else {
|
||||
ctx.ui.notify(
|
||||
`ralpi config: model "${entry}" not found in registry — skipping`,
|
||||
"warning",
|
||||
);
|
||||
}
|
||||
}
|
||||
if (resolvedModels.length > 0) {
|
||||
roundRobin = new ModelRoundRobin(resolvedModels);
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we should run parallel
|
||||
const shouldParallel =
|
||||
options?.parallel && tasks.length > 1 && config.execution.maxParallel > 0;
|
||||
@@ -240,12 +371,14 @@ export async function executeBatch(
|
||||
ctx,
|
||||
sendChatMessage,
|
||||
projectDir,
|
||||
roundRobin,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Execute sequentially
|
||||
for (const task of tasks) {
|
||||
const model = roundRobin?.assign(task.id);
|
||||
await executeTask(
|
||||
task,
|
||||
project,
|
||||
@@ -254,6 +387,9 @@ export async function executeBatch(
|
||||
ctx,
|
||||
sendChatMessage,
|
||||
projectDir,
|
||||
undefined,
|
||||
model,
|
||||
roundRobin,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -269,12 +405,76 @@ async function executeBatchParallel(
|
||||
ctx: ExtensionContext,
|
||||
sendChatMessage?: SendChatMessage,
|
||||
projectDir?: string,
|
||||
roundRobin?: ModelRoundRobin | null,
|
||||
): Promise<void> {
|
||||
const maxParallel = config.execution.maxParallel;
|
||||
const batcher = new WidgetBatcher(ctx);
|
||||
const sharedState: ParallelWidgetState = new Map();
|
||||
|
||||
// Register a single batch widget that renders ALL parallel tasks in ID order.
|
||||
const widgetKey = `ralpi-parallel-${Date.now()}`;
|
||||
let widgetTui: { requestRender(): void } | null = null;
|
||||
|
||||
const buildBatchLines = (t: typeof ctx.ui.theme): string[] => {
|
||||
const lines: string[] = [];
|
||||
const sortedIds = Array.from(sharedState.keys()).sort();
|
||||
|
||||
for (const id of sortedIds) {
|
||||
const entry = sharedState.get(id)!;
|
||||
const frame = entry.done
|
||||
? entry.success
|
||||
? "✓"
|
||||
: "✗"
|
||||
: t.fg("accent", SPINNER_FRAMES[entry.frameIndex]);
|
||||
lines.push(`${frame} ${entry.taskHeader}`);
|
||||
|
||||
if (entry.toolCalls.length > 0) {
|
||||
if (entry.toolCalls.length <= MAX_COLLAPSED) {
|
||||
for (let i = 0; i < entry.toolCalls.length; i++) {
|
||||
const tc = entry.toolCalls[i];
|
||||
const isLast = i === entry.toolCalls.length - 1;
|
||||
const branch = isLast ? " └── " : " ├── ";
|
||||
const tag = t.fg("accent", `[${tc.name}]`);
|
||||
lines.push(`${branch}${tag} ${tc.label}`);
|
||||
}
|
||||
} else {
|
||||
const shown = entry.toolCalls.slice(-MAX_COLLAPSED);
|
||||
const remaining = entry.toolCalls.length - shown.length;
|
||||
lines.push(t.fg("dim", ` ├── …${remaining} earlier`));
|
||||
for (let i = 0; i < shown.length; i++) {
|
||||
const tc = shown[i];
|
||||
const isLast = i === shown.length - 1;
|
||||
const branch = isLast ? " └── " : " ├── ";
|
||||
const tag = t.fg("accent", `[${tc.name}]`);
|
||||
lines.push(`${branch}${tag} ${tc.label}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return lines;
|
||||
};
|
||||
|
||||
ctx.ui.setWidget(widgetKey, (tui, t) => {
|
||||
widgetTui = tui;
|
||||
return {
|
||||
render: () => buildBatchLines(t),
|
||||
invalidate: () => widgetTui?.requestRender(),
|
||||
};
|
||||
});
|
||||
|
||||
// Single spinner timer drives all tasks in the batch
|
||||
const spinnerTimer = setInterval(() => {
|
||||
for (const entry of sharedState.values()) {
|
||||
if (!entry.done) {
|
||||
entry.frameIndex = (entry.frameIndex + 1) % SPINNER_FRAMES.length;
|
||||
}
|
||||
}
|
||||
widgetTui?.requestRender();
|
||||
}, 100);
|
||||
|
||||
const results: Array<{ task: Task; result: Promise<any> }> = [];
|
||||
|
||||
for (const task of tasks) {
|
||||
const assignedModel = roundRobin?.assign(task.id);
|
||||
results.push({
|
||||
task,
|
||||
result: executeTask(
|
||||
@@ -285,7 +485,9 @@ async function executeBatchParallel(
|
||||
ctx,
|
||||
sendChatMessage,
|
||||
projectDir,
|
||||
batcher,
|
||||
sharedState,
|
||||
assignedModel,
|
||||
roundRobin,
|
||||
),
|
||||
});
|
||||
|
||||
@@ -301,8 +503,8 @@ async function executeBatchParallel(
|
||||
await result;
|
||||
}
|
||||
|
||||
// Flush and stop the batcher after all tasks complete
|
||||
batcher.stop();
|
||||
clearInterval(spinnerTimer);
|
||||
ctx.ui.setWidget(widgetKey, undefined);
|
||||
}
|
||||
|
||||
// ─── Execute Single Task with Retry ──────────────────────────────────────────
|
||||
@@ -315,11 +517,26 @@ async function executeTask(
|
||||
ctx: ExtensionContext,
|
||||
sendChatMessage?: SendChatMessage,
|
||||
projectDir: string = project.sourceDir,
|
||||
batcher?: WidgetBatcher,
|
||||
parallelState?: ParallelWidgetState,
|
||||
assignedModel?: unknown,
|
||||
roundRobin?: ModelRoundRobin | null,
|
||||
): Promise<void> {
|
||||
const maxRetries = config.execution.maxRetries;
|
||||
let retries = 0;
|
||||
|
||||
// Model failover: when a provider/API is down, cycle through available models.
|
||||
// result.success === false always means an agent-session failure (API error,
|
||||
// provider unreachable, etc.), not a task-work error.
|
||||
const maxModelAttempts = roundRobin ? roundRobin.length : 1;
|
||||
let modelAttempt = 0;
|
||||
let currentModel: unknown = assignedModel ?? config.model;
|
||||
|
||||
while (modelAttempt < maxModelAttempts) {
|
||||
// Get the next model from round-robin (on first try, use the pre-assigned model)
|
||||
if (modelAttempt > 0 && roundRobin) {
|
||||
currentModel = roundRobin.assign(task.id);
|
||||
}
|
||||
|
||||
let retries = 0;
|
||||
while (retries <= maxRetries) {
|
||||
try {
|
||||
// Mark as in progress
|
||||
@@ -339,7 +556,8 @@ async function executeTask(
|
||||
ctx,
|
||||
sendChatMessage,
|
||||
projectDir,
|
||||
batcher,
|
||||
parallelState,
|
||||
currentModel,
|
||||
);
|
||||
|
||||
if (result.success) {
|
||||
@@ -359,10 +577,23 @@ async function executeTask(
|
||||
result.commitMessages,
|
||||
result.commitSummary,
|
||||
);
|
||||
roundRobin?.release(task.id);
|
||||
return;
|
||||
}
|
||||
|
||||
// Task failed, check if we should retry
|
||||
// Agent session failed (provider error).
|
||||
// If we have more models, cycle immediately — don't waste retries.
|
||||
if (roundRobin && modelAttempt < maxModelAttempts - 1) {
|
||||
roundRobin.release(task.id);
|
||||
modelAttempt++;
|
||||
ctx.ui.notify(
|
||||
`Task ${task.id}: model failed, trying next (${modelAttempt + 1}/${maxModelAttempts}): ${result.error}`,
|
||||
"warning",
|
||||
);
|
||||
break; // exit retry loop, cycle to next model
|
||||
}
|
||||
|
||||
// No more models — use normal retry logic
|
||||
if (retries < maxRetries) {
|
||||
retries = progress.incrementRetry(task.id);
|
||||
ctx.ui.notify(
|
||||
@@ -379,11 +610,20 @@ async function executeTask(
|
||||
throw new Error(`Task ${task.id} failed: ${result.error}`);
|
||||
}
|
||||
} catch (error) {
|
||||
roundRobin?.release(task.id);
|
||||
const errorMsg = error instanceof Error ? error.message : String(error);
|
||||
progress.markFailed(task.id, errorMsg);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// If we broke out (model cycling), continue the outer loop
|
||||
modelAttempt++;
|
||||
}
|
||||
|
||||
// All models exhausted
|
||||
progress.markFailed(task.id, "All configured models exhausted");
|
||||
throw new Error(`Task ${task.id} failed: all configured models exhausted`);
|
||||
}
|
||||
|
||||
// ─── Save Reflection to File ────────────────────────────────────────────────
|
||||
|
||||
13
src/types.ts
13
src/types.ts
@@ -153,6 +153,8 @@ export interface RalpiConfig {
|
||||
timeoutMs: number;
|
||||
/** Maximum parallel tasks (0 = unlimited) */
|
||||
maxParallel: number;
|
||||
/** Round-robin model list for parallel tasks (empty = inherit parent model) */
|
||||
models: string[];
|
||||
};
|
||||
prompts: {
|
||||
/** Additional context injected into every task prompt */
|
||||
@@ -160,6 +162,10 @@ export interface RalpiConfig {
|
||||
/** Custom prompt suffix for reflection extraction */
|
||||
reflectionPrompt: string;
|
||||
};
|
||||
/** Parent session model to inherit in child agent sessions */
|
||||
model?: unknown;
|
||||
/** Parent session thinking level to inherit in child agent sessions */
|
||||
thinkingLevel?: unknown;
|
||||
}
|
||||
|
||||
export const DEFAULT_CONFIG: RalpiConfig = {
|
||||
@@ -168,10 +174,11 @@ export const DEFAULT_CONFIG: RalpiConfig = {
|
||||
reflectionsDir: ".ralpi/reflections",
|
||||
},
|
||||
execution: {
|
||||
maxRetries: 3,
|
||||
retryDelayMs: 5000,
|
||||
timeoutMs: 30 * 60 * 1000, // 30 minutes
|
||||
maxRetries: 0,
|
||||
retryDelayMs: 0,
|
||||
timeoutMs: 0, // 0 = inherit Pi's own defaults (no ralpi-level timeout)
|
||||
maxParallel: 3,
|
||||
models: [],
|
||||
},
|
||||
prompts: {
|
||||
projectContext: "",
|
||||
|
||||
97
src/utils.ts
97
src/utils.ts
@@ -82,32 +82,35 @@ export function findProgressFile(
|
||||
|
||||
// ─── Config ──────────────────────────────────────────────────────────────────
|
||||
|
||||
function parseSimpleYaml(content: string): Record<string, any> {
|
||||
/** Try to use the `yaml` package (real dependency in package.json).
|
||||
* Falls back to a flat key:value parser when unavailable. */
|
||||
const parseSimpleYaml: (content: string) => Record<string, any> = (() => {
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-var-requires
|
||||
const { parse } = require("yaml");
|
||||
return (content: string) => parse(content) ?? {};
|
||||
} catch {
|
||||
return (content: string) => {
|
||||
const result: Record<string, any> = {};
|
||||
const lines = content.split("\n");
|
||||
|
||||
for (const line of lines) {
|
||||
for (const line of content.split("\n")) {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed || trimmed.startsWith("#")) continue;
|
||||
|
||||
const match = trimmed.match(/^([^:]+):\s*(.+)$/);
|
||||
const match = trimmed.match(/^([^:]+):\s*(.*)$/);
|
||||
if (match) {
|
||||
const key = match[1].trim();
|
||||
let value: string | boolean | number = match[2].trim();
|
||||
|
||||
// Parse booleans
|
||||
if (value === "true") value = true;
|
||||
else if (value === "false") value = false;
|
||||
// Parse numbers
|
||||
else if (/^\d+$/.test(value)) value = parseInt(value, 10);
|
||||
else if (/^\d+\.\d+$/.test(value)) value = parseFloat(value);
|
||||
|
||||
result[key] = value;
|
||||
const value = match[2].trim();
|
||||
if (value === "true") result[match[1].trim()] = true;
|
||||
else if (value === "false") result[match[1].trim()] = false;
|
||||
else if (/^\d+$/.test(value))
|
||||
result[match[1].trim()] = parseInt(value, 10);
|
||||
else if (/^\d+\.\d+$/.test(value))
|
||||
result[match[1].trim()] = parseFloat(value);
|
||||
else result[match[1].trim()] = value;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
};
|
||||
}
|
||||
})();
|
||||
|
||||
/**
|
||||
* Deep merge configuration objects
|
||||
@@ -129,25 +132,44 @@ function mergeConfig(
|
||||
return result as RalpiConfig;
|
||||
}
|
||||
|
||||
/** Path to the global ralpi config under the user's Pi home directory. */
|
||||
const GLOBAL_CONFIG_PATH = path.join(
|
||||
process.env.HOME || "/tmp",
|
||||
".pi",
|
||||
"ralpi",
|
||||
"config.yaml",
|
||||
);
|
||||
|
||||
/**
|
||||
* Load configuration from .ralpi/config.yaml or return defaults
|
||||
* Load and merge config from global and project sources.
|
||||
*
|
||||
* Precedence (highest wins):
|
||||
* 1. Project-level: `<projectDir>/.ralpi/config.yaml`
|
||||
* 2. Global: `~/.pi/ralpi/config.yaml`
|
||||
* 3. `DEFAULT_CONFIG` in `src/types.ts`
|
||||
*/
|
||||
export function loadConfig(projectDir: string): RalpiConfig {
|
||||
const configPath = path.join(projectDir, ".ralpi", "config.yaml");
|
||||
// Start with defaults
|
||||
const merged: RalpiConfig = { ...DEFAULT_CONFIG };
|
||||
|
||||
// Return defaults silently when config file does not exist
|
||||
if (!fs.existsSync(configPath)) {
|
||||
return { ...DEFAULT_CONFIG };
|
||||
}
|
||||
// Layer 1: global config (~/.pi/ralpi/config.yaml)
|
||||
tryLoadConfigFile(GLOBAL_CONFIG_PATH, merged);
|
||||
|
||||
// Layer 2: project config (.ralpi/config.yaml) — overrides global
|
||||
tryLoadConfigFile(path.join(projectDir, ".ralpi", "config.yaml"), merged);
|
||||
|
||||
return merged;
|
||||
|
||||
/** Attempt to load a single config file and merge into `acc` in place. */
|
||||
function tryLoadConfigFile(filePath: string, acc: RalpiConfig): void {
|
||||
if (!fs.existsSync(filePath)) return;
|
||||
try {
|
||||
const content = fs.readFileSync(configPath, "utf-8");
|
||||
// Simple YAML parsing (key: value format)
|
||||
const config = parseSimpleYaml(content);
|
||||
return mergeConfig(DEFAULT_CONFIG, config);
|
||||
const content = fs.readFileSync(filePath, "utf-8");
|
||||
const parsed = parseSimpleYaml(content);
|
||||
Object.assign(acc, mergeConfig(acc, parsed));
|
||||
} catch {
|
||||
// Malformed config — fall back to defaults silently
|
||||
return { ...DEFAULT_CONFIG };
|
||||
// Malformed config — skip silently
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -339,6 +361,8 @@ export async function runAgentSession(
|
||||
onEvent?: (event: AgentSessionEvent) => void,
|
||||
signal?: AbortSignal,
|
||||
sessionFile?: string,
|
||||
model?: unknown,
|
||||
thinkingLevel?: unknown,
|
||||
): Promise<{
|
||||
success: boolean;
|
||||
text: string;
|
||||
@@ -361,10 +385,13 @@ export async function runAgentSession(
|
||||
? fs.createWriteStream(sessionFile, { flags: "a" })
|
||||
: null;
|
||||
|
||||
// Wire timeout via abort signal
|
||||
const timeoutHandle = setTimeout(() => {
|
||||
// Wire timeout via abort signal (only when set; 0 means inherit Pi's defaults)
|
||||
let timeoutHandle: NodeJS.Timeout | null = null;
|
||||
if (timeoutMs > 0) {
|
||||
timeoutHandle = setTimeout(() => {
|
||||
if (sessionRef?.session) sessionRef.session.agent.abort();
|
||||
}, timeoutMs);
|
||||
}
|
||||
|
||||
const sessionRef: {
|
||||
session?: Awaited<ReturnType<typeof createAgentSession>>["session"];
|
||||
@@ -387,6 +414,8 @@ export async function runAgentSession(
|
||||
sessionManager: SessionManager.inMemory(),
|
||||
resourceLoader: loader,
|
||||
tools: ["read", "bash", "edit", "write", "grep", "find", "ls"],
|
||||
model: model as any,
|
||||
thinkingLevel: thinkingLevel as any,
|
||||
});
|
||||
sessionRef.session = result.session;
|
||||
|
||||
@@ -437,7 +466,7 @@ export async function runAgentSession(
|
||||
unsubscribe();
|
||||
result.session.dispose();
|
||||
signal?.removeEventListener("abort", abortHandler);
|
||||
clearTimeout(timeoutHandle);
|
||||
if (timeoutHandle) clearTimeout(timeoutHandle);
|
||||
|
||||
// Flush and close the event stream before returning
|
||||
if (eventStream) {
|
||||
@@ -463,7 +492,7 @@ export async function runAgentSession(
|
||||
events: [], // streamed to file
|
||||
};
|
||||
} catch (error) {
|
||||
clearTimeout(timeoutHandle);
|
||||
if (timeoutHandle) clearTimeout(timeoutHandle);
|
||||
if (eventStream && !eventStream.destroyed) {
|
||||
eventStream.end();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user