From 925e37938b1410bd4abb162f31614c1844ee77ff Mon Sep 17 00:00:00 2001 From: Michael Freno Date: Sun, 31 May 2026 01:57:52 -0400 Subject: [PATCH] round robin --- README.md | 8 +++++ index.ts | 10 ++++++ src/executor.ts | 93 +++++++++++++++++++++++++++++++++++++++++++++++-- src/types.ts | 3 ++ 4 files changed, 112 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f4213f4..3eb8992 100644 --- a/README.md +++ b/README.md @@ -108,12 +108,20 @@ Create config files. Both are optional: ```yaml execution: maxParallel: 3 # ralpi-level concurrency only + models: # round-robin in provider/model format + - google/gemini-3.5-flash # 1st and 3rd task in parallel + - openai/gpt-5.5 # 2nd task in parallel prompts: projectContext: "Additional context for all tasks" ``` > ralpi deliberately does **not** set timeouts or retries — those are inherited > from Pi's own settings. Tasks run until they complete or Pi's own flow stops them. +> +> `execution.models` uses slot-aware round-robin: with 3 models and 2 concurrent +> tasks, only the first two models are used. The third model is only touched when +> a third concurrent task starts. Freed model slots are reused before new ones +> are allocated. The keys mirror the nested structure of `RalpiConfig` in `src/types.ts`. diff --git a/index.ts b/index.ts index 809eb53..354f1a9 100644 --- a/index.ts +++ b/index.ts @@ -220,6 +220,16 @@ export default function ralpiLoopExtension(pi: ExtensionAPI): void { }, ); + // Register the extension's prompts/ directory so Pi discovers @task-manager + pi.on("resources_discover", async (_event, _ctx) => { + const promptsDir = fs.existsSync(path.resolve(__dirname, "prompts")) + ? 
path.resolve(__dirname, "prompts") + : path.resolve(__dirname, "..", "prompts"); + return { + promptPaths: [promptsDir], + }; + }); + pi.registerCommand("ralpi", { description: "Execute tasks from a task file using DAG-based dependency resolution", diff --git a/src/executor.ts b/src/executor.ts index ff86dee..4149687 100644 --- a/src/executor.ts +++ b/src/executor.ts @@ -34,6 +34,53 @@ const MAX_COLLAPSED = 3; const SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]; +// ─── Model Round-Robin ───────────────────────────────────────────────────── + +/** + * Round-robin model assignment with slot reuse. + * + * With models [A, B, C] and 2 concurrent tasks, only A and B are used. + * Model C is only touched when a third concurrent task starts. + * Freed slots are reused before new slots are allocated. + */ +class ModelRoundRobin { + private models: unknown[]; + private freeSlots: number[]; + private nextIndex = 0; + private assignments = new Map(); + + constructor(models: unknown[]) { + this.models = models; + this.freeSlots = []; + } + + assign(taskId: string): unknown { + let index: number; + if (this.freeSlots.length > 0) { + // Reuse a freed model slot first + index = this.freeSlots.shift()!; + } else if (this.nextIndex < this.models.length) { + // Allocate a new slot + index = this.nextIndex++; + } else { + // All models in use — wrap around + index = this.nextIndex % this.models.length; + this.nextIndex++; + } + this.assignments.set(taskId, index); + return this.models[index]; + } + + release(taskId: string): void { + const index = this.assignments.get(taskId); + if (index !== undefined) { + this.freeSlots.push(index); + this.freeSlots.sort((a, b) => a - b); + this.assignments.delete(taskId); + } + } +} + /** Shared state for parallel-batch widget. Each running task writes its * tool calls and spinner frame; the batch widget reads them in task-ID order. 
*/ interface ParallelWidgetEntry { @@ -61,6 +108,7 @@ export async function runTask( sendChatMessage?: SendChatMessage, projectDir: string = project.sourceDir, parallelState?: ParallelWidgetState, + assignedModel?: unknown, ): Promise<{ success: boolean; reflection?: Reflection; @@ -190,7 +238,7 @@ export async function runTask( }, undefined, // no abort signal sessionFilePath, // stream events to file - config.model, + assignedModel ?? config.model, config.thinkingLevel, ); @@ -275,6 +323,37 @@ export async function executeBatch( ); } + // Set up model round-robin if configured. + // Config entries are "/" strings — resolve via modelRegistry. + let roundRobin: ModelRoundRobin | null = null; + if (config.execution.models.length > 0) { + const resolvedModels: unknown[] = []; + for (const entry of config.execution.models) { + const slashIdx = entry.indexOf("/"); + if (slashIdx === -1) { + ctx.ui.notify( + `ralpi config: skipping model "${entry}" — expected / format`, + "warning", + ); + continue; + } + const provider = entry.slice(0, slashIdx); + const modelId = entry.slice(slashIdx + 1); + const resolved = ctx.modelRegistry?.find(provider, modelId); + if (resolved) { + resolvedModels.push(resolved); + } else { + ctx.ui.notify( + `ralpi config: model "${entry}" not found in registry — skipping`, + "warning", + ); + } + } + if (resolvedModels.length > 0) { + roundRobin = new ModelRoundRobin(resolvedModels); + } + } + // Check if we should run parallel const shouldParallel = options?.parallel && tasks.length > 1 && config.execution.maxParallel > 0; @@ -288,12 +367,14 @@ export async function executeBatch( ctx, sendChatMessage, projectDir, + roundRobin, ); return; } // Execute sequentially for (const task of tasks) { + const model = roundRobin?.assign(task.id); await executeTask( task, project, @@ -302,7 +383,10 @@ export async function executeBatch( ctx, sendChatMessage, projectDir, + undefined, + model, ); + roundRobin?.release(task.id); } } @@ -317,6 +401,7 @@ async 
function executeBatchParallel( ctx: ExtensionContext, sendChatMessage?: SendChatMessage, projectDir?: string, + roundRobin?: ModelRoundRobin | null, ): Promise { const maxParallel = config.execution.maxParallel; const sharedState: ParallelWidgetState = new Map(); @@ -385,6 +470,7 @@ async function executeBatchParallel( const results: Array<{ task: Task; result: Promise }> = []; for (const task of tasks) { + const assignedModel = roundRobin?.assign(task.id); results.push({ task, result: executeTask( @@ -396,7 +482,8 @@ async function executeBatchParallel( sendChatMessage, projectDir, sharedState, - ), + assignedModel, + ).finally(() => roundRobin?.release(task.id)), }); // Limit concurrency @@ -426,6 +513,7 @@ async function executeTask( sendChatMessage?: SendChatMessage, projectDir: string = project.sourceDir, parallelState?: ParallelWidgetState, + assignedModel?: unknown, ): Promise { const maxRetries = config.execution.maxRetries; let retries = 0; @@ -450,6 +538,7 @@ async function executeTask( sendChatMessage, projectDir, parallelState, + assignedModel, ); if (result.success) { diff --git a/src/types.ts b/src/types.ts index e083593..a4852a9 100644 --- a/src/types.ts +++ b/src/types.ts @@ -153,6 +153,8 @@ export interface RalpiConfig { timeoutMs: number; /** Maximum parallel tasks (0 = unlimited) */ maxParallel: number; + /** Round-robin model list for parallel tasks (empty = inherit parent model) */ + models: string[]; }; prompts: { /** Additional context injected into every task prompt */ @@ -176,6 +178,7 @@ export const DEFAULT_CONFIG: RalpiConfig = { retryDelayMs: 0, timeoutMs: 0, // 0 = inherit Pi's own defaults (no ralpi-level timeout) maxParallel: 3, + models: [], }, prompts: { projectContext: "",