diff --git a/protos/feast/registry/RegistryServer.proto b/protos/feast/registry/RegistryServer.proto index 3ad64b5b341..6685bc0baa1 100644 --- a/protos/feast/registry/RegistryServer.proto +++ b/protos/feast/registry/RegistryServer.proto @@ -32,9 +32,13 @@ service RegistryServer{ // FeatureView RPCs rpc ApplyFeatureView (ApplyFeatureViewRequest) returns (google.protobuf.Empty) {} + rpc DeleteFeatureView (DeleteFeatureViewRequest) returns (google.protobuf.Empty) {} + rpc GetAnyFeatureView (GetAnyFeatureViewRequest) returns (GetAnyFeatureViewResponse) {} + rpc ListAllFeatureViews (ListAllFeatureViewsRequest) returns (ListAllFeatureViewsResponse) {} + + // plain FeatureView RPCs rpc GetFeatureView (GetFeatureViewRequest) returns (feast.core.FeatureView) {} rpc ListFeatureViews (ListFeatureViewsRequest) returns (ListFeatureViewsResponse) {} - rpc DeleteFeatureView (DeleteFeatureViewRequest) returns (google.protobuf.Empty) {} // StreamFeatureView RPCs rpc GetStreamFeatureView (GetStreamFeatureViewRequest) returns (feast.core.StreamFeatureView) {} @@ -208,6 +212,35 @@ message DeleteFeatureViewRequest { bool commit = 3; } +message AnyFeatureView { + oneof any_feature_view { + feast.core.FeatureView feature_view = 1; + feast.core.OnDemandFeatureView on_demand_feature_view = 2; + feast.core.StreamFeatureView stream_feature_view = 3; + } +} + +message GetAnyFeatureViewRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message GetAnyFeatureViewResponse { + AnyFeatureView any_feature_view = 1; +} + +message ListAllFeatureViewsRequest { + string project = 1; + bool allow_cache = 2; + map tags = 3; +} + +message ListAllFeatureViewsResponse { + repeated AnyFeatureView feature_views = 1; +} + + // StreamFeatureView message GetStreamFeatureViewRequest { diff --git a/sdk/python/feast/cli_utils.py b/sdk/python/feast/cli_utils.py index 264a633c315..4152eb219b2 100644 --- a/sdk/python/feast/cli_utils.py +++ b/sdk/python/feast/cli_utils.py @@ -175,7 +175,7 @@ def handle_fv_verbose_permissions_command( tags=tags_filter # type: ignore[assignment] ) for fv in feature_views: - if p.match_resource(fv): + if p.match_resource(fv): # type: ignore[arg-type] feature_views_names.add(fv.name) if len(feature_views_names) > 0: Node( @@ -207,8 +207,7 @@ def handle_not_verbose_permissions_command( def fetch_all_feast_objects(store: FeatureStore) -> list[FeastObject]: objects: list[FeastObject] = [] objects.extend(store.list_entities()) - objects.extend(store.list_all_feature_views()) - objects.extend(store.list_batch_feature_views()) + objects.extend(store.list_all_feature_views()) # type: ignore[arg-type] objects.extend(store.list_feature_services()) objects.extend(store.list_data_sources()) objects.extend(store.list_validation_references()) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 27b6eade5b1..4f96cfb0fcb 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import itertools -import logging import os import warnings from datetime import datetime, timedelta @@ -247,9 +246,26 @@ def list_feature_services( """ return self._registry.list_feature_services(self.project, tags=tags) + def _list_all_feature_views( + self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None + ) -> List[BaseFeatureView]: + feature_views = [] + for fv in self.registry.list_all_feature_views( + self.project, allow_cache=allow_cache, tags=tags + ): + if ( + isinstance(fv, FeatureView) + and fv.entities + and fv.entities[0] == DUMMY_ENTITY_NAME + ): + fv.entities = [] + fv.entity_columns = [] + feature_views.append(fv) + return feature_views + def list_all_feature_views( self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None - ) -> List[Union[FeatureView, StreamFeatureView, OnDemandFeatureView]]: + ) -> List[BaseFeatureView]: """ Retrieves the list of feature views from the registry. @@ -274,10 +290,6 @@ def list_feature_views( Returns: A list of feature views. """ - logging.warning( - "list_feature_views will make breaking changes. Please use list_batch_feature_views instead. " - "list_feature_views will behave like list_all_feature_views in the future." - ) return utils._list_feature_views( self._registry, self.project, allow_cache, tags=tags ) @@ -297,44 +309,6 @@ def list_batch_feature_views( """ return self._list_batch_feature_views(allow_cache=allow_cache, tags=tags) - def _list_all_feature_views( - self, - allow_cache: bool = False, - tags: Optional[dict[str, str]] = None, - ) -> List[Union[FeatureView, StreamFeatureView, OnDemandFeatureView]]: - all_feature_views = ( - utils._list_feature_views( - self._registry, self.project, allow_cache, tags=tags - ) - + self._list_stream_feature_views(allow_cache, tags=tags) - + self.list_on_demand_feature_views(allow_cache, tags=tags) - ) - return all_feature_views - - def _list_feature_views( - self, - allow_cache: bool = False, - hide_dummy_entity: bool = True, - tags: Optional[dict[str, str]] = None, - ) -> List[FeatureView]: - logging.warning( - "_list_feature_views will make breaking changes. Please use _list_batch_feature_views instead. " - "_list_feature_views will behave like _list_all_feature_views in the future." - ) - feature_views = [] - for fv in self._registry.list_feature_views( - self.project, allow_cache=allow_cache, tags=tags - ): - if ( - hide_dummy_entity - and fv.entities - and fv.entities[0] == DUMMY_ENTITY_NAME - ): - fv.entities = [] - fv.entity_columns = [] - feature_views.append(fv) - return feature_views - def _list_batch_feature_views( self, allow_cache: bool = False, diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index f5040d97529..f2374edf1b2 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -391,6 +391,44 @@ def list_feature_views( """ raise NotImplementedError + @abstractmethod + def get_any_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> BaseFeatureView: + """ + Retrieves a feature view of any type. + + Args: + name: Name of feature view + project: Feast project that this feature view belongs to + allow_cache: Allow returning feature view from the cached registry + + Returns: + Returns either the specified feature view, or raises an exception if + none is found + """ + raise NotImplementedError + + @abstractmethod + def list_all_feature_views( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[BaseFeatureView]: + """ + Retrieve a list of feature views of all types from the registry + + Args: + allow_cache: Allow returning feature views from the cached registry + project: Filter feature views based on project name + tags: Filter by tags + + Returns: + List of feature views + """ + raise NotImplementedError + @abstractmethod def apply_materialization( self, diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index c04a62552bf..8f47fab0779 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -7,6 +7,7 @@ from threading import Lock from typing import List, Optional +from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource from feast.entity import Entity from feast.feature_service import FeatureService @@ -102,6 +103,39 @@ def list_entities( ) return self._list_entities(project, tags) + @abstractmethod + def _get_any_feature_view(self, name: str, project: str) -> BaseFeatureView: + pass + + def get_any_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> BaseFeatureView: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.get_any_feature_view( + self.cached_registry_proto, name, project + ) + return self._get_any_feature_view(name, project) + + @abstractmethod + def _list_all_feature_views( + self, project: str, tags: Optional[dict[str, str]] + ) -> List[BaseFeatureView]: + pass + + def list_all_feature_views( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[BaseFeatureView]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_all_feature_views( + self.cached_registry_proto, project, tags + ) + return self._list_all_feature_views(project, tags) + @abstractmethod def _get_feature_view(self, name: str, project: str) -> FeatureView: pass diff --git a/sdk/python/feast/infra/registry/proto_registry_utils.py b/sdk/python/feast/infra/registry/proto_registry_utils.py index b0413fd77ef..fc5c3f6671e 100644 --- a/sdk/python/feast/infra/registry/proto_registry_utils.py +++ b/sdk/python/feast/infra/registry/proto_registry_utils.py @@ -2,6 +2,7 @@ from typing import List, Optional from feast import utils +from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource from feast.entity import Entity from feast.errors import ( @@ -93,6 +94,33 @@ def get_feature_service( raise FeatureServiceNotFoundException(name, project=project) +def get_any_feature_view( + registry_proto: RegistryProto, name: str, project: str +) -> BaseFeatureView: + for feature_view_proto in registry_proto.feature_views: + if ( + feature_view_proto.spec.name == name + and feature_view_proto.spec.project == project + ): + return FeatureView.from_proto(feature_view_proto) + + for feature_view_proto in registry_proto.stream_feature_views: + if ( + feature_view_proto.spec.name == name + and feature_view_proto.spec.project == project + ): + return StreamFeatureView.from_proto(feature_view_proto) + + for on_demand_feature_view in registry_proto.on_demand_feature_views: + if ( + on_demand_feature_view.spec.project == project + and on_demand_feature_view.spec.name == name + ): + return OnDemandFeatureView.from_proto(on_demand_feature_view) + + raise FeatureViewNotFoundException(name, project) + + def get_feature_view( registry_proto: RegistryProto, name: str, project: str ) -> FeatureView: @@ -179,6 +207,17 @@ def list_feature_services( return feature_services +@registry_proto_cache_with_tags +def list_all_feature_views( + registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] +) -> List[BaseFeatureView]: + return ( + list_feature_views(registry_proto, project, tags) + + list_stream_feature_views(registry_proto, project, tags) + + list_on_demand_feature_views(registry_proto, project, tags) + ) + + @registry_proto_cache_with_tags def list_feature_views( registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index 634d6fa7ac4..bf5dfbe24fe 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -585,7 +585,26 @@ def apply_materialization( self.commit() return - raise FeatureViewNotFoundException(feature_view.name, project) + def list_all_feature_views( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[BaseFeatureView]: + registry_proto = self._get_registry_proto( + project=project, allow_cache=allow_cache + ) + return proto_registry_utils.list_all_feature_views( + registry_proto, project, tags + ) + + def get_any_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> BaseFeatureView: + registry_proto = self._get_registry_proto( + project=project, allow_cache=allow_cache + ) + return proto_registry_utils.get_any_feature_view(registry_proto, name, project) def list_feature_views( self, diff --git a/sdk/python/feast/infra/registry/remote.py b/sdk/python/feast/infra/registry/remote.py index ba25ef7dbe6..cdb45f03638 100644 --- a/sdk/python/feast/infra/registry/remote.py +++ b/sdk/python/feast/infra/registry/remote.py @@ -30,6 +30,24 @@ from feast.stream_feature_view import StreamFeatureView +def extract_base_feature_view( + any_feature_view: RegistryServer_pb2.AnyFeatureView, +) -> BaseFeatureView: + feature_view_type = any_feature_view.WhichOneof("any_feature_view") + if feature_view_type == "feature_view": + feature_view = FeatureView.from_proto(any_feature_view.feature_view) + elif feature_view_type == "on_demand_feature_view": + feature_view = OnDemandFeatureView.from_proto( + any_feature_view.on_demand_feature_view + ) + elif feature_view_type == "stream_feature_view": + feature_view = StreamFeatureView.from_proto( + any_feature_view.stream_feature_view + ) + + return feature_view + + class RemoteRegistryConfig(RegistryConfig): registry_type: StrictStr = "remote" """ str: Provider name or a class name that implements Registry.""" @@ -249,6 +267,37 @@ def list_on_demand_feature_views( for on_demand_feature_view in response.on_demand_feature_views ] + def get_any_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> BaseFeatureView: + request = RegistryServer_pb2.GetAnyFeatureViewRequest( + name=name, project=project, allow_cache=allow_cache + ) + + response: RegistryServer_pb2.GetAnyFeatureViewResponse = ( + self.stub.GetAnyFeatureView(request) + ) + any_feature_view = response.any_feature_view + return extract_base_feature_view(any_feature_view) + + def list_all_feature_views( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[BaseFeatureView]: + request = RegistryServer_pb2.ListAllFeatureViewsRequest( + project=project, allow_cache=allow_cache, tags=tags + ) + + response: RegistryServer_pb2.ListAllFeatureViewsResponse = ( + self.stub.ListAllFeatureViews(request) + ) + return [ + extract_base_feature_view(any_feature_view) + for any_feature_view in response.feature_views + ] + def get_feature_view( self, name: str, project: str, allow_cache: bool = False ) -> FeatureView: diff --git a/sdk/python/feast/infra/registry/snowflake.py b/sdk/python/feast/infra/registry/snowflake.py index accfa42e12d..f9dd37e516f 100644 --- a/sdk/python/feast/infra/registry/snowflake.py +++ b/sdk/python/feast/infra/registry/snowflake.py @@ -5,7 +5,7 @@ from datetime import datetime, timedelta, timezone from enum import Enum from threading import Lock -from typing import Any, Callable, List, Literal, Optional, Union +from typing import Any, Callable, List, Literal, Optional, Union, cast from pydantic import ConfigDict, Field, StrictStr @@ -575,6 +575,76 @@ def get_feature_view( FeatureViewNotFoundException, ) + def get_any_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> BaseFeatureView: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.get_any_feature_view( + self.cached_registry_proto, name, project + ) + fv = self._get_object( + "FEATURE_VIEWS", + name, + project, + FeatureViewProto, + FeatureView, + "FEATURE_VIEW_NAME", + "FEATURE_VIEW_PROTO", + None, + ) + + if not fv: + fv = self._get_object( + "STREAM_FEATURE_VIEWS", + name, + project, + StreamFeatureViewProto, + StreamFeatureView, + "STREAM_FEATURE_VIEW_NAME", + "STREAM_FEATURE_VIEW_PROTO", + None, + ) + if not fv: + fv = self._get_object( + "ON_DEMAND_FEATURE_VIEWS", + name, + project, + OnDemandFeatureViewProto, + OnDemandFeatureView, + "ON_DEMAND_FEATURE_VIEW_NAME", + "ON_DEMAND_FEATURE_VIEW_PROTO", + FeatureViewNotFoundException, + ) + return fv + + def list_all_feature_views( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[BaseFeatureView]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_all_feature_views( + self.cached_registry_proto, project, tags + ) + + return ( + cast( + list[BaseFeatureView], + self.list_feature_views(project, allow_cache, tags), + ) + + cast( + list[BaseFeatureView], + self.list_stream_feature_views(project, allow_cache, tags), + ) + + cast( + list[BaseFeatureView], + self.list_on_demand_feature_views(project, allow_cache, tags), + ) + ) + def get_infra(self, project: str, allow_cache: bool = False) -> Infra: infra_object = self._get_object( "MANAGED_INFRA", diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 2b4a58266c7..9ce3fbe5dda 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -4,7 +4,7 @@ from datetime import datetime, timezone from enum import Enum from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Union, cast from pydantic import StrictInt, StrictStr from sqlalchemy import ( # type: ignore @@ -334,6 +334,61 @@ def _get_entity(self, name: str, project: str) -> Entity: not_found_exception=EntityNotFoundException, ) + def _get_any_feature_view(self, name: str, project: str) -> BaseFeatureView: + fv = self._get_object( + table=feature_views, + name=name, + project=project, + proto_class=FeatureViewProto, + python_class=FeatureView, + id_field_name="feature_view_name", + proto_field_name="feature_view_proto", + not_found_exception=None, + ) + + if not fv: + fv = self._get_object( + table=on_demand_feature_views, + name=name, + project=project, + proto_class=OnDemandFeatureViewProto, + python_class=OnDemandFeatureView, + id_field_name="feature_view_name", + proto_field_name="feature_view_proto", + not_found_exception=None, + ) + + if not fv: + fv = self._get_object( + table=stream_feature_views, + name=name, + project=project, + proto_class=StreamFeatureViewProto, + python_class=StreamFeatureView, + id_field_name="feature_view_name", + proto_field_name="feature_view_proto", + not_found_exception=FeatureViewNotFoundException, + ) + return fv + + def _list_all_feature_views( + self, project: str, tags: Optional[dict[str, str]] + ) -> List[BaseFeatureView]: + return ( + cast( + list[BaseFeatureView], + self._list_feature_views(project=project, tags=tags), + ) + + cast( + list[BaseFeatureView], + self._list_stream_feature_views(project=project, tags=tags), + ) + + cast( + list[BaseFeatureView], + self._list_on_demand_feature_views(project=project, tags=tags), + ) + ) + def _get_feature_view(self, name: str, project: str) -> FeatureView: return self._get_object( table=feature_views, diff --git a/sdk/python/feast/registry_server.py b/sdk/python/feast/registry_server.py index 2661f258821..c2f4a688d3b 100644 --- a/sdk/python/feast/registry_server.py +++ b/sdk/python/feast/registry_server.py @@ -8,6 +8,7 @@ from grpc_reflection.v1alpha import reflection from feast import FeatureService, FeatureStore +from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource from feast.entity import Entity from feast.errors import FeatureViewNotFoundException @@ -38,6 +39,28 @@ from feast.stream_feature_view import StreamFeatureView +def _build_any_feature_view_proto(feature_view: BaseFeatureView): + if isinstance(feature_view, StreamFeatureView): + arg_name = "stream_feature_view" + feature_view_proto = feature_view.to_proto() + elif isinstance(feature_view, FeatureView): + arg_name = "feature_view" + feature_view_proto = feature_view.to_proto() + elif isinstance(feature_view, OnDemandFeatureView): + arg_name = "on_demand_feature_view" + feature_view_proto = feature_view.to_proto() + + return RegistryServer_pb2.AnyFeatureView( + feature_view=feature_view_proto if arg_name == "feature_view" else None, + stream_feature_view=feature_view_proto + if arg_name == "stream_feature_view" + else None, + on_demand_feature_view=feature_view_proto + if arg_name == "on_demand_feature_view" + else None, + ) + + class RegistryServer(RegistryServer_pb2_grpc.RegistryServerServicer): def __init__(self, registry: BaseRegistry) -> None: super().__init__() @@ -178,6 +201,27 @@ def GetFeatureView( actions=[AuthzedAction.DESCRIBE], ).to_proto() + def GetAnyFeatureView( + self, request: RegistryServer_pb2.GetAnyFeatureViewRequest, context + ): + feature_view = assert_permissions( + cast( + FeastObject, + self.proxied_registry.get_any_feature_view( + name=request.name, + project=request.project, + allow_cache=request.allow_cache, + ), + ), + actions=[AuthzedAction.DESCRIBE], + ) + + return RegistryServer_pb2.GetAnyFeatureViewResponse( + any_feature_view=_build_any_feature_view_proto( + cast(BaseFeatureView, feature_view) + ) + ) + def ApplyFeatureView( self, request: RegistryServer_pb2.ApplyFeatureViewRequest, context ): @@ -228,6 +272,26 @@ def ListFeatureViews( ] ) + def ListAllFeatureViews( + self, request: RegistryServer_pb2.ListAllFeatureViewsRequest, context + ): + return RegistryServer_pb2.ListAllFeatureViewsResponse( + feature_views=[ + _build_any_feature_view_proto(cast(BaseFeatureView, feature_view)) + for feature_view in permitted_resources( + resources=cast( + list[FeastObject], + self.proxied_registry.list_all_feature_views( + project=request.project, + allow_cache=request.allow_cache, + tags=dict(request.tags), + ), + ), + actions=AuthzedAction.DESCRIBE, + ) + ] + ) + def DeleteFeatureView( self, request: RegistryServer_pb2.DeleteFeatureViewRequest, context ): diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 5862cd46304..992869557af 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -1,6 +1,5 @@ import copy import itertools -import logging import os import typing import warnings @@ -746,10 +745,6 @@ def _list_feature_views( ) -> List["FeatureView"]: from feast.feature_view import DUMMY_ENTITY_NAME - logging.warning( - "_list_feature_views will make breaking changes. Please use _list_batch_feature_views instead. " - "_list_feature_views will behave like _list_all_feature_views in the future." - ) feature_views = [] for fv in registry.list_feature_views(project, allow_cache=allow_cache, tags=tags): if hide_dummy_entity and fv.entities and fv.entities[0] == DUMMY_ENTITY_NAME: diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 2ffe869ef50..308201590df 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -163,7 +163,6 @@ def test_write_to_online_store_event_check(environment): fs.apply([fv1, e]) assert len(fs.list_all_feature_views(tags=TAGS)) == 1 assert len(fs.list_feature_views(tags=TAGS)) == 1 - assert len(fs.list_batch_feature_views(tags=TAGS)) == 1 # data to ingest into Online Store (recent) data = { @@ -421,7 +420,7 @@ def setup_feature_store_universal_feature_views( feature_views = construct_universal_feature_views(data_sources) fs.apply([driver(), feature_views.driver, feature_views.global_fv]) - assert len(fs.list_batch_feature_views(TAGS)) == 2 + assert len(fs.list_all_feature_views(TAGS)) == 2 data = { "driver_id": [1, 2], @@ -518,7 +517,7 @@ def test_online_list_retrieval(environment, universal_data_sources): environment, universal_data_sources ) - assert len(fs.list_batch_feature_views(tags=TAGS)) == 2 + assert len(fs.list_all_feature_views(tags=TAGS)) == 2 @pytest.mark.integration diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index aee93b47052..6e4a208d4b1 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -36,6 +36,7 @@ from feast.field import Field from feast.infra.infra_object import Infra from feast.infra.online_stores.sqlite import SqliteTable +from feast.infra.registry.base_registry import BaseRegistry from feast.infra.registry.registry import Registry from feast.infra.registry.remote import RemoteRegistry, RemoteRegistryConfig from feast.infra.registry.sql import SqlRegistry, SqlRegistryConfig @@ -415,7 +416,7 @@ def assert_project(project_name, test_registry, allow_cache=False): "test_registry", all_fixtures, ) -def test_apply_feature_view_success(test_registry): +def test_apply_feature_view_success(test_registry: BaseRegistry): # Create Feature Views batch_source = FileSource( file_format=ParquetFormat(), @@ -464,6 +465,8 @@ def test_apply_feature_view_success(test_registry): ) feature_view = test_registry.get_feature_view("my_feature_view_1", project) + any_feature_view = test_registry.get_any_feature_view("my_feature_view_1", project) + assert ( feature_view.name == "my_feature_view_1" and feature_view.features[0].name == "fs1_my_feature_1" @@ -475,6 +478,7 @@ def test_apply_feature_view_success(test_registry): and feature_view.features[3].name == "fs1_my_feature_4" and feature_view.features[3].dtype == Array(Bytes) and feature_view.entities[0] == "fs1_my_entity_1" + and feature_view == any_feature_view ) assert feature_view.ttl == timedelta(minutes=5) @@ -502,7 +506,7 @@ def test_apply_feature_view_success(test_registry): "test_registry", sql_fixtures, ) -def test_apply_on_demand_feature_view_success(test_registry): +def test_apply_on_demand_feature_view_success(test_registry: BaseRegistry): # Create Feature Views driver_stats = FileSource( name="driver_stats_source", @@ -545,6 +549,7 @@ def location_features_from_push(inputs: pd.DataFrame) -> pd.DataFrame: test_registry.get_user_metadata(project, location_features_from_push) # Register Feature View + test_registry.apply_feature_view(driver_daily_features_view, project) test_registry.apply_feature_view(location_features_from_push, project) assert not test_registry.get_user_metadata(project, location_features_from_push) @@ -563,13 +568,21 @@ def location_features_from_push(inputs: pd.DataFrame) -> pd.DataFrame: and feature_views[0].features[0].dtype == String ) + all_feature_views = test_registry.list_all_feature_views(project) + + assert len(all_feature_views) == 2 + feature_view = test_registry.get_on_demand_feature_view( "location_features_from_push", project ) + any_feature_view = test_registry.get_any_feature_view( + "location_features_from_push", project + ) assert ( feature_view.name == "location_features_from_push" and feature_view.features[0].name == "first_char" and feature_view.features[0].dtype == String + and feature_view == any_feature_view ) test_registry.delete_feature_view("location_features_from_push", project) @@ -1110,7 +1123,7 @@ def test_registry_cache_thread_async(test_registry): "test_registry", all_fixtures, ) -def test_apply_stream_feature_view_success(test_registry): +def test_apply_stream_feature_view_success(test_registry: BaseRegistry): # Create Feature Views def simple_udf(x: int): return x + 3 @@ -1163,8 +1176,11 @@ def simple_udf(x: int): project, tags=sfv.tags ) + all_feature_views = test_registry.list_all_feature_views(project, tags=sfv.tags) + # List Feature Views assert len(stream_feature_views) == 1 + assert len(all_feature_views) == 1 assert stream_feature_views[0] == sfv test_registry.delete_feature_view("test kafka stream feature view", project)