古道长亭

Contact me with ixiaoqiang0011@gmail.com


  • 首页

  • 归档

  • 分类

  • 关于

  • Book

  • 搜索

Flink 窗口机制

时间: 2023-07-13   |   分类: Flink   | 字数: 2657 字 | 阅读约: 6分钟 | 阅读次数:

Flink 窗口机制

Flink Window 背景

​ Flink认为Batch是Streaming的一个特例,因此Flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。而Window就是从Streaming到Batch的桥梁。

​ 通俗讲,Window是用来对一个无限的流设置一个有限的集合,从而在有界的数据集上进行操作的一种机制。流上的集合由Window来划定范围,比如“计算过去10分钟”或者“最后50个元素的和”。

​ Window可以由时间(Time Window)(比如每30s)或者数据(Count Window)(如每100个元素)驱动。DataStream API提供了Time和Count的Window。

Flink Window 总览

  • Window 是flink处理无限流的核心,Windows将流拆分为有限大小的“桶”,我们可以在其上应用计算。
  • Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。
  • 而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。
  • Flink 提供了非常完善的窗口机制。
  • 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。
  • 当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。
  • 在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
  • 窗口可以是基于时间驱动的(Time Window,例如:每30秒钟)
  • 也可以是基于数据驱动的(Count Window,例如:每一百个元素)
  • 同时基于不同事件驱动的窗口又可以分成以下几类:
    • 翻滚窗口 (Tumbling Window, 无重叠)
    • 滑动窗口 (Sliding Window, 有重叠)
    • 会话窗口 (Session Window, 活动间隙)
    • 全局窗口 (略)
  • Flink要操作窗口,先得将StreamSource 转成WindowedStream

步骤:

1、获取流数据源

2、获取窗口

3、操作窗口数据

4、输出窗口数据

第 1 节 时间窗口(TimeWindow)

1.1 滚动时间窗口(Tumbling Window)

image-20200731072656176

​ 将数据依据固定的窗口长度对数据进行切分

​ 特点:时间对齐,窗口长度固定,没有重叠

​ 代码示例

package com.example.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

/**
 * 翻滚窗口:窗口不重叠
 * 1、基于时间驱动
 * 2、基于事件驱动
 */
public class TumblingWindow {

    public static void main(String[] args) {
        //设置执行环境,类似spark中初始化sparkContext
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<String> dataStreamSource = env.socketTextStream("teache2", 7777);

        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {

                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

                long timeMillis = System.currentTimeMillis();

                int random = new Random().nextInt(10);

                System.out.println("value: " + value + " random: " + random + "timestamp: " + timeMillis + "|" + format.format(timeMillis));

                return new Tuple2<String, Integer>(value, random);
            }
        });

        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0);


        // 基于时间驱动,每隔10s划分一个窗口
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10));

        // 基于事件驱动, 每相隔3个事件(即三个相同key的数据), 划分一个窗口进行计算
        // WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> countWindow = keyedStream.countWindow(3);

        // apply是窗口的应用函数,即apply里的函数将应用在此窗口的数据上。
        timeWindow.apply(new MyTimeWindowFunction()).print();
        // countWindow.apply(new MyCountWindowFunction()).print();

        try {
            // 转换算子都是lazy init的, 最后要显式调用 执行程序
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

1.2.1 基于时间驱动

​ 场景:我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被成为翻滚时间窗口(Tumbling Time Window)

package com.example.window;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class MyTimeWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        int sum = 0;

        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }

        long start = window.getStart();
        long end = window.getEnd();

        out.collect("key:" + tuple.getField(0) + " value: " + sum + "| window_start :"
                + format.format(start) + "  window_end :" + format.format(end)
        );

    }
}

1.2.2 基于事件驱动

​ 场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满100个”相同”元素了,就会对窗口进行计算。

