From 184f0ef2030a9afcfcdd18c749a25ec489be78d8 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 27 Apr 2025 22:08:27 -0400 Subject: [PATCH 1/2] chore: Refactor transform on write Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 254 ++++++++++++++++++------------ 1 file changed, 154 insertions(+), 100 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 32bff5f793e..736603b7bd5 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1518,134 +1518,188 @@ def _offline_write(): await run_in_threadpool(_offline_write) - def _get_feature_view_and_df_for_online_write( - self, - feature_view_name: str, - df: Optional[pd.DataFrame] = None, - inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None, - allow_registry_cache: bool = True, - transform_on_write: bool = True, - ): - feature_view_dict = { - fv_proto.name: fv_proto - for fv_proto in self.list_all_feature_views(allow_registry_cache) - } - try: - feature_view = feature_view_dict[feature_view_name] - except FeatureViewNotFoundException: - raise FeatureViewNotFoundException(feature_view_name, self.project) + def _validate_and_convert_input_data( + self, + df: Optional[pd.DataFrame], + inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] + ) -> Optional[pd.DataFrame]: + """ + Validates input parameters and converts them to a pandas DataFrame. + + Args: + df: Optional DataFrame input + inputs: Optional dictionary or DataFrame input + + Returns: + Validated pandas DataFrame or None + + Raises: + ValueError: If both df and inputs are provided + DataFrameSerializationError: If input data cannot be converted to DataFrame + """ if df is not None and inputs is not None: raise ValueError("Both df and inputs cannot be provided at the same time.") + if df is None and inputs is not None: if isinstance(inputs, dict) or isinstance(inputs, List): try: - df = pd.DataFrame(inputs) + return pd.DataFrame(inputs) except Exception as _: raise DataFrameSerializationError(inputs) elif isinstance(inputs, pd.DataFrame): - pass + return inputs else: raise ValueError("inputs must be a dictionary or a pandas DataFrame.") + if df is not None and inputs is None: if isinstance(df, dict) or isinstance(df, List): try: - df = pd.DataFrame(df) + return pd.DataFrame(df) except Exception as _: raise DataFrameSerializationError(df) - if feature_view.features[0].vector_index and df is not None: + return df + + def _transform_on_demand_feature_view_df( + self, feature_view: OnDemandFeatureView, df: pd.DataFrame + ) -> pd.DataFrame: + """ + Apply transformations for an OnDemandFeatureView to the input dataframe. + + Args: + feature_view: The OnDemandFeatureView containing the transformation + df: The input dataframe to transform + + Returns: + Transformed dataframe + + Raises: + Exception: For unsupported OnDemandFeatureView modes + """ + if feature_view.mode == "python" and isinstance( + feature_view.feature_transformation, PythonTransformation + ): + input_dict = ( + df.to_dict(orient="records")[0] + if feature_view.singleton + else df.to_dict(orient="list") + ) + + if feature_view.singleton: + transformed_rows = [] + + for i, row in df.iterrows(): + output = feature_view.feature_transformation.udf(row.to_dict()) + if i == 0: + transformed_rows = output + else: + for k in output: + if isinstance(output[k], list): + transformed_rows[k].extend(output[k]) + else: + transformed_rows[k].append(output[k]) + + transformed_data = pd.DataFrame(transformed_rows) + else: + transformed_data = feature_view.feature_transformation.udf(input_dict) + + if feature_view.write_to_online_store: + entities = [ + self.get_entity(entity) for entity in (feature_view.entities or []) + ] + join_keys = [entity.join_key for entity in entities if entity] + join_keys = [k for k in join_keys if k in input_dict.keys()] + transformed_df = ( + pd.DataFrame(transformed_data) + if not isinstance(transformed_data, pd.DataFrame) + else transformed_data + ) + input_df = pd.DataFrame( + [input_dict] if feature_view.singleton else input_dict + ) + if input_df.shape[0] == transformed_df.shape[0]: + for k in input_dict: + if k not in transformed_data: + transformed_data[k] = input_dict[k] + transformed_df = pd.DataFrame(transformed_data) + else: + transformed_df = pd.merge( + transformed_df, + input_df, + how="left", + on=join_keys, + ) + else: + # overwrite any transformed features and update the dictionary + for k in input_dict: + if k not in transformed_data: + transformed_data[k] = input_dict[k] + + return pd.DataFrame(transformed_data) + + elif feature_view.mode == "pandas" and isinstance( + feature_view.feature_transformation, PandasTransformation + ): + transformed_df = feature_view.feature_transformation.udf(df) + for col in df.columns: + transformed_df[col] = df[col] + return transformed_df + else: + raise Exception("Unsupported OnDemandFeatureView mode") + + def _validate_vector_features(self, feature_view, df: pd.DataFrame) -> None: + """ + Validates vector features in the DataFrame against the feature view specifications. + + Args: + feature_view: The feature view containing vector feature specifications + df: The DataFrame to validate + + Raises: + ValueError: If vector dimension constraints are violated + """ + if feature_view.features and feature_view.features[0].vector_index: fv_vector_feature_name = feature_view.features[0].name df_vector_feature_index = df.columns.get_loc(fv_vector_feature_name) + if feature_view.features[0].vector_length != 0: - if ( - df.shape[df_vector_feature_index] - > feature_view.features[0].vector_length - ): + if df.shape[df_vector_feature_index] > feature_view.features[0].vector_length: raise ValueError( - f"The dataframe for {fv_vector_feature_name} column has {df.shape[1]} vectors which is greater than expected (i.e {feature_view.features[0].vector_length}) by feature view {feature_view.name}." + f"The dataframe for {fv_vector_feature_name} column has {df.shape[1]} vectors " + f"which is greater than expected (i.e {feature_view.features[0].vector_length}) " + f"by feature view {feature_view.name}." ) + def _get_feature_view_and_df_for_online_write( + self, + feature_view_name: str, + df: Optional[pd.DataFrame] = None, + inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None, + allow_registry_cache: bool = True, + transform_on_write: bool = True, + ): + feature_view_dict = { + fv_proto.name: fv_proto + for fv_proto in self.list_all_feature_views(allow_registry_cache) + } + try: + feature_view = feature_view_dict[feature_view_name] + except FeatureViewNotFoundException: + raise FeatureViewNotFoundException(feature_view_name, self.project) + + # Convert inputs/df to a consistent DataFrame format + df = self._validate_and_convert_input_data(df, inputs) + + if df is not None: + self._validate_vector_features(feature_view, df) + # # Apply transformations if this is an OnDemandFeatureView with write_to_online_store=True if ( isinstance(feature_view, OnDemandFeatureView) and feature_view.write_to_online_store and transform_on_write ): - if ( - feature_view.mode == "python" - and isinstance( - feature_view.feature_transformation, PythonTransformation - ) - and df is not None - ): - input_dict = ( - df.to_dict(orient="records")[0] - if feature_view.singleton - else df.to_dict(orient="list") - ) - if feature_view.singleton: - transformed_rows = [] - - for i, row in df.iterrows(): - output = feature_view.feature_transformation.udf(row.to_dict()) - if i == 0: - transformed_rows = output - else: - for k in output: - if isinstance(output[k], list): - transformed_rows[k].extend(output[k]) - else: - transformed_rows[k].append(output[k]) - - transformed_data = pd.DataFrame(transformed_rows) - else: - transformed_data = feature_view.feature_transformation.udf( - input_dict - ) - if feature_view.write_to_online_store: - entities = [ - self.get_entity(entity) - for entity in (feature_view.entities or []) - ] - join_keys = [entity.join_key for entity in entities if entity] - join_keys = [k for k in join_keys if k in input_dict.keys()] - transformed_df = ( - pd.DataFrame(transformed_data) - if not isinstance(transformed_data, pd.DataFrame) - else transformed_data - ) - input_df = pd.DataFrame( - [input_dict] if feature_view.singleton else input_dict - ) - if input_df.shape[0] == transformed_df.shape[0]: - for k in input_dict: - if k not in transformed_data: - transformed_data[k] = input_dict[k] - transformed_df = pd.DataFrame(transformed_data) - else: - transformed_df = pd.merge( - transformed_df, - input_df, - how="left", - on=join_keys, - ) - else: - # overwrite any transformed features and update the dictionary - for k in input_dict: - if k not in transformed_data: - transformed_data[k] = input_dict[k] - df = pd.DataFrame(transformed_data) - elif feature_view.mode == "pandas" and isinstance( - feature_view.feature_transformation, PandasTransformation - ): - transformed_df = feature_view.feature_transformation.udf(df) - if df is not None: - for col in df.columns: - transformed_df[col] = df[col] - df = transformed_df - - else: - raise Exception("Unsupported OnDemandFeatureView mode") + df = self._transform_on_demand_feature_view_df(feature_view, df) return feature_view, df From 8f68ea2a5e1e4c7efbc7e2490459931620034e4c Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 27 Apr 2025 22:11:39 -0400 Subject: [PATCH 2/2] linter Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 736603b7bd5..02cab72786b 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1519,9 +1519,9 @@ def _offline_write(): await run_in_threadpool(_offline_write) def _validate_and_convert_input_data( - self, - df: Optional[pd.DataFrame], - inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] + self, + df: Optional[pd.DataFrame], + inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]], ) -> Optional[pd.DataFrame]: """ Validates input parameters and converts them to a pandas DataFrame. @@ -1663,7 +1663,10 @@ def _validate_vector_features(self, feature_view, df: pd.DataFrame) -> None: df_vector_feature_index = df.columns.get_loc(fv_vector_feature_name) if feature_view.features[0].vector_length != 0: - if df.shape[df_vector_feature_index] > feature_view.features[0].vector_length: + if ( + df.shape[df_vector_feature_index] + > feature_view.features[0].vector_length + ): raise ValueError( f"The dataframe for {fv_vector_feature_name} column has {df.shape[1]} vectors " f"which is greater than expected (i.e {feature_view.features[0].vector_length}) "