Replacing RabbitMQ with MongoDB

Written by David Mytton

Back in April we ditched RabbitMQ in favour of building our own simple queuing system using MongoDB. This has been in production since then and has been working very well.

Last week I gave a presentation at the MongoUK conference and London DevOps meetup going into more technical detail. The presentation is embedded below along with a full writeup.

Why use a queue?

  • Background processing – tasks that don’t need to be run immediately whilst the user waits, or even tasks that the user will wait for but which have to be run by separate processes. The key here is that the tasks get run asynchronously, so the user is either doing something else or is waiting on a status page.
  • Examples of this kind of job include sending notifications, such as by e-mail, where a short delay isn’t critical or where there are many items to send, e.g. bulk e-mailing.
  • Internal communication – streaming events throughout your infrastructure to different nodes or components.

What do you need in a queueing system?

  • Consumers – these listen in on the queue and grab messages as they come in to execute the instructions contained within the message. These are usually multiple identical processes so that many messages can be processed concurrently.
  • Atomic – you do not want multiple consumers pulling down the same message and causing duplication. So when a consumer acknowledges a message, no other consumers should be able to process it.
  • Speed – items need to be inserted into the queue very quickly. Equally, they need to be pulled out quickly. The actual processing time may vary depending on what is being done but the queue needs to handle large numbers of reads/writes.
  • Garbage collection (GC) – if a consumer dies mid-process then the message it was holding should be returned to the queue for another consumer to pick it up.

Implementation – Consumers

Interaction with RabbitMQ is done through AMQP which is a protocol designed specifically for queuing systems. Various libraries exist for different languages which implement this and although we had no problems using the Python AMQP library, the PHP libraries we tried all broke down at higher loads (although I understand there have been improvements to the PHP library since). MongoDB has similar libraries available for many different languages which all use the Mongo Wire Protocol, which we have been using at high load for several years.

Switching from the Python AMQP library (pika) to the Python Mongo driver (pymongo) saw a significant reduction in overhead which meant we could run more consumers per server and reduce the number of servers we needed to process the same number of incoming messages.

The atomic requirement is handled in AMQP using the consume/ack methods and in MongoDB you use the findAndModify command. The general form is:

db.runCommand( { findAndModify : "<collection>", <options> } )

It will return the first document it finds that matches your options, so the options we’re interested in are:

  • query – this is the query that will be performed to select a single document. To implement the queuing we need several meta fields as part of each message. We use inProg and done so we can keep a status record and query against pending messages: {'query' : { 'inProg' : false, 'done' : false } }
  • sort – since findAndModify returns the first document it finds you may want to sort the results so you can return the oldest document (e.g. sorting by a timestamp field, or by _id which includes the timestamp as part of the ID). You could also set a priority field and give messages different levels of importance for the ones you want to process first.
  • update – we query against specific fields but also want to update them so the query continues to work (i.e. against pending messages). So we update the inProg field and also set a timestamp (used in the GC process later). {'update' : {'$set' : { 'inProg' : true, 't' : new Date() } } }

We’re not concerned with the other fields for the purpose of queuing.
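
As a rough illustration, the options above could map onto the Python driver as follows. This is a minimal sketch, not a definitive implementation: it assumes a recent pymongo, where the findAndModify command is exposed as Collection.find_one_and_update, and that queue is a pymongo collection. The enqueue and claim_next helpers and the payload field are hypothetical names used only for this example.

```python
import datetime


def enqueue(queue, payload):
    """Insert a message carrying the meta fields the claim query relies on."""
    # 'payload' is a hypothetical field name for the message body.
    result = queue.insert_one({"inProg": False, "done": False, "payload": payload})
    return result.inserted_id


def claim_next(queue):
    """Atomically claim the oldest pending message; returns None if there is none."""
    return queue.find_one_and_update(
        # query: only messages that no consumer has picked up yet
        {"inProg": False, "done": False},
        # update: mark as in progress and record when it was claimed (used by GC)
        {"$set": {"inProg": True, "t": datetime.datetime.now(datetime.timezone.utc)}},
        # sort: oldest first, since _id embeds the creation timestamp
        sort=[("_id", 1)],
    )
```

By default find_one_and_update returns the document as it was before the update, which is enough here because the consumer only needs the message contents. To process by importance instead, a priority field could be placed first in the sort list.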

The findAndModify query needs to be executed periodically to pull down messages from the database. How to do this will depend on the language you’re using but we use Python and can make use of the sched module:

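import sched
import time

def run(self):
   # Build a scheduler driven by wall-clock time and run the first poll straight away
   sc = sched.scheduler(time.time, time.sleep)
   self.poll(sc)
   # Blocks for as long as events are queued; poll() always queues the next one
   sc.run()

def poll(self, sc):
   # Pull down and process pending messages
   self.doStuff()
   # Reschedule this method to run again in 10 seconds, with priority 1
   sc.enter(10, 1, self.poll, (sc,))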

Here, the poll method will be called every 10 seconds and it’ll call the doStuff method and then reschedule the next execution. doStuff is where you’d put your call to findAndModify, plus any processing you want to do with the returned document.
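
One possible shape for doStuff is sketched below, written as a standalone function so it can be read on its own. It assumes a recent pymongo collection passed in as queue and a caller-supplied handle function that does the real work; both names are hypothetical. Clearing inProg and setting done once processing finishes is an assumption about how completion might be recorded.

```python
import datetime


def do_stuff(queue, handle):
    """Claim and process pending messages until none are left.

    Returns the number of messages processed during this poll.
    """
    processed = 0
    while True:
        message = queue.find_one_and_update(
            {"inProg": False, "done": False},
            {"$set": {"inProg": True, "t": datetime.datetime.now(datetime.timezone.utc)}},
            sort=[("_id", 1)],
        )
        if message is None:
            # Nothing pending; wait for the next scheduled poll.
            return processed
        handle(message)
        # Only reached if handle() did not raise: record completion.
        queue.update_one(
            {"_id": message["_id"]},
            {"$set": {"inProg": False, "done": True}},
        )
        processed += 1
```

If handle raises, the message is left marked as in progress with its timestamp set, so it stays out of the pending query until something resets it.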

We keep these scripts running as daemons using our daemon class, which is available free under an open source license.

Implementation – GC

As part of our call to findAndModify we set a timestamp so we know when the message was pulled down. Based on how long we expect consumers to take per message, we can easily query for documents which are marked as in progress but whose timestamp is older than that.

For example if we expect items to be processed within 10 seconds:

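import datetime

# MongoDB stores dates in UTC, so compare against the current UTC time
now = datetime.datetime.now(datetime.timezone.utc)
difference = datetime.timedelta(seconds=10)
timeout = now - difference

# 't' is the timestamp set by the findAndModify update
queue.find({'inProg' : True, 't' : {'$lte' : timeout} })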

We can then update those documents to reset the inProg flag, effectively returning them to the queue for another consumer to work on.
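
The reset itself can be done in a single multi-document update. The following is a minimal sketch assuming a recent pymongo collection passed in as queue and the t field set by the findAndModify update; requeue_stalled and max_seconds are hypothetical names.

```python
import datetime


def requeue_stalled(queue, max_seconds=10):
    """Reset messages that have been in progress for longer than max_seconds.

    Returns the number of messages handed back to the queue.
    """
    timeout = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(
        seconds=max_seconds
    )
    result = queue.update_many(
        # In progress, and claimed at or before the cut-off time
        {"inProg": True, "t": {"$lte": timeout}},
        # Clearing the flag makes the message match the pending query again
        {"$set": {"inProg": False}},
    )
    return result.modified_count
```

This could be run from its own scheduled poll. Picking max_seconds too low would hand a message that is still being worked on to a second consumer, so it needs to sit comfortably above the expected processing time.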

Other considerations

  • Fault tolerance – replica sets in MongoDB work extremely well to allow automatic failover and redundancy. The original reason we removed RabbitMQ was no in-built support for redundancy across multiple nodes/data centres. This is now possible as of RabbitMQ 2.6.0.
  • Consistency – journaling is enabled by default in MongoDB 2.0, which means data is flushed to a journal file every 100ms (by default) and then to the data files every 60 seconds (by default). Once data is in the journal it can be considered safe. This should be used in combination with replica sets to ensure data goes to multiple locations. If you need to ensure you don’t lose any messages in the queue, you can set flags on your writes to confirm that data has been written to the journal and/or flushed to disk, and that it has reached n replica nodes.
  • Scaling – we use a capped collection for our queues. They are created as a fixed size and older documents get rolled out in insertion order so we don’t need to bother with removing documents, which can be quite an intensive operation. However, at some point you will hit a limit where the atomic nature of the findAndModify locks means you need to scale your writes across multiple machines. This has to be done using sharding of regular collections since capped collections cannot be sharded.
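
Because a capped collection has a fixed size, that size has to be chosen up front. A rough way to estimate it, and to create the collection from Python, is sketched below. It assumes db is a pymongo database object; the helper names and the load figures are hypothetical, and the estimate ignores per-document storage overhead.

```python
def capped_size_bytes(messages_per_second, avg_message_bytes, backlog_seconds):
    """Bytes needed to hold every message inserted during backlog_seconds."""
    return messages_per_second * avg_message_bytes * backlog_seconds


def create_queue(db, name, size_bytes):
    """Create a capped collection of a fixed size to act as the queue."""
    return db.create_collection(name, capped=True, size=size_bytes)


# Hypothetical load: 200 messages/s of about 512 bytes each, with room for
# 6 hours of inserts while nothing is being processed.
size = capped_size_bytes(200, 512, 6 * 60 * 60)
print(size)  # 2211840000 bytes, roughly 2.2 GB
```

Leaving headroom above the estimate matters because documents that roll out of a capped collection before being processed are lost.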

  • http://gravatar.com/agirbal agirbal

    Great post!
    How do you handle the fact that capped collection will throw away oldest documents as you insert?
    Do you stop inserting if you get close to the limit, or just make it so large that it’s unlikely?

    • http://www.serverdensity.com David Mytton

      We size the capped collection large enough so it can hold documents for a certain period of time without any being processed. This allows for service outages or other failures. If we stopped inserting we’d lose documents in the same way as if they rolled out, so making sure it’s sufficiently large and monitoring this as usage grows is important.

  • http://twitter.com/HybridDBA MattK (@HybridDBA)

    This is not the first mention I have seen of using MongoDB capped collections as message queues (http://www.shtylman.com/archives/217 , http://www.mattinsler.com/why-and-how-i-replaced-amazon-sqs-with-mongodb/ ).

    It presents an interesting approach, as an alternative to “traditional” queuing systems. While the established protocols like AMQP are not natively presented, for a system that can implement its own, it would be interesting to see if the advantages of a queue with direct access to the DB storage layer would be a benefit. Mongo’s replication and sharding may also bring options to scale out.

    I am certain that there are cases where MongoDB does not fit in well where an ActiveMQ or RabbitMQ excels, but I have also seen some AMQ scenarios that did not go too smoothly. It is always good to have more than one tool in the bag.

    • http://www.serverdensity.com David Mytton

      That’s right, it should be quite clear from the post that this is an extremely simple queue and we don’t use any of the functionality in RabbitMQ like QoS or throttling. This could be possible with Mongo or within your own consumer code but then you get to the point of reinventing the wheel when there is a perfectly good solution (RabbitMQ + AMQP) already there.

      The point is that if you already know Mongo and have basic requirements then it can save you having to learn new tech and/or simplify components of the infrastructure by using a single technology.

  • http://twitter.com/flaper87 FlaPer87 (@flaper87)

    Awesome,

    Did you try kombu? It also implements mongodb as transport for messages.

    Great post!

  • http://twitter.com/brodrigu Brad Rodriguez (@brodrigu)

    I’m curious as to how many “messages” you are pushing through Mongo. Although I’m sure it works well for your use case, the obvious concern with using Mongo as a message queue is that it would not be able to meet the performance of a “real” message queue system at scale.

    Did you benchmark this setup? If so, what kind of throughput were you able to achieve?

    • http://www.serverdensity.com David Mytton

      The primary reason for switching was the lack of redundancy in RabbitMQ. We knew replication works well in MongoDB and that it could scale by using sharding, as we currently do for our main data store. We didn’t do any formal benchmarks but observed that we could process messages significantly faster in MongoDB, run more consumers per server and reduce the total number of servers we needed.

      • http://twitter.com/kapilvt kapil

        interesting article, i’m going to give it a try in a new app. Also fwiw, rabbitmq 2.6 does have redundancy (replicated queues).

  • http://matthewpskelton.wordpress.com Matthew Skelton (@matthewpskelton)

    Interesting post/slides and comments – thanks for sharing.

    Did you need to achieve HA across more than one DC? Having heard some discussions on MongoDB (e.g. http://www.infoq.com/presentations/Why-I-Chose-MongoDB-for-Guardian) it seems that multi-site can be a challenge.

  • http://sassysaas.wordpress.com blindman2k

    Nice post. I implemented the same thing in my SMS gateway (www.mittosms.com) and the performance was fantastic for a while. Eventually I hit a point where Mongo did become a bottleneck (the consumer contention got too much) and I moved to a combination of Beanstalkd and MongoDB. MongoDB now holds the data and Beanstalkd holds the IDs in queue. Beanstalkd (kr.github.com/beanstalkd/) is lightning fast and non-blocking which solved the issue. I am now able to hit my targeted 500+ transactions per second and I am starting to overrun TCP buffers instead :)

    Personally, I would never use capped collections for critical data. It will bite you in the rear at some point in the future. It has a number of limitations and bugs that have been documented over the years that make me nervous and even if it works perfectly there is a chance you won’t know until it is too late that you have too many items in the queue.

  • Mark

    Have you seen any issues with concurrency using findAndModify? We’ve got pretty much the same setup (we just don’t set the timestamp of when the job was marked active) but I found that processes were somehow able to pick up the same job and process it at the same time.

    • http://blog.serverdensity.com/ David Mytton

      I’ve never seen that – findAndModify is supposed to be atomic so if you are seeing this I suggest reporting it to 10gen.