Latest Entries »

CloudStore (former Kosmos)

KFS được viết bằng C++, triển khai cụ thể ý tưởng của Google File System (lưu ý rằng, hệ thống do chính Google triển khai có mã nguồn đóng.)

Nhìn qua trang chủ http://kosmosfs.sourceforge.net/ thấy thật sơ sài; ngay ở trang wiki thông tin cũng rất hạn chế. Ta tóm tắt lại vài ý chính

  1. KFS có thể dùng để thay thế cho HDFS trong hệ thống Hypertable và Hadoop (lưu ý rằng, Hadoop không phải là một distributed file system, mà là một software framework hỗ trợ cho việc phân tán ứng dụng);
  2. KFS có ba thành phần
    1. meta-data server: một server chính lưu trữ thông tin meta, cung cấp không gian tên toàn cục (global namespace). Lưu ý là thông tin về cấu trúc thư mục sẽ được lưu trong bộ nhớ;
    2. block server: Tập tin bất kỳ được chia thành các block 64MB và được lưu ở các node (còn gọi là chunk server.) (bằng việc nhân bản và tách — strip);
    3. client library: cho phép ứng dụng giao tiếp (đọc/ghi) tập tin vào hệ thống KFS. Các thư viện giao tiếp có cho C++, Java và Python;
  3. Hỗ trợ FUSE: Trên hệ thống Linux, có thể kết nối với KFS thông qua FUSE, nhưngn điều này không có nghĩa là KFS tương thích với POSIX;
  4. Rack-aware: KFS chú ý tới vị trí của các chunk server khi lưu trữ, và nó luôn cố gắng phân tán dữ liệu đến các vị trí xa nhau;
  5. Re-replication: Khi tập tin có số lượng nhân bản nhỏ hơn tối thiểu $k$ xác định trước, nó sẽ được nhân bản thêm ở các chunk server khác;
  6. Caching: các thư viện ở phía ứng dụng khách sử dụng các bộ đệm để tăng hiệu năng; Thậm chí, các thư viện này còn tự động thay đổi đích truy cập trong trường hợp chunk server đang dùng không thể truy cập được;
  7. Triển khai dễ dàng: thông qua các kịch bản để triển khai và tắt/mở dịch vụ.

Nhìn chung, ứng dụng thích hợp nhất của KFS là thay thế cho HDFS trong các hệ thống Hadoop. Do đó, chủ đề KFS vs. HDFS cần phải theo dõi và phân tích kỹ lưỡng. Tất nhiên, những ai không thích nền tảng Java-based có thể tìm thấy ở KFS sự lựa chọn tốt :)

Ref.:

  1. http://kosmosfs.sourceforge.net/features.html
  2. http://sourceforge.net/apps/trac/kosmosfs/wiki/HowKFSWorks
  3. http://www.skrenta.com/2007/09/kosmix_releases_google_gfs_wor.html

Sưu tầm : http://dragula.org/blogs/576


MogileFS

MogileFS là một trong các sản phẩm mã nguồn mở của Danga. Danga Ineractive (đổi thành Danga vào năm 2002) ban đầu là công ty của Brad Fitzpatrick, sau đó được SixApart mua lại vào năm 2005. Về Brad Fitzpatrick, bạn sẽ ngạc nhiên khi biết anh chàng sinh năm 1980, là tác giả của LiveJournal và memcachedLiveJournal cũng như WordPress, nhưng được dùng chủ yếu ở Mỹ và Nga (một công ty của Nga đã mua lại LiveJournal từ SixApart vào năm 2007), còn memcached là một ứng dụng quá nổi tiếng: tiêu biểu nhất là nó được dùng trong LiveJournal, Facebook và Youtube.

Trong bài này, ta nói về MogileFS, “a distributed (meta) file system. Spray files across cheap disks on your network. Pay less for storage. No proprietary on-disk file formats.” Các tính chất cơ bản của MogileFS:

  1. application level: MogileFS hoạt động ở tầng ứng dụng, không cần cài thêm mô-đun cho nhân hệ thống. Do đó, việc triển khai sẽ đơn giản;
  2. no single point of failure: lý do là ba thành phần của MogileFS có thể chạy ở nhiều máy (chưa rõ nghĩa: một thành phần có thể chạy đồng thời ở nhiều máy khác nhau hay không?);
  3. automatic file replication: Tùy thuộc vào “lớp”, tập tin sẽ được nhân bản ở các node, với số nhân bản tùy vào thiết lập của “lớp”. Việc phân chia ra các “lớp” cho phép xác định mức độ quan trọng và mức độ sử dụng của các tập tin trong lớp đó;
  4. better than RAID: Trong thiết lập non-SAN RAID, các đĩa cứng sẽ có đặc tính bền bỉ (redundant) chứ không phải là các máy tính; do đó, nếu hỏng hóc xảy ra đối với máy tính, thì tập tin sẽ không truy cập được. Khi dùng MogileFS, tập tin sẽ được nhân bản ở nhiều đĩa và nhiều máy con khác nhau, đảm bảo chúng luôn dùng được ngay cả khi một số máy tính trong hệ thống MogileFS bị hỏng;
  5. Flat Namespace: Các tập tin được xác định một cách duy nhất thông qua chìa khóa nằm trong không gian tên phẳng, toàn cục;
  6. Shared-Nothing: Không phục thuộc vào hệ thống SAN với các đĩa chia sẻ; các máy con trong hệ thống quản lý hệ thống đĩa của riêng nó, mà không chia sẻ đĩa cứng với các máy khác trong hệ thống;
  7. No RAID required: Các đĩa của từng node trong hệ thống có thể dùng RAID, nhưng điều đó là không bắt buộc;
  8. Local filesystem agnostic: Đĩa cứng ở từng node được định dạng tùy theo hệ thống cài ở node đó (ext3, XFS,…); MogileFS sử dụng hệ thống riêng để định vị tập tin, thư mục, và do đó bạn sẽ không gặp phải các giới hạn như về số tập tin tối đa trong thư mục, hoặc số thư mục con tối đa trong một thư mục.

Tuy nhiên, MogileFS vẫn chưa/không hoàn hảo

  1. Không tuân theo chuẩn POSIX: các ứng dụng Unix chuẩn sẽ không làm việc được với MogileFS. Bạn chỉ sử dụng MogileFS để lưu trữ tập tin và sau đó đọc ra thông tin (nhiều lần), bằng cách thông qua các thư viện để truy cập đến hệ thống tập tin MogileFS qua các phương thức PUT/GET (HTTP protocol.)
  2. Chưa portable hoàn toàn: bởi vì MogileFS có một số thành phần chỉ hoạt động trong môi trường Linux

Ref.:

  1. http://danga.com/mogilefs/
  2. http://en.wikipedia.org/wiki/Brad_Fitzpatrick
  3. http://en.wikipedia.org/wiki/Livejournal

Sưu tầm : http://dragula.org/blogs/575


Gần đây, 2 nhà nghiên cứu đã cho ra đời công cụ có thể dùng để bẻ khóa dữ liệu đã được mã hóa qua hệ thống web server được chứa trong cookies và các ký tự ẩn trong trang HTML. Cách thức này được sử dụng trong công cụ Padding Oracle Exploitation Tool (Poet) của Juliano Rizzo và Thai Duong, đồng thời cũng được dùng để crack CAPTCHAS.

Poet được sử dụng trong Padding Oracle AttackPDF, lần đầu được phát hiện năm 2002, chuyên sử dụng để giải mã chế độ Cypher Block Chaining (CBC) mà không cần đến key. Các ứng dụng web như trên sử dụng framework phổ biến JavaServer Faces (JSF) cũng dễ dàng bị khai thác theo cách này.

Cơ chế tấn công Padding Oracle Attack đã chỉ ra rằng thực tế các khối mã hóa riêng biệt phải đạt độ dài tối thiểu 8 hoặc 16 byte cho mỗi ký tự. Nhưng để đáp ứng được yêu cầu, những cơ chế này phải sử dụng thêm các byte bổ sung đối với các khối cuối cùng. Bên cạnh đó, có rất nhiều phương thức để thực hiện cơ chế bổ sung này, và 1 trong số đó sử dụng crack. Đây là thời điểm mà Padding Oracle – chương trình hoặc dịch vụ cần thiết được sử dụng để thông báo trạng thái để cho biết liệu việc bổ sung dung lượng trong các gói dữ liệu nhận được có được coi là hợp lệ hay không, và tiếp tục tham gia vào quá trình tổng thể. Đây chính xác là những gì mà framework JFS thể hiện.

Bằng cách thử tất cả các cách bổ sung và kết hợp khác có thể, cơ chế này hoàn toàn có thể giải mã ViewStates, vốn được nhúng sẵn trong trang HTML và dùng để lưu trữ thông tin lượng truy cập từ phía client nhanh chóng. Các bạn có thể xem video trình diễn quá trình này trên Youtube.

Các dữ liệu đã được giải mã đồng thời lưu trữ lượng thông tin bí mật mà khách ghé thăm website không được quyền truy cập. Cơ chế tấn công này được miêu tả đầy đủ và cặn kẽ tại đây.

