diff --git a/Makefile b/Makefile index 990604da575..146b4373aae 100644 --- a/Makefile +++ b/Makefile @@ -231,6 +231,27 @@ test-python-universal-postgres-online: not test_snowflake" \ sdk/python/tests + test-python-universal-mysql-online: + PYTHONPATH='.' \ + FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.mysql_repo_configuration \ + PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.mysql \ + FEAST_USAGE=False \ + IS_TEST=True \ + python -m pytest -n 8 --integration \ + -k "not test_universal_cli and \ + not test_go_feature_server and \ + not test_feature_logging and \ + not test_reorder_columns and \ + not test_logged_features_validation and \ + not test_lambda_materialization_consistency and \ + not test_offline_write and \ + not test_push_features_to_offline_store and \ + not gcs_registry and \ + not s3_registry and \ + not test_universal_types and \ + not test_snowflake" \ + sdk/python/tests + test-python-universal-cassandra: PYTHONPATH='.' 
\ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.cassandra_repo_configuration \ diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 430f9a745d7..2b58fb277e3 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -94,6 +94,7 @@ * [DynamoDB](reference/online-stores/dynamodb.md) * [PostgreSQL (contrib)](reference/online-stores/postgres.md) * [Cassandra + Astra DB (contrib)](reference/online-stores/cassandra.md) + * [MySQL (contrib)](reference/online-stores/mysql.md) * [Providers](reference/providers/README.md) * [Local](reference/providers/local.md) * [Google Cloud Platform](reference/providers/google-cloud-platform.md) diff --git a/docs/reference/online-stores/README.md b/docs/reference/online-stores/README.md index 58c8705e9ee..2ade4f0de9b 100644 --- a/docs/reference/online-stores/README.md +++ b/docs/reference/online-stores/README.md @@ -34,3 +34,7 @@ Please see [Online Store](../../getting-started/architecture-and-components/onli [cassandra.md](cassandra.md) {% endcontent-ref %} +{% content-ref url="mysql.md" %} +[mysql.md](mysql.md) +{% endcontent-ref %} + diff --git a/docs/reference/online-stores/mysql.md b/docs/reference/online-stores/mysql.md new file mode 100644 index 00000000000..cb90c7561c5 --- /dev/null +++ b/docs/reference/online-stores/mysql.md @@ -0,0 +1,53 @@ +# MySQL online store (contrib) + +## Description + +The MySQL online store provides support for materializing feature values into a MySQL database for serving online features. 
+ +* Only the latest feature values are persisted + + +## Example + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: mysql + host: DB_HOST + port: DB_PORT + database: DB_NAME + user: DB_USERNAME + password: DB_PASSWORD +``` +{% endcode %} + +The full set of configuration options is available in [MySQLOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.contrib.mysql_online_store.mysql.MySQLOnlineStoreConfig). + +## Functionality Matrix + +The set of functionality supported by online stores is described in detail [here](overview.md#functionality). +Below is a matrix indicating which functionality is supported by the MySQL online store. + +| | MySQL | +| :-------------------------------------------------------- | :-- | +| write feature values to the online store | yes | +| read feature values from the online store | yes | +| update infrastructure (e.g. tables) in the online store | yes | +| teardown infrastructure (e.g. tables) in the online store | yes | +| generate a plan of infrastructure changes | no | +| support for on-demand transforms | yes | +| readable by Python SDK | yes | +| readable by Java | no | +| readable by Go | no | +| support for entityless feature views | yes | +| support for concurrent writing to the same key | no | +| support for ttl (time to live) at retrieval | no | +| support for deleting expired data | no | +| collocated by feature view | yes | +| collocated by feature service | no | +| collocated by entity key | no | + +To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix). 
diff --git a/sdk/python/docs/source/feast.infra.online_stores.contrib.mysql_online_store.rst b/sdk/python/docs/source/feast.infra.online_stores.contrib.mysql_online_store.rst new file mode 100644 index 00000000000..036922d658f --- /dev/null +++ b/sdk/python/docs/source/feast.infra.online_stores.contrib.mysql_online_store.rst @@ -0,0 +1,21 @@ +feast.infra.online\_stores.contrib.mysql\_online\_store package +=============================================================== + +Submodules +---------- + +feast.infra.online\_stores.contrib.mysql\_online\_store.mysql module +-------------------------------------------------------------------- + +.. automodule:: feast.infra.online_stores.contrib.mysql_online_store.mysql + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: feast.infra.online_stores.contrib.mysql_online_store + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst index 6afe9071ace..6b175f4584f 100644 --- a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst +++ b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst @@ -9,6 +9,7 @@ Subpackages feast.infra.online_stores.contrib.cassandra_online_store feast.infra.online_stores.contrib.hbase_online_store + feast.infra.online_stores.contrib.mysql_online_store Submodules ---------- @@ -29,6 +30,14 @@ feast.infra.online\_stores.contrib.hbase\_repo\_configuration module :undoc-members: :show-inheritance: +feast.infra.online\_stores.contrib.mysql\_repo\_configuration module +-------------------------------------------------------------------- + +.. 
automodule:: feast.infra.online_stores.contrib.mysql_repo_configuration + :members: + :undoc-members: + :show-inheritance: + feast.infra.online\_stores.contrib.postgres module -------------------------------------------------- diff --git a/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/README.md b/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/README.md new file mode 100644 index 00000000000..ac38237cd11 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/README.md @@ -0,0 +1,81 @@ +# MySQL Online Store +MySQL is not included in the current [Feast](https://github.com/feast-dev/feast) roadmap; this project intends to add MySQL support for the online store. +We create one table per feature view, named `<project>_<feature_view_name>`, which gets updated with data on every materialize call. + + +#### Create a feature repository + +```shell +feast init feature_repo +cd feature_repo +``` + +#### Edit `feature_store.yaml` + +set `online_store` type to be `mysql` + +```yaml +project: feature_repo +registry: data/registry.db +provider: local +online_store: + type: mysql + host: 127.0.0.1 # mysql endpoint, default to 127.0.0.1 + port: 3306 # mysql port, default to 3306 + user: test # mysql user, default to test + password: test # mysql password, default to test + database: feast # mysql database, default to feast +``` + +#### Apply the feature definitions in `example.py` + +```shell +feast -c feature_repo apply +``` +##### Output +``` +Registered entity driver_id +Registered feature view driver_hourly_stats_view +Deploying infrastructure for driver_hourly_stats_view +``` + +### Materialize Latest Data to Online Feature Store (MySQL) +``` +$ CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") +$ feast -c feature_repo materialize-incremental $CURRENT_TIME +``` +#### Output +``` +Materializing 1 feature views from 2022-04-16 15:30:39+05:30 to 2022-04-19 15:31:04+05:30 into the mysql online store. 
+ +driver_hourly_stats_view from 2022-04-16 15:30:39+05:30 to 2022-04-19 15:31:04+05:30: +100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 120.59it/s] +``` + +### Fetch the latest features for some entity id +```python +from pprint import pprint +from feast import FeatureStore + +store = FeatureStore(repo_path=".") +feature_vector = store.get_online_features( + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ], + entity_rows=[ + {"driver_id": 1004}, + {"driver_id": 1005}, + ], +).to_dict() +pprint(feature_vector) + +``` +#### Output +``` +{'acc_rate': [0.01390857808291912, 0.4063614010810852], + 'avg_daily_trips': [69, 706], + 'conv_rate': [0.6624961495399475, 0.7595928311347961], + 'driver_id': [1004, 1005]} +``` diff --git a/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/__init__.py b/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py b/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py new file mode 100644 index 00000000000..fa7dd2c2a49 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py @@ -0,0 +1,225 @@ +from __future__ import absolute_import + +from datetime import datetime +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple + +import pymysql +import pytz +from pydantic import StrictStr +from pymysql.connections import Connection +from pymysql.cursors import Cursor + +from feast import Entity, FeatureView, RepoConfig +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as 
class MySQLOnlineStoreConfig(FeastConfigBaseModel):
    """
    Configuration for the MySQL online store.

    NOTE: The class *must* end with the `OnlineStoreConfig` suffix.

    Every connection field is optional. When a field is left unset,
    `MySQLOnlineStore._get_conn` substitutes a local-development default
    (noted next to each field below).
    """

    type = "mysql"

    # MySQL server host; "127.0.0.1" is used when unset.
    host: Optional[StrictStr] = None
    # MySQL user name; "test" is used when unset.
    user: Optional[StrictStr] = None
    # MySQL password; "test" is used when unset.
    password: Optional[StrictStr] = None
    # Database (schema) holding the feature tables; "feast" is used when unset.
    database: Optional[StrictStr] = None
    # MySQL server port; 3306 is used when unset.
    port: Optional[int] = None


class MySQLOnlineStore(OnlineStore):
    """
    An online store implementation that uses MySQL.

    NOTE: The class *must* end with the `OnlineStore` suffix.

    Each feature view is stored in its own table, named
    `<project>_<feature view name>`, holding one row per
    (entity_key, feature_name) pair. Entity keys are stored as the hex
    string of the serialized entity key and feature values as serialized
    `ValueProto` bytes.
    """

    # Lazily created connection, reused by every call on this instance.
    _conn: Optional[Connection] = None

    def _get_conn(self, config: RepoConfig) -> Connection:
        """
        Return the cached MySQL connection, opening it on first use.

        Args:
            config: Repo config whose `online_store` section must be a
                `MySQLOnlineStoreConfig`.

        Returns:
            An open `pymysql` connection with autocommit enabled.
        """
        online_store_config = config.online_store
        assert isinstance(online_store_config, MySQLOnlineStoreConfig)

        if self._conn is None:
            self._conn = pymysql.connect(
                host=online_store_config.host or "127.0.0.1",
                user=online_store_config.user or "test",
                password=online_store_config.password or "test",
                database=online_store_config.database or "feast",
                port=online_store_config.port or 3306,
                autocommit=True,
            )
        return self._conn

    def online_write_batch(
        self,
        config: RepoConfig,
        table: FeatureView,
        data: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ],
        progress: Optional[Callable[[int], Any]],
    ) -> None:
        """
        Upsert a batch of feature rows into the table of `table`.

        Args:
            config: Repo config providing the project name and connection
                settings.
            table: Feature view the rows belong to.
            data: One tuple per entity row: the entity key, a mapping of
                feature name to value, the event timestamp and an optional
                created timestamp.
            progress: Optional callback, invoked with `1` after each entity
                row has been written.
        """
        conn = self._get_conn(config)
        project = config.project

        # The context manager closes the cursor even if a write fails.
        with conn.cursor() as cur:
            for entity_key, values, timestamp, created_ts in data:
                entity_key_bin = serialize_entity_key(
                    entity_key,
                    entity_key_serialization_version=2,
                ).hex()
                # Timestamps are stored as naive UTC values.
                timestamp = _to_naive_utc(timestamp)
                if created_ts is not None:
                    created_ts = _to_naive_utc(created_ts)

                for feature_name, val in values.items():
                    self.write_to_table(
                        created_ts,
                        cur,
                        entity_key_bin,
                        feature_name,
                        project,
                        table,
                        timestamp,
                        val,
                    )
                # Redundant while the connection uses autocommit=True; kept so
                # each row stays durable if that setting ever changes.
                conn.commit()
                if progress:
                    progress(1)

    @staticmethod
    def write_to_table(
        created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val
    ) -> None:
        """
        Insert one feature value, or overwrite it if the key already exists.

        The row is keyed by (entity_key, feature_name); on a duplicate key the
        stored value and both timestamps are replaced.

        Args:
            created_ts: Created timestamp to store, or None.
            cur: Open cursor used to execute the statement.
            entity_key_bin: Hex string of the serialized entity key.
            feature_name: Name of the feature being written.
            project: Feast project name (part of the table name).
            table: Feature view the value belongs to (part of the table name).
            timestamp: Event timestamp to store.
            val: Feature value; stored as its serialized bytes.
        """
        # Serialize once; the same bytes feed the INSERT and the UPDATE branch.
        value_bin = val.SerializeToString()
        cur.execute(
            f"""
            INSERT INTO {_table_id(project, table)}
            (entity_key, feature_name, value, event_ts, created_ts)
            values (%s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
            value = %s,
            event_ts = %s,
            created_ts = %s;
            """,
            (
                # Insert
                entity_key_bin,
                feature_name,
                value_bin,
                timestamp,
                created_ts,
                # Update on duplicate key
                value_bin,
                timestamp,
                created_ts,
            ),
        )

    def online_read(
        self,
        config: RepoConfig,
        table: FeatureView,
        entity_keys: List[EntityKeyProto],
        requested_features: Optional[List[str]] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """
        Read the stored feature values for each entity key.

        Args:
            config: Repo config providing the project name and connection
                settings.
            table: Feature view to read from.
            entity_keys: Entity keys to look up; one query is issued per key.
            requested_features: Accepted for interface compatibility; this
                implementation does not filter on it and returns every
                feature stored for the key.

        Returns:
            One `(event_ts, features)` tuple per entity key, in input order.
            `features` maps feature name to `ValueProto`; `event_ts` is the
            timestamp of the last row fetched for that key. Keys with no
            stored rows yield `(None, None)`.
        """
        conn = self._get_conn(config)
        project = config.project
        result: List[Tuple[Optional[datetime], Optional[Dict[str, Any]]]] = []

        # The context manager closes the cursor even if a query fails.
        with conn.cursor() as cur:
            for entity_key in entity_keys:
                entity_key_bin = serialize_entity_key(
                    entity_key,
                    entity_key_serialization_version=2,
                ).hex()

                cur.execute(
                    f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = %s",
                    (entity_key_bin,),
                )

                res = {}
                res_ts: Optional[datetime] = None
                for feature_name, val_bin, ts in cur.fetchall() or ():
                    val = ValueProto()
                    val.ParseFromString(val_bin)
                    res[feature_name] = val
                    res_ts = ts

                if not res:
                    result.append((None, None))
                else:
                    result.append((res_ts, res))
        return result

    def update(
        self,
        config: RepoConfig,
        tables_to_delete: Sequence[FeatureView],
        tables_to_keep: Sequence[FeatureView],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ) -> None:
        """
        Create the tables for kept feature views and drop the deleted ones.

        Safe to run repeatedly: tables are created with `IF NOT EXISTS` and
        the secondary index is only added when it is missing, so re-applying
        an unchanged repo does not fail on a duplicate index name.

        Args:
            config: Repo config providing the project name and connection
                settings.
            tables_to_delete: Feature views whose tables are dropped.
            tables_to_keep: Feature views whose tables (and index) are ensured.
            entities_to_delete: Unused by this implementation.
            entities_to_keep: Unused by this implementation.
            partial: Unused by this implementation.
        """
        conn = self._get_conn(config)
        project = config.project

        with conn.cursor() as cur:
            # We don't create any special state for the entities in this
            # implementation.
            for table in tables_to_keep:
                table_name = _table_id(project, table)
                index_name = f"{table_name}_ek"

                cur.execute(
                    f"""CREATE TABLE IF NOT EXISTS {table_name} (entity_key VARCHAR(512),
                    feature_name VARCHAR(256),
                    value BLOB,
                    event_ts timestamp NULL DEFAULT NULL,
                    created_ts timestamp NULL DEFAULT NULL,
                    PRIMARY KEY(entity_key, feature_name))"""
                )

                # ADD INDEX has no IF NOT EXISTS form in MySQL, so look the
                # index up first instead of failing on the second apply.
                cur.execute(
                    "SELECT 1 FROM information_schema.statistics "
                    "WHERE table_schema = DATABASE() "
                    "AND table_name = %s AND index_name = %s LIMIT 1",
                    (table_name, index_name),
                )
                if cur.fetchone() is None:
                    cur.execute(
                        f"ALTER TABLE {table_name} ADD INDEX {index_name} (entity_key);"
                    )

            for table in tables_to_delete:
                _drop_table_and_index(cur, project, table)

    def teardown(
        self,
        config: RepoConfig,
        tables: Sequence[FeatureView],
        entities: Sequence[Entity],
    ) -> None:
        """
        Drop the tables of all given feature views.

        Args:
            config: Repo config providing the project name and connection
                settings.
            tables: Feature views whose tables are dropped.
            entities: Unused by this implementation.
        """
        conn = self._get_conn(config)
        project = config.project

        with conn.cursor() as cur:
            for table in tables:
                _drop_table_and_index(cur, project, table)


def _drop_table_and_index(cur: Cursor, project: str, table: FeatureView) -> None:
    """
    Drop the feature view's table together with its indexes.

    Only `DROP TABLE IF EXISTS` is issued: dropping a table removes its
    indexes with it, and a separate `DROP INDEX` would raise when the table
    is already gone, which made teardown fail on a missing table.
    """
    table_name = _table_id(project, table)
    cur.execute(f"DROP TABLE IF EXISTS {table_name}")


def _table_id(project: str, table: FeatureView) -> str:
    """Return the MySQL table name for a feature view: `<project>_<view name>`."""
    return f"{project}_{table.name}"


def _to_naive_utc(ts: datetime) -> datetime:
    """
    Return `ts` as a timezone-naive datetime expressed in UTC.

    Naive inputs are returned unchanged; aware inputs are converted to UTC
    and stripped of their tzinfo.
    """
    if ts.tzinfo is None:
        return ts
    return ts.astimezone(pytz.utc).replace(tzinfo=None)
47a5ae321d9..65d59a8eb59 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -50,6 +50,7 @@ "postgres": "feast.infra.online_stores.contrib.postgres.PostgreSQLOnlineStore", "hbase": "feast.infra.online_stores.contrib.hbase_online_store.hbase.HbaseOnlineStore", "cassandra": "feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStore", + "mysql": "feast.infra.online_stores.contrib.mysql_online_store.mysql.MySQLOnlineStore", } OFFLINE_STORE_CLASS_FOR_TYPE = { diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 9d10b2c3132..02d11ab5cd0 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -557,6 +557,8 @@ pyjwt[crypto]==2.4.0 # snowflake-connector-python pymssql==2.2.5 # via feast (setup.py) +pymysql==1.0.2 + # via feast (setup.py) pyodbc==4.0.34 # via feast (setup.py) pyopenssl==22.0.0 @@ -761,6 +763,8 @@ types-protobuf==3.19.22 # via # feast (setup.py) # mypy-protobuf +types-pymysql==1.0.19 + # via feast (setup.py) types-python-dateutil==2.8.19 # via feast (setup.py) types-pytz==2022.2.1.0 diff --git a/sdk/python/requirements/py3.8-ci-requirements.txt b/sdk/python/requirements/py3.8-ci-requirements.txt index 93011cfdcf4..dc891475598 100644 --- a/sdk/python/requirements/py3.8-ci-requirements.txt +++ b/sdk/python/requirements/py3.8-ci-requirements.txt @@ -565,6 +565,8 @@ pyjwt[crypto]==2.4.0 # snowflake-connector-python pymssql==2.2.5 # via feast (setup.py) +pymysql==1.0.2 + # via feast (setup.py) pyodbc==4.0.34 # via feast (setup.py) pyopenssl==22.0.0 @@ -771,6 +773,8 @@ types-protobuf==3.19.22 # via # feast (setup.py) # mypy-protobuf +types-pymysql==1.0.19 + # via feast (setup.py) types-python-dateutil==2.8.19 # via feast (setup.py) types-pytz==2022.2.1.0 diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt 
class MySQLOnlineStoreCreator(OnlineStoreCreator):
    """Test fixture that backs the MySQL online store with a MySQL container."""

    def __init__(self, project_name: str, **kwargs):
        """Configure (but do not start) the MySQL container for the test run."""
        super().__init__(project_name)
        container = MySqlContainer("mysql:latest", platform="linux/amd64")
        container = container.with_exposed_ports(3306)
        # Environment passed to the container, applied in this order.
        for env_name, env_value in (
            ("MYSQL_USER", "root"),
            ("MYSQL_PASSWORD", "test"),
            ("MYSQL_DATABASE", "test"),
        ):
            container = container.with_env(env_name, env_value)
        self.container = container

    def create_online_store(self) -> Dict[str, str]:
        """Start the container and return the matching online store settings."""
        self.container.start()
        store_config = {
            "type": "mysql",
            "user": "root",
            "password": "test",
            "database": "test",
        }
        # The host-side port mapped to the container's 3306.
        store_config["port"] = self.container.get_exposed_port(3306)
        return store_config

    def teardown(self):
        """Stop the MySQL container."""
        self.container.stop()