avatar


2.性能优化、一致性和实时性

Explain

基本语法

语法格式:

1
2
3
4
5
6
EXPLAIN [AST | SYNTAX | QUERY TREE | PLAN | PIPELINE | ESTIMATE | TABLE OVERRIDE] [setting = value, ...]
[
SELECT ... |
tableFunction(...) [COLUMNS (...)] [ORDER BY ...] [PARTITION BY ...] [PRIMARY KEY] [SAMPLE BY ...] [TTL ...]
]
[FORMAT ...]

参数解释:

  • PLAN:查看执行计划,默认值。
    • header,打印计划中各个步骤的head说明,默认关闭,默认值0。
    • description,打印计划中各个步骤的描述,默认开启,默认值1。
    • actions,打印计划中各个步骤的详细信息,默认关闭,默认值0。
  • AST:查看语法树。
  • SYNTAX:查看优化后的SQL。
  • PIPELINE:查看PIPELINE计划。
    • header:打印计划中各个步骤的 head 说明,默认关闭。
    • graph:用DOT图形语言描述管道图,默认关闭,需要查看相关的图形需要配合graphviz查看。
    • actions:如果开启了graph,紧凑打印打,默认开启。

PLAIN

默认值就是PLAIN。示例代码:

1
2
3
4
5
6
7
explain
select database, table, count(1) cnt
from system.parts
where database in ('datasets', 'system')
group by database, table
order by database, cnt desc
limit 2 by database;

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12

┌─explain───────────────────────────────────────┐
│ Expression (Projection) │
│ LimitBy │
│ Expression (Before LIMIT BY) │
│ Sorting (Sorting for ORDER BY) │
│ Expression (Before ORDER BY) │
│ Aggregating │
│ Expression (Before GROUP BY) │
│ Filter (WHERE) │
│ ReadFromStorage (SystemParts) │
└───────────────────────────────────────────────┘

我们还可以指定打开全部参数,查看执行计划。示例代码:

1
2
3
4
EXPLAIN header = 1, actions = 1,description = 1
SELECT number
from system.numbers
limit 10;

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13

┌─explain──────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY)) │
│ Header: number UInt64 │
│ Actions: INPUT :: 0 -> number UInt64 : 0 │
│ Positions: 0 │
│ Limit (preliminary LIMIT (without OFFSET)) │
│ Header: number UInt64 │
│ Limit 10 │
│ Offset 0 │
│ ReadFromSystemNumbers │
│ Header: number UInt64 │
└──────────────────────────────────────────────┘

AST

查看语法树。示例代码:

1
2
3
4
EXPLAIN AST
SELECT number
from system.numbers
limit 10;

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13

┌─explain─────────────────────────────────────┐
│ SelectWithUnionQuery (children 1) │
│ ExpressionList (children 1) │
│ SelectQuery (children 3) │
│ ExpressionList (children 1) │
│ Identifier number │
│ TablesInSelectQuery (children 1) │
│ TablesInSelectQueryElement (children 1) │
│ TableExpression (children 1) │
│ TableIdentifier system.numbers │
│ Literal UInt64_10 │
└─────────────────────────────────────────────┘

SYNTAX

查看优化后的SQL。

假设存在一个SQL,如下:

1
2
3
SELECT number = 1 ? 'hello' : (number = 2 ? 'world' : 'kaka')
FROM
numbers(10);

查看优化后的SQL,示例代码:

1
2
3
4
EXPLAIN SYNTAX
SELECT number = 1 ? 'hello' : (number = 2 ? 'world' : 'kaka')
FROM
numbers(10);

运行结果:

1
2
3
4
5

┌─explain─────────────────────────────────────────────────────────┐
│ SELECT if(number = 1, 'hello', if(number = 2, 'world', 'kaka')) │
│ FROM numbers(10) │
└─────────────────────────────────────────────────────────────────┘

似乎没有优化?开启三元运算符优化:

1
SET optimize_if_chain_to_multiif = 1;

再试一次,示例代码:

1
2
3
4
EXPLAIN SYNTAX
SELECT number = 1 ? 'hello' : (number = 2 ? 'world' : 'kaka')
FROM
numbers(10);

运行结果:

1
2
3
4
5

┌─explain──────────────────────────────────────────────────────────┐
│ SELECT multiIf(number = 1, 'hello', number = 2, 'world', 'kaka') │
│ FROM numbers(10) │
└──────────────────────────────────────────────────────────────────┘

PIPELINE

示例代码:

1
2
3
4
EXPLAIN PIPELINE
SELECT sum(number)
FROM numbers_mt(100000)
GROUP BY number % 20;

运行结果:

1
2
3
4
5
6
7
8
9
10
11

┌─explain───────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (Aggregating) │
│ AggregatingTransform │
│ (Expression) │
│ ExpressionTransform │
│ (ReadFromSystemNumbers) │
│ NumbersRange 0 → 1 │
└───────────────────────────────┘

建表优化

数据类型

时间字段的类型

不建议用字符串类型或者数值类型表示时间。

  • 全String类型在以Hive为中心的数仓建设中常见,但ClickHouse环境不应受此影响。
  • 虽然ClickHouse底层将DateTime存储为时间戳Long类型,但不建议存储Long类型,因为DateTime不需要经过函数转换处理,执行效率高、可读性好。

如下,我们将create_time设为Int32,然后在partition by toYYYYMMDD(toDate(create_time))再用了toDate函数。多了一层函数转换处理。

1
2
3
4
5
6
7
8
9
10
create table t_type2
(
id UInt32,
sku_id String,
total_amount Decimal(16, 2),
create_time Int32
) engine = ReplacingMergeTree(create_time)
partition by toYYYYMMDD(toDate(create_time))
primary key (id)
order by (id, sku_id);

空值存储类型

ClickHouse官方已经明确指出Nullable类型几乎总是会拖累性能,因为:

  • 存储Nullable列时需要创建一个额外的文件来存储NULL的标记,
  • Nullable列无法被索引

除非极特殊情况,应直接使用字段默认值表示空,或者自行指定一个在业务中无意义的值(例如用-1表示没有商品ID)。

如下,我们创建一个表,其中一个字段Nullable,然后插入一些数据。示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE t_null
(
x Int8,
y Nullable(Int8)
) ENGINE TinyLog;

INSERT INTO t_null
VALUES (1, NULL),
(2, 3);

SELECT x + y
FROM t_null;

然后查看文件,会发现多了一个文件y.null.bin。示例代码:

1
ll /var/lib/clickhouse/data/default/t_null/

运行结果:

1
2
3
4
5
total 16
-rw-r----- 1 clickhouse clickhouse 95 Aug 4 00:15 sizes.json
-rw-r----- 1 clickhouse clickhouse 28 Aug 4 00:15 x.bin
-rw-r----- 1 clickhouse clickhouse 28 Aug 4 00:15 y.bin
-rw-r----- 1 clickhouse clickhouse 28 Aug 4 00:15 y.null.bin

分区粒度

分区粒度根据业务特点决定,不宜过粗或过细。

一般选择按天分区。或者,分区大小控制在[10,30][10,30]个(单表一亿数据为例)。

索引

必须指定索引列。

ClickHouse中的索引列即排序列,通过order by指定,一般在查询条件中经常被用来充当筛选条件的属性被纳入进来。

可以是单一维度,也可以是组合维度的索引。

对于组合维度的索引,通常需要查询频率大的在前原则。

表参数

index_granularity

index_granularity是用来控制索引粒度的,默认是8192,如非必须不建议调整。

TTL

如果表中不是必须保留全量历史数据,建议指定TTL(生存时间值),可以免去手动过期历史数据的麻烦。

写入和删除优化

  • 尽量不要执行单条或小批量删除和插入操作,这样会产生小分区文件,给后台Merge任务带来巨大压力。
  • 不要一次写入太多分区,或数据写入太快,数据写入太快会导致Merge速度跟不上而报错,一般建议每秒钟发起2-3次写入操作,每次操作写入[2W,5W][2W,5W]条数据(具体依服务器性能而定)。

语法优化

采用ClickHouse官方提供的例子,数据下载和导入方法:
https://clickhouse.com/docs/en/getting-started/example-datasets/metrica

hits_v1表有130多个字段,880多万条数据;visits_v1表有100多个字段,160多万条数据。

COUNT优化

在调用count函数时,如果使用的是count()或者count(*),且没有where条件,会直接使用system.tablestotal_rows

示例代码:

1
2
3
EXPLAIN
SELECT count()
FROM datasets.hits_v1;

运行结果:

1
2
3
4
5
6

┌─explain──────────────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY)) │
│ MergingAggregated │
│ ReadFromPreparedSource (Optimized trivial count) │
└──────────────────────────────────────────────────────┘

解释说明:Optimized trivial count,这是对count的优化。

如果count具体的列字段,则不会使用此项优化。示例代码:

1
2
3
EXPLAIN
SELECT count(CounterID)
FROM datasets.hits_v1;

运行结果:

1
2
3
4
5
6
7

┌─explain─────────────────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY)) │
│ Aggregating │
│ Expression │
│ ReadFromPreparedSource (_minmax_count_projection) │
└─────────────────────────────────────────────────────────┘

消除子查询重复字段

语句子查询中有两个重复的字段,应该去重。