Mặt khác, công nghệ này cũng được sử dụng để hóa mã các giải pháp khác, 1 trong số đó là mã hóa ký tự ảnh trong mẫu có sẵn – điển hình là CAPTCHAS. Để tránh việc lưu trữ những thông tin này, một số hệ thống server chuyển toàn bộ dữ liệu tới client rồi sau đó so sánh với tín hiệu trả về.

Mặc dù Poet chỉ có thể crack ViewStates, nhưng điều này cũng đủ để giúp các nhà phát triển kiểm tra và phát hiện lỗ hổng trên trang web của họ. Công cụ này có giao diện đồ họa GUI và hoạt động với Windows, Mac OS X và Linux.

T.Anh (theo H-online)


How To Develop iPhone Apps on Windows

Not many developers speak Objective C, the Mantra for writing iPhone applications. And majority of the developers don’t own Mac either, so what is the way to go ?

Of course one can buy a macbook and do it. Here’s what will make it possible do all that on Windows, Linux

Top 5 Ways:

Method 5. Toolchains: There are several toolchains available (like winChain) that actually lets you write and build iPhone applications on windows. There are several associated tutorials to build the Objective C code on Windows. But there is a problem, the apps hence developed will work on Jailbroken iPhones only. We’ve seen few hacks to get over that and make it to App Store, but as Apple keeps on updating SDKs, toolchains need regular updates. It’s a hassle to make it up all the time. That’s why this is the least of the recommended methods.

Method 4. Use other Languages instead of Objective-C

(i) Code in Java: For Java developers, there is a workaround: XMLVM.

XMLVM is an extensible cross-compiler toolchain which instead of cross-compiling on a source code level, XMLVM cross-compiles byte code instructions from Sun’s Java virtual machine and Microsoft’s Common Language Runtime (CLR). And the Result: The byte code instructions are easier to cross-compile and the difficult parsing of a high-level programming language is left to a regular compiler and you get to write apps in different language and then compile and convert to a different one. The diagram below shows an abstract idea:

Without laying much stress on it, I`ll share my experience. The project is a great piece of Innovation but is still in it’s early phases. On one side, I was able to use their API and develop fairly well application (Simple game, Travel app), but when it comes to complex graphics, features, this method looked pretty immature. However, over time this should change and we could see the project doing almost everything the original SDK does. And yes, you can test your apps on the Java based simulator and deploy on jailbroken iPhone.

There are several other frameworks (like  Appcelerator`s Titanium) that let you code iPhone apps in Java, but the limitations are similar thought they are all worth giving a look for most day-to-day apps.

Update: (ii): Code in C/C++

DragonFireSDK: Say no to Objective C, say no to forced-Mac and yes to C/C++, Windows. This founds the base for DragonFireSDK that uses Microsoft Visual C++ to develop, test iPhone apps.

Apps, Games created with DragonFireSDK can be completely written and debugged in Windows and are also fully compliant for distribution and sales at the Apple iPhone App Store.

There is a quick Starter Guide available that help you kick start writing your first iPhone app and run it inside the emulator that ships with it. The API is quiet simple to use and is available here. One of the Apps: Un Stacker developed using this SDK is already available on App Store [link]. In addition, 5 Sample Apps are demonstrated and explained with code.

Method 3. Hackintosh: This is one of the effective ways of doing it: Install Mac on PC and then run the Native iPhone SDK. This is already a popular practice among OSx86 communities. The only limitation is that it could get tricky and time consuming for the newbees. You can refer to our Hackintosh Guides:

Method 2: Cross compilation of Adobe apps: You can write your apps in Flash Actionscript 2, ActionScript 3 or Adobe AIR, Flex and then cross compile it to ARM binary that is executable on iPhone. This can be done installing Project Sprouts for which sample Flex applications source is available here.

Here is a video on how this is done:

Method 1. Flash CS 5: This is in fact the most effective and easiest way to make it to App store doing all the “legal stuff”.

Flash CS 5 introduces new Feature that let’s you develop iPhone native applications just like you develop Adobe AIR apps. Recently, Adobe announced support for Multitouch, Accelerometer, GPS support in Flash 10.1 for phones. CS5 adds new APIs that lets developers leverage these modern Phone features and hence develop application not just for iPhone but for all Phones that support Flash.

So the Horizon is quiet big, and CS5 with ActionScript could find a great way to develop applications on iPhone. If you are familiar with a scripting language, say, Javascript, learning ActionScript is as easy as an ApplePie.

Already, App Store has a number of Apps built based on Flash (I believe they are using Crosscompilation): you can checkout few full blown apps here.

Only bad part of this method is, it’s still unavailable. However This is what Official adobe site has to say about it:

When will the Flash Professional CS5 beta be available for download?
The beta will be available for download from Adobe Labs before the end of 2009.

You can develop, build and test in native Flash debugger, however, soon we should see a simulator for mobile devices, especially for the iPhone.

I needed more clarity whether the final step, signing of Apps would be possible on Windows. I contacted Adobe on this. Alexander MacDonald said “Once you have created your content it is compiled into an iphone executable, then signed by our ADT tool and then zipped to create an ipa—the only thing you need from apple is your developer certificate. The crypto algorithms used by Apple to sign iPhone apps are all industry standard ones which anyone can implement on any platform they wish,” which in the case of Flash CS5, also includes Windows.

The app hence created can be installed to iPhone via iTunes for testing to substitute absence of simulator for the mean time. So all in all, everything would be legal, and will work great.

However, here is the demo of how applications will be created in Flash CS5:

Today, it doesn’t support everything SDK supports, but it would soon do. With Flash opening up a way to iPhone development, Adobe is adding millions of new developers to the iPhone App store contributors.

For more iPhone, Programming, Open source, Windows, Mac OSTech Guides and Tech News catch us on Twitter @taranfx or subscribe below:

10 things you should know about NoSQL databases

For a quarter of a century, the relational database (RDBMS) has been the dominant model for database management. But, today, non-relational, “cloud,” or “NoSQL” databases are gaining mindshare as an alternative model for database management. In this article, we’ll look at the 10 key aspects of these non-relational NoSQL databases: the top five advantages and the top five challenges.

Note: This article is also available as a PDF download.

Five advantages of NoSQL

1: Elastic scaling

For years, database administrators have relied on scale up — buying bigger servers as database load increases — rather than scale out — distributing the database across multiple hosts as load increases. However, as transaction rates and availability requirements increase, and as databases move into the cloud or onto virtualized environments, the economic advantages of scaling out on commodity hardware become irresistible.

RDBMS might not scale out easily on commodity clusters, but the new breed of NoSQL databases are designed to expand transparently to take advantage of new nodes, and they’re usually designed with low-cost commodity hardware in mind.

2: Big data

Just as transaction rates have grown out of recognition over the last decade, the volumes of data that are being stored also have increased massively. O’Reilly has cleverly called this the “industrial revolution of data.” RDBMS capacity has been growing to match these increases, but as with transaction rates, the constraints of data volumes that can be practically managed by a single RDBMS are becoming intolerable for some enterprises. Today, the volumes of “big data” that can be handled by NoSQL systems, such as Hadoop, outstrip what can be handled by the biggest RDBMS.

3: Goodbye DBAs (see you later?)

Despite the many manageability improvements claimed by RDBMS vendors over the years, high-end RDBMS systems can be maintained only with the assistance of expensive, highly trained DBAs. DBAs are intimately involved in the design, installation, and ongoing tuning of high-end RDBMS systems.

NoSQL databases are generally designed from the ground up to require less management:  automatic repair, data distribution, and simpler data models lead to lower administration and tuning requirements — in theory. In practice, it’s likely that rumors of the DBA’s death have been slightly exaggerated. Someone will always be accountable for the performance and availability of any mission-critical data store.

4: Economics

NoSQL databases typically use clusters of cheap commodity servers to manage the exploding data and transaction volumes, while RDBMS tends to rely on expensive proprietary servers and storage systems. The result is that the cost per gigabyte or transaction/second for NoSQL can be many times less than the cost for RDBMS, allowing you to store and process more data at a much lower price point.

5: Flexible data models

Change management is a big headache for large production RDBMS. Even minor changes to the data model of an RDBMS have to be carefully managed and may necessitate downtime or reduced service levels.

NoSQL databases have far more relaxed — or even nonexistent — data model restrictions. NoSQL Key Value stores and document databases allow the application to store virtually any structure it wants in a data element. Even the more rigidly defined BigTable-based NoSQL databases (Cassandra, HBase) typically allow new columns to be created without too much fuss.

The result is that application changes and database schema changes do not have to be managed as one complicated change unit. In theory, this will allow applications to iterate faster, though,clearly, there can be undesirable side effects if the application fails to manage data integrity.

Five challenges of NoSQL

The promise of the NoSQL database has generated a lot of enthusiasm, but there are many obstacles to overcome before they can appeal to mainstream enterprises. Here are a few of the top challenges.

1: Maturity

RDBMS systems have been around for a long time. NoSQL advocates will argue that their advancing age is a sign of their obsolescence, but for most CIOs, the maturity of the RDBMS is reassuring. For the most part, RDBMS systems are stable and richly functional. In comparison, most NoSQL alternatives are in pre-production versions with many key features yet to be implemented.

