diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index a2c59360908..6b6154ed270 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -11,6 +11,7 @@ from .batch_feature_view import BatchFeatureView from .data_source import KafkaSource, KinesisSource, PushSource, RequestSource +from .dataframe import DataFrameEngine, FeastDataFrame from .entity import Entity from .feature import Feature from .feature_service import FeatureService @@ -32,9 +33,11 @@ __all__ = [ "BatchFeatureView", + "DataFrameEngine", "Entity", "KafkaSource", "KinesisSource", + "FeastDataFrame", "Feature", "Field", "FeatureService", diff --git a/sdk/python/feast/dataframe.py b/sdk/python/feast/dataframe.py new file mode 100644 index 00000000000..0a54a11c232 --- /dev/null +++ b/sdk/python/feast/dataframe.py @@ -0,0 +1,96 @@ +"""FeastDataFrame: A lightweight container for DataFrame-like objects in Feast.""" + +from enum import Enum +from typing import Any, Dict, Optional + +import pandas as pd +import pyarrow as pa + + +class DataFrameEngine(str, Enum): + """Supported DataFrame engines.""" + + PANDAS = "pandas" + SPARK = "spark" + DASK = "dask" + RAY = "ray" + ARROW = "arrow" + POLARS = "polars" + UNKNOWN = "unknown" + + +class FeastDataFrame: + """ + A lightweight container for DataFrame-like objects in Feast. + + This class wraps any DataFrame implementation and provides metadata + about the engine type for proper routing in Feast's processing pipeline. + """ + + def __init__( + self, + data: Any, + engine: Optional[DataFrameEngine] = None, + metadata: Optional[Dict[str, Any]] = None, + ): + """ + Initialize a FeastDataFrame. + + Args: + data: The wrapped DataFrame object (pandas, Spark, Dask, etc.) + engine: Explicitly specify the engine type (auto-detected if None) + metadata: Additional metadata (schema hints, etc.) + """ + self.data = data + self.metadata = metadata or {} + + # Detect the actual engine from the data + detected_engine = self._detect_engine() + + if engine is not None: + # Validate that the provided engine matches the detected engine + if engine != detected_engine: + raise ValueError( + f"Provided engine '{engine.value}' does not match detected engine '{detected_engine.value}' " + f"for data type {type(data).__name__}" + ) + self._engine = engine + else: + self._engine = detected_engine + + def _detect_engine(self) -> DataFrameEngine: + """Auto-detect the DataFrame engine based on type.""" + if isinstance(self.data, pd.DataFrame): + return DataFrameEngine.PANDAS + elif isinstance(self.data, pa.Table): + return DataFrameEngine.ARROW + + # For optional dependencies, check module name to avoid import errors + module = type(self.data).__module__ + if "pyspark" in module: + return DataFrameEngine.SPARK + elif "dask" in module: + return DataFrameEngine.DASK + elif "ray" in module: + return DataFrameEngine.RAY + elif "polars" in module: + return DataFrameEngine.POLARS + else: + return DataFrameEngine.UNKNOWN + + @property + def engine(self) -> DataFrameEngine: + """Get the detected or specified engine type.""" + return self._engine + + def __repr__(self): + return f"FeastDataFrame(engine={self.engine.value}, type={type(self.data).__name__})" + + @property + def is_lazy(self) -> bool: + """Check if the underlying DataFrame is lazy (Spark, Dask, Ray).""" + return self.engine in [ + DataFrameEngine.SPARK, + DataFrameEngine.DASK, + DataFrameEngine.RAY, + ] diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index 967c507c6a0..2491a28badc 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Any, Dict, List, Union +from typing import TYPE_CHECKING, Any, Dict, List, TypeAlias, Union import pandas as pd import pyarrow as pa @@ -25,9 +25,9 @@ if TYPE_CHECKING: import torch - TorchTensor = torch.Tensor + TorchTensor: TypeAlias = torch.Tensor else: - TorchTensor = Any + TorchTensor: TypeAlias = Any TIMESTAMP_POSTFIX: str = "__ts" diff --git a/sdk/python/tests/unit/test_dataframe.py b/sdk/python/tests/unit/test_dataframe.py new file mode 100644 index 00000000000..959df98c899 --- /dev/null +++ b/sdk/python/tests/unit/test_dataframe.py @@ -0,0 +1,139 @@ +"""Unit tests for FeastDataFrame.""" + +import pandas as pd +import pyarrow as pa +import pytest + +from feast.dataframe import DataFrameEngine, FeastDataFrame + + +class TestFeastDataFrame: + """Test suite for FeastDataFrame functionality.""" + + def test_pandas_detection(self): + """Test auto-detection of pandas DataFrame.""" + df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}) + feast_df = FeastDataFrame(df) + + assert feast_df.engine == DataFrameEngine.PANDAS + assert not feast_df.is_lazy + assert isinstance(feast_df.data, pd.DataFrame) + + def test_arrow_detection(self): + """Test auto-detection of Arrow Table.""" + table = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]}) + feast_df = FeastDataFrame(table) + + assert feast_df.engine == DataFrameEngine.ARROW + assert not feast_df.is_lazy + assert isinstance(feast_df.data, pa.Table) + + def test_explicit_engine(self): + """Test explicit engine specification with unknown data.""" + data = {"mock": "data"} + feast_df = FeastDataFrame(data, engine=DataFrameEngine.UNKNOWN) + + assert feast_df.engine == DataFrameEngine.UNKNOWN + assert not feast_df.is_lazy + + def test_unknown_engine(self): + """Test handling of unknown DataFrame types.""" + data = {"some": "dict"} + feast_df = FeastDataFrame(data) + + assert feast_df.engine == DataFrameEngine.UNKNOWN + + def test_metadata(self): + """Test metadata handling.""" + df = pd.DataFrame({"a": [1, 2, 3]}) + metadata = {"features": ["a"], "source": "test"} + feast_df = FeastDataFrame(df, metadata=metadata) + + assert feast_df.metadata == metadata + assert feast_df.metadata["features"] == ["a"] + + def test_repr(self): + """Test string representation.""" + df = pd.DataFrame({"a": [1, 2, 3]}) + feast_df = FeastDataFrame(df) + + repr_str = repr(feast_df) + assert "FeastDataFrame" in repr_str + assert "engine=pandas" in repr_str + assert "DataFrame" in repr_str + + def test_is_lazy_property(self): + """Test is_lazy property for different engines.""" + # Test with pandas DataFrame (not lazy) + df = pd.DataFrame({"a": [1, 2, 3]}) + feast_df = FeastDataFrame(df) + assert not feast_df.is_lazy + + # Test with Arrow table (not lazy) + table = pa.table({"a": [1, 2, 3]}) + feast_df = FeastDataFrame(table) + assert not feast_df.is_lazy + + # Test with unknown data type (not lazy) + unknown_data = {"mock": "data"} + feast_df = FeastDataFrame(unknown_data) + assert not feast_df.is_lazy + + # Test explicit lazy engines (using unknown data to avoid type validation) + for lazy_engine in [ + DataFrameEngine.SPARK, + DataFrameEngine.DASK, + DataFrameEngine.RAY, + ]: + feast_df = FeastDataFrame(unknown_data, engine=DataFrameEngine.UNKNOWN) + feast_df._engine = lazy_engine # Override for testing + assert feast_df.is_lazy + + def test_polars_detection(self): + """Test detection of polars DataFrame (using mock).""" + + # Mock polars DataFrame + class MockPolarsDF: + __module__ = "polars.dataframe.frame" + + def __init__(self): + pass + + polars_df = MockPolarsDF() + feast_df = FeastDataFrame(polars_df) + + assert feast_df.engine == DataFrameEngine.POLARS + assert not feast_df.is_lazy + + def test_engine_validation_valid(self): + """Test that providing a correct engine passes validation.""" + df = pd.DataFrame({"a": [1, 2, 3]}) + feast_df = FeastDataFrame(df, engine=DataFrameEngine.PANDAS) + + assert feast_df.engine == DataFrameEngine.PANDAS + assert isinstance(feast_df.data, pd.DataFrame) + + def test_engine_validation_invalid(self): + """Test that providing an incorrect engine raises ValueError.""" + df = pd.DataFrame({"a": [1, 2, 3]}) + + with pytest.raises( + ValueError, + match="Provided engine 'spark' does not match detected engine 'pandas'", + ): + FeastDataFrame(df, engine=DataFrameEngine.SPARK) + + def test_engine_validation_arrow(self): + """Test engine validation with Arrow table.""" + table = pa.table({"a": [1, 2, 3]}) + + # Valid case + feast_df = FeastDataFrame(table, engine=DataFrameEngine.ARROW) + assert feast_df.engine == DataFrameEngine.ARROW + + # Invalid case + with pytest.raises( + ValueError, + match="Provided engine 'pandas' does not match detected engine 'arrow'", + ): + FeastDataFrame(table, engine=DataFrameEngine.PANDAS)