July 02, 2008

Tahiti gets a name and an initial code push

I haven't given a real update on Tahiti since my post last September announcing that I was putting it on hold. That's mostly been true since then. My schedule has prevented me from finding time to work on the project. In that time I've worked as a consultant with two different companies, completed multiple projects, presented at two conferences, attended two other conferences, and completed two full time semesters at Columbia.

However, I've still been thinking about the project and trying to devise a plan to get a prototype out. In January I decided to team up with Mint Digital to help out with design work while I did some Ruby consulting for them. I worked with their designers and put together some wire frames to solidify my ideas on the project and even came up with a proper name: Filterly.

My hope was to work on it during the spring semester, but commitments in and out of school kept me from finding time to code. Well, now that it's summer and I'm not in school, I've been sneaking in some time here and there while not working at Mint. I've written some of the back end code for crawling and updating feeds, which was the source of my recent posts about background processing and message queues.

I finally pushed out some code to EC2 and it should be running full time from here on. Right now it's just back end stuff and a landing page, but it's a start. If you're curious about what the hell the goal of the project is, please visit the landing page at Filterly.com where I lay out the basic idea.

I'm also posting this so that if people find the Filterly.com crawler in their server logs, this will have something more than just a note that I'm not working on it any more. Ok, enough of this pointless blather. I promise my next post will be more interesting. Probably about my EC2 setup and my plan to spend as little as possible on hosting costs.

June 19, 2008

Trouble With Integrated Background Rails Libraries

Originally this post was going to be about how I was using Adam Pisoni's ruby gem named Skynet to do distributed background processing. However, this is not a story of success with that library.

The concept of Skynet is really cool: a dead simple MapReduce implementation with ActiveRecord integration. Installing it was a breeze and getting it to work on my development system was quick and painless. Getting it running in production was an entirely different story. First I ran into problems with the logging. Skynet logs information to its own log files, so I had to make sure the permissions on those were set up correctly. Then when I tried to get it running on another EC2 instance it just wouldn't work. No errors, no information, no nothing. So I decided to jump into the code and see if I could figure things out. A few hours later, after looking through quite a bit of code, I was no closer to knowing why it was failing.

I then realized something. I'm not trying to do something horribly complex. I don't need the map reduce paradigm. I just needed a queue and a way to process jobs from it. What's more, I didn't even need a disk based persistent queue. I think this is a failing of a bunch of these fully integrated background processing libraries. They're tied to DRb, or a specific type of queue, or the workers aren't customizable, or the workers have undesirable behavior (like BJ spawning a full Rails instance for every job execution). The problem is that I wanted a few basics like processing a method on an AR object asynchronously (something all of these do to varying degrees of success), but I also wanted some very custom behavior for how workers handle web crawling jobs, which is a task that I have to do a bit of.

I ended up going with Beanstalkd and the beanstalk-client gem (for a good intro to beanstalk read Geoffrey's post). I decided not to use the Rails integrated async-observer because the worker it provides didn't work for my needs. Instead, I wrote a basic wrapper for the queue, a generic job wrapper, and a worker. The code for the worker is something I really like because it's crazy simple. Have a look:

require "#{File.dirname(__FILE__)}/../config/environment.rb"

class QueueWorker
  def initialize(logger, queue)
    @queue = queue
    @logger = logger
  end
 
  def start(continue_loop = true)
    begin
      begin
        job = @queue.reserve
        job.process
        @queue.mark_as_finished(job)
      end while continue_loop
    rescue Exception => e
      @logger.error "Queue worker encountered an error: #{e.message}"
    end
  end 
end

QueueWorker.new(RAILS_DEFAULT_LOGGER, QueueWrapper.new(ARGV.first)).start

I like this worker because it doesn't know anything about the job or the queue. As long as the queue responds to 'reserve' and 'mark_as_finished', it can be anything. Likewise, the thing that is returned from reserve only needs to respond to 'process'. The logger object only needs to respond to 'error'. The worker is really stupid. It just grabs things off the queue and processes them and throws an error if execution stops. I could easily substitute the RAILS_DEFAULT_LOGGER in initialization with a wrapper to a monitoring service that will alert me on failure.
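To make the duck typing concrete, here's a minimal sketch that drives the same worker with in-memory stand-ins instead of Beanstalkd and Rails. InMemoryQueue, RecordingJob, and ArrayLogger are hypothetical names made up for this illustration, and the worker is repeated without the environment require so the snippet runs on its own.

```ruby
# The worker from above, minus the Rails environment require.
class QueueWorker
  def initialize(logger, queue)
    @queue = queue
    @logger = logger
  end

  def start(continue_loop = true)
    begin
      begin
        job = @queue.reserve
        job.process
        @queue.mark_as_finished(job)
      end while continue_loop
    rescue Exception => e
      @logger.error "Queue worker encountered an error: #{e.message}"
    end
  end
end

# Hypothetical queue: it only has to respond to reserve and mark_as_finished.
class InMemoryQueue
  attr_reader :finished

  def initialize(jobs)
    @jobs = jobs.dup
    @finished = []
  end

  def reserve
    @jobs.shift || raise("queue is empty")
  end

  def mark_as_finished(job)
    @finished << job
  end
end

# Hypothetical job: it only has to respond to process.
class RecordingJob
  attr_reader :processed

  def initialize
    @processed = false
  end

  def process
    @processed = true
  end
end

# Hypothetical logger: it only has to respond to error.
class ArrayLogger
  attr_reader :errors

  def initialize
    @errors = []
  end

  def error(message)
    @errors << message
  end
end

job = RecordingJob.new
queue = InMemoryQueue.new([job])
logger = ArrayLogger.new

# Passing false makes the worker handle exactly one job and return.
QueueWorker.new(logger, queue).start(false)
```

The same trick is handy in specs: the worker can be exercised without a running queue server.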

The QueueWrapper and another class called JobWrapper are the only points of interface between generic regular ruby code and the queueing mechanism. It's their responsibility to make sure that reserve and mark_as_finished methods exist. The JobWrapper also comes into play when pushing jobs onto the queue and marshaling them from the queue.
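The real wrappers aren't shown in this post, so the following is only a rough sketch of the shape they could take. It assumes the underlying connection responds to put and reserve, and that reserved items respond to body and delete (which, as far as I know, matches beanstalk-client's Beanstalk::Pool, but treat that as an assumption). Unlike the real QueueWrapper, which is built from a "host:port" string, this one takes the connection as an argument so it can run against a fake. FakeConnection, FakeRaw, and EchoJob are hypothetical.

```ruby
# Hypothetical job wrapper: serializes plain Ruby jobs with Marshal and keeps
# the queue library's own handle so the job can be acknowledged later.
class JobWrapper
  attr_reader :job, :raw

  def initialize(job, raw = nil)
    @job = job
    @raw = raw
  end

  def process
    @job.process
  end

  def to_payload
    Marshal.dump(@job)
  end

  def self.from_raw(raw)
    new(Marshal.load(raw.body), raw)
  end
end

# Hypothetical queue wrapper exposing push, reserve, and mark_as_finished.
class QueueWrapper
  def initialize(connection)
    @connection = connection
  end

  def push(job)
    @connection.put(JobWrapper.new(job).to_payload)
  end

  def reserve
    JobWrapper.from_raw(@connection.reserve)
  end

  def mark_as_finished(wrapped_job)
    wrapped_job.raw.delete
  end
end

# Fake connection standing in for a real queue client.
class FakeRaw
  attr_reader :body, :deleted

  def initialize(body)
    @body = body
    @deleted = false
  end

  def delete
    @deleted = true
  end
end

class FakeConnection
  attr_reader :items

  def initialize
    @items = []
  end

  def put(body)
    @items << FakeRaw.new(body)
  end

  def reserve
    @items.find { |item| !item.deleted }
  end
end

# A trivial job that survives a round trip through Marshal.
EchoJob = Struct.new(:text) do
  def process
    text.upcase
  end
end

connection = FakeConnection.new
queue = QueueWrapper.new(connection)
queue.push(EchoJob.new("crawl"))
wrapped = queue.reserve
result = wrapped.process
queue.mark_as_finished(wrapped)
```

Keeping Marshal inside JobWrapper means job classes stay plain Ruby objects that know nothing about the queue.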

The real magic happens in the job definitions. With this structure I can define any kind of job I want. Here's the important snippet of some code for a generic AR async method call:

  def process
    model_name.constantize.find_by_id(id).send(method_to_send)
  end
 
  def self.queue_job(queue, model_instance, method)
    queue.push(GenericJob.new(model_instance.class.to_s, model_instance.id, method))
  end

And the active record integration looks like this:

class ActiveRecord::Base
  def send_later(method)
    GenericJob.queue_job(ActiveRecord::Base.app_background_queue, self, method)
  end
 
  def self.app_background_queue
    @@app_background_queue ||= QueueWrapper.new("#{BEANSTALK_HOST}:#{BEANSTALK_PORT}")
  end
end

So I built async processing for AR objects in less than a few hundred lines of code. The cool thing with this approach is that I can now define new jobs for crawling that have different process behavior. I actually queue up full batches of things instead of a single AR object. I can also define specific processing behavior like using multiple threads to deal with slow network IO. I like it because the worker that runs this job doesn't ever have to know about all this. It's completely agnostic to the types of jobs that it processes.
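As an illustration of a job with its own processing behavior, here's a minimal sketch of a batch job that spreads slow fetches over a few threads. BatchFetchJob and its injected fetcher are hypothetical, not the actual crawling jobs. The fetcher is passed in only so the example runs without the network; a job that has to be marshaled onto a queue would need to build its fetcher inside process instead, since lambdas can't be marshaled.

```ruby
require 'thread'

# Hypothetical batch job: works through a list of URLs with a small thread pool.
class BatchFetchJob
  attr_reader :results

  # fetcher is anything that responds to call(url) and returns the body.
  def initialize(urls, fetcher, thread_count = 4)
    @urls = urls
    @fetcher = fetcher
    @thread_count = thread_count
    @results = {}
  end

  def process
    pending = Queue.new
    @urls.each { |url| pending << url }
    lock = Mutex.new

    threads = Array.new(@thread_count) do
      Thread.new do
        loop do
          url = begin
            pending.pop(true) # non-blocking pop raises ThreadError when empty
          rescue ThreadError
            break
          end
          body = begin
            @fetcher.call(url)
          rescue StandardError => e
            e # keep the error as the result so one bad feed doesn't kill the batch
          end
          lock.synchronize { @results[url] = body }
        end
      end
    end

    threads.each(&:join)
    @results
  end
end

fake_fetcher = lambda { |url| "<rss>#{url}</rss>" }
job = BatchFetchJob.new(["http://a.example/feed", "http://b.example/feed"], fake_fetcher, 2)
job.process
```

The worker still only calls process; the threading stays entirely inside the job.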

Wow, this ended up going on for quite a while. I guess the lesson learned for me is that sometimes it's quicker to just pick up a few basic building blocks and wire them together.

Note: In fairness to Adam, he's busy working on a new version of Skynet that will offer other queuing options. It's definitely a project worth keeping an eye on.

June 12, 2008

Mocking Web Requests in Ruby with FakeWeb

I've been using rFeedParser lately to do a little feed parsing. One of the problems I ran into was trying to test the thing. Since there's network IO involved, I obviously want to get some mock action. In my first attempt I ended up having to modify rFeedParser's call to open to explicitly calling OpenURI.open so that I could mock it out with rspec like this:

OpenURI.should_receive(:open_uri).at_least(:once).with(feed_url, {
  "Accept-encoding"=>"gzip, deflate",
  "User-Agent"=>"Filterly +http://www.filterly.com",
  "Accept"=>"application/atom+xml,application/rdf+xml,application/rss+xml,application/x-netcdf,application/xml;q=0.9,text/xml;q=0.2,*/*;q=0.1",
  "A-IM"=>"feed"
  }).and_return(*xml_mocks)

xml_mocks is an array of strings that hold the xml I want to return on my sequence of calls. As you can see it's ugly as hell and not that fun to use. Last night at the New York City Ruby meeting (known as nyc.rb) I got a little pointer from Bryan Helmkamp to a library called FakeWeb. I have no clue how this escaped my notice until now, but it makes things much cleaner and easier. FakeWeb is a library written by Blaine Cook for faking web requests. Grab it like so:

sudo gem install FakeWeb

Then have fun with it like so:

require 'fake_web'
require 'open-uri'
# from the contents of a file:
FakeWeb.register_uri("http://www.pauldix.net/", :file => "some_file.xml")
# or from the contents of a string:
FakeWeb.register_uri("http://www.pauldix.net/", :string => "foo")
# and boom:
open("http://www.pauldix.net").read # =>  "foo"

There's only one gotcha to look out for. When you call FakeWeb.register_uri, make sure that the uri you pass in has a slash at the end. Your call to open can include the slash or not, but if you don't register the uri with the slash, the faking won't actually happen.
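One way to avoid tripping over that is to normalize the uri before registering it. This is just a sketch using Ruby's standard URI library; with_root_slash is a made-up helper name, not part of FakeWeb.

```ruby
require 'uri'

# Hypothetical helper: gives a uri with an empty path a "/" path, so that
# "http://host" and "http://host/" end up registered the same way.
def with_root_slash(uri_string)
  uri = URI.parse(uri_string)
  uri.path = "/" if uri.path.nil? || uri.path.empty?
  uri.to_s
end

bare = with_root_slash("http://www.pauldix.net")
deep = with_root_slash("http://www.pauldix.net/feed.xml")

# Usage with FakeWeb would then look like:
#   FakeWeb.register_uri(with_root_slash("http://www.pauldix.net"), :string => "foo")
```

Uris that already have a path, like the second one, come back unchanged.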

September 26, 2007

Tahiti on Hold

Here is a quick update in case anyone is following my progress on Tahiti. I've put the project on hold for a few months. I had to pick up a little consulting work to bring in some extra cash. That in addition to a full load at school has me completely swamped. The up side is that I'm working with the guys at EastMedia again.

One of my coworkers, Bryan Helmkamp, has an interesting blog if you're looking for a little Ruby reading material.

August 28, 2007

Tahiti Update: Didn't Make The TechCrunch20

This space has been quiet for a while so here's a quick update to let people know I'm still around. I heard back that project Tahiti didn't make the TechCrunch20. This was actually a good thing since the last month has been so busy with other stuff. Now I can move back to my original development plan which is to release to a small topic focused group and expand as I work out the kinks.

I wrapped up everything in Silicon Valley and now I'm back in the Alley. I'm ready to continue work on the project and will hopefully launch a private beta for people in the Ruby community sometime in September. It will be tough to keep it going while I'm starting a new semester, but hopefully I'll be able to pull it off.

I'm working out of CooperBricolage today and going to the nyc.rb hackfest later tonight.

July 20, 2007

Server Setup is The Suck

I'm trying to get my application set up and deployed on EC2. It's not going well. I'm using Paul Dowman's AMI, but of course it's giving me problems. Fix one problem then on to the next, then the next, then the next. Maybe later I'll restart the whole thing and take notes on the failures. Now if you'll excuse me, I have to find a small animal to kick.

Update: I finally got it up. After more sweat and toil I found that it was probably just something really stupid on my part. Tomorrow night I'll have to make another image for the workers. Thanks for making the image Paul, you rock!


July 18, 2007

A Distributed Parallel Ruby Feed Getter

Work continues on my attempt to make a decent Ruby abstraction for writing simple distributed, multiprocess, and multithreaded batch processing applications kind of like MapReduce (I outlined much of the effort in my posts about project Tahiti). I thought it might be interesting to look at what some code that uses the system looks like. Other than a little test code, the first thing I've written that will use the system is just a feed updater. It brings back a list of feeds from an AR backed model, downloads them, and updates them. Here's the code with a few comments to help point out the use of each piece (although if you haven't read my previous posts you may be lost anyway):

class FeedUpdater < MapReduce
  # had to do this so that feeds that hang can be killed.
  THREAD_TIMEOUT = 20
  # just a constant I use in multiple places.
  # it determines how many key/value pairs a chunk process will take care of
  CHUNK_SIZE = 200

  # The map calls are threaded in the chunk_processor.
  # No AR calls can be made in here. Just for the slower stuff
  def map(blog_id, blog_data)
    begin
      last_modified = open(blog_data[0], {"User-Agent" => "Tahiti +http://www.pauldix.net",
        "From" => "..."}) {|f| f.last_modified}
      if (!blog_data[1] or last_modified > blog_data[1])
        feed = FeedTools::Feed.open(blog_data[0], {:user_agent => 'Tahiti +http://www.pauldix.net'})
        output.emit(blog_id, [feed, last_modified])
      end
    rescue Exception => exception
      output.log("Exception: ", exception.message)
      output.emit(blog_id, :error)
    end
  end
 
  # What a DRb Worker machine sends to the chunk processor Process.
  # notice that it's not sending AR objects
  def self.get_key_value_pairs(chunk_number)
    Blog.find_chunk_to_update((chunk_number-1)*CHUNK_SIZE, CHUNK_SIZE).collect do |blog|
      [blog.id, [blog.feed_url, blog.last_updated_on]]
    end
  end
 
  # what the Master DRb calls before sending the different chunk numbers out to DRb Worker machines
  # This is for two things. One is to move data around to where it may be needed. Say out of the
  # production database and into some other place. Secondly, the count is for later so that an
  # estimate can be made of how many EC2 nodes to start up.
  def self.setup_chunks_and_get_total
    Blog.count_by_sql("SELECT count(*) AS count_all FROM blogs WHERE (update_failures < 5)")/CHUNK_SIZE
  end

  # this is the part where we can use active record.
  # This is in the master thread of the chunk processor
  def self.write_key_value(k, v)
    blog = Blog.find(k)
    if v == :error
      blog.update_attribute(:update_failures, blog.update_failures + 1)
    else
      blog.update_articles_from_feed(v[0], v[1])
    end
  end
end

Basically it's just the map phase of a MapReduce. There are some additional parts that help the system distribute out the work. The thing I like is that nowhere in this piece of code do I really have to worry about the details of threading, multiprocess, and multi-machine headaches. I need to test the part on ActiveRecord a little more to be sure that it works. I'm trying to get around the fact that it isn't multithreaded by having only the main thread in the chunk process make AR calls.
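The chunk bookkeeping is plain arithmetic, sketched here on its own. The helper names are hypothetical. One thing to watch: integer division rounds down, so whether a trailing partial chunk gets picked up depends on how the Master numbers the chunks; rounding up, as below, is one way to be sure the last few rows are covered.

```ruby
CHUNK_SIZE = 200

# Number of chunks needed to cover total_rows, rounding up so a partial
# final chunk is counted.
def chunk_count(total_rows, chunk_size = CHUNK_SIZE)
  (total_rows + chunk_size - 1) / chunk_size
end

# Row offset for a 1-based chunk number, as used in get_key_value_pairs.
def chunk_offset(chunk_number, chunk_size = CHUNK_SIZE)
  (chunk_number - 1) * chunk_size
end

total = chunk_count(450)
offsets = (1..total).map { |chunk_number| chunk_offset(chunk_number) }
```

With 450 rows this gives three chunks starting at offsets 0, 200, and 400, the last one holding only 50 rows.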

As a stylistic side note, I think some bits of this are a little ugly and could probably be made a little more readable with a few Structs.

The MapReduce parent class provides a number of default options like how large chunks should be, how often the chunk processors should update the Master DRb on the progress, the estimated amount of memory each chunk process will use, and the number of threads that a chunk process should kick off to process the different key/value pairs.

Now I just have to get this thing deployed onto EC2 and see how it does. I'm sure there will be plenty of fixes and updates to the whole thing before it becomes stable enough to run a bunch of different tasks. I hope that it will generalize to other tasks because distributed computing abstractions seem to be leaky no matter what (leaky abstractions).

Thanks to Zed and Defiler for helping me think through how to get the multi-threaded stuff to work. Hopefully I'll be making a post about the multi-threading stuff in the ChunkProcessor pretty soon.


July 12, 2007

Notes on Ruby's DRb

Yesterday I described the basic design of a distributed Ruby system I'm implementing. One of the first parts of building this system is figuring out how the different machines are going to communicate and cooperate. Distributed Ruby (DRb) seems like the perfect choice for the task. Programming Ruby describes it as a simple, lightweight Ruby version of RMI or CORBA.

Unfortunately, the Pickaxe book is my only reference and it includes only the most basic example. I guess it will help me to lay out how I think things work, a few of the concerns I have, and my guesses at their answers.

First thing to take note of is that DRb objects are multithreaded. So if I start some object of mine running on port 1337, multiple remote processes can connect to it. Not only that, but they can make calls at the same time. To see some of the multithreaded action I created a few simple DRb scripts like so:

require 'drb'
require 'drb/observer'

class Worker
  def initialize
    @test = 0
  end

  # Sleeps, then bumps and returns the shared counter.
  def run(sleep_val)
    sleep(sleep_val)
    @test += 1
    return @test
  end
end

worker = Worker.new
DRb.start_service("druby://127.0.0.1:1337", worker)
DRb.thread.join

And a test script to call it:

require 'drb'

sleep_time = ARGV[0].to_i
worker = DRbObject.new(nil, "druby://127.0.0.1:1337")
val = worker.run(sleep_time)
puts "test val: #{val}"

I bring up three console windows. In one I run the worker which now sits waiting to be called on port 1337. In another I kick off the test caller with an argument to make it sleep for 10 seconds. In the last I kick off the test caller with an argument to make it sleep for only a second.

Running that test I can see three things that I think are important. First is that both calls succeeded so the multithreaded nature of the DRb object works exactly as I would expect allowing multiple connections at the same time. Second is that the @test instance variable in the worker object is shared. There is one worker object that handles both requests and threads them off. This tells me that if I'm going to be doing anything with instance variables in DRb objects, I have to make them threadsafe using a Queue or some locking mechanism like Mutex or Monitor. The last thing is that the calls from the test script don't return until the remote worker finishes. So the .run calls don't just send the data out to the remote worker and go about their business, they wait for a response. I also tested for when the connection is made with a slight modification of the test script. The call to DRbObject.new doesn't actually connect to the remote worker. That doesn't occur until the .run method call.
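Here's a minimal sketch of the locking idea, using a Mutex around the shared counter. SafeWorker is a hypothetical variant of the Worker above, and the example uses local threads rather than DRb so it runs on its own; the object could be served with DRb.start_service in the same way as before.

```ruby
require 'thread'

# Hypothetical threadsafe version of the worker: the increment and the read
# happen together under a lock, so concurrent callers never see the same value.
class SafeWorker
  def initialize
    @test = 0
    @lock = Mutex.new
  end

  def run(sleep_val)
    sleep(sleep_val) # the slow part stays outside the lock
    @lock.synchronize do
      @test += 1
      @test
    end
  end
end

worker = SafeWorker.new
threads = Array.new(10) { Thread.new { worker.run(0.01) } }
results = threads.map(&:value)
```

Only the counter update is locked, so slow calls still overlap the way they did in the test above.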

All of this is incredibly boring and simple, but I need to take small steps to think about the problem. So here's a slight rehash of my structure for the different pieces. One Master process runs for each MapReduce that gets kicked off. One DRbWorker runs on each machine that is available to perform processing. The Master sends as many input Chunks to the DRbWorker as it can handle. For each Chunk processing request, the DRbWorker starts a new process that it detaches from. It hands that script a druby uri so the Chunk process can report back to the Master what its status is. This means the call the Master makes to the DRbWorker to process the Chunk returns very quickly so the connection doesn't stay open (I'm assuming the connection is closed after the call returns, but I have to test this somehow).

This also means that the Master will have to start listening for incoming calls from the various Chunk processes before kicking them off to the DRbWorker. I was looking at using 'drb/observer' to have the DRbWorker send update notifications back to the Master, but I think it's easier to just have the Chunk process handle that. The DRbWorker is really just a dispatcher. It dispatches new processes into the machine that are requested from the Master.

I guess the only concern I have going forward is the issue of tracking which PIDs are responsible for processing each Chunk. I want the Master to be able to make a call to the DRbWorker to kill and/or restart a Chunk process if it thinks it has hung. I guess it's time to dig into launching and monitoring child processes.
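A rough sketch of what that PID tracking could look like, using Process.spawn (which arrived in Ruby 1.9; fork and exec would be the older route) and Process.detach. ChunkDispatcher is a hypothetical class, and the "sleep 30" child stands in for a real Chunk process.

```ruby
require 'rbconfig'

# Hypothetical dispatcher: starts one detached Ruby process per chunk and
# remembers which pid belongs to which chunk.
class ChunkDispatcher
  def initialize
    @pids = {}    # chunk number => pid
    @waiters = {} # chunk number => thread returned by Process.detach
  end

  # Starts the process and returns its pid right away.
  def dispatch(chunk_number, script_args)
    pid = Process.spawn(RbConfig.ruby, *script_args)
    @waiters[chunk_number] = Process.detach(pid)
    @pids[chunk_number] = pid
  end

  def pid_for(chunk_number)
    @pids[chunk_number]
  end

  # The detach thread stays alive until the child process exits.
  def running?(chunk_number)
    waiter = @waiters[chunk_number]
    !waiter.nil? && waiter.alive?
  end

  # Sends TERM to the chunk's process; returns false if there is nothing to kill.
  def kill(chunk_number)
    pid = @pids[chunk_number]
    return false if pid.nil? || !running?(chunk_number)
    Process.kill("TERM", pid)
    @waiters[chunk_number].join # wait until the child has been reaped
    true
  rescue Errno::ESRCH
    false
  end
end

dispatcher = ChunkDispatcher.new
pid = dispatcher.dispatch(1, ["-e", "sleep 30"])
was_running = dispatcher.running?(1)
killed = dispatcher.kill(1)
```

A restart would then just be a kill followed by another dispatch for the same chunk number.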


July 11, 2007

Distributed, MultiProcess, MultiThreaded Ruby Hurts My Brain

I've been looking into spreading out some batch processing and background jobs over a group of machines. The trick is that I don't want to deal with the dirty details of writing multiprocess and multithreaded stuff more than once. I want to write an abstraction like MapReduce so I can then easily write code that runs distributed, but looks deceptively simple. The other catch is that I'm designing it to run on EC2.

I started out last week looking at Starfish as an option, but it didn't quite meet my needs. It uses Rinda, which relies on UDP broadcasts. I'm not even sure that will work in EC2 and in general it just seems like too much of a chatty Kathy. So I decided to look at DRb and figure out how to write my own stuff. Here's the outline of how I want it to work from a high level.

  • I write a basic MapReduce task (my design isn't exactly MR yet)
  • Run some cap deploy from my dev box that kicks off the following
  • Tells the main web server (running a Master) to start the number of EC2 nodes it determines it needs
  • Each EC2 node comes up and syncs the latest code with a "Worker" process waiting to be called
  • The Master sends the information on different input chunks out to each EC2 node through the Worker
  • The Worker kicks off a separate process for each chunk (each machine should be able to process multiple chunks at once)
  • The Chunk process gets the input chunk (in this case from the database) and splits the chunk into key/value pairs and threads out calls to map with each one
  • The threaded map calls return and the Chunk process notifies the Master (repeat for all chunks)
  • The Master turns down each EC2 node as it finishes processing all Chunks

This way I can run a bunch of different tasks all at the same time and only pay for the time that I use.

First is the MapReduce. When I write one it must have the following:
  • A class method to determine how many chunks there are (needs AR access)
  • A class method to get each chunk and split into its key/value pairs (needs AR access and the outputted key/value pairs shouldn't be AR objects)
  • A map method
  • An attr_accessor for an output object that has an emit(key, value) method
  • A class method that takes a key/value pair from an emit and can use them to write data (needs AR access)

Notice how I flagged each thing in there that needs ActiveRecord access. That's because the Chunk process kicks off the map processing of each key/value pair in its own thread and ActiveRecord isn't threadsafe. I noticed a flag for ActiveRecord::Base.allow_concurrency, but that causes AR to open up a new connection for each thread and the whole thing just kind of scares me. If my fears are without merit please speak up. As it stands I have the Chunk process as the single AR connection and the gateway to that kind of interaction. That's where emit comes in. It tosses the values into a Queue which is threadsafe. As each thread terminates the main thread of the Chunk process clears out the Queue and calls to the MapReduce class method that knows what to do with the data.
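Here's a minimal sketch of that emit-into-a-Queue arrangement. QueueOutput and process_chunk are hypothetical names, the map and write steps are passed in as lambdas so the example runs without ActiveRecord, and for simplicity it waits for all threads before draining the Queue rather than draining as each thread finishes.

```ruby
require 'thread'

# Hypothetical output object handed to map: emit only ever touches the Queue.
class QueueOutput
  def initialize(queue)
    @queue = queue
  end

  def emit(key, value)
    @queue << [key, value]
  end
end

# Hypothetical chunk runner: map calls run in threads, while the write step
# runs only in the calling (main) thread.
def process_chunk(pairs, map_step, write_step)
  queue = Queue.new
  output = QueueOutput.new(queue)

  threads = pairs.map do |key, value|
    Thread.new { map_step.call(key, value, output) }
  end
  threads.each(&:join)

  # Only this thread drains the Queue, so only it would ever talk to the database.
  write_step.call(*queue.pop) until queue.empty?
end

written = {}
process_chunk(
  [[1, "a"], [2, "b"]],
  lambda { |key, value, output| output.emit(key, value.upcase) },
  lambda { |key, value| written[key] = value }
)
```

In the real thing the write step would be the MapReduce class method that takes a key/value pair and makes the AR calls.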

The Worker is just a DRb process that listens for calls from the Master and kicks off new Chunk processes when instructed. The Master is the process that orchestrates the whole thing and will eventually be responsible for making sure each chunk gets processed. It is also a DRb listener since it has to receive updates from the Worker on each chunk.
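Since the Master takes concurrent status calls, whatever it uses to track chunks has to be threadsafe. A small sketch of one way to do that with Ruby's MonitorMixin follows; ChunkTracker and its status symbols are hypothetical, and in practice an object like this would be what the Master exposes over DRb.

```ruby
require 'monitor'

# Hypothetical tracker the Master could use to record chunk progress.
class ChunkTracker
  include MonitorMixin

  def initialize(total_chunks)
    super() # sets up the monitor provided by MonitorMixin
    @status = {}
    (1..total_chunks).each { |number| @status[number] = :pending }
  end

  # Called with a new status for a chunk, e.g. :running or :done.
  def report(chunk_number, status)
    synchronize { @status[chunk_number] = status }
  end

  # Chunk numbers that still need to be processed or finished.
  def unfinished
    synchronize { @status.keys.select { |number| @status[number] != :done }.sort }
  end

  def finished?
    unfinished.empty?
  end
end

tracker = ChunkTracker.new(3)
tracker.report(1, :done)
tracker.report(2, :running)
remaining = tracker.unfinished
```

The unfinished list is also what the Master would consult when deciding which chunks to retry.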

I still have to work out some of the details since I've been testing out different things over the last week. Once I have the whole mess running on my development box I'll start working on getting it to work on EC2 (the real fun part).

Someone may tell me I should be using BackgroundDRb, but I'm not sure it will fit my needs either. I'm not looking at running just a few long running background tasks. I'm looking at running a bunch of stuff in parallel and with changing code and completely disconnected from the web server. Although at this point I'm so delirious that this may just be a really stupid design and overly complicated system.


July 07, 2007

Keeping Tahiti Secret for TechCrunch20

Tahiti is the name of the project I've been working on for a while, but only recently gave a codename. A week ago I decided to start blogging about some of the different pieces without really giving away what the product is. One reason for the secrecy is because I don't really want to hype up something before its release. The blogging is about the technology and discussion with the Ruby community, not the product. The second reason is that I was considering submitting for the TechCrunch20 conference. Just in case you haven't heard of it here's a blurb from their site:

Twenty of the hottest new startups from around the world will announce and demo their products over a two day period at TechCrunch20. And they don’t pay a cent to do this. They will be selected to participate based on merit alone. In fact, we’re even offering a $50,000 cash award and lining up other in-kind services and awards from a generous group of corporate sponsors.

The deadline for submission was yesterday and I sent one in. I was hesitant to submit since I wanted to keep this project smaller scale in the beginning, but the opportunity was too much to resist. Because of the larger scale, I've recruited a long time friend and colleague to help out with the project. I referred to him as "The Adept" for a project I wanted to do a few years ago, but never got off the ground.

We won't find out if we've been selected as finalists until August 3rd, but this will definitely help to keep us motivated and on track. In the meantime I'll keep blogging about the coding and infrastructure process without giving away the idea.