Living on the technological leading edge is an exciting prospect for many developers, but enterprises should approach it with extreme caution.

2: Support

Enterprises want the reassurance that if a key system fails, they will be able to get timely and competent support. All RDBMS vendors go to great lengths to provide a high level of enterprise support.

In contrast, most NoSQL systems are open source projects, and although there are usually one or more firms offering support for each NoSQL database, these companies often are small start-ups without the global reach, support resources, or credibility of an Oracle, Microsoft, or IBM.

3: Analytics and business intelligence

NoSQL databases have evolved to meet the scaling demands of modern Web 2.0 applications. Consequently, most of their feature set is oriented toward the demands of these applications. However, data in an application has value to the business that goes beyond the insert-read-update-delete cycle of a typical Web application. Businesses mine information in corporate databases to improve their efficiency and competitiveness, and business intelligence (BI) is a key IT issue for all medium to large companies.

NoSQL databases offer few facilities for ad-hoc query and analysis. Even a simple query requires significant programming expertise, and commonly used BI tools do not provide connectivity to NoSQL.

Some relief is provided by the emergence of solutions such as HIVE or PIG, which can provide easier access to data held in Hadoop clusters and perhaps eventually, other NoSQL databases. Quest Software has developed a product — Toad for Cloud Databases — that can provide ad-hoc query capabilities to a variety of NoSQL databases.

4: Administration

The design goals for NoSQL may be to provide a zero-admin solution, but the current reality falls well short of that goal. NoSQL today requires a lot of skill to install and a lot of effort to maintain.

5: Expertise

There are literally millions of developers throughout the world, and in every business segment, who are familiar with RDBMS concepts and programming. In contrast, almost every NoSQL developer is in a learning mode. This situation will address naturally over time, but for now, it’s far easier to find experienced RDBMS programmers or administrators than a NoSQL expert.

Conclusion

NoSQL databases are becoming an increasingly important part of the database landscape, and when used appropriately, can offer real benefits. However, enterprises should proceed with caution with full awareness of the legitimate limitations and issues that are associated with these databases.


About the author

Guy Harrison is the director of research and development at Quest Software. A recognized database expert with more than 20 years of experience in application and database administration, performance tuning, and software development, Guy is the author of several books and many articles on database technologies and a regular speaker at technical conferences.

Xử lý dữ liệu phân tán với MapReduce

Quy trình giúp xử lý tập hợp dữ liệu siêu lớn, đặt tại các máy tính phân tán; giúp tiết kiệm chi phí xây dựng máy chủ lưu trữ dữ liệu; phát triển điện toán mây…

Bài toán đặt ra

Chúng ta có khối lượng lớn dữ liệu (terabytes) phân bố trên hàng ngàn máy tính biệt lập, và có rất nhiều dự án khác nhau cần dùng cũng như phân tích trên khối dữ liệu này. Làm thế nào để việc thao tác, xử lý dữ liệu hiệu quả, tiết kiệm nhất?

Đây là bài toán đặt ra khi mà mạng xã hội, dịch vụ trực tuyến… ngày càng phát triển mạnh mẽ.

Các khó khăn khi phải xử lý một tập hợp lớn dữ liệu phân tán:

- Quản lý, sắp xếp lịch trình truy xuất I/O

- Quản lý tiến trình song song và phân tán

- Theo dõi trạng thái dữ liệu

- Xử lý lỗi

- Quản lý số lượng lớn dữ liệu có quan hệ phụ thuộc nhau

- ….

MapReduce (MR) là quy trình giúp xử lý tập hợp dữ liệu siêu lớn đặt tại các máy tính phân tán, có thể xử lý được dữ liệu không cấu trúc (dữ liệu lưu trữ dạng tệp tin hệ thống) và dữ liệu cấu trúc (dữ liệu quan hệ 2 chiều). Trong MR, các máy tính chứa dữ liệu đơn lẻ được gọi là các nút (node)

MR định nghĩa dữ liệu (cấu trúc và không cấu trúc) dưới dạng cặp khóa/giá trị (key/value). Ví dụ, key có thể là tên của tập tin (file) và value nội dung của tập tin, hoặc key là địa chỉ URL và value là nội dung của URL,… Việc định nghĩa dữ liệu thành cặp key/value này linh hoạt hơn các bảng dữ liệu quan hệ 2 chiều truyền thống (quan hệ cha – con hay còn gọi là khóa chính – khóa phụ).

Quy trình xử lý

MapReduce được xây dựng từ mô hình lập trình hàm và lập trình song song. Tăng tốc độ thực thi xử lý dữ liệu là mục đích quan trọng nhất của MR. Quy trình này gồm 2 phần:

- Map: Đầu vào là nút chủ (master node) và sau đó chia nhỏ nó ra thành các vấn đề bé hơn. Gọi là các split 0, split 1, split 2, …

- Reduce: Từ các đầu ra trung gian sẽ tổng hợp lại để đưa ra các kết quả cuối cùng cho vấn đề master.

Một ví dụ về hàm Map:

def map (key, value):
list = []
for x in value:
if test:
list.append( (key, x) )
return list

Một ví dụ về hàm Reduce:

def reduce (key, listOgValues):
result = 0
for x in listOgValues:
result += x
return (key, result)

Để xử lý khối dữ liệu bao gồm rất nhiều cặp (key, value), lập trình viên viết hai hàm map và reduce. Hàm map có đầu vào là một cặp (k1, v1) và đầu ra là một danh sách các cặp (k2, v2). Như vập hàm Map có thể được viết theo dạng: map(k1,v1) => list(k2,v2). Và hàm reduce có dạng reduce(k2, list (v2)) => list(v3).

(1): Thư viện MR mà chương trình người dùng (User Program) sử dụng chia các tập tin đầu vào (dữ liệu cần xử lý) thành các phần nhỏ. Dung lượng mỗi phần từ 16 megabytes đến 64 megabytes (MB). Và sau đó sao chép chương trình thành các tiến trình song song chạy trên các máy tính phân tán chứa dữ liệu.

(2): Chương trình điều khiển Master sẽ gán mỗi phần dữ liệu cho một hàm Map và một hàm Reduce.

(3) – (4): worker là phần được gán một hàm Map và Reduce để xử lý, nó sẽ đọc dữ liệu, phân tích cặp key/value ở đầu vào và phân tích thành các cặp trung gian khác được lưu tại vùng nhớ đệm.

(5): Định kỳ, các cặp dữ liệu trung gian sẽ được đẩy đến các worker tương ứng (do master điều khiển) để hàm reduce xử lý. Các thuật toán sắp xếp, so sánh, phân vùng dữ liệu sẽ được sử dụng tại giai đoạn này. Các tập dữ liệu trung gian có cùng key sẽ được sắp xếp cùng một nhóm.

(6): Khi tất cả các tác vụ Map và Reduce đã hoàn tất thì sẽ cho ra kết quả cuối cùng của quy trình MR.

Với MR, rất nhiều máy tính trung gian có thể sử dụng để xử lý dữ liệu mà trước kia không thể.

MR cho phép lập trình viên dễ dàng sử dụng thư viện định tuyến MR để lập trình song song chính xác và hiệu quả, không phải bận tâm đến việc trao đổi dữ liệu giữa các cluster khác nhau vì sự độc lập dữ liệu khá cao; không phải theo dõi xử lý lỗi, các tác vụ…

Hiện nay đã có một số mô hình MR trên các ngôn ngữ Java, C++, Python, Perl, Ruby và C. Lập trình viên có thể lựa chọn ngôn ngữ và thư viện MR để xây dựng ứng dụng của mình. Thường các cài đặt MR đòi hỏi phải chạy trên một hệ thống tệp tin phân tán thích hợp, ví dụ như Google File System (GFS), Amazon S3,…

Ứng dụng thực tế

Năm 2009 dự án mã nguồn mở Hadoop của Apache đã lập kỷ lục thế giới về sắp xếp khối dữ liệu siêu lớn (sắp xếp một petabyte dữ liệu trong 16,25 giờ và một terabyte trong 62 giây). Mỗi ngày có đến vài nghìn hay vài chục nghìn chương trình MR được chạy ở Google, và rất nhiều công ty khác trên thế giới.

MR là giải pháp tốt cho các dạng bài toán xử lý khối lượng dữ liệu phát sinh khổng lồ với các tác vụ phân tích và tính toán phức tạp và không lường trước được, trong các lĩnh vực như khai khác dữ liệu (data mining), phân tích tài chính, mô phỏng, … Một số ví dụ:

- Sắp xếp dữ liệu phân tán: một khối lượng dữ liệu lớn được đặt tại nhiều máy khác nhau và cần phải sắp xếp chúng một cách đồng bộ để các ứng dụng truy xuất đạt hiệu quả cao.

- Đếm tần số truy cập một địa chỉ URL: hàm Map sẽ xử lý nhật ký (log) các yêu cầu truy xuất đến trang web có địa chỉ URL cần đếm và đầu ra là cặp giá trị <URL, 1>. Hàm Reduce tiếp tục xử lý và kết quả là cặp giá trị <URL, total count> (total count: là tổng số truy cập vào URL đó).

- Dùng trong trí tuệ nhân tạo khi phân tích thống kê.

- Xử lý dữ liệu bản đồ (đường, địa điểm…).

