编辑
2025-11-02
Kakfa
00

目录

一、环境部署
二、修zookeepe改配置文件
三、修改Kakfa集群
三、生产者和消费者测试

一、环境部署

js
准备的机器 192.168.201.112 192.168.201.113

都安装好java jdk

js
rpm -ivh jdk-21_linux-x64_bin.rpm

都解压压缩包

js
tar -zxvf apache-zookeeper-3.9.4-bin.tar.gz tar -zxvf kafka_2.12-3.6.2.tgz

二、修zookeepe改配置文件

创建存储数据目录

js
mkdir -p /kafka/zookeeper_data

修改192.168.201.112、192.168.201.113机器的配置文件

js
vi /kafka/apache-zookeeper-3.9.4-bin/conf/zoo.cfg
js
# ZooKeeper 的时间基准单位(毫秒),所有时间相关配置都以它为倍数 # 比如后续的 initLimit、syncLimit 都是 "tickTime 的倍数" tickTime=2000 # 从节点(Follower)启动后,与主节点(Leader)同步初始数据的最大超时时间 # 计算方式:initLimit × tickTime = 10 × 2000ms = 20秒 # 超过这个时间未同步成功,从节点会被判定为连接失败 initLimit=10 # 运行中,从节点与主节点之间通信的最大超时时间(如请求响应超时) # 计算方式:syncLimit × tickTime = 5 × 2000ms = 10秒 # 超过这个时间未收到响应,主节点会认为从节点失联 syncLimit=5 # 数据存储目录(核心!必须改) # 存储内容:内存快照(snapshot)、事务日志(记录所有写操作,用于恢复数据) # 注意:/tmp 是临时目录,Linux 重启后会自动清空,生产环境必须改成永久目录 # 示例:dataDir=/data/zookeeper(需提前创建该目录并授权) dataDir=/kafka/zookeeper_data # 客户端连接 ZooKeeper 的端口号(默认2181,需确保防火墙开放) # 外部程序(如Kafka)通过此端口连接,例如 Kafka 配置 zookeeper.connect=IP:2181 clientPort=2181 # 限制单个客户端(如一个Kafka实例)的最大并发连接数(默认60) # 若出现"连接数超限"报错,可取消注释并调大,例如:maxClientCnxns=100 #maxClientCnxns=60 # 以下是自动清理旧数据的配置(生产环境建议开启,避免磁盘占满) # 请先阅读官方维护指南再开启:https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # 自动保留的最新快照文件数量(取消注释后生效) # 例如设置为3,表示只保留最近3个快照,更早的会被删除 #autopurge.snapRetainCount=3 # 自动清理任务的执行频率(单位:小时,取消注释后生效) # 设置为0表示禁用自动清理;设置为1表示每1小时清理一次 #autopurge.purgeInterval=1 ## 监控指标配置(对接Prometheus时使用,默认关闭) # # Prometheus监控 exporter 配置 #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider # 监控数据暴露的IP0.0.0.0表示允许所有IP访问) #metricsProvider.httpHost=0.0.0.0 # 监控数据暴露的端口(需开放此端口给Prometheus) #metricsProvider.httpPort=7000 # 是否导出JVM信息(如内存、CPU使用情况) #metricsProvider.exportJvmInfo=true # 添加集群编号 server.0=192.168.201.112:2888:3888 server.1=192.168.201.113:2888:3888

指定集群编号192.168.201.112

js
touch /kafka/zookeeper_data/myid echo 0 > /kafka/zookeeper_data/myid

指定集群编号192.168.201.113

js
touch /kafka/zookeeper_data/myid echo 1 > /kafka/zookeeper_data/myid

然后这两台机器进入目录启动zookeeper

js
cd /kafka/apache-zookeeper-3.9.4-bin/bin ./zkServer.sh start

这样就一个领导者一个工作者。( leader领导者,follower:跟随者) image.png

image.png

三、修改Kakfa集群

创建Kafka的数据存储目录

js
mkdir -p /kafka/data

修改配置文件

js
vi /kafka/kafka_2.12-3.6.2/config/server.properties

服务器192.168.201.112配置

