前面「八篇」文章通过「场景驱动方式」带你深度剖析了 Kafka「日志系统」源码架构设计的方方面面,从今天开始,我们来深度剖析 Kafka「Controller」的底层源码实现,这是 Controller 系列第一篇,我们先回过头来继续来深度聊聊「Kafka 服务端启动的流程」,看看 Kafka 服务端是如何启动的。
在深入剖析Kafka「Controller」之前,我想你可能或多或少会有这样的疑问:
Kafka 服务端都有哪些组件,这些组件又是通过哪个类来启动的呢?
这里我们通过启动 Kafka 来了解,大家都知道,启动 Kafka 可以执行以下命令来启动:
# 1、启动 kafka 服务命令: bin/kafka-server-start.sh config/server.properties &
那么今天就来看看通过这个脚本 KafkaServer 初始化了哪些组件。
我们来看下里面的 shell 内容,如下:
#!/bin/bash #LicensedtotheApacheSoftwareFoundation(ASF)under one or more # contributor licenseagreements.Seethe NOTICE file distributedwith#thisworkforadditional information regarding copyright ownership.#TheASF licensesthisfiletoYouunder theApacheLicense,Version2.0#(the"License");you may not usethisfile except in compliancewith# theLicense.Youmay obtain a copy of theLicenseat # # http://www.apache.org/licenses/LICENSE-2.0# #Unlessrequired by applicable law or agreedtoinwriting,software # distributed under theLicenseis distributed on an"AS IS"BASIS,# WITHOUT WARRANTIES OR CONDITIONS OFANYKIND,either express or implied.#SeetheLicenseforthe specific language governing permissions and # limitations under theLicense.#1、注释说明该脚本的版权信息和使用许可。if[$#-lt1];then echo"USAGE: $0 [-daemon] server.properties [--override property=value]*"exit1fi #2、检查命令行参数的个数,若小于1则输出脚本的使用方法并退出。 base_dir=$(dirname $0)#3、获取当前脚本所在目录的路径,并将其赋值给 base_dir 变量。if["x$KAFKA_LOG4J_OPTS"="x"];then export KAFKA_LOG4J_OPTS="-Dlog4j.cnotallow=file:$base_dir/../config/log4j.properties"fi #4、检查 KAFKA_LOG4J_OPTS 环境变量是否设置,若未设置则设置该变量的值。if["x$KAFKA_HEAP_OPTS"="x"];then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"export JMX_PORT="9999"export JMX_RMI_PORT="10000"fi #5、检查 KAFKA_HEAP_OPTS 环境变量是否设置,若未设置则设置该变量的值,并设置 JMX_PORT 和 JMX_RMI_PORT 环境变量的值,将 EXTRA_ARGS 变量的值设置为字符串-name kafkaServer-loggc。 EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer-loggc'}#6、检查命令行参数中 COMMAND 变量的值是否为-daemon,若是则将 EXTRA_ARGS 变量的值添加-daemon 选项。同时将命令行参数向左移一位,即从 $2开始计算参数。 COMMAND=$1case$COMMAND in-daemon)EXTRA_ARGS="-daemon "$EXTRA_ARGSshift;;*);;esac #7、调用 $base_dir/kafka-run-class.sh 脚本并传递相应的参数。其中 "@ 代表传递的为命令行参数。具体执行的封装在Kafka客户端库中的kafka.Kafka类。整个脚本的作用是启动Kafka服务。 exec $base_dir/kafka-run-class.sh $EXTRA_ARGSkafka.Kafka"$@"esac #7、调用 $base_dir/kafka-run-class.sh 脚本并传递相应的参数。其中 "@ 代表传递的为命令行参数。具体执行的封装在Kafka客户端库中的kafka.Kafka类。整个脚本的作用是启动Kafka服务。 exec $base_dir/kafka-run-class.sh $EXTRA_ARGSkafka.Kafka"$@"
这里我们重点来看 「第 7 步」,它底层执行的是封装在 Kafka 客户端库中的 kafka.Kafka 类。接下来我们来看下该类都做了什么。
「Kafka.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:
https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/Kafka.scala。
从整体上来看,该类就 3 个方法,相对比较简单,我能来看下里面的重点。
这里我们通过「2.8.x」版本来讲解,「2.7.x」还未增加 KafkaRaftServer 类。
defgetPropsFromArgs(args:Array[String]):Properties={// 创建一个命令行参数解析器val optionParser=newOptionParser(false)// 定义 --override 选项,用于覆盖 server.properties 文件中的属性val overrideOpt=optionParser.accepts("override","Optional property that should override values set in server.properties file").withRequiredArg().ofType(classOf[String])// 定义 --version 选项,用于打印版本信息并退出optionParser.accepts("version","Print version information and exit.")// 若没有提供参数或者参数包含 --help 选项,则打印用法并退出if(args.length==0||args.contains("--help")){CommandLineUtils.printUsageAndDie(optionParser,"USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))}// 若参数中包含 --version 选项,则打印版本信息并退出if(args.contains("--version")){CommandLineUtils.printVersionAndDie()}// 加载 server.properties 文件中的属性到 Properties 对象中val props=Utils.loadProps(args(0))// 若提供了其他参数,则解析这些参数if(args.length>1){// 解析参数中的选项和参数值val options=optionParser.parse(args.slice(1,args.length):_*)// 检查是否有非选项参数if(options.nonOptionArguments().size()>0){CommandLineUtils.printUsageAndDie(optionParser,"Found non argument parameters: "+options.nonOptionArguments().toArray.mkString(","))}// 将解析得到的选项和参数值添加到 props 对象中props++=CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)}// 返回解析得到的属性集合props}
该函数的作用是从命令行参数中解析出属性集合。它内部使用了 OptionParser 类库来解析命令行选项,并从 server.properties 文件中加载属性。
如果提供了 override 选项,则它将覆盖 server.properties 文件中的相应属性。函数返回一个 Properties 对象,其中包含了解析得到的属性。
如果没有提供正确的命令行参数或者提供了 --help 或 --version 选项,函数会打印帮助信息或版本信息并退出。
privatedefbuildServer(props:Properties):Server={val config=KafkaConfig.fromProps(props,false)// 直接启动定时任务、网络层、请求处理层if(config.requiresZookeeper){newKafkaServer(config,Time.SYSTEM,threadNamePrefix=None,enableForwarding=false)}else{// 调用 BrokerServer 等来启动网络层和请求处理层newKafkaRaftServer(config,Time.SYSTEM,threadNamePrefix=None)}}
在 kafka 2.8.x 版本中 新增了 raft 协议之后将 BrokerServer、ControllServer 使用了单独的文件来启动最终调用网络层和请求处理层,如果还是使用 zk 的方式启动则是 KafkaServer 启动网络层和请求处理层。
#2.7.x 版本源码 defmain(args:Array[String]):Unit={try{// 1、解析命令行参数,获得属性集合val serverProps=getPropsFromArgs(args)// 2、从属性集合创建 KafkaServerStartable 对象val kafkaServerStartable=KafkaServerStartable.fromProps(serverProps)try{// 如果不是 Windows 操作系统,并且不是 IBM JDK,则注册 LoggingSignalHandlerif(!OperatingSystem.IS_WINDOWS&&!Java.isIbmJdk)newLoggingSignalHandler().register()}catch{// 如果注册 LoggingSignalHandler 失败,则在日志中打印警告信息casee:ReflectiveOperationException=>warn("Failed to register optional signal handler that logs a message when the process is terminated "+s"by a signal. Reason for registration failure is: $e",e)}// 3、添加 shutdown hook,用于在程序结束时执行 KafkaServerStartable 的 shutdown 方法Exit.addShutdownHook("kafka-shutdown-hook",kafkaServerStartable.shutdown())// 4、启动 KafkaServerStartable 实例kafkaServerStartable.startup()// 5、等待 KafkaServerStartable 实例终止kafkaServerStartable.awaitShutdown()}catch{// 如果有异常发生,则记录日志并退出程序casee:Throwable=>fatal("Exiting Kafka due to fatal exception",e)Exit.exit(1)}// 6、正常终止程序Exit.exit(0)}
该函数是 Kafka 服务进程的入口,它是整个 Kafka 运行过程的驱动程序。该函数首先通过调用 getPropsFromArgs 函数解析命令行参数并获得属性集合,然后使用这些属性创建 KafkaServerStartable 实例。接着,它注册一个 shutdown hook,用于在程序终止时执行 KafkaServerStartable 的 shutdown 方法。然后它启动 KafkaServerStartable 实例,并等待该实例终止。如果发生异常,则记录日志并退出程序。函数最后调用 Exit.exit 方法退出程序,返回 0 表示正常终止。
#2.8.x 版本 defmain(args:Array[String]):Unit={// 获取Kafka服务的配置信息val serverProps=getPropsFromArgs(args)// 根据配置信息构建Kafka服务val server=buildServer(serverProps)try{// 注册用于记录日志的信号处理器(若实现失败则退出)if(!OperatingSystem.IS_WINDOWS&&!Java.isIbmJdk)newLoggingSignalHandler().register()}catch{casee:ReflectiveOperationException=>warn("Failed to register optional signal handler that logs a message when the process is terminated "+s"by a signal. Reason for registration failure is: $e",e)}// 挂载关闭处理器,用于捕获终止信号和常规终止请求Exit.addShutdownHook("kafka-shutdown-hook",{tryserver.shutdown()// 关闭Kafka服务catch{case_:Throwable=>fatal("Halting Kafka.")// 日志记录致命错误信息// 调用Exit.halt()强制退出,避免重复调用Exit.exit()引发死锁Exit.halt(1)}})tryserver.startup()// 启动Kafka服务catch{case_:Throwable=>// 调用Exit.exit()设置退出状态码,KafkaServer.startup()会在抛出异常时调用shutdown()fatal("Exiting Kafka.")Exit.exit(1)}server.awaitShutdown()// 等待Kafka服务关闭Exit.exit(0)// 调用Exit.exit()设置退出状态码}
这里最重要的是 「第 4 步」,调用 kafkaServerStartable.startup() 或者 server.startup() 来启动 kafka。
这里我们还是以「ZK 模式」的方式来启动,后面抽空再进行对 「Raft 模式」启动进行补充。
「KafkaServerStartable.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:
https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServerStartable.scala。
在 Scala 语言里,在一个源代码文件中同时定义相同名字的 class 和 object 的用法被称为伴生(Companion)。Class 对象被称为伴生类,它和 Java 中的类是一样的;而 Object 对象是一个单例对象,用于保存一些静态变量或静态方法。
这里我们主要来看下 Class 类代码。
classKafkaServerStartable(val staticServerConfig:KafkaConfig,reporters:Seq[KafkaMetricsReporter],threadNamePrefix:Option[String]=None)extendsLogging{// 创建 KafkaServer 实例// 构造函数有两个参数 —— staticServerConfig 表示静态服务器配置,reporters 表示 Kafka 指标报告器。如果 threadNamePrefix 参数未用于构造函数,则默认值为 None。threadNamePrefix 参数表示线程名称前缀,用于调试和维护目的。privateval server=newKafkaServer(staticServerConfig,kafkaMetricsReporters=reporters,threadNamePrefix=threadNamePrefix)defthis(serverConfig:KafkaConfig)=this(serverConfig,Seq.empty)// 启动 KafkaServer// startup 方法尝试启动 Kafka 服务器。如果启动 Kafka 服务器时发生异常,则记录一条 fatal 错误日志并退出程序。对于成功启动的 Kafka 服务器,它将开始监听客户端连接,并在收到消息时执行所需的操作。defstartup():Unit={tryserver.startup()catch{// 如果出现异常,则记录日志并退出程序case_:Throwable=>// KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status codefatal("Exiting Kafka.")Exit.exit(1)}}// 关闭 KafkaServer// shutdown 方法尝试停止 Kafka 服务器。如果在停止服务器时出现异常,则记录一条 fatal 错误日志并强制退出程序。调用 shutdown 方法后,服务器将不再接受新的请求,并开始等待当前进行中的请求完成。当所有处理中的请求都完成后,服务器将彻底停止。defshutdown():Unit={tryserver.shutdown()catch{// 如果出现异常,则记录日志并强制退出程序case_:Throwable=>fatal("Halting Kafka.")// Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.Exit.halt(1)}}// setServerState 方法允许从 KafkaServerStartable 对象中设置 broker 状态。如果自定义 KafkaServerStartable 对象想要引入新的状态,则此方法很有用。defsetServerState(newState:Byte):Unit={server.brokerState.newState(newState)}// 等待 KafkaServer 退出// awaitShutdown 方法等待 Kafka 服务器完全退出。在 Kafka 服务器执行 shutdown 方法后,它将不再接受新的请求。但是,服务器可能仍在处理一些已经接收的请求。awaitShutdown 方法将阻塞当前线程,直到服务器彻底停止。defawaitShutdown():Unit=server.awaitShutdown()}
KafkaServerStartable 类是一个可启动和停止的 Kafka 服务器。类中的 server 成员变量是 KafkaServer 类的实例,它将在 KafkaServerStartable 类对象启动时创建。该类提供了启动和停止 Kafka 服务器的方法,以及设置 broker 状态和等待 Kafka 服务器退出的方法。
跟本文有关系的是 「启动」方法,它调用了 KafkaServer#startup 方法进行启动。
Kafka 集群由多个 Broker 节点构成,每个节点上都运行着一个 Kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。
「KafkaServer.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:
https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServer.scala。
KafkaServer 为 Kafka 的启动类,其中包含了 Kafka 的所有组件,如 KafkaController、groupCoordinator、replicaManager 等。
classKafkaServer(val config:KafkaConfig,//配置信息time:Time=Time.SYSTEM,threadNamePrefix:Option[String]=None,kafkaMetricsReporters:Seq[KafkaMetricsReporter]=List()//监控上报)extendsLoggingwithKafkaMetricsGroup{//标识节点已经启动完成privateval startupComplete=newAtomicBoolean(false)//标识节点正在执行关闭操作privateval isShuttingDown=newAtomicBoolean(false)//标识节点正在执行启动操作privateval isStartingUp=newAtomicBoolean(false)//阻塞主线程等待 KafkaServer 的关闭privatevarshutdownLatch=newCountDownLatch(1)//日志上下文privatevarlogContext:LogContext=nullvarmetrics:Metrics=null//记录节点的当前状态val brokerState:BrokerState=newBrokerState//API接口类,用于处理数据类请求vardataPlaneRequestProcessor:KafkaApis=null//API接口,用于处理控制类请求varcontrolPlaneRequestProcessor:KafkaApis=null//权限管理varauthorizer:Option[Authorizer]=None//启动socket,监听9092端口,等待接收客户端请求varsocketServer:SocketServer=null//数据类请求处理线程池vardataPlaneRequestHandlerPool:KafkaRequestHandlerPool=null//命令类处理线程池varcontrolPlaneRequestHandlerPool:KafkaRequestHandlerPool=null//日志管理器varlogDirFailureChannel:LogDirFailureChannel=nullvarlogManager:LogManager=null//副本管理器varreplicaManager:ReplicaManager=null//topic增删管理器varadminManager:AdminManager=null//token管理器vartokenManager:DelegationTokenManager=null//动态配置管理器vardynamicConfigHandlers:Map[String,ConfigHandler]=nullvardynamicConfigManager:DynamicConfigManager=nullvarcredentialProvider:CredentialProvider=nullvartokenCache:DelegationTokenCache=null//分组协调器vargroupCoordinator:GroupCoordinator=null//事务协调器vartransactionCoordinator:TransactionCoordinator=null//集群控制器varkafkaController:KafkaController=null//定时任务调度器varkafkaScheduler:KafkaScheduler=null//集群分区状态信息缓存varmetadataCache:MetadataCache=null//配额管理器varquotaManagers:QuotaFactory.QuotaManagers=null//zk客户端配置val zkClientConfig:ZKClientConfig=KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(newZKClientConfig())privatevar_zkClient:KafkaZkClient=nullval correlationId:AtomicInteger=newAtomicInteger(0)val brokerMetaPropsFile="meta.properties"val brokerMetadataCheckpoints=config.logDirs.map(logDir=>(logDir,newBrokerMetadataCheckpoint(newFile(logDir+File.separator+brokerMetaPropsFile)))).toMapprivatevar_clusterId:String=nullprivatevar_brokerTopicStats:BrokerTopicStats=nulldef clusterId:String=_clusterId// Visible for testingprivate[kafka]def zkClient=_zkClientprivate[kafka]def brokerTopicStats=_brokerTopicStats....}
该类方法很多,我们这里只看 startup 启动方法,来看看其内部都启动了哪些组件,来解决本文开头提出的问题。
/** * Start up API for bringing up a single instance of the Kafka server. * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers */defstartup():Unit={try{info("starting")// 是否已关闭if(isShuttingDown.get)thrownewIllegalStateException("Kafka server is still shutting down, cannot re-start!")// 是否已启动if(startupComplete.get)return// 是否可以启动val canStartup=isStartingUp.compareAndSet(false,true)if(canStartup){// 设置broker状态为StartingbrokerState.newState(Starting)/* setup zookeeper */// 连接ZK,并创建根节点initZkClient(time)/* initialize features */_featureChangeListener=newFinalizedFeatureChangeListener(featureCache,_zkClient)if(config.isFeatureVersioningSupported){_featureChangeListener.initOrThrow(config.zkConnectionTimeoutMs)}/* Get or create cluster_id */// 从ZK获取或创建集群id,规则:UUID的mostSigBits、leastSigBits组合转base64_clusterId=getOrGenerateClusterId(zkClient)info(s"Cluster ID = $clusterId")/* load metadata */// 获取brokerId及log存储路径,brokerId通过zk生成或者server.properties配置broker.id// 规则:/brokers/seqid的version值 + maxReservedBrokerId(默认1000),保证唯一性val(preloadedBrokerMetadataCheckpoint,initialOfflineDirs)=getBrokerMetadataAndOfflineDirs/* check cluster id */if(preloadedBrokerMetadataCheckpoint.clusterId.isDefined&&preloadedBrokerMetadataCheckpoint.clusterId.get!=clusterId)thrownewInconsistentClusterIdException(s"The Cluster ID ${clusterId} doesn't match stored clusterId ${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. "+s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.")/* generate brokerId */config.brokerId=getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)logContext=newLogContext(s"[KafkaServer id=${config.brokerId}] ")// 配置loggerthis.logIdent=logContext.logPrefix// initialize dynamic broker configs from ZooKeeper. Any updates made after this will be// applied after DynamicConfigManager starts.// 初始化AdminZkClient,支持动态修改配置config.dynamicConfig.initialize(zkClient)/* start scheduler */// 初始化定时任务调度器kafkaScheduler=newKafkaScheduler(config.backgroundThreads)kafkaScheduler.startup()/* create and configure metrics */// 创建及配置监控,默认使用JMX及Yammer MetricskafkaYammerMetrics=KafkaYammerMetrics.INSTANCE kafkaYammerMetrics.configure(config.originals)val jmxReporter=newJmxReporter()jmxReporter.configure(config.originals)val reporters=newutil.ArrayList[MetricsReporter]reporters.add(jmxReporter)val metricConfig=KafkaServer.metricConfig(config)val metricsContext=createKafkaMetricsContext()metrics=newMetrics(metricConfig,reporters,time,true,metricsContext)/* register broker metrics */_brokerTopicStats=newBrokerTopicStats// 初始化配额管理器quotaManagers=QuotaFactory.instantiate(config,metrics,time,threadNamePrefix.getOrElse(""))notifyClusterListeners(kafkaMetricsReporters++metrics.reporters.asScala)// 用于保证kafka-log数据目录的存在logDirFailureChannel=newLogDirFailureChannel(config.logDirs.size)/* start log manager */// 启动日志管理器,kafka的消息以日志形式存储logManager=LogManager(config,initialOfflineDirs,zkClient,brokerState,kafkaScheduler,time,brokerTopicStats,logDirFailureChannel)// 启动日志清理、刷新、校验、恢复等的定时线程logManager.startup()metadataCache=newMetadataCache(config.brokerId)// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.// SCRAM认证方式的token缓存tokenCache=newDelegationTokenCache(ScramMechanism.mechanismNames)credentialProvider=newCredentialProvider(ScramMechanism.mechanismNames,tokenCache)// Create and start the socket server acceptor threads so that the bound port is known.// Delay starting processors until the end of the initialization sequence to ensure// that credentials have been loaded before processing authentications.// 启动socket,监听9092端口,等待接收客户端请求socketServer=newSocketServer(config,metrics,time,credentialProvider)socketServer.startup(startProcessingRequests=false)/* start replica manager */brokerToControllerChannelManager=newBrokerToControllerChannelManagerImpl(metadataCache,time,metrics,config,threadNamePrefix)// 启动副本管理器,高可用相关replicaManager=createReplicaManager(isShuttingDown)replicaManager.startup()brokerToControllerChannelManager.start()// 将broker信息注册到ZK上val brokerInfo=createBrokerInfo val brokerEpoch=zkClient.registerBroker(brokerInfo)// Now that the broker is successfully registered, checkpoint its metadata// 校验 broker 信息checkpointBrokerMetadata(BrokerMetadata(config.brokerId,Some(clusterId)))/* start token manager */// 启动 token 管理器tokenManager=newDelegationTokenManager(config,tokenCache,time,zkClient)tokenManager.startup()/* start kafka controller */// 启动Kafka控制器,只有 Leader 会与ZK建连kafkaController=newKafkaController(config,zkClient,time,metrics,brokerInfo,brokerEpoch,tokenManager,brokerFeatures,featureCache,threadNamePrefix)kafkaController.startup()// admin管理器adminManager=newAdminManager(config,metrics,metadataCache,zkClient)/* start group coordinator */// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue// 启动集群群组协调器groupCoordinator=GroupCoordinator(config,zkClient,replicaManager,Time.SYSTEM,metrics)groupCoordinator.startup()/* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue// 启动事务协调器transactionCoordinator=TransactionCoordinator(config,replicaManager,newKafkaScheduler(threads=1,threadNamePrefix="transaction-log-manager-"),zkClient,metrics,metadataCache,Time.SYSTEM)transactionCoordinator.startup()/* Get the authorizer and initialize it if one is specified.*/// ACLauthorizer=config.authorizer authorizer.foreach(_.configure(config.originals))val authorizerFutures:Map[Endpoint,CompletableFuture[Void]]=authorizer match{caseSome(authZ)=>authZ.start(brokerInfo.broker.toServerInfo(clusterId,config)).asScala.map{case(ep,cs)=>ep->cs.toCompletableFuture}caseNone=>brokerInfo.broker.endPoints.map{ep=>ep.toJava->CompletableFuture.completedFuture[Void](null)}.toMap}// 创建拉取管理器val fetchManager=newFetchManager(Time.SYSTEM,newFetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))/* start processing requests */// 初始化数据类请求的KafkaApis,负责数据类请求逻辑处理dataPlaneRequestProcessor=newKafkaApis(socketServer.dataPlaneRequestChannel,replicaManager,adminManager,groupCoordinator,transactionCoordinator,kafkaController,zkClient,config.brokerId,config,metadataCache,metrics,authorizer,quotaManagers,fetchManager,brokerTopicStats,clusterId,time,tokenManager,brokerFeatures,featureCache)// 初始化数据类请求处理的线程池dataPlaneRequestHandlerPool=newKafkaRequestHandlerPool(config.brokerId,socketServer.dataPlaneRequestChannel,dataPlaneRequestProcessor,time,config.numIoThreads,s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",SocketServer.DataPlaneThreadPrefix)socketServer.controlPlaneRequestChannelOpt.foreach{controlPlaneRequestChannel=>// 初始化控制类请求的 KafkaApiscontrolPlaneRequestProcessor=newKafkaApis(controlPlaneRequestChannel,replicaManager,adminManager,groupCoordinator,transactionCoordinator,kafkaController,zkClient,config.brokerId,config,metadataCache,metrics,authorizer,quotaManagers,fetchManager,brokerTopicStats,clusterId,time,tokenManager,brokerFeatures,featureCache)// 初始化控制类请求的线程池controlPlaneRequestHandlerPool=newKafkaRequestHandlerPool(config.brokerId,socketServer.controlPlaneRequestChannelOpt.get,controlPlaneRequestProcessor,time,1,s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent",SocketServer.ControlPlaneThreadPrefix)}Mx4jLoader.maybeLoad()/* Add all reconfigurables for config change notification before starting config handlers */config.dynamicConfig.addReconfigurables(this)/* start dynamic config manager */dynamicConfigHandlers=Map[String,ConfigHandler](ConfigType.Topic->newTopicConfigHandler(logManager,config,quotaManagers,kafkaController),ConfigType.Client->newClientIdConfigHandler(quotaManagers),ConfigType.User->newUserConfigHandler(quotaManagers,credentialProvider),ConfigType.Broker->newBrokerConfigHandler(config,quotaManagers))// Create the config manager. start listening to notifications// 启动动态配置处理器dynamicConfigManager=newDynamicConfigManager(zkClient,dynamicConfigHandlers)dynamicConfigManager.startup()// 启动请求处理线程socketServer.startProcessingRequests(authorizerFutures)// 更新broker状态brokerState.newState(RunningAsBroker)shutdownLatch=newCountDownLatch(1)startupComplete.set(true)isStartingUp.set(false)AppInfoParser.registerAppInfo(metricsPrefix,config.brokerId.toString,metrics,time.milliseconds())info("started")}}catch{casee:Throwable=>fatal("Fatal error during KafkaServer startup. Prepare to shutdown",e)isStartingUp.set(false)shutdown()throwe}}
这里总结下该方法都启动了哪些组件:
initZkClient(time) 初始化 Zk。
kafkaScheduler 定时器。
logManager 日志模块。
MetadataCache 元数据缓存。
socketServer 网络服务器。
replicaManager 副本模块。
kafkaController 控制器。
groupCoordinator 协调器用于和ConsumerCoordinator 交互
transactionCoordinator 事务相关
fetchManager 副本拉取管理器。
dynamicConfigManager 动态配置管理器。
这个是在 2.7.x 版本之前的状态,在 2.8.x 之后版本进行了重构。
sealedtraitBrokerStates{def state:Byte}caseobjectNotRunningextendsBrokerStates{val state:Byte=0}caseobjectStartingextendsBrokerStates{val state:Byte=1}caseobjectRecoveringFromUncleanShutdownextendsBrokerStates{val state:Byte=2}caseobjectRunningAsBrokerextendsBrokerStates{val state:Byte=3}caseobjectPendingControlledShutdownextendsBrokerStates{val state:Byte=6}caseobjectBrokerShuttingDownextendsBrokerStates{val state:Byte=7}
NotRunning:初始状态,标识当前 broker 节点未运行。
Starting:标识当前 broker 节点正在启动中。
RecoveringFromUncleanShutdown:标识当前 broker 节点正在从上次非正常关闭中恢复。
RuningAsBroker:标识当前 broker 节点启动成功,可以对外提供服务。
PendingControlledShutdown:标识当前 broker 节点正在等待 controlled shutdown 操作完成。
BrokerShuttingDown:标识当前 broker 节点正在执行 shutdown 操作。
这些就是 KafkaServer 中主要模块的入口,接下来的文章会通过这些入口一一进行分析。
这里,我们一起来总结一下这篇文章的重点。
文章开头通过对「kafka-server-start.sh」内容进行剖析,引出了 「kafka.Kafka」类。
在「kafka.Kafka」的 main 方法中调用了「KafkaServerStartable」尝试启动 Kafka 服务器。
接着在 「KafkaServerStartable」的 startup 方法中调用了 「KafkaServer」的 startup 方法启动服务器需要的各种组件类。
下篇我们来深度剖析「Broker 启动集群如何感知」,大家期待,我们下期见。