我们可以通过EXPLAIN SYNTAX看一下,示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
EXPLAIN SYNTAX
SELECT a.UserID,
b.VisitID,
a.URL,
b.UserID
FROM hits_v1 AS a
LEFT JOIN (
SELECT UserID,
UserID as HaHa,
VisitID
FROM visits_v1) AS b
USING (UserID)
limit 3;

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

┌─explain───────────────┐
│ SELECT │
│ UserID, │
│ VisitID, │
│ URL, │
│ b.UserID │
│ FROM hits_v1 AS a │
│ ALL LEFT JOIN │
│ ( │
│ SELECT │
│ UserID, │
│ VisitID │
│ FROM visits_v1 │
│ ) AS b USING (UserID) │
│ LIMIT 3 │
└───────────────────────┘

谓词下推

group byhaving子句,并且没有with cubewith rollup或者with totals修饰的时候;应该将having过滤下推到where提前过滤。

如下,HAVING name变成了WHERE name,在group by之前过滤。示例代码:

1
2
3
4
5
EXPLAIN SYNTAX
SELECT UserID
FROM hits_v1
GROUP BY UserID
HAVING UserID = '8585742290196126178'

运行结果:

1
2
3
4
5
6
7

┌─explain──────────────────────────────┐
│ SELECT UserID │
│ FROM hits_v1 │
│ WHERE UserID = '8585742290196126178' │
│ GROUP BY UserID │
└──────────────────────────────────────┘

对于子查询,EXPLAIN SYNTAX也支持谓词下推。示例代码:

1
2
3
4
EXPLAIN SYNTAX
SELECT *
FROM (SELECT UserID FROM visits_v1)
WHERE UserID = '8585742290196126178'

运行结果:

1
2
3
4
5
6
7
8
9
10
┌─explain──────────────────────────────────┐
│ SELECT UserID │
│ FROM │
│ ( │
│ SELECT UserID │
│ FROM visits_v1 │
│ WHERE UserID = '8585742290196126178' │
│ ) │
│ WHERE UserID = '8585742290196126178' │
└──────────────────────────────────────────┘

聚合计算外推

对于聚合函数内的计算,应该外推。

示例代码:

1
2
3
EXPLAIN SYNTAX
SELECT sum(UserID * 2)
FROM visits_v1

运行结果:

1
2
3
4
5

┌─explain────────────────┐
│ SELECT sum(UserID) * 2 │
│ FROM visits_v1 │
└────────────────────────┘

聚合函数去除

对于聚合键(group by key),使用minmax等聚合函数,是没有意义的,应该去除。

示例代码:

1
2
3
4
5
6
EXPLAIN SYNTAX
SELECT sum(UserID * 2),
max(VisitID),
max(UserID)
FROM visits_v1
GROUP BY UserID;

运行结果:

1
2
3
4
5
6
7
8
9

┌─explain──────────────┐
│ SELECT │
│ sum(UserID) * 2, │
│ max(VisitID), │
│ UserID │
│ FROM visits_v1 │
│ GROUP BY UserID │
└──────────────────────┘

去除重复的key

order by key

对于重复的order by key,应该去除。

示例代码:

1
2
3
4
5
6
7
EXPLAIN SYNTAX
SELECT *
FROM visits_v1
ORDER BY UserID ASC,
UserID ASC,
VisitID ASC,
VisitID ASC

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

┌─explain───────────────────────────────────┐
│ SELECT │
│ CounterID, │
│ StartDate, │
│ │
│ 【部分运行结果略】 │
│ │
│ `Market.GoodQuantity`, │
│ `Market.GoodPrice`, │
│ IslandID │
│ FROM visits_v1 │
│ ORDER BY │
│ UserID ASC, │
│ VisitID ASC │
└───────────────────────────────────────────┘

limit by key

对于重复的limit by key,应该去除。

示例代码:

1
2
3
4
5
EXPLAIN SYNTAX
SELECT *
FROM visits_v1
LIMIT 3 BY VisitID, VisitID
LIMIT 10

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

┌─explain───────────────────────────────────┐
│ SELECT │
│ CounterID, │
│ StartDate, │
│ Sign, │
│ │
│ 【部分运行结果略】 │
│ │
│ `Market.GoodQuantity`, │
│ `Market.GoodPrice`, │
│ IslandID │
│ FROM visits_v1 │
│ LIMIT 3 BY VisitID │
│ LIMIT 10 │
└───────────────────────────────────────────┘

USING Key

对于重复的USING Key,应该去除。

示例代码:

1
2
3
4
5
6
7
8
EXPLAIN SYNTAX
SELECT a.UserID,
a.UserID,
b.VisitID,
a.URL,
b.UserID
FROM hits_v1 AS a
LEFT JOIN visits_v1 AS b USING (UserID, UserID)

