编辑推荐: |
本文来自于博客园,本文主要以kafka_2.11-0.10.0.0为例,介绍了Kafka集群的安装和使用,希望对您的学习有所帮助。
|
|
Kafka是一种高吞吐量的分布式发布订阅的消息队列系统,原本开发自LinkedIn,用作LinkedIn的活动流(ActivityStream)和运营数据处理管道(Pipeline)的基础。现在它已被多家不同类型的公司作为多种类型的数据管道和消息系统使用。
1 Kafka消息队列简介
1.1 基本术语
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker[5]
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition
Partition是物理上的概念,每个Topic包含一个或多个Partition.(一般为kafka节点数cpu的总核数)
Producer
负责发布消息到Kafka broker
Consumer
消息消费者,向Kafka broker读取消息的客户端。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group
name,若不指定group name则属于默认的group)。
1.2 消息队列
1.2.1 基本特性
可扩展
在不需要下线的情况下进行扩容
数据流分区(partition)存储在多个机器上
高性能
单个broker就能服务上千客户端
单个broker每秒种读/写可达每秒几百兆字节
多个brokers组成的集群将达到非常强的吞吐能力
性能稳定,无论数据多大
Kafka在底层摒弃了Java堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。
持久存储
存储在磁盘上
冗余备份到其他服务器上以防止丢失
1.2.2 消息格式
一个topic对应一种消息格式,因此消息用topic分类
一个topic代表的消息有1个或者多个patition(s)组成
一个partition中
一个partition应该存放在一到多个server上
如果只有一个server,就没有冗余备份,是单机而不是集群
如果有多个server
一个server为leader,其他servers为followers;leader需要接受读写请求;followers仅作冗余备份;leader出现故障,会自动选举一个follower作为leader,保证服务不中断;每个server都可能扮演一些partitions的leader和其它partitions的follower角色,这样整个集群就会达到负载均衡的效果
消息按顺序存放,顺序不可变
只能追加消息,不能插入
每个消息都有一个offset,用作消息ID, 在一个partition中唯一
offset有consumer保存和管理,因此读取顺序实际上是完全有consumer决定的,不一定是线性的
消息有超时日期,过期则删除
1.2.3 生产者 producer
producer将消息写入kafka
写入要指定topic和partition
消息如何分到不同的partition,算法由producer指定
1.2.4 消费者 consumer
consumer读取消息并作处理
consumer group
这个概念的引入为了支持两种场景:每条消息分发一个消费者,每条消息广播给消费组的所有消费者
多个consumer group订阅一个topic,该topci的消息广播给group内所有consumer
一条消息发送到一个consumer group后,只能由该group的一个consumer接收和使用
一个group中的每个consumer对应一个partition可以带来如下好处
可以按照partition的数目进行并发处理
每个partition都只有一个consumer读取,因而保证了消息被处理的顺序是按照partition的存放顺序进行,注意这个顺序受到producer存放消息的算法影响
一个Consumer可以有多个线程进行消费,线程数应不多于topic的partition数,因为对于一个包含一或多消费线程的consumer
group来说,一个partition只能分给其中的一个消费线程消费,且让尽可能多的线程能分配到partition(不过实际上真正去消费的线程及线程数还是由线程池的调度机制来决定)。这样如果线程数比partition数多,那么单射分配也会有多出的线程,它们就不会消费到任何一个partition的数据而空转耗资源
。
如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
2. 安装和使用
以kafka_2.11-0.10.0.0为例。
下载解压后,进入kafka_2.11-0.10.0.0/
2.1 启动Zookeeper
测试时可以使用Kafka附带的Zookeeper:
启动:
./bin/zookeeper-server-start.sh
config/zookeeper.properties & ,config/zookeeper.properties是Zookeeper的配置文件。
|
结束:
./bin/zookeeper-server-stop.sh
|
不过最好自己搭建一个Zookeeper集群,提高可用性和可靠性。详见:Zookeeper的安装和使用——MarchOn
2.2 启动Kafka服务器
2.2.1 配置文件
配置config/server.properties文件,一般需要配置如下字段,其他按默认即可:
broker.id:
每一个broker在集群中的唯一表示,要求是正数
listeners(效果同之前的版本的host.name及port):注意绑定host.name,否则可能出现莫名其妙的错误如consumer找不到broker。这个host.name是Kafka的server的机器名字,会注册到Zookeeper中
log.dirs: kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能
log.retention.hours: 数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略
zookeeper.connect: 指定ZooKeeper的connect string,以hostname:port的形式,可有多个以逗号分隔,如hostname1:port1,hostname2:port2,hostname3:port3,还可有路径,如:hostname1:port1,hostname2:port2,hostname3:port3/kafka,注意要事先在zk中创建/kafka节点,否则会报出错误:java.lang.IllegalArgumentException:
Path length must be > 0
|
所有参数的含义及配置可参考:http://orchome.com/12、http://blog.csdn.net/lizhitao/article/details/25667831
一个配置示例如下:
1
# Licensed to the Apache Software Foundation
(ASF) under one or more
2 # contributor license agreements. See the
NOTICE file distributed with
3 # this work for additional information regarding
copyright ownership.
4 # The ASF licenses this file to You under
the Apache License, Version 2.0
5 # (the "License"); you may not use
this file except in compliance with
6 # the License. You may obtain a copy of the
License at
7 #
8 # http://www.apache.org/licenses/LICENSE -
2.0
9 #
10 # Unless required by applicable law or agreed
to in writing , software
11 # distributed under the License is distributed
on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.
13 # See the License for the specific language
governing permissions and
14 # limitations under the License.
15 # see kafka.server.KafkaConfig for additional
details and defaults
16
17 Server Basics
18
19 # The id of the broker. This must be set
to a unique integer for each broker.
20 broker.id=1
21
22 Socket Server Settings
23
24 # The address the socket server listens on.
It will get the value returned from
25 # java.net.InetAddress.getCanonicalHostName()
if not configured.
26 # FORMAT:
27 # listeners = security_protocol://host_name:port
28 # EXAMPLE:
29 # listeners = PLAINTEXT://your. host . name
: 9092
30 listeners=PLAINTEXT://192 .168 .6 .128 :
9092
31
32 # Hostname and port the broker will advertise
to producers and consumers. If not set,
33 # it uses the value for "listeners"
if configured. Otherwise, it will use the value
34 # returned from java.net.InetAddress.getCanonicalHostName().
35 #advertised.listeners=PLAINTEXT://your.host.name:9092
36
37 # The number of threads handling network
requests
38 num.network.threads=3
39
40 # The number of threads doing disk I/O
41 num.io.threads=8
42
43 # The send buffer (SO_SNDBUF) used by the
socket server
44 socket.send.buffer.bytes=102400
45
46 # The receive buffer (SO_RCVBUF) used by
the socket server
47 socket.receive.buffer.bytes=102400
48
49 # The maximum size of a request that the
socket server will accept (protection against
OOM)
50 socket.request.max.bytes=104857600
51
52
53 Log Basics
54
55 # A comma seperated list of directories under
which to store log files
56 log.dirs=/usr/local/kafka/ kafka_2.11 - 0.10
.0 .0 /kfk_data/
57
58 # The default number of log partitions per
topic. More partitions allow greater
59 # parallelism for consumption, but this will
also result in more files across
60 # the brokers.
61 num.partitions=2
62 auto.create.topics.enable=false
63
64 # The number of threads per data directory
to be used for log recovery at startup and flushing
at shutdown.
65 # This value is recommended to be increased
for installations with data dirs located in
RAID array.
66 num.recovery.threads.per.data.dir=1
67
68 Log Flush Policy
69
70 # Messages are immediately written to the
filesystem but by default we only fsync() to
sync
71 # the OS cache lazily. The following configurations
control the flush of data to disk.
72 # There are a few important trade-offs here:
73 # 1. Durability: Unflushed data may be lost
if you are not using replication.
74 # 2. Latency: Very large flush intervals
may lead to latency spikes when the flush does
occur as there will be a lot of data to flush.
75 # 3. Throughput: The flush is generally the
most expensive operation, and a small flush
interval may lead to exceessive seeks.
76 # The settings below allow one to configure
the flush policy to flush data after a period
of time or
77 # every N messages (or both). This can be
done globally and overridden on a per-topic
basis.
78
79 # The number of messages to accept before
forcing a flush of data to disk
80 #log.flush.interval.messages=10000
81
82 # The maximum amount of time a message can
sit in a log before we force a flush
83 #log.flush.interval.ms=1000
84
85Log Retention Policy
86
87 # The following configurations control the
disposal of log segments. The policy can
88 # be set to delete segments after a period
of time, or after a given size has accumulated.
89 # A segment will be deleted whenever *either*
of these criteria are met. Deletion always happens
90 # from the end of the log.
91
92 # The minimum age of a log file to be eligible
for deletion
93 log.retention.hours=4
94
95 # A size-based retention policy for logs.
Segments are pruned from the log as long as
the remaining
96 # segments don't drop below log.retention.bytes.
97 #log.retention.bytes=1073741824
98
99 # The maximum size of a log segment file.
When this size is reached a new log segment
will be created.
100 log.segment.bytes=1073741824
101
102 # The interval at which log segments are
checked to see if they can be deleted according
103 # to the retention policies
104 log.retention.check.interval.ms=300000
105
106 Zookeeper
107
108 # Zookeeper connection string (see zookeeper
docs for details).
109 # This is a comma separated host:port pairs,
each corresponding to a zk
110 # server. e.g. "127.0.0.1:3000,127.0.0.1
: 3001 ,127 .0 .0 .1: 3002".
111 # You can also append an optional chroot
string to the urls to specify the
112 # root directory for all kafka znodes.
113 zookeeper.connect=192.168 . 6 .131: 2181
,192 .168 .6 .132 :2181 ,192 .168 .6 .133 :
2181
114
115 # Timeout in ms for connecting to zookeeper
116 zookeeper.connection.timeout.ms=6000
|
注意auto.create.topics.enable字段,若为true则如果producer写入某个不存在的topic时会自动创建该topic,若为false则需要事先创建否则会报错:failed
after 3 retries。
2.2.2 命令
启动: bin/kafka-server-start.sh config/server.properties
,生产环境最好以守护程序启动:nohup &
结束: bin/kafka-server-stop.sh
2.2.3 Kafka在Zookeeper中的存储结构
若上述的zookeeper.connect的值没有路径,则为根路径,启动Zookeeper和Kafka,命令行连接Zookeeper后,用
get / 命令可发现有 consumers、config、controller、admin、brokers、zookeeper、controller_epoch
这几个目录。
其结构如下:(具体可参考:apache kafka系列之在zookeeper中存储结构)
2.3 使用
kafka本身是和zookeeper相连的,而对应producer和consumer的状态保存也都是通过zookeeper完成的。对Kafka的各种操作通过其所连接的Zookeeper完成。
2.3.1 命令行客户端
创建topic: bin/kafka-topics.sh --create --zookeeper
localhost:2181 --replication-factor 1 --partitions
1 --topic test
列出所有topic: bin/kafka-topics.sh --list --zookeeper
localhost:2181
查看topic信息(包括分区、副本情况等): kafka-topics.sh --describe
--zookeeper localhost:2181 --topic my-replicated-topic
,会列出分区数、副本数、副本leader节点、副本节点、活着的副本节点
往某topic生产消息: bin/kafka-console-producer.sh --broker-list
localhost:9092 --topic test
从某topic消费消息: bin/kafka-console-consumer.sh --zookeeper
localhost:2181 --topic test --from-beginning (默认用一个线程消费指定topic的所有分区的数据)
删除某个Kafka groupid:连接Zookeeper后用rmr命令,如删除名为JSI的消费组:
rmr /consumers/JSI
查看消费进度:
./bin/kafka-run-class.sh
kafka.tools.ConsumerOffsetChecker --group test-mirror-consumer-zsm
--zkconnect ec2-12345.cn-north-1.compute.amazonaws.com.cn:2181/kafka/blink/0822
--topic GPS2
各参数:
--group指MirrorMaker消费源集群时指定的group.id
-zkconnect指源集群的zookeeper地址
--topic指定查的topic,没指定则返回所有topic的消费情况
|
2.3.2 Java客户端
1、Topic操作:
import
kafka.admin.DeleteTopicCommand;
2 import kafka.admin.TopicCommand;
3
4 /**
5 * @author zsm
6 * @date 2016年9月27日 上午10:26:42
7 * @version 1.0
8 * @parameter
9 * @since
10 * @return
11 */
12 public class JTopic {
13 public static void createTopic(String zkAddr,
String topicName, int partition, int replication)
{
14 String[] options = new String[] { "--create",
"--zookeeper", zkAddr, "--topic",
topicName, "--partitions",
15 partition + "", "--replication-factor",
replication + "" };
16 TopicCommand.main(options);
17 }
18
19 public static void listTopic(String zkAddr)
{
20 String[] options = new String[] { "--list",
"--zookeeper", zkAddr };
21 TopicCommand.main(options);
22 }
23
24 public static void describeTopic(String zkAddr,
String topicName) {
25 String[] options = new String[] { "--describe",
"--zookeeper", zkAddr, "--topic",
topicName, };
26 TopicCommand.main(options);
27 }
28
29 public static void alterTopic(String zkAddr,
String topicName) {
30 String[] options = new String[] { "--alter",
"--zookeeper", zkAddr, "--topic",
topicName, "--partitions", "5"
};
31 TopicCommand.main(options);
32 }
33
34 // 通过删除zk里面对应的路径来实现删除topic的功能,只会删除zk里面的信息,Kafka上真实的数据并没有删除
35 public static void deleteTopic(String zkAddr,
String topicName) {
36 String[] options = new String[] { "--zookeeper",
zkAddr, "--topic", topicName };
37 DeleteTopicCommand.main(options);
38 }
39
40 public static void main(String[] args) {
41 // TODO Auto-generated method stub
42
43 String myTestTopic = "ZsmTestTopic";
44 int myPartition = 4;
45 int myreplication = 1;
46
47 //createTopic(ConfigureAPI.KafkaProperties.ZK,
myTestTopic, myPartition, myreplication);
48 // listTopic(ConfigureAPI.KafkaProperties.ZK);
49 describeTopic(ConfigureAPI.KafkaProperties.ZK,
myTestTopic);
50 // alterTopic(ConfigureAPI.KafkaProperties.ZK,
myTestTopic);
51 // deleteTopic(ConfigureAPI.KafkaProperties.ZK,
myTestTopic);
52 }
53
54 }
|
2、写:(写时可以指定key以供Kafka根据key将数据写入某个分区,若无指定,则几乎就是随机找一个分区发送无key的消息,然后把这个分区号加入到缓存中以备后面直接使用——当然,Kafka本身也会清空该缓存(默认每10分钟或每次请求topic元数据时))
1 package com.zsm.kfkdemo;
2
3 import java.util.ArrayList;
4 import java.util.List;
5 import java.util.Properties;
6
7 import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;
8
9 import kafka.javaapi.producer.Producer;
10 import kafka.producer.KeyedMessage;
11 import kafka.producer.ProducerConfig;
12
13 /**
14 * 可以指定规则(key和分区函数)以让消息写到特定分区:
15 * <p>
16 * 1、若发送的消息没有指定key则Kafka会随机选择一个分区
17 * </p>
18 * <p>
19 * 2、否则,若指定了分区函数(通过partitioner.class)则该函数以key为参数确定写到哪个分区
20 * </p>
21 * <p>
22 * 3、否则,Kafka根据hash(key)%partitionNum确定写到哪个分区
23 * </p>
24 *
25 * @author zsm
26 * @date 2016年9月27日 上午10:26:42
27 * @version 1.0
28 * @parameter
29 * @since
30 * @return
31 */
32 public class JProducer extends Thread {
33 private Producer<String, String> producer;
34 private String topic;
35 private final int SLEEP = 10;
36 private final int msgNum = 1000;
37
38 public JProducer(String topic) {
39 Properties props = new Properties();
40 props.put("metadata.broker.list",
KafkaProperties.BROKER_LIST);// 如192.168.6.127:9092,192.168.6.128:9092
41 // request.required.acks
42 // 0, which means that the producer never
waits for an acknowledgement from the broker
(the same behavior as 0.7). This option provides
the lowest latency but the weakest durability
guarantees
43 // (some data will be lost when a server
fails).
44 // 1, which means that the producer gets
an acknowledgement after the leader replica
has received the data. This option provides
better durability as the client waits until
the server
45 // acknowledges the request as successful
(only messages that were written to the now-dead
leader but not yet replicated will be lost).
46 // -1, which means that the producer gets
an acknowledgement after all in-sync replicas
have received the data. This option provides
the best durability, we guarantee that no messages
will be
47 // lost as long as at least one in sync replica
remains.
48 props.put("request.required.acks",
"-1");
49 // 配置value的序列化类
50 props.put("serializer.class", "kafka.serializer.StringEncoder");
51 // 配置key的序列化类
52 props.put("key.serializer.class",
"kafka.serializer.StringEncoder");
53 // 提供自定义的分区函数将消息写到分区上,未指定的话Kafka根据hash(messageKey)%partitionNum确定写到哪个分区
54 props.put("partitioner.class",
"com.zsm.kfkdemo.MyPartitioner");
55 producer = new Producer<String, String>(new
ProducerConfig(props));
56 this.topic = topic;
57 }
58
59 @Override
60 public void run() {
61 boolean isBatchWriteMode = true;
62 System.out.println("isBatchWriteMode:
" + isBatchWriteMode);
63 if (isBatchWriteMode) {
64 // 批量发送
65 int batchSize = 100;
66 List<KeyedMessage<String, String>>
msgList = new ArrayList<KeyedMessage<String,
String>>(batchSize);
67 for (int i = 0; i < msgNum; i++) {
68 String msg = "Message_" + i;
69 msgList.add(new KeyedMessage<String, String>(topic,
i + "", msg));
70 // msgList.add(new KeyedMessage<String,
String>(topic, msg));//未指定key,Kafka会自动选择一个分区
71 if (i % batchSize == 0) {
72 producer.send(msgList);
73 System.out.println("Send->["
+ msgList + "]");
74 msgList.clear();
75 try {
76 sleep(SLEEP);
77 } catch (Exception ex) {
78 ex.printStackTrace();
79 }
80 }
81 }
82 producer.send(msgList);
83 } else {
84 // 单个发送
85 for (int i = 0; i < msgNum; i++) {
86 KeyedMessage<String, String> msg =
new KeyedMessage<String, String>(topic,
i + "", "Message_" + i);
87 // KeyedMessage<String, String> msg
= new KeyedMessage<String, String>(topic,
"Message_" + i);//未指定key,Kafka会自动选择一个分区
88 producer.send(msg);
89 System.out.println("Send->["
+ msg + "]");
90 try {
91 sleep(SLEEP);
92 } catch (Exception ex) {
93 ex.printStackTrace();
94 }
95 }
96 }
97
98 System.out.println("send done");
99 }
100
101 public static void main(String[] args) {
102 JProducer pro = new JProducer(KafkaProperties.TOPIC);
103 pro.start();
104 }
105 }
|
3、读:(对于Consumer,需要注意 auto.commit.enable 和 auto.offset.reset
这两个字段)
package
com.zsm.kfkdemo;
2
3 import java.text.MessageFormat;
4 import java.util.HashMap;
5 import java.util.List;
6 import java.util.Map;
7 import java.util.Properties;
8
9 import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;
10
11 import kafka.consumer.Consumer;
12 import kafka.consumer.ConsumerConfig;
13 import kafka.consumer.ConsumerIterator;
14 import kafka.consumer.KafkaStream;
15 import kafka.javaapi.consumer.ConsumerConnector;
16 import kafka.message.MessageAndMetadata;
17
18 /**
19 * 同一consumer group的多线程消费可以两种方法实现:
20 * <p>
21 * 1、实现单线程客户端,启动多个去消费
22 * </p>
23 * <p>
24 * 2、在客户端的createMessageStreams里为topic指定
大于1的线程数,再启动多个线程处理每个stream
25 * </p>
26 *
27 * @author zsm
28 * @date 2016年9月27日 上午10:26:42
29 * @version 1.0
30 * @parameter
31 * @since
32 * @return
33 */
34 public class JConsumer extends Thread {
35
36 private ConsumerConnector consumer;
37 private String topic;
38 private final int SLEEP = 20;
39
40 public JConsumer(String topic) {
41 consumer = Consumer.createJavaConsumerConnector(this.consumerConfig
());
42 this.topic = topic;
43 }
44
45 private ConsumerConfig consumerConfig() {
46 Properties props = new Properties();
47 props.put("zookeeper.connect",
KafkaProperties.ZK);
48 props.put("group.id", KafkaProperties.GROUP_ID);
49 props.put("auto.commit.enable",
"true");// 默认为true,
让consumer定期commit
offset,zookeeper会将offset持久化,
否则只在内存,若故障则再消费时会从最后一次保存的offset开始
50 props.put("auto.commit.interval.ms",
KafkaProperties.INTERVAL + "");//
经过INTERVAL时间提交一次offset
51 props.put("auto.offset.reset",
"largest");// What to do when there
is no initial offset in ZooKeeper or if an offset
is out of range
52 props.put("zookeeper.session.timeout.ms",
KafkaProperties.TIMEOUT + "");
53 props.put("zookeeper.sync.time.ms",
"200");
54 return new ConsumerConfig(props);
55 }
56
57 @Override
58 public void run() {
59 Map<String, Integer> topicCountMap
= new HashMap<String, Integer>();
60 topicCountMap.put(topic, new Integer(1));//
线程数
61 Map<String, List<KafkaStream<byte[],
byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
62 KafkaStream<byte[], byte[]> stream
= streams.get(topic).get(0);// 若上面设了多个线程去消费,
则这里需为每个stream开个线程做如下的处理
63
64 ConsumerIterator<byte[], byte[]> it
= stream.iterator();
65 MessageAndMetadata<byte[], byte[]>
messageAndMetaData = null;
66 while (it.hasNext()) {
67 messageAndMetaData = it.next();
68 System.out.println(MessageFormat.format("Receive->[
message:{0} , key:{1} , partition:{2} , offset:{3}
]",
69 new String(messageAndMetaData.message()),
new String(messageAndMetaData.key()),
70 messageAndMetaData.partition() + "",
messageAndMetaData.offset() + ""));
71 try {
72 sleep(SLEEP);
73 } catch (Exception ex) {
74 ex.printStackTrace();
75 }
76 }
77 }
78
79 public static void main(String[] args) {
80 JConsumer con = new JConsumer(KafkaProperties.TOPIC);
81 con.start();
82 }
83 }
|
与Kafka相关的Maven依赖:
1 <dependency>
2 <groupId>org.apache.kafka</groupId>
3 <artifactId>kafka_2.9.2</artifactId>
4 <version>0.8.1.1</version>
5 <exclusions>
6 <exclusion>
7 <groupId>com.sun.jmx</groupId>
8 <artifactId>jmxri</artifactId>
9 </exclusion>
10 <exclusion>
11 <groupId>com.sun.jdmk</groupId>
12 <artifactId>jmxtools</artifactId>
13 </exclusion>
14 <exclusion>
15 <groupId>javax.jms</groupId>
16 <artifactId>jms</artifactId>
17 </exclusion>
18 </exclusions>
19 </dependency>
|
3 MirrorMaker
Kafka自身提供的MirrorMaker工具用于把一个集群的数据同步到另一集群,其原理就是对源集群消费、对目标集群生产。
运行时需要指定源集群的Zookeeper地址(pull模式)或目标集群的Broker列表(push模式)。
3.1 使用
运行 ./kafka-run-class.sh kafka.tools.MirrorMaker --help
查看使用说明,如下:
1
Option Description
2 ------ -----------
3 --blacklist <Java regex (String)> Blacklist
of topics to mirror.
4 --consumer.config <config file> Consumer
config to consume from a
5 source cluster. You may specify
6 multiple of these.
7 --help Print this message.
8 --num.producers <Integer: Number of Number
of producer instances (default:
9 producers> 1)
10 --num.streams <Integer: Number of Number
of consumption streams.
11 threads> (default: 1)
12 --producer.config <config file> Embedded
producer config.
13 --queue.size <Integer: Queue size in Number
of messages that are buffered
14 terms of number of messages> between the
consumer and producer
15 (default: 10000)
16 --whitelist <Java regex (String)> Whitelist
of topics to mirror.
|
3.2 启动
./bin/kafka-run-class.sh
kafka.tools.MirrorMaker --consumer.config zsmSourceClusterConsumer.config
--num.streams 2 --producer.config zsmTargetClusterProducer.config
--whitelist="ds*"
--consumer.config所指定的文件里至少需要有zookeeper.connect、group.id两字段
--producer.config至少需要有metadata.broker.list字段,指定目标集群的brooker列表
--whitelist指定要同步的topic
|
可以用2.3.1所说的查看消费进度来查看对原集群的同步状况(即消费状况)。
4 Kafka监控工具(KafkaOffsetMonitor)
可以借助KafkaOffsetMonitor来图形化展示Kafka的broker节点、topic、consumer及offset等信息。
以KafkaOffsetMonitor-assembly-0.2.0.jar为例,下载后执行:
#!/bin/bash
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m
-XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar
\
com.quantifind.kafka.offsetapp.OffsetGetterWeb
\
--zk 192.168.5.131:2181,192.168.6.132:2181,192.168.6.133:2181
\
--port 8087 \
--refresh 10.seconds \
--retain 1.days 1>./zsm-logs/stdout.log 2>./zsm-logs/stderr.log
&
|
其中,zk按照host1:port1,host2:port2…的格式去写即可,port为开启web界面的端口号,refresh为刷新时间,retain为数据保留时间(单位seconds,
minutes, hours, days)
5 Kafka集群管理工具(Kafka Manager)
kafka-manager是yahoo开源出来的项目,属于商业级别的工具用Scala编写。
这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具。
此工具以集群的方式运行,需要Zookeeper。
参考资料:http://hengyunabc.github.io/kafka-manager-install/
5.1 安装
需要从Github下载源码并安装sbt工具编译生成安装包,生成的时间很长且不知为何一直出错,所以这里用网友已编译好的包
(备份链接)。
包为kafka-manager-1.0-SNAPSHOT.zip
>解压:
unzip
kafka-manager-1.0-SNAPSHOT.zip
|
>配置conf/application.conf里的kafka-manager.zkhosts:
kafka-manager.zkhosts="192.168.6.131:2181,192.168.
6.132:2181,192.168.6.133:2181"
|
>启动:
./bin/kafka-manager
-Dconfig.file=conf/application.conf
|
默认是9000端口,要使用其他端口可以在命令行指定http.port,此外kafka-manager.zkhosts也可以在命令行指定,如:
./bin/kafka-manager
-Dhttp.port=9001 -Dkafka-manager.zkhosts="192.168.6.131:2181,192.168.
6.132:2181,192.168.6.133:2181"
|
5.2 使用
访问web页面,在Cluster->Add Cluster,输入要监控的Kafka集群的Zookeeper即可。
6 进阶
在当前的kafka版本实现中,对于zookeeper的所有操作都是由kafka controller来完成的(serially的方式)
offset管理:kafka会记录offset到zk中。但是,zk client api对zk的频繁写入是一个低效的操作。0.8.2
kafka引入了native offset storage,将offset管理从zk移出,并且可以做到水平扩展。其原理就是利用了kafka的compacted
topic,offset以consumer group,topic与partion的组合作为key直接提交到compacted
topic中。同时Kafka又在内存中维护了三元组来维护最新的offset信息,consumer来取最新offset信息时直接从内存拿即可。当然,kafka允许你快速checkpoint最新的offset信息到磁盘上。
如何确定分区数:分区数的确定与硬件、软件、负载情况等都有关,要视具体情况而定,不过依然可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位是MB/s。然后假设总的目标吞吐量是Tt,那么分区数
= Tt / max(Tp, Tc) |