kafka顺序消费(kafka顺序消费问题)

健身运动 2024-07-24 09:53:47

Kafka使用场景

由上可知能引起reblanc总结:e无非下面三种情况:

Kafka作为一个传统的消息的替代品表现得非常出色。使用消息有各种各样的原因(将处理与数据生成器解耦,缓冲未处理的消息,等等)。与大多数消息传递系统相比,Kafka有更好的吞吐量、内置分区、和容错性,这使得它成为大规模消息处理应用的一个很好的解决方案。

kafka顺序消费(kafka顺序消费问题)kafka顺序消费(kafka顺序消费问题)


kafka顺序消费(kafka顺序消费问题)


在这个领域,Kafka可以与ActiveMQ或RabbitMQ等传统消息传递系统相媲美。

Kafka最初的用例是能够重建一个用户活动跟踪管道,作为一组实时发布-提要。这意味着站点活动(页面浏览、搜索或用户可能采取的其他作)被发布到中心主题,每个活动类型有一个主题。这些提要可用于一系列用例,包括实时处理、实时监视和加载到Hadoop或脱机数据仓库系统以进行脱机处理和报告。

活动跟踪通常是非常大的量,因为许多活动消息会生成的每个用户页面视图。

Kafka通常用于运行数据。这涉及聚合来自分布式应用程序的统计信息,以生成集中的作数据提要。

许多人使用Kafka作为日志聚合解决方案的替代品。日志聚合通常收集上的物理日志文件,并将它们放在一个中心位置(可能是文件或HDFS)进行处理。Kafka抽象了文件的细节,并以消息流的形式对日志或数据进行了更清晰的抽象。这允许低延迟处理,并更容易支持多个数据源和分布式数据消费。与以日志为中心的系统如Scribe或Flume相比,Kafka提供了同样好的性能,由于而更强的持久性保证,以及更低的端到端延迟。

很多Kafka的用户在处理数据的管道中都有多个阶段,原始的输入数据会从Kafka的主题中被消费,然后被聚合、充实或者转换成新的主题进行进一步的消费或者后续的处理。例如,文章的处理管道可能会从RSS源抓取文章内容,并将其发布到“文章”主题;进一步的处理可能会规范化或删除该内容,并将清理后的文章内容发布到新主题;一个处理阶段可能会尝试向用户这些内容。这种处理管道基于单个主题创建实时数据流图。从0.10.0.0开始,Apache Kafka提供了一个轻量级但功能强大的流处理库,名为Kafka Streams,用于执行上述的数据处理。除了Kafka Streams,其他开源流处理工具包括Apache Storm和Apache Samza。

溯源是一种应用程序设计风格,其中将状态更改记录为按时间顺序排列的记录序列。Kafka支持非常大的存储日志数据,这使得它成为这种风格的应用程序的后端。

Kafka可以作为分布式系统的一种外部提交日志。日志有助于在之间数据,并充当1、持久性(durability)故障的重新同步机制,以恢复它们的数据。Kafka的日志压缩特性支持这种用法。在这种用法中,Kafka类似于Apache BookKeeper项目。

软件开发中的Kafka和数据库的关系是什么呢?

5. 分区与消费者 的关系 在Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时上,例如:

首先明确说明Kafka不是数据库,它没有schema,也没有表,更没有索引。

Follower: 从副本,相对于主副本,从副本只同步主副本数据,不提供读写服务。

1.它仅仅是生产消息流、消费消息流而已。从这个角度来说Kafka的确不像数据库,至少不像我们熟知的关系型数据库。

在这个架构中app仅仅是向Kafka写入消息,而下面的数据库、cache和index作为的consumer消费这个日志——Kafka分区的顺序性保证了app端更新作的顺序性。如果某个consumer消费速度慢于其他consumer也没关系,毕竟消息依然在Kafka中保存着。总而言之,有了Kafka所有的异质系统都能以相同的顺序应用app端的更新作,

那么到底什么是数据库呢?或者说什么特性使得一个系统可以被称为数据库?经典的教科书是这么说的:数据库是提供 ACID 特性的,我们依次讨论下ACID。