package com.example.window;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class MyCountWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {

    @Override
    public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        int sum = 0;

        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        //无用的时间戳,默认值为: Long.MAX_VALUE,因为基于事件计数的情况下,不关心时间。
        long maxTimestamp = window.maxTimestamp();

        out.collect("key:" + tuple.getField(0) + " value: " + sum + "| maxTimeStamp :"
                + maxTimestamp + "," + format.format(maxTimestamp)
        );
    }
}

1.2 滑动时间窗口(Sliding Window)

image-20200731073053682

​ 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成

​ 特点:窗口长度固定,可以有重叠

1.2.1 基于时间的滑动窗口

​ 场景: 我们可以每30秒计算一次最近一分钟用户购买的商品总数

1.2.2 基于事件的滑动窗口

​ 场景: 每10个 “相同”元素计算一次最近100个元素的总和

​ 代码实现

package com.example.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

/**
 * 滑动窗口:窗口可重叠
 * 1、基于时间驱动
 * 2、基于事件驱动
 */
public class SlidingWindow {

    public static void main(String[] args) {
        // 设置执行环境, 类似spark中初始化SparkContext
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<String> dataStreamSource = env.socketTextStream("teacher2", 7777);

        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                long timeMillis = System.currentTimeMillis();

                int random = new Random().nextInt(10);
                System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis));

                return new Tuple2<String, Integer>(value, random);
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0);

        //基于时间驱动,每隔5s计算一下最近10s的数据
        //   WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10), Time.seconds(5));
        //基于事件驱动,每隔2个事件,触发一次计算,本次窗口的大小为3,代表窗口里的每种事件最多为3个
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> countWindow = keyedStream.countWindow(3, 2);

        //   timeWindow.sum(1).print();

        countWindow.sum(1).print();

        //   timeWindow.apply(new MyTimeWindowFunction()).print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

1.3 会话窗口(Session Window)

image-20200731073350282

​ 由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况

session窗口在一个固定的时间周期内不再收到元素,即非活动间隔产生,那么这个窗口就会关闭。

一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。

​ 特点

​ 会话窗口不重叠,没有固定的开始和结束时间

​ 与翻滚窗口和滑动窗口相反, 当会话窗口在一段时间内没有接收到元素时会关闭会话窗口。

​ 后续的元素将会被分配给新的会话窗口

​ 案例描述

​ 计算每个用户在活跃期间总共购买的商品数量,如果用户30秒没有活动则视为会话断开

​ 代码实现

package com.example.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

public class SessionWindow {

    public static void main(String[] args) {

        // 设置执行环境, 类似spark中初始化sparkContext

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<String> dataStreamSource = env.socketTextStream("teacher2", 7777);

        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                long timeMillis = System.currentTimeMillis();

                int random = new Random().nextInt(10);

                System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis));

                return new Tuple2<String, Integer>(value, random);
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0);

        //如果连续10s内,没有数据进来,则会话窗口断开。
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));

        // window.sum(1).print();

        window.apply(new MyTimeWindowFunction()).print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
#Flink# #流处理#
QQ扫一扫交流

标题:Flink 窗口机制

作者:古道长亭

声明: 欢迎加群交流!

如有帮助,欢迎多多交流 ^_^

微信打赏

支付宝打赏

Flink Time
Flink 常用API详解
  • 文章目录
  • 站点概览
古道长亭

古道长亭

Always remember that your present situation is not your final destination. The best is yet to come.

226 日志
57 分类
104 标签
GitHub Gitee
友情链接
  • 古道长亭的BOOK
  • JAVA学习
标签云
  • Mysql
  • 搜索引擎
  • Mybatis
  • 容器
  • 架构
  • 消息队列
  • Flink
  • Sharding sphere
  • 流处理
  • 缓存
  • 第 1 节 时间窗口(TimeWindow)
    • 1.1 滚动时间窗口(Tumbling Window)
    • 1.2 滑动时间窗口(Sliding Window)
    • 1.3 会话窗口(Session Window)
© 2019 - 2024 京ICP备19012088号-1
0%