zookeeper实现注册与监听

集群管理

Zookeeper 能够很容易的实现集群管理的功能,如有多台 Server 组成一个服务集群,那么必须要一个“leader”知道当前集群中每台机器的服务状态,一旦有机器不能提供服务,集群中其它集群必须知道,从而做出调整重新分配服务策略。同样当增加集群的服务能力时,就会增加一台或多台 Server,同样也必须让“leader”知道。

分布式应用中, 我们经常同时启动多个server, 调用方(client)选择其中之一发起请求。分布式应用必须考虑高可用性和可扩展性: server的应用进程可能会崩溃, 或者server本身也可能会宕机。 当server不够时, 也有可能增加server的数量。总而言之, server列表并非一成不变, 而是一直处于动态的增减中。

实现方式是在 Zookeeper 上创建一个 EPHEMERAL 类型的目录节点,然后每个 Server 在它们创建目录节点的父目录节点上调用 getChildren(String path, boolean watch) 方法并设置 watch 为 true,由于是 EPHEMERAL 目录节点,当创建它的 Server 死去,这个目录节点也随之被删除,所以 Children 将会变化,这时 getChildren上的 Watch 将会被调用,所以其它 Server 就知道已经有某台 Server 死去了。新增 Server 也是同样的原理。

Server端注册节点

