flink sql使用中的一个问题
16lz
2021-03-15
flink sql使用中的一个问题
浪尖 浪尖聊大数据
最近有人问了浪尖一个flink共享datastream或者临时表会否重复计算的问题。
对于 flink 的datastream ,比如上图,source 经过datastream计算之后的结果想共享给compute1和compute2计算,这样可以避免之前的逻辑重复计算,而且数据也只需拉去一次。
而对于flink的sql呢?假如compute1和compute2之前是经过复杂计算的临时表,直接给下游sql计算使用会出现什么问题呢?
先告诉大家答案 ,临时表注册完了之后,实际上并没有完成物化功能,这时候后续有多个sqlupdate操作依赖这个临时表的话,会导致临时表多次计算的。
这个其实也不难理解,因为每次sqlupdate都是完成sql 语法树的解析,实际上也是类似于spark的血缘关系,但是flink sql不能像spark rdd血缘关系那样使用cache或者Checkpoint来避免重复计算,因为它并不能支持公共节点识别和公共节点数据的多次分发。
sql代码如下,供大家测试参考
package org.table.kafka;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableEnvironment;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.Json;import org.apache.flink.table.descriptors.Kafka;import org.apache.flink.table.descriptors.Rowtime;import org.apache.flink.table.descriptors.Schema;public class kafka2kafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); tEnv.connect( new Kafka() .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("jsontest") .property("bootstrap.servers", "localhost:9093") .property("group.id","test") .startFromLatest() ) .withFormat( new Json() .failOnMissingField(false) .deriveSchema() ) .withSchema( new Schema() .field("rowtime",Types.SQL_TIMESTAMP) .rowtime(new Rowtime() .timestampsFromField("eventtime") .watermarksPeriodicBounded(2000) ) .field("fruit", Types.STRING) .field("number", Types.INT) ) .inAppendMode() .registerTableSource("source"); tEnv.connect( new Kafka() .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("test") .property("acks", "all") .property("retries", "0") .property("batch.size", "16384") .property("linger.ms", "10") .property("bootstrap.servers", "localhost:9093") .sinkPartitionerFixed() ).inAppendMode() .withFormat( new Json().deriveSchema() ) .withSchema( new Schema() .field("fruit", Types.STRING) .field("total", Types.INT) .field("time", Types.SQL_TIMESTAMP) ) .registerTableSink("sink"); tEnv.connect( new Kafka() .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("test") .property("acks", "all") .property("retries", "0") .property("batch.size", "16384") .property("linger.ms", "10") .property("bootstrap.servers", "localhost:9093") .sinkPartitionerFixed() ).inAppendMode() .withFormat( new Json().deriveSchema() ) .withSchema( new Schema() .field("fruit", Types.STRING) .field("total", Types.INT) .field("time", Types.SQL_TIMESTAMP) ) .registerTableSink("sink1"); Table table = tEnv.sqlQuery("select * from source"); tEnv.registerTable("view",table); tEnv.sqlUpdate("insert into sink select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from view group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)"); tEnv.sqlUpdate("insert into sink1 select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from view group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)"); System.out.println(env.getExecutionPlan());// env.execute(); }}
可视化页面链接:
https://flink.apache.org/visualizer/
使用的过程中避免重要的账号密码被泄露。
©著作权归作者所有:来自51CTO博客作者mob604756ed02fe的原创作品,如需转载,请注明出处,否则将追究法律责任更多相关文章
- 计算折旧月数,实用才是硬道理
- spark面试该准备点啥
- 获取yarn上APP的状态案例
- 关于浪尖小蜜圈的一些说明
- 浪尖聊聊大数据从业者的迷茫及解决方案
- 大数据和云计算技术周报(第106期)
- 大数据和云计算技术周报(第107期)
- 社区版本idea查看继承关系的骚操作
- 大数据和云计算技术周报(第102期)