diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index 178cd8d2e4..991de7af45 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -267,7 +267,6 @@ def build(self) -> Plan: self._ensure_no_new_snapshots_with_restatements() self._ensure_new_env_with_changes() self._ensure_valid_date_range() - self._ensure_no_forward_only_revert() self._ensure_no_broken_references() self._apply_effective_from() @@ -726,17 +725,11 @@ def _get_orphaned_indirect_change_category( # One of the new parents in the chain was breaking so this indirect snapshot is breaking return SnapshotChangeCategory.INDIRECT_BREAKING - if SnapshotChangeCategory.FORWARD_ONLY in previous_parent_categories: - # One of the new parents in the chain was forward-only so this indirect snapshot is forward-only - indirect_category = SnapshotChangeCategory.FORWARD_ONLY - elif ( - previous_parent_categories.intersection( - { - SnapshotChangeCategory.NON_BREAKING, - SnapshotChangeCategory.INDIRECT_NON_BREAKING, - } - ) - and indirect_category != SnapshotChangeCategory.FORWARD_ONLY + if previous_parent_categories.intersection( + { + SnapshotChangeCategory.NON_BREAKING, + SnapshotChangeCategory.INDIRECT_NON_BREAKING, + } ): # All changes in the chain were non-breaking so this indirect snapshot can be non-breaking too indirect_category = SnapshotChangeCategory.INDIRECT_NON_BREAKING @@ -801,27 +794,6 @@ def _ensure_valid_date_range(self) -> None: f"Plan end date: '{time_like_to_str(end)}' cannot be in the future (execution time: '{time_like_to_str(self.execution_time)}')" ) - def _ensure_no_forward_only_revert(self) -> None: - """Ensures that a previously superseded breaking / non-breaking snapshot is not being - used again to replace an existing forward-only snapshot with the same version. - - In other words there is no going back to the original non-forward-only snapshot with - the same version once a forward-only change for that version has been introduced. - """ - for name, (candidate, promoted) in self._context_diff.modified_snapshots.items(): - if ( - candidate.snapshot_id not in self._context_diff.new_snapshots - and candidate.is_model - and not candidate.model.forward_only - and promoted.is_forward_only - and not promoted.is_paused - and not candidate.is_no_rebuild - and promoted.version == candidate.version - ): - raise PlanError( - f"Attempted to revert to an unrevertable version of model '{name}'. Run `sqlmesh plan` again to mitigate the issue." - ) - def _ensure_no_broken_references(self) -> None: for snapshot in self._context_diff.snapshots.values(): broken_references = { diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index e053e1e108..3fcad34187 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1071,11 +1071,22 @@ def _cleanup_snapshot( evaluation_strategy = _evaluation_strategy(snapshot, adapter) for is_table_deployable, table_name in table_names: - evaluation_strategy.delete( - table_name, - is_table_deployable=is_table_deployable, - physical_schema=snapshot.physical_schema, - ) + try: + evaluation_strategy.delete( + table_name, + is_table_deployable=is_table_deployable, + physical_schema=snapshot.physical_schema, + ) + except Exception: + # Use `get_data_object` to check if the table exists instead of `table_exists` since the former + # is based on `INFORMATION_SCHEMA` and avoids touching the table directly. + # This is important when the table name is malformed for some reason and running any statement + # that touches the table would result in an error. + if adapter.get_data_object(table_name) is not None: + raise + logger.warning( + "Skipping cleanup of table '%s' because it does not exist", table_name + ) if on_complete is not None: on_complete(table_name) diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index 35c3628cff..66018d4be4 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -37,7 +37,6 @@ from sqlmesh.utils.dag import DAG from sqlmesh.utils.date import ( now, - now_timestamp, to_date, to_datetime, to_timestamp, @@ -1097,53 +1096,6 @@ def test_end_validation(make_snapshot, mocker: MockerFixture): assert restatement_prod_plan_builder.build().end == "2022-01-04" -def test_forward_only_revert_not_allowed(make_snapshot, mocker: MockerFixture): - snapshot = make_snapshot(SqlModel(name="a", query=parse_one("select 1, ds"))) - snapshot.categorize_as(SnapshotChangeCategory.BREAKING) - assert not snapshot.is_forward_only - - forward_only_snapshot = make_snapshot(SqlModel(name="a", query=parse_one("select 2, ds"))) - forward_only_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) - forward_only_snapshot.version = snapshot.version - forward_only_snapshot.unpaused_ts = now_timestamp() - assert forward_only_snapshot.is_forward_only - - context_diff = ContextDiff( - environment="test_environment", - is_new_environment=True, - is_unfinalized_environment=False, - normalize_environment_name=True, - create_from="prod", - create_from_env_exists=True, - added=set(), - removed_snapshots={}, - modified_snapshots={snapshot.name: (snapshot, forward_only_snapshot)}, - snapshots={snapshot.snapshot_id: snapshot}, - new_snapshots={}, - previous_plan_id=None, - previously_promoted_snapshot_ids=set(), - previous_finalized_snapshots=None, - previous_gateway_managed_virtual_layer=False, - gateway_managed_virtual_layer=False, - environment_statements=[], - ) - - with pytest.raises( - PlanError, - match=r"Attempted to revert to an unrevertable version of model.*", - ): - PlanBuilder(context_diff, forward_only=True).build() - - # Make sure the plan can be created if a new snapshot version was enforced. - new_version_snapshot = make_snapshot( - SqlModel(name="a", query=parse_one("select 1, ds"), stamp="test_stamp") - ) - snapshot.categorize_as(SnapshotChangeCategory.BREAKING) - context_diff.modified_snapshots = {snapshot.name: (new_version_snapshot, forward_only_snapshot)} - context_diff.new_snapshots = {new_version_snapshot.snapshot_id: new_version_snapshot} - PlanBuilder(context_diff, forward_only=True).build() - - def test_forward_only_plan_seed_models(make_snapshot, mocker: MockerFixture): snapshot_a = make_snapshot( SeedModel( diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index 4b028e148b..41d81202f3 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -438,12 +438,14 @@ def create_and_cleanup(name: str, dev_table_only: bool): return snapshot snapshot = create_and_cleanup("catalog.test_schema.test_model", True) + adapter_mock.get_data_object.assert_not_called() adapter_mock.drop_table.assert_called_once_with( f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev" ) adapter_mock.reset_mock() snapshot = create_and_cleanup("test_schema.test_model", False) + adapter_mock.get_data_object.assert_not_called() adapter_mock.drop_table.assert_has_calls( [ call( @@ -455,6 +457,7 @@ def create_and_cleanup(name: str, dev_table_only: bool): adapter_mock.reset_mock() snapshot = create_and_cleanup("test_model", False) + adapter_mock.get_data_object.assert_not_called() adapter_mock.drop_table.assert_has_calls( [ call(f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev"), @@ -463,6 +466,59 @@ def create_and_cleanup(name: str, dev_table_only: bool): ) +def test_cleanup_fails(adapter_mock, make_snapshot): + adapter_mock.drop_table.side_effect = RuntimeError("test_error") + + evaluator = SnapshotEvaluator(adapter_mock) + + model = SqlModel( + name="catalog.test_schema.test_model", + kind=IncrementalByTimeRangeKind(time_column="a"), + storage_format="parquet", + query=parse_one("SELECT a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), + ) + + snapshot = make_snapshot(model) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) + snapshot.version = "test_version" + + evaluator.promote([snapshot], EnvironmentNamingInfo(name="test_env")) + with pytest.raises(NodeExecutionFailedError) as exc_info: + evaluator.cleanup( + [SnapshotTableCleanupTask(snapshot=snapshot.table_info, dev_table_only=True)] + ) + + assert str(exc_info.value.__cause__) == "test_error" + + +def test_cleanup_skip_missing_table(adapter_mock, make_snapshot): + adapter_mock.get_data_object.return_value = None + adapter_mock.drop_table.side_effect = RuntimeError("fail") + + evaluator = SnapshotEvaluator(adapter_mock) + + model = SqlModel( + name="catalog.test_schema.test_model", + kind=IncrementalByTimeRangeKind(time_column="a"), + storage_format="parquet", + query=parse_one("SELECT a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), + ) + + snapshot = make_snapshot(model) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) + snapshot.version = "test_version" + + evaluator.promote([snapshot], EnvironmentNamingInfo(name="test_env")) + evaluator.cleanup([SnapshotTableCleanupTask(snapshot=snapshot.table_info, dev_table_only=True)]) + + adapter_mock.get_data_object.assert_called_once_with( + f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev" + ) + adapter_mock.drop_table.assert_called_once_with( + f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev" + ) + + def test_cleanup_external_model(mocker: MockerFixture, adapter_mock, make_snapshot): evaluator = SnapshotEvaluator(adapter_mock)