编辑
2025-10-31
Kakfa
00
请注意,本文编写于 45 天前,最后修改于 44 天前,其中某些信息可能已经过时。

目录

一、单机安装部署
先修改zookeeper的配置
Kafka配置文件

一、单机安装部署

先安装java JDK,高版本kafka只兼容javajdk11版本

js
yum install -y java-11-openjdk

上传apache-zookeeper-3.9.4-bin.tar.gz和kafka_2.13-4.1.0.tgz包

进行解压

js
tar -zxvf apache-zookeeper-3.9.4-bin.tar.gz
js
tar -zxvf kafka_2.13-4.1.0.tgz

先修改zookeeper的配置

zoo.cfg是zookeeper的核心配置文件

把dataDir=/kafka/data修改,记得先创建好/kafka/data目录

js
vi /kafka/apache-zookeeper-3.9.4-bin/conf/zoo.cfg
yml
# ZooKeeper 的时间基准单位(毫秒),所有时间相关配置都以它为倍数 # 比如后续的 initLimit、syncLimit 都是 "tickTime 的倍数" tickTime=2000 # 从节点(Follower)启动后,与主节点(Leader)同步初始数据的最大超时时间 # 计算方式:initLimit × tickTime = 10 × 2000ms = 20秒 # 超过这个时间未同步成功,从节点会被判定为连接失败 initLimit=10 # 运行中,从节点与主节点之间通信的最大超时时间(如请求响应超时) # 计算方式:syncLimit × tickTime = 5 × 2000ms = 10秒 # 超过这个时间未收到响应,主节点会认为从节点失联 syncLimit=5 # 数据存储目录(核心!必须改) # 存储内容:内存快照(snapshot)、事务日志(记录所有写操作,用于恢复数据) # 注意:/tmp 是临时目录,Linux 重启后会自动清空,生产环境必须改成永久目录 # 示例:dataDir=/data/zookeeper(需提前创建该目录并授权) dataDir=/kafka/zookeeper_data # 客户端连接 ZooKeeper 的端口号(默认2181,需确保防火墙开放) # 外部程序(如Kafka)通过此端口连接,例如 Kafka 配置 zookeeper.connect=IP:2181 clientPort=2181 # 限制单个客户端(如一个Kafka实例)的最大并发连接数(默认60) # 若出现"连接数超限"报错,可取消注释并调大,例如:maxClientCnxns=100 #maxClientCnxns=60 # 以下是自动清理旧数据的配置(生产环境建议开启,避免磁盘占满) # 请先阅读官方维护指南再开启:https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # 自动保留的最新快照文件数量(取消注释后生效) # 例如设置为3,表示只保留最近3个快照,更早的会被删除 #autopurge.snapRetainCount=3 # 自动清理任务的执行频率(单位:小时,取消注释后生效) # 设置为0表示禁用自动清理;设置为1表示每1小时清理一次 #autopurge.purgeInterval=1 ## 监控指标配置(对接Prometheus时使用,默认关闭) # # Prometheus监控 exporter 配置 #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider # 监控数据暴露的IP(0.0.0.0表示允许所有IP访问) #metricsProvider.httpHost=0.0.0.0 # 监控数据暴露的端口(需开放此端口给Prometheus) #metricsProvider.httpPort=7000 # 是否导出JVM信息(如内存、CPU使用情况) #metricsProvider.exportJvmInfo=true

进入bin目录启动zookeeper

js
cd /kafka/apache-zookeeper-3.9.4-bin/bin

启动

js
./zkServer.sh status

image.png

测试链接

js
./zkCli.sh

Kafka配置文件

image.png

js
# kafka-server-start.sh Kafka Broker 启动程序,需指定配置文件(如 server.properties)才能启动服务端节点。 # kafka-server-stop.sh Kafka Broker 停止程序,用于优雅关闭运行中的 Broker 进程,避免数据丢失。 # kafka-topics.sh Topic 全生命周期管理程序,可创建、删除、修改 Topic(如调整分区数)、查看 Topic 列表/详情。 # kafka-console-producer.sh 命令行模拟生产者程序,手动输入消息并发送到指定 Topic,常用于测试消息生产流程。 # kafka-console-consumer.sh 命令行模拟消费者程序,订阅指定 Topic 并消费消息,支持从最新位置或最早位置开始消费,用于测试消息消费流程。
js
/kafka/kafka_2.13-4.1.0/config

