1).下载kafka:

wgethttp://apache.fayea.com/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz

2) 解压:

tar-zxfkafka_2.10-0.8.2.1.tgz

3)启动kafka
adeMacBook-Pro:kafka_2.10-0.8.2.1 apple$ sh bin/kafka-server-start.sh config/server.properties
备注:要挂到后台使用:

shbin/kafka-server-start.shconfig/server.properties&

4)新建一个TOPIC

 adeMacBook-Pro:binapple$shkafka-topics.sh--create--topickafkatopic--replication-factor1
--partitions1--zookeeperlocalhost:2181

备注:要挂到后台使用:

 shkafka-topics.sh--create--topickafkatopic--replication-factor1--partitions1--zookeeperlocalhost
:2181&

5) 把KAFKA的生产者启动起来:

adeMacBook-Pro:binapple$shkafka-console-producer.sh--broker-listlocalhost:9092--sync
--topickafkatopic

备注:要挂到后台使用:

shkafka-console-producer.sh--broker-listlocalhost:9092--sync--topickafkatopic&

6)另开一个终端,把消费者启动起来:

 adeMacBook-Pro:binapple$shkafka-console-consumer.sh--zookeeperlocalhost:2181--topickafkatopic--from
-beginning

备注:要挂到后台使用:

 shkafka-console-consumer.sh--zookeeperlocalhost:2181--topickafkatopic--from-beginning&

基于0.8.0版本。

##查看topic分布情况kafka-list-topic.sh
bin/kafka-list-topic.sh –zookeeper 192.168.197.170:2181,192.168.197.171:2181 (列出所有topic的分区情况)
bin/kafka-list-topic.sh –zookeeper 192.168.197.170:2181,192.168.197.171:2181 –topic test (查看test的分区情况)

其实kafka-list-topic.sh里面就一句
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ListTopicCommand $@
实际是通过
kafka-run-class.sh脚本执行的包kafka.admin下面的类

##创建TOPIC kafka-create-topic.sh

bin/kafka-create-topic.sh   --replica 2 --partition 8 --topic test  --zookeeper 192.168.197.170:2181,192.168.197.171:2181

创建名为test的topic, 8个分区分别存放数据,数据备份总共2份

bin/kafka-create-topic.sh   --replica 1 --partition 1 --topic test2  --zookeeper 192.168.197.170:2181,192.168.197.171:2181

结果 topic: test2 partition: 0 leader: 170 replicas: 170 isr: 170

##重新分配分区kafka-reassign-partitions.sh
这个命令可以分区指定到想要的–broker-list上

bin/kafka-reassign-partitions.sh --topics-to-move-json-file topics-to-move.json --broker-list "171" --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --execute 
cat topic-to-move.json
{"topics":
     [{"topic": "test2"}],
     "version":1
}

##为Topic增加 partition数目kafka-add-partitions.sh
bin/kafka-add-partitions.sh –topic test –partition 2 –zookeeper 192.168.197.170:2181,192.168.197.171:2181 (为topic test增加2个分区)

##控制台接收消息

bin/kafka-console-consumer.sh --zookeeper  192.168.197.170:2181,192.168.197.171:2181  --from-beginning --topic test

##控制台发送消息

bin/kafka-console-producer.sh --broker-list  192.168.197.170:9092,192.168.197.171: 9092    --topic test 

##手动均衡topic, kafka-preferred-replica-election.sh

bin/kafka-preferred-replica-election.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --path-to-json-file preferred-click.json
cat preferred-click.json
{
 "partitions":
  [
    {"topic": "click", "partition": 0},
    {"topic": "click", "partition": 1},
    {"topic": "click", "partition": 2},
    {"topic": "click", "partition": 3},
    {"topic": "click", "partition": 4},
    {"topic": "click", "partition": 5},
    {"topic": "click", "partition": 6},
    {"topic": "click", "partition": 7},
    {"topic": "play", "partition": 0},
     {"topic": "play", "partition": 1},
     {"topic": "play", "partition": 2},
     {"topic": "play", "partition": 3},
     {"topic": "play", "partition": 4},
     {"topic": "play", "partition": 5},
     {"topic": "play", "partition": 6},
     {"topic": "play", "partition": 7}

  ]
}

##删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除

bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test666 --zookeeper 192.168.197.170:2181 ,192.168.197.171:2181

启动命令

/data/apps/kafka_2.11-0.10.0.0/bin/kafka-server-start.sh  /data/apps/kafka_2.11-0.10.0.0/config/server.properties  > /dev/nul 

#提供者测试

tail -n 0 -f  /var/log/messages  | bin/kafka-console-producer.sh --broker-list   aws10037.in.aiwaly.com:9092,aws10036.in.aiwaly.com:9092,aws10038.in.aiwaly.com:9092  --sync --topic my-replicated-topic

#循环输入日志

for((i=1;i<=10;));do echo test data-$(date +%F\ %T-%N)  >> /var/log/messages

#消费者

bin/kafka-console-consumer.sh --zookeeper aws10037.in.aiwaly.com:2181,aws10036.in.aiwaly.com:2181,aws10038.in.aiwaly.com:2181  --topic  my-replicated-topic   --from-beginning
如何获取kafka某topic的Logsize 
其实kafka提供了一系列的工具可以查看offset.


​MacPro:kafka-0.8.2.2-src ajian$ kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic my-replicated-topic  --group console-consumer-11758
Group           Topic                          Pid Offset          logSize         Lag             Owner
console-consumer-11758 my-replicated-topic            0   3715607         3715608         1               console-consumer-11758_MacPro-1449666659508-e5c5524d-0
文档更新时间: 2019-06-20 07:36   作者:月影鹏鹏