معرفی شرکت ها


async-pool-executor-0.1


Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر

توضیحات

async_pool_executor,its api like the concurrent.futures
ویژگی مقدار
سیستم عامل -
نام فایل async-pool-executor-0.1
نام async-pool-executor
نسخه کتابخانه 0.1
نگهدارنده ['ydf']
ایمیل نگهدارنده ['ydf0509@sohu.com']
نویسنده bfzs
ایمیل نویسنده ydf0509@sohu.com
آدرس صفحه اصلی -
آدرس اینترنتی https://pypi.org/project/async-pool-executor/
مجوز BSD License
## pip install async_pool_executor ## 主要功能 ``` 主要功能是仿照 concurrent.futures 的线程池报的submit shutdown方法。 使得在做生产 消费 时候,无需学习烦人的异步 loop 、 run_until_complete ,可以直接在同步函数中 submit。 生产和消费不需要在同一个loop中,喜欢同步编程思维的人可以用这个。 ``` ## 实现代码 ```python import asyncio import atexit import time import traceback from threading import Thread class AsyncPoolExecutor: """ 使api和线程池一样,最好的性能做法是submit也弄成 async def,生产和消费在同一个线程同一个loop一起运行,但会对调用链路的兼容性产生破坏,从而调用方式不兼容线程池。 """ def __init__(self, size, loop=None): """ :param size: 同时并发运行的协程任务数量。 :param loop: """ self._size = size self.loop = loop or asyncio.new_event_loop() self._sem = asyncio.Semaphore(self._size, loop=self.loop) self._queue = asyncio.Queue(maxsize=size, loop=self.loop) t = Thread(target=self._start_loop_in_new_thread) t.setDaemon(True) # 设置守护线程是为了有机会触发atexit,使程序自动结束,不用手动调用shutdown t.start() self._can_be_closed_flag = False atexit.register(self.shutdown) def submit(self, func, *args, **kwargs): future = asyncio.run_coroutine_threadsafe(self._produce(func, *args, **kwargs), self.loop) # 这个 run_coroutine_threadsafe 方法也有缺点,消耗的性能巨大。 future.result() # 阻止过快放入,放入超过队列大小后,使submit阻塞。 async def _produce(self, func, *args, **kwargs): await self._queue.put((func, args, kwargs)) async def _consume(self): while True: func, args, kwargs = await self._queue.get() if func == 'stop': break try: await func(*args, **kwargs) except Exception as e: traceback.print_exc() def _start_loop_in_new_thread(self, ): # self._loop.run_until_complete(self.__run()) # 这种也可以。 # self._loop.run_forever() # asyncio.set_event_loop(self.loop) self.loop.run_until_complete(asyncio.wait([self._consume() for _ in range(self._size)], loop=self.loop)) self._can_be_closed_flag = True def shutdown(self): for _ in range(self._size): self.submit('stop', ) while not self._can_be_closed_flag: time.sleep(0.1) self.loop.close() print('关闭循环') if __name__ == '__main__': import nb_log async def async_f(x): await asyncio.sleep(2) print(x) pool =AsyncPoolExecutor(3) for i in range(30): pool.submit(async_f,i) ```


نحوه نصب


نصب پکیج whl async-pool-executor-0.1:

    pip install async-pool-executor-0.1.whl


نصب پکیج tar.gz async-pool-executor-0.1:

    pip install async-pool-executor-0.1.tar.gz