最强总结!数据库脏数据清理完整指南!!17认证网

正规官方授权
更专业・更权威

最强总结!数据库脏数据清理完整指南!!

数据质量是数据库管理中的关键问题。本文将系统地介绍数据库中脏数据的识别、清理和预防方法,帮助您提升数据质量!

一、脏数据类型识别

二、脏数据清理方法

三、数据质量管理

四、自动清理方案

一、脏数据类型识别

1. 常见脏数据类型

-- 1. 重复数据
SELECT name, email, COUNT(*) as count
FROM users
GROUP BY name, email
HAVING COUNT(*) > 1;

-- 2. 空值异常
SELECT *
FROM customers
WHERE phone IS NULL
   OR TRIM(phone) = ''
   OR phone = 'null'
   OR phone = 'undefined';

-- 3. 格式不统一
SELECT DISTINCT
    email,
    CASE 
        WHEN email NOT REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN '格式无效'
        ELSE '格式有效'
    END as validation
FROM users;

-- 4. 超出范围的值
SELECT *
FROM products
WHERE price < 0
   OR quantity < 0
   OR price > 999999;

-- 5. 冗余数据
SELECT p.*, c.name as category_name
FROM products p
LEFT JOIN categories c ON p.category_id = c.id
WHERE c.id IS NULL;

-- 6. 不一致的数据
SELECT o.order_id, o.total_amount, 
       SUM(oi.quantity * oi.unit_price) as calculated_total
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
GROUP BY o.order_id, o.total_amount
HAVING o.total_amount != SUM(oi.quantity * oi.unit_price);

2. 脏数据识别工具

数据质量分析SQL
-- 创建数据质量分析函数
DELIMITER $$

CREATE PROCEDURE analyze_table_quality(IN table_name VARCHAR(100))
BEGIN
    SET @sql = CONCAT('
    -- 1. 总记录数
    SELECT COUNT(*) as total_records FROM ', table_name, ';
    
    -- 2. 空值统计
    SELECT 
        column_name,
        COUNT(*) as null_count,
        ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM ', table_name, '), 2) as null_percentage
    FROM information_schema.columns
    WHERE table_name = "', table_name, '"
    GROUP BY column_name;
    
    -- 3. 重复值检测
    SELECT 
        COUNT(*) as total,
        COUNT(DISTINCT column_name) as distinct_count,
        ROUND((COUNT(*) - COUNT(DISTINCT column_name)) * 100.0 / COUNT(*), 2) as duplicate_percentage
    FROM ', table_name, '
    GROUP BY column_name
    HAVING COUNT(*) > COUNT(DISTINCT column_name);
    
    -- 4. 异常值检测 (数值字段)
    SELECT column_name,
           MIN(column_value) as min_value,
           MAX(column_value) as max_value,
           AVG(column_value) as avg_value,
           STDDEV(column_value) as std_dev
    FROM ', table_name, '
    WHERE column_name IN (
        SELECT column_name 
        FROM information_schema.columns 
        WHERE data_type IN ("int", "decimal", "float")
        AND table_name = "', table_name, '"
    )
    GROUP BY column_name;
    ');
    
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
END $$

DELIMITER ;

-- 使用示例
CALL analyze_table_quality('customers');

二、脏数据清理方法

1. 基础清理操作

-- 1. 删除重复数据(保留最新记录)
WITH DuplicateCTE AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY email 
            ORDER BY created_at DESC
        ) as row_num
    FROM users
)
DELETE FROM DuplicateCTE WHERE row_num > 1;

-- 2. 标准化空值
UPDATE customers
SET 
    phone = NULL WHERE TRIM(phone) = '' OR phone = 'null' OR phone = 'undefined',
    email = NULL WHERE TRIM(email) = '' OR email = 'null' OR email = 'undefined';

-- 3. 格式统一化
UPDATE users
SET 
    name = TRIM(LOWER(name)),
    email = TRIM(LOWER(email));

-- 4. 修正超出范围的值
UPDATE products
SET 
    price = 0 WHERE price < 0,
    quantity = 0 WHERE quantity < 0,
    price = 999999 WHERE price > 999999;

-- 5. 清理无效外键
DELETE FROM order_items 
WHERE product_id NOT IN (SELECT id FROM products);

-- 6. 修正不一致数据
UPDATE orders o
SET total_amount = (
    SELECT SUM(quantity * unit_price)
    FROM order_items oi
    WHERE oi.order_id = o.order_id
);

2. 高级清理方案

-- 1. 创建数据清理日志表
CREATE TABLE data_cleaning_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name VARCHAR(100),
    column_name VARCHAR(100),
    cleaning_type VARCHAR(50),
    old_value TEXT,
    new_value TEXT,
    affected_rows INT,
    cleaned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 2. 创建数据清理存储过程
DELIMITER $$

CREATE PROCEDURE clean_customer_data()
BEGIN
    DECLARE affected_rows INT;
    
    -- 开启事务
    START TRANSACTION;
    
    -- 2.1 清理电话号码格式
    UPDATE customers 
    SET phone = REGEXP_REPLACE(phone, '[^0-9]', '')
    WHERE phone IS NOT NULL AND phone REGEXP '[^0-9]';
    
    SET affected_rows = ROW_COUNT();
    IF affected_rows > 0 THEN
        INSERT INTO data_cleaning_log 
        (table_name, column_name, cleaning_type, affected_rows)
        VALUES 
        ('customers', 'phone', 'format_standardization', affected_rows);
    END IF;
    
    -- 2.2 清理邮箱地址
    UPDATE customers 
    SET email = LOWER(TRIM(email))
    WHERE email IS NOT NULL AND (email != LOWER(TRIM(email)));
    
    SET affected_rows = ROW_COUNT();
    IF affected_rows > 0 THEN
        INSERT INTO data_cleaning_log 
        (table_name, column_name, cleaning_type, affected_rows)
        VALUES 
        ('customers', 'email', 'format_standardization', affected_rows);
    END IF;
    
    -- 2.3 清理无效的年龄数据
    UPDATE customers 
    SET age = NULL
    WHERE age < 0 OR age > 120;
    
    SET affected_rows = ROW_COUNT();
    IF affected_rows > 0 THEN
        INSERT INTO data_cleaning_log 
        (table_name, column_name, cleaning_type, affected_rows)
        VALUES 
        ('customers', 'age', 'invalid_value_removal', affected_rows);
    END IF;
    
    -- 提交事务
    COMMIT;
    
    -- 返回清理报告
    SELECT 
        cleaning_type,
        SUM(affected_rows) as total_cleaned,
        MAX(cleaned_at) as cleaning_time
    FROM data_cleaning_log
    WHERE table_name = 'customers'
    GROUP BY cleaning_type;
    
END $$

DELIMITER ;

-- 3. 创建数据质量检查触发器
DELIMITER $$

CREATE TRIGGER before_customer_insert
BEFORE INSERT ON customers
FOR EACH ROW
BEGIN
    -- 3.1 格式化电话号码
    IF NEW.phone IS NOT NULL THEN
        SET NEW.phone = REGEXP_REPLACE(NEW.phone, '[^0-9]', '');
    END IF;
    
    -- 3.2 格式化邮箱
    IF NEW.email IS NOT NULL THEN
        SET NEW.email = LOWER(TRIM(NEW.email));
    END IF;
    
    -- 3.3 验证年龄
    IF NEW.age < 0 OR NEW.age > 120 THEN
        SET NEW.age = NULL;
    END IF;
END $$

DELIMITER ;

-- 4. 创建定期数据清理作业
EVENT clean_data_daily
ON SCHEDULE EVERY 1 DAY
STARTS CURRENT_TIMESTAMP
DO
BEGIN
    CALL clean_customer_data();
END;

三、数据质量管理

1.预防措施

