FRE-600: Fix code review blockers

- Consolidated duplicate UndoManagers to single instance
- Fixed connection promise to only resolve on 'connected' status
- Fixed WebSocketProvider import (WebsocketProvider)
- Added proper doc.destroy() cleanup
- Renamed isPresenceInitialized property to avoid conflict

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
2026-04-25 00:08:01 -04:00
parent 65b552bb08
commit 7c684a42cc
48450 changed files with 5679671 additions and 383 deletions

118
node_modules/@trpc/client/src/TRPCClientError.ts generated vendored Normal file
View File

@@ -0,0 +1,118 @@
import type {
inferClientTypes,
InferrableClientTypes,
Maybe,
TRPCErrorResponse,
} from '@trpc/server/unstable-core-do-not-import';
import {
isObject,
type DefaultErrorShape,
} from '@trpc/server/unstable-core-do-not-import';
type inferErrorShape<TInferrable extends InferrableClientTypes> =
inferClientTypes<TInferrable>['errorShape'];
export interface TRPCClientErrorBase<TShape extends DefaultErrorShape> {
readonly message: string;
readonly shape: Maybe<TShape>;
readonly data: Maybe<TShape['data']>;
}
export type TRPCClientErrorLike<TInferrable extends InferrableClientTypes> =
TRPCClientErrorBase<inferErrorShape<TInferrable>>;
export function isTRPCClientError<TInferrable extends InferrableClientTypes>(
cause: unknown,
): cause is TRPCClientError<TInferrable> {
return cause instanceof TRPCClientError;
}
function isTRPCErrorResponse(obj: unknown): obj is TRPCErrorResponse<any> {
return (
isObject(obj) &&
isObject(obj['error']) &&
typeof obj['error']['code'] === 'number' &&
typeof obj['error']['message'] === 'string'
);
}
function getMessageFromUnknownError(err: unknown, fallback: string): string {
if (typeof err === 'string') {
return err;
}
if (isObject(err) && typeof err['message'] === 'string') {
return err['message'];
}
return fallback;
}
export class TRPCClientError<TRouterOrProcedure extends InferrableClientTypes>
extends Error
implements TRPCClientErrorBase<inferErrorShape<TRouterOrProcedure>>
{
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore override doesn't work in all environments due to "This member cannot have an 'override' modifier because it is not declared in the base class 'Error'"
public override readonly cause;
public readonly shape: Maybe<inferErrorShape<TRouterOrProcedure>>;
public readonly data: Maybe<inferErrorShape<TRouterOrProcedure>['data']>;
/**
* Additional meta data about the error
* In the case of HTTP-errors, we'll have `response` and potentially `responseJSON` here
*/
public meta;
constructor(
message: string,
opts?: {
result?: Maybe<TRPCErrorResponse<inferErrorShape<TRouterOrProcedure>>>;
cause?: Error;
meta?: Record<string, unknown>;
},
) {
const cause = opts?.cause;
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore https://github.com/tc39/proposal-error-cause
super(message, { cause });
this.meta = opts?.meta;
this.cause = cause;
this.shape = opts?.result?.error;
this.data = opts?.result?.error.data;
this.name = 'TRPCClientError';
Object.setPrototypeOf(this, TRPCClientError.prototype);
}
public static from<TRouterOrProcedure extends InferrableClientTypes>(
_cause: Error | TRPCErrorResponse<any> | object,
opts: { meta?: Record<string, unknown>; cause?: Error } = {},
): TRPCClientError<TRouterOrProcedure> {
const cause = _cause as unknown;
if (isTRPCClientError(cause)) {
if (opts.meta) {
// Decorate with meta error data
cause.meta = {
...cause.meta,
...opts.meta,
};
}
return cause;
}
if (isTRPCErrorResponse(cause)) {
return new TRPCClientError(cause.error.message, {
...opts,
result: cause,
cause: opts.cause,
});
}
return new TRPCClientError(
getMessageFromUnknownError(cause, 'Unknown error'),
{
...opts,
cause: cause as any,
},
);
}
}

176
node_modules/@trpc/client/src/createTRPCClient.ts generated vendored Normal file
View File

@@ -0,0 +1,176 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import type { Unsubscribable } from '@trpc/server/observable';
import type {
AnyProcedure,
AnyRouter,
inferClientTypes,
inferProcedureInput,
InferrableClientTypes,
inferTransformedProcedureOutput,
ProcedureType,
RouterRecord,
} from '@trpc/server/unstable-core-do-not-import';
import {
createFlatProxy,
createRecursiveProxy,
} from '@trpc/server/unstable-core-do-not-import';
import type { CreateTRPCClientOptions } from './createTRPCUntypedClient';
import type { TRPCSubscriptionObserver } from './internals/TRPCUntypedClient';
import { TRPCUntypedClient } from './internals/TRPCUntypedClient';
import type { TRPCProcedureOptions } from './internals/types';
import type { TRPCClientError } from './TRPCClientError';
/**
* @public
* @deprecated use {@link TRPCClient} instead, will be removed in v12
**/
export type inferRouterClient<TRouter extends AnyRouter> = TRPCClient<TRouter>;
/**
* @public
* @deprecated use {@link TRPCClient} instead, will be removed in v12
**/
export type CreateTRPCClient<TRouter extends AnyRouter> = TRPCClient<TRouter>;
const untypedClientSymbol = Symbol.for('trpc_untypedClient');
/**
* @public
**/
export type TRPCClient<TRouter extends AnyRouter> = DecoratedProcedureRecord<
{
transformer: TRouter['_def']['_config']['$types']['transformer'];
errorShape: TRouter['_def']['_config']['$types']['errorShape'];
},
TRouter['_def']['record']
> & {
[untypedClientSymbol]: TRPCUntypedClient<TRouter>;
};
/** @internal */
export type TRPCResolverDef = {
input: any;
output: any;
transformer: boolean;
errorShape: any;
};
type coerceAsyncGeneratorToIterable<T> =
T extends AsyncGenerator<infer $T, infer $Return, infer $Next>
? AsyncIterable<$T, $Return, $Next>
: T;
/** @internal */
export type Resolver<TDef extends TRPCResolverDef> = (
input: TDef['input'],
opts?: TRPCProcedureOptions,
) => Promise<coerceAsyncGeneratorToIterable<TDef['output']>>;
/** @internal */
export type SubscriptionResolver<TDef extends TRPCResolverDef> = (
input: TDef['input'],
opts: Partial<
TRPCSubscriptionObserver<TDef['output'], TRPCClientError<TDef>>
> &
TRPCProcedureOptions,
) => Unsubscribable;
type DecorateProcedure<
TType extends ProcedureType,
TDef extends TRPCResolverDef,
> = TType extends 'query'
? {
query: Resolver<TDef>;
}
: TType extends 'mutation'
? {
mutate: Resolver<TDef>;
}
: TType extends 'subscription'
? {
subscribe: SubscriptionResolver<TDef>;
}
: never;
/**
* @internal
*/
type DecoratedProcedureRecord<
TRoot extends InferrableClientTypes,
TRecord extends RouterRecord,
> = {
[TKey in keyof TRecord]: TRecord[TKey] extends infer $Value
? $Value extends AnyProcedure
? DecorateProcedure<
$Value['_def']['type'],
{
input: inferProcedureInput<$Value>;
output: inferTransformedProcedureOutput<
inferClientTypes<TRoot>,
$Value
>;
errorShape: inferClientTypes<TRoot>['errorShape'];
transformer: inferClientTypes<TRoot>['transformer'];
}
>
: $Value extends RouterRecord
? DecoratedProcedureRecord<TRoot, $Value>
: never
: never;
};
const clientCallTypeMap: Record<
keyof DecorateProcedure<any, any>,
ProcedureType
> = {
query: 'query',
mutate: 'mutation',
subscribe: 'subscription',
};
/** @internal */
export const clientCallTypeToProcedureType = (
clientCallType: string,
): ProcedureType => {
return clientCallTypeMap[clientCallType as keyof typeof clientCallTypeMap];
};
/**
* @internal
*/
export function createTRPCClientProxy<TRouter extends AnyRouter>(
client: TRPCUntypedClient<TRouter>,
): TRPCClient<TRouter> {
const proxy = createRecursiveProxy<TRPCClient<TRouter>>(({ path, args }) => {
const pathCopy = [...path];
const procedureType = clientCallTypeToProcedureType(pathCopy.pop()!);
const fullPath = pathCopy.join('.');
return (client[procedureType] as any)(fullPath, ...(args as any));
});
return createFlatProxy<TRPCClient<TRouter>>((key) => {
if (key === untypedClientSymbol) {
return client;
}
return proxy[key];
});
}
export function createTRPCClient<TRouter extends AnyRouter>(
opts: CreateTRPCClientOptions<TRouter>,
): TRPCClient<TRouter> {
const client = new TRPCUntypedClient(opts);
const proxy = createTRPCClientProxy<TRouter>(client);
return proxy;
}
/**
* Get an untyped client from a proxy client
* @internal
*/
export function getUntypedClient<TRouter extends AnyRouter>(
client: TRPCClient<TRouter>,
): TRPCUntypedClient<TRouter> {
return client[untypedClientSymbol];
}

View File

@@ -0,0 +1,15 @@
import type { AnyRouter } from '@trpc/server/unstable-core-do-not-import';
import type { CreateTRPCClientOptions } from './internals/TRPCUntypedClient';
import { TRPCUntypedClient } from './internals/TRPCUntypedClient';
export function createTRPCUntypedClient<TRouter extends AnyRouter>(
opts: CreateTRPCClientOptions<TRouter>,
): TRPCUntypedClient<TRouter> {
return new TRPCUntypedClient(opts);
}
export type {
CreateTRPCClientOptions,
TRPCRequestOptions,
} from './internals/TRPCUntypedClient';
export { TRPCUntypedClient } from './internals/TRPCUntypedClient';

23
node_modules/@trpc/client/src/getFetch.ts generated vendored Normal file
View File

@@ -0,0 +1,23 @@
import type { FetchEsque, NativeFetchEsque } from './internals/types';
type AnyFn = (...args: any[]) => unknown;
const isFunction = (fn: unknown): fn is AnyFn => typeof fn === 'function';
export function getFetch(
customFetchImpl?: FetchEsque | NativeFetchEsque,
): FetchEsque {
if (customFetchImpl) {
return customFetchImpl as FetchEsque;
}
if (typeof window !== 'undefined' && isFunction(window.fetch)) {
return window.fetch as FetchEsque;
}
if (typeof globalThis !== 'undefined' && isFunction(globalThis.fetch)) {
return globalThis.fetch as FetchEsque;
}
throw new Error('No fetch implementation found');
}

20
node_modules/@trpc/client/src/index.ts generated vendored Normal file
View File

@@ -0,0 +1,20 @@
// TODO: Be explicit about what we export here
export * from './createTRPCUntypedClient';
export * from './createTRPCClient';
export * from './getFetch';
export * from './TRPCClientError';
export * from './links';
export {
/**
* @deprecated - use `createTRPCClient` instead
*/
createTRPCClient as createTRPCProxyClient,
/**
* @deprecated - use `inferRouterClient` instead
*/
type inferRouterClient as inferRouterProxyClient,
} from './createTRPCClient';
export { type TRPCProcedureOptions } from './internals/types';

View File

