darq
====
[ ~ Dependencies scanned by PyUp.io ~ ]
.. image:: https://github.com/seedofjoy/darq/workflows/Lint%20&%20test/badge.svg?branch=master
:target: https://github.com/seedofjoy/darq/actions
.. image:: https://codecov.io/gh/seedofjoy/darq/branch/master/graph/badge.svg
:target: https://codecov.io/gh/seedofjoy/darq
|
Async task manager with Celery-like features. Fork of `arq <http://github.com/samuelcolvin/arq>`_.
Features
--------
* Celery-like ``@task`` decorator, adds ``.delay()`` to enqueue job
* Proper ``mypy`` type checking: all arguments passed to ``.delay()`` will be checked against the original function signature
* Graceful shutdown: waits until running tasks are finished
Installation
------------
Darq uses ``aioredis`` 1.x as Redis client. Unfortunately, this library has been abandoned, and does not support Python 3.11. I made a fork with compatability fixes: ``evo-aioredis`` (https://github.com/evo-company/aioredis-py).
Because of this, ``aioredis`` is not currently added as Darq dependency, and you must install it yourself:
For Python<3.11 you can use:
.. code:: shell
pip install aioredis<2.0.0
For Python 3.11 (and older versions too) you can use fork:
.. code:: shell
pip install evo-aioredis<2.0.0
Quick start
-----------
.. code:: python
# some_project/darq_app.py
import asyncio
import darq
darq = darq.Darq(redis_settings=darq.RedisSettings(host='redis'))
@darq.task
async def add_to_42(a: int) -> int:
return 42 + a
async def main():
# Before adding tasks to queue we should connect darq instance to redis
await darq.connect()
# Direct call job as function:
result = await add_to_42(5) # result == 47
# Celery-like add task to queue:
await add_to_42.delay(a=5)
await darq.disconnect()
if __name__ == '__main__':
asyncio.run(main())
And start worker:
.. code:: shell
python3 -m darq.cli -A some_project.darq_app.darq worker
Worker output:
.. code:: shell
15:24:42: Starting worker for 1 functions: some_project.darq_app.add_to_42
15:24:42: redis_version=5.0.7 mem_usage=834.87K clients_connected=1 db_keys=2
15:25:08: 0.22s → 1315f27608e9408392bf5d3310bca38c:darq_app.add_to_42(a=5)
15:25:08: 0.00s ← 1315f27608e9408392bf5d3310bca38c:darq_app.add_to_42 ● 47
.. :changelog:
Changelog
---------
0.11.1 (2022-11-30)
...................
* Add Python 3.11 support (with ``evo-aioredis`` dependency instead of ``aioredis``)
* Remove ``pydantic`` dependency
* Remove ``aioredis`` from dependencies to allow choose between ``aioredis`` and ``evo-aioredis`` - fork with Python 3.11 compatability
0.11.0 (2022-08-03)
...................
* Added ability to optionally pass ``ctx`` to the task, like this:
.. code:: python
@task(with_ctx=True)
def foobar(ctx):
log.info('Foobar try %s', ctx['job_try'])
``ctx`` contains: ``job_id``, ``job_try``, ``enqueue_time``, ``score``, ``metadata`` + all worker's ``ctx`` (including custom context which can be passed via ``on_startup``). Thanks to `@kindermax <https://github.com/kindermax>`_ (https://github.com/seedofjoy/darq/pull/426) !
0.10.2 (2022-02-03)
...................
* Add proper typing for functions wrapped with the @task decorator. Mypy will now check that parameters are passed correctly when calling ``func()`` and ``func.delay()``
0.10.1 (2021-07-29)
...................
* Add ``sentinel_timeout`` (defaults to 0.2) param to ``RedisSettings``
0.10.0 (2021-07-09)
...................
* **Breaking change**: Rename ``darq.worker.Function`` to ``darq.worker.Task``
* Made ``job`` to ``task`` naming migration
* Add max_jobs parameter to CLI (thanks to `@antonmyronyuk <https://github.com/antonmyronyuk>`_)
* Fixed bug with ``expires`` argument: ``default_job_expires`` could not be replaced with ``None`` in ``@task`` or ``.apply_async``
0.9.0 (2020-06-24)
..................
* **Breaking change**: Add ``scheduler_ctx`` param to ``on_scheduler_startup`` and ``on_scheduler_shutdown`` to share data between this callbacks. It already has ``ctx['redis']`` - instance of ``ArqRedis``
0.8.0 (2020-06-22)
..................
* **Breaking change**: Changed CLI command format. Before: ``darq some_project.darq_app.darq``. Now: ``darq -A some_project.darq_app.darq worker``
* **Breaking change**: Scheduler (cron jobs) now run's seperate from worker (see ``darq scheduler`` command)
* **Breaking change**: Changed some function signatures (rename arguments)
* **Breaking change**: Remove ``redis_pool`` param from ``Darq`` app
* Add ``on_scheduler_startup`` and ``on_scheduler_shutdown`` callbacks
0.7.2 (2020-06-18)
..................
* Fix some types (cron, OnJobPrepublishType)
* ``on_job_prerun`` now runs before "task started" log and ``on_job_postrun`` now runs after "task finished" log
0.7.1 (2020-05-25)
..................
* ``.apply_async``: Make ``args`` and ``kwargs`` arguments optional
0.7.0 (2020-05-25)
..................
* Fork ``arq`` to project and merge it with ``darq`` (It was easier to rewrite ``arq`` than to write a wrapper)
* **Breaking change**: Remove "magic" params from ``.delay``. For enqueue job with special params added ``.apply_async``.
* Add ``watch``-mode to CLI.
* Fix: Now worker will not run cronjob if it's functions queue not match with worker's
0.6.0 (2020-03-08)
..................
* **Breaking change**: Changed `Darq` constructor from single `config` param to separate params.
* `arq_function.coroutine` now has `.delay` method.
0.5.0 (2020-03-03)
..................
* Add ``on_job_prepublish(metadata, arq_function, args, kwargs)`` callback. ``metadata`` is mutable dict, which will be available at ``ctx['metadata']``.
0.4.0 (2020-03-03)
..................
* Add ``default_job_expires`` param to Darq (if the job still hasn't started after this duration, do not run it). Default - 1 day
* Add `expires` param to ``@task`` (if set - overwrites ``default_job_expires``)
0.3.1 (2020-03-02)
..................
* Rewrite warm shutdown: now during warm shutdown cron is disabled, on second signal the warm shutdown will be canceled
0.3.0 (2020-02-27)
..................
* **Breaking change**: ``on_job_prerun`` and ``on_job_postrun`` now accepts ``arq.worker.Function`` instead of the original function (it can still be accessed at ``arq_function.coroutine``)
0.2.1 (2020-02-26)
..................
* Fix ``add_cron_jobs`` method. Tests added.
0.2.0 (2020-02-26)
..................
* Add ``on_job_prerun(ctx, function, args, kwargs)`` and ``on_job_postrun(ctx, function, args, kwargs, result)`` callbacks.
0.1.0 (2020-02-26)
..................
* **Breaking change**: Jobs no longer explicitly get ``JobCtx`` as the first argument, as in 99.9% cases it doesn't need it. In future release will be possible to optionally pass ``JobCtx`` in some way.
* **Breaking change**: All cron jobs should be wrapped in ``@task`` decorator
* Directly pass ``functions`` to ``arq.Worker``, not names.
0.0.3 (2020-02-25)
..................
* ``.delay()`` now returns ``arq_redis.enqueue_job`` result (``Optional[Job]``)
* Add ``py.typed`` file
* Fixed ``add_cron_jobs`` typing
0.0.2 (2020-02-24)
..................
* Add ``add_cron_jobs`` method
0.0.1 (2020-02-21)
..................
First release