使用命令
#kafka服务启动:
./kafka-server-start.sh ../config/server.properties
#通过zk查看kafka集群成员:
./zkCli.sh -server localhost:2181 ls /brokers/ids
#查看有哪些topic:
./kafka-topics.sh --list --bootstrap-server kafka服务IP:9092
#创建topic:
./kafka-topics.sh --create --bootstrap-server kafka服务IP:9092 --replication-factor 1 --partitions 1 --topic test1
#topic分区数修改:
./kafka-topics.sh --bootstrap-server kafka服务IP:9092 -alter --partitions 4 --topic test1
#kafka生产(无key型消息):
./kafka-console-producer.sh --broker-list kafka服务IP:9092 --topic test1
#kafka生产(有key型消息,key与value间默认使用“Tab键”进行分隔,--property key.separator='分隔符合'):
./kafka-console-producer.sh --broker-list kafka服务IP:9092 --topic test1 --property parse.key=true --property key.separator='@'
#kafka消费(不输出key)
./kafka-console-consumer.sh --bootstrap-server kafka服务IP:9092 --from-beginning --topic test1
#kafka消费(输出key)
./kafka-console-consumer.sh --bootstrap-server kafka服务IP:9092 --from-beginning --topic test1 --property print.key=true
#删除topic:(要彻底删除必须把kafka中与当前topic相关的数据目录和zookeeper中与当前topic相关的路径一并删除)
./kafka-topics.sh --bootstrap-server kafka服务IP:9092 --delete --topic test1
#查看topic分区数
./kafka-topics.sh --bootstrap-server kafka服务IP:9092 --describe --topic 主题名称
#查看日志
./kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka-logs/test-1/00000000000000000000.log --print-data-log
———————————————————————————————————————————————————————
Nginx代理Kafka集群
nginx节点ip:10.88.0.11
broker节点ip:10.88.0.81、10.88.0.82、10.88.0.83
- 三台broker节点都必须配置hosts
10.88.0.81 kafka01
10.88.0.82 kafka02
10.88.0.83 kafka03
- Kafka配置不同的部分
# ########################## 10.88.0.81节点 ##########################
broker.id=1
listener.security.protocol.map=LOCAL_LISTENER:PLAINTEXT,INTERNAL_LISTENER:SASL_PLAINTEXT,EXTERNAL_LISTENER:SSL
listeners=LOCAL_LISTENER://127.0.0.1:9092,INTERNAL_LISTENER://kafka01:9093,EXTERNAL_LISTENER://kafka01:18001
advertised.listeners=INTERNAL_LISTENER://kafka01:9093,EXTERNAL_LISTENER://kafka01:18001
# ########################## 10.88.0.82节点 ##########################
broker.id=2
listener.security.protocol.map=LOCAL_LISTENER:PLAINTEXT,INTERNAL_LISTENER:SASL_PLAINTEXT,EXTERNAL_LISTENER:SSL
listeners=LOCAL_LISTENER://127.0.0.1:9092,INTERNAL_LISTENER://kafka02:9093,EXTERNAL_LISTENER://kafka02:18001
advertised.listeners=INTERNAL_LISTENER://kafka02:9093,EXTERNAL_LISTENER://kafka02:18001
# ########################## 10.88.0.83节点 ##########################
broker.id=1
listener.security.protocol.map=LOCAL_LISTENER:PLAINTEXT,INTERNAL_LISTENER:SASL_PLAINTEXT,EXTERNAL_LISTENER:SSL
listeners=LOCAL_LISTENER://127.0.0.1:9092,INTERNAL_LISTENER://kafka03:9093,EXTERNAL_LISTENER://kafka03:18001
advertised.listeners=INTERNAL_LISTENER://kafka03:9093,EXTERNAL_LISTENER://kafka03:18001
listeners配置项 指定了Kafka实际监听的网络地址和端口 LOCAL_LISTENER://127.0.0.1:9092表示Kafka在本地环回地址上的9092端口监听,仅对本机可用。 INTERNAL_LISTENER://kafka03:9093表示Kafka在kafka03主机的9093端口上监听,用于内部通信。 EXTERNAL_LISTENER://kafka03:18001表示Kafka在kafka03主机的18001端口上监听,用于对外提供服务。
advertised.listeners配置项 是告诉生产者和消费者应该使用哪些地址来连接,INTERNAL_LISTENER://kafka03:9093 和 EXTERNAL_LISTENER://kafka03:18001
- nginx节点必须配置hosts
10.88.0.11 kafka01
10.88.0.11 kafka02
10.88.0.11 kafka03
- nginx配置
stream{
upstream brokers{
server 10.88.0.81:18001;
server 10.88.0.82:18001;
server 10.88.0.83:18001;
}
server{
listen 18001;
proxy_pass brokers;
}
}
客户端配置brokers地址为:kafka01:18001,kafka02:18001,kafka03:18001 ,不能只配置一个
———————————————————————————————————————————————————————
