Apache Spark是目前处理和使用大数据的最广泛使用的框架之一,Python是数据分析,机器学习等最广泛使用的编程语言之一。那么,为什么不一起使用它们呢?这就是Spark与python也被称为PySpark的原因。


Apache Spark开发人员每年的平均年薪为110,000美元。毫无疑问,Spark在这个行业中已经被广泛使用。由于其丰富的库集,Python今天被大多数数据科学家和分析专家使用。


将Python与Spark集成是开源社区的主要礼物。 Spark是用Scala语言开发的,与Java非常相似。它将程序代码编译为用于Spark大数据处理的JVM的字节码。为了支持Spark和Python,Apache Spark社区发布了PySpark。在本文中,我们将讨论以下主题:


1、Apache Spark简介及其功能

2、为什么选择Python?

3、使用Python设置Spark(PySpark)

4、PySpark SparkContext和数据流

5、PySpark KDD用例


Apache Spark是Apache Software Foundation开发的用于实时处理的开源集群计算框架。 Spark提供了一个接口,用于编程具有隐式数据并行和容错功能的集群。




下面是Apache Spark的一些特性,它比其它的大数据框架的优势在于:


1、速度:比传统的大型数据处理框架快100倍。

2、强大的缓存:简单的编程层提供了强大的缓存和磁盘持久性功能。

3、部署:可以通过Mesos,通过Yarn的Hadoop或Spark自己的集群管理器进行部署。

4、实时:由于内存中的计算,实时计算和低延迟。

5、多语言:这是该框架最重要的特性之一,因为它可以在Scala,Java,Python和R语言中编程。


虽然Spark是在Scala中设计的,但它的速度比Python快10倍,但只有当使用的内核数量少时,Scala才会更快。由于现在大多数分析和处理都需要大量内核,因此Scala的性能优势并不大。



对于程序员来说,由于其语法和标准库,Python相对来说更容易学习。 而且,它是一种动态类型语言,这意味着RDD可以保存多种类型的对象。



尽管Scala拥有SparkMLlib,但它没有足够的库和工具来实现机器学习和NLP目的。 此外,Scala缺乏数据可视化。



使用Python设置Spark(PySpark)


我们应该如何下载Spark并安装它,当你已经解压缩了spark文件,安装它并将其添加到.bashrc文件的路径中,输入:source .bashrc


export SPARK_HOME = /usr/lib/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH = $PATH:/usr/lib/hadoop/spark-2.1.0-bin-hadoop2.7/bin


要打开PySpark shell,输入命令:./bin/pyspark



Apache Spark由于它具有令人惊叹的功能,如内存处理,polyglot和快速处理等,被许多公司用于各种行业:



Yahoo!使用Apache Spark的机器学习功能来个性化其新闻和网页以及推荐式广告。使用Spark和Python来找出哪些新闻用户有兴趣阅读和分类新闻报道,以找出哪类用户有兴趣阅读哪些新闻类别。


TripAdvisor使用Apache Spark通过比较数百个网站为其客户找到最佳酒店价格,向数百万旅客提供建议。以可读格式阅读和处理酒店评论所需的时间是在Apache Spark的帮助下完成的。


阿里巴巴运营着全球最大的Apache Spark集群,以便在其电子商务平台上分析数百PB以上的数据。


PySpark SparkContext与数据流


用Python来连接Spark,使用RD4s可以通过库Py4j来实现。 PySpark Shell将Python API链接到Spark Core并初始化Spark Context。 Spark上下文是任何Spark应用程序的核心。


1、Spark Context设置内部服务并建立到Spark执行环境的连接。

2、驱动程序中的Spark Context对象协调所有分布式进程并允许资源分配。

3、集群管理器提供执行程序,它们是具有逻辑的JVM进程。

4、Spark Context对象将应用程序发送给执行者。

5、Spark Context在每个执行器中执行任务。


PySpark KDD用例


现在让我们来看一个用例:KDD'99 Cup(国际知识发现和数据挖掘工具竞赛)。 这里我们将取数据集的一部分,因为原始数据集太大。


import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")


