Flink CEP—应用服务调用频率的实时监测与预警

在日常应用服务运维中,为了防止客户端的一些恶意调用行为,应用服务器会记录每次请求产生的行为日志作为判断依据,对其进行实时监测与预警。如果单位时间内请求次数达到指定阀值,则潜在恶意调用的风险,系统需要及时输出预警并通知运维人员;反之,单位时间内请求次数在预设的正常范围内,则无需操作。
例如,预设单个服务接口在3秒内,被同一客户端连续请求<200次,则视为正常。当个某客户端在3秒内,对同一服务接口请求次数>=200次,则立马发出恶意请求预警,并通知运维人员。对于流量监测预警的场景,实现方案有很多种,小编在这里主要介绍Flink的实现方案。
1 方案分析
针对上述业务场景,对访问流量的监测预警功能,可有效满足上述需求。Flink可以实现特定时间范围内的实时监测预警功能,大部分人可能会立马想到Flink的事件时间窗口统计,以及Flink CEP(复杂事件处理)。但是,选择事件时间窗口统计还是Flink CEP实现,下面我们对这两种实现方案进行分析。
1.1 方案一:事件时间窗口统计
在Flink中定义了三类时间:事件时间(Event Time),即事件实际发生的时间;处理时间(Processing Time),即事件被处理的时间;摄入时间(Ingestion Time),即事件进入流处理框架的时间。三种时间的区别与联系如下图所示:

Flink三种时间类型
以上图为例,图的最左边为数据产生的源头,主要由电脑终端、移动设备等源源不断的生成,常见的数据如:业务系统产生的业务数据、系统日志、用户操作产生行为数据等,此处是记录数据产生的时间,即为事件时间(Event Time);数据生成后,通过网络传输将数据回传并写入服务器的消息队列,由Flink计算引擎读取消息队列数据,读取数据的时间则为摄入时间(Ingestion Time);最后,数据通过Flink算子进行相关计算,如数据聚合、数据转换等处理,此时数据计算的时间即为处理时间(Processing Time)。
窗口是Flink的一类算子,是将无边界的数据流划分成一个个有边界的数据集,从而可以对每一个数据集进行分析处理。在实际应用中主要划分为时间窗口、计数窗口及会话窗口。在此,小编主要介绍一下时间窗口,时间窗口分为两种:
滚动窗口(Tumbling Window):滚动窗口的时间是对齐的,窗口长度固定,没有重叠,如下图所示,滚动窗口统计固定时间范围事件发生的总次数,如下图所示:

滚动窗口事件划分解读
滑动窗口(Sliding Window):滑动窗口是固定窗口更广义的一种形式,由固定窗口的窗口长度和滑动步长组成,窗口长度固定,可以重叠。如下图所示,滑动窗口统计时间范围事件发生的总次数。

滑动窗口事件划分解读
结合以上两种窗口,现在假设要求3秒内事件发生的次数<10次视为正常,那如何合理地设定时间窗口参数就显得极其重要,先看下图:

不同时间轴窗口事件划分解读
以上图为例,有两个时间轴,根据设定的需求,按照Time1的划分方式,整个监测流程下来都是正常的,不会出现恶意请求的预警操作;但在同一数据流的情况下,按Time2的划分方式,则就会出现大量的预警操作,这种情况大部分会出现在随机事件发生的时间分布不均匀,且设置的固定时间窗口恰好跨越事件发生的时间范围的时候。
这两个时间轴的划分,即使窗口长度一致,但仅仅是窗口的起始位置不同,也会出现截然不同的结果。因此,无论是事件时间滚动窗口,还是事件时间滑动窗口,都有可能会存在这样的问题。
1.2 方案二:复杂事件处理(CEP)
Flink CEP是在Flink上层实现的复杂事件处理库。它可以在无限事件流中检测出特定的事件模型,可以掌握数据中的重要部分。如下图所示:

复杂事件匹配过程
在Flink CEP中,在使用事件时间时,为了保证事件按照正确的顺序被处理,当一个事件到来后,首先,会进入一个缓冲区。在缓冲区里,事件都按照时间戳从小到大排序。当watermark到达后,缓冲区中所有小于watermark的事件将被处理。这意味着watermark之间的数据都按照时间戳顺序处理。
Flink CEP提供了Pattern API, 用于对输入流进行复杂事件规则定义,提取符合规则的序列。Pattern API,又叫模式API,它可以定义从输入流中抽取的复杂模式序列,每个复杂的模式序列包括多个简单的模式。
模式序列可以看作是模式构成的图,基于用户指定的条件从一个Pattern转换到另一个Pattern,一个Pattern匹配一个输入时间的序列,这些事件通过一系列有效的模式转换,能够访问到复杂模式图中的所有模式。
对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,需要指定跳过策略AfterMatchSkipStrategy。匹配跳过策略,解决了匹配后,从哪里开始重新匹配的问题。有五种跳过策略,如下:
NO_SKIP:每个成功的匹配都会被输出;
SKIP_TO_NEXT:丢弃以相同事件开始的所有部分匹配;
SKIP_PAST_LAST_EVENT:丢弃起始在匹配的开始和结束之间的所有部分匹配;
SKIP_TO_FIRST:丢弃起始在匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配;
SKIP_TO_LAST:丢弃起点在匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。
更详细的使用说明可以到Flink API官网进行了解,包括模式API的详细定义使用,官网中给出了大量的样例说明,此处不再累赘。
官网地址:https://nightlies.apache.org/flink/flink-docs-master/docs/libs/cep/
2 案例分析
通过对Flink事件时间窗口统计实现与Flink CEP复杂事件处理的两种方案分析对比,本案例将使用Flink CEP来实现对应用服务调用的实时监测与预警。根据前面提到的业务场景需求,预设服务接口如果在3s内,被连续调用超过200次,就认为是恶意调用,需要及时输出预警信息。其环境要求如下:
Flink 1.13.2;
Scala 2.12;
JDK 1.8。
2.1 构建应用—Flink CEP
首先,创建Flink CEP的业务处理类,用户可自定义命名。在此,小编命名为:Visit200TimesP3SCep,对要用到的数据结构进行定义,代码示例如下:
访问日志数据:客户端应用名称,API接口名称,请求方式,请求时间戳 case class VisitLogData(app:String, api:String, request_method:String, ts:Long) // 恶意请求预警结果:客户端应用名称,API接口名称,首次请求时间戳,末次请求时间戳,预警信息 case class VisitWarningData(app:String, api:String, startTs:Long, endTs:Long, msg:String)
随后,构建Flink的执行环境,并读取日志行为数据:
val env = StreamExecutionEnvironment.getExecutionEnvironment val inputStream = env.addSource(new VisitLogDataSource())
在正式的生产环境中,行为数据日志的来源往往来自于Kafka。在本案例中为了更好地展示实现成果,将数据来源改成自定义生成,由VisitLogDataSource实现,代码示例如下:
class VisitLogDataSource extends SourceFunction[String]{
var running = true
val appList = Array("应用A", "应用B", "应用C")
val apiList = Array("企业信息查询", "企业模糊查询", "新闻信息查询")
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
val rand = new Random()
while(running){
(0 to (rand.nextInt(20) + 20)).foreach(_ => {
val app = appList(rand.nextInt(appList.length))
val api = apiList(rand.nextInt(apiList.length))
val ts = System.currentTimeMillis()
Flink CEP—应用服务调用频率的实时监测与预警
声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。如若本站内容侵犯了原著者的合法权益,可联系本站删除。



