Data Quality and Testing

Data Quality Dimensions

Key aspects of data quality to monitor:

Completeness:

  • Checking for missing values (see the sketch after this list)
  • Validating required fields
  • Monitoring record counts
  • Comparing against expected totals
  • Tracking data arrival
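
The sketch below shows what such checks can look like in Python. It assumes a pandas DataFrame and hypothetical column names (order_id, customer_id); adapt the fields and thresholds to your own data:

import pandas as pd

def check_completeness(df: pd.DataFrame, required_fields: list,
                       expected_min_rows: int) -> dict:
    """Return a dict of completeness problems found in a DataFrame."""
    issues = {}

    # Record count: compare against the expected total
    if len(df) < expected_min_rows:
        issues['row_count'] = f'got {len(df)} rows, expected >= {expected_min_rows}'

    # Required fields: each must be present and fully populated
    for field in required_fields:
        if field not in df.columns:
            issues[field] = 'column missing'
        elif df[field].isnull().any():
            issues[field] = f'{int(df[field].isnull().sum())} null value(s)'

    return issues

# Hypothetical usage
orders = pd.DataFrame({'order_id': [1, 2, 3], 'customer_id': [10, None, 30]})
print(check_completeness(orders, ['order_id', 'customer_id'], expected_min_rows=2))
# -> {'customer_id': '1 null value(s)'}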

Accuracy:

  • Validating against known values
  • Cross-checking with reference data
  • Implementing business rule validation (sketched after this list)
  • Detecting anomalies and outliers
  • Verifying calculations
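
A minimal accuracy sketch in Python, assuming hypothetical quantity, unit_price, and revenue columns; the business rule and the 3-sigma outlier cutoff are illustrative, not prescriptive:

import pandas as pd

def check_accuracy(df: pd.DataFrame) -> pd.DataFrame:
    """Return rows that violate a business rule or look like outliers."""
    # Business rule: stored revenue must match the recalculated value
    expected = df['quantity'] * df['unit_price']
    bad_revenue = (df['revenue'] - expected).abs() > 0.01

    # Outlier detection: flag revenue more than 3 standard deviations from the mean
    z_scores = (df['revenue'] - df['revenue'].mean()) / df['revenue'].std()
    outliers = z_scores.abs() > 3

    return df[bad_revenue | outliers]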

Consistency:

  • Checking for contradictory values
  • Validating referential integrity (see the sketch below)
  • Ensuring uniform formats
  • Comparing across systems
  • Monitoring derived values
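
A consistency sketch in Python, assuming hypothetical orders and customers DataFrames that share a customer_id column:

import pandas as pd

def check_consistency(orders: pd.DataFrame, customers: pd.DataFrame) -> dict:
    """Report referential-integrity and format-consistency problems."""
    issues = {}

    # Referential integrity: every order must point at an existing customer
    orphaned = ~orders['customer_id'].isin(customers['customer_id'])
    if orphaned.any():
        issues['orphaned_orders'] = int(orphaned.sum())

    # Uniform formats: status values should share one casing convention
    if (orders['status'] != orders['status'].str.lower()).any():
        issues['mixed_case_status'] = True

    return issues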

Timeliness:

  • Tracking data freshness
  • Monitoring pipeline latency
  • Validating timestamp sequences (sketched after this list)
  • Alerting on delayed data
  • Measuring processing time
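
Freshness itself is covered by the observability query later in this section; the sketch below focuses on timestamp sequences, assuming hypothetical created_at, shipped_at, and delivered_at columns:

import pandas as pd

def check_timestamp_sequence(df: pd.DataFrame) -> pd.DataFrame:
    """Return rows whose event timestamps are out of order."""
    # Business expectation: created_at <= shipped_at <= delivered_at
    out_of_order = (
        (df['shipped_at'] < df['created_at']) |
        (df['delivered_at'] < df['shipped_at'])
    )
    return df[out_of_order]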

Uniqueness:

  • Detecting duplicates (see the sketch after this list)
  • Validating primary keys
  • Checking composite uniqueness constraints
  • Monitoring merge operations
  • Tracking deduplication metrics
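
A uniqueness sketch in Python; the key columns are hypothetical, and the same call handles both single-column primary keys and composite constraints:

import pandas as pd

def count_duplicates(df: pd.DataFrame, key_columns: list) -> int:
    """Count rows that violate a (possibly composite) uniqueness constraint."""
    return int(df.duplicated(subset=key_columns, keep=False).sum())

orders = pd.DataFrame({
    'order_id': [1, 2, 2, 3],
    'customer_id': [10, 20, 20, 10],
})
print(count_duplicates(orders, ['order_id']))                 # 2 (order_id 2 appears twice)
print(count_duplicates(orders, ['order_id', 'customer_id']))  # 2 (same pair repeats)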

Testing Strategies

Approaches to validating data quality:

Unit Testing:

  • Testing individual transformation functions
  • Validating business logic
  • Checking edge cases
  • Mocking dependencies
  • Automating with CI/CD

Example Python Unit Test:

import unittest
from transformations import calculate_revenue

class TestTransformations(unittest.TestCase):
    
    def test_calculate_revenue(self):
        # Test normal case
        input_data = {
            'quantity': 5,
            'unit_price': 10.0,
            'discount_percentage': 20.0
        }
        expected = 40.0  # 5 * 10 * (1 - 0.2)
        self.assertAlmostEqual(calculate_revenue(input_data), expected)
        
        # Test zero quantity
        input_data = {
            'quantity': 0,
            'unit_price': 10.0,
            'discount_percentage': 20.0
        }
        expected = 0.0
        self.assertAlmostEqual(calculate_revenue(input_data), expected)
        
        # Test no discount
        input_data = {
            'quantity': 5,
            'unit_price': 10.0,
            'discount_percentage': 0.0
        }
        expected = 50.0
        self.assertAlmostEqual(calculate_revenue(input_data), expected)

if __name__ == '__main__':
    unittest.main()

Integration Testing:

  • Testing complete data flows
  • Validating end-to-end processes
  • Using test environments (see the example after this list)
  • Simulating production scenarios
  • Checking system interactions
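
Example Integration Test (a minimal sketch; load_orders is a hypothetical end-to-end load function, and an in-memory SQLite database stands in for the test environment):

import sqlite3
import unittest

from pipeline import load_orders  # hypothetical pipeline entry point

class TestOrdersPipeline(unittest.TestCase):

    def setUp(self):
        # In-memory SQLite plays the role of the warehouse in tests
        self.conn = sqlite3.connect(':memory:')
        self.conn.execute(
            'CREATE TABLE orders (order_id INTEGER PRIMARY KEY, status TEXT)'
        )

    def tearDown(self):
        self.conn.close()

    def test_load_orders_end_to_end(self):
        raw_records = [(1, 'pending'), (2, 'shipped')]
        load_orders(raw_records, self.conn)

        row_count = self.conn.execute('SELECT COUNT(*) FROM orders').fetchone()[0]
        self.assertEqual(row_count, len(raw_records))

if __name__ == '__main__':
    unittest.main()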

Data Quality Rules:

  • Implementing schema validation
  • Defining value constraints
  • Setting threshold-based rules
  • Creating relationship rules
  • Establishing format validation

Example dbt Tests:

# schema.yml for dbt tests
version: 2

models:
  - name: orders
    description: "Cleaned orders table"
    columns:
      - name: order_id
        description: "Primary key of the orders table"
        tests:
          - unique
          - not_null
      
      - name: customer_id
        description: "Foreign key to customers table"
        tests:
          - not_null
          - relationships:
              to: ref('customers')
              field: customer_id
      
      - name: order_date
        description: "Date when the order was placed"
        tests:
          - not_null
          # accepted_range is provided by the dbt_utils package; min/max values
          # are injected into SQL as-is, so quote date literals
          - dbt_utils.accepted_range:
              min_value: "'2020-01-01'"
              max_value: "current_date"
      
      - name: status
        description: "Current status of the order"
        tests:
          - accepted_values:
              values: ['pending', 'shipped', 'delivered', 'returned', 'cancelled']

Monitoring and Alerting:

  • Setting up data quality dashboards
  • Implementing anomaly detection
  • Creating alerting thresholds (sketched after this list)
  • Tracking quality metrics over time
  • Establishing incident response procedures
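
A threshold-based alerting sketch in Python; the metric name, thresholds, and print-based notification are all placeholders for whatever alerting channel you use:

def evaluate_metric(metric_name: str, value: float,
                    warn_at: float, critical_at: float) -> str:
    """Map a quality metric (e.g. a null rate) onto an alert level."""
    if value >= critical_at:
        level = 'critical'
    elif value >= warn_at:
        level = 'warning'
    else:
        level = 'ok'
    if level != 'ok':
        # In production this would page on-call or post to a chat channel
        print(f'[{level.upper()}] {metric_name} = {value:.2%}')
    return level

# Alert when more than 1% (warning) or 5% (critical) of customer_ids are null
evaluate_metric('orders.customer_id null rate', 0.03,
                warn_at=0.01, critical_at=0.05)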

Data Observability

Gaining visibility into data systems:

Observability Pillars:

  • Freshness monitoring
  • Volume tracking (sketched after the freshness query below)
  • Schema changes
  • Lineage visualization
  • Distribution analysis

Example Freshness Monitoring Query:

-- PostgreSQL query to monitor data freshness
WITH source_freshness AS (
    SELECT
        'sales_data' AS source_name,
        MAX(created_at) AS last_record_time,
        NOW() - MAX(created_at) AS staleness,
        CASE
            WHEN NOW() - MAX(created_at) > INTERVAL '1 day' THEN 'critical'
            WHEN NOW() - MAX(created_at) > INTERVAL '6 hours' THEN 'warning'
            ELSE 'healthy'
        END AS status
    FROM raw_data.sales
    
    UNION ALL
    
    SELECT
        'customer_data' AS source_name,
        MAX(created_at) AS last_record_time,
        NOW() - MAX(created_at) AS staleness,
        CASE
            WHEN NOW() - MAX(created_at) > INTERVAL '7 days' THEN 'critical'
            WHEN NOW() - MAX(created_at) > INTERVAL '3 days' THEN 'warning'
            ELSE 'healthy'
        END AS status
    FROM raw_data.customers
)
SELECT
    source_name,
    last_record_time,
    staleness,
    status
FROM source_freshness
ORDER BY 
    CASE status
        WHEN 'critical' THEN 1
        WHEN 'warning' THEN 2
        ELSE 3
    END,
    staleness DESC;
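
Freshness is only one pillar; volume tracking follows the same pattern. A minimal sketch in Python, assuming daily row counts have already been collected (the history values and 3-sigma tolerance are illustrative):

import statistics

def check_volume(daily_counts: list, today_count: int,
                 tolerance: float = 3.0) -> str:
    """Flag today's row count if it deviates sharply from recent history."""
    mean = statistics.mean(daily_counts)
    stdev = statistics.stdev(daily_counts)
    if stdev == 0:
        return 'healthy' if today_count == mean else 'critical'
    z = abs(today_count - mean) / stdev
    return 'critical' if z > tolerance else 'healthy'

# Last seven days of row counts for a hypothetical raw_data.sales table
history = [10230, 10480, 9950, 10310, 10120, 10400, 10050]
print(check_volume(history, today_count=3200))  # critical: volume dropped sharply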

Observability Tools:

  • Great Expectations
  • Monte Carlo
  • Datadog
  • Prometheus with custom exporters
  • dbt metrics