Creating dynamic sourcing pipelines
In the first of our new technical blog series, we look at why GeoPhy uses Airflow
Airflow is a great tool with endless possibilities for building and scheduling workflows. At GeoPhy we use it to build pipelines dynamically, combining generic and specific components. Our system maximizes reusability and maintainability, by creating each of these pipelines from the same code. It also keeps flexibility over the specific components where you need it.
This article is the first of a three part series describing how GeoPhy uses Airflow to create dynamic sourcing pipelines. In part two we’ll go deep into the metadata framework that we developed. This framework does not come out-of-the-box from Airflow; it enables transparency in data, process control, and data lineage.
In part three we’ll walk you through the pipeline in detail showing how every step works towards generalization of data. It will also show how metadata interacts with the pipeline. And we will describe how we use plain SQL queries to load data in a delta mechanism within a generic set-up. This approach drastically reduces the size of the data while still ensuring that the system reflects all facts. We conclude by showing you how easy it is to build a pipeline in the framework.
Background
Evra is a Geophy web application for commercial real estate investors, lenders, and appraisers. Users enter addresses and Evra displays tons of useful data points regarding demographics, crime, income, COVID score, commuting statistics, and much more – all detailed at a very granular geographic level. The application also displays a map including amenities, transit points, and neighborhood ratings.
All these bits of data tell our customers the value of properties that they are seeking to buy or provide a loan against as well as detail the surrounding neighborhood and metropolitan area. Evra helps them explore new markets, investigate individual properties, and prioritize the investment of more time in their investments.
We recently added a module that aggregates credit metrics of the tenants in a single apartment building. The application displays data on several different geographic levels: small (e.g. neighborhood), medium (e.g. metro) and high levels (e.g. national).
The data has both high resolution (points) and large coverage. We also gather data from a large number of sources that the application uses to calculate a valuation. With every new data release, we retrain the machine learning model that does these calculations. So we need to properly manage each of the various sources we use as model inputs.
This series describes the data flow from external sources to storing data in the historical layer.
Requirements
For sourcing purposes, our team must get the data as soon as the source publishes it, irrespective of frequency of delivering data to the application or training. (The data is also used for analytical purposes and exploration.)
In managing the data delivery, our team must be in full control over the data flow so we can guarantee the quality. Recall that there are six dimensions to data quality: integrity, completeness, timeliness, validity, consistency, and accuracy.
To keep performance high, we will split the quality checking into two separate processes. The principle is to keep the pipelines as light as possible. We’ll only cover items that are necessary for runtime. These are:
- Completeness: ensure that input is reflected in output
- Timeliness: ensure we have latest data as soon as possible
- Validity: validate that data has no critical errors
- Consistency: by design the data is semantically consistent (pre-runtime)
The second process verifies quality checks after runtime. It can be executed independent of the sourcing pipeline.
As the company is growing, we add new data sources rapidly. The team must be able to swiftly bring that new data into an ‘exploration zone’.
We have five requirements for our extract, transform, and load (ETL) system:
- Our workflows must be fully automated. (Set it and forget it)
- Our workflows must be idempotent.
- We must be able to quickly answer questions about the data
- We must keep the raw data and deliver both the most fresh as well as historical data
- We must be able to add new sources quickly and cleanly.
Scheduler tool
We have considered several options, including Luigi and Airflow. Some GeoPhy engineers already had experience with Luigi. One of its strengths is building complex pipelines. Unfortunately, it lacks a scheduler, requiring us either to trigger them manually or to create CRON jobs that trigger them, incurring a lot of overhead.
Airflow also builds data pipelines. It defines the pipelines as Directed Acyclic Graphs (DAGs) in Python scripts. The tasks, dependencies, and order of execution are defined in these scripts. Airflow also allows you to monitor workflows, look up logs, and tons of other interesting stuff.
Airflow has an active community with a large number of commits to GitHub adding new features and updating existing ones. Plenty of tutorials exist on the usual learning platforms, making it easy to get started with Airflow. At time of writing, Airflow just released version 2 with many new features.
Metadata
Metadata is data that is describing other data. As an example: metadata is the name of the file, download date, and location of a dataset that you download.
Airflow keeps track of metadata in its backend about the pipeline runs (DAG run). The metadata from Airflow is used to run the application and look up information about the state of the pipeline. Airflow does not keep track of data flowing through the pipelines. This requirement is one that we need to build in-house.
We have two use-cases for metadata:
- Metadata describes our data, giving us quick and useful insights. Some examples include: how much data is loaded, where is it stored, when did the process run, and completeness. Metadata helps us quickly get answers on data-related questions and in debugging. We also use it to monitor data that flows through the pipelines.
- Metadata controls the process. All data deliveries get a link to the metadata to orchestrate the data load. The state of the delivery is recorded in the metadata, enabling fine grained and automatic orchestration, as well as data lineage.
Raw and historical data
Our customers – lenders, investors, and appraisers – face heavy regulation. We provide our customers valuations and the data they need to affirm valuations for real estate properties.
We use data lineage to assure customers that we provide the correct inputs for GeoPhy’s model and the application. Data lineage tracks data from output, with transformations, back to input. The output is the historical layer, and the input is the raw data. We need to keep (and link) them.
The raw data is not suitable for exploration, querying, or analysis because different external sources come in different formats. It is complicated and computationally heavy to link them together on the fly. So, we produce a unified historical layer of the data that reflects all facts with attached time and location. The data is still not usable in all cases, however, the ‘purposeless’ unification makes it very easy to use for all above mentioned use-cases. It needs an extra transformation to get the data in the right shape for any of the above mentioned cases.. The unification is there to provide this flexibility.
Pipeline overview
Our generic pipeline is very simple and has very straightforward ETL steps. In this section I am giving a brief overview of all stages.
Generic sourcing pipeline in Airflow
Test and build
The first time it runs, a pipeline writes all configuration parameters into three metadata tables. These tables contain a) static information about the source, b) how the pipeline is executed, and c) the source and target schema. Furthermore, the pipeline creates all required tables for the staging, data quality, and historical layers. The pipeline skips this step whenever it recognizes that metadata and tables are already in place.
Extract
In the extract process, a source-specific component takes all files from external sources into account. The function compares the files with already downloaded files and downloads the new files into the raw data storage which is an S3 bucket.
Wrangle, Run Quality Check and Delta Load
In our approach, we process data into the historical layer in a two-step process. We do this for two reasons:
- It’s beneficial to conduct an essential Run Quality Check on the data before we load it into the historical layer; if we identify any critical issues, we can halt the process and analyze the issue.
- The sources deliver the data in many different formats, yet the historical layer stores data according to a standardized format. Standardization and defining changes in a single step is too complicated. Wrangling standardizes the data into key/value format and loads data to staging. Delta Load identifies the changes (new, changed, and removed) by comparing them against the current state in the historical layer.
Skeleton of Historical layer
The historical layer has a few generic columns in each table.
id: (part of primary key) The id is the technical key, a hash (md5) of the record including column headers and source name to avoid collisions (at least on input side).
delivery_sqn: (part of primary key) The delivery sequence is a reference to the metadata. This has additional data describing a dataset and links to other relevant metadata tables.
change_indicator: The change indicator shows that a record is Created, Updated, or Deleted.
ingestion_date: The date when the record got ingested in the historical layer.
Reproducibility of the historical layer
With a data bucket that retains all historical data, it is easy to rebuild the historical layer. We can safely drop the historical layer, the staging layer, data quality tables, and the metadata. The only data that is not removed is the raw data. We have even built a DAG that programmatically drops all components for a pipeline. With the orchestration mechanism, we can run the DAG again and process all data in the correct order. We only have to trigger it once. For development purposes, this approach speeds up the iterations for developing a pipeline.
We also require this functionality in production for two reasons:
- We need to be able to drop and reload data in the historical layer (without running manual queries on a Production environment) if data is corrupted.
- If there is a schema change (e.g. new column) the existing hashed keys are referencing the old schema. A new column (no matter if it’s empty) will affect the MD5 hash and is therefore invalid.
Conclusion
In this blog we outlined the reasons to start the project. We have elaborated on the requirements like automation, transparency and time-to-market. We have underpinned our tool and design choices and looked at the pipeline from 5,000 feet. In our next post, we’ll take a look at our implementation’s architecture and metadata.
————————–
This series has three parts.
Part 1: We outline the reasons to start this project.
Part 2: We elaborate on the implementation’s architecture and metadata.
Part 3: We examine automation, the pipeline in detail, and delta load.