@@ -0,0 +1,162 @@
import type {
inferObservableValue,
Unsubscribable,
} from '@trpc/server/observable';
import { observableToPromise, share } from '@trpc/server/observable';
import type {
AnyRouter,
inferAsyncIterableYield,
InferrableClientTypes,
Maybe,
TypeError,
} from '@trpc/server/unstable-core-do-not-import';
import { createChain } from '../links/internals/createChain';
import type { TRPCConnectionState } from '../links/internals/subscriptions';
import type {
OperationContext,
OperationLink,
TRPCClientRuntime,
TRPCLink,
} from '../links/types';
import { TRPCClientError } from '../TRPCClientError';
type TRPCType = 'mutation' | 'query' | 'subscription';
export interface TRPCRequestOptions {
/**
* Pass additional context to links
*/
context?: OperationContext;
signal?: AbortSignal;
}
export interface TRPCSubscriptionObserver<TValue, TError> {
onStarted: (opts: { context: OperationContext | undefined }) => void;
onData: (value: inferAsyncIterableYield<TValue>) => void;
onError: (err: TError) => void;
onStopped: () => void;
onComplete: () => void;
onConnectionStateChange: (state: TRPCConnectionState<TError>) => void;
}
/** @internal */
export type CreateTRPCClientOptions<TRouter extends InferrableClientTypes> = {
links: TRPCLink<TRouter>[];
transformer?: TypeError<'The transformer property has moved to httpLink/httpBatchLink/wsLink'>;
};
export class TRPCUntypedClient<TInferrable extends InferrableClientTypes> {
private readonly links: OperationLink<TInferrable>[];
public readonly runtime: TRPCClientRuntime;
private requestId: number;
constructor(opts: CreateTRPCClientOptions<TInferrable>) {
this.requestId = 0;
this.runtime = {};
// Initialize the links
this.links = opts.links.map((link) => link(this.runtime));
}
private $request<TInput = unknown, TOutput = unknown>(opts: {
type: TRPCType;
input: TInput;
path: string;
context?: OperationContext;
signal: Maybe<AbortSignal>;
}) {
const chain$ = createChain<AnyRouter, TInput, TOutput>({
links: this.links as OperationLink<any, any, any>[],
op: {
...opts,
context: opts.context ?? {},
id: ++this.requestId,
},
});
return chain$.pipe(share());
}
private async requestAsPromise<TInput = unknown, TOutput = unknown>(opts: {
type: TRPCType;
input: TInput;
path: string;
context?: OperationContext;
signal: Maybe<AbortSignal>;
}): Promise<TOutput> {
try {
const req$ = this.$request<TInput, TOutput>(opts);
type TValue = inferObservableValue<typeof req$>;
const envelope = await observableToPromise<TValue>(req$);
const data = (envelope.result as any).data;
return data;
} catch (err) {
throw TRPCClientError.from(err as Error);
}
}
public query(path: string, input?: unknown, opts?: TRPCRequestOptions) {
return this.requestAsPromise<unknown, unknown>({
type: 'query',
path,
input,
context: opts?.context,
signal: opts?.signal,
});
}
public mutation(path: string, input?: unknown, opts?: TRPCRequestOptions) {
return this.requestAsPromise<unknown, unknown>({
type: 'mutation',
path,
input,
context: opts?.context,
signal: opts?.signal,
});
}
public subscription(
path: string,
input: unknown,
opts: Partial<
TRPCSubscriptionObserver<unknown, TRPCClientError<AnyRouter>>
> &
TRPCRequestOptions,
): Unsubscribable {
const observable$ = this.$request({
type: 'subscription',
path,
input,
context: opts.context,
signal: opts.signal,
});
return observable$.subscribe({
next(envelope) {
switch (envelope.result.type) {
case 'state': {
opts.onConnectionStateChange?.(envelope.result);
break;
}
case 'started': {
opts.onStarted?.({
context: envelope.context,
});
break;
}
case 'stopped': {
opts.onStopped?.();
break;
}
case 'data':
case undefined: {
opts.onData?.(envelope.result.data);
break;
}
}
},
error(err) {
opts.onError?.(err);
},
complete() {
opts.onComplete?.();
},
});
}
}

160
node_modules/@trpc/client/src/internals/dataLoader.ts generated vendored Normal file
View File

@@ -0,0 +1,160 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
type BatchItem<TKey, TValue> = {
aborted: boolean;
key: TKey;
resolve: ((value: TValue) => void) | null;
reject: ((error: Error) => void) | null;
batch: Batch<TKey, TValue> | null;
};
type Batch<TKey, TValue> = {
items: BatchItem<TKey, TValue>[];
};
export type BatchLoader<TKey, TValue> = {
validate: (keys: TKey[]) => boolean;
fetch: (keys: TKey[]) => Promise<TValue[] | Promise<TValue>[]>;
};
/**
* A function that should never be called unless we messed something up.
*/
const throwFatalError = () => {
throw new Error(
'Something went wrong. Please submit an issue at https://github.com/trpc/trpc/issues/new',
);
};
/**
* Dataloader that's very inspired by https://github.com/graphql/dataloader
* Less configuration, no caching, and allows you to cancel requests
* When cancelling a single fetch the whole batch will be cancelled only when _all_ items are cancelled
*/
export function dataLoader<TKey, TValue>(
batchLoader: BatchLoader<TKey, TValue>,
) {
let pendingItems: BatchItem<TKey, TValue>[] | null = null;
let dispatchTimer: ReturnType<typeof setTimeout> | null = null;
const destroyTimerAndPendingItems = () => {
clearTimeout(dispatchTimer as any);
dispatchTimer = null;
pendingItems = null;
};
/**
* Iterate through the items and split them into groups based on the `batchLoader`'s validate function
*/
function groupItems(items: BatchItem<TKey, TValue>[]) {
const groupedItems: BatchItem<TKey, TValue>[][] = [[]];
let index = 0;
while (true) {
const item = items[index];
if (!item) {
// we're done
break;
}
const lastGroup = groupedItems[groupedItems.length - 1]!;
if (item.aborted) {
// Item was aborted before it was dispatched
item.reject?.(new Error('Aborted'));
index++;
continue;
}
const isValid = batchLoader.validate(
lastGroup.concat(item).map((it) => it.key),
);
if (isValid) {
lastGroup.push(item);
index++;
continue;
}
if (lastGroup.length === 0) {
item.reject?.(new Error('Input is too big for a single dispatch'));
index++;
continue;
}
// Create new group, next iteration will try to add the item to that
groupedItems.push([]);
}
return groupedItems;
}
function dispatch() {
const groupedItems = groupItems(pendingItems!);
destroyTimerAndPendingItems();
// Create batches for each group of items
for (const items of groupedItems) {
if (!items.length) {
continue;
}
const batch: Batch<TKey, TValue> = {
items,
};
for (const item of items) {
item.batch = batch;
}
const promise = batchLoader.fetch(batch.items.map((_item) => _item.key));
promise
.then(async (result) => {
await Promise.all(
result.map(async (valueOrPromise, index) => {
const item = batch.items[index]!;
try {
const value = await Promise.resolve(valueOrPromise);
item.resolve?.(value);
} catch (cause) {
item.reject?.(cause as Error);
}
item.batch = null;
item.reject = null;
item.resolve = null;
}),
);
for (const item of batch.items) {
item.reject?.(new Error('Missing result'));
item.batch = null;
}
})
.catch((cause) => {
for (const item of batch.items) {
item.reject?.(cause);
item.batch = null;
}
});
}
}
function load(key: TKey): Promise<TValue> {
const item: BatchItem<TKey, TValue> = {
aborted: false,
key,
batch: null,
resolve: throwFatalError,
reject: throwFatalError,
};
const promise = new Promise<TValue>((resolve, reject) => {
item.reject = reject;
item.resolve = resolve;
pendingItems ??= [];
pendingItems.push(item);
});
dispatchTimer ??= setTimeout(dispatch);
return promise;
}
return {
load,
};
}

View File

@@ -0,0 +1,15 @@
export function inputWithTrackedEventId(
input: unknown,
lastEventId: string | undefined,
) {
if (!lastEventId) {
return input;
}
if (input != null && typeof input !== 'object') {
return input;
}
return {
...(input ?? {}),
lastEventId,
};
}

70
node_modules/@trpc/client/src/internals/signals.ts generated vendored Normal file
View File

@@ -0,0 +1,70 @@
import type { Maybe } from '@trpc/server/unstable-core-do-not-import';
/**
* Like `Promise.all()` but for abort signals
* - When all signals have been aborted, the merged signal will be aborted
* - If one signal is `null`, no signal will be aborted
*/
export function allAbortSignals(...signals: Maybe<AbortSignal>[]): AbortSignal {
const ac = new AbortController();
const count = signals.length;
let abortedCount = 0;
const onAbort = () => {
if (++abortedCount === count) {
ac.abort();
}
};
for (const signal of signals) {
if (signal?.aborted) {
onAbort();
} else {
signal?.addEventListener('abort', onAbort, {
once: true,
});
}
}
return ac.signal;
}
/**
* Like `Promise.race` but for abort signals
*
* Basically, a ponyfill for
* [`AbortSignal.any`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal/any_static).
*/
export function raceAbortSignals(
...signals: Maybe<AbortSignal>[]
): AbortSignal {
const ac = new AbortController();
for (const signal of signals) {
if (signal?.aborted) {
ac.abort();
} else {
signal?.addEventListener('abort', () => ac.abort(), { once: true });
}
}
return ac.signal;
}
export function abortSignalToPromise(signal: AbortSignal): Promise<never> {
return new Promise((_, reject) => {
if (signal.aborted) {
reject(signal.reason);
return;
}
signal.addEventListener(
'abort',
() => {
reject(signal.reason);
},
{ once: true },
);
});
}

76
node_modules/@trpc/client/src/internals/transformer.ts generated vendored Normal file
View File

@@ -0,0 +1,76 @@
import type {
AnyClientTypes,
CombinedDataTransformer,
DataTransformerOptions,
TypeError,
} from '@trpc/server/unstable-core-do-not-import';
/**
* @internal
*/
export type CoercedTransformerParameters = {
transformer?: DataTransformerOptions;
};
type TransformerOptionYes = {
/**
* Data transformer
*
* You must use the same transformer on the backend and frontend
* @see https://trpc.io/docs/v11/data-transformers
**/
transformer: DataTransformerOptions;
};
type TransformerOptionNo = {
/**
* Data transformer
*
* You must use the same transformer on the backend and frontend
* @see https://trpc.io/docs/v11/data-transformers
**/
transformer?: TypeError<'You must define a transformer on your your `initTRPC`-object first'>;
};
/**
* @internal
*/
export type TransformerOptions<
TRoot extends Pick<AnyClientTypes, 'transformer'>,
> = TRoot['transformer'] extends true
? TransformerOptionYes
: TransformerOptionNo;
/**
* @internal
*/
/**
* @internal
*/
export function getTransformer(
transformer:
| TransformerOptions<{ transformer: false }>['transformer']
| TransformerOptions<{ transformer: true }>['transformer']
| undefined,
): CombinedDataTransformer {
const _transformer =
transformer as CoercedTransformerParameters['transformer'];
if (!_transformer) {
return {
input: {
serialize: (data) => data,
deserialize: (data) => data,
},
output: {
serialize: (data) => data,
deserialize: (data) => data,
},
};
}
if ('input' in _transformer) {
return _transformer;
}
return {
input: _transformer,
output: _transformer,
};
}

102
node_modules/@trpc/client/src/internals/types.ts generated vendored Normal file
View File

@@ -0,0 +1,102 @@
/**
* A subset of the standard fetch function type needed by tRPC internally.
* @see fetch from lib.dom.d.ts
* @remarks
* If you need a property that you know exists but doesn't exist on this
* interface, go ahead and add it.
*/
export type FetchEsque = (
input: RequestInfo | URL | string,
init?: RequestInit | RequestInitEsque,
) => Promise<ResponseEsque>;
/**
* A simpler version of the native fetch function's type for packages with
* their own fetch types, such as undici and node-fetch.
*/
export type NativeFetchEsque = (
url: URL | string,
init?: NodeFetchRequestInitEsque,
) => Promise<ResponseEsque>;
export interface NodeFetchRequestInitEsque {
body?: string;
}
/**
* A subset of the standard RequestInit properties needed by tRPC internally.
* @see RequestInit from lib.dom.d.ts
* @remarks
* If you need a property that you know exists but doesn't exist on this
* interface, go ahead and add it.
*/
export interface RequestInitEsque {
/**
* Sets the request's body.
*/
body?: FormData | string | null | Uint8Array<ArrayBuffer> | Blob | File;
/**
* Sets the request's associated headers.
*/
headers?: [string, string][] | Record<string, string>;
/**
* The request's HTTP-style method.
*/
method?: string;
/**
* Sets the request's signal.
*/
signal?: AbortSignal | undefined;
}
/**
* A subset of the standard ReadableStream properties needed by tRPC internally.
* @see ReadableStream from lib.dom.d.ts
*/
export type WebReadableStreamEsque = {
getReader: () => ReadableStreamDefaultReader<Uint8Array>;
};
export type NodeJSReadableStreamEsque = {
on(
eventName: string | symbol,
listener: (...args: any[]) => void,
): NodeJSReadableStreamEsque;
};
/**
* A subset of the standard Response properties needed by tRPC internally.
* @see Response from lib.dom.d.ts
*/
export interface ResponseEsque {
readonly ok: boolean;
readonly body?: NodeJSReadableStreamEsque | WebReadableStreamEsque | null;
/**
* @remarks
* The built-in Response::json() method returns Promise<any>, but
* that's not as type-safe as unknown. We use unknown because we're
* more type-safe. You do want more type safety, right? 😉
*/
json(): Promise<unknown>;
}
/**
* @internal
*/
export type NonEmptyArray<TItem> = [TItem, ...TItem[]];
type ClientContext = Record<string, unknown>;
/**
* @public
*/
export interface TRPCProcedureOptions {
/**
* Client-side context
*/
context?: ClientContext;
signal?: AbortSignal;
}

15
node_modules/@trpc/client/src/links.ts generated vendored Normal file
View File

@@ -0,0 +1,15 @@
export * from './links/types';
export * from './links/HTTPBatchLinkOptions';
export * from './links/httpBatchLink';
export * from './links/httpBatchStreamLink';
export * from './links/httpLink';
export * from './links/loggerLink';
export * from './links/splitLink';
export * from './links/wsLink/wsLink';
export * from './links/httpSubscriptionLink';
export * from './links/retryLink';
export * from './links/localLink';
// These are not public (yet) as we get this functionality from tanstack query
// export * from './links/internals/dedupeLink';

View File

@@ -0,0 +1,24 @@
import type { AnyClientTypes } from '@trpc/server/unstable-core-do-not-import';
import type { NonEmptyArray } from '../internals/types';
import type { HTTPLinkBaseOptions } from './internals/httpUtils';
import type { HTTPHeaders, Operation } from './types';
export type HTTPBatchLinkOptions<TRoot extends AnyClientTypes> =
HTTPLinkBaseOptions<TRoot> & {
maxURLLength?: number;
/**
* Headers to be set on outgoing requests or a callback that of said headers
* @see http://trpc.io/docs/client/headers
*/
headers?:
| HTTPHeaders
| ((opts: {
opList: NonEmptyArray<Operation>;
}) => HTTPHeaders | Promise<HTTPHeaders>);
/**
* Maximum number of calls in a single batch request
* @default Infinity
*/
maxItems?: number;
};