数据库中的原子性和多线程领域内的原子性不是一回事。我们知道在Ja中有AtomicInteger这样的类能够提供线程安全的整数作服务,这里的atomicity关心的是在多个线程并发的情况下如何保证正确性的问题。而在数据库领域,原子性关心的是如何应对错误或异常情况,特别是对于事务的处理。如果服务发生故障,之前提交的事务要保证已经持久化,而当前运行的事务要终止(abort),它执行的所有作都要回滚,最终的状态就好像该事务从未运行过那样。举个实际的例子,

第三个方法是采用基于日志结构的消息队列来实现,比如使用Kafka来做,如下图所示:

3、隔离性(isolation)

4、一致性(consistency)

希望能帮到你,谢谢!

kafka重复消费的问题

其中1和2我们可以人为或约定规范的方式来减少rTopic: Kafka中的消息维度,一个Topic类似一个queue。生产者将消息发送到特定的Topic,消费者通过Topic进行消费。eblance的情况发生,但是3是引起reblance的最常见原因。

Commit cannot be completed since the group has al在 Kafka 中,用一个文件夹存储一条消息队列,成为一个 Log,每条消息队列由多个文件组成,每个文件称为一个 LogSegment,每当一个 LogSegment 的大小到达阈值,系统就会重新生成一个 LogSegment;当旧的 LogSegment 过期需要清理时(虽然磁盘空间相对于内存会宽裕很多,我们可以保存更长时间的消息数据,比如一周,以供消费者更灵活的使用,但还是需要定期清理太老的数据),系统会根据清理策略删除这些文件。ready rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Kafka 消费者组成员reblance机制

零拷贝:

由于消费者组了topic,因topic partition数和消费者组成员个数不同而存在的分配机制。

其中,[broker_id-partition_id]就是一个 消息分区 的标识,内容就是该 消息分区 上 消费者的Consumer ID。

reblance 过程需要 Gr本文介绍了 Kafka 的队列实现以及其读写过程。Kafka 认为作系统级别的文件缓存比 Ja 的堆内存更省空间和高效,如果生产者消费者之间比较「和谐」的话,大部分的读写作都会落在文件缓存,且在顺序读写的情况下,硬盘的速度并不慢,因此选择直接写磁盘文件的方式存储队列。在队列的读写过程中,Kafka 尽可能地使用顺序读写,并使用零拷贝来优化性能。,Kafka 让消费者自己控制消费位置,提供了更加灵活的数据消费方式。oup Coordinator 的参与。

上面描述了 Group Coordinator 的作用,那 新消费者组创建的时候是如何选择自己的 Group Coordinator 的?

reblance 发生时,Group下的所有成员都会协调在一起共同参与,kafka能够保证公平的分配。但是在reblance过程中,Group下的所有成员实例都会停止消费,直到reblance完成。

reblance主要分为两个作,加入组(join group)和组信息同步(sync group)。

除了消费者成员正常的添加和停止之外,还有些情况下 Coordinator 会错误的认为消费者组成员已停止而将其 踢出组 以致发生reblance。

在描述会发生上述误reblance之前,先解释下consumer端的几个参数:

通过上述三个参数可知,引起误reblance的有以下两种情况:

消息中间件之Kafka

2.利用Page Cache(页高速缓冲存储器,简称页高缓)空中接力的方式来实现高效读写,作系统本身有一层缓存,叫做page cache,是在内存里的缓存,我们也可以称之为os cache,意思就是作系统自己管理的缓存。原理就是Page Cache可以把磁盘中的数据缓存到内存中,把对磁盘的访问改为对内存的访问。

Producer: 生产者,发送消息的一方。生产者负责创建消息,然后将其发送到 Kafka 上。

队列模型了解吗?Kafka 的消息模型知道吗?

Consumer Group: 消费者,一个消费者组可以包含一个或者多个消费者。使用多分区 + 多消费者的方式,可以极大提高下游系统处理速度。同一消费者组中的消费者不会重复消费消息,不同的消费者组之间不会互相影响,都能收到全部消息。kafka就是通过消费组来实现P2P模式和广播模式的。

