Cluster Management

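ZooKeeper makes cluster management easy to implement. Suppose a service cluster is made up of several servers: some "leader" has to know the service status of every machine in the cluster, so that when a machine stops providing service, the other machines in the cluster find out and can adjust by redistributing the service. Likewise, when one or more servers are added to increase the cluster's capacity, the "leader" must be told about them too.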

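In a distributed application we often start several servers at once, and the caller (client) picks one of them to send a request to. A distributed application has to account for high availability and scalability: a server's application process may crash, or the server machine itself may go down, and when there are not enough servers, more may be added. In short, the server list is never fixed; servers keep joining and leaving.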

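The implementation: each Server creates an EPHEMERAL node under a common parent node in ZooKeeper, and each Server calls getChildren(String path, boolean watch) on that parent with watch set to true. Because the nodes are EPHEMERAL, a node is deleted automatically when the Server that created it dies (more precisely, when its session ends). The parent's children therefore change, the watch set by getChildren fires, and the other Servers learn that a Server has died. A newly added Server is detected the same way.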
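To make the mechanism concrete without a running ensemble, here is a minimal in-memory toy model. It is NOT the ZooKeeper API: `ToyGroup`, `createEphemeral`, `getChildren(Runnable)` and `closeSession` are hypothetical names chosen only to mirror the behaviour described above. Each child is owned by a session; closing the session deletes that session's children, and a watch armed through `getChildren` fires once and is then gone.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy, in-memory model of one parent znode such as /sgroup. NOT the ZooKeeper API:
// ToyGroup and its methods are hypothetical and only illustrate the idea.
class ToyGroup {
    // child name -> id of the session that created it (ephemeral ownership)
    private final Map<String, Long> children = new LinkedHashMap<>();
    // armed child watches; each one fires at most once
    private final List<Runnable> childWatches = new ArrayList<>();

    // Like create(path, data, acl, EPHEMERAL): the child is owned by a session.
    void createEphemeral(String name, long sessionId) {
        children.put(name, sessionId);
        fireChildWatches();
    }

    // Like getChildren(path, true): return the current children and arm a one-time watch.
    List<String> getChildren(Runnable watch) {
        if (watch != null) {
            childWatches.add(watch);
        }
        return new ArrayList<>(children.keySet());
    }

    // When a session ends, every ephemeral child it owns is deleted.
    void closeSession(long sessionId) {
        boolean removed = children.values().removeIf(owner -> owner == sessionId);
        if (removed) {
            fireChildWatches();
        }
    }

    // Like a NodeChildrenChanged event: it carries no child list, and the watch is consumed.
    private void fireChildWatches() {
        List<Runnable> toFire = new ArrayList<>(childWatches);
        childWatches.clear();
        for (Runnable w : toFire) {
            w.run();
        }
    }
}
```

In a real client the watcher itself calls getChildren(path, true) again, which both reads the new list and re-arms the watch; updateServerList() in the client code later in this article does exactly that.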

Server side: registering a node

For the basic API operations, see the previous article, "ZooKeeper cluster setup and configuration". Here is the Java implementation.

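package zookeeper;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class AppServer {
	private String pnode="sgroup";
	private String snode="sub";
	private String host="192.168.62.147:2181";
	private ZooKeeper zk;	// keep a reference: the ephemeral node lives as long as this session

	public void connect(String content) throws Exception {
		// The constructor returns before the session is established, so wait for SyncConnected
		CountDownLatch connected=new CountDownLatch(1);
		zk=new ZooKeeper(host,5000,new Watcher(){
			public void process(WatchedEvent event){
				if(event.getState()==KeeperState.SyncConnected){
					connected.countDown();
				}
			}
		});
		connected.await();

		// The persistent parent /sgroup must exist before children can be created under it
		try {
			zk.create("/"+pnode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		} catch (KeeperException.NodeExistsException e) {
			// already created earlier or by another server: nothing to do
		}

		// EPHEMERAL_SEQUENTIAL: deleted when the session ends; ZooKeeper appends a 10-digit counter
		String createdPath=zk.create("/"+pnode+"/"+snode, content.getBytes(StandardCharsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
		System.out.println("create :"+createdPath);
	}

	public void handle() throws InterruptedException{
		Thread.sleep(Long.MAX_VALUE);	// stand-in for the real service loop
	}

	public static void main(String[] args) throws Exception {
		AppServer as=new AppServer();
		as.connect("register");
		as.handle();
	}
}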

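After startup, an ephemeral sequential node such as sub0000000000 is created under /sgroup (the 10-digit suffix comes from a counter kept by the parent, so the exact number depends on how many children were created before). Once the program stops, the node disappears; since the code never calls zk.close(), this happens when the server expires the session, i.e. after the session timeout (5000 ms requested here).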
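Because ZooKeeper appends the counter as a zero-padded 10-digit number (format %010d), the sequence can be recovered from the node name, and the getChildren Javadoc gives no ordering guarantee for the returned list. A minimal sketch of parsing and sorting by that suffix, assuming every child was created with a SEQUENTIAL mode; `SeqNames` is a hypothetical helper, not part of the ZooKeeper client.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper for names produced by CreateMode.*_SEQUENTIAL, e.g. "sub0000000005".
final class SeqNames {
    private static final int SUFFIX_LEN = 10; // ZooKeeper pads the counter to 10 digits (%010d)

    // "sub0000000005" -> 5
    static long sequenceOf(String nodeName) {
        if (nodeName.length() < SUFFIX_LEN) {
            throw new IllegalArgumentException("not a sequential node name: " + nodeName);
        }
        return Long.parseLong(nodeName.substring(nodeName.length() - SUFFIX_LEN));
    }

    // getChildren() makes no ordering promise, so sort by the numeric suffix explicitly.
    static List<String> sortBySequence(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(SeqNames::sequenceOf));
        return sorted;
    }
}
```

Sorting by the parsed number rather than by the raw string keeps working even if the prefixes of the children differ.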

Simulating multiple servers with threads

Multiple threads simulate several servers registering; this creates 3 ephemeral sequential nodes.

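package zkServer;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class Server {
	private String pnode="sgroup";
	private String snode="sub";
	private String host="192.168.62.147:2181,192.168.62.148:2181,192.168.62.149:2181";
	private ZooKeeper zk;	// one session per simulated server

	public void connect(String content) throws Exception {
		// Wait until the session is established before issuing requests
		CountDownLatch connected=new CountDownLatch(1);
		zk=new ZooKeeper(host,5000,new Watcher(){
			public void process(WatchedEvent event){
				if(event.getState()==KeeperState.SyncConnected){
					connected.countDown();
				}
			}
		});
		connected.await();

		// The threads may race to create the parent; losing the race is harmless
		try {
			zk.create("/"+pnode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		} catch (KeeperException.NodeExistsException e) {
			// /sgroup already exists
		}

		String createdPath=zk.create("/"+pnode+"/"+snode, content.getBytes(StandardCharsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
		System.out.println("create :"+createdPath);
	}

	public void handle() throws InterruptedException{
		Thread.sleep(Long.MAX_VALUE);
	}

	public static void main(String[] args){
		// Each Multi creates its own Server instance, and therefore its own session
		Multi server1=new Multi(1);
		Multi server2=new Multi(2);
		Multi server3=new Multi(3);

		new Thread(server1,"server1").start();
		new Thread(server2,"server2").start();
		new Thread(server3,"server3").start();
	}
}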

 class Multi implements Runnable {
	private int seq;
	public Multi(int seq){
		this.seq=seq;
	}
	public void run(){
		try {
			Server as=new Server();
			as.connect("register");
			System.out.println("server"+this.seq+" registered");
			as.handle();
		} catch (Exception e){
			e.printStackTrace();
		}
	}
}

Output

create :/sgroup/sub0000000005
server3 registered
create :/sgroup/sub0000000006
server1 registered
create :/sgroup/sub0000000007
server2 registered
[zk: localhost:2181(CONNECTED) 26] ls /sgroup
[sub0000000005, sub0000000006, sub0000000007]

Client side: watching for node changes

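zkClient's logic is slightly more involved than zkServer's: it has to listen for child-change events on "/sgroup" and update the server list whenever one occurs. The watch on the children of "/sgroup" is registered through getChildren. When ZooKeeper calls back the watcher's process method, check whether the event is a child change on "/sgroup"; if it is, run the update logic and register the watch again. Note that a watch is a one-time trigger: once it has fired it is gone, so it must be registered again (here, calling getChildren("/sgroup", true) inside the update does both the re-read and the re-registration). Persistent watchers were not supported, the reasoning goes, because ZK could not guarantee their performance. Since ZooKeeper 3.6.0 there are also persistent watches, registered with ZooKeeper.addWatch and AddWatchMode.PERSISTENT, but the getChildren watches used here are still one-time.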

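package zkclient;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

public class Client {
	 private String pnode="sgroup";
	 private String host="192.168.62.147:2181,192.168.62.148:2181,192.168.62.149:2181";
	 private ZooKeeper zk;
	 // written from the ZooKeeper event thread: volatile so other threads see the latest list
	 private volatile List<String> serverList =new ArrayList<String>();

	public void connect() throws Exception {
		CountDownLatch connected=new CountDownLatch(1);
		zk=new ZooKeeper(host,5000,new Watcher(){
			public void process(WatchedEvent event){
				if(event.getType()==EventType.None){
					// connection-state event: release connect() once the session is up
					if(event.getState()==KeeperState.SyncConnected){
						connected.countDown();
					}
				}
				else if(event.getType()==EventType.NodeChildrenChanged && ("/"+pnode).equals(event.getPath())){
					try {
						updateServerList();	// re-read the list and re-register the one-time watch
					}
					catch(Exception e) {
						e.printStackTrace();
					}
				}
			}
		});
		connected.await();
		updateServerList();
	}

	// getChildren(path, true) returns the current children and leaves a watch on /sgroup
	// that is delivered to the default watcher above
	public void updateServerList() throws Exception {
		serverList=zk.getChildren("/"+pnode, true);
		System.out.println("server list updated: "+serverList);
	}

	public void handle() throws InterruptedException{
		Thread.sleep(Long.MAX_VALUE);
	}

	public static void main(String[] args) throws Exception{
		Client ac=new Client();
		ac.connect();
		ac.handle();
	}
}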

Servers registering and leaving trigger watch events, so the client always holds the current, dynamic server list:

server list updated: []
server list updated: [sub0000000039, sub0000000038]
server list updated: [sub0000000039, sub0000000040, sub0000000038]
server list updated: [sub0000000040, sub0000000038]
server list updated: []
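The NodeChildrenChanged notification only says that the children changed, not which ones, so updateServerList() re-reads the whole list. If the application needs to know which servers joined or left, it can compare the new list with the previous one. A minimal sketch; `ServerListDiff` is a hypothetical helper. Because a watch fires once and is re-armed only on the next read, several changes may be folded into one notification, so the diff is relative to the last list the client actually saw.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: which children appeared or disappeared between two reads of /sgroup.
final class ServerListDiff {
    final List<String> joined = new ArrayList<>();
    final List<String> left = new ArrayList<>();

    static ServerListDiff between(List<String> previous, List<String> current) {
        Set<String> prev = new HashSet<>(previous);
        Set<String> curr = new HashSet<>(current);
        ServerListDiff d = new ServerListDiff();
        for (String s : current) {
            if (!prev.contains(s)) d.joined.add(s); // new ephemeral node: a server registered
        }
        for (String s : previous) {
            if (!curr.contains(s)) d.left.add(s);   // node gone: its session ended
        }
        return d;
    }
}
```

Applied to the second and third lines of the output above, it reports sub0000000040 as joined; applied to the third and fourth, it reports sub0000000039 as left.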

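The byte[] passed to setData is size-limited: the official documentation puts the limit at just under 1 MB (the jute.maxbuffer setting, 0xfffff bytes by default), and if you exceed it, ZooKeeper will not give you a clear warning.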
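Since an oversized payload is not reported clearly, a client can check the size itself before calling setData. A minimal sketch, assuming the ensemble runs with the default jute.maxbuffer of 0xfffff bytes; the real limit applies to the whole serialized request, so the check is only an upper bound. `ZnodeDataGuard` is a hypothetical helper; the returned array would then be passed to zk.setData(path, data, -1), where -1 matches any version.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical client-side guard against oversized znode data.
final class ZnodeDataGuard {
    // Default of the jute.maxbuffer property: 0xfffff bytes, just under 1 MB.
    // Request framing adds a few bytes on top of the data, so stay comfortably below this.
    static final int DEFAULT_JUTE_MAXBUFFER = 0xfffff;

    // Encode the value and fail loudly instead of letting the request fail obscurely.
    static byte[] encodeChecked(String value, int limit) {
        byte[] data = value.getBytes(StandardCharsets.UTF_8);
        if (data.length > limit) {
            throw new IllegalArgumentException(
                    "znode data is " + data.length + " bytes, limit is " + limit);
        }
        return data;
    }
}
```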

Client side: watching node data changes

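Here two threads are used: one watches the list of nodes and the other watches node data, so the client is notified automatically whenever the number of nodes or their data changes. Note that the two groups of watched nodes do not share a parent (/sgroup and /conf); the setup is mainly meant for automatic registration of real servers in LVS and for automatically modifying and pushing the LVS configuration.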

Client

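ListMonitor watches changes in the number of nodes, and ConfMonitor watches changes in node data. connectForList() and connectForConf() open the connection, set the watcher, and create the parent node if it does not exist. ConfMonitor also watches the number of child nodes, so it picks up newly added child nodes and their configuration dynamically.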
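For ConfMonitor to follow both the children and their data, its watcher has to tell two kinds of notification apart: a child-list change on /conf (a configuration node was added or removed, so the children must be re-read and data watches armed on the new ones) and a data change on a direct child such as /conf/nginx. The sketch below isolates that decision as a pure function. `Kind` is a hypothetical stand-in for Watcher.Event.EventType so the snippet compiles without the ZooKeeper jar, and `ConfEventRouter` is likewise an illustrative name.

```java
// Hypothetical routing logic for the ConfMonitor watcher.
final class ConfEventRouter {
    enum Action { RESCAN_CHILDREN, RELOAD_ONE, IGNORE }

    // Stand-in for Watcher.Event.EventType (assumption: only these cases matter here).
    enum Kind { NODE_CHILDREN_CHANGED, NODE_DATA_CHANGED, OTHER }

    static Action route(Kind kind, String path, String confRoot) {
        if (path == null) {
            return Action.IGNORE; // connection-state events carry no path
        }
        if (kind == Kind.NODE_CHILDREN_CHANGED && path.equals(confRoot)) {
            return Action.RESCAN_CHILDREN; // re-read children, arm data watches on new ones
        }
        if (kind == Kind.NODE_DATA_CHANGED && isDirectChild(path, confRoot)) {
            return Action.RELOAD_ONE; // re-read (and re-watch) just this node
        }
        return Action.IGNORE;
    }

    // "/conf/nginx" is a direct child of "/conf"; "/config/x" and "/conf/a/b" are not.
    static boolean isDirectChild(String path, String parent) {
        String prefix = parent + "/";
        return path.startsWith(prefix)
                && path.length() > prefix.length()
                && path.indexOf('/', prefix.length()) < 0;
    }
}
```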

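package zkclient;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class Client {
	 private String pnode="sgroup";
	 private String conf="conf";
	 private String host="192.168.62.147:2181,192.168.62.148:2181,192.168.62.149:2181";
	 private ZooKeeper zk;
	 private volatile List<String> serverList =new ArrayList<String>();

	// Open a session whose default watcher forwards node events to handler,
	// and block until the session is connected
	private ZooKeeper open(Watcher handler) throws Exception {
		CountDownLatch connected=new CountDownLatch(1);
		ZooKeeper z=new ZooKeeper(host,5000,new Watcher(){
			public void process(WatchedEvent event){
				if(event.getType()==EventType.None){
					if(event.getState()==KeeperState.SyncConnected) connected.countDown();
					return;
				}
				handler.process(event);
			}
		});
		connected.await();
		return z;
	}

	// Create a persistent node if it does not exist yet (tolerates a concurrent creator)
	private void ensureNode(String path, String data) throws Exception {
		try {
			zk.create(path, data.getBytes(StandardCharsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		} catch (KeeperException.NodeExistsException e) {
			// already there
		}
	}

	public void connectForList() throws Exception {
		zk=open(new Watcher(){
			public void process(WatchedEvent event){			// children of /sgroup changed
				if(event.getType()==EventType.NodeChildrenChanged && ("/"+pnode).equals(event.getPath())){
					try {
						updateServerList();
					}
					catch(Exception e) {
						e.printStackTrace();
					}
				}
			}
		});
		ensureNode("/"+pnode, "server list");	// create the parent if it does not exist
		updateServerList();
	}

	public void updateServerList() throws Exception {
		serverList=zk.getChildren("/"+pnode, true);	// also re-arms the one-time child watch
		System.out.println("ListMonitor ===>"+serverList);
	}

	public void handle() throws InterruptedException{
		Thread.sleep(Long.MAX_VALUE);			// no further processing
	}

	public void connectForConf() throws Exception {
		zk=open(new Watcher(){
			public void process(WatchedEvent event){
				try {
					String path=event.getPath();
					if(event.getType()==EventType.NodeChildrenChanged && ("/"+conf).equals(path)){
						updateConf("");		// a child was added or removed under /conf
					}
					else if(event.getType()==EventType.NodeDataChanged && path.startsWith("/"+conf+"/")){
						updateConf(path);	// the data of one /conf child changed
					}
				}
				catch(Exception e) {
					e.printStackTrace();
				}
			}
		});
		ensureNode("/"+conf, "conf");			// create the parent if it does not exist
		updateConf("");
	}

	// Re-arm the child watch on /conf and a data watch on every child; print the node that changed
	public void updateConf(String node) throws Exception {
		List<String> subNodeList =zk.getChildren("/"+conf, true);
		for(String subnode : subNodeList){
			String path="/"+conf+"/"+subnode;
			byte[] data;
			try {
				data=zk.getData(path, true, null);
			}
			catch(KeeperException.NoNodeException e) {
				continue;			// deleted between getChildren and getData
			}
			if(path.equals(node))
				System.out.println("DataMonitor ===>["+node+"] -----> "+new String(data, StandardCharsets.UTF_8));
		}
	}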

	public static void main(String[] args) throws Exception {
		ListMonitor a = new ListMonitor();
		ConfMonitor b = new ConfMonitor();
		new Thread(a,"listmonitor").start();
		new Thread(b,"confmonitor").start();

	}
}

	class ListMonitor implements Runnable {
		public void run () {
			Client ac=new Client();
			try {
				ac.connectForList();
				ac.handle();
			}
			catch(Exception e) {
				e.printStackTrace();
			}

		}
	}

	class ConfMonitor implements Runnable {
		public void run () {
			Client ac=new Client();
			try {
				ac.connectForConf();
				ac.handle();
			}
			catch(Exception e) {
				e.printStackTrace();
			}
		}
	}

Test

Here ZooKeeper's bundled zkCli.sh is used to perform the operations, while the program above listens.

Output

ListMonitor ===>[a, b, c, d, e]
ListMonitor ===>[b, c, d, e]
ListMonitor ===>[c, d, e]
ListMonitor ===>[d, e]
ListMonitor ===>[e]
ListMonitor ===>[]

DataMonitor ===>[/conf/nginx] -----> vv
DataMonitor ===>[/conf/nginx] -----> v2
DataMonitor ===>[/conf/apache] -----> new
DataMonitor ===>[/conf/qzhttp] -----> latest

ListMonitor ===>[echo]
ListMonitor ===>[echo, bash]
ListMonitor ===>[echo, bash, tyr]

DataMonitor ===>[/conf/apache] -----> tyr.so

The operations fall into four parts:

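  • Part 1 deletes the children of /sgroup one by one
  • Part 2 changes the values of the children of /conf
  • Part 3 creates child nodes under /sgroup
  • Part 4 changes the value of /conf/apache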

To be continued…