主机准备
主机 | IP | 系统 |
---|---|---|
kafka1 | 192.168.10.101 | ubuntu22.04 |
kafka2 | 192.168.10.102 | ubuntu22.04 |
kafka3 | 192.168.10.103 | ubuntu22.04 |
主机环境
- 安装java 11
- kafka_2.13-3.7.0
配置
这里部署的是broker和controller在一起的方式,也可以分别部署,具体修改kraft相关文件,配置都是相通的
kafka/config/kraft/server.properties
#集群角色
process.roles=broker,controller
#每台机器的id不能一样
node.id=3
#这个是所有conntroller的地址,用于投票选举使用
controller.quorum.voters=1@192.168.10.101:9093,2@192.168.10.102:9093,3@192.168.10.103:9093
# 监听端口
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
# 客户端连接地址
advertised.listeners=PLAINTEXT://192.168.20.208:9092
# 日志地址, 这个修改到非/tmp目录,否则系统重启后会丢失
log.dirs=/var/log/kafka/kraft-combined-logs
# ...其他配置按照实际情况修改
启动集群步骤:
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
这个操作只在一个机器上运行,执行后记录这个值,这个是集群id,整个集群需要一致bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
每个机器上都要执行, 这个KAFKA_CLUSTER_ID 就是是上面执行的值bin/kafka-server-start.sh config/kraft/server.properties
启动集群
配置systemd开机启动
在/etc/systemd/system/kafka.service
文件中添加内容
[Unit]
Description=Apache Kafka Server
[Service]
Type=simple
User=root
Group=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/kraft/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
- 执行
sudo systemctl daemon-reload
重新加载 - 执行
sudo systemctl enable kafka
开机启动 - 执行
sudo systemctl start kafka
启动 - 使用
sudo journalctl -u kafka.service
查看日志
使用jps
查看进程是否启动成功
如果说还需要配置connect
在/etc/systemd/system/connect.service
文件中添加内容
[Unit]
Description=Apache Kafka Connect Node
After=kafka.service
Wants=kafka.service
[Service]
Type=simple
User=root
Group=root
ExecStart=/usr/local/kafka/bin/connect-distributed.sh /usr/local/kafka/config/connect-distributed.properties
Restart=on-failure
[Install]
WantedBy=multi-user.target
- 执行
sudo systemctl daemon-reload
重新加载 - 执行
sudo systemctl enable connect
开机启动 - 执行
sudo systemctl start connect
启动 - 使用
sudo journalctl -u connect.service
查看日志
注意事项:
1. 先改配置文件然后在执行上述2的命令,否则启动会报错
2. 步骤1产生的随机数是集群id, 所有节点在2步骤时需要使用同一个id,否则运行期间会报`INCONSISTENT_CLUSTER_ID in FETCH response: InboundResponse`