diff --git a/go/internal/feast/onlinestore/onlinestore.go b/go/internal/feast/onlinestore/onlinestore.go index 64a05f144ce..88cd3dbd9b5 100644 --- a/go/internal/feast/onlinestore/onlinestore.go +++ b/go/internal/feast/onlinestore/onlinestore.go @@ -61,7 +61,7 @@ func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) { onlineStore, err := NewSqliteOnlineStore(config.Project, config, config.OnlineStore) return onlineStore, err } else if onlineStoreType == "redis" { - onlineStore, err := NewRedisOnlineStore(config.Project, config.OnlineStore) + onlineStore, err := NewRedisOnlineStore(config.Project, config, config.OnlineStore) return onlineStore, err } else { return nil, fmt.Errorf("%s online store type is currently not supported; only redis and sqlite are supported", onlineStoreType) diff --git a/go/internal/feast/onlinestore/redisonlinestore.go b/go/internal/feast/onlinestore/redisonlinestore.go index 26f34cf8965..8fb85085d43 100644 --- a/go/internal/feast/onlinestore/redisonlinestore.go +++ b/go/internal/feast/onlinestore/redisonlinestore.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "errors" "fmt" + "github.com/feast-dev/feast/go/internal/feast/registry" "sort" "strconv" "strings" @@ -13,7 +14,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/golang/protobuf/proto" "github.com/spaolacci/murmur3" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/feast-dev/feast/go/protos/feast/serving" "github.com/feast-dev/feast/go/protos/feast/types" @@ -37,10 +38,15 @@ type RedisOnlineStore struct { // Redis client connector client *redis.Client + + config *registry.RepoConfig } -func NewRedisOnlineStore(project string, onlineStoreConfig map[string]interface{}) (*RedisOnlineStore, error) { - store := RedisOnlineStore{project: project} +func NewRedisOnlineStore(project string, config *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*RedisOnlineStore, error) { + store := RedisOnlineStore{ + project: project, + config: config, + } var address []string var password string @@ -161,7 +167,7 @@ func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.E redisKeyToEntityIndex := make(map[string]int) for i := 0; i < len(entityKeys); i++ { - var key, err = buildRedisKey(r.project, entityKeys[i]) + var key, err = buildRedisKey(r.project, entityKeys[i], r.config.EntityKeySerializationVersion) if err != nil { return nil, err } @@ -270,8 +276,8 @@ func (r *RedisOnlineStore) Destruct() { } -func buildRedisKey(project string, entityKey *types.EntityKey) (*[]byte, error) { - serKey, err := serializeEntityKey(entityKey) +func buildRedisKey(project string, entityKey *types.EntityKey, entityKeySerializationVersion int64) (*[]byte, error) { + serKey, err := serializeEntityKey(entityKey, entityKeySerializationVersion) if err != nil { return nil, err } @@ -279,7 +285,7 @@ func buildRedisKey(project string, entityKey *types.EntityKey) (*[]byte, error) return &fullKey, nil } -func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) { +func serializeEntityKey(entityKey *types.EntityKey, entityKeySerializationVersion int64) (*[]byte, error) { // Serialize entity key to a bytestring so that it can be used as a lookup key in a hash table. // Ensure that we have the right amount of join keys and entity values @@ -316,7 +322,7 @@ func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) { offset := (2 * len(keys)) + (i * 3) value := m[keys[i]].GetVal() - valueBytes, valueTypeBytes, err := serializeValue(value) + valueBytes, valueTypeBytes, err := serializeValue(value, entityKeySerializationVersion) if err != nil { return valueBytes, err } @@ -341,7 +347,7 @@ func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) { return &entityKeyBuffer, nil } -func serializeValue(value interface{}) (*[]byte, types.ValueType_Enum, error) { +func serializeValue(value interface{}, entityKeySerializationVersion int64) (*[]byte, types.ValueType_Enum, error) { // TODO: Implement support for other types (at least the major types like ints, strings, bytes) switch x := (value).(type) { case *types.Value_StringVal: @@ -354,10 +360,16 @@ func serializeValue(value interface{}) (*[]byte, types.ValueType_Enum, error) { binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int32Val)) return &valueBuffer, types.ValueType_INT32, nil case *types.Value_Int64Val: - // TODO (woop): We unfortunately have to use 32 bit here for backward compatibility :( - valueBuffer := make([]byte, 4) - binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int64Val)) - return &valueBuffer, types.ValueType_INT64, nil + if entityKeySerializationVersion <= 1 { + // We unfortunately have to use 32 bit here for backward compatibility :( + valueBuffer := make([]byte, 4) + binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int64Val)) + return &valueBuffer, types.ValueType_INT64, nil + } else { + valueBuffer := make([]byte, 8) + binary.LittleEndian.PutUint64(valueBuffer, uint64(x.Int64Val)) + return &valueBuffer, types.ValueType_INT64, nil + } case nil: return nil, types.ValueType_INVALID, fmt.Errorf("could not detect type for %v", x) default: diff --git a/go/internal/feast/onlinestore/redisonlinestore_test.go b/go/internal/feast/onlinestore/redisonlinestore_test.go index 43cdbe06a22..ad9ef1e1e44 100644 --- a/go/internal/feast/onlinestore/redisonlinestore_test.go +++ b/go/internal/feast/onlinestore/redisonlinestore_test.go @@ -1,6 +1,7 @@ package onlinestore import ( + "github.com/feast-dev/feast/go/internal/feast/registry" "testing" "github.com/stretchr/testify/assert" @@ -10,7 +11,11 @@ func TestNewRedisOnlineStore(t *testing.T) { var config = map[string]interface{}{ "connection_string": "redis://localhost:6379", } - store, err := NewRedisOnlineStore("test", config) + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 2, + } + store, err := NewRedisOnlineStore("test", rc, config) assert.Nil(t, err) var opts = store.client.Options() assert.Equal(t, opts.Addr, "redis://localhost:6379") @@ -23,7 +28,11 @@ func TestNewRedisOnlineStoreWithPassword(t *testing.T) { var config = map[string]interface{}{ "connection_string": "redis://localhost:6379,password=secret", } - store, err := NewRedisOnlineStore("test", config) + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 2, + } + store, err := NewRedisOnlineStore("test", rc, config) assert.Nil(t, err) var opts = store.client.Options() assert.Equal(t, opts.Addr, "redis://localhost:6379") @@ -34,7 +43,11 @@ func TestNewRedisOnlineStoreWithDB(t *testing.T) { var config = map[string]interface{}{ "connection_string": "redis://localhost:6379,db=1", } - store, err := NewRedisOnlineStore("test", config) + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 2, + } + store, err := NewRedisOnlineStore("test", rc, config) assert.Nil(t, err) var opts = store.client.Options() assert.Equal(t, opts.Addr, "redis://localhost:6379") @@ -45,7 +58,11 @@ func TestNewRedisOnlineStoreWithSsl(t *testing.T) { var config = map[string]interface{}{ "connection_string": "redis://localhost:6379,ssl=true", } - store, err := NewRedisOnlineStore("test", config) + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 2, + } + store, err := NewRedisOnlineStore("test", rc, config) assert.Nil(t, err) var opts = store.client.Options() assert.Equal(t, opts.Addr, "redis://localhost:6379") diff --git a/go/internal/feast/onlinestore/sqliteonlinestore.go b/go/internal/feast/onlinestore/sqliteonlinestore.go index 94ba0c0d568..1f407ad39c7 100644 --- a/go/internal/feast/onlinestore/sqliteonlinestore.go +++ b/go/internal/feast/onlinestore/sqliteonlinestore.go @@ -16,7 +16,7 @@ import ( _ "github.com/mattn/go-sqlite3" "google.golang.org/protobuf/proto" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/feast-dev/feast/go/protos/feast/serving" "github.com/feast-dev/feast/go/protos/feast/types" @@ -24,15 +24,16 @@ import ( type SqliteOnlineStore struct { // Feast project name - project string - path string - db *sql.DB - db_mu sync.Mutex + project string + path string + db *sql.DB + db_mu sync.Mutex + repoConfig *registry.RepoConfig } // Creates a new sqlite online store object. onlineStoreConfig should have relative path of database file with respect to repoConfig.repoPath. func NewSqliteOnlineStore(project string, repoConfig *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*SqliteOnlineStore, error) { - store := SqliteOnlineStore{project: project} + store := SqliteOnlineStore{project: project, repoConfig: repoConfig} if db_path, ok := onlineStoreConfig["path"]; !ok { return nil, fmt.Errorf("cannot find sqlite path %s", db_path) } else { @@ -69,7 +70,7 @@ func (s *SqliteOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types. in_query := make([]string, len(entityKeys)) serialized_entities := make([]interface{}, len(entityKeys)) for i := 0; i < len(entityKeys); i++ { - serKey, err := serializeEntityKey(entityKeys[i]) + serKey, err := serializeEntityKey(entityKeys[i], s.repoConfig.EntityKeySerializationVersion) if err != nil { return nil, err } diff --git a/go/internal/feast/registry/repoconfig.go b/go/internal/feast/registry/repoconfig.go index 59d125b1bfc..b034b632dc0 100644 --- a/go/internal/feast/registry/repoconfig.go +++ b/go/internal/feast/registry/repoconfig.go @@ -30,6 +30,8 @@ type RepoConfig struct { Flags map[string]interface{} `json:"flags"` // RepoPath RepoPath string `json:"repo_path"` + // EntityKeySerializationVersion + EntityKeySerializationVersion int64 `json:"entity_key_serialization_version"` } type RegistryConfig struct { diff --git a/java/serving/src/main/java/feast/serving/config/ApplicationProperties.java b/java/serving/src/main/java/feast/serving/config/ApplicationProperties.java index 268592d20ac..5850eb6483c 100644 --- a/java/serving/src/main/java/feast/serving/config/ApplicationProperties.java +++ b/java/serving/src/main/java/feast/serving/config/ApplicationProperties.java @@ -38,22 +38,84 @@ /** Feast Serving properties. */ public class ApplicationProperties { private static final Logger log = org.slf4j.LoggerFactory.getLogger(ApplicationProperties.class); + private FeastProperties feast; + private GrpcServer grpc; + private RestServer rest; - public static class FeastProperties { - /* Feast Serving build version */ - @NotBlank private String version = "unknown"; + public FeastProperties getFeast() { + return feast; + } - public void setRegistry(String registry) { - this.registry = registry; + public void setFeast(FeastProperties feast) { + this.feast = feast; + } + + public GrpcServer getGrpc() { + return grpc; + } + + public void setGrpc(GrpcServer grpc) { + this.grpc = grpc; + } + + public RestServer getRest() { + return rest; + } + + public void setRest(RestServer rest) { + this.rest = rest; + } + + /** + * Validates all FeastProperties. This method runs after properties have been initialized and + * individually and conditionally validates each class. + */ + @PostConstruct + public void validate() { + ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); + Validator validator = factory.getValidator(); + + // Validate root fields in FeastProperties + Set> violations = validator.validate(this); + if (!violations.isEmpty()) { + throw new ConstraintViolationException(violations); } + } + public enum StoreType { + REDIS, + REDIS_CLUSTER; + } + + public static class FeastProperties { + /* Feast Serving build version */ + @NotBlank private String version = "unknown"; @NotBlank private String registry; + @NotBlank private String project; + private int registryRefreshInterval; + private int entityKeySerializationVersion; + /** Name of the active store configuration (only one store can be active at a time). */ + @NotBlank private String activeStore; + /** + * Collection of store configurations. The active store is selected by the "activeStore" field. + */ + @JsonMerge(OptBoolean.FALSE) + private List stores = new ArrayList<>(); + /* Metric tracing properties. */ + private TracingProperties tracing; + /* Feast Audit Logging properties */ + @NotNull private LoggingProperties logging; + private String gcpProject; + private String awsRegion; + private String transformationServiceEndpoint; public String getRegistry() { return registry; } - @NotBlank private String project; + public void setRegistry(String registry) { + this.registry = registry; + } public String getProject() { return project; @@ -63,8 +125,6 @@ public void setProject(final String project) { this.project = project; } - private int registryRefreshInterval; - public int getRegistryRefreshInterval() { return registryRefreshInterval; } @@ -73,6 +133,14 @@ public void setRegistryRefreshInterval(int registryRefreshInterval) { this.registryRefreshInterval = registryRefreshInterval; } + public int getEntityKeySerializationVersion() { + return entityKeySerializationVersion; + } + + public void setEntityKeySerializationVersion(int entityKeySerializationVersion) { + this.entityKeySerializationVersion = entityKeySerializationVersion; + } + /** * Finds and returns the active store * @@ -92,25 +160,6 @@ public void setActiveStore(String activeStore) { this.activeStore = activeStore; } - /** Name of the active store configuration (only one store can be active at a time). */ - @NotBlank private String activeStore; - - /** - * Collection of store configurations. The active store is selected by the "activeStore" field. - */ - @JsonMerge(OptBoolean.FALSE) - private List stores = new ArrayList<>(); - - /* Metric tracing properties. */ - private TracingProperties tracing; - - /* Feast Audit Logging properties */ - @NotNull private LoggingProperties logging; - - public void setStores(List stores) { - this.stores = stores; - } - /** * Gets Serving store configuration as a list of {@link Store}. * @@ -120,6 +169,10 @@ public List getStores() { return stores; } + public void setStores(List stores) { + this.stores = stores; + } + /** * Gets Feast Serving build version. * @@ -129,10 +182,6 @@ public String getVersion() { return version; } - public void setTracing(TracingProperties tracing) { - this.tracing = tracing; - } - /** * Gets tracing properties * @@ -142,6 +191,10 @@ public TracingProperties getTracing() { return tracing; } + public void setTracing(TracingProperties tracing) { + this.tracing = tracing; + } + /** * Gets logging properties * @@ -151,8 +204,6 @@ public LoggingProperties getLogging() { return logging; } - private String gcpProject; - public String getGcpProject() { return gcpProject; } @@ -161,17 +212,13 @@ public void setGcpProject(String gcpProject) { this.gcpProject = gcpProject; } - public void setAwsRegion(String awsRegion) { - this.awsRegion = awsRegion; - } - - private String awsRegion; - public String getAwsRegion() { return awsRegion; } - private String transformationServiceEndpoint; + public void setAwsRegion(String awsRegion) { + this.awsRegion = awsRegion; + } public String getTransformationServiceEndpoint() { return transformationServiceEndpoint; @@ -182,16 +229,6 @@ public void setTransformationServiceEndpoint(String transformationServiceEndpoin } } - private FeastProperties feast; - - public void setFeast(FeastProperties feast) { - this.feast = feast; - } - - public FeastProperties getFeast() { - return feast; - } - /** Store configuration class for database that this Feast Serving uses. */ public static class Store { @@ -327,30 +364,6 @@ public void setServer(Server server) { } } - private GrpcServer grpc; - private RestServer rest; - - public GrpcServer getGrpc() { - return grpc; - } - - public void setGrpc(GrpcServer grpc) { - this.grpc = grpc; - } - - public RestServer getRest() { - return rest; - } - - public void setRest(RestServer rest) { - this.rest = rest; - } - - public enum StoreType { - REDIS, - REDIS_CLUSTER; - } - /** Trace metric collection properties */ public static class TracingProperties { @@ -417,20 +430,4 @@ public void setServiceName(String serviceName) { this.serviceName = serviceName; } } - - /** - * Validates all FeastProperties. This method runs after properties have been initialized and - * individually and conditionally validates each class. - */ - @PostConstruct - public void validate() { - ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); - Validator validator = factory.getValidator(); - - // Validate root fields in FeastProperties - Set> violations = validator.validate(this); - if (!violations.isEmpty()) { - throw new ConstraintViolationException(violations); - } - } } diff --git a/java/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java b/java/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java index 4ea0692ccd5..868e3b83d1f 100644 --- a/java/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java +++ b/java/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java @@ -48,7 +48,8 @@ public ServingServiceV2 registryBasedServingServiceV2( new OnlineRetriever( applicationProperties.getFeast().getProject(), redisClusterClient, - new EntityKeySerializerV2()); + new EntityKeySerializerV2( + applicationProperties.getFeast().getEntityKeySerializationVersion())); break; case REDIS: RedisClientAdapter redisClient = RedisClient.create(store.getRedisConfig()); @@ -57,7 +58,8 @@ public ServingServiceV2 registryBasedServingServiceV2( new OnlineRetriever( applicationProperties.getFeast().getProject(), redisClient, - new EntityKeySerializerV2()); + new EntityKeySerializerV2( + applicationProperties.getFeast().getEntityKeySerializationVersion())); break; default: throw new RuntimeException( diff --git a/java/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java b/java/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java index 3e9ab7e8ab5..f99e5cbdb1e 100644 --- a/java/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java +++ b/java/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java @@ -30,6 +30,15 @@ // https://github.com/feast-dev/feast/blob/b1ccf8dd1535f721aee8bea937ee38feff80bec5/sdk/python/feast/infra/key_encoding_utils.py#L22 // and must be kept up to date with any changes in that logic. public class EntityKeySerializerV2 implements EntityKeySerializer { + private final int entityKeySerializationVersion; + + public EntityKeySerializerV2() { + this(1); + } + + public EntityKeySerializerV2(int entityKeySerializationVersion) { + this.entityKeySerializationVersion = entityKeySerializationVersion; + } @Override public byte[] serialize(RedisProto.RedisKeyV2 entityKey) { @@ -83,7 +92,11 @@ public byte[] serialize(RedisProto.RedisKeyV2 entityKey) { we use `struct.pack(" encodeInteger(Integer value) { return Arrays.asList(ArrayUtils.toObject(buffer.array())); } + private List encodeLong(Long value) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.putLong(value); + + return Arrays.asList(ArrayUtils.toObject(buffer.array())); + } + private List encodeString(String value) { byte[] stringBytes = value.getBytes(StandardCharsets.UTF_8); return encodeBytes(stringBytes); diff --git a/sdk/python/feast/infra/key_encoding_utils.py b/sdk/python/feast/infra/key_encoding_utils.py index 8333610473b..62b6b72724e 100644 --- a/sdk/python/feast/infra/key_encoding_utils.py +++ b/sdk/python/feast/infra/key_encoding_utils.py @@ -6,7 +6,9 @@ from feast.protos.feast.types.Value_pb2 import ValueType -def _serialize_val(value_type, v: ValueProto) -> Tuple[bytes, int]: +def _serialize_val( + value_type, v: ValueProto, entity_key_serialization_version=1 +) -> Tuple[bytes, int]: if value_type == "string_val": return v.string_val.encode("utf8"), ValueType.STRING elif value_type == "bytes_val": @@ -14,14 +16,16 @@ def _serialize_val(value_type, v: ValueProto) -> Tuple[bytes, int]: elif value_type == "int32_val": return struct.pack(" bytes: """ - Serialize keys to a bytestring so it can be used to prefix-scan through items stored in the online store + Serialize keys to a bytestring, so it can be used to prefix-scan through items stored in the online store using serialize_entity_key. This encoding is a partial implementation of serialize_entity_key, only operating on the keys of entities, @@ -35,7 +39,9 @@ def serialize_entity_key_prefix(entity_keys: List[str]) -> bytes: return b"".join(output) -def serialize_entity_key(entity_key: EntityKeyProto) -> bytes: +def serialize_entity_key( + entity_key: EntityKeyProto, entity_key_serialization_version=1 +) -> bytes: """ Serialize entity key to a bytestring so it can be used as a lookup key in a hash table. @@ -54,7 +60,11 @@ def serialize_entity_key(entity_key: EntityKeyProto) -> bytes: output.append(struct.pack(" OnlineStore: return online_store_class() -def _redis_key(project: str, entity_key: EntityKeyProto) -> bytes: - key: List[bytes] = [serialize_entity_key(entity_key), project.encode("utf-8")] +def _redis_key( + project: str, entity_key: EntityKeyProto, entity_key_serialization_version=1 +) -> bytes: + key: List[bytes] = [ + serialize_entity_key( + entity_key, + entity_key_serialization_version=entity_key_serialization_version, + ), + project.encode("utf-8"), + ] return b"".join(key) @@ -40,10 +48,17 @@ def _mmh3(key: str): return bytes.fromhex(struct.pack(" str: +def compute_entity_id( + entity_key: EntityKeyProto, entity_key_serialization_version=1 +) -> str: """ Compute Entity id given Feast Entity Key for online stores. Remember that Entity here refers to `EntityKeyProto` which is used in some online stores to encode the keys. It has nothing to do with the Entity concept we have in Feast. """ - return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex() + return mmh3.hash_bytes( + serialize_entity_key( + entity_key, + entity_key_serialization_version=entity_key_serialization_version, + ) + ).hex() diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 60fa9265ca7..da458a3693c 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -199,7 +199,11 @@ def online_write_batch( # TODO: investigate if check and set is a better approach rather than pulling all entity ts and then setting # it may be significantly slower but avoids potential (rare) race conditions for entity_key, _, _, _ in data: - redis_key_bin = _redis_key(project, entity_key) + redis_key_bin = _redis_key( + project, + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) keys.append(redis_key_bin) pipe.hmget(redis_key_bin, ts_key) prev_event_timestamps = pipe.execute() @@ -268,7 +272,11 @@ def online_read( keys = [] for entity_key in entity_keys: - redis_key_bin = _redis_key(project, entity_key) + redis_key_bin = _redis_key( + project, + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) keys.append(redis_key_bin) with client.pipeline(transaction=False) as pipe: for redis_key_bin in keys: diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 2f0e9029426..d44f8a742e2 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -95,7 +95,10 @@ def online_write_batch( with conn: for entity_key, values, timestamp, created_ts in data: - entity_key_bin = serialize_entity_key(entity_key) + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) timestamp = to_naive_utc(timestamp) if created_ts is not None: created_ts = to_naive_utc(created_ts) @@ -153,7 +156,13 @@ def online_read( f"FROM {_table_id(config.project, table)} " f"WHERE entity_key IN ({','.join('?' * len(entity_keys))}) " f"ORDER BY entity_key", - [serialize_entity_key(entity_key) for entity_key in entity_keys], + [ + serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + for entity_key in entity_keys + ], ) rows = cur.fetchall() @@ -161,7 +170,10 @@ def online_read( k: list(group) for k, group in itertools.groupby(rows, key=lambda r: r[0]) } for entity_key in entity_keys: - entity_key_bin = serialize_entity_key(entity_key) + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) res = {} res_ts = None for _, feature_name, val_bin, ts in rows.get(entity_key_bin, []): diff --git a/sdk/python/feast/infra/utils/hbase_utils.py b/sdk/python/feast/infra/utils/hbase_utils.py index 78a39caed8b..4816a60087c 100644 --- a/sdk/python/feast/infra/utils/hbase_utils.py +++ b/sdk/python/feast/infra/utils/hbase_utils.py @@ -167,13 +167,16 @@ def main(): table = connection.table("test_hbase_driver_hourly_stats") row_keys = [ serialize_entity_key( - EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1004)]) + EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1004)]), + entity_key_serialization_version=2, ).hex(), serialize_entity_key( - EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1005)]) + EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1005)]), + entity_key_serialization_version=2, ).hex(), serialize_entity_key( - EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1024)]) + EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1024)]), + entity_key_serialization_version=2, ).hex(), ] rows = table.rows(row_keys) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index bad4edba810..9a73f46f7f5 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -341,6 +341,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): sources.append( RequestSource.from_proto(on_demand_source.request_data_source) ) + on_demand_feature_view_obj = cls( name=on_demand_feature_view_proto.spec.name, schema=[ diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index f72fd717d23..71fff49f2d6 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -1177,6 +1177,7 @@ def apply_feature_view( else: del existing_feature_views_of_same_type[idx] break + existing_feature_views_of_same_type.append(feature_view_proto) if commit: self.commit() diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index f7f564df6ff..0823b93ba6f 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -1,5 +1,6 @@ import logging import os +import warnings from pathlib import Path from typing import Any @@ -25,6 +26,8 @@ from feast.importer import import_class from feast.usage import log_exceptions +warnings.simplefilter("once", RuntimeWarning) + _logger = logging.getLogger(__name__) # These dict exists so that: @@ -35,7 +38,6 @@ "lambda": "feast.infra.materialization.lambda.lambda_engine.LambdaMaterializationEngine", } - ONLINE_STORE_CLASS_FOR_TYPE = { "sqlite": "feast.infra.online_stores.sqlite.SqliteOnlineStore", "datastore": "feast.infra.online_stores.datastore.DatastoreOnlineStore", @@ -139,6 +141,17 @@ class RepoConfig(FeastBaseModel): go_feature_retrieval: Optional[bool] = False + entity_key_serialization_version: StrictInt = 1 + """ Entity key serialization version: This version is used to control what serialization scheme is + used when writing data to the online store. + A value <= 1 uses the serialization scheme used by feast up to Feast 0.22. + A value of 2 uses a newer serialization scheme, supported as of Feast 0.23. + The main difference between the two scheme is that the serialization scheme v1 stored `long` values as `int`s, + which would result in errors trying to serialize a range of values. + v2 fixes this error, but v1 is kept around to ensure backwards compatibility - specifically the ability to read + feature values for entities that have already been written into the online store. + """ + def __init__(self, **data: Any): super().__init__(**data) @@ -178,6 +191,16 @@ def __init__(self, **data: Any): self.feature_server["type"] )(**self.feature_server) + if self.entity_key_serialization_version <= 1: + warnings.warn( + "`entity_key_serialization_version` is either not specified in the feature_store.yaml, " + "or is specified to a value <= 1." + "This serialization version may cause errors when trying to write fields with the `Long` data type" + " into the online store. Specifying `entity_key_serialization_version` to 2 is recommended for" + " new projects. ", + RuntimeWarning, + ) + def get_registry_config(self): if isinstance(self.registry, str): return RegistryConfig(path=self.registry) diff --git a/sdk/python/feast/templates/aws/feature_store.yaml b/sdk/python/feast/templates/aws/feature_store.yaml index 27d1c6879f8..3745a753477 100644 --- a/sdk/python/feast/templates/aws/feature_store.yaml +++ b/sdk/python/feast/templates/aws/feature_store.yaml @@ -12,3 +12,4 @@ offline_store: user: %REDSHIFT_USER% s3_staging_location: %REDSHIFT_S3_STAGING_LOCATION% iam_role: %REDSHIFT_IAM_ROLE% +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/gcp/feature_store.yaml b/sdk/python/feast/templates/gcp/feature_store.yaml index 14c8d5a94fc..74ee7290900 100644 --- a/sdk/python/feast/templates/gcp/feature_store.yaml +++ b/sdk/python/feast/templates/gcp/feature_store.yaml @@ -1,3 +1,4 @@ project: my_project registry: data/registry.db -provider: gcp \ No newline at end of file +provider: gcp +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/hbase/feature_store.yaml b/sdk/python/feast/templates/hbase/feature_store.yaml index 83ce237b715..f99e858f7c4 100644 --- a/sdk/python/feast/templates/hbase/feature_store.yaml +++ b/sdk/python/feast/templates/hbase/feature_store.yaml @@ -5,3 +5,4 @@ online_store: type: hbase host: 127.0.0.1 port: 9090 +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/local/feature_store.yaml b/sdk/python/feast/templates/local/feature_store.yaml index dcbe32d943f..fddde04f90d 100644 --- a/sdk/python/feast/templates/local/feature_store.yaml +++ b/sdk/python/feast/templates/local/feature_store.yaml @@ -2,4 +2,5 @@ project: my_project registry: data/registry.db provider: local online_store: - path: data/online_store.db \ No newline at end of file + path: data/online_store.db +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/minimal/feature_store.yaml b/sdk/python/feast/templates/minimal/feature_store.yaml index 2083288ad76..98086900055 100644 --- a/sdk/python/feast/templates/minimal/feature_store.yaml +++ b/sdk/python/feast/templates/minimal/feature_store.yaml @@ -2,4 +2,5 @@ project: my_project registry: /path/to/registry.db provider: local online_store: - path: /path/to/online_store.db \ No newline at end of file + path: /path/to/online_store.db +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/postgres/feature_store.yaml b/sdk/python/feast/templates/postgres/feature_store.yaml index 53b86b70641..0ccd4a6d499 100644 --- a/sdk/python/feast/templates/postgres/feature_store.yaml +++ b/sdk/python/feast/templates/postgres/feature_store.yaml @@ -25,3 +25,4 @@ offline_store: db_schema: DB_SCHEMA user: DB_USERNAME password: DB_PASSWORD +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/snowflake/feature_store.yaml b/sdk/python/feast/templates/snowflake/feature_store.yaml index 9757ea2ead0..948869897bd 100644 --- a/sdk/python/feast/templates/snowflake/feature_store.yaml +++ b/sdk/python/feast/templates/snowflake/feature_store.yaml @@ -9,3 +9,4 @@ offline_store: role: SNOWFLAKE_ROLE warehouse: SNOWFLAKE_WAREHOUSE database: SNOWFLAKE_DATABASE +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/spark/feature_store.yaml b/sdk/python/feast/templates/spark/feature_store.yaml index 2ea0ddfcc9f..91e3ecf4724 100644 --- a/sdk/python/feast/templates/spark/feature_store.yaml +++ b/sdk/python/feast/templates/spark/feature_store.yaml @@ -12,3 +12,4 @@ offline_store: spark.sql.session.timeZone: "UTC" online_store: path: data/online_store.db +entity_key_serialization_version: 2 diff --git a/sdk/python/tests/integration/online_store/test_e2e_local.py b/sdk/python/tests/integration/online_store/test_e2e_local.py index c5b66e7ddce..73c566847d6 100644 --- a/sdk/python/tests/integration/online_store/test_e2e_local.py +++ b/sdk/python/tests/integration/online_store/test_e2e_local.py @@ -51,7 +51,7 @@ def _assert_online_features( .values[0] .float_val > 0 - ) + ), response.to_dict() result = response.to_dict() assert len(result) == 5 diff --git a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py index 25eb061930b..07e22017b54 100644 --- a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py +++ b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py @@ -311,7 +311,7 @@ def test_write_batch_non_duplicates(repo_config, dynamodb_online_store): table_instance = dynamodb_resource.Table(f"{PROJECT}.{dynamodb_tbl}") # Insert duplicate data dynamodb_online_store._write_batch_non_duplicates( - table_instance, data + data_duplicate, progress=None + table_instance, data + data_duplicate, None, repo_config ) # Request more items than inserted response = table_instance.scan(Limit=20) diff --git a/sdk/python/tests/unit/infra/test_key_encoding_utils.py b/sdk/python/tests/unit/infra/test_key_encoding_utils.py new file mode 100644 index 00000000000..449d6819a12 --- /dev/null +++ b/sdk/python/tests/unit/infra/test_key_encoding_utils.py @@ -0,0 +1,30 @@ +import pytest + +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto + + +def test_serialize_entity_key(): + # Should be fine + serialize_entity_key( + EntityKeyProto( + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 15))] + ), + entity_key_serialization_version=2, + ) + # True int64, but should also be fine. + serialize_entity_key( + EntityKeyProto( + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 31))] + ), + entity_key_serialization_version=2, + ) + + # Old serialization scheme, should fail. + with pytest.raises(BaseException): + serialize_entity_key( + EntityKeyProto( + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 31))] + ), + ) diff --git a/sdk/python/tests/unit/test_serialization_version.py b/sdk/python/tests/unit/test_serialization_version.py new file mode 100644 index 00000000000..00562e4000a --- /dev/null +++ b/sdk/python/tests/unit/test_serialization_version.py @@ -0,0 +1,17 @@ +import tempfile + +from assertpy import assertpy + +from feast import RepoConfig + + +def test_registry_entity_serialization_version(): + with tempfile.TemporaryDirectory() as tmpdir: + r = RepoConfig( + project="prompt_dory", + provider="local", + online_store="redis", + registry=f"{tmpdir}/registry.db", + entity_key_serialization_version=2, + ) + assertpy.assert_that(r.entity_key_serialization_version).is_equal_to(2) diff --git a/sdk/python/tests/utils/cli_utils.py b/sdk/python/tests/utils/cli_utils.py index ee6ea138fbe..a038b858406 100644 --- a/sdk/python/tests/utils/cli_utils.py +++ b/sdk/python/tests/utils/cli_utils.py @@ -84,13 +84,11 @@ def local_repo(self, example_repo_py: str, offline_store: str): repo_example.write_text(example_repo_py) result = self.run(["apply"], cwd=repo_path) - assert ( - result.returncode == 0 - ), f"stdout: {result.stdout}\n stderr: {result.stderr}" + print(f"Apply: stdout: {str(result.stdout)}\n stderr: {str(result.stderr)}") + assert result.returncode == 0 yield FeatureStore(repo_path=str(repo_path), config=None) result = self.run(["teardown"], cwd=repo_path) - assert ( - result.returncode == 0 - ), f"stdout: {result.stdout}\n stderr: {result.stderr}" + print(f"Apply: stdout: {str(result.stdout)}\n stderr: {str(result.stderr)}") + assert result.returncode == 0 diff --git a/sdk/python/tests/utils/online_store_utils.py b/sdk/python/tests/utils/online_store_utils.py index f72b4d5a2a3..9cd76638695 100644 --- a/sdk/python/tests/utils/online_store_utils.py +++ b/sdk/python/tests/utils/online_store_utils.py @@ -45,7 +45,7 @@ def _insert_data_test_table(data, project, tbl_name, region): dynamodb_resource = boto3.resource("dynamodb", region_name=region) table_instance = dynamodb_resource.Table(f"{project}.{tbl_name}") for entity_key, features, timestamp, created_ts in data: - entity_id = compute_entity_id(entity_key) + entity_id = compute_entity_id(entity_key, entity_key_serialization_version=2) with table_instance.batch_writer() as batch: batch.put_item( Item={