Mussel — Airbnb’s Key-Worth Retailer for Derived Knowledge | by Shouyan guo | The Airbnb Tech Weblog | Oct, 2022

How Airbnb constructed a persistent, excessive availability and low latency key-value storage engine for accessing derived information from offline and streaming occasions.

By: Chandramouli Rangarajan, Shouyan Guo, Yuxi Jin

Inside Airbnb, many on-line providers want entry to derived information, which is information computed with giant scale information processing engines like Spark or streaming occasions like Kafka and saved offline. These providers require a top quality derived information storage system, with robust reliability, availability, scalability, and latency ensures for serving on-line visitors. For instance, the person profiler service shops and accesses real-time and historic person actions on Airbnb to ship a extra customized expertise.

On this submit, we’ll speak about how we leveraged plenty of open supply applied sciences, together with HRegion, Helix, Spark, Zookeeper,and Kafka to construct a scalable and low latency key-value retailer for tons of of Airbnb product and platform use instances.

Over the previous few years, Airbnb has advanced and enhanced our help for serving derived information, transferring from groups rolling out customized options to a multi-tenant storage platform referred to as Mussel. This evolution will be summarized into three phases:

Stage 1 (01/2015): Unified read-only key-value retailer (HFileService)

Earlier than 2015, there was no unified key-value retailer resolution inside Airbnb that met 4 key necessities:

  1. Scale to petabytes of information
  2. Environment friendly bulk load (batch era and importing)
  3. Low latency reads (<50ms p99)
  4. Multi-tenant storage service that can be utilized by a number of clients

Additionally, not one of the present options have been in a position to meet these necessities. MySQL doesn’t help bulk loading, Hbase’s large bulk loading (distcp) just isn’t optimum and dependable, RocksDB had no built-in horizontal sharding, and we didn’t have sufficient C++ experience to construct a bulk load pipeline to help RocksDB file format.

So we constructed HFileService, which internally used HFile (the constructing block of Hadoop HBase, which is predicated on Google’s SSTable):

Fig. 1: HFileService Structure
  1. Servers have been sharded and replicated to handle scalability and reliability points
  2. The variety of shards was fastened (equal to the variety of Hadoop reducers within the bulk load jobs) and the mapping of servers to shards saved in Zookeeper. We configured the variety of servers mapped to a particular shard by manually altering the mapping in Zookeeper
  3. A each day Hadoop job reworked offline information to HFile format and uploaded it to S3. Every server downloaded the information of their very own partitions to native disk and eliminated the previous variations of information
  4. Totally different information sources have been partitioned by main key. Purchasers decided the right shard their requests ought to go to by calculating the hash of the first key and modulo with the whole variety of shards. Then queried Zookeeper to get a listing of servers that had these shards and despatched the request to one in every of them

Stage 2 (10/2015): Retailer each real-time and derived information (Nebula)

Whereas we constructed a multi-tenant key-value retailer that supported environment friendly bulk load and low latency learn, it had its drawbacks. For instance, it didn’t help level, low-latency writes, and any replace to the saved information needed to undergo the each day bulk load job. As Airbnb grew, there was an elevated must have low latency entry to real-time information.

Subsequently, Nebula was constructed to help each batch-update and real-time information in a single system. It internally used DynamoDB to retailer real-time information and S3/HFile to retailer batch-update information. Nebula launched timestamp based mostly versioning as a model management mechanism. For learn requests, information can be learn from each a listing of dynamic tables and the static snapshot in HFileService, and the end result merged based mostly on timestamp.

To attenuate on-line merge operations, Nebula additionally had scheduled spark jobs that ran each day and merged snapshots of DynamoDB information with the static snapshot of HFileService. Zookeeper was used to coordinate write availability of dynamic tables, snapshots being marked prepared for learn, and dropping of stale tables.

Fig. 2: Nebula Structure

Stage 3 (2018): Scalable and low latency key-value storage engine (Mussel)

