Spark已经发布到3.1.1了,好久没看这个项目了.今天更新下本地仓库,编译下竟然出错了.

$mvn compile......[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:403: type mismatch; found   : Map[String,org.apache.spark.resource.ResourceInformation] required: scala.collection.immutable.Map[String,org.apache.spark.ResourceInformation][ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:404: type mismatch; found   : scala.collection.immutable.Map[String,org.apache.spark.ResourceInformation] required: Map[String,org.apache.spark.resource.ResourceInformation][ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:554: overloaded method value apply with alternatives:  (env: org.apache.spark.SparkEnv,resources: java.util.Map[String,org.apache.spark.resource.ResourceInformation])Option[org.apache.spark.internal.plugin.PluginContainer] <and>  (sc: org.apache.spark.SparkContext,resources: java.util.Map[String,org.apache.spark.resource.ResourceInformation])Option[org.apache.spark.internal.plugin.PluginContainer] cannot be applied to (org.apache.spark.SparkContext, java.util.Map[String,org.apache.spark.ResourceInformation])[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala:118: type mismatch; found   : java.util.Map[String,org.apache.spark.ResourceInformation] required: java.util.Map[String,org.apache.spark.resource.ResourceInformation][INFO] [Info] : java.util.Map[String,org.apache.spark.ResourceInformation] <: java.util.Map[String,org.apache.spark.resource.ResourceInformation]?[INFO] [Info] : false[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:308: too many arguments (6) for method resolveMavenDependencies: (packagesExclusions: String, packages: String, repositories: String, ivyRepoPath: String, ivySettingsPath: Option[String])StringNote that 'packagesTransitive' is not a parameter name of the invoked method.[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:377: not enough arguments for method downloadFile: (path: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.Unspecified value parameter secMgr.[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:380: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.Unspecified value parameter secMgr.[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:383: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.Unspecified value parameter secMgr.[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:392: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.Unspecified value parameter secMgr.[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:396: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.Unspecified value parameter secMgr.[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:450: not enough arguments for method downloadFile: (path: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.Unspecified value parameter secMgr.[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/scheduler/Task.scala:101: type mismatch; found   : Map[String,org.apache.spark.resource.ResourceInformation] required: Map[String,org.apache.spark.ResourceInformation][INFO] [Info] : Map[String,org.apache.spark.resource.ResourceInformation] <: Map[String,org.apache.spark.ResourceInformation]?[INFO] [Info] : false[ERROR] 12 errors found......

赶紧看了项目README,原来本地环境已经不能满足新Spark需要了,Spark项目也打包了相应的编译工具,按照文档编译就好了.

如果先前没下载过这个项目可以使用git clone先把项目下载到本地:

git clone https://github.com/apache/spark.git


进入项目目录并编译:

$cd spark$./build/mvn -DskipTests clean package......[INFO] Reactor Summary for Spark Project Parent POM 3.2.0-SNAPSHOT:[INFO] [INFO] Spark Project Parent POM ........................... SUCCESS [  2.562 s][INFO] Spark Project Tags ................................. SUCCESS [  5.148 s][INFO] Spark Project Sketch ............................... SUCCESS [  5.963 s][INFO] Spark Project Local DB ............................. SUCCESS [  1.505 s][INFO] Spark Project Networking ........................... SUCCESS [  2.883 s][INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  1.516 s][INFO] Spark Project Unsafe ............................... SUCCESS [  7.137 s][INFO] Spark Project Launcher ............................. SUCCESS [  1.516 s][INFO] Spark Project Core ................................. SUCCESS [01:55 min][INFO] Spark Project ML Local Library ..................... SUCCESS [ 36.128 s][INFO] Spark Project GraphX ............................... SUCCESS [ 30.925 s][INFO] Spark Project Streaming ............................ SUCCESS [ 53.579 s][INFO] Spark Project Catalyst ............................. SUCCESS [03:50 min][INFO] Spark Project SQL .................................. SUCCESS [07:58 min][INFO] Spark Project ML Library ........................... SUCCESS [02:42 min][INFO] Spark Project Tools ................................ SUCCESS [ 13.733 s][INFO] Spark Project Hive ................................. SUCCESS [04:52 min][INFO] Spark Project REPL ................................. SUCCESS [ 34.085 s][INFO] Spark Project Assembly ............................. SUCCESS [  8.368 s][INFO] Kafka 0.10+ Token Provider for Streaming ........... SUCCESS [01:06 min][INFO] Spark Integration for Kafka 0.10 ................... SUCCESS [02:08 min][INFO] Kafka 0.10+ Source for Structured Streaming ........ SUCCESS [01:24 min][INFO] Spark Project Examples ............................. SUCCESS [01:01 min][INFO] Spark Integration for Kafka 0.10 Assembly .......... SUCCESS [ 10.397 s][INFO] Spark Avro ......................................... SUCCESS [01:12 min][INFO] ------------------------------------------------------------------------[INFO] BUILD SUCCESS[INFO] ------------------------------------------------------------------------[INFO] Total time:  31:51 min[INFO] Finished at: 2021-03-13T17:28:05+08:00[INFO] ------------------------------------------------------------------------

从编译结果看Spark3项目结构还是变化挺大的,像Tags/Sketch/LocalDB/这些原来都没见过.具体每个子项目是做什么的,我们以后再一一介绍.接下来先运行下HelloWorld.试用Spark最简单的方式是使用scala shell:

tianlang@tianlang:spark$ ./bin/spark-shell 2021-03-14 08:51:53,351 WARN util.Utils: Your hostname, tianlang resolves to a loopback address: 127.0.0.1; using 192.168.0.104 instead (on interface wlp7s0)2021-03-14 08:51:53,352 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another addressSetting default log level to "WARN".To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).2021-03-14 08:52:01,141 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicableSpark context Web UI available at http://192.168.0.104:4040Spark context available as 'sc' (master = local[*], app id = local-1615683123916).Spark session available as 'spark'.Welcome to      ____              __     / __/__  ___ _____/ /__    _\ \/ _ \/ _ `/ __/  '_/   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT      /_/         Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)Type in expressions to have them evaluated.Type :help for more information.scala> spark.range(1000*1000*1000).count();res0: Long = 1000000000

spark.range(1000*1000*1000)是生成一个从0(包含)到10亿(不包含)的序列,可看成是一个只有一列的表t_range:

1
2
......
999999998
999999999


count()是计算下数据条数,有一条算一条总共是10亿条.相当于SQL的select count(*) from  t_range. SQL的count函数容易跟SQL的sum函数弄混,sum是把所有的数据加起来也就是取序列中所有数值的和:0+1+2+3+......+999999998+9999999999;而count是统计下序列中的数据条数有一条算一条,不管具体数值是1还是999999998:1+1+1+......+1+1;

在SparkShell中不能直接调用sum对序列中的数值求和:

scala> spark.range(1000*1000*1000).sum();<console>:24: error: value sum is not a member of org.apache.spark.sql.Dataset[Long]       spark.range(1000*1000*1000).sum();

因为spark.range生成的Dataset对象没有sum函数.那怎么实现求和操作呢?

可以查下Spark API文档,看下Dataset有那些函数可以使用:

首先看到的是跟sum长的比较像的summary.看下它的介绍如果英文看不懂也没关系,可以看示例代码.发现它可以用来统计总条数/平均数/最大值/最小值等就是没有取和.看来不是名字像功能就一样啊.

如果先前接触过大数据,应该听过MapReduce.那应该就比较容易找到reduce函数:

defreduce(func: (T, T) ⇒ T): T

(Scala-specific) Reduces the elements of this Dataset using the specified binary function. The given func must be commutative and associative or the result may be non-deterministic.

用reduce取和:

scala> spark.range(1000*1000*1000).reduce((a,b) => a+b)<console>:24: error: overloaded method value reduce with alternatives:  (func: org.apache.spark.api.java.function.ReduceFunction[java.lang.Long])java.lang.Long <and>  (func: (java.lang.Long, java.lang.Long) => java.lang.Long)java.lang.Long cannot be applied to ((java.lang.Long, java.lang.Long) => scala.Long)       spark.range(1000*1000*1000).reduce((a,b) => a+b)

为reduce提供一个取和的函数还报错了,从错误信息可以看出是数据类型不匹配问题,来个强制类型转换吧:

scala> spark.range(1000*1000*1000).reduce((a,b) => (a+b).asInstanceOf[java.lang.Long])
res11: Long = 499999999500000000  

当然也可以使用其它的方式实现取和,比如:foreach,但执行方式跟reduce是有差别的,我们后面有机会再说. 

大家应该也感觉到了,使用reduce函数远没有SQL中的sum函数方便.SQL中的函数用现在比较流行的词叫声明式的API,只需要关注我要什么就可以了,而不需要像reduce一样还要我关注怎么干.

这也是SQL经久不衰的一个原因吧.Spark也很早就提供了Spark SQL模块用于支持SQL语法.可以回头看下我们先前使用的Dataset就是sql包下的:

scala> spark.range(1000*1000*1000);
res2: org.apache.spark.sql.Dataset[Long] = [id: bigint]

我们先前也说可以把range结果类比成一个只有一列的表,也不是随便说说的.还真的可以在上面执行SQL语句:

首先把Dataset注册为临时视图(也可以叫临时表,但注册临时表的API在2.0.0后就标记为废弃了):

scala> spark.range(1000*1000*1000).createOrReplaceTempView("t_range");

接下来就可以对视图t_range执行SQL了:


scala> spark.sql("select sum(id) from t_range");
res18: org.apache.spark.sql.DataFrame = [sum(id): bigint]

scala> res18.collect
res19: Array[org.apache.spark.sql.Row] = Array([499999999500000000])

我是怎么知道列名称是id的?是通过printSchema函数.

scala> spark.range(1000*1000*1000).printSchema();
root
 |-- id: long (nullable = false)

上面的代码都是使用的Scala,如果更倾向于使用Python.也可以使用./bin/pyspark.Spark3对Python的支持也提到了一个新高度.

HelloWorld就先到这里吧.蚂蚁啃骨头一点一点来.


©著作权归作者所有:来自51CTO博客作者FusionZhu的原创作品,如需转载,请注明出处,否则将追究法律责任

有钱的捧个钱场,没钱点个分享(可赚钱)

赞赏

0人进行了赞赏支持

更多相关文章

  1. 磁盘显示函数不正确怎么恢复?
  2. 后台二 项目流程(阅读)
  3. 后台一 搭建项目
  4. 快速开发对项目有价值
  5. flex 基础
  6. laravel8+vue2单页面应用
  7. tp5中在js函数中给url传参数并进行连接
  8. 常用数组函数-创建-删除-键值操作-回调函数
  9. php字符串函数

随机推荐

  1. yahoo mysql性能监控工具使用
  2. 有可能用by来计算一列的总数吗?
  3. 关于sql语句的多重循环
  4. 如何将SQL Server日期格式转换为Oracle?
  5. 深度探索 -- 生成SQL语句的向导应如何做?
  6. 关于使用大型数据库,我需要了解什么?
  7. 一种在t-SQL中选择两个日期之间的日期的
  8. MYSQL5.7---ONLY_FULL_GROUP_BY 异常处理
  9. 在sdCard创建数据库(打造自己的SQLiteOpen
  10. sqlserver获取当前id的前一条数据和后一