141
node_modules/@trpc/client/src/links/httpBatchLink.ts generated vendored Normal file
View File

@@ -0,0 +1,141 @@
import type { AnyRouter, ProcedureType } from '@trpc/server';
import { observable } from '@trpc/server/observable';
import { transformResult } from '@trpc/server/unstable-core-do-not-import';
import type { BatchLoader } from '../internals/dataLoader';
import { dataLoader } from '../internals/dataLoader';
import { allAbortSignals } from '../internals/signals';
import type { NonEmptyArray } from '../internals/types';
import { TRPCClientError } from '../TRPCClientError';
import type { HTTPBatchLinkOptions } from './HTTPBatchLinkOptions';
import type { HTTPResult } from './internals/httpUtils';
import {
getUrl,
jsonHttpRequester,
resolveHTTPLinkOptions,
} from './internals/httpUtils';
import type { Operation, TRPCLink } from './types';
/**
* @see https://trpc.io/docs/client/links/httpBatchLink
*/
export function httpBatchLink<TRouter extends AnyRouter>(
opts: HTTPBatchLinkOptions<TRouter['_def']['_config']['$types']>,
): TRPCLink<TRouter> {
const resolvedOpts = resolveHTTPLinkOptions(opts);
const maxURLLength = opts.maxURLLength ?? Infinity;
const maxItems = opts.maxItems ?? Infinity;
return () => {
const batchLoader = (
type: ProcedureType,
): BatchLoader<Operation, HTTPResult> => {
return {
validate(batchOps) {
if (maxURLLength === Infinity && maxItems === Infinity) {
// escape hatch for quick calcs
return true;
}
if (batchOps.length > maxItems) {
return false;
}
const path = batchOps.map((op) => op.path).join(',');
const inputs = batchOps.map((op) => op.input);
const url = getUrl({
...resolvedOpts,
type,
path,
inputs,
signal: null,
});
return url.length <= maxURLLength;
},
async fetch(batchOps) {
const path = batchOps.map((op) => op.path).join(',');
const inputs = batchOps.map((op) => op.input);
const signal = allAbortSignals(...batchOps.map((op) => op.signal));
const res = await jsonHttpRequester({
...resolvedOpts,
path,
inputs,
type,
headers() {
if (!opts.headers) {
return {};
}
if (typeof opts.headers === 'function') {
return opts.headers({
opList: batchOps as NonEmptyArray<Operation>,
});
}
return opts.headers;
},
signal,
});
const resJSON = Array.isArray(res.json)
? res.json
: batchOps.map(() => res.json);
const result = resJSON.map((item) => ({
meta: res.meta,
json: item,
}));
return result;
},
};
};
const query = dataLoader(batchLoader('query'));
const mutation = dataLoader(batchLoader('mutation'));
const loaders = { query, mutation };
return ({ op }) => {
return observable((observer) => {
/* istanbul ignore if -- @preserve */
if (op.type === 'subscription') {
throw new Error(
'Subscriptions are unsupported by `httpLink` - use `httpSubscriptionLink` or `wsLink`',
);
}
const loader = loaders[op.type];
const promise = loader.load(op);
let _res = undefined as HTTPResult | undefined;
promise
.then((res) => {
_res = res;
const transformed = transformResult(
res.json,
resolvedOpts.transformer.output,
);
if (!transformed.ok) {
observer.error(
TRPCClientError.from(transformed.error, {
meta: res.meta,
}),
);
return;
}
observer.next({
context: res.meta,
result: transformed.result,
});
observer.complete();
})
.catch((err) => {
observer.error(
TRPCClientError.from(err, {
meta: _res?.meta,
}),
);
});
return () => {
// noop
};
});
};
};
}

View File

@@ -0,0 +1,227 @@
import type { AnyRouter, ProcedureType } from '@trpc/server';
import { observable } from '@trpc/server/observable';
import type { TRPCErrorShape, TRPCResponse } from '@trpc/server/rpc';
import type { AnyClientTypes } from '@trpc/server/unstable-core-do-not-import';
import { jsonlStreamConsumer } from '@trpc/server/unstable-core-do-not-import';
import type { BatchLoader } from '../internals/dataLoader';
import { dataLoader } from '../internals/dataLoader';
import { allAbortSignals, raceAbortSignals } from '../internals/signals';
import type { NonEmptyArray } from '../internals/types';
import { TRPCClientError } from '../TRPCClientError';
import type { HTTPBatchLinkOptions } from './HTTPBatchLinkOptions';
import type { HTTPResult } from './internals/httpUtils';
import {
fetchHTTPResponse,
getBody,
getUrl,
resolveHTTPLinkOptions,
} from './internals/httpUtils';
import type { Operation, TRPCLink } from './types';
export type HTTPBatchStreamLinkOptions<TRoot extends AnyClientTypes> =
HTTPBatchLinkOptions<TRoot> & {
/**
* Which header to use to signal the server that the client wants a streaming response.
* - `'trpc-accept'` (default): sends `trpc-accept: application/jsonl` header
* - `'accept'`: sends `Accept: application/jsonl` header, which can avoid CORS preflight for cross-origin streaming queries. Be aware that `application/jsonl` is not an official MIME type and so this is not completely spec-compliant - you should test that your infrastructure supports this value.
* @default 'trpc-accept'
*/
streamHeader?: 'trpc-accept' | 'accept';
};
/**
* @see https://trpc.io/docs/client/links/httpBatchStreamLink
*/
export function httpBatchStreamLink<TRouter extends AnyRouter>(
opts: HTTPBatchStreamLinkOptions<TRouter['_def']['_config']['$types']>,
): TRPCLink<TRouter> {
const resolvedOpts = resolveHTTPLinkOptions(opts);
const maxURLLength = opts.maxURLLength ?? Infinity;
const maxItems = opts.maxItems ?? Infinity;
return () => {
const batchLoader = (
type: ProcedureType,
): BatchLoader<Operation, HTTPResult> => {
return {
validate(batchOps) {
if (maxURLLength === Infinity && maxItems === Infinity) {
// escape hatch for quick calcs
return true;
}
if (batchOps.length > maxItems) {
return false;
}
const path = batchOps.map((op) => op.path).join(',');
const inputs = batchOps.map((op) => op.input);
const url = getUrl({
...resolvedOpts,
type,
path,
inputs,
signal: null,
});
return url.length <= maxURLLength;
},
async fetch(batchOps) {
const path = batchOps.map((op) => op.path).join(',');
const inputs = batchOps.map((op) => op.input);
const batchSignals = allAbortSignals(
...batchOps.map((op) => op.signal),
);
const abortController = new AbortController();
const responsePromise = fetchHTTPResponse({
...resolvedOpts,
signal: raceAbortSignals(batchSignals, abortController.signal),
type,
contentTypeHeader: 'application/json',
trpcAcceptHeader: 'application/jsonl',
trpcAcceptHeaderKey: opts.streamHeader ?? 'trpc-accept',
getUrl,
getBody,
inputs,
path,
headers() {
if (!opts.headers) {
return {};
}
if (typeof opts.headers === 'function') {
return opts.headers({
opList: batchOps as NonEmptyArray<Operation>,
});
}
return opts.headers;
},
});
const res = await responsePromise;
if (!res.ok) {
// Server returned a non-2xx response (e.g. batching disabled).
// The body is plain JSON, not JSONL, so parse it directly and
// propagate the same error to every operation in the batch.
const json = (await res.json()) as TRPCResponse;
if ('error' in json) {
json.error = resolvedOpts.transformer.output.deserialize(
json.error,
);
}
return batchOps.map(
(): Promise<HTTPResult> =>
Promise.resolve({
json,
meta: {
response: res,
},
}),
);
}
const [head] = await jsonlStreamConsumer<
Record<string, Promise<any>>
>({
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
from: res.body!,
deserialize: (data) =>
resolvedOpts.transformer.output.deserialize(data),
// onError: console.error,
formatError(opts) {
const error = opts.error as TRPCErrorShape;
return TRPCClientError.from({
error,
});
},
abortController,
});
const promises = Object.keys(batchOps).map(
async (key): Promise<HTTPResult> => {
let json: TRPCResponse = await Promise.resolve(head[key]);
if ('result' in json) {
/**
* Not very pretty, but we need to unwrap nested data as promises
* Our stream producer will only resolve top-level async values or async values that are directly nested in another async value
*/
const result = await Promise.resolve(json.result);
json = {
result: {
data: await Promise.resolve(result.data),
},
};
}
return {
json,
meta: {
response: res,
},
};
},
);
return promises;
},
};
};
const query = dataLoader(batchLoader('query'));
const mutation = dataLoader(batchLoader('mutation'));
const loaders = { query, mutation };
return ({ op }) => {
return observable((observer) => {
/* istanbul ignore if -- @preserve */
if (op.type === 'subscription') {
throw new Error(
'Subscriptions are unsupported by `httpBatchStreamLink` - use `httpSubscriptionLink` or `wsLink`',
);
}
const loader = loaders[op.type];
const promise = loader.load(op);
let _res = undefined as HTTPResult | undefined;
promise
.then((res) => {
_res = res;
if ('error' in res.json) {
observer.error(
TRPCClientError.from(res.json, {
meta: res.meta,
}),
);
return;
} else if ('result' in res.json) {
observer.next({
context: res.meta,
result: res.json.result,
});
observer.complete();
return;
}
observer.complete();
})
.catch((err) => {
observer.error(
TRPCClientError.from(err, {
meta: _res?.meta,
}),
);
});
return () => {
// noop
};
});
};
};
}
/**
* @deprecated use {@link httpBatchStreamLink} instead
*/
export const unstable_httpBatchStreamLink = httpBatchStreamLink;

142
node_modules/@trpc/client/src/links/httpLink.ts generated vendored Normal file
View File

@@ -0,0 +1,142 @@
import { observable } from '@trpc/server/observable';
import type {
AnyClientTypes,
AnyRouter,
} from '@trpc/server/unstable-core-do-not-import';
import { transformResult } from '@trpc/server/unstable-core-do-not-import';
import { TRPCClientError } from '../TRPCClientError';
import type {
HTTPLinkBaseOptions,
HTTPResult,
Requester,
} from './internals/httpUtils';
import {
getUrl,
httpRequest,
jsonHttpRequester,
resolveHTTPLinkOptions,
} from './internals/httpUtils';
import {
isFormData,
isOctetType,
type HTTPHeaders,
type Operation,
type TRPCLink,
} from './types';
export type HTTPLinkOptions<TRoot extends AnyClientTypes> =
HTTPLinkBaseOptions<TRoot> & {
/**
* Headers to be set on outgoing requests or a callback that of said headers
* @see http://trpc.io/docs/client/headers
*/
headers?:
| HTTPHeaders
| ((opts: { op: Operation }) => HTTPHeaders | Promise<HTTPHeaders>);
};
const universalRequester: Requester = (opts) => {
if ('input' in opts) {
const { input } = opts;
if (isFormData(input)) {
if (opts.type !== 'mutation' && opts.methodOverride !== 'POST') {
throw new Error('FormData is only supported for mutations');
}
return httpRequest({
...opts,
// The browser will set this automatically and include the boundary= in it
contentTypeHeader: undefined,
getUrl,
getBody: () => input,
});
}
if (isOctetType(input)) {
if (opts.type !== 'mutation' && opts.methodOverride !== 'POST') {
throw new Error('Octet type input is only supported for mutations');
}
return httpRequest({
...opts,
contentTypeHeader: 'application/octet-stream',
getUrl,
getBody: () => input,
});
}
}
return jsonHttpRequester(opts);
};
/**
* @see https://trpc.io/docs/client/links/httpLink
*/
export function httpLink<TRouter extends AnyRouter = AnyRouter>(
opts: HTTPLinkOptions<TRouter['_def']['_config']['$types']>,
): TRPCLink<TRouter> {
const resolvedOpts = resolveHTTPLinkOptions(opts);
return () => {
return (operationOpts) => {
const { op } = operationOpts;
return observable((observer) => {
const { path, input, type } = op;
/* istanbul ignore if -- @preserve */
if (type === 'subscription') {
throw new Error(
'Subscriptions are unsupported by `httpLink` - use `httpSubscriptionLink` or `wsLink`',
);
}
const request = universalRequester({
...resolvedOpts,
type,
path,
input,
signal: op.signal,
headers() {
if (!opts.headers) {
return {};
}
if (typeof opts.headers === 'function') {
return opts.headers({
op,
});
}
return opts.headers;
},
});
let meta: HTTPResult['meta'] | undefined = undefined;
request
.then((res) => {
meta = res.meta;
const transformed = transformResult(
res.json,
resolvedOpts.transformer.output,
);
if (!transformed.ok) {
observer.error(
TRPCClientError.from(transformed.error, {
meta,
}),
);
return;
}
observer.next({
context: res.meta,
result: transformed.result,
});
observer.complete();
})
.catch((cause) => {
observer.error(TRPCClientError.from(cause, { meta }));
});
return () => {
// noop
};
});
};
};
}

