mentby.com

Morning,

Several people have recently been asking about a shovel for Rabbit,
which can consume messages from one broker and deliver them to
another. Several of our clients have also been asking for such message
relocation equipment, so we have built one: RabbitMQ-shovel, which is
available from http://hg.rabbitmq.com/rabbitmq-shovel/

Documentation is in the form of a README, which is duplicated below.

This code has not yet been through QA, so there may be bugs in it.
We are, as always, very happy to receive bug reports. But certainly, our
testing suggests it works for us ;)

It is licensed under the MPL v1.1.

Have fun!

RabbitMQ-shovel
===============

Introduction
------------

This is a plug-in for RabbitMQ that shovels messages from a queue on
one broker to an exchange on another broker. The two brokers may be
the same. The plug-in allows several shovels to be specified at the
same time. Each shovel may have a number of source and destination
brokers specified, and one of each is chosen whenever the shovel
attempts to make a connection: this permits simple round-rabbit load
balancing.

Resources can be declared upon connection to both the source and
destination brokers, and parameters can be specified for both the
reception and publishing of messages.

Requirements
------------

Currently, you must build the server from source, under branch
bug16653. You must also have checked out the rabbitmq-public-umbrella
hg repository, and have the rabbitmq-erlang-client built. From
scratch, the following commands should build RabbitMQ with the shovel
plug-in:

hg clone http://hg.rabbitmq.com/rabbitmq-public-umbrella
cd rabbitmq-public-umbrella
hg clone http://hg.rabbitmq.com/rabbitmq-codegen
hg clone http://hg.rabbitmq.com/rabbitmq-erlang-client
hg clone http://hg.rabbitmq.com/rabbitmq-server
hg clone http://hg.rabbitmq.com/rabbitmq-shovel
cd rabbitmq-server
hg up -C bug16653
make -j
mkdir -p plugins
cd plugins
ln -s ../../rabbitmq-erlang-client
ln -s ../../rabbitmq-shovel
cd ../../rabbitmq-erlang-client
make
cd ../rabbitmq-shovel
make
cd ../rabbitmq-server
./scripts/rabbitmq-activate-plugins
make cleandb run

Configuration
-------------

The RabbitMQ configuration file specifies the shovel
configurations. By default it lives in /etc/rabbitmq/rabbitmq.config
on Linux systems, in %RABBITMQ_BASE%\rabbitmq.config on Windows, and
somewhere else on OS X. This file configures both RabbitMQ-server and
all the plugins installed in it. It is an Erlang-syntax file of the
form:

[{section1, [section1-config]},
{section2, [section2-config]},
{sectionN, [sectionN-config]}
].

that is, a list of tuples, where the left element of each tuple names
the application being configured. Remember that the last element of
the list has no trailing comma, and that a full stop is needed after
the closing bracket of the list. Hence, if you configure both
RabbitMQ-server and RabbitMQ-shovel, the configuration file may have a
structure like this:

[{rabbit,        [configuration-for-RabbitMQ-server]},
 {rabbit_shovel, [configuration-for-RabbitMQ-shovel]}
].

A full example of the shovel configuration (the rabbit_shovel element
of that list) is:

{rabbit_shovel,
  [{shovels,
    [{my_first_shovel,
      [{sources,      [{brokers,
                          ["amqp://fred:secret*******/my_vhost",
                           "amqp://john:secret*******/my_vhost"
                          ]},
                       {declarations,
                          ['queue.declare',
                           {'queue.bind',
                                  [{exchange, <<"my_exchange">>},
                                   {queue,    <<>>}]}
                          ]}]},
       {destinations, [{broker, "amqp://"},
                       {declarations,
                          [{'exchange.declare',
                                  [{exchange, <<"my_exchange">>},
                                   {type, <<"direct">>},
                                   durable]}
                          ]}]},
       {queue, <<>>},
       {qos, 10},
       {auto_ack, false},
       {tx_size, 0},
       {delivery_mode, keep},
       {publish_fields, [{exchange, <<"my_exchange">>},
                         {routing_key, <<"from_shovel">>}]},
       {reconnect, 5}
      ]}
     ]
   }]
}

Firstly, all shovels are named. Here we have one shovel, called
'my_first_shovel'; you can have as many shovels as you wish. Every
shovel must have all of its sub-fields specified: sources,
destinations, queue, qos, auto_ack, tx_size, delivery_mode,
publish_fields and reconnect.

Sources and Destinations
------------------------

Sources and destinations specify, respectively, where messages are
fetched from and delivered to. Either 'broker' or 'brokers' must be
specified; 'broker' is simply shorthand for when only one broker
needs specifying. Using 'brokers' allows a list of brokers to be
specified: whenever the connection to a broker is lost, another one is
chosen at random from the list and a connection attempt is made to
that. The syntax for broker URIs is:

amqp://username:password@host:port/vhost

If username or password are omitted, the default values of guest and
guest are used. If the vhost is omitted, the default value of / is
used. If the host is omitted, then the plugin uses the "direct"
connection internally rather than a network connection: this means it
connects to the RabbitMQ-server node on which it is running without
going through the network stack. This is much more efficient. If port
is omitted then the default value is used (5672 or 5671 if SSL is
used).
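To make the defaulting rules above concrete, here is a minimal Python
sketch that turns a broker URI into connection parameters.
parse_broker_uri and BrokerParams are hypothetical names used only for
illustration; this is not the plugin's own (Erlang) parser, it ignores
SSL query parameters and percent-encoding, and host = None stands for
the direct connection.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlsplit


@dataclass
class BrokerParams:
    username: str
    password: str
    host: Optional[str]  # None: use the direct, in-node connection
    port: Optional[int]  # None for the direct connection
    vhost: str
    ssl: bool


def parse_broker_uri(uri: str) -> BrokerParams:
    """Apply the defaulting rules described above (hypothetical helper)."""
    parts = urlsplit(uri)
    if parts.scheme not in ("amqp", "amqps"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    ssl = parts.scheme == "amqps"
    host = parts.hostname  # None when the host is omitted
    port = None
    if host is not None:
        # 5672 for plain AMQP, 5671 for AMQP over SSL.
        port = parts.port or (5671 if ssl else 5672)
    # The first "/" after the host is only a separator; an empty
    # remainder means the default vhost "/".
    vhost = parts.path[1:] or "/"
    return BrokerParams(
        username=parts.username or "guest",
        password=parts.password or "guest",
        host=host,
        port=port,
        vhost=vhost,
        ssl=ssl,
    )
```

For example, "amqp://" describes the direct connection as guest/guest
on vhost /, while "amqp://fred:secret@host1/my_vhost" gets port 5672.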

SSL is supported, but it requires additional parameters:

amqps://username:password*******&verify=verifyOption&fail_if_no_peer_cert=failOption

All five parameters (three paths: cacertfile, certfile and keyfile;
two options: verify and fail_if_no_peer_cert) must be specified. See
the SSL guide at http://www.rabbitmq.com/ssl.html#configure-erlang for
details of SSL in RabbitMQ in general, and of the Erlang client (on
which the shovel is built) in particular.

Note that SSL cannot be used with the direct connection (i.e. a host
must be specified when using SSL), and that when the shovel connects
to the node it is running on, the non-SSL direct connection is
preferable to SSL.
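The two SSL rules above (all five parameters present, and a host
given) can be checked before the shovel is started. The sketch below
assumes that the parameters travel as an ordinary ?key=value&... query
string; the example URI above is partly elided, so treat that layout
as an assumption. check_amqps_uri is an illustrative name, not part of
the plugin.

```python
from typing import Dict
from urllib.parse import parse_qs, urlsplit

REQUIRED_SSL_PARAMS = ("cacertfile", "certfile", "keyfile",
                       "verify", "fail_if_no_peer_cert")


def check_amqps_uri(uri: str) -> Dict[str, str]:
    """Validate an amqps:// broker URI and return its SSL parameters."""
    parts = urlsplit(uri)
    if parts.scheme != "amqps":
        raise ValueError("not an amqps:// URI")
    if not parts.hostname:
        # SSL cannot be used with the direct connection.
        raise ValueError("amqps URIs must name a host")
    # Assumed layout: ?cacertfile=...&certfile=...&keyfile=...&verify=...&...
    params = {k: v[0] for k, v in parse_qs(parts.query).items()}
    missing = [p for p in REQUIRED_SSL_PARAMS if p not in params]
    if missing:
        raise ValueError("missing SSL parameters: " + ", ".join(missing))
    return params
```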

Resource Declarations
---------------------

Both sources and destinations can have an optional 'declarations'
clause. Its value is a list of AMQP methods. If the default values are
sufficient, the method name alone can be given, e.g. 'queue.declare'.
If parameters need to be set, the method should be given as a tuple
whose right-hand side is a proplist specifying which fields differ
from their default values. For example:
    {'exchange.declare',[{exchange, <<"my_exchange">>},
                         {type, <<"direct">>},
                         durable]},
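The two ways of writing a declaration can be modelled in Python, as a
sketch only: a bare method name means "all defaults", and a bare atom
inside the proplist (durable above) follows the usual Erlang proplists
convention of standing for {durable, true}. The names normalise and
Declaration are hypothetical, and Erlang atoms and binaries are
represented as Python str and bytes.

```python
from typing import Any, Dict, List, Tuple, Union

# Erlang atoms are written as str, binaries as bytes.
ProplistItem = Union[str, Tuple[str, Any]]
Declaration = Union[str, Tuple[str, List[ProplistItem]]]


def normalise(decl: Declaration) -> Tuple[str, Dict[str, Any]]:
    """Return (method name, fields overridden from their defaults)."""
    if isinstance(decl, str):
        return decl, {}  # bare method name: every field keeps its default
    method, proplist = decl
    fields: Dict[str, Any] = {}
    for item in proplist:
        key, value = item if isinstance(item, tuple) else (item, True)
        # Like proplists:get_value/2, the first occurrence of a key wins.
        fields.setdefault(key, value)
    return method, fields
```

So 'queue.declare' normalises to no overrides at all, while the
exchange.declare example above becomes a dict with exchange, type and
durable set to True.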

One very useful feature here is the Most-Recently-Declared-Queue
feature, in which RabbitMQ remembers the name of the most recently
declared queue. This means that you can declare a private queue, and
then bind it to exchanges without ever needing to know its name.
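The Most-Recently-Declared-Queue behaviour can be illustrated with a
toy in-memory channel. FakeChannel is a hypothetical stand-in, not an
AMQP client; as in AMQP 0-9-1, the remembered name is kept per
channel, and the generated queue names here are invented.

```python
import itertools
from typing import Optional, Set, Tuple


class FakeChannel:
    """Toy, in-memory model of one AMQP channel (not a real client)."""

    def __init__(self) -> None:
        self._counter = itertools.count(1)
        self.last_queue: Optional[str] = None
        self.bindings: Set[Tuple[str, str, str]] = set()

    def queue_declare(self, queue: str = "") -> str:
        # An empty name asks the server to generate one; "gen-N" is made up.
        name = queue or f"gen-{next(self._counter)}"
        self.last_queue = name  # remembered for later methods on this channel
        return name

    def queue_bind(self, exchange: str, queue: str = "",
                   routing_key: str = "") -> None:
        # An empty queue name stands for the most recently declared queue.
        if not queue:
            if self.last_queue is None:
                raise RuntimeError("no queue declared on this channel yet")
            queue = self.last_queue
        self.bindings.add((exchange, queue, routing_key))
```

Declaring a queue with no name and then calling
ch.queue_bind("my_exchange") binds the generated queue without the
caller ever handling its name; further queue_bind calls with other
exchanges bind that same queue again.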

queue :: binary
---------------

This field specifies the name of the queue on the source brokers to
consume from. This queue must exist, so use the resource declarations
to create it (or ensure it exists) first. Again, the
Most-Recently-Declared-Queue feature can be used here: giving the
empty name <<>>, as in the example above, consumes from an anonymous
queue declared in the source's declarations.

qos :: non-negative-integer
---------------------------

The shovel consumes from a queue. The QoS controls how many messages
are sent to the shovel in advance of the message the shovel is
currently processing.
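A rough model of the prefetch limit follows, assuming the standard
AMQP 0-9-1 basic.qos semantics: the broker keeps at most qos
unacknowledged deliveries outstanding to the shovel, and 0 means no
limit. As we understand it, RabbitMQ does not apply the limit when
messages are consumed with auto_ack (no_ack) set. deliverable is a
hypothetical helper.

```python
def deliverable(qos: int, unacked: int, ready: int,
                auto_ack: bool = False) -> int:
    """How many of `ready` queued messages the broker may push right now.

    qos: the prefetch limit (0 means no limit).
    unacked: deliveries already sent to the shovel but not yet acked.
    """
    if qos < 0 or unacked < 0 or ready < 0:
        raise ValueError("counts must be non-negative")
    if qos == 0 or auto_ack:
        # No limit: either none was requested, or acks are not tracked.
        return ready
    return max(0, min(ready, qos - unacked))
```

With the example's qos of 10 and seven messages still unacknowledged,
deliverable(10, 7, 100) allows three more to be sent ahead.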

auto_ack :: boolean
-------------------

Setting this to 'true' turns on the no_ack flag when subscribing to
the source queue.

tx_size :: non-negative-integer
-------------------------------

When set to 0, transactions are not used. Other values make publishes
transactional, with a commit every N messages. When transactions are
not used and auto_ack is false, each message is acknowledged to the
source immediately after it has been published. When transactions are
used, acks are only issued to the source on receipt of the commit-ok
message from the destination. This can thus be used to guarantee that
messages are only acknowledged (and thus forgotten about by the source
broker) once they are guaranteed to have been received by the
destination broker.
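The ordering of publishes, commits and acks described above can be
sketched as a pure function returning the sequence of events for a run
of deliveries. shovel_events is hypothetical, and it emits one ack per
message for readability; the real plugin may batch acknowledgements
differently. Messages published after the last full batch stay
unacknowledged, which is what lets the source requeue them if the
shovel dies before they are committed.

```python
from typing import Iterable, List, Tuple

Event = Tuple[str, int]  # (action, delivery tag)


def shovel_events(tags: Iterable[int], tx_size: int,
                  auto_ack: bool) -> List[Event]:
    """Publish/commit/ack order for the given deliveries (sketch only)."""
    if tx_size < 0:
        raise ValueError("tx_size must be non-negative")
    events: List[Event] = []
    pending: List[int] = []  # published but not yet committed
    for tag in tags:
        events.append(("publish", tag))
        if tx_size == 0:
            # No transactions: ack straight after the publish.
            if not auto_ack:
                events.append(("ack", tag))
            continue
        pending.append(tag)
        if len(pending) == tx_size:
            # The commit is tagged with the last delivery in the batch.
            events.append(("commit", tag))
            # Acks go to the source only once commit-ok has arrived.
            if not auto_ack:
                events.extend(("ack", t) for t in pending)
            pending.clear()
    return events
```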

delivery_mode :: 'keep' | 0 | 2
-------------------------------

This affects the delivery_mode field when publishing to the
destination. A value of 'keep' means that the same delivery_mode
should be used as when the message was originally published to the
source broker. 0 and 2 override the original setting.

publish_fields
--------------

This is a list of tuples which override fields in the publish method
when publishing to the destination. This can be used to direct
messages to a particular exchange on the destination, for example, or
change the routing key. By default, the routing key of the message as
it is received by the shovel is passed through, but this can be
overridden as necessary.
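How publish_fields and delivery_mode combine can be sketched as
follows. build_publish is a hypothetical helper: it starts from the
routing key the message arrived with, lets publish_fields override any
field, and leaves everything else (including the exchange, for which
the text above gives no default) at the protocol default by omission.

```python
from typing import Any, Dict, Tuple, Union


def build_publish(received_routing_key: bytes,
                  received_delivery_mode: int,
                  publish_fields: Dict[str, Any],
                  delivery_mode: Union[str, int]
                  ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (basic.publish fields, message properties) for the destination."""
    # The routing key is passed through unless publish_fields overrides it;
    # fields not mentioned here are left at their protocol defaults.
    method: Dict[str, Any] = {"routing_key": received_routing_key}
    method.update(publish_fields)
    if delivery_mode == "keep":
        mode = received_delivery_mode  # as originally published to the source
    elif delivery_mode in (0, 2):
        mode = delivery_mode  # explicit override
    else:
        raise ValueError(f"invalid delivery_mode: {delivery_mode!r}")
    return method, {"delivery_mode": mode}
```

With the example configuration, every message goes to my_exchange with
routing key from_shovel and keeps its original delivery mode.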

reconnect :: non-negative-integer
---------------------------------

When an error occurs, the shovel will disconnect from both the source
and destination brokers immediately. This forces uncommitted
transactions at the destination to be rolled back, and delivered but
unacknowledged messages at the source to be requeued. The shovel will
then try to connect again. If that attempt fails, it is not a good
idea for the shovel to retry quickly and repeatedly, so the value
specified here is the number of seconds to wait between connection
attempts.

Note that if set to 0, the shovel will never try to reconnect: it'll
stop after the first error.
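The reconnect policy, together with the random choice of broker
described under Sources and Destinations, can be sketched as a loop:
pick a broker at random from each list, run until an error, then wait
reconnect seconds, or stop for good when reconnect is 0. run_shovel,
shovel_once and max_attempts are hypothetical; sleep and choose are
injectable so the behaviour can be tested without real brokers or real
waiting.

```python
import random
import time
from typing import Callable, List, Sequence


def run_shovel(sources: Sequence[str],
               destinations: Sequence[str],
               shovel_once: Callable[[str, str], None],
               reconnect: int,
               sleep: Callable[[float], None] = time.sleep,
               choose: Callable[[Sequence[str]], str] = random.choice,
               max_attempts: int = 10) -> List[str]:
    """Sketch of the reconnect policy; returns a log of what happened.

    shovel_once(src, dst) is a hypothetical callable that connects to both
    brokers, declares resources and shovels until something goes wrong,
    at which point it raises.
    """
    if reconnect < 0:
        raise ValueError("reconnect must be non-negative")
    log: List[str] = []
    for _ in range(max_attempts):  # bounded only so the sketch terminates
        src, dst = choose(sources), choose(destinations)
        try:
            shovel_once(src, dst)
            log.append(f"{src} -> {dst}: finished")
            return log
        except Exception as exc:  # any error drops both connections
            log.append(f"{src} -> {dst}: {exc}")
        if reconnect == 0:
            log.append("reconnect is 0: giving up")
            return log
        sleep(reconnect)  # wait before the next connection attempt
    return log
```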

Obtaining shovel statuses
-------------------------

From the broker Erlang prompt, call
rabbit_shovel_status:status(). This will return a list, with one row
for each configured shovel. Each row has three fields: the shovel
name, the shovel status, and the timestamp (a local calendar time of
{{YYYY,MM,DD},{HH,MM,SS}}). There are 3 possible statuses:

'starting': The shovel is starting up, connecting and creating
resources.

'running': The shovel is up and running, shovelling messages.

{'terminated', Reason}: Something's gone wrong. The Reason should give
a further indication of where the fault lies.
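Each status row can be mirrored in Python to show how the three
shapes above might be told apart, e.g. in a monitoring script. The
representation (strings for atoms, nested tuples for the calendar
time) and describe itself are assumptions for illustration; fetching
the rows from the broker is not shown.

```python
from datetime import datetime
from typing import Any, List, Tuple, Union

# Hypothetical Python mirror of one status row: (Name, Status, Timestamp),
# with Status = "starting" | "running" | ("terminated", Reason) and
# Timestamp = ((YYYY, MM, DD), (HH, MM, SS)).
Status = Union[str, Tuple[str, Any]]
Timestamp = Tuple[Tuple[int, int, int], Tuple[int, int, int]]
Row = Tuple[str, Status, Timestamp]


def describe(rows: List[Row]) -> List[str]:
    """One human-readable line per shovel, flagging terminated ones."""
    lines = []
    for name, status, ((y, mo, d), (h, mi, s)) in rows:
        when = datetime(y, mo, d, h, mi, s).isoformat(sep=" ")
        if isinstance(status, tuple) and status[0] == "terminated":
            lines.append(f"{name}: TERMINATED at {when} ({status[1]!r})")
        else:
            lines.append(f"{name}: {status} at {when}")
    return lines
```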


Matthew Sackman Mon, 01 Feb 2010 02:56:58 -0800

Just to let you all know, the changes that this plugin required to the
server have been merged into default and from there to the new persister
so you can now use the shovel straight off the default branch or new
persister branch of the server.

For those of you theoretically inclined, note that you can use the
shovel to create cycles, and even to duplicate messages. This could be
used for iteration (think ! in π-calculus, or actually maybe just
restriction). This alone doesn't make it Turing complete. The challenge
is to come up with the minimal pluggable exchange type and an encoding
of instructions as messages such that you can demonstrate
Turing-completeness. λ, TM, π or SKI are all equally valid.

Then, in your encoding, send in messages that represent the
instructions for factorial. Then send in further messages (Peano
encoding) representing the argument, and the result should appear on
another queue. Prizes will be awarded for the most elegant and concise
solution.

Matthew


Matthew Sackman Mon, 08 Mar 2010 03:46:22 -0800

Wanted to get shovel to work, but am getting

{amqp_error,access_refused,
                            "access to vhost 'vtest' refused for user 'shovel'",
                            'connection.open'}}

from the rabbit server.  

I am sure the username and password are correct. From the bql security document, http://www.rabbitmq.com/rabbitmq-bql.html#bql-security-rules, which shows exactly the same error, it seems that I HAVE to use bql to grant permission to amq.default (which doesn't actually exist in the OS X 1.7.0 build of RabbitMQ). Before trying to get bql to work, I want to make sure I am heading in the right direction. I would appreciate any pointers from folks who have it working.

By the way, the version of rabbitmq-shovel I downloaded via hg has a few parameters that differ from the example given in the blog:

From rabbitmq-shovel/src/rabbit_shovel_sup.erl
reconnect becomes reconnect_delay
qos becomes prefetch_count

And this is not a valid parameter:
{delivery_mode, keep},

Thanks,
Mark


Mark Lin Wed, 21 Apr 2010 17:46:25 -0700

I haven't used that shovel yet, but do you need to set the vhost to /vtest
rather than vtest?

I'm not sure if it's stripping the / or not.

Scott Brooks


Scott Brooks Wed, 21 Apr 2010 18:12:37 -0700

Mark,

Have you granted the 'shovel' user access to the vhost 'vtest'? See http://www.rabbitmq.com/admin-guide.html#set_permissions  - so something like
   rabbitmqctl set_permissions -p vtest shovel ".*" ".*" ".*"
should do the trick.

No, you don't have to use bql; rabbitmqctl (which ships with the broker)
is all you need.

'amq.default' is a pseudonym for the otherwise nameless default
exchange. From  http://www.rabbitmq.com/admin-guide.html#access-control  ...

"For convenience RabbitMQ maps AMQP's default exchange's blank name to
'amq.default' when performing permission checks."

Whether or not the shovel user needs access to the default exchange
depends on the rest of your shovel configuration. But in any case the
".*" regexps in the above set_permissions call will cover that.

The shovel, like the rest of the code base, is evolving, so for the most
up to date documentation always refer to the docs shipped with the code,
i.e.  http://hg.rabbitmq.com/rabbitmq-shovel/file/default/README  in this
case.
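To see why ".*" also covers the default exchange, here is a sketch of
the name mapping quoted above. resource_permitted is hypothetical; it
assumes the permission regexp is matched unanchored (which is why
patterns such as ^$ spell out their anchors), and Python's re is not
identical to Erlang's re module.

```python
import re


def resource_permitted(name: str, pattern: str) -> bool:
    """Illustrative permission check for one resource name and one regexp."""
    # The default exchange has a blank name; for permission checks RabbitMQ
    # treats it as 'amq.default'.
    checked = name if name else "amq.default"
    # Assumed unanchored match: anchors must be written explicitly (^...$).
    return re.search(pattern, checked) is not None
```

Note the consequence of the mapping: a pattern of ^$ denies the
default exchange too, because the check sees 'amq.default' rather
than an empty name.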

Regards,

Matthias.


Matthias Radestock Wed, 21 Apr 2010 23:39:29 -0700

Thank you! It turns out my vhost name actually starts with "/", so I was specifying the wrong vhost name all along. I guess because the default vhost is "/", I had been thinking of vhost names like URI paths, so I created my vhost with a leading "/".

Always helpful to have another pair of eyes looking at the problem.

Two quick questions: is it possible to define multiple queue.bind declarations in the same declarations list, so as to shovel traffic from multiple exchanges? And can we use the Most-Recently-Declared-Queue in the resource declarations and NOT have the queue persist, so that if the shovel dies, the queue dies with it instead of accumulating messages?

Thanks again,
Mark, happy shoveling.



Mark Lin Thu, 22 Apr 2010 11:40:37 -0700

Yes. There are no restrictions on the declarations at all.

Yes. When you declare the queue, make sure you set the auto-delete flag.

Matthew


Matthew Sackman Thu, 22 Apr 2010 11:58:57 -0700

Yes. This is probably the most unsavoury part of the schema. The problem
is that a) we need to support vhosts that start with / and that don't
start with /; b) the default vhost is /

Thus:

amqp://user:pass@host/   => vhost = /
amqp://user:pass@host//  => vhost = /
amqp://user:pass@host/a  => vhost = a
amqp://user:pass@host//a => vhost = /a

So basically, the first / is definitely just a separator, so if your
vhost is not / but does start with a /, then you need to add another /.
The confusion arises because the separator is / and the default vhost is
also /.
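The table above can be turned around to answer the practical question:
what path should the URI carry for a given vhost? vhost_path and
path_vhost are hypothetical helpers that assume exactly the rule
stated above; for a vhost named "/vtest" the path is "//vtest".

```python
def vhost_path(vhost: str) -> str:
    """URI path for a vhost: a leading '/' separator followed by the name."""
    if not vhost:
        raise ValueError("vhost names cannot be empty")
    if vhost == "/":
        return "/"  # empty remainder means the default vhost ("//" also works)
    return "/" + vhost  # so "/a" becomes "//a"


def path_vhost(path: str) -> str:
    """Inverse: strip the separator; an empty remainder is the default vhost."""
    return path[1:] or "/"
```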

Matthew


Matthew Sackman Mon, 26 Apr 2010 07:32:17 -0700


