核心概念

  • 流(Streams)

    流分为无限与有限,无限为DataStream,有限为DataSet

  • 状态(State)

    状态是计算过程中的数据信息,在容错恢复和Checkpoint中有重要的作用,流计算在本质上是Incremental Processing,因此需要不断查询保持状态;另外,为了确保Exactly-once语义,需要数据能够写入到状态中;而持久化存储,能够保证在整个分布式系统运行失败或者挂掉的情况下做到Exactly-once,这是状态的另外一个价值

  • 时间(Time)

    时间分为EventTime, IngestionTime, ProcessingTime

    • EventTime 事件发生的时间,是流本身事件的时间。如日志发生的时间
    • IngestionTime 进入Flink的时间
    • ProcessingTime Flink执行算子运算的时间
  • API

    API主要有Table/SQL API, DataStream API, Function APIDataStream API总共有四类:

    • 第一类是对于单条记录的操作。比如筛除掉不符合要求的记录如Filter 操作,或者将每条记录都做一个转换如Map操作)

    • 第二类是对多条记录的操作。比如说统计一个小时内的订单总成交量,就需要将一个小时内的所有订单记录的成交量加到一起。为了支持这种类型的操作,就得通过Window将需要的记录关联到一起进行处理

    • 第三类是对多个流进行操作并转换为单个流。例如,多个流可以通过 UnionJoinConnect等操作合到一起。这些操作合并的逻辑不同,但是它们最终都会产生了一个新的统一的流,从而可以进行一些跨流的操作。

    • 第四类DataStream还支持与合并对称的操作,即把一个流按一定规则拆分为多个流如Split操作,每个流是之前流的一个子集,这样我们就可以对不同的流作不同的处理

Flink的整体处理流程

  1. 获取执行环境
  2. 添加数据源
  3. 对数据流执行运算
  4. 输出/存储/继续下一个流处理等

Example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package cn.unknowname

import java.util.Properties
// 导入所有,不然会有一堆找不到隐函数报错
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import java.io.Serializable


class Rsyslog extends Serializable {
}


object DataStreamApp {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    // 获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    properties.setProperty("bootstrap.servers", "128.0.255.10:9092")
    properties.setProperty("group.id", "test")
    // val consumer = new FlinkKafkaConsumer[Rsyslog]("test_rsyslog", new Rsyslog(), properties)
    val consumer = new FlinkKafkaConsumer[String]("test_rsyslog", new SimpleStringSchema(), properties)
    // 增加Souce
    val stream = env.addSource(consumer)
    // 执行算子运算
    stream.map(_.split("").mkString).print()
    // 执行
    env.execute()
  }
}