电脑桌面
添加蚂蚁七词文库到电脑桌面
安装后可以在桌面快捷访问

技术科普 | 基于 Flink + Doris 体验实时数仓建设

来源:金蝶云社区作者:金蝶2024-09-236

技术科普 | 基于 Flink + Doris 体验实时数仓建设

随着互联网的不断发展,数据的时效性对企业的精细化运营越来越重要,在每天产生的海量数据中,如何快速有效地挖掘出有价值的信息,对企业的运营决策有很大的帮助。


在该背景下,数仓建设就显得尤为重要。一般数仓也称传统数仓,数据通常以T+1的形式进行计算,时效性较差,因此,实时数仓的建设成了必然趋势,主要是为了解决传统数仓数据时效性的问题,一般用于实时分析、实时数据看板、业务指标实时监测等场景。


本期文章便为大家介绍实时数仓的基本原理,以及如何基于Flink + Doris搭建实时数仓




1 实时数仓介绍


为了更好地理解实时数仓的建设流程,在详细介绍其构建方法之前,先为大家科普下实时计算实时数仓之间的联系与区别。


  • 实时计算


普通的实时计算会优先考虑时效性,采集数据源后,经过实时计算直接得到计算结果。这样虽保证了时效性,但可能会存在大量的重复计算,导致更多的计算资源开销。


上传图片

实时计算的基本计算流程


  • 实时数仓


实时数仓基于一定的数据仓库理念,对数据处理流程进行规划、分层,目的是提高数据和计算的复用性。在一定程度上减少了数据的重复计算,计算流程上会存在更多依赖关系,因此数据的时效性会略差一点。


上传图片

实时数仓的基本计算流程


实时数仓作为数仓的一种形态,它包含面向主题、集成、相对稳定等数据仓库本身的特性。与传统数仓不同的是,一般不保留中间的计算结果,无法呈现中间的历史变化情况。在传统数仓中,数据分层是一个比较重要的工作环节,通常划分为ODS层、DWD层、DIM层、DWM层、DWS层及ADS层


同样,在实时数仓中,也要对数据处理流程进行分层,分层如下:


  • ODS层


原始数据层:也称贴源层、操作数据层,数据是最细粒度的,没做任何变动,如行为日志、系统日志和业务数据。


  • DWD层


明细数据层:保持和ODS层一样的数据颗粒度,主要是对ODS层数据做清洗和规范化操作,比如数据类型转换、数据对齐、数据过滤等。


  • DIM层(Hbase)


维度数据层:建立标准数据分析维度,可以降低数据计算口径和算法不统一风险,如时间维度、地区维度、产品维度、机构维度等。


  • DWM层


中间数据层:在DWD层的数据基础上,对数据做一些轻度聚合关联操作,生成一些列的中间结果表,提升公共指标的复用性,减少重复加工处理,也可以和维度进行关联,形成宽表,其数据仍然是明细数据。


  • DWS层


汇总数据层:基于DWD或者DWM上的基础数据,根据主题划分将多个事实数据按维度进行聚合汇总,形成主题宽表。用于后续的业务查询、OLAP分析、数据分发等。


  • ADS层


应用数据层:提供给数据产品和数据分析使用,把OLAP分析引擎中的数据根据需要进行聚合筛选,并提供API接口服务。


2 案例简介


背景:

此案例以应用访问的行为日志进行流量分析,从简单的对应用访问日志统计PV、UV功能入手,体验用户行为日志构建实时数仓的数据处理流程。从数据的收集到分层计算,再到最后的数据应用,讲述整个实时数仓的构建流程


2.1 架构设计


首先,收集数据源,主要包含行为日志、系统日志业务数据


  • 行为日志直接通过日志服务接口写入到Kafka;

  • 收集系统日志的常用方式为Flume + Kafka,最终将数据Sink到Kafka;

  • 业务数据则通过Flink CDC解析MySQL或者MongoDB的日志获取,同样将数据存储到Kafka,都作为ODS层数据存储;然后使用Flink计算引擎对ODS层数据进行ETL处理,并将处理好的数据进行分流,将业务产生的数据写回Kafka作为DWD层,维度数据则分流到HBASE中作为DIM层;通过Flink对明细数据与维度数据进行关联聚合,将聚合后的数据写入实时OLAP分析引擎(如:ClickHouse、Doris),最后通过实时分析引擎对数据进行聚合查询提供应用服务。如下图所示:


上传图片

数据源收集及处理流程


从上图DWS层可以看到,实时数据分析引擎存储可以是多种组合,可以选择ClickHouse或者Apache Doris,甚至可以是多种组件的组合,由此看出实时数仓构建方案的多样灵活,选择哪种实现方案,主要还要根据各自应用场景而定,没有哪一种OLAP引擎是万能的,比较常见的组合如下:


  • Kafka + Flink + ClickHouse 简称KFC

  • Kafka + Flink + Doris 简称KFD

  • Kafka + Flink + 其他实时OLAP引擎