View File

@@ -0,0 +1,245 @@
import { behaviorSubject, observable } from '@trpc/server/observable';
import type { TRPCErrorShape, TRPCResult } from '@trpc/server/rpc';
import type {
AnyClientTypes,
EventSourceLike,
inferClientTypes,
InferrableClientTypes,
} from '@trpc/server/unstable-core-do-not-import';
import {
retryableRpcCodes,
run,
sseStreamConsumer,
} from '@trpc/server/unstable-core-do-not-import';
import { inputWithTrackedEventId } from '../internals/inputWithTrackedEventId';
import { raceAbortSignals } from '../internals/signals';
import { TRPCClientError } from '../TRPCClientError';
import type { TRPCConnectionState } from '../unstable-internals';
import { getTransformer, type TransformerOptions } from '../unstable-internals';
import { getUrl } from './internals/httpUtils';
import {
resultOf,
type UrlOptionsWithConnectionParams,
} from './internals/urlWithConnectionParams';
import type { Operation, TRPCLink } from './types';
async function urlWithConnectionParams(
opts: UrlOptionsWithConnectionParams,
): Promise<string> {
let url = await resultOf(opts.url);
if (opts.connectionParams) {
const params = await resultOf(opts.connectionParams);
const prefix = url.includes('?') ? '&' : '?';
url +=
prefix + 'connectionParams=' + encodeURIComponent(JSON.stringify(params));
}
return url;
}
type HTTPSubscriptionLinkOptions<
TRoot extends AnyClientTypes,
TEventSource extends EventSourceLike.AnyConstructor = typeof EventSource,
> = {
/**
* EventSource ponyfill
*/
EventSource?: TEventSource;
/**
* EventSource options or a callback that returns them
*/
eventSourceOptions?:
| EventSourceLike.InitDictOf<TEventSource>
| ((opts: {
op: Operation;
}) =>
| EventSourceLike.InitDictOf<TEventSource>
| Promise<EventSourceLike.InitDictOf<TEventSource>>);
} & TransformerOptions<TRoot> &
UrlOptionsWithConnectionParams;
/**
* @see https://trpc.io/docs/client/links/httpSubscriptionLink
*/
export function httpSubscriptionLink<
TInferrable extends InferrableClientTypes,
TEventSource extends EventSourceLike.AnyConstructor,
>(
opts: HTTPSubscriptionLinkOptions<
inferClientTypes<TInferrable>,
TEventSource
>,
): TRPCLink<TInferrable> {
const transformer = getTransformer(opts.transformer);
return () => {
return ({ op }) => {
return observable((observer) => {
const { type, path, input } = op;
/* istanbul ignore if -- @preserve */
if (type !== 'subscription') {
throw new Error('httpSubscriptionLink only supports subscriptions');
}
let lastEventId: string | undefined = undefined;
const ac = new AbortController();
const signal = raceAbortSignals(op.signal, ac.signal);
const eventSourceStream = sseStreamConsumer<{
EventSource: TEventSource;
data: Partial<{
id?: string;
data: unknown;
}>;
error: TRPCErrorShape;
}>({
url: async () =>
getUrl({
transformer,
url: await urlWithConnectionParams(opts),
input: inputWithTrackedEventId(input, lastEventId),
path,
type,
signal: null,
}),
init: () => resultOf(opts.eventSourceOptions, { op }),
signal,
deserialize: (data) => transformer.output.deserialize(data),
EventSource:
opts.EventSource ??
(globalThis.EventSource as never as TEventSource),
});
const connectionState = behaviorSubject<
TRPCConnectionState<TRPCClientError<any>>
>({
type: 'state',
state: 'connecting',
error: null,
});
const connectionSub = connectionState.subscribe({
next(state) {
observer.next({
result: state,
});
},
});
run(async () => {
for await (const chunk of eventSourceStream) {
switch (chunk.type) {
case 'ping':
// do nothing
break;
case 'data':
const chunkData = chunk.data;
let result: TRPCResult<unknown>;
if (chunkData.id) {
// if the `tracked()`-helper is used, we always have an `id` field
lastEventId = chunkData.id;
result = {
id: chunkData.id,
data: chunkData,
};
} else {
result = {
data: chunkData.data,
};
}
observer.next({
result,
context: {
eventSource: chunk.eventSource,
},
});
break;
case 'connected': {
observer.next({
result: {
type: 'started',
},
context: {
eventSource: chunk.eventSource,
},
});
connectionState.next({
type: 'state',
state: 'pending',
error: null,
});
break;
}
case 'serialized-error': {
const error = TRPCClientError.from({ error: chunk.error });
if (retryableRpcCodes.includes(chunk.error.code)) {
//
connectionState.next({
type: 'state',
state: 'connecting',
error,
});
break;
}
//
// non-retryable error, cancel the subscription
throw error;
}
case 'connecting': {
const lastState = connectionState.get();
const error = chunk.event && TRPCClientError.from(chunk.event);
if (!error && lastState.state === 'connecting') {
break;
}
connectionState.next({
type: 'state',
state: 'connecting',
error,
});
break;
}
case 'timeout': {
connectionState.next({
type: 'state',
state: 'connecting',
error: new TRPCClientError(
`Timeout of ${chunk.ms}ms reached while waiting for a response`,
),
});
}
}
}
observer.next({
result: {
type: 'stopped',
},
});
connectionState.next({
type: 'state',
state: 'idle',
error: null,
});
observer.complete();
}).catch((error) => {
observer.error(TRPCClientError.from(error));
});
return () => {
observer.complete();
ac.abort();
connectionSub.unsubscribe();
};
});
};
};
}
/**
* @deprecated use {@link httpSubscriptionLink} instead
*/
export const unstable_httpSubscriptionLink = httpSubscriptionLink;

View File

@@ -0,0 +1,17 @@
export function isOctetType(
input: unknown,
): input is Uint8Array<ArrayBuffer> | Blob {
return (
input instanceof Uint8Array ||
// File extends from Blob but is only available in nodejs from v20
input instanceof Blob
);
}
export function isFormData(input: unknown) {
return input instanceof FormData;
}
export function isNonJsonSerializable(input: unknown) {
return isOctetType(input) || isFormData(input);
}

View File

@@ -0,0 +1,40 @@
import { observable } from '@trpc/server/observable';
import type { AnyRouter } from '@trpc/server/unstable-core-do-not-import';
import type {
Operation,
OperationLink,
OperationResultObservable,
} from '../types';
/** @internal */
export function createChain<
TRouter extends AnyRouter,
TInput = unknown,
TOutput = unknown,
>(opts: {
links: OperationLink<TRouter, TInput, TOutput>[];
op: Operation<TInput>;
}): OperationResultObservable<TRouter, TOutput> {
return observable((observer) => {
function execute(index = 0, op = opts.op) {
const next = opts.links[index];
if (!next) {
throw new Error(
'No more links to execute - did you forget to add an ending link?',
);
}
const subscription = next({
op,
next(nextOp) {
const nextObserver = execute(index + 1, nextOp);
return nextObserver;
},
});
return subscription;
}
const obs$ = execute();
return obs$.subscribe(observer);
});
}

View File

@@ -0,0 +1,56 @@
/* istanbul ignore file -- @preserve */
// We're not actually exporting this link
import type { Observable } from '@trpc/server/observable';
import { observable, share } from '@trpc/server/observable';
import type { AnyRouter } from '@trpc/server/unstable-core-do-not-import';
import type { TRPCLink } from '../types';
/**
* @internal used for testing
*/
export function dedupeLink<
TRouter extends AnyRouter = AnyRouter,
>(): TRPCLink<TRouter> {
// initialized config
return () => {
// initialized in app
const pending: Record<string, Observable<any, any>> = {};
return ({ op, next }) => {
// initialized for request
if (op.type !== 'query') {
// pass through
return next(op);
}
const key = JSON.stringify([op.path, op.input]);
const obs$ = pending[key];
if (obs$) {
// console.log('hooking into pending', { op });
return observable((observer) => obs$.subscribe(observer));
}
const shared$ = observable((observer) => {
function reset() {
delete pending[key];
}
const subscription = next(op).subscribe({
...observer,
error(e) {
reset();
observer.error(e);
},
complete() {
reset();
observer.complete();
},
});
return () => {
reset();
subscription.unsubscribe();
};
}).pipe(share());
pending[key] = shared$;
return shared$;
};
};
}

View File

@@ -0,0 +1,243 @@
import type {
AnyClientTypes,
CombinedDataTransformer,
Maybe,
ProcedureType,
TRPCAcceptHeader,
TRPCResponse,
} from '@trpc/server/unstable-core-do-not-import';
import { getFetch } from '../../getFetch';
import type {
FetchEsque,
RequestInitEsque,
ResponseEsque,
} from '../../internals/types';
import type { TransformerOptions } from '../../unstable-internals';
import { getTransformer } from '../../unstable-internals';
import type { HTTPHeaders } from '../types';
/**
* @internal
*/
export type HTTPLinkBaseOptions<
TRoot extends Pick<AnyClientTypes, 'transformer'>,
> = {
url: string | URL;
/**
* Add ponyfill for fetch
*/
fetch?: FetchEsque;
/**
* Send all requests `as POST`s requests regardless of the procedure type
* The HTTP handler must separately allow overriding the method. See:
* @see https://trpc.io/docs/rpc
*/
methodOverride?: 'POST';
} & TransformerOptions<TRoot>;
export interface ResolvedHTTPLinkOptions {
url: string;
fetch?: FetchEsque;
transformer: CombinedDataTransformer;
methodOverride?: 'POST';
}
export function resolveHTTPLinkOptions(
opts: HTTPLinkBaseOptions<AnyClientTypes>,
): ResolvedHTTPLinkOptions {
return {
url: opts.url.toString(),
fetch: opts.fetch,
transformer: getTransformer(opts.transformer),
methodOverride: opts.methodOverride,
};
}
// https://github.com/trpc/trpc/pull/669
function arrayToDict(array: unknown[]) {
const dict: Record<number, unknown> = {};
for (let index = 0; index < array.length; index++) {
const element = array[index];
dict[index] = element;
}
return dict;
}
const METHOD = {
query: 'GET',
mutation: 'POST',
subscription: 'PATCH',
} as const;
export interface HTTPResult {
json: TRPCResponse;
meta: {
response: ResponseEsque;
responseJSON?: unknown;
};
}
type GetInputOptions = {
transformer: CombinedDataTransformer;
} & ({ input: unknown } | { inputs: unknown[] });
export function getInput(opts: GetInputOptions) {
return 'input' in opts
? opts.transformer.input.serialize(opts.input)
: arrayToDict(
opts.inputs.map((_input) => opts.transformer.input.serialize(_input)),
);
}
export type HTTPBaseRequestOptions = GetInputOptions &
ResolvedHTTPLinkOptions & {
type: ProcedureType;
path: string;
signal: Maybe<AbortSignal>;
};
type GetUrl = (opts: HTTPBaseRequestOptions) => string;
type GetBody = (opts: HTTPBaseRequestOptions) => RequestInitEsque['body'];
export type ContentOptions = {
trpcAcceptHeader?: TRPCAcceptHeader;
trpcAcceptHeaderKey?: 'trpc-accept' | 'accept';
contentTypeHeader?: string;
getUrl: GetUrl;
getBody: GetBody;
};
export const getUrl: GetUrl = (opts) => {
const parts = opts.url.split('?') as [string, string?];
const base = parts[0].replace(/\/$/, ''); // Remove any trailing slashes
let url = base + '/' + opts.path;
const queryParts: string[] = [];
if (parts[1]) {
queryParts.push(parts[1]);
}
if ('inputs' in opts) {
queryParts.push('batch=1');
}
if (opts.type === 'query' || opts.type === 'subscription') {
const input = getInput(opts);
if (input !== undefined && opts.methodOverride !== 'POST') {
queryParts.push(`input=${encodeURIComponent(JSON.stringify(input))}`);
}
}
if (queryParts.length) {
url += '?' + queryParts.join('&');
}
return url;
};
export const getBody: GetBody = (opts) => {
if (opts.type === 'query' && opts.methodOverride !== 'POST') {
return undefined;
}
const input = getInput(opts);
return input !== undefined ? JSON.stringify(input) : undefined;
};
export type Requester = (
opts: HTTPBaseRequestOptions & {
headers: () => HTTPHeaders | Promise<HTTPHeaders>;
},
) => Promise<HTTPResult>;
export const jsonHttpRequester: Requester = (opts) => {
return httpRequest({
...opts,
contentTypeHeader: 'application/json',
getUrl,
getBody,
});
};
/**
* Polyfill for DOMException with AbortError name
*/
class AbortError extends Error {
constructor() {
const name = 'AbortError';
super(name);
this.name = name;
this.message = name;
}
}
export type HTTPRequestOptions = ContentOptions &
HTTPBaseRequestOptions & {
headers: () => HTTPHeaders | Promise<HTTPHeaders>;
};
/**
* Polyfill for `signal.throwIfAborted()`
*
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal/throwIfAborted
*/
const throwIfAborted = (signal: Maybe<AbortSignal>) => {
if (!signal?.aborted) {
return;
}
// If available, use the native implementation
signal.throwIfAborted?.();
// If we have `DOMException`, use it
if (typeof DOMException !== 'undefined') {
throw new DOMException('AbortError', 'AbortError');
}
// Otherwise, use our own implementation
throw new AbortError();
};
export async function fetchHTTPResponse(opts: HTTPRequestOptions) {
throwIfAborted(opts.signal);
const url = opts.getUrl(opts);
const body = opts.getBody(opts);
const method = opts.methodOverride ?? METHOD[opts.type];
const resolvedHeaders = await (async () => {
const heads = await opts.headers();
if (Symbol.iterator in heads) {
return Object.fromEntries(heads);
}
return heads;
})();
const headers = {
...(opts.contentTypeHeader && method !== 'GET'
? { 'content-type': opts.contentTypeHeader }
: {}),
...(opts.trpcAcceptHeader
? { [opts.trpcAcceptHeaderKey ?? 'trpc-accept']: opts.trpcAcceptHeader }
: undefined),
...resolvedHeaders,
};
return getFetch(opts.fetch)(url, {
method,
signal: opts.signal,
body,
headers,
});
}
export async function httpRequest(
opts: HTTPRequestOptions,
): Promise<HTTPResult> {
const meta = {} as HTTPResult['meta'];
const res = await fetchHTTPResponse(opts);
meta.response = res;
const json = await res.json();
meta.responseJSON = json;
return {
json: json as TRPCResponse,
meta,
};
}

View File

@@ -0,0 +1,23 @@
interface ConnectionStateBase<TError> {
type: 'state';
data?: never;
error: TError | null;
}
interface ConnectionIdleState extends ConnectionStateBase<null> {
state: 'idle';
}
interface ConnectionConnectingState<TError>
extends ConnectionStateBase<TError | null> {
state: 'connecting';
}
interface ConnectionPendingState extends ConnectionStateBase<null> {
state: 'pending';
}
export type TRPCConnectionState<TError> =
| ConnectionIdleState
| ConnectionConnectingState<TError>
| ConnectionPendingState;

View File

@@ -0,0 +1,33 @@
import { type TRPCRequestInfo } from '@trpc/server/http';
/**
* Get the result of a value or function that returns a value
* It also optionally accepts typesafe arguments for the function
*/
export const resultOf = <T, TArgs extends any[]>(
value: T | ((...args: TArgs) => T),
...args: TArgs
): T => {
return typeof value === 'function'
? (value as (...args: TArgs) => T)(...args)
: value;
};
/**
* A value that can be wrapped in callback
*/
export type CallbackOrValue<T> = T | (() => T | Promise<T>);
export interface UrlOptionsWithConnectionParams {
/**
* The URL to connect to (can be a function that returns a URL)
*/
url: CallbackOrValue<string>;
/**
* Connection params that are available in `createContext()`
* - For `wsLink`/`wsClient`, these are sent as the first message
* - For `httpSubscriptionLink`, these are serialized as part of the URL under the `connectionParams` query
*/
connectionParams?: CallbackOrValue<TRPCRequestInfo['connectionParams']>;
}

284
node_modules/@trpc/client/src/links/localLink.ts generated vendored Normal file
View File

@@ -0,0 +1,284 @@
import {
getTRPCErrorFromUnknown,
getTRPCErrorShape,
isTrackedEnvelope,
} from '@trpc/server';
import { behaviorSubject, observable } from '@trpc/server/observable';
import { TRPC_ERROR_CODES_BY_KEY, type TRPCResult } from '@trpc/server/rpc';
import {
callProcedure,
isAbortError,
isAsyncIterable,
iteratorResource,
makeResource,
retryableRpcCodes,
run,
type AnyRouter,
type ErrorHandlerOptions,
type inferClientTypes,
type inferRouterContext,
} from '@trpc/server/unstable-core-do-not-import';
import { inputWithTrackedEventId } from '../internals/inputWithTrackedEventId';
import { abortSignalToPromise, raceAbortSignals } from '../internals/signals';
import { getTransformer } from '../internals/transformer';
import type { TransformerOptions } from '../internals/transformer';
import { isTRPCClientError, TRPCClientError } from '../TRPCClientError';
import type { TRPCConnectionState } from './internals/subscriptions';
import type { TRPCLink } from './types';
export type LocalLinkOptions<TRouter extends AnyRouter> = {
router: TRouter;
createContext: () => Promise<inferRouterContext<TRouter>>;
onError?: (opts: ErrorHandlerOptions<inferRouterContext<TRouter>>) => void;
} & TransformerOptions<inferClientTypes<TRouter>>;
/**
* localLink is a terminating link that allows you to make tRPC procedure calls directly in your application without going through HTTP.
*
* @see https://trpc.io/docs/links/localLink
*/
export function unstable_localLink<TRouter extends AnyRouter>(
opts: LocalLinkOptions<TRouter>,
): TRPCLink<TRouter> {
const transformer = getTransformer(opts.transformer);
const transformChunk = (chunk: unknown) => {
if (opts.transformer) {
// assume transformer will do the right thing
return chunk;
}
// Special case for undefined, because `JSON.stringify(undefined)` throws
if (chunk === undefined) {
return chunk;
}
const serialized = JSON.stringify(transformer.input.serialize(chunk));
const deserialized = JSON.parse(transformer.output.deserialize(serialized));
return deserialized;
};
return () =>
({ op }) =>
observable((observer) => {
let ctx: inferRouterContext<TRouter> | undefined = undefined;
const ac = new AbortController();
const signal = raceAbortSignals(op.signal, ac.signal);
const signalPromise = abortSignalToPromise(signal);
signalPromise.catch(() => {
// prevent unhandled rejection
});
let input = op.input;
async function runProcedure(newInput: unknown): Promise<unknown> {
input = newInput;
ctx = await opts.createContext();
return callProcedure({
router: opts.router,
path: op.path,
getRawInput: async () => newInput,
ctx,
type: op.type,
signal,
batchIndex: 0,
});
}
function onErrorCallback(cause: unknown) {
if (isAbortError(cause)) {
return;
}
opts.onError?.({
error: getTRPCErrorFromUnknown(cause),
type: op.type,
path: op.path,
input,
ctx,
});
}
function coerceToTRPCClientError(cause: unknown) {
if (isTRPCClientError<TRouter>(cause)) {
return cause;
}
const error = getTRPCErrorFromUnknown(cause);
const shape = getTRPCErrorShape({
config: opts.router._def._config,
ctx,
error,
input,
path: op.path,
type: op.type,
});
return TRPCClientError.from(
{ error: transformChunk(shape) },
{ cause: cause instanceof Error ? cause : undefined },
);
}
run(async () => {
switch (op.type) {
case 'query':
case 'mutation': {
const result = await runProcedure(op.input);
if (!isAsyncIterable(result)) {
observer.next({
result: { data: transformChunk(result) },
});
observer.complete();
break;
}
observer.next({
result: {
data: (async function* () {
await using iterator = iteratorResource(result);
using _finally = makeResource({}, () => {
observer.complete();
});
try {
while (true) {
const res = await Promise.race([
iterator.next(),
signalPromise,
]);
if (res.done) {
return transformChunk(res.value);
}
yield transformChunk(res.value);
}
} catch (cause) {
onErrorCallback(cause);
throw coerceToTRPCClientError(cause);
}
})(),
},
});
break;
}
case 'subscription': {
const connectionState = behaviorSubject<
TRPCConnectionState<TRPCClientError<any>>
>({
type: 'state',
state: 'connecting',
error: null,
});
const connectionSub = connectionState.subscribe({
next(state) {
observer.next({
result: state,
});
},
});
let lastEventId: string | undefined = undefined;
using _finally = makeResource({}, async () => {
observer.complete();
connectionState.next({
type: 'state',
state: 'idle',
error: null,
});
connectionSub.unsubscribe();
});
while (true) {
const result = await runProcedure(
inputWithTrackedEventId(op.input, lastEventId),
);
if (!isAsyncIterable(result)) {
throw new Error('Expected an async iterable');
}
await using iterator = iteratorResource(result);
observer.next({
result: {
type: 'started',
},
});
connectionState.next({
type: 'state',
state: 'pending',
error: null,
});
// Use a while loop to handle errors and reconnects
while (true) {
let res;
try {
res = await Promise.race([iterator.next(), signalPromise]);
} catch (cause) {
if (isAbortError(cause)) {
return;
}
const error = getTRPCErrorFromUnknown(cause);
if (
!retryableRpcCodes.includes(
TRPC_ERROR_CODES_BY_KEY[error.code],
)
) {
throw coerceToTRPCClientError(error);
}
onErrorCallback(error);
connectionState.next({
type: 'state',
state: 'connecting',
error: coerceToTRPCClientError(error),
});
break;
}
if (res.done) {
return;
}
let chunk: TRPCResult<unknown>;
if (isTrackedEnvelope(res.value)) {
lastEventId = res.value[0];
chunk = {
id: res.value[0],
data: {
id: res.value[0],
data: res.value[1],
},
};
} else {
chunk = {
data: res.value,
};
}
observer.next({
result: {
...chunk,
data: transformChunk(chunk.data),
},
});
}
}
break;
}
}
}).catch((cause) => {
onErrorCallback(cause);
observer.error(coerceToTRPCClientError(cause));
});
return () => {
ac.abort();
};
});
}
/**
* @deprecated Renamed to `unstable_localLink`. This alias will be removed in a future major release.
*/
export const experimental_localLink: typeof unstable_localLink =
unstable_localLink;

268
node_modules/@trpc/client/src/links/loggerLink.ts generated vendored Normal file
View File

@@ -0,0 +1,268 @@
/// <reference lib="dom.iterable" />
// `dom.iterable` types are explicitly required for extracting `FormData` values,
// as all implementations of `Symbol.iterable` are separated from the main `dom` types.
// Using triple-slash directive makes sure that it will be available,
// even if end-user `tsconfig.json` omits it in the `lib` array.
import { observable, tap } from '@trpc/server/observable';
import type {
AnyRouter,
InferrableClientTypes,
} from '@trpc/server/unstable-core-do-not-import';
import type { TRPCClientError } from '../TRPCClientError';
import type { Operation, OperationResultEnvelope, TRPCLink } from './types';
type ConsoleEsque = {
log: (...args: any[]) => void;
error: (...args: any[]) => void;
};
type EnableFnOptions<TRouter extends InferrableClientTypes> =
| {
direction: 'down';
result:
| OperationResultEnvelope<unknown, TRPCClientError<TRouter>>
| TRPCClientError<TRouter>;
}
| (Operation & {
direction: 'up';
});
type EnabledFn<TRouter extends AnyRouter> = (
opts: EnableFnOptions<TRouter>,
) => boolean;
type LoggerLinkFnOptions<TRouter extends AnyRouter> = Operation &
(
| {
/**
* Request result
*/
direction: 'down';
result:
| OperationResultEnvelope<unknown, TRPCClientError<TRouter>>
| TRPCClientError<TRouter>;
elapsedMs: number;
}
| {
/**
* Request was just initialized
*/
direction: 'up';
}
);
type LoggerLinkFn<TRouter extends AnyRouter> = (
opts: LoggerLinkFnOptions<TRouter>,
) => void;
type ColorMode = 'ansi' | 'css' | 'none';
export interface LoggerLinkOptions<TRouter extends AnyRouter> {
logger?: LoggerLinkFn<TRouter>;
enabled?: EnabledFn<TRouter>;
/**
* Used in the built-in defaultLogger
*/
console?: ConsoleEsque;
/**
* Color mode
* @default typeof window === 'undefined' ? 'ansi' : 'css'
*/
colorMode?: ColorMode;
/**
* Include context in the log - defaults to false unless `colorMode` is 'css'
*/
withContext?: boolean;
}
function isFormData(value: unknown): value is FormData {
if (typeof FormData === 'undefined') {
// FormData is not supported
return false;
}
return value instanceof FormData;
}
const palettes = {
css: {
query: ['72e3ff', '3fb0d8'],
mutation: ['c5a3fc', '904dfc'],
subscription: ['ff49e1', 'd83fbe'],
},
ansi: {
regular: {
// Cyan background, black and white text respectively
query: ['\x1b[30;46m', '\x1b[97;46m'],
// Magenta background, black and white text respectively
mutation: ['\x1b[30;45m', '\x1b[97;45m'],
// Green background, black and white text respectively
subscription: ['\x1b[30;42m', '\x1b[97;42m'],
},
bold: {
query: ['\x1b[1;30;46m', '\x1b[1;97;46m'],
mutation: ['\x1b[1;30;45m', '\x1b[1;97;45m'],
subscription: ['\x1b[1;30;42m', '\x1b[1;97;42m'],
},
},
} as const;
function constructPartsAndArgs(
opts: LoggerLinkFnOptions<any> & {
colorMode: ColorMode;
withContext?: boolean;
},
) {
const { direction, type, withContext, path, id, input } = opts;
const parts: string[] = [];
const args: any[] = [];
if (opts.colorMode === 'none') {
parts.push(direction === 'up' ? '>>' : '<<', type, `#${id}`, path);
} else if (opts.colorMode === 'ansi') {
const [lightRegular, darkRegular] = palettes.ansi.regular[type];
const [lightBold, darkBold] = palettes.ansi.bold[type];
const reset = '\x1b[0m';
parts.push(
direction === 'up' ? lightRegular : darkRegular,
direction === 'up' ? '>>' : '<<',
type,
direction === 'up' ? lightBold : darkBold,
`#${id}`,
path,
reset,
);
} else {
// css color mode
const [light, dark] = palettes.css[type];
const css = `
background-color: #${direction === 'up' ? light : dark};
color: ${direction === 'up' ? 'black' : 'white'};
padding: 2px;
`;
parts.push(
'%c',
direction === 'up' ? '>>' : '<<',
type,
`#${id}`,
`%c${path}%c`,
'%O',
);
args.push(
css,
`${css}; font-weight: bold;`,
`${css}; font-weight: normal;`,
);
}
if (direction === 'up') {
args.push(withContext ? { input, context: opts.context } : { input });
} else {
args.push({
input,
result: opts.result,
elapsedMs: opts.elapsedMs,
...(withContext && { context: opts.context }),
});
}
return { parts, args };
}
// maybe this should be moved to it's own package
const defaultLogger =
<TRouter extends AnyRouter>({
c = console,
colorMode = 'css',
withContext,
}: {
c?: ConsoleEsque;
colorMode?: ColorMode;
withContext?: boolean;
}): LoggerLinkFn<TRouter> =>
(props) => {
const rawInput = props.input;
const input = isFormData(rawInput)
? Object.fromEntries(rawInput)
: rawInput;
const { parts, args } = constructPartsAndArgs({
...props,
colorMode,
input,
withContext,
});
const fn: 'error' | 'log' =
props.direction === 'down' &&
props.result &&
(props.result instanceof Error ||
('error' in props.result.result && props.result.result.error))
? 'error'
: 'log';
c[fn].apply(null, [parts.join(' ')].concat(args));
};
/**
* @see https://trpc.io/docs/v11/client/links/loggerLink
*/
export function loggerLink<TRouter extends AnyRouter = AnyRouter>(
opts: LoggerLinkOptions<TRouter> = {},
): TRPCLink<TRouter> {
const { enabled = () => true } = opts;
const colorMode =
opts.colorMode ?? (typeof window === 'undefined' ? 'ansi' : 'css');
const withContext = opts.withContext ?? colorMode === 'css';
const {
logger = defaultLogger({ c: opts.console, colorMode, withContext }),
} = opts;
return () => {
return ({ op, next }) => {
return observable((observer) => {
// ->
if (enabled({ ...op, direction: 'up' })) {
logger({
...op,
direction: 'up',
});
}
const requestStartTime = Date.now();
function logResult(
result:
| OperationResultEnvelope<unknown, TRPCClientError<TRouter>>
| TRPCClientError<TRouter>,
) {
const elapsedMs = Date.now() - requestStartTime;
if (enabled({ ...op, direction: 'down', result })) {
logger({
...op,
direction: 'down',
elapsedMs,
result,
});
}
}
return next(op)
.pipe(
tap({
next(result) {
logResult(result);
},
error(result) {
logResult(result);
},
}),
)
.subscribe(observer);
});
};
};
}

