diff --git a/docs/reference/feature-servers/registry-server.md b/docs/reference/feature-servers/registry-server.md index 53ac7405ff7..13dab49068c 100644 --- a/docs/reference/feature-servers/registry-server.md +++ b/docs/reference/feature-servers/registry-server.md @@ -946,4 +946,190 @@ Please refer the [page](./../registry/registry-permissions.md) for more details ## How to configure Authentication and Authorization ? -Please refer the [page](./../../../docs/getting-started/concepts/permission.md) for more details on how to configure authentication and authorization. \ No newline at end of file +Please refer to the [page](./../../../docs/getting-started/concepts/permission.md) for more details on how to configure authentication and authorization. + +### Metrics + +#### Get Resource Counts +- **Endpoint**: `GET /api/v1/metrics/resource_counts` +- **Description**: Retrieve counts of registry objects (entities, data sources, feature views, etc.) for a project or across all projects. +- **Parameters**: + - `project` (optional): Project name to filter resource counts (if not provided, returns counts for all projects) +- **Examples**: + ```bash + # Get counts for a specific project + curl -H "Authorization: Bearer <token>" \ + "http://localhost:6572/api/v1/metrics/resource_counts?project=my_project" + + # Get counts for all projects + curl -H "Authorization: Bearer <token>" \ + "http://localhost:6572/api/v1/metrics/resource_counts" + ``` +- **Response Example** (single project): + ```json + { + "project": "my_project", + "counts": { + "entities": 5, + "dataSources": 3, + "savedDatasets": 2, + "features": 12, + "featureViews": 4, + "featureServices": 2 + } + } + ``` +- **Response Example** (all projects): + ```json + { + "total": { + "entities": 15, + "dataSources": 8, + "savedDatasets": 5, + "features": 35, + "featureViews": 12, + "featureServices": 6 + }, + "perProject": { + "project_a": { + "entities": 5, + "dataSources": 3, + "savedDatasets": 2, + "features": 12, + "featureViews": 4, +
"featureServices": 2 + }, + "project_b": { + "entities": 10, + "dataSources": 5, + "savedDatasets": 3, + "features": 23, + "featureViews": 8, + "featureServices": 4 + } + } + } + ``` + +#### Get Recently Visited Objects +- **Endpoint**: `GET /api/v1/metrics/recently_visited` +- **Description**: Retrieve the most recently visited registry objects for the authenticated user in a project. +- **Parameters**: + - `project` (optional): Project name to filter recent visits (defaults to current project) + - `object` (optional): Object type to filter recent visits (e.g., entities, features, feature_services) + - `page` (optional): Page number for pagination (starts from 1) + - `limit` (optional): Number of items per page (maximum 100) + - `sort_by` (optional): Field to sort by (e.g., timestamp, path, object) + - `sort_order` (optional): Sort order: "asc" or "desc" (default: "asc") +- **Examples**: + ```bash + # Get all recent visits for a project + curl -H "Authorization: Bearer <token>" \ + "http://localhost:6572/api/v1/metrics/recently_visited?project=my_project" + + # Get recent visits with pagination + curl -H "Authorization: Bearer <token>" \ + "http://localhost:6572/api/v1/metrics/recently_visited?project=my_project&page=1&limit=10" + + # Get recent visits filtered by object type + curl -H "Authorization: Bearer <token>" \ + "http://localhost:6572/api/v1/metrics/recently_visited?project=my_project&object=entities" + + # Get recent visits sorted by timestamp descending + curl -H "Authorization: Bearer <token>" \ + "http://localhost:6572/api/v1/metrics/recently_visited?project=my_project&sort_by=timestamp&sort_order=desc" + ``` +- **Response Example** (without pagination): + ```json + { + "visits": [ + { + "path": "/api/v1/entities/driver", + "timestamp": "2024-07-18T12:34:56.789Z", + "project": "my_project", + "user": "alice", + "object": "entities", + "object_name": "driver", + "method": "GET" + }, + { + "path": "/api/v1/feature_services/user_service", + "timestamp": "2024-07-18T12:30:45.123Z",
+ "project": "my_project", + "user": "alice", + "object": "feature_services", + "object_name": "user_service", + "method": "GET" + } + ], + "pagination": { + "totalCount": 2 + } + } + ``` +- **Response Example** (with pagination): + ```json + { + "visits": [ + { + "path": "/api/v1/entities/driver", + "timestamp": "2024-07-18T12:34:56.789Z", + "project": "my_project", + "user": "alice", + "object": "entities", + "object_name": "driver", + "method": "GET" + } + ], + "pagination": { + "page": 1, + "limit": 10, + "totalCount": 25, + "totalPages": 3, + "hasNext": true, + "hasPrevious": false + } + } + ``` + +**Note**: Recent visits are automatically logged when users access registry objects via the REST API. The logging behavior can be configured through the `feature_server.recent_visit_logging` section in `feature_store.yaml` (see configuration section below). + +--- + +## Registry Server Configuration: Recent Visit Logging + +The registry server supports configuration of recent visit logging via the `feature_server` section in `feature_store.yaml`. + +**Example:** +```yaml +feature_server: + type: local + recent_visit_logging: + limit: 100 # Number of recent visits to store per user + log_patterns: + - ".*/entities/(?!all$)[^/]+$" + - ".*/data_sources/(?!all$)[^/]+$" + - ".*/feature_views/(?!all$)[^/]+$" + - ".*/features/(?!all$)[^/]+$" + - ".*/feature_services/(?!all$)[^/]+$" + - ".*/saved_datasets/(?!all$)[^/]+$" + - ".*/custom_api/.*" +``` + +**Configuration Options:** +- **recent_visit_logging.limit**: Maximum number of recent visits to store per user (default: 100). +- **recent_visit_logging.log_patterns**: List of regex patterns for API paths to log as recent visits. 
+ +**Default Log Patterns:** +- `.*/entities/(?!all$)[^/]+$` - Individual entity endpoints +- `.*/data_sources/(?!all$)[^/]+$` - Individual data source endpoints +- `.*/feature_views/(?!all$)[^/]+$` - Individual feature view endpoints +- `.*/features/(?!all$)[^/]+$` - Individual feature endpoints +- `.*/feature_services/(?!all$)[^/]+$` - Individual feature service endpoints +- `.*/saved_datasets/(?!all$)[^/]+$` - Individual saved dataset endpoints + +**Behavior:** +- Only requests matching one of the `log_patterns` will be tracked +- Only the most recent `limit` visits per user are stored +- Metrics endpoints (`/metrics/*`) are automatically excluded from logging to prevent circular references +- Visit data is stored per user and per project in the registry metadata diff --git a/sdk/python/feast/api/registry/rest/__init__.py b/sdk/python/feast/api/registry/rest/__init__.py index 3d6c4c8ebb3..e2ee5623aef 100644 --- a/sdk/python/feast/api/registry/rest/__init__.py +++ b/sdk/python/feast/api/registry/rest/__init__.py @@ -6,12 +6,13 @@ from feast.api.registry.rest.feature_views import get_feature_view_router from feast.api.registry.rest.features import get_feature_router from feast.api.registry.rest.lineage import get_lineage_router +from feast.api.registry.rest.metrics import get_metrics_router from feast.api.registry.rest.permissions import get_permission_router from feast.api.registry.rest.projects import get_project_router from feast.api.registry.rest.saved_datasets import get_saved_dataset_router -def register_all_routes(app: FastAPI, grpc_handler): +def register_all_routes(app: FastAPI, grpc_handler, server=None): app.include_router(get_entity_router(grpc_handler)) app.include_router(get_data_source_router(grpc_handler)) app.include_router(get_feature_service_router(grpc_handler)) @@ -21,3 +22,4 @@ def register_all_routes(app: FastAPI, grpc_handler): app.include_router(get_permission_router(grpc_handler)) app.include_router(get_project_router(grpc_handler)) 
def get_metrics_router(grpc_handler, server=None) -> APIRouter:
    """Build the router exposing the registry ``/metrics`` REST endpoints.

    Args:
        grpc_handler: Registry gRPC servicer whose ``List*`` methods are
            invoked through ``grpc_call``.
        server: Optional REST registry server. When given, ``server.store.project``
            is the default project, ``server.registry`` is read for stored
            visits and ``server.recent_visits_limit`` caps how many visits are
            returned. Without it, ``/metrics/recently_visited`` returns no visits.

    Returns:
        An ``APIRouter`` with ``/metrics/resource_counts`` and
        ``/metrics/recently_visited`` registered.
    """
    logger = logging.getLogger(__name__)
    router = APIRouter()

    # (response key, RPC name, best_effort). The request message of each RPC
    # is ``RegistryServer_pb2.<RPC name>Request``. A best-effort resource that
    # fails to list is counted as zero; failures of the others propagate.
    resource_rpcs = (
        ("entities", "ListEntities", False),
        ("dataSources", "ListDataSources", False),
        ("savedDatasets", "ListSavedDatasets", True),
        ("features", "ListFeatures", True),
        ("featureViews", "ListFeatureViews", True),
        ("featureServices", "ListFeatureServices", True),
    )

    def count_resources_for_project(project_name: str) -> dict:
        """Return ``{response key: object count}`` for one project."""
        counts = {}
        for response_key, rpc_name, best_effort in resource_rpcs:
            try:
                request_cls = getattr(RegistryServer_pb2, f"{rpc_name}Request")
                response = grpc_call(
                    getattr(grpc_handler, rpc_name),
                    request_cls(project=project_name),
                )
            except Exception:
                if not best_effort:
                    raise
                # Previously swallowed silently; keep the zero fallback but
                # leave a trace so a broken backend is diagnosable.
                logger.warning(
                    "[/metrics/resource_counts] %s failed for project %s; "
                    "counting 0",
                    rpc_name,
                    project_name,
                    exc_info=True,
                )
                response = {}
            counts[response_key] = len(response.get(response_key, []))
        return counts

    @router.get("/metrics/resource_counts", tags=["Metrics"])
    async def resource_counts(
        project: Optional[str] = Query(
            None, description="Project name to filter resource counts"
        ),
    ):
        """Return object counts for ``project``, or totals plus a per-project
        breakdown when no project is given."""
        if project:
            return {
                "project": project,
                "counts": count_resources_for_project(project),
            }

        # List all projects via gRPC
        projects_resp = grpc_call(
            grpc_handler.ListProjects, RegistryServer_pb2.ListProjectsRequest()
        )
        per_project = {}
        total_counts = {response_key: 0 for response_key, _, _ in resource_rpcs}
        for entry in projects_resp.get("projects", []):
            project_name = entry["spec"]["name"]
            counts = count_resources_for_project(project_name)
            per_project[project_name] = counts
            for response_key in total_counts:
                total_counts[response_key] += counts[response_key]
        return {"total": total_counts, "perProject": per_project}

    @router.get("/metrics/recently_visited", tags=["Metrics"])
    async def recently_visited(
        request: Request,
        project: Optional[str] = Query(
            None, description="Project name to filter recent visits"
        ),
        object_type: Optional[str] = Query(
            None,
            alias="object",
            description="Object type to filter recent visits (e.g., entities, features)",
        ),
        pagination_params: dict = Depends(get_pagination_params),
        sorting_params: dict = Depends(get_sorting_params),
    ):
        """Return the visits stored for the requesting user in a project,
        optionally filtered by object type, sorted and paginated."""
        user = getattr(request.state, "user", None) or "anonymous"
        project_val = project or (server.store.project if server else None)
        key = f"recently_visited_{user}"
        logger.info(
            "[/metrics/recently_visited] Project: %s, Key: %s, Object: %s",
            project_val,
            key,
            object_type,
        )

        visits = []
        if server:
            try:
                visits_json = server.registry.get_project_metadata(project_val, key)
                visits = json.loads(visits_json) if visits_json else []
            except Exception:
                # Best effort: an unreadable visit log yields an empty answer.
                logger.warning(
                    "[/metrics/recently_visited] could not load visits for %s",
                    key,
                    exc_info=True,
                )
                visits = []
        # The stored value is free-form metadata; anything that is not a list
        # of dicts would otherwise crash the filtering/sorting below.
        if not isinstance(visits, list):
            visits = []
        visits = [v for v in visits if isinstance(v, dict)]

        if object_type:
            visits = [v for v in visits if v.get("object") == object_type]

        server_limit = getattr(server, "recent_visits_limit", 100) if server else 100
        visits = visits[-server_limit:]

        page = pagination_params.get("page", 0)
        limit = pagination_params.get("limit", 0)
        sort_by = sorting_params.get("sort_by")
        sort_order = sorting_params.get("sort_order", "asc")

        if page == 0 and limit == 0:
            if sort_by:

                def sort_key(visit):
                    # Missing and None values sort as "" so that e.g.
                    # ``object_name: None`` cannot raise a str/None TypeError.
                    value = visit.get(sort_by, "")
                    return "" if value is None else value

                visits = sorted(
                    visits, key=sort_key, reverse=(sort_order == "desc")
                )
            return {"visits": visits, "pagination": {"totalCount": len(visits)}}

        # Only one of page/limit was supplied: fill in the other default.
        if page == 0:
            page = 1
        if limit == 0:
            limit = 50
        paged_visits, pagination = paginate_and_sort(
            visits, page, limit, sort_by, sort_order
        )
        return {
            "visits": paged_visits,
            "pagination": pagination,
        }

    return router
