# Kafka学习总结

# 理论

定位

LinkIn公司采用scala开发的一个多分区、多副本基于zk协调的分布式消息系统,目前定位为一个分布式流式处理平台

角色

  • 消息系统
    • 系统解耦
    • 冗余存储
    • 流量消峰
    • 缓冲
    • 异步通信
    • 扩展性
    • 可恢复性
  • 存储系统
    • 持久化到磁盘
    • 数据保留策略为“永久”
    • 启用主题的日志压缩功能
    • 多副本
  • 流式处理平台

基本概念

  • Producer
  • Broker - kafka实例
  • Consumer

  • Topic
  • Partition

kafka消息以topic为单位进行归类,生产者将消息发送到topic,消费者订阅topic进行消费。

分区为主题下细分概念,分区属于主题,但每个分区的消息都不同,例如对某个topic发10条消息,可能按334这样分布在3个分区中。分区在存储层面可视为一个可追加的log文件,每个消息被追加到分区时会分配一个特定的偏移量offset,这个唯一标识能保证消息在分区内有顺序性。

之所以设计分区这个概念是因为机器IO瓶颈,如果有多个分区同时消息均匀落在不同分区内,这样可以解决单个机器IO瓶颈的问题。

  • Replica

kafka为分区引入了多副本机制,通过增加副本数量提升容灾能力。同一分区的副本保存的是相同的消息,关系为一主多从,leader+follower的模式,leader负责读写,而follower只负责同步冗余,在leader宕机时从follower中重新选举对外提供服务。follower很多时候消息相对于leader会有一定滞后。

consumer使用pull的方式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机重新上线时根据之前保存的消息位置重新拉去消息,这样不会丢失消息。

  • AR - Assigned Replicas
    分区内所有副本
  • ISR - In-Sync Replicas
    所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR
  • OSR - Out-of-Sync Replicas
    与leader副本同步滞后过多的副本(不包括leader副本)组成OSR

这几个概念要结合后面的HW和LEO,了解kafka在性能和一致性之间做出的方案,因为可靠性和性能是个相悖的概念。

这个滞后范围可以通过参数配置,正常情况下要保证所有follower都要与leader保证一定程度内的同步,即AR=ISR,OSR为空。
leader副本负责维护这个集合,当follower落后太多或失效时,会将follower从ISR转移到OSR集合,同时,如果leader发生故障,只有ISR集合中的follower才有资格被选为leader。

  • HW - High Watermark
  • LEO - Log End Offset

对于每个分区的日志文件来说,一条消息被写入leader,那么它的offset会+1,初始为0,同时LEO为offset+1,如向leader中写入了9条消息,offset为8,LEO为9,而对于follower来讲,因为它通过拉取leader的消息,所以会有一定的滞后。而对于每个follower也有自己的LEO,比如follower1同步完成,LEO也为9,而follower2同步到了offset 6,那么它的LEO为7。若ISR为3,则follower2是最小的LEO值,那么当前分区的HW也为7,取分区副本isr集合中的最小LEO。而对于消费者来讲,其能消费的消息为0-6的offset区间消息。

所以对于kafka来讲,它的消息复制既不是完全的同步,也不是单纯的异步,同步要求所有follower完成太影响性能,异步如果leader宕机而消息未被同步到follower中会造成消息丢失,出现数据一致性问题。所以kafka采用这种ISR的方式权衡数据可靠性和性能问题。

# 安装配置

依赖

  • jdk

  • zookeeper

新版本不需要zk,但是学习按着书本来,还是装zk。

  • 设置ZOOKEEPER_HOME
# /etc/profile
export ZOOKEEPER_HOME=/opt/zookeeper-3.6.4
export PATH=$PATH:$ZOOKEEPER_HOME/bin
  • 修改zk的配置文件
cd $$ZOOKEEPER_HOME/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
# zk服务器心跳时间,ms
tickTime=2000
# 投票选举新leader的初始化时间
initLimit=10
# leader与follower容忍的心跳最大时间,响应超过tickTime*syncLimit,则leader认为follower死掉,从服务器列表删除follower
syncLimit=5
# 数据目录 日志目录 对外端口 目录需自己创建
dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
clientPort=2181
  • 创建myid
    在data目录下创建myid文件,写入编号,如0

  • 启动zk

zkServer.sh start

# 查看状态
zkServer.sh status
  • kafka
  • 添加环境变量
  • 配置文件
# vim $KAFKA_HOME/config/server.properties
# broker编号,如果集群中有多个,则要设置为不同
broker.id=0
# broker对外提供服务入口
listeners=PLAINTEXT://localhost://9092
# 存放消息日志文件地址
log.dirs=/tmp/kafka-logs
# kafka所需zk集群地址
zookeeper.connect=localhost:2181/kafka
  • 启动zk
kafka-server-start.sh ./server.properties &
  • 生产和消费

使用kafka的脚本工具,在bin目录下,如创建一个分区数为4,副本数为3的主题,topic-hong
因为使用的单broker,所以会提示报错,无法创建副本,所以改为分区数为4,副本数为1的topic

kafka-topics.sh --bootstrap-server localhost:9092  --create --topic topic-hong --replication-factor 1 --partitions 4

# 显示topic的描述
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-hong

新开shell,执行consumer脚本

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-hong

另外一个shell忘topic内写入消息

kafka-console-producer.sh --broker-list localhost:9092 --topic topic-hong

会发现在consumer端能正常消费。

同时配置里zookeeper.connect配置如果是集群,用逗号分隔,最佳实践为在zk端口后指定目录,如localhost:2181/kafka,这样可以实现多个kafka集群公用一个zk集群,节省硬件资源。同时也可以去到zk对应目录查看具体详情。

# 客户端开发

生产者发送消息三种模式
send(record, callback)方法返回值并不是void,而是Future<RecordMetadata>

  • 发后即忘

无视api返回参数,即不使用future接口的get

  • 同步

使用future接口的get方法

  • 异步

使用callback感知异步发送结果


异常类型

  • 可重试异常(可以配置重试次数)

网络、leader不可用等

  • 不可重试异常

消息体过大等