1.BlockManagerMaster创建

  

    BlockManagerMaster要负责整个应用程序在运行期间block元数据的管理和维护,以及向从节点发送指令执行命令,它是在构造SparkEnv的时候创建的,Driver端是创建SparkContext的时候创建SparkEnv,SparkEnv中对应的初始化代码如下:

    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(      BlockManagerMaster.DRIVER_ENDPOINT_NAME,      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),      conf, isDriver)

    这里可以看到在构造blockManagerMaster时,会创建一个BlockManagerMasterEndpoint实例并注册到了rpcEnv中,Executor中的blockManager通过Driver端BlockManagerMasterEndpoint的引用BlockManagerMasterRef与blockManagerMaster进行通信。


2.BlockManagerMaster成员函数:


    1).removeExecutor()函数,代码如下:

  //向BlockManagerMasterEndpoint发送RemoveExecutor消息,移除挂掉的Exeutor  //这个函数只会在driver端调用  def removeExecutor(execId: String) {    tell(RemoveExecutor(execId))    logInfo("Removed " + execId + " successfully in removeExecutor")  }

    2).removeExecutorAsync()函数,代码如下:

  // 跟1)作用差不多,移除挂掉的Executor,这里是非阻塞的异步方法  def removeExecutorAsync(execId: String) {    driverEndpoint.ask[Boolean](RemoveExecutor(execId))    logInfo("Removal of executor " + execId + " requested")  }

  3).registerBlockManager()函数,代码如下:

 //Executor端的BlockManager启动会,会向BlockManagerMaster进行注册// BlockManagerMaster会保存在master的blockManagerInfo中 def registerBlockManager(      blockManagerId: BlockManagerId,      maxOnHeapMemSize: Long,      maxOffHeapMemSize: Long,      slaveEndpoint: RpcEndpointRef): BlockManagerId = {    logInfo(s"Registering BlockManager $blockManagerId")    val updatedId = driverEndpoint.askSync[BlockManagerId](      RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))    logInfo(s"Registered BlockManager $updatedId")    updatedId  }

  3).updateBlockInfo()函数,代码如下:

  //更新block数据块信息  def updateBlockInfo(      blockManagerId: BlockManagerId,      blockId: BlockId,      storageLevel: StorageLevel,      memSize: Long,      diskSize: Long): Boolean = {      //向BlockManagerMasterEndpoint发送UpdateBlockInfo消息,并且返回结果    val res = driverEndpoint.askSync[Boolean](      UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))    logDebug(s"Updated info of block $blockId")    res  }

  4).getLocations()函数,代码如下:

 //获取block所在的BockManager节点信息,这里返回的是Seq集合, //如果block的Replication>1  一个block块,可能会在多个blockmanager //节点上存在  def getLocations(blockId: BlockId): Seq[BlockManagerId] = {  //向BlockManagerMasterEndpoint发送GetLocations消息    driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))  }

  5).getPeers()函数,代码如下:

  //获取参数blockManagerId之外的其他BlockManagerId,  //上面说了一个block,可能会在多个blockmanager节点上存在  def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {   //向BlockManagerMasterEndpoint发送GetPeers消息    driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))  }

  6).getExecutorEndpointRef()函数,代码如下:

  //这里就是获取BlockManagerMasterEndpoint的引用,与其进行通信  private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {    for (      blockManagerId <- blockManagerIdByExecutor.get(executorId);      info <- blockManagerInfo.get(blockManagerId)    ) yield {      info.slaveEndpoint    }  }

  7).getBlockStatus()函数,代码如下:

//获取一个Block的状态信息,位置,占用内存和磁盘大小def getBlockStatus(      blockId: BlockId,      askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {    val msg = GetBlockStatus(blockId, askSlaves)    val response = driverEndpoint.      askSync[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)    val (blockManagerIds, futures) = response.unzip    implicit val sameThread = ThreadUtils.sameThread    val cbf =      implicitly[        CanBuildFrom[Iterable[Future[Option[BlockStatus]]],        Option[BlockStatus],        Iterable[Option[BlockStatus]]]]    val blockStatus = timeout.awaitResult(      Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))    if (blockStatus == null) {      throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)    }    blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>      status.map { s => (blockManagerId, s) }    }.toMap  }


    总结一下BlockManagerMaster里面的各种函数处理其实都在 BlockManagerMasterEndpoint实例中,后面我们会详细剖析BlockManagerMasterEndpoint类的各个消息的具体处理流程。


©著作权归作者所有:来自51CTO博客作者mb5ff98083d7c62的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. Spark2.x精通:源码剖析SortShuffleWriter具体实现
  2. Excel重大更新,VLOOKUP退役,新的搜索函数上线
  3. 如何使用Excel快速生成随机数据,你肯定想不到
  4. SQL如何提前字符串中的字母?
  5. SQL如何提取字符串中的中文和数字?
  6. 2021-03-08:在一个数组中,任何一个前面的数a,和任何一个后面的数b,如
  7. Python VS Java如何选择?Python学习分析!
  8. C++入门第一课(命名空间;输入输出;缺省参数;函数重载)
  9. python实现域名解析

随机推荐

  1. 《你不知道的JavaScript
  2. Java-子类和父类的几种关系
  3. 准备语句SQL异常“无输入参数”
  4. 低延迟系统的Java实践
  5. java基础:集合框架之Map(共性方法)
  6. Java正则表达式
  7. javaweb从单机到分布式架构演变过程
  8. Netty学习心得 netty服务端和客户端的连
  9. 线程“main”中的异常java.lang.RuntimeE
  10. java中jcom操作excel