Flink CEP—应用服务调用频率的实时监测与预警

栏目:云苍穹知识作者:金蝶来源:金蝶云社区发布:2024-09-23浏览:3

Flink CEP—应用服务调用频率的实时监测与预警

在日常应用服务运维中,为了防止客户端的一些恶意调用行为,应用服务器会记录每次请求产生的行为日志作为判断依据,对其进行实时监测预警。如果单位时间内请求次数达到指定阀值,则潜在恶意调用的风险,系统需要及时输出预警并通知运维人员;反之,单位时间内请求次数在预设的正常范围内,则无需操作。


例如,预设单个服务接口在3秒内,被同一客户端连续请求<200次,则视为正常。当个某客户端在3秒内,对同一服务接口请求次数>=200次,则立马发出恶意请求预警,并通知运维人员。对于流量监测预警的场景,实现方案有很多种,小编在这里主要介绍Flink的实现方案。




1 方案分析


针对上述业务场景,对访问流量的监测预警功能,可有效满足上述需求。Flink可以实现特定时间范围内的实时监测预警功能,大部分人可能会立马想到Flink的事件时间窗口统计,以及Flink CEP(复杂事件处理)。但是,选择事件时间窗口统计还是Flink CEP实现,下面我们对这两种实现方案进行分析。


1.1 方案一:事件时间窗口统计


在Flink中定义了三类时间:事件时间(Event Time),即事件实际发生的时间;处理时间(Processing Time),即事件被处理的时间;摄入时间(Ingestion Time),即事件进入流处理框架的时间。三种时间的区别与联系如下图所示:


Flink三种时间类型


以上图为例,图的最左边为数据产生的源头,主要由电脑终端、移动设备等源源不断的生成,常见的数据如:业务系统产生的业务数据、系统日志、用户操作产生行为数据等,此处是记录数据产生的时间,即为事件时间(Event Time);数据生成后,通过网络传输将数据回传并写入服务器的消息队列,由Flink计算引擎读取消息队列数据,读取数据的时间则为摄入时间(Ingestion Time);最后,数据通过Flink算子进行相关计算,如数据聚合、数据转换等处理,此时数据计算的时间即为处理时间(Processing Time)


窗口是Flink的一类算子,是将无边界的数据流划分成一个个有边界的数据集,从而可以对每一个数据集进行分析处理。在实际应用中主要划分为时间窗口计数窗口会话窗口。在此,小编主要介绍一下时间窗口,时间窗口分为两种:


滚动窗口(Tumbling Window):滚动窗口的时间是对齐的,窗口长度固定没有重叠,如下图所示,滚动窗口统计固定时间范围事件发生的总次数,如下图所示:


滚动窗口事件划分解读


滑动窗口(Sliding Window):滑动窗口是固定窗口更广义的一种形式,由固定窗口的窗口长度和滑动步长组成,窗口长度固定可以重叠。如下图所示,滑动窗口统计时间范围事件发生的总次数。


滑动窗口事件划分解读


结合以上两种窗口,现在假设要求3秒内事件发生的次数<10次视为正常,那如何合理地设定时间窗口参数就显得极其重要,先看下图:


不同时间轴窗口事件划分解读


以上图为例,有两个时间轴,根据设定的需求,按照Time1的划分方式,整个监测流程下来都是正常的,不会出现恶意请求的预警操作;但在同一数据流的情况下,按Time2的划分方式,则就会出现大量的预警操作,这种情况大部分会出现在随机事件发生的时间分布不均匀,且设置的固定时间窗口恰好跨越事件发生的时间范围的时候。


这两个时间轴的划分,即使窗口长度一致,但仅仅是窗口的起始位置不同,也会出现截然不同的结果。因此,无论是事件时间滚动窗口,还是事件时间滑动窗口,都有可能会存在这样的问题。


1.2 方案二:复杂事件处理(CEP)


