Curator
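Written by donaldhan, 2018-06-18 12:16
Introduction
In the previous article we analyzed the ZooKeeper client ZkClient. A quick recap:
The main member variables of ZkClient are the client connection IZkConnection, the set of child listeners IZkChildListener, the set of node data listeners IZkDataListener, the current state KeeperState, the event lock ZkLock, the set of client state listeners IZkStateListener, the event thread ZkEventThread, and the serializer ZkSerializer. Most importantly, ZkClient implements the Watcher interface.
The node data listener IZkDataListener watches for changes to a node's data, including creation, update, and deletion events.
The child listener IZkChildListener watches for changes to the children of a path, including creation, update, and deletion events.
The client state listener IZkStateListener handles connection state changes and re-creates the connection when the session expires.
The event lock is a reentrant lock with three conditions: node data changed, session state changed, and node event.
The serializer ZkSerializer serializes data sent to the ZooKeeper server and deserializes data received from it.
Constructing a ZkClient mainly initializes the ZooKeeper session connection, the session timeout, and the connection timeout. The default serializer is SerializableSerializer, and we can also implement our own byte serializer.
The session interface IZkConnection mainly provides ZooKeeper's CDRWA operations (create, delete, read, write, admin) and plays the same role as the [client socket of the native ZooKeeper API][Zk原生API的客户端socket].
ZkConnection, the session client used by ZkClient, has two main member variables: the native ZooKeeper client ZooKeeper, and a ReentrantLock that guards connecting and closing the session. Connecting creates the native ZooKeeper client, and closing closes it. The CDRWA operations are delegated to the internal native ZooKeeper client, so ZkConnection still works on raw bytes. When creating a path, the boolean parameter createParents decides whether missing parent paths are created; the actual work is delegated to the internal ZkConnection. For a delete, if the session has lost its connection, ZkClient reconnects and then runs the delete through a callback, again delegating to the internal ZkConnection. An exists check works the same way: on a lost connection ZkClient reconnects and then runs the check through a callback. For a read, if the connection is lost, ZkClient reconnects and, once connected, reads the path data through a callback delegated to ZkConnection; if a listener is registered for the path it is triggered, and the bytes read are deserialized. A write first serializes the data; if the connection is lost, ZkClient reconnects and, once connected, writes the path data through a callback delegated to ZkConnection.
The event thread ZkEventThread holds an internal LinkedBlockingQueue of ZkEvent objects.
ZkClient implements Watcher mainly to handle path change events and session state change events; events fired while the session is closing are simply dropped. To handle a state change, ZkClient wraps the task of notifying the state listeners in a ZkEvent and puts it on the event thread's queue; if the session has expired, it reconnects.
Path change and child change events are fired in much the same way as state changes: the listener notification is wrapped in a ZkEvent and put on the ZkEventThread queue. For path change events the watcher is re-registered, which avoids the native API's drawback of having to register watches repeatedly.
All sample code used in this article can be found in zookeeper-demo.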
Curator
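Curator is a ZooKeeper client open-sourced by Netflix. Compared with the native client shipped with ZooKeeper, Curator works at a higher level of abstraction and simplifies ZooKeeper client programming. It is a high-level wrapper around ZooKeeper: it simplifies connecting to the cluster and handling errors, and it implements a set of classic patterns such as distributed locks and leader election. ZooKeeper ships with its own Java client, but using it is tedious and error-prone, and its users have to do a lot of manual housekeeping. For example: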
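- Connection issues
  - Initial connection: the ZooKeeper client performs a handshake with the server, which takes some time. Until the handshake completes, any method that executes synchronously against the server (such as create() or getData()) throws an exception.
  - Failover: if the ZooKeeper client loses its connection to the server, it fails over to another server in the cluster. This, however, puts the client back into "initial connection" mode.
  - Session expiration: some edge cases can cause the ZooKeeper session to expire. The client has to watch for this state, then close and re-create the ZooKeeper client instance.
- Recovery issues
  - When a sequential znode is created on the server, the node may be created successfully while the server crashes before returning the node name to the client.
  - The ZooKeeper client can throw several recoverable exceptions, which users have to catch and retry.
- Recipe issues
  - The standard ZooKeeper recipes (locks, leader election, and so on) are described only in minimal terms, and writing them correctly is hard.
  - Some important edge cases are not mentioned in the recipe descriptions. For example, the lock recipe does not say how to handle the case where the server successfully creates the sequential/ephemeral node but crashes before returning the node name to the client. If this is not handled correctly, it can lead to deadlock.
  - In some use cases, possible connection problems must be taken into account. For example, leader election has to monitor the stability of the connection. If the server it is connected to crashes, the leader cannot assume it is still the leader unless it has successfully failed over to another server.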
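The lost-reply edge case from the list above can be illustrated with a small simulation. This is a minimal JDK-only sketch, not Curator or ZooKeeper code: `FakeServer`, `createSequential`, and `createProtected` are hypothetical names, and the approach shown (tag the node name with a unique id, then search for that id after a lost reply instead of creating again) is one assumed way to make such a create safe to retry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** JDK-only simulation; every type and method here is hypothetical. */
final class ProtectedCreateSketch {

    /** In-memory stand-in for a parent node that hands out sequential child names. */
    static final class FakeServer {
        final List<String> children = new ArrayList<>();
        private int counter = 0;
        boolean loseNextReply = false;

        /** Creates a sequential child; may "crash" after creating it but before replying. */
        String createSequential(String prefix) {
            String name = String.format("%s%010d", prefix, counter++);
            children.add(name);
            if (loseNextReply) {
                loseNextReply = false;
                throw new IllegalStateException("connection lost before reply");
            }
            return name;
        }
    }

    /** Creates a child tagged with a unique id; after a lost reply, looks for that id first. */
    static String createProtected(FakeServer server, String baseName) {
        String id = UUID.randomUUID().toString();
        String prefix = "_c_" + id + "-" + baseName;
        try {
            return server.createSequential(prefix);
        } catch (IllegalStateException lostReply) {
            // The node may or may not exist: search by id instead of creating a duplicate.
            for (String child : server.children) {
                if (child.contains(id)) {
                    return child;
                }
            }
            return server.createSequential(prefix);
        }
    }

    public static void main(String[] args) {
        FakeServer server = new FakeServer();
        server.loseNextReply = true;
        String name = createProtected(server, "lock-");
        System.out.println(name + " children=" + server.children.size());
    }
}
```

Without the id lookup, the retry would leave two sequential nodes behind, and a lock recipe waiting on the orphaned one could block forever.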
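All of these problems (and others like them) have to be handled by every ZooKeeper user. The solutions are neither easy to write nor obvious, and they take considerable time. Curator deals with all of them.
What is Curator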
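Curator n ˈkyoor͝ˌātər: a keeper or custodian of a museum or other collection; a ZooKeeper Keeper. It consists of three related projects:
1. curator-client - a replacement for the client bundled with ZooKeeper. It takes care of low-level housekeeping and provides some useful utilities.
2. curator-framework - the Curator Framework is a high-level API that greatly simplifies using ZooKeeper. It adds many features on top of the ZooKeeper client and handles the complexity of managing connections to the ZooKeeper cluster and retrying operations.
3. curator-recipes - implementations of some common ZooKeeper recipes, built on top of the Curator Framework.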
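Curator focuses on recipes such as locks and leader election. Most people interested in ZooKeeper do not need to care about details like connection management; they just want to use the recipes. That is Curator's goal.
Curator handles the complexity of using ZooKeeper in the following ways:
1. Retry mechanism: Curator supports a pluggable retry mechanism. Every ZooKeeper operation that can raise a recoverable exception is retried under the configured retry policy. Curator ships with several standard retry policies (for example, exponential backoff).
2. Connection state monitoring: Curator continuously monitors the state of the ZooKeeper connection. Curator users can listen for connection state changes and react accordingly.
3. ZooKeeper client instance management: Curator manages the actual connection to the ZooKeeper cluster through instances of the standard ZooKeeper class. These instances are managed internally (although you can access them if needed) and are re-created when necessary. Curator therefore provides a reliable handle to the ZooKeeper cluster, unlike the bundled client.
4. Correct, reliable recipes: Curator implements most of the important ZooKeeper recipes (plus some additional ones). The implementations follow ZooKeeper best practices and handle all known edge cases, such as those described earlier.
Curator lets you write more robust code, because you can concentrate fully on the ZooKeeper features you care about without worrying about how to do the housekeeping correctly.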
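ZooKeeper at Netflix
ZooKeeper and Curator are widely used at Netflix. Use cases include:
1. InterProcessMutex is used in various sequential ID generators to guarantee unique values
2. Cassandra backups
3. The TrackID service
4. The Chukwa collector uses LeaderSelector for various housekeeping tasks
5. Some third-party services we use allow only a limited number of concurrent users; InterProcessSemaphore is used to handle this
6. Various caches
Next, let's look at how a Curator client is created and how its CDRWA operations work, starting with a sample that creates a client: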
CuratorFrameworkFactory
package org.donald.curator.session;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.donald.constant.ConfigConstant;
/**
* @ClassName: CreateSesssionSample
* @Description: Create a ZooKeeper client with Curator
* @Author: Donaldhan
* @Date: 2018-05-13 20:44
*/
@Slf4j
public class CreateSesssionSample {
private static CuratorFramework client;
public static void main(String[] args) {
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
client =
CuratorFrameworkFactory.newClient(ConfigConstant.IP,
ConfigConstant.SESSION_TIMEOUT,
ConfigConstant.CONNETING_TIMEOUT,
retryPolicy);
client.start();
log.info("success connected...");
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (client != null) {
client.close();
}
}
}
}
Creating a client with the fluent-style API
package org.donald.curator.session;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.donald.constant.ConfigConstant;
/**
* @ClassName: CreateSessionWithNamespace
* @Description:
* @Author: Donaldhan
* @Date: 2018-05-13 21:13
*/
@Slf4j
public class CreateSessionWithNamespace {
private static CuratorFramework client;
public static void main(String[] args) {
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
client =
CuratorFrameworkFactory.builder()
.connectString(ConfigConstant.IP)
.sessionTimeoutMs(ConfigConstant.SESSION_TIMEOUT)
.connectionTimeoutMs(ConfigConstant.CONNETING_TIMEOUT)
.retryPolicy(retryPolicy)
.namespace("base")
.build();
client.start();
log.info("success connected...");
log.info("client namespace:{}",client.getNamespace());
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
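} finally {
// Close the client so the session is released, as in CreateSesssionSample
if (client != null) {
client.close();
}
}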
}
}
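By default, Curator clients are created through CuratorFrameworkFactory, and a retry policy can be supplied so that operations are retried after failures such as a lost connection or an expired session. Let's first look at creating a client directly: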
/**
* Create a new client with default session timeout and default connection timeout
*
*
* @param connectString list of servers to connect to
* @param retryPolicy retry policy to use
* @return client
*/
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
{
return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
}
/**
* Create a new client
*
*
* @param connectString list of servers to connect to
* @param sessionTimeoutMs session timeout
* @param connectionTimeoutMs connection timeout
* @param retryPolicy retry policy to use
* @return client
*/
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
{
return builder().
connectString(connectString).
sessionTimeoutMs(sessionTimeoutMs).
connectionTimeoutMs(connectionTimeoutMs).
retryPolicy(retryPolicy).
build();
}
As shown above, creating a client directly also goes through the fluent builder internally.
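The delegation from the static factory methods to the builder can be sketched in isolation. The following is a minimal JDK-only sketch of the pattern, not Curator code: `ClientConfigSketch` and all of its members are hypothetical, and only the two default timeout values are taken from the listing in this article.

```java
/** Hypothetical configuration holder that mimics the factory-plus-builder layering. */
final class ClientConfigSketch {
    static final int DEFAULT_SESSION_TIMEOUT_MS = 60 * 1000;
    static final int DEFAULT_CONNECTION_TIMEOUT_MS = 15 * 1000;

    final String connectString;
    final int sessionTimeoutMs;
    final int connectionTimeoutMs;

    private ClientConfigSketch(Builder builder) {
        this.connectString = builder.connectString;
        this.sessionTimeoutMs = builder.sessionTimeoutMs;
        this.connectionTimeoutMs = builder.connectionTimeoutMs;
    }

    /** Short overload: fills in the defaults and delegates to the full overload. */
    static ClientConfigSketch newClient(String connectString) {
        return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS);
    }

    /** Full overload: a thin wrapper around the fluent builder. */
    static ClientConfigSketch newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs) {
        return builder()
            .connectString(connectString)
            .sessionTimeoutMs(sessionTimeoutMs)
            .connectionTimeoutMs(connectionTimeoutMs)
            .build();
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String connectString;
        private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
        private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;

        Builder connectString(String value) { this.connectString = value; return this; }
        Builder sessionTimeoutMs(int value) { this.sessionTimeoutMs = value; return this; }
        Builder connectionTimeoutMs(int value) { this.connectionTimeoutMs = value; return this; }

        ClientConfigSketch build() { return new ClientConfigSketch(this); }
    }

    public static void main(String[] args) {
        ClientConfigSketch config = newClient("localhost:2181");
        System.out.println(config.sessionTimeoutMs + " / " + config.connectionTimeoutMs);
    }
}
```

Keeping all construction logic in the builder means the convenience overloads never drift out of sync with the fluent path.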
Now the definition of CuratorFrameworkFactory:
/**
* Factory methods for creating framework-style clients
*/
public class CuratorFrameworkFactory
{
// Default session timeout and connection timeout
private static final int DEFAULT_SESSION_TIMEOUT_MS = Integer.getInteger("curator-default-session-timeout", 60 * 1000);
private static final int DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
// Local address
private static final byte[] LOCAL_ADDRESS = getLocalAddress();
// Byte compressor
private static final CompressionProvider DEFAULT_COMPRESSION_PROVIDER = new GzipCompressionProvider();
// Default ZooKeeper factory
private static final DefaultZookeeperFactory DEFAULT_ZOOKEEPER_FACTORY = new DefaultZookeeperFactory();
// Default ACL provider
private static final DefaultACLProvider DEFAULT_ACL_PROVIDER = new DefaultACLProvider();
private static final long DEFAULT_INACTIVE_THRESHOLD_MS = (int)TimeUnit.MINUTES.toMillis(3);
private static byte[] getLocalAddress()
{
try
{
return InetAddress.getLocalHost().getHostAddress().getBytes();
}
catch ( UnknownHostException ignore )
{
// ignore
}
return new byte[0];
}
}
As shown above, the main members of the Curator framework factory CuratorFrameworkFactory are the default session and connection timeouts, the local address, the byte compressor GzipCompressionProvider, the default ZooKeeper factory DefaultZookeeperFactory, and the default ACL provider DefaultACLProvider.
Let's look in turn at GzipCompressionProvider, DefaultZookeeperFactory, and DefaultACLProvider.
Byte compressor GzipCompressionProvider
package org.apache.curator.framework.imps;
import org.apache.curator.framework.api.CompressionProvider;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class GzipCompressionProvider implements CompressionProvider
{
@Override
public byte[] compress(String path, byte[] data) throws Exception
{
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
GZIPOutputStream out = new GZIPOutputStream(bytes);
out.write(data);
out.finish();
return bytes.toByteArray();
}
@Override
public byte[] decompress(String path, byte[] compressedData) throws Exception
{
ByteArrayOutputStream bytes = new ByteArrayOutputStream(compressedData.length);
GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressedData));
byte[] buffer = new byte[compressedData.length];
for(;;)
{
int bytesRead = in.read(buffer, 0, buffer.length);
if ( bytesRead < 0 )
{
break;
}
bytes.write(buffer, 0, bytesRead);
}
return bytes.toByteArray();
}
}
GzipCompressionProvider compresses and decompresses node data (byte arrays) with GZIP.
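A round trip through the same JDK classes shows what the provider does to a payload. This is a minimal sketch under assumptions: `GzipRoundTripSketch` is a hypothetical class, it drops the `path` parameter, and it uses try-with-resources and unchecked exceptions for brevity rather than copying Curator's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical helper: GZIP round trip on byte arrays using only java.util.zip. */
final class GzipRoundTripSketch {

    static byte[] compress(byte[] data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the GZIP stream writes the trailer, so the result is a complete GZIP member.
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static byte[] decompress(byte[] compressed) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[1024];
            int bytesRead;
            // read() returns -1 once the end of the compressed stream is reached.
            while ((bytesRead = in.read(buffer)) >= 0) {
                bytes.write(buffer, 0, bytesRead);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] original = "hello curator".getBytes(StandardCharsets.UTF_8);
        byte[] restored = decompress(compress(original));
        System.out.println(Arrays.equals(original, restored));
    }
}
```

For very small payloads the GZIP header and trailer can make the compressed form larger than the input, so compression pays off mainly for larger or repetitive node data.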
Default ZooKeeper factory DefaultZookeeperFactory
package org.apache.curator.utils;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class DefaultZookeeperFactory implements ZookeeperFactory
{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
{
return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
}
}
The default ZooKeeper factory DefaultZookeeperFactory creates the native ZooKeeper client.
Default ACL provider DefaultACLProvider
package org.apache.curator.framework.imps;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import java.util.List;
public class DefaultACLProvider implements ACLProvider
{
@Override
public List<ACL> getDefaultAcl()
{
return ZooDefs.Ids.OPEN_ACL_UNSAFE;
}
@Override
public List<ACL> getAclForPath(String path)
{
return ZooDefs.Ids.OPEN_ACL_UNSAFE;
}
}
public class ACL implements Record {
private int perms;
private org.apache.zookeeper.data.Id id;
...
}
As shown above, DefaultACLProvider supplies the ACL for a node; it returns OPEN_ACL_UNSAFE for every path.
Next, the inner builder of CuratorFrameworkFactory:
The inner builder of CuratorFrameworkFactory
public static class Builder
{
private EnsembleProvider ensembleProvider;
// Session timeout and connection timeout
private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;
// Retry policy
private RetryPolicy retryPolicy;
// Thread factory
private ThreadFactory threadFactory = null;
// Namespace
private String namespace;
// Authentication scheme and value
private String authScheme = null;
private byte[] authValue = null;
private byte[] defaultData = LOCAL_ADDRESS;
// Byte compressor
private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER;
// Factory for the native ZooKeeper client
private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
// ACL provider
private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
// Whether the client may connect in read-only mode
private boolean canBeReadOnly = false;
}
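As shown above, besides the session and connection timeouts, the byte compressor, the native-client factory, and the ACL provider, the inner Builder of CuratorFrameworkFactory also holds a ThreadFactory, the authentication scheme and value, and the RetryPolicy.
Let's look at the retry policy first: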
/**
* Abstracts the policy to use when retrying connections
*/
public interface RetryPolicy
{
/**
* Called when an operation has failed for some reason. This method should return
* true to make another attempt.
*
*
* @param retryCount the number of times retried so far (0 the first time)
* @param elapsedTimeMs the elapsed time in ms since the operation was attempted
* @param sleeper use this to sleep - DO NOT call Thread.sleep
* @return true/false
*/
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
}
/**
* Abstraction for retry policies to sleep
*/
public interface RetrySleeper
{
/**
* Sleep for the given time
*
* @param time time
* @param unit time unit
* @throws InterruptedException if the sleep is interrupted
*/
public void sleepFor(long time, TimeUnit unit) throws InterruptedException;
}
Here is one implementation of RetryPolicy:
/**
* Retry policy that retries a set number of times with increasing sleep time between retries
*/
public class ExponentialBackoffRetry extends SleepingRetry
{
private static final Logger log = LoggerFactory.getLogger(ExponentialBackoffRetry.class);
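private static final int MAX_RETRIES_LIMIT = 29;// upper limit for maxRetries; larger values are pinned to 29
private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE;// default cap on the sleep time per retry
private final Random random = new Random();
private final int baseSleepTimeMs;// base wait time between retries
private final int maxSleepMs;// cap on the sleep time per retry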
/**
* @param baseSleepTimeMs initial amount of time to wait between retries
* @param maxRetries max number of times to retry
*/
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
{
this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
}
/**
* @param baseSleepTimeMs initial amount of time to wait between retries
* @param maxRetries max number of times to retry
* @param maxSleepMs max time in ms to sleep on each retry
*/
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
{
super(validateMaxRetries(maxRetries));
this.baseSleepTimeMs = baseSleepTimeMs;
this.maxSleepMs = maxSleepMs;
}
@VisibleForTesting
public int getBaseSleepTimeMs()
{
return baseSleepTimeMs;
}
// Sleep time before the next retry
@Override
protected int getSleepTimeMs(int retryCount, long elapsedTimeMs)
{
// copied from Hadoop's RetryPolicies.java
int sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
if ( sleepMs > maxSleepMs )
{
log.warn(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
sleepMs = maxSleepMs;
}
return sleepMs;
}
// Validate the maximum number of retries
private static int validateMaxRetries(int maxRetries)
{
if ( maxRetries > MAX_RETRIES_LIMIT )
{
log.warn(String.format("maxRetries too large (%d). Pinning to %d", maxRetries, MAX_RETRIES_LIMIT));
maxRetries = MAX_RETRIES_LIMIT;
}
return maxRetries;
}
}
abstract class SleepingRetry implements RetryPolicy
{
private final int n;// maximum number of retries
protected SleepingRetry(int n)
{
this.n = n;
}
// made public for testing
public int getN()
{
return n;
}
// Whether another retry is allowed; sleeps before returning true
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
{
if ( retryCount < n )
{
try
{
sleeper.sleepFor(getSleepTimeMs(retryCount, elapsedTimeMs), TimeUnit.MILLISECONDS);
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
return false;
}
return true;
}
return false;
}
protected abstract int getSleepTimeMs(int retryCount, long elapsedTimeMs);
}
As shown above, ExponentialBackoffRetry controls how many times an operation is retried and how long to wait before the next attempt.
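The sleep-time formula in getSleepTimeMs is easier to reason about with concrete numbers. The sketch below reuses that formula in a hypothetical `BackoffSketch` class (not part of Curator) and adds a helper that computes the largest value the formula can yield for a given retry. Because `random.nextInt(1 << (retryCount + 1))` ranges from 0 to 2^(retryCount+1) - 1 and is floored at 1, the wait lies between baseSleepTimeMs and baseSleepTimeMs * (2^(retryCount+1) - 1), before the maxSleepMs cap.

```java
import java.util.Random;

/** Hypothetical helper that reproduces the backoff formula with an injectable Random. */
final class BackoffSketch {

    /** Same formula as getSleepTimeMs in the listing, with the cap applied via Math.min. */
    static int sleepTimeMs(int baseSleepTimeMs, int maxSleepMs, int retryCount, Random random) {
        int sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return Math.min(sleepMs, maxSleepMs);
    }

    /** Largest value the formula can produce for this retry, ignoring the maxSleepMs cap. */
    static long upperBoundMs(int baseSleepTimeMs, int retryCount) {
        return (long) baseSleepTimeMs * ((1L << (retryCount + 1)) - 1);
    }

    public static void main(String[] args) {
        Random random = new Random();
        int base = 1000;
        for (int retry = 0; retry < 5; retry++) {
            int slept = sleepTimeMs(base, Integer.MAX_VALUE, retry, random);
            System.out.printf("retry %d: %d ms (range %d..%d)%n",
                retry, slept, base, upperBoundMs(base, retry));
        }
    }
}
```

With a base of 1000 ms, the first retry always waits 1000 ms, the second waits between 1000 and 3000 ms, and the third between 1000 and 7000 ms; the randomization keeps many clients from retrying in lockstep.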
Back to how the builder creates the client:
/**
* Apply the current values and build a new CuratorFramework
*
* @return new CuratorFramework
*/
public CuratorFramework build()
{
return new CuratorFrameworkImpl(this);
}
As shown above, what the inner builder actually creates is a CuratorFrameworkImpl.
CuratorFramework
First, the CuratorFramework interface:
/**
* Zookeeper framework-style client
*/
public interface CuratorFramework extends Closeable
{
/**
* Start the client. Most mutator methods will not work until the client is started
*/
public void start();
/**
* Stop the client
*/
public void close();
/**
* Returns the state of this instance
*
* @return state
*/
public CuratorFrameworkState getState();
/**
* Return true if the client is started, not closed, etc.
*
* @return true/false
* @deprecated use {@link #getState()} instead
*/
public boolean isStarted();
/**
* Start a create builder
*
* @return builder object
*/
public CreateBuilder create();
/**
* Start a delete builder
*
* @return builder object
*/
public DeleteBuilder delete();
/**
* Start an exists builder
* <p>
* The builder will return a Stat object as if org.apache.zookeeper.ZooKeeper.exists() were called. Thus, a null
* means that it does not exist and an actual Stat object means it does exist.
*
* @return builder object
*/
public ExistsBuilder checkExists();
/**
* Start a get data builder
*
* @return builder object
*/
public GetDataBuilder getData();
/**
* Start a set data builder
*
* @return builder object
*/
public SetDataBuilder setData();
/**
* Start a get children builder
*
* @return builder object
*/
public GetChildrenBuilder getChildren();
/**
* Start a get ACL builder
*
* @return builder object
*/
public GetACLBuilder getACL();
/**
* Start a set ACL builder
*
* @return builder object
*/
public SetACLBuilder setACL();
/**
* Start a transaction builder
*
* @return builder object
*/
public CuratorTransaction inTransaction();
/**
* Perform a sync on the given path - syncs are always in the background
*
* @param path the path
* @param backgroundContextObject optional context
* @deprecated use {@link #sync()} instead
*/
public void sync(String path, Object backgroundContextObject);
/**
* Start a sync builder. Note: sync is ALWAYS in the background even
* if you don't use one of the background() methods
*
* @return builder object
*/
public SyncBuilder sync();
/**
* Returns the listenable interface for the Connect State
*
* @return listenable
*/
public Listenable<ConnectionStateListener> getConnectionStateListenable();
/**
* Returns the listenable interface for events
*
* @return listenable
*/
public Listenable<CuratorListener> getCuratorListenable();
/**
* Returns the listenable interface for unhandled errors
*
* @return listenable
*/
public Listenable<UnhandledErrorListener> getUnhandledErrorListenable();
/**
* Returns a facade of the current instance that does _not_ automatically
* pre-pend the namespace to all paths
*
* @return facade
* @deprecated use {@link #usingNamespace} passing <code>null</code>
*/
public CuratorFramework nonNamespaceView();
/**
* Returns a facade of the current instance that uses the specified namespace
* or no namespace if <code>newNamespace</code> is <code>null</code>.
*
* @param newNamespace the new namespace or null for none
* @return facade
*/
public CuratorFramework usingNamespace(String newNamespace);
/**
* Return the current namespace or "" if none
*
* @return namespace
*/
public String getNamespace();
/**
* Return the managed zookeeper client
*
* @return client
*/
public CuratorZookeeperClient getZookeeperClient();
/**
* Allocates an ensure path instance that is namespace aware
*
* @param path path to ensure
* @return new EnsurePath instance
*/
public EnsurePath newNamespaceAwareEnsurePath(String path);
}
/**
* @see CuratorFramework#getState()
*/
public enum CuratorFrameworkState
{
/**
* {@link CuratorFramework#start()} has not yet been called
*/
LATENT,
/**
* {@link CuratorFramework#start()} has been called
*/
STARTED,
/**
* {@link CuratorFramework#close()} has been called
*/
STOPPED
}
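As shown above, CuratorFramework provides operations to start and close the client, plus builders for the CDRWA operations: CreateBuilder for creating nodes, DeleteBuilder for deleting nodes, GetDataBuilder for reading node data, SetDataBuilder for writing node data, ExistsBuilder for checking nodes, SyncBuilder for syncing data, CuratorTransaction for transactions, and GetACLBuilder and SetACLBuilder for ACLs. It also exposes the connection state listeners through Listenable.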
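The three values of CuratorFrameworkState suggest a simple lifecycle guard. The following JDK-only sketch shows one way such a guard can work; `LifecycleSketch` is hypothetical, and using `compareAndSet` in `start()` is an assumption, since the article does not show Curator's own start() implementation. The error message in `create` is the one used by the create() precondition shown later in this article.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical client skeleton that only models the LATENT / STARTED / STOPPED lifecycle. */
final class LifecycleSketch {
    enum State { LATENT, STARTED, STOPPED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    /** Only a LATENT instance may be started; the atomic swap rejects a second start. */
    void start() {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
    }

    void close() {
        state.set(State.STOPPED);
    }

    State getState() {
        return state.get();
    }

    /** Stand-in for a builder entry point: refused unless the client is started. */
    String create(String path) {
        if (state.get() != State.STARTED) {
            throw new IllegalStateException("instance must be started before calling this method");
        }
        return path;
    }

    public static void main(String[] args) {
        LifecycleSketch client = new LifecycleSketch();
        System.out.println(client.getState());
        client.start();
        System.out.println(client.create("/demo"));
        client.close();
        System.out.println(client.getState());
    }
}
```

This is why the samples at the top of the article call `client.start()` before doing anything else with the client.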
Next we look at the definitions of these builders and of the listeners:
Creating nodes: CreateBuilder
public interface CreateBuilder extends
BackgroundPathAndBytesable<String>,
CreateModable<ACLBackgroundPathAndBytesable<String>>,
ACLCreateModeBackgroundPathAndBytesable<String>,
Compressible<CreateBackgroundModeACLable>
{
/**
* Causes any parent nodes to get created if they haven't already been
*
* @return this
*/
public ProtectACLCreateModePathAndBytesable<String> creatingParentsIfNeeded();
/**
* @deprecated this has been generalized to support all create modes. Instead, use:
* <pre>
* client.create().withProtection().withMode(CreateMode.PERSISTENT_SEQUENTIAL)...
* </pre>
* @return this
*/
public ACLPathAndBytesable<String> withProtectedEphemeralSequential();
...
}
public interface BackgroundPathAndBytesable<T> extends
Backgroundable<PathAndBytesable<T>>,
PathAndBytesable<T>
{
}
public interface PathAndBytesable<T>
{
/**
* Commit the currently building operation using the given path and data
*
* @param path the path
* @param data the data
* @return operation result if any
* @throws Exception errors
*/
public T forPath(String path, byte[] data) throws Exception;
/**
* Commit the currently building operation using the given path and the default data
* for the client (usually a byte[0] unless changed via
* {@link CuratorFrameworkFactory.Builder#defaultData(byte[])}).
*
* @param path the path
* @return operation result if any
* @throws Exception errors
*/
public T forPath(String path) throws Exception;
}
public interface CreateModable<T>
{
/**
* Set a create mode - the default is {@link CreateMode#PERSISTENT}
*
* @param mode new create mode
* @return this
*/
public T withMode(CreateMode mode);
}
public enum CreateMode {
/**
* The znode will not be automatically deleted upon client's disconnect.
*/
PERSISTENT (0, false, false),
/**
* The znode will not be automatically deleted upon client's disconnect,
* and its name will be appended with a monotonically increasing number.
*/
PERSISTENT_SEQUENTIAL (2, false, true),
/**
* The znode will be deleted upon the client's disconnect.
*/
EPHEMERAL (1, true, false),
/**
* The znode will be deleted upon the client's disconnect, and its name
* will be appended with a monotonically increasing number.
*/
EPHEMERAL_SEQUENTIAL (3, true, true);
...
}
As shown above, CreateBuilder creates persistent and ephemeral nodes, each optionally sequential.
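The four create modes combine two independent properties: whether the node disappears with its session, and whether a counter is appended to its name. The toy model below illustrates this with plain JDK collections. Everything in it is hypothetical: `CreateModeSketch` is not a ZooKeeper class, it keeps a single global counter (ZooKeeper keeps one per parent node), and it assumes session ids are non-zero so that 0 can mark persistent nodes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory model of the four create modes. */
final class CreateModeSketch {
    enum Mode {
        PERSISTENT(false, false),
        PERSISTENT_SEQUENTIAL(false, true),
        EPHEMERAL(true, false),
        EPHEMERAL_SEQUENTIAL(true, true);

        final boolean ephemeral;
        final boolean sequential;

        Mode(boolean ephemeral, boolean sequential) {
            this.ephemeral = ephemeral;
            this.sequential = sequential;
        }
    }

    /** Path mapped to the owning session id; 0 marks a persistent node. */
    private final Map<String, Long> nodes = new LinkedHashMap<>();
    private int sequence = 0;

    /** Returns the actual path, which differs from the requested one for sequential modes. */
    String create(String path, Mode mode, long sessionId) {
        String actual = mode.sequential ? path + String.format("%010d", sequence++) : path;
        nodes.put(actual, mode.ephemeral ? sessionId : 0L);
        return actual;
    }

    /** Ending a session removes exactly the ephemeral nodes that it owns. */
    void closeSession(long sessionId) {
        nodes.values().removeIf(owner -> owner == sessionId);
    }

    boolean exists(String path) {
        return nodes.containsKey(path);
    }

    public static void main(String[] args) {
        CreateModeSketch zk = new CreateModeSketch();
        zk.create("/config", Mode.PERSISTENT, 7L);
        String worker = zk.create("/worker-", Mode.EPHEMERAL_SEQUENTIAL, 7L);
        zk.closeSession(7L);
        System.out.println(zk.exists("/config") + " " + zk.exists(worker));
    }
}
```

Because a sequential create returns a path the caller could not have predicted, the returned name matters, which is why builders for create return a `String`.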
To see how CreateBuilder is implemented, we start with the framework implementation CuratorFrameworkImpl:
public class CuratorFrameworkImpl implements CuratorFramework
{
// ZooKeeper client
private final CuratorZookeeperClient client;
private final ListenerContainer<CuratorListener> listeners;
private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
private final ThreadFactory threadFactory;
// Queue of background operations
private final BlockingQueue<OperationAndData<?>> backgroundOperations;
private final NamespaceImpl namespace;
private final ConnectionStateManager connectionStateManager;
private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>();
private final byte[] defaultData;
private final FailedDeleteManager failedDeleteManager;
private final CompressionProvider compressionProvider;
private final ACLProvider aclProvider;
private final NamespaceFacadeCache namespaceFacadeCache;
private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
// Executor service
private volatile ExecutorService executorService;
interface DebugBackgroundListener
{
void listen(OperationAndData<?> data);
}
volatile DebugBackgroundListener debugListener = null;
private final AtomicReference<CuratorFrameworkState> state;
// Client authentication info
private static class AuthInfo
{
final String scheme;
final byte[] auth;
private AuthInfo(String scheme, byte[] auth)
{
this.scheme = scheme;
this.auth = auth;
}
@Override
public String toString()
{
return "AuthInfo{" +
"scheme='" + scheme + '\'' +
", auth=" + Arrays.toString(auth) +
'}';
}
}
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
this.client = new CuratorZookeeperClient
(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl
(
CuratorFrameworkImpl.this,
CuratorEventType.WATCHED,
watchedEvent.getState().getIntValue(),
unfixForNamespace(watchedEvent.getPath()),
null,
null,
null,
null,
null,
watchedEvent,
null
);
processEvent(event);
}
},
builder.getRetryPolicy(),
builder.canBeReadOnly()
);
listeners = new ListenerContainer<CuratorListener>();
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
backgroundOperations = new DelayQueue<OperationAndData<?>>();
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
byte[] builderDefaultData = builder.getDefaultData();
defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
if ( builder.getAuthScheme() != null )
{
authInfo.set(new AuthInfo(builder.getAuthScheme(), builder.getAuthValue()));
}
failedDeleteManager = new FailedDeleteManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
}
// Wrap the ZooKeeper factory so that auth info is added to every new client
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
{
return new ZookeeperFactory()
{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
{
ZooKeeper zooKeeper = actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
AuthInfo auth = authInfo.get();
if ( auth != null )
{
zooKeeper.addAuthInfo(auth.scheme, auth.auth);
}
return zooKeeper;
}
};
}
// Get the thread factory for the client framework
private ThreadFactory getThreadFactory(CuratorFrameworkFactory.Builder builder)
{
ThreadFactory threadFactory = builder.getThreadFactory();
if ( threadFactory == null )
{
threadFactory = ThreadUtils.newThreadFactory("CuratorFramework");
}
return threadFactory;
}
}
public class ThreadUtils
{
public static ExecutorService newSingleThreadExecutor(String processName)
{
return Executors.newSingleThreadExecutor(newThreadFactory(processName));
}
public static ExecutorService newFixedThreadPool(int qty, String processName)
{
return Executors.newFixedThreadPool(qty, newThreadFactory(processName));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName)
{
return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName));
}
public static ScheduledExecutorService newFixedThreadScheduledPool(int qty, String processName)
{
return Executors.newScheduledThreadPool(qty, newThreadFactory(processName));
}
public static ThreadFactory newThreadFactory(String processName)
{
return new ThreadFactoryBuilder()
.setNameFormat(processName + "-%d")
.setDaemon(true)
.build();
}
}
Now let's look at CuratorZookeeperClient, the client member held by CuratorFrameworkImpl:
public class CuratorZookeeperClient implements Closeable
{
private final ConnectionState state;
// Retry policy
private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
private final int connectionTimeoutMs;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
/**
*
* @param connectString list of servers to connect to
* @param sessionTimeoutMs session timeout
* @param connectionTimeoutMs connection timeout
* @param watcher default watcher or null
* @param retryPolicy the retry policy to use
*/
public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
{
this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
}
/**
* Return the managed ZK instance.
*
* @return client the client
* @throws Exception if the connection timeout has elapsed or an exception occurs in a background process
*/
public ZooKeeper getZooKeeper() throws Exception
{
Preconditions.checkState(started.get(), "Client is not started");
return state.getZooKeeper();
}
/**
* Return a new retry loop. All operations should be performed in a retry loop
*
* @return new retry loop
*/
public RetryLoop newRetryLoop()
{
return new RetryLoop(retryPolicy.get(), tracer);
}
}
As shown above, CuratorZookeeperClient is mainly used to obtain the native ZooKeeper client, to re-create an expired session, and to run the CDRWA operations inside a retry loop.
public class CuratorFrameworkImpl implements CuratorFramework
{
@Override
public CreateBuilder create()
{
Preconditions.checkState(isStarted(), "instance must be started before calling this method");
return new CreateBuilderImpl(this);
}
}
Next, the create operation of the create builder:
class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndBytes>
{
private final CuratorFrameworkImpl client;// Curator framework client
private CreateMode createMode;// create mode
private Backgrounding backgrounding;
private boolean createParentsIfNeeded;
private boolean doProtected;
private boolean compress;
private String protectedId;
private ACLing acling;
@Override
public String forPath(final String givenPath, byte[] data) throws Exception
{
if ( compress )
{
// Compress the data if requested
data = client.getCompressionProvider().compress(givenPath, data);
}
final String adjustedPath = adjustPath(client.fixForNamespace(givenPath));
String returnPath = null;
if ( backgrounding.inBackground() )
{
// Run in the background
pathInBackground(adjustedPath, data, givenPath);
}
else
{
// Run in the foreground
String path = protectedPathInForeground(adjustedPath, data);
returnPath = client.unfixForNamespace(path);
}
return returnPath;
}
// Foreground execution
private String protectedPathInForeground(String adjustedPath, byte[] data) throws Exception
{
try
{
// Create the path
return pathInForeground(adjustedPath, data);
}
catch ( KeeperException.ConnectionLossException e )
{
if ( protectedId != null )
{
/*
* CURATOR-45 : we don't know if the create operation was successful or not,
* register the znode to be sure it is deleted later.
* If the connection is lost, make sure any node that was created gets deleted.
*/
findAndDeleteProtectedNodeInBackground(adjustedPath, protectedId, null);
/*
* The current UUID is scheduled to be deleted, it is not safe to use it again.
* If this builder is used again later create a new UUID
*/
protectedId = UUID.randomUUID().toString();
}
throw e;
}
}
// Create the path
private String pathInForeground(final String path, final byte[] data) throws Exception
{
// Start a tracer on the ZooKeeper client to time this operation
TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Foreground");
final AtomicBoolean firstTime = new AtomicBoolean(true);
// Run the create callback in a retry loop; after a retryable failure it is called again once the connection is back
String returnPath = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<String>()
{
@Override
public String call() throws Exception
{
boolean localFirstTime = firstTime.getAndSet(false);
String createdPath = null;
if ( !localFirstTime && doProtected )
{
createdPath = findProtectedNodeInForeground(path);
}
if ( createdPath == null )
{
try
{
// Delegate to the native ZooKeeper client held by the Curator framework
createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
}
catch ( KeeperException.NoNodeException e )
{
if ( createParentsIfNeeded )
{
// Parents are requested and missing: create them, then retry the create
ZKPaths.mkdirs(client.getZooKeeper(), path, false, client.getAclProvider());
createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
}
else
{
throw e;
}
}
}
if ( failNextCreateForTesting )
{
failNextCreateForTesting = false;
throw new KeeperException.ConnectionLossException();
}
return createdPath;
}
}
);
trace.commit();
return returnPath;
}
}
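As shown above, the Curator framework delegates node creation to the native ZooKeeper client it holds. If parent creation is requested and a parent path does not exist, the parents are created first and the create is issued again. If the session has become invalid, it is re-established, and once that succeeds the create Callable is invoked again.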
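The "try, create parents on NoNode, try again" flow can be reproduced with an in-memory set of paths. This is a minimal JDK-only sketch under assumptions: `CreateParentsSketch`, its `NoNodeException`, and `mkParents` are hypothetical stand-ins for the ZooKeeper client, `KeeperException.NoNodeException`, and `ZKPaths.mkdirs`, and only absolute paths starting with "/" are supported.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory tree that mimics create-with-parents. */
final class CreateParentsSketch {

    static final class NoNodeException extends RuntimeException {
        NoNodeException(String path) {
            super("parent does not exist for " + path);
        }
    }

    private final Set<String> nodes = new HashSet<>();

    /** Plain create: fails when the direct parent is missing (the root always exists). */
    void create(String path) {
        String parent = path.substring(0, path.lastIndexOf('/'));
        if (!parent.isEmpty() && !nodes.contains(parent)) {
            throw new NoNodeException(path);
        }
        nodes.add(path);
    }

    /** Creates every missing ancestor of path, but not path itself. */
    void mkParents(String path) {
        int pos = 1;
        while ((pos = path.indexOf('/', pos)) != -1) {
            nodes.add(path.substring(0, pos));
            pos++;
        }
    }

    /** Optimistic create first; parents are built only after a NoNode failure. */
    void createWithParents(String path) {
        try {
            create(path);
        } catch (NoNodeException e) {
            mkParents(path);
            create(path);
        }
    }

    boolean exists(String path) {
        return nodes.contains(path);
    }

    public static void main(String[] args) {
        CreateParentsSketch tree = new CreateParentsSketch();
        tree.createWithParents("/a/b/c");
        System.out.println(tree.exists("/a") + " " + tree.exists("/a/b") + " " + tree.exists("/a/b/c"));
    }
}
```

Trying the create first keeps the common case (parents already exist) to a single round trip, at the cost of one failed call when they do not.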
A brief look at RetryLoop:
public class RetryLoop
{
private boolean isDone = false;// whether the operation has completed
private int retryCount = 0;// number of retries so far
private final Logger log = LoggerFactory.getLogger(getClass());
private final long startTimeMs = System.currentTimeMillis();
private final RetryPolicy retryPolicy;
private final AtomicReference<TracerDriver> tracer;
private static final RetrySleeper sleeper = new RetrySleeper()
{
@Override
public void sleepFor(long time, TimeUnit unit) throws InterruptedException
{
unit.sleep(time);
}
};
/**
* Returns the default retry sleeper
*
* @return sleeper
*/
public static RetrySleeper getDefaultRetrySleeper()
{
return sleeper;
}
/**
* Convenience utility: creates a retry loop calling the given proc and retrying if needed
*
* @param client Zookeeper
* @param proc procedure to call with retry
* @param <T> return type
* @return procedure result
* @throws Exception any non-retriable errors
*/
public static<T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
{
T result = null;
RetryLoop retryLoop = client.newRetryLoop();
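//Loop until the operation has completed
while ( retryLoop.shouldContinue() )
{
try
{
//Block until the client is connected or the connection timeout elapses
client.internalBlockUntilConnectedOrTimedOut();
result = proc.call();
//Mark the operation as completed
retryLoop.markComplete();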
}
catch ( Exception e )
{
retryLoop.takeException(e);
}
}
return result;
}
/**
* If true is returned, make an attempt at the operation
* Keep going as long as the operation has not completed
* @return true/false
*/
public boolean shouldContinue()
{
return !isDone;
}
/**
* Call this when your operation has successfully completed
* Marks the operation as done
*/
public void markComplete()
{
isDone = true;
}
/**
* Pass any caught exceptions here
*
* @param exception the exception
* @throws Exception if not retry-able or the retry policy returned negative
*/
public void takeException(Exception exception) throws Exception
{
boolean rethrow = true;
if ( isRetryException(exception) )
{
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
log.debug("Retry-able exception received", exception);
}
if ( retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startTimeMs, sleeper) )
{
tracer.get().addCount("retries-allowed", 1);
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
log.debug("Retrying operation");
}
rethrow = false;
}
else
{
tracer.get().addCount("retries-disallowed", 1);
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
log.debug("Retry policy not allowing retry");
}
}
}
if ( rethrow )
{
throw exception;
}
}
}
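To make the control flow of the retry loop easier to follow, here is a stripped-down sketch of the same idea. It is not Curator's implementation: `SimpleRetryLoop` and `RetryableException` are hypothetical names, the retry policy is reduced to a fixed retry budget, and there is no sleeping between attempts or waiting for a connection.

```java
import java.util.concurrent.Callable;

// Simplified, hypothetical retry loop; names are illustrative, not Curator's API.
class SimpleRetryLoop {
    // Marker for errors worth retrying (stands in for connection loss and similar).
    static class RetryableException extends RuntimeException {
        RetryableException(String msg) { super(msg); }
    }

    private final int maxRetries;
    private boolean done = false;
    private int retryCount = 0;

    SimpleRetryLoop(int maxRetries) { this.maxRetries = maxRetries; }

    boolean shouldContinue() { return !done; }

    void markComplete() { done = true; }

    int getRetryCount() { return retryCount; }

    // Swallow the exception if it is retryable and the budget allows; otherwise rethrow.
    void takeException(Exception e) throws Exception {
        if (e instanceof RetryableException && retryCount++ < maxRetries) {
            return; // the caller loops and tries again
        }
        throw e;
    }

    static <T> T callWithRetry(SimpleRetryLoop loop, Callable<T> proc) throws Exception {
        T result = null;
        while (loop.shouldContinue()) {
            try {
                result = proc.call();
                loop.markComplete();
            } catch (Exception e) {
                loop.takeException(e);
            }
        }
        return result;
    }
}
```

The loop only ends in two ways: the operation succeeds and `markComplete()` flips the flag, or `takeException` rethrows because the error is not retryable or the budget is used up.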
Now back to the delete builder.
Deleting a node with DeleteBuilder
public interface DeleteBuilder extends Guaranteeable, ChildrenDeletable
{
}
public interface ChildrenDeletable extends BackgroundVersionable
{
/**
* <p>
* Will also delete children if they exist.
* </p>
* @return
*/
public BackgroundVersionable deletingChildrenIfNeeded();
}
public interface BackgroundVersionable extends
BackgroundPathable<Void>,
Versionable<BackgroundPathable<Void>>
{
}
public interface Versionable<T>
{
/**
* Use the given version (the default is -1)
*
* @param version version to use
* @return this
*/
public T withVersion(int version);
}
public interface BackgroundPathable<T> extends
Backgroundable<Pathable<T>>,
Pathable<T>
{
}
public interface Pathable<T>
{
/**
* Commit the currently building operation using the given path
*
* @param path the path
* @return operation result if any
* @throws Exception errors
*/
public T forPath(String path) throws Exception;
}
Now the implementation:
//CuratorFrameworkImpl
@Override
public DeleteBuilder delete()
{
Preconditions.checkState(isStarted(), "instance must be started before calling this method");
return new DeleteBuilderImpl(this);
}
class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
{
private final CuratorFrameworkImpl client;
private int version;
private Backgrounding backgrounding;
private boolean deletingChildrenIfNeeded;//whether child nodes should be deleted as well
private boolean guaranteed;
@Override
public Void forPath(String path) throws Exception
{
final String unfixedPath = path;
path = client.fixForNamespace(path);
if ( backgrounding.inBackground() )
{
OperationAndData.ErrorCallback<String> errorCallback = null;
if ( guaranteed )
{
errorCallback = new OperationAndData.ErrorCallback<String>()
{
@Override
public void retriesExhausted(OperationAndData<String> operationAndData)
{
client.getFailedDeleteManager().addFailedDelete(unfixedPath);
}
};
}
client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null);
}
else
{
pathInForeground(path, unfixedPath);
}
return null;
}
private void pathInForeground(final String path, String unfixedPath) throws Exception
{
TimeTrace trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Foreground");
try
{
RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<Void>()
{
@Override
public Void call() throws Exception
{
try {
client.getZooKeeper().delete(path, version);
} catch (KeeperException.NotEmptyException e) {
if (deletingChildrenIfNeeded) {
ZKPaths.deleteChildren(client.getZooKeeper(), path, true);
} else {
throw e;
}
}
return null;
}
}
);
}
catch ( KeeperException.NodeExistsException e )
{
throw e;
}
catch ( Exception e )
{
if ( guaranteed )
{
client.getFailedDeleteManager().addFailedDelete(unfixedPath);
}
throw e;
}
trace.commit();
}
}
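As shown above, DeleteBuilder performs the delete inside a retry loop: it waits for the connection to be re-established if it was lost, then delegates the actual delete to the native ZooKeeper client inside Curator. If the node is not empty and deletingChildrenIfNeeded is set, the node is removed together with its children; if guaranteed is set, a delete that still fails is recorded with the FailedDeleteManager.
Reading node data with GetDataBuilder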
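The "plain delete first, delete children only on NotEmpty" behaviour can be illustrated with an in-memory tree. Everything in this sketch is an assumption made for illustration: `DeletableTree` and its unchecked `NotEmptyException` are hypothetical and only mimic what `ZKPaths.deleteChildren` achieves against a real server.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical in-memory tree used to illustrate deletingChildrenIfNeeded; not a Curator class.
class DeletableTree {
    // Unchecked for brevity; the real KeeperException.NotEmptyException is checked.
    static class NotEmptyException extends RuntimeException {
        NotEmptyException(String path) { super("not empty: " + path); }
    }

    private final TreeSet<String> nodes = new TreeSet<>();

    void add(String path) { nodes.add(path); }

    boolean exists(String path) { return nodes.contains(path); }

    // Direct and indirect descendants of path.
    List<String> descendants(String path) {
        List<String> result = new ArrayList<>();
        String prefix = path.endsWith("/") ? path : path + "/";
        for (String n : nodes) {
            if (n.startsWith(prefix)) result.add(n);
        }
        return result;
    }

    // Like the native delete: refuses to remove a node that still has children.
    void delete(String path) {
        if (!descendants(path).isEmpty()) throw new NotEmptyException(path);
        nodes.remove(path);
    }

    // Plain delete first; on NotEmpty remove the descendants and then the node itself.
    void delete(String path, boolean deletingChildrenIfNeeded) {
        try {
            delete(path);
        } catch (NotEmptyException e) {
            if (!deletingChildrenIfNeeded) throw e;
            List<String> children = descendants(path);
            // Longest paths first, so every single delete sees a leaf.
            children.sort((a, b) -> b.length() - a.length());
            for (String child : children) delete(child);
            delete(path);
        }
    }
}
```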
//CuratorFrameworkImpl
@Override
public GetDataBuilder getData()
{
Preconditions.checkState(isStarted(), "instance must be started before calling this method");
return new GetDataBuilderImpl(this);
}
//GetDataBuilderImpl
class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorFrameworkImpl client;
private Stat responseStat;
private Watching watching;
private Backgrounding backgrounding;
private boolean decompress;
@Override
public byte[] forPath(String path) throws Exception
{
path = client.fixForNamespace(path);
byte[] responseData = null;
if ( backgrounding.inBackground() )
{
client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
}
else
{
responseData = pathInForeground(path);
}
return responseData;
}
private byte[] pathInForeground(final String path) throws Exception
{
TimeTrace trace = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Foreground");
byte[] responseData = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<byte[]>()
{
@Override
public byte[] call() throws Exception
{
byte[] responseData;
if ( watching.isWatched() )
{
responseData = client.getZooKeeper().getData(path, true, responseStat);
}
else
{
responseData = client.getZooKeeper().getData(path, watching.getWatcher(), responseStat);
}
return responseData;
}
}
);
trace.commit();
return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData;
}
}
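This follows the same pattern as create and delete: a foreground call runs inside RetryLoop.callWithRetry and is delegated to the native ZooKeeper client, so no further explanation is needed.
Setting node data with SetDataBuilder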
//CuratorFrameworkImpl
@Override
public SetDataBuilder setData()
{
Preconditions.checkState(isStarted(), "instance must be started before calling this method");
return new SetDataBuilderImpl(this);
}
class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndBytes>
{
private final CuratorFrameworkImpl client;
private Backgrounding backgrounding;
private int version;
private boolean compress;
SetDataBuilderImpl(CuratorFrameworkImpl client)
{
this.client = client;
backgrounding = new Backgrounding();
version = -1;
compress = false;
}
@Override
public Stat forPath(String path) throws Exception
{
return forPath(path, client.getDefaultData());
}
@Override
public Stat forPath(String path, byte[] data) throws Exception
{
if ( compress )
{
data = client.getCompressionProvider().compress(path, data);
}
path = client.fixForNamespace(path);
Stat resultStat = null;
if ( backgrounding.inBackground() )
{
client.processBackgroundOperation(new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext()), null);
}
else
{
resultStat = pathInForeground(path, data);
}
return resultStat;
}
int getVersion()
{
return version;
}
private Stat pathInForeground(final String path, final byte[] data) throws Exception
{
TimeTrace trace = client.getZookeeperClient().startTracer("SetDataBuilderImpl-Foreground");
Stat resultStat = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<Stat>()
{
@Override
public Stat call() throws Exception
{
return client.getZooKeeper().setData(path, data, version);
}
}
);
trace.commit();
return resultStat;
}
}
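The `version = -1` default in the constructor matters because ZooKeeper treats -1 as "match any version", while any other value turns the write into an optimistic check against the node's current data version. The sketch below models that rule in memory; `VersionedStore` and its unchecked `BadVersionException` are hypothetical names for illustration, not ZooKeeper or Curator classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of versioned writes; not ZooKeeper or Curator code.
class VersionedStore {
    // Unchecked for brevity; the real KeeperException.BadVersionException is checked.
    static class BadVersionException extends RuntimeException {
        BadVersionException(String msg) { super(msg); }
    }

    private static final class Node {
        byte[] data;
        int version; // starts at 0 when the node is created
        Node(byte[] data) { this.data = data; }
    }

    private final Map<String, Node> nodes = new HashMap<>();

    void create(String path, byte[] data) { nodes.put(path, new Node(data)); }

    int versionOf(String path) { return nodes.get(path).version; }

    // expectedVersion == -1 matches any version; otherwise it must equal the current one.
    // Returns the new version after a successful write.
    int setData(String path, byte[] data, int expectedVersion) {
        Node node = nodes.get(path);
        if (expectedVersion != -1 && expectedVersion != node.version) {
            throw new BadVersionException("expected " + expectedVersion + " but was " + node.version);
        }
        node.data = data;
        node.version++;
        return node.version;
    }
}
```

With Curator the same choice is made through `withVersion(int)` on the builder; leaving it out keeps the unconditional -1.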
Checking whether a node exists with ExistsBuilder
//CuratorFrameworkImpl
@Override
public ExistsBuilder checkExists()
{
Preconditions.checkState(isStarted(), "instance must be started before calling this method");
return new ExistsBuilderImpl(this);
}
class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>
{
private final CuratorFrameworkImpl client;
private Backgrounding backgrounding;
private Watching watching;
@Override
public Stat forPath(String path) throws Exception
{
path = client.fixForNamespace(path);
Stat returnStat = null;
if ( backgrounding.inBackground() )
{
client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
}
else
{
returnStat = pathInForeground(path);
}
return returnStat;
}
private Stat pathInForeground(final String path) throws Exception
{
TimeTrace trace = client.getZookeeperClient().startTracer("ExistsBuilderImpl-Foreground");
Stat returnStat = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<Stat>()
{
@Override
public Stat call() throws Exception
{
Stat returnStat;
if ( watching.isWatched() )
{
returnStat = client.getZooKeeper().exists(path, true);
}
else
{
returnStat = client.getZooKeeper().exists(path, watching.getWatcher());
}
return returnStat;
}
}
);
trace.commit();
return returnStat;
}
}
Next, the Listenable through which the client's connection state listeners are obtained.
Listener operations
First, the Curator listener:
/**
* Receives notifications about errors and background events
*/
public interface CuratorListener
{
/**
* Called when a background task has completed or a watch has triggered
*
* @param client client
* @param event the event
* @throws Exception any errors
*/
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}
public interface CuratorEvent
{
/**
* check here first - this value determines the type of event and which methods will have
* valid values
*
* @return event type
*/
public CuratorEventType getType();
/**
* @return "rc" from async callbacks
*/
public int getResultCode();
/**
* @return the path
*/
public String getPath();
/**
* @return the context object passed to {@link Backgroundable#inBackground(Object)}
*/
public Object getContext();
/**
* @return any stat
*/
public Stat getStat();
/**
* @return any data
*/
public byte[] getData();
/**
* @return any name
*/
public String getName();
/**
* @return any children
*/
public List<String> getChildren();
/**
* @return any ACL list or null
*/
public List<ACL> getACLList();
/**
* If {@link #getType()} returns {@link CuratorEventType#WATCHED} this will
* return the WatchedEvent
*
* @return any WatchedEvent
*/
public WatchedEvent getWatchedEvent();
}
public enum CuratorEventType
{
/**
* Corresponds to {@link CuratorFramework#create()}
*/
CREATE,
/**
* Corresponds to {@link CuratorFramework#delete()}
*/
DELETE,
/**
* Corresponds to {@link CuratorFramework#checkExists()}
*/
EXISTS,
/**
* Corresponds to {@link CuratorFramework#getData()}
*/
GET_DATA,
/**
* Corresponds to {@link CuratorFramework#setData()}
*/
SET_DATA,
/**
* Corresponds to {@link CuratorFramework#getChildren()}
*/
CHILDREN,
/**
* Corresponds to {@link CuratorFramework#sync(String, Object)}
*/
SYNC,
/**
* Corresponds to {@link CuratorFramework#getACL()}
*/
GET_ACL,
/**
* Corresponds to {@link CuratorFramework#setACL()}
*/
SET_ACL,
/**
* Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()}
*/
WATCHED,
/**
* Event sent when client is being closed
*/
CLOSING
}
The unhandled error listener:
public interface UnhandledErrorListener
{
/**
* Called when an exception is caught in a background thread, handler, etc. Before this
* listener is called, the error will have been logged and a {@link ConnectionState#LOST}
* event will have been queued for any {@link ConnectionStateListener}s.
*
* @param message Source message
* @param e exception
*/
public void unhandledError(String message, Throwable e);
}
Next, the listener container:
/**
* Abstracts a listenable object
*/
public interface Listenable<T>
{
/**
* Add the given listener. The listener will be executed in the containing
* instance's thread.
*
* @param listener listener to add
*/
public void addListener(T listener);
/**
* Add the given listener. The listener will be executed using the given
* executor
*
* @param listener listener to add
* @param executor executor to run listener in
*/
public void addListener(T listener, Executor executor);
/**
* Remove the given listener
*
* @param listener listener to remove
*/
public void removeListener(T listener);
}
/**
* Abstracts an object that has listeners
*/
public class ListenerContainer<T> implements Listenable<T>
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<T, ListenerEntry<T>> listeners = Maps.newConcurrentMap();
...
}
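Since the body of ListenerContainer is elided above, the following sketch shows one plausible way such a container can work: each listener is stored together with the executor it should run on, and a listener added without an executor runs on the calling thread. `SimpleListenerContainer` is a hypothetical class written for this article, not Curator's code, and its error handling is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical, simplified listener container; not Curator's ListenerContainer.
class SimpleListenerContainer<T> {
    private final Map<T, Executor> listeners = new ConcurrentHashMap<>();

    // Without an executor the listener runs on the thread that calls forEach.
    void addListener(T listener) {
        addListener(listener, Runnable::run);
    }

    void addListener(T listener, Executor executor) {
        listeners.put(listener, executor);
    }

    void removeListener(T listener) {
        listeners.remove(listener);
    }

    int size() { return listeners.size(); }

    // Invoke the action for every listener on that listener's own executor.
    void forEach(Consumer<T> action) {
        for (Map.Entry<T, Executor> entry : listeners.entrySet()) {
            final T listener = entry.getKey();
            entry.getValue().execute(() -> {
                try {
                    action.accept(listener);
                } catch (RuntimeException e) {
                    // One failing listener must not prevent the others from running.
                    System.err.println("Listener threw: " + e);
                }
            });
        }
    }
}
```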
Next, the connection state manager:
public class ConnectionStateManager implements Closeable
{
private static final int QUEUE_SIZE;
static
{
int size = 25;
String property = System.getProperty("ConnectionStateManagerSize", null);
if ( property != null )
{
try
{
size = Integer.parseInt(property);
}
catch ( NumberFormatException ignore )
{
// ignore
}
}
QUEUE_SIZE = size;
}
private final Logger log = LoggerFactory.getLogger(getClass());
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
private final CuratorFramework client;
private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
private final ExecutorService service;
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
// guarded by sync
private ConnectionState currentConnectionState;
private enum State
{
LATENT,
STARTED,
CLOSED
}
/**
* Start the manager
*/
public void start()
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
service.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
processEvents();
return null;
}
}
);
}
@Override
public void close()
{
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
service.shutdownNow();
listeners.clear();
}
}
private void processEvents()
{
try
{
while ( !Thread.currentThread().isInterrupted() )
{
final ConnectionState newState = eventQueue.take();
if ( listeners.size() == 0 )
{
log.warn("There are no ConnectionStateListeners registered.");
}
listeners.forEach
(
new Function<ConnectionStateListener, Void>()
{
@Override
public Void apply(ConnectionStateListener listener)
{
listener.stateChanged(client, newState);
return null;
}
}
);
}
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
}
}
}
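The core of the manager is a producer/consumer hand-off: state changes are put on a bounded queue and a single worker thread takes them off and notifies the listeners. A minimal sketch of that pattern follows. `StateDispatcher` and `postState` are hypothetical names, states are plain strings instead of `ConnectionState`, and the queue size of 25 simply mirrors the default seen above.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical, simplified state dispatcher; not Curator's ConnectionStateManager.
class StateDispatcher implements AutoCloseable {
    private final BlockingQueue<String> eventQueue = new ArrayBlockingQueue<>(25);
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService service = Executors.newSingleThreadExecutor();

    void addListener(Consumer<String> listener) { listeners.add(listener); }

    void start() {
        service.execute(this::processEvents);
    }

    // Producer side: never blocks; returns false if the bounded queue is full.
    boolean postState(String newState) {
        return eventQueue.offer(newState);
    }

    // Consumer side: take one state at a time and notify all listeners in order.
    private void processEvents() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String newState = eventQueue.take();
                for (Consumer<String> listener : listeners) {
                    listener.accept(newState);
                }
            }
        } catch (InterruptedException e) {
            // shutdownNow() interrupts the blocked take() and ends the loop.
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
        service.shutdownNow();
    }
}
```

Because a single thread drains the queue, listeners see state changes in the order they were posted, and a slow listener delays later notifications instead of reordering them.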
Back to starting and closing the Curator framework.
Starting and closing CuratorFrameworkImpl
@Override
public void start()
{
log.info("Starting");
if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
{
IllegalStateException error = new IllegalStateException();
log.error("Cannot be started more than once", error);
throw error;
}
try
{
//Start the connection state manager
connectionStateManager.start(); // ordering dependency - must be called before client.start()
//Start the client
client.start();
//Create the executor
executorService = Executors.newFixedThreadPool(2, threadFactory); // 1 for listeners, 1 for background ops
executorService.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
//Run the background operations loop
backgroundOperationsLoop();
return null;
}
}
);
}
catch ( Exception e )
{
//Handle the exception
handleBackgroundOperationException(null, e);
}
}
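The `compareAndSet` at the top of `start()` is what makes the method safe to call from several threads: only the caller that wins the LATENT to STARTED transition proceeds, everyone else gets an IllegalStateException. A minimal sketch of that guard, using a hypothetical `Lifecycle` class with an assumed STOPPED state rather than Curator's own types:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical component showing the start-once guard; not a Curator class.
class Lifecycle {
    enum State { LATENT, STARTED, STOPPED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    // Only the caller that wins the LATENT -> STARTED transition proceeds.
    void start() {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        // ... start dependencies here, in the required order ...
    }

    // Idempotent: only the first close after start reports that it did any work.
    boolean close() {
        return state.compareAndSet(State.STARTED, State.STOPPED);
    }

    State getState() { return state.get(); }
}
```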
- First, starting the client
```java
//Start the client
client.start();
//CuratorZookeeperClient
public class CuratorZookeeperClient implements Closeable
{
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final ConnectionState state;
    private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
    private final int connectionTimeoutMs;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
    /**
     * Must be called after construction
     *
     * @throws IOException errors
     */
    public void start() throws Exception
    {
        log.debug("Starting");
        if ( !started.compareAndSet(false, true) )
        {
            IllegalStateException error = new IllegalStateException();
            log.error("Already started", error);
            throw error;
        }
        //Start the connection state
        state.start();
    }
    /**
     * Close the client
     */
    public void close()
    {
        log.debug("Closing");
        started.set(false);
        try
        {
            state.close();
        }
        catch ( IOException e )
        {
            log.error("", e);
        }
    }
    void internalBlockUntilConnectedOrTimedOut() throws InterruptedException
    {
        long waitTimeMs = connectionTimeoutMs;
        while ( !state.isConnected() && (waitTimeMs > 0) )
        {
            final CountDownLatch latch = new CountDownLatch(1);
            Watcher tempWatcher = new Watcher()
            {
                @Override
                public void process(WatchedEvent event)
                {
                    latch.countDown();
                }
            };
            state.addParentWatcher(tempWatcher);
            long startTimeMs = System.currentTimeMillis();
            try
            {
                latch.await(1, TimeUnit.SECONDS);
            }
            finally
            {
                state.removeParentWatcher(tempWatcher);
            }
            long elapsed = Math.max(1, System.currentTimeMillis() - startTimeMs);
            waitTimeMs -= elapsed;
        }
    }
}
//ConnectionState implements Watcher
class ConnectionState implements Watcher, Closeable
{
    private static final int MAX_BACKGROUND_EXCEPTIONS = 10;
    private static final boolean LOG_EVENTS = Boolean.getBoolean(DebugUtils.PROPERTY_LOG_EVENTS);
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final HandleHolder zooKeeper;
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    private final EnsembleProvider ensembleProvider;
    private final int sessionTimeoutMs;
    private final int connectionTimeoutMs;
    private final AtomicReference<TracerDriver> tracer;
    private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
    private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
    private final AtomicLong instanceIndex = new AtomicLong();
    private volatile long connectionStartMs = 0;
    void start() throws Exception
    {
        log.debug("Starting");
        ensembleProvider.start();
        reset();
    }
    @Override
    public void close() throws IOException
    {
        log.debug("Closing");
        CloseableUtils.closeQuietly(ensembleProvider);
        try
        {
            zooKeeper.closeAndClear();
        }
        catch ( Exception e )
        {
            throw new IOException(e);
        }
        finally
        {
            isConnected.set(false);
        }
    }
    void addParentWatcher(Watcher watcher)
    {
        parentWatchers.offer(watcher);
    }
    void removeParentWatcher(Watcher watcher)
    {
        parentWatchers.remove(watcher);
    }
    @Override
    public void process(WatchedEvent event)
    {
        if ( LOG_EVENTS )
        {
            log.debug("ConnectState watcher: " + event);
        }
        for ( Watcher parentWatcher : parentWatchers )
        {
            TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
            parentWatcher.process(event);
            timeTrace.commit();
        }
        boolean wasConnected = isConnected.get();
        boolean newIsConnected = wasConnected;
        if ( event.getType() == Watcher.Event.EventType.None )
        {
            newIsConnected = checkState(event.getState(), wasConnected);
        }
        if ( newIsConnected != wasConnected )
        {
            isConnected.set(newIsConnected);
            connectionStartMs = System.currentTimeMillis();
        }
    }
    ...
}
```
- Running the background operations
```java
//Run the background operations loop
backgroundOperationsLoop();
// CuratorFrameworkImpl
private void backgroundOperationsLoop()
{
    while ( !Thread.interrupted() )
    {
        OperationAndData<?> operationAndData;
        try
        {
            //Take the next operation from the background operations queue
            operationAndData = backgroundOperations.take();
            if ( debugListener != null )
            {
                debugListener.listen(operationAndData);
            }
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
            break;
        }
        //Perform the background operation
        performBackgroundOperation(operationAndData);
    }
}
private void performBackgroundOperation(OperationAndData<?> operationAndData)
{
    try
    {
        if ( client.isConnected() )
        {
            //Connected: execute the operation directly
            operationAndData.callPerformBackgroundOperation();
        }
        else
        {
            client.getZooKeeper(); // important - allow connection resets, timeouts, etc. to occur
            if ( operationAndData.getElapsedTimeMs() >= client.getConnectionTimeoutMs() )
            {
                throw new CuratorConnectionLossException();
            }
            operationAndData.sleepFor(1, TimeUnit.SECONDS);
            //Put the operation back on the background queue
            queueOperation(operationAndData);
        }
    }
    catch ( Throwable e )
    {
        /**
         * Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
         * when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
         * and callbacks need to get invoked, etc.
         */
        if ( e instanceof CuratorConnectionLossException )
        {
            WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
            CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null);
            if ( checkBackgroundRetry(operationAndData, event) )
            {
                queueOperation(operationAndData);
            }
            else
            {
                logError("Background retry gave up", e);
            }
        }
        else
        {
            handleBackgroundOperationException(operationAndData, e);
        }
    }
}