diff --git a/docs/reference/feature-servers/registry-server.md b/docs/reference/feature-servers/registry-server.md index ddda5393863..8051a379ebd 100644 --- a/docs/reference/feature-servers/registry-server.md +++ b/docs/reference/feature-servers/registry-server.md @@ -32,6 +32,15 @@ Most endpoints support these common query parameters: - `allow_cache` (optional, default: `true`): Whether to allow cached data - `tags` (optional): Filter results by tags in key=value format +#### Relationship Parameters +- `include_relationships` (optional, default: `false`): Include all relationships (both direct and indirect) for the object(s) in the response + +#### Pagination Parameters (List Endpoints Only) +- `page` (optional): Page number (starts from 1) +- `limit` (optional, max: 100): Number of items per page +- `sort_by` (optional): Field to sort by +- `sort_order` (optional): Sort order: "asc" or "desc" (default: "asc") + ### Entities #### List Entities @@ -39,10 +48,25 @@ Most endpoints support these common query parameters: - **Description**: Retrieve all entities in a project - **Parameters**: - `project` (required): Project name -- **Example**: + - `include_relationships` (optional): Include relationships for each entity + - `allow_cache` (optional): Whether to allow cached data + - `page` (optional): Page number for pagination + - `limit` (optional): Number of items per page + - `sort_by` (optional): Field to sort by + - `sort_order` (optional): Sort order ("asc" or "desc") +- **Examples**: ```bash + # Basic list curl -H "Authorization: Bearer " \ "http://localhost:6572/api/v1/entities?project=my_project" + + # With pagination + curl -H "Authorization: Bearer " \ + "http://localhost:6572/api/v1/entities?project=my_project&page=1&limit=10&sort_by=name" + + # With relationships + curl -H "Authorization: Bearer " \ + "http://localhost:6572/api/v1/entities?project=my_project&include_relationships=true" ``` #### Get Entity @@ -51,11 +75,17 @@ Most endpoints support these 
common query parameters: - **Parameters**: - `name` (path): Entity name - `project` (required): Project name + - `include_relationships` (optional): Include relationships for this entity - `allow_cache` (optional): Whether to allow cached data -- **Example**: +- **Examples**: ```bash + # Basic get curl -H "Authorization: Bearer " \ "http://localhost:6572/api/v1/entities/user_id?project=my_project" + + # With relationships + curl -H "Authorization: Bearer " \ + "http://localhost:6572/api/v1/entities/user_id?project=my_project&include_relationships=true" ``` ### Data Sources @@ -65,12 +95,22 @@ Most endpoints support these common query parameters: - **Description**: Retrieve all data sources in a project - **Parameters**: - `project` (required): Project name + - `include_relationships` (optional): Include relationships for each data source - `allow_cache` (optional): Whether to allow cached data - `tags` (optional): Filter by tags -- **Example**: + - `page` (optional): Page number for pagination + - `limit` (optional): Number of items per page + - `sort_by` (optional): Field to sort by + - `sort_order` (optional): Sort order ("asc" or "desc") +- **Examples**: ```bash + # Basic list curl -H "Authorization: Bearer " \ "http://localhost:6572/api/v1/data_sources?project=my_project" + + # With pagination and relationships + curl -H "Authorization: Bearer " \ + "http://localhost:6572/api/v1/data_sources?project=my_project&include_relationships=true&page=1&limit=10" ``` #### Get Data Source @@ -79,11 +119,17 @@ Most endpoints support these common query parameters: - **Parameters**: - `name` (path): Data source name - `project` (required): Project name + - `include_relationships` (optional): Include relationships for this data source - `allow_cache` (optional): Whether to allow cached data -- **Example**: +- **Examples**: ```bash + # Basic get curl -H "Authorization: Bearer " \ "http://localhost:6572/api/v1/data_sources/user_data?project=my_project" + + # With relationships 
+ curl -H "Authorization: Bearer " \ + "http://localhost:6572/api/v1/data_sources/user_data?project=my_project&include_relationships=true" ``` ### Feature Views @@ -93,12 +139,22 @@ Most endpoints support these common query parameters: - **Description**: Retrieve all feature views in a project - **Parameters**: - `project` (required): Project name + - `include_relationships` (optional): Include relationships for each feature view - `allow_cache` (optional): Whether to allow cached data - `tags` (optional): Filter by tags -- **Example**: + - `page` (optional): Page number for pagination + - `limit` (optional): Number of items per page + - `sort_by` (optional): Field to sort by + - `sort_order` (optional): Sort order ("asc" or "desc") +- **Examples**: ```bash + # Basic list curl -H "Authorization: Bearer " \ "http://localhost:6572/api/v1/feature_views?project=my_project" + + # With pagination and relationships + curl -H "Authorization: Bearer " \ + "http://localhost:6572/api/v1/feature_views?project=my_project&include_relationships=true&page=1&limit=5&sort_by=name" ``` #### Get Feature View @@ -107,11 +163,17 @@ Most endpoints support these common query parameters: - **Parameters**: - `name` (path): Feature view name - `project` (required): Project name + - `include_relationships` (optional): Include relationships for this feature view - `allow_cache` (optional): Whether to allow cached data -- **Example**: +- **Examples**: ```bash + # Basic get curl -H "Authorization: Bearer " \ "http://localhost:6572/api/v1/feature_views/user_features?project=my_project" + + # With relationships + curl -H "Authorization: Bearer " \ + "http://localhost:6572/api/v1/feature_views/user_features?project=my_project&include_relationships=true" ``` ### Feature Services @@ -121,12 +183,22 @@ Most endpoints support these common query parameters: - **Description**: Retrieve all feature services in a project - **Parameters**: - `project` (required): Project name + - `include_relationships` 
(optional): Include relationships for each feature service - `allow_cache` (optional): Whether to allow cached data - `tags` (optional): Filter by tags -- **Example**: + - `page` (optional): Page number for pagination + - `limit` (optional): Number of items per page + - `sort_by` (optional): Field to sort by + - `sort_order` (optional): Sort order ("asc" or "desc") +- **Examples**: ```bash + # Basic list curl -H "Authorization: Bearer " \ "http://localhost:6572/api/v1/feature_services?project=my_project" + + # With pagination and relationships + curl -H "Authorization: Bearer " \ + "http://localhost:6572/api/v1/feature_services?project=my_project&include_relationships=true&page=1&limit=10" ``` #### Get Feature Service @@ -135,11 +207,17 @@ Most endpoints support these common query parameters: - **Parameters**: - `name` (path): Feature service name - `project` (required): Project name + - `include_relationships` (optional): Include relationships for this feature service - `allow_cache` (optional): Whether to allow cached data -- **Example**: +- **Examples**: ```bash + # Basic get curl -H "Authorization: Bearer " \ "http://localhost:6572/api/v1/feature_services/recommendation_service?project=my_project" + + # With relationships + curl -H "Authorization: Bearer " \ + "http://localhost:6572/api/v1/feature_services/recommendation_service?project=my_project&include_relationships=true" ``` ### Lineage and Relationships @@ -192,11 +270,21 @@ Most endpoints support these common query parameters: - **Description**: Retrieve all permissions in a project - **Parameters**: - `project` (required): Project name + - `include_relationships` (optional): Include relationships for each permission - `allow_cache` (optional): Whether to allow cached data -- **Example**: + - `page` (optional): Page number for pagination + - `limit` (optional): Number of items per page + - `sort_by` (optional): Field to sort by + - `sort_order` (optional): Sort order ("asc" or "desc") +- **Examples**: 
```bash + # Basic list curl -H "Authorization: Bearer " \ "http://localhost:6572/api/v1/permissions?project=my_project" + + # With pagination and relationships + curl -H "Authorization: Bearer " \ + "http://localhost:6572/api/v1/permissions?project=my_project&include_relationships=true&page=1&limit=10" ``` #### Get Permission @@ -205,11 +293,17 @@ Most endpoints support these common query parameters: - **Parameters**: - `name` (path): Permission name - `project` (required): Project name + - `include_relationships` (optional): Include relationships for this permission - `allow_cache` (optional): Whether to allow cached data -- **Example**: +- **Examples**: ```bash + # Basic get curl -H "Authorization: Bearer " \ "http://localhost:6572/api/v1/permissions/read_features?project=my_project" + + # With relationships + curl -H "Authorization: Bearer " \ + "http://localhost:6572/api/v1/permissions/read_features?project=my_project&include_relationships=true" ``` ### Projects @@ -219,10 +313,19 @@ Most endpoints support these common query parameters: - **Description**: Retrieve all projects - **Parameters**: - `allow_cache` (optional): Whether to allow cached data -- **Example**: + - `page` (optional): Page number for pagination + - `limit` (optional): Number of items per page + - `sort_by` (optional): Field to sort by + - `sort_order` (optional): Sort order ("asc" or "desc") +- **Examples**: ```bash + # Basic list curl -H "Authorization: Bearer " \ "http://localhost:6572/api/v1/projects" + + # With pagination + curl -H "Authorization: Bearer " \ + "http://localhost:6572/api/v1/projects?page=1&limit=10&sort_by=name" ``` #### Get Project @@ -244,12 +347,22 @@ Most endpoints support these common query parameters: - **Description**: Retrieve all saved datasets in a project - **Parameters**: - `project` (required): Project name + - `include_relationships` (optional): Include relationships for each saved dataset - `allow_cache` (optional): Whether to allow cached data - `tags` 
(optional): Filter by tags -- **Example**: + - `page` (optional): Page number for pagination + - `limit` (optional): Number of items per page + - `sort_by` (optional): Field to sort by + - `sort_order` (optional): Sort order ("asc" or "desc") +- **Examples**: ```bash + # Basic list curl -H "Authorization: Bearer " \ "http://localhost:6572/api/v1/saved_datasets?project=my_project" + + # With pagination and relationships + curl -H "Authorization: Bearer " \ + "http://localhost:6572/api/v1/saved_datasets?project=my_project&include_relationships=true&page=1&limit=10" ``` #### Get Saved Dataset @@ -258,11 +371,17 @@ Most endpoints support these common query parameters: - **Parameters**: - `name` (path): Saved dataset name - `project` (required): Project name + - `include_relationships` (optional): Include relationships for this saved dataset - `allow_cache` (optional): Whether to allow cached data -- **Example**: +- **Examples**: ```bash + # Basic get curl -H "Authorization: Bearer " \ "http://localhost:6572/api/v1/saved_datasets/training_data?project=my_project" + + # With relationships + curl -H "Authorization: Bearer " \ + "http://localhost:6572/api/v1/saved_datasets/training_data?project=my_project&include_relationships=true" ``` ### Response Formats @@ -275,6 +394,140 @@ All endpoints return JSON responses with the following general structure: - **Not Found (404)**: Requested resource does not exist - **Internal Server Error (500)**: Server-side error +#### Enhanced Response Formats + +The REST API now supports enhanced response formats for relationships and pagination: + +**Single Object Endpoints (GET `/resource/{name}`):** + +Without relationships: +```json +{ + "entity": { ... } +} +``` + +With relationships: +```json +{ + "entity": { ... 
}, + "relationships": [ + { + "source": { + "type": "entity", + "name": "customer_id" + }, + "target": { + "type": "featureView", + "name": "customer_features" + } + } + ] +} +``` + +**List Endpoints (GET `/resources`):** + +Without relationships or pagination: +```json +{ + "entities": [ + { "name": "customer_id", ... }, + { "name": "driver_id", ... } + ] +} +``` + +With pagination only: +```json +{ + "entities": [ + { "name": "customer_id", ... }, + { "name": "driver_id", ... } + ], + "pagination": { + "page": 1, + "limit": 10, + "total": 25, + "totalPages": 3 + } +} +``` + +With relationships and pagination: +```json +{ + "entities": [ + { "name": "customer_id", ... }, + { "name": "driver_id", ... } + ], + "pagination": { + "page": 1, + "limit": 10, + "total": 25, + "totalPages": 3 + }, + "relationships": { + "customer_id": [ + { + "source": { + "type": "entity", + "name": "customer_id" + }, + "target": { + "type": "featureView", + "name": "customer_features" + } + } + ], + "driver_id": [ + { + "source": { + "type": "entity", + "name": "driver_id" + }, + "target": { + "type": "featureView", + "name": "driver_features" + } + } + ] + } +} +``` + +### Relationships and Pagination + +The REST API supports relationships on the single-object and list endpoints that accept `include_relationships`, and pagination on list endpoints. + +#### Relationships + +Relationships show how different Feast objects connect to each other, providing insight into dependencies and data lineage. When the `include_relationships` parameter is set to `true`, the API returns both direct and indirect relationships. 
+ +**Supported Object Types:** +- `entity` - Feast entities +- `dataSource` - Data sources +- `featureView` - Feature views (including regular, on-demand, and stream) +- `featureService` - Feature services +- `permission` - Permissions +- `savedDataset` - Saved datasets + +**Common Relationship Patterns:** +- Feature Views → Data Sources (feature views depend on data sources) +- Feature Views → Entities (feature views use entities as join keys) +- Feature Services → Feature Views (feature services consume feature views) +- Entities → Data Sources (entities connect to data sources through feature views) +- Entities → Feature Services (entities connect to feature services through feature views) + +#### Pagination + +All list endpoints support pagination to improve performance and manageability of large datasets: + +- **`page`** - Page number (starts from 1) +- **`limit`** - Number of items per page (maximum 100) +- **`sort_by`** - Field to sort by +- **`sort_order`** - Sort order: "asc" (ascending) or "desc" (descending) + ### Interactive API Documentation When the REST API server is running, you can access interactive documentation at: diff --git a/sdk/python/feast/api/registry/rest/data_sources.py b/sdk/python/feast/api/registry/rest/data_sources.py index 248872fabec..04988889c03 100644 --- a/sdk/python/feast/api/registry/rest/data_sources.py +++ b/sdk/python/feast/api/registry/rest/data_sources.py @@ -6,7 +6,9 @@ from feast.api.registry.rest.rest_utils import ( create_grpc_pagination_params, create_grpc_sorting_params, + get_object_relationships, get_pagination_params, + get_relationships_for_objects, get_sorting_params, grpc_call, parse_tags, @@ -22,6 +24,9 @@ def get_data_source_router(grpc_handler) -> APIRouter: @router.get("/data_sources") def list_data_sources( project: str = Query(...), + include_relationships: bool = Query( + False, description="Include relationships for each data source" + ), allow_cache: bool = Query(default=True), tags: Dict[str, str] = 
Depends(parse_tags), pagination_params: dict = Depends(get_pagination_params), @@ -34,17 +39,29 @@ def list_data_sources( pagination=create_grpc_pagination_params(pagination_params), sorting=create_grpc_sorting_params(sorting_params), ) - response = grpc_call(grpc_handler.ListDataSources, req) - return { - "data_sources": response.get("dataSources", []), + data_sources = response.get("dataSources", []) + + result = { + "data_sources": data_sources, "pagination": response.get("pagination", {}), } + if include_relationships: + relationships = get_relationships_for_objects( + grpc_handler, data_sources, "dataSource", project, allow_cache + ) + result["relationships"] = relationships + + return result + @router.get("/data_sources/{name}") def get_data_source( name: str, project: str = Query(...), + include_relationships: bool = Query( + False, description="Include relationships for this data source" + ), allow_cache: bool = Query(default=True), ): req = RegistryServer_pb2.GetDataSourceRequest( @@ -52,6 +69,16 @@ def get_data_source( project=project, allow_cache=allow_cache, ) - return grpc_call(grpc_handler.GetDataSource, req) + data_source = grpc_call(grpc_handler.GetDataSource, req) + + result = data_source + + if include_relationships: + relationships = get_object_relationships( + grpc_handler, "dataSource", name, project, allow_cache + ) + result["relationships"] = relationships + + return result return router diff --git a/sdk/python/feast/api/registry/rest/entities.py b/sdk/python/feast/api/registry/rest/entities.py index ed9d196d889..216fb3f7535 100644 --- a/sdk/python/feast/api/registry/rest/entities.py +++ b/sdk/python/feast/api/registry/rest/entities.py @@ -5,7 +5,9 @@ from feast.api.registry.rest.rest_utils import ( create_grpc_pagination_params, create_grpc_sorting_params, + get_object_relationships, get_pagination_params, + get_relationships_for_objects, get_sorting_params, grpc_call, ) @@ -21,6 +23,9 @@ def get_entity_router(grpc_handler) -> APIRouter: def 
list_entities( project: str = Query(...), allow_cache: bool = Query(default=True), + include_relationships: bool = Query( + False, description="Include relationships for each entity" + ), pagination_params: dict = Depends(get_pagination_params), sorting_params: dict = Depends(get_sorting_params), ): @@ -30,13 +35,29 @@ def list_entities( pagination=create_grpc_pagination_params(pagination_params), sorting=create_grpc_sorting_params(sorting_params), ) + response = grpc_call(grpc_handler.ListEntities, req) + entities = response.get("entities", []) - return grpc_call(grpc_handler.ListEntities, req) + result = { + "entities": entities, + "pagination": response.get("pagination", {}), + } + + if include_relationships: + relationships = get_relationships_for_objects( + grpc_handler, entities, "entity", project, allow_cache + ) + result["relationships"] = relationships + + return result @router.get("/entities/{name}") def get_entity( name: str, project: str = Query(...), + include_relationships: bool = Query( + False, description="Include relationships for this entity" + ), allow_cache: bool = Query(default=True), ): req = RegistryServer_pb2.GetEntityRequest( @@ -44,6 +65,16 @@ def get_entity( project=project, allow_cache=allow_cache, ) - return grpc_call(grpc_handler.GetEntity, req) + entity = grpc_call(grpc_handler.GetEntity, req) + + result = entity + + if include_relationships: + relationships = get_object_relationships( + grpc_handler, "entity", name, project, allow_cache + ) + result["relationships"] = relationships + + return result return router diff --git a/sdk/python/feast/api/registry/rest/feature_services.py b/sdk/python/feast/api/registry/rest/feature_services.py index ac9b45a071f..641834fbeee 100644 --- a/sdk/python/feast/api/registry/rest/feature_services.py +++ b/sdk/python/feast/api/registry/rest/feature_services.py @@ -5,7 +5,9 @@ from feast.api.registry.rest.rest_utils import ( create_grpc_pagination_params, create_grpc_sorting_params, + 
get_object_relationships, get_pagination_params, + get_relationships_for_objects, get_sorting_params, grpc_call, parse_tags, @@ -19,6 +21,9 @@ def get_feature_service_router(grpc_handler) -> APIRouter: @router.get("/feature_services") def list_feature_services( project: str = Query(...), + include_relationships: bool = Query( + False, description="Include relationships for each feature service" + ), allow_cache: bool = Query(default=True), tags: Dict[str, str] = Depends(parse_tags), pagination_params: dict = Depends(get_pagination_params), @@ -31,12 +36,29 @@ def list_feature_services( pagination=create_grpc_pagination_params(pagination_params), sorting=create_grpc_sorting_params(sorting_params), ) - return grpc_call(grpc_handler.ListFeatureServices, req) + response = grpc_call(grpc_handler.ListFeatureServices, req) + feature_services = response.get("featureServices", []) + + result = { + "featureServices": feature_services, + "pagination": response.get("pagination", {}), + } + + if include_relationships: + relationships = get_relationships_for_objects( + grpc_handler, feature_services, "featureService", project, allow_cache + ) + result["relationships"] = relationships + + return result @router.get("/feature_services/{name}") def get_feature_service( name: str, project: str = Query(...), + include_relationships: bool = Query( + False, description="Include relationships for this feature service" + ), allow_cache: bool = Query(default=True), ): req = RegistryServer_pb2.GetFeatureServiceRequest( @@ -44,6 +66,16 @@ def get_feature_service( project=project, allow_cache=allow_cache, ) - return grpc_call(grpc_handler.GetFeatureService, req) + feature_service = grpc_call(grpc_handler.GetFeatureService, req) + + result = feature_service + + if include_relationships: + relationships = get_object_relationships( + grpc_handler, "featureService", name, project, allow_cache + ) + result["relationships"] = relationships + + return result return router diff --git 
a/sdk/python/feast/api/registry/rest/feature_views.py b/sdk/python/feast/api/registry/rest/feature_views.py index ce5f8ac0f1e..4b8970a355b 100644 --- a/sdk/python/feast/api/registry/rest/feature_views.py +++ b/sdk/python/feast/api/registry/rest/feature_views.py @@ -5,7 +5,9 @@ from feast.api.registry.rest.rest_utils import ( create_grpc_pagination_params, create_grpc_sorting_params, + get_object_relationships, get_pagination_params, + get_relationships_for_objects, get_sorting_params, grpc_call, parse_tags, @@ -13,6 +15,22 @@ from feast.registry_server import RegistryServer_pb2 +def _extract_feature_view_from_any(any_feature_view: dict) -> dict: + """Extract the specific feature view type and data from an AnyFeatureView object. + + Args: + any_feature_view: Dictionary containing the AnyFeatureView data + + Returns: + Dictionary with 'type' and feature view data, or empty dict if no valid type found + """ + for key, value in any_feature_view.items(): + if value: + return {"type": key, **value} + + return {} + + def get_feature_view_router(grpc_handler) -> APIRouter: router = APIRouter() @@ -20,6 +38,9 @@ def get_feature_view_router(grpc_handler) -> APIRouter: def get_any_feature_view( name: str, project: str = Query(...), + include_relationships: bool = Query( + False, description="Include relationships for this feature view" + ), allow_cache: bool = Query(True), ): req = RegistryServer_pb2.GetAnyFeatureViewRequest( @@ -29,18 +50,24 @@ def get_any_feature_view( ) response = grpc_call(grpc_handler.GetAnyFeatureView, req) any_feature_view = response.get("anyFeatureView", {}) - feature_view = ( - any_feature_view.get("featureView") - or any_feature_view.get("onDemandFeatureView") - or any_feature_view.get("streamFeatureView") - or {} - ) - return {"featureView": feature_view} + + result = _extract_feature_view_from_any(any_feature_view) + + if include_relationships: + relationships = get_object_relationships( + grpc_handler, "featureView", name, project, allow_cache + 
) + result["relationships"] = relationships + + return result @router.get("/feature_views") def list_all_feature_views( project: str = Query(...), allow_cache: bool = Query(default=True), + include_relationships: bool = Query( + False, description="Include relationships for each feature view" + ), tags: Dict[str, str] = Depends(parse_tags), pagination_params: dict = Depends(get_pagination_params), sorting_params: dict = Depends(get_sorting_params), @@ -52,6 +79,27 @@ def list_all_feature_views( pagination=create_grpc_pagination_params(pagination_params), sorting=create_grpc_sorting_params(sorting_params), ) - return grpc_call(grpc_handler.ListAllFeatureViews, req) + response = grpc_call(grpc_handler.ListAllFeatureViews, req) + any_feature_views = response.get("featureViews", []) + + # Extract the specific type of feature view from each AnyFeatureView + feature_views = [] + for any_feature_view in any_feature_views: + feature_view = _extract_feature_view_from_any(any_feature_view) + if feature_view: + feature_views.append(feature_view) + + result = { + "featureViews": feature_views, + "pagination": response.get("pagination", {}), + } + + if include_relationships: + relationships = get_relationships_for_objects( + grpc_handler, feature_views, "featureView", project, allow_cache + ) + result["relationships"] = relationships + + return result return router diff --git a/sdk/python/feast/api/registry/rest/permissions.py b/sdk/python/feast/api/registry/rest/permissions.py index b10f4289e68..b7735700833 100644 --- a/sdk/python/feast/api/registry/rest/permissions.py +++ b/sdk/python/feast/api/registry/rest/permissions.py @@ -3,7 +3,9 @@ from feast.api.registry.rest.rest_utils import ( create_grpc_pagination_params, create_grpc_sorting_params, + get_object_relationships, get_pagination_params, + get_relationships_for_objects, get_sorting_params, grpc_call, ) @@ -17,6 +19,9 @@ def get_permission_router(grpc_handler) -> APIRouter: def get_permission( name: str, project: str = 
Query(...), + include_relationships: bool = Query( + False, description="Include relationships for this permission" + ), allow_cache: bool = Query(True), ): req = RegistryServer_pb2.GetPermissionRequest( @@ -24,12 +29,27 @@ def get_permission( project=project, allow_cache=allow_cache, ) - return {"permission": grpc_call(grpc_handler.GetPermission, req)} + permission = grpc_call(grpc_handler.GetPermission, req) + + result = permission + + # Note: permissions may not have relationships in the traditional sense + # but we include the functionality for consistency + if include_relationships: + relationships = get_object_relationships( + grpc_handler, "permission", name, project, allow_cache + ) + result["relationships"] = relationships + + return result @router.get("/permissions") def list_permissions( project: str = Query(...), allow_cache: bool = Query(default=True), + include_relationships: bool = Query( + False, description="Include relationships for each permission" + ), pagination_params: dict = Depends(get_pagination_params), sorting_params: dict = Depends(get_sorting_params), ): @@ -39,6 +59,20 @@ def list_permissions( pagination=create_grpc_pagination_params(pagination_params), sorting=create_grpc_sorting_params(sorting_params), ) - return grpc_call(grpc_handler.ListPermissions, req) + response = grpc_call(grpc_handler.ListPermissions, req) + permissions = response.get("permissions", []) + + result = { + "permissions": permissions, + "pagination": response.get("pagination", {}), + } + + if include_relationships: + relationships = get_relationships_for_objects( + grpc_handler, permissions, "permission", project, allow_cache + ) + result["relationships"] = relationships + + return result return router diff --git a/sdk/python/feast/api/registry/rest/projects.py b/sdk/python/feast/api/registry/rest/projects.py index 41763b13e3e..41e008b8fa9 100644 --- a/sdk/python/feast/api/registry/rest/projects.py +++ b/sdk/python/feast/api/registry/rest/projects.py @@ -22,7 
+22,9 @@ def get_project( name=name, allow_cache=allow_cache, ) - return grpc_call(grpc_handler.GetProject, req) + project = grpc_call(grpc_handler.GetProject, req) + + return project @router.get("/projects") def list_projects( @@ -35,7 +37,12 @@ def list_projects( pagination=create_grpc_pagination_params(pagination_params), sorting=create_grpc_sorting_params(sorting_params), ) + response = grpc_call(grpc_handler.ListProjects, req) + projects = response.get("projects", []) - return grpc_call(grpc_handler.ListProjects, req) + return { + "projects": projects, + "pagination": response.get("pagination", {}), + } return router diff --git a/sdk/python/feast/api/registry/rest/rest_utils.py b/sdk/python/feast/api/registry/rest/rest_utils.py index bddc31c9a98..8e75a70470e 100644 --- a/sdk/python/feast/api/registry/rest/rest_utils.py +++ b/sdk/python/feast/api/registry/rest/rest_utils.py @@ -20,6 +20,86 @@ def grpc_call(handler_fn, request): raise HTTPException(status_code=500, detail="Internal server error") +def get_object_relationships( + grpc_handler, + object_type: str, + object_name: str, + project: str, + allow_cache: bool = True, +) -> List: + """ + Get relationships for a specific object. 
+ + Args: + grpc_handler: The gRPC handler to use for calls + object_type: Type of object (dataSource, entity, featureView, featureService, permission, savedDataset) + object_name: Name of the object + project: Project name + allow_cache: Whether to allow cached data + + Returns: + List containing relationships for the object (both direct and indirect) + """ + try: + req = RegistryServer_pb2.GetObjectRelationshipsRequest( + project=project, + object_type=object_type, + object_name=object_name, + include_indirect=True, + allow_cache=allow_cache, + ) + response = grpc_call(grpc_handler.GetObjectRelationships, req) + return response.get("relationships", []) + except Exception: + # If relationships can't be retrieved, return empty list rather than failing + return [] + + +def get_relationships_for_objects( + grpc_handler, + objects: List[Dict], + object_type: str, + project: str, + allow_cache: bool = True, +) -> Dict[str, List]: + """ + Get relationships for multiple objects, issuing one lookup per object. + + Args: + grpc_handler: The gRPC handler to use for calls + objects: List of objects to get relationships for + object_type: Type of objects (dataSource, entity, featureView, featureService, permission, savedDataset) + project: Project name + allow_cache: Whether to allow cached data + + Returns: + Dictionary mapping object names to their relationships (both direct and indirect) + """ + relationships_map = {} + + for obj in objects: + # Extract object name from different possible structures + obj_name = None + if isinstance(obj, dict): + # Consistent name extraction logic for all object types + obj_name = ( + obj.get("name") + or obj.get("spec", {}).get("name") + or obj.get("meta", {}).get("name") + ) + + if obj_name: + relationships_map[obj_name] = get_object_relationships( + grpc_handler, + object_type, + obj_name, + project, + allow_cache, + ) + + return relationships_map + + +def parse_tags(tags: List[str] = Query(default=[])) -> Dict[str, str]: """ Parses query strings like ?tags=key1:value1&tags=key2:value2 into a dict. 
@@ -45,7 +125,7 @@ def get_pagination_params( def get_sorting_params( sort_by: Optional[str] = Query(None), - sort_order: Optional[str] = Query(None, regex="^(asc|desc)$"), + sort_order: Optional[str] = Query(None, pattern="^(asc|desc)$"), ) -> dict: return { "sort_by": sort_by or "", diff --git a/sdk/python/feast/api/registry/rest/saved_datasets.py b/sdk/python/feast/api/registry/rest/saved_datasets.py index 3d0932b50cf..79223fea8ac 100644 --- a/sdk/python/feast/api/registry/rest/saved_datasets.py +++ b/sdk/python/feast/api/registry/rest/saved_datasets.py @@ -5,7 +5,9 @@ from feast.api.registry.rest.rest_utils import ( create_grpc_pagination_params, create_grpc_sorting_params, + get_object_relationships, get_pagination_params, + get_relationships_for_objects, get_sorting_params, grpc_call, parse_tags, @@ -20,6 +22,9 @@ def get_saved_dataset_router(grpc_handler) -> APIRouter: def get_saved_dataset( name: str, project: str = Query(...), + include_relationships: bool = Query( + False, description="Include relationships for this saved dataset" + ), allow_cache: bool = Query(True), ): req = RegistryServer_pb2.GetSavedDatasetRequest( @@ -27,13 +32,28 @@ def get_saved_dataset( project=project, allow_cache=allow_cache, ) - return grpc_call(grpc_handler.GetSavedDataset, req) + saved_dataset = grpc_call(grpc_handler.GetSavedDataset, req) + + result = saved_dataset + + # Note: saved datasets may not have relationships in the traditional sense + # but we include the functionality for consistency + if include_relationships: + relationships = get_object_relationships( + grpc_handler, "savedDataset", name, project, allow_cache + ) + result["relationships"] = relationships + + return result @router.get("/saved_datasets") def list_saved_datasets( project: str = Query(...), allow_cache: bool = Query(default=True), tags: Dict[str, str] = Depends(parse_tags), + include_relationships: bool = Query( + False, description="Include relationships for each saved dataset" + ), 
pagination_params: dict = Depends(get_pagination_params), sorting_params: dict = Depends(get_sorting_params), ): @@ -44,11 +64,20 @@ def list_saved_datasets( pagination=create_grpc_pagination_params(pagination_params), sorting=create_grpc_sorting_params(sorting_params), ) - response = grpc_call(grpc_handler.ListSavedDatasets, req) - return { - "saved_datasets": response.get("savedDatasets", []), + saved_datasets = response.get("savedDatasets", []) + + result = { + "saved_datasets": saved_datasets, "pagination": response.get("pagination", {}), } + if include_relationships: + relationships = get_relationships_for_objects( + grpc_handler, saved_datasets, "savedDataset", project, allow_cache + ) + result["relationships"] = relationships + + return result + return router diff --git a/sdk/python/feast/lineage/registry_lineage.py b/sdk/python/feast/lineage/registry_lineage.py index 02e4083bf7c..e7e07f7953c 100644 --- a/sdk/python/feast/lineage/registry_lineage.py +++ b/sdk/python/feast/lineage/registry_lineage.py @@ -307,6 +307,39 @@ def _parse_indirect_relationships( ) ) + # Create Entity -> DataSource relationships (through feature views) + # Build a map of feature view -> data sources + feature_view_to_data_sources: Dict[str, List[str]] = {} + for rel in direct_relationships: + if ( + rel.source.type == FeastObjectType.DATA_SOURCE + and rel.target.type == FeastObjectType.FEATURE_VIEW + ): + if rel.target.name not in feature_view_to_data_sources: + feature_view_to_data_sources[rel.target.name] = [] + feature_view_to_data_sources[rel.target.name].append(rel.source.name) + + # For each Entity -> FeatureView relationship, create Entity -> DataSource relationships + for rel in direct_relationships: + if ( + rel.source.type == FeastObjectType.ENTITY + and rel.target.type == FeastObjectType.FEATURE_VIEW + ): + # Find data sources that this feature view uses + if rel.target.name in feature_view_to_data_sources: + for data_source_name in feature_view_to_data_sources[ + 
rel.target.name + ]: + indirect_relationships.append( + EntityRelation( + source=rel.source, # The entity + target=EntityReference( + FeastObjectType.DATA_SOURCE, + data_source_name, + ), + ) + ) + return indirect_relationships def get_object_relationships( diff --git a/sdk/python/tests/unit/api/test_api_rest_registry.py b/sdk/python/tests/unit/api/test_api_rest_registry.py index 5cb8a4bf187..fcc0b74772d 100644 --- a/sdk/python/tests/unit/api/test_api_rest_registry.py +++ b/sdk/python/tests/unit/api/test_api_rest_registry.py @@ -1,12 +1,12 @@ import os import tempfile +import pandas as pd import pytest from fastapi.testclient import TestClient -from feast import Entity, FeatureService, FeatureView, Field, FileSource +from feast import Entity, FeatureService, FeatureStore, FeatureView, Field, FileSource from feast.api.registry.rest.rest_registry_server import RestRegistryServer -from feast.feature_store import FeatureStore from feast.infra.offline_stores.file_source import SavedDatasetFileStorage from feast.repo_config import RepoConfig from feast.saved_dataset import SavedDataset @@ -22,7 +22,6 @@ def fastapi_test_app(): # Create dummy parquet file (Feast requires valid sources) parquet_file_path = os.path.join(tmp_dir.name, "data.parquet") - import pandas as pd df = pd.DataFrame( { @@ -107,7 +106,30 @@ def test_feature_views_via_rest(fastapi_test_app): assert "featureViews" in response.json() response = fastapi_test_app.get("/feature_views/user_profile?project=demo_project") assert response.status_code == 200 - assert response.json()["featureView"]["spec"]["name"] == "user_profile" + assert response.json()["spec"]["name"] == "user_profile" + + +def test_feature_views_type_field_via_rest(fastapi_test_app): + """Test that the type field is correctly populated for feature views.""" + # Test list endpoint + response = fastapi_test_app.get("/feature_views?project=demo_project") + assert response.status_code == 200 + data = response.json() + assert "featureViews" in 
data + + # Verify all feature views have a type field + for fv in data["featureViews"]: + assert "type" in fv + assert fv["type"] is not None + assert fv["type"] == "featureView" + + # Test single endpoint + response = fastapi_test_app.get("/feature_views/user_profile?project=demo_project") + assert response.status_code == 200 + data = response.json() + assert "type" in data + assert data["type"] == "featureView" + assert data["spec"]["name"] == "user_profile" def test_feature_services_via_rest(fastapi_test_app): @@ -336,7 +358,6 @@ def fastapi_test_app_with_multiple_objects(): registry_path = os.path.join(tmp_dir.name, "registry.db") parquet_file_path = os.path.join(tmp_dir.name, "data.parquet") - import pandas as pd df = pd.DataFrame( { @@ -577,7 +598,7 @@ def test_feature_views_sorting_via_rest(fastapi_test_app_with_multiple_objects): ) assert response.status_code == 200 data = response.json() - fv_names = [fv["featureView"]["spec"]["name"] for fv in data["featureViews"]] + fv_names = [fv["spec"]["name"] for fv in data["featureViews"]] assert fv_names == sorted(fv_names) # Test sorting by name descending @@ -586,7 +607,7 @@ def test_feature_views_sorting_via_rest(fastapi_test_app_with_multiple_objects): ) assert response.status_code == 200 data = response.json() - fv_names = [fv["featureView"]["spec"]["name"] for fv in data["featureViews"]] + fv_names = [fv["spec"]["name"] for fv in data["featureViews"]] assert fv_names == sorted(fv_names, reverse=True) @@ -818,3 +839,24 @@ def test_lineage_sorting_via_rest(fastapi_test_app_with_multiple_objects): assert response.status_code == 200 data = response.json() assert "relationships" in data + + +def test_feature_view_type_identification(): + """Test that we can properly identify feature view types from their structure.""" + from feast.api.registry.rest.feature_views import _extract_feature_view_from_any + + any_feature_view_1 = {"featureView": {"spec": {"name": "test_fv"}, "meta": {}}} + any_feature_view_2 = { + 
"onDemandFeatureView": {"spec": {"name": "test_odfv"}, "meta": {}} + } + any_feature_view_3 = { + "streamFeatureView": {"spec": {"name": "test_sfv"}, "meta": {}} + } + + result_1 = _extract_feature_view_from_any(any_feature_view_1) + result_2 = _extract_feature_view_from_any(any_feature_view_2) + result_3 = _extract_feature_view_from_any(any_feature_view_3) + + assert result_1["type"] == "featureView" + assert result_2["type"] == "onDemandFeatureView" + assert result_3["type"] == "streamFeatureView" diff --git a/sdk/python/tests/unit/infra/test_registry_lineage.py b/sdk/python/tests/unit/infra/test_registry_lineage.py index 0175aed565f..287c8c1892a 100644 --- a/sdk/python/tests/unit/infra/test_registry_lineage.py +++ b/sdk/python/tests/unit/infra/test_registry_lineage.py @@ -63,23 +63,78 @@ def test_lineage_generator_basic(self): def test_object_relationships_filtering(self): """Test filtering relationships for a specific object.""" - - # Create simple registry for testing registry = Registry() - # Create a basic feature view - fv_spec = FeatureViewSpec(name="user_features") - feature_view = FeatureView(spec=fv_spec) - registry.feature_views.append(feature_view) + # Create entities + entity1_spec = EntitySpecV2(name="user_id") + entity1 = Entity(spec=entity1_spec) + + entity2_spec = EntitySpecV2(name="product_id") + entity2 = Entity(spec=entity2_spec) + + registry.entities.extend([entity1, entity2]) + + # Create data source + ds = DataSource() + ds.name = "user_data_source" + registry.data_sources.append(ds) + + # Create feature views + fv1_spec = FeatureViewSpec(name="user_features") + fv1_spec.entities.append("user_id") + fv1_spec.batch_source.CopyFrom(ds) + fv1 = FeatureView(spec=fv1_spec) + + fv2_spec = FeatureViewSpec(name="product_features") + fv2_spec.entities.append("product_id") + fv2 = FeatureView(spec=fv2_spec) + + registry.feature_views.extend([fv1, fv2]) # Test object relationship filtering lineage_generator = RegistryLineageGenerator() - relationships 
= lineage_generator.get_object_relationships( + + # Test basic filtering (original test coverage) + basic_relationships = lineage_generator.get_object_relationships( registry, "featureView", "user_features", include_indirect=False ) + assert isinstance(basic_relationships, list) - # Should return a list (may be empty for simple test) - assert isinstance(relationships, list) + # Test filtering for specific entity with detailed validation + user_entity_relationships = lineage_generator.get_object_relationships( + registry, "entity", "user_id", include_indirect=True + ) + + # Should include both direct (Entity -> FeatureView) and indirect (Entity -> DataSource) relationships + assert len(user_entity_relationships) >= 2 + + # Check that all relationships involve user_id + for rel in user_entity_relationships: + assert ( + rel.source.type == FeastObjectType.ENTITY + and rel.source.name == "user_id" + ) or ( + rel.target.type == FeastObjectType.ENTITY + and rel.target.name == "user_id" + ) + + # Test filtering for different entity + product_entity_relationships = lineage_generator.get_object_relationships( + registry, "entity", "product_id", include_indirect=True + ) + + # Should have fewer relationships (no data source connection) + assert len(product_entity_relationships) >= 1 + + # Check that all relationships involve product_id + for rel in product_entity_relationships: + assert ( + rel.source.type == FeastObjectType.ENTITY + and rel.source.name == "product_id" + ) or ( + rel.target.type == FeastObjectType.ENTITY + and rel.target.name == "product_id" + ) def test_to_proto_fallback(self): """Test that to_proto methods work with fallback to dict.""" @@ -244,3 +299,293 @@ def test_missing_object_attributes(self): # Should return empty or minimal relationships without crashing assert isinstance(relationships, list) assert isinstance(indirect_relationships, list) + + def test_entity_to_feature_view_relationships(self): + """Test direct Entity -> FeatureView 
relationships.""" + registry = Registry() + + # Create entities + entity1_spec = EntitySpecV2(name="user_id") + entity1 = Entity(spec=entity1_spec) + + entity2_spec = EntitySpecV2(name="product_id") + entity2 = Entity(spec=entity2_spec) + + registry.entities.extend([entity1, entity2]) + + # Create feature views with entities + fv1_spec = FeatureViewSpec(name="user_features") + fv1_spec.entities.append("user_id") + fv1 = FeatureView(spec=fv1_spec) + + fv2_spec = FeatureViewSpec(name="product_features") + fv2_spec.entities.append("product_id") + fv2 = FeatureView(spec=fv2_spec) + + # Feature view with multiple entities + fv3_spec = FeatureViewSpec(name="user_product_features") + fv3_spec.entities.append("user_id") + fv3_spec.entities.append("product_id") + fv3 = FeatureView(spec=fv3_spec) + + registry.feature_views.extend([fv1, fv2, fv3]) + + # Generate lineage + lineage_generator = RegistryLineageGenerator() + direct_relationships, indirect_relationships = ( + lineage_generator.generate_lineage(registry) + ) + + # Filter Entity -> FeatureView relationships + entity_to_fv_relationships = [ + rel + for rel in direct_relationships + if rel.source.type == FeastObjectType.ENTITY + and rel.target.type == FeastObjectType.FEATURE_VIEW + ] + + # Should have 4 relationships: + # user_id -> user_features + # product_id -> product_features + # user_id -> user_product_features + # product_id -> user_product_features + assert len(entity_to_fv_relationships) == 4 + + # Check specific relationships + relationship_pairs = { + (rel.source.name, rel.target.name) for rel in entity_to_fv_relationships + } + + expected_pairs = { + ("user_id", "user_features"), + ("product_id", "product_features"), + ("user_id", "user_product_features"), + ("product_id", "user_product_features"), + } + + assert relationship_pairs == expected_pairs + + # Test relationship types + for rel in entity_to_fv_relationships: + assert rel.source.type == FeastObjectType.ENTITY + assert rel.target.type == 
FeastObjectType.FEATURE_VIEW + + def test_entity_to_data_source_relationships(self): + """Test indirect Entity -> DataSource relationships through feature views.""" + registry = Registry() + + # Create entities + entity1_spec = EntitySpecV2(name="user_id") + entity1 = Entity(spec=entity1_spec) + + entity2_spec = EntitySpecV2(name="product_id") + entity2 = Entity(spec=entity2_spec) + + registry.entities.extend([entity1, entity2]) + + # Create data sources + ds1 = DataSource() + ds1.name = "user_data_source" + + ds2 = DataSource() + ds2.name = "product_data_source" + + ds3 = DataSource() + ds3.name = "combined_data_source" + + registry.data_sources.extend([ds1, ds2, ds3]) + + # Create feature views with entities and batch sources + fv1_spec = FeatureViewSpec(name="user_features") + fv1_spec.entities.append("user_id") + fv1_spec.batch_source.CopyFrom(ds1) # Link to user_data_source + fv1 = FeatureView(spec=fv1_spec) + + fv2_spec = FeatureViewSpec(name="product_features") + fv2_spec.entities.append("product_id") + fv2_spec.batch_source.CopyFrom(ds2) # Link to product_data_source + fv2 = FeatureView(spec=fv2_spec) + + # Feature view with multiple entities and data source + fv3_spec = FeatureViewSpec(name="user_product_features") + fv3_spec.entities.append("user_id") + fv3_spec.entities.append("product_id") + fv3_spec.batch_source.CopyFrom(ds3) # Link to combined_data_source + fv3 = FeatureView(spec=fv3_spec) + + registry.feature_views.extend([fv1, fv2, fv3]) + + # Generate lineage + lineage_generator = RegistryLineageGenerator() + direct_relationships, indirect_relationships = ( + lineage_generator.generate_lineage(registry) + ) + + # Filter Entity -> DataSource relationships (should be in indirect relationships) + entity_to_ds_relationships = [ + rel + for rel in indirect_relationships + if rel.source.type == FeastObjectType.ENTITY + and rel.target.type == FeastObjectType.DATA_SOURCE + ] + + # Should have 4 relationships: + # user_id -> user_data_source (through 
user_features) + # product_id -> product_data_source (through product_features) + # user_id -> combined_data_source (through user_product_features) + # product_id -> combined_data_source (through user_product_features) + assert len(entity_to_ds_relationships) == 4 + + # Check specific relationships + relationship_pairs = { + (rel.source.name, rel.target.name) for rel in entity_to_ds_relationships + } + + expected_pairs = { + ("user_id", "user_data_source"), + ("product_id", "product_data_source"), + ("user_id", "combined_data_source"), + ("product_id", "combined_data_source"), + } + + assert relationship_pairs == expected_pairs + + # Test relationship types + for rel in entity_to_ds_relationships: + assert rel.source.type == FeastObjectType.ENTITY + assert rel.target.type == FeastObjectType.DATA_SOURCE + + def test_entity_relationships_with_unnamed_data_sources(self): + """Test Entity -> DataSource relationships with unnamed data sources.""" + registry = Registry() + + entity_spec = EntitySpecV2(name="user_id") + entity = Entity(spec=entity_spec) + registry.entities.append(entity) + + ds_with_table = DataSource() + ds_with_table.bigquery_options.table = "users_table" + ds_with_table.type = DataSource.SourceType.BATCH_BIGQUERY + + ds_with_path = DataSource() + ds_with_path.file_options.uri = "/path/to/users.parquet" + ds_with_path.type = DataSource.SourceType.BATCH_FILE + + registry.data_sources.extend([ds_with_table, ds_with_path]) + + fv1_spec = FeatureViewSpec(name="user_features_from_table") + fv1_spec.entities.append("user_id") + fv1_spec.batch_source.CopyFrom(ds_with_table) + fv1 = FeatureView(spec=fv1_spec) + + fv2_spec = FeatureViewSpec(name="user_features_from_path") + fv2_spec.entities.append("user_id") + fv2_spec.batch_source.CopyFrom(ds_with_path) + fv2 = FeatureView(spec=fv2_spec) + + registry.feature_views.extend([fv1, fv2]) + + lineage_generator = RegistryLineageGenerator() + direct_relationships, indirect_relationships = ( + 
lineage_generator.generate_lineage(registry) + ) + + # Filter Entity -> DataSource relationships + entity_to_ds_relationships = [ + rel + for rel in indirect_relationships + if rel.source.type == FeastObjectType.ENTITY + and rel.target.type == FeastObjectType.DATA_SOURCE + ] + + # Should have 2 relationships with generated names + assert len(entity_to_ds_relationships) == 2 + + ds_names = {rel.target.name for rel in entity_to_ds_relationships} + for name in ds_names: + assert name.startswith("unnamed_source_"), ( + f"Expected unnamed_source_ prefix, got {name}" + ) + + # All relationships should involve the user_id entity + for rel in entity_to_ds_relationships: + assert rel.source.type == FeastObjectType.ENTITY + assert rel.source.name == "user_id" + assert rel.target.type == FeastObjectType.DATA_SOURCE + + def test_complex_entity_data_source_lineage(self): + """Test complex scenarios with multiple entities and data sources.""" + registry = Registry() + + # Create entities + entities = [] + for i in range(3): + entity_spec = EntitySpecV2(name=f"entity_{i}") + entity = Entity(spec=entity_spec) + entities.append(entity) + registry.entities.extend(entities) + + # Create data sources + data_sources = [] + for i in range(3): + ds = DataSource() + ds.name = f"data_source_{i}" + data_sources.append(ds) + registry.data_sources.extend(data_sources) + + # Create feature views with various entity combinations + # Single entity feature views + for i in range(3): + fv_spec = FeatureViewSpec(name=f"feature_view_{i}") + fv_spec.entities.append(f"entity_{i}") + fv_spec.batch_source.CopyFrom(data_sources[i]) + fv = FeatureView(spec=fv_spec) + registry.feature_views.append(fv) + + # Multi-entity feature view + fv_multi_spec = FeatureViewSpec(name="multi_entity_feature_view") + fv_multi_spec.entities.extend(["entity_0", "entity_1", "entity_2"]) + fv_multi_spec.batch_source.CopyFrom(data_sources[0]) # Uses first data source + fv_multi = FeatureView(spec=fv_multi_spec) + 
registry.feature_views.append(fv_multi) + + # Generate lineage + lineage_generator = RegistryLineageGenerator() + direct_relationships, indirect_relationships = ( + lineage_generator.generate_lineage(registry) + ) + + # Count Entity -> FeatureView relationships + entity_to_fv_relationships = [ + rel + for rel in direct_relationships + if rel.source.type == FeastObjectType.ENTITY + and rel.target.type == FeastObjectType.FEATURE_VIEW + ] + # Should have 6 relationships: 3 single + 3 from multi-entity view + assert len(entity_to_fv_relationships) == 6 + + # Count Entity -> DataSource relationships + entity_to_ds_relationships = [ + rel + for rel in indirect_relationships + if rel.source.type == FeastObjectType.ENTITY + and rel.target.type == FeastObjectType.DATA_SOURCE + ] + # Should have 6 relationships: 3 single + 3 from multi-entity view (all to data_source_0) + assert len(entity_to_ds_relationships) == 6 + + # Verify that each entity is connected to the correct data sources + entity_ds_connections = {} + for rel in entity_to_ds_relationships: + entity_name = rel.source.name + ds_name = rel.target.name + if entity_name not in entity_ds_connections: + entity_ds_connections[entity_name] = set() + entity_ds_connections[entity_name].add(ds_name) + + # Each entity should be connected to its own data source + data_source_0 (from multi-entity view) + for i in range(3): + entity_name = f"entity_{i}" + expected_ds = {f"data_source_{i}", "data_source_0"} + assert entity_ds_connections[entity_name] == expected_ds