I prefer Python over Scala. But, as Spark is natively written in Scala, I was expecting my code to run faster in the Scala than the Python version for obvious reasons.

与Scala相比,我更喜欢Python。但是,由于Spark是用Scala本地编写的,我希望我的代码在Scala中比Python版本运行得更快,原因很明显。

With that assumption, I thought to learn & write the Scala version of some very common preprocessing code for some 1GB of data. Data is picked from the SpringLeaf competition on Kaggle. Just to give an overview of the data (it contains 1936 dimensions and 145232 rows). Data is composed of various types e.g. int, float, string, boolean. I am using 6 cores out of 8 for Spark processing; that's why I used minPartitions=6 so that every core has something to process.

有了这个假设,我想学习并编写Scala版本的一些非常常见的数据预处理代码。数据来自于Kaggle上的春季叶竞赛。只是对数据进行概述(它包含1936年的维度和145232行)。数据由各种类型组成,例如int, float, string, boolean。我在8个内核中有6个用于火花加工;这就是为什么我使用minpartition =6,这样每个内核都有可以处理的东西。

Scala Code

Scala代码

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Python Code

Python代码

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Scala Performance Stage 0 (38 mins), Stage 1 (18 sec)

第0阶段(38分钟),第1阶段(18秒)

Python Performance Stage 0 (11 mins), Stage 1 (7 sec)

Python性能阶段0(11分钟),第1阶段(7秒)

Both produces different DAG visualization graphs (due to which both pictures show different stage 0 functions for Scala (map) and Python (reduceByKey))

两者都生成不同的DAG可视化图(由于这两个图显示了Scala (map)和Python (reduceByKey)的不同阶段0函数)

But, essentially both code tries to transform data into (dimension_id, string of list of values) RDD and save to disk. The output will be used to compute various statistics for each dimension.

但是,实际上,这两段代码都试图将数据转换为RDD并保存到磁盘。输出将用于计算每个维度的各种统计信息。

Performance wise, Scala code for this real data like this seems to run 4 times slower than the Python version. Good news for me is that it gave me good motivation to stay with Python. Bad news is I didn't quite understand why?

就性能而言,象这样的真实数据的Scala代码运行速度似乎比Python版本慢4倍。对我来说,好消息是它给了我继续使用Python的良好动机。坏消息是我不太明白为什么?

1 个解决方案

#1


251


The original answer discussing the code can be found below.

讨论代码的原始答案可以在下面找到。


First of all, you have to distinguish between different types of API, each with its own performance considerations.

首先,您必须区分不同类型的API,每个API都有自己的性能考虑因素。

RDD API

(pure Python structures with JVM based orchestration)

(使用基于JVM的编排的纯Python结构)

This is the component which will be most affected by the performance of the Python code and the details of PySpark implementation. While Python performance is rather unlikely to be a problem, there at least few factors you have to consider:

这是最受Python代码性能和PySpark实现细节影响的组件。虽然Python的性能不太可能成为问题,但至少有几个因素需要考虑:

  • Overhead of JVM communication. Practically all data that comes to and from Python executor has to be passed through a socket and a JVM worker. While this is a relatively efficient local communication it is still not free.
  • JVM通信的开销。实际上,所有涉及和来自Python执行程序的数据都必须通过套接字和JVM worker传递。虽然这是一种相对高效的本地通信,但它仍然不是免费的。
  • Process-based executors (Python) versus thread based (single JVM multiple threads) executors (Scala). Each Python executor runs in its own process. As a side effect, it provides stronger isolation than its JVM counterpart and some control over executor lifecycle but potentially significantly higher memory usage:

    基于进程的执行器(Python)和基于线程的(单个JVM多个线程)执行器(Scala)。每个Python执行器都在自己的进程中运行。作为一个副作用,它提供了比JVM对等物更强的隔离,以及对执行程序生命周期的一些控制,但可能显著提高内存使用:

    • interpreter memory footprint
    • 翻译内存占用
    • footprint of the loaded libraries
    • 载入库的内存占用
    • less efficient broadcasting (each process requires its own copy of a broadcast)
    • 低效率的广播(每个过程都需要自己的广播拷贝)
  • Performance of Python code itself. Generally speaking Scala is faster than Python but it will vary on task to task. Moreover you have multiple options including JITs like Numba, C extensions (Cython) or specialized libraries like Theano. Finally, if you don't use ML / MLlib (or simply NumPy stack), consider using PyPy as an alternative interpreter. See SPARK-3094.

    Python代码本身的性能。一般来说,Scala比Python要快,但是在不同的任务中会有所不同。此外,您有多个选项,包括JITs,如Numba、C扩展(Cython)或像Theano这样的专门库。最后,如果不使用ML / MLlib(或简单地使用NumPy堆栈),可以考虑使用PyPy作为替代解释器。看到火花- 3094。

  • PySpark configuration provides the spark.python.worker.reuse option which can be used to choose between forking Python process for each task and reusing existing process. The latter option seems to be useful to avoid expensive garbage collection (it is more an impression than a result of systematic tests), while the former one (default) is optimal for in case of expensive broadcasts and imports.
  • PySpark配置提供了spark.python.worker。重用选项,可用于在为每个任务分配Python进程和重用现有进程之间进行选择。后一种选择似乎对避免昂贵的垃圾收集很有用(它更像是一种印象,而不是系统测试的结果),而前者(默认)是最优的,以防昂贵的广播和进口。
  • Reference counting, used as the first line garbage collection method in CPython, works pretty well with typical Spark workloads (stream-like processing, no reference cycles) and reduces the risk of long GC pauses.
  • 引用计数,作为CPython中的第一行垃圾收集方法,在典型的Spark工作负载(类似流的处理,没有引用循环)中工作得非常好,并且降低了长时间GC暂停的风险。

