0xTTEPX

Just do it, deeply...

Follow me on GitHub

Curator

write by donaldhan, 2018-06-18 12:16

引言

前一篇文章,我们分析了一下Zookeeper客户端ZkClient,先来回顾一下:

Zk客户端ZkClient主要的成员变量为,客户端连接IZkConnection,子节点监听器集IZkChildListener,节点数据监听器集IZkDataListener,当前状态KeeperState,事件锁ZkLock, 客户端状态监听器集IZkStateListener,事件线程ZkEventThread,序列化器ZkSerializer,最要的一点实现了 Watcher 接口。

节点数据监听器IZkDataListener,主要监控节点数据的变化,包括创建,变更,和删除事件。

子节点监听器IZkChildListener,监控路径子节点的变化,包括创建,变更,和删除事件。

客户端状态监听器IZkStateListener,处理连接状态的变更,并在会话过期时,重新创建连接。

事件锁,为可重入锁,有三个条件,分别为节点数据变更,会话状态变更,节点事件条件。

序列化器ZkSerializer,用于序列化,发送给Zkserver的数据,反序列化,从zk服务器接受的数据。

Zkclient的构造,主要是初始化Zk会话连接,会话超时时间和会话连接超时时间。默认的序列化器为SerializableSerializer,同时我们可以自己实现字节的序列化器。

会话接口IZkConnection,主要提供了ZK的CRWDA操作,这个与[Zk原生API的客户端socket][]作用相同。

ZkClient会话客户端ZkConnection,主要成员变量,一个为远程Zk客户端ZooKeeper,一个用户控制会话连接与关闭的可重入锁ReentrantLock。 连接操作,主要是创建原生Zookeeper客户端,关闭操作实际,是关闭原生Zookeeper客户端。 CDRWA操作实际委托给内部的原生Zookeeper客户端,ZkClient会话客户端连接ZkConnection,面向的能染是字节流。 创建zk目录时,我们可以根据布尔参数createParents,来决定是否需要创建父目录,实际操作委托给内部的ZkClient会话连接。 删除操作,当会话失去连接时,重新连接,通过回调再执行删除目录操作,实际操作委托给内部的ZkClient会话连接。 检查目录是否存在操作,当会话失去连接时,重新连接,通过回调再执行检查目录操作,实际操作委托给内部的ZkClient会话连接。 读操作的如果失去连接,则重新连接,连接成功后,通过回调,委托ZkClient会话读取目录数据,如果存在目录监听器,则触发目录监听器,同时反序列化读取的字节序列。 写操作先序列化数据,如果失去连接,则重新连接,连接成功后,通过回调,委托ZkClient会话写目录数据。

事件线程ZkEventThread内部有一个zk事件ZkEvent队列LinkedBlockingQueue,事件线程的主要任务是,消费zk事件ZkEvent队列中的 事件,并执行相应的事件。

ZkClient实现Watcher的目的主要处理目录变更和会话状态变更相关事件,对于在会话关闭时,触发的事件,直接丢弃。 状态变更事件处理,主要是将触发状态监听任务保证成ZK事件ZkEvent,放入事件线程的事件队列中,如果会话过期,则重新连接。

触发目录变更及子目录变更事件的原理和状态变更基本相同,都是将触发监听器操作包装成包装成ZK事件ZkEvent,放入事件线程ZkEventThread的事件队列中,对于目录变更事件,则重新注册监听器, 从而避免了原生API的重复注册的弊端。

这篇文章所有使用的示例代码可以参考zookeeper-demo

目录

Curator

Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端编程。 Curator是对ZK的高阶封装. 与操作原生的Zookeeper相比, 它提供了对ZK的完美封装, 简化了对集群的连接, 错误的处理; 实现了一系列经典”模式”, 比如分布式锁, Leader选举等。 ZooKeeper本身自带一个Java客户端,但使用这个客户端繁琐而且容易出错。客户端的使用者需要做大量的手动维护性工作。比如:

  • 连接问题

    初始化连接:ZooKeeper客户端与服务器进行握手,这需要花一些时间。如果握手未完成,任何要与服务器端同步执行的方法(如,create(),getData()等)都会抛出异常。 Failover:如果ZooKeeper客户端与服务器连接断开,它会failover到集群中另外一台服务器。然后,这个过程会使客户端退回到”初始化连接”的模式。 Session过期:有些边际情况可以导致ZooKeeper session过期。客户端需要监视这个状态,关闭并重建ZooKeeper客户端实例。

  • 恢复问题

    当在Server创建顺序节点(sequential ZNode)时,有可能出现这种情况:节点成功创建了,但server在将节点名返回给客户端之前崩溃了。 ZooKeeper客户端可能会抛出几个可恢复的异常,使用者需要捕捉这些异常并做重试操作。

  • Recipe方面

    标准的ZooKeeper recipe(如锁,选leader等)只是得到最低程序的描述,要正确地编写出来比较困难。 有一些重要的边界情况在recipe描述里没有提到。例如,锁recipe的描述中,没有说到如何处理服务器成功创建了顺序(Sequential)/临时(Ephemeral)节点,但在向客户端返回结点名之前就崩溃的情况。如果没有得到正确处理,可能会导致死锁。 某些使用场景下,必须要注意可能出现的连接问题。例如,选leader过程要监视连接的稳定性。如果连接到的服务器崩溃了,leader就不能假定自己继续为leader,除非已经成功failover到另外的服务器。

上述问题(和其它类似的问题)必须由每个ZooKeeper使用者来处理。问题解决方案既不容易编写,也不是显而易见的,需要消耗相当多的时间。而Curator处理了所有的问题。 Curator是什么

Curator n ˈkyoor͝ˌātər:,展品或者其它收藏品的看守者,管理员,ZooKeeper的Keeper。它由3个相关的项目组成: 1. curator-client - ZooKeeper自带客户端的替代者,它负责处理低层次的维护工作,并提供某些有用的小功能 2. curator-framework - Curator Framework大大地简化ZooKeeper使用的高层次API。它在ZooKeeper客户端之上添加了很多功能,并处理了与ZooKeeper集群连接管理和重试操作的复杂性。 3. curator-recipes - ZooKeeper某些通用recipe的实现。它是基于Curator Framework之上实现的。

Curator专注于锁,选Leader等这些recipe。大部分对ZooKeeper感兴趣的人不需要关心连接管理等细节。他们想要的只是简单的使用这些recipe。Curator就是以此作为目标。

Curator通过以下方式处理了使用ZooKeeper的复杂度:

1. 重试机制:Curator支持可插拔式的(pluggable)重试机制。所有会产生可恢复异常的ZooKeeper操作都会在配置好的重试策略下得到重试。Curator自带了几个标准的重试策略(如二元指数后退策略)。
2. 连接状态监视:Curator不断监视ZooKeeper连接的状态,Curator用户可以监听连接状态变化并相应的作出回应。
3. ZooKeeper客户端实例管理:Curator通过标准的ZooKeeper类实例来管理与ZooKeeper集群的实际连接。然而,这些实例是管理在内部(尽管你若需要也可以访问),在需要的时候被重新创建。因此,Curator提供了对ZooKeeper集群的可靠处理(不像ZooKeeper自带的实现)。
4. 正确,可靠的recipe:Curator实现了大部分重要的ZooKeeper recipe(还有一些附加的recipe)。它们的实现使用了ZooKeeper的最佳实践,处理了所有已知的边界情况(像前面所说的)。
Curator专注于那些让你的代码更强健,因为你完全专心于你感兴趣的ZooKeeper功能,而不用担心怎么正确完成那些的维护性工作。

ZooKeeper在Netflix

