基础创建(指定核心参数)
js./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 \
--create --topic test_topic \
--partitions 3 \ # 分区数:根据并发处理需求设置,单机建议 1-5 个
--replication-factor 1 # 副本数:单机只能设 1,集群建议 2-3 个(保证高可用)
带自定义配置创建(如消息保留时间、最大消息大小)
js./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 \
--create --topic long_retention_topic \
--partitions 2 --replication-factor 1 \
--config retention.ms=86400000 \ # 消息保留时长:24 小时(默认 7 天,单位:毫秒)
--config max.message.bytes=1048576 # 单条消息最大大小:1MB(默认 1MB,单位:字节)
--config cleanup.policy=delete # 消息清理策略:删除(默认,可选 compact 压缩)
查看所有主题
js./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 --list
过滤查看指定前缀 / 关键词的主题
js# 示例:查看以“test_”开头的主题
./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 --list | grep "test_"
查看单个主题的完整信息(推荐)
输出包含:主题名称、分区数、副本分布(Leader/Follower 节点)、自定义配置、ISR 列表(同步副本集)等。
js./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 \
--describe --topic test_topic
查看所有主题的详情(不推荐,主题多时输出冗长)
js./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 --describe
新增 / 更新主题的自定义配置
js# 示例:将 test_topic 的消息保留时间延长至 7 天
./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 \
--alter --topic test_topic \
--config retention.ms=604800000 # 7 天 = 7*24*3600*1000 毫秒
删除主题的自定义配置(恢复默认值)
js# 示例:删除“最大消息大小”的自定义配置,恢复默认值
./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 \
--alter --topic test_topic \
--delete-config max.message.bytes
增加主题分区(仅支持扩容,不支持缩容)
js# 示例:将 test_topic 的分区数从 3 增加到 5
./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 \
--alter --topic test_topic \
--partitions 5
分区扩容后,历史消息仅存在于原分区,新分区从扩容后开始接收消息,需确保业务能处理分区分布变化。
删除主题
js./kafka-topics.sh --bootstrap-server 192.168.201.113:9092 \
--delete --topic test_topic
前提条件:
查看所有消费组
js./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 --list
过滤查看指定消费组
js# 示例:查看包含“order”关键词的消费组
./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 --list | grep "order"
查看单个消费组的所有主题消费情况
js./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 \
--describe --group test_consumer_group
查看消费组在指定主题上的消费详情(精准查询)
js./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 \
--describe --group test_consumer_group \
--topic test_topic
重置前需确保:消费组无正在运行的消费者(或暂停消费者),避免偏移量被实时覆盖。
场景 1:重置到主题分区的最开始(重新消费所有历史消息)
js./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 \
--reset-offsets --group test_consumer_group \
--topic test_topic \
--to-earliest # 核心参数:重置到最早偏移量
--execute # 必须加此参数才会实际执行(不加则仅预览重置结果)
场景 2:重置到指定时间点(如 1 小时前的消息)
js./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 \
--reset-offsets --group test_consumer_group \
--topic test_topic \
--to-datetime 2025-11-01T04:00:00.000 # 格式:YYYY-MM-DDTHH:MM:SS.fff
--execute
场景 3:重置到指定偏移量(如将分区 0 的偏移量设为 100)
js./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 \
--reset-offsets --group test_consumer_group \
--topic test_topic:0 \ # 指定主题的分区 0(格式:topic:partition)
--to-offset 100 # 目标偏移量
--execute
场景 4:重置到当前最新偏移量(跳过所有未消费消息)
js./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 \
--reset-offsets --group test_consumer_group \
--topic test_topic \
--to-latest # 重置到最新偏移量
--execute
前提条件:消费组为 “空闲状态”(无正在运行的消费者,且偏移量无关联业务),删除后不可恢复。
js./kafka-consumer-groups.sh --bootstrap-server 192.168.201.113:9092 \
--delete --group test_consumer_group
估算主题的消息总数
js./kafka-run-class.sh kafka.tools.GetOffsetShell \
--bootstrap-server 192.168.201.113:9092 \
--topic test_topic \
--time -1 # -1 = 最新偏移量,-2 = 最早偏移量(总数 ≈ 各分区最新偏移量之和)
手动消费主题消息(调试用)
从历史消息开始消费(查看全量消息)
js./kafka-console-consumer.sh --bootstrap-server 192.168.201.113:9092 \
--topic test_topic \
--from-beginning \
--group debug_consumer_group # 临时消费组,避免干扰业务消费组
仅消费指定分区的消息
js./kafka-console-consumer.sh --bootstrap-server 192.168.201.113:9092 \
--topic test_topic:0 \ # 仅消费分区 0
--group debug_consumer_group
查看主题的配置列表(所有支持的配置项)
js./kafka-configs.sh --bootstrap-server 192.168.201.113:9092 \
--describe --entity-type topics --entity-name test_topic
本文作者:松轩(^U^)
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!