-- 1. 创建强制约束
ALTER TABLE users
    ADD CONSTRAINT check_email 
        CHECK (email REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'),
    ADD CONSTRAINT check_phone 
        CHECK (phone REGEXP '^[0-9]{11}$'),
    ADD CONSTRAINT check_age 
        CHECK (age BETWEEN 0 AND 120);

-- 2. 创建唯一索引防止重复
CREATE UNIQUE INDEX idx_unique_email ON users(email);

-- 3. 设置默认值
ALTER TABLE products
    ALTER COLUMN status SET DEFAULT 'active',
    ALTER COLUMN created_at SET DEFAULT CURRENT_TIMESTAMP;

-- 4. 创建数据验证触发器
DELIMITER $$

CREATE TRIGGER before_user_insert
BEFORE INSERT ON users
FOR EACH ROW
BEGIN
    -- 4.1 格式化数据
    SET NEW.email = LOWER(TRIM(NEW.email));
    SET NEW.name = TRIM(NEW.name);
    
    -- 4.2 验证数据
    IF LENGTH(NEW.password) < 8 THEN
        SIGNAL SQLSTATE '45000'
        SET MESSAGE_TEXT = 'Password must be at least 8 characters long';
    END IF;
    
    -- 4.3 设置默认值
    IF NEW.status IS NULL THEN
        SET NEW.status = 'active';
    END IF;
END $$

DELIMITER ;

-- 5. 创建审计日志
CREATE TABLE audit_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name VARCHAR(100),
    action_type ENUM('INSERT', 'UPDATE', 'DELETE'),
    record_id BIGINT,
    old_value JSON,
    new_value JSON,
    user_id BIGINT,
    action_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 6. 创建审计触发器
DELIMITER $$

CREATE TRIGGER after_user_change
AFTER UPDATE ON users
FOR EACH ROW
BEGIN
    INSERT INTO audit_log (
        table_name,
        action_type,
        record_id,
        old_value,
        new_value,
        user_id
    )
    VALUES (
        'users',
        'UPDATE',
        NEW.id,
        JSON_OBJECT(
            'email', OLD.email,
            'name', OLD.name,
            'status', OLD.status
        ),
        JSON_OBJECT(
            'email', NEW.email,
            'name', NEW.name,
            'status', NEW.status
        ),
        @current_user_id
    );
END $$

DELIMITER ;

2. 监控方案

-- 1. 创建数据质量监控表
CREATE TABLE data_quality_metrics (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name VARCHAR(100),
    metric_name VARCHAR(100),
    metric_value DECIMAL(10,2),
    threshold DECIMAL(10,2),
    check_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status ENUM('OK', 'WARNING', 'ERROR') DEFAULT 'OK'
);

-- 2. 创建监控存储过程
DELIMITER $$

CREATE PROCEDURE monitor_data_quality()
BEGIN
    DECLARE total_records INT;
    DECLARE null_percentage DECIMAL(10,2);
    DECLARE duplicate_percentage DECIMAL(10,2);
    DECLARE invalid_percentage DECIMAL(10,2);
    
    -- 2.1 检查空值比例
    SELECT 
        COUNT(*) / (SELECT COUNT(*) FROM users) * 100 INTO null_percentage
    FROM users 
    WHERE email IS NULL OR phone IS NULL;
    
    INSERT INTO data_quality_metrics 
        (table_name, metric_name, metric_value, threshold, status)
    VALUES 
        ('users', 'null_percentage', null_percentage, 5.00,
        CASE 
            WHEN null_percentage > 10 THEN 'ERROR'
            WHEN null_percentage > 5 THEN 'WARNING'
            ELSE 'OK'
        END);
    
    -- 2.2 检查重复值比例
    WITH duplicates AS (
        SELECT email, COUNT(*) as count
        FROM users
        GROUP BY email
        HAVING COUNT(*) > 1
    )
    SELECT 
        COUNT(*) / (SELECT COUNT(*) FROM users) * 100 INTO duplicate_percentage
    FROM duplicates;
    
    INSERT INTO data_quality_metrics 
        (table_name, metric_name, metric_value, threshold, status)
    VALUES 
        ('users', 'duplicate_percentage', duplicate_percentage, 1.00,
        CASE 
            WHEN duplicate_percentage > 2 THEN 'ERROR'
            WHEN duplicate_percentage > 1 THEN 'WARNING'
            ELSE 'OK'
        END);
        
    -- 2.3 检查无效数据比例
    SELECT 
        COUNT(*) / (SELECT COUNT(*) FROM users) * 100 INTO invalid_percentage
    FROM users 
    WHERE 
        email NOT REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' OR
        phone NOT REGEXP '^[0-9]{11}$' OR
        age NOT BETWEEN 0 AND 120;
    
    INSERT INTO data_quality_metrics 
        (table_name, metric_name, metric_value, threshold, status)
    VALUES 
        ('users', 'invalid_percentage', invalid_percentage, 1.00,
        CASE 
            WHEN invalid_percentage > 2 THEN 'ERROR'
            WHEN invalid_percentage > 1 THEN 'WARNING'
            ELSE 'OK'
        END);
END $$

DELIMITER ;

-- 3. 创建报警触发器
DELIMITER $$

CREATE TRIGGER after_metric_insert
AFTER INSERT ON data_quality_metrics
FOR EACH ROW
BEGIN
    -- 如果状态为ERROR,插入报警记录
    IF NEW.status = 'ERROR' THEN
        INSERT INTO alert_log (
            metric_id,
            alert_message,
            alert_level
        )
        VALUES (
            NEW.id,
            CONCAT(
                'Data quality alert for table ', 
                NEW.table_name, 
                ': ', 
                NEW.metric_name, 
                ' = ', 
                NEW.metric_value,
                ' (threshold: ',
                NEW.threshold,
                ')'
            ),
            'HIGH'
        );
    END IF;
END $$

DELIMITER ;

-- 4. 创建定时监控作业
CREATE EVENT monitor_data_quality_job
ON SCHEDULE EVERY 1 HOUR
DO CALL monitor_data_quality();

3. 报告和分析

-- 1. 创建数据质量报告视图
CREATE VIEW data_quality_report AS
SELECT 
    table_name,
    metric_name,
    AVG(metric_value) as avg_value,
    MAX(metric_value) as max_value,
    MIN(metric_value) as min_value,
    COUNT(CASE WHEN status = 'ERROR' THEN 1 END) as error_count,
    COUNT(CASE WHEN status = 'WARNING' THEN 1 END) as warning_count,
    COUNT(*) as total_checks,
    DATE(check_time) as check_date
FROM data_quality_metrics
GROUP BY table_name, metric_name, DATE(check_time);

-- 2. 创建趋势分析视图
CREATE VIEW data_quality_trends AS
SELECT 
    table_name,
    metric_name,
    WEEK(check_time) as week_number,
    YEAR(check_time) as year,
    AVG(metric_value) as avg_value,
    MAX(metric_value) as max_value,
    MIN(metric_value) as min_value
FROM data_quality_metrics
GROUP BY table_name, metric_name, WEEK(check_time), YEAR(check_time);

-- 3. 创建问题汇总报告
CREATE VIEW data_quality_issues AS
SELECT 
    m.table_name,
    m.metric_name,
    m.metric_value,
    m.threshold,
    m.status,
    m.check_time,
    a.alert_message,
    a.alert_level,
    a.created_at as alert_time
FROM data_quality_metrics m
LEFT JOIN alert_log a ON m.id = a.metric_id
WHERE m.status IN ('WARNING', 'ERROR')
ORDER BY m.check_time DESC;

-- 4. 创建数据质量评分函数
DELIMITER $$

CREATE FUNCTION calculate_quality_score(
    p_table_name VARCHAR(100),
    p_date DATE
)
RETURNS DECIMAL(5,2)
DETERMINISTIC
BEGIN
    DECLARE quality_score DECIMAL(5,2);
    
    SELECT 
        100 - (
            COUNT(CASE WHEN status = 'ERROR' THEN 1 END) * 10 +
            COUNT(CASE WHEN status = 'WARNING' THEN 1 END) * 5
        ) / COUNT(*) * 100 INTO quality_score
    FROM data_quality_metrics
    WHERE table_name = p_table_name
    AND DATE(check_time) = p_date;
    
    RETURN COALESCE(quality_score, 100);
END $$

DELIMITER ;

-- 5. 创建月度报告存储过程
DELIMITER $$

CREATE PROCEDURE generate_monthly_report(IN p_year INT, IN p_month INT)
BEGIN
    -- 5.1 总体质量评分
    SELECT 
        table_name,
        calculate_quality_score(table_name, LAST_DAY(CONCAT(p_year,'-',p_month,'-01'))) as quality_score,
        COUNT(DISTINCT CASE WHEN status = 'ERROR' THEN date(check_time) END) as days_with_errors,
        COUNT(DISTINCT CASE WHEN status = 'WARNING' THEN date(check_time) END) as days_with_warnings
    FROM data_quality_metrics
    WHERE YEAR(check_time) = p_year AND MONTH(check_time) = p_month
    GROUP BY table_name;
    
    -- 5.2 主要问题分析
    SELECT 
        table_name,
        metric_name,
        COUNT(*) as issue_count,
        AVG(metric_value) as avg_value,
        MAX(metric_value) as max_value
    FROM data_quality_metrics
    WHERE YEAR(check_time) = p_year 
    AND MONTH(check_time) = p_month
    AND status IN ('WARNING', 'ERROR')
    GROUP BY table_name, metric_name
    ORDER BY issue_count DESC;
    
    -- 5.3 改进建议
    WITH RankedIssues AS (
        SELECT 
            table_name,
            metric_name,
            COUNT(*) as issue_count,
            ROW_NUMBER() OVER (PARTITION BY table_name ORDER BY COUNT(*) DESC) as rn
        FROM data_quality_metrics
        WHERE YEAR(check_time) = p_year 
        AND MONTH(check_time) = p_month
        AND status IN ('WARNING', 'ERROR')
        GROUP BY table_name, metric_name
    )
    SELECT 
        table_name,
        metric_name,
        issue_count,
        CASE 
            WHEN metric_name LIKE '%null%' THEN '建议添加非空约束或默认值'
            WHEN metric_name LIKE '%duplicate%' THEN '建议添加唯一索引'
            WHEN metric_name LIKE '%invalid%' THEN '建议加强数据验证'
            ELSE '建议进行深入分析'
        END as recommendation
    FROM RankedIssues
    WHERE rn <= 3;
END $$

DELIMITER ;

四、自动化清理方案

1. 增量清理作业

-- 1. 创建清理任务表
CREATE TABLE cleaning_tasks (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name VARCHAR(100),
    last_processed_id BIGINT,
    batch_size INT DEFAULT 1000,
    status ENUM('PENDING', 'RUNNING', 'COMPLETED', 'FAILED'),
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 2. 创建增量清理存储过程
DELIMITER $$

CREATE PROCEDURE clean_data_incrementally(
    IN p_table_name VARCHAR(100),
    IN p_batch_size INT
)
BEGIN
    DECLARE v_task_id BIGINT;
    DECLARE v_last_id BIGINT;
    DECLARE v_max_id BIGINT;
    DECLARE v_affected_rows INT;
    DECLARE v_error_occurred BOOLEAN DEFAULT FALSE;
    DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET v_error_occurred = TRUE;
    
    -- 创建清理任务
    INSERT INTO cleaning_tasks (table_name, batch_size, status, start_time)
    VALUES (p_table_name, p_batch_size, 'RUNNING', NOW());
    SET v_task_id = LAST_INSERT_ID();
    
    -- 获取最大ID
    SET @sql = CONCAT('SELECT MAX(id) INTO @max_id FROM ', p_table_name);
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
    SET v_max_id = @max_id;
    
    -- 获取上次处理的ID
    SELECT COALESCE(MAX(last_processed_id), 0) 
    INTO v_last_id 
    FROM cleaning_tasks 
    WHERE table_name = p_table_name 
    AND status = 'COMPLETED';
    
    -- 开始批量处理
    WHILE v_last_id < v_max_id AND NOT v_error_occurred DO
        START TRANSACTION;
        
        -- 清理重复数据
        SET @sql = CONCAT('
            WITH DuplicateRows AS (
                SELECT id,
                    ROW_NUMBER() OVER (
                        PARTITION BY email 
                        ORDER BY updated_at DESC
                    ) as rn
                FROM ', p_table_name, '
                WHERE id > ', v_last_id, '
                AND id <= ', v_last_id + p_batch_size, '
            )
            DELETE FROM ', p_table_name, '
            WHERE id IN (
                SELECT id FROM DuplicateRows WHERE rn > 1
            )
        ');
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        SET v_affected_rows = ROW_COUNT();
        DEALLOCATE PREPARE stmt;
        
        -- 清理无效数据
        SET @sql = CONCAT('
            UPDATE ', p_table_name, '
            SET 
                email = CASE 
                    WHEN email NOT REGEXP "^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"
                    THEN NULL 
                    ELSE LOWER(TRIM(email))
                END,
                phone = CASE
                    WHEN phone NOT REGEXP "^[0-9]{11}$"
                    THEN NULL
                    ELSE phone
                END
            WHERE id > ', v_last_id, '
            AND id <= ', v_last_id + p_batch_size
        );
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        SET v_affected_rows = v_affected_rows + ROW_COUNT();
        DEALLOCATE PREPARE stmt;
        
        -- 更新任务状态
        UPDATE cleaning_tasks 
        SET last_processed_id = v_last_id + p_batch_size,
            affected_rows = v_affected_rows
        WHERE id = v_task_id;
        
        SET v_last_id = v_last_id + p_batch_size;
        
        COMMIT;
        
        -- 添加延迟避免过度占用系统资源
        DO SLEEP(0.1);
    END WHILE;
    
    -- 更新任务完成状态
    UPDATE cleaning_tasks 
    SET status = IF(v_error_occurred, 'FAILED', 'COMPLETED'),
        end_time = NOW(),
        error_message = IF(v_error_occurred, 'Error occurred during processing', NULL)
    WHERE id = v_task_id;
    
END $$

DELIMITER ;

-- 3. 创建定时执行作业
CREATE EVENT incremental_cleaning_job
ON SCHEDULE EVERY 1 DAY
STARTS CURRENT_TIMESTAMP
DO
BEGIN
    -- 为每个需要清理的表创建清理任务
    CALL clean_data_incrementally('users', 1000);
    CALL clean_data_incrementally('customers', 1000);
    CALL clean_data_incrementally('orders', 1000);
END;

2. 并行清理方案

-- 1. 创建分区清理任务表
CREATE TABLE partition_cleaning_tasks (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name VARCHAR(100),
    partition_key VARCHAR(100),
    partition_value VARCHAR(100),
    status ENUM('PENDING', 'RUNNING', 'COMPLETED', 'FAILED'),
    worker_id VARCHAR(36),
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    affected_rows INT,
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 2. 创建工作进程注册表
CREATE TABLE cleaning_workers (
    worker_id VARCHAR(36) PRIMARY KEY,
    host_name VARCHAR(100),
    last_heartbeat TIMESTAMP,
    status ENUM('ACTIVE', 'INACTIVE'),
    current_task_id BIGINT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 3. 创建并行清理存储过程
DELIMITER $$

CREATE PROCEDURE create_parallel_cleaning_tasks(
    IN p_table_name VARCHAR(100),
    IN p_partition_key VARCHAR(100),
    IN p_num_partitions INT
)
BEGIN
    DECLARE v_min_value VARCHAR(100);
    DECLARE v_max_value VARCHAR(100);
    DECLARE v_partition_size INT;
    DECLARE i INT DEFAULT 0;
    
    -- 获取分区范围
    SET @sql = CONCAT('
        SELECT 
            MIN(', p_partition_key, '),
            MAX(', p_partition_key, ')
        INTO @min_val, @max_val
        FROM ', p_table_name
    );
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
    
    SET v_min_value = @min_val;
    SET v_max_value = @max_val;
    SET v_partition_size = CEIL((v_max_value - v_min_value) / p_num_partitions);
    
    -- 创建分区任务
    WHILE i < p_num_partitions DO
        INSERT INTO partition_cleaning_tasks (
            table_name,
            partition_key,
            partition_value,
            status
        )
        VALUES (
            p_table_name,
            p_partition_key,
            CONCAT(
                v_min_value + (i * v_partition_size),
                ',',
                LEAST(
                    v_min_value + ((i + 1) * v_partition_size),
                    v_max_value
                )
            ),
            'PENDING'
        );
        SET i = i + 1;
    END WHILE;
END $$

-- 4. 创建工作进程心跳更新存储过程
CREATE PROCEDURE update_worker_heartbeat(
    IN p_worker_id VARCHAR(36)
)
BEGIN
    UPDATE cleaning_workers
    SET last_heartbeat = NOW()
    WHERE worker_id = p_worker_id;
    
    -- 检查并重置超时的任务
    UPDATE partition_cleaning_tasks
    SET status = 'PENDING',
        worker_id = NULL
    WHERE worker_id IN (
        SELECT worker_id 
        FROM cleaning_workers
        WHERE TIME_TO_SEC(TIMEDIFF(NOW(), last_heartbeat)) > 300
    )
    AND status = 'RUNNING';
END $$

-- 5. 创建任务执行存储过程
CREATE PROCEDURE execute_cleaning_task(
    IN p_worker_id VARCHAR(36),
    IN p_task_id BIGINT
)
BEGIN
    DECLARE v_table_name VARCHAR(100);
    DECLARE v_partition_key VARCHAR(100);
    DECLARE v_partition_value VARCHAR(100);
    DECLARE v_error_occurred BOOLEAN DEFAULT FALSE;
    DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET v_error_occurred = TRUE;
    
    -- 获取任务信息
    SELECT table_name, partition_key, partition_value
    INTO v_table_name, v_partition_key, v_partition_value
    FROM partition_cleaning_tasks
    WHERE id = p_task_id;
    
    -- 开始处理
    UPDATE partition_cleaning_tasks
    SET status = 'RUNNING',
        worker_id = p_worker_id,
        start_time = NOW()
    WHERE id = p_task_id;
    
    -- 解析分区范围
    SET @partition_start = SUBSTRING_INDEX(v_partition_value, ',', 1);
    SET @partition_end = SUBSTRING_INDEX(v_partition_value, ',', -1);
    
    -- 执行清理操作
    SET @sql = CONCAT('
        UPDATE ', v_table_name, '
        SET 
            email = CASE 
                WHEN email NOT REGEXP "^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"
                THEN NULL 
                ELSE LOWER(TRIM(email))
            END,
            phone = CASE
                WHEN phone NOT REGEXP "^[0-9]{11}$"
                THEN NULL
                ELSE phone
            END
        WHERE ', v_partition_key, ' BETWEEN ', @partition_start, ' AND ', @partition_end
    );
    
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    SET @affected = ROW_COUNT();
    DEALLOCATE PREPARE stmt;
    
    -- 更新任务状态
    UPDATE partition_cleaning_tasks
    SET status = IF(v_error_occurred, 'FAILED', 'COMPLETED'),
        end_time = NOW(),
        affected_rows = IF(v_error_occurred, 0, @affected),
        error_message = IF(v_error_occurred, 'Error during execution', NULL)
    WHERE id = p_task_id;
END $$

DELIMITER ;

想了解更多干货,可通过下方扫码关注

详情咨询

可扫码添加上智启元官方客服微信👇

未经允许不得转载:17认证网 » 最强总结!数据库脏数据清理完整指南!!
分享到:0

评论已关闭。

400-663-6632
咨询老师
咨询老师
咨询老师