ZooKeeper/Curator在Netflix得到了广泛的使用。使用情景有: 1. InterProcessMutex在各种顺序ID生成器中被用来保证值的唯一性 2. Cassandra备份 3. TrackID服务 4. Chukwa收集器使用LeaderSelector来做各种维护性的任务 我们用了一些第三方的服务,但它们只允许有限数目的并发用户。InterProcessSemphore被用来处理这个。各种Cache。

下面我们来进入Curator客户端的创建及CDRWA操作原理。我们先从一个创建客户端示例入手:

CuratorFrameworkFactory

package org.donald.curator.session;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.donald.constant.ConfigConstant;

/**
 * @ClassName: CreateSesssionSample
 * @Description: 使用curator来创建一个ZooKeeper客户端
 * @Author: Donaldhan
 * @Date: 2018-05-13 20:44
 */
@Slf4j
public class CreateSesssionSample {
    private static CuratorFramework client;

    public static void main(String[] args) {
        try {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
            client =
                    CuratorFrameworkFactory.newClient(ConfigConstant.IP,
                            ConfigConstant.SESSION_TIMEOUT,
                            ConfigConstant.CONNETING_TIMEOUT,
                            retryPolicy);
            client.start();
            log.info("success connected...");
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }
}

使用Fluent风格的API接口创建客户端

package org.donald.curator.session;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.donald.constant.ConfigConstant;

/**
 * @ClassName: CreateSessionWithNamespace
 * @Description:
 * @Author: Donaldhan
 * @Date: 2018-05-13 21:13
 */
@Slf4j
public class CreateSessionWithNamespace {
    private static CuratorFramework client;
    public static void main(String[] args) {
        try {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
            client =
                    CuratorFrameworkFactory.builder()
                            .connectString(ConfigConstant.IP)
                            .sessionTimeoutMs(ConfigConstant.SESSION_TIMEOUT)
                            .connectionTimeoutMs(ConfigConstant.CONNETING_TIMEOUT)
                            .retryPolicy(retryPolicy)
                            .namespace("base")
                            .build();
            client.start();
            log.info("success connected...");
            log.info("client namespace:{}",client.getNamespace());
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
        }
    }

}

默认情况下,Curator使用CuratorFrameworkFactory创建客户端,同时可以使用重试策略,应对会话过期的情况。 来看一下使用直接创建客户端方式:

/**
 * Create a new client with default session timeout and default connection timeout
 *
 *
 * @param connectString list of servers to connect to
 * @param retryPolicy retry policy to use
 * @return client
 */
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
{
    return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
}

/**
 * Create a new client
 *
 *
 * @param connectString list of servers to connect to
 * @param sessionTimeoutMs session timeout
 * @param connectionTimeoutMs connection timeout
 * @param retryPolicy retry policy to use
 * @return client
 */
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
{
    return builder().
        connectString(connectString).
        sessionTimeoutMs(sessionTimeoutMs).
        connectionTimeoutMs(connectionTimeoutMs).
        retryPolicy(retryPolicy).
        build();
}

从上面可以看出直接创建客户端,内部其实也是走Fluent风格。

来看CuratorFrameworkFactory的定义:

/**
 * Factory methods for creating framework-style clients
 */
public class CuratorFrameworkFactory
{   
    //默认的会话超时与连接超时时间
    private static final int        DEFAULT_SESSION_TIMEOUT_MS = Integer.getInteger("curator-default-session-timeout", 60 * 1000);
    private static final int        DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
    //本地地址
    private static final byte[]     LOCAL_ADDRESS = getLocalAddress();
    //字节压缩器
    private static final CompressionProvider        DEFAULT_COMPRESSION_PROVIDER = new GzipCompressionProvider();
    //默认的Zookeeper工厂
    private static final DefaultZookeeperFactory    DEFAULT_ZOOKEEPER_FACTORY = new DefaultZookeeperFactory();
    //默认ACL提供器
    private static final DefaultACLProvider         DEFAULT_ACL_PROVIDER = new DefaultACLProvider();
    private static final long                       DEFAULT_INACTIVE_THRESHOLD_MS = (int)TimeUnit.MINUTES.toMillis(3);
    private static byte[] getLocalAddress()
   {
      try
      {
          return InetAddress.getLocalHost().getHostAddress().getBytes();
      }
      catch ( UnknownHostException ignore )
      {
          // ignore
      }
      return new byte[0];
  }
}

从上面可以看出,Curator框架工厂CuratorFrameworkFactory内部,主要成员变量为默认的会话超时与连接超时时间,本地地址,字节压缩器GzipCompressionProvider, 默认的Zookeeper工厂DefaultZookeeperFactory,默认ACL提供器DefaultACLProvider。

下面我们分别来看字节压缩器GzipCompressionProvider,默认的Zookeeper工厂DefaultZookeeperFactory,默认ACL提供器DefaultACLProvider。

字节压缩器GzipCompressionProvider

package org.apache.curator.framework.imps;

import org.apache.curator.framework.api.CompressionProvider;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipCompressionProvider implements CompressionProvider
{
    @Override
    public byte[] compress(String path, byte[] data) throws Exception
    {
        ByteArrayOutputStream       bytes = new ByteArrayOutputStream();
        GZIPOutputStream            out = new GZIPOutputStream(bytes);
        out.write(data);
        out.finish();
        return bytes.toByteArray();
    }

    @Override
    public byte[] decompress(String path, byte[] compressedData) throws Exception
    {
        ByteArrayOutputStream       bytes = new ByteArrayOutputStream(compressedData.length);
        GZIPInputStream             in = new GZIPInputStream(new ByteArrayInputStream(compressedData));
        byte[]                      buffer = new byte[compressedData.length];
        for(;;)
        {
            int     bytesRead = in.read(buffer, 0, buffer.length);
            if ( bytesRead < 0 )
            {
                break;
            }
            bytes.write(buffer, 0, bytesRead);
        }
        return bytes.toByteArray();
    }
}

GzipCompressionProvider用于压缩字节流。

默认的Zookeeper工厂DefaultZookeeperFactory

package org.apache.curator.utils;

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class DefaultZookeeperFactory implements ZookeeperFactory
{
    @Override
    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
    {
        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
    }
}

默认的Zookeeper工厂DefaultZookeeperFactory,创建原生Zookeeper客户端。

默认ACL提供器DefaultACLProvider

package org.apache.curator.framework.imps;

import org.apache.curator.framework.api.ACLProvider;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import java.util.List;

public class DefaultACLProvider implements ACLProvider
{
    @Override
    public List<ACL> getDefaultAcl()
    {
        return ZooDefs.Ids.OPEN_ACL_UNSAFE;
    }

