From b5e58bf9b0ecf94da7a3422f8fddc1a67b74ba31 Mon Sep 17 00:00:00 2001 From: Chimey Rock Date: Tue, 28 Oct 2025 11:24:53 +0700 Subject: [PATCH 1/2] feat: Support batch_write for MySQL OnlineStore Signed-off-by: Chimey Rock docs: update document for MySQL Online Store to support batch write Signed-off-by: Chimey Rock --- .../mysql_online_store/README.md | 3 + .../online_stores/mysql_online_store/mysql.py | 118 ++++++++++++++---- .../mysql_repo_configuration.py | 2 + .../universal/online_store/mysql.py | 28 +++++ 4 files changed, 127 insertions(+), 24 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/mysql_online_store/README.md b/sdk/python/feast/infra/online_stores/mysql_online_store/README.md index ac38237cd11..7b6e97091d9 100644 --- a/sdk/python/feast/infra/online_stores/mysql_online_store/README.md +++ b/sdk/python/feast/infra/online_stores/mysql_online_store/README.md @@ -25,6 +25,9 @@ online_store: user: test # mysql user, default to test password: test # mysql password, default to test database: feast # mysql database, default to feast + batch_write: false # supporting batch write and commit per batch + batch_size: 100 # batch size, default to 100 + ``` #### Apply the feature definitions in `example.py` diff --git a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py index d44eddfbd0b..2172f3aa359 100644 --- a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py +++ b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py @@ -30,6 +30,8 @@ class MySQLOnlineStoreConfig(FeastConfigBaseModel): password: Optional[StrictStr] = None database: Optional[StrictStr] = None port: Optional[int] = None + batch_write: Optional[bool] = False + batch_size: Optional[int] = None class MySQLOnlineStore(OnlineStore): @@ -51,7 +53,7 @@ def _get_conn(self, config: RepoConfig) -> Connection: password=online_store_config.password or "test", database=online_store_config.database or "feast", port=online_store_config.port or 3306, - autocommit=True, + autocommit=(not online_store_config.batch_write), ) return self._conn @@ -69,29 +71,97 @@ def online_write_batch( project = config.project - for entity_key, values, timestamp, created_ts in data: - entity_key_bin = serialize_entity_key( - entity_key, - entity_key_serialization_version=3, - ).hex() - timestamp = to_naive_utc(timestamp) - if created_ts is not None: - created_ts = to_naive_utc(created_ts) - - for feature_name, val in values.items(): - self.write_to_table( - created_ts, - cur, - entity_key_bin, - feature_name, - project, - table, - timestamp, - val, - ) - conn.commit() - if progress: - progress(1) + batch_write = config.online_store.batch_write + if not batch_write: + for entity_key, values, timestamp, created_ts in data: + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=3, + ).hex() + timestamp = to_naive_utc(timestamp) + if created_ts is not None: + created_ts = to_naive_utc(created_ts) + + for feature_name, val in values.items(): + self.write_to_table( + created_ts, + cur, + entity_key_bin, + feature_name, + project, + table, + timestamp, + val, + ) + conn.commit() + if progress: + progress(1) + else: + batch_size = config.online_store.bacth_size + if not batch_size or batch_size < 2: + raise ValueError("Batch size must be at least 2") + insert_values = [] + for entity_key, values, timestamp, created_ts in data: + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=2, + ).hex() + timestamp = to_naive_utc(timestamp) + if created_ts is not None: + created_ts = to_naive_utc(created_ts) + + for feature_name, val in values.items(): + serialized_val = val.SerializeToString() + insert_values.append( + ( + entity_key_bin, + feature_name, + serialized_val, + timestamp, + created_ts, + ) + ) + + if len(insert_values) >= batch_size: + try: + self._execute_batch(cur, project, table, insert_values) + conn.commit() + if progress: + progress(len(insert_values)) + except Exception as e: + conn.rollback() + raise e + insert_values.clear() + + if insert_values: + try: + self._execute_batch(cur, project, table, insert_values) + conn.commit() + if progress: + progress(len(insert_values)) + except Exception as e: + conn.rollback() + raise e + + def _execute_batch(self, cur, project, table, insert_values): + sql = f""" + INSERT INTO {_table_id(project, table)} + (entity_key, feature_name, value, event_ts, created_ts) + values (%s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + value = VALUES(value), + event_ts = VALUES(event_ts), + created_ts = VALUES(created_ts); + """ + try: + cur.executemany(sql, insert_values) + except Exception as e: + # Log SQL info for debugging without leaking sensitive data + first_sample = insert_values[0] if insert_values else None + raise RuntimeError( + f"Failed to execute batch insert into table '{_table_id(project, table)}' " + f"(rows={len(insert_values)}, sample={first_sample}): {e}" + ) from e @staticmethod def write_to_table( diff --git a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql_repo_configuration.py b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql_repo_configuration.py index 3e92ead2d0b..e5a1c0114c3 100644 --- a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql_repo_configuration.py +++ b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql_repo_configuration.py @@ -2,9 +2,11 @@ IntegrationTestRepoConfig, ) from tests.integration.feature_repos.universal.online_store.mysql import ( + BatchWriteMySQLOnlineStoreCreator, MySQLOnlineStoreCreator, ) FULL_REPO_CONFIGS = [ IntegrationTestRepoConfig(online_store_creator=MySQLOnlineStoreCreator), + IntegrationTestRepoConfig(online_store_creator=BatchWriteMySQLOnlineStoreCreator), ] diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py b/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py index 093295c86ba..c0ba91d15a4 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py @@ -31,3 +31,31 @@ def create_online_store(self) -> Dict[str, str]: def teardown(self): self.container.stop() + + +class BatchWriteMySQLOnlineStoreCreator(OnlineStoreCreator): + def __init__(self, project_name: str, **kwargs): + super().__init__(project_name) + self.container = ( + MySqlContainer("mysql:latest", platform="linux/amd64") + .with_exposed_ports(3306) + .with_env("MYSQL_USER", "root") + .with_env("MYSQL_PASSWORD", "test") + .with_env("MYSQL_DATABASE", "test") + ) + + def create_online_store(self) -> Dict[str, str]: + self.container.start() + exposed_port = self.container.get_exposed_port(3306) + return { + "type": "mysql", + "user": "root", + "password": "test", + "database": "test", + "port": exposed_port, + "batch_write": "True", + "bacth_size": "1000", + } + + def teardown(self): + self.container.stop() From 20a4d194c85ffb74b6a6a15cb45d206671efd661 Mon Sep 17 00:00:00 2001 From: Chimey Rock Date: Fri, 31 Oct 2025 01:17:25 +0700 Subject: [PATCH 2/2] docs: Add batch_write for MySQL Online Store Signed-off-by: Chimey Rock --- docs/reference/online-stores/mysql.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/docs/reference/online-stores/mysql.md b/docs/reference/online-stores/mysql.md index 8868e64279d..2f9650f8916 100644 --- a/docs/reference/online-stores/mysql.md +++ b/docs/reference/online-stores/mysql.md @@ -28,6 +28,28 @@ online_store: The full set of configuration options is available in [MySQLOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.mysql_online_store.MySQLOnlineStoreConfig). +## Batch write mode +By default, the MySQL online store performs row-by-row insert and commit for each feature record. While this ensures per-record atomicity, it can lead to significant overhead on write operations — especially on distributed SQL databases (for example, TiDB, which is MySQL-compatible and uses a consensus protocol). + +To improve writing performance, you can enable batch write mode by setting `batch_write` to `true` and `batch_size`, which executes multiple insert queries in batches and commits them together per batch instead of committing each record individually. + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: mysql + host: DB_HOST + port: DB_PORT + database: DB_NAME + user: DB_USERNAME + password: DB_PASSWORD + batch_write: true + batch_size: 100 +``` +{% endcode %} + ## Functionality Matrix The set of functionality supported by online stores is described in detail [here](overview.md#functionality).