古道长亭

Contact me with ixiaoqiang0011@gmail.com


  • 首页

  • 归档

  • 分类

  • 关于

  • Book

  • 搜索

Flink 常用API详解

时间: 2023-07-12   |   分类: Flink   | 字数: 2557 字 | 阅读约: 6分钟 | 阅读次数:

Flink 常用API详解

示例代码: https://gitee.com/ixinglan/flink-demo.git

第 1 节 Flink DataStream常用API

DataStream API主要分为3块:DataSource、Transformation、Sink

  • DataSource是程序的数据源输入,可以通过StreamExecutionEnvironment.addSource(sourceFuntion)为程序添加一个数据源
  • Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,比如Map、FlatMap和Filter等操作
  • Sink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中。

1.1 DataSource

Flink针对DataStream提供了大量已经实现的DataSource(数据源接口),比如如下4种

1)基于文件

​ readTextFile(path)

读取文本文件,文件遵循TextInputFormat逐行读取规则并返回

tip:本地Idea读hdfs需要:

a、System.setProperty(“HADOOP_USER_NAME”,"…");

b、Window配置hadoop

c、依赖:

 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-hadoop-compatibility_2.11</artifactId>
     <version>1.7.2</version>
 </dependency>
 <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-common</artifactId>
     <version>2.7.2</version>
 </dependency>
 <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-hdfs</artifactId>
     <version>2.7.2</version>
 </dependency>
 <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-client</artifactId>
     <version>2.7.2</version>
 </dependency>

2)基于Socket

​ socketTextStream

从Socket中读取数据,元素可以通过一个分隔符分开

3)基于集合

​ fromCollection(Collection)

通过Java的Collection集合创建一个数据流,集合中的所有元素必须是相同类型的

如果满足以下条件,Flink将数据类型识别为POJO类型(并允许“按名称”字段引用):

  • 该类是共有且独立的(没有非静态内部类)
  • 该类有共有的无参构造方法
  • 类(及父类)中所有的不被static、transient修饰的属性要么有公有的(且不被final修饰),要么是包含共有的getter和setter方法,这些方法遵循java bean命名规范。

实例:streamsource/StreamFromCollection

4)自定义输入

​ addSource可以实现读取第三方数据源的数据

​ Flink也提供了一批内置的Connector(连接器),如下表列了几个主要的

连接器是否提供Source支持是否提供Sink支持
Apache Kafka是是
ElasticSearch否是
HDFS否是
Twitter Streaming PI是否

Kafka连接器

a、依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

b、代码:streamsource/StreamFromKafka

c、启动kafka

./kafka-server-start.sh -daemon ../config/server.properties

d、创建topic

bin/kafka-topics.sh –create –zookeepper teacher1:2181 –replication-factor 1 –partitions 1 –topic mytopic2

e、启动控制台kafka生产者

bin/kafka-console-producer.sh –broker-list teacher2:9092 –topic mytopic2

1.2 Transformation

Flink针对DataStream提供了大量的已经实现的算子

  • Map:输入一个元素,然后返回一个元素,中间可以进行清洗转换等操作

  • FlatMap:输入一个元素,可以返回0个、1个或者多个元素

  • Filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下

  • KeyBy:根据指定的Key进行分组,Key相同的数据会进入同一个分区

    KeyBy有两种典型用法

    (1)DataStream.keyBy(“someKey”)指定对象中的someKey字段作为分组Key

    (2)DataStream.keyBy(0)指定Tuple中的第一个元素作为分组Key

  • Reduce:对数据进行聚合操作,结合当前元素和上一次Reduce返回的值进行聚合操作,然后返回一个新的值

  • Aggregations:sum()、min()、max()等

案例:创建自定义数据源(流) 1秒钟产生一个新数据

思路:

​ —–实现SourceFunction<>接口,tip:SourceFuntion和SourceContext需要指明泛型,否则报InvalidTypesException异常

创建自定义分区,奇偶分区

​ —–实现Partitioner<>接口

1.3 Sink

Flink针对DataStream提供了大量的已经实现的数据目的地(Sink),具体如下所示

  • writeAsText():讲元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取

  • print()/printToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中

  • 自定义输出:addSink可以实现把数据输出到第三方存储介质中

    Flink提供了一批内置的Connector,其中有的Connector会提供对应的Sink支持,如1.1节中表所示

案例:将流数据下沉到redis中

1、依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
        </dependency>

2、关键代码:

sink

第 2 节 Flink DataSet常用API

DataSet API同DataStream API一样有三个组成部分,各部分作用对应一致,此处不再赘述

2.1 DataSource

对DataSet批处理而言,较为频繁的操作是读取HDFS中的文件数据,因为这里主要介绍两个DataSource组件

  • 基于集合

    fromCollection(Collection),主要是为了方便测试使用

  • 基于文件

    readTextFile(path),基于HDFS中的数据进行计算分析

2.2 Transformation

​ Flink针对DataSet也提供了大量的已经实现的算子,和DataStream计算很类似

  • Map:输入一个元素,然后返回一个元素,中间可以进行清洗转换等操作
  • FlatMap:输入一个元素,可以返回0个、1个或者多个元素
    • Filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下
    • Reduce:对数据进行聚合操作,结合当前元素和上一次Reduce返回的值进行聚合操作,然后返回一个新的值
    • Aggregations:sum()、min()、max()等

2.3 Sink

​ Flink针对DataStream提供了大量的已经实现的数据目的地(Sink),具体如下所示

  • writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取

  • writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的,每个字段的值来自对象的toString()方法

  • print()/pringToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中

    Flink提供了一批内置的Connector,其中有的Connector会提供对应的Sink支持,如1.1节中表所示

#Flink# #流处理#
QQ扫一扫交流

标题:Flink 常用API详解

作者:古道长亭

声明: 欢迎加群交流!

如有帮助,欢迎多多交流 ^_^

微信打赏

支付宝打赏

Flink 窗口机制
Flink 安装和部署
  • 文章目录
  • 站点概览
古道长亭

古道长亭

Always remember that your present situation is not your final destination. The best is yet to come.

226 日志
57 分类
104 标签
GitHub Gitee
友情链接
  • 古道长亭的BOOK
  • JAVA学习
标签云
  • Mysql
  • 搜索引擎
  • Mybatis
  • 容器
  • 架构
  • 消息队列
  • Flink
  • Sharding sphere
  • 流处理
  • 缓存
  • 第 1 节 Flink DataStream常用API
    • 1.1 DataSource
    • 1.2 Transformation
    • 1.3 Sink
  • 第 2 节 Flink DataSet常用API
    • 2.1 DataSource
    • 2.2 Transformation
    • 2.3 Sink
© 2019 - 2024 京ICP备19012088号-1
0%