    @Override
    public List<ACL> getAclForPath(String path)
    {
        return ZooDefs.Ids.OPEN_ACL_UNSAFE;
    }
}
public class ACL implements Record {
  private int perms;
  private org.apache.zookeeper.data.Id id;
  ...
}

从上面可以看出,DefaultACLProvider主要用户获取节点的ACL权限。

下面我们来看CuratorFrameworkFactory内部构建器:

CuratorFrameworkFactory内部构建器

public static class Builder
   {
       private EnsembleProvider    ensembleProvider;
       //会话超时与连接超时时间
       private int                 sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
       private int                 connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;
       //重试策略
       private RetryPolicy         retryPolicy;
       //线程工程
       private ThreadFactory       threadFactory = null;
       //命名空间
       private String              namespace;
       //验证方式,及验证值
       private String              authScheme = null;
       private byte[]              authValue = null;
       private byte[]              defaultData = LOCAL_ADDRESS;
       //字节压缩器
       private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER;
       //原生API客户端工厂
       private ZookeeperFactory    zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
       //ACL提供器
       private ACLProvider         aclProvider = DEFAULT_ACL_PROVIDER;
       //是否为只读客户端
       private boolean             canBeReadOnly = false;
}

从上面可以看出,CuratorFrameworkFactory内部构建器Builder,除了会话超时与连接超时时间,字节压缩器,原生API客户端工厂, ACL提供器之外,还有线程工程ThreadFactory,验证方式,及验证值,及重试策略RetryPolicy。

先来看一下重试策略:


/**
 * Abstracts the policy to use when retrying connections
 */
public interface RetryPolicy
{
    /**
     * Called when an operation has failed for some reason. This method should return
     * true to make another attempt.
     *
     *
     * @param retryCount the number of times retried so far (0 the first time)
     * @param elapsedTimeMs the elapsed time in ms since the operation was attempted
     * @param sleeper use this to sleep - DO NOT call Thread.sleep
     * @return true/false
     */
    public boolean      allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
}
/**
 * Abstraction for retry policies to sleep
 */
public interface RetrySleeper
{
    /**
     * Sleep for the given time
     *
     * @param time time
     * @param unit time unit
     * @throws InterruptedException if the sleep is interrupted
     */
    public void     sleepFor(long time, TimeUnit unit) throws InterruptedException;
}

我们来看RetryPolicy的一个实现:

/**
 * Retry policy that retries a set number of times with increasing sleep time between retries
 */
public class ExponentialBackoffRetry extends SleepingRetry
{
    private static final Logger     log = LoggerFactory.getLogger(ExponentialBackoffRetry.class);

    private static final int MAX_RETRIES_LIMIT = 29;//默认最大尝试次数为29
    private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE;//默认尝试间隔

    private final Random random = new Random();
    private final int baseSleepTimeMs;//尝试间隔
    private final int maxSleepMs;//

    /**
     * @param baseSleepTimeMs initial amount of time to wait between retries
     * @param maxRetries max number of times to retry
     */
    public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
    {
        this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
    }

    /**
     * @param baseSleepTimeMs initial amount of time to wait between retries
     * @param maxRetries max number of times to retry
     * @param maxSleepMs max time in ms to sleep on each retry
     */
    public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
    {
        super(validateMaxRetries(maxRetries));
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxSleepMs = maxSleepMs;
    }

    @VisibleForTesting
    public int getBaseSleepTimeMs()
    {
        return baseSleepTimeMs;
    }
    //下次尝试等待时间
    @Override
    protected int getSleepTimeMs(int retryCount, long elapsedTimeMs)
    {
        // copied from Hadoop's RetryPolicies.java
        int sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
        if ( sleepMs > maxSleepMs )
        {
            log.warn(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
            sleepMs = maxSleepMs;
        }
        return sleepMs;
    }
   //校验最大尝试次数
    private static int validateMaxRetries(int maxRetries)
    {
        if ( maxRetries > MAX_RETRIES_LIMIT )
        {
            log.warn(String.format("maxRetries too large (%d). Pinning to %d", maxRetries, MAX_RETRIES_LIMIT));
            maxRetries = MAX_RETRIES_LIMIT;
        }
        return maxRetries;
    }
}

abstract class SleepingRetry implements RetryPolicy
{
    private final int n;//最大尝试次数

    protected SleepingRetry(int n)
    {
        this.n = n;
    }

    // made public for testing
    public int getN()
    {
        return n;
    }
    //是否允许重试
    public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
    {
        if ( retryCount < n )
        {
            try
            {
                sleeper.sleepFor(getSleepTimeMs(retryCount, elapsedTimeMs), TimeUnit.MILLISECONDS);
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
                return false;
            }
            return true;
        }
        return false;
    }

    protected abstract int   getSleepTimeMs(int retryCount, long elapsedTimeMs);
}

从上面可以看出,ExponentialBackoffRetry主要用户控制会话超时重连的次数和下次尝试时间。

我们回到构建起创建客户端:

/**
  * Apply the current values and build a new CuratorFramework
  *
  * @return new CuratorFramework
  */
public CuratorFramework build()
{
     return new CuratorFrameworkImpl(this);
}

从上面可以看出内部构建器,创建的实际为CuratorFrameworkImpl。

CuratorFramework

先来看一下CuratorFramework接口:


/**
 * Zookeeper framework-style client
 */
public interface CuratorFramework extends Closeable
{
    /**
     * Start the client. Most mutator methods will not work until the client is started
     */
    public void start();

    /**
     * Stop the client
     */
    public void close();

    /**
     * Returns the state of this instance
     *
     * @return state
     */
    public CuratorFrameworkState getState();

    /**
     * Return true if the client is started, not closed, etc.
     *
     * @return true/false
     * @deprecated use {@link #getState()} instead
     */
    public boolean isStarted();

    /**
     * Start a create builder
     *
     * @return builder object
     */
    public CreateBuilder create();

    /**
     * Start a delete builder
     *
     * @return builder object
     */
    public DeleteBuilder delete();

    /**
     * Start an exists builder
     * <p>
     * The builder will return a Stat object as if org.apache.zookeeper.ZooKeeper.exists() were called.  Thus, a null
     * means that it does not exist and an actual Stat object means it does exist.
     *
     * @return builder object
     */
    public ExistsBuilder checkExists();

    /**
     * Start a get data builder
     *
     * @return builder object
     */
    public GetDataBuilder getData();

    /**
     * Start a set data builder
     *
     * @return builder object
     */
    public SetDataBuilder setData();

    /**
     * Start a get children builder
     *
     * @return builder object
     */
    public GetChildrenBuilder getChildren();

    /**
     * Start a get ACL builder
     *
     * @return builder object
     */
    public GetACLBuilder getACL();

    /**
     * Start a set ACL builder
     *
     * @return builder object
     */
    public SetACLBuilder setACL();

    /**
     * Start a transaction builder
     *
     * @return builder object
     */
    public CuratorTransaction inTransaction();

    /**
     * Perform a sync on the given path - syncs are always in the background
     *
     * @param path                    the path
     * @param backgroundContextObject optional context
     * @deprecated use {@link #sync()} instead
     */
    public void sync(String path, Object backgroundContextObject);

    /**
     * Start a sync builder. Note: sync is ALWAYS in the background even
     * if you don't use one of the background() methods
     *
     * @return builder object
     */
    public SyncBuilder sync();

    /**
     * Returns the listenable interface for the Connect State
     *
     * @return listenable
     */
    public Listenable<ConnectionStateListener> getConnectionStateListenable();

    /**
     * Returns the listenable interface for events
     *
     * @return listenable
     */
    public Listenable<CuratorListener> getCuratorListenable();

    /**
     * Returns the listenable interface for unhandled errors
     *
     * @return listenable
     */
    public Listenable<UnhandledErrorListener> getUnhandledErrorListenable();

    /**
     * Returns a facade of the current instance that does _not_ automatically
     * pre-pend the namespace to all paths
     *
     * @return facade
     * @deprecated use {@link #usingNamespace} passing <code>null</code>
     */
    public CuratorFramework nonNamespaceView();

    /**
     * Returns a facade of the current instance that uses the specified namespace
     * or no namespace if <code>newNamespace</code> is <code>null</code>.
     *
     * @param newNamespace the new namespace or null for none
     * @return facade
     */
    public CuratorFramework usingNamespace(String newNamespace);

    /**
     * Return the current namespace or "" if none
     *
     * @return namespace
     */
    public String getNamespace();

    /**
     * Return the managed zookeeper client
     *
     * @return client
     */
    public CuratorZookeeperClient getZookeeperClient();

    /**
     * Allocates an ensure path instance that is namespace aware
     *
     * @param path path to ensure
     * @return new EnsurePath instance
     */
    public EnsurePath newNamespaceAwareEnsurePath(String path);
}
/**
 * @see CuratorFramework#getState()
 */
public enum CuratorFrameworkState
{
    /**
     * {@link CuratorFramework#start()} has not yet been called
     */
    LATENT,

