前言

Flink用来消费消息队列中的数据,在消费之后一定会需要用某种方式存起来,这里我简述一下在数据持久化中可能会遇到的坑和解决方案。 这篇文章中的代码,都经过本公司业务系统上调试过,是我们在使用Flink开发入库服务的时候踩过的一个个小坑,将它们总结起来,希望减少各位踩坑的数量。

本文会以消费Kafka为例,展示持久化到MySQL,MongoDB和HBase等数据库的思路。语言如果没有特殊的标记,一般都是Scala。 本文假定你已经搞定了Flink集群的搭建、程序的提交,同时你的数据库应该也能够支持这样规模的数据写入。

由于公司代码涉及业务,暂时不能开源,但是文中的源代码已经足够使用了。 为了避免文章过于臃肿,代码放在了

Flink持久化踩坑笔记代码gist.github.com

Github Gist 需要翻墙,而知乎的文章长度限制也不允许我放入太多代码,请借个梯子看吧。

读的时候可能会有点儿繁琐,不过请耐心读完,一定会有收获。(大牛就绕道吧)

业务定义

假设我们现在有一个DataStream[T],这个Stream以一个稳定的速度吐着消息,这个时候你需要做的很简单,就是把数据:

如果持久化到HBase:转换为Put,然后创建连接、写入如果持久化到SQL:创建SQL语句,创建连接、写入持久化到MongoDB:创建BSON对象,创建连接、写入

坑1:过于频繁的连接创建

任你是神仙一般的系统,也受不了频繁的打开和关闭连接这种操蛋的操作,可偏偏有人这么写:每次收到数据的时候打开连接,写完关闭,每当看到这种博客我都无言以对。 正常情况下,应该是创建一个单例,如果写入要并发的话应该创建一个连接池。

下面以写入HBase为例,展示一下大致的思路是怎么样的。以下是需要注意的几点:

这里继承了RichOutputFormat<T>,但其实使用RichSinkFunction<T>也是一样的,但是记得覆盖其Open方法来初始化连接。Flink官方的测试代码用的也是RichOutputFormat<T>来做。PutCollection是我公司根据业务封装的Put类,方便聚合、序列化和调试,你可以理解为Map<String,List<Put>>,其中String是TableName。之所以使用的都是Rich,因为我们把配置放在了Consul上,而Consul的地址在启动的时候指定,并且放在GlobalJobParameters里面,Rich中可以通过getRuntimeContext().getExecutionConfig().getGlobalJobParameters()获取。

代码参见:

HBaseRichOutputFormat.javagist.github.com

坑1.1:数据库配置何去何从

正常来说,我们都会把配置放在文件里面,但是这样的话修改配置就要重新打包,麻烦。 我们使用了Consul(其实用ZK、Redis之类的都是一样的)作为配置中心,主要是因为Consul有UI,手动操作比较方便。

/** * 查询Consul中的配置信息,并且按照properties的文件格式解码 * * @param key * @return */private def queryConsulProperties(key: String): Properties = {
val prop = new Properties()
prop.load(new StringReader(queryConsulString(key)))
prop}

坑2:尽可能使用ProcessingTime减少资源消耗

诚然,Flink相当牛逼的一点就是可以处理乱序数据,有EventTime这个大杀器,但是我建议能不用就不用,使用ProcessingTime将更加的高效。

你可以在处理流数据的时候使用EventTime,但是在最后的写入阶段,建议使用ProcessingTime,这样写入也会更加及时。

坑3:使用TimeWindow+CountTrigger避免峰值出现异常

写入的时候往往是批量写入,而不是单条写入,这个时候如果使用CountWindow不能保证时效性,使用TimeWindow容易造成瞬时峰值数据量过大搞崩某些程序,我们想要一个TimeCountWindow,当超时或者队列过长任意条件满足的时候触发写入。然而我没在Flink里面找到这个东西,QQ号码出售平台可以使用TimeWindow+Trigger完成这个功能。

首先,对于某个流:

someStream.timeWindow(
Time.milliseconds(1000)//这里设置你要的时间).trigger(
new TimeCountTrigger(
1000,//这里设置你要的数量 timeCharacteristic = TimeCharacteristic.ProcessingTime //用什么时间触发 ))

其中,TimeCountTrigger类的源码如下所示:

TimeCountTrigger.scalagist.github.com

这里有个小坑需要注意,Trigger的clear方法有的时候不会被自动调用,需要手动调用。

坑4:动态插入的生成和写入

你,怎么做插入? 是手撸Insert语句?是强行Parse出一个Put,还是继续用原生API的InsertMany? 如果能用一个Sink解决写入问题,你能在写入之前对语句做一些合并,这个是坠吼的。

MongoDB插入的合并

最简单了,只需要用Map<String,List<Document>>去描述插入的数据集并且做合并即可。

SQL的合并

SQL的合并比较简单,只需将同一张表的插入聚合到一起就可以了,但是如果要将不同的表的插入合并到一起,那就需要一个通用的数据格式。 这个数据结构可以被描述为:Map<String,List<Map<String,Object>>>第一个String代表表名,第二个代表字段名,第三个代表数值。

如果你的数据插入是有主键的,那么List<Map<String,Object>>可以被:Map<KVPair,List<Map<String,Object>>>来描述。KVPair自己定义。 这样的话可以合并主键相同的数据。

需要注意的是,由于每一条插入记录里面,使用Map描述以后,有些字段为空,取不到的时候需要处理返回NULL。

Object 的处理方式如下所示:

val valueToString : Any => String = {
case numberValue: Number => numberValue.toString
case intValue: Int => intValue.toString
case doubleValue: Double => "%f".format(doubleValue)
case longValue: Long => longValue.toString
case dateValue: Date => "'%s'".format(DATE_FORMAT.format(dateValue))
case strValue: String => "'%s'".format(escapeSql(strValue))
case anyType: Any => "'%s'".format(escapeSql(GSON.toJson(anyType)))}// DATE_FORMAT is SimpleDateFormat

HBase Put的合并

对于HBase来说,应该对两类东西做合并,第一个是针对同一个Table的Put操作可以合并,第二个是针对同一个Row的操作可以合并。 并且,对于同一行来说,同一个列族下,相同的Qualifier的数据应该做覆盖,在流计算期间就可以减少写入的数据量,可以带来更加高效的写入体验。

为此,我们首先写一个SealedPut类,用来替代原生的Put类,这个类在必要的时候可以直接生成Put。

SealedPut.javagist.github.com


随后我们再编写一个PutCollection,用于管理不同表的Put。

PutCollection.javagist.github.com

在加窗以后,我们可以在Process里面获取一大堆PutCollection,而我们需要做的,就是将它合成一个:

someIntKeyedWindowStreamOfPutCollection.process[PutCollection](
new ProcessWindowFunction[(PutCollection, Int), PutCollection, Int, TimeWindow] {
override def process(key: Int, context: Context, elements: Iterable[(PutCollection, Int)], out: Collector[PutCollection]): Unit = {
if (elements.size > 1) {
out.collect(
PutCollection.merge(
elements.map(_._1).toSeq: _*
)
)
} else {
elements.map(_._1).foreach(out.collect)
}
LOGGER.debug("Merging {} elements (k={}).", elements.size, key)
}
})


更多相关文章

  1. dataset与classlist的使用案例
  2. Windows还原系统后分区合并为一个分区恢复方法
  3. 学习笔记:表格小知识
  4. 表格的基本写法
  5. HTML表格标签及合并行列使用
  6. ArcPy合并相同结构的mdb数据库
  7. PyTorch的TensorBoard用法示例
  8. php两个二维数组合并,并以指定键值排序
  9. 6.Pandas 合并 concat

随机推荐

  1. Android 相关 - R无法引用
  2. 使用Android调用SOAP Webservice时连接被
  3. 2015年年终总结--迷茫中前进
  4. 这些片段在Android编程中很有用
  5. Android 屏幕分辨率
  6. 国内优秀Android学习资源
  7. Android Tab导航菜单栏--FragmentTabHost
  8. Robotium用例通过代码自动解锁屏幕
  9. Android之edittext取消默认弹出软键盘
  10. 设置自定义Dialog的大小和位置