Replacing RabbitMQ with MongoDB

By David Mytton,
CEO & Founder of Server Density.

Published on the 28th September, 2011.

Update Nov 2015: We have since reached the limits of MongoDB’s atomic findAndModify and have moved over to using Kafka for queuing.

Back in April we ditched RabbitMQ in favour of building our own simple queuing system using MongoDB. This has been in production since then and has been working very well.

Last week I gave a presentation at the MongoUK conference and London DevOps meetup going into more technical detail. The presentation is embedded below along with a full writeup.

[slideshare id=9457894&doc=mongouk11-replacingrabbitmqwithmongodb-110928083900-phpapp01]

Why use a queue?

  • Background processing – tasks that don’t need to be completed immediately whilst the user waits, or even tasks that the user will wait for but that have to be run by separate processes. The key here is that the tasks run asynchronously, so the user is either doing something else or is waiting with a status page.
  • Examples of these kinds of jobs include sending notifications, such as by e-mail, where a short delay isn’t critical, or where there are many items to send, e.g. bulk e-mailing.
  • Internal communication – streaming events throughout your infrastructure to different nodes or components.

What do you need in a queuing system?

  • Consumers – these listen in on the queue and grab messages as they come in to execute the instructions contained within the message. These are usually multiple identical processes so that many messages can be processed concurrently.
  • Atomic – you do not want multiple consumers pulling down the same message and causing duplication. So when a consumer acknowledges a message, no other consumers should be able to process it.
  • Speed – items need to be inserted into the queue very quickly. Equally, they need to be pulled out quickly. The actual processing time may vary depending on what is being done but the queue needs to handle large numbers of reads/writes.
  • Garbage collection (GC) – if a consumer dies mid-process then the message it was holding should be returned to the queue for another consumer to pick it up.

Implementation – Consumers

Interaction with RabbitMQ is done through AMQP which is a protocol designed specifically for queuing systems. Various libraries exist for different languages which implement this and although we had no problems using the Python AMQP library, the PHP libraries we tried all broke down at higher loads (although I understand there have been improvements to the PHP library since). MongoDB has similar libraries available for many different languages which all use the Mongo Wire Protocol, which we have been using at high load for several years.

Switching from the Python AMQP library (pika) to the Python Mongo driver (pymongo) saw a significant reduction in overhead which meant we could run more consumers per server and reduce the number of servers we needed to process the same number of incoming messages.

The atomic requirement is handled in AMQP using the consume/ack methods and in MongoDB you use the findAndModify command. The general form is:

db.runCommand( { findAndModify : collection, { options } } )

It will return the first document it finds that matches your options, so the options we’re interested in are:

  • query – this is the query that will be performed to return a single document. To implement the queuing we need several meta fields as part of each message. We use inProg and done so we can keep a status record and query against pending messages: {'query' : { 'inProg' : false, 'done' : false } }
  • sort – since findAndModify returns the first document it finds you may want to sort the results so you can return the oldest document (e.g. sorting by a timestamp field, or by _id which includes the timestamp as part of the ID). You could also set a priority field and give messages different levels of importance for the ones you want to process first.
  • update – we query against specific fields but also want to update them so the query continues to work (i.e. against pending messages). So we update the inProg field and also set a timestamp (used in the GC process later). {'update' : {'$set' : { 'inProg' : true, 't' : new Date() } } }

We’re not concerned with the other findAndModify options for the purpose of queuing.
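As a rough sketch of how these options map onto the Python driver: recent versions of pymongo (3.0 and later) expose findAndModify as find_one_and_update, which takes the query, the update and the sort as arguments. The helper names (new_message, claim_next, mark_done), the payload field and the choice to sort by _id are illustrative assumptions, and the queue argument is assumed to be a pymongo Collection.

```python
import datetime


def new_message(payload):
    """Build a queue document with the meta fields used for querying."""
    return {"payload": payload, "inProg": False, "done": False}


def claim_next(queue):
    """Atomically claim the oldest pending message, or return None.

    `queue` is assumed to be a pymongo Collection (pymongo 3 or later).
    """
    return queue.find_one_and_update(
        # query: only messages that are pending
        {"inProg": False, "done": False},
        # update: mark as in progress and record when it was claimed
        {"$set": {"inProg": True,
                  "t": datetime.datetime.now(datetime.timezone.utc)}},
        # sort: _id embeds a creation timestamp, so this is oldest first
        sort=[("_id", 1)],
    )


def mark_done(queue, message):
    """Flag a claimed message as finished (hypothetical helper)."""
    queue.update_one(
        {"_id": message["_id"]},
        {"$set": {"inProg": False, "done": True}},
    )
```

By default find_one_and_update returns the document as it was before the update, so the returned message will still show inProg as false even though it has been claimed. A consumer would insert documents built by new_message, call claim_next, process the result and then call mark_done.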

The findAndModify query needs to be executed periodically to pull down messages from the database. How to do this will depend on the language you’re using but we use Python and can make use of the sched module:

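[sourcecode language="python"]import sched, time

def run(self):
    # Schedule the first poll, then block while the scheduler runs
    sc = sched.scheduler(time.time, time.sleep)
    sc.enter(10, 1, self.poll, (sc,))
    sc.run()

def poll(self, sc):
    self.doStuff()  # call findAndModify and process the message
    sc.enter(10, 1, self.poll, (sc,))  # reschedule in 10 seconds[/sourcecode]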

Here, the poll method will be called every 10 seconds and it’ll call the doStuff method and then reschedule the next execution. doStuff is where you’d put your call to findAndModify, plus any processing you want to do with the returned document.

We keep these scripts running as daemons using our daemon class, which is available free under an open source license.

Implementation – GC

As part of our call to findAndModify we set a timestamp so we know when the message was pulled down. Based on how long we expect consumers to take per message, we can easily query for documents which are marked as in progress but whose timestamp is older than that.

For example if we expect items to be processed within 10 seconds:

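[sourcecode language="python"]import datetime

# Messages pulled down more than 10 seconds ago are considered stalled
now = datetime.datetime.utcnow()
difference = datetime.timedelta(seconds=10)
timeout = now - difference

# 't' is the timestamp set by the findAndModify update
queue.find({'inProg': True, 't': {'$lte': timeout}})[/sourcecode]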

We can then update those documents to reset the inProg flag, effectively returning them to the queue for another consumer to work on.
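The reset itself could look something like the sketch below, which folds the query and the update into a single update_many call (pymongo 3.0 and later). The function name requeue_stalled and its parameters are hypothetical, the queue argument is assumed to be a pymongo Collection, and the timestamp is assumed to live in the t field set when the message was claimed.

```python
import datetime


def requeue_stalled(queue, max_seconds=10, now=None):
    """Return timed-out in-progress messages to the queue.

    `queue` is assumed to be a pymongo Collection. Returns the number
    of documents that were reset.
    """
    now = now or datetime.datetime.now(datetime.timezone.utc)
    timeout = now - datetime.timedelta(seconds=max_seconds)
    result = queue.update_many(
        # in progress, but claimed longer ago than we allow
        {"inProg": True, "t": {"$lte": timeout}},
        # clear the flag so another consumer can claim the message
        {"$set": {"inProg": False}},
    )
    return result.modified_count
```

This could be run on its own schedule, separately from the consumers. The timeout needs to be comfortably longer than the slowest expected job, otherwise a message that is still being worked on may be handed to a second consumer.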

Other considerations

  • Fault tolerance – replica sets in MongoDB work extremely well to allow automatic failover and redundancy. The original reason we removed RabbitMQ was that it had no in-built support for redundancy across multiple nodes/data centres. This is now possible as of RabbitMQ 2.6.0.
  • Consistency – journaling is enabled by default in MongoDB 2.0 which means data is flushed to a journal file every 100ms (by default) and then to the data files every 60 seconds (by default). Once data is in the journal it can be considered safe. This should be used in combination with replica sets to ensure data goes to multiple locations. You can set flags on your queries to ensure that data has been written to the journal and/or flushed to disk plus written to n replica nodes if you need to ensure you don’t lose any messages in the queue.
  • Scaling – we use a capped collection for our queues. They are created as a fixed size and older documents get rolled out in insertion order so we don’t need to bother with removing documents, which can be quite an intensive operation. However, at some point you will hit a limit where the atomic nature of the findAndModify locks means you need to scale your writes across multiple machines. This has to be done using sharding of regular collections since capped collections cannot be sharded.
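To make the consistency and scaling points a little more concrete, here is a minimal sketch using pymongo 3.0 or later. The function names, the default collection name and the 64 MB size are arbitrary assumptions for illustration. In pymongo the durability flags are expressed as a write concern, where j asks for the write to reach the journal and w is the number of replica set members (counting the primary) that must acknowledge it.

```python
def create_queue(db, name="queue", size_bytes=64 * 1024 * 1024):
    """Create a capped collection; the oldest documents roll out when full.

    `db` is assumed to be a pymongo Database. The 64 MB default is arbitrary.
    """
    return db.create_collection(name, capped=True, size=size_bytes)


def durable(queue, members=2):
    """Return a view of `queue` whose writes wait for the journal and for
    acknowledgement from `members` replica set members (primary included).
    """
    # Imported here so the rest of the sketch loads without pymongo installed
    from pymongo.write_concern import WriteConcern
    return queue.with_options(write_concern=WriteConcern(w=members, j=True))
```

Inserting through the collection returned by durable will be slower than the default, since each write waits for the journal and for replication, so it is a trade-off between throughput and the risk of losing a message.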