    /**
     * {@link CuratorFramework#start()} has been called
     */
    STARTED,

    /**
     * {@link CuratorFramework#close()} has been called
     */
    STOPPED
}

从上面可以看出,CuratorFramework主要提供了启动关闭客户端操作,及CDRWA相关的构建器,如创建节点CreateBuilder,删除节点DeleteBuilder,获取节点数据GetDataBuilder,设置节点数据SetACLBuilder, ,检查节点ExistsBuilder,同步数据构建器SyncBuilder, 事物构建器CuratorTransaction,ACL构建器GetACLBuilder、SetACLBuilder,提供了获取客户端连接状态监听器Listenable, 客户端监听器Listenable ,无处理错误监听器Listenable操作,同时提供了获取zk客户端CuratorZookeeperClient和确保路径的操作EnsurePath。

下面我们分别来看这些构建器的定义,及监听器的定义:

创建节点CreateBuilder

public interface CreateBuilder extends
    BackgroundPathAndBytesable<String>,
    CreateModable<ACLBackgroundPathAndBytesable<String>>,
    ACLCreateModeBackgroundPathAndBytesable<String>,
    Compressible<CreateBackgroundModeACLable>
{
    /**
     * Causes any parent nodes to get created if they haven't already been
     *
     * @return this
     */
    public ProtectACLCreateModePathAndBytesable<String> creatingParentsIfNeeded();

    /**
     * @deprecated this has been generalized to support all create modes. Instead, use:
     * <pre>
     *     client.create().withProtection().withMode(CreateMode.PERSISTENT_SEQUENTIAL)...
     * </pre>
     * @return this
     */
    public ACLPathAndBytesable<String>              withProtectedEphemeralSequential();
    ...
}
public interface BackgroundPathAndBytesable<T> extends
    Backgroundable<PathAndBytesable<T>>,
    PathAndBytesable<T>
{
}
public interface PathAndBytesable<T>
{
    /**
     * Commit the currently building operation using the given path and data
     *
     * @param path the path
     * @param data the data
     * @return operation result if any
     * @throws Exception errors
     */
    public T        forPath(String path, byte[] data) throws Exception;

    /**
     * Commit the currently building operation using the given path and the default data
     * for the client (usually a byte[0] unless changed via
     * {@link CuratorFrameworkFactory.Builder#defaultData(byte[])}).
     *
     * @param path the path
     * @return operation result if any
     * @throws Exception errors
     */
    public T        forPath(String path) throws Exception;
}
public interface CreateModable<T>
{
    /**
     * Set a create mode - the default is {@link CreateMode#PERSISTENT}
     *
     * @param mode new create mode
     * @return this
     */
    public T withMode(CreateMode mode);
}
public enum CreateMode {

    /**
     * The znode will not be automatically deleted upon client's disconnect.
     */
    PERSISTENT (0, false, false),
    /**
    * The znode will not be automatically deleted upon client's disconnect,
    * and its name will be appended with a monotonically increasing number.
    */
    PERSISTENT_SEQUENTIAL (2, false, true),
    /**
     * The znode will be deleted upon the client's disconnect.
     */
    EPHEMERAL (1, true, false),
    /**
     * The znode will be deleted upon the client's disconnect, and its name
     * will be appended with a monotonically increasing number.
     */
    EPHEMERAL_SEQUENTIAL (3, true, true);
    ...
}

从上面可以看出,创建构建器CreateBuilder,主要提供了创建持久化和临时节点的操作。

我们来看创建构建器CreateBuilder的实现,我们先从CuratorFramework框架看起:

public class CuratorFrameworkImpl implements CuratorFramework
{
    //ZK客户端
    private final CuratorZookeeperClient                                client;
    private final ListenerContainer<CuratorListener>                    listeners;
    private final ListenerContainer<UnhandledErrorListener>             unhandledErrorListeners;
    private final ThreadFactory                                         threadFactory;
    //后台执行任务
    private final BlockingQueue<OperationAndData<?>>                    backgroundOperations;
    private final NamespaceImpl                                         namespace;
    private final ConnectionStateManager                                connectionStateManager;
    private final AtomicReference<AuthInfo>                             authInfo = new AtomicReference<AuthInfo>();
    private final byte[]                                                defaultData;
    private final FailedDeleteManager                                   failedDeleteManager;
    private final CompressionProvider                                   compressionProvider;
    private final ACLProvider                                           aclProvider;
    private final NamespaceFacadeCache                                  namespaceFacadeCache;
    private final NamespaceWatcherMap                                   namespaceWatcherMap = new NamespaceWatcherMap(this);
    //线程池服务
    private volatile ExecutorService                                    executorService;
    interface DebugBackgroundListener
  {
      void        listen(OperationAndData<?> data);
  }
  volatile DebugBackgroundListener        debugListener = null;
  private final AtomicReference<CuratorFrameworkState>                    state;
  //客户端验证信息
  private static class AuthInfo
  {
      final String    scheme;
      final byte[]    auth;

      private AuthInfo(String scheme, byte[] auth)
      {
          this.scheme = scheme;
          this.auth = auth;
      }

      @Override
      public String toString()
      {
          return "AuthInfo{" +
              "scheme='" + scheme + '\'' +
              ", auth=" + Arrays.toString(auth) +
              '}';
      }
  }

  public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
  {
      ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
      this.client = new CuratorZookeeperClient
      (
          localZookeeperFactory,
          builder.getEnsembleProvider(),
          builder.getSessionTimeoutMs(),
          builder.getConnectionTimeoutMs(),
          new Watcher()
          {
              @Override
              public void process(WatchedEvent watchedEvent)
              {
                  CuratorEvent event = new CuratorEventImpl
                  (
                      CuratorFrameworkImpl.this,
                      CuratorEventType.WATCHED,
                      watchedEvent.getState().getIntValue(),
                      unfixForNamespace(watchedEvent.getPath()),
                      null,
                      null,
                      null,
                      null,
                      null,
                      watchedEvent,
                      null
                  );
                  processEvent(event);
              }
          },
          builder.getRetryPolicy(),
          builder.canBeReadOnly()
      );

      listeners = new ListenerContainer<CuratorListener>();
      unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
      backgroundOperations = new DelayQueue<OperationAndData<?>>();
      namespace = new NamespaceImpl(this, builder.getNamespace());
      threadFactory = getThreadFactory(builder);
      connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
      compressionProvider = builder.getCompressionProvider();
      aclProvider = builder.getAclProvider();
      state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);

      byte[]      builderDefaultData = builder.getDefaultData();
      defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];

      if ( builder.getAuthScheme() != null )
      {
          authInfo.set(new AuthInfo(builder.getAuthScheme(), builder.getAuthValue()));
      }

      failedDeleteManager = new FailedDeleteManager(this);
      namespaceFacadeCache = new NamespaceFacadeCache(this);
  }
  //构造ZK客户端工厂
  private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
  {
      return new ZookeeperFactory()
      {
          @Override
          public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
          {
              ZooKeeper zooKeeper = actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
              AuthInfo auth = authInfo.get();
              if ( auth != null )
              {
                  zooKeeper.addAuthInfo(auth.scheme, auth.auth);
              }

              return zooKeeper;
          }
      };
  }
  //获取客户端框架线程工厂
  private ThreadFactory getThreadFactory(CuratorFrameworkFactory.Builder builder)
  {
      ThreadFactory threadFactory = builder.getThreadFactory();
      if ( threadFactory == null )
      {
          threadFactory = ThreadUtils.newThreadFactory("CuratorFramework");
      }
      return threadFactory;
  }
}
public class ThreadUtils
{
    public static ExecutorService newSingleThreadExecutor(String processName)
    {
        return Executors.newSingleThreadExecutor(newThreadFactory(processName));
    }
    public static ExecutorService newFixedThreadPool(int qty, String processName)
    {
        return Executors.newFixedThreadPool(qty, newThreadFactory(processName));
    }
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName)
    {
        return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName));
    }
    public static ScheduledExecutorService newFixedThreadScheduledPool(int qty, String processName)
    {
        return Executors.newScheduledThreadPool(qty, newThreadFactory(processName));
    }
    public static ThreadFactory newThreadFactory(String processName)
    {
        return new ThreadFactoryBuilder()
            .setNameFormat(processName + "-%d")
            .setDaemon(true)
            .build();
    }
}

