您好!欢迎来到爱源码

爱源码

热门搜索: 抖音快手短视频下载   

Kafka Adventure-制作人源代码分析-核心组件 《企业网站源码》

  • 时间:2022-07-07 01:21 编辑: 来源: 阅读:316
  • 扫一扫,手机访问
摘要:Kafka Adventure-制作人源代码分析-核心组件 《企业网站源码》
在序言中,我们说Kafka是一个消息队列,但实际上,更准确地说,它是Broker的核心组件。 为什么这么说?你会发现,我们可以通过控制台,Java代码,C++代码,甚至Socket来给Broker写消息。只要我们遵循卡夫卡写消息的协议,我们就可以向卡夫卡队列发送消息。 Kafka用专业的方式定义了应用层网络协议。只要我们基于传输层构造符合这个协议的数据,就是合法的Kafka消息。 所以image说我们只写Kafka消息给一个生产者的客户端,它的形式多种多样,有Java,Python,C++等实现。那么,我们是否需要在每次发送消息时,自己实现这一套消息发送协议呢?显然,卡夫卡官方已经考虑到了这个问题。为了给我们提供一个可以开箱即用的消息队列,官方帮助我们用各种语言编写了高质量的生产者实现,比如我们今天要讨论的Java版本。 如前所述,卡夫卡已经帮助我们实现了各种版本的生产者代码。事实上,他可能根本不会提供这个代码。既然实现了核心队列功能,那么客户端代码也完全可以由客户来实现。 那么如果没有官方代码,我们应该实现什么功能,什么接口,什么方法,如何组织这些代码呢? 带着这样的疑问一起来思考吧!一般这种有数据流的设计谁会出?什么数据?去哪里?如何保证可靠访问?考虑这些方面。 自然,消息由应用程序构造并提供给生产者。生产者必须首先知道消息需要发送到哪个代理的哪个主题,以及主题的具体分区。 然后需要配置客户端的Broker集群地址,要发送的主题的名称,消息的分区策略,是通过key hash分配给特定的分区还是不同的分区。 知道了消息的去向,还需要知道发送的是什么格式的消息,是字符串还是数字还是序列化的二进制对象。 消息的序列化需要将消息序列化为字节数组,以便于在网络上传输。因此,配置生产者的消息序列化策略,最好是通过传递枚举或类名来自动构造序列化器,以便于后续序列化过程的扩展。 消息队列经常用于多个系统之间的异步调用,所以这种调用关系没有很强的实时依赖性。 因为向Kafka发送消息会产生网络I/O,比较耗费时间,那么除了同步调用之外,是否可以将消息发送动作设置为异步,以提高生产者的吞吐量? 并且对于大量的消息发送场景,我们可以设置一个窗口,可以是时间维度,也可以是消息数量维度,将消息累加,批量发送,减少网络I/O数量,提高吞吐量。 最后,为了保证消息能够最大程度的成功发送到代理,我们还需要少量的失败重试机制,比如在失败后将其放入重试队列,每隔一段时间再尝试发送一次。 此外,这里还整理了少量的Kakfa笔记,以理清我们的思路。通过上面的分析,我们会有一个大概的了解,应该采用什么方法,基础设计会分成哪些部分。 但是不够清楚,不够明确。 首先,总结实现客户端的要点:配置Broker的基本信息:集群地址、主题、分区消息序列化,通过可扩展序列化器实现消息异步写入缓冲区,实现网络I/O线程发送消息失败重试机制。画出每个核心模块以及它们之间的交互顺序:image客户设置Kafka集群信息,producer从Kafka Broker拉取可用Kafka节点、主题、分区的对应关系。 缓存在生产者成员变量中,如果代理集群被扩展,或者如果代理有一台机器离线,它需要重新获取这些服务信息。 客户端根据客户端设置的序列化程序序列化消息,然后将消息异步写入客户端缓冲区。 在缓冲区中的消息数量达到某个数量或某个时间窗口后,网络I/O线程从缓冲区中取出消息,并将它们发送给代理。 这是我对一个卡夫卡制作人的实现的思考。接下来我们来看看官方的代码设计和我们的思维有什么不同,他为什么要这么设计。 其实经过以上的思考和整理,我们的设计已经非常接近卡夫卡的官方设计,官方模块更加细化,功能更加独立。 核心组件首先看一下KafkaProducer类中的成员变量,它们是Producer的核心组件。 image的核心字段解释如下:clinetId:标识发送方Idmetric:统计指示符partitioner: Partitioner用于确定消息发送到哪个分区。 如果有键,就按照键的哈希;否则使用roundrobinkey/value Serializer:Message key/value Serializer interceptors:发送前后消息的统一解maxRequestSize:可以发送的最大消息。默认值为1M,这会影响消息记录的大小。这个值在服务器端也是有限的。 MaxBlockTimems:当缓冲区已满或等待元数据信息时,超时的补偿机制Accumulator:Accumulator Network client:Packed网络层sender:当网络I/O线程发送消息时,数据是如何在这些组件之间流动的?imageProducer调用send方法后,当从Broker获取的元数据有效时,经过分块和序列化后,由分区放在一个缓冲区的特定位置。缓冲区由ConcurrentHashMap组成,键是subject分区,值是用于存储消息的deque缓存块。 站在用户的角度,如果不需要关心发送结果,发送过程就结束了。 接下来,独立的发送方线程负责从缓冲区获取足够的数据,并调用网络客户端封装层来实际发送数据。这里使用Java8的NIO网络模型来发送数据。 可以看出,整个逻辑的关键点在于RecordAccumulator如何缓存消息。一般来说,成熟的框架和中间件都有自己的内存管理机制。例如,Netty也有一个复杂而精致的内存管理通用层。这里的缓冲区也是如此。主要是要看卡夫卡是怎么管理记忆的。 此外,我们还需要注意发送方使用什么样的逻辑从缓冲区获取数据,从而实现尽可能少的网络交互,发送尽可能多的数据。 以及网络出现故障时如何保证数据的可靠性。 这个地方也是我们设计和官方实现的差距,网络I/O的精心优化。 目前来看,已经挺长了。为了阅读方便,本文主要思考如何设计一个卡夫卡制作人,以及官方是如何实现的。我们之间的差距在哪里,应该多注意些什么? 通过自己的思考和比较,认识到自己的不足,学习新的点!作者:徐碧碧参考链接:https://www.cnblogs.com/lwen/p/14300608.html


  • 全部评论(0)
