zkClient
write by donaldhan, 2018-06-16 15:17引言
前一篇文章,我们分析了一下Zookeeper原生API,先来回顾一下: Zookeeper主要有两个成员分别为客户端和watcher管理器。watcher观察器,主要关注点的事件类型有节点创建NodeCreated,节点删除NodeDeleted,节点数据改变NodeDataChanged, 节点子节点更新事件类型NodeChildrenChanged;客户端状态有:同步连接SyncConnected,断开连接Disconnected,只读连接ConnectedReadOnly,验证失败AuthFailed,已验证SaslAuthenticated,会话过期Expired等状态。 Watcher观察者管理器ZKWatchManager,主要根据事件类型,注册节点观察器,默认为节点数据观察器集,节点存在观察器集,节点孩子节点观察器集,默认观察期器集;如果是NodeCreated和NodeDeleted,则注册节点数据观察器集,节点存在观察器集; 如果是NodeDataChanged,则注册节点孩子节点观察器集;如果是NodeDeleted,则注册节点数据观察器集,节点存在观察器集,节点孩子节点观察器集。
客户端ClientCnxn中最重要的是发送线程SendThread和事件线程EventThread,同时关联一个ZooKeeper,以及客户端watcher管理器ClientWatchManager,实际为ZKWatchManager,
还有一个我们需要关注的点是等待发送数据包队列pendingQueue(LinkedList
数据包Packet主要有请求头部requestHeader(RequestHeader),响应头部replyHeader(ReplyHeader),请求request(Record),响应response(Record),字节缓冲区ByteBuffer,客户端路径clientPath,服务端路径serverPath,异步回调接口AsyncCallback,数据包上下文,观察者注册器watchRegistration。
发送线程SendThread主要的作用是发送客户端请求数据包,实际委托给内部的clientCnxnSocket。
客户端socket的主要功能为发送数据包sendPacket和调度数据包队列doTransport。
客户端Socket的实现ClientCnxnSocketNIO,内部主要使用nio的选择器和选择key。
发送数据包,实际委托给内Socket通道。
调度数据包队列,实际委托给内Socket通道,如果是响应消息,则转化为响应Record,如果是发送数据包,则委托给内部的socket通道。
事件线程主要处理创建、设值,获取节点数据和获取节点子节点数据,检查节点是否存在,删除节点等事件,并处理。
启动客户端Socket,实际上启动发送数据包线程(处理数据的请求和响应)和事件线程(处理crwda相关事件)。
创建节点,创建创建请求和响应,委托给socket客户端,发送创建节点操作。
Zk的crwda的相关操作,首先创建相应类型的请求和响应,然后委托给socket客户端,处理响应的操作,并解析响应消息。
今天我们主要来看一下ZkClient。 ZkClient是由Datameer的工程师开发的开源客户端,对Zookeeper的原生API进行了包装。 相对于原生api优势:
- 实现了超时重连、Watcher反复注册等功能。
- 添加序列化支持。
- 同时可以递归创建和删除路径。
这篇文章所有使用的示例代码可以参考zookeeper-demo。
目录
ZkClient客户端
一般我们创建ZkClient客户端如下:
/**
* @ClassName: CreateNodeSample
* @Description: 使用ZkClient创建节点,注意:当递归创建持久化路径的方法传入的createParents为true时,节点已经存在,不会抛出异常。
* @Author: Donaldhan
* @Date: 2018-05-13 19:28
*/
@Slf4j
public class CreateNodeSample {
private static ZkClient zkClient;
public static void main(String[] args) {
try {
zkClient = new ZkClient(ConfigConstant.IP, ConfigConstant.SESSION_TIMEOUT);
log.info("success connected ...");
String path = "/zk-book/c1";
//如果父节点不存在,可以创建父节点
zkClient.createPersistent(path, true);
log.info("success create:{} ...",path);
} catch (RuntimeException e) {
e.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
}
}
从上面示例,可以看出,主要客户端为ZkClient。
ZkClient的成员变量
我们来看一下ZkClient的成员变量
public class ZkClient implements Watcher {
protected IZkConnection _connection;//客户端连接
//子节点监听器集
private final Map<String, Set<IZkChildListener>> _childListener = new ConcurrentHashMap<String, Set<IZkChildListener>>();
//节点数据监听器集
private final ConcurrentHashMap<String, Set<IZkDataListener>> _dataListener = new ConcurrentHashMap<String, Set<IZkDataListener>>();
//节点状态监听器集
private final Set<IZkStateListener> _stateListener = new CopyOnWriteArraySet<IZkStateListener>();
private KeeperState _currentState;//当前状态
private final ZkLock _zkEventLock = new ZkLock();//事件锁
private boolean _shutdownTriggered;
private ZkEventThread _eventThread;//事件线程
// TODO PVo remove this later
private Thread _zookeeperEventThread;
private ZkSerializer _zkSerializer;//序列化器
}
从上面可以看出Zk客户端ZkClient主要的成员变量为:客户端连接IZkConnection,子节点监听器集IZkChildListener,节点数据监听器集IZkDataListener,当前状态KeeperState,事件锁ZkLock, 客户端状态监听器集IZkStateListener,事件线程ZkEventThread,序列化器ZkSerializer,最要的一点实现了 Watcher 接口。。
再往下看之前我们,先把子节点监听器IZkChildListener,节点数据监听器IZkDataListener, 客户端状态监听器IZkStateListener,序列化器ZkSerializer的定义看一下。
节点数据监听器IZkDataListener
public interface IZkDataListener {
public void handleDataChange(String dataPath, Object data) throws Exception;
public void handleDataDeleted(String dataPath) throws Exception;
}
节点数据监听器IZkDataListener,主要监控节点数据的变化,包括创建,变更,和删除事件。
子节点监听器IZkChildListener
public interface IZkChildListener {
/**
* Called when the children of the given path changed.
*
* @param parentPath
* The parent path
* @param currentChilds
* The children or null if the root node (parent path) was deleted.
* @throws Exception
*/
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception;
}
子节点监听器IZkChildListener,监控路径子节点的变化,包括创建,变更,和删除事件。
客户端状态监听器IZkStateListener
public interface IZkStateListener {
/**
* Called when the zookeeper connection state has changed.
*
* @param state
* The new state.
* @throws Exception
* On any error.
*/
public void handleStateChanged(KeeperState state) throws Exception;
/**
* Called after the zookeeper session has expired and a new session has been created. You would have to re-create
* any ephemeral nodes here.
*
* @throws Exception
* On any error.
*/
public void handleNewSession() throws Exception;
}
客户端状态监听器IZkStateListener,处理连接状态的变更,并在会话过期时,重新创建连接。
事件锁ZkLock
public class ZkLock extends ReentrantLock {
private static final long serialVersionUID = 1L;
private Condition _dataChangedCondition = newCondition();
private Condition _stateChangedCondition = newCondition();
private Condition _zNodeEventCondition = newCondition();
/**
* This condition will be signaled if a zookeeper event was processed and the event contains a data/child change.
*
* @return the condition.
*/
public Condition getDataChangedCondition() {
return _dataChangedCondition;
}
/**
* This condition will be signaled if a zookeeper event was processed and the event contains a state change
* (connected, disconnected, session expired, etc ...).
*
* @return the condition.
*/
public Condition getStateChangedCondition() {
return _stateChangedCondition;
}
/**
* This condition will be signaled if any znode related zookeeper event was received.
*
* @return the condition.
*/
public Condition getZNodeEventCondition() {
return _zNodeEventCondition;
}
}
从上面可以看出,事件锁,为可重入锁,有三个条件,分别为节点数据变更,会话状态变更,节点事件条件。
序列化器ZkSerializer
/**
* Zookeeper is able to store data in form of byte arrays. This interfacte is a bridge between those byte-array format
* and higher level objects.
*
* @see BytesPushThroughSerializer
* @see SerializableSerializer
*/
public interface ZkSerializer {
public byte[] serialize(Object data) throws ZkMarshallingError;
public Object deserialize(byte[] bytes) throws ZkMarshallingError;
}
序列化器ZkSerializer,用于序列化,发送给Zkserver的数据,反序列化,从zk服务器接受的数据。
客户端连接IZkConnection,事件线程ZkEventThread这个我们在后面用到的时候,再讲。
再来看Zkclient的构造
public ZkClient(String serverstring) {
this(serverstring, Integer.MAX_VALUE);
}
public ZkClient(String zkServers, int connectionTimeout) {
this(new ZkConnection(zkServers), connectionTimeout);
}
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) {
this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout);
}
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer) {
this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer);
}
public ZkClient(IZkConnection connection) {
this(connection, Integer.MAX_VALUE);
}
public ZkClient(IZkConnection connection, int connectionTimeout) {
this(connection, connectionTimeout, new SerializableSerializer());
}
public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer) {
_connection = zkConnection;
_zkSerializer = zkSerializer;
connect(connectionTimeout, this);
}
Zkclient的构造,主要是初始化Zk会话连接,会话超时时间和会话连接超时时间。默认的序列化器为SerializableSerializer,同时我们可以自己实现字节的序列化器。
来看一下默认的序列化器
public class SerializableSerializer implements ZkSerializer {
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
Object object = inputStream.readObject();
return object;
} catch (ClassNotFoundException e) {
throw new ZkMarshallingError("Unable to find object class.", e);
} catch (IOException e) {
throw new ZkMarshallingError(e);
}
}
@Override
public byte[] serialize(Object serializable) throws ZkMarshallingError {
try {
ByteArrayOutputStream byteArrayOS = new ByteArrayOutputStream();
ObjectOutputStream stream = new ObjectOutputStream(byteArrayOS);
stream.writeObject(serializable);
stream.close();
return byteArrayOS.toByteArray();
} catch (IOException e) {
throw new ZkMarshallingError(e);
}
}
}
这个一看就明白,我们再来看ZkClient会话连接ZkConnection。
会话连接ZkConnection
再看会话连接以前,先来看接口IZkConnection的定义:
public interface IZkConnection {
public void connect(Watcher watcher);
void close() throws InterruptedException;
public String create(String path, byte[] data, CreateMode mode) throws KeeperException, InterruptedException;
public void delete(String path) throws InterruptedException, KeeperException;
boolean exists(final String path, final boolean watch) throws KeeperException, InterruptedException;
List<String> getChildren(final String path, final boolean watch) throws KeeperException, InterruptedException;
public byte[] readData(String path, Stat stat, boolean watch) throws KeeperException, InterruptedException;
public void writeData(String path, byte[] data, int expectedVersion) throws KeeperException, InterruptedException;
public States getZookeeperState();
public long getCreateTime(String path) throws KeeperException, InterruptedException;
public String getServers();
}
从上面可以看出,会话接口IZkConnection,主要提供了ZK的CRWDA操作,这个与Zk原生API的客户端socket作用相同。
我们回到会话连接ZkConnection的定义:
public class ZkConnection implements IZkConnection {
private static final Logger LOG = Logger.getLogger(ZkConnection.class);
/** It is recommended to use quite large sessions timeouts for ZooKeeper. */
private static final int DEFAULT_SESSION_TIMEOUT = 30000;
private ZooKeeper _zk = null;
private Lock _zookeeperLock = new ReentrantLock();
private final String _servers;
private final int _sessionTimeOut;
public ZkConnection(String zkServers) {
this(zkServers, DEFAULT_SESSION_TIMEOUT);
}
public ZkConnection(String zkServers, int sessionTimeOut) {
_servers = zkServers;
_sessionTimeOut = sessionTimeOut;
}
}
从上面可以看出,ZkClient会话客户端ZkConnection,主要成员变量,一个为远程Zk客户端ZooKeeper,一个用户控制会话连接与关闭的可重入锁ReentrantLock。
再来看会话客户端ZkConnection的连接关闭,和CDRWA操作:
CDRWA操作
ZkConnection的连接关闭
@Override
public void connect(Watcher watcher) {
_zookeeperLock.lock();
try {
if (_zk != null) {
throw new IllegalStateException("zk client has already been started");
}
try {
LOG.debug("Creating new ZookKeeper instance to connect to " + _servers + ".");
_zk = new ZooKeeper(_servers, _sessionTimeOut, watcher);
} catch (IOException e) {
throw new ZkException("Unable to connect to " + _servers, e);
}
} finally {
_zookeeperLock.unlock();
}
}
public void close() throws InterruptedException {
_zookeeperLock.lock();
try {
if (_zk != null) {
LOG.debug("Closing ZooKeeper connected to " + _servers);
_zk.close();
_zk = null;
}
} finally {
_zookeeperLock.unlock();
}
}
从上面可以看出,连接操作,主要是创建原生Zookeeper客户端,关闭操作实际,是关闭原生Zookeeper客户端。
CDRWA操作
public String create(String path, byte[] data, CreateMode mode) throws KeeperException, InterruptedException {
return _zk.create(path, data, Ids.OPEN_ACL_UNSAFE, mode);
}
public void delete(String path) throws InterruptedException, KeeperException {
_zk.delete(path, -1);
}
public boolean exists(String path, boolean watch) throws KeeperException, InterruptedException {
return _zk.exists(path, watch) != null;
}
public List<String> getChildren(final String path, final boolean watch) throws KeeperException, InterruptedException {
return _zk.getChildren(path, watch);
}
public byte[] readData(String path, Stat stat, boolean watch) throws KeeperException, InterruptedException {
return _zk.getData(path, watch, stat);
}
public void writeData(String path, byte[] data) throws KeeperException, InterruptedException {
writeData(path, data, -1);
}
public void writeData(String path, byte[] data, int version) throws KeeperException, InterruptedException {
_zk.setData(path, data, version);
}
public States getZookeeperState() {
return _zk != null ? _zk.getState() : null;
}
public ZooKeeper getZookeeper() {
return _zk;
}
@Override
public long getCreateTime(String path) throws KeeperException, InterruptedException {
Stat stat = _zk.exists(path, false);
if (stat != null) {
return stat.getCtime();
}
return -1;
}
@Override
public String getServers() {
return _servers;
}
从上面可以看出,CDRWA操作实际委托给内部的原生Zookeeper客户端,ZkClient会话客户端连接ZkConnection,面向的能染是字节流。
我们回到ZkClient的CDRWA操作
ZkClient的CDRWA操作
public void createPersistent(String path, boolean createParents) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
try {
create(path, null, CreateMode.PERSISTENT);
} catch (ZkNodeExistsException e) {
if (!createParents) {
throw e;
}
} catch (ZkNoNodeException e) {
if (!createParents) {
throw e;
}
String parentDir = path.substring(0, path.lastIndexOf('/'));
createPersistent(parentDir, createParents);
createPersistent(path, createParents);
}
}
从上面可以看出,创建zk目录时,我们可以根据布尔参数createParents,来决定是否需要创建父目录。
public boolean deleteRecursive(String path) {
List<String> children;
try {
children = getChildren(path, false);
} catch (ZkNoNodeException e) {
return true;
}
for (String subPath : children) {
if (!deleteRecursive(path + "/" + subPath)) {
return false;
}
}
return delete(path);
}
public boolean delete(final String path) {
try {
//注意这里是个回调,及在重新连接成功后,在执行删除操作
retryUntilConnected(new Callable<Object>() {
@Override
public Object call() throws Exception {
_connection.delete(path);
return null;
}
});
return true;
} catch (ZkNoNodeException e) {
return false;
}
}
public <T> T retryUntilConnected(Callable<T> callable) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
}
while (true) {
try {
return callable.call();
} catch (ConnectionLossException e) {
// we give the event thread some time to update the status to 'Disconnected'
Thread.yield();
waitUntilConnected();
} catch (SessionExpiredException e) {
// we give the event thread some time to update the status to 'Expired'
Thread.yield();
waitUntilConnected();
} catch (KeeperException e) {
throw ZkException.create(e);
} catch (InterruptedException e) {
throw new ZkInterruptedException(e);
} catch (Exception e) {
throw ExceptionUtil.convertToRuntimeException(e);
}
}
}
public void waitUntilConnected() throws ZkInterruptedException {
waitUntilConnected(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
public boolean waitUntilConnected(long time, TimeUnit timeUnit) throws ZkInterruptedException {
return waitForKeeperState(KeeperState.SyncConnected, time, timeUnit);
}
public boolean waitForKeeperState(KeeperState keeperState, long time, TimeUnit timeUnit) throws ZkInterruptedException {
if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
}
Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time));
LOG.debug("Waiting for keeper state " + keeperState);
acquireEventLock();
try {
boolean stillWaiting = true;
while (_currentState != keeperState) {
if (!stillWaiting) {
return false;
}
stillWaiting = getEventLock().getStateChangedCondition().awaitUntil(timeout);
}
LOG.debug("State is " + _currentState);
return true;
} catch (InterruptedException e) {
throw new ZkInterruptedException(e);
} finally {
getEventLock().unlock();
}
}
private void acquireEventLock() {
try {
getEventLock().lockInterruptibly();
} catch (InterruptedException e) {
throw new ZkInterruptedException(e);
}
}
从上面可以看出,当会话失去连接时,重新连接,,通过回调再执行删除目录操作,实际操作委托给内部的ZkClient会话连接。
protected boolean exists(final String path, final boolean watch) {
return retryUntilConnected(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return _connection.exists(path, watch);
}
});
}
public boolean exists(final String path) {
return exists(path, hasListeners(path));
}
// 判断是否存在目录监听器
private boolean hasListeners(String path) {
Set<IZkDataListener> dataListeners = _dataListener.get(path);
if (dataListeners != null && dataListeners.size() > 0) {
return true;
}
Set<IZkChildListener> childListeners = _childListener.get(path);
if (childListeners != null && childListeners.size() > 0) {
return true;
}
return false;
}
从上面可以看出,检查目录是否存在操作,当会话失去连接时,重新连接,通过回调再执行检查目录操作,实际操作委托给内部的ZkClient会话连接。
再看读写操作之前,我们来看一个示例:
package org.donald.zkclient.read;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.donald.constant.ConfigConstant;
/**
* @ClassName: ReadDataSample
* @Description: ZkClient获取节点数据
* @Author: Donaldhan
* @Date: 2018-05-13 20:03
*/
@Slf4j
public class ReadDataSample {
private static ZkClient zkClient;
public static void main(String[] args) {
try {
String path = "/zk-book";
zkClient = new ZkClient(ConfigConstant.IP, ConfigConstant.SESSION_TIMEOUT);
zkClient.createEphemeral(path, "123");
zkClient.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) {
log.info("Node:{} deleted", dataPath);
}
@Override
public void handleDataChange(String dataPath, Object data) {
log.info("Node:{} changed, new data: {}", dataPath, data);
}
});
log.info("{} value:{}",path , zkClient.readData(path));
zkClient.writeData(path,"456");
Thread.sleep(1000);
zkClient.delete(path);
Thread.sleep( Integer.MAX_VALUE );
} catch (RuntimeException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
}
}
}
上述示例,先创建一个临时路径,并添加目录监听器,然后读取数据。
我们先来看一下,注册节点监听器和反注册节点监听器
public void subscribeDataChanges(String path, IZkDataListener listener) {
Set<IZkDataListener> listeners;
synchronized (_dataListener) {
listeners = _dataListener.get(path);
if (listeners == null) {
listeners = new CopyOnWriteArraySet<IZkDataListener>();
_dataListener.put(path, listeners);
}
listeners.add(listener);
}
//关键点
watchForData(path);
LOG.debug("Subscribed data changes for " + path);
}
public void unsubscribeDataChanges(String path, IZkDataListener dataListener) {
synchronized (_dataListener) {
final Set<IZkDataListener> listeners = _dataListener.get(path);
if (listeners != null) {
listeners.remove(dataListener);
}
if (listeners == null || listeners.isEmpty()) {
_dataListener.remove(path);
}
}
}
我们来看关键点
//关键点
watchForData(path);
public void watchForData(final String path) {
retryUntilConnected(new Callable<Object>() {
@Override
public Object call() throws Exception {
_connection.exists(path, true);
return null;
}
});
}
再来看注册子目录监听器:
public List<String> subscribeChildChanges(String path, IZkChildListener listener) {
synchronized (_childListener) {
Set<IZkChildListener> listeners = _childListener.get(path);
if (listeners == null) {
listeners = new CopyOnWriteArraySet<IZkChildListener>();
_childListener.put(path, listeners);
}
listeners.add(listener);
}
return watchForChilds(path);
}
/**
* Installs a child watch for the given path.
* 监听给定路径的子路径
* @param path
* @return the current children of the path or null if the zk node with the given path doesn't exist.
*/
public List<String> watchForChilds(final String path) {
if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
}
return retryUntilConnected(new Callable<List<String>>() {
@Override
public List<String> call() throws Exception {
exists(path, true);
try {
return getChildren(path, true);
} catch (ZkNoNodeException e) {
// ignore, the "exists" watch will listen for the parent node to appear
}
return null;
}
});
}
public void unsubscribeChildChanges(String path, IZkChildListener childListener) {
synchronized (_childListener) {
final Set<IZkChildListener> listeners = _childListener.get(path);
if (listeners != null) {
listeners.remove(childListener);
}
}
}
我们回到读操作:
public <T extends Object> T readData(String path) {
return (T) readData(path, false);
}
public <T extends Object> T readData(String path, boolean returnNullIfPathNotExists) {
T data = null;
try {
data = (T) readData(path, null);
} catch (ZkNoNodeException e) {
if (!returnNullIfPathNotExists) {
throw e;
}
}
return data;
}
@SuppressWarnings("unchecked")
public <T extends Object> T readData(String path, Stat stat) {
return (T) readData(path, stat, hasListeners(path));
}
@SuppressWarnings("unchecked")
protected <T extends Object> T readData(final String path, final Stat stat, final boolean watch) {
byte[] data = retryUntilConnected(new Callable<byte[]>() {
@Override
public byte[] call() throws Exception {
return _connection.readData(path, stat, watch);
}
});
return (T) derializable(data);
}
private <T extends Object> T derializable(byte[] data) {
if (data == null) {
return null;
}
return (T) _zkSerializer.deserialize(data);
}
从上面可以,读操作的如果失去连接,则重新连接,连接成功后,通过回调,委托ZkClient会话读取目录数据,如果存在目录监听器,则触发目录监听器,同时反序列化读取的字节序列。
再来看写操作:
public void writeData(String path, Object object) {
writeData(path, object, -1);
}
public void writeData(final String path, Object datat, final int expectedVersion) {
final byte[] data = serialize(datat);
retryUntilConnected(new Callable<Object>() {
@Override
public Object call() throws Exception {
_connection.writeData(path, data, expectedVersion);
return null;
}
});
}
从上面可以看出,写操作先序列化数据,如果失去连接,则重新连接,连接成功后,通过回调,委托ZkClient会话写目录数据。
来小节一下CDRWA操作: 创建zk目录时,我们可以根据布尔参数createParents,来决定是否需要创建父目录,实际操作委托给内部的ZkClient会话连接。 删除操作,当会话失去连接时,重新连接,通过回调再执行删除目录操作,实际操作委托给内部的ZkClient会话连接。 检查目录是否存在操作,当会话失去连接时,重新连接,通过回调再执行检查目录操作,实际操作委托给内部的ZkClient会话连接。 读操作的如果失去连接,则重新连接,连接成功后,通过回调,委托ZkClient会话读取目录数据,如果存在目录监听器,则触发目录监听器,同时反序列化读取的字节序列。 写操作先序列化数据,如果失去连接,则重新连接,连接成功后,通过回调,委托ZkClient会话写目录数据。
再来看zk事件线程。
事件线程ZkEventThread
/**
* All listeners registered at the {@link ZkClient} will be notified from this event thread. This is to prevent
* dead-lock situations. The {@link ZkClient} pulls some information out of the {@link ZooKeeper} events to signal
* {@link ZkLock} conditions. Re-using the {@link ZooKeeper} event thread to also notify {@link ZkClient} listeners,
* would stop the ZkClient from receiving events from {@link ZooKeeper} as soon as one of the listeners blocks (because
* it is waiting for something). {@link ZkClient} would then for instance not be able to maintain it's connection state
* anymore.
*/
class ZkEventThread extends Thread {
private static final Logger LOG = Logger.getLogger(ZkEventThread.class);
private BlockingQueue<ZkEvent> _events = new LinkedBlockingQueue<ZkEvent>();
private static AtomicInteger _eventId = new AtomicInteger(0);
static abstract class ZkEvent {
private String _description;
public ZkEvent(String description) {
_description = description;
}
public abstract void run() throws Exception;
@Override
public String toString() {
return "ZkEvent[" + _description + "]";
}
}
ZkEventThread(String name) {
setDaemon(true);
setName("ZkClient-EventThread-" + getId() + "-" + name);
}
@Override
public void run() {
LOG.info("Starting ZkClient event thread.");
try {
while (!isInterrupted()) {
ZkEvent zkEvent = _events.take();
int eventId = _eventId.incrementAndGet();
LOG.debug("Delivering event #" + eventId + " " + zkEvent);
try {
zkEvent.run();
} catch (InterruptedException e) {
interrupt();
} catch (ZkInterruptedException e) {
interrupt();
} catch (Throwable e) {
LOG.error("Error handling event " + zkEvent, e);
}
LOG.debug("Delivering event #" + eventId + " done");
}
} catch (InterruptedException e) {
LOG.info("Terminate ZkClient event thread.");
}
}
public void send(ZkEvent event) {
if (!isInterrupted()) {
LOG.debug("New event: " + event);
_events.add(event);
}
}
}
从上面可以看出,事件线程ZkEventThread内部有一个zk事件ZkEvent队列LinkedBlockingQueue
在ZkClient的声明中,我们看到ZkClient实际上是实现了Watcher接口,下面我们来看Watcher接口的实现。
ZkClient Watcher接口
public void process(WatchedEvent event) {
LOG.debug("Received event: " + event);
_zookeeperEventThread = Thread.currentThread();
boolean stateChanged = event.getPath() == null;
boolean znodeChanged = event.getPath() != null;
boolean dataChanged = event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated
|| event.getType() == EventType.NodeChildrenChanged;
//给事件锁加锁
getEventLock().lock();
try {
// We might have to install child change event listener if a new node was created
//关闭时,触发,则直接返回
if (getShutdownTrigger()) {
LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered");
return;
}
//处理连接状态变更
if (stateChanged) {
processStateChanged(event);
}
//处理目录更新事件
if (dataChanged) {
processDataOrChildChange(event);
}
} finally {
if (stateChanged) {
//如果会话状态变更,则唤醒所有等待连接条件的线程
getEventLock().getStateChangedCondition().signalAll();
// If the session expired we have to signal all conditions, because watches might have been removed and
// there is no guarantee that those
// conditions will be signaled at all after an Expired event
// TODO PVo write a test for this
if (event.getState() == KeeperState.Expired) {
//唤醒所有等待节点事件和节点变更条件的线程
getEventLock().getZNodeEventCondition().signalAll();
getEventLock().getDataChangedCondition().signalAll();
// We also have to notify all listeners that something might have changed
//触发所有事件监听器
fireAllEvents();
}
}
if (znodeChanged) {
//唤醒所有等待节点事件条件的线程
getEventLock().getZNodeEventCondition().signalAll();
}
if (dataChanged) {
//唤醒所有等待节点变更条件的线程
getEventLock().getDataChangedCondition().signalAll();
}
//释放锁
getEventLock().unlock();
LOG.debug("Leaving process event");
}
}
public boolean getShutdownTrigger() {
return _shutdownTriggered;
}
从上面可以看出,ZkClient实现Watcher的目的主要处理目录变更和会话状态变更相关事件,对于在会话关闭时,触发的事件,直接丢弃。我们来看几个关键点:
- 处理连接状态变更
//处理连接状态变更 if (stateChanged) { processStateChanged(event); } private void processStateChanged(WatchedEvent event) { LOG.info("zookeeper state changed (" + event.getState() + ")"); setCurrentState(event.getState()); if (getShutdownTrigger()) { return; } try { //触发会话状态变更事件 fireStateChangedEvent(event.getState()); if (event.getState() == KeeperState.Expired) { //过期,则重新连接 reconnect(); //触发新的会话事件 fireNewSessionEvents(); } } catch (final Exception e) { throw new RuntimeException("Exception while restarting zk client", e); } } private void fireStateChangedEvent(final KeeperState state) { for (final IZkStateListener stateListener : _stateListener) { _eventThread.send(new ZkEvent("State changed to " + state + " sent to " + stateListener) { @Override public void run() throws Exception { stateListener.handleStateChanged(state); } }); } } private void reconnect() { getEventLock().lock(); try { _connection.close(); _connection.connect(this); } catch (InterruptedException e) { throw new ZkInterruptedException(e); } finally { getEventLock().unlock(); } } private void fireNewSessionEvents() { for (final IZkStateListener stateListener : _stateListener) { _eventThread.send(new ZkEvent("New session event sent to " + stateListener) { @Override public void run() throws Exception { stateListener.handleNewSession(); } }); } }
从上面可以看出,状态变更事件处理,主要是将触发状态监听任务包装成ZK事件ZkEvent,放入事件线程的事件队列中,如果会话过期,则重新连接。
再来看处理目录更新事件
-
处理目录更新事件 ```java //处理目录更新事件 if (dataChanged) { processDataOrChildChange(event); } private void processDataOrChildChange(WatchedEvent event) { final String path = event.getPath();
if (event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeCreated || event.getType() == EventType.NodeDeleted) { Set
childListeners = _childListener.get(path); if (childListeners != null && !childListeners.isEmpty()) { fireChildChangedEvents(path, childListeners); } } if (event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated) { Set listeners = _dataListener.get(path); if (listeners != null && !listeners.isEmpty()) { fireDataChangedEvents(event.getPath(), listeners); } } }
private void fireDataChangedEvents(final String path, Set
@Override
public void run() throws Exception {
// reinstall watch,重新注册监听器,避免反复注册
exists(path, true);
try {
Object data = readData(path, null, true);
listener.handleDataChange(path, data);
} catch (ZkNoNodeException e) {
listener.handleDataDeleted(path);
}
}
});
} }
private void fireChildChangedEvents(final String path, Set
@Override
public void run() throws Exception {
try {
// if the node doesn't exist we should listen for the root node to reappear
exists(path);
List<String> children = getChildren(path);
listener.handleChildChange(path, children);
} catch (ZkNoNodeException e) {
listener.handleChildChange(path, null);
}
}
});
}
} catch (Exception e) {
LOG.error("Failed to fire child changed event. Unable to getChildren. ", e);
} } ```
从上面可以,触发目录变更及子目录变更事件的原理和状态变更基本相同,都是将触发监听器操作包装成包装成ZK事件ZkEvent,放入事件线程ZkEventThread的事件队列中,对于目录变更事件,则重新注册监听器, 从而避免了原生API的重复注册的弊端。
- 触发所有事件监听器
//触发所有事件监听器 fireAllEvents(); private void fireAllEvents() { for (Entry<String, Set<IZkChildListener>> entry : _childListener.entrySet()) { fireChildChangedEvents(entry.getKey(), entry.getValue()); } for (Entry<String, Set<IZkDataListener>> entry : _dataListener.entrySet()) { fireDataChangedEvents(entry.getKey(), entry.getValue()); } }
有了上面连个小节的介绍,上述代码很容易理解。
总结
Zk客户端ZkClient主要的成员变量为,客户端连接IZkConnection,子节点监听器集IZkChildListener,节点数据监听器集IZkDataListener,当前状态KeeperState,事件锁ZkLock, 客户端状态监听器集IZkStateListener,事件线程ZkEventThread,序列化器ZkSerializer,最要的一点实现了 Watcher 接口。
节点数据监听器IZkDataListener,主要监控节点数据的变化,包括创建,变更,和删除事件。
子节点监听器IZkChildListener,监控路径子节点的变化,包括创建,变更,和删除事件。
客户端状态监听器IZkStateListener,处理连接状态的变更,并在会话过期时,重新创建连接。
事件锁,为可重入锁,有三个条件,分别为节点数据变更,会话状态变更,节点事件条件。
序列化器ZkSerializer,用于序列化,发送给Zkserver的数据,反序列化,从zk服务器接受的数据。
Zkclient的构造,主要是初始化Zk会话连接,会话超时时间和会话连接超时时间。默认的序列化器为SerializableSerializer,同时我们可以自己实现字节的序列化器。
会话接口IZkConnection,主要提供了ZK的CRWDA操作,这个与Zk原生API的客户端socket作用相同。
ZkClient会话客户端ZkConnection,主要成员变量,一个为远程Zk客户端ZooKeeper,一个用户控制会话连接与关闭的可重入锁ReentrantLock。 连接操作,主要是创建原生Zookeeper客户端,关闭操作实际,是关闭原生Zookeeper客户端。 CDRWA操作实际委托给内部的原生Zookeeper客户端,ZkClient会话客户端连接ZkConnection,面向的能染是字节流。 创建zk目录时,我们可以根据布尔参数createParents,来决定是否需要创建父目录,实际操作委托给内部的ZkClient会话连接。 删除操作,当会话失去连接时,重新连接,通过回调再执行删除目录操作,实际操作委托给内部的ZkClient会话连接。 检查目录是否存在操作,当会话失去连接时,重新连接,通过回调再执行检查目录操作,实际操作委托给内部的ZkClient会话连接。 读操作的如果失去连接,则重新连接,连接成功后,通过回调,委托ZkClient会话读取目录数据,如果存在目录监听器,则触发目录监听器,同时反序列化读取的字节序列。 写操作先序列化数据,如果失去连接,则重新连接,连接成功后,通过回调,委托ZkClient会话写目录数据。
事件线程ZkEventThread内部有一个zk事件ZkEvent队列LinkedBlockingQueue
ZkClient实现Watcher的目的主要处理目录变更和会话状态变更相关事件,对于在会话关闭时,触发的事件,直接丢弃。 状态变更事件处理,主要是将触发状态监听任务保证成ZK事件ZkEvent,放入事件线程的事件队列中,如果会话过期,则重新连接。
触发目录变更及子目录变更事件的原理和状态变更基本相同,都是将触发监听器操作包装成包装成ZK事件ZkEvent,放入事件线程ZkEventThread的事件队列中,对于目录变更事件,则重新注册监听器, 从而避免了原生API的重复注册的弊端。