经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Apache Kafka » 查看文章
KafkaProducer源码分析
来源:cnblogs  作者:每天晒白牙  时间:2019/9/16 9:00:25  对本文有异议

Kafka常用术语

Broker:Kafka的服务端即Kafka实例,Kafka集群由一个或多个Broker组成,主要负责接收和处理客户端的请求

Topic:主题,Kafka承载消息的逻辑容器,每条发布到Kafka的消息都有对应的逻辑容器,工作中多用于区分业务

Partition:分区,是物理概念,代表有序不变的消息序列,每个Topic由一个或多个Partion组成

Replica:副本,Kafka中同一条消息拷贝到多个地方做数据冗余,这些地方就是副本,副本分为Leader和Follower,角色不同作用不同,副本是对Partition而言的,每个分区可配置多个副本来实现高可用

Record:消息,Kafka处理的对象

Offset:消息位移,分区中每条消息的位置信息,是单调递增且不变的值

Producer:生产者,向主题发送新消息的应用程序

Consumer:消费者,从主题订阅新消息的应用程序

Consumer Offset:消费者位移,记录消费者的消费进度,每个消费者都有自己的消费者位移

Consumer Group:消费者组,多个消费者组成一个消费者组,同时消费多个分区来实现高可用(组内消费者的个数不能多于分区个数以免浪费资源

Reblance:重平衡,消费组内消费者实例数量变更后,其他消费者实例自动重新分配订阅主题分区的过程

下面用一张图展示上面提到的部分概念(用PPT画的图,太费劲了,画了老半天,有好用的画图工具欢迎推荐)

file

消息生产流程

先来个KafkaProducer的小demo

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. if (args.length != 2) {
  3. throw new IllegalArgumentException("usage: com.ding.KafkaProducerDemo bootstrap-servers topic-name");
  4. }
  5. Properties props = new Properties();
  6. // kafka服务器ip和端口,多个用逗号分割
  7. props.put("bootstrap.servers", args[0]);
  8. // 确认信号配置
  9. // ack=0 代表producer端不需要等待确认信号,可用性最低
  10. // ack=1 等待至少一个leader成功把消息写到log中,不保证follower写入成功,如果leader宕机同时follower没有把数据写入成功
  11. // 消息丢失
  12. // ack=all leader需要等待所有follower成功备份,可用性最高
  13. props.put("ack", "all");
  14. // 重试次数
  15. props.put("retries", 0);
  16. // 批处理消息的大小,批处理可以增加吞吐量
  17. props.put("batch.size", 16384);
  18. // 延迟发送消息的时间
  19. props.put("linger.ms", 1);
  20. // 用来换出数据的内存大小
  21. props.put("buffer.memory", 33554432);
  22. // key 序列化方式
  23. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  24. // value 序列化方式
  25. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  26. // 创建KafkaProducer对象,创建时会启动Sender线程
  27. Producer<String, String> producer = new KafkaProducer<>(props);
  28. for (int i = 0; i < 100; i++) {
  29. // 往RecordAccumulator中写消息
  30. Future<RecordMetadata> result = producer.send(new ProducerRecord<>(args[1], Integer.toString(i), Integer.toString(i)));
  31. RecordMetadata rm = result.get();
  32. System.out.println("topic: " + rm.topic() + ", partition: " + rm.partition() + ", offset: " + rm.offset());
  33. }
  34. producer.close();
  35. }

实例化

KafkaProducer构造方法主要是根据配置文件进行一些实例化操作

1.解析clientId,若没有配置则由是producer-递增的数字

2.解析并实例化分区器partitioner,可以实现自己的partitioner,比如根据key分区,可以保证相同key分到同一个分区,对保证顺序很有用。若没有指定分区规则,采用默认的规则(消息有key,对key做hash,然后对可用分区取模;若没有key,用随机数对可用分区取模【没有key的时候说随机数对可用分区取模不准确,counter值初始值是随机的,但后面都是递增的,所以可以算到roundrobin】)

3.解析key、value的序列化方式并实例化

4.解析并实例化拦截器

5.解析并实例化RecordAccumulator,主要用于存放消息(KafkaProducer主线程往RecordAccumulator中写消息,Sender线程从RecordAccumulator中读消息并发送到Kafka中)

6.解析Broker地址

7.创建一个Sender线程并启动

  1. ...
  2. this.sender = newSender(logContext, kafkaClient, this.metadata);
  3. this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
  4. this.ioThread.start();
  5. ...

消息发送流程

消息的发送入口是KafkaProducer.send方法,主要过程如下

  1. KafkaProducer.send
  2. KafkaProducer.doSend
  3. // 获取集群信息
  4. KafkaProducer.waitOnMetadata
  5. // key/value序列化
  6. key\value serialize
  7. // 分区
  8. KafkaProducer.partion
  9. // 创建TopciPartion对象,记录消息的topic和partion信息
  10. TopicPartition
  11. // 写入消息
  12. RecordAccumulator.applend
  13. // 唤醒Sender线程
  14. Sender.wakeup