最核心的配置文件:server.properties(Broker 必用)、producer.properties(生产者)、consumer.properties(消费者)。

js
vi server.properties

使用ZooKeeper

js
############################# 服务器基本配置(ZooKeeper 模式) ############################# # 当前节点的唯一 ID(集群中不可重复) broker.id=1 # ZooKeeper 集群连接地址(填你的 ZooKeeper 实际 IP:端口,单机就填一个) zookeeper.connect=192.168.201.113:2181 # 连接 ZooKeeper 的超时时间(默认 6000ms,无需修改) zookeeper.connection.timeout.ms=6000 ############################# socket 服务器设置(保留,无需修改) ############################# # 服务器监听的网络地址和端口(仅保留 PLAINTEXT 即可,删除 CONTROLLER 监听器) listeners=PLAINTEXT://:9092 # broker 之间通信使用的监听器名称 inter.broker.listener.name=PLAINTEXT # 向客户端暴露的连接地址(填 Kafka 实际 IP) advertised.listeners=PLAINTEXT://192.168.201.113:9092 # 监听器与安全协议的映射(删除 CONTROLLER 相关映射) listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # 网络和 IO 线程数(保留默认) num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 ############################# 日志存储基本配置(保留,无需修改) ############################# # 消息日志存储目录(你的配置是 /kafka/data,没问题) log.dirs=/kafka/data num.partitions=1 num.recovery.threads.per.data.dir=1 ############################# 内部主题、日志刷新/保留策略(全部保留,无需修改) ############################# offsets.topic.replication.factor=1 share.coordinator.state.topic.replication.factor=1 share.coordinator.state.topic.min.isr=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 #log.flush.interval.messages=10000 #log.flush.interval.ms=1000 log.retention.hours=168 #log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000

不使用ZooKeeper

