世界上只有一种英雄主义,就是看清生活的真相之后依然热爱生活!

CentOS 7 安装和配置 ZooKeeper 和 kafka 集群

2018-09-13
1,444次查阅
2019/7/8

ZooKeeper 是一个开源的分布式协调服务,由雅虎创建,是 Google Chubby 的开源实现。 分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、配置维护,名字服务、分布式同步、分布式锁和分布式队列 等功能。

Zookeeper工作原理

在zookeeper的集群中,各个节点共有下面3种角色和4种状态:

  • 角色:leader,follower,observer
  • 状态:leading,following,observing,looking

Zookeeper的核心是原子广播,这个机制保证了各个Server之间的同步。实现这个机制的协议叫做Zab协议(ZooKeeper Atomic Broadcast protocol)。Zab协议有两种模式,它们分别是恢复模式(Recovery选主)和广播模式(Broadcast同步)。当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数Server完成了和leader的状态同步以后,恢复模式就结束了。状态同步保证了leader和Server具有相同的系统状态。

为了保证事务的顺序一致性,zookeeper采用了递增的事务id号(zxid)来标识事务。所有的提议(proposal)都在被提出的时候加上了zxid。实现中zxid是一个64位的数字,它高32位是epoch用来标识leader关系是否改变,每次一个leader被选出来,它都会有一个新的epoch,标识当前属于那个leader的统治时期。低32位用于递增计数。

每个Server在工作过程中有4种状态:

  • LOOKING:当前Server不知道leader是谁,正在搜寻。
  • LEADING:当前Server即为选举出来的leader。
  • FOLLOWING:leader已经选举出来,当前Server与之同步。
  • OBSERVING:observer的行为在大多数情况下与follower完全一致,但是他们不参加选举和投票,而仅仅接受(observing)选举和投票的结果。

zookeeper集群一大特性是只要集群中半数以上的节点存活,集群就可以正常提供服务,所以集群机器数一般选择部署2n+1台机器。

安装JDK

tar zxvf jdk-8u151-linux-x64.tar.gz && cp jdk1.8.0_151 /usr/local/jdk

/etc/profile文件中添加:

#Add JDK
export JAVA_HOME=/usr/local/jdk
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib

执行. /etc/profile生效,验证:

[root@iZj6ch1b7r6pvvwievual3Z src]# java -version
java version "1.8.0_151"
Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)

安装ZooKeeper

下载地址:http://mirrors.hust.edu.cn/apache/zookeeper/

wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
tar zxvf zookeeper-3.4.13.tar.gz
mv zookeeper-3.4.13 /usr/local/zookeeper

修改配置文件

cd /usr/local/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
vim /usr/local/zookeeper/conf/zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5

#data,logs的路径,根据个人的情况不同设置
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181

server.1=192.168.0.50:2888:3888
server.2=192.168.0.60:2888:3888
server.3=192.168.0.70:2888:3888

# 其中2888是zookeeper服务之间通信的端口
# 3888是zookeeper与其他应用程序通信端口

在数据盘新建datalogs目录(目录位置根据个人),并在data目录下新建名为myid的文件,文件内容为1,群集中每台机器的myid文件内容不能重复,并和配置文件里的server相对应。1-255即可。

mkdir -p /data/zookeeper/{data,logs} 
echo "1" > /data/zookeeper/data/myid    #这里的 1 对应配置文件 server 后面的数字 

启动停止操作:

/usr/local/zookeeper/bin/zkServer.sh start
/usr/local/zookeeper/bin/zkServer.sh status

安装配置 kafka 集群

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。

下载地址:http://mirror.bit.edu.cn/apache/kafka/

wget http://mirror.bit.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
tar xvf kafka_2.11-2.0.0.tgz
mv kafka_2.11-2.0.0 /usr/local/kafka

创建日志目录

mkdir /data/kafka-logs

修改配置文件/usr/local/kafka/config/server.properties

broker.id=0                                                # 默认为0,群集按实例排序1、2、3
listeners=PLAINTEXT://192.168.0.50:9092                    # 本机地址
advertised.host.name=192.168.0.50                          # 本机地址
advertised.listeners=PLAINTEXT://192.168.0.50:9092         # 本机地址
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs                                  #自定义日志地址
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.cleanup.policy=delete
log.retention.hours=2
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.retention.minutes=60
log.retention.ms=3600000
# zookeeper.connect=192.168.0.50:2181                                           #单机,一个IP:端口
zookeeper.connect=192.168.0.50:2181,192.168.0.60:2181,192.168.0.70:2181         #群集,多IP:端口
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

启动停止

#启动
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

#停止
/usr/local/kafka/bin/kafka-server-stop.sh

创建topic

/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看topic

/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.0.50:2181

删除topic

/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

产生消息

#单机
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.0.50:9092 --topic test

#群集
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.0.50:9092,192.168.0.60:9092,192.168.0.70:9092 --topic test

消费消息

#单机
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.50:9092 --from-beginning --topic test

#群集
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.50:9092,192.168.0.60:9092,192.168.0.70:9092 --from-beginning --topic test 

评论

想说点什么?