0xTTEPX

Just do it, deeply...

Follow me on GitHub

zkClient

write by donaldhan, 2018-06-16 15:17

引言

前一篇文章,我们分析了一下Zookeeper原生API,先来回顾一下: Zookeeper主要有两个成员分别为客户端和watcher管理器。watcher观察器,主要关注点的事件类型有节点创建NodeCreated,节点删除NodeDeleted,节点数据改变NodeDataChanged, 节点子节点更新事件类型NodeChildrenChanged;客户端状态有:同步连接SyncConnected,断开连接Disconnected,只读连接ConnectedReadOnly,验证失败AuthFailed,已验证SaslAuthenticated,会话过期Expired等状态。 Watcher观察者管理器ZKWatchManager,主要根据事件类型,注册节点观察器,默认为节点数据观察器集,节点存在观察器集,节点孩子节点观察器集,默认观察期器集;如果是NodeCreated和NodeDeleted,则注册节点数据观察器集,节点存在观察器集; 如果是NodeDataChanged,则注册节点孩子节点观察器集;如果是NodeDeleted,则注册节点数据观察器集,节点存在观察器集,节点孩子节点观察器集。

客户端ClientCnxn中最重要的是发送线程SendThread和事件线程EventThread,同时关联一个ZooKeeper,以及客户端watcher管理器ClientWatchManager,实际为ZKWatchManager, 还有一个我们需要关注的点是等待发送数据包队列pendingQueue(LinkedList)和需要被发送的数据包队列outgoingQueue(LinkedList)。

数据包Packet主要有请求头部requestHeader(RequestHeader),响应头部replyHeader(ReplyHeader),请求request(Record),响应response(Record),字节缓冲区ByteBuffer,客户端路径clientPath,服务端路径serverPath,异步回调接口AsyncCallback,数据包上下文,观察者注册器watchRegistration。

发送线程SendThread主要的作用是发送客户端请求数据包,实际委托给内部的clientCnxnSocket。

客户端socket的主要功能为发送数据包sendPacket和调度数据包队列doTransport。

客户端Socket的实现ClientCnxnSocketNIO,内部主要使用nio的选择器和选择key。

发送数据包,实际委托给内Socket通道。

调度数据包队列,实际委托给内Socket通道,如果是响应消息,则转化为响应Record,如果是发送数据包,则委托给内部的socket通道。

事件线程主要处理创建、设值,获取节点数据和获取节点子节点数据,检查节点是否存在,删除节点等事件,并处理。

启动客户端Socket,实际上启动发送数据包线程(处理数据的请求和响应)和事件线程(处理crwda相关事件)。

创建节点,创建创建请求和响应,委托给socket客户端,发送创建节点操作。

Zk的crwda的相关操作,首先创建相应类型的请求和响应,然后委托给socket客户端,处理响应的操作,并解析响应消息。

今天我们主要来看一下ZkClient。 ZkClient是由Datameer的工程师开发的开源客户端,对Zookeeper的原生API进行了包装。 相对于原生api优势:

  1. 实现了超时重连、Watcher反复注册等功能。
  2. 添加序列化支持。
  3. 同时可以递归创建和删除路径。

这篇文章所有使用的示例代码可以参考zookeeper-demo

目录

ZkClient客户端

一般我们创建ZkClient客户端如下:

/**
 * @ClassName: CreateNodeSample
 * @Description: 使用ZkClient创建节点,注意:当递归创建持久化路径的方法传入的createParents为true时,节点已经存在,不会抛出异常。
 * @Author: Donaldhan
 * @Date: 2018-05-13 19:28
 */
