Originally this post was going to be about how I was using Adam Pisoni's Ruby gem, Skynet, to do distributed background processing. However, this is not a story of success with that library.
The concept of Skynet is really cool: a dead simple MapReduce implementation with ActiveRecord integration. Installing it was a breeze, and getting it to work on my development system was quick and painless. Getting it running in production was an entirely different story. First I ran into problems with the logging. Skynet writes to its own log files, so I had to make sure the permissions on those were set up correctly. Then, when I tried to get it running on another EC2 instance, it just wouldn't work. No errors, no information, no nothing. So I decided to jump into the code and see if I could figure things out. A few hours later, after looking through quite a bit of code, I was no closer to knowing why it was failing.
I then realized something. I wasn't trying to do anything horribly complex. I didn't need the MapReduce paradigm. I just needed a queue and a way to process jobs from it. What's more, I didn't even need a disk-based persistent queue. I think this is a failing of a bunch of these fully integrated background processing libraries. They're tied to DRb, or to a specific type of queue, or the workers aren't customizable, or the workers have undesirable behavior (like BJ spawning a full Rails instance for every job execution). The problem is that I wanted a few basics, like calling a method on an AR object asynchronously (something all of these do with varying degrees of success), but I also wanted some very custom behavior for how workers handle web crawling jobs, which is a task I have to do a fair bit of.
I ended up going with beanstalkd and the beanstalk-client gem (for a good intro to beanstalk, read Geoffrey's post). I decided not to use the Rails-integrated async-observer because the worker it provides didn't fit my needs. Instead, I wrote a basic wrapper for the queue, a generic job wrapper, and a worker. The code for the worker is something I really like because it's crazy simple. Have a look:
require "#{File.dirname(__FILE__)}/../config/environment.rb"

class QueueWorker
  def initialize(logger, queue)
    @queue = queue
    @logger = logger
  end

  def start(continue_loop = true)
    begin
      begin
        job = @queue.reserve
        job.process
        @queue.mark_as_finished(job)
      end while continue_loop
    rescue Exception => e
      @logger.error "Queue worker encountered an error: #{e.message}"
    end
  end
end

QueueWorker.new(RAILS_DEFAULT_LOGGER, QueueWrapper.new(ARGV.first)).start
I like this worker because it doesn't know anything about the job or the queue. As long as the queue responds to 'reserve' and 'mark_as_finished', it can be anything. Likewise, the thing that is returned from reserve only needs to respond to 'process'. The logger object only needs to respond to 'error'. The worker is really stupid. It just grabs things off the queue and processes them, and if an exception ever breaks it out of the loop, it logs the error and stops. I could easily swap the RAILS_DEFAULT_LOGGER passed in at initialization for a wrapper around a monitoring service that alerts me on failure.
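To make the duck typing concrete, here is a minimal sketch that runs the worker without Rails or beanstalkd. The QueueWorker class is a trimmed copy of the one above; InMemoryQueue, NoteJob, and AlertCollector are hypothetical stand-ins invented for this illustration and are not part of the original app.

```ruby
# Trimmed copy of the worker above, without the Rails bootstrapping.
class QueueWorker
  def initialize(logger, queue)
    @queue = queue
    @logger = logger
  end

  def start(continue_loop = true)
    begin
      begin
        job = @queue.reserve
        job.process
        @queue.mark_as_finished(job)
      end while continue_loop
    rescue Exception => e
      @logger.error "Queue worker encountered an error: #{e.message}"
    end
  end
end

# Hypothetical queue: anything with #reserve and #mark_as_finished will do.
class InMemoryQueue
  attr_reader :finished

  def initialize(jobs)
    @jobs = jobs.dup
    @finished = []
  end

  # Raises when empty so the demo worker stops instead of looping forever.
  def reserve
    @jobs.shift || raise("queue is empty")
  end

  def mark_as_finished(job)
    @finished << job
  end
end

# Hypothetical job: it only needs to respond to #process.
class NoteJob
  attr_reader :processed

  def initialize
    @processed = false
  end

  def process
    @processed = true
  end
end

# Hypothetical stand-in for a monitoring service: it only needs #error.
class AlertCollector
  attr_reader :alerts

  def initialize
    @alerts = []
  end

  def error(message)
    @alerts << message
  end
end

jobs = [NoteJob.new, NoteJob.new]
alerts = AlertCollector.new
QueueWorker.new(alerts, InMemoryQueue.new(jobs)).start
```

In this sketch the worker drains both jobs, then the empty queue raises, and the message lands in the collector rather than in a log file. A real beanstalk queue would block in reserve instead of raising.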
The QueueWrapper and another class called JobWrapper are the only points of contact between plain Ruby code and the queueing mechanism. It's their responsibility to make sure that the reserve and mark_as_finished methods exist. The JobWrapper also comes into play when marshaling jobs onto the queue and unmarshaling them as they come off it.
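The post doesn't show these two classes, so what follows is only a rough sketch of how such wrappers could be put together, not the actual code. It assumes a connection object with put and reserve, whose reserved jobs expose body and delete (the beanstalk-client gem offers a similar interface, but treat the exact calls as an assumption). FakeConnection and EchoJob are hypothetical stand-ins so the example runs without a beanstalkd server, and the constructor here takes a connection rather than a "host:port" string as in the worker script.

```ruby
# Hypothetical in-memory stand-in for a beanstalk connection.
class FakeConnection
  RawJob = Struct.new(:body, :deleted) do
    def delete
      self.deleted = true
    end
  end

  attr_reader :reserved

  def initialize
    @jobs = []
    @reserved = []
  end

  def put(body)
    @jobs << RawJob.new(body, false)
  end

  def reserve
    raw = @jobs.shift
    @reserved << raw
    raw
  end
end

# Pairs the raw queue entry with the Ruby object marshaled inside it.
class JobWrapper
  attr_reader :raw_job

  def initialize(raw_job)
    @raw_job = raw_job
    @payload = Marshal.load(raw_job.body)
  end

  def self.serialize(job)
    Marshal.dump(job)
  end

  # The worker calls this; the wrapped job does the real work.
  def process
    @payload.process
  end
end

# Gives the worker the push / reserve / mark_as_finished interface.
class QueueWrapper
  def initialize(connection)
    @connection = connection
  end

  def push(job)
    @connection.put(JobWrapper.serialize(job))
  end

  def reserve
    JobWrapper.new(@connection.reserve)
  end

  def mark_as_finished(job)
    job.raw_job.delete
  end
end

# Hypothetical job used only to exercise the wrappers.
EchoJob = Struct.new(:text) do
  def process
    text.upcase
  end
end
```

Keeping the Marshal calls inside JobWrapper means job classes stay plain Ruby objects, and swapping the queue backend should only touch these two classes.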
The real magic happens in the job definitions. With this structure I can define any kind of job I want. Here's the important part of the code for a generic AR async method call:
def process
  model_name.constantize.find_by_id(id).send(method_to_send)
end

def self.queue_job(queue, model_instance, method)
  queue.push(GenericJob.new(model_instance.class.to_s, model_instance.id, method))
end
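For context, here is one way the rest of that class might look, written as a self-contained sketch that runs without Rails. The constructor and accessors are inferred from the snippet above. Article is a hypothetical stand-in for an ActiveRecord model, Object.const_get replaces ActiveSupport's constantize, and the nil guard for a record deleted before the job runs is an addition that the original snippet does not have.

```ruby
# Hypothetical stand-in for an ActiveRecord model.
class Article
  STORE = {}

  attr_reader :id, :indexed

  def initialize(id)
    @id = id
    @indexed = false
    STORE[id] = self
  end

  # Mirrors ActiveRecord's behavior of returning nil when nothing matches.
  def self.find_by_id(id)
    STORE[id]
  end

  def reindex
    @indexed = true
  end
end

class GenericJob
  attr_reader :model_name, :id, :method_to_send

  def initialize(model_name, id, method_to_send)
    @model_name = model_name
    @id = id
    @method_to_send = method_to_send
  end

  def process
    # Object.const_get stands in for constantize outside of Rails.
    record = Object.const_get(model_name).find_by_id(id)
    # Added guard: the record may be gone by the time the job runs.
    return if record.nil?
    record.send(method_to_send)
  end

  def self.queue_job(queue, model_instance, method)
    queue.push(GenericJob.new(model_instance.class.to_s, model_instance.id, method))
  end
end

# Any object that responds to push can play the queue here.
queue = []
GenericJob.queue_job(queue, Article.new(1), :reindex)
queue.first.process
```

Because the job carries only a class name, an id, and a method name, the marshaled payload stays small and the worker loads a fresh copy of the record when it processes the job.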
And the active record integration looks like this:
class ActiveRecord::Base
  def send_later(method)
    GenericJob.queue_job(ActiveRecord::Base.app_background_queue, self, method)
  end

  def self.app_background_queue
    @@app_background_queue ||= QueueWrapper.new("#{BEANSTALK_HOST}:#{BEANSTALK_PORT}")
  end
end
So I built async processing for AR objects in a couple hundred lines of code at most. The cool thing about this approach is that I can now define new jobs for crawling that have different process behavior. I actually queue up full batches of things instead of a single AR object. I can also define specific processing behavior, like using multiple threads to deal with slow network I/O. I like it because the worker that runs these jobs never has to know about any of this. It's completely agnostic to the types of jobs it processes.
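As an illustration of that kind of custom job, here is a rough sketch of a batch crawl job that spreads a list of URLs across a few threads. It is not the code from my app: CrawlBatchJob, its thread_count parameter, and the choice to record only the HTTP status code are all assumptions made for the example.

```ruby
require "net/http"
require "uri"
require "thread"

# Hypothetical batch job: one queue entry carries many URLs.
class CrawlBatchJob
  attr_reader :urls, :results

  def initialize(urls, thread_count = 4)
    @urls = urls
    @thread_count = thread_count
    @results = {}
  end

  # The worker only ever calls this; the threading stays inside the job.
  def process
    pending = Queue.new # Ruby's thread-safe queue, not the job queue
    urls.each { |url| pending << url }
    lock = Mutex.new

    threads = Array.new(@thread_count) do
      Thread.new do
        loop do
          begin
            url = pending.pop(true) # non-blocking; raises ThreadError when empty
          rescue ThreadError
            break
          end

          outcome = begin
            fetch(url)
          rescue StandardError => e
            # One bad URL should not take down the whole batch.
            "error: #{e.message}"
          end

          lock.synchronize { @results[url] = outcome }
        end
      end
    end

    threads.each(&:join)
    @results
  end

  # Kept as a separate method so it can be overridden or stubbed.
  def fetch(url)
    Net::HTTP.get_response(URI(url)).code
  end
end
```

Queueing it would look the same as for any other job, for example pushing CrawlBatchJob.new(list_of_urls) onto the queue wrapper, and the worker would still just call process on whatever comes back from reserve.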
Wow, this ended up going on for quite a while. I guess the lesson learned for me is that sometimes it's quicker to just pick up a few basic building blocks and wire them together.
Note: In fairness to Adam, he's busy working on a new version of Skynet that will offer other queuing options. It's definitely a project worth keeping an eye on.