PostgreSQL, time-series database, time series, sliding window, recursive query, subquery, window query, latest value
In many scenarios there is a need to merge and clean data.
For example:
1. A change log of a table (insert, update, delete) is recorded, and the details must be merged so that the latest value for each PK can be fetched quickly.
2. Many sensors keep reporting data, and the latest state of each sensor must be fetched quickly.
A window query can handle such needs, but how can it be accelerated, and how can a large batch of rows be fetched quickly?
PostgreSQL is the most advanced open-source database, with more optimization techniques than you might imagine.
1. When there are few unique values and their range is unknown, use a recursive query.
The approach is demonstrated below.
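The core idea is a recursive "skip scan" over an index on (id, crt_time desc): each recursive step jumps to the next distinct id and picks its newest row. A minimal sketch (the same shape as the full test query later in this article):

```sql
with recursive skip as (
  (select t as v from test t
     where t.id is not null
     order by t.id, t.crt_time desc limit 1)
  union all
  (select (select t as v from test t
             where t.id > (s.v).id and t.id is not null
             order by t.id, t.crt_time desc limit 1)
     from skip s
     where (s.v).id is not null)  -- required, or the recursion never terminates
)
select (v).id, (v).info, (v).crt_time from skip where v is not null;
```

Each iteration does one index probe, so the cost is proportional to the number of distinct ids, not the number of rows.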
2. When there are few unique values and their range is known, use a subquery.
The approach is demonstrated below.
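A minimal sketch, assuming a hypothetical dimension table all_ids that holds the known unique IDs — one index probe per known id fetches its newest row:

```sql
-- all_ids is a hypothetical table listing the known unique IDs
select (select t from test t
          where t.id = a.id
          order by t.crt_time desc limit 1) as latest
from all_ids a;
```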
3. When there are many unique values (option 1), use a window query; it suits this case better than the previous methods.
4. When there are many unique values (option 2), use streaming computation, which is better still than method 3. The approach is discussed below.
This article compares the first three methods.
Method 4, streaming, needs no benchmark here: it is the strongest in every case and covers all scenarios. Wait for pipelineDB to be packaged as an extension; Alibaba Cloud RDS PG 10 will integrate pipelineDB's functionality.
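For reference, a streaming version might look roughly like the following. This is only a sketch: it assumes pipelineDB's stream/continuous-view syntax and its keyed_max aggregate (which returns the value paired with the maximum key); names are illustrative.

```sql
-- sketch only: assumes pipelineDB syntax and the keyed_max aggregate
create stream sensor_stream (id int, info text, crt_time timestamp);

create continuous view latest_per_id as
  select id,
         keyed_max(crt_time, info) as info,   -- info of the newest row per id
         max(crt_time)             as crt_time
  from sensor_stream
  group by id;
```

The latest value per id is then a plain lookup against latest_per_id, with the merge work paid incrementally at ingest time.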
Using 5 million rows, let us compare which scenarios these methods suit.
Scenario 1: many unique values (IDs drawn from 1–1,000,000)

1. Create the table
\timing
drop table test;
create unlogged table test(id int, info text, crt_time timestamp);
2. Generate data
insert into test select ceil(random()*1000000), md5(random()::text), clock_timestamp() from generate_series(1,5000000);
3. Create the index
create index idx_test_1 on test (id, crt_time desc);
4. Recursive query performance
explain (analyze,verbose,timing,costs,buffers)
with recursive skip as (
  (
    select test as v from test
      where id in (select id from test where id is not null order by id, crt_time desc limit 1)
      limit 1
  )
  union all
  (
    select (
      select t as v from test t
        where t.id > (s.v).id and t.id is not null
        order by id, crt_time desc limit 1
    )
    from skip s
    where (s.v).id is not null
    -- the "where (s.v).id is not null" predicate is mandatory; without it the recursion never terminates
  )
)
select (t.v).id, (t.v).info, (t.v).crt_time from skip t where t.* is not null;
                                                                              QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 CTE Scan on skip t  (cost=54.35..56.37 rows=100 width=44) (actual time=0.042..6626.084 rows=993288 loops=1)
   Output: (t.v).id, (t.v).info, (t.v).crt_time
   Filter: (t.* IS NOT NULL)
   Rows Removed by Filter: 1
   Buffers: shared hit=3976934
   CTE skip
     ->  Recursive Union  (cost=0.91..54.35 rows=101 width=69) (actual time=0.034..6006.615 rows=993289 loops=1)
           Buffers: shared hit=3976934
           ->  Limit  (cost=0.91..0.93 rows=1 width=69) (actual time=0.033..0.033 rows=1 loops=1)
                 Output: test.*
                 Buffers: shared hit=8
                 ->  Nested Loop  (cost=0.91..10.19 rows=500 width=69) (actual time=0.032..0.032 rows=1 loops=1)
                       Output: test.*
                       Buffers: shared hit=8
                       ->  HashAggregate  (cost=0.48..0.49 rows=1 width=4) (actual time=0.021..0.021 rows=1 loops=1)
                             Output: test_1.id
                             Group Key: test_1.id
                             Buffers: shared hit=4
                             ->  Limit  (cost=0.43..0.47 rows=1 width=12) (actual time=0.016..0.016 rows=1 loops=1)
                                   Output: test_1.id, test_1.crt_time
                                   Buffers: shared hit=4
                                   ->  Index Only Scan using idx_test_1 on public.test test_1  (cost=0.43..173279.36 rows=5000002 width=12) (actual time=0.015..0.015 rows=1 loops=1)
                                         Output: test_1.id, test_1.crt_time
                                         Index Cond: (test_1.id IS NOT NULL)
                                         Heap Fetches: 1
                                         Buffers: shared hit=4
                       ->  Index Scan using idx_test_1 on public.test  (cost=0.43..9.64 rows=6 width=73) (actual time=0.009..0.009 rows=1 loops=1)
                             Output: test.*, test.id
                             Index Cond: (test.id = test_1.id)
                             Buffers: shared hit=4
           ->  WorkTable Scan on skip s  (cost=0.00..5.14 rows=10 width=32) (actual time=0.006..0.006 rows=1 loops=993289)
                 Output: (SubPlan 1)
                 Filter: ((s.v).id IS NOT NULL)
                 Rows Removed by Filter: 0
                 Buffers: shared hit=3976926
                 SubPlan 1
                   ->  Limit  (cost=0.43..0.49 rows=1 width=81) (actual time=0.005..0.005 rows=1 loops=993288)
                         Output: t_1.*, t_1.id, t_1.crt_time
                         Buffers: shared hit=3976926
                         ->  Index Scan using idx_test_1 on public.test t_1  (cost=0.43..102425.17 rows=1666667 width=81) (actual time=0.005..0.005 rows=1 loops=993288)
                               Output: t_1.*, t_1.id, t_1.crt_time
                               Index Cond: ((t_1.id > (s.v).id) AND (t_1.id IS NOT NULL))
                               Buffers: shared hit=3976926
 Planning time: 0.354 ms
 Execution time: 6706.105 ms
(45 rows)
Scenario 2: few unique values (IDs drawn from 1–1,000)

1. Create the table
\timing
drop table test;
create unlogged table test(id int, info text, crt_time timestamp);
2. Generate data
insert into test select ceil(random()*1000), md5(random()::text), clock_timestamp() from generate_series(1,5000000);
3. Create the index
create index idx_test_1 on test (id, crt_time desc);
4. Recursive query performance
The query is unchanged.
                                                                              QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 CTE Scan on skip t  (cost=55.09..57.11 rows=100 width=44) (actual time=0.046..8.859 rows=1000 loops=1)
   Output: (t.v).id, (t.v).info, (t.v).crt_time
   Filter: (t.* IS NOT NULL)
   Rows Removed by Filter: 1
   Buffers: shared hit=4007
   CTE skip
     ->  Recursive Union  (cost=0.91..55.09 rows=101 width=69) (actual time=0.039..8.203 rows=1001 loops=1)
           Buffers: shared hit=4007
           ->  Limit  (cost=0.91..1.67 rows=1 width=69) (actual time=0.038..0.038 rows=1 loops=1)
                 Output: test.*
                 Buffers: shared hit=8
                 ->  Nested Loop  (cost=0.91..6335.47 rows=8333 width=69) (actual time=0.038..0.038 rows=1 loops=1)
                       Output: test.*
                       Buffers: shared hit=8
                       ->  HashAggregate  (cost=0.48..0.49 rows=1 width=4) (actual time=0.021..0.021 rows=1 loops=1)
                             Output: test_1.id
                             Group Key: test_1.id
                             Buffers: shared hit=4
                             ->  Limit  (cost=0.43..0.47 rows=1 width=12) (actual time=0.016..0.017 rows=1 loops=1)
                                   Output: test_1.id, test_1.crt_time
                                   Buffers: shared hit=4
                                   ->  Index Only Scan using idx_test_1 on public.test test_1  (cost=0.43..173279.55 rows=5000002 width=12) (actual time=0.015..0.015 rows=1 loops=1)
                                         Output: test_1.id, test_1.crt_time
                                         Index Cond: (test_1.id IS NOT NULL)
                                         Heap Fetches: 1
                                         Buffers: shared hit=4
                       ->  Index Scan using idx_test_1 on public.test  (cost=0.43..6284.98 rows=5000 width=73) (actual time=0.015..0.015 rows=1 loops=1)
                             Output: test.*, test.id
                             Index Cond: (test.id = test_1.id)
                             Buffers: shared hit=4
           ->  WorkTable Scan on skip s  (cost=0.00..5.14 rows=10 width=32) (actual time=0.008..0.008 rows=1 loops=1001)
                 Output: (SubPlan 1)
                 Filter: ((s.v).id IS NOT NULL)
                 Rows Removed by Filter: 0
                 Buffers: shared hit=3999
                 SubPlan 1
                   ->  Limit  (cost=0.43..0.49 rows=1 width=81) (actual time=0.007..0.007 rows=1 loops=1000)
                         Output: t_1.*, t_1.id, t_1.crt_time
                         Buffers: shared hit=3999
                         ->  Index Scan using idx_test_1 on public.test t_1  (cost=0.43..102425.80 rows=1666667 width=81) (actual time=0.007..0.007 rows=1 loops=1000)
                               Output: t_1.*, t_1.id, t_1.crt_time
                               Index Cond: ((t_1.id > (s.v).id) AND (t_1.id IS NOT NULL))
                               Buffers: shared hit=3999
 Planning time: 0.353 ms
 Execution time: 8.980 ms
(45 rows)
1. Subquery performance (many unique values)
If the range of IDs is very wide, the subquery approach becomes quite uneconomical.
It requires maintaining a table of unique IDs; here generate_series stands in for that table for testing purposes.
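In practice, that unique-ID table could be maintained alongside the writes, for example (a sketch; test_ids is a hypothetical name):

```sql
-- hypothetical dimension table of the IDs seen so far
create table test_ids (id int primary key);

-- refresh it from the detail table (or populate it from the ingest path instead)
insert into test_ids
  select distinct id from test
  on conflict (id) do nothing;
```

The subquery then iterates over test_ids instead of generate_series, probing the index once per known id.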
explain (analyze,verbose,timing,costs,buffers)
select (select test from test where id=t.id order by crt_time desc limit 1)
from generate_series(1,1000000) t(id);
                                                                  QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------
 Function Scan on pg_catalog.generate_series t  (cost=0.00..1976.65 rows=1000 width=32) (actual time=70.682..2835.109 rows=1000000 loops=1)
   Output: (SubPlan 1)
   Function Call: generate_series(1, 1000000)
   Buffers: shared hit=3997082
   SubPlan 1
     ->  Limit  (cost=0.43..1.97 rows=1 width=77) (actual time=0.002..0.002 rows=1 loops=1000000)
           Output: test.*, test.crt_time
           Buffers: shared hit=3997082
           ->  Index Scan using idx_test_1 on public.test  (cost=0.43..9.64 rows=6 width=77) (actual time=0.002..0.002 rows=1 loops=1000000)
                 Output: test.*, test.crt_time
                 Index Cond: (test.id = t.id)
                 Buffers: shared hit=3997082
 Planning time: 0.119 ms
 Execution time: 2892.712 ms
(14 rows)
1. Subquery performance (few unique values)
The query changes (the ID range is now 1–1,000):
explain (analyze,verbose,timing,costs,buffers)
select (select test from test where id=t.id order by crt_time desc limit 1)
from generate_series(1,1000) t(id);
                                                                   QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------
 Function Scan on pg_catalog.generate_series t  (cost=0.00..1699.41 rows=1000 width=32) (actual time=0.107..7.041 rows=1000 loops=1)
   Output: (SubPlan 1)
   Function Call: generate_series(1, 1000)
   Buffers: shared hit=4000
   SubPlan 1
     ->  Limit  (cost=0.43..1.69 rows=1 width=77) (actual time=0.006..0.007 rows=1 loops=1000)
           Output: test.*, test.crt_time
           Buffers: shared hit=4000
           ->  Index Scan using idx_test_1 on public.test  (cost=0.43..6284.98 rows=5000 width=77) (actual time=0.006..0.006 rows=1 loops=1000)
                 Output: test.*, test.crt_time
                 Index Cond: (test.id = t.id)
                 Buffers: shared hit=4000
 Planning time: 0.131 ms
 Execution time: 7.126 ms
(14 rows)
1. Window query performance (many unique values)
explain (analyze,verbose,timing,costs,buffers)
select id, info, crt_time
from (select row_number() over (partition by id order by crt_time desc) as rn, *
      from test) t
where rn = 1;
                                                                       QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------
 Subquery Scan on t  (cost=0.43..310779.41 rows=25000 width=45) (actual time=0.027..6398.308 rows=993288 loops=1)
   Output: t.id, t.info, t.crt_time
   Filter: (t.rn = 1)
   Rows Removed by Filter: 4006712
   Buffers: shared hit=5018864
   ->  WindowAgg  (cost=0.43..248279.39 rows=5000002 width=53) (actual time=0.026..5973.497 rows=5000000 loops=1)
         Output: row_number() OVER (?), test.id, test.info, test.crt_time
         Buffers: shared hit=5018864
         ->  Index Scan using idx_test_1 on public.test  (cost=0.43..160779.35 rows=5000002 width=45) (actual time=0.019..4058.476 rows=5000000 loops=1)
               Output: test.id, test.info, test.crt_time
               Buffers: shared hit=5018864
 Planning time: 0.121 ms
 Execution time: 6446.901 ms
(13 rows)
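As an aside, PostgreSQL's DISTINCT ON gives an equivalent formulation of this latest-row-per-id query; whether it beats the window form depends on the plan the optimizer chooses:

```sql
-- latest row per id, leaning on the (id, crt_time desc) sort order
select distinct on (id) id, info, crt_time
from test
order by id, crt_time desc;
```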
1. Window query performance (few unique values)
The query is unchanged.
                                                                       QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------
 Subquery Scan on t  (cost=0.43..310779.61 rows=25000 width=45) (actual time=0.027..6176.801 rows=1000 loops=1)
   Output: t.id, t.info, t.crt_time
   Filter: (t.rn = 1)
   Rows Removed by Filter: 4999000
   Buffers: shared hit=4744850 read=18157
   ->  WindowAgg  (cost=0.43..248279.58 rows=5000002 width=53) (actual time=0.026..5822.576 rows=5000000 loops=1)
         Output: row_number() OVER (?), test.id, test.info, test.crt_time
         Buffers: shared hit=4744850 read=18157
         ->  Index Scan using idx_test_1 on public.test  (cost=0.43..160779.55 rows=5000002 width=45) (actual time=0.020..4175.082 rows=5000000 loops=1)
               Output: test.id, test.info, test.crt_time
               Buffers: shared hit=4744850 read=18157
 Planning time: 0.108 ms
 Execution time: 6176.924 ms
(13 rows)
| Rows      | Unique IDs | Window query (ms) | Subquery (ms) | Recursive query (ms) |
|-----------|------------|-------------------|---------------|----------------------|
| 5 million | 1 million  | 6446              | 2892          | 6706                 |
| 5 million | 1,000      | 6176              | 7             | 9                    |
With the rise of IoT, more and more time-series data is being generated. Fetching the latest value per key, or the latest value within a sliding window, has become a very common requirement in time-series workloads.
PostgreSQL is the most advanced open-source database. Unlike many databases, which often have only one way to solve a problem, PostgreSQL offers several; which one is optimal depends on how well you know it.
1. When there are few unique values and their range is unknown, use a recursive query.
2. When there are few unique values and their range is known, use a subquery. (For example, the range covers 1,000,000 IDs but only 500,000 appear in this batch: if you have those 500,000 IDs, performance is best; otherwise all 1,000,000 must be probed. Likewise, with 100 million users, only tens of thousands may be active in a given interval.)
3. When there are many unique values (option 1), use a window query; it suits this case better than the previous methods.
4. When there are many unique values (option 2), use streaming computation, which is better still than method 3.