运行结果:

1
2
3
4
5
6
7
8
9
10
11

┌─explain─────────────────────────────────────┐
│ SELECT │
│ UserID, │
│ UserID, │
│ VisitID, │
│ URL, │
│ b.UserID │
│ FROM hits_v1 AS a │
│ ALL LEFT JOIN visits_v1 AS b USING (UserID) │
└─────────────────────────────────────────────┘

标量替换

如果子查询只返回一行数据,应该在被引用的时候用标量替换。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
EXPLAIN SYNTAX
WITH
(SELECT sum(bytes)
FROM system.parts
WHERE active) AS total_disk_usage
SELECT (sum(bytes) / total_disk_usage) * 100 AS table_disk_usage,
table
FROM system.parts
GROUP BY table
ORDER BY table_disk_usage DESC
LIMIT 10

运行结果:

1
2
3
4
5
6
7
8
9
10
11

┌─explain─────────────────────────────────────────────────────────────────────────┐
│ WITH identity(_CAST(0, 'Nullable(UInt64)')) AS total_disk_usage │
│ SELECT │
│ (sum(bytes_on_disk AS bytes) / total_disk_usage) * 100 AS table_disk_usage, │
│ table │
│ FROM system.parts │
│ GROUP BY table │
│ ORDER BY table_disk_usage DESC │
│ LIMIT 10 │
└─────────────────────────────────────────────────────────────────────────────────┘

三元运算优化

如果开启了optimize_if_chain_to_multiif,三元运算符应替换成multiIf

例子略,在上文讨论SYNTAX,举的例子就是multiIf的。

单表查询优化

prewhere替代where

prewhere作用

prewherewhere语句的作用相同,用来过滤数据。

不同之处在于:

  • prewhere只支持*MergeTree系列引擎的表
  • prewhere,首先会读取指定的列数据,来判断数据过滤,等待数据过滤之后再读取select声明的列字段来补全其余属性。
    即,prewhere会优化执行过滤阶段的数据读取方式,减少IO操作。
    当查询列明显多于筛选列时使用,prewhere语句比where语句处理的数据量更少性能更高。

optimize_move_to_prewhere

默认情况下,optimize_move_to_prewhere是开启的。

我们可以通过SHOW [CHANGED] SETTINGS LIKE|ILIKE <name>确认一下。

  • changed,只显示和默认值不同的参数,也就是被改动过的。
  • LIKE,模糊匹配,区分大小写。
  • ILIKE,模糊匹配,不区分大小写,insensitive like。

示例代码:

1
show settings ilike '%optimize_move_to_prewhere%';

运行结果:

1
2
3
4
5

┌─name───────────────────────────────┬─type─┬─value─┐
│ optimize_move_to_prewhere │ Bool │ 1 │
│ optimize_move_to_prewhere_if_final │ Bool │ 0 │
└────────────────────────────────────┴──────┴───────┘

如果optimize_move_to_prewhere不是1,可以通过set进行修改,示例代码:

1
set optimize_move_to_prewhere=0;

注意:optimize_move_to_prewhere=0之间,不要有空格。

注意事项

一般ClickHouse会将where自动优化成prewhere,但是某些场景即使开启优化,也不会自动转换成prewhere,需要手动指定prewhere。例如:

  • 使用常量表达式。
  • 使用默认值为alias类型的字段。
  • 包含了arrayJOINglobalInglobalNotIn或者indexHint的查询。
  • select查询的列字段和where的谓词相同,不是包含,是相同。
  • 使用了主键字段。

数据采样

作用

通过采样运算可以提升数据分析的性能。

SAMPLE

关键字SAMPLE,示例代码:

1
2
3
4
5
6
7
SELECT Title, count(*) AS PageViews
FROM hits_v1
SAMPLE 0.1
WHERE CounterID = 57
GROUP BY Title
ORDER BY PageViews DESC
LIMIT 1000;

注意

采样修饰符只有在MergeTree表中才有效,且在创建表时需要指定采样策略。

