Overview
For MQ I read Pulsar last time; this time let's look at Kafka, which has more stars. Because it relies on fewer external components (no BookKeeper), there is relatively more code, since it handles storage itself. Overall the source code (v2.5.0) feels fairly complex and is wrapped in many layers; for example, even System.exit() and addShutdownHook() go through Kafka's Exit wrapper.
This is a rough draft with no diagrams; I will fill it in later if I get the chance.
(Figure: Kafka message production flow; credit: flykinghg1)
startup
args
new KafkaConfig
- val NumPartitions = 1
- val LogDir = "/tmp/kafka-logs"
new KafkaServerStartable()
kafkaServerStartable.startup()
----
// thread-pool factory built on ScheduledThreadPoolExecutor (see the sketch after this list)
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
// periodic flush and cleanup of records/logs
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
LogManager -> loadLogs() -> loadLog() -> this.currentLogs.put(topicPartition, log)
(file access uses the java.nio.file API / file SPI package)
// Handles new connections, requests and responses to and from broker.
socketServer = new SocketServer(
socketServer.startup()
// transactions
transactionCoordinator = TransactionCoordinator()
transactionCoordinator.startup()
// the request-handling API layer
new KafkaApis()
new KafkaRequestHandlerPool()
// dynamic-config change listener
dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
ZooKeeperClientWatcher()
partitionsInitializing.replace(topicPartition, false, true) // set to true (now dirty); will be reloaded later
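KafkaScheduler above is essentially a thin wrapper over ScheduledThreadPoolExecutor for periodic jobs like log flush/cleanup. A minimal sketch of that shape; class and method names here are illustrative, not Kafka's actual API:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Construct first, start later: mirrors the "new X(); x.startup()" pattern
// that KafkaServer.startup() uses for every component.
public class SimpleScheduler {
    private final int threads;
    private ScheduledThreadPoolExecutor executor;

    public SimpleScheduler(int threads) {
        this.threads = threads;
    }

    public void startup() {
        executor = new ScheduledThreadPoolExecutor(threads);
        executor.setRemoveOnCancelPolicy(true);
    }

    // name is kept only to mirror KafkaScheduler.schedule's signature
    public void schedule(String name, Runnable task, long delayMs, long periodMs) {
        executor.scheduleAtFixedRate(task, delayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    public void shutdown() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
    }
}
```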
shutdown
// called from another thread (e.g. a shutdown hook)
kafkaServerStartable.shutdown (private var shutdownLatch = new CountDownLatch(1))
def shutdown(): Unit = {
  // ...
  shutdownLatch.countDown()
}
// main thread
// blocks until kafkaServerStartable.shutdown() is called (i.e. shutdownLatch reaches zero)
kafkaServerStartable.awaitShutdown()
def awaitShutdown(): Unit = shutdownLatch.await()
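A runnable toy version of this latch handshake (same single-count latch, simplified names):

```java
import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    public void shutdown() {
        // ... stop components ...
        shutdownLatch.countDown();   // release the main thread
    }

    public void awaitShutdown() throws InterruptedException {
        shutdownLatch.await();       // main thread parks here until countDown()
    }

    public static void main(String[] args) throws InterruptedException {
        LatchDemo server = new LatchDemo();
        Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown));
        server.awaitShutdown();      // blocks until Ctrl-C / kill triggers the hook
    }
}
```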
client
producer
ProducerConfig
// 1.
ConsoleProducer.main()
new ProducerConfig(args)
// 2. initialization; this is the KEY part
new KafkaProducer()
- this.accumulator = new RecordAccumulator() // the accumulator
- this.sender = new Sender(this.accumulator) // the flusher
- this.ioThread = new KafkaThread(ioThreadName, this.sender).start() // asynchronous flush
// 3.
new ProducerRecord(topic, msg.getBytes)
// 4.
send(producer, record).doSend()
// 5. get kafka cluster info/metadata
cluster = waitOnMetadata().cluster
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key())
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value())
if (partition == null) {
    // delegate to the configured Partitioner; each implementation starts from
    // allPartitions = cluster.partitionsForTopic(topic)
    partition = partition(record, serializedKey, serializedValue, cluster)
    1. DefaultPartitioner       // murmur2(keyBytes) % numPartitions
    2. RoundRobinPartitioner    // topicCounterMap.get(topic).incr() % numPartitions (similar to random % numPartitions)
    3. UniformStickyPartitioner // pick a partition per topic at random, then cache it; like DefaultPartitioner but with a per-topic sticky cache
}
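For the keyed path of DefaultPartitioner, the hash really is Utils.murmur2 over the serialized key, modulo the partition count. A hedged sketch of just that path as a standalone Partitioner (the real class also handles null keys via a sticky cache, omitted here):

```java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class KeyHashPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
            return 0; // the real DefaultPartitioner delegates to StickyPartitionCache here
        // toPositive masks the sign bit so the modulo never goes negative
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```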
// 6.
tp = new TopicPartition(record.topic(), partition)
// 7. Because the producer batches, every newly produced record is accumulated first.
// This part is asynchronous; the bridge is the **RecordAccumulator**: user threads append via RecordAccumulator.batches.append, while the Sender thread drains the accumulator and ships batches to the broker (a toy sketch follows this block).
// accumulate
result = RecordAccumulator.append(tp, serializedKey, serializedValue)
ProducerBatch(tp).tryAppend(key, value)
appendDefaultRecord(offset, timestamp, key, value, headers)
DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers)
offset = nextSequentialOffset() // = lastOffset == null ? baseOffset : lastOffset + 1
lastOffset = offset
// flush
sendProduceRequests(batches, now)
requestBuilder = ProduceRequest.Builder.forMagic()
clientRequest = client.newClientRequest(nodeId, requestBuilder)
client.send(clientRequest, now)
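The accumulate/flush bridge in a nutshell: producer threads append into a shared buffer; one daemon io thread drains it in batches. A toy sketch of the shape (not RecordAccumulator's real code; all names here are made up):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TinyAccumulator {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

    public void append(String record) {          // called from user threads
        buffer.offer(record);
    }

    public void startSender() {
        Thread io = new Thread(() -> {
            List<String> batch = new ArrayList<>();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String first = buffer.poll(100, TimeUnit.MILLISECONDS);
                    if (first == null) continue;
                    batch.add(first);
                    buffer.drainTo(batch, 15);   // batch up to 16 records
                    System.out.println("flush batch of " + batch.size());
                    batch.clear();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "tiny-sender");
        io.setDaemon(true);                      // mirrors KafkaThread's daemon flag
        io.start();
    }
}
```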
consumer
ConsumerConfig
// 1. ConsoleConsumer.main() -> new ConsumerConfig(args)
// 2. initialization: new KafkaConsumer()
- this.fetcher = new Fetcher(); ConsoleConsumer then wraps it all in new ConsumerWrapper(consumer) // mainly for its record iterator
- consumerInit() // binds topic partitions and the offset
- this.subscriptions.assignFromUser(new HashSet<>(partitions))
- this.subscriptions.seekUnvalidated(partition, newPosition); // set the offset
// 3. receive: consumer.receive() -> poll() -> records = pollForFetches()
- fetcher.sendFetches(); // send fetch requests to the cluster and stash the results in a buffer
- this.completedFetches.CompletedFetch.batches.currentBatch.streamingIterator()
- client.send(fetchTarget, request) // 网络, async
- fetcher.fetchedRecords()
- records = fetchRecords(nextInLineFetch, recordsRemaining)
- lastRecord = nextFetchedRecord()
- records = currentBatch.streamingIterator(decompressionBufferSupplier)
- DefaultRecordBatch() // read bytes from the fetch buffer filled earlier
- buffer = this.buffer.duplicate()
- new RecordIterator()
- record = records.next()
- DefaultRecordBatch() // read bytes from the fetch buffer filled earlier
- records = currentBatch.streamingIterator(decompressionBufferSupplier)
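For reference, the standard poll loop that drives this whole fetch path; broker address, topic, and group id below are placeholders:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                // poll() -> pollForFetches() -> sendFetches()/fetchedRecords()
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> r : records)
                    System.out.printf("p=%d offset=%d value=%s%n",
                            r.partition(), r.offset(), r.value());
            }
        }
    }
}
```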
transaction
from ExactlyOnceMessageProcessor.java
producer.initTransactions();
try {
    producer.beginTransaction();
    for (...) {
        producer.send(customizedRecord);
    }
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
} catch (KafkaException e) {
    producer.abortTransaction();
}
producer client send
- producer.initTransactions -> new InitProducerIdHandler(new InitProducerIdRequest(ApiKeys.INIT_PRODUCER_ID))
- producer.sendOffsetsToTransaction -> new AddOffsetsToTxnHandler(new AddOffsetsToTxnRequest(ApiKeys.ADD_OFFSETS_TO_TXN))
- producer.commitTransaction -> new EndTxnHandler(new EndTxnRequest(new EndTxnRequestData(result=commit, ApiKeys.END_TXN)))
- producer.abortTransaction -> new EndTxnHandler(new EndTxnRequest(new EndTxnRequestData(result=abort, ApiKeys.END_TXN)))
The caller thread and the Sender thread communicate through pendingRequests (sketched below).
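A sketch of that hand-off, loosely modeled on TransactionManager's priority queue; field and method names here are illustrative:

```java
import java.util.Comparator;
import java.util.PriorityQueue;

public class PendingRequests {
    static class TxnRequest {
        final int priority;       // e.g. INIT_PRODUCER_ID before END_TXN
        final String apiKey;
        TxnRequest(int priority, String apiKey) {
            this.priority = priority;
            this.apiKey = apiKey;
        }
    }

    private final PriorityQueue<TxnRequest> pendingRequests =
            new PriorityQueue<>(Comparator.comparingInt(r -> r.priority));

    // user thread: commitTransaction() etc. only enqueue work...
    public synchronized void enqueue(TxnRequest req) {
        pendingRequests.offer(req);
    }

    // ...the Sender thread dequeues and performs the network I/O
    public synchronized TxnRequest nextRequest() {
        return pendingRequests.poll();
    }
}
```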
broker server receive
- case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
- case ApiKeys.END_TXN => handleEndTxnRequest(request)
- case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
- txnCoordinator.handleEndTransaction()
- this is also asynchronous: a sendResponseCallback() puts the response onto the responseQueue, which processNewResponses() later drains
- preAppendResult
- txnManager.getTransactionState(transactionalId)
- txnMetadata.inLock { … }
- Left(Errors.CONCURRENT_TRANSACTIONS) // transactions under the same (transactionalId, partitionId) cannot run concurrently
- Ongoing
- PrepareCommit
- PrepareAbort
- txnMetadata.prepareAbortOrCommit(nextState, time.milliseconds())
- CompleteCommit
- CompleteAbort
- PrepareCommit
- PrepareAbort
- txnManager.appendTransactionToLog()
- sendTxnMarkersCallback()
- PrepareCommit
- PrepareAbort
- txnMetadata.prepareComplete()
- replicaManager.appendRecords()
- updateCacheCallback()
- metadata.completeTransitionTo(newMetadata)
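The callback-through-a-queue pattern from handleEndTransaction, as a tiny sketch: the handler thread registers a response on a queue instead of writing to the socket itself, and a network thread picks it up later. Names are illustrative:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ResponseQueueDemo {
    private final BlockingQueue<Runnable> responseQueue = new LinkedBlockingQueue<>();

    // handler thread: finish the request, then enqueue the send action
    public void sendResponseCallback(String response) {
        responseQueue.offer(() -> System.out.println("write to socket: " + response));
    }

    // network thread loop
    public void processNewResponses() throws InterruptedException {
        while (true) {
            Runnable send = responseQueue.take();
            send.run();
        }
    }
}
```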
As long as a commit has not fully completed, it can still be aborted. The abort is coordinated through the same transactionalId: what was previously a commit is switched to an abort, and the state behind txnManager.getTransactionState(transactionalId) must change accordingly, e.g. clearing any pending commit state: if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) {...}
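Condensing the transitions walked through above into one place (only the states these notes touch; see TransactionState in the real source for the full machine):

```java
import java.util.EnumSet;
import java.util.Set;

public enum TxnState {
    ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT;

    // legal next states for a commit/abort decision
    public Set<TxnState> validNext() {
        switch (this) {
            case ONGOING:        return EnumSet.of(PREPARE_COMMIT, PREPARE_ABORT);
            case PREPARE_COMMIT: return EnumSet.of(COMPLETE_COMMIT);
            case PREPARE_ABORT:  return EnumSet.of(COMPLETE_ABORT);
            default:             return EnumSet.noneOf(TxnState.class);
        }
    }
}
```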
what can I learn from this project
- Construct the object first, then call its methods; the methods can then use the constructor arguments directly, e.g.
  tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient); tokenManager.startup()
- Interfaces/abstract classes are a quick way to survey the architecture and its components.
- Work backwards from a return value to find how it is produced and its call chain.
- When some logic in the source code is unclear, Google around for inspiration.