创建 RDD:

现在我们用这个下载的文件来创建RDD。

data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

过滤


假设我们要计算在数据集中有多少正常的相互作用。 我们可以按如下过滤raw_data RDD。


from time import time
t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
print "There are {} 'normal' interactions".format(normal_count)
print "Count completed in {} seconds".format(round(tt,3))


统计:

现在我们来计算新RDD中有多少元素:

from time import time
t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
print "There are {} 'normal' interactions".format(normal_count)
print "Count completed in {} seconds".format(round(tt,3))

输出结果如下:

There are 97278 'normal' interactions
Count completed in 5.951 seconds


映射:

在这种情况下,我们想要将数据文件作为CSV格式文件读取。 我们可以通过对RDD中的每个元素应用lambda函数来做到这一点,如下所示。 这里我们使用map()和take()函数转换。

from pprint import pprint
csv_data = raw_data.map(lambda x: x.split(","))
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print "Parse completed in {} seconds".format(round(tt,3))
pprint(head_rows[0])

输出结果:

Parse completed in 1.715 seconds
[u'0',
 u'tcp',
 u'http',
 u'SF',
 u'181',
 u'5450',
 u'0',
 u'0',
.
.
 u'normal.']


拆分:


现在我们希望将RDD中的每个元素都作为键值对(其中键是标记)(例如normal),并且该值是表示CSV格式文件中的行的整个元素列表。可以按如下进行, 这里我们用line.split()和map()函数。


def parse_interaction(line):
elems = line.split(",")
tag = elems[41]
return (tag, elems)

key_csv_data = raw_data.map(parse_interaction)
head_rows = key_csv_data.take(5)
pprint(head_rows[0])

输出结果如下:

(u'normal.',
 [u'0',
  u'tcp',
  u'http',
  u'SF',
  u'181',
  u'5450',
  u'0',
  u'0',
  u'0.00',
  u'1.00',
.
.
.
.
  u'normal.'])

收集行为:

这里我们将使用collect()行为。 它会将RDD的所有元素存入内存。 因此,使用大型RDD时必须小心使用。


t0 = time()
all_raw_data = raw_data.collect()
tt = time() - t0
print "Data collected in {} seconds".format(round(tt,3))

输出结果如下:

Data collected in 17.927 seconds

当然,这比我们之前使用的其他任何动作花费的时间要长。 每个具有RDD片段的Spark工作节点都必须进行协调,以便检索其部分,然后将所有内容缩小到一起。


作为结合前面所有内容的最后一个例子,我们希望收集所有常规交互作为键值对。


# get data from file
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

# parse into key-value pairs
key_csv_data = raw_data.map(parse_interaction)

# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

# collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() - t0
normal_count = len(all_normal)
print "Data collected in {} seconds".format(round(tt,3))
print "There are {} 'normal' interactions".format(normal_count)


输出结果如下 :

Data collected in 12.485 seconds
There are 97278 normal interactions


希望你喜欢Python这篇文章,如果已经在阅读完全部内容,恭喜你不再是PySpark的新手了。


周末愉快。


©著作权归作者所有:来自51CTO博客作者mob604756f09529的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. insert带来的TiDB集群性能瓶颈排障
  2. 如何从io.Reader 中读数据
  3. 监控系统项目实施--安装与部署-- MySQL数据库
  4. 监控系统项目实施--安装与部署-- MySQL数据库--分区表创建
  5. 监控系统项目实施--安装与部署-- MySQL数据库--分区表创建2
  6. 监控系统项目实施--安装与部署-- MySQL数据库--备份脚本
  7. K8S中部署KAFKA集群
  8. 大数据的语言,工具与框架发展
  9. 基本数据类型

随机推荐

  1. android对话框的使用
  2. android的Material Design点击涟漪效果
  3. NoClassDefFoundError with Android Stud
  4. SDK下载地址
  5. Android(安卓)MMS 源码流程
  6. Android 修改Window属性
  7. 在线android
  8. Android 与JS互调
  9. android 6 sdk/ndk下载地址
  10. Android多媒体开发(2)————使用Android