本例的建表语句如下,注意SAMPLE BY intHash32(UserID)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE TABLE datasets.hits_v1
(
WatchID UInt64,
JavaEnable UInt8,
Title String,

【部分SQL略】

IslandID FixedString(16),
RequestNum UInt32,
RequestTry UInt8
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS index_granularity = 8192

列裁剪

应避免使用select *操作,查询的性能会与查询的字段大小和数量成线性表换,字段越少,消耗的IO资源越少,性能就会越高。

分区裁剪

分区裁剪就是只读取需要的分区,在过滤条件中指定。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
select WatchID,
JavaEnable,
Title,
GoodEvent,
EventTime,
EventDate,
CounterID,
ClientIP,
ClientIP6,
RegionID,
UserID
from datasets.hits_v1
where EventDate = '2014-03-23';

order by 结合where、limit

千万以上数据集进行order by查询时需要搭配where条件和limit语句一起使用。

避免构建虚拟列

如非必须,不要在结果集上构建虚拟列,虚拟列非常消耗资源浪费性能,可以考虑在前端进行处理,或者在表中构造实际字段进行额外存储。

反例,示例代码:

1
2
SELECT Income, Age, Income / Age as IncRate
FROM datasets.hits_v1;

应该在拿到IncomeAge后,在前端进行处理,或者在表中构造实际字段进行额外存储。

uniqCombined替代count(distinct )

uniqCombined底层采用类似HyperLogLog算法实现,能接收2%2\%左右的数据误差,可直接使用这种去重方式提升查询性能;而count(distinct )会使用uniqExact精确去重。

uniqCombined(UserID),示例代码:

1
select uniqCombined(UserID) from hits_v1;

运行结果:

1
2
3
┌─uniqCombined(UserID)─┐
│ 119862 │
└──────────────────────┘

count(distinct UserID),示例代码:

1
select count(distinct UserID) from hits_v1;

运行结果:

1
2
3
┌─uniqExact(UserID)─┐
│ 119689 │
└───────────────────┘

注意,不建议在千万级不同数据上执行distinct去重查询,可以考虑改为近似去重uniqCombined

使用物化视图

关于物化视图,我们会在下文讨论。

多表关联优化

用IN代替JOIN

当多表联查时,查询的数据仅从其中一张表出时,可考虑用IN操作,而不是JOIN。示例代码:

1
2
3
select a.*
from hits_v1 a
where a.CounterID in (select CounterID from visits_v1)

小表在右

比如A JOIN B,ClickHouse的实现方式是,先把右边的表B都加载到内存,然后遍历表A,每一条数据都和内存中的B表进行匹配。

无论是Left joinRight join还是Inner join,都是拿着右表中的每一条记录到左表中查找该记录是否存在。

所以,多表JOIN时要满足小表在右的原则,右表关联时被加载到内存中与左表进行比较。

分布式表使用GLOBAL

两张分布式表上的INJOIN之前必须加上GLOBAL关键字;这样,右表只会在接收查询请求的那个节点查询一次,并将其分发到其他节点上。

如果不加GLOBAL关键字的话,每个节点都会单独发起一次对右表的查询,而右表又是分布式表,就导致右表一共会被查询N2N^2次(NN是该分布式表的分片数量),这就是查询放大,会带来很大开销。

使用字典表

将一些需要关联分析的业务创建成字典表进行JOIN操作,前提是字典表不宜太大,因为字典表会常驻内存。

提前过滤

通过增加逻辑过滤可以减少数据扫描,达到提高执行速度及降低内存消耗的目的。

物化视图(优化)

什么是物化视图

我们知道,视图是从一个或者多个基本表(或视图)导出的虚拟表,实质上是将底层结构隐藏封装起来,使得我们只需要专注于上层的查询,简化了我们的查询条件,但数据仍然是从基础表中查询,并不会提高任何查询效率。

而物化视图是将上述的虚拟表变成一个真实的表,将多个表的数据通过预设的条件提前存储到这张表中,从而达到一个预聚合的目的。这个时候我们查询的数据就来源于预聚合好的表。

优缺点

  • 优点:查询效率高
  • 缺点:
    • 维护成本高:物化视图需要占用存储空间,而且需要定期更新,这会增加维护成本。
    • 数据不一致:由于物化视图是基于原始表的数据生成的,如果原始表的数据发生了变化,而物化视图没有及时更新,就会导致数据不一致的问题。
    • 查询性能下降:物化视图虽然可以提高查询性能,但是在更新物化视图时需要加锁,这会影响到其他查询的性能。

使用

创建物化视图,语法:

1
2
3
4
5
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]Materialized_name [TO[db.]name] [ON CLUSTER cluster] 
ENGINE = engine_name()
ORDER BY expr
[POPULATE]
AS SELECT ...

参数说明:

参数 说明
db 数据库的名称,默认为当前选择的数据库。
Materialized_name 物化视图名。
TO[db.]name 将物化视图的数据写入到新表中。
如果需要将物化视图的数据写入新表,不能使用POPULATE关键字。
[ON CLUSTER cluster] 在每一个节点上都创建一个物化视图,固定为ON CLUSTER default。
ENGINE = engine_name() 表引擎类型。
[POPULATE] POPULATE关键字。如果创建物化视图时指定了POPULATE关键字,则在创建时将SELECT子句所指定的源表数据插入到物化视图中。不指定POPULATE关键字时,物化视图只会包含在物化视图创建后新写入源表的数据。
一般不推荐使用POPULATE关键字,因为在物化视图创建期间写入源表的数据将不会写入物化视图中。
SELECT ... SELECT子句。当数据写入物化视图中SELECT子句所指定的源表时,插入的数据会通过SELECT子句查询进行转换并将最终结果插入到物化视图中。
SELECT查询可以包含DISTINCT、GROUP BY、ORDER BY和LIMIT等,但是相应的转换是在每个插入数据块上独立执行的。

