数据质量是数据库管理中的关键问题。本文将系统地介绍数据库中脏数据的识别、清理和预防方法，帮助您提升数据质量。文中示例基于 MySQL 8.0（使用了窗口函数、REGEXP_REPLACE、CHECK 约束和事件调度器）。
一、脏数据类型识别
二、脏数据清理方法
三、数据质量管理
四、自动化清理方案
一、脏数据类型识别
1. 常见脏数据类型
-- 1. Duplicate rows: (name, email) pairs that occur more than once
SELECT
    u.name,
    u.email,
    COUNT(*) AS count
FROM users AS u
GROUP BY u.name, u.email
HAVING COUNT(*) > 1;
-- 2. Null-like values: real NULLs, blank strings, and the literal
--    placeholder strings 'null' / 'undefined'
SELECT *
FROM customers
WHERE phone IS NULL
   OR phone IN ('null', 'undefined')
   OR TRIM(phone) = '';
-- 3. Inconsistent formats: classify every distinct email.
--    '[.]' matches a literal dot. In a MySQL string literal a backslash is an
--    escape character, so '\.' would reach the regex engine as '.', which
--    matches any character.
--    NULL emails get their own label; otherwise NOT REGEXP yields NULL and
--    they would fall through to '格式有效' (valid).
SELECT DISTINCT
    email,
    CASE
        WHEN email IS NULL THEN '缺失'
        WHEN email NOT REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+[.][A-Za-z]{2,}$' THEN '格式无效'
        ELSE '格式有效'
    END AS validation
FROM users;
-- 4. Out-of-range values: negative quantities, or prices outside the
--    [0, 999999] range
SELECT *
FROM products
WHERE quantity < 0
   OR price NOT BETWEEN 0 AND 999999;
-- 5. Orphaned rows (broken references): products whose category_id points
--    at a category that does not exist.
--    Products with no category at all (category_id IS NULL) are a
--    missing-value problem rather than a broken reference, so they are
--    excluded here.
SELECT
    p.*,
    c.name AS category_name
FROM products AS p
LEFT JOIN categories AS c
    ON p.category_id = c.id
WHERE p.category_id IS NOT NULL
  AND c.id IS NULL;
-- 6. Inconsistent data: orders whose stored total differs from the sum of
--    their line items.
--    `<=>` is MySQL's NULL-safe equality. An order whose total_amount is NULL
--    is therefore reported as a mismatch instead of being silently skipped,
--    which is what a plain `!=` (UNKNOWN for NULL) would do.
SELECT
    o.order_id,
    o.total_amount,
    SUM(oi.quantity * oi.unit_price) AS calculated_total
FROM orders AS o
INNER JOIN order_items AS oi
    ON o.order_id = oi.order_id
GROUP BY o.order_id, o.total_amount
HAVING NOT (o.total_amount <=> SUM(oi.quantity * oi.unit_price));
2. 脏数据识别工具
数据质量分析SQL
-- Data-quality profile of one table in the current schema, one row per column.
--
-- A prepared statement can hold only ONE SQL statement, and columns cannot be
-- referenced generically. The procedure therefore walks
-- information_schema.columns with a cursor and runs one aggregate query per
-- column. The results are collected in a temporary table and returned as a
-- single result set.
--
-- Param table_name: table to profile. It must exist in DATABASE(); otherwise
--                   SQLSTATE 45000 is raised. The name is spliced into
--                   dynamic SQL, so it is validated and backtick-quoted.
-- Result columns (in ordinal column order):
--   column_name, data_type, total_records, null_count, null_percentage,
--   distinct_count, duplicate_percentage (non-NULL values beyond the first
--   occurrence), and min_value, max_value, avg_value, std_dev. The last four
--   are filled only for numeric columns and are NULL otherwise.
DELIMITER $$
CREATE PROCEDURE analyze_table_quality(IN table_name VARCHAR(100))
BEGIN
    DECLARE v_table VARCHAR(100) DEFAULT table_name;
    DECLARE v_quoted_table VARCHAR(210);
    DECLARE v_column VARCHAR(64);
    DECLARE v_quoted_column VARCHAR(130);
    DECLARE v_data_type VARCHAR(64);
    DECLARE v_done BOOLEAN DEFAULT FALSE;
    -- Columns are qualified (c.) so they are not confused with the
    -- table_name parameter.
    DECLARE column_cursor CURSOR FOR
        SELECT c.column_name, c.data_type
        FROM information_schema.columns AS c
        WHERE c.table_schema = DATABASE()
          AND c.table_name = v_table
        ORDER BY c.ordinal_position;
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET v_done = TRUE;

    IF NOT EXISTS (
        SELECT 1
        FROM information_schema.tables AS t
        WHERE t.table_schema = DATABASE()
          AND t.table_name = v_table
    ) THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'analyze_table_quality: table not found in current schema';
    END IF;

    SET v_quoted_table = CONCAT('`', REPLACE(v_table, '`', '``'), '`');

    DROP TEMPORARY TABLE IF EXISTS tmp_table_quality;
    CREATE TEMPORARY TABLE tmp_table_quality (
        id                   INT AUTO_INCREMENT PRIMARY KEY,
        column_name          VARCHAR(64),
        data_type            VARCHAR(64),
        total_records        BIGINT,
        null_count           BIGINT,
        null_percentage      DECIMAL(6,2),
        distinct_count       BIGINT,
        duplicate_percentage DECIMAL(6,2),
        min_value            DOUBLE,
        max_value            DOUBLE,
        avg_value            DOUBLE,
        std_dev              DOUBLE
    );

    OPEN column_cursor;
    column_loop: LOOP
        FETCH column_cursor INTO v_column, v_data_type;
        IF v_done THEN
            LEAVE column_loop;
        END IF;

        SET v_quoted_column = CONCAT('`', REPLACE(v_column, '`', '``'), '`');
        -- NULLIF avoids division by zero on an empty table or an all-NULL
        -- column; the percentage is then NULL.
        SET @quality_sql = CONCAT(
            'INSERT INTO tmp_table_quality (column_name, data_type, total_records, ',
            'null_count, null_percentage, distinct_count, duplicate_percentage, ',
            'min_value, max_value, avg_value, std_dev) SELECT ',
            QUOTE(v_column), ', ', QUOTE(v_data_type), ', ',
            'COUNT(*), ',
            'COALESCE(SUM(', v_quoted_column, ' IS NULL), 0), ',
            'ROUND(COALESCE(SUM(', v_quoted_column, ' IS NULL), 0) * 100.0 / NULLIF(COUNT(*), 0), 2), ',
            'COUNT(DISTINCT ', v_quoted_column, '), ',
            'ROUND((COUNT(', v_quoted_column, ') - COUNT(DISTINCT ', v_quoted_column, ')) * 100.0 ',
            '/ NULLIF(COUNT(', v_quoted_column, '), 0), 2), ',
            IF(LOWER(v_data_type) IN ('tinyint', 'smallint', 'mediumint', 'int', 'bigint',
                                      'decimal', 'float', 'double'),
               CONCAT('MIN(', v_quoted_column, '), MAX(', v_quoted_column, '), ',
                      'AVG(', v_quoted_column, '), STDDEV(', v_quoted_column, ')'),
               'NULL, NULL, NULL, NULL'),
            ' FROM ', v_quoted_table
        );
        PREPARE quality_stmt FROM @quality_sql;
        EXECUTE quality_stmt;
        DEALLOCATE PREPARE quality_stmt;
    END LOOP;
    CLOSE column_cursor;

    SELECT
        column_name,
        data_type,
        total_records,
        null_count,
        null_percentage,
        distinct_count,
        duplicate_percentage,
        min_value,
        max_value,
        avg_value,
        std_dev
    FROM tmp_table_quality
    ORDER BY id;

    DROP TEMPORARY TABLE tmp_table_quality;