yml
############################# 服务器基本配置 ############################# # 当前服务器的角色(KRaft 模式必填) # 可选值:broker(仅作为消息代理)、controller(仅作为控制器)、broker,controller(同时作为代理和控制器,单机模式常用) process.roles=broker,controller # 当前节点的唯一 ID(集群中所有节点必须不同,如 1、2、3...) node.id=1 # 控制器集群的连接地址(KRaft 模式必填,用于节点间通信) # 格式:主机:端口(若为集群,用逗号分隔多个控制器) controller.quorum.bootstrap.servers=192.168.201.113:9093 ############################# socket 服务器设置 ############################# # 服务器监听的网络地址和端口(KRaft 模式需至少包含控制器的监听器) # 格式:监听器名称://主机:端口(多个监听器用逗号分隔) # 示例中:PLAINTEXT 用于客户端和 broker 通信,CONTROLLER 用于控制器内部通信 listeners=PLAINTEXT://:9092,CONTROLLER://:9093 # broker 之间通信使用的监听器名称(需与上面的监听器名称对应) inter.broker.listener.name=PLAINTEXT # 向客户端暴露的连接地址(若不设置,默认使用 listeners 的值) # 生产环境需填写实际可访问的 IP:端口(如 192.168.1.100:9092),否则客户端可能连接失败 advertised.listeners=PLAINTEXT://192.168.201.113:9092,CONTROLLER://192.168.201.113:9093 # 控制器使用的监听器名称(KRaft 模式必填,需与 listeners 中的控制器监听器对应) controller.listener.names=CONTROLLER # 监听器名称与安全协议的映射(默认无需修改,PLAINTEXT 表示无加密) # 格式:监听器名称:协议(如 CONTROLLER 用 PLAINTEXT 协议) listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # 用于接收网络请求和发送响应的线程数(根据服务器 CPU 调整,默认 3 即可) num.network.threads=3 # 用于处理请求(可能包含磁盘 IO)的线程数(建议大于等于磁盘数量,默认 8) num.io.threads=8 # socket 发送缓冲区大小(默认 100KB,无需修改) socket.send.buffer.bytes=102400 # socket 接收缓冲区大小(默认 100KB,无需修改) socket.receive.buffer.bytes=102400 # 服务器可接受的最大请求大小(防止内存溢出,默认 100MB) socket.request.max.bytes=104857600 ############################# 日志存储基本配置 ############################# # 日志文件(消息数据)的存储目录(多个目录用逗号分隔) # 注意:/tmp 是临时目录,重启后数据会丢失,生产环境必须改为永久目录(如 /data/kafka/logs) log.dirs=/kafka/data # 新建主题时的默认分区数(分区越多,并行消费能力越强,但文件数量也越多) num.partitions=1 # 每个数据目录用于日志恢复(启动时)和刷新(关闭时)的线程数 # 若目录位于 RAID 阵列,建议增大此值 num.recovery.threads.per.data.dir=1 ############################# 内部主题设置 ############################# # 内部主题(如消费者偏移量、事务状态)的副本数 # 开发测试可用 1,生产环境建议大于 1(如 3),确保高可用 offsets.topic.replication.factor=1 # 消费者偏移量主题的副本数 share.coordinator.state.topic.replication.factor=1 # 共享协调器状态主题的副本数 share.coordinator.state.topic.min.isr=1 # 共享协调器主题的最小同步副本数 transaction.state.log.replication.factor=1 # 事务状态日志的副本数 transaction.state.log.min.isr=1 # 事务状态日志的最小同步副本数 ############################# 日志刷新策略 ############################# # 消息会立即写入文件系统,但默认延迟刷新到磁盘(由操作系统缓存管理) # 以下配置控制数据刷新到磁盘的策略,需权衡: # 1. 持久性:若不启用副本,未刷新的数据可能丢失 # 2. 延迟:刷新间隔过大会导致刷新时延迟飙升 # 3. 吞吐量:刷新是昂贵操作,间隔过小会导致频繁磁盘寻道 # 累计多少条消息后强制刷新到磁盘(默认注释,即不按消息数刷新) #log.flush.interval.messages=10000 # 消息在内存中最多存放多久后强制刷新到磁盘(默认注释,即不按时间刷新) #log.flush.interval.ms=1000 ############################# 日志保留策略 ############################# # 以下配置控制日志段的清理策略:按时间或大小清理 # 只要满足任一条件,日志段就会被删除(从日志末尾开始) # 日志文件可被删除的最小存活时间(默认 168 小时,即 7 天) log.retention.hours=168 # 基于大小的日志保留策略(默认注释,即不按大小清理) # 若启用,当剩余日志大小低于此值时,停止清理(单位:字节,1GB=1073741824) #log.retention.bytes=1073741824 # 单个日志段文件的最大大小(达到此值时创建新的日志段,默认 1GB) log.segment.bytes=1073741824 # 检查日志段是否符合清理条件的时间间隔(默认 300000ms,即 5 分钟) log.retention.check.interval.ms=300000
  1. process.roles:KRaft 模式必填,单机测试用 broker,controller,集群需区分角色(部分节点为 controller,部分为 broker)。
  2. node.id:集群中每个节点的唯一 ID,不能重复。
  3. controller.quorum.bootstrap.servers:集群模式需填写所有控制器节点的地址(如 node1:9093,node2:9093)。
  4. log.dirs:必须修改为永久目录(如 /data/kafka/logs),否则重启后数据丢失。
  5. advertised.listeners:生产环境需填写服务器的实际 IP(如 PLAINTEXT://192.168.1.100:9092),否则外部客户端无法连接。

启动 Kafka

js
cd /kafka/kafka_2.13-4.1.0/bin # 你的 Kafka bin 目录路径

后台启动,日志会输出到 logs 目录,不占用终端

js
./kafka-server-start.sh -daemon ../config/server.properties
js
ps -ef | grep kafka # 若输出包含“kafka.Kafka”,说明进程正常

本文作者:松轩(^U^)

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

Document