Flink 常用API详解
示例代码: https://gitee.com/ixinglan/flink-demo.git
第 1 节 Flink DataStream常用API
DataStream API主要分为3块:DataSource、Transformation、Sink
- DataSource是程序的数据源输入,可以通过StreamExecutionEnvironment.addSource(sourceFuntion)为程序添加一个数据源
- Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,比如Map、FlatMap和Filter等操作
- Sink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中。
1.1 DataSource
Flink针对DataStream提供了大量已经实现的DataSource(数据源接口),比如如下4种
1)基于文件
readTextFile(path)
读取文本文件,文件遵循TextInputFormat逐行读取规则并返回
tip:本地Idea读hdfs需要:
a、System.setProperty(“HADOOP_USER_NAME”,"…");
b、Window配置hadoop
c、依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
2)基于Socket
socketTextStream
从Socket中读取数据,元素可以通过一个分隔符分开
3)基于集合
fromCollection(Collection)
通过Java的Collection集合创建一个数据流,集合中的所有元素必须是相同类型的
如果满足以下条件,Flink将数据类型识别为POJO类型(并允许“按名称”字段引用):
- 该类是共有且独立的(没有非静态内部类)
- 该类有共有的无参构造方法
- 类(及父类)中所有的不被static、transient修饰的属性要么有公有的(且不被final修饰),要么是包含共有的getter和setter方法,这些方法遵循java bean命名规范。
实例:streamsource/StreamFromCollection
4)自定义输入
addSource可以实现读取第三方数据源的数据
Flink也提供了一批内置的Connector(连接器),如下表列了几个主要的
连接器 | 是否提供Source支持 | 是否提供Sink支持 |
---|---|---|
Apache Kafka | 是 | 是 |
ElasticSearch | 否 | 是 |
HDFS | 否 | 是 |
Twitter Streaming PI | 是 | 否 |
Kafka连接器
a、依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.7.2</version>
</dependency>
b、代码:streamsource/StreamFromKafka
c、启动kafka
./kafka-server-start.sh -daemon ../config/server.properties
d、创建topic
bin/kafka-topics.sh –create –zookeepper teacher1:2181 –replication-factor 1 –partitions 1 –topic mytopic2
e、启动控制台kafka生产者
bin/kafka-console-producer.sh –broker-list teacher2:9092 –topic mytopic2
1.2 Transformation
Flink针对DataStream提供了大量的已经实现的算子
-
Map:输入一个元素,然后返回一个元素,中间可以进行清洗转换等操作
-
FlatMap:输入一个元素,可以返回0个、1个或者多个元素
-
Filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下
-
KeyBy:根据指定的Key进行分组,Key相同的数据会进入同一个分区
KeyBy有两种典型用法
(1)DataStream.keyBy(“someKey”)指定对象中的someKey字段作为分组Key
(2)DataStream.keyBy(0)指定Tuple中的第一个元素作为分组Key
-
Reduce:对数据进行聚合操作,结合当前元素和上一次Reduce返回的值进行聚合操作,然后返回一个新的值
-
Aggregations:sum()、min()、max()等
案例:创建自定义数据源(流) 1秒钟产生一个新数据
思路:
—–实现SourceFunction<>接口,tip:SourceFuntion和SourceContext需要指明泛型,否则报InvalidTypesException异常
创建自定义分区,奇偶分区
—–实现Partitioner<>接口
1.3 Sink
Flink针对DataStream提供了大量的已经实现的数据目的地(Sink),具体如下所示
-
writeAsText():讲元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
-
print()/printToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
-
自定义输出:addSink可以实现把数据输出到第三方存储介质中
Flink提供了一批内置的Connector,其中有的Connector会提供对应的Sink支持,如1.1节中表所示
案例:将流数据下沉到redis中
1、依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
2、关键代码:
第 2 节 Flink DataSet常用API
DataSet API同DataStream API一样有三个组成部分,各部分作用对应一致,此处不再赘述
2.1 DataSource
对DataSet批处理而言,较为频繁的操作是读取HDFS中的文件数据,因为这里主要介绍两个DataSource组件
-
基于集合
fromCollection(Collection),主要是为了方便测试使用
-
基于文件
readTextFile(path),基于HDFS中的数据进行计算分析
2.2 Transformation
Flink针对DataSet也提供了大量的已经实现的算子,和DataStream计算很类似
- Map:输入一个元素,然后返回一个元素,中间可以进行清洗转换等操作
- FlatMap:输入一个元素,可以返回0个、1个或者多个元素
- Filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下
- Reduce:对数据进行聚合操作,结合当前元素和上一次Reduce返回的值进行聚合操作,然后返回一个新的值
- Aggregations:sum()、min()、max()等
2.3 Sink
Flink针对DataStream提供了大量的已经实现的数据目的地(Sink),具体如下所示
-
writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
-
writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的,每个字段的值来自对象的toString()方法
-
print()/pringToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
Flink提供了一批内置的Connector,其中有的Connector会提供对应的Sink支持,如1.1节中表所示