Broker: Kafka 。

Partition: 分区,分区是属于Topic逻辑概念下的一个分区,每个分区只属于一个Topic,一个Topic通常有多个分区,每个分区包含的消息是不同的,分区在存储层面可以看做一个可追加的日志文件,消息在被追加到分区日志文件时,会分配一个特定的便宜了(offset)。

Offset: 分区中的消息的标识,用它来保证消息在分区内的顺序性,offset不跨分区,也就是说,Kafka保证消息在分区内的有序性,不保证消息在Topic下的有序性

Replication: 副本,是Kafka保证数据高可用的方式。同一Partition的数据可以在多个Broker(kafka)上存在多个副本,通常只有主副本提供读写服务,当主副本发生故障,Kafka会在Controller的管理下,选择新的副本作为主副本提供读写服务

Record: 写入kafka中的消息,每个消息包含了key、value和timestamp。

生产者-消费者是一种设计模式,是在生产者和消费者之间添加一个中间件来达到解耦的目的。

Zookeeper是一个成熟的分布式协调服务,它可以为分布式服务提供分布式配置服务、同步服务和命名注册等能力。任何分布式服务都需要一种协调任务的方法,Kafka使用Zookeeper来进行任务协调,也有一些其他技术具有自己的内置任务协调机制。

Kafka将Broker、Topic和Partitin的元数据存储在Zookeeper上。

选举过程: Broker首先尝试读取/controller中的brokerid值,如果brokerid值不为-1,表示已经存在Broker当选Controller,否则尝试创建/controller,创建成功后将当前brokerid写入/controller,作为 activeControllerId

主要职责: controller选举出来作为整个Broker集群的管理者,管理所有集群信息和元数据。

Kafka 的网络通信模型是基于 NIO 的Reactor 多线程模型来设计的。其中包含一个Acceptor线程用于处理连接,多个 Processor 线程 select 和 read socket 请求,一个Processor 由包含多个 Handler 线程处理请求并响应。

顺序写:

PageCache: producer 生成消息到 Broker 时,Broker 会使用 pwrite() 系统调用,按偏移量写入数据。写入时,会先写入 page cache。Consumer 消费消息时,Broker会使用sendfile() 系统调用,零拷贝的将数据从 page cache 传输到 Broker 的 Socket Buffer,通过网络传输。因此当Kafka的生产速率和消费速率相不大时,就能几乎只靠 page cache 的读写完成整个生产-消费过程,磁盘访问非常少

网络模型: Kafka基于NIO,采用Reactor线程模型,实现了自己的RPC通信。 一个Acceptor线程处理新的连接,多个Processor线程select 和 read socket请求,多个Handler线程处理请求并响应(I/O多路复用)。

批量与压缩: Kafka Producer 向 Broker 发送消息不是一条一条发送,而是按批发送。且roducer、Broker 和 Consumer 使用相同的压缩算法,在 producer 向 Broker 写入数据,Consumer 向 Broker 读取数据时甚至可以不用解压缩,最终在 Consumer Poll 到消息时才解压,这样节省了大量的网络和磁盘开销。

分区并发: Kafka 的 Topic 可以分成多个 Partition,每个 Paritition 类似于一个队列,保证数据有序。同一个 Group 下的不同 Consumer 并发消费 Paritition,分区实际上是调优 Kafka 并行度的最小单元,因此,可以说,每增加一个 Paritition 就增加了一个消费并发。

文件结构:

Kafka 消息是以 Topic 为单位进行归类,各个 Topic 之间是彼此的,互不影响。每个 Topic 又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件。

Kafka 每个分区日志在物理上实际按大小被分成多个 Segment。

index 采用稀疏索引,这样每个 index 文件大小有限,Kafka 采用mmap的方式,直接将 index 文件映射到内存,这样对 index 的作就不需要作磁盘 IO。

Kafka 充分利用二分法来查找对应 offset 的消息位置

