Flink Time
1.1 Time
在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:
-
EventTime[事件时间]
事件发生的时间,例如:点击网站上的某个链接的时间,每一条日志都会记录自己的生成时间
如果以EventTime为基准来定义时间窗口那将形成EventTimeWindow,要求消息本身就应该携带EventTime
-
IngestionTime[摄入时间]
数据进入Flink的时间,如某个Flink节点的source operator接收到数据的时间,例如:某个source消费到kafka中的数据
如果以IngesingtTime为基准来定义时间窗口那将形成IngestingTimeWindow,以source的systemTime为准
-
ProcessingTime[处理时间]
某个Flink节点执行某个operation的时间,例如:timeWindow处理数据时的系统时间,默认的时间属性就是Processing Time
如果以ProcessingTime基准来定义时间窗口那将形成ProcessingTimeWindow,以operator的systemTime为准
在Flink的流式处理中,绝大部分的业务都会使用EventTime,一般只在EventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//设置使用事件时间
1.2.数据延迟产生的问题
l 示例1
现在假设,你正在去往地下停车场的路上,并且打算用手机点一份外卖。
选好了外卖后,你就用在线支付功能付款了,这个时候是11点50分。恰好这时,你走进了地下停车库,而这里并没有手机信号。因此外卖的在线支付并没有立刻成功,而支付系统一直在Retry重试“支付”这个操作。
当你找到自己的车并且开出地下停车场的时候,已经是12点05分了。这个时候手机重新有了信号,手机上的支付数据成功发到了外卖在线支付系统,支付完成。
在上面这个场景中你可以看到,支付数据的事件时间是11点50分,而支付数据的处理时间是12点05分
一般在实际开发中会以事件时间作为计算标准
l 示例2
一条日志进入Flink的时间为2019-08-12 10:00:01,摄入时间
到达Window的系统时间为2019-08-12 10:00:02,处理时间
日志的内容为:2019-08-12 09:58:02 INFO Fail over to rm2 ,事件时间
对于业务来说,要统计1h内的故障日志个数,哪个时间是最有意义的?—事件时间
EventTime,因为我们要根据日志的生成时间进行统计。
l 示例3
某 App 会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。
A 用户在 11:02 对 App 进行操作,B 用户在 11:03 操作了 App,
但是 A 用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到 B 用户 11:03 的消息,然后再接受到 A 用户 11:02 的消息,消息乱序了。
l 示例4
在实际环境中,经常会出现,因为网络原因,数据有可能会延迟一会才到达Flink实时处理系统。
我们先来设想一下下面这个场景:
-
使用时间窗口来统计10分钟内的用户流量
-
有一个时间窗口
-
开始时间为:2017-03-19 10:00:00
-
结束时间为:2017-03-19 10:10:00
-
-
有一个数据,因为网络延迟
-
事件发生的时间为:2017-03-19 10:10:00
-
但进入到窗口的时间为:2017-03-19 10:10:02,延迟了2秒中
-
-
时间窗口并没有将59这个数据计算进来,导致数据统计不正确
这种处理方式,根据消息进入到window时间,来进行计算。在网络有延迟的时候,会引起计算误差。
如何解决?—使用水印解决网络延迟问题
通过上面的例子,我们知道,在进行数据处理的时候应该按照事件时间进行处理,也就是窗口应该要考虑到事件时间,但是窗口不能无限的一直等到延迟数据的到来,需要有一个触发窗口计算的机制,也就是我们接下来要学的watermaker水位线/水印机制
1.3 使用Watermark解决
水印(watermark)就是一个时间戳,Flink可以给数据流添加水印,
可以理解为:收到一条消息后,额外给这个消息添加了一个时间字段,这就是添加水印。
-
水印并不会影响原有Eventtime事件时间
-
当数据流添加水印后,会按照水印时间来触发窗口计算
也就是说watermark水印是用来触发窗口计算的
-
一般会设置水印时间,比事件时间小几秒钟,表示最大允许数据延迟达到多久
(即水印时间 = 事件时间 - 允许延迟时间)10:09:57 = 10:10:00 - 3s
-
当接收到的 水印时间 >= 窗口结束时间,则触发计算 如等到一条数据的水印时间为10:10:00 >= 10:10:00 才触发计算,也就是要等到事件时间为10:10:03的数据到来才触发计算
(即事件时间 - 允许延迟时间 >= 窗口结束时间 或 事件时间 >= 窗口结束时间 + 允许延迟时间)
总结:watermaker是用来解决延迟数据的问题
如窗口10:00:00~10:10:00
而数据到达的顺序是: A 10:10:00 ,B 10:09:58
如果没有watermaker,那么A数据将会触发窗口计算,B数据来了窗口已经关闭,则该数据丢失
那么如果有了watermaker,设置允许数据迟到的阈值为3s
那么该窗口的结束条件则为 水印时间>=窗口结束时间10:10:00,也就是需要有一条数据的水印时间= 10:10:00
而水印时间10:10:00= 事件时间- 延迟时间3s
也就是需要有一条事件时间为10:10:03的数据到来,才会真正的触发窗口计算
而上面的 A 10:10:00 ,B 10:09:58都不会触发计算,也就是会被窗口包含,直到10:10:03的数据到来才会计算窗口10:00:00~10:10:00的数据
Watermark案例
步骤:
1、获取数据源
2、转化
3、声明水印(watermark)
4、分组聚合,调用window的操作
5、保存处理结果
注意:
当使用EventTimeWindow时,所有的Window在EventTime的时间轴上进行划分,
也就是说,在Window启动后,会根据初始的EventTime时间每隔一段时间划分一个窗口,
如果Window大小是3秒,那么1分钟内会把Window划分为如下的形式:
[00:00:00,00:00:03)
[00:00:03,00:00:06)
[00:00:03,00:00:09)
[00:00:03,00:00:12)
[00:00:03,00:00:15)
[00:00:03,00:00:18)
[00:00:03,00:00:21)
[00:00:03,00:00:24)
…
[00:00:57,00:00:42)
[00:00:57,00:00:45)
[00:00:57,00:00:48)
…
如果Window大小是10秒,则Window会被分为如下的形式:
[00:00:00,00:00:10)
[00:00:10,00:00:20)
…
[00:00:50,00:01:00)
l 注意:
1.窗口是左闭右开的,形式为:[window_start_time,window_end_time)。
2.Window的设定基于第一条消息的事件时间,也就是说,Window会一直按照指定的时间间隔进行划分,不论这个Window中有没有数据,EventTime在这个Window期间的数据会进入这个Window。
3.Window会不断产生,属于这个Window范围的数据会被不断加入到Window中,所有未被触发的Window都会等待触发,只要Window还没触发,属于这个Window范围的数据就会一直被加入到Window中,直到Window被触发才会停止数据的追加,而当Window触发之后才接受到的属于被触发Window的数据会被丢弃。
4.Window会在以下的条件满足时被触发执行:
(1)在[window_start_time,window_end_time)窗口中有数据存在
(2)watermark时间 >= window_end_time;
5.一般会设置水印时间,比事件时间小几秒钟,表示最大允许数据延迟达到多久
(即水印时间 = 事件时间 - 允许延迟时间)
当接收到的 水印时间 >= 窗口结束时间且窗口内有数据,则触发计算
(即事件时间 - 允许延迟时间 >= 窗口结束时间 或 事件时间 >= 窗口结束时间 + 允许延迟时间)
1.3. API
定期生成是现实时间驱动的,这里的“定期生成”主要是指 watermark(因为 timestamp 是每一条数据都需要有的),即定期会调用生成逻辑去产生一个 watermark。
根据特殊记录生成是数据驱动的,即是否生成 watermark 不是由现实时间来决定,而是当看到一些特殊的记录就表示接下来可能不会有符合条件的数据再发过来了,这个时候相当于每一次分配 Timestamp 之后都会调用用户实现的 watermark 生成方法,用户需要在生成方法中去实现 watermark 的生成逻辑。
所以一般都是使用定期生成。
数据源:
01,1586489566000
01,1586489567000
01,1586489568000
01,1586489569000
01,1586489570000
01,1586489571000
01,1586489572000
01,1586489573000
2020-04-10 11:32:46 2020-04-10 11:32:47 2020-04-10 11:32:48 2020-04-10 11:32:49 2020-04-10 11:32:50
代码:
package com.example.time;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
public class WaterMarkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> data = env.socketTextStream("", 7777);
SingleOutputStreamOperator<Tuple2<String, Long>> maped = data.map(new MapFunction<String, Tuple2<String, Long>>() {
public Tuple2<String, Long> map(String value) throws Exception {
String[] split = value.split(",");
return new Tuple2<String, Long>(split[0], Long.valueOf(split[1]));
}
});
//做出水印watermark
SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = maped.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
Long currentMaxStamp = 0l;
Long maxOrderOut = 2000l;
@Nullable
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxStamp - maxOrderOut);
}
public long extractTimestamp(Tuple2<String, Long> element, long l) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
Long f1 = element.f1;
currentMaxStamp = Math.max(f1, currentMaxStamp);
System.out.println("EventTime:" + f1 + "...f1.format:" + sdf.format(f1) + "...watermark:" + getCurrentWatermark().getTimestamp() + "...watermark.format:" + sdf.format(getCurrentWatermark().getTimestamp()));
return f1;
}
});
SingleOutputStreamOperator<String> result = watermarks.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(2))).apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {
long start = timeWindow.getStart();
long end = timeWindow.getEnd();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String startTime = simpleDateFormat.format(start);
String endTime = simpleDateFormat.format(end);
Iterator<Tuple2<String, Long>> iterator = iterable.iterator();
ArrayList<Long> list = new ArrayList<Long>();
while (iterator.hasNext()) {
Tuple2<String, Long> next = iterator.next();
String key = next.f0;
Long f1 = next.f1;
list.add(f1);
}
Collections.sort(list);
String firstTime = simpleDateFormat.format(list.get(0));
String lastTime = simpleDateFormat.format(list.get(list.size() - 1));
collector.collect("startTime:" + startTime + "...endTime:" + endTime + "...firstTime:" + firstTime + "...lastTime:" + lastTime);
}
});
result.print();
env.execute();
}
}