def _add_logging_middleware(self):
    """Install middleware that records matching GET requests as recent visits.

    Each recorded visit is appended to a JSON list stored in the registry's
    project metadata under ``recently_visited_<user>``; only the newest
    ``recent_visits_limit`` entries are kept. Requests whose path starts with
    ``/api/v1/metrics/`` or matches none of ``log_patterns`` are not recorded.
    Failures while reading or writing the visit list never fail the request.
    """
    from fastapi import Request
    from starlette.middleware.base import BaseHTTPMiddleware

    # Splits "/api/v1/<object>[/<name>]"; compiled once instead of per request.
    object_path_re = re.compile(r"/api/v1/([^/]+)(?:/([^/]+))?")

    class LoggingMiddleware(BaseHTTPMiddleware):
        def __init__(
            self, app, registry, project, recent_visits_limit, log_patterns
        ):
            super().__init__(app)
            self.registry = registry
            self.project = project
            self.recent_visits_limit = recent_visits_limit
            self.log_patterns = [re.compile(p) for p in log_patterns]

        def _load_visits(self, project, key) -> list:
            """Return the stored visit list, or ``[]`` if missing/unreadable."""
            try:
                visits_json = self.registry.get_project_metadata(project, key)
                visits = json.loads(visits_json) if visits_json else []
            except Exception:
                return []
            # A non-list payload used to make ``visits.append`` raise and
            # break the user's request; treat it as an empty history.
            return visits if isinstance(visits, list) else []

        def _record_visit(self, request):
            """Persist one visit entry if the request path is tracked."""
            # NOTE(review): this runs before ``call_next``, so ``state.user``
            # is only visible if something earlier in the stack set it.
            user = getattr(request.state, "user", None) or "anonymous"
            project = request.query_params.get("project") or self.project
            key = f"recently_visited_{user}"
            logger.info("[LoggingMiddleware] Project: %s, Key: %s", project, key)

            path = str(request.url.path)
            if path.startswith("/api/v1/metrics/"):
                return
            if not any(p.match(path) for p in self.log_patterns):
                return
            # ``visits[-0:]`` keeps everything and a negative limit drops the
            # wrong end, so a non-positive limit means "store nothing".
            if self.recent_visits_limit <= 0:
                return

            match = object_path_re.match(path)
            object_type, object_name = match.groups() if match else (None, None)
            visit = {
                "path": path,
                "timestamp": _utc_now().isoformat(),
                "project": project,
                "user": user,
                "object": object_type,
                "object_name": object_name,
                "method": request.method,
            }
            visits = self._load_visits(project, key)
            visits.append(visit)
            visits = visits[-self.recent_visits_limit :]
            # NOTE(review): registry access here is synchronous inside an
            # async dispatch; confirm that is acceptable for slow backends.
            try:
                self.registry.set_project_metadata(project, key, json.dumps(visits))
            except Exception as e:
                logger.warning("Failed to persist recent visits: %s", e)

        async def dispatch(self, request: Request, call_next):
            if request.method == "GET":
                self._record_visit(request)
            return await call_next(request)

    self.app.add_middleware(
        LoggingMiddleware,
        registry=self.registry,
        project=self.store.project,
        recent_visits_limit=self.recent_visits_limit,
        log_patterns=self.log_patterns,
    )


