diff --git a/docs/reference/online-stores/hybrid.md b/docs/reference/online-stores/hybrid.md new file mode 100644 index 00000000000..38527d9a66e --- /dev/null +++ b/docs/reference/online-stores/hybrid.md @@ -0,0 +1,111 @@ +# Hybrid online store + +## Description + +The HybridOnlineStore allows routing online feature operations to different online store backends based on a configurable tag (such as `tribe`, `team`, or `project`) on the FeatureView. This enables a single Feast deployment to support multiple online store backends, each configured independently and selected dynamically at runtime. + +## Getting started + +To use the HybridOnlineStore, install Feast with the dependencies of every online store you plan to use (e.g., Bigtable and Cassandra). For example: + +``` +pip install 'feast[gcp,cassandra]' +``` + +## Example + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: hybrid_online_store.HybridOnlineStore + routing_tag: team # name of the FeatureView tag used for routing (defaults to 'tribe') + online_stores: + - type: bigtable + conf: + project_id: my_gcp_project + instance: my_bigtable_instance + - type: cassandra + conf: + hosts: + - cassandra1.example.com + - cassandra2.example.com + keyspace: feast_keyspace + username: feast_user + password: feast_password +``` +{% endcode %} + +### Setting the Routing Tag in FeatureView + +To enable routing, add a tag to your FeatureView whose name matches the `routing_tag` specified in your `feature_store.yaml`. For example, if your `routing_tag` is `team`, add a `team` tag to your FeatureView: + +```yaml +tags: + team: bigtable # This tag determines which online store is used +``` + +The value of this tag (e.g., `bigtable`) must match the `type` of one of the entries in `online_stores`; the comparison is case-insensitive. The HybridOnlineStore will route all online operations for this FeatureView to the corresponding backend. 
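The matching rule can be sketched in a few lines of plain Python. This is a minimal illustration only, assuming the lookup mirrors the `HybridOnlineStore` code added in this change (the last dotted segment of the configured `type`, lowercased, is compared with the lowercased tag value). `backend_key` and `resolve_backend` are hypothetical helper names and are not part of Feast.

```python
from typing import Dict, List, Optional


def backend_key(store_type: str) -> str:
    """Hypothetical helper: last dotted segment of the configured type, lowercased."""
    return store_type.split(".")[-1].lower()


def resolve_backend(
    tags: Dict[str, str], routing_tag: str, store_types: List[str]
) -> Optional[str]:
    """Return the configured store type selected by the routing tag, or None."""
    tag_value = tags.get(routing_tag)
    if not tag_value:
        # The FeatureView carries no routing tag, so nothing can be selected.
        return None
    by_key = {backend_key(t): t for t in store_types}
    return by_key.get(tag_value.lower())


store_types = ["bigtable", "cassandra"]
print(resolve_backend({"team": "bigtable"}, "team", store_types))   # bigtable
print(resolve_backend({"team": "Cassandra"}, "team", store_types))  # cassandra
print(resolve_backend({"owner": "bigtable"}, "team", store_types))  # None
```

Under this rule a fully qualified `type` such as `feast.infra.online_stores.bigtable.BigtableOnlineStore` yields the key `bigtableonlinestore`, so a tag value of `bigtable` would not select it; the short type names used in the example above avoid that mismatch.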
+ +### Example FeatureView + +{% code title="feature_view" %} +```yaml +name: user_features +entities: + - name: user_id + join_keys: ["user_id"] +ttl: null +schema: + - name: age + dtype: int64 + - name: country + dtype: string +online: true +source: + path: data/user_features.parquet + event_timestamp_column: event_timestamp + created_timestamp_column: created_timestamp +tags: + team: bigtable # This tag determines which online store is used +``` +{% endcode %} + +The `team` tag in the FeatureView's `tags` field determines which online store backend is used for this FeatureView. In this example, all online operations for `user_features` will be routed to the Bigtable online store, as specified by the tag value and the `routing_tag` in your `feature_store.yaml`. + +The HybridOnlineStore will route requests to the correct online store based on the value of the tag specified by `routing_tag`. + +The full set of configuration options for each online store is available in their respective documentation: +- [BigtableOnlineStoreConfig](https://rtd.feast.dev/en/latest/#feast.infra.online_stores.bigtable.BigtableOnlineStoreConfig) +- [CassandraOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.cassandra_online_store.cassandra_online_store.CassandraOnlineStoreConfig) + +For a full explanation of configuration options, please refer to the documentation for each online store backend you configure in the `online_stores` list. + +Storage specifications can be found at [docs/specs/online_store_format.md](../../specs/online_store_format.md). + +## Functionality Matrix + +The set of functionality supported by online stores is described in detail [here](overview.md#functionality). Below is a matrix indicating which functionality is supported by the HybridOnlineStore. 
+ +| | HybridOnlineStore | +|-----------------------------------------------------------|-------------------| +| write feature values to the online store | yes | +| read feature values from the online store | yes | +| update infrastructure (e.g. tables) in the online store | yes | +| teardown infrastructure (e.g. tables) in the online store | yes | +| generate a plan of infrastructure changes | no | +| support for on-demand transforms | yes | +| readable by Python SDK | yes | +| readable by Java | no | +| readable by Go | no | +| support for entityless feature views | yes | +| support for concurrent writing to the same key | yes | +| support for ttl (time to live) at retrieval | no | +| support for deleting expired data | no | +| collocated by feature view | yes | +| collocated by feature service | no | +| collocated by entity key | yes | + +To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix). diff --git a/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py new file mode 100644 index 00000000000..e929e039411 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py @@ -0,0 +1,329 @@ +""" +hybrid_online_store.py +---------------------- + +This module provides the HybridOnlineStore, a Feast OnlineStore implementation that enables routing online feature operations +to different online stores based on a configurable tag (e.g., tribe, team, or project) on the FeatureView. This allows a single Feast deployment +to support multiple online store backends, each configured independently and selected dynamically at runtime. + +Features: + - Supports multiple online store backends in a single Feast deployment. + - Routes online reads and writes to the correct backend based on a configurable tag on the FeatureView. 
+ - Enables multi-tenancy and flexible data management strategies. + - Designed for extensibility and compatibility with Feast's OnlineStore interface. + +Usage: + 1. Add a tag (e.g., 'tribe', 'team', or any custom name) to your FeatureView. + 2. Configure multiple online stores in your Feast repo config under 'online_stores'. + 3. Set the 'routing_tag' field in your online_store config to the name of that tag (defaults to 'tribe'). + 4. The HybridOnlineStore will route reads and writes to the backend whose type matches the tag value. + +Example configuration (feature_store.yaml): + + online_store: + type: hybrid_online_store.HybridOnlineStore + routing_tag: team # or any tag name you want to use for routing + online_stores: + - type: bigtable + conf: + ... # bigtable config + - type: cassandra + conf: + ... # cassandra config + +Example FeatureView: + + tags: + team: bigtable + +The HybridOnlineStore will route requests to the correct online store based on the value of the tag specified by 'routing_tag'. +""" + +from datetime import datetime +from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple + +from pydantic import StrictStr + +from feast import Entity, FeatureView, RepoConfig +from feast.infra.online_stores.helpers import get_online_store_from_config +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel, get_online_config_from_type + + +class HybridOnlineStoreConfig(FeastConfigBaseModel): + """ + Configuration for HybridOnlineStore. + + This config allows multiple online stores to be used in a single Feast deployment. 
Each online store is specified by its type (e.g., 'bigtable' or 'redis') + and a configuration dictionary. The HybridOnlineStore uses this configuration to instantiate and manage the set of online stores. + + Attributes: + type: The type identifier for the HybridOnlineStore. + online_stores: A list of OnlineStoresWithConfig, each specifying the type and config for an online store backend. + routing_tag: Name of the FeatureView tag whose value selects the backend. Defaults to 'tribe'. + """ + + type: Literal["HybridOnlineStore", "hybrid_online_store.HybridOnlineStore"] = ( + "hybrid_online_store.HybridOnlineStore" + ) + + class OnlineStoresWithConfig(FeastConfigBaseModel): + """ + Configuration for a single online store backend. + + Attributes: + type: Online store type. Its last dotted segment, lowercased, is the key matched against the routing tag value. + conf: Dictionary of configuration parameters for the online store. + """ + + type: StrictStr # Online store type; last dotted segment (lowercased) is the routing key + conf: Dict + + online_stores: Optional[List[OnlineStoresWithConfig]] + routing_tag: StrictStr = ( + "tribe" # Configurable tag name for routing, default is 'tribe' + ) + + +class HybridOnlineStore(OnlineStore): + """ + HybridOnlineStore routes online feature operations to different online store backends + based on a tag (e.g., 'tribe') on the FeatureView. This enables multi-tenancy and flexible + backend selection in a single Feast deployment. + + The backend is selected dynamically at runtime according to the tag value. + """ + + def __init__(self): + """ + Initialize the HybridOnlineStore. Online stores are instantiated lazily on first use. + """ + self.online_stores = {} + self._initialized = False + + def _initialize_online_stores(self, config: RepoConfig): + """ + Lazily instantiate all configured online store backends from the repo config. + + Args: + config: Feast RepoConfig containing the online_stores configuration. 
+ """ + if self._initialized: + return + self.online_stores = {} + online_stores_cfg = getattr(config.online_store, "online_stores", []) + for store_cfg in online_stores_cfg: + config_cls = get_online_config_from_type( + store_cfg.type.split(".")[-1].lower() + ) + config_instance = config_cls(**store_cfg.conf) + online_store_instance = get_online_store_from_config(config_instance) + self.online_stores[store_cfg.type.split(".")[-1].lower()] = ( + online_store_instance + ) + self._initialized = True + + def _get_online_store(self, tribe_tag, config: RepoConfig): + """ + Retrieve the online store backend corresponding to the given tag value. + + Args: + tribe_tag: The tag value (e.g., 'tribe') used to select the backend. + config: Feast RepoConfig. + Returns: + The OnlineStore instance for the given tag, or None if not found. + """ + self._initialize_online_stores(config) + return self.online_stores.get(tribe_tag.lower()) + + def _prepare_repo_conf(self, config: RepoConfig, online_store_type: str): + """ + Prepare a RepoConfig for the selected online store backend. + + Args: + config: The original Feast RepoConfig. + online_store_type: The type of the online store backend to use. + Returns: + A dictionary representing the updated RepoConfig for the selected backend. 
+ """ + # Build the new config from a shallow copy so the caller's RepoConfig is not mutated. + data = dict(config.__dict__) + for online_store in config.online_store.online_stores: + if online_store.type.split(".")[-1].lower() == online_store_type.lower(): + # Copy the backend conf too, so adding "type" does not alter the user's config. + data["online_config"] = {**online_store.conf, "type": online_store.type} + data["registry"] = data["registry_config"] + data["offline_store"] = data["offline_config"] + data["online_store"] = data["online_config"] + return data + + def _get_routing_tag_value(self, table: FeatureView, config: RepoConfig): + tag_name = getattr(config.online_store, "routing_tag", "tribe") + return table.tags.get(tag_name) + + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + odata: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + """ + Write a batch of feature rows to the appropriate online store based on the FeatureView's tag. + + Args: + config: Feast RepoConfig. + table: FeatureView to write to. Must have a tag (e.g., 'tribe') to select the backend. + odata: List of tuples containing entity key, feature values, event timestamp, and created timestamp. + progress: Optional callback for progress reporting. + Raises: + ValueError: If the FeatureView does not have the required tag. + NotImplementedError: If no online store is found for the tag value. + """ + tribe = self._get_routing_tag_value(table, config) + if not tribe: + tag_name = getattr(config.online_store, "routing_tag", "tribe") + raise ValueError( + f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore." + ) + online_store = self._get_online_store(tribe, config) + if online_store: + config = RepoConfig(**self._prepare_repo_conf(config, tribe)) + online_store.online_write_batch(config, table, odata, progress) + else: + raise NotImplementedError( + f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration." 
+ ) + + @staticmethod + def write_to_table( + created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val + ): + """ + (Not implemented) Write a single feature value to the online store table. + """ + pass + + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """ + Read feature rows from the appropriate online store based on the FeatureView's tag. + + Args: + config: Feast RepoConfig. + table: FeatureView to read from. Must have a tag (e.g., 'tribe') to select the backend. + entity_keys: List of entity keys to read. + requested_features: Optional list of feature names to read. + Returns: + List of tuples containing event timestamp and feature values. + Raises: + ValueError: If the FeatureView does not have the required tag. + NotImplementedError: If no online store is found for the tag value. + """ + tribe = self._get_routing_tag_value(table, config) + if not tribe: + tag_name = getattr(config.online_store, "routing_tag", "tribe") + raise ValueError( + f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore." + ) + online_store = self._get_online_store(tribe, config) + if online_store: + config = RepoConfig(**self._prepare_repo_conf(config, tribe)) + return online_store.online_read( + config, table, entity_keys, requested_features + ) + else: + raise NotImplementedError( + f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration." + ) + + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + """ + Update the state of the online stores for the given FeatureViews and Entities. + + Args: + config: Feast RepoConfig. 
+ tables_to_delete: Sequence of FeatureViews to delete. + tables_to_keep: Sequence of FeatureViews to keep. + entities_to_delete: Sequence of Entities to delete. + entities_to_keep: Sequence of Entities to keep. + partial: Whether to perform a partial update. + Raises: + ValueError: If a FeatureView does not have the required tag. + NotImplementedError: If no online store is found for a tag value. + """ + for table in tables_to_keep: + tribe = self._get_routing_tag_value(table, config) + if not tribe: + tag_name = getattr(config.online_store, "routing_tag", "tribe") + raise ValueError( + f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore." + ) + online_store = self._get_online_store(tribe, config) + if online_store: + store_config = RepoConfig(**self._prepare_repo_conf(config, tribe)) + online_store.update( + store_config, + tables_to_delete, + tables_to_keep, + entities_to_delete, + entities_to_keep, + partial, + ) + else: + raise NotImplementedError( + f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration." + ) + + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): + """ + Teardown all managed online stores for the given FeatureViews and Entities. + + Args: + config: Feast RepoConfig. + tables: Sequence of FeatureViews to teardown. + entities: Sequence of Entities to teardown. 
+ """ + # Use a set of (tribe, store_type, conf_id) to avoid duplicate teardowns for the same instance + tribes_seen = set() + online_stores_cfg = getattr(config.online_store, "online_stores", []) + tag_name = getattr(config.online_store, "routing_tag", "tribe") + for table in tables: + tribe = table.tags.get(tag_name) + if not tribe: + continue + # Find all store configs matching this tribe (supporting multiple instances of the same type) + for store_cfg in online_stores_cfg: + store_type = store_cfg.type + # Use id(store_cfg.conf) to distinguish different configs of the same type + key = (tribe, store_type, id(store_cfg.conf)) + if key in tribes_seen: + continue + tribes_seen.add(key) + # Only select the online store if tribe matches the type (or you can add a mapping in config for more flexibility) + if tribe.lower() == store_type.split(".")[-1].lower(): + online_store = self._get_online_store(tribe, config) + if online_store: + store_config = RepoConfig(**self._prepare_repo_conf(config, tribe)) + online_store.teardown(store_config, tables, entities) diff --git a/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store_repo_configuration.py b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store_repo_configuration.py new file mode 100644 index 00000000000..90a65a092d0 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store_repo_configuration.py @@ -0,0 +1,28 @@ +# Copyright 2025 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +# This file provides integration test repo configuration for HybridOnlineStore. +# It enables running integration tests with multiple online store backends. +# Update this file if you add more backends or change test setup. + +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.universal.online_store.hybrid_online_store import ( + HybridOnlineStoreCreator, +) + +FULL_REPO_CONFIGS = [ + IntegrationTestRepoConfig(online_store_creator=HybridOnlineStoreCreator), +] diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/hybrid_online_store.py b/sdk/python/tests/integration/feature_repos/universal/online_store/hybrid_online_store.py new file mode 100644 index 00000000000..f0efbd11044 --- /dev/null +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/hybrid_online_store.py @@ -0,0 +1,25 @@ +from tests.integration.feature_repos.universal.online_store_creator import ( + OnlineStoreCreator, +) + + +class HybridOnlineStoreCreator(OnlineStoreCreator): + def create_online_store(self): + # Use Redis and SQLite as the two backends; the Redis settings are placeholders to be mocked in unit tests + return { + "type": "hybrid_online_store.HybridOnlineStore", + "online_stores": [ + { + "type": "redis", + "conf": { + "redis_type": "redis", + "connection_string": "localhost:6379", + }, + }, + {"type": "sqlite", "conf": {"path": "/tmp/feast_hybrid_test.db"}}, + ], + } + + def teardown(self): + # Implement any resource cleanup if needed (e.g., remove test DB files) + pass diff --git a/sdk/python/tests/integration/online_store/test_hybrid_online_store.py b/sdk/python/tests/integration/online_store/test_hybrid_online_store.py new file mode 100644 index 00000000000..4b9dad05ff8 --- /dev/null +++ 
b/sdk/python/tests/integration/online_store/test_hybrid_online_store.py @@ -0,0 +1,87 @@ +from datetime import datetime +from unittest.mock import patch + +import pytest + +from feast import Entity, FeatureView, Field, FileSource, RepoConfig, ValueType +from feast.infra.online_stores.hybrid_online_store.hybrid_online_store import ( + HybridOnlineStore, + HybridOnlineStoreConfig, +) +from feast.protos.feast.types.EntityKey_pb2 import EntityKey +from feast.protos.feast.types.Value_pb2 import Value +from feast.types import PrimitiveFeastType + + +@pytest.fixture +def sample_entity(): + return Entity(name="id", join_keys=["id"], value_type=ValueType.INT64) + + +@pytest.fixture +def sample_feature_view(sample_entity): + file_source = FileSource( + path="/tmp/feast_hybrid_test.parquet", + event_timestamp_column="event_timestamp", + ) + return FeatureView( + name="test_fv", + entities=[sample_entity], + schema=[Field(name="feature1", dtype=PrimitiveFeastType.INT64)], + online=True, + tags={"tribe": "redis"}, + source=file_source, + ) + + +@pytest.fixture +def sample_repo_config(): + # Minimal config for HybridOnlineStore with two backends (mocked for test) + return RepoConfig( + registry="test-registry.db", + project="test_project", + provider="local", + online_store=HybridOnlineStoreConfig( + online_stores=[ + HybridOnlineStoreConfig.OnlineStoresWithConfig( + type="redis", + conf={"redis_type": "redis", "connection_string": "localhost:6379"}, + ), + HybridOnlineStoreConfig.OnlineStoresWithConfig( + type="sqlite", + conf={"path": "/tmp/feast_hybrid_test.db"}, + ), + ] + ), + offline_store=None, + ) + + +@pytest.mark.usefixtures("sample_entity", "sample_feature_view", "sample_repo_config") +def test_hybrid_online_store_write_and_read(sample_repo_config, sample_feature_view): + with ( + patch( + "feast.infra.online_stores.redis.RedisOnlineStore.online_write_batch" + ) as mock_write, + patch( + "feast.infra.online_stores.redis.RedisOnlineStore.online_read" + ) as mock_read, 
+ ): + mock_write.return_value = None + mock_read.return_value = [(None, {"feature1": Value(int64_val=100)})] + store = HybridOnlineStore() + entity_key = EntityKey( + join_keys=["id"], + entity_values=[Value(int64_val=1)], + ) + now = datetime.utcnow() + odata = [(entity_key, {"feature1": Value(int64_val=100)}, now, None)] + # Write to the online store (mocked) + store.online_write_batch( + sample_repo_config, sample_feature_view, odata, progress=None + ) + # Read back (mocked) + result = store.online_read( + sample_repo_config, sample_feature_view, [entity_key] + ) + assert result[0][1]["feature1"].int64_val == 100