必读|spark的重分区及排序

浪尖 浪尖聊大数据

前几天,有人在星球里,问了一个有趣的算子,也即是RepartitionAndSortWithinPartitions。当时浪尖也在星球里讲了一下,整个关于分区排序的内容。今天,在这里给大家分享一下。

昨天说了,mapPartitions 的使用技巧。大家应该都知道mapPartitions值针对整个分区执行map操作。而且对于PairRDD的分区默认是基于hdfs的物理块,当然不可分割的话就是hdfs的文件个数。但是我们也可以给partitionBy 算子传入HashPartitioner,来给RDD进行重新分区,而且会使得key的hashcode相同的数据落到同一个分区。

spark 1.2之后引入了一个高质量的算子repartitionAndSortWithinPartitions 。该算子为spark的Shuffle增加了sort。假如,后面再跟mapPartitions算子的话,其算子就是针对已经按照key排序的分区,这就有点像mr的意思了。与groupbykey不同的是,数据不会一次装入内存,而是使用迭代器一次一条记录从磁盘加载。这种方式最小化了内存压力。

repartitionAndSortWithinPartitions 也可以用于二次排序。

下面举个简单的例子。

import org.apache.spark.Partitioner class KeyBasePartitioner(partitions: Int) extends Partitioner {   override def numPartitions: Int = partitions   override def getPartition(key: Any): Int = {     val k = key.asInstanceOf[Int]     Math.abs(k.hashCode() % numPartitions)   } } import org.apache.spark.SparkContext._     sc.textFile("file:///opt/hadoop/spark-2.3.1/README.md").flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_+_).map(each=>(each._2,each._1))     implicit val caseInsensitiveOrdering = new Ordering[Int] {      override def compare(a: Int, b: Int) = b.compareTo(a)     }     // Sort by key, using  res7.repartitionAndSortWithinPartitions(new KeyBasePartitioner(3)).saveAsTextFile("file:///opt/output/")

结果,可以看到每个分区都是有效的。

mdhdeMacBook-Pro-3:output mdh$ pwd/opt/outputmdhdeMacBook-Pro-3:output mdh$ ls_SUCCESS        part-00000      part-00001      part-00002mdhdeMacBook-Pro-3:output mdh$ head -n 10 part-00000 (24,the)(12,for)(9,##)(9,and)(6,is)(6,in)(3,general)(3,documentation)(3,example)(3,how)mdhdeMacBook-Pro-3:output mdh$ head -n 10 part-00001(16,Spark)(7,can)(7,run)(7,on)(4,build)(4,Please)(4,with)(4,also)(4,if)(4,including)mdhdeMacBook-Pro-3:output mdh$ head -n 10 part-00002(47,)(17,to)(8,a)(5,using)(5,of)(2,Python)(2,locally)(2,This)(2,Hive)(2,SparkPi)mdhdeMacBook-Pro-3:output mdh$

上面只是一个简单的使用,关于二次排序及高效结合mapPartitions的例子,浪尖会在这两天更新到星球里。
【完】

©著作权归作者所有:来自51CTO博客作者mob604756ed02fe的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. Spark Core读取ES的分区问题分析
  2. 尝尝鲜|Spark 3.1自适应执行计划
  3. Spark join种类(>3种)及join选择依据
  4. spark改七行源码实现高效处理kafka数据积压
  5. spark源码阅读基本思路
  6. 硬盘分区属性0字节怎么恢复?
  7. 内存卡出现“分区变0字节”的解决办法
  8. Linux运维教程-Linux系统磁盘管理
  9. 划分分区组成逻辑卷和扩展根分区

随机推荐

  1. 80端口占用异常解决方法java.net.BindExc
  2. Java中的数据类型
  3. 基于Java的应用程序的GUI测试工具
  4. java.lang.NoSuchMethodException:在strut
  5. jsp中如何使用javabeans,如何使用一个已经
  6. r项目:xlsx包安装失败(由于java问题)
  7. JDK1.5到1.7的进化
  8. Java区分大小写字母数字和符号
  9. JAVAWEB网站开发,一对多,多对一,主表与子表(
  10. Java项目中的classpath