diff --git a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py
index 8a82ec24a64..bb9008c9fb3 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py
@@ -16,6 +16,7 @@ from ray.data.context import DatasetContext
 
 from feast.data_source import DataSource
+from feast.dataframe import DataFrameEngine, FeastDataFrame
 from feast.errors import (
     RequestDataNotFoundInEntityDfException,
     SavedDatasetLocationAlreadyExists,
@@ -1037,6 +1038,28 @@ def to_arrow(
         df = result.to_pandas()
         return pa.Table.from_pandas(df)
 
+    def to_feast_df(
+        self,
+        validation_reference: Optional[ValidationReference] = None,
+        timeout: Optional[int] = None,
+    ) -> FeastDataFrame:
+        """
+        Return the result as a FeastDataFrame with the Ray engine.
+
+        This preserves Ray's lazy execution by wrapping the Ray Dataset directly.
+        """
+        # If we have on-demand feature views, fall back to the base class Arrow implementation
+        if self.on_demand_feature_views:
+            return super().to_feast_df(validation_reference, timeout)
+
+        # Get the Ray Dataset directly (maintains lazy execution)
+        ray_ds = self._get_ray_dataset()
+
+        return FeastDataFrame(
+            data=ray_ds,
+            engine=DataFrameEngine.RAY,
+        )
+
     def to_remote_storage(self) -> list[str]:
         if not self._staging_location:
             raise ValueError("Staging location must be set for remote materialization.")
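
For context, here is roughly how the Ray path above would be exercised from user code. This is an illustrative sketch, not part of the diff; the store configuration, entity dataframe, and feature reference are assumptions and are not defined anywhere in this patch:

    # Illustrative sketch; assumes a Ray-configured FeatureStore (`store`), an
    # entity dataframe (`entity_df`), and a feature ref that exists in your repo.
    from feast.dataframe import DataFrameEngine

    job = store.get_historical_features(
        entity_df=entity_df, features=["driver_stats:conv_rate"]
    )
    feast_df = job.to_feast_df()

    if feast_df.engine == DataFrameEngine.RAY:
        # feast_df.data is a ray.data.Dataset; nothing executes until materialized
        pandas_df = feast_df.data.to_pandas()
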
+ """ + # Get the Spark DataFrame directly (maintains lazy execution) + spark_df = self.to_spark_df() + return FeastDataFrame( + data=spark_df, + engine=DataFrameEngine.SPARK, + ) + def persist( self, storage: SavedDatasetStorage, diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 8a15fb738d1..5961c1f4292 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -32,6 +32,7 @@ from feast import flags_helper from feast.data_source import DataSource +from feast.dataframe import DataFrameEngine, FeastDataFrame from feast.dqm.errors import ValidationFailed from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import FeatureView @@ -93,6 +94,49 @@ def to_df( .reset_index(drop=True) ) + def to_feast_df( + self, + validation_reference: Optional["ValidationReference"] = None, + timeout: Optional[int] = None, + ) -> FeastDataFrame: + """ + Synchronously executes the underlying query and returns the result as a FeastDataFrame. + + This is the new primary method that returns FeastDataFrame with proper engine detection. + On demand transformations will be executed. If a validation reference is provided, the dataframe + will be validated. + + Args: + validation_reference (optional): The validation to apply against the retrieved dataframe. + timeout (optional): The query timeout if applicable. + """ + # Get Arrow table as before + arrow_table = self.to_arrow( + validation_reference=validation_reference, timeout=timeout + ) + + # Prepare metadata + metadata = {} + + # Add features to metadata if available + if hasattr(self, "features"): + metadata["features"] = self.features + else: + metadata["features"] = [] + + # Add on-demand feature views to metadata + if hasattr(self, "on_demand_feature_views") and self.on_demand_feature_views: + metadata["on_demand_feature_views"] = [ + odfv.name for odfv in self.on_demand_feature_views + ] + else: + metadata["on_demand_feature_views"] = [] + + # Wrap in FeastDataFrame with Arrow engine and metadata + return FeastDataFrame( + data=arrow_table, engine=DataFrameEngine.ARROW, metadata=metadata + ) + def to_arrow( self, validation_reference: Optional["ValidationReference"] = None, diff --git a/sdk/python/tests/unit/test_retrieval_job_dataframe.py b/sdk/python/tests/unit/test_retrieval_job_dataframe.py new file mode 100644 index 00000000000..9c8328a6251 --- /dev/null +++ b/sdk/python/tests/unit/test_retrieval_job_dataframe.py @@ -0,0 +1,168 @@ +"""Tests for RetrievalJob FeastDataFrame integration.""" + +from unittest.mock import Mock + +import pandas as pd +import pyarrow as pa + +from feast.dataframe import DataFrameEngine, FeastDataFrame +from feast.infra.offline_stores.offline_store import RetrievalJob + + +class MockRetrievalJob(RetrievalJob): + """Mock RetrievalJob for testing.""" + + def __init__( + self, arrow_table: pa.Table, features: list = None, odfvs: list = None + ): + self.arrow_table = arrow_table + self.features = features or [] + self.odfvs = odfvs or [] + + def _to_arrow_internal(self, timeout=None): + return self.arrow_table + + @property + def full_feature_names(self): + return False + + @property + def on_demand_feature_views(self): + return self.odfvs + + +class TestRetrievalJobFeastDataFrame: + """Test RetrievalJob FeastDataFrame integration.""" + + def test_to_feast_df_basic(self): + """Test basic to_feast_df functionality.""" + # Create test data + test_data = 
diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py
index 8a15fb738d1..5961c1f4292 100644
--- a/sdk/python/feast/infra/offline_stores/offline_store.py
+++ b/sdk/python/feast/infra/offline_stores/offline_store.py
@@ -32,6 +32,7 @@ from feast import flags_helper
 
 from feast.data_source import DataSource
+from feast.dataframe import DataFrameEngine, FeastDataFrame
 from feast.dqm.errors import ValidationFailed
 from feast.feature_logging import LoggingConfig, LoggingSource
 from feast.feature_view import FeatureView
@@ -93,6 +94,49 @@ def to_df(
             .reset_index(drop=True)
         )
 
+    def to_feast_df(
+        self,
+        validation_reference: Optional["ValidationReference"] = None,
+        timeout: Optional[int] = None,
+    ) -> FeastDataFrame:
+        """
+        Synchronously executes the underlying query and returns the result as a FeastDataFrame.
+
+        This is the new primary method that returns FeastDataFrame with proper engine detection.
+        On demand transformations will be executed. If a validation reference is provided, the
+        dataframe will be validated.
+
+        Args:
+            validation_reference (optional): The validation to apply against the retrieved dataframe.
+            timeout (optional): The query timeout if applicable.
+        """
+        # Get Arrow table as before
+        arrow_table = self.to_arrow(
+            validation_reference=validation_reference, timeout=timeout
+        )
+
+        # Prepare metadata
+        metadata = {}
+
+        # Add features to metadata if available
+        if hasattr(self, "features"):
+            metadata["features"] = self.features
+        else:
+            metadata["features"] = []
+
+        # Add on-demand feature views to metadata
+        if hasattr(self, "on_demand_feature_views") and self.on_demand_feature_views:
+            metadata["on_demand_feature_views"] = [
+                odfv.name for odfv in self.on_demand_feature_views
+            ]
+        else:
+            metadata["on_demand_feature_views"] = []
+
+        # Wrap in FeastDataFrame with Arrow engine and metadata
+        return FeastDataFrame(
+            data=arrow_table, engine=DataFrameEngine.ARROW, metadata=metadata
+        )
+
     def to_arrow(
         self,
         validation_reference: Optional["ValidationReference"] = None,
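
Stores that do not override to_feast_df fall back to this base implementation: the result is materialized through to_arrow() and tagged with the Arrow engine, with feature names and ODFV names carried in metadata. A short sketch of what a caller sees on that path; everything used here appears in the diff above, but `job` itself is assumed:

    # Illustrative sketch; `job` is any RetrievalJob using the base implementation.
    feast_df = job.to_feast_df()

    assert feast_df.engine == DataFrameEngine.ARROW
    feature_names = feast_df.metadata["features"]              # may be []
    odfv_names = feast_df.metadata["on_demand_feature_views"]  # may be []
    pandas_df = feast_df.data.to_pandas()                      # pyarrow.Table -> pandas
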
diff --git a/sdk/python/tests/unit/test_retrieval_job_dataframe.py b/sdk/python/tests/unit/test_retrieval_job_dataframe.py
new file mode 100644
index 00000000000..9c8328a6251
--- /dev/null
+++ b/sdk/python/tests/unit/test_retrieval_job_dataframe.py
@@ -0,0 +1,169 @@
+"""Tests for RetrievalJob FeastDataFrame integration."""
+
+from typing import Optional
+from unittest.mock import Mock
+
+import pandas as pd
+import pyarrow as pa
+
+from feast.dataframe import DataFrameEngine, FeastDataFrame
+from feast.infra.offline_stores.offline_store import RetrievalJob
+
+
+class MockRetrievalJob(RetrievalJob):
+    """Mock RetrievalJob for testing."""
+
+    def __init__(
+        self, arrow_table: pa.Table, features: Optional[list] = None, odfvs: Optional[list] = None
+    ):
+        self.arrow_table = arrow_table
+        self.features = features or []
+        self.odfvs = odfvs or []
+
+    def _to_arrow_internal(self, timeout=None):
+        return self.arrow_table
+
+    @property
+    def full_feature_names(self):
+        return False
+
+    @property
+    def on_demand_feature_views(self):
+        return self.odfvs
+
+
+class TestRetrievalJobFeastDataFrame:
+    """Test RetrievalJob FeastDataFrame integration."""
+
+    def test_to_feast_df_basic(self):
+        """Test basic to_feast_df functionality."""
+        # Create test data
+        test_data = pa.table(
+            {
+                "feature1": [1, 2, 3],
+                "feature2": ["a", "b", "c"],
+                "timestamp": pd.to_datetime(["2023-01-01", "2023-01-02", "2023-01-03"]),
+            }
+        )
+
+        # Create mock retrieval job
+        job = MockRetrievalJob(test_data, features=["feature1", "feature2"])
+
+        # Test to_feast_df
+        feast_df = job.to_feast_df()
+
+        # Assertions
+        assert isinstance(feast_df, FeastDataFrame)
+        assert feast_df.engine == DataFrameEngine.ARROW
+        assert isinstance(feast_df.data, pa.Table)
+        assert feast_df.data.num_rows == 3
+        assert feast_df.data.num_columns == 3
+
+    def test_to_feast_df_metadata(self):
+        """Test to_feast_df metadata population."""
+        # Create test data
+        test_data = pa.table({"feature1": [1, 2, 3], "feature2": [4.0, 5.0, 6.0]})
+
+        # Create mock on-demand feature views
+        mock_odfv1 = Mock()
+        mock_odfv1.name = "odfv1"
+        # Mock transform_arrow to return an empty table (no new columns added)
+        mock_odfv1.transform_arrow.return_value = pa.table({})
+
+        mock_odfv2 = Mock()
+        mock_odfv2.name = "odfv2"
+        # Mock transform_arrow to return an empty table (no new columns added)
+        mock_odfv2.transform_arrow.return_value = pa.table({})
+
+        # Create mock retrieval job with features and ODFVs
+        job = MockRetrievalJob(
+            test_data, features=["feature1", "feature2"], odfvs=[mock_odfv1, mock_odfv2]
+        )
+
+        # Test to_feast_df
+        feast_df = job.to_feast_df()
+
+        # Check metadata
+        assert "features" in feast_df.metadata
+        assert "on_demand_feature_views" in feast_df.metadata
+        assert feast_df.metadata["features"] == ["feature1", "feature2"]
+        assert feast_df.metadata["on_demand_feature_views"] == ["odfv1", "odfv2"]
+
+    def test_to_feast_df_with_timeout(self):
+        """Test to_feast_df with timeout parameter."""
+        test_data = pa.table({"feature1": [1, 2, 3]})
+        job = MockRetrievalJob(test_data)
+
+        # Test with timeout - should not raise any errors
+        feast_df = job.to_feast_df(timeout=30)
+
+        assert isinstance(feast_df, FeastDataFrame)
+        assert feast_df.engine == DataFrameEngine.ARROW
+
+    def test_to_feast_df_empty_metadata(self):
+        """Test to_feast_df with empty features and ODFVs."""
+        test_data = pa.table({"feature1": [1, 2, 3]})
+        job = MockRetrievalJob(test_data)  # No features or ODFVs provided
+
+        feast_df = job.to_feast_df()
+
+        # Should handle missing features gracefully
+        assert feast_df.metadata["features"] == []
+        assert feast_df.metadata["on_demand_feature_views"] == []
+
+    def test_to_feast_df_preserves_arrow_data(self):
+        """Test that to_feast_df preserves the original Arrow data."""
+        # Create test data with specific types
+        test_data = pa.table(
+            {
+                "int_feature": pa.array([1, 2, 3], type=pa.int64()),
+                "float_feature": pa.array([1.1, 2.2, 3.3], type=pa.float64()),
+                "string_feature": pa.array(["a", "b", "c"], type=pa.string()),
+                "bool_feature": pa.array([True, False, True], type=pa.bool_()),
+            }
+        )
+
+        job = MockRetrievalJob(test_data)
+        feast_df = job.to_feast_df()
+
+        # Check that the Arrow data is exactly the same
+        assert feast_df.data.equals(test_data)
+        assert feast_df.data.schema == test_data.schema
+
+        # Check column names and types are preserved
+        assert feast_df.data.column_names == test_data.column_names
+        for i, column in enumerate(test_data.schema):
+            assert feast_df.data.schema.field(i).type == column.type
+
+    def test_to_df_still_works(self):
+        """Test that the original to_df method still works unchanged."""
+        test_data = pa.table({"feature1": [1, 2, 3], "feature2": ["a", "b", "c"]})
+
+        job = MockRetrievalJob(test_data)
+
+        # Test to_df returns pandas DataFrame
+        df = job.to_df()
+
+        assert isinstance(df, pd.DataFrame)
+        assert len(df) == 3
+        assert list(df.columns) == ["feature1", "feature2"]
+        assert df["feature1"].tolist() == [1, 2, 3]
+        assert df["feature2"].tolist() == ["a", "b", "c"]
+
+    def test_both_methods_return_same_data(self):
+        """Test that to_df and to_feast_df return equivalent data."""
+        test_data = pa.table(
+            {"feature1": [1, 2, 3, 4], "feature2": [10.5, 20.5, 30.5, 40.5]}
+        )
+
+        job = MockRetrievalJob(test_data)
+
+        # Get data from both methods
+        df = job.to_df()
+        feast_df = job.to_feast_df()
+
+        # Convert FeastDataFrame to pandas for comparison
+        feast_as_pandas = feast_df.data.to_pandas().reset_index(drop=True)
+
+        # Should be equivalent
+        pd.testing.assert_frame_equal(df, feast_as_pandas)
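
Because the engine travels with the FeastDataFrame itself, consumers can materialize results uniformly without knowing which offline store produced them. A hedged sketch of such a dispatcher, not part of the diff; it assumes only the three engines that appear in this patch (other DataFrameEngine members, if any, are not handled):

    # Illustrative sketch; covers only the engines introduced in this diff.
    import pandas as pd

    from feast.dataframe import DataFrameEngine, FeastDataFrame

    def to_pandas(feast_df: FeastDataFrame) -> pd.DataFrame:
        if feast_df.engine == DataFrameEngine.ARROW:
            return feast_df.data.to_pandas()  # pyarrow.Table
        if feast_df.engine == DataFrameEngine.SPARK:
            return feast_df.data.toPandas()   # pyspark.sql.DataFrame
        if feast_df.engine == DataFrameEngine.RAY:
            return feast_df.data.to_pandas()  # ray.data.Dataset
        raise ValueError(f"Unsupported engine: {feast_df.engine}")
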