示例:

创建SELECT子句指定的源表。示例代码:

1
2
3
4
5
6
CREATE TABLE test
(
id Int32,
name String
) ENGINE = MergeTree()
ORDER BY (id);

写入数据至源表。示例代码:

1
INSERT INTO test VALUES(1,'a'),(2,'b'),(3,'c');

创建基于源表的物化视图。示例代码:

1
2
3
4
5
6
CREATE MATERIALIZED VIEW test_view
ENGINE = MergeTree()
ORDER BY (id)
AS
SELECT *
FROM test;

查询物化视图,因为未指定POPULATE关键字,所以查询到的数据为空。示例代码:

1
SELECT * FROM test_view;

运行结果:

1
0 rows in set. Elapsed: 0.006 sec.

写入数据至源表。示例代码:

1
INSERT INTO test VALUES(4,'a'),(5,'b'),(6,'c');

查询物化视图。示例代码:

1
SELECT * FROM test_view;

运行结果:

1
2
3
4
5
6

┌─id─┬─name─┐
│ 4 │ a │
│ 5 │ b │
│ 6 │ c │
└────┴──────┘

一致性

现象

假设存在一张表,如下:

1
2
3
4
5
6
7
CREATE TABLE test_a
(
user_id UInt64,
score String,
create_time DateTime DEFAULT toDateTime(0)
) ENGINE = ReplacingMergeTree(create_time)
ORDER BY user_id

这张表用的是ReplacingMergeTree索引,且ORDER BY user_id,所以user_id将会是数据去重更新的标识(类似主键)。

先插入1000万测试数据;再插入50万条数据,这50万条数据的user_id是重复的。所以,理论上,数据总条数应该还是1000万条。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
INSERT INTO TABLE test_a(user_id, score)
WITH (SELECT ['A','B','C','D','E','F','G']) AS dict
SELECT number AS user_id, dict[number % 7 + 1]
FROM numbers(10000000)

INSERT INTO TABLE test_a(user_id, score, create_time)
WITH (SELECT ['AA', 'BB', 'CC', 'DD', 'EE','FF']) AS dict
SELECT number AS user_id,
dict[number % 7 + 1],
now() AS create_time
FROM
numbers(500000)

然后,我们查询一下总记录数。示例代码:

1
SELECT COUNT() FROM test_a;

运行结果:

1
2
3
4

┌──count()─┐
│ 10500000 │
└──────────┘

因为,ReplacingMergeTree,数据的去重只会在数据合并期间进行,合并会在后台一个不确定的时间进行。

除了ReplacingMergeTree,其他的MergeTree、SummingMergeTree,也都会出现短暂的数据不一致的情况。

三种方法

在某些对一致性非常敏感的场景,通常有以下几种解决方案:

  1. 手动OPTIMIZE
  2. 通过group by去重
  3. 通过FINAL查询

手动OPTIMIZE

在写入数据后,立刻执行OPTIMIZE强制触发新写入分区的合并动作。

但是不建议,因为OPTIMIZE语句会引起对数据的大量读写。

示例代码:

1
OPTIMIZE TABLE test_a FINAL

通过group by去重

插入一条数据,示例代码:

1
2
3
INSERT
INTO TABLE test_a(user_id, score, create_time)
VALUES (0, 'AAAA', now());

查询总记录数,示例代码:

1
SELECT COUNT() FROM test_a;

运行结果:

1
2
3
4

┌──count()─┐
│ 10000001 │
└──────────┘

通过group by查询总记录数,示例代码:

1
2
3
4
5
6
7
SELECT count()
from (
SELECT user_id,
argMax(score, create_time) AS score,
max(create_time) AS ctime
FROM test_a
GROUP BY user_id);

解释说明:

  • argMax(field1,field2):按照field2的最大值取field1的值。
  • 如果想查询最新的数据,可以用上文的子查询。

通过FINAL查询

在查询语句后增加FINAL修饰符,这样在查询的过程中将会执行Merge的特殊逻辑(例如数据去重,预聚合等)。

但是这种方法在早期版本基本没有人使用,因为在增加FINAL之后,我们的查询将会变成一个单线程的执行过程,查询速度非常慢。

