Kafka源码系列之副本同步机制及isr列表更新

 浪尖 浪尖聊大数据

一,基本思路

<Kafka源码系列之Consumer高级API性能分析>读过这篇文章的同学必然会对本篇文件较为熟悉,因为该篇讲的副本同步,实际上也是基于SimpleConsumer的,而且是维护了Broker总数个ReplicaFetcherThread(类似于ConsumerFetcherThread)。负责具体的数据拉取,然后追加到本地。每个ReplicaFetcherThread,维护了一个SimpleConsumer,负责固定的从一个Broker上同步本Broker存在的Follower副本的leader上的数据。

二,重要类介绍

1kafkaServer

代表一个kafka Broker的生命周期。除了所有的必要启动和停止一个kafka node的功能

2ReplicaManager

管理副本的动作,比如,启动副本为leader或者Follower,停止副本,从leader同步数据等。

3ReplicaFetcherManager

继承自AbstractFetcherManager。负责创建和停止ReplicaFetcherThread

4ReplicaFetcherThread

继承自AbstractFetcherThread。负责从leader同步数据,追加到本地日志。拥有一个SimpleConsumer

功能类似ConsumerFetcherThread

三,源码过程整理

1ReplicaManager创建和启动

kafkaServer中创建并且启动了ReplicaManager

replicaManager new ReplicaManager(configtimezkClient,kafkaSchedulerlogManagerisShuttingDown)

replicaManager.startup()

启动的实际上是一个定时调度线程,周期性的检测是否有副本掉队,进而收缩isr列表

// start ISR expiration thread
scheduler.schedule("isr-expiration"maybeShrinkIsrperiod = config.replicaLagTimeMaxMsunit = TimeUnit.MILLISECONDS)

实际是在leader节点才会用到该功能

maybeShrinkIsr会遍历所有分区,判断是否收缩isr

allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs,config.replicaLagMaxMessages))

进入maybeShrinkIsr会发现,它会首先判断是不是leader在本地。是的话进行处理,否则什么都不做。

leaderReplicaIfLocal() match {
  case Some(leaderReplica) =>
    val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica,replicaMaxLagTimeMsreplicaMaxLagMessages)
    if(outOfSyncReplicas.size > 0) {
      val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
      assert(newInSyncReplicas.size > 0)
      info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topicpartitionId,
        inSyncReplicas.map(_.brokerId).mkString(","),newInSyncReplicas.map(_.brokerId).mkString(",")))
      // update ISR in zk and in cache
      updateIsr(newInSyncReplicas)
      // we may need to increment high watermark since ISR could be down to 1
      maybeIncrementLeaderHW(leaderReplica)
      replicaManager.isrShrinkRate.mark()
    }
  case None => // do nothing if no longer leader
}

判断一个副本是否从isr列表中移除有两个点,具体实现方法是getOutOfSyncReplicas:

A),卡住了

val leaderLogEndOffset = leaderReplica.logEndOffset
val candidateReplicas = inSyncReplicas - leaderReplica
// Case 1 above
val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)

 

B),同步速度跟不上

val slowReplicas = candidateReplicas.filter(r =>
  r.logEndOffset.messageOffset >= &&
  leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages)

找到,落后的副本后,就将该副本从isr列表中移除,并更新高水位

if(outOfSyncReplicas.size > 0) {
  val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
  assert(newInSyncReplicas.size > 0)
  info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topicpartitionId,
    inSyncReplicas.map(_.brokerId).mkString(","),newInSyncReplicas.map(_.brokerId).mkString(",")))
  // update ISR in zk and in cache
  updateIsr(newInSyncReplicas)
  // we may need to increment high watermark since ISR could be down to 1
  maybeIncrementLeaderHW(leaderReplica)
  replicaManager.isrShrinkRate.mark()
}

2ReplicaFetcherThread创建的过程

每次Broker上有新的Follower产生的时候会调用makeFollowers,这个在我的另一篇文章里面可以详细的了解到<Kafka源码系列之topic创建分区分配及leader选举>

会调用ReplicaFetcherManageraddFetcherForPartitions方法

replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

会先判断链接到该分区leader所在BrokerReplicaFetcherThread是否存在,存在直接将该topic的分区添加到该BrokerReplicaFetcherThread,不存在就先创建再添加

