import { DebugLogger } from '@affine/debug';
import { Unreachable } from '@affine/env/constant';
import { type OperatorFunction, Subject } from 'rxjs';

const logger = new DebugLogger('effect');

export type Effect<T> = (T | undefined extends T // hack to detect if T is unknown (or otherwise accepts undefined)
  ? () => void
  : (value: T) => void) & {
  // Unsubscribe the effect; all ongoing work in the pipeline is cancelled.
  unsubscribe: () => void;
};

/**
 * Create an effect.
 *
 * `effect( op1, op2, op3, ... )`
 *
 * You can think of an effect as a pipeline. When the effect is called, its argument is sent
 * into the pipeline, which triggers the operators in it.
 *
 * The pipeline is expected to handle every value and error itself: if it emits a value,
 * completes, or lets an error escape, an error is thrown asynchronously.
 *
 * @example
 * ```ts
 * const loadUser = effect(
 *   switchMap((id: number) =>
 *     from(fetchUser(id)).pipe(
 *       mapInto(user$),
 *       catchErrorInto(error$),
 *       onStart(() => isLoading$.next(true)),
 *       onComplete(() => isLoading$.next(false))
 *     )
 *   )
 * );
 *
 * // emit a value to the effect
 * loadUser(1);
 *
 * // unsubscribe the effect; this stops all ongoing processes
 * loadUser.unsubscribe();
 * ```
 */
export function effect<T, A>(op1: OperatorFunction<T, A>): Effect<T>;
export function effect<T, A, B>(
  op1: OperatorFunction<T, A>,
  op2: OperatorFunction<A, B>
): Effect<T>;
export function effect<T, A, B, C>(
  op1: OperatorFunction<T, A>,
  op2: OperatorFunction<A, B>,
  op3: OperatorFunction<B, C>
): Effect<T>;
export function effect<T, A, B, C, D>(
  op1: OperatorFunction<T, A>,
  op2: OperatorFunction<A, B>,
  op3: OperatorFunction<B, C>,
  op4: OperatorFunction<C, D>
): Effect<T>;
export function effect<T, A, B, C, D, E>(
  op1: OperatorFunction<T, A>,
  op2: OperatorFunction<A, B>,
  op3: OperatorFunction<B, C>,
  op4: OperatorFunction<C, D>,
  op5: OperatorFunction<D, E>
): Effect<T>;
export function effect<T, A, B, C, D, E, F>(
  op1: OperatorFunction<T, A>,
  op2: OperatorFunction<A, B>,
  op3: OperatorFunction<B, C>,
  op4: OperatorFunction<C, D>,
  op5: OperatorFunction<D, E>,
  op6: OperatorFunction<E, F>
): Effect<T>;
export function effect(...args: any[]) {
  const subject$ = new Subject<any>();

  // In debug mode, record where the effect was created. In V8-style stack traces,
  // line 0 is the error header, line 1 is this function, and line 2 is the call site.
  const effectLocation = environment.isDebug
    ? `(${new Error().stack?.split('\n')[2]?.trim()})`
    : '';

  class EffectError extends Unreachable {
    constructor(message: string, value?: any) {
      logger.error(`effect ${effectLocation} ${message}`, value);
      super(
        `effect ${effectLocation} ${message}` +
          ` ${value ? (value instanceof Error ? (value.stack ?? value.message) : value + '') : ''}`
      );
    }
  }

  // The pipeline must consume everything it receives. Any value, completion, or error
  // that reaches this subscriber is a bug, so it is rethrown outside the RxJS call stack.
  // eslint-disable-next-line prefer-spread
  const subscription = subject$.pipe.apply(subject$, args as any).subscribe({
    next(value) {
      const error = new EffectError('should not emit value', value);
      setImmediate(() => {
        throw error;
      });
    },
    complete() {
      const error = new EffectError('effect unexpected complete');
      setImmediate(() => {
        throw error;
      });
    },
    error(error) {
      const effectError = new EffectError('effect uncaught error', error);
      setImmediate(() => {
        throw effectError;
      });
    },
  });

  // Calling the effect pushes the value into the pipeline.
  const fn = (value: unknown) => {
    subject$.next(value);
  };

  fn.unsubscribe = () => subscription.unsubscribe();

  return fn as never;
}
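
/*
The message that `EffectError` assembles may be easier to follow in isolation. The sketch
below is illustrative only: `formatEffectMessage` is a hypothetical helper, not part of this
module, that mirrors the string passed to `super(...)` above, with `String(value)` standing
in for `value + ''`.

```typescript
// Hypothetical helper mirroring how EffectError assembles its message.
function formatEffectMessage(
  location: string,
  message: string,
  value?: unknown
): string {
  // Errors contribute their stack (or their message when no stack exists);
  // other truthy values are stringified; falsy values contribute nothing.
  const detail = value
    ? value instanceof Error
      ? (value.stack ?? value.message)
      : String(value)
    : '';
  return `effect ${location} ${message}` + ` ${detail}`;
}

// A pipeline that leaked the value 42, created at an assumed location.
const withValue = formatEffectMessage(
  '(at demo.ts:1:1)',
  'should not emit value',
  42
);

// Outside debug mode the location is an empty string and no value is attached.
const withoutValue = formatEffectMessage('', 'effect unexpected complete');

console.log(withValue);
console.log(withoutValue);
```

Because the check is on truthiness, falsy values such as `0` or `''` are left out of the
message even when the pipeline emitted them.
*/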