

celery-amqp-events-0.1.0
========================



Description
-----------

Distributed event handling on top of Celery

| Feature          | Value |
| ---------------- | ----- |
| Operating system | - |
| File name        | celery-amqp-events-0.1.0 |
| Name             | celery-amqp-events |
| Version          | 0.1.0 |
| Maintainer       | [] |
| Maintainer email | [] |
| Author           | Sergey Tikhonov |
| Author email     | zimbler@gmail.com |
| Homepage         | https://github.com/just-work/celery-amqp-events |
| PyPI URL         | https://pypi.org/project/celery-amqp-events/ |
| License          | MIT |
**Celery-AMQP-Events** is a library that implements voluntary events handling on top of [Celery](https://docs.celeryproject.org/).

* AMQP-based robustness of event handling
* Celery tasks interface
* Anti-flood tactics

Installation
------------

```shell
pip install celery-amqp-events
```

Configuration
-------------

1. Pass a unique "service name" to the `Celery()` instance for each service that has event handlers (see `amqp_events.celery:events_app`).
2. Tell Celery via the `imports` setting where to find event handlers.
3. Configure the broker connection and other Celery settings.
4. Leave the result backend empty: each event may have multiple consumers, so an event result is meaningless in this case.

> You absolutely need to set a separate name for each service that consumes
> events; without it, each fired event will be handled by only a single,
> randomly chosen service, because your services will share the same queue
> for this event.

```python
from amqp_events.celery import events_app

app = events_app(
    "service_name",  # important in a multi-service environment
    imports=['demo.tasks'],  # modules where to find event handlers
    broker_url='amqp://guest:guest@rabbitmq:5672/',
)
```

Adding events and handlers
--------------------------

```python
from demo.celery import app

@app.event('service_name.object_name.event_name')
def even_number_generated(number: int):
    # You may add some validation logic in the event function body.
    if number % 2 != 0:
        raise ValueError("number is not even")

@app.handler('service_name.object_name.event_name')
def on_even_number_generated(number: int):
    # Handle the event somehow.
    print(f"even number {number} generated")
```

Running
-------

* Start an ordinary Celery worker for your consumer service.

> Note that `mingle`, `gossip` and `heartbeat` should be disabled if not used.
> These algorithms use broadcast events, which means that you'll have `N^2`
> messages in RabbitMQ for `N` workers without any purpose.
```shell
celery worker -A your_service.celery \
  --without-mingle --without-heartbeat --without-gossip
```

Sending events
--------------

```python
import random

from demo.events import even_number_generated

try:
    even_number_generated(random.randint(0, 100))
except ValueError:
    print("oops, number was odd")
```

Robustness
----------

* If an event fails with an unhandled error, it is retried to a separate queue with exponential backoff.
  * Backoff is used to prevent resource exhaustion (e.g. running out of free HTTP connections).
* If no retry attempts are left, the unhandled event is moved to an "archive" queue.
  * The archive is used to catch messages that always produce an error in the consumer; these messages can be manually retried when a fix is released.
  * The archive is limited both by message TTL and by a message count limit, so alerts should exist.
* Retry is done via a separate queue for multiple reasons:
  * Using `countdown` forces the consumer to keep "unacknowledged" events in memory, which is bad for load balancing and resource usage.
  * Retrying to the same queue will slow down event processing if the retry probability is high enough.
  * Two faulty consumers retrying the same event with the **same routing key** would cause exponential growth of the message count in RabbitMQ, because a message is split into multiple messages when published from one exchange to multiple queues.
* By default, some fault-tolerance Celery settings are enabled:
  * `task_acks_late` delays task acknowledgement until the end of processing.
  * `task_reject_on_worker_lost` prevents an `ack` if the worker was killed.
  * `Task.autoretry_for` is set to retry on any `Exception` automatically.
  * Disabling `task_acks_on_failure_or_timeout` forces Celery to reject failed messages if `autoretry_for` fails to handle them.
  * `confirm_publish` in `broker_transport_options` blocks the producer until the broker confirms that it has received the message.
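The fault-tolerance settings listed above can be collected into a plain configuration mapping. This is an illustrative sketch based on the setting names from the Celery documentation; the values mirror the behavior described in this README and are not copied from the library source.

```python
# Illustrative sketch: the fault-tolerance settings described above,
# as a plain mapping that could be applied to a Celery app's config.
FAULT_TOLERANCE_SETTINGS = {
    "task_acks_late": True,                    # ack only after the handler finishes
    "task_reject_on_worker_lost": True,        # no ack if the worker process dies
    "task_acks_on_failure_or_timeout": False,  # reject failed messages instead of acking
    "broker_transport_options": {"confirm_publish": True},  # wait for publisher confirms
}

# With Celery installed, this could be applied via, e.g.:
#     app.conf.update(**FAULT_TOLERANCE_SETTINGS)
```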
Delay on broker side
--------------------

Celery's default retry mechanics are:

* construct a copy of the message currently being handled;
* change arguments or options of the new message from the `retry` params;
* publish the copy to the same exchange and routing key found in the message's `delivery_info`;
* acknowledge the current message;
* after receiving the new message, Celery puts it into an in-memory queue to wait until the message ETA, and then passes it to the first free worker.

All this is because the `AMQP` protocol doesn't define any delay-related properties. Currently, `RabbitMQ` has a plugin for delayed messages, and there are some tricky schemes that allow delaying message delivery on the broker side. We use one of these schemes.

### Retry causes

A Celery task can be retried in different cases:

* a manual retry from the task implementation;
* a retry on an exception listed in `Task.autoretry_for`;
* a task may also be rejected if `retry()` fails, if the task fails, or if the task times out.

### Retry routing

To implement broker-side delays, `EventsCelery` declares a set of retry exchanges and queues:

* each exchange and queue is prefixed with the "service name";
* each queue is declared with an `x-message-ttl` equal to `2^n` seconds; the maximum `n` is the `AMQP_EVENTS_MAX_RETRIES` env variable, which also sets `Task.max_retries` for every task handler;
* each queue also defines an `x-dead-letter-exchange` argument, which points to a "recover" exchange;
* this "recover" exchange has the `topic` type and is bound to the same queues as the default "events" exchange;
* a "retry" exchange has the `fanout` type, so any message published to it, regardless of its routing key, is routed to the corresponding queue;
* with "message TTL" set up, a retried message expires in `2^n` seconds, which is essentially an exponential backoff implementation;
* because of the "dead letter exchange", an expired message is moved to the `recover` exchange, from which it is routed again to the initial event queue via the initial message routing key.
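The retry-queue declarations described above can be sketched as plain AMQP queue arguments. This is a hypothetical sketch: the `demo` service name, the `MAX_RETRIES` constant, and the queue naming scheme are illustrative stand-ins, not the library's actual naming.

```python
# Hypothetical sketch of the retry-queue arguments described above.
# "demo" and the ".retry.N" naming scheme are illustrative only.
SERVICE = "demo"
MAX_RETRIES = 5  # stands in for the AMQP_EVENTS_MAX_RETRIES env variable

def retry_queue_arguments(n: int) -> dict:
    """AMQP arguments for the n-th retry queue: a delay of 2**n seconds,
    with expired messages dead-lettered to the "recover" exchange."""
    return {
        "x-message-ttl": (2 ** n) * 1000,  # RabbitMQ TTL is in milliseconds
        "x-dead-letter-exchange": f"{SERVICE}.recover",
    }

# One queue per retry attempt, each doubling the previous delay.
retry_queues = {
    f"{SERVICE}.retry.{n}": retry_queue_arguments(n)
    for n in range(MAX_RETRIES + 1)
}
```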
### Reject handling

It's important to retry rejected messages when something goes wrong.

* Each Celery task queue is declared with an `x-dead-letter-exchange` argument, which points to the first "retry" exchange.
* As described in the "Retry routing" section, a rejected message is moved by RabbitMQ to a retry queue; after a second it expires and is moved to the "recover" exchange, and from there to the initial event queue.

### Archiving

By default, Celery just drops a task when `MaxRetriesExceededError` happens, but we want to archive such events for some time:

* `EventsCelery` declares a `fanout` "archive" exchange and a corresponding queue;
* the "archive" exchange and queue are also prefixed with the "service name";
* the "archive" queue is declared with `x-message-ttl` and `x-max-length` arguments, so the message archive is limited both by time and by message count;
* when `EventsCelery` finds that a task's retry count is exceeded, it retries this task to the "archive" exchange.

### Caveats

1. Message reordering. What if the broker-side delay shuffles the normal message ordering? This may break message ETA handling or change the order of events.
   * By default, Celery receives a retried, delayed message and keeps it in memory until the message ETA. Before the ETA it handles other incoming messages, and if these new messages have a later ETA, they are scheduled at the corresponding time.
   * This is how things work when there are fewer than `prefetch_multiplier` messages in the incoming queue. If there are lots of incoming messages, retried messages arrive after their ETA, and of course this changes the event order.
   * With the broker-side delay implementation, the task's `message-ttl` is guaranteed to be less than or equal to the task ETA, so for the Celery worker the situation looks like a very long AMQP `basic.publish` call. If there are no messages, Celery-side ETA logic applies and the task starts in time.
2. Queue count. `RabbitMQ` is sensitive to the total queue count. What if we declare too many queues?
   * The total queue count is linear in the max retries count and in the number of events.
   * Each retry queue is available to any Celery task and implements a delay equal to a power of 2. 20 queues provide a maximum delay of more than 23 days, but if somebody needs millisecond delays, ten more queues are needed.
   * By default each event is routed to its own queue, but with a `topic` exchange you can manually set up a set of queues that handle groups of event types. You can even manually set up a single queue for all handled events.
   * The worst case needs `N + M + 1` queues, where `N` is the `max_retries` limit and `M` is the number of handled events.
   * The best case needs `2` queues, if max retries is set to 1 and all events are routed to a single queue. The "archive" queue may also be disabled.
3. Celery internals. What if `Celery` changes and all this stuff breaks?
   * To trigger auxiliary queue declaration we use Celery signals, which we consider a public interface.
   * To declare queues we use the `Celery.broker_connection()` default channel; this could be changed to use a separate AMQP connection.
   * To bind event queues we extend `Celery.conf.task_queues` with `kombu` entities, which provide a public interface for queue arguments and additional bindings.
   * To perform retries, we change `Task.retry` arguments, which is also a public interface.
   * Other things set up to provide more robust event handling are described in the [Configuration and defaults](https://docs.celeryproject.org/en/stable/userguide/configuration.html) section of the Celery documentation.
   * Finally, we plan to run integration tests with real Celery workers, not only unit tests against mocked Celery internals.

### Broker-side Pros

* Observability. There is no way to distinguish a delayed task from a task in a deadlock state, except via the Celery events mechanics. The RabbitMQ API provides a clear way to monitor the number of retried and archived messages.
* Lower memory usage. The default Celery implementation may keep thousands of delayed messages in worker memory.
  It also affects the AMQP channel `qos` parameter, which cannot be more than 1000; this messes things up. Broker-side delayed messages are stored on disk in the `RabbitMQ` db.
* Autoretry for failed tasks. By default, each rejected message is lost forever. With our setup it will be retried, which is good for delivery guarantees but is also bad if message handling breaks constantly (i.e. in case of a persistent Celery failure). This should be handled carefully.
* Robustness. Because of the asynchronous nature of the event-handling pattern, we make every effort to ensure that every fired event is not lost somewhere between network failures and unhandled exceptions in business logic. It's worth mentioning that `RabbitMQ` is considered a durable system, and we don't touch upon broker failures.

### Cons

* Celery internals. Despite using the public Celery interface, we can't guarantee that changes in default Celery behavior won't affect the broker-side retry model if the Celery authors decide to move fast and break things in 5.x and later versions.
* Complex logic. Debugging such complex algorithms requires deep knowledge of RabbitMQ architecture and the celery-amqp-events code. It's not trivial to find "the man who can".
* Broker memory usage. The broker-side retry system may require a lot of memory and disk space in RabbitMQ, and it's well-known that RabbitMQ performs much better without any disk usage. Retries should not be used as a "normal" event-handling flow.

Related projects
----------------

### Celery-message-consumer

The robustness tactics are inspired by the [celery-message-consumer](https://github.com/depop/celery-message-consumer) project, which aims to handle events published to an AMQP broker from non-Celery-based projects (maybe from other languages). The main difference is that `Celery-AMQP-Events` uses Celery tasks instead of [including](https://github.com/depop/celery-message-consumer#celery) an additional `consumer step` alongside the Celery worker.


Requirements
------------

| Name | Version |
| ---- | ------- |
| Celery | - |
| typing-extensions | - |

How to install
--------------

Install the `.whl` package for celery-amqp-events-0.1.0:

    pip install celery-amqp-events-0.1.0.whl

Install the `.tar.gz` package for celery-amqp-events-0.1.0:

    pip install celery-amqp-events-0.1.0.tar.gz