Curator目录监听
write by donaldhan, 2018-06-29 09:40引言
上一篇文章,我们简单看一下Curator的CDRWA相关的构造器,及Curator框架实现,[Curator][]框架工厂CuratorFrameworkFactory内部,主要成员变量为默认的会话超时与连接超时时间,本地地址,字节压缩器GzipCompressionProvider, 默认的Zookeeper工厂DefaultZookeeperFactory,默认ACL提供器DefaultACLProvider。GzipCompressionProvider用于压缩字节流。 默认的Zookeeper工厂DefaultZookeeperFactory,用于创建原生Zookeeper客户端。DefaultACLProvider主要用户获取节点的ACL权限。
CuratorFrameworkFactory内部构建器Builder,除了会话超时与连接超时时间,字节压缩器,原生API客户端工厂, ACL提供器之外,还有线程工程ThreadFactory,验证方式,及验证值,及重试策略RetryPolicy。 ExponentialBackoffRetry主要用户控制会话超时重连的次数和下次尝试时间。 内部构建器Builder,创建的实际为CuratorFrameworkImpl。
CuratorFramework主要提供了启动关闭客户端操作,及CDRWA相关的构建器,如创建节点CreateBuilder,删除节点DeleteBuilder,获取节点数据GetDataBuilder,设置节点数据SetACLBuilder,
,检查节点ExistsBuilder,同步数据构建器SyncBuilder, 事物构建器CuratorTransaction,ACL构建器GetACLBuilder、SetACLBuilder,提供了客户端连接状态监听器Listenable
Curator zk客户端CuratorZookeeperClient主要用于获取原生API ZK客户端,以及用于重新创建失效会话,执行相应的CDRWA操作。 创建构建器CreateBuilder,主要提供了创建持久化和临时节点的操作。
Curator框架实现CuratorFrameworkImpl,创建目录实际上委托给Curator框架内部的原生API zk客户端,如果需要创建建父目录,并且父目录不存在,则创建父目录。 如果会话失效,则重新建立会话,如果建立会话成功,则调用创建目录回调Callable。
删除构建器DeleteBuilder,删除目录,实际操作在一个重试循环中,如果会话过期,则重新连接会话,并将实际删除操作委托给Curator框架内部的原生API zk客户端。
Curator框架实现CuratorFrameworkImpl的获取目录数据操作,检查目录和设置目录数据的原理与创建、删除操作基本相同实际操作委托给Curator框架内部的原生API zk客户端, 并保证会话有效。 本文章所有的示例,参见[zookeeper-demo][]。 [zookeeper-demo]:https://github.com/Donaldhan/zookeeper-demo “zookeeper-demo” [Curator]:https://donaldhan.github.io/zookeeper/2018/06/18/Curator.html “Curator”
目录
Curator节点监听
curator官方推荐的API是对zookeeper原生的JAVA API进行了封装,将重复注册,事件信息等很好的处理了。而且监听事件返回了详细的信息,包括变动的节点路径,节点值等等,这是原生API所没有的。这个对事件的监听类似于一个本地缓存视图和远程Zookeeper视图的对比过程。curator的方法调用采用的是流式API,此种风格的优点及使用注意事项可自行查阅资料了解。对于目录的监听,curator提供了三个接口,分别如下:
- NodeCache:对一个节点进行监听,监听事件包括指定路径的增删改操作;
- PathChildrenCache:对指定路径节点的一级子目录监听,不对该节点的操作监听,对其子目录的增删改操作监听
- TreeCache,综合NodeCache和PathChildrenCahce的特性,是对整个目录进行监听,可以设置监听深度。
下面我们来看几个示例:
监听节点数据变化
package org.donald.curator.recipes.cache;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.donald.constant.ConfigConstant;
import java.io.UnsupportedEncodingException;
/**
* @ClassName: NodeCacheSample
* @Description: 节点监听器,类似于原生Watcher
* @Author: Donaldhan
* @Date: 2018-05-16 8:24
*/
@Slf4j
public class NodeCacheSample {
private static CuratorFramework client;
public static void main(String[] args) {
String path = "/zk-book/nodecache";
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
client =
CuratorFrameworkFactory.builder()
.connectString(ConfigConstant.IP)
.sessionTimeoutMs(ConfigConstant.SESSION_TIMEOUT)
.connectionTimeoutMs(ConfigConstant.CONNETING_TIMEOUT)
.retryPolicy(retryPolicy)
.build();
log.info("success connected...");
client.start();
final NodeCache cache = new NodeCache(client,path,false);
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws UnsupportedEncodingException {
log.info("Node data update, new data: {}" , new String(cache.getCurrentData().getData(), ConfigConstant.CHAR_SET_NAME));
}
});
//不要忘记启动cache
cache.start(true);
//如果需要创建父节点,需要注意一个问题,创建的父节点是持久化的
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, "init".getBytes());
log.info("success create:{}...", path);
client.setData().forPath( path, "update".getBytes() );
log.info("success update:{}...", path);
Thread.sleep( 3000 );
client.delete().deletingChildrenIfNeeded().forPath( path );
log.info("success delete:{}...", path);
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (client != null) {
client.close();
}
}
}
}
监听节点不存在
package org.donald.curator.recipes.cache;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.donald.constant.ConfigConstant;
/**
* @ClassName: NodeCacheNotExistsSample
* @Description: 测试监听节点不存在的情况
* @Author: Donaldhan
* @Date: 2018-05-16 14:42
*/
@Slf4j
public class NodeCacheNotExistsSample {
private static CuratorFramework client;
public static void main(String[] args) {
String path = "/curator_nodecache_sample";
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
client =
CuratorFrameworkFactory.builder()
.connectString(ConfigConstant.IP)
.sessionTimeoutMs(ConfigConstant.SESSION_TIMEOUT)
.connectionTimeoutMs(ConfigConstant.CONNETING_TIMEOUT)
.retryPolicy(retryPolicy)
.build();
log.info("success connected...");
client.start();
final NodeCache cache = new NodeCache(client, path, false);
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() {
log.info("Node data update, new data: {}", new String(cache.getCurrentData().getData()));
}
});
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, "init".getBytes());
Thread.sleep(6000);
//使用过后,不要忘了关闭节点缓存
cache.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (client != null) {
client.close();
}
}
}
}
监听一级目录
package org.donald.curator.recipes.cache;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.donald.constant.ConfigConstant;
/**
* @ClassName: PathChildrenCacheSample
* @Description: 监控路径子节点变化
* @Author: Donaldhan
* @Date: 2018-05-16 9:37
*/
@Slf4j
public class PathChildrenCacheSample {
private static CuratorFramework client;
public static void main(String[] args) {
String path = "/zk-book";
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
client =
CuratorFrameworkFactory.builder()
.connectString(ConfigConstant.IP)
.sessionTimeoutMs(ConfigConstant.SESSION_TIMEOUT)
.connectionTimeoutMs(ConfigConstant.CONNETING_TIMEOUT)
.retryPolicy(retryPolicy)
.build();
log.info("success connected...");
client.start();
PathChildrenCache cache = new PathChildrenCache(client, path, true);
/**
* After cache is primed with initial values (in the background) a
* {@link PathChildrenCacheEvent.Type#INITIALIZED} will be posted
*/
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
String currentPathValue = new String(client.getData().storingStatIn(new Stat()).forPath(path),ConfigConstant.CHAR_SET_NAME);
log.info("CHILD_ADDED,{},parent node data: {}",event.getData().getPath(), currentPathValue);
break;
case CHILD_UPDATED:
log.info("CHILD_UPDATED,{},vaule:{}",event.getData().getPath(), new String(event.getData().getData(),ConfigConstant.CHAR_SET_NAME));
break;
case CHILD_REMOVED:
log.info("CHILD_REMOVED,{}",event.getData().getPath());
break;
default:
break;
}
}
});
client.create().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
log.info("success create /zk-book...");
Thread.sleep( 1000 );
client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1");
log.info("success create child c1...");
Thread.sleep( 1000 );
client.setData().forPath(path+"/c1","update".getBytes());
log.info("success update child c1...");
Thread.sleep( 1000 );
client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1/d1");
log.info("success create child c1/d1 ...");
Thread.sleep( 1000 );
client.delete().forPath(path+"/c1/d1");
log.info("success delete child c1/d1 ...");
Thread.sleep( 1000 );
client.delete().forPath(path+"/c1");
log.info("success delete child c1 ...");
Thread.sleep( 1000 );
client.delete().forPath(path);
log.info("success delete child /zk-book...");
Thread.sleep( 3000 );
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (client != null) {
client.close();
}
}
}
}
监听一级目录,并使用执行器,执行相应的事件
package org.donald.curator.recipes.cache;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.donald.common.threadpool.TaskExecutors;
import org.donald.constant.ConfigConstant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @ClassName: PathChildrenCacheSample
* @Description: 监控路径子节点变化,使用自定义线程池,执行监听事件
* @Author: Donaldhan
* @Date: 2018-05-16 9:37
*/
@Slf4j
public class PathChildrenCacheWithExecutorServiceSample {
private static CuratorFramework client;
private static ExecutorService exec = TaskExecutors.newFixedThreadPool(2);
public static void main(String[] args) {
String path = "/zk-book";
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
client =
CuratorFrameworkFactory.builder()
.connectString(ConfigConstant.IP)
.sessionTimeoutMs(ConfigConstant.SESSION_TIMEOUT)
.connectionTimeoutMs(ConfigConstant.CONNETING_TIMEOUT)
.retryPolicy(retryPolicy)
.build();
log.info("success connected...");
client.start();
log.info( "current thread name:{} ", Thread.currentThread().getName() );
PathChildrenCache cache = new PathChildrenCache(client, path, true, false, exec);
/**
* NORMAL,
* cache will _not_ be primed. i.e. it will start empty and you will receive
* events for all nodes added, etc.
*/
cache.start(PathChildrenCache.StartMode.NORMAL);
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
String currentPathValue = new String(client.getData().storingStatIn(new Stat()).forPath(path),ConfigConstant.CHAR_SET_NAME);
log.info("CHILD_ADDED,{},parent node data: {}",event.getData().getPath(), currentPathValue);
log.info( "current thread name:{} ", Thread.currentThread().getName() );
break;
case CHILD_UPDATED:
log.info("CHILD_UPDATED,{},vaule:{}",event.getData().getPath(), new String(event.getData().getData(),ConfigConstant.CHAR_SET_NAME));
log.info( "current thread name:{} ", Thread.currentThread().getName() );
break;
case CHILD_REMOVED:
log.info("CHILD_REMOVED,{}",event.getData().getPath());
log.info( "current thread name:{} ", Thread.currentThread().getName() );
break;
default:
break;
}
}
});
client.create().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
log.info("success create /zk-book...");
Thread.sleep( 1000 );
client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1");
log.info("success create child c1...");
Thread.sleep( 1000 );
client.setData().forPath(path+"/c1","update".getBytes());
log.info("success update child c1...");
Thread.sleep( 1000 );
client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1/d1");
log.info("success create child c1/d1 ...");
Thread.sleep( 1000 );
client.delete().forPath(path+"/c1/d1");
log.info("success delete child c1/d1 ...");
Thread.sleep( 1000 );
client.delete().forPath(path+"/c1");
log.info("success delete child c1 ...");
Thread.sleep( 1000 );
client.delete().forPath(path);
log.info("success delete child /zk-book...");
Thread.sleep( 3000 );
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (client != null) {
client.close();
}
}
}
}
上面几个示例,我们主要使用的是节点监听器NodeCache,和一级目录监听器PathChildrenCache。关键sql如下:
- 节点监听器NodeCache
final NodeCache cache = new NodeCache(client,path,false); cache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws UnsupportedEncodingException { log.info("Node data update, new data: {}" , new String(cache.getCurrentData().getData(), ConfigConstant.CHAR_SET_NAME)); } });
- 一级目录监听器PathChildrenCache
PathChildrenCache cache = new PathChildrenCache(client, path, true); /** * After cache is primed with initial values (in the background) a * {@link PathChildrenCacheEvent.Type#INITIALIZED} will be posted */ cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: String currentPathValue = new String(client.getData().storingStatIn(new Stat()).forPath(path),ConfigConstant.CHAR_SET_NAME); log.info("CHILD_ADDED,{},parent node data: {}",event.getData().getPath(), currentPathValue); break; case CHILD_UPDATED: log.info("CHILD_UPDATED,{},vaule:{}",event.getData().getPath(), new String(event.getData().getData(),ConfigConstant.CHAR_SET_NAME)); break; case CHILD_REMOVED: log.info("CHILD_REMOVED,{}",event.getData().getPath()); break; default: break; } } });
PathChildrenCache cache = new PathChildrenCache(client, path, true, false, exec);
/**
* NORMAL,
* cache will _not_ be primed. i.e. it will start empty and you will receive
* events for all nodes added, etc.
*/
cache.start(PathChildrenCache.StartMode.NORMAL);
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
String currentPathValue = new String(client.getData().storingStatIn(new Stat()).forPath(path),ConfigConstant.CHAR_SET_NAME);
log.info("CHILD_ADDED,{},parent node data: {}",event.getData().getPath(), currentPathValue);
log.info( "current thread name:{} ", Thread.currentThread().getName() );
break;
case CHILD_UPDATED:
log.info("CHILD_UPDATED,{},vaule:{}",event.getData().getPath(), new String(event.getData().getData(),ConfigConstant.CHAR_SET_NAME));
log.info( "current thread name:{} ", Thread.currentThread().getName() );
break;
case CHILD_REMOVED:
log.info("CHILD_REMOVED,{}",event.getData().getPath());
log.info( "current thread name:{} ", Thread.currentThread().getName() );
break;
default:
break;
}
}
});
下面我们分别来看节点监听器NodeCache
节点监听器NodeCache
- 节点监听器NodeCache
final NodeCache cache = new NodeCache(client,path,false); cache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws UnsupportedEncodingException { log.info("Node data update, new data: {}" , new String(cache.getCurrentData().getData(), ConfigConstant.CHAR_SET_NAME)); } });
先来看一下这个节点监听缓存的定义:
public class NodeCache implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorFramework client; //Curator框架客户端
private final String path;
private final boolean dataIsCompressed;
private final EnsurePath ensurePath;
//子节点数据
private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);//监听状态
//节点监听器容器
private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
private final AtomicBoolean isConnected = new AtomicBoolean(true);
//连接状态监听器
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
{
if ( isConnected.compareAndSet(false, true) )
{
try
{
//重新处理节点监听器
reset();
}
catch ( Exception e )
{
log.error("Trying to reset after reconnection", e);
}
}
}
else
{
isConnected.set(false);
}
}
};
//Curator监听器
private final CuratorWatcher watcher = new CuratorWatcher()
{
@Override
public void process(WatchedEvent event) throws Exception
{
reset();
}
};
private enum State
{
LATENT,
STARTED,
CLOSED
}
private final BackgroundCallback backgroundCallback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
processBackgroundResult(event);
}
};
/**
* @param client curztor client
* @param path the full path to the node to cache
*/
public NodeCache(CuratorFramework client, String path)
{
this(client, path, false);
}
/**
* @param client curztor client
* @param path the full path to the node to cache
* @param dataIsCompressed if true, data in the path is compressed
*/
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
{
this.client = client;
this.path = path;
this.dataIsCompressed = dataIsCompressed;
ensurePath = client.newNamespaceAwareEnsurePath(path).excludingLast();
}
}
//节点缓存监听器
public interface NodeCacheListener
{
/**
* Called when a change has occurred
*/
public void nodeChanged() throws Exception;
}
从上面可以看出节点监听缓存NodeCache,内部关联一下Curator框架客户端CuratorFramework,节点监听器容器 listeners(ListenerContainer
再来看一下节点监听缓存NodeCache添加监听器
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws UnsupportedEncodingException {
log.info("Node data update, new data: {}" , new String(cache.getCurrentData().getData(), ConfigConstant.CHAR_SET_NAME));
}
});
//NodeCache
/**
* Return the cache listenable
*
* @return listenable
*/
public ListenerContainer<NodeCacheListener> getListenable()
{
Preconditions.checkState(state.get() != State.CLOSED, "Closed");
return listeners;
}
//ListenerContainer
public class ListenerContainer<T> implements Listenable<T>
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<T, ListenerEntry<T>> listeners = Maps.newConcurrentMap();
@Override
public void addListener(T listener)
{
addListener(listener, MoreExecutors.sameThreadExecutor());
}
@Override
public void addListener(T listener, Executor executor)
{
listeners.put(listener, new ListenerEntry<T>(listener, executor));
}
...
}
从上面可以看出,添加节点监听器,实际上是注册到节点缓存的节点监听器容器ListenerContainer
再来看一下启动节点缓存
/**
* Start the cache. The cache is not started automatically. You must call this method.
*
* @throws Exception errors
*/
public void start() throws Exception
{
start(false);
}
/**
* Same as {@link #start()} but gives the option of doing an initial build
*
* @param buildInitial if true, {@link #rebuild()} will be called before this method
* returns in order to get an initial view of the node
* @throws Exception errors
*/
public void start(boolean buildInitial) throws Exception
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
ensurePath.ensure(client.getZookeeperClient());
//添加连接监听器到Curator框架内部的连接状态监听器
client.getConnectionStateListenable().addListener(connectionStateListener);
if ( buildInitial )
{
//如果需要者重新构建节点数据
internalRebuild();
}
//重置
reset();
}
// CuratorFrameworkImpl
public class CuratorFrameworkImpl implements CuratorFramework
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorZookeeperClient client;
private final ListenerContainer<CuratorListener> listeners;
private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
private final ThreadFactory threadFactory;
private final BlockingQueue<OperationAndData<?>> backgroundOperations;
private final NamespaceImpl namespace;
private final ConnectionStateManager connectionStateManager;
@Override
public Listenable<ConnectionStateListener> getConnectionStateListenable()
{
return connectionStateManager.getListenable();
}
...
}
//ChildData
public class ChildData implements Comparable<ChildData>
{
private final String path;
private final Stat stat;
private final AtomicReference<byte[]> data;
public ChildData(String path, Stat stat, byte[] data)
{
this.path = path;
this.stat = stat;
this.data = new AtomicReference<byte[]>(data);
}
...
}
我们上面的连个关键点:
- 如果需要者重新构建节点数据
if ( buildInitial ) { //如果需要者重新构建节点数据 internalRebuild(); } private void internalRebuild() throws Exception { try { Stat stat = new Stat(); byte[] bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(path) : client.getData().storingStatIn(stat).forPath(path); //重置节点数据 data.set(new ChildData(path, stat, bytes)); } catch ( KeeperException.NoNodeException e ) { data.set(null); } }
2.
//重置,重新注册监听器
reset();
private void reset() throws Exception
{
if ( (state.get() == State.STARTED) && isConnected.get() )
{
client.checkExists().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
}
}
//CuratorFrameworkImpl
@Override
public ExistsBuilder checkExists()
{
Preconditions.checkState(isStarted(), "instance must be started before calling this method");
return new ExistsBuilderImpl(this);
}
class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>
{
private final CuratorFrameworkImpl client;//Curator框架实现
private Backgrounding backgrounding;
private Watching watching;
ExistsBuilderImpl(CuratorFrameworkImpl client)
{
this.client = client;
backgrounding = new Backgrounding();
watching = new Watching();
}
//添加观察者
@Override
public BackgroundPathable<Stat> usingWatcher(Watcher watcher)
{
watching = new Watching(client, watcher);
return this;
}
//添加观察者CuratorWatcher
@Override
public BackgroundPathable<Stat> usingWatcher(CuratorWatcher watcher)
{
watching = new Watching(client, watcher);
return this;
}
//添加后台回调
@Override
public Pathable<Stat> inBackground(BackgroundCallback callback, Object context)
{
backgrounding = new Backgrounding(callback, context);
return this;
}
@Override
public Pathable<Stat> inBackground(BackgroundCallback callback, Object context, Executor executor)
{
backgrounding = new Backgrounding(client, callback, context, executor);
return this;
}
@Override
public Pathable<Stat> inBackground(BackgroundCallback callback)
{
backgrounding = new Backgrounding(callback);
return this;
}
@Override
public Pathable<Stat> inBackground(BackgroundCallback callback, Executor executor)
{
backgrounding = new Backgrounding(client, callback, executor);
return this;
}
}
//Watching
class Watching
{
private final Watcher watcher;
private final boolean watched;
...
}
class Backgrounding
{
private final boolean inBackground;//是否为后台任务
private final Object context;//上下文
private final BackgroundCallback callback;//回调
}
/**
* Functor for an async background operation
*/
public interface BackgroundCallback
{
/**
* Called when the async background operation completes
*
* @param client the client
* @param event operation result details
* @throws Exception errors
*/
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}
我们回到NodeCache内部成员变量
//连接状态监听器
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
{
if ( isConnected.compareAndSet(false, true) )
{
try
{
//重新注册节点监听器
reset();
}
catch ( Exception e )
{
log.error("Trying to reset after reconnection", e);
}
}
}
else
{
isConnected.set(false);
}
}
};
//Curator监听器
private final CuratorWatcher watcher = new CuratorWatcher()
{
@Override
public void process(WatchedEvent event) throws Exception
{
//重新注册节点监听器
reset();
}
};
private enum State
{
LATENT,
STARTED,
CLOSED
}
private final BackgroundCallback backgroundCallback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
processBackgroundResult(event);
}
};
//后台处理
private void processBackgroundResult(CuratorEvent event) throws Exception
{
switch ( event.getType() )
{
//处理获取数据事件
case GET_DATA:
{
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
ChildData childData = new ChildData(path, event.getStat(), event.getData());
setNewData(childData);
}
break;
}
//处理检查目录事件
case EXISTS:
{
if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
{
setNewData(null);
}
else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
//重新注册监听器
if ( dataIsCompressed )
{
client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
}
else
{
client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
}
}
break;
}
}
}
private void setNewData(ChildData newData) throws InterruptedException
{
//比较最新数据和变更前的数据,查看是否有变更
ChildData previousData = data.getAndSet(newData);
if ( !Objects.equal(previousData, newData) )
{
//调用节点监听容器内部的监听器处理目录变更事件
listeners.forEach
(
new Function<NodeCacheListener, Void>()
{
@Override
public Void apply(NodeCacheListener listener)
{
try
{
listener.nodeChanged();
}
catch ( Exception e )
{
log.error("Calling listener", e);
}
return null;
}
}
);
if ( rebuildTestExchanger != null )
{
try
{
rebuildTestExchanger.exchange(new Object());
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
}
}
}
}
从上面可以看出,启动节点监听器,实际上是注册节点监听器到CuratorFramework实现的连接状态管理器中ConnectionStateManager,如果需要,则重新构建节点数据,同时重新注册节点监听器CuratorWatcher,如果连接状态有变更, 重新注册节点监听器CuratorWatcher。
我们再来看,注册的监听器,是如何触发的,这个我们要回到CuratorFramework实现启动
//CuratorFrameworkImpl
public class CuratorFrameworkImpl implements CuratorFramework
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorZookeeperClient client;//zk客户端
private final ListenerContainer<CuratorListener> listeners;//Curator监听器
private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
private final ThreadFactory threadFactory;
private final BlockingQueue<OperationAndData<?>> backgroundOperations;
private final NamespaceImpl namespace;
private final ConnectionStateManager connectionStateManager;//连接状态管理器
private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>();
private final byte[] defaultData;
private final FailedDeleteManager failedDeleteManager;
private final CompressionProvider compressionProvider;
private final ACLProvider aclProvider;
private final NamespaceFacadeCache namespaceFacadeCache;
private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
private volatile ExecutorService executorService;
@Override
public void start()
{
log.info("Starting");
if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
{
IllegalStateException error = new IllegalStateException();
log.error("Cannot be started more than once", error);
throw error;
}
try
{
//启动连接状态管理器
connectionStateManager.start(); // ordering dependency - must be called before client.start()
//启动客户端
client.start();
executorService = Executors.newFixedThreadPool(2, threadFactory); // 1 for listeners, 1 for background ops
executorService.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
backgroundOperationsLoop();
return null;
}
}
);
}
catch ( Exception e )
{
handleBackgroundOperationException(null, e);
}
}
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
//初始化客户端
this.client = new CuratorZookeeperClient
(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
//关键点
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
//包装原生WatchedEvent事件为CuratorEvent
CuratorEvent event = new CuratorEventImpl
(
CuratorFrameworkImpl.this,
CuratorEventType.WATCHED,
watchedEvent.getState().getIntValue(),
unfixForNamespace(watchedEvent.getPath()),
null,
null,
null,
null,
null,
watchedEvent,
null
);
//处理事件
processEvent(event);
}
},
builder.getRetryPolicy(),
builder.canBeReadOnly()
);
listeners = new ListenerContainer<CuratorListener>();
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
backgroundOperations = new DelayQueue<OperationAndData<?>>();
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
//初始化连接状态管理器,关键点
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
byte[] builderDefaultData = builder.getDefaultData();
defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
if ( builder.getAuthScheme() != null )
{
authInfo.set(new AuthInfo(builder.getAuthScheme(), builder.getAuthValue()));
}
failedDeleteManager = new FailedDeleteManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
}
//处理CuratorEvent事件
private void processEvent(final CuratorEvent curatorEvent)
{
if ( curatorEvent.getType() == CuratorEventType.WATCHED )
{
validateConnection(curatorEvent.getWatchedEvent().getState());
}
listeners.forEach
(
new Function<CuratorListener, Void>()
{
@Override
public Void apply(CuratorListener listener)
{
try
{
TimeTrace trace = client.startTracer("EventListener");
listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
trace.commit();
}
catch ( Exception e )
{
logError("Event listener threw exception", e);
}
return null;
}
}
);
}
}
从上面可以看出,Curator框架实现CuratorFrameworkImpl启动时,首先启动连接状态管理器ConnectionStateManager,然后再启动客户端CuratorZookeeperClient。
再来看CuratorZookeeperClient的启动,
public class CuratorZookeeperClient implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final ConnectionState state;//连接状态
private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
private final int connectionTimeoutMs;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
/**
*
* @param connectString list of servers to connect to
* @param sessionTimeoutMs session timeout
* @param connectionTimeoutMs connection timeout
* @param watcher default watcher or null
* @param retryPolicy the retry policy to use
*/
public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
{
this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
}
/**
* @param ensembleProvider the ensemble provider
* @param sessionTimeoutMs session timeout
* @param connectionTimeoutMs connection timeout
* @param watcher default watcher or null
* @param retryPolicy the retry policy to use
*/
public CuratorZookeeperClient(EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
{
this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
}
/**
* @param zookeeperFactory factory for creating {@link ZooKeeper} instances
* @param ensembleProvider the ensemble provider
* @param sessionTimeoutMs session timeout
* @param connectionTimeoutMs connection timeout
* @param watcher default watcher or null
* @param retryPolicy the retry policy to use
* @param canBeReadOnly if true, allow ZooKeeper client to enter
* read only mode in case of a network partition. See
* {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)}
* for details
*/
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly)
{
if ( sessionTimeoutMs < connectionTimeoutMs )
{
log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
}
retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
this.connectionTimeoutMs = connectionTimeoutMs;
//初始化连接连接状态ConnectionState,关键点
state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly);
setRetryPolicy(retryPolicy);
}
/**
* Must be called after construction
*
* @throws IOException errors
*/
public void start() throws Exception
{
log.debug("Starting");
if ( !started.compareAndSet(false, true) )
{
IllegalStateException error = new IllegalStateException();
log.error("Already started", error);
throw error;
}
//启动连接状态ConnectionState
state.start();
}
再来看启动连接状态ConnectionState:
class ConnectionState implements Watcher, Closeable
{
private static final int MAX_BACKGROUND_EXCEPTIONS = 10;
private static final boolean LOG_EVENTS = Boolean.getBoolean(DebugUtils.PROPERTY_LOG_EVENTS);
private final Logger log = LoggerFactory.getLogger(getClass());
private final HandleHolder zooKeeper;
private final AtomicBoolean isConnected = new AtomicBoolean(false);
private final EnsembleProvider ensembleProvider;
private final int sessionTimeoutMs;
private final int connectionTimeoutMs;
private final AtomicReference<TracerDriver> tracer;
private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
//监听器队列
private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
private final AtomicLong instanceIndex = new AtomicLong();
private volatile long connectionStartMs = 0;
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
{
this.ensembleProvider = ensembleProvider;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.tracer = tracer;
//添加WACher监听器,关键点
if ( parentWatcher != null )
{
parentWatchers.offer(parentWatcher);
}
zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
//处理监听事件
@Override
public void process(WatchedEvent event)
{
if ( LOG_EVENTS )
{
log.debug("ConnectState watcher: " + event);
}
for ( Watcher parentWatcher : parentWatchers )
{
TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
parentWatcher.process(event);
timeTrace.commit();
}
boolean wasConnected = isConnected.get();
boolean newIsConnected = wasConnected;
if ( event.getType() == Watcher.Event.EventType.None )
{
newIsConnected = checkState(event.getState(), wasConnected);
}
if ( newIsConnected != wasConnected )
{
isConnected.set(newIsConnected);
connectionStartMs = System.currentTimeMillis();
}
}
}
void start() throws Exception
{
log.debug("Starting");
ensembleProvider.start();
reset();
}
}
从上面可以看出,。Curator框架实现CuratorFrameworkImpl启动时,首先启动连接状态管理器ConnectionStateManager, 然后再启动客户端CuratorZookeeperClient(在构造Curator框架实现CuratorFrameworkImpl初始化动客户端CuratorZookeeperClient,传入一个Watcher,用于处理CuratorEvent。)。 启动客户端CuratorZookeeperClient过程,关键点是在启动连接状态ConnectionState(在构造CuratorZookeeperClient,初始化连接状态,并将内部Watcher传给连接状态)。 连接状态实现了观察者Watcher,在连接状态建立时,调用客户端CuratorZookeeperClient传入的Watcher,处理相关事件。而这个Watcher是在现CuratorFrameworkImpl初始化动客户端CuratorZookeeperClient时, 传入的。客户端观察者的实际处理业务逻辑在CuratorFrameworkImpl实现,及processEvent方法,processEvent主要处理逻辑为,遍历Curator框架实现CuratorFrameworkImpl内部的监听器容器内的监听器处理相关CuratorEvent 事件。这个CuratorEvent事件,是由原生WatchedEvent事件包装而来。
回到连接状态管理器的启动:
/**
* Used internally to manage connection state
*/
public class ConnectionStateManager implements Closeable
{
private static final int QUEUE_SIZE;
static
{
int size = 25;
String property = System.getProperty("ConnectionStateManagerSize", null);
if ( property != null )
{
try
{
size = Integer.parseInt(property);
}
catch ( NumberFormatException ignore )
{
// ignore
}
}
QUEUE_SIZE = size;
}
private final Logger log = LoggerFactory.getLogger(getClass());
//连接状态变更事件
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
private final CuratorFramework client;
//连接状态监听器容器
private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
private final ExecutorService service;
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
// guarded by sync
private ConnectionState currentConnectionState;
private enum State
{
LATENT,
STARTED,
CLOSED
}
/**
* @param client the client
* @param threadFactory thread factory to use or null for a default
*/
public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory)
{
this.client = client;
if ( threadFactory == null )
{
threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
}
service = Executors.newSingleThreadExecutor(threadFactory);
}
/**
* Start the manager
*/
public void start()
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
//执行连接状态事件处理
service.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
processEvents();
return null;
}
}
);
}
@Override
public void close()
{
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
service.shutdownNow();
listeners.clear();
}
}
/**
* Return the listenable
*
* @return listenable
*/
public ListenerContainer<ConnectionStateListener> getListenable()
{
return listeners;
}
//处理连接状态事件
private void processEvents()
{
try
{
while ( !Thread.currentThread().isInterrupted() )
{
//消费连队列接状态事件
final ConnectionState newState = eventQueue.take();
if ( listeners.size() == 0 )
{
log.warn("There are no ConnectionStateListeners registered.");
}
listeners.forEach
(
new Function<ConnectionStateListener, Void>()
{
@Override
public Void apply(ConnectionStateListener listener)
{
//调用连接状态监听器,处理连接状态事件
listener.stateChanged(client, newState);
return null;
}
}
);
}
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
}
}
}
从上面可以看出,启动连接连接状态管理器,主要是使用连接状态监听器容器ListenerContainer
结合注册节点监听器,我们来看一下小节一下:
添加节点监听器,实际上是注册到节点缓存的节点监听器容器ListenerContainer
Curator框架实现CuratorFrameworkImpl启动时,首先启动连接状态管理器ConnectionStateManager, 然后再启动客户端CuratorZookeeperClient(在构造Curator框架实现CuratorFrameworkImpl初始化动客户端CuratorZookeeperClient,传入一个Watcher,用于处理CuratorEvent。)。 启动客户端CuratorZookeeperClient过程,关键点是在启动连接状态ConnectionState(在构造CuratorZookeeperClient,初始化连接状态,并将内部Watcher传给连接状态)。 连接状态实现了观察者Watcher,在连接状态建立时,调用客户端CuratorZookeeperClient传入的Watcher,处理相关事件。而这个Watcher是在现CuratorFrameworkImpl初始化动客户端CuratorZookeeperClient时, 传入的。客户端观察者的实际处理业务逻辑在CuratorFrameworkImpl实现,及processEvent方法,processEvent主要处理逻辑为,遍历Curator框架实现CuratorFrameworkImpl内部的监听器容器内的监听器处理相关CuratorEvent 事件。这个CuratorEvent事件,是由原生WatchedEvent事件包装而来。
启动连接连接状态管理器,主要是使用连接状态监听器容器ListenerContainer
再来看子目录监听PathChildrenCache
子一级目录监听器PathChildrenCache
- 子一级目录监听器PathChildrenCache
PathChildrenCache cache = new PathChildrenCache(client, path, true); /** * After cache is primed with initial values (in the background) a * {@link PathChildrenCacheEvent.Type#INITIALIZED} will be posted */ cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: String currentPathValue = new String(client.getData().storingStatIn(new Stat()).forPath(path),ConfigConstant.CHAR_SET_NAME); log.info("CHILD_ADDED,{},parent node data: {}",event.getData().getPath(), currentPathValue); break; case CHILD_UPDATED: log.info("CHILD_UPDATED,{},vaule:{}",event.getData().getPath(), new String(event.getData().getData(),ConfigConstant.CHAR_SET_NAME)); break; case CHILD_REMOVED: log.info("CHILD_REMOVED,{}",event.getData().getPath()); break; default: break; } } });
PathChildrenCache cache = new PathChildrenCache(client, path, true, false, exec);
/**
* NORMAL,
* cache will _not_ be primed. i.e. it will start empty and you will receive
* events for all nodes added, etc.
*/
cache.start(PathChildrenCache.StartMode.NORMAL);
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
String currentPathValue = new String(client.getData().storingStatIn(new Stat()).forPath(path),ConfigConstant.CHAR_SET_NAME);
log.info("CHILD_ADDED,{},parent node data: {}",event.getData().getPath(), currentPathValue);
log.info( "current thread name:{} ", Thread.currentThread().getName() );
break;
case CHILD_UPDATED:
log.info("CHILD_UPDATED,{},vaule:{}",event.getData().getPath(), new String(event.getData().getData(),ConfigConstant.CHAR_SET_NAME));
log.info( "current thread name:{} ", Thread.currentThread().getName() );
break;
case CHILD_REMOVED:
log.info("CHILD_REMOVED,{}",event.getData().getPath());
log.info( "current thread name:{} ", Thread.currentThread().getName() );
break;
default:
break;
}
}
});
我们来看PathChildrenCache的定义
public class PathChildrenCache implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorFramework client;//框架客户端实现
private final String path;
private final CloseableExecutorService executorService;//事件执行器
private final boolean cacheData;
private final boolean dataIsCompressed;
private final EnsurePath ensurePath;
private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();//子目录监听容器
private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();//
private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();
private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());//事件操作集
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private enum State
{
LATENT,
STARTED,
CLOSED
}
private static final ChildData NULL_CHILD_DATA = new ChildData(null, null, null);
private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
/**
* @param client the client
* @param path path to watch
* @param mode caching mode
* @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean)} instead
*/
@SuppressWarnings("deprecation")
public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode)
{
this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
}
/**
* @param client the client
* @param path path to watch
* @param mode caching mode
* @param threadFactory factory to use when creating internal threads
* @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean, ThreadFactory)} instead
*/
@SuppressWarnings("deprecation")
public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory)
{
this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
}
/**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
{
this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
}
/**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
* @param threadFactory factory to use when creating internal threads
*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)
{
this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
}
/**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
* @param dataIsCompressed if true, data in the path is compressed
* @param threadFactory factory to use when creating internal threads
*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)
{
this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
}
/**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
* @param dataIsCompressed if true, data in the path is compressed
* @param executorService ExecutorService to use for the PathChildrenCache's background thread
*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
{
this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService));
}
/**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
* @param dataIsCompressed if true, data in the path is compressed
* @param executorService Closeable ExecutorService to use for the PathChildrenCache's background thread
*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
{
this.client = client;
this.path = path;
this.cacheData = cacheData;
this.dataIsCompressed = dataIsCompressed;
this.executorService = executorService;
ensurePath = client.newNamespaceAwareEnsurePath(path);
}
}
/**
* Listener for PathChildrenCache changes
*/
public interface PathChildrenCacheListener
{
/**
* Called when a change has occurred
*
* @param client the client
* @param event describes the change
* @throws Exception errors
*/
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;
}
public class PathChildrenCacheEvent
{
private final Type type;
private final ChildData data;
/**
* Type of change
*/
public enum Type
{
/**
* A child was added to the path
*/
CHILD_ADDED,
/**
* A child's data was changed
*/
CHILD_UPDATED,
CHILD_REMOVED,
CONNECTION_SUSPENDED,
CONNECTION_RECONNECTED,
CONNECTION_LOST,
INITIALIZED
}
}
/**
* Decoration on an ExecutorService that tracks created futures and provides
* a method to close futures created via this class
*/
public class CloseableExecutorService implements Closeable
{
private final Logger log = LoggerFactory.getLogger(CloseableExecutorService.class);
private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap());
private final ExecutorService executorService;
private final boolean shutdownOnClose;
protected final AtomicBoolean isOpen = new AtomicBoolean(true);
/**
* @param executorService the service to decorate
*/
public CloseableExecutorService(ExecutorService executorService)
{
this(executorService, false);
}
/**
* @param executorService the service to decorate
* @param shutdownOnClose if true, shutdown the executor service when this is closed
*/
public CloseableExecutorService(ExecutorService executorService, boolean shutdownOnClose)
{
this.executorService = executorService;
this.shutdownOnClose = shutdownOnClose;
}
...
}
从上面可以看出,子目录监听器PathChildrenCache,主要成员变量为客户端框架实现CuratorFramework,子路径监听器容器ListenerContainer
来看子目录监听器PathChildrenCache启动
/**
* Start the cache. The cache is not started automatically. You must call this method.
*
* @throws Exception errors
*/
public void start() throws Exception
{
start(StartMode.NORMAL);
}
/**
* Same as {@link #start()} but gives the option of doing an initial build
*
* @param buildInitial if true, {@link #rebuild()} will be called before this method
* returns in order to get an initial view of the node; otherwise,
* the cache will be initialized asynchronously
* @throws Exception errors
* @deprecated use {@link #start(StartMode)}
*/
public void start(boolean buildInitial) throws Exception
{
start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL);
}
/**
* Method of priming cache on {@link PathChildrenCache#start(StartMode)}
*/
public enum StartMode
{
/**
* cache will _not_ be primed. i.e. it will start empty and you will receive
* events for all nodes added, etc.
缓存不会初始化,将会接受所有节点的添加时间
*/
NORMAL,
/**
* {@link PathChildrenCache#rebuild()} will be called before this method returns in
* order to get an initial view of the node.
*/
BUILD_INITIAL_CACHE,
/**
* After cache is primed with initial values (in the background) a
* {@link PathChildrenCacheEvent.Type#INITIALIZED} will be posted
在缓存初始化完成后
*/
POST_INITIALIZED_EVENT
}
/**
* Start the cache. The cache is not started automatically. You must call this method.
*
* @param mode Method for priming the cache
* @throws Exception errors
*/
public void start(StartMode mode) throws Exception
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
mode = Preconditions.checkNotNull(mode, "mode cannot be null");
//添加状态监听器
client.getConnectionStateListenable().addListener(connectionStateListener);
switch ( mode )
{
case NORMAL:
{
//添加刷新操作
offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
break;
}
case BUILD_INITIAL_CACHE:
{
//重新构建
rebuild();
break;
}
case POST_INITIALIZED_EVENT:
{
//重新添加刷新操作
initialSet.set(Maps.<String, ChildData>newConcurrentMap());
offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED));
break;
}
}
}
启动方法有几下几点需要关注
- 添加状态监听器
//添加状态监听器 client.getConnectionStateListenable().addListener(connectionStateListener); private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { handleStateChange(newState); } }; private void handleStateChange(ConnectionState newState) { switch ( newState ) { case SUSPENDED: { offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null))); break; } case LOST://失去连接 { offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_LOST, null))); break; } case RECONNECTED://重新连接 { try { offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT)); offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null))); } catch ( Exception e ) { handleException(e); } break; } } }
从上面可以看出连接状态监听器ConnectionStateListener,根据连接状态来添加事件EventOperation和刷新RefreshOperation操作到操作集。 先来看事件操作
class EventOperation implements Operation
{
private final PathChildrenCache cache;
private final PathChildrenCacheEvent event;
EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event)
{
this.cache = cache;
this.event = event;
}
@Override
public void invoke()
{
cache.callListeners(event);
}
@Override
public String toString()
{
return "EventOperation{" +
"event=" + event +
'}';
}
}
interface Operation
{
public void invoke() throws Exception;
}
//PathChildrenCache
void callListeners(final PathChildrenCacheEvent event)
{
listeners.forEach
(
new Function<PathChildrenCacheListener, Void>()
{
@Override
public Void apply(PathChildrenCacheListener listener)
{
try
{
//通知监听器,子目录节点变更事件
listener.childEvent(client, event);
}
catch ( Exception e )
{
handleException(e);
}
return null;
}
}
);
}
从上面可以看出,事件操作,主要是触发监听器的子目录事件操作。 再来看刷新操作
class RefreshOperation implements Operation
{
private final PathChildrenCache cache;
private final PathChildrenCache.RefreshMode mode;
RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode)
{
this.cache = cache;
this.mode = mode;
}
@Override
public void invoke() throws Exception
{
cache.refresh(mode);
}
...
}
//RefreshOperation
void refresh(final RefreshMode mode) throws Exception
{
ensurePath.ensure(client.getZookeeperClient());
//后台回调
final BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
//处理子目录事件
processChildren(event.getChildren(), mode);
}
}
};
//这个是关键
client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
}
private void processChildren(List<String> children, RefreshMode mode) throws Exception
{
List<String> fullPaths = Lists.newArrayList(Lists.transform
(
children,
new Function<String, String>()
{
@Override
public String apply(String child)
{
return ZKPaths.makePath(path, child);
}
}
));
Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
removedNodes.removeAll(fullPaths);
for ( String fullPath : removedNodes )
{
remove(fullPath);
}
for ( String name : children )
{
String fullPath = ZKPaths.makePath(path, name);
if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) )
{
//监听子一级目录
getDataAndStat(fullPath);
}
updateInitialSet(name, NULL_CHILD_DATA);
}
maybeOfferInitializedEvent(initialSet.get());
}
//监听子一级目录
void getDataAndStat(final String fullPath) throws Exception
{
BackgroundCallback existsCallback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
applyNewData(fullPath, event.getResultCode(), event.getStat(), null);
}
};
BackgroundCallback getDataCallback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
applyNewData(fullPath, event.getResultCode(), event.getStat(), event.getData());
}
};
if ( cacheData )
{
if ( dataIsCompressed )
{
//这个我们应该很熟悉,注册节点观察器Watcher
client.getData().decompressed().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath(fullPath);
}
else
{
client.getData().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath(fullPath);
}
}
else
{
client.checkExists().usingWatcher(dataWatcher).inBackground(existsCallback).forPath(fullPath);
}
}
//监测子路径的添加与更新事件操作
private void applyNewData(String fullPath, int resultCode, Stat stat, byte[] bytes)
{
if ( resultCode == KeeperException.Code.OK.intValue() ) // otherwise - node must have dropped or something - we should be getting another event
{
ChildData data = new ChildData(fullPath, stat, bytes);
ChildData previousData = currentData.put(fullPath, data);
if ( previousData == null ) // i.e. new
{
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data)));
}
else if ( previousData.getStat().getVersion() != stat.getVersion() )
{
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data)));
}
updateInitialSet(ZKPaths.getNodeFromPath(fullPath), data);
}
}
//观察器
private final Watcher dataWatcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
try
{
if ( event.getType() == Event.EventType.NodeDeleted )
{
remove(event.getPath());
}
else if ( event.getType() == Event.EventType.NodeDataChanged )
{
offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));
}
}
catch ( Exception e )
{
handleException(e);
}
}
};
class GetDataOperation implements Operation
{
private final PathChildrenCache cache;
private final String fullPath;
GetDataOperation(PathChildrenCache cache, String fullPath)
{
this.cache = cache;
this.fullPath = fullPath;
}
@Override
public void invoke() throws Exception
{
//重新注册监听
cache.getDataAndStat(fullPath);
}
...
}
我们回到添加操作方法offerOperation
//添加操作到操作队列
private void offerOperation(final Operation operation)
{
if ( operationsQuantizer.add(operation) )
{
submitToExecutor
(
new Runnable()
{
@Override
public void run()
{
try
{
operationsQuantizer.remove(operation);
operation.invoke();
}
catch ( Exception e )
{
handleException(e);
}
}
}
);
}
}
private synchronized void submitToExecutor(final Runnable command)
{
if ( state.get() == State.STARTED )
{
executorService.submit(command);
}
}
从上面可以看出,刷新操作主要是完成子目录的添加和刷新事件,并从新注册子目录监听器。
- 添加刷新操作
//添加刷新操作 offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
有了第一点的解读,这一点应该很好理解。
3.
//重新构建
rebuild();
@VisibleForTesting
volatile Exchanger<Object> rebuildTestExchanger;
/**
* NOTE: this is a BLOCKING method. Completely rebuild the internal cache by querying
* for all needed data WITHOUT generating any events to send to listeners.
*
* @throws Exception errors
*/
public void rebuild() throws Exception
{
Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
ensurePath.ensure(client.getZookeeperClient());
clear();
//重新获取子路径
List<String> children = client.getChildren().forPath(path);
for ( String child : children )
{
String fullPath = ZKPaths.makePath(path, child);
internalRebuildNode(fullPath);
if ( rebuildTestExchanger != null )
{
rebuildTestExchanger.exchange(new Object());
}
}
// this is necessary so that any updates that occurred while rebuilding are taken
offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
}
/**
* Clears the current data without beginning a new query and without generating any events
* for listeners.
*/
public void clear()
{
currentData.clear();
}
private void internalRebuildNode(String fullPath) throws Exception
{
if ( cacheData )
{
try
{
Stat stat = new Stat();
byte[] bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(fullPath) : client.getData().storingStatIn(stat).forPath(fullPath);
currentData.put(fullPath, new ChildData(fullPath, stat, bytes));
}
catch ( KeeperException.NoNodeException ignore )
{
// node no longer exists - remove it
currentData.remove(fullPath);
}
}
else
{
Stat stat = client.checkExists().forPath(fullPath);
if ( stat != null )
{
currentData.put(fullPath, new ChildData(fullPath, stat, null));
}
else
{
// node no longer exists - remove it
currentData.remove(fullPath);
}
}
}
从上面可以看出,重新构建操作,主要是刷新缓存路径数据,并注册刷新操作。 小节一下: 一级目录监听器PathChildrenCache,启动主要是注册连接状态监听器ConnectionStateListener,连接状态监听器根据连接状态来添加事件EventOperation和刷新RefreshOperation操作到操作集。 事件操作EventOperation,主要是触发监听器的子目录事件操作;刷新操作RefreshOperation主要是完成子目录的添加和刷新事件,并从新注册子目录监听器。 然后根据启动模式来决定是重添加事件操作,刷新、事件操作,或者重新构建,即刷新缓存路径数据,并注册刷新操作。
最后我们再来看下一客户端框架的关闭操作
@Override
public void close()
{
log.debug("Closing");
if ( state.compareAndSet(CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED) )
{
//触发监听器,状态关闭时间
listeners.forEach
(
new Function<CuratorListener, Void>()
{
@Override
public Void apply(CuratorListener listener)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null);
try
{
listener.eventReceived(CuratorFrameworkImpl.this, event);
}
catch ( Exception e )
{
log.error("Exception while sending Closing event", e);
}
return null;
}
}
);
//清除监听器,及连接状态管理器
listeners.clear();
unhandledErrorListeners.clear();
connectionStateManager.close();//关闭连接状态管理器
client.close();//关闭客户端
namespaceWatcherMap.close();
if ( executorService != null )
{
executorService.shutdownNow();
}
}
}
再来看关闭连接状态管理器:
//ConnectionStateManager
@Override
public void close()
{
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
service.shutdownNow();
listeners.clear();
}
}
再来看关闭客户端
//CuratorZookeeperClient
/**
* Close the client
*/
public void close()
{
log.debug("Closing");
started.set(false);
try
{
state.close();
}
catch ( IOException e )
{
log.error("", e);
}
}
//ConnectionState
Override
public void close() throws IOException
{
log.debug("Closing");
CloseableUtils.closeQuietly(ensembleProvider);
try
{
zooKeeper.closeAndClear();
}
catch ( Exception e )
{
throw new IOException(e);
}
finally
{
isConnected.set(false);
}
}
从上面可以看出,关闭客户端框架,主要是清除监听器,连接状态管理器,关闭zk客户端。
总结
节点监听缓存NodeCache,内部关联一下Curator框架客户端CuratorFramework,节点监听器容器 listeners(ListenerContainer
添加节点监听器,实际上是注册到节点缓存的节点监听器容器ListenerContainer
Curator框架实现CuratorFrameworkImpl启动时,首先启动连接状态管理器ConnectionStateManager, 然后再启动客户端CuratorZookeeperClient(在构造Curator框架实现CuratorFrameworkImpl初始化动客户端CuratorZookeeperClient,传入一个Watcher,用于处理CuratorEvent。)。 启动客户端CuratorZookeeperClient过程,关键点是在启动连接状态ConnectionState(在构造CuratorZookeeperClient,初始化连接状态,并将内部Watcher传给连接状态)。 连接状态实现了观察者Watcher,在连接状态建立时,调用客户端CuratorZookeeperClient传入的Watcher,处理相关事件。而这个Watcher是在现CuratorFrameworkImpl初始化动客户端CuratorZookeeperClient时, 传入的。客户端观察者的实际处理业务逻辑在CuratorFrameworkImpl实现,及processEvent方法,processEvent主要处理逻辑为,遍历Curator框架实现CuratorFrameworkImpl内部的监听器容器内的监听器处理相关CuratorEvent 事件。这个CuratorEvent事件,是由原生WatchedEvent事件包装而来。
启动连接连接状态管理器,主要是使用连接状态监听器容器ListenerContainer
子目录监听器PathChildrenCache,主要成员变量为客户端框架实现CuratorFramework,子路径监听器容器ListenerContainer
一级目录监听器PathChildrenCache,启动主要是注册连接状态监听器ConnectionStateListener,连接状态监听器根据连接状态来添加事件EventOperation和刷新RefreshOperation操作到操作集。 事件操作EventOperation,主要是触发监听器的子目录事件操作;刷新操作RefreshOperation主要是完成子目录的添加和刷新事件,并从新注册子目录监听器。 然后根据启动模式来决定是重添加事件操作,刷新、事件操作,或者重新构建,即刷新缓存路径数据,并注册刷新操作。
关闭客户端框架,主要是清除监听器,连接状态管理器,关闭zk客户端。