diff --git a/DEVELOPER.md b/DEVELOPER.md
index 201816c73..184265fc8 100644
--- a/DEVELOPER.md
+++ b/DEVELOPER.md
@@ -55,7 +55,25 @@ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build
 ```bash
 python3 -c "import confluent_kafka; print('Setup successful!')"
-```
+```
+
+#### Local Setup with UV
+
+Alternative setup instructions, tested with Python 3.11:
+
+```bash
+# Modify pyproject.toml to require Python >=3.11;
+# this resolves the cel-python dependency conflict.
+uv venv --python 3.11
+source .venv/bin/activate
+
+uv sync --extra dev --extra tests
+uv pip install trivup setuptools
+pytest tests/
+
+# When making changes, bump project.version in pyproject.toml before re-running:
+uv sync --extra dev --extra tests
+```
diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c
index 0e3d4a307..6d72367e9 100644
--- a/src/confluent_kafka/src/Producer.c
+++ b/src/confluent_kafka/src/Producer.c
@@ -420,6 +420,47 @@ static PyObject *Producer_flush (Handle *self, PyObject *args,
         return cfl_PyInt_FromInt(qlen);
 }
 
+
+static PyObject *Producer_close (Handle *self, PyObject *args,
+                                 PyObject *kwargs) {
+        /* Default to a 1 second timeout so close() returns promptly
+         * rather than blocking indefinitely. */
+        double tmout = 1.0;
+        static char *kws[] = { "timeout", NULL };
+        rd_kafka_resp_err_t err;
+        CallState cs;
+
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
+                return NULL;
+
+        if (!self->rk)
+                Py_RETURN_TRUE;
+
+        CallState_begin(self, &cs);
+
+        /* Flush any remaining messages before destroying the handle. */
+        err = rd_kafka_flush(self->rk, cfl_timeout_ms(tmout));
+
+        /* Log before destroying: the handle must not be used afterwards. */
+        rd_kafka_log_print(self->rk, CK_LOG_INFO, "CLOSEINF",
+                           "Producer destroy requested");
+        rd_kafka_destroy(self->rk);
+        self->rk = NULL;
+
+        if (!CallState_end(self, &cs))
+                return NULL;
+
+        /* Emit the flush warning only now that the GIL is held again. */
+        if (err != RD_KAFKA_RESP_ERR_NO_ERROR &&
+            PyErr_WarnFormat(PyExc_RuntimeWarning, 1,
+                             "Producer flush failed during close: %s",
+                             rd_kafka_err2str(err)) == -1)
+                return NULL;
+
+        Py_RETURN_TRUE;
+}
+
+
 /**
  * @brief Validate arguments and parse all messages in the batch
  * @param self Producer handle
@@ -983,7 +1024,19 @@ static PyMethodDef Producer_methods[] = {
           " :rtype: int\n"
           "\n"
         },
-
+        { "close", (PyCFunction)Producer_close, METH_VARARGS|METH_KEYWORDS,
+          ".. py:function:: close([timeout])\n"
+          "\n"
+          "  Flush outstanding messages, then destroy the underlying\n"
+          "  producer handle, blocking for at most ``timeout`` seconds.\n"
+          "\n"
+          "  :param float timeout: Maximum time in seconds to block while\n"
+          "  flushing (default: 1).\n"
+          "  :rtype: bool\n"
+          "  :returns: True once the producer has been closed (also True\n"
+          "  if it was already closed).\n"
+          "\n"
+        },
         { "flush", (PyCFunction)Producer_flush, METH_VARARGS|METH_KEYWORDS,
           ".. py:function:: flush([timeout])\n"
           "\n"
diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h
index b69f75dbd..68fa7f29d 100644
--- a/src/confluent_kafka/src/confluent_kafka.h
+++ b/src/confluent_kafka/src/confluent_kafka.h
@@ -91,6 +91,18 @@
 #endif
 
 
+/* Syslog-style log levels, matching the levels used by librdkafka's
+ * log callbacks. */
+#define CK_LOG_EMERG   0
+#define CK_LOG_ALERT   1
+#define CK_LOG_CRIT    2
+#define CK_LOG_ERR     3
+#define CK_LOG_WARNING 4
+#define CK_LOG_NOTICE  5
+#define CK_LOG_INFO    6
+#define CK_LOG_DEBUG   7
+
+
 /****************************************************************************
  *
diff --git a/tests/test_Producer.py b/tests/test_Producer.py
index f94e4903f..4586de234 100644
--- a/tests/test_Producer.py
+++ b/tests/test_Producer.py
@@ -55,6 +55,8 @@ def on_delivery(err, msg):
     except KafkaException as e:
         assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._TRANSPORT)
 
+    assert p.close(), "The producer was not closed"
+
 
 def test_produce_timestamp():
     """ Test produce() with timestamp arg """
@@ -239,6 +241,8 @@ def test_transaction_api():
     assert ex.value.args[0].fatal() is False
     assert ex.value.args[0].txn_requires_abort() is False
 
+    assert p.close(), "The producer was not closed"
+
 
 def test_purge():
     """
@@ -274,6 +278,8 @@ def on_delivery(err, msg):
     p.flush(0.002)
     assert cb_detector["on_delivery_called"]
 
+    assert p.close(), "The producer was not closed"
+
 
 def test_producer_bool_value():
     """
@@ -283,7 +289,7 @@ def test_producer_bool_value():
 
     p = Producer({})
     assert bool(p)
-
+    assert p.close(), "The producer was not closed"
 
 def test_produce_batch_basic_types_and_data():
     """Test basic data types, None/empty handling, and partition functionality."""
@@ -1395,3 +1401,37 @@
 
     # Test __len__() - should return 0 for closed producer (safe, no crash)
     assert len(producer) == 0
+
+
+def test_producer_close():
+    """
+    Ensure the producer can be closed on demand.
+    """
+    conf = {
+        'debug': 'all',
+        'socket.timeout.ms': 10,
+        'error_cb': error_cb,
+        'message.timeout.ms': 10
+    }
+    producer = Producer(conf)
+    producer.produce('mytopic', value='somedata', key='a key')
+    assert producer.close(), "The producer could not be closed on demand"
+    # A closed producer reports an empty queue
+    assert len(producer) == 0
+
+
+def test_producer_close_with_timeout():
+    """
+    Ensure the producer can be closed on demand with an explicit timeout.
+    """
+    conf = {
+        'debug': 'all',
+        'socket.timeout.ms': 10,
+        'error_cb': error_cb,
+        'message.timeout.ms': 10
+    }
+    producer = Producer(conf)
+    producer.produce('mytopic', value='somedata', key='a key')
+    assert producer.close(0.1), "The producer could not be closed with a timeout"
+    # A closed producer reports an empty queue
+    assert len(producer) == 0
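
For reviewers, a minimal usage sketch of the `close()` API introduced by this patch. The broker address and topic name below are illustrative placeholders; no reachable cluster is required for `close()` itself, which flushes locally queued messages (bounded by the timeout) and then destroys the librdkafka handle:

```python
from confluent_kafka import Producer

# Illustrative config; 'localhost:9092' is a placeholder broker address.
producer = Producer({'bootstrap.servers': 'localhost:9092',
                     'message.timeout.ms': 10})
producer.produce('mytopic', value='somedata', key='a key')

# Block for at most 0.5 seconds while flushing, then destroy the handle.
# Returns True once the producer is closed (also True if it was already
# closed); a RuntimeWarning is emitted if the flush timed out before all
# messages were delivered.
assert producer.close(timeout=0.5)

# The closed producer is inert and reports an empty queue.
assert len(producer) == 0
```

Surfacing an incomplete flush as a `RuntimeWarning` rather than an exception keeps `close()` safe to call in teardown paths; callers that need strict delivery guarantees can still call `flush()` explicitly before closing.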