和其他消息队列相比,Kafka的优势在哪里?

Kafka 如Controller是从Broker中选举出来的,负责分区 Leader 和 Follower 的管理。当某个分区的 leader 副本发生变化,由Controller负责为该分区选举新的 leader 副本。当某个分区的同步副本发生变化时,由Controller负责通知所有Broker更新元数据信息。何保证消息不重复消费?

kafka出现消息重复消费的原因:

解决方案:

参考1: Kafka性能篇:为何Kafka这么"快"?

参考2: Kafka原理篇:图解kakfa架构原理

Kafka 消费者组成员reblance机制

File -->Project Structure -->Artifacts --> Jar --> From module with dependencies.

由于消费者组了topic,因topic partition数和消费者组成员个数不同而存在的分配机制。

partition的offset信息的存储方式在Kafka不同版本中是不一样的:

reblance 过程需要 Group Coordinator 的参与。

上面描述了 Group Coordinator 的作用,那 新消费者组创建的时候是如何选择自己的 Group Coordinator 的?

reblance 发生时,Group下的所有成员都会协调在一起共同参与,kafka能够保证公平的分配。但是在reblance过程中,Group下的所有成员实例都会停止消费,直到reblance完成。

reblance主要分为两个作,加入组(join group)和组信息同步(sync group)。

除了消费者成员正常的添加和停止之外,还有些情况下 Coordinator 会错误的认为消费者组成员已停止而将其 踢出组 以致发生reblance。

在描述会发生上述误reblance之前,先解释下consuregisterAllEndpoints方法将解析的KafkaListener封装到KafkaListenerEndpointDescriptor,然后注册到list里。registerListenerContainer为每一个KafkaListenerEndpointDescriptor生成一个MessageListenerContainermer端的几个参数:

通过上述三个参数可知,引起误reblance的有以下两种情况:

关于kafka消费者的命令

sh kafka-consumer-groups.sh --bootstra由上可知能引起reblance无非下面三种情况:p-server "ip:端口,ip:端口,ip:端口" --group 组 --describe

我在本地idea起了两个进程

那么CONSUMER-ID会有两个,但是CLIENT-ID依旧是一个。

怀疑CLIENT-ID如果ip不同的话,会不同。那这样写作发生在生产者向队列生产消息时,在上篇文章讲网络通信时我们已经说到,所有的客户端请求会根据协议转到一ISR :In-Sync Replicas 副本同步队列个 Handler 来具体处理,负责写作的 Handler 叫 ProducerHandler,整个写请求的流程如下:的话我重新打个jar包放到其他地方执行,是否就会显示不一样呢?

通过idea的

Build --> Build Artifacts

等一通作生成了jar包放在了其他机器上跑

发现CLIENT-ID依然一样

另外测试过sh kafka-console-consumer.sh --bootstrap-server "xx" --topic xx

消费数据时,再使用sh kafka-consumer-groups.sh 来检查,并不会有关于CONSUMER-ID和CLIENT-ID,即没有识别到消费进程。

那看源码吧

软件开发中的Kafka和数据库的关系是什么呢?

首先明确说明Kafka不是数据库,它没有schema,也没有表,更没有索引。

1.它仅仅是生产消息流、消费消息流而已。从这个角度来说Kafka的确不像数据库,至少不像我们熟知的关系型数据库。

那么到底什么是数据库呢?或者说什么特性使得一个系统可以被称为数据库?经典的教科书是这么说的:数据库是提供 ACID 特性的,我们依次讨论下ACID。

数据库中的原子性和多线程领域内的原子性不是一回事。我们知道在Ja中有AtomicInteger这样的类能够提供线程安全的整数作服务,这里的atomicity关心的是在多个线程并发的情况下如何保证正确性的问题。而在数据库领域,原子性关心的是如何应对错误或异常情况,特别是对于事务的处理。如果服务发生故障,之前提交的事务要保证已经持久化,而当前运行的事务要终止(abort),它执行的所有作都要回滚,最终的状态就好像该事务从未运行过那样。举个实际的例子,

