dataframe是在spark1.3.0中推出的新的api,这让spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,据说计算性能更还快了两倍。spark在离线批处理或者实时计算中都可以将rdd转成dataframe进而通过简单的sql命令对数据进行操作,对于熟悉sql的人来说在转换和过滤过程很方便,甚至可以有更高层次的应用,比如在实时这一块,传入kafka的topic名称和sql语句,后台读取自己配置好的内容字段反射成一个class并利用出入的sql对实时数据进行计算,这种情况下不会spark streaming的人也都可以方便的享受到实时计算带来的好处。    

下面的示例为读取本地文件成rdd并隐式转换成dataframe对数据进行查询,最后以追加的形式写入mysql表的过程,scala代码示例如下

import java.sql.Timestampimport org.apache.spark.sql.{SaveMode, SQLContext}import org.apache.spark.{SparkContext, SparkConf}object DataFrameSql { case class memberbase(data_date:Long,memberid:String,createtime:Timestamp,sp:Int)extends Serializable{ override def toString: String="%d\t%s\t%s\t%d".format(data_date,memberid,createtime,sp) } def main(args:Array[String]): Unit ={ val conf = new SparkConf() conf.setMaster("local[2]")// ---------------------- //参数 spark.sql.autoBroadcastJoinThreshold 设置某个表是否应该做broadcast,默认10M,设置为-1表示禁用 //spark.sql.codegen 是否预编译sql成java字节码,长时间或频繁的sql有优化效果 // spark.sql.inMemoryColumnarStorage.batchSize 一次处理的row数量,小心oom //spark.sql.inMemoryColumnarStorage.compressed 设置内存中的列存储是否需要压缩// ---------------------- conf.set("spark.sql.shuffle.partitions","20") //默认partition是200个 conf.setAppName("dataframe test") val sc = new SparkContext(conf) val sqc = new SQLContext(sc) val ac = sc.accumulator(0,"fail nums") val file = sc.textFile("src\\main\\resources\\000000_0") val log = file.map(lines => lines.split(" ")).filter(line =>  if (line.length != 4) { //做一个简单的过滤  ac.add(1)  false  } else true)  .map(line => memberbase(line(0).toLong, line(1),Timestamp.valueOf(line(2)), line(3).toInt)) // 方法一、利用隐式转换 import sqc.implicits._ val dftemp = log.toDF() // 转换 /*  方法二、利用createDataFrame方法,内部利用反射获取字段及其类型  val dftemp = sqc.createDataFrame(log)  */ val df = dftemp.registerTempTable("memberbaseinfo") /*val sqlcommand ="select date_format(createtime,'yyyy-MM')as mm,count(1) as nums " +  "from memberbaseinfo group by date_format(createtime,'yyyy-MM') " +  "order by nums desc,mm asc "*/ val sqlcommand="select * from memberbaseinfo" val sel = sqc.sql(sqlcommand) val prop = new java.util.Properties prop.setProperty("user","etl") prop.setProperty("password","xxx") // 调用DataFrameWriter将数据写入mysql val dataResult = sqc.sql(sqlcommand).write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test","t_spark_dataframe_test",prop) // 表可以不存在 println(ac.name.get+" "+ac.value) sc.stop() }}
20160309 45386477 2012-06-12 20:13:15 90143820160309 45390977 2012-06-12 22:38:06 90103620160309 45446677 2012-06-14 21:57:39 90143820160309 45464977 2012-06-15 13:42:55 90143820160309 45572377 2012-06-18 14:55:03 90260620160309 45620577 2012-06-20 00:21:09 90260620160309 45628377 2012-06-20 10:48:05 90118120160309 45628877 2012-06-20 11:10:15 90260620160309 45667777 2012-06-21 18:58:34 90252420160309 45680177 2012-06-22 01:49:55 20160309 45687077 2012-06-22 11:23:22 902607

更多明细可以查看官方文档 Spark SQL and DataFrame Guide

更多相关文章

  1. 《Android和PHP最佳实践》官方站
  2. android用户界面之按钮(Button)教程实例汇
  3. TabHost与RadioGroup结合完成的菜单【带效果图】5个Activity
  4. Android(安卓)UI开发第十七篇——Android(安卓)Fragment实例(Lis
  5. Android——Activity四种启动模式
  6. Android布局(序章)
  7. Android发送短信方法实例详解
  8. Android(安卓)读取资源文件实例详解
  9. android 蓝牙通讯

随机推荐

  1. PHP获取来路域名 关键字
  2. PHP微信公众平台跳转网页实现定位思路 By
  3. PHP更新基于其他表的表数据
  4. 关于php的输出方式
  5. PHP判断客户端是PCweb端还是移动手机端方
  6. AJAX学习之提交表单
  7. Thinkphp5验证类的使用
  8. php 缓存output_buffering和ob_start
  9. php执行数据库查询返回json格式数据
  10. 【ecmall】解决无法上传店铺logo和banner