Part 2 - Orchestrating Snowflake Data Transformations with DBT on Amazon ECS through Apache Airflow

Overview:

In our previous post, we explored setting up DBT in a private Amazon ECR repository through an AWS pipeline. In this blog, the emphasis is on configuring Amazon Managed Workflows for Apache Airflow (MWAA) and triggering DBT jobs from it. The source code is available on my GitRepo.

Architecture:

[Image: Architecture]

Refer to my previous blog for instructions on configuring DBT on an ECR private repository: PART 1

MWAA:

Amazon Managed Workflows for Apache Airflow (MWAA) lets developers quickly deploy an Airflow environment on AWS that utilises a combination of other AWS services to optimise the overall set-up.

[Image: MWAA]

STEP 1: To execute DBT from Airflow, first set up the MWAA environment.

Here are the steps for configuring MWAA:

  • 1. Select the S3 bucket from which your MWAA environment will retrieve the Directed Acyclic Graph (DAG) files.

[Image: MWAA S3 Bucket Config]

  • 2. Choose the Environment Class based on the number of DAGs you expect to run in your environment.

[Image: MWAA ENV Config]

  • 3. IAM role permissions for Airflow: the following IAM permissions are necessary to run our Airflow tasks. Since our Directed Acyclic Graph (DAG) is located in the S3 bucket, the MWAA execution role already has access to that bucket.

IAM Role for SSM:

Additionally, our DAG retrieves the ECS cluster and task details from the Parameter Store, so the MWAA execution role needs the following SSM access.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "ssm:GetParameter",
            "Resource": "*"
        }
    ]
}
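The parameter names read by the DAG later in this post can be created ahead of time. Below is a minimal sketch of seeding them with boto3; the values shown are placeholders for your own cluster, task definition, subnets and security group.

import boto3

ssm = boto3.client("ssm")

# Placeholder values - substitute your own ECS and VPC details.
parameters = {
    "/mwaa/ecs/cluster": "jay-dbt-cluster",
    "/mwaa/ecs/task_definition": "jay-dbt-task:1",
    "/mwaa/vpc/private_subnets": "subnet-aaaa1111,subnet-bbbb2222",
    "/mwaa/vpc/security_group": "sg-0123456789abcdef0",
}

for name, value in parameters.items():
    # String parameters are enough here; use SecureString if the values
    # should be encrypted at rest.
    ssm.put_parameter(Name=name, Value=value, Type="String", Overwrite=True)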

IAM Role for ECS Task Execution: Since the task pulls its image from the ECR repository, ensure that the task execution role policy includes the following permissions.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ecr:GetAuthorizationToken",
                "ecr:BatchCheckLayerAvailability",
                "ecr:GetDownloadUrlForLayer",
                "ecr:BatchGetImage",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}
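If you prefer to script this, the sketch below attaches the policy above as an inline policy to the task execution role with boto3; the role and policy names are placeholders.

import json

import boto3

iam = boto3.client("iam")

# The policy document shown above, attached inline to the task execution role.
# Role and policy names are placeholders.
ecr_logs_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ecr:GetAuthorizationToken",
                "ecr:BatchCheckLayerAvailability",
                "ecr:GetDownloadUrlForLayer",
                "ecr:BatchGetImage",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
            ],
            "Resource": "*",
        }
    ],
}

iam.put_role_policy(
    RoleName="ecsTaskExecutionRole",
    PolicyName="dbt-ecr-pull-and-logs",
    PolicyDocument=json.dumps(ecr_logs_policy),
)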

The MWAA set-up is now complete.

STEP 2: To invoke DBT from Airflow, configure the ECS cluster and Task definition.

Set up ECS Cluster and Task definition:

[Image: AWS ECS]

[Image: AWS ECS running]
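The screenshots above show the console set-up. As an alternative, here is a minimal boto3 sketch of the same step; the cluster name, task family, image URI, role ARN, region and sizing are all placeholders, while the container name jay-snowflake matches the name referenced in the DAG's container overrides.

import boto3

ecs = boto3.client("ecs")

# Create the cluster the Airflow-triggered task will run in (placeholder name).
ecs.create_cluster(clusterName="jay-dbt-cluster")

# Register a Fargate task definition whose container name matches the
# "containerOverrides" entry used in the DAG (jay-snowflake).
ecs.register_task_definition(
    family="jay-dbt-task",  # placeholder family name
    requiresCompatibilities=["FARGATE"],
    networkMode="awsvpc",
    cpu="512",
    memory="1024",
    executionRoleArn="arn:aws:iam::123456789012:role/ecsTaskExecutionRole",  # placeholder ARN
    containerDefinitions=[
        {
            "name": "jay-snowflake",
            "image": "123456789012.dkr.ecr.ap-southeast-2.amazonaws.com/jay-snowflake:latest",  # placeholder ECR image
            "essential": True,
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-group": "/ecs/jay-snowflake",
                    "awslogs-region": "ap-southeast-2",
                    "awslogs-stream-prefix": "dbt",
                },
            },
        }
    ],
)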

STEP 3: Place your DAG code in the S3 bucket.

from datetime import datetime, timedelta

import boto3

from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
from airflow.utils.dates import days_ago

# SSM client used to look up the ECS and VPC configuration
ssm = boto3.client('ssm')

default_args = {
    "start_date": days_ago(2),
    "owner": "jayaananth",
    "email": ["jayaananth@gmail.com"],
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "dbt_scd2_snowflake_ecs_operator",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:

    # Get ECS configuration from SSM parameters
    ecs_cluster         = str(ssm.get_parameter(Name='/mwaa/ecs/cluster', WithDecryption=True)['Parameter']['Value'])
    ecs_task_definition = str(ssm.get_parameter(Name='/mwaa/ecs/task_definition', WithDecryption=True)['Parameter']['Value'])
    ecs_subnets         = str(ssm.get_parameter(Name='/mwaa/vpc/private_subnets', WithDecryption=True)['Parameter']['Value'])
    ecs_security_group  = str(ssm.get_parameter(Name='/mwaa/vpc/security_group', WithDecryption=True)['Parameter']['Value'])
    # ecs_awslogs_group         = str(ssm.get_parameter(Name='/mwaa/cw/log_group', WithDecryption=True)['Parameter']['Value'])
    # ecs_awslogs_stream_prefix = str(ssm.get_parameter(Name='/mwaa/cw/log_stream', WithDecryption=True)['Parameter']['Value'])
    print(ecs_task_definition)

    # Run Docker container via ECS operator
    task_model_ecs_operator = EcsRunTaskOperator(
        task_id="snowflake_dbt_model_ecs_operator",
        dag=dag,
        aws_conn_id="aws_default",
        cluster=ecs_cluster,
        task_definition=ecs_task_definition,
        launch_type="FARGATE",
        overrides={
          "containerOverrides": [
                {
                  "name": "jay-snowflake",
                  "command": ["dbt","run","--select", "models/emp_fact.sql"]
                },
            ],
        },
        network_configuration={
           "awsvpcConfiguration": {
                "securityGroups": [ecs_security_group],
                "subnets": ecs_subnets.split(",") ,
            },
        },
        # awslogs_group=ecs_awslogs_group,
        # awslogs_stream_prefix=ecs_awslogs_stream_prefix,
    )
    

    task_snapshot_ecs_operator = EcsRunTaskOperator(
        task_id="ecs_snowflake_operator",
        dag=dag,
        aws_conn_id="aws_default",
        cluster=ecs_cluster,
        task_definition=ecs_task_definition,
        launch_type="FARGATE",
        overrides={
          "containerOverrides": [
                {
                  "name": "jay-snowflake",
                  "command": ["dbt","snapshot","--select", "snapshots/scd_emp.sql"]
                },
            ],
        },
        network_configuration={
            "awsvpcConfiguration": {
                "securityGroups": [ecs_security_group],
                "subnets": ecs_subnets.split(",") ,
            },
        },
        # awslogs_group=ecs_awslogs_group,
        # awslogs_stream_prefix=ecs_awslogs_stream_prefix,
    )

    # Run the dbt model first, then capture the snapshot
    task_model_ecs_operator >> task_snapshot_ecs_operator

In the container override section, we reference the container name defined in the Task definition and specify the DBT command to execute when Airflow triggers the job.

The DAG will retrieve the Task definition and cluster information from the Systems Manager (SSM) Parameter Store.

[Image: AWS SSM]
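Once the DAG file is ready, it has to land in the dags/ prefix of the S3 bucket configured for MWAA. A minimal sketch, assuming hypothetical bucket and file names:

import boto3

s3 = boto3.client("s3")

# MWAA picks up DAG files from the dags/ prefix of its configured bucket.
# Bucket and file names below are placeholders.
s3.upload_file(
    Filename="dbt_scd2_snowflake_ecs_operator.py",
    Bucket="jay-mwaa-dags-bucket",
    Key="dags/dbt_scd2_snowflake_ecs_operator.py",
)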

STEP 4: Trigger your DAG.

[Image: MWAA DAG Job]
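Besides triggering the DAG from the Airflow UI, you can trigger it programmatically through the MWAA CLI endpoint. The sketch below assumes a hypothetical environment name (my-mwaa-env) and uses the requests library:

import base64

import boto3
import requests

mwaa = boto3.client("mwaa")

# Exchange AWS credentials for a short-lived Airflow CLI token.
token = mwaa.create_cli_token(Name="my-mwaa-env")  # placeholder environment name

# POST an Airflow CLI command to the MWAA CLI endpoint.
response = requests.post(
    f"https://{token['WebServerHostname']}/aws_mwaa/cli",
    headers={
        "Authorization": f"Bearer {token['CliToken']}",
        "Content-Type": "text/plain",
    },
    data="dags trigger dbt_scd2_snowflake_ecs_operator",
)

# The response body contains base64-encoded stdout/stderr from the CLI call.
result = response.json()
print(base64.b64decode(result["stdout"]).decode())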

In the image below, you can see the DAG executing the ECS task.

[Image: AWS ECS Task Function]

Example: In the example below, we capture the results before and after the Airflow job runs.

Source:

[Image: Source Table Employee]

[Image: Source Table Department]

[Image: Source Table Employee_Department]

Target:

[Image: Target Emp_fact Table]

[Image: Target Emp_fact Table]

Update the source record so the change is captured on the next run:

UPDATE employee
SET last_name = 'JAYARAM',
    updated_at = CURRENT_TIMESTAMP()
WHERE emp_no = 1;

Target post run:

[Image: Post Target Emp_fact Table]

[Image: Post Target SCD_fact Table]

[Image: Post Target SCD_fact Table]

Advantages of Scheduled DBT Model Deployment for Snowflake Utilising AWS ECS and Airflow:

  • Automation: Scheduled deployments in AWS ECS and Airflow automate the process of running DBT models, reducing manual intervention and minimising the risk of errors.

  • Efficiency: Automation saves time and resources, making it more efficient to manage and update your DBT models, which can be particularly useful when dealing with large datasets.

  • Monitoring: Airflow provides monitoring and logging capabilities, allowing you to track the progress and performance of your DBT tasks, making it easier to troubleshoot issues.

  • Scheduling Flexibility: You can schedule DBT runs during off-peak hours or based on business requirements, ensuring that the data transformation processes do not impact regular operations.

  • Error Handling: Airflow enables you to set up error-handling mechanisms, such as notifications or retries, to ensure that your DBT tasks are robust and resilient (see the sketch below).
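As a sketch of the error-handling point above, retries and failure notifications can be configured through default_args, assuming email (SMTP/SES) is set up for the MWAA environment:

from datetime import timedelta

# Hypothetical default_args illustrating retry and notification settings;
# email delivery assumes SMTP/SES is configured for the environment.
default_args = {
    "owner": "jayaananth",
    "email": ["jayaananth@gmail.com"],
    "email_on_failure": True,   # notify when a task ultimately fails
    "email_on_retry": False,
    "retries": 2,               # re-run a failed task up to twice
    "retry_delay": timedelta(minutes=5),
}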

Conclusion:

In this blog, we have explored orchestrating Snowflake data transformations with DBT on Amazon ECS through Apache Airflow. This approach offers organisations a flexible, end-to-end framework for improving their data transformation processes.

Note: This article was originally published on Cevo Australia’s website

If you enjoy the article, please subscribe.

This blog page is designed to keep you informed about anything and everything data-related and to support your career growth.


If you love the article, please consider supporting me by buying me a coffee for $1.

This post is licensed under CC BY 4.0 by the author.