技术科普 | 基于 Flink + Doris 体验实时数仓建设

随着互联网的不断发展,数据的时效性对企业的精细化运营越来越重要,在每天产生的海量数据中,如何快速有效地挖掘出有价值的信息,对企业的运营决策有很大的帮助。
在该背景下,数仓建设就显得尤为重要。一般数仓也称传统数仓,数据通常以T+1的形式进行计算,时效性较差,因此,实时数仓的建设成了必然趋势,主要是为了解决传统数仓数据时效性的问题,一般用于实时分析、实时数据看板、业务指标实时监测等场景。
本期文章便为大家介绍实时数仓的基本原理,以及如何基于Flink + Doris搭建实时数仓。
1 实时数仓介绍
为了更好地理解实时数仓的建设流程,在详细介绍其构建方法之前,先为大家科普下实时计算与实时数仓之间的联系与区别。
实时计算
普通的实时计算会优先考虑时效性,采集数据源后,经过实时计算直接得到计算结果。这样虽保证了时效性,但可能会存在大量的重复计算,导致更多的计算资源开销。

实时计算的基本计算流程
实时数仓
实时数仓基于一定的数据仓库理念,对数据处理流程进行规划、分层,目的是提高数据和计算的复用性。在一定程度上减少了数据的重复计算,计算流程上会存在更多依赖关系,因此数据的时效性会略差一点。

实时数仓的基本计算流程
实时数仓作为数仓的一种形态,它包含面向主题、集成、相对稳定等数据仓库本身的特性。与传统数仓不同的是,一般不保留中间的计算结果,无法呈现中间的历史变化情况。在传统数仓中,数据分层是一个比较重要的工作环节,通常划分为ODS层、DWD层、DIM层、DWM层、DWS层及ADS层。
同样,在实时数仓中,也要对数据处理流程进行分层,分层如下:
ODS层
原始数据层:也称贴源层、操作数据层,数据是最细粒度的,没做任何变动,如行为日志、系统日志和业务数据。
DWD层
明细数据层:保持和ODS层一样的数据颗粒度,主要是对ODS层数据做清洗和规范化操作,比如数据类型转换、数据对齐、数据过滤等。
DIM层(Hbase)
维度数据层:建立标准数据分析维度,可以降低数据计算口径和算法不统一风险,如时间维度、地区维度、产品维度、机构维度等。
DWM层
中间数据层:在DWD层的数据基础上,对数据做一些轻度聚合或关联操作,生成一些列的中间结果表,提升公共指标的复用性,减少重复加工处理,也可以和维度进行关联,形成宽表,其数据仍然是明细数据。
DWS层
汇总数据层:基于DWD或者DWM上的基础数据,根据主题划分将多个事实数据按维度进行聚合汇总,形成主题宽表。用于后续的业务查询、OLAP分析、数据分发等。
ADS层
应用数据层:提供给数据产品和数据分析使用,把OLAP分析引擎中的数据根据需要进行聚合筛选,并提供API接口服务。
2 案例简介
背景:
此案例以应用访问的行为日志进行流量分析,从简单的对应用访问日志统计PV、UV功能入手,体验用户行为日志构建实时数仓的数据处理流程。从数据的收集到分层计算,再到最后的数据应用,讲述整个实时数仓的构建流程。
2.1 架构设计
首先,收集数据源,主要包含行为日志、系统日志与业务数据:
行为日志直接通过日志服务接口写入到Kafka;
收集系统日志的常用方式为Flume + Kafka,最终将数据Sink到Kafka;
业务数据则通过Flink CDC解析MySQL或者MongoDB的日志获取,同样将数据存储到Kafka,都作为ODS层数据存储;然后使用Flink计算引擎对ODS层数据进行ETL处理,并将处理好的数据进行分流,将业务产生的数据写回Kafka作为DWD层,维度数据则分流到HBASE中作为DIM层;通过Flink对明细数据与维度数据进行关联聚合,将聚合后的数据写入实时OLAP分析引擎(如:ClickHouse、Doris),最后通过实时分析引擎对数据进行聚合查询提供应用服务。如下图所示:

