先安装java JDK,高版本kafka只兼容javajdk11版本
jsyum install -y java-11-openjdk
上传apache-zookeeper-3.9.4-bin.tar.gz和kafka_2.13-4.1.0.tgz包
进行解压
jstar -zxvf apache-zookeeper-3.9.4-bin.tar.gz
jstar -zxvf kafka_2.13-4.1.0.tgz
zoo.cfg是zookeeper的核心配置文件
把dataDir=/kafka/data修改,记得先创建好/kafka/data目录
jsvi /kafka/apache-zookeeper-3.9.4-bin/conf/zoo.cfg
yml# ZooKeeper 的时间基准单位(毫秒),所有时间相关配置都以它为倍数
# 比如后续的 initLimit、syncLimit 都是 "tickTime 的倍数"
tickTime=2000
# 从节点(Follower)启动后,与主节点(Leader)同步初始数据的最大超时时间
# 计算方式:initLimit × tickTime = 10 × 2000ms = 20秒
# 超过这个时间未同步成功,从节点会被判定为连接失败
initLimit=10
# 运行中,从节点与主节点之间通信的最大超时时间(如请求响应超时)
# 计算方式:syncLimit × tickTime = 5 × 2000ms = 10秒
# 超过这个时间未收到响应,主节点会认为从节点失联
syncLimit=5
# 数据存储目录(核心!必须改)
# 存储内容:内存快照(snapshot)、事务日志(记录所有写操作,用于恢复数据)
# 注意:/tmp 是临时目录,Linux 重启后会自动清空,生产环境必须改成永久目录
# 示例:dataDir=/data/zookeeper(需提前创建该目录并授权)
dataDir=/kafka/zookeeper_data
# 客户端连接 ZooKeeper 的端口号(默认2181,需确保防火墙开放)
# 外部程序(如Kafka)通过此端口连接,例如 Kafka 配置 zookeeper.connect=IP:2181
clientPort=2181
# 限制单个客户端(如一个Kafka实例)的最大并发连接数(默认60)
# 若出现"连接数超限"报错,可取消注释并调大,例如:maxClientCnxns=100
#maxClientCnxns=60
# 以下是自动清理旧数据的配置(生产环境建议开启,避免磁盘占满)
# 请先阅读官方维护指南再开启:https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
# 自动保留的最新快照文件数量(取消注释后生效)
# 例如设置为3,表示只保留最近3个快照,更早的会被删除
#autopurge.snapRetainCount=3
# 自动清理任务的执行频率(单位:小时,取消注释后生效)
# 设置为0表示禁用自动清理;设置为1表示每1小时清理一次
#autopurge.purgeInterval=1
## 监控指标配置(对接Prometheus时使用,默认关闭)
#
# Prometheus监控 exporter 配置
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
# 监控数据暴露的IP(0.0.0.0表示允许所有IP访问)
#metricsProvider.httpHost=0.0.0.0
# 监控数据暴露的端口(需开放此端口给Prometheus)
#metricsProvider.httpPort=7000
# 是否导出JVM信息(如内存、CPU使用情况)
#metricsProvider.exportJvmInfo=true
进入bin目录启动zookeeper
jscd /kafka/apache-zookeeper-3.9.4-bin/bin
启动
js./zkServer.sh status

测试链接
js./zkCli.sh

