From 424e2fa8859cce4ca726aa2be57b6b4184f77804 Mon Sep 17 00:00:00 2001 From: Michael Freno Date: Sun, 31 May 2026 11:44:47 -0400 Subject: [PATCH] Add loop-active marker, YAML task file support, and auto-updating PRD checkboxes - Persist loop-active state for widget re-instantiation after session reload - Add YAML task file parsing and update support via yaml library - Auto-update PRD source file checkboxes on task status changes - Add batchRender callback for real-time parallel widget animation - Normalize tabs-to-spaces indentation across source files - Use padStart(2, '0') for ID formatting instead of hardcoded prefix - Enable parallel execution for single-task DAG batches --- index.ts | 1237 ++++++++++++++++++++++++++++++----------------- src/dag.ts | 533 +++++++++++--------- src/executor.ts | 73 ++- src/parser.ts | 148 ++++-- src/utils.ts | 72 +++ 5 files changed, 1360 insertions(+), 703 deletions(-) diff --git a/index.ts b/index.ts index d67b720..da41449 100644 --- a/index.ts +++ b/index.ts @@ -1,25 +1,34 @@ import * as fs from "node:fs"; import * as path from "node:path"; import type { - ExtensionAPI, - ExtensionContext, + ExtensionAPI, + ExtensionContext, } from "@earendil-works/pi-coding-agent"; import { Box, Text } from "@earendil-works/pi-tui"; import { parseTaskFile, updateTaskInFile } from "./src/parser"; import { - buildExecutionPlan, - buildSequentialPlan, - formatExecutionPlan, + buildExecutionPlan, + buildSequentialPlan, + formatDependencyChain, + formatExecutionPlan, } from "./src/dag"; import { ProgressTracker } from "./src/progress"; import { buildPlanPrompt } from "./src/prompts"; import { formatReflections } from "./src/reflection"; -import { executeBatch, type SendChatMessage } from "./src/executor"; import { - loadConfig, - resolveTaskArg, - formatProgressStatus, - findProgressFile, + executeBatch, + SPINNER_FRAMES, + type SendChatMessage, +} from "./src/executor"; +import { + loadConfig, + resolveTaskArg, + formatProgressStatus, + findProgressFile, + writeLoopActive, + deleteLoopActive, + readLoopActive, + findRalpiDir, } from "./src/utils"; const COMMANDS = ["plan", "resume", "reset"] as const; @@ -33,422 +42,770 @@ type ExecutionMode = "parallel" | "sequential"; * Matches: @path, /path, ./path, ../path, path/to/file, path.md, path.yaml */ function looksLikePath(token: string): boolean { - return ( - token.startsWith("@") || - token.startsWith("/") || - token.startsWith("./") || - token.startsWith("../") || - token.includes("/") || - token.endsWith(".md") || - token.endsWith(".yaml") || - token.endsWith(".yml") - ); + return ( + token.startsWith("@") || + token.startsWith("/") || + token.startsWith("./") || + token.startsWith("../") || + token.includes("/") || + token.endsWith(".md") || + token.endsWith(".yaml") || + token.endsWith(".yml") + ); } /** Build the set of completed tasks from progress tracker and PRD checkboxes. */ function buildCompletedSet( - progress: ProgressTracker, - project: import("./src/types").Project, + progress: ProgressTracker, + project: import("./src/types").Project, ): Set { - const completed = new Set(progress.getCompletedTaskIds()); - for (const task of project.tasks) { - if (task.status === "completed") { - completed.add(task.id); - } - } - return completed; + const completed = new Set(progress.getCompletedTaskIds()); + for (const task of project.tasks) { + if (task.status === "completed") { + completed.add(task.id); + } + } + return completed; } /** Prompt user to select an execution mode with dependency validation. */ async function selectExecutionMode( - ctx: ExtensionContext, - project: import("./src/types").Project, - taskFile: string, - config: import("./src/types").RalpiConfig, + ctx: ExtensionContext, + project: import("./src/types").Project, + taskFile: string, + config: import("./src/types").RalpiConfig, ): Promise { - const mode = await ctx.ui.select("Execution mode for this run?", [ - `Parallel (where dependencies allow)[${config.execution.maxParallel} max]`, - "Sequential (one at a time)", - ]); - const isParallel = mode?.startsWith("Parallel") ?? false; + const mode = await ctx.ui.select("Execution mode for this run?", [ + `Parallel (where dependencies allow)[${config.execution.maxParallel} max]`, + "Sequential (one at a time)", + ]); + const isParallel = mode?.startsWith("Parallel") ?? false; - if (!isParallel) return "sequential"; + if (!isParallel) return "sequential"; - // Validate dependency graph for parallel mode - if (Object.keys(project.dependencies).length === 0) { - const hasDepsSection = await fs.promises - .readFile(taskFile, "utf-8") - .then((content) => /^##\s+Dependencies\s*$/m.test(content)) - .catch(() => false); + // Validate dependency graph for parallel mode + if (Object.keys(project.dependencies).length === 0) { + const hasDepsSection = await fs.promises + .readFile(taskFile, "utf-8") + .then((content) => /^##\s+Dependencies\s*$/m.test(content)) + .catch(() => false); - if (hasDepsSection) { - const choice = await ctx.ui.select( - "Found ## Dependencies section but no valid dependencies were parsed.\n\n" + - "This may be due to unsupported format. Parallel mode requires explicit dependencies.\n\n" + - "See README.md for supported dependency formats:\n" + - "- Arrow notation: `1 -> 2,3,4`\n" + - "- Natural language: `13 depends on 17, 18, 19, 20`\n\n" + - "Fall back to sequential mode?", - ["Yes, use sequential", "No, continue with parallel"], - ); - if (choice?.startsWith("Yes")) { - return "sequential"; - } - } - } + if (hasDepsSection) { + const choice = await ctx.ui.select( + "Found ## Dependencies section but no valid dependencies were parsed.\n\n" + + "This may be due to unsupported format. Parallel mode requires explicit dependencies.\n\n" + + "See README.md for supported dependency formats:\n" + + "- Arrow notation: `1 -> 2,3,4`\n" + + "- Natural language: `13 depends on 17, 18, 19, 20`\n\n" + + "Fall back to sequential mode?", + ["Yes, use sequential", "No, continue with parallel"], + ); + if (choice?.startsWith("Yes")) { + return "sequential"; + } + } + } - return "parallel"; + return "parallel"; } /** Build an execution plan based on the selected mode. */ function buildPlanByMode( - mode: ExecutionMode, - project: Parameters[0], - completed: Set, + mode: ExecutionMode, + project: Parameters[0], + completed: Set, ) { - return mode === "parallel" - ? buildExecutionPlan(project, completed) - : buildSequentialPlan(project, completed); + return mode === "parallel" + ? buildExecutionPlan(project, completed) + : buildSequentialPlan(project, completed); } /** Run all batches in a plan, updating the task file after each batch. */ async function executePlanBatches( - plan: ReturnType, - project: Parameters[0], - taskFile: string, - config: import("./src/types").RalpiConfig, - progress: ProgressTracker, - ctx: ExtensionContext, - mode: ExecutionMode, - sendChatMessage?: SendChatMessage, - projectDir?: string, + plan: ReturnType, + project: Parameters[0], + taskFile: string, + config: import("./src/types").RalpiConfig, + progress: ProgressTracker, + ctx: ExtensionContext, + mode: ExecutionMode, + sendChatMessage?: SendChatMessage, + projectDir?: string, ): Promise { - // Track failed task IDs across batches to block downstream tasks - const failedTaskIds = new Set(progress.getFailedTaskIds()); + // Write loop-active marker so widgets can be re-instantiated after a reload + if (projectDir) { + const allTaskIds = plan.batches.flatMap((b) => b.tasks.map((t) => t.id)); + writeLoopActive(projectDir, { + taskFile, + mode, + startedAt: new Date().toISOString(), + taskIds: allTaskIds, + prdKey: progress.getKey(), + }); + } - for (const batch of plan.batches) { - if (progress.getState().paused) { - ctx.ui.notify( - "Execution paused. Use /ralpi resume to continue.", - "warning", - ); - return; - } + // Track failed task IDs across batches to block downstream tasks + const failedTaskIds = new Set(progress.getFailedTaskIds()); - if (!Array.isArray(batch.tasks)) { - throw new Error( - `Batch ${ - batch.batchIndex - } has invalid tasks: expected array, got ${typeof batch.tasks}`, - ); - } + try { + for (const batch of plan.batches) { + if (progress.getState().paused) { + ctx.ui.notify( + "Execution paused. Use /ralpi resume to continue.", + "warning", + ); + return; + } - await executeBatch( - batch.tasks, - project, - config, - progress, - ctx, - { parallel: mode === "parallel" }, - sendChatMessage, - projectDir, - ); + if (!Array.isArray(batch.tasks)) { + throw new Error( + `Batch ${ + batch.batchIndex + } has invalid tasks: expected array, got ${typeof batch.tasks}`, + ); + } - for (const task of batch.tasks) { - const status = progress.getTaskStatus(task.id); - updateTaskInFile(taskFile, task.id, status); - } + await executeBatch( + batch.tasks, + project, + config, + progress, + ctx, + { parallel: mode === "parallel" }, + sendChatMessage, + projectDir, + ); - // Update failed task IDs after batch completes - const newFailed = progress.getFailedTaskIds(); - for (const id of newFailed) { - failedTaskIds.add(id); - } + for (const task of batch.tasks) { + const status = progress.getTaskStatus(task.id); + updateTaskInFile(taskFile, task.id, status); + } - // In sequential mode, stop after any failure - if (mode === "sequential" && failedTaskIds.size > 0) { - break; - } + // Update failed task IDs after batch completes + const newFailed = progress.getFailedTaskIds(); + for (const id of newFailed) { + failedTaskIds.add(id); + } - // In parallel mode, rebuild the plan to filter out newly blocked tasks - if (mode === "parallel") { - const completed = new Set(progress.getCompletedTaskIds()); - const newPlan = buildExecutionPlan( - project, - completed, - undefined, - failedTaskIds, - ); + // In sequential mode, stop after any failure + if (mode === "sequential" && failedTaskIds.size > 0) { + break; + } - // Replace remaining batches with filtered ones - const currentIdx = plan.batches.indexOf(batch); - const remainingBatches = newPlan.batches.filter( - (b) => b.batchIndex > currentIdx, - ); + // In parallel mode, rebuild the plan to filter out newly blocked tasks + if (mode === "parallel") { + const completed = new Set(progress.getCompletedTaskIds()); + const newPlan = buildExecutionPlan( + project, + completed, + undefined, + failedTaskIds, + ); - // Update the plan's batches in-place - plan.batches.length = 0; - plan.batches.push(...remainingBatches); + // Replace remaining batches with filtered ones + const currentIdx = plan.batches.indexOf(batch); + const remainingBatches = newPlan.batches.filter( + (b) => b.batchIndex > currentIdx, + ); - // Skip empty batches - if (remainingBatches.length === 0) { - break; - } - } - } + // Update the plan's batches in-place + plan.batches.length = 0; + plan.batches.push(...remainingBatches); + + // Skip empty batches + if (remainingBatches.length === 0) { + break; + } + } + } + } finally { + if (projectDir) { + deleteLoopActive(projectDir); + } + } } // ─── Extension Entry ──────────────────────────────────────────────────────── export default function ralpiLoopExtension(pi: ExtensionAPI): void { - // Register custom message renderer for ralpi progress messages. - // Renders an expandable tool-call tree: collapsed shows last 3 + "N more", - // expanded (Ctrl+O) shows every tool call. - pi.registerMessageRenderer( - "ralpi-progress", - (message, { expanded }, theme) => { - const details = message.details as - | { - phase?: string; - toolCalls?: Array<{ name: string; label: string }>; - } - | undefined; + // Register custom message renderer for ralpi progress messages. + // Renders an expandable tool-call tree: collapsed shows last 3 + "N more", + // expanded (Ctrl+O) shows every tool call. + pi.registerMessageRenderer( + "ralpi-progress", + (message, { expanded }, theme) => { + const details = message.details as + | { + phase?: string; + toolCalls?: Array<{ name: string; label: string }>; + } + | undefined; - const MAX_COLLAPSED = 3; - const lines: string[] = []; + const MAX_COLLAPSED = 3; + const lines: string[] = []; - // Header line — e.g. "✓ 05 · billing-subscriptions-trials (2m 14s)" - lines.push(String(message.content)); + // Header line — e.g. "✓ 05 · billing-subscriptions-trials (2m 14s)" + lines.push(String(message.content)); - // Build tool-call tree - if (details?.toolCalls && details.toolCalls.length > 0) { - const all = details.toolCalls; + // Build tool-call tree + if (details?.toolCalls && details.toolCalls.length > 0) { + const all = details.toolCalls; - if (expanded) { - // Expanded: show ALL tool calls - for (let i = 0; i < all.length; i++) { - const entry = all[i]; - const isLast = i === all.length - 1; - const branch = isLast ? " └── " : " ├── "; - const tag = theme.fg("accent", `[${entry.name}]`); - lines.push(`${branch}${tag} ${entry.label}`); - } - } else { - // Collapsed: last N + "X more" - const shown = all.slice(-MAX_COLLAPSED); - const remaining = all.length - shown.length; + if (expanded) { + // Expanded: show ALL tool calls + for (let i = 0; i < all.length; i++) { + const entry = all[i]; + const isLast = i === all.length - 1; + const branch = isLast ? " └── " : " ├── "; + const tag = theme.fg("accent", `[${entry.name}]`); + lines.push(`${branch}${tag} ${entry.label}`); + } + } else { + // Collapsed: last N + "X more" + const shown = all.slice(-MAX_COLLAPSED); + const remaining = all.length - shown.length; - if (remaining > 0) { - lines.push(theme.fg("dim", ` ├── ${remaining} more`)); - } + if (remaining > 0) { + lines.push(theme.fg("dim", ` ├── ${remaining} more`)); + } - for (let i = 0; i < shown.length; i++) { - const entry = shown[i]; - const isLast = i === shown.length - 1; - const branch = isLast ? " └── " : " ├── "; - const tag = theme.fg("accent", `[${entry.name}]`); - lines.push(`${branch}${tag} ${entry.label}`); - } - } - } + for (let i = 0; i < shown.length; i++) { + const entry = shown[i]; + const isLast = i === shown.length - 1; + const branch = isLast ? " └── " : " ├── "; + const tag = theme.fg("accent", `[${entry.name}]`); + lines.push(`${branch}${tag} ${entry.label}`); + } + } + } - const text = lines.join("\n"); - const box = new Box(1, 1, (t) => theme.bg("customMessageBg", t)); - box.addChild(new Text(text, 0, 0)); - return box; - }, - ); + const text = lines.join("\n"); + const box = new Box(1, 1, (t) => theme.bg("customMessageBg", t)); + box.addChild(new Text(text, 0, 0)); + return box; + }, + ); - pi.registerCommand("ralpi", { - description: - "Execute tasks from a task file using DAG-based dependency resolution", - handler: async (args: string, ctx: ExtensionContext) => { - const parts = (args || "").trim().split(/\s+/).filter(Boolean); + // ─── Reload detection: re-instantiate widgets when session reloads ────── + // + // When the user types /reload while ralpi tasks are executing, the old + // ExtensionContext is torn down and widgets (created via ctx.ui.setWidget) + // disappear. This handler detects the reload, reads the persisted loop-active + // marker and progress.json, and re-creates live-status widgets that show + // task progress with spinner animation and tool calls from session files. + pi.on("session_start", async (event, ctx) => { + if (event.reason !== "reload") return; - // Wraps pi.sendMessage() for posting status to the chat history. - // Uses "ralpi-progress" customType with a "progress" phase so the - // renderer omits the label prefix entirely (no [INFO] etc.). - // Accepts an optional meta object with toolCalls for the expandable view. - const sendProgress: SendChatMessage = ( - content: string, - meta?: { toolCalls?: Array<{ name: string; label: string }> }, - ) => { - pi.sendMessage({ - customType: "ralpi-progress", - content, - display: true, - details: { phase: "progress", toolCalls: meta?.toolCalls }, - }); - }; + // Find the ralpi project directory + const projectDir = findRalpiDir(ctx.cwd); + if (!projectDir) return; - // If no args, show plan. If first token looks like a path (@path, /path, ./path), - // route to run so the execution mode prompt fires. - if (parts.length === 0) { - return handlePlan(ctx, parts); - } - if (looksLikePath(parts[0])) { - return handleRun( - ctx, - parts, - sendProgress, - ctx.model, - pi.getThinkingLevel(), - ); - } + // Check if a task execution loop was active before the reload + const loopState = readLoopActive(projectDir); + if (!loopState) return; - const command = parts[0]; - switch (command) { - case "run": - return handleRun( - ctx, - parts.slice(1), - sendProgress, - ctx.model, - pi.getThinkingLevel(), - ); - case "plan": - pi.sendUserMessage("@task-manager"); - ctx.ui.notify("Opening Task Manager...", "info"); - return; - case "resume": - return handleResume( - ctx, - parts.slice(1), - sendProgress, - ctx.model, - pi.getThinkingLevel(), - ); - case "reset": - return handleReset(ctx, parts.slice(1)); - default: { - // Auto-discover progress and offer resume - const found = findProgressFile(process.cwd()); - if (found) { - ctx.ui.notify( - `Unknown command: ${command}\n\nFound existing progress in ${ - found.path - }\nUse /ralpi resume to continue.\n\nAvailable: ${COMMANDS.join( - ", ", - )}`, - "warning", - ); - } else { - ctx.ui.notify( - `Unknown command: ${command}\nAvailable: ${COMMANDS.join(", ")}`, - "error", - ); - } - } - } - }, - }); + // Load progress state + let abortPolling = false; + const progressPath = path.join(projectDir, ".ralpi", "progress.json"); + const sessionsDir = path.join(projectDir, ".ralpi", "sessions"); + + // Parse the task file to get task titles + const titleMap = new Map(); + try { + const project = parseTaskFile(loopState.taskFile); + for (const task of project.tasks) { + titleMap.set(task.id, task.title); + } + } catch { + // If parsing fails, just use IDs without titles + } + + /** Read recent tool calls from a task's session file. */ + const readRecentToolCalls = ( + taskId: string, + maxLines = 30, + ): Array<{ name: string; label: string }> => { + try { + const files = fs + .readdirSync(sessionsDir) + .filter((f) => f.startsWith(taskId + "-")) + .sort(); + if (files.length === 0) return []; + const sessionPath = path.join(sessionsDir, files[files.length - 1]); + const content = fs.readFileSync(sessionPath, "utf-8"); + const lines = content + .split("\n") + .filter((l) => l.trim()) + .slice(-maxLines); + const calls: Array<{ name: string; label: string }> = []; + for (const line of lines) { + try { + const event = JSON.parse(line); + if (event.type === "tool_execution_start") { + calls.push({ + name: event.toolName, + label: formatToolLabel(event.toolName, event.args), + }); + } + } catch { + // Skip malformed lines + } + } + return calls; + } catch { + return []; + } + }; + + /** Format a tool call argument into a short label. */ + function formatToolLabel(name: string, args: unknown): string { + const a = args as Record | undefined; + if (!a) return name; + if (name === "bash") return String(a.command ?? "").slice(0, 70); + if (name === "write" || name === "read" || name === "edit") + return String(a.path ?? "").slice(0, 60); + if (name === "grep") + return `${a.pattern ?? "?"} — ${String(a.path ?? "").slice(0, 40)}`; + if (name === "find") return `${a.path ?? "."} — ${a.glob ?? "*"}`; + if (name === "ls") return String(a.path ?? ".").slice(0, 60); + return name; + } + + /** Re-read progress from disk (old tasks still writing to it). */ + const readTasks = (): Record | null => { + try { + const raw = fs.readFileSync(progressPath, "utf-8"); + const parsed = JSON.parse(raw) as Record; + return parsed.prds?.[loopState.prdKey]?.tasks ?? parsed.tasks ?? null; + } catch { + return null; + } + }; + + // Early exit: if all tasks already finished during the reload, just clean up + const initialTasks = readTasks(); + if (initialTasks) { + const remaining = Object.values(initialTasks).filter( + (t) => t.status === "in_progress", + ).length; + if (remaining === 0) { + ctx.ui.notify("All ralpi tasks completed during reload.", "info"); + deleteLoopActive(projectDir); + return; + } + } + + // Show a status notification for the reconnect + const taskCount = loopState.taskIds.length; + ctx.ui.notify( + `Reconnected to running ralpi execution (${taskCount} tasks, ${loopState.mode} mode)`, + "info", + ); + + // Shared state for the widget + let tickCount = 0; + const MAX_COLLAPSED = 3; + + if (loopState.mode === "parallel") { + // ── Parallel mode: single batch widget ── + const widgetKey = `ralpi-parallel-reconnect-${Date.now()}`; + let widgetTui: { requestRender(): void } | null = null; + + const buildBatchLines = (t: typeof ctx.ui.theme): string[] => { + const tasks = readTasks(); + if (!tasks) return [t.fg("dim", "(waiting for progress...)")]; + + const lines: string[] = []; + // Only show tasks that have started (in_progress, completed, failed). + // Pending/unstarted tasks are noise after a reload. + const sortedIds = [...loopState.taskIds].sort().filter((id) => { + const info = tasks[id]; + return info && info.status !== "pending"; + }); + + // If no tasks have started yet, show nothing — polling will pick up + // changes within 500ms. + if (sortedIds.length === 0) return [t.fg("dim", "(starting tasks...)")]; + + for (const id of sortedIds) { + const info = tasks[id]!; + const title = titleMap.get(id); + const header = title ? `${id} · ${title}` : id; + + // Status icon + if (info.status === "completed") { + lines.push(`${t.fg("success", "✓")} ${header}`); + } else if (info.status === "failed") { + lines.push(`${t.fg("error", "✗")} ${header}`); + } else if (info.status === "in_progress") { + const frame = t.fg( + "accent", + SPINNER_FRAMES[tickCount % SPINNER_FRAMES.length], + ); + lines.push(`${frame} ${header}`); + + // Show recent tool calls for active tasks + const toolCalls = readRecentToolCalls(id); + if (toolCalls.length > 0) { + if (toolCalls.length <= MAX_COLLAPSED) { + for (let i = 0; i < toolCalls.length; i++) { + const tc = toolCalls[i]; + const isLast = i === toolCalls.length - 1; + const branch = isLast ? " └── " : " ├── "; + lines.push( + `${branch}${t.fg("accent", `[${tc.name}]`)} ${tc.label}`, + ); + } + } else { + const shown = toolCalls.slice(-MAX_COLLAPSED); + const remaining = toolCalls.length - shown.length; + lines.push(t.fg("dim", ` ├── …${remaining} earlier`)); + for (let i = 0; i < shown.length; i++) { + const tc = shown[i]; + const isLast = i === shown.length - 1; + const branch = isLast ? " └── " : " ├── "; + lines.push( + `${branch}${t.fg("accent", `[${tc.name}]`)} ${tc.label}`, + ); + } + } + } + } + } + return lines; + }; + + ctx.ui.setWidget(widgetKey, (tui, t) => { + widgetTui = tui; + return { + render: () => buildBatchLines(t), + invalidate: () => widgetTui?.requestRender(), + }; + }); + + // 100ms tick: advances spinner frame every tick, refreshes + // progress + tool calls every 5 ticks (500ms). + const tickTimer = setInterval(() => { + if (abortPolling) return; + tickCount++; + widgetTui?.requestRender(); + + if (tickCount % 5 === 0) { + const tasks = readTasks(); + if (!tasks) return; + const activeCount = Object.values(tasks).filter( + (t) => t.status === "in_progress", + ).length; + if (activeCount === 0) { + clearInterval(tickTimer); + ctx.ui.setWidget(widgetKey, undefined); + deleteLoopActive(projectDir); + } + } + }, 100); + + // Clean up timer when extension is shut down + pi.on("session_shutdown", () => { + abortPolling = true; + clearInterval(tickTimer); + }); + } else { + // ── Sequential mode: per-task widget ── + const currentTaskId = loopState.taskIds.find((id) => { + const tasks = readTasks(); + return tasks?.[id]?.status === "in_progress"; + }); + + if (currentTaskId) { + const widgetKey = `ralpi-task-${currentTaskId}`; + let widgetTui: { requestRender(): void } | null = null; + + const buildLines = (t: typeof ctx.ui.theme): string[] => { + const tasks = readTasks(); + const info = tasks?.[currentTaskId]; + const title = titleMap.get(currentTaskId); + const header = title ? `${currentTaskId} · ${title}` : currentTaskId; + const lines: string[] = []; + + if (!info || info.status === "pending") { + return [t.fg("dim", "(starting task...)")]; + } + + if (info.status === "completed") { + lines.push(`${t.fg("success", "✓")} ${header}`); + } else if (info.status === "failed") { + lines.push(`${t.fg("error", "✗")} ${header}`); + } else if (info.status === "in_progress") { + const frame = t.fg( + "accent", + SPINNER_FRAMES[tickCount % SPINNER_FRAMES.length], + ); + lines.push(`${frame} ${header}`); + + // Show recent tool calls + const toolCalls = readRecentToolCalls(currentTaskId); + if (toolCalls.length > 0) { + const shown = toolCalls.slice(-MAX_COLLAPSED); + const remaining = toolCalls.length - shown.length; + if (remaining > 0) { + lines.push(t.fg("dim", ` ├── …${remaining} earlier`)); + } + for (let i = 0; i < shown.length; i++) { + const tc = shown[i]; + const isLast = i === shown.length - 1; + const branch = isLast ? " └── " : " ├── "; + lines.push( + `${branch}${t.fg("accent", `[${tc.name}]`)} ${tc.label}`, + ); + } + } + } + return lines; + }; + + ctx.ui.setWidget(widgetKey, (tui, t) => { + widgetTui = tui; + return { + render: () => buildLines(t), + invalidate: () => widgetTui?.requestRender(), + }; + }); + + const tickTimer = setInterval(() => { + if (abortPolling) return; + tickCount++; + widgetTui?.requestRender(); + + if (tickCount % 5 === 0) { + const tasks = readTasks(); + if (!tasks) return; + const status = tasks[currentTaskId]?.status; + if (status !== "in_progress") { + clearInterval(tickTimer); + // Keep widget visible a moment, then clean up + setTimeout(() => { + ctx.ui.setWidget(widgetKey, undefined); + deleteLoopActive(projectDir); + }, 3000); + } + } + }, 100); + + pi.on("session_shutdown", () => { + abortPolling = true; + clearInterval(tickTimer); + }); + } else { + // No task actively in progress — show a "resume" hint + ctx.ui.notify( + "No running task found. Use /ralpi resume to continue execution.", + "warning", + ); + } + } + }); + + pi.registerCommand("ralpi", { + description: + "Execute tasks from a task file using DAG-based dependency resolution", + handler: async (args: string, ctx: ExtensionContext) => { + const parts = (args || "").trim().split(/\s+/).filter(Boolean); + + // Wraps pi.sendMessage() for posting status to the chat history. + // Uses "ralpi-progress" customType with a "progress" phase so the + // renderer omits the label prefix entirely (no [INFO] etc.). + // Accepts an optional meta object with toolCalls for the expandable view. + const sendProgress: SendChatMessage = ( + content: string, + meta?: { toolCalls?: Array<{ name: string; label: string }> }, + ) => { + pi.sendMessage({ + customType: "ralpi-progress", + content, + display: true, + details: { phase: "progress", toolCalls: meta?.toolCalls }, + }); + }; + + // If no args, show plan. If first token looks like a path (@path, /path, ./path), + // route to run so the execution mode prompt fires. + if (parts.length === 0) { + return handlePlan(ctx, parts); + } + if (looksLikePath(parts[0])) { + return handleRun( + ctx, + parts, + sendProgress, + ctx.model, + pi.getThinkingLevel(), + ); + } + + const command = parts[0]; + switch (command) { + case "run": + return handleRun( + ctx, + parts.slice(1), + sendProgress, + ctx.model, + pi.getThinkingLevel(), + ); + case "plan": + pi.sendUserMessage("@task-manager"); + ctx.ui.notify("Opening Task Manager...", "info"); + return; + case "resume": + return handleResume( + ctx, + parts.slice(1), + sendProgress, + ctx.model, + pi.getThinkingLevel(), + ); + case "reset": + return handleReset(ctx, parts.slice(1)); + default: { + // Auto-discover progress and offer resume + const found = findProgressFile(process.cwd()); + if (found) { + ctx.ui.notify( + `Unknown command: ${command}\n\nFound existing progress in ${ + found.path + }\nUse /ralpi resume to continue.\n\nAvailable: ${COMMANDS.join( + ", ", + )}`, + "warning", + ); + } else { + ctx.ui.notify( + `Unknown command: ${command}\nAvailable: ${COMMANDS.join(", ")}`, + "error", + ); + } + } + } + }, + }); } // ─── /ralpi plan ───────────────────────────────────────────────────────────── async function handlePlan( - ctx: ExtensionContext, - args: string[], + ctx: ExtensionContext, + args: string[], ): Promise { - const taskFile = resolveTaskArg(args[0] || "README.md", process.cwd()); - const project = parseTaskFile(taskFile); - if (!Array.isArray(project.tasks)) { - throw new Error( - `Parsed project from ${taskFile} has invalid tasks: expected array, got ${typeof project.tasks}`, - ); - } + const taskFile = resolveTaskArg(args[0] || "README.md", process.cwd()); + const project = parseTaskFile(taskFile); + if (!Array.isArray(project.tasks)) { + throw new Error( + `Parsed project from ${taskFile} has invalid tasks: expected array, got ${typeof project.tasks}`, + ); + } - const planPrompt = buildPlanPrompt(project); - const plan = buildExecutionPlan(project, new Set()); - const formatted = formatExecutionPlan(plan); + const planPrompt = buildPlanPrompt(project); + const plan = buildExecutionPlan(project, new Set()); + const formatted = formatExecutionPlan(plan); - ctx.ui.notify(`${planPrompt}\n\n${formatted}`, "info"); + ctx.ui.notify(`${planPrompt}\n\n${formatted}`, "info"); } // ─── /ralpi run ────────────────────────────────────────────────────────────── async function handleRun( - ctx: ExtensionContext, - args: string[], - sendChatMessage?: SendChatMessage, - parentModel?: unknown, - parentThinkingLevel?: unknown, + ctx: ExtensionContext, + args: string[], + sendChatMessage?: SendChatMessage, + parentModel?: unknown, + parentThinkingLevel?: unknown, ): Promise { - const taskFile = resolveTaskArg(args[0] || "README.md", process.cwd()); + const taskFile = resolveTaskArg(args[0] || "README.md", process.cwd()); - // If targeting a specific task file and there's existing progress for it, - // auto-resume instead of starting fresh - const existingProgress = findProgressFile(process.cwd(), taskFile); - if (existingProgress) { - return handleResume( - ctx, - args.slice(0, 1), - sendChatMessage, - parentModel, - parentThinkingLevel, - ); - } + // If targeting a specific task file and there's existing progress for it, + // auto-resume instead of starting fresh + const existingProgress = findProgressFile(process.cwd(), taskFile); + if (existingProgress) { + return handleResume( + ctx, + args.slice(0, 1), + sendChatMessage, + parentModel, + parentThinkingLevel, + ); + } - // No existing progress for this task — check for any progress at all - const found = findProgressFile(process.cwd()); - if (found && !args[0]) { - // Offer to resume instead of starting fresh - const shouldResume = await ctx.ui.select( - "Found existing ralpi progress. Resume?", - ["Yes, resume", "No, start fresh"], - ); + // No existing progress for this task — check for any progress at all + const found = findProgressFile(process.cwd()); + if (found && !args[0]) { + // Offer to resume instead of starting fresh + const shouldResume = await ctx.ui.select( + "Found existing ralpi progress. Resume?", + ["Yes, resume", "No, start fresh"], + ); - if (shouldResume?.startsWith("Yes")) { - return handleResume( - ctx, - [], - sendChatMessage, - parentModel, - parentThinkingLevel, - ); - } - } + if (shouldResume?.startsWith("Yes")) { + return handleResume( + ctx, + [], + sendChatMessage, + parentModel, + parentThinkingLevel, + ); + } + } - const projectDir = found - ? path.dirname(path.dirname(found.path)) - : process.cwd(); + const projectDir = found + ? path.dirname(path.dirname(found.path)) + : process.cwd(); - const project = parseTaskFile(taskFile); - const config = loadConfig(projectDir); - config.model = parentModel ?? ctx.model; - config.thinkingLevel = parentThinkingLevel; - const progress = new ProgressTracker(projectDir, taskFile); + const project = parseTaskFile(taskFile); + const config = loadConfig(projectDir); + config.model = parentModel ?? ctx.model; + config.thinkingLevel = parentThinkingLevel; + const progress = new ProgressTracker(projectDir, taskFile); - const completed = buildCompletedSet(progress, project); - const mode = await selectExecutionMode(ctx, project, taskFile, config); - const plan = buildPlanByMode(mode, project, completed); + const completed = buildCompletedSet(progress, project); + const mode = await selectExecutionMode(ctx, project, taskFile, config); + const plan = buildPlanByMode(mode, project, completed); - // Show execution plan before starting so user can see batch breakdown - const formattedPlan = formatExecutionPlan(plan); - ctx.ui.notify(`${formattedPlan}\n\nStarting ${mode} execution...`, "info"); + // Show dependency chain + execution plan before starting + const depChain = formatDependencyChain(project); + const formattedPlan = formatExecutionPlan(plan); + if (mode === "parallel") { + ctx.ui.notify( + `${depChain}\n\n${formattedPlan}\n\nStarting parallel execution...`, + "info", + ); + } else { + ctx.ui.notify( + `${formattedPlan}\n\nStarting sequential execution...`, + "info", + ); + } - await executePlanBatches( - plan, - project, - taskFile, - config, - progress, - ctx, - mode, - sendChatMessage, - projectDir, - ); + await executePlanBatches( + plan, + project, + taskFile, + config, + progress, + ctx, + mode, + sendChatMessage, + projectDir, + ); - const state = progress.getState(); - const output = formatProgressStatus(state); + const state = progress.getState(); + const output = formatProgressStatus(state); - const reflections = progress.getAllReflections(); - if (reflections.length > 0) { - ctx.ui.notify(`${output}\n\n${formatReflections(reflections)}`, "info"); - return; - } + const reflections = progress.getAllReflections(); + if (reflections.length > 0) { + ctx.ui.notify(`${output}\n\n${formatReflections(reflections)}`, "info"); + return; + } - ctx.ui.notify(output, "info"); + ctx.ui.notify(output, "info"); } // ─── /ralpi status ─────────────────────────────────────────────────────────── @@ -457,73 +814,73 @@ async function handleRun( // ─── /ralpi resume ─────────────────────────────────────────────────────────── async function handleResume( - ctx: ExtensionContext, - args: string[], - sendChatMessage?: SendChatMessage, - parentModel?: unknown, - parentThinkingLevel?: unknown, + ctx: ExtensionContext, + args: string[], + sendChatMessage?: SendChatMessage, + parentModel?: unknown, + parentThinkingLevel?: unknown, ): Promise { - let taskFile: string; - let projectDir: string; - let found: ReturnType; + let taskFile: string; + let projectDir: string; + let found: ReturnType; - if (args[0]) { - taskFile = resolveTaskArg(args[0], process.cwd()); - found = findProgressFile(process.cwd(), taskFile); - if (!found) { - ctx.ui.notify( - `No existing progress for ${args[0]}. Start with /ralpi run ${args[0]}`, - "warning", - ); - return; - } - projectDir = path.dirname(path.dirname(found.path)); - } else { - found = findProgressFile(process.cwd()); - if (!found) { - ctx.ui.notify( - "No .ralpi/progress.json found. Start with /ralpi run [task-file]", - "warning", - ); - return; - } - projectDir = path.dirname(path.dirname(found.path)); - // For no-arg resume, use the first PRD's source path or legacy sourcePath - taskFile = found.state.prds - ? Object.values(found.state.prds)[0].sourcePath - : found.state.sourcePath; - } + if (args[0]) { + taskFile = resolveTaskArg(args[0], process.cwd()); + found = findProgressFile(process.cwd(), taskFile); + if (!found) { + ctx.ui.notify( + `No existing progress for ${args[0]}. Start with /ralpi run ${args[0]}`, + "warning", + ); + return; + } + projectDir = path.dirname(path.dirname(found.path)); + } else { + found = findProgressFile(process.cwd()); + if (!found) { + ctx.ui.notify( + "No .ralpi/progress.json found. Start with /ralpi run [task-file]", + "warning", + ); + return; + } + projectDir = path.dirname(path.dirname(found.path)); + // For no-arg resume, use the first PRD's source path or legacy sourcePath + taskFile = found.state.prds + ? Object.values(found.state.prds)[0].sourcePath + : found.state.sourcePath; + } - const project = parseTaskFile(taskFile); - if (!Array.isArray(project.tasks)) { - throw new Error( - `Parsed project from ${taskFile} has invalid tasks: expected array, got ${typeof project.tasks}`, - ); - } - const config = loadConfig(projectDir); - config.model = parentModel ?? ctx.model; - config.thinkingLevel = parentThinkingLevel; - const progress = new ProgressTracker(projectDir, taskFile, found.prdKey); + const project = parseTaskFile(taskFile); + if (!Array.isArray(project.tasks)) { + throw new Error( + `Parsed project from ${taskFile} has invalid tasks: expected array, got ${typeof project.tasks}`, + ); + } + const config = loadConfig(projectDir); + config.model = parentModel ?? ctx.model; + config.thinkingLevel = parentThinkingLevel; + const progress = new ProgressTracker(projectDir, taskFile, found.prdKey); - progress.setPaused(false); + progress.setPaused(false); - const completed = buildCompletedSet(progress, project); - const mode = await selectExecutionMode(ctx, project, taskFile, config); - const plan = buildPlanByMode(mode, project, completed); + const completed = buildCompletedSet(progress, project); + const mode = await selectExecutionMode(ctx, project, taskFile, config); + const plan = buildPlanByMode(mode, project, completed); - await executePlanBatches( - plan, - project, - taskFile, - config, - progress, - ctx, - mode, - sendChatMessage, - projectDir, - ); + await executePlanBatches( + plan, + project, + taskFile, + config, + progress, + ctx, + mode, + sendChatMessage, + projectDir, + ); - ctx.ui.notify(formatProgressStatus(progress.getState()), "info"); + ctx.ui.notify(formatProgressStatus(progress.getState()), "info"); } // ─── /ralpi next ───────────────────────────────────────────────────────────── @@ -532,35 +889,35 @@ async function handleResume( // ─── /ralpi reset ──────────────────────────────────────────────────────────── async function handleReset( - ctx: ExtensionContext, - args: string[], + ctx: ExtensionContext, + args: string[], ): Promise { - if (args[0]) { - const taskFile = resolveTaskArg(args[0], process.cwd()); - const found = findProgressFile(process.cwd(), taskFile); - const projectDir = found - ? path.dirname(path.dirname(found.path)) - : process.cwd(); - const progress = new ProgressTracker(projectDir, taskFile, found?.prdKey); - progress.reset(); - } else { - const found = findProgressFile(process.cwd()); - if (!found) { - ctx.ui.notify( - "No .ralpi/progress.json found. Start with /ralpi run [task-file]", - "warning", - ); - return; - } - const projectDir = path.dirname(path.dirname(found.path)); - const progress = new ProgressTracker( - projectDir, - found.state.prds - ? Object.values(found.state.prds)[0].sourcePath - : found.state.sourcePath, - ); - progress.reset(); - } + if (args[0]) { + const taskFile = resolveTaskArg(args[0], process.cwd()); + const found = findProgressFile(process.cwd(), taskFile); + const projectDir = found + ? path.dirname(path.dirname(found.path)) + : process.cwd(); + const progress = new ProgressTracker(projectDir, taskFile, found?.prdKey); + progress.reset(); + } else { + const found = findProgressFile(process.cwd()); + if (!found) { + ctx.ui.notify( + "No .ralpi/progress.json found. Start with /ralpi run [task-file]", + "warning", + ); + return; + } + const projectDir = path.dirname(path.dirname(found.path)); + const progress = new ProgressTracker( + projectDir, + found.state.prds + ? Object.values(found.state.prds)[0].sourcePath + : found.state.sourcePath, + ); + progress.reset(); + } - ctx.ui.notify("Progress reset. All task statuses cleared.", "info"); + ctx.ui.notify("Progress reset. All task statuses cleared.", "info"); } diff --git a/src/dag.ts b/src/dag.ts index ee5b356..a98a235 100644 --- a/src/dag.ts +++ b/src/dag.ts @@ -7,25 +7,25 @@ import type { Task, ExecutionBatch, ExecutionPlan, Project } from "./types"; * Returns a Set of blocked task IDs. */ export function getBlockedTasks( - pendingTasks: Task[], - failedTaskIds: Set, + pendingTasks: Task[], + failedTaskIds: Set, ): Set { - const blocked = new Set(); + const blocked = new Set(); - let changed = true; - while (changed) { - changed = false; - for (const task of pendingTasks) { - if (blocked.has(task.id)) continue; - const deps = task.dependencies || []; - if (deps.some((dep) => failedTaskIds.has(dep))) { - blocked.add(task.id); - changed = true; - } - } - } + let changed = true; + while (changed) { + changed = false; + for (const task of pendingTasks) { + if (blocked.has(task.id)) continue; + const deps = task.dependencies || []; + if (deps.some((dep) => failedTaskIds.has(dep))) { + blocked.add(task.id); + changed = true; + } + } + } - return blocked; + return blocked; } // ─── Main Entry ────────────────────────────────────────────────────────────── @@ -35,29 +35,29 @@ export function getBlockedTasks( * Returns ordered batches of parallelizable tasks. */ export function buildExecutionPlan( - project: Project, - completed: Set, - parallelGroup?: number, - failedTaskIds: Set = new Set(), + project: Project, + completed: Set, + parallelGroup?: number, + failedTaskIds: Set = new Set(), ): ExecutionPlan { - // Filter out already completed tasks - const pendingTasks = project.tasks.filter((t) => !completed.has(t.id)); + // Filter out already completed tasks + const pendingTasks = project.tasks.filter((t) => !completed.has(t.id)); - // If parallel_group is explicitly set, use group-based batching - if (parallelGroup !== undefined) { - return { - batches: buildParallelGroupBatches(pendingTasks, failedTaskIds), - totalTasks: pendingTasks.length, - skippedTasks: project.tasks.filter((t) => completed.has(t.id)), - }; - } + // If parallel_group is explicitly set, use group-based batching + if (parallelGroup !== undefined) { + return { + batches: buildParallelGroupBatches(pendingTasks, failedTaskIds), + totalTasks: pendingTasks.length, + skippedTasks: project.tasks.filter((t) => completed.has(t.id)), + }; + } - // Use dependency-based Kahn's algorithm - return { - batches: buildBatches(pendingTasks, failedTaskIds), - totalTasks: pendingTasks.length, - skippedTasks: project.tasks.filter((t) => completed.has(t.id)), - }; + // Use dependency-based Kahn's algorithm + return { + batches: buildBatches(pendingTasks, failedTaskIds), + totalTasks: pendingTasks.length, + skippedTasks: project.tasks.filter((t) => completed.has(t.id)), + }; } // ─── Sequential Plan ───────────────────────────────────────────────────────── @@ -66,77 +66,77 @@ export function buildExecutionPlan( * Build a sequential execution plan (one task per batch) */ export function buildSequentialPlan( - project: Project, - completed: Set, - failedTaskIds: Set = new Set(), + project: Project, + completed: Set, + failedTaskIds: Set = new Set(), ): ExecutionPlan { - const pendingTasks = project.tasks.filter((t) => !completed.has(t.id)); + const pendingTasks = project.tasks.filter((t) => !completed.has(t.id)); - // Mark tasks with failed dependencies as skipped - const blocked = getBlockedTasks(pendingTasks, failedTaskIds); - const skippedTasks = project.tasks.filter( - (t) => completed.has(t.id) || blocked.has(t.id), - ); - const activeTasks = pendingTasks.filter((t) => !blocked.has(t.id)); + // Mark tasks with failed dependencies as skipped + const blocked = getBlockedTasks(pendingTasks, failedTaskIds); + const skippedTasks = project.tasks.filter( + (t) => completed.has(t.id) || blocked.has(t.id), + ); + const activeTasks = pendingTasks.filter((t) => !blocked.has(t.id)); - const batches: ExecutionBatch[] = activeTasks.map((task, i) => ({ - tasks: [task], - batchIndex: i, - })); + const batches: ExecutionBatch[] = activeTasks.map((task, i) => ({ + tasks: [task], + batchIndex: i, + })); - return { - batches, - totalTasks: pendingTasks.length, - skippedTasks, - }; + return { + batches, + totalTasks: pendingTasks.length, + skippedTasks, + }; } // ─── Kahn's Algorithm (Dependency-Based Batching) ──────────────────────────── function buildBatches( - pendingTasks: Task[], - failedTaskIds: Set, + pendingTasks: Task[], + failedTaskIds: Set, ): ExecutionBatch[] { - const batches: ExecutionBatch[] = []; - const done = new Set(); - const blocked = getBlockedTasks(pendingTasks, failedTaskIds); - const pendingSet = new Set(pendingTasks.map((t) => t.id)); - const remaining = new Set( - pendingTasks.filter((t) => !blocked.has(t.id)).map((t) => t.id), - ); + const batches: ExecutionBatch[] = []; + const done = new Set(); + const blocked = getBlockedTasks(pendingTasks, failedTaskIds); + const pendingSet = new Set(pendingTasks.map((t) => t.id)); + const remaining = new Set( + pendingTasks.filter((t) => !blocked.has(t.id)).map((t) => t.id), + ); - while (remaining.size > 0) { - // Find tasks whose dependencies are all satisfied - const ready: Task[] = []; - for (const task of pendingTasks) { - if (!remaining.has(task.id)) continue; + while (remaining.size > 0) { + // Find tasks whose dependencies are all satisfied + const ready: Task[] = []; + for (const task of pendingTasks) { + if (!remaining.has(task.id)) continue; - const deps = task.dependencies || []; - const depsSatisfied = deps.every( - (dep) => done.has(dep) || !pendingSet.has(dep), - ); + const deps = task.dependencies || []; + const depsSatisfied = deps.every( + (dep) => done.has(dep) || !pendingSet.has(dep), + ); - if (depsSatisfied) { - ready.push(task); - } - } + if (depsSatisfied) { + ready.push(task); + } + } - // Cycle detection: no tasks ready but some remain - if (ready.length === 0) { - const cycleTasks = Array.from(remaining); - throw new Error( - `Dependency cycle detected among tasks: ${cycleTasks.join(", ")}`, - ); - } + // Cycle detection: no tasks ready but some remain + if (ready.length === 0) { + const cycleTasks = Array.from(remaining); + throw new Error( + `Dependency cycle detected among tasks: ${cycleTasks.join(", ")}`, + ); + } - batches.push({ tasks: ready, batchIndex: batches.length }); - for (const task of ready) { - done.add(task.id); - remaining.delete(task.id); - } - } + batches.push({ tasks: ready, batchIndex: batches.length }); + for (const task of ready) { + done.add(task.id); + remaining.delete(task.id); + } + } - return batches; + return batches; } // ─── Parallel Group Batching ───────────────────────────────────────────────── @@ -146,26 +146,26 @@ function buildBatches( * Groups execute in ascending order; tasks within a group run concurrently. */ function buildParallelGroupBatches( - pendingTasks: Task[], - failedTaskIds: Set, + pendingTasks: Task[], + failedTaskIds: Set, ): ExecutionBatch[] { - const blocked = getBlockedTasks(pendingTasks, failedTaskIds); - const activeTasks = pendingTasks.filter((t) => !blocked.has(t.id)); + const blocked = getBlockedTasks(pendingTasks, failedTaskIds); + const activeTasks = pendingTasks.filter((t) => !blocked.has(t.id)); - const groups = new Map(); + const groups = new Map(); - for (const task of activeTasks) { - const group = task.parallelGroup ?? 0; - if (!groups.has(group)) groups.set(group, []); - groups.get(group)!.push(task); - } + for (const task of activeTasks) { + const group = task.parallelGroup ?? 0; + if (!groups.has(group)) groups.set(group, []); + groups.get(group)!.push(task); + } - const sortedGroups = Array.from(groups.entries()).sort((a, b) => a[0] - b[0]); + const sortedGroups = Array.from(groups.entries()).sort((a, b) => a[0] - b[0]); - return sortedGroups.map(([_groupNum, tasks], i) => ({ - tasks, - batchIndex: i, - })); + return sortedGroups.map(([_groupNum, tasks], i) => ({ + tasks, + batchIndex: i, + })); } // ─── Cycle Detection ───────────────────────────────────────────────────────── @@ -174,51 +174,51 @@ function buildParallelGroupBatches( * Detect cycles in the task dependency graph */ export function detectCycles(project: Project): string[] { - const adj = new Map(); - for (const task of project.tasks) { - adj.set(task.id, task.dependencies || []); - } + const adj = new Map(); + for (const task of project.tasks) { + adj.set(task.id, task.dependencies || []); + } - const WHITE = 0; - const GRAY = 1; - const BLACK = 2; - const color = new Map(); + const WHITE = 0; + const GRAY = 1; + const BLACK = 2; + const color = new Map(); - for (const task of project.tasks) { - color.set(task.id, WHITE); - } + for (const task of project.tasks) { + color.set(task.id, WHITE); + } - const cycleNodes: string[] = []; + const cycleNodes: string[] = []; - function dfs(node: string): boolean { - color.set(node, GRAY); - const deps = adj.get(node) || []; + function dfs(node: string): boolean { + color.set(node, GRAY); + const deps = adj.get(node) || []; - for (const dep of deps) { - if (!adj.has(dep)) continue; - const depColor = color.get(dep); + for (const dep of deps) { + if (!adj.has(dep)) continue; + const depColor = color.get(dep); - if (depColor === GRAY) { - cycleNodes.push(dep); - return true; - } - if (depColor === WHITE && dfs(dep)) { - cycleNodes.push(node); - return true; - } - } + if (depColor === GRAY) { + cycleNodes.push(dep); + return true; + } + if (depColor === WHITE && dfs(dep)) { + cycleNodes.push(node); + return true; + } + } - color.set(node, BLACK); - return false; - } + color.set(node, BLACK); + return false; + } - for (const task of project.tasks) { - if (color.get(task.id) === WHITE) { - dfs(task.id); - } - } + for (const task of project.tasks) { + if (color.get(task.id) === WHITE) { + dfs(task.id); + } + } - return [...new Set(cycleNodes)]; + return [...new Set(cycleNodes)]; } // ─── Ready Tasks ───────────────────────────────────────────────────────────── @@ -227,14 +227,14 @@ export function detectCycles(project: Project): string[] { * Get tasks that are ready to execute (all dependencies completed) */ export function getReadyTasks( - project: Project, - completed: Set, + project: Project, + completed: Set, ): Task[] { - return project.tasks.filter((task) => { - if (completed.has(task.id)) return false; - const deps = task.dependencies || []; - return deps.every((dep) => completed.has(dep)); - }); + return project.tasks.filter((task) => { + if (completed.has(task.id)) return false; + const deps = task.dependencies || []; + return deps.every((dep) => completed.has(dep)); + }); } // ─── Critical Path ─────────────────────────────────────────────────────────── @@ -243,70 +243,159 @@ export function getReadyTasks( * Calculate the critical path (longest path through the DAG) */ export function getCriticalPath(project: Project): Task[] { - const taskMap = new Map(project.tasks.map((t) => [t.id, t])); - const dist = new Map(); - const prev = new Map(); + const taskMap = new Map(project.tasks.map((t) => [t.id, t])); + const dist = new Map(); + const prev = new Map(); - // Initialize - for (const task of project.tasks) { - dist.set(task.id, 1); - prev.set(task.id, null); - } + // Initialize + for (const task of project.tasks) { + dist.set(task.id, 1); + prev.set(task.id, null); + } - // Topological sort - const sorted: Task[] = []; - const visited = new Set(); + // Topological sort + const sorted: Task[] = []; + const visited = new Set(); - function visit(id: string) { - if (visited.has(id)) return; - visited.add(id); - const task = taskMap.get(id); - if (!task) return; + function visit(id: string) { + if (visited.has(id)) return; + visited.add(id); + const task = taskMap.get(id); + if (!task) return; - for (const dep of task.dependencies || []) { - visit(dep); - } - sorted.push(task); - } + for (const dep of task.dependencies || []) { + visit(dep); + } + sorted.push(task); + } - for (const task of project.tasks) { - visit(task.id); - } + for (const task of project.tasks) { + visit(task.id); + } - // Relax edges - for (const task of sorted) { - for (const dep of task.dependencies || []) { - const depDist = dist.get(dep); - if (depDist === undefined) continue; + // Relax edges + for (const task of sorted) { + for (const dep of task.dependencies || []) { + const depDist = dist.get(dep); + if (depDist === undefined) continue; - const newDist = depDist + 1; - const currentDist = dist.get(task.id) ?? 0; - if (newDist > currentDist) { - dist.set(task.id, newDist); - prev.set(task.id, dep); - } - } - } + const newDist = depDist + 1; + const currentDist = dist.get(task.id) ?? 0; + if (newDist > currentDist) { + dist.set(task.id, newDist); + prev.set(task.id, dep); + } + } + } - // Trace back from the longest path end - let maxTask = project.tasks[0]; - for (const task of project.tasks) { - const taskDist = dist.get(task.id) ?? 0; - const maxDist = dist.get(maxTask.id) ?? 0; - if (taskDist > maxDist) { - maxTask = task; - } - } + // Trace back from the longest path end + let maxTask = project.tasks[0]; + for (const task of project.tasks) { + const taskDist = dist.get(task.id) ?? 0; + const maxDist = dist.get(maxTask.id) ?? 0; + if (taskDist > maxDist) { + maxTask = task; + } + } - const path: Task[] = []; - let current: string | null = maxTask.id; - while (current) { - const task = taskMap.get(current); - if (task) path.unshift(task); - current = prev.get(current) || null; - } + const path: Task[] = []; + let current: string | null = maxTask.id; + while (current) { + const task = taskMap.get(current); + if (task) path.unshift(task); + current = prev.get(current) || null; + } - return path; + return path; +} + +// ─── Format Dependency Chain ───────────────────────────────────────────────── + +/** + * Format the dependency DAG as a tree for display. + * Rooted at tasks with no dependencies, showing what depends on what. + */ +export function formatDependencyChain(project: Project): string { + const taskMap = new Map(project.tasks.map((t) => [t.id, t])); + const lines: string[] = []; + + lines.push("## Dependency Chain"); + lines.push(""); + + if (project.tasks.length === 0) { + lines.push("(no tasks)"); + return lines.join("\n"); + } + + // Build reverse dependency map: taskId → [dependent taskIds] + const dependents = new Map(); + for (const task of project.tasks) { + dependents.set(task.id, []); + } + for (const task of project.tasks) { + for (const dep of task.dependencies) { + if (dependents.has(dep)) { + dependents.get(dep)!.push(task.id); + } + } + } + + // Root tasks: those with no dependencies + const roots = project.tasks.filter((t) => t.dependencies.length === 0); + const rendered = new Set(); + + function renderNode(taskId: string, prefix: string, isLast: boolean): void { + const task = taskMap.get(taskId); + if (!task) return; + + const alreadyRendered = rendered.has(taskId); + rendered.add(taskId); + + const connector = prefix ? (isLast ? "└── " : "├── ") : ""; + + if (alreadyRendered) { + lines.push(`${prefix}${connector}${task.id} · ${task.title}`); + return; + } + + const deps = + task.dependencies.length > 0 + ? ` ← needs ${task.dependencies.join(", ")}` + : " (root)"; + + lines.push( + `${prefix}${connector}${task.id} · ${task.title}${prefix ? "" : deps}`, + ); + + const children = (dependents.get(taskId) || []) + .filter((c) => c !== taskId) + .sort(); + + for (let i = 0; i < children.length; i++) { + const childPrefix = prefix + (isLast ? " " : "│ "); + renderNode(children[i], childPrefix, i === children.length - 1); + } + } + + for (let i = 0; i < roots.length; i++) { + renderNode(roots[i].id, "", i === roots.length - 1); + } + + // Tasks not reached from any root (have deps but no root-traversable path) + const unreached = project.tasks.filter((t) => !rendered.has(t.id)); + if (unreached.length > 0) { + lines.push(""); + lines.push("Orphan tasks (dependencies not in task list):"); + for (const t of unreached) { + const deps = + t.dependencies.length > 0 + ? ` ← needs ${t.dependencies.join(", ")}` + : ""; + lines.push(` ${t.id} · ${t.title}${deps}`); + } + } + + return lines.join("\n"); } // ─── Format Execution Plan ─────────────────────────────────────────────────── @@ -315,26 +404,26 @@ export function getCriticalPath(project: Project): Task[] { * Format the execution plan for display */ export function formatExecutionPlan(plan: ExecutionPlan): string { - const lines: string[] = []; - lines.push("## Execution Plan"); - lines.push(""); - lines.push(`Total tasks: ${plan.totalTasks}`); - lines.push(`Batches: ${plan.batches.length}`); + const lines: string[] = []; + lines.push("## Execution Plan"); + lines.push(""); + lines.push(`Total tasks: ${plan.totalTasks}`); + lines.push(`Batches: ${plan.batches.length}`); - if (plan.skippedTasks.length > 0) { - lines.push( - `Already completed: ${plan.skippedTasks.map((t) => t.id).join(", ")}`, - ); - } - lines.push(""); + if (plan.skippedTasks.length > 0) { + lines.push( + `Already completed: ${plan.skippedTasks.map((t) => t.id).join(", ")}`, + ); + } + lines.push(""); - for (const batch of plan.batches) { - lines.push(`### Batch ${batch.batchIndex + 1}`); - for (const task of batch.tasks) { - lines.push(`- ${task.id}: ${task.title}`); - } - lines.push(""); - } + for (const batch of plan.batches) { + lines.push(`### Batch ${batch.batchIndex + 1}`); + for (const task of batch.tasks) { + lines.push(`- ${task.id}: ${task.title}`); + } + lines.push(""); + } - return lines.join("\n"); + return lines.join("\n"); } diff --git a/src/executor.ts b/src/executor.ts index 3e3837e..f780db4 100644 --- a/src/executor.ts +++ b/src/executor.ts @@ -13,6 +13,7 @@ import { captureGitCommits, formatDuration, } from "./utils"; +import { updateTaskInFile } from "./parser"; /** Optional callback to post a progress message into the chat history. */ export type SendChatMessage = ( @@ -33,7 +34,18 @@ export interface ToolCallEntry { * messages rendered by registerMessageRenderer). */ const MAX_COLLAPSED = 3; -const SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]; +export const SPINNER_FRAMES = [ + "⠋", + "⠙", + "⠹", + "⠸", + "⠼", + "⠴", + "⠦", + "⠧", + "⠇", + "⠏", +]; // ─── Model Round-Robin ───────────────────────────────────────────────────── @@ -135,6 +147,7 @@ export async function runTask( projectDir: string = project.sourceDir, parallelState?: ParallelWidgetState, assignedModel?: unknown, + batchRender?: () => void, ): Promise<{ success: boolean; reflection?: Reflection; @@ -271,8 +284,10 @@ export async function runTask( if (entry) { entry.toolCalls.push({ name: event.toolName, label }); } + batchRender?.(); + } else { + requestRender(); } - requestRender(); } }, undefined, // no abort signal @@ -291,6 +306,7 @@ export async function runTask( entry.done = true; entry.success = output.success; } + batchRender?.(); } else { ctx.ui.setWidget(widgetKey, undefined); } @@ -393,9 +409,12 @@ export async function executeBatch( } } - // Check if we should run parallel + // Check if we should run parallel. + // Use the parallel path whenever the user selected parallel mode, + // even for single-task batches produced by DAG dependency chains. + // Only sequential mode should inherit the parent session model. const shouldParallel = - options?.parallel && tasks.length > 1 && config.execution.maxParallel > 0; + options?.parallel && tasks.length > 0 && config.execution.maxParallel > 0; if (shouldParallel) { await executeBatchParallel( @@ -429,6 +448,12 @@ export async function executeBatch( const errorMsg = error instanceof Error ? error.message : String(error); progress.markFailed(task.id, errorMsg); + // Auto-update the PRD source file checkbox + try { + updateTaskInFile(project.sourcePath, task.id, "failed"); + } catch { + // Best-effort + } sendChatMessage?.(`✗ ${task.id} · ${task.title} — ${errorMsg}`); ctx.ui.notify(`Task ${task.id} failed: ${errorMsg}`, "error"); break; @@ -518,14 +543,18 @@ async function executeBatchParallel( }; }); - // Single spinner timer drives all tasks in the batch + // Batch-render trigger: re-render on spinner ticks AND content changes. + // Spinner animation requires requestRender() on every tick; without it, + // spinner frames advance in memory but the display never updates. + const requestBatchRender = () => widgetTui?.requestRender(); + const spinnerTimer = setInterval(() => { for (const entry of sharedState.values()) { if (!entry.done) { entry.frameIndex = (entry.frameIndex + 1) % SPINNER_FRAMES.length; } } - widgetTui?.requestRender(); + requestBatchRender(); }, 100); const results: Array<{ task: Task; result: Promise }> = []; @@ -545,13 +574,21 @@ async function executeBatchParallel( sharedState, assignedModel, roundRobin, + requestBatchRender, ).catch((error) => { // Safety net: one task failure should never crash the batch. // executeTask already marks failed and notifies, but catch as // a last resort so the error doesn't propagate and crash pi. roundRobin?.release(task.id); + requestBatchRender(); const errorMsg = error instanceof Error ? error.message : String(error); progress.markFailed(task.id, errorMsg); + // Auto-update the PRD source file checkbox + try { + updateTaskInFile(project.sourcePath, task.id, "failed"); + } catch { + // Best-effort + } sendChatMessage?.(`✗ ${task.id} · ${task.title} — ${errorMsg}`); ctx.ui.notify(`Task ${task.id} failed: ${errorMsg}`, "error"); }), @@ -586,6 +623,7 @@ async function executeTask( parallelState?: ParallelWidgetState, assignedModel?: unknown, roundRobin?: ModelRoundRobin | null, + batchRender?: () => void, ): Promise { const maxRetries = config.execution.maxRetries; @@ -609,6 +647,12 @@ async function executeTask( try { // Mark as in progress progress.markInProgress(task.id); + // Auto-update the PRD source file checkbox + try { + updateTaskInFile(project.sourcePath, task.id, "in_progress"); + } catch { + // Best-effort: don't fail the task over a checkbox update + } // Get dependency reflections const depReflections = progress.getDependencyReflections( @@ -626,6 +670,7 @@ async function executeTask( projectDir, parallelState, currentModel, + batchRender, ); if (result.success) { @@ -645,6 +690,12 @@ async function executeTask( result.commitMessages, result.commitSummary, ); + // Auto-update the PRD source file checkbox + try { + updateTaskInFile(project.sourcePath, task.id, "completed"); + } catch { + // Best-effort: don't fail the task over a checkbox update + } roundRobin?.release(task.id); return; } @@ -675,6 +726,7 @@ async function executeTask( } else { // Max retries exceeded progress.markFailed(task.id, result.error || "Unknown error"); + // Don't update PRD — retry exhaustion is transient, not terminal sendChatMessage?.(`✗ ${task.id} · ${task.title} — ${result.error}`); ctx.ui.notify( `Task ${task.id} failed after ${maxRetries} retries: ${ @@ -686,8 +738,15 @@ async function executeTask( } } catch (error) { roundRobin?.release(task.id); + batchRender?.(); const errorMsg = error instanceof Error ? error.message : String(error); progress.markFailed(task.id, errorMsg); + // Auto-update the PRD source file checkbox + try { + updateTaskInFile(project.sourcePath, task.id, "failed"); + } catch { + // Best-effort + } sendChatMessage?.(`✗ ${task.id} · ${task.title} — ${errorMsg}`); ctx.ui.notify(`Task ${task.id} failed: ${errorMsg}`, "error"); return; @@ -700,7 +759,9 @@ async function executeTask( // All models exhausted — release the slot roundRobin?.release(task.id); + batchRender?.(); progress.markFailed(task.id, "All configured models exhausted"); + // Don't update PRD — model exhaustion is transient, not terminal sendChatMessage?.( `✗ ${task.id} · ${task.title} — all ${maxModelAttempts} models exhausted`, ); diff --git a/src/parser.ts b/src/parser.ts index f00bab9..fa86275 100644 --- a/src/parser.ts +++ b/src/parser.ts @@ -2,6 +2,20 @@ import * as fs from "node:fs"; import * as path from "node:path"; import type { Task, Project } from "./types"; +// Lazy-loaded yaml package +let YAML_module: typeof import("yaml") | undefined; +function loadYaml(): typeof import("yaml") { + if (YAML_module) return YAML_module; + try { + YAML_module = require("yaml"); + } catch { + throw new Error( + "YAML parsing requires the 'yaml' package. Run: npm install yaml", + ); + } + return YAML_module!; +} + // ─── Main Entry ────────────────────────────────────────────────────────────── /** @@ -75,7 +89,7 @@ function parseFioFormat( const [, status, id, title, file] = match; const timeoutMs = parseTimeoutFromLine(line); tasks.push({ - id: `0${id}`, + id: id.padStart(2, "0"), title: title.trim(), description: undefined, file: file || undefined, @@ -96,12 +110,12 @@ function parseFioFormat( ); if (arrowMatch) { const [, from, targets] = arrowMatch; - const fromId = `0${from}`; + const fromId = from.padStart(2, "0"); const targetIds = targets .split(",") .map((t) => t.trim()) .filter((t) => t) - .map((t) => `0${t}`); + .map((t) => t.padStart(2, "0")); // Each target depends on the source for (const toId of targetIds) { @@ -117,12 +131,12 @@ function parseFioFormat( ); if (dependsMatch) { const [, taskId, depsList] = dependsMatch; - const taskIdPadded = `0${taskId}`; + const taskIdPadded = taskId.padStart(2, "0"); const depIds = depsList .split(",") .map((t) => t.trim()) .filter((t) => t) - .map((t) => `0${t}`); + .map((t) => t.padStart(2, "0")); if (!dependencies[taskIdPadded]) dependencies[taskIdPadded] = []; dependencies[taskIdPadded].push(...depIds); @@ -134,7 +148,7 @@ function parseFioFormat( ); if (metaMatch) { const [, taskId, value, unit] = metaMatch; - const task = tasks.find((t) => t.id === `0${taskId}`); + const task = tasks.find((t) => t.id === taskId.padStart(2, "0")); if (task) { task.timeoutMs = parseTimeoutValue(Number(value), unit); } @@ -210,16 +224,7 @@ function parseYaml( sourcePath: string, sourceDir: string, ): Project { - // Lazy-load yaml (may not be installed) - let YAML: typeof import("yaml"); - try { - YAML = require("yaml"); - } catch { - throw new Error( - "YAML parsing requires the 'yaml' package. Run: npm install yaml", - ); - } - + const YAML = loadYaml(); const doc = YAML.parse(content); const tasks: Task[] = []; @@ -263,35 +268,108 @@ export function readTaskSpec(taskDir: string, taskFile: string): string { // ─── Task File Updater ─────────────────────────────────────────────────────── /** - * Update task status in the source markdown file + * Update task status in the source file (markdown or YAML). + * + * Handles three formats: + * 1. Fio numbered format: `- [ ] 01 – Title` — matches by task number in the file + * 2. Simple checkbox: `- [ ] Title` — matches by checkbox position (index) + * 3. YAML: uses `yaml` library to parse, update, and stringify */ export function updateTaskInFile( filePath: string, taskId: string, status: Task["status"], ): void { - let content = fs.readFileSync(filePath, "utf-8"); - const char = statusToChar(status); + const ext = path.extname(filePath).toLowerCase(); - // Try Fio numbered format first - const fioPattern = new RegExp( - `(^-\\s+\\[)(.)(\\]\\s+${escapeRegex(taskId)}\\s*[—–-])`, - "m", - ); - if (fioPattern.test(content)) { - content = content.replace(fioPattern, `$1${char}$3`); - fs.writeFileSync(filePath, content, "utf-8"); + // Handle YAML format + if (ext === ".yaml" || ext === ".yml") { + updateTaskInYaml(filePath, taskId, status); return; } - // Try simple checkbox format - const simplePattern = new RegExp( - `(-\\s+\\[)(.)(\\]\\s+${escapeRegex(taskId)})`, - "m", - ); - if (simplePattern.test(content)) { - content = content.replace(simplePattern, `$1${char}$3`); - fs.writeFileSync(filePath, content, "utf-8"); + let content = fs.readFileSync(filePath, "utf-8"); + const char = statusToChar(status); + + // Strategy 1: Fio numbered format — match by explicit task ID in the file + // Try both padded (01) and raw (1) variations + const rawId = parseInt(taskId, 10).toString(); + const idPatterns = new Set([escapeRegex(taskId), escapeRegex(rawId)]); + + for (const idPattern of idPatterns) { + const fioRegex = new RegExp( + `(^-\\s+\\[)(.)(\\]\\s+${idPattern}\\s*[—–:-])`, + "m", + ); + const match = content.match(fioRegex); + if (match) { + content = content.replace(fioRegex, `$1${char}$3`); + fs.writeFileSync(filePath, content, "utf-8"); + return; + } + } + + // Strategy 2: Simple checkbox by position (task IDs are zero-padded indices) + const targetIndex = parseInt(taskId, 10); + if (!isNaN(targetIndex)) { + const lines = content.split("\n"); + let checkboxIdx = 0; + for (let i = 0; i < lines.length; i++) { + const m = lines[i].match(/^(\s*-+\s+\[)(.)(\].*)$/); + if (m) { + if (checkboxIdx === targetIndex) { + lines[i] = m[1] + char + m[3]; + fs.writeFileSync(filePath, lines.join("\n"), "utf-8"); + return; + } + checkboxIdx++; + } + } + } +} + +/** + * Update task status in a YAML task file using the yaml library's + * Document API, which preserves comments and formatting. + * + * Matches by explicit `id` field first, then falls back to + * position-based matching (for files without explicit IDs). + */ +function updateTaskInYaml( + filePath: string, + taskId: string, + status: Task["status"], +): void { + const YAML = loadYaml(); + const content = fs.readFileSync(filePath, "utf-8"); + const doc = YAML.parseDocument(content); + const tasks = doc.get("tasks"); + if (!tasks || !YAML.isSeq(tasks)) return; + + const rawId = parseInt(taskId, 10).toString(); + + // Strategy 1: Match by explicit id field + for (const item of tasks.items) { + if (!YAML.isMap(item)) continue; + const idVal = item.get("id"); + if (idVal === undefined || idVal === null) continue; + const idStr = String(idVal); + if (idStr === taskId || idStr === rawId) { + item.set("status", status); + fs.writeFileSync(filePath, String(doc), "utf-8"); + return; + } + } + + // Strategy 2: Fall back to position-based matching + // (for YAML files without explicit id fields) + const targetIndex = parseInt(taskId, 10); + if (!isNaN(targetIndex) && targetIndex < tasks.items.length) { + const item = tasks.items[targetIndex]; + if (YAML.isMap(item)) { + item.set("status", status); + fs.writeFileSync(filePath, String(doc), "utf-8"); + } } } diff --git a/src/utils.ts b/src/utils.ts index 1acb51d..b51af59 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -34,6 +34,78 @@ export function writeFileSafe(filePath: string, content: string): void { fs.writeFileSync(filePath, content, "utf-8"); } +// ─── Loop-Active State ────────────────────────────────────────────────────── + +/** + * State persisted to disk when a ralpi execution loop is active. + * Used to re-instantiate widgets after a session reload. + */ +export interface LoopActiveState { + taskFile: string; + mode: "parallel" | "sequential"; + startedAt: string; + taskIds: string[]; + prdKey: string; +} + +/** + * Path (relative to projectDir) where the loop-active marker is stored. + */ +const LOOP_ACTIVE_FILE = ".ralpi/loop-active.json"; + +/** + * Write the loop-active marker, indicating an execution loop is running. + */ +export function writeLoopActive( + projectDir: string, + state: LoopActiveState, +): void { + writeFileSafe( + path.join(projectDir, LOOP_ACTIVE_FILE), + JSON.stringify(state, null, 2), + ); +} + +/** + * Read the loop-active marker, if present. + */ +export function readLoopActive(projectDir: string): LoopActiveState | null { + const filePath = path.join(projectDir, LOOP_ACTIVE_FILE); + try { + const raw = fs.readFileSync(filePath, "utf-8"); + return JSON.parse(raw) as LoopActiveState; + } catch { + return null; + } +} + +/** + * Delete the loop-active marker. + */ +export function deleteLoopActive(projectDir: string): void { + const filePath = path.join(projectDir, LOOP_ACTIVE_FILE); + try { + fs.unlinkSync(filePath); + } catch { + // Ignore if already gone + } +} + +/** + * Discover the project directory by walking up to find `.ralpi/`. + */ +export function findRalpiDir(startDir: string): string | null { + let current = path.resolve(startDir); + const root = path.parse(current).root; + while (current !== root) { + if (fs.existsSync(path.join(current, ".ralpi"))) { + return current; + } + current = path.dirname(current); + } + return null; +} + // ─── Async Agent Session ──────────────────────────────────────────────────── // ─── Progress Discovery ─────────────────────────────────────────────────────