Skip to content
Merged
44 changes: 32 additions & 12 deletions sdk/python/feast/infra/compute_engines/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,40 @@ def create_offline_store_retrieval_job(
context:
start_time:
end_time:

Returns:

"""
offline_store = context.offline_store
# 📥 Reuse Feast's robust query resolver
retrieval_job = offline_store.pull_all_from_table_or_query(
config=context.repo_config,
data_source=data_source,
join_key_columns=column_info.join_keys,
feature_name_columns=column_info.feature_cols,
timestamp_field=column_info.ts_col,
created_timestamp_column=column_info.created_ts_col,
start_date=start_time,
end_date=end_time,
)

pull_latest = context.repo_config.materialization_config.pull_latest_features

if pull_latest:
if not start_time or not end_time:
raise ValueError(
"start_time and end_time must be provided when pull_latest_features is True"
)

retrieval_job = offline_store.pull_latest_from_table_or_query(
config=context.repo_config,
data_source=data_source,
join_key_columns=column_info.join_keys,
feature_name_columns=column_info.feature_cols,
timestamp_field=column_info.ts_col,
created_timestamp_column=column_info.created_ts_col,
start_date=start_time,
end_date=end_time,
)
else:
# 📥 Reuse Feast's robust query resolver
retrieval_job = offline_store.pull_all_from_table_or_query(
config=context.repo_config,
data_source=data_source,
join_key_columns=column_info.join_keys,
feature_name_columns=column_info.feature_cols,
timestamp_field=column_info.ts_col,
created_timestamp_column=column_info.created_ts_col,
start_date=start_time,
end_date=end_time,
)

return retrieval_job
13 changes: 13 additions & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ def validate_path(cls, path: str, values: ValidationInfo) -> str:
return path


class MaterializationConfig(BaseModel):
    """Configuration options for feature materialization behavior."""

    pull_latest_features: StrictBool = False
    """ bool: Selects how feature retrieval jobs read from the offline store.
    When true, only the latest feature values for each entity are pulled.
    When false (the default), all feature values within the specified time
    range are pulled. """


class RepoConfig(FeastBaseModel):
"""Repo config. Typically loaded from `feature_store.yaml`"""

Expand Down Expand Up @@ -239,6 +247,11 @@ class RepoConfig(FeastBaseModel):
coerce_tz_aware: Optional[bool] = True
""" If True, coerces entity_df timestamp columns to be timezone aware (to UTC by default). """

materialization_config: MaterializationConfig = Field(
MaterializationConfig(), alias="materialization"
)
""" MaterializationConfig: Configuration options for feature materialization behavior. """

def __init__(self, **data: Any):
super().__init__(**data)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from feast.permissions.auth_model import OidcClientAuthConfig
from feast.permissions.permission import Permission
from feast.permissions.policy import RoleBasedPolicy
from feast.repo_config import RegistryConfig, RepoConfig
from feast.repo_config import MaterializationConfig, RegistryConfig, RepoConfig
from feast.utils import _utc_now
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
Expand Down Expand Up @@ -423,6 +423,9 @@ class Environment:
entity_key_serialization_version: int
repo_dir_name: str
fixture_request: Optional[pytest.FixtureRequest] = None
materialization: MaterializationConfig = dataclasses.field(
default_factory=lambda: MaterializationConfig()
)

def __post_init__(self):
self.end_date = _utc_now().replace(microsecond=0, second=0, minute=0)
Expand All @@ -443,6 +446,7 @@ def setup(self):
repo_path=self.repo_dir_name,
feature_server=self.feature_server,
entity_key_serialization_version=self.entity_key_serialization_version,
materialization_config=self.materialization,
)

self.feature_store = FeatureStore(config=self.config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,12 @@ def odfv_multi(df: pd.DataFrame) -> pd.DataFrame:

@pytest.mark.integration
@pytest.mark.universal_offline_stores
def test_universal_materialization_consistency(environment):
@pytest.mark.parametrize("materialization_pull_latest", [True, False])
def test_universal_materialization_consistency(
environment, materialization_pull_latest
):
environment.materialization.pull_latest_features = materialization_pull_latest

fs = environment.feature_store
df = create_basic_driver_dataset()
ds = environment.data_source_creator.create_data_source(
Expand Down
Loading