معرفی شرکت ها


arque-1.1.0


Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر

توضیحات

Asyncio Reliable Queue (based on redis)
ویژگی مقدار
سیستم عامل -
نام فایل arque-1.1.0
نام arque
نسخه کتابخانه 1.1.0
نگهدارنده []
ایمیل نگهدارنده []
نویسنده Andrei Roskach
ایمیل نویسنده code.impactor@gmail.com
آدرس صفحه اصلی https://github.com/code-impactor/arque
آدرس اینترنتی https://pypi.org/project/arque/
مجوز MIT
# arque Asyncio Reliable Queue (based on redis) Inspired by Tom DeWire's article "Reliable Queueing in Redis (Part 1)" [[1]](#ref1) [[2]](#ref2) and the "torrelque" python module [[3]](#ref3). #### Features: - Asynchronous: based on asyncio and aioredis - Reliable: at any moment task data stored in redis database - Throttling: controls number of tasks in execution - Delayed queue: defers task availability - Dead letters: put task data in failed queue after number of predefined retry attempts - Tested on Python 3.7 and redis server '>=3.0.6', '<=5.0.5' - Used in containerized applications (managed by kubernetes) in high load environments #### Install: ```bash pip install arque ``` #### Usage: ```python import signal import random import logging import asyncio import aioredis import time from functools import wraps from arque import Arque logger = logging.getLogger(__name__) async def shutdown(signal, loop): """Cleanup tasks tied to the service's shutdown.""" logging.info(f"Received exit signal {signal.name}...") tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] [task.cancel() for task in tasks] logging.info(f"Cancelling {len(tasks)}outstanding tasks") await asyncio.gather(*tasks) logging.info(f"Flushing metrics") loop.stop() def aioredis_pool(host='redis://localhost', encoding='utf8'): def wrapper(func): @wraps(func) async def wrapped(): redis = await aioredis.create_redis_pool(host, encoding=encoding) try: return await func(redis=redis) finally: redis.close() await redis.wait_closed() return wrapped return wrapper @aioredis_pool(host='redis://localhost', encoding='utf8') async def produce_task(redis=None): logger.info('Starting producing...') queue = Arque(redis=redis) while True: for _ in range(1): task = {'value': random.randint(0, 99)} task_id = f"custom_{task['value']}_{time.time()}" logger.debug('Produced task %s', task) await queue.enqueue(task, task_id=task_id, task_timeout=10, delay=1) await asyncio.sleep(1) async def process(task_data): logger.debug('Consumed task %s', task_data) await asyncio.sleep(1) @aioredis_pool(host='redis://localhost', encoding='utf8') async def consume_task(redis=None): logger.info('Starting consuming...') queue = Arque(redis=redis, working_limit=3) while True: task_id, task_data = await queue.dequeue() if task_id == '__not_found__': continue if task_id == '__overloaded__': print(f'TASK ID: {task_id}') await asyncio.sleep(1) continue if task_id == '__marked_as_failed___': print(f'FAILED ID: {task_id}') continue try: await process(task_data) await queue.release(task_id) except Exception: logger.exception('Job processing has failed') await queue.requeue(task_id, delay=5) stats = await queue.get_stats() logger.info(stats) @aioredis_pool(host='redis://localhost', encoding='utf8') async def sweep_task(redis=None): logger.info('Starting sweeping...') queue = Arque(redis=redis, sweep_interval=5) await queue.schedule_sweep() @aioredis_pool(host='redis://localhost', encoding='utf8') async def stats_task(redis=None): logger.info('Starting stats...') queue = Arque(redis=redis) while True: stats = await queue.get_stats() logger.info(stats) await asyncio.sleep(5) async def example(): tasks = [] for _ in range(5): tasks.append(consume_task()) tasks.append(produce_task()) tasks.append(sweep_task()) tasks.append(stats_task()) await asyncio.gather(*tasks) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(message)s') loop = asyncio.get_event_loop() signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGUSR1) for s in signals: loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown(s, loop))) try: loop.run_until_complete(example()) finally: loop.close() logging.info("Successfully shutdown...") ``` #### Reference <a name="ref1"></a>[1] [Reliable Queueing in Redis (Part 1)](http://blog.bronto.com/engineering/reliable-queueing-in-redis-part-1/) <a name="ref2"></a>[2] [DEWIRE Redis as a Reliable Work Queue.pdf](https://www.percona.com/sites/default/files/DEWIRE%20Redis%20as%20a%20Reliable%20Work%20Queue.pdf) <a name="ref3"></a>[3] [torrelque](https://bitbucket.org/saaj/torrelque)


نیازمندی

مقدار نام
>=1.2.0 aioredis


زبان مورد نیاز

مقدار نام
>=3.7 Python


نحوه نصب


نصب پکیج whl arque-1.1.0:

    pip install arque-1.1.0.whl


نصب پکیج tar.gz arque-1.1.0:

    pip install arque-1.1.0.tar.gz