In Stage 3, we constructed a system that supported each learn and write on real-time and batch-update information with timestamp-based battle decision. Nevertheless, there have been alternatives for enchancment:

  1. Scale-out problem: It was cumbersome to manually edit partition mappings inside Zookeeper with rising information progress, or to horizontally scale the system for rising visitors by including extra nodes
  2. Enhance learn efficiency below spiky write visitors
  3. Excessive upkeep overhead: We would have liked to keep up HFileService and DynamoDB on the similar time
  4. Inefficient merging course of: The method of merging the delta replace from DynamoDB and HFileService each day turned very sluggish as our whole information measurement turned bigger. The each day replace information in DynamoDB was simply 1–2% of the baseline information in HFileService. Nevertheless, we re-published the complete snapshot (102% of whole information measurement) again to HFileService each day

To unravel the drawbacks, we got here up with a brand new key-value retailer system referred to as Mussel.

  1. We launched Helix to handle the partition mapping inside the cluster
  2. We leveraged Kafka as a replication log to duplicate the write to all the replicas as an alternative of writing on to the Mussel retailer
  3. We used HRegion as the one storage engine within the Mussel storage nodes
  4. We constructed a Spark pipeline to load the information from the information warehouse into storage nodes instantly

Let’s go into extra particulars within the following paragraphs.

Fig. 3: Mussel Structure

Handle partitions with Helix

In Mussel, with a view to make our cluster extra scalable, we elevated the variety of shards from 8 in HFileService to 1024. In Mussel, information is partitioned into these shards by the hash of the first keys, so we launched Apache Helix to handle these many logical shards. Helix manages the mapping of logical shards to bodily storage nodes mechanically. Every Mussel storage node might maintain a number of logical shards. Every logical shard is replicated throughout a number of Mussel storage nodes.

Leaderless Replication with Kafka

Since Mussel is a read-heavy retailer, we adopted a leaderless structure. Learn requests may very well be served by any of the Mussel storage nodes which have the identical logical shard, which will increase learn scalability. Within the write path, we wanted to contemplate the next:

  1. We need to clean the write visitors to keep away from the affect on the learn path
  2. Since we don’t have the chief node in every shard, we want a approach to ensure every Mussel storage node applies the write requests in the identical order so the information is constant throughout totally different nodes

To unravel these issues, we launched Kafka as a write-ahead-log right here. For write requests, as an alternative of instantly writing to the Mussel storage node, it’ll first write to Kafka asynchronously. We’ve got 1024 partitions for the Kafka subject, every partition belonging to 1 logical shard within the Mussel. Every Mussel storage node will ballot the occasions from Kafka and apply the change to its native retailer. Since there isn’t a leader-follower relationship between the shards, this configuration permits the right write ordering inside a partition, guaranteeing constant updates. The disadvantage right here is that it might probably solely present eventual consistency. Nevertheless, given the derived information use case, it’s an appropriate tradeoff to compromise on consistency within the curiosity of guaranteeing availability and partition tolerance.

Supporting each learn, write, and compaction in a single storage engine

With the intention to cut back the {hardware} value and operational load of managing DynamoDB, we determined to take away it and prolong HFileService as the one storage engine to serve each real-time and offline information. To raised help each learn and write operations, we used HRegion as an alternative of Hfile. HRegion is a totally useful key-value retailer with MemStore and BlockCache. Internally it makes use of a Log Structured Merged (LSM) Tree to retailer the information and helps each learn and write operations.

An HRegion desk incorporates column households, that are the logical and bodily grouping of columns. There are column qualifiers inside a column household, that are the columns. Column households comprise columns with time stamped variations. Columns solely exist when they’re inserted, which makes HRegion a sparse database. We mapped our consumer information to HRegion as the next:

With this mapping, for learn queries, we’re in a position to help:

  1. Level question by wanting up the information with main key
  2. Prefix/vary question by scanning information on secondary key
  3. Queries for the most recent information or information inside a particular time vary, as each real-time and offline information written to Mussel could have a timestamp