117
node_modules/@trpc/client/src/links/retryLink.ts generated vendored Normal file
View File

@@ -0,0 +1,117 @@
/* istanbul ignore file -- @preserve */
// We're not actually exporting this link
import type { Unsubscribable } from '@trpc/server/observable';
import { observable } from '@trpc/server/observable';
import type { InferrableClientTypes } from '@trpc/server/unstable-core-do-not-import';
import { inputWithTrackedEventId } from '../internals/inputWithTrackedEventId';
import type { TRPCClientError } from '../TRPCClientError';
import type { Operation, TRPCLink } from './types';
interface RetryLinkOptions<TInferrable extends InferrableClientTypes> {
/**
* The retry function
*/
retry: (opts: RetryFnOptions<TInferrable>) => boolean;
/**
* The delay between retries in ms (defaults to 0)
*/
retryDelayMs?: (attempt: number) => number;
}
interface RetryFnOptions<TInferrable extends InferrableClientTypes> {
/**
* The operation that failed
*/
op: Operation;
/**
* The error that occurred
*/
error: TRPCClientError<TInferrable>;
/**
* The number of attempts that have been made (including the first call)
*/
attempts: number;
}
/**
* @see https://trpc.io/docs/v11/client/links/retryLink
*/
export function retryLink<TInferrable extends InferrableClientTypes>(
opts: RetryLinkOptions<TInferrable>,
): TRPCLink<TInferrable> {
// initialized config
return () => {
// initialized in app
return (callOpts) => {
// initialized for request
return observable((observer) => {
let next$: Unsubscribable;
let callNextTimeout: ReturnType<typeof setTimeout> | undefined =
undefined;
let lastEventId: string | undefined = undefined;
attempt(1);
function opWithLastEventId() {
const op = callOpts.op;
if (!lastEventId) {
return op;
}
return {
...op,
input: inputWithTrackedEventId(op.input, lastEventId),
};
}
function attempt(attempts: number) {
const op = opWithLastEventId();
next$ = callOpts.next(op).subscribe({
error(error) {
const shouldRetry = opts.retry({
op,
attempts,
error,
});
if (!shouldRetry) {
observer.error(error);
return;
}
const delayMs = opts.retryDelayMs?.(attempts) ?? 0;
if (delayMs <= 0) {
attempt(attempts + 1);
return;
}
callNextTimeout = setTimeout(
() => attempt(attempts + 1),
delayMs,
);
},
next(envelope) {
//
if (
(!envelope.result.type || envelope.result.type === 'data') &&
envelope.result.id
) {
//
lastEventId = envelope.result.id;
}
observer.next(envelope);
},
complete() {
observer.complete();
},
});
}
return () => {
next$.unsubscribe();
clearTimeout(callNextTimeout);
};
});
};
};
}

30
node_modules/@trpc/client/src/links/splitLink.ts generated vendored Normal file
View File

@@ -0,0 +1,30 @@
import { observable } from '@trpc/server/observable';
import type { AnyRouter } from '@trpc/server/unstable-core-do-not-import';
import { createChain } from './internals/createChain';
import type { Operation, TRPCLink } from './types';
function asArray<TType>(value: TType | TType[]) {
return Array.isArray(value) ? value : [value];
}
export function splitLink<TRouter extends AnyRouter = AnyRouter>(opts: {
condition: (op: Operation) => boolean;
/**
* The link to execute next if the test function returns `true`.
*/
true: TRPCLink<TRouter> | TRPCLink<TRouter>[];
/**
* The link to execute next if the test function returns `false`.
*/
false: TRPCLink<TRouter> | TRPCLink<TRouter>[];
}): TRPCLink<TRouter> {
return (runtime) => {
const yes = asArray(opts.true).map((link) => link(runtime));
const no = asArray(opts.false).map((link) => link(runtime));
return (props) => {
return observable((observer) => {
const links = opts.condition(props.op) ? yes : no;
return createChain({ op: props.op, links }).subscribe(observer);
});
};
};
}

111
node_modules/@trpc/client/src/links/types.ts generated vendored Normal file
View File

@@ -0,0 +1,111 @@
import type { Observable, Observer } from '@trpc/server/observable';
import type {
InferrableClientTypes,
Maybe,
TRPCResultMessage,
TRPCSuccessResponse,
} from '@trpc/server/unstable-core-do-not-import';
import type { ResponseEsque } from '../internals/types';
import type { TRPCClientError } from '../TRPCClientError';
import type { TRPCConnectionState } from './internals/subscriptions';
export {
isNonJsonSerializable,
isFormData,
isOctetType,
} from './internals/contentTypes';
/**
* @internal
*/
export interface OperationContext extends Record<string, unknown> {}
/**
* @internal
*/
export type Operation<TInput = unknown> = {
id: number;
type: 'mutation' | 'query' | 'subscription';
input: TInput;
path: string;
context: OperationContext;
signal: Maybe<AbortSignal>;
};
interface HeadersInitEsque {
[Symbol.iterator](): IterableIterator<[string, string]>;
}
/**
* @internal
*/
export type HTTPHeaders =
| HeadersInitEsque
| Record<string, string[] | string | undefined>;
/**
* The default `fetch` implementation has an overloaded signature. By convention this library
* only uses the overload taking a string and options object.
*/
export type TRPCFetch = (
url: string,
options?: RequestInit,
) => Promise<ResponseEsque>;
export interface TRPCClientRuntime {
// nothing here anymore
}
/**
* @internal
*/
export interface OperationResultEnvelope<TOutput, TError> {
result:
| TRPCResultMessage<TOutput>['result']
| TRPCSuccessResponse<TOutput>['result']
| TRPCConnectionState<TError>;
context?: OperationContext;
}
/**
* @internal
*/
export type OperationResultObservable<
TInferrable extends InferrableClientTypes,
TOutput,
> = Observable<
OperationResultEnvelope<TOutput, TRPCClientError<TInferrable>>,
TRPCClientError<TInferrable>
>;
/**
* @internal
*/
export type OperationResultObserver<
TInferrable extends InferrableClientTypes,
TOutput,
> = Observer<
OperationResultEnvelope<TOutput, TRPCClientError<TInferrable>>,
TRPCClientError<TInferrable>
>;
/**
* @internal
*/
export type OperationLink<
TInferrable extends InferrableClientTypes,
TInput = unknown,
TOutput = unknown,
> = (opts: {
op: Operation<TInput>;
next: (
op: Operation<TInput>,
) => OperationResultObservable<TInferrable, TOutput>;
}) => OperationResultObservable<TInferrable, TOutput>;
/**
* @public
*/
export type TRPCLink<TInferrable extends InferrableClientTypes> = (
opts: TRPCClientRuntime,
) => OperationLink<TInferrable>;

View File

@@ -0,0 +1,12 @@
import type { Encoder } from './wsClient/encoder';
import { jsonEncoder } from './wsClient/encoder';
import type { WebSocketClientOptions } from './wsClient/options';
import { WsClient } from './wsClient/wsClient';
export function createWSClient(opts: WebSocketClientOptions) {
return new WsClient(opts);
}
export type TRPCWebSocketClient = ReturnType<typeof createWSClient>;
export { jsonEncoder, type Encoder, type WebSocketClientOptions };

View File

@@ -0,0 +1,16 @@
import type { Encoder } from '@trpc/server/adapters/ws';
export type { Encoder };
export const jsonEncoder: Encoder = {
encode: (data) => JSON.stringify(data),
decode: (data) => {
if (typeof data !== 'string') {
throw new Error(
'jsonEncoder received binary data. JSON uses text frames. ' +
'Use a binary encoder for binary data.',
);
}
return JSON.parse(data);
},
};

View File

@@ -0,0 +1,97 @@
import type { UrlOptionsWithConnectionParams } from '../../internals/urlWithConnectionParams';
import type { Encoder } from './encoder';
export interface WebSocketClientOptions extends UrlOptionsWithConnectionParams {
/**
* Ponyfill which WebSocket implementation to use
*/
WebSocket?: typeof WebSocket;
/**
* The number of milliseconds before a reconnect is attempted.
* @default {@link exponentialBackoff}
*/
retryDelayMs?: (attemptIndex: number) => number;
/**
* Triggered when a WebSocket connection is established
*/
onOpen?: () => void;
/**
* Triggered when a WebSocket connection encounters an error
*/
onError?: (evt?: Event) => void;
/**
* Triggered when a WebSocket connection is closed
*/
onClose?: (cause?: { code?: number }) => void;
/**
* Lazy mode will close the WebSocket automatically after a period of inactivity (no messages sent or received and no pending requests)
*/
lazy?: {
/**
* Enable lazy mode
* @default false
*/
enabled: boolean;
/**
* Close the WebSocket after this many milliseconds
* @default 0
*/
closeMs: number;
};
/**
* Send ping messages to the server and kill the connection if no pong message is returned
*/
keepAlive?: {
/**
* @default false
*/
enabled: boolean;
/**
* Send a ping message every this many milliseconds
* @default 5_000
*/
intervalMs?: number;
/**
* Close the WebSocket after this many milliseconds if the server does not respond
* @default 1_000
*/
pongTimeoutMs?: number;
};
/**
* Custom encoder for wire encoding (e.g. custom binary formats)
* @default jsonEncoder
*/
experimental_encoder?: Encoder;
}
/**
* Default options for lazy WebSocket connections.
* Determines whether the connection should be established lazily and defines the delay before closure.
*/
export type LazyOptions = Required<NonNullable<WebSocketClientOptions['lazy']>>;
export const lazyDefaults: LazyOptions = {
enabled: false,
closeMs: 0,
};
/**
* Default options for the WebSocket keep-alive mechanism.
* Configures whether keep-alive is enabled and specifies the timeout and interval for ping-pong messages.
*/
export type KeepAliveOptions = Required<
NonNullable<WebSocketClientOptions['keepAlive']>
>;
export const keepAliveDefaults: KeepAliveOptions = {
enabled: false,
pongTimeoutMs: 1_000,
intervalMs: 5_000,
};
/**
* Calculates a delay for exponential backoff based on the retry attempt index.
* The delay starts at 0 for the first attempt and doubles for each subsequent attempt,
* capped at 30 seconds.
*/
export const exponentialBackoff = (attemptIndex: number) => {
return attemptIndex === 0 ? 0 : Math.min(1000 * 2 ** attemptIndex, 30000);
};