RecordAccumulator

RecordAccumulator是消息队列用于缓存消息,根据TopicPartition对消息分组

重点看下RecordAccumulator.applend追加消息的流程

  1. // 记录进行applend的线程数
  2. appendsInProgress.incrementAndGet();
  1. // 根据TopicPartition获取或新建Deque双端队列
  2. Deque<ProducerBatch> dq = getOrCreateDeque(tp);
  3. ...
  4. private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
  5. Deque<ProducerBatch> d = this.batches.get(tp);
  6. if (d != null)
  7. return d;
  8. d = new ArrayDeque<>();
  9. Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
  10. if (previous == null)
  11. return d;
  12. else
  13. return previous;
  14. }
  1. // 尝试将消息加入到缓冲区中
  2. // 加锁保证同一个TopicPartition写入有序
  3. synchronized (dq) {
  4. if (closed)
  5. throw new KafkaException("Producer closed while send in progress");
  6. // 尝试写入
  7. RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
  8. if (appendResult != null)
  9. return appendResult;
  10. }
  1. private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
  2. // 从双端队列的尾部取出ProducerBatch
  3. ProducerBatch last = deque.peekLast();
  4. if (last != null) {
  5. // 取到了,尝试添加消息
  6. FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
  7. // 空间不够,返回null
  8. if (future == null)
  9. last.closeForRecordAppends();
  10. else
  11. return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
  12. }
  13. // 取不到返回null
  14. return null;
  15. }
  1. public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
  2. // 空间不够,返回null
  3. if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
  4. return null;
  5. } else {
  6. // 真正添加消息
  7. Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
  8. ...
  9. FutureRecordMetadata future = ...
  10. // future和回调callback进行关联
  11. thunks.add(new Thunk(callback, future));
  12. ...
  13. return future;
  14. }
  15. }
  1. // 尝试applend失败(返回null),会走到这里。如果tryApplend成功直接返回了
  2. // 从BufferPool中申请内存空间,用于创建新的ProducerBatch
  3. buffer = free.allocate(size, maxTimeToBlock);
  1. synchronized (dq) {
  2. // 注意这里,前面已经尝试添加失败了,且已经分配了内存,为何还要尝试添加?
  3. // 因为可能已经有其他线程创建了ProducerBatch或者之前的ProducerBatch已经被Sender线程释放了一些空间,所以在尝试添加一次。这里如果添加成功,后面会在finally中释放申请的空间
  4. RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
  5. if (appendResult != null) {
  6. return appendResult;
  7. }
  8. // 尝试添加失败了,新建ProducerBatch
  9. MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
  10. ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
  11. FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
  12. dq.addLast(batch);
  13. incomplete.add(batch);
  14. // 将buffer置为null,避免在finally汇总释放空间
  15. buffer = null;
  16. return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
  17. }
  1. finally {
  2. // 最后如果再次尝试添加成功,会释放之前申请的内存(为了新建ProducerBatch)
  3. if (buffer != null)
  4. free.deallocate(buffer);
  5. appendsInProgress.decrementAndGet();
  6. }
  1. // 将消息写入缓冲区
  2. RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);
  3. if (result.batchIsFull || result.newBatchCreated) {
  4. // 缓冲区满了或者新创建的ProducerBatch,唤起Sender线程
  5. this.sender.wakeup();
  6. }
  7. return result.future;

Sender发送消息线程

主要流程如下

  1. Sender.run
  2. Sender.runOnce
  3. Sender.sendProducerData
  4. // 获取集群信息
  5. Metadata.fetch
  6. // 获取可以发送消息的分区且已经获取到了leader分区的节点
  7. RecordAccumulator.ready
  8. // 根据准备好的节点信息从缓冲区中获取topicPartion对应的Deque队列中取出ProducerBatch信息
  9. RecordAccumulator.drain
  10. // 将消息转移到每个节点的生产请求队列中
  11. Sender.sendProduceRequests
  12. // 为消息创建生产请求队列
  13. Sender.sendProducerRequest
  14. KafkaClient.newClientRequest
  15. // 下面是发送消息
  16. KafkaClient.sent
  17. NetWorkClient.doSent
  18. Selector.send
  19. // 其实上面并不是真正执行I/O,只是写入到KafkaChannel中
  20. // poll 真正执行I/O
  21. KafkaClient.poll

通过源码分析下Sender线程的主要流程

