Tag Archive: HBase


HBase Architecture 101 – Storage

One of the more hidden aspects of HBase is how data is actually stored. While the majority of users may never have to bother about it you may have to get up to speed when you want to learn what the various advanced configuration options you have at your disposal mean. “How can I tune HBase to my needs?”, and other similar questions are certainly interesting once you get over the (at times steep) learning curve of setting up a basic system. Another reason wanting to know more is if for whatever reason disaster strikes and you have to recover a HBase installation.

In my own efforts getting to know the respective classes that handle the various files I started to sketch a picture in my head illustrating the storage architecture of HBase. But while the ingenious and blessed committers of HBase easily navigate back and forth through that maze I find it much more difficult to keep a coherent image. So I decided to put that sketch to paper. Here it is.

Please note that this is not a UML or call graph but a merged picture of classes and the files they handle and by no means complete though focuses on the topic of this post. I will discuss the details below and also look at the configuration options and how they affect the low-level storage files.

The Big Picture

So what does my sketch of the HBase innards really say? You can see that HBase handles basically two kinds of file types. One is used for the write-ahead log and the other for the actual data storage. The files are primarily handled by the HRegionServer‘s. But in certain scenarios even the HMaster will have to perform low-level file operations. You may also notice that the actual files are in fact divided up into smaller blocks when stored within the Hadoop Distributed Filesystem (HDFS). This is also one of the areas where you can configure the system to handle larger or smaller data better. More on that later.

The general flow is that a new client contacts the Zookeeper quorum (a separate cluster of Zookeeper nodes) first to find a particular row key. It does so by retrieving the server name (i.e. host name) that hosts the -ROOT- region from Zookeeper. With that information it can query that server to get the server that hosts the .META. table. Both of these two details are cached and only looked up once. Lastly it can query the .META. server and retrieve the server that has the row the client is looking for.

Once it has been told where the row resides, i.e. in what region, it caches this information as well and contacts the HRegionServer hosting that region directly. So over time the client has a pretty complete picture of where to get rows from without needing to query the .META. server again.

Note: The HMaster is responsible to assign the regions to each HRegionServer when you start HBase. This also includes the “special” -ROOT- and .META. tables.

Next the HRegionServer opens the region it creates a corresponding HRegion object. When the HRegion is “opened” it sets up a Store instance for each HColumnFamily for every table as defined by the user beforehand. Each of the Store instances can in turn have one or more StoreFile instances, which are lightweight wrappers around the actual storage file called HFile. A HRegion also has a MemStore and a HLog instance. We will now have a look at how they work together but also where there are exceptions to the rule.

Stay Put

So how is data written to the actual storage? The client issues a HTable.put(Put) request to the HRegionServer which hands the details to the matching HRegion instance. The first step is now to decide if the data should be first written to the “Write-Ahead-Log” (WAL) represented by the HLog class. The decision is based on the flag set by the client using Put.writeToWAL(boolean) method. The WAL is a standard Hadoop SequenceFile (although it is currently discussed if that should not be changed to a more HBase suitable file format) and it stores HLogKey‘s. These keys contain a sequential number as well as the actual data and are used to replay not yet persisted data after a server crash.

Once the data is written (or not) to the WAL it is placed in the MemStore. At the same time it is checked if the MemStore is full and in that case a flush to disk is requested. When the request is served by a separate thread in the HRegionServer it writes the data to an HFile located in the HDFS. It also saves the last written sequence number so the system knows what was persisted so far. Let”s have a look at the files now.

Files

HBase has a configurable root directory in the HDFS but the default is /hbase. You can simply use the DFS tool of the Hadoop command line tool to look at the various files HBase stores.

$ hadoop dfs -lsr /hbase/docs

drwxr-xr-x – hadoop supergroup 0 2009-09-28 14:22 /hbase/.logs
drwxr-xr-x – hadoop supergroup 0 2009-10-15 14:33 /hbase/.logs/srv1.foo.bar,60020,1254172960891
-rw-r–r– 3 hadoop supergroup 14980 2009-10-14 01:32 /hbase/.logs/srv1.foo.bar,60020,1254172960891/hlog.dat.1255509179458
-rw-r–r– 3 hadoop supergroup 1773 2009-10-14 02:33 /hbase/.logs/srv1.foo.bar,60020,1254172960891/hlog.dat.1255512781014
-rw-r–r– 3 hadoop supergroup 37902 2009-10-14 03:33 /hbase/.logs/srv1.foo.bar,60020,1254172960891/hlog.dat.1255516382506

