Slowly but surely I’ve been pecking away at a little Rails-based side-project for the last four or five months. I’m this close to flipping the on switch—but in the meantime I’ve still got some “i”s to dot and “t”s to cross. One of those was switching from in-request mail delivery to asynchronous mail delivery. The app I’ve been working on involves two parties marching a particular transaction through a variety of state transitions, each of which usually sends an email to either or both parties.
Like a good boy, I started out with the simplest thing that could work, which was to call the mailers directly from my models. However, I wanted to limit the amount of work performed during a request to keep the app feeling responsive, so I decided that asynchronous mail delivery was a “pre-launch” feature that I had to have.
I looked at a variety of background processing tools, including Bj, Starling/Workling, Spawn and AP4R. Each had its strengths and weaknesses but none of them felt like the right fit. My research criteria included:
- Job persistence via the database
- Something that could get a Rails environment cheaply
- Runs outside of the Rails processes
- Minimum fuss to get it running
In the end the one that hit the sweet spot was delayed_job. It had the DB persistence I was looking for, its workers load the Rails environment once rather than for every job, and it was extremely simple to plumb into my app.
Refactoring
The first step was creating the delayed_job worker classes, one for each mail action. At first this turned into a big pile of five-line classes, so to keep things organized I put them all in app/models/jobs, each inside the Jobs module namespace. This was better but still not good enough, so the final step was putting all of the worker classes in a single file, app/models/jobs.rb.
The second step was to find every place in the model where I called my mailer classes directly and replace them with calls to enqueue the appropriate worker job.
Here is what things looked like at first:
class UserObserver < ActiveRecord::Observer
  def after_create(user)
    unless user.current_state == :latent || user.is_a?(Admin)
      UserNotifier.deliver_signup_notification(user)
    end
  end

  def after_save(user)
    if user.current_state == :promoted
      UserNotifier.deliver_signup(user)
    else
      UserNotifier.deliver_activation(user) if user.recently_activated?
    end
  end
end
Then the UserObserver was refactored like this:
class UserObserver < ActiveRecord::Observer
  def after_create(user)
    unless user.current_state == :latent || user.is_a?(Admin)
      Delayed::Job.enqueue(Jobs::UserNotifierSignupNotificationJob.new(user.id))
    end
  end

  def after_save(user)
    if user.current_state == :promoted
      Delayed::Job.enqueue(Jobs::UserNotifierSignupNotificationJob.new(user.id))
    else
      Delayed::Job.enqueue(Jobs::UserNotifierActivationJob.new(user.id)) if user.recently_activated?
    end
  end
end
With the following workers (abridged):
module Jobs
  class UserNotifierDisconnectJob < Struct.new(:user_id)
    def perform
      UserNotifier.deliver_disconnect(user_id)
    end
  end

  class UserNotifierResetPasswordJob < Struct.new(:user)
    def perform
      UserNotifier.deliver_reset_password(user)
    end
  end

  # Enqueued with user.id by the observer, so the field is named user_id.
  class UserNotifierSignupNotificationJob < Struct.new(:user_id)
    def perform
      UserNotifier.deliver_signup_notification(user_id)
    end
  end

  class UserNotifierStartDisconnectJob < Struct.new(:user_id)
    def perform
      UserNotifier.deliver_start_disconnect(user_id)
    end
  end
end
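To make the shape of these workers concrete, here is a minimal sketch that runs without Rails or delayed_job. The User struct, the USERS lookup table, the in-memory UserNotifier and the SignupNotificationJob name are all hypothetical stand-ins. Unlike my real jobs, which hand the id straight to the mailer, this one looks the record up inside perform, which is one possible way to keep the queued payload down to an id.

```ruby
# Hypothetical stand-ins so the sketch runs without Rails or delayed_job.
User = Struct.new(:id, :email)
USERS = { 1 => User.new(1, "alice@example.com") }

module UserNotifier
  DELIVERIES = []

  # Stand-in for a real mailer: just records what would have been sent.
  def self.deliver_signup_notification(user)
    DELIVERIES << "signup notification for #{user.email}"
  end
end

module Jobs
  # The job carries only the record id; the record is looked up at perform time.
  class SignupNotificationJob < Struct.new(:user_id)
    def perform
      user = USERS.fetch(user_id)
      UserNotifier.deliver_signup_notification(user)
    end
  end
end

# A worker would eventually call perform; here we call it by hand.
job = Jobs::SignupNotificationJob.new(1)
job.perform
```

All delayed_job needs from a job is that it responds to perform, which is why a Struct subclass with one method is enough.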
Testing
Prior to switching to asynchronous processing, mail delivery was triggered within my models, either via Observers or as Procs attached to state transitions (I’m using the acts_as_state_machine plugin). Therefore my tests had loads of assertions that various state changes in the model resulted in direct email delivery. In the asynchronous model of course, that changes slightly. While the state change ultimately ends in mail delivery, it only happens indirectly.
So here I had a big pile of tests that asserted that poking the model in certain ways resulted in a mail delivery. In my unit tests I really just wanted to test the interaction between the models and delayed_job. After all, if something went haywire during mail delivery the culprit would likely be my new worker classes, not my model.
However, for my integration tests I still wanted to keep the assertions about actual mail delivery, since that was an important part of the stories. I could easily do this by monkey-patching Delayed::Job.enqueue to call the worker’s perform method directly. In my unit tests I monkey-patched Delayed::Job.enqueue to work more like a mock object, adding some inquiry methods to check that it had been invoked correctly.
In isolation this worked great, but running all the tests together resulted in a number of random failures. I’ve run into this enough times to recognize that some tests were somehow poisoning the run-time environment for the others. It turns out that my two approaches were incompatible with each other unless I was very diligent about cleaning everything up properly. I will admit with red-faced shame that I punted. I did the lamest thing one could possibly do and redefined Delayed::Job.enqueue for all of my tests and kept all of my original assertions. I’m not proud of it, but it does work.
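The mock-style patch for unit tests can be sketched roughly as follows. This is not my actual test helper: the Delayed::Job class below is a hypothetical in-memory stand-in so the example runs without the gem, and the enqueued, reset_enqueued! and enqueued? helpers are names invented for illustration. In a real suite you would reopen the gem’s class instead.

```ruby
# Hypothetical stand-in for delayed_job's Delayed::Job so the sketch runs
# without the gem.
module Delayed
  class Job
    class << self
      # Jobs captured since the last reset.
      def enqueued
        @enqueued ||= []
      end

      # Record the job instead of writing it to the database.
      def enqueue(job)
        enqueued << job
        job
      end

      # Call from setup/teardown so one test cannot leak jobs into the next.
      def reset_enqueued!
        enqueued.clear
      end

      # Inquiry helper: was a job of this class enqueued with these arguments?
      def enqueued?(job_class, *args)
        enqueued.any? { |job| job.is_a?(job_class) && job.to_a == args }
      end
    end
  end
end

module Jobs
  class UserNotifierActivationJob < Struct.new(:user_id)
    def perform; end # delivery is irrelevant to this kind of test
  end
end

Delayed::Job.reset_enqueued!
Delayed::Job.enqueue(Jobs::UserNotifierActivationJob.new(42))

puts Delayed::Job.enqueued?(Jobs::UserNotifierActivationJob, 42)
puts Delayed::Job.enqueued?(Jobs::UserNotifierActivationJob, 7)
```

A unit test then asserts on enqueued? rather than on the mail deliveries themselves.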
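For what it’s worth, one way to avoid the cross-test poisoning would be to scope the patch to a block and always restore the original method afterwards. The sketch below is an alternative I did not use, under stated assumptions: Delayed::Job is again a hypothetical in-memory stand-in, and with_inline_jobs and PingJob are made-up names.

```ruby
# Hypothetical stand-in for delayed_job's Delayed::Job: it only queues in memory.
module Delayed
  class Job
    def self.queue
      @queue ||= []
    end

    def self.enqueue(job)
      queue << job
    end
  end
end

# Run jobs inline for the duration of the block, then put the original
# enqueue back even if the block raises.
def with_inline_jobs
  original = Delayed::Job.method(:enqueue)
  Delayed::Job.define_singleton_method(:enqueue) { |job| job.perform }
  yield
ensure
  Delayed::Job.define_singleton_method(:enqueue) { |job| original.call(job) }
end

DELIVERED = []

# Made-up job that records its name when performed.
PingJob = Struct.new(:name) do
  def perform
    DELIVERED << name
  end
end

with_inline_jobs { Delayed::Job.enqueue(PingJob.new("inline")) } # performed immediately
Delayed::Job.enqueue(PingJob.new("queued"))                      # queued as usual
```

Integration tests would wrap their bodies in with_inline_jobs, while unit tests keep whatever recording behavior they rely on.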
Running in Production
The next trick was getting this all running in a production environment and I needed to figure out how one or more workers would be started and kept running. While it’s great to have this decoupled from the Rails environment, it means that it’s a separate process that needs to be managed.
My solution was to use the daemons gem to create a couple of scripts. Then I used Tom Preston-Werner’s god to monitor my process. The worker script looks like this:
#!/usr/bin/env ruby
# Starts a single delayed_job worker in the given Rails environment.
unless ARGV.size == 1
  $stderr.puts "USAGE: #{$0} [environment]"
  exit 1
end

RAILS_ENV = ARGV.first
require File.dirname(__FILE__) + '/../config/environment'

Delayed::Worker.new.start
And the “control” script looks like this:
#!/usr/bin/env ruby
# Daemons wrapper around the worker script, plus a custom "status" command.
require "rubygems"
require "daemons"

def running?(pid)
  # Signal 0 sends nothing; it only checks whether the process can be signalled.
  Process.kill(0, pid)
  true
rescue Errno::ESRCH
  false # no such process
rescue ::Exception
  true # e.g. Errno::EPERM: the process exists but we may not signal it
end

if ARGV.size == 1 && ARGV.first == "status"
  # Adjust this path if it differs from the :dir passed to Daemons.run below.
  pidfile = "/var/run/delayed_job_worker.pid"
  if File.exist?(pidfile)
    pid = File.readlines(pidfile).first.strip.to_i
    if running?(pid)
      puts "delayed_job_worker is running (#{pid})"
    else
      puts "delayed_job_worker is NOT running (#{pid})"
    end
  else
    puts "delayed_job_worker is NOT running (none)"
  end
else
  Daemons.run(File.dirname(__FILE__) + '/delayed_job_worker',
              :backtrace => true,
              :log_output => true,
              :dir => File.dirname(__FILE__) + "/../tmp/pids",
              :dir_mode => :normal,
              :multiple => false)
end
The first script is merely the smallest amount of chrome required to start a worker. Note that I’m using John Barnette’s version of delayed_job which gives us that nice little worker riff. The second script is essentially the daemons wrapper around my worker. For a little extra goodness I added my own “status” command which can be handy for debugging.
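If you want to exercise the “status” logic without a running daemon, it can be pulled into a small function that takes the pid file path as an argument. This is only a sketch: worker_status is a hypothetical helper name, and the temporary pid file stands in for whatever file daemons writes on your box.

```ruby
require "tempfile"

# Signal 0 delivers nothing; it only checks whether the pid can be signalled.
def running?(pid)
  Process.kill(0, pid)
  true
rescue Errno::ESRCH
  false # no such process
rescue Errno::EPERM
  true # the process exists but belongs to another user
end

# Hypothetical helper: builds the status line for the given pid file.
def worker_status(pidfile, name = "delayed_job_worker")
  return "#{name} is NOT running (none)" unless File.exist?(pidfile)

  pid = File.read(pidfile).strip.to_i
  state = running?(pid) ? "is running" : "is NOT running"
  "#{name} #{state} (#{pid})"
end

# Usage: write our own pid to a temporary pid file and ask for its status.
Tempfile.create("worker.pid") do |f|
  f.write(Process.pid.to_s)
  f.flush
  puts worker_status(f.path)
end
```

Passing the path in also makes it harder for the status check and the daemon to disagree about where the pid file lives.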
Getting those to work properly took a little bit of testing by hand. Fortunately, the entire solution is really a series of layers applied on top of each other. Once you convince yourself that an inner-layer is working correctly you can move on to build the outer layers.
The next step was to create a proper god configuration. I have more than one thing to monitor on my setup so I have a master god configuration that includes sub-configurations. My “main” configuration is a simple one-liner:
God.load "/etc/god/*.god"
My application-specific configuration looks like this (with a few edits for public consumption):
RAILS_ROOT = "/var/www/moochbot/current"

God::Contacts::Email.message_settings = {
  # (details omitted)
}

God::Contacts::Email.server_settings = {
  # (details omitted)
}

God.contact(:email) do |c|
  # (details omitted)
end
After setting up some default notification details, we get into the meat of defining our “watch”:

God.watch do |w|
  w.name = "delayed_job_worker"
  w.interval = 10.seconds
  w.start = "#{RAILS_ROOT}/script/delayed_job_worker_control start -- production"
  w.stop = "#{RAILS_ROOT}/script/delayed_job_worker_control stop"
  w.restart = "#{RAILS_ROOT}/script/delayed_job_worker_control restart"
  w.start_grace = 10.seconds
  w.restart_grace = 10.seconds
  w.pid_file = "#{RAILS_ROOT}/tmp/pids/delayed_job_worker.pid"

  w.uid = "deploy"
  w.gid = "root"
  w.behavior(:clean_pid_file)
Next we need to define our transitions. My first attempt at this failed because I was missing these, and my watched process was stuck in the “unmonitored” state. It’s worth spending some time reading the docs on the god homepage, since at first glance the configuration wasn’t obvious to me.

  # On startup, work out whether the process is already running.
  w.transition(:init, { true => :up, false => :start }) do |on|
    on.condition(:process_running) do |c|
      c.running = true
    end
  end

  # A start or restart has finished once the process is running.
  w.transition([:start, :restart], :up) do |on|
    on.condition(:process_running) do |c|
      c.running = true
    end

    # Failsafe: after five checks without success, try starting again.
    on.condition(:tries) do |c|
      c.times = 5
      c.transition = :start
    end
  end

  # Start the process again if it exits.
  w.transition(:up, :start) do |on|
    on.condition(:process_exits)
  end

Finally I specify some resource-consumption boundaries to make sure that my little worker daemon doesn’t take over my box:
  w.restart_if do |restart|
    restart.condition(:memory_usage) do |c|
      c.above = 100.megabytes
      c.times = [3, 5]
    end

    restart.condition(:cpu_usage) do |c|
      c.above = 50.percent
      c.times = 5
    end
  end

  w.lifecycle do |on|
    on.condition(:flapping) do |c|
      c.to_state = [:start, :restart]
      c.times = 5
      c.within = 5.minute
      c.transition = :unmonitored
      c.retry_in = 10.minutes
      c.retry_times = 5
      c.retry_within = 2.hours
    end
  end
end