MLlib

(mixed Python and JVM execution)

(混合Python和JVM执行)

Basic considerations are pretty much the same as before with a few additional issues. While basic structures used with MLlib are plain Python RDD objects, all algorithms are executed directly using Scala.

基本的考虑事项与以前几乎相同,只是有一些附加的问题。虽然MLlib使用的基本结构是普通的Python RDD对象,但所有算法都直接使用Scala执行。

It means an additional cost of converting Python objects to Scala objects and the other way around, increased memory usage and some additional limitations we'll cover later.

这意味着将Python对象转换为Scala对象的额外成本,以及内存使用的增加,以及我们稍后将介绍的一些额外限制。

As of now (Spark 2.x), the RDD-based API is in a maintenance mode and is scheduled to be removed in Spark 3.0.

目前(Spark 2.x),基于rdd的API处于维护模式,计划在Spark 3.0中删除。

DataFrame API and Spark ML

(JVM execution with Python code limited to the driver)

(Python代码仅限于驱动程序的JVM执行)

These are probably the best choice for standard data processing tasks. Since Python code is mostly limited to high-level logical operations on the driver, there should be no performance difference between Python and Scala.

这些可能是标准数据处理任务的最佳选择。由于Python代码主要局限于驱动程序上的高级逻辑操作,所以Python和Scala之间应该没有性能差异。

A single exception is usage of row-wise Python UDFs which are significantly less efficient than their Scala equivalents. While there is some chance for improvements (there has been substantial development in Spark 2.0.0), the biggest limitation is full roundtrip between internal representation (JVM) and Python interpreter. If possible, you should favor a composition of built-in expressions (example. Python UDF behavior has been improved in Spark 2.0.0, but it is still suboptimal compared to native execution. This may improved in the future with introduction of the vectorized UDFs (SPARK-21190).

唯一的例外是使用行级的Python udf,这种udf比它们的Scala对等物的效率要低得多。虽然有一些改进的机会(在Spark 2.0.0中有大量的开发),但是最大的限制是内部表示(JVM)和Python解释器之间的完整往返。如果可能的话,您应该支持内置表达式的组合(例如)。Python UDF行为在Spark 2.0.0中得到了改进,但是与本机执行相比,它仍然是次优的。这可能在将来随着矢量化udf (SPARK-21190)的引入而有所改进。

Also be sure to avoid unnecessary passing data between DataFrames and RDDs. This requires expensive serialization and deserialization, not to mention data transfer to and from Python interpreter.

还要确保避免在DataFrames和RDDs之间传递不必要的数据。这需要昂贵的序列化和反序列化,更不用说与Python解释器之间的数据传输了。

It is worth noting that Py4J calls have pretty high latency. This includes simple calls like:

值得注意的是Py4J调用具有相当高的延迟。这包括简单的调用,如:

from pyspark.sql.functions import col

col("foo")

Usually, it shouldn't matter (overhead is constant and doesn't depend on the amount of data) but in the case of soft real-time applications, you may consider caching/reusing Java wrappers.

通常,它不重要(开销是常量,不依赖于数据量),但是在软实时应用程序中,您可以考虑使用Java包装器进行缓存/重用。

GraphX and Spark DataSets

As for now (Spark 1.6 2.1) neither one provides PySpark API so you can say that PySpark is infinitely worse than Scala.

至于现在(Spark 1.6 2.1),没有人提供PySpark API,因此您可以说PySpark比Scala糟糕得多。

GraphX