@Slf4j
public class CreateNodeSample {
    private static ZkClient zkClient;
    public static void main(String[] args) {
        try {
            zkClient = new ZkClient(ConfigConstant.IP, ConfigConstant.SESSION_TIMEOUT);
            log.info("success connected ...");
            String path = "/zk-book/c1";
            //如果父节点不存在,可以创建父节点
            zkClient.createPersistent(path, true);
            log.info("success create:{} ...",path);
        } catch (RuntimeException e) {
            e.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}

从上面示例,可以看出,主要客户端为ZkClient。

ZkClient的成员变量

我们来看一下ZkClient的成员变量

public class ZkClient implements Watcher {
   protected IZkConnection _connection;//客户端连接
   //子节点监听器集
   private final Map<String, Set<IZkChildListener>> _childListener = new ConcurrentHashMap<String, Set<IZkChildListener>>();
   //节点数据监听器集
   private final ConcurrentHashMap<String, Set<IZkDataListener>> _dataListener = new ConcurrentHashMap<String, Set<IZkDataListener>>();
   //节点状态监听器集
   private final Set<IZkStateListener> _stateListener = new CopyOnWriteArraySet<IZkStateListener>();
   private KeeperState _currentState;//当前状态
   private final ZkLock _zkEventLock = new ZkLock();//事件锁
   private boolean _shutdownTriggered;
   private ZkEventThread _eventThread;//事件线程
   // TODO PVo remove this later
   private Thread _zookeeperEventThread;
   private ZkSerializer _zkSerializer;//序列化器
}

从上面可以看出Zk客户端ZkClient主要的成员变量为:客户端连接IZkConnection,子节点监听器集IZkChildListener,节点数据监听器集IZkDataListener,当前状态KeeperState,事件锁ZkLock, 客户端状态监听器集IZkStateListener,事件线程ZkEventThread,序列化器ZkSerializer,最要的一点实现了 Watcher 接口。。

再往下看之前我们,先把子节点监听器IZkChildListener,节点数据监听器IZkDataListener, 客户端状态监听器IZkStateListener,序列化器ZkSerializer的定义看一下。

节点数据监听器IZkDataListener

public interface IZkDataListener {
    public void handleDataChange(String dataPath, Object data) throws Exception;
    public void handleDataDeleted(String dataPath) throws Exception;
}

节点数据监听器IZkDataListener,主要监控节点数据的变化,包括创建,变更,和删除事件。

子节点监听器IZkChildListener

public interface IZkChildListener {

    /**
     * Called when the children of the given path changed.
     *
     * @param parentPath
     *            The parent path
     * @param currentChilds
     *            The children or null if the root node (parent path) was deleted.
     * @throws Exception
     */
    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception;
}

子节点监听器IZkChildListener,监控路径子节点的变化,包括创建,变更,和删除事件。

客户端状态监听器IZkStateListener

public interface IZkStateListener {

    /**
     * Called when the zookeeper connection state has changed.
     *
     * @param state
     *            The new state.
     * @throws Exception
     *             On any error.
     */
    public void handleStateChanged(KeeperState state) throws Exception;

    /**
     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
     * any ephemeral nodes here.
     *
     * @throws Exception
     *             On any error.
     */
    public void handleNewSession() throws Exception;
}

客户端状态监听器IZkStateListener,处理连接状态的变更,并在会话过期时,重新创建连接。

事件锁ZkLock

public class ZkLock extends ReentrantLock {

    private static final long serialVersionUID = 1L;

    private Condition _dataChangedCondition = newCondition();
    private Condition _stateChangedCondition = newCondition();
    private Condition _zNodeEventCondition = newCondition();

    /**
     * This condition will be signaled if a zookeeper event was processed and the event contains a data/child change.
     *
     * @return the condition.
     */
    public Condition getDataChangedCondition() {
        return _dataChangedCondition;
    }

    /**
     * This condition will be signaled if a zookeeper event was processed and the event contains a state change
     * (connected, disconnected, session expired, etc ...).
     *
     * @return the condition.
     */
    public Condition getStateChangedCondition() {
        return _stateChangedCondition;
    }

    /**
     * This condition will be signaled if any znode related zookeeper event was received.
     *
     * @return the condition.
     */
    public Condition getZNodeEventCondition() {
        return _zNodeEventCondition;
    }
}

从上面可以看出,事件锁,为可重入锁,有三个条件,分别为节点数据变更,会话状态变更,节点事件条件。

序列化器ZkSerializer

/**
 * Zookeeper is able to store data in form of byte arrays. This interfacte is a bridge between those byte-array format
 * and higher level objects.
 *
 * @see BytesPushThroughSerializer
 * @see SerializableSerializer
 */
public interface ZkSerializer {

    public byte[] serialize(Object data) throws ZkMarshallingError;

    public Object deserialize(byte[] bytes) throws ZkMarshallingError;
}

序列化器ZkSerializer,用于序列化,发送给Zkserver的数据,反序列化,从zk服务器接受的数据。

客户端连接IZkConnection,事件线程ZkEventThread这个我们在后面用到的时候,再讲。

再来看Zkclient的构造

public ZkClient(String serverstring) {
    this(serverstring, Integer.MAX_VALUE);
}

public ZkClient(String zkServers, int connectionTimeout) {
    this(new ZkConnection(zkServers), connectionTimeout);
}

public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) {
    this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout);
}

public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer) {
    this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer);
}

public ZkClient(IZkConnection connection) {
    this(connection, Integer.MAX_VALUE);
}

public ZkClient(IZkConnection connection, int connectionTimeout) {
    this(connection, connectionTimeout, new SerializableSerializer());
}

public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer) {
    _connection = zkConnection;
    _zkSerializer = zkSerializer;
    connect(connectionTimeout, this);
}

