Flink CEP—应用服务调用频率的实时监测与预警
在日常应用服务运维中,为了防止客户端的一些恶意调用行为,应用服务器会记录每次请求产生的行为日志作为判断依据,对其进行实时监测与预警。如果单位时间内请求次数达到指定阀值,则潜在恶意调用的风险,系统需要及时输出预警并通知运维人员;反之,单位时间内请求次数在预设的正常范围内,则无需操作。
例如,预设单个服务接口在3秒内,被同一客户端连续请求<200次,则视为正常。当个某客户端在3秒内,对同一服务接口请求次数>=200次,则立马发出恶意请求预警,并通知运维人员。对于流量监测预警的场景,实现方案有很多种,小编在这里主要介绍Flink的实现方案。
1 方案分析
针对上述业务场景,对访问流量的监测预警功能,可有效满足上述需求。Flink可以实现特定时间范围内的实时监测预警功能,大部分人可能会立马想到Flink的事件时间窗口统计,以及Flink CEP(复杂事件处理)。但是,选择事件时间窗口统计还是Flink CEP实现,下面我们对这两种实现方案进行分析。
1.1 方案一:事件时间窗口统计
在Flink中定义了三类时间:事件时间(Event Time),即事件实际发生的时间;处理时间(Processing Time),即事件被处理的时间;摄入时间(Ingestion Time),即事件进入流处理框架的时间。三种时间的区别与联系如下图所示:

Flink三种时间类型
以上图为例,图的最左边为数据产生的源头,主要由电脑终端、移动设备等源源不断的生成,常见的数据如:业务系统产生的业务数据、系统日志、用户操作产生行为数据等,此处是记录数据产生的时间,即为事件时间(Event Time);数据生成后,通过网络传输将数据回传并写入服务器的消息队列,由Flink计算引擎读取消息队列数据,读取数据的时间则为摄入时间(Ingestion Time);最后,数据通过Flink算子进行相关计算,如数据聚合、数据转换等处理,此时数据计算的时间即为处理时间(Processing Time)。
窗口是Flink的一类算子,是将无边界的数据流划分成一个个有边界的数据集,从而可以对每一个数据集进行分析处理。在实际应用中主要划分为时间窗口、计数窗口及会话窗口。在此,小编主要介绍一下时间窗口,时间窗口分为两种:
滚动窗口(Tumbling Window):滚动窗口的时间是对齐的,窗口长度固定,没有重叠,如下图所示,滚动窗口统计固定时间范围事件发生的总次数,如下图所示:

滚动窗口事件划分解读
滑动窗口(Sliding Window):滑动窗口是固定窗口更广义的一种形式,由固定窗口的窗口长度和滑动步长组成,窗口长度固定,可以重叠。如下图所示,滑动窗口统计时间范围事件发生的总次数。

滑动窗口事件划分解读
结合以上两种窗口,现在假设要求3秒内事件发生的次数<10次视为正常,那如何合理地设定时间窗口参数就显得极其重要,先看下图:

不同时间轴窗口事件划分解读
以上图为例,有两个时间轴,根据设定的需求,按照Time1的划分方式,整个监测流程下来都是正常的,不会出现恶意请求的预警操作;但在同一数据流的情况下,按Time2的划分方式,则就会出现大量的预警操作,这种情况大部分会出现在随机事件发生的时间分布不均匀,且设置的固定时间窗口恰好跨越事件发生的时间范围的时候。
这两个时间轴的划分,即使窗口长度一致,但仅仅是窗口的起始位置不同,也会出现截然不同的结果。因此,无论是事件时间滚动窗口,还是事件时间滑动窗口,都有可能会存在这样的问题。
1.2 方案二:复杂事件处理(CEP)
Flink CEP是在Flink上层实现的复杂事件处理库。它可以在无限事件流中检测出特定的事件模型,可以掌握数据中的重要部分。如下图所示:

复杂事件匹配过程
在Flink CEP中,在使用事件时间时,为了保证事件按照正确的顺序被处理,当一个事件到来后,首先,会进入一个缓冲区。在缓冲区里,事件都按照时间戳从小到大排序。当watermark到达后,缓冲区中所有小于watermark的事件将被处理。这意味着watermark之间的数据都按照时间戳顺序处理。
Flink CEP提供了Pattern API, 用于对输入流进行复杂事件规则定义,提取符合规则的序列。Pattern API,又叫模式API,它可以定义从输入流中抽取的复杂模式序列,每个复杂的模式序列包括多个简单的模式。
模式序列可以看作是模式构成的图,基于用户指定的条件从一个Pattern转换到另一个Pattern,一个Pattern匹配一个输入时间的序列,这些事件通过一系列有效的模式转换,能够访问到复杂模式图中的所有模式。
对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,需要指定跳过策略AfterMatchSkipStrategy。匹配跳过策略,解决了匹配后,从哪里开始重新匹配的问题。有五种跳过策略,如下:
NO_SKIP:每个成功的匹配都会被输出;
SKIP_TO_NEXT:丢弃以相同事件开始的所有部分匹配;
SKIP_PAST_LAST_EVENT:丢弃起始在匹配的开始和结束之间的所有部分匹配;
SKIP_TO_FIRST:丢弃起始在匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配;
SKIP_TO_LAST:丢弃起点在匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。
更详细的使用说明可以到Flink API官网进行了解,包括模式API的详细定义使用,官网中给出了大量的样例说明,此处不再累赘。
官网地址:https://nightlies.apache.org/flink/flink-docs-master/docs/libs/cep/
2 案例分析
通过对Flink事件时间窗口统计实现与Flink CEP复杂事件处理的两种方案分析对比,本案例将使用Flink CEP来实现对应用服务调用的实时监测与预警。根据前面提到的业务场景需求,预设服务接口如果在3s内,被连续调用超过200次,就认为是恶意调用,需要及时输出预警信息。其环境要求如下:
Flink 1.13.2;
Scala 2.12;
JDK 1.8。
2.1 构建应用—Flink CEP
首先,创建Flink CEP的业务处理类,用户可自定义命名。在此,小编命名为:Visit200TimesP3SCep,对要用到的数据结构进行定义,代码示例如下:
访问日志数据:客户端应用名称,API接口名称,请求方式,请求时间戳 case class VisitLogData(app:String, api:String, request_method:String, ts:Long) // 恶意请求预警结果:客户端应用名称,API接口名称,首次请求时间戳,末次请求时间戳,预警信息 case class VisitWarningData(app:String, api:String, startTs:Long, endTs:Long, msg:String)
随后,构建Flink的执行环境,并读取日志行为数据:
val env = StreamExecutionEnvironment.getExecutionEnvironment val inputStream = env.addSource(new VisitLogDataSource())
在正式的生产环境中,行为数据日志的来源往往来自于Kafka。在本案例中为了更好地展示实现成果,将数据来源改成自定义生成,由VisitLogDataSource实现,代码示例如下:
class VisitLogDataSource extends SourceFunction[String]{ var running = true val appList = Array("应用A", "应用B", "应用C") val apiList = Array("企业信息查询", "企业模糊查询", "新闻信息查询") override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = { val rand = new Random() while(running){ (0 to (rand.nextInt(20) + 20)).foreach(_ => { val app = appList(rand.nextInt(appList.length)) val api = apiList(rand.nextInt(apiList.length)) val ts = System.currentTimeMillis() val line = s"${app},${api},POST,${ts}" sourceContext.collect(line) }) //Thread.sleep(50) } } override def cancel(): Unit = running = false }
在生产分布式环境中,由于网络传输以及在Kafka多分区消费的情况下,读取Flink时,数据可能会存在延迟或者乱序。
因此,Flink读取数据流后,需要对数据进行类型转换,将行为日志记录的字符串转行成VisitLogData格式,并在数据中提取事件时间作为watermark,解决数据流中乱序数据问题。代码示例如下:
val visitLogStream = inputStream.map(line => { val datas = line.split(",") VisitLogData(datas(0), datas(1), datas(2), datas(3).toLong) }).assignTimestampsAndWatermarks( WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1L)) .withTimestampAssigner(new SerializableTimestampAssigner[VisitLogData]{ override def extractTimestamp(t: VisitLogData, l: Long): Long = t.ts }) )
接下来,是实现监测预警的核心处理部分,即Flink CEP的使用,可以分为三个步骤:
第一步,定义匹配模式(Pattern),根据需求进行自定义匹配。
第二步,将Pattern应用到数据流上,得到一个PatternStream。
第三步,检出符合模式的数据流,调用select返回匹配结果。筛选出的结果在此只做简单的打印输出。生产环境中可以输出到预警流中,供下游业务实现监测预警功能。代码示例如下:
val limitTimes = 200 val withInTime = 3L // 1.定义匹配的模式 val pattern = Pattern.begin[VisitLogData]("start") .where(_.request_method != null) .times(limitTimes - 1) .consecutive() .next("end").where(_.request_method != null) .within(Time.seconds(withInTime)) // 2.将模式应用到数据流上,得到一个PatternStream val patternStream = CEP.pattern(visitLogStream.keyBy(v => (v.app, v.api)), pattern) // 3.检出符合模式的数据流,需要调用select val visitWarningStream = patternStream.select(new VisitLogWarningEvent(limitTimes, withInTime)) visitWarningStream.print("warning")
从模式中选取,获得匹配结果,定义 VisitLogWarningEvent 类实现 PatternSelectFunction 接口及select方法,在方法中封装预警结果 VisitWarningData 作为返回值数,代码示例如下:
class VisitLogWarningEvent(limitTimes:Int, withInTime:Long) extends PatternSelectFunction[VisitLogData, VisitWarningData]{ override def select(map: util.Map[String, util.List[VisitLogData]]): VisitWarningData = { // 当前匹配到的时间序列,就保存在Map里 val startRecord = map.get("start").get(0) val endRecord = map.get("end").get(0) val msg = withInTime + "秒内连续访问接口超过" + limitTimes + "次" VisitWarningData(startRecord.app, startRecord.api, startRecord.ts, endRecord.ts, msg) } }
到此,功能代码已实现完成,启动Flink作业进行测试,预警输出结果如下所示:

预警结果示例
3 划重点
Flink CEP是在Flink上层实现的复杂事件处理库。它可以在无限事件流中检测出特定的事件模型,可以掌握数据中的重要部分。其中,实现监测预警的核心处理部分,即Flink CEP的使用,可以分为三个步骤:
第一步,定义匹配模式(Pattern),根据需求进行自定义匹配;
第二步,将Pattern应用到数据流上,得到一个PatternStream;
第三步,检出符合模式的数据流,调用select返回匹配结果。
#往期推荐#
更多精彩内容,“码”上了解!↓
Flink CEP—应用服务调用频率的实时监测与预警
本文2024-09-23 01:00:54发表“云苍穹知识”栏目。
本文链接:https://wenku.my7c.com/article/kingdee-cangqiong-143207.html
- 鼎捷EAI整合規範文件V3.1.07 (集團).pdf
- 鼎捷OpenAPI應用場景說明_基礎資料.pdf
- 鼎捷OpenAPI應用場景說明_財務管理.pdf
- 鼎捷T100 API設計器使用手冊T100 APIDesigner(V1.0).docx
- 鼎新e-GoB2雲端ERP B2 線上課程E6-2應付票據整批郵寄 領取.pdf
- 鼎新e-GoB2雲端ERP B2 線上課程A4使用者建立權限設定.pdf
- 鼎新e-GoB2雲端ERP B2 線上課程C3會計開帳與會計傳票.pdf
- 鼎新e-GoB2雲端ERP B2 線上課程E6-1應付票據.pdf
- 鼎新e-GoB2雲端ERP B2 線上課程A5-1進銷存參數設定(初階篇).pdf
- 鼎新e-GoB2雲端ERP B2 線上課程D2帳款開帳與票據開帳.pdf