End-to-End Data Engineering Project with dbt Core, Snowflake, and Apache Airflow

In this blog, we will walk through an end-to-end data engineering project using dbt Core, Snowflake, and Apache Airflow. We'll cover everything from setting up the environment to orchestrating the data pipeline, following data engineering best practices along the way.

Snowflake Setup

Create a Free Snowflake Account

If you don’t have Snowflake yet, sign up for a free trial:
🔗 https://signup.snowflake.com/

You’ll get $400 in free credits and a small compute warehouse to start.

Log in to https://app.snowflake.com/
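
If you have not yet created the warehouse, database, and schema used later in this guide, you can create them from a Snowflake worksheet. This is a minimal sketch using the names referenced throughout this post (finance_wh, finance_db, raw); the warehouse size and auto-suspend settings are assumptions you can adjust:

-- Create the warehouse, database, and schema used in this guide
CREATE WAREHOUSE IF NOT EXISTS finance_wh
  WAREHOUSE_SIZE = 'XSMALL'   -- smallest size; adjust to your workload
  AUTO_SUSPEND = 60           -- suspend after 60 seconds of inactivity
  AUTO_RESUME = TRUE;

CREATE DATABASE IF NOT EXISTS finance_db;
CREATE SCHEMA IF NOT EXISTS finance_db.raw;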

Connect dbt Core with Snowflake

This guide will walk you through configuring dbt Core to work with Snowflake so you can run SQL transformations.

Prerequisites

Before proceeding, ensure you have:

  • Snowflake Account (with a database, schema, and warehouse created)
  • Python installed (recent dbt Core releases require Python 3.8 or newer)
  • dbt Core installed in a virtual environment

Create a Dedicated dbt User in Snowflake

(A) Log in to Snowflake

Use the Snowflake Web UI or SQL client.

(B) Create a dbt User

Run the following SQL commands:

-- Create a new user for dbt
CREATE USER dbt_user PASSWORD = 'YourSecurePassword' 
LOGIN_NAME = 'dbt_user' 
DEFAULT_ROLE = ACCOUNTADMIN 
MUST_CHANGE_PASSWORD = FALSE;

-- Grant role to user
GRANT ROLE ACCOUNTADMIN TO USER dbt_user;
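
Optionally, confirm the user exists before moving on:

-- Verify the dbt user was created
SHOW USERS LIKE 'dbt_user';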

Grant Database, Schema & Warehouse Access

-- Grant access to the database
GRANT USAGE ON DATABASE finance_db TO ROLE ACCOUNTADMIN;
GRANT USAGE ON SCHEMA finance_db.raw TO ROLE ACCOUNTADMIN;

-- Grant warehouse access
GRANT USAGE ON WAREHOUSE finance_wh TO ROLE ACCOUNTADMIN;

-- Grant table access
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA finance_db.raw TO ROLE ACCOUNTADMIN;
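
Note that ACCOUNTADMIN already has full access, so the grants above are effectively no-ops; the post uses it to keep the walkthrough simple. In a real project you would typically create a dedicated, least-privilege role for dbt instead. Here is a rough sketch, using an illustrative role name dbt_role (if you adopt it, update the role in profiles.yml below accordingly):

-- Illustrative alternative: a dedicated least-privilege role for dbt
CREATE ROLE IF NOT EXISTS dbt_role;

GRANT USAGE ON WAREHOUSE finance_wh TO ROLE dbt_role;
GRANT USAGE ON DATABASE finance_db TO ROLE dbt_role;
GRANT USAGE, CREATE TABLE, CREATE VIEW ON SCHEMA finance_db.raw TO ROLE dbt_role;
GRANT SELECT ON ALL TABLES IN SCHEMA finance_db.raw TO ROLE dbt_role;
GRANT SELECT ON FUTURE TABLES IN SCHEMA finance_db.raw TO ROLE dbt_role;

-- Attach the role to the dbt user and make it the default
GRANT ROLE dbt_role TO USER dbt_user;
ALTER USER dbt_user SET DEFAULT_ROLE = dbt_role;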

Install dbt Core and Snowflake Adapter

Ensure you have dbt Core installed in a virtual environment:

(A) Create and Activate Virtual Environment

python3 -m venv dbt-env
source dbt-env/bin/activate  # (For Mac/Linux)
dbt-env\Scripts\activate     # (For Windows)

(B) Install dbt Core & Snowflake Adapter

pip install dbt-core dbt-snowflake

Configure dbt Profile for Snowflake

(A) Edit the profiles.yml File

The profiles.yml file lives in the ~/.dbt/ directory in your home folder. Open (or create) it in a text editor:

vi ~/.dbt/profiles.yml

(B) Add Snowflake Configuration to profiles.yml

Before editing the file, look up the two values you will need: the user you are connected as and your account identifier.

Get the Current User

To check which user is logged in to Snowflake:

SELECT CURRENT_USER();

Get the Current Account

To retrieve the Snowflake region and account identifier:

SELECT lower(CURRENT_REGION()), lower(CURRENT_ACCOUNT());

Now add the Snowflake configuration to profiles.yml:

ecommerce_project:
  outputs:
    dev:
      type: snowflake
      account: "<your_snowflake_account>.snowflakecomputing.com"
      user: "dbt_user"
      password: "YourSecurePassword"
      role: "ACCOUNTADMIN"
      database: "finance_db"
      warehouse: "finance_wh"
      schema: "raw"
      threads: 4
  target: dev

Test the Connection

Run dbt debug from inside your dbt project directory. If you have not created a project yet, run dbt init first; the profile name above (ecommerce_project) must match the profile referenced in your project's dbt_project.yml.

dbt debug

Define Sources

  1. Create a new file: models/sources.yml
  2. Define the source tables like this:

version: 2

sources:
  - name: raw_data
    database: finance_db    # Your Snowflake database
    schema: raw             # Schema where raw data is stored
    tables:
      - name: customers
      - name: orders
      - name: order_items
      - name: products
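
These sources assume the four raw tables already exist in finance_db.raw. If you are following along without existing data, here is a rough sketch of what the tables could look like; the column names mirror what the staging models below select, while the data types are assumptions:

-- Hypothetical raw tables; columns match what the staging models reference
CREATE TABLE IF NOT EXISTS finance_db.raw.customers (
    id INTEGER,
    name STRING,
    email STRING,
    country STRING
);

CREATE TABLE IF NOT EXISTS finance_db.raw.orders (
    id INTEGER,
    customer_id INTEGER,
    order_date DATE,
    status STRING
);

CREATE TABLE IF NOT EXISTS finance_db.raw.products (
    id INTEGER,
    name STRING,
    category STRING,
    price NUMBER(10, 2)
);

CREATE TABLE IF NOT EXISTS finance_db.raw.order_items (
    id INTEGER,
    order_id INTEGER,
    product_id INTEGER,
    quantity INTEGER,
    unit_price NUMBER(10, 2)
);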

Create Staging Models

Example: models/staging/stg_customers.sql

SELECT 
    id AS customer_id, 
    name AS customer_name,
    email, 
    country
FROM {{ source('raw_data', 'customers') }}

Example: models/staging/stg_orders.sql

SELECT 
    id AS order_id, 
    customer_id, 
    order_date, 
    status AS order_status
FROM {{ source('raw_data', 'orders') }}

Example: models/staging/stg_products.sql

SELECT
    id AS product_id,
    name AS product_name,
    category AS product_category,
    price AS product_price
FROM {{ source('raw_data', 'products') }}

Example: models/staging/stg_order_items.sql

SELECT
    id AS item_id,
    order_id,
    product_id,
    quantity,
    unit_price, 
    (quantity * unit_price) AS total_price 
FROM {{ source('raw_data', 'order_items') }}

Create Mart Models

Example: models/marts/fct_daily_orders.sql

SELECT
    O.order_date,
    O.order_id,
    SUM(OI.total_price) AS total_price
FROM {{ ref('stg_orders') }} O
LEFT JOIN {{ ref('stg_order_items') }} OI
    ON O.order_id = OI.order_id
GROUP BY 1, 2

Create Test Files

Define tests in a schema file (for example, models/schema.yml):

version: 2

models:
  - name: stg_orders
    columns:
      - name: order_status
        tests:
          - accepted_values:
              values: ['Completed','Pending', 'Cancelled']
  - name: stg_customers
    columns:
      - name: customer_id
        tests:
          - unique

Run the tests, then generate and serve the documentation site:

dbt test
dbt docs generate
dbt docs serve --port 8088
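
After the models have been built with dbt run (the Airflow DAG below automates this), you can spot-check the mart table directly in Snowflake. A quick sanity query, assuming the models landed in the schema configured in profiles.yml (raw in this example; adjust if you use custom schemas):

-- Spot-check the mart built by dbt
SELECT order_date, order_id, total_price
FROM finance_db.raw.fct_daily_orders
ORDER BY order_date DESC
LIMIT 10;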

How to Use Apache Airflow with dbt 🚀

Apache Airflow is great for scheduling and orchestrating dbt runs. Here’s how you can integrate them step by step:

Install Apache Airflow

If you haven’t installed Airflow yet, set it up using the following:

pip install apache-airflow
pip install apache-airflow-providers-snowflake  # If using Snowflake

Then, initialize Airflow:

export AIRFLOW_HOME=~/airflow
airflow db init

Create an Airflow DAG for dbt Core

A DAG (Directed Acyclic Graph) defines tasks and their dependencies. Create the Airflow DAGs folder if it does not already exist, then create a new DAG file:

mkdir -p ~/airflow/dags

nano ~/airflow/dags/dbt_dag.py

Open dbt_dag.py in a code editor and add the following:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 2, 25),  # Change as needed
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    'dbt_snowflake_workflow',
    default_args=default_args,
    description='Run dbt models using dbt Core',
    schedule_interval='@daily',  # Run daily (on Airflow 2.4+ you can use schedule='@daily' instead)
    catchup=False,
)

# Define the path to your dbt project
DBT_PROJECT_DIR = "/Users/ansamali/snowflake_data_project"  # Update this path; dbt must be on the Airflow worker's PATH, or point the bash commands at your virtualenv's dbt binary

# Task 1: Run dbt models
dbt_run = BashOperator(
    task_id='dbt_run',
    bash_command=f'cd {DBT_PROJECT_DIR} && dbt run',
    dag=dag,
)

# Task 2: Run dbt tests after models are built
dbt_test = BashOperator(
    task_id='dbt_test',
    bash_command=f'cd {DBT_PROJECT_DIR} && dbt test',
    dag=dag,
)

# Define task dependencies
dbt_run >> dbt_test  # dbt_run must finish before dbt_test starts

Start Airflow & Run the DAG

Start the scheduler and web server:

airflow scheduler &
airflow webserver &

Then open the Airflow UI (http://localhost:8080 by default), enable the dbt_snowflake_workflow DAG, and trigger a run; dbt_run will execute first, followed by dbt_test.

By following these steps, you have successfully built a scalable data pipeline using dbt, Snowflake, and Apache Airflow. You can now extend this project by adding more transformations, automating deployments, or integrating with BI tools.