Zkclient的构造,主要是初始化Zk会话连接,会话超时时间和会话连接超时时间。默认的序列化器为SerializableSerializer,同时我们可以自己实现字节的序列化器。

来看一下默认的序列化器

public class SerializableSerializer implements ZkSerializer {

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        try {
            ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
            Object object = inputStream.readObject();
            return object;
        } catch (ClassNotFoundException e) {
            throw new ZkMarshallingError("Unable to find object class.", e);
        } catch (IOException e) {
            throw new ZkMarshallingError(e);
        }
    }

    @Override
    public byte[] serialize(Object serializable) throws ZkMarshallingError {
        try {
            ByteArrayOutputStream byteArrayOS = new ByteArrayOutputStream();
            ObjectOutputStream stream = new ObjectOutputStream(byteArrayOS);
            stream.writeObject(serializable);
            stream.close();
            return byteArrayOS.toByteArray();
        } catch (IOException e) {
            throw new ZkMarshallingError(e);
        }
    }

}

这个一看就明白,我们再来看ZkClient会话连接ZkConnection。

会话连接ZkConnection

再看会话连接以前,先来看接口IZkConnection的定义:

public interface IZkConnection {

    public void connect(Watcher watcher);

    void close() throws InterruptedException;

    public String create(String path, byte[] data, CreateMode mode) throws KeeperException, InterruptedException;

    public void delete(String path) throws InterruptedException, KeeperException;

    boolean exists(final String path, final boolean watch) throws KeeperException, InterruptedException;

    List<String> getChildren(final String path, final boolean watch) throws KeeperException, InterruptedException;

    public byte[] readData(String path, Stat stat, boolean watch) throws KeeperException, InterruptedException;

    public void writeData(String path, byte[] data, int expectedVersion) throws KeeperException, InterruptedException;

    public States getZookeeperState();

    public long getCreateTime(String path) throws KeeperException, InterruptedException;

    public String getServers();
}

从上面可以看出,会话接口IZkConnection,主要提供了ZK的CRWDA操作,这个与Zk原生API的客户端socket作用相同。

我们回到会话连接ZkConnection的定义:

public class ZkConnection implements IZkConnection {

    private static final Logger LOG = Logger.getLogger(ZkConnection.class);

    /** It is recommended to use quite large sessions timeouts for ZooKeeper. */
    private static final int DEFAULT_SESSION_TIMEOUT = 30000;

    private ZooKeeper _zk = null;
    private Lock _zookeeperLock = new ReentrantLock();

    private final String _servers;
    private final int _sessionTimeOut;

    public ZkConnection(String zkServers) {
        this(zkServers, DEFAULT_SESSION_TIMEOUT);
    }

    public ZkConnection(String zkServers, int sessionTimeOut) {
        _servers = zkServers;
        _sessionTimeOut = sessionTimeOut;
    }
}

从上面可以看出,ZkClient会话客户端ZkConnection,主要成员变量,一个为远程Zk客户端ZooKeeper,一个用户控制会话连接与关闭的可重入锁ReentrantLock。

再来看会话客户端ZkConnection的连接关闭,和CDRWA操作:

CDRWA操作

ZkConnection的连接关闭

@Override
public void connect(Watcher watcher) {
     _zookeeperLock.lock();
     try {
         if (_zk != null) {
             throw new IllegalStateException("zk client has already been started");
         }
         try {
             LOG.debug("Creating new ZookKeeper instance to connect to " + _servers + ".");
             _zk = new ZooKeeper(_servers, _sessionTimeOut, watcher);
         } catch (IOException e) {
             throw new ZkException("Unable to connect to " + _servers, e);
         }
     } finally {
         _zookeeperLock.unlock();
     }
}

public void close() throws InterruptedException {
     _zookeeperLock.lock();
     try {
         if (_zk != null) {
             LOG.debug("Closing ZooKeeper connected to " + _servers);
             _zk.close();
             _zk = null;
         }
     } finally {
         _zookeeperLock.unlock();
     }
}

从上面可以看出,连接操作,主要是创建原生Zookeeper客户端,关闭操作实际,是关闭原生Zookeeper客户端。

CDRWA操作

public String create(String path, byte[] data, CreateMode mode) throws KeeperException, InterruptedException {
     return _zk.create(path, data, Ids.OPEN_ACL_UNSAFE, mode);
}

public void delete(String path) throws InterruptedException, KeeperException {
     _zk.delete(path, -1);
}

public boolean exists(String path, boolean watch) throws KeeperException, InterruptedException {
     return _zk.exists(path, watch) != null;
}

public List<String> getChildren(final String path, final boolean watch) throws KeeperException, InterruptedException {
     return _zk.getChildren(path, watch);
}

public byte[] readData(String path, Stat stat, boolean watch) throws KeeperException, InterruptedException {
     return _zk.getData(path, watch, stat);
}

