Kafka 集群管理
启动 Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
停止 Zookeeper
bin/zookeeper-server-stop.sh
启动 Kafka Broker
bin/kafka-server-start.sh config/server.properties
或后台启动:
bin/kafka-server-start.sh -daemon config/server.properties
停止 Kafka Broker
查看 Broker 状态
如果依赖 Zookeeper:
bin/zookeeper-shell.sh <zookeeper_host>:<port> ls /brokers/ids
如果不依赖 Zookeeper:
bin/kafka-broker-api-versions.sh --bootstrap-server <broker_host>:<port>
或:
bin/kafka-topics.sh --bootstrap-server <broker_host>:<port> --describe
Topic 操作
创建 Topic
bin/kafka-topics.sh --create --bootstrap-server <broker_host>:<port> --replication-factor <replCount> --partitions <partitionCount> --topic <topicName>
查看所有 Topic
bin/kafka-topics.sh --list --bootstrap-server <broker_host>:<port>
查看指定 Topic 的详细信息
bin/kafka-topics.sh --describe --topic <topicName> --bootstrap-server <broker_host>:<port>
修改 Topic 配置
bin/kafka-topics.sh --alter --topic <topicName> --config <key>=<value> --bootstrap-server <broker_host>:<port>
扩容 Topic 分区
bin/kafka-topics.sh --alter --topic <topicName> --partitions <newPartitionCount> --bootstrap-server <broker_host>:<port>
删除 Topic
bin/kafka-topics.sh --delete --topic <topicName> --bootstrap-server <broker_host>:<port>
生产者操作
发送消息到指定 Topic
bin/kafka-console-producer.sh --topic <topicName> --bootstrap-server <broker_host>:<port>
发送消息到指定分区
bin/kafka-console-producer.sh --topic <topicName> --partition <partitionNumber> --bootstrap-server <broker_host>:<port>
消费者操作
从指定 Topic 消费消息
bin/kafka-console-consumer.sh --topic <topicName> --bootstrap-server <broker_host>:<port> --from-beginning
从指定分区消费消息
bin/kafka-console-consumer.sh --topic <topicName> --partition <partitionNumber> --bootstrap-server <broker_host>:<port>
指定消费组消费消息
bin/kafka-console-consumer.sh --topic <topicName> --bootstrap-server <broker_host>:<port> --group <groupId>
消费者组管理
列出所有消费者组
bin/kafka-consumer-groups.sh --list --bootstrap-server <broker_host>:<port>
查看指定消费者组的详细信息
bin/kafka-consumer-groups.sh --describe --group <groupId> --bootstrap-server <broker_host>:<port>
删除消费者组
bin/kafka-consumer-groups.sh --delete --group <groupId> --bootstrap-server <broker_host>:<port>
重置消费者组的消费位移
bin/kafka-consumer-groups.sh --group <groupId> --reset-offsets --all-topics --to-earliest --execute --bootstrap-server <broker_host>:<port>
其他选项:
其他管理命令
查看集群状态
bin/kafka-broker-api-versions.sh --bootstrap-server <broker_host>:<port>
重新选举 Topic 分区的 Leader
bin/kafka-preferred-replica-election.sh --bootstrap-server <broker_host>:<port>
数据重平衡
bin/kafka-reassign-partitions.sh --bootstrap-server <broker_host>:<port>
评论