Data Transformation
Modern ELT with dbt
Implementing analytics engineering best practices:
dbt Core Concepts:
- Models as SQL SELECT statements
- Modular transformation logic
- Version-controlled transformations
- Testing and documentation
- Dependency management
Example dbt Model:
-- models/marts/core/dim_customers.sql
{{
    config(
        materialized='table',
        sort='customer_id',
        dist='customer_id'
    )
}}

WITH customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),

orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

customer_orders AS (
    SELECT
        customer_id,
        MIN(order_date) AS first_order_date,
        MAX(order_date) AS most_recent_order_date,
        COUNT(order_id) AS number_of_orders,
        SUM(amount) AS lifetime_value
    FROM orders
    GROUP BY customer_id
),

final AS (
    SELECT
        customers.customer_id,
        customers.first_name,
        customers.last_name,
        customers.email,
        customers.created_at,
        customer_orders.first_order_date,
        customer_orders.most_recent_order_date,
        customer_orders.number_of_orders,
        customer_orders.lifetime_value
    FROM customers
    LEFT JOIN customer_orders USING (customer_id)
)

SELECT * FROM final
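The "Testing and documentation" concept above is expressed in a schema.yml file that sits beside each model. A minimal sketch for dim_customers, assuming the column names from the model above (descriptions are illustrative):

```yaml
# models/marts/core/schema.yml -- illustrative sketch
version: 2

models:
  - name: dim_customers
    description: "One row per customer, enriched with order history."
    columns:
      - name: customer_id
        description: "Primary key for the customer dimension."
        tests:
          - unique
          - not_null
      - name: lifetime_value
        description: "Sum of all order amounts for the customer."
```

Running `dbt test` compiles each declared test into a SQL query that fails if any rows violate the assumption.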
dbt Project Structure:
dbt_project/
├── dbt_project.yml       # Project configuration
├── packages.yml          # External dependencies
├── profiles.yml          # Connection profiles
├── README.md             # Project documentation
├── analysis/             # Ad-hoc analytical queries
├── data/                 # Seed data files
├── macros/               # Reusable SQL functions
├── models/               # SQL transformation models
│   ├── marts/            # Business-layer models
│   │   ├── core/
│   │   │   ├── dim_customers.sql
│   │   │   ├── dim_products.sql
│   │   │   ├── fct_orders.sql
│   │   │   └── schema.yml
│   │   └── marketing/
│   │       ├── customer_segmentation.sql
│   │       └── schema.yml
│   └── staging/          # Source-aligned models
│       ├── stg_customers.sql
│       ├── stg_orders.sql
│       ├── stg_products.sql
│       └── schema.yml
├── snapshots/            # Slowly changing dimension logic
└── tests/                # Custom data tests
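The dbt_project.yml at the root ties this layout together: it names the project, points dbt at the directories above, and sets layer-level defaults. A hedged fragment, assuming dbt 1.0+ configuration keys and the project/profile name shown in the tree:

```yaml
# dbt_project.yml -- illustrative fragment
name: 'dbt_project'
version: '1.0.0'
profile: 'dbt_project'

model-paths: ["models"]
seed-paths: ["data"]
snapshot-paths: ["snapshots"]

models:
  dbt_project:
    staging:
      +materialized: view     # lightweight, source-aligned layer
    marts:
      +materialized: table    # business layer, queried by BI tools
```

Directory-level `+materialized` defaults can still be overridden per model with a `config()` block, as in the dim_customers example above.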
dbt Best Practices:
- Follow a consistent naming convention (e.g., stg_, dim_, fct_ prefixes)
- Implement a layered architecture (staging models feeding marts)
- Write modular, reusable models
- Document models and columns in schema.yml files
- Test critical assumptions such as uniqueness and non-null keys
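The layered architecture starts with staging models that do light cleanup only: renaming, casting, and standardizing, with no joins or aggregation. A sketch of what the stg_customers.sql referenced above might contain (the source name and raw column names are assumptions):

```sql
-- models/staging/stg_customers.sql -- illustrative sketch
WITH source AS (
    SELECT * FROM {{ source('crm', 'customers') }}
)

SELECT
    id AS customer_id,        -- rename to the project-wide convention
    first_name,
    last_name,
    LOWER(email) AS email,    -- light standardization only
    created_at
FROM source
```

Keeping staging models this thin means every downstream model, such as dim_customers, can `ref()` a single consistent representation of each source table.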
Incremental Processing
Efficiently handling growing datasets:
Incremental Load Patterns:
- Timestamp-based incremental loads
- Change data capture (CDC)
- Slowly changing dimensions (SCD)
- Merge operations
- Partitioning strategies
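The "Slowly changing dimensions" pattern listed above maps to dbt snapshots, which record row versions over time with validity timestamps (SCD Type 2). A sketch using the timestamp strategy, assuming the source table carries an updated_at column:

```sql
-- snapshots/customers_snapshot.sql -- illustrative sketch
{% snapshot customers_snapshot %}

{{
    config(
        target_schema='snapshots',
        unique_key='customer_id',
        strategy='timestamp',
        updated_at='updated_at'
    )
}}

SELECT * FROM {{ source('crm', 'customers') }}

{% endsnapshot %}
```

On each `dbt snapshot` run, rows whose updated_at has advanced are closed out and re-inserted, giving dbt_valid_from/dbt_valid_to ranges to query history against.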
Example Incremental dbt Model:
-- models/events/incremental_events.sql
{{
    config(
        materialized='incremental',
        unique_key='event_id',
        incremental_strategy='merge',
        partition_by={
            'field': 'event_date',
            'data_type': 'date'
        }
    )
}}

WITH source_data AS (
    SELECT
        event_id,
        event_type,
        user_id,
        event_timestamp,
        DATE(event_timestamp) AS event_date,
        payload
    FROM {{ source('events', 'raw_events') }}

    {% if is_incremental() %}
    -- Only process new or updated records when running incrementally
    WHERE event_timestamp > (
        SELECT MAX(event_timestamp) FROM {{ this }}
    )
    {% endif %}
)

SELECT
    event_id,
    event_type,
    user_id,
    event_timestamp,
    event_date,
    payload,
    {{ current_timestamp() }} AS processed_at
FROM source_data
Incremental Processing Challenges:
- Handling late-arriving data
- Managing schema evolution
- Ensuring idempotent operations
- Tracking processing metadata
- Optimizing merge operations
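A common mitigation for late-arriving data is a lookback window: instead of filtering strictly on everything after MAX(event_timestamp), reprocess a trailing period on every run. A sketch of the filter, swapped into the incremental model above (the 3-day window is an assumption, and the DATEADD syntax is warehouse-specific, shown here in Snowflake/Redshift style):

```sql
{% if is_incremental() %}
-- Reprocess a trailing window so late-arriving events are picked up;
-- the merge on event_id keeps re-loading the window idempotent.
WHERE event_timestamp > (
    SELECT DATEADD(day, -3, MAX(event_timestamp)) FROM {{ this }}
)
{% endif %}
```

This trades some redundant work for correctness: rows inside the window are rewritten via the merge rather than duplicated, which addresses both the late-arrival and idempotency challenges listed above.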