Hello World with the Latest Spark 3
Spark has already reached 3.1.1, and I hadn't looked at this project in a while. Today I updated my local repository, tried to compile, and to my surprise the build failed:
```
$ mvn compile
......
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:403: type mismatch;
 found   : Map[String,org.apache.spark.resource.ResourceInformation]
 required: scala.collection.immutable.Map[String,org.apache.spark.ResourceInformation]
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:404: type mismatch;
 found   : scala.collection.immutable.Map[String,org.apache.spark.ResourceInformation]
 required: Map[String,org.apache.spark.resource.ResourceInformation]
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:554: overloaded method value apply with alternatives:
  (env: org.apache.spark.SparkEnv,resources: java.util.Map[String,org.apache.spark.resource.ResourceInformation])Option[org.apache.spark.internal.plugin.PluginContainer] <and>
  (sc: org.apache.spark.SparkContext,resources: java.util.Map[String,org.apache.spark.resource.ResourceInformation])Option[org.apache.spark.internal.plugin.PluginContainer]
 cannot be applied to (org.apache.spark.SparkContext, java.util.Map[String,org.apache.spark.ResourceInformation])
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala:118: type mismatch;
 found   : java.util.Map[String,org.apache.spark.ResourceInformation]
 required: java.util.Map[String,org.apache.spark.resource.ResourceInformation]
[INFO] [Info] : java.util.Map[String,org.apache.spark.ResourceInformation] <: java.util.Map[String,org.apache.spark.resource.ResourceInformation]?
[INFO] [Info] : false
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:308: too many arguments (6) for method resolveMavenDependencies: (packagesExclusions: String, packages: String, repositories: String, ivyRepoPath: String, ivySettingsPath: Option[String])String
Note that 'packagesTransitive' is not a parameter name of the invoked method.
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:377: not enough arguments for method downloadFile: (path: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.
Unspecified value parameter secMgr.
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:380: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.
Unspecified value parameter secMgr.
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:383: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.
Unspecified value parameter secMgr.
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:392: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.
Unspecified value parameter secMgr.
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:396: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.
Unspecified value parameter secMgr.
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:450: not enough arguments for method downloadFile: (path: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.
Unspecified value parameter secMgr.
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/scheduler/Task.scala:101: type mismatch;
 found   : Map[String,org.apache.spark.resource.ResourceInformation]
 required: Map[String,org.apache.spark.ResourceInformation]
[INFO] [Info] : Map[String,org.apache.spark.resource.ResourceInformation] <: Map[String,org.apache.spark.ResourceInformation]?
[INFO] [Info] : false
[ERROR] 12 errors found
......
```
I quickly checked the project README: my local environment can no longer satisfy the new Spark's build requirements. The Spark project now bundles its own build tooling, so building by following the docs just works.
If you haven't downloaded the project before, clone it locally with git first:
git clone https://github.com/apache/spark.git
Enter the project directory and build:
```
$ cd spark
$ ./build/mvn -DskipTests clean package
......
[INFO] Reactor Summary for Spark Project Parent POM 3.2.0-SNAPSHOT:
[INFO]
[INFO] Spark Project Parent POM ........................... SUCCESS [  2.562 s]
[INFO] Spark Project Tags ................................. SUCCESS [  5.148 s]
[INFO] Spark Project Sketch ............................... SUCCESS [  5.963 s]
[INFO] Spark Project Local DB ............................. SUCCESS [  1.505 s]
[INFO] Spark Project Networking ........................... SUCCESS [  2.883 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  1.516 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [  7.137 s]
[INFO] Spark Project Launcher ............................. SUCCESS [  1.516 s]
[INFO] Spark Project Core ................................. SUCCESS [01:55 min]
[INFO] Spark Project ML Local Library ..................... SUCCESS [ 36.128 s]
[INFO] Spark Project GraphX ............................... SUCCESS [ 30.925 s]
[INFO] Spark Project Streaming ............................ SUCCESS [ 53.579 s]
[INFO] Spark Project Catalyst ............................. SUCCESS [03:50 min]
[INFO] Spark Project SQL .................................. SUCCESS [07:58 min]
[INFO] Spark Project ML Library ........................... SUCCESS [02:42 min]
[INFO] Spark Project Tools ................................ SUCCESS [ 13.733 s]
[INFO] Spark Project Hive ................................. SUCCESS [04:52 min]
[INFO] Spark Project REPL ................................. SUCCESS [ 34.085 s]
[INFO] Spark Project Assembly ............................. SUCCESS [  8.368 s]
[INFO] Kafka 0.10+ Token Provider for Streaming ........... SUCCESS [01:06 min]
[INFO] Spark Integration for Kafka 0.10 ................... SUCCESS [02:08 min]
[INFO] Kafka 0.10+ Source for Structured Streaming ........ SUCCESS [01:24 min]
[INFO] Spark Project Examples ............................. SUCCESS [01:01 min]
[INFO] Spark Integration for Kafka 0.10 Assembly .......... SUCCESS [ 10.397 s]
[INFO] Spark Avro ......................................... SUCCESS [01:12 min]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  31:51 min
[INFO] Finished at: 2021-03-13T17:28:05+08:00
[INFO] ------------------------------------------------------------------------
```
Judging from the build output, the Spark 3 project structure has changed quite a bit: modules like Tags, Sketch, and Local DB weren't there before. We'll look at what each sub-project does another time. For now, let's run Hello World. The easiest way to try Spark is the Scala shell:
```
tianlang@tianlang:spark$ ./bin/spark-shell
2021-03-14 08:51:53,351 WARN util.Utils: Your hostname, tianlang resolves to a loopback address: 127.0.0.1; using 192.168.0.104 instead (on interface wlp7s0)
2021-03-14 08:51:53,352 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-03-14 08:52:01,141 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.0.104:4040
Spark context available as 'sc' (master = local[*], app id = local-1615683123916).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.range(1000*1000*1000).count();
res0: Long = 1000000000
```
spark.range(1000*1000*1000) generates a sequence from 0 (inclusive) to one billion (exclusive). You can think of it as a single-column table t_range:
```
        0
        1
        2
   ......
999999998
999999999
```
count() counts the number of rows, one per row, for a total of one billion. It is equivalent to SQL's select count(*) from t_range. SQL's count function is easy to confuse with sum: sum adds up all the values, i.e. it takes the total of every number in the sequence, 0+1+2+3+......+999999998+999999999; count just tallies the rows, one per row, regardless of whether the value is 1 or 999999998: 1+1+1+......+1+1.
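The count-versus-sum distinction can be seen at a small scale with plain Scala collections, no Spark needed; the object name here is illustrative only:

```scala
// count vs. sum over the sequence 0, 1, ..., 9 (a miniature t_range).
object CountVsSum {
  def main(args: Array[String]): Unit = {
    val nums = 0L until 10L   // 0 (inclusive) to 10 (exclusive)
    val count = nums.size     // one per row: 10
    val sum   = nums.sum      // 0+1+2+...+9 = 45
    println(s"count = $count, sum = $sum")  // count = 10, sum = 45
  }
}
```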
In the Spark shell you cannot call sum directly to total the sequence:
```
scala> spark.range(1000*1000*1000).sum();
<console>:24: error: value sum is not a member of org.apache.spark.sql.Dataset[Long]
       spark.range(1000*1000*1000).sum();
```
That's because the Dataset object returned by spark.range has no sum method. So how do we sum the values?
We can check the Spark API docs to see which methods Dataset offers.
The first thing that catches the eye is summary, which looks a lot like sum. Read its description (if the English is hard going, the sample code helps): it computes statistics such as count, mean, max, and min, but not a sum. A similar name doesn't mean similar behavior.
If you've been around big data before, you've probably heard of MapReduce, which makes the reduce method easy to spot:
```
def reduce(func: (T, T) => T): T
```

(Scala-specific) Reduces the elements of this Dataset using the specified binary function. The given func must be commutative and associative or the result may be non-deterministic.
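Scala collections have a reduce with the same shape, which makes that contract easy to experiment with locally; this is a plain-Scala sketch, not Spark code:

```scala
// reduce folds the elements pairwise with a binary function.
// Addition is commutative and associative, so any evaluation order
// gives the same result -- the property Dataset.reduce requires
// for deterministic results on a distributed dataset.
object ReduceDemo {
  def main(args: Array[String]): Unit = {
    val total = (0L until 10L).reduce((a, b) => a + b)
    println(total)  // 45
    // Subtraction is neither commutative nor associative, so a
    // distributed reduce over it could be non-deterministic.
  }
}
```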
Summing with reduce:
```
scala> spark.range(1000*1000*1000).reduce((a,b) => a+b)
<console>:24: error: overloaded method value reduce with alternatives:
  (func: org.apache.spark.api.java.function.ReduceFunction[java.lang.Long])java.lang.Long <and>
  (func: (java.lang.Long, java.lang.Long) => java.lang.Long)java.lang.Long
 cannot be applied to ((java.lang.Long, java.lang.Long) => scala.Long)
       spark.range(1000*1000*1000).reduce((a,b) => a+b)
```
Passing reduce a summing function still fails. The error message shows it's a type mismatch, so let's add an explicit cast:
```
scala> spark.range(1000*1000*1000).reduce((a,b) => (a+b).asInstanceOf[java.lang.Long])
res11: Long = 499999999500000000
```
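Why does the cast help? spark.range returns a Dataset of java.lang.Long, and adding two java.lang.Long values in Scala unboxes them to a primitive scala.Long, so the lambda's result type no longer matches the overload expecting java.lang.Long. This is my reading of the error, sketched in plain Scala rather than Spark code:

```scala
// Adding two boxed java.lang.Long values yields a primitive scala.Long;
// asInstanceOf[java.lang.Long] boxes it back, which is what makes the
// lambda match the (java.lang.Long, java.lang.Long) => java.lang.Long
// overload of Dataset.reduce.
object BoxedLongDemo {
  def main(args: Array[String]): Unit = {
    val a: java.lang.Long = 1L
    val b: java.lang.Long = 2L
    val unboxed = a + b  // inferred as scala.Long
    val boxed: java.lang.Long = (a + b).asInstanceOf[java.lang.Long]
    println(s"unboxed = $unboxed, boxed = $boxed")  // both print 3
  }
}
```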
Of course there are other ways to compute the sum, for example with foreach, though its execution model differs from reduce's; we'll get to that another time.
You've probably noticed that the reduce method is far less convenient than SQL's sum. SQL functions are what people now like to call a declarative API: you only state what you want, instead of, as with reduce, also having to spell out how to do it.
That's probably one reason SQL has endured. Spark has long provided the Spark SQL module to support SQL syntax. Looking back, the Dataset we've been using lives in the sql package:
```
scala> spark.range(1000*1000*1000);
res2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
```
Earlier I said you can think of the range result as a single-column table, and that wasn't just a figure of speech: you really can run SQL statements on it.
First register the Dataset as a temporary view (you could also call it a temporary table, but the temp-table registration API was deprecated in 2.0.0):
```
scala> spark.range(1000*1000*1000).createOrReplaceTempView("t_range");
```
Now we can run SQL against the view t_range:
```
scala> spark.sql("select sum(id) from t_range");
res18: org.apache.spark.sql.DataFrame = [sum(id): bigint]

scala> res18.collect
res19: Array[org.apache.spark.sql.Row] = Array([499999999500000000])
```
How did I know the column is named id? Via the printSchema method:
```
scala> spark.range(1000*1000*1000).printSchema();
root
 |-- id: long (nullable = false)
```
All the code above uses Scala. If you prefer Python, you can use ./bin/pyspark instead; Spark 3 takes its Python support to a new level.
That's it for Hello World. Like an ant gnawing on a bone, we'll take it one bite at a time.