Handling Timeouts with Mongrel2 and ZeroMQ

I mentioned in my blog post on m2node that we wrote an HTTP server at Braintree that has some similarities to Mongrel2. One responsibility of our HTTP server is handling request timeouts. Our timeout handling needs to be more sophisticated than simply setting an HTTP status because of our Transparent Redirect API. If a TR request times out, we need to parse the redirect URL from the params and redirect back to the merchant’s website.

With Mongrel2, we can easily implement the same behavior by building a service that runs between Mongrel2 and the application.

ZeroMQ Streamer and Forwarder

We can implement a timeout handler as a device similar to ZeroMQ's Streamer and Forwarder devices.

We’ll use a Streamer to pull requests from Mongrel2 and push them to the application.

ZMQ_STREAMER collects tasks from a set of pushers and forwards these to a set of pullers. You should use ZMQ_STREAMER with a ZMQ_PULL socket for the frontend and a ZMQ_PUSH socket for the backend.

We’ll use a Forwarder to subscribe to responses from the application and publish them to Mongrel2.

ZMQ_FORWARDER collects messages from a set of publishers and forwards these to a set of subscribers. You should use ZMQ_FORWARDER with a ZMQ_SUB socket for the frontend and a ZMQ_PUB socket for the backend.
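To make the difference between the two devices concrete, here is a rough in-memory analogy in plain Ruby. It is not ZeroMQ. The `Streamer` and `Forwarder` classes are hypothetical stand-ins and arrays play the role of connected sockets. The streamer hands each task to one puller in turn, the way a PUSH socket round-robins across connected pullers. The forwarder copies every message to every subscriber, as SUB sockets subscribed to '' would receive it.

```ruby
# In-memory analogy only: arrays stand in for connected sockets.
# This is not ZeroMQ; it just mimics the two delivery patterns.

# Streamer (PULL in, PUSH out): each task goes to exactly one puller,
# rotating through the pullers in turn.
class Streamer
  def initialize(pullers)
    @pullers = pullers
    @next = 0
  end

  def forward(task)
    @pullers[@next] << task
    @next = (@next + 1) % @pullers.size
  end
end

# Forwarder (SUB in, PUB out): every message is copied to every
# subscriber, as if each subscribed with an empty ('') filter.
class Forwarder
  def initialize(subscribers)
    @subscribers = subscribers
  end

  def forward(message)
    @subscribers.each { |inbox| inbox << message }
  end
end

app_a = []
app_b = []
streamer = Streamer.new([app_a, app_b])
%w[req1 req2 req3].each { |req| streamer.forward(req) }
p app_a # => ["req1", "req3"]
p app_b # => ["req2"]

mongrel_a = []
mongrel_b = []
forwarder = Forwarder.new([mongrel_a, mongrel_b])
forwarder.forward('resp1')
p mongrel_a # => ["resp1"]
p mongrel_b # => ["resp1"]
```

Requests are work to be done once, so they are distributed. Responses are broadcast, and each receiver decides what applies to it.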

Socket Setup with em-zeromq

This example uses the em-zeromq gem with Ruby's EventMachine. You'll need Ruby 1.9 and the ffi gem. We'll start by running a TimeoutHandler in an EventMachine run loop.

require 'em-zeromq'

# Seconds to wait for the application before sending a timeout response.
TIMEOUT_IN_SECONDS = 5

EM.run do
  TimeoutHandler.start
  puts "Ready..."
end

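The start method creates four sockets: a ZMQ_PULL socket for requests from Mongrel2, a ZMQ_PUSH socket for requests to the application, a ZMQ_SUB socket for responses from the application, and a ZMQ_PUB socket for responses back to Mongrel2.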

class TimeoutHandler
  def self.start
    context = EM::ZeroMQ::Context.new(1)

    # Outbound: responses back to Mongrel2, requests on to the application.
    responses_out = context.connect(ZMQ::PUB, 'tcp://127.0.0.1:8002')
    requests_out = context.bind(ZMQ::PUSH, 'tcp://127.0.0.1:8003')

    timeout_handler = new(requests_out, responses_out)

    request_handler = RequestHandler.new(timeout_handler)
    response_handler = ResponseHandler.new(timeout_handler)

    # Inbound: requests from Mongrel2...
    requests_in = context.connect(ZMQ::PULL, 'tcp://127.0.0.1:8001', request_handler)

    # ...and responses from the application, subscribed to every message.
    responses_in = context.bind(ZMQ::SUB, 'tcp://127.0.0.1:8004', response_handler)
    responses_in.setsockopt(ZMQ::SUBSCRIBE, '')
  end
end
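As a reading aid, the wiring above can be written down as data. This is only a description, not em-zeromq code. `PEER_TYPE`, `SOCKETS`, and `describe_socket` are hypothetical names. The Mongrel2 rows assume a Mongrel2 Handler whose send_spec is tcp://127.0.0.1:8001 and whose recv_spec is tcp://127.0.0.1:8002, which is inferred from the ports above. The timeout handler connects toward Mongrel2 like an ordinary handler. It binds toward the application, so the application connects to it exactly as it would connect to Mongrel2.

```ruby
# Socket type expected on the other end of each connection.
PEER_TYPE = { PULL: :PUSH, PUSH: :PULL, PUB: :SUB, SUB: :PUB }.freeze

# The four sockets from TimeoutHandler.start. The Mongrel2 rows assume a
# Handler with send_spec on port 8001 and recv_spec on port 8002.
SOCKETS = [
  { name: :requests_in,   type: :PULL, mode: :connect, port: 8001, peer: 'Mongrel2' },
  { name: :responses_out, type: :PUB,  mode: :connect, port: 8002, peer: 'Mongrel2' },
  { name: :requests_out,  type: :PUSH, mode: :bind,    port: 8003, peer: 'application' },
  { name: :responses_in,  type: :SUB,  mode: :bind,    port: 8004, peer: 'application' }
].freeze

# Describe one socket and what the peer must do at the same endpoint.
def describe_socket(socket)
  peer_mode = socket[:mode] == :bind ? :connect : :bind
  "#{socket[:name]}: #{socket[:type]} #{socket[:mode]}s tcp://127.0.0.1:#{socket[:port]}; " \
    "#{socket[:peer]} #{peer_mode}s a #{PEER_TYPE.fetch(socket[:type])} socket there"
end

SOCKETS.each { |socket| puts describe_socket(socket) }
```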

We'll initialize the timeout handler with the sockets for sending requests and responses. The @timers hash stores the EventMachine timers that trigger the timeouts, keyed by server UUID and connection ID.

class TimeoutHandler
  def initialize(requests_out, responses_out)
    @timers = {}
    @requests_out = requests_out
    @responses_out = responses_out
  end
end

Handling Requests

To use em-zeromq, we need to initialize the ZMQ_PULL socket with an object that responds to on_readable. That was done in the start method above. We’ll use a simple class that delegates back to the timeout handler.

class RequestHandler
  def initialize(timeout_handler)
    @handler = timeout_handler
  end

  def on_readable(socket, messages)
    messages.each { |m| @handler.request_received(m.copy_out_string) }
  end
end

When a request is received, we’ll push it to the application on the ZMQ_PUSH socket. Then we can use an EventMachine timer to handle a timeout if we haven’t received the response in a certain amount of time.

class TimeoutHandler
  def request_received(request)
    @requests_out.send_msg(request)
    server_uuid, connection_id = request.split(" ", 3)[0,2]
    @timers["#{server_uuid} #{connection_id}"] = EM::Timer.new(TIMEOUT_IN_SECONDS) do
      _handle_timeout(server_uuid, connection_id)
    end
  end
end
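The code above only needs the first two fields of each request. For reference, Mongrel2 sends requests in the form `UUID ID PATH SIZE:HEADERS,SIZE:BODY,`, where the headers and body are netstrings. The sketch below parses the whole envelope. `parse_netstring` and `parse_request` are hypothetical helpers, not part of em-zeromq or Mongrel2.

```ruby
# Hypothetical helpers for Mongrel2's request envelope:
#   "UUID ID PATH SIZE:HEADERS,SIZE:BODY,"

# Read one netstring ("SIZE:DATA,") from the front of a string.
# Returns [data, remainder].
def parse_netstring(str)
  length_str, rest = str.split(':', 2)
  length = Integer(length_str, 10)
  raise ArgumentError, 'bad netstring' unless rest.byteslice(length, 1) == ','

  [rest.byteslice(0, length), rest.byteslice(length + 1, rest.bytesize)]
end

def parse_request(message)
  # Limit the split so spaces inside the headers stay in `rest`.
  uuid, conn_id, path, rest = message.split(' ', 4)
  headers, rest = parse_netstring(rest)
  body, _remainder = parse_netstring(rest)
  { uuid: uuid, conn_id: conn_id, path: path, headers: headers, body: body }
end

p parse_request('abc-123 42 /sleep/6 2:{},0:,')
```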

If the timer ends up firing, it’s easy to send a timeout response. For this example, I’m going to send a simple HTTP 503 response, but we could implement something more sophisticated here if necessary.

class TimeoutHandler
  def _handle_timeout(server_uuid, connection_id)
    # Mongrel2 response envelope: "UUID SIZE:ID, BODY"
    response = "#{server_uuid} #{connection_id.length}:#{connection_id}, #{TIMEOUT_RESPONSE_BODY}"
    @responses_out.send_msg(response)
    @timers.delete("#{server_uuid} #{connection_id}")
  end
end

Here’s the timeout response body that I’m using.

TIMEOUT_RESPONSE_BODY = [
  "HTTP/1.1 503 Service Unavailable",
  "Content-Type: text/plain",
  "Transfer-Encoding: chunked",
  "",
  "13", "Request timed out.\n",
  "0", "", ""
].join("\r\n")
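Hard-coding `13` works because `Request timed out.\n` is 19 bytes, which is 0x13 in hex. If the body changes, it is safer to compute the chunk size. Here is a small sketch with hypothetical helpers `chunked_response` and `mongrel2_response`. The second one wraps an HTTP response in the same `UUID SIZE:ID, BODY` envelope that `_handle_timeout` builds, using `bytesize` so non-ASCII content is measured correctly.

```ruby
# Hypothetical helpers: build a chunked HTTP response with a computed
# chunk size, then wrap it in Mongrel2's "UUID SIZE:ID, BODY" envelope.
def chunked_response(status_line, body, content_type = 'text/plain')
  [
    status_line,
    "Content-Type: #{content_type}",
    'Transfer-Encoding: chunked',
    '',
    body.bytesize.to_s(16), body, # chunk size in hex, then the chunk
    '0', '', ''                   # last chunk and end of message
  ].join("\r\n")
end

def mongrel2_response(server_uuid, connection_id, http_response)
  "#{server_uuid} #{connection_id.bytesize}:#{connection_id}, #{http_response}"
end

body = chunked_response('HTTP/1.1 503 Service Unavailable', "Request timed out.\n")
puts mongrel2_response('abc-123', '42', body)
```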

Handling Responses

Ideally, requests never time out. When a response arrives from the application, we need to cancel its timer. Using the same approach as for requests, a simple ResponseHandler class responds to on_readable on the ZMQ_SUB socket and delegates back to the timeout handler.

class ResponseHandler
  def initialize(timeout_handler)
    @handler = timeout_handler
  end

  def on_readable(socket, messages)
    messages.each { |m| @handler.response_received(m.copy_out_string) }
  end
end

When receiving a response, we need to check if the timer is still present. If it has been deleted, then a timeout response has already been sent and it’s too late to send the real response. If it is still present, we can cancel the timer and forward the response.

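class TimeoutHandler
  def response_received(response)
    # Response format: "UUID SIZE:ID, BODY"; take the UUID and the ID netstring.
    server_uuid, id_netstring = response.split(" ", 3)[0,2]
    # "5:12345," => "12345" (drop the length prefix and trailing comma)
    connection_id = id_netstring.split(":")[1][0..-2]
    # Forward only if the timeout response hasn't been sent already.
    if timer = @timers.delete("#{server_uuid} #{connection_id}")
      timer.cancel
      @responses_out.send_msg(response)
    end
  end
end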
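`response_received` assumes the response carries a single connection ID, which is what the application sends back for an ordinary request. Mongrel2's response format also allows several space-separated IDs (`UUID SIZE:ID ID ID, BODY`), and in that case `split(" ", 3)` would cut the netstring apart. A sketch that relies on the netstring length instead (`parse_response` is a hypothetical name):

```ruby
# Hypothetical helper: parse a Mongrel2 response "UUID SIZE:ID ID ID, BODY"
# by honoring the netstring length, so several IDs are handled.
def parse_response(message)
  server_uuid, rest = message.split(' ', 2)
  length_str, rest = rest.split(':', 2)
  length = Integer(length_str, 10)
  raise ArgumentError, 'bad netstring' unless rest.byteslice(length, 1) == ','

  ids = rest.byteslice(0, length).split(' ')
  body = rest.byteslice(length + 2, rest.bytesize) # skip the ", " separator
  [server_uuid, ids, body]
end

p parse_response('abc 2:42, HTTP/1.1 200 OK')    # => ["abc", ["42"], "HTTP/1.1 200 OK"]
p parse_response('abc 5:1 2 3, HTTP/1.1 200 OK') # => ["abc", ["1", "2", "3"], "HTTP/1.1 200 OK"]
```

With this, the handler could cancel one timer per ID and forward the response if any of them were still pending.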

Test Application

To test the timeout handler we can build a simple application that will sleep for a specified number of seconds. I built an application in Node that is served using m2node. The code is written in CoffeeScript.

http = require 'http'
m2node = require 'm2node'

server = http.createServer((req, res) ->
  console.log("#{req.method} #{req.url}")
  sendResponse = ->
    res.writeHead(200, {'Content-Type': 'text/plain'})
    res.end('Hello World\n')
  # Delay the response by N seconds for /sleep/N requests.
  match = req.url.match(/^\/sleep\/(\d+)/)
  if match
    setTimeout(sendResponse, parseInt(match[1], 10) * 1000)
  else
    sendResponse()
)

m2node.run(
  server,
  recv_spec: 'tcp://127.0.0.1:8003'
  send_spec: 'tcp://127.0.0.1:8004'
)
console.log('Ready...')

If the URL has the form /sleep/N, the response is delayed by N seconds. For example, requesting /sleep/3 waits 3 seconds before sending the response.

Output

Here’s the output if the timeout handler is configured to time out after 5 seconds and we hit the /sleep/6 URL.

$ curl -D - http://localhost:8000/sleep/6
HTTP/1.1 503 Service Unavailable
Content-Type: text/plain
Transfer-Encoding: chunked

Request timed out.

As expected, the application's response comes through as long as it finishes within the timeout.

$ curl -D - http://localhost:8000/sleep/4
HTTP/1.1 200 OK
Content-Type: text/plain
Connection: keep-alive
Transfer-Encoding: chunked

Hello World

Message Expiration and Race Conditions

Because the timeout handler runs on a single-threaded EventMachine reactor, there is no race inside the handler between the timeout response and the application's response: callbacks run one at a time. When the application's response arrives, a timer that is still present means nothing has been sent yet, and a missing timer means the timeout response has already gone out.
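The bookkeeping can be modeled without EventMachine to see why both orderings are safe. In this sketch, `FakeTimer` is a hypothetical stand-in for `EM::Timer`, and `TimerTable` mirrors the `@timers` logic of `TimeoutHandler`. Every step runs one at a time, as callbacks would on the reactor.

```ruby
# Model of TimeoutHandler's @timers bookkeeping without EventMachine.
# FakeTimer is a hypothetical stand-in for EM::Timer.
class FakeTimer
  def initialize(&callback)
    @callback = callback
    @cancelled = false
  end

  # Simulate the reactor running the timer callback.
  def fire
    @callback.call unless @cancelled
  end

  def cancel
    @cancelled = true
  end
end

class TimerTable
  attr_reader :sent

  def initialize
    @timers = {}
    @sent = []
  end

  def timer_for(key)
    @timers[key]
  end

  def request_received(key)
    @timers[key] = FakeTimer.new { handle_timeout(key) }
  end

  # Mirrors _handle_timeout: send the timeout response, forget the timer.
  def handle_timeout(key)
    @sent << [key, :timeout]
    @timers.delete(key)
  end

  # Mirrors response_received: forward only if the timer is still pending.
  def response_received(key)
    timer = @timers.delete(key)
    return unless timer

    timer.cancel
    @sent << [key, :response]
  end
end

table = TimerTable.new

# The response wins: the timer is cancelled, so a late fire does nothing.
table.request_received('abc 1')
timer = table.timer_for('abc 1')
table.response_received('abc 1')
timer.fire

# The timeout wins: the late response finds no timer and is dropped.
table.request_received('abc 2')
table.timer_for('abc 2').fire
table.response_received('abc 2')

p table.sent # => [["abc 1", :response], ["abc 2", :timeout]]
```

Each connection key is claimed exactly once, by whichever event runs first.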

However, the application could still pull a request after the timeout response has already been sent. That is clearly undesirable, and if it happens, the application should ideally ignore the request.

Enterprise Integration Patterns recommends setting an expiration header and ignoring the request if it’s already expired.

A Message Expiration is a timestamp that specifies how long the message will live or when it will expire. It is like the expiration date on a milk carton. After that date, you shouldn’t drink the milk. Likewise, when a message expires, the messaging system should no longer deliver it. If a receiver still receives a message but cannot process it before the expiration, the receiver should throw away the message.

This could be implemented by having the timeout handler add an HTTP header to the request before pushing it out, and updating the application to check that header before processing the request. I'll leave implementing that as an exercise for the reader.
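One possible shape for that approach, as a sketch only. The header name `X-Request-Expires` and the helpers `add_expiration` and `expired?` are hypothetical, and the code assumes Mongrel2's default JSON-encoded headers. The timeout handler would stamp each request before pushing it, for example with `Time.now.to_f + TIMEOUT_IN_SECONDS`. The application would check the header before doing any work. The test application here is written in Node, so the Ruby `expired?` check only illustrates the logic.

```ruby
require 'json'

# Hypothetical header name; not defined by Mongrel2 or m2node.
EXPIRES_HEADER = 'X-Request-Expires'

# Timeout handler side: add an expiration timestamp (seconds since the
# epoch) to the JSON headers of "UUID ID PATH SIZE:HEADERS,SIZE:BODY,".
def add_expiration(request, expires_at)
  uuid, conn_id, path, rest = request.split(' ', 4)
  length_str, rest = rest.split(':', 2)
  length = Integer(length_str, 10)
  raise ArgumentError, 'bad netstring' unless rest.byteslice(length, 1) == ','

  headers = JSON.parse(rest.byteslice(0, length))
  body_netstring = rest.byteslice(length + 1, rest.bytesize) # "SIZE:BODY,"
  headers[EXPIRES_HEADER] = expires_at.to_s
  new_headers = JSON.generate(headers)
  "#{uuid} #{conn_id} #{path} #{new_headers.bytesize}:#{new_headers},#{body_netstring}"
end

# Application side: true if the request carries an expiration in the past.
def expired?(headers, now = Time.now.to_f)
  value = headers[EXPIRES_HEADER]
  !value.nil? && now > Float(value)
end

request = 'abc-123 42 /sleep/6 2:{},0:,'
puts add_expiration(request, Time.now.to_f + 5)
```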

It’s also possible that the application will pick up a request before a timeout response has been sent, but not finish processing it until after the timeout response has been sent. The best course of action in this scenario likely varies depending on the application and the type of request.

Download

I put the code for the timeout handler, the Node application for testing, and the Mongrel2 configuration into a gist.