In practice, GraphX development stopped almost completely and the project is currently in the maintenance mode with related JIRA tickets closed as won't fix. GraphFrames library provides an alternative graph processing library with Python bindings.

在实践中,GraphX开发几乎完全停止,项目目前处于维护模式,相关的JIRA票据关闭,无法修复。GraphFrames库提供了一个具有Python绑定的替代图形处理库。

Dataset

Subjectively speaking there is not much place for statically typed Datasets in Python and even if there was the current Scala implementation is too simplistic and doesn't provide the same performance benefits as DataFrame.

主观地说,在Python中,静态类型的数据集没有太多的地方,即使当前的Scala实现过于简单,也不能提供与DataFrame相同的性能优势。

Streaming

From what I've seen so far, I would strongly recommend using Scala over Python. It may change in the future if PySpark gets support for structured streams but right now Scala API seems to be much more robust, comprehensive and efficient. My experience is quite limited.

就我目前所见,我强烈建议使用Scala而不是Python。如果PySpark得到了对结构化流的支持,它在未来可能会改变,但是现在Scala API看起来更加健壮、全面和高效。我的经验相当有限。

Structured streaming in Spark 2.x seem to reduce the gap between languages but for now it is still in its early days. Nevertheless, RDD based API is already referenced as "legacy streaming" in the Databricks Documentation (date of access 2017-03-03)) so it reasonable to expect further unification efforts.

星火2中的结构化流。x似乎减少了语言之间的差异,但目前还处于早期阶段。尽管如此,基于RDD的API在Databricks文档中已经被引用为“遗留流”(访问日期为2017-03-03),因此有理由期待进一步的统一工作。

Non-performance considerations

Feature parity

Not all Spark features are exposed through PySpark API. Be sure to check if the parts you need are already implemented and try to understand possible limitations.

不是所有的Spark特性都是通过PySpark API公开的。确保检查您需要的部件是否已经实现,并尝试理解可能的限制。

It is particularly important when you use MLlib and similar mixed contexts (see Calling Java/Scala function from a task). To be fair some parts of the PySpark API, like mllib.linalg, provides a more comprehensive set of methods than Scala.

当您使用MLlib和类似的混合上下文(参见从任务调用Java/Scala函数)时,这一点尤为重要。公平地说,PySpark API的某些部分,比如mllib。linalg提供了比Scala更全面的方法集。

API design

The PySpark API closely reflects its Scala counterpart and as such is not exactly Pythonic. It means that it is pretty easy to map between languages but at the same time, Python code can be significantly harder to understand.

PySpark API紧密地反映了它的Scala对等物,因此并不完全是python的。这意味着在语言之间进行映射非常容易,但是同时,Python代码可能会非常难于理解。

Complex architecture

PySpark data flow is relatively complex compared to pure JVM execution. It is much harder to reason about PySpark programs or debug. Moreover at least basic understanding of Scala and JVM in general is pretty much a must have.

与纯粹的JVM执行相比,PySpark数据流相对复杂。对PySpark程序或调试进行推理要难得多。此外,至少基本了解Scala和JVM是必须的。

Spark 2.x and beyond

Ongoing shift towards Dataset API, with frozen RDD API brings both opportunities and challenges for Python users. While high level parts of the API are much easier to expose in Python, the more advanced features are pretty much impossible to be used directly.

随着冻存的RDD API不断向Dataset API转变,这给Python用户带来了机遇和挑战。虽然API的高级部分更容易在Python中公开,但更高级的特性几乎不可能直接使用。

Moreover native Python functions continue to be second class citizen in the SQL world. Hopefully this will improve in the future with Apache Arrow serialization (current efforts target data collection but UDF serde is a long term goal).

此外,本地Python函数仍然是SQL世界中的二等公民。希望将来Apache Arrow序列化能够改进这一点(目前的工作目标是数据收集,但UDF serde是一个长期目标)。

For projects strongly depending on the Python codebase, pure Python alternatives (like Dask or Ray) could be an interesting alternative.

对于强烈依赖于Python代码基的项目,纯Python替代方案(如Dask或Ray)可能是一个有趣的替代方案。

It doesn't have to be one vs. the other

The Spark DataFrame (SQL, Dataset) API provides an elegant way to integrate Scala/Java code in PySpark application. You can use DataFrames to expose data to a native JVM code and read back the results. I've explained some options somewhere else and you can find a working example of Python-Scala roundtrip in How to use a Scala class inside Pyspark.

Spark DataFrame (SQL, Dataset) API为在PySpark应用程序中集成Scala/Java代码提供了一种优雅的方式。您可以使用DataFrames将数据公开到本地JVM代码中,并读取结果。我已经在其他地方解释了一些选项,您可以找到Python-Scala roundtrip的一个工作示例,在Pyspark中如何使用Scala类。

