diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index dcbcac079a..885232dd12 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -591,6 +591,12 @@ const EnvironmentSchema = z RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS: z.coerce.number().int().optional(), RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS: z.coerce.number().int().optional(), + // TTL System settings for automatic run expiration + RUN_ENGINE_TTL_SYSTEM_DISABLED: BoolEnv.default(false), + RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT: z.coerce.number().int().optional(), + RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS: z.coerce.number().int().default(1_000), + RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE: z.coerce.number().int().default(100), + RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000), RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000), RUN_ENGINE_RUN_LOCK_MAX_RETRIES: z.coerce.number().int().default(10), diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index efba5fbdb0..b0dc1e8d0d 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -80,6 +80,12 @@ function createRunEngine() { scanJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS, processMarkedJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS, }, + ttlSystem: { + disabled: env.RUN_ENGINE_TTL_SYSTEM_DISABLED, + shardCount: env.RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT, + pollIntervalMs: env.RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS, + batchSize: env.RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE, + }, }, runLock: { redis: { diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 9e81c99132..08f0d7849f 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -182,6 +182,51 @@ export class RunEngine { 
processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs, dequeueBlockingTimeoutSeconds: options.queue?.dequeueBlockingTimeoutSeconds, meter: options.meter, + ttlSystem: options.queue?.ttlSystem?.disabled + ? undefined + : { + shardCount: options.queue?.ttlSystem?.shardCount, + pollIntervalMs: options.queue?.ttlSystem?.pollIntervalMs, + batchSize: options.queue?.ttlSystem?.batchSize, + callback: this.#ttlExpiredCallback.bind(this), + }, + // Run data provider for V3 optimized format - reads from PostgreSQL when no Redis message key exists + runDataProvider: { + getRunData: async (runId: string) => { + const run = await this.prisma.taskRun.findUnique({ + where: { id: runId }, + select: { + queue: true, + organizationId: true, + projectId: true, + runtimeEnvironmentId: true, + environmentType: true, + concurrencyKey: true, + attemptNumber: true, + queueTimestamp: true, + workerQueue: true, + taskIdentifier: true, + }, + }); + + if (!run || !run.organizationId || !run.environmentType) { + return undefined; + } + + return { + queue: run.queue, + orgId: run.organizationId, + projectId: run.projectId, + environmentId: run.runtimeEnvironmentId, + environmentType: run.environmentType, + concurrencyKey: run.concurrencyKey ?? undefined, + attempt: run.attemptNumber ?? 0, + timestamp: run.queueTimestamp?.getTime() ?? Date.now(), + workerQueue: run.workerQueue, + taskIdentifier: run.taskIdentifier, + }; + }, + }, }); this.worker = new Worker({ @@ -2066,6 +2111,35 @@ export class RunEngine { })); } + /** + * Callback for the TTL system when runs expire. + * Calls ttlSystem.expireRun() for each expired run to update database and emit events. 
+ */ + async #ttlExpiredCallback( + runs: Array<{ queueKey: string; runId: string; orgId: string }> + ): Promise { + // Process expired runs concurrently with limited parallelism + await pMap( + runs, + async (run) => { + try { + await this.ttlSystem.expireRun({ runId: run.runId }); + this.logger.debug("TTL system expired run", { + runId: run.runId, + orgId: run.orgId, + }); + } catch (error) { + this.logger.error("Failed to expire run via TTL system", { + runId: run.runId, + orgId: run.orgId, + error, + }); + } + }, + { concurrency: 10 } + ); + } + /** * Invalidates the billing cache for an organization when their plan changes * Runs in background and handles all errors internally diff --git a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts index 395e44727c..4726bdb736 100644 --- a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts @@ -4,6 +4,7 @@ import { TaskRun, TaskRunExecutionStatus, } from "@trigger.dev/database"; +import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic"; import { MinimalAuthenticatedEnvironment } from "../../shared/index.js"; import { ExecutionSnapshotSystem } from "./executionSnapshotSystem.js"; import { SystemResources } from "./systems.js"; @@ -81,6 +82,15 @@ export class EnqueueSystem { const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs; + // Calculate TTL expiration timestamp if the run has a TTL + let ttlExpiresAt: number | undefined; + if (run.ttl) { + const expireAt = parseNaturalLanguageDuration(run.ttl); + if (expireAt) { + ttlExpiresAt = expireAt.getTime(); + } + } + await this.$.runQueue.enqueueMessage({ env, workerQueue, @@ -95,6 +105,7 @@ export class EnqueueSystem { concurrencyKey: run.concurrencyKey ?? 
undefined, timestamp, attempt: 0, + ttlExpiresAt, }, }); diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index ee5176c2fa..a9e188f2a3 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -63,6 +63,17 @@ export type RunEngineOptions = { scanJitterInMs?: number; processMarkedJitterInMs?: number; }; + /** TTL system options for automatic run expiration */ + ttlSystem?: { + /** Number of shards for TTL sorted sets (default: same as queue shards) */ + shardCount?: number; + /** How often to poll each shard for expired runs (ms, default: 1000) */ + pollIntervalMs?: number; + /** Max number of runs to expire per poll per shard (default: 100) */ + batchSize?: number; + /** Whether TTL consumers are disabled (default: false) */ + disabled?: boolean; + }; }; runLock: { redis: RedisOptions; diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 5127ec3c75..43a9c92a61 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -39,10 +39,19 @@ import { InputPayload, OutputPayload, OutputPayloadV2, + RunDataProvider, RunQueueKeyProducer, RunQueueKeyProducerEnvironment, RunQueueSelectionStrategy, } from "./types.js"; +import { + encodeMessageKeyValue, + isV3MessageKeyValue, + encodeWorkerQueueEntry, + decodeWorkerQueueEntry, + isEncodedWorkerQueueEntry, + reconstructMessageFromWorkerEntry, +} from "./messageEncoding.js"; import { WorkerQueueResolver } from "./workerQueueResolver.js"; const SemanticAttributes = { @@ -92,6 +101,22 @@ export type RunQueueOptions = { processMarkedJitterInMs?: number; callback: ConcurrencySweeperCallback; }; + /** + * When enabled, uses an optimized message format that eliminates separate message keys. + * This reduces Redis storage by ~80% for pending messages. 
+ * + * Migration strategy: + * 1. Deploy with this disabled (default) - new code can read both formats + * 2. Enable this flag - new messages use optimized format + * 3. Old messages drain naturally as they're processed + */ + useOptimizedMessageFormat?: boolean; + /** + * Provider for fetching run data from PostgreSQL. + * Required when using V3 optimized format for ack/nack operations. + * Falls back to Redis message key if not provided (legacy behavior). + */ + runDataProvider?: RunDataProvider; }; export interface ConcurrencySweeperCallback { @@ -175,8 +200,12 @@ export class RunQueue { private _observableWorkerQueues: Set = new Set(); private _meter: Meter; private _queueCooloffStates: Map = new Map(); + private _useOptimizedMessageFormat: boolean; + private _runDataProvider?: RunDataProvider; constructor(public readonly options: RunQueueOptions) { + this._useOptimizedMessageFormat = options.useOptimizedMessageFormat ?? false; + this._runDataProvider = options.runDataProvider; this.shardCount = options.shardCount ?? 2; this.retryOptions = options.retryOptions ?? 
defaultRetrySettings; this.redis = createRedisClient(options.redis, { @@ -555,11 +584,49 @@ export class RunQueue { return this.redis.exists(this.keys.messageKey(orgId, messageId)); } - public async readMessage(orgId: string, messageId: string) { - return this.readMessageFromKey(this.keys.messageKey(orgId, messageId)); + public async readMessage(orgId: string, messageId: string): Promise { + // First try to read from Redis (handles both legacy JSON and V3 compact format) + const redisMessage = await this.readMessageFromKey( + this.keys.messageKey(orgId, messageId), + messageId + ); + if (redisMessage) { + return redisMessage; + } + + // Fall back to runDataProvider (for cases where message key doesn't exist) + if (this._runDataProvider) { + const runData = await this._runDataProvider.getRunData(messageId); + if (runData) { + // Convert RunData to OutputPayloadV2 + const queueKey = this.keys.queueKey( + runData.orgId, + runData.projectId, + runData.environmentId, + runData.taskIdentifier, + runData.concurrencyKey + ); + return { + version: "2" as const, + runId: messageId, + taskIdentifier: runData.taskIdentifier, + orgId: runData.orgId, + projectId: runData.projectId, + environmentId: runData.environmentId, + environmentType: runData.environmentType, + queue: queueKey, + concurrencyKey: runData.concurrencyKey, + timestamp: runData.timestamp, + attempt: runData.attempt, + workerQueue: runData.workerQueue, + }; + } + } + + return undefined; } - public async readMessageFromKey(messageKey: string) { + public async readMessageFromKey(messageKey: string, runId?: string) { return this.#trace( "readMessageFromKey", async (span) => { @@ -569,7 +636,10 @@ export class RunQueue { return; } - const [error, message] = parseRawMessage(rawMessage); + const [error, message] = parseRawMessage(rawMessage, { + keys: this.keys, + runId, + }); if (error) { this.logger.error(`[${this.name}] Failed to parse message`, { @@ -1434,14 +1504,31 @@ export class RunQueue { 
this.#getWorkerQueueFromMessage(message.message) ); - const messageKeyValue = this.keys.messageKey(message.message.orgId, message.messageId); + let workerQueueEntry: string; + + if (this._useOptimizedMessageFormat) { + // V3 format: encode all needed data in worker queue entry + // This allows full reconstruction without a message key lookup + workerQueueEntry = encodeWorkerQueueEntry({ + runId: message.messageId, + workerQueue: this.#getWorkerQueueFromMessage(message.message), + attempt: message.message.attempt, + environmentType: message.message.environmentType, + queueKey: message.message.queue, + timestamp: message.message.timestamp, + }); + } else { + // Legacy format: store message key path + workerQueueEntry = this.keys.messageKey(message.message.orgId, message.messageId); + } operations.push({ workerQueueKey: workerQueueKey, messageId: message.messageId, + format: this._useOptimizedMessageFormat ? "v3" : "legacy", }); - pipeline.rpush(workerQueueKey, messageKeyValue); + pipeline.rpush(workerQueueKey, workerQueueEntry); } span.setAttribute("operations_count", operations.length); @@ -1470,39 +1557,79 @@ export class RunQueue { const queueName = message.queue; const messageId = message.runId; - const messageData = JSON.stringify(message); const messageScore = String(message.timestamp); - this.logger.debug("Calling enqueueMessage", { - queueKey, - messageKey, - queueCurrentConcurrencyKey, - envCurrentConcurrencyKey, - queueCurrentDequeuedKey, - envCurrentDequeuedKey, - envQueueKey, - queueName, - messageId, - messageData, - messageScore, - masterQueueKey, - service: this.name, - }); + if (this._useOptimizedMessageFormat) { + // V3 optimized format: compact message key, runId in sorted set + const messageData = encodeMessageKeyValue({ + queue: message.queue, + timestamp: message.timestamp, + attempt: message.attempt, + environmentType: message.environmentType, + workerQueue: message.workerQueue, + }); - await this.redis.enqueueMessage( - masterQueueKey, - 
queueKey, - messageKey, - queueCurrentConcurrencyKey, - envCurrentConcurrencyKey, - queueCurrentDequeuedKey, - envCurrentDequeuedKey, - envQueueKey, - queueName, - messageId, - messageData, - messageScore - ); + this.logger.debug("Calling enqueueMessageV3 (optimized)", { + queueKey, + messageKey, + messageData, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + envQueueKey, + messageScore, + masterQueueKey, + service: this.name, + }); + + await this.redis.enqueueMessageV3( + masterQueueKey, + queueKey, + messageKey, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + queueName, + messageId, + messageData, + messageScore + ); + } else { + // Legacy format: store full JSON in separate message key + const messageData = JSON.stringify(message); + + this.logger.debug("Calling enqueueMessage (legacy)", { + queueKey, + messageKey, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + queueName, + messageId, + messageData, + messageScore, + masterQueueKey, + service: this.name, + }); + + await this.redis.enqueueMessage( + masterQueueKey, + queueKey, + messageKey, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + queueName, + messageId, + messageData, + messageScore + ); + } } async #callDequeueMessagesFromQueue({ @@ -1583,25 +1710,29 @@ export class RunQueue { const messageScore = result[i + 1]; const rawMessage = result[i + 2]; - //read message - const parsedMessage = OutputPayload.safeParse(JSON.parse(rawMessage)); - if (!parsedMessage.success) { - this.logger.error(`[${this.name}] Failed to parse message`, { + // Parse message - handles both JSON (legacy) and V3 compact format + const [error, message] = parseRawMessage(rawMessage, { + keys: this.keys, + runId: messageId, + }); + + if (error) { + this.logger.error(`[${this.name}] Failed to 
parse dequeued message`, { messageId, - error: parsedMessage.error, + error, + rawMessage, service: this.name, }); - continue; } - const message = parsedMessage.data; - - messages.push({ - messageId, - messageScore, - message, - }); + if (message) { + messages.push({ + messageId, + messageScore, + message, + }); + } } this.logger.debug("dequeueMessagesFromQueue parsed result", { @@ -1873,37 +2004,79 @@ export class RunQueue { const nextRetryDelay = calculateNextRetryDelay(this.retryOptions, message.attempt); const messageScore = retryAt ?? (nextRetryDelay ? Date.now() + nextRetryDelay : Date.now()); - this.logger.debug("Calling nackMessage", { - messageKey, - messageQueue, - masterQueueKey, - queueCurrentConcurrencyKey, - envCurrentConcurrencyKey, - queueCurrentDequeuedKey, - envCurrentDequeuedKey, - envQueueKey, - messageId, - messageScore, - attempt: message.attempt, - service: this.name, - }); + if (this._useOptimizedMessageFormat) { + // V3 format: compact message key, runId in sorted set + const messageData = encodeMessageKeyValue({ + queue: message.queue, + timestamp: message.timestamp, + attempt: message.attempt, + environmentType: message.environmentType, + workerQueue: this.#getWorkerQueueFromMessage(message), + }); - await this.redis.nackMessage( - //keys - masterQueueKey, - messageKey, - messageQueue, - queueCurrentConcurrencyKey, - envCurrentConcurrencyKey, - queueCurrentDequeuedKey, - envCurrentDequeuedKey, - envQueueKey, - //args - messageId, - messageQueue, - JSON.stringify(message), - String(messageScore) - ); + this.logger.debug("Calling nackMessageV3 (optimized)", { + messageKey, + messageData, + messageQueue, + masterQueueKey, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + messageId, + messageScore, + attempt: message.attempt, + service: this.name, + }); + + await this.redis.nackMessageV3( + //keys + masterQueueKey, + messageKey, + messageQueue, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + 
envCurrentDequeuedKey, + envQueueKey, + //args + messageId, + messageQueue, + messageData, + String(messageScore) + ); + } else { + // Legacy format + this.logger.debug("Calling nackMessage (legacy)", { + messageKey, + messageQueue, + masterQueueKey, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + messageId, + messageScore, + attempt: message.attempt, + service: this.name, + }); + + await this.redis.nackMessage( + //keys + masterQueueKey, + messageKey, + messageQueue, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + //args + messageId, + messageQueue, + JSON.stringify(message), + String(messageScore) + ); + } } async #callMoveToDeadLetterQueue({ message }: { message: OutputPayload }) { @@ -2212,12 +2385,43 @@ export class RunQueue { }); } - async #dequeueMessageFromKey(messageKey: string) { + async #dequeueMessageFromKey(workerQueueEntry: string) { return this.#trace("dequeueMessageFromKey", async (span) => { span.setAttributes({ - messageKey, + workerQueueEntry, }); + // Check if this is a V3 encoded worker queue entry + if (isEncodedWorkerQueueEntry(workerQueueEntry)) { + // V3 format: decode the entry directly, no Redis lookup needed + const decoded = decodeWorkerQueueEntry(workerQueueEntry); + if (!decoded) { + this.logger.error(`[${this.name}] Failed to decode V3 worker queue entry`, { + workerQueueEntry, + service: this.name, + }); + span.setAttribute("result", "DECODE_ERROR"); + return; + } + + const descriptor = this.keys.descriptorFromQueue(decoded.queueKey); + const message = reconstructMessageFromWorkerEntry(decoded, descriptor); + + // Update the currentDequeued sets (this is done in the Lua script for legacy) + const queueCurrentDequeuedKey = this.keys.queueCurrentDequeuedKeyFromQueue(decoded.queueKey); + const envCurrentDequeuedKey = this.keys.envCurrentDequeuedKeyFromQueue(decoded.queueKey); + 
await this.redis.sadd(queueCurrentDequeuedKey, message.runId); + await this.redis.sadd(envCurrentDequeuedKey, message.runId); + + span.setAttribute("result", "SUCCESS"); + span.setAttribute("messageId", message.runId); + span.setAttribute("format", "v3"); + + return message; + } + + // Legacy format: workerQueueEntry is the message key path + const messageKey = workerQueueEntry; const rawMessage = await this.redis.dequeueMessageFromKey( messageKey, this.options.redis.keyPrefix ?? "" @@ -2225,11 +2429,16 @@ export class RunQueue { if (!rawMessage) { span.setAttribute("result", "NO_MESSAGE"); - return; } - const [error, message] = parseRawMessage(rawMessage); + // Extract runId from message key path (format: {org:orgId}:message:{runId}) + const runIdFromKey = messageKey.split(":message:").pop(); + + const [error, message] = parseRawMessage(rawMessage, { + keys: this.keys, + runId: runIdFromKey, + }); if (error) { this.logger.error(`[${this.name}] Failed to parse message`, { @@ -2243,6 +2452,7 @@ export class RunQueue { if (message) { span.setAttribute("result", "SUCCESS"); span.setAttribute("messageId", message.runId); + span.setAttribute("format", "legacy"); } else { span.setAttribute("result", "NO_MESSAGE"); } @@ -2318,6 +2528,50 @@ redis.call('SREM', envCurrentDequeuedKey, messageId) `, }); + // V3 optimized enqueue: compact message key format, runId in sorted set + this.redis.defineCommand("enqueueMessageV3", { + numberOfKeys: 8, + lua: ` +local masterQueueKey = KEYS[1] +local queueKey = KEYS[2] +local messageKey = KEYS[3] +local queueCurrentConcurrencyKey = KEYS[4] +local envCurrentConcurrencyKey = KEYS[5] +local queueCurrentDequeuedKey = KEYS[6] +local envCurrentDequeuedKey = KEYS[7] +local envQueueKey = KEYS[8] + +local queueName = ARGV[1] +local messageId = ARGV[2] +local messageData = ARGV[3] +local messageScore = ARGV[4] + +-- Write the compact V3 message data to the message key +redis.call('SET', messageKey, messageData) + +-- Add the messageId (runId) to 
the queue - simple and reliable +redis.call('ZADD', queueKey, messageScore, messageId) + +-- Add the messageId to the env queue (for counting) +redis.call('ZADD', envQueueKey, messageScore, messageId) + +-- Rebalance the parent queues +local earliestMessage = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') + +if #earliestMessage == 0 then + redis.call('ZREM', masterQueueKey, queueName) +else + redis.call('ZADD', masterQueueKey, earliestMessage[2], queueName) +end + +-- Update the concurrency keys (clear any existing entries for this run) +redis.call('SREM', queueCurrentConcurrencyKey, messageId) +redis.call('SREM', envCurrentConcurrencyKey, messageId) +redis.call('SREM', queueCurrentDequeuedKey, messageId) +redis.call('SREM', envCurrentDequeuedKey, messageId) + `, + }); + this.redis.defineCommand("dequeueMessagesFromQueue", { numberOfKeys: 9, lua: ` @@ -2377,27 +2631,29 @@ end local results = {} local dequeuedCount = 0 --- Process messages in pairs (messageId, score) +-- Process messages in pairs (member, score) +-- Member is always runId (both legacy and V3 formats use runId in sorted set) for i = 1, #messages, 2 do - local messageId = messages[i] + local runId = messages[i] local messageScore = tonumber(messages[i + 1]) - - -- Get the message payload - local messageKey = messageKeyPrefix .. messageId + + -- Fetch message data from message key + local messageKey = messageKeyPrefix .. 
runId local messagePayload = redis.call('GET', messageKey) - + if messagePayload then - -- Update concurrency - redis.call('ZREM', queueKey, messageId) - redis.call('ZREM', envQueueKey, messageId) - redis.call('SADD', queueCurrentConcurrencyKey, messageId) - redis.call('SADD', envCurrentConcurrencyKey, messageId) - - -- Add to results - table.insert(results, messageId) + -- Remove from queues and update concurrency + redis.call('ZREM', queueKey, runId) + redis.call('ZREM', envQueueKey, runId) + redis.call('SADD', queueCurrentConcurrencyKey, runId) + redis.call('SADD', envCurrentConcurrencyKey, runId) + + -- Add to results: [runId, score, payload, ...] + -- Payload can be JSON (legacy) or V3 compact format + table.insert(results, runId) table.insert(results, messageScore) table.insert(results, messagePayload) - + dequeuedCount = dequeuedCount + 1 end end @@ -2411,7 +2667,8 @@ else redis.call('ZADD', masterQueueKey, earliestMessage[2], queueName) end --- Return results as a flat array: [messageId1, messageScore1, messagePayload1, messageId2, messageScore2, messagePayload2, ...] +-- Return results as a flat array: [runId1, messageScore1, messagePayload1, runId2, messageScore2, messagePayload2, ...] 
+-- messagePayload can be JSON (legacy) or V3 compact format - TypeScript handles parsing return results `, }); @@ -2548,6 +2805,49 @@ redis.call('SREM', envCurrentDequeuedKey, messageId) redis.call('ZADD', messageQueueKey, messageScore, messageId) redis.call('ZADD', envQueueKey, messageScore, messageId) +-- Rebalance the parent queues +local earliestMessage = redis.call('ZRANGE', messageQueueKey, 0, 0, 'WITHSCORES') +if #earliestMessage == 0 then + redis.call('ZREM', masterQueueKey, messageQueueName) +else + redis.call('ZADD', masterQueueKey, earliestMessage[2], messageQueueName) +end +`, + }); + + // V3 optimized nack: compact message key format, runId in sorted set + this.redis.defineCommand("nackMessageV3", { + numberOfKeys: 8, + lua: ` +-- Keys: +local masterQueueKey = KEYS[1] +local messageKey = KEYS[2] +local messageQueueKey = KEYS[3] +local queueCurrentConcurrencyKey = KEYS[4] +local envCurrentConcurrencyKey = KEYS[5] +local queueCurrentDequeuedKey = KEYS[6] +local envCurrentDequeuedKey = KEYS[7] +local envQueueKey = KEYS[8] + +-- Args: +local messageId = ARGV[1] +local messageQueueName = ARGV[2] +local messageData = ARGV[3] +local messageScore = tonumber(ARGV[4]) + +-- Write the compact V3 message data to the message key +redis.call('SET', messageKey, messageData) + +-- Update the concurrency keys +redis.call('SREM', queueCurrentConcurrencyKey, messageId) +redis.call('SREM', envCurrentConcurrencyKey, messageId) +redis.call('SREM', queueCurrentDequeuedKey, messageId) +redis.call('SREM', envCurrentDequeuedKey, messageId) + +-- Enqueue the messageId (runId) into the queue - simple and reliable +redis.call('ZADD', messageQueueKey, messageScore, messageId) +redis.call('ZADD', envQueueKey, messageScore, messageId) + -- Rebalance the parent queues local earliestMessage = redis.call('ZRANGE', messageQueueKey, 0, 0, 'WITHSCORES') if #earliestMessage == 0 then @@ -2878,12 +3178,87 @@ declare module "@internal/redis" { maxCount: string, callback?: Callback ): Result; 
+ + // V3 optimized commands (compact message key format) + enqueueMessageV3( + //keys + masterQueueKey: string, + queue: string, + messageKey: string, + queueCurrentConcurrencyKey: string, + envCurrentConcurrencyKey: string, + queueCurrentDequeuedKey: string, + envCurrentDequeuedKey: string, + envQueueKey: string, + //args + queueName: string, + messageId: string, + messageData: string, + messageScore: string, + callback?: Callback + ): Result; + + nackMessageV3( + // keys + masterQueueKey: string, + messageKey: string, + messageQueue: string, + queueCurrentConcurrencyKey: string, + envCurrentConcurrencyKey: string, + queueCurrentDequeuedKey: string, + envCurrentDequeuedKey: string, + envQueueKey: string, + // args + messageId: string, + messageQueueName: string, + messageData: string, + messageScore: string, + callback?: Callback + ): Result; } } type ParseRawMessageResult = [Error | null, OutputPayload | null]; -function parseRawMessage(rawMessage: string): ParseRawMessageResult { +function parseRawMessage( + rawMessage: string, + options?: { + keys?: RunQueueKeyProducer; + runId?: string; + } +): ParseRawMessageResult { + // Check for V3 compact format (starts with "v3:" prefix) + if (isV3MessageKeyValue(rawMessage)) { + const decoded = decodeMessageKeyValue(rawMessage); + if (!decoded) { + return [new Error("Failed to decode V3 message format"), undefined]; + } + + // Extract additional fields from the queue key + if (!options?.keys) { + return [new Error("Keys required to parse V3 message format"), undefined]; + } + const descriptor = options.keys.descriptorFromQueue(decoded.queue); + + const message: OutputPayloadV2 = { + version: "2", + runId: options.runId ?? 
"", // Filled in by caller + taskIdentifier: descriptor.queue, + orgId: descriptor.orgId, + projectId: descriptor.projectId, + environmentId: descriptor.envId, + environmentType: decoded.environmentType, + queue: decoded.queue, + concurrencyKey: descriptor.concurrencyKey, + timestamp: decoded.timestamp, + attempt: decoded.attempt, + workerQueue: decoded.workerQueue, + }; + + return [null, message]; + } + + // Legacy JSON format const deserializedMessage = safeJsonParse(rawMessage); const message = OutputPayload.safeParse(deserializedMessage); diff --git a/internal-packages/run-engine/src/run-queue/messageEncoding.ts b/internal-packages/run-engine/src/run-queue/messageEncoding.ts new file mode 100644 index 0000000000..064dfc386d --- /dev/null +++ b/internal-packages/run-engine/src/run-queue/messageEncoding.ts @@ -0,0 +1,358 @@ +import { RuntimeEnvironmentType } from "@trigger.dev/database"; +import type { OutputPayload, OutputPayloadV2, QueueDescriptor } from "./types.js"; + +/** + * Message encoding for optimized Redis storage. + * + * This module provides encoding/decoding for the "v3" message format that eliminates + * the need for separate message keys in Redis, reducing storage by ~80% for pending messages. 
/**
 *
 * ## Migration Strategy
 * - New messages are written in v3 format (no message key created)
 * - Old messages (v1/v2) continue to work via message key lookup
 * - Detection is automatic based on format
 * - Old messages drain naturally as they're processed
 * - No backfill required
 *
 * ## Format Detection
 * - Sorted set member with DELIMITER = v3 format
 * - Sorted set member without DELIMITER = legacy format (needs message key lookup)
 * - Worker queue entry starting with "{org:" = legacy format
 * - Worker queue entry with DELIMITER = v3 format
 */

// ASCII Record Separator - won't appear in IDs, queue names, or other fields.
// The encoders below do not escape it, so a field containing this character
// would produce a value that no longer decodes.
const DELIMITER = "\x1e";

// Environment type single-char encoding for compact storage
const ENV_TYPE_TO_CHAR: Record<RuntimeEnvironmentType, string> = {
  DEVELOPMENT: "D",
  STAGING: "S",
  PREVIEW: "V",
  PRODUCTION: "P",
};

// Reverse of ENV_TYPE_TO_CHAR. Always read through envTypeFromChar() so that
// inherited object keys ("constructor", "toString", ...) are never mistaken
// for an environment type.
const CHAR_TO_ENV_TYPE: Record<string, RuntimeEnvironmentType> = {
  D: "DEVELOPMENT",
  S: "STAGING",
  V: "PREVIEW",
  P: "PRODUCTION",
};

/**
 * Map a single-char environment code back to its environment type.
 * Returns undefined for anything that is not an own key of CHAR_TO_ENV_TYPE.
 */
function envTypeFromChar(envChar: string): RuntimeEnvironmentType | undefined {
  return Object.prototype.hasOwnProperty.call(CHAR_TO_ENV_TYPE, envChar)
    ? CHAR_TO_ENV_TYPE[envChar]
    : undefined;
}

/**
 * Parse a base-10 integer field of an encoded value.
 * Returns undefined (instead of NaN) when the text does not start with a number,
 * so decoders can reject the whole payload rather than hand NaN to callers.
 */
function parseIntField(raw: string): number | undefined {
  const parsed = parseInt(raw, 10);
  return Number.isNaN(parsed) ? undefined : parsed;
}

/**
 * Data encoded in V3 message key value.
 * Uses a compact DELIMITER-separated format instead of JSON.
 * Fields that can be derived from the queue key are excluded.
 */
export interface EncodedMessageKeyData {
  /** Full queue key - needed for queue operations */
  queue: string;
  /** Unix timestamp for scoring */
  timestamp: number;
  /** Attempt number for retry logic */
  attempt: number;
  /** Environment type (single char encoded) */
  environmentType: RuntimeEnvironmentType;
  /** Worker queue name for routing */
  workerQueue: string;
}

/**
 * @deprecated V3 no longer encodes in sorted set member. Use runId directly.
 * Kept for backwards compatibility during migration.
 */
export interface EncodedQueueMember {
  runId: string;
  workerQueue: string;
  attempt: number;
  environmentType: RuntimeEnvironmentType;
}

/**
 * Data encoded in worker queue entry for v3 format.
 * Includes queue key and timestamp to fully reconstruct the message.
 */
export interface EncodedWorkerQueueEntry extends EncodedQueueMember {
  queueKey: string;
  timestamp: number;
}

// V3 message key prefix to distinguish from legacy JSON
const V3_MESSAGE_PREFIX = "v3:";

/**
 * Encode data for V3 message key value.
 * Format: v3:queue␞timestamp␞attempt␞envTypeChar␞workerQueue
 *
 * This is ~60-100 bytes vs ~400-600+ bytes for JSON.
 *
 * @param data - Fields to encode; none of them may contain DELIMITER.
 * @returns The prefixed, DELIMITER-joined string.
 */
export function encodeMessageKeyValue(data: EncodedMessageKeyData): string {
  const envChar = ENV_TYPE_TO_CHAR[data.environmentType];
  return (
    V3_MESSAGE_PREFIX +
    [data.queue, data.timestamp.toString(), data.attempt.toString(), envChar, data.workerQueue].join(
      DELIMITER
    )
  );
}

/**
 * Decode V3 message key value.
 *
 * @returns The decoded data, or undefined when the value lacks the v3 prefix,
 * does not have exactly five fields, carries an unknown environment char, or
 * has a non-numeric timestamp/attempt.
 */
export function decodeMessageKeyValue(value: string): EncodedMessageKeyData | undefined {
  if (!value.startsWith(V3_MESSAGE_PREFIX)) {
    return undefined;
  }

  const parts = value.slice(V3_MESSAGE_PREFIX.length).split(DELIMITER);
  if (parts.length !== 5) {
    return undefined;
  }

  const [queue, timestampStr, attemptStr, envChar, workerQueue] = parts;
  const environmentType = envTypeFromChar(envChar);
  const timestamp = parseIntField(timestampStr);
  const attempt = parseIntField(attemptStr);

  if (!environmentType || timestamp === undefined || attempt === undefined) {
    return undefined;
  }

  return { queue, timestamp, attempt, environmentType, workerQueue };
}

/**
 * Check if a message key value is V3 format (starts with v3: prefix).
 */
export function isV3MessageKeyValue(value: string): boolean {
  return value.startsWith(V3_MESSAGE_PREFIX);
}

/**
 * Check if a sorted set member is in v3 encoded format.
 * @deprecated V3 no longer encodes in sorted set. Members are just runIds.
 */
export function isEncodedQueueMember(member: string): boolean {
  return member.includes(DELIMITER);
}

/**
 * Check if a worker queue entry is in v3 encoded format.
 * Legacy format starts with "{org:" (message key path).
 */
export function isEncodedWorkerQueueEntry(entry: string): boolean {
  return !entry.startsWith("{org:") && entry.includes(DELIMITER);
}

/**
 * Encode message data for sorted set member.
 * Format: runId␞workerQueue␞attempt␞envTypeChar
 */
export function encodeQueueMember(data: EncodedQueueMember): string {
  const envChar = ENV_TYPE_TO_CHAR[data.environmentType];
  return [data.runId, data.workerQueue, data.attempt.toString(), envChar].join(DELIMITER);
}

/**
 * Decode sorted set member to message data.
 *
 * @returns The decoded member, or undefined when the member has no DELIMITER
 * (legacy runId), does not have exactly four fields, carries an unknown
 * environment char, or has a non-numeric attempt.
 */
export function decodeQueueMember(member: string): EncodedQueueMember | undefined {
  if (!isEncodedQueueMember(member)) {
    return undefined;
  }

  const parts = member.split(DELIMITER);
  if (parts.length !== 4) {
    return undefined;
  }

  const [runId, workerQueue, attemptStr, envChar] = parts;
  const environmentType = envTypeFromChar(envChar);
  const attempt = parseIntField(attemptStr);

  if (!environmentType || attempt === undefined) {
    return undefined;
  }

  return { runId, workerQueue, attempt, environmentType };
}

/**
 * Encode message data for worker queue entry.
 * Format: runId␞workerQueue␞attempt␞envTypeChar␞queueKey␞timestamp
 */
export function encodeWorkerQueueEntry(data: EncodedWorkerQueueEntry): string {
  const envChar = ENV_TYPE_TO_CHAR[data.environmentType];
  return [
    data.runId,
    data.workerQueue,
    data.attempt.toString(),
    envChar,
    data.queueKey,
    data.timestamp.toString(),
  ].join(DELIMITER);
}

/**
 * Decode worker queue entry to message data.
 *
 * @returns The decoded entry, or undefined when the entry is a legacy message
 * key path, has no DELIMITER, does not have exactly six fields, carries an
 * unknown environment char, or has a non-numeric attempt/timestamp.
 */
export function decodeWorkerQueueEntry(entry: string): EncodedWorkerQueueEntry | undefined {
  if (!isEncodedWorkerQueueEntry(entry)) {
    return undefined;
  }

  const parts = entry.split(DELIMITER);
  if (parts.length !== 6) {
    return undefined;
  }

  const [runId, workerQueue, attemptStr, envChar, queueKey, timestampStr] = parts;
  const environmentType = envTypeFromChar(envChar);
  const attempt = parseIntField(attemptStr);
  const timestamp = parseIntField(timestampStr);

  if (!environmentType || attempt === undefined || timestamp === undefined) {
    return undefined;
  }

  return { runId, workerQueue, attempt, environmentType, queueKey, timestamp };
}

/**
 * Reconstruct full OutputPayloadV2 from encoded worker queue entry and queue descriptor.
 *
 * Org, project, environment and concurrency key come from the descriptor;
 * run id, attempt, timestamp, environment type, worker queue and the full queue
 * key come from the entry.
 *
 * NOTE(review): taskIdentifier is filled from descriptor.queue, i.e. the queue
 * name is assumed to equal the task identifier - confirm this holds for runs
 * placed on custom-named queues.
 */
export function reconstructMessageFromWorkerEntry(
  entry: EncodedWorkerQueueEntry,
  descriptor: QueueDescriptor
): OutputPayloadV2 {
  return {
    version: "2",
    runId: entry.runId,
    taskIdentifier: descriptor.queue,
    orgId: descriptor.orgId,
    projectId: descriptor.projectId,
    environmentId: descriptor.envId,
    environmentType: entry.environmentType,
    queue: entry.queueKey,
    concurrencyKey: descriptor.concurrencyKey,
    timestamp: entry.timestamp,
    attempt: entry.attempt,
    workerQueue: entry.workerQueue,
  };
}

/**
 * Extract runId from either v3 encoded member or legacy member.
 * Legacy members are just the runId itself; a member that contains DELIMITER
 * but does not decode as a valid v3 member is also returned unchanged.
 */
export function getRunIdFromMember(member: string): string {
  return decodeQueueMember(member)?.runId ?? member;
}

/**
 * Lua helper functions to be included in Redis scripts.
 * These handle format detection and parsing within Lua.
 *
 * NOTE(review): the script text that follows declares its delimiter with the
 * Lua string literal "\x1e". Lua 5.1 (the version Redis embeds) has no \x hex
 * escape and reads that literal as the three characters x, 1, e - consider
 * string.char(30) or the decimal escape "\030" there. Its gmatch-based split
 * also skips empty fields, unlike String.prototype.split used above.
 */
+ */ +export const LUA_ENCODING_HELPERS = ` +-- Delimiter for v3 encoded format (ASCII Record Separator) +local DELIMITER = "\\x1e" + +-- Check if a string is v3 encoded (contains delimiter) +local function isV3Encoded(str) + return string.find(str, DELIMITER, 1, true) ~= nil +end + +-- Check if worker queue entry is legacy format (starts with {org:) +local function isLegacyWorkerEntry(entry) + return string.sub(entry, 1, 5) == "{org:" +end + +-- Encode queue member: runId, workerQueue, attempt, envTypeChar +local function encodeQueueMember(runId, workerQueue, attempt, envTypeChar) + return runId .. DELIMITER .. workerQueue .. DELIMITER .. tostring(attempt) .. DELIMITER .. envTypeChar +end + +-- Decode queue member, returns: runId, workerQueue, attempt, envTypeChar (or nil if not v3) +local function decodeQueueMember(member) + if not isV3Encoded(member) then + return nil + end + local parts = {} + for part in string.gmatch(member, "([^" .. DELIMITER .. "]+)") do + table.insert(parts, part) + end + if #parts ~= 4 then + return nil + end + return parts[1], parts[2], tonumber(parts[3]), parts[4] +end + +-- Encode worker queue entry: runId, workerQueue, attempt, envTypeChar, queueKey, timestamp +local function encodeWorkerEntry(runId, workerQueue, attempt, envTypeChar, queueKey, timestamp) + return runId .. DELIMITER .. workerQueue .. DELIMITER .. tostring(attempt) .. DELIMITER .. envTypeChar .. DELIMITER .. queueKey .. DELIMITER .. tostring(timestamp) +end + +-- Decode worker queue entry, returns: runId, workerQueue, attempt, envTypeChar, queueKey, timestamp (or nil if not v3) +local function decodeWorkerEntry(entry) + if isLegacyWorkerEntry(entry) then + return nil + end + if not isV3Encoded(entry) then + return nil + end + local parts = {} + for part in string.gmatch(entry, "([^" .. DELIMITER .. 
"]+)") do + table.insert(parts, part) + end + if #parts ~= 6 then + return nil + end + return parts[1], parts[2], tonumber(parts[3]), parts[4], parts[5], tonumber(parts[6]) +end + +-- Get runId from member (works for both v3 and legacy) +local function getRunIdFromMember(member) + if isV3Encoded(member) then + local runId = decodeQueueMember(member) + return runId or member + end + return member +end + +-- Environment type char mappings +local envTypeToChar = { + DEVELOPMENT = "D", + STAGING = "S", + PREVIEW = "V", + PRODUCTION = "P" +} + +local charToEnvType = { + D = "DEVELOPMENT", + S = "STAGING", + V = "PREVIEW", + P = "PRODUCTION" +} +`; diff --git a/internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts b/internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts new file mode 100644 index 0000000000..e3135af3b8 --- /dev/null +++ b/internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts @@ -0,0 +1,368 @@ +import { describe, expect, test } from "vitest"; +import { + encodeQueueMember, + decodeQueueMember, + encodeWorkerQueueEntry, + decodeWorkerQueueEntry, + isEncodedQueueMember, + isEncodedWorkerQueueEntry, + getRunIdFromMember, + reconstructMessageFromWorkerEntry, + encodeMessageKeyValue, + decodeMessageKeyValue, + isV3MessageKeyValue, +} from "../messageEncoding.js"; + +describe("messageEncoding", () => { + describe("isEncodedQueueMember", () => { + test("returns true for v3 encoded member", () => { + const encoded = encodeQueueMember({ + runId: "run_abc123", + workerQueue: "env_xyz", + attempt: 0, + environmentType: "PRODUCTION", + }); + expect(isEncodedQueueMember(encoded)).toBe(true); + }); + + test("returns false for legacy member (just runId)", () => { + expect(isEncodedQueueMember("run_abc123")).toBe(false); + }); + }); + + describe("isEncodedWorkerQueueEntry", () => { + test("returns true for v3 encoded entry", () => { + const encoded = encodeWorkerQueueEntry({ + runId: "run_abc123", + workerQueue: 
"env_xyz", + attempt: 0, + environmentType: "PRODUCTION", + queueKey: "{org:o123}:proj:p123:env:e123:queue:my-task", + timestamp: 1706812800000, + }); + expect(isEncodedWorkerQueueEntry(encoded)).toBe(true); + }); + + test("returns false for legacy message key path", () => { + expect(isEncodedWorkerQueueEntry("{org:o123}:message:run_abc123")).toBe(false); + }); + }); + + describe("encodeQueueMember / decodeQueueMember", () => { + test("roundtrips correctly for PRODUCTION", () => { + const original = { + runId: "run_abc123xyz", + workerQueue: "env_def456", + attempt: 0, + environmentType: "PRODUCTION" as const, + }; + const encoded = encodeQueueMember(original); + const decoded = decodeQueueMember(encoded); + + expect(decoded).toEqual(original); + }); + + test("roundtrips correctly for DEVELOPMENT", () => { + const original = { + runId: "run_test", + workerQueue: "env_dev", + attempt: 3, + environmentType: "DEVELOPMENT" as const, + }; + const encoded = encodeQueueMember(original); + const decoded = decodeQueueMember(encoded); + + expect(decoded).toEqual(original); + }); + + test("roundtrips correctly for STAGING", () => { + const original = { + runId: "run_staging", + workerQueue: "env_stage", + attempt: 1, + environmentType: "STAGING" as const, + }; + const encoded = encodeQueueMember(original); + const decoded = decodeQueueMember(encoded); + + expect(decoded).toEqual(original); + }); + + test("roundtrips correctly for PREVIEW", () => { + const original = { + runId: "run_preview", + workerQueue: "env_preview", + attempt: 5, + environmentType: "PREVIEW" as const, + }; + const encoded = encodeQueueMember(original); + const decoded = decodeQueueMember(encoded); + + expect(decoded).toEqual(original); + }); + + test("decodeQueueMember returns undefined for legacy format", () => { + expect(decodeQueueMember("run_abc123")).toBeUndefined(); + }); + + test("decodeQueueMember returns undefined for malformed v3", () => { + // Only 3 parts instead of 4 + 
expect(decodeQueueMember("run_abc\x1eenv_xyz\x1e0")).toBeUndefined(); + }); + }); + + describe("encodeWorkerQueueEntry / decodeWorkerQueueEntry", () => { + test("roundtrips correctly", () => { + const original = { + runId: "run_abc123xyz", + workerQueue: "env_def456", + attempt: 2, + environmentType: "PRODUCTION" as const, + queueKey: "{org:org123}:proj:proj456:env:env789:queue:my-task", + timestamp: 1706812800000, + }; + const encoded = encodeWorkerQueueEntry(original); + const decoded = decodeWorkerQueueEntry(encoded); + + expect(decoded).toEqual(original); + }); + + test("roundtrips correctly with concurrency key in queue", () => { + const original = { + runId: "run_xyz", + workerQueue: "env_abc", + attempt: 0, + environmentType: "DEVELOPMENT" as const, + queueKey: "{org:o1}:proj:p1:env:e1:queue:task:ck:user-123", + timestamp: 1706812800000, + }; + const encoded = encodeWorkerQueueEntry(original); + const decoded = decodeWorkerQueueEntry(encoded); + + expect(decoded).toEqual(original); + }); + + test("decodeWorkerQueueEntry returns undefined for legacy message key", () => { + expect(decodeWorkerQueueEntry("{org:o123}:message:run_abc")).toBeUndefined(); + }); + }); + + describe("getRunIdFromMember", () => { + test("extracts runId from v3 encoded member", () => { + const encoded = encodeQueueMember({ + runId: "run_abc123", + workerQueue: "env_xyz", + attempt: 0, + environmentType: "PRODUCTION", + }); + expect(getRunIdFromMember(encoded)).toBe("run_abc123"); + }); + + test("returns legacy member as-is (it is the runId)", () => { + expect(getRunIdFromMember("run_abc123")).toBe("run_abc123"); + }); + }); + + describe("reconstructMessageFromWorkerEntry", () => { + test("reconstructs full message payload", () => { + const entry = { + runId: "run_abc123", + workerQueue: "env_xyz", + attempt: 1, + environmentType: "PRODUCTION" as const, + queueKey: "{org:org123}:proj:proj456:env:env789:queue:my-task", + timestamp: 1706812800000, + }; + const descriptor = { + orgId: 
"org123", + projectId: "proj456", + envId: "env789", + queue: "my-task", + concurrencyKey: undefined, + }; + + const message = reconstructMessageFromWorkerEntry(entry, descriptor); + + expect(message).toEqual({ + version: "2", + runId: "run_abc123", + taskIdentifier: "my-task", + orgId: "org123", + projectId: "proj456", + environmentId: "env789", + environmentType: "PRODUCTION", + queue: "{org:org123}:proj:proj456:env:env789:queue:my-task", + concurrencyKey: undefined, + timestamp: 1706812800000, + attempt: 1, + workerQueue: "env_xyz", + }); + }); + + test("reconstructs message with concurrency key", () => { + const entry = { + runId: "run_xyz", + workerQueue: "env_dev", + attempt: 0, + environmentType: "DEVELOPMENT" as const, + queueKey: "{org:o1}:proj:p1:env:e1:queue:task:ck:user-42", + timestamp: 1706812800000, + }; + const descriptor = { + orgId: "o1", + projectId: "p1", + envId: "e1", + queue: "task", + concurrencyKey: "user-42", + }; + + const message = reconstructMessageFromWorkerEntry(entry, descriptor); + + expect(message.concurrencyKey).toBe("user-42"); + expect(message.queue).toBe("{org:o1}:proj:p1:env:e1:queue:task:ck:user-42"); + }); + }); + + describe("encodeMessageKeyValue / decodeMessageKeyValue", () => { + test("roundtrips correctly for PRODUCTION", () => { + const original = { + queue: "{org:org123}:proj:proj456:env:env789:queue:my-task", + timestamp: 1706812800000, + attempt: 0, + environmentType: "PRODUCTION" as const, + workerQueue: "env_xyz", + }; + const encoded = encodeMessageKeyValue(original); + const decoded = decodeMessageKeyValue(encoded); + + expect(decoded).toEqual(original); + }); + + test("roundtrips correctly for DEVELOPMENT", () => { + const original = { + queue: "{org:o1}:proj:p1:env:e1:queue:task", + timestamp: 1706812800123, + attempt: 5, + environmentType: "DEVELOPMENT" as const, + workerQueue: "env_dev", + }; + const encoded = encodeMessageKeyValue(original); + const decoded = decodeMessageKeyValue(encoded); + + 
expect(decoded).toEqual(original); + }); + + test("encoded value starts with v3: prefix", () => { + const encoded = encodeMessageKeyValue({ + queue: "{org:o1}:proj:p1:env:e1:queue:task", + timestamp: 1706812800000, + attempt: 0, + environmentType: "PRODUCTION", + workerQueue: "env_xyz", + }); + + expect(encoded.startsWith("v3:")).toBe(true); + }); + + test("decodeMessageKeyValue returns undefined for JSON", () => { + expect(decodeMessageKeyValue('{"version":"2","runId":"run_123"}')).toBeUndefined(); + }); + + test("decodeMessageKeyValue returns undefined for malformed v3", () => { + expect(decodeMessageKeyValue("v3:only_two_parts")).toBeUndefined(); + }); + }); + + describe("isV3MessageKeyValue", () => { + test("returns true for v3 encoded message key value", () => { + const encoded = encodeMessageKeyValue({ + queue: "{org:o1}:proj:p1:env:e1:queue:task", + timestamp: 1706812800000, + attempt: 0, + environmentType: "PRODUCTION", + workerQueue: "env_xyz", + }); + expect(isV3MessageKeyValue(encoded)).toBe(true); + }); + + test("returns false for JSON format", () => { + expect(isV3MessageKeyValue('{"version":"2","runId":"run_123"}')).toBe(false); + }); + + test("returns false for legacy v2 format", () => { + expect(isV3MessageKeyValue('{"version":"2"}')).toBe(false); + }); + }); + + describe("encoded size comparison", () => { + test("v3 message key format is significantly smaller than full JSON", () => { + const fullPayload = JSON.stringify({ + version: "2", + runId: "run_clxyz123abc456def789", + taskIdentifier: "my-background-task", + orgId: "org_clxyz123abc456def789", + projectId: "proj_clxyz123abc456def789", + environmentId: "env_clxyz123abc456def789", + environmentType: "PRODUCTION", + queue: + "{org:org_clxyz123abc456def789}:proj:proj_clxyz123abc456def789:env:env_clxyz123abc456def789:queue:my-background-task", + concurrencyKey: undefined, + timestamp: 1706812800000, + attempt: 0, + workerQueue: "env_clxyz123abc456def789", + }); + + const v3MessageKey = 
encodeMessageKeyValue({ + queue: + "{org:org_clxyz123abc456def789}:proj:proj_clxyz123abc456def789:env:env_clxyz123abc456def789:queue:my-background-task", + timestamp: 1706812800000, + attempt: 0, + environmentType: "PRODUCTION", + workerQueue: "env_clxyz123abc456def789", + }); + + // Full JSON is typically 400-600 bytes + // V3 message key should be ~200 bytes (includes the full queue key) + expect(v3MessageKey.length).toBeLessThan(fullPayload.length * 0.6); + + console.log(`Full JSON size: ${fullPayload.length} bytes`); + console.log(`V3 message key size: ${v3MessageKey.length} bytes`); + console.log( + `Reduction: ${((1 - v3MessageKey.length / fullPayload.length) * 100).toFixed(1)}%` + ); + }); + + test("v3 queue member format is significantly smaller than full JSON", () => { + const fullPayload = JSON.stringify({ + version: "2", + runId: "run_clxyz123abc456def789", + taskIdentifier: "my-background-task", + orgId: "org_clxyz123abc456def789", + projectId: "proj_clxyz123abc456def789", + environmentId: "env_clxyz123abc456def789", + environmentType: "PRODUCTION", + queue: + "{org:org_clxyz123abc456def789}:proj:proj_clxyz123abc456def789:env:env_clxyz123abc456def789:queue:my-background-task", + concurrencyKey: undefined, + timestamp: 1706812800000, + attempt: 0, + workerQueue: "env_clxyz123abc456def789", + }); + + const v3Encoded = encodeQueueMember({ + runId: "run_clxyz123abc456def789", + workerQueue: "env_clxyz123abc456def789", + attempt: 0, + environmentType: "PRODUCTION", + }); + + // Full JSON is typically 400-600 bytes + // v3 encoded queue member should be ~70-80 bytes + expect(v3Encoded.length).toBeLessThan(fullPayload.length * 0.2); + + console.log(`Full JSON size: ${fullPayload.length} bytes`); + console.log(`V3 encoded size: ${v3Encoded.length} bytes`); + console.log(`Reduction: ${((1 - v3Encoded.length / fullPayload.length) * 100).toFixed(1)}%`); + }); + }); +}); diff --git a/internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts 
b/internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts new file mode 100644 index 0000000000..058b8eb321 --- /dev/null +++ b/internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts @@ -0,0 +1,655 @@ +import { redisTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { Logger } from "@trigger.dev/core/logger"; +import { setTimeout } from "node:timers/promises"; +import { FairQueueSelectionStrategy } from "../fairQueueSelectionStrategy.js"; +import { RunQueue } from "../index.js"; +import { RunQueueFullKeyProducer } from "../keyProducer.js"; +import { InputPayload } from "../types.js"; +import { + encodeQueueMember, + encodeWorkerQueueEntry, + decodeQueueMember, + decodeWorkerQueueEntry, + isEncodedQueueMember, + isEncodedWorkerQueueEntry, +} from "../messageEncoding.js"; + +// Mock Decimal since we can't import from @trigger.dev/database without Prisma +const createDecimal = (value: number) => ({ + toNumber: () => value, + toString: () => value.toString(), +}); + +const testOptions = { + name: "rq", + tracer: trace.getTracer("rq"), + workers: 1, + defaultEnvConcurrency: 25, + logger: new Logger("RunQueue", "warn"), + retryOptions: { + maxAttempts: 5, + factor: 1.1, + minTimeoutInMs: 100, + maxTimeoutInMs: 1_000, + randomize: true, + }, + keys: new RunQueueFullKeyProducer(), +}; + +const createEnv = (id: string, type: "DEVELOPMENT" | "PRODUCTION" = "DEVELOPMENT") => ({ + id, + type, + maximumConcurrencyLimit: 10, + concurrencyLimitBurstFactor: createDecimal(1.0), + project: { id: "p1234" }, + organization: { id: "o1234" }, +}); + +const createMessage = ( + runId: string, + timestamp: number = Date.now() +): InputPayload => ({ + runId, + taskIdentifier: "task/my-task", + orgId: "o1234", + projectId: "p1234", + environmentId: "e1234", + environmentType: "DEVELOPMENT", + queue: "task/my-task", + timestamp, + attempt: 0, +}); + +vi.setConfig({ testTimeout: 60_000 }); + +describe("Message Format 
Handling", () => { + describe("V2 (Legacy) Format", () => { + redisTest( + "should enqueue and dequeue messages in V2 format when useOptimizedMessageFormat is false", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + const queue = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: false, // V2 legacy format + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:v2test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:v2test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const message = createMessage("run_v2_test_1"); + + // Enqueue message + await queue.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + // Verify message key was created (V2 format) + const messageExists = await queue.messageExists(message.orgId, message.runId); + expect(messageExists).toBe(1); // Message key should exist in V2 format + + // Verify queue length + expect(await queue.lengthOfQueue(env as any, message.queue)).toBe(1); + + // Wait for worker to process + await setTimeout(600); + + // Dequeue and verify + const dequeued = await queue.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued).toBeDefined(); + expect(dequeued?.messageId).toBe(message.runId); + expect(dequeued?.message.version).toBe("2"); + expect(dequeued?.message.runId).toBe(message.runId); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "should read message from message key in V2 format", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + const queue = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: false, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:v2read:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + 
keyPrefix: "runqueue:v2read:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const message = createMessage("run_v2_read_test"); + + await queue.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + // Read message directly + const readMessage = await queue.readMessage(message.orgId, message.runId); + expect(readMessage).toBeDefined(); + expect(readMessage?.runId).toBe(message.runId); + expect(readMessage?.taskIdentifier).toBe(message.taskIdentifier); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "should acknowledge V2 message and delete message key", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + const queue = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: false, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:v2ack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:v2ack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const message = createMessage("run_v2_ack_test"); + + await queue.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + // Wait for processing + await setTimeout(600); + + // Dequeue + const dequeued = await queue.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued).toBeDefined(); + + // Acknowledge + await queue.acknowledgeMessage(message.orgId, message.runId); + + // Verify message key is deleted + const messageExists = await queue.messageExists(message.orgId, message.runId); + expect(messageExists).toBe(0); + + // Verify queue is empty + expect(await queue.lengthOfQueue(env as any, message.queue)).toBe(0); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "should nack V2 message and update message key with incremented attempt", + async ({ redisContainer }) => { + const env = 
createEnv("e1234"); + const queue = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: false, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:v2nack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:v2nack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const message = createMessage("run_v2_nack_test"); + + await queue.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + await setTimeout(600); + + // Dequeue + const dequeued = await queue.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued).toBeDefined(); + expect(dequeued?.message.attempt).toBe(0); + + // Nack (will increment attempt and requeue) + const nackResult = await queue.nackMessage({ + orgId: message.orgId, + messageId: message.runId, + retryAt: Date.now(), // Retry immediately + }); + expect(nackResult).toBe(true); + + // Verify message is back in queue + expect(await queue.lengthOfQueue(env as any, message.queue)).toBe(1); + + // Read message and verify attempt was incremented + const updatedMessage = await queue.readMessage(message.orgId, message.runId); + expect(updatedMessage?.attempt).toBe(1); + } finally { + await queue.quit(); + } + } + ); + }); + + describe("V3 (Optimized) Format", () => { + redisTest( + "should enqueue and dequeue messages in V3 format when useOptimizedMessageFormat is true", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + const queue = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: true, // V3 optimized format + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:v3test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:v3test:", + host: redisContainer.getHost(), + port: 
redisContainer.getPort(), + }, + }); + + try { + const message = createMessage("run_v3_test_1"); + + // Enqueue message + await queue.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + // Verify NO message key was created (V3 format) + const messageExists = await queue.messageExists(message.orgId, message.runId); + expect(messageExists).toBe(0); // Message key should NOT exist in V3 format + + // Verify queue length + expect(await queue.lengthOfQueue(env as any, message.queue)).toBe(1); + + // Wait for worker to process + await setTimeout(600); + + // Dequeue and verify + const dequeued = await queue.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued).toBeDefined(); + expect(dequeued?.messageId).toBe(message.runId); + expect(dequeued?.message.version).toBe("2"); // Still returns OutputPayloadV2 + expect(dequeued?.message.runId).toBe(message.runId); + expect(dequeued?.message.taskIdentifier).toBe(message.taskIdentifier); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "should acknowledge V3 message without error (no message key to delete)", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + const queue = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: true, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:v3ack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:v3ack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const message = createMessage("run_v3_ack_test"); + + await queue.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + await setTimeout(600); + + // Dequeue + const dequeued = await queue.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued).toBeDefined(); + + // Acknowledge (should not error even though no message key exists) + 
await queue.acknowledgeMessage(message.orgId, message.runId); + + // Verify queue is empty + expect(await queue.lengthOfQueue(env as any, message.queue)).toBe(0); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "should nack V3 message and requeue with encoded format", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + const queue = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: true, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:v3nack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:v3nack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const message = createMessage("run_v3_nack_test"); + + await queue.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + await setTimeout(600); + + // Dequeue + const dequeued = await queue.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued).toBeDefined(); + expect(dequeued?.message.attempt).toBe(0); + + // Nack + const nackResult = await queue.nackMessage({ + orgId: message.orgId, + messageId: message.runId, + retryAt: Date.now(), + }); + expect(nackResult).toBe(true); + + // Verify message is back in queue + expect(await queue.lengthOfQueue(env as any, message.queue)).toBe(1); + + // Wait and dequeue again to verify attempt was incremented + await setTimeout(600); + const dequeued2 = await queue.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued2).toBeDefined(); + expect(dequeued2?.message.attempt).toBe(1); + } finally { + await queue.quit(); + } + } + ); + }); + + describe("Mixed Format Migration", () => { + redisTest( + "V3 queue should be able to read V2 messages during migration", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + + // First, enqueue with V2 format + const queueV2 = new RunQueue({ + 
...testOptions, + useOptimizedMessageFormat: false, // V2 + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:mixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:mixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + const message = createMessage("run_mixed_v2_to_v3"); + + await queueV2.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + // Verify V2 message key exists + expect(await queueV2.messageExists(message.orgId, message.runId)).toBe(1); + + await queueV2.quit(); + + // Now create V3 queue and try to read the V2 message + const queueV3 = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: true, // V3 + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:mixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:mixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + await setTimeout(600); + + // V3 queue should be able to dequeue V2 message + const dequeued = await queueV3.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued).toBeDefined(); + expect(dequeued?.messageId).toBe(message.runId); + expect(dequeued?.message.runId).toBe(message.runId); + } finally { + await queueV3.quit(); + } + } + ); + + redisTest( + "should handle multiple messages with mixed formats", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + + // Create V2 queue and enqueue some messages + const queueV2 = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: false, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:multimixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: 
testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:multimixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + const messageV2_1 = createMessage("run_v2_1", Date.now() - 3000); + const messageV2_2 = createMessage("run_v2_2", Date.now() - 2000); + + await queueV2.enqueueMessage({ env: env as any, message: messageV2_1, workerQueue: env.id }); + await queueV2.enqueueMessage({ env: env as any, message: messageV2_2, workerQueue: env.id }); + + await queueV2.quit(); + + // Switch to V3 and add more messages + const queueV3 = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: true, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:multimixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:multimixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const messageV3_1 = createMessage("run_v3_1", Date.now() - 1000); + const messageV3_2 = createMessage("run_v3_2", Date.now()); + + await queueV3.enqueueMessage({ env: env as any, message: messageV3_1, workerQueue: env.id }); + await queueV3.enqueueMessage({ env: env as any, message: messageV3_2, workerQueue: env.id }); + + // Total should be 4 messages + expect(await queueV3.lengthOfQueue(env as any, messageV2_1.queue)).toBe(4); + + await setTimeout(600); + + // Dequeue all messages - should handle both formats + const dequeuedMessages: string[] = []; + for (let i = 0; i < 4; i++) { + const dequeued = await queueV3.dequeueMessageFromWorkerQueue("test_consumer", env.id); + if (dequeued) { + dequeuedMessages.push(dequeued.messageId); + } + } + + // All 4 messages should be dequeued successfully + expect(dequeuedMessages).toContain("run_v2_1"); + expect(dequeuedMessages).toContain("run_v2_2"); + expect(dequeuedMessages).toContain("run_v3_1"); + expect(dequeuedMessages).toContain("run_v3_2"); + } 
finally { + await queueV3.quit(); + } + } + ); + }); + + describe("Encoding Format Detection", () => { + test("isEncodedQueueMember correctly identifies V3 format", () => { + const v3Member = encodeQueueMember({ + runId: "run_test", + workerQueue: "env_test", + attempt: 0, + environmentType: "PRODUCTION", + }); + + expect(isEncodedQueueMember(v3Member)).toBe(true); + expect(isEncodedQueueMember("run_test")).toBe(false); // V2 is just runId + }); + + test("isEncodedWorkerQueueEntry correctly identifies formats", () => { + const v3Entry = encodeWorkerQueueEntry({ + runId: "run_test", + workerQueue: "env_test", + attempt: 0, + environmentType: "PRODUCTION", + queueKey: "{org:o1}:proj:p1:env:e1:queue:task", + timestamp: Date.now(), + }); + + const v2Entry = "{org:o1}:message:run_test"; + + expect(isEncodedWorkerQueueEntry(v3Entry)).toBe(true); + expect(isEncodedWorkerQueueEntry(v2Entry)).toBe(false); + }); + + test("decodeQueueMember extracts correct data", () => { + const original = { + runId: "run_abc123", + workerQueue: "env_xyz", + attempt: 3, + environmentType: "STAGING" as const, + }; + + const encoded = encodeQueueMember(original); + const decoded = decodeQueueMember(encoded); + + expect(decoded).toEqual(original); + }); + + test("decodeWorkerQueueEntry extracts correct data", () => { + const original = { + runId: "run_abc123", + workerQueue: "env_xyz", + attempt: 2, + environmentType: "DEVELOPMENT" as const, + queueKey: "{org:o1}:proj:p1:env:e1:queue:my-task", + timestamp: 1706812800000, + }; + + const encoded = encodeWorkerQueueEntry(original); + const decoded = decodeWorkerQueueEntry(encoded); + + expect(decoded).toEqual(original); + }); + }); +}); diff --git a/internal-packages/run-engine/src/run-queue/types.ts b/internal-packages/run-engine/src/run-queue/types.ts index ee1ce41b79..903ae138e2 100644 --- a/internal-packages/run-engine/src/run-queue/types.ts +++ b/internal-packages/run-engine/src/run-queue/types.ts @@ -133,3 +133,31 @@ export interface 
RunQueueSelectionStrategy { consumerId: string ): Promise>; } + +/** + * Provider for fetching run data from a persistent store (e.g., PostgreSQL). + * Used for V3 optimized format where message data is not stored in Redis. + */ +export interface RunDataProvider { + /** + * Fetch run data for ack/nack operations. + * Returns undefined if the run is not found. + */ + getRunData(runId: string): Promise<RunData | undefined>; +} + +/** + * Run data needed for queue operations (ack, nack, release concurrency). + */ +export type RunData = { + queue: string; + orgId: string; + projectId: string; + environmentId: string; + environmentType: RuntimeEnvironmentType; + concurrencyKey?: string; + attempt: number; + timestamp: number; + workerQueue: string; + taskIdentifier: string; +};