I mentioned in my blog post on m2node that we wrote an HTTP server at Braintree that has some similarities to Mongrel2. One responsibility of our HTTP server is handling request timeouts. Our timeout handling needs to be more sophisticated than simply setting an HTTP status because of our Transparent Redirect API. If a TR request times out, we need to parse the redirect URL from the params and redirect back to the merchant’s website.
With Mongrel2, we can easily implement the same behavior by building a service that runs between Mongrel2 and the application.
We can implement a timeout handler by building a device that combines the behavior of a ZeroMQ Streamer and a ZeroMQ Forwarder.
We’ll use a Streamer to pull requests from Mongrel2 and push them to the application.
ZMQ_STREAMER collects tasks from a set of pushers and forwards these to a set of pullers. You should use ZMQ_STREAMER with a ZMQ_PULL socket for the frontend and a ZMQ_PUSH socket for the backend.
We’ll use a Forwarder to subscribe to responses from the application and publish them to Mongrel2.
ZMQ_FORWARDER collects messages from a set of publishers and forwards these to a set of subscribers. You should use ZMQ_FORWARDER with a ZMQ_SUB socket for the frontend and a ZMQ_PUB socket for the backend.
This example uses the em-zeromq gem with Ruby’s EventMachine. You’ll need Ruby 1.9 and the ffi gem installed. We’ll start by running a TimeoutHandler in an EventMachine run loop.
require 'em-zeromq'

EM.run do
  TimeoutHandler.start
  puts "Ready..."
end
The start method will need to create four sockets:
class TimeoutHandler
  def self.start
    context = EM::ZeroMQ::Context.new(1)

    # Outbound sockets: responses go back to Mongrel2, requests go on to the application.
    responses_out = context.connect(ZMQ::PUB, 'tcp://127.0.0.1:8002')
    requests_out = context.bind(ZMQ::PUSH, 'tcp://127.0.0.1:8003')

    timeout_handler = new(requests_out, responses_out)
    request_handler = RequestHandler.new(timeout_handler)
    response_handler = ResponseHandler.new(timeout_handler)

    # Inbound sockets: requests arrive from Mongrel2, responses arrive from the application.
    requests_in = context.connect(ZMQ::PULL, 'tcp://127.0.0.1:8001', request_handler)
    responses_in = context.bind(ZMQ::SUB, 'tcp://127.0.0.1:8004', response_handler)
    # Subscribe to every message; an empty prefix matches everything.
    responses_in.setsockopt(ZMQ::SUBSCRIBE, '')
  end
end
We’ll initialize the timeout handler with the sockets used to send requests and responses. The @timers instance variable will store the EventMachine timers that trigger the timeouts.
class TimeoutHandler
  def initialize(requests_out, responses_out)
    @timers = {}
    @requests_out = requests_out
    @responses_out = responses_out
  end
end
To use em-zeromq, we need to initialize the ZMQ_PULL socket with an object that responds to on_readable. That was done in the start method above. We’ll use a simple class that delegates back to the timeout handler.
class RequestHandler
  def initialize(timeout_handler)
    @handler = timeout_handler
  end

  def on_readable(socket, messages)
    messages.each { |m| @handler.request_received(m.copy_out_string) }
  end
end
When a request is received, we’ll push it to the application on the ZMQ_PUSH socket. Then we can use an EventMachine timer to handle a timeout if we haven’t received the response in a certain amount of time.
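class TimeoutHandler
  def request_received(request)
    # Forward the request to the application unchanged.
    @requests_out.send_msg(request)

    # A Mongrel2 request starts with "UUID ID PATH ..."; the first two fields identify the connection.
    server_uuid, connection_id = request.split(" ", 3)[0,2]
    @timers["#{server_uuid} #{connection_id}"] = EM::Timer.new(TIMEOUT_IN_SECONDS) do
      _handle_timeout(server_uuid, connection_id)
    end
  end
end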
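To make the key derivation concrete, here is a minimal sketch that runs without ZeroMQ or EventMachine. It assumes the request layout described in the Mongrel2 manual, "UUID ID PATH SIZE:HEADERS,SIZE:BODY,". The helper name timer_key_for and the sample message are made up for illustration.

```ruby
# Derive the timer key from a raw Mongrel2 request.
# Assumed wire format: "UUID ID PATH SIZE:HEADERS,SIZE:BODY,"
def timer_key_for(request)
  # Limit the split to 3 parts so spaces inside the headers are left alone.
  server_uuid, connection_id, _rest = request.split(" ", 3)
  "#{server_uuid} #{connection_id}"
end

# Hypothetical sample request: 16 bytes of JSON headers and an empty body.
sample = '54c6755b-9628-40a4-9a2d-cc82a816345e 42 /sleep/6 16:{"METHOD":"GET"},0:,'
puts timer_key_for(sample)
```

The same key has to be rebuilt from the response later, which is why it only uses the server UUID and the connection ID.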
If the timer ends up firing, it’s easy to send a timeout response. For this example, I’m going to send a simple HTTP 503 response, but we could implement something more sophisticated here if necessary.
class TimeoutHandler
  def _handle_timeout(server_uuid, connection_id)
    # Mongrel2 response format: "UUID SIZE:ID, BODY", with the connection ID as a netstring.
    response = "#{server_uuid} #{connection_id.length}:#{connection_id}, #{TIMEOUT_RESPONSE_BODY}"
    @responses_out.send_msg(response)
    @timers.delete("#{server_uuid} #{connection_id}")
  end
end
Here’s the timeout response body that I’m using.
TIMEOUT_RESPONSE_BODY = [
  "HTTP/1.1 503 Service Unavailable",
  "Content-Type: text/plain",
  "Transfer-Encoding: chunked",
  "",
  "13", "Request timed out.\n", # chunk size in hex: 0x13 = 19 bytes
  "0", "", ""                   # zero-length chunk ends the body
].join("\r\n")
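The chunk size in that body is written by hand, so it has to be updated whenever the message changes. As a sketch of one alternative, the helpers below compute the chunk header from the body and wrap the result in the Mongrel2 response format used by _handle_timeout. The helper names chunked, timeout_http_response, and mongrel2_response are my own and not part of any library.

```ruby
# Encode one string as a single HTTP/1.1 chunk plus the terminating zero-length chunk.
def chunked(body)
  "#{body.bytesize.to_s(16)}\r\n#{body}\r\n0\r\n\r\n"
end

# Build the full 503 payload with a computed chunk size.
def timeout_http_response(text)
  [
    "HTTP/1.1 503 Service Unavailable",
    "Content-Type: text/plain",
    "Transfer-Encoding: chunked",
    "",
    chunked(text)
  ].join("\r\n")
end

# Wrap a payload in Mongrel2's response format: "UUID SIZE:ID, BODY".
def mongrel2_response(server_uuid, connection_id, payload)
  "#{server_uuid} #{connection_id.length}:#{connection_id}, #{payload}"
end

puts mongrel2_response("54c6755b", "42", timeout_http_response("Request timed out.\n")).inspect
```

For the message used in this post, timeout_http_response produces the same bytes as the hand-written constant.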
Of course, hopefully requests never time out. When receiving responses from the application, we’ll need to cancel the timer. Using the same approach as when handling requests, a simple ResponseHandler class can respond to on_readable on the ZMQ_SUB socket and delegate back to the timeout handler.
class ResponseHandler
  def initialize(timeout_handler)
    @handler = timeout_handler
  end

  def on_readable(socket, messages)
    messages.each { |m| @handler.response_received(m.copy_out_string) }
  end
end
When receiving a response, we need to check if the timer is still present. If it has been deleted, then a timeout response has already been sent and it’s too late to send the real response. If it is still present, we can cancel the timer and forward the response.
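class TimeoutHandler
  def response_received(response)
    # A response looks like "UUID SIZE:ID, BODY"; the second field is the ID netstring.
    server_uuid, id_netstring = response.split(" ", 3)[0,2]
    # Drop the "SIZE:" prefix and the trailing comma to recover the connection ID.
    connection_id = id_netstring.split(":")[1][0..-2]
    # delete returns nil if the timeout already fired, in which case the response is dropped.
    if timer = @timers.delete("#{server_uuid} #{connection_id}")
      timer.cancel
      @responses_out.send_msg(response)
    end
  end
end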
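The parsing above works when a response targets a single connection. The Mongrel2 response format also allows several space-separated connection IDs inside the netstring ("UUID SIZE:ID ID ID, BODY"), and splitting on spaces would cut such a list apart. Here is a minimal sketch of a length-based parse, assuming that format. The method name parse_response_target is hypothetical.

```ruby
# Parse the routing prefix of a Mongrel2 response using the netstring length.
# Assumed format: "UUID SIZE:ID ID ID, BODY"
# Returns [server_uuid, [connection_id, ...]].
def parse_response_target(response)
  server_uuid, rest = response.split(" ", 2)
  length, rest = rest.split(":", 2)
  size = length.to_i
  # A netstring payload must be followed by a comma.
  raise ArgumentError, "malformed netstring" unless rest[size] == ","
  [server_uuid, rest[0, size].split(" ")]
end

p parse_response_target("54c6755b 2:42, HTTP/1.1 200 OK\r\n\r\n")
p parse_response_target("54c6755b 5:1 2 3, HTTP/1.1 200 OK\r\n\r\n")
```

With this in place, response_received could cancel the timer for each returned ID. Whether the application ever sends multi-ID responses depends on the handler library in use.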
To test the timeout handler we can build a simple application that will sleep for a specified number of seconds. I built an application in Node that is served using m2node. The code is written in CoffeeScript.
http = require 'http'
m2node = require 'm2node'

server = http.createServer((req, res) ->
  console.log("#{req.method} #{req.url}")
  sendResponse = ->
    res.writeHead(200, {'Content-Type': 'text/plain'})
    res.end('Hello World\n')

  match = req.url.match(/sleep.(\d+)/)
  if match
    setTimeout(sendResponse, parseInt("#{match[1]}000"))
  else
    sendResponse()
)

m2node.run(
  server,
  recv_spec: 'tcp://127.0.0.1:8003'
  send_spec: 'tcp://127.0.0.1:8004'
)

console.log('Ready...')
If the URL contains /sleep followed by a number, the response will be delayed for that many seconds. For example, hitting /sleep/3 will wait 3 seconds before sending the response.
Here’s the output if the timeout handler is configured to time out after 5 seconds and we hit the /sleep/6 URL.
$ curl -D - http://localhost:8000/sleep/6
HTTP/1.1 503 Service Unavailable
Content-Type: text/plain
Transfer-Encoding: chunked

Request timed out.
And of course, the proper response comes back as long as it finishes in less than the timeout threshold.
$ curl -D - http://localhost:8000/sleep/4
HTTP/1.1 200 OK
Content-Type: text/plain
Connection: keep-alive
Transfer-Encoding: chunked

Hello World
Because the timeout handler runs on EventMachine’s single-threaded event loop, the timer callback and the response callback can never run at the same time, so there isn’t a race condition with sending responses in the timeout handler. If the timer is still present, then a response hasn’t been sent yet. If the timer is gone, then the timeout response has already been sent.
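The bookkeeping can be modeled without EventMachine or ZeroMQ. The sketch below is a simplified stand-in for the timer table, not the handler itself: the class TimerTable and its method names are hypothetical, and real timers are replaced by explicit calls. It shows that whichever event removes the key first decides which response is sent.

```ruby
# Simplified model of the @timers hash: a key is pending until one event claims it.
class TimerTable
  attr_reader :sent

  def initialize
    @pending = {}
    @sent = []
  end

  def request_received(key)
    @pending[key] = true
  end

  # Stands in for the EM::Timer callback firing.
  def timeout_fired(key)
    @sent << [key, :timeout] if @pending.delete(key)
  end

  # Stands in for a response arriving from the application.
  def response_received(key)
    @sent << [key, :response] if @pending.delete(key)
  end
end

table = TimerTable.new
table.request_received("uuid 1")
table.timeout_fired("uuid 1")     # the timeout wins
table.response_received("uuid 1") # the late response is dropped
p table.sent
```

Because every callback runs to completion on one thread, the delete-then-send step cannot be interleaved with another callback for the same key.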
However, we could pull a request in the application after a timeout response has already been sent. Clearly a bad scenario, but if it happens, it would be best for the application to ignore the request.
Enterprise Integration Patterns recommends setting an expiration header and ignoring the request if it’s already expired.
A Message Expiration is a timestamp that specifies how long the message will live or when it will expire. It is like the expiration date on a milk carton. After that date, you shouldn’t drink the milk. Likewise, when a message expires, the messaging system should no longer deliver it. If a receiver still receives a message but cannot process it before the expiration, the receiver should throw away the message.
This could be implemented by having the timeout handler add an HTTP header to the request before pushing it out, and then updating the application to check that header before processing the request. I’ll leave implementing that as an exercise to the reader.
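As a starting point for that exercise, here is a rough sketch of both halves. It assumes the request headers arrive as a JSON netstring ("UUID ID PATH SIZE:HEADERS,SIZE:BODY,"), and the header name x-request-expires-at is made up for this example. The application-side check is shown in Ruby for consistency with the handler, even though the test application above is written in CoffeeScript.

```ruby
require 'json'

EXPIRES_HEADER = "x-request-expires-at" # hypothetical header name

# Read one netstring from the front of data; returns [payload, remainder].
def read_netstring(data)
  length, rest = data.split(":", 2)
  size = length.to_i
  raise ArgumentError, "malformed netstring" unless rest && rest.byteslice(size, 1) == ","
  [rest.byteslice(0, size), rest.byteslice((size + 1)..-1)]
end

def netstring(payload)
  "#{payload.bytesize}:#{payload},"
end

# Timeout handler side: stamp the request with its expiration time (epoch seconds).
def add_expiration(request, expires_at)
  server_uuid, connection_id, path, rest = request.split(" ", 4)
  headers_json, body_part = read_netstring(rest)
  headers = JSON.parse(headers_json)
  headers[EXPIRES_HEADER] = expires_at.to_i.to_s
  [server_uuid, connection_id, path, netstring(JSON.generate(headers)) + body_part].join(" ")
end

# Application side: skip requests whose expiration has passed.
def expired?(headers, now = Time.now)
  deadline = headers[EXPIRES_HEADER]
  !deadline.nil? && now.to_i >= deadline.to_i
end

request = 'uuid 42 /sleep/6 16:{"METHOD":"GET"},0:,'
puts add_expiration(request, Time.now + 5)
```

In request_received, the stamped request would be pushed to the application instead of the original. This relies on the clocks of the handler and the application agreeing, which is easy when they run on the same machine.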
It’s also possible that the application will pick up a request before a timeout response has been sent, but not finish processing it until after the timeout response has been sent. The best course of action in this scenario likely varies depending on the application and the type of request.
I put the code for the timeout handler, the node application for testing, and the mongrel2 configuration into a gist.
git clone git://gist.github.com/1016048.git gist-1016048