diff --git a/README.md b/README.md index 3d6d64aa11b..49115f2276e 100644 --- a/README.md +++ b/README.md @@ -109,11 +109,26 @@ print(training_df.head()) ``` ### 6. Load feature values into your online store + +**Option 1: Incremental materialization (recommended)** ```commandline CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") feast materialize-incremental $CURRENT_TIME ``` +**Option 2: Full materialization with timestamps** +```commandline +CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") +feast materialize 2021-04-12T00:00:00 $CURRENT_TIME +``` + +**Option 3: Simple materialization without timestamps** +```commandline +feast materialize --disable-event-timestamp +``` + +The `--disable-event-timestamp` flag allows you to materialize all available feature data using the current datetime as the event timestamp, without needing to specify start and end timestamps. This is useful when your source data lacks proper event timestamp columns. + ```commandline Materializing feature view driver_hourly_stats from 2021-04-14 to 2021-04-15 done! ``` diff --git a/docs/getting-started/components/online-store.md b/docs/getting-started/components/online-store.md index 980089c4fe8..5e45c1c2879 100644 --- a/docs/getting-started/components/online-store.md +++ b/docs/getting-started/components/online-store.md @@ -1,18 +1,18 @@ -# Online store - -Feast uses online stores to serve features at low latency. -Feature values are loaded from data sources into the online store through _materialization_, which can be triggered through the `materialize` command. - -The storage schema of features within the online store mirrors that of the original data source. -One key difference is that for each [entity key](../concepts/entity.md), only the latest feature values are stored. -No historical values are stored. - -Here is an example batch data source: - -![](../../.gitbook/assets/image%20%286%29.png) - -Once the above data source is materialized into Feast (using `feast materialize`), the feature values will be stored as follows: - -![](../../.gitbook/assets/image%20%285%29.png) - +# Online store + +Feast uses online stores to serve features at low latency. +Feature values are loaded from data sources into the online store through _materialization_, which can be triggered through the `materialize` command (either with specific timestamps or using `--disable-event-timestamp` to materialize all data with current timestamps). + +The storage schema of features within the online store mirrors that of the original data source. +One key difference is that for each [entity key](../concepts/entity.md), only the latest feature values are stored. +No historical values are stored. + +Here is an example batch data source: + +![](../../.gitbook/assets/image%20%286%29.png) + +Once the above data source is materialized into Feast (using `feast materialize` with timestamps or `feast materialize --disable-event-timestamp`), the feature values will be stored as follows: + +![](../../.gitbook/assets/image%20%285%29.png) + Features can also be written directly to the online store via [push sources](../../reference/data-sources/push.md) . \ No newline at end of file diff --git a/docs/getting-started/components/overview.md b/docs/getting-started/components/overview.md index 2be7b1169bf..98d5b42b3b7 100644 --- a/docs/getting-started/components/overview.md +++ b/docs/getting-started/components/overview.md @@ -7,7 +7,7 @@ * **Create Batch Features:** ELT/ETL systems like Spark and SQL are used to transform data in the batch store. * **Create Stream Features:** Stream features are created from streaming services such as Kafka or Kinesis, and can be pushed directly into Feast via the [Push API](../../reference/data-sources/push.md). * **Feast Apply:** The user (or CI) publishes versioned controlled feature definitions using `feast apply`. This CLI command updates infrastructure and persists definitions in the object store registry. -* **Feast Materialize:** The user (or scheduler) executes `feast materialize` which loads features from the offline store into the online store. +* **Feast Materialize:** The user (or scheduler) executes `feast materialize` (with timestamps or `--disable-event-timestamp` to materialize all data with current timestamps) which loads features from the offline store into the online store. * **Model Training:** A model training pipeline is launched. It uses the Feast Python SDK to retrieve a training dataset that can be used for training models. * **Get Historical Features:** Feast exports a point-in-time correct training dataset based on the list of features and entity dataframe provided by the model training pipeline. * **Deploy Model:** The trained model binary (and list of features) are deployed into a model serving system. This step is not executed by Feast. diff --git a/docs/getting-started/concepts/data-ingestion.md b/docs/getting-started/concepts/data-ingestion.md index 6c9d9a4d740..f1c1cc0131a 100644 --- a/docs/getting-started/concepts/data-ingestion.md +++ b/docs/getting-started/concepts/data-ingestion.md @@ -64,11 +64,17 @@ materialize_python = PythonOperator( #### How to run this in the CLI +**With timestamps:** ```bash CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") feast materialize-incremental $CURRENT_TIME ``` +**Simple materialization (for data without event timestamps):** +```bash +feast materialize --disable-event-timestamp +``` + #### How to run this on Airflow ```python diff --git a/docs/getting-started/quickstart.md b/docs/getting-started/quickstart.md index 184167ebe29..0caba1f7d60 100644 --- a/docs/getting-started/quickstart.md +++ b/docs/getting-started/quickstart.md @@ -499,13 +499,19 @@ print(training_df.head()) We now serialize the latest values of features since the beginning of time to prepare for serving. Note, `materialize_incremental` serializes all new features since the last `materialize` call, or since the time provided minus the `ttl` timedelta. In this case, this will be `CURRENT_TIME - 1 day` (`ttl` was set on the `FeatureView` instances in [feature_repo/feature_repo/example_repo.py](feature_repo/feature_repo/example_repo.py)). {% tabs %} -{% tab title="Bash" %} +{% tab title="Bash (with timestamp)" %} ```bash CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") feast materialize-incremental $CURRENT_TIME ``` {% endtab %} +{% tab title="Bash (simple)" %} +```bash +# Alternative: Materialize all data using current timestamp (for data without event timestamps) +feast materialize --disable-event-timestamp +``` +{% endtab %} {% endtabs %} {% tabs %} diff --git a/docs/reference/feast-cli-commands.md b/docs/reference/feast-cli-commands.md index 71e0ab3c76d..15157863e9c 100644 --- a/docs/reference/feast-cli-commands.md +++ b/docs/reference/feast-cli-commands.md @@ -152,18 +152,30 @@ feast init -t gcp my_feature_repo ## Materialize -Load data from feature views into the online store between two dates +Load data from feature views into the online store. +**With timestamps:** ```bash feast materialize 2020-01-01T00:00:00 2022-01-01T00:00:00 ``` -Load data for specific feature views into the online store between two dates +**Without timestamps (uses current datetime):** +```bash +feast materialize --disable-event-timestamp +``` + +Load data for specific feature views: ```text feast materialize -v driver_hourly_stats 2020-01-01T00:00:00 2022-01-01T00:00:00 ``` +```text +feast materialize --disable-event-timestamp -v driver_hourly_stats +``` + +The `--disable-event-timestamp` flag is useful when your source data lacks event timestamp columns, allowing you to materialize all available data using the current datetime as the event timestamp. + ```text Materializing 1 feature views from 2020-01-01 to 2022-01-01 diff --git a/docs/reference/feature-servers/python-feature-server.md b/docs/reference/feature-servers/python-feature-server.md index 7d251671636..f8e121ad6af 100644 --- a/docs/reference/feature-servers/python-feature-server.md +++ b/docs/reference/feature-servers/python-feature-server.md @@ -200,6 +200,52 @@ requests.post( data=json.dumps(push_data)) ``` +### Materializing features + +The Python feature server also exposes an endpoint for materializing features from the offline store to the online store. + +**Standard materialization with timestamps:** +```bash +curl -X POST "http://localhost:6566/materialize" -d '{ + "start_ts": "2021-01-01T00:00:00", + "end_ts": "2021-01-02T00:00:00", + "feature_views": ["driver_hourly_stats"] +}' | jq +``` + +**Materialize all data without event timestamps:** +```bash +curl -X POST "http://localhost:6566/materialize" -d '{ + "feature_views": ["driver_hourly_stats"], + "disable_event_timestamp": true +}' | jq +``` + +When `disable_event_timestamp` is set to `true`, the `start_ts` and `end_ts` parameters are not required, and all available data is materialized using the current datetime as the event timestamp. This is useful when your source data lacks proper event timestamp columns. + +Or from Python: +```python +import json +import requests + +# Standard materialization +materialize_data = { + "start_ts": "2021-01-01T00:00:00", + "end_ts": "2021-01-02T00:00:00", + "feature_views": ["driver_hourly_stats"] +} + +# Materialize without event timestamps +materialize_data_no_timestamps = { + "feature_views": ["driver_hourly_stats"], + "disable_event_timestamp": True +} + +requests.post( + "http://localhost:6566/materialize", + data=json.dumps(materialize_data)) +``` + ## Starting the feature server in TLS(SSL) mode Enabling TLS mode ensures that data between the Feast client and server is transmitted securely. For an ideal production environment, it is recommended to start the feature server in TLS mode. diff --git a/infra/templates/README.md.jinja2 b/infra/templates/README.md.jinja2 index 65cd1a30b94..ccaadc29ff0 100644 --- a/infra/templates/README.md.jinja2 +++ b/infra/templates/README.md.jinja2 @@ -107,11 +107,26 @@ print(training_df.head()) ``` ### 6. Load feature values into your online store + +**Option 1: Incremental materialization (recommended)** ```commandline CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") feast materialize-incremental $CURRENT_TIME ``` +**Option 2: Full materialization with timestamps** +```commandline +CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") +feast materialize 2021-04-12T00:00:00 $CURRENT_TIME +``` + +**Option 3: Simple materialization without timestamps** +```commandline +feast materialize --disable-event-timestamp +``` + +The `--disable-event-timestamp` flag allows you to materialize all available feature data using the current datetime as the event timestamp, without needing to specify start and end timestamps. This is useful when your source data lacks proper event timestamp columns. + ```commandline Materializing feature view driver_hourly_stats from 2021-04-14 to 2021-04-15 done! ``` diff --git a/sdk/python/feast/cli/cli.py b/sdk/python/feast/cli/cli.py index 71cd518dc99..1bdf1f5ff85 100644 --- a/sdk/python/feast/cli/cli.py +++ b/sdk/python/feast/cli/cli.py @@ -303,17 +303,26 @@ def registry_dump_command(ctx: click.Context): @cli.command("materialize") -@click.argument("start_ts") -@click.argument("end_ts") +@click.argument("start_ts", required=False) +@click.argument("end_ts", required=False) @click.option( "--views", "-v", help="Feature views to materialize", multiple=True, ) +@click.option( + "--disable-event-timestamp", + is_flag=True, + help="Materialize all available data using current datetime as event timestamp (useful when source data lacks event timestamps)", +) @click.pass_context def materialize_command( - ctx: click.Context, start_ts: str, end_ts: str, views: List[str] + ctx: click.Context, + start_ts: Optional[str], + end_ts: Optional[str], + views: List[str], + disable_event_timestamp: bool, ): """ Run a (non-incremental) materialization job to ingest data into the online store. Feast @@ -322,13 +331,35 @@ def materialize_command( Views will be materialized. START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' + + If --disable-event-timestamp is used, timestamps are not required and all available data will be materialized using the current datetime as the event timestamp. """ store = create_feature_store(ctx) + if disable_event_timestamp: + if start_ts or end_ts: + raise click.UsageError( + "Cannot specify START_TS or END_TS when --disable-event-timestamp is used" + ) + now = datetime.now() + # Query all available data and use current datetime as event timestamp + start_date = datetime( + 1970, 1, 1 + ) # Beginning of time to capture all historical data + end_date = now + else: + if not start_ts or not end_ts: + raise click.UsageError( + "START_TS and END_TS are required unless --disable-event-timestamp is used" + ) + start_date = utils.make_tzaware(parser.parse(start_ts)) + end_date = utils.make_tzaware(parser.parse(end_ts)) + store.materialize( feature_views=None if not views else views, - start_date=utils.make_tzaware(parser.parse(start_ts)), - end_date=utils.make_tzaware(parser.parse(end_ts)), + start_date=start_date, + end_date=end_date, + disable_event_timestamp=disable_event_timestamp, ) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 8593512d5c6..0cc90b294d8 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -5,6 +5,7 @@ import time import traceback from contextlib import asynccontextmanager +from datetime import datetime from importlib import resources as importlib_resources from typing import Any, Dict, List, Optional, Union @@ -73,9 +74,10 @@ class PushFeaturesRequest(BaseModel): class MaterializeRequest(BaseModel): - start_ts: str - end_ts: str + start_ts: Optional[str] = None + end_ts: Optional[str] = None feature_views: Optional[List[str]] = None + disable_event_timestamp: bool = False class MaterializeIncrementalRequest(BaseModel): @@ -432,10 +434,27 @@ def materialize(request: MaterializeRequest) -> None: resource=_get_feast_object(feature_view, True), actions=[AuthzedAction.WRITE_ONLINE], ) + + if request.disable_event_timestamp: + # Query all available data and use current datetime as event timestamp + now = datetime.now() + start_date = datetime( + 1970, 1, 1 + ) # Beginning of time to capture all historical data + end_date = now + else: + if not request.start_ts or not request.end_ts: + raise ValueError( + "start_ts and end_ts are required when disable_event_timestamp is False" + ) + start_date = utils.make_tzaware(parser.parse(request.start_ts)) + end_date = utils.make_tzaware(parser.parse(request.end_ts)) + store.materialize( - utils.make_tzaware(parser.parse(request.start_ts)), - utils.make_tzaware(parser.parse(request.end_ts)), + start_date, + end_date, request.feature_views, + disable_event_timestamp=request.disable_event_timestamp, ) @app.post("/materialize-incremental", dependencies=[Depends(inject_user_details)]) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index d9cdecbd291..611ad6dde85 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1542,6 +1542,7 @@ def materialize( start_date: datetime, end_date: datetime, feature_views: Optional[List[str]] = None, + disable_event_timestamp: bool = False, ) -> None: """ Materialize data from the offline store into the online store. @@ -1555,6 +1556,7 @@ def materialize( end_date (datetime): End date for time range of data to materialize into the online store feature_views (List[str]): Optional list of feature view names. If selected, will only run materialization for the specified feature views. + disable_event_timestamp (bool): If True, materializes all available data using current datetime as event timestamp instead of source event timestamps Examples: Materialize all features into the online store over the interval @@ -1609,6 +1611,7 @@ def tqdm_builder(length): registry=self._registry, project=self.project, tqdm_builder=tqdm_builder, + disable_event_timestamp=disable_event_timestamp, ) self._registry.apply_materialization( diff --git a/sdk/python/feast/infra/common/materialization_job.py b/sdk/python/feast/infra/common/materialization_job.py index f4ce5b09548..7cc340b627d 100644 --- a/sdk/python/feast/infra/common/materialization_job.py +++ b/sdk/python/feast/infra/common/materialization_job.py @@ -22,6 +22,7 @@ class MaterializationTask: end_time: datetime only_latest: bool = True tqdm_builder: Union[None, Callable[[int], tqdm]] = None + disable_event_timestamp: bool = False class MaterializationJobStatus(enum.Enum): diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 40b2d63f077..2f960a02822 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -426,6 +426,7 @@ def materialize_single_feature_view( registry: BaseRegistry, project: str, tqdm_builder: Callable[[int], tqdm], + disable_event_timestamp: bool = False, ) -> None: if isinstance(feature_view, OnDemandFeatureView): if not feature_view.write_to_online_store: @@ -445,6 +446,7 @@ def materialize_single_feature_view( start_time=start_date, end_time=end_date, tqdm_builder=tqdm_builder, + disable_event_timestamp=disable_event_timestamp, ) jobs = self.batch_engine.materialize(registry, task) assert len(jobs) == 1 diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 6a20b5edf03..c2879c1e2db 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -228,6 +228,7 @@ def materialize_single_feature_view( registry: BaseRegistry, project: str, tqdm_builder: Callable[[int], tqdm], + disable_event_timestamp: bool = False, ) -> None: """ Writes latest feature values in the specified time range to the online store. @@ -240,6 +241,7 @@ def materialize_single_feature_view( registry: The registry for the current feature store. project: Feast project to which the objects belong. tqdm_builder: A function to monitor the progress of materialization. + disable_event_timestamp: If True, materializes all available data using current datetime as event timestamp instead of source event timestamps. """ pass diff --git a/sdk/python/feast/protos/feast/core/FeatureService_pb2.pyi b/sdk/python/feast/protos/feast/core/FeatureService_pb2.pyi index 6d5879e52cb..91f04c1a7d6 100644 --- a/sdk/python/feast/protos/feast/core/FeatureService_pb2.pyi +++ b/sdk/python/feast/protos/feast/core/FeatureService_pb2.pyi @@ -71,7 +71,7 @@ class FeatureServiceSpec(google.protobuf.message.Message): """Name of Feast project that this Feature Service belongs to.""" @property def features(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[feast.core.FeatureViewProjection_pb2.FeatureViewProjection]: - """Represents a projection that's to be applied on top of the FeatureView. + """Represents a projection that's to be applied on top of the FeatureView. Contains data such as the features to use from a FeatureView. """ @property diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index 2aa674c0aa5..a04ff3cc456 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -90,6 +90,7 @@ def materialize_single_feature_view( registry: BaseRegistry, project: str, tqdm_builder: Callable[[int], tqdm], + disable_event_timestamp: bool = False, ) -> None: pass diff --git a/sdk/python/tests/unit/cli/test_cli.py b/sdk/python/tests/unit/cli/test_cli.py index c8649f5cfb5..46f4d24956b 100644 --- a/sdk/python/tests/unit/cli/test_cli.py +++ b/sdk/python/tests/unit/cli/test_cli.py @@ -190,3 +190,40 @@ def test_cli_configuration(): assertpy.assert_that(output).contains(b"path: data/online_store.db") assertpy.assert_that(output).contains(b"type: file") assertpy.assert_that(output).contains(b"entity_key_serialization_version: 3") + + +def test_cli_materialize_disable_event_timestamp(): + """ + Unit test for the 'feast materialize --disable-event-timestamp' command + """ + runner = CliRunner() + + with setup_third_party_provider_repo("local") as repo_path: + # Test that --disable-event-timestamp flag works without timestamps + return_code, output = runner.run_with_output( + ["materialize", "--disable-event-timestamp"], cwd=repo_path + ) + # Should succeed (though may not have data to materialize) + assertpy.assert_that(return_code).is_equal_to(0) + + # Test that providing timestamps with --disable-event-timestamp fails + return_code, output = runner.run_with_output( + [ + "materialize", + "--disable-event-timestamp", + "2021-01-01T00:00:00", + "2021-01-02T00:00:00", + ], + cwd=repo_path, + ) + assertpy.assert_that(return_code).is_equal_to(2) # Click usage error + assertpy.assert_that(output).contains( + b"Cannot specify START_TS or END_TS when --disable-event-timestamp is used" + ) + + # Test that missing timestamps without flag fails + return_code, output = runner.run_with_output(["materialize"], cwd=repo_path) + assertpy.assert_that(return_code).is_equal_to(2) # Click usage error + assertpy.assert_that(output).contains( + b"START_TS and END_TS are required unless --disable-event-timestamp is used" + ) diff --git a/sdk/python/tests/unit/test_feature_server.py b/sdk/python/tests/unit/test_feature_server.py index e07cdc8655e..21c01d61765 100644 --- a/sdk/python/tests/unit/test_feature_server.py +++ b/sdk/python/tests/unit/test_feature_server.py @@ -1,148 +1,209 @@ -import json -from unittest.mock import AsyncMock, MagicMock - -import pytest -from fastapi.testclient import TestClient - -from feast.data_source import PushMode -from feast.errors import PushSourceNotFoundException -from feast.feature_server import get_app -from feast.online_response import OnlineResponse -from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse -from feast.utils import _utc_now -from tests.foo_provider import FooProvider -from tests.utils.cli_repo_creator import CliRunner, get_example_repo - - -@pytest.fixture -def mock_fs_factory(): - def builder(**async_support): - provider = FooProvider.with_async_support(**async_support) - fs = MagicMock() - fs._get_provider.return_value = provider - empty_response = OnlineResponse(GetOnlineFeaturesResponse(results=[])) - fs.get_online_features = MagicMock(return_value=empty_response) - fs.push = MagicMock() - fs.get_online_features_async = AsyncMock(return_value=empty_response) - fs.push_async = AsyncMock() - return fs - - return builder - - -@pytest.fixture -def test_client(): - runner = CliRunner() - with runner.local_repo( - get_example_repo("example_feature_repo_1.py"), "file" - ) as store: - yield TestClient(get_app(store)) - - -def get_online_features_body(): - return { - "features": [ - "pushed_driver_locations:driver_lat", - "pushed_driver_locations:driver_long", - ], - "entities": {"driver_id": [123]}, - } - - -def push_body(push_mode=PushMode.ONLINE, lat=42.0): - return { - "push_source_name": "driver_locations_push", - "df": { - "driver_lat": [lat], - "driver_long": ["42.0"], - "driver_id": [123], - "event_timestamp": [str(_utc_now())], - "created_timestamp": [str(_utc_now())], - }, - "to": push_mode.name.lower(), - } - - -@pytest.mark.parametrize("async_online_read", [True, False]) -def test_get_online_features_async_supported(async_online_read, mock_fs_factory): - fs = mock_fs_factory(online_read=async_online_read) - client = TestClient(get_app(fs)) - client.post("/get-online-features", json=get_online_features_body()) - assert fs.get_online_features.call_count == int(not async_online_read) - assert fs.get_online_features_async.await_count == int(async_online_read) - - -@pytest.mark.parametrize( - "online_write,push_mode,async_count", - [ - (True, PushMode.ONLINE_AND_OFFLINE, 1), - (True, PushMode.OFFLINE, 0), - (True, PushMode.ONLINE, 1), - (False, PushMode.ONLINE_AND_OFFLINE, 0), - (False, PushMode.OFFLINE, 0), - (False, PushMode.ONLINE, 0), - ], -) -def test_push_online_async_supported( - online_write, push_mode, async_count, mock_fs_factory -): - fs = mock_fs_factory(online_write=online_write) - client = TestClient(get_app(fs)) - client.post("/push", json=push_body(push_mode)) - assert fs.push.call_count == 1 - async_count - assert fs.push_async.await_count == async_count - - -async def test_push_and_get(test_client): - driver_lat = 55.1 - push_payload = push_body(lat=driver_lat) - response = test_client.post("/push", json=push_payload) - assert response.status_code == 200 - - # Check new pushed temperature is fetched - request_payload = get_online_features_body() - actual_resp = test_client.post("/get-online-features", json=request_payload) - actual = json.loads(actual_resp.text) - - ix = actual["metadata"]["feature_names"].index("driver_lat") - assert actual["results"][ix]["values"][0] == pytest.approx(driver_lat, 0.0001) - - assert_get_online_features_response_format( - actual, request_payload["entities"]["driver_id"][0] - ) - - -def assert_get_online_features_response_format(parsed_response, expected_entity_id): - assert "metadata" in parsed_response - metadata = parsed_response["metadata"] - expected_features = ["driver_id", "driver_lat", "driver_long"] - response_feature_names = metadata["feature_names"] - assert len(response_feature_names) == len(expected_features) - for expected_feature in expected_features: - assert expected_feature in response_feature_names - assert "results" in parsed_response - results = parsed_response["results"] - for result in results: - # Same order as in metadata - assert len(result["statuses"]) == 1 # Requested one entity - for status in result["statuses"]: - assert status == "PRESENT" - results_driver_id_index = response_feature_names.index("driver_id") - assert results[results_driver_id_index]["values"][0] == expected_entity_id - - -def test_push_source_does_not_exist(test_client): - with pytest.raises( - PushSourceNotFoundException, - match="Unable to find push source 'push_source_does_not_exist'", - ): - test_client.post( - "/push", - json={ - "push_source_name": "push_source_does_not_exist", - "df": { - "any_data": [1], - "event_timestamp": [str(_utc_now())], - }, - }, - ) +import json +from unittest.mock import AsyncMock, MagicMock + +import pytest +from fastapi.testclient import TestClient + +from feast.data_source import PushMode +from feast.errors import PushSourceNotFoundException +from feast.feature_server import get_app +from feast.online_response import OnlineResponse +from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse +from feast.utils import _utc_now +from tests.foo_provider import FooProvider +from tests.utils.cli_repo_creator import CliRunner, get_example_repo + + +@pytest.fixture +def mock_fs_factory(): + def builder(**async_support): + provider = FooProvider.with_async_support(**async_support) + fs = MagicMock() + fs._get_provider.return_value = provider + empty_response = OnlineResponse(GetOnlineFeaturesResponse(results=[])) + fs.get_online_features = MagicMock(return_value=empty_response) + fs.push = MagicMock() + fs.get_online_features_async = AsyncMock(return_value=empty_response) + fs.push_async = AsyncMock() + return fs + + return builder + + +@pytest.fixture +def test_client(): + runner = CliRunner() + with runner.local_repo( + get_example_repo("example_feature_repo_1.py"), "file" + ) as store: + yield TestClient(get_app(store)) + + +def get_online_features_body(): + return { + "features": [ + "pushed_driver_locations:driver_lat", + "pushed_driver_locations:driver_long", + ], + "entities": {"driver_id": [123]}, + } + + +def push_body(push_mode=PushMode.ONLINE, lat=42.0): + return { + "push_source_name": "driver_locations_push", + "df": { + "driver_lat": [lat], + "driver_long": ["42.0"], + "driver_id": [123], + "event_timestamp": [str(_utc_now())], + "created_timestamp": [str(_utc_now())], + }, + "to": push_mode.name.lower(), + } + + +@pytest.mark.parametrize("async_online_read", [True, False]) +def test_get_online_features_async_supported(async_online_read, mock_fs_factory): + fs = mock_fs_factory(online_read=async_online_read) + client = TestClient(get_app(fs)) + client.post("/get-online-features", json=get_online_features_body()) + assert fs.get_online_features.call_count == int(not async_online_read) + assert fs.get_online_features_async.await_count == int(async_online_read) + + +@pytest.mark.parametrize( + "online_write,push_mode,async_count", + [ + (True, PushMode.ONLINE_AND_OFFLINE, 1), + (True, PushMode.OFFLINE, 0), + (True, PushMode.ONLINE, 1), + (False, PushMode.ONLINE_AND_OFFLINE, 0), + (False, PushMode.OFFLINE, 0), + (False, PushMode.ONLINE, 0), + ], +) +def test_push_online_async_supported( + online_write, push_mode, async_count, mock_fs_factory +): + fs = mock_fs_factory(online_write=online_write) + client = TestClient(get_app(fs)) + client.post("/push", json=push_body(push_mode)) + assert fs.push.call_count == 1 - async_count + assert fs.push_async.await_count == async_count + + +async def test_push_and_get(test_client): + driver_lat = 55.1 + push_payload = push_body(lat=driver_lat) + response = test_client.post("/push", json=push_payload) + assert response.status_code == 200 + + # Check new pushed temperature is fetched + request_payload = get_online_features_body() + actual_resp = test_client.post("/get-online-features", json=request_payload) + actual = json.loads(actual_resp.text) + + ix = actual["metadata"]["feature_names"].index("driver_lat") + assert actual["results"][ix]["values"][0] == pytest.approx(driver_lat, 0.0001) + + assert_get_online_features_response_format( + actual, request_payload["entities"]["driver_id"][0] + ) + + +def assert_get_online_features_response_format(parsed_response, expected_entity_id): + assert "metadata" in parsed_response + metadata = parsed_response["metadata"] + expected_features = ["driver_id", "driver_lat", "driver_long"] + response_feature_names = metadata["feature_names"] + assert len(response_feature_names) == len(expected_features) + for expected_feature in expected_features: + assert expected_feature in response_feature_names + assert "results" in parsed_response + results = parsed_response["results"] + for result in results: + # Same order as in metadata + assert len(result["statuses"]) == 1 # Requested one entity + for status in result["statuses"]: + assert status == "PRESENT" + results_driver_id_index = response_feature_names.index("driver_id") + assert results[results_driver_id_index]["values"][0] == expected_entity_id + + +def test_push_source_does_not_exist(test_client): + with pytest.raises( + PushSourceNotFoundException, + match="Unable to find push source 'push_source_does_not_exist'", + ): + test_client.post( + "/push", + json={ + "push_source_name": "push_source_does_not_exist", + "df": { + "any_data": [1], + "event_timestamp": [str(_utc_now())], + }, + }, + ) + + +def test_materialize_endpoint_logic(): + """Test the materialization endpoint logic without HTTP requests""" + from datetime import datetime + + from feast.feature_server import MaterializeRequest + + # Test 1: Standard request with timestamps + request = MaterializeRequest( + start_ts="2021-01-01T00:00:00", + end_ts="2021-01-02T00:00:00", + feature_views=["test_view"], + ) + assert request.disable_event_timestamp is False + assert request.start_ts is not None + assert request.end_ts is not None + + # Test 2: Request with disable_event_timestamp + request_no_ts = MaterializeRequest( + feature_views=["test_view"], disable_event_timestamp=True + ) + assert request_no_ts.disable_event_timestamp is True + assert request_no_ts.start_ts is None + assert request_no_ts.end_ts is None + + # Test 3: Validation logic (this is what our endpoint does) + # Simulate the endpoint's validation logic + if request_no_ts.disable_event_timestamp: + # Should use epoch to now + now = datetime.now() + start_date = datetime(1970, 1, 1) + end_date = now + # Should not raise an error + assert start_date < end_date + else: + # Should require timestamps + if not request_no_ts.start_ts or not request_no_ts.end_ts: + # This should trigger our validation error + pass + + +def test_materialize_request_model(): + """Test MaterializeRequest model validation""" + from feast.feature_server import MaterializeRequest + + # Test with disable_event_timestamp=True (no timestamps needed) + req1 = MaterializeRequest(feature_views=["test"], disable_event_timestamp=True) + assert req1.disable_event_timestamp is True + assert req1.start_ts is None + assert req1.end_ts is None + + # Test with disable_event_timestamp=False (timestamps provided) + req2 = MaterializeRequest( + start_ts="2021-01-01T00:00:00", + end_ts="2021-01-02T00:00:00", + feature_views=["test"], + ) + assert req2.disable_event_timestamp is False + assert req2.start_ts == "2021-01-01T00:00:00" + assert req2.end_ts == "2021-01-02T00:00:00"