2.2 OLAP引擎选择(Doris VS ClickHouse)


Doris和ClickHouse两种OLAP引擎都具备一定的优势,分别如下:


上传图片

Doris和ClickHouse优势对比


那么,两者之间如何选择呢?建议如下:


1. 业务场景复杂,数据规模巨大,希望投入研发力量做定制开发,选ClickHouse

2. 希望一站式的分析解决方案,少量投入研发资源,选择Doris


通过以上分析比较,本案例为基于简单的业务场景体验实时数仓的建设流程,因此选择KFD方案实现更加合适。且Doris支持事物和幂等写入,与Flink结合能更好地实现数据精准一次性(Exactly-Once)处理。


3 案例详解


前文的案例简介中已明确描述,以应用访问的行为日志进行流量分析,从简单的对应用访问PV、UV功能入手,一步步探索实时数仓构建的流程。


3.1 数据准备


首先,准备数据源,为方便起见,对后文用到的数据结构进行简单定义。


  • 行为数据定义


行为日志是直接写到Kafka,作为数仓的ODS层,创建Kafka主题(topic)并按照ODS层的命名规则命名为ods_base_log,表结构如下所示:


上传图片

行为日志表结构


数据结构如下所示:


上传图片

行为数据结构


  • 维度数据定义


行为日志中只包含了应用的id,为了体现出数仓的维度聚合,用Flink CDC将MySQL的配置数据(应用列表)进行同步,将数据写入Kafka的ODS层,命名为ods_base_db。然后通过下一步处理,读取ODS层ods_base_db主题中的数据,并筛选出app_list维度数据,写到DIM层,存储于HBASE中,命名为dim_app_list,表结构如下所示:


上传图片

维度数据表结构


数据结构如下所示:


上传图片


维度数据结构


3.2 ODS层数据收集


对于行为数据收集,用SpringBoot实现一个Kafka生产者的日志写入服务,对应用服务访问时产生的行为日志,通过调用生产者服务接口写入到ods_base_log主题,如下所示:


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class LoggerController {
  
    @Autowired
    KafkaTemplate kafkaTemplate;
  
    @RequestMapping("/applog")
    public String logger(@RequestParam("param")String param){
        // 日志数据进入ODS往ods_base_log主题写入
        // 业务数据进入ODS往ods_base_db 主题写入
        kafkaTemplate.send("ods_base_log", param);
        return param;
    }
}



由于系统业务数据及维度数据都存储在业务数据库中,为了能实时捕获表的数据变动,则通过Flink CDC从MySQL(或MongoDB,由实际业务系统应用情况而定)中读取全库数据或部分表,并写入到Kafka的ods_base_db主题,简单的实现方式如下所示:


import com.kingdee.common.DmpConfig;
import com.kingdee.function.MyDeserialization;
import com.kingdee.utils.MyKafkaUtil;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkCDCWithMyDeserialization {
  
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname(DmpConfig.MYSQL_HOST)
                .username(DmpConfig.MYSQL_USERNAME)
                .password(DmpConfig.MYSQL_PSW)
                .databaseList("busi_db")
                // 如果不添加该参数,则消费指定数据库中所有表的数据
                // 如果指定,则消费指定表的数据,表名必须带上库名
                //.tableList("busi_db.*")
                .deserializer(new MyDeserialization())
                //.debeziumProperties()
                .startupOptions(StartupOptions.latest())
                .build();
    
        //addSource
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);
    
        streamSource.print("streamSource>>>>>");
        // 业务数据写入 ods_base_db 主题
        // 日志数据写入 ods_base_log 主题
        String sinkTopic = "ods_base_db";
        streamSource.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));
    
        env.execute("FlinkCDCWithMyDeserialization");
    }
}


3.3 DWD层数据处理


对于明细数据处理,在不改变数据粒度的基础上,对ODS层数据进行ETL。通过Flink读取ods_base_db主题,对业务系统数据进行分流处理:


如果是业务数据则进行简单ETL后写回到Kafka的DWD层;如果是维度数据则写入到HBASE dim_app_list表中,通过Phoenix进行读写操作,在此不做过多讲解。对于日志数据和维度数据处理,主要有如下工作:


首先,筛选正式访问的行为,并转写入DWD层,实现方法如下:


1、读取Kafka数据源,处理ods_base_log数据,并将数据转换为JSON格式,如果转换异常,则数据作为侧输出流,同时判断数据记录是否为正式访问的行为(datatype=prd),如果是,则主流输出到Kafka主题dwd_pv_log,否则侧输出流输出,如下所示:


String topic = "ods_base_log";
String groupId = "base_log_app";
DataStreamSource<String> kafkaDs = env.addSource(MyKafkaUtil.getKafkaConsumer(topic, groupId));
OutputTag<String> outputTag = new OutputTag<String>("ErrDataTag"){};
OutputTag<String> otherOutputTag = new OutputTag<String>("other"){};
SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.process(new ProcessFunction<String, JSONObject>() {
@Override
public void processElement(String s, Context context, Collector<JSONObject> collector) throws Exception {
  try {
    JSONObject json = JSON.parseObject(s);
    if("prd".equals(json.getString("dataType"))){
      collector.collect(json);
    }else{
      context.output(otherOutputTag, s);
    }
  } catch (Exception e) {
    //发生异常,将数据写入测输出流
    context.output(outputTag, s);
}
}
});
jsonObjDs.getSideOutput(outputTag).print("Parse Err Data>>>");
jsonObjDs.getSideOutput(otherOutputTag).print("Other Data>>>");
jsonObjDs.map(r -> r.toJSONString()).addSink(MyKafkaUtil.getKafkaProducer("dwd_pv_log"));


2、读取Kafka数据源,处理ods_base_db数据,并将数据转换为JSON格式,同时过滤掉删除操作的记录,保留新增、修改的记录,此步包括了业务数据和维度数据的处理,将不同的数据类型按照配置进行分发。


String topic = "ods_base_db";
String groupId = "base_db_app";
DataStreamSource<String> kafkaDs = env.addSource(MyKafkaUtil.getKafkaConsumer(topic, groupId));    
SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(JSON::parseObject)
  .filter(new FilterFunction<JSONObject>() {
    @Override
    public boolean filter(JSONObject value) throws Exception {
      //取出数据的操作类型
      String type = value.getString("type");
      return !"delete".equals(type);
    }
  });


3、由于主流中包含了所有的业务数据与维度数据,要明确数据处理后的具体流向,因此,可以事先定义一个配置表,用于管理源头表的元数据,如:表类型,数据流向目标等信息,最后通过Flink CDC读取数据配置表数据,并将其转换为广播流,数据保存在状态变量中,便于后续流处理读取配置数据。实现如下:


DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
        .hostname(DmpConfig.MYSQL_HOST)
        .port(DmpConfig.MYSQL_PORT)
        .username(DmpConfig.MYSQL_USERNAME)
        .password(DmpConfig.MYSQL_PSW)
        .databaseList("busi_db")
        .tableList("meta_config")
        .startupOptions(StartupOptions.initial())
        .deserializer(new MyDeserialization())
        .build();
DataStreamSource<String> configDs = env.addSource(sourceFunction);
MapStateDescriptor<String, DataConfigProcess> mapStateDescriptor = new MapStateDescriptor<>(
        "map-state",
        String.class,
        DataConfigProcess.class);
BroadcastStream<String> broadcastStream = configDs.broadcast(mapStateDescriptor);


3.4 DWM层数据处理


从数据分析得知,统计PV、UV用到的源数据都是ods_base_log,PV统计的是时间范围内访问的次数,对每次访问进行累计,而UV统计的是独立访问用户数,在本案例中指时间范围内访问的用户数量。


由此可得,UV可以通过PV的记录去重后获取,直接在DWD层的dwd_pv_log基础上去重即可,避免了UV统计时读取ods_base_log源数据进行重新统计。实现方法如下:


1、通过Flink读取dwd_pv_log数据,转换成JSON格式,并提取watermark,如下所示:


String sourceTopic = "dwd_pv_log";
String groupId = "unique_visit_app";
String sinkTopic = "dwm_uv_log";
DataStreamSource<String> kafkaDs = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(JSON::parseObject).assignTimestampsAndWatermarks(
WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3L))
      .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
        @Override
        public long extractTimestamp(JSONObject value, long timestamp) {
          return value.getLong("logTs");
        }
      })
  );



2、按应用维度(appId)与用户id(userId)进行keyBy获得KeyedStream,实现分组功能,如下所示:


String sourceTopic = "dwd_pv_log";

技术科普 | 基于 Flink + Doris 体验实时数仓建设

随着互联网的不断发展,数据的时效性对企业的精细化运营越来越重要,在每天产生的海量数据中,如何快速有效地挖掘出有价值的信息,对企业的...
点击下载文档文档为doc格式

声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。如若本站内容侵犯了原著者的合法权益,可联系本站删除。

确认删除?
回到顶部
客服QQ
  • 客服QQ点击这里给我发消息
QQ群
  • 答案:my7c点击这里加入QQ群
支持邮箱
微信
  • 微信