Friday, September 21, 2012

Gumball: A race condition prevention technique for cache augmented SQL Database Management systems - Developed by Shahram Ghandeharizadeh and Jason Yap

It is not uncommon these days for a system to increase its scalability by adding cache servers (key-value stores, KVS) that complement its RDBMS. The most frequently accessed RDBMS records are stored in the caching system as key-value pairs. When a record is updated in the database, its corresponding cache entry is invalidated and is recomputed on the next cache miss. KVSs such as memcached are widely used by popular sites such as Facebook, YouTube, and Wikipedia.

On a high-traffic site, it is highly likely that undesirable race conditions between two cache update operations will leave the cache with inconsistent or stale data. The paper uses a social networking website as an example. Consider a moment when Alice is retrieving her profile just after updating it, while an administrator is simultaneously deleting her profile for breach of terms and conditions. There could be a race on the KVS between the update operation (from Alice's profile update) and the delete operation (from the administrator's delete action). The Gumball [1] Technique (GT), developed by Shahram Ghandeharizadeh and Jason Yap at USC, tries to solve these race conditions.

GT recognizes that caching systems push the overall system far up the scalability spectrum while ignoring consistency, thereby introducing data staleness. It also recognizes that the system's position on the scalability spectrum is mostly rigid, while its position on the consistency spectrum fluctuates with the number of race conditions that occur. GT tries to flip these positions: it keeps the system rigidly as far up the consistency spectrum as possible while trading off scalability to a small degree, and it adjusts the system's position on the scalability spectrum automatically depending on how often race conditions occur.

GT introduces a new type of entry in the KVS, called a gumball entry, that can be associated with a key on a per-key basis. A gumball entry stores the timestamp Tdelete of the delete operation on its key. Subsequent deletes on a key that has no value continue to update the Tdelete metadata. The gumball entry sticks to the KVS only for a small duration of time, Δ, after which it expires. Get operations continue to function with one minor variation: they report back a timestamp Tmiss whenever they encounter a cache miss.

The purpose of the gumball entry is to provide subsequent put operations on the key with sufficient information to detect and act on race conditions. The put operation's race detection test rejects the put when any of the following checks passes:

  1. If the difference between the current time TC and Tmiss is greater than Δ, there is no way to tell for sure whether a gumball entry lived and expired while the key-value pair was being recomputed after the cache miss. GT takes a cautious outlook here and rejects the put operation.
  2. When there is a gumball entry with timestamp Tdelete, GT detects a race condition if the cache miss at Tmiss happened before the gumball was created at Tdelete, i.e., before the key was deleted. This is a positive detection of a race condition and the put is rejected.
  3. Similarly, if there is an existing value for the key, GT detects a race if Tmiss is before the timestamp of the existing value. This is a positive detection of a race condition and the put is rejected.
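
The three checks above can be sketched as a small in-memory cache. This is an illustrative Python sketch, not the authors' implementation; the entry layout, the method names, and the (elided) expiry of gumballs after Δ are my assumptions.

```python
import time

class GumballCache:
    """Illustrative in-memory sketch of the Gumball Technique (GT).
    Entry layout and names are assumptions, not the paper's implementation."""

    def __init__(self, delta=0.5):
        self.delta = delta  # gumball lifetime, Δ (seconds)
        # key -> ("value", payload, t_stored) or ("gumball", t_delete);
        # gumball expiry after Δ is elided for brevity.
        self.store = {}

    def get(self, key):
        """Return (value, None) on a hit, or (None, T_miss) on a miss."""
        entry = self.store.get(key)
        if entry and entry[0] == "value":
            return entry[1], None
        return None, time.time()  # miss (or only a gumball): report T_miss

    def delete(self, key):
        # Replace any value with a gumball stamped T_delete.
        self.store[key] = ("gumball", time.time())

    def put(self, key, value, t_miss):
        now = time.time()
        # Check 1: more than Δ has passed since the miss -- a gumball may
        # have lived and expired unseen, so reject conservatively.
        if now - t_miss > self.delta:
            return False
        entry = self.store.get(key)
        # Check 2: a gumball newer than the miss means a delete raced us.
        if entry and entry[0] == "gumball" and t_miss < entry[1]:
            return False
        # Check 3: an existing value newer than the miss also means a race.
        if entry and entry[0] == "value" and t_miss < entry[2]:
            return False
        self.store[key] = ("value", value, now)
        return True
```

A put that observed its miss before a racing delete is rejected, so the next get falls through to the RDBMS instead of serving stale data.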

Whenever a put is rejected, subsequent get operations observe a miss and are redirected to the RDBMS. This ensures the system sees consistent data while trading away, to a small degree, its position on the scalability spectrum. How much is traded away depends on the value of Δ, the duration for which a gumball sticks to a KVS key. Ideally, Δ is set to the average time it takes to recompute a key-value pair and begin the put operation after a cache miss. If Δ is lower than this ideal value, a higher percentage of puts are rejected and subsequent gets observe cache misses, pulling the system's position down the scalability spectrum more vigorously. If Δ is higher than the ideal value, gumballs stick around longer and the race detection tests produce better-quality results, impacting scalability to a smaller extent than the former case. To strike a balance between the two, the paper suggests computing Δ dynamically: the system learns and adjusts by increasing Δ gradually on each put reject (limited to a dynamically capped value based on the maximum response time observed within the last few seconds) and decreasing it each time a gumball entry is overwritten with an actual value. This way, the system's position on the scalability spectrum is adjusted based on the number of race conditions that occur.
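
The dynamic Δ adjustment can be sketched as follows. The class name, step size, and cap handling are illustrative assumptions, not details from the paper.

```python
class DeltaTuner:
    """Illustrative sketch of dynamic Δ adjustment (names are assumptions)."""

    def __init__(self, delta=0.1, step=0.01, cap=1.0):
        self.delta = delta  # current gumball lifetime, Δ (seconds)
        self.step = step    # adjustment increment
        self.cap = cap      # dynamic cap, e.g. max response time seen recently

    def on_put_rejected(self):
        # Puts are being rejected: grow Δ gradually, bounded by the cap.
        self.delta = min(self.delta + self.step, self.cap)

    def on_gumball_overwritten(self):
        # A gumball was overwritten with a real value: shrink Δ.
        self.delta = max(self.delta - self.step, self.step)
```

Growing Δ on rejects makes the system more cautious under contention; shrinking it on successful overwrites recovers scalability when races subside.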

The paper describes benchmark experiments that measure the amount of stale data eliminated by GT, the impact of GT on system performance (from managing gumball entries and running the race condition check on each put), and how quickly the technique learns the workload and adjusts Δ. In their specially constructed experiment (set up so that RDBMS query computations do not shadow the overhead of GT), they found a 300-fold reduction in stale data; GT's impact on system performance was negligible in experiments where race conditions occurred less than 3% of the time; and GT always adjusted the value of Δ quickly for varying workloads, becoming cautious and maintaining consistency.

GT is a simple technique that acknowledges that while caching systems increase scalability, they add the overhead of maintaining consistency between the KVS and the RDBMS. The technique is simple to implement and requires no special feature from the RDBMS or the KVS, translating to a good rate of return via a large reduction in stale data. It also works well in a replicated caching setup when cache operations are improperly ordered on only a few of the servers, and it has no clock synchronization requirements.

The paper on ACM is here and a detailed lab report is here


[1] S. Ghandeharizadeh and J. Yap. Gumball: A Race Condition Prevention Technique for Cache Augmented SQL Database Management Systems. In the Second ACM SIGMOD Workshop on Databases and Social Networks (DBSocial), Scottsdale, Arizona, May 2012.

P.S. 1. Please note that the Gumball Technique has a patent application. The patent reference for Gumball is:
US Provisional Patent Application No. 61/669,257.
Race Condition Technique That Prevents Caches From Becoming Inconsistent With The Database. S. Ghandeharizadeh and J. Yap.
Issued August 22, 2012.

P.S. 2. I am on the lookout for a project using memcached that is willing to experiment with its caching system by introducing the Gumball technique in an offline manner, and then roll it out to production if the results are satisfactory. Please direct message me on Twitter @abineshtd if you are interested.

Thursday, September 20, 2012

Google Megastore Paper Overview


Interactive online services place huge demands on storage systems, with a wide range of conflicting requirements: scalability, low latency, consistency, high availability, and agility in development. Megastore tries to address this problem by taking a middle ground, blending the scalability of NoSQL with the convenience of a traditional RDBMS.

Data is partitioned into entity groups (the paper assumes that most Internet services have a natural way of partitioning data, which makes this viable), and each group is synchronously replicated across data centers with the help of the Paxos algorithm. Each data center can house multiple entity groups. Full ACID semantics are provided within these partitioned entity groups, but there are only limited consistency guarantees across them. Communication across entity groups uses either the expensive two-phase commit protocol or Megastore's efficient asynchronous message queue. Synchronous, fault-tolerant replication increases availability, and independently partitioned entity groups acting as small databases stored in a NoSQL datastore (Bigtable) provide high scalability.

This architecture is shown below:

Data Model

Megastore defines a data model that lies between the abstract tuples of an RDBMS and the concrete row-column implementation of NoSQL. The data model is declared in a schema; each schema contains a set of tables, each table contains a set of entities, and each entity contains a set of properties. The primary key consists of a sequence of properties, and child tables declare foreign keys by referencing their parent (root) table. An entity group consists of a root entity along with all entities in child tables that reference it.

Each entity is mapped to a single Bigtable row, and row keys are formed by concatenating primary-key values. This stores data within the same entity group contiguously and hierarchically (the root entity followed by child-table entities). A column name is the concatenation of the table name and the property name, which makes it possible to store multiple Megastore tables in a single Bigtable.
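
The key-construction scheme can be sketched as below. The separator, table names, and sample keys are illustrative assumptions; a real encoding must also preserve lexicographic order for multi-part keys (e.g. via length-prefixing), which is elided here.

```python
# Hypothetical sketch of mapping Megastore entities onto Bigtable rows:
# concatenating primary-key values keeps an entity group contiguous in
# Bigtable's sorted layout. Names and separators are illustrative.

def row_key(*pk_values):
    # Concatenate primary-key components; a real order-preserving encoding
    # is elided for simplicity.
    return "/".join(str(v) for v in pk_values)

def column_name(table, prop):
    # Column name = table name + property name.
    return f"{table}.{prop}"

# A User root entity and its child Photo entities share the same key
# prefix, so sorting stores the whole entity group contiguously.
rows = sorted([
    row_key(101),        # User root entity
    row_key(101, 500),   # Photo child entity of user 101
    row_key(101, 502),   # another Photo child of user 101
    row_key(202),        # a different entity group
])
```

After sorting, the rows of entity group 101 sit next to each other, ahead of group 202, which is exactly the contiguity the paragraph above describes.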

There are two classes of indexes: local indexes, which index data within an entity group and are therefore accessed atomically, and global indexes, which span entity groups and are therefore not guaranteed to reflect recent updates. Index structures are stored in Bigtable, with each index entry stored in a row whose key is constructed from the indexed property values concatenated with the primary key of the indexed entity.

Transaction and Concurrency control

Each entity group functions as a mini-database that provides serializable ACID semantics and has its own write-ahead log. Versioned (timestamped) cells in Bigtable provide a way to support MVCC, so readers and writers don't block each other.

Megastore provides current, snapshot, and inconsistent reads. A write transaction begins with a current read, determines the next available log position, and appends its mutation to the log using Paxos. Transactions across entity groups can use either queues (eventual consistency) or the two-phase commit protocol (strong consistency).

Implementation of Paxos between distributed datacenters

The Paxos algorithm is a way to reach consensus among a group of replicas on a single value. The original algorithm is ill-suited for high-latency network links because it demands multiple rounds of communication. Megastore applies a few optimizations that make Paxos practical in this setting.

Local reads at any replica at any time (fast reads) are made possible by a coordinator service at each replica. The coordinator tracks whether a particular entity group is up to date at its replica and stores sufficient state to serve local reads; the write algorithm keeps the coordinators conservative. Megastore assumes that writes are highly likely to come in batches from the same region, and optimizes for this by allowing the Paxos leader from the previous round to remain the leader (fast writes), avoiding the prepare stage of the Paxos algorithm.

There are two other replica types: witness replicas, which store no data and participate only in Paxos voting, and read-only replicas, which are non-voting replicas containing full snapshots of the data. Each application server has a client library. To minimize round-trip WAN latency, the libraries submit remote Paxos operations to stateless intermediary replication servers that communicate with their local Bigtables. This architecture is shown below.

Read algorithm

  1. Query Local: Query the local coordinator to determine if the entity group is up-to-date locally
  2. Find Position: Determine the highest possibly-committed log position and select a replica that has applied through that log position.
    1. (Local read) If step 1 indicates that the local replica is up-to-date, read the highest accepted log position and timestamp from the local replica. 
    2. (Majority read) If the local replica is not up-to-date (or if step 1 or step 2.1 times out), read from a majority of replicas to find the maximum log position that any replica has seen, and pick a replica to read from. Megastore selects the most responsive or most up-to-date replica, not always the local replica.
  3. Catchup: As soon as a replica is selected, catch it up to the maximum known log position as follows: 
    1.  For each log position in which the selected replica does not know the consensus value, read the value from another replica. For any log positions without a known-committed value available, invoke Paxos to propose a no-op write. Paxos will drive a majority of replicas to converge on a single value -- either the no-op or a previously proposed write.
    2. Sequentially apply the consensus value of all unapplied log positions to advance the replica's state to the distributed consensus state
    3. In the event of failure, retry on another replica.
  4. Validate: If the local replica was selected and was not previously up-to-date, send the coordinator a validate message asserting that the (entity group, replica) pair reflects all committed writes. Do not wait for a reply -- if the request fails, the next read will retry.
  5. Query Data: Read the selected replica using the timestamp of the selected log position. If the selected replica becomes unavailable, pick an alternate replica, perform catchup, and read from it instead. The results of a single large query may be assembled transparently from multiple replicas.
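
Steps 1 and 2 above can be sketched over in-memory replicas; this is a heavy simplification with structures of my own invention, and catchup, validation, and the actual data read (steps 3-5) are elided.

```python
# Much-simplified sketch of Megastore's read algorithm. Each replica is
# modeled as a dict with the data it has applied and its highest applied
# log position; real reads involve Paxos catchup and Bigtable timestamps.

def read(key, local, others, coordinator_up_to_date):
    # Step 1 (Query Local) + 2.1 (Local read): serve from the local
    # replica when its coordinator says the entity group is current.
    if coordinator_up_to_date:
        return local["data"].get(key), local["applied_pos"]
    # Step 2.2 (Majority read): find the maximum log position any replica
    # has seen and pick a replica that has applied through it.
    candidates = [local] + others
    max_pos = max(r["applied_pos"] for r in candidates)
    chosen = next(r for r in candidates if r["applied_pos"] == max_pos)
    # Steps 3-5 (Catchup, Validate, Query Data) are elided: the chosen
    # replica is assumed to already be at max_pos here.
    return chosen["data"].get(key), max_pos
```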

Write algorithm

  1. Accept Leader: Ask the leader to accept the value as proposal number zero. If successful, skip to step 3.
  2. Prepare: Run the Paxos Prepare phase at all replicas with a higher proposal number than any seen so far at this log position. Replace the value being written with the highest-numbered proposal discovered, if any.
  3. Accept: Ask the remaining replicas to accept the value. If this fails on a majority of replicas, return to step 2 after a randomized backoff.
  4. Invalidate: Invalidate the coordinator at all full replicas that did not accept the value.
  5. Apply: Apply the value's mutations at as many replicas as possible. If the chosen value differs from that originally proposed, return a conflict error.
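
The fast-write path (leader accepts proposal zero, skipping Prepare) can be sketched as below. Quorum counting and the replica structures are illustrative assumptions, and the Prepare/backoff machinery is elided.

```python
# Minimal sketch of the fast-write path: reuse the previous round's Paxos
# leader so the Prepare phase can be skipped. All structures are
# illustrative, not Megastore's API.

def fast_write(value, leader, replicas, quorum):
    # Step 1: ask the leader to accept the value as proposal number zero.
    if not leader["accepts_zero"]:
        return None  # would fall back to the full Prepare phase (step 2)
    # Step 3: ask the remaining replicas to accept the value.
    votes = 1 + sum(1 for r in replicas if r["accepts"])
    if votes < quorum:
        return None  # backoff and retry from Prepare in the real algorithm
    # Steps 4-5: invalidate coordinators at non-accepting full replicas,
    # then apply the mutations everywhere possible (elided).
    return value
```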

Known problems

Megastore might appear to be unavailable when a single replica fails (its Bigtable or its coordinator), but the paper claims that in practice this is not a common problem, since the coordinator is a simple process, more stable than a Bigtable server, with no external dependencies or persistent storage. Nevertheless, network and host failures can still make a coordinator unavailable.

Coordinators resolve network partitions by using an out-of-band protocol to determine when other coordinators are up. Each coordinator obtains specific Chubby locks at startup, and when it loses a majority of them, it reverts to a state that marks all entity groups as out-of-date. There is a brief write outage when a datacenter containing live coordinators becomes unavailable. Under an asymmetric network partition, a coordinator may hold its Chubby locks but become unreachable; in that case, a manual step is necessary to disable the partially isolated coordinator.

There are a variety of possible race conditions between validate messages for earlier writes and invalidate messages for later writes; log position numbers help mitigate these. Another race, involving a crash between an invalidate by a writer at position n and a validate at some position m < n, is mitigated by using a unique epoch number for each incarnation of the coordinator: validates are allowed to modify the coordinator state only if the epoch remains unchanged since the most recent read of the coordinator.

When several application servers initiate writes to the same entity group and log position simultaneously, the likelihood of conflict increases. Megastore limits the write rate to a few per second per entity group to keep conflict rates in check. To overcome this limitation and increase write throughput, the paper suggests aggressive sharding of data into many entity groups.

Megastore's performance degrades when a particular full replica becomes unavailable. A few techniques mitigate this: disabling clients at the affected replica and rerouting them to other replicas, disabling the replica's coordinator to minimize the impact on write latency, or disabling the replica entirely.

Production Metrics

Megastore has been deployed within Google for several years. Observed average read latencies are tens of milliseconds, depending on the amount of data, showing that most reads are local. Observed average write latencies are in the 100-400 millisecond range, depending on the distance between datacenters, the size of the data, and the number of full replicas.


Megastore's latency penalty for synchronous replication across widely distributed replicas is more than offset by the convenience of a single system image. The more than 100 applications running on Megastore, and the infrastructure services layered on top of it, are evidence of its ease of use, generality, and power. It demonstrates the viability of a middle ground in feature set and replication consistency.

Link to original paper here

Monday, September 17, 2012

Google Bigtable Paper Summary


Bigtable is a widely applicable, scalable, distributed storage system for managing structured data, from small to very large scale, with high performance and availability. Many Google products, such as Google Analytics, Google Finance, Personalized Search, and Google Earth, use Bigtable for workloads ranging from throughput-oriented batch jobs to latency-sensitive serving of data. While Bigtable shares many implementation strategies with other databases, it provides a simpler data model that supports dynamic control over data layout, format, and locality properties.

Data model

Bigtable is a sparse, distributed, persistent, multidimensional sorted map. The map is indexed by a row key, a column key, and a timestamp; each value in the map is an uninterpreted array of bytes.

Row keys are arbitrary strings. Every read or write of a single row is atomic. The row range of a table is dynamically partitioned into subsets of row ranges called tablets, which are the units of distribution and load balancing.

Column keys are grouped into a small number of rarely changing column families, which must be created before data can be stored under any column key in the family. Column keys follow the syntax family:qualifier. Column family names must be printable, but qualifiers may be arbitrary strings. Access control and both disk and memory accounting are performed at the column-family level.

Each cell is timestamped, either by Bigtable or by the application, and the multiple versions of data are stored in decreasing timestamp order. Two garbage-collection settings are available: one keeps the last n versions, and the other keeps versions written in the last n seconds, minutes, hours, etc.

The following figure shows a single row from a table. The row key is "com.cnn.www"; there are two column families, "contents" and "anchor"; there are two columns under the "anchor" column family; and different versions of the same data are identified by timestamps t3, t5, t6, etc.
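
The sparse (row, column, timestamp) → value map can be modeled with a plain dict, using the row described above; the cell values and timestamps here are illustrative.

```python
# Toy model of Bigtable's data model: a sparse map from
# (row key, column key, timestamp) to an uninterpreted value.

cell = {}  # (row_key, column_key, timestamp) -> value

def put(row, col, ts, value):
    cell[(row, col, ts)] = value

def get_latest(row, col):
    # Versions are kept per timestamp; return the newest one.
    versions = [(ts, v) for (r, c, ts), v in cell.items()
                if r == row and c == col]
    return max(versions)[1] if versions else None

# The "com.cnn.www" row from the figure (values are made up):
put("com.cnn.www", "contents:", 3, "<html>v1</html>")
put("com.cnn.www", "contents:", 5, "<html>v2</html>")
put("com.cnn.www", "anchor:cnnsi.com", 9, "CNN")
```

The map is sparse: a row stores only the cells that were actually written, and each (row, column) pair can hold multiple timestamped versions.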


The Bigtable API provides functions to create and delete tables and column families; change cluster, table, and column family metadata such as access control rights; access individual values; and iterate and filter data by column name across multiple column families. It provides single-row transactions for atomic read-modify-write operations on a single row key. It does not support transactions across row keys, but it provides a client interface for batch writing across row keys. Sawzall scripts, developed at Google, provide data transformation, filtering, and summarization capabilities; they are not allowed to write back to Bigtable. MapReduce wrappers allow Bigtable to be used both as an input source and as an output target for MapReduce jobs.

Building Blocks

  • The distributed Google File System (GFS) stores Bigtable log and data files in a cluster of machines that runs a wide variety of other distributed applications.
  • A cluster management system schedules jobs, manages resources, monitors machine health, and deals with failures.
  • The Google SSTable (Sorted String Table) file format is used to store Bigtable data.
  • Chubby, a highly available and persistent distributed lock service, provides an interface of directories and small files that can be used as locks. Bigtable uses Chubby for:
    • ensuring that there is at most one master at a time
    • storing the bootstrap location of Bigtable data
    • discovering tablet servers
    • storing Bigtable schema info (column family info)
    • storing access control lists


Three major components of the Bigtable implementation:
  • Client library: interfaces between application and cluster of tablet servers
  • Master server: assigns tablets to tablet servers, monitors tablet server health and manages provisioning of tablet servers, manages schema changes such as table and column family creation, manages garbage collection of files in GFS; it does not mediate between client and tablet servers
  • Cluster of tablet servers: each tablet server houses a set of tablets, handles requests directly from clients (clients do not rely on the master server for tablet locations), and splits tablets that have grown too large.

Tablet Location

Tablet location information is cached by client libraries as they access tablets and is managed in a three-level hierarchy analogous to a B+ tree. The first level is a Chubby file that stores the location of the root tablet. In the second level, the root tablet contains the locations of all tablets of a special METADATA table. The root tablet is treated specially and is never split, ensuring that the hierarchy never exceeds three levels. In the third level, each METADATA tablet contains the locations of a set of user tablets.
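
The three-level lookup can be sketched with each level as a dict; names like "root-tablet" and "tablet-server-7" are purely illustrative.

```python
# Toy model of the tablet-location hierarchy:
# Chubby file -> root tablet -> METADATA tablet -> user tablet location.

chubby = {"root_location": "root-tablet"}  # level 1: Chubby file
tablets = {
    # level 2: root tablet maps to METADATA tablets
    "root-tablet": {"meta1": "metadata-tablet-1"},
    # level 3: each METADATA tablet maps user tablets to their servers
    "metadata-tablet-1": {"user-table/row-a": "tablet-server-7"},
}

def locate(user_tablet):
    root = tablets[chubby["root_location"]]   # level 1: read Chubby file
    for meta_name in root.values():           # level 2: scan root tablet
        loc = tablets[meta_name].get(user_tablet)
        if loc:
            return loc                        # level 3: METADATA entry
    return None
```

In the real system, clients cache these locations aggressively, so most lookups never touch Chubby or the root tablet at all.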

Tablet Assignment

Each table begins with a single tablet, and as the table grows, tablet servers split it into multiple tablets. Each tablet is assigned to one tablet server by the master server. For this assignment process, the master server keeps track of live tablet servers and the current assignment of tablets to them, and sends tablet load requests to tablet servers that have enough room.

Each tablet server holds a lock on a Chubby directory, and when it terminates (e.g., when the cluster management system is taking the tablet server down), it tries to release the lock so that the master can begin reassigning its tablets more quickly.

The master server monitors the health of tablet servers and reassigns a server's tablets when that server loses its lock. It begins this reassignment process by trying to acquire the tablet server's Chubby lock and deleting it. Then it moves all tablets from the old tablet server to a new tablet server that has enough room.

When the master is started by cluster management system, it goes through the following routine:
  1. Acquire unique master lock on Chubby
  2. Scan Chubby directory to discover live tablet servers
  3. Find out tablet assignments on each of the live tablet servers
  4. Scan the METADATA table to detect unassigned tablets by comparing against the information from the previous step, and add them to the set of unassigned tablets, making them eligible for assignment. (If the METADATA tablets themselves have not yet been assigned, the master first adds the root tablet to the set of unassigned tablets to ensure they get assigned.)

The master also keeps track of the creation and deletion of tables and of the merging of two tablets into one; these follow the normal process of being added to the set of unassigned tablets. A tablet split is a special case, as it is initiated by a tablet server. During a split, the tablet server records the new tablet's information in the METADATA table and notifies the master. On receipt of this notification, the master assigns the new tablet to a tablet server that has enough room.

Tablet serving

The tablets are stored in GFS as shown below

Write operation on a tablet server:
  • Check the well-formedness of the request and check authorization (by verifying against a list of permitted writers read from a Chubby file)
  • Make an entry in the commit log, which stores redo records
  • Insert the updated content into the memtable.
Read operation on a tablet server:
  • Check the well-formedness of the request and check authorization
  • Retrieve the tablet's metadata (the list of SSTables and the set of redo points in the commit log that correspond to its data) from the METADATA table
  • Read the indices of the SSTables into memory and reconstruct the memtable by applying the redo actions
  • Return the data
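
The log-then-memtable write path can be sketched as follows; the structures are illustrative, and SSTables, group commit, and GFS are elided.

```python
# Sketch of a tablet server's write path: append a redo record to the
# commit log first (durability), then apply to the in-memory memtable.

commit_log = []  # durable redo records (stored in GFS in the real system)
memtable = {}    # in-memory buffer of recent writes

def write(key, value, authorized=True):
    if not authorized:  # stands in for the well-formedness/ACL checks
        return False
    commit_log.append((key, value))  # redo record before the update
    memtable[key] = value            # then the in-memory update
    return True

def recover():
    # After a crash, the memtable can be reconstructed by replaying the
    # redo records in log order.
    rebuilt = {}
    for key, value in commit_log:
        rebuilt[key] = value
    return rebuilt
```

Because the redo record is logged before the memtable update, a crash between the two steps loses nothing: replaying the log rebuilds the same state.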


As write operations execute, the memtable grows. There are three levels of compaction to keep its size in bounds. A minor compaction freezes the memtable when it reaches a threshold size, converts it to an SSTable, and persists it in GFS. A merging compaction merges a few SSTables and the memtable into a single SSTable. A major compaction rewrites all SSTables into exactly one SSTable.
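
The three compaction levels can be modeled over dicts standing in for the memtable and SSTables; real SSTables are immutable sorted files on GFS, so this is only an illustrative model.

```python
# Toy model of Bigtable's three compaction levels. The memtable and each
# SSTable are modeled as dicts; sstables[0] is the oldest.

def minor_compaction(memtable, sstables):
    # Freeze the memtable into a new SSTable; start a fresh memtable.
    sstables.append(dict(memtable))
    return {}, sstables

def merging_compaction(memtable, sstables, k):
    # Merge the memtable and the k oldest SSTables into one SSTable.
    merged = {}
    for t in sstables[:k] + [memtable]:
        merged.update(t)  # newer entries overwrite older ones
    return {}, [merged] + sstables[k:]

def major_compaction(memtable, sstables):
    # Rewrite everything into exactly one SSTable.
    _, result = merging_compaction(memtable, sstables, len(sstables))
    return {}, result
```

A major compaction also discards deleted data for good, which is why the real system schedules them regularly.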


Several refinements help achieve high performance, availability, and reliability.
  • Locality groups of column families are defined by clients and are stored together in a single SSTable. A locality group can also be configured to be stored in memory to avoid disk reads
  • Compression of SSTable blocks is a choice made by the client, which can also choose the compression format
  • Two levels of caching improve read performance: the Scan Cache is a higher-level cache for key-value pairs returned by the SSTable interface, and the Block Cache is a lower-level cache for SSTable blocks read from GFS
  • Bloom filters can be applied to the SSTables of a particular locality group, allowing a quick check of whether an SSTable contains any data for a specified row/column pair
  • A single commit log file is used per tablet server instead of one per tablet, which provides a significant performance benefit during normal operation. To prevent GFS write hiccups from adding to overall latency, two separate threads, each with its own commit log, are used (only one is active at a time; when one performs poorly, the other is made active). Sequence numbers on log entries enable recovery from both log files
  • When the master initiates reassignment of a tablet from a source tablet server to a target, the source server performs a minor compaction before it stops serving the tablet and offers it to the master for reassignment
  • The immutability of most parts of the system, with the exception of the SSTable caches and the memtable, eliminates the need to synchronize access to SSTables. Copy-on-write is employed on the memtable so that reads and writes can proceed in parallel.
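
The Bloom-filter refinement can be illustrated with a minimal filter: a membership test with no false negatives, so a negative answer safely skips the disk read. The sizes and hash choices below are illustrative, not Bigtable's.

```python
import hashlib

class BloomFilter:
    """Minimal Bloom filter: no false negatives, rare false positives."""

    def __init__(self, m=1024, k=3):
        self.m, self.k = m, k        # m bits, k hash functions
        self.bits = [False] * m

    def _hashes(self, item):
        # Derive k positions from salted SHA-256 digests (illustrative).
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{item}".encode()).hexdigest()
            yield int(h, 16) % self.m

    def add(self, item):
        for pos in self._hashes(item):
            self.bits[pos] = True

    def might_contain(self, item):
        # False means definitely absent -- the SSTable read can be skipped.
        return all(self.bits[pos] for pos in self._hashes(item))
```

A per-SSTable filter over row/column pairs lets a read skip SSTables that cannot contain the requested cell, turning many would-be disk reads into in-memory checks.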

Performance Evaluation

In a Bigtable cluster with N tablet servers, the following benchmarks were run to measure performance and scalability as N varied.
  • Sequential reads and writes
  • Random reads and writes
  • Random reads (mem): column families configured to be stored in memory
  • Scan: reads made through the Bigtable API for scanning over all values in a row range
The following figures show two views of benchmark performance when reading and writing 1000-byte values to Bigtable.

Performance results

  • Random reads are slower than most other operations, as each read fetches a 64KB SSTable block from GFS over the network, of which only a single small value is used. For applications with more reads than writes, the paper recommends a smaller block size, typically 8KB.
  • Random reads from memory are much faster, as they avoid fetching SSTable blocks from GFS.
  • Random and sequential writes perform better than random reads, as writes are recorded only in the commit log and memtable and are not yet flushed to SSTables in GFS. There is no significant difference between the two kinds of writes, as both are recorded in the same commit log and memtable.
  • Sequential reads perform better than random reads, as every 64KB block fetched from GFS is cached and used fully before the next block is fetched.
  • Scans are even faster, as RPC overhead is amortized when scanning over values through the Bigtable API.
  • Aggregate throughput increases dramatically, by over a factor of 100, for every benchmark as N grows, but the scaling is not linear.
  • The random read benchmark shows the worst scaling, because the transfer of 64KB blocks saturates the network capacity of GFS.

Real applications

Google Analytics helps webmasters analyze traffic patterns by storing click data. The raw click table (~200 TB) maintains a row for each end-user session. The row name is a tuple of the website's name and the time the session was created, which ensures that each session is stored in a single row and that sessions on the same website are contiguous and stored chronologically. This table compresses to 14% of its original size. The summary table (~20 TB) stores various predefined summaries for each website. It is updated by scheduled MapReduce jobs that read from the raw click table, and it compresses to 29% of its original size.

Google Earth provides access to high-resolution satellite imagery of the world's surface. The system uses one table for preprocessing data and a different set of tables for serving client data. The preprocessing table contains 70 terabytes of data and is therefore served from disk; it relies on MapReduce over Bigtable to transform the data. Each row in the imagery table corresponds to a single geographic location, and rows are named so that adjacent locations are stored next to each other. The table contains a column family that tracks the data sources for each location. The serving system uses one table to index data stored in GFS. It is relatively small (~500 GB), but it must serve tens of thousands of queries per second per datacenter with low latency, so it is hosted across hundreds of tablet servers with in-memory column families.

Personalized Search ranks search results based on a user's previous actions across Google products. Each user's data is stored in a single row whose name is the user id, and a separate column family is reserved for each type of action. The timestamp of a value records when the user performed the action. User profiling is done with MapReduce jobs over Bigtable, and the data is replicated across several clusters to increase availability and reduce latency.


Lessons learned

  • Large distributed systems are vulnerable to many types of failure, such as memory and network corruption, large clock skew, bugs in other systems (e.g., Chubby), etc.
  • It is very important to delay adding new features until it is clear how they will be used, e.g., not implementing general-purpose transactions until some application direly needs them -- which never happened; most applications turned out to require only single-row transactions.
  • It is important to have proper system-level monitoring to detect and fix problems such as lock contention on tablet data structures, slow writes to GFS, etc.
  • The most important lesson is the value of simple design when dealing with a very large system; it avoids spending huge amounts of time debugging system behavior.


Applications that use Bigtable have been observed to benefit from its performance, high availability, and scalability. Bigtable's unusual interface compared to traditional databases, its lack of general-purpose transactions, and so on have not been a hindrance, given that many Google products use Bigtable successfully. Google has gained significant advantages from building its own storage solution: full control and flexibility, and the ability to remove bottlenecks and inefficiencies as they arise.

The original paper is linked here.

Hello world

It has been on my agenda for a long time; here I am, finally sitting down to write about my experiences in industry and academia.