Compare commits

...

3 Commits

Author SHA1 Message Date
dae33248e3 until done 2026-05-31 08:47:35 -04:00
9f90ed4252 readme cleanup 2026-05-31 08:46:59 -04:00
3c01652b90 fix width exceeding, release prep 2026-05-31 08:19:21 -04:00
10 changed files with 985 additions and 934 deletions

1
.gitignore vendored
View File

@@ -1,4 +1,3 @@
node_modules node_modules
dist
.pi-lens .pi-lens
package-lock.json package-lock.json

View File

@@ -4,18 +4,17 @@
A Pi coding agent extension that registers the `/ralpi` slash command. Not a standalone app — it runs inside Pi's extension host. A Pi coding agent extension that registers the `/ralpi` slash command. Not a standalone app — it runs inside Pi's extension host.
## Build ## Type checking
``` ```
npm run build # tsc → dist/ npm run typecheck # tsc --noEmit
npm run watch # tsc --watch
``` ```
No bundler, no linter, no test framework. Plain `tsc` with strict mode. No build step needed — Pi loads extensions via [jiti](https://github.com/unjs/jiti), which compiles TypeScript at runtime. `index.ts` is the entry point directly.
## Entry point ## Entry point
`index.ts` at repo root (not `src/`). Exports a default function receiving `ExtensionAPI`. The `tsconfig.json` sets `rootDir: "./"` so `index.ts` compiles to `dist/index.js`. `index.ts` at repo root (not `src/`). Exports a default function receiving `ExtensionAPI`.
## External dependencies ## External dependencies
@@ -55,7 +54,7 @@ Task IDs are zero-padded strings (`"01"`, `"02"`, etc.). The parser prepends `0`
## Command routing ## Command routing
`/ralpi` with no args → plan. First token looks like a path (`@path`, `./path`, `.md`, etc.) → run. Otherwise dispatches to subcommand (`run`, `plan`, `status`, `resume`, `next`, `reset`). `/ralpi` with no args → plan. First token looks like a path (`@path`, `./path`, `.md`, etc.) → run. Otherwise dispatches to subcommand (`run`, `plan`, `resume`, `reset`).
## Config ## Config

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Michael Freno
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

119
README.md
View File

@@ -1,78 +1,36 @@
# Ralpi # Ralpi
Execute tasks from task files using DAG-based dependency resolution with persistent progress tracking. Execute tasks from task files until done using DAG-based dependency resolution with persistent progress tracking.
## Features ## Features
- **DAG-based execution**: Tasks are ordered by dependencies using Kahn's algorithm
- **Parallel batching**: Independent tasks in each batch can run concurrently - **Parallel batching**: Independent tasks in each batch can run concurrently
- **Persistent progress**: Execution state saved to `.ralpi/progress.json` - **Persistent progress**: Execution state saved to `.ralpi/progress.json`
- **Reflection system**: Each task produces a reflection for downstream tasks - **Reflection system**: Each task produces a reflection for downstream tasks
- **Retry with backoff**: Failed tasks retry with exponential backoff - **Retry with backoff**: Failed tasks retry with exponential backoff
- **Multiple formats**: Supports Fio README, simple checkboxes, and YAML - **Multiple formats**: Supports simple checkboxes, and YAML
- **Chat progress**: Real-time progress messages in Pi chat via `pi.sendMessage`
- **Tool usage tracking**: Detects and reports tool usage (read, write, edit, bash) from task execution - **Tool usage tracking**: Detects and reports tool usage (read, write, edit, bash) from task execution
- **Git commit capture**: Captures git commit messages and generates summaries per task
- **Configurable timeouts**: Task-level timeouts via meta blocks, with global fallback - **Configurable timeouts**: Task-level timeouts via meta blocks, with global fallback
- **Session saving**: Saves full task output for expandable session review - **Session saving**: Saves full task output for expandable session review
- **Resume auto-discovery**: Automatically finds and resumes interrupted execution - **Resume auto-discovery**: Automatically finds and resumes interrupted execution
- **Custom message renderer**: Compact UI labels with expandable details in Pi TUI
## Usage ## Usage
``` ```
/ralpi plan [task-file] # Show execution plan /ralpi [task-file] # Execute all tasks
/ralpi run [task-file] # Execute all tasks /ralpi plan # Alias to /task-manager to plan new tasks
/ralpi status [task-file] # Show current progress /ralpi resume # Resume paused execution
/ralpi resume [task-file] # Resume paused execution /ralpi reset [task-file] # Reset progress and .ralpi directory - does not modify PRD
/ralpi next [task-file] # Execute next batch only
/ralpi reset [task-file] # Reset all progress
``` ```
## Task File Formats ## Task File Formats
### Fio README Format ### Highly recommended to use the task-manager prompt for prd construction, it's output pairs perfectly
```markdown
# Project Title # Project Title
## Tasks ## Tasks
- [ ] 01 — Setup project structure -> `tasks/01-setup.md`
- [ ] 02 — Implement auth -> `tasks/02-auth.md`
- [ ] 03 — Build API -> `tasks/03-api.md`
## Dependencies
1 -> 2,3
2 -> 3
```
#### Supported Dependency Formats
The parser supports two dependency declaration styles in the `## Dependencies` section:
**Arrow Notation** (recommended):
```
1 -> 2,3,4
5 -> 6
```
This means: "Task 1 must complete before tasks 2, 3, and 4 can start."
**Natural Language**:
```
13 depends on 17, 18, 19, 20
14 depends on 13, 15, 16
```
This means: "Task 13 depends on tasks 17, 18, 19, and 20."
**Parallel Groups** (informational only):
```
1, 2, 3, 4 can be done in parallel
5, 6, 7, 8 can be done in parallel
```
Note: These lines are ignored by the parser. Use explicit dependencies to control execution order.
### Simple Checkbox Format ### Simple Checkbox Format
```markdown ```markdown
@@ -96,9 +54,45 @@ tasks:
depends_on: ["01"] depends_on: ["01"]
``` ```
## Dependencies
### Arrow Notation (recommended):
1 -> 2,3,4
5 -> 6
This means: "Task 1 must complete before tasks 2, 3, and 4 can start."
### Natural Language:
13 depends on 17, 18, 19, 20
14 depends on 13, 15, 16
This means: "Task 13 depends on tasks 17, 18, 19, and 20."
### Parallel Groups (informational only):
1, 2, 3, 4 can be done in parallel
5, 6, 7, 8 can be done in parallel
Note: These lines are ignored by the parser. Use explicit dependencies to control execution order.
## Configuration ## Configuration
Create config files. Both are optional: ### Task-Level Timeout
You can set a timeout for individual tasks using a meta block in the task file:
```markdown
- [ ] 01: Setup project structure
timeout: 10m
```
Supported formats: `10m` (minutes), `600s` (seconds), `3600000` (milliseconds)
### Config files
| Scope | Path | | Scope | Path |
|-------|------| |-------|------|
@@ -115,9 +109,6 @@ prompts:
projectContext: "Additional context for all tasks" projectContext: "Additional context for all tasks"
``` ```
> ralpi deliberately does **not** set timeouts or retries — those are inherited
> from Pi's own settings. Tasks run until they complete or Pi's own flow stops them.
>
> `execution.models` uses slot-aware round-robin: with 3 models and 2 concurrent > `execution.models` uses slot-aware round-robin: with 3 models and 2 concurrent
> tasks, only the first two models are used. The third model is only touched when > tasks, only the first two models are used. The third model is only touched when
> a third concurrent task starts. Freed model slots are reused before new ones > a third concurrent task starts. Freed model slots are reused before new ones
@@ -127,28 +118,6 @@ prompts:
> the task automatically cycles to the next model in the list without counting it > the task automatically cycles to the next model in the list without counting it
> as a task failure. Each model is tried once before the task is marked as failed. > as a task failure. Each model is tried once before the task is marked as failed.
The keys mirror the nested structure of `RalpiConfig` in `src/types.ts`.
### Precedence (highest wins)
| Priority | Source |
|----------|--------|
| **1st** | In-memory overrides (`model`, `thinkingLevel` from parent Pi session) |
| **2nd** | `./.ralpi/config.yaml` — project-level |
| **3rd** | `~/.pi/ralpi/config.yaml` — global, shared across projects |
| **4th** | `DEFAULT_CONFIG` in `src/types.ts` |
### Task-Level Timeout
You can set a timeout for individual tasks using a meta block in the task file:
```markdown
- [ ] 01: Setup project structure
timeout: 10m
```
Supported formats: `10m` (minutes), `600s` (seconds), `3600000` (milliseconds)
## State Files ## State Files
- `.ralpi/progress.json` - Execution progress - `.ralpi/progress.json` - Execution progress

185
index.ts
View File

@@ -10,7 +10,6 @@ import {
buildExecutionPlan, buildExecutionPlan,
buildSequentialPlan, buildSequentialPlan,
formatExecutionPlan, formatExecutionPlan,
getReadyTasks,
} from "./src/dag"; } from "./src/dag";
import { ProgressTracker } from "./src/progress"; import { ProgressTracker } from "./src/progress";
import { buildPlanPrompt } from "./src/prompts"; import { buildPlanPrompt } from "./src/prompts";
@@ -20,11 +19,10 @@ import {
loadConfig, loadConfig,
resolveTaskArg, resolveTaskArg,
formatProgressStatus, formatProgressStatus,
formatAllPRDsStatus,
findProgressFile, findProgressFile,
} from "./src/utils"; } from "./src/utils";
const COMMANDS = ["status", "resume", "next", "reset"] as const; const COMMANDS = ["plan", "resume", "reset"] as const;
type ExecutionMode = "parallel" | "sequential"; type ExecutionMode = "parallel" | "sequential";
@@ -69,7 +67,7 @@ async function selectExecutionMode(
config: import("./src/types").RalpiConfig, config: import("./src/types").RalpiConfig,
): Promise<ExecutionMode> { ): Promise<ExecutionMode> {
const mode = await ctx.ui.select("Execution mode for this run?", [ const mode = await ctx.ui.select("Execution mode for this run?", [
`Parallel (where dependencies allow)-[${config.execution.maxParallel} max]`, `Parallel (where dependencies allow)[${config.execution.maxParallel} max]`,
"Sequential (one at a time)", "Sequential (one at a time)",
]); ]);
const isParallel = mode?.startsWith("Parallel") ?? false; const isParallel = mode?.startsWith("Parallel") ?? false;
@@ -125,6 +123,9 @@ async function executePlanBatches(
sendChatMessage?: SendChatMessage, sendChatMessage?: SendChatMessage,
projectDir?: string, projectDir?: string,
): Promise<void> { ): Promise<void> {
// Track failed task IDs across batches to block downstream tasks
const failedTaskIds = new Set(progress.getFailedTaskIds());
for (const batch of plan.batches) { for (const batch of plan.batches) {
if (progress.getState().paused) { if (progress.getState().paused) {
ctx.ui.notify( ctx.ui.notify(
@@ -157,6 +158,43 @@ async function executePlanBatches(
const status = progress.getTaskStatus(task.id); const status = progress.getTaskStatus(task.id);
updateTaskInFile(taskFile, task.id, status); updateTaskInFile(taskFile, task.id, status);
} }
// Update failed task IDs after batch completes
const newFailed = progress.getFailedTaskIds();
for (const id of newFailed) {
failedTaskIds.add(id);
}
// In sequential mode, stop after any failure
if (mode === "sequential" && failedTaskIds.size > 0) {
break;
}
// In parallel mode, rebuild the plan to filter out newly blocked tasks
if (mode === "parallel") {
const completed = new Set(progress.getCompletedTaskIds());
const newPlan = buildExecutionPlan(
project,
completed,
undefined,
failedTaskIds,
);
// Replace remaining batches with filtered ones
const currentIdx = plan.batches.indexOf(batch);
const remainingBatches = newPlan.batches.filter(
(b) => b.batchIndex > currentIdx,
);
// Update the plan's batches in-place
plan.batches.length = 0;
plan.batches.push(...remainingBatches);
// Skip empty batches
if (remainingBatches.length === 0) {
break;
}
}
} }
} }
@@ -279,9 +317,9 @@ export default function ralpiLoopExtension(pi: ExtensionAPI): void {
pi.getThinkingLevel(), pi.getThinkingLevel(),
); );
case "plan": case "plan":
return handlePlan(ctx, parts.slice(1)); pi.sendUserMessage("@task-manager");
case "status": ctx.ui.notify("Opening Task Manager...", "info");
return handleStatus(ctx, parts.slice(1)); return;
case "resume": case "resume":
return handleResume( return handleResume(
ctx, ctx,
@@ -290,14 +328,6 @@ export default function ralpiLoopExtension(pi: ExtensionAPI): void {
ctx.model, ctx.model,
pi.getThinkingLevel(), pi.getThinkingLevel(),
); );
case "next":
return handleNext(
ctx,
parts.slice(1),
sendProgress,
ctx.model,
pi.getThinkingLevel(),
);
case "reset": case "reset":
return handleReset(ctx, parts.slice(1)); return handleReset(ctx, parts.slice(1));
default: { default: {
@@ -432,46 +462,7 @@ async function handleRun(
} }
// ─── /ralpi status ─────────────────────────────────────────────────────────── // ─── /ralpi status ───────────────────────────────────────────────────────────
// (removed — use /ralpi plan to invoke @task-manager)
async function handleStatus(
ctx: ExtensionContext,
args: string[],
): Promise<void> {
if (args[0]) {
const taskFile = resolveTaskArg(args[0], process.cwd());
const existingProgress = findProgressFile(process.cwd(), taskFile);
if (existingProgress) {
const projectDir = path.dirname(path.dirname(existingProgress.path));
const progress = new ProgressTracker(
projectDir,
taskFile,
existingProgress.prdKey,
);
ctx.ui.notify(formatProgressStatus(progress.getState()), "info");
return;
}
// No progress yet for this task — parse and show plan instead
const project = parseTaskFile(taskFile);
ctx.ui.notify(
`No progress for ${path.basename(taskFile)}. ${
project.tasks.length
} tasks found.\nUse /ralpi run ${args[0]} to start.`,
"info",
);
return;
}
const found = findProgressFile(process.cwd());
if (!found) {
ctx.ui.notify(
"No .ralpi/progress.json found. Start with /ralpi run [task-file]",
"warning",
);
return;
}
ctx.ui.notify(formatAllPRDsStatus(found.state), "info");
}
// ─── /ralpi resume ─────────────────────────────────────────────────────────── // ─── /ralpi resume ───────────────────────────────────────────────────────────
@@ -546,89 +537,7 @@ async function handleResume(
} }
// ─── /ralpi next ───────────────────────────────────────────────────────────── // ─── /ralpi next ─────────────────────────────────────────────────────────────
// (removed — use /ralpi run to execute tasks)
async function handleNext(
ctx: ExtensionContext,
args: string[],
sendChatMessage?: SendChatMessage,
parentModel?: unknown,
parentThinkingLevel?: unknown,
): Promise<void> {
let taskFile: string;
let projectDir: string;
let found: ReturnType<typeof findProgressFile>;
if (args[0]) {
taskFile = resolveTaskArg(args[0], process.cwd());
found = findProgressFile(process.cwd(), taskFile);
if (found) {
projectDir = path.dirname(path.dirname(found.path));
} else {
projectDir = process.cwd();
}
} else {
found = findProgressFile(process.cwd());
if (!found) {
ctx.ui.notify(
"No .ralpi/progress.json found. Start with /ralpi run [task-file]",
"warning",
);
return;
}
taskFile = found.state.prds
? Object.values(found.state.prds)[0].sourcePath
: found.state.sourcePath;
projectDir = path.dirname(path.dirname(found.path));
}
const project = parseTaskFile(taskFile);
if (!Array.isArray(project.tasks)) {
throw new Error(
`Parsed project from ${taskFile} has invalid tasks: expected array, got ${typeof project.tasks}`,
);
}
const config = loadConfig(projectDir);
config.model = parentModel ?? ctx.model;
config.thinkingLevel = parentThinkingLevel;
const progress = new ProgressTracker(projectDir, taskFile, found?.prdKey);
const completed = buildCompletedSet(progress, project);
const ready = getReadyTasks(project, completed);
if (ready.length === 0) {
ctx.ui.notify(
"No tasks ready to execute. All tasks completed or blocked.",
"info",
);
return;
}
const nextBatch = ready.slice(
0,
config.execution.maxParallel || ready.length,
);
for (const task of nextBatch) {
await executeBatch(
[task],
project,
config,
progress,
ctx,
{ parallel: false },
sendChatMessage,
projectDir,
);
updateTaskInFile(taskFile, task.id, progress.getTaskStatus(task.id));
}
ctx.ui.notify(
`Executed: ${nextBatch
.map((t) => t.id)
.join(", ")}\n\n${formatProgressStatus(progress.getState())}`,
"info",
);
}
// ─── /ralpi reset ──────────────────────────────────────────────────────────── // ─── /ralpi reset ────────────────────────────────────────────────────────────

View File

@@ -2,32 +2,49 @@
"name": "ralpi", "name": "ralpi",
"version": "0.1.0", "version": "0.1.0",
"description": "Execute tasks from task files/PRD's using DAG-based dependency resolution with persistent progress tracking", "description": "Execute tasks from task files/PRD's using DAG-based dependency resolution with persistent progress tracking",
"main": "dist/index.js",
"keywords": [ "keywords": [
"pi-package", "pi-package",
"pi-extension", "pi-extension",
"task-runner", "task-runner",
"dag", "dag",
"task-manager", "task-manager",
"ralpi-loop", "ralph-loop",
"prd" "prd"
], ],
"author": "", "author": "Michael Freno",
"license": "MIT", "license": "MIT",
"homepage": "https://github.com/mikefreno/ralpi",
"repository": {
"type": "git",
"url": "git+https://github.com/mikefreno/ralpi.git"
},
"bugs": {
"url": "https://github.com/mikefreno/ralpi/issues"
},
"files": [ "files": [
"dist/", "index.ts",
"src/",
"skills/", "skills/",
"prompts/", "prompts/",
"index.ts" "README.md",
"LICENSE"
], ],
"scripts": { "scripts": {
"build": "tsc", "typecheck": "tsc --noEmit",
"watch": "tsc --watch", "prepublishOnly": "tsc --noEmit"
"prepublishOnly": "npm run build" },
"engines": {
"bun": ">=1.1.0"
}, },
"pi": { "pi": {
"extensions": [ "extensions": [
"./dist/index.js" "./index.ts"
],
"skills": [
"./skills"
],
"prompts": [
"./prompts"
] ]
}, },
"dependencies": { "dependencies": {
@@ -37,6 +54,9 @@
"@earendil-works/pi-coding-agent": "*", "@earendil-works/pi-coding-agent": "*",
"@earendil-works/pi-tui": "*" "@earendil-works/pi-tui": "*"
}, },
"publishConfig": {
"access": "public"
},
"devDependencies": { "devDependencies": {
"@types/node": "^20.0.0", "@types/node": "^20.0.0",
"typescript": "^5.3.0" "typescript": "^5.3.0"

View File

@@ -1,5 +1,33 @@
import type { Task, ExecutionBatch, ExecutionPlan, Project } from "./types"; import type { Task, ExecutionBatch, ExecutionPlan, Project } from "./types";
// ─── Blocked Tasks ───────────────────────────────────────────────────────────
/**
* Find tasks that are blocked (direct or transitive) due to failed dependencies.
* Returns a Set of blocked task IDs.
*/
export function getBlockedTasks(
pendingTasks: Task[],
failedTaskIds: Set<string>,
): Set<string> {
const blocked = new Set<string>();
let changed = true;
while (changed) {
changed = false;
for (const task of pendingTasks) {
if (blocked.has(task.id)) continue;
const deps = task.dependencies || [];
if (deps.some((dep) => failedTaskIds.has(dep))) {
blocked.add(task.id);
changed = true;
}
}
}
return blocked;
}
// ─── Main Entry ────────────────────────────────────────────────────────────── // ─── Main Entry ──────────────────────────────────────────────────────────────
/** /**
@@ -10,16 +38,15 @@ export function buildExecutionPlan(
project: Project, project: Project,
completed: Set<string>, completed: Set<string>,
parallelGroup?: number, parallelGroup?: number,
failedTaskIds: Set<string> = new Set(),
): ExecutionPlan { ): ExecutionPlan {
const allTasks = new Map(project.tasks.map((t) => [t.id, t]));
// Filter out already completed tasks // Filter out already completed tasks
const pendingTasks = project.tasks.filter((t) => !completed.has(t.id)); const pendingTasks = project.tasks.filter((t) => !completed.has(t.id));
// If parallel_group is explicitly set, use group-based batching // If parallel_group is explicitly set, use group-based batching
if (parallelGroup !== undefined) { if (parallelGroup !== undefined) {
return { return {
batches: buildParallelGroupBatches(pendingTasks, allTasks, completed), batches: buildParallelGroupBatches(pendingTasks, failedTaskIds),
totalTasks: pendingTasks.length, totalTasks: pendingTasks.length,
skippedTasks: project.tasks.filter((t) => completed.has(t.id)), skippedTasks: project.tasks.filter((t) => completed.has(t.id)),
}; };
@@ -27,7 +54,7 @@ export function buildExecutionPlan(
// Use dependency-based Kahn's algorithm // Use dependency-based Kahn's algorithm
return { return {
batches: buildBatches(pendingTasks, allTasks, completed), batches: buildBatches(pendingTasks, failedTaskIds),
totalTasks: pendingTasks.length, totalTasks: pendingTasks.length,
skippedTasks: project.tasks.filter((t) => completed.has(t.id)), skippedTasks: project.tasks.filter((t) => completed.has(t.id)),
}; };
@@ -41,9 +68,18 @@ export function buildExecutionPlan(
export function buildSequentialPlan( export function buildSequentialPlan(
project: Project, project: Project,
completed: Set<string>, completed: Set<string>,
failedTaskIds: Set<string> = new Set(),
): ExecutionPlan { ): ExecutionPlan {
const pendingTasks = project.tasks.filter((t) => !completed.has(t.id)); const pendingTasks = project.tasks.filter((t) => !completed.has(t.id));
const batches: ExecutionBatch[] = pendingTasks.map((task, i) => ({
// Mark tasks with failed dependencies as skipped
const blocked = getBlockedTasks(pendingTasks, failedTaskIds);
const skippedTasks = project.tasks.filter(
(t) => completed.has(t.id) || blocked.has(t.id),
);
const activeTasks = pendingTasks.filter((t) => !blocked.has(t.id));
const batches: ExecutionBatch[] = activeTasks.map((task, i) => ({
tasks: [task], tasks: [task],
batchIndex: i, batchIndex: i,
})); }));
@@ -51,7 +87,7 @@ export function buildSequentialPlan(
return { return {
batches, batches,
totalTasks: pendingTasks.length, totalTasks: pendingTasks.length,
skippedTasks: project.tasks.filter((t) => completed.has(t.id)), skippedTasks,
}; };
} }
@@ -59,12 +95,15 @@ export function buildSequentialPlan(
function buildBatches( function buildBatches(
pendingTasks: Task[], pendingTasks: Task[],
allTasks: Map<string, Task>, failedTaskIds: Set<string>,
completed: Set<string>,
): ExecutionBatch[] { ): ExecutionBatch[] {
const batches: ExecutionBatch[] = []; const batches: ExecutionBatch[] = [];
const done = new Set(completed); const done = new Set<string>();
const remaining = new Set(pendingTasks.map((t) => t.id)); const blocked = getBlockedTasks(pendingTasks, failedTaskIds);
const pendingSet = new Set(pendingTasks.map((t) => t.id));
const remaining = new Set(
pendingTasks.filter((t) => !blocked.has(t.id)).map((t) => t.id),
);
while (remaining.size > 0) { while (remaining.size > 0) {
// Find tasks whose dependencies are all satisfied // Find tasks whose dependencies are all satisfied
@@ -74,7 +113,7 @@ function buildBatches(
const deps = task.dependencies || []; const deps = task.dependencies || [];
const depsSatisfied = deps.every( const depsSatisfied = deps.every(
(dep) => done.has(dep) || !allTasks.has(dep), (dep) => done.has(dep) || !pendingSet.has(dep),
); );
if (depsSatisfied) { if (depsSatisfied) {
@@ -108,12 +147,14 @@ function buildBatches(
*/ */
function buildParallelGroupBatches( function buildParallelGroupBatches(
pendingTasks: Task[], pendingTasks: Task[],
allTasks: Map<string, Task>, failedTaskIds: Set<string>,
completed: Set<string>,
): ExecutionBatch[] { ): ExecutionBatch[] {
const blocked = getBlockedTasks(pendingTasks, failedTaskIds);
const activeTasks = pendingTasks.filter((t) => !blocked.has(t.id));
const groups = new Map<number, Task[]>(); const groups = new Map<number, Task[]>();
for (const task of pendingTasks) { for (const task of activeTasks) {
const group = task.parallelGroup ?? 0; const group = task.parallelGroup ?? 0;
if (!groups.has(group)) groups.set(group, []); if (!groups.has(group)) groups.set(group, []);
groups.get(group)!.push(task); groups.get(group)!.push(task);
@@ -121,7 +162,7 @@ function buildParallelGroupBatches(
const sortedGroups = Array.from(groups.entries()).sort((a, b) => a[0] - b[0]); const sortedGroups = Array.from(groups.entries()).sort((a, b) => a[0] - b[0]);
return sortedGroups.map(([groupNum, tasks], i) => ({ return sortedGroups.map(([_groupNum, tasks], i) => ({
tasks, tasks,
batchIndex: i, batchIndex: i,
})); }));

View File

@@ -1,3 +1,4 @@
import { truncateToWidth } from "@earendil-works/pi-tui";
import * as path from "node:path"; import * as path from "node:path";
import type { Task, Project, Reflection, ToolUsage } from "./types"; import type { Task, Project, Reflection, ToolUsage } from "./types";
import type { RalpiConfig } from "./types"; import type { RalpiConfig } from "./types";
@@ -83,6 +84,27 @@ class ModelRoundRobin {
this.assignments.delete(taskId); this.assignments.delete(taskId);
} }
} }
/**
* Advance a task to the next model slot without going through freed slots.
* Used for model failover — when the current model is down, skip to the
* next one instead of re-assigning the same freed index.
*/
advance(taskId: string): unknown {
const currentIndex = this.assignments.get(taskId);
if (currentIndex === undefined) {
// No current assignment — fresh assign (fallback, shouldn't happen)
return this.assign(taskId);
}
// If this index was freed (e.g. from an earlier release call that raced),
// remove it from freeSlots so it's not handed out to another task.
const freeIdx = this.freeSlots.indexOf(currentIndex);
if (freeIdx !== -1) this.freeSlots.splice(freeIdx, 1);
// Advance to the next index (circular)
const nextIndex = (currentIndex + 1) % this.models.length;
this.assignments.set(taskId, nextIndex);
return this.models[nextIndex];
}
} }
/** Shared state for parallel-batch widget. Each running task writes its /** Shared state for parallel-batch widget. Each running task writes its
@@ -162,9 +184,13 @@ export async function runTask(
} else { } else {
// Build widget lines from current state. Live widgets can't expand/collapse // Build widget lines from current state. Live widgets can't expand/collapse
// like chat messages, so we always truncate to MAX_COLLAPSED recent calls. // like chat messages, so we always truncate to MAX_COLLAPSED recent calls.
const buildLines = (t: typeof ctx.ui.theme): string[] => { const truncateWidth = 74; // Account for widget container padding
const buildLines = (t: typeof ctx.ui.theme, width?: number): string[] => {
const effectiveWidth = width
? Math.min(width, truncateWidth)
: truncateWidth;
const frame = t.fg("accent", SPINNER_FRAMES[frameIndex]); const frame = t.fg("accent", SPINNER_FRAMES[frameIndex]);
const lines = [`${frame} ${taskHeader}`]; const lines = [truncateToWidth(`${frame} ${taskHeader}`, effectiveWidth)];
if (toolCalls.length > 0) { if (toolCalls.length > 0) {
if (toolCalls.length <= MAX_COLLAPSED) { if (toolCalls.length <= MAX_COLLAPSED) {
@@ -173,18 +199,27 @@ export async function runTask(
const isLast = i === toolCalls.length - 1; const isLast = i === toolCalls.length - 1;
const branch = isLast ? " └── " : " ├── "; const branch = isLast ? " └── " : " ├── ";
const tag = t.fg("accent", `[${entry.name}]`); const tag = t.fg("accent", `[${entry.name}]`);
lines.push(`${branch}${tag} ${entry.label}`); lines.push(
truncateToWidth(`${branch}${tag} ${entry.label}`, effectiveWidth),
);
} }
} else { } else {
const shown = toolCalls.slice(-MAX_COLLAPSED); const shown = toolCalls.slice(-MAX_COLLAPSED);
const remaining = toolCalls.length - shown.length; const remaining = toolCalls.length - shown.length;
lines.push(t.fg("dim", ` ├── …${remaining} earlier`)); lines.push(
truncateToWidth(
t.fg("dim", ` ├── …${remaining} earlier`),
effectiveWidth,
),
);
for (let i = 0; i < shown.length; i++) { for (let i = 0; i < shown.length; i++) {
const entry = shown[i]; const entry = shown[i];
const isLast = i === shown.length - 1; const isLast = i === shown.length - 1;
const branch = isLast ? " └── " : " ├── "; const branch = isLast ? " └── " : " ├── ";
const tag = t.fg("accent", `[${entry.name}]`); const tag = t.fg("accent", `[${entry.name}]`);
lines.push(`${branch}${tag} ${entry.label}`); lines.push(
truncateToWidth(`${branch}${tag} ${entry.label}`, effectiveWidth),
);
} }
} }
} }
@@ -194,7 +229,7 @@ export async function runTask(
ctx.ui.setWidget(widgetKey, (tui, t) => { ctx.ui.setWidget(widgetKey, (tui, t) => {
widgetTui = tui; widgetTui = tui;
return { return {
render: () => buildLines(t), render: (width?: number) => buildLines(t, width),
invalidate: () => widgetTui?.requestRender(), invalidate: () => widgetTui?.requestRender(),
}; };
}); });
@@ -261,8 +296,8 @@ export async function runTask(
} }
if (!output.success) { if (!output.success) {
sendChatMessage?.(`${taskHeader}${output.error}`); // Failure reporting is handled by the caller (executeTask) to avoid
ctx.ui.notify(`Task ${task.id} failed: ${output.error}`, "error"); // duplicate messages when model failover or retry cycling is active.
return { return {
success: false, success: false,
error: output.error, error: output.error,
@@ -378,6 +413,7 @@ export async function executeBatch(
// Execute sequentially // Execute sequentially
for (const task of tasks) { for (const task of tasks) {
try {
const model = roundRobin?.assign(task.id); const model = roundRobin?.assign(task.id);
await executeTask( await executeTask(
task, task,
@@ -391,6 +427,16 @@ export async function executeBatch(
model, model,
roundRobin, roundRobin,
); );
} catch (error) {
// Task failed — stop the batch. Dependent tasks are blocked by
// the DAG layer (getBlockedTasks) so they won't appear in this batch.
roundRobin?.release(task.id);
const errorMsg = error instanceof Error ? error.message : String(error);
progress.markFailed(task.id, errorMsg);
sendChatMessage?.(`${task.id} · ${task.title}${errorMsg}`);
ctx.ui.notify(`Task ${task.id} failed: ${errorMsg}`, "error");
break;
}
} }
} }
@@ -414,7 +460,11 @@ async function executeBatchParallel(
const widgetKey = `ralpi-parallel-${Date.now()}`; const widgetKey = `ralpi-parallel-${Date.now()}`;
let widgetTui: { requestRender(): void } | null = null; let widgetTui: { requestRender(): void } | null = null;
const buildBatchLines = (t: typeof ctx.ui.theme): string[] => { const buildBatchLines = (
t: typeof ctx.ui.theme,
width?: number,
): string[] => {
const effectiveWidth = width || 74;
const lines: string[] = []; const lines: string[] = [];
const sortedIds = Array.from(sharedState.keys()).sort(); const sortedIds = Array.from(sharedState.keys()).sort();
@@ -425,7 +475,9 @@ async function executeBatchParallel(
? "✓" ? "✓"
: "✗" : "✗"
: t.fg("accent", SPINNER_FRAMES[entry.frameIndex]); : t.fg("accent", SPINNER_FRAMES[entry.frameIndex]);
lines.push(`${frame} ${entry.taskHeader}`); lines.push(
truncateToWidth(`${frame} ${entry.taskHeader}`, effectiveWidth),
);
if (entry.toolCalls.length > 0) { if (entry.toolCalls.length > 0) {
if (entry.toolCalls.length <= MAX_COLLAPSED) { if (entry.toolCalls.length <= MAX_COLLAPSED) {
@@ -434,18 +486,27 @@ async function executeBatchParallel(
const isLast = i === entry.toolCalls.length - 1; const isLast = i === entry.toolCalls.length - 1;
const branch = isLast ? " └── " : " ├── "; const branch = isLast ? " └── " : " ├── ";
const tag = t.fg("accent", `[${tc.name}]`); const tag = t.fg("accent", `[${tc.name}]`);
lines.push(`${branch}${tag} ${tc.label}`); lines.push(
truncateToWidth(`${branch}${tag} ${tc.label}`, effectiveWidth),
);
} }
} else { } else {
const shown = entry.toolCalls.slice(-MAX_COLLAPSED); const shown = entry.toolCalls.slice(-MAX_COLLAPSED);
const remaining = entry.toolCalls.length - shown.length; const remaining = entry.toolCalls.length - shown.length;
lines.push(t.fg("dim", ` ├── …${remaining} earlier`)); lines.push(
truncateToWidth(
t.fg("dim", ` ├── …${remaining} earlier`),
effectiveWidth,
),
);
for (let i = 0; i < shown.length; i++) { for (let i = 0; i < shown.length; i++) {
const tc = shown[i]; const tc = shown[i];
const isLast = i === shown.length - 1; const isLast = i === shown.length - 1;
const branch = isLast ? " └── " : " ├── "; const branch = isLast ? " └── " : " ├── ";
const tag = t.fg("accent", `[${tc.name}]`); const tag = t.fg("accent", `[${tc.name}]`);
lines.push(`${branch}${tag} ${tc.label}`); lines.push(
truncateToWidth(`${branch}${tag} ${tc.label}`, effectiveWidth),
);
} }
} }
} }
@@ -456,7 +517,7 @@ async function executeBatchParallel(
ctx.ui.setWidget(widgetKey, (tui, t) => { ctx.ui.setWidget(widgetKey, (tui, t) => {
widgetTui = tui; widgetTui = tui;
return { return {
render: () => buildBatchLines(t), render: (width?: number) => buildBatchLines(t, width),
invalidate: () => widgetTui?.requestRender(), invalidate: () => widgetTui?.requestRender(),
}; };
}); });
@@ -488,7 +549,16 @@ async function executeBatchParallel(
sharedState, sharedState,
assignedModel, assignedModel,
roundRobin, roundRobin,
), ).catch((error) => {
// Safety net: one task failure should never crash the batch.
// executeTask already marks failed and notifies, but catch as
// a last resort so the error doesn't propagate and crash pi.
roundRobin?.release(task.id);
const errorMsg = error instanceof Error ? error.message : String(error);
progress.markFailed(task.id, errorMsg);
sendChatMessage?.(`${task.id} · ${task.title}${errorMsg}`);
ctx.ui.notify(`Task ${task.id} failed: ${errorMsg}`, "error");
}),
}); });
// Limit concurrency // Limit concurrency
@@ -531,9 +601,11 @@ async function executeTask(
let currentModel: unknown = assignedModel ?? config.model; let currentModel: unknown = assignedModel ?? config.model;
while (modelAttempt < maxModelAttempts) { while (modelAttempt < maxModelAttempts) {
// Get the next model from round-robin (on first try, use the pre-assigned model) // On subsequent model attempts, advance to the next model.
// Uses advance() instead of assign() so we don't get stuck on
// the same freed slot when the current model is down.
if (modelAttempt > 0 && roundRobin) { if (modelAttempt > 0 && roundRobin) {
currentModel = roundRobin.assign(task.id); currentModel = roundRobin.advance(task.id);
} }
let retries = 0; let retries = 0;
@@ -584,11 +656,12 @@ async function executeTask(
// Agent session failed (provider error). // Agent session failed (provider error).
// If we have more models, cycle immediately — don't waste retries. // If we have more models, cycle immediately — don't waste retries.
if (roundRobin && modelAttempt < maxModelAttempts - 1) { if (roundRobin && modelAttempt < maxModelAttempts - 1) {
roundRobin.release(task.id); // Don't release — advance() already handles the transition.
// release() would put the slot in freeSlots, then assign()
// would pick it right back up, getting stuck on the same model.
modelAttempt++; modelAttempt++;
ctx.ui.notify( sendChatMessage?.(
`Task ${task.id}: model failed, trying next (${modelAttempt + 1}/${maxModelAttempts}): ${result.error}`, `~ ${task.id} · ${task.title} trying model ${modelAttempt + 1}/${maxModelAttempts} (previous: ${result.error})`,
"warning",
); );
break; // exit retry loop, cycle to next model break; // exit retry loop, cycle to next model
} }
@@ -596,9 +669,8 @@ async function executeTask(
// No more models — use normal retry logic // No more models — use normal retry logic
if (retries < maxRetries) { if (retries < maxRetries) {
retries = progress.incrementRetry(task.id); retries = progress.incrementRetry(task.id);
ctx.ui.notify( sendChatMessage?.(
`Retrying task ${task.id} (${retries}/${maxRetries}): ${result.error}`, `~ ${task.id} · ${task.title} — retrying (${retries}/${maxRetries}): ${result.error}`,
"warning",
); );
// Exponential backoff // Exponential backoff
@@ -607,13 +679,22 @@ async function executeTask(
} else { } else {
// Max retries exceeded // Max retries exceeded
progress.markFailed(task.id, result.error || "Unknown error"); progress.markFailed(task.id, result.error || "Unknown error");
throw new Error(`Task ${task.id} failed: ${result.error}`); sendChatMessage?.(` ${task.id} · ${task.title} ${result.error}`);
ctx.ui.notify(
`Task ${task.id} failed after ${maxRetries} retries: ${
result.error || "Unknown error"
}`,
"error",
);
return;
} }
} catch (error) { } catch (error) {
roundRobin?.release(task.id); roundRobin?.release(task.id);
const errorMsg = error instanceof Error ? error.message : String(error); const errorMsg = error instanceof Error ? error.message : String(error);
progress.markFailed(task.id, errorMsg); progress.markFailed(task.id, errorMsg);
throw error; sendChatMessage?.(`${task.id} · ${task.title}${errorMsg}`);
ctx.ui.notify(`Task ${task.id} failed: ${errorMsg}`, "error");
return;
} }
} }
@@ -621,9 +702,16 @@ async function executeTask(
modelAttempt++; modelAttempt++;
} }
// All models exhausted // All models exhausted — release the slot
roundRobin?.release(task.id);
progress.markFailed(task.id, "All configured models exhausted"); progress.markFailed(task.id, "All configured models exhausted");
throw new Error(`Task ${task.id} failed: all configured models exhausted`); sendChatMessage?.(
`${task.id} · ${task.title} — all ${maxModelAttempts} models exhausted`,
);
ctx.ui.notify(
`Task ${task.id} failed: all configured models exhausted`,
"error",
);
} }
// ─── Save Reflection to File ──────────────────────────────────────────────── // ─── Save Reflection to File ────────────────────────────────────────────────

View File

@@ -213,6 +213,14 @@ export class ProgressTracker {
.map(([id]) => id); .map(([id]) => id);
} }
/** Get IDs of all failed tasks */
getFailedTaskIds(): string[] {
const prd = this.getPRD();
return Object.entries(prd.tasks)
.filter(([, info]) => info.status === "failed")
.map(([id]) => id);
}
/** Get all reflections from completed tasks */ /** Get all reflections from completed tasks */
getAllReflections(): Reflection[] { getAllReflections(): Reflection[] {
const prd = this.getPRD(); const prd = this.getPRD();

View File

@@ -1,18 +1,15 @@
{ {
"compilerOptions": { "compilerOptions": {
"target": "ES2022", "target": "ES2022",
"module": "commonjs", "module": "ES2022",
"moduleResolution": "bundler",
"lib": ["ES2022"], "lib": ["ES2022"],
"outDir": "./dist", "noEmit": true,
"rootDir": "./",
"strict": true, "strict": true,
"esModuleInterop": true, "esModuleInterop": true,
"skipLibCheck": true, "skipLibCheck": true,
"forceConsistentCasingInFileNames": true, "forceConsistentCasingInFileNames": true,
"resolveJsonModule": true, "resolveJsonModule": true
"declaration": true,
"declarationMap": true,
"sourceMap": true
}, },
"include": ["index.ts", "src/**/*"], "include": ["index.ts", "src/**/*"],
"exclude": ["node_modules", "dist"] "exclude": ["node_modules", "dist"]