View File

@@ -0,0 +1,183 @@
import type { AnyTRPCRouter, inferRouterError } from '@trpc/server';
import type { Observer } from '@trpc/server/observable';
import type {
TRPCClientOutgoingMessage,
TRPCResponseMessage,
} from '@trpc/server/unstable-core-do-not-import';
import type { TRPCClientError } from '../../../TRPCClientError';
import { withResolvers } from './utils';
export type TCallbacks = Observer<
TRPCResponseMessage<unknown, inferRouterError<AnyTRPCRouter>>,
TRPCClientError<AnyTRPCRouter>
>;
type MessageId = string;
type MessageIdLike = string | number | null;
/**
* Represents a WebSocket request managed by the RequestManager.
* Combines the network message, a utility promise (`end`) that mirrors the lifecycle
* handled by `callbacks`, and a set of state monitoring callbacks.
*/
interface Request {
message: TRPCClientOutgoingMessage;
end: Promise<void>;
callbacks: TCallbacks;
}
/**
* Manages WebSocket requests, tracking their lifecycle and providing utility methods
* for handling outgoing and pending requests.
*
* - **Outgoing requests**: Requests that are queued and waiting to be sent.
* - **Pending requests**: Requests that have been sent and are in flight awaiting a response.
* For subscriptions, multiple responses may be received until the subscription is closed.
*/
export class RequestManager {
/**
* Stores requests that are outgoing, meaning they are registered but not yet sent over the WebSocket.
*/
private outgoingRequests = new Array<Request & { id: MessageId }>();
/**
* Stores requests that are pending (in flight), meaning they have been sent over the WebSocket
* and are awaiting responses. For subscriptions, this includes requests
* that may receive multiple responses.
*/
private pendingRequests: Record<MessageId, Request> = {};
/**
* Registers a new request by adding it to the outgoing queue and setting up
* callbacks for lifecycle events such as completion or error.
*
* @param message - The outgoing message to be sent.
* @param callbacks - Callback functions to observe the request's state.
* @returns A cleanup function to manually remove the request.
*/
public register(message: TRPCClientOutgoingMessage, callbacks: TCallbacks) {
const { promise: end, resolve } = withResolvers<void>();
this.outgoingRequests.push({
id: String(message.id),
message,
end,
callbacks: {
next: callbacks.next,
complete: () => {
callbacks.complete();
resolve();
},
error: (e) => {
callbacks.error(e);
resolve();
},
},
});
return () => {
this.delete(message.id);
callbacks.complete();
resolve();
};
}
/**
* Deletes a request from both the outgoing and pending collections, if it exists.
*/
public delete(messageId: MessageIdLike) {
if (messageId === null) return;
this.outgoingRequests = this.outgoingRequests.filter(
({ id }) => id !== String(messageId),
);
delete this.pendingRequests[String(messageId)];
}
/**
* Moves all outgoing requests to the pending state and clears the outgoing queue.
*
* The caller is expected to handle the actual sending of the requests
* (e.g., sending them over the network) after this method is called.
*
* @returns The list of requests that were transitioned to the pending state.
*/
public flush() {
const requests = this.outgoingRequests;
this.outgoingRequests = [];
for (const request of requests) {
this.pendingRequests[request.id] = request;
}
return requests;
}
/**
* Retrieves all currently pending requests, which are in flight awaiting responses
* or handling ongoing subscriptions.
*/
public getPendingRequests() {
return Object.values(this.pendingRequests);
}
/**
* Retrieves a specific pending request by its message ID.
*/
public getPendingRequest(messageId: MessageIdLike) {
if (messageId === null) return null;
return this.pendingRequests[String(messageId)];
}
/**
* Retrieves all outgoing requests, which are waiting to be sent.
*/
public getOutgoingRequests() {
return this.outgoingRequests;
}
/**
* Retrieves all requests, both outgoing and pending, with their respective states.
*
* @returns An array of all requests with their state ("outgoing" or "pending").
*/
public getRequests() {
return [
...this.getOutgoingRequests().map((request) => ({
state: 'outgoing' as const,
message: request.message,
end: request.end,
callbacks: request.callbacks,
})),
...this.getPendingRequests().map((request) => ({
state: 'pending' as const,
message: request.message,
end: request.end,
callbacks: request.callbacks,
})),
];
}
/**
* Checks if there are any pending requests, including ongoing subscriptions.
*/
public hasPendingRequests() {
return this.getPendingRequests().length > 0;
}
/**
* Checks if there are any pending subscriptions
*/
public hasPendingSubscriptions() {
return this.getPendingRequests().some(
(request) => request.message.method === 'subscription',
);
}
/**
* Checks if there are any outgoing requests waiting to be sent.
*/
public hasOutgoingRequests() {
return this.outgoingRequests.length > 0;
}
}

View File

@@ -0,0 +1,96 @@
import type {
TRPCConnectionParamsMessage,
TRPCRequestInfo,
} from '@trpc/server/unstable-core-do-not-import';
import type {
CallbackOrValue,
UrlOptionsWithConnectionParams,
} from '../../internals/urlWithConnectionParams';
import { resultOf } from '../../internals/urlWithConnectionParams';
import type { Encoder } from './encoder';
export class TRPCWebSocketClosedError extends Error {
constructor(opts: { message: string; cause?: unknown }) {
super(opts.message, {
cause: opts.cause,
});
this.name = 'TRPCWebSocketClosedError';
Object.setPrototypeOf(this, TRPCWebSocketClosedError.prototype);
}
}
/**
* Utility class for managing a timeout that can be started, stopped, and reset.
* Useful for scenarios where the timeout duration is reset dynamically based on events.
*/
export class ResettableTimeout {
private timeout: ReturnType<typeof setTimeout> | undefined;
constructor(
private readonly onTimeout: () => void,
private readonly timeoutMs: number,
) {}
/**
* Resets the current timeout, restarting it with the same duration.
* Does nothing if no timeout is active.
*/
public reset() {
if (!this.timeout) return;
clearTimeout(this.timeout);
this.timeout = setTimeout(this.onTimeout, this.timeoutMs);
}
public start() {
clearTimeout(this.timeout);
this.timeout = setTimeout(this.onTimeout, this.timeoutMs);
}
public stop() {
clearTimeout(this.timeout);
this.timeout = undefined;
}
}
// Ponyfill for Promise.withResolvers https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/withResolvers
export function withResolvers<T>() {
let resolve: (value: T | PromiseLike<T>) => void;
let reject: (reason?: any) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return { promise, resolve: resolve!, reject: reject! };
}
/**
* Resolves a WebSocket URL and optionally appends connection parameters.
*
* If connectionParams are provided, appends 'connectionParams=1' query parameter.
*/
export async function prepareUrl(urlOptions: UrlOptionsWithConnectionParams) {
const url = await resultOf(urlOptions.url);
if (!urlOptions.connectionParams) return url;
// append `?connectionParams=1` when connection params are used
const prefix = url.includes('?') ? '&' : '?';
const connectionParams = `${prefix}connectionParams=1`;
return url + connectionParams;
}
export async function buildConnectionMessage(
connectionParams: CallbackOrValue<TRPCRequestInfo['connectionParams']>,
encoder: Encoder,
) {
const message: TRPCConnectionParamsMessage = {
method: 'connectionParams',
data: await resultOf(connectionParams),
};
return encoder.encode(message);
}

View File

@@ -0,0 +1,440 @@
import type { AnyTRPCRouter } from '@trpc/server';
import type { BehaviorSubject } from '@trpc/server/observable';
import { behaviorSubject, observable } from '@trpc/server/observable';
import type {
CombinedDataTransformer,
TRPCClientIncomingMessage,
TRPCClientIncomingRequest,
TRPCClientOutgoingMessage,
TRPCResponseMessage,
} from '@trpc/server/unstable-core-do-not-import';
import {
run,
sleep,
transformResult,
} from '@trpc/server/unstable-core-do-not-import';
import { TRPCClientError } from '../../../TRPCClientError';
import type { TRPCConnectionState } from '../../internals/subscriptions';
import type { Operation, OperationResultEnvelope } from '../../types';
import type { Encoder } from './encoder';
import { jsonEncoder } from './encoder';
import type { WebSocketClientOptions } from './options';
import { exponentialBackoff, keepAliveDefaults, lazyDefaults } from './options';
import type { TCallbacks } from './requestManager';
import { RequestManager } from './requestManager';
import { ResettableTimeout, TRPCWebSocketClosedError } from './utils';
import { backwardCompatibility, WsConnection } from './wsConnection';
/**
* A WebSocket client for managing TRPC operations, supporting lazy initialization,
* reconnection, keep-alive, and request management.
*/
export class WsClient {
/**
* Observable tracking the current connection state, including errors.
*/
public readonly connectionState: BehaviorSubject<
TRPCConnectionState<TRPCClientError<AnyTRPCRouter>>
>;
private allowReconnect = false;
private requestManager = new RequestManager();
private readonly activeConnection: WsConnection;
private readonly reconnectRetryDelay: (attemptIndex: number) => number;
private inactivityTimeout: ResettableTimeout;
private readonly callbacks: Pick<
WebSocketClientOptions,
'onOpen' | 'onClose' | 'onError'
>;
private readonly lazyMode: boolean;
private readonly encoder: Encoder;
constructor(opts: WebSocketClientOptions) {
this.encoder = opts.experimental_encoder ?? jsonEncoder;
// Initialize callbacks, connection parameters, and options.
this.callbacks = {
onOpen: opts.onOpen,
onClose: opts.onClose,
onError: opts.onError,
};
const lazyOptions = {
...lazyDefaults,
...opts.lazy,
};
// Set up inactivity timeout for lazy connections.
this.inactivityTimeout = new ResettableTimeout(() => {
if (
this.requestManager.hasOutgoingRequests() ||
this.requestManager.hasPendingRequests()
) {
this.inactivityTimeout.reset();
return;
}
this.close().catch(() => null);
}, lazyOptions.closeMs);
// Initialize the WebSocket connection.
this.activeConnection = new WsConnection({
WebSocketPonyfill: opts.WebSocket,
urlOptions: opts,
keepAlive: {
...keepAliveDefaults,
...opts.keepAlive,
},
encoder: this.encoder,
});
this.activeConnection.wsObservable.subscribe({
next: (ws) => {
if (!ws) return;
this.setupWebSocketListeners(ws);
},
});
this.reconnectRetryDelay = opts.retryDelayMs ?? exponentialBackoff;
this.lazyMode = lazyOptions.enabled;
this.connectionState = behaviorSubject<
TRPCConnectionState<TRPCClientError<AnyTRPCRouter>>
>({
type: 'state',
state: lazyOptions.enabled ? 'idle' : 'connecting',
error: null,
});
// Automatically open the connection if lazy mode is disabled.
if (!this.lazyMode) {
this.open().catch(() => null);
}
}
/**
* Opens the WebSocket connection. Handles reconnection attempts and updates
* the connection state accordingly.
*/
private async open() {
this.allowReconnect = true;
if (this.connectionState.get().state === 'idle') {
this.connectionState.next({
type: 'state',
state: 'connecting',
error: null,
});
}
try {
await this.activeConnection.open();
} catch (error) {
this.reconnect(
new TRPCWebSocketClosedError({
message: 'Initialization error',
cause: error,
}),
);
return this.reconnecting;
}
}
/**
* Closes the WebSocket connection and stops managing requests.
* Ensures all outgoing and pending requests are properly finalized.
*/
public async close() {
this.allowReconnect = false;
this.inactivityTimeout.stop();
const requestsToAwait: Promise<void>[] = [];
for (const request of this.requestManager.getRequests()) {
if (request.message.method === 'subscription') {
request.callbacks.complete();
} else if (request.state === 'outgoing') {
request.callbacks.error(
TRPCClientError.from(
new TRPCWebSocketClosedError({
message: 'Closed before connection was established',
}),
),
);
} else {
requestsToAwait.push(request.end);
}
}
await Promise.all(requestsToAwait).catch(() => null);
await this.activeConnection.close().catch(() => null);
this.connectionState.next({
type: 'state',
state: 'idle',
error: null,
});
}
/**
* Method to request the server.
* Handles data transformation, batching of requests, and subscription lifecycle.
*
* @param op - The operation details including id, type, path, input and signal
* @param transformer - Data transformer for serializing requests and deserializing responses
* @param lastEventId - Optional ID of the last received event for subscriptions
*
* @returns An observable that emits operation results and handles cleanup
*/
public request({
op: { id, type, path, input, signal },
transformer,
lastEventId,
}: {
op: Pick<Operation, 'id' | 'type' | 'path' | 'input' | 'signal'>;
transformer: CombinedDataTransformer;
lastEventId?: string;
}) {
return observable<
OperationResultEnvelope<unknown, TRPCClientError<AnyTRPCRouter>>,
TRPCClientError<AnyTRPCRouter>
>((observer) => {
const abort = this.batchSend(
{
id,
method: type,
params: {
input: transformer.input.serialize(input),
path,
lastEventId,
},
},
{
...observer,
next(event) {
const transformed = transformResult(event, transformer.output);
if (!transformed.ok) {
observer.error(TRPCClientError.from(transformed.error));
return;
}
observer.next({
result: transformed.result,
});
},
},
);
return () => {
abort();
if (type === 'subscription' && this.activeConnection.isOpen()) {
this.send({
id,
method: 'subscription.stop',
});
}
signal?.removeEventListener('abort', abort);
};
});
}
public get connection() {
return backwardCompatibility(this.activeConnection);
}
/**
* Manages the reconnection process for the WebSocket using retry logic.
* Ensures that only one reconnection attempt is active at a time by tracking the current
* reconnection state in the `reconnecting` promise.
*/
private reconnecting: Promise<void> | null = null;
private reconnect(closedError: TRPCWebSocketClosedError) {
this.connectionState.next({
type: 'state',
state: 'connecting',
error: TRPCClientError.from(closedError),
});
if (this.reconnecting) return;
const tryReconnect = async (attemptIndex: number) => {
try {
await sleep(this.reconnectRetryDelay(attemptIndex));
if (this.allowReconnect) {
await this.activeConnection.close();
await this.activeConnection.open();
if (this.requestManager.hasPendingRequests()) {
this.send(
this.requestManager
.getPendingRequests()
.map(({ message }) => message),
);
}
}
this.reconnecting = null;
} catch {
await tryReconnect(attemptIndex + 1);
}
};
this.reconnecting = tryReconnect(0);
}
private setupWebSocketListeners(ws: WebSocket) {
const handleCloseOrError = (cause: unknown) => {
const reqs = this.requestManager.getPendingRequests();
for (const { message, callbacks } of reqs) {
if (message.method === 'subscription') continue;
callbacks.error(
TRPCClientError.from(
cause ??
new TRPCWebSocketClosedError({
message: 'WebSocket closed',
cause,
}),
),
);
this.requestManager.delete(message.id);
}
};
ws.addEventListener('open', () => {
run(async () => {
if (this.lazyMode) {
this.inactivityTimeout.start();
}
this.callbacks.onOpen?.();
this.connectionState.next({
type: 'state',
state: 'pending',
error: null,
});
}).catch((error) => {
ws.close(3000);
handleCloseOrError(error);
});
});
ws.addEventListener('message', ({ data }) => {
this.inactivityTimeout.reset();
// Handle PING/PONG as text regardless of encoder
if (['PING', 'PONG'].includes(data)) return;
const incomingMessage = this.encoder.decode(
data,
) as TRPCClientIncomingMessage;
if ('method' in incomingMessage) {
this.handleIncomingRequest(incomingMessage);
return;
}
this.handleResponseMessage(incomingMessage);
});
ws.addEventListener('close', (event) => {
handleCloseOrError(event);
this.callbacks.onClose?.(event);
if (!this.lazyMode || this.requestManager.hasPendingSubscriptions()) {
this.reconnect(
new TRPCWebSocketClosedError({
message: 'WebSocket closed',
cause: event,
}),
);
}
});
ws.addEventListener('error', (event) => {
handleCloseOrError(event);
this.callbacks.onError?.(event);
this.reconnect(
new TRPCWebSocketClosedError({
message: 'WebSocket closed',
cause: event,
}),
);
});
}
private handleResponseMessage(message: TRPCResponseMessage) {
const request = this.requestManager.getPendingRequest(message.id);
if (!request) return;
request.callbacks.next(message);
let completed = true;
if ('result' in message && request.message.method === 'subscription') {
if (message.result.type === 'data') {
request.message.params.lastEventId = message.result.id;
}
if (message.result.type !== 'stopped') {
completed = false;
}
}
if (completed) {
request.callbacks.complete();
this.requestManager.delete(message.id);
}
}
private handleIncomingRequest(message: TRPCClientIncomingRequest) {
if (message.method === 'reconnect') {
this.reconnect(
new TRPCWebSocketClosedError({
message: 'Server requested reconnect',
}),
);
}
}
/**
* Sends a message or batch of messages directly to the server.
*/
private send(
messageOrMessages: TRPCClientOutgoingMessage | TRPCClientOutgoingMessage[],
) {
if (!this.activeConnection.isOpen()) {
throw new Error('Active connection is not open');
}
const messages =
messageOrMessages instanceof Array
? messageOrMessages
: [messageOrMessages];
this.activeConnection.ws.send(
this.encoder.encode(messages.length === 1 ? messages[0] : messages),
);
}
/**
* Groups requests for batch sending.
*
* @returns A function to abort the batched request.
*/
private batchSend(message: TRPCClientOutgoingMessage, callbacks: TCallbacks) {
this.inactivityTimeout.reset();
run(async () => {
if (!this.activeConnection.isOpen()) {
await this.open();
}
await sleep(0);
if (!this.requestManager.hasOutgoingRequests()) return;
this.send(this.requestManager.flush().map(({ message }) => message));
}).catch((err) => {
this.requestManager.delete(message.id);
callbacks.error(TRPCClientError.from(err));
});
return this.requestManager.register(message, callbacks);
}
}

