Queues and Callbacks

A major part of our work behind the scenes is about improving our internal processes and, whenever possible, automating tasks. To this end we have a number of systems that need to communicate with each other.

The Control Panel that you may be familiar with uses Delayed Job. This is a Rails-specific gem that uses the database as a queue, with a nicely packaged worker process that handles messages as they arrive. Because the Control Panel only ever talks to Rails from Rails, this worked extremely well.

However, our other systems were not homogeneous - there were a number of different interfaces that needed to be instructed at various times and across various machines, and Delayed Job didn’t really fit the bill. In particular, there were some tasks that could only happen on certain servers - while Delayed Job let us have multiple worker processes on different boxes, it essentially managed a single queue, so it could not differentiate between messages for one worker and messages for another.

AMQP to the rescue

Because of this, we looked elsewhere for our central work queue. We decided to use the AMQP protocol, with RabbitMQ as our implementation. AMQP is a protocol for taking incoming messages, placing them on one of many queues and routing them to a destination reliably; RabbitMQ is a server, built in Erlang, that is pretty lightweight and has some nice clustering features built-in.

Into the Warren

When it came to getting our Rails applications to talk to RabbitMQ, we had a couple of choices.

There is an AMQP gem which looked great. Unfortunately, as it uses EventMachine internally, it caused a few problems with our Passenger-based deployments - EventMachine opens a background thread, which can cause issues for the Passenger processes managed by the web server.

There was also a gem called bunny. This works synchronously, thus avoiding the EventMachine issues - you connect and bunny either says “here’s the next message” or “the queue is empty” and then you disconnect.

It looked like bunny was perfect for our Passenger-based apps, with the AMQP gem being used on the non-Passenger side of things (in the end we actually went with bunny on both sides, but for other reasons). But there were still a couple of things that we wanted to do with the messages.

Firstly, we are Rubyists and AMQP (the protocol) is language agnostic. In particular, we wanted to put a hash of data onto the queue and have a hash of data presented to us at the other end. Secondly, security is important to us - especially when it comes to tasks like rebooting or reconfiguring a server - so we wanted to ensure that the contents of the message were not tampered with in transit.

For these reasons, we built Warren. This adds filters over the AMQP layer. In our case, we had one filter that automatically marshals the data to and from YAML as it is put onto and taken off the queue, and a second HMAC filter that, with one line of configuration, ensures that the data was not altered on its journey.
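To illustrate the idea behind those two filters (this is a sketch of the technique, not Warren’s actual API - the method names and the shared secret here are assumptions):

```ruby
require "openssl"
require "yaml"

# Illustrative sketch: marshal a hash to YAML, then prepend an
# HMAC-SHA256 signature so the receiver can detect tampering in transit.
SECRET = "shared-secret" # in practice, loaded from configuration

def encode(hash)
  body = YAML.dump(hash)
  sig  = OpenSSL::HMAC.hexdigest("SHA256", SECRET, body)
  "#{sig}\n#{body}"
end

def decode(message)
  sig, body = message.split("\n", 2)
  expected = OpenSSL::HMAC.hexdigest("SHA256", SECRET, body)
  raise "message tampered with in transit" unless sig == expected
  YAML.safe_load(body)
end
```

The sender only ever deals in hashes, the receiver gets a hash back out, and any modification of the YAML body between the two ends changes the signature check’s result.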

Warren itself has an adapter layer, so it can quite happily use either the AMQP gem or bunny to actually talk to the queue. Adding new adapters, or filters, is as simple as creating a new subclass - Warren detects it and adds it to its stack.


With Warren it is pretty simple to put a message onto the queue:

Warren::Queue.publish(@queue_name, @hash_of_data)

It is just as easy to pull the messages off the other end:

Warren::Queue.subscribe(@queue_name) do |hash_of_data|
  do_something_with hash_of_data
end
But how do we implement this mysterious do_something_with(hash_of_data) call?

For our purposes, the endpoint needs to do different things depending upon which machine we are on and which queue we are listening to. The key one is our “builder” system, which handles reboots, migrations and deployments, but there are several others which, while not yet live, will be coming onstream soon.

So effectively, we needed a set of daemon processes that listen to given queues and handle the messages that they receive. This splits into two components - the daemon framework and the workers themselves - which led to Bigwig (keeping the rabbit theme going, Bigwig was a character in Watership Down).

Bigwig expects a configuration file, telling it how to connect to the AMQP server and a folder full of plugins. Each plugin is a simple Ruby class that performs a given action. Bigwig takes the incoming messages, looks for a parameter within the Hash called “method” and then uses that to find a plugin. The plugin is then invoked and can perform whatever tasks it needs to, using any data from the incoming message.
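The dispatch step can be sketched like this (a hypothetical illustration of the pattern - the class names, the `inherited` registration and the `run` method are assumptions, not Bigwig’s real API):

```ruby
# Look up a plugin class by the "method" key in the incoming hash
# and invoke it with the message.
class Plugin
  def self.handlers
    @handlers ||= {}
  end

  # Register each plugin automatically as its class is defined
  def self.inherited(subclass)
    Plugin.handlers[subclass.name.downcase] = subclass
  end
end

class Ping < Plugin
  def run(message)
    "pong at #{message['sent_at']}"
  end
end

def dispatch(message)
  plugin = Plugin.handlers.fetch(message["method"]) do
    raise "no plugin for #{message['method']}"
  end
  plugin.new.run(message)
end
```

Dropping a new plugin class into the folder is then all it takes to handle a new kind of message - no central registry to edit.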

For monitoring purposes, Bigwig automatically deals with any incoming messages with a method of “ping” - these simply write a message to the log file. We can then send ping messages to the queue at set intervals and monitor that the log file is updated when expected - if we don’t see the update then we trigger an alert, in case the queue, or Bigwig, is down.

If the queue itself goes down, for whatever reason, then Bigwig uses an intelligent reconnect pattern. At first, it retries often, to minimise the downtime, but if the queue stays down, it reconnects less frequently, giving the queue a chance to restart before being swamped with connections. Of course, if the queue fails, then the ping messages won’t make it through and our monitor should alert us.
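The reconnect pattern described above can be sketched as a capped exponential backoff (the numbers here are illustrative, not Bigwig’s actual settings):

```ruby
# Retry quickly at first to minimise downtime, then back off if the
# queue stays down, giving it a chance to restart before being
# swamped with connection attempts.
def reconnect_delay(attempt, initial: 1, max: 60)
  [initial * (2 ** attempt), max].min
end

delays = (0..7).map { |n| reconnect_delay(n) }
# => [1, 2, 4, 8, 16, 32, 60, 60] seconds between attempts
```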

Calling back

Our primary internal application creates a Task object with a unique identifier when dispatching tasks across the queue. This ID is then passed onto the queue, and each Bigwig plugin is told of it when it is invoked. The application itself makes the Tasks available over ActiveResource.

This means that the Bigwig plugins themselves don’t actually need the data to be passed over the queue. Instead, when they are invoked, they call back into the primary application, using the Task ID as the key to their ActiveResource call. They can then extract the data they need from the Task object and use that to do whatever they need to do. When completed, they call the complete! method on the ActiveResource object and our application knows that the job has been done (alternatively, they can call error! and our app knows that something went wrong). All of these appear in an event feed on the application’s homepage - so we know at a glance what activity is happening across our systems and, more importantly, what has failed.

We could have implemented this using AMQP again - a ‘responses’ queue that our primary application listens to, with each worker putting a message back when it was done. But we chose to use ActiveResource, partly because it is so simple (Rails makes it super-easy to expose your objects as XML and ActiveResource makes it equally easy for other Ruby programs to call back to them), and partly for philosophical reasons. When the primary application places a message onto the queue, it doesn’t necessarily know how it will be handled. AMQP dispatches it and a listener deals with it - in our case the listener is likely to be a Bigwig instance, but doesn’t have to be. But when the callback is made, it is always made into our application, which is already geared up for completing tasks or marking them as having an error. As we know that it is a Rails app at the other end, we chose to follow the path of least-resistance and use the “Rails way”.

However, we did have a couple of issues. Firstly, most of our plugins actually shell out to invoke Ruby scripts, with the plugin unpacking the information from the Task and generating command-line arguments. This is a slight security risk, as it is feasible that someone could inject a malicious command into the command-line parameters (although, as this is all internal, it should never be an issue, and Ruby’s system command does escape any supplied parameters). So we are switching the scripts to accept environment variables instead, thus bypassing the shell and eliminating a whole category of potential attacks. Secondly, using ActiveResource in a non-Rails application caused some weird errors - but only when you tried to POST back to the ActiveResource object. It turned out that ActiveSupport was interfering with some of our own code behind the scenes. The solution was simple - we had to ensure that the require ‘activeresource’ statement was the first thing in each script.
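The environment-variable approach can be sketched like this (the variable names and script are illustrative assumptions; the point is that an explicit argv array plus an env hash means nothing is ever interpreted by a shell):

```ruby
require "rbconfig"

# Task data goes into the child's environment, not onto a command line.
task_env = { "TASK_ID" => "42", "TASK_ACTION" => "reboot" }

# IO.popen with an env hash and an explicit argv array spawns the
# child directly, bypassing the shell entirely. Here the "script" is
# just an inline Ruby one-liner that reads the variable back out.
output = IO.popen([task_env, RbConfig.ruby, "-e", "print ENV['TASK_ID']"]) do |io|
  io.read
end
```

A real worker script would read `ENV["TASK_ID"]` at startup and fetch the Task over ActiveResource from there.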

So that’s a quick peek into the workings of some of our internal systems. We put a lot of thought into how these different components should work together, as their reliability is fundamental to our success as a business. And while they are not infallible (nothing ever is) I think you will agree that they show that using Ruby and AMQP as the “glue” to tie different systems together is not only feasible, it’s actually quite easy.
