Tag Archive: nosql


Notes on MongoDB

For an article in a German magazine I’ve been researching MongoDB over the last week or so. While I didn’t need a lot of the information I came across I collected some nicely distilled notes on some of its inner workings. You won’t find information on how to get data out of or into MongoDB. The notes deal with the way MongoDB treats and handles your data, a high-low-level view if you will. I tried to keep them as objective as possible, but I added some commentary below.

Most of this is distilled knowledge I gathered from the MongoDB documentation; credit for making such a good resource available for us to read goes to the Mongo team. I added some of my own conclusions where it made sense. They’re doing a great job documenting it, and I can highly recommend spending time to go through as much of it as possible to get a good overview of the whys and hows of MongoDB. Also, thanks to Mathias Stearn for hooking me up with some more details on future plans and inner workings in general. If you want to know more about its inner workings, there’s a webcast coming up where they’re gonna explain how it works.

HandlerSocket mySQL’s NoSQL, PHP and Webscale

HandlerSocket is a server plug-in that interfaces with InnoDB directly, bypassing for the most part the core mySQL server altogether. Using HandlerSocket you do not connect to the traditional port 3306 or use the mySQL protocol to communicate with the mySQL server; you use port 9998 for reads and port 9999 for writes, and talk a different (much smaller) protocol to an epoll-based service. This awesome addition means that you can have many, many connections with very little overhead. The core mySQL server does a good job, but can do better. I’m sure the reason why epoll is not in the core engine is probably due to licensing issues or some other reason I am not aware of, like it would be a big deal to add it.

Why bypass the server? Really, bypassing the SQL parser is what was desired. The SQL parser adds a significant amount of slowdown at a huge transaction rate; additionally, mutex contention is involved prior to reaching the storage engine, along with malloc overhead (although Monty has fixed this in MariaDB). Even though you can easily get 45-70K selects a second through the mySQL SQL parser layer for InnoDB primary key lookups, this number falls short of Memcache’s 600K Gets per second or various other NoSQL solutions that keep data in memory.
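To give a feel for how small that protocol is, here is a minimal sketch that only builds request lines and does no network I/O. It assumes the framing described in the HandlerSocket protocol notes (TAB-separated tokens, one request per LF-terminated line); the database, table, and column names are made up, and escaping of special bytes is left out on purpose.

```python
# Sketch of HandlerSocket request framing (no sockets involved).
# Assumption: tokens are TAB-separated and each request ends with LF.
# Escaping of control bytes and NULL handling are deliberately omitted.

READ_PORT = 9998   # read listener mentioned in the post
WRITE_PORT = 9999  # write listener mentioned in the post


def open_index(index_id, db, table, index, columns):
    """Build an 'open index' request: P <id> <db> <table> <index> <columns>."""
    tokens = ["P", str(index_id), db, table, index, ",".join(columns)]
    return "\t".join(tokens) + "\n"


def find(index_id, op, keys, limit=1, offset=0):
    """Build a lookup request: <id> <op> <nkeys> <k1..kn> <limit> <offset>."""
    tokens = [str(index_id), op, str(len(keys)), *keys, str(limit), str(offset)]
    return "\t".join(tokens) + "\n"


# Hypothetical schema: database 'test', table 'user', lookup by primary key.
req_open = open_index(0, "test", "user", "PRIMARY", ["user_name", "user_email"])
req_find = find(0, "=", ["101"])
```

There is no SQL text to parse here at all, only an index handle, an operator, and the key values, which is the whole point of going around the parser.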

Most high-scale web applications use MySQL + memcached. Many of them also use NoSQL stores like TokyoCabinet/Tyrant. In some cases people have dropped MySQL and have shifted to NoSQL. One of the biggest reasons for such a movement is that NoSQL is said to perform better than MySQL for simple access patterns such as primary key lookups. Most queries from web applications are simple, so this seems like a reasonable decision.
Like many other high-scale web sites, we at DeNA(*) had similar issues for years. But we reached a different conclusion. We are using “only MySQL”. We still use memcached for front-end caching (i.e. preprocessed HTML, count/summary info), but we do not use memcached for caching rows. We do not use NoSQL, either. Why? Because we could get much better performance from MySQL than from other NoSQL products. In our benchmarks, we could get 750,000+ qps on a commodity MySQL/InnoDB 5.1 server from remote web clients. We have also seen excellent performance in production environments.
Maybe you can’t believe the numbers, but this is a real story. In this long blog post, I’d like to share our experiences.
(*) For those who do not know.. I left Oracle in August 2010. Now I work at DeNA, one of the largest social game platform providers in Japan. Continue reading “Using MySQL as a NoSQL – A story for exceeding 750,000 qps on a commodity server” »

HBase Architecture 101 – Storage

One of the more hidden aspects of HBase is how data is actually stored. While the majority of users may never have to bother with it, you may have to get up to speed when you want to learn what the various advanced configuration options at your disposal mean. “How can I tune HBase to my needs?” and other similar questions are certainly interesting once you get over the (at times steep) learning curve of setting up a basic system. Another reason for wanting to know more is that, if for whatever reason disaster strikes, you may have to recover a HBase installation.

In my own efforts getting to know the respective classes that handle the various files I started to sketch a picture in my head illustrating the storage architecture of HBase. But while the ingenious and blessed committers of HBase easily navigate back and forth through that maze I find it much more difficult to keep a coherent image. So I decided to put that sketch to paper. Here it is.

Please note that this is not a UML or call graph but a merged picture of classes and the files they handle. It is by no means complete, but it focuses on the topic of this post. I will discuss the details below and also look at the configuration options and how they affect the low-level storage files.

The Big Picture

So what does my sketch of the HBase innards really say? You can see that HBase basically handles two kinds of files. One is used for the write-ahead log and the other for the actual data storage. The files are primarily handled by the HRegionServers. But in certain scenarios even the HMaster will have to perform low-level file operations. You may also notice that the actual files are in fact divided up into smaller blocks when stored within the Hadoop Distributed Filesystem (HDFS). This is also one of the areas where you can configure the system to handle larger or smaller data better. More on that later.

The general flow is that a new client first contacts the Zookeeper quorum (a separate cluster of Zookeeper nodes) to find a particular row key. It does so by retrieving the server name (i.e. host name) that hosts the -ROOT- region from Zookeeper. With that information it can query that server to get the server that hosts the .META. table. Both of these details are cached and only looked up once. Lastly it can query the .META. server and retrieve the server that has the row the client is looking for.

Once it has been told where the row resides, i.e. in what region, it caches this information as well and contacts the HRegionServer hosting that region directly. So over time the client has a pretty complete picture of where to get rows from without needing to query the .META. server again.
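The lookup chain and its caching can be sketched as a toy model. This is not the HBase client API: the dictionaries stand in for Zookeeper, -ROOT- and .META., and all server names and region boundaries are made up for illustration.

```python
# Toy model of the lookup chain: Zookeeper -> -ROOT- -> .META. -> region server.
# Every name and table content below is hypothetical.

ZOOKEEPER = {"root-region-server": "srv1"}        # who hosts -ROOT-
ROOT_TABLE = {"srv1": {".META.": "srv2"}}         # -ROOT- knows who hosts .META.
# .META. rows: (region start key, region end key, hosting server); "" = open end
META_TABLE = {"srv2": [("", "m", "srv3"), ("m", "", "srv4")]}


def in_region(row, start, end):
    """True if row falls into the half-open key range [start, end)."""
    return row >= start and (end == "" or row < end)


class Client:
    def __init__(self):
        self.meta_server = None    # cached after the first lookup
        self.region_cache = []     # regions this client already knows about
        self.remote_lookups = 0    # trips to Zookeeper, -ROOT- or .META.

    def locate(self, row):
        """Return the server hosting the region that contains row."""
        for start, end, server in self.region_cache:
            if in_region(row, start, end):
                return server                      # served from the cache
        if self.meta_server is None:
            root_server = ZOOKEEPER["root-region-server"]
            self.meta_server = ROOT_TABLE[root_server][".META."]
            self.remote_lookups += 2               # Zookeeper plus -ROOT-
        self.remote_lookups += 1                   # one .META. query
        for start, end, server in META_TABLE[self.meta_server]:
            if in_region(row, start, end):
                self.region_cache.append((start, end, server))
                return server
        raise KeyError(row)
```

The first call pays for three remote lookups, a second row in the same region costs none, and a row in a different region costs only one more .META. query.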

Note: The HMaster is responsible for assigning the regions to each HRegionServer when you start HBase. This also includes the “special” -ROOT- and .META. tables.

Next, when the HRegionServer opens the region, it creates a corresponding HRegion object. When the HRegion is “opened” it sets up a Store instance for each HColumnFamily for every table as defined by the user beforehand. Each of the Store instances can in turn have one or more StoreFile instances, which are lightweight wrappers around the actual storage file called HFile. A HRegion also has a MemStore and a HLog instance. We will now have a look at how they work together but also where there are exceptions to the rule.

Stay Put

So how is data written to the actual storage? The client issues a HTable.put(Put) request to the HRegionServer, which hands the details to the matching HRegion instance. The first step is now to decide if the data should first be written to the “Write-Ahead-Log” (WAL) represented by the HLog class. The decision is based on the flag set by the client using the Put.writeToWAL(boolean) method. The WAL is a standard Hadoop SequenceFile (although it is currently being discussed whether that should be changed to a more HBase-suitable file format) and it stores HLogKeys. These keys contain a sequential number as well as the actual data and are used to replay not yet persisted data after a server crash.

Once the data is written (or not) to the WAL it is placed in the MemStore. At the same time it is checked if the MemStore is full and in that case a flush to disk is requested. When the request is served by a separate thread in the HRegionServer it writes the data to an HFile located in the HDFS. It also saves the last written sequence number so the system knows what was persisted so far. Let’s have a look at the files now.
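The write path can be condensed into a toy model. This is a sketch under simplifying assumptions, not HBase code: the flush happens synchronously instead of in a separate thread, "full" is measured in number of entries, and the class and attribute names are invented.

```python
class ToyRegion:
    """Toy write path: optional WAL append, MemStore insert, flush when full."""

    def __init__(self, flush_size=3):
        self.wal = []               # stand-in for the HLog: (sequence id, row, value)
        self.memstore = {}          # in-memory buffer of recent writes
        self.hfiles = []            # every flush adds one immutable, sorted "file"
        self.flush_size = flush_size  # toy threshold, counted in entries
        self.next_seq = 0
        self.last_flushed_seq = -1  # highest sequence id persisted so far

    def put(self, row, value, write_to_wal=True):
        seq = self.next_seq
        self.next_seq += 1
        if write_to_wal:            # mirrors the flag the client sets on the Put
            self.wal.append((seq, row, value))
        self.memstore[row] = value
        if len(self.memstore) >= self.flush_size:
            self.flush()

    def flush(self):
        """Persist the MemStore as a sorted file and remember the sequence id."""
        self.hfiles.append(sorted(self.memstore.items()))
        self.memstore = {}
        self.last_flushed_seq = self.next_seq - 1

    def wal_entries_to_replay(self):
        """WAL entries that would have to be replayed after a crash."""
        return [entry for entry in self.wal if entry[0] > self.last_flushed_seq]
```

The saved sequence number is what makes replay cheap: everything in the log at or below it is already in a file and can be skipped.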

Files

HBase has a configurable root directory in the HDFS but the default is /hbase. You can simply use the DFS tool of the Hadoop command line tool to look at the various files HBase stores.

$ hadoop dfs -lsr /hbase/docs

drwxr-xr-x - hadoop supergroup 0 2009-09-28 14:22 /hbase/.logs
drwxr-xr-x - hadoop supergroup 0 2009-10-15 14:33 /hbase/.logs/srv1.foo.bar,60020,1254172960891
-rw-r--r-- 3 hadoop supergroup 14980 2009-10-14 01:32 /hbase/.logs/srv1.foo.bar,60020,1254172960891/hlog.dat.1255509179458
-rw-r--r-- 3 hadoop supergroup 1773 2009-10-14 02:33 /hbase/.logs/srv1.foo.bar,60020,1254172960891/hlog.dat.1255512781014
-rw-r--r-- 3 hadoop supergroup 37902 2009-10-14 03:33 /hbase/.logs/srv1.foo.bar,60020,1254172960891/hlog.dat.1255516382506

-rw-r--r-- 3 hadoop supergroup 137648437 2009-09-28 14:20 /hbase/docs/1905740638/oldlogfile.log

drwxr-xr-x - hadoop supergroup 0 2009-09-27 18:03 /hbase/docs/999041123
-rw-r--r-- 3 hadoop supergroup 2323 2009-09-01 23:16 /hbase/docs/999041123/.regioninfo
drwxr-xr-x - hadoop supergroup 0 2009-10-13 01:36 /hbase/docs/999041123/cache
-rw-r--r-- 3 hadoop supergroup 91540404 2009-10-13 01:36 /hbase/docs/999041123/cache/5151973105100598304
drwxr-xr-x - hadoop supergroup 0 2009-09-27 18:03 /hbase/docs/999041123/contents
-rw-r--r-- 3 hadoop supergroup 333470401 2009-09-27 18:02 /hbase/docs/999041123/contents/4397485149704042145
drwxr-xr-x - hadoop supergroup 0 2009-09-04 01:16 /hbase/docs/999041123/language
-rw-r--r-- 3 hadoop supergroup 39499 2009-09-04 01:16 /hbase/docs/999041123/language/8466543386566168248
drwxr-xr-x - hadoop supergroup 0 2009-09-04 01:16 /hbase/docs/999041123/mimetype
-rw-r--r-- 3 hadoop supergroup 134729 2009-09-04 01:16 /hbase/docs/999041123/mimetype/786163868456226374
drwxr-xr-x - hadoop supergroup 0 2009-10-08 22:45 /hbase/docs/999882558
-rw-r--r-- 3 hadoop supergroup 2867 2009-10-08 22:45 /hbase/docs/999882558/.regioninfo
drwxr-xr-x - hadoop supergroup 0 2009-10-09 23:01 /hbase/docs/999882558/cache
-rw-r--r-- 3 hadoop supergroup 45473255 2009-10-09 23:01 /hbase/docs/999882558/cache/974303626218211126
drwxr-xr-x - hadoop supergroup 0 2009-10-12 00:37 /hbase/docs/999882558/contents
-rw-r--r-- 3 hadoop supergroup 467410053 2009-10-12 00:36 /hbase/docs/999882558/contents/2507607731379043001
drwxr-xr-x - hadoop supergroup 0 2009-10-09 23:02 /hbase/docs/999882558/language
-rw-r--r-- 3 hadoop supergroup 541 2009-10-09 23:02 /hbase/docs/999882558/language/5662037059920609304
drwxr-xr-x - hadoop supergroup 0 2009-10-09 23:02 /hbase/docs/999882558/mimetype
-rw-r--r-- 3 hadoop supergroup 84447 2009-10-09 23:02 /hbase/docs/999882558/mimetype/2642281535820134018
drwxr-xr-x - hadoop supergroup 0 2009-10-14 10:58 /hbase/docs/compaction.dir

The first set of files are the log files handled by the HLog instances, which are created in a directory called .logs underneath the HBase root directory. Then there is another subdirectory for each HRegionServer and then a log for each HRegion.

Next there is a file called oldlogfile.log which you may not even see on your cluster. Such files are created by one of the exceptions I mentioned earlier as far as file access is concerned. They are a result of so-called “log splits”. When the HMaster starts and finds that there is a log file that is not handled by a HRegionServer anymore, it splits the log, copying the HLogKeys to the new regions they should be in. It places them directly in the region’s directory in a file named oldlogfile.log. Now when the respective HRegion is instantiated, it reads these files, inserts the contained data into its local MemStore, and starts a flush to persist the data right away and delete the file.

Note: Sometimes you may see left-over oldlogfile.log.old files (yes, there is another .old at the end), which are caused by the HMaster trying repeatedly to split the log and finding that there was already another split log in place. At that point you have to consult the HRegionServer or HMaster logs to see what is going on and whether you can remove those files. I found at times that they were empty and therefore could safely be removed.

The next set of files are the actual regions. Each region name is encoded using a Jenkins Hash function and a directory is created for it. The region name is hashed because it may contain characters that cannot be used in a path name in DFS. The Jenkins Hash always returns legal characters, as simple as that. So you get the following path structure:

/hbase/<tablename>/<encoded-regionname>/<column-family>/<filename>
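As a small illustration of that layout, the following sketch splits a store file path into its four components. The helper and type names are made up for this example, and /hbase is assumed as the root directory; the sample path is taken from the listing above.

```python
from typing import NamedTuple


class StoreFilePath(NamedTuple):
    table: str
    region: str      # the encoded (hashed) region name
    family: str      # column family directory
    filename: str    # the data file itself


def parse_store_file_path(path, root="/hbase"):
    """Split <root>/<table>/<region>/<family>/<file> into its parts."""
    prefix = root.rstrip("/") + "/"
    if not path.startswith(prefix):
        raise ValueError(f"{path!r} is not below {root!r}")
    parts = path[len(prefix):].split("/")
    if len(parts) != 4:
        raise ValueError(f"{path!r} is not a store file path")
    return StoreFilePath(*parts)


sample = parse_store_file_path("/hbase/docs/999041123/contents/4397485149704042145")
```

Paths with fewer components, such as the .regioninfo file or the log directories, are rejected on purpose since they do not follow the four-level pattern.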

In the root of the region directory there is also a .regioninfo file holding metadata about the region. This will be used in the future by an HBase fsck utility (see HBASE-7) to be able to rebuild a broken .META. table. A first usage of the region info can be seen in HBASE-1867.

In each column-family directory you can see the actual data files, which I explain in the following section in detail.

Something that I have not shown above are split regions with their initial daughter reference files. When a data file within a region grows larger than the configured hbase.hregion.max.filesize, the region is split in two. This is done very quickly at first because the system simply creates two reference files in the new regions that are now supposed to host each half. The name of the reference file is an ID with the hashed name of the referenced region as a postfix, e.g. 1278437856009925445.3323223323. The reference files hold only a little information: the key the original region was split at and whether it is the top or bottom reference. Of note is that these references are then used by the HalfHFileReader class (which I also omitted from the big picture above as it is only used temporarily) to read the original region data files. Only upon a compaction are the original files rewritten into separate files in the new region directory. This also removes the small reference files as well as the original data file in the original region.
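The idea behind reference files can be shown in a few lines. This is a sketch only: the function names are invented, the parent file is modelled as a sorted list of pairs, and it assumes that "bottom" means the keys sorting below the split key while "top" means the split key and everything above it.

```python
def parse_reference_name(name):
    """Split '<file id>.<hashed parent region>' into its two parts."""
    file_id, parent_region = name.split(".", 1)
    return file_id, parent_region


def read_half(parent_rows, split_key, top):
    """Yield one daughter's half of the parent's sorted (key, value) pairs.

    Nothing is copied or rewritten: the parent data is filtered on the fly,
    which is why the split itself can be so quick.
    """
    for key, value in parent_rows:
        if (key >= split_key) == top:
            yield key, value


parent = [("a", 1), ("c", 2), ("m", 3), ("x", 4)]   # made-up parent file content
bottom_half = list(read_half(parent, "m", top=False))
top_half = list(read_half(parent, "m", top=True))
```

Until a compaction rewrites the data, both daughters keep reading the same parent file through such a filter.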

This also concludes the file dump. The last thing you see is a compaction.dir directory in each table directory. These directories are used when splitting or compacting regions as noted above. They are usually empty and serve as a scratch area to stage the new data files before swapping them into place.

HFile

So we are now at a very low level of HBase’s architecture. HFiles (kudos to Ryan Rawson) are the actual storage files, specifically created to serve one purpose: store HBase’s data fast and efficiently. They are apparently based on Hadoop’s TFile (see HADOOP-3315) and mimic the SSTable format used in Google’s BigTable architecture. The previous use of Hadoop’s MapFiles in HBase proved to be not good enough performance-wise. So what do the files look like?

The files have a variable length; the only fixed blocks are the FileInfo and Trailer blocks. As the picture shows, it is the Trailer that has the pointers to the other blocks, and it is written at the end of persisting the data to the file, finalizing the now immutable data store. The Index blocks record the offsets of the Data and Meta blocks. Both the Data and the Meta blocks are actually optional. But you would most likely always find data in a data store file.
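The "trailer written last, read first" idea can be demonstrated with a toy format. To be clear, this is not the HFile byte layout: the field sizes and function names are invented, and FileInfo and Meta blocks are left out. It only shows why a fixed-size trailer at the end is enough to find everything else.

```python
import io
import struct

# Toy trailer: offset of the index (8 bytes) and number of index entries (4 bytes).
TRAILER = struct.Struct(">QI")
INDEX_ENTRY = struct.Struct(">QI")   # per block: offset (8 bytes), length (4 bytes)


def write_toy_file(blocks):
    """Write the data blocks, then the index, then the fixed-size trailer."""
    out = io.BytesIO()
    entries = []
    for block in blocks:
        entries.append((out.tell(), len(block)))
        out.write(block)
    index_offset = out.tell()
    for offset, length in entries:
        out.write(INDEX_ENTRY.pack(offset, length))
    out.write(TRAILER.pack(index_offset, len(entries)))  # written last
    return out.getvalue()


def read_block(data, n):
    """Locate block n by starting at the trailer at the very end of the file."""
    index_offset, count = TRAILER.unpack_from(data, len(data) - TRAILER.size)
    if not 0 <= n < count:
        raise IndexError(n)
    offset, length = INDEX_ENTRY.unpack_from(data, index_offset + n * INDEX_ENTRY.size)
    return data[offset:offset + length]


toy = write_toy_file([b"alpha", b"beta", b"gamma!"])
```

Because the trailer has a fixed size, a reader can seek to the end of the file, unpack it, and from there reach the index and any block without scanning the data.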

How is the block size configured? It is driven solely by the HColumnDescriptor which in turn is specified at table creation time by the user or defaults to reasonable standard values. Here is an example as shown in the master web based interface:

{NAME => 'docs', FAMILIES => [{NAME => 'cache', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'false'}, {NAME => 'contents', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'false'}, ...

The default is "64KB" (or 65536 bytes). Here is what the HFile JavaDoc explains:

"Minimum block size. We recommend a setting of minimum block size between 8KB to 1MB for general usage. Larger block size is preferred if files are primarily for sequential access. However, it would lead to inefficient random access (because there are more data to decompress). Smaller blocks are good for random access, but require more memory to hold the block index, and may be slower to create (because we must flush the compressor stream at the conclusion of each data block, which leads to an FS I/O flush). Further, due to the internal caching in Compression codec, the smallest possible block size would be around 20KB-30KB."

So each block with its prefixed "magic" header contains either plain or compressed data. We will have a look at what that looks like in the next section.

One thing you may notice is that the default block size for files in DFS is 64MB, which is 1024 times what the HFile default block size is. So the HBase storage file blocks do not match the Hadoop blocks. Therefore you have to think about both parameters separately and find the sweet spot in terms of performance for your particular setup.
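The arithmetic behind those numbers is quickly checked. The block-count helper is only a rough estimate and an assumption of this sketch: the configured size is a minimum, so real block boundaries depend on where key/value pairs end.

```python
HFILE_BLOCK_SIZE = 64 * 1024         # 65536 bytes, the BLOCKSIZE shown above
DFS_BLOCK_SIZE = 64 * 1024 * 1024    # 64MB default block size in DFS

ratio = DFS_BLOCK_SIZE // HFILE_BLOCK_SIZE


def estimated_blocks(file_length, block_size=HFILE_BLOCK_SIZE):
    """Rough upper-level estimate: file length divided by block size, rounded up."""
    return -(-file_length // block_size)   # ceiling division


# The 84447-byte mimetype file from the directory listing.
mimetype_blocks = estimated_blocks(84447)
```

So a single DFS block can hold on the order of a thousand HFile blocks, and a small file like the 84447-byte one spans only a couple of them.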

One option in the HBase configuration you may see is hfile.min.blocksize.size. It seems to be only used during migration from earlier versions of HBase (since it had no block file format) and when directly creating HFile during bulk imports for example.

So far so good, but how can you see if a HFile is OK or what data it contains? There is an App for that!

The HFile.main() method provides the tools to dump a data file:

$ hbase org.apache.hadoop.hbase.io.hfile.HFile
usage: HFile [-f ] [-v] [-r ] [-a] [-p] [-m] [-k]
-a,--checkfamily Enable family check
-f,--file File to scan. Pass full-path; e.g.
hdfs://a:9000/hbase/.META./12/34
-k,--checkrow Enable row order check; looks for out-of-order keys
-m,--printmeta Print meta data of file
-p,--printkv Print key/value pairs
-r,--region Region to scan. Pass region name; e.g. '.META.,,1'
-v,--verbose Verbose output; emits file and meta data delimiters

Here is an example of what the output will look like (shortened here):

$ hbase org.apache.hadoop.hbase.io.hfile.HFile -v -p -m -f \
hdfs://srv1.foo.bar:9000/hbase/docs/999882558/mimetype/2642281535820134018

Scanning -> hdfs://srv1.foo.bar:9000/hbase/docs/999882558/mimetype/2642281535820134018
...
K: \x00\x04docA\x08mimetype\x00\x00\x01\x23y\x60\xE7\xB5\x04 V: text\x2Fxml
K: \x00\x04docB\x08mimetype\x00\x00\x01\x23x\x8C\x1C\x5E\x04 V: text\x2Fxml
K: \x00\x04docC\x08mimetype\x00\x00\x01\x23xz\xC08\x04 V: text\x2Fxml
K: \x00\x04docD\x08mimetype\x00\x00\x01\x23y\x1EK\x15\x04 V: text\x2Fxml
K: \x00\x04docE\x08mimetype\x00\x00\x01\x23x\xF3\x23n\x04 V: text\x2Fxml
Scanned kv count -> 1554

Block index size as per heapsize: 296
reader=hdfs://srv1.foo.bar:9000/hbase/docs/999882558/mimetype/2642281535820134018, \
compression=none, inMemory=false, \
firstKey=US6683275_20040127/mimetype:/1251853756871/Put, \
lastKey=US6684814_20040203/mimetype:/1251864683374/Put, \
avgKeyLen=37, avgValueLen=8, \
entries=1554, length=84447
fileinfoOffset=84055, dataIndexOffset=84277, dataIndexCount=2, metaIndexOffset=0, \
metaIndexCount=0, totalBytes=84055, entryCount=1554, version=1
Fileinfo:
MAJOR_COMPACTION_KEY = \xFF
MAX_SEQ_ID_KEY = 32041891
hfile.AVG_KEY_LEN = \x00\x00\x00\x25
hfile.AVG_VALUE_LEN = \x00\x00\x00\x08
hfile.COMPARATOR = org.apache.hadoop.hbase.KeyValue\x24KeyComparator
hfile.LASTKEY = \x00\x12US6684814_20040203\x08mimetype\x00\x00\x01\x23x\xF3\x23n\x04

The first part is the actual data stored as KeyValue pairs, explained in detail in the next section. The second part dumps the internal HFile.Reader properties as well as the Trailer block details and finally the FileInfo block values. This is a great way to check if a data file is still healthy.

KeyValues

In essence each KeyValue in the HFile is simply a low-level byte array that allows for "zero-copy" access to the data, even with lazy or custom parsing if necessary. How are the instances arranged?


The structure starts with two fixed length numbers indicating the size of the key and the value part. With that info you can offset into the array to, for example, get direct access to the value, ignoring the key - if you know what you are doing. Otherwise you can get the required information from the key part. Once parsed into a KeyValue object you have getters to access the details.

Note: One thing to watch out for is the difference between KeyValue.getKey() and KeyValue.getRow(). I think for me the confusion arose from referring to "row keys" as the primary key to get a row out of HBase. That would be the latter of the two methods, i.e. KeyValue.getRow(). The former simply returns the complete byte array part representing the raw "key" as colored and labeled in the diagram.
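To make the layout tangible, here is a sketch that parses the first key from the HFile dump shown earlier. The field widths are assumptions of this example (4-byte key and value lengths; inside the key a 2-byte row length, a 1-byte family length, an 8-byte timestamp and a 1-byte type), and the function names are made up rather than taken from the HBase API.

```python
import struct


def encode_keyvalue(key, value):
    """Prefix key and value with their lengths (assumed 4 bytes each)."""
    return struct.pack(">II", len(key), len(value)) + key + value


def parse_keyvalue(buf):
    """Return the raw key part and the value part of one entry."""
    key_len, value_len = struct.unpack_from(">II", buf, 0)
    key = buf[8:8 + key_len]
    value = buf[8 + key_len:8 + key_len + value_len]   # reachable without parsing the key
    return key, value


def parse_key(key):
    """Split the raw key into row, family, qualifier, timestamp and type."""
    (row_len,) = struct.unpack_from(">H", key, 0)
    row = key[2:2 + row_len]
    family_len = key[2 + row_len]
    family_start = 3 + row_len
    family = key[family_start:family_start + family_len]
    qualifier = key[family_start + family_len:-9]       # whatever precedes the last 9 bytes
    (timestamp,) = struct.unpack(">q", key[-9:-1])
    key_type = key[-1]
    return row, family, qualifier, timestamp, key_type


# First key of the dump above; its value is text/xml.
RAW_KEY = b"\x00\x04docA\x08mimetype\x00\x00\x01\x23y\x60\xE7\xB5\x04"
raw_key, raw_value = parse_keyvalue(encode_keyvalue(RAW_KEY, b"text/xml"))
row, family, qualifier, timestamp, key_type = parse_key(raw_key)
```

In these terms the raw key corresponds to what getKey() returns, while the row extracted from it corresponds to getRow().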

This concludes my analysis of the HBase storage architecture. I hope it provides a starting point for your own efforts to dig into the grimy details. Have fun!

Update: Slightly updated with more links to JIRA issues. Also added Zookeeper to be more precise about the current mechanisms to look up a region.

Update 2: Added details about region references.

Update 3: Added more details about region lookup as requested.

Running Hadoop MapReduce With Cassandra NoSQL

So if you are looking for a good NoSQL read on HBase vs. Cassandra you can check out http://ria101.wordpress.com/2010/02/24/hbase-vs-cassandra-why-we-moved/. In short, HBase is good for reads and Cassandra for writes. Cassandra does a great job on reads too, so please do not think I am shooting either down in any way. I am just saying that both HBase and Cassandra have great value and useful purpose in their own right, and use cases even exist for running both. HBase recently got called up as a top-level Apache project, coming up and out of Hadoop.

Having worked with Cassandra a bit, I often see/hear folks asking about running Map/Reduce jobs against the data stored in Cassandra instances. Well, Hadoopers & Hadooperettes, the Cassandra folks provide a way to do this very nicely in the 0.6.0 release. It is VERY straightforward and well thought through. If you want to see the evolution check out the JIRA issue https://issues.apache.org/jira/browse/CASSANDRA-342

So how do you do it? Very simple: Cassandra provides an implementation of InputFormat. In case you are new to Hadoop, the InputFormat is what the mapper is going to use to load your data into it (basically). Their subclass connects your mapper to pull the data in from Cassandra. What is also great here is that the Cassandra folks have also spent the time implementing the integration in the classic “Word Count” example.

See https://svn.apache.org/repos/asf/cassandra/trunk/contrib/word_count/ for this example.  Cassandra rows or row fragments (that is, pairs of key + SortedMap of columns) are input to Map tasks for processing by your job, as specified by a SlicePredicate that describes which columns to fetch from each row. Here’s how this looks in the word_count example, which selects just one configurable columnName from each row:

ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);

Cassandra also provides a Pig LoadFunc for running jobs in the Pig DSL instead of writing Java code by hand. This is in https://svn.apache.org/repos/asf/cassandra/trunk/contrib/pig/.

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/

Cassandra vs HBase

We are an ad network. We need to store impressions and clicks. We were evaluating various big data (or nosql, whatever you may choose to call it) systems for our new project. We had used HBase for the past 8 months in an experimental product and are satisfied with it, but the hype about Cassandra was so great that we decided to give it a shot. I think for some reason the Cassandra team has succeeded in marketing itself very well. You will find even non-technical people (such as VCs, CEOs, Product Managers) in Santa Monica recommending Cassandra to each other.

First impression of Cassandra was a good one. Their web page looks much more professional and nicer than HBase. It’s very easy to get it up and running. The website is well documented. Literally, it took me 5 minutes to set it up and get it running.

The real challenge was to understand the Cassandra data model and try to implement it for our use cases. It was very clear to us how we would do it with HBase, as we have pretty good experience with it. Even though Cassandra inherits the same data model from BigTable, there are some fundamental differences between Cassandra and HBase. I have tried to tabulate the differences between the two systems below:

Cassandra: Lacks concept of a Table. All the documentation will tell you that it’s not common to have multiple keyspaces. That means you have to share a key space in a cluster. Furthermore adding a keyspace requires a cluster restart!
HBase: Concept of Table exists. Each table has its own key space. This was a big win for us. You can add and remove tables as easily as in an RDBMS.

Cassandra: Uses string keys. Very common to use uuids as the keys. You can use TimeUUID if you want your data to be sorted by time.
HBase: Uses binary keys. It’s common to combine three different items together to form a key. This means you can search by more than one key in a given table.

Cassandra: Even if you use TimeUUID, as Cassandra load balances client requests, the hot spotting problem won’t occur. (All the client requests going to one server in a cluster is known as a hot spot problem.)
HBase: If your key’s first component is time or a sequential number, then hotspotting occurs. All of the new keys will be inserted into one region until it fills up (thereby causing a hotspotting problem).

Cassandra: Offers sorting of columns.
HBase: Does not have sorting of columns.

Cassandra: Concept of Supercolumn allows you to design very flexible, very complex schemas.
HBase: Does not have supercolumns. But you can design a supercolumn-like structure, as column names and values are binary.

Cassandra: Does not have any convenience method to increment a column value. In fact the very nature of eventual consistency makes it difficult to update/write a record and read it instantly after the update. You have to make sure that R + W > N to achieve strong consistency.
HBase: Consistent by design. Offers a nice convenience method to increment counters. Very much suitable for data aggregation.

Cassandra: Map Reduce support is new. You will need a hadoop cluster to run it. Data will be transferred from the Cassandra cluster to the hadoop cluster. Not suitable for running large data map reduce jobs.
HBase: Map Reduce support is native. HBase is built on Hadoop. Data does not get transferred.

Cassandra: Comparatively simpler to maintain if you don’t have to have hadoop.
HBase: Comparatively complicated as it has many moving pieces such as Zookeeper, Hadoop and HBase itself.

Cassandra: Does not have a native java api as of now. No java doc. Even though written in Java, you have to use Thrift to communicate with the cluster.
HBase: Has a nice native java api. Feels much more like a java system than Cassandra. Being a java shop, it was important for us. HBase has a thrift interface for other languages too.

Cassandra: No master server, hence no single point of failure.
HBase: Although there exists a concept of a master server, HBase itself does not depend on it heavily. HBase cluster can keep serving data even if the master goes down. Hadoop namenode is a single point of failure.
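The R + W > N rule from the comparison is easy to put into code. In this minimal sketch N is the number of replicas, R the number of replicas that must answer a read and W the number that must acknowledge a write; the function name is made up for the example.

```python
def is_strongly_consistent(n, r, w):
    """True if every read quorum overlaps every write quorum (R + W > N)."""
    if not (1 <= r <= n and 1 <= w <= n):
        raise ValueError("r and w must be between 1 and n")
    return r + w > n


quorum_both = is_strongly_consistent(n=3, r=2, w=2)   # 2 + 2 > 3
fast_but_weak = is_strongly_consistent(n=3, r=1, w=1)  # 1 + 1 is not > 3
write_all = is_strongly_consistent(n=3, r=1, w=3)      # 1 + 3 > 3
```

With three replicas, reading one and writing one is fast but a read can miss the latest write; raising either side until the two sets must overlap buys consistency at the cost of latency.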

After comparing the data model and features in this way, HBase was the clear winner for us. In my opinion, if consistency is what you need, HBase is a clear choice. Furthermore native map reduce, the concept of a table and a simpler schema that can be modified without a cluster restart are a big plus you cannot ignore. HBase is a much more mature platform. When people cite Twitter and Facebook using Cassandra, they forget that the same organizations are using HBase too. In fact one of the committers of HBase was recently hired by Facebook, which clearly shows their interest in HBase.

So in short, we will root for team HBase!

Redis vs Memcached

This month’s Linux Journal has an article about Redis.  I read about it while sitting on the shitter, because that’s about all that Linux Journal is useful for.  The article itself was crap, but the introduction to this product was at least tempting.

The basics: take memcached and add a disk backing, replication, virtual memory, and some cool additional data structures. Hashes, lists, sets, sorting, joining, transactions, yay! I instantly got a geek boner scanning the feature list. My boner quickly faded to about half mast when I saw the C clients that are available. No consistent hashing? Wait, no server hashing at all? Yet they have a fully featured Ruby client? C is the lowest common denominator when it comes to language support. Start there, then add high level bindings using this low level library. libmemcached got it right. You start on the bottom and work your way up to higher level bindings. It seems that Redis took the wrong approach to this, which will result in every language having a different client implementation. This leads to inconsistencies between languages, which is bad news. At first glance, that gave me a headed-for-the-toilet feeling about this project.
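For anyone wondering what the missing client feature looks like, here is a minimal consistent hashing sketch. It is not taken from libmemcached or any Redis client; the class name, the use of MD5 and the number of virtual points per server are all choices made for this example.

```python
import bisect
import hashlib


def _hash(text):
    """Map a string to a position on the ring (first 8 bytes of its MD5)."""
    return int.from_bytes(hashlib.md5(text.encode()).digest()[:8], "big")


class HashRing:
    """Each server owns many points on a ring; a key goes to the next point."""

    def __init__(self, servers, replicas=100):
        self._points = sorted(
            (_hash(f"{server}#{i}"), server)
            for server in servers
            for i in range(replicas)
        )
        self._hashes = [h for h, _ in self._points]

    def server_for(self, key):
        i = bisect.bisect(self._hashes, _hash(key)) % len(self._points)
        return self._points[i][1]


servers = ["cache1", "cache2", "cache3"]
before = HashRing(servers)
after = HashRing(servers[:2])            # cache3 has been taken out
keys = [f"key{i}" for i in range(1000)]
```

The payoff is that dropping cache3 only remaps the keys that lived on cache3; everything on the other two servers stays where it was, which plain modulo hashing cannot offer.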

With a half-mast boner, I decided to do some benchmark comparisons.  Perhaps I’ll see some real numbers that might arouse me.  Stand back, I’m going to do some science!

The Test Setup

I used the same machine as client and server for both tests to avoid network adding a skew to the results.  (This should stress the algorithms themselves)  The hardware itself doesn’t matter, it’s an apples to apples comparison.  The software does however:

  • Redis 2.0.0 rc4
  • Memcached 1.4.5 (release)
  • Credis (libcredis client) 0.2.2
  • Libmemcached 0.31  (I know this is a little outdated, but for this test it works fine)
  • Memcache client benchmark app: mc_stress.c
  • Redis client benchmark app: redis_stress.c

The clients I wrote are just simple iterations of setting and getting keys and timing the results.  Since WP sucks for posting source code snippets I had to stuff these into PDF files.  Feel free to recreate my tests and post your results, since that’s what science is all about.

Results

This test varies the key size with a static value of only 3 bytes. (Using 100k unique keys.) I’m guessing this will stress the internal hashing algorithms used for processing the key names. As you can see here, Redis came up 20-26% short of memcache’s speed.

Same test, but showing the Multi-GET performance. Redis clearly has some kind of problem here, and I believe it might be due to requesting a large number of keys in a single MGET operation. 100,000 keys seemed to be the upper limit… but in every case, Redis came up short.

This test stresses the different value sizes. Using a small 10 byte key length, I used varying sizes of value lengths up to around 16KB. Even though memcached advertises a 1MB value limitation, performance drops significantly over 8192 bytes. SET speed was reduced by 100 times and depending on the test, GET speed was either significantly reduced, or reduced by 100 times as well. I suspect that playing around with slab sizes might assist with the speed here, but that becomes quite the pain in the ass. The main point: If you use memcache, do not stuff large values into a single key. Hash them out into namespaces, you’ll be much better off.
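One way to read "hash them out into namespaces" is to split a large value into chunks stored under derived keys. The sketch below uses a plain dict as a stand-in for the cache, so no client library is involved; the key naming scheme, the helper names and the 8192-byte chunk size (picked from the threshold observed above) are assumptions of this example.

```python
CHUNK_SIZE = 8192  # bytes; the size above which the test saw the slowdown


def set_chunked(cache, key, value, chunk_size=CHUNK_SIZE):
    """Store value as <key>:0, <key>:1, ... plus a <key>:count entry."""
    chunks = [value[i:i + chunk_size] for i in range(0, len(value), chunk_size)]
    cache[f"{key}:count"] = len(chunks)
    for i, chunk in enumerate(chunks):
        cache[f"{key}:{i}"] = chunk


def get_chunked(cache, key):
    """Reassemble the value; a missing chunk counts as a full cache miss."""
    count = cache.get(f"{key}:count")
    if count is None:
        return None
    parts = [cache.get(f"{key}:{i}") for i in range(count)]
    if any(part is None for part in parts):
        return None
    return b"".join(parts)


cache = {}                      # stand-in for a memcached client
page = bytes(20000)             # a 20000-byte dummy value
set_chunked(cache, "page", page)
```

With a real cache any chunk can be evicted independently, which is why the reader treats a partially missing value as a miss instead of returning truncated data.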

Adding to this, Redis boasts a 1GB value size.  We would need to graph another relationship of transfer rate to determine if there is an inefficiency in handling larger values, or if it’s just due to the size of the value itself.  Memcache shows a similar trend as value size increases, but again Redis is 20-40% slower than memcache.

Here is the same test, showing Multi-GET performance.  Similar trend as key length test.

Conclusion

After crunching all of these numbers and screwing around with the annoying intricacies of OpenOffice, I’m giving Redis a big thumbs down.  My initial sexual arousal from the feature list is long gone.  Granted, Redis might have its place in a large architecture, but certainly not as a replacement for memcache.  When your site is hammering 20,000 keys per second and delivery times are heavily dependent on memcache latency, it makes no business sense to transparently drop in Redis.   The features are neat, and the extra data structures could be used to offload more RDBMS activity… but 20% is just too much to gamble on the heart of your architecture.

Maybe sometime in the future Redis will be up to par with Memcached performance.  Or, it could be the extra VM and disk backing is inherently flawed to begin with.  In that case, Memcached will always win, and Redis will be only useful in a niche market somewhere between DB caching and NoSQL fanboys.

Motivation

At Medallia, a key component of our system currently works with an open source relational db. Since this component mainly queries the db entries by key, we want to try to switch to a key-value storage system and take advantage of several benefits provided by such a system, including distributed replication, load balancing, and failover. One of our objectives is to re-architect this component in a way that will allow us to achieve horizontal scalability, which, among other things, will help us alleviate the high disk storage requirements we currently have.

Recently we took the time to look into this (and other technological improvements too, exciting times at Medallia right now!), and we reviewed several options. To make a long story short, we ended up with two finalists, Apache Cassandra and Project Voldemort.

These two projects seem to be the most mature open source options in their class, and both provide native decentralized clustering support including partitioning, fault tolerance, and high availability. Both are based on Amazon’s Dynamo paper, but the main difference is that Voldemort follows a simple key/value model, while Cassandra uses a persistency model based on BigTable’s column oriented model. Both provide support for read-consistency where read operations always return the latest data, which was a requirement for us.

High level comparison

While not an exhaustive list, these are the most relevant pros and cons we identified when reviewing both stores:

Project Voldemort

  • Pros
    • Simpler API
    • Persistency based on Berkeley DB, a mature and well-known key/value db
    • Uses Vector Clocks instead of simple timestamps. It doesn’t need the nodes’ (or clients’) clocks to be synchronized
  • Cons
    • No built-in support for “multiple data center”-aware routing (meaning there must be 1 copy of each key in at least one data center)

Apache Cassandra

  • Pros
    • Broader range of systems in production (Facebook, Twitter, Digg, Rackspace)
    • Richer API which supports values with a dynamic column structure. The columns can evolve independently, meaning that you can update one column without reading the whole structure.
    • Optimized for writes (by design)
    • Configurable consistency level (specified on each request)
  • Cons
    • File format is still in development; changes to the internal structure are likely to happen. Due to the flexibility it provides, the file format is more complex and harder to reason about, especially in terms of performance
    • Requires Clock Synch (NTP) (for nodes and clients)
    • Reads are more disk-intensive than competitors
    • Doesn’t support client conflict resolution, so the latest update always wins

Performance Tests

To our surprise this was the only link we’ve found that compares the performance for both projects – thus we decided to write this post to share our research. We used the vpork test framework, which we modified to suit our needs by upgrading the client code to the latest versions, adding a warm-up phase, and adding rewrite capabilities. These are the results of our tests:

Setup:

  • Versions
    • Voldemort v0.80.1
    • Cassandra 0.6.0-beta3
  • Boxes: 3 similar nodes with the following spec:
    • 4GB maximum heap size
    • Replication parameters: N=3 (replicas for each entry), R=2 (nodes to wait for on each read), W=2 (nodes to block for on each write)
    • 8 processors on each server (Intel(R) Xeon(R) CPU E5504 @ 2.00GHz)
    • 1TB disk space (Seagate ST31000340NS, 7200 RPM, 32MB cache)
  • Persistence parameters
    • Voldemort (default values)
      • key-serializer: string
      • value-serializer: identity (byte array)
      • persistence engine=bdb (Berkeley DB)
      • bdb.cache.size=1536MB
    • Cassandra
      • ColumnFamily definition: CompareWith="BytesType" RowsCached="10000"
      • ReplicationFactor=3
      • Partitioner=org.apache.cassandra.dht.RandomPartitioner
      • ConcurrentReads=16
      • ConcurrentWrites=32
  • Tests
    • Client Threads: 40
    • Initial load: 5 million records – records present before starting each test
    • WarmUp: 20K records – initial writes before measuring time for each test
    • Number of operations per test: 500K

We ran tests for 4 different write-rewrite-read configurations. A write is equivalent to a put operation with a new record (non-existing key). A rewrite is a put operation with an existing key. A read is a get operation on an existing key. These are the configurations we tested:

  • 50% Write 50% Read
  • 10% Write 40% Rewrite 50% Read
  • 50% Rewrite 50% Read
  • 90% Rewrite 10% Read

We ran all the tests for two different value sizes, 1.5 KB and 15 KB. Even though we evaluated different options, for our current needs the last configuration (90% Rewrite, 10% Read) with a 15 KB data entry was the most representative scenario.

The first pair of charts shows the latency, or average time it takes a read or write operation to complete in each case. Lower values are better. As expected, Cassandra write (and rewrite) times were consistently faster than Voldemort, while read times varied a bit depending on the scenario but were more or less the same in general.

test-avg-lat-1.5.png

test-avg-lat-15.png

The second pair of charts shows the 99th percentile latency, that is, the maximum time in the best 99% of cases; again lower values are better:

test-99-lat-1.5.png

test-99-lat-15.png

On the front-end, we have a write-back cache, which means that write operations don’t affect the user experience. On the other hand, read operations are directly related to page loads. That’s why we were concerned about the peak for Cassandra read in the last scenario for 15KB. We ran some further tests to measure the 99.9th and 99.99th percentiles and the difference was even greater: 5050 ms for Cassandra and 748 ms for Voldemort in the first case, and 9176 ms against 1129 ms in the second case. This huge difference was a key decision factor for us.
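
For anyone reproducing these measurements, tail percentiles can be derived from raw latency samples roughly as follows. This is a minimal sketch using the nearest-rank method; the function name is hypothetical and this is not necessarily how vpork computes its figures.

```python
import math


def percentile(samples, pct):
    """Nearest-rank percentile: the smallest sample such that at least
    pct percent of all samples are less than or equal to it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    # The small epsilon guards against floating-point rounding pushing an
    # exact rank (e.g. 999.0) just above the integer.
    rank = math.ceil(pct / 100 * len(ordered) - 1e-9)
    return ordered[max(rank, 1) - 1]


# Hypothetical latencies in milliseconds: 1, 2, ..., 1000.
latencies_ms = list(range(1, 1001))
p99 = percentile(latencies_ms, 99)
p999 = percentile(latencies_ms, 99.9)
```

The higher the percentile, the fewer samples sit above it, so a test run needs many operations before the 99.99th percentile means much.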

Finally, these two charts show the general throughput in terms of operations (read or write) per second. In this case higher values are better:

test-ops-1.5.png
test-ops-15.png

Notes:

  • Cassandra commit log and data folder are supposed to be placed on different disks to improve performance; we tested with both on the same disk.

Issues found while testing:

  • Voldemort client.put(K key, V value) (not the one that takes a Version object) throws ObsoleteVersionException if called with the same key from different threads. The javadoc states “Associated (sic) the given value to the key, clobbering any existing values stored for the key. “, so this was not expected.

And the winner is …

I think there is no clear winner, in general terms. The best option depends on many factors that each company has to evaluate. My preference changed a few times during the review and tests.

Having said that, we had to choose one, and we decided to go with Project Voldemort. The main reasons were the simplicity, better versioning control, persistency layer maturity, and latency predictability.

We are currently developing the new solution, and it will take some time before we can put it in production, but we wanted to share our preliminary results with everyone who is considering one of these two options, so they’ll have one more tool at the time of the decision.

We’ll keep you posted on how it goes.

Diego Erdody
Lead Software Engineer

10 things you should know about NoSQL databases

For a quarter of a century, the relational database (RDBMS) has been the dominant model for database management. But, today, non-relational, “cloud,” or “NoSQL” databases are gaining mindshare as an alternative model for database management. In this article, we’ll look at the 10 key aspects of these non-relational NoSQL databases: the top five advantages and the top five challenges.

Five advantages of NoSQL

1: Elastic scaling

For years, database administrators have relied on scale up — buying bigger servers as database load increases — rather than scale out — distributing the database across multiple hosts as load increases. However, as transaction rates and availability requirements increase, and as databases move into the cloud or onto virtualized environments, the economic advantages of scaling out on commodity hardware become irresistible.

RDBMS might not scale out easily on commodity clusters, but the new breed of NoSQL databases are designed to expand transparently to take advantage of new nodes, and they’re usually designed with low-cost commodity hardware in mind.

2: Big data

Just as transaction rates have grown out of recognition over the last decade, the volumes of data that are being stored also have increased massively. O’Reilly has cleverly called this the “industrial revolution of data.” RDBMS capacity has been growing to match these increases, but as with transaction rates, the constraints of data volumes that can be practically managed by a single RDBMS are becoming intolerable for some enterprises. Today, the volumes of “big data” that can be handled by NoSQL systems, such as Hadoop, outstrip what can be handled by the biggest RDBMS.

3: Goodbye DBAs (see you later?)

Despite the many manageability improvements claimed by RDBMS vendors over the years, high-end RDBMS systems can be maintained only with the assistance of expensive, highly trained DBAs. DBAs are intimately involved in the design, installation, and ongoing tuning of high-end RDBMS systems.

NoSQL databases are generally designed from the ground up to require less management:  automatic repair, data distribution, and simpler data models lead to lower administration and tuning requirements — in theory. In practice, it’s likely that rumors of the DBA’s death have been slightly exaggerated. Someone will always be accountable for the performance and availability of any mission-critical data store.

4: Economics

NoSQL databases typically use clusters of cheap commodity servers to manage the exploding data and transaction volumes, while RDBMS tends to rely on expensive proprietary servers and storage systems. The result is that the cost per gigabyte or transaction/second for NoSQL can be many times less than the cost for RDBMS, allowing you to store and process more data at a much lower price point.

5: Flexible data models

Change management is a big headache for large production RDBMS. Even minor changes to the data model of an RDBMS have to be carefully managed and may necessitate downtime or reduced service levels.

NoSQL databases have far more relaxed — or even nonexistent — data model restrictions. NoSQL Key Value stores and document databases allow the application to store virtually any structure it wants in a data element. Even the more rigidly defined BigTable-based NoSQL databases (Cassandra, HBase) typically allow new columns to be created without too much fuss.

The result is that application changes and database schema changes do not have to be managed as one complicated change unit. In theory, this will allow applications to iterate faster, though, clearly, there can be undesirable side effects if the application fails to manage data integrity.

Five challenges of NoSQL

The promise of the NoSQL database has generated a lot of enthusiasm, but there are many obstacles to overcome before they can appeal to mainstream enterprises. Here are a few of the top challenges.

1: Maturity

RDBMS systems have been around for a long time. NoSQL advocates will argue that their advancing age is a sign of their obsolescence, but for most CIOs, the maturity of the RDBMS is reassuring. For the most part, RDBMS systems are stable and richly functional. In comparison, most NoSQL alternatives are in pre-production versions with many key features yet to be implemented.

Living on the technological leading edge is an exciting prospect for many developers, but enterprises should approach it with extreme caution.

2: Support

Enterprises want the reassurance that if a key system fails, they will be able to get timely and competent support. All RDBMS vendors go to great lengths to provide a high level of enterprise support.

In contrast, most NoSQL systems are open source projects, and although there are usually one or more firms offering support for each NoSQL database, these companies often are small start-ups without the global reach, support resources, or credibility of an Oracle, Microsoft, or IBM.

3: Analytics and business intelligence

NoSQL databases have evolved to meet the scaling demands of modern Web 2.0 applications. Consequently, most of their feature set is oriented toward the demands of these applications. However, data in an application has value to the business that goes beyond the insert-read-update-delete cycle of a typical Web application. Businesses mine information in corporate databases to improve their efficiency and competitiveness, and business intelligence (BI) is a key IT issue for all medium to large companies.

NoSQL databases offer few facilities for ad-hoc query and analysis. Even a simple query requires significant programming expertise, and commonly used BI tools do not provide connectivity to NoSQL.

Some relief is provided by the emergence of solutions such as HIVE or PIG, which can provide easier access to data held in Hadoop clusters and perhaps eventually, other NoSQL databases. Quest Software has developed a product — Toad for Cloud Databases — that can provide ad-hoc query capabilities to a variety of NoSQL databases.

4: Administration

The design goals for NoSQL may be to provide a zero-admin solution, but the current reality falls well short of that goal. NoSQL today requires a lot of skill to install and a lot of effort to maintain.

5: Expertise

There are literally millions of developers throughout the world, and in every business segment, who are familiar with RDBMS concepts and programming. In contrast, almost every NoSQL developer is in a learning mode. This situation will resolve itself naturally over time, but for now, it’s far easier to find experienced RDBMS programmers or administrators than a NoSQL expert.

Conclusion

NoSQL databases are becoming an increasingly important part of the database landscape, and when used appropriately, can offer real benefits. However, enterprises should proceed with caution with full awareness of the legitimate limitations and issues that are associated with these databases.


About the author

Guy Harrison is the director of research and development at Quest Software. A recognized database expert with more than 20 years of experience in application and database administration, performance tuning, and software development, Guy is the author of several books and many articles on database technologies and a regular speaker at technical conferences.

NOSQL Patterns

Over the last couple of years, we have seen an emerging class of data storage mechanisms for storing data at large scale. These storage solutions differ quite significantly from the RDBMS model and are also known as NOSQL. Some of the key players include …

  • Google BigTable, HBase, Hypertable
  • Amazon Dynamo, Voldemort, Cassandra, Riak
  • Redis
  • CouchDB, MongoDB

These solutions have a number of characteristics in common

  • Key value store
  • Run on large number of commodity machines
  • Data are partitioned and replicated among these machines
  • Relax the data consistency requirement (because the CAP theorem proves that you cannot get Consistency, Availability and Partition tolerance at the same time)

The aim of this blog is to extract the underlying technologies that these solutions have in common, and to get a deeper understanding of the implications for your application’s design. I do not intend to compare the features of these solutions, nor to suggest which one to use.

API model

The underlying data model can be considered as a large Hashtable (key/value store).

The basic form of API access is

  • get(key) — Extract the value given a key
  • put(key, value) — Create or Update the value given its key
  • delete(key) — Remove the key and its associated value

More advanced forms of the API allow user-defined functions to be executed in the server environment

  • execute(key, operation, parameters) — Invoke an operation on the value (given its key), which is a special data structure (e.g. List, Set, Map, etc.).
  • mapreduce(keyList, mapFunc, reduceFunc) — Invoke a map/reduce function across a key range.
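
To make the shape of this API concrete, here is a minimal in-memory sketch. It is a single-process stand-in, not any particular product’s client; the class name and the `append_item` operation are hypothetical, and `mapreduce` is left out for brevity.

```python
class KeyValueStore:
    """Single-process stand-in for the get/put/delete/execute API."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        # Return the value for the key, or None if the key is absent.
        return self._data.get(key)

    def put(self, key, value):
        # Create or update the value for the key.
        self._data[key] = value

    def delete(self, key):
        # Remove the key and its value; a missing key is not an error here.
        self._data.pop(key, None)

    def execute(self, key, operation, *parameters):
        # Apply a user-supplied function to the stored value "server side"
        # and store the result back under the same key.
        self._data[key] = operation(self._data.get(key), *parameters)
        return self._data[key]


def append_item(current, item):
    # Hypothetical operation: treat the value as a list and append to it.
    return (current or []) + [item]


store = KeyValueStore()
store.put("user:1", {"name": "alice"})
store.execute("cart:1", append_item, "book")
store.execute("cart:1", append_item, "pen")
```

The point of `execute` is that the list never travels to the client: only the operation and its parameters cross the network.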

Machines layout

The underlying infrastructure is composed of a large number (hundreds or thousands) of cheap, commoditized, unreliable machines connected through a network. We call each machine a physical node (PN). Each PN has the same software configuration but may have varying hardware capacity in terms of CPU, memory and disk storage. Within each PN, there will be a variable number of virtual nodes (VNs) running, according to the available hardware capacity of the PN.


Data partitioning (Consistent Hashing)

Since the overall hashtable is distributed across many VNs, we need a way to map each key to the corresponding VN.

One way is to use
partition = key mod (total_VNs)

The disadvantage of this scheme is that when we alter the number of VNs, the ownership of existing keys changes dramatically, which requires full data redistribution. Most large-scale stores use a “consistent hashing” technique to minimize the amount of ownership changes.
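
A quick way to see how disruptive the modulo scheme is: count how many keys change partition when going from 10 to 11 VNs. This is only an illustrative sketch; the MD5-based hash and the key names are assumptions, not something the stores above prescribe.

```python
import hashlib


def partition(key, total_vns):
    """partition = hash(key) mod total_VNs, using a hash that is stable across runs."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % total_vns


keys = [f"key-{i}" for i in range(10000)]

# Count the keys whose partition changes when one VN is added.
moved = sum(1 for k in keys if partition(k, 10) != partition(k, 11))
fraction_moved = moved / len(keys)
```

With a uniform hash, a key keeps its partition only when its hash gives the same remainder modulo 10 and modulo 11, which happens for about 1 key in 11, so the vast majority of keys have to move.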


In the consistent hashing scheme, the key space is finite and lies on the circumference of a ring. The virtual node ids are also allocated from the same key space. For any key, its owner node is defined as the first virtual node encountered when walking clockwise from that key. If the owner node crashes, all the keys it owns will be adopted by its clockwise neighbor. Therefore, key redistribution happens only in the neighborhood of the crashed node; all other nodes retain the same set of keys.
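
The ring can be sketched in a few lines. This is a simplified illustration under stated assumptions: node ids and keys are placed on the ring with MD5, each virtual node has exactly one position, and the class and method names are hypothetical.

```python
import bisect
import hashlib


def ring_position(name):
    """Place a key or a node id on the ring (here: the 128-bit MD5 space)."""
    return int(hashlib.md5(name.encode("utf-8")).hexdigest(), 16)


class HashRing:
    def __init__(self, node_ids):
        # Sorted list of (position, node id) pairs around the ring.
        self._points = sorted((ring_position(n), n) for n in node_ids)

    def owner(self, key):
        """First virtual node met when walking clockwise from the key."""
        pos = ring_position(key)
        idx = bisect.bisect_left(self._points, (pos, ""))
        # Wrap around to the first node when we walk past the last one.
        return self._points[idx % len(self._points)][1]

    def without(self, node_id):
        """Return a new ring with one node removed, e.g. after a crash."""
        return HashRing(n for _, n in self._points if n != node_id)


ring = HashRing([f"vn-{i}" for i in range(8)])
keys = [f"key-{i}" for i in range(5000)]
before = {k: ring.owner(k) for k in keys}

after_crash = ring.without("vn-3")
changed = [k for k in keys if before[k] != after_crash.owner(k)]
```

Only the keys that `vn-3` owned appear in `changed`, and they all land on its clockwise neighbor; every other key keeps its owner.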

Data replication

To provide high reliability from individually unreliable resources, we need to replicate the data partitions.

Replication not only improves the overall reliability of data, it also helps performance by spreading the workload across multiple replicas.


While read-only requests can be dispatched to any replica, update requests are more challenging because we need to carefully coordinate the updates that happen on these replicas.

Membership Changes

Notice that virtual nodes can join and leave the network at any time without impacting the operation of the ring.

When a new node joins the network

  1. The joining node announces its presence and its id to some well-known VNs (or just broadcasts)
  2. All the neighbors (left and right side) will adjust the change of key ownership as well as the change of replica memberships. This is typically done synchronously.
  3. The joining node starts to bulk copy data from its neighbors in parallel, asynchronously.
  4. The membership change is asynchronously propagated to the other nodes.


Notice that other nodes may not have their membership view updated yet, so they may still forward requests to the old nodes. But since these old nodes (the neighbors of the newly joined node) have been updated (in step 2), they will forward the requests to the newly joined node.

On the other hand, the newly joined node may still be in the process of downloading the data and not ready to serve yet. We use the vector clock (described below) to determine whether the newly joined node is ready to serve the request; if not, the client can contact another replica.

When an existing node leaves the network (e.g. crash)

  1. The crashed node no longer responds to gossip messages, so its neighbors know about it.
  2. The neighbors will update the membership changes and copy data asynchronously


We haven’t talked about how the virtual nodes are mapped onto the physical nodes. Many schemes are possible, with the main goal that virtual node replicas should not sit on the same physical node. One simple scheme is to assign virtual nodes to physical nodes at random, but check to make sure that a physical node doesn’t contain replicas of the same key range.

Notice that machine crashes happen at the physical node level, and each physical node has many virtual nodes running on it. So when a single physical node crashes, the workload of its multiple virtual nodes is scattered across many physical machines. Therefore the increased workload due to a physical node crash is evenly balanced.

Client Consistency

Once we have multiple copies of the same data, we need to worry about how to synchronize them such that the client has a consistent view of the data.

There are a number of client consistency models

  1. Strict Consistency (one copy serializability): This provides the semantics as if there is only one copy of data. Any update is observed instantaneously.
  2. Read your write consistency: This allows the client to see its own updates immediately (even if the client switches servers between requests), but not the updates made by other clients
  3. Session consistency: Provides read-your-write consistency only when the client issues its requests under the same session scope (which is usually bound to the same server)
  4. Monotonic Read Consistency: This provides the time monotonicity guarantee that the client will only see more up-to-date versions of the data in future requests.
  5. Eventual Consistency: This provides the weakest form of guarantee. The client can see an inconsistent view while updates are in progress. This model works when concurrent access to the same data is very unlikely, and the client needs to wait for some time if it needs to see its previous update.

Depending on which consistency model is provided, 2 mechanisms need to be arranged …

  • How the client request is dispatched to a replica
  • How the replicas propagate and apply the updates

There are various models for how these 2 aspects can be handled, with different tradeoffs.

Master Slave (or Single Master) Model

Under this model, each data partition has a single master and multiple slaves. In the model above, B is the master of keyAB and C, D are the slaves. All update requests have to go to the master, where the update is applied and then asynchronously propagated to the slaves. Notice that there is a time window of data loss if the master crashes before it propagates its update to any slave, so some systems will wait synchronously for the update to be propagated to at least one slave.

Read requests can go to any replica if the client can tolerate some degree of data staleness. This is where the read workload is distributed among many replicas. If the client cannot tolerate staleness for certain data, it also needs to go to the master.

Note that this model doesn’t mean there is one particular physical node that plays the role of the master. The granularity of “mastership” is at the virtual node level. Each physical node has some virtual nodes that act as masters of some partitions while other virtual nodes act as slaves of other partitions. Therefore, the write workload is also distributed across different physical nodes, although this is due to partitioning rather than replication.

When a physical node crashes, the masters of certain partitions will be lost. Usually, the most up-to-date slave will be nominated to become the new master.

The master slave model works very well in general when the application has a high read/write ratio. It also works very well when updates happen evenly across the key range. So it is the predominant model of data replication.

There are 2 ways the master can propagate updates to the slaves: state transfer and operation transfer. In state transfer, the master passes its latest state to the slave, which then replaces its current state with the latest state. In operation transfer, the master propagates a sequence of operations to the slave, which then applies the operations to its local state.

The state transfer model is more robust against message loss because as long as a later, more up-to-date message arrives, the replica will still be able to advance to the latest state.

Even in state transfer mode, we don’t want to send the full object for updating other replicas because changes typically happen within a small portion of the object. It would be a waste of network bandwidth to send the unchanged portion of the object, so we need a mechanism to detect and send just the delta (the portion that has been changed). One common approach is to break the object into chunks and compute a hash tree of the object. The replicas can then just compare their hash trees to figure out which chunks of the object have changed and send only those over.
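
The hash tree comparison can be sketched as follows. This is an illustrative toy, not any store’s actual wire format: the chunk size, the SHA-256 hash, and the function names are assumptions, and the sketch only handles two objects with the same number of chunks.

```python
import hashlib

CHUNK_SIZE = 4  # bytes; tiny on purpose so the example is easy to follow


def _hash(data):
    return hashlib.sha256(data).digest()


def build_tree(obj):
    """Return a list of levels: levels[0] holds the chunk hashes,
    levels[-1] holds the single root hash."""
    chunks = [obj[i:i + CHUNK_SIZE] for i in range(0, len(obj), CHUNK_SIZE)] or [b""]
    level = [_hash(c) for c in chunks]
    levels = [level]
    while len(level) > 1:
        # Each parent hashes the concatenation of up to two children.
        level = [_hash(b"".join(level[i:i + 2])) for i in range(0, len(level), 2)]
        levels.append(level)
    return levels


def changed_chunks(mine, theirs):
    """Walk both trees from the root down, following only the branches whose
    hashes differ, and return the indexes of the chunks that changed."""
    if len(mine[0]) != len(theirs[0]):
        raise ValueError("this sketch only handles equal chunk counts")
    suspects = [0]  # start at the root
    for depth in range(len(mine) - 1, 0, -1):
        next_suspects = []
        for i in suspects:
            if mine[depth][i] != theirs[depth][i]:
                for child in (2 * i, 2 * i + 1):
                    if child < len(mine[depth - 1]):
                        next_suspects.append(child)
        suspects = next_suspects
    return [i for i in suspects if mine[0][i] != theirs[0][i]]


old = build_tree(b"aaaabbbbccccddddeeee")
new = build_tree(b"aaaabbbbCCCCddddeeee")
delta = changed_chunks(old, new)
```

When the roots match, the walk stops immediately; otherwise only the branches that differ are explored, so the cost of the comparison grows with the size of the change rather than the size of the object.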

In operation transfer mode, usually much less data needs to be sent over the network. However, it requires a reliable messaging mechanism with a delivery order guarantee.

Multi-Master (or No Master) Model

If there are hot spots in a certain key range, and there are intensive write requests, the master slave model will be unable to spread the workload evenly. The multi-master model allows updates to happen at any replica (I think calling it “No-Master” is more accurate).

If any client can issue any update to any server, how do we synchronize the states such that we can retain client consistency and every replica will eventually get to the same state? We describe a number of different approaches in the following …

Quorum Based 2PC

To provide “strict consistency”, we can use a traditional 2PC protocol to bring all replicas to the same state at every update. Let’s say there are N replicas for a piece of data. When the data is updated, there is a “prepare” phase where the coordinator asks every replica to confirm whether it is ready to perform the update. Each replica will then write the data to a log file and, on success, respond to the coordinator.

After all replicas have responded positively, the coordinator will initiate the second “commit” phase and ask every replica to commit; each replica then writes another log entry to confirm the update. Notice that there are some scalability issues, as the coordinator needs to “synchronously” wait for quite a lot of back-and-forth network round trips and disk I/O to complete.

On the other hand, if any one of the replicas crashes, the update will be unsuccessful. As there are more replicas, the chance of one of them crashing increases. Therefore, replication hurts availability rather than helping it. This makes traditional 2PC an unpopular choice for high-throughput transactional systems.

A more efficient way is to use quorum based 2PC (e.g. PAXOS). In this model, the coordinator only needs to update W replicas (rather than all N replicas) synchronously. The coordinator still writes to all N replicas but only waits for positive acknowledgment from any W of the N. This is much more efficient from a probabilistic standpoint.
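
The probabilistic argument can be made concrete with a small calculation. The sketch assumes each replica is up independently with the same probability p, which is a simplification; the function names and the value of p are illustrative only.

```python
from math import comb


def p_all_up(n, p):
    """Chance that all n replicas respond (what traditional 2PC needs)."""
    return p ** n


def p_at_least(w, n, p):
    """Chance that at least w of the n replicas respond (what a quorum needs)."""
    return sum(comb(n, k) * p ** k * (1 - p) ** (n - k) for k in range(w, n + 1))


# With N = 3 replicas that are each up 99% of the time:
strict = p_all_up(3, 0.99)        # about 0.9703
quorum = p_at_least(2, 3, 0.99)   # about 0.9997
```

Under these assumptions, waiting for all three replicas fails roughly 3% of the time, while waiting for any two fails about 0.03% of the time, and the gap widens as N grows.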

However, since not all replicas are updated, we need to be careful when reading the data to make sure the read can reach at least one replica that has previously been updated successfully. When reading the data, we need to read R replicas and return the one with the latest timestamp.

For “strict consistency”, the important condition is to make sure the read set and the write set overlap, i.e. W + R > N


As you can see, quorum based 2PC can be considered a general 2PC protocol in which traditional 2PC is the special case where W = N and R = 1. The general quorum-based model allows us to pick W and R according to our tradeoff decisions between read and write workload ratio.

If the user cannot afford to pick W, R large enough, i.e. W + R <= N, then the client is relaxing its consistency model to a weaker one.
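
The overlap condition is easy to check by brute force for small N. The sketch below enumerates every possible write set of size W and read set of size R and verifies whether they always share a replica; the function name is hypothetical.

```python
from itertools import combinations


def always_overlap(n, w, r):
    """True if every write set of size w shares at least one replica
    with every read set of size r, out of n replicas."""
    replicas = range(n)
    return all(
        set(write_set) & set(read_set)
        for write_set in combinations(replicas, w)
        for read_set in combinations(replicas, r)
    )


strict_ok = always_overlap(3, 2, 2)   # W + R = 4 > 3
relaxed = always_overlap(3, 1, 1)     # W + R = 2 <= 3
```

With N=3, W=2, R=2 (the setup used in the Cassandra and Voldemort tests earlier) every read is guaranteed to touch at least one replica that saw the latest write; with W=1, R=1 a read can miss it entirely.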

If the client can tolerate a more relaxed consistency model, we don’t need to use 2PC or the quorum based protocol above. Here we describe a Gossip model where updates are propagated asynchronously via gossip message exchanges, and an anti-entropy protocol applies the updates such that every replica eventually gets to the latest state.

Vector Clock

A vector clock is a timestamp mechanism that lets us reason about causal relationships between updates. First of all, each replica keeps a vector clock. Let’s say replica i has its clock Vi. Vi[i] is replica i’s own logical clock, and every replica follows certain rules to update its vector clock.

  • Whenever an internal operation happens at replica i, it will advance its clock Vi[i]
  • Whenever replica i sends a message to replica j, it will first advance its clock Vi[i] and attach its vector clock Vi to the message
  • Whenever replica j receives a message from replica i, it will first advance its clock Vj[j] and then merge its clock with the clock Vm attached to the message, i.e. for every k, Vj[k] = max(Vj[k], Vm[k])
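
The three rules translate almost directly into code. This is a minimal sketch with a fixed number of replicas; the class and method names are hypothetical.

```python
class Replica:
    def __init__(self, index, n_replicas):
        self.i = index
        self.clock = [0] * n_replicas  # V_i, one entry per replica

    def internal_event(self):
        # Rule 1: an internal operation advances the replica's own entry.
        self.clock[self.i] += 1

    def send(self):
        # Rule 2: advance the own entry, then attach a copy of the clock.
        self.clock[self.i] += 1
        return list(self.clock)

    def receive(self, message_clock):
        # Rule 3: advance the own entry, then merge entry by entry:
        # Vj[k] = max(Vj[k], Vm[k]).
        self.clock[self.i] += 1
        self.clock = [max(a, b) for a, b in zip(self.clock, message_clock)]


a, b = Replica(0, 3), Replica(1, 3)
a.internal_event()   # a.clock is now [1, 0, 0]
vm = a.send()        # a.clock is now [2, 0, 0], and so is vm
b.receive(vm)        # b.clock is now [2, 1, 0]
```

After the exchange, b’s clock records both its own single event and everything a had done up to the moment it sent the message.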


A partial order relationship can be defined such that Vi > Vj iff for all k, Vi[k] >= Vj[k] (and Vi differs from Vj in at least one entry). We can use this partial ordering to derive causal relationships between updates. The reasoning behind it is

  • The effect of an internal operation will be seen immediately at the same node
  • After receiving a message, the receiving node knows the situation of the sending node at the time the message was sent. The situation includes not only what is happening at the sending node, but also all the other nodes that the sending node knows about.
  • In other words, Vi[i] reflects the time of the latest internal operation that happened at node i. Vi[k] = 6 means replica i knows the situation of replica k up to its logical clock 6.
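
The partial order defined above can be checked with a small comparison helper. A sketch only: the function names and the returned labels are hypothetical, and equal clocks are reported separately here since neither strictly follows the other.

```python
def dominates(vi, vj):
    """True if Vi[k] >= Vj[k] for every k."""
    return all(x >= y for x, y in zip(vi, vj))


def compare(vi, vj):
    """Classify two vector clocks of the same length."""
    if vi == vj:
        return "equal"
    if dominates(vi, vj):
        return "after"       # Vi > Vj: Vi has seen everything Vj has
    if dominates(vj, vi):
        return "before"      # Vi < Vj
    return "concurrent"      # neither has seen all of the other's updates


r1 = compare([2, 1, 0], [2, 0, 0])
r2 = compare([1, 0, 0], [0, 1, 0])
```

The "concurrent" outcome is what makes this a partial rather than a total order: two updates made without knowledge of each other cannot be ranked, and that is exactly the case that shows up as a conflict.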

Notice that the term “situation” is used here in an abstract sense. Depending on what information is passed in the message, the situation can be different. This will affect how the vector clock is advanced. Below, we describe the “state transfer model” and the “operation transfer model”, which pass different information in the message and therefore advance their vector clocks differently.

Because state always flows from the replica to the client but not the other way round, the client doesn’t have an entry in the vector clock. The vector clock contains only one entry for each replica. However, the client will also keep the vector clock from the last replica it contacted. This is important for supporting the client consistency models we described above. For example, to support monotonic read, the replica will make sure the vector clock attached to the data is > the client’s submitted vector clock in the request.

Gossip (State Transfer Model)

In a state transfer model, each replica maintains a vector clock as well as a state version tree where no state is > or < any other (based on vector clock comparison). In other words, the state version tree contains all the conflicting updates.

At query time, the client will attach its vector clock and the replica will send back a subset of the state tree which precedes the client’s vector clock (this will provide monotonic read consistency). The client will then advance its vector clock by merging all the versions. This means the client is responsible for resolving the conflict among all these versions because when the client sends the update later, its vector clock will precede all these versions.


At update time, the client sends its vector clock and the replica checks whether the client’s state precedes any of its existing versions; if so, it throws away the client’s update.


Replicas also gossip with each other in the background and try to merge their version trees.
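The query and update rules above can be sketched roughly as follows. This is only an illustration under my own assumptions: the version tree is flattened into a list of (clock, value) pairs, an accepted update is stamped by incrementing the replica's own entry, and the class name `StateReplica` is made up for the example.

```python
def descends(a, b):
    """True if clock a has seen everything clock b has (a >= b)."""
    return all(a.get(k, 0) >= v for k, v in b.items())


def strictly_older(a, b):
    """True if clock a < clock b."""
    return descends(b, a) and not descends(a, b)


class StateReplica:
    def __init__(self, name):
        self.name = name
        self.versions = []  # mutually conflicting (clock, value) pairs

    def query(self, client_clock):
        # Never hand back a version older than what the client has already seen.
        return [(c, v) for c, v in self.versions
                if not strictly_older(c, client_clock)]

    def update(self, client_clock, value):
        # A client whose view precedes an existing version is stale: reject.
        if any(strictly_older(client_clock, c) for c, _ in self.versions):
            return False
        # Versions the client has already seen are superseded by this update.
        self.versions = [(c, v) for c, v in self.versions
                         if not descends(client_clock, c)]
        new_clock = dict(client_clock)
        new_clock[self.name] = new_clock.get(self.name, 0) + 1  # assumed stamping rule
        self.versions.append((new_clock, value))
        return True
```

A client that reads two conflicting versions, merges their clocks and writes back a resolved value ends up replacing both versions with a single one.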

Gossip (Operation Transfer Model)

In an operation transfer approach, the sequence in which the operations are applied is very important. At a minimum, causal order needs to be maintained. Because of this ordering issue, each replica has to defer executing an operation until all the preceding operations have been executed. Therefore replicas save the operation requests to a log file, exchange the logs with each other, and consolidate them to figure out the right sequence in which to apply the operations to their local store.

“Causal order” means every replica applies changes to the “causes” before applying changes to the “effect”. “Total order” requires that every replica applies the operations in the same sequence.

In this model, each replica keeps a list of vector clocks: Vi is the vector clock of the replica itself, and Vj is the vector clock recorded when replica i received replica j’s gossip message. There is also a V-state that represents the vector clock of the last updated state.

When the client submits a query, it also sends along its vector clock, which reflects the client’s view of the world. The replica checks whether it has a view of the state that is later than the client’s view.


When an update operation is received, the replica buffers it until it can be applied to the local state. Every submitted operation is tagged with 2 timestamps: V-client indicates the client’s view at the time it makes the update request, and V-@receive is the replica’s view when it receives the submission.

This update operation request sits in the queue until the replica has received all the other updates that it depends on. This condition is met once the replica’s vector clock Vi is larger than V-client.


In the background, the replicas exchange their logs of queued updates and update each other’s vector clocks. After the log exchange, each replica checks whether certain operations can now be applied (because all the operations they depend on have been received) and applies them accordingly. Notice that multiple operations may become ready to apply at the same time; the replica sorts these operations in causal order (using vector clock comparison) and applies them in the right order.
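Here is a rough sketch of the buffering step. It is simplified under my own assumptions: each operation carries the clock the client had seen (`v_client`) and a `stamp` that the state advances to once the operation is applied, and an applied operation is dropped from the pending queue right away (keeping it around until every replica has it is not modelled). The names `Op` and `Replica` are hypothetical.

```python
from dataclasses import dataclass, field


def dominates(a, b):
    """True if clock a >= clock b in every entry (missing entries count as 0)."""
    return all(a.get(k, 0) >= v for k, v in b.items())


@dataclass
class Op:
    name: str
    v_client: dict  # what the client had seen when it issued the operation
    stamp: dict     # clock the state advances to once the operation is applied


@dataclass
class Replica:
    v_state: dict = field(default_factory=dict)
    pending: list = field(default_factory=list)
    applied: list = field(default_factory=list)

    def receive(self, op):
        self.pending.append(op)
        self.drain()

    def drain(self):
        # Keep applying until no pending operation has its dependencies met.
        progress = True
        while progress:
            progress = False
            for op in list(self.pending):
                if dominates(self.v_state, op.v_client):
                    self.applied.append(op.name)
                    for k, v in op.stamp.items():
                        self.v_state[k] = max(self.v_state.get(k, 0), v)
                    self.pending.remove(op)
                    progress = True
```

If the "effect" arrives before its "cause", it simply waits in `pending` and is applied right after the cause shows up.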


Concurrent updates at different replicas can also happen, which means there can be multiple valid sequences of operations. For different replicas to apply concurrent updates in the same order, we need a total ordering mechanism.

One approach is that whoever does the update first acquires a monotonic sequence number, and latecomers follow the sequence. On the other hand, if the operations themselves are commutative, then the order in which they are applied doesn’t matter.
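A small sketch of both cases, assuming operations are plain functions on an integer state and that sequence numbers have already been handed out by some sequencer (not shown here):

```python
from itertools import permutations


def apply_in_sequence(ops, initial=0):
    """Apply (sequence_number, function) pairs in ascending sequence number."""
    state = initial
    for _, fn in sorted(ops, key=lambda op: op[0]):
        state = fn(state)
    return state


def add5(x):
    return x + 5


def double(x):
    return x * 2


# Not commutative: the sequence number fixes one order regardless of arrival order.
arrival_1 = [(1, add5), (2, double)]
arrival_2 = [(2, double), (1, add5)]
assert apply_in_sequence(arrival_1) == apply_in_sequence(arrival_2)

# Commutative: increments give the same result in every order, no sequencer needed.
increments = [1, 2, 3]
results = {sum(order) for order in permutations(increments)}
assert len(results) == 1
```

Without the sequence numbers, `add5` then `double` and `double` then `add5` would leave two replicas with different states.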

After applying the update, the update operation cannot be removed from the queue immediately, because it may not have been fully exchanged with every replica yet. We keep checking the vector clock of each replica after every log exchange, and only once we have confirmed that everyone has received this update do we remove it from the queue.
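The removal check can be as simple as the following sketch. It assumes each operation carries a `stamp` clock and that the replica keeps the last known clock of every peer in a dict; both are my own naming.

```python
def safe_to_remove(op_stamp, peer_clocks):
    """True once every peer's last known clock covers the operation's stamp."""
    return all(
        all(clock.get(k, 0) >= v for k, v in op_stamp.items())
        for clock in peer_clocks.values()
    )


peers = {"B": {"A": 3}, "C": {"A": 2}}
print(safe_to_remove({"A": 3}, peers))  # replica C has not caught up yet
```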

Map Reduce Execution

Notice that the distributed store architecture also fits distributed processing well. For example, consider processing a Map/Reduce operation over an input key list.

The system pushes the map and reduce functions to all the nodes (i.e. it moves the processing logic towards the data). The map function is run on the replicas that own the input keys, and each of them forwards its map output to the reduce function, where the aggregation logic is executed.
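As a toy illustration of this flow, here is a single-process sketch. It assumes each input key is owned by exactly one node, represents a node as a dict of its local data, and runs everything in one loop instead of over the network; `map_reduce` and its parameters are names I made up.

```python
from collections import defaultdict


def map_reduce(nodes, input_keys, map_fn, reduce_fn):
    """Run map_fn where each key lives, then reduce the grouped map output."""
    grouped = defaultdict(list)
    for node_data in nodes.values():
        for key in input_keys:
            if key in node_data:  # map runs only on the node owning the key
                for out_key, out_val in map_fn(key, node_data[key]):
                    grouped[out_key].append(out_val)
    return {k: reduce_fn(k, vals) for k, vals in grouped.items()}


def word_map(key, text):
    return [(word, 1) for word in text.split()]


def word_reduce(word, counts):
    return sum(counts)


nodes = {"n1": {"doc1": "a b a"}, "n2": {"doc2": "b c"}}
counts = map_reduce(nodes, ["doc1", "doc2"], word_map, word_reduce)
```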


Handling Deletes

In a multi-master replication system where we use vector clock timestamps to determine causal order, we need to handle “delete” very carefully so that we don’t lose the timestamp information associated with the deleted object. Otherwise we cannot even reason about the order in which to apply the delete.

Therefore, we typically handle a delete as a special update that marks the object as “deleted” but still keeps its metadata / timestamp information around. After a long enough time that we are confident every replica has marked this object as deleted, we garbage collect the deleted object to reclaim its space.
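A minimal sketch of this "mark as deleted" idea follows. How a replica learns that every other replica has seen the delete is left out; the sketch just assumes it is handed a set of confirmed keys. The class name `TombstoneStore` is hypothetical.

```python
class TombstoneStore:
    def __init__(self):
        self.items = {}  # key -> {"value", "clock", "deleted"}

    def put(self, key, value, clock):
        self.items[key] = {"value": value, "clock": clock, "deleted": False}

    def delete(self, key, clock):
        # Drop the value but keep the clock so the delete can still be ordered.
        self.items[key] = {"value": None, "clock": clock, "deleted": True}

    def get(self, key):
        item = self.items.get(key)
        return None if item is None or item["deleted"] else item["value"]

    def gc(self, confirmed_keys):
        # Reclaim only deleted objects that every replica is known to have marked.
        for key in list(self.items):
            if self.items[key]["deleted"] and key in confirmed_keys:
                del self.items[key]
```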

Storage Implementation

One strategy is to make the storage implementation pluggable, e.g. a local MySQL DB, Berkeley DB, the filesystem or even an in-memory hashtable can be used as the storage mechanism.
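In code, "pluggable" usually just means a small interface that each backend implements. The sketch below shows an in-memory and a filesystem backend behind a made-up `Storage` interface; it assumes keys are safe to use as file names and values are bytes.

```python
from abc import ABC, abstractmethod
from pathlib import Path


class Storage(ABC):
    """Hypothetical minimal interface every storage backend implements."""

    @abstractmethod
    def get(self, key):
        ...

    @abstractmethod
    def put(self, key, value):
        ...


class MemoryStorage(Storage):
    def __init__(self):
        self.data = {}

    def get(self, key):
        return self.data.get(key)

    def put(self, key, value):
        self.data[key] = value


class FileStorage(Storage):
    def __init__(self, root):
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)

    def get(self, key):
        path = self.root / key
        return path.read_bytes() if path.exists() else None

    def put(self, key, value):
        (self.root / key).write_bytes(value)
```

The rest of the node only talks to `Storage`, so swapping one backend for another does not touch the replication logic.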

Another strategy is to implement the storage in a highly scalable way. Here are some techniques that I learned from CouchDB and Google BigTable.

CouchDB has an MVCC model that uses a copy-on-modify approach. Any update causes a private copy of the data to be made, which in turn requires the index to be modified, causing a private copy of the index as well, all the way up to the root pointer.
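The "copy all the way up to the root" idea can be shown with a tiny immutable tree. To be clear, this is not CouchDB's actual B-tree; it is a plain binary search tree I use as a stand-in to show the path copying.

```python
from typing import NamedTuple, Optional


class Node(NamedTuple):
    key: str
    value: str
    left: Optional["Node"] = None
    right: Optional["Node"] = None


def insert(node, key, value):
    """Return a new root; only the nodes on the path to key are copied."""
    if node is None:
        return Node(key, value)
    if key < node.key:
        return node._replace(left=insert(node.left, key, value))
    if key > node.key:
        return node._replace(right=insert(node.right, key, value))
    return node._replace(value=value)


def lookup(node, key):
    while node is not None:
        if key == node.key:
            return node.value
        node = node.left if key < node.key else node.right
    return None


old_root = insert(insert(insert(None, "m", "1"), "c", "2"), "x", "3")
new_root = insert(old_root, "c", "9")
```

A reader still holding `old_root` keeps seeing the old value, while the untouched subtree is shared between both versions.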

Notice that the update happens in an append-only mode: the modified data is appended to the file and the old data becomes garbage. Periodic garbage collection is done to compact the data. Here is how the model is implemented in memory and on disk.

In the Google BigTable model, the data is broken down into multiple generations, and memory is used to hold the newest generation. Any query searches the in-memory data as well as all the data sets on disk and merges the returned results. Whether a generation contains a key can be detected quickly by checking a bloom filter.

When an update happens, both the in-memory data and the commit log are written, so that if the machine crashes before the in-memory data is flushed to disk, it can be recovered from the commit log.
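Put together, the generations, the bloom filter and the commit log look roughly like the sketch below. It is a toy under my own assumptions: everything lives in Python lists and dicts standing in for files on disk, the bloom filter is a simple SHA-256 based one, and the names `Bloom` and `Store` are made up.

```python
import hashlib


class Bloom:
    def __init__(self, size=256, hashes=3):
        self.size, self.hashes, self.bits = size, hashes, 0

    def _positions(self, key):
        for i in range(self.hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:4], "big") % self.size

    def add(self, key):
        for p in self._positions(key):
            self.bits |= 1 << p

    def might_contain(self, key):
        # False means definitely absent; True means "possibly present".
        return all((self.bits >> p) & 1 for p in self._positions(key))


class Store:
    def __init__(self):
        self.commit_log = []   # stands in for an append-only file on disk
        self.mem = {}          # newest generation, held in memory
        self.generations = []  # older (bloom, data) pairs, newest last

    def put(self, key, value):
        self.commit_log.append((key, value))  # log first, then memory
        self.mem[key] = value

    def flush(self):
        bloom = Bloom()
        for key in self.mem:
            bloom.add(key)
        self.generations.append((bloom, dict(self.mem)))
        self.mem.clear()
        self.commit_log.clear()  # flushed data no longer needs the log

    def get(self, key):
        if key in self.mem:
            return self.mem[key]
        for bloom, data in reversed(self.generations):
            if bloom.might_contain(key) and key in data:
                return data[key]
        return None

    def recover(self):
        """Rebuild the in-memory generation from the commit log after a crash."""
        self.mem = {}
        for key, value in self.commit_log:
            self.mem[key] = value
```

The lookup goes from the newest generation to the oldest, so a newer value always shadows an older one, and the bloom filter lets most generations be skipped without touching their data.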