- Bài toán xếp thứ hạng (ranking) một trang web theo mức độ quan tâm của người dùng.

Bằng cách tập trung vào cốt lõi của thuật toán, sử dụng MR tiết kiệm được khá nhiều chi phí xây dựng các máy chủ lưu trữ dữ liệu. Ngoài Google, các hãng Yahoo, Facebook, Rackspace, …cũng đều đã sử dụng MR để xử lý dữ liệu.

Hiện nay người ta bắt đầu sử dụng MR cho việc phát triển các đám mây điện toán, thuật ngữ Cloud MapReduce hứa hẹn mở ra một hướng mới.

Map Reduce (MR)

MR là một “mô hình lập trình” (programming model), lần đầu báo cáo trong bài báo của Jefferey Dean và Sanjay Ghemawat ở hội nghị OSDI 2004. MR chỉ là một ý tưởng, một abstraction. Để hiện thực nó thì cần một implementation cụ thể. Google có một implementation của MR bằng C++. Apache có Hadoop, một implementation mã nguồn mở khác trên Java thì phải (ít nhất người dùng dùng Hadoop qua một Java interface).

Thử tưởng tượng ta có nhiều terabytes dữ liệu phân bố trong một cluster của hàng ngàn máy PCs biệt lập (cấu hình share nothing), và ta có rất nhiều dự án khác nhau cần dùng và phân tích mớ dữ liệu này. Việc chứa dữ liệu (có replication) trong các data centers dùng commodity computers và commodity network devices là một chiến lược thành công của Google. Nghe đâu chiến lược này đã giúp họ tiết kiệm rất nhiều tiền thay vì dùng vài high-end servers. Khối dữ liệu này đại khái là một tập hợp của nhiều tỉ cặp (key, value). Với Google thì có thể key = file_name và value = file content, hoặc key = URL và value = URL content, vân vân. Do chúng ta không lường trước được các bài toán phân tích khối dữ liệu này trong tương lai là gì, độ phức tạp ra sao, dùng các kiểu hàm số gì, … cho nên việc đưa các cặp (key, value) này vào một MPP-DBMS sẵn “có thể” không phải là chiến lược tối ưu; tại vì khi đưa dữ liệu vào một MPP-DBMS thì ta cần thiết kế một schema, và các bài toán phân tích dữ liệu trong MPP-DBMS sẽ có thể bị ràng buộc bởi cái schema đó.

Vì thế, Google quyết định thiết kế MR để cung cấp chỉ một vài primitives thật đơn giản để truy cập và xử lý mớ dữ liệu này . Một implementation của MR sẽ tự động song song hóa quá trình tính toán, và bảo đảm tính chịu đựng lỗi, sao cho tính toán được chính xác và hiệu quả nhất. Như vậy, các primitives phải được thiết kế thế nào đó để cho (1) thật đơn giản, (2) có tính biểu cảm cao để người dùng có thể dùng chúng làm nhiều tác vụ tính toán khác nhau, (3) có thể song song hóa được. Ngoài ra, để cho MR thật sự hiệu quả, thường các implementation đòi hỏi một phải chạy MR trên với một distributed file system thích hợp, ví dụ như Google File System (GFS), Amazon S3, hoặc là Hadoop distributed file system (HDFS).

Lấy ý tưởng từ lập trình hàm, MR có hai primitives cơ bản là Map và Reduce. Đó là lý do tại sao mô hình lập trình này được gọi là MR. Map và Reduce là các combinators phổ dụng trong các ngôn ngữ lập trình hàm như Lisp.

Để xử lý khối dữ liệu bao gồm rất nhiều cặp (key, value), lập trình viên viết hai hàm map và reduce. Hàm map có input là một cặp (k1, v1) và output là một danh sách các cặp (k2, v2). Chú ý rằng các input và output keys và values có thể thuộc về các kiểu dữ liệu khác nhau, tùy hỉ. Như vập hàm map có thể được viết một cách hình thức như sau:

map(k1,v1) -> list(k2,v2)

MR sẽ áp dụng hàm map (mà người dùng MR viết) vào từng cặp (key, value) trong khối dữ liệu vào, chạy rất nhiều phiên bản của map song song với nhau trên các máy tính của cluster. Sau giai đoạn này thì chúng ta có một tập hợp rất nhiều cặp (key, value) thuộc kiểu (k2, v2) gọi là các cặp (key, value) trung gian. MR cũng sẽ nhóm các cặp này theo từng key, như vậy các cặp (key, value) trung gian có cùngk2 sẽ nằm cùng một nhóm trung gian.

Giai đoạn hai MR sẽ áp dụng hàm reduce (mà người dùng MR viết) vào từng nhóm trung gian. Một cách hình thức, hàm này có thể mô tả như sau:

reduce(k2, list (v2)) -> list(v3)

Trong đó k2 là key chung của nhóm trung gian, list(v2) là tập các values trong nhóm, và list(v3) là một danh sách các giá trị trả về của reduce thuộc kiểu dữ liệu v3. Do reduce được áp dụng vào nhiều nhóm trung gian độc lập nhau, chúng lại một lần nữa có thể được chạy song song với nhau.

Ví dụ cơ bản nhất của MR xuất hiện trong bài báo của Dean và Ghemawat: với mỗi từ (tiếng Anh chẳng hạn) xuất hiện trong một bộ rất nhiều files, đếm số lần xuất hiện của từ. Rõ ràng đây là một bài toán cơ bản và quan trọng mà một search engine phải làm. Nếu chỉ có vài chục files thì dễ rồi, nhưng nhớ rằng ta có nhiều triệu hay thậm chí nhiều tỉ files phân bố trong một cluster nhiều nghìn máy tính. Ta lập trình MR bằng cách viết 2 hàm cơ bản với pseudo-code như sau:

 void map(String name, String document):
 // name: document name
 // document: document contents
 for each word w in document:
   EmitIntermediate(w, "1");

void reduce(String word, Iterator partialCounts):
 // word: a word
 // partialCounts: a list of aggregated partial counts
 int result = 0;
 for each pc in partialCounts:
   result += ParseInt(pc);
 Emit(AsString(result));

Chỉ với hai primitives này, lập trình viên có rất nhiều flexibility để phân tích và xử lý các khối dữ liệu khổng lồ. MR đã được dùng để làm rất nhiều việc khác nhau, ví dụ như distributed grep, distributed sort, web link-graph reversal, term-vector per host, web access log stats, inverted index construction, document clustering, machine learning, statistical machine translation, large-scale graph computation … Khi implementation của MR được hoàn tất ở Google, MR đã được dùng để tái tạo toàn bộ Google index của WWW. Có thể xem mấy lectures này để thấy làm thế nào dùng MR giải quyết các vấn đề như distributed PageRank, thuật toán tìm được đi ngắn nhất của Dijkstra, các thuật toán clustering như k-means và kanopy, vân vân.

Đến đây chúng ta có thể kết luận sơ bộ là MR là một mô hình hùng mạnh, các primitives đơn giản của nó có thể dùng đề làm nhiều việc phân tích và xử lý dữ liệu khổng lồ môt cách hiệu quả. Năm ngoáiHadoop lập kỷ lục thế giới về sorting khối dữ liệu lớn (sort một petabyte trong 16 tiếng, và một terabyte trong 1 phút). Nghe đâu mỗi ngày có đến vài nghìn hay vài chục nghìn chương trình MR được chạy ở Google, chưa tính đến biết bao cty khác trên thế giới. Tuy nhiên, MR hiển nhiên không phải là thuốc chữa bá bệnh. Tùy theo bản chất bài toán của mình là gì mà chúng ta sẽ phải quyết định xem MR có phải là một framework thích hợp hay không.

Có lẽ, một trong các dấu hiệu cho thấy MR là giải pháp tốt là phần đã víết ở trên: khối lượng dữ liệu khổng lồ và các tác vụ phân tích và tính toán phức tạp và không lường trước được (ví dụ như các tác vụ liên quan đến hiện thực hóa các mô hình thống kê, data mining, hay các mô hình vật lý cực kỳ phức tạp với dữ liệu từ các máy gia tốc hạt, hoặc các mô hình dự báo khí tượng thủy văn). Trong trường hợp đó, một MPP-DBMS có lẽ là hơi bị hạn chế. Ngoài ra, nếu có một tác vụ tính toán nào đó chỉ cần làm 1 hoặc 2 lần thì MR có lẽ cũng tốt hơn, vì thời gian thiết kế và load một khối DL khổng lồ vào một MPP-DBMS không nhỏ, chẳng kém thời gian chạy thẳng MR luôn. Như Dean và Ghemawat và nhiều người khác đã chỉ ra, MR là một công cụ rất tốt để biến dữ liệu thô thành dữ liệu sẵn sàng để load vào DBMS!

NOSQL Patterns

Over the last couple years, we see an emerging data storage mechanism for storing large scale of data. These storage solution differs quite significantly with the RDBMS model and is also known as the NOSQL. Some of the key players include …

  • GoogleBigTable, HBase, Hypertable
  • AmazonDynamo, Voldemort, Cassendra, Riak
  • Redis
  • CouchDB, MongoDB