来看一下客户端的实现CuratorFrameworkImpl的内部成员变量

public class CuratorZookeeperClient implements Closeable
{
    private final ConnectionState                   state;
    //重试策略
    private final AtomicReference<RetryPolicy>      retryPolicy = new AtomicReference<RetryPolicy>();
    private final int                               connectionTimeoutMs;
    private final AtomicBoolean                     started = new AtomicBoolean(false);
    private final AtomicReference<TracerDriver>     tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());

    /**
     *
     * @param connectString list of servers to connect to
     * @param sessionTimeoutMs session timeout
     * @param connectionTimeoutMs connection timeout
     * @param watcher default watcher or null
     * @param retryPolicy the retry policy to use
     */
    public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
    {
        this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
    }
    /**
     * Return the managed ZK instance.
     *
     * @return client the client
     * @throws Exception if the connection timeout has elapsed or an exception occurs in a background process
     */
    public ZooKeeper getZooKeeper() throws Exception
    {
        Preconditions.checkState(started.get(), "Client is not started");

        return state.getZooKeeper();
    }

    /**
     * Return a new retry loop. All operations should be performed in a retry loop
     *
     * @return new retry loop
     */
    public RetryLoop newRetryLoop()
    {
        return new RetryLoop(retryPolicy.get(), tracer);
    }
}

从上面可以看出,Curator zk客户端CuratorZookeeperClient主要用于获取原生API ZK客户端,以及用于重新创建失效会话,执行相应的CDRWA操作。

//
public class CuratorFrameworkImpl implements CuratorFramework
{
  @Override
  public CreateBuilder create()
  {
      Preconditions.checkState(isStarted(), "instance must be started before calling this method");

      return new CreateBuilderImpl(this);
  }
}

下面来看创建构建器的创建目录操作:

class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndBytes>
{
    private final CuratorFrameworkImpl client;//Curator框架客户端
    private CreateMode createMode;//创建模式
    private Backgrounding backgrounding;
    private boolean createParentsIfNeeded;
    private boolean doProtected;
    private boolean compress;
    private String protectedId;
    private ACLing acling;

   @Override
    public String forPath(final String givenPath, byte[] data) throws Exception
    {
        if ( compress )
        {
            //如果需要,压缩数据
            data = client.getCompressionProvider().compress(givenPath, data);
        }

        final String adjustedPath = adjustPath(client.fixForNamespace(givenPath));

        String returnPath = null;
        if ( backgrounding.inBackground() )
        {
            //放在后台执行
            pathInBackground(adjustedPath, data, givenPath);
        }
        else
        {
            //前台执行
            String path = protectedPathInForeground(adjustedPath, data);
            returnPath = client.unfixForNamespace(path);
        }
        return returnPath;
    }
      //前台执行
    private String protectedPathInForeground(String adjustedPath, byte[] data) throws Exception
   {
       try
       {
           //创建路径
           return pathInForeground(adjustedPath, data);
       }
       catch ( KeeperException.ConnectionLossException e )
       {
           if ( protectedId != null )
           {
               /*
                * CURATOR-45 : we don't know if the create operation was successful or not,
                * register the znode to be sure it is deleted later.
                如果出现异常,确保创建的目录删除
                */
               findAndDeleteProtectedNodeInBackground(adjustedPath, protectedId, null);
               /*
               * The current UUID is scheduled to be deleted, it is not safe to use it again.
               * If this builder is used again later create a new UUID
               */
               protectedId = UUID.randomUUID().toString();
           }
           throw e;
       }
   }
   //创建路径
   private String pathInForeground(final String path, final byte[] data) throws Exception
    {
        //获取zk客户端追踪器,类似于异常堆栈
        TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Foreground");

        final AtomicBoolean firstTime = new AtomicBoolean(true);
        //如果会话失效,则重新建立会话,如果建立会话成功,则调用创建目录回调。
        String returnPath = RetryLoop.callWithRetry
            (
                client.getZookeeperClient(),
                new Callable<String>()
                {
                    @Override
                    public String call() throws Exception
                    {
                        boolean localFirstTime = firstTime.getAndSet(false);

                        String createdPath = null;
                        if ( !localFirstTime && doProtected )
                        {
                            createdPath = findProtectedNodeInForeground(path);
                        }

                        if ( createdPath == null )
                        {
                            try
                            {
                                //实际操作委托给Curator框架内部的原生API zk客户端
                                createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
                            }
                            catch ( KeeperException.NoNodeException e )
                            {
                                if ( createParentsIfNeeded )
                                {
                                    //如果需要创建建父目录,并且父目录不存在,则创建父目录
                                    ZKPaths.mkdirs(client.getZooKeeper(), path, false, client.getAclProvider());
                                    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
                                }
                                else
                                {
                                    throw e;
                                }
                            }
                        }

                        if ( failNextCreateForTesting )
                        {
                            failNextCreateForTesting = false;
                            throw new KeeperException.ConnectionLossException();
                        }
                        return createdPath;
                    }
                }
            );

        trace.commit();
        return returnPath;
    }

}

从上面可以看出,Curator框架实现,创建目录实际上委托给Curator框架内部的原生API zk客户端,如果需要创建建父目录,并且父目录不存在,则创建父目录。 如果会话失效,则重新建立会话,如果建立会话成功,则调用创建目录回调Callable。

来简单看一下RetryLoop:

public class RetryLoop
{
    private boolean         isDone = false;//操作是否完成
    private int             retryCount = 0;//尝试次数

    private final Logger            log = LoggerFactory.getLogger(getClass());
    private final long              startTimeMs = System.currentTimeMillis();
    private final RetryPolicy       retryPolicy;
    private final AtomicReference<TracerDriver>     tracer;

    private static final RetrySleeper  sleeper = new RetrySleeper()
    {
        @Override
        public void sleepFor(long time, TimeUnit unit) throws InterruptedException
        {
            unit.sleep(time);
        }
    };

    /**
     * Returns the default retry sleeper
     *
     * @return sleeper
     */
    public static RetrySleeper      getDefaultRetrySleeper()
    {
        return sleeper;
    }

    /**
     * Convenience utility: creates a retry loop calling the given proc and retrying if needed
     *
     * @param client Zookeeper
     * @param proc procedure to call with retry
     * @param <T> return type
     * @return procedure result
     * @throws Exception any non-retriable errors
     */
    public static<T> T      callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
    {
        T               result = null;
        RetryLoop       retryLoop = client.newRetryLoop();
        //如果操作没有完成
        while ( retryLoop.shouldContinue() )
        {
            try
            {
                //
                client.internalBlockUntilConnectedOrTimedOut();
                result = proc.call();
                //标记操作执行完成
                retryLoop.markComplete();
            }
            catch ( Exception e )
            {
                retryLoop.takeException(e);
            }
        }
        return result;
    }
    /**
     * If true is returned, make an attempt at the operation
     * 如果操作没有执行完,则继续执行
     * @return true/false
     */
    public boolean      shouldContinue()
    {
        return !isDone;
    }
    /**
    * Call this when your operation has successfully completed
    标记操作已完成
    */
   public void     markComplete()
   {
       isDone = true;
   }
    /**
     * Pass any caught exceptions here
     *
     * @param exception the exception
     * @throws Exception if not retry-able or the retry policy returned negative
     */
    public void         takeException(Exception exception) throws Exception
    {
        boolean     rethrow = true;
        if ( isRetryException(exception) )
        {
            if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
            {
                log.debug("Retry-able exception received", exception);
            }

            if ( retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startTimeMs, sleeper) )
            {
                tracer.get().addCount("retries-allowed", 1);
                if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
                {
                    log.debug("Retrying operation");
                }
                rethrow = false;
            }
            else
            {
                tracer.get().addCount("retries-disallowed", 1);
                if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
                {
                    log.debug("Retry policy not allowing retry");
                }
            }
        }

