diff --git a/.changeset/create-effect.md b/.changeset/create-effect.md new file mode 100644 index 000000000..13949bc56 --- /dev/null +++ b/.changeset/create-effect.md @@ -0,0 +1,10 @@ +--- +'@tanstack/db': minor +'@tanstack/react-db': minor +--- + +Add `createEffect` API for reactive delta-driven effects and `useLiveQueryEffect` React hook. + +`createEffect` attaches handlers to a live query's delta stream — firing callbacks when rows enter, exit, or update within a query result — without materialising the full result set. Supports per-row and batch handlers, `skipInitial`, `orderBy` + `limit` (top-K window), joins, lazy loading, transaction coalescing, async disposal with `AbortSignal`, and `onSourceError` / `onError` callbacks. + +`useLiveQueryEffect` is the React hook wrapper that manages the effect lifecycle (create on mount, dispose on unmount, recreate on dependency change). diff --git a/docs/guides/live-queries.md b/docs/guides/live-queries.md index b69689e16..c532bfd41 100644 --- a/docs/guides/live-queries.md +++ b/docs/guides/live-queries.md @@ -42,6 +42,7 @@ The result types are automatically inferred from your query structure, providing - [Distinct](#distinct) - [Order By, Limit, and Offset](#order-by-limit-and-offset) - [Composable Queries](#composable-queries) +- [Reactive Effects (createEffect)](#reactive-effects-createeffect) - [Expression Functions Reference](#expression-functions-reference) - [Functional Variants](#functional-variants) @@ -1687,6 +1688,370 @@ const users = createLiveQueryCollection((q) => This approach makes your query logic more modular, testable, and reusable across your application. +## Reactive Effects (createEffect) + +While live query collections materialise query results into a collection you can subscribe to and iterate over, **reactive effects** let you respond to query result *changes* without materialising the full result set. Effects fire callbacks when rows enter, exit, or update within a query result. + +This is useful for triggering side effects — sending notifications, syncing to external systems, generating AI responses, updating counters — whenever your data changes. + +### When to Use Effects vs Live Query Collections + +| Use case | Approach | +|----------|----------| +| Display query results in UI | Live query collection + `useLiveQuery` | +| React to changes (side effects) | `createEffect` / `useLiveQueryEffect` | +| Track new items entering a result set | `createEffect` with `on: 'enter'` | +| Monitor items leaving a result set | `createEffect` with `on: 'exit'` | +| Respond to updates within a result set | `createEffect` with `on: 'update'` | + +### Basic Usage + +```ts +import { createEffect, eq } from '@tanstack/db' + +const effect = createEffect({ + query: (q) => + q + .from({ msg: messagesCollection }) + .where(({ msg }) => eq(msg.role, 'user')), + on: 'enter', + handler: async (event) => { + console.log('New user message:', event.value) + await generateResponse(event.value) + }, +}) + +// Later: stop the effect +await effect.dispose() +``` + +### Configuration + +`createEffect` accepts an `EffectConfig` object: + +```ts +const effect = createEffect({ + id: 'my-effect', // Optional: auto-generated if not provided + query: (q) => q.from(...), // Query to watch + on: 'delta', // Which delta types to handle + handler: (event, ctx) => { ... }, // Per-row handler + batchHandler: (events, ctx) => { ... }, // Per-batch handler + onError: (error, event) => { ... }, // Handler error callback + onSourceError: (error) => { ... 
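+    // Fires at most once; the effect disposes itself after this callback returns.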
}, // Source collection error callback + skipInitial: false, // Skip deltas during initial load +}) +``` + +| Option | Type | Description | +|--------|------|-------------| +| `id` | `string` (optional) | Identifier for debugging/tracing. Auto-generated as `live-query-effect-{n}` if not provided. | +| `query` | `QueryBuilder` or function | The query to watch. Accepts the same builder function or `QueryBuilder` instance as live query collections. | +| `on` | `DeltaType \| DeltaType[] \| 'delta'` | Which delta types to fire handlers for. Use `'delta'` for all types, or specify one or more of `'enter'`, `'exit'`, `'update'`. | +| `handler` | `(event, ctx) => void \| Promise` (optional) | Called once for each matching delta event. | +| `batchHandler` | `(events, ctx) => void \| Promise` (optional) | Called once per batch with all matching delta events. | +| `onError` | `(error, event) => void` (optional) | Called when `handler` or `batchHandler` throws or rejects. | +| `onSourceError` | `(error) => void` (optional) | Called when a source collection enters an error or cleaned-up state. The effect is automatically disposed after this fires. If not provided, the error is logged to `console.error`. | +| `skipInitial` | `boolean` (optional) | When `true`, deltas from the initial data load are suppressed. Only subsequent changes fire handlers. Defaults to `false`. | + +### Delta Events + +Each delta event describes a single row change within the query result: + +```ts +interface DeltaEvent { + type: 'enter' | 'exit' | 'update' + key: TKey + value: TRow + previousValue?: TRow // Only present for 'update' events +} +``` + +| Event type | Meaning | `value` | `previousValue` | +|------------|---------|---------|------------------| +| `enter` | Row entered the query result | The new row | — | +| `exit` | Row left the query result | The exiting row | — | +| `update` | Row changed but stayed in the result | The new row | The row before the change | + +### The `on` Parameter + +Control which delta types your handlers receive: + +```ts +// Only new rows entering the result +createEffect({ on: 'enter', ... }) + +// Only rows leaving the result +createEffect({ on: 'exit', ... }) + +// Only rows that changed but stayed in the result +createEffect({ on: 'update', ... }) + +// Multiple specific types +createEffect({ on: ['enter', 'exit'], ... }) + +// All delta types +createEffect({ on: 'delta', ... }) +``` + +### Per-Row vs Batch Handlers + +You can provide a `handler` (called once per event), a `batchHandler` (called once per batch with all events), or both: + +```ts +createEffect({ + query: (q) => q.from({ user: usersCollection }), + on: 'delta', + + // Called once for each delta event + handler: (event, ctx) => { + console.log(`${event.type}: ${event.key}`) + }, + + // Called once per batch with all events + batchHandler: (events, ctx) => { + console.log(`Batch of ${events.length} events`) + }, +}) +``` + +Both handlers receive an `EffectContext`: + +```ts +interface EffectContext { + effectId: string // The effect's ID + signal: AbortSignal // Aborted when effect.dispose() is called +} +``` + +The `signal` is useful for cancelling in-flight async work when the effect is disposed: + +```ts +createEffect({ + query: (q) => q.from({ task: tasksCollection }), + on: 'enter', + handler: async (event, ctx) => { + const result = await fetch('/api/process', { + method: 'POST', + body: JSON.stringify(event.value), + signal: ctx.signal, // Cancelled on dispose + }) + // ... 
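+    // If dispose() aborts mid-request, fetch rejects with an AbortError,
+    // which is routed to onError like any other handler error. A guard
+    // for any follow-up work after the response resolves (sketch):
+    if (ctx.signal.aborted) return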
+ }, +}) +``` + +### Skipping Initial Data + +By default, effects process all data including the initial load. Set `skipInitial: true` to only respond to changes that happen after the initial sync: + +```ts +// Only react to NEW messages, not existing ones +const effect = createEffect({ + query: (q) => + q.from({ msg: messagesCollection }) + .where(({ msg }) => eq(msg.role, 'user')), + on: 'enter', + skipInitial: true, + handler: async (event) => { + await sendNotification(event.value) + }, +}) +``` + +### Error Handling + +Errors thrown by `handler` or `batchHandler` (sync or async) are caught and routed to `onError`. If no `onError` is provided, they are logged to `console.error`: + +```ts +createEffect({ + query: (q) => q.from({ order: ordersCollection }), + on: 'enter', + handler: async (event) => { + await processOrder(event.value) + }, + onError: (error, event) => { + console.error(`Failed to process order ${event.key}:`, error) + reportToErrorTracker(error) + }, +}) +``` + +If a source collection enters an error or cleaned-up state, the effect automatically disposes itself. Use `onSourceError` to handle this: + +```ts +createEffect({ + query: (q) => q.from({ data: dataCollection }), + on: 'delta', + handler: (event) => { ... }, + onSourceError: (error) => { + console.warn('Data source failed, effect disposed:', error.message) + }, +}) +``` + +### Disposal + +`createEffect` returns an `Effect` handle with a `dispose()` method: + +```ts +const effect = createEffect({ ... }) + +// Check if disposed +console.log(effect.disposed) // false + +// Dispose: unsubscribes from sources, aborts the signal, +// and waits for in-flight async handlers to settle +await effect.dispose() + +console.log(effect.disposed) // true +``` + +`dispose()` is idempotent — calling it multiple times is safe. It returns a promise that resolves when all in-flight async handlers have settled (via `Promise.allSettled`). + +### Query Features + +Effects support the full query system — everything you can do with live query collections works with effects: + +```ts +// Joins +createEffect({ + query: (q) => + q + .from({ user: usersCollection }) + .join({ post: postsCollection }, ({ user, post }) => + eq(user.id, post.userId) + ) + .select(({ user, post }) => ({ + userName: user.name, + postTitle: post.title, + })), + on: 'enter', + handler: (event) => { + console.log(`${event.value.userName} published "${event.value.postTitle}"`) + }, +}) + +// Filters +createEffect({ + query: (q) => + q + .from({ user: usersCollection }) + .where(({ user }) => eq(user.role, 'admin')), + on: 'enter', + handler: (event) => { + console.log(`New admin: ${event.value.name}`) + }, +}) + +// OrderBy + Limit (top-K window) +createEffect({ + query: (q) => + q + .from({ score: scoresCollection }) + .orderBy(({ score }) => score.points, 'desc') + .limit(10), + on: 'delta', + handler: (event) => { + // Fires when items enter or exit the top 10 + console.log(`${event.type}: ${event.value.name} (${event.value.points} pts)`) + }, +}) +``` + +When using `orderBy` with `limit`, effects track a top-K window. You receive `enter` events when items enter the window and `exit` events when they're displaced. + +### Transaction Coalescing + +When multiple changes occur within a single transaction, effects coalesce them into a single batch. 
This means your handlers are called once with all the changes from that transaction, not once per individual write: + +```ts +createEffect({ + query: (q) => q.from({ item: itemsCollection }), + on: 'enter', + batchHandler: (events) => { + // If 3 items are inserted in one transaction, + // this fires once with all 3 events + console.log(`${events.length} items added`) + }, +}) +``` + +### Using with React + +The `useLiveQueryEffect` hook manages the effect lifecycle automatically — creating on mount, disposing on unmount, and recreating when dependencies change: + +```tsx +import { useLiveQueryEffect } from '@tanstack/react-db' +import { eq } from '@tanstack/db' + +function ChatComponent({ channelId }: { channelId: string }) { + useLiveQueryEffect( + { + query: (q) => + q + .from({ msg: messagesCollection }) + .where(({ msg }) => eq(msg.channelId, channelId)), + on: 'enter', + skipInitial: true, + handler: async (event) => { + await playNotificationSound() + }, + }, + [channelId] // Recreate effect when channelId changes + ) + + return
<div>...</div>
+} +``` + +The second argument is a dependency array (like `useEffect`). When dependencies change, the old effect is disposed and a new one is created with the updated config. + +### Complete Example + +Here's a more complete example showing an effect that monitors order status changes and sends notifications: + +```ts +import { createEffect, eq } from '@tanstack/db' + +const orderEffect = createEffect({ + id: 'order-status-monitor', + query: (q) => + q + .from({ order: ordersCollection }) + .join({ customer: customersCollection }, ({ order, customer }) => + eq(order.customerId, customer.id) + ) + .where(({ order }) => eq(order.status, 'shipped')) + .select(({ order, customer }) => ({ + orderId: order.id, + customerEmail: customer.email, + trackingNumber: order.trackingNumber, + })), + on: 'enter', + skipInitial: true, + + handler: async (event, ctx) => { + await sendShipmentEmail({ + to: event.value.customerEmail, + orderId: event.value.orderId, + tracking: event.value.trackingNumber, + signal: ctx.signal, + }) + }, + + onError: (error, event) => { + console.error(`Failed to notify for order ${event.key}:`, error) + }, + + onSourceError: (error) => { + alertOpsTeam('Order monitoring effect failed', error) + }, +}) + +// On application shutdown +await orderEffect.dispose() +``` + ## Expression Functions Reference The query system provides a comprehensive set of functions for filtering, transforming, and aggregating data. diff --git a/packages/db/src/index.ts b/packages/db/src/index.ts index ccf7cbb6e..963e51abe 100644 --- a/packages/db/src/index.ts +++ b/packages/db/src/index.ts @@ -26,6 +26,17 @@ export { type IndexOptions } from './indexes/index-options.js' // Expression helpers export * from './query/expression-helpers.js' +// Reactive effects +export { + createEffect, + type DeltaEvent, + type DeltaType, + type EffectConfig, + type EffectContext, + type Effect, + type EffectQueryInput, +} from './query/effect.js' + // Re-export some stuff explicitly to ensure the type & value is exported export type { Collection } from './collection/index.js' export { IR } diff --git a/packages/db/src/query/effect.ts b/packages/db/src/query/effect.ts new file mode 100644 index 000000000..331fd4359 --- /dev/null +++ b/packages/db/src/query/effect.ts @@ -0,0 +1,1091 @@ +import { D2, output } from '@tanstack/db-ivm' +import { transactionScopedScheduler } from '../scheduler.js' +import { getActiveTransaction } from '../transactions.js' +import { compileQuery } from './compiler/index.js' +import { + normalizeExpressionPaths, + normalizeOrderByPaths, +} from './compiler/expressions.js' +import { getCollectionBuilder } from './live/collection-registry.js' +import { + buildQueryFromConfig, + computeOrderedLoadCursor, + computeSubscriptionOrderByHints, + extractCollectionAliases, + extractCollectionsFromQuery, + filterDuplicateInserts, + sendChangesToInput, + splitUpdates, + trackBiggestSentValue, +} from './live/utils.js' +import type { RootStreamBuilder } from '@tanstack/db-ivm' +import type { Collection } from '../collection/index.js' +import type { CollectionSubscription } from '../collection/subscription.js' +import type { InitialQueryBuilder, QueryBuilder } from './builder/index.js' +import type { Context } from './builder/types.js' +import type { BasicExpression, QueryIR } from './ir.js' +import type { OrderByOptimizationInfo } from './compiler/order-by.js' +import type { ChangeMessage, KeyedStream, ResultStream } from '../types.js' + +// 
--------------------------------------------------------------------------- +// Public Types +// --------------------------------------------------------------------------- + +/** Event types for query result deltas */ +export type DeltaType = 'enter' | 'exit' | 'update' + +/** Delta event emitted when a row enters, exits, or updates within a query result */ +export interface DeltaEvent< + TRow extends object = Record, + TKey extends string | number = string | number, +> { + type: DeltaType + key: TKey + /** Current value (new value for enter/update, exiting value for exit) */ + value: TRow + /** Previous value before the batch (only present for update events) */ + previousValue?: TRow + metadata?: Record +} + +/** Context passed to effect handlers */ +export interface EffectContext { + /** ID of this effect (auto-generated if not provided) */ + effectId: string + /** Aborted when effect.dispose() is called */ + signal: AbortSignal +} + +/** Query input - can be a builder function or a prebuilt query */ +export type EffectQueryInput = + | ((q: InitialQueryBuilder) => QueryBuilder) + | QueryBuilder + +/** Effect configuration */ +export interface EffectConfig< + TRow extends object = Record, + TKey extends string | number = string | number, +> { + /** Optional ID for debugging/tracing */ + id?: string + + /** Query to watch for deltas */ + query: EffectQueryInput + + /** Which delta types to handle */ + on: DeltaType | Array | 'delta' + + /** Per-row handler (called once per matching delta event) */ + handler?: ( + event: DeltaEvent, + ctx: EffectContext, + ) => void | Promise + + /** Per-batch handler (called once per graph run with all matching events) */ + batchHandler?: ( + events: Array>, + ctx: EffectContext, + ) => void | Promise + + /** Error handler for exceptions thrown by handler/batchHandler */ + onError?: (error: Error, event: DeltaEvent) => void + + /** + * Called when a source collection enters an error or cleaned-up state. + * The effect is automatically disposed after this callback fires. + * If not provided, the error is logged to console.error. + */ + onSourceError?: (error: Error) => void + + /** + * Skip deltas during initial collection load. + * Defaults to false (process all deltas including initial sync). + * Set to true for effects that should only process new changes. + */ + skipInitial?: boolean +} + +/** Handle returned by createEffect */ +export interface Effect { + /** Dispose the effect. Returns a promise that resolves when in-flight handlers complete. 
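+   Idempotent: calling it again after disposal resolves immediately.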
*/ + dispose: () => Promise + /** Whether this effect has been disposed */ + readonly disposed: boolean +} + +// --------------------------------------------------------------------------- +// Internal Types +// --------------------------------------------------------------------------- + +/** Accumulated changes for a single key within a graph run */ +interface EffectChanges { + deletes: number + inserts: number + /** Value from the latest insert (the newest/current value) */ + insertValue?: T + /** Value from the first delete (the oldest/previous value before the batch) */ + deleteValue?: T +} + +// --------------------------------------------------------------------------- +// Global Counter +// --------------------------------------------------------------------------- + +let effectCounter = 0 + +// --------------------------------------------------------------------------- +// createEffect +// --------------------------------------------------------------------------- + +/** + * Creates a reactive effect that fires handlers when rows enter, exit, or + * update within a query result. Effects process deltas only — they do not + * maintain or require the full materialised query result. + * + * @example + * ```typescript + * const effect = createEffect({ + * query: (q) => q.from({ msg: messagesCollection }) + * .where(({ msg }) => eq(msg.role, 'user')), + * on: 'enter', + * handler: async (event) => { + * await generateResponse(event.value) + * }, + * }) + * + * // Later: stop the effect + * await effect.dispose() + * ``` + */ +export function createEffect< + TRow extends object = Record, + TKey extends string | number = string | number, +>(config: EffectConfig): Effect { + const id = config.id ?? `live-query-effect-${++effectCounter}` + + // Normalise the `on` parameter into a set of delta types + const deltaTypes = normaliseDeltaTypes(config.on) + + // AbortController for signalling disposal to handlers + const abortController = new AbortController() + + const ctx: EffectContext = { + effectId: id, + signal: abortController.signal, + } + + // Track in-flight async handler promises so dispose() can await them + const inFlightHandlers = new Set>() + let disposed = false + + // Callback invoked by the pipeline runner with each batch of delta events + const onBatchProcessed = (events: Array>) => { + if (disposed) return + + // Filter to only the requested delta types + const filtered = events.filter((e) => deltaTypes.has(e.type)) + if (filtered.length === 0) return + + // Batch handler + if (config.batchHandler) { + try { + const result = config.batchHandler(filtered, ctx) + if (result instanceof Promise) { + const tracked = result.catch((error) => { + reportError(error, filtered[0]!, config.onError) + }) + trackPromise(tracked, inFlightHandlers) + } + } catch (error) { + // For batch handler errors, report with first event as context + reportError(error, filtered[0]!, config.onError) + } + } + + // Per-row handler + if (config.handler) { + for (const event of filtered) { + if (abortController.signal.aborted) break + try { + const result = config.handler(event, ctx) + if (result instanceof Promise) { + const tracked = result.catch((error) => { + reportError(error, event, config.onError) + }) + trackPromise(tracked, inFlightHandlers) + } + } catch (error) { + reportError(error, event, config.onError) + } + } + } + } + + // The dispose function is referenced by both the returned Effect object + // and the onSourceError callback, so we define it first. 
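+  // Disposal sequence: abort the signal first so in-flight handlers can
+  // bail out, then tear down the pipeline, then await handler settlement.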
+ const dispose = async () => { + if (disposed) return + disposed = true + + // Abort signal for in-flight handlers + abortController.abort() + + // Tear down the pipeline (unsubscribe from sources, etc.) + runner.dispose() + + // Wait for any in-flight async handlers to settle + if (inFlightHandlers.size > 0) { + await Promise.allSettled([...inFlightHandlers]) + } + } + + // Create and start the pipeline + const runner = new EffectPipelineRunner({ + query: config.query, + skipInitial: config.skipInitial ?? false, + onBatchProcessed, + onSourceError: (error: Error) => { + if (disposed) return + + if (config.onSourceError) { + config.onSourceError(error) + } else { + console.error(`[Effect '${id}'] ${error.message}. Disposing effect.`) + } + + // Auto-dispose — the effect can no longer function + dispose() + }, + }) + runner.start() + + return { + dispose, + get disposed() { + return disposed + }, + } +} + +// --------------------------------------------------------------------------- +// EffectPipelineRunner +// --------------------------------------------------------------------------- + +interface EffectPipelineRunnerConfig< + TRow extends object, + TKey extends string | number, +> { + query: EffectQueryInput + skipInitial: boolean + onBatchProcessed: (events: Array>) => void + /** Called when a source collection enters error or cleaned-up state */ + onSourceError: (error: Error) => void +} + +/** + * Internal class that manages a D2 pipeline for effect delta processing. + * + * Sets up the IVM graph, subscribes to source collections, runs the graph + * when changes arrive, and classifies output multiplicities into DeltaEvents. + * + * Unlike CollectionConfigBuilder, this does NOT: + * - Create or write to a collection (no materialisation) + * - Manage ordering, windowing, or lazy loading + */ +class EffectPipelineRunner { + private readonly query: QueryIR + private readonly collections: Record> + private readonly collectionByAlias: Record> + + private graph: D2 | undefined + private inputs: Record> | undefined + private pipeline: ResultStream | undefined + private sourceWhereClauses: Map> | undefined + private compiledAliasToCollectionId: Record = {} + + // Mutable objects passed to compileQuery by reference. + // The join compiler captures these references and reads them later when + // the graph runs, so they must be populated before the first graph run. 
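+  // `readonly` here means the bindings never change; the objects themselves
+  // are mutated in place by the compiler and by start().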
+ private readonly subscriptions: Record = {} + private readonly lazySourcesCallbacks: Record = {} + private readonly lazySources = new Set() + // OrderBy optimization info populated by the compiler when limit is present + private readonly optimizableOrderByCollections: Record< + string, + OrderByOptimizationInfo + > = {} + + // Ordered subscription state for cursor-based loading + private readonly biggestSentValue = new Map() + private readonly lastLoadRequestKey = new Map() + private pendingOrderedLoadPromise: Promise | undefined + + // Subscription management + private readonly unsubscribeCallbacks = new Set<() => void>() + // Duplicate insert prevention per alias + private readonly sentToD2KeysByAlias = new Map>() + + // Output accumulator + private pendingChanges: Map> = new Map() + + // skipInitial state + private readonly skipInitial: boolean + private initialLoadComplete = false + + // Scheduler integration + private subscribedToAllCollections = false + private readonly builderDependencies = new Set() + private readonly aliasDependencies: Record> = {} + + // Reentrance guard + private isGraphRunning = false + private disposed = false + // When dispose() is called mid-graph-run, defer heavy cleanup until the run completes + private deferredCleanup = false + + private readonly onBatchProcessed: ( + events: Array>, + ) => void + private readonly onSourceError: (error: Error) => void + + constructor(config: EffectPipelineRunnerConfig) { + this.skipInitial = config.skipInitial + this.onBatchProcessed = config.onBatchProcessed + this.onSourceError = config.onSourceError + + // Parse query + this.query = buildQueryFromConfig({ query: config.query }) + + // Extract source collections + this.collections = extractCollectionsFromQuery(this.query) + const aliasesById = extractCollectionAliases(this.query) + + // Build alias → collection map + this.collectionByAlias = {} + for (const [collectionId, aliases] of aliasesById.entries()) { + const collection = this.collections[collectionId] + if (!collection) continue + for (const alias of aliases) { + this.collectionByAlias[alias] = collection + } + } + + // Compile the pipeline + this.compilePipeline() + } + + /** Compile the D2 graph and query pipeline */ + private compilePipeline(): void { + this.graph = new D2() + this.inputs = Object.fromEntries( + Object.keys(this.collectionByAlias).map((alias) => [ + alias, + this.graph!.newInput(), + ]), + ) + + const compilation = compileQuery( + this.query, + this.inputs as Record, + this.collections, + // These mutable objects are captured by reference. The join compiler + // reads them later when the graph runs, so they must be populated + // (in start()) before the first graph run. 
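+      // Positional slots below: subscriptions, lazy-source callbacks, lazy
+      // sources, orderBy optimization info, and a no-op window setter.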
+ this.subscriptions, + this.lazySourcesCallbacks, + this.lazySources, + this.optimizableOrderByCollections, + () => {}, // setWindowFn (no-op — effects don't paginate) + ) + + this.pipeline = compilation.pipeline + this.sourceWhereClauses = compilation.sourceWhereClauses + this.compiledAliasToCollectionId = compilation.aliasToCollectionId + + // Attach the output operator that accumulates changes + this.pipeline.pipe( + output((data) => { + const messages = data.getInner() + messages.reduce(accumulateEffectChanges, this.pendingChanges) + }), + ) + + this.graph.finalize() + } + + /** Subscribe to source collections and start processing */ + start(): void { + // Use compiled aliases as the source of truth + const compiledAliases = Object.entries(this.compiledAliasToCollectionId) + if (compiledAliases.length === 0) { + // Nothing to subscribe to + return + } + + // When not skipping initial, we always process events immediately + if (!this.skipInitial) { + this.initialLoadComplete = true + } + + // We need to defer initial data processing until ALL subscriptions are + // created, because join pipelines look up subscriptions by alias during + // the graph run. If we run the graph while some aliases are still missing, + // the join tap operator will throw. + // + // Strategy: subscribe to each collection but buffer incoming changes. + // After all subscriptions are in place, flush the buffers and switch to + // direct processing mode. + + const pendingBuffers = new Map< + string, + Array>> + >() + + for (const [alias, collectionId] of compiledAliases) { + const collection = + this.collectionByAlias[alias] ?? this.collections[collectionId]! + + // Initialise per-alias duplicate tracking + this.sentToD2KeysByAlias.set(alias, new Set()) + + // Discover dependencies: if source collection is itself a live query + // collection, its builder must run first during transaction flushes. + const dependencyBuilder = getCollectionBuilder(collection) + if (dependencyBuilder) { + this.aliasDependencies[alias] = [dependencyBuilder] + this.builderDependencies.add(dependencyBuilder) + } else { + this.aliasDependencies[alias] = [] + } + + // Get where clause for this alias (for predicate push-down) + const whereClause = this.sourceWhereClauses?.get(alias) + const whereExpression = whereClause + ? normalizeExpressionPaths(whereClause, alias) + : undefined + + // Initialise buffer for this alias + const buffer: Array>> = [] + pendingBuffers.set(alias, buffer) + + // Lazy aliases (marked by the join compiler) should NOT load initial state + // eagerly — the join tap operator will load exactly the rows it needs on demand. + // For on-demand collections, eager loading would trigger a full server fetch + // for data that should be lazily loaded based on join keys. + const isLazy = this.lazySources.has(alias) + + // Check if this alias has orderBy optimization (cursor-based loading) + const orderByInfo = this.getOrderByInfoForAlias(alias) + + // Build the change callback — for ordered aliases, split updates into + // delete+insert and track the biggest sent value for cursor positioning. + const changeCallback = orderByInfo + ? 
(changes: Array>) => { + if (pendingBuffers.has(alias)) { + pendingBuffers.get(alias)!.push(changes) + } else { + this.trackSentValues(alias, changes, orderByInfo.comparator) + const split = [...splitUpdates(changes)] + this.handleSourceChanges(alias, split) + } + } + : (changes: Array>) => { + if (pendingBuffers.has(alias)) { + pendingBuffers.get(alias)!.push(changes) + } else { + this.handleSourceChanges(alias, changes) + } + } + + // Determine subscription options based on ordered vs unordered path + const subscriptionOptions = this.buildSubscriptionOptions( + alias, + isLazy, + orderByInfo, + whereExpression, + ) + + // Subscribe to source changes + const subscription = collection.subscribeChanges( + changeCallback, + subscriptionOptions, + ) + + // Store subscription immediately so the join compiler can find it + this.subscriptions[alias] = subscription + + // For ordered aliases with an index, trigger the initial limited snapshot. + // This loads only the top N rows rather than the entire collection. + if (orderByInfo) { + this.requestInitialOrderedSnapshot(alias, orderByInfo, subscription) + } + + this.unsubscribeCallbacks.add(() => { + subscription.unsubscribe() + delete this.subscriptions[alias] + }) + + // Listen for status changes on source collections + const statusUnsubscribe = collection.on(`status:change`, (event) => { + if (this.disposed) return + + const { status } = event + + // Source entered error state — effect can no longer function + if (status === `error`) { + this.onSourceError( + new Error( + `Source collection '${collectionId}' entered error state`, + ), + ) + return + } + + // Source was manually cleaned up — effect can no longer function + if (status === `cleaned-up`) { + this.onSourceError( + new Error( + `Source collection '${collectionId}' was cleaned up while effect depends on it`, + ), + ) + return + } + + // Track source readiness for skipInitial + if ( + this.skipInitial && + !this.initialLoadComplete && + this.checkAllCollectionsReady() + ) { + this.initialLoadComplete = true + } + }) + this.unsubscribeCallbacks.add(statusUnsubscribe) + } + + // Mark as subscribed so the graph can start running + this.subscribedToAllCollections = true + + // All subscriptions are now in place. Flush buffered changes by sending + // data to D2 inputs first (without running the graph), then run the graph + // once. This prevents intermediate join states from producing duplicates. + // + // We remove each alias from pendingBuffers *before* draining, which + // switches that alias to direct-processing mode. Any new callbacks that + // fire during the drain (e.g. from requestLimitedSnapshot) will go + // through handleSourceChanges directly instead of being lost. + for (const [alias] of pendingBuffers) { + const buffer = pendingBuffers.get(alias)! + pendingBuffers.delete(alias) + + const orderByInfo = this.getOrderByInfoForAlias(alias) + + // Drain all buffered batches. Since we deleted the alias from + // pendingBuffers above, any new changes arriving during drain go + // through handleSourceChanges directly (not back into this buffer). + for (const changes of buffer) { + if (orderByInfo) { + this.trackSentValues(alias, changes, orderByInfo.comparator) + const split = [...splitUpdates(changes)] + this.sendChangesToD2(alias, split) + } else { + this.sendChangesToD2(alias, changes) + } + } + } + + // Initial graph run to process any synchronously-available data. + // For skipInitial, this run's output is discarded (initialLoadComplete is still false). 
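+    // (flushPendingChanges drops the accumulated events in that case.)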
+ this.runGraph() + + // After the initial graph run, if all sources are ready, + // mark initial load as complete so future events are processed. + if (this.skipInitial && !this.initialLoadComplete) { + if (this.checkAllCollectionsReady()) { + this.initialLoadComplete = true + } + } + } + + /** Handle incoming changes from a source collection */ + private handleSourceChanges( + alias: string, + changes: Array>, + ): void { + this.sendChangesToD2(alias, changes) + this.scheduleGraphRun(alias) + } + + /** + * Schedule a graph run via the transaction-scoped scheduler. + * + * When called within a transaction, the run is deferred until the + * transaction flushes, coalescing multiple changes into a single graph + * execution. Without a transaction, the graph runs immediately. + * + * Dependencies are discovered from source collections that are themselves + * live query collections, ensuring parent queries run before effects. + */ + private scheduleGraphRun(alias?: string): void { + const contextId = getActiveTransaction()?.id + + // Collect dependencies for this schedule call + const deps = new Set(this.builderDependencies) + if (alias) { + const aliasDeps = this.aliasDependencies[alias] + if (aliasDeps) { + for (const dep of aliasDeps) { + deps.add(dep) + } + } + } + + // Ensure dependent builders are scheduled in this context so that + // dependency edges always point to a real job. + if (contextId) { + for (const dep of deps) { + if ( + typeof dep === `object` && + dep !== null && + `scheduleGraphRun` in dep && + typeof (dep as any).scheduleGraphRun === `function` + ) { + ;(dep as any).scheduleGraphRun(undefined, { contextId }) + } + } + } + + transactionScopedScheduler.schedule({ + contextId, + jobId: this, + dependencies: deps, + run: () => this.executeScheduledGraphRun(), + }) + } + + /** + * Called by the scheduler when dependencies are satisfied. + * Checks that the effect is still active before running. + */ + private executeScheduledGraphRun(): void { + if (this.disposed || !this.subscribedToAllCollections) return + this.runGraph() + } + + /** + * Send changes to the D2 input for the given alias. + * Returns the number of multiset entries sent. + */ + private sendChangesToD2( + alias: string, + changes: Array>, + ): number { + if (this.disposed || !this.inputs || !this.graph) return 0 + + const input = this.inputs[alias] + if (!input) return 0 + + const collection = this.collectionByAlias[alias] + if (!collection) return 0 + + // Filter duplicates per alias + const sentKeys = this.sentToD2KeysByAlias.get(alias)! + const filtered = filterDuplicateInserts(changes, sentKeys) + + return sendChangesToInput(input, filtered, collection.config.getKey) + } + + /** + * Run the D2 graph until quiescence, then emit accumulated events once. + * + * All output across the entire while-loop is accumulated into a single + * batch so that users see one `onBatchProcessed` invocation per scheduler + * run, even when ordered loading causes multiple graph steps. + */ + private runGraph(): void { + if (this.isGraphRunning || this.disposed || !this.graph) return + + this.isGraphRunning = true + try { + while (this.graph.pendingWork()) { + this.graph.run() + // A handler (via onBatchProcessed) or source error callback may have + // called dispose() during graph.run(). Stop early to avoid operating + // on stale state. TS narrows disposed to false from the guard above + // but it can change during graph.run() via callbacks. 
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (this.disposed) break + // After each step, check if ordered queries need more data. + // loadMoreIfNeeded may send data to D2 inputs (via requestLimitedSnapshot), + // causing pendingWork() to return true for the next iteration. + this.loadMoreIfNeeded() + } + // Emit all accumulated events once the graph reaches quiescence + this.flushPendingChanges() + } finally { + this.isGraphRunning = false + // If dispose() was called during this graph run, it deferred the heavy + // cleanup (clearing graph/inputs/pipeline) to avoid nulling references + // mid-loop. Complete that cleanup now. + if (this.deferredCleanup) { + this.deferredCleanup = false + this.finalCleanup() + } + } + } + + /** Classify accumulated changes into DeltaEvents and invoke the callback */ + private flushPendingChanges(): void { + if (this.pendingChanges.size === 0) return + + // If skipInitial and initial load isn't complete yet, discard + if (this.skipInitial && !this.initialLoadComplete) { + this.pendingChanges = new Map() + return + } + + const events: Array> = [] + + for (const [key, changes] of this.pendingChanges) { + const event = classifyDelta(key as TKey, changes) + if (event) { + events.push(event) + } + } + + this.pendingChanges = new Map() + + if (events.length > 0) { + this.onBatchProcessed(events) + } + } + + /** Check if all source collections are in the ready state */ + private checkAllCollectionsReady(): boolean { + return Object.values(this.collections).every((collection) => + collection.isReady(), + ) + } + + /** + * Build subscription options for an alias based on whether it uses ordered + * loading, is lazy, or should pass orderBy/limit hints. + */ + private buildSubscriptionOptions( + alias: string, + isLazy: boolean, + orderByInfo: OrderByOptimizationInfo | undefined, + whereExpression: BasicExpression | undefined, + ): { + includeInitialState?: boolean + whereExpression?: BasicExpression + orderBy?: any + limit?: number + } { + // Ordered aliases explicitly disable initial state — data is loaded + // via requestLimitedSnapshot/requestSnapshot after subscription setup. + if (orderByInfo) { + return { includeInitialState: false, whereExpression } + } + + const includeInitialState = !isLazy + + // For unordered subscriptions, pass orderBy/limit hints so on-demand + // collections can optimise server-side fetching. + const hints = computeSubscriptionOrderByHints(this.query, alias) + + return { + includeInitialState, + whereExpression, + ...(hints.orderBy ? { orderBy: hints.orderBy } : {}), + ...(hints.limit !== undefined ? { limit: hints.limit } : {}), + } + } + + /** + * Request the initial ordered snapshot for an alias. + * Uses requestLimitedSnapshot (index-based cursor) or requestSnapshot + * (full load with limit) depending on whether an index is available. + */ + private requestInitialOrderedSnapshot( + alias: string, + orderByInfo: OrderByOptimizationInfo, + subscription: CollectionSubscription, + ): void { + const { orderBy, offset, limit, index } = orderByInfo + const normalizedOrderBy = normalizeOrderByPaths(orderBy, alias) + + if (index) { + subscription.setOrderByIndex(index) + subscription.requestLimitedSnapshot({ + limit: offset + limit, + orderBy: normalizedOrderBy, + trackLoadSubsetPromise: false, + }) + } else { + subscription.requestSnapshot({ + orderBy: normalizedOrderBy, + limit: offset + limit, + trackLoadSubsetPromise: false, + }) + } + } + + /** + * Get orderBy optimization info for a given alias. 
+ * Returns undefined if no optimization exists for this alias. + */ + private getOrderByInfoForAlias( + alias: string, + ): OrderByOptimizationInfo | undefined { + // optimizableOrderByCollections is keyed by collection ID + const collectionId = this.compiledAliasToCollectionId[alias] + if (!collectionId) return undefined + + const info = this.optimizableOrderByCollections[collectionId] + if (info && info.alias === alias) { + return info + } + return undefined + } + + /** + * After each graph run step, check if any ordered query's topK operator + * needs more data. If so, load more rows via requestLimitedSnapshot. + */ + private loadMoreIfNeeded(): void { + for (const [, orderByInfo] of Object.entries( + this.optimizableOrderByCollections, + )) { + if (!orderByInfo.dataNeeded) continue + + if (this.pendingOrderedLoadPromise) { + // Wait for in-flight loads to complete before requesting more + continue + } + + const n = orderByInfo.dataNeeded() + if (n > 0) { + this.loadNextItems(orderByInfo, n) + } + } + } + + /** + * Load n more items from the source collection, starting from the cursor + * position (the biggest value sent so far). + */ + private loadNextItems(orderByInfo: OrderByOptimizationInfo, n: number): void { + const { alias } = orderByInfo + const subscription = this.subscriptions[alias] + if (!subscription) return + + const cursor = computeOrderedLoadCursor( + orderByInfo, + this.biggestSentValue.get(alias), + this.lastLoadRequestKey.get(alias), + alias, + n, + ) + if (!cursor) return // Duplicate request — skip + + this.lastLoadRequestKey.set(alias, cursor.loadRequestKey) + + subscription.requestLimitedSnapshot({ + orderBy: cursor.normalizedOrderBy, + limit: n, + minValues: cursor.minValues, + trackLoadSubsetPromise: false, + onLoadSubsetResult: (loadResult: Promise | true) => { + // Track in-flight load to prevent redundant concurrent requests + if (loadResult instanceof Promise) { + this.pendingOrderedLoadPromise = loadResult + loadResult.finally(() => { + if (this.pendingOrderedLoadPromise === loadResult) { + this.pendingOrderedLoadPromise = undefined + } + }) + } + }, + }) + } + + /** + * Track the biggest value sent for a given ordered alias. + * Used for cursor-based pagination in loadNextItems. + */ + private trackSentValues( + alias: string, + changes: Array>, + comparator: (a: any, b: any) => number, + ): void { + const sentKeys = this.sentToD2KeysByAlias.get(alias) ?? 
new Set() + const result = trackBiggestSentValue( + changes, + this.biggestSentValue.get(alias), + sentKeys, + comparator, + ) + this.biggestSentValue.set(alias, result.biggest) + if (result.shouldResetLoadKey) { + this.lastLoadRequestKey.delete(alias) + } + } + + /** Tear down subscriptions and clear state */ + dispose(): void { + if (this.disposed) return + this.disposed = true + this.subscribedToAllCollections = false + + // Immediately unsubscribe from sources and clear cheap state + this.unsubscribeCallbacks.forEach((fn) => fn()) + this.unsubscribeCallbacks.clear() + this.sentToD2KeysByAlias.clear() + this.pendingChanges.clear() + this.lazySources.clear() + this.builderDependencies.clear() + this.biggestSentValue.clear() + this.lastLoadRequestKey.clear() + this.pendingOrderedLoadPromise = undefined + + // Clear mutable objects + for (const key of Object.keys(this.lazySourcesCallbacks)) { + delete this.lazySourcesCallbacks[key] + } + for (const key of Object.keys(this.aliasDependencies)) { + delete this.aliasDependencies[key] + } + for (const key of Object.keys(this.optimizableOrderByCollections)) { + delete this.optimizableOrderByCollections[key] + } + + // If the graph is currently running, defer clearing graph/inputs/pipeline + // until runGraph() completes — otherwise we'd null references mid-loop. + if (this.isGraphRunning) { + this.deferredCleanup = true + } else { + this.finalCleanup() + } + } + + /** Clear graph references — called after graph run completes or immediately from dispose */ + private finalCleanup(): void { + this.graph = undefined + this.inputs = undefined + this.pipeline = undefined + this.sourceWhereClauses = undefined + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Normalise the `on` config value into a Set of DeltaTypes */ +function normaliseDeltaTypes( + on: DeltaType | Array | 'delta', +): Set { + if (on === `delta`) { + return new Set([`enter`, `exit`, `update`]) + } + if (Array.isArray(on)) { + return new Set(on) + } + return new Set([on]) +} + +/** + * Accumulate D2 output multiplicities into per-key effect changes. + * Tracks both insert values (new) and delete values (old) separately + * so that update and exit events can include previousValue. + */ +function accumulateEffectChanges( + acc: Map>, + [[key, tupleData], multiplicity]: [ + [unknown, [any, string | undefined]], + number, + ], +): Map> { + const [value] = tupleData as [T, string | undefined] + + const changes: EffectChanges = acc.get(key) || { + deletes: 0, + inserts: 0, + } + + if (multiplicity < 0) { + changes.deletes += Math.abs(multiplicity) + // Keep only the first delete value — this is the pre-batch state + changes.deleteValue ??= value + } else if (multiplicity > 0) { + changes.inserts += multiplicity + // Always overwrite with the latest insert — this is the post-batch state + changes.insertValue = value + } + + acc.set(key, changes) + return acc +} + +/** Classify accumulated per-key changes into a DeltaEvent */ +function classifyDelta( + key: TKey, + changes: EffectChanges, +): DeltaEvent | undefined { + const { inserts, deletes, insertValue, deleteValue } = changes + + if (inserts > 0 && deletes === 0) { + // Row entered the query result + return { type: `enter`, key, value: insertValue! 
} + } + + if (deletes > 0 && inserts === 0) { + // Row exited the query result — value is the exiting value, + // previousValue is omitted (it would be identical to value) + return { type: `exit`, key, value: deleteValue! } + } + + if (inserts > 0 && deletes > 0) { + // Row updated within the query result + return { + type: `update`, + key, + value: insertValue!, + previousValue: deleteValue, + } + } + + // inserts === 0 && deletes === 0 — no net change (should not happen) + return undefined +} + +/** Track a promise in the in-flight set, automatically removing on settlement */ +function trackPromise( + promise: Promise, + inFlightHandlers: Set>, +): void { + inFlightHandlers.add(promise) + promise.finally(() => { + inFlightHandlers.delete(promise) + }) +} + +/** Report an error to the onError callback or console */ +function reportError( + error: unknown, + event: DeltaEvent, + onError?: (error: Error, event: DeltaEvent) => void, +): void { + const normalised = error instanceof Error ? error : new Error(String(error)) + if (onError) { + try { + onError(normalised, event) + } catch { + // Don't let onError errors propagate + console.error(`[Effect] Error in onError handler:`, normalised) + } + } else { + console.error(`[Effect] Unhandled error in handler:`, normalised) + } +} diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index b2a25ead1..d4ad22db2 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1,6 +1,5 @@ import { D2, output } from '@tanstack/db-ivm' import { compileQuery } from '../compiler/index.js' -import { buildQuery, getQueryIR } from '../builder/index.js' import { MissingAliasInputsError, SetWindowRequiresOrderByError, @@ -10,6 +9,12 @@ import { getActiveTransaction } from '../../transactions.js' import { CollectionSubscriber } from './collection-subscriber.js' import { getCollectionBuilder } from './collection-registry.js' import { LIVE_QUERY_INTERNAL } from './internal.js' +import { + buildQueryFromConfig, + extractCollectionAliases, + extractCollectionFromSource, + extractCollectionsFromQuery, +} from './utils.js' import type { LiveQueryInternalUtils } from './internal.js' import type { WindowOptions } from '../compiler/index.js' import type { SchedulerContextId } from '../../scheduler.js' @@ -946,16 +951,6 @@ export class CollectionConfigBuilder< } } -function buildQueryFromConfig( - config: LiveQueryCollectionConfig, -) { - // Build the query using the provided query builder function or instance - if (typeof config.query === `function`) { - return buildQuery(config.query) - } - return getQueryIR(config.query) -} - function createOrderByComparator( orderByIndices: WeakMap, ) { @@ -980,127 +975,6 @@ function createOrderByComparator( } } -/** - * Helper function to extract collections from a compiled query - * Traverses the query IR to find all collection references - * Maps collections by their ID (not alias) as expected by the compiler - */ -function extractCollectionsFromQuery( - query: any, -): Record> { - const collections: Record = {} - - // Helper function to recursively extract collections from a query or source - function extractFromSource(source: any) { - if (source.type === `collectionRef`) { - collections[source.collection.id] = source.collection - } else if (source.type === `queryRef`) { - // Recursively extract from subquery - extractFromQuery(source.query) - } - } - - // Helper function to 
recursively extract collections from a query - function extractFromQuery(q: any) { - // Extract from FROM clause - if (q.from) { - extractFromSource(q.from) - } - - // Extract from JOIN clauses - if (q.join && Array.isArray(q.join)) { - for (const joinClause of q.join) { - if (joinClause.from) { - extractFromSource(joinClause.from) - } - } - } - } - - // Start extraction from the root query - extractFromQuery(query) - - return collections -} - -/** - * Helper function to extract the collection that is referenced in the query's FROM clause. - * The FROM clause may refer directly to a collection or indirectly to a subquery. - */ -function extractCollectionFromSource(query: any): Collection { - const from = query.from - - if (from.type === `collectionRef`) { - return from.collection - } else if (from.type === `queryRef`) { - // Recursively extract from subquery - return extractCollectionFromSource(from.query) - } - - throw new Error( - `Failed to extract collection. Invalid FROM clause: ${JSON.stringify(query)}`, - ) -} - -/** - * Extracts all aliases used for each collection across the entire query tree. - * - * Traverses the QueryIR recursively to build a map from collection ID to all aliases - * that reference that collection. This is essential for self-join support, where the - * same collection may be referenced multiple times with different aliases. - * - * For example, given a query like: - * ```ts - * q.from({ employee: employeesCollection }) - * .join({ manager: employeesCollection }, ({ employee, manager }) => - * eq(employee.managerId, manager.id) - * ) - * ``` - * - * This function would return: - * ``` - * Map { "employees" => Set { "employee", "manager" } } - * ``` - * - * @param query - The query IR to extract aliases from - * @returns A map from collection ID to the set of all aliases referencing that collection - */ -function extractCollectionAliases(query: QueryIR): Map> { - const aliasesById = new Map>() - - function recordAlias(source: any) { - if (!source) return - - if (source.type === `collectionRef`) { - const { id } = source.collection - const existing = aliasesById.get(id) - if (existing) { - existing.add(source.alias) - } else { - aliasesById.set(id, new Set([source.alias])) - } - } else if (source.type === `queryRef`) { - traverse(source.query) - } - } - - function traverse(q?: QueryIR) { - if (!q) return - - recordAlias(q.from) - - if (q.join) { - for (const joinClause of q.join) { - recordAlias(joinClause.from) - } - } - } - - traverse(query) - - return aliasesById -} - function accumulateChanges( acc: Map>, [[key, tupleData], multiplicity]: [ diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 4ff265220..8eda5cc88 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -1,9 +1,15 @@ -import { MultiSet, serializeValue } from '@tanstack/db-ivm' import { normalizeExpressionPaths, normalizeOrderByPaths, } from '../compiler/expressions.js' -import type { MultiSetArray, RootStreamBuilder } from '@tanstack/db-ivm' +import { + computeOrderedLoadCursor, + computeSubscriptionOrderByHints, + filterDuplicateInserts, + sendChangesToInput, + splitUpdates, + trackBiggestSentValue, +} from './utils.js' import type { Collection } from '../../collection/index.js' import type { ChangeMessage, @@ -147,25 +153,11 @@ export class CollectionSubscriber< changes: Iterable>, callback?: () => boolean, ) { - // Filter changes to prevent duplicate 
inserts to D2 pipeline. - // This ensures D2 multiplicity stays at 1 for visible items, so deletes - // properly reduce multiplicity to 0 (triggering DELETE output). const changesArray = Array.isArray(changes) ? changes : [...changes] - const filteredChanges: Array> = [] - for (const change of changesArray) { - if (change.type === `insert`) { - if (this.sentToD2Keys.has(change.key)) { - // Skip duplicate insert - already sent to D2 - continue - } - this.sentToD2Keys.add(change.key) - } else if (change.type === `delete`) { - // Remove from tracking so future re-inserts are allowed - this.sentToD2Keys.delete(change.key) - } - // Updates are handled as delete+insert by splitUpdates, so no special handling needed - filteredChanges.push(change) - } + const filteredChanges = filterDuplicateInserts( + changesArray, + this.sentToD2Keys, + ) // currentSyncState and input are always defined when this method is called // (only called from active subscriptions during a sync session) @@ -202,27 +194,10 @@ export class CollectionSubscriber< } // Get the query's orderBy and limit to pass to loadSubset. - // Only include orderBy when it is scoped to this alias and uses simple refs, - // to avoid leaking cross-collection paths into backend-specific compilers. - const { orderBy, limit, offset } = this.collectionConfigBuilder.query - const effectiveLimit = - limit !== undefined && offset !== undefined ? limit + offset : limit - const normalizedOrderBy = orderBy - ? normalizeOrderByPaths(orderBy, this.alias) - : undefined - const canPassOrderBy = - normalizedOrderBy?.every((clause) => { - const exp = clause.expression - if (exp.type !== `ref`) { - return false - } - const path = exp.path - return Array.isArray(path) && path.length === 1 - }) ?? false - const orderByForSubscription = canPassOrderBy - ? normalizedOrderBy - : undefined - const limitForSubscription = canPassOrderBy ? effectiveLimit : undefined + const hints = computeSubscriptionOrderByHints( + this.collectionConfigBuilder.query, + this.alias, + ) // Track loading via the loadSubset promise directly. // requestSnapshot uses trackLoadSubsetPromise: false (needed for truncate handling), @@ -241,8 +216,8 @@ export class CollectionSubscriber< ...(includeInitialState && { includeInitialState }), whereExpression, onStatusChange, - orderBy: orderByForSubscription, - limit: limitForSubscription, + orderBy: hints.orderBy, + limit: hints.limit, onLoadSubsetResult, }) @@ -415,52 +390,28 @@ export class CollectionSubscriber< if (!orderByInfo) { return } - const { orderBy, valueExtractorForRawRow, offset } = orderByInfo - const biggestSentRow = this.biggest - - // Extract all orderBy column values from the biggest sent row - // For single-column: returns single value, for multi-column: returns array - const extractedValues = biggestSentRow - ? valueExtractorForRawRow(biggestSentRow) - : undefined - - // Normalize to array format for minValues - let minValues: Array | undefined - if (extractedValues !== undefined) { - minValues = Array.isArray(extractedValues) - ? extractedValues - : [extractedValues] - } - - const loadRequestKey = this.getLoadRequestKey({ - minValues, - offset, - limit: n, - }) - // Skip if we already requested a load for this cursor+window. - // This prevents infinite loops from cached data re-writes while still allowing - // window moves (offset/limit changes) to trigger new requests. 
- if (this.lastLoadRequestKey === loadRequestKey) { - return - } + const cursor = computeOrderedLoadCursor( + orderByInfo, + this.biggest, + this.lastLoadRequestKey, + this.alias, + n, + ) + if (!cursor) return // Duplicate request — skip - // Normalize the orderBy clauses such that the references are relative to the collection - const normalizedOrderBy = normalizeOrderByPaths(orderBy, this.alias) + this.lastLoadRequestKey = cursor.loadRequestKey // Take the `n` items after the biggest sent value - // Pass the current window offset to ensure proper deduplication + // Omit offset so requestLimitedSnapshot can advance based on + // the number of rows already loaded (supports offset-based backends). subscription.requestLimitedSnapshot({ - orderBy: normalizedOrderBy, + orderBy: cursor.normalizedOrderBy, limit: n, - minValues, - // Omit offset so requestLimitedSnapshot can advance the offset based on - // the number of rows already loaded (supports offset-based backends). + minValues: cursor.minValues, trackLoadSubsetPromise: false, onLoadSubsetResult: this.orderedLoadSubsetResult, }) - - this.lastLoadRequestKey = loadRequestKey } private getWhereClauseForAlias(): BasicExpression | undefined { @@ -487,24 +438,15 @@ export class CollectionSubscriber< changes: Array>, comparator: (a: any, b: any) => number, ): void { - for (const change of changes) { - if (change.type === `delete`) { - continue - } - - const isNewKey = !this.sentToD2Keys.has(change.key) - - // Only track inserts/updates for cursor positioning, not deletes - if (!this.biggest) { - this.biggest = change.value - this.lastLoadRequestKey = undefined - } else if (comparator(this.biggest, change.value) < 0) { - this.biggest = change.value - this.lastLoadRequestKey = undefined - } else if (isNewKey) { - // New key with same orderBy value - allow another load if needed - this.lastLoadRequestKey = undefined - } + const result = trackBiggestSentValue( + changes, + this.biggest, + this.sentToD2Keys, + comparator, + ) + this.biggest = result.biggest + if (result.shouldResetLoadKey) { + this.lastLoadRequestKey = undefined } } @@ -525,62 +467,4 @@ export class CollectionSubscriber< promise, ) } - - private getLoadRequestKey(options: { - minValues: Array | undefined - offset: number - limit: number - }): string { - return serializeValue({ - minValues: options.minValues ?? 
null, - offset: options.offset, - limit: options.limit, - }) - } -} - -/** - * Helper function to send changes to a D2 input stream - */ -function sendChangesToInput( - input: RootStreamBuilder, - changes: Iterable, - getKey: (item: ChangeMessage[`value`]) => any, -): number { - const multiSetArray: MultiSetArray = [] - for (const change of changes) { - const key = getKey(change.value) - if (change.type === `insert`) { - multiSetArray.push([[key, change.value], 1]) - } else if (change.type === `update`) { - multiSetArray.push([[key, change.previousValue], -1]) - multiSetArray.push([[key, change.value], 1]) - } else { - // change.type === `delete` - multiSetArray.push([[key, change.value], -1]) - } - } - - if (multiSetArray.length !== 0) { - input.sendData(new MultiSet(multiSetArray)) - } - - return multiSetArray.length -} - -/** Splits updates into a delete of the old value and an insert of the new value */ -function* splitUpdates< - T extends object = Record, - TKey extends string | number = string | number, ->( - changes: Iterable>, -): Generator> { - for (const change of changes) { - if (change.type === `update`) { - yield { type: `delete`, key: change.key, value: change.previousValue! } - yield { type: `insert`, key: change.key, value: change.value } - } else { - yield change - } - } } diff --git a/packages/db/src/query/live/utils.ts b/packages/db/src/query/live/utils.ts new file mode 100644 index 000000000..07e68d774 --- /dev/null +++ b/packages/db/src/query/live/utils.ts @@ -0,0 +1,356 @@ +import { MultiSet, serializeValue } from '@tanstack/db-ivm' +import { normalizeOrderByPaths } from '../compiler/expressions.js' +import { buildQuery, getQueryIR } from '../builder/index.js' +import type { MultiSetArray, RootStreamBuilder } from '@tanstack/db-ivm' +import type { Collection } from '../../collection/index.js' +import type { ChangeMessage } from '../../types.js' +import type { InitialQueryBuilder, QueryBuilder } from '../builder/index.js' +import type { Context } from '../builder/types.js' +import type { OrderBy, QueryIR } from '../ir.js' +import type { OrderByOptimizationInfo } from '../compiler/order-by.js' + +/** + * Helper function to extract collections from a compiled query. + * Traverses the query IR to find all collection references. + * Maps collections by their ID (not alias) as expected by the compiler. + */ +export function extractCollectionsFromQuery( + query: any, +): Record> { + const collections: Record = {} + + // Helper function to recursively extract collections from a query or source + function extractFromSource(source: any) { + if (source.type === `collectionRef`) { + collections[source.collection.id] = source.collection + } else if (source.type === `queryRef`) { + // Recursively extract from subquery + extractFromQuery(source.query) + } + } + + // Helper function to recursively extract collections from a query + function extractFromQuery(q: any) { + // Extract from FROM clause + if (q.from) { + extractFromSource(q.from) + } + + // Extract from JOIN clauses + if (q.join && Array.isArray(q.join)) { + for (const joinClause of q.join) { + if (joinClause.from) { + extractFromSource(joinClause.from) + } + } + } + } + + // Start extraction from the root query + extractFromQuery(query) + + return collections +} + +/** + * Helper function to extract the collection that is referenced in the query's FROM clause. + * The FROM clause may refer directly to a collection or indirectly to a subquery. 
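+ *
+ * For example (collection and subquery names here are illustrative):
+ * ```ts
+ * // Direct reference: resolves immediately to usersCollection.
+ * q.from({ user: usersCollection })
+ *
+ * // Subquery reference: recurses into the subquery's own FROM clause.
+ * q.from({ activeUser: activeUsersSubquery })
+ * ```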
+ */ +export function extractCollectionFromSource( + query: any, +): Collection { + const from = query.from + + if (from.type === `collectionRef`) { + return from.collection + } else if (from.type === `queryRef`) { + // Recursively extract from subquery + return extractCollectionFromSource(from.query) + } + + throw new Error( + `Failed to extract collection. Invalid FROM clause: ${JSON.stringify(query)}`, + ) +} + +/** + * Extracts all aliases used for each collection across the entire query tree. + * + * Traverses the QueryIR recursively to build a map from collection ID to all aliases + * that reference that collection. This is essential for self-join support, where the + * same collection may be referenced multiple times with different aliases. + * + * For example, given a query like: + * ```ts + * q.from({ employee: employeesCollection }) + * .join({ manager: employeesCollection }, ({ employee, manager }) => + * eq(employee.managerId, manager.id) + * ) + * ``` + * + * This function would return: + * ``` + * Map { "employees" => Set { "employee", "manager" } } + * ``` + * + * @param query - The query IR to extract aliases from + * @returns A map from collection ID to the set of all aliases referencing that collection + */ +export function extractCollectionAliases( + query: QueryIR, +): Map> { + const aliasesById = new Map>() + + function recordAlias(source: any) { + if (!source) return + + if (source.type === `collectionRef`) { + const { id } = source.collection + const existing = aliasesById.get(id) + if (existing) { + existing.add(source.alias) + } else { + aliasesById.set(id, new Set([source.alias])) + } + } else if (source.type === `queryRef`) { + traverse(source.query) + } + } + + function traverse(q?: QueryIR) { + if (!q) return + + recordAlias(q.from) + + if (q.join) { + for (const joinClause of q.join) { + recordAlias(joinClause.from) + } + } + } + + traverse(query) + + return aliasesById +} + +/** + * Builds a query IR from a config object that contains either a query builder + * function or a QueryBuilder instance. + */ +export function buildQueryFromConfig(config: { + query: + | ((q: InitialQueryBuilder) => QueryBuilder) + | QueryBuilder +}): QueryIR { + // Build the query using the provided query builder function or instance + if (typeof config.query === `function`) { + return buildQuery(config.query) + } + return getQueryIR(config.query) +} + +/** + * Helper function to send changes to a D2 input stream. + * Converts ChangeMessages to D2 MultiSet data and sends to the input. 
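+ * Inserts are emitted with multiplicity +1, deletes with -1, and updates as
+ * a -1 entry for the previous value followed by a +1 entry for the new value.
+ * For example, an update of key k from value a to value b is sent as the two
+ * entries [[k, a], -1] and [[k, b], 1].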
+ * + * @returns The number of multiset entries sent + */ +export function sendChangesToInput( + input: RootStreamBuilder, + changes: Iterable, + getKey: (item: ChangeMessage[`value`]) => any, +): number { + const multiSetArray: MultiSetArray = [] + for (const change of changes) { + const key = getKey(change.value) + if (change.type === `insert`) { + multiSetArray.push([[key, change.value], 1]) + } else if (change.type === `update`) { + multiSetArray.push([[key, change.previousValue], -1]) + multiSetArray.push([[key, change.value], 1]) + } else { + // change.type === `delete` + multiSetArray.push([[key, change.value], -1]) + } + } + + if (multiSetArray.length !== 0) { + input.sendData(new MultiSet(multiSetArray)) + } + + return multiSetArray.length +} + +/** Splits updates into a delete of the old value and an insert of the new value */ +export function* splitUpdates< + T extends object = Record, + TKey extends string | number = string | number, +>( + changes: Iterable>, +): Generator> { + for (const change of changes) { + if (change.type === `update`) { + yield { type: `delete`, key: change.key, value: change.previousValue! } + yield { type: `insert`, key: change.key, value: change.value } + } else { + yield change + } + } +} + +/** + * Filter changes to prevent duplicate inserts to a D2 pipeline. + * Maintains D2 multiplicity at 1 for visible items so that deletes + * properly reduce multiplicity to 0. + * + * Mutates `sentKeys` in place: adds keys on insert, removes on delete. + */ +export function filterDuplicateInserts( + changes: Array>, + sentKeys: Set, +): Array> { + const filtered: Array> = [] + for (const change of changes) { + if (change.type === `insert`) { + if (sentKeys.has(change.key)) { + continue // Skip duplicate + } + sentKeys.add(change.key) + } else if (change.type === `delete`) { + sentKeys.delete(change.key) + } + filtered.push(change) + } + return filtered +} + +/** + * Track the biggest value seen in a stream of changes, used for cursor-based + * pagination in ordered subscriptions. Returns whether the load request key + * should be reset (allowing another load). + * + * @param changes - changes to process (deletes are skipped) + * @param current - the current biggest value (or undefined if none) + * @param sentKeys - set of keys already sent to D2 (for new-key detection) + * @param comparator - orderBy comparator + * @returns `{ biggest, shouldResetLoadKey }` — the new biggest value and + * whether the caller should clear its last-load-request-key + */ +export function trackBiggestSentValue( + changes: Array>, + current: unknown | undefined, + sentKeys: Set, + comparator: (a: any, b: any) => number, +): { biggest: unknown; shouldResetLoadKey: boolean } { + let biggest = current + let shouldResetLoadKey = false + + for (const change of changes) { + if (change.type === `delete`) continue + + const isNewKey = !sentKeys.has(change.key) + + if (biggest === undefined) { + biggest = change.value + shouldResetLoadKey = true + } else if (comparator(biggest, change.value) < 0) { + biggest = change.value + shouldResetLoadKey = true + } else if (isNewKey) { + // New key at same sort position — allow another load if needed + shouldResetLoadKey = true + } + } + + return { biggest, shouldResetLoadKey } +} + +/** + * Compute orderBy/limit subscription hints for an alias. + * Returns normalised orderBy and effective limit suitable for passing to + * `subscribeChanges`, or `undefined` values when the query's orderBy cannot + * be scoped to the given alias (e.g. 
cross-collection refs or aggregates). + */ +export function computeSubscriptionOrderByHints( + query: { orderBy?: OrderBy; limit?: number; offset?: number }, + alias: string, +): { orderBy: OrderBy | undefined; limit: number | undefined } { + const { orderBy, limit, offset } = query + const effectiveLimit = + limit !== undefined && offset !== undefined ? limit + offset : limit + + const normalizedOrderBy = orderBy + ? normalizeOrderByPaths(orderBy, alias) + : undefined + + // Only pass orderBy when it is scoped to this alias and uses simple refs, + // to avoid leaking cross-collection paths into backend-specific compilers. + const canPassOrderBy = + normalizedOrderBy?.every((clause) => { + const exp = clause.expression + if (exp.type !== `ref`) return false + const path = exp.path + return Array.isArray(path) && path.length === 1 + }) ?? false + + return { + orderBy: canPassOrderBy ? normalizedOrderBy : undefined, + limit: canPassOrderBy ? effectiveLimit : undefined, + } +} + +/** + * Compute the cursor for loading the next batch of ordered data. + * Extracts values from the biggest sent row and builds the `minValues` + * array and a deduplication key. + * + * @returns `undefined` if the load should be skipped (duplicate request), + * otherwise `{ minValues, normalizedOrderBy, loadRequestKey }`. + */ +export function computeOrderedLoadCursor( + orderByInfo: Pick< + OrderByOptimizationInfo, + 'orderBy' | 'valueExtractorForRawRow' | 'offset' + >, + biggestSentRow: unknown | undefined, + lastLoadRequestKey: string | undefined, + alias: string, + limit: number, +): + | { + minValues: Array | undefined + normalizedOrderBy: OrderBy + loadRequestKey: string + } + | undefined { + const { orderBy, valueExtractorForRawRow, offset } = orderByInfo + + // Extract all orderBy column values from the biggest sent row + // For single-column: returns single value, for multi-column: returns array + const extractedValues = biggestSentRow + ? valueExtractorForRawRow(biggestSentRow as Record) + : undefined + + // Normalize to array format for minValues + let minValues: Array | undefined + if (extractedValues !== undefined) { + minValues = Array.isArray(extractedValues) + ? extractedValues + : [extractedValues] + } + + // Deduplicate: skip if we already issued an identical load request + const loadRequestKey = serializeValue({ + minValues: minValues ?? 
null, + offset, + limit, + }) + if (lastLoadRequestKey === loadRequestKey) { + return undefined + } + + const normalizedOrderBy = normalizeOrderByPaths(orderBy, alias) + + return { minValues, normalizedOrderBy, loadRequestKey } +} diff --git a/packages/db/tests/effect.test.ts b/packages/db/tests/effect.test.ts new file mode 100644 index 000000000..6ec84a2b3 --- /dev/null +++ b/packages/db/tests/effect.test.ts @@ -0,0 +1,1678 @@ +import { describe, expect, it, vi } from 'vitest' +import { createCollection } from '../src/collection/index.js' +import { Query, createEffect, createTransaction, eq } from '../src/index.js' +import { + mockSyncCollectionOptions, + mockSyncCollectionOptionsNoInitialState, +} from './utils.js' +import type { DeltaEvent } from '../src/index.js' + +// --------------------------------------------------------------------------- +// Test types and helpers +// --------------------------------------------------------------------------- + +type User = { + id: number + name: string + active: boolean +} + +type Issue = { + id: number + title: string + userId: number +} + +const sampleUsers: Array = [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: false }, +] + +const sampleIssues: Array = [ + { id: 1, title: `Bug report`, userId: 1 }, + { id: 2, title: `Feature request`, userId: 2 }, +] + +function createUsersCollection(initialData = sampleUsers) { + return createCollection( + mockSyncCollectionOptions({ + id: `test-users`, + getKey: (user) => user.id, + initialData, + }), + ) +} + +function createIssuesCollection(initialData = sampleIssues) { + return createCollection( + mockSyncCollectionOptions({ + id: `test-issues`, + getKey: (issue) => issue.id, + initialData, + }), + ) +} + +/** Wait for microtasks to flush */ +const flushPromises = () => new Promise((resolve) => setTimeout(resolve, 0)) + +/** Collect events from an effect into an array */ +function collectEvents>( + events: Array>, +) { + return (event: DeltaEvent) => { + events.push(event) + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe(`createEffect`, () => { + describe(`basic delta events`, () => { + it(`should fire 'enter' events for initial data`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + handler: collectEvents(events), + }) + + await flushPromises() + + expect(events.length).toBe(3) + expect(events.every((e) => e.type === `enter`)).toBe(true) + expect(events.map((e) => e.value.name).sort()).toEqual([ + `Alice`, + `Bob`, + `Charlie`, + ]) + + await effect.dispose() + }) + + it(`should fire 'enter' event when a row is inserted into source`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + skipInitial: true, + handler: collectEvents(events), + }) + + await flushPromises() + expect(events.length).toBe(0) // skipInitial should suppress initial data + + // Insert a new user via sync + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 4, name: `Diana`, active: true }, + }) + users.utils.commit() + + await flushPromises() + + expect(events.length).toBe(1) + expect(events[0]!.type).toBe(`enter`) + expect(events[0]!.key).toBe(4) + 
expect(events[0]!.value.name).toBe(`Diana`) + + await effect.dispose() + }) + + it(`should fire 'exit' event when a row is deleted from source`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `exit`, + handler: collectEvents(events), + }) + + await flushPromises() + // No exit events from initial data + expect(events.length).toBe(0) + + // Delete a user via sync + users.utils.begin() + users.utils.write({ + type: `delete`, + value: { id: 1, name: `Alice`, active: true }, + }) + users.utils.commit() + + await flushPromises() + + expect(events.length).toBe(1) + expect(events[0]!.type).toBe(`exit`) + expect(events[0]!.key).toBe(1) + expect(events[0]!.value.name).toBe(`Alice`) + + await effect.dispose() + }) + + it(`should fire 'update' event when a row is updated in source`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `update`, + handler: collectEvents(events), + }) + + await flushPromises() + // No update events from initial data + expect(events.length).toBe(0) + + // Update a user via sync + users.utils.begin() + users.utils.write({ + type: `update`, + value: { id: 1, name: `Alice Updated`, active: true }, + previousValue: { id: 1, name: `Alice`, active: true }, + } as any) + users.utils.commit() + + await flushPromises() + + expect(events.length).toBe(1) + expect(events[0]!.type).toBe(`update`) + expect(events[0]!.key).toBe(1) + expect(events[0]!.value.name).toBe(`Alice Updated`) + expect(events[0]!.previousValue?.name).toBe(`Alice`) + + await effect.dispose() + }) + }) + + describe(`filtered queries`, () => { + it(`should only fire for rows matching the where clause`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q.from({ user: users }).where(({ user }) => eq(user.active, true)), + on: `enter`, + handler: collectEvents(events), + }) + + await flushPromises() + + // Only active users (Alice, Bob) — Charlie is inactive + expect(events.length).toBe(2) + expect(events.map((e) => e.value.name).sort()).toEqual([`Alice`, `Bob`]) + + await effect.dispose() + }) + + it(`should fire exit when a row stops matching the filter`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q.from({ user: users }).where(({ user }) => eq(user.active, true)), + on: `delta`, + handler: collectEvents(events), + }) + + await flushPromises() + events.length = 0 // Clear initial enter events + + // Update Alice to inactive — should exit the filtered result + users.utils.begin() + users.utils.write({ + type: `update`, + value: { id: 1, name: `Alice`, active: false }, + previousValue: { id: 1, name: `Alice`, active: true }, + } as any) + users.utils.commit() + + await flushPromises() + + expect(events.length).toBe(1) + expect(events[0]!.type).toBe(`exit`) + expect(events[0]!.value.name).toBe(`Alice`) + + await effect.dispose() + }) + + it(`should fire enter when a row starts matching the filter`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q.from({ user: users }).where(({ user }) => eq(user.active, true)), + on: `delta`, + handler: collectEvents(events), + }) + + await flushPromises() + events.length = 0 // Clear initial events + + // 
Update Charlie to active — should enter the filtered result + users.utils.begin() + users.utils.write({ + type: `update`, + value: { id: 3, name: `Charlie`, active: true }, + previousValue: { id: 3, name: `Charlie`, active: false }, + } as any) + users.utils.commit() + + await flushPromises() + + expect(events.length).toBe(1) + expect(events[0]!.type).toBe(`enter`) + expect(events[0]!.value.name).toBe(`Charlie`) + + await effect.dispose() + }) + }) + + describe(`on parameter`, () => { + it(`should support on: 'delta' for all event types`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `delta`, + handler: collectEvents(events), + }) + + await flushPromises() + // Initial data produces enter events + expect(events.filter((e) => e.type === `enter`).length).toBe(3) + + // Update + users.utils.begin() + users.utils.write({ + type: `update`, + value: { id: 1, name: `Alice Updated`, active: true }, + previousValue: { id: 1, name: `Alice`, active: true }, + } as any) + users.utils.commit() + await flushPromises() + + expect(events.filter((e) => e.type === `update`).length).toBe(1) + + // Delete + users.utils.begin() + users.utils.write({ + type: `delete`, + value: { id: 2, name: `Bob`, active: true }, + }) + users.utils.commit() + await flushPromises() + + expect(events.filter((e) => e.type === `exit`).length).toBe(1) + + await effect.dispose() + }) + + it(`should support on as an array of delta types`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: [`enter`, `exit`], + handler: collectEvents(events), + }) + + await flushPromises() + expect(events.filter((e) => e.type === `enter`).length).toBe(3) + + // Update should NOT fire (not in the on array) + users.utils.begin() + users.utils.write({ + type: `update`, + value: { id: 1, name: `Alice Updated`, active: true }, + previousValue: { id: 1, name: `Alice`, active: true }, + } as any) + users.utils.commit() + await flushPromises() + + // Should still be 3 events (no update event) + expect(events.length).toBe(3) + + // Delete SHOULD fire + users.utils.begin() + users.utils.write({ + type: `delete`, + value: { id: 2, name: `Bob`, active: true }, + }) + users.utils.commit() + await flushPromises() + + expect(events.length).toBe(4) + expect(events[3]!.type).toBe(`exit`) + + await effect.dispose() + }) + }) + + describe(`batchHandler`, () => { + it(`should receive all events in a single batch per graph run`, async () => { + const users = createUsersCollection([]) + const batches: Array>> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + batchHandler: (events) => { + batches.push([...events]) + }, + }) + + await flushPromises() + + // Insert multiple users in one sync transaction + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + }) + users.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + }) + users.utils.commit() + + await flushPromises() + + // Should receive one batch with 2 events + expect(batches.length).toBe(1) + expect(batches[0]!.length).toBe(2) + + await effect.dispose() + }) + }) + + describe(`skipInitial`, () => { + it(`should skip initial data when skipInitial is true`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = 
createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + skipInitial: true, + handler: collectEvents(events), + }) + + await flushPromises() + // Initial 3 users should be skipped + expect(events.length).toBe(0) + + // New insert should fire + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 4, name: `Diana`, active: true }, + }) + users.utils.commit() + + await flushPromises() + + expect(events.length).toBe(1) + expect(events[0]!.value.name).toBe(`Diana`) + + await effect.dispose() + }) + + it(`should process initial data when skipInitial is false (default)`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + handler: collectEvents(events), + }) + + await flushPromises() + + // All 3 initial users should fire enter events + expect(events.length).toBe(3) + + await effect.dispose() + }) + }) + + describe(`error handling`, () => { + it(`should route sync handler errors to onError`, async () => { + const users = createUsersCollection() + const errors: Array<{ error: Error; event: DeltaEvent }> = + [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + handler: () => { + throw new Error(`handler error`) + }, + onError: (error, event) => { + errors.push({ error, event }) + }, + }) + + await flushPromises() + + // All 3 initial events should produce errors + expect(errors.length).toBe(3) + expect(errors[0]!.error.message).toBe(`handler error`) + + await effect.dispose() + }) + + it(`should route async handler errors to onError`, async () => { + const users = createUsersCollection([sampleUsers[0]!]) + const errors: Array<{ error: Error; event: DeltaEvent }> = + [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + handler: () => { + return Promise.reject(new Error(`async error`)) + }, + onError: (error, event) => { + errors.push({ error, event }) + }, + }) + + await flushPromises() + + expect(errors.length).toBe(1) + expect(errors[0]!.error.message).toBe(`async error`) + + await effect.dispose() + }) + + it(`should log to console when no onError is provided`, async () => { + const users = createUsersCollection([sampleUsers[0]!]) + const consoleSpy = vi.spyOn(console, `error`).mockImplementation(() => {}) + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + handler: () => { + throw new Error(`unhandled error`) + }, + }) + + await flushPromises() + + expect(consoleSpy).toHaveBeenCalled() + consoleSpy.mockRestore() + + await effect.dispose() + }) + }) + + describe(`disposal`, () => { + it(`should not fire events after disposal`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `delta`, + skipInitial: true, + handler: collectEvents(events), + }) + + await flushPromises() + expect(events.length).toBe(0) + + await effect.dispose() + expect(effect.disposed).toBe(true) + + // Insert after disposal + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 4, name: `Diana`, active: true }, + }) + users.utils.commit() + + await flushPromises() + + // Should not have received any events + expect(events.length).toBe(0) + }) + + it(`should await in-flight async handlers on dispose`, async () => { + const users = createUsersCollection([sampleUsers[0]!]) + let handlerCompleted = false + let 
resolveHandler: (() => void) | undefined + + const handlerPromise = new Promise((resolve) => { + resolveHandler = resolve + }) + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + handler: async () => { + await handlerPromise + handlerCompleted = true + }, + }) + + await flushPromises() + + // Start disposing — should wait for the handler + const disposePromise = effect.dispose() + + // Handler hasn't completed yet + expect(handlerCompleted).toBe(false) + + // Resolve the handler + resolveHandler!() + await disposePromise + + expect(handlerCompleted).toBe(true) + }) + + it(`should abort the signal on dispose`, async () => { + const users = createUsersCollection([sampleUsers[0]!]) + let capturedSignal: AbortSignal | undefined + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + handler: (_event, ctx) => { + capturedSignal = ctx.signal + }, + }) + + await flushPromises() + expect(capturedSignal).toBeDefined() + expect(capturedSignal!.aborted).toBe(false) + + await effect.dispose() + expect(capturedSignal!.aborted).toBe(true) + }) + + it(`should be idempotent on multiple dispose calls`, async () => { + const users = createUsersCollection() + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + handler: () => {}, + }) + + await effect.dispose() + await effect.dispose() // Should not throw + expect(effect.disposed).toBe(true) + }) + }) + + describe(`auto-generated IDs`, () => { + it(`should generate incrementing IDs`, async () => { + const users = createUsersCollection([sampleUsers[0]!]) + const capturedIds: Array = [] + + const effect1 = createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + handler: (_event, ctx) => { + capturedIds.push(ctx.effectId) + }, + }) + + await flushPromises() + + const effect2 = createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + handler: (_event, ctx) => { + capturedIds.push(ctx.effectId) + }, + }) + + await flushPromises() + + // Both should have live-query-effect-{N} format + expect(capturedIds[0]).toMatch(/^live-query-effect-\d+$/) + expect(capturedIds[1]).toMatch(/^live-query-effect-\d+$/) + // And they should be different + expect(capturedIds[0]).not.toBe(capturedIds[1]) + + await effect1.dispose() + await effect2.dispose() + }) + + it(`should use custom ID when provided`, async () => { + const users = createUsersCollection([sampleUsers[0]!]) + let capturedId: string | undefined + + const effect = createEffect({ + id: `my-custom-effect`, + query: (q) => q.from({ user: users }), + on: `enter`, + handler: (_event, ctx) => { + capturedId = ctx.effectId + }, + }) + + await flushPromises() + + expect(capturedId).toBe(`my-custom-effect`) + + await effect.dispose() + }) + }) + + describe(`QueryBuilder instance input`, () => { + it(`should accept a QueryBuilder instance`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const queryBuilder = new Query() + .from({ user: users }) + .where(({ user }) => eq(user.active, true)) + + const effect = createEffect({ + query: queryBuilder, + on: `enter`, + handler: collectEvents(events), + }) + + await flushPromises() + + expect(events.length).toBe(2) // Only active users + expect(events.map((e) => e.value.name).sort()).toEqual([`Alice`, `Bob`]) + + await effect.dispose() + }) + }) + + describe(`join queries`, () => { + it(`should work with joined collections`, async () => { + const users = createUsersCollection() + const issues = 
createIssuesCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q + .from({ issue: issues }) + .join({ user: users }, ({ issue, user }) => + eq(issue.userId, user.id), + ) + .select(({ issue, user }) => ({ + issueId: issue.id, + title: issue.title, + userName: user!.name, + })), + on: `enter`, + handler: collectEvents(events), + }) + + await flushPromises() + + expect(events.length).toBe(2) + const titles = events.map((e) => e.value.title).sort() + expect(titles).toEqual([`Bug report`, `Feature request`]) + + // Verify joined data is present + const bugReport = events.find((e) => e.value.title === `Bug report`) + expect(bugReport!.value.userName).toBe(`Alice`) + + await effect.dispose() + }) + }) + + describe(`row transitions`, () => { + it(`should fire enter then exit when a row is inserted and deleted`, async () => { + const users = createUsersCollection([]) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `delta`, + handler: collectEvents(events), + }) + + await flushPromises() + + // Insert + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 10, name: `Eve`, active: true }, + }) + users.utils.commit() + await flushPromises() + + expect(events.length).toBe(1) + expect(events[0]!.type).toBe(`enter`) + expect(events[0]!.value.name).toBe(`Eve`) + + // Delete + users.utils.begin() + users.utils.write({ + type: `delete`, + value: { id: 10, name: `Eve`, active: true }, + }) + users.utils.commit() + await flushPromises() + + expect(events.length).toBe(2) + expect(events[1]!.type).toBe(`exit`) + expect(events[1]!.value.name).toBe(`Eve`) + + await effect.dispose() + }) + }) + + describe(`select queries`, () => { + it(`should work with select to project specific fields`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q + .from({ user: users }) + .where(({ user }) => eq(user.active, true)) + .select(({ user }) => ({ + id: user.id, + name: user.name, + })), + on: `enter`, + handler: collectEvents(events), + }) + + await flushPromises() + + expect(events.length).toBe(2) + // Should only have projected fields + const alice = events.find((e) => e.value.name === `Alice`) + expect(alice).toBeDefined() + expect(alice!.value.id).toBe(1) + // The projected result should not have the `active` field + expect(alice!.value.active).toBeUndefined() + + await effect.dispose() + }) + }) + + describe(`transaction coalescing`, () => { + it(`should coalesce multiple changes within a transaction into a single batch`, async () => { + const users = createUsersCollection([]) + const batches: Array>> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `delta`, + batchHandler: (events) => { + batches.push([...events]) + }, + }) + + await flushPromises() + + // Use a transaction to batch multiple inserts. + // The scheduler defers graph runs until the transaction flushes. 
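+      // (Without a wrapping transaction each commit would run the graph
+      // immediately and produce a separate batch; the next test exercises
+      // that path.)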
+ const tx = createTransaction({ + mutationFn: async () => {}, + }) + tx.mutate(() => { + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 10, name: `Eve`, active: true }, + }) + users.utils.commit() + + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 11, name: `Frank`, active: true }, + }) + users.utils.commit() + + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 12, name: `Grace`, active: true }, + }) + users.utils.commit() + }) + + await flushPromises() + + // All 3 inserts should be in a single batch (coalesced by the scheduler) + expect(batches.length).toBe(1) + expect(batches[0]!.length).toBe(3) + expect(batches[0]!.every((e) => e.type === `enter`)).toBe(true) + expect(batches[0]!.map((e) => e.value.name).sort()).toEqual([ + `Eve`, + `Frank`, + `Grace`, + ]) + + await effect.dispose() + }) + + it(`should run graph immediately when not in a transaction`, async () => { + const users = createUsersCollection([]) + const batches: Array>> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `delta`, + batchHandler: (events) => { + batches.push([...events]) + }, + }) + + await flushPromises() + + // Without a transaction, each change runs the graph immediately + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 10, name: `Eve`, active: true }, + }) + users.utils.commit() + + await flushPromises() + + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 11, name: `Frank`, active: true }, + }) + users.utils.commit() + + await flushPromises() + + // Each insert should be a separate batch (no coalescing) + expect(batches.length).toBe(2) + expect(batches[0]!.length).toBe(1) + expect(batches[1]!.length).toBe(1) + + await effect.dispose() + }) + }) + + describe(`truncate handling`, () => { + it(`should emit exit events for all items then enter events for re-inserted items after truncate`, async () => { + const options = mockSyncCollectionOptionsNoInitialState({ + id: `test-truncate-users`, + getKey: (user) => user.id, + }) + const users = createCollection(options) + users.startSyncImmediate() + + // Manually insert initial data and mark ready + options.utils.begin() + options.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + }) + options.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + }) + options.utils.commit() + options.utils.markReady() + + await flushPromises() + + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `delta`, + skipInitial: true, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + + // skipInitial — no events yet + expect(events.length).toBe(0) + + // Truncate: delete everything and re-insert a subset + options.utils.begin() + options.utils.truncate() + options.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice-v2`, active: true }, + }) + options.utils.write({ + type: `insert`, + value: { id: 3, name: `Charlie`, active: false }, + }) + options.utils.commit() + + await flushPromises() + + // Bob was deleted (exit), Alice was updated (exit old + enter new), + // Charlie was inserted (enter) + const exits = events.filter((e) => e.type === `exit`) + const enters = events.filter((e) => e.type === `enter`) + const updates = events.filter((e) => e.type === `update`) + + // Bob should have exited + expect( + exits.some((e) => e.value.name === `Bob`) || + 
exits.some((e) => e.value.name === `Bob`), + ).toBe(true) + + // Charlie should have entered + expect(enters.some((e) => e.value.name === `Charlie`)).toBe(true) + + // Alice should be an update (same key, new value) or exit+enter + const aliceUpdate = updates.find( + (e) => e.key === 1 || e.value.name === `Alice-v2`, + ) + const aliceEnter = enters.find( + (e) => e.key === 1 || e.value.name === `Alice-v2`, + ) + expect(aliceUpdate ?? aliceEnter).toBeDefined() + + await effect.dispose() + }) + + it(`should handle truncate that clears all items`, async () => { + const options = mockSyncCollectionOptionsNoInitialState({ + id: `test-truncate-clear`, + getKey: (user) => user.id, + }) + const users = createCollection(options) + users.startSyncImmediate() + + // Insert initial data and mark ready + options.utils.begin() + options.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + }) + options.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + }) + options.utils.commit() + options.utils.markReady() + + await flushPromises() + + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `delta`, + skipInitial: true, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + expect(events.length).toBe(0) + + // Truncate without re-inserting anything + options.utils.begin() + options.utils.truncate() + options.utils.commit() + + await flushPromises() + + // All items should have exit events + expect(events.length).toBe(2) + expect(events.every((e) => e.type === `exit`)).toBe(true) + const exitNames = events.map((e) => e.value.name).sort() + expect(exitNames).toEqual([`Alice`, `Bob`]) + + await effect.dispose() + }) + + it(`should accept new data after truncate`, async () => { + const options = mockSyncCollectionOptionsNoInitialState({ + id: `test-truncate-then-insert`, + getKey: (user) => user.id, + }) + const users = createCollection(options) + users.startSyncImmediate() + + options.utils.begin() + options.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + }) + options.utils.commit() + options.utils.markReady() + + await flushPromises() + + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `delta`, + skipInitial: true, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + expect(events.length).toBe(0) + + // Truncate everything + options.utils.begin() + options.utils.truncate() + options.utils.commit() + + await flushPromises() + events.length = 0 + + // Now insert fresh data — effect should see enter events + options.utils.begin() + options.utils.write({ + type: `insert`, + value: { id: 10, name: `NewUser`, active: true }, + }) + options.utils.commit() + + await flushPromises() + + expect(events.length).toBe(1) + expect(events[0]!.type).toBe(`enter`) + expect(events[0]!.value.name).toBe(`NewUser`) + + await effect.dispose() + }) + }) + + describe(`orderBy with limit`, () => { + it(`should only emit enter events for items within the top-K window`, async () => { + const users = createUsersCollection([ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `Dave`, active: true }, + { id: 5, name: `Eve`, active: true }, + ]) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q + .from({ user: users }) + .orderBy(({ user }) => user.name, 
`asc`) + .limit(3), + on: `delta`, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + + // Only the top 3 alphabetically (Alice, Bob, Charlie) should enter + expect(events.length).toBe(3) + expect(events.every((e) => e.type === `enter`)).toBe(true) + const names = events.map((e) => e.value.name).sort() + expect(names).toEqual([`Alice`, `Bob`, `Charlie`]) + + await effect.dispose() + }) + + it(`should emit enter and exit events when an insert displaces an item from the window`, async () => { + const users = createUsersCollection([ + { id: 1, name: `Bob`, active: true }, + { id: 2, name: `Charlie`, active: true }, + { id: 3, name: `Dave`, active: true }, + ]) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q + .from({ user: users }) + .orderBy(({ user }) => user.name, `asc`) + .limit(3), + on: `delta`, + skipInitial: true, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + expect(events.length).toBe(0) + + // Insert 'Alice' — alphabetically first, pushes 'Dave' out of the window + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 4, name: `Alice`, active: true }, + }) + users.utils.commit() + + await flushPromises() + + // Alice enters the window, Dave exits + const enters = events.filter((e) => e.type === `enter`) + const exits = events.filter((e) => e.type === `exit`) + + expect(enters.length).toBe(1) + expect(enters[0]!.value.name).toBe(`Alice`) + + expect(exits.length).toBe(1) + expect(exits[0]!.value.name).toBe(`Dave`) + + await effect.dispose() + }) + + it(`should emit exit event when a delete opens a window slot and new item enters`, async () => { + const users = createUsersCollection([ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `Dave`, active: true }, + ]) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q + .from({ user: users }) + .orderBy(({ user }) => user.name, `asc`) + .limit(3), + on: `delta`, + skipInitial: true, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + expect(events.length).toBe(0) + + // Delete Alice — Bob, Charlie remain, Dave should enter + users.utils.begin() + users.utils.write({ + type: `delete`, + value: { id: 1, name: `Alice`, active: true }, + }) + users.utils.commit() + + await flushPromises() + + const exits = events.filter((e) => e.type === `exit`) + const enters = events.filter((e) => e.type === `enter`) + + expect(exits.length).toBe(1) + expect(exits[0]!.value.name).toBe(`Alice`) + + expect(enters.length).toBe(1) + expect(enters[0]!.value.name).toBe(`Dave`) + + await effect.dispose() + }) + + it(`should handle desc ordering with limit`, async () => { + const users = createUsersCollection([ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `Dave`, active: true }, + ]) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q + .from({ user: users }) + .orderBy(({ user }) => user.name, `desc`) + .limit(2), + on: `delta`, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + + // Top 2 by desc: Dave, Charlie + expect(events.length).toBe(2) + expect(events.every((e) => e.type === `enter`)).toBe(true) + const names = events.map((e) => e.value.name).sort() + expect(names).toEqual([`Charlie`, `Dave`]) + + await effect.dispose() + }) + + 
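+    // An update that keeps a row inside the top-K window should surface as a
+    // single `update` delta rather than an exit/enter pair: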
it(`should emit update event when an item changes but stays in the window`, async () => { + const users = createUsersCollection([ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + ]) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q + .from({ user: users }) + .orderBy(({ user }) => user.name, `asc`) + .limit(3), + on: `delta`, + skipInitial: true, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + expect(events.length).toBe(0) + + // Update Alice's active flag — she stays in the window + users.utils.begin() + users.utils.write({ + type: `update`, + value: { id: 1, name: `Alice`, active: false }, + previousValue: { id: 1, name: `Alice`, active: true }, + }) + users.utils.commit() + + await flushPromises() + + expect(events.length).toBe(1) + expect(events[0]!.type).toBe(`update`) + expect(events[0]!.value.active).toBe(false) + expect(events[0]!.previousValue?.active).toBe(true) + + await effect.dispose() + }) + }) + + describe(`orderBy with limit and lazy loading`, () => { + // These tests verify that when a where clause filters items out of the + // initial orderBy window, the loadMoreIfNeeded mechanism requests more + // data from the source collection to fill the window. + + function createUsersCollectionWithIndex(initialData: Array) { + return createCollection( + mockSyncCollectionOptions({ + id: `test-users-indexed`, + getKey: (user) => user.id, + initialData, + autoIndex: `eager`, + }), + ) + } + + it(`should load more data when pipeline filters items from the orderBy window`, async () => { + // 6 users, ordered by name asc, limit 3 + // But we filter on active=true, and Bob/Dave are inactive + // Initial load gets top 3 by name: Alice, Bob, Charlie + // Bob is filtered → topK has only 2 items, needs 1 more + // loadMoreIfNeeded loads Dave → filtered → needs 1 more + // Loads Eve → active → topK has Alice, Charlie, Eve + const users = createUsersCollectionWithIndex([ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: false }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `Dave`, active: false }, + { id: 5, name: `Eve`, active: true }, + { id: 6, name: `Frank`, active: true }, + ]) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q + .from({ user: users }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(3), + on: `enter`, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + + // Should get exactly 3 active users: Alice, Charlie, Eve + expect(events.length).toBe(3) + const names = events.map((e) => e.value.name).sort() + expect(names).toEqual([`Alice`, `Charlie`, `Eve`]) + + await effect.dispose() + }) + + it(`should pass orderBy/limit hints for unordered subscriptions`, async () => { + // This test verifies the unordered path also gets orderBy/limit hints. + // With a simple orderBy + limit, the effect should still show correct results. 
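+      // (These hints are produced by computeSubscriptionOrderByHints, which
+      // only forwards orderBy/limit when every clause is a simple single-step
+      // ref scoped to the subscription's alias.)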
+ const users = createUsersCollection([ + { id: 1, name: `Charlie`, active: true }, + { id: 2, name: `Alice`, active: true }, + { id: 3, name: `Bob`, active: true }, + { id: 4, name: `Dave`, active: true }, + ]) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q + .from({ user: users }) + .orderBy(({ user }) => user.name, `asc`) + .limit(2), + on: `enter`, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + + // Top 2 alphabetically: Alice, Bob + expect(events.length).toBe(2) + const names = events.map((e) => e.value.name).sort() + expect(names).toEqual([`Alice`, `Bob`]) + + await effect.dispose() + }) + + it(`should load more data when an exit reduces the window below the limit`, async () => { + const users = createUsersCollectionWithIndex([ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `Dave`, active: true }, + ]) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q + .from({ user: users }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(3), + on: `delta`, + skipInitial: true, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + expect(events.length).toBe(0) + + // Deactivate Alice — she exits the window, Dave should enter + users.utils.begin() + users.utils.write({ + type: `update`, + value: { id: 1, name: `Alice`, active: false }, + previousValue: { id: 1, name: `Alice`, active: true }, + }) + users.utils.commit() + + await flushPromises() + + const exits = events.filter((e) => e.type === `exit`) + const enters = events.filter((e) => e.type === `enter`) + + expect(exits.length).toBe(1) + expect(exits[0]!.value.name).toBe(`Alice`) + + expect(enters.length).toBe(1) + expect(enters[0]!.value.name).toBe(`Dave`) + + await effect.dispose() + }) + }) + + describe(`source error handling`, () => { + it(`should auto-dispose when source collection is cleaned up`, async () => { + const users = createUsersCollection() + const events: Array> = [] + const consoleErrorSpy = vi + .spyOn(console, `error`) + .mockImplementation(() => {}) + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `delta`, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + + // Should receive initial events + expect(events.length).toBe(3) + expect(effect.disposed).toBe(false) + + // Clean up the source collection — should auto-dispose the effect + await users.cleanup() + await flushPromises() + + expect(effect.disposed).toBe(true) + expect(consoleErrorSpy).toHaveBeenCalledWith( + expect.stringContaining(`cleaned up`), + ) + + consoleErrorSpy.mockRestore() + }) + + it(`should not fire events after source collection is cleaned up`, async () => { + const users = createUsersCollection() + const events: Array> = [] + vi.spyOn(console, `error`).mockImplementation(() => {}) + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `delta`, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + const countAfterInit = events.length + + // Clean up source — effect auto-disposes + await users.cleanup() + await flushPromises() + + expect(effect.disposed).toBe(true) + // No new events should have been received from the cleanup itself + // (the disposed guard prevents any further processing) + expect(events.length).toBe(countAfterInit) + + vi.restoreAllMocks() + }) + + 
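+    // When onSourceError is provided, source failures are routed to it rather
+    // than logged via console.error, as the next test verifies: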
it(`should call onSourceError callback instead of console.error when provided`, async () => { + const users = createUsersCollection() + const sourceErrors: Array = [] + const consoleErrorSpy = vi + .spyOn(console, `error`) + .mockImplementation(() => {}) + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `delta`, + handler: () => {}, + onSourceError: (error) => { + sourceErrors.push(error) + }, + }) + + await flushPromises() + + await users.cleanup() + await flushPromises() + + expect(effect.disposed).toBe(true) + expect(sourceErrors.length).toBe(1) + expect(sourceErrors[0]!.message).toContain(`cleaned up`) + // Should NOT have called console.error + expect(consoleErrorSpy).not.toHaveBeenCalled() + + consoleErrorSpy.mockRestore() + }) + }) + + describe(`correctness edge cases`, () => { + it(`batchHandler rejecting should trigger onError and not produce unhandled rejection`, async () => { + const users = createUsersCollection() + const errors: Array = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + batchHandler: () => Promise.reject(new Error(`batch boom`)), + onError: (error) => { + errors.push(error) + }, + }) + + await flushPromises() + + expect(errors.length).toBe(1) + expect(errors[0]!.message).toBe(`batch boom`) + + await effect.dispose() + }) + + it(`disposing inside handler should not throw and should stop further events`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effectHandle = createEffect({ + query: (q) => q.from({ user: users }), + on: `enter`, + skipInitial: true, + handler: (event) => { + events.push(event) + // Dispose inside the handler — should not crash + effectHandle.dispose() + }, + }) + + await flushPromises() + expect(effectHandle.disposed).toBe(false) + + // Insert a row — handler will fire and call dispose() mid-graph-run + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 10, name: `NewUser`, active: true }, + }) + users.utils.commit() + + await flushPromises() + + expect(effectHandle.disposed).toBe(true) + // Should have received the event that triggered disposal + expect(events.length).toBe(1) + expect(events[0]!.value.name).toBe(`NewUser`) + }) + + it(`previousValue should reflect the value before the batch for multi-step updates`, async () => { + const users = createUsersCollection([ + { id: 1, name: `Alice`, active: true }, + ]) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `update`, + skipInitial: true, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + + // Perform multi-step update in a single transaction: Alice → Bob → Charlie + users.utils.begin() + users.utils.write({ + type: `update`, + value: { id: 1, name: `Bob`, active: true }, + previousValue: { id: 1, name: `Alice`, active: true }, + }) + users.utils.write({ + type: `update`, + value: { id: 1, name: `Charlie`, active: true }, + previousValue: { id: 1, name: `Bob`, active: true }, + }) + users.utils.commit() + + await flushPromises() + + expect(events.length).toBe(1) + // previousValue should be the original value (Alice), not the intermediate (Bob) + expect(events[0]!.previousValue?.name).toBe(`Alice`) + // value should be the final value (Charlie) + expect(events[0]!.value.name).toBe(`Charlie`) + + await effect.dispose() + }) + + it(`exit events should not have previousValue`, async () => { + const users = createUsersCollection([ + { id: 1, name: `Alice`, 
active: true }, + ]) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + on: `exit`, + skipInitial: true, + handler: (event) => { + events.push(event) + }, + }) + + await flushPromises() + + users.utils.begin() + users.utils.write({ + type: `delete`, + value: { id: 1, name: `Alice`, active: true }, + }) + users.utils.commit() + + await flushPromises() + + expect(events.length).toBe(1) + expect(events[0]!.type).toBe(`exit`) + expect(events[0]!.value.name).toBe(`Alice`) + // previousValue should be undefined for exit events + expect(events[0]!.previousValue).toBeUndefined() + + await effect.dispose() + }) + }) +}) diff --git a/packages/react-db/src/index.ts b/packages/react-db/src/index.ts index b352a0ba0..96db7e279 100644 --- a/packages/react-db/src/index.ts +++ b/packages/react-db/src/index.ts @@ -3,6 +3,7 @@ export * from './useLiveQuery' export * from './useLiveSuspenseQuery' export * from './usePacedMutations' export * from './useLiveInfiniteQuery' +export * from './useLiveQueryEffect' // Re-export everything from @tanstack/db export * from '@tanstack/db' diff --git a/packages/react-db/src/useLiveQueryEffect.ts b/packages/react-db/src/useLiveQueryEffect.ts new file mode 100644 index 000000000..040164139 --- /dev/null +++ b/packages/react-db/src/useLiveQueryEffect.ts @@ -0,0 +1,45 @@ +import { useEffect, useRef } from 'react' +import { createEffect } from '@tanstack/db' +import type { Effect, EffectConfig } from '@tanstack/db' + +/** + * React hook for creating a reactive effect that fires handlers when rows + * enter, exit, or update within a query result. + * + * The effect is created on mount and disposed on unmount. If `deps` change, + * the previous effect is disposed and a new one is created. + * + * @example + * ```tsx + * function ChatComponent() { + * useLiveQueryEffect( + * { + * query: (q) => q.from({ msg: messages }).where(({ msg }) => eq(msg.role, 'user')), + * on: 'enter', + * skipInitial: true, + * handler: async (event) => { + * await generateResponse(event.value) + * }, + * }, + * [] + * ) + * + * return
<div>...</div>
+ * } + * ``` + */ +export function useLiveQueryEffect< + TRow extends object = Record, + TKey extends string | number = string | number, +>(config: EffectConfig, deps: React.DependencyList = []): void { + const effectRef = useRef(null) + + useEffect(() => { + effectRef.current = createEffect(config) + return () => { + // Fire-and-forget disposal; AbortSignal cancels in-flight work + effectRef.current?.dispose() + effectRef.current = null + } + }, deps) +} diff --git a/packages/react-db/tests/useLiveQueryEffect.test.tsx b/packages/react-db/tests/useLiveQueryEffect.test.tsx new file mode 100644 index 000000000..102e10b68 --- /dev/null +++ b/packages/react-db/tests/useLiveQueryEffect.test.tsx @@ -0,0 +1,173 @@ +import { describe, expect, it } from 'vitest' +import { act, renderHook } from '@testing-library/react' +import { createCollection, eq } from '@tanstack/db' +import { useLiveQueryEffect } from '../src/useLiveQueryEffect' +import { mockSyncCollectionOptions } from '../../db/tests/utils' +import type { DeltaEvent } from '@tanstack/db' + +type User = { + id: number + name: string + active: boolean +} + +const initialUsers: Array = [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, +] + +const flushPromises = () => new Promise((resolve) => setTimeout(resolve, 0)) + +function createUsersCollection(initialData = initialUsers) { + return createCollection( + mockSyncCollectionOptions({ + id: `test-users-hook`, + getKey: (user) => user.id, + initialData, + }), + ) +} + +describe(`useLiveQueryEffect`, () => { + it(`should create effect on mount and dispose on unmount`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const { unmount } = renderHook(() => { + useLiveQueryEffect( + { + query: (q) => q.from({ user: users }), + on: `enter`, + handler: (event) => { + events.push(event) + }, + }, + [], + ) + }) + + await act(async () => { + await flushPromises() + }) + + // Should have received enter events for initial data + expect(events.length).toBe(2) + + const countBefore = events.length + + // Unmount — should dispose the effect + unmount() + + // Insert after unmount + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 3, name: `Charlie`, active: true }, + }) + users.utils.commit() + + await act(async () => { + await flushPromises() + }) + + // Should not have received new events after unmount + expect(events.length).toBe(countBefore) + }) + + it(`should recreate effect when deps change`, async () => { + const users = createUsersCollection() + const effectIds: Array = [] + + const { rerender } = renderHook( + ({ dep }: { dep: number }) => { + useLiveQueryEffect( + { + query: (q) => q.from({ user: users }), + on: `enter`, + handler: (_event, ctx) => { + if (!effectIds.includes(ctx.effectId)) { + effectIds.push(ctx.effectId) + } + }, + }, + [dep], + ) + }, + { initialProps: { dep: 1 } }, + ) + + await act(async () => { + await flushPromises() + }) + + expect(effectIds.length).toBe(1) + const firstId = effectIds[0] + + // Change deps — should dispose old effect and create new one + rerender({ dep: 2 }) + + await act(async () => { + await flushPromises() + }) + + expect(effectIds.length).toBe(2) + expect(effectIds[1]).not.toBe(firstId) + }) + + it(`should receive events from source collection changes`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + renderHook(() => { + useLiveQueryEffect( + { + query: (q) => + q.from({ user: users }).where(({ user }) => 
eq(user.active, true)), + on: `delta`, + skipInitial: true, + handler: (event) => { + events.push(event) + }, + }, + [], + ) + }) + + await act(async () => { + await flushPromises() + }) + + // skipInitial — no initial events + expect(events.length).toBe(0) + + // Insert a new active user + await act(async () => { + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 3, name: `Charlie`, active: true }, + }) + users.utils.commit() + await flushPromises() + }) + + expect(events.length).toBe(1) + expect(events[0]!.type).toBe(`enter`) + expect(events[0]!.value.name).toBe(`Charlie`) + + // Delete a user + await act(async () => { + users.utils.begin() + users.utils.write({ + type: `delete`, + value: { id: 1, name: `Alice`, active: true }, + }) + users.utils.commit() + await flushPromises() + }) + + expect(events.length).toBe(2) + expect(events[1]!.type).toBe(`exit`) + expect(events[1]!.value.name).toBe(`Alice`) + }) +})