diff --git a/infra/feast-operator/api/v1alpha1/featurestore_types.go b/infra/feast-operator/api/v1alpha1/featurestore_types.go index 02ba7a6bd7a..81c277b7116 100644 --- a/infra/feast-operator/api/v1alpha1/featurestore_types.go +++ b/infra/feast-operator/api/v1alpha1/featurestore_types.go @@ -421,6 +421,17 @@ type RegistryFilePersistence struct { Path string `json:"path,omitempty"` PvcConfig *PvcConfig `json:"pvc,omitempty"` S3AdditionalKwargs *map[string]string `json:"s3_additional_kwargs,omitempty"` + + // CacheTTLSeconds defines the TTL (in seconds) for the registry cache. + // +kubebuilder:validation:Minimum=0 + // +optional + CacheTTLSeconds *int32 `json:"cache_ttl_seconds,omitempty"` + + // CacheMode defines the registry cache update strategy. + // Allowed values are "sync" and "thread". + // +kubebuilder:validation:Enum=none;sync;thread + // +optional + CacheMode *string `json:"cache_mode,omitempty"` } // RegistryDBStorePersistence configures the DB store persistence for the registry service diff --git a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go index 99e1f8c0241..a040fce4093 100644 --- a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go @@ -916,6 +916,16 @@ func (in *RegistryFilePersistence) DeepCopyInto(out *RegistryFilePersistence) { } } } + if in.CacheTTLSeconds != nil { + in, out := &in.CacheTTLSeconds, &out.CacheTTLSeconds + *out = new(int32) + **out = **in + } + if in.CacheMode != nil { + in, out := &in.CacheMode, &out.CacheMode + *out = new(string) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RegistryFilePersistence. diff --git a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml index 843d9903e67..54e386846c0 100644 --- a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml +++ b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml @@ -1649,6 +1649,21 @@ spec: description: RegistryFilePersistence configures the file-based persistence for the registry service properties: + cache_mode: + description: |- + CacheMode defines the registry cache update strategy. + Allowed values are "sync" and "thread". + enum: + - none + - sync + - thread + type: string + cache_ttl_seconds: + description: CacheTTLSeconds defines the TTL (in + seconds) for the registry cache. + format: int32 + minimum: 0 + type: integer path: type: string pvc: @@ -5661,6 +5676,21 @@ spec: the file-based persistence for the registry service properties: + cache_mode: + description: |- + CacheMode defines the registry cache update strategy. + Allowed values are "sync" and "thread". + enum: + - none + - sync + - thread + type: string + cache_ttl_seconds: + description: CacheTTLSeconds defines the TTL + (in seconds) for the registry cache. + format: int32 + minimum: 0 + type: integer path: type: string pvc: diff --git a/infra/feast-operator/config/samples/v1alpha1_featurestore_objectstore_persistence.yaml b/infra/feast-operator/config/samples/v1alpha1_featurestore_objectstore_persistence.yaml index 2146dabe85c..7be95578aed 100644 --- a/infra/feast-operator/config/samples/v1alpha1_featurestore_objectstore_persistence.yaml +++ b/infra/feast-operator/config/samples/v1alpha1_featurestore_objectstore_persistence.yaml @@ -10,6 +10,8 @@ spec: persistence: file: path: s3://bucket/registry.db + cache_ttl_seconds: 60 + cache_mode: sync s3_additional_kwargs: ServerSideEncryption: AES256 ACL: bucket-owner-full-control diff --git a/infra/feast-operator/dist/install.yaml b/infra/feast-operator/dist/install.yaml index 33b7cb298c7..0e64f867324 100644 --- a/infra/feast-operator/dist/install.yaml +++ b/infra/feast-operator/dist/install.yaml @@ -1657,6 +1657,21 @@ spec: description: RegistryFilePersistence configures the file-based persistence for the registry service properties: + cache_mode: + description: |- + CacheMode defines the registry cache update strategy. + Allowed values are "sync" and "thread". + enum: + - none + - sync + - thread + type: string + cache_ttl_seconds: + description: CacheTTLSeconds defines the TTL (in + seconds) for the registry cache. + format: int32 + minimum: 0 + type: integer path: type: string pvc: @@ -5669,6 +5684,21 @@ spec: the file-based persistence for the registry service properties: + cache_mode: + description: |- + CacheMode defines the registry cache update strategy. + Allowed values are "sync" and "thread". + enum: + - none + - sync + - thread + type: string + cache_ttl_seconds: + description: CacheTTLSeconds defines the TTL + (in seconds) for the registry cache. + format: int32 + minimum: 0 + type: integer path: type: string pvc: diff --git a/infra/feast-operator/docs/api/markdown/ref.md b/infra/feast-operator/docs/api/markdown/ref.md index 14e11015f16..f6fd57a22e9 100644 --- a/infra/feast-operator/docs/api/markdown/ref.md +++ b/infra/feast-operator/docs/api/markdown/ref.md @@ -662,6 +662,9 @@ _Appears in:_ | `path` _string_ | | | `pvc` _[PvcConfig](#pvcconfig)_ | | | `s3_additional_kwargs` _map[string]string_ | | +| `cache_ttl_seconds` _integer_ | CacheTTLSeconds defines the TTL (in seconds) for the registry cache. | +| `cache_mode` _string_ | CacheMode defines the registry cache update strategy. +Allowed values are "sync" and "thread". | #### RegistryPersistence diff --git a/infra/feast-operator/internal/controller/featurestore_controller_db_store_test.go b/infra/feast-operator/internal/controller/featurestore_controller_db_store_test.go index 879774b2cd5..b9b21ad5540 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_db_store_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_db_store_test.go @@ -574,6 +574,11 @@ var _ = Describe("FeatureStore Controller - db storage services", func() { dbParametersMap := unmarshallYamlString(sqlTypeYamlString) copyMap := services.CopyMap(dbParametersMap) delete(dbParametersMap, "path") + // Expect cache_ttl_seconds to be mapped into Registry.CacheTTLSeconds + ttlVal, ok := copyMap["cache_ttl_seconds"].(int) + Expect(ok).To(BeTrue()) + ttl := int32(ttlVal) + delete(dbParametersMap, "cache_ttl_seconds") testConfig := &services.RepoConfig{ Project: feastProject, Provider: services.LocalProviderType, @@ -583,9 +588,10 @@ var _ = Describe("FeatureStore Controller - db storage services", func() { DBParameters: unmarshallYamlString(snowflakeYamlString), }, Registry: services.RegistryConfig{ - Path: copyMap["path"].(string), - RegistryType: services.RegistryDBPersistenceSQLConfigType, - DBParameters: dbParametersMap, + Path: copyMap["path"].(string), + RegistryType: services.RegistryDBPersistenceSQLConfigType, + CacheTTLSeconds: &ttl, + DBParameters: dbParametersMap, }, OnlineStore: services.OnlineStoreConfig{ Type: onlineType, diff --git a/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go b/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go index 7ad4d04e9eb..e0436b703df 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go @@ -367,5 +367,67 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { testConfig.Registry.S3AdditionalKwargs = nil Expect(repoConfig).To(Equal(&testConfig)) }) + + It("should propagate registry file cache settings into repo config", func() { + By("Reconciling the created resource with registry cache settings") + controllerReconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + // update the FeatureStore to set cache settings on registry file persistence + resource := &feastdevv1alpha1.FeatureStore{} + err := k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + ttl := int32(300) + mode := "thread" + resource.Spec.Services.Registry.Local.Persistence.FilePersistence.CacheTTLSeconds = &ttl + resource.Spec.Services.Registry.Local.Persistence.FilePersistence.CacheMode = &mode + err = k8sClient.Update(ctx, resource) + Expect(err).NotTo(HaveOccurred()) + + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + // fetch updated resource and deployment + resource = &feastdevv1alpha1.FeatureStore{} + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + feast := services.FeastServices{ + Handler: handler.FeastHandler{ + Client: controllerReconciler.Client, + Context: ctx, + Scheme: controllerReconciler.Scheme, + FeatureStore: resource, + }, + } + + deploy := &appsv1.Deployment{} + objMeta := feast.GetObjectMeta() + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: objMeta.Name, + Namespace: objMeta.Namespace, + }, deploy) + Expect(err).NotTo(HaveOccurred()) + + env := getFeatureStoreYamlEnvVar(services.GetRegistryContainer(*deploy).Env) + Expect(env).NotTo(BeNil()) + + // decode feature_store.yaml and verify registry cache settings + envByte, err := base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + repoConfig := &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfig) + Expect(err).NotTo(HaveOccurred()) + + Expect(repoConfig.Registry.CacheTTLSeconds).NotTo(BeNil()) + Expect(*repoConfig.Registry.CacheTTLSeconds).To(Equal(ttl)) + Expect(repoConfig.Registry.CacheMode).NotTo(BeNil()) + Expect(*repoConfig.Registry.CacheMode).To(Equal(mode)) + }) }) }) diff --git a/infra/feast-operator/internal/controller/services/repo_config.go b/infra/feast-operator/internal/controller/services/repo_config.go index 2df0d5cb189..44b295066f4 100644 --- a/infra/feast-operator/internal/controller/services/repo_config.go +++ b/infra/feast-operator/internal/controller/services/repo_config.go @@ -127,6 +127,13 @@ func setRepoConfigRegistry(services *feastdevv1alpha1.FeatureStoreServices, secr repoConfig.Registry.RegistryType = RegistryFileConfigType repoConfig.Registry.Path = getActualPath(filePersistence.Path, filePersistence.PvcConfig) repoConfig.Registry.S3AdditionalKwargs = filePersistence.S3AdditionalKwargs + if filePersistence.CacheTTLSeconds != nil { + repoConfig.Registry.CacheTTLSeconds = filePersistence.CacheTTLSeconds + } + if filePersistence.CacheMode != nil { + repoConfig.Registry.CacheMode = filePersistence.CacheMode + } + } else if dbPersistence != nil && len(dbPersistence.Type) > 0 { repoConfig.Registry.Path = "" repoConfig.Registry.RegistryType = RegistryConfigType(dbPersistence.Type) @@ -139,6 +146,31 @@ func setRepoConfigRegistry(services *feastdevv1alpha1.FeatureStoreServices, secr return err } + // Extract typed cache settings from DB parameters to avoid inline map conflicts + if ttlVal, ok := parametersMap["cache_ttl_seconds"]; ok { + switch v := ttlVal.(type) { + case int: + ttl := int32(v) + repoConfig.Registry.CacheTTLSeconds = &ttl + case int32: + ttl := v + repoConfig.Registry.CacheTTLSeconds = &ttl + case int64: + ttl := int32(v) + repoConfig.Registry.CacheTTLSeconds = &ttl + case float64: + ttl := int32(v) + repoConfig.Registry.CacheTTLSeconds = &ttl + } + delete(parametersMap, "cache_ttl_seconds") + } + if modeVal, ok := parametersMap["cache_mode"]; ok { + if modeStr, ok := modeVal.(string); ok { + repoConfig.Registry.CacheMode = &modeStr + } + delete(parametersMap, "cache_mode") + } + err = mergeStructWithDBParametersMap(¶metersMap, &repoConfig.Registry) if err != nil { return err diff --git a/infra/feast-operator/internal/controller/services/services_types.go b/infra/feast-operator/internal/controller/services/services_types.go index 68ee053ace5..d8e973d97ad 100644 --- a/infra/feast-operator/internal/controller/services/services_types.go +++ b/infra/feast-operator/internal/controller/services/services_types.go @@ -280,6 +280,8 @@ type RegistryConfig struct { RegistryType RegistryConfigType `yaml:"registry_type,omitempty"` Cert string `yaml:"cert,omitempty"` S3AdditionalKwargs *map[string]string `yaml:"s3_additional_kwargs,omitempty"` + CacheTTLSeconds *int32 `yaml:"cache_ttl_seconds,omitempty"` + CacheMode *string `yaml:"cache_mode,omitempty"` DBParameters map[string]interface{} `yaml:",inline,omitempty"` }