These solutions has a number of characteristics in common

  • Key value store
  • Run on large number of commodity machines
  • Data are partitioned and replicated among these machines
  • Relax the data consistency requirement. (because the CAP theorem proves that you cannot get Consistency, Availability and Partitioning at the the same time)

The aim of this blog is to extract the underlying technologies that these solutions have in common, and get a deeper understanding on the implication to your application’s design. I am not intending to compare the features of these solutions, nor to suggest which one to use.

API model

The underlying data model can be considered as a large Hashtable (key/value store).

The basic form of API access is

  • get(key) — Extract the value given a key
  • put(key, value) — Create or Update the value given its key
  • delete(key) — Remove the key and its associated value

More advance form of API allows to execute user defined function in the server environment

  • execute(key, operation, parameters) — Invoke an operation to the value (given its key) which is a special data structure (e.g. List, Set, Map …. etc).
  • mapreduce(keyList, mapFunc, reduceFunc) — Invoke amap/reduce function across a key range.

Machines layout

The underlying infratructure is composed of large number (hundreds or thousands) of cheap, commoditized, unreliable machines connected through a network. We call each machine a physical node (PN). Each PN has the same set of software configuration but may have varying hardware capacity in terms of CPU, memory and disk storage. Within each PN, there will be a variable number of virtual node (VN) running according to the available hardware capacity of the PN.


Data partitioning (Consistent Hashing)

Since the overall hashtable is distributed across many VNs, we need a way to map each key to the corresponding VN.

One way is to use
partition = key mod (total_VNs)

The disadvantage of this scheme is when we alter the number of VNs, then the ownership of existing keys has changed dramatically, which requires full data redistribution. Most large scale store use a “consistent hashing” technique to minimize the amount of ownership changes.


In the consistent hashing scheme, the key space is finite and lie on the circumference of a ring. The virtual node id is also allocated from the same key space. For any key, its owner node is defined as the first encountered virtual node if walking clockwise from that key. If the owner node crashes, all the key it owns will be adopted by its clockwise neighbor. Therefore, key redistribution happens only within the neighbor of the crashed node, all other nodes retains the same set of keys.

Data replication

To provide high reiability from individually unreliable resource, we need to replicate the data partitions.

Replication not only improves the overall reliability of data, it also helps performance by spreading the workload across multiple replicas.


While read-only request can be dispatched to any replicas, update request is more challenging because we need to carefully co-ordinate the update which happens in these replicas.

Membership Changes

Notice that virtual nodes can join and leave the network at any time without impacting the operation of the ring.

When a new node joins the network

  1. The joining node announce its presence and its id to some well known VNs or just broadcast)
  2. All the neighbors (left and right side) will adjust the change of key ownership as well as the change of replica memberships. This is typically done synchronously.
  3. The joining node starts to bulk copy data from its neighbor in parallel asynchronously.
  4. The membership change is asynchronously propagate to the other nodes.


Notice that other nodes may not have their membership view updated yet so they may still forward the request to the old nodes. But since these old nodes (which is the neighbor of the new joined node) has been updated (in step 2), so they will forward the request to the new joined node.

On the other hand, the new joined node may still in the process of downloading the data and not ready to serve yet. We use the vector clock (described below) to determine whether the new joined node is ready to serve the request and if not, the client can contact another replica.

When an existing node leaves the network (e.g. crash)

  1. The crashed node no longer respond to gossip message so its neighbors knows about it.
  2. The neighbor will update the membership changes and copy data asynchronously


We haven’t talked about how the virtual nodes is mapped into the physical nodes. Many schemes are possible with the main goal that Virtual Node replicas should not be sitting on the same physical node. One simple scheme is to assigned Virtual node to Physical node in a random manner but check to make sure that a physical node doesn’t contain replicas of the same key ranges.

Notice that since machine crashes happen at the physical node level, which has many virtual nodes runs on it. So when a single Physical node crashes, the workload (of its multiple virtual node) is scattered across many physical machines. Therefore the increased workload due to physical node crashes is evenly balanced.

Client Consistency

Once we have multiple copies of the same data, we need to worry about how to synchronize them such that the client can has a consistent view of the data.

There is a number of client consistency models

  1. Strict Consistency (one copy serializability): This provides the semantics as if there is only one copy of data. Any update is observed instantaneously.
  2. Read your write consistency: The allows the client to see his own update immediately (and the client can switch server between requests), but not the updates made by other clients
  3. Session consistency: Provide the read-your-write consistency only when the client is issuing the request under the same session scope (which is usually bind to the same server)
  4. Monotonic Read Consistency: This provide the time monotonicity guarantee that the client will only see more updated version of the data in future requests.
  5. Eventual Consistency: This provides the weakness form of guarantee. The client can see an inconsistent view as the update are in progress. This model works when concurrent access of the same data is very unlikely, and the client need to wait for some time if he needs to see his previous update.

Depends on which consistency model to provide, 2 mechanisms need to be arranged …

  • How the client request is dispatched to a replica
  • How the replicas propagate and apply the updates

There are various models how these 2 aspects can be done, with different tradeoffs.

Master Slave (or Single Master) Model

Under this model, each data partition has a single master and multiple slaves. In above model, B is the master of keyAB and C, D are the slaves. All update requests has to go to the master where update is applied and then asynchronously propagated to the slaves. Notice that there is a time window of data lost if the master crashes before it propagate its update to any slaves, so some system will wait synchronously for the update to be propagated to at least one slave.

Read requests can go to any replicas if the client can tolerate some degree of data staleness. This is where the read workload is distributed among many replicas. If the client cannot tolerate staleness for certain data, it also need to go to the master.

Note that this model doesn’t mean there is one particular physical node that plays the role as the master. The granularity of “mastership” happens at the virtual node level. Each physical node has some virtual nodes acts as master of some partitions while other virtual nodes acts as slaves of other partitions. Therefore, the write workload is also distributed across different physical node, although this is due to partitioning rather than replicas

When a physical node crashes, the masters of certain partitions will be lost. Usually, the most updated slave will be nominated to become the new master.

Master Slave model works very well in general when the application has a high read/write ratio. It also works very well when the update happens evenly in the key range. So it is the predominant model of data replication.

There are 2 ways how the master propagate updates to the slave;State transfer and Operation transfer. In State transfer, the master passes its latest state to the slave, which then replace its current state with the latest state. In operation transfer, the master propagate a sequence of operations to the slave which then apply the operations in its local state.

The state transfer model is more robust against message lost because as long as a latter more updated message arrives, the replica still be able to advance to the latest state.

Even in state transfer mode, we don’t want to send the full object for updating other replicas because changes typically happens within a small portion of the object. In will be a waste of network bandwidth if we send the unchanged portion of the object, so we need a mechanism to detect and send just the delta (the portion that has been changed). One common approach is break the object into chunks and compute a hash tree of the object. So the replica can just compare their hash tree to figure out which chunk of the object has been changed and only send those over.

In operation transfer mode, usually much less data need to be send over the network. However, it requires a reliable message mechanism with delivery order guarantee.

Multi-Master (or No Master) Model

If there is hot spots in certain key range, and there is intensive write request, the master slave model will be unable to spread the workload evenly. Multi-master model allows updates to happen at any replica (I think call it “No-Master” is more accurate).

If any client can issue any update to any server, how do we synchronize the states such that we can retain client consistency and also eventually every replica will get to the same state ? We describe a number of different approaches in following …

Quorum Based 2PC

To provide “strict consistency”, we can use a traditional 2PC protocol to bring all replicas to the same state at every update. Lets say there is N replicas for a data. When the data is update, there is a “prepare” phase where the coordinator ask every replica to confirm whether each of them is ready to perform the update. Each of the replica will then write the data to a log file and when success, respond to the coordinator.

After gathering all replicas responses positively, the coordinator will initiate the second “commit” phase and then ask every replicas to commit and each replica then write another log entry to confirm the update. Notice that there are some scalability issue as the coordinator need to “synchronously” wait for quite a lot of back and forth network roundtrip and disk I/O to complete.

On the other hand, if any one of the replica crashes, the update will be unsuccessful. As there are more replicas, chance of having one of them increases. Therefore, replication is hurting the availability rather than helping. This make traditional 2PC not a popular choice for high throughput transactional system.

A more efficient way is to use the quorum based 2PC (e.g. PAXOS). In this model, the coordinator only need to update W replicas (rather than all N replicas) synchronously. The coordinator still write to all the N replicas but only wait for positive acknowledgment for any W of the N to confirm. This is much more efficient from a probabilistic standpoint.

However, since no all replicas are update, we need to be careful when reading the data to make sure the read can reach at least one replica that has been previously updated successful. When reading the data, we need to read R replicas and return the one with the latest timestamp.

For “strict consistency”, the important condition is to make sure the read set and the write set overlap. ie: W + R > N


As you can see, the quorum based 2PC can be considered as a general 2PC protocol where the traditional 2PC is a special case where W = N and R = 1. The general quorum-based model allow us to pick W and R according to our tradeoff decisions between read and write workload ratio.

If the user cannot afford to pick W, R large enough, ie: W + R <= N, then the client is relaxing its consistency model to a weaker one.

