From 60b5881f19e965e0eb7d9b0ec66eba509711359e Mon Sep 17 00:00:00 2001
From: niklasvm
Date: Fri, 12 Aug 2022 15:00:27 +0000
Subject: [PATCH 1/6] create integration spark data sets as files rather than a temp table

Signed-off-by: niklasvm
---
 .../spark_offline_store/tests/data_source.py | 24 +++++++++++++++----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py
index 95bedd1b409..2e401b90fc1 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py
@@ -1,7 +1,12 @@
+import os
+import shutil
+import tempfile
 import uuid
 from typing import Dict, List
 
 import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
 from pyspark import SparkConf
 from pyspark.sql import SparkSession
 
@@ -27,8 +32,9 @@ def __init__(self, project_name: str, *args, **kwargs):
         super().__init__(project_name)
         self.spark_conf = {
             "master": "local[*]",
-            "spark.ui.enabled": "false",
-            "spark.eventLog.enabled": "false",
+            # "spark.ui.enabled": "false",
+            # "spark.eventLog.enabled": "false",
+            "spark.sql.shuffle.partitions": 64,
             "spark.sql.parser.quotedRegexColumnNames": "true",
             "spark.sql.session.timeZone": "UTC",
         }
@@ -48,6 +54,8 @@ def __init__(self, project_name: str, *args, **kwargs):
 
     def teardown(self):
         self.spark_session.stop()
+        for table in self.tables:
+            shutil.rmtree(table)
 
     def create_offline_store_config(self):
         self.spark_offline_store_config = SparkOfflineStoreConfig()
@@ -86,11 +94,17 @@ def create_data_source(
             .appName("pytest-pyspark-local-testing")
             .getOrCreate()
         )
-        self.spark_session.createDataFrame(df).createOrReplaceTempView(destination_name)
-        self.tables.append(destination_name)
+        temp_dir = tempfile.mkdtemp(prefix="spark_offline_store_test_data")
+
+        path = os.path.join(temp_dir, destination_name)
+        self.tables.append(path)
+
+        self.spark_session.createDataFrame(df).write.parquet(path)
 
         return SparkSource(
-            table=destination_name,
+            name=destination_name,
+            file_format="parquet",
+            path=path,
             timestamp_field=timestamp_field,
             created_timestamp_column=created_timestamp_column,
             field_mapping=field_mapping or {"ts_1": "ts"},

From b8d3ee9924c566cc9afb5cce8c24f1d4b7da7da8 Mon Sep 17 00:00:00 2001
From: niklasvm
Date: Fri, 12 Aug 2022 15:01:46 +0000
Subject: [PATCH 2/6] add offline_write_batch method to spark offline store

Signed-off-by: niklasvm
---
 .../contrib/spark_offline_store/spark.py      | 90 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 89 insertions(+), 1 deletion(-)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
index 26d414232f2..1cee65fb173 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
@@ -1,7 +1,7 @@
 import tempfile
 import warnings
 from datetime import datetime
-from typing import Dict, List, Optional, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
 
 import numpy as np
 import pandas
@@ -21,6 +21,7 @@
 from feast.infra.offline_stores import offline_utils
 from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
     SavedDatasetSparkStorage,
+    SparkOptions,
     SparkSource,
 )
 from feast.infra.offline_stores.offline_store import (
@@ -191,6 +192,75 @@ def get_historical_features(
             ),
         )
 
+    @staticmethod
+    def offline_write_batch(
+        config: RepoConfig,
+        feature_view: FeatureView,
+        table: pyarrow.Table,
+        progress: Optional[Callable[[int], Any]],
+    ):
+        if not feature_view.batch_source:
+            raise ValueError(
+                "feature view does not have a batch source to persist offline data"
+            )
+        if not isinstance(config.offline_store, SparkOfflineStoreConfig):
+            raise ValueError(
+                f"offline store config is of type {type(config.offline_store)} when spark type required"
+            )
+        if not isinstance(feature_view.batch_source, SparkSource):
+            raise ValueError(
+                f"feature view batch source is {type(feature_view.batch_source)} not spark source"
+            )
+
+        pa_schema, column_names = offline_utils.get_pyarrow_schema_from_batch_source(
+            config, feature_view.batch_source
+        )
+        if column_names != table.column_names:
+            raise ValueError(
+                f"The input pyarrow table has schema {table.schema} with the incorrect columns {table.column_names}. "
+                f"The schema is expected to be {pa_schema} with the columns (in this exact order) to be {column_names}."
+            )
+
+        # # NOT SURE ABOUT THESE 2 LINES
+        print(table.schema)
+        print("----------------------------------------")
+        print(pa_schema)
+        # if table.schema != pa_schema:
+        #     table = table.cast(pa_schema)
+
+        spark_session = get_spark_session_or_start_new_with_repoconfig(
+            store_config=config.offline_store
+        )
+
+        if feature_view.batch_source.path:
+            # write data to disk so that it can be loaded into spark (for preserving column types)
+            with tempfile.NamedTemporaryFile(suffix=".parquet") as tmp_file:
+                print(tmp_file.name)
+                pq.write_table(table, tmp_file.name)
+
+                # load data
+                df_batch = spark_session.read.parquet(tmp_file.name)
+
+                # load existing data to get spark table schema
+                df_existing = spark_session.read.format(
+                    feature_view.batch_source.file_format
+                ).load(feature_view.batch_source.path)
+
+                # cast columns if applicable
+                df_batch = _cast_data_frame(df_batch, df_existing)
+
+                df_batch.write.format(feature_view.batch_source.file_format).mode(
+                    "append"
+                ).save(feature_view.batch_source.path)
+        elif feature_view.batch_source.query:
+            raise NotImplementedError(
+                f"offline_write_batch not implemented for batch sources specified by query"
+            )
+        else:
+            raise NotImplementedError(
+                f"offline_write_batch not implemented for batch sources specified by a table"
+            )
+
     @staticmethod
     @log_exceptions_and_usage(offline_store="spark")
     def pull_all_from_table_or_query(
@@ -388,6 +458,24 @@ def _format_datetime(t: datetime) -> str:
     return dt
 
 
+def _cast_data_frame(
+    df_new: pyspark.sql.DataFrame, df_existing: pyspark.sql.DataFrame
+) -> pyspark.sql.DataFrame:
+    """Convert new dataframe's columns to the same types as existing dataframe while preserving the order of columns"""
+    existing_dtypes = {k: v for k, v in df_existing.dtypes}
+    new_dtypes = {k: v for k, v in df_new.dtypes}
+
+    select_expression = []
+    for col, new_type in new_dtypes.items():
+        existing_type = existing_dtypes[col]
+        if new_type != existing_type:
+            select_expression.append(f"cast({col} as {existing_type}) as {col}")
+        else:
+            select_expression.append(col)
+
+    return df_new.selectExpr(*select_expression)
+
+
 MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
 /*
  Compute a deterministic hash for the `left_table_query_string` that will be used throughout

From 0a3a060ee90c04d84b41b80a34724773b5bf087e Mon Sep 17 00:00:00 2001
From: niklasvm
Date: Fri, 12 Aug 2022 15:09:02 +0000
Subject: [PATCH 3/6] remove some comments

Signed-off-by: niklasvm
---
 .../offline_stores/contrib/spark_offline_store/spark.py | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
index 1cee65fb173..533dad9671f 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
@@ -221,13 +221,6 @@ def offline_write_batch(
                 f"The schema is expected to be {pa_schema} with the columns (in this exact order) to be {column_names}."
             )
 
-        # # NOT SURE ABOUT THESE 2 LINES
-        print(table.schema)
-        print("----------------------------------------")
-        print(pa_schema)
-        # if table.schema != pa_schema:
-        #     table = table.cast(pa_schema)
-
         spark_session = get_spark_session_or_start_new_with_repoconfig(
             store_config=config.offline_store
         )

From 767a664e14bb8b7fd8d5057ed315c9b1f3dccf12 Mon Sep 17 00:00:00 2001
From: niklasvm
Date: Fri, 12 Aug 2022 15:24:19 +0000
Subject: [PATCH 4/6] fix linting issue

Signed-off-by: niklasvm
---
 .../contrib/spark_offline_store/tests/data_source.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py
index 2e401b90fc1..f9c9b8b8d09 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py
@@ -32,9 +32,8 @@ def __init__(self, project_name: str, *args, **kwargs):
         super().__init__(project_name)
         self.spark_conf = {
             "master": "local[*]",
-            # "spark.ui.enabled": "false",
-            # "spark.eventLog.enabled": "false",
-            "spark.sql.shuffle.partitions": 64,
+            "spark.ui.enabled": "false",
+            "spark.eventLog.enabled": "false",
             "spark.sql.parser.quotedRegexColumnNames": "true",
             "spark.sql.session.timeZone": "UTC",
         }

From 967a299e5e18f66f17210ef86bbbce739c0ba8da Mon Sep 17 00:00:00 2001
From: niklasvm
Date: Fri, 12 Aug 2022 15:35:33 +0000
Subject: [PATCH 5/6] fix more linting issues

Signed-off-by: niklasvm
---
 .../contrib/spark_offline_store/tests/data_source.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py
index f9c9b8b8d09..ab1acbef73e 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py
@@ -5,8 +5,6 @@
 from typing import Dict, List
 
 import pandas as pd
-import pyarrow as pa
-import pyarrow.parquet as pq
 from pyspark import SparkConf
 from pyspark.sql import SparkSession
 

From 3704f4abc7342d656ec853be1d50b2031b6b3e04 Mon Sep 17 00:00:00 2001
From: niklasvm
Date: Fri, 12 Aug 2022 15:46:55 +0000
Subject: [PATCH 6/6] fix flake8 errors

Signed-off-by: niklasvm
---
 .../offline_stores/contrib/spark_offline_store/spark.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
index 533dad9671f..4a561994c3c 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
@@ -21,7 +21,6 @@
 from feast.infra.offline_stores import offline_utils
 from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
     SavedDatasetSparkStorage,
-    SparkOptions,
     SparkSource,
 )
 from feast.infra.offline_stores.offline_store import (
@@ -247,11 +246,11 @@ def offline_write_batch(
             ).save(feature_view.batch_source.path)
         elif feature_view.batch_source.query:
             raise NotImplementedError(
-                f"offline_write_batch not implemented for batch sources specified by query"
+                "offline_write_batch not implemented for batch sources specified by query"
             )
         else:
             raise NotImplementedError(
-                f"offline_write_batch not implemented for batch sources specified by a table"
+                "offline_write_batch not implemented for batch sources specified by a table"
            )
 
     @staticmethod
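
A minimal standalone sketch of the column-casting idea behind the `_cast_data_frame` helper added in PATCH 2/6: the incoming batch is re-selected with CAST expressions so its column types line up with the data already on disk before the append. The SparkSession setup, column names, and sample rows below are illustrative only and are not taken from the patches.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").appName("cast-sketch").getOrCreate()

    # Existing data: driver_id stored as int, event_timestamp as timestamp.
    df_existing = spark.createDataFrame(
        [(1, "2022-08-12 15:00:00")], ["driver_id", "event_timestamp"]
    ).selectExpr(
        "cast(driver_id as int) as driver_id",
        "cast(event_timestamp as timestamp) as event_timestamp",
    )

    # Incoming batch: driver_id is inferred as bigint, event_timestamp as string.
    df_new = spark.createDataFrame(
        [(2, "2022-08-12 16:00:00")], ["driver_id", "event_timestamp"]
    )

    # Cast each new column to the type of the existing column with the same name,
    # keeping the incoming column order (the approach _cast_data_frame takes).
    existing_dtypes = dict(df_existing.dtypes)
    select_expression = [
        f"cast({col} as {existing_dtypes[col]}) as {col}"
        if new_type != existing_dtypes[col]
        else col
        for col, new_type in df_new.dtypes
    ]
    df_aligned = df_new.selectExpr(*select_expression)
    df_aligned.printSchema()  # driver_id: integer, event_timestamp: timestamp

Appending `df_aligned` to the existing dataset then preserves the on-disk schema rather than writing mismatched parquet files alongside it, which is the motivation for casting before the `mode("append").save(...)` call in offline_write_batch.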