val partitionsPerFetcher = partitionAndOffsets.groupBy{case(topicAndPartitionbrokerAndInitialOffset) => 
  BrokerAndFetcherId(brokerAndInitialOffset.broker,getFetcherId(topicAndPartition.topictopicAndPartition.partition))}
for ((brokerAndFetcherIdpartitionAndOffsets) <- partitionsPerFetcher) {
  var fetcherThread: AbstractFetcherThread = null
  fetcherThreadMap.get(brokerAndFetcherId) match {
    case Some(f) => fetcherThread = f
    case None =>
      fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId,brokerAndFetcherId.broker)
      fetcherThreadMap.put(brokerAndFetcherIdfetcherThread)
      fetcherThread.start
  }

  fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartitionbrokerAndInitOffset) =>
    topicAndPartition -> brokerAndInitOffset.initOffset
  })
}

3ReplicaFetcherThread同步数据的过程

在其父类AbstractFetcherThreaddowork方法中会先构建FetchRequest,然后就是具体的获取数据更新本地偏移,然后调用ReplicaFetcherThreadprocessPartitionData方法,将数据追加到本地日志。

构建获取数据的请求

if (partitionMap.isEmpty)
    partitionMapCond.await(200LTimeUnit.MILLISECONDS)
  partitionMap.foreach {
    case((topicAndPartitionoffset)) =>
      fetchRequestBuilder.addFetch(topicAndPartition.topic,topicAndPartition.partition,
                       offsetfetchSize)
  }
}

val fetchRequest = fetchRequestBuilder.build()

processFetcherRequest方法中具体的去请求数据

response = simpleConsumer.fetch(fetchRequest)

然后更新本地偏移

val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
val validBytes = messages.validBytes
val newOffset = messages.shallowIterator.toSeq.lastOption match {
  case Some(m: MessageAndOffset) => m.nextOffset
  case None => currentOffset.get
}
partitionMap.put(topicAndPartitionnewOffset)

调用processPartitionData方法将数据追加到本地日志,此处跟生产者生产数据到leader的时候不同之处是分配偏移未使能,原因是从leader同步的数据就已经带了偏移

replica.log.get.append(messageSetassignOffsets = false)

标记副本的高水位

replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)

四,总结

本文主要是讲解了kafka的副本同步过程,思路基本跟kafkajava高级消费者api一样。

两个抽象类:

AbstractFetcherManager:其子类两个

1ReplicaFetcherManager

2ConsumerFetcherManager

AbstractFetcherThread:其子类两个

ReplicaFetcherThread

ConsumerFetcherThread

此处,希望大仔细阅读相关代码,然后从中获取到相关的编程经验,提升自己的代码能力。

本文牵涉到一个重要的概念就是ShrinkIsr,弄懂这个概念,首先是要理解什么事isr列表和如何判断一个副本是否应该被移除isr列表。

两个重要的配置

名称

默认值

含义

replica.lag.time.max.ms

10000

副本未同步数据的时间

replica.lag.max.messages

4000

副本滞后的最大消息条数


©著作权归作者所有:来自51CTO博客作者mob604756ed02fe的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 成为大数据高手的活法-晚上复盘
  2. 带聚光灯的Excel数据查询,简单到没朋友
  3. 还在浪费资源“海推”产品?这个银行产品推荐系统你必须知道!(附教程
  4. TCP/IP之TCP报文简介
  5. spring boot 获得 http请求url中的参数
  6. 提高办公效率:Microsoft Excel 快速看数据
  7. JDBC【1】--初级增删改查
  8. 如何构建数字孪生城市治理的技术场景?ThingJS
  9. Python数据分析难吗?需要英语数学基础吗?

随机推荐

  1. 小程序静态资源如何设置防盗链?
  2. 全栈资源共享 一起成长,努力成为你想成为
  3. 003. 无重复字符的最长子串 | Leetcode题
  4. 最后一天送点福利,为您2019添砖加瓦~
  5. 月入三万,我能少了你一个鸡蛋?
  6. 深入浅出 JavaScript 中的For循环之详解
  7. 004. 寻找两个正序数组的中位数 | Leetco
  8. 自学编程的八大误区!克服它!
  9. webpack4配置详解之慢嚼细咽
  10. “狗屁不通文章生成器”项目登顶GitHub热