支持的数据类型

Flink支持Java和Scala提供的所有普通数据类型。最常用的数据类型可以做以下分类:

  • Primitives(原始数据类型)
  • Java和Scala的Tuples(元组)
  • Scala的样例类
  • POJO类型
  • 一些特殊的类型

接下来让我们一探究竟。

Primitives

Java和Scala提供的所有原始数据类型都支持,例如Int(Java的Integer),String,Double等等。下面举一个例子:

DataStream[Long] numbers = env.fromElements(1L, 2L, 3L, 4L);
numbers.map(n -> n + 1);

Tuples

元组是一种组合数据类型,由固定数量的元素组成。

Flink为Java的Tuple提供了高效的实现。Flink实现的Java Tuple最多可以有25个元素,根据元素数量的不同,Tuple都被实现成了不同的类:Tuple1,Tuple2,一直到Tuple25。Tuple类是强类型。

DataStream<Tuple2<String, Integer>> persons = env
  .fromElements(
    Tuple2.of("Adam", 17),
    Tuple2.of("Sarah", 23)
  );

persons.filter(p -> p.f1 > 18);

Tuple的元素可以通过它们的public属性访问——f0,f1,f2等等。或者使用getField(int pos)方法来访问,元素下标从0开始:

import org.apache.flink.api.java.tuple.Tuple2

Tuple2<String, Integer> personTuple = Tuple2.of("Alex", 42);
Integer age = personTuple.getField(1); // age = 42

不同于Scala的Tuple,Java的Tuple是可变数据结构,所以Tuple中的元素可以重新进行赋值。重复利用Java的Tuple可以减轻垃圾收集的压力。举个例子:

personTuple.f1 = 42; // set the 2nd field to 42
personTuple.setField(43, 1); // set the 2nd field to 43

POJO

POJO类的定义:

  • 公有类
  • 无参数的公有构造器
  • 所有的字段都是公有的,可以通过getters和setters访问。
  • 所有字段的数据类型都必须是Flink支持的数据类型。

举个例子:

public class Person {
  public String name;
  public int age;

  public Person() {}

  public Person(String name, int age) {
    this.name = name;
    this.age = age;
  }
}

DataStream<Person> persons = env.fromElements(
  new Person("Alex", 42),
  new Person("Wendy", 23)
);

其他数据类型

  • Array, ArrayList, HashMap, Enum
  • Hadoop Writable types