Curator Directory Listening

Written by donaldhan, 2018-06-29 09:40

Introduction

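In the previous article we briefly looked at the builders Curator provides for CDRWA operations and at the implementation of the Curator framework; see [Curator][]. Inside the framework factory CuratorFrameworkFactory, the main member variables are the default session timeout and connection timeout, the local address, the byte compressor GzipCompressionProvider, the default ZooKeeper factory DefaultZookeeperFactory, and the default ACL provider DefaultACLProvider. GzipCompressionProvider compresses byte streams, DefaultZookeeperFactory creates the native ZooKeeper client, and DefaultACLProvider is mainly used to obtain the ACLs of a node.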

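Besides the session and connection timeouts, the byte compressor, the native API client factory, and the ACL provider, the inner Builder of CuratorFrameworkFactory also holds a thread factory (ThreadFactory), the authentication scheme and its value, and the retry policy (RetryPolicy). ExponentialBackoffRetry mainly controls how many reconnect attempts are made after a session timeout and when the next attempt happens. What the Builder actually creates is a CuratorFrameworkImpl.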

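CuratorFramework mainly provides operations to start and close the client, plus the builders for CDRWA operations: CreateBuilder for creating nodes, DeleteBuilder for deleting nodes, GetDataBuilder for reading node data, SetDataBuilder for setting node data, ExistsBuilder for checking nodes, SyncBuilder for syncing data, CuratorTransaction for transactions, and GetACLBuilder and SetACLBuilder for ACLs. It also exposes the connection state listenable Listenable<ConnectionStateListener>, the client listenable Listenable<CuratorListener>, and the unhandled error listenable Listenable<UnhandledErrorListener>, and gives access to the ZK client CuratorZookeeperClient and the path-ensuring helper EnsurePath.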

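The Curator ZK client CuratorZookeeperClient is mainly used to obtain the native API ZK client, to re-create an expired session, and to execute the corresponding CDRWA operations. The create builder CreateBuilder mainly provides operations for creating persistent and ephemeral nodes.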

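In the framework implementation CuratorFrameworkImpl, creating a directory is delegated to the native API ZK client inside the Curator framework. If parent directories are requested and do not exist, they are created first. If the session has expired it is re-established, and once that succeeds the create-directory Callable is invoked.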

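The delete builder DeleteBuilder deletes a directory inside a retry loop: if the session has expired it reconnects, and the actual delete is delegated to the native API ZK client inside the Curator framework.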

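The get-data, exists-check, and set-data operations of CuratorFrameworkImpl work on essentially the same principle as create and delete: the actual operation is delegated to the native API ZK client inside the Curator framework, and the session is kept valid. All examples in this article can be found in [zookeeper-demo][].

[zookeeper-demo]: https://github.com/Donaldhan/zookeeper-demo "zookeeper-demo"
[Curator]: https://donaldhan.github.io/zookeeper/2018/06/18/Curator.html "Curator"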

Contents

Curator Node Listening

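The API that Curator officially recommends wraps ZooKeeper's native Java API and takes care of chores such as repeated watcher registration and event information. The listener events also carry detailed information, including the path of the changed node and its value, which the native API does not provide. Listening for events this way resembles comparing a local cached view with the remote ZooKeeper view. Curator method calls use a fluent API; the benefits and caveats of that style are well documented elsewhere. For listening on directories, Curator provides three cache recipes: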

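  1. NodeCache: listens on a single node; events cover create, delete, and update operations on the given path.
  2. PathChildrenCache: listens on the first-level children of the given path; it does not react to operations on the node itself, only to create, delete, and update operations on its children.
  3. TreeCache: combines the features of NodeCache and PathChildrenCache and listens on the whole subtree; the listening depth can be configured.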
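To make the difference in scope concrete, here is a minimal sketch in plain Java. It is not Curator code: `CacheScopeModel`, `relativeDepth`, and `notified` are hypothetical names, and the depth rule for TreeCache (0 means only the watched node, 1 adds its direct children) is an assumption about how a depth limit behaves. Paths are assumed to be valid ZooKeeper paths without a trailing slash.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of which cache type reacts to a change at a given path. Not Curator code. */
final class CacheScopeModel {
    private CacheScopeModel() {}

    /** Depth of 'changed' below 'watched': 0 = same node, 1 = direct child, -1 = outside the subtree. */
    static int relativeDepth(String watched, String changed) {
        if (changed.equals(watched)) {
            return 0;
        }
        String prefix = watched.equals("/") ? "/" : watched + "/";
        if (!changed.startsWith(prefix)) {
            return -1;
        }
        // Count the path segments that follow the watched path.
        return changed.substring(prefix.length()).split("/").length;
    }

    /** Names of the cache types that would see a change at 'changed' when watching 'watched'. */
    static List<String> notified(String watched, String changed, int treeMaxDepth) {
        int depth = relativeDepth(watched, changed);
        List<String> result = new ArrayList<>();
        if (depth == 0) {
            result.add("NodeCache");          // only the node itself
        }
        if (depth == 1) {
            result.add("PathChildrenCache");  // only first-level children
        }
        if (depth >= 0 && depth <= treeMaxDepth) {
            result.add("TreeCache");          // node plus descendants up to the depth limit
        }
        return result;
    }
}
```

With `/zk-book` as the watched path, a change to `/zk-book/c1` is reported by the PathChildrenCache and TreeCache models, while a change to `/zk-book/c1/d1` is reported only by the TreeCache model, and only if the depth limit is at least 2.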

Let's look at a few examples:

Listening for node data changes

package org.donald.curator.recipes.cache;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.donald.constant.ConfigConstant;

import java.io.UnsupportedEncodingException;

/**
 * @ClassName: NodeCacheSample
 * @Description: node listener, similar to a native Watcher
 * @Author: Donaldhan
 * @Date: 2018-05-16 8:24
 */
@Slf4j
public class NodeCacheSample {
    private static CuratorFramework client;
    public static void main(String[] args) {
        String path = "/zk-book/nodecache";
        try {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
            client =
                    CuratorFrameworkFactory.builder()
                            .connectString(ConfigConstant.IP)
                            .sessionTimeoutMs(ConfigConstant.SESSION_TIMEOUT)
                            .connectionTimeoutMs(ConfigConstant.CONNETING_TIMEOUT)
                            .retryPolicy(retryPolicy)
                            .build();
            client.start();
            log.info("client started...");
            final NodeCache cache = new NodeCache(client,path,false);
            cache.getListenable().addListener(new NodeCacheListener() {
                @Override
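                public void nodeChanged() throws UnsupportedEncodingException {
                    // getCurrentData() returns null once the node has been deleted
                    if (cache.getCurrentData() == null) {
                        log.info("Node deleted: {}", path);
                        return;
                    }
                    log.info("Node data update, new data: {}" , new String(cache.getCurrentData().getData(), ConfigConstant.CHAR_SET_NAME));
                }
            });
            // Do not forget to start the cache
            cache.start(true);
            // Note: parent nodes created by creatingParentsIfNeeded() are persistent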
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(path, "init".getBytes());
            log.info("success create:{}...", path);
            client.setData().forPath( path, "update".getBytes() );
            log.info("success update:{}...", path);
            Thread.sleep( 3000 );
            client.delete().deletingChildrenIfNeeded().forPath( path );
            log.info("success delete:{}...", path);
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }
}

Listening on a node that does not exist

package org.donald.curator.recipes.cache;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.donald.constant.ConfigConstant;

/**
 * @ClassName: NodeCacheNotExistsSample
 * @Description: tests listening on a node that does not exist yet
 * @Author: Donaldhan
 * @Date: 2018-05-16 14:42
 */
@Slf4j
public class NodeCacheNotExistsSample {
    private static CuratorFramework client;
    public static void main(String[] args) {
        String path = "/curator_nodecache_sample";
        try {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
            client =
                    CuratorFrameworkFactory.builder()
                            .connectString(ConfigConstant.IP)
                            .sessionTimeoutMs(ConfigConstant.SESSION_TIMEOUT)
                            .connectionTimeoutMs(ConfigConstant.CONNETING_TIMEOUT)
                            .retryPolicy(retryPolicy)
                            .build();
            client.start();
            log.info("client started...");
            final NodeCache cache = new NodeCache(client, path, false);
            cache.start(true);
            cache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() {
                    log.info("Node data update, new data: {}", new String(cache.getCurrentData().getData()));
                }
            });
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(path, "init".getBytes());
            Thread.sleep(6000);
            // Do not forget to close the node cache after use
            cache.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }
}
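Why does creating a node that was absent at start time trigger the listener? A rough way to picture it is a cached value that starts as "absent" and is compared with each fresh read. The sketch below is a plain-Java model, not Curator code: `NodeViewModel` is a hypothetical name, and it compares only the raw bytes, whereas the real cache compares a richer object that also carries the path and the stat.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Simplified stand-in for a node cache: one cached value plus change notification. Not Curator code. */
final class NodeViewModel {
    // null means "the node does not exist (yet)"
    private final AtomicReference<byte[]> data = new AtomicReference<>(null);
    private final List<Runnable> listeners = new ArrayList<>();

    void addListener(Runnable listener) {
        listeners.add(listener);
    }

    byte[] getCurrentData() {
        return data.get();
    }

    /** Stores the freshly read remote value and notifies listeners only if it differs from the cached one. */
    boolean setNewData(byte[] newData) {
        byte[] previous = data.getAndSet(newData);
        if (Arrays.equals(previous, newData)) {
            return false; // local view already matches the remote view
        }
        for (Runnable listener : listeners) {
            listener.run();
        }
        return true;
    }
}
```

In this model, going from absent to `"init"` counts as a change and fires the listener, a repeated read of `"init"` does not, and a later deletion (back to `null`) fires it again. That last case is why a listener should be prepared for the current data to be `null`.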

Listening on first-level children

package org.donald.curator.recipes.cache;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.donald.constant.ConfigConstant;

/**
 * @ClassName: PathChildrenCacheSample
 * @Description: monitors changes to the children of a path
 * @Author: Donaldhan
 * @Date: 2018-05-16 9:37
 */
@Slf4j
public class PathChildrenCacheSample {
    private static CuratorFramework client;
    public static void main(String[] args) {
        String path = "/zk-book";
        try {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
            client =
                    CuratorFrameworkFactory.builder()
                            .connectString(ConfigConstant.IP)
                            .sessionTimeoutMs(ConfigConstant.SESSION_TIMEOUT)
                            .connectionTimeoutMs(ConfigConstant.CONNETING_TIMEOUT)
                            .retryPolicy(retryPolicy)
                            .build();
            client.start();
            log.info("client started...");
            PathChildrenCache cache = new PathChildrenCache(client, path, true);
            /**
             * After cache is primed with initial values (in the background) a
             * {@link PathChildrenCacheEvent.Type#INITIALIZED} will be posted
             */
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            cache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client,
                                       PathChildrenCacheEvent event) throws Exception {
                    switch (event.getType()) {
                        case CHILD_ADDED:
                            String currentPathValue =  new String(client.getData().storingStatIn(new Stat()).forPath(path),ConfigConstant.CHAR_SET_NAME);
                            log.info("CHILD_ADDED,{},parent node data: {}",event.getData().getPath(), currentPathValue);
                            break;
                        case CHILD_UPDATED:
                            log.info("CHILD_UPDATED,{},value:{}",event.getData().getPath(), new String(event.getData().getData(),ConfigConstant.CHAR_SET_NAME));
                            break;
                        case CHILD_REMOVED:
                            log.info("CHILD_REMOVED,{}",event.getData().getPath());
                            break;
                        default:
                            break;
                    }
                }
            });
            client.create().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
            log.info("success create /zk-book...");
            Thread.sleep( 1000 );
            client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1");
            log.info("success create child c1...");
            Thread.sleep( 1000 );
            client.setData().forPath(path+"/c1","update".getBytes());
            log.info("success update child c1...");
            Thread.sleep( 1000 );
            client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1/d1");
            log.info("success create child c1/d1 ...");
            Thread.sleep( 1000 );
            client.delete().forPath(path+"/c1/d1");
            log.info("success delete child c1/d1 ...");
            Thread.sleep( 1000 );
            client.delete().forPath(path+"/c1");
            log.info("success delete child c1 ...");
            Thread.sleep( 1000 );
            client.delete().forPath(path);
            log.info("success delete child /zk-book...");
            Thread.sleep( 3000 );
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }
}

Listening on first-level children and handling events with an executor

package org.donald.curator.recipes.cache;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.donald.common.threadpool.TaskExecutors;
import org.donald.constant.ConfigConstant;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @ClassName: PathChildrenCacheWithExecutorServiceSample
 * @Description: monitors changes to the children of a path, using a custom thread pool to handle the listener events
 * @Author: Donaldhan
 * @Date: 2018-05-16 9:37
 */
@Slf4j
public class PathChildrenCacheWithExecutorServiceSample {
    private static CuratorFramework client;
    private static ExecutorService exec = TaskExecutors.newFixedThreadPool(2);
    public static void main(String[] args) {
        String path = "/zk-book";
        try {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
            client =
                    CuratorFrameworkFactory.builder()
                            .connectString(ConfigConstant.IP)
                            .sessionTimeoutMs(ConfigConstant.SESSION_TIMEOUT)
                            .connectionTimeoutMs(ConfigConstant.CONNETING_TIMEOUT)
                            .retryPolicy(retryPolicy)
                            .build();
            client.start();
            log.info("client started...");
            log.info( "current thread name:{} ", Thread.currentThread().getName() );
            PathChildrenCache cache = new PathChildrenCache(client, path, true, false, exec);
            /**
             * NORMAL,
             * cache will _not_ be primed. i.e. it will start empty and you will receive
             * events for all nodes added, etc.
             */
            cache.start(PathChildrenCache.StartMode.NORMAL);
            cache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client,
                                       PathChildrenCacheEvent event) throws Exception {
                    switch (event.getType()) {
                        case CHILD_ADDED:
                            String currentPathValue =  new String(client.getData().storingStatIn(new Stat()).forPath(path),ConfigConstant.CHAR_SET_NAME);
                            log.info("CHILD_ADDED,{},parent node data: {}",event.getData().getPath(), currentPathValue);
                            log.info( "current thread name:{} ", Thread.currentThread().getName() );
                            break;
                        case CHILD_UPDATED:
                            log.info("CHILD_UPDATED,{},value:{}",event.getData().getPath(), new String(event.getData().getData(),ConfigConstant.CHAR_SET_NAME));
                            log.info( "current thread name:{} ", Thread.currentThread().getName() );
                            break;
                        case CHILD_REMOVED:
                            log.info("CHILD_REMOVED,{}",event.getData().getPath());
                            log.info( "current thread name:{} ", Thread.currentThread().getName() );
                            break;
                        default:
                            break;
                    }
                }
            });
            client.create().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
            log.info("success create /zk-book...");
            Thread.sleep( 1000 );
            client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1");
            log.info("success create child c1...");
            Thread.sleep( 1000 );
            client.setData().forPath(path+"/c1","update".getBytes());
            log.info("success update child c1...");
            Thread.sleep( 1000 );
            client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1/d1");
            log.info("success create child c1/d1 ...");
            Thread.sleep( 1000 );
            client.delete().forPath(path+"/c1/d1");
            log.info("success delete child c1/d1 ...");
            Thread.sleep( 1000 );
            client.delete().forPath(path+"/c1");
            log.info("success delete child c1 ...");
            Thread.sleep( 1000 );
            client.delete().forPath(path);
            log.info("success delete child /zk-book...");
            Thread.sleep( 3000 );
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }
}
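The sample above logs the thread name in each branch so that one can check which thread runs the callback once a custom executor is passed in. The effect can be reproduced without ZooKeeper. The following plain-Java sketch is an illustration under stated assumptions, not Curator code: `ExecutorDispatchModel`, `namedPool`, and `dispatch` are hypothetical helpers, and the pool name prefix is made up.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Shows that a callback handed to an executor runs on a pool thread, not on the caller. Not Curator code. */
final class ExecutorDispatchModel {
    private ExecutorDispatchModel() {}

    /** Fixed pool whose daemon threads are named prefix-1, prefix-2, ... */
    static ExecutorService namedPool(int threads, String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(threads, runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Runs the listener on the executor and returns the name of the thread that executed it. */
    static String dispatch(ExecutorService executor, Runnable listener) {
        Future<String> ran = executor.submit(() -> {
            listener.run();
            return Thread.currentThread().getName();
        });
        try {
            return ran.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("listener dispatch failed", e);
        }
    }
}
```

A call such as `ExecutorDispatchModel.dispatch(ExecutorDispatchModel.namedPool(2, "cache-listener"), () -> {})` returns a name that starts with `cache-listener-`, which differs from the name of the calling thread. A slow listener therefore occupies a pool thread rather than the thread that submitted the event.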

The examples above mainly use the node listener NodeCache and the first-level children listener PathChildrenCache. The key code is as follows:

  1. Node listener NodeCache
    final NodeCache cache = new NodeCache(client,path,false);
    cache.getListenable().addListener(new NodeCacheListener() {
     @Override
     public void nodeChanged() throws UnsupportedEncodingException {
         log.info("Node data update, new data: {}" , new String(cache.getCurrentData().getData(), ConfigConstant.CHAR_SET_NAME));
     }
    });
    
  2. First-level children listener PathChildrenCache
    PathChildrenCache cache = new PathChildrenCache(client, path, true);
    /**
      * After cache is primed with initial values (in the background) a
      * {@link PathChildrenCacheEvent.Type#INITIALIZED} will be posted
      */
    cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    cache.getListenable().addListener(new PathChildrenCacheListener() {
      @Override
      public void childEvent(CuratorFramework client,
                             PathChildrenCacheEvent event) throws Exception {
          switch (event.getType()) {
              case CHILD_ADDED:
                  String currentPathValue =  new String(client.getData().storingStatIn(new Stat()).forPath(path),ConfigConstant.CHAR_SET_NAME);
                  log.info("CHILD_ADDED,{},parent node data: {}",event.getData().getPath(), currentPathValue);
                  break;
              case CHILD_UPDATED:
                  log.info("CHILD_UPDATED,{},value:{}",event.getData().getPath(), new String(event.getData().getData(),ConfigConstant.CHAR_SET_NAME));
                  break;
              case CHILD_REMOVED:
                  log.info("CHILD_REMOVED,{}",event.getData().getPath());
                  break;
              default:
                  break;
          }
      }
    });
    
// PathChildrenCache variant that handles events on a custom executor
PathChildrenCache cache = new PathChildrenCache(client, path, true, false, exec);
/**
  * NORMAL,
  * cache will _not_ be primed. i.e. it will start empty and you will receive
  * events for all nodes added, etc.
  */
cache.start(PathChildrenCache.StartMode.NORMAL);
cache.getListenable().addListener(new PathChildrenCacheListener() {
     @Override
     public void childEvent(CuratorFramework client,
                            PathChildrenCacheEvent event) throws Exception {
         switch (event.getType()) {
             case CHILD_ADDED:
                 String currentPathValue =  new String(client.getData().storingStatIn(new Stat()).forPath(path),ConfigConstant.CHAR_SET_NAME);
                 log.info("CHILD_ADDED,{},parent node data: {}",event.getData().getPath(), currentPathValue);
                 log.info( "current thread name:{} ", Thread.currentThread().getName() );
                 break;
             case CHILD_UPDATED:
                 log.info("CHILD_UPDATED,{},value:{}",event.getData().getPath(), new String(event.getData().getData(),ConfigConstant.CHAR_SET_NAME));
                 log.info( "current thread name:{} ", Thread.currentThread().getName() );
                 break;
             case CHILD_REMOVED:
                 log.info("CHILD_REMOVED,{}",event.getData().getPath());
                 log.info( "current thread name:{} ", Thread.currentThread().getName() );
                 break;
             default:
                 break;
         }
     }
});

Let's now look at the node listener NodeCache in detail.

Node listener NodeCache

  1. Node listener NodeCache
    final NodeCache cache = new NodeCache(client,path,false);
    cache.getListenable().addListener(new NodeCacheListener() {
     @Override
     public void nodeChanged() throws UnsupportedEncodingException {
         log.info("Node data update, new data: {}" , new String(cache.getCurrentData().getData(), ConfigConstant.CHAR_SET_NAME));
     }
    });
    

First, let's look at how this node cache is defined:

public class NodeCache implements Closeable
{
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final CuratorFramework client; //Curator framework client
    private final String path;
    private final boolean dataIsCompressed;
    private final EnsurePath ensurePath;
    //cached node data
    private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);//cache lifecycle state
    //container of node listeners
    private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
    private final AtomicBoolean isConnected = new AtomicBoolean(true);
    //connection state listener
    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
            if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
            {
                if ( isConnected.compareAndSet(false, true) )
                {
                    try
                    {
                        //re-register the node watcher
                        reset();
                    }
                    catch ( Exception e )
                    {
                        log.error("Trying to reset after reconnection", e);
                    }
                }
            }
            else
            {
                isConnected.set(false);
            }
        }
    };
    //Curator watcher
    private final CuratorWatcher watcher = new CuratorWatcher()
    {
        @Override
        public void process(WatchedEvent event) throws Exception
        {
            reset();
        }
    };

    private enum State
    {
        LATENT,
        STARTED,
        CLOSED
    }

    private final BackgroundCallback backgroundCallback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            processBackgroundResult(event);
        }
    };

    /**
     * @param client curztor client
     * @param path the full path to the node to cache
     */
    public NodeCache(CuratorFramework client, String path)
    {
        this(client, path, false);
    }

    /**
     * @param client curztor client
     * @param path the full path to the node to cache
     * @param dataIsCompressed if true, data in the path is compressed
     */
    public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
    {
        this.client = client;
        this.path = path;
        this.dataIsCompressed = dataIsCompressed;
        ensurePath = client.newNamespaceAwareEnsurePath(path).excludingLast();
    }
}
//Node cache listener
public interface NodeCacheListener
{
    /**
     * Called when a change has occurred
     */
    public void     nodeChanged() throws Exception;
}

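From the above we can see that the node cache NodeCache holds a reference to the Curator framework client CuratorFramework and a listener container listeners (ListenerContainer) that stores the node listeners.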

Next, let's look at how a listener is added to the node cache NodeCache

cache.getListenable().addListener(new NodeCacheListener() {
     @Override
     public void nodeChanged() throws UnsupportedEncodingException {
         log.info("Node data update, new data: {}" , new String(cache.getCurrentData().getData(), ConfigConstant.CHAR_SET_NAME));
     }
});

//NodeCache
/**
  * Return the cache listenable
  *
  * @return listenable
  */
public ListenerContainer<NodeCacheListener> getListenable()
{
     Preconditions.checkState(state.get() != State.CLOSED, "Closed");

     return listeners;
}
//ListenerContainer
public class ListenerContainer<T> implements Listenable<T>
{
    private final Logger                        log = LoggerFactory.getLogger(getClass());
    private final Map<T, ListenerEntry<T>>      listeners = Maps.newConcurrentMap();

    @Override
    public void addListener(T listener)
    {
        addListener(listener, MoreExecutors.sameThreadExecutor());
    }

    @Override
    public void addListener(T listener, Executor executor)
    {
        listeners.put(listener, new ListenerEntry<T>(listener, executor));
    }
    ...
}

As shown above, adding a node listener simply registers it in the node cache's listener container ListenerContainer.
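The container pattern can be sketched in a few lines of plain Java. This is a simplified model, not the Curator class: `SimpleListenerContainer` is a hypothetical name, it stores the executor directly instead of a `ListenerEntry`, and `Runnable::run` stands in for a same-thread executor.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Simplified model of a listener container: each listener is paired with the executor that runs it. */
final class SimpleListenerContainer<T> {
    private final Map<T, Executor> listeners = new ConcurrentHashMap<>();

    /** Registers a listener that is invoked on the calling thread. */
    void addListener(T listener) {
        addListener(listener, Runnable::run);
    }

    /** Registers a listener that is invoked through the given executor. */
    void addListener(T listener, Executor executor) {
        listeners.put(listener, executor); // the listener is the key, so adding it twice keeps one entry
    }

    void removeListener(T listener) {
        listeners.remove(listener);
    }

    int size() {
        return listeners.size();
    }

    /** Applies the action to every listener; one failing listener does not stop the others. */
    void forEach(Consumer<T> action) {
        for (Map.Entry<T, Executor> entry : listeners.entrySet()) {
            T listener = entry.getKey();
            entry.getValue().execute(() -> {
                try {
                    action.accept(listener);
                } catch (RuntimeException e) {
                    System.err.println("Listener threw: " + e);
                }
            });
        }
    }
}
```

Keying the map by the listener itself means that registering the same instance twice does not produce duplicate callbacks, and catching exceptions per listener keeps one faulty listener from silencing the rest.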

Next, let's look at how the node cache is started

/**
    * Start the cache. The cache is not started automatically. You must call this method.
    *
    * @throws Exception errors
    */
   public void     start() throws Exception
   {
       start(false);
   }
/**
     * Same as {@link #start()} but gives the option of doing an initial build
     *
     * @param buildInitial if true, {@link #rebuild()} will be called before this method
     *                     returns in order to get an initial view of the node
     * @throws Exception errors
     */
    public void     start(boolean buildInitial) throws Exception
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        ensurePath.ensure(client.getZookeeperClient());
       //register the connection state listener with the Curator framework client
        client.getConnectionStateListenable().addListener(connectionStateListener);

        if ( buildInitial )
        {   
           //build the initial node data if requested
            internalRebuild();
        }
        //reset: register the watcher
        reset();
    }

// CuratorFrameworkImpl
public class CuratorFrameworkImpl implements CuratorFramework
{
    private final Logger                                                log = LoggerFactory.getLogger(getClass());
    private final CuratorZookeeperClient                                client;
    private final ListenerContainer<CuratorListener>                    listeners;
    private final ListenerContainer<UnhandledErrorListener>             unhandledErrorListeners;
    private final ThreadFactory                                         threadFactory;
    private final BlockingQueue<OperationAndData<?>>                    backgroundOperations;
    private final NamespaceImpl                                         namespace;
    private final ConnectionStateManager                                connectionStateManager;
@Override
   public Listenable<ConnectionStateListener> getConnectionStateListenable()
   {
       return connectionStateManager.getListenable();
   }
   ...
}
//ChildData
public class ChildData implements Comparable<ChildData>
{
    private final String    path;
    private final Stat      stat;
    private final AtomicReference<byte[]>    data;

    public ChildData(String path, Stat stat, byte[] data)
    {
        this.path = path;
        this.stat = stat;
        this.data = new AtomicReference<byte[]>(data);
    }
    ...
}
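The `state.compareAndSet(State.LATENT, State.STARTED)` check in `start()` is what guarantees that the cache is started at most once, even under concurrent calls. A minimal plain-Java sketch of that lifecycle guard follows. `LifecycleModel` is a hypothetical name, it throws `IllegalStateException` directly instead of going through Guava's `Preconditions`, and its `close()` is a simplified assumption of what a real close would do.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Simplified model of a LATENT -> STARTED -> CLOSED lifecycle guarded by compare-and-set. */
final class LifecycleModel {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    /** Succeeds only for the single caller that moves the state from LATENT to STARTED. */
    void start() {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
    }

    /** Closes a started instance; calling it in any other state is a no-op. */
    void close() {
        state.compareAndSet(State.STARTED, State.CLOSED);
    }

    State current() {
        return state.get();
    }
}
```

Because the transition is a single atomic operation, a second `start()` fails, and so does a `start()` after `close()`: the instance is not reusable once closed.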

Two key points in start() deserve a closer look:

  1. Build the initial node data if requested
    if ( buildInitial )
    {   
    //build the initial node data if requested
     internalRebuild();
    }
    private void     internalRebuild() throws Exception
    {
        try
        {
            Stat    stat = new Stat();
            byte[]  bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(path) : client.getData().storingStatIn(stat).forPath(path);
            //replace the cached node data
            data.set(new ChildData(path, stat, bytes));
        }
        catch ( KeeperException.NoNodeException e )
        {
            data.set(null);
        }
    }
    

  2. Reset, which re-registers the watcher

//reset: re-register the watcher
reset();
private void     reset() throws Exception
  {
      if ( (state.get() == State.STARTED) && isConnected.get() )
      {
          client.checkExists().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
      }
  }
//CuratorFrameworkImpl
@Override
public ExistsBuilder checkExists()
{
     Preconditions.checkState(isStarted(), "instance must be started before calling this method");

     return new ExistsBuilderImpl(this);
}
class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>
{
    private final CuratorFrameworkImpl client;//Curator framework implementation
    private Backgrounding backgrounding;
    private Watching watching;

    ExistsBuilderImpl(CuratorFrameworkImpl client)
    {
        this.client = client;
        backgrounding = new Backgrounding();
        watching = new Watching();
    }
    //attach a native Watcher
    @Override
   public BackgroundPathable<Stat> usingWatcher(Watcher watcher)
   {
       watching = new Watching(client, watcher);
       return this;
   }
    //attach a CuratorWatcher
   @Override
   public BackgroundPathable<Stat> usingWatcher(CuratorWatcher watcher)
   {
       watching = new Watching(client, watcher);
       return this;
   }
   //attach a background callback
   @Override
  public Pathable<Stat> inBackground(BackgroundCallback callback, Object context)
  {
      backgrounding = new Backgrounding(callback, context);
      return this;
  }

  @Override
  public Pathable<Stat> inBackground(BackgroundCallback callback, Object context, Executor executor)
  {
      backgrounding = new Backgrounding(client, callback, context, executor);
      return this;
  }

  @Override
  public Pathable<Stat> inBackground(BackgroundCallback callback)
  {
      backgrounding = new Backgrounding(callback);
      return this;
  }

  @Override
  public Pathable<Stat> inBackground(BackgroundCallback callback, Executor executor)
  {
      backgrounding = new Backgrounding(client, callback, executor);
      return this;
  }

}
//Watching
class Watching
{
        private final Watcher       watcher;
        private final boolean       watched;
        ...
}
class Backgrounding
{
    private final boolean               inBackground;//whether the operation runs in the background
    private final Object                context;//context
    private final BackgroundCallback    callback;//callback
}
/**
 * Functor for an async background operation
 */
public interface BackgroundCallback
{
    /**
     * Called when the async background operation completes
     *
     * @param client the client
     * @param event operation result details
     * @throws Exception errors
     */
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}
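ZooKeeper watches fire only once, which is why `reset()` attaches the watcher again every time it runs and why the watcher's `process` method calls `reset()`. The loop can be modeled without a server. The sketch below is plain Java under stated assumptions: `OneShotWatchStore` and `RewatchingCache` are hypothetical names, the store is a single-threaded in-memory map, and a single read stands in for the exists-then-get-data sequence.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for a server whose watches fire once and are then discarded. */
final class OneShotWatchStore {
    private final Map<String, String> nodes = new HashMap<>();
    private final Map<String, List<Runnable>> watches = new HashMap<>();

    /** Reads a node (null if absent) and leaves a watch that fires at most once. */
    String read(String path, Runnable watch) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watch);
        return nodes.get(path);
    }

    /** Writes a value (null deletes the node), then fires and discards the pending watches. */
    void write(String path, String value) {
        if (value == null) {
            nodes.remove(path);
        } else {
            nodes.put(path, value);
        }
        List<Runnable> fired = watches.remove(path); // removed before running, so re-registration is safe
        if (fired != null) {
            for (Runnable watch : fired) {
                watch.run();
            }
        }
    }
}

/** Cache that stays current by re-registering its watch on every notification. */
final class RewatchingCache {
    private final OneShotWatchStore store;
    private final String path;
    private String cached;
    private int refreshes;

    RewatchingCache(OneShotWatchStore store, String path) {
        this.store = store;
        this.path = path;
    }

    /** Re-reads the node and attaches a fresh watch that triggers the next reset. */
    void reset() {
        refreshes++;
        cached = store.read(path, this::reset);
    }

    String get() {
        return cached;
    }

    int refreshes() {
        return refreshes;
    }
}
```

After one initial `reset()`, each write to the watched path triggers exactly one further refresh, so the cache follows every change. A reader that registers a watch once and never re-registers sees only the first change.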

Let's return to the member variables of NodeCache

//connection state listener
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
        {
            if ( isConnected.compareAndSet(false, true) )
            {
                try
                {
                    //re-register the node watcher
                    reset();
                }
                catch ( Exception e )
                {
                    log.error("Trying to reset after reconnection", e);
                }
            }
        }
        else
        {
            isConnected.set(false);
        }
    }
};
//Curator watcher
private final CuratorWatcher watcher = new CuratorWatcher()
{
    @Override
    public void process(WatchedEvent event) throws Exception
    {
          //re-register the node watcher
        reset();
    }
};

private enum State
{
    LATENT,
    STARTED,
    CLOSED
}

private final BackgroundCallback backgroundCallback = new BackgroundCallback()
{
    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
    {
        processBackgroundResult(event);
    }
};
//background result processing
private void processBackgroundResult(CuratorEvent event) throws Exception
    {
        switch ( event.getType() )
        {
            //handle the get-data result
            case GET_DATA:
            {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    ChildData childData = new ChildData(path, event.getStat(), event.getData());
                    setNewData(childData);
                }
                break;
            }
            // Handle the result of an exists check on the node
            case EXISTS:
            {
                if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                {
                    setNewData(null);
                }
                else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    // The node exists: read its data and register the watcher again
                    if ( dataIsCompressed )
                    {
                        client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                    }
                    else
                    {
                        client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                    }
                }
                break;
            }
        }
    }
    private void setNewData(ChildData newData) throws InterruptedException
        {
            // Swap in the latest data and compare it with the previous data to detect a change
            ChildData   previousData = data.getAndSet(newData);
            if ( !Objects.equal(previousData, newData) )
            {
                // Let every listener in the node cache's listener container handle the change
                listeners.forEach
                (
                    new Function<NodeCacheListener, Void>()
                    {
                        @Override
                        public Void apply(NodeCacheListener listener)
                        {
                            try
                            {
                                listener.nodeChanged();
                            }
                            catch ( Exception e )
                            {
                                log.error("Calling listener", e);
                            }
                            return null;
                        }
                    }
                );

                if ( rebuildTestExchanger != null )
                {
                    try
                    {
                        rebuildTestExchanger.exchange(new Object());
                    }
                    catch ( InterruptedException e )
                    {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

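As shown above, starting a NodeCache registers its connection state listener with the connection state manager (ConnectionStateManager) of the CuratorFramework implementation. If requested, it rebuilds the node data, and it registers the node watcher (CuratorWatcher). Whenever the connection state changes back to connected, the CuratorWatcher is registered again. Listeners are notified only when the newly read data differs from the previously cached data.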
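The change-detection step in setNewData can be looked at in isolation. The following is a minimal sketch, not Curator code: `NodeCacheSketch`, its `String` payload, and the `Runnable` listeners are hypothetical stand-ins for NodeCache, ChildData, and NodeCacheListener. It only illustrates the idea of swapping the value atomically and notifying listeners when the value actually differs.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

/** Simplified model of the setNewData logic; not the Curator class itself. */
class NodeCacheSketch {
    private final AtomicReference<String> data = new AtomicReference<>();
    private final List<Runnable> listeners = new CopyOnWriteArrayList<>();

    void addListener(Runnable listener) {
        listeners.add(listener);
    }

    /** Swap in the new value; notify listeners only if it differs from the old one. */
    void setNewData(String newData) {
        String previous = data.getAndSet(newData);
        if (!Objects.equals(previous, newData)) {
            for (Runnable listener : listeners) {
                try {
                    listener.run();
                } catch (RuntimeException e) {
                    // A failing listener must not prevent the others from running.
                    System.err.println("Calling listener failed: " + e);
                }
            }
        }
    }

    String currentData() {
        return data.get();
    }

    public static void main(String[] args) {
        NodeCacheSketch cache = new NodeCacheSketch();
        cache.addListener(() -> System.out.println("node changed: " + cache.currentData()));
        cache.setNewData("v1");  // value changed: listener runs
        cache.setNewData("v1");  // same value: listener is skipped
        cache.setNewData(null);  // node deleted: listener runs
    }
}
```

Passing `null` models the NONODE branch of the exists check, where the cache clears its data and still reports a change.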

Next, let's look at how the registered listeners get triggered. For that we need to go back to the startup of the CuratorFramework implementation:

//CuratorFrameworkImpl
public class CuratorFrameworkImpl implements CuratorFramework
{
    private final Logger                                                log = LoggerFactory.getLogger(getClass());
    private final CuratorZookeeperClient                                client; // ZooKeeper client
    private final ListenerContainer<CuratorListener>                    listeners; // Curator listeners
    private final ListenerContainer<UnhandledErrorListener>             unhandledErrorListeners;
    private final ThreadFactory                                         threadFactory;
    private final BlockingQueue<OperationAndData<?>>                    backgroundOperations;
    private final NamespaceImpl                                         namespace;
    private final ConnectionStateManager                                connectionStateManager; // connection state manager
    private final AtomicReference<AuthInfo>                             authInfo = new AtomicReference<AuthInfo>();
    private final byte[]                                                defaultData;
    private final FailedDeleteManager                                   failedDeleteManager;
    private final CompressionProvider                                   compressionProvider;
    private final ACLProvider                                           aclProvider;
    private final NamespaceFacadeCache                                  namespaceFacadeCache;
    private final NamespaceWatcherMap                                   namespaceWatcherMap = new NamespaceWatcherMap(this);
    private volatile ExecutorService                                    executorService;

    @Override
    public void     start()
    {
        log.info("Starting");
        if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
        {
            IllegalStateException error = new IllegalStateException();
            log.error("Cannot be started more than once", error);
            throw error;
        }

        try
        {
            // Start the connection state manager
            connectionStateManager.start(); // ordering dependency - must be called before client.start()
            // Start the client
            client.start();
            executorService = Executors.newFixedThreadPool(2, threadFactory);  // 1 for listeners, 1 for background ops
            executorService.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        backgroundOperationsLoop();
                        return null;
                    }
                }
            );
        }
        catch ( Exception e )
        {
            handleBackgroundOperationException(null, e);
        }
    }
    public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
   {
       ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
       // Initialize the client
       this.client = new CuratorZookeeperClient
       (
           localZookeeperFactory,
           builder.getEnsembleProvider(),
           builder.getSessionTimeoutMs(),
           builder.getConnectionTimeoutMs(),
           // Key point: the default Watcher handed to the client
           new Watcher()
           {
               @Override
               public void process(WatchedEvent watchedEvent)
               {
                   // Wrap the native WatchedEvent in a CuratorEvent
                   CuratorEvent event = new CuratorEventImpl
                   (
                       CuratorFrameworkImpl.this,
                       CuratorEventType.WATCHED,
                       watchedEvent.getState().getIntValue(),
                       unfixForNamespace(watchedEvent.getPath()),
                       null,
                       null,
                       null,
                       null,
                       null,
                       watchedEvent,
                       null
                   );
                   // Dispatch the event
                   processEvent(event);
               }
           },
           builder.getRetryPolicy(),
           builder.canBeReadOnly()
       );

       listeners = new ListenerContainer<CuratorListener>();
       unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
       backgroundOperations = new DelayQueue<OperationAndData<?>>();
       namespace = new NamespaceImpl(this, builder.getNamespace());
       threadFactory = getThreadFactory(builder);
       // Key point: initialize the connection state manager
       connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
       compressionProvider = builder.getCompressionProvider();
       aclProvider = builder.getAclProvider();
       state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);

       byte[]      builderDefaultData = builder.getDefaultData();
       defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];

       if ( builder.getAuthScheme() != null )
       {
           authInfo.set(new AuthInfo(builder.getAuthScheme(), builder.getAuthValue()));
       }

       failedDeleteManager = new FailedDeleteManager(this);
       namespaceFacadeCache = new NamespaceFacadeCache(this);
   }
   // Handles a CuratorEvent
   private void processEvent(final CuratorEvent curatorEvent)
    {
        if ( curatorEvent.getType() == CuratorEventType.WATCHED )
        {
            validateConnection(curatorEvent.getWatchedEvent().getState());
        }

        listeners.forEach
        (
            new Function<CuratorListener, Void>()
            {
                @Override
                public Void apply(CuratorListener listener)
                {
                    try
                    {
                        TimeTrace trace = client.startTracer("EventListener");
                        listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                        trace.commit();
                    }
                    catch ( Exception e )
                    {
                        logError("Event listener threw exception", e);
                    }
                    return null;
                }
            }
        );
    }
}

As shown above, when the Curator framework implementation CuratorFrameworkImpl starts, it first starts the connection state manager ConnectionStateManager and then starts the client CuratorZookeeperClient.
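The start method above guards itself with a compare-and-set on its state, so a second call fails instead of starting everything twice. A minimal sketch of that guard, under the assumption that we only care about the state transition and the start order; `LifecycleSketch` and the `startOrder` list are hypothetical names introduced for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Toy lifecycle guard modeled on the LATENT -> STARTED transition shown above. */
class LifecycleSketch {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    // Records the order in which the (simulated) components are started.
    final List<String> startOrder = new ArrayList<>();

    void start() {
        // Only the single caller that wins the LATENT -> STARTED transition proceeds.
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        startOrder.add("connectionStateManager"); // must come before the client
        startOrder.add("client");
    }

    State state() {
        return state.get();
    }

    public static void main(String[] args) {
        LifecycleSketch framework = new LifecycleSketch();
        framework.start();
        System.out.println("started: " + framework.startOrder);
        try {
            framework.start();
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
    }
}
```

Because the check and the state change happen in one atomic operation, two threads calling start at the same time cannot both pass the guard.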

Now let's look at how CuratorZookeeperClient starts:

public class CuratorZookeeperClient implements Closeable
{
    private final Logger                            log = LoggerFactory.getLogger(getClass());
    private final ConnectionState                   state; // connection state
    private final AtomicReference<RetryPolicy>      retryPolicy = new AtomicReference<RetryPolicy>();
    private final int                               connectionTimeoutMs;
    private final AtomicBoolean                     started = new AtomicBoolean(false);
    private final AtomicReference<TracerDriver>     tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
    /**
    *
    * @param connectString list of servers to connect to
    * @param sessionTimeoutMs session timeout
    * @param connectionTimeoutMs connection timeout
    * @param watcher default watcher or null
    * @param retryPolicy the retry policy to use
    */
   public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
   {
       this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
   }

   /**
    * @param ensembleProvider the ensemble provider
    * @param sessionTimeoutMs session timeout
    * @param connectionTimeoutMs connection timeout
    * @param watcher default watcher or null
    * @param retryPolicy the retry policy to use
    */
   public CuratorZookeeperClient(EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
   {
       this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
   }

   /**
    * @param zookeeperFactory factory for creating {@link ZooKeeper} instances
    * @param ensembleProvider the ensemble provider
    * @param sessionTimeoutMs session timeout
    * @param connectionTimeoutMs connection timeout
    * @param watcher default watcher or null
    * @param retryPolicy the retry policy to use
    * @param canBeReadOnly if true, allow ZooKeeper client to enter
    *                      read only mode in case of a network partition. See
    *                      {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)}
    *                      for details
    */
   public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly)
   {
       if ( sessionTimeoutMs < connectionTimeoutMs )
       {
           log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
       }

       retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
       ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");

       this.connectionTimeoutMs = connectionTimeoutMs;
       // Key point: initialize the ConnectionState and hand it the watcher
       state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly);
       setRetryPolicy(retryPolicy);
   }
   /**
     * Must be called after construction
     *
     * @throws IOException errors
     */
    public void     start() throws Exception
    {
        log.debug("Starting");

        if ( !started.compareAndSet(false, true) )
        {
            IllegalStateException error = new IllegalStateException();
            log.error("Already started", error);
            throw error;
        }
        // Start the ConnectionState
        state.start();
    }
}

Next, the startup of the connection state, ConnectionState:

class ConnectionState implements Watcher, Closeable
{
    private static final int MAX_BACKGROUND_EXCEPTIONS = 10;
    private static final boolean LOG_EVENTS = Boolean.getBoolean(DebugUtils.PROPERTY_LOG_EVENTS);
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final HandleHolder zooKeeper;
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    private final EnsembleProvider ensembleProvider;
    private final int sessionTimeoutMs;
    private final int connectionTimeoutMs;
    private final AtomicReference<TracerDriver> tracer;
    private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
    // Queue of parent watchers
    private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
    private final AtomicLong instanceIndex = new AtomicLong();
    private volatile long connectionStartMs = 0;

    ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
    {
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.tracer = tracer;
        // Key point: register the parent Watcher
        if ( parentWatcher != null )
        {
            parentWatchers.offer(parentWatcher);
        }

        zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
    }
    // Handles watch events
    @Override
  public void process(WatchedEvent event)
  {
      if ( LOG_EVENTS )
      {
          log.debug("ConnectState watcher: " + event);
      }

      for ( Watcher parentWatcher : parentWatchers )
      {
          TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
          parentWatcher.process(event);
          timeTrace.commit();
      }

      boolean wasConnected = isConnected.get();
      boolean newIsConnected = wasConnected;
      if ( event.getType() == Watcher.Event.EventType.None )
      {
          newIsConnected = checkState(event.getState(), wasConnected);
      }

      if ( newIsConnected != wasConnected )
      {
          isConnected.set(newIsConnected);
          connectionStartMs = System.currentTimeMillis();
      }
  }
   void start() throws Exception
  {
      log.debug("Starting");
      ensembleProvider.start();
      reset();
  }
}

As shown above, when CuratorFrameworkImpl starts, it first starts the connection state manager ConnectionStateManager and then the client CuratorZookeeperClient. The CuratorFrameworkImpl constructor creates the CuratorZookeeperClient and passes it a Watcher that wraps each native WatchedEvent in a CuratorEvent. The key step in starting CuratorZookeeperClient is starting the ConnectionState, which the CuratorZookeeperClient constructor creates and hands that same Watcher. ConnectionState itself implements Watcher: each time it receives a WatchedEvent, it forwards the event to the Watcher that was passed in through CuratorZookeeperClient. The actual handling logic lives in CuratorFrameworkImpl, in the processEvent method, which iterates over the listeners in CuratorFrameworkImpl's listener container and lets each one handle the CuratorEvent.
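The delegation chain described above (connection state forwards the raw event, the framework wraps it and fans it out to listeners) can be modeled without ZooKeeper at all. This is a sketch under stated assumptions: `RawEvent`, `WrappedEvent`, `ConnectionStateModel`, and `FrameworkModel` are hypothetical stand-ins for WatchedEvent, CuratorEvent, ConnectionState, and CuratorFrameworkImpl, and `Consumer` plays the role of both Watcher and CuratorListener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

class WatcherChainSketch {
    /** Stand-in for the native WatchedEvent. */
    static final class RawEvent {
        final String path;
        RawEvent(String path) { this.path = path; }
    }

    /** Stand-in for CuratorEvent: the raw event plus a type tag. */
    static final class WrappedEvent {
        final String type;
        final String path;
        WrappedEvent(String type, String path) { this.type = type; this.path = path; }
    }

    /** Plays the role of ConnectionState: forwards every event to its parent watchers. */
    static final class ConnectionStateModel {
        private final Queue<Consumer<RawEvent>> parentWatchers = new ConcurrentLinkedQueue<>();

        ConnectionStateModel(Consumer<RawEvent> parentWatcher) {
            if (parentWatcher != null) {
                parentWatchers.offer(parentWatcher);
            }
        }

        void process(RawEvent event) {
            for (Consumer<RawEvent> watcher : parentWatchers) {
                watcher.accept(event);
            }
        }
    }

    /** Plays the role of CuratorFrameworkImpl: wraps raw events and notifies listeners. */
    static final class FrameworkModel {
        final List<Consumer<WrappedEvent>> listeners = new ArrayList<>();
        final ConnectionStateModel connectionState;

        FrameworkModel() {
            // The watcher is created here and handed down, as in the constructor above.
            connectionState = new ConnectionStateModel(
                raw -> processEvent(new WrappedEvent("WATCHED", raw.path)));
        }

        private void processEvent(WrappedEvent event) {
            for (Consumer<WrappedEvent> listener : listeners) {
                listener.accept(event);
            }
        }
    }

    public static void main(String[] args) {
        FrameworkModel framework = new FrameworkModel();
        framework.listeners.add(e -> System.out.println(e.type + " " + e.path));
        framework.connectionState.process(new RawEvent("/demo"));
    }
}
```

The point of the indirection is that the lowest layer knows nothing about listeners; it only calls back into whatever watcher the framework gave it at construction time.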

Back to the startup of the connection state manager:

/**
 * Used internally to manage connection state
 */
public class ConnectionStateManager implements Closeable
{
    private static final int QUEUE_SIZE;

    static
    {
        int size = 25;
        String property = System.getProperty("ConnectionStateManagerSize", null);
        if ( property != null )
        {
            try
            {
                size = Integer.parseInt(property);
            }
            catch ( NumberFormatException ignore )
            {
                // ignore
            }
        }
        QUEUE_SIZE = size;
    }

    private final Logger log = LoggerFactory.getLogger(getClass());
    // Queue of connection state change events
    private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
    private final CuratorFramework client;
    // Container of connection state listeners
    private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
    private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
    private final ExecutorService service;
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);

    // guarded by sync
    private ConnectionState currentConnectionState;

    private enum State
    {
        LATENT,
        STARTED,
        CLOSED
    }

    /**
     * @param client        the client
     * @param threadFactory thread factory to use or null for a default
     */
    public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory)
    {
        this.client = client;
        if ( threadFactory == null )
        {
            threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
        }
        service = Executors.newSingleThreadExecutor(threadFactory);
    }

    /**
     * Start the manager
     */
    public void start()
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        // Submit the task that processes connection state events
        service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        processEvents();
                        return null;
                    }
                }
            );
    }

    @Override
    public void close()
    {
        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
        {
            service.shutdownNow();
            listeners.clear();
        }
    }

    /**
     * Return the listenable
     *
     * @return listenable
     */
    public ListenerContainer<ConnectionStateListener> getListenable()
    {
        return listeners;
    }
    // Processes connection state events
    private void processEvents()
 {
     try
     {
         while ( !Thread.currentThread().isInterrupted() )
         {
             // Take the next connection state event from the queue (blocks while it is empty)
             final ConnectionState newState = eventQueue.take();

             if ( listeners.size() == 0 )
             {
                 log.warn("There are no ConnectionStateListeners registered.");
             }

             listeners.forEach
                 (
                     new Function<ConnectionStateListener, Void>()
                     {
                         @Override
                         public Void apply(ConnectionStateListener listener)
                         {
                             // Let the connection state listener handle the state change
                             listener.stateChanged(client, newState);
                             return null;
                         }
                     }
                 );
         }
     }
     catch ( InterruptedException e )
     {
         Thread.currentThread().interrupt();
     }
 }
}

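As shown above, starting the connection state manager submits a task that takes events from the connection state event queue (a BlockingQueue) and dispatches each one to the listeners in the connection state listener container (ListenerContainer).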
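The queue-plus-consumer-thread pattern used by the connection state manager can be sketched with plain `java.util.concurrent` classes. This is an illustrative model only: `StateManagerSketch`, its `ConnState` enum, and the `post` method are hypothetical names, and the capacity of 25 simply mirrors the default shown above.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Toy model of a manager that dispatches queued state changes on its own thread. */
class StateManagerSketch implements AutoCloseable {
    enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    private final BlockingQueue<ConnState> eventQueue = new ArrayBlockingQueue<>(25);
    private final List<Consumer<ConnState>> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService service = Executors.newSingleThreadExecutor();

    void addListener(Consumer<ConnState> listener) {
        listeners.add(listener);
    }

    /** Starts the single consumer thread. */
    void start() {
        service.execute(this::processEvents);
    }

    /** Producer side: returns false if the queue is full. */
    boolean post(ConnState newState) {
        return eventQueue.offer(newState);
    }

    private void processEvents() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                ConnState newState = eventQueue.take(); // blocks until an event arrives
                for (Consumer<ConnState> listener : listeners) {
                    listener.accept(newState);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdownNow() ends the loop this way
        }
    }

    @Override
    public void close() {
        service.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(2);
        try (StateManagerSketch manager = new StateManagerSketch()) {
            manager.addListener(s -> { System.out.println("state: " + s); done.countDown(); });
            manager.start();
            manager.post(ConnState.CONNECTED);
            manager.post(ConnState.SUSPENDED);
            done.await(5, TimeUnit.SECONDS);
        }
    }
}
```

Since a single thread drains the queue, listeners see state changes in the order they were posted, and a slow listener delays later events rather than reordering them.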

Combining this with the registration of node listeners, let's summarize:

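Adding a node listener actually registers it in the listener container (ListenerContainer) held by the node cache. Starting a NodeCache registers its connection state listener with the connection state manager (ConnectionStateManager) of the CuratorFramework implementation. If requested, it rebuilds the node data, and it registers the node watcher (CuratorWatcher). Whenever the connection state changes back to connected, the CuratorWatcher is registered again.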

When CuratorFrameworkImpl starts, it first starts the ConnectionStateManager and then the CuratorZookeeperClient. The CuratorFrameworkImpl constructor creates the client and passes it a Watcher that wraps each native WatchedEvent in a CuratorEvent. The key step in starting the client is starting the ConnectionState, which the client's constructor creates and hands that Watcher. ConnectionState implements Watcher itself and forwards every event it receives to the Watcher passed in through the client. The actual handling happens in CuratorFrameworkImpl's processEvent method, which iterates over the listeners in the framework's listener container and lets each one handle the CuratorEvent.

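Starting the connection state manager submits a task that takes events from the connection state event queue (a BlockingQueue) and dispatches them to the listeners in the connection state listener container (ListenerContainer).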

Next, let's look at child-node listening with PathChildrenCache.

First-level child listener PathChildrenCache

  1. First-level child listener PathChildrenCache
    PathChildrenCache cache = new PathChildrenCache(client, path, true);
    /**
      * After cache is primed with initial values (in the background) a
      * {@link PathChildrenCacheEvent.Type#INITIALIZED} will be posted
      */
    cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    cache.getListenable().addListener(new PathChildrenCacheListener() {
      @Override
      public void childEvent(CuratorFramework client,
                             PathChildrenCacheEvent event) throws Exception {
          switch (event.getType()) {
              case CHILD_ADDED:
                  String currentPathValue =  new String(client.getData().storingStatIn(new Stat()).forPath(path),ConfigConstant.CHAR_SET_NAME);
                  log.info("CHILD_ADDED,{},parent node data: {}",event.getData().getPath(), currentPathValue);
                  break;
              case CHILD_UPDATED:
                  log.info("CHILD_UPDATED,{},value:{}",event.getData().getPath(), new String(event.getData().getData(),ConfigConstant.CHAR_SET_NAME));
                  break;
              case CHILD_REMOVED:
                  log.info("CHILD_REMOVED,{}",event.getData().getPath());
                  break;
              default:
                  break;
          }
      }
    });
    
PathChildrenCache cache = new PathChildrenCache(client, path, true, false, exec);
/**
  * NORMAL,
  * cache will _not_ be primed. i.e. it will start empty and you will receive
  * events for all nodes added, etc.
  */
cache.start(PathChildrenCache.StartMode.NORMAL);
cache.getListenable().addListener(new PathChildrenCacheListener() {
     @Override
     public void childEvent(CuratorFramework client,
                            PathChildrenCacheEvent event) throws Exception {
         switch (event.getType()) {
             case CHILD_ADDED:
                 String currentPathValue =  new String(client.getData().storingStatIn(new Stat()).forPath(path),ConfigConstant.CHAR_SET_NAME);
                 log.info("CHILD_ADDED,{},parent node data: {}",event.getData().getPath(), currentPathValue);
                 log.info( "current thread name:{} ", Thread.currentThread().getName() );
                 break;
             case CHILD_UPDATED:
                 log.info("CHILD_UPDATED,{},value:{}",event.getData().getPath(), new String(event.getData().getData(),ConfigConstant.CHAR_SET_NAME));
                 log.info( "current thread name:{} ", Thread.currentThread().getName() );
                 break;
             case CHILD_REMOVED:
                 log.info("CHILD_REMOVED,{}",event.getData().getPath());
                 log.info( "current thread name:{} ", Thread.currentThread().getName() );
                 break;
             default:
                 break;
         }
     }
});

Let's look at the definition of PathChildrenCache:

public class PathChildrenCache implements Closeable
{
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final CuratorFramework client; // framework client
    private final String path;
    private final CloseableExecutorService executorService; // executor that runs the event operations
    private final boolean cacheData;
    private final boolean dataIsCompressed;
    private final EnsurePath ensurePath;
    private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>(); // container of child listeners
    private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap(); // currently cached child data, keyed by path
    private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();
    private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap()); // set of pending operations
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);

    private enum State
    {
        LATENT,
        STARTED,
        CLOSED
    }

    private static final ChildData NULL_CHILD_DATA = new ChildData(null, null, null);
    private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");

   /**
    * @param client the client
    * @param path   path to watch
    * @param mode   caching mode
    * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean)} instead
    */
   @SuppressWarnings("deprecation")
   public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode)
   {
       this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
   }

   /**
    * @param client        the client
    * @param path          path to watch
    * @param mode          caching mode
    * @param threadFactory factory to use when creating internal threads
    * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean, ThreadFactory)} instead
    */
   @SuppressWarnings("deprecation")
   public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory)
   {
       this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
   }

   /**
    * @param client    the client
    * @param path      path to watch
    * @param cacheData if true, node contents are cached in addition to the stat
    */
   public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
   {
       this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
   }

   /**
    * @param client        the client
    * @param path          path to watch
    * @param cacheData     if true, node contents are cached in addition to the stat
    * @param threadFactory factory to use when creating internal threads
    */
   public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)
   {
       this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
   }

   /**
    * @param client           the client
    * @param path             path to watch
    * @param cacheData        if true, node contents are cached in addition to the stat
    * @param dataIsCompressed if true, data in the path is compressed
    * @param threadFactory    factory to use when creating internal threads
    */
   public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)
   {
       this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
   }

   /**
    * @param client           the client
    * @param path             path to watch
    * @param cacheData        if true, node contents are cached in addition to the stat
    * @param dataIsCompressed if true, data in the path is compressed
    * @param executorService  ExecutorService to use for the PathChildrenCache's background thread
    */
   public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
   {
       this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService));
   }

   /**
    * @param client           the client
    * @param path             path to watch
    * @param cacheData        if true, node contents are cached in addition to the stat
    * @param dataIsCompressed if true, data in the path is compressed
    * @param executorService  Closeable ExecutorService to use for the PathChildrenCache's background thread
    */
   public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
   {
       this.client = client;
       this.path = path;
       this.cacheData = cacheData;
       this.dataIsCompressed = dataIsCompressed;
       this.executorService = executorService;
       ensurePath = client.newNamespaceAwareEnsurePath(path);
   }
}
/**
 * Listener for PathChildrenCache changes
 */
public interface PathChildrenCacheListener
{
    /**
     * Called when a change has occurred
     *
     * @param client the client
     * @param event describes the change
     * @throws Exception errors
     */
    public void     childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;
}
public class PathChildrenCacheEvent
{
    private final Type type;
    private final ChildData data;

    /**
     * Type of change
     */
    public enum Type
    {
        /**
         * A child was added to the path
         */
        CHILD_ADDED,

        /**
         * A child's data was changed
         */
        CHILD_UPDATED,
        CHILD_REMOVED,
        CONNECTION_SUSPENDED,
        CONNECTION_RECONNECTED,
        CONNECTION_LOST,
        INITIALIZED
    }
}
/**
 * Decoration on an ExecutorService that tracks created futures and provides
 * a method to close futures created via this class
 */
public class CloseableExecutorService implements Closeable
{
    private final Logger log = LoggerFactory.getLogger(CloseableExecutorService.class);
    private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap());
    private final ExecutorService executorService;
    private final boolean shutdownOnClose;
    protected final AtomicBoolean isOpen = new AtomicBoolean(true);
    /**
    * @param executorService the service to decorate
    */
   public CloseableExecutorService(ExecutorService executorService)
   {
      this(executorService, false);
   }

   /**
    * @param executorService the service to decorate
    * @param shutdownOnClose if true, shutdown the executor service when this is closed
    */
   public CloseableExecutorService(ExecutorService executorService, boolean shutdownOnClose)
   {
       this.executorService = executorService;
       this.shutdownOnClose = shutdownOnClose;
   }
    ...
}

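As shown above, the main member variables of the child listener PathChildrenCache are the framework client CuratorFramework, the child listener container ListenerContainer<PathChildrenCacheListener>, the event executor CloseableExecutorService, and the operation set Set<Operation>.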
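The Javadoc above describes CloseableExecutorService as a decoration that tracks the futures it creates so they can be closed together. A minimal sketch of that idea follows. It is not the Curator class: `TrackingExecutorSketch` is a hypothetical name, it accepts only `Runnable` tasks, and for brevity it does not prune futures that have already completed.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy decorator that remembers submitted futures and cancels them on close. */
class TrackingExecutorSketch implements AutoCloseable {
    private final Set<Future<?>> futures = ConcurrentHashMap.newKeySet();
    private final ExecutorService executorService;
    private final boolean shutdownOnClose;
    private final AtomicBoolean isOpen = new AtomicBoolean(true);

    TrackingExecutorSketch(ExecutorService executorService, boolean shutdownOnClose) {
        this.executorService = executorService;
        this.shutdownOnClose = shutdownOnClose;
    }

    Future<?> submit(Runnable task) {
        if (!isOpen.get()) {
            throw new IllegalStateException("Executor has been closed");
        }
        Future<?> future = executorService.submit(task);
        futures.add(future);
        return future;
    }

    /** Cancels only the futures created through this wrapper. */
    @Override
    public void close() {
        if (isOpen.compareAndSet(true, false)) {
            for (Future<?> future : futures) {
                future.cancel(true);
            }
            futures.clear();
            if (shutdownOnClose) {
                executorService.shutdownNow();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        TrackingExecutorSketch executor = new TrackingExecutorSketch(pool, true);
        Future<?> future = executor.submit(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        executor.close();
        System.out.println("cancelled: " + future.isCancelled());
    }
}
```

The `shutdownOnClose` flag matters when the executor is shared: with `false`, closing the wrapper cancels its own tasks but leaves the underlying pool usable by others, which matches the single-argument constructor shown above.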

Now let's look at how PathChildrenCache starts:

/**
 * Start the cache. The cache is not started automatically. You must call this method.
 *
 * @throws Exception errors
 */
public void start() throws Exception
{
    start(StartMode.NORMAL);
}

/**
 * Same as {@link #start()} but gives the option of doing an initial build
 *
 * @param buildInitial if true, {@link #rebuild()} will be called before this method
 *                     returns in order to get an initial view of the node; otherwise,
 *                     the cache will be initialized asynchronously
 * @throws Exception errors
 * @deprecated use {@link #start(StartMode)}
 */
public void start(boolean buildInitial) throws Exception
{
    start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL);
}

/**
 * Method of priming cache on {@link PathChildrenCache#start(StartMode)}
 */
public enum StartMode
{
    /**
     * cache will _not_ be primed. i.e. it will start empty and you will receive
     * events for all nodes added, etc.
     * (In other words, the cache starts empty and an added event is delivered for every node.)
     */
    NORMAL,

    /**
     * {@link PathChildrenCache#rebuild()} will be called before this method returns in
     * order to get an initial view of the node.
     */
    BUILD_INITIAL_CACHE,

    /**
     * After cache is primed with initial values (in the background) a
     * {@link PathChildrenCacheEvent.Type#INITIALIZED} will be posted
     * Note: the event is posted once the cache has been initialized.
     */
    POST_INITIALIZED_EVENT
}

/**
 * Start the cache. The cache is not started automatically. You must call this method.
 *
 * @param mode Method for priming the cache
 * @throws Exception errors
 */
public void start(StartMode mode) throws Exception
{
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
    mode = Preconditions.checkNotNull(mode, "mode cannot be null");
    // register the connection state listener
    client.getConnectionStateListenable().addListener(connectionStateListener);

    switch ( mode )
    {
        case NORMAL:
        {
            // offer a standard refresh operation
            offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
            break;
        }

        case BUILD_INITIAL_CACHE:
        {
            // rebuild the cache synchronously
            rebuild();
            break;
        }

        case POST_INITIALIZED_EVENT:
        {
            // reset the initial set, then offer a refresh in POST_INITIALIZED mode
            initialSet.set(Maps.<String, ChildData>newConcurrentMap());
            offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED));
            break;
        }
    }
}

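Three points in the start method deserve attention.

  1. Registering the connection state listener
    // register the connection state listener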
    client.getConnectionStateListenable().addListener(connectionStateListener);
    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
    {
      @Override
      public void stateChanged(CuratorFramework client, ConnectionState newState)
      {
          handleStateChange(newState);
      }
    };
    private void handleStateChange(ConnectionState newState)
    {
     switch ( newState )
     {
     case SUSPENDED:
     {
         offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null)));
         break;
     }
    
     case LOST: // connection lost
     {
         offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_LOST, null)));
         break;
     }
    
     case RECONNECTED: // reconnected
     {
         try
         {
             offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
             offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null)));
         }
         catch ( Exception e )
         {
             handleException(e);
         }
         break;
     }
     }
    }
    

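    As shown above, the ConnectionStateListener offers event operations (EventOperation) and refresh operations (RefreshOperation) to the operation set according to the connection state. Let's look at the event operation first.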

class EventOperation implements Operation
{
    private final PathChildrenCache cache;
    private final PathChildrenCacheEvent event;

    EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event)
    {
        this.cache = cache;
        this.event = event;
    }

    @Override
    public void invoke()
    {
        cache.callListeners(event);
    }

    @Override
    public String toString()
    {
        return "EventOperation{" +
            "event=" + event +
            '}';
    }
}
interface Operation
{
    public void     invoke() throws Exception;
}
//PathChildrenCache
void callListeners(final PathChildrenCacheEvent event)
{
    listeners.forEach
        (
            new Function<PathChildrenCacheListener, Void>()
            {
                @Override
                public Void apply(PathChildrenCacheListener listener)
                {
                    try
                    {
                        // notify the listener of the child node event
                        listener.childEvent(client, event);
                    }
                    catch ( Exception e )
                    {
                        handleException(e);
                    }
                    return null;
                }
            }
        );
}

As shown above, an event operation mainly invokes the listeners' child event callback. Next, the refresh operation.

class RefreshOperation implements Operation
{
    private final PathChildrenCache cache;
    private final PathChildrenCache.RefreshMode mode;

    RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode)
    {
        this.cache = cache;
        this.mode = mode;
    }

    @Override
    public void invoke() throws Exception
    {
        cache.refresh(mode);
    }
    ...
}
//PathChildrenCache
void refresh(final RefreshMode mode) throws Exception
{
    ensurePath.ensure(client.getZookeeperClient());
    // background callback
    final BackgroundCallback callback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
            {
                // process the returned list of children
                processChildren(event.getChildren(), mode);
            }
        }
    };
    // the key step: fetch the children in the background and re-register the children watcher
    client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
}
private void processChildren(List<String> children, RefreshMode mode) throws Exception
  {
      List<String> fullPaths = Lists.newArrayList(Lists.transform
          (
              children,
              new Function<String, String>()
              {
                  @Override
                  public String apply(String child)
                  {
                      return ZKPaths.makePath(path, child);
                  }
              }
          ));
      Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
      removedNodes.removeAll(fullPaths);

      for ( String fullPath : removedNodes )
      {
          remove(fullPath);
      }

      for ( String name : children )
      {
          String fullPath = ZKPaths.makePath(path, name);

          if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) )
          {
              // fetch data and stat for the first-level child and watch it
              getDataAndStat(fullPath);
          }

          updateInitialSet(name, NULL_CHILD_DATA);
      }
      maybeOfferInitializedEvent(initialSet.get());
}
// fetch data and stat for a first-level child, registering a watcher on it
void getDataAndStat(final String fullPath) throws Exception
{
     BackgroundCallback existsCallback = new BackgroundCallback()
     {
         @Override
         public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
         {
             applyNewData(fullPath, event.getResultCode(), event.getStat(), null);
         }
     };

     BackgroundCallback getDataCallback = new BackgroundCallback()
     {
         @Override
         public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
         {
             applyNewData(fullPath, event.getResultCode(), event.getStat(), event.getData());
         }
     };

     if ( cacheData )
     {
         if ( dataIsCompressed )
         {
             // familiar territory: register the node Watcher
             client.getData().decompressed().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath(fullPath);
         }
         else
         {
             client.getData().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath(fullPath);
         }
     }
     else
     {
         client.checkExists().usingWatcher(dataWatcher).inBackground(existsCallback).forPath(fullPath);
     }
}
// detect child add and update events
private void applyNewData(String fullPath, int resultCode, Stat stat, byte[] bytes)
{
    if ( resultCode == KeeperException.Code.OK.intValue() ) // otherwise - node must have dropped or something - we should be getting another event
    {
        ChildData data = new ChildData(fullPath, stat, bytes);
        ChildData previousData = currentData.put(fullPath, data);
        if ( previousData == null ) // i.e. new
        {
            offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data)));
        }
        else if ( previousData.getStat().getVersion() != stat.getVersion() )
        {
            offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data)));
        }
        updateInitialSet(ZKPaths.getNodeFromPath(fullPath), data);
    }
}
// watcher registered on each child node
private final Watcher dataWatcher = new Watcher()
   {
       @Override
       public void process(WatchedEvent event)
       {
           try
           {
               if ( event.getType() == Event.EventType.NodeDeleted )
               {
                   remove(event.getPath());
               }
               else if ( event.getType() == Event.EventType.NodeDataChanged )
               {
                   offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));
               }
           }
           catch ( Exception e )
           {
               handleException(e);
           }
       }
};
class GetDataOperation implements Operation
{
    private final PathChildrenCache cache;
    private final String fullPath;

    GetDataOperation(PathChildrenCache cache, String fullPath)
    {
        this.cache = cache;
        this.fullPath = fullPath;
    }

    @Override
    public void invoke() throws Exception
    {
        // re-fetch the data and re-register the watcher
        cache.getDataAndStat(fullPath);
    }
    ...
}

Now back to the offerOperation method, which queues an operation.

// add the operation to the operation set and submit it to the executor
private void offerOperation(final Operation operation)
{
    if ( operationsQuantizer.add(operation) )
    {
        submitToExecutor
        (
            new Runnable()
            {
                @Override
                public void run()
                {
                    try
                    {
                        operationsQuantizer.remove(operation);
                        operation.invoke();
                    }
                    catch ( Exception e )
                    {
                        handleException(e);
                    }
                }
            }
        );
    }
}
private synchronized void submitToExecutor(final Runnable command)
  {
      if ( state.get() == State.STARTED )
      {
          executorService.submit(command);
      }
  }

As shown above, a refresh operation mainly produces the child added and updated events, and re-registers the watchers on the children.
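The offerOperation pattern can be illustrated with the JDK alone. The sketch below keeps a set of pending operations and submits an operation only if an equal one is not already waiting, so a burst of identical refresh requests collapses into one. The class names `OperationQueueSketch` and `Refresh` are hypothetical, and it assumes operations define `equals`/`hashCode` (the elided parts of RefreshOperation are not shown in the source).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical sketch of the "operation set + executor" pattern; not Curator's code.
class OperationQueueSketch {
    interface Operation {
        void invoke() throws Exception;
    }

    // A refresh request identified only by its mode, so equal modes deduplicate.
    static final class Refresh implements Operation {
        private final String mode;
        private final Runnable action;

        Refresh(String mode, Runnable action) {
            this.mode = mode;
            this.action = action;
        }

        @Override
        public void invoke() {
            action.run();
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Refresh && ((Refresh) o).mode.equals(mode);
        }

        @Override
        public int hashCode() {
            return mode.hashCode();
        }
    }

    private final Set<Operation> pending = ConcurrentHashMap.newKeySet();
    private final Executor executor;

    OperationQueueSketch(Executor executor) {
        this.executor = executor;
    }

    // Returns false when an equal operation is already waiting to run.
    boolean offer(Operation operation) {
        if (!pending.add(operation)) {
            return false;
        }
        executor.execute(() -> {
            // Remove before invoking, so a request arriving during the run is queued again.
            pending.remove(operation);
            try {
                operation.invoke();
            } catch (Exception e) {
                e.printStackTrace(); // stand-in for handleException(e)
            }
        });
        return true;
    }
}
```

Removing the operation from the set before invoking it mirrors the order in the source: an event that arrives while a refresh is running is not lost, because it can be queued again immediately.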

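  2. Offering the refresh operation
    // offer a standard refresh operation
    offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
    

    With the first point understood, this one should be easy to follow.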

  3. Rebuilding the cache

// rebuild the cache
rebuild();
@VisibleForTesting
 volatile Exchanger<Object> rebuildTestExchanger;
/**
 * NOTE: this is a BLOCKING method. Completely rebuild the internal cache by querying
 * for all needed data WITHOUT generating any events to send to listeners.
 *
 * @throws Exception errors
 */
public void rebuild() throws Exception
{
    Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");

    ensurePath.ensure(client.getZookeeperClient());

    clear();
    // fetch the child paths again
    List<String> children = client.getChildren().forPath(path);
    for ( String child : children )
    {
        String fullPath = ZKPaths.makePath(path, child);
        internalRebuildNode(fullPath);

        if ( rebuildTestExchanger != null )
        {
            rebuildTestExchanger.exchange(new Object());
        }
    }

    // this is necessary so that any updates that occurred while rebuilding are taken
    offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
}
/**
  * Clears the current data without beginning a new query and without generating any events
  * for listeners.
  */
public void clear()
{
     currentData.clear();
}
private void internalRebuildNode(String fullPath) throws Exception
{
    if ( cacheData )
    {
        try
        {
            Stat stat = new Stat();
            byte[] bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(fullPath) : client.getData().storingStatIn(stat).forPath(fullPath);
            currentData.put(fullPath, new ChildData(fullPath, stat, bytes));
        }
        catch ( KeeperException.NoNodeException ignore )
        {
            // node no longer exists - remove it
            currentData.remove(fullPath);
        }
    }
    else
    {
        Stat stat = client.checkExists().forPath(fullPath);
        if ( stat != null )
        {
            currentData.put(fullPath, new ChildData(fullPath, stat, null));
        }
        else
        {
            // node no longer exists - remove it
            currentData.remove(fullPath);
        }
    }
}

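As shown above, a rebuild mainly reloads the cached path data and then offers a refresh operation.

To sum up: when the first-level child cache PathChildrenCache starts, it registers the connection state listener ConnectionStateListener, which offers event operations (EventOperation) and refresh operations (RefreshOperation) to the operation set according to the connection state. An event operation invokes the listeners' child event callback; a refresh operation produces the child added and updated events and re-registers the watchers on the children. The start mode then decides what happens next: NORMAL offers a standard refresh, POST_INITIALIZED_EVENT offers a refresh that posts an INITIALIZED event once the cache is primed, and BUILD_INITIAL_CACHE rebuilds the cache, that is, it reloads the cached path data and then offers a refresh.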
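The bookkeeping done by processChildren and applyNewData boils down to a set difference plus a version comparison. The following is a simplified, JDK-only sketch under stated assumptions: the cache is modelled as a map from path to data version, the fresh view from ZooKeeper is assumed to be available for every child at once, and removal is assumed to emit CHILD_REMOVED (the source does not show `remove()`). The class name `ChildDiffSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of how a refresh turns two views into events; not Curator's code.
class ChildDiffSketch {
    // cache: path -> last seen data version (mutated to match the fresh view)
    // fresh: path -> data version currently reported by the server
    static List<String> refresh(Map<String, Integer> cache, Map<String, Integer> fresh) {
        List<String> events = new ArrayList<>();

        // Children that are cached but no longer listed have been removed.
        Set<String> removed = new TreeSet<>(cache.keySet());
        removed.removeAll(fresh.keySet());
        for (String path : removed) {
            cache.remove(path);
            events.add("CHILD_REMOVED " + path);
        }

        // For the rest: no previous entry means added, a different version means updated.
        for (Map.Entry<String, Integer> entry : new TreeMap<>(fresh).entrySet()) {
            Integer previous = cache.put(entry.getKey(), entry.getValue());
            if (previous == null) {
                events.add("CHILD_ADDED " + entry.getKey());
            } else if (!previous.equals(entry.getValue())) {
                events.add("CHILD_UPDATED " + entry.getKey());
            }
        }
        return events;
    }
}
```

In the real cache the data for each child arrives asynchronously through background callbacks, and children already in the cache are only re-read in FORCE_GET_DATA_AND_STAT mode; the sketch folds that into one synchronous pass to keep the comparison logic visible.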

Finally, let's look at how the client framework is closed.

@Override
public void     close()
{
    log.debug("Closing");
    if ( state.compareAndSet(CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED) )
    {
        // notify the listeners with a CLOSING event
        listeners.forEach
            (
                new Function<CuratorListener, Void>()
                {
                    @Override
                    public Void apply(CuratorListener listener)
                    {
                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null);
                        try
                        {
                            listener.eventReceived(CuratorFrameworkImpl.this, event);
                        }
                        catch ( Exception e )
                        {
                            log.error("Exception while sending Closing event", e);
                        }
                        return null;
                    }
                }
            );
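        // clear the listeners, then close the connection state manager and the client
        listeners.clear();
        unhandledErrorListeners.clear();
        connectionStateManager.close(); // close the connection state manager
        client.close(); // close the client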
        namespaceWatcherMap.close();
        if ( executorService != null )
        {
            executorService.shutdownNow();
        }
    }
}

Next, closing the connection state manager:

//ConnectionStateManager
@Override
public void close()
{
    if ( state.compareAndSet(State.STARTED, State.CLOSED) )
    {
        service.shutdownNow();
        listeners.clear();
    }
}

Then closing the client:

//CuratorZookeeperClient
/**
  * Close the client
  */
 public void     close()
 {
     log.debug("Closing");

     started.set(false);
     try
     {
         state.close();
     }
     catch ( IOException e )
     {
         log.error("", e);
     }
}
//ConnectionState
@Override
public void close() throws IOException
{
    log.debug("Closing");

    CloseableUtils.closeQuietly(ensembleProvider);
    try
    {
        zooKeeper.closeAndClear();
    }
    catch ( Exception e )
    {
        throw new IOException(e);
    }
    finally
    {
        isConnected.set(false);
    }
}

As shown above, closing the client framework mainly clears the listeners, closes the connection state manager, and closes the zk client.
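Both `start()` and `close()` in the code above are guarded by a compare-and-set on a state field, which is what makes a second call harmless. A minimal JDK-only sketch of that guard follows; the class `ClosableClientSketch`, its listener type and its counters are hypothetical and exist only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a CAS-guarded lifecycle; not Curator's code.
class ClosableClientSketch {
    enum State { LATENT, STARTED, STOPPED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    private final List<Runnable> closingListeners = new CopyOnWriteArrayList<>();
    private int releaseCount = 0;

    void start() {
        // Only the first caller moves LATENT -> STARTED; any other call fails fast.
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("already started");
        }
    }

    void addClosingListener(Runnable listener) {
        closingListeners.add(listener);
    }

    void close() {
        // Only the caller that wins STARTED -> STOPPED performs the cleanup.
        if (!state.compareAndSet(State.STARTED, State.STOPPED)) {
            return;
        }
        for (Runnable listener : closingListeners) {
            try {
                listener.run(); // analogous to delivering the CLOSING event
            } catch (RuntimeException e) {
                System.err.println("Exception while sending closing event: " + e);
            }
        }
        closingListeners.clear();
        releaseCount++; // stand-in for closing the manager, client and executor
    }

    State state() {
        return state.get();
    }

    int releaseCount() {
        return releaseCount;
    }
}
```

Because the transition is atomic, concurrent or repeated `close()` calls notify listeners and release resources exactly once, and a failing listener does not prevent the remaining cleanup.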

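Summary

The node cache NodeCache holds a reference to the Curator framework client CuratorFramework and a node listener container, listeners (ListenerContainer), which stores the node listeners.

Adding a node listener actually registers it in the node cache's listener container ListenerContainer. Starting the node cache registers its listener with the connection state manager ConnectionStateManager of the CuratorFramework implementation; if requested, it rebuilds the node data and re-registers the node watcher CuratorWatcher, and whenever the connection state changes it re-registers the CuratorWatcher as well.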

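When the Curator framework implementation CuratorFrameworkImpl starts, it first starts the connection state manager ConnectionStateManager and then the client CuratorZookeeperClient (the client is initialized in the CuratorFrameworkImpl constructor, which passes in a Watcher used to handle CuratorEvent). The key step in starting CuratorZookeeperClient is starting the connection state ConnectionState (the CuratorZookeeperClient constructor initializes the connection state and hands it the internal Watcher). ConnectionState implements Watcher; when the connection is established, it calls the Watcher passed in by CuratorZookeeperClient to handle the events, and that Watcher is the one CuratorFrameworkImpl supplied when it initialized CuratorZookeeperClient. The actual handling logic lives in CuratorFrameworkImpl's processEvent method, which iterates over the listeners in CuratorFrameworkImpl's listener container and lets each of them handle the CuratorEvent. The CuratorEvent is a wrapper around the native WatchedEvent.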

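Starting the connection state manager mainly means using the listeners in the connection state listener container ListenerContainer to consume the events in the connection state event queue (a BlockingQueue).

The main member variables of the child-path cache PathChildrenCache are the client framework CuratorFramework, the child-path listener container ListenerContainer, the event executor CloseableExecutorService, and the set of pending operations (operationsQuantizer).

When the first-level child cache PathChildrenCache starts, it registers the connection state listener ConnectionStateListener, which offers event operations (EventOperation) and refresh operations (RefreshOperation) to the operation set according to the connection state. An event operation invokes the listeners' child event callback; a refresh operation produces the child added and updated events and re-registers the watchers on the children. The start mode then decides whether to offer a standard refresh, offer a refresh that posts an INITIALIZED event, or rebuild the cache, that is, reload the cached path data and then offer a refresh.

Closing the client framework mainly clears the listeners, closes the connection state manager, and closes the zk client.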