        if ( rethrow )
        {
            throw exception;
        }
    }
}

我们回到删除构建器

删除节点DeleteBuilder

public interface DeleteBuilder extends Guaranteeable, ChildrenDeletable
{
}
public interface ChildrenDeletable extends BackgroundVersionable
{

    /**
     * <p>
     *     Will also delete children if they exist.
     * </p>
     * @return
     */
    public BackgroundVersionable deletingChildrenIfNeeded();
}
public interface BackgroundVersionable extends
    BackgroundPathable<Void>,
    Versionable<BackgroundPathable<Void>>
{
}
public interface Versionable<T>
{
    /**
     * Use the given version (the default is -1)
     *
     * @param version version to use
     * @return this
     */
    public T     withVersion(int version);
}
public interface BackgroundPathable<T> extends
    Backgroundable<Pathable<T>>,
    Pathable<T>
{
}
public interface Pathable<T>
{
    /**
     * Commit the currently building operation using the given path
     *
     * @param path the path
     * @return operation result if any
     * @throws Exception errors
     */
    public T       forPath(String path) throws Exception;
}

来看实现:

//CuratorFrameworkImpl
@Override
  public DeleteBuilder delete()
  {
      Preconditions.checkState(isStarted(), "instance must be started before calling this method");

      return new DeleteBuilderImpl(this);
  }
  class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
  {
      private final CuratorFrameworkImpl  client;
      private int                         version;
      private Backgrounding               backgrounding;
      private boolean                     deletingChildrenIfNeeded;//是否需要删除子目录
      private boolean                     guaranteed;
    @Override
   public Void forPath(String path) throws Exception
   {
       final String        unfixedPath = path;
       path = client.fixForNamespace(path);

       if ( backgrounding.inBackground() )
       {
           OperationAndData.ErrorCallback<String>  errorCallback = null;
           if ( guaranteed )
           {
               errorCallback = new OperationAndData.ErrorCallback<String>()
               {
                   @Override
                   public void retriesExhausted(OperationAndData<String> operationAndData)
                   {
                       client.getFailedDeleteManager().addFailedDelete(unfixedPath);
                   }
               };
           }
           client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null);
       }
       else
       {
           pathInForeground(path, unfixedPath);
       }
       return null;
   }
   private void pathInForeground(final String path, String unfixedPath) throws Exception
    {
        TimeTrace       trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Foreground");
        try
        {
            RetryLoop.callWithRetry
            (
                client.getZookeeperClient(),
                new Callable<Void>()
                {
                    @Override
                    public Void call() throws Exception
                    {
                        try {
                        client.getZooKeeper().delete(path, version);
                        } catch (KeeperException.NotEmptyException e) {
                            if (deletingChildrenIfNeeded) {
                                ZKPaths.deleteChildren(client.getZooKeeper(), path, true);
                            } else {
                                throw e;
                            }
                        }
                        return null;
                    }
                }
            );
        }
        catch ( KeeperException.NodeExistsException e )
        {
            throw e;
        }
        catch ( Exception e )
        {
            if ( guaranteed )
            {
                client.getFailedDeleteManager().addFailedDelete(unfixedPath);
            }
            throw e;
        }
        trace.commit();
    }
 }

从上面可以看出,删除构建器DeleteBuilder,删除目录,实际操作在一个重试循环中,如果会话过期,则重新连接会话,并将实际删除操作委托给Curator框架内部的原生API zk客户端。

获取节点数据GetDataBuilder

//CuratorFrameworkImpl
@Override
 public GetDataBuilder getData()
 {
     Preconditions.checkState(isStarted(), "instance must be started before calling this method");

     return new GetDataBuilderImpl(this);
 }
 //GetDataBuilderImpl
 class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>
{
    private final Logger                log = LoggerFactory.getLogger(getClass());
    private final CuratorFrameworkImpl  client;
    private Stat                        responseStat;
    private Watching                    watching;
    private Backgrounding               backgrounding;
    private boolean                     decompress;
    @Override
   public byte[] forPath(String path) throws Exception
   {
       path = client.fixForNamespace(path);

       byte[]      responseData = null;
       if ( backgrounding.inBackground() )
       {
           client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
       }
       else
       {
           responseData = pathInForeground(path);
       }
       return responseData;
   }

   private byte[] pathInForeground(final String path) throws Exception
   {
       TimeTrace   trace = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Foreground");
       byte[]      responseData = RetryLoop.callWithRetry
       (
           client.getZookeeperClient(),
           new Callable<byte[]>()
           {
               @Override
               public byte[] call() throws Exception
               {
                   byte[]      responseData;
                   if ( watching.isWatched() )
                   {
                       responseData = client.getZooKeeper().getData(path, true, responseStat);
                   }
                   else
                   {
                       responseData = client.getZooKeeper().getData(path, watching.getWatcher(), responseStat);
                   }
                   return responseData;
               }
           }
       );
       trace.commit();

       return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData;
   }
}

这里不再讲解,一看就明白。

设置节点数据SetDataBuilder

//CuratorFrameworkImpl
@Override
    public SetDataBuilder setData()
    {
        Preconditions.checkState(isStarted(), "instance must be started before calling this method");

        return new SetDataBuilderImpl(this);
    }
    class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndBytes>
    {
        private final CuratorFrameworkImpl      client;
        private Backgrounding                   backgrounding;
        private int                             version;
        private boolean                         compress;

        SetDataBuilderImpl(CuratorFrameworkImpl client)
        {
            this.client = client;
            backgrounding = new Backgrounding();
            version = -1;
            compress = false;
        }
        @Override
   public Stat forPath(String path) throws Exception
   {
       return forPath(path, client.getDefaultData());
   }

   @Override
   public Stat forPath(String path, byte[] data) throws Exception
   {
       if ( compress )
       {
           data = client.getCompressionProvider().compress(path, data);
       }

       path = client.fixForNamespace(path);

       Stat        resultStat = null;
       if ( backgrounding.inBackground()  )
       {
           client.processBackgroundOperation(new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext()), null);
       }
       else
       {
           resultStat = pathInForeground(path, data);
       }
       return resultStat;
   }

   int getVersion()
   {
       return version;
   }

   private Stat pathInForeground(final String path, final byte[] data) throws Exception
   {
       TimeTrace   trace = client.getZookeeperClient().startTracer("SetDataBuilderImpl-Foreground");
       Stat        resultStat = RetryLoop.callWithRetry
       (
           client.getZookeeperClient(),
           new Callable<Stat>()
           {
               @Override
               public Stat call() throws Exception
               {
                   return client.getZooKeeper().setData(path, data, version);
               }
           }
       );
       trace.commit();
       return resultStat;
   }
}

检查节点ExistsBuilder

//CuratorFrameworkImpl
@Override
  public ExistsBuilder checkExists()
  {
      Preconditions.checkState(isStarted(), "instance must be started before calling this method");

      return new ExistsBuilderImpl(this);
  }
  class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>
{
    private final CuratorFrameworkImpl client;
    private Backgrounding backgrounding;
    private Watching watching;
    @Override
   public Stat forPath(String path) throws Exception
   {
       path = client.fixForNamespace(path);

       Stat        returnStat = null;
       if ( backgrounding.inBackground() )
       {
           client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
       }
       else
       {
           returnStat = pathInForeground(path);
       }

       return returnStat;
   }

