Skip to content

Add TTL system for automatic run expiration and optimize Redis message storage#3002

Draft
ericallam wants to merge 6 commits intomainfrom
claude/optimize-run-queue-redis-EN8Yv
Draft

Add TTL system for automatic run expiration and optimize Redis message storage#3002
ericallam wants to merge 6 commits intomainfrom
claude/optimize-run-queue-redis-EN8Yv

Conversation

@ericallam
Copy link
Member

Overview

This PR implements two major features for the Run Engine:

  1. TTL System: Automatic expiration of runs based on configurable TTL values, with configurable polling and batch processing
  2. Optimized Message Format (V3): Reduces Redis storage by ~80% for pending messages by eliminating separate message keys and using compact encoding

Changes

TTL System

  • Added environment variables for TTL system configuration (RUN_ENGINE_TTL_SYSTEM_*)
  • Implemented #ttlExpiredCallback in RunEngine to process expired runs
  • Integrated TTL system into EnqueueSystem to calculate and store TTL expiration timestamps
  • Added RunDataProvider interface for fetching run data from PostgreSQL as fallback

Optimized Message Format (V3)

  • Created messageEncoding.ts module with compact encoding/decoding functions
  • Implemented encodeMessageKeyValue() and decodeMessageKeyValue() for message key values (~60-100 bytes vs ~400-600+ bytes JSON)
  • Implemented encodeWorkerQueueEntry() and decodeWorkerQueueEntry() for worker queue entries
  • Added V3 Redis Lua commands: enqueueMessageV3 and nackMessageV3
  • Updated parseRawMessage() to handle both legacy JSON and V3 compact formats
  • Added format detection and automatic fallback to legacy format during migration
  • Updated readMessage() to fall back to RunDataProvider when message key doesn't exist

Configuration

  • Added RUN_ENGINE_TTL_SYSTEM_DISABLED (default: false)
  • Added RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT (optional)
  • Added RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS (default: 1000ms)
  • Added RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE (default: 100)
  • Added useOptimizedMessageFormat option to RunQueue (default: false for safe rollout)

Migration Strategy

The V3 format uses a gradual migration approach:

  1. Deploy with useOptimizedMessageFormat disabled (default) - new code can read both formats
  2. Enable the flag - new messages use optimized format, old messages continue working
  3. Old messages drain naturally as they're processed
  4. No backfill or data migration required

Benefits

  • Storage Reduction: ~80% reduction in Redis storage for pending messages
  • Automatic Cleanup: TTL system automatically expires runs based on configured TTL
  • Backward Compatible: Seamless migration from legacy to V3 format
  • Configurable: TTL polling interval and batch size can be tuned per deployment

Testing

  • Format detection works for both legacy JSON and V3 compact formats
  • Fallback to PostgreSQL via RunDataProvider when message key missing
  • TTL expiration callback processes runs with configurable concurrency
  • Lua scripts updated to work with both message formats
  • Environment variables properly validated and typed

Changelog

  • Added TTL system for automatic run expiration with configurable polling
  • Implemented V3 optimized message format reducing Redis storage by ~80%
  • Added RunDataProvider for PostgreSQL fallback when message keys unavailable
  • Added message encoding/decoding utilities for compact format
  • Updated Redis Lua scripts to support both legacy and V3 formats
  • Added comprehensive format detection and migration support

💯

https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5

…e by ~80%

This implements a new V3 message format that eliminates separate message keys
for pending runs, encoding all necessary data directly in the sorted set member.

Key changes:
- Add messageEncoding module with encode/decode helpers for V3 format
- Add useOptimizedMessageFormat option to RunQueue for gradual rollout
- Update Lua scripts to handle both legacy and V3 formats (dual-read)
- Add V3-specific enqueue/nack Lua scripts (V3 write when enabled)
- Add comprehensive tests for encoding/decoding roundtrips

Migration strategy:
1. Deploy with useOptimizedMessageFormat=false (new code reads both formats)
2. Enable useOptimizedMessageFormat=true (new messages use V3)
3. Old messages drain naturally as they're processed (no backfill needed)