数据源收集及处理流程
从上图DWS层可以看到,实时数据分析引擎存储可以是多种组合,可以选择ClickHouse或者Apache Doris,甚至可以是多种组件的组合,由此看出实时数仓构建方案的多样灵活,选择哪种实现方案,主要还要根据各自应用场景而定,没有哪一种OLAP引擎是万能的,比较常见的组合如下:
Kafka + Flink + ClickHouse 简称KFC
Kafka + Flink + Doris 简称KFD
Kafka + Flink + 其他实时OLAP引擎
2.2 OLAP引擎选择(Doris VS ClickHouse)
Doris和ClickHouse两种OLAP引擎都具备一定的优势,分别如下:

Doris和ClickHouse优势对比
那么,两者之间如何选择呢?建议如下:
2. 希望一站式的分析解决方案,少量投入研发资源,选择Doris。
通过以上分析比较,本案例为基于简单的业务场景体验实时数仓的建设流程,因此选择KFD方案实现更加合适。且Doris支持事物和幂等写入,与Flink结合能更好地实现数据精准一次性(Exactly-Once)处理。
3 案例详解
前文的案例简介中已明确描述,以应用访问的行为日志进行流量分析,从简单的对应用访问PV、UV功能入手,一步步探索实时数仓构建的流程。
3.1 数据准备
首先,准备数据源,为方便起见,对后文用到的数据结构进行简单定义。
行为数据定义
行为日志是直接写到Kafka,作为数仓的ODS层,创建Kafka主题(topic)并按照ODS层的命名规则命名为ods_base_log,表结构如下所示:

行为日志表结构
数据结构如下所示:

行为数据结构
维度数据定义
行为日志中只包含了应用的id,为了体现出数仓的维度聚合,用Flink CDC将MySQL的配置数据(应用列表)进行同步,将数据写入Kafka的ODS层,命名为ods_base_db。然后通过下一步处理,读取ODS层ods_base_db主题中的数据,并筛选出app_list维度数据,写到DIM层,存储于HBASE中,命名为dim_app_list,表结构如下所示:

维度数据表结构
数据结构如下所示:

维度数据结构
3.2 ODS层数据收集
对于行为数据收集,用SpringBoot实现一个Kafka生产者的日志写入服务,对应用服务访问时产生的行为日志,通过调用生产者服务接口写入到ods_base_log主题,如下所示:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class LoggerController {
@Autowired
KafkaTemplate kafkaTemplate;
@RequestMapping("/applog")
public String logger(@RequestParam("param")String param){
// 日志数据进入ODS往ods_base_log主题写入
// 业务数据进入ODS往ods_base_db 主题写入
kafkaTemplate.send("ods_base_log", param);
return param;
}
}由于系统业务数据及维度数据都存储在业务数据库中,为了能实时捕获表的数据变动,则通过Flink CDC从MySQL(或MongoDB,由实际业务系统应用情况而定)中读取全库数据或部分表,并写入到Kafka的ods_base_db主题,简单的实现方式如下所示:
import com.kingdee.common.DmpConfig;
import com.kingdee.function.MyDeserialization;
import com.kingdee.utils.MyKafkaUtil;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkCDCWithMyDeserialization {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.hostname(DmpConfig.MYSQL_HOST)
.username(DmpConfig.MYSQL_USERNAME)
.password(DmpConfig.MYSQL_PSW)
.databaseList("busi_db")
// 如果不添加该参数,则消费指定数据库中所有表的数据
// 如果指定,则消费指定表的数据,表名必须带上库名
//.tableList("busi_db.*")
.deserializer(new MyDeserialization())
//.debeziumProperties()
.startupOptions(StartupOptions.latest())
.build();
//addSource
DataStreamSource<String> streamSource = env.addSource(sourceFunction);
streamSource.print("streamSource>>>>>");
// 业务数据写入 ods_base_db 主题
// 日志数据写入 ods_base_log 主题
String sinkTopic = "ods_base_db";
streamSource.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));
env.execute("FlinkCDCWithMyDeserialization");
}
}3.3 DWD层数据处理
对于明细数据处理,在不改变数据粒度的基础上,对ODS层数据进行ETL。通过Flink读取ods_base_db主题,对业务系统数据进行分流处理:
如果是业务数据则进行简单ETL后写回到Kafka的DWD层;如果是维度数据则写入到HBASE dim_app_list表中,通过Phoenix进行读写操作,在此不做过多讲解。对于日志数据和维度数据处理,主要有如下工作:
首先,筛选正式访问的行为,并转写入DWD层,实现方法如下:
1、读取Kafka数据源,处理ods_base_log数据,并将数据转换为JSON格式,如果转换异常,则数据作为侧输出流,同时判断数据记录是否为正式访问的行为(datatype=prd),如果是,则主流输出到Kafka主题dwd_pv_log,否则侧输出流输出,如下所示:
String topic = "ods_base_log";
String groupId = "base_log_app";
DataStreamSource<String> kafkaDs = env.addSource(MyKafkaUtil.getKafkaConsumer(topic, groupId));
OutputTag<String> outputTag = new OutputTag<String>("ErrDataTag"){};
OutputTag<String> otherOutputTag = new OutputTag<String>("other"){};
SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.process(new ProcessFunction<String, JSONObject>() {
@Override
public void processElement(String s, Context context, Collector<JSONObject> collector) throws Exception {
try {
JSONObject json = JSON.parseObject(s);
if("prd".equals(json.getString("dataType"))){
collector.collect(json);
}else{
context.output(otherOutputTag, s);
}
} catch (Exception e) {
//发生异常,将数据写入测输出流
context.output(outputTag, s);
}
}
});
jsonObjDs.getSideOutput(outputTag).print("Parse Err Data>>>");
jsonObjDs.getSideOutput(otherOutputTag).print("Other Data>>>");
jsonObjDs.map(r -> r.toJSONString()).addSink(MyKafkaUtil.getKafkaProducer("dwd_pv_log"));2、读取Kafka数据源,处理ods_base_db数据,并将数据转换为JSON格式,同时过滤掉删除操作的记录,保留新增、修改的记录,此步包括了业务数据和维度数据的处理,将不同的数据类型按照配置进行分发。
String topic = "ods_base_db";
String groupId = "base_db_app";
DataStreamSource<String> kafkaDs = env.addSource(MyKafkaUtil.getKafkaConsumer(topic, groupId));
SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(JSON::parseObject)
.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject value) throws Exception {
//取出数据的操作类型
String type = value.getString("type");
return !"delete".equals(type);
}
});3、由于主流中包含了所有的业务数据与维度数据,要明确数据处理后的具体流向,因此,可以事先定义一个配置表,用于管理源头表的元数据,如:表类型,数据流向目标等信息,最后通过Flink CDC读取数据配置表数据,并将其转换为广播流,数据保存在状态变量中,便于后续流处理读取配置数据。实现如下:
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.hostname(DmpConfig.MYSQL_HOST)
.port(DmpConfig.MYSQL_PORT)
.username(DmpConfig.MYSQL_USERNAME)
.password(DmpConfig.MYSQL_PSW)
.databaseList("busi_db")
.tableList("meta_config")
.startupOptions(StartupOptions.initial())
.deserializer(new MyDeserialization())
.build();
DataStreamSource<String> configDs = env.addSource(sourceFunction);
MapStateDescriptor<String, DataConfigProcess> mapStateDescriptor = new MapStateDescriptor<>(
"map-state",
String.class,
DataConfigProcess.class);
BroadcastStream<String> broadcastStream = configDs.broadcast(mapStateDescriptor);3.4 DWM层数据处理
从数据分析得知,统计PV、UV用到的源数据都是ods_base_log,PV统计的是时间范围内访问的次数,对每次访问进行累计,而UV统计的是独立访问用户数,在本案例中指时间范围内访问的用户数量。
由此可得,UV可以通过PV的记录去重后获取,直接在DWD层的dwd_pv_log基础上去重即可,避免了UV统计时读取ods_base_log源数据进行重新统计。实现方法如下:
1、通过Flink读取dwd_pv_log数据,转换成JSON格式,并提取watermark,如下所示:
String sourceTopic = "dwd_pv_log";
String groupId = "unique_visit_app";
String sinkTopic = "dwm_uv_log";
DataStreamSource<String> kafkaDs = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(JSON::parseObject).assignTimestampsAndWatermarks(
WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3L))
.withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject value, long timestamp) {
return value.getLong("logTs");
}
})
);2、按应用维度(appId)与用户id(userId)进行keyBy获得KeyedStream,实现分组功能,如下所示:
String sourceTopic = "dwd_pv_log";
技术科普 | 基于 Flink + Doris 体验实时数仓建设
声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。如若本站内容侵犯了原著者的合法权益,可联系本站删除。



