In this blog, we will walk through an end-to-end data engineering project using dbt, Snowflake, and Apache Airflow, covering everything from setting up the environment to orchestrating the data pipeline, while following data engineering best practices.
Snowflake Setup
Create a Free Snowflake Account
If you don’t have Snowflake yet, sign up for a free trial:
🔗 https://signup.snowflake.com/
You’ll get $400 in free credits and a small compute warehouse to start.
Log in to https://app.snowflake.com/
Connect dbt Core with Snowflake
This guide will walk you through configuring dbt Core to work with Snowflake so you can run SQL transformations.
Prerequisites
Before proceeding, ensure you have:
- Snowflake account (with a database, schema, and warehouse created; a setup sketch follows this list if you still need to create them)
- Python installed (current dbt Core releases require Python 3.8 or newer)
- dbt Core installed in a virtual environment
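If the database, schema, and warehouse do not exist yet, a minimal setup matching the names used later in this post (finance_db, raw, finance_wh) could look like the following; adjust names and sizing to your environment:
-- Run as a role that can create objects (e.g. SYSADMIN or ACCOUNTADMIN)
CREATE WAREHOUSE IF NOT EXISTS finance_wh
  WAREHOUSE_SIZE = 'XSMALL'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;
CREATE DATABASE IF NOT EXISTS finance_db;
CREATE SCHEMA IF NOT EXISTS finance_db.raw;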
Create a dedicated DBT User in Snowflake
(A) Log in to Snowflake
Use the Snowflake Web UI or SQL client.
(B) Create a dbt User
Run the following SQL commands:
-- Create a new user for dbt
CREATE USER dbt_user PASSWORD = 'YourSecurePassword'
LOGIN_NAME = 'dbt_user'
DEFAULT_ROLE = ACCOUNTADMIN
MUST_CHANGE_PASSWORD = FALSE;
-- Grant role to user
GRANT ROLE ACCOUNTADMIN TO USER dbt_user;
Grant Database, Schema & Warehouse Access
-- Grant access to the database
GRANT USAGE ON DATABASE finance_db TO ROLE ACCOUNTADMIN;
GRANT USAGE ON SCHEMA finance_db.raw TO ROLE ACCOUNTADMIN;
-- Grant warehouse access
GRANT USAGE ON WAREHOUSE finance_wh TO ROLE ACCOUNTADMIN;
-- Grant table access
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA finance_db.raw TO ROLE ACCOUNTADMIN;
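The grants above keep everything on ACCOUNTADMIN, which is convenient for a demo but far more privilege than dbt needs. A least-privilege sketch using a dedicated role (dbt_role is a name introduced here for illustration, not part of the original setup); if you adopt it, also set role: "dbt_role" in profiles.yml later:
-- Create a dedicated role and grant only what dbt needs
CREATE ROLE IF NOT EXISTS dbt_role;
GRANT USAGE ON WAREHOUSE finance_wh TO ROLE dbt_role;
GRANT USAGE ON DATABASE finance_db TO ROLE dbt_role;
GRANT USAGE ON ALL SCHEMAS IN DATABASE finance_db TO ROLE dbt_role;
GRANT SELECT ON ALL TABLES IN SCHEMA finance_db.raw TO ROLE dbt_role;
GRANT CREATE TABLE, CREATE VIEW ON SCHEMA finance_db.raw TO ROLE dbt_role;
GRANT ROLE dbt_role TO USER dbt_user;
ALTER USER dbt_user SET DEFAULT_ROLE = dbt_role;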
Install dbt Core and Snowflake Adapter
Ensure you have dbt Core installed in a virtual environment:
(A) Create and Activate Virtual Environment
python3 -m venv dbt-env
source dbt-env/bin/activate # (For Mac/Linux)
dbt-env\Scripts\activate # (For Windows)
(B) Install dbt Core & Snowflake Adapter
pip install dbt-core dbt-snowflake
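To confirm the install picked up both the core package and the Snowflake adapter:
dbt --version   # should list dbt-core plus the snowflake plugin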
Configure dbt Profile for Snowflake
(A) Edit the profiles.yml File
The profiles.yml file lives in ~/.dbt/ by default. Open it with your editor of choice:
vi ~/.dbt/profiles.yml
(B) Add Snowflake Configuration to profiles.yml
Before editing the file, look up the connection values you will need.
Get the Current User
To check which user is logged into Snowflake:
SELECT CURRENT_USER();
Get the Current Account
To retrieve the Snowflake account identifier and region:
SELECT lower(CURRENT_REGION()), lower(CURRENT_ACCOUNT());
Then edit profiles.yml and add the configuration:
ecommerce_project:
  outputs:
    dev:
      type: snowflake
      account: "<your_snowflake_account>"   # account identifier only (e.g. abc12345.us-east-1 or orgname-accountname), without .snowflakecomputing.com
      user: "dbt_user"
      password: "YourSecurePassword"
      role: "ACCOUNTADMIN"
      database: "finance_db"
      warehouse: "finance_wh"
      schema: "raw"
      threads: 4
  target: dev
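Hard-coding the password is acceptable for a throwaway trial account, but profiles.yml also accepts dbt's env_var() function, so a slightly safer variant (assuming you export SNOWFLAKE_PASSWORD in the shell that runs dbt) would be:
export SNOWFLAKE_PASSWORD='YourSecurePassword'   # set in your shell, not in the file
# then in profiles.yml, replace the password line with:
password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"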
Test the Connection
Run dbt debug to verify the profile and the connection to Snowflake:
dbt debug
Define Sources
- Create a new file: models/sources.yml
- Define the source tables like this:
version: 2
sources:
  - name: raw_data
    database: finance_db   # Your Snowflake database
    schema: raw            # Schema where raw data is stored
    tables:
      - name: customers
      - name: orders
      - name: order_items
      - name: products
Create Staging Models
Example: models/staging/stg_customers.sql
SELECT
id AS customer_id,
name AS customer_name,
email,
country
FROM {{ source('raw_data', 'customers') }}
Example: models/staging/stg_orders.sql
SELECT
id AS order_id,
customer_id,
order_date,
status AS order_status
FROM {{ source('raw_data', 'orders') }}
Example: models/staging/stg_products.sql
SELECT
id AS product_id,
name AS product_name,
category AS product_category,
price AS product_price
FROM {{ source('raw_data', 'products') }}
Example: models/staging/stg_order_items.sql
SELECT
id AS item_id,
order_id,
product_id,
quantity,
unit_price,
(quantity * unit_price) AS total_price
FROM {{ source('raw_data', 'order_items') }}
Example: models/marts/fct_daily_orders.sql
SELECT
    O.order_date,
    O.order_id,
    SUM(OI.total_price) AS total_price
FROM {{ ref('stg_orders') }} O
LEFT JOIN {{ ref('stg_order_items') }} OI
    ON O.order_id = OI.order_id
GROUP BY 1, 2
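The walkthrough jumps from models to tests next, but the models have to be built first with dbt run. A few useful invocations (the --select flags use dbt's standard node-selection syntax):
dbt run                              # build every model in the project
dbt run --select staging             # build only the models under models/staging
dbt run --select fct_daily_orders    # build a single model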
Create Test Files
Add the tests in a YAML file (for example, models/staging/schema.yml):
version: 2
models:
  - name: stg_orders
    columns:
      - name: order_status
        tests:
          - accepted_values:
              values: ['Completed', 'Pending', 'Cancelled']
  - name: stg_customers
    columns:
      - name: customer_id
        tests:
          - unique
Run the tests, then generate and serve the documentation site:
dbt test
dbt docs generate
dbt docs serve --port 8088
How to Use Apache Airflow with dbt 🚀
Apache Airflow is great for scheduling and orchestrating dbt runs. Here’s how you can integrate them step by step:
Install Apache Airflow
If you haven’t installed Airflow yet, set it up using the following:
pip install apache-airflow
pip install apache-airflow-providers-snowflake # If using Snowflake
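Note that the Airflow docs recommend installing with a constraints file so transitive dependencies stay compatible; a sketch, assuming Airflow 2.9.3 on Python 3.10 (swap in your versions):
AIRFLOW_VERSION=2.9.3
PYTHON_VERSION=3.10
pip install "apache-airflow==${AIRFLOW_VERSION}" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"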
Then, initialize Airflow:
export AIRFLOW_HOME=~/airflow
airflow db init
Create an Airflow DAG for dbt Core
A DAG (Directed Acyclic Graph) defines tasks and their dependencies.
Create a folder for your DAGs and a new DAG file:
mkdir -p ~/airflow/dags
nano ~/airflow/dags/dbt_dag.py
Open dbt_dag.py in a code editor and add the following:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 2, 25),  # Change as needed
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    'dbt_snowflake_workflow',
    default_args=default_args,
    description='Run dbt models using dbt Core',
    schedule_interval='@daily',  # Run daily
    catchup=False,
)

# Define the path to your dbt project
DBT_PROJECT_DIR = "/Users/ansamali/snowflake_data_project"  # Update this path

# Task 1: Run dbt models
dbt_run = BashOperator(
    task_id='dbt_run',
    bash_command=f'cd {DBT_PROJECT_DIR} && dbt run',
    dag=dag,
)

# Task 2: Run dbt tests after models are built
dbt_test = BashOperator(
    task_id='dbt_test',
    bash_command=f'cd {DBT_PROJECT_DIR} && dbt test',
    dag=dag,
)

# Define task dependencies
dbt_run >> dbt_test  # dbt_run must finish before dbt_test starts
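Before scheduling anything, it helps to confirm that Airflow can import the DAG and that the tasks actually run; a quick check with standard Airflow CLI commands (run them from the environment where dbt is installed):
python ~/airflow/dags/dbt_dag.py                      # import check: prints nothing if the file parses
airflow dags list | grep dbt_snowflake_workflow       # the DAG should appear once Airflow has parsed the dags folder
airflow tasks list dbt_snowflake_workflow             # should show dbt_run and dbt_test
airflow dags test dbt_snowflake_workflow 2024-02-25   # run the whole DAG once, without the scheduler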
Start Airflow & Run the DAG
Start the scheduler and web server:
airflow scheduler &
airflow webserver &
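Then open the Airflow UI (http://localhost:8080 by default) to unpause and trigger the DAG, or do the same from the command line:
airflow dags unpause dbt_snowflake_workflow
airflow dags trigger dbt_snowflake_workflow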
By following these steps, you have successfully built a scalable data pipeline using dbt, Snowflake, and Apache Airflow. You can now extend this project by adding more transformations, automating deployments, or integrating with BI tools.