

coroexecutor-2022.8.1



Description

A coroutine-based Executor implementation
Feature            Value
Operating system   -
File name          coroexecutor-2022.8.1
Name               coroexecutor
Version            2022.8.1
Maintainer         []
Maintainer email   []
Author             Caleb Hattingh
Author email       caleb.hattingh@gmail.com
Home page          https://github.com/cjrh/coroexecutor
PyPI URL           https://pypi.org/project/coroexecutor/
License            -
.. image:: https://github.com/cjrh/coroexecutor/workflows/Python%20application/badge.svg
    :target: https://github.com/cjrh/coroexecutor/actions

.. image:: https://img.shields.io/badge/stdlib--only-yes-green.svg
    :target: https://img.shields.io/badge/stdlib--only-yes-green.svg

.. image:: https://coveralls.io/repos/github/cjrh/coroexecutor/badge.svg?branch=master
    :target: https://coveralls.io/github/cjrh/coroexecutor?branch=master

.. image:: https://img.shields.io/pypi/pyversions/coroexecutor.svg
    :target: https://pypi.python.org/pypi/coroexecutor

.. image:: https://img.shields.io/github/tag/cjrh/coroexecutor.svg
    :target: https://img.shields.io/github/tag/cjrh/coroexecutor.svg

.. image:: https://img.shields.io/badge/install-pip%20install%20coroexecutor-ff69b4.svg
    :target: https://img.shields.io/badge/install-pip%20install%20coroexecutor-ff69b4.svg

.. image:: https://img.shields.io/pypi/v/coroexecutor.svg
    :target: https://img.shields.io/pypi/v/coroexecutor.svg

.. image:: https://img.shields.io/badge/calver-YYYY.MM.MINOR-22bfda.svg
    :target: http://calver.org/

.. warning::

    This is alpha. Please don't rely on this in a production setting yet.
    I will remove this warning when it is ready.

coroexecutor
============

.. contents::
    :local:
    :depth: 2
    :backlinks: top

Provides an ``Executor`` interface for running a group of coroutines together in asyncio-native applications.

Demo
----

.. code-block:: python3

    import asyncio
    from coroexecutor import CoroutineExecutor

    async def f(dt, msg=''):
        await asyncio.sleep(dt)
        print(f'completion message: {msg}')

    async def main():
        async with CoroutineExecutor(max_workers=10) as exe:
            t1 = await exe.submit(f, 0.01, msg="task 1")
            t2 = await exe.submit(f, 0.05, msg="task 2")

        # All submitted jobs have completed by the time the
        # context manager exits.
        assert t1.done()
        assert t2.done()

    asyncio.run(main())

``max_workers`` controls how many submitted jobs can run concurrently. These internal workers are lightweight, of course: they're just ``asyncio.Task`` instances. Millions of jobs can be pushed through the executor. As is normal for asyncio, concurrency requires that these jobs be IO-bound, and the upper bound for setting ``max_workers`` is mainly going to depend on your CPU and RAM resources.

Discussion
----------

The ``CoroutineExecutor`` context manager works very much like the ``Executor`` implementations in the ``concurrent.futures`` package in the standard library. That is the intention of this package. The basic components of the interface are:

- The executor applies a context over the creation of jobs
- Jobs are submitted to the executor
- All jobs must be complete when the context manager for the executor exits

After creating a context manager using ``CoroutineExecutor``, the two main features are the ``submit()`` method and the ``map()`` method.

It is impossible to *exactly* match the ``Executor`` interface in the ``concurrent.futures`` package, because some functions in this interface need to be ``async`` functions. But we can get close; certainly close enough that a user with experience using the ``ThreadPoolExecutor`` or ``ProcessPoolExecutor`` should be able to figure things out pretty quickly.
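For comparison, here is roughly what the demo above looks like when written against the standard library's ``ThreadPoolExecutor``. The visible differences are small: the thread-based ``submit()`` is a plain method returning a ``concurrent.futures.Future``, whereas ``CoroutineExecutor.submit()`` must be awaited and returns an ``asyncio.Task``.

.. code-block:: python3

    import time
    from concurrent.futures import ThreadPoolExecutor

    def f(dt, msg=''):
        time.sleep(dt)
        print(f'completion message: {msg}')

    # Same shape as the asyncio demo: the context manager waits for
    # all submitted jobs to finish before exiting.
    with ThreadPoolExecutor(max_workers=10) as exe:
        f1 = exe.submit(f, 0.01, msg="task 1")
        f2 = exe.submit(f, 0.05, msg="task 2")

    assert f1.done()
    assert f2.done()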
There is a great deal of complexity that can arise. The "happy path" is simple: you just submit jobs to the executor, and they get executed accordingly. But there are many corner cases:

- asyncio can execute thousands, or even tens of thousands, of (IO-bound) jobs concurrently. But how to handle more, say, millions of jobs?
- If one job raises an exception, how to terminate all the other jobs? In the CTRL-C case, this is desired, but what about other cases? Do you always want a single task failure (with an unexpected exception) to cancel the entire batch? And is there a difference between a job raising ``CancelledError`` versus raising some other kind of exception?
- The ``CoroutineExecutor`` provides a context manager API: if some code within the body of the context manager (that is not a task) raises an exception, should all the submitted tasks also be cancelled?

Each of these will be discussed in more detail in the sections that follow.

Throttling, using ``max_workers``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Even though it is possible to concurrently execute a much larger number of (IO-bound) tasks with asyncio compared to threads or processes, there will still be an upper limit to what the machine can handle, based on either:

- memory limitations: many task object instances
- CPU limitations: too many concurrent task objects and events for the event loop to process

Thus, we also have a ``max_workers`` setting to limit concurrency. It might not be obvious how that limitation is applied, say, in the scenario of millions of jobs. ``CoroutineExecutor.submit()`` is an ``async def`` method. This means that you will have to await it, like so:

.. code-block:: python3

    import asyncio
    from coroexecutor import CoroutineExecutor

    async def f():
        print('hi!')

    async def main():
        async with CoroutineExecutor(max_workers=10) as exe:
            t1 = await exe.submit(f)

    asyncio.run(main())

If the total number of jobs already submitted is less than ``max_workers``, the call to ``await exe.submit()`` will return immediately: the job will begin executing, and ``submit()`` returns an ``asyncio.Task`` instance for that job. However, if the total number of concurrently-running jobs is greater than the ``max_workers`` setting, this call will wait until the number of currently-running jobs drops below the threshold before adding the new job. This means that ``submit()`` applies *back-pressure*.

Say you have a file containing ten million URLs that you want to fetch using aiohttp. That program might look something like this:

.. code-block:: python3

    import asyncio, aiohttp
    from coroexecutor import CoroutineExecutor

    async def fetch(url: str):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    print('body:', await response.text())  # or whatever
        except Exception:
            print('Problem with url:', url)

    async def main():
        async with CoroutineExecutor(max_workers=10000) as exe:
            for line in open('urls.txt'):
                # strip the trailing newline from each line
                await exe.submit(fetch, line.strip())

    asyncio.run(main())

Assuming it takes 3 seconds to fetch a single URL, this program should take around (1e7 / 1e4) × 3 => 3000 seconds to fetch all of them. About 50 minutes: even though there are 10 million URLs, we're doing 10k concurrently.

(In practice, some of the endpoints will be very slow to respond, if they respond at all. So for real code you're going to want to either use the aiohttp facilities for timeouts on the ``.get()`` call, or wrap the work inside an ``asyncio.wait_for()`` wrapper.)
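A sketch of that second option follows; the ``fetch_once`` helper and the 10-second limit are illustrative choices, not part of the library:

.. code-block:: python3

    import asyncio, aiohttp

    async def fetch_once(session: aiohttp.ClientSession, url: str):
        async with session.get(url) as response:
            print('body:', await response.text())

    async def fetch(url: str):
        try:
            async with aiohttp.ClientSession() as session:
                # Abandon any URL that takes longer than 10 seconds in total.
                await asyncio.wait_for(fetch_once(session, url), timeout=10)
        except asyncio.TimeoutError:
            print('Timed out on url:', url)
        except Exception:
            print('Problem with url:', url)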
Note that we're handling errors inside our job function ``fetch()``. By default, if jobs raise exceptions, these will cancel all pending jobs inside the executor and shut it down. For long batch jobs, that may not be what we want, and this is discussed next.

Dealing with errors and cancellation
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Generally, there are these kinds of error situations:

- A job is cancelled, and you want the executor to be shut down
- A job is cancelled, and the executor must NOT be shut down
- A job raises an exception (not ``CancelledError``), and you want the executor to shut down
- A job raises an exception (not ``CancelledError``), and the executor must NOT be shut down

Consider the previous example using aiohttp to fetch URLs: inside the ``fetch()`` function, we're handling ``Exception``, and we can catch ``asyncio.CancelledError`` explicitly as well (note that on Python 3.8 and newer, ``CancelledError`` derives from ``BaseException``, so a bare ``except Exception`` does not catch it). In general, this is the correct thing to do, because you can control what happens in each of the scenarios presented above. But what happens if your code is not supplying the jobs, and you don't control how error handling inside them is being managed?

By default, any job raising an exception (cancellation or otherwise) will initiate "shutdown" of the executor instance, and all other pending jobs on that executor will be cancelled. If you have a situation where this is not desired, you can ask ``CoroutineExecutor`` to ignore all task errors for you:

.. code-block:: python3

    import asyncio, aiohttp
    from coroexecutor import CoroutineExecutor

    async def naive_fetch(url: str):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                print('body:', await response.text())  # or whatever

    async def main():
        async with CoroutineExecutor(
            max_workers=10000,
            suppress_task_errors=True,
        ) as exe:
            for line in open('urls.txt'):
                await exe.submit(naive_fetch, line.strip())

    asyncio.run(main())

In this modified example, the job function ``naive_fetch`` has no error handling. No matter: the ``suppress_task_errors`` parameter will allow the executor to absorb them all. Be careful with this. I recommend avoiding it wherever possible, and instead handling exceptions and ``CancelledError`` explicitly within your job functions.
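A minimal sketch of that recommendation follows; ``do_the_work()`` is a placeholder for whatever IO-bound work your job actually does:

.. code-block:: python3

    import asyncio

    async def do_the_work():
        # Placeholder for some real IO-bound work.
        await asyncio.sleep(1)

    async def careful_job():
        try:
            await do_the_work()
        except asyncio.CancelledError:
            # The executor (or CTRL-C) cancelled us: clean up, then
            # re-raise so that cancellation proceeds normally.
            print('job cancelled, cleaning up')
            raise
        except Exception as e:
            # An ordinary failure: absorb it here so that one bad job
            # does not shut down the whole executor.
            print('job failed:', e)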
Examples
--------

Using ``map``
^^^^^^^^^^^^^

The ``concurrent.futures.Executor`` interface also defines ``map()``, which returns an iterator. However, it makes more sense for us to use an *asynchronous generator* for this purpose. Here's an example from the tests:

.. code-block:: python3

    import asyncio
    from coroexecutor import CoroutineExecutor

    times = [0.01, 0.02, 0.03]

    async def f(dt):
        await asyncio.sleep(dt)
        return dt

    async def main():
        async with CoroutineExecutor() as exe:
            results = exe.map(f, times)
            assert [v async for v in results] == times

    asyncio.run(main())

You can see how ``async for`` is used to asynchronously loop over the result of calling ``map``. If one of the function calls raises an error, all unfinished calls will be cancelled, but you may still have received partial results. Here's another example from the tests:

.. code-block:: python3

    import asyncio
    import pytest
    from coroexecutor import CoroutineExecutor

    times = [0.01, 0.02, 0.1, 0.2]
    results = []

    async def f(dt):
        await asyncio.sleep(dt)
        if dt == 0.1:
            raise Exception('oh noes')
        return dt

    async def main():
        async with CoroutineExecutor() as exe:
            async for r in exe.map(f, times):
                results.append(r)

    with pytest.raises(Exception):
        asyncio.run(main())

    assert results == [0.01, 0.02]

The first two values of the batch finish quickly, and I saved these to the ``results`` list in the outer scope. Then one of the jobs fails with an exception. This results in the other pending jobs being cancelled (i.e., the "0.2" case in this example), the ``CoroutineExecutor`` instance re-raising the exception, and, in this example, the exception propagating all the way out to the invocation of ``asyncio.run()`` itself. However, note that we still have the results from the jobs that succeeded.

Nesting
^^^^^^^

You don't always have to submit tasks to the executor in a single function. The executor instance can be passed around, and work can be added to it from several different places. (Recall that ``submit()`` is an ``async def`` method, so each call is awaited.)

.. code-block:: python3

    import asyncio
    from random import random
    from coroexecutor import CoroutineExecutor

    async def f(dt):
        await asyncio.sleep(dt)

    async def producer1(executor: CoroutineExecutor):
        await executor.submit(f, random())
        await executor.submit(f, random())
        await executor.submit(f, random())

    async def producer2(executor: CoroutineExecutor):
        await executor.submit(f, random())
        await executor.submit(f, random())
        await executor.submit(f, random())

    async def main():
        async with CoroutineExecutor(timeout=0.5) as executor:
            await executor.submit(f, random())
            await executor.submit(f, random())
            await executor.submit(f, random())
            # The producers are themselves jobs, and they add more
            # jobs to the same executor.
            await executor.submit(producer1, executor)
            await executor.submit(producer2, executor)

    asyncio.run(main())

You can not only submit jobs within the executor context manager, but also pass the instance around and collect jobs from other functions too. And the timeout set when creating the ``CoroutineExecutor`` instance will still be applied.
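To make the ``timeout`` behaviour concrete, here is a small sketch. It assumes, per the paragraph above, that the timeout bounds the whole executor and that jobs still running at the deadline are cancelled; the exact error raised out of the context manager may differ:

.. code-block:: python3

    import asyncio
    from coroexecutor import CoroutineExecutor

    async def slow_job():
        try:
            await asyncio.sleep(10)  # far longer than the timeout below
        except asyncio.CancelledError:
            print('slow_job was cancelled at the deadline')
            raise

    async def main():
        async with CoroutineExecutor(timeout=0.5) as exe:
            await exe.submit(slow_job)

    asyncio.run(main())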


Requirements

Name             Version
check-manifest   -
pygments         -
wheel            -
pytest           -
pytest-cov       -
flake8           -


How to install


Installing the coroexecutor-2022.8.1 whl package:

    pip install coroexecutor-2022.8.1.whl


Installing the coroexecutor-2022.8.1 tar.gz package:

    pip install coroexecutor-2022.8.1.tar.gz
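
Installing the latest release directly from PyPI (as advertised by the install badge in the README above):

    pip install coroexecutor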