In the world of modern data analytics and management, orchestrating complex data pipelines efficiently is paramount. Tools like Apache Airflow and dbt (data build tool) have emerged as popular choices for managing and executing these pipelines seamlessly. In this blog post, we’ll explore how to integrate Airflow with dbt using a dynamic DAG factory approach to streamline your data processing tasks.
Introduction to Airflow and dbt
Apache Airflow: Airflow is an open-source platform created by Airbnb for orchestrating complex workflows and data pipelines. It allows users to define, schedule, and monitor workflows as Directed Acyclic Graphs (DAGs), making it ideal for managing ETL (Extract, Transform, Load) processes and data processing pipelines.
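To make the DAG idea concrete without requiring an Airflow installation, here is a minimal pure-Python sketch: a workflow expressed as a dependency graph, topologically sorted into a valid execution order, which is conceptually what Airflow's scheduler does before running tasks. The task names are illustrative only.

```python
from graphlib import TopologicalSorter

# A toy ETL workflow expressed as a DAG: each task maps to the set of
# tasks it depends on, much like wiring operators together in Airflow.
workflow = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# A topological sort yields a valid execution order: every task runs
# only after all of its upstream dependencies have completed.
order = list(TopologicalSorter(workflow).static_order())
print(order)  # ['extract', 'transform', 'load']
```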
dbt (data build tool): dbt is another open-source tool designed specifically for data analysts and engineers. It enables data transformations directly within the data warehouse and promotes the use of SQL for modeling data. dbt focuses on the transformation layer of the ELT (Extract, Load, Transform) process, providing features for data modeling, testing, and documentation.
The Integration: Airflow DAG Factory with dbt
To illustrate the integration between Airflow and dbt, we’ll utilize a Python script that dynamically generates Airflow DAGs based on the metadata provided by dbt. Let’s break down the key components of this integration:
1. Dynamic DAG Generation
The provided Python script serves as a DAG factory, creating multiple DAGs based on the metadata extracted from dbt. Each DAG corresponds to a specific aspect of the data pipeline, such as staging, incremental updates, snapshots, or full refreshes.
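As a simplified sketch of this routing logic, the snippet below groups nodes into one DAG spec per pipeline stage based on their dbt tags. The node names and tags are hypothetical; the full factory script appears later in the post.

```python
# Hypothetical manifest metadata: node name -> dbt tags (illustrative only).
nodes = {
    "stg_orders": ["staging"],
    "stg_customers": ["staging"],
    "fct_revenue": ["marts", "incremental"],
    "dim_customers": ["marts"],
    "country_codes": ["seeds"],
}

def build_dag_specs(nodes):
    # Group nodes into one DAG spec per pipeline stage, the same way the
    # factory routes tasks into staging, marts, or seed DAGs by tag.
    specs = {}
    for name, tags in nodes.items():
        for stage in ("staging", "marts", "seeds"):
            if stage in tags:
                specs.setdefault(stage, []).append(name)
                break
    return specs

specs = build_dag_specs(nodes)
print(specs)
# {'staging': ['stg_orders', 'stg_customers'],
#  'marts': ['fct_revenue', 'dim_customers'],
#  'seeds': ['country_codes']}
```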
2. Task Operators
Within each generated DAG, task operators are defined to execute dbt commands corresponding to different stages of the data pipeline. These tasks leverage the BashOperator to execute dbt commands, such as running models, snapshots, or seeds.
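The heart of each task is simply a shell command string handed to a BashOperator. The helper below builds the kind of `dbt run` command the factory script uses; the Jinja-templated date placeholders are rendered by Airflow at runtime, and the model name here is illustrative.

```python
def dbt_run_command(dbt_home, model, start_date, end_date):
    # Build the shell command a BashOperator would execute for one model;
    # the {{ ... }} placeholders are filled in by Airflow's templating.
    return (
        f"dbt run --profiles-dir {dbt_home} --models {model} "
        f'--vars \'{{"start_date": "{start_date}", "end_date": "{end_date}"}}\''
    )

cmd = dbt_run_command(
    "/usr/local/airflow/dags/dbt",
    "stg_orders",
    "{{ data_interval_start }}",
    "{{ data_interval_end }}",
)
print(cmd)
```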
3. Dependency Management
The script establishes dependencies between tasks based on the relationships defined in the dbt metadata. This ensures that tasks are executed in the correct order, respecting the dependencies between different data models and pipeline stages.
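The raw material for this wiring is the `parent_map` section of dbt's `manifest.json`, which maps each child node to its parents. The sketch below (with hypothetical node IDs) turns that mapping into (upstream, downstream) pairs, which translate directly into `parent >> child` statements in Airflow.

```python
# Simplified parent_map as found in dbt's manifest.json: child -> parents.
parent_map = {
    "model.shop.stg_orders": ["source.shop.raw.orders"],
    "model.shop.fct_revenue": ["model.shop.stg_orders"],
}

def build_edges(parent_map):
    # Turn the child -> parents mapping into (upstream, downstream)
    # pairs, each corresponding to `parent >> child` in an Airflow DAG.
    edges = []
    for child, parents in parent_map.items():
        for parent in parents:
            edges.append((parent, child))
    return edges

edges = build_edges(parent_map)
print(edges)
```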
4. Schedule Configuration
Each generated DAG is configured with a schedule interval that aligns with the desired frequency of data processing tasks. This allows for automated execution of the data pipeline according to predefined schedules, ensuring timely updates and data freshness.
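Beyond the cron schedule itself, each run receives a data interval, and the factory script widens that window for incremental marts models to absorb late-arriving data. The snippet below reproduces that arithmetic in plain `datetime` (the 8-hour and 2-hour offsets match the script; the interval values are made up for illustration):

```python
from datetime import datetime, timedelta

# Airflow renders {{ data_interval_start }} / {{ data_interval_end }}
# for each scheduled run; incremental marts tasks widen that window.
data_interval_start = datetime(2024, 1, 1, 1, 0)
data_interval_end = datetime(2024, 1, 1, 10, 0)

start_date = data_interval_start - timedelta(hours=8)
end_date = data_interval_end + timedelta(hours=2)

print(start_date.isoformat())  # 2023-12-31T17:00:00
print(end_date.isoformat())    # 2024-01-01T12:00:00
```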
Benefits of Integration
1. Scalability and Flexibility
By leveraging Airflow’s dynamic DAG generation capabilities, the integration with dbt provides scalability and flexibility in managing complex data pipelines. New models, transformations, or pipeline stages can be easily added or modified without manual intervention.
2. Enhanced Monitoring and Error Handling
Airflow’s built-in monitoring and logging capabilities enable users to track the execution of data pipeline tasks in real-time. Additionally, error handling features allow for automatic retries and notifications in case of task failures, ensuring robustness and reliability.
3. Standardization and Reproducibility
With dbt’s focus on SQL-based transformations and version-controlled modeling, the integration promotes standardization and reproducibility across data pipeline processes. This facilitates collaboration among data teams and ensures consistency in data transformations.
Below is sample code for a working DAG factory that generates DAGs based on dbt's tagging feature. You may need to adapt it depending on the database you use.
```python
# Import necessary libraries and modules
import json
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

# Define constants
DBT_HOME = "/usr/local/airflow/dags/dbt"
JSON_MANIFEST_DBT = DBT_HOME + "/target/manifest.json"
PARENT_MAP = "parent_map"


def sanitise_node_names(value):
    # Strip the dbt resource prefix from a manifest unique ID, keeping
    # "source_name.table_name" for sources; other resource types yield None.
    segments = value.split(".")
    if segments[0] in ("model", "seed", "snapshot"):
        return segments[-1]
    if segments[0] == "source":
        return segments[-2] + "." + segments[-1]
    return None


def get_node_structure():
    # Extract the node structure (ancestors, tags, freshness) from the dbt manifest.
    with open(JSON_MANIFEST_DBT) as json_data:
        data = json.load(json_data)
    ancestors_data = data[PARENT_MAP]
    tree = {}
    for node in ancestors_data:
        ancestors = list(set(ancestors_data[node]))
        clean_ancestors = []
        for ancestor in ancestors:
            if sanitise_node_names(ancestor) is not None:
                clean_ancestors.append(sanitise_node_names(ancestor))
        clean_node_name = sanitise_node_names(node)
        if clean_node_name is not None:
            tree[clean_node_name] = {
                "ancestors": clean_ancestors,
                "tags": ["na"],
                "freshness": ["false"],
            }
            if len(clean_node_name.split(".")) < 2:
                tree[clean_node_name]["tags"] = data["nodes"][node]["tags"]
    # Sources (names of the form "source_name.table_name") that define a
    # freshness configuration are tagged as staging and flagged for checks.
    for node in ancestors_data:
        clean_node_name = sanitise_node_names(node)
        if (
            clean_node_name is not None
            and len(clean_node_name.split(".")) == 2
            and data["sources"][node]["freshness"] is not None
        ):
            source_key = (
                data["sources"][node]["source_name"]
                + "."
                + data["sources"][node]["name"]
            )
            tree[source_key]["tags"] = ["staging"]
            tree[source_key]["freshness"] = ["true"]
    return tree


# Define default arguments for DAGs
default_args = {
    "owner": "dbt",
    "depends_on_past": False,
    "start_date": datetime(2022, 12, 5),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 5,
    "retry_delay": timedelta(minutes=3),
}

# Instantiate DAGs
staging_dag = DAG(
    "2_staging_dbt_models",
    default_args=default_args,
    description="Managing dbt data pipeline",
    schedule_interval="30 0 * * *",
    catchup=False,
    max_active_runs=1,
    max_active_tasks=3,
    dagrun_timeout=timedelta(minutes=90),
    tags=["dbt", "staging"],
)
marts_dag = DAG(
    "3_marts_dbt_models",
    default_args=default_args,
    description="Managing dbt data pipeline",
    schedule_interval="0 1,10,18 * * *",
    max_active_runs=1,
    max_active_tasks=3,
    dagrun_timeout=timedelta(minutes=60),
    catchup=False,
    tags=["dbt", "marts"],
)
init_once_dag = DAG(
    "4_init_once_dbt_models",
    default_args=default_args,
    description="Managing dbt data pipeline",
    schedule_interval=None,
    params={"model_name": ""},
)
seed_dag = DAG(
    "5_seeds_dbt_models",
    default_args=default_args,
    description="Managing dbt data pipeline",
    schedule_interval=None,
)

all_operators = {}

# Get node structure from dbt manifest
nodes = get_node_structure()

# Create operators for each node
for node in nodes:
    if "staging" in nodes[node]["tags"] and "false" in nodes[node]["freshness"]:
        date_end = "{{ data_interval_end }}"
        date_start = "{{ data_interval_start }}"
        # BashOperator for staging models
        tmp_operator = BashOperator(
            task_id=node,
            bash_command="\nexport dbt_env={{ var.json.dbt_env.name }} && "
            "\ndbt run --profiles-dir {dbt_home} --models {node_name} "
            '--vars \'{{"start_date": "{start_date}", "end_date": "{end_date}"}}\' '.format(
                dbt_home=DBT_HOME,
                node_name=node,
                start_date=date_start,
                end_date=date_end,
            ),
            dag=staging_dag,
        )
        all_operators[node] = tmp_operator
    elif "marts" in nodes[node]["tags"] and "incremental" in nodes[node]["tags"]:
        # Widen the data interval to absorb late-arriving data
        date_end = "{{ data_interval_end + macros.timedelta(hours=2) }}"
        date_start = "{{ data_interval_start - macros.timedelta(hours=8) }}"
        # BashOperator for incremental marts models
        tmp_operator = BashOperator(
            task_id=node,
            pool_slots=1500,
            bash_command="\nexport dbt_env={{ var.json.dbt_env.name }} && "
            "\ndbt run --profiles-dir {dbt_home} --models {node_name} "
            '--vars \'{{"start_date": "{start_date}", "end_date": "{end_date}"}}\' '.format(
                dbt_home=DBT_HOME,
                node_name=node,
                start_date=date_start,
                end_date=date_end,
            ),
            dag=marts_dag,
        )
        all_operators[node] = tmp_operator
    elif "true" in nodes[node]["freshness"]:
        # BashOperator for source freshness checks
        tmp_operator = BashOperator(
            task_id="freshness_" + node,
            pool_slots=1500,
            bash_command="\nexport dbt_env={{ var.json.dbt_env.name }} && "
            "\ncd " + DBT_HOME + " && "
            "\ndbt source freshness --profiles-dir {dbt_home} --select source:{node_name} ".format(
                dbt_home=DBT_HOME, node_name=node
            ),
            dag=staging_dag,
        )
        all_operators[node] = tmp_operator
    elif "snapshot" in nodes[node]["tags"]:
        # BashOperator for snapshots
        tmp_operator = BashOperator(
            task_id=node,
            bash_command="\nexport dbt_env={{ var.json.dbt_env.name }} && "
            "\ndbt snapshot --select " + node + " --profiles-dir " + DBT_HOME,
            dag=marts_dag,
        )
        all_operators[node] = tmp_operator
    elif "full-refresh" in nodes[node]["tags"]:
        # BashOperator for full refreshes, triggered manually with a model name
        tmp_operator = BashOperator(
            task_id=node,
            bash_command="\nexport dbt_env={{ var.json.dbt_env.name }} && "
            "\ndbt run --models {{ params.model_name }} --full-refresh --profiles-dir "
            + DBT_HOME,
            dag=init_once_dag,
        )
        all_operators[node] = tmp_operator
    elif "seeds" in nodes[node]["tags"]:
        # BashOperator for seeding
        tmp_operator = BashOperator(
            task_id=node,
            bash_command="\nexport dbt_env={{ var.json.dbt_env.name }} && "
            "\ndbt seed --models " + node + " --profiles-dir " + DBT_HOME,
            dag=seed_dag,
        )
        all_operators[node] = tmp_operator

# Wire operator dependencies based on the relationships in the manifest,
# but only between nodes that ended up in the same DAG (shared tag)
for node in nodes:
    for parent in nodes[node]["ancestors"]:
        if parent in all_operators and node in all_operators:
            if any(tag in nodes[parent]["tags"] for tag in nodes[node]["tags"]):
                all_operators[parent] >> all_operators[node]
```
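To see what the name-sanitising helper in the script produces, here is a standalone copy of its logic applied to the kinds of unique IDs found in `manifest.json` (the project and node names are made up):

```python
def sanitise_node_names(value):
    # Standalone copy of the helper above: strip the dbt resource prefix,
    # keeping "source_name.table_name" for sources; other types yield None.
    segments = value.split(".")
    if segments[0] in ("model", "seed", "snapshot"):
        return segments[-1]
    if segments[0] == "source":
        return segments[-2] + "." + segments[-1]
    return None

print(sanitise_node_names("model.shop.stg_orders"))      # stg_orders
print(sanitise_node_names("source.shop.raw.orders"))     # raw.orders
print(sanitise_node_names("test.shop.not_null_orders"))  # None
```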
Conclusion
Integrating Apache Airflow with dbt offers a powerful solution for managing and executing data pipelines efficiently. By combining Airflow’s workflow orchestration capabilities with dbt’s data transformation features, organizations can streamline their data processing workflows, improve data quality, and accelerate time-to-insights.
In summary, the integration enables:
- Automated execution of data pipelines
- Flexible and scalable pipeline management
- Enhanced monitoring and error handling
- Standardization and reproducibility of data transformations
With this integration, data teams can focus on deriving valuable insights from their data, driving informed decision-making and business success.
To debug your DAG, please check my earlier post on debugging Airflow DAGs.
For further explanation please don’t hesitate to contact me or drop a comment.
Here are some references for Apache Airflow and dbt:
Apache Airflow:
- Official documentation: https://airflow.apache.org/docs/
- GitHub repository: https://github.com/apache/airflow
dbt (data build tool):
- Official documentation: https://docs.getdbt.com/
- GitHub repository: https://github.com/dbt-labs/dbt-core
These resources provide comprehensive documentation, tutorials, and community support for using Apache Airflow and dbt effectively in data engineering and analytics workflows.
