Work continues on my attempt to build a decent Ruby abstraction for writing simple distributed, multiprocess, and multithreaded batch-processing applications, something like MapReduce (I outlined much of the effort in my posts about project Tahiti). I thought it would be interesting to show what code that uses the system looks like. Apart from a little test code, the first thing I've written on top of it is a feed updater: it pulls a list of feeds from an ActiveRecord-backed model, downloads them, and updates them. Here's the code, with a few comments pointing out the role of each piece (if you haven't read my previous posts, you may be lost anyway):
require 'open-uri'
require 'feed_tools'

class FeedUpdater < MapReduce
  # Needed so that feeds that hang can be killed.
  THREAD_TIMEOUT = 20
  # Used in several places: the number of key/value pairs
  # a single chunk process takes care of.
  CHUNK_SIZE = 200

  # The map calls are threaded in the chunk processor.
  # No ActiveRecord calls can be made here; this is just for the slow work.
  def map(blog_id, blog_data)
    feed_url, last_updated_on = blog_data
    begin
      last_modified = open(feed_url, {"User-Agent" => "Tahiti +http://www.pauldix.net",
                                      "From" => "..."}) { |f| f.last_modified }
      # last_modified is nil when the server sends no Last-Modified header;
      # treat that as "possibly changed" instead of failing on the comparison.
      if last_updated_on.nil? || last_modified.nil? || last_modified > last_updated_on
        feed = FeedTools::Feed.open(feed_url, {:user_agent => 'Tahiti +http://www.pauldix.net'})
        output.emit(blog_id, [feed, last_modified])
      end
    rescue Exception => exception
      # Rescue broadly so one bad feed is recorded as an error for that
      # blog instead of taking down the whole chunk process.
      output.log("Exception: ", exception.message)
      output.emit(blog_id, :error)
    end
  end

  # What a DRb Worker machine sends to the chunk processor process.
  # Note that it sends plain values, not ActiveRecord objects.
  def self.get_key_value_pairs(chunk_number)
    offset = (chunk_number - 1) * CHUNK_SIZE
    Blog.find_chunk_to_update(offset, CHUNK_SIZE).collect do |blog|
      [blog.id, [blog.feed_url, blog.last_updated_on]]
    end
  end

  # What the Master DRb calls before sending the chunk numbers out to the
  # DRb Worker machines. It serves two purposes. First, it can move data to
  # where it will be needed, say out of the production database and into
  # some other store. Second, the total is used later to estimate how many
  # EC2 nodes to start up.
  def self.setup_chunks_and_get_total
    count = Blog.count_by_sql("SELECT count(*) AS count_all FROM blogs WHERE (update_failures < 5)")
    # Round up so the final, partially filled chunk is not dropped.
    (count + CHUNK_SIZE - 1) / CHUNK_SIZE
  end

  # This is where ActiveRecord can be used:
  # it runs in the chunk processor's main thread.
  def self.write_key_value(k, v)
    blog = Blog.find(k)
    if v == :error
      blog.update_attribute(:update_failures, blog.update_failures + 1)
    else
      blog.update_articles_from_feed(v[0], v[1])
    end
  end
end
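Chunks are numbered from 1, so chunk n starts at offset (n - 1) * CHUNK_SIZE. A minimal sketch of that arithmetic in plain Ruby, with no database involved; the helper names chunk_count and chunk_offset are hypothetical and not part of the MapReduce class. It rounds the chunk count up, because plain integer division (450 / 200 == 2) would leave the last 50 records without a chunk.

```ruby
CHUNK_SIZE = 200

# Hypothetical helper: number of chunks needed to cover total_records,
# rounding up so a final partial chunk is not lost to integer division.
def chunk_count(total_records, chunk_size = CHUNK_SIZE)
  (total_records + chunk_size - 1) / chunk_size
end

# Hypothetical helper: offset of the first record in a 1-based chunk,
# the same expression get_key_value_pairs uses.
def chunk_offset(chunk_number, chunk_size = CHUNK_SIZE)
  (chunk_number - 1) * chunk_size
end

total = 450
(1..chunk_count(total)).each do |n|
  first = chunk_offset(n)
  last = [first + CHUNK_SIZE, total].min - 1
  puts "chunk #{n}: records #{first}..#{last}"
end
```

For 450 records this prints three chunks covering records 0..199, 200..399, and 400..449.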
Basically, it's just the map phase of a MapReduce, plus some additional parts that help the system distribute the work. What I like is that nowhere in this code do I have to deal with the details and headaches of threading, multiple processes, and multiple machines. I still need to test the ActiveRecord part a bit more to be sure it works. ActiveRecord isn't safe to use from multiple threads, so I'm working around that by having only the main thread in each chunk process make ActiveRecord calls.
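The split described above, with worker threads doing the slow network I/O and only the main thread touching ActiveRecord, can be sketched with Ruby's standard Thread, Queue, and Timeout. This is a minimal sketch under assumptions, not the actual ChunkProcessor: fetch is a hypothetical stand-in for map, NUM_THREADS is a hypothetical setting, the per-item limit plays the role of THREAD_TIMEOUT, and the loop that fills the written hash stands in for write_key_value.

```ruby
require 'thread'
require 'timeout'

THREAD_TIMEOUT = 1  # seconds per item; small for illustration (the post uses 20)
NUM_THREADS = 4     # hypothetical number of worker threads per chunk process

# Hypothetical stand-in for map: slow I/O that must not touch ActiveRecord.
def fetch(key, delay)
  sleep(delay)  # simulate a slow download
  "fetched #{key}"
end

def process_chunk(pairs)
  work = Queue.new
  results = Queue.new
  pairs.each { |pair| work << pair }

  workers = Array.new(NUM_THREADS) do
    Thread.new do
      loop do
        begin
          key, delay = work.pop(true)  # non-blocking; raises ThreadError when empty
        rescue ThreadError
          break
        end
        begin
          results << [key, Timeout.timeout(THREAD_TIMEOUT) { fetch(key, delay) }]
        rescue Timeout::Error
          results << [key, :error]  # a hung item is recorded, not retried
        end
      end
    end
  end
  workers.each(&:join)

  # Only the main thread drains the results, so this is where the
  # ActiveRecord writes would go.
  written = {}
  until results.empty?
    key, outcome = results.pop
    written[key] = outcome
  end
  written
end

p process_chunk([[1, 0.1], [2, 0.2], [3, 3]])
```

With these inputs, keys 1 and 2 come back as "fetched 1" and "fetched 2", and key 3 is recorded as :error after about THREAD_TIMEOUT seconds. Timeout.timeout interrupts the worker asynchronously, which is convenient for a stuck download but can be risky if the interrupted code is holding a lock or other resource.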
As a stylistic side note, I think some bits of this are a little ugly and could probably be made a little more readable with a few Structs.
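One way the Struct idea might look, with named fields instead of blog_data[0] and v[1]. BlogData, FeedResult, and stale? are hypothetical names, and the values are plain strings and Times rather than ActiveRecord objects so the sketch runs on its own.

```ruby
# Hypothetical map input value: replaces [feed_url, last_updated_on].
BlogData = Struct.new(:feed_url, :last_updated_on)

# Hypothetical map output value: replaces [feed, last_modified].
FeedResult = Struct.new(:feed, :last_modified)

# A version of the re-download check from map, written against named fields.
# A missing timestamp on either side means an update can't be ruled out.
def stale?(blog_data, last_modified)
  blog_data.last_updated_on.nil? || last_modified.nil? ||
    last_modified > blog_data.last_updated_on
end

data = BlogData.new("http://example.com/feed.xml", Time.at(1_000))
result = FeedResult.new("parsed feed", Time.at(2_000))
puts stale?(data, result.last_modified)  # prints true
puts stale?(data, Time.at(500))          # prints false
```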
The MapReduce parent class provides defaults for a number of options: how large chunks should be, how often the chunk processors report progress to the Master DRb, the estimated memory each chunk process will use, and how many threads a chunk process should start to work through its key/value pairs.
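The post doesn't show how the parent class stores these defaults. A minimal sketch, assuming each setting is a class method that subclasses override; the class name MapReduceBase, the method names, and the default values are all hypothetical, chosen only for illustration.

```ruby
# Hypothetical base class: each setting is a class method returning a
# default, and a subclass overrides only the ones it cares about.
class MapReduceBase
  def self.chunk_size;            100; end  # key/value pairs per chunk
  def self.progress_interval;     30;  end  # seconds between progress reports to the master
  def self.memory_per_process_mb; 64;  end  # estimated memory per chunk process
  def self.threads_per_process;   10;  end  # threads each chunk process starts

  # self is the receiving class, so overridden values are picked up.
  def self.settings
    { chunk_size: chunk_size,
      progress_interval: progress_interval,
      memory_per_process_mb: memory_per_process_mb,
      threads_per_process: threads_per_process }
  end
end

class ExampleFeedJob < MapReduceBase
  def self.chunk_size; 200; end  # same value as CHUNK_SIZE in the post
  def self.threads_per_process; 20; end
end

p ExampleFeedJob.settings
```

ExampleFeedJob.settings picks up 200 and 20 from the subclass and inherits the other two defaults unchanged.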
Now I just have to deploy this onto EC2 and see how it does. I'm sure the whole thing will need plenty of fixes and updates before it's stable enough to run a variety of tasks. I hope it will generalize to other tasks, although distributed computing abstractions seem to be leaky no matter what (leaky abstractions).
Thanks to Zed and Defiler for helping me think through how to get the multithreaded parts to work. Hopefully I'll write a post about the multithreading in the ChunkProcessor soon.
The problem with FeedTools is that it uses REXML, which is painfully slow. I had a Ruby library for pulling feeds, and I had to rewrite it in Python with the Universal Feed Parser because the REXML library was so slow.
Posted by: Mario Lopes | August 25, 2007 at 04:27 PM