-rw-r–r– 3 hadoop supergroup 137648437 2009-09-28 14:20 /hbase/docs/1905740638/oldlogfile.log

drwxr-xr-x – hadoop supergroup 0 2009-09-27 18:03 /hbase/docs/999041123
-rw-r–r– 3 hadoop supergroup 2323 2009-09-01 23:16 /hbase/docs/999041123/.regioninfo
drwxr-xr-x – hadoop supergroup 0 2009-10-13 01:36 /hbase/docs/999041123/cache
-rw-r–r– 3 hadoop supergroup 91540404 2009-10-13 01:36 /hbase/docs/999041123/cache/5151973105100598304
drwxr-xr-x – hadoop supergroup 0 2009-09-27 18:03 /hbase/docs/999041123/contents
-rw-r–r– 3 hadoop supergroup 333470401 2009-09-27 18:02 /hbase/docs/999041123/contents/4397485149704042145
drwxr-xr-x – hadoop supergroup 0 2009-09-04 01:16 /hbase/docs/999041123/language
-rw-r–r– 3 hadoop supergroup 39499 2009-09-04 01:16 /hbase/docs/999041123/language/8466543386566168248
drwxr-xr-x – hadoop supergroup 0 2009-09-04 01:16 /hbase/docs/999041123/mimetype
-rw-r–r– 3 hadoop supergroup 134729 2009-09-04 01:16 /hbase/docs/999041123/mimetype/786163868456226374
drwxr-xr-x – hadoop supergroup 0 2009-10-08 22:45 /hbase/docs/999882558
-rw-r–r– 3 hadoop supergroup 2867 2009-10-08 22:45 /hbase/docs/999882558/.regioninfo
drwxr-xr-x – hadoop supergroup 0 2009-10-09 23:01 /hbase/docs/999882558/cache
-rw-r–r– 3 hadoop supergroup 45473255 2009-10-09 23:01 /hbase/docs/999882558/cache/974303626218211126
drwxr-xr-x – hadoop supergroup 0 2009-10-12 00:37 /hbase/docs/999882558/contents
-rw-r–r– 3 hadoop supergroup 467410053 2009-10-12 00:36 /hbase/docs/999882558/contents/2507607731379043001
drwxr-xr-x – hadoop supergroup 0 2009-10-09 23:02 /hbase/docs/999882558/language
-rw-r–r– 3 hadoop supergroup 541 2009-10-09 23:02 /hbase/docs/999882558/language/5662037059920609304
drwxr-xr-x – hadoop supergroup 0 2009-10-09 23:02 /hbase/docs/999882558/mimetype
-rw-r–r– 3 hadoop supergroup 84447 2009-10-09 23:02 /hbase/docs/999882558/mimetype/2642281535820134018
drwxr-xr-x – hadoop supergroup 0 2009-10-14 10:58 /hbase/docs/compaction.dir

The first set of files are the log files handled by the HLog instances and which are created in a directory called .logs underneath the HBase root directory. Then there is another subdirectory for each HRegionServer and then a log for each HRegion.

Next there is a file called oldlogfile.log which you may not even see on your cluster. They are created by one of the exceptions I mentioned earlier as far as file access is concerned. They are a result of so called “log splits”. When the HMaster starts and finds that there is a log file that is not handled by a HRegionServer anymore it splits the log copying the HLogKey‘s to the new regions they should be in. It places them directly in the region’s directory in a file named oldlogfile.log. Now when the respective HRegion is instantiated it reads these files and inserts the contained data into its local MemStore and starts a flush to persist the data right away and delete the file.

Note: Sometimes you may see left-over oldlogfile.log.old (yes, there is another .old at the end) which are caused by the HMaster trying repeatedly to split the log and found there was already another split log in place. At that point you have to consult with the HRegionServer or HMaster logs to see what is going on and if you can remove those files. I found at times that they were empty and therefore could safely be removed.

The next set of files are the actual regions. Each region name is encoded using a Jenkins Hash function and a directory created for it. The reason to hash the region name is because it may contain characters that cannot be used in a path name in DFS. The Jenkins Hash always returns legal characters, as simple as that. So you get the following path structure:

/hbase/<tablename>/<encoded-regionname>/<column-family>/<filename>

In the root of the region directory there is also a .regioninfo holding meta data about the region. This will be used in the future by an HBase fsck utility (see HBASE-7) to be able to rebuild a broken .META. table. For a first usage of the region info can be seen in HBASE-1867.