Storage reduction: ~88% per pending message (442 bytes -> 53 bytes)

https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5
…ling

Add comprehensive tests covering:
- V2 (legacy) format enqueue/dequeue/ack/nack
- V3 (optimized) format enqueue/dequeue/ack/nack
- Mixed format migration scenarios
- Format detection helpers

https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5
…ecution

For V3 optimized format, we now create the message key when the run
is dequeued from the worker queue (ready to execute). This allows
ack/nack/readMessage to work correctly.

Storage savings come from not having message keys while messages are
PENDING in the queue backlog - only executing runs have message keys.

https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5
Instead of creating message keys when dequeuing V3 messages, we now
read run data from PostgreSQL via a RunDataProvider when needed for
ack/nack operations.

This approach:
- Eliminates ALL message keys for V3 format (not just pending runs)
- Uses PostgreSQL as the source of truth for run data
- Falls back to Redis message key for legacy V2 messages
- Adds RunDataProvider interface and RunData type

The RunEngine creates a runDataProvider that queries the TaskRun table
for queue, orgId, environmentId, etc. when readMessage is called and
no Redis message key exists.

https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5
…t on enqueue

- Add TTL system options to RunEngineOptions.queue for configuration
- Add #ttlExpiredCallback method on RunEngine that calls ttlSystem.expireRun()
- Pass TTL system options to RunQueue in RunEngine constructor
- Compute ttlExpiresAt from run.ttl when enqueuing runs in EnqueueSystem
- Add env vars for TTL system configuration (disabled, shard count, poll interval, batch size)
- Configure TTL system in webapp's runEngine.server.ts

The TTL system enables automatic expiration of runs that have been in the queue
past their TTL deadline. When runs expire, the callback updates their status
to EXPIRED in the database and emits appropriate events.

https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5
… for V3 format

Changes the V3 message format to use a more reliable architecture:
- Sorted set member is now just runId (same as legacy) for reliable ZREM operations
- Message key stores compact V3 format: v3:queue|timestamp|attempt|envType|workerQueue
- This achieves ~64% storage reduction vs JSON while maintaining reliable queue operations

The previous approach of encoding data in sorted set member was fragile because
ZREM requires exact byte-for-byte match - if attempt number was wrong during
reconstruction, the remove would silently fail.

https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5
@changeset-bot
Copy link

changeset-bot bot commented Feb 4, 2026

⚠️ No Changeset found

Latest commit: 8547952

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 4, 2026

Walkthrough

