Technical Implementation

Database Design for Popup Analytics and Management: Complete Architecture Guide

Master comprehensive database design for popup analytics and management systems. Learn schema design, time-series data modeling, performance optimization, and scaling strategies for high-volume popup data.

Michael Rodriguez
Senior Database Architect at Nudgesmart
January 15, 2024
20 min read

Technical Implementation Article

Important Notice: This content is for educational purposes only. Results may vary based on your specific business circumstances, industry, market conditions, and implementation. No specific outcomes are guaranteed. This is not legal advice - consult with technical professionals for specific guidance.

Database Design for Popup Analytics and Management: Complete Architecture Guide

Effective popup analytics and management require a robust database architecture capable of handling high-volume data collection, real-time analytics, and complex query patterns. A well-designed database system serves as the foundation for popup performance tracking, user behavior analysis, and campaign optimization. This comprehensive guide explores the intricacies of database design for popup systems, from schema architecture and data modeling to performance optimization and scaling strategies.

Modern popup applications generate vast amounts of data that must be efficiently collected, stored, and analyzed. Every user interaction, campaign impression, and conversion event creates data points that need to be captured and processed. Designing a database architecture that can handle this volume while maintaining performance and data integrity is crucial for building scalable popup analytics systems.

Fundamental Database Architecture

Multi-Database Strategy

Implement a polyglot persistence approach for optimal performance:

  • Primary Database (PostgreSQL/MySQL): Structured data for campaigns and configurations
  • Time-Series Database (InfluxDB/TimescaleDB): Analytics and metrics storage
  • Document Store (MongoDB): Flexible schema for dynamic content
  • Cache Layer (Redis): Real-time data and session storage
  • Data Warehouse (Snowflake/BigQuery): Long-term analytics and reporting

Database Schema Overview

Core database structure for popup management:

-- Campaign Management Schema
CREATE TABLE campaigns (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    type VARCHAR(50) NOT NULL, -- popup, slide-in, notification
    status VARCHAR(20) DEFAULT 'draft', -- draft, active, paused, archived
    shop_id VARCHAR(255) NOT NULL,
    config JSONB NOT NULL,
    targeting_rules JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    starts_at TIMESTAMP WITH TIME ZONE,
    ends_at TIMESTAMP WITH TIME ZONE
);

-- PostgreSQL does not support inline INDEX clauses; create indexes separately
CREATE INDEX idx_campaigns_shop_id ON campaigns (shop_id);
CREATE INDEX idx_campaigns_status ON campaigns (status);
CREATE INDEX idx_campaigns_type ON campaigns (type);

-- Campaign Variants for A/B Testing
CREATE TABLE campaign_variants (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    campaign_id UUID NOT NULL REFERENCES campaigns(id) ON DELETE CASCADE,
    name VARCHAR(255) NOT NULL,
    traffic_allocation INTEGER DEFAULT 50, -- Percentage of traffic
    config JSONB NOT NULL,
    is_control BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    UNIQUE(campaign_id, name)
);

-- Targeting Rules
CREATE TABLE targeting_rules (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    campaign_id UUID NOT NULL REFERENCES campaigns(id) ON DELETE CASCADE,
    rule_type VARCHAR(50) NOT NULL, -- device, location, behavior, custom
    conditions JSONB NOT NULL,
    priority INTEGER DEFAULT 0,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

Analytics Tables Structure

Time-series optimized analytics schema:

-- Campaign Impressions
-- Note: a partitioned table's primary key must include the partition column
CREATE TABLE campaign_impressions (
    id BIGSERIAL,
    campaign_id UUID NOT NULL,
    variant_id UUID,
    session_id VARCHAR(255) NOT NULL,
    user_id VARCHAR(255),
    shop_id VARCHAR(255) NOT NULL,
    page_url TEXT,
    device_type VARCHAR(50),
    browser VARCHAR(100),
    country_code VARCHAR(2),
    timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    metadata JSONB,
    PRIMARY KEY (id, timestamp)
) PARTITION BY RANGE (timestamp);

CREATE INDEX idx_impressions_campaign_time ON campaign_impressions (campaign_id, timestamp);
CREATE INDEX idx_impressions_session ON campaign_impressions (session_id);
CREATE INDEX idx_impressions_shop_time ON campaign_impressions (shop_id, timestamp);

-- Create monthly partitions
CREATE TABLE campaign_impressions_2024_01 PARTITION OF campaign_impressions
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

-- User Interactions
CREATE TABLE campaign_interactions (
    id BIGSERIAL,
    campaign_id UUID NOT NULL,
    variant_id UUID,
    session_id VARCHAR(255) NOT NULL,
    interaction_type VARCHAR(50) NOT NULL, -- view, click, close, submit
    interaction_data JSONB,
    timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    time_to_interaction INTEGER, -- Milliseconds from impression
    PRIMARY KEY (id, timestamp)
) PARTITION BY RANGE (timestamp);

CREATE INDEX idx_interactions_campaign_time ON campaign_interactions (campaign_id, timestamp);
CREATE INDEX idx_interactions_type ON campaign_interactions (interaction_type);
CREATE INDEX idx_interactions_session ON campaign_interactions (session_id);

-- Conversion Events
CREATE TABLE campaign_conversions (
    id BIGSERIAL,
    campaign_id UUID NOT NULL,
    variant_id UUID,
    session_id VARCHAR(255) NOT NULL,
    conversion_type VARCHAR(50) NOT NULL, -- email_signup, purchase, download
    conversion_value DECIMAL(10,2),
    conversion_data JSONB,
    attribution_window INTEGER DEFAULT 7, -- Days
    timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    PRIMARY KEY (id, timestamp)
) PARTITION BY RANGE (timestamp);

CREATE INDEX idx_conversions_campaign_time ON campaign_conversions (campaign_id, timestamp);
CREATE INDEX idx_conversions_type ON campaign_conversions (conversion_type);
CREATE INDEX idx_conversions_session ON campaign_conversions (session_id);
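
If you stay with native range partitioning (rather than the TimescaleDB hypertables shown later), new monthly partitions must exist before data arrives. A minimal helper sketch for pre-creating them, assuming the table_YYYY_MM naming convention used above and a scheduler such as pg_cron to invoke it:

-- Hypothetical helper: pre-create next month's partition for a given parent table
CREATE OR REPLACE FUNCTION create_next_month_partition(parent_table TEXT)
RETURNS void AS $$
DECLARE
    start_date DATE := date_trunc('month', NOW() + INTERVAL '1 month')::DATE;
    end_date DATE := (start_date + INTERVAL '1 month')::DATE;
    partition_name TEXT := parent_table || '_' || to_char(start_date, 'YYYY_MM');
BEGIN
    EXECUTE format(
        'CREATE TABLE IF NOT EXISTS %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)',
        partition_name, parent_table, start_date, end_date
    );
END;
$$ LANGUAGE plpgsql;

-- Example: create the upcoming partitions for the three event tables
SELECT create_next_month_partition('campaign_impressions');
SELECT create_next_month_partition('campaign_interactions');
SELECT create_next_month_partition('campaign_conversions');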

Time-Series Data Modeling

Metrics Collection Strategy

Implement efficient metrics collection:

-- Time-Series Metrics Table
CREATE TABLE campaign_metrics (
    time_bucket TIMESTAMP WITH TIME ZONE NOT NULL,
    campaign_id UUID NOT NULL,
    metric_name VARCHAR(100) NOT NULL,
    metric_value NUMERIC NOT NULL,
    tags JSONB,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    PRIMARY KEY (time_bucket, campaign_id, metric_name)
);

-- Real-time Metrics Aggregation
CREATE MATERIALIZED VIEW hourly_campaign_metrics AS
SELECT
    date_trunc('hour', i.timestamp) as time_bucket,
    i.campaign_id,
    COUNT(DISTINCT i.id) as impressions,
    COUNT(DISTINCT i.session_id) as unique_sessions,
    COUNT(CASE WHEN ci.interaction_type = 'click' THEN 1 END) as clicks,
    COUNT(CASE WHEN ci.interaction_type = 'submit' THEN 1 END) as submissions,
    AVG(ci.time_to_interaction) as avg_time_to_interaction
FROM campaign_impressions i
LEFT JOIN campaign_interactions ci
    ON ci.campaign_id = i.campaign_id
    AND ci.session_id = i.session_id
GROUP BY date_trunc('hour', i.timestamp), i.campaign_id;

-- Create unique index for concurrent refreshes
CREATE UNIQUE INDEX idx_hourly_metrics_unique
ON hourly_campaign_metrics (time_bucket, campaign_id);

-- Function to refresh materialized view
CREATE OR REPLACE FUNCTION refresh_hourly_metrics()
RETURNS void AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY hourly_campaign_metrics;
END;
$$ LANGUAGE plpgsql;

Data Aggregation Patterns

Implement multi-level aggregation for performance:

  • Raw Events: Individual user interactions (short-term retention before rollup; see the retention sketch after this list)
  • Minute-level: Real-time dashboard data (24-hour retention)
  • Hourly: Detailed analytics (30-day retention)
  • Daily: Historical reporting (1-year retention)
  • Monthly: Long-term trends (7-year retention)
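
On TimescaleDB, these tiers map onto retention and compression policies. A minimal sketch, assuming the hypertables and the daily_campaign_stats continuous aggregate introduced in the next subsection; the intervals are illustrative, not prescriptive:

-- Keep raw event rows for 90 days, then drop old chunks automatically
SELECT add_retention_policy('campaign_impressions', INTERVAL '90 days');
SELECT add_retention_policy('campaign_interactions', INTERVAL '90 days');

-- Keep the daily rollup much longer than the raw events
SELECT add_retention_policy('daily_campaign_stats', INTERVAL '1 year');

-- Compress older raw chunks to cut storage for the hourly/daily tiers
ALTER TABLE campaign_interactions SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'campaign_id'
);
SELECT add_compression_policy('campaign_interactions', INTERVAL '7 days');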

Time-Series Optimization

Optimize time-series queries and storage:

-- Time-Series Extensions (PostgreSQL)
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;

-- Convert tables to hypertables (on a TimescaleDB deployment, create these as
-- hypertables instead of the manually range-partitioned tables shown earlier;
-- create_hypertable cannot be applied to an already-partitioned table)
SELECT create_hypertable('campaign_impressions', 'timestamp');
SELECT create_hypertable('campaign_interactions', 'timestamp');
SELECT create_hypertable('campaign_conversions', 'timestamp');

-- Create continuous aggregates for performance
CREATE MATERIALIZED VIEW daily_campaign_stats
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 day', timestamp) as day,
    campaign_id,
    COUNT(*) as interactions,
    COUNT(DISTINCT session_id) as unique_visitors,
    COUNT(CASE WHEN interaction_type = 'click' THEN 1 END) as clicks,
    COUNT(CASE WHEN interaction_type = 'submit' THEN 1 END) as submissions,
    AVG(CASE WHEN interaction_type = 'click' THEN time_to_interaction END) as avg_click_time
FROM campaign_interactions
GROUP BY day, campaign_id;

-- Set refresh policy
SELECT add_continuous_aggregate_policy('daily_campaign_stats',
    start_offset => INTERVAL '1 month',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');
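
Once the policy is in place, dashboards can read the continuous aggregate like an ordinary table; for example, the last 14 days for a single campaign (the UUID below is a placeholder):

-- Query the continuous aggregate directly (kept fresh by the policy above)
SELECT day, interactions, unique_visitors, clicks, submissions
FROM daily_campaign_stats
WHERE campaign_id = '00000000-0000-0000-0000-000000000000' -- placeholder UUID
  AND day >= NOW() - INTERVAL '14 days'
ORDER BY day;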

User Interaction Tracking

Session Management

Implement robust session tracking:

-- User Sessions Table
CREATE TABLE user_sessions (
    session_id VARCHAR(255) PRIMARY KEY,
    shop_id VARCHAR(255) NOT NULL,
    user_id VARCHAR(255),
    first_seen TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_seen TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    page_views INTEGER DEFAULT 1,
    total_time INTEGER DEFAULT 0, -- Seconds
    device_type VARCHAR(50),
    browser VARCHAR(100),
    country_code VARCHAR(2),
    ip_address INET, -- Retained so the anonymization routine below can clear it
    referrer TEXT,
    utm_source VARCHAR(255),
    utm_medium VARCHAR(255),
    utm_campaign VARCHAR(255),
    custom_attributes JSONB
);

CREATE INDEX idx_sessions_shop_time ON user_sessions (shop_id, last_seen);
CREATE INDEX idx_sessions_user ON user_sessions (user_id);
CREATE INDEX idx_sessions_device ON user_sessions (device_type);

-- Session Events
CREATE TABLE session_events (
    id BIGSERIAL,
    session_id VARCHAR(255) NOT NULL REFERENCES user_sessions(session_id),
    event_type VARCHAR(100) NOT NULL, -- page_view, popup_view, click, scroll
    event_data JSONB,
    page_url TEXT,
    timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    PRIMARY KEY (id, timestamp)
) PARTITION BY RANGE (timestamp);

CREATE INDEX idx_events_session_time ON session_events (session_id, timestamp);
CREATE INDEX idx_events_type ON session_events (event_type);

-- Update session activity
CREATE OR REPLACE FUNCTION update_session_activity()
RETURNS TRIGGER AS $$
BEGIN
    UPDATE user_sessions
    SET
        last_seen = NEW.timestamp,
        page_views = page_views + 1,
        total_time = EXTRACT(EPOCH FROM (NEW.timestamp - last_seen)) + total_time
    WHERE session_id = NEW.session_id;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Create trigger for automatic session updates
CREATE TRIGGER trigger_update_session_activity
    AFTER INSERT ON session_events
    FOR EACH ROW
    EXECUTE FUNCTION update_session_activity();

Behavior Tracking

Capture detailed user behavior patterns:

-- User Behavior Events
CREATE TABLE behavior_events (
    id BIGSERIAL PRIMARY KEY,
    session_id VARCHAR(255) NOT NULL,
    user_id VARCHAR(255),
    event_category VARCHAR(100) NOT NULL, -- engagement, navigation, conversion
    event_action VARCHAR(100) NOT NULL, -- scroll_depth, time_on_page, exit_intent
    event_value NUMERIC,
    context JSONB,
    timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_behavior_session_time ON behavior_events (session_id, timestamp);
CREATE INDEX idx_behavior_category ON behavior_events (event_category);
CREATE INDEX idx_behavior_action ON behavior_events (event_action);

-- Scroll Depth Tracking
CREATE TABLE scroll_events (
    id BIGSERIAL PRIMARY KEY,
    session_id VARCHAR(255) NOT NULL,
    page_url TEXT NOT NULL,
    max_scroll_depth INTEGER, -- Percentage
    scroll_events JSONB, -- Detailed scroll timestamps
    timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_scroll_session ON scroll_events (session_id);
CREATE INDEX idx_scroll_depth ON scroll_events (max_scroll_depth);

-- Exit Intent Detection
CREATE TABLE exit_intent_events (
    id BIGSERIAL PRIMARY KEY,
    session_id VARCHAR(255) NOT NULL,
    page_url TEXT NOT NULL,
    mouse_trajectory JSONB, -- Mouse position history
    exit_velocity INTEGER, -- Pixels per second
    trigger_element VARCHAR(255), -- Element the mouse was heading toward
    timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_exit_session ON exit_intent_events (session_id);
CREATE INDEX idx_exit_velocity ON exit_intent_events (exit_velocity);

Data Privacy and Anonymization

Implement GDPR-compliant data handling:

-- User Consent Management
CREATE TABLE user_consent (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id VARCHAR(255),
    session_id VARCHAR(255),
    consent_type VARCHAR(100) NOT NULL, -- analytics, marketing, essential
    granted BOOLEAN NOT NULL,
    granted_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    revoked_at TIMESTAMP WITH TIME ZONE,
    ip_address INET,
    user_agent TEXT,
    consent_version VARCHAR(50) DEFAULT '1.0'
);

CREATE INDEX idx_consent_user ON user_consent (user_id);
CREATE INDEX idx_consent_session ON user_consent (session_id);
CREATE INDEX idx_consent_type ON user_consent (consent_type);

-- Data Anonymization Function
CREATE OR REPLACE FUNCTION anonymize_user_data(user_identifier VARCHAR(255))
RETURNS void AS $$
BEGIN
    -- Anonymize personal data in sessions
    UPDATE user_sessions
    SET
        user_id = CONCAT('anon_', MD5(user_id)),
        ip_address = NULL,
        referrer = NULL,
        utm_source = NULL,
        utm_medium = NULL,
        utm_campaign = NULL
    WHERE user_id = user_identifier OR session_id = user_identifier;

    -- Remove PII from events
    UPDATE session_events
    SET event_data = event_data - 'email' - 'name' - 'phone' - 'address'
    WHERE session_id IN (
        SELECT session_id FROM user_sessions
        WHERE user_id = CONCAT('anon_', MD5(user_identifier))
    );

    -- Log the anonymization action (assumes an audit_log table exists elsewhere)
    INSERT INTO audit_log (action, entity_type, entity_id, timestamp)
    VALUES ('anonymize', 'user_data', user_identifier, NOW());
END;
$$ LANGUAGE plpgsql;
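
Consent should also gate ingestion, not just retroactive anonymization. A minimal sketch of such a guard built on the user_consent table above (the function name is illustrative):

-- Hypothetical helper: true only if the session has granted analytics consent
CREATE OR REPLACE FUNCTION has_analytics_consent(p_session_id VARCHAR(255))
RETURNS BOOLEAN AS $$
    SELECT EXISTS (
        SELECT 1
        FROM user_consent
        WHERE session_id = p_session_id
          AND consent_type = 'analytics'
          AND granted = TRUE
          AND revoked_at IS NULL
    );
$$ LANGUAGE sql STABLE;

-- Example: only record the event when consent is present
-- INSERT INTO behavior_events (session_id, event_category, event_action)
-- SELECT 'sess_123', 'engagement', 'scroll_depth'
-- WHERE has_analytics_consent('sess_123');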

Performance Optimization

Database Indexing Strategy

Implement comprehensive indexing for query performance:

-- Performance Indexes
-- Composite indexes for common query patterns
CREATE INDEX idx_campaigns_shop_status_type ON campaigns(shop_id, status, type);
CREATE INDEX idx_campaigns_active_dates ON campaigns(status, starts_at, ends_at)
    WHERE status IN ('active', 'paused');

-- Partial indexes for filtered queries (index predicates must be immutable,
-- so NOW()-based date filtering belongs in the query, not the predicate)
CREATE INDEX idx_active_campaigns ON campaigns(id, shop_id)
    WHERE status = 'active';

-- JSONB indexes for configuration queries
CREATE INDEX idx_campaign_config_gin ON campaigns USING GIN(config);
CREATE INDEX idx_targeting_rules_gin ON targeting_rules USING GIN(conditions);

-- Time-series indexes with BRIN for large tables
CREATE INDEX idx_impressions_time_brin ON campaign_impressions USING BRIN(timestamp);
CREATE INDEX idx_interactions_time_brin ON campaign_interactions USING BRIN(timestamp);

-- Expression indexes for calculated fields
CREATE INDEX idx_campaigns_search ON campaigns USING GIN(to_tsvector('english', name || ' ' || COALESCE(config->>'description', '')));

-- Covering indexes for dashboard queries
CREATE INDEX idx_dashboard_campaign_stats ON campaign_impressions(campaign_id, timestamp, session_id)
    INCLUDE (device_type, country_code, page_url);
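
It is worth confirming the planner actually uses these indexes on the hot dashboard paths; an EXPLAIN ANALYZE over a representative query (placeholder UUID below) is usually enough:

-- Verify index usage for a typical dashboard lookup
EXPLAIN (ANALYZE, BUFFERS)
SELECT campaign_id, COUNT(*) AS impressions
FROM campaign_impressions
WHERE campaign_id = '00000000-0000-0000-0000-000000000000' -- placeholder UUID
  AND timestamp >= NOW() - INTERVAL '7 days'
GROUP BY campaign_id;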

Query Optimization Techniques

Optimize common analytical queries:

-- Optimized Analytics Queries
-- Campaign Performance Dashboard
WITH campaign_stats AS (
    SELECT
        c.id,
        c.name,
        COUNT(DISTINCT i.session_id) as total_impressions,
        COUNT(DISTINCT CASE WHEN ci.interaction_type = 'click' THEN i.session_id END) as unique_clicks,
        COUNT(DISTINCT CASE WHEN ci.interaction_type = 'submit' THEN i.session_id END) as conversions,
        ROUND(
            COUNT(DISTINCT CASE WHEN ci.interaction_type = 'click' THEN i.session_id END)::NUMERIC /
            NULLIF(COUNT(DISTINCT i.session_id), 0) * 100, 2
        ) as ctr_percentage
    FROM campaigns c
    LEFT JOIN campaign_impressions i ON c.id = i.campaign_id
        AND i.timestamp >= NOW() - INTERVAL '30 days'
    LEFT JOIN campaign_interactions ci ON ci.campaign_id = c.id
        AND ci.session_id = i.session_id
        AND ci.interaction_type IN ('click', 'submit')
    WHERE c.shop_id = $1 AND c.status = 'active'
    GROUP BY c.id, c.name
)
SELECT
    id,
    name,
    total_impressions,
    unique_clicks,
    conversions,
    ctr_percentage,
    CASE
        WHEN total_impressions >= 1000 THEN 'high_traffic'
        WHEN total_impressions >= 100 THEN 'medium_traffic'
        ELSE 'low_traffic'
    END as traffic_category
FROM campaign_stats
ORDER BY total_impressions DESC;

-- Real-time Metrics with Caching
CREATE OR REPLACE FUNCTION get_campaign_metrics(campaign_uuid UUID, time_window INTERVAL)
RETURNS JSONB AS $$
DECLARE
    result JSONB;
    cache_key TEXT;
BEGIN
    cache_key := 'campaign_metrics_' || campaign_uuid || '_' || EXTRACT(EPOCH FROM time_window);

    -- Try cache first (cache_get / cache_set here are assumed to be
    -- application-defined helper functions, not PostgreSQL built-ins)
    result := cache_get(cache_key);
    IF result IS NOT NULL THEN
        RETURN result;
    END IF;

    -- Calculate metrics
    SELECT jsonb_build_object(
        'interactions', COUNT(*),
        'unique_sessions', COUNT(DISTINCT session_id),
        'clicks', COUNT(CASE WHEN interaction_type = 'click' THEN 1 END),
        'submissions', COUNT(CASE WHEN interaction_type = 'submit' THEN 1 END),
        'avg_time_to_interaction', AVG(time_to_interaction),
        'last_updated', NOW()
    ) INTO result
    FROM campaign_interactions
    WHERE campaign_id = campaign_uuid
    AND timestamp >= NOW() - time_window;

    -- Cache for 5 minutes
    PERFORM cache_set(cache_key, result, 300);

    RETURN result;
END;
$$ LANGUAGE plpgsql;

Connection Pooling and Load Balancing

Implement high-availability database architecture:

// Read Replica Configuration
// Application-level read/write splitting (Node.js, using node-postgres Pool)
const { Pool } = require('pg');

class DatabaseManager {
    constructor(config) {
        this.primaryPool = new Pool({
            host: config.primary.host,
            port: config.primary.port,
            database: config.database,
            user: config.user,
            password: config.password,
            max: 20, // Maximum connections
            idleTimeoutMillis: 30000,
            connectionTimeoutMillis: 2000,
        });

        this.replicaPools = config.replicas.map(replica =>
            new Pool({
                host: replica.host,
                port: replica.port,
                database: config.database,
                user: config.user,
                password: config.password,
                max: 10,
                readOnly: true
            })
        );
    }

    async query(sql, params = [], options = {}) {
        const pool = options.readOnly ?
            this.getReplicaPool() :
            this.primaryPool;

        const client = await pool.connect();
        try {
            const result = await client.query(sql, params);
            return result;
        } finally {
            client.release();
        }
    }

    getReplicaPool() {
        // Simple round-robin replica selection
        const index = Math.floor(Math.random() * this.replicaPools.length);
        return this.replicaPools[index];
    }

    async transaction(callback) {
        const client = await this.primaryPool.connect();
        try {
            await client.query('BEGIN');
            const result = await callback(client);
            await client.query('COMMIT');
            return result;
        } catch (error) {
            await client.query('ROLLBACK');
            throw error;
        } finally {
            client.release();
        }
    }
}

Data Warehousing and Analytics

ELT Pipeline Architecture

Implement Extract-Load-Transform for analytics:

-- Data Warehouse Schema
-- Dimension Tables ("dimension" and "fact" are modeling roles; the DDL is plain CREATE TABLE)
CREATE TABLE dw_campaigns (
    campaign_key SERIAL PRIMARY KEY,
    campaign_id UUID NOT NULL,
    campaign_name VARCHAR(255) NOT NULL,
    campaign_type VARCHAR(50) NOT NULL,
    shop_id VARCHAR(255) NOT NULL,
    created_date DATE NOT NULL,
    is_active BOOLEAN NOT NULL,
    effective_from TIMESTAMP WITH TIME ZONE NOT NULL,
    effective_to TIMESTAMP WITH TIME ZONE,
    is_current_row BOOLEAN DEFAULT TRUE
);

CREATE TABLE dw_users (
    user_key SERIAL PRIMARY KEY,
    user_id VARCHAR(255),
    user_hash VARCHAR(64), -- MD5 hash for privacy
    first_seen_date DATE NOT NULL,
    device_type VARCHAR(50),
    browser_family VARCHAR(50),
    country_code VARCHAR(2),
    effective_from TIMESTAMP WITH TIME ZONE NOT NULL,
    effective_to TIMESTAMP WITH TIME ZONE,
    is_current_row BOOLEAN DEFAULT TRUE
);

CREATE TABLE dw_dates (
    date_key SERIAL PRIMARY KEY,
    full_date DATE NOT NULL UNIQUE,
    day_of_week INTEGER NOT NULL,
    day_of_month INTEGER NOT NULL,
    month INTEGER NOT NULL,
    quarter INTEGER NOT NULL,
    year INTEGER NOT NULL,
    is_weekend BOOLEAN NOT NULL,
    is_holiday BOOLEAN DEFAULT FALSE
);

-- Fact Tables
CREATE TABLE dw_campaign_events (
    event_key BIGSERIAL PRIMARY KEY,
    campaign_key INTEGER NOT NULL REFERENCES dw_campaigns(campaign_key),
    user_key INTEGER REFERENCES dw_users(user_key),
    date_key INTEGER NOT NULL REFERENCES dw_dates(date_key),
    hour_of_day INTEGER NOT NULL,
    event_type VARCHAR(50) NOT NULL,
    event_count INTEGER DEFAULT 1,
    conversion_value DECIMAL(10,2) DEFAULT 0,
    session_duration INTEGER, -- Seconds
    page_depth INTEGER,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Pre-aggregated fact table for performance
CREATE TABLE dw_daily_campaign_summary (
    summary_key BIGSERIAL PRIMARY KEY,
    campaign_key INTEGER NOT NULL REFERENCES dw_campaigns(campaign_key),
    date_key INTEGER NOT NULL REFERENCES dw_dates(date_key),
    total_impressions INTEGER NOT NULL DEFAULT 0,
    unique_users INTEGER NOT NULL DEFAULT 0,
    total_clicks INTEGER NOT NULL DEFAULT 0,
    total_conversions INTEGER NOT NULL DEFAULT 0,
    conversion_value_total DECIMAL(12,2) DEFAULT 0,
    avg_session_duration INTEGER DEFAULT 0,
    bounce_rate DECIMAL(5,2) DEFAULT 0,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    UNIQUE(campaign_key, date_key)
);
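
The load step can then be a plain SQL transformation from the operational tables into the pre-aggregated fact table. A simplified sketch for yesterday's data, assuming the dw_campaigns and dw_dates dimension rows are already populated:

-- Simplified daily load into the summary fact table
WITH imp AS (
    SELECT campaign_id, DATE(timestamp) AS event_date,
           COUNT(*) AS total_impressions,
           COUNT(DISTINCT session_id) AS unique_users
    FROM campaign_impressions
    WHERE timestamp >= CURRENT_DATE - INTERVAL '1 day' AND timestamp < CURRENT_DATE
    GROUP BY campaign_id, DATE(timestamp)
),
clk AS (
    SELECT campaign_id, DATE(timestamp) AS event_date,
           COUNT(*) FILTER (WHERE interaction_type = 'click') AS total_clicks
    FROM campaign_interactions
    WHERE timestamp >= CURRENT_DATE - INTERVAL '1 day' AND timestamp < CURRENT_DATE
    GROUP BY campaign_id, DATE(timestamp)
),
conv AS (
    SELECT campaign_id, DATE(timestamp) AS event_date,
           COUNT(*) AS total_conversions,
           COALESCE(SUM(conversion_value), 0) AS conversion_value_total
    FROM campaign_conversions
    WHERE timestamp >= CURRENT_DATE - INTERVAL '1 day' AND timestamp < CURRENT_DATE
    GROUP BY campaign_id, DATE(timestamp)
)
INSERT INTO dw_daily_campaign_summary (
    campaign_key, date_key, total_impressions, unique_users,
    total_clicks, total_conversions, conversion_value_total
)
SELECT
    dc.campaign_key,
    dd.date_key,
    imp.total_impressions,
    imp.unique_users,
    COALESCE(clk.total_clicks, 0),
    COALESCE(conv.total_conversions, 0),
    COALESCE(conv.conversion_value_total, 0)
FROM imp
JOIN dw_campaigns dc ON dc.campaign_id = imp.campaign_id AND dc.is_current_row
JOIN dw_dates dd ON dd.full_date = imp.event_date
LEFT JOIN clk ON clk.campaign_id = imp.campaign_id AND clk.event_date = imp.event_date
LEFT JOIN conv ON conv.campaign_id = imp.campaign_id AND conv.event_date = imp.event_date
ON CONFLICT (campaign_key, date_key) DO UPDATE SET
    total_impressions = EXCLUDED.total_impressions,
    unique_users = EXCLUDED.unique_users,
    total_clicks = EXCLUDED.total_clicks,
    total_conversions = EXCLUDED.total_conversions,
    conversion_value_total = EXCLUDED.conversion_value_total;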

Analytics Transformations

Business intelligence transformations:

-- Analytics Transformation Functions
-- Campaign Performance Attribution
CREATE OR REPLACE FUNCTION calculate_campaign_attribution(campaign_uuid UUID, lookback_days INTEGER DEFAULT 30)
RETURNS TABLE (
    attribution_date DATE,
    total_conversions INTEGER,
    attributed_conversions INTEGER,
    attribution_rate DECIMAL(5,2),
    revenue_attributed DECIMAL(12,2)
) AS $$
BEGIN
    RETURN QUERY
    WITH campaign_touches AS (
        SELECT
            DATE(i.timestamp) as touch_date,
            i.session_id,
            MIN(i.timestamp) as first_touch_time,
            COUNT(*) as touch_count
        FROM campaign_impressions i
        WHERE i.campaign_id = campaign_uuid
        AND i.timestamp >= CURRENT_DATE - INTERVAL '1 day' * lookback_days
        GROUP BY DATE(i.timestamp), i.session_id
    ),
    conversions AS (
        SELECT
            DATE(c.timestamp) as conversion_date,
            c.session_id,
            c.conversion_value,
            MIN(c.timestamp) OVER (
                PARTITION BY c.session_id
                ORDER BY c.timestamp ASC
            ) as first_conversion_time
        FROM campaign_conversions c
        WHERE c.timestamp >= CURRENT_DATE - INTERVAL '1 day' * lookback_days
    ),
    attributed_conversions AS (
        SELECT
            ct.touch_date,
            COUNT(DISTINCT c.session_id) as attributed_count,
            COALESCE(SUM(c.conversion_value), 0) as attributed_revenue
        FROM campaign_touches ct
        JOIN conversions c ON ct.session_id = c.session_id
            AND ct.first_touch_time <= c.first_conversion_time
            AND ct.first_touch_time >= c.first_conversion_time - INTERVAL '7 days'
        GROUP BY ct.touch_date
    )
    SELECT
        conv_summary.conversion_date as attribution_date,
        conv_summary.total_conv_count::INTEGER as total_conversions,
        COALESCE(ac.attributed_count, 0)::INTEGER as attributed_conversions,
        CASE
            WHEN conv_summary.total_conv_count > 0 THEN
                ROUND(COALESCE(ac.attributed_count, 0)::NUMERIC / conv_summary.total_conv_count * 100, 2)
            ELSE 0
        END as attribution_rate,
        COALESCE(ac.attributed_revenue, 0) as revenue_attributed
    FROM attributed_conversions ac
    RIGHT JOIN (
        SELECT DATE(timestamp) as conversion_date, COUNT(DISTINCT session_id) as total_conv_count
        FROM campaign_conversions
        WHERE timestamp >= CURRENT_DATE - INTERVAL '1 day' * lookback_days
        GROUP BY DATE(timestamp)
    ) conv_summary ON ac.touch_date = conv_summary.conversion_date
    ORDER BY conv_summary.conversion_date DESC;
END;
$$ LANGUAGE plpgsql;

-- User Journey Analysis
CREATE OR REPLACE FUNCTION analyze_user_journey(campaign_uuid UUID)
RETURNS JSONB AS $$
DECLARE
    result JSONB;
BEGIN
    WITH user_sequences AS (
        SELECT
            i.session_id,
            ARRAY_AGG(
                ROW(i.timestamp, ci.interaction_type, ci.interaction_data)::TEXT
                ORDER BY i.timestamp
            ) as interaction_sequence,
            MIN(i.timestamp) as session_start,
            MAX(i.timestamp) as session_end,
            COUNT(DISTINCT ci.interaction_type) as unique_interactions
        FROM campaign_impressions i
        LEFT JOIN campaign_interactions ci
            ON ci.campaign_id = i.campaign_id AND ci.session_id = i.session_id
        WHERE i.campaign_id = campaign_uuid
        AND i.timestamp >= CURRENT_DATE - INTERVAL '7 days'
        GROUP BY i.session_id
    ),
    journey_patterns AS (
        SELECT
            interaction_sequence,
            COUNT(*) as frequency,
            AVG(EXTRACT(EPOCH FROM (session_end - session_start))) as avg_session_duration
        FROM user_sequences
        GROUP BY interaction_sequence
        ORDER BY frequency DESC
        LIMIT 10
    )
    SELECT jsonb_build_object(
        'top_journey_patterns', (
            SELECT jsonb_agg(
                jsonb_build_object(
                    'sequence', interaction_sequence,
                    'frequency', frequency,
                    'avg_duration_seconds', ROUND(avg_session_duration)
                )
            ) FROM journey_patterns
        ),
        'unique_users_analyzed', (SELECT COUNT(*) FROM user_sequences),
        'avg_interactions_per_session', (SELECT AVG(unique_interactions) FROM user_sequences),
        'analysis_date', CURRENT_DATE
    ) INTO result;

    RETURN result;
END;
$$ LANGUAGE plpgsql;

Scaling Strategies

Horizontal Scaling Architecture

Implement database sharding for high-volume systems:

-- Database Sharding Strategy
-- Shard by shop_id for tenant isolation
CREATE OR REPLACE FUNCTION get_shard_name(shop_identifier VARCHAR(255))
RETURNS TEXT AS $$
BEGIN
    -- Simple hash-based sharding (md5 text cannot be cast directly to bigint,
    -- so convert the first 8 hex characters via bit(32))
    RETURN 'shard_' || (abs(('x' || substr(md5(shop_identifier), 1, 8))::bit(32)::int) % 8);
END;
$$ LANGUAGE plpgsql;

-- Cross-shard query coordinator
CREATE OR REPLACE FUNCTION query_cross_shard(query_sql TEXT, shard_count INTEGER DEFAULT 8)
RETURNS REFCURSOR AS $$
DECLARE
    result REFCURSOR;
    shard_query TEXT;
    shard_results JSONB[];
BEGIN
    -- Execute query on all shards and combine results
    FOR i IN 0..shard_count-1 LOOP
        shard_query := REPLACE(query_sql, '{{SHARD}}', 'shard_' || i);
        -- In production, this would use dblink or foreign data wrappers
        EXECUTE shard_query INTO shard_results[i];
    END LOOP;

    -- Combine and return the per-shard JSON arrays as one row set
    OPEN result FOR
    SELECT jsonb_array_elements(shard_json)
    FROM unnest(shard_results) AS shard_json;

    RETURN result;
END;
$$ LANGUAGE plpgsql;

-- Shard migration logic
CREATE OR REPLACE FUNCTION migrate_shard_data(
    source_shard TEXT,
    target_shard TEXT,
    batch_size INTEGER DEFAULT 1000
)
RETURNS INTEGER AS $$
DECLARE
    migrated_count INTEGER := 0;
    batch_migrated INTEGER;
    last_id BIGINT := 0;
BEGIN
    LOOP
        -- Migrate data in batches (identifiers must be spliced in with
        -- format()/EXECUTE; they cannot be concatenated into plain SQL)
        EXECUTE format(
            'INSERT INTO %I.campaign_impressions
             SELECT * FROM %I.campaign_impressions
             WHERE id > $1
             ORDER BY id
             LIMIT $2',
            target_shard, source_shard
        ) USING last_id, batch_size;

        GET DIAGNOSTICS batch_migrated = ROW_COUNT;
        migrated_count := migrated_count + batch_migrated;

        EXECUTE format(
            'SELECT COALESCE(MAX(id), 0) FROM %I.campaign_impressions',
            target_shard
        ) INTO last_id;

        -- Break if no more data to migrate
        EXIT WHEN batch_migrated = 0;

        -- Log progress
        RAISE NOTICE 'Migrated % records to shard %', migrated_count, target_shard;

        -- Brief pause to avoid overwhelming the system
        PERFORM pg_sleep(0.1);
    END LOOP;

    RETURN migrated_count;
END;
$$ LANGUAGE plpgsql;

Caching Strategies

Implement multi-layer caching for performance:

-- Database-level caching
CREATE OR REPLACE FUNCTION get_cached_campaign_config(campaign_uuid UUID)
RETURNS JSONB AS $$
DECLARE
    cached_config JSONB;
    cache_key TEXT;
BEGIN
    cache_key := 'campaign_config_' || campaign_uuid;

    -- Try memory cache first (cache_get / cache_set are assumed to be
    -- application-defined helpers, e.g. backed by an unlogged table or extension)
    cached_config := cache_get(cache_key);
    IF cached_config IS NOT NULL THEN
        RETURN cached_config;
    END IF;

    -- Query database and cache result
    SELECT config INTO cached_config
    FROM campaigns
    WHERE id = campaign_uuid;

    -- Cache for 1 hour
    PERFORM cache_set(cache_key, cached_config, 3600);

    RETURN cached_config;
END;
$$ LANGUAGE plpgsql;

// Application-level Redis cache (Node.js)
class CampaignCache {
    constructor(redisClient) {
        this.redis = redisClient;
        this.localCache = new Map();
        this.cacheStats = {
            hits: 0,
            misses: 0,
            localHits: 0,
            redisHits: 0
        };
    }

    async getCampaign(campaignId) {
        const cacheKey = `campaign:${campaignId}`;

        // Check local cache first (fastest)
        if (this.localCache.has(cacheKey)) {
            this.cacheStats.hits++;
            this.cacheStats.localHits++;
            return this.localCache.get(cacheKey);
        }

        // Check Redis cache
        try {
            const cached = await this.redis.get(cacheKey);
            if (cached) {
                const data = JSON.parse(cached);
                this.localCache.set(cacheKey, data);
                this.cacheStats.hits++;
                this.cacheStats.redisHits++;
                return data;
            }
        } catch (error) {
            console.error('Redis cache error:', error);
        }

        // Cache miss - fetch from database
        this.cacheStats.misses++;
        return null;
    }

    async setCampaign(campaignId, data, ttl = 3600) {
        const cacheKey = `campaign:${campaignId}`;
        const jsonString = JSON.stringify(data);

        // Set in local cache (shorter TTL)
        this.localCache.set(cacheKey, data);

        // Set in Redis (longer TTL)
        try {
            await this.redis.setex(cacheKey, ttl, jsonString);
        } catch (error) {
            console.error('Redis set error:', error);
        }
    }

    async invalidateCampaign(campaignId) {
        const cacheKey = `campaign:${campaignId}`;

        // Remove from local cache
        this.localCache.delete(cacheKey);

        // Remove from Redis
        try {
            await this.redis.del(cacheKey);
        } catch (error) {
            console.error('Redis delete error:', error);
        }
    }

    getCacheStats() {
        const total = this.cacheStats.hits + this.cacheStats.misses;
        return {
            ...this.cacheStats,
            hitRate: total > 0 ? (this.cacheStats.hits / total * 100).toFixed(2) + '%' : '0%',
            localHitRate: this.cacheStats.hits > 0 ?
                (this.cacheStats.localHits / this.cacheStats.hits * 100).toFixed(2) + '%' : '0%'
        };
    }
}

Auto-scaling Infrastructure

Cloud-native scaling approach:

# Kubernetes Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: popup-analytics-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: popup-analytics
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  - type: Pods
    pods:
      metric:
        name: database_connections
      target:
        type: AverageValue
        averageValue: "50"
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60

---
# Database connection pool autoscaling
apiVersion: v1
kind: ConfigMap
metadata:
  name: database-config
data:
  pool-config.yaml: |
    max_connections: 100
    min_connections: 10
    connection_timeout: 30
    idle_timeout: 300
    max_lifetime: 3600
    health_check_interval: 30
    auto_scaling:
      enabled: true
      scale_up_threshold: 80
      scale_down_threshold: 30
      max_connections_per_pod: 20

Backup and Disaster Recovery

Database Backup Strategy

Implement comprehensive backup procedures:

#!/bin/bash
# backup_database.sh - automated full backup with S3 upload and retention cleanup

set -euo pipefail

# Configuration
DB_HOST="localhost"
DB_PORT="5432"
DB_NAME="popup_analytics"
BACKUP_DIR="/backups/postgresql"
S3_BUCKET="popup-analytics-backups"
RETENTION_DAYS=30

# Create backup directory
mkdir -p "$BACKUP_DIR"

# Generate timestamp
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
BACKUP_FILE="$BACKUP_DIR/popup_analytics_$TIMESTAMP.sql"

# Log function
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}

# Create full backup
log "Starting full database backup..."
pg_dump -h "$DB_HOST" -p "$DB_PORT" -d "$DB_NAME" \
    --format=custom \
    --compress=9 \
    --verbose \
    --file="$BACKUP_FILE"

log "Full backup completed: $BACKUP_FILE"

# Verify backup integrity
log "Verifying backup integrity..."
if pg_restore --list "$BACKUP_FILE" > /dev/null 2>&1; then
    log "Backup integrity check passed"
else
    log "ERROR: Backup integrity check failed"
    exit 1
fi

# Compress and upload to S3
log "Uploading backup to S3..."
gzip "$BACKUP_FILE"
aws s3 cp "$BACKUP_FILE.gz" "s3://$S3_BUCKET/postgresql/"

# Clean up local files
rm -f "$BACKUP_FILE.gz"

# Clean up old backups
log "Cleaning up old local backups..."
find "$BACKUP_DIR" -name "*.sql.gz" -mtime +$RETENTION_DAYS -delete

# Clean up S3 backups
log "Cleaning up old S3 backups..."
aws s3 ls "s3://$S3_BUCKET/postgresql/" | \
    while read -r line; do
        createDate=$(echo "$line" | awk '{print $1" "$2}')
        createDate=$(date -d "$createDate" +%s)
        olderThan=$(date -d "$RETENTION_DAYS days ago" +%s)
        if [[ $createDate -lt $olderThan ]]; then
            fileName=$(echo "$line" | awk '{print $4}')
            if [[ "$fileName" != "" ]]; then
                aws s3 rm "s3://$S3_BUCKET/postgresql/$fileName"
            fi
        fi
    done

log "Backup process completed successfully"

Point-in-Time Recovery

Implement PITR capabilities:

# Point-in-Time Recovery Setup
# Enable WAL archiving in postgresql.conf
wal_level = replica
archive_mode = on
archive_command = 'aws s3 cp %p s3://popup-analytics-wal-archive/%f'
archive_timeout = 300

# Recovery target settings for PITR (postgresql.conf plus a recovery.signal
# file on PostgreSQL 12+; older releases used recovery.conf)
restore_command = 'aws s3 cp s3://popup-analytics-wal-archive/%f %p'
recovery_target_time = '2024-01-15 14:30:00 PST'

#!/bin/bash
# point_in_time_recovery.sh - automated point-in-time recovery helper

set -euo pipefail

RECOVERY_TIME="$1"
BACKUP_FILE="$2"
RECOVERY_DIR="/tmp/popup_recovery"
RECOVERED_DB="popup_analytics_recovered"

if [[ -z "$RECOVERY_TIME" || -z "$BACKUP_FILE" ]]; then
    echo "Usage: $0 '' ''"
    echo "Example: $0 '2024-01-15 14:30:00' 'popup_analytics_20240115_120000.sql'"
    exit 1
fi

mkdir -p "$RECOVERY_DIR"

# Download backup from S3 if not local
if [[ ! -f "$BACKUP_FILE" ]]; then
    echo "Downloading backup from S3..."
    aws s3 cp "s3://popup-analytics-backups/postgresql/$BACKUP_FILE" "$BACKUP_FILE"
fi

# Extract backup
echo "Extracting backup..."
pg_restore -d "$RECOVERED_DB" --clean --if-exists --verbose "$BACKUP_FILE"

# Apply WAL logs for point-in-time recovery
# (assumes $RECOVERY_DIR holds a physical base backup; WAL replay cannot be
# layered on top of a logical pg_restore dump)
echo "Applying WAL logs until $RECOVERY_TIME..."
pg_ctl start -D "$RECOVERY_DIR" -o "-c recovery_target_time='$RECOVERY_TIME'"

# Verify recovery
echo "Verifying recovered database..."
psql -d "$RECOVERED_DB" -c "SELECT COUNT(*) FROM campaigns;"
psql -d "$RECOVERED_DB" -c "SELECT MAX(timestamp) FROM campaign_impressions;"

echo "Recovery completed successfully"
echo "Recovered database: $RECOVERED_DB"
echo "Recovery point: $RECOVERY_TIME"

High Availability Setup

Database replication and failover:

# PostgreSQL Streaming Replication
# Primary server configuration (postgresql.conf on the primary)
wal_level = replica
max_wal_senders = 3
max_replication_slots = 3
wal_keep_size = '1GB'  # wal_keep_segments = 64 on PostgreSQL 12 and earlier
archive_mode = on
archive_command = 'rsync -a %p replica_server:/archive/%f'

# Replica configuration (postgresql.conf plus a standby.signal file on
# PostgreSQL 12+; standby_mode in recovery.conf on older releases)
primary_conninfo = 'host=primary_server port=5432 user=replicator'
recovery_target_timeline = 'latest'

# Automatic failover with Patroni
apiVersion: v1
kind: ConfigMap
metadata:
  name: patroni-config
data:
  patroni.yml: |
    scope: popup-analytics
    namespace: /db/
    name: popup-analytics-1

    restapi:
      listen: 0.0.0.0:8008
      connect_address: popup-analytics-1:8008

    etcd:
      hosts: etcd-1:2379,etcd-2:2379,etcd-3:2379

    bootstrap:
      dcs:
        ttl: 30
        loop_wait: 10
        retry_timeout: 10
        maximum_lag_on_failover: 1048576
        postgresql:
          use_pg_rewind: true
          use_slots: true
          parameters:
            max_connections: 200
            max_prepared_transactions: 0
            wal_level: replica
            hot_standby: "on"
            max_wal_senders: 5
            max_replication_slots: 5
            wal_log_hints: "on"
            archive_mode: "on"
            archive_command: "aws s3 cp %p s3://popup-analytics-wal-archive/%f"

    pg_hba:
      - host replication replicator 0.0.0.0/0 md5
      - host all all 0.0.0.0/0 md5

---
# Health check and monitoring
apiVersion: v1
kind: Service
metadata:
  name: popup-analytics-primary
spec:
  selector:
    app: popup-analytics
    role: master
  ports:
  - port: 5432
    targetPort: 5432
  type: ClusterIP

---
apiVersion: v1
kind: Service
metadata:
  name: popup-analytics-replica
spec:
  selector:
    app: popup-analytics
    role: replica
  ports:
  - port: 5432
    targetPort: 5432
  type: ClusterIP

Real-time Analytics Implementation

Streaming Data Pipeline

Real-time event processing architecture:

# Kafka Topics for Event Streaming
# Campaign Events Topic Configuration
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: campaign-events
  labels:
    strimzi.io/cluster: popup-analytics
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 86400000  # 24 hours
    segment.bytes: 1073741824  # 1GB
    cleanup.policy: delete

// Real-time Analytics Consumer (Node.js; the API shape matches kafkajs)
class RealtimeAnalyticsConsumer {
    constructor(kafkaClient, redisClient, databaseClient) {
        this.kafka = kafkaClient;
        this.redis = redisClient;
        this.db = databaseClient;
        this.consumer = null;
        this.aggregationWindow = 60000; // 1 minute
        this.currentWindow = {};
    }

    async start() {
        this.consumer = this.kafka.consumer({ groupId: 'analytics-processor' });

        await this.consumer.subscribe({
            topics: ['campaign-events', 'user-interactions', 'conversions']
        });

        await this.consumer.run({
            eachMessage: async ({ topic, partition, message }) => {
                await this.processMessage(topic, JSON.parse(message.value.toString()));
            }
        });

        // Start periodic aggregation
        setInterval(() => this.aggregateAndPersist(), this.aggregationWindow);
    }

    async processMessage(topic, event) {
        const timestamp = Date.now();
        const windowKey = Math.floor(timestamp / this.aggregationWindow);

        // Add to current aggregation window
        if (!this.currentWindow[windowKey]) {
            this.currentWindow[windowKey] = {
                timestamp: windowKey * this.aggregationWindow,
                events: {},
                counts: {}
            };
        }

        const window = this.currentWindow[windowKey];

        if (topic === 'campaign-events') {
            // Track campaign impressions
            const campaignId = event.campaignId;
            window.events[campaignId] = window.events[campaignId] || {
                impressions: 0,
                clicks: 0,
                submissions: 0,
                uniqueSessions: new Set()
            };

            window.events[campaignId].impressions++;
            window.events[campaignId].uniqueSessions.add(event.sessionId);

        } else if (topic === 'user-interactions') {
            // Track user interactions
            const campaignId = event.campaignId;
            window.events[campaignId] = window.events[campaignId] || {
                impressions: 0,
                clicks: 0,
                submissions: 0,
                uniqueSessions: new Set()
            };

            if (event.type === 'click') {
                window.events[campaignId].clicks++;
            } else if (event.type === 'submit') {
                window.events[campaignId].submissions++;
            }
        }

        // Update Redis for real-time dashboard
        await this.updateRealtimeMetrics(event);
    }

    async updateRealtimeMetrics(event) {
        const campaignId = event.campaignId;
        const timestamp = Date.now();

        // Update campaign counters
        const pipeline = this.redis.pipeline();

        // Increment counters
        pipeline.hincrby(`campaign:${campaignId}:counters`, 'impressions', 1);

        // Update real-time metrics
        pipeline.zadd(`campaign:${campaignId}:timeline`, timestamp, JSON.stringify({
            timestamp,
            type: event.type,
            sessionId: event.sessionId
        }));

        // Clean old timeline data (keep last 24 hours)
        const cutoffTime = timestamp - (24 * 60 * 60 * 1000);
        pipeline.zremrangebyscore(`campaign:${campaignId}:timeline`, 0, cutoffTime);

        await pipeline.exec();
    }

    async aggregateAndPersist() {
        const now = Date.now();
        const currentWindowKey = Math.floor(now / this.aggregationWindow);

        // Process completed windows (object keys are strings, so convert
        // them back to numbers before comparing)
        Object.keys(this.currentWindow).forEach(windowKey => {
            if (Number(windowKey) < currentWindowKey) {
                this.persistWindowData(this.currentWindow[windowKey]);
                delete this.currentWindow[windowKey];
            }
        });
    }

    async persistWindowData(windowData) {
        const timestamp = new Date(windowData.timestamp);

        // Await each insert so errors surface instead of being silently dropped
        for (const [campaignId, metrics] of Object.entries(windowData.events)) {
            await this.db.query(
                `INSERT INTO campaign_metrics (time_bucket, campaign_id, metric_name, metric_value)
                 VALUES ($1, $2, $3, $4)
                 ON CONFLICT (time_bucket, campaign_id, metric_name)
                 DO UPDATE SET metric_value = campaign_metrics.metric_value + EXCLUDED.metric_value`,
                [
                    timestamp,
                    campaignId,
                    'impressions',
                    metrics.impressions
                ]
            );

            await this.db.query(
                `INSERT INTO campaign_metrics (time_bucket, campaign_id, metric_name, metric_value)
                 VALUES ($1, $2, $3, $4)
                 ON CONFLICT (time_bucket, campaign_id, metric_name)
                 DO UPDATE SET metric_value = campaign_metrics.metric_value + EXCLUDED.metric_value`,
                [
                    timestamp,
                    campaignId,
                    'unique_sessions',
                    metrics.uniqueSessions.size
                ]
            );
        }

        console.log(`Persisted window data for ${Object.keys(windowData.events).length} campaigns`);
    }
}

Dashboard Data Sources

Real-time dashboard queries and optimizations:

-- Real-time Dashboard Queries
-- Campaign Performance Overview
CREATE OR REPLACE FUNCTION get_campaign_dashboard(campaign_uuid UUID, time_range INTERVAL DEFAULT '24 hours')
RETURNS JSONB AS $$
DECLARE
    result JSONB;
    real_time_metrics JSONB;
    historical_data JSONB;
BEGIN
    -- Get real-time metrics from cache/Redis (redis_query, calculate_realtime_ctr,
    -- get_active_session_count and get_last_interaction_time are assumed to be
    -- application-defined helper functions)
    SELECT jsonb_build_object(
        'current_impressions', (SELECT COUNT(*) FROM redis_query('campaign:' || campaign_uuid || ':counters', 'impressions')),
        'current_ctr', calculate_realtime_ctr(campaign_uuid),
        'active_sessions', get_active_session_count(campaign_uuid),
        'last_interaction', get_last_interaction_time(campaign_uuid)
    ) INTO real_time_metrics;

    -- Get aggregated historical data
    WITH hourly_stats AS (
        SELECT
            date_trunc('hour', time_bucket) as hour,
            SUM(CASE WHEN metric_name = 'impressions' THEN metric_value ELSE 0 END) as impressions,
            SUM(CASE WHEN metric_name = 'unique_sessions' THEN metric_value ELSE 0 END) as unique_sessions,
            SUM(CASE WHEN metric_name = 'clicks' THEN metric_value ELSE 0 END) as clicks,
            SUM(CASE WHEN metric_name = 'submissions' THEN metric_value ELSE 0 END) as submissions
        FROM campaign_metrics
        WHERE campaign_id = campaign_uuid
        AND time_bucket >= NOW() - time_range
        GROUP BY date_trunc('hour', time_bucket)
        ORDER BY hour DESC
    )
    SELECT jsonb_agg(
        jsonb_build_object(
            'hour', hour,
            'impressions', impressions,
            'unique_sessions', unique_sessions,
            'clicks', clicks,
            'submissions', submissions,
            'ctr', CASE
                WHEN impressions > 0 THEN ROUND(clicks::NUMERIC / impressions * 100, 2)
                ELSE 0
            END
        )
    ) INTO historical_data
    FROM hourly_stats;

    -- Combine results
    SELECT jsonb_build_object(
        'campaign_id', campaign_uuid,
        'real_time', real_time_metrics,
        'historical', historical_data,
        'time_range', time_range,
        'last_updated', NOW()
    ) INTO result;

    RETURN result;
END;
$$ LANGUAGE plpgsql;

-- User Behavior Heatmap
CREATE OR REPLACE FUNCTION get_interaction_heatmap(campaign_uuid UUID, time_range INTERVAL DEFAULT '7 days')
RETURNS JSONB AS $$
DECLARE
    result JSONB;
BEGIN
    WITH hourly_interactions AS (
        SELECT
            EXTRACT(HOUR FROM timestamp) as hour_of_day,
            EXTRACT(DOW FROM timestamp) as day_of_week,
            COUNT(*) as interaction_count,
            COUNT(DISTINCT session_id) as unique_sessions
        FROM campaign_interactions
        WHERE campaign_id = campaign_uuid
        AND timestamp >= NOW() - time_range
        GROUP BY EXTRACT(HOUR FROM timestamp), EXTRACT(DOW FROM timestamp)
    ),
    max_interactions AS (
        SELECT MAX(interaction_count) as max_count FROM hourly_interactions
    )
    SELECT jsonb_build_object(
        'heatmap_data', (
            SELECT jsonb_agg(
                jsonb_build_object(
                    'hour', hour_of_day,
                    'day_of_week', day_of_week,
                    'interactions', interaction_count,
                    'intensity', ROUND(interaction_count::NUMERIC / max_count * 100)
                )
            ) FROM hourly_interactions CROSS JOIN max_interactions
        ),
        'peak_hour', (SELECT hour_of_day FROM hourly_interactions ORDER BY interaction_count DESC LIMIT 1),
        'peak_day', (SELECT day_of_week FROM hourly_interactions ORDER BY interaction_count DESC LIMIT 1),
        'total_interactions', (SELECT SUM(interaction_count) FROM hourly_interactions)
    ) INTO result;

    RETURN result;
END;
$$ LANGUAGE plpgsql;

-- Conversion Funnel Analysis
CREATE OR REPLACE FUNCTION get_conversion_funnel(campaign_uuid UUID, time_range INTERVAL DEFAULT '30 days')
RETURNS JSONB AS $$
DECLARE
    result JSONB;
BEGIN
    WITH funnel_stages AS (
        -- Stage 1: Impressions
        SELECT
            'impressions' as stage,
            COUNT(DISTINCT session_id) as count
        FROM campaign_impressions
        WHERE campaign_id = campaign_uuid
        AND timestamp >= NOW() - time_range

        UNION ALL

        -- Stage 2: Views (popup displayed for sufficient time)
        SELECT
            'views' as stage,
            COUNT(DISTINCT session_id) as count
        FROM campaign_interactions
        WHERE campaign_id = campaign_uuid
        AND interaction_type = 'view'
        AND timestamp >= NOW() - time_range

        UNION ALL

        -- Stage 3: Engagements (clicks or interactions)
        SELECT
            'engagements' as stage,
            COUNT(DISTINCT session_id) as count
        FROM campaign_interactions
        WHERE campaign_id = campaign_uuid
        AND interaction_type IN ('click', 'hover', 'focus')
        AND timestamp >= NOW() - time_range

        UNION ALL

        -- Stage 4: Conversions
        SELECT
            'conversions' as stage,
            COUNT(DISTINCT session_id) as count
        FROM campaign_conversions
        WHERE campaign_id = campaign_uuid
        AND timestamp >= NOW() - time_range
    ),
    funnel_with_rates AS (
        SELECT
            stage,
            count,
            LAG(count) OVER (ORDER BY
                CASE stage
                    WHEN 'impressions' THEN 1
                    WHEN 'views' THEN 2
                    WHEN 'engagements' THEN 3
                    WHEN 'conversions' THEN 4
                END
            ) as previous_count
        FROM funnel_stages
    )
    SELECT jsonb_build_object(
        'stages', (
            SELECT jsonb_agg(
                jsonb_build_object(
                    'stage', stage,
                    'count', count,
                    'conversion_rate', CASE
                        WHEN previous_count > 0 THEN
                            ROUND(count::NUMERIC / previous_count * 100, 2)
                        ELSE NULL
                    END
                )
            ) FROM funnel_with_rates
        ),
        'overall_conversion_rate', (
            SELECT ROUND(
                conversions::NUMERIC / NULLIF(impressions, 0) * 100, 2
            )
            FROM (
                SELECT
                    SUM(CASE WHEN stage = 'conversions' THEN count ELSE 0 END) as conversions,
                    SUM(CASE WHEN stage = 'impressions' THEN count ELSE 0 END) as impressions
                FROM funnel_stages
            ) s
        )
    ) INTO result;

    RETURN result;
END;
$$ LANGUAGE plpgsql;

Conclusion

Designing a robust database architecture for popup analytics and management requires careful consideration of data modeling, performance optimization, scalability, and data governance. The comprehensive approach outlined in this guide provides a solid foundation for building systems capable of handling high-volume data collection while maintaining performance and data integrity.

Key success factors include implementing appropriate database partitioning strategies, establishing efficient caching layers, designing scalable data pipelines, and ensuring robust backup and recovery procedures. Regular monitoring and optimization of database performance, along with thoughtful data retention policies, will help maintain system efficiency as data volumes grow.

Remember that database design is an iterative process. Start with a solid foundation, monitor performance under real-world conditions, and continuously refine your architecture based on actual usage patterns and business requirements. The principles and techniques covered in this guide will serve as a strong starting point for building scalable, efficient popup analytics systems.

Compliance Notice: This technical guide is for educational purposes only and does not guarantee specific results. Database implementations should be tailored to your specific business requirements, compliance obligations, and technical constraints. Always consult with database architects and legal professionals when implementing data storage solutions that handle user information.

TAGS

database, analytics, popup-management, data-modeling, time-series, performance, scaling, backup-recovery, real-time

Michael Rodriguez

Senior Database Architect at Nudgesmart
