From 4ab642ebf57625fa3c28154aebe5e2beaf549d1a Mon Sep 17 00:00:00 2001 From: Matthew Seal Date: Mon, 24 Nov 2025 11:33:56 -0800 Subject: [PATCH 1/7] Updated changelog for beta release and backfilled last hotfix --- CHANGELOG.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ce2b6a63b..eb1bf5f96 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,32 @@ # Confluent Python Client for Apache Kafka - CHANGELOG +## v2.13.0b1 (beta) - 2025-11-24 + +This is a beta release; these notes will be rolled into the full release entry later in the changelog. Changes included are: + +- Enforced type hinting for all interfaces +- Handle OAuth token refreshes +- Added black and isort linting rules and enforcement to codebase +- Fix support for wrapped Avro unions +- Enabled direct creation of Message objects +- Added `close()` method to producer +- Added context manager for librdkafka classes to enable easy scope cleanup +- Fixed segfault exceptions on calls against objects that had closed internal objects +- Handle evolution during field transformation of schemas +- Expose deterministic partitioner functions +- Handle null group name to prevent segfault in Admin `list_consumer_group_offsets()` +- Add Accept-Version header for schemas +- Ensure schemaId initialization is thread-safe +- Fix error propagation rule for Python's C API +- Fix SR delete behavior with client-side caching +- Enhanced the BufferTimeoutManager to flush the librdkafka queue + +## v2.12.2 - 2025-11-06 + +v2.12.2 is a hotfix for a critical problem found with Schema Registry clients in the 2.12.1 release: + +- Fix IndexOutOfBoundsException when evolving Avro schema that uses rules + ## v2.12.1 - 2025-10-21 v2.12.1 is a maintenance release with the following fixes: From fb6a518be3b6cc22ba79377cc187c37a283d7437 Mon Sep 17 00:00:00 2001 From: Matthew Seal Date: Mon, 24 Nov 2025 12:30:06 -0800 Subject: [PATCH 2/7] Moved experimental module up a level --- examples/asyncio_avro_producer.py | 2 +- examples/asyncio_example.py | 2 +- src/confluent_kafka/__init__.py | 2 +- src/confluent_kafka/experimental/__init__.py | 20 -- .../experimental/aio/_AIOConsumer.py | 169 --------- .../experimental/aio/__init__.py | 18 - .../experimental/aio/_common.py | 87 ----- .../experimental/aio/producer/_AIOProducer.py | 298 ---------------- .../experimental/aio/producer/__init__.py | 23 -- .../aio/producer/_buffer_timeout_manager.py | 143 -------- .../aio/producer/_kafka_batch_executor.py | 141 -------- .../aio/producer/_message_batch.py | 70 ---- .../aio/producer/_producer_batch_processor.py | 332 ------------------ tests/ducktape/consumer_strategy.py | 2 +- tests/ducktape/producer_strategy.py | 2 +- tests/test_AIOConsumer.py | 6 +- tests/test_AIOProducer.py | 6 +- tests/test_buffer_timeout_manager.py | 2 +- tests/test_kafka_batch_executor.py | 2 +- tests/test_producer_batch_processor.py | 10 +- 20 files changed, 18 insertions(+), 1319 deletions(-) delete mode 100644 src/confluent_kafka/experimental/__init__.py delete mode 100644 src/confluent_kafka/experimental/aio/_AIOConsumer.py delete mode 100644 src/confluent_kafka/experimental/aio/__init__.py delete mode 100644 src/confluent_kafka/experimental/aio/_common.py delete mode 100644 src/confluent_kafka/experimental/aio/producer/_AIOProducer.py delete mode 100644 src/confluent_kafka/experimental/aio/producer/__init__.py delete mode 100644 src/confluent_kafka/experimental/aio/producer/_buffer_timeout_manager.py delete mode 100644 
src/confluent_kafka/experimental/aio/producer/_kafka_batch_executor.py delete mode 100644 src/confluent_kafka/experimental/aio/producer/_message_batch.py delete mode 100644 src/confluent_kafka/experimental/aio/producer/_producer_batch_processor.py diff --git a/examples/asyncio_avro_producer.py b/examples/asyncio_avro_producer.py index 0aeb20f77..e779d3170 100644 --- a/examples/asyncio_avro_producer.py +++ b/examples/asyncio_avro_producer.py @@ -20,7 +20,7 @@ import argparse import asyncio -from confluent_kafka.experimental.aio import AIOProducer +from confluent_kafka.aio import AIOProducer from confluent_kafka.schema_registry import AsyncSchemaRegistryClient from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer from confluent_kafka.serialization import MessageField, SerializationContext diff --git a/examples/asyncio_example.py b/examples/asyncio_example.py index 0a16a745d..470a2c8ed 100644 --- a/examples/asyncio_example.py +++ b/examples/asyncio_example.py @@ -20,7 +20,7 @@ import signal import sys -from confluent_kafka.experimental.aio import AIOConsumer, AIOProducer +from confluent_kafka.aio import AIOConsumer, AIOProducer # This example demonstrates comprehensive AsyncIO usage patterns with Kafka: # - Event loop safe callbacks that don't block the loop diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py index 2157426cb..f88f27a37 100644 --- a/src/confluent_kafka/__init__.py +++ b/src/confluent_kafka/__init__.py @@ -54,7 +54,7 @@ __all__ = [ "admin", "Consumer", - "experimental", + "aio", "KafkaError", "KafkaException", "kafkatest", diff --git a/src/confluent_kafka/experimental/__init__.py b/src/confluent_kafka/experimental/__init__.py deleted file mode 100644 index ebe62837e..000000000 --- a/src/confluent_kafka/experimental/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright 2025 Confluent Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Experimental APIs for confluent_kafka. - -These APIs are subject to change and may be removed or modified in -incompatible ways in future releases. -""" diff --git a/src/confluent_kafka/experimental/aio/_AIOConsumer.py b/src/confluent_kafka/experimental/aio/_AIOConsumer.py deleted file mode 100644 index 0ff986a62..000000000 --- a/src/confluent_kafka/experimental/aio/_AIOConsumer.py +++ /dev/null @@ -1,169 +0,0 @@ -# Copyright 2025 Confluent Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import asyncio -import concurrent.futures -from typing import Any, Callable, Dict, Optional, Tuple - -import confluent_kafka - -from . import _common as _common - - -class AIOConsumer: - def __init__( - self, - consumer_conf: Dict[str, Any], - max_workers: int = 2, - executor: Optional[concurrent.futures.Executor] = None, - ) -> None: - if executor is not None: - # Executor must have at least one worker. - # At least two workers are needed when calling re-entrant - # methods from callbacks. - self.executor = executor - else: - if max_workers < 1: - raise ValueError("max_workers must be at least 1") - self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) - - loop = asyncio.get_event_loop() - wrap_common_callbacks = _common.wrap_common_callbacks - wrap_conf_callback = _common.wrap_conf_callback - wrap_common_callbacks(loop, consumer_conf) - wrap_conf_callback(loop, consumer_conf, 'on_commit') - - self._consumer: confluent_kafka.Consumer = confluent_kafka.Consumer(consumer_conf) - - async def _call(self, blocking_task: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: - return await _common.async_call(self.executor, blocking_task, *args, **kwargs) - - def _wrap_callback( - self, - loop: asyncio.AbstractEventLoop, - callback: Callable[..., Any], - edit_args: Optional[Callable[[Tuple[Any, ...]], Tuple[Any, ...]]] = None, - edit_kwargs: Optional[Callable[[Any], Any]] = None, - ) -> Callable[..., Any]: - def ret(*args: Any, **kwargs: Any) -> Any: - if edit_args: - args = edit_args(args) - if edit_kwargs: - kwargs = edit_kwargs(kwargs) - f = asyncio.run_coroutine_threadsafe(callback(*args, **kwargs), loop) - return f.result() - - return ret - - async def poll(self, *args: Any, **kwargs: Any) -> Any: - """ - Polls for a single message from the subscribed topics. - - Performance Note: - For high-throughput applications, prefer consume() over poll(): - consume() can retrieve multiple messages per call and amortize the - async overhead across the entire batch. - - On the other hand, poll() retrieves one message per call, which means - the ThreadPoolExecutor overhead is applied to each individual message. - This can result in lower throughput compared to the synchronous - consumer.poll() due to the async coordination overhead not being - amortized. - - """ - return await self._call(self._consumer.poll, *args, **kwargs) - - async def consume(self, *args: Any, **kwargs: Any) -> Any: - """ - Consumes a batch of messages from the subscribed topics. - - Performance Note: - This method is recommended for high-throughput applications. - - By retrieving multiple messages per ThreadPoolExecutor call, the async - coordination overhead is shared across all messages in the batch, - resulting in much better throughput compared to repeated poll() calls. 
- """ - return await self._call(self._consumer.consume, *args, **kwargs) - - def _edit_rebalance_callbacks_args(self, args: Tuple[Any, ...]) -> Tuple[Any, ...]: - args_list = list(args) - args_list[0] = self - return tuple(args_list) - - async def subscribe(self, *args: Any, **kwargs: Any) -> Any: - loop = asyncio.get_event_loop() - for callback in ['on_assign', 'on_revoke', 'on_lost']: - if callback in kwargs: - kwargs[callback] = self._wrap_callback( - loop, kwargs[callback], self._edit_rebalance_callbacks_args - ) # noqa: E501 - return await self._call(self._consumer.subscribe, *args, **kwargs) - - async def unsubscribe(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.unsubscribe, *args, **kwargs) - - async def commit(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.commit, *args, **kwargs) - - async def close(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.close, *args, **kwargs) - - async def seek(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.seek, *args, **kwargs) - - async def pause(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.pause, *args, **kwargs) - - async def resume(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.resume, *args, **kwargs) - - async def store_offsets(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.store_offsets, *args, **kwargs) - - async def committed(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.committed, *args, **kwargs) - - async def assign(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.assign, *args, **kwargs) - - async def unassign(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.unassign, *args, **kwargs) - - async def incremental_assign(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.incremental_assign, *args, **kwargs) - - async def incremental_unassign(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.incremental_unassign, *args, **kwargs) - - async def assignment(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.assignment, *args, **kwargs) - - async def position(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.position, *args, **kwargs) - - async def consumer_group_metadata(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.consumer_group_metadata, *args, **kwargs) - - async def set_sasl_credentials(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.set_sasl_credentials, *args, **kwargs) - - async def list_topics(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.list_topics, *args, **kwargs) - - async def get_watermark_offsets(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.get_watermark_offsets, *args, **kwargs) - - async def offsets_for_times(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._consumer.offsets_for_times, *args, **kwargs) diff --git a/src/confluent_kafka/experimental/aio/__init__.py b/src/confluent_kafka/experimental/aio/__init__.py deleted file mode 100644 index 098d0b922..000000000 --- a/src/confluent_kafka/experimental/aio/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright 2025 Confluent Inc. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ._AIOConsumer import AIOConsumer -from .producer import AIOProducer - -__all__ = ['AIOConsumer', 'AIOProducer'] diff --git a/src/confluent_kafka/experimental/aio/_common.py b/src/confluent_kafka/experimental/aio/_common.py deleted file mode 100644 index 128d7b0f9..000000000 --- a/src/confluent_kafka/experimental/aio/_common.py +++ /dev/null @@ -1,87 +0,0 @@ -# Copyright 2025 Confluent Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio -import concurrent.futures -import functools -import logging -from typing import Any, Callable, Dict, Optional, Tuple, TypeVar - -T = TypeVar('T') - - -class AsyncLogger: - - def __init__(self, loop: asyncio.AbstractEventLoop, logger: logging.Logger) -> None: - self.loop = loop - self.logger = logger - - def log(self, *args: Any, **kwargs: Any) -> None: - self.loop.call_soon_threadsafe(lambda: self.logger.log(*args, **kwargs)) - - -def wrap_callback( - loop: asyncio.AbstractEventLoop, - callback: Callable[..., Any], - edit_args: Optional[Callable[[Tuple[Any, ...]], Tuple[Any, ...]]] = None, - edit_kwargs: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None, -) -> Callable[..., Any]: - def ret(*args: Any, **kwargs: Any) -> Any: - if edit_args: - args = edit_args(args) - if edit_kwargs: - kwargs = edit_kwargs(kwargs) - f = asyncio.run_coroutine_threadsafe(callback(*args, **kwargs), loop) - return f.result() - - return ret - - -def wrap_conf_callback(loop: asyncio.AbstractEventLoop, conf: Dict[str, Any], name: str) -> None: - if name in conf: - cb = conf[name] - conf[name] = wrap_callback(loop, cb) - - -def wrap_conf_logger(loop: asyncio.AbstractEventLoop, conf: Dict[str, Any]) -> None: - if 'logger' in conf: - conf['logger'] = AsyncLogger(loop, conf['logger']) - - -async def async_call( - executor: concurrent.futures.Executor, blocking_task: Callable[..., T], *args: Any, **kwargs: Any -) -> T: - """Helper function for blocking operations that need ThreadPool execution - - Args: - executor: ThreadPoolExecutor to use for blocking operations - blocking_task: The blocking function to execute - *args, **kwargs: Arguments to pass to the blocking function - - Returns: - Result of the blocking function execution - """ - return ( - await asyncio.gather( - asyncio.get_running_loop().run_in_executor(executor, functools.partial(blocking_task, *args, **kwargs)) - ) - )[0] - - -def wrap_common_callbacks(loop: asyncio.AbstractEventLoop, conf: Dict[str, Any]) -> None: - wrap_conf_callback(loop, conf, 
'error_cb') - wrap_conf_callback(loop, conf, 'throttle_cb') - wrap_conf_callback(loop, conf, 'stats_cb') - wrap_conf_callback(loop, conf, 'oauth_cb') - wrap_conf_logger(loop, conf) diff --git a/src/confluent_kafka/experimental/aio/producer/_AIOProducer.py b/src/confluent_kafka/experimental/aio/producer/_AIOProducer.py deleted file mode 100644 index 922e46349..000000000 --- a/src/confluent_kafka/experimental/aio/producer/_AIOProducer.py +++ /dev/null @@ -1,298 +0,0 @@ -# Copyright 2025 Confluent Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio -import concurrent.futures -import logging -from typing import Any, Callable, Dict, Optional - -import confluent_kafka - -from .. import _common as _common -from ._buffer_timeout_manager import BufferTimeoutManager -from ._kafka_batch_executor import ProducerBatchExecutor -from ._producer_batch_processor import ProducerBatchManager - -logger = logging.getLogger(__name__) - - -class AIOProducer: - - # ======================================================================== - # INITIALIZATION AND LIFECYCLE MANAGEMENT - # ======================================================================== - - def __init__( - self, - producer_conf: Dict[str, Any], - max_workers: int = 4, - executor: Optional[concurrent.futures.Executor] = None, - batch_size: int = 1000, - buffer_timeout: float = 1.0, - ) -> None: - if executor is not None: - self.executor = executor - else: - self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) - # Store the event loop for async operations - self._loop = asyncio.get_running_loop() - - wrap_common_callbacks = _common.wrap_common_callbacks - wrap_common_callbacks(self._loop, producer_conf) - - self._producer: confluent_kafka.Producer = confluent_kafka.Producer(producer_conf) - - # Batching configuration - self._batch_size: int = batch_size - - # Producer state management - self._is_closed: bool = False # Track if producer is closed - - # Initialize Kafka batch executor for handling Kafka operations - self._kafka_executor = ProducerBatchExecutor(self._producer, self.executor) - - # Initialize batch processor for message batching and processing - self._batch_processor = ProducerBatchManager(self._kafka_executor) - - # Initialize buffer timeout manager for timeout handling - self._buffer_timeout_manager = BufferTimeoutManager(self._batch_processor, self._kafka_executor, buffer_timeout) - if buffer_timeout > 0: - self._buffer_timeout_manager.start_timeout_monitoring() - - async def close(self) -> None: - """Close the producer and cleanup resources - - This method performs a graceful shutdown sequence to ensure all resources - are properly cleaned up and no messages are lost: - - 1. **Signal Shutdown**: Sets the closed flag to signal the timeout task to stop - 2. **Cancel Timeout Task**: Immediately cancels the buffer timeout monitoring task - 3. **Flush All Messages**: Flushes any buffered messages and waits for delivery confirmation - 4. 
**Shutdown ThreadPool**: Waits for all pending ThreadPool operations to complete - 5. **Cleanup**: Ensures the underlying librdkafka producer is properly closed. The shutdown - is designed to be safe and non-blocking for the asyncio event loop - while ensuring all pending operations complete before the producer is closed. - - Raises: - Exception: May raise exceptions from buffer flushing, but these are logged - and don't prevent the cleanup process from completing. - """ - # Set closed flag to signal timeout task to stop - self._is_closed = True - - # Stop the buffer timeout monitoring task - self._buffer_timeout_manager.stop_timeout_monitoring() - - # Flush any remaining messages - try: - await self.flush() - except Exception: - logger.error("Error flushing messages during close", exc_info=True) - raise - - # Shutdown the ThreadPool executor and wait for any remaining tasks to complete - # This ensures that all pending poll(), flush(), and other blocking operations - # finish before the producer is considered fully closed - if hasattr(self, 'executor'): - # executor.shutdown(wait=True) is a blocking call that: - # - Prevents new tasks from being submitted to the ThreadPool - # - Waits for all currently executing and queued tasks to complete - # - Returns only when all worker threads have finished - # - # We run this in a separate thread (using None as executor) to avoid - # blocking the asyncio event loop during the potentially long shutdown wait - await asyncio.get_running_loop().run_in_executor(None, self.executor.shutdown, True) - - def __del__(self) -> None: - """Cleanup method called during garbage collection - - This ensures that the timeout task is properly cancelled even if - close() wasn't explicitly called. - """ - if hasattr(self, '_is_closed'): - self._is_closed = True - if hasattr(self, '_buffer_timeout_manager'): - self._buffer_timeout_manager.stop_timeout_monitoring() - - # ======================================================================== - # CORE PRODUCER OPERATIONS - Main public API - # ======================================================================== - - async def poll(self, timeout: float = 0, *args: Any, **kwargs: Any) -> int: - """Processes delivery callbacks from librdkafka - blocking depends on timeout - - This method triggers any pending delivery reports that have been - queued by librdkafka when messages are delivered or fail to deliver. 
- - Args: - timeout: Timeout in seconds for waiting for callbacks: - - 0 = non-blocking, return after processing available callbacks - - >0 = block up to timeout seconds waiting for new callbacks - - -1 = block indefinitely until callbacks are available - - Returns: - Number of callbacks processed during this call - """ - return await self._call(self._producer.poll, timeout, *args, **kwargs) - - async def produce( - self, topic: str, value: Optional[Any] = None, key: Optional[Any] = None, *args: Any, **kwargs: Any - ) -> asyncio.Future[Any]: - """Batched produce: Accumulates messages in buffer and flushes when threshold reached - - Args: - topic: Kafka topic name (required) - value: Message payload (optional) - key: Message key (optional) - *args, **kwargs: Additional parameters like partition, timestamp, headers - - Returns: - asyncio.Future: Future that resolves to the delivered message or raises exception on failure - """ - result = asyncio.get_running_loop().create_future() - - msg_data = {'topic': topic, 'value': value, 'key': key} - - # Add optional parameters to message data - if 'partition' in kwargs: - msg_data['partition'] = kwargs['partition'] - if 'timestamp' in kwargs: - msg_data['timestamp'] = kwargs['timestamp'] - if 'headers' in kwargs: - # Headers are not supported in batch mode due to librdkafka API limitations. - # Use individual synchronous produce() calls if headers are required. - raise NotImplementedError( - "Headers are not supported in AIOProducer batch mode. " - "Use the synchronous Producer.produce() method if headers are required." - ) - - self._batch_processor.add_message(msg_data, result) - - self._buffer_timeout_manager.mark_activity() - - # Check if we should flush the buffer - if self._batch_processor.get_buffer_size() >= self._batch_size: - await self._flush_buffer() - - return result - - async def flush(self, *args: Any, **kwargs: Any) -> Any: - """Waits until all messages are delivered or timeout - - This method performs a complete flush: - 1. Flushes any buffered messages from local buffer to librdkafka - 2. 
Waits for librdkafka to deliver/acknowledge all messages - """ - # First, flush any remaining messages in the buffer for all topics - if not self._batch_processor.is_buffer_empty(): - await self._flush_buffer() - # Update buffer activity since we just flushed - self._buffer_timeout_manager.mark_activity() - - # Then flush the underlying producer and wait for delivery confirmation - return await self._call(self._producer.flush, *args, **kwargs) - - async def purge(self, *args: Any, **kwargs: Any) -> Any: - """Purges messages from internal queues - may block during cleanup""" - # Cancel all pending futures - self._batch_processor.cancel_pending_futures() - - # Clear local message buffer and futures - self._batch_processor.clear_buffer() - - # Update buffer activity since we cleared the buffer - self._buffer_timeout_manager.mark_activity() - - return await self._call(self._producer.purge, *args, **kwargs) - - async def list_topics(self, *args: Any, **kwargs: Any) -> Any: - return await self._call(self._producer.list_topics, *args, **kwargs) - - # ======================================================================== - # TRANSACTION OPERATIONS - Kafka transaction support - # ======================================================================== - - async def init_transactions(self, *args: Any, **kwargs: Any) -> Any: - """Network call to initialize transactions""" - return await self._call(self._producer.init_transactions, *args, **kwargs) - - async def begin_transaction(self, *args: Any, **kwargs: Any) -> Any: - """Network call to begin transaction""" - - # Flush messages to set a clean state before entering a transaction - await self.flush() - - return await self._call(self._producer.begin_transaction, *args, **kwargs) - - async def send_offsets_to_transaction(self, *args: Any, **kwargs: Any) -> Any: - """Network call to send offsets to transaction""" - return await self._call(self._producer.send_offsets_to_transaction, *args, **kwargs) - - async def commit_transaction(self, *args: Any, **kwargs: Any) -> Any: - """Commit transaction after flushing all buffered messages""" - - # Flush to ensure messages in the local batch_processor buffer are - # delivered to librdkafka - await self.flush() - - # Then commit transaction - return await self._call(self._producer.commit_transaction, *args, **kwargs) - - async def abort_transaction(self, *args: Any, **kwargs: Any) -> Any: - """Network call to abort transaction - - Messages produced before the call (i.e. inside the transaction boundary) will be aborted. - Messages that are still in flight may be failed by librdkafka as they are considered - outside the transaction boundary. 
- Refer to librdkafka documentation section "Transactional producer API" - for more details: - https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#transactional-producer - """ - - # Flush to ensure messages in the local batch_processor buffer are - # delivered to librdkafka - await self.flush() - - return await self._call(self._producer.abort_transaction, *args, **kwargs) - - # ======================================================================== - # AUTHENTICATION AND SECURITY - # ======================================================================== - - async def set_sasl_credentials(self, *args: Any, **kwargs: Any) -> Any: - """Authentication operation that may involve network calls""" - return await self._call(self._producer.set_sasl_credentials, *args, **kwargs) - - # ======================================================================== - # BATCH PROCESSING OPERATIONS - Delegated to BatchProcessor - # ======================================================================== - - async def _flush_buffer(self, target_topic: Optional[str] = None) -> None: - """Flush the current message buffer using clean batch processing flow - - This method demonstrates the new architecture where AIOProducer simply - orchestrates the workflow between components: - 1. BatchProcessor creates immutable MessageBatch objects - 2. ProducerBatchExecutor executes each batch - 3. BufferTimeoutManager handles activity tracking - """ - await self._batch_processor.flush_buffer(target_topic) - - # ======================================================================== - # UTILITY METHODS - Helper functions and internal utilities - # ======================================================================== - - async def _call(self, blocking_task: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: - """Helper method for blocking operations that need ThreadPool execution""" - return await _common.async_call(self.executor, blocking_task, *args, **kwargs) diff --git a/src/confluent_kafka/experimental/aio/producer/__init__.py b/src/confluent_kafka/experimental/aio/producer/__init__.py deleted file mode 100644 index cbdd46cd1..000000000 --- a/src/confluent_kafka/experimental/aio/producer/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright 2025 Confluent Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Confluent Kafka Experimental AIOProducer Module - -This module contains all the components for the async Kafka producer. -""" - -from ._AIOProducer import AIOProducer - -__all__ = ['AIOProducer'] diff --git a/src/confluent_kafka/experimental/aio/producer/_buffer_timeout_manager.py b/src/confluent_kafka/experimental/aio/producer/_buffer_timeout_manager.py deleted file mode 100644 index 958f6f0b1..000000000 --- a/src/confluent_kafka/experimental/aio/producer/_buffer_timeout_manager.py +++ /dev/null @@ -1,143 +0,0 @@ -# Copyright 2025 Confluent Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio -import logging -import time -import weakref -from typing import TYPE_CHECKING, Optional - -if TYPE_CHECKING: - # Import only for type checking to avoid circular dependency - from ._kafka_batch_executor import ProducerBatchExecutor - from ._producer_batch_processor import ProducerBatchManager - -logger = logging.getLogger(__name__) - - -class BufferTimeoutManager: - """Manages buffer timeout and activity tracking for message batching - - This class is responsible for: - - Monitoring buffer inactivity and triggering automatic flushes - - Tracking buffer activity timestamps - - Managing background timeout monitoring tasks - - Coordinating between batch processor and executor for timeout flushes - """ - - def __init__( - self, batch_processor: "ProducerBatchManager", kafka_executor: "ProducerBatchExecutor", timeout: float - ) -> None: - """Initialize the buffer timeout manager - - Args: - batch_processor: ProducerBatchManager instance for creating batches - kafka_executor: ProducerBatchExecutor instance for executing batches - timeout: Timeout in seconds for buffer inactivity (0 disables timeout) - """ - self._batch_processor = batch_processor - self._kafka_executor = kafka_executor - self._timeout = timeout - self._last_activity: float = time.time() - self._timeout_task: Optional[asyncio.Task[None]] = None - self._running: bool = False - - def start_timeout_monitoring(self) -> None: - """Start the background task that monitors buffer inactivity - - Creates an async task that runs in the background and periodically checks - if messages have been sitting in the buffer for too long without being - flushed. - - Key design decisions: - 1. **Weak Reference**: Uses weakref.ref(self) to prevent circular refs - 2. **Self-Canceling**: The task stops itself if manager is GC'd - 3. **Adaptive Check Interval**: Uses timeout to determine check frequency - """ - if not self._timeout or self._timeout <= 0: - return # Timeout disabled - - self._running = True - self._timeout_task = asyncio.create_task(self._monitor_timeout()) - - def stop_timeout_monitoring(self) -> None: - """Stop and cleanup the buffer timeout monitoring task""" - self._running = False - if self._timeout_task and not self._timeout_task.done(): - self._timeout_task.cancel() - self._timeout_task = None - - def mark_activity(self) -> None: - """Update the timestamp of the last buffer activity - - This method should be called whenever: - 1. Messages are added to the buffer (in produce()) - 2. Buffer is manually flushed - 3. Buffer is purged/cleared - """ - self._last_activity = time.time() - - async def _monitor_timeout(self) -> None: - """Monitor buffer timeout in background task - - This method runs continuously in the background, checking for buffer - inactivity and triggering flushes when the timeout threshold is exceeded. 
- """ - # Use weak reference to avoid circular reference and allow garbage collection - manager_ref = weakref.ref(self) - - while True: - # Check interval should be proportional to buffer timeout for efficiency - manager = manager_ref() - if manager is None or not manager._running: - break - - # Calculate adaptive check interval: timeout/2 with bounds - # Examples: 0.1s→0.1s, 1s→0.5s, 5s→1.0s, 30s→1.0s - check_interval = max(0.1, min(1.0, manager._timeout / 2)) - await asyncio.sleep(check_interval) - - # Re-check manager after sleep - manager = manager_ref() - if manager is None or not manager._running: - break - - # Check if buffer has been inactive for too long - time_since_activity = time.time() - manager._last_activity - if time_since_activity >= manager._timeout: - - try: - # Flush the buffer due to timeout - await manager._flush_buffer_due_to_timeout() - # Update activity since we just flushed - manager.mark_activity() - except Exception: - logger.error("Error flushing buffer due to timeout", exc_info=True) - # Re-raise all exceptions - don't swallow any errors - raise - - async def _flush_buffer_due_to_timeout(self) -> None: - """Flush buffer due to timeout by coordinating batch processor/executor - - This method handles the complete timeout flush workflow: - 1. Create batches from the batch processor - 2. Execute batches from the batch processor - 3. Flush librdkafka queue to ensure messages are delivered - """ - # Create batches from current buffer and send to librdkafka queue - await self._batch_processor.flush_buffer() - - # Flush librdkafka queue to ensure messages are delivered to broker - # 0 timeout means non-blocking flush - await self._kafka_executor.flush_librdkafka_queue(0) diff --git a/src/confluent_kafka/experimental/aio/producer/_kafka_batch_executor.py b/src/confluent_kafka/experimental/aio/producer/_kafka_batch_executor.py deleted file mode 100644 index b2bd477e7..000000000 --- a/src/confluent_kafka/experimental/aio/producer/_kafka_batch_executor.py +++ /dev/null @@ -1,141 +0,0 @@ -# Copyright 2025 Confluent Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio -import concurrent.futures -import logging -from typing import Any, Dict, List, Sequence - -import confluent_kafka - -from .. 
import _common - -logger = logging.getLogger(__name__) - - -class ProducerBatchExecutor: - """Executes Kafka batch operations via thread pool - - This class is responsible for: - - Executing produce_batch operations against confluent_kafka.Producer - - Handling partial batch failures from librdkafka - - Managing thread pool execution to avoid blocking the event loop - - Processing delivery callbacks for successful messages - - Supporting partition-specific batch operations - """ - - def __init__(self, producer: confluent_kafka.Producer, executor: concurrent.futures.Executor) -> None: - """Initialize the Kafka batch executor - - Args: - producer: confluent_kafka.Producer instance for Kafka operations - executor: ThreadPoolExecutor for running blocking operations - """ - self._producer = producer - self._executor = executor - - async def execute_batch(self, topic: str, batch_messages: Sequence[Dict[str, Any]], partition: int = -1) -> int: - """Execute a batch operation via thread pool - - This method handles the complete batch execution workflow: - 1. Execute produce_batch in thread pool to avoid blocking event loop - 2. Handle partial failures that occur during produce_batch - 3. Poll for delivery reports of successful messages - - Args: - topic: Target topic for the batch - batch_messages: List of prepared messages with callbacks assigned - partition: Target partition for the batch (-1 = RD_KAFKA_PARTITION_UA) - - Returns: - Result from producer.poll() indicating number of delivery reports processed - - Raises: - Exception: Any exception from the batch operation is propagated - """ - - def _produce_batch_and_poll() -> int: - """Helper function to run in thread pool - - This function encapsulates all the blocking Kafka operations: - - Call produce_batch with specific partition and individual message callbacks - - Handle partial batch failures for messages that fail immediately - - Poll for delivery reports to trigger callbacks for successful messages - """ - # Call produce_batch with specific partition and individual callbacks - # Convert tuple to list since produce_batch expects a list - messages_list: List[Dict[str, Any]] = ( - list(batch_messages) if isinstance(batch_messages, tuple) else batch_messages # type: ignore - ) - - # Use the provided partition for the entire batch - # This enables proper partition control while working around librdkafka limitations - self._producer.produce_batch(topic, messages_list, partition=partition) - - # Handle partial batch failures: Check for messages that failed - # during produce_batch. These messages have their msgstates - # destroyed in Producer.c and won't get callbacks from librdkafka, - # so we need to manually invoke their callbacks - self._handle_partial_failures(messages_list) - - # Immediately poll to process delivery callbacks for successful messages - poll_result = self._producer.poll(0) - - return poll_result - - # Execute in thread pool to avoid blocking event loop - loop = asyncio.get_running_loop() - return await loop.run_in_executor(self._executor, _produce_batch_and_poll) - - async def flush_librdkafka_queue(self, timeout=-1): - """Flush the librdkafka queue and wait for all messages to be delivered - This method awaits until all outstanding produce requests are completed - or the timeout is reached, unless the timeout is set to 0 (non-blocking). 
- Args: - timeout: Maximum time to wait in seconds: - - -1 = wait indefinitely (default) - - 0 = non-blocking, return immediately - - >0 = wait up to timeout seconds - Returns: - Number of messages still in queue after flush attempt - """ - return await _common.async_call(self._executor, self._producer.flush, timeout) - - def _handle_partial_failures(self, batch_messages: List[Dict[str, Any]]) -> None: - """Handle messages that failed during produce_batch - - When produce_batch encounters messages that fail immediately (e.g., - message too large, invalid topic, etc.), librdkafka destroys their - msgstates and won't call their callbacks. We detect these failures by - checking for '_error' in the message dict (set by Producer.c) and - manually invoke the simple future-resolving callbacks. - - Args: - batch_messages: List of message dictionaries that were passed to produce_batch - """ - for msg_dict in batch_messages: - if '_error' in msg_dict: - # This message failed during produce_batch - its callback - # won't be called by librdkafka - callback = msg_dict.get('callback') - if callback: - # Extract the error from the message dict (set by Producer.c) - error = msg_dict['_error'] - # Manually invoke the callback with the error - # Note: msg is None since the message failed before being queued - try: - callback(error, None) - except Exception: - logger.warning("Exception in callback during partial failure handling", exc_info=True) - raise diff --git a/src/confluent_kafka/experimental/aio/producer/_message_batch.py b/src/confluent_kafka/experimental/aio/producer/_message_batch.py deleted file mode 100644 index 506209291..000000000 --- a/src/confluent_kafka/experimental/aio/producer/_message_batch.py +++ /dev/null @@ -1,70 +0,0 @@ -# Copyright 2025 Confluent Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio -from typing import Any, Dict, NamedTuple, Optional, Sequence - - -# Create immutable MessageBatch value object using modern typing -class MessageBatch(NamedTuple): - """Immutable batch of messages for Kafka production - - This represents a group of messages destined for the same topic and partition, - along with their associated futures for delivery confirmation. 
- """ - - topic: str # Target topic for this batch - messages: Sequence[Dict[str, Any]] # Prepared message dictionaries - futures: Sequence[asyncio.Future[Any]] # Futures to resolve on delivery - partition: int = -1 # Target partition for this batch (-1 = RD_KAFKA_PARTITION_UA) - - @property - def size(self) -> int: - """Get the number of messages in this batch""" - return len(self.messages) - - @property - def info(self) -> str: - """Get a string representation of batch info""" - return f"MessageBatch(topic='{self.topic}', partition={self.partition}, size={len(self.messages)})" - - -def create_message_batch( - topic: str, - messages: Sequence[Dict[str, Any]], - futures: Sequence[asyncio.Future[Any]], - callbacks: Optional[Any] = None, - partition: int = -1, -) -> MessageBatch: - """Create an immutable MessageBatch from sequences - - This factory function converts mutable sequences into an immutable MessageBatch object. - Uses tuples internally for immutability while accepting any sequence type as input. - - Args: - topic: Target topic name - messages: Sequence of prepared message dictionaries - futures: Sequence of asyncio.Future objects - callbacks: Deprecated parameter, ignored for backwards compatibility - partition: Target partition for this batch (-1 = RD_KAFKA_PARTITION_UA) - - Returns: - MessageBatch: Immutable batch object - """ - return MessageBatch( - topic=topic, - messages=tuple(messages) if not isinstance(messages, tuple) else messages, - futures=tuple(futures) if not isinstance(futures, tuple) else futures, - partition=partition, - ) diff --git a/src/confluent_kafka/experimental/aio/producer/_producer_batch_processor.py b/src/confluent_kafka/experimental/aio/producer/_producer_batch_processor.py deleted file mode 100644 index cfbf4a9f1..000000000 --- a/src/confluent_kafka/experimental/aio/producer/_producer_batch_processor.py +++ /dev/null @@ -1,332 +0,0 @@ -# Copyright 2025 Confluent Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import asyncio -import copy -import logging -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Tuple - -from confluent_kafka import KafkaException as _KafkaException - -from ._message_batch import MessageBatch, create_message_batch - -if TYPE_CHECKING: - # Import only for type checking to avoid circular dependency - from ._kafka_batch_executor import ProducerBatchExecutor - -logger = logging.getLogger(__name__) - - -class ProducerBatchManager: - """Handles batching and processing of Kafka messages for AIOProducer - - This class encapsulates all the logic for: - - Grouping messages by topic and partition - - Managing message buffers and futures - - Creating simple future-resolving callbacks - - Executing batch operations via librdkafka - """ - - def __init__(self, kafka_executor: "ProducerBatchExecutor") -> None: - """Initialize the batch processor - - Args: - kafka_executor: KafkaBatchExecutor instance for Kafka operations - """ - self._kafka_executor = kafka_executor - self._message_buffer: List[Dict[str, Any]] = [] - self._buffer_futures: List[asyncio.Future[Any]] = [] - - def add_message(self, msg_data: Dict[str, Any], future: asyncio.Future[Any]) -> None: - """Add a message to the batch buffer - - Args: - msg_data: Dictionary containing message data - future: asyncio.Future to resolve when message is delivered - """ - self._message_buffer.append(msg_data) - self._buffer_futures.append(future) - - def get_buffer_size(self) -> int: - """Get the current number of messages in the buffer""" - return len(self._message_buffer) - - def is_buffer_empty(self) -> bool: - """Check if the buffer is empty""" - return len(self._message_buffer) == 0 - - def clear_buffer(self) -> None: - """Clear the entire buffer""" - self._message_buffer.clear() - self._buffer_futures.clear() - - def cancel_pending_futures(self) -> None: - """Cancel all pending futures in the buffer""" - for future in self._buffer_futures: - if not future.done(): - future.cancel() - - def create_batches(self, target_topic: Optional[str] = None) -> List[MessageBatch]: - """Create MessageBatch objects from the current buffer - - Args: - target_topic: Optional topic to create batches for (None for all) - - Returns: - List[MessageBatch]: List of immutable MessageBatch objects - """ - if self.is_buffer_empty(): - return [] - - # Group by topic and partition for optimal batching - topic_partition_groups = self._group_messages_by_topic_and_partition() - batches = [] - - for (topic, partition), group_data in topic_partition_groups.items(): - if target_topic is None or topic == target_topic: - # Prepare batch messages - batch_messages = self._prepare_batch_messages(group_data["messages"]) - - # Assign simple future-resolving callbacks to messages - self._assign_future_callbacks(batch_messages, group_data["futures"]) - - # Create immutable MessageBatch object with partition info - batch = create_message_batch( - topic=topic, - messages=batch_messages, - futures=group_data["futures"], - callbacks=None, # No user callbacks anymore - partition=partition, # Add partition info to batch - ) - batches.append(batch) - - return batches - - def _clear_topic_from_buffer(self, target_topic: str) -> None: - """Remove messages for a specific topic from the buffer - - Args: - target_topic: Topic to remove from buffer - """ - messages_to_keep = [] - futures_to_keep = [] - - for i, msg_data in enumerate(self._message_buffer): - if msg_data["topic"] != target_topic: - messages_to_keep.append(msg_data) - 
futures_to_keep.append(self._buffer_futures[i]) - - self._message_buffer = messages_to_keep - self._buffer_futures = futures_to_keep - - async def flush_buffer(self, target_topic: Optional[str] = None) -> None: - """Flush the current message buffer using produce_batch - - Args: - target_topic: Optional topic to flush (None for all topics) - - Returns: - None - """ - if self.is_buffer_empty(): - return - - # Create batches for processing - batches = self.create_batches(target_topic) - - # Clear the buffer immediately to prevent race conditions - if target_topic is None: - # Clear entire buffer since we're processing all messages - self.clear_buffer() - else: - # Clear only messages for the target topic that we're processing - self._clear_topic_from_buffer(target_topic) - - try: - # Execute batches with cleanup - await self._execute_batches(batches, target_topic) - except Exception: - # Add batches back to buffer on failure - try: - self._add_batches_back_to_buffer(batches) - except Exception: - logger.error(f"Error adding batches back to buffer on failure. messages might be lost: {batches}") - raise - raise - - async def _execute_batches(self, batches: List[MessageBatch], target_topic: Optional[str] = None) -> None: - """Execute batches and handle cleanup after successful execution - - Args: - batches: List of batches to execute - target_topic: Optional topic for selective buffer clearing - - Returns: - None - - Raises: - Exception: If any batch execution fails - """ - # Execute each batch - for batch in batches: - try: - # Execute batch using the Kafka executor - await self._kafka_executor.execute_batch(batch.topic, batch.messages, batch.partition) - - except Exception as e: - # Handle batch failure by failing all unresolved futures for this batch - self._handle_batch_failure(e, batch.futures) - # Re-raise the exception so caller knows the batch operation failed - raise - - def _add_batches_back_to_buffer(self, batches: List[MessageBatch]) -> None: - """Add batches back to the buffer when execution fails - - Args: - batches: List of MessageBatch objects to add back to buffer - """ - for batch in batches: - # Add each message and its future back to the buffer - for i, message in enumerate(batch.messages): - # Reconstruct the original message data from the batch - msg_data = { - 'topic': batch.topic, - 'value': message.get('value'), - 'key': message.get('key'), - } - - # Add optional fields if present - if 'partition' in message: - msg_data['partition'] = message['partition'] - if 'timestamp' in message: - msg_data['timestamp'] = message['timestamp'] - if 'headers' in message: - msg_data['headers'] = message['headers'] - - # Add the message and its future back to the buffer - self._message_buffer.append(msg_data) - self._buffer_futures.append(batch.futures[i]) - - def _group_messages_by_topic_and_partition(self) -> Dict[Tuple[str, int], Dict[str, List[Any]]]: - """Group buffered messages by topic and partition for optimal batching - - This function efficiently organizes the mixed-topic message buffer into - topic+partition-specific groups, enabling proper partition control while - maintaining batch efficiency. 
- - Algorithm: - - Single O(n) pass through message buffer - - Groups related data (messages, futures) by (topic, partition) tuple - - Maintains index relationships between buffer arrays - - Uses partition from message data, defaults to RD_KAFKA_PARTITION_UA - (-1) if not specified - - Returns: - dict: Topic+partition groups with structure: - { - ('topic_name', partition): { - 'messages': [msg_data1, ...], # Message dicts - 'futures': [future1, ...], # asyncio.Future objects - } - } - """ - topic_partition_groups: Dict[Tuple[str, int], Dict[str, Any]] = {} - - # Iterate through buffer once - O(n) complexity - for i, msg_data in enumerate(self._message_buffer): - topic = msg_data["topic"] - # Get partition from message data, default to RD_KAFKA_PARTITION_UA (-1) if not specified - partition = msg_data.get("partition", -1) # -1 = RD_KAFKA_PARTITION_UA - - # Create composite key for grouping - group_key = (topic, partition) - - # Create new topic+partition group if this is first message for this combination - if group_key not in topic_partition_groups: - topic_partition_groups[group_key] = { - "messages": [], # Message data for produce_batch - "futures": [], # Futures to resolve on delivery - } - - # Add message and related data to appropriate topic+partition group - # Note: All arrays stay synchronized by index - topic_partition_groups[group_key]["messages"].append(msg_data) - topic_partition_groups[group_key]["futures"].append(self._buffer_futures[i]) - - return topic_partition_groups - - def _prepare_batch_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """Prepare messages for produce_batch by removing internal fields - - Args: - messages: List of message dictionaries - - Returns: - List of cleaned message dictionaries ready for produce_batch - """ - batch_messages = [] - for msg_data in messages: - # Create a shallow copy and remove fields not needed by produce_batch - batch_msg = copy.copy(msg_data) - batch_msg.pop("topic", None) # Remove topic since it's passed separately - # Note: We keep 'partition' in individual messages for reference, - # but the batch partition will be used by produce_batch - batch_messages.append(batch_msg) - - return batch_messages - - def _assign_future_callbacks( - self, batch_messages: List[Dict[str, Any]], futures: Sequence[asyncio.Future[Any]] - ) -> None: - """Assign simple future-resolving callbacks to each message in batch - - Args: - batch_messages: List of message dictionaries for produce_batch - futures: List of asyncio.Future objects to resolve - """ - for i, batch_msg in enumerate(batch_messages): - future = futures[i] - - def create_simple_callback(fut: asyncio.Future[Any]) -> Callable[[Any, Any], None]: - """Create a simple callback that only resolves the future""" - - def simple_callback(err: Any, msg: Any) -> None: - if err: - if not fut.done(): - fut.set_exception(_KafkaException(err)) - else: - if not fut.done(): - fut.set_result(msg) - - return simple_callback - - # Assign the simple callback to this message - batch_msg["callback"] = create_simple_callback(future) - - def _handle_batch_failure(self, exception: Exception, batch_futures: Sequence[asyncio.Future[Any]]) -> None: - """Handle batch operation failure by failing all unresolved futures - - When a batch operation fails before any individual callbacks are invoked, - we need to fail all futures for this batch since none of the per-message - callbacks will be called by librdkafka. 
- - Args: - exception: The exception that caused the batch to fail - batch_futures: List of futures for this batch - """ - # Fail all futures since no individual callbacks will be invoked - for future in batch_futures: - # Only set exception if future isn't already done - if not future.done(): - future.set_exception(exception) diff --git a/tests/ducktape/consumer_strategy.py b/tests/ducktape/consumer_strategy.py index 37855e0a6..0b3c1c886 100644 --- a/tests/ducktape/consumer_strategy.py +++ b/tests/ducktape/consumer_strategy.py @@ -11,7 +11,7 @@ import time from confluent_kafka import Consumer -from confluent_kafka.experimental.aio import AIOConsumer +from confluent_kafka.aio import AIOConsumer from confluent_kafka.schema_registry import AsyncSchemaRegistryClient, SchemaRegistryClient from confluent_kafka.schema_registry._async.avro import AsyncAvroDeserializer from confluent_kafka.schema_registry._async.json_schema import AsyncJSONDeserializer diff --git a/tests/ducktape/producer_strategy.py b/tests/ducktape/producer_strategy.py index aad0c9233..cae299f45 100644 --- a/tests/ducktape/producer_strategy.py +++ b/tests/ducktape/producer_strategy.py @@ -419,7 +419,7 @@ def create_producer(self, config_overrides=None): # Enable logging for AIOProducer import logging - from confluent_kafka.experimental.aio import AIOProducer + from confluent_kafka.aio import AIOProducer logging.basicConfig(level=logging.INFO) diff --git a/tests/test_AIOConsumer.py b/tests/test_AIOConsumer.py index b749a3f61..61ff6ae80 100644 --- a/tests/test_AIOConsumer.py +++ b/tests/test_AIOConsumer.py @@ -7,7 +7,7 @@ import pytest from confluent_kafka import KafkaError, KafkaException, TopicPartition -from confluent_kafka.experimental.aio._AIOConsumer import AIOConsumer +from confluent_kafka.aio._AIOConsumer import AIOConsumer class TestAIOConsumer: @@ -16,13 +16,13 @@ class TestAIOConsumer: @pytest.fixture def mock_consumer(self): """Mock the underlying confluent_kafka.Consumer.""" - with patch('confluent_kafka.experimental.aio._AIOConsumer.confluent_kafka.Consumer') as mock: + with patch('confluent_kafka.aio._AIOConsumer.confluent_kafka.Consumer') as mock: yield mock @pytest.fixture def mock_common(self): """Mock the _common module callback wrapping.""" - with patch('confluent_kafka.experimental.aio._AIOConsumer._common') as mock: + with patch('confluent_kafka.aio._AIOConsumer._common') as mock: async def mock_async_call(executor, blocking_task, *args, **kwargs): return blocking_task(*args, **kwargs) diff --git a/tests/test_AIOProducer.py b/tests/test_AIOProducer.py index 5612ed670..f26a8a8a4 100644 --- a/tests/test_AIOProducer.py +++ b/tests/test_AIOProducer.py @@ -11,7 +11,7 @@ import pytest from confluent_kafka import KafkaError, KafkaException -from confluent_kafka.experimental.aio.producer._AIOProducer import AIOProducer +from confluent_kafka.aio.producer._AIOProducer import AIOProducer class TestAIOProducer: @@ -19,12 +19,12 @@ class TestAIOProducer: @pytest.fixture def mock_producer(self): - with patch('confluent_kafka.experimental.aio.producer._AIOProducer.confluent_kafka.Producer') as mock: + with patch('confluent_kafka.aio.producer._AIOProducer.confluent_kafka.Producer') as mock: yield mock @pytest.fixture def mock_common(self): - with patch('confluent_kafka.experimental.aio.producer._AIOProducer._common') as mock: + with patch('confluent_kafka.aio.producer._AIOProducer._common') as mock: async def mock_async_call(executor, blocking_task, *args, **kwargs): return blocking_task(*args, **kwargs) diff --git 
a/tests/test_buffer_timeout_manager.py b/tests/test_buffer_timeout_manager.py index a95039de3..c2a228d72 100644 --- a/tests/test_buffer_timeout_manager.py +++ b/tests/test_buffer_timeout_manager.py @@ -11,7 +11,7 @@ import unittest from unittest.mock import AsyncMock, Mock -from confluent_kafka.experimental.aio.producer._buffer_timeout_manager import BufferTimeoutManager +from confluent_kafka.aio.producer._buffer_timeout_manager import BufferTimeoutManager class TestBufferTimeoutManager(unittest.TestCase): diff --git a/tests/test_kafka_batch_executor.py b/tests/test_kafka_batch_executor.py index 2975ba0aa..ca263736e 100644 --- a/tests/test_kafka_batch_executor.py +++ b/tests/test_kafka_batch_executor.py @@ -14,7 +14,7 @@ from unittest.mock import Mock, patch import confluent_kafka -from confluent_kafka.experimental.aio.producer._kafka_batch_executor import ProducerBatchExecutor as KafkaBatchExecutor +from confluent_kafka.aio.producer._kafka_batch_executor import ProducerBatchExecutor as KafkaBatchExecutor # Add src to path for imports sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) diff --git a/tests/test_producer_batch_processor.py b/tests/test_producer_batch_processor.py index 25f5966e7..1fb902402 100644 --- a/tests/test_producer_batch_processor.py +++ b/tests/test_producer_batch_processor.py @@ -14,9 +14,9 @@ from unittest.mock import Mock, patch import confluent_kafka -from confluent_kafka.experimental.aio.producer._AIOProducer import AIOProducer -from confluent_kafka.experimental.aio.producer._kafka_batch_executor import ProducerBatchExecutor as KafkaBatchExecutor -from confluent_kafka.experimental.aio.producer._producer_batch_processor import ( +from confluent_kafka.aio.producer._AIOProducer import AIOProducer +from confluent_kafka.aio.producer._kafka_batch_executor import ProducerBatchExecutor as KafkaBatchExecutor +from confluent_kafka.aio.producer._producer_batch_processor import ( ProducerBatchManager as ProducerBatchProcessor, ) @@ -403,7 +403,7 @@ def test_future_based_error_handling(self): def test_add_batches_back_to_buffer_basic(self): """Test adding batches back to buffer with basic message data""" - from confluent_kafka.experimental.aio.producer._message_batch import create_message_batch + from confluent_kafka.aio.producer._message_batch import create_message_batch # Create test futures future1 = asyncio.Future() @@ -442,7 +442,7 @@ def test_add_batches_back_to_buffer_basic(self): def test_add_batches_back_to_buffer_empty_batch(self): """Test adding empty batch back to buffer""" - from confluent_kafka.experimental.aio.producer._message_batch import create_message_batch + from confluent_kafka.aio.producer._message_batch import create_message_batch # Create empty batch batch = create_message_batch(topic='test-topic', messages=[], futures=[], partition=0) From c41b67a0c093cabbdd938b28940b5f369b22382c Mon Sep 17 00:00:00 2001 From: Matthew Seal Date: Mon, 24 Nov 2025 13:56:59 -0800 Subject: [PATCH 3/7] Added aio files back --- src/confluent_kafka/aio/_AIOConsumer.py | 171 +++++++++ src/confluent_kafka/aio/__init__.py | 20 ++ src/confluent_kafka/aio/_common.py | 89 +++++ .../aio/producer/_AIOProducer.py | 300 ++++++++++++++++ src/confluent_kafka/aio/producer/__init__.py | 25 ++ .../aio/producer/_buffer_timeout_manager.py | 143 ++++++++ .../aio/producer/_kafka_batch_executor.py | 143 ++++++++ .../aio/producer/_message_batch.py | 72 ++++ .../aio/producer/_producer_batch_processor.py | 334 ++++++++++++++++++ 9 files changed, 1297 insertions(+) create mode 
100644 src/confluent_kafka/aio/_AIOConsumer.py create mode 100644 src/confluent_kafka/aio/__init__.py create mode 100644 src/confluent_kafka/aio/_common.py create mode 100644 src/confluent_kafka/aio/producer/_AIOProducer.py create mode 100644 src/confluent_kafka/aio/producer/__init__.py create mode 100644 src/confluent_kafka/aio/producer/_buffer_timeout_manager.py create mode 100644 src/confluent_kafka/aio/producer/_kafka_batch_executor.py create mode 100644 src/confluent_kafka/aio/producer/_message_batch.py create mode 100644 src/confluent_kafka/aio/producer/_producer_batch_processor.py diff --git a/src/confluent_kafka/aio/_AIOConsumer.py b/src/confluent_kafka/aio/_AIOConsumer.py new file mode 100644 index 000000000..659175841 --- /dev/null +++ b/src/confluent_kafka/aio/_AIOConsumer.py @@ -0,0 +1,171 @@ +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import concurrent.futures +from typing import Any, Callable, Dict, Optional, Tuple + +import confluent_kafka + +from . import _common as _common + + +class AIOConsumer: + def __init__( + self, + consumer_conf: Dict[str, Any], + max_workers: int = 2, + executor: Optional[concurrent.futures.Executor] = None, + ) -> None: + if executor is not None: + # Executor must have at least one worker. + # At least two workers are needed when calling re-entrant + # methods from callbacks. + self.executor = executor + else: + if max_workers < 1: + raise ValueError("max_workers must be at least 1") + self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) + + loop = asyncio.get_event_loop() + wrap_common_callbacks = _common.wrap_common_callbacks + wrap_conf_callback = _common.wrap_conf_callback + wrap_common_callbacks(loop, consumer_conf) + wrap_conf_callback(loop, consumer_conf, 'on_commit') + + self._consumer: confluent_kafka.Consumer = confluent_kafka.Consumer(consumer_conf) + + async def _call(self, blocking_task: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: + return await _common.async_call(self.executor, blocking_task, *args, **kwargs) + + def _wrap_callback( + self, + loop: asyncio.AbstractEventLoop, + callback: Callable[..., Any], + edit_args: Optional[Callable[[Tuple[Any, ...]], Tuple[Any, ...]]] = None, + edit_kwargs: Optional[Callable[[Any], Any]] = None, + ) -> Callable[..., Any]: + def ret(*args: Any, **kwargs: Any) -> Any: + if edit_args: + args = edit_args(args) + if edit_kwargs: + kwargs = edit_kwargs(kwargs) + f = asyncio.run_coroutine_threadsafe(callback(*args, **kwargs), loop) + return f.result() + + return ret + + async def poll(self, *args: Any, **kwargs: Any) -> Any: + """ + Polls for a single message from the subscribed topics. + + Performance Note: + For high-throughput applications, prefer consume() over poll(): + consume() can retrieve multiple messages per call and amortize the + async overhead across the entire batch. 
+ + On the other hand, poll() retrieves one message per call, which means + the ThreadPoolExecutor overhead is applied to each individual message. + This can result in lower throughput compared to the synchronous + consumer.poll() due to the async coordination overhead not being + amortized. + + """ + return await self._call(self._consumer.poll, *args, **kwargs) + + async def consume(self, *args: Any, **kwargs: Any) -> Any: + """ + Consumes a batch of messages from the subscribed topics. + + Performance Note: + This method is recommended for high-throughput applications. + + By retrieving multiple messages per ThreadPoolExecutor call, the async + coordination overhead is shared across all messages in the batch, + resulting in much better throughput compared to repeated poll() calls. + """ + return await self._call(self._consumer.consume, *args, **kwargs) + + def _edit_rebalance_callbacks_args(self, args: Tuple[Any, ...]) -> Tuple[Any, ...]: + args_list = list(args) + args_list[0] = self + return tuple(args_list) + + async def subscribe(self, *args: Any, **kwargs: Any) -> Any: + loop = asyncio.get_event_loop() + for callback in ['on_assign', 'on_revoke', 'on_lost']: + if callback in kwargs: + kwargs[callback] = self._wrap_callback( + loop, kwargs[callback], self._edit_rebalance_callbacks_args + ) # noqa: E501 + return await self._call(self._consumer.subscribe, *args, **kwargs) + + async def unsubscribe(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.unsubscribe, *args, **kwargs) + + async def commit(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.commit, *args, **kwargs) + + async def close(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.close, *args, **kwargs) + + async def seek(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.seek, *args, **kwargs) + + async def pause(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.pause, *args, **kwargs) + + async def resume(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.resume, *args, **kwargs) + + async def store_offsets(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.store_offsets, *args, **kwargs) + + async def committed(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.committed, *args, **kwargs) + + async def assign(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.assign, *args, **kwargs) + + async def unassign(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.unassign, *args, **kwargs) + + async def incremental_assign(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.incremental_assign, *args, **kwargs) + + async def incremental_unassign(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.incremental_unassign, *args, **kwargs) + + async def assignment(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.assignment, *args, **kwargs) + + async def position(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.position, *args, **kwargs) + + async def consumer_group_metadata(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.consumer_group_metadata, *args, **kwargs) + + async def set_sasl_credentials(self, *args: Any, **kwargs: Any) -> Any: + return await 
self._call(self._consumer.set_sasl_credentials, *args, **kwargs) + + async def list_topics(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.list_topics, *args, **kwargs) + + async def get_watermark_offsets(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.get_watermark_offsets, *args, **kwargs) + + async def offsets_for_times(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._consumer.offsets_for_times, *args, **kwargs) + + diff --git a/src/confluent_kafka/aio/__init__.py b/src/confluent_kafka/aio/__init__.py new file mode 100644 index 000000000..c21468d07 --- /dev/null +++ b/src/confluent_kafka/aio/__init__.py @@ -0,0 +1,20 @@ +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._AIOConsumer import AIOConsumer +from .producer import AIOProducer + +__all__ = ['AIOConsumer', 'AIOProducer'] + + diff --git a/src/confluent_kafka/aio/_common.py b/src/confluent_kafka/aio/_common.py new file mode 100644 index 000000000..0d9ead5cf --- /dev/null +++ b/src/confluent_kafka/aio/_common.py @@ -0,0 +1,89 @@ +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import asyncio +import concurrent.futures +import functools +import logging +from typing import Any, Callable, Dict, Optional, Tuple, TypeVar + +T = TypeVar('T') + + +class AsyncLogger: + + def __init__(self, loop: asyncio.AbstractEventLoop, logger: logging.Logger) -> None: + self.loop = loop + self.logger = logger + + def log(self, *args: Any, **kwargs: Any) -> None: + self.loop.call_soon_threadsafe(lambda: self.logger.log(*args, **kwargs)) + + +def wrap_callback( + loop: asyncio.AbstractEventLoop, + callback: Callable[..., Any], + edit_args: Optional[Callable[[Tuple[Any, ...]], Tuple[Any, ...]]] = None, + edit_kwargs: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None, +) -> Callable[..., Any]: + def ret(*args: Any, **kwargs: Any) -> Any: + if edit_args: + args = edit_args(args) + if edit_kwargs: + kwargs = edit_kwargs(kwargs) + f = asyncio.run_coroutine_threadsafe(callback(*args, **kwargs), loop) + return f.result() + + return ret + + +def wrap_conf_callback(loop: asyncio.AbstractEventLoop, conf: Dict[str, Any], name: str) -> None: + if name in conf: + cb = conf[name] + conf[name] = wrap_callback(loop, cb) + + +def wrap_conf_logger(loop: asyncio.AbstractEventLoop, conf: Dict[str, Any]) -> None: + if 'logger' in conf: + conf['logger'] = AsyncLogger(loop, conf['logger']) + + +async def async_call( + executor: concurrent.futures.Executor, blocking_task: Callable[..., T], *args: Any, **kwargs: Any +) -> T: + """Helper function for blocking operations that need ThreadPool execution + + Args: + executor: ThreadPoolExecutor to use for blocking operations + blocking_task: The blocking function to execute + *args, **kwargs: Arguments to pass to the blocking function + + Returns: + Result of the blocking function execution + """ + return ( + await asyncio.gather( + asyncio.get_running_loop().run_in_executor(executor, functools.partial(blocking_task, *args, **kwargs)) + ) + )[0] + + +def wrap_common_callbacks(loop: asyncio.AbstractEventLoop, conf: Dict[str, Any]) -> None: + wrap_conf_callback(loop, conf, 'error_cb') + wrap_conf_callback(loop, conf, 'throttle_cb') + wrap_conf_callback(loop, conf, 'stats_cb') + wrap_conf_callback(loop, conf, 'oauth_cb') + wrap_conf_logger(loop, conf) + + diff --git a/src/confluent_kafka/aio/producer/_AIOProducer.py b/src/confluent_kafka/aio/producer/_AIOProducer.py new file mode 100644 index 000000000..f2eb982aa --- /dev/null +++ b/src/confluent_kafka/aio/producer/_AIOProducer.py @@ -0,0 +1,300 @@ +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import concurrent.futures +import logging +from typing import Any, Callable, Dict, Optional + +import confluent_kafka + +from .. 
import _common as _common +from ._buffer_timeout_manager import BufferTimeoutManager +from ._kafka_batch_executor import ProducerBatchExecutor +from ._producer_batch_processor import ProducerBatchManager + +logger = logging.getLogger(__name__) + + +class AIOProducer: + + # ======================================================================== + # INITIALIZATION AND LIFECYCLE MANAGEMENT + # ======================================================================== + + def __init__( + self, + producer_conf: Dict[str, Any], + max_workers: int = 4, + executor: Optional[concurrent.futures.Executor] = None, + batch_size: int = 1000, + buffer_timeout: float = 1.0, + ) -> None: + if executor is not None: + self.executor = executor + else: + self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) + # Store the event loop for async operations + self._loop = asyncio.get_running_loop() + + wrap_common_callbacks = _common.wrap_common_callbacks + wrap_common_callbacks(self._loop, producer_conf) + + self._producer: confluent_kafka.Producer = confluent_kafka.Producer(producer_conf) + + # Batching configuration + self._batch_size: int = batch_size + + # Producer state management + self._is_closed: bool = False # Track if producer is closed + + # Initialize Kafka batch executor for handling Kafka operations + self._kafka_executor = ProducerBatchExecutor(self._producer, self.executor) + + # Initialize batch processor for message batching and processing + self._batch_processor = ProducerBatchManager(self._kafka_executor) + + # Initialize buffer timeout manager for timeout handling + self._buffer_timeout_manager = BufferTimeoutManager(self._batch_processor, self._kafka_executor, buffer_timeout) + if buffer_timeout > 0: + self._buffer_timeout_manager.start_timeout_monitoring() + + async def close(self) -> None: + """Close the producer and cleanup resources + + This method performs a graceful shutdown sequence to ensure all resources + are properly cleaned up and no messages are lost: + + 1. **Signal Shutdown**: Sets the closed flag to signal the timeout task to stop + 2. **Cancel Timeout Task**: Immediately cancels the buffer timeout monitoring task + 3. **Flush All Messages**: Flushes any buffered messages and waits for delivery confirmation + 4. **Shutdown ThreadPool**: Waits for all pending ThreadPool operations to complete + 5. **Cleanup**: Ensures the underlying librdkafka producer is properly closed. The shutdown + is designed to be safe and non-blocking for the asyncio event loop + while ensuring all pending operations complete before the producer is closed. + + Raises: + Exception: May raise exceptions from buffer flushing, but these are logged + and don't prevent the cleanup process from completing. 
+ """ + # Set closed flag to signal timeout task to stop + self._is_closed = True + + # Stop the buffer timeout monitoring task + self._buffer_timeout_manager.stop_timeout_monitoring() + + # Flush any remaining messages + try: + await self.flush() + except Exception: + logger.error("Error flushing messages during close", exc_info=True) + raise + + # Shutdown the ThreadPool executor and wait for any remaining tasks to complete + # This ensures that all pending poll(), flush(), and other blocking operations + # finish before the producer is considered fully closed + if hasattr(self, 'executor'): + # executor.shutdown(wait=True) is a blocking call that: + # - Prevents new tasks from being submitted to the ThreadPool + # - Waits for all currently executing and queued tasks to complete + # - Returns only when all worker threads have finished + # + # We run this in a separate thread (using None as executor) to avoid + # blocking the asyncio event loop during the potentially long shutdown wait + await asyncio.get_running_loop().run_in_executor(None, self.executor.shutdown, True) + + def __del__(self) -> None: + """Cleanup method called during garbage collection + + This ensures that the timeout task is properly cancelled even if + close() wasn't explicitly called. + """ + if hasattr(self, '_is_closed'): + self._is_closed = True + if hasattr(self, '_buffer_timeout_manager'): + self._buffer_timeout_manager.stop_timeout_monitoring() + + # ======================================================================== + # CORE PRODUCER OPERATIONS - Main public API + # ======================================================================== + + async def poll(self, timeout: float = 0, *args: Any, **kwargs: Any) -> int: + """Processes delivery callbacks from librdkafka - blocking depends on timeout + + This method triggers any pending delivery reports that have been + queued by librdkafka when messages are delivered or fail to deliver. + + Args: + timeout: Timeout in seconds for waiting for callbacks: + - 0 = non-blocking, return after processing available callbacks + - >0 = block up to timeout seconds waiting for new callbacks + - -1 = block indefinitely until callbacks are available + + Returns: + Number of callbacks processed during this call + """ + return await self._call(self._producer.poll, timeout, *args, **kwargs) + + async def produce( + self, topic: str, value: Optional[Any] = None, key: Optional[Any] = None, *args: Any, **kwargs: Any + ) -> asyncio.Future[Any]: + """Batched produce: Accumulates messages in buffer and flushes when threshold reached + + Args: + topic: Kafka topic name (required) + value: Message payload (optional) + key: Message key (optional) + *args, **kwargs: Additional parameters like partition, timestamp, headers + + Returns: + asyncio.Future: Future that resolves to the delivered message or raises exception on failure + """ + result = asyncio.get_running_loop().create_future() + + msg_data = {'topic': topic, 'value': value, 'key': key} + + # Add optional parameters to message data + if 'partition' in kwargs: + msg_data['partition'] = kwargs['partition'] + if 'timestamp' in kwargs: + msg_data['timestamp'] = kwargs['timestamp'] + if 'headers' in kwargs: + # Headers are not supported in batch mode due to librdkafka API limitations. + # Use individual synchronous produce() calls if headers are required. + raise NotImplementedError( + "Headers are not supported in AIOProducer batch mode. " + "Use the synchronous Producer.produce() method if headers are required." 
+ ) + + self._batch_processor.add_message(msg_data, result) + + self._buffer_timeout_manager.mark_activity() + + # Check if we should flush the buffer + if self._batch_processor.get_buffer_size() >= self._batch_size: + await self._flush_buffer() + + return result + + async def flush(self, *args: Any, **kwargs: Any) -> Any: + """Waits until all messages are delivered or timeout + + This method performs a complete flush: + 1. Flushes any buffered messages from local buffer to librdkafka + 2. Waits for librdkafka to deliver/acknowledge all messages + """ + # First, flush any remaining messages in the buffer for all topics + if not self._batch_processor.is_buffer_empty(): + await self._flush_buffer() + # Update buffer activity since we just flushed + self._buffer_timeout_manager.mark_activity() + + # Then flush the underlying producer and wait for delivery confirmation + return await self._call(self._producer.flush, *args, **kwargs) + + async def purge(self, *args: Any, **kwargs: Any) -> Any: + """Purges messages from internal queues - may block during cleanup""" + # Cancel all pending futures + self._batch_processor.cancel_pending_futures() + + # Clear local message buffer and futures + self._batch_processor.clear_buffer() + + # Update buffer activity since we cleared the buffer + self._buffer_timeout_manager.mark_activity() + + return await self._call(self._producer.purge, *args, **kwargs) + + async def list_topics(self, *args: Any, **kwargs: Any) -> Any: + return await self._call(self._producer.list_topics, *args, **kwargs) + + # ======================================================================== + # TRANSACTION OPERATIONS - Kafka transaction support + # ======================================================================== + + async def init_transactions(self, *args: Any, **kwargs: Any) -> Any: + """Network call to initialize transactions""" + return await self._call(self._producer.init_transactions, *args, **kwargs) + + async def begin_transaction(self, *args: Any, **kwargs: Any) -> Any: + """Network call to begin transaction""" + + # Flush messages to set a clean state before entering a transaction + await self.flush() + + return await self._call(self._producer.begin_transaction, *args, **kwargs) + + async def send_offsets_to_transaction(self, *args: Any, **kwargs: Any) -> Any: + """Network call to send offsets to transaction""" + return await self._call(self._producer.send_offsets_to_transaction, *args, **kwargs) + + async def commit_transaction(self, *args: Any, **kwargs: Any) -> Any: + """Commit transaction after flushing all buffered messages""" + + # Flush to ensure messages in the local batch_processor buffer are + # delivered to librdkafka + await self.flush() + + # Then commit transaction + return await self._call(self._producer.commit_transaction, *args, **kwargs) + + async def abort_transaction(self, *args: Any, **kwargs: Any) -> Any: + """Network call to abort transaction + + Messages produced before the call (i.e. inside the transaction boundary) will be aborted. + Messages that are still in flight may be failed by librdkafka as they are considered + outside the transaction boundary. 
+ Refer to librdkafka documentation section "Transactional producer API" + for more details: + https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#transactional-producer + """ + + # Flush to ensure messages in the local batch_processor buffer are + # delivered to librdkafka + await self.flush() + + return await self._call(self._producer.abort_transaction, *args, **kwargs) + + # ======================================================================== + # AUTHENTICATION AND SECURITY + # ======================================================================== + + async def set_sasl_credentials(self, *args: Any, **kwargs: Any) -> Any: + """Authentication operation that may involve network calls""" + return await self._call(self._producer.set_sasl_credentials, *args, **kwargs) + + # ======================================================================== + # BATCH PROCESSING OPERATIONS - Delegated to BatchProcessor + # ======================================================================== + + async def _flush_buffer(self, target_topic: Optional[str] = None) -> None: + """Flush the current message buffer using clean batch processing flow + + This method demonstrates the new architecture where AIOProducer simply + orchestrates the workflow between components: + 1. BatchProcessor creates immutable MessageBatch objects + 2. ProducerBatchExecutor executes each batch + 3. BufferTimeoutManager handles activity tracking + """ + await self._batch_processor.flush_buffer(target_topic) + + # ======================================================================== + # UTILITY METHODS - Helper functions and internal utilities + # ======================================================================== + + async def _call(self, blocking_task: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: + """Helper method for blocking operations that need ThreadPool execution""" + return await _common.async_call(self.executor, blocking_task, *args, **kwargs) + + diff --git a/src/confluent_kafka/aio/producer/__init__.py b/src/confluent_kafka/aio/producer/__init__.py new file mode 100644 index 000000000..d8df2496e --- /dev/null +++ b/src/confluent_kafka/aio/producer/__init__.py @@ -0,0 +1,25 @@ +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Confluent Kafka AIOProducer Module + +This module contains all the components for the async Kafka producer. +""" + +from ._AIOProducer import AIOProducer + +__all__ = ['AIOProducer'] + + diff --git a/src/confluent_kafka/aio/producer/_buffer_timeout_manager.py b/src/confluent_kafka/aio/producer/_buffer_timeout_manager.py new file mode 100644 index 000000000..958f6f0b1 --- /dev/null +++ b/src/confluent_kafka/aio/producer/_buffer_timeout_manager.py @@ -0,0 +1,143 @@ +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import time +import weakref +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + # Import only for type checking to avoid circular dependency + from ._kafka_batch_executor import ProducerBatchExecutor + from ._producer_batch_processor import ProducerBatchManager + +logger = logging.getLogger(__name__) + + +class BufferTimeoutManager: + """Manages buffer timeout and activity tracking for message batching + + This class is responsible for: + - Monitoring buffer inactivity and triggering automatic flushes + - Tracking buffer activity timestamps + - Managing background timeout monitoring tasks + - Coordinating between batch processor and executor for timeout flushes + """ + + def __init__( + self, batch_processor: "ProducerBatchManager", kafka_executor: "ProducerBatchExecutor", timeout: float + ) -> None: + """Initialize the buffer timeout manager + + Args: + batch_processor: ProducerBatchManager instance for creating batches + kafka_executor: ProducerBatchExecutor instance for executing batches + timeout: Timeout in seconds for buffer inactivity (0 disables timeout) + """ + self._batch_processor = batch_processor + self._kafka_executor = kafka_executor + self._timeout = timeout + self._last_activity: float = time.time() + self._timeout_task: Optional[asyncio.Task[None]] = None + self._running: bool = False + + def start_timeout_monitoring(self) -> None: + """Start the background task that monitors buffer inactivity + + Creates an async task that runs in the background and periodically checks + if messages have been sitting in the buffer for too long without being + flushed. + + Key design decisions: + 1. **Weak Reference**: Uses weakref.ref(self) to prevent circular refs + 2. **Self-Canceling**: The task stops itself if manager is GC'd + 3. **Adaptive Check Interval**: Uses timeout to determine check frequency + """ + if not self._timeout or self._timeout <= 0: + return # Timeout disabled + + self._running = True + self._timeout_task = asyncio.create_task(self._monitor_timeout()) + + def stop_timeout_monitoring(self) -> None: + """Stop and cleanup the buffer timeout monitoring task""" + self._running = False + if self._timeout_task and not self._timeout_task.done(): + self._timeout_task.cancel() + self._timeout_task = None + + def mark_activity(self) -> None: + """Update the timestamp of the last buffer activity + + This method should be called whenever: + 1. Messages are added to the buffer (in produce()) + 2. Buffer is manually flushed + 3. Buffer is purged/cleared + """ + self._last_activity = time.time() + + async def _monitor_timeout(self) -> None: + """Monitor buffer timeout in background task + + This method runs continuously in the background, checking for buffer + inactivity and triggering flushes when the timeout threshold is exceeded. 
+ """ + # Use weak reference to avoid circular reference and allow garbage collection + manager_ref = weakref.ref(self) + + while True: + # Check interval should be proportional to buffer timeout for efficiency + manager = manager_ref() + if manager is None or not manager._running: + break + + # Calculate adaptive check interval: timeout/2 with bounds + # Examples: 0.1s→0.1s, 1s→0.5s, 5s→1.0s, 30s→1.0s + check_interval = max(0.1, min(1.0, manager._timeout / 2)) + await asyncio.sleep(check_interval) + + # Re-check manager after sleep + manager = manager_ref() + if manager is None or not manager._running: + break + + # Check if buffer has been inactive for too long + time_since_activity = time.time() - manager._last_activity + if time_since_activity >= manager._timeout: + + try: + # Flush the buffer due to timeout + await manager._flush_buffer_due_to_timeout() + # Update activity since we just flushed + manager.mark_activity() + except Exception: + logger.error("Error flushing buffer due to timeout", exc_info=True) + # Re-raise all exceptions - don't swallow any errors + raise + + async def _flush_buffer_due_to_timeout(self) -> None: + """Flush buffer due to timeout by coordinating batch processor/executor + + This method handles the complete timeout flush workflow: + 1. Create batches from the batch processor + 2. Execute batches from the batch processor + 3. Flush librdkafka queue to ensure messages are delivered + """ + # Create batches from current buffer and send to librdkafka queue + await self._batch_processor.flush_buffer() + + # Flush librdkafka queue to ensure messages are delivered to broker + # 0 timeout means non-blocking flush + await self._kafka_executor.flush_librdkafka_queue(0) diff --git a/src/confluent_kafka/aio/producer/_kafka_batch_executor.py b/src/confluent_kafka/aio/producer/_kafka_batch_executor.py new file mode 100644 index 000000000..a410dede6 --- /dev/null +++ b/src/confluent_kafka/aio/producer/_kafka_batch_executor.py @@ -0,0 +1,143 @@ +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import concurrent.futures +import logging +from typing import Any, Dict, List, Sequence + +import confluent_kafka + +from .. 
import _common + +logger = logging.getLogger(__name__) + + +class ProducerBatchExecutor: + """Executes Kafka batch operations via thread pool + + This class is responsible for: + - Executing produce_batch operations against confluent_kafka.Producer + - Handling partial batch failures from librdkafka + - Managing thread pool execution to avoid blocking the event loop + - Processing delivery callbacks for successful messages + - Supporting partition-specific batch operations + """ + + def __init__(self, producer: confluent_kafka.Producer, executor: concurrent.futures.Executor) -> None: + """Initialize the Kafka batch executor + + Args: + producer: confluent_kafka.Producer instance for Kafka operations + executor: ThreadPoolExecutor for running blocking operations + """ + self._producer = producer + self._executor = executor + + async def execute_batch(self, topic: str, batch_messages: Sequence[Dict[str, Any]], partition: int = -1) -> int: + """Execute a batch operation via thread pool + + This method handles the complete batch execution workflow: + 1. Execute produce_batch in thread pool to avoid blocking event loop + 2. Handle partial failures that occur during produce_batch + 3. Poll for delivery reports of successful messages + + Args: + topic: Target topic for the batch + batch_messages: List of prepared messages with callbacks assigned + partition: Target partition for the batch (-1 = RD_KAFKA_PARTITION_UA) + + Returns: + Result from producer.poll() indicating number of delivery reports processed + + Raises: + Exception: Any exception from the batch operation is propagated + """ + + def _produce_batch_and_poll() -> int: + """Helper function to run in thread pool + + This function encapsulates all the blocking Kafka operations: + - Call produce_batch with specific partition and individual message callbacks + - Handle partial batch failures for messages that fail immediately + - Poll for delivery reports to trigger callbacks for successful messages + """ + # Call produce_batch with specific partition and individual callbacks + # Convert tuple to list since produce_batch expects a list + messages_list: List[Dict[str, Any]] = ( + list(batch_messages) if isinstance(batch_messages, tuple) else batch_messages # type: ignore + ) + + # Use the provided partition for the entire batch + # This enables proper partition control while working around librdkafka limitations + self._producer.produce_batch(topic, messages_list, partition=partition) + + # Handle partial batch failures: Check for messages that failed + # during produce_batch. These messages have their msgstates + # destroyed in Producer.c and won't get callbacks from librdkafka, + # so we need to manually invoke their callbacks + self._handle_partial_failures(messages_list) + + # Immediately poll to process delivery callbacks for successful messages + poll_result = self._producer.poll(0) + + return poll_result + + # Execute in thread pool to avoid blocking event loop + loop = asyncio.get_running_loop() + return await loop.run_in_executor(self._executor, _produce_batch_and_poll) + + async def flush_librdkafka_queue(self, timeout=-1): + """Flush the librdkafka queue and wait for all messages to be delivered + This method awaits until all outstanding produce requests are completed + or the timeout is reached, unless the timeout is set to 0 (non-blocking). 
+ Args: + timeout: Maximum time to wait in seconds: + - -1 = wait indefinitely (default) + - 0 = non-blocking, return immediately + - >0 = wait up to timeout seconds + Returns: + Number of messages still in queue after flush attempt + """ + return await _common.async_call(self._executor, self._producer.flush, timeout) + + def _handle_partial_failures(self, batch_messages: List[Dict[str, Any]]) -> None: + """Handle messages that failed during produce_batch + + When produce_batch encounters messages that fail immediately (e.g., + message too large, invalid topic, etc.), librdkafka destroys their + msgstates and won't call their callbacks. We detect these failures by + checking for '_error' in the message dict (set by Producer.c) and + manually invoke the simple future-resolving callbacks. + + Args: + batch_messages: List of message dictionaries that were passed to produce_batch + """ + for msg_dict in batch_messages: + if '_error' in msg_dict: + # This message failed during produce_batch - its callback + # won't be called by librdkafka + callback = msg_dict.get('callback') + if callback: + # Extract the error from the message dict (set by Producer.c) + error = msg_dict['_error'] + # Manually invoke the callback with the error + # Note: msg is None since the message failed before being queued + try: + callback(error, None) + except Exception: + logger.warning("Exception in callback during partial failure handling", exc_info=True) + raise + + diff --git a/src/confluent_kafka/aio/producer/_message_batch.py b/src/confluent_kafka/aio/producer/_message_batch.py new file mode 100644 index 000000000..5487da5b6 --- /dev/null +++ b/src/confluent_kafka/aio/producer/_message_batch.py @@ -0,0 +1,72 @@ +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +from typing import Any, Dict, NamedTuple, Optional, Sequence + + +# Create immutable MessageBatch value object using modern typing +class MessageBatch(NamedTuple): + """Immutable batch of messages for Kafka production + + This represents a group of messages destined for the same topic and partition, + along with their associated futures for delivery confirmation. 
+ """ + + topic: str # Target topic for this batch + messages: Sequence[Dict[str, Any]] # Prepared message dictionaries + futures: Sequence[asyncio.Future[Any]] # Futures to resolve on delivery + partition: int = -1 # Target partition for this batch (-1 = RD_KAFKA_PARTITION_UA) + + @property + def size(self) -> int: + """Get the number of messages in this batch""" + return len(self.messages) + + @property + def info(self) -> str: + """Get a string representation of batch info""" + return f"MessageBatch(topic='{self.topic}', partition={self.partition}, size={len(self.messages)})" + + +def create_message_batch( + topic: str, + messages: Sequence[Dict[str, Any]], + futures: Sequence[asyncio.Future[Any]], + callbacks: Optional[Any] = None, + partition: int = -1, +) -> MessageBatch: + """Create an immutable MessageBatch from sequences + + This factory function converts mutable sequences into an immutable MessageBatch object. + Uses tuples internally for immutability while accepting any sequence type as input. + + Args: + topic: Target topic name + messages: Sequence of prepared message dictionaries + futures: Sequence of asyncio.Future objects + callbacks: Deprecated parameter, ignored for backwards compatibility + partition: Target partition for this batch (-1 = RD_KAFKA_PARTITION_UA) + + Returns: + MessageBatch: Immutable batch object + """ + return MessageBatch( + topic=topic, + messages=tuple(messages) if not isinstance(messages, tuple) else messages, + futures=tuple(futures) if not isinstance(futures, tuple) else futures, + partition=partition, + ) + + diff --git a/src/confluent_kafka/aio/producer/_producer_batch_processor.py b/src/confluent_kafka/aio/producer/_producer_batch_processor.py new file mode 100644 index 000000000..e2dbd1f9b --- /dev/null +++ b/src/confluent_kafka/aio/producer/_producer_batch_processor.py @@ -0,0 +1,334 @@ +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import asyncio +import copy +import logging +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Tuple + +from confluent_kafka import KafkaException as _KafkaException + +from ._message_batch import MessageBatch, create_message_batch + +if TYPE_CHECKING: + # Import only for type checking to avoid circular dependency + from ._kafka_batch_executor import ProducerBatchExecutor + +logger = logging.getLogger(__name__) + + +class ProducerBatchManager: + """Handles batching and processing of Kafka messages for AIOProducer + + This class encapsulates all the logic for: + - Grouping messages by topic and partition + - Managing message buffers and futures + - Creating simple future-resolving callbacks + - Executing batch operations via librdkafka + """ + + def __init__(self, kafka_executor: "ProducerBatchExecutor") -> None: + """Initialize the batch processor + + Args: + kafka_executor: KafkaBatchExecutor instance for Kafka operations + """ + self._kafka_executor = kafka_executor + self._message_buffer: List[Dict[str, Any]] = [] + self._buffer_futures: List[asyncio.Future[Any]] = [] + + def add_message(self, msg_data: Dict[str, Any], future: asyncio.Future[Any]) -> None: + """Add a message to the batch buffer + + Args: + msg_data: Dictionary containing message data + future: asyncio.Future to resolve when message is delivered + """ + self._message_buffer.append(msg_data) + self._buffer_futures.append(future) + + def get_buffer_size(self) -> int: + """Get the current number of messages in the buffer""" + return len(self._message_buffer) + + def is_buffer_empty(self) -> bool: + """Check if the buffer is empty""" + return len(self._message_buffer) == 0 + + def clear_buffer(self) -> None: + """Clear the entire buffer""" + self._message_buffer.clear() + self._buffer_futures.clear() + + def cancel_pending_futures(self) -> None: + """Cancel all pending futures in the buffer""" + for future in self._buffer_futures: + if not future.done(): + future.cancel() + + def create_batches(self, target_topic: Optional[str] = None) -> List[MessageBatch]: + """Create MessageBatch objects from the current buffer + + Args: + target_topic: Optional topic to create batches for (None for all) + + Returns: + List[MessageBatch]: List of immutable MessageBatch objects + """ + if self.is_buffer_empty(): + return [] + + # Group by topic and partition for optimal batching + topic_partition_groups = self._group_messages_by_topic_and_partition() + batches = [] + + for (topic, partition), group_data in topic_partition_groups.items(): + if target_topic is None or topic == target_topic: + # Prepare batch messages + batch_messages = self._prepare_batch_messages(group_data["messages"]) + + # Assign simple future-resolving callbacks to messages + self._assign_future_callbacks(batch_messages, group_data["futures"]) + + # Create immutable MessageBatch object with partition info + batch = create_message_batch( + topic=topic, + messages=batch_messages, + futures=group_data["futures"], + callbacks=None, # No user callbacks anymore + partition=partition, # Add partition info to batch + ) + batches.append(batch) + + return batches + + def _clear_topic_from_buffer(self, target_topic: str) -> None: + """Remove messages for a specific topic from the buffer + + Args: + target_topic: Topic to remove from buffer + """ + messages_to_keep = [] + futures_to_keep = [] + + for i, msg_data in enumerate(self._message_buffer): + if msg_data["topic"] != target_topic: + messages_to_keep.append(msg_data) + 
futures_to_keep.append(self._buffer_futures[i]) + + self._message_buffer = messages_to_keep + self._buffer_futures = futures_to_keep + + async def flush_buffer(self, target_topic: Optional[str] = None) -> None: + """Flush the current message buffer using produce_batch + + Args: + target_topic: Optional topic to flush (None for all topics) + + Returns: + None + """ + if self.is_buffer_empty(): + return + + # Create batches for processing + batches = self.create_batches(target_topic) + + # Clear the buffer immediately to prevent race conditions + if target_topic is None: + # Clear entire buffer since we're processing all messages + self.clear_buffer() + else: + # Clear only messages for the target topic that we're processing + self._clear_topic_from_buffer(target_topic) + + try: + # Execute batches with cleanup + await self._execute_batches(batches, target_topic) + except Exception: + # Add batches back to buffer on failure + try: + self._add_batches_back_to_buffer(batches) + except Exception: + logger.error(f"Error adding batches back to buffer on failure. messages might be lost: {batches}") + raise + raise + + async def _execute_batches(self, batches: List[MessageBatch], target_topic: Optional[str] = None) -> None: + """Execute batches and handle cleanup after successful execution + + Args: + batches: List of batches to execute + target_topic: Optional topic for selective buffer clearing + + Returns: + None + + Raises: + Exception: If any batch execution fails + """ + # Execute each batch + for batch in batches: + try: + # Execute batch using the Kafka executor + await self._kafka_executor.execute_batch(batch.topic, batch.messages, batch.partition) + + except Exception as e: + # Handle batch failure by failing all unresolved futures for this batch + self._handle_batch_failure(e, batch.futures) + # Re-raise the exception so caller knows the batch operation failed + raise + + def _add_batches_back_to_buffer(self, batches: List[MessageBatch]) -> None: + """Add batches back to the buffer when execution fails + + Args: + batches: List of MessageBatch objects to add back to buffer + """ + for batch in batches: + # Add each message and its future back to the buffer + for i, message in enumerate(batch.messages): + # Reconstruct the original message data from the batch + msg_data = { + 'topic': batch.topic, + 'value': message.get('value'), + 'key': message.get('key'), + } + + # Add optional fields if present + if 'partition' in message: + msg_data['partition'] = message['partition'] + if 'timestamp' in message: + msg_data['timestamp'] = message['timestamp'] + if 'headers' in message: + msg_data['headers'] = message['headers'] + + # Add the message and its future back to the buffer + self._message_buffer.append(msg_data) + self._buffer_futures.append(batch.futures[i]) + + def _group_messages_by_topic_and_partition(self) -> Dict[Tuple[str, int], Dict[str, List[Any]]]: + """Group buffered messages by topic and partition for optimal batching + + This function efficiently organizes the mixed-topic message buffer into + topic+partition-specific groups, enabling proper partition control while + maintaining batch efficiency. 
+ + Algorithm: + - Single O(n) pass through message buffer + - Groups related data (messages, futures) by (topic, partition) tuple + - Maintains index relationships between buffer arrays + - Uses partition from message data, defaults to RD_KAFKA_PARTITION_UA + (-1) if not specified + + Returns: + dict: Topic+partition groups with structure: + { + ('topic_name', partition): { + 'messages': [msg_data1, ...], # Message dicts + 'futures': [future1, ...], # asyncio.Future objects + } + } + """ + topic_partition_groups: Dict[Tuple[str, int], Dict[str, Any]] = {} + + # Iterate through buffer once - O(n) complexity + for i, msg_data in enumerate(self._message_buffer): + topic = msg_data["topic"] + # Get partition from message data, default to RD_KAFKA_PARTITION_UA (-1) if not specified + partition = msg_data.get("partition", -1) # -1 = RD_KAFKA_PARTITION_UA + + # Create composite key for grouping + group_key = (topic, partition) + + # Create new topic+partition group if this is first message for this combination + if group_key not in topic_partition_groups: + topic_partition_groups[group_key] = { + "messages": [], # Message data for produce_batch + "futures": [], # Futures to resolve on delivery + } + + # Add message and related data to appropriate topic+partition group + # Note: All arrays stay synchronized by index + topic_partition_groups[group_key]["messages"].append(msg_data) + topic_partition_groups[group_key]["futures"].append(self._buffer_futures[i]) + + return topic_partition_groups + + def _prepare_batch_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Prepare messages for produce_batch by removing internal fields + + Args: + messages: List of message dictionaries + + Returns: + List of cleaned message dictionaries ready for produce_batch + """ + batch_messages = [] + for msg_data in messages: + # Create a shallow copy and remove fields not needed by produce_batch + batch_msg = copy.copy(msg_data) + batch_msg.pop("topic", None) # Remove topic since it's passed separately + # Note: We keep 'partition' in individual messages for reference, + # but the batch partition will be used by produce_batch + batch_messages.append(batch_msg) + + return batch_messages + + def _assign_future_callbacks( + self, batch_messages: List[Dict[str, Any]], futures: Sequence[asyncio.Future[Any]] + ) -> None: + """Assign simple future-resolving callbacks to each message in batch + + Args: + batch_messages: List of message dictionaries for produce_batch + futures: List of asyncio.Future objects to resolve + """ + for i, batch_msg in enumerate(batch_messages): + future = futures[i] + + def create_simple_callback(fut: asyncio.Future[Any]) -> Callable[[Any, Any], None]: + """Create a simple callback that only resolves the future""" + + def simple_callback(err: Any, msg: Any) -> None: + if err: + if not fut.done(): + fut.set_exception(_KafkaException(err)) + else: + if not fut.done(): + fut.set_result(msg) + + return simple_callback + + # Assign the simple callback to this message + batch_msg["callback"] = create_simple_callback(future) + + def _handle_batch_failure(self, exception: Exception, batch_futures: Sequence[asyncio.Future[Any]]) -> None: + """Handle batch operation failure by failing all unresolved futures + + When a batch operation fails before any individual callbacks are invoked, + we need to fail all futures for this batch since none of the per-message + callbacks will be called by librdkafka. 
+ + Args: + exception: The exception that caused the batch to fail + batch_futures: List of futures for this batch + """ + # Fail all futures since no individual callbacks will be invoked + for future in batch_futures: + # Only set exception if future isn't already done + if not future.done(): + future.set_exception(exception) + + From 9b011ca13bc6dc81e55d54fa2a2533547c418339 Mon Sep 17 00:00:00 2001 From: Matthew Seal Date: Mon, 24 Nov 2025 14:09:51 -0800 Subject: [PATCH 4/7] Version v2.13.0b1 --- pyproject.toml | 2 +- src/confluent_kafka/src/confluent_kafka.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 3cf400bf2..f7e3fe1c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "confluent-kafka" -version = "2.12.1" +version = "2.13.0b1" description = "Confluent's Python client for Apache Kafka" classifiers = [ "Development Status :: 5 - Production/Stable", diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index 32414c1d8..d7953cc26 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -38,7 +38,7 @@ /** * @brief confluent-kafka-python version, must match that of pyproject.toml. */ -#define CFL_VERSION_STR "2.12.1" +#define CFL_VERSION_STR "2.13.0b1" /** * Minimum required librdkafka version. This is checked both during From c2e08491a963f9864f728442e4428aadcbb4ddfe Mon Sep 17 00:00:00 2001 From: Matthew Seal Date: Mon, 24 Nov 2025 15:25:05 -0800 Subject: [PATCH 5/7] Fixing sleep for Windows --- src/confluent_kafka/src/confluent_kafka.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index 1505ddd0b..443d02cfb 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -29,6 +29,9 @@ #include +#ifdef _WIN32 +#include <windows.h> +#endif /** * @brief KNOWN ISSUES @@ -2057,7 +2060,11 @@ int wait_for_oauth_token_set(Handle *h) { while (!h->oauth_token_set && elapsed_sec < max_wait_sec) { CallState cs; CallState_begin(h, &cs); +#ifdef _WIN32 + Sleep(retry_interval_sec * 1000); +#else sleep(retry_interval_sec); +#endif CallState_end(h, &cs); elapsed_sec += retry_interval_sec; } From 988355030037f503b4fbf785153debea89d1e1e0 Mon Sep 17 00:00:00 2001 From: Matthew Seal Date: Mon, 24 Nov 2025 15:47:16 -0800 Subject: [PATCH 6/7] black and isort --- src/confluent_kafka/aio/_AIOConsumer.py | 2 -- src/confluent_kafka/aio/__init__.py | 2 -- src/confluent_kafka/aio/_common.py | 2 -- src/confluent_kafka/aio/producer/_AIOProducer.py | 2 -- src/confluent_kafka/aio/producer/__init__.py | 2 -- src/confluent_kafka/aio/producer/_kafka_batch_executor.py | 2 -- src/confluent_kafka/aio/producer/_message_batch.py | 2 -- src/confluent_kafka/aio/producer/_producer_batch_processor.py | 2 -- tests/test_producer_batch_processor.py | 4 +--- 9 files changed, 1 insertion(+), 19 deletions(-) diff --git a/src/confluent_kafka/aio/_AIOConsumer.py b/src/confluent_kafka/aio/_AIOConsumer.py index 659175841..0ff986a62 100644 --- a/src/confluent_kafka/aio/_AIOConsumer.py +++ b/src/confluent_kafka/aio/_AIOConsumer.py @@ -167,5 +167,3 @@ async def get_watermark_offsets(self, *args: Any, **kwargs: Any) -> Any: async def offsets_for_times(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._consumer.offsets_for_times, *args, **kwargs) - - diff --git
a/src/confluent_kafka/aio/__init__.py b/src/confluent_kafka/aio/__init__.py index c21468d07..098d0b922 100644 --- a/src/confluent_kafka/aio/__init__.py +++ b/src/confluent_kafka/aio/__init__.py @@ -16,5 +16,3 @@ from .producer import AIOProducer __all__ = ['AIOConsumer', 'AIOProducer'] - - diff --git a/src/confluent_kafka/aio/_common.py b/src/confluent_kafka/aio/_common.py index 0d9ead5cf..128d7b0f9 100644 --- a/src/confluent_kafka/aio/_common.py +++ b/src/confluent_kafka/aio/_common.py @@ -85,5 +85,3 @@ def wrap_common_callbacks(loop: asyncio.AbstractEventLoop, conf: Dict[str, Any]) wrap_conf_callback(loop, conf, 'stats_cb') wrap_conf_callback(loop, conf, 'oauth_cb') wrap_conf_logger(loop, conf) - - diff --git a/src/confluent_kafka/aio/producer/_AIOProducer.py b/src/confluent_kafka/aio/producer/_AIOProducer.py index f2eb982aa..922e46349 100644 --- a/src/confluent_kafka/aio/producer/_AIOProducer.py +++ b/src/confluent_kafka/aio/producer/_AIOProducer.py @@ -296,5 +296,3 @@ async def _flush_buffer(self, target_topic: Optional[str] = None) -> None: async def _call(self, blocking_task: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: """Helper method for blocking operations that need ThreadPool execution""" return await _common.async_call(self.executor, blocking_task, *args, **kwargs) - - diff --git a/src/confluent_kafka/aio/producer/__init__.py b/src/confluent_kafka/aio/producer/__init__.py index d8df2496e..34c837513 100644 --- a/src/confluent_kafka/aio/producer/__init__.py +++ b/src/confluent_kafka/aio/producer/__init__.py @@ -21,5 +21,3 @@ from ._AIOProducer import AIOProducer __all__ = ['AIOProducer'] - - diff --git a/src/confluent_kafka/aio/producer/_kafka_batch_executor.py b/src/confluent_kafka/aio/producer/_kafka_batch_executor.py index a410dede6..b2bd477e7 100644 --- a/src/confluent_kafka/aio/producer/_kafka_batch_executor.py +++ b/src/confluent_kafka/aio/producer/_kafka_batch_executor.py @@ -139,5 +139,3 @@ def _handle_partial_failures(self, batch_messages: List[Dict[str, Any]]) -> None except Exception: logger.warning("Exception in callback during partial failure handling", exc_info=True) raise - - diff --git a/src/confluent_kafka/aio/producer/_message_batch.py b/src/confluent_kafka/aio/producer/_message_batch.py index 5487da5b6..506209291 100644 --- a/src/confluent_kafka/aio/producer/_message_batch.py +++ b/src/confluent_kafka/aio/producer/_message_batch.py @@ -68,5 +68,3 @@ def create_message_batch( futures=tuple(futures) if not isinstance(futures, tuple) else futures, partition=partition, ) - - diff --git a/src/confluent_kafka/aio/producer/_producer_batch_processor.py b/src/confluent_kafka/aio/producer/_producer_batch_processor.py index e2dbd1f9b..cfbf4a9f1 100644 --- a/src/confluent_kafka/aio/producer/_producer_batch_processor.py +++ b/src/confluent_kafka/aio/producer/_producer_batch_processor.py @@ -330,5 +330,3 @@ def _handle_batch_failure(self, exception: Exception, batch_futures: Sequence[as # Only set exception if future isn't already done if not future.done(): future.set_exception(exception) - - diff --git a/tests/test_producer_batch_processor.py b/tests/test_producer_batch_processor.py index 1fb902402..404255bbe 100644 --- a/tests/test_producer_batch_processor.py +++ b/tests/test_producer_batch_processor.py @@ -16,9 +16,7 @@ import confluent_kafka from confluent_kafka.aio.producer._AIOProducer import AIOProducer from confluent_kafka.aio.producer._kafka_batch_executor import ProducerBatchExecutor as KafkaBatchExecutor -from 
confluent_kafka.aio.producer._producer_batch_processor import ( - ProducerBatchManager as ProducerBatchProcessor, -) +from confluent_kafka.aio.producer._producer_batch_processor import ProducerBatchManager as ProducerBatchProcessor # Add src to path for imports sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) From b6637b3772200b1fb1133819677f6e68bea990f1 Mon Sep 17 00:00:00 2001 From: Matthew Seal Date: Mon, 24 Nov 2025 16:51:59 -0800 Subject: [PATCH 7/7] Disabled style check for clang-format --- tools/source-package-verification.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tools/source-package-verification.sh b/tools/source-package-verification.sh index 530107f26..bd82aaf5b 100755 --- a/tools/source-package-verification.sh +++ b/tools/source-package-verification.sh @@ -52,8 +52,9 @@ if [[ $OS_NAME == linux && $ARCH == x64 ]]; then # Run these actions and tests only in this case echo "Checking code formatting ..." # Check all tracked files (Python and C) - all_files=$(git ls-tree -r --name-only HEAD | egrep '\.(py|c|h)$') - clang-format --version + # Commenting out clang-format until consistent versions across platforms are available for dev and CI testing + all_files=$(git ls-tree -r --name-only HEAD | egrep '\.(py)$') # egrep '\.(py|c|h)$') + # clang-format --version tools/style-format.sh $all_files || exit 1 echo "Building documentation ..." flake8 --exclude ./_venv,*_pb2.py,./build
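
For context on the `experimental` to `aio` move that these patches complete, a minimal usage sketch of the relocated public path follows. It is not part of the patch series: the broker address, topic name, `value`/`key` arguments, the awaitable `close()`, and the delivery-result return value of `produce()` are assumptions made for illustration, not confirmed API guarantees.

```python
import asyncio

from confluent_kafka.aio import AIOProducer


async def main() -> None:
    # Assumed config and topic values. AIOProducer wraps the librdkafka producer
    # and resolves a per-message asyncio.Future on delivery, mirroring the
    # ProducerBatchManager callbacks shown in the diffs above.
    producer = AIOProducer({"bootstrap.servers": "localhost:9092"})
    try:
        # Assumption: awaiting produce() completes when delivery is confirmed
        # and yields the delivered Message, or raises KafkaException on error.
        msg = await producer.produce("example-topic", value=b"hello", key=b"k1")
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
    finally:
        # Assumption: close() is a coroutine per the 2.13.0b1 release notes.
        await producer.close()


if __name__ == "__main__":
    asyncio.run(main())
```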