{{ it.name }}
{{ it.text }}
今天碰到一个需求:客户有张表,主键自增。由于种种原因,主键值并非连续,中间有空隙。为了使主键连续,重复利用这些空隙,目前是用 MySQL 的特殊语法:INSERT IGNORE。
这种方法非常简单,不过会带来额外的失败重试。比如我下面往表 ytt_t0 插入一条存在的记录,后期需要不停的重试才能保证插入完成。
mysql> insert ignore ytt_t0 (id) values (1); Query OK, 0 rows affected, 1 warning (0.00 sec) mysql> show warnings; +---------+------+----------------------------------------------+ | Level | Code | Message | +---------+------+----------------------------------------------+ | Warning | 1062 | Duplicate entry '1' for key 'ytt_t0.PRIMARY' | +---------+------+----------------------------------------------+ 1 row in set (0.00 sec)
客户纠结的问题是:那有没有一种从数据库角度来讲快速找出这些不连续主键值的方法呢?
肯定是有,不过我本人还是觉得这一块放在非数据库端会比较好。比如考虑在 Shell 端来实现这种需求,非常简单,效率又非常高。举个例子:
表 ytt_t0 包含以下数据:
最大值为 28,需要返回的结果为:5,6,7,8,9,10,11,16,17,18,20,21,22,23,24,25,26
mysql> select id from ytt_t0; +----+ | id | +----+ | 1 | | 2 | | 3 | | 4 | | 12 | | 13 | | 14 | | 15 | | 19 | | 27 | | 28 | +----+ 11 rows in set (0.00 sec
在 Shell 端用几条常用命令就可拿到这些空缺 ID 串:
root@debian-ytt1:/var/lib/mysql# mysql -S /tmp/mysqld_3306.sock -D ytt_big -e "select id from ytt_t0" -ss >ytt_t0_ids.txt root@debian-ytt1:/var/lib/mysql# for i in `seq 1 28`;do echo $i >> ids.txt;done; root@debian-ytt1:/var/lib/mysql# grep -vwf ytt_t0_ids.txt ids.txt |sed ':label;N;s/\n/,/;b label' 5,6,7,8,9,10,11,16,17,18,20,21,22,23,24,25,26
即使需要找的 ID 区间非常大,Shell 端效率也很不错。比如:把 ID 最大设置为 10W,看下执行时间。
数据端克隆张新表 ytt_t1,更新最大 ID 为 10W。
mysql> insert into ytt_t1 select * from ytt_t0; Query OK, 11 rows affected (0.01 sec) Records: 11 Duplicates: 0 Warnings: 0 mysql> update ytt_t1 set id = 100000 where id = 28; Query OK, 1 row affected (0.01 sec) Rows matched: 1 Changed: 1 Warnings: 0
此时 Shell 端完成同样的需求:(大概 3 秒钟的样子,生成连续 ID 文件的过程最花时间)
root@debian-ytt1:/var/lib/mysql# time for i in `seq 1 100000`;do echo $i >> ids_100000.txt;done; real 0m2.796s user 0m1.685s sys 0m1.090s root@debian-ytt1:/var/lib/mysql# time mysql -S /tmp/mysqld_3306.sock -D ytt_big -e "select id from ytt_t1" -ss >ytt_t1_ids.txt real 0m0.026s user 0m0.010s sys 0m0.015s root@debian-ytt1:/var/lib/mysql# time grep -vwf ytt_t1_ids.txt ids_100000.txt >ytt_t1_ids_diff.txt real 0m0.009s user 0m0.009s sys 0m0.000s
那么从数据库角度来讲,有没有还不错的方法呢?当然也是有的,接下来一一列举出来。
1、生成一个完整序列表,和原始表做 JOIN,就能拿出有空隙的主键值,适合所有 MySQL 版本。
这块儿时间开销有两个地方:一个是完整序列表生成的时间;另外一个是两表 JOIN 的时间。
序列表的生成:
mysql> create table ytt_seq(id serial primary key); Query OK, 0 rows affected (0.04 sec)
写个简单存储过程来生成序列表数据:
DELIMITER $$ USE `ytt_big`$$ DROP PROCEDURE IF EXISTS `sp_generate_series`$$ CREATE DEFINER=`root`@`%` PROCEDURE `sp_generate_series`( f_tbname VARCHAR(64), f_max_id BIGINT UNSIGNED ) BEGIN DECLARE i BIGINT DEFAULT 0; TRUNCATE ytt_seq; SET @@autocommit = 0; WHILE i < f_max_id DO SET @stmt = CONCAT("insert into ",f_tbname," select null"); PREPARE s1 FROM @stmt; EXECUTE s1; IF MOD(i,100) = 0 THEN COMMIT; END IF; SET i = i + 1; END WHILE; drop prepare s1; COMMIT; SET @@autocommit = 1; END$$ DELIMITER ;
根据原始表最大值生成序列表数据:
mysql> select max(id) from ytt_t0 into @max_id; Query OK, 1 row affected (0.00 sec) mysql> call sp_generate_series('ytt_seq',@max_id); Query OK, 0 rows affected (0.09 sec)
两表 JOIN 得到需要的结果:
mysql> SELECT -> GROUP_CONCAT(ytt_seq.id) AS result -> FROM -> ytt_t0 -> RIGHT JOIN ytt_seq USING (id) -> WHERE ytt_t0.id IS NULL; +-----------------------------------------------+ | result | +-----------------------------------------------+ | 5,6,7,8,9,10,11,16,17,18,20,21,22,23,24,25,26 | +-----------------------------------------------+ 1 row in set (0.00 sec)
增加最大值,针对表 ytt_t1 来看:
mysql> select max(id) from ytt_t1 into @max_id; Query OK, 1 row affected (0.01 sec) mysql> call sp_generate_series('ytt_seq',@max_id); Query OK, 0 rows affected (21.80 sec) mysql> SELECT -> ytt_seq.id AS result -> FROM -> ytt_t1 -> RIGHT JOIN ytt_seq USING (id) -> WHERE ytt_t1.id IS NULL; 99989 rows in set (0.25 sec)
从结果可以看到,生成这个 ID 序列表数据需要的时间为 21.8 秒最长,两表 JOIN 时间 0.25 秒,如果能提前生成这个序列表,这个时间就很快了。
2、模拟外部程序,用系统函数来处理,系统函数如果不能满足需求,可以自己写函数来处理,效率最差,毕竟这个不是数据库擅长的事情。(适合所有 MySQL 版本, 当然 MySQL 8.0 实现更加简单)
这里我利用 JSON_OVERLAPS 函数判断 JSON 数组是否重复, 自己写了两个函数,一个是生成 JSON 序列,另外一个是对两个 JSON 数组进行对比,拿出不属于交集的结果,当然最耗时间的依然还是生成 JSON 序列的函数:
先拿出老的 ID 序列:
mysql> select @arr1 from (select @arr1 := json_array_append(@arr1,'$',id) from ytt_t0,(select @arr1 := '[]') b) T limit 1; +------------------------------------------+ | @arr1 | +------------------------------------------+ | [1, 2, 3, 4, 12, 13, 14, 15, 19, 27, 28] | +------------------------------------------+ 1 row in set, 2 warnings (0.00 sec)
生成完整的 JSON 序列:
DELIMITER $$ USE `ytt_big`$$ DROP FUNCTION IF EXISTS `func_generate_series_json`$$ CREATE DEFINER=`root`@`%` FUNCTION `func_generate_series_json`( f_max_id BIGINT UNSIGNED ) RETURNS JSON BEGIN DECLARE v_result JSON DEFAULT '[]'; DECLARE i BIGINT UNSIGNED DEFAULT 1; WHILE i <= f_max_id DO SET v_result = JSON_ARRAY_APPEND(v_result,'$',i); SET i = i + 1; END WHILE; RETURN v_result; END$$ DELIMITER ;
再写一个 JSON 序列对比函数:
DELIMITER $$ USE `ytt_big`$$ DROP FUNCTION IF EXISTS `func_get_json_common_values`$$ CREATE DEFINER=`root`@`%` FUNCTION `func_get_json_common_values`( f_str JSON, f_sub_str JSON ) RETURNS JSON BEGIN DECLARE i,v_len INT UNSIGNED DEFAULT 0; DECLARE v_tmp_str,v_result,v_str JSON DEFAULT '[]'; SET v_str = f_str; SET v_len = JSON_LENGTH(v_str); WHILE i < v_len DO SET v_tmp_str = JSON_EXTRACT(v_str,'$[0]'); IF json_overlaps(v_tmp_str,f_sub_str) = 0 THEN SET v_result = JSON_ARRAY_APPEND(v_result,'$',v_tmp_str); END IF; SET v_str = JSON_REMOVE(v_str,'$[0]'); SET i = i + 1; END WHILE; RETURN v_result; END$$ DELIMITER ;
接下来执行这两个函数得到结果:(数据非常小,时间可以忽略不计)
mysql> select @arr1 from (select @arr1 := json_array_append(@arr1,'$',id) from ytt_t0,(select @arr1 := '[]') b) T limit 1; +------------------------------------------+ | @arr1 | +------------------------------------------+ | [1, 2, 3, 4, 12, 13, 14, 15, 19, 27, 28] | +------------------------------------------+ 1 row in set, 2 warnings (0.00 sec) mysql> select max(id) from ytt_t0 into @max_id; Query OK, 1 row affected (0.00 sec) mysql> select func_get_json_common_values(func_generate_series_json(@max_id),@arr1) as result; +-----------------------------------------------------------------+ | result | +-----------------------------------------------------------------+ | [5, 6, 7, 8, 9, 10, 11, 16, 17, 18, 20, 21, 22, 23, 24, 25, 26] | +-----------------------------------------------------------------+ 1 row in set (0.00 sec)
再来从表 ytt_t1 里拿结果,我这儿省略中间过程,只看函数最终执行时间:
mysql> select DB,COMMAND,TIME,INFO from information_schema.processlist where DB = 'ytt_big'\G *************************** 1. row *************************** DB: ytt_big COMMAND: Query TIME: 628 INFO: select func_get_json_common_values(func_generate_series_json(@max_id),@arr1) as result 1 row in set (0.00 sec)
好吧,执行了 10 分钟,没出来结果,放弃了!
3、直接用通过表达式来处理(MySQL 8.0 特有,数据库本身的特性,在数据库层面来讲,效率肯定最高)。实现非常简单,直接用WITH表达式生成序列,完了和原始表做 JOIN,一条 SQL 就得到结果:
mysql> WITH recursive tmp (id) AS -> ( -> SELECT 1 FROM DUAL -> UNION ALL -> SELECT id + 1 FROM tmp -> WHERE id < (SELECT MAX(id) FROM ytt_t0) -> ) -> SELECT -> id AS gap_result -> FROM tmp -> LEFT JOIN ytt_t0 USING (id) -> WHERE ytt_t0.id IS NULL; +------------+ | gap_result | +------------+ | 5 | | 6 | | 7 | | 8 | | 9 | | 10 | | 11 | | 16 | | 17 | | 18 | | 20 | | 21 | | 22 | | 23 | | 24 | | 25 | | 26 | +------------+ 17 rows in set (0.00 sec)
继续看下针对表 ytt_t1:(只花了 0.19 秒)
mysql> set cte_max_recursion_depth=1000000; Query OK, 0 rows affected (0.01 sec) mysql> WITH recursive tmp (id) AS -> ( -> SELECT 1 FROM DUAL -> UNION ALL -> SELECT id + 1 FROM tmp -> WHERE id < (SELECT MAX(id) FROM ytt_t1) -> ) -> SELECT -> id AS gap_result -> FROM tmp -> LEFT JOIN ytt_t1 USING (id) -> WHERE ytt_t1.id IS NULL; ... 99989 rows in set (0.19 sec)
其实结果显而易见,如果非要在数据库端实现这样的需求,MySQL 8.0 是最好的选择;要不然,放到外部程序是最好的选择。