Data Transformation

Modern ELT with dbt

Implementing analytics engineering best practices:

dbt Core Concepts:

  • Models as SQL SELECT statements
  • Modular transformation logic
  • Version-controlled transformations
  • Testing and documentation
  • Dependency management

Example dbt Model:

-- models/marts/core/dim_customers.sql
-- Note: the sort/dist configs below are Redshift-specific
{{
    config(
        materialized='table',
        sort='customer_id',
        dist='customer_id'
    )
}}

WITH customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),

orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

customer_orders AS (
    SELECT
        customer_id,
        MIN(order_date) AS first_order_date,
        MAX(order_date) AS most_recent_order_date,
        COUNT(order_id) AS number_of_orders,
        SUM(amount) AS lifetime_value
    FROM orders
    GROUP BY customer_id
),

final AS (
    SELECT
        customers.customer_id,
        customers.first_name,
        customers.last_name,
        customers.email,
        customers.created_at,
        customer_orders.first_order_date,
        customer_orders.most_recent_order_date,
        customer_orders.number_of_orders,
        customer_orders.lifetime_value
    FROM customers
    LEFT JOIN customer_orders USING (customer_id)
)

SELECT * FROM final

dbt Project Structure:

dbt_project/
├── dbt_project.yml          # Project configuration
├── packages.yml             # External dependencies
├── profiles.yml             # Connection profiles (usually kept in ~/.dbt/, not in version control)
├── README.md                # Project documentation
├── analyses/                # Ad-hoc analytical queries
├── seeds/                   # Seed data files
├── macros/                  # Reusable SQL functions
├── models/                  # SQL transformation models
│   ├── marts/               # Business-layer models
│   │   ├── core/
│   │   │   ├── dim_customers.sql
│   │   │   ├── dim_products.sql
│   │   │   ├── fct_orders.sql
│   │   │   └── schema.yml
│   │   └── marketing/
│   │       ├── customer_segmentation.sql
│   │       └── schema.yml
│   └── staging/             # Source-aligned models
│       ├── stg_customers.sql
│       ├── stg_orders.sql
│       ├── stg_products.sql
│       └── schema.yml
├── snapshots/               # Slowly changing dimension logic
└── tests/                   # Custom data tests
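
A minimal dbt_project.yml matching this layout might look like the following; the project and profile names are placeholders, not part of the original example:

# dbt_project.yml
name: 'dbt_project'
version: '1.0.0'
config-version: 2
profile: 'dbt_project'       # Must match a profile in profiles.yml

model-paths: ["models"]
seed-paths: ["seeds"]
snapshot-paths: ["snapshots"]
macro-paths: ["macros"]
test-paths: ["tests"]

models:
  dbt_project:
    staging:
      +materialized: view    # Keep staging models lightweight
    marts:
      +materialized: table   # Persist business-layer models

The per-folder +materialized settings give each layer a sensible default, which individual models can still override in their own config() block.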

dbt Best Practices:

  • Follow a consistent naming convention
  • Implement a layered architecture
  • Write modular, reusable models
  • Document models and columns
  • Test critical assumptions
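
The documentation and testing bullets above typically live in a schema.yml file next to the models. A sketch for the dim_customers example (the descriptions are illustrative):

# models/marts/core/schema.yml
version: 2

models:
  - name: dim_customers
    description: "One row per customer with order summary metrics."
    columns:
      - name: customer_id
        description: "Primary key for the customer."
        tests:
          - unique
          - not_null
      - name: lifetime_value
        description: "Sum of order amounts across the customer's orders."

Running dbt test executes the declared tests; dbt docs generate builds a browsable documentation site from the descriptions.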

Incremental Processing

Efficiently handling growing datasets:

Incremental Load Patterns:

  • Timestamp-based incremental loads
  • Change data capture (CDC)
  • Slowly changing dimensions (SCD)
  • Merge operations
  • Partitioning strategies
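
The slowly-changing-dimensions bullet maps to dbt snapshots. A minimal type-2 snapshot using the timestamp strategy (the source name and updated_at column are assumptions):

-- snapshots/customers_snapshot.sql
{% snapshot customers_snapshot %}

{{
    config(
        target_schema='snapshots',
        unique_key='customer_id',
        strategy='timestamp',
        updated_at='updated_at'
    )
}}

SELECT * FROM {{ source('crm', 'customers') }}

{% endsnapshot %}

On each dbt snapshot run, dbt compares rows by updated_at and records history in dbt_valid_from / dbt_valid_to columns.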

Example Incremental dbt Model:

-- models/events/incremental_events.sql
-- Note: the partition_by config below uses BigQuery's form;
-- other adapters take different partitioning options
{{
    config(
        materialized='incremental',
        unique_key='event_id',
        incremental_strategy='merge',
        partition_by={
            'field': 'event_date',
            'data_type': 'date'
        }
    )
}}

WITH source_data AS (
    SELECT
        event_id,
        event_type,
        user_id,
        event_timestamp,
        DATE(event_timestamp) AS event_date,
        payload
    FROM {{ source('events', 'raw_events') }}
    
    {% if is_incremental() %}
        -- Only process new or updated records when running incrementally
        WHERE event_timestamp > (
            SELECT MAX(event_timestamp) FROM {{ this }}
        )
    {% endif %}
)

SELECT
    event_id,
    event_type,
    user_id,
    event_timestamp,
    event_date,
    payload,
    {{ current_timestamp() }} AS processed_at
FROM source_data

Incremental Processing Challenges:

  • Handling late-arriving data
  • Managing schema evolution
  • Ensuring idempotent operations
  • Tracking processing metadata
  • Optimizing merge operations
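
A common mitigation for late-arriving data is to reprocess a trailing window rather than filter strictly on the maximum loaded timestamp; because the model above merges on event_id, the reprocessing stays idempotent. A sketch of the adjusted filter (the 3-day window is an arbitrary choice, and DATEADD syntax varies by warehouse):

{% if is_incremental() %}
    -- Reprocess a 3-day lookback window so late-arriving events
    -- are still captured; the merge on event_id prevents duplicates.
    WHERE event_timestamp > (
        SELECT DATEADD(day, -3, MAX(event_timestamp)) FROM {{ this }}
    )
{% endif %}

The trade-off is extra compute on each run: a wider window tolerates later data but re-merges more rows.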

Data Storage and Access Patterns