In each column-family directory you can see the actual data files, which I explain in the following section in detail.

Something that I have not shown above are split regions with their initial daughter reference files. When a data file within a region grows larger than the configured hbase.hregion.max.filesize then the region is split in two. This is done initially very quickly because the system simply creates two reference files in the new regions now supposed to host each half. The name of the reference file is an ID with the hashed name of the referenced region as a postfix, e.g. 1278437856009925445.3323223323. The reference files only hold little information: the key the original region was split at and wether it is the top or bottom reference. Of note is that these references are then used by the HalfHFileReader class (which I also omitted from the big picture above as it is only used temporarily) to read the original region data files. Only upon a compaction the original files are rewritten into separate files in the new region directory. This also removes the small reference files as well as the original data file in the original region.

And this also concludes the file dump here, the last thing you see is a compaction.dir directory in each table directory. They are used when splitting or compacting regions as noted above. They are usually empty and are used as a scratch area to stage the new data files before swapping them into place.

HFile

So we are now at a very low level of HBase’s architecture. HFile‘s (kudos to Ryan Rawson) are the actual storage files, specifically created to serve one purpose: store HBase’s data fast and efficiently. They are apparently based on Hadoop’s TFile (see HADOOP-3315) and mimic the SSTable format used in Googles BigTable architecture. The previous use of Hadoop’s MapFile‘s in HBase proved to be not good enough performance wise. So how do the files look like?

The files have a variable length, the only fixed blocks are the FileInfo and Trailer block. As the picture shows it is the Trailer that has the pointers to the other blocks and it is written at the end of persisting the data to the file, finalizing the now immutable data store. The Index blocks record the offsets of the Data and Meta blocks. Both the Data and the Meta blocks are actually optional. But you most likely you would always find data in a data store file.

How is the block size configured? It is driven solely by the HColumnDescriptor which in turn is specified at table creation time by the user or defaults to reasonable standard values. Here is an example as shown in the master web based interface:

{NAME => 'docs', FAMILIES => [{NAME => 'cache', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'false'}, {NAME => 'contents', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'false'}, ...

The default is "64KB" (or 65535 bytes). Here is what the HFile JavaDoc explains:

"Minimum block size. We recommend a setting of minimum block size between 8KB to 1MB for general usage. Larger block size is preferred if files are primarily for sequential access. However, it would lead to inefficient random access (because there are more data to decompress). Smaller blocks are good for random access, but require more memory to hold the block index, and may be slower to create (because we must flush the compressor stream at the conclusion of each data block, which leads to an FS I/O flush). Further, due to the internal caching in Compression codec, the smallest possible block size would be around 20KB-30KB."

So each block with its prefixed "magic" header contains either plain or compressed data. How that looks like we will have a look at in the next section.

One thing you may notice is that the default block size for files in DFS is 64MB, which is 1024 times what the HFile default block size is. So the HBase storage files blocks do not match the Hadoop blocks. Therefore you have to think about both parameters separately and find the sweet spot in terms of performance for your particular setup.

One option in the HBase configuration you may see is hfile.min.blocksize.size. It seems to be only used during migration from earlier versions of HBase (since it had no block file format) and when directly creating HFile during bulk imports for example.

So far so good, but how can you see if a HFile is OK or what data it contains? There is an App for that!

The HFile.main() method provides the tools to dump a data file:

$ hbase org.apache.hadoop.hbase.io.hfile.HFile
usage: HFile [-f ] [-v] [-r ] [-a] [-p] [-m] [-k]
-a,--checkfamily Enable family check
-f,--file File to scan. Pass full-path; e.g.
hdfs://a:9000/hbase/.META./12/34
-k,--checkrow Enable row order check; looks for out-of-order keys
-m,--printmeta Print meta data of file
-p,--printkv Print key/value pairs
-r,--region Region to scan. Pass region name; e.g. '.META.,,1'
-v,--verbose Verbose output; emits file and meta data delimiters

Here is an example of what the output will look like (shortened here):

$ hbase org.apache.hadoop.hbase.io.hfile.HFile -v -p -m -f \
hdfs://srv1.foo.bar:9000/hbase/docs/999882558/mimetype/2642281535820134018

Scanning -> hdfs://srv1.foo.bar:9000/hbase/docs/999882558/mimetype/2642281535820134018
...
K: \x00\x04docA\x08mimetype\x00\x00\x01\x23y\x60\xE7\xB5\x04 V: text\x2Fxml
K: \x00\x04docB\x08mimetype\x00\x00\x01\x23x\x8C\x1C\x5E\x04 V: text\x2Fxml
K: \x00\x04docC\x08mimetype\x00\x00\x01\x23xz\xC08\x04 V: text\x2Fxml
K: \x00\x04docD\x08mimetype\x00\x00\x01\x23y\x1EK\x15\x04 V: text\x2Fxml
K: \x00\x04docE\x08mimetype\x00\x00\x01\x23x\xF3\x23n\x04 V: text\x2Fxml
Scanned kv count -> 1554

Block index size as per heapsize: 296
reader=hdfs://srv1.foo.bar:9000/hbase/docs/999882558/mimetype/2642281535820134018, \
compression=none, inMemory=false, \
firstKey=US6683275_20040127/mimetype:/1251853756871/Put, \
lastKey=US6684814_20040203/mimetype:/1251864683374/Put, \
avgKeyLen=37, avgValueLen=8, \
entries=1554, length=84447
fileinfoOffset=84055, dataIndexOffset=84277, dataIndexCount=2, metaIndexOffset=0, \
metaIndexCount=0, totalBytes=84055, entryCount=1554, version=1
Fileinfo:
MAJOR_COMPACTION_KEY = \xFF
MAX_SEQ_ID_KEY = 32041891
hfile.AVG_KEY_LEN = \x00\x00\x00\x25
hfile.AVG_VALUE_LEN = \x00\x00\x00\x08
hfile.COMPARATOR = org.apache.hadoop.hbase.KeyValue\x24KeyComparator
hfile.LASTKEY = \x00\x12US6684814_20040203\x08mimetype\x00\x00\x01\x23x\xF3\x23n\x04

The first part is the actual data stored as KeyValue pairs, explained in detail in the next section. The second part dumps the internal HFile.Reader properties as well as the Trailer block details and finally the FileInfo block values. This is a great way to check if a data file is still healthy.

KeyValue's

In essence each KeyValue in the HFile is simply a low-level byte array that allows for "zero-copy" access to the data, even with lazy or custom parsing if necessary. How are the instances arranged?


The structure starts with two fixed length numbers indicating the size of the key and the value part. With that info you can offset into the array to for example get direct access to the value, ignoring the key - if you know what you are doing. Otherwise you can get the required information from the key part. Once parsed into a KeyValue object you have getters to access the details.

Note: One thing to watch out for is the difference between KeyValue.getKey() and KeyValue.getRow(). I think for me the confusion arose from referring to "row keys" as the primary key to get a row out of HBase. That would be the latter of the two methods, i.e. KeyValue.getRow(). The former simply returns the complete byte array part representing the raw "key" as colored and labeled in the diagram.

This concludes my analysis of the HBase storage architecture. I hope it provides a starting point for your own efforts to dig into the grimy details. Have fun!

Update: Slightly updated with more links to JIRA issues. Also added Zookeeper to be more precise about the current mechanisms to look up a region.

Update 2: Added details about region references.

Update 3: Added more details about region lookup as requested.

Running Hadoop MapReduce With Cassandra NoSQL

So if you are looking for a good NoSQL read of HBase vs. Cassandra you can check out http://ria101.wordpress.com/2010/02/24/hbase-vs-cassandra-why-we-moved/.  In short HBase is good for reads and Cassandra for writes.  Cassandra does a great job on reads too so please do not think I am shooting either down in any way.  I am just saying that both HBase and Cassandra have great value and useful purpose in their own right and even use cases exists to run both.  HBase recently got called up as a top level apache project coming up and out of Hadoop.

Having worked with Cassandra a bit I often see/hear of folks asking about running Map/Reduce jobs against the data stored in Cassandra instances.  Well Hadoopers & Hadooperettes the Cassandra folks in the 0.60 release provide a way to-do very nicely.   It is VERY straight forward and well thought through.  If you want to see the evolution check out the JIRA issue https://issues.apache.org/jira/browse/CASSANDRA-342

So how do you it?  Very simple, Cassandra provides an implementation of InputFormat.  Incase you are new to Hadoop the InputFormat is what the mapper is going to use to load your data into it (basically).  Their subclass connects your mapper to pull the data in from Cassandra.  What is also great here is that the Cassandra folks have also spent the time implementing the integration in the classic “Word Count” example.

See https://svn.apache.org/repos/asf/cassandra/trunk/contrib/word_count/ for this example.  Cassandra rows or row fragments (that is, pairs of key + SortedMap of columns) are input to Map tasks for processing by your job, as specified by a SlicePredicate that describes which columns to fetch from each row. Here’s how this looks in the word_count example, which selects just one configurable columnName from each row:

ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);

Cassandra also provides a Pig LoadFunc for running jobs in the Pig DSL instead of writing Java code by hand. This is in https://svn.apache.org/repos/asf/cassandra/trunk/contrib/pig/.

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/

Cassandra vs HBase

We are an ad network. We need to store impressions and clicks. We were evaluating various big data (or nosql -what ever you may choose to call it) systems for our new project. We had used HBase for past 8 months in an experimental product and are satisfied with it but the hype about Cassandra was so much that we decided to give it a shot. I think for some reason Cassandra team has succeeded in marketing itself very well. You will find even non technical people (such as VCs, CEOs, Product Managers) in Santa Monica recommending Cassandra to each other.

First impression of Cassandra was a good one. Their web page looks much more professional and nicer than HBase. It’s very easy to get it up and running. The website is well documented. Literally, it took me 5 minutes to set it up and get it running.

The real challenge was to understand Cassandra data model and try to implement it for our use cases. It was very clear to us how we will do it with HBase as we have a pretty good experience with it. Even though Cassandra inherits the same data model from BigTable, there are some fundamental differences between Cassandra and HBase. I have tried to tabulate the differences between the two systems below:

Cassandra HBase
Lacks concept of a Table. All the documentation will tell you that it’s not common to have multiple keyspaces. That means you have to share a key space in a cluster. Furthermore adding a keyspace requires a cluster restart! Concept of Table exists. Each table has it’s own key space. This was a big win for us. You can add and remove table as as easily as a RDBMS.
Uses string keys. Very common to use uuids as the keys. You can use TimeUUID if you want your data to be sorted by time. Uses binary keys. It’s common to combine three different items together to form a key. This means you can search by more than one key in a give table.
Even if you use TimeUUID, as Cassandra load balances client requests, hot spotting problem won’t occur. (All the client requests going to one server in a cluster is known as a hot spot problem). If your key’s first component is time or a sequential number, then hotspotting occurs. All of the new keys will be inserted to one region until it fills up (hence by causing a hotspotting problem).
Offers sorting of columns. Does not have sorting of columns.
Concept of Supercolumn allows you to design very flexible, very complex schemas. Does not have supercolumns. But you can design a super column like structure as column names and values are binary.
Does not have any convinience method to to increment a column value. In fact the vary nature of eventual consistency makes it difficult to update/write a record and read it instantly after the update. You have to make sure that R + W > N to achive strong consitency. By design consitent. Offers a nice convinience method to increment counters. Very much suitable for data aggregation.
Map Reduce support is new. You will need a hadoop cluster to run it. Data will be tranferred from Cassandra cluster to the hadoop cluster. No suitable for running large data map reduce jobs. Map Reduce support is native. HBase is built on Hadoop. Data does not get transferred.
Comparatively simpler to maintain if you don’t have to have hadoop. Comparatively complicated as you have it has many moving pieces such as Zookeeper, Hadoop and HBase itself.
Does not have a native java api as of now. No java doc. Even though written in Java, you have to use Thrift to communicate with the cluster. Has a nice native java api. Feels much more java system than Cassandra. Being a java shop, it was important for us. HBase has a thrift interface for other laguages too.
No master server, hence no single point of failure. Although there exists a concept of a master server, HBase itself does not depend on it heavily. HBase cluster can keep serving data even if the master goes down. Hadoop namenode is a single point of failure.

After comparing the data model and features in this way, HBase was the clear winner for us. In my opinion, if consistency is what you need, HBase is a clear choice. Furthermore native map reduce, concept of a table and simpler schema that can be modified without a cluster restart are a big plus you cannot ignore. HBase is a much more mature platform. When people site Twitter, Facebook using Cassandra, they forget that the same organizations are using HBase too. In fact one of the committers of HBase was recenlty hired by Facebook which clearly shows their interest in HBase.

So in short, we will root for team HBase!

BigTable Model with Cassandra and HBase

Recently in a number of “scalability discussion meeting”, I’ve seen the following pattern coming up repeatedly …

  1. To make your app scalable, you try to make your app layer “stateless”.
  2. OK, so you move the “state” out from your application layer out to a shared DB, or shared data layer.
  3. Now, how do we make the data tier scalable, by definition, we cannot make the data tier stateless.
  4. OK, now lets think about how to “partition” your data and spread them across multiple machines in such a way that workload is balanced.
  5. Now there are more boxes and what if some of them crashes.
  6. OK, we should replicate the data across machines.
  7. Now, how do we keep these data in sync …
  8. And then Cloud computing gets into the picture (as it always does). Now we are not just having a pool of machines but also the pool size can grow and shrink according to workload fluctuation (you don’t want to pay for something idle, right ?).
  9. Now we need to figure out as we add more machines into the pool or remove machine from the pool, how we should “redistribute” the data.

This is an area where NOSQL shines. In the last 18 months, NOSQL has become one of the hottest topic in the software industry. It has been introduced as a solution to large scale data storage problem at the range of Terabytes or Petabytes. Dozens of NOSQL products has come to the market, but two leaders HBase and Cassandra seems to stand out from the rest in terms of their adoption.

Given an increasing demand of explaining these 2 products recently, I decide to write a post on this.

Not to repeat the basic theory of NOSQL here, for a foundation ofdistributed system theory underlying the NOSQL design, please refer to my earlier blog

Both Hbase and Cassandra are based on Google BigTable model, here lets introduce some key characteristic underlying Bigtable first.

Fundamentally Distributed
BigTable is built from the ground up on a “highly distributed”, “share nothing” architecture. Data is supposed to store in large number of unreliable, commodity server boxes by “partitioning” and “replication”. Data partitioning means the data are partitioned by its key and stored in different servers. Replication means the same data element is replicated multiple times at different servers.


Column Oriented
Unlike traditional RDBMS implementation where each “row” is stored contiguous on disk, BigTable, on the other hand, store each column contiguously on disk. The underlying assumption is that in most cases not all columns are needed for data access, column oriented layout allows more records sitting in a disk block and hence can reduce the disk I/O.

Column oriented layout is also very effective to store very sparse data (many cells have NULL value) as well as multi-value cell. The following diagram illustrate the difference between a Row-oriented layout and a Column-oriented layout


Variable number of Columns
In RDBMS, each row must have a fixed set of columns defined by the table schema, and therefore it is not easy to support columns with multi-value attributes. The BigTable model introduces the “Column Family” concept such that a row has a fixed number of “column family” but within the “column family”, a row can have a variable number of columns that can be different in each row.


In the Bigtable model, the basic data storage unit is a cell, (addressed by a particular row and column). Bigtable allow multiple timestamp version of data within a cell. In other words, user can address a data element by the rowid, column name and the timestamp. At the configuration level, Bigtable allows the user to specify how many versions can be stored within each cell either by count (how many) or by freshness (how old).

At the physical level, BigTable store each column family contiguously on disk (imagine one file per column family), and physically sort the order of data by rowid, column name and timestamp. After that, the sorted data will be compressed so that a disk block size can store more data. On the other hand, since data within a column family usually has a similar pattern, data compression can be very effective.

Sequential write
BigTable model is highly optimized for write operation (insert/update/delete) with sequential write (no disk seek is needed). Basically, write happens by first appending a transaction entry to a log file (hence the disk write I/O is sequential with no disk seek), followed by writing the data into an in-memory Memtable . In case of the machine crashes and all in-memory state is lost, the recovery step will bring the Memtable up to date by replaying the updates in the log file.

All the latest update therefore will be stored at the Memtable, which will grow until reaching a size threshold, then it will flushed the Memtable to the disk as an SSTable (sorted by the String key). Over a period of time there will be multiple SSTables on the disk that store the data.

Merged read
Whenever a read request is received, the system will first lookup the Memtable by its row key to see if it contains the data. If not, it will look at the on-disk SSTable to see if the row-key is there. We call this the “merged read” as the system need to look at multiple places for the data. To speed up the detection, SSTable has a companion Bloom filter such that it can rapidly detect the absence of the row-key. In other words, only when the bloom filter returns positive will the system be doing a detail lookup within the SSTable.

Periodic Data Compaction
As you can imagine, it can be quite inefficient for the read operation when there are too many SSTables scattering around. Therefore, the system periodically merge the SSTable. Notice that since each of the SSTable is individually sorted by key, a simple “merge sort” is sufficient to merge multiple SSTable into one. The merge mechanism is based on a logarithm property where two SSTable of the same size will be merge into a single SSTable will doubling the size. Therefore the number of SSTable is proportion to O(logN) where N is the number of rows.

After looking at the common part, lets look at their difference of Hbase and Cassandra.

HBase
Based on the BigTable, HBase uses the Hadoop Filesystem (HDFS) as its data storage engine. The advantage of this approach is then HBase doesn’t need to worry about data replication, data consistency and resiliency because HDFS has handled it already. Of course, the downside is that it is also constrained by the characteristics of HDFS, which is not optimized for random read access. In addition, there will be an extra network latency between the DB server to the File server (which is the data node of Hadoop).


In the HBase architecture, data is stored in a farm of Region Servers. The “key-to-server” mapping is needed to locate the corresponding server and this mapping is stored as a “Table” similar to other user data table.

Before a client do any DB operation, it needs to first locate the corresponding region server.

  1. The client contacts a predefined Master server who replies the endpoint of a region server that holds a “Root Region” table.
  2. The client contacts the region server who replies the endpoint of a second region server who holds a “Meta Region” table, which contains a mapping from “user table” to “region server”.
  3. The client contacts this second region server, passing along the user table name. This second region server will lookup its meta region and reply an endpoint of a third region server who holds a “User Region”, which contains a mapping from “key range” to “region server”
  4. The client contacts this third region server, passing along the row key that it wants to lookup. This third region server will lookup its user region and reply the endpoint of a fourth region server who holds the data that the client is looking for.
  5. Client will cache the result along this process so subsequent request doesn’t need to go through this multi-step process again to resolve the corresponding endpoint.

In Hbase, the in-memory data storage (what we refer to as “Memtable” in above paragraph) is implemented in Memcache. The on-disk data storage (what we refer to as “SSTable” in above paragraph) is implemented as a HDFS file residing in Hadoop data node server. The Log file is also stored as an HDFS file. (I feel storing a transaction log file remotely will hurt performance)

Also in the HBase architecture, there is a special machine playing the “role of master” who monitors and coordinates the activities of all region servers (the heavy-duty worker node). To the best of my knowledge, the master node is the single point of failure at this moment.

For a more detail architecture description, Lars George has a very good explanation in the log file implementation as well as the data storage architecture of Hbase.

Cassandra
Also based on the BigTable model, Cassandra use the DHT (distributed hash table) model to partition its data, based on the paper described in the Amazon Dynamo model.

Consistent Hashing via O(1) DHT
Each machine (node) is associated with a particular id that is distributed in a keyspace (e.g. 128 bit). All the data element is also associated with a key (in the same key space). The server owns all the data whose key lies between its id and the preceding server’s id.

Data is also replicated across multiple servers. Cassandra offers multiple replication schema including storing the replicas in neighbor servers (whose id succeed the server owning the data), or a rack-aware strategy by storing the replicas in a physical location. The simple partition strategy is as follows …


Tunable Consistency Level
Unlike Hbase, Cassandra allows you to choose the consistency level that is suitable to your application, so you can gain more scalability if willing to tradeoff some data consistency.

For example, it allows you to choose how many ACK to receive from different replicas before considering a WRITE to be successful. Similarly, you can choose how many replica’s response to be received in the case of READ before return the result to the client.

By choosing the appropriate number for W and R response, you can choose the level of consistency you like. For example, to achieve Strict Consistency, we just need to pick W, R such that W + R > N. This including the possibility of (W = one and R = all), (R = one and W = all), (W = quorum and R = quorum). Of course, if you don’t need strict consistency, you can even choose a smaller value for W and R and gain a bigger availability. Regardless of what consistency level you choose, the data will be eventual consistent by the “hinted handoff”, “read repair” and “anti-entropy sync” mechanism described below.

Hinted Handoff
The client performs a write by send the request to any Cassandra node which will act as the proxy to the client. This proxy node will located N corresponding nodes that holds the data replicas and forward the write request to all of them. In case any node is failed, it will pick a random node as a handoff node and write the request with a hint telling it to forward the write request back to the failed node after it recovers. The handoff node will then periodically check for the recovery of the failed node and forward the write to it. Therefore, the original node will eventually receive all the write request.

Conflict Resolution
Since write can reach different replica, the corresponding timestamp of the data is used to resolve conflict, in other words, the latest timestamp wins and push the earlier timestamps into an earlier version (they are not lost)

Read Repair
When the client performs a “read”, the proxy node will issue N reads but only wait for R copies of responses and return the one with the latest version. In case some nodes respond with an older version, the proxy node will send the latest version to them asynchronously, hence these left-behind node will still eventually catch up with the latest version.

Anti-Entropy data sync
To ensure the data is still in sync even there is no READ and WRITE occurs to the data, replica nodes periodically gossip with each other to figure out if anyone out of sync. For each key range of data, each member in the replica group compute a Merkel tree (a hash encoding tree where the difference can be located quickly) and send it to other neighbors. By comparing the received Merkel tree with its own tree, each member can quickly determine which data portion is out of sync. If so, it will send the diff to the left-behind members.

Anti-entropy is the “catch-all” way to guarantee eventual consistency, but is also pretty expensive and therefore is not done frequently. By combining the data sync with read repair and hinted handoff, we can keep the replicas pretty up-to-date.

BigTable trade offs
To retain the scalability features of BigTable, some of the basic features of what RDBMS has provided is missing in the BigTable model. Here we highlight the rough edges of Bigtable.

1) Primitive transaction support
Transaction protection is only guaranteed within a single row. In other words, you cannot start a atomic transaction to modify multiple rows.

