fix stream multiplaction
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
import { createSignal, ErrorBoundary } from "solid-js";
|
||||
import { createSignal, createMemo, ErrorBoundary } from "solid-js";
|
||||
import { useSelectionHandler } from "@opentui/solid";
|
||||
import { Layout } from "./components/Layout";
|
||||
import { Navigation } from "./components/Navigation";
|
||||
@@ -107,7 +107,7 @@ export function App() {
|
||||
}).catch(() => {})
|
||||
})
|
||||
|
||||
const getPanels = () => {
|
||||
const getPanels = createMemo(() => {
|
||||
const tab = activeTab();
|
||||
|
||||
switch (tab) {
|
||||
@@ -305,7 +305,7 @@ export function App() {
|
||||
hint: "",
|
||||
};
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
return (
|
||||
<ErrorBoundary fallback={(err) => (
|
||||
|
||||
@@ -105,6 +105,7 @@ export function Player(props: PlayerProps) {
|
||||
position={audio.position()}
|
||||
duration={dur()}
|
||||
isPlaying={audio.isPlaying()}
|
||||
speed={audio.speed()}
|
||||
onSeek={(next: number) => audio.seek(next)}
|
||||
visualizerConfig={(() => {
|
||||
const viz = useAppStore().state().settings.visualizer
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
* Same prop interface as MergedWaveform for drop-in replacement.
|
||||
*/
|
||||
|
||||
import { createSignal, createEffect, onCleanup, on } from "solid-js"
|
||||
import { createSignal, createEffect, onCleanup, on, untrack } from "solid-js"
|
||||
import { loadCavaCore, type CavaCore, type CavaCoreConfig } from "../utils/cavacore"
|
||||
import { AudioStreamReader } from "../utils/audio-stream-reader"
|
||||
|
||||
@@ -25,6 +25,8 @@ export type RealtimeWaveformProps = {
|
||||
duration: number
|
||||
/** Whether audio is currently playing */
|
||||
isPlaying: boolean
|
||||
/** Playback speed multiplier (default: 1) */
|
||||
speed?: number
|
||||
/** Number of frequency bars / columns */
|
||||
resolution?: number
|
||||
/** Callback when user clicks to seek */
|
||||
@@ -75,7 +77,7 @@ export function RealtimeWaveform(props: RealtimeWaveformProps) {
|
||||
|
||||
// ── Start/stop the visualization pipeline ──────────────────────────
|
||||
|
||||
const startVisualization = (url: string, position: number) => {
|
||||
const startVisualization = (url: string, position: number, speed: number) => {
|
||||
stopVisualization()
|
||||
|
||||
if (!url || !initCava() || !cava) return
|
||||
@@ -92,9 +94,12 @@ export function RealtimeWaveform(props: RealtimeWaveformProps) {
|
||||
// Pre-allocate sample read buffer
|
||||
sampleBuffer = new Float64Array(SAMPLES_PER_FRAME)
|
||||
|
||||
// Start ffmpeg decode stream
|
||||
// Start ffmpeg decode stream (reuse reader if same URL, else create new)
|
||||
if (!reader || reader.url !== url) {
|
||||
if (reader) reader.stop()
|
||||
reader = new AudioStreamReader({ url })
|
||||
reader.start(position)
|
||||
}
|
||||
reader.start(position, speed)
|
||||
|
||||
// Start render loop
|
||||
frameTimer = setInterval(renderFrame, FRAME_INTERVAL)
|
||||
@@ -107,7 +112,7 @@ export function RealtimeWaveform(props: RealtimeWaveformProps) {
|
||||
}
|
||||
if (reader) {
|
||||
reader.stop()
|
||||
reader = null
|
||||
// Don't null reader — we reuse it across start/stop cycles
|
||||
}
|
||||
if (cava?.isReady) {
|
||||
cava.destroy()
|
||||
@@ -134,57 +139,70 @@ export function RealtimeWaveform(props: RealtimeWaveformProps) {
|
||||
setBarData(Array.from(output))
|
||||
}
|
||||
|
||||
// ── Reactive effects: respond to prop changes ──────────────────────
|
||||
// ── Single unified effect: respond to all prop changes ─────────────
|
||||
//
|
||||
// Instead of three competing effects that each independently call
|
||||
// startVisualization() and race against each other, we use ONE effect
|
||||
// that tracks all relevant inputs. Position is read with untrack()
|
||||
// so normal playback drift doesn't trigger restarts.
|
||||
//
|
||||
// SolidJS on() with an array of accessors compares each element
|
||||
// individually, so the effect only fires when a value actually changes.
|
||||
|
||||
// Start/stop based on isPlaying and audioUrl
|
||||
createEffect(
|
||||
on(
|
||||
() => [props.isPlaying, props.audioUrl] as const,
|
||||
([playing, url]) => {
|
||||
[
|
||||
() => props.isPlaying,
|
||||
() => props.audioUrl,
|
||||
() => props.speed ?? 1,
|
||||
resolution,
|
||||
],
|
||||
([playing, url, speed]) => {
|
||||
if (playing && url) {
|
||||
startVisualization(url, props.position)
|
||||
const pos = untrack(() => props.position)
|
||||
startVisualization(url, pos, speed)
|
||||
} else {
|
||||
stopVisualization()
|
||||
// Keep last bar data visible (freeze frame) when paused
|
||||
}
|
||||
},
|
||||
),
|
||||
)
|
||||
|
||||
// Handle seeks: restart the ffmpeg stream at the new position
|
||||
// We track position and restart only on significant jumps (>2s delta)
|
||||
// ── Seek detection: lightweight effect for position jumps ──────────
|
||||
//
|
||||
// Watches position and restarts the reader (not the whole pipeline)
|
||||
// only on significant jumps (>2s), which indicate a user seek.
|
||||
// This is intentionally a separate effect — it should NOT trigger a
|
||||
// full pipeline restart, just restart the ffmpeg stream at the new pos.
|
||||
|
||||
let lastSyncPosition = 0
|
||||
createEffect(
|
||||
on(
|
||||
() => props.position,
|
||||
(pos) => {
|
||||
if (!props.isPlaying || !reader?.running) return
|
||||
if (!props.isPlaying || !reader?.running) {
|
||||
lastSyncPosition = pos
|
||||
return
|
||||
}
|
||||
|
||||
const delta = Math.abs(pos - lastSyncPosition)
|
||||
// Only restart on seeks (>2s jump), not normal playback drift
|
||||
lastSyncPosition = pos
|
||||
|
||||
if (delta > 2) {
|
||||
reader.restart(pos)
|
||||
lastSyncPosition = pos
|
||||
} else {
|
||||
lastSyncPosition = pos
|
||||
const speed = props.speed ?? 1
|
||||
reader.restart(pos, speed)
|
||||
}
|
||||
},
|
||||
),
|
||||
)
|
||||
|
||||
// Re-init cavacore if resolution changes
|
||||
createEffect(
|
||||
on(resolution, (bars) => {
|
||||
if (props.isPlaying && props.audioUrl && cava) {
|
||||
// Restart with new bar count
|
||||
startVisualization(props.audioUrl, props.position)
|
||||
}
|
||||
}),
|
||||
)
|
||||
|
||||
// Cleanup on unmount
|
||||
onCleanup(() => {
|
||||
stopVisualization()
|
||||
if (reader) {
|
||||
reader.stop()
|
||||
reader = null
|
||||
}
|
||||
// Don't null cava itself — it can be reused. But do destroy its plan.
|
||||
if (cava?.isReady) {
|
||||
cava.destroy()
|
||||
|
||||
@@ -15,26 +15,31 @@ const SAMPLE_RATE = 44100
|
||||
const CHANNELS = 1
|
||||
const BYTES_PER_SAMPLE = 2 // s16le
|
||||
|
||||
/** How many samples to buffer (≈1 second) */
|
||||
/** How many samples to buffer (~1 second) */
|
||||
const RING_BUFFER_SAMPLES = SAMPLE_RATE
|
||||
|
||||
export interface AudioStreamReaderOptions {
|
||||
/** Audio URL or file path to decode */
|
||||
url: string
|
||||
/** Start position in seconds (for seeking sync) */
|
||||
startPosition?: number
|
||||
/** Sample rate (default: 44100) */
|
||||
sampleRate?: number
|
||||
}
|
||||
|
||||
/**
|
||||
* Monotonically increasing generation counter.
|
||||
* Each start() increments this; the read loop checks it to know
|
||||
* if it's been superseded and should bail out.
|
||||
*/
|
||||
let globalGeneration = 0
|
||||
|
||||
export class AudioStreamReader {
|
||||
private proc: ReturnType<typeof Bun.spawn> | null = null
|
||||
private ringBuffer: Float64Array
|
||||
private writePos = 0
|
||||
private totalSamplesWritten = 0
|
||||
private _running = false
|
||||
private readPromise: Promise<void> | null = null
|
||||
private url: string
|
||||
private generation = 0
|
||||
readonly url: string
|
||||
private sampleRate: number
|
||||
|
||||
constructor(options: AudioStreamReaderOptions) {
|
||||
@@ -55,14 +60,28 @@ export class AudioStreamReader {
|
||||
|
||||
/**
|
||||
* Start the ffmpeg decode process and begin reading PCM data.
|
||||
*
|
||||
* If already running, the previous process is killed first.
|
||||
* Uses a generation counter to guarantee that only one read loop
|
||||
* is ever active — stale loops from killed processes bail out
|
||||
* immediately.
|
||||
*
|
||||
* @param startPosition Seek position in seconds (default: 0).
|
||||
* @param speed Playback speed multiplier (default: 1). Applies ffmpeg
|
||||
* atempo filter so visualization stays in sync with audio.
|
||||
*/
|
||||
start(startPosition = 0): void {
|
||||
if (this._running) return
|
||||
start(startPosition = 0, speed = 1): void {
|
||||
// Always kill the previous process first — no early return on _running
|
||||
this.killProcess()
|
||||
|
||||
if (!Bun.which("ffmpeg")) {
|
||||
throw new Error("ffmpeg not found — required for audio visualization")
|
||||
}
|
||||
|
||||
// Increment generation so any lingering read loop from a previous
|
||||
// start() will see a mismatch and exit.
|
||||
this.generation = ++globalGeneration
|
||||
|
||||
const args = [
|
||||
"ffmpeg",
|
||||
"-loglevel", "quiet",
|
||||
@@ -73,13 +92,21 @@ export class AudioStreamReader {
|
||||
args.push("-ss", String(startPosition))
|
||||
}
|
||||
|
||||
args.push("-i", this.url)
|
||||
|
||||
// Apply speed via atempo filter if not 1x.
|
||||
// ffmpeg atempo only supports 0.5–100.0; chain multiple for extremes.
|
||||
if (speed !== 1 && speed > 0) {
|
||||
const atempoFilters = buildAtempoChain(speed)
|
||||
args.push("-af", atempoFilters)
|
||||
}
|
||||
|
||||
args.push(
|
||||
"-i", this.url,
|
||||
"-ac", String(CHANNELS),
|
||||
"-ar", String(this.sampleRate),
|
||||
"-f", "s16le", // raw signed 16-bit little-endian PCM
|
||||
"-f", "s16le",
|
||||
"-acodec", "pcm_s16le",
|
||||
"-", // output to stdout
|
||||
"-",
|
||||
)
|
||||
|
||||
this.proc = Bun.spawn(args, {
|
||||
@@ -92,14 +119,22 @@ export class AudioStreamReader {
|
||||
this.writePos = 0
|
||||
this.totalSamplesWritten = 0
|
||||
|
||||
// Capture generation for this run
|
||||
const myGeneration = this.generation
|
||||
|
||||
// Start async reading loop
|
||||
this.readPromise = this.readLoop()
|
||||
this.readLoop(myGeneration)
|
||||
|
||||
// Detect process exit
|
||||
this.proc.exited.then(() => {
|
||||
// Only clear _running if this is still the current generation
|
||||
if (this.generation === myGeneration) {
|
||||
this._running = false
|
||||
}
|
||||
}).catch(() => {
|
||||
if (this.generation === myGeneration) {
|
||||
this._running = false
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -107,7 +142,7 @@ export class AudioStreamReader {
|
||||
* Read available samples into the provided buffer.
|
||||
* Returns the number of samples actually copied.
|
||||
*
|
||||
* @param out - Float64Array to fill with samples (scaled ~±32768 for cavacore).
|
||||
* @param out - Float64Array to fill with samples (scaled ~+/-32768 for cavacore).
|
||||
* @returns Number of samples written to `out`.
|
||||
*/
|
||||
read(out: Float64Array): number {
|
||||
@@ -118,10 +153,8 @@ export class AudioStreamReader {
|
||||
const readStart = (this.writePos - available + this.ringBuffer.length) % this.ringBuffer.length
|
||||
|
||||
if (readStart + available <= this.ringBuffer.length) {
|
||||
// Contiguous read
|
||||
out.set(this.ringBuffer.subarray(readStart, readStart + available))
|
||||
} else {
|
||||
// Wraps around
|
||||
const firstChunk = this.ringBuffer.length - readStart
|
||||
out.set(this.ringBuffer.subarray(readStart, this.ringBuffer.length))
|
||||
out.set(this.ringBuffer.subarray(0, available - firstChunk), firstChunk)
|
||||
@@ -132,40 +165,44 @@ export class AudioStreamReader {
|
||||
|
||||
/**
|
||||
* Stop the ffmpeg process and clean up.
|
||||
* Safe to call multiple times.
|
||||
* Safe to call multiple times. Guarantees the read loop exits.
|
||||
*/
|
||||
stop(): void {
|
||||
// Bump generation to invalidate any running read loop
|
||||
this.generation = ++globalGeneration
|
||||
this._running = false
|
||||
if (this.proc) {
|
||||
try { this.proc.kill() } catch { /* ignore */ }
|
||||
this.proc = null
|
||||
}
|
||||
this.killProcess()
|
||||
this.writePos = 0
|
||||
this.totalSamplesWritten = 0
|
||||
}
|
||||
|
||||
/**
|
||||
* Restart the reader at a new position (e.g. after a seek).
|
||||
* Restart the reader at a new position and/or speed.
|
||||
*/
|
||||
restart(startPosition = 0): void {
|
||||
this.stop()
|
||||
this.start(startPosition)
|
||||
restart(startPosition = 0, speed = 1): void {
|
||||
this.start(startPosition, speed)
|
||||
}
|
||||
|
||||
/** Kill the ffmpeg process without touching generation/state. */
|
||||
private killProcess(): void {
|
||||
if (this.proc) {
|
||||
try { this.proc.kill() } catch { /* ignore */ }
|
||||
this.proc = null
|
||||
}
|
||||
}
|
||||
|
||||
/** Internal: continuously reads stdout from ffmpeg and fills the ring buffer. */
|
||||
private async readLoop(): Promise<void> {
|
||||
private async readLoop(myGeneration: number): Promise<void> {
|
||||
const stdout = this.proc?.stdout
|
||||
if (!stdout || typeof stdout === "number") return
|
||||
|
||||
const reader = (stdout as ReadableStream<Uint8Array>).getReader()
|
||||
try {
|
||||
while (this._running) {
|
||||
while (this.generation === myGeneration) {
|
||||
const { done, value } = await reader.read()
|
||||
if (done || !this._running) break
|
||||
if (done || this.generation !== myGeneration) break
|
||||
if (!value || value.byteLength === 0) continue
|
||||
|
||||
// Convert raw s16le bytes → Float64Array scaled for cavacore
|
||||
// Ensure we have an even number of bytes (each sample = 2 bytes)
|
||||
const sampleCount = Math.floor(value.byteLength / BYTES_PER_SAMPLE)
|
||||
if (sampleCount === 0) continue
|
||||
|
||||
@@ -175,9 +212,8 @@ export class AudioStreamReader {
|
||||
sampleCount,
|
||||
)
|
||||
|
||||
// Write samples into ring buffer (as doubles, preserving int16 scale)
|
||||
for (let i = 0; i < sampleCount; i++) {
|
||||
this.ringBuffer[this.writePos] = int16View[i] // ±32768 range
|
||||
this.ringBuffer[this.writePos] = int16View[i]
|
||||
this.writePos = (this.writePos + 1) % this.ringBuffer.length
|
||||
this.totalSamplesWritten++
|
||||
}
|
||||
@@ -189,3 +225,25 @@ export class AudioStreamReader {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build an ffmpeg atempo filter chain for a given speed.
|
||||
* atempo only accepts values in [0.5, 100.0], so we chain
|
||||
* multiple filters for extreme values (e.g. 0.25 = atempo=0.5,atempo=0.5).
|
||||
*/
|
||||
function buildAtempoChain(speed: number): string {
|
||||
const parts: string[] = []
|
||||
let remaining = Math.max(0.25, Math.min(4, speed))
|
||||
|
||||
while (remaining > 100) {
|
||||
parts.push("atempo=100.0")
|
||||
remaining /= 100
|
||||
}
|
||||
while (remaining < 0.5) {
|
||||
parts.push("atempo=0.5")
|
||||
remaining /= 0.5
|
||||
}
|
||||
parts.push(`atempo=${remaining}`)
|
||||
|
||||
return parts.join(",")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user