If the client can tolerate a more relax consistency model, we don’t need to use the 2PC commit or quorum based protocol as above. Here we describe a Gossip model where updates are propagate asynchronous via gossip message exchanges and an auto-entropy protocol to apply the update such that every replica eventually get to the latest state.

Vector Clock

Vector Clock is a timestamp mechanism such that we can reason about causal relationship between updates. First of all, each replica keeps vector clock. Lets say replica i has its clock Vi. Vi[i] is the logical clock which if every replica follows certain rules to update its vector clock.

  • Whenever an internal operation happens at replica i, it will advance its clock Vi[i]
  • Whenever replica i send a message to replica j, it will first advance its clock Vi[i] and attach its vector clock Vi to the message
  • Whenever replica j receive a message from replica i, it will first advance its clock Vj[j] and then merge its clock with the clock Vm attached in the message. ie: Vj[k] = max(Vj[k], Vm[k])


A partial order relationship can be defined such that Vi > Vj iff for all k, Vi[k] >= Vj[k]. We can use these partial ordering to derive causal relationship between updates. The reasoning behind is

  • The effect of an internal operation will be seen immediately at the same node
  • After receiving a message, the receiving node knows the situation of the sending node at the time when the message is send. The situation is not only including what is happening at the sending node, but also all the other nodes that the sending node knows about.
  • In other words, Vi[i] reflects the time of the latest internal operation happens at node i. Vi[k] = 6 reflects replica i has known the situation of replica k up to its logical clock 6.

Notice that the term “situation” is used here in an abstract sense. Depends on what information is passed in the message, the situation can be different. This will affect how the vector clock will be advanced. In below, we describe the “state transfer model” and the “operation transfer model” which has different information passed in the message and the advancement of their vector clock will also be different.

Because state is always flow from the replica to the client but not the other way round, the client doesn’t have an entry in the Vector clock. The vector clock contains only one entry for each replica. However, the client will also keep a vector clock from the last replica it contacts. This is important for support the client consistency model we describe above. For example, to support monotonic read, the replica will make sure the vector clock attached to the data is > the client’s submitted vector clock in the request.

Gossip (State Transfer Model)

In a state transfer model, each replica maintain a vector clock as well as a state version tree where each state is neither > or < among each other (based on vector clock comparison). In other words, the state version tree contains all the conflicting updates.

At query time, the client will attach its vector clock and the replica will send back a subset of the state tree which precedes the client’s vector clock (this will provide monotonic read consistency). The client will then advance its vector clock by merging all the versions. This means the client is responsible to resolve the conflict of all these versions because when the client sends the update later, its vector clock will precede all these versions.


At update, the client will send its vector clock and the replica will check whether the client state precedes any of its existing version, if so, it will throw away the client’s update.


Replicas also gossip among each other in the background and try to merge their version tree together.

Gossip (Operation Transfer Model)

In an operation transfer approach, the sequence of applying the operations is very important. At the minimum causal order need to be maintained. Because of the ordering issue, each replica has to defer executing the operation until all the preceding operations has been executed. Therefore replicas save the operation request to a log file and exchange the log among each other and consolidate these operation logs to figure out the right sequence to apply the operations to their local store in an appropriate order.

“Causal order” means every replica will apply changes to the “causes” before apply changes to the “effect”. “Total order” requires that every replica applies the operation in the same sequence.

In this model, each replica keeps a list of vector clock, Vi is the vector clock the replica itself and Vj is the vector clock when replica i receive replica j’s gossip message. There is also a V-state that represent the vector clock of the last updated state.

When a query is submitted by the client, it will also send along its vector clock which reflect the client’s view of the world. The replica will check if it has a view of the state that is later than the client’s view.


When an update operation is received, the replica will buffer the update operation until it can be applied to the local state. Every submitted operation will be tag with 2 timestamp, V-client indicates the client’s view when he is making the update request. V-@receive is the replica’s view when it receives the submission.

This update operation request will be sitting in the queue until the replica has received all the other updates that this one depends on. This condition is reflected in the vector clock Vi when it is larger than V-client


On the background, different replicas exchange their log for the queued updates and update each other’s vector clock. After the log exchange, each replica will check whether certain operation can be applied (when all the dependent operation has been received) and apply them accordingly. Notice that it is possible that multiple operations are ready for applying at the same time, the replica will sort these operation in causal order (by using the Vector clock comparison) and apply them in the right order.


The concurrent update problem at different replica can also happen. Which means there can be multiple valid sequences of operation. In order for different replica to apply concurrent update in the same order, we need a total ordering mechanism.

One approach is whoever do the update first acquire a monotonic sequence number and late comers follow the sequence. On the other hand, if the operation itself is commutative, then the order to apply the operations doesn’t matter

After applying the update, the update operation cannot be immediately removed from the queue because the update may not be fully exchange to every replica yet. We continuously check the Vector clock of each replicas after log exchange and after we confirm than everyone has receive this update, then we’ll remove it from the queue.

Map Reduce Execution

Notice that the distributed store architecture fits well into distributed processing as well. For example, to process a Map/Reduce operationover an input key list.

The system will push the map and reduce function to all the nodes (ie: moving the processing logic towards the data). The map function of the input keys will be distributed among the replicas of owning those input, and then forward the map output to the reduce function, where the aggregation logic will be executed.


Handling Deletes

In a multi-master replication system, we use Vector clock timestamp to determine causal order, we need to handle “delete” very carefully such that we don’t lost the associated timestamp information of the deleted object, otherwise we cannot even reason the order of when to apply the delete.

Therefore, we typically handle delete as a special update by marking the object as “deleted” but still keep its metadata / timestamp information around. Around a long enough time that we are confident that every replica has marked this object deleted, then we garbage collected the deleted object to reclaim its space.

Storage Implementaton

One strategy is to use make the storage implementation pluggable. e.g. A local MySQL DB, Berkeley DB, Filesystem or even a in memory Hashtable can be used as a storage mechanism.

Another strategy is to implement the storage in a highly scalable way. Here are some techniques that I learn from CouchDB and Google BigTable.

CouchDB has a MVCC model that uses a copy-on-modified approach. Any update will cause a private copy being made which in turn cause the index also need to be modified and causing the a private copy of the index as well, all the way up to the root pointer.

Notice that the update happens in an append-only mode where the modified data is appended to the file and the old data becomes garbage. Periodic garbage collection is done to compact the data. Here is how the model is implemented in memory and disks

In Google BigTable model, the data is broken down into multiple generations and the memory is use to hold the newest generation. Any query will search the mem data as well as all the data sets on disks and merge all the return results. Fast detection of whether a generation contains a key can be done by checking a bloom filter.

When update happens, both the mem data and the commit log will be written so that if the machine crashes before the mem data flush to disk, it can be recovered from the commit log.

Kiến trúc của Google như thế nào?

Chắc rằng rất nhiều người Việt Nam có sử dụng Internet đều sử dụng Google thường xuyên. Nhân tiện tìm hiểu về scalability, tôi xin giới thiệu với mọi người khái quát mô hình kiến trúc của Google để mọi người tham khảo  .

Google là ông hoàng về scalability[1]. Giao diện sử dụng Google thật đơn giản. Mọi người đều biết hệ thống của Google lớn và phức tạp,rắc rối, tìm kiếm nhanh chóng như thế nào. Nhưng Google không chỉ là đại gia trong lĩnh vực tìm kiếm. Cách tiếp cận nền tảng của họ nhằm xây dựng những ứng dụng có khả năng mở rộng cho phép họ các xây dựng ứng dụng trong thế giới mạng với một mức độ cạnh tranh đáng sợ cho các đối thủ khác. Mục đích của họ luôn luôn xây dựng cơ sở hạ tầng có khả năng mở rộng cao để hỗ trợ những sản phẩm của họ. Họ làm điều đó như thế nào?

Nền tảng công nghệ

+ Linux

+ Các ngôn ngữ lập trình đa dạng: Python, Java, C++

Trong Google có gì?

Các con số thống kê

Năm 2006 : Google có khoảng 450.000 server cỡ nhỏ .

Năm 2005 Google đánh chỉ mục khoảng 8 tỉ trang web.

Hiện giờ có hơn 200 GFS cluster tại Google. Một cluster có từ 1000 đến 5000 máy. Dữ liệu chung của hàng ngàn máy sẽ nhận dữ liệu từ GFS cluster và chạy với một lượng dữ liệu lưu trữ cỡ 5 petabytes[4].Lưu lượng kết hợp đọc/ghi thông qua hệ thống GFS cluster có thể lên tới 40 gigabytes/giây.

Hiện có 6000 ứng dụng MapReduce của Google và hàng trăm ứng dụng mới được viết ra hàng tháng.

BigTable được mở rộng tới mức lưu trữ hàng tỉ địa chỉ URL, hàng trăm terabytes[4] hình ảnh vệ tinh, và tham chiếu của hàng trăm triệu người dùng.

Kiến trúc phân lớp

Kiến trúc hạ tầng của Google có thể chia thành 3 lớp:

- Sản phẩm: Tìm kiếm, quảng cáo, thư điện tử, bản đồ, video, chat, bloger

- Hệ thống cơ sở hạ tầng phân tán: GFS, MapReduce, và BigTable.

- Nền tảng tính toán: Một cụm các máy trong một cụm các data center[5]