As a result of we’ve got over 4000 consumer tables in Mussel, every person desk is mapped to a column household in HRegion as an alternative of its personal desk to cut back scalability challenges on the metadata administration layer. Additionally, as HRegion is a column-based storage engine, every column household is saved in a separate file to allow them to be learn/written independently.

For write requests, it consumes the write request from Kafka and calls the HRegion put API to put in writing the information instantly. For every desk, it might probably additionally help customizing the max model and TTL (time-to-live).

Once we serve write requests with HRegion, one other factor to contemplate is compaction. Compaction must be run with a view to clear up information that’s deleted or has reached max model or max TTL. Additionally when the MemStore in HRegion reaches a sure measurement, it’s flushed to disk right into a StoreFile. Compaction will merge these recordsdata collectively with a view to cut back disk search and enhance learn efficiency. Nevertheless, then again, when compaction is operating, it causes larger cpu and reminiscence utilization and blocks writes to stop JVM (Java Digital Machine) heap exhaustion, which impacts the learn and write efficiency of the cluster.

Right here we use Helix to mark Mussel storage nodes for every logical shard into two kinds of sources: on-line nodes and batch nodes. For instance, if we’ve got 9 Mussel storage nodes for one logical shard, 6 of them are on-line nodes and three of them are batch nodes. The connection between on-line and batch are:

  1. They each serve write requests
  2. Solely on-line nodes serve learn requests and we fee restrict the compaction on on-line nodes to have good learn efficiency
  3. Helix schedules a each day rotation between on-line nodes and batch nodes. Within the instance above, it strikes 3 on-line nodes to batch and three batch nodes to on-line so these 3 new batch nodes can carry out full velocity main compaction to wash up previous information

With this variation, now we’re in a position to help each learn and write with a single storage engine.

Supporting bulk load from information warehouse

We help two kinds of bulk load pipelines from information warehouse to Mussel by way of Airflow jobs: merge kind and exchange kind. Merge kind means merging the information from the information warehouse and the information from earlier write with older timestamps in Mussel. Exchange means importing the information from the information warehouse and deleting all the information with earlier timestamps.

We make the most of Spark to remodel information from the information warehouse into HFile format and add to S3. Every Mussel storage node downloads the recordsdata and makes use of HRegion bulkLoadHFiles API to load these HFiles into the column household.

With this bulk load pipeline, we will simply load the delta information into the cluster as an alternative of the complete information snapshot daily. Earlier than the migration, the person profile service wanted to load about 4TB information into the cluster each day. After, it solely must load about 40–80GB, drastically decreasing the associated fee and enhancing the efficiency of the cluster.

In the previous few years, Airbnb has come a great distance in offering a high-quality derived information retailer for our engineers. The latest key-value retailer Mussel is broadly used inside Airbnb and has develop into a foundational constructing block for any key-value based mostly utility with robust reliability, availability, scalability, and efficiency ensures. Since its introduction, there have been ~4000 tables created in Mussel, storing ~130TB information in our manufacturing clusters with out replication. Mussel has been working reliably to serve giant quantities of learn, write, and bulk load requests: For instance, mussel-general, our largest cluster, has achieved >99.9% availability, common learn QPS > 800k and write QPS > 35k, with common P95 learn latency lower than 8ms.

Despite the fact that Mussel can serve our present use instances effectively, there are nonetheless many alternatives to enhance. For instance, we’re wanting ahead to offering the read-after-write consistency to our clients. We additionally need to allow auto-scale and repartition based mostly on the visitors within the cluster. We’re wanting ahead to sharing extra particulars about this quickly.

Mussel is a collaborative effort of Airbnb’s storage staff together with: Calvin Zou, Dionitas Santos, Ruan Maia, Wonhee Cho, Xiaomou Wang, Yanhan Zhang.

Fascinated with engaged on the Airbnb Storage staff? Try this position: Staff Software Engineer, Distributed Storage