
July 18, 2007

A Distributed Parallel Ruby Feed Getter

Work continues on my attempt to build a decent Ruby abstraction for writing simple distributed, multiprocess, and multithreaded batch processing applications, somewhat like MapReduce (I outlined much of the effort in my posts about project Tahiti). I thought it might be interesting to show what code that uses the system looks like. Apart from a little test code, the first thing I've written on top of it is a feed updater: it pulls a list of feeds from an ActiveRecord-backed model, downloads them, and updates them. Here's the code, with a few comments pointing out the role of each piece (though if you haven't read my previous posts, you may be lost anyway):

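require 'open-uri'    # Kernel#open on URLs; f.last_modified comes from OpenURI::Meta
require 'rubygems'
require 'feed_tools'  # FeedTools::Feed

class FeedUpdater < MapReduce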
  # had to do this so that feeds that hang can be killed.
  THREAD_TIMEOUT = 20
  # just a constant I use in multiple places.
  # it determines how many key/value pairs a chunk process will take care of
  CHUNK_SIZE = 200

  # The map calls are threaded in the chunk processor.
  # No ActiveRecord calls can be made in here; this is only for the slow network I/O.
  def map(blog_id, blog_data)
    begin
      last_modified = open(blog_data[0], {"User-Agent" => "Tahiti +http://www.pauldix.net",
        "From" => "..."}) {|f| f.last_modified}
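      # open-uri gives nil when the server sends no Last-Modified header; fetch in that case too
      if (blog_data[1].nil? or last_modified.nil? or last_modified > blog_data[1])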
        feed = FeedTools::Feed.open(blog_data[0], {:user_agent => 'Tahiti +http://www.pauldix.net'})
        output.emit(blog_id, [feed, last_modified])
      end
    rescue Exception => exception
      output.log("Exception: ", exception.message)
      output.emit(blog_id, :error)
    end
  end
 
  # What a DRb Worker machine sends to the chunk processor process.
  # Notice that it sends plain arrays of ids, URLs, and dates, not AR objects.
  def self.get_key_value_pairs(chunk_number)
    Blog.find_chunk_to_update((chunk_number-1)*CHUNK_SIZE, CHUNK_SIZE).collect do |blog|
      [blog.id, [blog.feed_url, blog.last_updated_on]]
    end
  end
 
  # What the Master DRb calls before sending the chunk numbers out to DRb Worker machines.
  # This serves two purposes. First, it is a hook for moving data to wherever it may be
  # needed, say out of the production database and into some other store. Second, the
  # total is used later to estimate how many EC2 nodes to start up.
  def self.setup_chunks_and_get_total
    total = Blog.count_by_sql("SELECT count(*) AS count_all FROM blogs WHERE (update_failures < 5)")
    # Round up so integer division doesn't drop the final partial chunk.
    (total + CHUNK_SIZE - 1) / CHUNK_SIZE
  end

  # This is the part where we can use ActiveRecord.
  # It runs in the main thread of the chunk processor.
  def self.write_key_value(k, v)
    blog = Blog.find(k)
    if v == :error
      blog.update_attribute(:update_failures, blog.update_failures + 1)
    else
      blog.update_articles_from_feed(v[0], v[1])
    end
  end
end
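
The chunk bookkeeping above comes down to two small calculations: how many chunks a row count needs, and which offset a 1-based chunk number starts at. Here is a minimal sketch in plain Ruby with no database involved; `chunk_count` and `chunk_offset` are hypothetical helper names, not part of the framework.

```ruby
CHUNK_SIZE = 200

# Number of chunks needed to cover total_rows, rounding up so that a
# final partial chunk (e.g. the last 50 of 450 rows) is not dropped.
def chunk_count(total_rows, chunk_size = CHUNK_SIZE)
  (total_rows + chunk_size - 1) / chunk_size
end

# Row offset for a 1-based chunk number, matching the
# (chunk_number - 1) * CHUNK_SIZE expression in get_key_value_pairs.
def chunk_offset(chunk_number, chunk_size = CHUNK_SIZE)
  (chunk_number - 1) * chunk_size
end

total = 450
(1..chunk_count(total)).each do |n|
  first = chunk_offset(n)
  last  = [first + CHUNK_SIZE, total].min - 1
  puts "chunk #{n}: rows #{first}..#{last}"
end
```

With 450 rows this prints three chunks (0..199, 200..399, 400..449); a plain `450 / 200` would report only two and leave the last 50 feeds unprocessed.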

Basically, it's just the map phase of a MapReduce, plus a few extra parts that help the system distribute the work. What I like is that nowhere in this code do I have to worry about the details of threading, multiple processes, or multi-machine headaches. I still need to test the ActiveRecord part a bit more to be sure it works: ActiveRecord isn't thread-safe, so I'm working around that by letting only the main thread in each chunk process make AR calls.
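
A minimal sketch of that main-thread-only pattern, using just the standard library: worker threads run the slow map step and push results onto a `Queue`, a per-item `Timeout` plays the role of `THREAD_TIMEOUT`, and only the calling thread drains the queue, which is where ActiveRecord writes would go. `process_chunk` and its keyword arguments are hypothetical; this is an illustration of the idea, not the actual ChunkProcessor.

```ruby
require 'thread'   # Queue; already built in on modern Rubies
require 'timeout'

# Hypothetical sketch, not the real ChunkProcessor: worker threads run the
# slow map step, and only the calling (main) thread ever runs write_fn.
def process_chunk(pairs, thread_count: 4, timeout: 20, map_fn:, write_fn:)
  work    = Queue.new
  results = Queue.new
  pairs.each { |pair| work << pair }

  workers = Array.new(thread_count) do
    Thread.new do
      loop do
        pair = begin
          work.pop(true)            # non-blocking; raises ThreadError when empty
        rescue ThreadError
          nil
        end
        break if pair.nil?          # no work left for this thread
        key, value = pair
        begin
          # Like THREAD_TIMEOUT: a hung fetch is abandoned after `timeout` seconds.
          results << [key, Timeout.timeout(timeout) { map_fn.call(key, value) }]
        rescue Timeout::Error, StandardError   # a timeout or a failed fetch
          results << [key, :error]
        end
      end
    end
  end

  # Every pair yields exactly one result, so the main thread knows how many to
  # wait for; this loop is where ActiveRecord calls would be safe to make.
  pairs.size.times do
    key, result = results.pop       # blocks until some worker pushes
    write_fn.call(key, result)
  end
  workers.each(&:join)
end
```

In the feed updater, `map_fn` would wrap the `open`/FeedTools fetch and `write_fn` would play the part of `write_key_value`. The queue keeps the database work serialized without any explicit locking.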

As a stylistic side note, I think some bits of this are a little ugly and could probably be made more readable with a few Structs.
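
One way those Structs could look, as a sketch: a `FeedInfo` for the value side of each key/value pair and a `FetchResult` for what `map` emits, so `blog_data[0]` becomes `info.feed_url`. Both struct names and the `needs_fetch?` helper are hypothetical. Since DRb marshals arguments, the sketch also checks that a struct survives a `Marshal` round trip; that only works when the struct class is defined on both ends.

```ruby
# Hypothetical structs replacing the positional arrays used in FeedUpdater.
FeedInfo    = Struct.new(:feed_url, :last_updated_on)   # value sent to map
FetchResult = Struct.new(:feed, :last_modified)          # value map emits

# get_key_value_pairs could then build:
#   [blog.id, FeedInfo.new(blog.feed_url, blog.last_updated_on)]

# Fetch when nothing is stored yet, when the server sent no Last-Modified
# header (open-uri returns nil), or when the header is newer than our copy.
def needs_fetch?(info, last_modified)
  info.last_updated_on.nil? || last_modified.nil? ||
    last_modified > info.last_updated_on
end

info = FeedInfo.new("http://example.com/feed.xml", Time.utc(2007, 7, 1))
puts needs_fetch?(info, Time.utc(2007, 7, 18))   # true: server copy is newer
puts needs_fetch?(info, Time.utc(2007, 6, 1))    # false: nothing new

# Structs survive Marshal (which DRb uses), provided both ends define them.
puts Marshal.load(Marshal.dump(info)) == info    # true
```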

The MapReduce parent class provides a number of default options, such as how large chunks should be, how often the chunk processors should report their progress to the Master DRb, the estimated amount of memory each chunk process will use, and the number of threads each chunk process should start to work through its key/value pairs.
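
Here is one way such overridable defaults could be wired up, as a sketch: class-level settings that each subclass copies from its parent on first access and can then override. The class name `MapReduceJob`, the option names, the `settings`/`set` methods, and every numeric value except the chunk size of 200 from `FeedUpdater` are hypothetical placeholders, not the actual Tahiti API.

```ruby
# Hypothetical base class holding per-job defaults (placeholder values).
class MapReduceJob
  DEFAULTS = {
    chunk_size:         200,  # key/value pairs per chunk
    progress_interval:  30,   # seconds between progress reports to the master
    memory_per_process: 100,  # estimated MB per chunk process
    threads_per_chunk:  10    # worker threads inside each chunk process
  }.freeze

  # Each class gets its own copy, starting from its parent's settings.
  def self.settings
    @settings ||= (superclass.respond_to?(:settings) ? superclass.settings : DEFAULTS).dup
  end

  # Override one option; unknown names are rejected to catch typos early.
  def self.set(name, value)
    raise ArgumentError, "unknown option #{name}" unless DEFAULTS.key?(name)
    settings[name] = value
  end
end

class SlowFeedJob < MapReduceJob
  set :threads_per_chunk, 25   # network-bound work can use more threads
end
```

The subclass override leaves the parent untouched: `MapReduceJob.settings[:threads_per_chunk]` stays 10 while `SlowFeedJob` sees 25 and still inherits the other defaults.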

Now I just have to get this thing deployed onto EC2 and see how it does. I'm sure there will be plenty of fixes and updates before it becomes stable enough to run a bunch of different tasks. I hope it will generalize to other tasks, because distributed computing abstractions seem to leak no matter what (leaky abstractions).

Thanks to Zed and Defiler for helping me think through how to get the multi-threaded stuff to work. Hopefully I'll be making a post about the multi-threading stuff in the ChunkProcessor pretty soon.


Comments

The problem with FeedTools is that it uses REXML, which is painfully slow. I had a Ruby library for pulling out feeds, and I had to rewrite it in Python with the Universal Feed Parser because of how slow REXML is.
