Replication Across the Country

The MongoDB .NET driver recently had an issue reported that turned out to be a bug on our part. It is a subtle bug that wouldn't have shown up except in a specific replica set configuration. I'll first discuss the new behavior in 1.6 regarding read preferences for replica set members, and then discuss the configuration and what actually happened.

A replica set is MongoDB's name for a group of servers that have one primary and N secondaries. Standard setups usually include 3 replica set members: 1 primary and 2 secondaries. All write operations go to the primary, and all reads are governed by the stated read preference and tagging. Together, read preferences and tagging form a way of targeting a specific server or group of servers in the cluster for reads. In a heavy read/write environment where not-immediately-up-to-date reads are acceptable (most scenarios actually fall into this camp), one way to load balance your cluster is to allow secondaries to serve up data for reading while the primary takes care of writing.

There are a number of read preferences: Primary, PrimaryPreferred, Secondary, SecondaryPreferred, and Nearest. SecondaryPreferred means to read from a secondary if one is available and otherwise read from the primary. In addition, when choosing a secondary, we only consider secondaries within 15 milliseconds (by default) of the lowest secondary ping time. We do this to ensure that your reads are generally as fast as possible.

For example, in the setup below with one primary and 4 secondaries, we'd randomly choose from servers B, C, and E when using the SecondaryPreferred read preference. D would be excluded because its ping time is 17ms above the lowest secondary's ping time.

Server   Type        Ping Time
A        Primary     3ms
B        Secondary   7ms
C        Secondary   2ms
D        Secondary   19ms
E        Secondary   11ms
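As a rough sketch of the selection rule (illustrative code only; SecondarySelector and its method are made-up names, not the driver's actual internals), the filtering might look like this:

```csharp
using System.Collections.Generic;
using System.Linq;

static class SecondarySelector
{
    // Keep only secondaries whose ping time is within the acceptable
    // latency window (15ms by default) of the fastest secondary.
    public static List<string> EligibleSecondaries(
        Dictionary<string, double> secondaryPingsMs, double windowMs = 15)
    {
        double fastest = secondaryPingsMs.Values.Min();
        return secondaryPingsMs
            .Where(kv => kv.Value <= fastest + windowMs)
            .Select(kv => kv.Key)
            .OrderBy(name => name)
            .ToList();
    }
}
```

Feeding in the secondaries from the table above (B=7ms, C=2ms, D=19ms, E=11ms) leaves B, C, and E; D sits 17ms above the fastest secondary, outside the 15ms window.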

Cloud providers like EC2 and Azure offer the possibility to stand up replica set members in different regions of the country. This is great because when an entire region goes down, your app can still function by reading from the servers in other regions. In the case of the bug mentioned at the top of this post, a 2-member replica set existed where the primary was in Region 1 and a secondary was in Region 2. In addition, the web application was located in Region 1. Using the read preference SecondaryPreferred, every single read has to exit Region 1 and go all the way to Region 2 to get data. This distance imposed a ~100ms penalty.

Our bug manifested itself because of this ping time lag between the regions. Even though we were supposed to choose the secondary, we didn't, because its ping time was so much slower than that of the primary. The fix for us is easy, but the customer has to wait for us to release it. Hence, we suggested a better setup of the cluster to remedy the problem in the meantime. Simply standing up a new secondary in Region 1 sends all reads to this secondary and all writes to the primary, while the secondary in Region 2 is used for failover and backup. I'd actually suggest this setup regardless of the bug.

We'll be fixing this bug in version 1.6.1, but be aware of your lag times when using geographically disparate data centers.

Disconnecting with the MongoDB .NET Driver

The MongoDB .NET Driver has a public method called Disconnect on the MongoServer class.  This method is somewhat useful in certain contexts, such as when the OS is shutting down or the application is exiting.  However, it is extremely important to know what this method does before using it, because it could kill your application.

The documentation simply states that this method causes the client to disconnect from the server.  In other words, it terminates all connections to all the servers and shuts down any in-flight operations.  This isn't your standard ADO.NET connection at all.  In fact, MongoServer isn't a connection at all, but rather a proxy to 1 or more mongod or mongos processes.

In addition, the documented way to get access to a MongoServer is to use the static Create method.  MongoServer.Create() and all its overloads actually return the same instance when the specified connection settings match those of a previously created MongoServer.  Therefore, the documented behavior of Disconnect is even more unexpected when this information gets factored in.
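To make that caching behavior concrete, here is a simplified sketch of the pattern (illustrative only; FakeMongoServer and its members are made up and this is not the driver's actual implementation): the factory keys instances by their settings and hands the same object back to every caller.

```csharp
using System.Collections.Generic;

// A factory that returns a shared instance per distinct settings value,
// mimicking how a Create method can hand the same proxy to all callers.
class FakeMongoServer
{
    static readonly Dictionary<string, FakeMongoServer> Instances =
        new Dictionary<string, FakeMongoServer>();
    static readonly object Lock = new object();

    public string ConnectionString { get; }

    FakeMongoServer(string connectionString)
    {
        ConnectionString = connectionString;
    }

    public static FakeMongoServer Create(string connectionString)
    {
        lock (Lock)
        {
            if (!Instances.TryGetValue(connectionString, out var server))
            {
                server = new FakeMongoServer(connectionString);
                Instances[connectionString] = server;
            }
            return server;
        }
    }
}
```

This is exactly why Disconnect is so dangerous: calling it through one reference tears down the connections underneath every other consumer sharing that same instance.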

There is a good reason for this method.  It cleanly disposes of all the resources associated with the many connections and sockets it manages.  So it's useful when an application is exiting or the OS is shutting down.  However, most people call Disconnect simply because it's there and it seems like the right thing to do.

We've started working on the next version (2.0) of the driver and are working through issues such as this one, cleaning up and correcting the API so that it matches expectations and makes it much more difficult to do the wrong thing.

Brewer’s CAP Theorem

Brewer's CAP theorem is an important concept in scalability discussions. The theorem states that only two of the three properties, Consistency, Availability, and Partition tolerance, are simultaneously achievable. Below are three illustrations of how this works. For the purposes of these examples, we will imagine a cluster of three storage nodes used to store user profiles.

Scenario A: Sacrificing Partition Tolerance

On each of the three nodes, we will only store a subset of the user profiles. This is called sharding. Node one will have users A-H, node two I-S, and node three T-Z. As long as each node is up and running, we have achieved three times the throughput of a single node, as each node serves only a third of the traffic (assuming, of course, that user profile querying and updating is uniformly distributed through the alphabet). Consistency is achieved because immediately after data is written, it is accessible. Availability is achieved because each server is accessible in real time. However, we have lost partition tolerance: disabling one server renders a certain section of users unreachable. It also means that upon hardware failure, data could be permanently lost. All in all, not a good sacrifice under most circumstances.
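The range-based routing in this scenario can be sketched as follows (ProfileRouter is a hypothetical helper, not a real API):

```csharp
using System;

static class ProfileRouter
{
    // Route a user profile to a node by the first letter of the name:
    // node 1 holds A-H, node 2 holds I-S, node 3 holds T-Z.
    public static int NodeFor(string userName)
    {
        if (string.IsNullOrEmpty(userName))
            throw new ArgumentException("user name required", nameof(userName));
        char first = char.ToUpperInvariant(userName[0]);
        if (first >= 'A' && first <= 'H') return 1;
        if (first >= 'I' && first <= 'S') return 2;
        return 3;
    }
}
```

If node 2 goes down, every lookup that NodeFor routes there fails; no other node can answer for those users, which is the lost partition tolerance described above.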

Scenario B: Sacrificing Availability

On each of the three nodes, we will store all the user profiles. Furthermore, to guarantee data consistency and prevent data loss, we will ensure that every write into the system happens on all three nodes before it is completed. So, if we were to update a profile for Bob McBob, any subsequent queries or writes on Bob McBob's profile would be blocked until the update completed. Even worse, when one of the nodes is lost, all three writes are still required, so our entire system is unavailable until the node is restored. This means that while our data is consistent and protected, we have sacrificed the availability of the data. This is a reasonable sacrifice in some systems. However, our goal is scalability, and this does not fit that requirement.
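A sketch of the write rule in this scenario (illustrative types, not a real storage API): the write is refused unless every node can apply it, which is precisely what makes the system unavailable for writes when a node is down.

```csharp
using System.Collections.Generic;
using System.Linq;

class StorageNode
{
    public bool IsUp = true;
    public Dictionary<string, string> Profiles = new Dictionary<string, string>();
}

static class SyncReplicatedStore
{
    // A write completes only if every node is up and applies it;
    // with any node down, the whole system refuses writes.
    public static bool TryWrite(IList<StorageNode> nodes, string user, string profile)
    {
        if (nodes.Any(n => !n.IsUp)) return false;
        foreach (var node in nodes) node.Profiles[user] = profile;
        return true;
    }
}
```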

Scenario C: Sacrificing Consistency

On each of the three nodes, we will store all the user profiles. However (and different from scenario B), we will acknowledge a completed write immediately and not wait for the other two nodes. This means that if a read comes in on node two for data written on node one, it may or may not be up-to-date depending on the latency of replication. We are still highly available and still partition tolerant (within the latency it takes to replicate to a second node).
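The contrast with scenario B can be sketched the same way (again with illustrative types): the write is acknowledged after touching a single replica, and replication to the others happens later, so a read from another replica can be stale in between.

```csharp
using System.Collections.Generic;

class Replica
{
    public Dictionary<string, string> Profiles = new Dictionary<string, string>();
}

static class AsyncReplicatedStore
{
    // Acknowledge the write as soon as one replica has it.
    public static void Write(Replica primary, string user, string profile)
    {
        primary.Profiles[user] = profile;
    }

    // Replication runs later (here, on demand) to catch the others up;
    // until it does, reads from the other replicas return stale data.
    public static void CatchUp(Replica from, Replica to)
    {
        foreach (var kv in from.Profiles) to.Profiles[kv.Key] = kv.Value;
    }
}
```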

A majority of the time, scenario C is the chosen path, for a couple of reasons. First, most business use cases do not require up-to-the-second information. Take, for instance, a generated report on the sales in a given region. While the business user may request "live" data, monitoring the usage of such a report will likely look as follows: 1) the user prints the report and waits for x time (perhaps some coffee is obtained); 2) the user imports the report into Excel and slices and dices the data for y time; 3) the user acts upon the information. Overall, the decision is delayed from the "live" data by x + y, which is most likely on the order of hours and is definitely not based on "live" data.

Second, a business benefit can be garnered in some cases. Let's take an ATM, for instance. Upon a withdrawal of funds, the ATM looks at the data available to decide whether or not to allow the transaction to proceed. It is not aware of any pending transfers to or from the account, and is definitely not aware of what occurred in the last x minutes. If you were to use a mobile phone to move some money out of your account and then ask the ATM for a balance, the account would look no different, and the ATM would allow an overdraft. Ultimately, the bank, by choosing "eventual" consistency, has collected itself a fee.

In my next post, I’d like to discuss how this eventually consistent model applies to Event Sourcing and how we can structure our applications to take advantage of this in the context of enforcing transactional consistency.

Event Sourcing as the Canonical Source of Truth

Event Sourcing (ES) is a concept that enables us to peruse the history of our system and know its state at any point in time.  A few reasons this is important range from investigating a bug that only occurs under certain conditions to understanding why something was changed (why was customer X's address changed?).  Another distinct advantage of event sourcing is that we could rebuild an entire data store (SQL tables, MongoDB collections, flat files, etc…) by replaying each event against a listener.  This would look something like the code below:

var listeners = GetAllListeners();

foreach (var @event in GetAllEvents())
{
    foreach (var listener in listeners)
    {
        listener.Handle(@event);
    }
}
It's quite simple and elegant, but more important is that the event store becomes the canonical data store; the single source of truth.  This alone yields some interesting possibilities.  One of which I'm quite fond of arises upon discovery of a poorly conceived database schema.  It is quite simple to redesign the schema and build it up as if it had been in place from day one, utilizing a likeness of the above code snippet.  In the same vein, imagine two separate applications needing access to the same data but having very different business models.  Instead of each application consuming a schema that makes sense for only one (or neither, due to compromise), each has its own model serving its own needs.  Since neither is the canonical store of the data, duplication of data isn't something to be frightened of.

One thing to note as the discussion moves into scalability is that employing the denormalized schema design enabled by ES already increases our ability to scale.  When intersection of sets (SQL joins) is unnecessary, queries against relational data sources perform much faster.

At this point, I have posted three articles, an introduction on how I got to where I am now, a discussion of CQRS, and now a discussion of ES.  I’d like to come full circle and discuss how CQRS + ES can be used to achieve further scalability, but first I need to address Brewer’s CAP Theorem and how it forms the backbone of many design decisions related to scalability.

Managing Complexity with CQRS

CQRS stands for command-query responsibility segregation.  It literally means to separate your commands from your queries, your reads from your writes.

This can take on many forms, the simplest being command messages that differ from query messages.  It might seem obvious when stated like this, but I guarantee you have violated this idea numerous times.  I know I have.  For instance, take the example below of a client utilizing a service to update a customer.

var customer = _service.GetCustomer(10);

customer.Address.Street = "1234 Blah St.";

_service.UpdateCustomer(customer);

The example above has 2 interesting characteristics.  First, we are sending the entire customer object back to update a single field in the address.  This isn’t necessarily bad, but it brings me to the second observation.

When looking through the history of a customer, it is impossible to discern why the street was changed (if at all).  The business intent of the change is missing.  Did the customer move?  Was there a typo in the street?  These are very different intentions that mean very different things.  Take, for instance, a business which sends letters confirming receipt of the change when a customer moves.  The above snippet of code only tells us that the address has changed, not why.

Perhaps this would look a little better.

var customer = _service.GetCustomer(10);

var address = customer.Address;

address.Street = "1234 Blah St.";

_service.CustomerHasMovedTo(customer.Id, address);

//or for a typo

_service.CorrectAddress(customer.Id, address);

While the above code may be a little more verbose, the intent is clear and the business can act accordingly.  Perhaps the business could disallow moving a customer from Arizona (AZ) to Arkansas (AR) while still allowing a typo correction for an address that was supposed to be in Arkansas but was input incorrectly as Arizona.

In addition to business intent being important in the present, it is also important in the past.  The ability to reflect over historical events can prove an invaluable asset to a business.  In my next post, I’d like to discuss the Event Sourcing pattern.

My Road to CQRS

I remember my first job after graduating from college was building an internal .NET application interfacing with a legacy system.  This legacy system ran on an AS400 and was written in RPG, utilizing a DB2 database for data storage.  When I looked at the database schema, I was horrified.  Devoid of any form of normalization, it suggested the real world didn't build applications the way my school had taught.

That wasn't true, however.  The remainder of that job, every job thereafter, and most of the articles I read on the interwebs espoused the same principles I was taught: normalize your data, maintain referential integrity, run write operations inside a transaction, etc…  This seemed the universally accepted way to build systems.  So I continued on this learned trajectory, churning out quality software meeting business requirements as specified.  When the database had trouble handling my 17-join query, we cached the results at the application layer.  When we could deal with day-old data, we ran ETL processes at night to pre-calculate the source of the burdensome queries.

In retrospect, I wish these situations had triggered my memory of that first legacy system.  The application cache and the extracted tables mirrored those early schemas that kept me up at night.  And worse still, these objects were not just used to read the data; they were also used to update it.  While I preached the principles of SOLID, I ignorantly violated the first letter of the acronym.

So, what did those original developers know that I didn't?  The original system was built to run on a mainframe with distributed terminal clients.  The mainframe does all the work while the clients simply view screens and then issue commands or queries to change the view or update the data, respectively.  That very closely resembles the architecture of the web: a web server on a box with a number of browsers connecting to it to view data or post forms.  These days, our web servers can handle a whole lot of load (especially when load balanced), much more than the original mainframes.  So, the mainframe guys supporting distributed clients were like a website supporting gazillions of hits a day (an hour?).  How did they manage this complexity?

CQRS stands for command-query responsibility segregation.  It literally means separating your commands from your queries; your reads from your writes.  They are responsible for different things.  Reads don't have any business logic in them (aside from authorization, perhaps).  So why did I keep insisting on a single model to rule them all?

This may sound complex and it can be.  In my next post, I want to delve into how CQRS can help us manage this complexity.

MongoSV Conference

I just returned from the MongoDB conference in San Jose on Saturday.  Because I'm a MongoDB Master, I was able to attend the Masters Summit the day before the conference.  We did it unconference style and let each topic self-select based on what we wanted to talk about.  I discussed a lot of Windows-related things like performance counters and SCOM integration, as well as how to evangelize to the Microsoft community as a whole.  10gen is really looking to expand into this area more than they have in the past.

One of these efforts is that MongoDB now runs on Azure.  This is cool because it gives another possibility for scaling in the cloud.  Azure already offers 3 forms of data storage: SQL, Table, and Blob.  Blob is just a filesystem and is suitable for binary items like images.  Table storage is a way to store large quantities of non-relational data.  It is relatively cheap, as is Blob.  The last is SQL, where Azure supports storing relational data.  SQL Azure, however, is extremely expensive compared to Blob and Table storage.

MongoDB fits in between Table storage and SQL storage.  Underneath, it uses Blob storage to keep the data, making it much cheaper than SQL Azure.  MongoDB does not represent its data in relational form, but rather in document form.  However, unlike Table storage, MongoDB is fully queryable, fully indexable, and super fast.  It is a great alternative for bridging the gap between dynamic queries and fully relational data.

All in all, I thoroughly enjoyed my time and hope to continue it through contact with the other Masters and feedback to 10gen.

Attending MongoSV

I'll be attending MongoSV in California over the next two days.  Day 1 will be a summit for the MongoDB Masters group (of which I am a member).  We'll be discussing anything and everything about MongoDB with hopes of influencing its future direction.

Day 2 will be more interesting.  As a .NET developer, I'm thoroughly interested in all things related to Microsoft.  A few days ago, 10gen announced that MongoDB has support for running on Azure.  In fact, Microsoft will be speaking on the topic at the conference.  This is totally interesting because it lets us marry a scalable infrastructure with a scalable database without having to sacrifice either one for the other.  I have nothing against SQL Server and use it for all my transactional business needs.  However, when building systems to scale, transactional business models are not the correct choice.  I'll talk more on this topic in my next post on CQRS.

Until then, I’ll take notes and blog my thoughts about the direction 10gen is going with MongoDB in the future.

MongoDB Open Source Efforts

I actively (when I have time) work on a couple of open-source projects.  Both of them are related to the MongoDB C# Driver (to which I contributed a lot of code as well).

The first is FluentMongo, which is a LINQ provider on top of the driver.  This was sucked out of an older C# driver (now defunct) to which I was a core committer along with Steve Wagner and Sam Corder.  Writing LINQ providers is incredibly difficult, and I was so proud of my effort in the defunct project that I didn't want it to go to waste; so I ported it over, since the official driver did not have one (and still doesn't).

The second project is Simple.Data.MongoDB.  If you haven't yet played with Simple.Data, then you are missing out.  It completely abuses the point of C# 4's dynamic keyword to build an Active Record-style data layer in .NET.  It is a great fit for MongoDB because neither requires a schema.  Simple.Data was built for relational databases, but working with Mark Rendle has been a pleasure, and he has changed some of the core to accommodate a different style of database.

Anyway, I just wanted to get this stuff out there, and I'll keep these posts updated as I add features to either one.