diff --git a/docs/getting-started/architecture/feature-transformation.md b/docs/getting-started/architecture/feature-transformation.md
index 562a733fef0..419f6172746 100644
--- a/docs/getting-started/architecture/feature-transformation.md
+++ b/docs/getting-started/architecture/feature-transformation.md
@@ -18,4 +18,101 @@ when to use which transformation engine/communication pattern is extremely critical to
 the success of your implementation.
 
 In general, we recommend transformation engines and network calls to be chosen by aligning it with what is most
-appropriate for the data producer, feature/model usage, and overall product.
\ No newline at end of file
+appropriate for the data producer, feature/model usage, and overall product.
+
+
+## API
+### feature_transformation
+`feature_transformation` and `udf` are the core APIs for defining feature transformations in Feast. They allow you to specify custom logic that is applied to the data during materialization or retrieval. Examples include:
+
+```python
+def remove_extra_spaces(df: DataFrame) -> DataFrame:
+    df['name'] = df['name'].str.replace(r'\s+', ' ', regex=True)
+    return df
+
+spark_transformation = SparkTransformation(
+    mode=TransformationMode.SPARK,
+    udf=remove_extra_spaces,
+    udf_string="remove extra spaces",
+)
+feature_view = FeatureView(
+    feature_transformation=spark_transformation,
+    ...
+)
+```
+OR
+```python
+# In SPARK_SQL mode the udf is a SQL string; the exact query below is illustrative.
+remove_extra_spaces_sql = "SELECT *, regexp_replace(name, '\\s+', ' ') AS name FROM input"
+
+spark_transformation = Transformation(
+    mode=TransformationMode.SPARK_SQL,
+    udf=remove_extra_spaces_sql,
+    udf_string="remove extra spaces sql",
+)
+feature_view = FeatureView(
+    feature_transformation=spark_transformation,
+    ...
+)
+```
+OR
+```python
+import pandas as pd
+
+@transformation(mode=TransformationMode.SPARK)
+def remove_extra_spaces_udf(df: pd.DataFrame) -> pd.DataFrame:
+    return df.assign(name=df['name'].str.replace(r'\s+', ' ', regex=True))
+
+feature_view = FeatureView(
+    feature_transformation=remove_extra_spaces_udf,
+    ...
+)
+```
+
+### Aggregation
+`Aggregation` is a built-in API for defining batch or streaming aggregations on data. It allows you to specify how to aggregate a column over a time window, such as calculating the average or sum of a feature over a specified period. Examples include:
+```python
+from datetime import timedelta
+
+from feast import Aggregation
+
+feature_view = FeatureView(
+    aggregations=[
+        Aggregation(
+            column="amount",
+            function="sum"
+        ),
+        Aggregation(
+            column="amount",
+            function="avg",
+            time_window=timedelta(hours=1)
+        ),
+    ],
+    ...
+)
+```
+
+### Filter
+Filtering is controlled by `ttl`: the amount of time for which features remain available for materialization or retrieval. Only entity rows whose timestamps are greater than the current time minus the `ttl` are used to compute features, ensuring that only recent data is used in feature calculations. Examples include:
+
+```python
+from datetime import timedelta
+
+feature_view = FeatureView(
+    ttl=timedelta(days=1),  # Features will be available for 1 day
+    ...
+)
+```
+
+### Join
+Feast can join multiple feature views together to create a composite feature view. This allows you to combine features from different sources or views into a single view. Examples include:
+```python
+feature_view = FeatureView(
+    name="composite_feature_view",
+    entities=["entity_id"],
+    source=[
+        FeatureView(
+            name="feature_view_1",
+            features=["feature_1", "feature_2"],
+            ...
+        ),
+        FeatureView(
+            name="feature_view_2",
+            features=["feature_3", "feature_4"],
+            ...
+        )
+    ],
+    ...
+)
+```
+By default, the underlying join is an inner join, and the join key is the entity ID.
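+
+For intuition, the composite join behaves like an inner merge on the entity key. Below is a minimal, Feast-independent pandas sketch of those semantics (entity and feature names are illustrative only):
+```python
+import pandas as pd
+
+# Two "feature views" keyed by the same entity.
+fv1 = pd.DataFrame({"entity_id": [1, 2], "feature_1": [0.1, 0.2]})
+fv2 = pd.DataFrame({"entity_id": [2, 3], "feature_3": [7.0, 9.0]})
+
+# Inner join on the entity key: only entities present in both views remain.
+composite = fv1.merge(fv2, on="entity_id", how="inner")
+print(composite)  # a single row, for entity_id == 2
+```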
\ No newline at end of file
diff --git a/docs/getting-started/concepts/batch-feature-view.md b/docs/getting-started/concepts/batch-feature-view.md
new file mode 100644
index 00000000000..83a6f5698d7
--- /dev/null
+++ b/docs/getting-started/concepts/batch-feature-view.md
@@ -0,0 +1,148 @@
+# 🧬 BatchFeatureView in Feast
+
+`BatchFeatureView` is a flexible abstraction in Feast that allows users to define features derived from batch data sources or even other `FeatureView`s, enabling composable and reusable feature pipelines. It is an extension of the `FeatureView` class, with support for user-defined transformations, aggregations, and recursive chaining of feature logic.
+
+---
+
+## ✅ Key Capabilities
+
+- **Composable DAG of FeatureViews**: Supports defining a `BatchFeatureView` on top of one or more other `FeatureView`s.
+- **Transformations**: Apply [transformation](../../getting-started/architecture/feature-transformation.md) logic (`feature_transformation` or `udf`) to a raw data source; transformations can also combine multiple data sources.
+- **Aggregations**: Define time-windowed aggregations (e.g. `sum`, `avg`) over event-timestamped data.
+- **Feature resolution & execution**: Automatically resolves and executes DAGs of dependent views during materialization or retrieval. More details in the [Compute engine documentation](../../reference/compute-engine/README.md).
+- **Materialization Sink Customization**: Specify a custom `sink_source` to define where derived feature data should be persisted.
+
+---
+
+## 📝 Class Signature
+
+```python
+class BatchFeatureView(FeatureView):
+    def __init__(
+        self,
+        *,
+        name: str,
+        source: Union[DataSource, FeatureView, List[FeatureView]],
+        sink_source: Optional[DataSource] = None,
+        schema: Optional[List[Field]] = None,
+        entities: Optional[List[Entity]] = None,
+        aggregations: Optional[List[Aggregation]] = None,
+        udf: Optional[Callable[[DataFrame], DataFrame]] = None,
+        udf_string: Optional[str] = None,
+        ttl: Optional[timedelta] = timedelta(days=0),
+        online: bool = True,
+        offline: bool = False,
+        description: str = "",
+        tags: Optional[Dict[str, str]] = None,
+        owner: str = "",
+    ): ...
+```
+
+---
+
+## 🧠 Usage
+
+### 1. Simple Feature View from a Data Source
+
+```python
+from feast import BatchFeatureView, Field
+from feast.types import Float32, Int32
+from feast import FileSource
+from feast.aggregation import Aggregation
+from datetime import timedelta
+
+source = FileSource(
+    path="s3://bucket/path/data.parquet",
+    timestamp_field="event_timestamp",
+    created_timestamp_column="created",
+)
+
+driver_fv = BatchFeatureView(
+    name="driver_hourly_stats",
+    entities=["driver_id"],
+    schema=[
+        Field(name="driver_id", dtype=Int32),
+        Field(name="conv_rate", dtype=Float32),
+    ],
+    aggregations=[
+        Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1)),
+    ],
+    source=source,
+)
+```
+
+---
+
+### 2. Derived Feature View from Another View
+You can build feature views on top of other feature views, deriving new features from existing ones. Let's take a look at an example.
+```python
+from feast import BatchFeatureView, Field
+from pyspark.sql import DataFrame
+from feast.types import Float32, Int32
+from feast import FileSource
+from feast.data_format import ParquetFormat
+
+def transform(df: DataFrame) -> DataFrame:
+    return df.withColumn("conv_rate", df["conv_rate"] * 2)
+
+daily_driver_stats = BatchFeatureView(
+    name="daily_driver_stats",
+    entities=["driver_id"],
+    schema=[
+        Field(name="driver_id", dtype=Int32),
+        Field(name="conv_rate", dtype=Float32),
+    ],
+    udf=transform,
+    source=driver_fv,
+    sink_source=FileSource(  # Required to specify where to sink the derived view
+        name="daily_driver_stats_sink",
+        path="s3://bucket/daily_stats/",
+        file_format=ParquetFormat(),
+        timestamp_field="event_timestamp",
+        created_timestamp_column="created",
+    ),
+)
+```
+
+---
+
+## 🔄 Execution Flow
+
+Feast automatically resolves the DAG of `BatchFeatureView` dependencies during:
+
+- `materialize()`: recursively resolves and executes the feature view graph.
+- `get_historical_features()`: builds the execution plan for retrieving point-in-time correct features.
+- `apply()`: registers the feature view DAG structure to the registry.
+
+Each transformation and aggregation is turned into a DAG node (e.g., `SparkTransformationNode`, `SparkAggregationNode`) executed by the compute engine (e.g., `SparkComputeEngine`).
+
+---
+
+## ⚙️ How Materialization Works
+
+- If the `BatchFeatureView` is backed by a base source (`FileSource`, `BigQuerySource`, `SparkSource`, etc.), the `batch_source` is used directly.
+- If the source is another feature view (i.e., chained views), a `sink_source` must be provided to define the materialization target data source.
+- During DAG planning, `SparkWriteNode` uses the `sink_source` as the batch sink.
+
+---
+
+## 🧪 Example Tests
+
+See:
+
+- `test_spark_dag_materialize_recursive_view()`: Validates chaining of two feature views and output validation.
+- `test_spark_compute_engine_materialize()`: Validates transformation and write of features into offline and online stores.
+
+---
+
+## 🛑 Gotchas
+
+- `sink_source` is **required** when chaining views (i.e., when `source` is another FeatureView or a list of them).
+- Schema fields must be consistent with the `sink_source` schema (and with `batch_source.field_mapping` when field mappings exist).
+- Aggregation logic must reference columns present in the raw source or transformed inputs.
+
+---
+
+## 🔮 Future Directions
+
+- Support additional offline stores (e.g., Snowflake, Redshift) with auto-generated sink sources.
+- Enable fully declarative transform logic (SQL + UDF mix).
+- Introduce optimization passes for DAG pruning and fusion.
diff --git a/docs/reference/compute-engine/README.md b/docs/reference/compute-engine/README.md
index 75f29890046..40f7e6c5246 100644
--- a/docs/reference/compute-engine/README.md
+++ b/docs/reference/compute-engine/README.md
@@ -13,16 +13,39 @@ This system builds and executes DAGs (Directed Acyclic Graphs) of typed operations
 
 ## 🧠 Core Concepts
 
-| Component | Description |
-|--------------------|----------------------------------------------------------------------|
-| `ComputeEngine` | Interface for executing materialization and retrieval tasks |
-| `FeatureBuilder` | Constructs a DAG from Feature View definition for a specific backend |
-| `DAGNode` | Represents a logical operation (read, aggregate, join, etc.) |
-| `ExecutionPlan` | Executes nodes in dependency order and stores intermediate outputs |
-| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs |
+| Component          | Description                                                            | API                                                                                                                |
+|--------------------|------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------|
+| `ComputeEngine`    | Interface for executing materialization and retrieval tasks             | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/base.py)              |
+| `FeatureBuilder`   | Constructs a DAG from a FeatureView definition for a specific backend   | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_builder.py)   |
+| `FeatureResolver`  | Resolves the feature DAG in topological order for execution             | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_resolver.py)  |
+| `DAG`              | Represents a logical DAG operation (read, aggregate, join, etc.)        | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md)        |
+| `ExecutionPlan`    | Executes nodes in dependency order and stores intermediate outputs      | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md)        |
+| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs           | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md)        |
 
 ---
 
+## Feature resolver and builder
+The `FeatureBuilder` initializes a `FeatureResolver` that extracts a DAG from the `FeatureView` definitions, resolving dependencies and ensuring the correct execution order. \
+A `FeatureView` represents a logical data source, while a `DataSource` represents the physical data source (e.g., BigQuery, Spark, etc.). \
+When defining a `FeatureView`, the source can be a physical `DataSource`, a derived `FeatureView`, or a list of `FeatureView`s.
+The `FeatureResolver` walks the `FeatureView` sources, topologically sorts the DAG nodes by their dependencies, and returns a head node that represents the final output of the DAG. \
+Subsequently, the `FeatureBuilder` builds the execution DAG from the resolved head node, creating a `DAGNode` for each operation (read, join, filter, aggregate, etc.).
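+
+For orientation, here is a hedged sketch of two chained view definitions that could yield a DAG like the example below (paths, fields, and windows are assumptions, following the `BatchFeatureView` examples elsewhere in this PR):
+```python
+from datetime import timedelta
+
+from feast import BatchFeatureView, Field, FileSource
+from feast.aggregation import Aggregation
+from feast.types import Float32, Int32
+
+raw_source = FileSource(
+    path="s3://bucket/driver_events.parquet",
+    timestamp_field="event_timestamp",
+)
+
+# Leaf view: reads the raw source and aggregates hourly.
+hourly_driver_stats = BatchFeatureView(
+    name="hourly_driver_stats",
+    entities=["driver_id"],
+    schema=[
+        Field(name="driver_id", dtype=Int32),
+        Field(name="conv_rate", dtype=Float32),
+    ],
+    aggregations=[
+        Aggregation(column="conv_rate", function="avg", time_window=timedelta(hours=1)),
+    ],
+    source=raw_source,
+)
+
+# Derived view: consumes the hourly view, so its Transform/Filter/Agg nodes
+# sit above the hourly view's nodes in the resolved DAG.
+daily_driver_stats = BatchFeatureView(
+    name="daily_driver_stats",
+    entities=["driver_id"],
+    schema=[
+        Field(name="driver_id", dtype=Int32),
+        Field(name="conv_rate", dtype=Float32),
+    ],
+    aggregations=[
+        Aggregation(column="conv_rate", function="avg", time_window=timedelta(days=1)),
+    ],
+    source=hourly_driver_stats,
+    sink_source=FileSource(
+        name="daily_driver_stats_sink",
+        path="s3://bucket/daily_driver_stats/",
+        timestamp_field="event_timestamp",
+    ),
+)
+```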
+An example of the DAG built by the `FeatureBuilder`:
+```markdown
+- Output(Agg(daily_driver_stats))
+  - Agg(daily_driver_stats)
+    - Filter(daily_driver_stats)
+      - Transform(daily_driver_stats)
+        - Agg(hourly_driver_stats)
+          - Filter(hourly_driver_stats)
+            - Transform(hourly_driver_stats)
+              - Source(hourly_driver_stats)
+```
+
+## Diagram
+![feature_dag.png](feature_dag.png)
+
 
 ## ✨ Available Engines
 
 ### 🔥 SparkComputeEngine
 
@@ -44,7 +67,7 @@ This system builds and executes DAGs (Directed Acyclic Graphs) of typed operations
 SourceReadNode
    |
    v
-JoinNode (Only for get_historical_features with entity df)
+TransformationNode (If feature_transformation is defined)
+   |
+   v
+JoinNode (default behavior for multiple sources)
    |
    v
 FilterNode (Always included; applies TTL or user-defined filters)
    |
    v
 AggregationNode (If aggregations are defined in FeatureView)
    |
    v
 DeduplicationNode (If no aggregation is defined for get_historical_features)
    |
    v
-TransformationNode (If feature_transformation is defined)
-   |
-   v
 ValidationNode (If enable_validation = True)
    |
    v
@@ -79,20 +99,54 @@ To create your own compute engine:
 
 ```python
 from feast.infra.compute_engines.base import ComputeEngine
-from feast.infra.materialization.batch_materialization_engine import MaterializationTask, MaterializationJob
-from feast.infra.compute_engines.tasks import HistoricalRetrievalTask
+from typing import Sequence, Union
+from feast.batch_feature_view import BatchFeatureView
+from feast.entity import Entity
+from feast.feature_view import FeatureView
+from feast.infra.common.materialization_job import (
+    MaterializationJob,
+    MaterializationTask,
+)
+from feast.infra.common.retrieval_task import HistoricalRetrievalTask
+from feast.infra.offline_stores.offline_store import RetrievalJob
+from feast.infra.registry.base_registry import BaseRegistry
+from feast.on_demand_feature_view import OnDemandFeatureView
+from feast.stream_feature_view import StreamFeatureView
+
+
 class MyComputeEngine(ComputeEngine):
-    def materialize(self, task: MaterializationTask) -> MaterializationJob:
+    def update(
+        self,
+        project: str,
+        views_to_delete: Sequence[
+            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+        ],
+        views_to_keep: Sequence[
+            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
+        ],
+        entities_to_delete: Sequence[Entity],
+        entities_to_keep: Sequence[Entity],
+    ):
+        ...
+
+    def _materialize_one(
+        self,
+        registry: BaseRegistry,
+        task: MaterializationTask,
+        **kwargs,
+    ) -> MaterializationJob:
         ...
 
     def get_historical_features(self, task: HistoricalRetrievalTask) -> RetrievalJob:
         ...
+
 ```
 
 2. Create a FeatureBuilder
 
 ```python
 from feast.infra.compute_engines.feature_builder import FeatureBuilder
 
+
 class CustomFeatureBuilder(FeatureBuilder):
     def build_source_node(self): ...
     def build_aggregation_node(self, input_node): ...
     def build_join_node(self, input_node): ...
     def build_filter_node(self, input_node): ...
     def build_dedup_node(self, input_node): ...
     def build_transformation_node(self, input_node): ...
     def build_output_nodes(self, input_node): ...
+    def build_validation_node(self, input_node): ...
 ```
 
 3. 
Define DAGNode subclasses @@ -114,7 +169,7 @@ class CustomFeatureBuilder(FeatureBuilder): ## ๐Ÿšง Roadmap - [x] Modular, backend-agnostic DAG execution framework - [x] Spark engine with native support for materialization + PIT joins -- [ ] PyArrow + Pandas engine for local compute -- [ ] Native multi-feature-view DAG optimization +- [x] PyArrow + Pandas engine for local compute +- [x] Native multi-feature-view DAG optimization - [ ] DAG validation, metrics, and debug output - [ ] Scalable distributed backend via Ray or Polars diff --git a/docs/reference/compute-engine/feature_dag.png b/docs/reference/compute-engine/feature_dag.png new file mode 100644 index 00000000000..93738eb3575 Binary files /dev/null and b/docs/reference/compute-engine/feature_dag.png differ diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index a82e2c78b13..b77737a8bd5 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -258,11 +258,14 @@ def __copy__(self): fv = FeatureView( name=self.name, ttl=self.ttl, - source=self.stream_source if self.stream_source else self.batch_source, + source=self.source_views + if self.source_views + else (self.stream_source if self.stream_source else self.batch_source), schema=self.schema, tags=self.tags, online=self.online, offline=self.offline, + sink_source=self.batch_source if self.source_views else None, ) # This is deliberately set outside of the FV initialization as we do not have the Entity objects. @@ -289,6 +292,7 @@ def __eq__(self, other): or self.batch_source != other.batch_source or self.stream_source != other.stream_source or sorted(self.entity_columns) != sorted(other.entity_columns) + or self.source_views != other.source_views or self.materialization_intervals != other.materialization_intervals ): return False @@ -372,7 +376,29 @@ def to_proto(self) -> FeatureViewProto: Returns: A FeatureViewProto protobuf. 
""" + return self._to_proto_internal(seen={}) + + def _to_proto_internal( + self, seen: Dict[str, Union[None, FeatureViewProto]] + ) -> FeatureViewProto: + if self.name in seen: + if seen[self.name] is None: + raise ValueError( + f"Cycle detected during serialization of FeatureView: {self.name}" + ) + return seen[self.name] # type: ignore[return-value] + + seen[self.name] = None + + spec = self.to_proto_spec(seen) meta = self.to_proto_meta() + proto = FeatureViewProto(spec=spec, meta=meta) + seen[self.name] = proto + return proto + + def to_proto_spec( + self, seen: Dict[str, Union[None, FeatureViewProto]] + ) -> FeatureViewSpecProto: ttl_duration = self.get_ttl_duration() batch_source_proto = None @@ -386,8 +412,10 @@ def to_proto(self) -> FeatureViewProto: stream_source_proto.data_source_class_type = f"{self.stream_source.__class__.__module__}.{self.stream_source.__class__.__name__}" source_view_protos = None if self.source_views: - source_view_protos = [view.to_proto().spec for view in self.source_views] - spec = FeatureViewSpecProto( + source_view_protos = [ + view._to_proto_internal(seen).spec for view in self.source_views + ] + return FeatureViewSpecProto( name=self.name, entities=self.entities, entity_columns=[field.to_proto() for field in self.entity_columns], @@ -403,8 +431,6 @@ def to_proto(self) -> FeatureViewProto: source_views=source_view_protos, ) - return FeatureViewProto(spec=spec, meta=meta) - def to_proto_meta(self): meta = FeatureViewMetaProto(materialization_intervals=[]) if self.created_timestamp: @@ -426,16 +452,35 @@ def get_ttl_duration(self): return ttl_duration @classmethod - def from_proto(cls, feature_view_proto: FeatureViewProto): + def from_proto(cls, feature_view_proto: FeatureViewProto) -> "FeatureView": + return cls._from_proto_internal(feature_view_proto, seen={}) + + @classmethod + def _from_proto_internal( + cls, + feature_view_proto: FeatureViewProto, + seen: Dict[str, Union[None, "FeatureView"]], + ) -> "FeatureView": """ Creates a feature view from a protobuf representation of a feature view. Args: feature_view_proto: A protobuf representation of a feature view. + seen: A dictionary to keep track of already seen feature views to avoid recursion. Returns: A FeatureViewProto object based on the feature view protobuf. 
""" + feature_view_name = feature_view_proto.spec.name + + if feature_view_name in seen: + if seen[feature_view_name] is None: + raise ValueError( + f"Cycle detected while deserializing FeatureView: {feature_view_name}" + ) + return seen[feature_view_name] # type: ignore[return-value] + seen[feature_view_name] = None + batch_source = ( DataSource.from_proto(feature_view_proto.spec.batch_source) if feature_view_proto.spec.HasField("batch_source") @@ -447,7 +492,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): else None ) source_views = [ - FeatureView.from_proto(FeatureViewProto(spec=view_spec, meta=None)) + FeatureView._from_proto_internal( + FeatureViewProto(spec=view_spec, meta=None), seen + ) for view_spec in feature_view_proto.spec.source_views ] @@ -510,6 +557,7 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): ) ) + seen[feature_view_name] = feature_view return feature_view @property diff --git a/sdk/python/feast/infra/compute_engines/algorithms/topo.py b/sdk/python/feast/infra/compute_engines/algorithms/topo.py index 8b2e7aebc13..b2c098405e1 100644 --- a/sdk/python/feast/infra/compute_engines/algorithms/topo.py +++ b/sdk/python/feast/infra/compute_engines/algorithms/topo.py @@ -3,7 +3,7 @@ from feast.infra.compute_engines.dag.node import DAGNode -def topo_sort(root: DAGNode) -> List[DAGNode]: +def topological_sort(root: DAGNode) -> List[DAGNode]: """ Topologically sort a DAG starting from a single root node. @@ -13,10 +13,10 @@ def topo_sort(root: DAGNode) -> List[DAGNode]: Returns: A list of DAGNodes in topological order (dependencies first). """ - return topo_sort_multiple([root]) + return topological_sort_multiple([root]) -def topo_sort_multiple(roots: List[DAGNode]) -> List[DAGNode]: +def topological_sort_multiple(roots: List[DAGNode]) -> List[DAGNode]: """ Topologically sort a DAG with multiple roots. diff --git a/sdk/python/feast/infra/compute_engines/dag/README.md b/sdk/python/feast/infra/compute_engines/dag/README.md new file mode 100644 index 00000000000..ff1bed14e5f --- /dev/null +++ b/sdk/python/feast/infra/compute_engines/dag/README.md @@ -0,0 +1,9 @@ +# DAG +Directed Acyclic Graph (DAG) representation for feature engineering workflows in Feast. This module is designed to remain abstracted and independent of Feast feature view business logic. + +Core concepts include: +- [context](./context.py): ExecutionContext for managing feature view execution. +- [node](./node.py): DAGNode for representing operations in the DAG. +- [value](./value.py): DAGValue class for handling data passed between nodes. +- [model](./model.py): Model class for defining the dag data classes such as DAGFormat. +- [plan](./plan.py): ExecutionPlan for executing the DAG. 
diff --git a/sdk/python/feast/infra/compute_engines/feature_builder.py b/sdk/python/feast/infra/compute_engines/feature_builder.py index 26f3703c794..fbea3a85d26 100644 --- a/sdk/python/feast/infra/compute_engines/feature_builder.py +++ b/sdk/python/feast/infra/compute_engines/feature_builder.py @@ -4,7 +4,7 @@ from feast import BatchFeatureView, FeatureView, StreamFeatureView from feast.infra.common.materialization_job import MaterializationTask from feast.infra.common.retrieval_task import HistoricalRetrievalTask -from feast.infra.compute_engines.algorithms.topo import topo_sort +from feast.infra.compute_engines.algorithms.topo import topological_sort from feast.infra.compute_engines.dag.context import ColumnInfo from feast.infra.compute_engines.dag.node import DAGNode from feast.infra.compute_engines.dag.plan import ExecutionPlan @@ -115,7 +115,7 @@ def _build(self, view, input_nodes: Optional[List[DAGNode]]) -> DAGNode: def build(self) -> ExecutionPlan: # Step 1: Topo sort the FeatureViewNode DAG (Logical DAG) - logical_nodes = self.feature_resolver.topo_sort(self.dag_root) + logical_nodes = self.feature_resolver.topological_sort(self.dag_root) # Step 2: For each FeatureView, build its corresponding execution DAGNode view_to_node: Dict[str, DAGNode] = {} @@ -136,7 +136,7 @@ def build(self) -> ExecutionPlan: ) # Step 4: Topo sort the final DAG from the output node (Physical DAG) - sorted_nodes = topo_sort(final_node) + sorted_nodes = topological_sort(final_node) # Step 5: Return sorted execution plan return ExecutionPlan(sorted_nodes) diff --git a/sdk/python/feast/infra/compute_engines/feature_resolver.py b/sdk/python/feast/infra/compute_engines/feature_resolver.py index ae2f505c1d7..a22e07bf1f7 100644 --- a/sdk/python/feast/infra/compute_engines/feature_resolver.py +++ b/sdk/python/feast/infra/compute_engines/feature_resolver.py @@ -1,7 +1,7 @@ from typing import List, Optional, Set from feast.feature_view import FeatureView -from feast.infra.compute_engines.algorithms.topo import topo_sort +from feast.infra.compute_engines.algorithms.topo import topological_sort from feast.infra.compute_engines.dag.context import ExecutionContext from feast.infra.compute_engines.dag.node import DAGNode from feast.infra.compute_engines.dag.value import DAGValue @@ -78,8 +78,8 @@ def _walk(self, view: FeatureView): return node - def topo_sort(self, root: FeatureViewNode) -> List[FeatureViewNode]: - return topo_sort(root) # type: ignore + def topological_sort(self, root: FeatureViewNode) -> List[FeatureViewNode]: + return topological_sort(root) # type: ignore def debug_dag(self, node: FeatureViewNode, depth=0): """ diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 79db6334ffd..17d853fe2bc 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -38,6 +38,7 @@ from feast.value_type import ValueType warnings.simplefilter("once", DeprecationWarning) +OnDemandSourceType = Union[FeatureView, FeatureViewProjection, RequestSource] @typechecked @@ -82,13 +83,7 @@ def __init__( # noqa: C901 name: str, entities: Optional[List[Entity]] = None, schema: Optional[List[Field]] = None, - sources: List[ - Union[ - FeatureView, - RequestSource, - FeatureViewProjection, - ] - ], + sources: List[OnDemandSourceType], udf: Optional[FunctionType] = None, udf_string: Optional[str] = "", feature_transformation: Optional[Transformation] = None, @@ -468,7 +463,7 @@ def from_proto( ) for feature in 
on_demand_feature_view_proto.spec.features ], - sources=sources, + sources=cast(List[OnDemandSourceType], sources), feature_transformation=transformation, mode=on_demand_feature_view_proto.spec.mode or "pandas", description=on_demand_feature_view_proto.spec.description, diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index 911c94ff34c..b6632e7f178 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -10,6 +10,13 @@ from feast.feature_view import FeatureView from feast.field import Field from feast.infra.offline_stores.file_source import FileSource +from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto +from feast.protos.feast.core.FeatureView_pb2 import ( + FeatureViewMeta as FeatureViewMetaProto, +) +from feast.protos.feast.core.FeatureView_pb2 import ( + FeatureViewSpec as FeatureViewSpecProto, +) from feast.protos.feast.types.Value_pb2 import ValueType from feast.types import Float32 from feast.utils import _utc_now, make_tzaware @@ -192,3 +199,102 @@ def test_update_materialization_intervals(): second_updated_feature_view.materialization_intervals[0][1] == updated_feature_view.materialization_intervals[0][1] ) + + +def test_create_feature_view_with_chained_views(): + file_source = FileSource(name="my-file-source", path="test.parquet") + sink_source = FileSource(name="my-sink-source", path="sink.parquet") + feature_view_1 = FeatureView( + name="my-feature-view-1", + entities=[], + schema=[Field(name="feature1", dtype=Float32)], + source=file_source, + ) + feature_view_2 = FeatureView( + name="my-feature-view-2", + entities=[], + schema=[Field(name="feature2", dtype=Float32)], + source=feature_view_1, + sink_source=sink_source, + ) + feature_view_3 = FeatureView( + name="my-feature-view-3", + entities=[], + schema=[Field(name="feature3", dtype=Float32)], + source=[feature_view_1, feature_view_2], + sink_source=sink_source, + ) + + assert feature_view_2.name == "my-feature-view-2" + assert feature_view_2.schema == [Field(name="feature2", dtype=Float32)] + assert feature_view_2.batch_source == sink_source + assert feature_view_2.source_views == [feature_view_1] + + assert feature_view_3.name == "my-feature-view-3" + assert feature_view_3.schema == [Field(name="feature3", dtype=Float32)] + assert feature_view_3.batch_source == sink_source + assert feature_view_3.source_views == [feature_view_1, feature_view_2] + + +def test_feature_view_to_proto_with_cycle(): + fv_a = FeatureView( + name="fv_a", + schema=[Field(name="feature1", dtype=Float32)], + source=FileSource(name="source_a", path="source_a.parquet"), + ttl=timedelta(days=1), + entities=[], + ) + + fv_b = FeatureView( + name="fv_b", + schema=[Field(name="feature1", dtype=Float32)], + source=[fv_a], + ttl=timedelta(days=1), + entities=[], + sink_source=FileSource(name="sink_source_b", path="sink_b.parquet"), + ) + + fv_a = FeatureView( + name="fv_a", + schema=[Field(name="feature1", dtype=Float32)], + source=[fv_b], + ttl=timedelta(days=1), + entities=[], + sink_source=FileSource(name="sink_source_a", path="sink_a.parquet"), + ) + with pytest.raises( + ValueError, match="Cycle detected during serialization of FeatureView: fv_a" + ): + fv_a.to_proto() + + +def test_feature_view_from_proto_with_cycle(): + # Create spec_a + spec_a = FeatureViewSpecProto() + spec_a.name = "fv_a" + spec_a.entities.append("entity_id") + spec_a.features.append(Field(name="a", dtype=Float32).to_proto()) + 
spec_a.batch_source.CopyFrom( + FileSource(name="source_a", path="source_a.parquet").to_proto() + ) + + # Create spec_b + spec_b = FeatureViewSpecProto() + spec_b.name = "fv_b" + spec_b.entities.append("entity_id") + spec_b.features.append(Field(name="b", dtype=Float32).to_proto()) + spec_b.batch_source.CopyFrom( + FileSource(name="source_b", path="source_b.parquet").to_proto() + ) + + # Create the cycle: A โ†’ B โ†’ A + spec_b.source_views.append(spec_a) + spec_a.source_views.append(spec_b) + + # Trigger deserialization + proto_a = FeatureViewProto(spec=spec_a, meta=FeatureViewMetaProto()) + + with pytest.raises( + ValueError, match="Cycle detected while deserializing FeatureView: fv_a" + ): + FeatureView.from_proto(proto_a)