From 5cb1f8581deb33a36ff256a1afd8d19d87a3f3fe Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 13 Jan 2026 14:48:47 +0500 Subject: [PATCH 01/20] Add DSTACK_GATEWAY_PROCESSING_DISABLED --- src/dstack/_internal/server/background/__init__.py | 13 +++++++++++-- .../server/background/tasks/process_gateways.py | 1 + src/dstack/_internal/server/settings.py | 1 + 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index 85af7d3315..1a2e05fd7c 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ b/src/dstack/_internal/server/background/__init__.py @@ -33,6 +33,10 @@ process_terminating_jobs, ) from dstack._internal.server.background.tasks.process_volumes import process_submitted_volumes +from dstack._internal.utils.logging import get_logger + +logger = get_logger(__name__) + _scheduler = AsyncIOScheduler() @@ -76,8 +80,13 @@ def start_background_tasks() -> AsyncIOScheduler: collect_prometheus_metrics, IntervalTrigger(seconds=10), max_instances=1 ) _scheduler.add_job(delete_prometheus_metrics, IntervalTrigger(minutes=5), max_instances=1) - _scheduler.add_job(process_gateways_connections, IntervalTrigger(seconds=15)) - _scheduler.add_job(process_gateways, IntervalTrigger(seconds=10, jitter=2), max_instances=5) + if settings.GATEWAY_PROCESSING_DISABLED: + logger.info("Gateway processing disabled") + else: + _scheduler.add_job(process_gateways_connections, IntervalTrigger(seconds=15)) + _scheduler.add_job( + process_gateways, IntervalTrigger(seconds=10, jitter=2), max_instances=5 + ) _scheduler.add_job( process_submitted_volumes, IntervalTrigger(seconds=10, jitter=2), max_instances=5 ) diff --git a/src/dstack/_internal/server/background/tasks/process_gateways.py b/src/dstack/_internal/server/background/tasks/process_gateways.py index a54cb9e319..3db5a28248 100644 --- a/src/dstack/_internal/server/background/tasks/process_gateways.py +++ b/src/dstack/_internal/server/background/tasks/process_gateways.py @@ -24,6 +24,7 @@ logger = get_logger(__name__) +@sentry_utils.instrument_background_task async def process_gateways_connections(): await _remove_inactive_connections() await _process_active_connections() diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index 74d1d7b8d5..0fb94d1f6e 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -132,6 +132,7 @@ DO_NOT_UPDATE_DEFAULT_PROJECT = os.getenv("DSTACK_DO_NOT_UPDATE_DEFAULT_PROJECT") is not None SKIP_GATEWAY_UPDATE = os.getenv("DSTACK_SKIP_GATEWAY_UPDATE") is not None ENABLE_PROMETHEUS_METRICS = os.getenv("DSTACK_ENABLE_PROMETHEUS_METRICS") is not None +GATEWAY_PROCESSING_DISABLED = os.getenv("DSTACK_GATEWAY_PROCESSING_DISABLED") is not None class JobNetworkMode(Enum): From 785b3cfbfde142012559ff76dbcc310a57b60e0c Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 13 Jan 2026 17:44:03 +0500 Subject: [PATCH 02/20] Add DSTACK_LIST_NO_ACTIVE_FLEETS_DISABLED --- src/dstack/_internal/server/services/fleets.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index e347829fa4..e1c2522160 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -1,3 +1,4 @@ +import os import uuid from collections.abc import Callable from datetime import datetime @@ -113,6 +114,10 @@ async def list_projects_with_no_active_fleets( Applies to all users (both regular users and admins require membership). """ + # Testing https://github.com/sqlalchemy/sqlalchemy/discussions/12536 + if os.getenv("DSTACK_LIST_NO_ACTIVE_FLEETS_DISABLED") is not None: + return [] + active_fleet_alias = aliased(FleetModel) member_alias = aliased(MemberModel) From de6760184dd276d13eb20f2de40d000565c2e85a Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 14 Jan 2026 10:30:15 +0500 Subject: [PATCH 03/20] Allow disabling every background task --- .../_internal/server/background/__init__.py | 133 ++++++++++-------- src/dstack/_internal/server/settings.py | 1 - 2 files changed, 77 insertions(+), 57 deletions(-) diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index 1a2e05fd7c..88b1dca8d1 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ b/src/dstack/_internal/server/background/__init__.py @@ -1,3 +1,5 @@ +import os + from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger @@ -71,74 +73,93 @@ def start_background_tasks() -> AsyncIOScheduler: # that the first waiting for the lock will acquire it. # The jitter is needed to give all tasks a chance to acquire locks. - _scheduler.add_job(process_probes, IntervalTrigger(seconds=3, jitter=1)) - _scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1) - _scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1) - _scheduler.add_job(delete_events, IntervalTrigger(minutes=7), max_instances=1) + if os.getenv("DSTACK_PROBES_PROCESSING_DISABLED") is None: + _scheduler.add_job(process_probes, IntervalTrigger(seconds=3, jitter=1)) + if os.getenv("DSTACK_COLLECT_METRICS_PROCESSING_DISABLED") is None: + _scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1) + if os.getenv("DSTACK_DELETE_METRICS_PROCESSING_DISABLED") is None: + _scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1) + if os.getenv("DSTACK_DELETE_EVENTS_PROCESSING_DISABLED") is None: + _scheduler.add_job(delete_events, IntervalTrigger(minutes=7), max_instances=1) if settings.ENABLE_PROMETHEUS_METRICS: - _scheduler.add_job( - collect_prometheus_metrics, IntervalTrigger(seconds=10), max_instances=1 - ) - _scheduler.add_job(delete_prometheus_metrics, IntervalTrigger(minutes=5), max_instances=1) - if settings.GATEWAY_PROCESSING_DISABLED: - logger.info("Gateway processing disabled") - else: + if os.getenv("DSTACK_COLLECT_PROMETHEUS_METRICS_PROCESSING_DISABLED") is None: + _scheduler.add_job( + collect_prometheus_metrics, IntervalTrigger(seconds=10), max_instances=1 + ) + if os.getenv("DSTACK_DELETE_PROMETHEUS_METRICS_PROCESSING_DISABLED") is None: + _scheduler.add_job( + delete_prometheus_metrics, IntervalTrigger(minutes=5), max_instances=1 + ) + if os.getenv("DSTACK_GATEWAY_PROCESSING_DISABLED") is None: _scheduler.add_job(process_gateways_connections, IntervalTrigger(seconds=15)) _scheduler.add_job( process_gateways, IntervalTrigger(seconds=10, jitter=2), max_instances=5 ) - _scheduler.add_job( - process_submitted_volumes, IntervalTrigger(seconds=10, jitter=2), max_instances=5 - ) - _scheduler.add_job( - process_idle_volumes, IntervalTrigger(seconds=60, jitter=10), max_instances=1 - ) - _scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30, jitter=5)) - _scheduler.add_job( - process_fleets, - IntervalTrigger(seconds=10, jitter=2), - max_instances=1, - ) - _scheduler.add_job(delete_instance_health_checks, IntervalTrigger(minutes=5), max_instances=1) - for replica in range(settings.SERVER_BACKGROUND_PROCESSING_FACTOR): - # Add multiple copies of tasks if requested. - # max_instances=1 for additional copies to avoid running too many tasks. - # Move other tasks here when they need per-replica scaling. + if os.getenv("DSTACK_SUBMITTED_VOLUMES_PROCESSING_DISABLED") is None: _scheduler.add_job( - process_submitted_jobs, - IntervalTrigger(seconds=4, jitter=2), - kwargs={"batch_size": 5}, - max_instances=4 if replica == 0 else 1, + process_submitted_volumes, IntervalTrigger(seconds=10, jitter=2), max_instances=5 ) + if os.getenv("DSTACK_IDLE_VOLUMES_PROCESSING_DISABLED") is None: _scheduler.add_job( - process_running_jobs, - IntervalTrigger(seconds=4, jitter=2), - kwargs={"batch_size": 5}, - max_instances=2 if replica == 0 else 1, + process_idle_volumes, IntervalTrigger(seconds=60, jitter=10), max_instances=1 ) + if os.getenv("DSTACK_PLACEMENT_GROUPS_PROCESSING_DISABLED") is None: + _scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30, jitter=5)) + if os.getenv("DSTACK_FLEETS_PROCESSING_DISABLED") is None: _scheduler.add_job( - process_terminating_jobs, - IntervalTrigger(seconds=4, jitter=2), - kwargs={"batch_size": 5}, - max_instances=2 if replica == 0 else 1, + process_fleets, + IntervalTrigger(seconds=10, jitter=2), + max_instances=1, ) + if os.getenv("DSTACK_DELETE_INSTANCE_HEALTH_CHECKS_PROCESSING_DISABLED") is None: _scheduler.add_job( - process_runs, - IntervalTrigger(seconds=2, jitter=1), - kwargs={"batch_size": 5}, - max_instances=2 if replica == 0 else 1, - ) - _scheduler.add_job( - process_instances, - IntervalTrigger(seconds=4, jitter=2), - kwargs={"batch_size": 5}, - max_instances=2 if replica == 0 else 1, - ) - _scheduler.add_job( - process_compute_groups, - IntervalTrigger(seconds=15, jitter=2), - kwargs={"batch_size": 1}, - max_instances=2 if replica == 0 else 1, + delete_instance_health_checks, IntervalTrigger(minutes=5), max_instances=1 ) + for replica in range(settings.SERVER_BACKGROUND_PROCESSING_FACTOR): + # Add multiple copies of tasks if requested. + # max_instances=1 for additional copies to avoid running too many tasks. + # Move other tasks here when they need per-replica scaling. + if os.getenv("DSTACK_SUBMITTED_JOBS_PROCESSING_DISABLED") is None: + _scheduler.add_job( + process_submitted_jobs, + IntervalTrigger(seconds=4, jitter=2), + kwargs={"batch_size": 5}, + max_instances=4 if replica == 0 else 1, + ) + if os.getenv("DSTACK_RUNNING_JOBS_PROCESSING_DISABLED") is None: + _scheduler.add_job( + process_running_jobs, + IntervalTrigger(seconds=4, jitter=2), + kwargs={"batch_size": 5}, + max_instances=2 if replica == 0 else 1, + ) + if os.getenv("DSTACK_TERMINATING_JOBS_PROCESSING_DISABLED") is None: + _scheduler.add_job( + process_terminating_jobs, + IntervalTrigger(seconds=4, jitter=2), + kwargs={"batch_size": 5}, + max_instances=2 if replica == 0 else 1, + ) + if os.getenv("DSTACK_RUNS_PROCESSING_DISABLED") is None: + _scheduler.add_job( + process_runs, + IntervalTrigger(seconds=2, jitter=1), + kwargs={"batch_size": 5}, + max_instances=2 if replica == 0 else 1, + ) + if os.getenv("DSTACK_INSTANCES_PROCESSING_DISABLED") is None: + _scheduler.add_job( + process_instances, + IntervalTrigger(seconds=4, jitter=2), + kwargs={"batch_size": 5}, + max_instances=2 if replica == 0 else 1, + ) + if os.getenv("DSTACK_COMPUTE_GROUPS_PROCESSING_DISABLED") is None: + _scheduler.add_job( + process_compute_groups, + IntervalTrigger(seconds=15, jitter=2), + kwargs={"batch_size": 1}, + max_instances=2 if replica == 0 else 1, + ) _scheduler.start() return _scheduler diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index 0fb94d1f6e..74d1d7b8d5 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -132,7 +132,6 @@ DO_NOT_UPDATE_DEFAULT_PROJECT = os.getenv("DSTACK_DO_NOT_UPDATE_DEFAULT_PROJECT") is not None SKIP_GATEWAY_UPDATE = os.getenv("DSTACK_SKIP_GATEWAY_UPDATE") is not None ENABLE_PROMETHEUS_METRICS = os.getenv("DSTACK_ENABLE_PROMETHEUS_METRICS") is not None -GATEWAY_PROCESSING_DISABLED = os.getenv("DSTACK_GATEWAY_PROCESSING_DISABLED") is not None class JobNetworkMode(Enum): From 8e747c0786713f5412b68cc009cfcac799362315 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 14 Jan 2026 16:14:55 +0500 Subject: [PATCH 04/20] Optimize process_running_jobs select --- .../background/tasks/process_running_jobs.py | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index 341b47a38b..353ef92573 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -5,9 +5,9 @@ from datetime import timedelta from typing import Dict, List, Optional -from sqlalchemy import select +from sqlalchemy import and_, func, select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import joinedload, load_only +from sqlalchemy.orm import aliased, contains_eager, joinedload, load_only from dstack._internal import settings from dstack._internal.core.consts import DSTACK_RUNNER_HTTP_PORT, DSTACK_SHIM_HTTP_PORT @@ -148,14 +148,37 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): .execution_options(populate_existing=True) ) job_model = res.unique().scalar_one() + # Select only latest submissions for every job. + latest_submissions_sq = ( + select( + JobModel.run_id.label("run_id"), + JobModel.replica_num.label("replica_num"), + JobModel.job_num.label("job_num"), + func.max(JobModel.submission_num).label("max_submission_num"), + ) + .where(JobModel.run_id == job_model.run_id) + .group_by(JobModel.run_id, JobModel.replica_num, JobModel.job_num) + .subquery() + ) + job_alias = aliased(JobModel) res = await session.execute( select(RunModel) .where(RunModel.id == job_model.run_id) + .join(job_alias, job_alias.run_id == RunModel.id) + .join( + latest_submissions_sq, + onclause=and_( + job_alias.run_id == latest_submissions_sq.c.run_id, + job_alias.replica_num == latest_submissions_sq.c.replica_num, + job_alias.job_num == latest_submissions_sq.c.job_num, + job_alias.submission_num == latest_submissions_sq.c.max_submission_num, + ), + ) .options(joinedload(RunModel.project)) .options(joinedload(RunModel.user)) .options(joinedload(RunModel.repo)) .options(joinedload(RunModel.fleet).load_only(FleetModel.id, FleetModel.name)) - .options(joinedload(RunModel.jobs)) + .options(contains_eager(RunModel.jobs, alias=job_alias)) ) run_model = res.unique().scalar_one() repo_model = run_model.repo From e94fa835ed5488e4ff2dcdb9d165d4e99d8e4087 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 15 Jan 2026 11:16:21 +0500 Subject: [PATCH 05/20] Optimize process_runs select --- .../background/tasks/process_running_jobs.py | 91 ++++++++++--------- .../server/background/tasks/process_runs.py | 61 +++++++++---- 2 files changed, 92 insertions(+), 60 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index 353ef92573..f5ca6c61ae 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -139,48 +139,8 @@ async def _process_next_running_job(): async def _process_running_job(session: AsyncSession, job_model: JobModel): - # Refetch to load related attributes. - res = await session.execute( - select(JobModel) - .where(JobModel.id == job_model.id) - .options(joinedload(JobModel.instance).joinedload(InstanceModel.project)) - .options(joinedload(JobModel.probes).load_only(ProbeModel.success_streak)) - .execution_options(populate_existing=True) - ) - job_model = res.unique().scalar_one() - # Select only latest submissions for every job. - latest_submissions_sq = ( - select( - JobModel.run_id.label("run_id"), - JobModel.replica_num.label("replica_num"), - JobModel.job_num.label("job_num"), - func.max(JobModel.submission_num).label("max_submission_num"), - ) - .where(JobModel.run_id == job_model.run_id) - .group_by(JobModel.run_id, JobModel.replica_num, JobModel.job_num) - .subquery() - ) - job_alias = aliased(JobModel) - res = await session.execute( - select(RunModel) - .where(RunModel.id == job_model.run_id) - .join(job_alias, job_alias.run_id == RunModel.id) - .join( - latest_submissions_sq, - onclause=and_( - job_alias.run_id == latest_submissions_sq.c.run_id, - job_alias.replica_num == latest_submissions_sq.c.replica_num, - job_alias.job_num == latest_submissions_sq.c.job_num, - job_alias.submission_num == latest_submissions_sq.c.max_submission_num, - ), - ) - .options(joinedload(RunModel.project)) - .options(joinedload(RunModel.user)) - .options(joinedload(RunModel.repo)) - .options(joinedload(RunModel.fleet).load_only(FleetModel.id, FleetModel.name)) - .options(contains_eager(RunModel.jobs, alias=job_alias)) - ) - run_model = res.unique().scalar_one() + job_model = await _refetch_job_model(session, job_model) + run_model = await _fetch_run_model(session, job_model.run_id) repo_model = run_model.repo project = run_model.project run = run_model_to_run(run_model, include_sensitive=True) @@ -444,6 +404,53 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): await session.commit() +async def _refetch_job_model(session: AsyncSession, job_model: JobModel) -> JobModel: + res = await session.execute( + select(JobModel) + .where(JobModel.id == job_model.id) + .options(joinedload(JobModel.instance).joinedload(InstanceModel.project)) + .options(joinedload(JobModel.probes).load_only(ProbeModel.success_streak)) + .execution_options(populate_existing=True) + ) + return res.unique().scalar_one() + + +async def _fetch_run_model(session: AsyncSession, run_id: uuid.UUID) -> RunModel: + # Select only latest submissions for every job. + latest_submissions_sq = ( + select( + JobModel.run_id.label("run_id"), + JobModel.replica_num.label("replica_num"), + JobModel.job_num.label("job_num"), + func.max(JobModel.submission_num).label("max_submission_num"), + ) + .where(JobModel.run_id == run_id) + .group_by(JobModel.run_id, JobModel.replica_num, JobModel.job_num) + .subquery() + ) + job_alias = aliased(JobModel) + res = await session.execute( + select(RunModel) + .where(RunModel.id == run_id) + .join(job_alias, job_alias.run_id == RunModel.id) + .join( + latest_submissions_sq, + onclause=and_( + job_alias.run_id == latest_submissions_sq.c.run_id, + job_alias.replica_num == latest_submissions_sq.c.replica_num, + job_alias.job_num == latest_submissions_sq.c.job_num, + job_alias.submission_num == latest_submissions_sq.c.max_submission_num, + ), + ) + .options(joinedload(RunModel.project)) + .options(joinedload(RunModel.user)) + .options(joinedload(RunModel.repo)) + .options(joinedload(RunModel.fleet).load_only(FleetModel.id, FleetModel.name)) + .options(contains_eager(RunModel.jobs, alias=job_alias)) + ) + return res.unique().scalar_one() + + async def _wait_for_instance_provisioning_data(session: AsyncSession, job_model: JobModel): """ This function will be called until instance IP address appears diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index af2dcee8d8..56648ceb2a 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -2,9 +2,9 @@ import datetime from typing import List, Optional, Set, Tuple -from sqlalchemy import and_, or_, select +from sqlalchemy import and_, func, or_, select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import joinedload, load_only, selectinload +from sqlalchemy.orm import aliased, contains_eager, joinedload, load_only import dstack._internal.server.services.services.autoscalers as autoscalers from dstack._internal.core.errors import ServerError @@ -144,22 +144,7 @@ async def _process_next_run(): async def _process_run(session: AsyncSession, run_model: RunModel): - # Refetch to load related attributes. - res = await session.execute( - select(RunModel) - .where(RunModel.id == run_model.id) - .execution_options(populate_existing=True) - .options(joinedload(RunModel.project).load_only(ProjectModel.id, ProjectModel.name)) - .options(joinedload(RunModel.user).load_only(UserModel.name)) - .options(joinedload(RunModel.fleet).load_only(FleetModel.id, FleetModel.name)) - .options( - selectinload(RunModel.jobs) - .joinedload(JobModel.instance) - .load_only(InstanceModel.fleet_id) - ) - .execution_options(populate_existing=True) - ) - run_model = res.unique().scalar_one() + run_model = await _refetch_run_model(session, run_model) logger.debug("%s: processing run", fmt(run_model)) try: if run_model.status == RunStatus.PENDING: @@ -181,6 +166,46 @@ async def _process_run(session: AsyncSession, run_model: RunModel): await session.commit() +async def _refetch_run_model(session: AsyncSession, run_model: RunModel) -> RunModel: + # Select only latest submissions for every job. + latest_submissions_sq = ( + select( + JobModel.run_id.label("run_id"), + JobModel.replica_num.label("replica_num"), + JobModel.job_num.label("job_num"), + func.max(JobModel.submission_num).label("max_submission_num"), + ) + .where(JobModel.run_id == run_model.id) + .group_by(JobModel.run_id, JobModel.replica_num, JobModel.job_num) + .subquery() + ) + job_alias = aliased(JobModel) + res = await session.execute( + select(RunModel) + .where(RunModel.id == run_model.id) + .outerjoin(latest_submissions_sq, latest_submissions_sq.c.run_id == RunModel.id) + .outerjoin( + job_alias, + onclause=and_( + job_alias.run_id == latest_submissions_sq.c.run_id, + job_alias.replica_num == latest_submissions_sq.c.replica_num, + job_alias.job_num == latest_submissions_sq.c.job_num, + job_alias.submission_num == latest_submissions_sq.c.max_submission_num, + ), + ) + .options(joinedload(RunModel.project).load_only(ProjectModel.id, ProjectModel.name)) + .options(joinedload(RunModel.user).load_only(UserModel.name)) + .options(joinedload(RunModel.fleet).load_only(FleetModel.id, FleetModel.name)) + .options( + contains_eager(RunModel.jobs, alias=job_alias) + .joinedload(JobModel.instance) + .load_only(InstanceModel.fleet_id) + ) + .execution_options(populate_existing=True) + ) + return res.unique().scalar_one() + + async def _process_pending_run(session: AsyncSession, run_model: RunModel): """Jobs are not created yet""" run = run_model_to_run(run_model) From 97735749aa7db38236bc7a38f2406d92c4eb5ff7 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 15 Jan 2026 12:15:48 +0500 Subject: [PATCH 06/20] Add test_calculates_retry_duration_since_last_successful_submission --- .../background/tasks/test_process_runs.py | 44 +++++++++++++++++-- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/src/tests/_internal/server/background/tasks/test_process_runs.py b/src/tests/_internal/server/background/tasks/test_process_runs.py index 81c1ef0026..b34b5e5cb7 100644 --- a/src/tests/_internal/server/background/tasks/test_process_runs.py +++ b/src/tests/_internal/server/background/tasks/test_process_runs.py @@ -1,6 +1,6 @@ import datetime from collections.abc import Iterable -from typing import Union, cast +from typing import Optional, Union, cast from unittest.mock import patch import pytest @@ -15,7 +15,7 @@ TaskConfiguration, ) from dstack._internal.core.models.instances import InstanceStatus -from dstack._internal.core.models.profiles import Profile, ProfileRetry, Schedule +from dstack._internal.core.models.profiles import Profile, ProfileRetry, RetryEvent, Schedule from dstack._internal.core.models.resources import Range from dstack._internal.core.models.runs import ( JobSpec, @@ -48,6 +48,7 @@ async def make_run( deployment_num: int = 0, image: str = "ubuntu:latest", probes: Iterable[ProbeConfig] = (), + retry: Optional[ProfileRetry] = None, ) -> RunModel: project = await create_project(session=session) user = await create_user(session=session) @@ -58,7 +59,7 @@ async def make_run( run_name = "test-run" profile = Profile( name="test-profile", - retry=True, + retry=retry or True, ) run_spec = get_run_spec( repo_id=repo.name, @@ -230,6 +231,43 @@ async def test_retry_running_to_failed(self, test_db, session: AsyncSession): assert run.status == RunStatus.TERMINATING assert run.termination_reason == RunTerminationReason.JOB_FAILED + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_calculates_retry_duration_since_last_successful_submission( + self, test_db, session: AsyncSession + ): + run = await make_run( + session, + status=RunStatus.RUNNING, + replicas=1, + retry=ProfileRetry(duration=300, on_events=[RetryEvent.NO_CAPACITY]), + ) + now = run.submitted_at + datetime.timedelta(minutes=10) + # Retry logic should look at this job and calculate retry duration since its last_processed_at. + await create_job( + session=session, + run=run, + status=JobStatus.FAILED, + termination_reason=JobTerminationReason.EXECUTOR_ERROR, + last_processed_at=now - datetime.timedelta(minutes=4), + replica_num=0, + ) + await create_job( + session=session, + run=run, + status=JobStatus.FAILED, + termination_reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY, + replica_num=0, + submission_num=1, + last_processed_at=now - datetime.timedelta(minutes=2), + job_provisioning_data=None, + ) + with patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock: + datetime_mock.return_value = now + await process_runs.process_runs() + await session.refresh(run) + assert run.status == RunStatus.PENDING + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_pending_to_submitted(self, test_db, session: AsyncSession): From 3676f002288b5dc3373a716385dd65f7017abc96 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 15 Jan 2026 12:34:54 +0500 Subject: [PATCH 07/20] Fix _should_retry_job --- .../server/background/tasks/process_runs.py | 47 ++++++++++++++----- .../background/tasks/test_process_runs.py | 1 + 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 56648ceb2a..b4397b95e0 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -33,6 +33,7 @@ get_job_specs_from_run_spec, group_jobs_by_replica_latest, is_master_job, + job_model_to_job_submission, switch_job_status, ) from dstack._internal.server.services.locking import get_locker @@ -319,7 +320,7 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel): and job_model.termination_reason not in {JobTerminationReason.DONE_BY_RUNNER, JobTerminationReason.SCALED_DOWN} ): - current_duration = _should_retry_job(run, job, job_model) + current_duration = await _should_retry_job(session, run, job, job_model) if current_duration is None: replica_statuses.add(RunStatus.FAILED) run_termination_reasons.add(RunTerminationReason.JOB_FAILED) @@ -577,19 +578,44 @@ def _has_out_of_date_replicas(run: RunModel) -> bool: return False -def _should_retry_job(run: Run, job: Job, job_model: JobModel) -> Optional[datetime.timedelta]: +async def _should_retry_job( + session: AsyncSession, + run: Run, + job: Job, + job_model: JobModel, +) -> Optional[datetime.timedelta]: """ Checks if the job should be retried. Returns the current duration of retrying if retry is enabled. + Retrying duration is calculated as the time since `last_processed_at` + of the latest provisioned submission. """ if job.job_spec.retry is None: return None last_provisioned_submission = None - for job_submission in reversed(job.job_submissions): - if job_submission.job_provisioning_data is not None: - last_provisioned_submission = job_submission - break + if len(job.job_submissions) > 0: + last_submission = job.job_submissions[-1] + if last_submission.job_provisioning_data is not None: + last_provisioned_submission = last_submission + else: + # The caller passes at most one latest submission in job.job_submissions, so check the db. + res = await session.execute( + select(JobModel) + .where( + JobModel.run_id == job_model.run_id, + JobModel.replica_num == job_model.replica_num, + JobModel.job_num == job_model.job_num, + JobModel.job_provisioning_data.is_not(None), + ) + .order_by(JobModel.last_processed_at.desc()) + .limit(1) + ) + last_provisioned_submission_model = res.scalar() + if last_provisioned_submission_model is not None: + last_provisioned_submission = job_model_to_job_submission( + last_provisioned_submission_model + ) if ( job_model.termination_reason is not None @@ -599,13 +625,10 @@ def _should_retry_job(run: Run, job: Job, job_model: JobModel) -> Optional[datet ): return common.get_current_datetime() - run.submitted_at - if last_provisioned_submission is None: - return None - if ( - last_provisioned_submission.termination_reason is not None - and JobTerminationReason(last_provisioned_submission.termination_reason).to_retry_event() - in job.job_spec.retry.on_events + job_model.termination_reason is not None + and job_model.termination_reason.to_retry_event() in job.job_spec.retry.on_events + and last_provisioned_submission is not None ): return common.get_current_datetime() - last_provisioned_submission.last_processed_at diff --git a/src/tests/_internal/server/background/tasks/test_process_runs.py b/src/tests/_internal/server/background/tasks/test_process_runs.py index b34b5e5cb7..46aaa9b48e 100644 --- a/src/tests/_internal/server/background/tasks/test_process_runs.py +++ b/src/tests/_internal/server/background/tasks/test_process_runs.py @@ -251,6 +251,7 @@ async def test_calculates_retry_duration_since_last_successful_submission( termination_reason=JobTerminationReason.EXECUTOR_ERROR, last_processed_at=now - datetime.timedelta(minutes=4), replica_num=0, + job_provisioning_data=get_job_provisioning_data(), ) await create_job( session=session, From d83d29d32a0a578b902683559669ffa2089634ca Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 15 Jan 2026 15:51:35 +0500 Subject: [PATCH 08/20] Optimize fleet instances db queries --- .../_internal/server/background/tasks/process_fleets.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_fleets.py b/src/dstack/_internal/server/background/tasks/process_fleets.py index 733029abf8..4727fc4500 100644 --- a/src/dstack/_internal/server/background/tasks/process_fleets.py +++ b/src/dstack/_internal/server/background/tasks/process_fleets.py @@ -5,7 +5,7 @@ from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import joinedload, load_only, selectinload +from sqlalchemy.orm import joinedload, load_only, selectinload, with_loader_criteria from dstack._internal.core.models.fleets import FleetSpec, FleetStatus from dstack._internal.core.models.instances import InstanceStatus, InstanceTerminationReason @@ -60,6 +60,7 @@ async def process_fleets(): .options( load_only(FleetModel.id, FleetModel.name), selectinload(FleetModel.instances).load_only(InstanceModel.id), + with_loader_criteria(InstanceModel, InstanceModel.deleted == False), ) .order_by(FleetModel.last_processed_at.asc()) .limit(BATCH_SIZE) @@ -72,6 +73,7 @@ async def process_fleets(): .where( InstanceModel.id.not_in(instance_lockset), InstanceModel.fleet_id.in_(fleet_ids), + InstanceModel.deleted == False, ) .options(load_only(InstanceModel.id, InstanceModel.fleet_id)) .order_by(InstanceModel.id) @@ -113,8 +115,9 @@ async def _process_fleets(session: AsyncSession, fleet_models: List[FleetModel]) .where(FleetModel.id.in_(fleet_ids)) .options( joinedload(FleetModel.instances).joinedload(InstanceModel.jobs).load_only(JobModel.id), - joinedload(FleetModel.project), + with_loader_criteria(InstanceModel, InstanceModel.deleted == False), ) + .options(joinedload(FleetModel.project)) .options(joinedload(FleetModel.runs).load_only(RunModel.status)) .execution_options(populate_existing=True) ) From 64b2b8bde4e5ffb899f1917259c09f45af9bc681 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 15 Jan 2026 16:25:26 +0500 Subject: [PATCH 09/20] Use with_loader_criteria in process_submitted_jobs --- .../tasks/process_submitted_jobs.py | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index d1d86c41aa..e132f83a49 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -7,7 +7,14 @@ from sqlalchemy import func, or_, select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import contains_eager, joinedload, load_only, noload, selectinload +from sqlalchemy.orm import ( + contains_eager, + joinedload, + load_only, + noload, + selectinload, + with_loader_criteria, +) from dstack._internal.core.backends.base.backend import Backend from dstack._internal.core.backends.base.compute import ( @@ -213,7 +220,12 @@ async def _process_submitted_job( select(JobModel) .where(JobModel.id == job_model.id) .options(joinedload(JobModel.instance)) - .options(joinedload(JobModel.fleet).joinedload(FleetModel.instances)) + .options( + joinedload(JobModel.fleet).joinedload(FleetModel.instances), + with_loader_criteria( + InstanceModel, InstanceModel.deleted == False, include_aliases=True + ), + ) ) job_model = res.unique().scalar_one() res = await session.execute( @@ -221,7 +233,12 @@ async def _process_submitted_job( .where(RunModel.id == job_model.run_id) .options(joinedload(RunModel.project).joinedload(ProjectModel.backends)) .options(joinedload(RunModel.user).load_only(UserModel.name)) - .options(joinedload(RunModel.fleet).joinedload(FleetModel.instances)) + .options( + joinedload(RunModel.fleet).joinedload(FleetModel.instances), + with_loader_criteria( + InstanceModel, InstanceModel.deleted == False, include_aliases=True + ), + ) ) run_model = res.unique().scalar_one() logger.debug("%s: provisioning has started", fmt(job_model)) From e19bf07103049afad3c4646e034ec509f4aef43d Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 15 Jan 2026 16:25:46 +0500 Subject: [PATCH 10/20] Use with_loader_criteria in process_instances --- .../server/background/tasks/process_fleets.py | 8 ++++++-- .../server/background/tasks/process_instances.py | 16 +++++++++++++--- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_fleets.py b/src/dstack/_internal/server/background/tasks/process_fleets.py index 4727fc4500..d369c7d242 100644 --- a/src/dstack/_internal/server/background/tasks/process_fleets.py +++ b/src/dstack/_internal/server/background/tasks/process_fleets.py @@ -60,7 +60,9 @@ async def process_fleets(): .options( load_only(FleetModel.id, FleetModel.name), selectinload(FleetModel.instances).load_only(InstanceModel.id), - with_loader_criteria(InstanceModel, InstanceModel.deleted == False), + with_loader_criteria( + InstanceModel, InstanceModel.deleted == False, include_aliases=True + ), ) .order_by(FleetModel.last_processed_at.asc()) .limit(BATCH_SIZE) @@ -115,7 +117,9 @@ async def _process_fleets(session: AsyncSession, fleet_models: List[FleetModel]) .where(FleetModel.id.in_(fleet_ids)) .options( joinedload(FleetModel.instances).joinedload(InstanceModel.jobs).load_only(JobModel.id), - with_loader_criteria(InstanceModel, InstanceModel.deleted == False), + with_loader_criteria( + InstanceModel, InstanceModel.deleted == False, include_aliases=True + ), ) .options(joinedload(FleetModel.project)) .options(joinedload(FleetModel.runs).load_only(RunModel.status)) diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index 2241c4c6a4..2c14e4c4af 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -11,7 +11,7 @@ from pydantic import ValidationError from sqlalchemy import and_, delete, func, not_, select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import joinedload +from sqlalchemy.orm import joinedload, with_loader_criteria from dstack._internal import settings from dstack._internal.core.backends.base.compute import ( @@ -218,7 +218,12 @@ async def _process_instance(session: AsyncSession, instance: InstanceModel): .where(InstanceModel.id == instance.id) .options(joinedload(InstanceModel.project).joinedload(ProjectModel.backends)) .options(joinedload(InstanceModel.jobs).load_only(JobModel.id, JobModel.status)) - .options(joinedload(InstanceModel.fleet).joinedload(FleetModel.instances)) + .options( + joinedload(InstanceModel.fleet).joinedload(FleetModel.instances), + with_loader_criteria( + InstanceModel, InstanceModel.deleted == False, include_aliases=True + ), + ) .execution_options(populate_existing=True) ) instance = res.unique().scalar_one() @@ -228,7 +233,12 @@ async def _process_instance(session: AsyncSession, instance: InstanceModel): .where(InstanceModel.id == instance.id) .options(joinedload(InstanceModel.project)) .options(joinedload(InstanceModel.jobs).load_only(JobModel.id, JobModel.status)) - .options(joinedload(InstanceModel.fleet).joinedload(FleetModel.instances)) + .options( + joinedload(InstanceModel.fleet).joinedload(FleetModel.instances), + with_loader_criteria( + InstanceModel, InstanceModel.deleted == False, include_aliases=True + ), + ) .execution_options(populate_existing=True) ) instance = res.unique().scalar_one() From 12ae814602b53ea0dd4f7db376227452a389a201 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 16 Jan 2026 10:34:21 +0500 Subject: [PATCH 11/20] Fix master instance selection --- .../background/tasks/process_instances.py | 45 ++++++++++++++----- .../_internal/server/services/fleets.py | 4 -- .../_internal/server/services/placement.py | 3 +- 3 files changed, 35 insertions(+), 17 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index 2c14e4c4af..4f829eabe7 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -79,7 +79,6 @@ fleet_model_to_fleet, get_create_instance_offers, is_cloud_cluster, - is_fleet_master_instance, ) from dstack._internal.server.services.instances import ( get_instance_configuration, @@ -553,7 +552,8 @@ def _deploy_instance( async def _create_instance(session: AsyncSession, instance: InstanceModel) -> None: - if _need_to_wait_fleet_provisioning(instance): + master_instance = await _get_fleet_master_instance(session, instance) + if _need_to_wait_fleet_provisioning(instance, master_instance): logger.debug("Waiting for the first instance in the fleet to be provisioned") return @@ -586,6 +586,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No placement_group_model = get_placement_group_model_for_instance( placement_group_models=placement_group_models, instance_model=instance, + master_instance_model=master_instance, ) offers = await get_create_instance_offers( project=instance.project, @@ -604,11 +605,15 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No continue compute = backend.compute() assert isinstance(compute, ComputeWithCreateInstanceSupport) - instance_offer = _get_instance_offer_for_instance(instance_offer, instance) + instance_offer = _get_instance_offer_for_instance( + instance_offer=instance_offer, + instance=instance, + master_instance=master_instance, + ) if ( instance.fleet and is_cloud_cluster(instance.fleet) - and is_fleet_master_instance(instance) + and instance.id == master_instance.id and instance_offer.backend in BACKENDS_WITH_PLACEMENT_GROUPS_SUPPORT and isinstance(compute, ComputeWithPlacementGroupSupport) and ( @@ -677,7 +682,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No "instance_status": InstanceStatus.PROVISIONING.value, }, ) - if instance.fleet_id and is_fleet_master_instance(instance): + if instance.fleet_id and instance.id == master_instance.id: # Clean up placement groups that did not end up being used. # Flush to update still uncommitted placement groups. await session.flush() @@ -695,7 +700,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No InstanceTerminationReason.NO_OFFERS, "All offers failed" if offers else "No offers found", ) - if instance.fleet and is_fleet_master_instance(instance) and is_cloud_cluster(instance.fleet): + if instance.fleet and instance.id == master_instance.id and is_cloud_cluster(instance.fleet): # Do not attempt to deploy other instances, as they won't determine the correct cluster # backend, region, and placement group without a successfully deployed master instance for sibling_instance in instance.fleet.instances: @@ -704,6 +709,20 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No _mark_terminated(sibling_instance, InstanceTerminationReason.MASTER_FAILED) +async def _get_fleet_master_instance( + session: AsyncSession, instance: InstanceModel +) -> InstanceModel: + # The "master" fleet instance is relevant for cloud clusters only: + # it can be any fixed instance that is chosen to be provisioned first. + res = await session.execute( + select(InstanceModel) + .where(InstanceModel.fleet_id == instance.fleet_id) + .order_by(InstanceModel.instance_num, InstanceModel.created_at) + .limit(1) + ) + return res.scalar_one() + + def _mark_terminated( instance: InstanceModel, termination_reason: InstanceTerminationReason, @@ -1192,15 +1211,17 @@ def _get_termination_deadline(instance: InstanceModel) -> datetime.datetime: return instance.first_termination_retry_at + TERMINATION_RETRY_MAX_DURATION -def _need_to_wait_fleet_provisioning(instance: InstanceModel) -> bool: +def _need_to_wait_fleet_provisioning( + instance: InstanceModel, master_instance: InstanceModel +) -> bool: # Cluster cloud instances should wait for the first fleet instance to be provisioned # so that they are provisioned in the same backend/region if instance.fleet is None: return False if ( - is_fleet_master_instance(instance) - or instance.fleet.instances[0].job_provisioning_data is not None - or instance.fleet.instances[0].status == InstanceStatus.TERMINATED + instance.id == master_instance.id + or master_instance.job_provisioning_data is not None + or master_instance.status == InstanceStatus.TERMINATED ): return False return is_cloud_cluster(instance.fleet) @@ -1209,13 +1230,13 @@ def _need_to_wait_fleet_provisioning(instance: InstanceModel) -> bool: def _get_instance_offer_for_instance( instance_offer: InstanceOfferWithAvailability, instance: InstanceModel, + master_instance: InstanceModel, ) -> InstanceOfferWithAvailability: if instance.fleet is None: return instance_offer fleet = fleet_model_to_fleet(instance.fleet) - master_instance = instance.fleet.instances[0] - master_job_provisioning_data = get_instance_provisioning_data(master_instance) if fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER: + master_job_provisioning_data = get_instance_provisioning_data(master_instance) return get_instance_offer_with_restricted_az( instance_offer=instance_offer, master_job_provisioning_data=master_job_provisioning_data, diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index e347829fa4..95ae519d07 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -728,10 +728,6 @@ def is_cloud_cluster(fleet_model: FleetModel) -> bool: ) -def is_fleet_master_instance(instance: InstanceModel) -> bool: - return instance.fleet is not None and instance.id == instance.fleet.instances[0].id - - def get_fleet_requirements(fleet_spec: FleetSpec) -> Requirements: profile = fleet_spec.merged_profile requirements = Requirements( diff --git a/src/dstack/_internal/server/services/placement.py b/src/dstack/_internal/server/services/placement.py index f0c63f891c..ed2eaefba0 100644 --- a/src/dstack/_internal/server/services/placement.py +++ b/src/dstack/_internal/server/services/placement.py @@ -98,9 +98,10 @@ async def schedule_fleet_placement_groups_deletion( def get_placement_group_model_for_instance( placement_group_models: list[PlacementGroupModel], instance_model: InstanceModel, + master_instance_model: InstanceModel, ) -> Optional[PlacementGroupModel]: placement_group_model = None - if not _is_fleet_master_instance(instance_model): + if instance_model.id != master_instance_model.id: if placement_group_models: placement_group_model = placement_group_models[0] if len(placement_group_models) > 1: From 864fce28ec2453332eee88b5a6e5ecbca3077404 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 16 Jan 2026 10:56:13 +0500 Subject: [PATCH 12/20] TODO on efficient background processing --- src/dstack/_internal/server/background/__init__.py | 9 ++++++++- .../server/background/tasks/process_instances.py | 4 +++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index 85af7d3315..8577cce6f1 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ b/src/dstack/_internal/server/background/__init__.py @@ -42,7 +42,14 @@ def get_scheduler() -> AsyncIOScheduler: def start_background_tasks() -> AsyncIOScheduler: - # We try to process as many resources as possible without exhausting DB connections. + # Background processing is implemented via in-memory locks on SQLite + # and SELECT FOR UPDATE on Postgres. Locks may be held for a long time. + # This is currently the main bottleneck for scaling dstack processing + # as processing more resources requires more DB connections. + # TODO: Make background processing efficient by committing locks to DB + # and processing outside of DB transactions. + # + # Now we just try to process as many resources as possible without exhausting DB connections. # # Quick tasks can process multiple resources per transaction. # Potentially long tasks process one resource per transaction diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index 4f829eabe7..9a14bdc30d 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -554,7 +554,9 @@ def _deploy_instance( async def _create_instance(session: AsyncSession, instance: InstanceModel) -> None: master_instance = await _get_fleet_master_instance(session, instance) if _need_to_wait_fleet_provisioning(instance, master_instance): - logger.debug("Waiting for the first instance in the fleet to be provisioned") + logger.debug( + "%s: waiting for the first instance in the fleet to be provisioned", fmt(instance) + ) return try: From c870295e19bfea743ad8370e978c217938edeb9f Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 16 Jan 2026 11:04:13 +0500 Subject: [PATCH 13/20] Add load_only(JobModel.id) --- src/dstack/_internal/server/background/tasks/process_runs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index b4397b95e0..1a2e5e0974 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -126,6 +126,7 @@ async def _process_next_run(): JobModel.run_id == run_model.id, JobModel.id.not_in(job_lockset), ) + .options(load_only(JobModel.id)) .order_by(JobModel.id) # take locks in order .with_for_update(skip_locked=True, key_share=True) ) From 8f0db03f3f97a0105312ca0e4489b706736c33ce Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 16 Jan 2026 11:12:20 +0500 Subject: [PATCH 14/20] Skip locking finished jobs in process_runs --- .../server/background/tasks/process_runs.py | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 1a2e5e0974..4f0a096822 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -4,7 +4,7 @@ from sqlalchemy import and_, func, or_, select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import aliased, contains_eager, joinedload, load_only +from sqlalchemy.orm import aliased, contains_eager, joinedload, load_only, with_loader_criteria import dstack._internal.server.services.services.autoscalers as autoscalers from dstack._internal.core.errors import ServerError @@ -111,7 +111,15 @@ async def _process_next_run(): ), ), ) - .options(joinedload(RunModel.jobs).load_only(JobModel.id)) + .options( + joinedload(RunModel.jobs).load_only(JobModel.id), + # No need to lock finished jobs + with_loader_criteria( + JobModel, + JobModel.status.not_in(JobStatus.finished_statuses()), + include_aliases=True, + ), + ) .options(load_only(RunModel.id)) .order_by(RunModel.last_processed_at.asc()) .limit(1) @@ -126,7 +134,14 @@ async def _process_next_run(): JobModel.run_id == run_model.id, JobModel.id.not_in(job_lockset), ) - .options(load_only(JobModel.id)) + .options( + load_only(JobModel.id), + with_loader_criteria( + JobModel, + JobModel.status.not_in(JobStatus.finished_statuses()), + include_aliases=True, + ), + ) .order_by(JobModel.id) # take locks in order .with_for_update(skip_locked=True, key_share=True) ) From bd8eb553cf4a816d0bfb1aa96d7207b8a032dc07 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 16 Jan 2026 11:29:33 +0500 Subject: [PATCH 15/20] Comment on non-repeatable read --- src/dstack/_internal/server/background/tasks/process_runs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 4f0a096822..ad42e7ed40 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -147,7 +147,7 @@ async def _process_next_run(): ) job_models = res.scalars().all() if len(run_model.jobs) != len(job_models): - # Some jobs are locked + # Some jobs are locked or there was a non-repeatable read return job_ids = [j.id for j in run_model.jobs] run_lockset.add(run_model.id) From 0c76e5bb555c9f9768af549cc30d087f85166039 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 16 Jan 2026 11:32:16 +0500 Subject: [PATCH 16/20] Delete unused func --- src/dstack/_internal/server/services/placement.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/dstack/_internal/server/services/placement.py b/src/dstack/_internal/server/services/placement.py index ed2eaefba0..d0c045cdc9 100644 --- a/src/dstack/_internal/server/services/placement.py +++ b/src/dstack/_internal/server/services/placement.py @@ -232,7 +232,3 @@ async def create_placement_group( ) placement_group_model.provisioning_data = pgpd.json() return placement_group_model - - -def _is_fleet_master_instance(instance: InstanceModel) -> bool: - return instance.fleet is not None and instance.id == instance.fleet.instances[0].id From 3ec2f23cd07faeaa5f5522e43e336cf59f282aef Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 19 Jan 2026 13:54:40 +0500 Subject: [PATCH 17/20] Fix missing instance lock in delete_fleets --- .../_internal/server/services/fleets.py | 54 ++++++++++++------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index 95ae519d07..2881ce5c5e 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -42,7 +42,12 @@ ) from dstack._internal.core.models.projects import Project from dstack._internal.core.models.resources import ResourcesSpec -from dstack._internal.core.models.runs import JobProvisioningData, Requirements, get_policy_map +from dstack._internal.core.models.runs import ( + JobProvisioningData, + Requirements, + RunStatus, + get_policy_map, +) from dstack._internal.core.models.users import GlobalRole from dstack._internal.core.services import validate_dstack_resource_name from dstack._internal.core.services.diff import ModelDiff, copy_model, diff_models @@ -53,6 +58,7 @@ JobModel, MemberModel, ProjectModel, + RunModel, UserModel, ) from dstack._internal.server.services import events @@ -613,48 +619,56 @@ async def delete_fleets( instance_nums: Optional[List[int]] = None, ): res = await session.execute( - select(FleetModel) + select(FleetModel.id) .where( FleetModel.project_id == project.id, FleetModel.name.in_(names), FleetModel.deleted == False, ) - .options(joinedload(FleetModel.instances)) + .order_by(FleetModel.id) # take locks in order + .with_for_update(key_share=True) ) - fleet_models = res.scalars().unique().all() - fleets_ids = sorted([f.id for f in fleet_models]) - instances_ids = sorted([i.id for f in fleet_models for i in f.instances]) - await session.commit() - logger.info("Deleting fleets: %s", [v.name for v in fleet_models]) + fleets_ids = list(res.scalars().unique().all()) + res = await session.execute( + select(InstanceModel.id) + .where( + InstanceModel.fleet_id.in_(fleets_ids), + InstanceModel.deleted == False, + ) + .order_by(InstanceModel.id) # take locks in order + .with_for_update(key_share=True) + ) + instances_ids = list(res.scalars().unique().all()) + if is_db_sqlite(): + # Start new transaction to see committed changes after lock + await session.commit() async with ( get_locker(get_db().dialect_name).lock_ctx(FleetModel.__tablename__, fleets_ids), get_locker(get_db().dialect_name).lock_ctx(InstanceModel.__tablename__, instances_ids), ): - # Refetch after lock - # TODO: Lock instances with FOR UPDATE? - # TODO: Do not lock fleet when deleting only instances + # Refetch after lock. + # TODO: Do not lock fleet when deleting only instances. res = await session.execute( select(FleetModel) - .where( - FleetModel.project_id == project.id, - FleetModel.name.in_(names), - FleetModel.deleted == False, - ) + .where(FleetModel.id.in_(fleets_ids)) .options( - selectinload(FleetModel.instances) + joinedload(FleetModel.instances.and_(InstanceModel.id.in_(instances_ids))) .joinedload(InstanceModel.jobs) .load_only(JobModel.id) ) - .options(selectinload(FleetModel.runs)) + .options( + joinedload( + FleetModel.runs.and_(RunModel.status.not_in(RunStatus.finished_statuses())) + ) + ) .execution_options(populate_existing=True) - .order_by(FleetModel.id) # take locks in order - .with_for_update(key_share=True) ) fleet_models = res.scalars().unique().all() fleets = [fleet_model_to_fleet(m) for m in fleet_models] for fleet in fleets: if fleet.spec.configuration.ssh_config is not None: _check_can_manage_ssh_fleets(user=user, project=project) + logger.info("Deleting fleets: %s", [f.name for f in fleet_models]) for fleet_model in fleet_models: _terminate_fleet_instances(fleet_model=fleet_model, instance_nums=instance_nums) # TERMINATING fleets are deleted by process_fleets after instances are terminated From d1ea5e587f8505be474b77c84cde6354f02a2ab8 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 19 Jan 2026 13:54:56 +0500 Subject: [PATCH 18/20] Handle terminating deleted instances --- .../background/tasks/process_instances.py | 12 ++++---- .../tasks/test_process_instances.py | 28 +++++++++++++++++++ 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index 9a14bdc30d..454d6ee18a 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -11,7 +11,7 @@ from pydantic import ValidationError from sqlalchemy import and_, delete, func, not_, select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import joinedload, with_loader_criteria +from sqlalchemy.orm import joinedload from dstack._internal import settings from dstack._internal.core.backends.base.compute import ( @@ -218,9 +218,8 @@ async def _process_instance(session: AsyncSession, instance: InstanceModel): .options(joinedload(InstanceModel.project).joinedload(ProjectModel.backends)) .options(joinedload(InstanceModel.jobs).load_only(JobModel.id, JobModel.status)) .options( - joinedload(InstanceModel.fleet).joinedload(FleetModel.instances), - with_loader_criteria( - InstanceModel, InstanceModel.deleted == False, include_aliases=True + joinedload(InstanceModel.fleet).joinedload( + FleetModel.instances.and_(InstanceModel.deleted == False) ), ) .execution_options(populate_existing=True) @@ -233,9 +232,8 @@ async def _process_instance(session: AsyncSession, instance: InstanceModel): .options(joinedload(InstanceModel.project)) .options(joinedload(InstanceModel.jobs).load_only(JobModel.id, JobModel.status)) .options( - joinedload(InstanceModel.fleet).joinedload(FleetModel.instances), - with_loader_criteria( - InstanceModel, InstanceModel.deleted == False, include_aliases=True + joinedload(InstanceModel.fleet).joinedload( + FleetModel.instances.and_(InstanceModel.deleted == False) ), ) .execution_options(populate_existing=True) diff --git a/src/tests/_internal/server/background/tasks/test_process_instances.py b/src/tests/_internal/server/background/tasks/test_process_instances.py index a72dc0c165..5eff68c8c7 100644 --- a/src/tests/_internal/server/background/tasks/test_process_instances.py +++ b/src/tests/_internal/server/background/tasks/test_process_instances.py @@ -597,6 +597,34 @@ async def test_terminate(self, test_db, session: AsyncSession): assert instance.deleted_at is not None assert instance.finished_at is not None + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_terminates_terminating_deleted_instance(self, test_db, session: AsyncSession): + # There was a race condition when instance could stay in Terminating while marked as deleted. + # TODO: + project = await create_project(session=session) + instance = await create_instance( + session=session, project=project, status=InstanceStatus.TERMINATING + ) + instance.deleted = True + instance.termination_reason = InstanceTerminationReason.IDLE_TIMEOUT + instance.last_job_processed_at = instance.deleted_at = ( + get_current_datetime() + dt.timedelta(minutes=-19) + ) + await session.commit() + + with self.mock_terminate_in_backend() as mock: + await process_instances() + mock.assert_called_once() + + await session.refresh(instance) + + assert instance is not None + assert instance.status == InstanceStatus.TERMINATED + assert instance.deleted == True + assert instance.deleted_at is not None + assert instance.finished_at is not None + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) @pytest.mark.parametrize( From f2dc85a91f1a7e76cd6fd7f323ef3c475ec74fcd Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 19 Jan 2026 14:00:56 +0500 Subject: [PATCH 19/20] Fix comment --- .../_internal/server/background/tasks/test_process_instances.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/_internal/server/background/tasks/test_process_instances.py b/src/tests/_internal/server/background/tasks/test_process_instances.py index 5eff68c8c7..38bffc4421 100644 --- a/src/tests/_internal/server/background/tasks/test_process_instances.py +++ b/src/tests/_internal/server/background/tasks/test_process_instances.py @@ -601,7 +601,7 @@ async def test_terminate(self, test_db, session: AsyncSession): @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_terminates_terminating_deleted_instance(self, test_db, session: AsyncSession): # There was a race condition when instance could stay in Terminating while marked as deleted. - # TODO: + # TODO: Drop this after all such "bad" instances are processed. project = await create_project(session=session) instance = await create_instance( session=session, project=project, status=InstanceStatus.TERMINATING From 3874da50d3e32f74a8b8710411d73ea08c62894d Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 19 Jan 2026 15:38:11 +0500 Subject: [PATCH 20/20] Fix log message --- src/dstack/_internal/server/services/fleets.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index 2881ce5c5e..588f34698d 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -668,7 +668,12 @@ async def delete_fleets( for fleet in fleets: if fleet.spec.configuration.ssh_config is not None: _check_can_manage_ssh_fleets(user=user, project=project) - logger.info("Deleting fleets: %s", [f.name for f in fleet_models]) + if instance_nums is None: + logger.info("Deleting fleets: %s", [f.name for f in fleet_models]) + else: + logger.info( + "Deleting fleets %s instances %s", [f.name for f in fleet_models], instance_nums + ) for fleet_model in fleet_models: _terminate_fleet_instances(fleet_model=fleet_model, instance_nums=instance_nums) # TERMINATING fleets are deleted by process_fleets after instances are terminated