It can be further augmented by introducing User Defined Types (see How to define schema for custom type in Spark SQL?).

通过引入用户定义的类型(参见如何在Spark SQL中定义自定义类型的模式?)


What is wrong with code provided in the question

(Disclaimer: Pythonista point of view. Most likely I've missed some Scala tricks)

(免责声明:毕达哥拉斯的观点。我很可能错过了一些Scala技巧)

First of all, there is one part in your code which doesn't make sense at all. If you already have (key, value) pairs created using zipWithIndex or enumerate what is the point in creating string just to split it right afterwards? flatMap doesn't work recursively so you can simply yield tuples and skip following map whatsoever.

首先,代码中有一个部分根本没有意义。如果已经有(key, value)对使用zipWithIndex创建或者枚举创建字符串的意义是什么?flatMap不会递归地工作,所以您可以简单地生成元组并跳过任何映射。

Another part I find problematic is reduceByKey. Generally speaking, reduceByKey is useful if applying aggregate function can reduce the amount of data that has to be shuffled. Since you simply concatenate strings there is nothing to gain here. Ignoring low-level stuff, like the number of references, the amount of data you have to transfer is exactly the same as for groupByKey.

我发现问题的另一个部分是还原键。一般来说,如果应用聚合函数可以减少需要重新排列的数据量,reduceByKey是有用的。因为你只是连接字符串,所以这里没有什么可获得的。忽略底层的内容,比如引用的数量,您必须传输的数据量与groupByKey完全相同。

Normally I wouldn't dwell on that, but as far as I can tell it is a bottleneck in your Scala code. Joining strings on JVM is a rather expensive operation (see for example: Is string concatenation in scala as costly as it is in Java?). It means that something like this _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) which is equivalent to input4.reduceByKey(valsConcat) in your code is not a good idea.

通常我不会详细讨论这个问题,但就我所知,这是Scala代码的瓶颈。在JVM上连接字符串是一项相当昂贵的操作(参见:scala中的字符串连接是否与Java中一样昂贵?)它的意思是像这样的东西。(v1: String, v2: String) => v1 + ',' + v2),相当于input4.reduceByKey(valsConcat)在你的代码中不是一个好主意。

If you want to avoid groupByKey you can try to use aggregateByKey with StringBuilder. Something similar to this should do the trick:

如果您想避免groupByKey,可以尝试使用StringBuilder的aggregateByKey。类似的事情应该可以达到这个目的:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

but I doubt it is worth all the fuss.

但我怀疑这一切值得大惊小怪。

Keeping the above in mind, I've rewritten your code as follows:

记住以上内容,我重写了您的代码如下:

Scala:

Scala:

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

Python:

Python:

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

Results

In local[6] mode (Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz) with 4GB memory per executor it takes (n = 3):

在本地[6]模式(Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz),每个执行程序占用4GB内存(n = 3):

  • Scala - mean: 250.00s, stdev: 12.49
  • Scala -平均值:250.00s, stdev: 12.49
  • Python - mean: 246.66s, stdev: 1.15
  • Python -平均值:246.66秒,stdev: 1.15

I am pretty sure that most of that time is spent on shuffling, serializing, deserializing and other secondary tasks. Just for fun, here's naive single-threaded code in Python that performs the same task on this machine in less than a minute:

我确信大部分时间都花在了拖放、序列化、反序列化和其他次要任务上。有趣的是,下面是Python中的简单的单线程代码,它在不到一分钟的时间内在这台机器上执行相同的任务:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])

更多相关文章

  1. Python使用pandas对数据进行差分运算
  2. 利用Python进行数据分析-- 学习心得(汇总)
  3. python python 入门学习之网页数据爬虫cnbeta文章保存
  4. python尝试自定义数据结构不知道怎么下手
  5. 28.mysql数据库之查询
  6. Python_基础(命名,数据类型,循环)
  7. 支持c和python之间的跨语言(c)标记的代码编辑器
  8. 尝试使用python字典重新格式化JSON数据
  9. 用 Python requests库 爬取网页数据

随机推荐

  1. appium測試中验证toast的正确性
  2. Android压缩
  3. 代码中如何设置TextView为不可见
  4. Android判断当前线程是否是主线程的方法
  5. Android(安卓)OpenGL ES(八)----纹理编程
  6. android-passwordsafe - Android Passwor
  7. Android判断横屏竖屏代码
  8. Android Studio v1.0 项目无法运行
  9. Android Unable to find instrumentation
  10. Android使用Aidl实现跨进程通信