KafkaProducer的构造方法在实例化时启动一个KafkaThread线程来执行Sender

  1. // KafkaProducer构造方法启动Sender
  2. String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
  3. this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
  4. this.ioThread.start();
  1. // Sender->run()->runOnce()
  2. long currentTimeMs = time.milliseconds();
  3. // 发送生产的消息
  4. long pollTimeout = sendProducerData(currentTimeMs);
  5. // 真正执行I/O操作
  6. client.poll(pollTimeout, currentTimeMs);
  1. // 获取集群信息
  2. Cluster cluster = metadata.fetch();
  1. // 获取准备好可以发送消息的分区且已经获取到leader分区的节点
  2. RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
  3. // ReadyCheckResult 包含可以发送消息且获取到leader分区的节点集合、未获取到leader分区节点的topic集合
  4. public final Set<Node> 的节点;
  5. public final long nextReadyCheckDelayMs;
  6. public final Set<String> unknownLeaderTopics;

ready方法主要是遍历在上面介绍RecordAccumulator添加消息的容器,Map<TopicPartition, Deque>,从集群信息中根据TopicPartition获取leader分区所在节点,找不到对应leader节点但有要发送的消息的topic添加到unknownLeaderTopics中。同时把那些根据TopicPartition可以获取leader分区且消息满足发送的条件的节点添加到的节点中

  1. // 遍历batches
  2. for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
  3. TopicPartition part = entry.getKey();
  4. Deque<ProducerBatch> deque = entry.getValue();
  5. // 根据TopicPartition从集群信息获取leader分区所在节点
  6. Node leader = cluster.leaderFor(part);
  7. synchronized (deque) {
  8. if (leader == null && !deque.isEmpty()) {
  9. // 添加未找到对应leader分区所在节点但有要发送的消息的topic
  10. unknownLeaderTopics.add(part.topic());
  11. } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
  12. ....
  13. if (sendable && !backingOff) {
  14. // 添加准备好的节点
  15. readyNodes.add(leader);
  16. } else {
  17. ...
  18. }

然后对返回的unknownLeaderTopics进行遍历,将topic加入到metadata信息中,调用metadata.requestUpdate方法请求更新metadata信息

  1. for (String topic : result.unknownLeaderTopics)
  2. this.metadata.add(topic);
  3. result.unknownLeaderTopics);
  4. this.metadata.requestUpdate();

对已经准备好的节点进行最后的检查,移除那些节点连接没有就绪的节点,主要根据KafkaClient.ready方法进行判断

  1. Iterator<Node> iter = result.readyNodes.iterator();
  2. long notReadyTimeout = Long.MAX_VALUE;
  3. while (iter.hasNext()) {
  4. Node node = iter.next();
  5. // 调用KafkaClient.ready方法验证节点连接是否就绪
  6. if (!this.client.ready(node, now)) {
  7. // 移除没有就绪的节点
  8. iter.remove();
  9. notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
  10. }
  11. }

下面开始创建生产消息的请求

  1. // 从RecordAccumulator中取出TopicPartition对应的Deque双端队列,然后从双端队列头部取出ProducerBatch,作为要发送的信息
  2. Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);

把消息封装成ClientRequest

  1. ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,requestTimeoutMs, callback);

调用KafkaClient发送消息(并非真正执行I/O),涉及到KafkaChannel。Kafka的通信采用的是NIO方式

  1. // NetworkClient.doSent方法
  2. String destination = clientRequest.destination();
  3. RequestHeader header = clientRequest.makeHeader(request.version());
  4. ...
  5. Send send = request.toSend(destination, header);
  6. InFlightRequest inFlightRequest = new InFlightRequest(clientRequest,header,isInternalRequest,request,send,now);
  7. this.inFlightRequests.add(inFlightRequest);
  8. selector.send(send);
  9. ...
  10. // Selector.send方法
  11. String connectionId = send.destination();
  12. KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
  13. if (closingChannels.containsKey(connectionId)) {
  14. this.failedSends.add(connectionId);
  15. } else {
  16. try {
  17. channel.setSend(send);
  18. ...

到这里,发送消息的工作准备的差不多了,调用KafkaClient.poll方法,真正执行I/O操作

  1. client.poll(pollTimeout, currentTimeMs);

用一张图总结Sender线程的流程

file

通过上面的介绍,我们梳理出了Kafka生产消息的主要流程,涉及到主线程往RecordAccumulator中写入消息,同时后台的Sender线程从RecordAccumulator中获取消息,使用NIO的方式把消息发送给Kafka,用一张图总结

file

后记

这是本公众号第一次尝试写源码相关的文章,说实话真不知道该如何下笔,代码截图、贴整体代码等感觉都被我否定了,最后采用了这种方式,介绍主要流程,把无关代码省略,配合流程图。

上周参加了华为云kafka实战课程,简单看了下kafka的生产和消费代码,想简单梳理下,然后在周日中午即8.17开始阅读源码,梳理流程,一直写到了晚上12点多,还剩一点没有完成,周一早晨早起完成了这篇文章。当然这篇文章忽略了很多更细节的东西,后面会继续深入,勇于尝试,不断精进,加油!

参考资料

华为云实战

极客时间kafka专栏

原文链接:http://www.cnblogs.com/dingaimin/p/11523787.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号