public void writeData(String path, byte[] data) throws KeeperException, InterruptedException {
     writeData(path, data, -1);
}

public void writeData(String path, byte[] data, int version) throws KeeperException, InterruptedException {
     _zk.setData(path, data, version);
}

public States getZookeeperState() {
     return _zk != null ? _zk.getState() : null;
}

public ZooKeeper getZookeeper() {
     return _zk;
}

@Override
public long getCreateTime(String path) throws KeeperException, InterruptedException {
     Stat stat = _zk.exists(path, false);
     if (stat != null) {
         return stat.getCtime();
     }
     return -1;
}

@Override
public String getServers() {
     return _servers;
}

从上面可以看出,CDRWA操作实际委托给内部的原生Zookeeper客户端,ZkClient会话客户端连接ZkConnection,面向的能染是字节流。

我们回到ZkClient的CDRWA操作

ZkClient的CDRWA操作

public void createPersistent(String path, boolean createParents) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
        try {
            create(path, null, CreateMode.PERSISTENT);
        } catch (ZkNodeExistsException e) {
            if (!createParents) {
                throw e;
            }
        } catch (ZkNoNodeException e) {
            if (!createParents) {
                throw e;
            }
            String parentDir = path.substring(0, path.lastIndexOf('/'));
            createPersistent(parentDir, createParents);
            createPersistent(path, createParents);
        }
}

从上面可以看出,创建zk目录时,我们可以根据布尔参数createParents,来决定是否需要创建父目录。

public boolean deleteRecursive(String path) {
       List<String> children;
       try {
           children = getChildren(path, false);
       } catch (ZkNoNodeException e) {
           return true;
       }

       for (String subPath : children) {
           if (!deleteRecursive(path + "/" + subPath)) {
               return false;
           }
       }

       return delete(path);
}
public boolean delete(final String path) {
    try {
      //注意这里是个回调,及在重新连接成功后,在执行删除操作
        retryUntilConnected(new Callable<Object>() {

            @Override
            public Object call() throws Exception {
                _connection.delete(path);
                return null;
            }
        });

        return true;
    } catch (ZkNoNodeException e) {
        return false;
    }
}
public <T> T retryUntilConnected(Callable<T> callable) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
       if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
           throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
       }
       while (true) {
           try {
               return callable.call();
           } catch (ConnectionLossException e) {
               // we give the event thread some time to update the status to 'Disconnected'
               Thread.yield();
               waitUntilConnected();
           } catch (SessionExpiredException e) {
               // we give the event thread some time to update the status to 'Expired'
               Thread.yield();
               waitUntilConnected();
           } catch (KeeperException e) {
               throw ZkException.create(e);
           } catch (InterruptedException e) {
               throw new ZkInterruptedException(e);
           } catch (Exception e) {
               throw ExceptionUtil.convertToRuntimeException(e);
           }
       }
   }
public void waitUntilConnected() throws ZkInterruptedException {
        waitUntilConnected(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    public boolean waitUntilConnected(long time, TimeUnit timeUnit) throws ZkInterruptedException {
        return waitForKeeperState(KeeperState.SyncConnected, time, timeUnit);
    }

public boolean waitForKeeperState(KeeperState keeperState, long time, TimeUnit timeUnit) throws ZkInterruptedException {
    if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
        throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
    }
    Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time));

    LOG.debug("Waiting for keeper state " + keeperState);
    acquireEventLock();
    try {
        boolean stillWaiting = true;
        while (_currentState != keeperState) {
            if (!stillWaiting) {
                return false;
            }
            stillWaiting = getEventLock().getStateChangedCondition().awaitUntil(timeout);
        }
        LOG.debug("State is " + _currentState);
        return true;
    } catch (InterruptedException e) {
        throw new ZkInterruptedException(e);
    } finally {
        getEventLock().unlock();
    }
}
private void acquireEventLock() {
       try {
           getEventLock().lockInterruptibly();
       } catch (InterruptedException e) {
           throw new ZkInterruptedException(e);
       }
   }

从上面可以看出,当会话失去连接时,重新连接,,通过回调再执行删除目录操作,实际操作委托给内部的ZkClient会话连接。

protected boolean exists(final String path, final boolean watch) {
       return retryUntilConnected(new Callable<Boolean>() {
           @Override
           public Boolean call() throws Exception {
               return _connection.exists(path, watch);
           }
       });
}
public boolean exists(final String path) {
       return exists(path, hasListeners(path));
}
// 判断是否存在目录监听器
private boolean hasListeners(String path) {
      Set<IZkDataListener> dataListeners = _dataListener.get(path);
      if (dataListeners != null && dataListeners.size() > 0) {
          return true;
      }
      Set<IZkChildListener> childListeners = _childListener.get(path);
      if (childListeners != null && childListeners.size() > 0) {
          return true;
      }
      return false;
}