def _register_routes(self):
    """Register all REST routes, passing this server to the metrics router."""
    register_all_routes(self.app, self.grpc_handler, self)
if limit: + pagination["limit"] = limit + if total: + pagination["totalCount"] = total + total_pages = (total + limit - 1) // limit + if total_pages: + pagination["totalPages"] = total_pages + if end < total: + pagination["hasNext"] = True + if start > 0: + pagination["hasPrevious"] = True return paged_items, pagination diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index dabe58feb46..3ab2f52e349 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -602,6 +602,20 @@ def list_project_metadata( """ raise NotImplementedError + @abstractmethod + def get_project_metadata(self, project: str, key: str) -> Optional[str]: + """ + Retrieves a custom project metadata value by key. + + Args: + project: Feast project name + key: Metadata key + + Returns: + The metadata value as a string, or None if not found. + """ + raise NotImplementedError + @abstractmethod def update_infra(self, infra: Infra, project: str, commit: bool = True): """ diff --git a/sdk/python/feast/infra/registry/contrib/azure/azure_registry_store.py b/sdk/python/feast/infra/registry/contrib/azure/azure_registry_store.py index f9317bf7a45..8e21e67e977 100644 --- a/sdk/python/feast/infra/registry/contrib/azure/azure_registry_store.py +++ b/sdk/python/feast/infra/registry/contrib/azure/azure_registry_store.py @@ -1,10 +1,12 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. 
def set_project_metadata(self, project: str, key: str, value: str):
    """Store ``value`` under ``key`` in the custom metadata of ``project``.

    Custom metadata is kept as a JSON object serialized into the
    ``project_uuid`` field of the project's ``project_metadata`` entry. If no
    entry exists for ``project`` a new one is appended. The modified registry
    proto is written back with ``update_registry_proto``.

    NOTE(review): a ``project_uuid`` that does not already hold a JSON object
    (for example a plain identifier string) is replaced, not preserved.
    """
    registry_proto = self.get_registry_proto()
    for pm in registry_proto.project_metadata:
        if pm.project != project:
            continue
        try:
            meta = json.loads(pm.project_uuid) if pm.project_uuid else {}
        except (TypeError, ValueError):  # JSONDecodeError is a ValueError
            meta = {}
        if not isinstance(meta, dict):
            meta = {}
        meta[key] = value
        pm.project_uuid = json.dumps(meta)
        break
    else:
        # No metadata entry for this project yet: create one.
        from feast.project_metadata import ProjectMetadata

        new_pm = ProjectMetadata(project_name=project)
        new_pm.project_uuid = json.dumps({key: value})
        registry_proto.project_metadata.append(new_pm.to_proto())
    self.update_registry_proto(registry_proto)


