1 Kafka 概述
Kafka 起初是 由 LinkedIn 公司采用 Scala 语言开发的一个多分区、多副本且基于 ZooKeeper 协调的分布式消息系统,现已被捐献给 Apache 基金会。
-
对于一个 topic,他的每一个 partition 同一时间只能被同一消费者组中的一个消费者所消费
-
相比于 AMQ,它更加轻量级:非侵入性的、依赖的东西非常少,占用资源非常少,部署简单,没有太多依赖,比较容易使用。
-
消息系统:Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
-
存储系统:Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
-
流式处理平台:Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。
2 Kafka 解决了什么问题
3 Kafka 技术特性
-
高吞吐量、低延迟:kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个 topic 可以分多个 partition, consumer group 对 partition 进行并行 consume 操作。 -
可扩展性:kafka 集群支持热扩展 -
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失,消息被消费仍然不会被立即删除,而是会有过期时间。 -
容错性:允许集群中节点失败(若副本数量为 n,则允许 n-1 个节点失败) -
高并发:支持数千个客户端同时读写 -
队列模式:所有 consumer 都在一个队列,这样消息就在队内进行分区并行消费 -
订阅-发布模式:所有 consumer 都不再一个队列,这样 topic 消息可以广播给所有订阅的消费者
4 Kafka 工作原理
4.1 架构图
-
当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的leader 副本。
-
当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。
-
当某个 Topic 增加分区数量时,同样还是由控制器负责分区的重新分配。
在 Kafka 中还有两个特别重要的概念—主题(Topic)与分区(Partition)
Partition:
Replica:
4.2 Kafka 写流程
-
连接 zk 集群,从 zk 中拿到对应的 topic 的 partition 信息和 partition 的 leader 的相关信息。注:Kafka 2.8.0 已移出对 zookeeper 的依赖。
-
向对应 broker 发消息
-
客户端在发送消息时,必须指定消息所属的 Topic 和消息值 Value,此外还可以指定消息所属的 Partition 以及消息的 Key。
-
对消息做序列化处理
-
如果消息记录中指定了 Partition,则 Partitioner 不做任何事情;否则,Partitioner 根据消息的 key 得到一个 Partition。这是生产者就知道向哪个 Topic下的哪个 Partition 发送这条消息。
-
消息被添加到相应的 batch 中,独立的线程将这些 batch 发送到 Broker 上(注意,消息不是一条一条发往 broker 的,而是会在 客户端本地缓存一批数量后,在发出去,因此客户端是以 批-batch 为单位发送消息的,即一批当中包含一条或多条消息;同样,broker 也是以批为单位进行数据存储的,后面会讲到 )。
-
broker 收到消息会返回一个响应。如果消息成功写入 Kafka,则返回成功信息,内容包含了 Topic 信息、Patition信息、消息在 Partition 中的 Offset 信息;若失败,返回一个错误。
4.3 Kafka 读流程
-
连接 zk 集群,从 zk 中拿到对应的 topic 的 partition 信息和 partition 的 leader 的相关信息
-
连接到对应的 leader 对应的 broker
-
consumer 通过请求将希望读取的 topic、partition 以及对应的 offset 发送给 leader
-
leader 根据 offset 等信息定位到 segment(索引文件和日志文件)
-
根据索引文件中的内容,定位到日志文件中该偏移量对应的开始位置读取相应长度的数据并返回给 consumer
5 Kafka 运维
Kafka 的命令行工具路径:xxx/kafka/bin/下
5.1 Topic 管理指令
可以管理 Topic ,包括 创建、删除、分区扩容、查询 Topic 详细信息、查看 Topic 列表 等
命令工具:kafka-topics.sh
# 创建 Topic:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
# Topic 分区扩容
kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 4
# 删除 Topic:
kafka-topics.sh --delete --zookeeper localhost:2181 localhost:9092 --topic test
#查询 Topic 详细信息
$ ./kafka-topics.sh --topic event_topic --zookeeper localhost:2181 --describe
Topic:event_topic PartitionCount:10 ReplicationFactor:2 Configs:compression.type=gzip
Topic: event_topic Partition: 0 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
Topic: event_topic Partition: 1 Leader: 1003 Replicas: 1003,1002 Isr: 1003,1002
Topic: event_topic Partition: 2 Leader: 1002 Replicas: 1002,1001 Isr: 1002,1001
Topic: event_topic Partition: 3 Leader: 1001 Replicas: 1001,1002 Isr: 1001,1002
Topic: event_topic Partition: 4 Leader: 1003 Replicas: 1003,1001 Isr: 1003,1001
Topic: event_topic Partition: 5 Leader: 1002 Replicas: 1002,1003 Isr: 1002,1003
Topic: event_topic Partition: 6 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
Topic: event_topic Partition: 7 Leader: 1003 Replicas: 1003,1002 Isr: 1003,1002
Topic: event_topic Partition: 8 Leader: 1002 Replicas: 1002,1001 Isr: 1002,1001
Topic: event_topic Partition: 9 Leader: 1001 Replicas: 1001,1002 Isr: 1001,1002
#列出全部 Topic
kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal
5.2 增删节点后的数据均衡
增加数据节点后,虽然新节点上已经启动了 broker ,但 kafka 不会自动均衡数据,需要手动执行。 命令工具:kafka-reassign-partitions.sh 编写配置文件 move-json-file.json ,告诉 kafka 你希望哪些 topic 要重新分区: {
"topics": [{
"topic": "event_topic"
},
{
"topic": "profile_topic"
},
{
"topic": "item_topic"
}
],
"version": 1
}
执行命令生成分配信息:要注意的是,此时分区移动尚未开始,它只是告诉你当前的分配和建议。保存当前分配,以防你想要回滚它。 # 下面 --broker-list 参数 对应的是 brokerid
$ ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file ~/mv.json --broker-list "1001,1002" --generate
Current partition replica assignment #当前分配信息
{"version":1,"partitions":[{"topic":"event_topic","partition":2,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":8,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":3,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":6,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"event_topic","partition":9,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"item_topic","partition":0,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"event_topic","partition":0,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"event_topic","partition":5,"replicas":[1002,1003],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":2,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":1,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":4,"replicas":[1003,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":1,"replicas":[1003,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":7,"replicas":[1003,1002],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":0,"replicas":[1003,1002],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration #分配后的信息
{"version":1,"partitions":[{"topic":"event_topic","partition":7,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":1,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":1,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"item_topic","partition":0,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":4,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":9,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":6,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":3,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":8,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":0,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":0,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":5,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":2,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":2,"replicas":[1001,1002],"log_dirs":["any","any"]}]}
将上面得到期望的重新分配方式文件保存在一个 json 文件里面:reassignment-json-file.json,然后通过参数 —execute 执行分配: 该命令也可以用于以下使用场景:
-
给分区增加副本,你只需要在 第 2 步生成的内容里面, 在 replicas 参数中加入你想要增加的 副本所在 broker id 信息即可,这样执行的时候会自动在 对应 broker 上创建副本。 -
重新分配分区
5.3 消费情况指令
查看group的消费情况
# group: 指定group id名字
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group test-group
# 示例:
# TOPIC: group对应的topic
# PARTITION:aprtition编号,从0开始0-5表示有6个partition
# CURRENT-OFFSET:此消费着当前已消费的offset
# LOG-END-OFFSET:生产者在此partition分区上已提交确认的offset
# LAG:两个offset的差值,就是常说的积压。此数值过大为异常。
# HOST:消费者所在的服务器ip
# CLIENT-ID:消费者的信息
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
删除group
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --delete --group test-group
重新设置消费者位移
Earliest策略:把位移调整到当前最早位移处
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
Latest策略:把位移调整到当前最新位移处
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
Current策略:把位移调整到当前最新提交位移处
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
Specified-Offset策略:把位移调整到指定位移处
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
Shift-By-N策略:把位移调整到当前位移+N处(N可以是负值)
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --topic test --reset-offsets --shift-by <offset_N> --execute
DateTime策略:(把位移调整到大于给定时间的最小位移处)
时间需要减8
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --topic test --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
Duration策略:把位移调整到距离当前时间指定间隔的位移处,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。
以字母 P 开头,后面由 4 部分组成,即 D、H、M 和 S,分别表示天、小时、分钟和秒。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute
5.4 设置 Topic 过期时间
# 设置 topic 过期时间(单位 毫秒)
### 3600000 毫秒 = 1小时
./bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --entity-name topic-devops-elk-log-hechuan-huanbao --entity-type topics --add-config retention.ms=3600000
# 查看 topic 配置
./bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --describe --entity-name topic-devops-elk-log-hechuan-huanbao --entity-type topics
5.5 工具相关
使用脚本生产/消费消息
# 连接到test-topic,然后通过输入+会车生产消息
$ bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic --producer-property
>
# --from-beginning: 指定从开始消费消息,否则会从最新的地方开始消费消息
$ bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property
kafka性能测试
# 测试生产者
# 向指定主题发送了 1 千万条消息,每条消息大小是 1KB
# 它会打印出测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时
$ bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4
2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.
4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.
10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.
# 测试消费者性能
$ bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012
6 Kafka 常用性能调优
6.1 磁盘目录优化
kafka 读写的单位是 partition,因此将一个 topic 拆分为多个 partition 可以提高吞吐量。但是这里有个前提,就是不同 partition 需要位于不同的磁盘(可以在同一个机器)。如果多个 partition 位于同一个磁盘,那么意味着有多个进程同时对一个磁盘的多个文件进行读写,使得操作系统会对磁盘读写进行频繁调度,也就是破坏了磁盘读写的连续性。 6.2 JVM参数配置
推荐使用最新的 G1 来代替 CMS 作为垃圾回收器。推荐 Java 使用的最低版本为 JDK 1.7u51。 G1相比较于CMS的优势:
-
G1 是一种适用于服务器端的垃圾回收器,很好的平衡了吞吐量和响应能力 -
对于内存的划分方法不同,Eden, Survivor, Old 区域不再固定,使用内存会更高效。 G1 通过对内存进行 Region 的划分,有效避免了内存碎片问题。 -
G1 可以指定GC时可用于暂停线程的时间(不保证严格遵守)。 而 CMS 并不提供可控选项。 -
CMS 只有在 FullGC 之后会重新合并压缩内存,而G1把回收和合并集合在一起。 -
CMS 只能使用在 Old 区,在清理 Young 时一般是配合使用 ParNew,而 G1 可以统一两类分区的回收算法。
-
JVM占用内存较大(At least 4G) -
应用本身频繁申请、释放内存,进而产生大量内存碎片时。 -
对于GC时间较为敏感的应用。 -
目前我们使用的 JVM 参数:
6.3 日志数据刷盘策略
为了大幅度提高 producer 写入吞吐量,需要定期批量写文件。
有 2 个参数可配置:
-
log.flush.interval.messages = 100000
:每当 producer 写入 100000 条数据时,就把数据刷到磁盘
-
log.flush.interval.ms=1000
:每隔 1 秒,就刷一次盘
6.4 日志保留时间
当 kafka server 的被写入海量消息后,会生成很多数据文件,且占用大量磁盘空间,如果不及时清理,可能导致磁盘空间不够用,kafka 默认是保留7天。
参数:log.retention.hours = 168
来源:本文转自公众号“运维开发故事”,点击查看原文。
近期好文: