flink sql使用中的一个问题

浪尖 浪尖聊大数据

最近有人问了浪尖一个flink共享datastream或者临时表会否重复计算的问题。

对于 flink 的datastream ,比如上图,source 经过datastream计算之后的结果想共享给compute1和compute2计算,这样可以避免之前的逻辑重复计算,而且数据也只需拉去一次。

而对于flink的sql呢?假如compute1和compute2之前是经过复杂计算的临时表,直接给下游sql计算使用会出现什么问题呢?

先告诉大家答案 ,临时表注册完了之后,实际上并没有完成物化功能,这时候后续有多个sqlupdate操作依赖这个临时表的话,会导致临时表多次计算的。

这个其实也不难理解,因为每次sqlupdate都是完成sql 语法树的解析,实际上也是类似于spark的血缘关系,但是flink sql不能像spark rdd血缘关系那样使用cache或者Checkpoint来避免重复计算,因为它并不能支持公共节点识别和公共节点数据的多次分发。

sql代码如下,供大家测试参考

package org.table.kafka;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableEnvironment;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.Json;import org.apache.flink.table.descriptors.Kafka;import org.apache.flink.table.descriptors.Rowtime;import org.apache.flink.table.descriptors.Schema;public class kafka2kafka {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        env.setParallelism(1);        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);        tEnv.connect(                new Kafka()                        .version("0.10")                        //   "0.8", "0.9", "0.10", "0.11", and "universal"                        .topic("jsontest")                        .property("bootstrap.servers", "localhost:9093")                        .property("group.id","test")                        .startFromLatest()        )                .withFormat(                        new Json()                                .failOnMissingField(false)                                .deriveSchema()                )                .withSchema(                        new Schema()                                .field("rowtime",Types.SQL_TIMESTAMP)                                .rowtime(new Rowtime()                                        .timestampsFromField("eventtime")                                        .watermarksPeriodicBounded(2000)                                )                                .field("fruit", Types.STRING)                                .field("number", Types.INT)                )                .inAppendMode()                .registerTableSource("source");        tEnv.connect(                new Kafka()                        .version("0.10")                        //   "0.8", "0.9", "0.10", "0.11", and "universal"                        .topic("test")                        .property("acks", "all")                        .property("retries", "0")                        .property("batch.size", "16384")                        .property("linger.ms", "10")                        .property("bootstrap.servers", "localhost:9093")                        .sinkPartitionerFixed()        ).inAppendMode()                .withFormat(                        new Json().deriveSchema()                )                .withSchema(                        new Schema()                                .field("fruit", Types.STRING)                                .field("total", Types.INT)                                .field("time", Types.SQL_TIMESTAMP)                )                .registerTableSink("sink");        tEnv.connect(                new Kafka()                        .version("0.10")                        //   "0.8", "0.9", "0.10", "0.11", and "universal"                        .topic("test")                        .property("acks", "all")                        .property("retries", "0")                        .property("batch.size", "16384")                        .property("linger.ms", "10")                        .property("bootstrap.servers", "localhost:9093")                        .sinkPartitionerFixed()        ).inAppendMode()                .withFormat(                        new Json().deriveSchema()                )                .withSchema(                        new Schema()                                .field("fruit", Types.STRING)                                .field("total", Types.INT)                                .field("time", Types.SQL_TIMESTAMP)                )                .registerTableSink("sink1");        Table table = tEnv.sqlQuery("select * from source");        tEnv.registerTable("view",table);        tEnv.sqlUpdate("insert into sink select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from view group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");        tEnv.sqlUpdate("insert into sink1 select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from view group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");        System.out.println(env.getExecutionPlan());//        env.execute();    }}

可视化页面链接:
https://flink.apache.org/visualizer/

使用的过程中避免重要的账号密码被泄露。

©著作权归作者所有:来自51CTO博客作者mob604756ed02fe的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 计算折旧月数,实用才是硬道理
  2. spark面试该准备点啥
  3. 获取yarn上APP的状态案例
  4. 关于浪尖小蜜圈的一些说明
  5. 浪尖聊聊大数据从业者的迷茫及解决方案
  6. 大数据和云计算技术周报(第106期)
  7. 大数据和云计算技术周报(第107期)
  8. 社区版本idea查看继承关系的骚操作
  9. 大数据和云计算技术周报(第102期)

随机推荐

  1. 使用file_put_contents()创建及向文档内
  2. 面向对象的程序设计语言是一种什么语言
  3. Dockerfile构建PHP镜像
  4. phpcms模板怎么安装
  5. 面向对象的方法是什么意思
  6. PHP多进程、信号量及孤儿进程和僵尸进程
  7. print不是函数
  8. 怎么搭建php开发环境配置
  9. Swoole自定义项目初始化事件处理的实现
  10. PHP怎么把JSON转换成数组?