diff --git a/infra/feast-operator/docs/namespace-registry.md b/infra/feast-operator/docs/namespace-registry.md new file mode 100644 index 00000000000..e025c62406d --- /dev/null +++ b/infra/feast-operator/docs/namespace-registry.md @@ -0,0 +1,58 @@ +# Feast Namespace Registry + +## Overview + +The Feast Namespace Registry is a feature that automatically creates and maintains a centralized ConfigMap containing information about all Feast feature store instances deployed by the operator. This enables dashboard applications and other tools to discover and connect to Feast instances across different namespaces. + +## Implementation Details + +1. **ConfigMap Creation**: The operator creates a ConfigMap in the appropriate namespace: + - **OpenShift AI**: `redhat-ods-applications` namespace (or DSCi configured namespace) + - **Kubernetes**: `feast-operator-system` namespace + +2. **Access Control**: A RoleBinding is created to allow `system:authenticated` users to read the ConfigMap + +3. **Automatic Registration & Cleanup**: When a new feature store instance is created, it automatically registers its namespace and client configuration in the ConfigMap. When deleted, it automatically removes its entry from the ConfigMap + +4. **Data Structure**: The ConfigMap contains a JSON structure with namespace names as keys and lists of client configuration names as values + +### ConfigMap Structure + +The namespace registry ConfigMap (`feast-configs-registry`) contains the following data: + +```json +{ + "namespaces": { + "namespace-1": ["client-config-1", "client-config-2"], + "namespace-2": ["client-config-3"] + } +} +``` + +### Usage + +The namespace registry is automatically deployed when any Feast feature store instance is created. No additional configuration is required. + +#### For External Applications + +External applications can discover Feast instances by: + +1. Reading the ConfigMap from the appropriate namespace: + ```bash + # For OpenShift + kubectl get configmap feast-configs-registry -n redhat-ods-applications -o jsonpath='{.data.namespaces}' + + # For Kubernetes + kubectl get configmap feast-configs-registry -n feast-operator-system -o jsonpath='{.data.namespaces}' + ``` + +### Lifecycle Management + +The namespace registry automatically manages the lifecycle of feature store instances: + +1. **Creation**: When a feature store is deployed, it registers itself in the ConfigMap +2. **Updates**: If a feature store is updated, its entry remains in the ConfigMap +3. **Deletion**: When a feature store is deleted, its entry is automatically removed from the ConfigMap +4. **Namespace Cleanup**: If all feature stores in a namespace are deleted, the namespace entry is also removed + + diff --git a/infra/feast-operator/internal/controller/featurestore_controller.go b/infra/feast-operator/internal/controller/featurestore_controller.go index e9cbb219acb..e513560c464 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller.go +++ b/infra/feast-operator/internal/controller/featurestore_controller.go @@ -78,6 +78,16 @@ func (r *FeatureStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request if apierrors.IsNotFound(err) { // CR deleted since request queued, child objects getting GC'd, no requeue logger.V(1).Info("FeatureStore CR not found, has been deleted") + // Clean up namespace registry entry even if the CR is not found + if err := r.cleanupNamespaceRegistry(ctx, &feastdevv1alpha1.FeatureStore{ + ObjectMeta: metav1.ObjectMeta{ + Name: req.NamespacedName.Name, + Namespace: req.NamespacedName.Namespace, + }, + }); err != nil { + logger.Error(err, "Failed to clean up namespace registry entry for deleted FeatureStore") + // Don't return error here as the CR is already deleted + } return ctrl.Result{}, nil } // error fetching FeatureStore instance, requeue and try again @@ -86,6 +96,16 @@ func (r *FeatureStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request } currentStatus := cr.Status.DeepCopy() + // Handle deletion - clean up namespace registry entry + if cr.DeletionTimestamp != nil { + logger.Info("FeatureStore is being deleted, cleaning up namespace registry entry") + if err := r.cleanupNamespaceRegistry(ctx, cr); err != nil { + logger.Error(err, "Failed to clean up namespace registry entry") + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + result, recErr = r.deployFeast(ctx, cr) if cr.DeletionTimestamp == nil && !reflect.DeepEqual(currentStatus, cr.Status) { if err = r.Client.Status().Update(ctx, cr); err != nil { @@ -102,6 +122,22 @@ func (r *FeatureStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request } } + // Add to namespace registry if deployment was successful and not being deleted + if recErr == nil && cr.DeletionTimestamp == nil { + feast := services.FeastServices{ + Handler: feasthandler.FeastHandler{ + Client: r.Client, + Context: ctx, + FeatureStore: cr, + Scheme: r.Scheme, + }, + } + if err := feast.AddToNamespaceRegistry(); err != nil { + logger.Error(err, "Failed to add FeatureStore to namespace registry") + // Don't return error here as the FeatureStore is already deployed successfully + } + } + return result, recErr } @@ -201,6 +237,20 @@ func (r *FeatureStoreReconciler) SetupWithManager(mgr ctrl.Manager) error { } +// cleanupNamespaceRegistry removes the feature store instance from the namespace registry +func (r *FeatureStoreReconciler) cleanupNamespaceRegistry(ctx context.Context, cr *feastdevv1alpha1.FeatureStore) error { + feast := services.FeastServices{ + Handler: feasthandler.FeastHandler{ + Client: r.Client, + Context: ctx, + FeatureStore: cr, + Scheme: r.Scheme, + }, + } + + return feast.RemoveFromNamespaceRegistry() +} + // if a remotely referenced FeatureStore is changed, reconcile any FeatureStores that reference it. func (r *FeatureStoreReconciler) mapFeastRefsToFeastRequests(ctx context.Context, object client.Object) []reconcile.Request { logger := log.FromContext(ctx) diff --git a/infra/feast-operator/internal/controller/featurestore_controller_namespace_registry_test.go b/infra/feast-operator/internal/controller/featurestore_controller_namespace_registry_test.go new file mode 100644 index 00000000000..67fd4dcb632 --- /dev/null +++ b/infra/feast-operator/internal/controller/featurestore_controller_namespace_registry_test.go @@ -0,0 +1,453 @@ +/* +Copyright 2025 Feast Community. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "encoding/json" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" + "github.com/feast-dev/feast/infra/feast-operator/internal/controller/services" +) + +const DefaultNamespace = "default" +const FeastControllerNamespace = "feast-operator-system" + +var ctx = context.Background() + +var _ = Describe("FeatureStore Controller - Namespace Registry", func() { + + Context("When deploying a FeatureStore with namespace registry", func() { + const resourceName = "namespace-registry-test" + var pullPolicy = corev1.PullAlways + var image = "feastdev/feast:latest" + + typeNamespacedName := types.NamespacedName{ + Name: resourceName, + Namespace: DefaultNamespace, + } + featurestore := &feastdevv1alpha1.FeatureStore{} + + BeforeEach(func() { + By("Ensuring manager namespace exists") + namespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: FeastControllerNamespace, + }, + } + // Try to create, ignore if already exists + err := k8sClient.Create(ctx, namespace) + if err != nil && !errors.IsAlreadyExists(err) { + Expect(err).NotTo(HaveOccurred()) + } + + By("Creating a FeatureStore resource") + featurestore = createFeatureStoreResource(resourceName, image, pullPolicy, nil, nil) + Expect(k8sClient.Create(ctx, featurestore)).Should(Succeed()) + + // Wait for the resource to be created + Eventually(func() error { + return k8sClient.Get(ctx, typeNamespacedName, featurestore) + }, time.Second*10, time.Millisecond*250).Should(Succeed()) + }) + + AfterEach(func() { + By("Cleaning up the FeatureStore resource") + // Only delete if the resource still exists + err := k8sClient.Get(ctx, typeNamespacedName, featurestore) + if err == nil { + Expect(k8sClient.Delete(ctx, featurestore)).Should(Succeed()) + + // Wait for the resource to be deleted + Eventually(func() bool { + err := k8sClient.Get(ctx, typeNamespacedName, featurestore) + return errors.IsNotFound(err) + }, time.Second*10, time.Millisecond*250).Should(BeTrue()) + } + }) + + It("should create namespace registry ConfigMap", func() { + By("Reconciling the FeatureStore") + reconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + _, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + By("Checking that namespace registry ConfigMap is created") + Eventually(func() error { + cm := &corev1.ConfigMap{} + return k8sClient.Get(ctx, types.NamespacedName{ + Name: services.NamespaceRegistryConfigMapName, + Namespace: services.DefaultKubernetesNamespace, // Assuming Kubernetes environment + }, cm) + }, time.Second*30, time.Millisecond*500).Should(Succeed()) + }) + + It("should create namespace registry Role and RoleBinding", func() { + By("Reconciling the FeatureStore") + reconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + _, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + By("Checking that namespace registry Role is created") + Eventually(func() error { + role := &rbacv1.Role{} + return k8sClient.Get(ctx, types.NamespacedName{ + Name: services.NamespaceRegistryConfigMapName + "-reader", + Namespace: services.DefaultKubernetesNamespace, + }, role) + }, time.Second*30, time.Millisecond*500).Should(Succeed()) + + By("Checking that namespace registry RoleBinding is created") + Eventually(func() error { + roleBinding := &rbacv1.RoleBinding{} + return k8sClient.Get(ctx, types.NamespacedName{ + Name: services.NamespaceRegistryConfigMapName + "-reader", + Namespace: services.DefaultKubernetesNamespace, + }, roleBinding) + }, time.Second*30, time.Millisecond*500).Should(Succeed()) + }) + + It("should register feature store in namespace registry", func() { + By("Reconciling the FeatureStore") + reconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + _, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + By("Checking that feature store is registered in namespace registry") + Eventually(func() error { + cm := &corev1.ConfigMap{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: services.NamespaceRegistryConfigMapName, + Namespace: services.DefaultKubernetesNamespace, + }, cm) + if err != nil { + return err + } + + // Check if the ConfigMap contains the expected data + if cm.Data == nil || cm.Data[services.NamespaceRegistryDataKey] == "" { + return fmt.Errorf("namespace registry data is empty") + } + + // Parse the JSON data + var registryData services.NamespaceRegistryData + err = json.Unmarshal([]byte(cm.Data[services.NamespaceRegistryDataKey]), ®istryData) + if err != nil { + return err + } + + // Check if the feature store namespace is registered + if registryData.Namespaces == nil { + return fmt.Errorf("namespaces map is nil") + } + + // The feature store should be registered in its namespace + featureStoreNamespace := featurestore.Namespace + if featureStoreNamespace == "" { + featureStoreNamespace = DefaultNamespace + } + + configs, exists := registryData.Namespaces[featureStoreNamespace] + if !exists { + return fmt.Errorf("feature store namespace %s not found in registry", featureStoreNamespace) + } + + // Check if the client config is registered + expectedConfigName := featurestore.Status.ClientConfigMap + if expectedConfigName == "" { + // If no client config name is set, we expect at least one config + if len(configs) == 0 { + return fmt.Errorf("no client configs found for namespace %s", featureStoreNamespace) + } + } else { + // Check if the specific config is registered + found := false + for _, config := range configs { + if config == expectedConfigName { + found = true + break + } + } + if !found { + return fmt.Errorf("expected client config %s not found in registry", expectedConfigName) + } + } + + return nil + }, time.Second*30, time.Millisecond*500).Should(Succeed()) + }) + + It("should clean up namespace registry entry when FeatureStore is deleted", func() { + By("Reconciling the FeatureStore to create registry entry") + reconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + _, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + By("Verifying feature store is registered") + Eventually(func() error { + cm := &corev1.ConfigMap{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: services.NamespaceRegistryConfigMapName, + Namespace: services.DefaultKubernetesNamespace, + }, cm) + if err != nil { + return err + } + + if cm.Data == nil || cm.Data[services.NamespaceRegistryDataKey] == "" { + return fmt.Errorf("namespace registry data is empty") + } + + var registryData services.NamespaceRegistryData + err = json.Unmarshal([]byte(cm.Data[services.NamespaceRegistryDataKey]), ®istryData) + if err != nil { + return err + } + + featureStoreNamespace := featurestore.Namespace + if featureStoreNamespace == "" { + featureStoreNamespace = DefaultNamespace + } + + _, exists := registryData.Namespaces[featureStoreNamespace] + if !exists { + return fmt.Errorf("feature store not registered") + } + + return nil + }, time.Second*30, time.Millisecond*500).Should(Succeed()) + + By("Deleting the FeatureStore") + Expect(k8sClient.Delete(ctx, featurestore)).Should(Succeed()) + + By("Reconciling the deletion") + _, err = reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + By("Verifying namespace registry entry is cleaned up") + Eventually(func() error { + cm := &corev1.ConfigMap{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: services.NamespaceRegistryConfigMapName, + Namespace: services.DefaultKubernetesNamespace, + }, cm) + if err != nil { + return err + } + + if cm.Data == nil || cm.Data[services.NamespaceRegistryDataKey] == "" { + // Empty registry is acceptable after cleanup + return nil + } + + var registryData services.NamespaceRegistryData + err = json.Unmarshal([]byte(cm.Data[services.NamespaceRegistryDataKey]), ®istryData) + if err != nil { + return err + } + + featureStoreNamespace := featurestore.Namespace + if featureStoreNamespace == "" { + featureStoreNamespace = DefaultNamespace + } + + // Check that the specific FeatureStore's config is removed + configs, exists := registryData.Namespaces[featureStoreNamespace] + if exists { + expectedClientConfigName := "feast-" + featurestore.Name + "-client" + for _, config := range configs { + if config == expectedClientConfigName { + return fmt.Errorf("feature store config %s still exists after deletion", expectedClientConfigName) + } + } + } + + return nil + }, time.Second*30, time.Millisecond*500).Should(Succeed()) + }) + }) + + Context("When testing namespace registry data operations", func() { + It("should correctly serialize and deserialize namespace registry data", func() { + By("Creating test data") + originalData := &services.NamespaceRegistryData{ + Namespaces: map[string][]string{ + "test-namespace-1": {"client-config-1", "client-config-2"}, + "test-namespace-2": {"client-config-3"}, + }, + } + + By("Marshaling to JSON") + jsonData, err := json.Marshal(originalData) + Expect(err).NotTo(HaveOccurred()) + + By("Unmarshaling back") + var unmarshaledData services.NamespaceRegistryData + err = json.Unmarshal(jsonData, &unmarshaledData) + Expect(err).NotTo(HaveOccurred()) + + By("Verifying data integrity") + Expect(unmarshaledData.Namespaces).To(Equal(originalData.Namespaces)) + }) + + It("should handle empty namespace registry data", func() { + By("Creating empty data") + originalData := &services.NamespaceRegistryData{ + Namespaces: make(map[string][]string), + } + + By("Marshaling to JSON") + jsonData, err := json.Marshal(originalData) + Expect(err).NotTo(HaveOccurred()) + + By("Unmarshaling back") + var unmarshaledData services.NamespaceRegistryData + err = json.Unmarshal(jsonData, &unmarshaledData) + Expect(err).NotTo(HaveOccurred()) + + By("Verifying empty data") + Expect(unmarshaledData.Namespaces).To(Equal(originalData.Namespaces)) + Expect(unmarshaledData.Namespaces).To(BeEmpty()) + }) + + It("should correctly remove entries from namespace registry data", func() { + By("Creating test data with multiple entries") + originalData := &services.NamespaceRegistryData{ + Namespaces: map[string][]string{ + "namespace-1": {"config-1", "config-2", "config-3"}, + "namespace-2": {"config-4"}, + }, + } + + By("Marshaling to JSON") + jsonData, err := json.Marshal(originalData) + Expect(err).NotTo(HaveOccurred()) + + By("Unmarshaling back") + var data services.NamespaceRegistryData + err = json.Unmarshal(jsonData, &data) + Expect(err).NotTo(HaveOccurred()) + + By("Simulating removal of specific config") + namespace := "namespace-1" + configToRemove := "config-2" + + if configs, exists := data.Namespaces[namespace]; exists { + var updatedConfigs []string + for _, config := range configs { + if config != configToRemove { + updatedConfigs = append(updatedConfigs, config) + } + } + data.Namespaces[namespace] = updatedConfigs + } + + By("Verifying removal worked") + expectedConfigs := []string{"config-1", "config-3"} + Expect(data.Namespaces[namespace]).To(Equal(expectedConfigs)) + + By("Verifying other namespace is unchanged") + Expect(data.Namespaces["namespace-2"]).To(Equal([]string{"config-4"})) + }) + + It("should remove entire namespace when last config is removed", func() { + By("Creating test data with single config per namespace") + originalData := &services.NamespaceRegistryData{ + Namespaces: map[string][]string{ + "namespace-1": {"config-1"}, + "namespace-2": {"config-2"}, + }, + } + + By("Marshaling to JSON") + jsonData, err := json.Marshal(originalData) + Expect(err).NotTo(HaveOccurred()) + + By("Unmarshaling back") + var data services.NamespaceRegistryData + err = json.Unmarshal(jsonData, &data) + Expect(err).NotTo(HaveOccurred()) + + By("Simulating removal of the only config from namespace-1") + namespace := "namespace-1" + configToRemove := "config-1" + + if configs, exists := data.Namespaces[namespace]; exists { + var updatedConfigs []string + for _, config := range configs { + if config != configToRemove { + updatedConfigs = append(updatedConfigs, config) + } + } + + // If no configs left, remove the namespace entry + if len(updatedConfigs) == 0 { + delete(data.Namespaces, namespace) + } else { + data.Namespaces[namespace] = updatedConfigs + } + } + + By("Verifying namespace was removed") + _, exists := data.Namespaces[namespace] + Expect(exists).To(BeFalse()) + + By("Verifying other namespace is unchanged") + Expect(data.Namespaces["namespace-2"]).To(Equal([]string{"config-2"})) + + By("Verifying total namespace count") + Expect(data.Namespaces).To(HaveLen(1)) + }) + }) +}) diff --git a/infra/feast-operator/internal/controller/services/namespace_registry.go b/infra/feast-operator/internal/controller/services/namespace_registry.go new file mode 100644 index 00000000000..64cdaebd6f0 --- /dev/null +++ b/infra/feast-operator/internal/controller/services/namespace_registry.go @@ -0,0 +1,433 @@ +/* +Copyright 2024 Feast Community. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package services + +import ( + "encoding/json" + "fmt" + "os" + + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// NamespaceRegistryData represents the structure of data stored in the namespace registry ConfigMap +type NamespaceRegistryData struct { + Namespaces map[string][]string `json:"namespaces"` +} + +// deployNamespaceRegistry creates and manages the namespace registry ConfigMap +func (feast *FeastServices) deployNamespaceRegistry() error { + // Check if we can determine the target namespace before creating any resources + targetNamespace, err := feast.getNamespaceRegistryNamespace() + if err != nil { + logger := log.FromContext(feast.Handler.Context) + logger.V(1).Info("Skipping namespace registry deployment: unable to determine target namespace", "error", err) + return nil // Return nil to avoid failing the entire deployment + } + + logger := log.FromContext(feast.Handler.Context) + logger.V(1).Info("Deploying namespace registry", "targetNamespace", targetNamespace) + + if err := feast.createNamespaceRegistryConfigMap(targetNamespace); err != nil { + return err + } + if err := feast.createNamespaceRegistryRoleBinding(targetNamespace); err != nil { + return err + } + return nil +} + +// createNamespaceRegistryConfigMap creates the namespace registry ConfigMap +func (feast *FeastServices) createNamespaceRegistryConfigMap(targetNamespace string) error { + logger := log.FromContext(feast.Handler.Context) + + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: NamespaceRegistryConfigMapName, + Namespace: targetNamespace, + }, + } + cm.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("ConfigMap")) + + if op, err := controllerutil.CreateOrUpdate(feast.Handler.Context, feast.Handler.Client, cm, controllerutil.MutateFn(func() error { + return feast.setNamespaceRegistryConfigMap(cm) + })); err != nil { + return err + } else if op == controllerutil.OperationResultCreated || op == controllerutil.OperationResultUpdated { + logger.Info("Successfully reconciled namespace registry ConfigMap", "ConfigMap", cm.Name, "Namespace", cm.Namespace, "operation", op) + } + + return nil +} + +// setNamespaceRegistryConfigMap sets the data for the namespace registry ConfigMap +func (feast *FeastServices) setNamespaceRegistryConfigMap(cm *corev1.ConfigMap) error { + // Get existing data or initialize empty structure + existingData := &NamespaceRegistryData{ + Namespaces: make(map[string][]string), + } + + if cm.Data != nil && cm.Data[NamespaceRegistryDataKey] != "" { + if err := json.Unmarshal([]byte(cm.Data[NamespaceRegistryDataKey]), existingData); err != nil { + // If unmarshaling fails, start with empty data + existingData = &NamespaceRegistryData{ + Namespaces: make(map[string][]string), + } + } + } + + // Add current feature store instance to the registry + featureStoreNamespace := feast.Handler.FeatureStore.Namespace + clientConfigName := feast.Handler.FeatureStore.Status.ClientConfigMap + + if clientConfigName != "" { + if existingData.Namespaces[featureStoreNamespace] == nil { + existingData.Namespaces[featureStoreNamespace] = []string{} + } + + // Check if client config is already in the list + found := false + for _, config := range existingData.Namespaces[featureStoreNamespace] { + if config == clientConfigName { + found = true + break + } + } + + if !found { + existingData.Namespaces[featureStoreNamespace] = append(existingData.Namespaces[featureStoreNamespace], clientConfigName) + } + } + + // Marshal the data back to JSON + dataBytes, err := json.Marshal(existingData) + if err != nil { + return fmt.Errorf("failed to marshal namespace registry data: %w", err) + } + + // Set the ConfigMap data + if cm.Data == nil { + cm.Data = make(map[string]string) + } + cm.Data[NamespaceRegistryDataKey] = string(dataBytes) + + // Set labels + cm.Labels = feast.getLabels() + + return nil +} + +// createNamespaceRegistryRoleBinding creates a RoleBinding to allow system:authenticated to read the ConfigMap +func (feast *FeastServices) createNamespaceRegistryRoleBinding(targetNamespace string) error { + logger := log.FromContext(feast.Handler.Context) + + roleBinding := &rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: NamespaceRegistryConfigMapName + "-reader", + Namespace: targetNamespace, + }, + } + roleBinding.SetGroupVersionKind(rbacv1.SchemeGroupVersion.WithKind("RoleBinding")) + + if op, err := controllerutil.CreateOrUpdate(feast.Handler.Context, feast.Handler.Client, roleBinding, controllerutil.MutateFn(func() error { + return feast.setNamespaceRegistryRoleBinding(roleBinding) + })); err != nil { + return err + } else if op == controllerutil.OperationResultCreated || op == controllerutil.OperationResultUpdated { + logger.Info("Successfully reconciled namespace registry RoleBinding", "RoleBinding", roleBinding.Name, "Namespace", roleBinding.Namespace, "operation", op) + } + + return nil +} + +// setNamespaceRegistryRoleBinding sets the RoleBinding for namespace registry access +func (feast *FeastServices) setNamespaceRegistryRoleBinding(rb *rbacv1.RoleBinding) error { + // Create a Role that allows reading the ConfigMap + role := &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: NamespaceRegistryConfigMapName + "-reader", + Namespace: rb.Namespace, + }, + Rules: []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"configmaps"}, + ResourceNames: []string{NamespaceRegistryConfigMapName}, + Verbs: []string{"get", "list"}, + }, + }, + } + + // Create or update the Role + if _, err := controllerutil.CreateOrUpdate(feast.Handler.Context, feast.Handler.Client, role, controllerutil.MutateFn(func() error { + role.Rules = []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"configmaps"}, + ResourceNames: []string{NamespaceRegistryConfigMapName}, + Verbs: []string{"get", "list"}, + }, + } + return nil + })); err != nil { + return err + } + + // Set the RoleBinding + rb.RoleRef = rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "Role", + Name: role.Name, + } + + rb.Subjects = []rbacv1.Subject{ + { + APIGroup: "rbac.authorization.k8s.io", + Kind: "Group", + Name: "system:authenticated", + }, + } + + return nil +} + +// getNamespaceRegistryNamespace determines the target namespace for the namespace registry ConfigMap +func (feast *FeastServices) getNamespaceRegistryNamespace() (string, error) { + // Check if we're running on OpenShift + logger := log.FromContext(feast.Handler.Context) + if isOpenShift { + // TODO: Add support for reading DSCi configuration + if data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { + if ns := string(data); len(ns) > 0 { + logger.V(1).Info("Using OpenShift namespace", "namespace", ns) + return ns, nil + } + } + // This is what notebook controller team is doing, we are following them + // They are not defaulting to redhat-ods-applications namespace + return "", fmt.Errorf("unable to determine the namespace") + } + + return DefaultKubernetesNamespace, nil +} + +// AddToNamespaceRegistry adds a feature store instance to the namespace registry +func (feast *FeastServices) AddToNamespaceRegistry() error { + logger := log.FromContext(feast.Handler.Context) + targetNamespace, err := feast.getNamespaceRegistryNamespace() + if err != nil { + logger.V(1).Info("Skipping namespace registry addition: unable to determine target namespace", "error", err) + return nil // Return nil to avoid failing the entire operation + } + + // Get the existing ConfigMap + cm := &corev1.ConfigMap{} + err = feast.Handler.Client.Get(feast.Handler.Context, types.NamespacedName{ + Name: NamespaceRegistryConfigMapName, + Namespace: targetNamespace, + }, cm) + if err != nil { + if apierrors.IsNotFound(err) { + logger.V(1).Info("Namespace registry ConfigMap not found, nothing to add to") + return nil + } + return fmt.Errorf("failed to get namespace registry ConfigMap: %w", err) + } + + // Parse existing data + var existingData NamespaceRegistryData + if cm.Data != nil && cm.Data[NamespaceRegistryDataKey] != "" { + err = json.Unmarshal([]byte(cm.Data[NamespaceRegistryDataKey]), &existingData) + if err != nil { + logger.V(1).Info("Failed to unmarshal namespace registry data, nothing to add to") + return nil + } + } + + // Add current feature store instance to the registry + featureStoreNamespace := feast.Handler.FeatureStore.Namespace + clientConfigName := feast.Handler.FeatureStore.Status.ClientConfigMap + + if clientConfigName != "" { + // Initialize namespace map if it doesn't exist + if existingData.Namespaces == nil { + existingData.Namespaces = make(map[string][]string) + } + if existingData.Namespaces[featureStoreNamespace] == nil { + existingData.Namespaces[featureStoreNamespace] = []string{} + } + + // Check if client config is already in the list + found := false + for _, config := range existingData.Namespaces[featureStoreNamespace] { + if config == clientConfigName { + found = true + break + } + } + + // Add if not already present + if !found { + existingData.Namespaces[featureStoreNamespace] = append(existingData.Namespaces[featureStoreNamespace], clientConfigName) + } + } + + // Marshal the updated data back to JSON + dataBytes, err := json.Marshal(existingData) + if err != nil { + return fmt.Errorf("failed to marshal updated namespace registry data: %w", err) + } + + // Update the ConfigMap + if cm.Data == nil { + cm.Data = make(map[string]string) + } + cm.Data[NamespaceRegistryDataKey] = string(dataBytes) + + // Update the ConfigMap + if err := feast.Handler.Client.Update(feast.Handler.Context, cm); err != nil { + return fmt.Errorf("failed to update namespace registry ConfigMap: %w", err) + } + + logger.Info("Successfully added feature store to namespace registry", + "namespace", featureStoreNamespace, + "clientConfig", clientConfigName, + "targetNamespace", targetNamespace) + + return nil +} + +// RemoveFromNamespaceRegistry removes a feature store instance from the namespace registry +func (feast *FeastServices) RemoveFromNamespaceRegistry() error { + logger := log.FromContext(feast.Handler.Context) + + // Determine the target namespace based on platform + targetNamespace, err := feast.getNamespaceRegistryNamespace() + if err != nil { + logger.V(1).Info("Skipping namespace registry removal: unable to determine target namespace", "error", err) + return nil // Return nil to avoid failing the entire operation + } + + // Get the existing ConfigMap + cm := &corev1.ConfigMap{} + err = feast.Handler.Client.Get(feast.Handler.Context, client.ObjectKey{ + Name: NamespaceRegistryConfigMapName, + Namespace: targetNamespace, + }, cm) + if err != nil { + if apierrors.IsNotFound(err) { + // ConfigMap doesn't exist, nothing to clean up + logger.V(1).Info("Namespace registry ConfigMap not found, nothing to clean up") + return nil + } + return fmt.Errorf("failed to get namespace registry ConfigMap: %w", err) + } + + // Get existing data + existingData := &NamespaceRegistryData{ + Namespaces: make(map[string][]string), + } + + if cm.Data != nil && cm.Data[NamespaceRegistryDataKey] != "" { + if err := json.Unmarshal([]byte(cm.Data[NamespaceRegistryDataKey]), existingData); err != nil { + // If unmarshaling fails, there's nothing to clean up + logger.V(1).Info("Failed to unmarshal namespace registry data, nothing to clean up") + return nil + } + } + + // Remove current feature store instance from the registry + featureStoreNamespace := feast.Handler.FeatureStore.Namespace + clientConfigName := feast.Handler.FeatureStore.Status.ClientConfigMap + featureStoreName := feast.Handler.FeatureStore.Name + + // Generate expected client config name using the same logic as creation + expectedClientConfigName := "feast-" + featureStoreName + "-client" + + logger.Info("Removing feature store from registry", + "featureStoreName", featureStoreName, + "featureStoreNamespace", featureStoreNamespace, + "clientConfigName", clientConfigName, + "expectedClientConfigName", expectedClientConfigName) + + if existingData.Namespaces[featureStoreNamespace] != nil { + var updatedConfigs []string + removed := false + + for _, config := range existingData.Namespaces[featureStoreNamespace] { + // Remove if it matches the client config name or the expected pattern + if config == clientConfigName || config == expectedClientConfigName { + logger.Info("Removing config from registry", "config", config) + removed = true + } else { + updatedConfigs = append(updatedConfigs, config) + } + } + + existingData.Namespaces[featureStoreNamespace] = updatedConfigs + + // If no configs left for this namespace, remove the namespace entry + if len(existingData.Namespaces[featureStoreNamespace]) == 0 { + delete(existingData.Namespaces, featureStoreNamespace) + logger.Info("Removed empty namespace entry from registry", "namespace", featureStoreNamespace) + } + + if !removed { + logger.V(1).Info("No matching config found to remove from registry", + "existingConfigs", existingData.Namespaces[featureStoreNamespace]) + } + } else { + logger.V(1).Info("Namespace not found in registry", "namespace", featureStoreNamespace) + } + + // Marshal the updated data back to JSON + dataBytes, err := json.Marshal(existingData) + if err != nil { + return fmt.Errorf("failed to marshal updated namespace registry data: %w", err) + } + + // Update the ConfigMap + if cm.Data == nil { + cm.Data = make(map[string]string) + } + cm.Data[NamespaceRegistryDataKey] = string(dataBytes) + + // Update the ConfigMap + if err := feast.Handler.Client.Update(feast.Handler.Context, cm); err != nil { + return fmt.Errorf("failed to update namespace registry ConfigMap: %w", err) + } + + logger.Info("Updated namespace registry ConfigMap", + "namespace", featureStoreNamespace, + "clientConfig", clientConfigName, + "remainingConfigs", existingData.Namespaces[featureStoreNamespace], + "targetNamespace", targetNamespace) + + logger.Info("Successfully removed feature store from namespace registry", + "namespace", featureStoreNamespace, + "clientConfig", clientConfigName, + "targetNamespace", targetNamespace) + + return nil +} diff --git a/infra/feast-operator/internal/controller/services/services.go b/infra/feast-operator/internal/controller/services/services.go index 62d905fb985..5b70d6e1911 100644 --- a/infra/feast-operator/internal/controller/services/services.go +++ b/infra/feast-operator/internal/controller/services/services.go @@ -141,6 +141,9 @@ func (feast *FeastServices) Deploy() error { if err := feast.deployClient(); err != nil { return err } + if err := feast.deployNamespaceRegistry(); err != nil { + return err + } if err := feast.deployCronJob(); err != nil { return err } diff --git a/infra/feast-operator/internal/controller/services/services_types.go b/infra/feast-operator/internal/controller/services/services_types.go index 4a84e9532cd..68ee053ace5 100644 --- a/infra/feast-operator/internal/controller/services/services_types.go +++ b/infra/feast-operator/internal/controller/services/services_types.go @@ -35,6 +35,11 @@ const ( DefaultOnlineStorePath = "online_store.db" svcDomain = ".svc.cluster.local" + // Namespace registry ConfigMap constants + NamespaceRegistryConfigMapName = "feast-configs-registry" + NamespaceRegistryDataKey = "namespaces" + DefaultKubernetesNamespace = "feast-operator-system" + HttpPort = 80 HttpsPort = 443 HttpScheme = "http" diff --git a/infra/feast-operator/internal/controller/suite_test.go b/infra/feast-operator/internal/controller/suite_test.go index 51208d6dbb0..127bb7fe3b3 100644 --- a/infra/feast-operator/internal/controller/suite_test.go +++ b/infra/feast-operator/internal/controller/suite_test.go @@ -17,6 +17,7 @@ limitations under the License. package controller import ( + "context" "fmt" "path/filepath" "runtime" @@ -25,6 +26,9 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" @@ -32,6 +36,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" + "github.com/feast-dev/feast/infra/feast-operator/internal/controller/services" // +kubebuilder:scaffold:imports ) @@ -77,6 +82,18 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) Expect(k8sClient).NotTo(BeNil()) + // Create the feast-operator-system namespace for tests that need it + By("creating feast-operator-system namespace") + namespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: services.DefaultKubernetesNamespace, + }, + } + err = k8sClient.Create(context.Background(), namespace) + if err != nil && !errors.IsAlreadyExists(err) { + Expect(err).NotTo(HaveOccurred()) + } + }) var _ = AfterSuite(func() {