From b06f470aa53a898444ef7e7bd0ef46273aa6b573 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Fri, 13 Feb 2026 12:41:19 +0100 Subject: [PATCH] Fix race condition between job enqueue and concurrency unblock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This addresses #456. There is a race condition in the concurrency control mechanism where a job that finishes and tries to unblock the next blocked execution can miss a `BlockedExecution` that is being created concurrently. This causes the blocked job to remain stuck until the `ConcurrencyMaintenance` periodic task runs (potentially minutes later). It happens as follows: 1. Job A is running (semaphore value=0) 2. Job B enqueue starts: reads semaphore (value=0, no row lock) → decides to block 3. Job A finishes: `Semaphore.signal` → `UPDATE` value to 1 (succeeds immediately since no lock held) 4. Job A: `BlockedExecution.release_one` → `SELECT` finds nothing (Job B's `BlockedExecution` not committed yet) 5. Job B enqueue commits: `BlockedExecution` now exists but nobody will unblock it The root cause is that `Semaphore::Proxy#wait` doesn't lock the semaphore row when checking the semaphore. This allows the concurrent `signal` to complete before the enqueue transaction commits, creating a window where the `BlockedExecution` is invisible. To fix, we lock the semaphore row with `FOR UPDATE` during the wait check so that the enqueue transaction holds the lock from the check through `BlockedExecution` creation and commit. This forces a concurrent signal `UPDATE` to wait, guaranteeing the `BlockedExecution` is visible when release_one runs. This shouldn't introduce any dead locks, as there's no new circular dependencies introduced by these two: - Enqueue path: locks `Semaphore` row → `INSERT`s `BlockedExecution` (no lock on existing rows) - `release_one` path: locks `BlockedExecution` row (`SKIP LOCKED`) → locks `Semaphore` row (via wait in release) Co-Authored-By: Claude Opus 4.6 --- app/models/solid_queue/semaphore.rb | 2 +- test/models/solid_queue/semaphore_test.rb | 119 ++++++++++++++++++++++ 2 files changed, 120 insertions(+), 1 deletion(-) create mode 100644 test/models/solid_queue/semaphore_test.rb diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index e93b2419..d8caa64e 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -40,7 +40,7 @@ def initialize(job) end def wait - if semaphore = Semaphore.find_by(key: key) + if semaphore = Semaphore.lock.find_by(key: key) semaphore.value > 0 && attempt_decrement else attempt_creation diff --git a/test/models/solid_queue/semaphore_test.rb b/test/models/solid_queue/semaphore_test.rb new file mode 100644 index 00000000..432b97af --- /dev/null +++ b/test/models/solid_queue/semaphore_test.rb @@ -0,0 +1,119 @@ +# frozen_string_literal: true + +require "test_helper" + +class SolidQueue::SemaphoreTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + setup do + @result = JobResult.create!(queue_name: "default") + end + + test "wait acquires a row lock that blocks concurrent signal" do + skip_on_sqlite + + # Enqueue first job to create semaphore with value=0 + NonOverlappingUpdateResultJob.perform_later(@result) + concurrency_key = SolidQueue::Job.last.concurrency_key + assert_equal 0, SolidQueue::Semaphore.find_by(key: concurrency_key).value + + lock_held = Concurrent::Event.new + + # Background thread: holds a FOR UPDATE lock on the semaphore row + locker = Thread.new do + SolidQueue::Record.connection_pool.with_connection do + SolidQueue::Record.transaction do + SolidQueue::Semaphore.where(key: concurrency_key).lock.first + lock_held.set + sleep 1 + end + end + end + + lock_held.wait(5) + sleep 0.1 + + # Main thread: this UPDATE should block until the locker's transaction commits + start = monotonic_now + SolidQueue::Semaphore.where(key: concurrency_key).update_all("value = value + 1") + elapsed = monotonic_now - start + + locker.join(5) + + assert elapsed >= 0.5, "UPDATE should have been blocked by FOR UPDATE lock (took #{elapsed.round(3)}s)" + assert_equal 1, SolidQueue::Semaphore.find_by(key: concurrency_key).value + end + + test "blocked execution created during enqueue is visible to release_one after signal" do + skip_on_sqlite + + # Enqueue first job to create semaphore with value=0 + NonOverlappingUpdateResultJob.perform_later(@result) + job_a = SolidQueue::Job.last + concurrency_key = job_a.concurrency_key + assert_equal 0, SolidQueue::Semaphore.find_by(key: concurrency_key).value + + lock_held = Concurrent::Event.new + + # Background thread: simulates the enqueue path for a second job. + # Locks the semaphore row (as the code does), creates a BlockedExecution, + # then holds the transaction open to simulate the window where the race occurs. + enqueue_thread = Thread.new do + SolidQueue::Record.connection_pool.with_connection do + SolidQueue::Record.transaction do + # Lock the semaphore (same as Semaphore::Proxy#wait) + SolidQueue::Semaphore.where(key: concurrency_key).lock.first + + # Create job and blocked execution bypassing after_create callbacks + # to avoid re-entering Semaphore.wait + uuid = SecureRandom.uuid + SolidQueue::Job.insert({ + queue_name: "default", + class_name: "NonOverlappingUpdateResultJob", + concurrency_key: concurrency_key, + active_job_id: uuid, + arguments: "{}", + scheduled_at: Time.current + }) + job_b_id = SolidQueue::Job.where(active_job_id: uuid).pick(:id) + + SolidQueue::BlockedExecution.insert({ + job_id: job_b_id, + queue_name: "default", + concurrency_key: concurrency_key, + expires_at: SolidQueue.default_concurrency_control_period.from_now, + priority: 0 + }) + + lock_held.set + + # Hold the transaction open so the signal path must wait + sleep 1 + end + end + end + + lock_held.wait(5) + sleep 0.1 + + # Main thread: simulates job_a finishing — signal + release_one. + # The signal UPDATE will block until the enqueue transaction commits, + # guaranteeing the BlockedExecution is visible to release_one. + assert SolidQueue::Semaphore.signal(job_a) + assert SolidQueue::BlockedExecution.release_one(concurrency_key), + "release_one should find the BlockedExecution created by the concurrent enqueue" + + enqueue_thread.join(5) + + assert_equal 0, SolidQueue::BlockedExecution.where(concurrency_key: concurrency_key).count + end + + private + def skip_on_sqlite + skip "Row-level locking not supported on SQLite" if SolidQueue::Record.connection.adapter_name.downcase.include?("sqlite") + end + + def monotonic_now + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end +end