flink自定义trigger-实现窗口随意输出

浪尖 浪尖聊大数据

前面,一篇简单讲了flink的窗口及与Spark Streaming窗口之间的对比。
对于flink的窗口操作,尤其是基于事件时间的窗口操作,大家还要掌三个个重要的知识点:

  1. 窗口分配器:就是决定着流入flink的数据,该属于哪个窗口。

  2. 时间戳抽取器/watermark生成器:抽取时间戳并驱动着程序正常执行。

  3. trigger:决定着数据啥时候落地。

flink有很多内置的触发器,对于基于事件时间的窗口触发器叫做
EventTimeTrigger。其实,我们要实现基于事件时间的窗口随意输出,比如1000个元素触发一次输出,那么我们就可以通过修改这个触发器来实现。
那么可能你没留意前面说的,为啥需要trigger,因为没有trigger的话,存在允许事件滞后的时候,输出时间延迟比较大,而我们需要尽早看到数据,那么这个时候就可以自己定义窗口触发。
自定义触发器
修改自基于处理时间的触发器,源码如下:

package org.trigger;import org.apache.flink.streaming.api.windowing.triggers.Trigger;import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class CustomProcessingTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private CustomProcessingTimeTrigger() {}private static int flag = 0;@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {ctx.registerProcessingTimeTimer(window.maxTimestamp());// CONTINUE是代表不做输出,也即是,此时我们想要实现比如100条输出一次,// 而不是窗口结束再输出就可以在这里实现。if(flag > 9){flag = 0;return TriggerResult.FIRE;}else{flag++;}System.out.println("onElement : "+element);return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {return TriggerResult.FIRE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteProcessingTimeTimer(window.maxTimestamp());}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(TimeWindow window,OnMergeContext ctx) {// only register a timer if the time is not yet past the end of the merged window// this is in line with the logic in onElement(). If the time is past the end of// the window onElement() will fire and setting a timer here would fire the window twice.long windowMaxTimestamp = window.maxTimestamp();if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {ctx.registerProcessingTimeTimer(windowMaxTimestamp);}}@Overridepublic String toString() {return "ProcessingTimeTrigger()";}/*** Creates a new trigger that fires once system time passes the end of the window.*/public static CustomProcessingTimeTrigger create() {return new CustomProcessingTimeTrigger();}}

主要实现逻辑是在onElement函数,实现的逻辑是增加了每10个元素触发一次计算结果输出的逻辑。

主函数

package org.trigger;import org.apache.flink.api.common.functions.RichMapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.AllWindowedStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;import java.util.Properties;/*trigger 测试滚动窗口,20s然后是trigger内部技术,10个元素输出一次。*/public class kafkaSourceTriggerTest {public static void main(String[] args) throws Exception {// set up the streaming execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9093");properties.setProperty("group.id", "test");FlinkKafkaConsumer010<String> kafkaConsumer010 = new FlinkKafkaConsumer010<>("test",new SimpleStringSchema(),properties);AllWindowedStream<Integer, TimeWindow> stream = env.addSource(kafkaConsumer010).map(new String2Integer()).timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.seconds(20)).trigger(CustomProcessingTimeTrigger.create());stream.sum(0).print();env.execute("Flink Streaming Java API Skeleton");}private static class String2Integer extends RichMapFunction<String, Integer> {private static final long serialVersionUID = 1180234853172462378L;@Overridepublic Integer map(String event) throws Exception {return Integer.valueOf(event);}@Overridepublic void open(Configuration parameters) throws Exception {}}}

代码测试,通过的哦。

©著作权归作者所有:来自51CTO博客作者mob604756ed02fe的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. shell scripts之算数运算详解
  2. 苹果macOS 常用快捷键分享
  3. iMove 如何输出影片、mp4 档案格式及调整画质?
  4. 练习2-3 输出倒三角图案 (5分)
  5. spark streaming窗口及聚合操作后如何管理offset
  6. Spring Boot利用Logback输出日志到指定位置的简单配置及使用方式
  7. php的初步认识与常用数据类型
  8. C++入门第一课(命名空间;输入输出;缺省参数;函数重载)
  9. MySQL基础知识——管理和连接

随机推荐

  1. Android获取当前网络状态
  2. Android清单文件属性大全
  3. Android:控件AutoCompleteTextView 自动提
  4. android webview web里面的数据透传到jav
  5. android 读取SQLite android could not o
  6. android 版本更新
  7. android 打开各种文件(setDataAndType)
  8. 自定义TabActivity,TabActivity的美化
  9. Android 保活后台启动Service 8.0踩坑记
  10. 解析Android消息处理机制:Handler/Thread/