2) Primitive isolation support
While you are reading a row, other people may have modified the same row and update it before you. Your view is not current anymore but your later update can easily wipe off other people’s change.

There are many techniques how concurrent update can be isolated, including pessimistic approach like locking or optimistic approach by using vector clock to be the version stamp. But to the best of my understanding, there is no robust test-and-set operation in the BigTable model (this is some getLock mechanism in Hbase which I haven’t looked into), my impression is that there is no easy way to check there is no concurrent update happen in between.

Because of this limitation, I think BigTable model is more suitable for those applications where concurrent update to the same row is very rare, or some inconsistency is tolerable at the application level. Fortunately, there are still a lot of applications falling into this bucket.

3) No indexes
Notice that data within BigTable are all physically sorted; by rowid, column name and timestamp. There is no index from the column value to its containing rowid.

This model is quite different from RDBMS where you typically define a table and worry about defining the index later. There is no such “index” concept in BigTable and you need to carefully plan out the physical sorting order of your data layout.

Lacking index turns out to be quite inconvenient and many people using Bigtable ends up building their own index at the application level. This usually results in having a highly denormalized data model with lots of column family who store links to other tables. Any update to the base data need to carefully update these other column family as well. From a performance angle, this is actually better than maintaining index in RDBMS because Bigtable is optimized from writes. However, since it is now the application logic to maintain the index, this can be a source of application bugs.

