技术科普 | 基于 Flink + Doris 体验实时数仓建设
随着互联网的不断发展,数据的时效性对企业的精细化运营越来越重要,在每天产生的海量数据中,如何快速有效地挖掘出有价值的信息,对企业的运营决策有很大的帮助。
在该背景下,数仓建设就显得尤为重要。一般数仓也称传统数仓,数据通常以T+1的形式进行计算,时效性较差,因此,实时数仓的建设成了必然趋势,主要是为了解决传统数仓数据时效性的问题,一般用于实时分析、实时数据看板、业务指标实时监测等场景。
本期文章便为大家介绍实时数仓的基本原理,以及如何基于Flink + Doris搭建实时数仓。
1 实时数仓介绍
为了更好地理解实时数仓的建设流程,在详细介绍其构建方法之前,先为大家科普下实时计算与实时数仓之间的联系与区别。
实时计算
普通的实时计算会优先考虑时效性,采集数据源后,经过实时计算直接得到计算结果。这样虽保证了时效性,但可能会存在大量的重复计算,导致更多的计算资源开销。
实时计算的基本计算流程
实时数仓
实时数仓基于一定的数据仓库理念,对数据处理流程进行规划、分层,目的是提高数据和计算的复用性。在一定程度上减少了数据的重复计算,计算流程上会存在更多依赖关系,因此数据的时效性会略差一点。
实时数仓的基本计算流程
实时数仓作为数仓的一种形态,它包含面向主题、集成、相对稳定等数据仓库本身的特性。与传统数仓不同的是,一般不保留中间的计算结果,无法呈现中间的历史变化情况。在传统数仓中,数据分层是一个比较重要的工作环节,通常划分为ODS层、DWD层、DIM层、DWM层、DWS层及ADS层。
同样,在实时数仓中,也要对数据处理流程进行分层,分层如下:
ODS层
原始数据层:也称贴源层、操作数据层,数据是最细粒度的,没做任何变动,如行为日志、系统日志和业务数据。
DWD层
明细数据层:保持和ODS层一样的数据颗粒度,主要是对ODS层数据做清洗和规范化操作,比如数据类型转换、数据对齐、数据过滤等。
DIM层(Hbase)
维度数据层:建立标准数据分析维度,可以降低数据计算口径和算法不统一风险,如时间维度、地区维度、产品维度、机构维度等。
DWM层
中间数据层:在DWD层的数据基础上,对数据做一些轻度聚合或关联操作,生成一些列的中间结果表,提升公共指标的复用性,减少重复加工处理,也可以和维度进行关联,形成宽表,其数据仍然是明细数据。
DWS层
汇总数据层:基于DWD或者DWM上的基础数据,根据主题划分将多个事实数据按维度进行聚合汇总,形成主题宽表。用于后续的业务查询、OLAP分析、数据分发等。
ADS层
应用数据层:提供给数据产品和数据分析使用,把OLAP分析引擎中的数据根据需要进行聚合筛选,并提供API接口服务。
2 案例简介
背景:
此案例以应用访问的行为日志进行流量分析,从简单的对应用访问日志统计PV、UV功能入手,体验用户行为日志构建实时数仓的数据处理流程。从数据的收集到分层计算,再到最后的数据应用,讲述整个实时数仓的构建流程。
2.1 架构设计
首先,收集数据源,主要包含行为日志、系统日志与业务数据:
行为日志直接通过日志服务接口写入到Kafka;
收集系统日志的常用方式为Flume + Kafka,最终将数据Sink到Kafka;
业务数据则通过Flink CDC解析MySQL或者MongoDB的日志获取,同样将数据存储到Kafka,都作为ODS层数据存储;然后使用Flink计算引擎对ODS层数据进行ETL处理,并将处理好的数据进行分流,将业务产生的数据写回Kafka作为DWD层,维度数据则分流到HBASE中作为DIM层;通过Flink对明细数据与维度数据进行关联聚合,将聚合后的数据写入实时OLAP分析引擎(如:ClickHouse、Doris),最后通过实时分析引擎对数据进行聚合查询提供应用服务。如下图所示:
数据源收集及处理流程
从上图DWS层可以看到,实时数据分析引擎存储可以是多种组合,可以选择ClickHouse或者Apache Doris,甚至可以是多种组件的组合,由此看出实时数仓构建方案的多样灵活,选择哪种实现方案,主要还要根据各自应用场景而定,没有哪一种OLAP引擎是万能的,比较常见的组合如下:
Kafka + Flink + ClickHouse 简称KFC
Kafka + Flink + Doris 简称KFD
Kafka + Flink + 其他实时OLAP引擎
2.2 OLAP引擎选择(Doris VS ClickHouse)
Doris和ClickHouse两种OLAP引擎都具备一定的优势,分别如下:
Doris和ClickHouse优势对比
那么,两者之间如何选择呢?建议如下:
2. 希望一站式的分析解决方案,少量投入研发资源,选择Doris。
通过以上分析比较,本案例为基于简单的业务场景体验实时数仓的建设流程,因此选择KFD方案实现更加合适。且Doris支持事物和幂等写入,与Flink结合能更好地实现数据精准一次性(Exactly-Once)处理。
3 案例详解
前文的案例简介中已明确描述,以应用访问的行为日志进行流量分析,从简单的对应用访问PV、UV功能入手,一步步探索实时数仓构建的流程。
3.1 数据准备
首先,准备数据源,为方便起见,对后文用到的数据结构进行简单定义。
行为数据定义
行为日志是直接写到Kafka,作为数仓的ODS层,创建Kafka主题(topic)并按照ODS层的命名规则命名为ods_base_log,表结构如下所示:
行为日志表结构
数据结构如下所示:
行为数据结构
维度数据定义
行为日志中只包含了应用的id,为了体现出数仓的维度聚合,用Flink CDC将MySQL的配置数据(应用列表)进行同步,将数据写入Kafka的ODS层,命名为ods_base_db。然后通过下一步处理,读取ODS层ods_base_db主题中的数据,并筛选出app_list维度数据,写到DIM层,存储于HBASE中,命名为dim_app_list,表结构如下所示:
维度数据表结构
数据结构如下所示:
维度数据结构
3.2 ODS层数据收集
对于行为数据收集,用SpringBoot实现一个Kafka生产者的日志写入服务,对应用服务访问时产生的行为日志,通过调用生产者服务接口写入到ods_base_log主题,如下所示:
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @Slf4j public class LoggerController { @Autowired KafkaTemplate kafkaTemplate; @RequestMapping("/applog") public String logger(@RequestParam("param")String param){ // 日志数据进入ODS往ods_base_log主题写入 // 业务数据进入ODS往ods_base_db 主题写入 kafkaTemplate.send("ods_base_log", param); return param; } }
由于系统业务数据及维度数据都存储在业务数据库中,为了能实时捕获表的数据变动,则通过Flink CDC从MySQL(或MongoDB,由实际业务系统应用情况而定)中读取全库数据或部分表,并写入到Kafka的ods_base_db主题,简单的实现方式如下所示:
import com.kingdee.common.DmpConfig; import com.kingdee.function.MyDeserialization; import com.kingdee.utils.MyKafkaUtil; import com.ververica.cdc.connectors.mysql.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.DebeziumSourceFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class FlinkCDCWithMyDeserialization { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder() .hostname(DmpConfig.MYSQL_HOST) .username(DmpConfig.MYSQL_USERNAME) .password(DmpConfig.MYSQL_PSW) .databaseList("busi_db") // 如果不添加该参数,则消费指定数据库中所有表的数据 // 如果指定,则消费指定表的数据,表名必须带上库名 //.tableList("busi_db.*") .deserializer(new MyDeserialization()) //.debeziumProperties() .startupOptions(StartupOptions.latest()) .build(); //addSource DataStreamSource<String> streamSource = env.addSource(sourceFunction); streamSource.print("streamSource>>>>>"); // 业务数据写入 ods_base_db 主题 // 日志数据写入 ods_base_log 主题 String sinkTopic = "ods_base_db"; streamSource.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic)); env.execute("FlinkCDCWithMyDeserialization"); } }
3.3 DWD层数据处理
对于明细数据处理,在不改变数据粒度的基础上,对ODS层数据进行ETL。通过Flink读取ods_base_db主题,对业务系统数据进行分流处理:
如果是业务数据则进行简单ETL后写回到Kafka的DWD层;如果是维度数据则写入到HBASE dim_app_list表中,通过Phoenix进行读写操作,在此不做过多讲解。对于日志数据和维度数据处理,主要有如下工作:
首先,筛选正式访问的行为,并转写入DWD层,实现方法如下:
1、读取Kafka数据源,处理ods_base_log数据,并将数据转换为JSON格式,如果转换异常,则数据作为侧输出流,同时判断数据记录是否为正式访问的行为(datatype=prd),如果是,则主流输出到Kafka主题dwd_pv_log,否则侧输出流输出,如下所示:
String topic = "ods_base_log"; String groupId = "base_log_app"; DataStreamSource<String> kafkaDs = env.addSource(MyKafkaUtil.getKafkaConsumer(topic, groupId)); OutputTag<String> outputTag = new OutputTag<String>("ErrDataTag"){}; OutputTag<String> otherOutputTag = new OutputTag<String>("other"){}; SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.process(new ProcessFunction<String, JSONObject>() { @Override public void processElement(String s, Context context, Collector<JSONObject> collector) throws Exception { try { JSONObject json = JSON.parseObject(s); if("prd".equals(json.getString("dataType"))){ collector.collect(json); }else{ context.output(otherOutputTag, s); } } catch (Exception e) { //发生异常,将数据写入测输出流 context.output(outputTag, s); } } }); jsonObjDs.getSideOutput(outputTag).print("Parse Err Data>>>"); jsonObjDs.getSideOutput(otherOutputTag).print("Other Data>>>"); jsonObjDs.map(r -> r.toJSONString()).addSink(MyKafkaUtil.getKafkaProducer("dwd_pv_log"));
2、读取Kafka数据源,处理ods_base_db数据,并将数据转换为JSON格式,同时过滤掉删除操作的记录,保留新增、修改的记录,此步包括了业务数据和维度数据的处理,将不同的数据类型按照配置进行分发。
String topic = "ods_base_db"; String groupId = "base_db_app"; DataStreamSource<String> kafkaDs = env.addSource(MyKafkaUtil.getKafkaConsumer(topic, groupId)); SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(JSON::parseObject) .filter(new FilterFunction<JSONObject>() { @Override public boolean filter(JSONObject value) throws Exception { //取出数据的操作类型 String type = value.getString("type"); return !"delete".equals(type); } });
3、由于主流中包含了所有的业务数据与维度数据,要明确数据处理后的具体流向,因此,可以事先定义一个配置表,用于管理源头表的元数据,如:表类型,数据流向目标等信息,最后通过Flink CDC读取数据配置表数据,并将其转换为广播流,数据保存在状态变量中,便于后续流处理读取配置数据。实现如下:
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder() .hostname(DmpConfig.MYSQL_HOST) .port(DmpConfig.MYSQL_PORT) .username(DmpConfig.MYSQL_USERNAME) .password(DmpConfig.MYSQL_PSW) .databaseList("busi_db") .tableList("meta_config") .startupOptions(StartupOptions.initial()) .deserializer(new MyDeserialization()) .build(); DataStreamSource<String> configDs = env.addSource(sourceFunction); MapStateDescriptor<String, DataConfigProcess> mapStateDescriptor = new MapStateDescriptor<>( "map-state", String.class, DataConfigProcess.class); BroadcastStream<String> broadcastStream = configDs.broadcast(mapStateDescriptor);
3.4 DWM层数据处理
从数据分析得知,统计PV、UV用到的源数据都是ods_base_log,PV统计的是时间范围内访问的次数,对每次访问进行累计,而UV统计的是独立访问用户数,在本案例中指时间范围内访问的用户数量。
由此可得,UV可以通过PV的记录去重后获取,直接在DWD层的dwd_pv_log基础上去重即可,避免了UV统计时读取ods_base_log源数据进行重新统计。实现方法如下:
1、通过Flink读取dwd_pv_log数据,转换成JSON格式,并提取watermark,如下所示:
String sourceTopic = "dwd_pv_log"; String groupId = "unique_visit_app"; String sinkTopic = "dwm_uv_log"; DataStreamSource<String> kafkaDs = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId)); SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(JSON::parseObject).assignTimestampsAndWatermarks( WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3L)) .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() { @Override public long extractTimestamp(JSONObject value, long timestamp) { return value.getLong("logTs"); } }) );
2、按应用维度(appId)与用户id(userId)进行keyBy获得KeyedStream,实现分组功能,如下所示:
String sourceTopic = "dwd_pv_log"; String groupId = "unique_visit_app"; String sinkTopic = "dwm_uv_log"; DataStreamSource<String> kafkaDs = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId)); SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(JSON::parseObject).assignTimestampsAndWatermarks( WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3L)) .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() { @Override public long extractTimestamp(JSONObject value, long timestamp) { return value.getLong("logTs"); } }) );
3、通过Flink的状态编程,按照appId和userId进行分组,如果当天出现过记录,则直接过滤去重,否则保留记录,并输出到Kafka的DWM层dwm_uv_log主题(仍是明细数据),如下所示:
SingleOutputStreamOperator<JSONObject> uvDs = keyedBy.filter(new RichFilterFunction<JSONObject>() { private ValueState<String> dateState; @Override public void open(Configuration parameters) throws Exception { ValueStateDescriptor<String> vsd = new ValueStateDescriptor<String>("value-state", String.class); StateTtlConfig stateTtlConfig = new StateTtlConfig .Builder(Time.hours(24L)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .build(); vsd.enableTimeToLive(stateTtlConfig); dateState = getRuntimeContext().getState(vsd); } @Override public boolean filter(JSONObject value) throws Exception { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); String curDate = sdf.format(value.getLong("logTs")); if(!curDate.equals(dateState.value())){ dateState.update(curDate); return true; } return false; } }); uvDs.map(d -> d.toJSONString()).addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));
3.5 DWS层数据聚合
对PV、UV进行统计,并输出以appId为维度的事实宽表,并映射成PvUvStats对象,结构如下:
数据表结构
具体实现涉及多个环节,步骤如下:
1、分别读取DWD层dwd_pv_log和DWM层dwm_uv_log数据,并转换为PvUvStats对象,dwd_pv_log的记录转换,将pvCnt设置为1L,uvCnt设置为0L,而dwm_uv_log记录则反之。
String groupId = "visit_stat_group"; String pvSourceTopic = "dwd_pv_log"; String uvSourceTopic = "dwm_uv_log"; DataStreamSource<String> pvDs = env.addSource(MyKafkaUtil.getKafkaConsumer(pvSourceTopic, groupId); DataStreamSource<String> uvDs = env.addSource(MyKafkaUtil.getKafkaConsumer(uvSourceTopic, groupId); SingleOutputStreamOperator<PvUvStats> visitorStatWithPvDs = pvDs.map(line -> { JSONObject obj = JSON.parseObject(line); return new PvUvStats("", "" obj.getString("appId"), "", 1L, 0L, obj.getLong("logTs")); }); SingleOutputStreamOperator<PvUvStats> visitorStatWithUvDs = uvDs.map(line -> { JSONObject obj = JSON.parseObject(line); return new PvUvStats("", "" obj.getString("appId"), "", 0L, 1L, obj.getLong("logTs")); });
2、对pv流、uv流进行合流,并提取watermark,用于后面的开窗聚合操作,可以根据各自情况设置数据的延迟处理时间,延迟时间由数据乱序的最大程度而定,这是由watermark的原理决定的。
DataStream<PvUvStats> unionDs = visitorStatWithPvDs.union(visitorStatWithUvDs); Single0utputStreamOperator<PvUvStats> statsWithWmDs = unionDs.assignTimestampsAndWatermarks( WatermarkStrategy.<PvUvStats>forBoundedOutOfOrderness(Duration.ofSeconds(15L)) .withTimestampAssigner(new SerializableTimestampAssigner<PvUvStats>(){ @Override public long extractTimestamp(PvUvStats pvUvStats,long ts){ return pvUvStats.getTs(); } } );
3、合流后进行开窗聚合,统计窗口范围内的Pv、Uv,为了能在聚合后能获取到窗口时间,采用自定义聚合函数(ReduceFunction)和窗口函数(WindowFunction)实现:在聚合函数中,通过reduce方法,进行计数合并;窗口函数则主要用于补全窗口时间信息,按格式转换为日期,最后输出完整的聚合结果记录。
SingleOutputStreamOperator<PvUvStats> reduceDs = statsWithWmDs.keyBy(s -> s.getAppId()) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .reduce(new ReduceFunction<PvUvStats>(){ @Override public PvUvStats reduce(PvUvStats pl, PvUvStats p2) throws Exception { pl.setPvCnt(pl.getPvCnt() + p2.getPvCnt()); pl.setUvCnt(pl.getUvCnt() + p2.getUvCnt()); return p1; } }, new WindowFunction<PvUvStats, PvUvStats, String, TimeWindow>(){ @Override public void apply(String s, TimeWindow timeWindow, Iterable<PvUvStats> iterable, Collector<PvUvStats> collector) throws Exception { PvUvStats pvUvStats = iterable.iterator().next(); pvUvStats.setStt(DateUtil.toYMDhms(new Date(timeWindow.getStart()))); pvUvStats.setEdt(DateUtil.toYMDhms(new Date(timeWindow.getEnd()))); collector.collect(pvUvStats); } });
4、聚合结果中只有appId,想要补全应用名称,就要通过维度关联获取appName,在此使用Flink的异步处理方法读取外部系统HBASE的维度数据进行维度信息补充,具体可参考Flink API中的AsyncDataStream.unorderedWait实现。为了提高外部系统的读取效率,可以加入缓存及线程池的管理。
SingleOutputStreamOperator<PvUvStats> rsDs = AsyncDataStream.unorderedWait( reduceDs, new DimAsyncFunction<PvUvStats>("DIM_APP_LIST"){ @Override public String getKey(PvUvStats pvUvStats) { return pvUvStats.getAppId(); } @Override public void join(PvUvStats input, JSONObject dimInfo) throws ParseException { // 补充维度信息 input.setAppName(dimInfo.getString("appName")); } }, 60, TimeUnit.SECONS);
5、最后,将结果输出到DWS层Doris中,表结构可参考PvUvStats结构,可通过JDBC的方式直接对Doris进行读写操作。
// 写入到Doris String sql = "insert into visitor_stats values(?,?,?,?,?,?,?)"; rsDs.addSink(DorisUtil.getSink(sql))
在创建Doris表时,值得注意的是,根据各自的场景需要,选择不同的数据模型,主要分为Aggregate、Uniq、Duplicate三类数据模型。具体的使用方式以及应用场景,可以查阅Apache Doris使用文档,都比较详细地介绍了每一种数据模型的应用。
比如,在建表时使用了Aggregate模型,则维度作为key,度量为value,且对value选用SUM聚合模式,那么,当插入相同key(维度列表)时,value就会自动预聚合,对度量值(value)进行SUM汇总。
3.5 ADS层数据应用
数据经过以上多个环节的流程处理,最终将DWS层的计算结果存储在Doris中,ADS层数据应用直接访问Doris存储的数据进行聚合查询即可。visitor_stats表的数据样例如下所示,其中表结构的顺序与PvUvStats对象属性保持一致,Doris的字段存储有一定要求,必须是key前置(维度前置),度量(pvCnt、uvCnt)后置。
visitor_stats表的数据样例
通过JDBC连接Doris进行查询操作。由于本次体验实时数仓的构建流程,以demo的方式进行讲解,所以在此就不对数据场景的应用进行讲解,通过MySQL客户端连接操作,体验Doris的实时数据查询即可,如下图所示:
实时数据查询示例
在实际应用中,通过JDBC的方式访问Doris做聚合查询,主要供下游的实时OLAP分析、实时数据看板、业务指标实时监等多种场景,支持高并发,毫秒级效应。
4 划重点
实时数仓基于一定的数据仓库理念,对数据处理流程进行规划、分层,目的是提高数据和计算的复用性。
通过整个实时数仓的建设流程探索,可以发现,数据在实时计算环节的流转过程,实际上和传统数仓非常相似,只是由Flink替代Hive作为了计算引擎,把数据的落盘存储由HDFS更换成了Kafka,但是数据模型的构建思路与数据处理的流转过程并没有发生太多的变化。
#往期推荐#
更多精彩内容,“码”上了解!↓
技术科普 | 基于 Flink + Doris 体验实时数仓建设
本文2024-09-23 01:00:45发表“云苍穹知识”栏目。
本文链接:https://wenku.my7c.com/article/kingdee-cangqiong-143194.html