View File

@@ -0,0 +1,255 @@
import { behaviorSubject } from '@trpc/server/observable';
import type { UrlOptionsWithConnectionParams } from '../../internals/urlWithConnectionParams';
import type { Encoder } from './encoder';
import { buildConnectionMessage, prepareUrl, withResolvers } from './utils';
/**
* Opens a WebSocket connection asynchronously and returns a promise
* that resolves when the connection is successfully established.
* The promise rejects if an error occurs during the connection attempt.
*/
function asyncWsOpen(ws: WebSocket) {
const { promise, resolve, reject } = withResolvers<void>();
ws.addEventListener('open', () => {
ws.removeEventListener('error', reject);
resolve();
});
ws.addEventListener('error', reject);
return promise;
}
interface PingPongOptions {
/**
* The interval (in milliseconds) between "PING" messages.
*/
intervalMs: number;
/**
* The timeout (in milliseconds) to wait for a "PONG" response before closing the connection.
*/
pongTimeoutMs: number;
}
/**
* Sets up a periodic ping-pong mechanism to keep the WebSocket connection alive.
*
* - Sends "PING" messages at regular intervals defined by `intervalMs`.
* - If a "PONG" response is not received within the `pongTimeoutMs`, the WebSocket is closed.
* - The ping timer resets upon receiving any message to maintain activity.
* - Automatically starts the ping process when the WebSocket connection is opened.
* - Cleans up timers when the WebSocket is closed.
*
* @param ws - The WebSocket instance to manage.
* @param options - Configuration options for ping-pong intervals and timeouts.
*/
function setupPingInterval(
ws: WebSocket,
{ intervalMs, pongTimeoutMs }: PingPongOptions,
) {
let pingTimeout: ReturnType<typeof setTimeout> | undefined;
let pongTimeout: ReturnType<typeof setTimeout> | undefined;
function start() {
pingTimeout = setTimeout(() => {
ws.send('PING');
pongTimeout = setTimeout(() => {
ws.close();
}, pongTimeoutMs);
}, intervalMs);
}
function reset() {
clearTimeout(pingTimeout);
start();
}
function pong() {
clearTimeout(pongTimeout);
reset();
}
ws.addEventListener('open', start);
ws.addEventListener('message', ({ data }) => {
clearTimeout(pingTimeout);
start();
if (data === 'PONG') {
pong();
}
});
ws.addEventListener('close', () => {
clearTimeout(pingTimeout);
clearTimeout(pongTimeout);
});
}
export interface WebSocketConnectionOptions {
WebSocketPonyfill?: typeof WebSocket;
urlOptions: UrlOptionsWithConnectionParams;
keepAlive: PingPongOptions & {
enabled: boolean;
};
encoder: Encoder;
}
/**
* Manages a WebSocket connection with support for reconnection, keep-alive mechanisms,
* and observable state tracking.
*/
export class WsConnection {
static connectCount = 0;
public id = ++WsConnection.connectCount;
private readonly WebSocketPonyfill: typeof WebSocket;
private readonly urlOptions: UrlOptionsWithConnectionParams;
private readonly keepAliveOpts: WebSocketConnectionOptions['keepAlive'];
private readonly encoder: Encoder;
public readonly wsObservable = behaviorSubject<WebSocket | null>(null);
constructor(opts: WebSocketConnectionOptions) {
this.WebSocketPonyfill = opts.WebSocketPonyfill ?? WebSocket;
if (!this.WebSocketPonyfill) {
throw new Error(
"No WebSocket implementation found - you probably don't want to use this on the server, but if you do you need to pass a `WebSocket`-ponyfill",
);
}
this.urlOptions = opts.urlOptions;
this.keepAliveOpts = opts.keepAlive;
this.encoder = opts.encoder;
}
public get ws() {
return this.wsObservable.get();
}
private set ws(ws) {
this.wsObservable.next(ws);
}
/**
* Checks if the WebSocket connection is open and ready to communicate.
*/
public isOpen(): this is { ws: WebSocket } {
return (
!!this.ws &&
this.ws.readyState === this.WebSocketPonyfill.OPEN &&
!this.openPromise
);
}
/**
* Checks if the WebSocket connection is closed or in the process of closing.
*/
public isClosed(): this is { ws: WebSocket } {
return (
!!this.ws &&
(this.ws.readyState === this.WebSocketPonyfill.CLOSING ||
this.ws.readyState === this.WebSocketPonyfill.CLOSED)
);
}
/**
* Manages the WebSocket opening process, ensuring that only one open operation
* occurs at a time. Tracks the ongoing operation with `openPromise` to avoid
* redundant calls and ensure proper synchronization.
*
* Sets up the keep-alive mechanism and necessary event listeners for the connection.
*
* @returns A promise that resolves once the WebSocket connection is successfully opened.
*/
private openPromise: Promise<void> | null = null;
public async open() {
if (this.openPromise) return this.openPromise;
this.id = ++WsConnection.connectCount;
const wsPromise = prepareUrl(this.urlOptions).then(
(url) => new this.WebSocketPonyfill(url),
);
this.openPromise = wsPromise.then(async (ws) => {
this.ws = ws;
// Set binaryType to handle both text and binary messages consistently
ws.binaryType = 'arraybuffer';
// Setup ping listener
ws.addEventListener('message', function ({ data }) {
if (data === 'PING') {
this.send('PONG');
}
});
if (this.keepAliveOpts.enabled) {
setupPingInterval(ws, this.keepAliveOpts);
}
ws.addEventListener('close', () => {
if (this.ws === ws) {
this.ws = null;
}
});
await asyncWsOpen(ws);
if (this.urlOptions.connectionParams) {
ws.send(
await buildConnectionMessage(
this.urlOptions.connectionParams,
this.encoder,
),
);
}
});
try {
await this.openPromise;
} finally {
this.openPromise = null;
}
}
/**
* Closes the WebSocket connection gracefully.
* Waits for any ongoing open operation to complete before closing.
*/
public async close() {
try {
await this.openPromise;
} finally {
this.ws?.close();
}
}
}
/**
* Provides a backward-compatible representation of the connection state.
*/
export function backwardCompatibility(connection: WsConnection) {
if (connection.isOpen()) {
return {
id: connection.id,
state: 'open',
ws: connection.ws,
} as const;
}
if (connection.isClosed()) {
return {
id: connection.id,
state: 'closed',
ws: connection.ws,
} as const;
}
if (!connection.ws) {
return null;
}
return {
id: connection.id,
state: 'connecting',
ws: connection.ws,
} as const;
}

62
node_modules/@trpc/client/src/links/wsLink/wsLink.ts generated vendored Normal file
View File

@@ -0,0 +1,62 @@
import { observable } from '@trpc/server/observable';
import type {
AnyRouter,
inferClientTypes,
} from '@trpc/server/unstable-core-do-not-import';
import type { TransformerOptions } from '../../unstable-internals';
import { getTransformer } from '../../unstable-internals';
import type { TRPCLink } from '../types';
import type {
Encoder,
TRPCWebSocketClient,
WebSocketClientOptions,
} from './createWsClient';
import { createWSClient, jsonEncoder } from './createWsClient';
export type WebSocketLinkOptions<TRouter extends AnyRouter> = {
client: TRPCWebSocketClient;
} & TransformerOptions<inferClientTypes<TRouter>>;
export function wsLink<TRouter extends AnyRouter>(
opts: WebSocketLinkOptions<TRouter>,
): TRPCLink<TRouter> {
const { client } = opts;
const transformer = getTransformer(opts.transformer);
return () => {
return ({ op }) => {
return observable((observer) => {
const connStateSubscription =
op.type === 'subscription'
? client.connectionState.subscribe({
next(result) {
observer.next({
result,
context: op.context,
});
},
})
: null;
const requestSubscription = client
.request({
op,
transformer,
})
.subscribe(observer);
return () => {
requestSubscription.unsubscribe();
connStateSubscription?.unsubscribe();
};
});
};
};
}
export {
createWSClient,
jsonEncoder,
type Encoder,
type TRPCWebSocketClient,
type WebSocketClientOptions,
};

2
node_modules/@trpc/client/src/unstable-internals.ts generated vendored Normal file
View File

@@ -0,0 +1,2 @@
export * from './internals/transformer';
export * from './links/internals/subscriptions';