   private Stat pathInForeground(final String path) throws Exception
   {
       TimeTrace   trace = client.getZookeeperClient().startTracer("ExistsBuilderImpl-Foreground");
       Stat        returnStat = RetryLoop.callWithRetry
       (
           client.getZookeeperClient(),
           new Callable<Stat>()
           {
               @Override
               public Stat call() throws Exception
               {
                   Stat    returnStat;
                   if ( watching.isWatched() )
                   {
                       returnStat = client.getZooKeeper().exists(path, true);
                   }
                   else
                   {
                       returnStat = client.getZooKeeper().exists(path, watching.getWatcher());
                   }
                   return returnStat;
               }
           }
       );
       trace.commit();
       return returnStat;
   }
}

再来看获取客户端连接状态监听器Listenable, 客户端监听器Listenable ,无处理错误监听器Listenable操作

监听器操作

先来看Curator监听器

/**
 * Receives notifications about errors and background events
 */
public interface CuratorListener
{
    /**
     * Called when a background task has completed or a watch has triggered
     *
     * @param client client
     * @param event the event
     * @throws Exception any errors
     */
    public void         eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}

public interface CuratorEvent
{
    /**
     * check here first - this value determines the type of event and which methods will have
     * valid values
     *
     * @return event type
     */
    public CuratorEventType getType();

    /**
     * @return "rc" from async callbacks
     */
    public int getResultCode();

    /**
     * @return the path
     */
    public String getPath();

    /**
     * @return the context object passed to {@link Backgroundable#inBackground(Object)}
     */
    public Object getContext();

    /**
     * @return any stat
     */
    public Stat getStat();

    /**
     * @return any data
     */
    public byte[] getData();

    /**
     * @return any name
     */
    public String getName();

    /**
     * @return any children
     */
    public List<String> getChildren();

    /**
     * @return any ACL list or null
     */
    public List<ACL> getACLList();

    /**
     * If {@link #getType()} returns {@link CuratorEventType#WATCHED} this will
     * return the WatchedEvent
     *
     * @return any WatchedEvent
     */
    public WatchedEvent getWatchedEvent();
}
public enum CuratorEventType
{
    /**
     * Corresponds to {@link CuratorFramework#create()}
     */
    CREATE,

    /**
     * Corresponds to {@link CuratorFramework#delete()}
     */
    DELETE,

    /**
     * Corresponds to {@link CuratorFramework#checkExists()}
     */
    EXISTS,

    /**
     * Corresponds to {@link CuratorFramework#getData()}
     */
    GET_DATA,

    /**
     * Corresponds to {@link CuratorFramework#setData()}
     */
    SET_DATA,

    /**
     * Corresponds to {@link CuratorFramework#getChildren()}
     */
    CHILDREN,

    /**
     * Corresponds to {@link CuratorFramework#sync(String, Object)}
     */
    SYNC,

    /**
     * Corresponds to {@link CuratorFramework#getACL()}
     */
    GET_ACL,

    /**
     * Corresponds to {@link CuratorFramework#setACL()}
     */
    SET_ACL,

    /**
     * Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()}
     */
    WATCHED,

    /**
     * Event sent when client is being closed
     */
    CLOSING
}

异常监听器:

public interface UnhandledErrorListener
{
    /**
     * Called when an exception is caught in a background thread, handler, etc. Before this
     * listener is called, the error will have been logged and a {@link ConnectionState#LOST}
     * event will have been queued for any {@link ConnectionStateListener}s.
     *
     * @param message Source message
     * @param e exception
     */
    public void     unhandledError(String message, Throwable e);
}

再来看监听器容器:

/**
 * Abstracts a listenable object
 */
public interface Listenable<T>
{
    /**
     * Add the given listener. The listener will be executed in the containing
     * instance's thread.
     *
     * @param listener listener to add
     */
    public void     addListener(T listener);

    /**
     * Add the given listener. The listener will be executed using the given
     * executor
     *
     * @param listener listener to add
     * @param executor executor to run listener in
     */
    public void     addListener(T listener, Executor executor);

    /**
     * Remove the given listener
     *
     * @param listener listener to remove
     */
    public void     removeListener(T listener);
}
/**
 * Abstracts an object that has listeners
 */
public class ListenerContainer<T> implements Listenable<T>
{
    private final Logger                        log = LoggerFactory.getLogger(getClass());
    private final Map<T, ListenerEntry<T>>      listeners = Maps.newConcurrentMap();
    ...
}

再来看状态管理器:

public class ConnectionStateManager implements Closeable
{
    private static final int QUEUE_SIZE;

    static
    {
        int size = 25;
        String property = System.getProperty("ConnectionStateManagerSize", null);
        if ( property != null )
        {
            try
            {
                size = Integer.parseInt(property);
            }
            catch ( NumberFormatException ignore )
            {
                // ignore
            }
        }
        QUEUE_SIZE = size;
    }

    private final Logger log = LoggerFactory.getLogger(getClass());
    private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
    private final CuratorFramework client;
    private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
    private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
    private final ExecutorService service;
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
    // guarded by sync
   private ConnectionState currentConnectionState;

   private enum State
   {
       LATENT,
       STARTED,
       CLOSED
   }
   /**
     * Start the manager
     */
    public void start()
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        processEvents();
                        return null;
                    }
                }
            );
    }

    @Override
    public void close()
    {
        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
        {
            service.shutdownNow();
            listeners.clear();
        }
    }
    private void processEvents()
   {
       try
       {
           while ( !Thread.currentThread().isInterrupted() )
           {
               final ConnectionState newState = eventQueue.take();

               if ( listeners.size() == 0 )
               {
                   log.warn("There are no ConnectionStateListeners registered.");
               }

               listeners.forEach
                   (
                       new Function<ConnectionStateListener, Void>()
                       {
                           @Override
                           public Void apply(ConnectionStateListener listener)
                           {
                               listener.stateChanged(client, newState);
                               return null;
                           }
                       }
                   );
           }
       }
       catch ( InterruptedException e )
       {
           Thread.currentThread().interrupt();
       }
   }
}

回到Curator框架的启动和关闭

CuratorFrameworkImpl启动关闭

