第七周,大数据建模 scala 复习 , KaFka 回顾 , 继续讲解
第七周,10-13 , scala 复习 , KaFka 回顾
KAFAKA 消息中介,
比如说, 淘宝的订单交易系统, 产生订单的信息, 对这些订单要实时分析, 对他关心的, 进行推送, 这时要用 KAFKA 进行
推送, 再从KAFKA中读取出来, 持久化, 7天可以存储, 可以高存储量, 百万级别。
生产者发布一个消息, 或者就是客户下一个订单, 这个消息推送到 KAFKA的主题TOPIC中, 消费者要订阅这个主题, 不同的
TOPIC 要进行不同分析。 等于不同的消费者会订阅不同的主题, 再从KAFKA的集群中获得。
KAFKA 集群 由 ZOOKEPPER管理。 做一些搜索引擎的事情, KAFKA的数量, 可以和 浏览器对接, ELESHCHE , 输入
关键词, 进行 TOPIC 主题的创建。
KAFKA就是一个 高吞吐量的一个集群。
淘宝数据要出现显示大屏 , 把实时处理的数据, 可以放在 Redis 里面,
序列号, 也就是偏移量, 这个就是由 ZOOKEPPER 管理, 消费者要从 KAFKA进行 消费, 也需要进行记录。
在 不同的 TOPIC 也由 ZOOKEPPER 管理, 这2个集群都要建立。
分区 partion , 可以设置在 TOPIC下 。
Broker 就是一个 缓存代理。
日志类、订单类属于不同的 分区 PARTION , OFFSET 就是序号或者偏移量。
接下来讲: KAFKA的 MASSAGE ,通讯的基本单位, 每个生产者可以向一个 TOPIC 发布一些消息。
KAFKA 中的MASSAGE是以 TOPIC 为基本单位组织的,
MASSAGE 是如何进行存储的, TOPIC 对应一个 偏移量, 也就是 ID , 也就是指针,
总之, 几十年来, 计算机还是进行表管理。
放入 TOPIC ,变成一个字符串, 然后就是用 空格进行确认, 总之, 把生产者的数据, 存储到 KAFKA
消费者再从这个 MASSAGE中取得数据。
11:10 开始上课, KAFKA的 消息处理机制。
1、发送到 PARTITION 中的消息, 自动追加到日志中, 顺序是一至的,
2、对于消费者 , 消费消息的顺序也是一至的。
3、如果 topic 的 replication factor 为 n , 那么允许n-1 个 kafka 的实例失效
4、kafka 对消息的重复、丢失、错误以及顺序没有严格的要求。
5、kafka 提供 at-least-once delivery , 当消费者宕机后, 有些消息可能会被重复 发送 delivry
6、 因每个 partition只会被 消费者组内部的一个消费者消费。 KAFKA是保证每个 PARTITION 内的消息会被顺序订阅。
7、Kafka 为每条消息计算 CRC检验, 用于错误检测, CRC检验不通过的消息会直接被丢弃掉
ACK校验, 当消费者消费成功, 返回ACK消息。
KAFKA数据传输的机制又是什么?
1、at most once: 最多一次, 这个和 JMS 中的非持久化消息类似, 无论成败, 将不会重发。
2、at least once : 消息至少发送一次, 如果消息美未能接受成功, 可能进行重发, 直到接受成功。
3、exactly once : 消息只会发送一次 。
对于 这 3点, 做详细描述。
KafAKA的存储策略,
生产者生产的消息, 然后在 kaFka 存储 , 是顺序产生的, offset 不一致
一、kafka的存储机制
kafka通过topic来分主题存放数据,主题内有分区,分区可以有多个副本,分区的内部还细分为若干个segment。
所谓的分区其实就是在kafka对应存储目录下创建的文件夹,文件夹的名字是主题名加上分区编号,编号从0开始。
1、segment
所谓的segment其实就是在分区对应的文件夹下产生的文件。
一个分区会被划分成大小相等的若干segment,这样一方面保证了分区的数据被划分到多个文件中保证不会产生体积过大的文件;
另一方面可以基于这些segment文件进行历史数据的删除,提高效率。
一个segment又由一个.log和一个.index文件组成。
1..log
.log文件为数据文件用来存放数据分段数据。
2..index
.index为索引文件保存对对应的.log文件的索引信息。
在.index文件中,保存了对对应.log文件的索引信息,通过查找.index文件可以获知每个存储在当前segment中的offset在.log文件中的开始位置,
而每条日志有其固定格式,保存了包括offset编号、日志长度、key的长度等相关信息,通过这个固定格式中的数据可以确定出当前offset的结束位置,
从而对数据进行读取。
3.命名规则
这两个文件的命名规则为:
partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,
20位数字字符长度,没有数字用0填充。
2、读取数据
开始读取指定分区中某个offset对应的数据时,先根据offset和当前分区的所有segment的名称做比较,确定出数据在哪个segment中,
再查找该segment的索引文件,确定当前offset在数据文件中的开始位置,最后从该位置开始读取数据文件,在根据数据格式判断结果,
获取完整数据。
二、可靠性保证
1、AR
在Kafka中维护了一个AR列表,包括所有的分区的副本。AR又分为ISR和OSR。
AR = ISR + OSR。
AR、ISR、OSR、LEO、HW这些信息都被保存在Zookeeper中。
1.ISR
ISR中的副本都要同步leader中的数据,只有都同步完成了数据才认为是成功提交了,成功提交之后才能供外界访问。
在这个同步的过程中,数据即使已经写入也不能被外界访问,这个过程是通过LEO-HW机制来实现的。
2.OSR
OSR内的副本是否同步了leader的数据,不影响数据的提交,OSR内的follower尽力的去同步leader,可能数据版本会落后。
最开始所有的副本都在ISR中,在kafka工作的过程中,如果某个副本同步速度慢于replica.lag.time.max.ms指定的阈值,
则被踢出ISR存入OSR,如果后续速度恢复可以回到ISR中。
3.LEO
LogEndOffset:分区的最新的数据的offset,当数据写入leader后,LEO就立即执行该最新数据。相当于最新数据标识位。
4.HW
HighWatermark:只有写入的数据被同步到所有的ISR中的副本后,数据才认为已提交,HW更新到该位置,HW之前的数据才可以
被消费者访问,保证没有同步完成的数据不会被消费者访问到。相当于所有副本同步数据标识位。
在leader宕机后,只能从ISR列表中选取新的leader,无论ISR中哪个副本被选为新的leader,它都知道HW之前的数据,
可以保证在切换了leader后,消费者可以继续看到HW之前已经提交的数据。
所以LEO代表已经写入的最新数据位置,而HW表示已经同步完成的数据,只有HW之前的数据才能被外界访问。
5.HW截断机制
如果leader宕机,选出了新的leader,而新的leader并不能保证已经完全同步了之前leader的所有数据,只能保证HW之前的数据是同步过的,此时所有的follower都要将数据截断到HW的位置,再和新的leader同步数据,来保证数据一致。
当宕机的leader恢复,发现新的leader中的数据和自己持有的数据不一致,此时宕机的leader会将自己的数据截断到宕机之前的hw位置,然后同步新leader的数据。宕机的leader活过来也像follower一样同步数据,来保证数据的一致性。
2、生产者可靠性级别
通过以上的讲解,已经可以保证kafka集群内部的可靠性,但是在生产者向kafka集群发送时,数据经过网络传输,也是不可靠的,可能因为网络延迟、闪断等原因造成数据的丢失。
kafka为生产者提供了如下的三种可靠性级别,通过不同策略保证不同的可靠性保障。
其实此策略配置的就是leader将成功接收消息信息响应给客户端的时机。
通过request.required.acks参数配置:
1:生产者发送数据给leader,leader收到数据后发送成功信息,生产者收到后认为发送数据成功,如果一直收不到成功消息,则生产者认为发送数据失败会自动重发数据。
当leader宕机时,可能丢失数据。
0:生产者不停向leader发送数据,而不需要leader反馈成功消息。
这种模式效率最高,可靠性最低。可能在发送过程中丢失数据,也可能在leader宕机时丢失数据。
-1:生产者发送数据给leader,leader收到数据后要等到ISR列表中的所有副本都同步数据完成后,才向生产者发送成功消息,如果一只收不到成功消息,则认为发送数据失败会自动重发数据。
这种模式下可靠性很高,但是当ISR列表中只剩下leader时,当leader宕机让然有可能丢数据。
此时可以配置min.insync.replicas指定要求观察ISR中至少要有指定数量的副本,默认该值为1,需要改为大于等于2的值
这样当生产者发送数据给leader但是发现ISR中只有leader自己时,会收到异常表明数据写入失败,此时无法写入数据,保证了数据绝对不丢。
虽然不丢但是可能会产生冗余数据,例如生产者发送数据给leader,leader同步数据给ISR中的follower,同步到一半leader宕机,此时选出新的leader,可能具有部分此次提交的数据,而生产者收到失败消息重发数据,新的leader接受数据则数据重复了。
3、leader选举
当leader宕机时会选择ISR中的一个follower成为新的leader,如果ISR中的所有副本都宕机,怎么办?
有如下配置可以解决此问题:
unclean.leader.election.enable=false
策略1:必须等待ISR列表中的副本活过来才选择其成为leader继续工作。
unclean.leader.election.enable=true
策略2:选择任何一个活过来的副本,成为leader继续工作,此follower可能不在ISR中。
策略1,可靠性有保证,但是可用性低,只有最后挂了leader活过来kafka才能恢复。
策略2,可用性高,可靠性没有保证,任何一个副本活过来就可以继续工作,但是有可能存在数据不一致的情况。
4、kafka可靠性的保证
At most once:消息可能会丢,但绝不会重复传输。
At least once:消息绝不会丢,但可能会重复传输。
Exactly once:每条消息肯定会被传输一次且仅传输一次。
kafka最多保证At least once,可以保证不丢,但是可能会重复,为了解决重复需要引入唯一标识和去重机制,kafka提供了GUID实现了唯一标识,但是并没有提供自带的去重机制,需要开发人员基于业务规则自己去重。
生产者产生第一个消息, 会在 segment 中记录第一个偏移量, 一致追加, 如果打了阀值, 会存储到磁盘上去。
KAFKA 的数据传输
KAFKA 消息发布流程
消息处理的优势:
KAFKA的设计原理 ,无论做不做大数据, 还是仅仅是配合 JAVA, 都需要了解 KAFKA
KAFKA 的 通讯协议
通讯过程
应用层
与其它计算机进行通讯的一个应用,它是对应应用程序的通信服务的。
例如,一个没有通信功能的字处理程序就不能执行通信的代码,从事字处理工作的程序员也不关心OSI的第7层。
但是,如果添加了一个传输文件的选项,那么字处理器的程序就需要实现OSI的第7层。
表示层
这一层的主要功能是定义数据格式及加密。
例如,FTP允许你选择以二进制或ASCII格式传输。
如果选择二进制,那么发送方和接收方不改变文件的内容。
如果选择ASCII格式,发送方将把文本从发送方的字符集转换成标准的ASCII后发送数据。
在接收方将标准的ASCII转换成接收方计算机的字符集。示例:加密,ASCII等。
会话层
它定义了如何开始、控制和结束一个会话,包括对多个双向消息的控制和管理,
以便在只完成连续消息的一部分时可以通知应用,从而使表示层看到的数据是连续的,
在某些情况下,如果表示层收到了所有的数据,则用数据代表表示层。
示例:RPC,SQL等。
传输层
这层的功能包括是否选择差错恢复协议还是无差错恢复协议,及在同一主机上对不同应用的数据流的输入
进行复用,还包括对收到的顺序不对的数据包的重新排序功能。示例:TCP,UDP,SPX。
网络层
这层对端到端的包传输进行定义,它定义了能够标识所有结点的逻辑地址,还定义了路由实现的方式和学习的方式。
为了适应最大传输单元长度小于包长度的传输介质,网络层还定义了如何将一个包分解成更小的包的分段方法。
示例:IP,IPX等。
数据链路层
物理层
OSI的物理层规范是有关传输介质的特性,这些规范通常也参考了其他组织制定的标准。
连接头、帧、帧的使用、电流、编码及光调制等都属于各种物理层规范中的内容。
物理层常用多个规范完成对所有细节的定义。示例:Rj45,802.3等。
KAFKA 集群的安装部署:
1、下载kafka.tgz 架包
2、解压
3、配置 zookepper,配置producer,配置consumer
4、启动服务
a 启动zooKEPPER ,
b 启动kafka
./bin/zkserver.sh start
./bin/kafka-server-start.sh /config/server.properties
创建 topic
./bin/kafka-topics.sh --create--zookepper hadoop1:2181,hadoop2:2181,hadoop3:2181
--replication -factor 1 --partition 1 -topic mytopic
配置消费者信息
查看是不是有 kafka 的进程, 使用 # ps 命令
------------ 日志文件, 实时的推送到 Kafka 里面, 做一个 topic - from -beginning
把消费者的端口启动了, 消费 test 的数据。
刚才的日志文件, 用 for循环 做了一个 50000个日志文件, 灌入到kafka 里面去, 然后, 就可以消费了。
生产者 产生了数据, 放入到 KafKA里面, 然后消费者就可以消费了。
生产者对应的参数。 消费者对应的参数。
做一下回顾: KAFKA Message 不断记录,offset 偏移量, 到达阀值, flash到硬盘
P
----------------------------------------------------------------------------以下是原始笔记
Kafka
1、Kafka是什么
2、Kafka体系结构
3、Kafka设计理念
4、Kafka通信协议
5、Kafka集群
6、Kafka相关操作:kafka的shell操作及java操作
7、kafka的producer和consumer开发
Kafka产生的背景:
Kafka是分布式发布-订阅消息系统,它最初由LInkedin公司开发,使用scala语言编写之后成为Apache项目的一部分,kafka是一个分布式的,可划分的,多订阅者,冗余备份持久性的日志服务,它主要用于处理活
跃的流式数据。
kafka可以起到两个作用:
1、降低系统组网的复杂度
2、降低编程的复杂度,各个子系统不在是相互协商接口,各个子系统类似插口插在插座上,kafka承载高速数据总线的作用。
kafka简介:
1、同时为发布和订阅提供高吞吐量,kafka每秒可以生产为25万消息(50MB),每秒可以处理55万条数据(110MB)。
2、可以进行持久化操作,将消息持久化到磁盘,因此可用于批量消费,如ETL,以及实时应用程序。通过将数据持久化到磁盘以及replication防止数据丢失。
3、分布式系统,易于向外扩展,所有的producer、broker、consumer都会有多个,均为分布式的,无需停机即可扩展机器。
4、消息被处理的状态是在consumer端维护,而不是在server端维护,当失败时能自动平衡。
5、支持onlin和offline的场景。
性能测试:
虚拟机:CPU双核、内存:2GB、硬盘:60GB
测试指标:
消息推积压力测试:
单个kafka broker节点测试,启动一个kafka broker和producer,producer不断向broker发送消息
直到broker堆积数据为18GB为停止producer,接着启动consumer,不断从broker获取数据
直到全部数据读取完停止,最后检查producer==consumer数据,没有出现卡死不响应现象。
结论:数据大量堆积不会出现broker卡死或不影响现象。
生产者速率:
1万左右。
结论:性能上完全满足要求,其性能主要由磁盘决定
消费者速率
1万左右
结论:性能上完全满足要求,其性能主要由磁盘决定。
Kafka的基本概念:
1、Topic:特指kafka的消息源的不同分类
2、Partion: Topic物理上的分组,一个topic可以分为多个partion,每个partion是一个有序的队列,partion中的每条消息都会被分配一个有序的id,也叫offset。
3、Message: 消息,是通信的基本单位,每个producer可以向一个topic发布一些消息。
4、Producers:消息和数据的生产者,向kafka的一个topic发布消息的过程叫做producers
5、Consumers:消息和数据的消费者,订阅Topic并处理其发布的消息的过程叫做consumers。
6、Broker:缓存代理,kafka集群中的一台或多台服务器统称为Broker.
kafka设计关注的重点:
1、为生产者和消费者提供一个通用的API
2、消息的持久化
3、高吞吐量,可以满足百万级别的消息处理。
4、对分布式和高扩展的支持。
kafka最基本的架构是生产者发布一个消息到kafka的一个主题topic,这个主题topic即是由扮演kafkaServer角色的broker提供,消费者订阅这个主题,然后从中获取信息。
kafka的两大法宝:
1、提供文件的分段
2、提供文件索引
索引优化:稀疏存储,每隔一定字节的数据建立一条索引
kafka消息队列分类:
1、点对点
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出消息,并且消费消息。
注意:
消息被消费后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。
queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
2、发布订阅
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息,和点对点不同,发布到topic的消息会被所有订阅者消费。
消息队列MQ对比:
1、RabbitMQ:支持的协议多,非常重量级消息队列,对路由(Routing),负载均衡(Load balance)或者数据持久化有很好的支持。
2、ZeroMQ:号称最快的消息队列系统,尤其针对大吞吐量的需求场景,擅长的高级、复杂的队列。但技术也复杂,并且只提供非持久性的队列。
3、ActiveMQ:是Apache下的一个子项目,类似于ZeroMQ,能够以代理人和点对点的技术实现队列。
4、Redis:是一个Key-Value的Nosql数据库,但也支持MQ功能,数据量小,性能优于RabbitMQ,数据超过10k就慢得无法接受。
Kafka部署架构:
(Producer、Broker、Consumer、Zookeeper)
producer --(push)--> kafka(broker) <---(pull)---Consumer
|
|
|
|
Zookeeper
Kafka集群架构
(Broker--Master、Slave <------Zookeeper)
Kafka的Producers
Producer将消息发布到指定的topic中,同时prodeucer也能决定将此消息归属于哪个partion,比如基于round-robin方式或者通过其它的一些算法等。
消息和数据的生产者,向kafka的一个topic发布消息的过程叫做producers
异步发送
批量发送可以很有效的提高发送效率,kafka producer的异步发送模式允许进行批量发送,先将消息缓存在内存中,然后一次请批量发送出去。
Kafka的Broker
Broker:缓存代理,为了减少磁盘写入的次数,Broker会将消息暂时buffer起来,当消息的个数达到一定阀值时,再flush到磁盘,这样就减少了磁盘io调用的次数。
kafka的Consumers
注意:kafka的设计原理决定对于一个topic同一个group不能有多于partions个数的consumer同时消费者,否则将意味着某些xonsumers将无法得到消息。
Kafka的broker无状态机制
1、Broker没有副本机制,一但broker宕机,该broker的消息将都不可用
2、Broker不保存订阅者的状态,由订阅者自己保存
3、无状态导致消息的删除成为难道,kafka采用基于时间的sla,消息保存一定时间后会被删除。
4、消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset进行重新读取消费消息。
kafka的consumer group
1、允许consumer group对一个topic进行消费,不同的consumer group之间独立订阅
2、为了对减小一个consumer group中不同的consumer之间的分布式协调开销,指定partion为最小的并行消费单位,即一个group内的consumer只能消费不同的partion
Kafka的Topic/Log
一个topic可以认为是一类消息,每个topic将被分成多个partion分区,每个partion在存储层面是append log文件,任何发布到此partion的消息都会被追加到Log文件的尾部,每条消息在文件中的位置称为offset,也叫做偏移量,partion是以文件的形式存储在文件系统中。
logs文件根据broker中的配置来保存一定时间后删除来释放磁盘空间。
Kafka的partion
1、kafka基于文件存储,通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partion都会被当前的server保存
2、可以将一个topic切分多任意多个partion,来消息保存消费的效率
3、越多的partion意味着可以容纳更多的consumer,有效提升并发消费的能力。
kafka的Message
Message消息:是通信的基本单位,每个producer可以向一个topic发布一些消息。
Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的,每个topic以可以为每个partion存储一部分message。
partiion中的每条message包含了三个属性:
1、offset 对应类型:long
2、MessageSize: 对应类型:int32
3、data: 是Message的具体内容。
Kafka的Offset
每条消息在文件中的位置称为:offset,也叫偏移量,offset为一个long型数字,字是唯一标记一条消息,kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中不允许对消息进行随即读写。
partition中的每条消息message由offset来表示它在这个partition中的偏移量,这个offset不是这个message在partition数据文件中的实际存储的位置,而是逻辑上一个值,它唯一确定了partition中的一条message,因此可以认为offset是partition中message的id.
kafka的消息处理机制
1、发送到partitions中的消息将会按照它接收的顺序追加到日志中
2、对于消费者,它们消费消息的顺序和日志中消息顺序一致
3、如果topic的replication factor为n,那么允许n-1个kafka实例失效
4、kafka对消息的重复、丢失、错误以及顺序没有严格的要求。
5、kafka提供at-least-once delivery,当consumers宕机后,有些消息可能会被重复delivery
6、因每个partition只会被consumergroup内的一个consumer消费,所以kafka保证每个partition内的消息会被顺序订阅。
7、kafka为每条消息计算CRC检验,用于错误检测,CRC检验不通过的消息会直接被丢弃掉
ACK校验,当消费者消费成功,返回ACK消息。
数据传输的事务定义:
1、at most once: 最多一次,这个和jms中非持久化消息类似,无论成败,将不会重发。
2、at least once: 消息至少发送一次,如果消息未能接受成功,可能会进行重发,直到接受成功。
3、exactly once: 消息只会发送一次。
at most once: 消费者fetch消息,然后保存offset,然后处理消息,当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理,那么此后未处理的消息都不能被fetch到,这就是at most once。
at least once: 消费者fetch消息,然后处理消息,然后打开offset,如果消息处理成功之后,但是在保存offset阶段zookeeper异常,导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是at least once,原因offset没有即时的提交到zookeeper,zookeeper恢复正常还是之前的offset状态。
exactly once: kafka中并没有严格的去实现基于2阶段提交事务,我们认为这种策略在kafka中没有必要。
注意:
通常情况下:at least once是我们的首选,相比at most once,重复接受消息总比丢失数据要好。
kafka的储存策略:
1、kafka以topic来进行消息管理,每个topic包含多个partition,每个partition对应一个逻辑log,有多个segment组成。
2、每个segment中存储多条消息,消息id由其逻辑位置决定,从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。
3、broker收到发布消息往对应的partion的最后一个segment上添加消息。
4、每个partition在内存中对应一个index,记录每个segment中的第一条消息偏移。
5、发布者发送到某个topic的消息会被 均匀的分布到多个partition上(随机或者根据用户指定的回调函数进行分布),broker收到发布消息往对应的partition的最后一个segment上进行添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阀值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。
kafka的数据传输:
1、发布者每次可发布多条消息(将消息加到一个消息集合中发布),sub每次迭代一条消息。
2、不创建单独的cache,使用系统的page cache。发布者顺序发布,订阅者通常比发布者滞后一点点,直接使用Linux的page cache效果也比较后,同时减少了cache管理及垃圾收集和开销。
3、使用sendfile优化网络传输,减少一次内存拷贝。
kafka的消息发送的流程:
1、由于kafka broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据。
2、producer向kafka(push)推数据
3、consumer从kafka拉(pull)数据
消息处理的优势:
1、简化kafka设计
2、consumer根据消费能力自主控制消息拉取速度。
3、consumer根据自身情况自主选择消费模式,例如:批量、重复消费,从尾端开始消费等。
4、kafka集群接收到producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长,而不关注消息是否被消费。
kafka设计原理实现:
1、kafka以topic来进行消息管理,发布者发到某个topic的消息会被均匀的分布到多个partition上
2、每个topic包含多个partition,每个partition对应一个逻辑log,有多个segment组成
3、每个segment中存储多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储 位置,避免id到位置的额外映射。
4、每个partition在内存中有一个Index,记录每个segment中的第一条消息的偏移量
5、当某个segment上的消息数据达到一定阀值,会flush到磁盘,进行订阅,broker此时会重新创建新的segment。
kafka的通讯协议:
kafka通讯的基本单位是request/response
基本结构:messagesize、requestmessage、responsemessage
通讯过程:
客户端打开与服务器的socket
往socket写入一个Int32的数字
服务端先读取出一个int32的整数从而获取这次requests的大小
然后读取对应字节数的数据从而得到requests的具体内容
服务器端处理了请求后,也用同样的方式来发送响应。
kafka的通讯协议组件关系:
Request/Response是通讯层的结构,和网络的7层模型对比的话,它类似于TCP层
Message、MessageSet定义的是业务层的结构,类似于网络7层模型中的HTTP层,Message、MessageSet只是Request、Response的payload中的一种数据结构。
说明:
kafka的通讯协议中不包含schema,格式也比较简单,这样设计的好处是协议自身的overhead小,再加上把多条message放在一起做压缩,提高压缩比率,从而在网络上传输的数据量会少一些。
kafka的分布式实现:
1、一个topic的多个partition被分布在kafka集群中的多个server(kafka实例)上,每个server负责partition中消息的读写操作。
2、此外kafka还可以配置partition需要备份的个数replicas,每个partition将会被备份到多台机器上,以提高可用性。
3、基于replicated方案,那么就意味着需要对多备份从进行调整。
4、每个partition都有一个server为leader,leader负责所有的读写操作,如果leader失效,那么将会有其它的follower来接管,成为新的leader。
5、follower只是单调的和leader跟进,同步消息即可,由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个leader
6、kafka会将leader均衡的分散在每个实例上,来确保整体的性能稳定。
kafka数据持久化:
1、发现线性的访问磁盘,很多时候比随机的内存访问快得多
2、传统的使用内存做为磁盘缓存
3、kafka直接将数据写入到日志文件中
kafka安装:
1、下载kafka.tgz包
2、解压
3、配置zookeeper,配置producer,配置consumer
4、启动服务
a、启动zookeeper服务,b、启动kafka
./bin/zkServer.sh start /stop /status
./bin/kafka-server-start.sh config/server.properties
创建topic:
./bin/kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic mytopic
查看topic:
./bin/kafka-topics.sh --list --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181
查看topic详细信息:
./bin/kafka-topics.sh --describe --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topic mytopic
删除topic
./bin/kafka-topics.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --delete --topic mytopic
创建生产者producer
./bin/kafka-console-producer.sh --broker--list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic mytopic
创建消费者consumer
./bin/kafka-console-consumer.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topic mytopic --from-beginning
生产者参数查看:
./bin/kafka-console-producer.sh
消费者参数查看:
./bin/kafka-console-consumer.sh