This blog introduces Airbnb's experience upgrading our Data Warehouse infrastructure to Spark and Iceberg.
In this blog, we'll cover our motivations for upgrading our Data Warehouse infrastructure to Spark 3 and Iceberg. We'll briefly describe the current state of Airbnb's data warehouse infrastructure and its challenges. We'll then share our learnings from upgrading one critical production workload: event data ingestion. Finally, we'll share the results and the lessons learned.
Airbnb’s Data Warehouse (DW) storage was previously migrated from legacy HDFS clusters to S3 to provide better stability and scalability. While our team has continued to improve the reliability and stability of the workloads that operate on data in S3, certain characteristics of these workloads and the infrastructure they depend on introduce scalability and productivity limitations that our users encounter regularly.
With an increasing number of partitions, the load on Hive’s backend DBMS has become a bottleneck, as has the load of partition operations (e.g., querying thousands of partitions for a month’s worth of data). As a workaround, we often add a level of daily aggregation and maintain two tables for queries of different time granularities (e.g., hourly and daily). To save on storage, we limit intraday Hive tables to short retention (three days), and keep daily tables for longer retention (several years).
Hive was not originally designed for object storage. Instead, many assumptions were made around HDFS when implementing features such as renames and file listings. When we migrated from HDFS to S3, we therefore needed certain guarantees to ensure that datasets were consistent on list-after-write operations. We customized the way Hive writes to S3: first writing to a temporary HDFS cluster, then moving the data to S3 via an optimized distcp process that writes to unique locations during the commit phase, storing file-listing information in a separate store for fast access. This process has performed well over the past two years, but it requires additional cluster resources to run.
At Airbnb, we use three compute engines to access data in our Data Warehouse: Spark, Trino, and Hive. Since each compute engine handles schema changes differently, changes to table schemas have almost always resulted in data quality issues or required engineers to perform costly rewrites.
Hive tables are partitioned by fixed columns, and partition columns cannot be easily changed. If a dataset needs to be repartitioned, one has to create a new table and reload the entire dataset.
These challenges motivated us to upgrade our Data Warehouse infrastructure to a new stack based on Iceberg and Spark 3, which addresses these problems and also provides usability improvements.
Apache Iceberg is a table format designed to address several of the shortcomings of traditional file-system-based Data Warehousing storage formats such as Hive. Iceberg is designed to deliver high-performance reads for huge analytics tables, with features such as serializable isolation, snapshot-based time travel, and predictable schema evolution. Some important Iceberg features that help with the challenges mentioned earlier:
- Partition information is not stored in the Hive metastore, removing a large source of load on the metastore.
- Iceberg tables don’t require S3 listings, which removes the list-after-write consistency requirement. This in turn eliminates the need for the extra distcp job and entirely avoids the latency of the listing operation.
- A consistent table schema is defined in the Iceberg spec, which guarantees consistent behavior across compute engines and avoids unexpected behavior when altering columns.
Apache Spark has become the de facto standard for big data processing over the past 10 years. Spark 3 is a new major version released in 2020; it comes with a long list of features: new functionality, bug fixes, and performance improvements. We focus on introducing Adaptive Query Execution (AQE) here; you can find more information on the Databricks blog.
AQE is a query optimization technique that uses runtime statistics to optimize the Spark query execution plan. This solves one of the greatest struggles of Spark cost-based optimization: inaccurate statistics collected before the query starts often lead to suboptimal query plans. AQE identifies data characteristics and improves query plans as the query runs, increasing query performance.
Spark 3 is also a prerequisite for Iceberg adoption: Iceberg table write and read support via Spark SQL is only available on Spark 3.
The diagram below shows the change we made:
At Airbnb, the Hive-based data ingestion framework processes >35 billion Kafka event messages and 1,000+ tables per day, landing datasets ranging from kilobytes to terabytes into hourly and daily partitions. The volume and coverage of datasets of different sizes, together with the time granularity requirement, make this framework a good candidate to benefit from our Spark+Iceberg tech stack.
The first step in migrating to the aforementioned Spark+Iceberg compute tech stack was to move our Hive queries to Spark. This introduced a new challenge: Spark tuning. Unlike Hive, which relies on data volume stats, Spark uses preset shuffle partition values to determine task split sizes. Thus, picking the right number of shuffle partitions became a big challenge in tuning the event data ingestion framework on Spark. The data volume of different events varies a lot, and the data size of a given event also changes over time. Figure 2 shows the high variance of shuffle data size across Spark jobs processing a sample of 100 different types of events.
There is no fixed number of shuffle partitions that works well for all events in the ingestion framework; if we pick a fixed number for all ingestion jobs, it might be too large for some jobs but too small for others, and both would result in poor performance. While we were exploring different solutions for tuning shuffle partition parameters, we found that Adaptive Query Execution could be an ideal solution.
How does AQE help?
In Spark 3.0, the AQE framework ships with several key features, including dynamically switching join strategies and dynamically optimizing skew joins. However, the most significant new feature for our use case is dynamically coalescing shuffle partitions, which ensures that each Spark task operates on roughly the same amount of data. It does this by combining adjacent small partitions into larger partitions at runtime. Since shuffle data can dynamically grow or shrink between different stages of a job, AQE continually re-optimizes the size of each partition by coalescing throughout the job’s lifetime. This brought a great performance boost.
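As a rough mental model (not the actual AQE implementation, which works from `MapOutputStatistics` inside the Spark planner), the coalescing rule can be sketched in a few lines of Python:

```python
def coalesce_shuffle_partitions(sizes, advisory_bytes):
    """Greedily merge adjacent shuffle partitions until adding the next
    one would push a group past the advisory size, mimicking how AQE
    coalesces small partitions into roughly equal-sized tasks."""
    groups, current, total = [], [], 0
    for size in sizes:
        if current and total + size > advisory_bytes:
            groups.append(current)
            current, total = [], 0
        current.append(size)
        total += size
    if current:
        groups.append(current)
    return groups

# Five uneven map-side partitions with a 40-byte advisory target:
# the three small ones collapse into a single task.
print(coalesce_shuffle_partitions([10, 10, 10, 100, 10], 40))
# → [[10, 10, 10], [100], [10]]
```

Note that only adjacent partitions are merged, matching AQE's behavior of preserving partition order rather than bin-packing globally.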
AQE handles all cases in our data ingestion framework well, including the edge cases of spiky events and new events. One note is that flattening nested columns and compressing the file storage format (in our case, Parquet GZIP) can generate fairly small output files for small task splits. To ensure output file sizes are large enough to be efficiently accessed, we can increase the AQE advisory shuffle partition size accordingly.
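To make that scaling concrete, here is a hypothetical back-of-the-envelope helper (the shrink ratio and target size are assumptions for illustration, not our production numbers):

```python
def advisory_size_for_target_output(target_file_bytes, shrink_ratio):
    """Scale the AQE advisory shuffle partition size so that, after
    column flattening and Parquet GZIP encoding shrink the data by
    shrink_ratio (output bytes / shuffle bytes), each task still emits
    a file near target_file_bytes."""
    return int(target_file_bytes / shrink_ratio)

# If encoded output is ~25% of the shuffle size and we want ~512 MiB
# files, ask AQE for ~2 GiB shuffle partitions.
print(advisory_size_for_target_output(512 * 2**20, 0.25))  # → 2147483648
```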
AQE Tuning Experience
Let’s walk through an example to get a better understanding of AQE and the experience of tuning it. Say we run the example query to load one dataset. The query has one Map stage to flatten events and another Reduce stage to handle deduplication. After adopting AQE and running the job on Spark, we can see two highlighted steps added to the physical plan.
Now let’s take a closer look at our tuning phase. As shown in Table 1, we went through several iterations of parameter settings. From our experience, if the actual number of shuffle partitions used equals the initial partition number we set, we should increase the initial partition number so that initial tasks are split further and then coalesced. And if the average output file size is too small, we can increase the advisory partition size to generate larger shuffle partitions, and thus larger output files. Upon inspecting the shuffle data of each task, we could also decrease executor memory and the max number of executors.
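The knobs we iterated on correspond to real Spark 3 configuration options. A sketch of such a configuration follows; the option names are genuine Spark 3 settings, but the values are illustrative placeholders rather than the numbers from Table 1:

```python
# AQE-related Spark 3 options we tuned (values are placeholders).
AQE_CONF = {
    # Turn on Adaptive Query Execution.
    "spark.sql.adaptive.enabled": "true",
    # Let AQE merge adjacent small shuffle partitions at runtime.
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Start high so initial tasks are split finely, then coalesced.
    "spark.sql.adaptive.coalescePartitions.initialPartitionNum": "4096",
    # Raise this to get larger shuffle partitions and larger output files.
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": "256m",
}

def as_spark_submit_flags(conf):
    """Render a config map as spark-submit --conf arguments."""
    return [f"--conf {key}={value}" for key, value in sorted(conf.items())]

print(as_spark_submit_flags(AQE_CONF)[0])
# → --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=256m
```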
We also experimented with the tuned job parameters on datasets of different sizes, as shown in Tables 2 and 3. From the results, we can see that once tuned, AQE performs well on datasets ranging from zero bytes to TBs in size, all while using a single set of job parameters.¹
From our results, it’s clear that AQE can adjust the shuffle split size very close to our predefined value in the Reduce stage and thus generate outputs of the target file size we expect. Moreover, since each shuffle split is close to the predefined value, we can also lower executor memory from the default values to ensure efficient resource allocation. As an additional big benefit to the framework, we don’t need to do any special handling to onboard new datasets.
How does Iceberg help?
In our data ingestion framework, we found that we could take advantage of Iceberg’s flexibility to define multiple partition specs to consolidate ingested data over time. Each data file written to a partitioned Iceberg table belongs to exactly one partition, but we can control the granularity of the partition values over time. Ingested tables write new data with an hourly granularity (ds/hr), and a daily automated process compacts the data into a daily partition (ds) without losing the hourly granularity, which can later be applied to queries as a residual filter.
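Under stated assumptions (a hypothetical table name and partition column, and a Spark session with Iceberg's SQL extensions enabled via `spark.sql.extensions`), the end-of-day spec change boils down to a single DDL statement from Iceberg's Spark 3 DDL extension:

```python
# Hypothetical catalog/table/column names; the statement itself is
# Iceberg's Spark 3 partition-evolution DDL. Files already written keep
# their ds/hr spec; files written afterwards are partitioned by ds only,
# and hr remains usable as a residual filter on old data.
END_OF_DAY_SPEC_CHANGE = "ALTER TABLE warehouse.events DROP PARTITION FIELD hr"

def run_spec_change(spark):
    """Apply the spec change through a live SparkSession (not run here)."""
    spark.sql(END_OF_DAY_SPEC_CHANGE)
```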
Our compaction process is smart enough to determine whether a data rewrite is required to reach an optimal file size; otherwise, it simply rewrites the metadata to assign the existing data files to the daily partition. This has simplified the process for ingesting event data and provides the user with a consolidated view of the data within the same table. As an added benefit, we’ve realized cost savings in the overall process with this approach.
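A minimal sketch of that decision, with assumed names and an assumed threshold (the real job inspects Iceberg file metadata rather than a plain list of sizes):

```python
def plan_daily_compaction(file_sizes_bytes, target_bytes, small_fraction=0.5):
    """Decide, in the spirit of our compaction step (logic simplified,
    threshold assumed), whether the hourly files need a data rewrite to
    reach an optimal file size, or whether a metadata-only rewrite can
    simply reassign the existing files to the daily partition."""
    small = [s for s in file_sizes_bytes if s < target_bytes * small_fraction]
    return "rewrite-data" if small else "rewrite-metadata-only"

# Hourly files already near the 512 MiB target: no data copy needed.
print(plan_daily_compaction([600 * 2**20, 480 * 2**20], 512 * 2**20))
# → rewrite-metadata-only
```

The metadata-only branch is where the large resource savings reported below come from, since no bytes are copied.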
As shown in the diagram below, in the consolidated Iceberg table we change the partition spec from ds/hr to ds at the end of the day. In addition, user queries are now easier to write and can access fresher data with full history. Keeping only one copy of the data also helps improve both compute and storage efficiency and ensures data consistency.
Table Consolidation Experience
Consolidating hourly and daily data into one Iceberg table requires changes in both the write and read paths. For the write path, to mitigate the aforementioned issues caused by small files, we force-run a compaction during the partition spec change. Tables 4 and 5 compare the statistics from our intelligent compaction jobs with the cost of a full rewrite of all the data files associated with the daily partition. For some large tables we obtain resource savings of >90% by leveraging Iceberg’s ability to avoid copying data during compaction.
For the read path, since most data consumers use Airflow’s partition sensors, we updated the implementation of partition sensing. Specifically, we implemented a signal system to sense empty partitions in Iceberg tables, as opposed to the prior method of looking up each Hive partition as an actual row in the Hive metastore.
Compared with the prior Tez and Hive stack, we see more than 50% compute resource savings and a 40% reduction in job elapsed time in our data ingestion framework with Spark 3 and Iceberg. From a usability standpoint, we made it simpler and faster to consume stored data by leveraging Iceberg’s capabilities for native schema and partition evolution.
In this post, we shared the upgrades we applied to Airbnb’s data compute and storage tech stack. We hope that readers enjoyed learning how our event data ingestion framework benefits from Adaptive Query Execution and Iceberg, and that they consider applying similar tech stack changes to their use cases involving datasets of varying size and time granularity.
If this type of work interests you, please check out our open roles here!
Special thanks to Bruce Jin, Guang Yang, Adam Kocoloski, and Jingwei Lu for their continued guidance and support!
Also, many thanks to Mark Giangreco, Surashree Kulkarni, and Shylaja Ramachandra for providing edits and great suggestions for the post!