华体会体育
Mou Mou Jidian Generator
发电机维修 发电机回收
发电机出售 发电机租赁
客户统一服务热线

070-49920329
18670126996

4进口发电机组
您的位置: 主页 > 产品中心 > 进口发电机组 >
趣头条基于 Flink+ClickHouse 构建实时数据分析平台

趣头条基于 Flink+ClickHouse 构建实时数据分析平台

本文摘要:摘要:本文由趣头条数据平台卖力人王金海分享,主要先容趣头条 Flink-to-Hive 小时级场景和 Flink-to-ClickHouse 秒级场景。作者:王金海;泉源:云栖社区内容分为以下四部门:一、业务场景与现状分析二、Flink-to-Hive 小时级场景三、Flink-to-ClickHouse 秒级场景四、未来生长与思考一、业务场景与现状分析趣头条查询的页面分为离线查询页面和实时查询页面。

华体会体育

摘要:本文由趣头条数据平台卖力人王金海分享,主要先容趣头条 Flink-to-Hive 小时级场景和 Flink-to-ClickHouse 秒级场景。作者:王金海;泉源:云栖社区内容分为以下四部门:一、业务场景与现状分析二、Flink-to-Hive 小时级场景三、Flink-to-ClickHouse 秒级场景四、未来生长与思考一、业务场景与现状分析趣头条查询的页面分为离线查询页面和实时查询页面。趣头条今年所实现的革新是在实时查询中接入了 ClickHouse 盘算引擎。

凭据差别的业务场景,实时数据报表中会展现数据指标曲线图和详细的数据指标表。现在数据指标的收罗和盘算为每五分钟一个时间窗口,固然也存在三分钟或一分钟的特殊情况。数据指标数据全部从 Kafka 实时数据中导出,并导入 ClickHouse 举行盘算。二、Flink-to-Hive 小时级场景1.小时级实现架构图如下图所示,Database 中的 Binlog 导出到 Kafka,同时 Log Server 数据也会上报到 Kafka。

所有数据实时落地到 Kafka 之后,通过 Flink 抽取到 HDFS。下图中 HDFS 到 Hive 之间为虚线,即 Flink 并非直接落地到 Hive,Flink 落地到 HDFS 后,再落地到 Hive 的时间可能是小时级、半小时级甚至分钟级,需要知道数据的 Event time 已经到何时,再触发 alter table,add partition,add location 等,写入其分区。这时需要有一个法式监控当前 Flink 任务的数据时间已经消费到什么时候,如9点的数据,落地时需要检察 Kafka 中消费的数据是否已经到达9点,然后在 Hive 中触发分区写入。2.实现原理趣头条主要使用了 Flink 高阶版本的一个特性——StreamingFileSink。

StreamingFileSink 主要有几点功效。第一, forBulkFormat 支持 avro、parquet 花样,即列式存储花样。第二, withBucketAssigner 自界说按数据时间分桶,此处会界说一个EventtimeBucket,既按数据时间举行数据落地到离线中。第三, OnCheckPointRollingPolicy,凭据 CheckPoint 时间举行数据落地,在一定的 CheckPoint 时间内数据落地并回稳。

根据 CheckPoint 落地另有其它计谋,如根据数据巨细。第四, StreamingFileSink 是 Exactly-Once 语义实现。

Flink 中有两个 Exactly-Once 语义实现,第一个是 Kafka,第二个是 StreamingFileSink。下图为 OnCheckPointRollingPolicy 设计的每10分钟落地一次到HDFS文件中的 demo。

■ 如何实现 Exactly-Once下图左侧为一个简朴的二 PC 模型。Coordinator 发送一个 prepare,执行者开始触发 ack 行动,Coordinator 收到 ack 所有消息后,所有 ack 开始触发 commit,所有执行者举行落地,将其转化到 Flink 的模型中,Source 收到 checkpoint barrier 流时,开始触发一个 snapshot。

