Creating Dynamic Sourcing Pipelines, Part II
In the second of our new technical blog series, we dive into the architectural implementation and the function of metadata
In part one of this series, we discussed the background and reasons for creating dynamic sourcing pipelines – including an overview of the pipeline components. In this part, we will describe the architecture and metadata in more detail. Let’s get started!
Architectural Overview
Our extract, transform, and load (ETL) system is written in Python, which GeoPhy’s engineers know well. Airflow is also built in Python, which makes it easy for us to extend. For a similar reason, we chose PostgreSQL as the database technology. Postgres also shines for geospatial data, our most common type of data.
A Python/Postgres stack may be relatively boring, but that’s a good thing here. Boring means proven. Boring does the trick. We don’t need to look for cutting-edge technology. For our data sources and use-case, boring gives good performance.
The architecture has four main components:
Aurora Database
We chose the Aurora (AWS) database for the Airflow backend, the metadata, and the data produced by pipelines.
Kubernetes
Our Airflow application runs on Kubernetes, an orchestration engine that abstracts one or more servers (nodes) into a single platform. That platform can deploy, scale, and manage containerized applications.
The deployable units are called pods, which have one or more containers. A container is a unit of software packaging code and the dependencies it needs to run the application. We deploy the Airflow web server and scheduler in separate pods.
Airflow uses the KubernetesExecutor to execute tasks in pods. When a task is finished, its pod ceases to exist. This implementation scales resources up on demand. It is also resilient: whenever a node dies, Kubernetes automatically restarts the Airflow application on a new node. These abstractions come out of the box with Kubernetes.
GitLab repositories
The GitLab repositories are the golden source for code. Teams push code to the team repo. The repo has at least two branches (master and development). To build/test code, a developer can push the code to the development branch. A push triggers the Continuous Integration pipeline (CI-pipeline), and code gets synchronized to the S3 code bucket on the development environment. Once code is reviewed and approved it gets promoted to master (Production) and a similar CI-pipeline ships code to the Production environment.
S3 Buckets
The code bucket has two main functions:
- It acts as a collection point where all Directed Acyclic Graphs (DAGs) are collected in a single file structure.
- The file structure is mounted inside the pods on Kubernetes. Every 30 seconds the pods get synchronized with the bucket.
Tasks are ephemeral. If a pod ceases to exist, the data in it is lost, so data produced by the pod must be stored externally. The logs bucket receives the logs coming from task pods. The user can retrieve the logs through the Airflow UI.
————-
Airflow is a shared responsibility between the Dev teams and the Ops team. Ops is responsible for all infrastructure components to be available and functional. The Dev teams are responsible for the code they run on the environment.
Metadata
The metadata plays a key role in the implementation. The metadata tables are divided into two groups: Static and Process.
- The static metadata tables are the green tables in the graph and they are loaded the first time a source runs.
- The process metadata tables are yellow and they are loaded in each process.
This is how our metadata model looks:
Static
Source
The source table represents data at the source level. Each row in the source table maps to a DAG and has one target table in the historical layer. This mapping is unambiguous. The notion of a source, however, is loosely defined, so the engineer has some freedom in choosing how to define one. For instance, they can combine multiple time series from an API into a single DAG or split them over multiple DAGs. One important detail here: time series that share a DAG must share the same key and preferably have similar characteristics!
The source table defines the schedule, the quality checks to perform, the owner of the pipeline, the geographical granularity of the data, a reference to the specific wrangle/extract functions, and how we load data (new, changed, and/or removed) into the historical layer.
Attribute (schema)
The attribute table maps source attributes to target attributes. In the historical layer everything is named by internal standardized conventions (target attributes). The attribute table maps the source_name to target_name and includes a specified target_datatype and key attributes.
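To make the mapping concrete, here is a minimal sketch of how rows from the attribute table could be applied to a single source record. The column names source_name, target_name, target_datatype, and key come from the table described above; the sample rows, the CASTS lookup, and both function names are assumptions made for illustration, not our actual implementation.

```python
from datetime import date

# Hypothetical rows of the attribute table: one entry per source attribute.
ATTRIBUTES = [
    {"source_name": "ID", "target_name": "property_id", "target_datatype": "int", "key": True},
    {"source_name": "Dt", "target_name": "observation_date", "target_datatype": "date", "key": True},
    {"source_name": "Val", "target_name": "value", "target_datatype": "float", "key": False},
]

# Illustrative casts per target_datatype (assumed type names).
CASTS = {"int": int, "float": float, "str": str, "date": date.fromisoformat}


def apply_attribute_mapping(record, attributes):
    """Rename source attributes to target names and cast to the target datatype."""
    mapped = {}
    for attr in attributes:
        raw = record.get(attr["source_name"])
        cast = CASTS[attr["target_datatype"]]
        # Missing values stay None so later checks can detect them.
        mapped[attr["target_name"]] = None if raw is None else cast(raw)
    return mapped


def key_columns(attributes):
    """Return the target names that together form the key."""
    return [attr["target_name"] for attr in attributes if attr["key"]]


row = apply_attribute_mapping({"ID": "42", "Dt": "2020-01-31", "Val": "3.5"}, ATTRIBUTES)
```

Driving the rename and the cast from metadata means a new source only needs new rows in the attribute table, not new code.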
Source attribute
The source_attribute table keeps the information that is required to extract data from the source, for instance specific API parameters to filter on, endpoints, and source mappings. We do not store API keys! They live in Kubernetes secrets.
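As a rough sketch of this split, the function below combines stored extraction parameters with a key that is read at runtime. It assumes the Kubernetes secret is exposed to the pod as an environment variable, which is one common option but not necessarily how it is wired up here. The dictionary layout, the variable name SOURCE_API_KEY, the api_key parameter, and the example URL are all hypothetical.

```python
import os


def build_request(source_attributes, secret_env_var, environ=os.environ):
    """Combine stored extraction metadata with an API key read at runtime."""
    api_key = environ.get(secret_env_var)
    if api_key is None:
        raise KeyError(f"secret {secret_env_var!r} is not available in the environment")
    # Copy the stored parameters so the metadata itself is never modified.
    params = dict(source_attributes.get("params", {}))
    params["api_key"] = api_key
    return {"url": source_attributes["endpoint"], "params": params}


# Hypothetical metadata for one source; note that it contains no credentials.
example_attributes = {
    "endpoint": "https://example.com/api/series",
    "params": {"region": "NL"},
}
request = build_request(
    example_attributes,
    "SOURCE_API_KEY",
    environ={"SOURCE_API_KEY": "dummy-key"},  # stand-in for the pod environment
)
```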
Process
Delivery
A delivery represents a delivered dataset. This can be a file or a group of files. The data and metadata are linked through the delivery table. Every record in the historical layer has a delivery_sqn. The link to S3 is the delivery_filename. Other important attributes are the delivery type (delta or snapshot), the delivery_date, and the process_date.
Delivery Status
The delivery_status table captures the status of a specific layer (e.g. file X has successfully been stored on S3). The relevant attributes are file size and number of records. For every stage in the process, the number of records is counted (with a zip file on S3 this is not possible). The record counts and status tracking enable monitoring of completeness. They also orchestrate the process, as we will see in part 3.
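A completeness check on these counts could look roughly like the sketch below. The stage names, the row layout, and the function name are assumptions for illustration. The rows are assumed to be ordered by stage, and a count of None stands for a stage where counting is not possible, such as a zip file on S3.

```python
# Hypothetical delivery_status rows for a single delivery, ordered by stage.
STATUS_ROWS = [
    {"delivery_sqn": 1, "stage": "raw", "status": "success", "record_count": None},
    {"delivery_sqn": 1, "stage": "staging", "status": "success", "record_count": 1000},
    {"delivery_sqn": 1, "stage": "historical", "status": "success", "record_count": 998},
]


def count_mismatches(status_rows):
    """Compare record counts between consecutive stages that have a count."""
    counted = [row for row in status_rows if row["record_count"] is not None]
    mismatches = []
    for earlier, later in zip(counted, counted[1:]):
        if earlier["record_count"] != later["record_count"]:
            mismatches.append(
                (earlier["stage"], later["stage"],
                 earlier["record_count"], later["record_count"])
            )
    return mismatches


mismatches = count_mismatches(STATUS_ROWS)
```

A mismatch is not necessarily an error, because some records may be dropped on purpose between stages. It only marks the place where the counts should be explained.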
Data quality checks
The data_quality_checks table stores the results of the quality checks that are executed at runtime. The checks are performed per delivery and include duplicate keys, duplicate rows, and null values in key columns. A failed check is a critical error and should halt the process. To make it explicit, records with null values in key columns will ‘fall out’ and do not show up in the historical layer.
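The three checks named above can be sketched in a few lines of plain Python. This is an illustration under assumptions, not our production code: records are represented as dictionaries with hashable values, the function name and result layout are invented, and the sketch only counts offending records. How a non-zero count is acted on is left to the caller.

```python
from collections import Counter


def run_quality_checks(records, key_columns):
    """Count duplicate keys, duplicate rows, and null keys in one delivery."""
    keys = [tuple(record.get(col) for col in key_columns) for record in records]

    # A key is considered null if any of its columns is missing or None.
    null_keys = sum(1 for key in keys if any(value is None for value in key))

    # Every occurrence of a complete key beyond the first is a duplicate.
    key_counts = Counter(key for key in keys if not any(v is None for v in key))
    duplicate_keys = sum(n - 1 for n in key_counts.values() if n > 1)

    # Rows are duplicates when all of their attributes are identical.
    row_counts = Counter(tuple(sorted(record.items())) for record in records)
    duplicate_rows = sum(n - 1 for n in row_counts.values() if n > 1)

    return {
        "duplicate_keys": duplicate_keys,
        "duplicate_rows": duplicate_rows,
        "null_keys": null_keys,
    }


sample = [
    {"id": 1, "value": "a"},
    {"id": 1, "value": "a"},   # duplicate row (and duplicate key)
    {"id": 1, "value": "b"},   # duplicate key only
    {"id": None, "value": "c"},  # null key
]
result = run_quality_checks(sample, key_columns=["id"])
```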
Idempotency in pipeline stages
Once a DAG run is triggered, it takes into account all available data (processed and unprocessed). It calls the download page and retrieves all file names. Per source, logic is defined to determine whether a file has already been downloaded. This information is stored in the delivery table as the delivery_filename.
We compare the available files with the already downloaded files. The result is the set of files that are available but not yet processed. These new files are processed, and metadata is inserted to ensure that the next DAG run skips them. The same logic (compare processed vs. available) is applied at every step of the pipeline (raw, staging, DQ, and historical) to identify unprocessed data. No matter where a DAG fails, the next run will always pick up unprocessed deliveries in any stage. We’ll dive into all the details in the next part!
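The compare-then-record pattern can be sketched as follows. The list of processed file names stands in for the delivery_filename values in the delivery table; the function names and the process callback are hypothetical.

```python
def unprocessed_files(available, processed):
    """Return the available file names that were not processed yet, in order."""
    seen = set(processed)
    return [name for name in available if name not in seen]


def run_stage(available, processed, process):
    """Process only new files and record them so the next run skips them."""
    new_files = unprocessed_files(available, processed)
    for name in new_files:
        process(name)
        # Stand-in for inserting a row into the delivery table. Recording
        # happens only after success, so a failed file is retried next run.
        processed.append(name)
    return new_files


delivered = ["a.csv"]  # file names already registered in an earlier run
first_run = run_stage(["a.csv", "b.csv"], delivered, process=lambda name: None)
second_run = run_stage(["a.csv", "b.csv"], delivered, process=lambda name: None)
```

Running the same stage twice with the same input does no extra work the second time, which is what makes a rerun after a failure safe.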
Conclusion
We have discussed the architectural implementation and how all components interact. We have set the stage for the metadata and explained the function of each table. With this knowledge we are ready to break down the generic pipeline in the next part.
————————–
The series has three parts.
Part 1: We outline the reasons to start this project.
Part 2: We elaborate on the implementation’s architecture and metadata.
Part 3: We examine automation, the pipeline in detail, and delta load.