js
############################# 服务器基本配置(ZooKeeper 模式) ############################# # 当前节点的唯一 ID(集群中不可重复) broker.id=0 # ZooKeeper 集群连接地址(填你的 ZooKeeper 实际 IP:端口,单机就填一个) zookeeper.connect=192.168.201.112:2181,192.168.201.113:2181 # 连接 ZooKeeper 的超时时间(默认 6000ms,无需修改) zookeeper.connection.timeout.ms=6000 ############################# socket 服务器设置(保留,无需修改) ############################# # 服务器监听的网络地址和端口(仅保留 PLAINTEXT 即可,删除 CONTROLLER 监听器) listeners=PLAINTEXT://:9092 # broker 之间通信使用的监听器名称 inter.broker.listener.name=PLAINTEXT # 向客户端暴露的连接地址(填 Kafka 实际 IP) advertised.listeners=PLAINTEXT://192.168.201.112:9092 # 监听器与安全协议的映射(删除 CONTROLLER 相关映射) listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # 网络和 IO 线程数(保留默认) num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 ############################# 日志存储基本配置(保留,无需修改) ############################# # 消息日志存储目录(你的配置是 /kafka/data,没问题) log.dirs=/kafka/data num.partitions=1 num.recovery.threads.per.data.dir=1 ############################# 内部主题、日志刷新/保留策略(全部保留,无需修改) ############################# offsets.topic.replication.factor=1 share.coordinator.state.topic.replication.factor=1 share.coordinator.state.topic.min.isr=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 #log.flush.interval.messages=10000 #log.flush.interval.ms=1000 log.retention.hours=168 #log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000

服务器192.168.201.113配置

js
############################# 服务器基本配置(ZooKeeper 模式) ############################# # 当前节点的唯一 ID(集群中不可重复) broker.id=1 # ZooKeeper 集群连接地址(填你的 ZooKeeper 实际 IP:端口,单机就填一个) zookeeper.connect=192.168.201.112:2181,192.168.201.113:2181 # 连接 ZooKeeper 的超时时间(默认 6000ms,无需修改) zookeeper.connection.timeout.ms=6000 ############################# socket 服务器设置(保留,无需修改) ############################# # 服务器监听的网络地址和端口(仅保留 PLAINTEXT 即可,删除 CONTROLLER 监听器) listeners=PLAINTEXT://:9092 # broker 之间通信使用的监听器名称 inter.broker.listener.name=PLAINTEXT # 向客户端暴露的连接地址(填 Kafka 实际 IP) advertised.listeners=PLAINTEXT://192.168.201.113:9092 # 监听器与安全协议的映射(删除 CONTROLLER 相关映射) listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # 网络和 IO 线程数(保留默认) num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 ############################# 日志存储基本配置(保留,无需修改) ############################# # 消息日志存储目录(你的配置是 /kafka/data,没问题) log.dirs=/kafka/data num.partitions=1 num.recovery.threads.per.data.dir=1 ############################# 内部主题、日志刷新/保留策略(全部保留,无需修改) ############################# offsets.topic.replication.factor=1 share.coordinator.state.topic.replication.factor=1 share.coordinator.state.topic.min.isr=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 #log.flush.interval.messages=10000 #log.flush.interval.ms=1000 log.retention.hours=168 #log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000

然后进入bin目录启动kafka

js
cd /kafka/kafka_2.12-3.6.2/bin/
js
./kafka-server-start.sh -daemon ../config/server.properties

三、生产者和消费者测试

创建集群主题

js
./kafka-topics.sh --bootstrap-server 192.168.201.112:9092,192.168.201.113:9092 --create --topic test_topic --partitions 3 --replication-factor 1

查看所有主题

js
./kafka-topics.sh --bootstrap-server 192.168.201.112:9092,192.168.201.113:9092 --list

启动一个手动输入消息的生产者,向 test_topic 发送消息

js
./kafka-console-producer.sh --bootstrap-server 192.168.201.112:9092,192.168.201.113:9092 --topic test_topic

启动消费者,订阅 test_topic 接收消息

js
./kafka-console-consumer.sh --bootstrap-server 192.168.201.112:9092,192.168.201.113:9092 --topic test_topic --from-beginning

本文作者:松轩(^U^)

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

Document