aiopubsub - asyncio publish-subscribe within a process
======================================================
Simple publish-subscribe pattern for asyncio applications.
Why
----
When building big applications, separation of concerns is a great way to keep things manageable.
In messaging systems, the publish-subscribe pattern is often used to decouple data producers and data
consumers. We went a step ahead and designed even the internals of our applications around this pattern.
We explain our thinking and the workings of ``aiopubsub`` in detail in our article
`Design your app using the pub-sub pattern with aiopubsub <https://quantlane.com/blog/aiopubsub/>`_.
We recommend reading it before using ``aiopubsub`` in your project.
Installation
--------------
``aiopubsub`` is only compatible with Python 3.8 and higher. There are no plans to support older versions.
``aiopubsub`` is `available on PyPI <https://pypi.org/project/aiopubsub/>`_ and you can install it with:
::
pip install aiopubsub
or
::
poetry add aiopubsub
How to use it
----------------------
The following comprehensive example is explained step-by-step
in our article
`"Design your app using the pub-sub pattern with aiopubsub" <https://quantlane.com/blog/aiopubsub/>`_.
.. code-block:: python
import asyncio
import dataclasses
import decimal
import aiopubsub
@dataclasses.dataclass
class Trade:
timestamp: float
quantity: int
price: decimal.Decimal
async def on_trade(key: aiopubsub.Key, trade: Trade) -> None:
print(f'Processing trade = {trade} with key = {key}.')
async def on_nyse_trade(key: aiopubsub.Key, trade: Trade) -> None:
print(f'Processing trade = {trade} with key = {key} that happened in NYSE')
async def main():
# create an aiopubsub hub
hub = aiopubsub.Hub()
# create a sample of data to send
trade = Trade(timestamp = 123.5, quantity = 56, price = decimal.Decimal('1639.43'))
# subscriber listens on every trade and calls the `on_trade` function
subscriber = aiopubsub.Subscriber(hub, 'trades')
subscribe_key = aiopubsub.Key('*', 'trade', '*')
subscriber.add_async_listener(subscribe_key, on_trade)
# publisher has a NASDAQ prefix and sends the trade that happened on Google stock
publisher = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NASDAQ'))
publish_key = aiopubsub.Key('trade', 'GOOGL')
publisher.publish(publish_key, trade)
# sleep so the event loop can process the action
await asyncio.sleep(0.001)
# expected output:
# Processing trade = Trade(timestamp=123.5, quantity=56, price=Decimal('1639.43')) with key = ('NASDAQ', 'trade', 'GOOGL').
# sample from another stock exchange
trade_nyse = Trade(timestamp = 127.45, quantity = 67, price = decimal.Decimal('1639.44'))
# subscribe only for the NYSE exchange
subscribe_key_nyse = aiopubsub.Key('NYSE', 'trade', '*')
subscriber.add_async_listener(subscribe_key_nyse, on_nyse_trade)
# publish NYSE trade
publisher_nyse = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NYSE'))
publisher_nyse.publish(aiopubsub.Key('trade', 'GOOGL'), trade_nyse)
# sleep so the event loop can process the action
await asyncio.sleep(0.001)
# expected output:
# Processing trade = Trade(timestamp=127.45, quantity=67, price=Decimal('1639.44')) with key = ('NYSE', 'trade', 'GOOGL').
# Processing trade = Trade(timestamp=127.45, quantity=67, price=Decimal('1639.44')) with key = ('NYSE', 'trade', 'GOOGL') that happened in NYSE
# clean the subscriber before the end of the program
await subscriber.remove_all_listeners()
if __name__ == '__main__':
asyncio.run(main())
Aiopubsub will use `logwood <https://github.com/qntln/logwood>`_ if it is installed, otherwise it will default
to the standard logging module. Note that ``logwood`` is required to run tests.
Architecture
------------
**Hub** accepts messages from **Publishers** and routes them to **Subscribers**. Each message is routed by its
**Key** - an iterable of strings forming a hierarchic namespace. Subscribers may subscribe to wildcard keys,
where any part of the key may be replaced replaced with a ``*`` (star).
``addedSubscriber`` and ``removedSubscriber`` messages
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
When a new subscriber is added the Hub sends this message
.. code-block::
{
"key": ("key", "of", "added", "subscriber"),
"currentSubscriberCount": 2
}
under the key ``('Hub', 'addedSubscriber', 'key', 'of', 'added', 'subscriber')`` (the part after ``addedSubscriber``
is made of the subscribed key). Note the ``currentSubscriberCount`` field indicating how many subscribers are currently
subscribed.
When a subscriber is removed a message in the same format is sent, but this time under the key
``('Hub', 'removedSubscriber', 'key', 'of', 'added', 'subscriber')``.
Contributing
-------------
Pull requests are welcome! In particular, we are aware that the documentation could be improved.
If anything about ``aiopubsub`` is unclear, please feel free to
`simply open an issue <https://gitlab.com/quantlane/libs/aiopubsub/-/issues/new>`_ and we will do our best
to advise and explain 🙂
****
.. image:: quantlane.png
``fastenum`` was made by `Quantlane <https://quantlane.com>`_, a systematic trading firm.
We design, build and run our own stock trading platform.