第三个方法是采用基于日志结构的消息队列来实现,比如使用Kafka来做,如下图所示:

3、隔离性(isolFile -->Project Structure -->Artifacts --> Jar --> From module with dependencies.ation)

4、一致性(consistenc4. 消费者负载均衡 与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。yKafka 副本当前分为副本和追随者副本。只有Leader副本才能 对外提供读写服务,响应s端的请求。Follower 副本只是采用拉(PULL)的方 式,被动地同步Leader副本中的数据,并且在Leader副本所在的 Broker 宕机后,随时准备应聘 Leader 副本。)

希望能帮到你,谢谢!

关于kafka消费者的命令

sh kafka-consumer-groups.sh --bootstrap-server "ip:端口,ip:端口,ip:端口" --group 组 --describe

我在本地i2、原子性(atomicity)dea起了两个进程

那么CONSUMER-ID会有两个,但是CLIENT-ID依旧是一个。

怀疑CLIENT-ID如果ip不同的话,会不同。那这样的话我重新打个jar包放到其他地方执行,是否就会显示不一样呢?

通过idea的

如果C1消费消息超时,出入rebalance,重新分配后该消息被其他消费者消费,此时C1消费完成提交offset,导致错误Build --> Build Artifacts

等一通作生刚才我们已经决定用磁盘文件来存储队列数据,那么要如何选择数据结构呢?一般情况下,如果需要查找数据并随机访问,我们会用 B+ 树来存储数据,但其时间复杂度是 O(log N),由于我们设计的是消息队列,我们可以完全顺序的写收到的生产者消息,消费者消费时,只要记录下消费者当前消费的位置,往后消费就可以了,这样可以对文件尽可能的进行顺序读写,同时,时间复杂度是O(1)。其实,这跟我们写日志的方式很像,每条日志顺序 append 到日志文件。成了jar包放在了其他机器上跑

发现CLIENT-ID依然一样

另外测试过sh kafka-console-consumer.sh --bootstrap-server "xx" --topic xx

消费数据时,再使用sh kafka-consumer-groups.sh 来检查,并不会有关于CONSUMER-ID和CLIENT-ID,即没有识别到消费进程。

那看源码吧

Kafka 设计详解之队列

减少刷盘的间隔

在 上文 中我们介绍了 Kafka 的网络通信,本文打算详细分析 Kafka 的核心 — 队列 的设计和实现,来对 Kafka 进行更深一步的了解。

push模式

乍一看到这个问题,我们会想,内存的读取速度远快于磁盘,如果追求性能,内存也充足的话,当然是将生产者产生的消息数据写到内存(比如用一个数组或者链表来存储队列数据),供消费者消费。真的是这样吗?

Controller的选举依赖Zookeeper,成功竞选为的Broker会在Zookeeper中创建一个/controller临时。

下面我们依次分析下写内存和写磁盘文件的优缺点,首先,内存的优点是读写速度非常快,但是,如果我们的目标是设计「大数据量」下的「高吞吐量」的消息队列,会有以下几个问题:

接下来我们来分析一下磁盘,写磁盘文件方式存储队列数据的优点就是能规避上述内存的缺点,但其有很的缺点,就是读写速度慢,如果纯依靠磁盘,那消息队列肯定做不到「高吞吐量」这个目标。

分析了内存跟磁盘的优缺点,好像我们还是只能选写内存,但我们忽视了磁盘的两个情况:一是磁盘慢是慢在随机读写,如果是顺序读写,他的速度能达到 600MB/sec(RAID-5 磁盘阵列),并不慢,如果我们尽可能地将数据的读写设计成顺序的,可以大大提升性能。二是 现代的作系统会(尽可能地)将磁盘里的文件进行缓存 。

有了作系统级别的文件缓存,那用磁盘存储队列数据的方式就变得有优势了。首先,磁盘文件的数据会有文件缓存,所以不必担心随机读写的性能;其次,同样是使用内存,磁盘文件使用的是作系统级别的内存,相比于在 Ja 内存堆中存储队列,它没有 GC 问题,也没有 Ja 对象的额外内存开销,更可以规避应用重启后的内存 load 数据耗时的问题,而且,文件缓存是作系统提供的,因为我们只要简单的写磁盘文件,系统复杂性大大降低。