@Override
public void     start()
{
     log.info("Starting");
     if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
     {
         IllegalStateException error = new IllegalStateException();
         log.error("Cannot be started more than once", error);
         throw error;
     }

     try
     {
         //启动连接状态管理器
         connectionStateManager.start(); // ordering dependency - must be called before client.start()
         //启动客户端
         client.start();
         //创建执行器
         executorService = Executors.newFixedThreadPool(2, threadFactory);  // 1 for listeners, 1 for background ops

         executorService.submit
         (
             new Callable<Object>()
             {
                 @Override
                 public Object call() throws Exception
                 {
                     //执行后台任务
                     backgroundOperationsLoop();
                     return null;
                 }
             }
         );
     }
     catch ( Exception e )
     {
         //处理异常
         handleBackgroundOperationException(null, e);
     }
}
  1. 先来看启动客户端 ```java //启动客户端 client.start(); //CuratorZookeeperClient public class CuratorZookeeperClient implements Closeable { private final Logger log = LoggerFactory.getLogger(getClass()); private final ConnectionState state; private final AtomicReference retryPolicy = new AtomicReference(); private final int connectionTimeoutMs; private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicReference tracer = new AtomicReference(new DefaultTracerDriver()); /**
    • Must be called after construction *
    • @throws IOException errors */ public void start() throws Exception { log.debug(“Starting”);

      if ( !started.compareAndSet(false, true) ) { IllegalStateException error = new IllegalStateException(); log.error(“Already started”, error); throw error; } //启动连接状态 state.start(); } /**

    • Close the client */ public void close() { log.debug(“Closing”);

      started.set(false); try { state.close(); } catch ( IOException e ) { log.error(“”, e); } } void internalBlockUntilConnectedOrTimedOut() throws InterruptedException { long waitTimeMs = connectionTimeoutMs; while ( !state.isConnected() && (waitTimeMs > 0) ) { final CountDownLatch latch = new CountDownLatch(1); Watcher tempWatcher = new Watcher() { @Override public void process(WatchedEvent event) { latch.countDown(); } };

      state.addParentWatcher(tempWatcher);
      long        startTimeMs = System.currentTimeMillis();
      try
      {
          latch.await(1, TimeUnit.SECONDS);
      }
      finally
      {
          state.removeParentWatcher(tempWatcher);
      }
      long        elapsed = Math.max(1, System.currentTimeMillis() - startTimeMs);
      waitTimeMs -= elapsed; } } } //ConnectionState 实现了Watcher class ConnectionState implements Watcher, Closeable {  private static final int MAX_BACKGROUND_EXCEPTIONS = 10;  private static final boolean LOG_EVENTS = Boolean.getBoolean(DebugUtils.PROPERTY_LOG_EVENTS);  private final Logger log = LoggerFactory.getLogger(getClass());  private final HandleHolder zooKeeper;  private final AtomicBoolean isConnected = new AtomicBoolean(false);  private final EnsembleProvider ensembleProvider;  private final int sessionTimeoutMs;  private final int connectionTimeoutMs;  private final AtomicReference<TracerDriver> tracer;  private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();  private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();  private final AtomicLong instanceIndex = new AtomicLong();  private volatile long connectionStartMs = 0;  void start() throws Exception { log.debug("Starting"); ensembleProvider.start(); reset(); }
      

    @Override public void close() throws IOException { log.debug(“Closing”);

    CloseableUtils.closeQuietly(ensembleProvider);
    try
    {
        zooKeeper.closeAndClear();
    }
    catch ( Exception e )
    {
        throw new IOException(e);
    }
    finally
    {
        isConnected.set(false);
    } } void addParentWatcher(Watcher watcher) {
    parentWatchers.offer(watcher); }
    

    void removeParentWatcher(Watcher watcher) { parentWatchers.remove(watcher); } @Override public void process(WatchedEvent event) { if ( LOG_EVENTS ) { log.debug(“ConnectState watcher: “ + event); }

    for ( Watcher parentWatcher : parentWatchers )
    {
        TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
        parentWatcher.process(event);
        timeTrace.commit();
    }
    
    boolean wasConnected = isConnected.get();
    boolean newIsConnected = wasConnected;
    if ( event.getType() == Watcher.Event.EventType.None )
    {
        newIsConnected = checkState(event.getState(), wasConnected);
    }
    
    if ( newIsConnected != wasConnected )
    {
        isConnected.set(newIsConnected);
        connectionStartMs = System.currentTimeMillis();
    } } ... } ```
    
  2. 执行后台任务 ```java //执行后台任务 backgroundOperationsLoop(); // CuratorFrameworkImpl private void backgroundOperationsLoop() { while ( !Thread.interrupted() ) { OperationAndData<?> operationAndData; try { //消费后台操作任务集 operationAndData = backgroundOperations.take(); if ( debugListener != null ) { debugListener.listen(operationAndData); } } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); break; } //执行后台任务 performBackgroundOperation(operationAndData); } } private void performBackgroundOperation(OperationAndData<?> operationAndData) { try { if ( client.isConnected() ) { //处于连接中,则直接执行 operationAndData.callPerformBackgroundOperation(); } else { client.getZooKeeper(); // important - allow connection resets, timeouts, etc. to occur if ( operationAndData.getElapsedTimeMs() >= client.getConnectionTimeoutMs() ) { throw new CuratorConnectionLossException(); } operationAndData.sleepFor(1, TimeUnit.SECONDS); //放入后台任务队列 queueOperation(operationAndData); } } catch ( Throwable e ) { /** * Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException * when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy * and callbacks need to get invoked, etc. */ if ( e instanceof CuratorConnectionLossException ) { WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null); CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null); if ( checkBackgroundRetry(operationAndData, event) ) { queueOperation(operationAndData); } else { logError(“Background retry gave up”, e); } } else { handleBackgroundOperationException(operationAndData, e); } } }
void queueOperation(OperationAndData operationAndData) { backgroundOperations.offer(operationAndData); } ``` 3. 处理异常 ```java handleBackgroundOperationException(null, e); } private void handleBackgroundOperationException(OperationAndData operationAndData, Throwable e) { do { if ( (operationAndData != null) && RetryLoop.isRetryException(e) ) { if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) { log.debug("Retry-able exception received", e); } if ( client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData) ) { if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) { log.debug("Retrying operation"); } backgroundOperations.offer(operationAndData); break; } else { if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) { log.debug("Retry policy did not allow retry"); } if ( operationAndData.getErrorCallback() != null ) { operationAndData.getErrorCallback().retriesExhausted(operationAndData); } } } logError("Background exception was not retry-able or retry gave up", e); } while ( false ); } ``` ## 总结 Curator框架工厂CuratorFrameworkFactory内部,主要成员变量为默认的会话超时与连接超时时间,本地地址,字节压缩器GzipCompressionProvider, 默认的Zookeeper工厂DefaultZookeeperFactory,默认ACL提供器DefaultACLProvider。GzipCompressionProvider用于压缩字节流。 默认的Zookeeper工厂DefaultZookeeperFactory,用于创建原生Zookeeper客户端。DefaultACLProvider主要用户获取节点的ACL权限。 CuratorFrameworkFactory内部构建器Builder,除了会话超时与连接超时时间,字节压缩器,原生API客户端工厂, ACL提供器之外,还有线程工程ThreadFactory,验证方式,及验证值,及重试策略RetryPolicy。 ExponentialBackoffRetry主要用户控制会话超时重连的次数和下次尝试时间。 内部构建器Builder,创建的实际为CuratorFrameworkImpl。 CuratorFramework主要提供了启动关闭客户端操作,及CDRWA相关的构建器,如创建节点CreateBuilder,删除节点DeleteBuilder,获取节点数据GetDataBuilder,设置节点数据SetACLBuilder, ,检查节点ExistsBuilder,同步数据构建器SyncBuilder, 事物构建器CuratorTransaction,ACL构建器GetACLBuilder、SetACLBuilder,提供了客户端连接状态监听器Listenable, 客户端监听器Listenable ,无处理错误监听器Listenable操作,同时提供了获取zk客户端和CuratorZookeeperClient和确保路径的操作EnsurePath。 Curator zk客户端CuratorZookeeperClient主要用于获取原生API ZK客户端,以及用于重新创建失效会话,执行相应的CDRWA操作。 创建构建器CreateBuilder,主要提供了创建持久化和临时节点的操作。 Curator框架实现CuratorFrameworkImpl,创建目录实际上委托给Curator框架内部的原生API zk客户端,如果需要创建建父目录,并且父目录不存在,则创建父目录。 如果会话失效,则重新建立会话,如果建立会话成功,则调用创建目录回调Callable。 删除构建器DeleteBuilder,删除目录,实际操作在一个重试循环中,如果会话过期,则重新连接会话,并将实际删除操作委托给Curator框架内部的原生API zk客户端。 Curator框架实现CuratorFrameworkImpl的获取目录数据操作,检查目录和设置目录数据的原理与创建、删除操作基本相同实际操作委托给Curator框架内部的原生API zk客户端, 并保证会话有效。