drop tasks, drop thrashing bandaid
This commit is contained in:
@@ -5,6 +5,7 @@ import type { ProgressTracker } from "./progress";
|
||||
import type { ExtensionContext } from "@earendil-works/pi-coding-agent";
|
||||
import { buildTaskPrompt } from "./prompts";
|
||||
import { extractReflection } from "./reflection";
|
||||
import { WidgetBatcher } from "./widget-batcher";
|
||||
import {
|
||||
runAgentSession,
|
||||
writeFileSafe,
|
||||
@@ -39,6 +40,7 @@ export async function runTask(
|
||||
ctx: ExtensionContext,
|
||||
sendChatMessage?: SendChatMessage,
|
||||
projectDir: string = project.sourceDir,
|
||||
batcher?: WidgetBatcher,
|
||||
): Promise<{
|
||||
success: boolean;
|
||||
reflection?: Reflection;
|
||||
@@ -104,7 +106,11 @@ export async function runTask(
|
||||
}
|
||||
}
|
||||
|
||||
ctx.ui.setWidget(widgetKey, lines);
|
||||
if (batcher) {
|
||||
batcher.schedule(widgetKey, lines);
|
||||
} else {
|
||||
ctx.ui.setWidget(widgetKey, lines);
|
||||
}
|
||||
};
|
||||
|
||||
// Smooth spinner animation at 100ms intervals
|
||||
@@ -119,6 +125,11 @@ export async function runTask(
|
||||
// Use task-level timeout if set, otherwise fall back to config
|
||||
const timeoutMs = task.timeoutMs ?? config.execution.timeoutMs;
|
||||
|
||||
// Pre-create session file path so events stream to disk (avoids 300+ MB in-memory accumulation)
|
||||
const sessionsDir = path.join(ralphDir, "sessions");
|
||||
ensureDir(sessionsDir);
|
||||
const sessionFilePath = path.join(sessionsDir, `${task.id}-${startMs}.txt`);
|
||||
|
||||
// Run task asynchronously via Pi SDK — event loop stays responsive
|
||||
const output = await runAgentSession(
|
||||
prompt,
|
||||
@@ -134,13 +145,19 @@ export async function runTask(
|
||||
updateWidget();
|
||||
}
|
||||
},
|
||||
undefined, // no abort signal
|
||||
sessionFilePath, // stream events to file
|
||||
);
|
||||
|
||||
const durationMs = Date.now() - startMs;
|
||||
|
||||
// Clear progress widget and status after task finishes
|
||||
clearInterval(spinnerTimer);
|
||||
ctx.ui.setWidget(widgetKey, undefined);
|
||||
if (batcher) {
|
||||
batcher.scheduleRemove(widgetKey);
|
||||
} else {
|
||||
ctx.ui.setWidget(widgetKey, undefined);
|
||||
}
|
||||
ctx.ui.setStatus("ralph", undefined);
|
||||
|
||||
if (!output.success) {
|
||||
@@ -150,6 +167,7 @@ export async function runTask(
|
||||
success: false,
|
||||
error: output.error,
|
||||
durationMs,
|
||||
sessionFile: sessionFilePath, // events streamed to file for debugging
|
||||
};
|
||||
}
|
||||
|
||||
@@ -159,12 +177,8 @@ export async function runTask(
|
||||
// Capture git commits made during this task
|
||||
const { commitMessages, commitSummary } = captureGitCommits(projectDir);
|
||||
|
||||
// Save full session transcript to .ralph/sessions/
|
||||
const sessionFile = saveSessionOutput(
|
||||
projectDir,
|
||||
task.id,
|
||||
JSON.stringify(output.events, null, 2),
|
||||
);
|
||||
// Session file already written by runAgentSession (events streamed to disk)
|
||||
const sessionFile = sessionFilePath;
|
||||
|
||||
// Build output preview (first 500 chars of agent text)
|
||||
const outputPreview =
|
||||
@@ -191,21 +205,6 @@ export async function runTask(
|
||||
};
|
||||
}
|
||||
|
||||
// ─── Save Session Output ────────────────────────────────────────────────────
|
||||
|
||||
function saveSessionOutput(
|
||||
sourceDir: string,
|
||||
taskId: string,
|
||||
output: string,
|
||||
): string {
|
||||
const sessionsDir = path.join(sourceDir, ".ralph", "sessions");
|
||||
ensureDir(sessionsDir);
|
||||
const fileName = `${taskId}-${Date.now()}.txt`;
|
||||
const filePath = path.join(sessionsDir, fileName);
|
||||
writeFileSafe(filePath, output);
|
||||
return filePath;
|
||||
}
|
||||
|
||||
// ─── Execute Batch ───────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
@@ -272,6 +271,7 @@ async function executeBatchParallel(
|
||||
projectDir?: string,
|
||||
): Promise<void> {
|
||||
const maxParallel = config.execution.maxParallel;
|
||||
const batcher = new WidgetBatcher(ctx);
|
||||
const results: Array<{ task: Task; result: Promise<any> }> = [];
|
||||
|
||||
for (const task of tasks) {
|
||||
@@ -285,6 +285,7 @@ async function executeBatchParallel(
|
||||
ctx,
|
||||
sendChatMessage,
|
||||
projectDir,
|
||||
batcher,
|
||||
),
|
||||
});
|
||||
|
||||
@@ -299,6 +300,9 @@ async function executeBatchParallel(
|
||||
for (const { result } of results) {
|
||||
await result;
|
||||
}
|
||||
|
||||
// Flush and stop the batcher after all tasks complete
|
||||
batcher.stop();
|
||||
}
|
||||
|
||||
// ─── Execute Single Task with Retry ──────────────────────────────────────────
|
||||
@@ -311,6 +315,7 @@ async function executeTask(
|
||||
ctx: ExtensionContext,
|
||||
sendChatMessage?: SendChatMessage,
|
||||
projectDir: string = project.sourceDir,
|
||||
batcher?: WidgetBatcher,
|
||||
): Promise<void> {
|
||||
const maxRetries = config.execution.maxRetries;
|
||||
let retries = 0;
|
||||
@@ -334,6 +339,7 @@ async function executeTask(
|
||||
ctx,
|
||||
sendChatMessage,
|
||||
projectDir,
|
||||
batcher,
|
||||
);
|
||||
|
||||
if (result.success) {
|
||||
|
||||
27
src/utils.ts
27
src/utils.ts
@@ -338,6 +338,7 @@ export async function runAgentSession(
|
||||
timeoutMs: number,
|
||||
onEvent?: (event: AgentSessionEvent) => void,
|
||||
signal?: AbortSignal,
|
||||
sessionFile?: string,
|
||||
): Promise<{
|
||||
success: boolean;
|
||||
text: string;
|
||||
@@ -353,7 +354,12 @@ export async function runAgentSession(
|
||||
bash: 0,
|
||||
other: 0,
|
||||
};
|
||||
const recordedEvents: AgentSessionEvent[] = [];
|
||||
// Stream events to file instead of accumulating in memory.
|
||||
// Accumulating caused "Invalid string length" crashes when
|
||||
// JSON.stringify(output.events, null, 2) produced 300+ MB strings.
|
||||
const eventStream = sessionFile
|
||||
? fs.createWriteStream(sessionFile, { flags: "a" })
|
||||
: null;
|
||||
|
||||
// Wire timeout via abort signal
|
||||
const timeoutHandle = setTimeout(() => {
|
||||
@@ -393,7 +399,10 @@ export async function runAgentSession(
|
||||
let stopReason: string | undefined;
|
||||
|
||||
const unsubscribe = result.session.subscribe((event) => {
|
||||
recordedEvents.push(event);
|
||||
// Stream event to file (avoids accumulating 300+ MB in memory)
|
||||
if (eventStream) {
|
||||
eventStream.write(JSON.stringify(event) + "\n");
|
||||
}
|
||||
onEvent?.(event);
|
||||
|
||||
if (event.type === "message_end") {
|
||||
@@ -430,6 +439,11 @@ export async function runAgentSession(
|
||||
signal?.removeEventListener("abort", abortHandler);
|
||||
clearTimeout(timeoutHandle);
|
||||
|
||||
// Flush and close the event stream before returning
|
||||
if (eventStream) {
|
||||
await new Promise<void>((resolve) => eventStream.end(resolve));
|
||||
}
|
||||
|
||||
if (errorMessage && !finalText) {
|
||||
return {
|
||||
success: false,
|
||||
@@ -437,7 +451,7 @@ export async function runAgentSession(
|
||||
error: errorMessage,
|
||||
toolUsage,
|
||||
stopReason,
|
||||
events: recordedEvents,
|
||||
events: [], // streamed to file
|
||||
};
|
||||
}
|
||||
|
||||
@@ -446,16 +460,19 @@ export async function runAgentSession(
|
||||
text: finalText.trim(),
|
||||
toolUsage,
|
||||
stopReason,
|
||||
events: recordedEvents,
|
||||
events: [], // streamed to file
|
||||
};
|
||||
} catch (error) {
|
||||
clearTimeout(timeoutHandle);
|
||||
if (eventStream && !eventStream.destroyed) {
|
||||
eventStream.end();
|
||||
}
|
||||
return {
|
||||
success: false,
|
||||
text: "",
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
toolUsage,
|
||||
events: recordedEvents,
|
||||
events: [], // streamed to file
|
||||
};
|
||||
} finally {
|
||||
sessionRef.session?.dispose();
|
||||
|
||||
92
src/widget-batcher.ts
Normal file
92
src/widget-batcher.ts
Normal file
@@ -0,0 +1,92 @@
|
||||
import type { ExtensionContext } from "@earendil-works/pi-coding-agent";
|
||||
|
||||
/**
|
||||
* Batches widget updates from multiple parallel tasks into a single
|
||||
* render cycle, preventing TUI thrashing when agents update independently.
|
||||
*
|
||||
* Uses microtask debouncing: updates within the same event-loop tick
|
||||
* are coalesced into one flush. No artificial interval — updates hit the
|
||||
* screen as soon as the current tick yields, but never duplicatively.
|
||||
*/
|
||||
export class WidgetBatcher {
|
||||
/** Pending widget updates keyed by widget key. */
|
||||
private pending: Map<string, string[]> = new Map();
|
||||
|
||||
/** Widget keys scheduled for removal. */
|
||||
private pendingRemovals: Set<string> = new Set();
|
||||
|
||||
/** Whether a microtask flush is already queued. */
|
||||
private scheduled = false;
|
||||
|
||||
/** Whether a flush is currently executing (prevents re-entry). */
|
||||
private flushing = false;
|
||||
|
||||
constructor(private ctx: ExtensionContext) {}
|
||||
|
||||
/**
|
||||
* Schedule a widget update. Flushed asynchronously at end of the
|
||||
* current event-loop tick; multiple calls in the same tick coalesce.
|
||||
*/
|
||||
schedule(key: string, lines: string[]): void {
|
||||
this.pending.set(key, lines);
|
||||
this.scheduleFlush();
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a widget (e.g., when a task completes).
|
||||
* Flushed asynchronously at end of the current tick.
|
||||
*/
|
||||
scheduleRemove(key: string): void {
|
||||
this.pending.delete(key);
|
||||
this.pendingRemovals.add(key);
|
||||
this.scheduleFlush();
|
||||
}
|
||||
|
||||
/** Synchronously flush all pending updates. */
|
||||
flush(): void {
|
||||
this.doFlush();
|
||||
}
|
||||
|
||||
/** Flush remaining updates then stop scheduling. */
|
||||
stop(): void {
|
||||
this.doFlush();
|
||||
}
|
||||
|
||||
// ── Internal ────────────────────────────────────────────────────────
|
||||
|
||||
private scheduleFlush(): void {
|
||||
if (this.scheduled) return;
|
||||
this.scheduled = true;
|
||||
queueMicrotask(() => {
|
||||
this.scheduled = false;
|
||||
this.doFlush();
|
||||
});
|
||||
}
|
||||
|
||||
private doFlush(): void {
|
||||
if (this.flushing) return;
|
||||
this.flushing = true;
|
||||
|
||||
// Atomically swap — new schedule()/scheduleRemove() calls land on fresh
|
||||
// collections, so the batch we iterate stays immutable and nothing is lost.
|
||||
const toRender = this.pending;
|
||||
const toRemove = this.pendingRemovals;
|
||||
this.pending = new Map();
|
||||
this.pendingRemovals = new Set();
|
||||
|
||||
// Apply removals first
|
||||
for (const key of toRemove) {
|
||||
this.ctx.ui.setWidget(key, undefined);
|
||||
}
|
||||
|
||||
// Sort by key for deterministic, stable ordering across every flush.
|
||||
// Task IDs are zero-padded ("008", "012", "013") so alpha sort = numeric order.
|
||||
const sortedKeys = Array.from(toRender.keys()).sort();
|
||||
for (const key of sortedKeys) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||
this.ctx.ui.setWidget(key, toRender.get(key)!);
|
||||
}
|
||||
|
||||
this.flushing = false;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user