- 不仅仅是流计算:Apache Flink实践
- InfoQ中文站
- 1959字
- 2020-06-26 06:08:27
Apache Flink类型和序列化机制简介

使用Apache Flink(以下简称Flink)编写处理逻辑时,新手总是容易被林林总总的概念所混淆:
为什么Flink有那么多的类型声明方式?
BasicTypeInfo.STRING TYPE INFO、Types.STRING 、Types.STRING()有何区别?
TypeInfoFactory又是什么?
TypeInformation.of和TypeHint是如何使用的呢?
接下来本文将逐步解密Flink的类型和序列化机制。
Flink的类型分类
Flink的类型系统源码位于org.apache.flink.api.common.typeinfo包,让我们对图1深入追踪,看一下类的继承关系图:

图1:Flink类型分类

图2:TypeInformation类继承关系图

图3:使用.returns方法声明返回类型

图4:Flink-ML注册子类类型信息

图5:Flink允许注册自定义类型

图6:class对象作为参数

图7:TypeHint作为参数,保存泛型信息

图8:BasicTypeInfo快捷方式

图9:使用BasicTypeInfo快捷方式来声明一行(Row)每个字段的类型信息

图10:Types类

图11:flink-table模块的Types类

图12:为自定义类提供类型支持(图片未展示全部字段)

图13:Flink自带的TypeSerializer子类概览

图14:为Kryo增加自定义的Serializer

图15:为Kryo增加自定义的Serializer

图16:类型信息到内存块

图17:StringSerializer类的serialize()方法

图18:String对象的序列化过程
可以看到,图1和图2是一一对应的,TypeInformation类是描述一切类型的公共基类,它和它的所有子类必须可序列化(Serializable),因为类型信息将会伴随Flink的作业提交,被传递给每个执行节点。
由于Flink自己管理内存,采用了一种非常紧凑的存储格式(见官方博文),因而类型信息在整个数据处理流程中属于至关重要的元数据。
TypeExtractror类型提取
Flink内部实现了名为TypeExtractror的类,可以利用方法签名、子类信息等蛛丝马迹,自动提取和恢复类型信息(当然也可以显式声明,即本文所介绍的内容)。
然而由于Java的类型擦除,自动提取并不是总是有效。因而一些情况下(例如通过URLClassLoader动态加载的类),仍需手动处理;例如下图中对DataSet变换时,使用.returns()方法声明返回类型。
这里需要说明一下,returns()接受三种类型的参数:字符串描述的类名(例如"String")、TypeHint(接下来会讲到,用于泛型类型参数)、Java原生Class(例如String.class)等;不过字符串形式的用法即将废弃,如果确实有必要,请使用Class.forName()等方法来解决。
下面是ExecutionEnvironment类的registerType方法,它可以向Flink注册子类信息(Flink认识父类,但不一定认识子类的一些独特特性,因而需要注册),下面是Flink-ML机器学习库代码的例子:
从下图可以看到,如果通过TypeExtractor.createTypeInfo(type)方法获取到的类型信息属于PojoTypeInfo及其子类,那么将其注册到一起;否则统一交给Kryo去处理,Flink并不过问(这种情况下性能会变差)。
声明类型信息的常见手段
通过TypeInformation.of()方法,可以简单地创建类型信息对象。
1.对于非泛型的类,直接传入Class对象即可
2.对于泛型类,需要借助TypeHint来保存泛型类型信息
TypeHint的原理是创建匿名子类,运行时TypeExtractor可以通过getGenericSuperclass(). getActualTypeArguments()方法获取保存的实际类型。
3.预定义的快捷方式
例如BasicTypeInfo,这个类定义了一系列常用类型的快捷方式,对于String、Boolean、Byte、Short、Integer、Long、Float、Double、Char等基本类型的类型声明,可以直接使用。
例如下面是对Row类型各字段的类型声明,使用方法非常简明,不再需要new XxxTypeInfo<>(很多很多参数)
当然,如果觉得BasicTypeInfo还是太长,Flink还提供了完全等价的Types类(org.apache.flink.api.common.typeinfo.Types):
特别需要注意的是,flink-table模块也有一个Types类(org.apache.flink.table.api.Types),用于table模块内部的类型定义信息,用法稍有不同。使用IDE的自动import时一定要小心:
4.自定义TypeInfo和TypeInfoFactory
通过自定义TypeInfo为任意类提供Flink原生内存管理(而非Kryo),可令存储更紧凑,运行时也更高效。
开发者在自定义类上使用@TypeInfo注解,随后创建相应的TypeInfoFactory并覆盖createTypeInfo方法。
注意需要继承TypeInformation类,为每个字段定义类型,并覆盖元数据方法,例如是否是基本类型(isBasicType)、是否是Tuple(isTupleType)、元数(对于一维的Row类型,等于字段的个数)等等,从而为TypeExtractor提供决策依据。
更多示例,请参考Flink源码的org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
TypeSerializer
Flink自带了很多TypeSerializer子类,大多数情况下各种自定义类型都是常用类型的排列组合,因而可以直接复用:
如果不能满足,那么可以继承TypeSerializer及其子类以实现自己的序列化器。
Kryo序列化
对于Flink无法序列化的类型(例如用户自定义类型,没有registerType,也没有自定义TypeInfo和TypeInfoFactory),默认会交给Kryo处理。
如果Kryo仍然无法处理(例如Guava、Thrift、Protobuf等第三方库的一些类),有以下两种解决方案:
1.可以强制使用Avro来替代Kryo:
env.getConfig().enableForceAvro(); // env代表ExecutionEnvironment对象,下同
2.为Kryo增加自定义的Serializer以增强Kryo的功能:
env.getConfig().addDefaultKryoSerializer(Class<? > type, Class<? extends Serializer<? >> serializerClass
以及
env.getConfig().registerTypeWithKryoSerializer(Class<? > type, T serializer)
如果希望完全禁用Kryo(100% 使用Flink的序列化机制),则可以使用以下设置,但注意一切无法处理的类都将导致异常:
env.getConfig().disableGenericTypes();
类型机制的陷阱与缺陷
金无足赤,人无完人。Flink内置的类型系统虽然强大而灵活,但仍然有一些需要注意的点:
1.Lambda函数的类型提取
由于Flink类型提取依赖于继承等机制,而lambda函数比较特殊,它是匿名的,也没有与之相关的类,所以其类型信息较难获取。
Eclipse的JDT编译器会把lambda函数的泛型签名等信息写入编译后的字节码中,而对于javac等常见的其他编译器,则不会这样做,因而Flink就无法获取具体类型信息了。
2.Kryo的JavaSerializer在Flink下存在Bug
推荐使用org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer而非
com.esotericsoftware.kryo.serializers.JavaSerializer以防止与Flink不兼容。
类型机制与内存管理
下面以StringSerializer为例,来看下Flink是如何紧凑管理内存的:
下面是具体的序列化过程:
可以看到,Flink对于内存管理是非常细致的,层次分明,代码也容易理解。