From ff54b70bad28a53d831d5f4fec0788ecd24dc619 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 6 Feb 2026 12:04:11 +0500 Subject: [PATCH 1/3] Fix _get_next_instance_num rely on autoflush --- .../tasks/process_submitted_jobs.py | 20 +++++++++---------- src/dstack/_internal/server/db.py | 1 + 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 2320394436..fea61ac579 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -481,17 +481,15 @@ async def _process_submitted_job( logger.info("%s: provisioned %s new instance(s)", fmt(job_model), len(provisioned_jobs)) provisioned_job_models = _get_job_models_for_jobs(run_model.jobs, provisioned_jobs) instance = None # Instance for attaching volumes in case of single job provisioned + # FIXME: Fleet is not locked which may lead to duplicate instance_num. + # This is currently hard to fix without locking the fleet for entire provisioning duration. + # Processing should be done in multiple steps so that + # InstanceModel is created before provisioning. + taken_instance_nums = await _get_taken_instance_nums(session, fleet_model) for provisioned_job_model, jpd in zip(provisioned_job_models, jpds): provisioned_job_model.job_provisioning_data = jpd.json() switch_job_status(session, provisioned_job_model, JobStatus.PROVISIONING) - # FIXME: Fleet is not locked which may lead to duplicate instance_num. - # This is currently hard to fix without locking the fleet for entire provisioning duration. - # Processing should be done in multiple steps so that - # InstanceModel is created before provisioning. - instance_num = await _get_next_instance_num( - session=session, - fleet_model=fleet_model, - ) + instance_num = get_next_instance_num(taken_instance_nums) instance = _create_instance_model_for_job( project=project, fleet_model=fleet_model, @@ -502,6 +500,7 @@ async def _process_submitted_job( instance_num=instance_num, profile=effective_profile, ) + taken_instance_nums.add(instance_num) provisioned_job_model.job_runtime_data = _prepare_job_runtime_data( offer, multinode ).json() @@ -906,15 +905,14 @@ async def _create_fleet_model_for_job( return fleet_model -async def _get_next_instance_num(session: AsyncSession, fleet_model: FleetModel) -> int: +async def _get_taken_instance_nums(session: AsyncSession, fleet_model: FleetModel) -> set[int]: res = await session.execute( select(InstanceModel.instance_num).where( InstanceModel.fleet_id == fleet_model.id, InstanceModel.deleted.is_(False), ) ) - taken_instance_nums = set(res.scalars().all()) - return get_next_instance_num(taken_instance_nums) + return set(res.scalars().all()) def _create_instance_model_for_job( diff --git a/src/dstack/_internal/server/db.py b/src/dstack/_internal/server/db.py index c9ed8d5280..2315c9dbd5 100644 --- a/src/dstack/_internal/server/db.py +++ b/src/dstack/_internal/server/db.py @@ -33,6 +33,7 @@ def __init__(self, url: str, engine: Optional[AsyncEngine] = None): self.session_maker = async_sessionmaker( bind=self.engine, # type: ignore[assignment] expire_on_commit=False, + autoflush=False, class_=AsyncSession, ) From a82d2015d78c1132bdbd344496392be3a66c385a Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 6 Feb 2026 12:06:23 +0500 Subject: [PATCH 2/3] Fix TestSwitchInstanceStatus rely on autoflush --- src/tests/_internal/server/services/test_instances.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tests/_internal/server/services/test_instances.py b/src/tests/_internal/server/services/test_instances.py index ca6432d61e..4883e309cc 100644 --- a/src/tests/_internal/server/services/test_instances.py +++ b/src/tests/_internal/server/services/test_instances.py @@ -40,7 +40,7 @@ async def test_includes_termination_reason_in_event_messages_only_once( instance.termination_reason_message = "Some err" instances_services.switch_instance_status(session, instance, InstanceStatus.TERMINATING) instances_services.switch_instance_status(session, instance, InstanceStatus.TERMINATED) - + await session.commit() events = await list_events(session) assert len(events) == 2 assert {e.message for e in events} == { @@ -61,7 +61,7 @@ async def test_includes_termination_reason_in_event_message_when_switching_direc instance.termination_reason = InstanceTerminationReason.ERROR instance.termination_reason_message = "Some err" instances_services.switch_instance_status(session, instance, InstanceStatus.TERMINATED) - + await session.commit() events = await list_events(session) assert len(events) == 1 assert events[0].message == ( From fc56699030dc46fda73691a3b91b58cf0fbe6092 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 6 Feb 2026 13:38:25 +0500 Subject: [PATCH 3/3] Fix long write transaction when cleaning up placement groups --- .../background/tasks/process_submitted_jobs.py | 13 +++---------- src/dstack/_internal/server/db.py | 1 + 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index fea61ac579..a021096613 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -110,7 +110,6 @@ get_fleet_placement_group_models, get_placement_group_model_for_job, placement_group_model_to_placement_group_optional, - schedule_fleet_placement_groups_deletion, ) from dstack._internal.server.services.runs import ( run_model_to_run, @@ -846,15 +845,9 @@ async def _run_jobs_on_new_instances( finally: if fleet_model is not None and len(fleet_model.instances) == 0: # Clean up placement groups that did not end up being used. - # Flush to update still uncommitted placement groups. - await session.flush() - await schedule_fleet_placement_groups_deletion( - session=session, - fleet_id=fleet_model.id, - except_placement_group_ids=( - [placement_group_model.id] if placement_group_model is not None else [] - ), - ) + for pg in placement_group_models: + if placement_group_model is None or pg.id != placement_group_model.id: + pg.fleet_deleted = True return None diff --git a/src/dstack/_internal/server/db.py b/src/dstack/_internal/server/db.py index 2315c9dbd5..5f43f52e0a 100644 --- a/src/dstack/_internal/server/db.py +++ b/src/dstack/_internal/server/db.py @@ -33,6 +33,7 @@ def __init__(self, url: str, engine: Optional[AsyncEngine] = None): self.session_maker = async_sessionmaker( bind=self.engine, # type: ignore[assignment] expire_on_commit=False, + # Disable autoflush to avoid accidental long write transactions on SQLite. autoflush=False, class_=AsyncSession, )