Flink CEP是在Flink上层实现的复杂事件处理库。它可以在无限事件流中检测出特定的事件模型,可以掌握数据中的重要部分。如下图所示:


复杂事件匹配过程


在Flink CEP中,在使用事件时间时,为了保证事件按照正确的顺序被处理,当一个事件到来后,首先,会进入一个缓冲区。在缓冲区里,事件都按照时间戳从小到大排序。当watermark到达后,缓冲区中所有小于watermark的事件将被处理。这意味着watermark之间的数据都按照时间戳顺序处理。


Flink CEP提供了Pattern API, 用于对输入流进行复杂事件规则定义,提取符合规则的序列。Pattern API,又叫模式API,它可以定义从输入流中抽取的复杂模式序列,每个复杂的模式序列包括多个简单的模式。


模式序列可以看作是模式构成的图,基于用户指定的条件从一个Pattern转换到另一个Pattern,一个Pattern匹配一个输入时间的序列,这些事件通过一系列有效的模式转换,能够访问到复杂模式图中的所有模式。


对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,需要指定跳过策略AfterMatchSkipStrategy。匹配跳过策略,解决了匹配后,从哪里开始重新匹配的问题。有五种跳过策略,如下:


  • NO_SKIP:每个成功的匹配都会被输出;

  • SKIP_TO_NEXT:丢弃以相同事件开始的所有部分匹配;

  • SKIP_PAST_LAST_EVENT:丢弃起始在匹配的开始和结束之间的所有部分匹配;

  • SKIP_TO_FIRST:丢弃起始在匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配;

  • SKIP_TO_LAST:丢弃起点在匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。


更详细的使用说明可以到Flink API官网进行了解,包括模式API的详细定义使用,官网中给出了大量的样例说明,此处不再累赘。


官网地址:https://nightlies.apache.org/flink/flink-docs-master/docs/libs/cep/


2 案例分析


通过对Flink事件时间窗口统计实现与Flink CEP复杂事件处理的两种方案分析对比,本案例将使用Flink CEP来实现对应用服务调用的实时监测与预警。根据前面提到的业务场景需求,预设服务接口如果在3s内,被连续调用超过200次,就认为是恶意调用,需要及时输出预警信息。其环境要求如下:


  • Flink 1.13.2

  • Scala 2.12

  • JDK 1.8


2.1 构建应用—Flink CEP


首先,创建Flink CEP的业务处理类,用户可自定义命名。在此,小编命名为:Visit200TimesP3SCep,对要用到的数据结构进行定义,代码示例如下:


访问日志数据:客户端应用名称,API接口名称,请求方式,请求时间戳
case class VisitLogData(app:String, api:String, request_method:String, ts:Long)
// 恶意请求预警结果:客户端应用名称,API接口名称,首次请求时间戳,末次请求时间戳,预警信息
case class VisitWarningData(app:String, api:String, startTs:Long, endTs:Long, msg:String)


随后,构建Flink的执行环境,并读取日志行为数据:


val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream = env.addSource(new VisitLogDataSource())



在正式的生产环境中,行为数据日志的来源往往来自于Kafka。在本案例中为了更好地展示实现成果,将数据来源改成自定义生成,由VisitLogDataSource实现,代码示例如下:


class VisitLogDataSource extends SourceFunction[String]{
  var running = true
  val appList = Array("应用A", "应用B", "应用C")
  val apiList = Array("企业信息查询", "企业模糊查询", "新闻信息查询")
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    val rand = new Random()
    while(running){
      (0 to (rand.nextInt(20) + 20)).foreach(_ => {
        val app = appList(rand.nextInt(appList.length))
        val api = apiList(rand.nextInt(apiList.length))
        val ts = System.currentTimeMillis()
        val line = s"${app},${api},POST,${ts}"
        sourceContext.collect(line)
      })
      //Thread.sleep(50)
    }
  }
  override def cancel(): Unit = running = false
}