def get_project_metadata(self, project: str, key: str) -> Optional[str]:
    """Return the custom metadata value stored under ``key`` for ``project``.

    Returns ``None`` when the project has no metadata entry, when its
    ``project_uuid`` does not hold a JSON object, or when ``key`` is absent.
    """
    registry_proto = self.get_registry_proto()
    for pm in registry_proto.project_metadata:
        if pm.project != project:
            continue
        try:
            meta = json.loads(pm.project_uuid) if pm.project_uuid else {}
        except (TypeError, ValueError):
            meta = {}
        # Only the first entry for the project is consulted.
        return meta.get(key) if isinstance(meta, dict) else None
    return None
def set_project_metadata(self, project: str, key: str, value: str):
    """Set a custom project metadata key-value pair in the registry proto (file backend).

    When the project's metadata entry exposes a ``custom_metadata`` mapping the
    pair is written there. Otherwise the pair is merged into a JSON object
    serialized into ``project_uuid``. If the project has no entry, a new one is
    appended. The registry proto is then persisted with
    ``update_registry_proto``.

    NOTE(review): in the fallback path a ``project_uuid`` that is not already
    a JSON object is replaced, not preserved.
    """
    registry_proto = self.get_registry_proto()
    for pm in registry_proto.project_metadata:
        if pm.project != project:
            continue
        if hasattr(pm, "custom_metadata"):
            pm.custom_metadata[key] = value
        else:
            # Fallback: keep custom metadata as JSON inside project_uuid.
            try:
                meta = json.loads(pm.project_uuid) if pm.project_uuid else {}
            except (TypeError, ValueError):  # JSONDecodeError is a ValueError
                meta = {}
            if not isinstance(meta, dict):
                meta = {}
            meta[key] = value
            pm.project_uuid = json.dumps(meta)
        break
    else:
        # No metadata entry for this project yet: create one.
        from feast.project_metadata import ProjectMetadata

        new_pm = ProjectMetadata(project_name=project)
        new_pm.project_uuid = json.dumps({key: value})
        registry_proto.project_metadata.append(new_pm.to_proto())
    self.update_registry_proto(registry_proto)