4) No referential integrity enforcement
As mentioned above, since you are building artificial index at the application level, you need to maintain the integrity of your index as well. This includes update your index when the base data is inserted, modified or deleted. This kind of handling logic is traditionally residing at the RDBMS level, but since BigTable has no such referential integrity concept, this responsibility is now landed on your application logic.

5) Lack of surrounding tools
As NOSQL or BigTable is very new, the tools surrounding it is definitely not comparable to the RDBMS world at this moment, such tools includes report generation, BI, data warehouse … etc.

I observe the general trend that most NOSQL products are moving towards the direction to provide an ODBC / JDBC interface to integrate with existing tool markets easier. But at this moment, to the best of my understanding, such interface is not wide spread yet.

Design Patterns for Bigtable model
Due to the very different model of Bigtable, the data model design methodology is also quite different from traditional RDBMS schema design. Here is a sequence of steps that are pretty common …

1) Identify all your query scenarios
Since there is no index concept, you have to plan out carefully how your data is physically sorted. Therefore it is important to find all your query use cases first.

2) Define your “entity table” and its corresponding column families
For an entity table, it is pretty common to have one column family storing all the entity attributes, and multiple column families to store the links to other entities. (e.g. A “UserTable” may contain a column family “baseInfo” to store all attributes of the user, a column family “friend” to store the links to another user, a column family “company” to store links to another CompanyTable)

3) Define your “index table”
The “index table” is what your application build to support reverse lookup. The “key” is typically base on the search criteria you have identified in your query scenario. It is not uncommon that each query may have its own specific index table.

4) Make sure your application logic updates the index correctly
Since the index table has to be maintained by application logic, you need to check to make sure it is done correctly. In many cases, this can be quite a source of bugs.

It is important to realize that NOSQL is not advocating a replacement of RDBMS which has been proven in many lines of application. The NOSQL should be considered a complementary technologies for some niche area where RDBMS is not covering well.

Powered by WordPress | Theme: by 85ideas. Editor by Khoanguyen