From ffee68bfbadd27886057fdead1f5f459dfc7905b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Wed, 21 Jan 2026 13:19:50 +0100 Subject: [PATCH] [IMP] queue_job: prevent commit during queue job execution This would release the job lock, causing spurious restarts by the dead jobs requeuer. --- queue_job/controllers/main.py | 58 +++++++++++++++++++++++++++++------ queue_job/models/queue_job.py | 4 ++- 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index adc450d52..f1232a60e 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -6,6 +6,7 @@ import random import time import traceback +from contextlib import contextmanager from io import StringIO from psycopg2 import OperationalError, errorcodes @@ -25,6 +26,29 @@ DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE = 5 +@contextmanager +def _prevent_commit(cr): + """Context manager to prevent commits on a cursor. + + Commiting while the job is not finished would release the job lock, causing + it to be started again by the dead jobs requeuer. + """ + + def forbidden_commit(*args, **kwargs): + raise RuntimeError( + "Commit is forbidden in queue jobs. " + "If the current job is a cron running as queue job, " + "modify it to run as a normal cron." + ) + + original_commit = cr.commit + cr.commit = forbidden_commit + try: + yield + finally: + cr.commit = original_commit + + class RunJobController(http.Controller): @classmethod def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: @@ -68,13 +92,16 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: def _try_perform_job(cls, env, job): """Try to perform the job, mark it done and commit if successful.""" _logger.debug("%s started", job) - job.perform() - # Triggers any stored computed fields before calling 'set_done' - # so that will be part of the 'exec_time' - env.flush_all() - job.set_done() - job.store() - env.flush_all() + # TODO refactor, the relation between env and job.env is not clear + assert env.cr is job.env.cr + with _prevent_commit(env.cr): + job.perform() + # Triggers any stored computed fields before calling 'set_done' + # so that will be part of the 'exec_time' + env.flush_all() + job.set_done() + job.store() + env.flush_all() env.cr.commit() _logger.debug("%s done", job) @@ -211,6 +238,7 @@ def create_test_job( size=1, failure_rate=0, job_duration=0, + commit_within_job=False, ): """Create test jobs @@ -266,6 +294,7 @@ def create_test_job( description=description, failure_rate=failure_rate, job_duration=job_duration, + commit_within_job=commit_within_job, ) if size > 1: @@ -277,6 +306,7 @@ def create_test_job( description=description, failure_rate=failure_rate, job_duration=job_duration, + commit_within_job=commit_within_job, ) return "" @@ -289,6 +319,7 @@ def _create_single_test_job( size=1, failure_rate=0, job_duration=0, + commit_within_job=False, ): delayed = ( http.request.env["queue.job"] @@ -298,7 +329,11 @@ def _create_single_test_job( channel=channel, description=description, ) - ._test_job(failure_rate=failure_rate, job_duration=job_duration) + ._test_job( + failure_rate=failure_rate, + job_duration=job_duration, + commit_within_job=commit_within_job, + ) ) return "job uuid: %s" % (delayed.db_record().uuid,) @@ -313,6 +348,7 @@ def _create_graph_test_jobs( description="Test job", failure_rate=0, job_duration=0, + commit_within_job=False, ): model = http.request.env["queue.job"] current_count = 0 @@ -335,7 +371,11 @@ def _create_graph_test_jobs( max_retries=max_retries, channel=channel, description="%s #%d" % (description, current_count), - )._test_job(failure_rate=failure_rate, job_duration=job_duration) + )._test_job( + failure_rate=failure_rate, + job_duration=job_duration, + commit_within_job=commit_within_job, + ) ) grouping = random.choice(possible_grouping_methods) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index d538a2a75..f8c117fa2 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -459,9 +459,11 @@ def related_action_open_record(self): ) return action - def _test_job(self, failure_rate=0, job_duration=0): + def _test_job(self, failure_rate=0, job_duration=0, commit_within_job=False): _logger.info("Running test job.") if random.random() <= failure_rate: raise JobError("Job failed") if job_duration: time.sleep(job_duration) + if commit_within_job: + self.env.cr.commit() # pylint: disable=invalid-commit