Async-Observer with RabbitMQ

August 8th 2008 | Tags: ruby rails merb

Update: This code is stale, I've extracted a gem of it and posted on github.

Async-observer is great. Fast, easy to use API, and it Just Works. The downside is that the backend, Beanstalkd, doesn't support persistent messages in case the server crashes. I hear it's on the roadmap, though.

However, there's another messaging backend, RabbitMQ, that seems just as easy to get set up, and does support persistent messages. So, how to get these two bits of tech working together? Well, if you're hosting your app on Thin (or another app server that runs in EventMachine), it's pretty straightforward.

First, install the amqp ruby library to connect to rabbit, and then add a tiny bit of setup.

In config/environment.rb:

require 'mq'
class BeanstalkPoolImpersonator
  def initialize(opts={})
    @opts = opts
  def connect
    connection = AMQP.connect(@opts)
    @channel = channel =
  def use(queue)
    @queue =, queue)

  def yput(obj, pri, delay, ttr)
    p [obj, pri, delay, ttr]

  def last_server
  def subscribe(*args, &blk)
    @queue.subscribe(*args, &blk)

Then, instead of connecting via, do this:

AsyncObserver::Queue.queue =

You can pass an options hash to the new call, providing user, pass, vhost, host, or port as necessary.

Then, in your workers, load up the async_observer worker class, and extend like so:

class RabbitWorker < AsyncObserver::Worker
  def run() do
      AsyncObserver::Queue.queue.subscribe do |headers, msg|
        job = => YAML.load(msg), :body => msg, :stats => []) =[:delivery_tag]

Create the new worker the same way you would for the AO::Worker, and you're set:

Note: I'm maintaining a merb port of async-observer on github.

Note 2: This worker is somewhat fragile, if the RabbitMQ server goes down it will just hang forever waiting for more jobs. I'll need to figure out a solution to that before we move this into production (and I wrap it up in a gem), but I thought I'd get this out and about now.

blog comments powered by Disqus