.. image:: https://travis-ci.org/collective/collective.zamqp.png
:target: http://travis-ci.org/collective/collective.zamqp
AMQP integration for Zope2 (and Plone)
======================================
**collective.zamqp** acts as a *Zope Server* by co-opting Zope's asyncore
mainloop (using asyncore-supporting AMQP-library
`pika <http://pypi.python.org/pypi/pika>`_),
and injecting consumed messages as *requests* to be handled by ZPublisher
(exactly like Zope ClockServer).
Therefore AMQP-messages are handled (by default) in a similar environment to
regular HTTP-requests: ZCA-hooks, events and everything else behaving normally.
This package is an almost complete rewrite of
`affinitic.zamqp <http://pypi.python.org/pypi/affinitic.zamqp>`_,
but preserves its ideas on how to setup AMQP-messaging
by configuring only producers and consumers.
TODO
* rewrite documentation to reflect the new design
* update affinitic.zamqp's tests for the new design
While we are still documenting and testing **collective.zamqp**,
you may want to take a look at `collective.zamqpdemo
<http://github.com/datakurre/collective.zamqpdemo/>`_ for an example of use.
Installation
============
A common setup includes a producer and a worker instance which are both
connected to the same rabbitmq server.
Buildout
--------
Both instances can be easily configured using buildout::
[buildout]
parts =
instance
worker
...
[instance]
recipe = plone.recipe.zope2instance
http-address = 8080
eggs =
...
collective.zamqp
...
zope-conf-additional =
%import collective.zamqp
<amqp-broker-connection>
connection_id superuser
hostname my.rabbithostname.com
port 5672
username guest
password guest
heartbeat 120
keepalive 60
</amqp-broker-connection>
[worker]
<= instance
http-address = 8081
zserver-threads = 1
environment-vars =
ZAMQP_LOGLEVEL INFO
zope-conf-additional =
${instance:zope-conf-additional}
<amqp-consuming-server>
connection_id superuser
site_id Plone
user_id admin
</amqp-consuming-server>
Explanation of configuration parameters
---------------------------------------
- amqp-broker-connection
``connection_id``
Connection id, which is the registered name of the created
global BrokerConnection-utility.
``hostname`` (optional)
Hostname or IP Address of RabbitMQ server to connect to, default to
``localhost``.
``port`` (optional)
TCP port of RabbitMQ server to connect to, defaults to ``5672``.
``virtual_host`` (optional)
RabbitMQ virtual host to use, defaults to ``/``.
``username`` (optional)
Plain text username for RabbitMQ connection, defaults to ``guest``.
``password`` (optional)
Plain text password for RabbitMQ connection, defaults to ``guest``.
``heartbeat`` (optional)
AMQP heartbeat interval in seconds, defaults to ``0`` to disable
heartbeat.
``keepalive`` (optional)
Register producer, consumer, view and clock-server with the given
integer timeout in seconds to keep the connection alive. Defaults
``0``.
``prefetch_count`` (optional)
AMQP channel prefetch count limit, defaults to 0 for no limit.
- amqp-consuming-server
``connection_id``
The name of a global utility providing configured IBrokerConnection. A
consuming server will serve consumers registered for its connection id
only.
``scheme`` (optional)
Configure URL schema for virtual rewriting using VirtualHostMonster,
default is ``http``. Will be ignored if no ``hostname`` is given.
``hostname`` (optional)
Hostname which will be passed into fake request. When calling
``object.absolute_url()`` in a consuming server you'll receive this
hostname. URL will be rewritten using VirtualHostMonster using following
scheme::
/VirtualHostBase/{scheme}/{hostname}:{port}/{site_id}/VirtualHostRoot/...
If a hostname is configured, VirtualHostMonster will be invoked,
``socket.gethostname()`` will be used else.
``port`` (optional)
Configure port for virtual rewriting using VirtualHostMonster,
default is ``80``. Will be ignored if no ``hostname`` is given.
``use_vhm`` (optional)
Create VirtualHostMonster-wrapped method calls when hostname is set. VHM
is used to tell portal the configured real public hostname and to hide
portal's id from path. Defaults to *on*.
``vhm_method_prefix`` (optional)
Explicitly set the VHM method prefix for AMQP-based requests. Most
typical options may look like:
* /VirtualHostBase/https/example.com:443/Plone
* /VirtualHostBase/https/example.com:443/Plone/VirtualHostRoot
* /VirtualHostBase/https/example.com:443/Plone/VirtualHostRoot/_vh_subsite
Note: This overrides the default implicit VHM-support by setting scheme,
hostname, port and use_vhm, but will still require use_vhm enabled to be
active. Empty value fallbacks to the old default use_vhm-behavior.
``site_id``
The id of a site, which should be the context when consuming the AMQP
messages, which the consumers of a consuming server consume. If a
``hostname`` is given, this will be used for VirtualHostMonster
rewrites.
``user_id`` (optional)
Optional user id of the Plone user, whose privileges are used to consume
the messages. By default, the messages are consumed as Anonymous User
calling trusted filesystem code.
Configuring logging
-------------------
You may want to in/decrease ``collective.zamqp`` loglevel which can easily be
done by passing an environment variable into worker instance as seen in
buildout example above::
[worker]
...
environment-vars =
ZAMQP_LOGLEVEL INFO
...
Valid parameters are:
- DEBUG
- INFO
Changelog
=========
0.16.2 (2018-02-27)
-------------------
- Fix issue where connection delay did not increment due to routing
configuration error causing disconnection whiles still configuring
exchanges, queues or bindings
[datakurre]
- Fix issue where a automatic reconnection caused extra connection to be
made, sometimes resulting in wrong delivery tags on consumed messages,
leading to precondition failed errors when acking those messages
[datakurre]
0.16.1 (2017-11-29)
-------------------
- Fix issue where blocking connection was unable close channel and connection
properly due to bug in Pika 0.9.5
[datakurre]
0.16.0 (2016-06-08)
-------------------
- Fix msgpack serializer to encode and decode using 'utf-8' encoding by default
[Asko Soukka]
0.15.1 (2016-01-28)
-------------------
- Fix to require collective.monkeypatcher >= 1.1.1
[datakurre]
0.15.0 (2016-01-27)
-------------------
- **Note:** This release changes producers and messages to join the current
transaction using transaction's ``join`` instead of deprecated ``register``.
So far, this change has seem to been fully backwards compatible.
- Add minimal savepoint support (NoRollbackSavepoint)
[datakurre]
- Fix to join transaction using join instead of register to support savepoints
[datakurre]
- Fix issue where pika blocking connection raised error because of
a missing log import in its 0.9.5 release
[datakurre]
- Fix testing integration related issue where ZAMQP fixture left dangling
registrations to slow down and cause side-effects for tests
[datakurre]
0.14.3 (2014-10-23)
-------------------
- Log connection and channel creation etc.
[sunew]
0.14.2 (2014-10-23)
-------------------
- The ZAMQP_LOGLEVEL environment variable was not yet accessible at import
time. Moved Logger to utils module to delay the import.
[sunew]
0.14.1 (2014-10-10)
-------------------
- Fix issue where logging reject did not get enough arguments
[datakurre]
0.14.0 (2014-09-18)
-------------------
- Fix issue where language is not correctly set for AMQP requests
[datakurre]
- Remove hard dependency to z3c.unconfigure 1.0.1
[saily]
0.13.2 (2013-11-08)
-------------------
- Remove duplicate default producer from test fixture.
[datakurre]
0.13.1 (2013-11-08)
-------------------
- Fix packaging.
0.13.0 (2013-11-08)
-------------------
- Add vhm_method_prefix-option to replace old vhm-options
[datakurre]
0.12.0 (2013-08-08)
-------------------
- Fix to store durable publishable messages during reconnection period
in memory on the base of message's delivery_mode-proprety, not
producer's durable-property
[datakurre]
- Combine loggers and reduce logging
[saily, datakurre]
0.11.0 (2013-06-05)
-------------------
- Set ``use_vhm=False`` for ZAMQP Test Layer
[saily]
- Add option ``use_vhm`` to make VHM-methods optional (to allow use of
hostname=localhost:8080/Plone in development)
[datakurre]
- Added a configureable scheme, hostname and port keyword to be passed into
fake request object in consumers. We well automatically rewrite those URLs
using VirtualHostMonster.
[datakurre, saily]
- Reduce amount of logging, fixes #2. [saily]
- Documentation updates
[saily]
- Add prefetch_count-option for connections
[datakurre]
- Add fixture with RabbitMQ trace firehose consumer to ease debugging
[datakurre]
- Fix to requeue messages also when subclass of ConflictError is raised during
commit (e.g. ReadConflictError)
[datakurre]
0.10.2 (2013-03-14)
-------------------
- Fix to process Pika timeouts on every asyncore poll to fix heartbeat issues
0.10.1 (2013-03-13)
-------------------
- Add configuration options for 'exchange_auto_declare' and
'queue_auto_declare'
- Fix 'queue_exclusive = True' to imply also 'queue_durable = False' and
'queue_auto_delete = True'
0.10.0 (2013-03-12)
-------------------
- Update keepalive-ping-queue to use 1 hour message ttl
- Add option (defaulting to true) for monkey patching Zope2 lifetime_loop for
shorter asyncore.poll-timeouts on consuming instances.
0.9.8 (2013-03-09)
------------------
- Fix regression: allow to bind queue with empty routing key
0.9.7 (2013-03-05)
------------------
- Fix to not try to requeue auto-acknowledged messages on conflict errors,
because it's not possible
- Fix consumer configuration to support list of routing keys (to ease the use
of metronome like exchanges)
0.9.6 (2013-02-21)
------------------
- Fix Rabbit-fixture to be silent when Rabbit has been killed before teardown
0.9.5 (2013-02-20)
------------------
- Fix to include convenient default producers in the default test layer
0.9.4 (2012-11-27)
------------------
- Fixed bugs in registering connections and consuming servers in ZAMQP layer.
- Added 'text/csv' serializer (for serializing iterable container of
dictionaries into RFC4180 CSV data and deserializing such messages into
tuple of dictionaries)
0.9.3 (2012-11-27)
------------------
- Enhanced 'runAsyncTest' to accept 'loop_timeout' and 'loop_count' parameters.
- Fixed optional json-serializer to try to import 'json' at first and only then
'simplejson'.
0.9.2 (2012-09-18)
------------------
- Pinned z3c.unconfigure==1.0.1.
- Added test fixture to be used with ZSERVER-fixtures to support Selenium-testing.
- Fixed consuming server to default to 'Anonymous User' instead of None.
- Added runAsyncTest-helper for running tests depending on the asyncore loop.
Added consuming server for ZAMQP-layer.
0.9.1 (2012-09-18)
------------------
- Fixed to not set correlation_id-property for message if the given
correlation_id is None.
0.9.0 (2012-09-17)
------------------
- Added alias 'Producer.register' for VTM._register.
- Renamed 'connected'-property to 'is_connect'.
0.8.1 (2012-09-06)
------------------
- Fixed queue length helpers to use BlockingChannel-helper properly.
0.8.0 (2012-09-06)
------------------
- Fixed consumers without marker interface not to start consuming.
- Enhanced undolog for transactions by 'zamqp-consumer'-view.
- Fixed consuming view to annotate transaction with the user configured for the
current consuming service.
- Added separate auto_delete-setting for exchanges and queues. Previously
auto_delete was set as negation of durability, which remains the default.
- Added connection configuration to default with implicit default producer
registration (= producer with the same name/id as the connection, but no any
special routing).
- Added support for custom 'x-cookie-auth' header. Its value will be set to
value of '__ac' cookie for AMQP request to allow PAS-authentication for
logged in user (e.g. to support authenticated asyncronous tasks).
- Added __len__ for consumer and producer for getting the queue length (if the
related queue is defined) using blocking channel.
- Added BlockingChannel wrapper to be used with 'with' statement to create
separate blocking connections for quick raw AMQP-operations.
- Fixed to never declare queue starting with 'amq.', which is reserved prefix
in RabbitMQ. Allow empty queue names to support automatic (broker-generated)
queue-names.
- Fixed to never declare RabbitMQ default exchanges (declarig of any
'amq.'-starting exchange will be skipped).
- Added json-serializer (when either json or simplejson can be imported).
0.7.18 (2012-08-10)
-------------------
- Bug fixes.
[malthe]
- Added plone.testing layer for RabbitMQ + Zope. Added a dummy test for the
layer. Enabled RabbitMQ-parts in test buildout.
- Fixed consumers and producers to use the default exchange by default to allow
the easiest possible configuration for the use of the default exchange only.
0.7.17 (2012-05-21)
-------------------
- Added transaction aware reject to message.
- Added site_id-substitution support for consumer name to make consuming
service and site specific consumers available for lookup.
- Fixed to not crash if connection if not defined. Just warn.
- Fixed grok.name-magic to work more similarly in consumer (name is taken as
queue) as in producer (name is taken as routing_key).
- Refactored ping to use simple dynamic properties for routing_key and queue.
- Refactored producer, consumer and connection init to allow configuration
using simple dynamic properties.
- Refactored producer, consumer and connection init to allow configuration
using simple dynamic properties.
- Dropped 'magical' buildout name from keepalive's ping queues.
- Removed 'magical' proxying of message body's properties before we rely on it.
0.7.16 (2012-05-04)
-------------------
- Forced correlation_id to be str.
- Changed default serializer from 'text/plain' to 'pickle'.
- Fixed added dependencies to work on Plone 4.0.x.
0.7.14 (2012-05-02)
-------------------
- Fixed to requeue message when transaction of successful handling is aborted
(e.g. due to ZODB conflict error).
0.7.12 (2012-04-25)
-------------------
- Added support for sauna.reload.
0.7.11 (2012-04-18)
-------------------
- Changed ping to be logged on debug-level instead of info-level.
0.7.10 (2012-04-18)
-------------------
- Fixed Pika-adapter to process timeouts to support AMQP-heartbeat.
0.7.9 (2012-04-16)
------------------
- Modified keepalive-setting to accept an integer instead of boolean to
allow configuration of keepalive-ping-interval in detail.
0.7.8 (2012-04-16)
------------------
- Fixed issue where a typo in message de-serialization hide de-serialized body.
0.7.7 (2012-04-04)
------------------
- Fixed issue with attribute not found in threadlocals.
0.7.5 (2012-02-26)
------------------
- Minor fixes for being more *sauna.reload*-friendly.
0.7.4 (2012-03-12)
------------------
- Simplified Ping-consumer to ack messages and log ping directly withing
asyncore loop without creating a fake HTTP-request.
0.7.3 (2012-03-09)
------------------
- Added a helper function ``collective.zamqp.utils.getBuildoutName`` to be
used in configuration re-usable packages using buildout-depending
AMQP-queues (e.g. for replies).
0.7.2 (2012-03-08)
------------------
- Added *keepalive* option for AMQP Broker Connection -configuration in
zope.conf to auto-register all needed utilities, views and clock-servers for
keeping the connection alive with regular ping message.
0.7.1 (2012-03-06)
------------------
- Allowed new named AMQP Broker Connections to be defined in zope.conf
(or in 'zope-conf-additional' in instance buildout recipe).
0.7.0 (2012-02-05)
------------------
- Internal development release.