js准备的机器
192.168.201.112
192.168.201.113
都安装好java jdk
jsrpm -ivh jdk-21_linux-x64_bin.rpm
都解压压缩包
jstar -zxvf apache-zookeeper-3.9.4-bin.tar.gz
tar -zxvf kafka_2.12-3.6.2.tgz
创建存储数据目录
jsmkdir -p /kafka/zookeeper_data
修改192.168.201.112、192.168.201.113机器的配置文件
jsvi /kafka/apache-zookeeper-3.9.4-bin/conf/zoo.cfg
js# ZooKeeper 的时间基准单位(毫秒),所有时间相关配置都以它为倍数
# 比如后续的 initLimit、syncLimit 都是 "tickTime 的倍数"
tickTime=2000
# 从节点(Follower)启动后,与主节点(Leader)同步初始数据的最大超时时间
# 计算方式:initLimit × tickTime = 10 × 2000ms = 20秒
# 超过这个时间未同步成功,从节点会被判定为连接失败
initLimit=10
# 运行中,从节点与主节点之间通信的最大超时时间(如请求响应超时)
# 计算方式:syncLimit × tickTime = 5 × 2000ms = 10秒
# 超过这个时间未收到响应,主节点会认为从节点失联
syncLimit=5
# 数据存储目录(核心!必须改)
# 存储内容:内存快照(snapshot)、事务日志(记录所有写操作,用于恢复数据)
# 注意:/tmp 是临时目录,Linux 重启后会自动清空,生产环境必须改成永久目录
# 示例:dataDir=/data/zookeeper(需提前创建该目录并授权)
dataDir=/kafka/zookeeper_data
# 客户端连接 ZooKeeper 的端口号(默认2181,需确保防火墙开放)
# 外部程序(如Kafka)通过此端口连接,例如 Kafka 配置 zookeeper.connect=IP:2181
clientPort=2181
# 限制单个客户端(如一个Kafka实例)的最大并发连接数(默认60)
# 若出现"连接数超限"报错,可取消注释并调大,例如:maxClientCnxns=100
#maxClientCnxns=60
# 以下是自动清理旧数据的配置(生产环境建议开启,避免磁盘占满)
# 请先阅读官方维护指南再开启:https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
# 自动保留的最新快照文件数量(取消注释后生效)
# 例如设置为3,表示只保留最近3个快照,更早的会被删除
#autopurge.snapRetainCount=3
# 自动清理任务的执行频率(单位:小时,取消注释后生效)
# 设置为0表示禁用自动清理;设置为1表示每1小时清理一次
#autopurge.purgeInterval=1
## 监控指标配置(对接Prometheus时使用,默认关闭)
#
# Prometheus监控 exporter 配置
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
# 监控数据暴露的IP(0.0.0.0表示允许所有IP访问)
#metricsProvider.httpHost=0.0.0.0
# 监控数据暴露的端口(需开放此端口给Prometheus)
#metricsProvider.httpPort=7000
# 是否导出JVM信息(如内存、CPU使用情况)
#metricsProvider.exportJvmInfo=true
# 添加集群编号
server.0=192.168.201.112:2888:3888
server.1=192.168.201.113:2888:3888
指定集群编号192.168.201.112
jstouch /kafka/zookeeper_data/myid
echo 0 > /kafka/zookeeper_data/myid
指定集群编号192.168.201.113
jstouch /kafka/zookeeper_data/myid
echo 1 > /kafka/zookeeper_data/myid
然后这两台机器进入目录启动zookeeper
jscd /kafka/apache-zookeeper-3.9.4-bin/bin
./zkServer.sh start
这样就一个领导者一个工作者。( leader领导者,follower:跟随者)


创建Kafka的数据存储目录
jsmkdir -p /kafka/data
修改配置文件
jsvi /kafka/kafka_2.12-3.6.2/config/server.properties
服务器192.168.201.112配置
js############################# 服务器基本配置(ZooKeeper 模式) #############################
# 当前节点的唯一 ID(集群中不可重复)
broker.id=0
# ZooKeeper 集群连接地址(填你的 ZooKeeper 实际 IP:端口,单机就填一个)
zookeeper.connect=192.168.201.112:2181,192.168.201.113:2181
# 连接 ZooKeeper 的超时时间(默认 6000ms,无需修改)
zookeeper.connection.timeout.ms=6000
############################# socket 服务器设置(保留,无需修改) #############################
# 服务器监听的网络地址和端口(仅保留 PLAINTEXT 即可,删除 CONTROLLER 监听器)
listeners=PLAINTEXT://:9092
# broker 之间通信使用的监听器名称
inter.broker.listener.name=PLAINTEXT
# 向客户端暴露的连接地址(填 Kafka 实际 IP)
advertised.listeners=PLAINTEXT://192.168.201.112:9092
# 监听器与安全协议的映射(删除 CONTROLLER 相关映射)
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 网络和 IO 线程数(保留默认)
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# 日志存储基本配置(保留,无需修改) #############################
# 消息日志存储目录(你的配置是 /kafka/data,没问题)
log.dirs=/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# 内部主题、日志刷新/保留策略(全部保留,无需修改) #############################
offsets.topic.replication.factor=1
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
服务器192.168.201.113配置
js############################# 服务器基本配置(ZooKeeper 模式) #############################
# 当前节点的唯一 ID(集群中不可重复)
broker.id=1
# ZooKeeper 集群连接地址(填你的 ZooKeeper 实际 IP:端口,单机就填一个)
zookeeper.connect=192.168.201.112:2181,192.168.201.113:2181
# 连接 ZooKeeper 的超时时间(默认 6000ms,无需修改)
zookeeper.connection.timeout.ms=6000
############################# socket 服务器设置(保留,无需修改) #############################
# 服务器监听的网络地址和端口(仅保留 PLAINTEXT 即可,删除 CONTROLLER 监听器)
listeners=PLAINTEXT://:9092
# broker 之间通信使用的监听器名称
inter.broker.listener.name=PLAINTEXT
# 向客户端暴露的连接地址(填 Kafka 实际 IP)
advertised.listeners=PLAINTEXT://192.168.201.113:9092
# 监听器与安全协议的映射(删除 CONTROLLER 相关映射)
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 网络和 IO 线程数(保留默认)
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# 日志存储基本配置(保留,无需修改) #############################
# 消息日志存储目录(你的配置是 /kafka/data,没问题)
log.dirs=/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# 内部主题、日志刷新/保留策略(全部保留,无需修改) #############################
offsets.topic.replication.factor=1
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
然后进入bin目录启动kafka
jscd /kafka/kafka_2.12-3.6.2/bin/
js./kafka-server-start.sh -daemon ../config/server.properties
创建集群主题
js./kafka-topics.sh --bootstrap-server 192.168.201.112:9092,192.168.201.113:9092 --create --topic test_topic --partitions 3 --replication-factor 1
查看所有主题
js./kafka-topics.sh --bootstrap-server 192.168.201.112:9092,192.168.201.113:9092 --list
启动一个手动输入消息的生产者,向 test_topic 发送消息
js./kafka-console-producer.sh --bootstrap-server 192.168.201.112:9092,192.168.201.113:9092 --topic test_topic
启动消费者,订阅 test_topic 接收消息
js./kafka-console-consumer.sh --bootstrap-server 192.168.201.112:9092,192.168.201.113:9092 --topic test_topic --from-beginning
本文作者:松轩(^U^)
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!