Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/create-effect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
'@tanstack/db': minor
'@tanstack/react-db': minor
---

Add `createEffect` API for reactive delta-driven effects and `useLiveQueryEffect` React hook.

`createEffect` attaches handlers to a live query's delta stream — firing callbacks when rows enter, exit, or update within a query result — without materialising the full result set. Supports per-row and batch handlers, `skipInitial`, `orderBy` + `limit` (top-K window), joins, lazy loading, transaction coalescing, async disposal with `AbortSignal`, and `onSourceError` / `onError` callbacks.

`useLiveQueryEffect` is the React hook wrapper that manages the effect lifecycle (create on mount, dispose on unmount, recreate on dependency change).
365 changes: 365 additions & 0 deletions docs/guides/live-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The result types are automatically inferred from your query structure, providing
- [Distinct](#distinct)
- [Order By, Limit, and Offset](#order-by-limit-and-offset)
- [Composable Queries](#composable-queries)
- [Reactive Effects (createEffect)](#reactive-effects-createeffect)
- [Expression Functions Reference](#expression-functions-reference)
- [Functional Variants](#functional-variants)

Expand Down Expand Up @@ -1687,6 +1688,370 @@ const users = createLiveQueryCollection((q) =>

This approach makes your query logic more modular, testable, and reusable across your application.

## Reactive Effects (createEffect)

While live query collections materialise query results into a collection you can subscribe to and iterate over, **reactive effects** let you respond to query result *changes* without materialising the full result set. Effects fire callbacks when rows enter, exit, or update within a query result.

This is useful for triggering side effects — sending notifications, syncing to external systems, generating AI responses, updating counters — whenever your data changes.

### When to Use Effects vs Live Query Collections

| Use case | Approach |
|----------|----------|
| Display query results in UI | Live query collection + `useLiveQuery` |
| React to changes (side effects) | `createEffect` / `useLiveQueryEffect` |
| Track new items entering a result set | `createEffect` with `on: 'enter'` |
| Monitor items leaving a result set | `createEffect` with `on: 'exit'` |
| Respond to updates within a result set | `createEffect` with `on: 'update'` |

### Basic Usage

```ts
import { createEffect, eq } from '@tanstack/db'

const effect = createEffect({
query: (q) =>
q
.from({ msg: messagesCollection })
.where(({ msg }) => eq(msg.role, 'user')),
on: 'enter',
handler: async (event) => {
console.log('New user message:', event.value)
await generateResponse(event.value)
},
})

// Later: stop the effect
await effect.dispose()
```

### Configuration

`createEffect` accepts an `EffectConfig` object:

```ts
const effect = createEffect({
id: 'my-effect', // Optional: auto-generated if not provided
query: (q) => q.from(...), // Query to watch
on: 'delta', // Which delta types to handle
handler: (event, ctx) => { ... }, // Per-row handler
batchHandler: (events, ctx) => { ... }, // Per-batch handler
onError: (error, event) => { ... }, // Handler error callback
onSourceError: (error) => { ... }, // Source collection error callback
skipInitial: false, // Skip deltas during initial load
})
```

| Option | Type | Description |
|--------|------|-------------|
| `id` | `string` (optional) | Identifier for debugging/tracing. Auto-generated as `live-query-effect-{n}` if not provided. |
| `query` | `QueryBuilder` or function | The query to watch. Accepts the same builder function or `QueryBuilder` instance as live query collections. |
| `on` | `DeltaType \| DeltaType[] \| 'delta'` | Which delta types to fire handlers for. Use `'delta'` for all types, or specify one or more of `'enter'`, `'exit'`, `'update'`. |
| `handler` | `(event, ctx) => void \| Promise<void>` (optional) | Called once for each matching delta event. |
| `batchHandler` | `(events, ctx) => void \| Promise<void>` (optional) | Called once per batch with all matching delta events. |
| `onError` | `(error, event) => void` (optional) | Called when `handler` or `batchHandler` throws or rejects. |
| `onSourceError` | `(error) => void` (optional) | Called when a source collection enters an error or cleaned-up state. The effect is automatically disposed after this fires. If not provided, the error is logged to `console.error`. |
| `skipInitial` | `boolean` (optional) | When `true`, deltas from the initial data load are suppressed. Only subsequent changes fire handlers. Defaults to `false`. |

### Delta Events

Each delta event describes a single row change within the query result:

```ts
interface DeltaEvent<TRow, TKey> {
type: 'enter' | 'exit' | 'update'
key: TKey
value: TRow
previousValue?: TRow // Only present for 'update' events
}
```

| Event type | Meaning | `value` | `previousValue` |
|------------|---------|---------|------------------|
| `enter` | Row entered the query result | The new row | — |
| `exit` | Row left the query result | The exiting row | — |
| `update` | Row changed but stayed in the result | The new row | The row before the change |

### The `on` Parameter

Control which delta types your handlers receive:

```ts
// Only new rows entering the result
createEffect({ on: 'enter', ... })

// Only rows leaving the result
createEffect({ on: 'exit', ... })

// Only rows that changed but stayed in the result
createEffect({ on: 'update', ... })

// Multiple specific types
createEffect({ on: ['enter', 'exit'], ... })

// All delta types
createEffect({ on: 'delta', ... })
```

### Per-Row vs Batch Handlers

You can provide a `handler` (called once per event), a `batchHandler` (called once per batch with all events), or both:

```ts
createEffect({
query: (q) => q.from({ user: usersCollection }),
on: 'delta',

// Called once for each delta event
handler: (event, ctx) => {
console.log(`${event.type}: ${event.key}`)
},

// Called once per batch with all events
batchHandler: (events, ctx) => {
console.log(`Batch of ${events.length} events`)
},
})
```

Both handlers receive an `EffectContext`:

```ts
interface EffectContext {
effectId: string // The effect's ID
signal: AbortSignal // Aborted when effect.dispose() is called
}
```

The `signal` is useful for cancelling in-flight async work when the effect is disposed:

```ts
createEffect({
query: (q) => q.from({ task: tasksCollection }),
on: 'enter',
handler: async (event, ctx) => {
const result = await fetch('/api/process', {
method: 'POST',
body: JSON.stringify(event.value),
signal: ctx.signal, // Cancelled on dispose
})
// ...
},
})
```

### Skipping Initial Data

By default, effects process all data including the initial load. Set `skipInitial: true` to only respond to changes that happen after the initial sync:

```ts
// Only react to NEW messages, not existing ones
const effect = createEffect({
query: (q) =>
q.from({ msg: messagesCollection })
.where(({ msg }) => eq(msg.role, 'user')),
on: 'enter',
skipInitial: true,
handler: async (event) => {
await sendNotification(event.value)
},
})
```

### Error Handling

Errors thrown by `handler` or `batchHandler` (sync or async) are caught and routed to `onError`. If no `onError` is provided, they are logged to `console.error`:

```ts
createEffect({
query: (q) => q.from({ order: ordersCollection }),
on: 'enter',
handler: async (event) => {
await processOrder(event.value)
},
onError: (error, event) => {
console.error(`Failed to process order ${event.key}:`, error)
reportToErrorTracker(error)
},
})
```

If a source collection enters an error or cleaned-up state, the effect automatically disposes itself. Use `onSourceError` to handle this:

```ts
createEffect({
query: (q) => q.from({ data: dataCollection }),
on: 'delta',
handler: (event) => { ... },
onSourceError: (error) => {
console.warn('Data source failed, effect disposed:', error.message)
},
})
```

### Disposal

`createEffect` returns an `Effect` handle with a `dispose()` method:

```ts
const effect = createEffect({ ... })

// Check if disposed
console.log(effect.disposed) // false

// Dispose: unsubscribes from sources, aborts the signal,
// and waits for in-flight async handlers to settle
await effect.dispose()

console.log(effect.disposed) // true
```

`dispose()` is idempotent — calling it multiple times is safe. It returns a promise that resolves when all in-flight async handlers have settled (via `Promise.allSettled`).

### Query Features

Effects support the full query system — everything you can do with live query collections works with effects:

```ts
// Joins
createEffect({
query: (q) =>
q
.from({ user: usersCollection })
.join({ post: postsCollection }, ({ user, post }) =>
eq(user.id, post.userId)
)
.select(({ user, post }) => ({
userName: user.name,
postTitle: post.title,
})),
on: 'enter',
handler: (event) => {
console.log(`${event.value.userName} published "${event.value.postTitle}"`)
},
})

// Filters
createEffect({
query: (q) =>
q
.from({ user: usersCollection })
.where(({ user }) => eq(user.role, 'admin')),
on: 'enter',
handler: (event) => {
console.log(`New admin: ${event.value.name}`)
},
})

// OrderBy + Limit (top-K window)
createEffect({
query: (q) =>
q
.from({ score: scoresCollection })
.orderBy(({ score }) => score.points, 'desc')
.limit(10),
on: 'delta',
handler: (event) => {
// Fires when items enter or exit the top 10
console.log(`${event.type}: ${event.value.name} (${event.value.points} pts)`)
},
})
```

When using `orderBy` with `limit`, effects track a top-K window. You receive `enter` events when items enter the window and `exit` events when they're displaced.

### Transaction Coalescing

When multiple changes occur within a single transaction, effects coalesce them into a single batch. This means your handlers are called once with all the changes from that transaction, not once per individual write:

```ts
createEffect({
query: (q) => q.from({ item: itemsCollection }),
on: 'enter',
batchHandler: (events) => {
// If 3 items are inserted in one transaction,
// this fires once with all 3 events
console.log(`${events.length} items added`)
},
})
```

### Using with React

The `useLiveQueryEffect` hook manages the effect lifecycle automatically — creating on mount, disposing on unmount, and recreating when dependencies change:

```tsx
import { useLiveQueryEffect } from '@tanstack/react-db'
import { eq } from '@tanstack/db'

function ChatComponent({ channelId }: { channelId: string }) {
useLiveQueryEffect(
{
query: (q) =>
q
.from({ msg: messagesCollection })
.where(({ msg }) => eq(msg.channelId, channelId)),
on: 'enter',
skipInitial: true,
handler: async (event) => {
await playNotificationSound()
},
},
[channelId] // Recreate effect when channelId changes
)

return <div>...</div>
}
```

The second argument is a dependency array (like `useEffect`). When dependencies change, the old effect is disposed and a new one is created with the updated config.

### Complete Example

Here's a more complete example showing an effect that monitors order status changes and sends notifications:

```ts
import { createEffect, eq } from '@tanstack/db'

const orderEffect = createEffect({
id: 'order-status-monitor',
query: (q) =>
q
.from({ order: ordersCollection })
.join({ customer: customersCollection }, ({ order, customer }) =>
eq(order.customerId, customer.id)
)
.where(({ order }) => eq(order.status, 'shipped'))
.select(({ order, customer }) => ({
orderId: order.id,
customerEmail: customer.email,
trackingNumber: order.trackingNumber,
})),
on: 'enter',
skipInitial: true,

handler: async (event, ctx) => {
await sendShipmentEmail({
to: event.value.customerEmail,
orderId: event.value.orderId,
tracking: event.value.trackingNumber,
signal: ctx.signal,
})
},

onError: (error, event) => {
console.error(`Failed to notify for order ${event.key}:`, error)
},

onSourceError: (error) => {
alertOpsTeam('Order monitoring effect failed', error)
},
})

// On application shutdown
await orderEffect.dispose()
```

## Expression Functions Reference

The query system provides a comprehensive set of functions for filtering, transforming, and aggregating data.
Expand Down
Loading
Loading