def get_project_metadata(self, project: str, key: str) -> Optional[str]:
    """Get a custom project metadata value by key from the registry proto (file backend).

    Reads ``custom_metadata`` when the entry has it, otherwise the JSON object
    stored in ``project_uuid``. Returns ``None`` if the project, the key, or a
    usable JSON object is missing.
    """
    registry_proto = self.get_registry_proto()
    for pm in registry_proto.project_metadata:
        if pm.project != project:
            continue
        if hasattr(pm, "custom_metadata"):
            return pm.custom_metadata.get(key, None)
        try:
            meta = json.loads(pm.project_uuid) if pm.project_uuid else {}
        except (TypeError, ValueError):
            meta = {}
        return meta.get(key) if isinstance(meta, dict) else None
    return None
def set_project_metadata(self, project: str, key: str, value: str):
    """Store ``value`` under ``key`` in the custom metadata of ``project``.

    The metadata lives as a JSON object serialized into the ``project_uuid``
    field of the project's ``project_metadata`` entry; a missing entry is
    created. The registry proto is persisted via ``update_registry_proto``.

    NOTE(review): any ``project_uuid`` content that is not a JSON object is
    overwritten by this call.
    """
    registry_proto = self.get_registry_proto()
    for pm in registry_proto.project_metadata:
        if pm.project != project:
            continue
        try:
            meta = json.loads(pm.project_uuid) if pm.project_uuid else {}
        except (TypeError, ValueError):  # JSONDecodeError is a ValueError
            meta = {}
        if not isinstance(meta, dict):
            meta = {}
        meta[key] = value
        pm.project_uuid = json.dumps(meta)
        break
    else:
        # First metadata write for this project.
        from feast.project_metadata import ProjectMetadata

        new_pm = ProjectMetadata(project_name=project)
        new_pm.project_uuid = json.dumps({key: value})
        registry_proto.project_metadata.append(new_pm.to_proto())
    self.update_registry_proto(registry_proto)