资讯详情页最新发布上方横幅
最新发布的资讯信息
【域名/主机/服务器|】qq邮箱提醒在哪里打开(2024-06-04 18:58)
【技术支持|常见问题】1556原创ng8文章搜索页面不齐(2024-05-01 14:43)
【技术支持|常见问题】1502企业站群-多域名跳转-多模板切换(2024-04-09 12:19)
【技术支持|常见问题】1126完美滑屏版视频只能显示10个(2024-03-29 13:37)
【技术支持|常见问题】响应式自适应代码(2024-03-24 14:23)
【技术支持|常见问题】1126完美滑屏版百度未授权使用地图api怎么办(2024-03-15 07:21)
【技术支持|常见问题】如何集成阿里通信短信接口(2024-02-19 21:48)
【技术支持|常见问题】算命网微信支付宝产品名称年份在哪修改?风水姻缘合婚配对_公司起名占卜八字算命算财运查吉凶源码(2024-01-07 12:27)
【域名/主机/服务器|】帝国CMS安装(2023-08-20 11:31)
【技术支持|常见问题】通过HTTPs测试Mozilla DNS {免费源码}(2022-11-04 10:37)

联系我们
Q Q:375457086
Q Q:526665408
电话:0755-84666665
微信:15999668636
联系客服
企业客服1 企业客服2 联系客服
86-755-84666665
手机版
手机版
扫一扫进手机版
返回顶部