从上面可以看出,检查目录是否存在操作,当会话失去连接时,重新连接,通过回调再执行检查目录操作,实际操作委托给内部的ZkClient会话连接。

再看读写操作之前,我们来看一个示例:

package org.donald.zkclient.read;

import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.donald.constant.ConfigConstant;

/**
 * @ClassName: ReadDataSample
 * @Description: ZkClient获取节点数据
 * @Author: Donaldhan
 * @Date: 2018-05-13 20:03
 */
@Slf4j
public class ReadDataSample {
    private static ZkClient zkClient;
    public static void main(String[] args) {
        try {
            String path = "/zk-book";
            zkClient = new ZkClient(ConfigConstant.IP, ConfigConstant.SESSION_TIMEOUT);
            zkClient.createEphemeral(path, "123");
            zkClient.subscribeDataChanges(path, new IZkDataListener() {
                @Override
                public void handleDataDeleted(String dataPath) {
                    log.info("Node:{} deleted", dataPath);
                }
                @Override
                public void handleDataChange(String dataPath, Object data) {
                    log.info("Node:{} changed, new data: {}", dataPath, data);
                }
            });
            log.info("{} value:{}",path , zkClient.readData(path));
            zkClient.writeData(path,"456");
            Thread.sleep(1000);
            zkClient.delete(path);
            Thread.sleep( Integer.MAX_VALUE );
        } catch (RuntimeException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
        }
    }
}

上述示例,先创建一个临时路径,并添加目录监听器,然后读取数据。

我们先来看一下,注册节点监听器和反注册节点监听器

public void subscribeDataChanges(String path, IZkDataListener listener) {
     Set<IZkDataListener> listeners;
     synchronized (_dataListener) {
         listeners = _dataListener.get(path);
         if (listeners == null) {
             listeners = new CopyOnWriteArraySet<IZkDataListener>();
             _dataListener.put(path, listeners);
         }
         listeners.add(listener);
     }
     //关键点
     watchForData(path);
     LOG.debug("Subscribed data changes for " + path);
}

public void unsubscribeDataChanges(String path, IZkDataListener dataListener) {
     synchronized (_dataListener) {
         final Set<IZkDataListener> listeners = _dataListener.get(path);
         if (listeners != null) {
             listeners.remove(dataListener);
         }
         if (listeners == null || listeners.isEmpty()) {
             _dataListener.remove(path);
         }
     }
}

我们来看关键点

//关键点
watchForData(path);
public void watchForData(final String path) {
    retryUntilConnected(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            _connection.exists(path, true);
            return null;
        }
    });
}

再来看注册子目录监听器:

public List<String> subscribeChildChanges(String path, IZkChildListener listener) {
     synchronized (_childListener) {
         Set<IZkChildListener> listeners = _childListener.get(path);
         if (listeners == null) {
             listeners = new CopyOnWriteArraySet<IZkChildListener>();
             _childListener.put(path, listeners);
         }
         listeners.add(listener);
     }
     return watchForChilds(path);
}
/**
  * Installs a child watch for the given path.
  * 监听给定路径的子路径
  * @param path
  * @return the current children of the path or null if the zk node with the given path doesn't exist.
  */
public List<String> watchForChilds(final String path) {
     if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
         throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
     }
     return retryUntilConnected(new Callable<List<String>>() {
         @Override
         public List<String> call() throws Exception {
             exists(path, true);
             try {
                 return getChildren(path, true);
             } catch (ZkNoNodeException e) {
                 // ignore, the "exists" watch will listen for the parent node to appear
             }
             return null;
         }
     });
}
public void unsubscribeChildChanges(String path, IZkChildListener childListener) {
     synchronized (_childListener) {
         final Set<IZkChildListener> listeners = _childListener.get(path);
         if (listeners != null) {
             listeners.remove(childListener);
         }
     }
}

我们回到读操作:

public <T extends Object> T readData(String path) {
    return (T) readData(path, false);
}

public <T extends Object> T readData(String path, boolean returnNullIfPathNotExists) {
    T data = null;
    try {
        data = (T) readData(path, null);
    } catch (ZkNoNodeException e) {
        if (!returnNullIfPathNotExists) {
            throw e;
        }
    }
    return data;
}

@SuppressWarnings("unchecked")
public <T extends Object> T readData(String path, Stat stat) {
    return (T) readData(path, stat, hasListeners(path));
}

