编辑
2025-11-01
Kakfa
00

目录

一、主题(Topic)管理命令
创建主题
查看主题列表
查看主题详情
修改主题配置
二、消费组(Consumer Group)管理命令
查看消费组列表
查看消费组详情(偏移量、滞后消息数)
重置消费组偏移量(场景:重新消费、跳过错误消息)
删除消费组
三、其他命令

一、主题(Topic)管理命令

创建主题

基础创建(指定核心参数)

js
./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 \ --create --topic test_topic \ --partitions 3 \ # 分区数:根据并发处理需求设置,单机建议 1-5 个 --replication-factor 1 # 副本数:单机只能设 1,集群建议 2-3 个(保证高可用)

带自定义配置创建(如消息保留时间、最大消息大小)

js
./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 \ --create --topic long_retention_topic \ --partitions 2 --replication-factor 1 \ --config retention.ms=86400000 \ # 消息保留时长:24 小时(默认 7 天,单位:毫秒) --config max.message.bytes=1048576 # 单条消息最大大小:1MB(默认 1MB,单位:字节) --config cleanup.policy=delete # 消息清理策略:删除(默认,可选 compact 压缩)

查看主题列表

查看所有主题

js
./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 --list

过滤查看指定前缀 / 关键词的主题

js
# 示例:查看以“test_”开头的主题 ./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 --list | grep "test_"

查看主题详情

查看单个主题的完整信息(推荐)

输出包含:主题名称、分区数、副本分布(Leader/Follower 节点)、自定义配置、ISR 列表(同步副本集)等。

js
./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 \ --describe --topic test_topic

查看所有主题的详情(不推荐,主题多时输出冗长)

js
./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 --describe

修改主题配置

新增 / 更新主题的自定义配置

js
# 示例:将 test_topic 的消息保留时间延长至 7 天 ./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 \ --alter --topic test_topic \ --config retention.ms=604800000 # 7 天 = 7*24*3600*1000 毫秒

删除主题的自定义配置(恢复默认值)

js
# 示例:删除“最大消息大小”的自定义配置,恢复默认值 ./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 \ --alter --topic test_topic \ --delete-config max.message.bytes

增加主题分区(仅支持扩容,不支持缩容)

js
# 示例:将 test_topic 的分区数从 3 增加到 5 ./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 \ --alter --topic test_topic \ --partitions 5

分区扩容后,历史消息仅存在于原分区,新分区从扩容后开始接收消息,需确保业务能处理分区分布变化。

删除主题

js
./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 \ --delete --topic test_topic

前提条件:

  • Kafka 配置 delete.topic.enable 需设为 true(默认已开启);
  • 主题无正在运行的消费组,且无未提交的偏移量;
  • 删除后不可恢复,需谨慎操作。

二、消费组(Consumer Group)管理命令

查看消费组列表

查看所有消费组

js
./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 --list

过滤查看指定消费组

js
# 示例:查看包含“order”关键词的消费组 ./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 --list | grep "order"

查看消费组详情(偏移量、滞后消息数)

查看单个消费组的所有主题消费情况

js
./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 \ --describe --group test_consumer_group

查看消费组在指定主题上的消费详情(精准查询)

js
./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 \ --describe --group test_consumer_group \ --topic test_topic
  • 核心字段说明:
  • 字段 含义
  • CURRENT-OFFSET 消费组当前已消费到的偏移量
  • LOG-END-OFFSET 主题分区的最新消息偏移量(总消息数 = LOG-END-OFFSET - 起始偏移量)
  • LAG 滞后消息数(未消费的消息数 = LOG-END-OFFSET - CURRENT-OFFSET),0 表示消费完成
  • PARTITION 主题分区编号
  • LEADER 负责处理该分区读写的 Broker 节点 ID

重置消费组偏移量(场景:重新消费、跳过错误消息)

重置前需确保:消费组无正在运行的消费者(或暂停消费者),避免偏移量被实时覆盖。

场景 1:重置到主题分区的最开始(重新消费所有历史消息)

js
./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 \ --reset-offsets --group test_consumer_group \ --topic test_topic \ --to-earliest # 核心参数:重置到最早偏移量 --execute # 必须加此参数才会实际执行(不加则仅预览重置结果)

场景 2:重置到指定时间点(如 1 小时前的消息)

js
./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 \ --reset-offsets --group test_consumer_group \ --topic test_topic \ --to-datetime 2025-11-01T04:00:00.000 # 格式:YYYY-MM-DDTHH:MM:SS.fff --execute

场景 3:重置到指定偏移量(如将分区 0 的偏移量设为 100)

js
./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 \ --reset-offsets --group test_consumer_group \ --topic test_topic:0 \ # 指定主题的分区 0(格式:topic:partition) --to-offset 100 # 目标偏移量 --execute

场景 4:重置到当前最新偏移量(跳过所有未消费消息)

js
./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 \ --reset-offsets --group test_consumer_group \ --topic test_topic \ --to-latest # 重置到最新偏移量 --execute

删除消费组

前提条件:消费组为 “空闲状态”(无正在运行的消费者,且偏移量无关联业务),删除后不可恢复。

js
./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 \ --delete --group test_consumer_group

三、其他命令

估算主题的消息总数

js
./kafka-run-class.sh kafka.tools.GetOffsetShell \ --bootstrap-server 192.168.201.113:9092 \ --topic test_topic \ --time -1 # -1 = 最新偏移量,-2 = 最早偏移量(总数 ≈ 各分区最新偏移量之和)

手动消费主题消息(调试用)

从历史消息开始消费(查看全量消息)

js
./kafka-console-consumer.sh --bootstrap-server 192.168.201.113:9092 \ --topic test_topic \ --from-beginning \ --group debug_consumer_group # 临时消费组,避免干扰业务消费组

仅消费指定分区的消息

js
./kafka-console-consumer.sh --bootstrap-server 192.168.201.113:9092 \ --topic test_topic:0 \ # 仅消费分区 0 --group debug_consumer_group

查看主题的配置列表(所有支持的配置项)

js
./kafka-configs.sh --bootstrap-server 192.168.201.113:9092 \ --describe --entity-type topics --entity-name test_topic

本文作者:松轩(^U^)

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

Document