每个算子的 CheckPoint、snapshot 都完成之后,CheckPoint 会给 Job Manager 发送 notifyCheckpointComplete。下图中二阶段模型和 Flink 模型左侧三条线部门是一致的。因此用 Flink 可以实现二阶段提交协议。

■ 如何使用 Flink 实现二阶段提交协议首先,StreamingFileSink 实现两个接口,CheckpointedFunction 和CheckpointListener。CheckpointedFunction 实现 initializeState 和 snapshotState 函数。

CheckpointListener 是 notifyCheckpointComplete 的方法实现,因此这两个接口可以实现二阶段提交语义。initializeStateinitializeState 在任务启动时会触发三个行动。第一个是 commitPendingFile。

实时数据落地到 Hdfs 上有三个状态。第一个状态是 in-progress ,正在举行状态。

第二个状态是 pending 状态,第三个状态是 finished 状态。initializeState 在任务启动时还会触发 restoreInProgressFile,算子实时写入。

如果 CheckPoint 还未乐成时法式泛起问题,再次启动时 initializeState 会 commit PendingFile,然后接纳 Hadoop 2.7+ 版本的 truncate 方式重置或截断 in-progress 文件。invoke实时写入数据。snapshotState触发 CheckPoint 时会将 in-progress 文件转化为 pending state,同时记载数据长度(truncate 方式需要截断长度)。snapshotState 并非真正将数据写入 HDFS,而是写入 ListState。

华体会体育

Flink 在 Barrier 对齐状态时内部实现 Exactly-Once 语义,可是实现外部端到端的 Exactly-Once 语义比力难题。Flink 内部实现 Exactly-Once 通过 ListState,将数据全部存入 ListState,等候所有算子 CheckPoint 完成,再将 ListState 中的数据刷到 HDFS 中。

notifyCheckpointCompletenotifyCheckpointComplete 会触发 pending 到 finished state 的数据写入。实现方法是 rename,Streaming 不停向 HDFS 写入暂时文件,所有行动竣事后通过 rename 行动写成正式文件。3.跨集群多 nameservices趣头条的实时集群和离线集群是独立的,离线集群有多套,实时集群现在有一套。通过实时集群写入离线集群,会发生 HDFS nameservices 问题。

在实时集群中将所有离线集群的 nameservices 用 namenode HA 的方式全部打入实时集群并不合适。那么如何在任务中通过实时集群提交到各个离线集群?如下图所示,在 Flink 任务的 resource 下面,在 HDFS 的 xml 中间加入。在 PropertyHong Kong 中添加 nameservices,如 stream 是实时集群的 namenode HA 设置,data 是即将写入的离线集群的 namenode HA 设置。

那么两个集群中间的 HDFS set 不需要相互修改,直接可以在客户端实现。4.多用户写入权限实时要写入离线 HDFS,可能会涉及用户权限问题。实时提交的用户已经界说好该用户在所有法式中都是同一个用户,但离线中是多用户的,因此会造成实时和离线用户差池等。趣头条在 API 中添加了 withBucketUser 写 HDFS。

设置好 nameservices后,接下来只需要知道该 HDFS 路径通过哪个用户来写,好比设置一个 stream 用户写入。API 层级的利益是一个 Flink 法式可以指定多个差别的 HDFS 和差别的用户。多用户写入的实现是在 Hadoop file system 中加一个 ugi.do as ,署理用户。以上为趣头条使用 Flink 方式举行实时数据同步到 Hive 的一些事情。

其中可能会泛起小文件问题,小文件是后台法式举行定期 merge,如果 CheckPoint 距离时间较短,如3分钟一次,会泛起大量小文件问题。三、Flink-to-ClickHouse 秒级场景1.秒级实现架构图趣头条现在有许多实时指标,平均每五分钟或三分钟盘算一次,如果每一个实时指标用一个 Flink 任务,或者一个 Flink SQL 来写,好比消费一个 Kafka Topic,需要盘算其日活、新增、流程等等当用户提出一个新需求时,需要改当前的 Flink 任务或者启动一个新的 Flink 任务消费 Topic。因此会泛起 Flink 任务不停修改或者不停起新的 Flink 任务的问题。

趣头条实验在 Flink 后接入 ClickHouse,实现整体的 OLAP。下图为秒级实现架构图。从 Kafka 到 Flink,到 Hive,到 ClickHouse 集群,对接外部 Horizon(实时报表),QE(实时 adhoc 查询),千寻(数据分析),用户画像(实时圈人)。

2.Why Flink+ClickHouse指标实现 sql 化形貌:分析师提出的指标基本都以 SQL 举行形貌。指标的上下线互不影响:一个 Flink 任务消费 Topic,如果还需要其它指标,可以保证指标的上下线互不影响。数据可回溯,利便异常排查:当日活下降,需要回溯排查是哪些指标口径的逻辑问题,好比是报的数据差异或是数据流 Kafka 掉了,或者是因为用户没有上报某个指标导致日活下降,而 Flink 则无法举行回溯。

盘算快,一个周期内完成所有指标盘算:需要在五分钟内将成百上千的所有维度的指标全部盘算完成。支持实时流,漫衍式部署,运维简朴:支持 Kafka 数据实时流。

现在趣头条 Flink 集群有 100+ 台 32 核 128 G 3.5T SSD,日数据量 2000+ 亿,日查询量 21w+ 次,80% 查询在 1s 内完成。下图为单表测试效果。

ClickHouse 单表测试速度快。但受制于架构,ClickHouse 的 Join 较弱。

下图是处置惩罚相对较为庞大的 SQL,count+group by+order by,ClickHouse 在 3.6s内完成 26 亿数据盘算。3.Why ClickHouse so FastClickHouse 接纳列式存储 +LZ4、ZSTD 数据压缩。其次,盘算存储联合当地化+向量化执行。

Presto 数据可能存储在 Hadoop 集群或者 HDFS 中,实时拉取数据举行盘算。而 ClickHouse 盘算存储当地化是指每一台盘算机械存在当地 SSD 盘,只需要盘算自己的数据,再举行节点合并。同时,LSM merge tree+Index。

将数据写入 ClickHouse 之后,会在后台开始一个线程将数据举行 merge,做 Index 索引。如建常见的 DT 索引和小时级数据索引,以提高查询性能。第四,SIMD+LLVM 优化。

SIMD 是单指令多数据集。第五,SQL 语法及 UDF 完善。ClickHouse 对此有很大需求。

在数据分析或者维度下拽时需要更高的特性,如时间窗口的一部门功效点。Merge Tree:如下图所示。第一层为实时数据写入。

后台举行每一层级数据的merge。merge 时会举行数据排序,做 Index 索引。ClickHouse Connector:ClickHouse 有两个观点,Local table 和Distributed table。一般是写 Local table ,读 Distributed table。

ClickHouse 一般以 5~10w一个批次举行数据写入,5s一个周期。趣头条还实现了 RoundRobinClickHouseDataSource。BalancedClickHouseDataSource :MySQL 中设置一个 IP 和端口号就可以写入数据,而 BalancedClickHouseDataSource 需要写 Local 表,因此必须要知道该集群有几多个 Local 表,每一个 Local 表的 IP 和端口号。如有一百台机械,需要将一百台机械的 IP 和端口号全部设置好,再举行写入。

BalancedClickHouseDataSource 有两个 schedule。scheduleActualization和 scheduleConnectionsCleaning。

设置一百台机械的 IP 和端口号,会泛起某些机械不毗连或者服务不响应问题,scheduleActualization 会定期发现机械无法毗连的问题,触发下线或删除 IP 等行动。scheduleConnectionsCleaning 会定期清理 ClickHouse 中无用的 http 请求。RoundRobinClickHouseDataSource:趣头条对BalancedClickHouseDataSource 举行增强的效果,实现了三个语义。

