Ready-to-go sample data pipelines with Dataflow

by Jasmine Omeke, Obi-Ike Nwoke, Olek Gorajek
This post is for all data practitioners who are interested in learning about bootstrapping, standardization and automation of batch data pipelines at Netflix.
You may remember Dataflow from the post we wrote last year titled Data pipeline asset management with Dataflow. That article was a deep dive into one of the more technical aspects of Dataflow and didn't properly introduce the tool in the first place. This time we'll try to do justice to the intro and then we will focus on one of the very first features Dataflow came with. That feature is called sample workflows, but before we dive in, let's have a quick look at Dataflow in general.
Dataflow
Dataflow is a command line utility built to improve experience and to streamline data pipeline development at Netflix. Check out the high-level Dataflow help command output below:
$ dataflow --help
Usage: dataflow [OPTIONS] COMMAND [ARGS]...

Options:
  --docker-image TEXT  Url of the docker image to run in.
  --run-in-docker      Run dataflow in a docker container.
  -v, --verbose        Enables verbose mode.
  --version            Show the version and exit.
  --help               Show this message and exit.

Commands:
  migration  Manage schema migration.
  mock       Generate or validate mock datasets.
  project    Manage a Dataflow project.
  sample     Generate fully functional sample workflows.
As you can see, the Dataflow CLI is divided into four main subject areas (or commands). The most commonly used one is dataflow project, which helps folks manage their data pipeline repositories through creation, testing, deployment and a few other activities.
The dataflow migration command is a special feature, developed single-handedly by Stephen Huenneke, to fully automate the communication and tracking of data warehouse table changes. Thanks to the Netflix internal lineage system (built by Girish Lingappa), Dataflow migration can help you identify downstream usage of the table in question. And finally it can help you craft a message to all the owners of those dependencies. After your migration has started, Dataflow will also keep track of its progress and help you communicate with the downstream users.
The dataflow mock command is another standalone feature. It lets you create YAML-formatted mock data files based on selected tables, columns and a few rows of data from the Netflix data warehouse. Its main purpose is to enable easy unit testing of your data pipelines, but it can technically be used in any other situation as a readable data format for small data sets.
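To give a rough idea of how such a mock file can be consumed, below is a minimal sketch that loads a YAML mock into a local Spark dataframe for testing. The actual layout produced by dataflow mock is internal to Netflix, so the structure assumed here (a list of row dictionaries under a rows key) and the helper name are purely illustrative.

import yaml
from pyspark.sql import SparkSession

def load_mock_table(spark: SparkSession, path: str):
    # Assumed layout: {"rows": [{"title_id": 11111111, "country_code": "US", ...}, ...]}
    with open(path) as f:
        mock = yaml.safe_load(f)
    return spark.createDataFrame(mock["rows"])

spark = SparkSession.builder.master("local[1]").getOrCreate()
source_df = load_mock_table(spark, "src/mocks/some_db.source_table.yaml")
source_df.createOrReplaceTempView("source_table")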
All of the above commands are very likely to be described in separate future blog posts, but right now let's focus on the dataflow sample command.
Dataflow sample workflows is a set of templates anyone can use to bootstrap their data pipeline project. And by "sample" we mean "an example", like food samples in your local grocery store. One of the main reasons this feature exists is, just like with food samples, to give you "a taste" of the production quality ETL code that you could encounter inside the Netflix data ecosystem.
All the code you get with the Dataflow sample workflows is fully functional, adjusted to your environment and isolated from other sample workflows that others generated. This pipeline is safe to run the moment it shows up in your directory. It will not only build a nice example aggregate table and fill it up with real data, but it will also present you with a complete set of recommended components:
- clean DDL code,
- proper table metadata settings,
- transformation job (in a language of choice) wrapped in an optional WAP (Write, Audit, Publish) pattern,
- sample set of data audits for the generated data,
- and a fully functional unit test for your transformation logic.
And last, but not least, these sample workflows are tested continuously as part of the Dataflow code change protocol, so you can be sure that what you get is working. This is one way to build trust with our internal user base.
Next, let's have a look at the actual business logic of these sample workflows.
Business Logic
There are several variants of the sample workflow you can get from Dataflow, and all of them share the same business logic. This was a conscious decision in order to clearly illustrate the difference between the various languages in which your ETL could be written. Obviously not all tools are made with the same use case in mind, so we are planning to add more code samples for other (than classical batch ETL) data processing purposes, e.g. Machine Learning model building and scoring.
The example business logic we use in our template computes the top hundred movies/shows in every country where Netflix operates on a daily basis. This is not an actual production pipeline running at Netflix, because it is highly simplified code, but it serves well the purpose of illustrating a batch ETL job with various transformation stages. Let's review the transformation steps below.
Step 1: on a daily basis, incrementally, sum up all viewing time of all movies and shows in every country
WITH STEP_1 AS (
SELECT
title_id
, country_code
, SUM(view_hours) AS view_hours
FROM some_db.source_table
WHERE playback_date = CURRENT_DATE
GROUP BY
title_id
, country_code
)
Step 2: rank all titles from most watched to least in every country
WITH STEP_2 AS (
SELECT
title_id
, country_code
, view_hours
, RANK() OVER (
PARTITION BY country_code
ORDER BY view_hours DESC
) AS title_rank
FROM STEP_1
)
Step 3: filter all titles to the top 100
WITH STEP_3 AS (
SELECT
title_id
, country_code
, view_hours
, title_rank
FROM STEP_2
WHERE title_rank <= 100
)
Now, using the above simple 3-step transformation, we will produce data that can be written to the following Iceberg table:
CREATE TABLE IF NOT EXISTS $TARGET_DB.dataflow_sample_results (
  title_id INT COMMENT "Title ID of the movie or show."
  , country_code STRING COMMENT "Country code of the playback session."
  , title_rank INT COMMENT "Rank of a given title in a given country."
  , view_hours DOUBLE COMMENT "Total viewing hours of a given title in a given country."
)
COMMENT
  "Example dataset brought to you by Dataflow. For more information on this
  and other examples please visit the Dataflow documentation page."
PARTITIONED BY (
  date DATE COMMENT "Playback date."
)
STORED AS ICEBERG;
As you can infer from the above table structure, we are going to load about 19,000 rows into this table on a daily basis. And they will look something like this:
sql> SELECT * FROM foo.dataflow_sample_results
     WHERE date = 20220101 and country_code = 'US'
     ORDER BY title_rank LIMIT 5;

 title_id | country_code | title_rank | view_hours | date
----------+--------------+------------+------------+----------
 11111111 | US           |          1 |        123 | 20220101
 44444444 | US           |          2 |        111 | 20220101
 33333333 | US           |          3 |         98 | 20220101
 55555555 | US           |          4 |         55 | 20220101
 22222222 | US           |          5 |         11 | 20220101
(5 rows)
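To show how the pieces above fit together end to end, here is a rough sketch that chains the three CTEs into a single statement and lands the result in the target table, wrapped in a PySpark spark.sql call. This is not the exact code generated by Dataflow; the INSERT OVERWRITE form and the fully qualified table names are assumptions made for illustration only.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Chain the three transformation steps and overwrite today's partition
# of the (illustrative) target table in one statement.
spark.sql("""
    INSERT OVERWRITE TABLE foo.dataflow_sample_results
    WITH STEP_1 AS (
        SELECT title_id, country_code, SUM(view_hours) AS view_hours
        FROM some_db.source_table
        WHERE playback_date = CURRENT_DATE
        GROUP BY title_id, country_code
    ),
    STEP_2 AS (
        SELECT title_id, country_code, view_hours,
               RANK() OVER (PARTITION BY country_code ORDER BY view_hours DESC) AS title_rank
        FROM STEP_1
    ),
    STEP_3 AS (
        SELECT title_id, country_code, view_hours, title_rank
        FROM STEP_2
        WHERE title_rank <= 100
    )
    SELECT title_id, country_code, title_rank, view_hours, CURRENT_DATE AS date
    FROM STEP_3
""")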
With the business logic out of the way, we can now start talking about the components, or the boilerplate, of our sample workflows.
Components
Let’s take a look at the commonest workflow elements that we use at Netflix. These elements could not match into each ETL use case, however are used usually sufficient to be included in each template (or pattern workflow). The workflow creator, in spite of everything, has the ultimate phrase on whether or not they wish to use all of those patterns or maintain just some. Both means they’re right here to begin with, able to go, if wanted.
Workflow Definitions
Below you can see a typical file structure of a sample workflow package written in SparkSQL.
.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py
The three workflow definition files above (backfill.sch.yaml, daily.sch.yaml and main.sch.yaml) define a series of steps (a.k.a. jobs), their cadence, dependencies, and the sequence in which they should be executed.
This is one way we can tie components together into a cohesive workflow. In every sample workflow package there are three workflow definition files that work together to provide flexible functionality. The sample workflow code assumes a daily execution pattern, but it is very easy to adjust them to run at a different cadence. For the workflow orchestration we use Netflix's homegrown Maestro scheduler.
The main workflow definition file holds the logic of a single run, in this case one day's worth of data. This logic consists of the following parts: DDL code, table metadata information, data transformation and a few audit steps. It is designed to run for a single date, and is meant to be called from the daily or backfill workflows. This main workflow can also be called manually during development with arbitrary run-time parameters to get a feel for the workflow in action.
The daily workflow executes the main one on a daily basis for a predefined number of previous days. This is sometimes necessary for the purpose of catching up on some late-arriving data. This is where we define a trigger schedule, notification schemes, and update the "high water mark" timestamps on our target table.
The backfill workflow executes the main one for a specified range of days. This is useful for restating data, most often because of a transformation logic change, but sometimes as a response to upstream data updates.
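To make the split between the main, daily and backfill definitions concrete, here is a small conceptual sketch in plain Python (not Maestro YAML, and not the generated code) of how a single-day transform can be wrapped by a catch-up loop and a backfill loop. The function names are illustrative only.

from datetime import date, timedelta

def run_main(run_date: date) -> None:
    # Stand-in for the single-day logic: DDL, metadata, transform, audits, HWM.
    print(f"processing {run_date}")

def run_daily(today: date, lookback_days: int = 3) -> None:
    # Re-process the last few days to catch late-arriving data.
    for offset in range(lookback_days, -1, -1):
        run_main(today - timedelta(days=offset))

def run_backfill(start: date, end: date) -> None:
    # Restate a fixed historical range, e.g. after a transformation logic change.
    day = start
    while day <= end:
        run_main(day)
        day += timedelta(days=1)

run_daily(date.today())
run_backfill(date(2022, 1, 1), date(2022, 1, 31))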
DDL
Often, the first step in a data pipeline is to define the target table structure and column metadata via a DDL statement. We understand that some folks choose to have their output schema be an implicit result of the transform code itself, but the explicit statement of the output schema is not only useful for adding table (and column) level comments, but also serves as one way to validate the transform logic.
.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py
Generally, we prefer to execute DDL commands as part of the workflow itself, instead of running them outside of the schedule, because it simplifies the development process. See below an example of hooking the table creation SQL file into the main workflow definition.
- job:
    id: ddl
    type: Spark
    spark:
      script: $S3./ddl/dataflow_sparksql_sample.sql
      parameters:
        TARGET_DB: $TARGET_DB
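One practical payoff of declaring the output schema explicitly, as mentioned above, is that the transform output can be checked against it before writing. The PySpark sketch below illustrates the idea; the table name is a placeholder and transformed_df merely stands in for the dataframe produced by the transformation step.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Column names and types declared by the DDL (illustrative table name).
declared = {f.name: f.dataType.simpleString()
            for f in spark.table("foo.dataflow_sample_results").schema.fields}

# Stand-in for the output of the transformation step.
transformed_df = spark.sql("SELECT * FROM foo.dataflow_sample_results LIMIT 0")
produced = {f.name: f.dataType.simpleString()
            for f in transformed_df.schema.fields}

mismatched = {c for c, t in declared.items() if produced.get(c) != t}
if mismatched:
    raise ValueError(f"transform output does not match declared schema for: {mismatched}")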
Metadata
The metadata step provides context on the output table itself as well as the data contained within it. Attributes are set via Metacat, which is a Netflix internal metadata management platform. Below is an example of plugging that metadata step into the main workflow definition:
- job:
    id: metadata
    type: Metadata
    metacat:
      tables:
        - $CATALOG/$TARGET_DB/$TARGET_TABLE
      owner: $username
      tags:
        - dataflow
        - sample
      lifetime: 123
      column_types:
        date: pk
        country_code: pk
        rank: pk
Transformation
The transformation step (or steps) can be executed in the developer's language of choice. The example below is using SparkSQL.
.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py
Optionally, this step can use the Write-Audit-Publish pattern to ensure that data is correct before it is made available to the rest of the company. See the example below:
- template:
    id: wap
    type: wap
    tables:
      - $CATALOG/$DATABASE/$TABLE
    write_jobs:
      - job:
          id: write
          type: Spark
          spark:
            script: $S3./src/sparksql_write.sql
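For readers unfamiliar with Write-Audit-Publish, the essence of the pattern is that data first lands in a staging location, gets audited there, and only then is published to the table that consumers read. The template takes care of this wiring for you; the PySpark sketch below is only a generic illustration of the idea using a staging table, with all table names being placeholders.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# WRITE: land the transform output in a staging table nobody reads from.
# (transformed_df stands in for the real transformation output.)
transformed_df = spark.sql("SELECT * FROM foo.dataflow_sample_results LIMIT 0")
transformed_df.write.mode("overwrite").saveAsTable("foo.dataflow_sample_results_staging")

# AUDIT: run data quality checks against the staged data only.
nulls = spark.sql(
    "SELECT COUNT(*) AS c FROM foo.dataflow_sample_results_staging WHERE title_id IS NULL"
).first()["c"]
if nulls > 0:
    raise RuntimeError("audit failed: null title_id values found, not publishing")

# PUBLISH: only after the audits pass, expose the data to consumers.
spark.sql("""
    INSERT OVERWRITE TABLE foo.dataflow_sample_results
    SELECT * FROM foo.dataflow_sample_results_staging
""")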
Audits
Audit steps can be defined to verify data quality. If a "blocking" audit fails, the job will halt and the write step is not committed, so invalid data will not be exposed to users. This step is optional and configurable; see a partial example of an audit from the main workflow below.
data_auditor:
  audits:
    - function: columns_should_not_have_nulls
      blocking: true
      params:
        table: $TARGET_TABLE
        columns:
          - title_id
…
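Conceptually, an audit like columns_should_not_have_nulls boils down to a simple query over the freshly written data. The data auditor itself is a Netflix internal component, so the PySpark sketch below is only a rough functional equivalent with an illustrative table name.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

def columns_should_not_have_nulls(table: str, columns: list[str]) -> None:
    df = spark.table(table)
    for column in columns:
        null_count = df.filter(F.col(column).isNull()).count()
        if null_count > 0:
            # A blocking audit would stop the workflow here, before publishing.
            raise AssertionError(f"{table}.{column} has {null_count} null values")

columns_should_not_have_nulls("foo.dataflow_sample_results", ["title_id"])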
High-Water-Mark Timestamp
A successful write will typically be followed by a metadata call to set the valid time (or high-water mark) of a dataset. This allows other processes consuming our table to be notified and start their processing. See an example high water mark job from the main workflow definition.
- job:
    id: hwm
    type: HWM
    metacat:
      table: $CATALOG/$TARGET_DB/$TARGET_TABLE
      hwm_datetime: $EXECUTION_DATE
      hwm_timezone: $EXECUTION_TIMEZONE
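To illustrate why the high-water mark is useful, a downstream job can compare the mark against what it has already processed and only run when new data is available. Metacat's API is internal, so get_high_water_mark below is a hypothetical helper standing in for whatever metadata call a consumer would make.

from datetime import datetime

def get_high_water_mark(table: str) -> datetime:
    # Hypothetical helper: in reality this would query the metadata store
    # (Metacat at Netflix) for the table's high-water mark timestamp.
    return datetime(2022, 1, 1)

last_processed = datetime(2021, 12, 31)
hwm = get_high_water_mark("prodhive/foo/dataflow_sample_results")

if hwm > last_processed:
    # New data has been published since the last run, so it is safe to process.
    print(f"processing data up to {hwm}")
else:
    print("nothing new to process yet")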
Unit Tests
Unit test artifacts are also generated as part of the sample workflow structure. They consist of data mocks, the actual test code, and a simple execution harness depending on the workflow language. See the test file (test_sparksql_write.py) in the listing below.
.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py
These unit tests are intended to test one "unit" of data transform in isolation. They can be run during development to quickly capture code typos and syntax issues, or during the automated testing/deployment phase, to make sure that code changes have not broken any tests.
We want unit tests to run quickly so that we can have continuous feedback and fast iterations during the development cycle. Running code against a production database can be slow, especially with the overhead required for distributed data processing systems like Apache Spark. Mocks allow you to run tests locally against a small sample of "real" data to validate your transformation code functionality.
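The generated test harness differs per recipe, but the general shape of such a test is easy to picture. Below is a rough pytest-style sketch; the mock is loaded into a temporary view named source_table and the transform lives in a hypothetical helper called run_transform, so this is not the actual test_sparksql_write.py that Dataflow generates.

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local[1]").appName("unit-test").getOrCreate()

def run_transform(spark):
    # Hypothetical stand-in for the transformation under test: rank titles
    # by view_hours within each country and keep the top 100.
    return spark.sql("""
        SELECT title_id, country_code, view_hours,
               RANK() OVER (PARTITION BY country_code ORDER BY view_hours DESC) AS title_rank
        FROM source_table
    """).where("title_rank <= 100")

def test_title_rank(spark):
    # A tiny in-memory mock instead of the production source table.
    mock = spark.createDataFrame(
        [(1, "US", 10.0), (2, "US", 20.0), (3, "PL", 5.0)],
        ["title_id", "country_code", "view_hours"],
    )
    mock.createOrReplaceTempView("source_table")

    result = {r["title_id"]: r["title_rank"] for r in run_transform(spark).collect()}
    assert result == {2: 1, 1: 2, 3: 1}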
Languages
Over time, the extraction of data from Netflix's source systems has grown to encompass a wider range of end-users, such as engineers, data scientists, analysts, marketers, and other stakeholders. Focusing on convenience, Dataflow allows these differing personas to go about their work seamlessly. A large number of our data users employ SparkSQL, pyspark, and Scala. A small but growing contingent of data scientists and analytics engineers use R, backed by the Sparklyr interface or other data processing tools, like Metaflow.
With an understanding that the data landscape and the technologies employed by end-users are not homogeneous, Dataflow creates a malleable path forward. It solidifies different recipes or repeatable templates for data extraction. Within this section, we'll preview a few methods, starting with sparkSQL and python's approach to creating data pipelines with Dataflow. Then we'll segue into the Scala and R use cases.
To begin, after installing Dataflow, a user can run the following command to understand how to get started.
$ dataflow sample workflow --help
Dataflow (0.6.16)

Usage: dataflow sample workflow [OPTIONS] RECIPE [TARGET_PATH]

  Create a sample workflow based on selected RECIPE and land it in the
  specified TARGET_PATH.

  Currently supported workflow RECIPEs are: spark-sql, pyspark,
  scala and sparklyr.

  If TARGET_PATH:
  - is not specified, the current directory is assumed
  - points to a directory, it will be used as the target location

Options:
  --source-path TEXT         Source path of the sample workflows.
  --workflow-shortname TEXT  Workflow short name.
  --workflow-id TEXT         Workflow ID.
  --skip-info                Skip the info about the workflow sample.
  --help                     Show this message and exit.
Once again, let's assume we have a directory called stranger-data in which the user creates workflow templates in all four languages that Dataflow offers. To better illustrate how to generate the sample workflows using Dataflow, let's look at the full command one would use to create one of these workflows, e.g.:
$ cd stranger-data
$ dataflow sample workflow spark-sql ./sparksql-workflow
By repeating the above command for each type of transformation language we can arrive at the following directory structure:
.
├── pyspark-workflow
│   ├── main.sch.yaml
│   ├── daily.sch.yaml
│   ├── backfill.sch.yaml
│   ├── ddl
│   │   └── ...
│   ├── src
│   │   └── ...
│   └── tox.ini
├── scala-workflow
│   ├── build.gradle
│   └── ...
├── sparklyR-workflow
│   └── ...
└── sparksql-workflow
    └── ...
Earlier we talked about the business logic of these sample workflows and we showed the Spark SQL version of that example data transformation. Now let's discuss different approaches to writing the data in other languages.
PySpark
The partial pySpark code below has the same functionality as the SparkSQL example above, but it uses the Spark dataframes Python interface.
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, rank
from pyspark.sql.window import Window


def main(args, spark):
    # 'date' and 'target_table' are provided at run time (e.g. via args);
    # they are left unresolved here since this is a partial example.
    source_table_df = spark.table("some_db.source_table")

    viewing_by_title_country = (
        source_table_df.select("title_id", "country_code", "view_hours")
        .filter(col("date") == date)
        .filter("title_id IS NOT NULL AND view_hours > 0")
        .groupBy("title_id", "country_code")
        .agg(F.sum("view_hours").alias("view_hours"))
    )

    window = Window.partitionBy(
        "country_code"
    ).orderBy(col("view_hours").desc())

    ranked_viewing_by_title_country = viewing_by_title_country.withColumn(
        "title_rank", rank().over(window)
    )

    ranked_viewing_by_title_country.filter(
        col("title_rank") <= 100
    ).withColumn(
        "date", lit(int(date))
    ).select(
        "title_id",
        "country_code",
        "title_rank",
        "view_hours",
        "date",
    ).repartition(1).write.byName().insertInto(
        target_table, overwrite=True
    )
Scala
Scala is another Dataflow supported recipe that offers the same business logic in a sample workflow out of the box.
package com.netflix.spark

import org.apache.spark.sql.{DataFrame, SparkSession, functions => F}
import org.apache.spark.sql.expressions.Window

object ExampleApp {
  val spark: SparkSession = SparkSession.builder.getOrCreate()
  import spark.implicits._

  def readSourceTable(sourceDb: String, dataDate: String): DataFrame =
    spark
      .table(s"$sourceDb.source_table")
      .filter($"playback_start_date" === dataDate)

  def viewingByTitleCountry(sourceTableDF: DataFrame): DataFrame =
    sourceTableDF
      .select($"title_id", $"country_code", $"view_hours")
      .filter($"title_id".isNotNull)
      .filter($"view_hours" > 0)
      .groupBy($"title_id", $"country_code")
      .agg(F.sum($"view_hours").as("view_hours"))

  def addTitleRank(viewingDF: DataFrame): DataFrame =
    viewingDF.withColumn(
      "title_rank", F.rank().over(
        Window.partitionBy($"country_code").orderBy($"view_hours".desc)
      )
    )

  def writeViewing(viewingDF: DataFrame, targetTable: String, dataDate: String): Unit =
    viewingDF
      .select($"title_id", $"country_code", $"title_rank", $"view_hours")
      .filter($"title_rank" <= 100)
      .repartition(1)
      .withColumn("date", F.lit(dataDate.toInt))
      .writeTo(targetTable)
      .overwritePartitions()

  def main(args: Array[String]): Unit = {
    val sourceTableDF = readSourceTable("some_db", "20200101")
    val viewingDF = viewingByTitleCountry(sourceTableDF)
    val titleRankedDF = addTitleRank(viewingDF)
    // Target table name below is illustrative.
    writeViewing(titleRankedDF, "foo.dataflow_sample_results", "20200101")
  }
}
R / sparklyR
As Netflix has a growing cohort of R users, R is the latest recipe available in Dataflow.
suppressPackageStartupMessages({
  library(sparklyr)
  library(dplyr)
})

...

main <- function(args, spark) {
  title_df <- tbl(spark, g("some_db.source_table"))
  title_activity_by_country <- title_df
  ...
}

main(args = args, spark = spark)