diff --git a/docs/getting-started/concepts/README.md b/docs/getting-started/concepts/README.md
index 13722a8a7da..47ef8553781 100644
--- a/docs/getting-started/concepts/README.md
+++ b/docs/getting-started/concepts/README.md
@@ -28,6 +28,10 @@
 [stream-feature-view.md](stream-feature-view.md)
 {% endcontent-ref %}
 
+{% content-ref url="tiling.md" %}
+[tiling.md](tiling.md)
+{% endcontent-ref %}
+
 {% content-ref url="feature-retrieval.md" %}
 [feature-retrieval.md](feature-retrieval.md)
 {% endcontent-ref %}
diff --git a/docs/getting-started/concepts/stream-feature-view.md b/docs/getting-started/concepts/stream-feature-view.md
index e9b08473b45..2af840237f9 100644
--- a/docs/getting-started/concepts/stream-feature-view.md
+++ b/docs/getting-started/concepts/stream-feature-view.md
@@ -13,4 +13,6 @@
 - **Aggregations**: Define time-windowed aggregations (e.g., `sum`, `avg`) over event-timestamped data.
 
+- **⚡ Tiling with Intermediate Representations**: Enable efficient pre-aggregation with correct merging semantics for holistic aggregations like `avg` and `std`. This provides faster queries while maintaining mathematical accuracy. [Learn more about tiling](tiling.md)
+
 - **Feature resolution & execution**: Automatically resolves and executes dependent views during materialization or retrieval.
diff --git a/docs/getting-started/concepts/tiling.md b/docs/getting-started/concepts/tiling.md
new file mode 100644
index 00000000000..5f9d225baa0
--- /dev/null
+++ b/docs/getting-started/concepts/tiling.md
@@ -0,0 +1,285 @@
+# Tiling with Intermediate Representations in Feast
+
+## Overview
+
+**Tiling** is an optimization technique for **streaming time-windowed aggregations** that makes feature computation far more efficient by pre-aggregating data into smaller time intervals (tiles) and storing **Intermediate Representations (IRs)** so tiles can be merged correctly.
+
+**Primary Use Case: Streaming**
+
+Tiling provides a significant **speedup** for streaming scenarios where features are updated frequently (every few minutes) from sources like Kafka, Kinesis, or PushSource.
+
+**Key Benefits (Streaming):**
+- **Faster**: Reuse 90%+ of tiles between updates instead of recomputing from scratch
+- **Correct results**: IRs ensure mathematically accurate merging for all aggregation types
+- **Memory efficient**: Only process new events; reuse previous tiles in memory
+- **Real-time capable**: Handle high-throughput streaming with low latency
+- **Incremental updates**: Compute 1 new tile instead of rescanning the entire window
+
+---
+
+## The Problem: Why Intermediate Representations?
+
+Traditional approaches to time-windowed aggregations either:
+1. **Recompute from raw data** every time → slow and expensive
+2. **Store final aggregated values** per tile → fast but often **incorrect** when merging
+
+### The Merging Problem
+
+You **cannot correctly merge** many common aggregations:
+
+```
+WRONG: avg(tile1, tile2) ≠ (avg_tile1 + avg_tile2) / 2
+
+Example:
+  tile1: [10, 20, 30] → avg = 20
+  tile2: [100] → avg = 100
+
+  Correct merged avg: (10+20+30+100) / 4 = 40
+  Wrong merged avg: (20 + 100) / 2 = 60
+```
+
+**The same problem exists for:**
+- Standard deviation (`std`)
+- Variance (`var`)
+- Median and percentiles
+- Any "holistic" aggregation that requires knowledge of all values
+
+---
+
+## The Solution: Intermediate Representations (IRs)
+
+Instead of storing **final aggregated values**, store **intermediate data** that preserves the mathematical properties needed for correct merging.
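+
+The sketch below is a minimal, self-contained illustration of this idea in plain Python (it is not Feast's internal API): an IR for `avg` is just a `(sum, count)` pair, and IRs merge by component-wise addition, which is what makes tile merging associative and correct. The numbers match the worked example that follows.
+
+```python
+from dataclasses import dataclass
+
+
+@dataclass
+class AvgIR:
+    """Intermediate representation for an average: (sum, count)."""
+
+    total: float = 0.0
+    count: int = 0
+
+    def merge(self, other: "AvgIR") -> "AvgIR":
+        # IRs merge by adding components, so any grouping of tiles
+        # produces the same result (merging is associative).
+        return AvgIR(self.total + other.total, self.count + other.count)
+
+    def finalize(self) -> float:
+        return self.total / self.count if self.count else 0.0
+
+
+tile1 = AvgIR(total=60.0, count=3)   # events [10, 20, 30]
+tile2 = AvgIR(total=100.0, count=1)  # events [100]
+
+print(tile1.merge(tile2).finalize())              # 40.0 -- correct
+print((tile1.finalize() + tile2.finalize()) / 2)  # 60.0 -- wrong
+```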
+
+### Example: Average
+
+**Traditional (Incorrect)**:
+```
+Tile 1: avg = 20
+Tile 2: avg = 100
+Merged avg = (20 + 100) / 2 = 60 - WRONG
+```
+
+**With IRs (Correct)**:
+```
+Tile 1: sum = 60, count = 3
+Tile 2: sum = 100, count = 1
+Merged: sum = 160, count = 4
+Merged avg = 160 / 4 = 40 - CORRECT
+```
+
+---
+
+## Aggregation Categories
+
+### Algebraic Aggregations
+
+These can be merged by applying the same aggregation function to tiles:
+
+| Aggregation | Stored Value | Merge Strategy | Storage |
+|-------------|--------------|----------------|---------|
+| `sum` | sum | `sum(tile_sums)` | 1 column |
+| `count` | count | `sum(tile_counts)` | 1 column |
+| `max` | max | `max(tile_maxes)` | 1 column |
+| `min` | min | `min(tile_mins)` | 1 column |
+
+**No IRs needed** - the final value is the IR!
+
+---
+
+### Holistic Aggregations
+
+These require storing multiple intermediate values:
+
+#### Average (`avg`, `mean`)
+
+**Stored IRs**: `sum`, `count`
+**Final computation**: `avg = sum / count`
+**Merge strategy**: Sum the sums and counts, then divide
+
+**Storage**: 3 columns (final + 2 IRs)
+
+---
+
+#### Standard Deviation (`std`, `stddev`)
+
+**Stored IRs**: `count`, `sum`, `sum_of_squares`
+**Final computation**:
+```python
+variance = (sum_sq - sum**2 / count) / (count - ddof)
+std = sqrt(variance)
+# ddof = 1 for sample std, 0 for population std
+```
+
+**Merge strategy**: Sum all three IRs, then apply the formula
+
+**Storage**: 4 columns (final + 3 IRs)
+
+---
+
+#### Variance (`var`, `variance`)
+
+**Stored IRs**: `count`, `sum`, `sum_of_squares`
+**Final computation**: Same as std but without `sqrt()`
+
+**Storage**: 4 columns (final + 3 IRs)
+
+---
+
+## How Tiling Works
+
+Tiling is optimized for **streaming scenarios** with frequent updates (e.g., every few minutes).
+
+### 1. Continuous Tile Updates
+
+```
+Stream Events → Partition by Hop Intervals → Compute IRs → Store Windowed Aggregations
+      |                    |                      |                   |
+      |                    |                      |                   └─> Online Store (Redis, etc.)
+      |                    |                      └─> avg_sum, avg_count, std_sum_sq, etc.
+      |                    └─> 5-min hops: [00:00-00:05], [00:05-00:10], ...
+      └─> customer_id=1: [txn1, txn2, txn3, ...]
+
+Every 5 minutes:
+- New events arrive
+- Only 1 new tile computed (5 min of data)
+- 11 previous tiles reused (in memory during streaming session)
+- Final aggregation = merge 12 tiles (1 new + 11 reused)
+```
+
+**Why It's Fast:**
+- **Without tiling:** Scan the entire 1-hour window (1000+ events) every 5 minutes
+- **With tiling:** Only process the 5 minutes of new events and reuse previous tiles
+- **Speedup:** Work per update scales with the hop size rather than the window size
+
+---
+
+### 2. Streaming Update Efficiency
+
+| Update | Without Tiling | With Tiling | Tile Reuse |
+|--------|---------------|-------------|------------|
+| T=00:00 | Compute 1hr | Compute 12 tiles | 0% reuse (initial) |
+| T=00:05 | Compute 1hr (1000+ events) | Compute 1 tile + reuse 11 | 92% reuse |
+| T=00:10 | Compute 1hr (1000+ events) | Compute 1 tile + reuse 11 | 92% reuse |
+| T=00:15 | Compute 1hr (1000+ events) | Compute 1 tile + reuse 11 | 92% reuse |
+
+**Key Benefit:** Tiles stay in memory during the streaming session, enabling massive reuse.
+
+---
+
+## Tiling Algorithm
+
+### Sawtooth Window Tiling
+
+1. **Partition events** into hop-sized intervals (e.g., 5 minutes)
+2. **Compute cumulative tail aggregations** for each hop from the start of the materialization window
+3. **Subtract tiles** to form windowed aggregations (current_tile - previous_tile)
+4. **Store IRs** for correct merging of holistic aggregations
+5.
**At materialization**, store windowed aggregations in online store + +**Benefits**: +- Efficient query-time performance (pre-computed windows) +- Minimal storage overhead (only hop-sized tiles) +- Mathematically correct for all aggregation types + +--- + +## Configuration + +### Recommended: StreamFeatureView (Streaming Scenarios) + +Tiling provides **maximum benefit for streaming scenarios** with frequent updates: + +```python +from feast import StreamFeatureView, Aggregation +from feast.data_source import PushSource, KafkaSource +from datetime import timedelta + +# Example with Kafka streaming source +customer_features = StreamFeatureView( + name="customer_transaction_features", + entities=[customer], + source=KafkaSource( + name="transactions_stream", + kafka_bootstrap_servers="localhost:9092", + topic="transactions", + timestamp_field="event_timestamp", + batch_source=file_source, # For historical data + ), + aggregations=[ + Aggregation(column="amount", function="sum", time_window=timedelta(hours=1)), + Aggregation(column="amount", function="avg", time_window=timedelta(hours=1)), + Aggregation(column="amount", function="std", time_window=timedelta(hours=1)), + ], + timestamp_field="event_timestamp", + online=True, + + # Tiling configuration + enable_tiling=True, # speedup for streaming! + tiling_hop_size=timedelta(minutes=5), # Update frequency +) +``` + +**When to Enable:** +- Streaming data sources (Kafka, Kinesis, PushSource) +- Frequent updates (every few minutes) +- Real-time feature serving +- High-throughput event processing + +--- + +### Key Parameters + +- `aggregations`: List of time-windowed aggregations to compute +- `timestamp_field`: Column name for timestamps (required when aggregations are specified) +- `enable_tiling`: Enable tiling optimization (default: `False`) + - Set to `True` for **streaming scenarios** +- `tiling_hop_size`: Time interval between tiles (default: 5 minutes) + - Smaller = more granular tiles, potentially higher memory during processing window + - Larger = less granular tiles, potentially lower memory during processing window + +### Compute Engine Requirements + +- **Spark Compute Engine**: Fully supported for streaming and batch +- **Ray Compute Engine**: Fully supported for streaming and batch +- **Local Compute Engine**: Does NOT support time-windowed aggregations + +--- + +## Architecture + +Tiling in Feast uses a **simple, pure pandas architecture** that works with any compute engine: + +### How It Works + +``` +┌─────────────────┐ +│ Engine DataFrame│ (Spark/Ray/etc) +└────────┬────────┘ + │ .toPandas() / .to_pandas() + ▼ +┌─────────────────┐ +│ Pandas DataFrame│ +└────────┬────────┘ + │ orchestrator.apply_sawtooth_window_tiling() + ▼ +┌─────────────────┐ +│ Cumulative │ (pandas with _tile_start, _tile_end, IRs) +│ Tiles │ +└────────┬────────┘ + │ tile_subtraction.convert_cumulative_to_windowed() + ▼ +┌─────────────────┐ +│ Windowed │ (pandas with final aggregations) +│ Aggregations │ +└────────┬────────┘ + │ spark.createDataFrame() / ray.from_pandas() + ▼ +┌─────────────────┐ +│ Engine DataFrame│ +└─────────────────┘ +``` + + +## Summary + +Tiling with Intermediate Representations provides a powerful optimization for **streaming time-windowed aggregations** in Feast. 
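+
+To make the "mathematical accuracy" claim concrete for a holistic aggregation, here is a short, self-contained check in plain Python (independent of Feast's internals) that merging `(count, sum, sum_of_squares)` IRs reproduces the standard deviation of the combined data:
+
+```python
+import math
+import statistics
+
+
+def std_from_irs(count: int, total: float, sum_sq: float, ddof: int = 1) -> float:
+    # Same formula as above: variance = (sum_sq - total**2 / count) / (count - ddof)
+    return math.sqrt((sum_sq - total * total / count) / (count - ddof))
+
+
+tile1 = [10.0, 20.0, 30.0]
+tile2 = [100.0]
+
+# Per-tile IRs: (count, sum, sum_of_squares)
+irs = [(len(t), sum(t), sum(x * x for x in t)) for t in (tile1, tile2)]
+
+# Merge IRs by summing each component
+count = sum(ir[0] for ir in irs)
+total = sum(ir[1] for ir in irs)
+sum_sq = sum(ir[2] for ir in irs)
+
+# Matches the sample standard deviation of the concatenated events
+assert math.isclose(std_from_irs(count, total, sum_sq), statistics.stdev(tile1 + tile2))
+```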
diff --git a/protos/feast/core/StreamFeatureView.proto b/protos/feast/core/StreamFeatureView.proto index cb7da0faf34..6492cbe3069 100644 --- a/protos/feast/core/StreamFeatureView.proto +++ b/protos/feast/core/StreamFeatureView.proto @@ -37,7 +37,7 @@ message StreamFeatureView { FeatureViewMeta meta = 2; } -// Next available id: 17 +// Next available id: 20 message StreamFeatureViewSpec { // Name of the feature view. Must be unique. Not updated. string name = 1; @@ -92,5 +92,12 @@ message StreamFeatureViewSpec { // Oneof with {user_defined_function, on_demand_substrait_transformation} FeatureTransformationV2 feature_transformation = 17; + + // Enable tiling for efficient window aggregation + bool enable_tiling = 18; + + // Hop size for tiling (e.g., 5 minutes). Determines the granularity of pre-aggregated tiles. + // If not specified, defaults to 5 minutes. Only used when enable_tiling is true. + google.protobuf.Duration tiling_hop_size = 19; } diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index 6b6154ed270..ffd05559ee1 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -9,6 +9,7 @@ from feast.infra.offline_stores.redshift_source import RedshiftSource from feast.infra.offline_stores.snowflake_source import SnowflakeSource +from .aggregation import Aggregation from .batch_feature_view import BatchFeatureView from .data_source import KafkaSource, KinesisSource, PushSource, RequestSource from .dataframe import DataFrameEngine, FeastDataFrame @@ -32,6 +33,7 @@ pass __all__ = [ + "Aggregation", "BatchFeatureView", "DataFrameEngine", "Entity", diff --git a/sdk/python/feast/aggregation.py b/sdk/python/feast/aggregation/__init__.py similarity index 97% rename from sdk/python/feast/aggregation.py rename to sdk/python/feast/aggregation/__init__.py index cfb2e7de94c..22a727e7708 100644 --- a/sdk/python/feast/aggregation.py +++ b/sdk/python/feast/aggregation/__init__.py @@ -1,3 +1,7 @@ +""" +Aggregation module for Feast. +""" + from datetime import timedelta from typing import Optional @@ -91,3 +95,6 @@ def __eq__(self, other): return False return True + + +__all__ = ["Aggregation"] diff --git a/sdk/python/feast/aggregation/tiling/__init__.py b/sdk/python/feast/aggregation/tiling/__init__.py new file mode 100644 index 00000000000..08017db0a02 --- /dev/null +++ b/sdk/python/feast/aggregation/tiling/__init__.py @@ -0,0 +1,27 @@ +""" +Tiling for efficient time-windowed aggregations. + +This module provides tiling algorithms and interfaces +that can be implemented by any compute engine (Spark, Ray, etc.). + +Architecture: +1. Engine nodes: Convert to pandas (e.g., dataset.to_pandas(), toPandas()) +2. orchestrator.py: Generate cumulative tiles +3. tile_subtraction.py: Convert cumulative tiles to windowed aggregations +4. 
Engine nodes: Convert back to engine format (e.g., from_pandas(), createDataFrame()) +""" + +from feast.aggregation.tiling.base import IRMetadata, get_ir_metadata_for_aggregation +from feast.aggregation.tiling.orchestrator import apply_sawtooth_window_tiling +from feast.aggregation.tiling.tile_subtraction import ( + convert_cumulative_to_windowed, + deduplicate_keep_latest, +) + +__all__ = [ + "IRMetadata", + "get_ir_metadata_for_aggregation", + "apply_sawtooth_window_tiling", + "convert_cumulative_to_windowed", + "deduplicate_keep_latest", +] diff --git a/sdk/python/feast/aggregation/tiling/base.py b/sdk/python/feast/aggregation/tiling/base.py new file mode 100644 index 00000000000..c5ff765a0a9 --- /dev/null +++ b/sdk/python/feast/aggregation/tiling/base.py @@ -0,0 +1,91 @@ +""" +Base utilities for tiling. + +- Orchestrator works in pure pandas +- Engine-specific nodes do direct DataFrame conversions. +""" + +from dataclasses import dataclass +from typing import Any, List, Optional, Tuple + +from feast.aggregation import Aggregation + + +@dataclass +class IRMetadata: + """ + Metadata about intermediate representations for an aggregation. + + Attributes: + type: "algebraic" (sum, count, max, min) or "holistic" (avg, std, var) + ir_columns: List of IR column names (e.g., ["sum", "count"] for avg) + computation: String describing how to compute final value from IRs + """ + + type: str # "algebraic" or "holistic" + ir_columns: Optional[List[str]] = None + computation: Optional[str] = None + + +def get_ir_metadata_for_aggregation( + agg: Aggregation, feature_name: str +) -> Tuple[List[Tuple[str, Any]], IRMetadata]: + """ + Get intermediate representation metadata for an aggregation. + + This function determines what intermediate values need to be stored + to correctly compute holistic aggregations when merging tiles. + + For example: + - avg: store sum and count (not just final average) + - std: store sum, count, and sum_of_squares + - sum: just store sum (algebraic, no IRs needed) + + Args: + agg: Aggregation specification + feature_name: Name of the feature being aggregated + + Returns: + Tuple of (expression_list, metadata) + """ + agg_type = agg.function.lower() + + # Algebraic aggregations + if agg_type in ["sum", "count", "max", "min"]: + return ([], IRMetadata(type="algebraic")) + + # Holistic aggregations (need intermediate representations) + elif agg_type in ["avg", "mean"]: + # Average needs sum and count + ir_cols = [ + f"_tail_{feature_name}_sum", + f"_tail_{feature_name}_count", + ] + return ( + [], + IRMetadata( + type="avg", # Mark as avg for special handling + ir_columns=ir_cols, + computation="sum / count", + ), + ) + + elif agg_type in ["std", "stddev", "var", "variance"]: + # Std/Var need sum, count, and sum of squares + ir_cols = [ + f"_tail_{feature_name}_sum", + f"_tail_{feature_name}_count", + f"_tail_{feature_name}_sum_sq", + ] + return ( + [], + IRMetadata( + type="holistic", + ir_columns=ir_cols, + computation="sqrt((sum_sq - sum^2/count) / (count-1))", + ), + ) + + else: + # Unknown aggregation: treat as algebraic + return ([], IRMetadata(type="algebraic")) diff --git a/sdk/python/feast/aggregation/tiling/orchestrator.py b/sdk/python/feast/aggregation/tiling/orchestrator.py new file mode 100644 index 00000000000..16e047f465c --- /dev/null +++ b/sdk/python/feast/aggregation/tiling/orchestrator.py @@ -0,0 +1,193 @@ +""" +Tiling orchestrator. + +This module provides the core tiling logic using pure pandas operations. +Engines (Spark, Ray, etc.) 
just need to convert to/from pandas. +""" + +from datetime import timedelta +from typing import Any, Callable, Dict, List, Tuple, Union + +import numpy as np +import pandas as pd + +from feast.aggregation import Aggregation +from feast.aggregation.tiling.base import get_ir_metadata_for_aggregation + + +def apply_sawtooth_window_tiling( + df: pd.DataFrame, + aggregations: List[Aggregation], + group_by_keys: List[str], + timestamp_col: str, + window_size: timedelta, + hop_size: timedelta, +) -> pd.DataFrame: + """ + Generate cumulative tiles. + + This function creates cumulative "tail" aggregations that can be efficiently + merged to compute sliding window aggregations. For sawtooth windows, tiles + at time T contain the aggregation from the start of the window to T. + + Args: + df: Pandas DataFrame with input data + aggregations: List of aggregation specifications + group_by_keys: Entity column names for grouping + timestamp_col: Timestamp column name + window_size: Size of the time window + hop_size: Size of hop intervals for tiling + + Returns: + Pandas DataFrame with cumulative tiles + """ + if df.empty: + return df + + # Step 1: Add hop interval column + hop_size_ms = int(hop_size.total_seconds() * 1000) + + # Convert timestamp to milliseconds + if pd.api.types.is_datetime64_any_dtype(df[timestamp_col]): + timestamp_ms = df[timestamp_col].astype("int64") // 10**6 + else: + timestamp_ms = pd.to_datetime(df[timestamp_col]).astype("int64") // 10**6 + + # Compute hop interval (inclusive lower boundaries) + df["_hop_interval"] = (timestamp_ms // hop_size_ms) * hop_size_ms + + # Step 2: Group by entity keys + hop interval and aggregate + agg_dict: Dict[str, Tuple[str, Union[str, Callable[[Any], Any]]]] = {} + ir_metadata_dict = {} + + for agg in aggregations: + feature_name = ( + f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s" + ) + _, metadata = get_ir_metadata_for_aggregation(agg, feature_name) + ir_metadata_dict[feature_name] = metadata + + # Build pandas aggregation dict based on IR metadata + if metadata.type == "algebraic": + # Algebraic aggregations: just aggregate directly + if agg.function == "sum": + agg_dict[f"_tail_{feature_name}"] = (agg.column, "sum") + elif agg.function == "count": + agg_dict[f"_tail_{feature_name}"] = (agg.column, "count") + elif agg.function == "max": + agg_dict[f"_tail_{feature_name}"] = (agg.column, "max") + elif agg.function == "min": + agg_dict[f"_tail_{feature_name}"] = (agg.column, "min") + + elif metadata.type in ("holistic", "avg", "std", "var"): + # Holistic aggregations: compute IRs + if metadata.ir_columns: + ir_column_names = metadata.ir_columns + + # For avg: compute sum and count + if len(ir_column_names) >= 2: + agg_dict[ir_column_names[0]] = (agg.column, "sum") # sum + agg_dict[ir_column_names[1]] = (agg.column, "count") # count + + # For std/var: also compute sum of squares + if len(ir_column_names) >= 3: + agg_dict[ir_column_names[2]] = ( + agg.column, + lambda x: (x**2).sum(), + ) # sum_sq + + # Perform aggregation within each hop + grouped = df.groupby(group_by_keys + ["_hop_interval"], as_index=False) + hop_aggregated = grouped.agg(**agg_dict) if agg_dict else grouped.first() + + # Step 3: Compute cumulative sums (convert hop aggregations to cumulative tiles) + result = hop_aggregated.copy() + + # Sort by entity keys and hop interval + result = result.sort_values(by=group_by_keys + ["_hop_interval"]).reset_index( + drop=True + ) + + # For each IR column, compute cumulative sum within each entity group + ir_columns = [ + 
col for col in result.columns if col not in group_by_keys + ["_hop_interval"] + ] + + if ir_columns: + # Generate a complete grid of hop intervals for forward-filling + # This ensures we have tiles even where no data exists + min_hop = result["_hop_interval"].min() + max_hop = result["_hop_interval"].max() + all_hops = range(int(min_hop), int(max_hop) + hop_size_ms, hop_size_ms) + + # Create cumulative tiles for each entity + cumulative_results = [] + for entity_values, group_df in result.groupby(group_by_keys): + # Create complete hop grid for this entity + entity_dict = dict( + zip( + group_by_keys, + entity_values + if isinstance(entity_values, tuple) + else (entity_values,), + ) + ) + complete_grid = pd.DataFrame( + {**entity_dict, "_hop_interval": list(all_hops)} + ) + + # Left join actual data + merged = complete_grid.merge( + group_df, on=group_by_keys + ["_hop_interval"], how="left" + ) + + # Fill NaN with 0 for IR columns + for col in ir_columns: + if col in merged.columns: + merged[col] = merged[col].fillna(0) + + # Compute cumulative sum for IR columns + for col in ir_columns: + if col in merged.columns: + merged[col] = merged[col].cumsum() + + cumulative_results.append(merged) + + result = pd.concat(cumulative_results, ignore_index=True) + + # Step 4: Add tile metadata + result["_tile_start"] = result["_hop_interval"] + result["_tile_end"] = result["_hop_interval"] + hop_size_ms + + # Step 5: Compute final feature values from IRs (for algebraic aggs, just rename) + for agg in aggregations: + feature_name = ( + f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s" + ) + metadata = ir_metadata_dict[feature_name] + + if metadata.type == "algebraic": + # For algebraic, final value = IR value + ir_col = f"_tail_{feature_name}" + if ir_col in result.columns: + result[feature_name] = result[ir_col] + + elif metadata.type in ("holistic", "avg", "std", "var"): + # For holistic, compute final value from IRs + if metadata.ir_columns: + # Use IR column names directly (they have _tail_ prefix) + ir_column_names = metadata.ir_columns + + if len(ir_column_names) >= 2: + sum_col = ir_column_names[0] + count_col = ir_column_names[1] + + if sum_col in result.columns and count_col in result.columns: + # Compute avg = sum / count + result[feature_name] = np.where( + result[count_col] > 0, + result[sum_col] / result[count_col], + 0, + ) + + return result diff --git a/sdk/python/feast/aggregation/tiling/tile_subtraction.py b/sdk/python/feast/aggregation/tiling/tile_subtraction.py new file mode 100644 index 00000000000..9dee478effa --- /dev/null +++ b/sdk/python/feast/aggregation/tiling/tile_subtraction.py @@ -0,0 +1,179 @@ +""" +Tile subtraction logic. + +This module provides the core algorithm for converting cumulative tiles +to windowed aggregations. All compute engines use this shared logic. +""" + +from datetime import timedelta +from typing import List + +import pandas as pd + +from feast.aggregation import Aggregation +from feast.aggregation.tiling.base import get_ir_metadata_for_aggregation + + +def convert_cumulative_to_windowed( + tiles_df: pd.DataFrame, + entity_keys: List[str], + timestamp_col: str, + window_size: timedelta, + aggregations: List[Aggregation], +) -> pd.DataFrame: + """ + Convert cumulative tiles to windowed aggregations. + + This function performs the core mathematical operation: + windowed_agg_at_T = cumulative_tile_at_T - cumulative_tile_at_(T-window_size) + + For holistic aggregations (avg, std, var), it: + 1. 
Subtracts intermediate representation (IR) components
+    2. Recomputes the final value from windowed IRs
+
+    Args:
+        tiles_df: DataFrame with cumulative tiles from orchestrator.
+            Must contain: entity_keys, _tile_end, IR columns, feature columns
+        entity_keys: Entity column names for grouping
+        timestamp_col: Name of the timestamp column to create
+        window_size: Size of the time window for aggregations
+        aggregations: List of aggregation specifications
+
+    Returns:
+        DataFrame with windowed aggregations ready for online store.
+        Contains: entity_keys, timestamp_col, feature columns (no IR columns or tile metadata)
+    """
+    if tiles_df.empty:
+        return tiles_df
+
+    window_size_ms = int(window_size.total_seconds() * 1000)
+
+    windowed_results = []
+
+    # Group by entity keys to process each entity separately
+    for entity_group, group_df in tiles_df.groupby(entity_keys):
+        # Sort by _tile_end to ensure correct temporal ordering
+        group_df = group_df.sort_values("_tile_end").reset_index(drop=True)
+
+        # For each tile, compute windowed aggregation by subtracting previous tile
+        for idx, row in group_df.iterrows():
+            tile_end = int(row["_tile_end"])
+            window_start = tile_end - window_size_ms
+
+            exact_match = group_df[group_df["_tile_end"] == window_start]
+            if len(exact_match) > 0:
+                prev_tile = exact_match.iloc[0]
+                has_prev_tile = True
+            else:
+                # Only use exact matches to ensure correct window boundaries.
+                # If no exact match exists, any previous tile would end before
+                # window_start, which would compute an incorrect window that's
+                # larger than requested.
+                has_prev_tile = False
+
+            # Create windowed row (will contain windowed aggregations)
+            windowed_row = row.to_dict()
+
+            # Subtract previous tile values from current tile for each aggregation
+            for agg in aggregations:
+                feature_name = (
+                    f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s"
+                )
+                _, metadata = get_ir_metadata_for_aggregation(agg, feature_name)
+
+                if metadata.type == "algebraic":
+                    # For algebraic aggregations: sum and count are subtractable;
+                    # max and min are not.
+                    if feature_name in group_df.columns:
+                        current_val = float(row[feature_name])
+
+                        if agg.function.lower() in ("max", "min"):
+                            # max/min are not invertible, so tile subtraction does
+                            # not apply; keep the cumulative (sawtooth) value as-is.
+                            windowed_row[feature_name] = current_val
+                        else:
+                            # For sum and count: subtract previous from current
+                            if has_prev_tile and feature_name in prev_tile.index:
+                                prev_val = float(prev_tile[feature_name])
+                            else:
+                                prev_val = 0.0
+                            windowed_row[feature_name] = current_val - prev_val
+
+                elif metadata.type in ("holistic", "avg", "std", "var"):
+                    # For holistic aggregations:
+                    # 1. Subtract each IR component: windowed_IR = current_IR - previous_IR
+                    # 2.
Recompute final value from windowed IRs + if metadata.ir_columns: + ir_column_names = metadata.ir_columns + + # Subtract each IR component + for ir_col in ir_column_names: + if ir_col in group_df.columns: + current_ir = row[ir_col] + if has_prev_tile and ir_col in prev_tile.index: + prev_ir = prev_tile[ir_col] + else: + prev_ir = 0 + windowed_row[ir_col] = current_ir - prev_ir + + # Recompute final value from windowed IRs + if len(ir_column_names) >= 2: + sum_col = ir_column_names[ + 0 + ] # e.g., "_tail_avg_amount_3600s_sum" + count_col = ir_column_names[ + 1 + ] # e.g., "_tail_avg_amount_3600s_count" + + if sum_col in windowed_row and count_col in windowed_row: + count_val = windowed_row[count_col] + if count_val <= 0: + windowed_row[feature_name] = 0 + else: + # avg = windowed_sum / windowed_count + windowed_row[feature_name] = ( + windowed_row[sum_col] / count_val + ) + + # Set event_timestamp to the tile end time + windowed_row[timestamp_col] = pd.to_datetime(tile_end, unit="ms") + + windowed_results.append(windowed_row) + + if not windowed_results: + return pd.DataFrame() + + # Create DataFrame from windowed results + result_df = pd.DataFrame(windowed_results) + + # Drop internal tile metadata columns + cols_to_drop = ["_tile_start", "_tile_end", "_hop_interval"] + result_df = result_df.drop( + columns=[c for c in cols_to_drop if c in result_df.columns] + ) + return result_df + + +def deduplicate_keep_latest( + df: pd.DataFrame, entity_keys: List[str], timestamp_col: str +) -> pd.DataFrame: + """ + Keep only the latest timestamp per entity. + + Args: + df: DataFrame with entity_keys and timestamp_col + entity_keys: List of entity key column names + timestamp_col: Name of timestamp column + + Returns: + DataFrame with one row per entity (latest timestamp) + """ + if df.empty or timestamp_col not in df.columns: + return df + + return ( + df.sort_values(by=timestamp_col, ascending=False) + .groupby(entity_keys, as_index=False) + .first() + ) diff --git a/sdk/python/feast/infra/compute_engines/ray/feature_builder.py b/sdk/python/feast/infra/compute_engines/ray/feature_builder.py index 49a957da183..274fe87599c 100644 --- a/sdk/python/feast/infra/compute_engines/ray/feature_builder.py +++ b/sdk/python/feast/infra/compute_engines/ray/feature_builder.py @@ -75,12 +75,17 @@ def build_aggregation_node(self, view, input_node: DAGNode) -> DAGNode: group_by_keys = view.entities timestamp_col = getattr(view.batch_source, "timestamp_field", "event_timestamp") + enable_tiling = getattr(view, "enable_tiling", False) + tiling_hop_size = getattr(view, "tiling_hop_size", None) + node = RayAggregationNode( name="aggregation", aggregations=agg_specs, group_by_keys=group_by_keys, timestamp_col=timestamp_col, config=self.config, + enable_tiling=enable_tiling, + hop_size=tiling_hop_size, ) node.add_input(input_node) diff --git a/sdk/python/feast/infra/compute_engines/ray/nodes.py b/sdk/python/feast/infra/compute_engines/ray/nodes.py index 32126a9e42f..89694a57e2d 100644 --- a/sdk/python/feast/infra/compute_engines/ray/nodes.py +++ b/sdk/python/feast/infra/compute_engines/ray/nodes.py @@ -1,6 +1,6 @@ import logging from datetime import datetime, timedelta, timezone -from typing import List, Optional, Union +from typing import Dict, List, Optional, Union import dill import pandas as pd @@ -326,12 +326,16 @@ def __init__( group_by_keys: List[str], timestamp_col: str, config: RayComputeEngineConfig, + enable_tiling: bool = False, + hop_size: Optional[timedelta] = None, ): super().__init__(name) self.aggregations 
= aggregations self.group_by_keys = group_by_keys self.timestamp_col = timestamp_col self.config = config + self.enable_tiling = enable_tiling + self.hop_size = hop_size def execute(self, context: ExecutionContext) -> DAGValue: """Execute the aggregation operation.""" @@ -339,6 +343,120 @@ def execute(self, context: ExecutionContext) -> DAGValue: input_value.assert_format(DAGFormat.RAY) dataset: Dataset = input_value.data + # Check if tiling should be used + has_time_windows = any(agg.time_window for agg in self.aggregations) + if self.enable_tiling and has_time_windows: + return self._execute_tiled_aggregation(dataset) + else: + return self._execute_standard_aggregation(dataset) + + def _execute_tiled_aggregation(self, dataset: Dataset) -> DAGValue: + """ + Execute tiled aggregation. + + Flow: + 1. Convert Ray Dataset → pandas + 2. Generate cumulative tiles + 3. Convert to windowed aggregations + 4. Convert pandas → Ray Dataset + """ + from feast.aggregation.tiling.orchestrator import apply_sawtooth_window_tiling + from feast.aggregation.tiling.tile_subtraction import ( + convert_cumulative_to_windowed, + deduplicate_keep_latest, + ) + + ray_wrapper = get_ray_wrapper() + + input_pdf = dataset.to_pandas() + + for agg in self.aggregations: + if agg.time_window is None: + raise ValueError( + f"Tiling is enabled but aggregation on column '{agg.column}' has no time_window set. " + f"Either set time_window for all aggregations or disable tiling by setting enable_tiling=False." + ) + + # Group aggregations by time window + window_to_aggs: Dict[timedelta, List[Aggregation]] = {} + for agg in self.aggregations: + if agg.time_window: + if agg.time_window not in window_to_aggs: + window_to_aggs[agg.time_window] = [] + window_to_aggs[agg.time_window].append(agg) + + # Process each time window in pandas + windowed_pdfs = [] + for window_size, window_aggs in window_to_aggs.items(): + # Step 1: Generate cumulative tiles + tiles_pdf = apply_sawtooth_window_tiling( + df=input_pdf, + aggregations=window_aggs, + group_by_keys=self.group_by_keys, + timestamp_col=self.timestamp_col, + window_size=window_size, + hop_size=self.hop_size or timedelta(minutes=5), + ) + + if tiles_pdf.empty: + continue + + # Step 2: Convert to windowed aggregations + windowed_pdf = convert_cumulative_to_windowed( + tiles_df=tiles_pdf, + entity_keys=self.group_by_keys, + timestamp_col=self.timestamp_col, + window_size=window_size, + aggregations=window_aggs, + ) + + if not windowed_pdf.empty: + windowed_pdfs.append(windowed_pdf) + + if not windowed_pdfs: + # No results, return empty Ray Dataset + aggregated_dataset = ray_wrapper.from_pandas(pd.DataFrame()) + else: + # Step 3: Join all windows in pandas (outer merge on entity keys + timestamp) + if len(windowed_pdfs) == 1: + final_pdf = windowed_pdfs[0] + else: + final_pdf = windowed_pdfs[0] + join_keys = self.group_by_keys + [self.timestamp_col] + for pdf in windowed_pdfs[1:]: + final_pdf = pd.merge( + final_pdf, + pdf, + on=join_keys, + how="outer", + suffixes=("", "_dup"), + ) + # Drop duplicate columns from merge + final_pdf = final_pdf.loc[ + :, ~final_pdf.columns.str.endswith("_dup") + ] + + # Step 4: Deduplicate in pandas (keep latest timestamp per entity) + if self.timestamp_col in final_pdf.columns and not final_pdf.empty: + final_pdf = deduplicate_keep_latest( + final_pdf, self.group_by_keys, self.timestamp_col + ) + + aggregated_dataset = ray_wrapper.from_pandas(final_pdf) + + return DAGValue( + data=aggregated_dataset, + format=DAGFormat.RAY, + metadata={ + "aggregated": 
True, + "aggregations": len(self.aggregations), + "group_by_keys": self.group_by_keys, + "tiled": True, + }, + ) + + def _execute_standard_aggregation(self, dataset: Dataset) -> DAGValue: + """Execute standard aggregation without tiling.""" # Convert aggregations to Ray's groupby format agg_dict = {} for agg in self.aggregations: diff --git a/sdk/python/feast/infra/compute_engines/spark/feature_builder.py b/sdk/python/feast/infra/compute_engines/spark/feature_builder.py index e042bb000dc..11a3c1587f6 100644 --- a/sdk/python/feast/infra/compute_engines/spark/feature_builder.py +++ b/sdk/python/feast/infra/compute_engines/spark/feature_builder.py @@ -47,12 +47,20 @@ def build_aggregation_node(self, view, input_node): agg_specs = view.aggregations group_by_keys = view.entities timestamp_col = view.batch_source.timestamp_field + + # Check if tiling is enabled for this view + enable_tiling = getattr(view, "enable_tiling", False) + hop_size = getattr(view, "tiling_hop_size", None) + node = SparkAggregationNode( f"{view.name}:agg", agg_specs, group_by_keys, timestamp_col, + self.spark_session, inputs=[input_node], + enable_tiling=enable_tiling, + hop_size=hop_size, ) self.nodes.append(node) return node diff --git a/sdk/python/feast/infra/compute_engines/spark/nodes.py b/sdk/python/feast/infra/compute_engines/spark/nodes.py index 7c2b0bd7916..fa5a7bd6208 100644 --- a/sdk/python/feast/infra/compute_engines/spark/nodes.py +++ b/sdk/python/feast/infra/compute_engines/spark/nodes.py @@ -1,6 +1,7 @@ from datetime import datetime, timedelta from typing import Callable, List, Optional, Union, cast +import pandas as pd from pyspark.sql import DataFrame, SparkSession, Window from pyspark.sql import functions as F @@ -101,18 +102,150 @@ def __init__( aggregations: List[Aggregation], group_by_keys: List[str], timestamp_col: str, + spark_session: SparkSession, inputs=None, + enable_tiling: bool = False, + hop_size: Optional[timedelta] = None, ): super().__init__(name, inputs=inputs) self.aggregations = aggregations self.group_by_keys = group_by_keys self.timestamp_col = timestamp_col + self.spark_session = spark_session + self.enable_tiling = enable_tiling + self.hop_size = hop_size def execute(self, context: ExecutionContext) -> DAGValue: input_value = self.get_single_input_value(context) input_value.assert_format(DAGFormat.SPARK) input_df: DataFrame = input_value.data + # Check if tiling is enabled and we have time-windowed aggregations + has_time_windows = any(agg.time_window for agg in self.aggregations) + + if self.enable_tiling and has_time_windows: + return self._execute_tiled_aggregation(input_df) + else: + return self._execute_standard_aggregation(input_df) + + def _execute_tiled_aggregation(self, input_df: DataFrame) -> DAGValue: + """ + Execute aggregation using tiling. + """ + entity_keys = self.group_by_keys + + # Group aggregations by time window to process separately + from collections import defaultdict + + aggs_by_window = defaultdict(list) + for agg in self.aggregations: + if agg.time_window is None: + raise ValueError( + f"Tiling is enabled but aggregation on column '{agg.column}' has no time_window set. " + f"Either set time_window for all aggregations or disable tiling by setting enable_tiling=False." 
+ ) + aggs_by_window[agg.time_window].append(agg) + + from feast.aggregation.tiling.orchestrator import apply_sawtooth_window_tiling + from feast.aggregation.tiling.tile_subtraction import ( + convert_cumulative_to_windowed, + deduplicate_keep_latest, + ) + + input_pdf = input_df.toPandas() + + # Process each time window in pandas + windowed_pdfs = [] + for time_window, window_aggs in aggs_by_window.items(): + # Step 1: Generate cumulative tiles + tiles_pdf = apply_sawtooth_window_tiling( + df=input_pdf, + aggregations=window_aggs, + group_by_keys=entity_keys, + timestamp_col=self.timestamp_col, + window_size=time_window, + hop_size=self.hop_size or timedelta(minutes=5), + ) + + if tiles_pdf.empty: + continue + + # Step 2: Convert to windowed aggregations + windowed_pdf = convert_cumulative_to_windowed( + tiles_df=tiles_pdf, + entity_keys=entity_keys, + timestamp_col=self.timestamp_col, + window_size=time_window, + aggregations=window_aggs, + ) + + if not windowed_pdf.empty: + windowed_pdfs.append(windowed_pdf) + + if not windowed_pdfs: + # No results, return empty Spark DataFrame with correct schema + # Build expected columns: entity_keys + timestamp_col + feature columns + expected_columns = entity_keys + [self.timestamp_col] + for time_window, window_aggs in aggs_by_window.items(): + for agg in window_aggs: + feature_name = f"{agg.function}_{agg.column}_{int(time_window.total_seconds())}s" + if feature_name not in expected_columns: + expected_columns.append(feature_name) + + empty_data = {} + for col in entity_keys: + empty_data[col] = pd.Series(dtype="string") + if self.timestamp_col in expected_columns: + empty_data[self.timestamp_col] = pd.Series(dtype="datetime64[ns]") + for col in expected_columns: + if col not in empty_data: + empty_data[col] = pd.Series(dtype="float64") + + empty_pdf = pd.DataFrame(empty_data) + final_df = self.spark_session.createDataFrame(empty_pdf) + else: + # Step 3: Join all windows in pandas (outer merge on entity keys + timestamp) + if len(windowed_pdfs) == 1: + final_pdf = windowed_pdfs[0] + else: + final_pdf = windowed_pdfs[0] + join_keys = entity_keys + [self.timestamp_col] + for pdf in windowed_pdfs[1:]: + final_pdf = pd.merge( + final_pdf, + pdf, + on=join_keys, + how="outer", + suffixes=("", "_dup"), + ) + # Drop duplicate columns from merge + final_pdf = final_pdf.loc[ + :, ~final_pdf.columns.str.endswith("_dup") + ] + + # Step 4: Deduplicate in pandas (keep latest timestamp per entity) + if self.timestamp_col in final_pdf.columns and not final_pdf.empty: + final_pdf = deduplicate_keep_latest( + final_pdf, entity_keys, self.timestamp_col + ) + + # Step 5: Convert to Spark once at the end + final_df = self.spark_session.createDataFrame(final_pdf) + + return DAGValue( + data=final_df, + format=DAGFormat.SPARK, + metadata={ + "aggregated": True, + "tiled": True, + "window_sizes": [ + int(tw.total_seconds()) for tw in aggs_by_window.keys() + ], + }, + ) + + def _execute_standard_aggregation(self, input_df: DataFrame) -> DAGValue: + """Execute standard Spark aggregation (existing logic).""" agg_exprs = [] for agg in self.aggregations: func = getattr(F, agg.function) diff --git a/sdk/python/feast/protos/feast/core/StreamFeatureView_pb2.py b/sdk/python/feast/protos/feast/core/StreamFeatureView_pb2.py index ba19088edd6..f64c2852aa9 100644 --- a/sdk/python/feast/protos/feast/core/StreamFeatureView_pb2.py +++ b/sdk/python/feast/protos/feast/core/StreamFeatureView_pb2.py @@ -21,7 +21,7 @@ from feast.protos.feast.core import Transformation_pb2 as 
feast_dot_core_dot_Transformation__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\"feast/core/StreamFeatureView.proto\x12\nfeast.core\x1a\x1egoogle/protobuf/duration.proto\x1a$feast/core/OnDemandFeatureView.proto\x1a\x1c\x66\x65\x61st/core/FeatureView.proto\x1a\x18\x66\x65\x61st/core/Feature.proto\x1a\x1b\x66\x65\x61st/core/DataSource.proto\x1a\x1c\x66\x65\x61st/core/Aggregation.proto\x1a\x1f\x66\x65\x61st/core/Transformation.proto\"o\n\x11StreamFeatureView\x12/\n\x04spec\x18\x01 \x01(\x0b\x32!.feast.core.StreamFeatureViewSpec\x12)\n\x04meta\x18\x02 \x01(\x0b\x32\x1b.feast.core.FeatureViewMeta\"\xa8\x05\n\x15StreamFeatureViewSpec\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0f\n\x07project\x18\x02 \x01(\t\x12\x10\n\x08\x65ntities\x18\x03 \x03(\t\x12+\n\x08\x66\x65\x61tures\x18\x04 \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x12\x31\n\x0e\x65ntity_columns\x18\x05 \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x12\x13\n\x0b\x64\x65scription\x18\x06 \x01(\t\x12\x39\n\x04tags\x18\x07 \x03(\x0b\x32+.feast.core.StreamFeatureViewSpec.TagsEntry\x12\r\n\x05owner\x18\x08 \x01(\t\x12&\n\x03ttl\x18\t \x01(\x0b\x32\x19.google.protobuf.Duration\x12,\n\x0c\x62\x61tch_source\x18\n \x01(\x0b\x32\x16.feast.core.DataSource\x12-\n\rstream_source\x18\x0b \x01(\x0b\x32\x16.feast.core.DataSource\x12\x0e\n\x06online\x18\x0c \x01(\x08\x12\x42\n\x15user_defined_function\x18\r \x01(\x0b\x32\x1f.feast.core.UserDefinedFunctionB\x02\x18\x01\x12\x0c\n\x04mode\x18\x0e \x01(\t\x12-\n\x0c\x61ggregations\x18\x0f \x03(\x0b\x32\x17.feast.core.Aggregation\x12\x17\n\x0ftimestamp_field\x18\x10 \x01(\t\x12\x43\n\x16\x66\x65\x61ture_transformation\x18\x11 \x01(\x0b\x32#.feast.core.FeatureTransformationV2\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42[\n\x10\x66\x65\x61st.proto.coreB\x16StreamFeatureViewProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\"feast/core/StreamFeatureView.proto\x12\nfeast.core\x1a\x1egoogle/protobuf/duration.proto\x1a$feast/core/OnDemandFeatureView.proto\x1a\x1c\x66\x65\x61st/core/FeatureView.proto\x1a\x18\x66\x65\x61st/core/Feature.proto\x1a\x1b\x66\x65\x61st/core/DataSource.proto\x1a\x1c\x66\x65\x61st/core/Aggregation.proto\x1a\x1f\x66\x65\x61st/core/Transformation.proto\"o\n\x11StreamFeatureView\x12/\n\x04spec\x18\x01 \x01(\x0b\x32!.feast.core.StreamFeatureViewSpec\x12)\n\x04meta\x18\x02 \x01(\x0b\x32\x1b.feast.core.FeatureViewMeta\"\xf3\x05\n\x15StreamFeatureViewSpec\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0f\n\x07project\x18\x02 \x01(\t\x12\x10\n\x08\x65ntities\x18\x03 \x03(\t\x12+\n\x08\x66\x65\x61tures\x18\x04 \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x12\x31\n\x0e\x65ntity_columns\x18\x05 \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x12\x13\n\x0b\x64\x65scription\x18\x06 \x01(\t\x12\x39\n\x04tags\x18\x07 \x03(\x0b\x32+.feast.core.StreamFeatureViewSpec.TagsEntry\x12\r\n\x05owner\x18\x08 \x01(\t\x12&\n\x03ttl\x18\t \x01(\x0b\x32\x19.google.protobuf.Duration\x12,\n\x0c\x62\x61tch_source\x18\n \x01(\x0b\x32\x16.feast.core.DataSource\x12-\n\rstream_source\x18\x0b \x01(\x0b\x32\x16.feast.core.DataSource\x12\x0e\n\x06online\x18\x0c \x01(\x08\x12\x42\n\x15user_defined_function\x18\r \x01(\x0b\x32\x1f.feast.core.UserDefinedFunctionB\x02\x18\x01\x12\x0c\n\x04mode\x18\x0e \x01(\t\x12-\n\x0c\x61ggregations\x18\x0f \x03(\x0b\x32\x17.feast.core.Aggregation\x12\x17\n\x0ftimestamp_field\x18\x10 \x01(\t\x12\x43\n\x16\x66\x65\x61ture_transformation\x18\x11 
\x01(\x0b\x32#.feast.core.FeatureTransformationV2\x12\x15\n\renable_tiling\x18\x12 \x01(\x08\x12\x32\n\x0ftiling_hop_size\x18\x13 \x01(\x0b\x32\x19.google.protobuf.Duration\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42[\n\x10\x66\x65\x61st.proto.coreB\x16StreamFeatureViewProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -36,7 +36,7 @@ _globals['_STREAMFEATUREVIEW']._serialized_start=268 _globals['_STREAMFEATUREVIEW']._serialized_end=379 _globals['_STREAMFEATUREVIEWSPEC']._serialized_start=382 - _globals['_STREAMFEATUREVIEWSPEC']._serialized_end=1062 - _globals['_STREAMFEATUREVIEWSPEC_TAGSENTRY']._serialized_start=1019 - _globals['_STREAMFEATUREVIEWSPEC_TAGSENTRY']._serialized_end=1062 + _globals['_STREAMFEATUREVIEWSPEC']._serialized_end=1137 + _globals['_STREAMFEATUREVIEWSPEC_TAGSENTRY']._serialized_start=1094 + _globals['_STREAMFEATUREVIEWSPEC_TAGSENTRY']._serialized_end=1137 # @@protoc_insertion_point(module_scope) diff --git a/sdk/python/feast/protos/feast/core/StreamFeatureView_pb2.pyi b/sdk/python/feast/protos/feast/core/StreamFeatureView_pb2.pyi index 70e897a2f21..160a59b35df 100644 --- a/sdk/python/feast/protos/feast/core/StreamFeatureView_pb2.pyi +++ b/sdk/python/feast/protos/feast/core/StreamFeatureView_pb2.pyi @@ -59,7 +59,7 @@ class StreamFeatureView(google.protobuf.message.Message): global___StreamFeatureView = StreamFeatureView class StreamFeatureViewSpec(google.protobuf.message.Message): - """Next available id: 17""" + """Next available id: 20""" DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -95,6 +95,8 @@ class StreamFeatureViewSpec(google.protobuf.message.Message): AGGREGATIONS_FIELD_NUMBER: builtins.int TIMESTAMP_FIELD_FIELD_NUMBER: builtins.int FEATURE_TRANSFORMATION_FIELD_NUMBER: builtins.int + ENABLE_TILING_FIELD_NUMBER: builtins.int + TILING_HOP_SIZE_FIELD_NUMBER: builtins.int name: builtins.str """Name of the feature view. Must be unique. Not updated.""" project: builtins.str @@ -143,6 +145,13 @@ class StreamFeatureViewSpec(google.protobuf.message.Message): @property def feature_transformation(self) -> feast.core.Transformation_pb2.FeatureTransformationV2: """Oneof with {user_defined_function, on_demand_substrait_transformation}""" + enable_tiling: builtins.bool + """Enable tiling for efficient window aggregation""" + @property + def tiling_hop_size(self) -> google.protobuf.duration_pb2.Duration: + """Hop size for tiling (e.g., 5 minutes). Determines the granularity of pre-aggregated tiles. + If not specified, defaults to 5 minutes. Only used when enable_tiling is true. + """ def __init__( self, *, @@ -163,8 +172,10 @@ class StreamFeatureViewSpec(google.protobuf.message.Message): aggregations: collections.abc.Iterable[feast.core.Aggregation_pb2.Aggregation] | None = ..., timestamp_field: builtins.str = ..., feature_transformation: feast.core.Transformation_pb2.FeatureTransformationV2 | None = ..., + enable_tiling: builtins.bool = ..., + tiling_hop_size: google.protobuf.duration_pb2.Duration | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["batch_source", b"batch_source", "feature_transformation", b"feature_transformation", "stream_source", b"stream_source", "ttl", b"ttl", "user_defined_function", b"user_defined_function"]) -> builtins.bool: ... 
- def ClearField(self, field_name: typing_extensions.Literal["aggregations", b"aggregations", "batch_source", b"batch_source", "description", b"description", "entities", b"entities", "entity_columns", b"entity_columns", "feature_transformation", b"feature_transformation", "features", b"features", "mode", b"mode", "name", b"name", "online", b"online", "owner", b"owner", "project", b"project", "stream_source", b"stream_source", "tags", b"tags", "timestamp_field", b"timestamp_field", "ttl", b"ttl", "user_defined_function", b"user_defined_function"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["batch_source", b"batch_source", "feature_transformation", b"feature_transformation", "stream_source", b"stream_source", "tiling_hop_size", b"tiling_hop_size", "ttl", b"ttl", "user_defined_function", b"user_defined_function"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["aggregations", b"aggregations", "batch_source", b"batch_source", "description", b"description", "enable_tiling", b"enable_tiling", "entities", b"entities", "entity_columns", b"entity_columns", "feature_transformation", b"feature_transformation", "features", b"features", "mode", b"mode", "name", b"name", "online", b"online", "owner", b"owner", "project", b"project", "stream_source", b"stream_source", "tags", b"tags", "tiling_hop_size", b"tiling_hop_size", "timestamp_field", b"timestamp_field", "ttl", b"ttl", "user_defined_function", b"user_defined_function"]) -> None: ... global___StreamFeatureViewSpec = StreamFeatureViewSpec diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index ff6d603333d..d30db29b299 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -6,6 +6,7 @@ from typing import Any, Dict, List, Optional, Tuple, Type, Union import dill +from google.protobuf.duration_pb2 import Duration from google.protobuf.message import Message from typeguard import typechecked @@ -56,8 +57,9 @@ class StreamFeatureView(FeatureView): columns. If not specified, can be inferred from the underlying data source. source: The stream source of data where this group of features is stored. aggregations: List of aggregations registered with the stream feature view. - mode: The mode of execution. timestamp_field: Must be specified if aggregations are specified. Defines the timestamp column on which to aggregate windows. + enable_tiling: Enable tiling optimization for efficient windowed aggregations. + tiling_hop_size: Time interval between tiles (e.g., 5 minutes). Defaults to 5 minutes. online: A boolean indicating whether online retrieval, and write to online store is enabled for this feature view. offline: A boolean indicating whether offline retrieval, and write to offline store is enabled for this feature view. description: A human-readable description. 
@@ -84,14 +86,16 @@ class StreamFeatureView(FeatureView): description: str tags: Dict[str, str] owner: str - aggregations: List[Aggregation] mode: Union[TransformationMode, str] - timestamp_field: str materialization_intervals: List[Tuple[datetime, datetime]] udf: Optional[FunctionType] udf_string: Optional[str] feature_transformation: Optional[Transformation] stream_engine: Optional[Dict[str, Any]] = None + aggregations: List[Aggregation] + timestamp_field: str + enable_tiling: bool + tiling_hop_size: Optional[timedelta] def __init__( self, @@ -114,6 +118,8 @@ def __init__( udf_string: Optional[str] = "", feature_transformation: Optional[Transformation] = None, stream_engine: Optional[Dict[str, Any]] = None, + enable_tiling: bool = False, + tiling_hop_size: Optional[timedelta] = None, ): if not flags_helper.is_test(): warnings.warn( @@ -136,15 +142,33 @@ def __init__( "aggregations must have a timestamp field associated with them to perform the aggregations" ) - self.aggregations = aggregations or [] self.mode = mode - self.timestamp_field = timestamp_field or "" self.udf = udf self.udf_string = udf_string + self.aggregations = aggregations or [] + self.timestamp_field = timestamp_field or "" self.feature_transformation = ( feature_transformation or self.get_feature_transformation() ) self.stream_engine = stream_engine + self.enable_tiling = enable_tiling + self.tiling_hop_size = tiling_hop_size + + if enable_tiling and self.aggregations: + effective_hop_size = tiling_hop_size or timedelta(minutes=5) + time_windows = [ + agg.time_window + for agg in self.aggregations + if agg.time_window is not None + ] + if time_windows: + min_window_size = min(time_windows) + if effective_hop_size >= min_window_size: + raise ValueError( + f"tiling_hop_size ({effective_hop_size}) must be smaller than " + f"the minimum aggregation time_window ({min_window_size}). " + f"If hop_size >= window_size, the tiling algorithm will produce incorrect results." 
+ ) super().__init__( name=name, @@ -240,6 +264,12 @@ def to_proto(self): self.mode.value if isinstance(self.mode, TransformationMode) else self.mode ) + # Serialize tiling configuration + tiling_hop_size_duration = None + if self.tiling_hop_size is not None: + tiling_hop_size_duration = Duration() + tiling_hop_size_duration.FromTimedelta(self.tiling_hop_size) + spec = StreamFeatureViewSpecProto( name=self.name, entities=self.entities, @@ -257,6 +287,8 @@ def to_proto(self): timestamp_field=self.timestamp_field, aggregations=[agg.to_proto() for agg in self.aggregations], mode=mode, + enable_tiling=self.enable_tiling, + tiling_hop_size=tiling_hop_size_duration, ) return StreamFeatureViewProto(spec=spec, meta=meta) @@ -311,6 +343,13 @@ def from_proto(cls, sfv_proto): for agg_proto in sfv_proto.spec.aggregations ], timestamp_field=sfv_proto.spec.timestamp_field, + enable_tiling=sfv_proto.spec.enable_tiling, + tiling_hop_size=( + sfv_proto.spec.tiling_hop_size.ToTimedelta() + if sfv_proto.spec.HasField("tiling_hop_size") + and sfv_proto.spec.tiling_hop_size.ToNanoseconds() != 0 + else None + ), ) if batch_source: diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 01ffa774ccd..f26dd84e075 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -153,7 +153,19 @@ def _get_column_names( """ # if we have mapped fields, use the original field names in the call to the offline store timestamp_field = feature_view.batch_source.timestamp_field - feature_names = [feature.name for feature in feature_view.features] + + # For feature views with aggregations, read INPUT columns from aggregations. + # This applies to StreamFeatureView, BatchFeatureView, + # or any FeatureView that has aggregations. + if hasattr(feature_view, "aggregations") and feature_view.aggregations: + # Extract unique input columns from aggregations, preserving order + feature_names = list( + dict.fromkeys(agg.column for agg in feature_view.aggregations) + ) + else: + # For regular feature views, use the feature names + feature_names = [feature.name for feature in feature_view.features] + created_timestamp_column = feature_view.batch_source.created_timestamp_column from feast.feature_view import DUMMY_ENTITY_ID diff --git a/sdk/python/tests/unit/infra/compute_engines/spark/test_nodes.py b/sdk/python/tests/unit/infra/compute_engines/spark/test_nodes.py index 0fb2bd4cb78..7fcc332aefb 100644 --- a/sdk/python/tests/unit/infra/compute_engines/spark/test_nodes.py +++ b/sdk/python/tests/unit/infra/compute_engines/spark/test_nodes.py @@ -113,6 +113,7 @@ def test_spark_aggregation_node_executes_correctly(spark_session): aggregations=agg_specs, group_by_keys=["user_id"], timestamp_col="", + spark_session=spark_session, ) node.add_input(MagicMock()) node.inputs[0].name = "source"
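
---

For reference, a minimal end-to-end sketch of how the two tiling helpers introduced in this diff compose, assuming the modules land exactly as shown above. The entity, column names, and sample values are invented for illustration; this is not an official example.

```python
from datetime import timedelta

import pandas as pd

from feast import Aggregation
from feast.aggregation.tiling import (
    apply_sawtooth_window_tiling,
    convert_cumulative_to_windowed,
)

# Hypothetical sample events for a single entity
events = pd.DataFrame(
    {
        "customer_id": [1, 1, 1, 1],
        "amount": [10.0, 20.0, 30.0, 100.0],
        "event_timestamp": pd.to_datetime(
            [
                "2024-01-01 00:01",
                "2024-01-01 00:02",
                "2024-01-01 00:03",
                "2024-01-01 00:12",
            ]
        ),
    }
)

aggs = [Aggregation(column="amount", function="avg", time_window=timedelta(hours=1))]

# Step 1: cumulative tiles with IRs (_tail_avg_amount_3600s_sum / _count)
tiles = apply_sawtooth_window_tiling(
    df=events,
    aggregations=aggs,
    group_by_keys=["customer_id"],
    timestamp_col="event_timestamp",
    window_size=timedelta(hours=1),
    hop_size=timedelta(minutes=5),
)

# Step 2: subtract tiles to get windowed aggregations per tile boundary
windowed = convert_cumulative_to_windowed(
    tiles_df=tiles,
    entity_keys=["customer_id"],
    timestamp_col="event_timestamp",
    window_size=timedelta(hours=1),
    aggregations=aggs,
)

# Feature column follows the naming scheme in this diff: {function}_{column}_{window_s}s
print(windowed[["customer_id", "event_timestamp", "avg_amount_3600s"]])
```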