*Khẳng định chắc chắn dễ dàng được triển khai với chi phí thấp.

Máy lưu trữ tin cậy với GFS (Google File System)

*Mức độ tin cậy, khả năng mở rộng là điểm cốt yếu cần thiết cho bất kỳ ứng dụng nào. GFS thỏa mãn được những yếu tố như vậy.

*Google File System – một lượng lớn các log file[3] có cấu trúc với một lượng rất lớn dữ liệu.

*Tại sao xây dựng nó thay vì sử dụng cách lưu trữ thông thường? Bởi vì nó thỏa mãn các yêu cầu:

- Độ tin cậy cao của các data centers

- Khả năng mở rộng tới hàng ngàn nút mạng

- Yêu cầu băng thông đọc/ghi lớn

- Cung cấp khối dữ liệu có kích cỡ lớn, lên tới hàng gigabytes.

Hệ thống gồm có master và chunk servers.

- Master server lưu giữ siêu dữ liệu[2] trong rất nhiều tập tin khác nhau. Dữ liệu được lưu trong file hệ thống theo từng khúc 64MB. Khi người dùng truy cập vào hệ thống của google, máy của họ sẽ nói chuyện với Master server. Các Master server này sẽ thực thi các hoạt động trên các file siêu dữ liệu và xác định dữ liệu thật sự trên các chunk server.

- Chunk servers lưu dữ liệu thật trên ổ đĩa. Mỗi khúc dữ liệu trên một chunk server sẽ có bản sao trên 3 chunk server khác. Với cách như vậy, tạo ra tính dư thừa dữ liệu và khúc dữ liệu đó vẫn được truy cập khi một trong các máy lưu trữ nó không hoạt động được. Khi có yêu cầu từ Master server, chương trình ứng dụng của người dùng sẽ nhận được các file trực tiếp từ chunk server.

Một ứng dụng mới được đưa ra sẽ sử dụng GFS cluster có sẵn hoặc sử dụng hệ thống GFS cluster của riêng nó.

Đó chính là chìa khóa xây dựng kiến trúc hạ tầng của Google để chắc chắn rằng người dùng chọn ứng dụng của họ. GFS có thể được điều chỉnh phù hợp với các ứng dụng cần thiết.

Một vài điều với dữ liệu sử dụng MapReduce

Bây giờ bạn đã có một hệ thống lưu trữ tốt, làm cách để xử lý một yêu cầu bất kỳ với nhiều dữ liệu như vậy? Có thể nó rằng bạn đã có hàng Tetabytes dữ liệu được lưu trữ trên hàng nghìn máy khác nhau. Cơ sở dữ liệu thì không thể mở rộng hoặc rất tốn kém để mở rộng ở mức này. Như vậy, MapReduce ra đời.

MapReduce là gì?

MapReduce là một mô hình chương trình và thực thi kết hợp với mục đích cho xử lý, sinh ra các tập dữ liệu lớn. Người dùng có thể dùng một hàm map để xử lý một cặp khóa/giá trị để tạo ra một tập các cặp khóa/giá trị trung gian. Hàm reduce sẽ trộn toàn bộ các giá trị trung gian với cùng khóa trung gian. Hệ thống chạy run-time sẽ kiểm tra các chi tiết của phân vùng dữ liệu đầu vào. Chương trình được lập lịch thực thi thông qua một nhóm các máy. Chương trình sẽ phát hiện được máy nào xử lý lỗi, và quản lý các yêu cầu giao tiếp giữa các máy với nhau. Nó sẽ cho phép lập trình viên không cần kinh nghiệp về các hệ thống xử lý song song và phân tán có thể sử dụng được tài nguyên của hệ thống phân tán lớn. ( còn tiếp …)

Lovevn Lược dịch từ : http://highscalability.com/google-architecture

Chú thích:

[1] Scalability : khả năng thay đổi để cung cấp một lượng dữ liệu lớn hơn hoặc nhỏ hơn hay khả năng thay đổi kích cỡ của hệ thống.

[2] Siêu dữ liệu: thường được biết đến như là “dữ liệu của dữ liệu”, nó là thông tin mô tả về nguồn gốc, kiểu loại, nội dung và một số đặc tính khác của dữ liệu.

[3] Tệp tin nhật ký(log file): Loại tập tin thường được dùng với mục đích bảo trì, nó chỉ ra file nào được truy cập và lưu trữ ở đâu. Phân tích tệp tin nhật ký trên một website để phát hiện ra người dùng truy cập site khi nào, họ vào từ địa chỉ nào …

[4] Terabytes: 1 terabytes ~ 1 nghìn tỷ bytes ~ 1 nghìn Gigabytes

Petabytes : 1 petabytes ~ 10^15 bytes ~ 2^50 bytes: 1 triệu tỷ bytes ~ 1 triệu Gigabytes

[5] Data center: Chứa một lượng lớn các hệ thống máy tính và các thiết bị giao tiếp với nhau nhằm thuận tiện cho quản lý dữ liệu và các thực thi trên dữ liệu.

BigTable Model with Cassandra and HBase

Recently in a number of “scalability discussion meeting”, I’ve seen the following pattern coming up repeatedly …

  1. To make your app scalable, you try to make your app layer “stateless”.
  2. OK, so you move the “state” out from your application layer out to a shared DB, or shared data layer.
  3. Now, how do we make the data tier scalable, by definition, we cannot make the data tier stateless.
  4. OK, now lets think about how to “partition” your data and spread them across multiple machines in such a way that workload is balanced.
  5. Now there are more boxes and what if some of them crashes.
  6. OK, we should replicate the data across machines.
  7. Now, how do we keep these data in sync …
  8. And then Cloud computing gets into the picture (as it always does). Now we are not just having a pool of machines but also the pool size can grow and shrink according to workload fluctuation (you don’t want to pay for something idle, right ?).
  9. Now we need to figure out as we add more machines into the pool or remove machine from the pool, how we should “redistribute” the data.

This is an area where NOSQL shines. In the last 18 months, NOSQL has become one of the hottest topic in the software industry. It has been introduced as a solution to large scale data storage problem at the range of Terabytes or Petabytes. Dozens of NOSQL products has come to the market, but two leaders HBase and Cassandra seems to stand out from the rest in terms of their adoption.

Given an increasing demand of explaining these 2 products recently, I decide to write a post on this.

Not to repeat the basic theory of NOSQL here, for a foundation ofdistributed system theory underlying the NOSQL design, please refer to my earlier blog

Both Hbase and Cassandra are based on Google BigTable model, here lets introduce some key characteristic underlying Bigtable first.

Fundamentally Distributed
BigTable is built from the ground up on a “highly distributed”, “share nothing” architecture. Data is supposed to store in large number of unreliable, commodity server boxes by “partitioning” and “replication”. Data partitioning means the data are partitioned by its key and stored in different servers. Replication means the same data element is replicated multiple times at different servers.


Column Oriented
Unlike traditional RDBMS implementation where each “row” is stored contiguous on disk, BigTable, on the other hand, store each column contiguously on disk. The underlying assumption is that in most cases not all columns are needed for data access, column oriented layout allows more records sitting in a disk block and hence can reduce the disk I/O.

Column oriented layout is also very effective to store very sparse data (many cells have NULL value) as well as multi-value cell. The following diagram illustrate the difference between a Row-oriented layout and a Column-oriented layout


Variable number of Columns
In RDBMS, each row must have a fixed set of columns defined by the table schema, and therefore it is not easy to support columns with multi-value attributes. The BigTable model introduces the “Column Family” concept such that a row has a fixed number of “column family” but within the “column family”, a row can have a variable number of columns that can be different in each row.


In the Bigtable model, the basic data storage unit is a cell, (addressed by a particular row and column). Bigtable allow multiple timestamp version of data within a cell. In other words, user can address a data element by the rowid, column name and the timestamp. At the configuration level, Bigtable allows the user to specify how many versions can be stored within each cell either by count (how many) or by freshness (how old).

At the physical level, BigTable store each column family contiguously on disk (imagine one file per column family), and physically sort the order of data by rowid, column name and timestamp. After that, the sorted data will be compressed so that a disk block size can store more data. On the other hand, since data within a column family usually has a similar pattern, data compression can be very effective.

Sequential write
BigTable model is highly optimized for write operation (insert/update/delete) with sequential write (no disk seek is needed). Basically, write happens by first appending a transaction entry to a log file (hence the disk write I/O is sequential with no disk seek), followed by writing the data into an in-memory Memtable . In case of the machine crashes and all in-memory state is lost, the recovery step will bring the Memtable up to date by replaying the updates in the log file.

All the latest update therefore will be stored at the Memtable, which will grow until reaching a size threshold, then it will flushed the Memtable to the disk as an SSTable (sorted by the String key). Over a period of time there will be multiple SSTables on the disk that store the data.

Merged read
Whenever a read request is received, the system will first lookup the Memtable by its row key to see if it contains the data. If not, it will look at the on-disk SSTable to see if the row-key is there. We call this the “merged read” as the system need to look at multiple places for the data. To speed up the detection, SSTable has a companion Bloom filter such that it can rapidly detect the absence of the row-key. In other words, only when the bloom filter returns positive will the system be doing a detail lookup within the SSTable.

