Kafka作为分布式流处理平台,其基本命令是日常运维和开发的核心工具,涵盖集群管理、主题操作、消息生产与消费等多个维度,以下从不同场景详细解析常用命令及其使用方法。

集群管理命令
集群管理主要用于监控Kafka服务状态、查看节点信息及调整配置,常用命令通过Kafka自带的脚本执行,位于<kafka_home>/bin
目录下。
-
启动与停止服务
- 启动ZooKeeper(Kafka依赖ZooKeeper管理元数据):
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
- 启动Kafka服务:
bin/kafka-server-start.sh -daemon config/server.properties
- 停止服务:
bin/kafka-server-stop.sh
- 启动ZooKeeper(Kafka依赖ZooKeeper管理元数据):
-
查看集群状态
- 使用
kafka-topics.sh
结合--describe
选项可查看主题分布情况,但更通用的集群状态监控通过kafka-cluster.sh
(需Kafka 2.3+):bin/kafka-cluster.sh --bootstrap-server localhost:9092 describe --status
- 查看Broker节点列表:
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
- 使用
主题(Topic)操作命令
主题是Kafka中消息的逻辑分类,管理主题是核心操作之一。

-
创建主题
基本语法:kafka-topics.sh --bootstrap-server <broker地址> --create --topic <主题名> [选项]
示例:创建名为test_topic
的主题,分区3个,副本因子2,并指定清理策略为删除7天前的数据:bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --create --topic test_topic \ --partitions 3 \ --replication-factor 2 \ --config retention.ms=604800000
-
查看主题列表与详情
- 列出所有主题:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
- 查看主题详情(分区分布、副本状态等):
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test_topic
输出示例:
Topic: test_topic PartitionCount: 3 ReplicationFactor: 2 Topic: test_topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0 Topic: test_topic Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
其中
Leader
表示当前负责该分区读写的Broker,Replicas
为副本列表,Isr
(In-Sync Replicas)表示同步副本列表。(图片来源网络,侵删)
- 列出所有主题:
-
修改主题配置
动态调整主题参数,如增加分区数、修改清理策略:# 增加分区至5个 bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --alter --topic test_topic --partitions 5 # 修改清理策略为保留最近1000条消息 bin/kafka-configs.sh --bootstrap-server localhost:9092 \ --entity-type topics --entity-name test_topic \ --alter --config retention.ms=-1 --config retention.bytes=-1 --config max.message.bytes=1048576
-
删除主题
需确保delete.topic.enable
在server.properties
中为true
(默认为true):bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test_topic
消息生产与消费命令
消息的发送(生产)和接收(消费)是Kafka最核心的功能。
-
生产消息(Producer)
基本命令:kafka-console-producer.sh --bootstrap-server <broker地址> --topic <主题名>
示例:向test_topic
发送消息,输入后按回车发送:bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic > hello kafka > this is a test message
高级选项:
- 指定key(消息会根据key分配到分区):
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \ --topic test_topic --property parse.key=true --property key.separator=: > key1:value1 > key2:value2
- 启用压缩(如GZIP):
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \ --topic test_topic --compression-type gzip
- 指定key(消息会根据key分配到分区):
-
消费消息(Consumer)
命令行消费者支持多种消费模式,基本语法:kafka-console-consumer.sh --bootstrap-server <broker地址> --topic <主题名> [选项]
示例:- 从最新消息开始消费:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic test_topic --from-beginning
- 消费指定分区(如分区0),并设置超时时间(如10秒无消息则退出):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic test_topic --partition 0 --timeout-ms 10000
- 消费到指定偏移量(如offset=5):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic test_topic --partition 0 --offset 5
- 消费时过滤消息(通过
--property
配置):bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic test_topic --property print.key=true --property print.value=true
- 从最新消息开始消费:
消费者组管理
消费者组是实现消息负载均衡的关键,支持多消费者并行消费。
-
查看消费者组列表
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
-
查看消费者组详情
包括消费者ID、分区分配情况、消费偏移量等:bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --describe --group test_group
输出示例:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG test_group test_topic 0 10 20 10 test_group test_topic 1 15 15 0
-
重置消费偏移量
支持重置到最早(earliest
)、最新(latest
)或指定时间戳:bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --group test_group --reset-offsets --to-earliest --execute --topic test_topic
工具类命令
-
消息读写工具(kafka-verifiable-producer/consumer)
用于测试,可验证消息顺序、丢失等情况:# 生产者:发送100条消息,每条消息包含序列号 bin/kafka-verifiable-producer.sh --broker-list localhost:9092 \ --topic test_topic --max-messages 100 # 消费者:验证消息顺序,确保不丢失 bin/kafka-verifiable-consumer.sh --broker-list localhost:9092 \ --topic test_topic --group-id test_group
-
消息导出工具(kafka-dump-log.sh)
查看日志文件内容,用于排查问题:bin/kafka-dump-log.sh --files /tmp/kafka-logs/test_topic-0/00000000000000000000.log
常用命令参数总结
操作类型 | 命令脚本 | 核心参数示例 |
---|---|---|
集群状态 | kafka-cluster.sh | --bootstrap-server , --describe --status |
主题管理 | kafka-topics.sh | --create , --alter , --delete , --partitions , --replication-factor |
消息生产 | kafka-console-producer.sh | --topic , --property parse.key=true , --compression-type |
消息消费 | kafka-console-consumer.sh | --topic , --from-beginning , --partition , --offset , --group |
消费者组管理 | kafka-consumer-groups.sh | --list , --describe , --reset-offsets , --to-earliest |
相关问答FAQs
Q1: 如何解决Kafka主题创建失败提示“Topic already exists”?
A: 若确认主题需删除,可先执行删除命令(需确保delete.topic.enable=true
),或修改主题名重新创建,若主题存在但需修改配置,使用kafka-configs.sh
或kafka-topics.sh --alter
调整参数,无需删除重建,若因权限问题失败,检查Broker的server.properties
中authorizer.class.name
配置,确保相关用户有操作权限。
Q2: 消费者消费消息时出现“Consumer fenced”错误,如何处理?
A: “Consumer fenced”通常是由于消费者组内多个消费者实例使用相同group.id
且触发重平衡(rebalance)导致,解决方案:
- 检查是否有重复的消费者进程,确保
group.id
唯一; - 调整
session.timeout.ms
和heartbeat.interval.ms
参数,避免网络波动导致频繁重平衡; - 使用
kafka-consumer-groups.sh --describe --group <group_id>
查看消费者状态,确认是否有长时间未响应的消费者实例,必要时清理僵尸消费者。