本文来自上周(2020-11-17至2020-11-19)举办的 Data + AI Summit 2020 (原 Spark+AI Summit),主题为《Spark SQL Beyond Official Documentation》的分享,作者 David Vrba,是 Socialbakers 的高级机器学习工程师。

实现高效的 Spark 应用程序并获得最大的性能为目标,通常需要官方文档之外的知识。理解 Spark 的内部流程和特性有助于根据内部优化设计查询,从而在执行期间实现高效率。在这次演讲中,我们将重点讨论 Spark SQL 的一些官方文档中没有很好描述的内部特性,重点在一些基本示例上解释这些特性,同时分享一些性能技巧。

下面是本次分享的超清视频:


本次分享主要有两大主题:

Spark SQL 中的统计信息(Statistics)以排序的方式将数据存储在文件中

Spark SQL 中的统计信息(Statistics)

统计信息这块的分享主要包括以下四个方面:

我们怎么可以看到统计信息统计信息是如何计算的统计信息是怎么使用有什么注意事项?


在 Spark 中,我们可以通过以上方法来查看表级别的统计信息、,输出的信息里面有个 Statistics 那行,那就是本次分享要提到的统计信息。

如果想查看列级别的统计信息,可以使用 DESCRIBE EXTENDED table_name column_name 进行查看。输出的信息可以看到这列的最大最小值、null 的数量、去重之后的值数量等信息。

如果我们使用的是 Apache Spark 3,我们还可以通过 explain(mode="cost") 来查看统计信息:


统计信息是通过执行计划树的叶子节点计算的,然后从树的最底层往上传递,同时 Spark 也会利用这些统计信息来修改执行树的执行过程。统计信息的传递主要有两种方法:最简单的方法(Simple way)以及高级方法。

上面就是统计信息的最简单的传递方法,这种方法只传递 sizeInBytes。从右上图可以看出,我们通过 user_id 0 的过滤条件来过滤数据,其实表中肯定没有用户的 user_id 小于 0 ,但是我们可以通过逻辑计划看出,简单的统计信息传递,Filter 节点的统计信息的 sizeInBytes 和 Relation 节点是一样。

高级传递方式的特点是可以传递 sizeInBytes 信息、行数以及列级别的信息。这种传递方式必须使用 Apache Spark 2.2 版本,而且需要打开 CBO(默认没有打开,需要配置 spark.sql.cbo.enabled=true)。
上面例子可以看出,统计信息里面只有 sizeInBytes 和 rowCount,并没有列级别的统计信息,特别是 user_id 这列的统计信息。所以 Filter 节点和 Relation 节点的统计信息是一样的,因为 Spark 无法利用这些统计信息算出文件里面到底有多少满足 user_id < 0  的数据。

如果我们算出 user_id 的统计信息,可以看出,Filter 利用了这些统计信息,算出过滤后的数据行数为0。


那么,上面提到的统计信息是如何计算的呢?在 Spark 中主要有三种:

从 metastore 中获取

利用 Hadoop API 计算,仅仅计算 sizeInBytes使用 sizeInBytes 的默认值,通过 spark.sql.defaultSizeInBytes 配置,默认为 Long 的最大值。


上面是计算流程,图中的每个节点是条件,根据不同条件选择不同的路径。

在 Spark 中,统计信息主要在两种情况使用:

Join 策略选择多表 Join,调节 Join 表的顺序,这个需要打开 spark.sql.cbo.joinReorder.enable。

以上就是 Spark 统计信息相关的知识点。


以排序的方式将数据存储在文件中

以排序的方式将数据存储到文件主要涉及排序的函数有哪些,怎么将数据以排序的形式存储在文件。

上面就是 Spark 中排序的算子:

orderBy/sort 这个是 DataFrame 的转换算子,其是在不同的作业粒度对数据进行排序,需要 shuffle 来达到全局的有序;

sortWithinPartitions 这个也是 DataFrame 的转换算子,是分区粒度的排序;sortBy 这个通常是在 DataFrameWriter 上调用,并且在 write 算子调用之后调用的,一般和 bucketing 算子一起使用,需要配合使用 saveAsTable


为了有个直观的体验,我们用个例子来介绍。在这个例子中,我们使用 year 这列对数据进行分区;每个分区使用 user_id 来进行排序;每个分区只有一个文件,这个文件是通过 user_id 进行排序的。

很多人会很轻松的写出以上的程序。但是遗憾的是,最后保存在文件系统的文件并没有排序。如果需要保存的数据是有序的,需要保证 partitionColumns 有序, bucketingIdExpression 有序以及 sortColumns 有序的顺序。如果这个没有满足,则 Spark 将忽略之前的排序。

上面例子中因为我们对 year 这列进行分区,但是使用 user_id 进行排序,所以最后保存的文件肯定不会有序。

如果我们将 sortWithinPartitions('user_id') 修改为 sortWithinPartitions('year', 'user_id') 。这样最后算出的文件就是分区内有序的。


到这里我们已经学习了如何使用统计信息来提升我们的 Join 性能,以及如何以有序的方式来存储数据。

好了,今天的分享就到这里,欢迎大家转发+点赞。


©著作权归作者所有:来自51CTO博客作者mob604756e9d3bc的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 数据机构之排序算法 ————快排
  2. 冒泡排序函数
  3. C语言快排qsort()函数
  4. [face_算法篇]:常见的二分、递归、选择、插入、冒泡、快排、归并
  5. grep cut 及显示系统硬件信息命令
  6. 插入排序,选择排序
  7. 归并排序
  8. 男神毛咕噜最新Top5大作, 另外, 有序因变量依然使用OLS回归!
  9. [go-linq]-Go的.NET LINQ式查询方法

随机推荐

  1. 快看!go-carbon 1.2.2 版本发布了!新增了时
  2. 教你用golang实现直播和点播功能
  3. golang如何复用http.request.body
  4. 关于golang的make
  5. Golang如何对excel进行处理
  6. 介绍Golang序列化和反序列化
  7. 记一次因为共享变量的犯错误
  8. 关于Golang切片的三种简单使用方式及区别
  9. 国内下载 go get golang.org/x 包失败的
  10. 关于Golang panic用法详解