From 44bb32ac8b1e7de7fbd0604b8ad77ad7804bdb16 Mon Sep 17 00:00:00 2001 From: Katy Haynie <74935161+katyhaynie-snowflake@users.noreply.github.com> Date: Mon, 19 Dec 2022 16:50:49 -0500 Subject: [PATCH 1/2] feat: Snowflake skip materialization if no table change Signed-off-by: Katy Haynie <74935161+katyhaynie-snowflake@users.noreply.github.com> --- .../feast/infra/materialization/snowflake_engine.py | 13 +++++++++++++ sdk/python/feast/infra/offline_stores/snowflake.py | 2 ++ 2 files changed, 15 insertions(+) diff --git a/sdk/python/feast/infra/materialization/snowflake_engine.py b/sdk/python/feast/infra/materialization/snowflake_engine.py index b523413bcde..d8fc5f5611d 100644 --- a/sdk/python/feast/infra/materialization/snowflake_engine.py +++ b/sdk/python/feast/infra/materialization/snowflake_engine.py @@ -8,6 +8,7 @@ import pandas as pd from colorama import Fore, Style from pydantic import Field, StrictStr +from pytz import utc from tqdm import tqdm import feast @@ -256,6 +257,18 @@ def _materialize_one( end_date=end_date, ) + # Lets check and see if we can skip this query, because the table hasnt changed + # since before the start date of this query + with get_snowflake_conn(self.repo_config.offline_store) as conn: + query = f"""SELECT SYSTEM$LAST_CHANGE_COMMIT_TIME('{feature_view.batch_source.get_table_query_string()}') AS last_commit_change_time""" + last_commit_change_time = ( + conn.cursor().execute(query).fetchall()[0][0] / 1_000_000_000 + ) + if last_commit_change_time < start_date.astimezone(tz=utc).timestamp(): + return SnowflakeMaterializationJob( + job_id=job_id, status=MaterializationJobStatus.SUCCEEDED + ) + fv_latest_values_sql = offline_job.to_sql() if feature_view.batch_source.field_mapping is not None: diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index e126b059342..2b77061aada 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -159,6 +159,8 @@ def pull_latest_from_table_or_query( start_date = start_date.astimezone(tz=utc) end_date = end_date.astimezone(tz=utc) + print(start_date, end_date) + # select system$last_change_commit_time('mytable'); query = f""" SELECT From 365382f80ff89222c87394c3fb5f3b20eb2e0d75 Mon Sep 17 00:00:00 2001 From: Katy Haynie <74935161+katyhaynie-snowflake@users.noreply.github.com> Date: Mon, 19 Dec 2022 17:05:51 -0500 Subject: [PATCH 2/2] removing print statement from offline snowflake.py Signed-off-by: Katy Haynie <74935161+katyhaynie-snowflake@users.noreply.github.com> --- sdk/python/feast/infra/offline_stores/snowflake.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 2b77061aada..e126b059342 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -159,8 +159,6 @@ def pull_latest_from_table_or_query( start_date = start_date.astimezone(tz=utc) end_date = end_date.astimezone(tz=utc) - print(start_date, end_date) - # select system$last_change_commit_time('mytable'); query = f""" SELECT