因此,Kafka 直接使用磁盘来存储消息队列的数据。

之前我们已经确定采用直接顺序写磁盘文件的方式来存储队列数据,下面我们来剖析下具体的实现细节。

现在我们知道一个队列(Log)是由多个队列段文件(LogSegment)组成的,那么 Kafka 是如何将这些文件逻辑上连接从而组成一条有序队列的呢?在生成每个队列段文件时,Kafka 用该段的初始位移来对其命名,如在新建一个队列时,会初始化个队列段文件,那么其文件名就是0,设每个段的大小是固定值 L,那么第二个段文件名就是 L,第 N 个就是 (N - 1) L。这样,我们就可以根据文件名对段文件进行排序,排序后的顺序就是整个队列的逻辑顺序。

了解了队列的基本实现,下面我们就来分析下队列的核心作—读和写。

队列的读作发送在消费者消费队列数据时,由于队列是线性的,只需要记录消费者上次消费到了哪里(offset),接下去消费就好了。那么首先会有一个问题,由谁来记消费者到底消费到哪里了?

一般情况下,我们会想到让服务端来记录各个消费者当前的消费位置,当消费者来拉数据,根据记录的消费位置和队列的当前位置,要么返回新的待消费数据,要么返回空。让服务端记录消费位置,当遇到网络异常时会有一些问题,比如服务端将消息发给消费者后,如果网络异常消费者没有收到消息,那么这条消息就被「跳过」了,当然我们可以借鉴二阶段提交的思想,服务端将消息发送给消费者后,标记状态为「已发送」,等消费者消费成功后,返回一个 ack 给服务端,服务端再将其标记为「成功消费」。不过这样设计还是会有一个问题,如果消费者没有返回 ack 给服务端,此时这条消息可能在已经被消费也可能还没被消费,服务端无从得知,只能根据人为策略跳过(可能会漏消息)或者重发(可能存在重复数据)。另一个问题是,如果有很多消费者,服务端需要记录每条消息的每个消费者的消费状态,这在大数据的场景下,非常消耗性能和内存。

Kafka 将每个消费者的消费状态记录在消费者本身(隔一段时间将消费状态同步到 zookeeper),每次消费者要拉数据,就给服务端传递一个 offset,告诉服务端从队列的哪个位置开始给我数据,以及一个参数 length,告诉服务端最多给我多大的数据(批量顺序读数据,更高性能),这样就能使服务端的设计复杂度大大降低。当然这解决不了一致性的问题,不过消费者可以根据自己程序特点,更灵活地处理事务。

下面就来分析整个读的流程:

分布式系统中不可避免的会遇到一致性问题,主要是两块:生产者与队列服务端之间的一致性问题、消费者与队列服务端之间的一致性问题,下面依次展开。

当生产者向服务端投递消息时,可能会由于网络或者其他问题失败,如果要保证一致性,需要生产者在失败后重试,不过重试又会导致消息重复的问题,一个解决方案是每个消息给一个的 id,通过服务端的主动去重来避免重复消息的问题,不过这一机制目前 Kafka 还未实现。目前 Kafka 提供配置,供用户不同场景下选择允许漏消息(失败后不重试)还是允许重复消息(失败后重试)。

由于在消费者里我们可以自己控制消费位置,就可以更灵活的进行个性化设计。如果我们在拉取到消息后,先增加 offset,然后再进行消息的后续处理,如果在消息还未处理完消费者就挂掉,就会存在消息遗漏的问题;如果我们在拉取到消息后,先进行消息处理,处理成功后再增加 offset,那么如果消息处理一半消费者挂掉,会存在重复消息的问题。要做到完全一致,的办法是将 offset 的存储与消费者放一起,每消费一条数据就将 offset+1。

版权声明:本文内容由互联。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发 a13828211729@163.com 邮箱删除。