diff --git a/sdk/python/feast/infra/compute_engines/utils.py b/sdk/python/feast/infra/compute_engines/utils.py index 20a3dae981d..d2c49305376 100644 --- a/sdk/python/feast/infra/compute_engines/utils.py +++ b/sdk/python/feast/infra/compute_engines/utils.py @@ -21,20 +21,40 @@ def create_offline_store_retrieval_job( context: start_time: end_time: - Returns: """ offline_store = context.offline_store - # 📥 Reuse Feast's robust query resolver - retrieval_job = offline_store.pull_all_from_table_or_query( - config=context.repo_config, - data_source=data_source, - join_key_columns=column_info.join_keys, - feature_name_columns=column_info.feature_cols, - timestamp_field=column_info.ts_col, - created_timestamp_column=column_info.created_ts_col, - start_date=start_time, - end_date=end_time, - ) + + pull_latest = context.repo_config.materialization_config.pull_latest_features + + if pull_latest: + if not start_time or not end_time: + raise ValueError( + "start_time and end_time must be provided when pull_latest_features is True" + ) + + retrieval_job = offline_store.pull_latest_from_table_or_query( + config=context.repo_config, + data_source=data_source, + join_key_columns=column_info.join_keys, + feature_name_columns=column_info.feature_cols, + timestamp_field=column_info.ts_col, + created_timestamp_column=column_info.created_ts_col, + start_date=start_time, + end_date=end_time, + ) + else: + # 📥 Reuse Feast's robust query resolver + retrieval_job = offline_store.pull_all_from_table_or_query( + config=context.repo_config, + data_source=data_source, + join_key_columns=column_info.join_keys, + feature_name_columns=column_info.feature_cols, + timestamp_field=column_info.ts_col, + created_timestamp_column=column_info.created_ts_col, + start_date=start_time, + end_date=end_time, + ) + return retrieval_job diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 895002948f1..ac4383e1142 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -182,6 +182,14 @@ def validate_path(cls, path: str, values: ValidationInfo) -> str: return path +class MaterializationConfig(BaseModel): + """Configuration options for feature materialization behavior.""" + + pull_latest_features: StrictBool = False + """ bool: If true, feature retrieval jobs will only pull the latest feature values for each entity. + If false, feature retrieval jobs will pull all feature values within the specified time range. """ + + class RepoConfig(FeastBaseModel): """Repo config. Typically loaded from `feature_store.yaml`""" @@ -239,6 +247,11 @@ class RepoConfig(FeastBaseModel): coerce_tz_aware: Optional[bool] = True """ If True, coerces entity_df timestamp columns to be timezone aware (to UTC by default). """ + materialization_config: MaterializationConfig = Field( + MaterializationConfig(), alias="materialization" + ) + """ MaterializationConfig: Configuration options for feature materialization behavior. """ + def __init__(self, **data: Any): super().__init__(**data) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 89a13df69ed..14e60cb7cf9 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -34,7 +34,7 @@ from feast.permissions.auth_model import OidcClientAuthConfig from feast.permissions.permission import Permission from feast.permissions.policy import RoleBasedPolicy -from feast.repo_config import RegistryConfig, RepoConfig +from feast.repo_config import MaterializationConfig, RegistryConfig, RepoConfig from feast.utils import _utc_now from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, @@ -423,6 +423,9 @@ class Environment: entity_key_serialization_version: int repo_dir_name: str fixture_request: Optional[pytest.FixtureRequest] = None + materialization: MaterializationConfig = dataclasses.field( + default_factory=lambda: MaterializationConfig() + ) def __post_init__(self): self.end_date = _utc_now().replace(microsecond=0, second=0, minute=0) @@ -443,6 +446,7 @@ def setup(self): repo_path=self.repo_dir_name, feature_server=self.feature_server, entity_key_serialization_version=self.entity_key_serialization_version, + materialization_config=self.materialization, ) self.feature_store = FeatureStore(config=self.config) diff --git a/sdk/python/tests/integration/materialization/test_universal_materialization.py b/sdk/python/tests/integration/materialization/test_universal_materialization.py index 860e9a5fc6c..cf15746bf9e 100644 --- a/sdk/python/tests/integration/materialization/test_universal_materialization.py +++ b/sdk/python/tests/integration/materialization/test_universal_materialization.py @@ -219,7 +219,12 @@ def odfv_multi(df: pd.DataFrame) -> pd.DataFrame: @pytest.mark.integration @pytest.mark.universal_offline_stores -def test_universal_materialization_consistency(environment): +@pytest.mark.parametrize("materialization_pull_latest", [True, False]) +def test_universal_materialization_consistency( + environment, materialization_pull_latest +): + environment.materialization.pull_latest_features = materialization_pull_latest + fs = environment.feature_store df = create_basic_driver_dataset() ds = environment.data_source_creator.create_data_source(