Periodic Data Compaction
As you can imagine, it can be quite inefficient for the read operation when there are too many SSTables scattering around. Therefore, the system periodically merge the SSTable. Notice that since each of the SSTable is individually sorted by key, a simple “merge sort” is sufficient to merge multiple SSTable into one. The merge mechanism is based on a logarithm property where two SSTable of the same size will be merge into a single SSTable will doubling the size. Therefore the number of SSTable is proportion to O(logN) where N is the number of rows.

After looking at the common part, lets look at their difference of Hbase and Cassandra.

HBase
Based on the BigTable, HBase uses the Hadoop Filesystem (HDFS) as its data storage engine. The advantage of this approach is then HBase doesn’t need to worry about data replication, data consistency and resiliency because HDFS has handled it already. Of course, the downside is that it is also constrained by the characteristics of HDFS, which is not optimized for random read access. In addition, there will be an extra network latency between the DB server to the File server (which is the data node of Hadoop).


In the HBase architecture, data is stored in a farm of Region Servers. The “key-to-server” mapping is needed to locate the corresponding server and this mapping is stored as a “Table” similar to other user data table.

Before a client do any DB operation, it needs to first locate the corresponding region server.

  1. The client contacts a predefined Master server who replies the endpoint of a region server that holds a “Root Region” table.
  2. The client contacts the region server who replies the endpoint of a second region server who holds a “Meta Region” table, which contains a mapping from “user table” to “region server”.
  3. The client contacts this second region server, passing along the user table name. This second region server will lookup its meta region and reply an endpoint of a third region server who holds a “User Region”, which contains a mapping from “key range” to “region server”
  4. The client contacts this third region server, passing along the row key that it wants to lookup. This third region server will lookup its user region and reply the endpoint of a fourth region server who holds the data that the client is looking for.
  5. Client will cache the result along this process so subsequent request doesn’t need to go through this multi-step process again to resolve the corresponding endpoint.

In Hbase, the in-memory data storage (what we refer to as “Memtable” in above paragraph) is implemented in Memcache. The on-disk data storage (what we refer to as “SSTable” in above paragraph) is implemented as a HDFS file residing in Hadoop data node server. The Log file is also stored as an HDFS file. (I feel storing a transaction log file remotely will hurt performance)

Also in the HBase architecture, there is a special machine playing the “role of master” who monitors and coordinates the activities of all region servers (the heavy-duty worker node). To the best of my knowledge, the master node is the single point of failure at this moment.

For a more detail architecture description, Lars George has a very good explanation in the log file implementation as well as the data storage architecture of Hbase.

Cassandra
Also based on the BigTable model, Cassandra use the DHT (distributed hash table) model to partition its data, based on the paper described in the Amazon Dynamo model.

Consistent Hashing via O(1) DHT
Each machine (node) is associated with a particular id that is distributed in a keyspace (e.g. 128 bit). All the data element is also associated with a key (in the same key space). The server owns all the data whose key lies between its id and the preceding server’s id.

Data is also replicated across multiple servers. Cassandra offers multiple replication schema including storing the replicas in neighbor servers (whose id succeed the server owning the data), or a rack-aware strategy by storing the replicas in a physical location. The simple partition strategy is as follows …


Tunable Consistency Level
Unlike Hbase, Cassandra allows you to choose the consistency level that is suitable to your application, so you can gain more scalability if willing to tradeoff some data consistency.

For example, it allows you to choose how many ACK to receive from different replicas before considering a WRITE to be successful. Similarly, you can choose how many replica’s response to be received in the case of READ before return the result to the client.

By choosing the appropriate number for W and R response, you can choose the level of consistency you like. For example, to achieve Strict Consistency, we just need to pick W, R such that W + R > N. This including the possibility of (W = one and R = all), (R = one and W = all), (W = quorum and R = quorum). Of course, if you don’t need strict consistency, you can even choose a smaller value for W and R and gain a bigger availability. Regardless of what consistency level you choose, the data will be eventual consistent by the “hinted handoff”, “read repair” and “anti-entropy sync” mechanism described below.

Hinted Handoff
The client performs a write by send the request to any Cassandra node which will act as the proxy to the client. This proxy node will located N corresponding nodes that holds the data replicas and forward the write request to all of them. In case any node is failed, it will pick a random node as a handoff node and write the request with a hint telling it to forward the write request back to the failed node after it recovers. The handoff node will then periodically check for the recovery of the failed node and forward the write to it. Therefore, the original node will eventually receive all the write request.

Conflict Resolution
Since write can reach different replica, the corresponding timestamp of the data is used to resolve conflict, in other words, the latest timestamp wins and push the earlier timestamps into an earlier version (they are not lost)

Read Repair
When the client performs a “read”, the proxy node will issue N reads but only wait for R copies of responses and return the one with the latest version. In case some nodes respond with an older version, the proxy node will send the latest version to them asynchronously, hence these left-behind node will still eventually catch up with the latest version.

Anti-Entropy data sync
To ensure the data is still in sync even there is no READ and WRITE occurs to the data, replica nodes periodically gossip with each other to figure out if anyone out of sync. For each key range of data, each member in the replica group compute a Merkel tree (a hash encoding tree where the difference can be located quickly) and send it to other neighbors. By comparing the received Merkel tree with its own tree, each member can quickly determine which data portion is out of sync. If so, it will send the diff to the left-behind members.

Anti-entropy is the “catch-all” way to guarantee eventual consistency, but is also pretty expensive and therefore is not done frequently. By combining the data sync with read repair and hinted handoff, we can keep the replicas pretty up-to-date.

BigTable trade offs
To retain the scalability features of BigTable, some of the basic features of what RDBMS has provided is missing in the BigTable model. Here we highlight the rough edges of Bigtable.

1) Primitive transaction support
Transaction protection is only guaranteed within a single row. In other words, you cannot start a atomic transaction to modify multiple rows.

2) Primitive isolation support
While you are reading a row, other people may have modified the same row and update it before you. Your view is not current anymore but your later update can easily wipe off other people’s change.

There are many techniques how concurrent update can be isolated, including pessimistic approach like locking or optimistic approach by using vector clock to be the version stamp. But to the best of my understanding, there is no robust test-and-set operation in the BigTable model (this is some getLock mechanism in Hbase which I haven’t looked into), my impression is that there is no easy way to check there is no concurrent update happen in between.

Because of this limitation, I think BigTable model is more suitable for those applications where concurrent update to the same row is very rare, or some inconsistency is tolerable at the application level. Fortunately, there are still a lot of applications falling into this bucket.

3) No indexes
Notice that data within BigTable are all physically sorted; by rowid, column name and timestamp. There is no index from the column value to its containing rowid.

This model is quite different from RDBMS where you typically define a table and worry about defining the index later. There is no such “index” concept in BigTable and you need to carefully plan out the physical sorting order of your data layout.

Lacking index turns out to be quite inconvenient and many people using Bigtable ends up building their own index at the application level. This usually results in having a highly denormalized data model with lots of column family who store links to other tables. Any update to the base data need to carefully update these other column family as well. From a performance angle, this is actually better than maintaining index in RDBMS because Bigtable is optimized from writes. However, since it is now the application logic to maintain the index, this can be a source of application bugs.

4) No referential integrity enforcement
As mentioned above, since you are building artificial index at the application level, you need to maintain the integrity of your index as well. This includes update your index when the base data is inserted, modified or deleted. This kind of handling logic is traditionally residing at the RDBMS level, but since BigTable has no such referential integrity concept, this responsibility is now landed on your application logic.

5) Lack of surrounding tools
As NOSQL or BigTable is very new, the tools surrounding it is definitely not comparable to the RDBMS world at this moment, such tools includes report generation, BI, data warehouse … etc.

I observe the general trend that most NOSQL products are moving towards the direction to provide an ODBC / JDBC interface to integrate with existing tool markets easier. But at this moment, to the best of my understanding, such interface is not wide spread yet.

Design Patterns for Bigtable model
Due to the very different model of Bigtable, the data model design methodology is also quite different from traditional RDBMS schema design. Here is a sequence of steps that are pretty common …

1) Identify all your query scenarios
Since there is no index concept, you have to plan out carefully how your data is physically sorted. Therefore it is important to find all your query use cases first.

2) Define your “entity table” and its corresponding column families
For an entity table, it is pretty common to have one column family storing all the entity attributes, and multiple column families to store the links to other entities. (e.g. A “UserTable” may contain a column family “baseInfo” to store all attributes of the user, a column family “friend” to store the links to another user, a column family “company” to store links to another CompanyTable)

3) Define your “index table”
The “index table” is what your application build to support reverse lookup. The “key” is typically base on the search criteria you have identified in your query scenario. It is not uncommon that each query may have its own specific index table.

4) Make sure your application logic updates the index correctly
Since the index table has to be maintained by application logic, you need to check to make sure it is done correctly. In many cases, this can be quite a source of bugs.

It is important to realize that NOSQL is not advocating a replacement of RDBMS which has been proven in many lines of application. The NOSQL should be considered a complementary technologies for some niche area where RDBMS is not covering well.

Powered by WordPress | Theme: by 85ideas. Editor by Khoanguyen