消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
1-1 消息系统是什么
消息系统负责将数据从一个应用程序传输到另一个应用程序,因此应用程序可以专注于数据,而不必担心如何共享数据。分布式消息传递基于可靠消息队列的概念。消息在客户端应用程序和消息传递系统之间异步排队。有两种类型的消息传递模式可用:一种是点对点,另一种是发布-订阅 (pub-sub) 消息传递系统。大多数消息传递模式都遵循pub-sub。
消息系统分类
-
Peer-to-Peer(点对点,一对一)
- 一般基于Pull接受消息
- 发送到队列中的消息被一个且仅仅一个接收者所接收,即便有多个接收者在同一队列中侦听同一消息
- 支持异步“即发即弃”的消息传递方式,也支持同步请求/应答传送方式
-
发布/订阅(一对多)
- 发布到一个主题的消息可以被多个订阅者接收
- 发布/订阅模式既可基于Push消费数据,也可基于Pull消费数据
- 解耦能力比P2P模型更强
为这两种模型提供了单一的消费者抽象模型: 消费者组 ()。 消费者用一个消费者组名标记自己。 一个发布在上消息被分发给此消费者组中的一个消费者。 假如所有的消费者都在一个组中,那么这就变成了P2P模型。 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。
1-2 为什么要使用消息系统
- 解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 - 易扩展
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。 - 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 - 可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 - 顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。 - 异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
1-3 常用消息队列对比
-
Redis
Redis是一个基于Key-Value对的NoSQL数据库,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,可以当做一个轻量级的队列服务来使用。 Redis实现轻量级的消息队列与消息中间件相比,没有高级特性也没有ACK保证,无法做到数据不重不漏,如果业务简单而且对消息的可靠性不是那么严格可以尝试使用。
-
轻量级,不需要单独的消息服务器或中间件,应用程序将扮演这个服务器角色,它实质上是一个库,需要开发人员自己组合多种技术,使用复杂度高。特点是高性能、跨平台(支持Linux、Windows、OS X等)、多语言支持(C、C++、Java、Python等30多种语言)、可单独部署或集成到应用中使用。
-
一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用。
-
RocketMQ是阿里开源的消息中间件,纯Java实现的发布/订阅消息系统,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是简单的复制,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景,支撑了阿里多次双十一活动。
因为是阿里内部从实践到产品的产物,因此里面很多接口、api并不是很普遍适用。可靠性毋庸置疑,而且与Kafka一脉相承(甚至更优),性能强劲,支持海量堆积。
横向对比
**
官网:https://kafka.apache.org/
Kafka 起初是由 LinkedIn 公司采用 Scala 语言开发的一个多分区、多副本的分布式消息系统,现已被捐献给 Apache 基金会。目前 Kafka 已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
2-1 kafka安装
6.解压:
2-2 启动Kafka
(1)基于zookeeper启动
首先需要启动zookeeper,可以先通过命令下载最新版本zookeeper,搭建zookeeper集群。但Kafka内置了一个zookeeper,也可以使用这个自带的zookeeper,以下演示是使用Kafka自带的zookeeper进行启动。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QNZmDuuQ-1650887263842)(Kafka.assets/image-20211015152747197.png)]
再启动Kafka服务:
首先使用为新集群生成ID:
最后,启动Kafka服务:
2-3 基本概念
-
Producer
消息和数据的生产者,向Kafka的一个Topic发布消息的进程/代码/服务。
-
Consumer
消息和数据的消费者,订阅Topic并且处理其发布的消息的进程/代码/服务。
-
Consumer Group
逻辑概念,由多个 consumer 组成。消费者组是Kafka实现单播和广播两种消息模型的手段。对于同一个topic,会广播给不同的消费组,但一个消费组中只有一个消费者可以消费该消息,消费者组之间互不影响。消费者组中的每个消费者,都会实时记录自己消费到哪个offset,以便在出错恢复中,从上次的位置继续消费。
-
Broker
服务代理节点。对于 Kafka 而言,Broker 可以简单地看作一个独立的 Kafka 服务节点或 Kafka 服务实例。大多数情况下也可以将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个 Kafka 实例。一个或多个 Broker 组成了一个 Kafka 集群。
-
- 物理概念,Kafka下数据存储的基本单元。一个Topic数据会被分散存储到多个Partition,每个Partion是有序的
- 一个partition只分布于一个Broker上(不考虑备份)
- 一个Partition物理上对应一个文件夹
- 一个Partition包含多个Segment,一个Segment对应三个文件
- Segment由一个个不可变记录组成
- 记录只会被append到Segment中,不会被单独删除或者修改
- 清除过期日志时,直接删除一个或多个Segment(默认情况,Kafka会将数据保留168小时,也可以设置每个分区所能保存数据的大小,超过这个大小之后将旧数据删除掉)
kafka采用了分片和索引机制,将每个partition分为多个segment。segment包括偏移量索引文件、日志文件、时间戳索引文件(如果需要按照时间来定位消息的offset,会先在这个文件里查找)以及可能的其他可能文件(比如事务索引文件)。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如:test这个topic有三个分区,则对应的文件夹为test-0,test-1,test-2。
index和log文件以当前segment的第一条消息的offset命名,一个segment默认存储1GB。
每一条消息被发送到 broker 之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。如果一个主题只对应一个文件,那么这个文件所在的机器I/O将会成为这个主题的性能瓶颈,而分区解决了这个问题。在创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修改分区的数量,通过增加分区的数量可以实现水平扩展。
通常情况下,越多的partition会带来越高的吞吐量,但是同时也会给broker节点带来相应的性能损耗和潜在风险,虽然这些影响很小,但不可忽略,因此需要根据自身broker节点的实际情况来设置partition的数量以及replica的数量。
-
Replica:Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。
- 当某个Topic的replication-factor为N且N大于1时,每个Partition都会有N个副本(Replica)
- Replica的个数小于等于Broker数,即对每个Partition而言,每个Broker上最多有一个副本,因此可用Broker ID表示Replica
- 所有Partiton的Replica默认情况会均匀分布到所有Broker上
-
Leader:每个分区多个副本的"主",生产者发送数据的对象,以及消费者消费数据的对象都是leader。
-
Follower:每个分区多个副本的"从",实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,通过推举算法某个follower就会成为新的leader。
同一分区的不同副本中保存的是相同的消息,副本之间是“一主多从”的关系。在 Kafka 中,追follower副本是不对外提供服务的,任何一个follower副本都不能响应消费者和生产者的读写请求。所有的请求都必须由leader副本来处理,或者说,所有的读写请求都必须发往leader者副本所在的 Broker,由该 Broker 负责处理。follower副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
- ISR(In-sync Replicas):ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的。
我们首先要明确的是,Leader 副本天然就在 ISR 中。也就是说,ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本。
另外,能够进入到 ISR 的追随者副本要满足一定的条件: Follower 副本能够落后 Leader 副本的最长时间间隔,超出这个时间就不会放到ISR中。
2-4 Kafka架构
- Broker注册
用一个专门节点保存服务列表,也就是 。在启动时,向发送注册请求,会创建这个节点,节点路径为;并保存的地址和端口。
这个节点属性为临时节点,一旦宕机,zookeeper将删除这个临时节点。
一个的消息会被保存到多个,这些跟的对应关系也需要保存到。
- ids 节点:记录该消费组中当前正在消费的消费者;
- owners 节点:记录该消费组消费的 topic 信息;
- offsets 节点:记录每个 topic 的每个分区的 offset。
每个消费者都要关注其所属消费者组中消费者数目的变化,即监听 下子节点的变化。一单发现消费者新增或减少,就会触发消费者的负载均衡。
4. 负载均衡
向进行注册后,生产者根据节点来感知服务列表变化,这样可以实现动态负载均衡。
中的消费者,可以根据节点信息来拉取特定分区的消息,实现负载均衡。
5. Controller选举
集群中会有一个被选举为负责跟进行交互,它负责管理整个集群中所有分区和副本的状态。其他监听节点的数据变化。
的选举工作依赖于,选举成功后,会创建一个临时节点。
选举成功后,会从集群中拉取一份完整的元数据初始化,这些元数据缓存在节点。当集群发生变化时,比如增加分区,不仅需要变更本地的缓存数据,还需要将这些变更信息同步到其他。
监听到事件、定时任务事件和其他事件后,将这些事件按照先后顺序暂存到中,由事件处理线程按顺序处理,这些处理多数需要跟交互,
则需要更新自己的元数据。
6. 记录消费进度Offset
在 consumer 对指定消息 partition 的消息进行消费的过程中,需要定时地将 partition 消息的消费进度 Offset 记录到 ZooKeeper上,以便在该 consumer 进行重启或者其它 consumer 重新接管该消息分区的消息消费权后,能够从之前的进度开始继续进行消息消费。
注意:Kafka已推荐将consumer的Offset信息保存在 Kafka 内部的 topic 中,即:
并且默认提供了 kafka_consumer_groups.sh 脚本供用户查看consumer信息。
7. 记录Partition和Consumer的关系
consumer group 下有多个 consumer(消费者),对于每个消费者组(consumer group),Kafka都会为其分配一个全局唯一的 ,group 内部的所有消费者共享该 ID。订阅的 topic 下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其它 group)。同时,Kafka 为每个消费者分配一个 consumer ID,通常采用 hostname:UUID 形式表示。
在Kafka中,规定了每个 partition 只能被同组的一个消费者进行消费,因此,需要在 ZooKeeper 上记录下 partition 与 consumer 之间的关系,每个 一旦确定了对一个 的消费权力,需要将其 consumer ID 写入到 ZooKeeper 对应消息分区的临时节点上,例如:
其中,[broker_id-partition_id] 就是一个消息分区的标识,节点内容就是该消息分区消费者的 consumer ID。
基于zookeeper模式的最重要的配置文件server.properties的配置详解如下:
2.基于Kraft架构
Kafak 团队把通过 Raft 协议同步数据的方式 ,简称 KRaft。
Quorum节点状态机
在KRaft模式下,Quorum中的一个节点可以处于以下3种状态之一。
- Candidate(候选者)——主动发起选举;
- Leader(领导者)——在选举过程中获得多数票;
- Follower(跟随者)——已经投票给Candidate,或者正在从Leader复制日志;
领导选举
当满足以下三个条件之一时,Quorum中的某个节点就会触发选举:
1.向Leader发送请求后,在超时阈值quorum.fetch.timeout.ms之后仍然没有得到响应,表示Leader疑似失败;
2.从当前Leader收到了EndQuorumEpoch请求,表示Leader已退位;
3.Candidate状态下,在超时阈值quorum.election.timeout.ms之后仍然没有收到多数票,也没有Candidate赢得选举,表示此次选举作废,重新进行选举。
选举过程
在 Raft 算法中,随机超时时间是有 2 种含义的:
- 跟随者等待领导者心跳信息超时的时间间隔,是随机的;
- 当没有候选人赢得过半票数,选举无效了,这时需要等待一个随机时间间隔,也就是说,等待选举超时的时间间隔是随机的。
从以上的选举过程看,我们知道在 Raft 中的选举中是有任期机制的,顾名思义,每一任领导者,都有它专属的任期,当领导者更换后,任期也会增加,Raft 中的任期还要注意以下个细节:
-
如果某个节点,发现自己的任期编号比其他节点小,则会将自己的任期编号更新比自己更大的值;
-
从上面的选举过程看出,每次推荐自己成为候选人,都会得到自身的那一票;
-
如果候选人或者领导者发现自己的任期编号比其它节点好要小,则会立即更新自己为跟随者。
这点很重要,按照我的理解,这个机制能够解决同一时间内有多个领导者的情况,比如领导者 A 挂了之后,集群其他节点会选举出一个新的领导者 B,在节点 B 恢复之后,会接收来自新领导者的心跳消息,此时节点 A 会立即恢复成跟随者状态;
配置文件
Kraft目录下server.properties的配置文件详解如下:
2-5 为什么要弃用zookeeper
实际上,问题不在于ZooKeeper本身,而在于外部元数据管理的概念上。Kafka本身就是一个分布式系统,但是需要另一个分布式系统来管理,复杂性无疑增加了:
-
运维复杂度
使用了,部署的时候必须要部署两套系统,的运维人员必须要具备的运维能力。
摒弃zookeeper的Kafka部署也会变得更加简单。
-
Controller故障处理
依赖一个单一节点跟进行交互,如果这个节点发生了故障,就需要从中选择新的。新的选举成功后,会重新从拉取元数据进行初始化,并且需要通知其他所有的更新。老的需要关闭监听、事件处理线程和定时任务。分区数非常多时,这个过程非常耗时,而且这个过程中集群是不能工作的。
Kraft模式中每个controller都拥有几乎update-to-date的Metadata,所以controller集群重新选主时恢复时间很短。
-
分区瓶颈
当分区数增加时,保存的元数据变多,集群压力变大,达到一定级别后,监听延迟增加,给的工作带来了影响。
官方介绍,可以轻松支持百万级别的分区,partition的主选举将变得更快捷。
在大规模集群和云原生的背景下,使用给的运维和集群性能造成了很大的压力,去除的必然趋势。
2-6 可视化工具
下载地址:https://www.kafkatool.com/download.html
提示配置相应的连接信息: