diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 2122eef2213..edf4141b5be 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -85,6 +85,26 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): keepalive_timeout: float = 12.0 """Keep-alive timeout in seconds for async Dynamodb connections.""" + connect_timeout: Union[int, float] = 60 + """The time in seconds until a timeout exception is thrown when attempting to make + an async connection.""" + + read_timeout: Union[int, float] = 60 + """The time in seconds until a timeout exception is thrown when attempting to read + from an async connection.""" + + total_max_retry_attempts: Union[int, None] = None + """Maximum number of total attempts that will be made on a single request. + + Maps to `retries.total_max_attempts` in botocore.config.Config. + """ + + retry_mode: Union[Literal["legacy", "standard", "adaptive"], None] = None + """The type of retry mode (aio)botocore should use. + + Maps to `retries.mode` in botocore.config.Config. + """ + class DynamoDBOnlineStore(OnlineStore): """ @@ -99,10 +119,16 @@ class DynamoDBOnlineStore(OnlineStore): _dynamodb_resource = None async def initialize(self, config: RepoConfig): + online_config = config.online_store + await _get_aiodynamodb_client( - config.online_store.region, - config.online_store.max_pool_connections, - config.online_store.keepalive_timeout, + online_config.region, + online_config.max_pool_connections, + online_config.keepalive_timeout, + online_config.connect_timeout, + online_config.read_timeout, + online_config.total_max_retry_attempts, + online_config.retry_mode, ) async def close(self): @@ -280,6 +306,10 @@ async def online_write_batch_async( online_config.region, online_config.max_pool_connections, online_config.keepalive_timeout, + online_config.connect_timeout, + online_config.read_timeout, + online_config.total_max_retry_attempts, + online_config.retry_mode, ) await dynamo_write_items_async(client, table_name, items) @@ -387,6 +417,10 @@ def to_tbl_resp(raw_client_response): online_config.region, online_config.max_pool_connections, online_config.keepalive_timeout, + online_config.connect_timeout, + online_config.read_timeout, + online_config.total_max_retry_attempts, + online_config.retry_mode, ) response_batches = await asyncio.gather( *[ @@ -546,16 +580,32 @@ def _get_aioboto_session(): async def _get_aiodynamodb_client( - region: str, max_pool_connections: int, keepalive_timeout: float + region: str, + max_pool_connections: int, + keepalive_timeout: float, + connect_timeout: Union[int, float], + read_timeout: Union[int, float], + total_max_retry_attempts: Union[int, None], + retry_mode: Union[Literal["legacy", "standard", "adaptive"], None], ): global _aioboto_client if _aioboto_client is None: logger.debug("initializing the aiobotocore dynamodb client") + + retries: Dict[str, Any] = {} + if total_max_retry_attempts is not None: + retries["total_max_attempts"] = total_max_retry_attempts + if retry_mode is not None: + retries["mode"] = retry_mode + client_context = _get_aioboto_session().create_client( "dynamodb", region_name=region, config=AioConfig( max_pool_connections=max_pool_connections, + connect_timeout=connect_timeout, + read_timeout=read_timeout, + retries=retries if retries else None, connector_args={"keepalive_timeout": keepalive_timeout}, ), )