华体会体育app官方下载

testOnBorrow 设置为 true,实验 ping 看能否获取毗连。用 ClickHouse 写入时是一个 batch,再将 testOnReturn 设置为 false,testWhileIdel 设置为true,填入官方 scheduleActualization 和 scheduleConnectionsCleaning 的功效。ClickHouse 后台不停举行 merge,如果 insert 过快使后台 merge 速度变慢,跟不上 insert,泛起报错。

因此需要只管不停往下写,等写完当前机械,再写下一个机械,以5s距离举行写入,使 merge 速度能够只管与 insert 速度保持一致。4.BackfillFlink 导入 ClickHouse,在数据查询或展示报表时,会遇到一些问题,好比 Flink 任务泛起故障、报错或数据反压等,或 ClickHouse 集群泛起不行响应,zk 跟不上,insert 过快或集群负载等问题,这会导致整个任务泛起问题。如果流数据量突然暴增,启动 Flink 可能泛起一段时间内不停追数据的情况,需要举行调整并行度等操作资助 Flink 追数据。

但这时已经泛起数据积压,若还要加大 Flink 并发度处置惩罚数据,ClickHouse 限制 insert 不能过快,否则会导致恶性循环。因此当 Flink 故障或 ClickHouse 集群故障时,等候 ClickHouse 集群恢复后,Flink 任务从最新数据开始消费,不再追已往一段时间的数据,通过 Hive 将数据导入到 ClickHouse。

由于之前已经通过 Kafka 将数据实时落地到 Hive,通过 Hive 将数据写入 ClickHouse 中。ClickHouse 有分区,只需要将上一个小时的数据删除,导入 Hive 的一小时数据,就可以继续举行数据查询操作。

Backfill 提供了 Flink 任务小时级容错以及 ClickHouse 集群小时级容错机制。未来生长与思考1.Connector SQL 化现在, Flink-to-Hive 以及 Flink-to-ClickHouse 都是趣头条较为固化的场景,只需指定 HDFS 路径以及用户,其余历程都可以通过 SQL 化形貌。

2.Delta lakeFlink 是流批一体盘算引擎,可是没有流批一体的存储。趣头条会用 HBase、Kudu、Redis 等能够与 Flink 实时交互的 KV 存储举行数据盘算。如盘算新增问题,现在趣头条的方案是需要将 Hive 历史用户刷到 Redis 或 HBase 中,与 Flink 举行实时交互判断用户是否新增。但因为 Hive 中的数据和 Redis 中的数据是存储为两份数据。

其次 Binlog 抽取数据会涉及 delete 行动,Hbase,Kudu 支持数据修改,定期回到 Hive 中。带来的问题是 HBase,Kudu 中存在数据,Hive 又生存了一份数据,多出一份或多份数据。

如果有流批一体的存储支持上述场景,当 Flink 任务过来,可以与离线数据举行实时交互,包罗实时查询 Hive 数据等,可以实时判断用户是否新增,对数据举行实时修改、更新或 delete,也能支持 Hive 的批的行动存储。未来,趣头条思量对 Flink 做流批的存储,使 Flink 生态统一为流批联合。

作者先容:王金海,10 年互联网历练,先后在唯品会卖力用户画像系统,提供人群的个性化营销服务;饿了么担任架构师,卖力大数据任务调理、元数据开发、任务画像等事情;现为趣头条数据中心平台卖力人,卖力大数据基础盘算层(spark、presto、flink、clickhouse)、平台服务层(libra 实时盘算、kepler 离线调理)、数据产物层(qe即时查询、horizon 数据报表、metadata 元数据、数据权限等)、以及团队建设。


本文关键词:趣,头条,基于,Flink+ClickHouse,构建,实时,摘要,华体会体育app

本文来源:华体会体育-www.tjjdcgt.com

Copyright © 2005-2021 www.tjjdcgt.com. 华体会体育科技 版权所有  ICP备案:ICP备84339771号-4