# Kafka学习总结
# 理论
定位
LinkIn公司采用scala开发的一个多分区、多副本基于zk协调的分布式消息系统,目前定位为一个分布式流式处理平台
角色
- 消息系统
- 系统解耦
- 冗余存储
- 流量消峰
- 缓冲
- 异步通信
- 扩展性
- 可恢复性
- 存储系统
- 持久化到磁盘
- 数据保留策略为“永久”
- 启用主题的日志压缩功能
- 多副本
- 流式处理平台
基本概念
- Producer
- Broker - kafka实例
- Consumer
- Topic
- Partition
kafka消息以topic为单位进行归类,生产者将消息发送到topic,消费者订阅topic进行消费。
分区为主题下细分概念,分区属于主题,但每个分区的消息都不同,例如对某个topic发10条消息,可能按334这样分布在3个分区中。分区在存储层面可视为一个可追加的log文件,每个消息被追加到分区时会分配一个特定的偏移量offset,这个唯一标识能保证消息在分区内有顺序性。
之所以设计分区这个概念是因为机器IO瓶颈,如果有多个分区同时消息均匀落在不同分区内,这样可以解决单个机器IO瓶颈的问题。
- Replica
kafka为分区引入了多副本机制,通过增加副本数量提升容灾能力。同一分区的副本保存的是相同的消息,关系为一主多从,leader+follower的模式,leader负责读写,而follower只负责同步冗余,在leader宕机时从follower中重新选举对外提供服务。follower很多时候消息相对于leader会有一定滞后。
consumer使用pull的方式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机重新上线时根据之前保存的消息位置重新拉去消息,这样不会丢失消息。
- AR - Assigned Replicas
分区内所有副本- ISR - In-Sync Replicas
所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR- OSR - Out-of-Sync Replicas
与leader副本同步滞后过多的副本(不包括leader副本)组成OSR
这几个概念要结合后面的HW和LEO,了解kafka在性能和一致性之间做出的方案,因为可靠性和性能是个相悖的概念。
这个滞后范围可以通过参数配置,正常情况下要保证所有follower都要与leader保证一定程度内的同步,即AR=ISR,OSR为空。
leader副本负责维护这个集合,当follower落后太多或失效时,会将follower从ISR转移到OSR集合,同时,如果leader发生故障,只有ISR集合中的follower才有资格被选为leader。
- HW - High Watermark
- LEO - Log End Offset
对于每个分区的日志文件来说,一条消息被写入leader,那么它的offset会+1,初始为0,同时LEO为offset+1,如向leader中写入了9条消息,offset为8,LEO为9,而对于follower来讲,因为它通过拉取leader的消息,所以会有一定的滞后。而对于每个follower也有自己的LEO,比如follower1同步完成,LEO也为9,而follower2同步到了offset 6,那么它的LEO为7。若ISR为3,则follower2是最小的LEO值,那么当前分区的HW也为7,取分区副本isr集合中的最小LEO。而对于消费者来讲,其能消费的消息为0-6的offset区间消息。
所以对于kafka来讲,它的消息复制既不是完全的同步,也不是单纯的异步,同步要求所有follower完成太影响性能,异步如果leader宕机而消息未被同步到follower中会造成消息丢失,出现数据一致性问题。所以kafka采用这种ISR的方式权衡数据可靠性和性能问题。
# 安装配置
依赖
- jdk
略
- zookeeper
新版本不需要zk,但是学习按着书本来,还是装zk。
- 设置ZOOKEEPER_HOME
# /etc/profile
export ZOOKEEPER_HOME=/opt/zookeeper-3.6.4
export PATH=$PATH:$ZOOKEEPER_HOME/bin
- 修改zk的配置文件
cd $$ZOOKEEPER_HOME/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
# zk服务器心跳时间,ms
tickTime=2000
# 投票选举新leader的初始化时间
initLimit=10
# leader与follower容忍的心跳最大时间,响应超过tickTime*syncLimit,则leader认为follower死掉,从服务器列表删除follower
syncLimit=5
# 数据目录 日志目录 对外端口 目录需自己创建
dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
clientPort=2181
创建myid
在data目录下创建myid文件,写入编号,如0启动zk
zkServer.sh start
# 查看状态
zkServer.sh status
- kafka
- 添加环境变量
- 配置文件
# vim $KAFKA_HOME/config/server.properties
# broker编号,如果集群中有多个,则要设置为不同
broker.id=0
# broker对外提供服务入口
listeners=PLAINTEXT://localhost://9092
# 存放消息日志文件地址
log.dirs=/tmp/kafka-logs
# kafka所需zk集群地址
zookeeper.connect=localhost:2181/kafka
- 启动zk
kafka-server-start.sh ./server.properties &
- 生产和消费
使用kafka的脚本工具,在bin目录下,如创建一个分区数为4,副本数为3的主题,topic-hong
因为使用的单broker,所以会提示报错,无法创建副本,所以改为分区数为4,副本数为1的topic
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic-hong --replication-factor 1 --partitions 4
# 显示topic的描述
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-hong
新开shell,执行consumer脚本
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-hong
另外一个shell忘topic内写入消息
kafka-console-producer.sh --broker-list localhost:9092 --topic topic-hong
会发现在consumer端能正常消费。
同时配置里zookeeper.connect配置如果是集群,用逗号分隔,最佳实践为在zk端口后指定目录,如localhost:2181/kafka,这样可以实现多个kafka集群公用一个zk集群,节省硬件资源。同时也可以去到zk对应目录查看具体详情。
# 客户端开发
生产者发送消息三种模式
send(record, callback)方法返回值并不是void,而是Future<RecordMetadata>
- 发后即忘
无视api返回参数,即不使用future接口的get
- 同步
使用future接口的get方法
- 异步
使用callback感知异步发送结果
异常类型
- 可重试异常(可以配置重试次数)
网络、leader不可用等
- 不可重试异常
消息体过大等