From 920042ee2aa2960b5b76087fa0381c507759b8e2 Mon Sep 17 00:00:00 2001 From: Michael Freno Date: Fri, 6 Feb 2026 11:47:48 -0500 Subject: [PATCH] fix stream multiplaction --- src/App.tsx | 6 +- src/components/Player.tsx | 1 + src/components/RealtimeWaveform.tsx | 78 +++++++++++------- src/utils/audio-stream-reader.ts | 122 ++++++++++++++++++++-------- 4 files changed, 142 insertions(+), 65 deletions(-) diff --git a/src/App.tsx b/src/App.tsx index b6e2e28..ca6382e 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -1,4 +1,4 @@ -import { createSignal, ErrorBoundary } from "solid-js"; +import { createSignal, createMemo, ErrorBoundary } from "solid-js"; import { useSelectionHandler } from "@opentui/solid"; import { Layout } from "./components/Layout"; import { Navigation } from "./components/Navigation"; @@ -107,7 +107,7 @@ export function App() { }).catch(() => {}) }) - const getPanels = () => { + const getPanels = createMemo(() => { const tab = activeTab(); switch (tab) { @@ -305,7 +305,7 @@ export function App() { hint: "", }; } - }; + }); return ( ( diff --git a/src/components/Player.tsx b/src/components/Player.tsx index 74c614a..b8912ab 100644 --- a/src/components/Player.tsx +++ b/src/components/Player.tsx @@ -105,6 +105,7 @@ export function Player(props: PlayerProps) { position={audio.position()} duration={dur()} isPlaying={audio.isPlaying()} + speed={audio.speed()} onSeek={(next: number) => audio.seek(next)} visualizerConfig={(() => { const viz = useAppStore().state().settings.visualizer diff --git a/src/components/RealtimeWaveform.tsx b/src/components/RealtimeWaveform.tsx index d4fb84a..55b27ef 100644 --- a/src/components/RealtimeWaveform.tsx +++ b/src/components/RealtimeWaveform.tsx @@ -10,7 +10,7 @@ * Same prop interface as MergedWaveform for drop-in replacement. */ -import { createSignal, createEffect, onCleanup, on } from "solid-js" +import { createSignal, createEffect, onCleanup, on, untrack } from "solid-js" import { loadCavaCore, type CavaCore, type CavaCoreConfig } from "../utils/cavacore" import { AudioStreamReader } from "../utils/audio-stream-reader" @@ -25,6 +25,8 @@ export type RealtimeWaveformProps = { duration: number /** Whether audio is currently playing */ isPlaying: boolean + /** Playback speed multiplier (default: 1) */ + speed?: number /** Number of frequency bars / columns */ resolution?: number /** Callback when user clicks to seek */ @@ -75,7 +77,7 @@ export function RealtimeWaveform(props: RealtimeWaveformProps) { // ── Start/stop the visualization pipeline ────────────────────────── - const startVisualization = (url: string, position: number) => { + const startVisualization = (url: string, position: number, speed: number) => { stopVisualization() if (!url || !initCava() || !cava) return @@ -92,9 +94,12 @@ export function RealtimeWaveform(props: RealtimeWaveformProps) { // Pre-allocate sample read buffer sampleBuffer = new Float64Array(SAMPLES_PER_FRAME) - // Start ffmpeg decode stream - reader = new AudioStreamReader({ url }) - reader.start(position) + // Start ffmpeg decode stream (reuse reader if same URL, else create new) + if (!reader || reader.url !== url) { + if (reader) reader.stop() + reader = new AudioStreamReader({ url }) + } + reader.start(position, speed) // Start render loop frameTimer = setInterval(renderFrame, FRAME_INTERVAL) @@ -107,7 +112,7 @@ export function RealtimeWaveform(props: RealtimeWaveformProps) { } if (reader) { reader.stop() - reader = null + // Don't null reader — we reuse it across start/stop cycles } if (cava?.isReady) { cava.destroy() @@ -134,57 +139,70 @@ export function RealtimeWaveform(props: RealtimeWaveformProps) { setBarData(Array.from(output)) } - // ── Reactive effects: respond to prop changes ────────────────────── + // ── Single unified effect: respond to all prop changes ───────────── + // + // Instead of three competing effects that each independently call + // startVisualization() and race against each other, we use ONE effect + // that tracks all relevant inputs. Position is read with untrack() + // so normal playback drift doesn't trigger restarts. + // + // SolidJS on() with an array of accessors compares each element + // individually, so the effect only fires when a value actually changes. - // Start/stop based on isPlaying and audioUrl createEffect( on( - () => [props.isPlaying, props.audioUrl] as const, - ([playing, url]) => { + [ + () => props.isPlaying, + () => props.audioUrl, + () => props.speed ?? 1, + resolution, + ], + ([playing, url, speed]) => { if (playing && url) { - startVisualization(url, props.position) + const pos = untrack(() => props.position) + startVisualization(url, pos, speed) } else { stopVisualization() - // Keep last bar data visible (freeze frame) when paused } }, ), ) - // Handle seeks: restart the ffmpeg stream at the new position - // We track position and restart only on significant jumps (>2s delta) + // ── Seek detection: lightweight effect for position jumps ────────── + // + // Watches position and restarts the reader (not the whole pipeline) + // only on significant jumps (>2s), which indicate a user seek. + // This is intentionally a separate effect — it should NOT trigger a + // full pipeline restart, just restart the ffmpeg stream at the new pos. + let lastSyncPosition = 0 createEffect( on( () => props.position, (pos) => { - if (!props.isPlaying || !reader?.running) return + if (!props.isPlaying || !reader?.running) { + lastSyncPosition = pos + return + } const delta = Math.abs(pos - lastSyncPosition) - // Only restart on seeks (>2s jump), not normal playback drift + lastSyncPosition = pos + if (delta > 2) { - reader.restart(pos) - lastSyncPosition = pos - } else { - lastSyncPosition = pos + const speed = props.speed ?? 1 + reader.restart(pos, speed) } }, ), ) - // Re-init cavacore if resolution changes - createEffect( - on(resolution, (bars) => { - if (props.isPlaying && props.audioUrl && cava) { - // Restart with new bar count - startVisualization(props.audioUrl, props.position) - } - }), - ) - // Cleanup on unmount onCleanup(() => { stopVisualization() + if (reader) { + reader.stop() + reader = null + } // Don't null cava itself — it can be reused. But do destroy its plan. if (cava?.isReady) { cava.destroy() diff --git a/src/utils/audio-stream-reader.ts b/src/utils/audio-stream-reader.ts index 65c843b..6a1f837 100644 --- a/src/utils/audio-stream-reader.ts +++ b/src/utils/audio-stream-reader.ts @@ -15,26 +15,31 @@ const SAMPLE_RATE = 44100 const CHANNELS = 1 const BYTES_PER_SAMPLE = 2 // s16le -/** How many samples to buffer (≈1 second) */ +/** How many samples to buffer (~1 second) */ const RING_BUFFER_SAMPLES = SAMPLE_RATE export interface AudioStreamReaderOptions { /** Audio URL or file path to decode */ url: string - /** Start position in seconds (for seeking sync) */ - startPosition?: number /** Sample rate (default: 44100) */ sampleRate?: number } +/** + * Monotonically increasing generation counter. + * Each start() increments this; the read loop checks it to know + * if it's been superseded and should bail out. + */ +let globalGeneration = 0 + export class AudioStreamReader { private proc: ReturnType | null = null private ringBuffer: Float64Array private writePos = 0 private totalSamplesWritten = 0 private _running = false - private readPromise: Promise | null = null - private url: string + private generation = 0 + readonly url: string private sampleRate: number constructor(options: AudioStreamReaderOptions) { @@ -55,14 +60,28 @@ export class AudioStreamReader { /** * Start the ffmpeg decode process and begin reading PCM data. + * + * If already running, the previous process is killed first. + * Uses a generation counter to guarantee that only one read loop + * is ever active — stale loops from killed processes bail out + * immediately. + * * @param startPosition Seek position in seconds (default: 0). + * @param speed Playback speed multiplier (default: 1). Applies ffmpeg + * atempo filter so visualization stays in sync with audio. */ - start(startPosition = 0): void { - if (this._running) return + start(startPosition = 0, speed = 1): void { + // Always kill the previous process first — no early return on _running + this.killProcess() + if (!Bun.which("ffmpeg")) { throw new Error("ffmpeg not found — required for audio visualization") } + // Increment generation so any lingering read loop from a previous + // start() will see a mismatch and exit. + this.generation = ++globalGeneration + const args = [ "ffmpeg", "-loglevel", "quiet", @@ -73,13 +92,21 @@ export class AudioStreamReader { args.push("-ss", String(startPosition)) } + args.push("-i", this.url) + + // Apply speed via atempo filter if not 1x. + // ffmpeg atempo only supports 0.5–100.0; chain multiple for extremes. + if (speed !== 1 && speed > 0) { + const atempoFilters = buildAtempoChain(speed) + args.push("-af", atempoFilters) + } + args.push( - "-i", this.url, "-ac", String(CHANNELS), "-ar", String(this.sampleRate), - "-f", "s16le", // raw signed 16-bit little-endian PCM + "-f", "s16le", "-acodec", "pcm_s16le", - "-", // output to stdout + "-", ) this.proc = Bun.spawn(args, { @@ -92,14 +119,22 @@ export class AudioStreamReader { this.writePos = 0 this.totalSamplesWritten = 0 + // Capture generation for this run + const myGeneration = this.generation + // Start async reading loop - this.readPromise = this.readLoop() + this.readLoop(myGeneration) // Detect process exit this.proc.exited.then(() => { - this._running = false + // Only clear _running if this is still the current generation + if (this.generation === myGeneration) { + this._running = false + } }).catch(() => { - this._running = false + if (this.generation === myGeneration) { + this._running = false + } }) } @@ -107,7 +142,7 @@ export class AudioStreamReader { * Read available samples into the provided buffer. * Returns the number of samples actually copied. * - * @param out - Float64Array to fill with samples (scaled ~±32768 for cavacore). + * @param out - Float64Array to fill with samples (scaled ~+/-32768 for cavacore). * @returns Number of samples written to `out`. */ read(out: Float64Array): number { @@ -118,10 +153,8 @@ export class AudioStreamReader { const readStart = (this.writePos - available + this.ringBuffer.length) % this.ringBuffer.length if (readStart + available <= this.ringBuffer.length) { - // Contiguous read out.set(this.ringBuffer.subarray(readStart, readStart + available)) } else { - // Wraps around const firstChunk = this.ringBuffer.length - readStart out.set(this.ringBuffer.subarray(readStart, this.ringBuffer.length)) out.set(this.ringBuffer.subarray(0, available - firstChunk), firstChunk) @@ -132,40 +165,44 @@ export class AudioStreamReader { /** * Stop the ffmpeg process and clean up. - * Safe to call multiple times. + * Safe to call multiple times. Guarantees the read loop exits. */ stop(): void { + // Bump generation to invalidate any running read loop + this.generation = ++globalGeneration this._running = false - if (this.proc) { - try { this.proc.kill() } catch { /* ignore */ } - this.proc = null - } + this.killProcess() this.writePos = 0 this.totalSamplesWritten = 0 } /** - * Restart the reader at a new position (e.g. after a seek). + * Restart the reader at a new position and/or speed. */ - restart(startPosition = 0): void { - this.stop() - this.start(startPosition) + restart(startPosition = 0, speed = 1): void { + this.start(startPosition, speed) + } + + /** Kill the ffmpeg process without touching generation/state. */ + private killProcess(): void { + if (this.proc) { + try { this.proc.kill() } catch { /* ignore */ } + this.proc = null + } } /** Internal: continuously reads stdout from ffmpeg and fills the ring buffer. */ - private async readLoop(): Promise { + private async readLoop(myGeneration: number): Promise { const stdout = this.proc?.stdout if (!stdout || typeof stdout === "number") return const reader = (stdout as ReadableStream).getReader() try { - while (this._running) { + while (this.generation === myGeneration) { const { done, value } = await reader.read() - if (done || !this._running) break + if (done || this.generation !== myGeneration) break if (!value || value.byteLength === 0) continue - // Convert raw s16le bytes → Float64Array scaled for cavacore - // Ensure we have an even number of bytes (each sample = 2 bytes) const sampleCount = Math.floor(value.byteLength / BYTES_PER_SAMPLE) if (sampleCount === 0) continue @@ -175,9 +212,8 @@ export class AudioStreamReader { sampleCount, ) - // Write samples into ring buffer (as doubles, preserving int16 scale) for (let i = 0; i < sampleCount; i++) { - this.ringBuffer[this.writePos] = int16View[i] // ±32768 range + this.ringBuffer[this.writePos] = int16View[i] this.writePos = (this.writePos + 1) % this.ringBuffer.length this.totalSamplesWritten++ } @@ -189,3 +225,25 @@ export class AudioStreamReader { } } } + +/** + * Build an ffmpeg atempo filter chain for a given speed. + * atempo only accepts values in [0.5, 100.0], so we chain + * multiple filters for extreme values (e.g. 0.25 = atempo=0.5,atempo=0.5). + */ +function buildAtempoChain(speed: number): string { + const parts: string[] = [] + let remaining = Math.max(0.25, Math.min(4, speed)) + + while (remaining > 100) { + parts.push("atempo=100.0") + remaining /= 100 + } + while (remaining < 0.5) { + parts.push("atempo=0.5") + remaining /= 0.5 + } + parts.push(`atempo=${remaining}`) + + return parts.join(",") +}