Part 2 - Orchestrating Snowflake Data Transformations with DBT on Amazon ECS through Apache Airflow
Overview:
In our previous post, we explored setting up DBT in a private ECR repository through an AWS pipeline. In this blog, the emphasis is on configuring Amazon Managed Workflows for Apache Airflow (MWAA) and triggering DBT processes from it. Please find the source code in my Git repo.
Architecture:
Refer to my previous blog for instructions on configuring DBT on an ECR Private Repository. PART 1
MWAA:
Amazon Managed Workflows for Apache Airflow (MWAA) allows developers to quickly deploy an Airflow instance on AWS that utilises a combination of other AWS services to optimise the overall set-up.
STEP 1: To execute DBT from Airflow, the first step is to set up MWAA.
Here are the steps for configuring MWAA:
1. Select the S3 bucket from which your MWAA environment will retrieve the Directed Acyclic Graph (DAG) files.
2. Choose the Environment Class according to the number of DAGs run in your environment.
3. IAM Role permission for Airflow: the following IAM permissions are necessary to run our Airflow. Given that our DAG is located in the S3 bucket, the MWAA role inherently has S3 bucket access.
IAM Role for SSM:
Additionally, our DAG retrieves the ECS cluster and task details from the Parameter Store, which requires the following permission.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "ssm:GetParameter",
            "Resource": "*"
        }
    ]
}
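For reference, the parameters the DAG reads later in this post can be seeded with boto3 along the following lines. This is a minimal sketch: the parameter names match those used in the DAG, but every value shown is a placeholder, not a value from this project.

import boto3

ssm = boto3.client('ssm')

# Parameter names match the ones read by the DAG; the values are placeholders.
parameters = {
    '/mwaa/ecs/cluster': 'dbt-snowflake-cluster',
    '/mwaa/ecs/task_definition': 'dbt-snowflake-task',
    '/mwaa/vpc/private_subnets': 'subnet-aaaa1111,subnet-bbbb2222',
    '/mwaa/vpc/security_group': 'sg-0123456789abcdef0',
}

for name, value in parameters.items():
    ssm.put_parameter(Name=name, Value=value, Type='String', Overwrite=True)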
IAM Role for ECS Task Execution: Since the task pulls its image from the ECR repository, ensure that the Task Execution role's policy has the necessary permissions.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ecr:GetAuthorizationToken",
                "ecr:BatchCheckLayerAvailability",
                "ecr:GetDownloadUrlForLayer",
                "ecr:BatchGetImage",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}
The MWAA set-up is now complete.
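If you prefer to script the environment instead of using the console, the same settings can be expressed with boto3 roughly as follows. This is only a sketch: the environment name, bucket ARN, role ARN, subnet and security group IDs, and Airflow version are placeholders.

import boto3

mwaa = boto3.client('mwaa')

# Creates an MWAA environment equivalent to the console steps above.
# All names, ARNs and network IDs below are placeholders.
mwaa.create_environment(
    Name='dbt-snowflake-airflow',
    AirflowVersion='2.4.3',
    EnvironmentClass='mw1.small',
    SourceBucketArn='arn:aws:s3:::my-mwaa-dag-bucket',
    DagS3Path='dags',
    ExecutionRoleArn='arn:aws:iam::123456789012:role/mwaa-execution-role',
    NetworkConfiguration={
        'SubnetIds': ['subnet-aaaa1111', 'subnet-bbbb2222'],
        'SecurityGroupIds': ['sg-0123456789abcdef0'],
    },
)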
STEP 2: To invoke DBT from Airflow, it is essential to configure the ECS Task definition and cluster.
Set up ECS Cluster and Task definition:
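As a rough guide, creating the cluster and registering a Fargate task definition for the DBT image could look like the boto3 sketch below. The cluster name, task family, account ID, image URI, role ARN, log group and CPU/memory sizes are placeholders; only the container name matches the one referenced later in the DAG.

import boto3

ecs = boto3.client('ecs')

# Create the Fargate cluster that the Airflow tasks will target (name is illustrative).
ecs.create_cluster(clusterName='dbt-snowflake-cluster')

# Register a task definition pointing at the DBT image pushed to ECR in Part 1.
# The account ID, image tag, role ARN and log group are placeholders.
ecs.register_task_definition(
    family='dbt-snowflake-task',
    requiresCompatibilities=['FARGATE'],
    networkMode='awsvpc',
    cpu='512',
    memory='1024',
    executionRoleArn='arn:aws:iam::123456789012:role/ecsTaskExecutionRole',
    containerDefinitions=[
        {
            'name': 'jay-snowflake',  # must match the name used in the DAG's containerOverrides
            'image': '123456789012.dkr.ecr.ap-southeast-2.amazonaws.com/dbt-snowflake:latest',
            'essential': True,
            'logConfiguration': {
                'logDriver': 'awslogs',
                'options': {
                    'awslogs-group': '/ecs/dbt-snowflake',
                    'awslogs-region': 'ap-southeast-2',
                    'awslogs-stream-prefix': 'dbt',
                },
            },
        },
    ],
)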
STEP 3: Place your DAG Code in the S3 Bucket.
from datetime import timedelta, datetime

import boto3
from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

ssm = boto3.client('ssm')

default_args = {
    "owner": "jayaananth",
    "email": ["jayaananth@gmail.com"],
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "dbt_scd2_snowflake_ecs_operator",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:

    # Get the ECS configuration from SSM Parameter Store
    ecs_cluster = ssm.get_parameter(Name='/mwaa/ecs/cluster', WithDecryption=True)['Parameter']['Value']
    ecs_task_definition = ssm.get_parameter(Name='/mwaa/ecs/task_definition', WithDecryption=True)['Parameter']['Value']
    ecs_subnets = ssm.get_parameter(Name='/mwaa/vpc/private_subnets', WithDecryption=True)['Parameter']['Value']
    ecs_security_group = ssm.get_parameter(Name='/mwaa/vpc/security_group', WithDecryption=True)['Parameter']['Value']
    # ecs_awslogs_group = ssm.get_parameter(Name='/mwaa/cw/log_group', WithDecryption=True)['Parameter']['Value']
    # ecs_awslogs_stream_prefix = ssm.get_parameter(Name='/mwaa/cw/log_stream', WithDecryption=True)['Parameter']['Value']

    # Run the DBT models in the Docker container via the ECS operator
    task_model_ecs_operator = EcsRunTaskOperator(
        task_id="snowflake_dbt_model_ecs_operator",
        dag=dag,
        aws_conn_id="aws_default",
        cluster=ecs_cluster,
        task_definition=ecs_task_definition,
        launch_type="FARGATE",
        overrides={
            "containerOverrides": [
                {
                    "name": "jay-snowflake",
                    "command": ["dbt", "run", "--select", "models/emp_fact.sql"],
                },
            ],
        },
        network_configuration={
            "awsvpcConfiguration": {
                "securityGroups": [ecs_security_group],
                "subnets": ecs_subnets.split(","),
            },
        },
        # awslogs_group=ecs_awslogs_group,
        # awslogs_stream_prefix=ecs_awslogs_stream_prefix,
    )

    # Run the DBT snapshot (SCD Type 2) after the model build
    task_snapshot_ecs_operator = EcsRunTaskOperator(
        task_id="ecs_snowflake_operator",
        dag=dag,
        aws_conn_id="aws_default",
        cluster=ecs_cluster,
        task_definition=ecs_task_definition,
        launch_type="FARGATE",
        overrides={
            "containerOverrides": [
                {
                    "name": "jay-snowflake",
                    "command": ["dbt", "snapshot", "--select", "snapshots/scd_emp.sql"],
                },
            ],
        },
        network_configuration={
            "awsvpcConfiguration": {
                "securityGroups": [ecs_security_group],
                "subnets": ecs_subnets.split(","),
            },
        },
        # awslogs_group=ecs_awslogs_group,
        # awslogs_stream_prefix=ecs_awslogs_stream_prefix,
    )

    task_model_ecs_operator.set_downstream(task_snapshot_ecs_operator)
In the container override section, we reference the container defined in the task definition (which points to the ECR image) and specify the DBT command to execute when Airflow triggers the job.
The DAG will retrieve the Task definition and cluster information from the Systems Manager (SSM) Parameter Store.
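With the DAG file written, it just needs to land in the bucket and prefix configured for the environment. A minimal upload sketch, assuming a hypothetical bucket name and the default dags/ path:

import boto3

s3 = boto3.client('s3')

# Bucket name is a placeholder; 'dags/' must match the DagS3Path configured for the environment.
s3.upload_file(
    Filename='dbt_scd2_snowflake_ecs_operator.py',
    Bucket='my-mwaa-dag-bucket',
    Key='dags/dbt_scd2_snowflake_ecs_operator.py',
)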
STEP 4: Trigger your DAG.
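You can trigger the DAG from the Airflow UI, or programmatically through the MWAA CLI endpoint. The sketch below assumes a hypothetical environment name and follows the token-based CLI pattern; adjust it to your set-up.

import base64

import boto3
import requests

mwaa = boto3.client('mwaa')

# Exchange the environment name (placeholder) for a short-lived CLI token.
token = mwaa.create_cli_token(Name='dbt-snowflake-airflow')

# POST an Airflow CLI command to the MWAA web server.
response = requests.post(
    f"https://{token['WebServerHostname']}/aws_mwaa/cli",
    headers={
        'Authorization': f"Bearer {token['CliToken']}",
        'Content-Type': 'text/plain',
    },
    data='dags trigger dbt_scd2_snowflake_ecs_operator',
)

# The CLI output comes back base64-encoded.
print(base64.b64decode(response.json()['stdout']).decode())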
In the image below, you can see your DAG executing the ECS Task function.
Example: In the example below, we capture the result before and after the Airflow jobs run.
Source:
Source Table Employee_Department
Target:
Update the source record to capture the result.
UPDATE employee SET LAST_NAME='JAYARAM',
updated_at=CURRENT_TIMESTAMP() WHERE emp_no=1;
Target post run:
Advantages of Scheduled DBT Model Deployment for Snowflake Utilising AWS ECS and Airflow:
Automation: Scheduled deployments in AWS ECS and Airflow automate the process of running DBT models, reducing manual intervention and minimising the risk of errors.
Efficiency: Automation saves time and resources, making it more efficient to manage and update your DBT models, which can be particularly useful when dealing with large datasets.
Monitoring: Airflow provides monitoring and logging capabilities, allowing you to track the progress and performance of your DBT tasks, making it easier to troubleshoot issues.
Scheduling Flexibility: You can schedule DBT runs during off-peak hours or based on business requirements, ensuring that the data transformation processes do not impact regular operations.
Error Handling: Airflow enables you to set up error-handling mechanisms, such as notifications or retries, to ensure that your DBT tasks are robust and resilient (see the sketch after this list).
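For example, retries and failure notifications can be declared directly in the DAG's default_args. The values and email address below are illustrative only, not the settings used in this project.

from datetime import timedelta

# Illustrative error-handling settings for a DAG's default_args;
# the retry counts and email address are placeholders.
default_args = {
    "owner": "jayaananth",
    "retries": 2,                          # rerun a failed ECS task up to twice
    "retry_delay": timedelta(minutes=10),  # wait between attempts
    "email": ["alerts@example.com"],       # where Airflow sends failure emails
    "email_on_failure": True,              # notify when a task finally fails
    "email_on_retry": False,               # stay quiet on intermediate retries
}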
Conclusion:
In this blog, we have explored orchestrating Snowflake data transformations with DBT on Amazon ECS through Apache Airflow. This approach offers a versatile, end-to-end framework for organisations to improve their data transformation processes.
Note: This article was originally published on Cevo Australia’s website
If you enjoyed the article, please subscribe.