时间:2022-12-06来源:www.pcxitongcheng.com作者:电脑系统城
业务需要一种OLAP引擎,可以做到实时写入存储和查询计算功能,提供高效、稳健的实时数据服务,最终决定ClickHouse
ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统(DBMS)。
列式数据库更适合于OLAP场景(对于大多数查询而言,处理速度至少提高了100倍),下面详细解释了原因(通过图片更有利于直观理解),图片来源于ClickHouse中文官方文档。
行式
列式
我们使用Flink编写程序,消费kafka里面的主题数据,清洗、归一,写入到clickhouse里面去。
这里的关键点,由于第一次使用,无法分清应该建立什么格式的clickhouse表,出现了一些问题,最大的问题就是程序将数据写入了,查询发现数据不完整,只有一部分。我也在网上查了一些原因,总结下来。
为什么有时看不到已经创建好的表并且查询结果一直抖动时多时少?
常见原因1:
建表流程存在问题。ClickHouse的分布式集群搭建并没有原生的分布式DDL语义。如果您在自建ClickHouse集群时使用create table创建表,查询虽然返回了成功,但实际这个表只在当前连接的Server上创建了。下次连接重置换一个Server,您就看不到这个表了。
解决方案:
建表时,请使用create table <table_name> on cluster default语句,on cluster default声明会把这条语句广播给default集群的所有节点进行执行。示例代码如下。 Create table test on cluster default (a UInt64) Engine = MergeTree() order by tuple(); 在test表上再创建一个分布式表引擎,建表语句如下。 Create table test_dis on cluster default as test Engine = Distributed(default, default, test, cityHash64(a));
常见原因2:
ReplicatedMergeTree存储表配置有问题。ReplicatedMergeTree表引擎是对应MergeTree表引擎的主备同步增强版,在单副本实例上限定只能创建MergeTree表引擎,在双副本实例上只能创建ReplicatedMergeTree表引擎。
解决方案:
在双副本实例上建表时,请使用ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)或ReplicatedMergeTree()配置ReplicatedMergeTree表引擎。其中,ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)为固定配置,无需修改。
这里引出了复制表的概念,这里介绍一下,只有 MergeTree 系列里的表可支持副本:
ReplicatedMergeTree
ReplicatedSummingMergeTree
ReplicatedReplacingMergeTree
ReplicatedAggregatingMergeTree ReplicatedCollapsingMergeTree
ReplicatedVersionedCollapsingMergeTree
ReplicatedGraphiteMergeTree
副本是表级别的,不是整个服务器级的。所以,服务器里可以同时有复制表和非复制表。副本不依赖分片。每个分片有它自己的独立副本。
先做好准备工作,该建表的建表,然后编写程序。在表引擎名称上加上 Replicated 前缀。例如:ReplicatedMergeTree。
1 | create database test on cluster default_cluster; |
由于clickhouse是分布式的,创建本地表本来应该在每个节点上创建的,但是指定on cluster关键字可以直接完成,建表语句如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
CREATE TABLE test.test_data_shade on cluster default_cluster ( `data` Map(String, String), `uid` String, `remote_addr` String, ` time ` Datetime64, `status` Int32, ...其它字段省略 `dt` String ) ENGINE = ReplicatedMergeTree() partition by dt order by (dt, sipHash64(uid)); |
这里表引擎为ReplicatedMergeTree,即有副本的表,根据dt按天分区,提升查询效率,sipHash64是一个hash函数,根据uid散列使得相同uid数据在同一个分片上面,如果有去重需求,速度更快,因为可以计算每个分片去重,再汇总一下即可。
1 | CREATE TABLE test.test_data_all on cluster default_cluster as test.test_data_shade ENGINE = Distributed( 'default_cluster' , 'test' , 'test_data_shade' , sipHash64(uid)); |
在多副本分布式 ClickHouse 集群中,通常需要使用 Distributed 表写入或读取数据,Distributed 表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作。
这个我是看的官方文档,里面有2种选择,感兴趣的同学可以都去尝试一下。
这里贴一下我的Pom依赖
1 2 3 4 5 6 7 8 9 10 11 12 |
< dependency > < groupId >ru.yandex.clickhouse</ groupId > < artifactId >clickhouse-jdbc</ artifactId > < version >0.3.1-patch</ version > < classifier >shaded</ classifier > < exclusions > < exclusion > < groupId >*</ groupId > < artifactId >*</ artifactId > </ exclusion > </ exclusions > </ dependency > |
Flink主程序,消费kafka,做清洗,然后写入clickhouse,这都是常规操作,这里贴一下关键代码吧。
连接clickhouse有2种方式,8123端口的http方式,和基于9000端口的tcp方式。
这里官方推荐的是连接驱动是0.3.2:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
< dependency > <!-- please stop using ru.yandex.clickhouse as it's been deprecated --> < groupId >com.clickhouse</ groupId > < artifactId >clickhouse-jdbc</ artifactId > < version >0.3.2-patch11</ version > < classifier >all</ classifier > < exclusions > < exclusion > < groupId >*</ groupId > < artifactId >*</ artifactId > </ exclusion > </ exclusions > </ dependency > |
Note: ru.yandex.clickhouse.ClickHouseDriver has been deprecated and everything under ru.yandex.clickhouse will be removed in 0.3.3.
官方推荐升级到0.3.2,上面表格给出了升级方法,文档地址:
github.com/ClickHouse/…
以上就是详解Flink同步Kafka数据到ClickHouse分布式表的详细内容
2023-03-15
Navicat远程连接MongoDB最全实现方法以及报错解决2023-03-15
MongoDB的启动方法详细总结2023-03-11
详解分库分表后非分片键如何查询GROUP BY 语句用于结合合计函数,根据一个或多个列对结果集进行分组,下面这篇文章主要给大家介绍了关于高版本Mysql使用group by分组报错的解决方案,文中通过实例代码介绍的非常详细,需要的朋友可以参考下...
2023-03-06