20.5.2.7-stable版本中,FINAL查询支持多线程执行,并且可以通过max_final_threads参数控制单个查询的线程数,但是目前读取part部分的动作依然是串行的。

示例代码:

1
SELECT COUNT() FROM test_a final;

运行结果:

1
2
3
4

┌──count()─┐
│ 10000000 │
└──────────┘

使用的查询 FINAL 执行速度比类似的查询慢一点,因为:

  • 在查询执行期间合并数据。
  • 除了读取查询中指定的列之外,还读取主键列。

MaterializeMySQL(实时性)

概述

对于MySQL和ClickHouse的同步,尤其是实时性同步,很多方案会基于binlog将数据写入到ClickHouse中。例如,基于canal监听binlog事件,写入到ClickHouse。

ClickHouse在20.8.2.3版本新增加了MaterializeMySQL的DATABASE引擎(库级别的引擎),ClickHouse服务做为MySQL的副本,读取binlog并执行DDL和DML请求,实现了基于Binlog 机制的业务数据库实时同步功能。

特点

  1. 支持全量和增量同步,在database创建之初会全量同步MySQL中的表和数据,之后则会通过binlog进行增量同步。
  2. 为其所创建的每张ReplacingMergeTree自动增加了_sign_version字段。用于适配MySQL中可能会有的,频繁大量的修改和删除操作。
    • _version用作ReplacingMergeTree的ver版本参数,每当监听到insertupdatedelete事件时,在database内全局自增。
    • _sign用于标记是否被删除,取值1或者-1

实现细则

DDL

MySQL的DDL会被转换成相应的ClickHouse的DDL操作(ALTER, CREATE, DROP, RENAME)。如果ClickHouse不能解析某个DDL操作,该操作将被忽略(不会报错)。

数据复制

MaterializeMySQL不支持直接插入、删除和更新查询,而是将DDL语句进行相应转换:

  • MySQL的INSERT操作会被转换为INSERT with _sign=1_version++
  • MySQL的DELETE操作会被转换为INSERT with _sign=-1_version++
  • MySQL的UPDATE操作会被转换成INSERT with _sign=1_version++

SELECT查询

如果在SELECT查询中没有指定_version,则使用FINAL修饰符,返回_version的最大值对应的数据,即最新版本的数据。
如果在SELECT查询中没有指定_sign,则默认使用WHERE _sign=1,即返回未删除状态的数据。

索引转换

ClickHouse数据库表会自动将MySQL主键和索引子句转换为ORDER BY元组。

案例实践

MySQL的准备

确保MySQL开启了binlog功能,且格式为ROW

修改/etc/my.cnf,在[mysqld]下添加:

1
2
3
server-id=1
log-bin=mysql-bin
binlog_format=ROW

修改完成后,重启MySQL,生效。

开启GTID模式

如果ClickHouse的版本是20.8以上,那么还需要MySQL开启GTID模式,这种方式在MySQL主从模式下可以确保数据同步的一致性(主从切换时)。

修改/etc/my.cnf,在[mysqld]下添加:

1
2
3
4
5
gtid-mode=on
# 设置为主从强一致性
enforce-gtid-consistency=1
# 记录日志
log-slave-updates=1

修改完成后,重启MySQL,生效。

准备MySQL表和数据

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
CREATE DATABASE testck;

CREATE TABLE `testck`.`t_organization`
(
`id` int NOT NULL AUTO_INCREMENT,
`code` int NOT NULL,
`name` text DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY (`code`)
) ENGINE = InnoDB;


INSERT INTO testck.t_organization (code, name, update_time)
VALUES (1000, 'Realinsight', NOW());

INSERT INTO testck.t_organization (code, name, update_time)
VALUES (1001, 'Realindex', NOW());

INSERT INTO testck.t_organization (code, name, update_time)
VALUES (1002, 'EDT', NOW());

CREATE TABLE `testck`.`t_user`
(
`id` int NOT NULL AUTO_INCREMENT,
`code` int,
PRIMARY KEY (`id`)
) ENGINE = InnoDB;

INSERT INTO testck.t_user (code)
VALUES (1);

新建用户

我们还可以为ClickHouse,专门新建一个MySQL的用户。示例代码:

1
2
3
CREATE USER 'ck'@'%' IDENTIFIED BY 'MySQL@2024';
grant all privileges on *.* to 'ck'@'%';
FLUSH PRIVILEGES;

ClickHouse的准备

开启ClickHouse物化引擎

开启ClickHouse物化引擎,示例代码:

1
SET allow_experimental_database_materialized_mysql = 1

创建MaterializeMySQL数据库

创建MaterializeMySQL数据库,示例代码:

1
2
CREATE DATABASE test_binlog ENGINE =
MaterializeMySQL('127.0.0.1:3306','testck','ck','MySQL@2024');
  • 其中4个参数分别是MySQL地址、databse、username和password。

可能会有如下报错:

1
2
Received exception from server (version 23.12.6):
Code: 695. DB::Exception: Received from localhost:9000. DB::Exception: Load job 'startup MaterializedMySQL database test_binlog' failed: Code: 537. DB::Exception: Illegal MySQL variables, the MaterializedMySQL engine requires default_authentication_plugin='mysql_native_password'. (ILLEGAL_MYSQL_VARIABLE),. (ASYNC_LOAD_FAILED)

MySQL8.0.4开始默认采用的caching_sha2_password密码插件。如果使用以前的较老的客户端,与MySQL8连接时,会出现上述报错。可以修改my.cnf中的配置来使用与老版本兼容的密码插件,解决客户端与MySQL版本不兼容问题。将注释掉的default-authentication-plugin=mysql_native_password打开, 重启MySQL即可。

然后我们查看test_binlogt_organizationt_user的数据,会发现已经有了。

实验

修改数据

在MySQL中修改数据。示例代码:

1
2
3
update t_organization
set name =CONCAT(name, '-v1')
where id = 1

查询ClickHouse中的数据,示例代码:

1
2
select *
from test_binlog.t_organization;

运行结果:

1
2
3
4
5
┌─id─┬─code─┬─name───────────┬─────────update_time─┐
│ 1 │ 1000 │ Realinsight-v1 │ 2024-08-04 23:51:42 │
│ 2 │ 1001 │ Realindex │ 2024-08-04 23:51:42 │
│ 3 │ 1002 │ EDT │ 2024-08-04 23:51:42 │
└────┴──────┴────────────────┴─────────────────────┘

特别的,如果我们显性的指出_sign_version,示例代码:

1
2
select *, _sign, _version
from test_binlog.t_organization;

运行结果:

1
2
3
4
5
6
7
8
9

┌─id─┬─code─┬─name────────┬─────────update_time─┬─_sign─┬─_version─┐
│ 1 │ 1000 │ Realinsight │ 2024-08-04 23:51:42 │ 1 │ 1 │
│ 2 │ 1001 │ Realindex │ 2024-08-04 23:51:42 │ 1 │ 1 │
│ 3 │ 1002 │ EDT │ 2024-08-04 23:51:42 │ 1 │ 1 │
└────┴──────┴─────────────┴─────────────────────┴───────┴──────────┘
┌─id─┬─code─┬─name───────────┬─────────update_time─┬─_sign─┬─_version─┐
│ 1 │ 1000 │ Realinsight-v1 │ 2024-08-04 23:51:42 │ 1 │ 2 │
└────┴──────┴────────────────┴─────────────────────┴───────┴──────────┘

删除数据

MySQL删除数据。示例代码:

1
2
3
DELETE
FROM t_organization
where id = 2;

查询ClicKHouse中的数据。示例代码:

1
2
select *
from test_binlog.t_organization;

运行结果:

1
2
3
4
5

┌─id─┬─code─┬─name───────────┬─────────update_time─┐
│ 1 │ 1000 │ Realinsight-v1 │ 2024-08-04 23:51:42 │
│ 3 │ 1002 │ EDT │ 2024-08-04 23:51:42 │
└────┴──────┴────────────────┴─────────────────────┘

显性的指出_sign_version,示例代码:

1
2
select *, _sign, _version
from test_binlog.t_organization;

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12

┌─id─┬─code─┬─name────────┬─────────update_time─┬─_sign─┬─_version─┐
│ 1 │ 1000 │ Realinsight │ 2024-08-04 23:51:42 │ 1 │ 1 │
│ 2 │ 1001 │ Realindex │ 2024-08-04 23:51:42 │ 1 │ 1 │
│ 3 │ 1002 │ EDT │ 2024-08-04 23:51:42 │ 1 │ 1 │
└────┴──────┴─────────────┴─────────────────────┴───────┴──────────┘
┌─id─┬─code─┬─name───────────┬─────────update_time─┬─_sign─┬─_version─┐
│ 1 │ 1000 │ Realinsight-v1 │ 2024-08-04 23:51:42 │ 1 │ 2 │
└────┴──────┴────────────────┴─────────────────────┴───────┴──────────┘
┌─id─┬─code─┬─name──────┬─────────update_time─┬─_sign─┬─_version─┐
│ 2 │ 1001 │ Realindex │ 2024-08-04 23:51:42 │ -1 │ 3 │
└────┴──────┴───────────┴─────────────────────┴───────┴──────────┘

删除表和新增表

在MySQL删除表和新增表,在ClickHouse处都会被同步执行。

文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/12102
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

留言板