js# kafka-server-start.sh
Kafka Broker 启动程序,需指定配置文件(如 server.properties)才能启动服务端节点。
# kafka-server-stop.sh
Kafka Broker 停止程序,用于优雅关闭运行中的 Broker 进程,避免数据丢失。
# kafka-topics.sh
Topic 全生命周期管理程序,可创建、删除、修改 Topic(如调整分区数)、查看 Topic 列表/详情。
# kafka-console-producer.sh
命令行模拟生产者程序,手动输入消息并发送到指定 Topic,常用于测试消息生产流程。
# kafka-console-consumer.sh
命令行模拟消费者程序,订阅指定 Topic 并消费消息,支持从最新位置或最早位置开始消费,用于测试消息消费流程。
js/kafka/kafka_2.13-4.1.0/config
最核心的配置文件:server.properties(Broker 必用)、producer.properties(生产者)、consumer.properties(消费者)。
jsvi server.properties
使用ZooKeeper
js############################# 服务器基本配置(ZooKeeper 模式) #############################
# 当前节点的唯一 ID(集群中不可重复)
broker.id=1
# ZooKeeper 集群连接地址(填你的 ZooKeeper 实际 IP:端口,单机就填一个)
zookeeper.connect=192.168.201.113:2181
# 连接 ZooKeeper 的超时时间(默认 6000ms,无需修改)
zookeeper.connection.timeout.ms=6000
############################# socket 服务器设置(保留,无需修改) #############################
# 服务器监听的网络地址和端口(仅保留 PLAINTEXT 即可,删除 CONTROLLER 监听器)
listeners=PLAINTEXT://:9092
# broker 之间通信使用的监听器名称
inter.broker.listener.name=PLAINTEXT
# 向客户端暴露的连接地址(填 Kafka 实际 IP)
advertised.listeners=PLAINTEXT://192.168.201.113:9092
# 监听器与安全协议的映射(删除 CONTROLLER 相关映射)
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 网络和 IO 线程数(保留默认)
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# 日志存储基本配置(保留,无需修改) #############################
# 消息日志存储目录(你的配置是 /kafka/data,没问题)
log.dirs=/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# 内部主题、日志刷新/保留策略(全部保留,无需修改) #############################
offsets.topic.replication.factor=1
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
不使用ZooKeeper
yml############################# 服务器基本配置 #############################
# 当前服务器的角色(KRaft 模式必填)
# 可选值:broker(仅作为消息代理)、controller(仅作为控制器)、broker,controller(同时作为代理和控制器,单机模式常用)
process.roles=broker,controller
# 当前节点的唯一 ID(集群中所有节点必须不同,如 1、2、3...)
node.id=1
# 控制器集群的连接地址(KRaft 模式必填,用于节点间通信)
# 格式:主机:端口(若为集群,用逗号分隔多个控制器)
controller.quorum.bootstrap.servers=192.168.201.113:9093
############################# socket 服务器设置 #############################
# 服务器监听的网络地址和端口(KRaft 模式需至少包含控制器的监听器)
# 格式:监听器名称://主机:端口(多个监听器用逗号分隔)
# 示例中:PLAINTEXT 用于客户端和 broker 通信,CONTROLLER 用于控制器内部通信
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# broker 之间通信使用的监听器名称(需与上面的监听器名称对应)
inter.broker.listener.name=PLAINTEXT
# 向客户端暴露的连接地址(若不设置,默认使用 listeners 的值)
# 生产环境需填写实际可访问的 IP:端口(如 192.168.1.100:9092),否则客户端可能连接失败
advertised.listeners=PLAINTEXT://192.168.201.113:9092,CONTROLLER://192.168.201.113:9093
# 控制器使用的监听器名称(KRaft 模式必填,需与 listeners 中的控制器监听器对应)
controller.listener.names=CONTROLLER
# 监听器名称与安全协议的映射(默认无需修改,PLAINTEXT 表示无加密)
# 格式:监听器名称:协议(如 CONTROLLER 用 PLAINTEXT 协议)
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 用于接收网络请求和发送响应的线程数(根据服务器 CPU 调整,默认 3 即可)
num.network.threads=3
# 用于处理请求(可能包含磁盘 IO)的线程数(建议大于等于磁盘数量,默认 8)
num.io.threads=8
# socket 发送缓冲区大小(默认 100KB,无需修改)
socket.send.buffer.bytes=102400
# socket 接收缓冲区大小(默认 100KB,无需修改)
socket.receive.buffer.bytes=102400
# 服务器可接受的最大请求大小(防止内存溢出,默认 100MB)
socket.request.max.bytes=104857600
############################# 日志存储基本配置 #############################
# 日志文件(消息数据)的存储目录(多个目录用逗号分隔)
# 注意:/tmp 是临时目录,重启后数据会丢失,生产环境必须改为永久目录(如 /data/kafka/logs)
log.dirs=/kafka/data
# 新建主题时的默认分区数(分区越多,并行消费能力越强,但文件数量也越多)
num.partitions=1
# 每个数据目录用于日志恢复(启动时)和刷新(关闭时)的线程数
# 若目录位于 RAID 阵列,建议增大此值
num.recovery.threads.per.data.dir=1
############################# 内部主题设置 #############################
# 内部主题(如消费者偏移量、事务状态)的副本数
# 开发测试可用 1,生产环境建议大于 1(如 3),确保高可用
offsets.topic.replication.factor=1 # 消费者偏移量主题的副本数
share.coordinator.state.topic.replication.factor=1 # 共享协调器状态主题的副本数
share.coordinator.state.topic.min.isr=1 # 共享协调器主题的最小同步副本数
transaction.state.log.replication.factor=1 # 事务状态日志的副本数
transaction.state.log.min.isr=1 # 事务状态日志的最小同步副本数
############################# 日志刷新策略 #############################
# 消息会立即写入文件系统,但默认延迟刷新到磁盘(由操作系统缓存管理)
# 以下配置控制数据刷新到磁盘的策略,需权衡:
# 1. 持久性:若不启用副本,未刷新的数据可能丢失
# 2. 延迟:刷新间隔过大会导致刷新时延迟飙升
# 3. 吞吐量:刷新是昂贵操作,间隔过小会导致频繁磁盘寻道
# 累计多少条消息后强制刷新到磁盘(默认注释,即不按消息数刷新)
#log.flush.interval.messages=10000
# 消息在内存中最多存放多久后强制刷新到磁盘(默认注释,即不按时间刷新)
#log.flush.interval.ms=1000
############################# 日志保留策略 #############################
# 以下配置控制日志段的清理策略:按时间或大小清理
# 只要满足任一条件,日志段就会被删除(从日志末尾开始)
# 日志文件可被删除的最小存活时间(默认 168 小时,即 7 天)
log.retention.hours=168
# 基于大小的日志保留策略(默认注释,即不按大小清理)
# 若启用,当剩余日志大小低于此值时,停止清理(单位:字节,1GB=1073741824)
#log.retention.bytes=1073741824
# 单个日志段文件的最大大小(达到此值时创建新的日志段,默认 1GB)
log.segment.bytes=1073741824
# 检查日志段是否符合清理条件的时间间隔(默认 300000ms,即 5 分钟)
log.retention.check.interval.ms=300000
启动 Kafka
jscd /kafka/kafka_2.13-4.1.0/bin # 你的 Kafka bin 目录路径
后台启动,日志会输出到 logs 目录,不占用终端
js./kafka-server-start.sh -daemon ../config/server.properties
jsps -ef | grep kafka
# 若输出包含“kafka.Kafka”,说明进程正常
本文作者:松轩(^U^)
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!