在生产分布式环境中,由于网络传输以及在Kafka多分区消费的情况下,读取Flink时,数据可能会存在延迟或者乱序


因此,Flink读取数据流后,需要对数据进行类型转换,将行为日志记录的字符串转行成VisitLogData格式,并在数据中提取事件时间作为watermark,解决数据流中乱序数据问题。代码示例如下:


val visitLogStream = inputStream.map(line => {
  val datas = line.split(",")
  VisitLogData(datas(0), datas(1), datas(2), datas(3).toLong)
}).assignTimestampsAndWatermarks(
  WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1L))
    .withTimestampAssigner(new SerializableTimestampAssigner[VisitLogData]{
      override def extractTimestamp(t: VisitLogData, l: Long): Long = t.ts
    })
)


接下来,是实现监测预警的核心处理部分,即Flink CEP的使用,可以分为三个步骤:


第一步,定义匹配模式(Pattern),根据需求进行自定义匹配


第二步,将Pattern应用到数据流上,得到一个PatternStream。


第三步,检出符合模式的数据流,调用select返回匹配结果。筛选出的结果在此只做简单的打印输出。生产环境中可以输出到预警流中,供下游业务实现监测预警功能。代码示例如下:


val limitTimes = 200
val withInTime = 3L
// 1.定义匹配的模式
val pattern = Pattern.begin[VisitLogData]("start")
  .where(_.request_method != null)
  .times(limitTimes - 1)
  .consecutive()
  .next("end").where(_.request_method != null)
  .within(Time.seconds(withInTime))
// 2.将模式应用到数据流上,得到一个PatternStream
val patternStream = CEP.pattern(visitLogStream.keyBy(v => (v.app, v.api)), pattern)
// 3.检出符合模式的数据流,需要调用select
val visitWarningStream = patternStream.select(new VisitLogWarningEvent(limitTimes, withInTime))
visitWarningStream.print("warning")



从模式中选取,获得匹配结果,定义 VisitLogWarningEvent 类实现 PatternSelectFunction 接口及select方法,在方法中封装预警结果 VisitWarningData 作为返回值数,代码示例如下:


class VisitLogWarningEvent(limitTimes:Int, withInTime:Long)
  extends PatternSelectFunction[VisitLogData, VisitWarningData]{
  override def select(map: util.Map[String, util.List[VisitLogData]]): VisitWarningData = {
    // 当前匹配到的时间序列,就保存在Map里
    val startRecord = map.get("start").get(0)
    val endRecord = map.get("end").get(0)
    val msg = withInTime + "秒内连续访问接口超过" + limitTimes + "次"
    VisitWarningData(startRecord.app, startRecord.api, startRecord.ts, endRecord.ts, msg)
  }
}



到此,功能代码已实现完成,启动Flink作业进行测试,预警输出结果如下所示:


预警结果示例


3 划重点


Flink CEP是在Flink上层实现的复杂事件处理库。它可以在无限事件流中检测出特定的事件模型,可以掌握数据中的重要部分。其中,实现监测预警的核心处理部分,即Flink CEP的使用,可以分为三个步骤:


  • 第一步,定义匹配模式(Pattern),根据需求进行自定义匹配


  • 第二步,将Pattern应用到数据流上,得到一个PatternStream;


  • 第三步,检出符合模式的数据流,调用select返回匹配结果。


#往期推荐#


# 五分钟带你了解爬虫平台的构建与应用

# 一文全面了解“基于k8s的调度系统Pipeline”技术

一文带您了解“事件图谱”

# 一文带你了解动态安全库存模型


更多精彩内容,“码”上了解!↓



Flink CEP—应用服务调用频率的实时监测与预警

在日常应用服务运维中,为了防止客户端的一些恶意调用行为,应用服务器会记录每次请求产生的行为日志作为判断依据,对其进行实时监测与预警...
点击下载文档
确认删除?
回到顶部
客服QQ
  • 客服QQ点击这里给我发消息