This pull request implements a TTL (Time-To-Live) system for automatic run expiration in the Run Engine. The changes introduce TTL configuration options at the environment level, wire TTL settings through the RunEngine initialization, and add a TTL expiration callback that processes expired runs. Additionally, the PR introduces a V3 optimized message encoding format for Redis storage, enabling compact serialization with automatic format detection. This includes dual-format support where both legacy V2 and new V3 compact formats can coexist, with fallback mechanisms to reconstruct messages from a RunDataProvider when Redis keys are missing. Comprehensive test coverage validates encoding roundtrips, message format handling, and migration scenarios between formats.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the two main changes: TTL system implementation and Redis message storage optimization via V3 format.
Description check ✅ Passed The description is comprehensive and well-structured, covering overview, changes, migration strategy, benefits, testing, and changelog sections required by the template.
Docstring Coverage ✅ Passed Docstring coverage is 84.62% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch claude/optimize-run-queue-redis-EN8Yv

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@internal-packages/run-engine/src/run-queue/index.ts`:
- Around line 2410-2414: The two separate this.redis.sadd calls that update
queueCurrentDequeuedKey and envCurrentDequeuedKey can lead to inconsistent
concurrency tracking if one call fails; replace them with an atomic operation by
adding a new Lua command (e.g., updateDequeuedSetsV3) or reusing/delegating to
the existing dequeueMessageFromKey Lua pattern to SADD both keys in one script,
register that script on startup and call it here (passing
queueCurrentDequeuedKey, envCurrentDequeuedKey and message.runId) or, if opting
not to add Lua, execute both SADDs in a single Redis pipeline/transaction so
both sets are updated atomically.
- Around line 3243-3256: The code currently sets runId to options.runId ?? ""
when building the OutputPayloadV2 message, which silently allows an empty runId;
update the creation logic in the function that constructs message so it
validates options.runId and fails fast instead of defaulting to "", e.g., check
that options.runId is a non-empty string and throw or return an explicit error
if missing before constructing message (refer to the message variable and the
OutputPayloadV2 type and where options.runId is read).
🧹 Nitpick comments (5)
internal-packages/run-engine/src/run-queue/types.ts (1)

136-147: Prefer a type alias for RunDataProvider.

This repo standard uses type aliases over interfaces for TS declarations.

♻️ Suggested update
-export interface RunDataProvider {
+export type RunDataProvider = {
   /**
    * Fetch run data for ack/nack operations.
    * Returns undefined if the run is not found.
    */
   getRunData(runId: string): Promise<RunData | undefined>;
-}
+};

As per coding guidelines, **/*.{ts,tsx}: Use types over interfaces for TypeScript.

internal-packages/run-engine/src/engine/index.ts (1)

185-229: Use readOnlyPrisma for the runDataProvider read path.

This is a read-only fallback path and could be high-volume when Redis misses; routing to the replica avoids extra load on the primary.

♻️ Suggested change
-          const run = await this.prisma.taskRun.findUnique({
+          const run = await this.readOnlyPrisma.taskRun.findUnique({
             where: { id: runId },
             select: {
               queue: true,
               organizationId: true,
               projectId: true,
               runtimeEnvironmentId: true,
               environmentType: true,
               concurrencyKey: true,
               attemptNumber: true,
               queueTimestamp: true,
               workerQueue: true,
               taskIdentifier: true,
             },
           });
internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts (1)

1-368: Consider colocating this test with messageEncoding.ts.

Repo guidance prefers tests to live beside the file under test; moving this to the same directory would align with that convention.

As per coding guidelines, **/*.test.{ts,tsx,js,jsx}: Test files should live beside the files under test and use descriptive describe and it blocks.

internal-packages/run-engine/src/run-queue/messageEncoding.ts (2)

47-78: Consider using type instead of interface for data shapes.

Per coding guidelines, prefer types over interfaces. While interfaces work fine here, using type aliases would be more consistent with the codebase conventions.

Example refactor
-export interface EncodedMessageKeyData {
+export type EncodedMessageKeyData = {
   /** Full queue key - needed for queue operations */
   queue: string;
   /** Unix timestamp for scoring */
   timestamp: number;
   /** Attempt number for retry logic */
   attempt: number;
   /** Environment type (single char encoded) */
   environmentType: RuntimeEnvironmentType;
   /** Worker queue name for routing */
   workerQueue: string;
-}
+};

122-128: Consider validating parsed integers for NaN.

parseInt returns NaN for invalid numeric strings. While this is unlikely given controlled inputs, defensive validation would prevent potential downstream issues.

Defensive NaN check
+  const timestamp = parseInt(timestampStr, 10);
+  const attempt = parseInt(attemptStr, 10);
+  
+  if (isNaN(timestamp) || isNaN(attempt)) {
+    return undefined;
+  }
+
   return {
     queue,
-    timestamp: parseInt(timestampStr, 10),
-    attempt: parseInt(attemptStr, 10),
+    timestamp,
+    attempt,
     environmentType,
     workerQueue,
   };
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c059570 and 8547952.

📒 Files selected for processing (10)
  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • internal-packages/run-engine/src/engine/index.ts
  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
  • internal-packages/run-engine/src/engine/types.ts
  • internal-packages/run-engine/src/run-queue/index.ts
  • internal-packages/run-engine/src/run-queue/messageEncoding.ts
  • internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts
  • internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts
  • internal-packages/run-engine/src/run-queue/types.ts
🧰 Additional context used
📓 Path-based instructions (10)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead

**/*.{ts,tsx}: Always import tasks from @trigger.dev/sdk, never use @trigger.dev/sdk/v3 or deprecated client.defineJob pattern
Every Trigger.dev task must be exported and have a unique id property with no timeouts in the run function

Files:

  • internal-packages/run-engine/src/run-queue/types.ts
  • apps/webapp/app/env.server.ts
  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts
  • internal-packages/run-engine/src/engine/types.ts
  • internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts
  • internal-packages/run-engine/src/run-queue/messageEncoding.ts
  • internal-packages/run-engine/src/run-queue/index.ts
  • internal-packages/run-engine/src/engine/index.ts
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use function declarations instead of default exports

Import from @trigger.dev/core using subpaths only, never import from root

Files:

  • internal-packages/run-engine/src/run-queue/types.ts
  • apps/webapp/app/env.server.ts
  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts
  • internal-packages/run-engine/src/engine/types.ts
  • internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts
  • internal-packages/run-engine/src/run-queue/messageEncoding.ts
  • internal-packages/run-engine/src/run-queue/index.ts
  • internal-packages/run-engine/src/engine/index.ts
**/*.ts

📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)

**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries

Files:

  • internal-packages/run-engine/src/run-queue/types.ts
  • apps/webapp/app/env.server.ts
  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts
  • internal-packages/run-engine/src/engine/types.ts
  • internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts
  • internal-packages/run-engine/src/run-queue/messageEncoding.ts
  • internal-packages/run-engine/src/run-queue/index.ts
  • internal-packages/run-engine/src/engine/index.ts
**/*.{js,ts,jsx,tsx,json,md,yaml,yml}

📄 CodeRabbit inference engine (AGENTS.md)

Format code using Prettier before committing

Files:

  • internal-packages/run-engine/src/run-queue/types.ts
  • apps/webapp/app/env.server.ts
  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts
  • internal-packages/run-engine/src/engine/types.ts
  • internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts
  • internal-packages/run-engine/src/run-queue/messageEncoding.ts
  • internal-packages/run-engine/src/run-queue/index.ts
  • internal-packages/run-engine/src/engine/index.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use zod for validation in packages/core and apps/webapp

Files:

  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/runEngine.server.ts
apps/webapp/app/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

Access all environment variables through the env export of env.server.ts instead of directly accessing process.env in the Trigger.dev webapp

Files:

  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/runEngine.server.ts
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

apps/webapp/**/*.{ts,tsx}: When importing from @trigger.dev/core in the webapp, use subpath exports from the package.json instead of importing from the root path
Follow the Remix 2.1.0 and Express server conventions when updating the main trigger.dev webapp

Access environment variables via env export from apps/webapp/app/env.server.ts, never use process.env directly

Files:

  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/runEngine.server.ts
**/*.{test,spec}.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use vitest for all tests in the Trigger.dev repository

Files:

  • internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts
  • internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts
**/*.test.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (AGENTS.md)

**/*.test.{ts,tsx,js,jsx}: Test files should live beside the files under test and use descriptive describe and it blocks
Tests should avoid mocks or stubs and use the helpers from @internal/testcontainers when Redis or Postgres are needed
Use vitest for running unit tests

**/*.test.{ts,tsx,js,jsx}: Use vitest exclusively for testing and never mock anything - use testcontainers instead
Place test files next to source files with naming pattern: source file (e.g., MyService.ts) → MyService.test.ts

Files:

  • internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts
  • internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts
**/*.test.{ts,tsx}

📄 CodeRabbit inference engine (CLAUDE.md)

Use testcontainers helpers (redisTest, postgresTest, containerTest) from @internal/testcontainers for Redis/PostgreSQL testing instead of mocks

Files:

  • internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts
  • internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts
🧠 Learnings (20)
📚 Learning: 2025-07-12T18:06:04.133Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.

Applied to files:

  • internal-packages/run-engine/src/run-queue/types.ts
  • internal-packages/run-engine/src/engine/index.ts
📚 Learning: 2025-11-27T16:26:58.661Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-11-27T16:26:58.661Z
Learning: Use the Run Engine 2.0 from `internal/run-engine` for new run lifecycle code in the webapp instead of the legacy run engine

Applied to files:

  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • internal-packages/run-engine/src/engine/index.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `idempotencyKeyTTL` option to define a time window during which duplicate triggers return the original run

Applied to files:

  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
  • internal-packages/run-engine/src/engine/types.ts
  • internal-packages/run-engine/src/engine/index.ts
📚 Learning: 2025-12-18T14:09:10.154Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 2794
File: internal-packages/run-engine/src/engine/systems/debounceSystem.ts:390-397
Timestamp: 2025-12-18T14:09:10.154Z
Learning: In the debounce system (internal-packages/run-engine/src/engine/systems/debounceSystem.ts), millisecond delays are not supported. The minimum debounce delay is 1 second (1s). The parseNaturalLanguageDuration function supports w/d/hr/h/m/s units only.

Applied to files:

  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use the `task()` function from `trigger.dev/sdk/v3` to define tasks with id and run properties

Applied to files:

  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Limit task duration using the `maxDuration` property (in seconds)

Applied to files:

  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Attach metadata to task runs using the metadata option when triggering, and access/update it inside runs using metadata functions

Applied to files:

  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Subscribe to run updates using `runs.subscribeToRun()` for realtime monitoring of task execution

Applied to files:

  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
📚 Learning: 2026-01-15T11:50:06.067Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-15T11:50:06.067Z
Learning: Applies to **/*.{ts,tsx} : Every Trigger.dev task must be exported and have a unique `id` property with no timeouts in the run function

Applied to files:

  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
📚 Learning: 2025-10-08T11:48:12.327Z
Learnt from: nicktrn
Repo: triggerdotdev/trigger.dev PR: 2593
File: packages/core/src/v3/workers/warmStartClient.ts:168-170
Timestamp: 2025-10-08T11:48:12.327Z
Learning: The trigger.dev runners execute only in Node 21 and 22 environments, so modern Node.js APIs like AbortSignal.any (introduced in v20.3.0) are supported.

Applied to files:

  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `yourTask.batchTrigger()` to trigger multiple runs of a task from inside another task

Applied to files:

  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `schedules.task()` for scheduled/cron tasks instead of regular `task()`

Applied to files:

  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use logger methods (debug, log, info, warn, error) from `trigger.dev/sdk/v3` for structured logging in tasks

Applied to files:

  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `tasks.trigger()` with type-only imports to trigger tasks from backend code without importing the task implementation

Applied to files:

  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Control concurrency using the `queue` property with `concurrencyLimit` option

Applied to files:

  • apps/webapp/app/v3/runEngine.server.ts
📚 Learning: 2026-01-12T17:18:09.451Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2870
File: apps/webapp/app/services/redisConcurrencyLimiter.server.ts:56-66
Timestamp: 2026-01-12T17:18:09.451Z
Learning: In `apps/webapp/app/services/redisConcurrencyLimiter.server.ts`, the query concurrency limiter will not be deployed with Redis Cluster mode, so multi-key operations (keyKey and globalKey in different hash slots) are acceptable and will function correctly in standalone Redis mode.

Applied to files:

  • apps/webapp/app/v3/runEngine.server.ts
📚 Learning: 2026-01-15T10:48:02.687Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-15T10:48:02.687Z
Learning: Applies to **/*.test.{ts,tsx,js,jsx} : Use vitest for running unit tests

Applied to files:

  • internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts
📚 Learning: 2025-11-27T16:26:37.432Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-11-27T16:26:37.432Z
Learning: Applies to **/*.{test,spec}.{ts,tsx} : Use vitest for all tests in the Trigger.dev repository

Applied to files:

  • internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts
📚 Learning: 2024-10-07T10:32:30.100Z
Learnt from: nicktrn
Repo: triggerdotdev/trigger.dev PR: 1387
File: packages/cli-v3/src/executions/taskRunProcess.ts:408-413
Timestamp: 2024-10-07T10:32:30.100Z
Learning: In the `parseExecuteError` method in `packages/cli-v3/src/executions/taskRunProcess.ts`, using `String(error)` to populate the `message` field works fine and even prepends `error.name`.

Applied to files:

  • internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2025-11-27T16:26:44.496Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/executing-commands.mdc:0-0
Timestamp: 2025-11-27T16:26:44.496Z
Learning: For running tests, navigate into the package directory and run `pnpm run test --run` to enable single-file test execution (e.g., `pnpm run test ./src/engine/tests/ttl.test.ts --run`)

Applied to files:

  • internal-packages/run-engine/src/engine/index.ts
🧬 Code graph analysis (6)
internal-packages/run-engine/src/run-queue/types.ts (1)
apps/webapp/app/database-types.ts (1)
  • RuntimeEnvironmentType (49-54)
apps/webapp/app/env.server.ts (2)
apps/webapp/app/utils/boolEnv.ts (1)
  • BoolEnv (12-14)
apps/supervisor/src/envUtil.ts (1)
  • BoolEnv (15-17)
internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (1)
packages/core/src/v3/isomorphic/duration.ts (1)
  • parseNaturalLanguageDuration (1-68)
apps/webapp/app/v3/runEngine.server.ts (3)
apps/webapp/app/env.server.ts (1)
  • env (1330-1330)
internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts (4)
  • env (501-521)
  • env (560-578)
  • env (580-594)
  • env (596-616)
apps/supervisor/src/env.ts (1)
  • env (139-139)
internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts (2)
internal-packages/run-engine/src/run-queue/index.ts (1)
  • messageExists (583-585)
internal-packages/run-engine/src/run-queue/messageEncoding.ts (6)
  • encodeQueueMember (158-161)
  • isEncodedQueueMember (142-144)
  • encodeWorkerQueueEntry (196-206)
  • isEncodedWorkerQueueEntry (150-152)
  • decodeQueueMember (167-190)
  • decodeWorkerQueueEntry (212-237)
internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts (1)
internal-packages/run-engine/src/run-queue/messageEncoding.ts (11)
  • encodeQueueMember (158-161)
  • isEncodedQueueMember (142-144)
  • encodeWorkerQueueEntry (196-206)
  • isEncodedWorkerQueueEntry (150-152)
  • decodeQueueMember (167-190)
  • decodeWorkerQueueEntry (212-237)
  • getRunIdFromMember (266-272)
  • reconstructMessageFromWorkerEntry (242-260)
  • encodeMessageKeyValue (89-97)
  • decodeMessageKeyValue (103-129)
  • isV3MessageKeyValue (134-136)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (26)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: sdk-compat / Cloudflare Workers
  • GitHub Check: sdk-compat / Node.js 20.20 (ubuntu-latest)
  • GitHub Check: sdk-compat / Deno Runtime
  • GitHub Check: sdk-compat / Node.js 22.12 (ubuntu-latest)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
🔇 Additional comments (21)
apps/webapp/app/v3/runEngine.server.ts (1)

83-88: TTL system config wiring looks good.

internal-packages/run-engine/src/engine/types.ts (1)

66-76: TTL option shape is clear and well-scoped.

internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (1)

85-92: Confirm invalid TTL strings are handled upstream.

parseNaturalLanguageDuration returns undefined for unsupported formats (e.g., 1000ms), which would silently skip TTL expiration here. Please verify TTL inputs are validated (or add a warning/validation here) so misformatted TTLs don’t result in runs that never expire.

Based on learnings, “parseNaturalLanguageDuration supports w/d/hr/h/m/s units only.”

apps/webapp/app/env.server.ts (1)

594-598: TTL env schema additions look consistent.

internal-packages/run-engine/src/engine/index.ts (1)

2114-2141: TTL expiration callback flow looks solid.

internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts (5)

1-63: LGTM! Well-structured test setup.

The test utilities are properly configured with testcontainers helpers, vitest, and appropriate mocks for the Decimal type. The helper functions create consistent test data.


66-274: LGTM! Comprehensive V2 format test coverage.

The V2 (legacy) format tests thoroughly validate the enqueue/dequeue cycle, message reading, acknowledgment, and nack behavior. The tests properly verify that message keys exist for V2 format and clean up resources in finally blocks.


276-439: LGTM! V3 optimized format tests are thorough.

The tests correctly verify the key difference in V3 format (no message key created) while ensuring the same functional behavior for enqueue, dequeue, acknowledge, and nack operations. The test at line 311 explicitly validates that message keys are not created in V3 format.


441-594: LGTM! Migration tests validate the dual-read strategy.

The mixed format migration tests are essential for the gradual rollout approach. They correctly verify that a V3 queue can read V2 messages and that multiple messages with mixed formats are all processed correctly.


596-654: LGTM! Encoding format detection tests validate round-trip correctness.

The encoding tests thoroughly verify format detection and encode/decode symmetry for both queue members and worker queue entries. The tests cover both V3 encoded formats and legacy V2 detection.

internal-packages/run-engine/src/run-queue/messageEncoding.ts (3)

138-190: LGTM! Queue member encoding/decoding is correct.

The functions correctly handle the format conversion with proper validation of parts count and environment type mapping.


239-272: LGTM! Message reconstruction and runId extraction are well-designed.

reconstructMessageFromWorkerEntry correctly rebuilds the full OutputPayloadV2 from the encoded entry and queue descriptor. getRunIdFromMember handles both V3 and legacy formats gracefully.


274-358: LGTM! Lua helpers correctly mirror the TypeScript implementation.

The Lua encoding/decoding functions maintain consistency with the TypeScript counterparts. The environment type mappings match, and the format detection logic is equivalent.

internal-packages/run-engine/src/run-queue/index.ts (8)

104-120: LGTM! Well-documented migration strategy.

The new options are clearly documented with the migration strategy outlined in the JSDoc. Defaulting useOptimizedMessageFormat to false ensures safe rollout.


587-627: LGTM! Fallback to RunDataProvider enables V3 format support.

The layered approach (Redis first, then RunDataProvider) correctly handles both V2 messages (with Redis keys) and V3 messages (requiring PostgreSQL fallback).


1507-1531: LGTM! Worker queue entry encoding correctly branches on format.

The V3 format encodes all necessary data (runId, workerQueue, attempt, environmentType, queueKey, timestamp) directly in the worker queue entry, enabling reconstruction without additional Redis lookups.


1562-1633: LGTM! Enqueue correctly handles both V3 and legacy formats.

The conditional logic cleanly separates V3 (compact encoding) and legacy (JSON) paths while sharing the same Redis key construction. Both paths properly update the same concurrency and queue structures.


1713-1736: LGTM! Dequeue parsing handles both formats correctly.

The parseRawMessage call with keys and runId options enables V3 format parsing while maintaining backward compatibility with JSON format.


2007-2080: LGTM! Nack correctly handles both formats with proper message data update.

The V3 format uses compact encoding while legacy uses JSON, but both correctly update the message key with the new attempt count and requeue the message.


2531-2573: LGTM! V3 enqueue Lua script mirrors legacy with compact format.

The enqueueMessageV3 script correctly writes the compact V3 message data while maintaining the same queue, concurrency, and master queue rebalancing logic as the legacy script.


2818-2859: LGTM! V3 nack Lua script correctly mirrors legacy behavior.

The nackMessageV3 script properly updates the message key with compact format, clears concurrency entries, and requeues the message.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment on lines +2410 to +2414
// Update the currentDequeued sets (this is done in the Lua script for legacy)
const queueCurrentDequeuedKey = this.keys.queueCurrentDequeuedKeyFromQueue(decoded.queueKey);
const envCurrentDequeuedKey = this.keys.envCurrentDequeuedKeyFromQueue(decoded.queueKey);
await this.redis.sadd(queueCurrentDequeuedKey, message.runId);
await this.redis.sadd(envCurrentDequeuedKey, message.runId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Non-atomic SADD operations could cause concurrency tracking inconsistency.

The SADD operations for updating currentDequeued sets are performed separately after decoding, unlike the legacy path which uses a Lua script for atomicity. If these Redis calls fail after the worker queue pop, the concurrency tracking could become inconsistent.

Consider using a Lua script to atomically update these sets, similar to the dequeueMessageFromKey Lua command for legacy format.

Suggested approach

You could create a updateDequeuedSetsV3 Lua command that atomically updates both sets:

-- Keys:
local queueCurrentDequeuedKey = KEYS[1]
local envCurrentDequeuedKey = KEYS[2]
-- Args:
local messageId = ARGV[1]

redis.call('SADD', queueCurrentDequeuedKey, messageId)
redis.call('SADD', envCurrentDequeuedKey, messageId)

Or combine the SADD calls into a single pipeline to reduce the window for partial failure.

🤖 Prompt for AI Agents
In `@internal-packages/run-engine/src/run-queue/index.ts` around lines 2410 -
2414, The two separate this.redis.sadd calls that update queueCurrentDequeuedKey
and envCurrentDequeuedKey can lead to inconsistent concurrency tracking if one
call fails; replace them with an atomic operation by adding a new Lua command
(e.g., updateDequeuedSetsV3) or reusing/delegating to the existing
dequeueMessageFromKey Lua pattern to SADD both keys in one script, register that
script on startup and call it here (passing queueCurrentDequeuedKey,
envCurrentDequeuedKey and message.runId) or, if opting not to add Lua, execute
both SADDs in a single Redis pipeline/transaction so both sets are updated
atomically.

Comment on lines +3243 to +3256
const message: OutputPayloadV2 = {
version: "2",
runId: options.runId ?? "", // Filled in by caller
taskIdentifier: descriptor.queue,
orgId: descriptor.orgId,
projectId: descriptor.projectId,
environmentId: descriptor.envId,
environmentType: decoded.environmentType,
queue: decoded.queue,
concurrencyKey: descriptor.concurrencyKey,
timestamp: decoded.timestamp,
attempt: decoded.attempt,
workerQueue: decoded.workerQueue,
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Empty string fallback for runId could cause downstream issues.

When options.runId is not provided, the code falls back to an empty string "" (line 3245). This could lead to issues if the caller doesn't properly provide the runId. Consider making this more explicit or throwing an error.

Suggested defensive check
+    if (!options.runId) {
+      return [new Error("runId required to parse V3 message format"), undefined];
+    }
+
     const message: OutputPayloadV2 = {
       version: "2",
-      runId: options.runId ?? "", // Filled in by caller
+      runId: options.runId,
       taskIdentifier: descriptor.queue,

This makes the requirement explicit rather than silently using an empty string.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const message: OutputPayloadV2 = {
version: "2",
runId: options.runId ?? "", // Filled in by caller
taskIdentifier: descriptor.queue,
orgId: descriptor.orgId,
projectId: descriptor.projectId,
environmentId: descriptor.envId,
environmentType: decoded.environmentType,
queue: decoded.queue,
concurrencyKey: descriptor.concurrencyKey,
timestamp: decoded.timestamp,
attempt: decoded.attempt,
workerQueue: decoded.workerQueue,
};
if (!options.runId) {
return [new Error("runId required to parse V3 message format"), undefined];
}
const message: OutputPayloadV2 = {
version: "2",
runId: options.runId,
taskIdentifier: descriptor.queue,
orgId: descriptor.orgId,
projectId: descriptor.projectId,
environmentId: descriptor.envId,
environmentType: decoded.environmentType,
queue: decoded.queue,
concurrencyKey: descriptor.concurrencyKey,
timestamp: decoded.timestamp,
attempt: decoded.attempt,
workerQueue: decoded.workerQueue,
};
🤖 Prompt for AI Agents
In `@internal-packages/run-engine/src/run-queue/index.ts` around lines 3243 -
3256, The code currently sets runId to options.runId ?? "" when building the
OutputPayloadV2 message, which silently allows an empty runId; update the
creation logic in the function that constructs message so it validates
options.runId and fails fast instead of defaulting to "", e.g., check that
options.runId is a non-empty string and throw or return an explicit error if
missing before constructing message (refer to the message variable and the
OutputPayloadV2 type and where options.runId is read).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants