Flink计算PV,UV的案例及问题分析
Flink计算PV,UV的案例及问题分析
浪院长 浪尖聊大数据
PV(访问量):即Page View, 即页面浏览量或点击量,用户每次刷新即被计算一次。
UV(独立访客):即Unique Visitor,访问您网站的一台电脑客户端为一个访客。00:00-24:00内相同的客户端只被计算一次。
一个UV可以用很多PV,一个PV也只能对应一个IP
没有这些数据的支持,意味着你不知道产品的发展情况,用户获取成本,UV,PV,注册转化率;没有这些数据做参考,你不会知道接下来提供什么建议给领导采纳,也推测不出领导为啥烦忧,那么就么有任何表现的机会。
举两个UV计算的场景:
- 实时计算当天零点起,到当前时间的uv。
- 实时计算当天每个小时的UV。0点...12点...24点
请问这个用spark streaming如何实现呢?是不是很难有好的思路呢?
今天主要是想给大家用flink来实现一下,在这方面flink确实比较优秀了。
主要技术点就在group by的使用。
下面就是完整的案例:
package org.table.uv;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.SinkFunction;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableEnvironment;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.Json;import org.apache.flink.table.descriptors.Kafka;import org.apache.flink.table.descriptors.Rowtime;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.types.Row;public class ComputeUVDay { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); tEnv.registerFunction("DateUtil",new DateUtil()); tEnv.connect( new Kafka() .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("jsontest") .property("bootstrap.servers", "localhost:9092") .property("group.id","test") .startFromLatest() ) .withFormat( new Json() .failOnMissingField(false) .deriveSchema() ) .withSchema( new Schema() .field("rowtime", Types.SQL_TIMESTAMP) .rowtime(new Rowtime() .timestampsFromField("eventtime") .watermarksPeriodicBounded(2000) ) .field("fruit", Types.STRING) .field("number", Types.INT) ) .inAppendMode() .registerTableSource("source"); // 計算天級別的uv// Table table = tEnv.sqlQuery("select DateUtil(rowtime),count(distinct fruit) from source group by DateUtil(rowtime)"); // 计算小时级别uv Table table = tEnv.sqlQuery("select DateUtil(rowtime,'yyyyMMddHH'),count(distinct fruit) from source group by DateUtil(rowtime,'yyyyMMddHH')"); tEnv.toRetractStream(table, Row.class).addSink(new SinkFunction<Tuple2<Boolean, Row>>() { @Override public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception { System.out.println(value.f1.toString()); } }); System.out.println(env.getExecutionPlan()); env.execute("ComputeUVDay"); }}
其中DateUtil类如下:
package org.table.uv;import org.apache.flink.table.functions.ScalarFunction;import java.sql.Timestamp;import java.text.DateFormat;import java.text.SimpleDateFormat;public class DateUtil extends ScalarFunction { public static String eval(long timestamp){ String result = "null"; try { DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); result = sdf.format(new Timestamp(timestamp)); } catch (Exception e) { e.printStackTrace(); } return result; } public static String eval(long ts, String format) { String result = "null"; try { DateFormat sdf = new SimpleDateFormat(format); result = sdf.format(ts); } catch (Exception e) { e.printStackTrace(); } return result; } public static void main(String[] args) { String eval = eval(System.currentTimeMillis(),"yyyyMMddHH"); System.out.println(eval); }}
代码里面的案例,是可以用于生产中的吗?
假如数据量小可以直接使用,每秒数据量大的话,就比较麻烦。因为你看group by后面的维度,只有当天date 这个维度,这样就会导致计算状态超级集中而使得内存占用超大进而引发oom。
这种情况解决办法就是将状态打散,然后再次聚合即可,典型的分治思想。
具体做法作为福利分享给球友吧。
还有一个问题就是由于存在全局去重及分组操作,flink内部必然要维护一定的状态信息,那么这些状态信息肯定不是要一直保存的,比如uv,我们只需要更新今天,最多昨天的状态,这个点之前的状态要删除的,不能让他白白占着内存,而导致任务内存消耗巨大,甚至因oom而挂掉。
StreamQueryConfig streamQueryConfig = tEnv.queryConfig();streamQueryConfig.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(15));tEnv.sqlUpdate(sql,streamQueryConfig);
再有就是能使用事件时间吗?事件时间假如事件严重超时了,比如,我们状态保留时间设置的是两天,两天之后状态清除,那么这时候来了事件时间刚刚好是两天之前的,由于已经没有状态就会重新计算uv覆盖已经生成的值,就导致值错误了,这个问题如何解决呢?
这算是一个疑问吧?
©著作权归作者所有:来自51CTO博客作者mob604756ed02fe的原创作品,如需转载,请注明出处,否则将追究法律责任更多相关文章
- flink sql使用中的一个问题
- 计算折旧月数,实用才是硬道理
- 获取yarn上APP的状态案例
- 大数据和云计算技术周报(第106期)
- 大数据和云计算技术周报(第107期)
- 大数据和云计算技术周报(第102期)
- 大数据和云计算技术周报(第104期)
- U盘显示操作无法完成,因为磁盘管理控制台视图不是最新状态怎么办
- iview的table合并相同的单元格