def get_project_metadata(self, project: str, key: str) -> Optional[str]:
    """Return the value stored under ``key`` for ``project``, or ``None``.

    ``None`` is returned when the project has no entry, when ``project_uuid``
    is not a JSON object, or when the key is not present.
    """
    registry_proto = self.get_registry_proto()
    for pm in registry_proto.project_metadata:
        if pm.project != project:
            continue
        try:
            meta = json.loads(pm.project_uuid) if pm.project_uuid else {}
        except (TypeError, ValueError):
            meta = {}
        return meta.get(key) if isinstance(meta, dict) else None
    return None
def get_project_metadata(self, project: str, key: str) -> Optional[str]:
    """Look up a custom project metadata value through the remote registry.

    Issues ``ListProjectMetadata`` for ``project`` and scans the returned
    entries in order. For each entry the ``custom_metadata`` mapping is checked
    first (when the message has one), then the JSON object stored in
    ``project_uuid``. The first value found for ``key`` is returned.

    Returns:
        The stored value, or ``None`` if no returned entry contains ``key``.
    """
    request = RegistryServer_pb2.ListProjectMetadataRequest(project=project)
    response = self.stub.ListProjectMetadata(request)
    for pm in response.project_metadata:
        custom = getattr(pm, "custom_metadata", None)
        if custom is not None and key in custom:
            return custom[key]
        try:
            meta = json.loads(pm.project_uuid) if pm.project_uuid else {}
        except (TypeError, ValueError):  # JSONDecodeError is a ValueError
            meta = {}
        if isinstance(meta, dict) and key in meta:
            return meta[key]
    return None
def set_project_metadata(self, project: str, key: str, value: str):
    """Store ``value`` under ``key`` in the custom metadata of ``project``.

    Custom metadata is a JSON object serialized into the ``project_uuid`` field
    of the matching ``project_metadata`` entry. When the project has no entry
    one is appended. The updated proto is saved with ``update_registry_proto``.

    NOTE(review): a ``project_uuid`` that is not valid JSON, or is JSON but not
    an object, is discarded and replaced by the new JSON object.
    """
    registry_proto = self.get_registry_proto()
    for pm in registry_proto.project_metadata:
        if pm.project != project:
            continue
        try:
            meta = json.loads(pm.project_uuid) if pm.project_uuid else {}
        except (TypeError, ValueError):  # JSONDecodeError is a ValueError
            meta = {}
        if not isinstance(meta, dict):
            meta = {}
        meta[key] = value
        pm.project_uuid = json.dumps(meta)
        break
    else:
        # Project not present in the registry metadata yet.
        from feast.project_metadata import ProjectMetadata

        new_pm = ProjectMetadata(project_name=project)
        new_pm.project_uuid = json.dumps({key: value})
        registry_proto.project_metadata.append(new_pm.to_proto())
    self.update_registry_proto(registry_proto)


def get_project_metadata(self, project: str, key: str) -> Optional[str]:
    """Return the custom metadata value for ``key`` in ``project``.

    Returns ``None`` if the project has no metadata entry, if ``project_uuid``
    does not decode to a JSON object, or if ``key`` is not set.
    """
    registry_proto = self.get_registry_proto()
    for pm in registry_proto.project_metadata:
        if pm.project != project:
            continue
        try:
            meta = json.loads(pm.project_uuid) if pm.project_uuid else {}
        except (TypeError, ValueError):
            meta = {}
        return meta.get(key) if isinstance(meta, dict) else None
    return None
