معرفی شرکت ها


aio-redis-mq-2.0.0


Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر

توضیحات

Lightweight Message Queue & Broker base on async python redis streams
ویژگی مقدار
سیستم عامل -
نام فایل aio-redis-mq-2.0.0
نام aio-redis-mq
نسخه کتابخانه 2.0.0
نگهدارنده []
ایمیل نگهدارنده []
نویسنده kavinbj
ایمیل نویسنده kwfelix@163.com
آدرس صفحه اصلی http://github.com/kavinbj/aioRedisMQ
آدرس اینترنتی https://pypi.org/project/aio-redis-mq/
مجوز MIT
# aio_redis_mq Lightweight Message Queue & Broker base on async python redis streams # Suitable Application Environment Modern software applications have moved from being a single monolithic unit to loosely coupled collections of services. While this new architecture brings many benefits, those services still need to interact with each other, creating the need for robust and efficient messaging solutions. The following problems are suitable for using message queuing: - Asynchronous processing - Flow control - Service decoupling - Connect flow computing - As a publish / subscribe system # Installation ```bash pip install aio-redis-mq ``` ## Quick Start ## ```python import asyncio import time from aio_redis_mq import MQProducer, MQConsumer _redis_url = 'redis://root:xxxxx@localhost/1' async def producer_task(producer): for _ in range(0, 10): await asyncio.sleep(1) send_msg_id = await producer.send_message({'msg': f'msg_{_}', 'content': time.strftime("%Y-%m-%d %H:%M:%S")}) print(f'producer_task time at {time.strftime("%Y-%m-%d %H:%M:%S")}', f'message id={send_msg_id}') async def consumer_task(consumer: MQConsumer, consumer_index: int): for _ in range(0, 10): msg = await consumer.block_read_messages(block=1500) print(f'consumer_{consumer_index} block read message', msg) async def main(): # one producer producer = MQProducer('pub_stream', redis_name='_redis_local', redis_url=_redis_url) # three consumer consumer1 = MQConsumer('pub_stream', redis_name='_redis_local', redis_url=_redis_url) consumer2 = MQConsumer('pub_stream', redis_name='_redis_local', redis_url=_redis_url) consumer3 = MQConsumer('pub_stream', redis_name='_redis_local', redis_url=_redis_url) await asyncio.gather( producer_task(producer), consumer_task(consumer1, 1), consumer_task(consumer2, 2), consumer_task(consumer3, 3) ) if __name__ == '__main__': asyncio.run(main()) ``` ## Group Consumer ## ```python import asyncio import time from aio_redis_mq import MQProducer, GroupManager, Group, GroupConsumer _redis_url = 'redis://root:xxxxx@localhost/1' async def producer_task(producer): for _ in range(0, 10): await asyncio.sleep(1) print(f'-------------------------------------{_}-------------------------------------') send_msg_id = await producer.send_message({'msg': f'msg_{_}', 'content': time.strftime("%Y-%m-%d %H:%M:%S")}) print(f'group_producer send_message time at {time.strftime("%Y-%m-%d %H:%M:%S")}', f'message id={send_msg_id}') async def consumer_task(consumer: GroupConsumer): for _ in range(0, 10): # Here we use a low-level read message API and do not detect pending messages or handle idle messages msg = await consumer.block_read_messages(count=1, block=1500) await asyncio.sleep(0.05) print(f'group_consumer {consumer.consumer_id} group={consumer.group_name} block read message', msg) if len(msg) > 0 and len(msg[0][1]) > 0: msg_id = msg[0][1][0][0] ack_result = await consumer.ack_message(msg_id) print(f'group_consumer {consumer.consumer_id} group={consumer.group_name} ack message id=' f'{msg_id} {"successful" if ack_result else "failed"}.') # show info async def show_groups_infor(group: Group): print(f'-----------------------------{group.group_name}---------- groups info ------------------------------------') group_info = await group.get_groups_info() print(f'group name: {group.group_name} groups info : {group_info}') print(f'-----------------------------{group.group_name}--------- consumer info -----------------------------------') consumer_info = await group.get_consumers_info() print(f'group name: {group.group_name} consumer info : {consumer_info}') print(f'-----------------------------{group.group_name}-------- pending info -------------------------------------') pending_info = await group.get_pending_info() print(f'group name: {group.group_name} pending info : {pending_info}') async def main(): # create one producer producer = MQProducer('group_stream1', redis_name='_group_redis_', redis_url=_redis_url) # create group manager , via same stream key, same redis_name group_manager = GroupManager('group_stream1', redis_name='_group_redis_', redis_url=_redis_url) # create first group group1 = await group_manager.create_group('group1') # create two consumers in the same group consumer1 = await group1.create_consumer('consumer1') consumer2 = await group1.create_consumer('consumer2') # create second group group2 = await group_manager.create_group('group2') # create three consumers in the same group consumer3 = await group2.create_consumer('consumer3') consumer4 = await group2.create_consumer('consumer4') consumer5 = await group2.create_consumer('consumer5') await asyncio.gather( producer_task(producer), consumer_task(consumer1), consumer_task(consumer2), consumer_task(consumer3), consumer_task(consumer4), consumer_task(consumer5) ) print('------------------------------------- show total infor -------------------------------------') stream_info = await group_manager.get_stream_info(group_manager.stream_key) print(f'stream_key: {group_manager.stream_key} stream info : {stream_info}') await show_groups_infor(group1) await show_groups_infor(group2) if __name__ == '__main__': asyncio.run(main()) ``` ## More Example ## For more examples, please query the example folder. # About Redis streams The Redis Stream is a new data type introduced with Redis 5.0, which models a log data structure in a more abstract way. Redis Streams doubles as a communication channel for building streaming architectures and as a log-like data structure for persisting data, making Streams the perfect solution for event sourcing. The stream type published in redis5.0 is also used to implement typical message queues. The emergence of this stream type meets almost all the requirements of message queues, including but not limited to: - Serialization generation of message ID - Message traversal - Blocking and non blocking reading of messages - Group consumption of messages - Unfinished message processing - Message queue monitoring # Comparison of basic concepts Common distributed message system, including RabbitMQ 、 RocketMQ 、 Kafka 、Pulsar 、Redis streams Redis streams vs Kafka |Kafka | Redis Streams | Description | |-----------|-------|--------| |Record | Message| Objects to be processed in the message engine | |Producer |Producer| Clients that publish new messages to topics | |Consumer |Consumer| Clients that subscribe to new messages from topics | |Consumer Group |Consumer Group| A group composed of multiple consumer instances can consume the same topic at the same time to achieve high throughput.| |Broker |Cluster Node| servers form the storage layer. Leader-Follower replica| |Topic | Stream Data type | Topics are logical containers that carry messages | |partitions |Different Redis keys| Redis Streams [Differences with Kafka (TM) partitions](https://redis.io/docs/manual/data-types/streams/#differences-with-kafka-tm-partitions) | # Performance You can use the following tools for performance testing. [OpenMessaging Benchmark Framework](https://github.com/openmessaging/benchmark) # API Reference ### MQClient ### client for message system, can manage and query messages. * #### `__init__(redis_name: Optional[str] = None, redis_url: Optional[str] = None, redis_pool: aioredis.client.Redis = None, **kwargs)` #### create MQ Client instance * `redis_name`: name for cache redis client * `redis_url`: redis server url * `redis_pool`: aioredis.client.Redis instance, defaults to None * #### `get_stream_length(stream_key: KeyT)` #### Returns the number of elements in a given stream. * `stream_key`: key of the stream. * #### `query_messages(stream_key: KeyT, min_id: StreamIdT = "-", max_id: StreamIdT = "+", count: Optional[int] = None)` #### query message value from min_id to max_id with count limit in a given stream. * `stream_key`: key of the stream. * `min_id`: first stream ID. defaults to '-', meaning the earliest available. * `max_id`: last stream ID. defaults to '+', meaning the latest available. * `count`: if set, only return this many items, beginning with the earliest available. * #### `reverse_query_messages(stream_key: KeyT, min_id: StreamIdT = "-", max_id: StreamIdT = "+", count: Optional[int] = None)` #### query message value in reverse order from min_id to max_id with count limit in a given stream. * #### `get_stream_info(stream_key: KeyT)` #### Returns general information about the stream. * #### `delete_message(stream_key: KeyT, *ids: StreamIdT)` #### Deletes one or more messages from a stream. * `stream_key`: key of the stream. * `*ids`: message ids to delete. * #### `trim_stream(stream_key: KeyT, maxlen: int, approximate: bool = True)` #### Deletes one or more messages from a stream. * `stream_key`: key of the stream. * `maxlen`: truncate old stream messages beyond this size * `maxlen`: actual stream length may be slightly more than maxlen ```python client = MQClient(redis_name='my_redis', redis_url='redis://root:xxxxx@localhost/0') # get stream length stream_length = await client.get_stream_length('_test_stream1') # get stream info stream_info = await client.get_stream_info('_test_stream1') assert stream_info.get('length') == stream_length # get first_message_info first_message_info = await client.query_messages('_test_stream1', count=1) # get last_message_info last_message_info = await client.reverse_query_messages('_test_stream1', count=1) assert first_message_info[0] == stream_info.get('first-entry') assert last_message_info[0] == stream_info.get('last-entry') ``` ### MQProducer <- MQClient ### message producer, MQClient with a specific stream key * #### `__init__(stream_key: KeyT, redis_name: str = None, redis_pool: aioredis.client.Redis = None, **kwargs)` #### message producer in message system based on a specific stream key. * `stream_key`: key of stream * `redis_name`: name for cache redis client * `redis_url`: redis server url * `redis_pool`: aioredis.client.Redis instance, defaults to None * #### `send_message(message: Dict[FieldT, EncodableT], msg_id: StreamIdT = "*", maxlen: int = None, approximate: bool = True)` #### __Coroutine__. send message content to a stream which is a message container, and return message id. * `message`: dict of field/value pairs to insert into the stream * `msg_id`: Location to insert this record. By default it is appended. * `maxlen`: max number of messages, truncate old stream members beyond this size * `approximate`: actual stream length may be slightly more than maxlen ```python producer = MQProducer('pub_stream', redis_name='my_redis', redis_url='redis://root:xxxxx@localhost/0') send_msg_id = await producer.send_message({'msg_key1': 'value1', 'msg_key2': 'value2'}) ``` ### MQConsumer <- MQClient ### message consumer, MQClient with a specific stream key * #### `__init__(stream_key: KeyT, redis_name: str = None, redis_pool: aioredis.client.Redis = None, **kwargs)` #### message consumer in message system based on a specific stream key. * `stream_key`: key of stream * `redis_name`: name for cache redis client * `redis_url`: redis server url * `redis_pool`: aioredis.client.Redis instance, defaults to None * #### `read_messages(streams: Dict[KeyT, StreamIdT], count: Optional[int] = None)` #### __Coroutine__. read messages from streams as message containers * `streams`: a dict of stream keys to stream IDs, where IDs indicate the last ID already seen. * `count`: if set, only return this many items, beginning with the earliest available. * #### `block_read_messages(*stream_key: KeyT, count: Optional[int] = None, block: Optional[int] = None,)` #### __Coroutine__. Block and monitor multiple streams for new data. * `stream_key`: key of the stream. * `count`: if set, only return this many items, beginning with the earliest available. * `block`: number of milliseconds to wait, if nothing already present. ```python consumer = MQConsumer('pub_stream', redis_name='my_redis', redis_url='redis://root:xxxxx@localhost/0') # block read new message new_msg = await consumer.block_read_messages(block=1500) # read messages from msg_id(0 or other id) in single stream (pub_stream) read_msgs = await consumer.read_messages({'pub_stream': 0}, count=10) ``` ### GroupManager ### * #### `__init__(stream_key: KeyT, redis_name: str = None, **kwargs)` #### group manager in message system based on a specific stream key. * `stream_key`: key of stream * `redis_name`: name for cache redis client * `redis_url`: redis server url * #### `create_group(group_name: GroupT, msg_id: StreamIdT = "$", mkstream: bool = True)` #### Create a new group consumer associated with a stream * `group_name`: name of the consumer group * `msg_id`: ID of the last item in the stream to consider already delivered. * `mkstream`: a boolean indicating whether to create new stream * #### `destroy_group(group_name: GroupT)` #### Destroy a consumer group * `group_name`: name of the consumer group * #### `get_groups_info()` #### Returns general information about the consumer groups of the stream. ```python group_manager = GroupManager('pub_stream', redis_name='my_redis', redis_url='redis://root:xxxxx@localhost/0') # create group group = await group_manager.create_group('group') ``` ### Group ### * #### `create_consumer(consumer_id: ConsumerT)` #### create a consumer instance in group * `consumer_id`: id of consumer. * #### `delete_consumer(consumer_id: ConsumerT)` #### Remove a specific consumer from a consumer group. * `consumer_id`: id of consumer. * #### `set_msg_id(msg_id: StreamIdT)` #### Set the consumer group last delivered ID to something else. * `msg_id`: ID of the last item in the stream to consider already delivered * #### `get_groups_info()` #### Returns general information about the consumer groups of the stream. * #### `get_consumers_info()` #### Returns general information about the consumers in the group. only return consumer which has read message * #### `get_pending_info()` #### Returns information about pending messages of a group. * #### `query_pending_messages(min_msg_id: Optional[StreamIdT], max_msg_id: Optional[StreamIdT], count: Optional[int], consumer_id: Optional[ConsumerT] = None)` #### Returns information about pending messages, in a range. * `min_msg_id`: minimum message ID * `max_msg_id`: maximum message ID * `count`: number of messages to return * `consumer_id`: id of a consumer to filter by (optional) * #### `ack_message(*msg_id: StreamIdT)` #### Acknowledges the successful processing of one or more messages. * `msg_id`: message ids to acknowledge. * #### `claim_message(consumer_id: ConsumerT, min_idle_time: int, msg_ids: Union[List[StreamIdT], Tuple[StreamIdT]], idle: Optional[int] = None, time: Optional[int] = None, retrycount: Optional[int] = None, force: bool = False, justid: bool = False)` #### Changes the ownership of a pending message. In the context of a stream consumer group, this command changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument. * `consumer_id`: name of a consumer that claims the message. * `min_idle_time`: filter messages that were idle less than this amount of milliseconds * `msg_ids`: non-empty list or tuple of message IDs to claim * `idle`: Set the idle time (last time it was delivered) of the message in ms * `time`: optional integer. This is the same as idle but instead of a relative amount of milliseconds, it sets the idle time to a specific Unix time (in milliseconds). * `retrycount`: optional integer. set the retry counter to the specified value. This counter is incremented every time a message is delivered again. * `force`: optional boolean, false by default. Creates the pending message entry in the PEL even if certain specified IDs are not already in the PEL assigned to a different client. * `justid`: optional boolean, false by default. Return just an array of IDs of messages successfully claimed, without returning the actual message ### GroupConsumer ### * #### `read_messages(streams: Dict[KeyT, StreamIdT], count: Optional[int] = None, noack: bool = False)` #### Read from a stream via a consumer group. * `streams`: a dict of stream names to stream IDs, where IDs indicate the last ID already seen. * `count`: if set, only return this many items, beginning with the earliest available * `noack`: do not add messages to the PEL (Pending Entries List) * #### `block_read_messages(*stream_key: KeyT, block: Optional[int] = None, count: Optional[int] = None, noack: bool = False)` #### Block read from a stream via a consumer group. * `stream_key`: a list of stream key * `block`: number of milliseconds to wait, if nothing already present. * `count`: if set, only return this many items, beginning with the earliest available * `noack`: do not add messages to the PEL (Pending Entries List) * #### `query_pending_messages(min_msg_id: Optional[StreamIdT], max_msg_id: Optional[StreamIdT], count: Optional[int])` #### Returns information about pending messages, in a range. * `min_msg_id`: minimum message ID * `max_msg_id`: maximum message ID * `count`: number of messages to return * #### `ack_message(*msg_id: StreamIdT)` #### Acknowledges the successful processing of one or more messages. * `msg_id`: message ids to acknowledge. # Developer kavinbj


نیازمندی

مقدار نام
>=2.0.0 aioredis


زبان مورد نیاز

مقدار نام
>=3.7 Python


نحوه نصب


نصب پکیج whl aio-redis-mq-2.0.0:

    pip install aio-redis-mq-2.0.0.whl


نصب پکیج tar.gz aio-redis-mq-2.0.0:

    pip install aio-redis-mq-2.0.0.tar.gz