@SuppressWarnings("unchecked")
protected <T extends Object> T readData(final String path, final Stat stat, final boolean watch) {
    byte[] data = retryUntilConnected(new Callable<byte[]>() {

        @Override
        public byte[] call() throws Exception {
            return _connection.readData(path, stat, watch);
        }
    });
    return (T) derializable(data);
}
private <T extends Object> T derializable(byte[] data) {
 if (data == null) {
     return null;
 }
 return (T) _zkSerializer.deserialize(data);
 }

从上面可以,读操作的如果失去连接,则重新连接,连接成功后,通过回调,委托ZkClient会话读取目录数据,如果存在目录监听器,则触发目录监听器,同时反序列化读取的字节序列。

再来看写操作:

public void writeData(String path, Object object) {
      writeData(path, object, -1);
}
public void writeData(final String path, Object datat, final int expectedVersion) {
     final byte[] data = serialize(datat);
     retryUntilConnected(new Callable<Object>() {

         @Override
         public Object call() throws Exception {
             _connection.writeData(path, data, expectedVersion);
             return null;
         }
     });
}

从上面可以看出,写操作先序列化数据,如果失去连接,则重新连接,连接成功后,通过回调,委托ZkClient会话写目录数据。

来小节一下CDRWA操作: 创建zk目录时,我们可以根据布尔参数createParents,来决定是否需要创建父目录,实际操作委托给内部的ZkClient会话连接。 删除操作,当会话失去连接时,重新连接,通过回调再执行删除目录操作,实际操作委托给内部的ZkClient会话连接。 检查目录是否存在操作,当会话失去连接时,重新连接,通过回调再执行检查目录操作,实际操作委托给内部的ZkClient会话连接。 读操作的如果失去连接,则重新连接,连接成功后,通过回调,委托ZkClient会话读取目录数据,如果存在目录监听器,则触发目录监听器,同时反序列化读取的字节序列。 写操作先序列化数据,如果失去连接,则重新连接,连接成功后,通过回调,委托ZkClient会话写目录数据。

再来看zk事件线程。

事件线程ZkEventThread

/**
 * All listeners registered at the {@link ZkClient} will be notified from this event thread. This is to prevent
 * dead-lock situations. The {@link ZkClient} pulls some information out of the {@link ZooKeeper} events to signal
 * {@link ZkLock} conditions. Re-using the {@link ZooKeeper} event thread to also notify {@link ZkClient} listeners,
 * would stop the ZkClient from receiving events from {@link ZooKeeper} as soon as one of the listeners blocks (because
 * it is waiting for something). {@link ZkClient} would then for instance not be able to maintain it's connection state
 * anymore.
 */
class ZkEventThread extends Thread {
    private static final Logger LOG = Logger.getLogger(ZkEventThread.class);
    private BlockingQueue<ZkEvent> _events = new LinkedBlockingQueue<ZkEvent>();
    private static AtomicInteger _eventId = new AtomicInteger(0);
    static abstract class ZkEvent {
        private String _description;
        public ZkEvent(String description) {
            _description = description;
        }
        public abstract void run() throws Exception;
        @Override
        public String toString() {
            return "ZkEvent[" + _description + "]";
        }
    }
    ZkEventThread(String name) {
        setDaemon(true);
        setName("ZkClient-EventThread-" + getId() + "-" + name);
    }
    @Override
    public void run() {
        LOG.info("Starting ZkClient event thread.");
        try {
            while (!isInterrupted()) {
                ZkEvent zkEvent = _events.take();
                int eventId = _eventId.incrementAndGet();
                LOG.debug("Delivering event #" + eventId + " " + zkEvent);
                try {
                    zkEvent.run();
                } catch (InterruptedException e) {
                    interrupt();
                } catch (ZkInterruptedException e) {
                    interrupt();
                } catch (Throwable e) {
                    LOG.error("Error handling event " + zkEvent, e);
                }
                LOG.debug("Delivering event #" + eventId + " done");
            }
        } catch (InterruptedException e) {
            LOG.info("Terminate ZkClient event thread.");
        }
    }
    public void send(ZkEvent event) {
        if (!isInterrupted()) {
            LOG.debug("New event: " + event);
            _events.add(event);
        }
    }
}

从上面可以看出,事件线程ZkEventThread内部有一个zk事件ZkEvent队列LinkedBlockingQueue,事件线程的主要任务是,消费zk事件ZkEvent队列中的 事件,并执行相应的事件。

在ZkClient的声明中,我们看到ZkClient实际上是实现了Watcher接口,下面我们来看Watcher接口的实现。

ZkClient Watcher接口