END $$
DELIMITER ;
-- Usage example
CALL analyze_table_quality('customers');
二、脏数据清理方法
1. 基础清理操作
-- 1. Delete duplicate users, keeping the most recently created row per email.
--    MySQL cannot DELETE through a CTE. The ranking is therefore computed in
--    a derived table, which is materialized because it uses a window
--    function, and joined back to users on id.
--    Users without an email are excluded so they are not treated as
--    duplicates of each other. id breaks ties between rows with the same
--    created_at.
DELETE u
FROM users AS u
INNER JOIN (
    SELECT
        id,
        ROW_NUMBER() OVER (
            PARTITION BY email
            ORDER BY created_at DESC, id DESC
        ) AS row_num
    FROM users
    WHERE email IS NOT NULL
) AS ranked
    ON ranked.id = u.id
WHERE ranked.row_num > 1;
-- 2. Turn null-like placeholders (blank strings, 'null', 'undefined') into
--    real NULLs. An UPDATE has a single WHERE clause for the whole statement,
--    so each column gets its own UPDATE.
UPDATE customers
SET phone = NULL
WHERE TRIM(phone) = ''
   OR phone IN ('null', 'undefined');
UPDATE customers
SET email = NULL
WHERE TRIM(email) = ''
   OR email IN ('null', 'undefined');
-- 3. Normalize formats across the whole table (no WHERE on purpose).
--    Emails get the same LOWER(TRIM()) normalization as the insert triggers.
--    Names are only trimmed, as in the before_user_insert trigger, because
--    lower-casing personal names discards information.
UPDATE users
SET
    name = TRIM(name),
    email = LOWER(TRIM(email));
-- 4. Clamp out-of-range values: price into [0, 999999], quantity to >= 0.
--    An UPDATE allows only one WHERE clause, so the per-column limits are
--    expressed with GREATEST / LEAST. NULL inputs stay NULL.
UPDATE products
SET
    price = LEAST(GREATEST(price, 0), 999999),
    quantity = GREATEST(quantity, 0)
WHERE price < 0
   OR price > 999999
   OR quantity < 0;
-- 5. Delete order items that reference a product that no longer exists.
--    NOT EXISTS is used instead of NOT IN: a single NULL in products.id would
--    make NOT IN UNKNOWN for every row, so nothing would be deleted.
--    Items whose product_id is NULL are not broken references and are kept.
DELETE FROM order_items
WHERE product_id IS NOT NULL
  AND NOT EXISTS (
      SELECT 1
      FROM products AS p
      WHERE p.id = order_items.product_id
  );
