From 3dc769ea0dd5b6221d45d6965b1b5dd689467524 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20J=C3=A4ger?= Date: Thu, 20 Mar 2025 14:22:07 +0100 Subject: [PATCH] Make keepalive_timeout configurable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sebastian Jäger --- .../feast/infra/online_stores/dynamodb.py | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 15e8357754b..2122eef2213 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -82,6 +82,9 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): max_pool_connections: int = 10 """Max number of connections for async Dynamodb operations""" + keepalive_timeout: float = 12.0 + """Keep-alive timeout in seconds for async Dynamodb connections.""" + class DynamoDBOnlineStore(OnlineStore): """ @@ -97,7 +100,9 @@ class DynamoDBOnlineStore(OnlineStore): async def initialize(self, config: RepoConfig): await _get_aiodynamodb_client( - config.online_store.region, config.online_store.max_pool_connections + config.online_store.region, + config.online_store.max_pool_connections, + config.online_store.keepalive_timeout, ) async def close(self): @@ -272,7 +277,9 @@ async def online_write_batch_async( for entity_key, features, timestamp, _ in _latest_data_to_write(data) ] client = await _get_aiodynamodb_client( - online_config.region, config.online_store.max_pool_connections + online_config.region, + online_config.max_pool_connections, + online_config.keepalive_timeout, ) await dynamo_write_items_async(client, table_name, items) @@ -377,7 +384,9 @@ def to_tbl_resp(raw_client_response): entity_id_batches.append(entity_id_batch) client = await _get_aiodynamodb_client( - online_config.region, online_config.max_pool_connections + online_config.region, + online_config.max_pool_connections, + online_config.keepalive_timeout, ) response_batches = await asyncio.gather( *[ @@ -536,14 +545,19 @@ def _get_aioboto_session(): return _aioboto_session -async def _get_aiodynamodb_client(region: str, max_pool_connections: int): +async def _get_aiodynamodb_client( + region: str, max_pool_connections: int, keepalive_timeout: float +): global _aioboto_client if _aioboto_client is None: logger.debug("initializing the aiobotocore dynamodb client") client_context = _get_aioboto_session().create_client( "dynamodb", region_name=region, - config=AioConfig(max_pool_connections=max_pool_connections), + config=AioConfig( + max_pool_connections=max_pool_connections, + connector_args={"keepalive_timeout": keepalive_timeout}, + ), ) context_stack = contextlib.AsyncExitStack() _aioboto_client = await context_stack.enter_async_context(client_context)