public void process(WatchedEvent event) {
    LOG.debug("Received event: " + event);
    _zookeeperEventThread = Thread.currentThread();

    boolean stateChanged = event.getPath() == null;
    boolean znodeChanged = event.getPath() != null;
    boolean dataChanged = event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated
            || event.getType() == EventType.NodeChildrenChanged;
    //给事件锁加锁
    getEventLock().lock();
    try {

        // We might have to install child change event listener if a new node was created
        //关闭时,触发,则直接返回
        if (getShutdownTrigger()) {
            LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered");
            return;
        }
        //处理连接状态变更
        if (stateChanged) {
            processStateChanged(event);
        }
        //处理目录更新事件
        if (dataChanged) {
            processDataOrChildChange(event);
        }
    } finally {
        if (stateChanged) {
           //如果会话状态变更,则唤醒所有等待连接条件的线程
            getEventLock().getStateChangedCondition().signalAll();

            // If the session expired we have to signal all conditions, because watches might have been removed and
            // there is no guarantee that those
            // conditions will be signaled at all after an Expired event
            // TODO PVo write a test for this
            if (event.getState() == KeeperState.Expired) {
              //唤醒所有等待节点事件和节点变更条件的线程
                getEventLock().getZNodeEventCondition().signalAll();
                getEventLock().getDataChangedCondition().signalAll();
                // We also have to notify all listeners that something might have changed
                //触发所有事件监听器
                fireAllEvents();
            }
        }
        if (znodeChanged) {
          //唤醒所有等待节点事件条件的线程
            getEventLock().getZNodeEventCondition().signalAll();
        }
        if (dataChanged) {
          //唤醒所有等待节点变更条件的线程
            getEventLock().getDataChangedCondition().signalAll();
        }
        //释放锁
        getEventLock().unlock();
        LOG.debug("Leaving process event");
    }
}
public boolean getShutdownTrigger() {
       return _shutdownTriggered;
}

从上面可以看出,ZkClient实现Watcher的目的主要处理目录变更和会话状态变更相关事件,对于在会话关闭时,触发的事件,直接丢弃。我们来看几个关键点:

  1. 处理连接状态变更
    //处理连接状态变更
    if (stateChanged) {
     processStateChanged(event);
    }
    private void processStateChanged(WatchedEvent event) {
      LOG.info("zookeeper state changed (" + event.getState() + ")");
      setCurrentState(event.getState());
      if (getShutdownTrigger()) {
          return;
      }
      try {
        //触发会话状态变更事件
          fireStateChangedEvent(event.getState());
          if (event.getState() == KeeperState.Expired) {
            //过期,则重新连接
              reconnect();
              //触发新的会话事件
              fireNewSessionEvents();
          }
      } catch (final Exception e) {
          throw new RuntimeException("Exception while restarting zk client", e);
      }
     }
     private void fireStateChangedEvent(final KeeperState state) {
        for (final IZkStateListener stateListener : _stateListener) {
            _eventThread.send(new ZkEvent("State changed to " + state + " sent to " + stateListener) {
    
                @Override
                public void run() throws Exception {
                    stateListener.handleStateChanged(state);
                }
            });
        }
    }
    private void reconnect() {
      getEventLock().lock();
      try {
          _connection.close();
          _connection.connect(this);
      } catch (InterruptedException e) {
          throw new ZkInterruptedException(e);
      } finally {
          getEventLock().unlock();
      }
    }
    private void fireNewSessionEvents() {
     for (final IZkStateListener stateListener : _stateListener) {
         _eventThread.send(new ZkEvent("New session event sent to " + stateListener) {
    
             @Override
             public void run() throws Exception {
                 stateListener.handleNewSession();
             }
         });
     }
    }
    

    从上面可以看出,状态变更事件处理,主要是将触发状态监听任务包装成ZK事件ZkEvent,放入事件线程的事件队列中,如果会话过期,则重新连接。

再来看处理目录更新事件

  1. 处理目录更新事件 ```java //处理目录更新事件 if (dataChanged) { processDataOrChildChange(event); } private void processDataOrChildChange(WatchedEvent event) { final String path = event.getPath();

    if (event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeCreated || event.getType() == EventType.NodeDeleted) { Set childListeners = _childListener.get(path); if (childListeners != null && !childListeners.isEmpty()) { fireChildChangedEvents(path, childListeners); } } if (event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated) { Set listeners = _dataListener.get(path); if (listeners != null && !listeners.isEmpty()) { fireDataChangedEvents(event.getPath(), listeners); } } }

private void fireDataChangedEvents(final String path, Set listeners) { for (final IZkDataListener listener : listeners) { _eventThread.send(new ZkEvent("Data of " + path + " changed sent to " + listener) {

         @Override
         public void run() throws Exception {
             // reinstall watch,重新注册监听器,避免反复注册
             exists(path, true);
             try {
                 Object data = readData(path, null, true);
                 listener.handleDataChange(path, data);
             } catch (ZkNoNodeException e) {
                 listener.handleDataDeleted(path);
             }
         }
     });
 } }

private void fireChildChangedEvents(final String path, Set childListeners) { try { // reinstall the watch for (final IZkChildListener listener : childListeners) { _eventThread.send(new ZkEvent("Children of " + path + " changed sent to " + listener) {

             @Override
             public void run() throws Exception {
                 try {
                     // if the node doesn't exist we should listen for the root node to reappear
                     exists(path);
                     List<String> children = getChildren(path);
                     listener.handleChildChange(path, children);
                 } catch (ZkNoNodeException e) {
                     listener.handleChildChange(path, null);
                 }
             }
         });
     }
 } catch (Exception e) {
     LOG.error("Failed to fire child changed event. Unable to getChildren.  ", e);
 } } ```

