aiokafka
========
.. image:: https://travis-ci.com/aio-libs/aiokafka.svg?branch=master
    :target: https://travis-ci.com/aio-libs/aiokafka
    :alt: Build status

.. image:: https://codecov.io/github/aio-libs/aiokafka/coverage.svg?branch=master
    :target: https://codecov.io/gh/aio-libs/aiokafka/branch/master
    :alt: Coverage

.. image:: https://badges.gitter.im/Join%20Chat.svg
    :target: https://gitter.im/aio-libs/Lobby
    :alt: Chat on Gitter

asyncio client for Kafka
AIOKafkaProducer
****************
AIOKafkaProducer is a high-level, asynchronous message producer.
Example of AIOKafkaProducer usage:

.. code-block:: python

    from aiokafka import AIOKafkaProducer
    import asyncio

    async def send_one():
        producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
        # Get cluster layout and initial topic/partition leadership information
        await producer.start()
        try:
            # Produce message
            await producer.send_and_wait("my_topic", b"Super message")
        finally:
            # Wait for all pending messages to be delivered or expire.
            await producer.stop()

    asyncio.run(send_one())

AIOKafkaConsumer
****************
AIOKafkaConsumer is a high-level, asynchronous message consumer.
It interacts with the assigned Kafka Group Coordinator node to allow multiple
consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).
Example of AIOKafkaConsumer usage:

.. code-block:: python

    from aiokafka import AIOKafkaConsumer
    import asyncio

    async def consume():
        consumer = AIOKafkaConsumer(
            'my_topic', 'my_other_topic',
            bootstrap_servers='localhost:9092',
            group_id="my-group")
        # Get cluster layout and join group `my-group`
        await consumer.start()
        try:
            # Consume messages
            async for msg in consumer:
                print("consumed: ", msg.topic, msg.partition, msg.offset,
                      msg.key, msg.value, msg.timestamp)
        finally:
            # Will leave consumer group; perform autocommit if enabled.
            await consumer.stop()

    asyncio.run(consume())

Running tests
-------------
Docker is required to run the tests. See https://docs.docker.com/engine/installation for installation notes. Note also that the `lz4` compression libraries for Python require the `python-dev` package (or the Python source header files) for compilation on Linux.

NOTE: You will also need a valid Java installation. It's required for the ``keytool`` utility, used to generate the SSL keys for some tests.
Setting up test requirements (assuming you're within a virtualenv on Ubuntu 14.04+)::

    sudo apt-get install -y libsnappy-dev
    make setup

Running tests with coverage::

    make cov

To run tests against a specific Kafka version (the default is 1.0.2), use the KAFKA_VERSION variable::

    make cov KAFKA_VERSION=0.10.2.1

Test running cheatsheet:

* ``make test FLAGS="-l -x --ff"`` - run until the first failure, rerunning failed tests first. Great for cleaning up a lot of errors, say after a big refactor.
* ``make test FLAGS="-k consumer"`` - run only the consumer tests.
* ``make test FLAGS="-m 'not ssl'"`` - run tests excluding ssl.
* ``make test FLAGS="--no-pull"`` - do not try to pull new docker image before test run.
=========
Changelog
=========
0.7.2 (2021-09-02)
==================
Bugfixes:
* Fix `CancelledError` handling in sender (issue #710)
* Fix exception for weakref use after object deletion (issue #755)
* Fix consumer's `start()` method hanging after being idle for more than
`max_poll_interval_ms` (issue #764)
Improved Documentation:
* Add `SASL_PLAINTEXT` and `SASL_SSL` to valid values of security protocol
attribute (pr #768 by @pawelrubin)
0.7.1 (2021-06-04)
==================
Bugfixes:
* Allow group coordinator to close when all brokers are unavailable (issue #659
and pr #660 by @dkilgore90)
* Exclude `.so` from source distribution to fix usage of sdist tarball
(issue #681 and pr #684 by ods)
* Add `dataclasses` backport package to dependencies for Python 3.6
(pr #690 by @ods)
* Fix initialization without running loop (issue #689 and pr #690 by @ods)
* Fix consumer fetcher for python3.9 (pr #672 by @dutradda)
* Make sure generation and member id are correct after (re)joining group.
(issue #727 and pr #747 by @vangheem)
Deprecation:
* Add a deprecation warning when the loop argument is passed to AIOKafkaConsumer
  or AIOKafkaProducer. It's scheduled for removal in 0.8.0 as a preparation
  step for the upcoming Python 3.10 (pr #699 by @ods)
Improved Documentation:
* Update docs and examples to not use deprecated practices like passing loop
explicitly (pr #693 by @ods)
* Add docstring for Kafka header support in `Producer.send()` (issue #566 and
pr #650 by @andreportela)
0.7.0 (2020-10-28)
==================
New features:
* Add support for Python 3.8 and 3.9. (issue #569, pr #669 and #676 by @ods)
* Drop support for Python 3.5. (pr #667 by @ods)
* Add `OAUTHBEARER` as a new `sasl_mechanism`. (issue #618 and pr #630 by @oulydna)
Bugfixes:
* Fix memory leak in kafka consumer when the consumer is in idle state and not
  consuming any messages. (issue #628 and pr #629 by @iamsinghrajat)
0.6.0 (2020-05-15)
==================
New features:
* Add async context manager support for both Producer and Consumer (pr #613 and #494 by @nimish); see the sketch after this list.
* Upgrade to kafka-python version 2.0.0 and set it as non-strict
parameter. (issue #590 by @yumendy and #558 by @originalgremlin)
* Make loop argument optional (issue #544)
* SCRAM-SHA-256 and SCRAM-SHA-512 support for SASL authentication (issue #571 and pr #588 by @SukiCZ)
* Added headers param to AIOKafkaProducer.send_and_wait (pr #553 by @megabotan)
* Add `consumer.last_poll_timestamp(partition)` which gives the ms timestamp of the last
update of `highwater` and `lso`. (issue #523 and pr #526 by @aure-olli)
* Change all code base to async-await (pr #522)
* Minor: added PR and ISSUE templates to GitHub
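
A minimal sketch of the async context manager support and the new ``headers``
parameter of ``send_and_wait`` (the broker address and the header values below
are illustrative only):

.. code-block:: python

    from aiokafka import AIOKafkaProducer
    import asyncio

    async def produce():
        # ``async with`` calls start() on enter and stop() on exit
        async with AIOKafkaProducer(bootstrap_servers='localhost:9092') as producer:
            # Kafka headers are passed as a list of (str, bytes) tuples
            await producer.send_and_wait(
                "my_topic", b"Super message",
                headers=[("content-type", b"application/octet-stream")])

    asyncio.run(produce())
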
Bugfixes:
* Ignore debug package generation on bdist_rpm command. (issue #599 by @gabriel-tincu)
* UnknownMemberId was raised to the user instead of retrying on auto commit. (issue #611)
* Fix issue with messages not being read after subscriptions change with group_id=None. (issue #536)
* Handle `RequestTimedOutError` in `coordinator._do_commit_offsets()` method to explicitly mark
coordinator as dead. (issue #584 and pr #585 by @FedirAlifirenko)
* Added handling `asyncio.TimeoutError` on metadata request to broker and metadata update.
(issue #576 and pr #577 by @MichalMazurek)
* Too many reqs on kafka not available (issue #496 by @lud4ik)
* Consumer.seek_to_committed now returns mapping of committed offsets (pr #531 by @ask)
* Message Accumulator: add_message being recursive eventually overflows (pr #530 by @ask)
Improved Documentation:
* Clarify auto_offset_reset usage. (pr #601 by @dargor)
* Fix spelling errors in comments and documentation using codespell (pr #567 by @mauritsvdvijgh)
* Delete old benchmark file (issue #546 by @jeffwidman)
* Fix a few typos in docs (pr #573 and pr #563 by @ultrabug)
* Fix typos, spelling, grammar, etc (pr #545 and pr #547 by @jeffwidman)
* Fix typo in docs (pr #541 by @pablogamboa)
* Fix documentation for benchmark (pr #537 by @abhishekray07)
* Better logging for bad CRC (pr #529 by @ask)
0.5.2 (2019-03-10)
==================
Bugfixes:
* Fix ConnectionError breaking metadata sync background task (issue #517 and #512)
* Fix event_waiter reference before assignment (pr #504 by @romantolkachyov)
* Bump version of kafka-python
0.5.1 (2019-03-10)
==================
New features:
* Add SASL support with both SASL PLAIN and SASL GSSAPI. Support also includes
  Broker v0.9.0, but you will need to explicitly pass ``api_version="0.9"``.
  (Big thanks to @cyrbil and @jsurloppe for working on this)
* Added support for max_poll_interval_ms and rebalance_timeout_ms settings (
issue #67)
* Added a pause/resume API for AIOKafkaConsumer (issue #304); see the sketch after this list.
* Added header support to both AIOKafkaConsumer and AIOKafkaProducer for
brokers v0.11 and above. (issue #462)
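
A rough sketch of the SASL/PLAIN configuration and the pause/resume API added in
this release (the broker address, credentials and partition below are
placeholders):

.. code-block:: python

    from aiokafka import AIOKafkaConsumer
    from aiokafka.structs import TopicPartition
    import asyncio

    async def consume_with_pause():
        consumer = AIOKafkaConsumer(
            "my_topic",
            bootstrap_servers="localhost:9092",
            # SASL/PLAIN settings; substitute real credentials here
            security_protocol="SASL_PLAINTEXT",
            sasl_mechanism="PLAIN",
            sasl_plain_username="user",
            sasl_plain_password="secret")
        await consumer.start()
        try:
            tp = TopicPartition("my_topic", 0)
            # Temporarily stop fetching from this partition, then continue
            consumer.pause(tp)
            consumer.resume(tp)
        finally:
            await consumer.stop()

    asyncio.run(consume_with_pause())
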
Bugfixes:
* Made sure to not request metadata for all topics if broker version is passed
explicitly and is 0.10 and above. (issue #440, thanks to @ulrikjohansson)
* Make sure heartbeat task will close if group is reset. (issue #372)
0.5.0 (2018-12-28)
==================
New features:
* Add full support for V2 format messages with a Cython extension. Those are
used for Kafka >= 0.11.0.0
* Added support for transactional producing (issue #182); see the sketch after this list
* Added support for idempotent producing with `enable_idempotence` parameter
* Added support for `fetch_max_bytes` in AIOKafkaConsumer. This can help limit
  the amount of data transferred in a single roundtrip to the broker, which is
  essential for consumers with a large number of partitions
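
A minimal sketch of transactional/idempotent producing, assuming a broker new
enough to support transactions (the broker address and the transactional id are
placeholders):

.. code-block:: python

    from aiokafka import AIOKafkaProducer
    import asyncio

    async def produce_in_transaction():
        producer = AIOKafkaProducer(
            bootstrap_servers="localhost:9092",
            # Setting transactional_id also makes the producer idempotent;
            # enable_idempotence=True can be used on its own as well.
            transactional_id="my-transactional-id")
        await producer.start()
        try:
            # Messages sent inside the block are committed atomically;
            # if the block raises, the transaction is aborted.
            async with producer.transaction():
                await producer.send("my_topic", b"msg-1")
                await producer.send("my_topic", b"msg-2")
        finally:
            await producer.stop()

    asyncio.run(produce_in_transaction())
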
Bugfixes:
* Fix issue with connections not propagating serialization errors
* Fix issue with `group=None` resetting offsets on every metadata update
(issue #441)
* Fix issue with messages not delivered in order when Leader changes (issue
#228)
* Fixed version parsing of `api_version` parameter. Before it ignored the
parameter
0.4.3 (2018-11-01)
==================
Bugfix:
* Fixed memory issue introduced as a result of a bug in `asyncio.shield` and
not cancelling coroutine after usage. (see issue #444 and #436)
0.4.2 (2018-09-12)
==================
Bugfix:
* Added error propagation from coordinator to main consumer. Before consumer
just stopped with error logged. (issue #294)
* Fix manual partition assignment, broken in 0.4.0 (issue #394)
* Fixed RecursionError in MessageAccumulator.add_message (issue #409)
* Update kafka-python to latest 1.4.3 and added support for Python3.7
* Dropped support for Python3.3 and Python3.4
Infrastructure:
* Added Kafka 1.0.2 broker for CI test runner
* Refactored travis CI build pipeline
0.4.1 (2018-05-13)
==================
* Fix issue when offset commit error reports wrong partition in log (issue #353)
* Add ResourceWarning when Producer, Consumer or Connections are not closed
properly (issue #295)
* Fix Subscription None in GroupCoordinator._do_group_rejoin (issue #306)
0.4.0 (2018-01-30)
==================
Major changes:
* Full refactor of the internals of AIOKafkaConsumer. Needed to avoid several
race conditions in code (PR #286, fixes #258, #264 and #261)
* Rewrote Records parsing protocol to allow implementation of newer protocol
versions later
* Added C extension for Records parsing protocol, boosting the speed of
produce/consume routines significantly
* Added an experimental batch producer API for cases where the user wants
  to control batching themselves (by @shargan); see the sketch after this list
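
A sketch of the experimental batch produce API (topic, partition and payloads
are illustrative; see the project docs for the exact semantics):

.. code-block:: python

    from aiokafka import AIOKafkaProducer
    import asyncio

    async def send_batch():
        producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
        await producer.start()
        try:
            batch = producer.create_batch()
            for i in range(10):
                # append() returns None once the batch has no more room
                batch.append(key=None, value=b"msg %d" % i, timestamp=None)
            # With the batch API the caller chooses the partition explicitly
            fut = await producer.send_batch(batch, "my_topic", partition=0)
            record = await fut
            print("batch written starting at offset", record.offset)
        finally:
            await producer.stop()

    asyncio.run(send_batch())
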
Minor changes:
* Add `timestamp` field to produced message's metadata. This is needed to find
LOG_APPEND_TIME configured timestamps.
* `Consumer.seek()` and similar APIs now raise a proper ``ValueError`` on
  validation failure instead of an ``AssertionError``.
Bug fixes:
* Fix ``connections_max_idle_ms`` option, as earlier it was only applied to
bootstrap socket. (PR #299)
* Fix ``consumer.stop()`` side effect of logging an exception
ConsumerStoppedError (issue #263)
* Problem with Producer not able to recover from broker failure (issue #267)
* Traceback containing duplicate entries due to exception sharing (PR #247
by @Artimi)
* Concurrent record consumption raising `InvalidStateError('Exception is not
set.')` (PR #249 by @aerkert)
* Don't fail ``GroupCoordinator._on_join_prepare()`` if ``commit_offset()``
throws exception (PR #230 by @shargan)
* Send session_timeout_ms to GroupCoordinator constructor (PR #229 by @shargan)
Big thanks to:
* @shargan for Producer speed enhancements and the batch produce API
proposal/implementation.
* @vineet-rh and other contributors for constant feedback on Consumer
problems, leading to the refactor mentioned above.
0.3.1 (2017-09-19)
==================
* Added `AIOKafkaProducer.flush()` method. (PR #209 by @vineet-rh)
* Fixed a bug with uvloop involving `float("inf")` for timeout. (PR #210 by
dmitry-moroz)
* Changed test runner to allow running tests on OSX. (PR #213 by @shargan)
0.3.0 (2017-08-17)
==================
* Moved all public structures and errors to `aiokafka` namespace. You will no
longer need to import from `kafka` namespace.
* Changed ConsumerRebalanceListener to support either function or coroutine
for `on_partitions_assigned` and `on_partitions_revoked` callbacks. (PR #190
by @ask)
* Added support for the `offsets_for_times`, `beginning_offsets` and `end_offsets`
  APIs (issue #164); see the sketch after this list.
* Coordinator requests are now sent using a separate socket. Fixes slow commit
issue. (issue #137, issue #128)
* Added the `seek_to_end` and `seek_to_beginning` APIs. (issue #154)
* Updated documentation to provide a more useful usage guide for both the
  Consumer and Producer interfaces.
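
A short sketch of the offset lookup and seek APIs added in this release (the
partition, timestamp and broker address are example values):

.. code-block:: python

    from aiokafka import AIOKafkaConsumer
    from aiokafka.structs import TopicPartition
    import asyncio

    async def inspect_offsets():
        consumer = AIOKafkaConsumer(bootstrap_servers="localhost:9092")
        await consumer.start()
        try:
            tp = TopicPartition("my_topic", 0)
            consumer.assign([tp])
            first = await consumer.beginning_offsets([tp])
            last = await consumer.end_offsets([tp])
            print("offset range:", first[tp], "-", last[tp])
            # First offset with a timestamp at or after the given ms timestamp
            by_time = await consumer.offsets_for_times({tp: 1500000000000})
            print(by_time[tp])
            # Jump to the start or the end of the partition
            await consumer.seek_to_beginning(tp)
            await consumer.seek_to_end(tp)
        finally:
            await consumer.stop()

    asyncio.run(inspect_offsets())
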
0.2.3 (2017-07-23)
==================
* Fixed retry problem in Producer, when buffer is not reset to 0 offset.
Thanks to @ngavrysh for the fix in Tubular/aiokafka fork. (issue #184)
* Fixed how Producer handles retries on Leader node failure. It just did not
work before... Thanks to @blugowski for the help in locating the problem.
(issue #176, issue #173)
* Fixed degrade in v0.2.2 on Consumer with no group_id. (issue #166)
0.2.2 (2017-04-17)
==================
* Reconnect after KafkaTimeoutException. (PR #149 by @Artimi)
* Fixed compacted topic handling. It could skip messages if those were
compacted (issue #71)
* Fixed old issue with new topics not adding to subscription on pattern
(issue #46)
* Another fix for Consumer race condition on JoinGroup. This forces Leader to
wait for new metadata before assigning partitions. (issue #118)
* Changed metadata listener in Coordinator to avoid 2 rejoins in a rare
condition (issue #108)
* `getmany` will not return 0 results until we hit timeout. (issue #117)
Big thanks to @Artimi for pointing out several of those issues.
0.2.1 (2017-02-19)
==================
* Add a check to wait for topic autocreation in Consumer, instead of raising
  UnknownTopicOrPartitionError (PR #92 by fabregas)
* Consumer now stops consumption after `consumer.stop()` call. Any new `get*` calls
will result in ConsumerStoppedError (PR #81)
* Added `exclude_internal_topics` option for Consumer (PR #111)
* Better support for pattern subscription when used with `group_id` (part of PR #111)
* Fix for Consumer `subscribe` and JoinGroup race condition (issue #88). Coordinator will now notice subscription changes during rebalance and will join group again. (PR #106)
* Changed logging messages according to KAFKA-3318. Now INFO level should be less messy and more informative. (PR #110)
* Add support for connections_max_idle_ms config (PR #113)
0.2.0 (2016-12-18)
==================
* Added SSL support. (PR #81 by Drizzt1991)
* Fixed UnknownTopicOrPartitionError error on first message for autocreated topic (PR #96 by fabregas)
* Fixed `next_record` recursion (PR #94 by fabregas)
* Fixed Heartbeat fail if no consumers (PR #92 by fabregas)
* Added docs addressing kafka-python and aiokafka differences (PR #70 by Drizzt1991)
* Added `max_poll_records` option for Consumer (PR #72 by Drizzt1991); see the sketch after this list
* Fix kafka-python typos in docs (PR #69 by jeffwidman)
* Topics and partitions are now randomized on each Fetch request (PR #66 by Drizzt1991)
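
A small sketch of the `max_poll_records` option together with the batch-style
`getmany()` call (the broker address and the limits below are example values):

.. code-block:: python

    from aiokafka import AIOKafkaConsumer
    import asyncio

    async def consume_batches():
        consumer = AIOKafkaConsumer(
            "my_topic",
            bootstrap_servers="localhost:9092",
            # Cap how many records a single getmany() call may return
            max_poll_records=500)
        await consumer.start()
        try:
            # Returns a dict of {TopicPartition: [ConsumerRecord, ...]}
            batches = await consumer.getmany(timeout_ms=1000)
            for tp, messages in batches.items():
                print(tp, len(messages))
        finally:
            await consumer.stop()

    asyncio.run(consume_batches())
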
0.1.4 (2016-11-07)
==================
* Bumped kafka-python version to 1.3.1 and Kafka to 0.10.1.0.
* Fixed auto version detection, to correctly handle 0.10.0.0 version
* Updated Fetch and Produce requests to use v2 with v0.10.0 message format on brokers.
This allows a ``timestamp`` to be associated with messages.
* Changed lz4 compression framing, as it was changed due to KIP-57 in new message format.
* Minor refactorings
Big thanks to @fabregas for the hard work on this release (PR #60)
0.1.3 (2016-10-18)
==================
* Fixed bug with infinite loop on heartbeats with autocommit=True. #44
* Bumped kafka-python to version 1.1.1
* Fixed docker test runner with multiple interfaces
* Minor documentation fixes
0.1.2 (2016-04-30)
==================
* Added Python3.5 usage example to docs
* Don't raise retriable exceptions in 3.5's async for iterator
* Fix Cancellation issue with producer's `send_and_wait` method
0.1.1 (2016-04-15)
==================
* Fix packaging issues. Removed unneeded files from package.
0.1.0 (2016-04-15)
==================
Initial release
Added full support for Kafka 0.9.0. Older Kafka versions are not tested.