Async-Observer with RabbitMQ

8th Aug 2008 | Tags: ruby rails merb

Update: This code is stale, I’ve extracted a gem of it and posted on github.

Async-observer is great. Fast, easy to use API, and it Just Works. The downside is that the backend, Beanstalkd, doesn’t support persistent messages in case the server crashes. I hear it’s on the roadmap, though.

However, there’s another messaging backend, RabbitMQ, that seems just as easy to get set up, and does support persistent messages. So, how to get these two bits of tech working together? Well, if you’re hosting your app on Thin (or another app server that runs in EventMachine), it’s pretty straightforward.

First, install the amqp ruby library to connect to rabbit, and then add a tiny bit of setup.

In config/environment.rb:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
require 'mq'
class BeanstalkPoolImpersonator
  def initialize(opts={})
    @opts = opts
  end

  def connect
    connection = AMQP.connect(@opts)
    @channel = channel = MQ.new(connection)
  end

  def use(queue)
    @queue = MQ::Queue.new(@channel, queue)
  end

  def yput(obj, pri, delay, ttr)
    p [obj, pri, delay, ttr]
    @queue.publish(YAML.dump(obj))
  end

  def last_server
    :last_server_stub
  end

  def subscribe(*args, &blk)
    @queue.subscribe(*args, &blk)
  end
end

Then, instead of connecting via Beanstalk::Pool.new, do this:

1
AsyncObserver::Queue.queue = BeanstalkPoolImpersonator.new()

You can pass an options hash to the new call, providing user, pass, vhost, host, or port as necessary.

Then, in your workers, load up the async_observer worker class, and extend like so:

1
2
3
4
5
6
7
8
9
10
11
12
13
class RabbitWorker < AsyncObserver::Worker
  def run()
    EM.run do
      AsyncObserver::Queue.queue.connect
      AsyncObserver::Queue.queue.use('1.0')
      AsyncObserver::Queue.queue.subscribe do |headers, msg|
        job = OpenStruct.new(:ybody => YAML.load(msg), :body => msg, :stats => [])
        job.id = headers.properties[:delivery_tag]
        safe_dispatch(job)
      end
    end
  end
end

Create the new worker the same way you would for the AO::Worker, and you’re set:

1
    RabbitWorker.new(binding).run()

Note: I’m maintaining a merb port of async-observer on github.

Note 2: This worker is somewhat fragile, if the RabbitMQ server goes down it will just hang forever waiting for more jobs. I’ll need to figure out a solution to that before we move this into production (and I wrap it up in a gem), but I thought I’d get this out and about now.