文章目录

    • 问题背景
    • 解决过程
    • 注意事项


问题背景

kafka数据定时导入到hive,后续做数据清洗:
flume,confulent都需要单独部署服务,比较繁琐。调查其他可选方案,参考以下文章:参考资料
综合比较,camus 简单,比较方便接入。主要分两步:
1、采用mapreduce过程处理数据从kafka导入hadoop
2、hadoop数据接入hive管理。

解决过程

1、下载源码,本地构建jar包。
参考文章
camus源码
2、查看camus.properties配置文件,支持的功能选项
期间需要自定义input,output encoder,
需要配置Reader,Writer类,具体参考源码实现。
3、修改camus.properties配置项,最终结果如下:

camus.job.name=Camus-Job-Testetl.destination.path=/tmp/escheduler/root/resources/topicsetl.execution.base.path=/tmp/escheduler/root/resources/camus/execetl.execution.history.path=/tmp/escheduler/root/resources/camus/exec/history# 新增的自定义decodercamus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.StringMessageDecoder# 修改写入hadoop的writeretl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProviderkafka.client.name=camus# brokerkafka.brokers=....# topickafka.whitelist.topics=topic_exec_servicecheck_prod_calConfigId_177log4j.configuration=false# 禁用压缩,deflate,snappymapred.output.compress=falseetl.output.codec=deflateetl.deflate.level=6etl.default.timezone=Asia/Shanghai

4、上传jar,properties文件,执行如下命令:实现kafka数据到hadoop的功能:

cd /home/app/transform/libs/hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties

数据导入到hadoop,
5、数据从hadoop到hive,执行如下脚本:

date_string=$(date '+%Y/%m/%d/%H') partion=$(date '+%Y-%m-%d_%H')topic='topic_exec_servicecheck_prod_calConfigId_177'table_name='dwd.test_exec_servicecheck'filePath="/tmp/escheduler/root/resources/topics/$topic/hourly/"$date_string"/"hive<<EOFcreate table if not exists $table_name(   date TIMESTAMP,   node STRING,   status STRING)PARTITIONED BY(dt STRING)row format delimited fields terminated by '|'  STORED AS TEXTFILE;load data inpath '$filePath' into table $table_name partition (dt='$partion');EOF

6、配置定时调度,按小时执行。

注意事项

附自定义decoder

public class StringMessageDecoder  extends MessageDecoder<Message, String> {private static final org.apache.log4j.Logger log = Logger.getLogger(JsonStringMessageDecoder.class);@Overridepublic CamusWrapper<String> decode(Message message) {//log.info(message.getTopic());return new CamusWrapper<String>(new String(message.getPayload()));}}

hive input/output 支持自定义数据格式,这个是很有意义的,通常来说文本文件,分隔符分割一行,纯文本解析,最简单,但是可读性,可维护性差。
支持json格式数据写入,json处理相关jar文件 放到${HIVE_HOME}/lib目录。

©著作权归作者所有:来自51CTO博客作者feiyingw的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. Nodejs 源码阅读指南
  2. [原创干货]Oracle Sharding实施教程来袭!!!
  3. Facebook是如何通过Android应用程序跟踪非注册用户
  4. 【DB笔试面试223】在Oracle中,如果丢失一个数据文件而且没有备份,
  5. 【DB笔试面试842】在Oracle中,如何启动Oracle数据库的监听日志?
  6. Greenplum6 数据库数据库学习_外部表
  7. 【DB笔试面试219】在Oracle中,如果发现有坏块,那么如何检索其它未
  8. 【DB笔试面试497】Oracle使用哪个包可以生成并传递数据库告警信
  9. 【DB笔试面试844】在Oracle中,tnsnames.ora文件的作用是什么?

随机推荐

  1. 关于PHP中sqlite3的使用
  2. PHP 生成随机红包算法
  3. 使用phpqrcode生成二维码
  4. php调试利器:FirePHP的安装与使用
  5. PHP Redis相关操作大全
  6. 高级PHP工程师必备的编码技巧及思维
  7. PHP的isset()、is_null、empty()使用总结
  8. PHP上传图片到数据库并显示
  9. PHP开发常见功能实现流程
  10. PHP 文字生成透明图片之路