Zookeeper原生API
write by donaldhan, 2018-06-14 17:07引言
随着分布式应用的发展,对配置的管理变得越来越繁琐,单机配置中心,容易导致配置中心挂掉的情况下,出现整体应用瘫痪的场景。Zookeeper出现解决了单点配置的缺点, 同时我们可以很容易使用Zookeeper建立配置集群,当集群中的某台配置服务器宕机时,不会对应用造成任务影响。更重要的是ZK提供了观察者机制,可以动态监听配置的变更。 Zookeeper以文件目录的方式存储数据,使我们可以非常方便的管理配置。
这篇文章,我们不打算深入的研究Zookeeper,我们关注的是Zookeeper原生API, ZkClient和Curator,3客户端的优缺点,以后有时间我们在来探究Zookeeper的源码。这篇文章所有使用的示例代码可以参考zookeeper-demo。
目录
原生API
Apache 原生API,的缺点:
- 由于设置和获取路径节点的数据都是字节序列,所以自己去处理序列化。
- 同时事件注册是一次性的,如果需要持续监听一个节点,必须在监听器捕捉一个事件后,重新注册。
- 无法创建父路径不存在的路径。
- 删除操作,只能删除节点下没有子节点的路径。
客户端为 @see org.apache.zookeeper.ZooKeeper,观察者为 @see org.apache.zookeeper.Watcher
ZK的节点有5种操作权限: CREATE、READ、WRITE、DELETE、ADMIN 也就是 增、删、改、查、管理权限,这5种权限简写为crwda(即:每个单词的首字符缩写) 注:这5种权限中,delete是指对子节点的删除权限,其它4种权限指对自身节点的操作权限。
身份的认证有4种方式:
- world:默认方式,相当于全世界都能访问
- auth:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd 来添加当前上下文中的授权用户)
- digest:即用户名:密码这种方式认证,这也是业务系统中最常用的
- ip:使用Ip地址认证 一般使用digest方式。
设置访问控制: 方式一:(推荐)
- 增加一个认证用户
addauth digest 用户名:密码明文 eg. addauth digest user:password
- 设置权限 setAcl /path auth:用户名:密码明文:权限 eg. setAcl /test auth:user:password:cdrwa
- 查看Acl设置 getAcl /path
方式二:
setAcl /path digest:用户名:密码密文:权限
注:这里的加密规则是SHA1加密,然后base64编码。
建议使用第一种方式。 在创建Zookeeper原生API客户端的时候,我们一般使用如下方式:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
throws IOException
{
this(connectString, sessionTimeout, watcher, false);
}
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
...
//配置默认的watcher
watchManager.defaultWatcher = watcher;
//解析连接主机和端口
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
//获取主机ip地址
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
//创建客户端
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
//启动客户端
cnxn.start();
}
针对Zookeeper,主要有两个参数
public class ZooKeeper {
public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";
//与服务端通信的客户端
protected final ClientCnxn cnxn;
static {
//Keep these two lines together to keep the initialization order explicit
LOG = LoggerFactory.getLogger(ZooKeeper.class);
Environment.logEnv("Client environment:", LOG);
}
public ZooKeeperSaslClient getSaslClient() {
return cnxn.zooKeeperSaslClient;
}
//观察者管理器
private final ZKWatchManager watchManager = new ZKWatchManager();
}
从上面可以看出Zookeeper主要有两个成员分别为客户端和watcher管理器。
先来看watcher管理ZKWatchManager
private static class ZKWatchManager implements ClientWatchManager {
//节点数据观察器
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
//节点存在观察器
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
//节点孩子节点观察器
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
//默认观察期器
private volatile Watcher defaultWatcher;
}
ZKWatchManager
为了便于理解ZKWatchManager,我们来看ZKWatchManager的
@Override
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
{
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {
case None:
result.add(defaultWatcher);
boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
state != Watcher.Event.KeeperState.SyncConnected;
synchronized(dataWatches) {
for(Set<Watcher> ws: dataWatches.values()) {
result.addAll(ws);
}
if (clear) {
dataWatches.clear();
}
}
synchronized(existWatches) {
for(Set<Watcher> ws: existWatches.values()) {
result.addAll(ws);
}
if (clear) {
existWatches.clear();
}
}
synchronized(childWatches) {
for(Set<Watcher> ws: childWatches.values()) {
result.addAll(ws);
}
if (clear) {
childWatches.clear();
}
}
return result;
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
case NodeChildrenChanged:
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
break;
case NodeDeleted:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
// XXX This shouldn't be needed, but just in case
synchronized (existWatches) {
Set<Watcher> list = existWatches.remove(clientPath);
if (list != null) {
addTo(existWatches.remove(clientPath), result);
LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
}
}
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
break;
default:
String msg = "Unhandled watch event type " + type
+ " with state " + state + " on path " + clientPath;
LOG.error(msg);
throw new RuntimeException(msg);
}
return result;
}
}
final private void addTo(Set<Watcher> from, Set<Watcher> to) {
if (from != null) {
to.addAll(from);
}
}
再来看
public interface Watcher {
/**
* This interface defines the possible states an Event may represent
*/
public interface Event {
/**
* Enumeration of states the ZooKeeper may be at the event
*/
public enum KeeperState {
/** Unused, this state is never generated by the server */
@Deprecated
Unknown (-1),
/** The client is in the disconnected state - it is not connected
* to any server in the ensemble. */
Disconnected (0),
/** Unused, this state is never generated by the server */
@Deprecated
NoSyncConnected (1),
/** The client is in the connected state - it is connected
* to a server in the ensemble (one of the servers specified
* in the host connection parameter during ZooKeeper client
* creation). */
SyncConnected (3),
/**
* Auth failed state
*/
AuthFailed (4),
/**
* The client is connected to a read-only server, that is the
* server which is not currently connected to the majority.
* The only operations allowed after receiving this state is
* read operations.
* This state is generated for read-only clients only since
* read/write clients aren't allowed to connect to r/o servers.
*/
ConnectedReadOnly (5),
/**
* SaslAuthenticated: used to notify clients that they are SASL-authenticated,
* so that they can perform Zookeeper actions with their SASL-authorized permissions.
*/
SaslAuthenticated(6),
/** The serving cluster has expired this session. The ZooKeeper
* client connection (the session) is no longer valid. You must
* create a new client connection (instantiate a new ZooKeeper
* instance) if you with to access the ensemble. */
Expired (-112);
private final int intValue; // Integer representation of valu
}
}
/**
* Enumeration of types of events that may occur on the ZooKeeper
*/
public enum EventType {
None (-1),
NodeCreated (1),
NodeDeleted (2),
NodeDataChanged (3),
NodeChildrenChanged (4);
private final int intValue; // Integer representation of value
// for sending over wire
}
abstract public void process(WatchedEvent event);
}
从上面可以看出,watcher观察者管理器ZKWatchManager,主要关注点的事件类型有节点创建NodeCreated,节点删除NodeDeleted,节点数据改变NodeDataChanged,节点子节点更新事件类型NodeChildrenChanged;客户端状态有: 同步连接SyncConnected,断开连接Disconnected,只读连接ConnectedReadOnly,验证失败AuthFailed,已验证SaslAuthenticated,会话过期Expired等状态。 Watcher观察器,主要根据事件类型,注册节点观察器,默认为节点数据观察器集,节点存在观察器集,节点孩子节点观察器集,默认观察期器集;如果是NodeCreated和NodeDeleted,则注册节点数据观察器集,节点存在观察器集; 如果是NodeDataChanged,则注册节点孩子节点观察器集;如果是NodeDeleted,则注册节点数据观察器集,节点存在观察器集,节点孩子节点观察器集。
我们再来看Zookeeper的另一个关键成员客户端ClientCnxn。
ClientCnxn
public class ClientCnxn {
private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class);
/** This controls whether automatic watch resetting is enabled.
* Clients automatically reset watches during session reconnect, this
* option allows the client to turn off this behavior by setting
* the environment variable "zookeeper.disableAutoWatchReset" to "true" */
private static boolean disableAutoWatchReset;
static {
// this var should not be public, but otw there is no easy way
// to test
disableAutoWatchReset =
Boolean.getBoolean("zookeeper.disableAutoWatchReset");
if (LOG.isDebugEnabled()) {
LOG.debug("zookeeper.disableAutoWatchReset is "
+ disableAutoWatchReset);
}
}
private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();
/**
* These are the packets that have been sent and are waiting for a response.
*/
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
/**
* These are the packets that need to be sent.
*/
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
private int connectTimeout;
/**
* The timeout in ms the client negotiated with the server. This is the
* "real" timeout, not the timeout request by the client (which may have
* been increased/decreased by the server which applies bounds to this
* value.
*/
private volatile int negotiatedSessionTimeout;
private int readTimeout;
private final int sessionTimeout;
private final ZooKeeper zooKeeper;
private final ClientWatchManager watcher;
private long sessionId;
private byte sessionPasswd[] = new byte[16];
/**
* If true, the connection is allowed to go to r-o mode. This field's value
* is sent, besides other data, during session creation handshake. If the
* server on the other side of the wire is partitioned it'll accept
* read-only clients only.
*/
private boolean readOnly;
final String chrootPath;
final SendThread sendThread;
final EventThread eventThread;
/**
* Set to true when close is called. Latches the connection such that we
* don't attempt to re-connect to the server if in the middle of closing the
* connection (client sends session disconnect to server as part of close
* operation)
*/
private volatile boolean closing = false;
/**
* A set of ZooKeeper hosts this client could connect to.
*/
private final HostProvider hostProvider;
/**
* Is set to true when a connection to a r/w server is established for the
* first time; never changed afterwards.
* <p>
* Is used to handle situations when client without sessionId connects to a
* read-only server. Such client receives "fake" sessionId from read-only
* server, but this sessionId is invalid for other servers. So when such
* client finds a r/w server, it sends 0 instead of fake sessionId during
* connection handshake and establishes new, valid session.
* <p>
* If this field is false (which implies we haven't seen r/w server before)
* then non-zero sessionId is fake, otherwise it is valid.
*/
volatile boolean seenRwServerBefore = false;
public ZooKeeperSaslClient zooKeeperSaslClient;
}
客户端ClientCnxn中最重要的是发送线程SendThread和事件线程EventThread,同时关联一个ZooKeeper,以及客户端watcher管理器ClientWatchManager,实际为ZKWatchManager,
还有一个我们需要关注的点是等待发送数据包队列pendingQueue(LinkedList
我们来看一下数据包 Packet。
/**
* This class allows us to pass the headers and the relevant records around.
*/
static class Packet {
//请求头部
RequestHeader requestHeader;
//响应头部
ReplyHeader replyHeader;
//请求
Record request;
//响应
Record response;
//字节缓冲区
ByteBuffer bb;
/** Client's view of the path (may differ due to chroot) **/
String clientPath;
/** Servers's view of the path (may differ due to chroot) **/
String serverPath;
boolean finished;
//异步回调接口
AsyncCallback cb;
//包数据上下文
Object ctx;
//观察者注册器
WatchRegistration watchRegistration;
}
从上面可以看出,数据包Packet主要有请求头部requestHeader(RequestHeader),响应头部replyHeader(ReplyHeader),请求request(Record),响应response(Record),字节缓冲区ByteBuffer,客户端路径clientPath,服务端路径serverPath,异步回调接口AsyncCallback,数据包上下文,观察者注册器watchRegistration。 再来看请求头部,响应头部,请求和响应record
public class RequestHeader implements Record {
private int xid;
private int type;
}
public class ReplyHeader implements Record {
private int xid;
private long zxid;
private int err;//错误代码
}
/**
* Interface that is implemented by generated classes.
*
*/
public interface Record {
public void serialize(OutputArchive archive, String tag)
throws IOException;
public void deserialize(InputArchive archive, String tag)
throws IOException;
}
Record主要作用为序列与反序列数据,再来看OutputArchive,InputArchive
/**
* Interface that alll the serializers have to implement.
*
*/
public interface OutputArchive {
public void writeByte(byte b, String tag) throws IOException;
public void writeBool(boolean b, String tag) throws IOException;
public void writeInt(int i, String tag) throws IOException;
public void writeLong(long l, String tag) throws IOException;
public void writeFloat(float f, String tag) throws IOException;
public void writeDouble(double d, String tag) throws IOException;
public void writeString(String s, String tag) throws IOException;
public void writeBuffer(byte buf[], String tag)
throws IOException;
public void writeRecord(Record r, String tag) throws IOException;
public void startRecord(Record r, String tag) throws IOException;
public void endRecord(Record r, String tag) throws IOException;
public void startVector(List v, String tag) throws IOException;
public void endVector(List v, String tag) throws IOException;
public void startMap(TreeMap v, String tag) throws IOException;
public void endMap(TreeMap v, String tag) throws IOException;
}
/**
* Interface that all the Deserializers have to implement.
*
*/
public interface InputArchive {
public byte readByte(String tag) throws IOException;
public boolean readBool(String tag) throws IOException;
public int readInt(String tag) throws IOException;
public long readLong(String tag) throws IOException;
public float readFloat(String tag) throws IOException;
public double readDouble(String tag) throws IOException;
public String readString(String tag) throws IOException;
public byte[] readBuffer(String tag) throws IOException;
public void readRecord(Record r, String tag) throws IOException;
public void startRecord(String tag) throws IOException;
public void endRecord(String tag) throws IOException;
public Index startVector(String tag) throws IOException;
public void endVector(String tag) throws IOException;
public Index startMap(String tag) throws IOException;
public void endMap(String tag) throws IOException;
}
再来看异步回调接口:
public interface AsyncCallback {
interface StatCallback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx, Stat stat);
}
interface DataCallback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx, byte data[],
Stat stat);
}
interface ACLCallback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx,
List<ACL> acl, Stat stat);
}
interface ChildrenCallback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx,
List<String> children);
}
interface Children2Callback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx,
List<String> children, Stat stat);
}
interface StringCallback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx, String name);
}
interface VoidCallback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx);
}
}
异步回调接口中AsyncCallback,定义了各种回调接口,比如数据变更,子节点变更等。
再来看观察者注册器:
/**
* Register a watcher for a particular path.
*/
abstract class WatchRegistration {
private Watcher watcher;//观察者
private String clientPath;
public WatchRegistration(Watcher watcher, String clientPath)
{
this.watcher = watcher;
this.clientPath = clientPath;
}
abstract protected Map<String, Set<Watcher>> getWatches(int rc);
/**
* Register the watcher with the set of watches on path.
* @param rc the result code of the operation that attempted to
* add the watch on the path.
*/
public void register(int rc) {
if (shouldAddWatch(rc)) {
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
/**
* Determine whether the watch should be added based on return code.
* @param rc the result code of the operation that attempted to add the
* watch on the node
* @return true if the watch should be added, otw false
*/
protected boolean shouldAddWatch(int rc) {
return rc == 0;
}
}
再来看观察者注册器的几个实现:
/** Handle the special case of exists watches - they add a watcher
* even in the case where NONODE result code is returned.
*/
class ExistsWatchRegistration extends WatchRegistration {
public ExistsWatchRegistration(Watcher watcher, String clientPath) {
super(watcher, clientPath);
}
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return rc == 0 ? watchManager.dataWatches : watchManager.existWatches;
}
@Override
protected boolean shouldAddWatch(int rc) {
return rc == 0 || rc == KeeperException.Code.NONODE.intValue();
}
}
class DataWatchRegistration extends WatchRegistration {
public DataWatchRegistration(Watcher watcher, String clientPath) {
super(watcher, clientPath);
}
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return watchManager.dataWatches;
}
}
class ChildWatchRegistration extends WatchRegistration {
public ChildWatchRegistration(Watcher watcher, String clientPath) {
super(watcher, clientPath);
}
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return watchManager.childWatches;
}
}
Zookeeper客户端状态:
public enum States {
CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
CLOSED, AUTH_FAILED, NOT_CONNECTED;
public boolean isAlive() {
return this != CLOSED && this != AUTH_FAILED;
}
/**
* Returns whether we are connected to a server (which
* could possibly be read-only, if this client is allowed
* to go to read-only mode)
* */
public boolean isConnected() {
return this == CONNECTED || this == CONNECTEDREADONLY;
}
}
以上这部分,我们不需要纠结,只需要了解就行。
再来看客户端ClientCnxn的发送线程:
/**
* This class services the outgoing request queue and generates the heart
* beats. It also spawns the ReadThread.
*/
class SendThread extends Thread {
private long lastPingSentNs;
private final ClientCnxnSocket clientCnxnSocket;//客户端Socket
private Random r = new Random(System.nanoTime());
private boolean isFirstConnect = true;
void readResponse(ByteBuffer incomingBuffer) throws IOException {
...
}
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setUncaughtExceptionHandler(uncaughtExceptionHandler);
setDaemon(true);
}
// TODO: can not name this method getState since Thread.getState()
// already exists
// It would be cleaner to make class SendThread an implementation of
// Runnable
/**
* Used by ClientCnxnSocket
*
* @return
*/
ZooKeeper.States getZkState() {
return state;
}
ClientCnxnSocket getClientCnxnSocket() {
return clientCnxnSocket;
}
}
我们来看一下发送线程的ping方法:
private void sendPing() {
lastPingSentNs = System.nanoTime();
RequestHeader h = new RequestHeader(-2, OpCode.ping);
//添加请求头部到队列
queuePacket(h, null, null, null, null, null, null, null, null);
}
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
synchronized (outgoingQueue) {
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
//添加数据包到包队列
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
再来看启动连接方法
private void startConnect() throws IOException {
state = States.CONNECTING;
InetSocketAddress addr;
if (rwServerAddress != null) {
addr = rwServerAddress;
rwServerAddress = null;
} else {
addr = hostProvider.next(1000);
}
setName(getName().replaceAll("\\(.*\\)",
"(" + addr.getHostName() + ":" + addr.getPort() + ")"));
try {
zooKeeperSaslClient = new ZooKeeperSaslClient("zookeeper/"+addr.getHostName());
} catch (LoginException e) {
// An authentication error occurred when the SASL client tried to initialize:
// for Kerberos this means that the client failed to authenticate with the KDC.
// This is different from an authentication error that occurs during communication
// with the Zookeeper server, which is handled below.
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
logStartConnect(addr);
//委托给内存socket客户端
clientCnxnSocket.connect(addr);
}
private void logStartConnect(InetSocketAddress addr) {
String msg = "Opening socket connection to server " + addr;
if (zooKeeperSaslClient != null) {
msg += ". " + zooKeeperSaslClient.getConfigStatus();
}
LOG.info(msg);
}
再来看发送线程主逻辑
public void run() {
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = System.currentTimeMillis();
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
startConnect();
clientCnxnSocket.updateLastSendAndHeard();
}
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent == true) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
throw new SessionTimeoutException(
"Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv() + "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId));
}
if (state.isConnected()) {
int timeToNextPing = readTimeout / 2
- clientCnxnSocket.getIdleSend();
if (timeToNextPing <= 0) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = System.currentTimeMillis();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
//关键点,在这,发送数据包队列
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()) {
// closing so this is expected
LOG.debug("An exception was thrown while closing send thread for session 0x"
+ Long.toHexString(getSessionId())
+ " : " + e.getMessage());
}
break;
} else {
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else {
LOG.warn(
"Session 0x"
+ Long.toHexString(getSessionId())
+ " for server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", unexpected error"
+ RETRY_CONN_MSG, e);
}
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
}
}
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exitedloop.");
}
从上面可以看出发送线程主要的作用是发送客户端请求数据包,实际委托给内部的clientCnxnSocket。
再来看客户端Socket的定义
abstract class ClientCnxnSocket {
private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocket.class);
protected boolean initialized;
/**
* This buffer is only used to read the length of the incoming message.
*/
protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
/**
* After the length is read, a new incomingBuffer is allocated in
* readLength() to receive the full message.
*/
protected ByteBuffer incomingBuffer = lenBuffer;
protected long sentCount = 0;
protected long recvCount = 0;
protected long lastHeard;
protected long lastSend;
protected long now;
protected ClientCnxn.SendThread sendThread;
/**
* The sessionId is only available here for Log and Exception messages.
* Otherwise the socket doesn't need to know it.
*/
protected long sessionId;
void updateLastSend() {
this.lastSend = now;
}
void updateLastSendAndHeard() {
this.lastSend = now;
this.lastHeard = now;
}
protected void readLength() throws IOException {
int len = incomingBuffer.getInt();
if (len < 0 || len >= ClientCnxn.packetLen) {
throw new IOException("Packet len" + len + " is out of range!");
}
incomingBuffer = ByteBuffer.allocate(len);
}
void readConnectResult() throws IOException {
if (LOG.isTraceEnabled()) {
StringBuilder buf = new StringBuilder("0x[");
for (byte b : incomingBuffer.array()) {
buf.append(Integer.toHexString(b) + ",");
}
buf.append("]");
LOG.trace("readConnectResult " + incomingBuffer.remaining() + " "
+ buf.toString());
}
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
// read "is read-only" flag
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");
} catch (IOException e) {
// this is ok -- just a packet from an old server which
// doesn't contain readOnly field
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}
this.sessionId = conRsp.getSessionId();
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
conRsp.getPasswd(), isRO);
}
abstract boolean isConnected();
abstract void connect(InetSocketAddress addr) throws IOException;
abstract SocketAddress getRemoteSocketAddress();
abstract SocketAddress getLocalSocketAddress();
abstract void cleanup();
abstract void close();
abstract void wakeupCnxn();
abstract void enableWrite();
abstract void disableWrite();
abstract void enableReadWriteOnly();
//调度数据包队列
abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue,
LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws IOException, InterruptedException;
abstract void testableCloseSocket() throws IOException;
//发送数据包发送数据包和
abstract void sendPacket(Packet p) throws IOException;
}
从上面可以看出,客户端socket的主要功能为发送数据包sendPacket和调度数据包队列doTransport。
我们来看默认socket客户端的实现类:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
throws IOException
{
...
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
cnxn.seenRwServerBefore = true; // since user has provided sessionId
cnxn.start();
}
private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
String clientCnxnSocketName = System
.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
.newInstance();
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ clientCnxnSocketName);
ioe.initCause(e);
throw ioe;
}
}
从上面可以看出客户端socket的默认实现为ClientCnxnSocketNIO。
我们来看ClientCnxnSocketNIO
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
private static final Logger LOG = LoggerFactory
.getLogger(ClientCnxnSocketNIO.class);
private final Selector selector = Selector.open();
private SelectionKey sockKey;
ClientCnxnSocketNIO() throws IOException {
super();
}
}
从上面可以看出,客户端Socket的实现ClientCnxnSocketNIO,内部主要使用nio的选择器和选择key。
再来看创建socket
/**
* create a socket channel.
* @return the created socket channel
* @throws IOException
*/
SocketChannel createSock() throws IOException {
SocketChannel sock;
sock = SocketChannel.open();
sock.configureBlocking(false);
sock.socket().setSoLinger(false, -1);
sock.socket().setTcpNoDelay(true);
return sock;
}
再来看发送数据包:
@Override
void sendPacket(Packet p) throws IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
p.createBB();
ByteBuffer pbb = p.bb;
sock.write(pbb);
}
发送数据包,实际委托给内Socket通道。
再来看调度数据包队列:
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
//实际调度数据包队列
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
/**
* @return true if a packet was received
* @throws InterruptedException
* @throws IOException
*/
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
//读数据
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
//读取数据,解析为响应Record
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
//写数据
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
//写数据包
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
}
调度数据包队列,实际委托给内Socket通道,如果是响应消息,则转化为响应Record,如果是发送数据包,则委托给内部的socket通道。
到这里我们报客户端Socket的发送线程分析完,小节一下:
客户端ClientCnxn中最重要的是发送线程SendThread和事件线程EventThread,同时关联一个ZooKeeper,以及客户端watcher管理器ClientWatchManager,实际为ZKWatchManager,
还有一个我们需要关注的点是等待发送数据包队列pendingQueue(LinkedList
数据包Packet主要有请求头部requestHeader(RequestHeader),响应头部replyHeader(ReplyHeader),请求request(Record),响应response(Record),字节缓冲区ByteBuffer,客户端路径clientPath,服务端路径serverPath,异步回调接口AsyncCallback,数据包上下文,观察者注册器watchRegistration。
发送线程SendThread主要的作用是发送客户端请求数据包,实际委托给内部的clientCnxnSocket。
客户端socket的主要功能为发送数据包sendPacket和调度数据包队列doTransport。
客户端Socket的实现ClientCnxnSocketNIO,内部主要使用nio的选择器和选择key。
发送数据包,实际委托给内Socket通道。
调度数据包队列,实际委托给内Socket通道,如果是响应消息,则转化为响应Record,如果是发送数据包,则委托给内部的socket通道。
再来看客户端ClientCnxn中的事件线程EventThread:
class EventThread extends Thread {
//待处理事件队列
private final LinkedBlockingQueue<Object> waitingEvents =
new LinkedBlockingQueue<Object>();
/** This is really the queued session state until the event
* thread actually processes the event and hands it to the watcher.
* But for all intents and purposes this is the state.
*/
private volatile KeeperState sessionState = KeeperState.Disconnected;
private volatile boolean wasKilled = false;
private volatile boolean isRunning = false;
EventThread() {
super(makeThreadName("-EventThread"));
setUncaughtExceptionHandler(uncaughtExceptionHandler);
setDaemon(true);
}
}
来看事件线程的主流程:
@Override
public void run() {
try {
isRunning = true;
while (true) {
//消费队列事件
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
//处理事件
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down");
}
再来看处理事件:
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
//如果是事件观察者对,则条用观察者处理事件
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
} else {
//否则为数据包,转换事件
Packet p = (Packet) event;
int rc = 0;
//客户端路径
String clientPath = p.clientPath;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}
if (p.cb == null) {//异步回调为空
LOG.warn("Somehow a null cb got to EventThread!");
} else if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse) {
//状态回调
StatCallback cb = (StatCallback) p.cb;
if (rc == 0) {
if (p.response instanceof ExistsResponse) {
//节点存在检查响应
cb.processResult(rc, clientPath, p.ctx,
((ExistsResponse) p.response)
.getStat());
} else if (p.response instanceof SetDataResponse) {
//设置节点数据响应
cb.processResult(rc, clientPath, p.ctx,
((SetDataResponse) p.response)
.getStat());
} else if (p.response instanceof SetACLResponse) {
//安全控制响应
cb.processResult(rc, clientPath, p.ctx,
((SetACLResponse) p.response)
.getStat());
}
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetDataResponse) {
//获取数据响应
DataCallback cb = (DataCallback) p.cb;
GetDataResponse rsp = (GetDataResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getData(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null,
null);
}
} else if (p.response instanceof GetACLResponse) {
//获取安全控制响应
ACLCallback cb = (ACLCallback) p.cb;
GetACLResponse rsp = (GetACLResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getAcl(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null,
null);
}
} else if (p.response instanceof GetChildrenResponse) {
//获取节点子节点响应
ChildrenCallback cb = (ChildrenCallback) p.cb;
GetChildrenResponse rsp = (GetChildrenResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getChildren());
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetChildren2Response) {
Children2Callback cb = (Children2Callback) p.cb;
GetChildren2Response rsp = (GetChildren2Response) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getChildren(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof CreateResponse) {
//创建节点响应
StringCallback cb = (StringCallback) p.cb;
CreateResponse rsp = (CreateResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath()
.substring(chrootPath.length())));
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.cb instanceof VoidCallback) {
VoidCallback cb = (VoidCallback) p.cb;
cb.processResult(rc, clientPath, p.ctx);
}
}
} catch (Throwable t) {
LOG.error("Caught unexpected throwable", t);
}
}
}
private static class WatcherSetEventPair {
private final Set<Watcher> watchers;//事件观察者
private final WatchedEvent event;//事件
public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
this.watchers = watchers;
this.event = event;
}
}
public class WatchedEvent {
final private KeeperState keeperState;
final private EventType eventType;
private String path;
}
从上面可以看出事件线程主要处理创建、设值,获取节点数据和获取节点子节点数据,检查节点是否存在,删除节点等事件,并处理。
再来看一下创建Zookeeper客户端:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
throws IOException
{
...
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
cnxn.seenRwServerBefore = true; // since user has provided sessionId
cnxn.start();//启动客户端
}
来看启动客户端Socket:
public class ClientCnxn {
private Object eventOfDeath = new Object();
public void start() {
sendThread.start();
eventThread.start();
}
}
从上面可以看出,启动客户端Socket,实际上启动发送数据包线程(处理数据的请求和响应)和事件线程(处理crwda相关事件)。
CRWDA操作
再来看一下,创建节点,设置,获取节点数据。
- 创建节点
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); //创建请求和响应 RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.create); CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); if (acl != null && acl.size() == 0) { throw new KeeperException.InvalidACLException(); } request.setAcl(acl); //委托给socket客户端,发送创建节点操作 ReplyHeader r = cnxn.submitRequest(h, request, response, null); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (cnxn.chrootPath == null) { return response.getPath(); } else { return response.getPath().substring(cnxn.chrootPath.length()); } }
ClientCnxn
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration); synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r; }
创建节点,创建创建请求和响应,委托给socket客户端,发送创建节点操作。 再来看创建节点,异步回调处理结果: ```java /** * The asynchronous version of create. * * @see #create(String, byte[], List, CreateMode) */
public void create(final String path, byte data[], List
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.create);
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
ReplyHeader r = new ReplyHeader();
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
request.setAcl(acl);
//委托给socket客户端
cnxn.queuePacket(h, r, request, response, cb, clientPath,
serverPath, ctx, null); } ``` 2. 设置 ```java public Stat setData(final String path, byte data[], int version)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setData);
SetDataRequest request = new SetDataRequest();
request.setPath(serverPath);
request.setData(data);
request.setVersion(version);
SetDataResponse response = new SetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
return response.getStat();
}
3. 获取节点数据
```java
public byte[] getData(String path, boolean watch, Stat stat)
throws KeeperException, InterruptedException {
return getData(path, watch ? watchManager.defaultWatcher : null, stat);
}
/**
* The asynchronous version of getData.
*
* @see #getData(String, Watcher, Stat)
*/
public void getData(final String path, Watcher watcher,
DataCallback cb, Object ctx)
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
//委托给socket客户端
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
clientPath, serverPath, ctx, wcb);
}
从上面可以看出,Zk的crwda的相关操作,首先创建相应类型的请求和响应,然后委托给socket客户端,处理响应的操作,并解析响应消息。
其他操作,我们不在讲解,放在附中。
由于篇幅问题,另外两种客户端ZkClient和Curator,我们放在后面的文章中再讲。
ZkClient
Curator
总结
Zookeeper主要有两个成员分别为客户端和watcher管理器。watcher观察器,主要关注点的事件类型有节点创建NodeCreated,节点删除NodeDeleted,节点数据改变NodeDataChanged, 节点子节点更新事件类型NodeChildrenChanged;客户端状态有:同步连接SyncConnected,断开连接Disconnected,只读连接ConnectedReadOnly,验证失败AuthFailed,已验证SaslAuthenticated,会话过期Expired等状态。 Watcher观察者管理器ZKWatchManager,主要根据事件类型,注册节点观察器,默认为节点数据观察器集,节点存在观察器集,节点孩子节点观察器集,默认观察期器集;如果是NodeCreated和NodeDeleted,则注册节点数据观察器集,节点存在观察器集; 如果是NodeDataChanged,则注册节点孩子节点观察器集;如果是NodeDeleted,则注册节点数据观察器集,节点存在观察器集,节点孩子节点观察器集。
客户端ClientCnxn中最重要的是发送线程SendThread和事件线程EventThread,同时关联一个ZooKeeper,以及客户端watcher管理器ClientWatchManager,实际为ZKWatchManager,
还有一个我们需要关注的点是等待发送数据包队列pendingQueue(LinkedList
数据包Packet主要有请求头部requestHeader(RequestHeader),响应头部replyHeader(ReplyHeader),请求request(Record),响应response(Record),字节缓冲区ByteBuffer,客户端路径clientPath,服务端路径serverPath,异步回调接口AsyncCallback,数据包上下文,观察者注册器watchRegistration。
发送线程SendThread主要的作用是发送客户端请求数据包,实际委托给内部的clientCnxnSocket。
客户端socket的主要功能为发送数据包sendPacket和调度数据包队列doTransport。
客户端Socket的实现ClientCnxnSocketNIO,内部主要使用nio的选择器和选择key。
发送数据包,实际委托给内Socket通道。
调度数据包队列,实际委托给内Socket通道,如果是响应消息,则转化为响应Record,如果是发送数据包,则委托给内部的socket通道。
事件线程主要处理创建、设值,获取节点数据和获取节点子节点数据,检查节点是否存在,删除节点等事件,并处理。
启动客户端Socket,实际上启动发送数据包线程(处理数据的请求和响应)和事件线程(处理crwda相关事件)。
创建节点,创建创建请求和响应,委托给socket客户端,发送创建节点操作。
Zk的crwda的相关操作,首先创建相应类型的请求和响应,然后委托给socket客户端,处理响应的操作,并解析响应消息。
附
public void delete(final String path, int version)
throws InterruptedException, KeeperException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
final String serverPath;
// maintain semantics even in chroot case
// specifically - root cannot be deleted
// I think this makes sense even in chroot case.
if (clientPath.equals("/")) {
// a bit of a hack, but delete(/) will never succeed and ensures
// that the same semantics are maintained
serverPath = clientPath;
} else {
serverPath = prependChroot(clientPath);
}
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.delete);
DeleteRequest request = new DeleteRequest();
request.setPath(serverPath);
request.setVersion(version);
ReplyHeader r = cnxn.submitRequest(h, request, null, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
}
public List<String> getChildren(final String path, Watcher watcher)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ChildWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getChildren);
GetChildrenRequest request = new GetChildrenRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetChildrenResponse response = new GetChildrenResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
return response.getChildren();
}
public void exists(final String path, Watcher watcher,
StatCallback cb, Object ctx)
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ExistsWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.exists);
ExistsRequest request = new ExistsRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
SetDataResponse response = new SetDataResponse();
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
clientPath, serverPath, ctx, wcb);
}