def _sql_string_literal(value: str) -> str:
    """Return ``value`` as a single-quoted Snowflake string constant.

    Backslashes and single quotes are escaped so the text cannot terminate the
    constant or be read as an escape sequence.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"


def set_project_metadata(self, project: str, key: str, value: str):
    """Set a custom project metadata key-value pair in the FEAST_METADATA table (Snowflake backend).

    Updates the row for ``(project, key)`` if one exists, otherwise inserts a
    new row; ``last_updated_timestamp`` is set to ``CURRENT_TIMESTAMP()``.

    ``project``, ``key`` and ``value`` are embedded as escaped string
    constants: ``value`` is typically JSON and ``project`` can come from a
    request parameter, so raw interpolation would corrupt the statement or
    allow SQL injection.
    """
    table = f"{self.registry_path}.\"FEAST_METADATA\""
    project_sql = _sql_string_literal(project)
    key_sql = _sql_string_literal(key)
    value_sql = _sql_string_literal(value)
    with GetSnowflakeConnection(self.registry_config) as conn:
        query = f"""
            SELECT
                project_id
            FROM
                {table}
            WHERE
                project_id = {project_sql}
                AND metadata_key = {key_sql}
            LIMIT 1
        """
        df = execute_snowflake_statement(conn, query).fetch_pandas_all()
        if not df.empty:
            query = f"""
                UPDATE {table}
                    SET
                        metadata_value = {value_sql},
                        last_updated_timestamp = CURRENT_TIMESTAMP()
                    WHERE
                        project_id = {project_sql}
                        AND metadata_key = {key_sql}
            """
        else:
            # Positional VALUES: relies on the table's column order being
            # (project_id, metadata_key, metadata_value, last_updated_timestamp).
            query = f"""
                INSERT INTO {table}
                    VALUES
                    ({project_sql}, {key_sql}, {value_sql}, CURRENT_TIMESTAMP())
            """
        execute_snowflake_statement(conn, query)


def get_project_metadata(self, project: str, key: str) -> Optional[str]:
    """Get a custom project metadata value by key from the FEAST_METADATA table (Snowflake backend).

    Returns:
        The ``METADATA_VALUE`` of the first matching row, or ``None`` when no
        row exists for ``(project, key)``.
    """
    table = f"{self.registry_path}.\"FEAST_METADATA\""
    with GetSnowflakeConnection(self.registry_config) as conn:
        query = f"""
            SELECT
                metadata_value
            FROM
                {table}
            WHERE
                project_id = {_sql_string_literal(project)}
                AND metadata_key = {_sql_string_literal(key)}
            LIMIT 1
        """
        df = execute_snowflake_statement(conn, query).fetch_pandas_all()
        if not df.empty:
            return df.iloc[0]["METADATA_VALUE"]
    return None
def get_project_metadata(self, project: str, key: str) -> Optional[str]:
    """Fetch one custom metadata value for a project from ``feast_metadata``.

    Selects the row whose ``project_id`` and ``metadata_key`` match the
    arguments, using the read engine.

    Returns:
        The row's ``metadata_value``, or ``None`` when no such row exists.
    """
    lookup = select(feast_metadata).where(
        feast_metadata.c.project_id == project,
        feast_metadata.c.metadata_key == key,
    )
    with self.read_engine.begin() as conn:
        match = conn.execute(lookup).first()
        if not match:
            return None
        return match._mapping["metadata_value"]
assert data["project"] == "demo_project" + assert "counts" in data + + counts = data["counts"] + assert "entities" in counts + assert "dataSources" in counts + assert "savedDatasets" in counts + assert "features" in counts + assert "featureViews" in counts + assert "featureServices" in counts + + # Verify counts are integers + for key, value in counts.items(): + assert isinstance(value, int) + assert value >= 0 + + # Test without project parameter (should return all projects) + response = fastapi_test_app.get("/metrics/resource_counts") + assert response.status_code == 200 + data = response.json() + assert "total" in data + assert "perProject" in data + + total = data["total"] + assert "entities" in total + assert "dataSources" in total + assert "savedDatasets" in total + assert "features" in total + assert "featureViews" in total + assert "featureServices" in total + + per_project = data["perProject"] + assert "demo_project" in per_project + assert isinstance(per_project["demo_project"], dict) + + +def test_metrics_recently_visited_via_rest(fastapi_test_app): + """Test the /metrics/recently_visited endpoint.""" + # First, make some requests to generate visit data + fastapi_test_app.get("/entities?project=demo_project") + fastapi_test_app.get("/entities/user_id?project=demo_project") + fastapi_test_app.get("/feature_services?project=demo_project") + + # Test basic recently visited endpoint + response = fastapi_test_app.get("/metrics/recently_visited?project=demo_project") + assert response.status_code == 200 + data = response.json() + assert "visits" in data + assert "pagination" in data + + visits = data["visits"] + assert isinstance(visits, list) + + # Check visit structure + if visits: + visit = visits[0] + assert "path" in visit + assert "timestamp" in visit + assert "project" in visit + assert "user" in visit + assert "object" in visit + assert "object_name" in visit + assert "method" in visit + + # Verify timestamp format + import datetime + + 
datetime.datetime.fromisoformat(visit["timestamp"].replace("Z", "+00:00")) + + pagination = data["pagination"] + assert "totalCount" in pagination + assert isinstance(pagination["totalCount"], int) + + +def test_metrics_recently_visited_with_object_filter(fastapi_test_app): + """Test filtering by object type for recently visited endpoint.""" + # Generate visit data for different object types + fastapi_test_app.get("/entities?project=demo_project") + fastapi_test_app.get("/feature_services?project=demo_project") + fastapi_test_app.get("/data_sources?project=demo_project") + + # Test filtering by entities only + response = fastapi_test_app.get( + "/metrics/recently_visited?project=demo_project&object=entities" + ) + assert response.status_code == 200 + data = response.json() + assert "visits" in data + + visits = data["visits"] + for visit in visits: + assert visit["object"] == "entities" + + # Test filtering by feature_services + response = fastapi_test_app.get( + "/metrics/recently_visited?project=demo_project&object=feature_services" + ) + assert response.status_code == 200 + data = response.json() + visits = data["visits"] + for visit in visits: + assert visit["object"] == "feature_services" + + +def test_metrics_recently_visited_error_handling(fastapi_test_app): + """Test error handling for recently visited endpoint.""" + # Test with non-existent project + response = fastapi_test_app.get( + "/metrics/recently_visited?project=nonexistent_project" + ) + assert response.status_code == 200 + data = response.json() + assert "visits" in data + assert len(data["visits"]) == 0 + + # Test with invalid object type + response = fastapi_test_app.get( + "/metrics/recently_visited?project=demo_project&object=invalid_type" + ) + assert response.status_code == 200 + data = response.json() + assert "visits" in data + assert len(data["visits"]) == 0 + + +def test_metrics_recently_visited_user_isolation(fastapi_test_app): + """Test that visits are isolated per user.""" + # Make 
requests as "anonymous" user (default) + fastapi_test_app.get("/entities?project=demo_project") + + # Check that visits are recorded for anonymous user + response = fastapi_test_app.get("/metrics/recently_visited?project=demo_project") + assert response.status_code == 200 + data = response.json() + visits = data["visits"] + + # All visits should be for anonymous user + for visit in visits: + assert visit["user"] == "anonymous" diff --git a/sdk/python/tests/unit/api/test_api_rest_registry_server.py b/sdk/python/tests/unit/api/test_api_rest_registry_server.py index 8124a46bf75..2abfa3ac462 100644 --- a/sdk/python/tests/unit/api/test_api_rest_registry_server.py +++ b/sdk/python/tests/unit/api/test_api_rest_registry_server.py @@ -40,7 +40,9 @@ def test_rest_registry_server_initializes_correctly( assert server.grpc_handler == mock_grpc_handler # Validate route registration and auth init - mock_register_all_routes.assert_called_once_with(server.app, mock_grpc_handler) + mock_register_all_routes.assert_called_once_with( + server.app, mock_grpc_handler, server + ) mock_init_security_manager.assert_called_once() mock_init_auth_manager.assert_called_once() mock_get_auth_manager.assert_called_once() @@ -80,3 +82,5 @@ def test_routes_registered_in_app(mock_store_and_registry): assert "/features" in route_paths assert "/features/all" in route_paths assert "/features/{feature_view}/{name}" in route_paths + assert "/metrics/resource_counts" in route_paths + assert "/metrics/recently_visited" in route_paths diff --git a/sdk/python/tests/unit/infra/registry/test_registry.py b/sdk/python/tests/unit/infra/registry/test_registry.py index 65dea2ff680..f103925f1c1 100644 --- a/sdk/python/tests/unit/infra/registry/test_registry.py +++ b/sdk/python/tests/unit/infra/registry/test_registry.py @@ -9,6 +9,16 @@ class TestCachingRegistry(CachingRegistry): """Test subclass that implements abstract methods as no-ops""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + 
self._test_metadata = {} + + def get_project_metadata(self, project: str, key: str) -> str: + return self._test_metadata.get((project, key)) + + def set_project_metadata(self, project: str, key: str, value: str): + self._test_metadata[(project, key)] = value + def _get_any_feature_view(self, *args, **kwargs): pass