基本API操作请参考上一篇文章zookeeper集群搭建与配置,这里给出java实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package zookeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
public class AppServer {
private String pnode="sgroup";
private String snode="sub";
private String host="192.168.62.147:2181";
public void connect(String content) throws Exception {
ZooKeeper zk=new ZooKeeper(host,5000,new Watcher(){
public void process(WatchedEvent event){
//no process
}
});
String createdPath=zk.create("/"+pnode+"/"+snode, content.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("create :"+createdPath);
}
public void handle() throws InterruptedException{
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
AppServer as=new AppServer();
as.connect("register");
as.handle();
}
}

启动后将会在/sgroup下创建sub0000000000,结束运行后该节点就会消失。

多线程模拟

用多线程模拟多个server的注册过程,这里将创建3个临时有序节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package zkServer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
public class Server {
private String pnode="sgroup";
private String snode="sub";
private String host="192.168.62.147:2181,192.168.62.148:2181,192.168.62.149:2181";
public void connect(String content) throws Exception {
ZooKeeper zk=new ZooKeeper(host,5000,new Watcher(){
public void process(WatchedEvent event){
//no process
}
});
String createdPath=zk.create("/"+pnode+"/"+snode, content.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("create :"+createdPath);
}
public void handle() throws InterruptedException{
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args){
Multi server1=new Multi(1);
Multi server2=new Multi(2);
Multi server3=new Multi(3);
new Thread(server1,"server1").start();
new Thread(server2,"server1").start();
new Thread(server3,"server1").start();
}
}
class Multi implements Runnable {
private int seq;
public Multi(int seq){
this.seq=seq;
}
public void run(){
try {
Server as=new Server();
as.connect("register");
System.out.println("server"+this.seq+" registered");
as.handle();
} catch (Exception e){
e.printStackTrace();
}
}
}

运行结果

1
2
3
4
5
6
create :/sgroup/sub0000000005
server3 registered
create :/sgroup/sub0000000006
server1 registered
create :/sgroup/sub0000000007
server2 registered

1
2
[zk: localhost:2181(CONNECTED) 26] ls /sgroup
[sub0000000005, sub0000000006, sub0000000007]

Client端监视节点变化

zkClient的逻辑比zkServer稍微复杂一些, 需要监听”/sgroup”下子节点的变化事件, 当事件发生时, 需要更新server列表。注册监听”/sgroup”下子节点的变化事件, 可在getChildren方法中完成。当zookeeper回调监听器的process方法时, 判断该事件是否是”/sgroup”下子节点的变化事件, 如果是, 则调用更新逻辑, 并再次注册该事件的监听。需要注意的是watcher是一次性的,不支持用持久Watcher的原因很简单,ZK无法保证性能,所以我们必须再次注册。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package zkclient;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;
public class Client {
private String pnode="sgroup";
private String host="192.168.62.147:2181,192.168.62.148:2181,192.168.62.149:2181";
private ZooKeeper zk;
private List<String> serverList =new ArrayList<String>();
public void connect() throws Exception {
zk=new ZooKeeper(host,5000,new Watcher(){
public void process(WatchedEvent event){
if(event.getType()==EventType.NodeChildrenChanged && ("/"+pnode).equals(event.getPath())){
try {
updateServerList();
}
catch(Exception e) {
e.printStackTrace();
}
}
}
});
updateServerList();
}
public void updateServerList() throws Exception {
List<String> subNodeList =zk.getChildren("/"+pnode, true);
serverList=subNodeList;
System.out.println("server list updated: "+serverList);
}
public void handle() throws InterruptedException{
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception{
Client ac=new Client();
ac.connect();
ac.handle();
}
}

Server注册与离开会触发watch event,client将得到server的动态列表

1
2
3
4
5
server list updated: []
server list updated: [sub0000000039, sub0000000038]
server list updated: [sub0000000039, sub0000000040, sub0000000038]
server list updated: [sub0000000040, sub0000000038]
server list updated: []

setData的byte[]大小是有限制的,官方文档说是1M,如果超出限制,zookeeper并不会提醒你。

Client监视节点数据变化

这里用2个线程,一个监视节点列表变化,一个监视节点数据变化,当节点数,或者节点数据变化时会自动通知到client。不过这里监视的两部分节点并不在同一个父节点下,主要用来实现LVS中real server的自动注册,以及LVS自动修改和推送配置。

客户端

ListMonitor用作监控节点数量变化,ConfMonitor用作监控节点数据变化。在connectForList()和connectForConf()中建立连接、设置watcher,并且如果父节点不存在则创建。在ConfMonitor中也监控了节点数量的变化,可以动态监听添加的子节点和配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package zkclient;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class Client {
private String pnode="sgroup";
private String conf="conf";
private String host="192.168.62.147:2181,192.168.62.148:2181,192.168.62.149:2181";
private ZooKeeper zk;
private List<String> serverList =new ArrayList<String>();
public void connectForList() throws Exception {
zk=new ZooKeeper(host,5000,new Watcher(){
public void process(WatchedEvent event){ //监视子节点变化
if(event.getType()==EventType.NodeChildrenChanged && ("/"+pnode).equals(event.getPath())){
try {
updateServerList();
}
catch(Exception e) {
e.printStackTrace();
}
}
}
});
if(zk.exists("/"+pnode, false)==null) //如果节点不存在则创建
zk.create("/"+pnode, "server list".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
updateServerList();
}
public void updateServerList() throws Exception {
List<String> subNodeList =zk.getChildren("/"+pnode, true);
serverList=subNodeList;
System.out.println("ListMonitor ===>"+serverList);
}
public void handle() throws InterruptedException{
Thread.sleep(Long.MAX_VALUE); //不做处理
}
public void connectForConf() throws Exception {
zk=new ZooKeeper(host,5000,new Watcher(){
public void process(WatchedEvent event){ //监视节点数据变化
if(event.getType()==EventType.NodeDataChanged && conf.equals(event.getPath().split("/")[1])) {
try {
updateConf(event.getPath());
}
catch(Exception e) {
e.printStackTrace();
}
}
}
});
if(zk.exists("/"+conf, false)==null) //如果节点不存在则创建
zk.create("/"+conf, "server list".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
updateConf("");
}
public void updateConf(String node) throws Exception {
List<String> subNodeList =zk.getChildren("/"+conf, true);
for(String subnode : subNodeList){
byte[] data=zk.getData("/"+conf+"/"+subnode, true, null);
// List<String> dataList=new ArrayList<String>();
// dataList.add(new String(data));
}
if(!node.isEmpty())
System.out.println("DataMonitor ===>["+node+"] -----> "+new String(zk.getData(node, false, null)));
}
public static void main(String[] args) throws Exception {
ListMonitor a = new ListMonitor();
ConfMonitor b = new ConfMonitor();
new Thread(a,"listmonitor").start();
new Thread(b,"confmonitor").start();
}
}
class ListMonitor implements Runnable {
public void run () {
Client ac=new Client();
try {
ac.connectForList();
ac.handle();
}
catch(Exception e) {
e.printStackTrace();
}
}
}
class ConfMonitor implements Runnable {
public void run () {
Client ac=new Client();
try {
ac.connectForConf();
ac.handle();
}
catch(Exception e) {
e.printStackTrace();
}
}
}

测试

这里用zookeeper自带的zkCli.sh来进行操作,上述程序来监听。

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ListMonitor ===>[a, b, c, d, e]
ListMonitor ===>[b, c, d, e]
ListMonitor ===>[c, d, e]
ListMonitor ===>[d, e]
ListMonitor ===>[e]
ListMonitor ===>[]
DataMonitor ===>[/conf/nginx] -----> vv
DataMonitor ===>[/conf/nginx] -----> v2
DataMonitor ===>[/conf/apache] -----> new
DataMonitor ===>[/conf/qzhttp] -----> latest
ListMonitor ===>[echo]
ListMonitor ===>[echo, bash]
ListMonitor ===>[echo, bash, tyr]
DataMonitor ===>[/conf/apache] -----> tyr.so

操作分为四部分

  • 第一部分依次删除/sgroup的子节点
  • 第二部分更改/conf子节点中的值
  • 第三部分在/sgroup节点下创建子节点
  • 第四部分更改/conf/apache的值

未完…

如果您觉得这篇文章对您有帮助,不妨支持我一下!