elk

kakfa常用命令

Posted by llussy on July 28, 2020

[toc]

重启

/usr/local/kafka_2.11-2.0.0/bin/kafka-server-stop.sh
sleep 3
/usr/local/kafka_2.11-2.0.0/bin/kafka-server-start.sh  -daemon /usr/local/kafka_2.11-2.0.0/config/server.properties

列出/删除topic

#列出topic
./bin/kafka-topics.sh --list --zookeeper localhost:2181

#列出topic详情
./bin/kafka-topics.sh --zookeeper lcoalhost:2181 --topic ecar-baojia --describe

#创建topic
./bin/kafka-topics.sh --create --topic headlineRecomUpdateSync4 --replication-factor 3 --partitions 5 --zookeeper  192.168.1.1:2181

#删除topic
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

#连接到zookeeper彻底删除topic
./bin/zookeeper-shell.sh 192.168.1.1:2181
找到topic目录:ls /brokers/topics   删掉对应的topic :     rmr  /brokers/topic/topic-name
找到目录:  ls /config/topics        删除对应的topic:     rmr  /config/topics/topic-name

####从 头开始消费数据

# old version
./bin/kafka-console-consumer.sh --zookeeper 192.168.1.1:2181 --topic ecar-baojia --from-beginning

# new version
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.1:9092 --topic testtest --from-beginning

查看/删除消费组

#查看消费组
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.1:9092 --list

#删除
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.1:9092 --group logstash-b --delete

####查看消费组的积压情况

# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.1:9092 --describe --group logstash-a

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
test-a         8          503108030       505454999       2346969         logstash-0-946898a5-7b1f-4930-9ec3-0d5d977d31b8 /192.168.1.1   logstash-0
test-a         2          503256691       505374825       2118134         logstash-0-946898a5-7b1f-4930-9ec3-0d5d977d31b8 /192.168.1.1   logstash-0
test-a         3          503152179       505364622       2212443         logstash-0-946898a5-7b1f-4930-9ec3-0d5d977d31b8 /192.168.1.1   logstash-0

展示的几个参数: TOPIC: 该消费者组消费的是哪些topic PARTITION: 表示该消费者消费的是哪些分区, 本例创建topic时只有一个分区, 所以输出只有一行, 分区号为0 CURRENT-OFFSET: 表示消费者组最新消费的位移值, 此值在消费过程中是变化的 LOG-END-OFFSET: 表示topic所有分区当前的日志终端位移值, 因为我们生产了1000万数据, 所以此处是1000万 LAG: 表示滞后进度, 此值为LOG-END-OFFSET 与 CURRENT-OFFSET的差值, 代表的是滞后情况, 此值越大表示滞后严重, 本例最终LAG为0 说明没有消费滞后.

检查消费者位置offset(0.9.0.0之前)

/usr/local/kafka_2.12-2.1.0/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic ugc_topic_test --zookeeper 192.168.1.1:2181

清除offset

登陆kafka集群所使用的zookeeper集群中任一台服务器(以10.90.11.32为例):

./ zkCli.sh -server 10.90.11.32:12181

[zk: 10.90.11.19:12181(CONNECTED) 1] ls /consumers/${consumer_name}/offsets/${topic_name}

[zk: 10.90.11.19:12181(CONNECTED) 2] rmr /consumers/${consumer_name}/offsets/${topic_name}

注:清除的是${consumer_name}消费者分组对于${topic_name}该topic的offset,此分组的消费者重启后默认会从当前开始消费。

设置消费者offset

# 设置为最初偏移量
./kafka-consumer-groups.sh --bootstrap-server snn:6667 --group offsettest --topic offset-test --reset-offsets --to-earliest –execute

#设置任意偏移量
./kafka-consumer-groups.sh --bootstrap-server snn:6667 --group offsettest --topic offset-test --reset-offsets --to-offset 3 –execute

#设置最近偏移量
./kafka-consumer-groups.sh --bootstrap-server snn:6667 --group offsettest --topic offset-test --reset-offsets --to-latest --execute

修改topic持久化时间

./bin/kafka-topics.sh --zookeeper 192.168.1.1:2181 -topic test --alter --config retention.ms=259200000
# 259200000 3天
# 345600000 4天

增加partition分区数

./kafka-topics.sh --alter --topic recHeadLine --zookeeper  192.168.1.1:2181 --partitions 80

auto.offset.reset

earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

Kafka auto.offset.reset值详解

参考

删除topics

kafka删除topics

kafka查询消费者组

kafka手动修改消费者偏移量