从上面可以,触发目录变更及子目录变更事件的原理和状态变更基本相同,都是将触发监听器操作包装成包装成ZK事件ZkEvent,放入事件线程ZkEventThread的事件队列中,对于目录变更事件,则重新注册监听器, 从而避免了原生API的重复注册的弊端。

  1. 触发所有事件监听器
    //触发所有事件监听器
    fireAllEvents();
    private void fireAllEvents() {
     for (Entry<String, Set<IZkChildListener>> entry : _childListener.entrySet()) {
         fireChildChangedEvents(entry.getKey(), entry.getValue());
     }
     for (Entry<String, Set<IZkDataListener>> entry : _dataListener.entrySet()) {
         fireDataChangedEvents(entry.getKey(), entry.getValue());
     }
    }
    

    有了上面连个小节的介绍,上述代码很容易理解。

总结

Zk客户端ZkClient主要的成员变量为,客户端连接IZkConnection,子节点监听器集IZkChildListener,节点数据监听器集IZkDataListener,当前状态KeeperState,事件锁ZkLock, 客户端状态监听器集IZkStateListener,事件线程ZkEventThread,序列化器ZkSerializer,最要的一点实现了 Watcher 接口。

节点数据监听器IZkDataListener,主要监控节点数据的变化,包括创建,变更,和删除事件。

子节点监听器IZkChildListener,监控路径子节点的变化,包括创建,变更,和删除事件。

客户端状态监听器IZkStateListener,处理连接状态的变更,并在会话过期时,重新创建连接。

事件锁,为可重入锁,有三个条件,分别为节点数据变更,会话状态变更,节点事件条件。

序列化器ZkSerializer,用于序列化,发送给Zkserver的数据,反序列化,从zk服务器接受的数据。

Zkclient的构造,主要是初始化Zk会话连接,会话超时时间和会话连接超时时间。默认的序列化器为SerializableSerializer,同时我们可以自己实现字节的序列化器。

会话接口IZkConnection,主要提供了ZK的CRWDA操作,这个与Zk原生API的客户端socket作用相同。

ZkClient会话客户端ZkConnection,主要成员变量,一个为远程Zk客户端ZooKeeper,一个用户控制会话连接与关闭的可重入锁ReentrantLock。 连接操作,主要是创建原生Zookeeper客户端,关闭操作实际,是关闭原生Zookeeper客户端。 CDRWA操作实际委托给内部的原生Zookeeper客户端,ZkClient会话客户端连接ZkConnection,面向的能染是字节流。 创建zk目录时,我们可以根据布尔参数createParents,来决定是否需要创建父目录,实际操作委托给内部的ZkClient会话连接。 删除操作,当会话失去连接时,重新连接,通过回调再执行删除目录操作,实际操作委托给内部的ZkClient会话连接。 检查目录是否存在操作,当会话失去连接时,重新连接,通过回调再执行检查目录操作,实际操作委托给内部的ZkClient会话连接。 读操作的如果失去连接,则重新连接,连接成功后,通过回调,委托ZkClient会话读取目录数据,如果存在目录监听器,则触发目录监听器,同时反序列化读取的字节序列。 写操作先序列化数据,如果失去连接,则重新连接,连接成功后,通过回调,委托ZkClient会话写目录数据。

事件线程ZkEventThread内部有一个zk事件ZkEvent队列LinkedBlockingQueue,事件线程的主要任务是,消费zk事件ZkEvent队列中的 事件,并执行相应的事件。

ZkClient实现Watcher的目的主要处理目录变更和会话状态变更相关事件,对于在会话关闭时,触发的事件,直接丢弃。 状态变更事件处理,主要是将触发状态监听任务保证成ZK事件ZkEvent,放入事件线程的事件队列中,如果会话过期,则重新连接。

触发目录变更及子目录变更事件的原理和状态变更基本相同,都是将触发监听器操作包装成包装成ZK事件ZkEvent,放入事件线程ZkEventThread的事件队列中,对于目录变更事件,则重新注册监听器, 从而避免了原生API的重复注册的弊端。