菜鸟科技网

Kafka基本命令有哪些常用操作及参数?

Kafka作为分布式流处理平台,其基本命令是日常运维和开发的核心工具,涵盖集群管理、主题操作、消息生产与消费等多个维度,以下从不同场景详细解析常用命令及其使用方法。

Kafka基本命令有哪些常用操作及参数?-图1
(图片来源网络,侵删)

集群管理命令

集群管理主要用于监控Kafka服务状态、查看节点信息及调整配置,常用命令通过Kafka自带的脚本执行,位于<kafka_home>/bin目录下。

  1. 启动与停止服务

    • 启动ZooKeeper(Kafka依赖ZooKeeper管理元数据):
      bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    • 启动Kafka服务:
      bin/kafka-server-start.sh -daemon config/server.properties
    • 停止服务:
      bin/kafka-server-stop.sh
  2. 查看集群状态

    • 使用kafka-topics.sh结合--describe选项可查看主题分布情况,但更通用的集群状态监控通过kafka-cluster.sh(需Kafka 2.3+):
      bin/kafka-cluster.sh --bootstrap-server localhost:9092 describe --status
    • 查看Broker节点列表:
      bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092

主题(Topic)操作命令

主题是Kafka中消息的逻辑分类,管理主题是核心操作之一。

Kafka基本命令有哪些常用操作及参数?-图2
(图片来源网络,侵删)
  1. 创建主题
    基本语法:kafka-topics.sh --bootstrap-server <broker地址> --create --topic <主题名> [选项]
    示例:创建名为test_topic的主题,分区3个,副本因子2,并指定清理策略为删除7天前的数据:

    bin/kafka-topics.sh --bootstrap-server localhost:9092 \
      --create --topic test_topic \
      --partitions 3 \
      --replication-factor 2 \
      --config retention.ms=604800000
  2. 查看主题列表与详情

    • 列出所有主题:
      bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
    • 查看主题详情(分区分布、副本状态等):
      bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test_topic

      输出示例:

      Topic: test_topic  PartitionCount: 3   ReplicationFactor: 2
      Topic: test_topic  Partition: 0    Leader: 1   Replicas: 1,2   Isr: 1,2
      Topic: test_topic  Partition: 1    Leader: 2   Replicas: 2,0   Isr: 2,0
      Topic: test_topic  Partition: 2    Leader: 0   Replicas: 0,1   Isr: 0,1

      其中Leader表示当前负责该分区读写的Broker,Replicas为副本列表,Isr(In-Sync Replicas)表示同步副本列表。

      Kafka基本命令有哪些常用操作及参数?-图3
      (图片来源网络,侵删)
  3. 修改主题配置
    动态调整主题参数,如增加分区数、修改清理策略:

    # 增加分区至5个
    bin/kafka-topics.sh --bootstrap-server localhost:9092 \
      --alter --topic test_topic --partitions 5
    # 修改清理策略为保留最近1000条消息
    bin/kafka-configs.sh --bootstrap-server localhost:9092 \
      --entity-type topics --entity-name test_topic \
      --alter --config retention.ms=-1 --config retention.bytes=-1 --config max.message.bytes=1048576
  4. 删除主题
    需确保delete.topic.enableserver.properties中为true(默认为true):

    bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test_topic

消息生产与消费命令

消息的发送(生产)和接收(消费)是Kafka最核心的功能。

  1. 生产消息(Producer)
    基本命令:kafka-console-producer.sh --bootstrap-server <broker地址> --topic <主题名>
    示例:向test_topic发送消息,输入后按回车发送:

    bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic
    > hello kafka
    > this is a test message

    高级选项:

    • 指定key(消息会根据key分配到分区):
      bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
        --topic test_topic --property parse.key=true --property key.separator=:
      > key1:value1
      > key2:value2
    • 启用压缩(如GZIP):
      bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
        --topic test_topic --compression-type gzip
  2. 消费消息(Consumer)
    命令行消费者支持多种消费模式,基本语法:kafka-console-consumer.sh --bootstrap-server <broker地址> --topic <主题名> [选项]
    示例:

    • 从最新消息开始消费:
      bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic test_topic --from-beginning
    • 消费指定分区(如分区0),并设置超时时间(如10秒无消息则退出):
      bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic test_topic --partition 0 --timeout-ms 10000
    • 消费到指定偏移量(如offset=5):
      bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic test_topic --partition 0 --offset 5
    • 消费时过滤消息(通过--property配置):
      bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic test_topic --property print.key=true --property print.value=true

消费者组管理

消费者组是实现消息负载均衡的关键,支持多消费者并行消费。

  1. 查看消费者组列表

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  2. 查看消费者组详情
    包括消费者ID、分区分配情况、消费偏移量等:

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
      --describe --group test_group

    输出示例:

    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
    test_group      test_topic      0          10              20              10
    test_group      test_topic      1          15              15              0
  3. 重置消费偏移量
    支持重置到最早(earliest)、最新(latest)或指定时间戳:

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
      --group test_group --reset-offsets --to-earliest --execute --topic test_topic

工具类命令

  1. 消息读写工具(kafka-verifiable-producer/consumer)
    用于测试,可验证消息顺序、丢失等情况:

    # 生产者:发送100条消息,每条消息包含序列号
    bin/kafka-verifiable-producer.sh --broker-list localhost:9092 \
      --topic test_topic --max-messages 100
    # 消费者:验证消息顺序,确保不丢失
    bin/kafka-verifiable-consumer.sh --broker-list localhost:9092 \
      --topic test_topic --group-id test_group
  2. 消息导出工具(kafka-dump-log.sh)
    查看日志文件内容,用于排查问题:

    bin/kafka-dump-log.sh --files /tmp/kafka-logs/test_topic-0/00000000000000000000.log

常用命令参数总结

操作类型 命令脚本 核心参数示例
集群状态 kafka-cluster.sh --bootstrap-server, --describe --status
主题管理 kafka-topics.sh --create, --alter, --delete, --partitions, --replication-factor
消息生产 kafka-console-producer.sh --topic, --property parse.key=true, --compression-type
消息消费 kafka-console-consumer.sh --topic, --from-beginning, --partition, --offset, --group
消费者组管理 kafka-consumer-groups.sh --list, --describe, --reset-offsets, --to-earliest

相关问答FAQs

Q1: 如何解决Kafka主题创建失败提示“Topic already exists”?
A: 若确认主题需删除,可先执行删除命令(需确保delete.topic.enable=true),或修改主题名重新创建,若主题存在但需修改配置,使用kafka-configs.shkafka-topics.sh --alter调整参数,无需删除重建,若因权限问题失败,检查Broker的server.propertiesauthorizer.class.name配置,确保相关用户有操作权限。

Q2: 消费者消费消息时出现“Consumer fenced”错误,如何处理?
A: “Consumer fenced”通常是由于消费者组内多个消费者实例使用相同group.id且触发重平衡(rebalance)导致,解决方案:

  1. 检查是否有重复的消费者进程,确保group.id唯一;
  2. 调整session.timeout.msheartbeat.interval.ms参数,避免网络波动导致频繁重平衡;
  3. 使用kafka-consumer-groups.sh --describe --group <group_id>查看消费者状态,确认是否有长时间未响应的消费者实例,必要时清理僵尸消费者。
分享:
扫描分享到社交APP
上一篇
下一篇