apache kafka: points of interest

Overview

Last time I looked at Pulsar on the MQ side; this time it's Kafka, which has more stars. Because it depends on fewer external components (no BookKeeper), more of the code lives in Kafka itself. Overall the source code (v2.5.0) feels fairly complex, with many layers of wrapping.

For example, System.exit() and addShutdownHook() are both wrapped.

This is a rough draft without diagrams; I will fill it in later when I get the chance.

image

Kafka message production flow, credit: flykinghg [1]

startup

args 
new KafkaConfig
  - val NumPartitions = 1
  - val LogDir = "/tmp/kafka-logs"

new KafkaServerStartable()
kafkaServerStartable.startup()

----
// thread-pool scheduler for periodic background tasks (backed by ScheduledThreadPoolExecutor; see the sketch at the end of this startup listing)
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)

// periodic flush and cleanup of records/logs
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
LogManager -> loadLogs() -> loadLog() -> this.currentLogs.put(topicPartition, log)
// log directories are handled through the java.nio.file (and java.nio.file.spi) APIs

// Handles new connections, requests and responses to and from broker.
socketServer = new SocketServer(
socketServer.startup()

// transactions
transactionCoordinator = TransactionCoordinator()
transactionCoordinator.startup()

// request handling: KafkaApis dispatches API requests, KafkaRequestHandlerPool provides the handler threads
new KafkaApis()
new KafkaRequestHandlerPool()

// dynamic config listener
dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
ZooKeeperClientWatcher()
partitionsInitializing.replace(topicPartition, false, true) // mark as true (dirty), will be reloaded later
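
Picking up the KafkaScheduler note above: it is essentially a thin wrapper around ScheduledThreadPoolExecutor that components such as LogManager use to register their periodic flush/cleanup jobs. A minimal sketch of the same pattern, with made-up names (SimpleScheduler is not a Kafka class):

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Toy stand-in for KafkaScheduler: a fixed-size scheduled pool for background jobs.
class SimpleScheduler {
    private final ScheduledThreadPoolExecutor executor;

    SimpleScheduler(int backgroundThreads) {
        this.executor = new ScheduledThreadPoolExecutor(backgroundThreads);
        this.executor.setRemoveOnCancelPolicy(true);
    }

    // Analogous to how LogManager registers its periodic log-flush / log-retention jobs.
    void schedule(String name, Runnable task, long delayMs, long periodMs) {
        executor.scheduleAtFixedRate(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                System.err.println("scheduled task " + name + " failed: " + t);
            }
        }, delayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
    }
}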

shutdown

// one thread
kafkaServerStartable.shutdown (private var shutdownLatch = new CountDownLatch(1))

def shutdown(): Unit = {}

shutdownLatch.countDown()
// main thread
// blocks until kafkaServerStartable.shutdown() is called (i.e. shutdownLatch reaches zero)
kafkaServerStartable.awaitShutdown()

def awaitShutdown(): Unit = shutdownLatch.await()
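
A minimal Java sketch of this latch-based pattern: the main thread parks on the latch while a shutdown hook (or another thread) counts it down. Server here is a made-up stand-in for KafkaServerStartable:

import java.util.concurrent.CountDownLatch;

// Minimal stand-in for the KafkaServerStartable shutdown flow.
class Server {
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    void startup() {
        System.out.println("server started");
    }

    // called from a shutdown hook / another thread
    void shutdown() {
        System.out.println("shutting down");
        shutdownLatch.countDown();
    }

    // main thread blocks here until shutdown() is called
    void awaitShutdown() throws InterruptedException {
        shutdownLatch.await();
    }

    public static void main(String[] args) throws InterruptedException {
        Server server = new Server();
        Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown));
        server.startup();
        server.awaitShutdown();
    }
}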

image

client

image


producer

image

ProducerConfig

// 0.
ConsoleProducer.main()
new ProducerConfig(args)

// 0. initialization, the key part
new KafkaProducer()
  - this.accumulator = new RecordAccumulator() // accumulator: buffers records into batches
  - this.sender = new Sender(this.accumulator) // sender: drains and flushes the batches
    - this.ioThread = new KafkaThread(ioThreadName, this.sender).start() // asynchronous flush on an I/O thread

// 0.
new ProducerRecord(topic, msg.getBytes)

// 0.
send(producer, record).doSend()

// 0. get kafka cluster info/metadata
cluster = waitOnMetadata().cluster
serializedKey   = keySerializer.serialize(record.topic(), record.headers(), record.key())
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value())

if (partition == null) {
    // no explicit partition on the record: let the configured partitioner pick one
    partition = partition(record, serializedKey, serializedValue, cluster)
    // inside the partitioner:
    allPartitions = cluster.partitionsForTopic(topic)

    1. DefaultPartitioner       // murmur2(key) % numPartitions
    2. RoundRobinPartitioner    // topicCounterMap.get(topic).incr() % numPartitions (roughly random % numPartitions)
    3. UniformStickyPartitioner // picks a partition per topic, at random, then caches it; like DefaultPartitioner's sticky path but driven by the topic only
}
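
To make the partitioner contract concrete, here is a toy Partitioner that hashes the key bytes modulo the partition count. It uses a plain array hash rather than Kafka's murmur2, and HashModPartitioner is a made-up name; it only illustrates the shape of the interface, not DefaultPartitioner itself:

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Toy partitioner: keyed records go to hash(key) % numPartitions,
// unkeyed records go to a random partition.
public class HashModPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
            return ThreadLocalRandom.current().nextInt(numPartitions);
        // Math.floorMod keeps the result non-negative even if the hash is negative.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    @Override
    public void close() {}
}

A custom implementation like this is plugged in through the partitioner.class producer config.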

// 0.
tp = new TopicPartition(record.topic(), partition)

// 0. Because sending is batched, every newly produced record is accumulated first
// This part is asynchronous, and the bridge is the **RecordAccumulator**: producer threads append via RecordAccumulator.batches.append, while the Sender thread drains the accumulator and ships it out (a toy sketch of this pattern appears at the end of this producer section).
// accumulate
result = RecordAccumulator.append(tp, serializedKey, serializedValue)
ProducerBatch(tp).tryAppend(key, value)

appendDefaultRecord(offset, timestamp, key, value, headers)
DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers)

offset = nextSequentialOffset(): Long{
    lastOffset == null ? baseOffset : lastOffset + 1
}
lastOffset = offset;

// flush
sendProduceRequests(batches, now)
requestBuilder = ProduceRequest.Builder.forMagic()
clientRequest = client.newClientRequest(nodeId, requestBuilder)
client.send(clientRequest, now)
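
A toy sketch of the accumulator/sender bridge mentioned above, with made-up class names and a single queue of string "batches"; the real RecordAccumulator keeps per-partition deques of ProducerBatch and far more bookkeeping:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Producer threads append records; a background "sender" thread drains and ships them.
class ToyAccumulator {
    private final BlockingQueue<String> batches = new LinkedBlockingQueue<>();

    void append(String record) {                                // called by user threads (the doSend path)
        batches.add(record);
    }

    String drain(long timeoutMs) throws InterruptedException {  // called by the sender thread
        return batches.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }
}

class ToySender implements Runnable {
    private final ToyAccumulator accumulator;
    private volatile boolean running = true;

    ToySender(ToyAccumulator accumulator) { this.accumulator = accumulator; }

    @Override
    public void run() {
        try {
            while (running) {
                String batch = accumulator.drain(100);
                if (batch != null)
                    System.out.println("sending: " + batch);    // stands in for client.send(...)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void shutdown() { running = false; }
}

Starting the sender with new Thread(new ToySender(accumulator), "toy-io-thread").start() mirrors how KafkaProducer spins up a KafkaThread for its Sender.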

consumer

image

ConsumerConfig

// 0.
ConsoleConsumer.main()
new ConsumerConfig(args)

// 0. initialization
new KafkaConsumer()

  • this.fetcher = new Fetcher()
  • new ConsumerWrapper(consumer) // mainly for its recordIter
  • consumerInit() // bind topic partitions and offsets
  • this.subscriptions.assignFromUser(new HashSet<>(partitions))
  • this.subscriptions.seekUnvalidated(partition, newPosition); // set the offset

// 0. receive
msg = consumer.receive().poll()
records = pollForFetches()

  • fetcher.sendFetches(); // send fetch requests to the cluster and stash the results in a buffer
    • this.completedFetches.CompletedFetch.batches.currentBatch.streamingIterator()
    • client.send(fetchTarget, request) // network, async
  • fetcher.fetchedRecords()
    • records = fetchRecords(nextInLineFetch, recordsRemaining)
    • lastRecord = nextFetchedRecord()
      • records = currentBatch.streamingIterator(decompressionBufferSupplier)
        • DefaultRecordBatch() // reads bytes from the previously fetched buffer
          • buffer = this.buffer.duplicate()
          • new RecordIterator()
        • record = records.next()
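
All of the Fetcher machinery above sits behind an ordinary poll loop on the public API. A minimal sketch, assuming a broker on localhost:9092 and a topic named test (both made up for the example):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test")); // assumed topic
            while (true) {
                // poll() drives sendFetches()/fetchedRecords() internally
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("%s-%d@%d: %s%n",
                            record.topic(), record.partition(), record.offset(), record.value());
            }
        }
    }
}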

transaction

from ExactlyOnceMessageProcessor.java

producer.initTransactions();
try {
  producer.beginTransaction();
  for (ConsumerRecord<?, ?> record : records) { // records: the batch returned by consumer.poll()
    producer.send(customizedRecord);            // customizedRecord: the transformed output record
  }
  producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
  producer.commitTransaction();
} catch (KafkaException e) {
  producer.abortTransaction();
}

producer client send

  1. producer.sendOffsetsToTransaction -> new AddOffsetsToTxnHandler(new AddOffsetsToTxnRequest(ApiKeys.ADD_OFFSETS_TO_TXN))

  2. producer.commitTransaction -> new EndTxnHandler(new EndTxnRequest(new EndTxnRequestData(id=commit, ApiKeys.END_TXN)))
    • new InitProducerIdHandler(new InitProducerIdRequest(ApiKeys.INIT_PRODUCER_ID))
  3. producer.abortTransaction -> new EndTxnHandler(new EndTxnRequest(new EndTxnRequestData(id=abort, ApiKeys.END_TXN)))
    • new InitProducerIdHandler(new InitProducerIdRequest(ApiKeys.INIT_PRODUCER_ID))

The client side coordinates all of these handlers through pendingRequests.

broker server receive

  • case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
  • case ApiKeys.END_TXN => handleEndTxnRequest(request)
  • case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
  1. txnCoordinator.handleEndTransaction()
    • This is asynchronous as well: sendResponseCallback() puts the response onto the responseQueue, which is later drained by processNewResponses()
  2. preAppendResult
    • txnManager.getTransactionState(transactionalId)
    • txnMetadata.inLock { … }
      • Left(Errors.CONCURRENT_TRANSACTIONS) // transactions under the same (transactionalId, partitionId) cannot run in parallel
      • Ongoing
        • PrepareCommit
        • PrepareAbort
        • txnMetadata.prepareAbortOrCommit(nextState, time.milliseconds())
    • CompleteCommit
    • CompleteAbort
    • PrepareCommit
    • PrepareAbort
  3. txnManager.appendTransactionToLog()
    • sendTxnMarkersCallback()
    • PrepareCommit
    • PrepareAbort
    • txnMetadata.prepareComplete() -> replicaManager.appendRecords() -> updateCacheCallback()
      • metadata.completeTransitionTo(newMetadata)

As long as a commit has not fully completed, there is still a way to abort it. The abort goes through the same transactionalId: what was previously a commit is switched to an abort, and the state behind txnManager.getTransactionState(transactionalId) has to change along with it, e.g. clearing the previously pending commit: if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) {...}
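
A simplified sketch of the state machine the coordinator walks through (states taken from the listing above; the transition table here is trimmed and TxnState is a made-up name, not the full TransactionMetadata logic):

import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

// Trimmed-down transaction state machine: each state lists the states it may
// legally be entered from, mirroring the Ongoing -> Prepare* -> Complete* flow.
enum TxnState {
    EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT;

    private static final Map<TxnState, EnumSet<TxnState>> VALID_PREVIOUS = new EnumMap<>(TxnState.class);
    static {
        VALID_PREVIOUS.put(EMPTY, EnumSet.of(COMPLETE_COMMIT, COMPLETE_ABORT));
        VALID_PREVIOUS.put(ONGOING, EnumSet.of(EMPTY, ONGOING, COMPLETE_COMMIT, COMPLETE_ABORT));
        VALID_PREVIOUS.put(PREPARE_COMMIT, EnumSet.of(ONGOING));
        VALID_PREVIOUS.put(PREPARE_ABORT, EnumSet.of(ONGOING));
        VALID_PREVIOUS.put(COMPLETE_COMMIT, EnumSet.of(PREPARE_COMMIT));
        VALID_PREVIOUS.put(COMPLETE_ABORT, EnumSet.of(PREPARE_ABORT));
    }

    // Guard transitions: e.g. COMPLETE_COMMIT cannot jump straight to PREPARE_ABORT.
    TxnState transitionTo(TxnState next) {
        if (!VALID_PREVIOUS.get(next).contains(this))
            throw new IllegalStateException("cannot go from " + this + " to " + next);
        return next;
    }
}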


what can i learn from this project

  • Construct the class first, then call its methods; that way the methods can use the constructor arguments directly (a small sketch follows this list)
    tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
    tokenManager.startup()

  • Interfaces/abstract classes are a quick way to survey the architecture and its components
  • Start from a return value and trace backwards to find how it is produced (its call chain)
  • When some logic in the source code is not clear, Googling around for inspiration helps [3]
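
A tiny illustration of that first point, with made-up names (this is not Kafka's DelegationTokenManager): the dependencies passed to the constructor become fields, so startup() and later methods can use them without re-passing anything:

// Dependencies are injected once via the constructor; startup() just uses the fields.
class TokenManagerSketch {
    private final String config;
    private final long startTimeMs;

    TokenManagerSketch(String config, long startTimeMs) {
        this.config = config;
        this.startTimeMs = startTimeMs;
    }

    void startup() {
        // no arguments needed: everything was captured at construction time
        System.out.println("starting with config=" + config + " at " + startTimeMs);
    }
}

// usage mirrors the broker code: new first, then startup()
// TokenManagerSketch tm = new TokenManagerSketch("token.conf", System.currentTimeMillis());
// tm.startup();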

References

  1. kafka生产者的蓄水池机制 (the Kafka producer's accumulator / "reservoir" mechanism)
  2. apache pulsar关注点 (Apache Pulsar: points of interest)
  3. Kafka 事务实现原理 (how Kafka transactions are implemented)