-- 6. Recompute every order's total from its line items (whole table, no
--    WHERE on purpose). SUM over zero rows is NULL, so orders without any
--    line items are set to 0 rather than NULL.
UPDATE orders AS o
SET o.total_amount = COALESCE((
    SELECT SUM(oi.quantity * oi.unit_price)
    FROM order_items AS oi
    WHERE oi.order_id = o.order_id
), 0);
2. 高级清理方案
-- 1. Cleaning log: audit trail written by the cleaning procedures
--    (which table/column was cleaned, how, and how many rows changed).
CREATE TABLE data_cleaning_log (
    id            BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name    VARCHAR(100),
    column_name   VARCHAR(100),
    cleaning_type VARCHAR(50),
    old_value     TEXT,
    new_value     TEXT,
    affected_rows INT,
    cleaned_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 2. Cleaning procedure for customers. In one transaction it:
--    - strips non-digits from phone numbers;
--    - trims and lower-cases emails;
--    - sets impossible ages (< 0 or > 120) to NULL.
--    Each step that changed rows is logged to data_cleaning_log. The
--    procedure then returns a per-cleaning_type summary of all 'customers'
--    entries in the log.
--
--    The row counter is named v_affected_rows on purpose. A local variable
--    with the same name as a column (data_cleaning_log.affected_rows) takes
--    precedence inside queries, so SUM(affected_rows) in the report would
--    sum the variable instead of the column.
DELIMITER $$
CREATE PROCEDURE clean_customer_data()
BEGIN
    DECLARE v_affected_rows INT;
    -- Any error undoes the whole run and is re-raised to the caller, instead
    -- of leaving the transaction open.
    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        ROLLBACK;
        RESIGNAL;
    END;

    START TRANSACTION;

    -- 2.1 Keep only the digits of phone numbers
    UPDATE customers
    SET phone = REGEXP_REPLACE(phone, '[^0-9]', '')
    WHERE phone IS NOT NULL
      AND phone REGEXP '[^0-9]';
    SET v_affected_rows = ROW_COUNT();
    IF v_affected_rows > 0 THEN
        INSERT INTO data_cleaning_log
            (table_name, column_name, cleaning_type, affected_rows)
        VALUES
            ('customers', 'phone', 'format_standardization', v_affected_rows);
    END IF;

    -- 2.2 Trim and lower-case emails. The change test compares binary
    --     strings: under a case-insensitive collation (MySQL's default)
    --     'A@B.COM' = 'a@b.com' is TRUE, so a plain != would skip exactly
    --     the rows that need lower-casing.
    UPDATE customers
    SET email = LOWER(TRIM(email))
    WHERE email IS NOT NULL
      AND CAST(email AS BINARY) <> CAST(LOWER(TRIM(email)) AS BINARY);
    SET v_affected_rows = ROW_COUNT();
    IF v_affected_rows > 0 THEN
        INSERT INTO data_cleaning_log
            (table_name, column_name, cleaning_type, affected_rows)
        VALUES
            ('customers', 'email', 'format_standardization', v_affected_rows);
    END IF;

    -- 2.3 Replace impossible ages with NULL
    UPDATE customers
    SET age = NULL
    WHERE age < 0 OR age > 120;
    SET v_affected_rows = ROW_COUNT();
    IF v_affected_rows > 0 THEN
        INSERT INTO data_cleaning_log
            (table_name, column_name, cleaning_type, affected_rows)
        VALUES
            ('customers', 'age', 'invalid_value_removal', v_affected_rows);
    END IF;

    COMMIT;

    -- Summary over every logged cleaning of customers, not only this run
    SELECT
        cleaning_type,
        SUM(affected_rows) AS total_cleaned,
        MAX(cleaned_at) AS cleaning_time
    FROM data_cleaning_log
    WHERE table_name = 'customers'
    GROUP BY cleaning_type;
END $$
DELIMITER ;
-- 3. Insert-time normalization for customers:
--    - phone keeps only its digits;
--    - email is trimmed and lower-cased;
--    - an age outside 0..120 becomes NULL.
DELIMITER $$
CREATE TRIGGER before_customer_insert
BEFORE INSERT ON customers
FOR EACH ROW
BEGIN
    -- REGEXP_REPLACE, TRIM and LOWER all return NULL for NULL input, so NULL
    -- phones and emails pass through unchanged without an explicit guard.
    SET NEW.phone = REGEXP_REPLACE(NEW.phone, '[^0-9]', '');
    SET NEW.email = LOWER(TRIM(NEW.email));

    IF NEW.age NOT BETWEEN 0 AND 120 THEN
        SET NEW.age = NULL;
    END IF;
END $$
DELIMITER ;
-- 4. Daily job that runs clean_customer_data(). It only fires when the event
--    scheduler is enabled (SET GLOBAL event_scheduler = ON).
--    The statement needs the CREATE keyword. A single CALL body needs no
--    BEGIN...END, so no DELIMITER switch is required.
CREATE EVENT clean_data_daily
ON SCHEDULE EVERY 1 DAY
STARTS CURRENT_TIMESTAMP
DO
    CALL clean_customer_data();
三、数据质量管理
1. 预防措施
-- 1. Enforce formats in the schema.
--    - CHECK constraints are enforced from MySQL 8.0.16; older versions parse
--      and ignore them.
--    - The ALTER fails if any existing row violates a check, so clean first.
--    - A NULL value passes a CHECK.
--    - '[.]' matches a literal dot. In a MySQL string literal '\.' collapses
--      to '.', which would match any character.
ALTER TABLE users
    ADD CONSTRAINT check_email
        CHECK (email REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+[.][A-Za-z]{2,}$'),
    ADD CONSTRAINT check_phone
        CHECK (phone REGEXP '^[0-9]{11}$'),
    ADD CONSTRAINT check_age
        CHECK (age BETWEEN 0 AND 120);
-- 2. Unique index so the same email cannot be stored twice. Creating it fails
--    while duplicates exist, so the de-duplication step must run first.
--    NOTE(review): uniqueness follows the column's collation; under a
--    case-insensitive collation 'A@x.com' and 'a@x.com' collide — confirm.
CREATE UNIQUE INDEX idx_unique_email ON users(email);
-- 3. Column defaults for products.
--    NOTE(review): ALTER COLUMN ... SET DEFAULT takes a literal or a
--    parenthesized expression; confirm CURRENT_TIMESTAMP is accepted here for
--    this column's type, otherwise use MODIFY COLUMN with the full definition.
ALTER TABLE products
ALTER COLUMN status SET DEFAULT 'active',
ALTER COLUMN created_at SET DEFAULT CURRENT_TIMESTAMP;
-- 4. Insert-time validation for users:
--    - normalizes email and name;
--    - rejects passwords shorter than 8 characters;
--    - defaults a NULL status to 'active'.
DELIMITER $$
CREATE TRIGGER before_user_insert
BEFORE INSERT ON users
FOR EACH ROW
BEGIN
    -- 4.1 Normalize
    SET NEW.email = LOWER(TRIM(NEW.email));
    SET NEW.name = TRIM(NEW.name);
    -- 4.2 Validate. CHAR_LENGTH counts characters. LENGTH counts bytes, which
    --     would let three 3-byte CJK characters (9 bytes) pass an
    --     8-character rule. A NULL password is not rejected by this check.
    --     NOTE(review): if password stores a hash rather than the raw
    --     password, this check is ineffective — confirm what is stored.
    IF CHAR_LENGTH(NEW.password) < 8 THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'Password must be at least 8 characters long';
    END IF;
    -- 4.3 Default status
    IF NEW.status IS NULL THEN
        SET NEW.status = 'active';
    END IF;
END $$
DELIMITER ;
-- 5. Audit trail: one row per audited change, with before/after snapshots
--    stored as JSON and the acting user's id.
CREATE TABLE audit_log (
    id          BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name  VARCHAR(100),
    action_type ENUM('INSERT', 'UPDATE', 'DELETE'),
    record_id   BIGINT,
    old_value   JSON,
    new_value   JSON,
    user_id     BIGINT,
    action_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 6. Audit every UPDATE on users. The row stores email/name/status before and
--    after the change, plus the session variable @current_user_id.
--    NOTE(review): @current_user_id must be set by the application session;
--    if it is not, user_id is NULL.
DELIMITER $$
CREATE TRIGGER after_user_change
AFTER UPDATE ON users
FOR EACH ROW
BEGIN
    INSERT INTO audit_log
    SET table_name  = 'users',
        action_type = 'UPDATE',
        record_id   = NEW.id,
        old_value   = JSON_OBJECT('email', OLD.email, 'name', OLD.name, 'status', OLD.status),
        new_value   = JSON_OBJECT('email', NEW.email, 'name', NEW.name, 'status', NEW.status),
        user_id     = @current_user_id;
END $$
DELIMITER ;
2. 监控方案
-- 1. Metric history: one row per quality check, with the measured value,
--    its threshold, and the resulting OK / WARNING / ERROR status.
CREATE TABLE data_quality_metrics (
    id           BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name   VARCHAR(100),
    metric_name  VARCHAR(100),
    metric_value DECIMAL(10,2),
    threshold    DECIMAL(10,2),
    check_time   TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status       ENUM('OK', 'WARNING', 'ERROR') DEFAULT 'OK'
);
-- 2. Data-quality probe for users. It records three percentages in
--    data_quality_metrics. Each is WARNING above its threshold and ERROR
--    above twice the threshold:
--      null_percentage       rows with a NULL email or phone          (5)
--      duplicate_percentage  rows repeating an already-seen email     (1)
--      invalid_percentage    malformed email/phone or age outside 0..120 (1)
--    Every ratio divides by NULLIF(row count, 0) and falls back to 0. An
--    empty users table therefore records 0 / 'OK' instead of a NULL value.
DELIMITER $$
CREATE PROCEDURE monitor_data_quality()
BEGIN
    DECLARE v_null_percentage DECIMAL(10,2);
    DECLARE v_duplicate_percentage DECIMAL(10,2);
    DECLARE v_invalid_percentage DECIMAL(10,2);

    -- 2.1 NULL ratio. SUM over a boolean counts the rows where it is TRUE.
    SELECT COALESCE(SUM(email IS NULL OR phone IS NULL) * 100 / NULLIF(COUNT(*), 0), 0)
    INTO v_null_percentage
    FROM users;

    INSERT INTO data_quality_metrics
        (table_name, metric_name, metric_value, threshold, status)
    VALUES
        ('users', 'null_percentage', v_null_percentage, 5.00,
         CASE
             WHEN v_null_percentage > 10 THEN 'ERROR'
             WHEN v_null_percentage > 5 THEN 'WARNING'
             ELSE 'OK'
         END);

    -- 2.2 Duplicate ratio. Every row beyond the first per non-NULL email is a
    --     duplicate: COUNT(email) - COUNT(DISTINCT email). Counting duplicate
    --     groups instead would report 1 for 100 copies of one address.
    SELECT COALESCE((COUNT(email) - COUNT(DISTINCT email)) * 100 / NULLIF(COUNT(*), 0), 0)
    INTO v_duplicate_percentage
    FROM users;

    INSERT INTO data_quality_metrics
        (table_name, metric_name, metric_value, threshold, status)
    VALUES
        ('users', 'duplicate_percentage', v_duplicate_percentage, 1.00,
         CASE
             WHEN v_duplicate_percentage > 2 THEN 'ERROR'
             WHEN v_duplicate_percentage > 1 THEN 'WARNING'
             ELSE 'OK'
         END);

    -- 2.3 Invalid ratio. '[.]' is a literal dot ('\.' in a MySQL string
    --     literal would collapse to '.'). NULL fields are not counted here;
    --     they are covered by the NULL ratio.
    SELECT COALESCE(SUM(
               email NOT REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+[.][A-Za-z]{2,}$'
               OR phone NOT REGEXP '^[0-9]{11}$'
               OR age NOT BETWEEN 0 AND 120
           ) * 100 / NULLIF(COUNT(*), 0), 0)
    INTO v_invalid_percentage
    FROM users;

    INSERT INTO data_quality_metrics
        (table_name, metric_name, metric_value, threshold, status)
    VALUES
        ('users', 'invalid_percentage', v_invalid_percentage, 1.00,
         CASE
             WHEN v_invalid_percentage > 2 THEN 'ERROR'
             WHEN v_invalid_percentage > 1 THEN 'WARNING'
             ELSE 'OK'
         END);
END $$
DELIMITER ;
-- 3. Alerting: every metric row inserted with status 'ERROR' produces a HIGH
--    alert in alert_log.
--    This trigger and the data_quality_issues view use alert_log, but its
--    CREATE TABLE does not appear in this script. The definition below
--    covers exactly the columns they use; IF NOT EXISTS leaves an existing
--    table untouched.
CREATE TABLE IF NOT EXISTS alert_log (
    id            BIGINT AUTO_INCREMENT PRIMARY KEY,
    metric_id     BIGINT,
    alert_message TEXT,
    alert_level   VARCHAR(20),
    created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
DELIMITER $$
CREATE TRIGGER after_metric_insert
AFTER INSERT ON data_quality_metrics
FOR EACH ROW
BEGIN
    IF NEW.status = 'ERROR' THEN
        -- CONCAT returns NULL if any argument is NULL, so nullable fields are
        -- rendered as 'NULL' to keep the message readable.
        INSERT INTO alert_log (
            metric_id,
            alert_message,
            alert_level
        )
        VALUES (
            NEW.id,
            CONCAT(
                'Data quality alert for table ',
                COALESCE(NEW.table_name, 'NULL'),
                ': ',
                COALESCE(NEW.metric_name, 'NULL'),
                ' = ',
                COALESCE(NEW.metric_value, 'NULL'),
                ' (threshold: ',
                COALESCE(NEW.threshold, 'NULL'),
                ')'
            ),
            'HIGH'
        );
    END IF;
END $$
DELIMITER ;
-- 4. Run the monitoring probe every hour (needs event_scheduler = ON)
CREATE EVENT monitor_data_quality_job
    ON SCHEDULE EVERY 1 HOUR
    DO
        CALL monitor_data_quality();
3. 报告和分析
-- 1. Daily roll-up per table and metric: value statistics, plus how many of
--    that day's checks ended in ERROR or WARNING.
CREATE VIEW data_quality_report AS
SELECT
    dqm.table_name,
    dqm.metric_name,
    AVG(dqm.metric_value) AS avg_value,
    MAX(dqm.metric_value) AS max_value,
    MIN(dqm.metric_value) AS min_value,
    COUNT(CASE WHEN dqm.status = 'ERROR' THEN 1 END) AS error_count,
    COUNT(CASE WHEN dqm.status = 'WARNING' THEN 1 END) AS warning_count,
    COUNT(*) AS total_checks,
    DATE(dqm.check_time) AS check_date
FROM data_quality_metrics AS dqm
GROUP BY
    dqm.table_name,
    dqm.metric_name,
    DATE(dqm.check_time);
-- 2. Weekly trend per table and metric (Sunday-to-Saturday weeks).
--    YEARWEEK() assigns each date to exactly one (year, week) pair:
--    - A week that spans New Year stays one group. WEEK() + YEAR() would
--      split it into week 52/53 of one year and week 0 of the next.
--    - Unlike WEEK() without a mode, it does not depend on the server's
--      default_week_format.
--    week_number is therefore 1..53.
CREATE VIEW data_quality_trends AS
SELECT
    table_name,
    metric_name,
    YEARWEEK(check_time) % 100 AS week_number,
    YEARWEEK(check_time) DIV 100 AS year,
    AVG(metric_value) AS avg_value,
    MAX(metric_value) AS max_value,
    MIN(metric_value) AS min_value
FROM data_quality_metrics
GROUP BY
    table_name,
    metric_name,
    YEARWEEK(check_time) % 100,
    YEARWEEK(check_time) DIV 100;
-- 3. Every WARNING / ERROR metric row, newest first, together with the alert
--    raised for it. Alert columns are NULL when no alert exists, e.g. for
--    WARNING rows.
CREATE VIEW data_quality_issues AS
SELECT
    dqm.table_name,
    dqm.metric_name,
    dqm.metric_value,
    dqm.threshold,
    dqm.status,
    dqm.check_time,
    al.alert_message,
    al.alert_level,
    al.created_at AS alert_time
FROM data_quality_metrics AS dqm
LEFT JOIN alert_log AS al
    ON al.metric_id = dqm.id
WHERE dqm.status IN ('WARNING', 'ERROR')
ORDER BY dqm.check_time DESC;
-- 4. Quality score (0..100) of one table on one day.
--    Each check costs 10 points if ERROR and 5 if WARNING. The total penalty
--    is divided by the largest possible penalty (10 per check), so:
--      only OK checks      -> 100
--      only WARNING checks -> 50
--      only ERROR checks   -> 0
--      no checks that day  -> 100
--    The function is declared READS SQL DATA, not DETERMINISTIC, because its
--    result depends on table contents. A function falsely declared
--    DETERMINISTIC can mislead the optimizer and is unsafe for
--    statement-based binary logging.
--    The day is matched with a half-open range, so an index on check_time
--    can be used.
DELIMITER $$
CREATE FUNCTION calculate_quality_score(
    p_table_name VARCHAR(100),
    p_date DATE
)
RETURNS DECIMAL(5,2)
NOT DETERMINISTIC
READS SQL DATA
BEGIN
    DECLARE quality_score DECIMAL(5,2);
    SELECT
        100 - (
            COUNT(CASE WHEN status = 'ERROR' THEN 1 END) * 10 +
            COUNT(CASE WHEN status = 'WARNING' THEN 1 END) * 5
        ) * 100 / NULLIF(COUNT(*) * 10, 0)
    INTO quality_score
    FROM data_quality_metrics
    WHERE table_name = p_table_name
      AND check_time >= p_date
      AND check_time < p_date + INTERVAL 1 DAY;
    RETURN COALESCE(quality_score, 100);
END $$
DELIMITER ;
-- 5. Monthly data-quality report for p_year / p_month (1-12). It returns
--    three result sets:
--    5.1 per table: the month's quality score (the average of the daily
--        calculate_quality_score values over the days that have checks), plus
--        the number of days with errors / warnings;
--    5.2 WARNING/ERROR counts and value statistics per table and metric;
--    5.3 the top 3 problem metrics per table, with a remediation hint.
--    The month is selected with a half-open [month_start, next_month_start)
--    range so an index on check_time can be used.
DELIMITER $$
CREATE PROCEDURE generate_monthly_report(IN p_year INT, IN p_month INT)
BEGIN
    DECLARE v_month_start DATE;
    DECLARE v_next_month_start DATE;

    SET v_month_start = MAKEDATE(p_year, 1) + INTERVAL (p_month - 1) MONTH;
    SET v_next_month_start = v_month_start + INTERVAL 1 MONTH;

    -- 5.1 Overall quality. Each table/day is collapsed first, so that every
    --     day with checks contributes exactly one score and one error/warning
    --     flag.
    SELECT
        daily.table_name,
        ROUND(AVG(calculate_quality_score(daily.table_name, daily.check_date)), 2) AS quality_score,
        SUM(daily.has_error) AS days_with_errors,
        SUM(daily.has_warning) AS days_with_warnings
    FROM (
        SELECT
            table_name,
            DATE(check_time) AS check_date,
            MAX(CASE WHEN status = 'ERROR' THEN 1 ELSE 0 END) AS has_error,
            MAX(CASE WHEN status = 'WARNING' THEN 1 ELSE 0 END) AS has_warning
        FROM data_quality_metrics
        WHERE check_time >= v_month_start
          AND check_time < v_next_month_start
        GROUP BY table_name, DATE(check_time)
    ) AS daily
    GROUP BY daily.table_name;

    -- 5.2 Main problems, most frequent first
    SELECT
        table_name,
        metric_name,
        COUNT(*) AS issue_count,
        AVG(metric_value) AS avg_value,
        MAX(metric_value) AS max_value
    FROM data_quality_metrics
    WHERE check_time >= v_month_start
      AND check_time < v_next_month_start
      AND status IN ('WARNING', 'ERROR')
    GROUP BY table_name, metric_name
    ORDER BY issue_count DESC;

    -- 5.3 Recommendations for the top 3 problem metrics of each table
    WITH RankedIssues AS (
        SELECT
            table_name,
            metric_name,
            COUNT(*) AS issue_count,
            ROW_NUMBER() OVER (PARTITION BY table_name ORDER BY COUNT(*) DESC) AS rn
        FROM data_quality_metrics
        WHERE check_time >= v_month_start
          AND check_time < v_next_month_start
          AND status IN ('WARNING', 'ERROR')
        GROUP BY table_name, metric_name
    )
    SELECT
        table_name,
        metric_name,
        issue_count,
        CASE
            WHEN metric_name LIKE '%null%' THEN '建议添加非空约束或默认值'
            WHEN metric_name LIKE '%duplicate%' THEN '建议添加唯一索引'
            WHEN metric_name LIKE '%invalid%' THEN '建议加强数据验证'
            ELSE '建议进行深入分析'
        END AS recommendation
    FROM RankedIssues
    WHERE rn <= 3
    ORDER BY table_name, rn;
END $$
DELIMITER ;
四、自动化清理方案
1. 增量清理作业
-- 1. One row per incremental cleaning run (written by
--    clean_data_incrementally).
--    - last_processed_id: highest id covered by the run.
--    - affected_rows: rows changed by the run. The procedure updates this
--      column, so it must exist (same layout as partition_cleaning_tasks).
CREATE TABLE cleaning_tasks (
    id                BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name        VARCHAR(100),
    last_processed_id BIGINT,
    batch_size        INT DEFAULT 1000,
    status            ENUM('PENDING', 'RUNNING', 'COMPLETED', 'FAILED'),
    start_time        TIMESTAMP,
    end_time          TIMESTAMP,
    affected_rows     INT,
    error_message     TEXT,
    created_at        TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 2. Incremental cleaning of one table in id-ordered batches of p_batch_size.
--    A run resumes after the last_processed_id of the most recent COMPLETED
--    run for the table. Each batch runs in its own transaction and:
--      * deletes rows that share a non-NULL email within the batch, keeping
--        the most recently updated one (ties: highest id);
--      * sets email to NULL when the trimmed value is not a valid address
--        (valid ones are trimmed and lower-cased);
--      * sets phone to NULL when it is not exactly 11 digits.
--    A batch that raises any error is rolled back and stops the run, which
--    is then recorded as FAILED. Batches already committed, and their
--    running row total, stay recorded in cleaning_tasks.
--    An unknown table or a non-positive batch size is recorded as a FAILED
--    task without touching any data; a zero batch size would never advance.
--    NOTE(review): the target table is assumed to have id, email, phone and
--    updated_at columns. Duplicates that fall into different batches are
--    not detected.
DELIMITER $$
CREATE PROCEDURE clean_data_incrementally(
    IN p_table_name VARCHAR(100),
    IN p_batch_size INT
)
proc: BEGIN
    DECLARE v_task_id BIGINT;
    DECLARE v_last_id BIGINT;
    DECLARE v_max_id BIGINT;
    DECLARE v_batch_end BIGINT;
    DECLARE v_batch_rows INT DEFAULT 0;
    DECLARE v_total_rows INT DEFAULT 0;
    DECLARE v_quoted_table VARCHAR(210);
    DECLARE v_error_occurred BOOLEAN DEFAULT FALSE;
    DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET v_error_occurred = TRUE;

    -- The table name is spliced into dynamic SQL, so it must name a real
    -- table in the current schema.
    IF p_batch_size IS NULL OR p_batch_size <= 0 OR NOT EXISTS (
        SELECT 1
        FROM information_schema.tables AS t
        WHERE t.table_schema = DATABASE()
          AND t.table_name = p_table_name
    ) THEN
        INSERT INTO cleaning_tasks
            (table_name, batch_size, status, start_time, end_time, error_message)
        VALUES
            (p_table_name, p_batch_size, 'FAILED', NOW(), NOW(), 'Invalid table name or batch size');
        LEAVE proc;
    END IF;
    SET v_quoted_table = CONCAT('`', REPLACE(p_table_name, '`', '``'), '`');

    -- Register this run
    INSERT INTO cleaning_tasks (table_name, batch_size, status, start_time)
    VALUES (p_table_name, p_batch_size, 'RUNNING', NOW());
    SET v_task_id = LAST_INSERT_ID();

    -- Upper bound of this run. @max_id is reset first so that a failed lookup
    -- cannot reuse a value left in the session by an earlier call.
    SET @max_id = NULL;
    SET @sql = CONCAT('SELECT MAX(id) INTO @max_id FROM ', v_quoted_table);
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
    SET v_max_id = @max_id;

    -- Resume point: end of the last successful run
    SELECT COALESCE(MAX(last_processed_id), 0)
    INTO v_last_id
    FROM cleaning_tasks
    WHERE table_name = p_table_name
      AND status = 'COMPLETED';

    -- Both cleaning statements depend only on the table, so they are prepared
    -- once; the batch bounds are bound through ? placeholders.
    -- Duplicate ranking uses a derived table because MySQL cannot DELETE
    -- through a CTE.
    SET @dedupe_sql = CONCAT(
        'DELETE t FROM ', v_quoted_table, ' AS t ',
        'INNER JOIN (',
            'SELECT id, ROW_NUMBER() OVER (',
                'PARTITION BY email ORDER BY updated_at DESC, id DESC',
            ') AS rn ',
            'FROM ', v_quoted_table, ' ',
            'WHERE id > ? AND id <= ? AND email IS NOT NULL',
        ') AS ranked ON ranked.id = t.id ',
        'WHERE ranked.rn > 1'
    );
    PREPARE dedupe_stmt FROM @dedupe_sql;

    -- '[.]' is a literal dot; it survives string-literal escaping, unlike '\.'.
    -- Doubled quotes embed single-quoted literals in the generated SQL.
    SET @normalize_sql = CONCAT(
        'UPDATE ', v_quoted_table, ' SET ',
        'email = CASE ',
            'WHEN TRIM(email) NOT REGEXP ''^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+[.][A-Za-z]{2,}$'' ',
            'THEN NULL ELSE LOWER(TRIM(email)) END, ',
        'phone = CASE ',
            'WHEN phone NOT REGEXP ''^[0-9]{11}$'' ',
            'THEN NULL ELSE phone END ',
        'WHERE id > ? AND id <= ?'
    );
    PREPARE normalize_stmt FROM @normalize_sql;

    WHILE NOT v_error_occurred AND v_last_id < v_max_id DO
        SET v_batch_end = v_last_id + p_batch_size;
        SET @batch_start = v_last_id;
        SET @batch_end = v_batch_end;

        START TRANSACTION;
        EXECUTE dedupe_stmt USING @batch_start, @batch_end;
        SET v_batch_rows = ROW_COUNT();
        -- Once a statement has failed, the remaining steps are skipped. Some
        -- errors (e.g. deadlocks) have already rolled the transaction back,
        -- and later statements would then run in autocommit mode.
        IF NOT v_error_occurred THEN
            EXECUTE normalize_stmt USING @batch_start, @batch_end;
            SET v_batch_rows = v_batch_rows + ROW_COUNT();
        END IF;
        IF NOT v_error_occurred THEN
            UPDATE cleaning_tasks
            SET last_processed_id = v_batch_end,
                affected_rows = v_total_rows + v_batch_rows
            WHERE id = v_task_id;
        END IF;

        IF v_error_occurred THEN
            ROLLBACK;
        ELSE
            COMMIT;
            SET v_total_rows = v_total_rows + v_batch_rows;
            SET v_last_id = v_batch_end;
            -- Short pause so the cleaning does not monopolize the server
            DO SLEEP(0.1);
        END IF;
    END WHILE;

    DEALLOCATE PREPARE dedupe_stmt;
    DEALLOCATE PREPARE normalize_stmt;

    -- Final task status
    UPDATE cleaning_tasks
    SET status = IF(v_error_occurred, 'FAILED', 'COMPLETED'),
        end_time = NOW(),
        affected_rows = v_total_rows,
        error_message = IF(v_error_occurred, 'Error occurred during processing', NULL)
    WHERE id = v_task_id;
END $$
DELIMITER ;
-- 3. Daily incremental cleaning (needs event_scheduler = ON).
--    The BEGIN...END body contains ';', so the client delimiter is switched
--    while the event is defined.
--    orders is not scheduled. clean_data_incrementally batches on an `id`
--    column and cleans email/phone, whereas orders is keyed by order_id in
--    this script and no email/phone columns are shown for it.
DELIMITER $$
CREATE EVENT incremental_cleaning_job
ON SCHEDULE EVERY 1 DAY
STARTS CURRENT_TIMESTAMP
DO
BEGIN
    -- One cleaning run per table
    CALL clean_data_incrementally('users', 1000);
    CALL clean_data_incrementally('customers', 1000);
END $$
DELIMITER ;
2. 并行清理方案
-- 1. Queue of range-partitioned cleaning tasks.
--    - partition_value holds the range as 'start,end'.
--    - worker_id is the worker that claimed the task.
CREATE TABLE partition_cleaning_tasks (
    id              BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name      VARCHAR(100),
    partition_key   VARCHAR(100),
    partition_value VARCHAR(100),
    status          ENUM('PENDING', 'RUNNING', 'COMPLETED', 'FAILED'),
    worker_id       VARCHAR(36),
    start_time      TIMESTAMP,
    end_time        TIMESTAMP,
    affected_rows   INT,
    error_message   TEXT,
    created_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 2. Registry of cleaning workers. update_worker_heartbeat refreshes
--    last_heartbeat and uses it to detect workers that stopped reporting.
CREATE TABLE cleaning_workers (
    worker_id       VARCHAR(36) PRIMARY KEY,
    host_name       VARCHAR(100),
    last_heartbeat  TIMESTAMP,
    status          ENUM('ACTIVE', 'INACTIVE'),
    current_task_id BIGINT,
    created_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 3. Split p_table_name into up to p_num_partitions contiguous ranges of
--    p_partition_key and queue one PENDING task per range.
--    - partition_value is 'start,end', inclusive on both ends, matching the
--      BETWEEN used by execute_cleaning_task.
--    - Ranges do not overlap, so no row is cleaned by two workers.
--    - Ranges beyond MAX(key) are not created, and an empty table (or a
--      non-positive p_num_partitions) queues nothing.
--    Table and key are checked against information_schema and backtick-quoted
--    because they are spliced into dynamic SQL.
--    NOTE(review): assumes an integer-valued partition key — confirm.
DELIMITER $$
CREATE PROCEDURE create_parallel_cleaning_tasks(
    IN p_table_name VARCHAR(100),
    IN p_partition_key VARCHAR(100),
    IN p_num_partitions INT
)
proc: BEGIN
    DECLARE v_min_value BIGINT;
    DECLARE v_max_value BIGINT;
    DECLARE v_partition_size BIGINT;
    DECLARE v_range_start BIGINT;
    DECLARE i INT DEFAULT 0;

    IF NOT EXISTS (
        SELECT 1
        FROM information_schema.columns AS c
        WHERE c.table_schema = DATABASE()
          AND c.table_name = p_table_name
          AND c.column_name = p_partition_key
    ) THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'create_parallel_cleaning_tasks: unknown table or partition key';
    END IF;

    -- Key range of the table (session variables reset so stale values from
    -- an earlier call cannot leak in)
    SET @min_val = NULL;
    SET @max_val = NULL;
    SET @sql = CONCAT(
        'SELECT MIN(`', REPLACE(p_partition_key, '`', '``'), '`), ',
        'MAX(`', REPLACE(p_partition_key, '`', '``'), '`) ',
        'INTO @min_val, @max_val ',
        'FROM `', REPLACE(p_table_name, '`', '``'), '`'
    );
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
    SET v_min_value = @min_val;
    SET v_max_value = @max_val;

    IF v_min_value IS NULL OR p_num_partitions IS NULL OR p_num_partitions <= 0 THEN
        LEAVE proc;
    END IF;

    -- +1 because both ends are inclusive; at least 1 so a single-valued key
    -- still advances.
    SET v_partition_size = GREATEST(1, CEIL((v_max_value - v_min_value + 1) / p_num_partitions));
    SET v_range_start = v_min_value;

    -- One task per [start, start + size - 1] range, clipped at the maximum
    WHILE i < p_num_partitions AND v_range_start <= v_max_value DO
        INSERT INTO partition_cleaning_tasks (
            table_name,
            partition_key,
            partition_value,
            status
        )
        VALUES (
            p_table_name,
            p_partition_key,
            CONCAT(
                v_range_start,
                ',',
                LEAST(v_range_start + v_partition_size - 1, v_max_value)
            ),
            'PENDING'
        );
        SET v_range_start = v_range_start + v_partition_size;
        SET i = i + 1;
    END WHILE;
END $$
-- 4. Heartbeat for one worker. Besides stamping p_worker_id with NOW(), every
--    call also returns to the PENDING queue all RUNNING tasks held by ANY
--    worker that has been silent for more than 300 seconds, so another worker
--    can pick them up.
CREATE PROCEDURE update_worker_heartbeat(
    IN p_worker_id VARCHAR(36)
)
BEGIN
    UPDATE cleaning_workers
    SET last_heartbeat = NOW()
    WHERE worker_id = p_worker_id;

    -- Reclaim tasks from workers whose last heartbeat is older than 5 minutes
    UPDATE partition_cleaning_tasks AS t
    SET t.status = 'PENDING',
        t.worker_id = NULL
    WHERE t.status = 'RUNNING'
      AND t.worker_id IN (
          SELECT w.worker_id
          FROM cleaning_workers AS w
          WHERE w.last_heartbeat < NOW() - INTERVAL 300 SECOND
      );
END $$
-- 5. Claim and run one partition task on behalf of p_worker_id.
--    - The claim is a single UPDATE that only succeeds while the task is
--      PENDING (or FAILED, to allow retries). Two workers handed the same id
--      therefore cannot both run it. A worker that loses the race, or passes
--      an unknown id, returns without touching the task.
--    - The task's range bounds are bound as parameters. Table and key names
--      come from partition_cleaning_tasks and are backtick-quoted.
--    - Cleaning: an email that is not valid after trimming becomes NULL;
--      valid emails are trimmed and lower-cased. A phone that is not exactly
--      11 digits becomes NULL.
CREATE PROCEDURE execute_cleaning_task(
    IN p_worker_id VARCHAR(36),
    IN p_task_id BIGINT
)
proc: BEGIN
    DECLARE v_table_name VARCHAR(100);
    DECLARE v_partition_key VARCHAR(100);
    DECLARE v_partition_value VARCHAR(100);
    DECLARE v_claimed INT DEFAULT 0;
    DECLARE v_error_occurred BOOLEAN DEFAULT FALSE;
    DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET v_error_occurred = TRUE;

    -- Atomically claim the task
    UPDATE partition_cleaning_tasks
    SET status = 'RUNNING',
        worker_id = p_worker_id,
        start_time = NOW()
    WHERE id = p_task_id
      AND status IN ('PENDING', 'FAILED');
    SET v_claimed = ROW_COUNT();
    IF v_error_occurred OR v_claimed <> 1 THEN
        LEAVE proc;
    END IF;

    -- Task details
    SELECT table_name, partition_key, partition_value
    INTO v_table_name, v_partition_key, v_partition_value
    FROM partition_cleaning_tasks
    WHERE id = p_task_id;

    -- Range 'start,end'
    SET @partition_start = SUBSTRING_INDEX(v_partition_value, ',', 1);
    SET @partition_end = SUBSTRING_INDEX(v_partition_value, ',', -1);

    -- Clean the range. '[.]' is a literal dot; it survives string-literal
    -- escaping, unlike '\.'.
    SET @sql = CONCAT(
        'UPDATE `', REPLACE(v_table_name, '`', '``'), '` SET ',
        'email = CASE ',
            'WHEN TRIM(email) NOT REGEXP ''^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+[.][A-Za-z]{2,}$'' ',
            'THEN NULL ELSE LOWER(TRIM(email)) END, ',
        'phone = CASE ',
            'WHEN phone NOT REGEXP ''^[0-9]{11}$'' ',
            'THEN NULL ELSE phone END ',
        'WHERE `', REPLACE(v_partition_key, '`', '``'), '` BETWEEN ? AND ?'
    );
    SET @affected = 0;
    PREPARE stmt FROM @sql;
    EXECUTE stmt USING @partition_start, @partition_end;
    SET @affected = ROW_COUNT();
    DEALLOCATE PREPARE stmt;

    -- Record the outcome
    UPDATE partition_cleaning_tasks
    SET status = IF(v_error_occurred, 'FAILED', 'COMPLETED'),
        end_time = NOW(),
        affected_rows = IF(v_error_occurred, 0, @affected),
        error_message = IF(v_error_occurred, 'Error during execution', NULL)
    WHERE id = p_task_id;
END $$
DELIMITER ;
想了解更多干货，可通过下方扫码关注
详情咨询
可扫码添加上智启元官方客服微信👇