diff --git a/Makefile b/Makefile index 75f05a28fc8..b8a752f9a16 100644 --- a/Makefile +++ b/Makefile @@ -650,11 +650,14 @@ format-ui: # Go SDK & embedded PB_REL = https://github.com/protocolbuffers/protobuf/releases -PB_VERSION = 3.11.2 +PB_VERSION = 30.2 PB_ARCH := $(shell uname -m) ifeq ($(PB_ARCH), arm64) PB_ARCH=aarch_64 endif +ifeq ($(PB_ARCH), aarch64) + PB_ARCH=aarch_64 +endif PB_PROTO_FOLDERS=core registry serving types storage $(TOOL_DIR)/protoc-$(PB_VERSION)-$(OS)-$(PB_ARCH).zip: $(TOOL_DIR) @@ -669,6 +672,7 @@ install-go-proto-dependencies: $(TOOL_DIR)/protoc-$(PB_VERSION)-$(OS)-$(PB_ARCH) .PHONY: compile-protos-go compile-protos-go: install-go-proto-dependencies + mkdir -p $(ROOT_DIR)/go/protos $(foreach folder,$(PB_PROTO_FOLDERS), \ protoc --proto_path=$(ROOT_DIR)/protos \ --go_out=$(ROOT_DIR)/go/protos \ diff --git a/go.mod b/go.mod index 05305c1e6c1..18c459373b6 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,24 @@ require ( github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/andybalholm/brotli v1.1.0 // indirect github.com/apache/thrift v0.21.0 // indirect + github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect + github.com/aws/aws-sdk-go-v2/config v1.29.14 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect + github.com/aws/smithy-go v1.22.2 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect diff --git a/go.sum b/go.sum index 41abd905c44..9aefbf0aa35 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,42 @@ github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN github.com/apache/arrow/go/v17 v17.0.0/go.mod h1:jR7QHkODl15PfYyjM2nU+yTLScZ/qfj7OSUZmJ8putc= github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= +github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM= +github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14= +github.com/aws/aws-sdk-go-v2/config v1.29.14 h1:f+eEi/2cKCg9pqKBoAIwRGzVb70MRKqWX4dg1BDcSJM= +github.com/aws/aws-sdk-go-v2/config v1.29.14/go.mod h1:wVPHWcIFv3WO89w0rE10gzf17ZYy+UVS1Geq8Iei34g= +github.com/aws/aws-sdk-go-v2/credentials v1.17.67 h1:9KxtdcIA/5xPNQyZRgUSpYOE6j9Bc4+D7nZua0KGYOM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.67/go.mod h1:p3C44m+cfnbv763s52gCqrjaqyPikj9Sg47kUVaNZQQ= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 h1:x793wxmUWVDhshP8WW2mlnXuFrO4cOd3HLBroh1paFw= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30/go.mod h1:Jpne2tDnYiFascUEs2AWHJL9Yp7A5ZVy3TNyxaAjD6M= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 h1:ZNTqv4nIdE/DiBfUUfXcLZ/Spcuz+RjeziUtNJackkM= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34/go.mod h1:zf7Vcd1ViW7cPqYWEHLHJkS50X0JS2IKz9Cgaj6ugrs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 h1:4nm2G6A4pV9rdlWzGMPv4BNtQp22v1hg3yrtkYpeLl8= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1/go.mod h1:iu6FSzgt+M2/x3Dk8zhycdIcHjEFb36IS8HVUVFoMg0= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 h1:moLQUoVq91LiqT1nbvzDukyqAlCv89ZmwaHw/ZFlFZg= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15/go.mod h1:ZH34PJUc8ApjBIfgQCFvkWcUDBtl/WTD+uiYHjd8igA= +github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3 h1:BRXS0U76Z8wfF+bnkilA2QwpIch6URlm++yPUt9QPmQ= +github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3/go.mod h1:bNXKFFyaiVvWuR6O16h/I1724+aXe/tAkA9/QS01t5k= +github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 h1:1Gw+9ajCV1jogloEv1RRnvfRFia2cL6c9cuKV2Ps+G8= +github.com/aws/aws-sdk-go-v2/service/sso v1.25.3/go.mod h1:qs4a9T5EMLl/Cajiw2TcbNt2UNo/Hqlyp+GiuG4CFDI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 h1:hXmVKytPfTy5axZ+fYbR5d0cFmC3JvwLm5kM83luako= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1/go.mod h1:MlYRNmYu/fGPoxBQVvBYr9nyr948aY/WLUvwBMBJubs= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 h1:1XuUZ8mYJw9B6lzAkXhqHlJd/XvaX32evhproijJEZY= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.19/go.mod h1:cQnB8CUnxbMU82JvlqjKR2HBOm3fe9pWorWBza6MBJ4= +github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= +github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= diff --git a/go/internal/feast/registry/local.go b/go/internal/feast/registry/local.go index e5343cd75cd..58c6426368a 100644 --- a/go/internal/feast/registry/local.go +++ b/go/internal/feast/registry/local.go @@ -1,11 +1,11 @@ package registry import ( + "github.com/google/uuid" "io/ioutil" "os" "path/filepath" - "github.com/google/uuid" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" diff --git a/go/internal/feast/registry/registry.go b/go/internal/feast/registry/registry.go index a383dc42c07..160dda94fd6 100644 --- a/go/internal/feast/registry/registry.go +++ b/go/internal/feast/registry/registry.go @@ -364,6 +364,8 @@ func getRegistryStoreFromType(registryStoreType string, registryConfig *Registry switch registryStoreType { case "FileRegistryStore": return NewFileRegistryStore(registryConfig, repoPath), nil + case "S3RegistryStore": + return NewS3RegistryStore(registryConfig, repoPath), nil } return nil, errors.New("only FileRegistryStore as a RegistryStore is supported at this moment") } diff --git a/go/internal/feast/registry/registry_test.go b/go/internal/feast/registry/registry_test.go new file mode 100644 index 00000000000..0801632a70d --- /dev/null +++ b/go/internal/feast/registry/registry_test.go @@ -0,0 +1,94 @@ +package registry + +import ( + "context" + "errors" + "io/ioutil" + "net/url" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +func TestGetOnlineFeaturesS3Registry(t *testing.T) { + mockS3Client := &MockS3Client{ + GetObjectFn: func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + return &s3.GetObjectOutput{ + Body: ioutil.NopCloser(strings.NewReader("mock data")), + }, nil + }, + DeleteObjectFn: func(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + return &s3.DeleteObjectOutput{}, nil + }, + } + + tests := []struct { + name string + config *RepoConfig + }{ + { + name: "redis with simple features", + config: &RepoConfig{ + Project: "feature_repo", + Registry: map[string]interface{}{ + "path": "s3://test-bucket/path/to/registry.db", + }, + Provider: "aws", + }, + }, + } + for _, test := range tests { + registryConfig, err := test.config.GetRegistryConfig() + if err != nil { + t.Errorf("Error getting registry config. msg: %s", err.Error()) + } + r := &Registry{ + project: test.config.Project, + cachedRegistryProtoTtl: time.Duration(registryConfig.CacheTtlSeconds) * time.Second, + } + _ = registryConfig.RegistryStoreType + registryPath := registryConfig.Path + uri, err := url.Parse(registryPath) + if err != nil { + t.Errorf("Error parsing registry path. msg: %s", err.Error()) + } + if registryStoreType, ok := REGISTRY_STORE_CLASS_FOR_SCHEME[uri.Scheme]; ok { + switch registryStoreType { + case "S3RegistryStore": + registryStore := &S3RegistryStore{ + filePath: registryConfig.Path, + s3Client: mockS3Client, + } + r.registryStore = registryStore + err := r.InitializeRegistry() + if err != nil { + t.Errorf("Error initializing registry. msg: %s. registry path=%q", err.Error(), registryPath) + } + default: + t.Errorf("Only S3RegistryStore is supported on this testing. got=%s", registryStoreType) + } + } + } +} + +// MockS3Client is mock client for testing s3 registry store +type MockS3Client struct { + GetObjectFn func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) + DeleteObjectFn func(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) +} + +func (m *MockS3Client) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + if m.GetObjectFn != nil { + return m.GetObjectFn(ctx, params) + } + return nil, errors.New("not implemented") +} + +func (m *MockS3Client) DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + if m.DeleteObjectFn != nil { + return m.DeleteObjectFn(ctx, params) + } + return nil, errors.New("not implemented") +} diff --git a/go/internal/feast/registry/s3.go b/go/internal/feast/registry/s3.go new file mode 100644 index 00000000000..0979dac64d0 --- /dev/null +++ b/go/internal/feast/registry/s3.go @@ -0,0 +1,109 @@ +package registry + +import ( + "context" + "errors" + "io/ioutil" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + awsConfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/feast-dev/feast/go/protos/feast/core" + + "google.golang.org/protobuf/proto" +) + +// S3ClientInterface define interface of s3.Client for making mocking s3 client and testing it +type S3ClientInterface interface { + GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) + DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) +} + +// A S3RegistryStore is a S3 object storage-based implementation of the RegistryStore interface +type S3RegistryStore struct { + filePath string + s3Client S3ClientInterface +} + +// NewS3RegistryStore creates a S3RegistryStore with the given configuration +func NewS3RegistryStore(config *RegistryConfig, repoPath string) *S3RegistryStore { + var lr S3RegistryStore + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + cfg, err := awsConfig.LoadDefaultConfig(ctx) + if err != nil { + lr = S3RegistryStore{ + filePath: config.Path, + } + } else { + lr = S3RegistryStore{ + filePath: config.Path, + s3Client: s3.NewFromConfig(cfg), + } + } + return &lr +} + +func (r *S3RegistryStore) GetRegistryProto() (*core.Registry, error) { + bucket, key, err := r.parseS3Path() + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + output, err := r.s3Client.GetObject(ctx, + &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + panic(err) + } + defer output.Body.Close() + + data, err := ioutil.ReadAll(output.Body) + if err != nil { + return nil, err + } + + registry := &core.Registry{} + if err := proto.Unmarshal(data, registry); err != nil { + return nil, err + } + return registry, nil +} + +func (r *S3RegistryStore) UpdateRegistryProto(rp *core.Registry) error { + return errors.New("not implemented in S3RegistryStore") +} + +func (r *S3RegistryStore) Teardown() error { + bucket, key, err := r.parseS3Path() + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = r.s3Client.DeleteObject(ctx, + &s3.DeleteObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + panic(err) + } + return nil +} + +func (r *S3RegistryStore) parseS3Path() (string, string, error) { + path := strings.TrimPrefix(r.filePath, "s3://") + parts := strings.SplitN(path, "/", 2) + if len(parts) != 2 { + return "", "", errors.New("invalid S3 file path format") + } + return parts[0], parts[1], nil +}