The edge-native data lakehouse for Cloudflare.
sqlake bridges the gap between real-time transactional data in Durable Objects and analytics-ready data in a Parquet-based data lake. Define your schema once with Drizzle, and sqlake handles the rest: SQLite migrations, CDC streaming, Parquet files, Iceberg catalogs, and unified querying.
import { SQLake, sql, text, integer } from 'sqlake'
// Define your schema once
const db = SQLake({
users: {
id: text('id').primaryKey(),
email: text('email').notNull(),
plan: text('plan'),
},
shard: { users: 'id' },
})
// Query single user (strong consistency - routes to Durable Object)
const user = await sql`SELECT * FROM users WHERE id = ${userId}`.first()
// Query ALL users (eventual consistency - routes to DuckDB on R2)
const stats = await sql.analytics`
SELECT plan, COUNT(*) as count
FROM users
GROUP BY plan
`
// Time travel query (query historical data)
const yesterday = new Date(Date.now() - 24 * 60 * 60 * 1000)
const oldData = await sql.asOf(yesterday)`SELECT * FROM users`

Cloudflare Durable Objects give you globally distributed SQLite databases with zero configuration. But then you hit the wall:
- Cross-shard queries: Each DO is isolated - you cannot query across all users
- Analytics: Building aggregations requires custom data pipelines
- Historical data: No built-in time travel or audit trails
- Schema evolution: Coordinating migrations across thousands of DOs is complex
sqlake automatically streams every write from your Durable Objects to a unified data lake on R2:
- Unified schema: Define once with Drizzle, works everywhere
- Automatic CDC: SQLite triggers capture every change
- Parquet output: Columnar format optimized for analytics
- Iceberg catalog: Time travel, schema evolution, snapshot isolation
- Query routing: Automatic routing to DO (strong) or DuckDB (analytics)
| Feature | Benefit |
|---|---|
| Edge-native | Runs entirely on Cloudflare's edge infrastructure |
| SQLite + Iceberg | Best of OLTP and OLAP in one system |
| Real-time CDC | Changes stream to data lake in near real-time |
| Time travel | Query data at any point in history |
| Zero infrastructure | No external databases or data warehouses needed |
+------------------------------------------------------------------+
| Your Application |
+------------------------------------------------------------------+
| |
| sql`SELECT * FROM users WHERE id = ${id}` |
| | |
| v |
| +---------------+ |
| | Query Router | |
| +-------+-------+ |
| Strong | | Eventual |
| v v |
| +-------------+ +----------------+ |
| | Durable | | DuckDB/Parquet | |
| | Object | | on R2 | |
| | (SQLite) | | | |
| +------+------+ +----------------+ |
| | ^ |
| | CDC Triggers | |
| v | |
| +---------------+ | |
| | CDC Buffer +---------+ |
| | (Parquet) | Flush |
| +---------------+ |
| |
+------------------------------------------------------------------+
- Durable Objects (SQLakeDO)
  - Per-user/per-tenant SQLite databases
  - Strong consistency for transactional workloads
  - CDC triggers automatically installed
- CDC (Change Data Capture)
  - SQLite triggers capture INSERT/UPDATE/DELETE
  - Events buffered in the `_cdc_buffer` table
  - Background flush to R2 as Parquet files (see the sketch after this list)
- R2 Storage
  - Parquet files with Hive-style partitioning (`_shard=xxx/`)
  - Iceberg metadata for table management
  - Zone maps for efficient file pruning
- Iceberg Tables
  - Apache Iceberg v2 compatible metadata
  - Snapshot-based time travel
  - Schema evolution support
  - Manifest files with column statistics
- DuckDB Analytics
  - WASM or native DuckDB for query execution
  - Reads Parquet files directly from R2
  - Predicate pushdown using zone maps
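To make the write path concrete, here is a minimal sketch of the CDC flow from inside a Durable Object. It only uses members documented later in this README (`schema`, `this.sql`, `cdcFlushThreshold`, `cdcFlushInterval`, `getCDCFlushMetrics`); the `products` table and the flush settings are illustrative.

```ts
import { SQLakeDO } from 'sqlake/do'
import { db } from './schema'

export class ProductDO extends SQLakeDO {
  schema = db

  // Flush buffered CDC events to R2 more aggressively than the defaults (illustrative values).
  protected cdcFlushThreshold = 100
  protected cdcFlushInterval = 10_000

  async restock(id: string, qty: number) {
    // 1. The write lands in this DO's SQLite database (strong consistency).
    // 2. The generated CDC trigger records the change in the _cdc_buffer table.
    // 3. A background flush writes the buffered events to R2 as Parquet and
    //    registers the new data file with the Iceberg catalog.
    await this.sql`UPDATE products SET stock = stock + ${qty} WHERE id = ${id}`.run()
  }

  // Check how the background flush is doing.
  flushHealth() {
    return this.getCDCFlushMetrics()
  }
}
```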
npm install sqlake drizzle-orm

// schema.ts
import { SQLake, text, integer, real } from 'sqlake'
export const db = SQLake({
users: {
id: text('id').primaryKey(),
email: text('email').notNull(),
name: text('name'),
plan: text('plan'),
createdAt: integer('created_at', { mode: 'timestamp' }),
},
orders: {
id: text('id').primaryKey(),
userId: text('user_id').notNull(),
amount: real('amount').notNull(),
status: text('status'),
},
// Configure shard keys
shard: {
users: 'id',
orders: 'userId',
},
// Storage mode for Parquet output
storage: 'hybrid', // 'columns' | 'variant' | 'hybrid'
// R2 bucket binding name
r2Bucket: 'SQLAKE_BUCKET',
})
// Type inference
export type User = typeof db.tables.users.$inferSelect
export type Order = typeof db.tables.orders.$inferSelect

// worker.ts
import { SQLakeDO } from 'sqlake/do'
import { db } from './schema'
export class UserDO extends SQLakeDO {
schema = db
async getUser(id: string) {
return this.sql`SELECT * FROM users WHERE id = ${id}`.first()
}
async createUser(id: string, email: string) {
await this.sql`INSERT INTO users (id, email) VALUES (${id}, ${email})`.run()
return { id, email }
}
async updatePlan(id: string, plan: string) {
await this.sql`UPDATE users SET plan = ${plan} WHERE id = ${id}`.run()
}
}
export default {
async fetch(request: Request, env: Env) {
const url = new URL(request.url)
const userId = url.searchParams.get('userId')
if (!userId) {
return new Response('Missing userId', { status: 400 })
}
const doId = env.USER_DO.idFromName(userId)
const stub = env.USER_DO.get(doId)
// Route to the user's Durable Object
return stub.fetch(request)
}
}

# wrangler.toml
name = "my-app"
main = "src/worker.ts"
[[durable_objects.bindings]]
name = "USER_DO"
class_name = "UserDO"
[[migrations]]
tag = "v1"
new_classes = ["UserDO"]
[[r2_buckets]]
binding = "SQLAKE_BUCKET"
bucket_name = "my-app-data"

# Generate SQLite migrations and Parquet schema
npx sqlake generate
# Push migrations to R2 for DO consumption
npx sqlake push
# Deploy to Cloudflare
npx wrangler deploy

sqlake uses Drizzle ORM's column types to define your schema:
import { SQLake, text, integer, real, blob } from 'sqlake'
const db = SQLake({
// Table definitions (Drizzle column types)
products: {
id: text('id').primaryKey(),
name: text('name').notNull(),
price: real('price').notNull(),
stock: integer('stock').default(0),
metadata: blob('metadata', { mode: 'json' }),
},
// Shard key configuration
shard: {
products: 'id', // Routes queries by product ID
},
// Storage mode for Parquet
storage: 'hybrid', // columns + _row variant
// Enable CDC (default: true)
cdc: true,
// R2 bucket binding name
r2Bucket: 'SQLAKE_BUCKET',
// Variant column name (default: '_row')
variantColumn: '_row',
})

| Mode | Description | Use Case |
|---|---|---|
| `columns` | First-class columns only | Pure analytics workloads |
| `variant` | `_row` VARIANT column only | Document retrieval |
| `hybrid` | Both columns and `_row` | Mixed workloads (recommended; see the example below) |
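For example, with the default `hybrid` mode an analytics query can aggregate over the typed columns and still pull the full document out of the variant column when needed. A minimal sketch, assuming the `_row` column name from the `variantColumn` default above (how the variant value is decoded depends on the analytics engine, so treat this as illustrative):

```ts
import { sql } from 'sqlake'

// Aggregate over first-class columns (fast, columnar scan)...
const lowStock = await sql.analytics`
  SELECT name, stock FROM products WHERE stock < ${10}
`

// ...and retrieve complete rows as documents via the `_row` variant column.
const docs = await sql.analytics`
  SELECT _row FROM products WHERE price > ${100}
`
```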
SQLakeDO is the base class for your Durable Objects. It provides:
- Automatic migration on cold start
- CDC trigger installation
- Background CDC flush to R2
- DO-local SQL query interface
import { SQLakeDO, type SQLakeWorkerEnv } from 'sqlake/do'
import { db } from './schema'
interface MyEnv extends SQLakeWorkerEnv {
API_KEY: string
}
export class MyDO extends SQLakeDO {
// Required: link your schema
schema = db
// Optional: customize CDC settings
protected cdcFlushThreshold = 1000 // Flush every N records
protected cdcFlushInterval = 60_000 // Flush every N ms
// Use this.sql for DO-local queries
async getData() {
return this.sql`SELECT * FROM products`.all()
}
async insertData(id: string, name: string) {
await this.sql`INSERT INTO products (id, name) VALUES (${id}, ${name})`.run()
}
// Migration status
getStatus() {
return {
migrations: this.getMigrationStatus(),
cdc: this.getCDCFlushMetrics(),
}
}
}

// Get current migration status (on a SQLakeDO instance, referenced here as `sqlakeDO`)
const status = sqlakeDO.getMigrationStatus()
// { manifestVersion, appliedCount, pendingCount, applied: [...], pending: [...] }

// Get last migration result
const result = sqlakeDO.getLastMigrationResult()
// { success, applied, migrations: [...], durationMs, error? }

// Dry-run migrations
const preview = await sqlakeDO.dryRunMigrations()

// Rollback to a specific migration
await sqlakeDO.rollbackMigrations('002_add_column')

// Get flush metrics
const metrics = sqlakeDO.getCDCFlushMetrics()
// { consecutiveFailures, totalFailures, lastError, lastSuccess }

// Get events that failed to flush
const deadLetters = sqlakeDO.getCDCDeadLetterEvents(100)

// Retry dead-letter events
await sqlakeDO.retryCDCDeadLetterEvents({ maxRetries: 3, batchSize: 100 })

// Purge old dead-letter events (default: 7 days)
const purged = sqlakeDO.purgeCDCDeadLetterEvents(7 * 24 * 60 * 60 * 1000)

sqlake automatically captures all changes via SQLite triggers:
-- Automatically created trigger (example)
CREATE TRIGGER _cdc_users_insert
AFTER INSERT ON users
BEGIN
INSERT INTO _cdc_buffer (op, table_name, row_id, after_json)
SELECT 'c', 'users', NEW.id, json_object('id', NEW.id, 'email', NEW.email);
END

interface CDCEvent<T = unknown> {
_op: 'c' | 'u' | 'd' // create, update, delete
_table: string // Table name
_shard: string // Shard ID
_seq: bigint // Sequence number
_ts: number // Timestamp (ms since epoch)
_before: T | null // State before (null for inserts)
_after: T | null // State after (null for deletes)
}
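As a concrete illustration of this shape (the values are made up), an update to a user's plan would surface as an event like:

```ts
import type { CDCEvent } from 'sqlake'

const event: CDCEvent<{ id: string; email: string; plan: string | null }> = {
  _op: 'u',                      // update
  _table: 'users',
  _shard: 'user-123',
  _seq: 42n,
  _ts: 1706788800000,
  _before: { id: 'user-123', email: 'a@example.com', plan: 'free' },
  _after: { id: 'user-123', email: 'a@example.com', plan: 'pro' },
}
```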
CDC events are written to R2 as Parquet files with Hive-style partitioning:

data/
  users/
    _shard=user-123/
      cdc_1706788800000_1.parquet
      cdc_1706788900000_2.parquet
    _shard=user-456/
      cdc_1706788850000_1.parquet
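Because these are plain Parquet objects under predictable prefixes, a Worker with the `SQLAKE_BUCKET` binding from the wrangler.toml above can inspect a shard's flushed CDC files directly using the standard R2 `list()` API. A minimal sketch, where the prefix simply follows the layout shown here:

```ts
interface Env {
  SQLAKE_BUCKET: R2Bucket
}

// List the CDC Parquet files flushed for one shard (first page of results only).
async function listShardFiles(env: Env, table: string, shard: string) {
  const listing = await env.SQLAKE_BUCKET.list({
    prefix: `data/${table}/_shard=${shard}/`,
  })
  return listing.objects.map(obj => ({ key: obj.key, size: obj.size }))
}
```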
Query historical data using Iceberg snapshots:
// Query as of a specific timestamp
const yesterday = new Date(Date.now() - 24 * 60 * 60 * 1000)
const oldUsers = await sql.asOf(yesterday)`SELECT * FROM users`
// Query as of an ISO timestamp
const snapshot = await sql.asOf('2024-01-15T00:00:00Z')`SELECT * FROM users`
// Query a specific Iceberg snapshot by ID
const exact = await sql.snapshot(1234567890123456)`SELECT * FROM users`
// Query changes between two timestamps (incremental processing)
const changes = await sql.between(startTime, endTime)`SELECT * FROM events`

Cross-shard analytics queries are routed to DuckDB:
// Force analytics mode (eventual consistency)
const stats = await sql.analytics`
SELECT
plan,
COUNT(*) as user_count,
AVG(orders.amount) as avg_order
FROM users
LEFT JOIN orders ON users.id = orders.userId
GROUP BY plan
`
// Predicate pushdown for efficient queries
import { P, filterDataFiles } from 'sqlake/analytics'
const predicate = P.and(
P.eq('plan', 'pro'),
P.gte('created_at', startDate)
)
// Filter Parquet files using zone maps
const relevantFiles = filterDataFiles(allFiles, predicate)

| Query Type | Consistency | Routing | Use Case |
|---|---|---|---|
| Single shard | Strong | Durable Object | User data, transactions |
| Analytics | Eventual | DuckDB on R2 | Reports, aggregations |
| Causal | Causal | R2 with bookmark wait | Read-your-writes |
// Strong consistency (automatic when shard key present)
const user = await sql`SELECT * FROM users WHERE id = ${id}`.first()
// Eventual consistency (explicit analytics mode)
const count = await sql.analytics`SELECT COUNT(*) FROM users`.first()
// Causal consistency (wait for your writes to replicate)
const bookmark: Bookmark = { shard: 'user-123', seq: 42n }
const orders = await sql.causal(bookmark)`SELECT * FROM orders`

Generate SQLite migrations and Parquet schema from your Drizzle schema.
sqlake generate

This command:
- Runs `drizzle-kit generate` for SQLite migrations
- Parses your schema files to extract table definitions
- Generates Parquet schema in `.sqlake/parquet-schema.json`
- Generates TypeScript CDC types in `.sqlake/types.ts`
- Updates the manifest in `.sqlake/manifest.json`
Push migrations and schema to R2 for Durable Object consumption.
sqlake push

This command:
- Uploads migration manifest to R2
- Updates Iceberg table metadata
- Handles schema evolution
Start a local development server with SQLite and CDC.
sqlake dev [options]
Options:
--port, -p <n> HTTP port (default: 4983)
--db, -d <path> SQLite path (default: .sqlake/local.db)
--verbose, -v Enable debug logging

The server provides:
- Local SQLite database
- CDC triggers for change tracking
- Parquet file output in `.sqlake/data/`
- HTTP query endpoint at `POST /query` (see the sketch below)
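For instance, you can hit the query endpoint from a script while `sqlake dev` is running. The request body format isn't documented here, so the sketch below assumes a JSON payload with `sql` and `params` fields (mirroring the `--params` option of `sqlake query`); adjust it to whatever the dev server actually expects.

```ts
// Hypothetical request shape against the local dev server (default port 4983).
const res = await fetch('http://localhost:4983/query', {
  method: 'POST',
  headers: { 'content-type': 'application/json' },
  body: JSON.stringify({
    sql: 'SELECT * FROM users WHERE id = ?',
    params: ['user-1'],
  }),
})
const rows = await res.json()
console.log(rows)
```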
Start Drizzle Studio with sqlake's analytics proxy.
sqlake studio [options]
Options:
--port, -p <n> Proxy port (default: 4984)
--db, -d <path> SQLite path (default: .sqlake/local.db)

Execute ad-hoc SQL queries.
sqlake query <sql> [options]
Options:
--db, -d <path> SQLite path (default: .sqlake/local.db)
--analytics, -a Use DuckDB instead of SQLite
--format, -f <fmt> Output format: json, table (default: json)
--params, -p <json> Query parameters as JSON array
Examples:
sqlake query "SELECT * FROM users"
sqlake query "SELECT * FROM users WHERE id = ?" --params '["user-1"]'
sqlake query "SELECT COUNT(*) FROM users" --analytics --format table

Apply or rollback migrations locally.
sqlake migrate [options]
Options:
--status, -s Show migration status
--dry-run, -n Preview without applying
--rollback, -r Rollback instead of forward migration
--target, -t <name> Migrate to specific version
--skip-checksum Skip checksum verification
Examples:
sqlake migrate Apply all pending
sqlake migrate --status Show status
sqlake migrate --dry-run Preview changes
sqlake migrate --rollback Rollback last migration
sqlake migrate --target 002_add_email Migrate to specific version

// Schema and configuration
export { SQLake } from 'sqlake'
export type { SQLakeConfig, SQLakeDatabase, ShardConfig, StorageMode } from 'sqlake'
// SQL query interface
export { sql, setQueryRouter, getQueryRouter } from 'sqlake'
// Column types (re-exported from Drizzle)
export { text, integer, real, blob } from 'sqlake'
// Query classes
export { SQLQuery, PipelinedQuery, MappedQuery } from 'sqlake'
export { LocalRouter, WorkerRouter, createInMemoryRouter } from 'sqlake'
export { parseSQL, determineRoute, computeShardId } from 'sqlake'
// Types
export type { CDCEvent, Bookmark, TableRef, InferSelect, InferInsert } from 'sqlake'
export type { QueryResult, RunResult, QueryOptions, QueryRouter } from 'sqlake'
// Observability
export { createObservability, ConsoleLogger, InMemoryMetrics } from 'sqlake'
export type { Logger, Metrics, Span } from 'sqlake'
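The examples in this README use the `sql` tag directly without showing how it is bound to a backend; the exports above include `setQueryRouter` and `createInMemoryRouter` for exactly that. A heavily hedged sketch of what wiring an in-memory router for tests might look like — the constructor arguments are not documented here, so the shapes below are assumptions:

```ts
import { SQLake, sql, setQueryRouter, createInMemoryRouter, text } from 'sqlake'

const db = SQLake({
  users: { id: text('id').primaryKey(), email: text('email').notNull() },
  shard: { users: 'id' },
})

// Assumption: createInMemoryRouter accepts the SQLake database definition and
// setQueryRouter installs the router that the global sql tag dispatches to.
setQueryRouter(createInMemoryRouter(db))

const user = await sql`SELECT * FROM users WHERE id = ${'user-1'}`.first()
```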
// Base class
export { SQLakeDO } from 'sqlake/do'
export type { SQLakeDOOptions, SQLakeWorkerEnv, CDCBufferRecord } from 'sqlake/do'
// Migrations
export { MigrationRunner, MigrationError, ChecksumMismatchError } from 'sqlake/do'
export { fetchMigrationManifest, createMigrationManifest, computeChecksum } from 'sqlake/do'
export type { Migration, MigrationManifest, MigrationResult } from 'sqlake/do'
// CDC flush
export { flushCDCToR2, flushCDCToR2WithIceberg, CDCFlushErrorTracker } from 'sqlake/do'
export type { CDCFlushOptions, CDCFlushResult, CDCFlushMetrics } from 'sqlake/do'

// DuckDB integration
export { DuckDBLoader, createDuckDBProvider, detectEnvironment } from 'sqlake/analytics'
export type { DuckDBLoaderConfig, DuckDBProvider } from 'sqlake/analytics'
// Predicate pushdown
export { P, PredicateBuilder, filterDataFiles, filterManifestEntries } from 'sqlake/analytics'
export type { Predicate, ColumnPredicate, PruningResult, ZoneMapStats } from 'sqlake/analytics'
// Time travel
export { TimeTravelQuery, resolveTimeTravel, resolveTimeRange } from 'sqlake/analytics'
export { SnapshotNotFoundError } from 'sqlake/analytics'
export type { TimeTravelOptions, TimeTravelResolution } from 'sqlake/analytics'
// Service binding (Workers RPC)
export { createDuckDBServiceClient, DuckDBServiceError } from 'sqlake/analytics'
export type { DuckDBService, DuckDBQueryRequest, DuckDBQueryResponse } from 'sqlake/analytics'

// Table metadata
export { createTableMetadata, addSnapshot, getCurrentSnapshot } from 'sqlake/iceberg'
export { getSnapshotById, getSnapshotAsOf, getSnapshotsInRange } from 'sqlake/iceberg'
export type { IcebergTableMetadata, IcebergSchema, IcebergSnapshot } from 'sqlake/iceberg'
// Manifest management
export { createManifest, createManifestEntry, createDataFile } from 'sqlake/iceberg'
export type { IcebergManifest, IcebergManifestEntry, IcebergDataFile } from 'sqlake/iceberg'
// Registry (high-level API)
export { IcebergRegistry, createIcebergRegistry } from 'sqlake/iceberg'
export type { RegisterDataFileOptions, RegisterDataFileResult } from 'sqlake/iceberg'
// Commit (atomic operations)
export { IcebergCommitter, createIcebergCommitter, CommitConflictError } from 'sqlake/iceberg'
export type { SnapshotCommitOptions, SnapshotCommitResult } from 'sqlake/iceberg'
// Schema evolution
export { SchemaEvolutionBuilder, evolveSchema, isTypePromotionAllowed } from 'sqlake/iceberg'
export type { SchemaEvolutionOperation, AddColumnOperation } from 'sqlake/iceberg'
// Statistics
export { CDCStatsToIcebergConverter, buildDataFileStats } from 'sqlake/iceberg'

// Drizzle column types
export { text, integer, real, blob, numeric } from 'sqlake/columns'
export { sqliteTable, primaryKey, foreignKey, unique, index, check } from 'sqlake/columns'

// schema.ts
import { SQLake, text, integer, real } from 'sqlake'
export const db = SQLake({
users: {
id: text('id').primaryKey(),
email: text('email').notNull(),
name: text('name'),
plan: text('plan').default('free'),
},
orders: {
id: text('id').primaryKey(),
userId: text('user_id').notNull(),
amount: real('amount').notNull(),
status: text('status').default('pending'),
createdAt: integer('created_at'),
},
shard: {
users: 'id',
orders: 'userId',
},
})

// user-do.ts
import { SQLakeDO } from 'sqlake/do'
import { db } from './schema'
export class UserDO extends SQLakeDO {
schema = db
async createOrder(orderId: string, amount: number) {
const userId = this.ctx.id.name
await this.sql`
INSERT INTO orders (id, user_id, amount, created_at)
VALUES (${orderId}, ${userId}, ${amount}, ${Date.now()})
`.run()
}
async getOrders() {
const userId = this.ctx.id.name
return this.sql`
SELECT * FROM orders
WHERE user_id = ${userId}
ORDER BY created_at DESC
`.all()
}
}

// analytics.ts
import { sql } from 'sqlake'
export async function getDashboardStats() {
// All these queries use eventual consistency (DuckDB)
const usersByPlan = await sql.analytics`
SELECT plan, COUNT(*) as count
FROM users
GROUP BY plan
`
const revenueByDay = await sql.analytics`
SELECT
DATE(created_at / 1000, 'unixepoch') as date,
SUM(amount) as revenue,
COUNT(*) as orders
FROM orders
WHERE status = 'completed'
GROUP BY date
ORDER BY date DESC
LIMIT 30
`
const topCustomers = await sql.analytics`
SELECT
users.id,
users.email,
COUNT(orders.id) as order_count,
SUM(orders.amount) as total_spent
FROM users
JOIN orders ON users.id = orders.user_id
WHERE orders.status = 'completed'
GROUP BY users.id
ORDER BY total_spent DESC
LIMIT 10
`
return { usersByPlan, revenueByDay, topCustomers }
}

import { sql } from 'sqlake'
// Using .map() for efficient dependent queries
const usersWithOrders = await sql.analytics`
SELECT * FROM users WHERE plan = 'pro'
`.map(async user => ({
...user,
recentOrders: await sql.analytics`
SELECT * FROM orders
WHERE user_id = ${user.id}
ORDER BY created_at DESC
LIMIT 5
`,
orderStats: await sql.analytics`
SELECT COUNT(*) as count, SUM(amount) as total
FROM orders
WHERE user_id = ${user.id}
`.first(),
}))

import { sql } from 'sqlake'
import { resolveTimeRange } from 'sqlake/analytics'
async function processNewEvents(lastProcessedTime: number) {
const now = Date.now()
// Get all changes since last processing
const changes = await sql.between(lastProcessedTime, now)`
SELECT * FROM events
WHERE _op IN ('c', 'u')
`
for (const event of changes) {
await processEvent(event)
}
return now // New watermark
}

- Runtime: Cloudflare Workers with Durable Objects
- Storage: Cloudflare R2 bucket
- CLI: Node.js 18+
- Dependencies: `drizzle-orm` (peer dependency); `hyparquet`/`hyparquet-writer` (included)
MIT
Stop building data pipelines. Start building products.
npm install sqlake