diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 3b7d42e7644..76e944ae35c 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -61,6 +61,7 @@ class WriteToFeatureStoreRequest(BaseModel): feature_view_name: str df: dict allow_registry_cache: bool = True + transform_on_write: bool = True class PushFeaturesRequest(BaseModel): @@ -68,6 +69,7 @@ class PushFeaturesRequest(BaseModel): df: dict allow_registry_cache: bool = True to: str = "online" + transform_on_write: bool = True class MaterializeRequest(BaseModel): @@ -302,6 +304,7 @@ async def push(request: PushFeaturesRequest) -> None: df=df, allow_registry_cache=request.allow_registry_cache, to=to, + transform_on_write=request.transform_on_write, ) should_push_async = ( @@ -336,6 +339,7 @@ def write_to_online_store(request: WriteToFeatureStoreRequest) -> None: feature_view_name=feature_view_name, df=df, allow_registry_cache=allow_registry_cache, + transform_on_write=request.transform_on_write, ) @app.get("/health") diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index bd6f1873874..9e55b1390f9 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1464,6 +1464,7 @@ def push( df: pd.DataFrame, allow_registry_cache: bool = True, to: PushMode = PushMode.ONLINE, + transform_on_write: bool = True, ): """ Push features to a push source. This updates all the feature views that have the push source as stream source. @@ -1473,13 +1474,17 @@ def push( df: The data being pushed. allow_registry_cache: Whether to allow cached versions of the registry. to: Whether to push to online or offline store. Defaults to online store only. + transform_on_write: Whether to transform the data before pushing. 
""" for fv in self._fvs_for_push_source_or_raise( push_source_name, allow_registry_cache ): if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE: self.write_to_online_store( - fv.name, df, allow_registry_cache=allow_registry_cache + fv.name, + df, + allow_registry_cache=allow_registry_cache, + transform_on_write=transform_on_write, ) if to == PushMode.OFFLINE or to == PushMode.ONLINE_AND_OFFLINE: self.write_to_offline_store( @@ -1521,6 +1526,7 @@ def _get_feature_view_and_df_for_online_write( df: Optional[pd.DataFrame] = None, inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None, allow_registry_cache: bool = True, + transform_on_write: bool = True, ): feature_view_dict = { fv_proto.name: fv_proto @@ -1553,6 +1559,7 @@ def _get_feature_view_and_df_for_online_write( if ( isinstance(feature_view, OnDemandFeatureView) and feature_view.write_to_online_store + and transform_on_write ): if ( feature_view.mode == "python" @@ -1638,6 +1645,7 @@ def write_to_online_store( df: Optional[pd.DataFrame] = None, inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None, allow_registry_cache: bool = True, + transform_on_write: bool = True, ): """ Persists a dataframe to the online store. @@ -1647,6 +1655,7 @@ def write_to_online_store( df: The dataframe to be persisted. inputs: Optional the dictionary object to be written allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry. + transform_on_write (optional): Whether to transform the data before pushing. 
""" feature_view, df = self._get_feature_view_and_df_for_online_write( @@ -1654,6 +1663,7 @@ def write_to_online_store( df=df, inputs=inputs, allow_registry_cache=allow_registry_cache, + transform_on_write=transform_on_write, ) provider = self._get_provider() provider.ingest_df(feature_view, df) diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index 69942f8090c..9f9dbd2dfea 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -874,7 +874,8 @@ def test_stored_writes(self): assert driver_stats_fv.entity_columns == [] ODFV_STRING_CONSTANT = "guaranteed constant" - ODFV_OTHER_STRING_CONSTANT = "somethign else" + ODFV_OTHER_STRING_CONSTANT = "something else" + ODFV_UNTRANSFORMED_STRING_CONSTANT = "also something else" @on_demand_feature_view( entities=[driver], @@ -1069,6 +1070,52 @@ def python_stored_writes_feature_view( assert online_odfv_python_response["string_constant"] != [ ODFV_OTHER_STRING_CONSTANT ] + odfv_entity_rows_to_write_no_transform = [ + { + "driver_id": 1003, + "counter": 10, + "conv_rate": 0.25, + "acc_rate": 0.50, + "input_datetime": current_datetime, + "string_constant": ODFV_UNTRANSFORMED_STRING_CONSTANT, + } + ] + odfv_entity_rows_to_read_no_transform = [ + { + "driver_id": 1003, + "conv_rate_plus_acc": 7, # note how this is not the correct value and would be calculated on demand + "conv_rate": 0.25, + "acc_rate": 0.50, + "counter": 0, + "input_datetime": current_datetime, + "string_constant": ODFV_UNTRANSFORMED_STRING_CONSTANT, + } + ] + print("storing ODFV features") + self.store.write_to_online_store( + feature_view_name="python_stored_writes_feature_view", + df=odfv_entity_rows_to_write_no_transform, + transform_on_write=False, + ) + online_odfv_python_response_no_transform = self.store.get_online_features( + entity_rows=odfv_entity_rows_to_read_no_transform, 
features=[ + "python_stored_writes_feature_view:conv_rate_plus_acc", + "python_stored_writes_feature_view:current_datetime", + "python_stored_writes_feature_view:counter", + "python_stored_writes_feature_view:input_datetime", + "python_stored_writes_feature_view:string_constant", + ], + ).to_dict() + # note: input_datetime is compared with microseconds dropped; conv_rate_plus_acc and current_datetime were not in the written rows and are expected to be None because the write used transform_on_write=False + assert online_odfv_python_response_no_transform == { + "driver_id": [1003], + "counter": [10], + "conv_rate_plus_acc": [None], + "input_datetime": [current_datetime.replace(microsecond=0)], + "string_constant": [ODFV_UNTRANSFORMED_STRING_CONSTANT], + "current_datetime": [None], + } def test_stored_writes_with_explode(self): with tempfile.TemporaryDirectory() as data_dir: