Curator分布式锁
write by donaldhan, 2018-06-30 12:00引言
节点监听缓存NodeCache,内部关联一下Curator框架客户端CuratorFramework,节点监听器容器 listeners(ListenerContainer
添加节点监听器,实际上是注册到节点缓存的节点监听器容器ListenerContainer
Curator框架实现CuratorFrameworkImpl启动时,首先启动连接状态管理器ConnectionStateManager, 然后再启动客户端CuratorZookeeperClient(在构造Curator框架实现CuratorFrameworkImpl初始化动客户端CuratorZookeeperClient,传入一个Watcher,用于处理CuratorEvent。)。 启动客户端CuratorZookeeperClient过程,关键点是在启动连接状态ConnectionState(在构造CuratorZookeeperClient,初始化连接状态,并将内部Watcher传给连接状态)。 连接状态实现了观察者Watcher,在连接状态建立时,调用客户端CuratorZookeeperClient传入的Watcher,处理相关事件。而这个Watcher是在现CuratorFrameworkImpl初始化动客户端CuratorZookeeperClient时, 传入的。客户端观察者的实际处理业务逻辑在CuratorFrameworkImpl实现,及processEvent方法,processEvent主要处理逻辑为,遍历Curator框架实现CuratorFrameworkImpl内部的监听器容器内的监听器处理相关CuratorEvent 事件。这个CuratorEvent事件,是由原生WatchedEvent事件包装而来。
启动连接连接状态管理器,主要是使用连接状态监听器容器ListenerContainer
子目录监听器PathChildrenCache,主要成员变量为客户端框架实现CuratorFramework,子路径监听器容器ListenerContainer
一级目录监听器PathChildrenCache,启动主要是注册连接状态监听器ConnectionStateListener,连接状态监听器根据连接状态来添加事件EventOperation和刷新RefreshOperation操作到操作集。 事件操作EventOperation,主要是触发监听器的子目录事件操作;刷新操作RefreshOperation主要是完成子目录的添加和刷新事件,并从新注册子目录监听器。 然后根据启动模式来决定是重添加事件操作,刷新、事件操作,或者重新构建,即刷新缓存路径数据,并注册刷新操作。
关闭客户端框架,主要是清除监听器,连接状态管理器,关闭zk客户端。这是我们上一篇文章Curator目录监听所讲的内容,今天我们来看一下Curator高级特性分布式锁。本文中的所有示例见[zookeeper-demo][]
目录
Curator分布式锁
现在先让我们看看Curator的几种锁方案: 四种锁方案 InterProcessMutex:分布式可重入排它锁 InterProcessSemaphoreMutex:分布式排它锁 InterProcessReadWriteLock:分布式读写锁 InterProcessMultiLock:将多个锁作为单个实体管理的容器 DistributedBarrier:使用Curator实现分布式Barrier,实际在分布式环境中使用,待所有应用到达屏障时,移除屏障 DistributedDoubleBarrier:分布式锁, 控制同时进入,同时退出 今天我们主要探讨的是分布式可重入排它锁InterProcessMutex,先来看一个示例:
package org.donald.curator.recipes.lock;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.donald.common.threadpool.TaskExecutors;
import org.donald.constant.ConfigConstant;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
/**
* @ClassName: RecipesLockSample
* @Description: 测试使用分布式锁的情况下,生成订单id,可以避免产生重复id
* @Author: Donaldhan
* @Date: 2018-05-16 16:01
*/
@Slf4j
public class RecipesLockSample {
private static CuratorFramework client;
private static ExecutorService exec = null;
public static void main(String[] args) {
String lock_path = "/curator_recipes_lock_path";
//模仿同时生成订单
final CountDownLatch down = new CountDownLatch(1);
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
client =
CuratorFrameworkFactory.builder()
.connectString(ConfigConstant.IP)
.sessionTimeoutMs(ConfigConstant.SESSION_TIMEOUT)
.connectionTimeoutMs(ConfigConstant.CONNETING_TIMEOUT)
.retryPolicy(retryPolicy)
.build();
client.start();
log.info("success connected...");
//分布式锁
final InterProcessMutex lock = new InterProcessMutex(client,lock_path);
exec = TaskExecutors.newFixedThreadPool(30);
for(int i = 0; i < 30; i++){
exec.submit(new Runnable() {
@Override
public void run() {
try {
//模仿同时生成订单
down.await();
//获取分布式锁
lock.acquire();
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
String orderNo = sdf.format(new Date());
log.info("{} 生成的订单号是 :{} ",Thread.currentThread().getName(), orderNo);
} catch ( Exception e ) {
e.printStackTrace();
}
finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
}
down.countDown();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (exec != null) {
exec.shutdown();
}
/* 如果在会话执行的过程中,关闭会话,将抛出异常
java.lang.IllegalStateException: instance must be started before calling this method
if (client != null) {
client.close();
}*/
}
}
}
从上面示例,关键的几个操作为:
//分布式锁
final InterProcessMutex lock = new InterProcessMutex(client,lock_path);
//获取分布式锁
lock.acquire();
lock.release();
分别为创建分布式可重入锁,获取锁,与释放锁。 下面我们来看分布式可重入锁的定义:
分布式可重入锁InterProcessMutex
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
private final LockInternals internals;
private final String basePath;
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
private static class LockData
{
final Thread owningThread;//拥有线程
final String lockPath;//锁路径
final AtomicInteger lockCount = new AtomicInteger(1);//锁的次数
private LockData(Thread owningThread, String lockPath)
{
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
private static final String LOCK_NAME = "lock-";
/**
* @param client client
* @param path the path to lock
*/
public InterProcessMutex(CuratorFramework client, String path)
{
this(client, path, LOCK_NAME, 1, new StandardLockInternalsDriver());
}
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
{
basePath = path;
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
}
从上面可以看出,分布式可重入锁InterProcessMutex主要的成员变量为锁路径basePath,持有分布式映射信息映射threadData(ConcurrentMap<Thread, LockData>),同时一个关键的锁实现LockInternals。
我们先来看一下分布式可重入锁接口的定义:
public interface InterProcessLock
{
/**
* Acquire the mutex - blocking until it's available. Each call to acquire must be balanced by a call
* to {@link #release()}
*
* @throws Exception ZK errors, connection interruptions
*/
public void acquire() throws Exception;
/**
* Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire that returns true must be balanced by a call
* to {@link #release()}
*
* @param time time to wait
* @param unit time unit
* @return true if the mutex was acquired, false if not
* @throws Exception ZK errors, connection interruptions
*/
public boolean acquire(long time, TimeUnit unit) throws Exception;
/**
* Perform one release of the mutex.
*
* @throws Exception ZK errors, interruptions, current thread does not own the lock
*/
public void release() throws Exception;
/**
* Returns true if the mutex is acquired by a thread in this JVM
*
* @return true/false
*/
boolean isAcquiredInThisProcess();
}
从上面可以分布式可重入锁,主要提供的获取释放锁,和检查当前线程是否持有锁操作。
再来看分布式可重入锁实现接口Revocable定义:
/**
* Specifies locks that can be revoked
可释放锁
*/
public interface Revocable<T>
{
/**
* Make the lock revocable. Your listener will get called when another process/thread
* wants you to release the lock. Revocation is cooperative.
* 锁释放时,触发监听器
* @param listener the listener
*/
public void makeRevocable(RevocationListener<T> listener);
/**
* Make the lock revocable. Your listener will get called when another process/thread
* wants you to release the lock. Revocation is cooperative.
*
* @param listener the listener
* @param executor executor for the listener
*/
public void makeRevocable(RevocationListener<T> listener, Executor executor);
}
//锁释放信号监听器
public interface RevocationListener<T>
{
/**
* Called when a revocation request has been received. You should release the lock as soon
* as possible. Revocation is cooperative.
* 当锁被释放时,触发此方法
* @param forLock the lock that should release
*/
public void revocationRequested(T forLock);
}
从上面可以看出Revocable的作用,主要是在锁释放的时候,触发释放锁监听器RevocationListener,同时我们可以使用自己的执行器,触发相关监听器操作。
在构造分布式可重入锁的是有一个锁驱动StandardLockInternalsDriver我们来看一下。
/**
* @param client client
* @param path the path to lock
*/
public InterProcessMutex(CuratorFramework client, String path)
{
this(client, path, LOCK_NAME, 1, new StandardLockInternalsDriver());
}
public class StandardLockInternalsDriver implements LockInternalsDriver
{
static private final Logger log = LoggerFactory.getLogger(StandardLockInternalsDriver.class);
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
//获取序列节点的索引
int ourIndex = children.indexOf(sequenceNodeName);
//校验索引
validateOurIndex(sequenceNodeName, ourIndex);
//
boolean getsTheLock = ourIndex < maxLeases;
//锁路径
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
@Override
public String fixForSorting(String str, String lockName)
{
return standardFixForSorting(str, lockName);
}
public static String standardFixForSorting(String str, String lockName)
{
int index = str.lastIndexOf(lockName);
if ( index >= 0 )
{
index += lockName.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;
}
static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException
{
if ( ourIndex < 0 )
{
log.error("Sequential path not found: " + sequenceNodeName);
throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName);
}
}
}
//锁内部驱动
interface LockInternalsDriver extends LockInternalsSorter
{
//尝试获取锁
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception;
}
//预测结果
class PredicateResults
{
private final boolean getsTheLock;//是否成功获取锁
private final String pathToWatch;//锁路径
PredicateResults(String pathToWatch, boolean getsTheLock)
{
this.pathToWatch = pathToWatch;
this.getsTheLock = getsTheLock;
}
String getPathToWatch()
{
return pathToWatch;
}
boolean getsTheLock()
{
return getsTheLock;
}
}
来看LockInternals的定义:
public class LockInternals
{
private final CuratorFramework client;//框架客户端
private final String path;
private final String basePath;//锁路径
private final LockInternalsDriver driver;//锁内部驱动
private final String lockName;
//释放线程执行器对
private final AtomicReference<RevocationSpec> revocable = new AtomicReference<RevocationSpec>(null);
private volatile int maxLeases;//最大可重入锁
static final byte[] REVOKE_MESSAGE = "__REVOKE__".getBytes();//释放数据
LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases)
{
this.driver = driver;
this.lockName = lockName;
this.maxLeases = maxLeases;
PathUtils.validatePath(path);
this.client = client;
this.basePath = path;
this.path = ZKPaths.makePath(path, lockName);
}
//检查锁路径是否可用观察器
private final CuratorWatcher revocableWatcher = new CuratorWatcher()
{
@Override
public void process(WatchedEvent event) throws Exception
{
if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
{
checkRevocableWatcher(event.getPath());
}
}
};
private final Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
notifyFromWatcher();
}
};
}
//释放线程执行器对
class RevocationSpec
{
private final Runnable runnable;//任务线程
private final Executor executor;//任务执行器
RevocationSpec(Executor executor, Runnable runnable)
{
this.runnable = runnable;
this.executor = executor;
}
Runnable getRunnable()
{
return runnable;
}
Executor getExecutor()
{
return executor;
}
}
从上面可以看出LockInternals主要的成员变量为客户端框架CuratorFramework,锁内部驱动LockInternalsDriver,检查锁路径是否可用观察器CuratorWatcher,唤醒所有等待锁线程观察器Watcher。
我们来看一下LockInternals两个内部成员revocableWatcher,watcher
private final CuratorWatcher revocableWatcher = new CuratorWatcher()
{
@Override
public void process(WatchedEvent event) throws Exception
{
if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
{
checkRevocableWatcher(event.getPath());
}
}
};
private void checkRevocableWatcher(String path) throws Exception
{
//获取线程执行器对
RevocationSpec entry = revocable.get();
if ( entry != null )
{
try
{
//获取所路径数据,并重新注册监听
byte[] bytes = client.getData().usingWatcher(revocableWatcher).forPath(path);
if ( Arrays.equals(bytes, REVOKE_MESSAGE) )
{
//如果数据为取消锁,执行相应的任务线程
entry.getExecutor().execute(entry.getRunnable());
}
}
catch ( KeeperException.NoNodeException ignore )
{
// ignore
}
}
}
//唤醒所有等待获取锁的线程
private final Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
notifyFromWatcher();
}
};
private synchronized void notifyFromWatcher()
{
notifyAll();
}
}
现在回到分布式事务锁的InterProcessMutex获取锁操作
//InterProcessMutex
/**
* Acquire the mutex - blocking until it's available. Note: the same thread
* can call acquire re-entrantly. Each call to acquire must be balanced by a call
* to {@link #release()}
* 尝试获取锁,阻塞到直到锁可用
* @throws Exception ZK errors, connection interruptions
*/
@Override
public void acquire() throws Exception
{
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
//当前线程已经持有锁,则重入计数器自增
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
// 否则尝试获取锁
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
//如果获取锁成功,则将锁数据存放的锁信息映射中
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
我们来看获取锁的关键点
//LockInternals
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
//锁数据
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try
{
//创建路径成功,则成功获取锁
if ( localLockNodeBytes != null )
{
ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);
}
else
{
ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
//创建路径失败,则获取锁失败
// gets thrown by StandardLockInternalsDriver when it can't find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
if ( hasTheLock )
{
return ourPath;
}
return null;
}
//
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
//如果存在尝试获取锁线程,不为空,则监听路径,并注册检查锁可用监听器
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
List<String> children = getSortedChildren();//获取锁路径,子目录
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
//尝试从驱动获取锁
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
//获取锁成功
haveTheLock = true;
}
else
{
//获取锁路径
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
//监控锁释放
Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
if ( stat != null )
{
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
}
// else it may have been deleted (i.e. lock released). Try to acquire again
}
}
}
catch ( Exception e )
{
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
//获取锁异常,则删除锁目录下的临时序列目录
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
//StandardLockInternalsDriver
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = ourIndex < maxLeases;
//检查锁路径下,是否存在给定的序列路径
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
再看看一下删除锁锁目录下的临时序列目录
private void deleteOurPath(String ourPath) throws Exception
{
try
{
client.delete().guaranteed().forPath(ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// ignore - already deleted (possibly expired session, etc.)
}
}
再来看一下//获取锁路径,子目录:
List<String> getSortedChildren() throws Exception
{
return getSortedChildren(client, basePath, lockName, driver);
}
public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception
{
List<String> children = client.getChildren().forPath(basePath);
List<String> sortedList = Lists.newArrayList(children);
Collections.sort
(
sortedList,
new Comparator<String>()
{
@Override
public int compare(String lhs, String rhs)
{
return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
}
}
);
return sortedList;
}
//StandardLockInternalsDriver
@Override
public String fixForSorting(String str, String lockName)
{
return standardFixForSorting(str, lockName);
}
public static String standardFixForSorting(String str, String lockName)
{
int index = str.lastIndexOf(lockName);
if ( index >= 0 )
{
index += lockName.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;
}
从上面可以看出分布式事务锁的InterProcessMutex获取锁操作,首先检查当前线程是否持有锁,如果只有则则重入计数器自增,否则尝试获取锁,如果获取成功,则则将锁数据存放的锁信息映射中threadData(ConcurrentMap<Thread, LockData>),如果获取锁失败, 则等待锁释放,等待锁释放的过程,即注册锁路径是否可用观察器,是否可以获取锁的过程是查看是否可以创建锁目录下的临时序列目录,获取锁异常,则删除锁目录下的临时序列目录。
来看释放锁
/**
* Perform one release of the mutex if the calling thread is the same thread that acquired it. If the
* thread had made multiple calls to acquire, the mutex will still be held when this method returns.
*
* @throws Exception ZK errors, interruptions, current thread does not own the lock
*/
@Override
public void release() throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null )
{
//如果当前线程不持有锁,则抛出异常
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
//锁计数器自减
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 )
{
//锁剩余持有次数大于0,
return;
}
if ( newLockCount < 0 )
{
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try
{
//释放锁
internals.releaseLock(lockData.lockPath);
}
finally
{
//移除当前线程锁数据
threadData.remove(currentThread);
}
}
我们来看释放锁,关键点
//释放锁
internals.releaseLock(lockData.lockPath);
//LockInternals
void releaseLock(String lockPath) throws Exception
{
revocable.set(null);
deleteOurPath(lockPath);
}
private void deleteOurPath(String ourPath) throws Exception
{
try
{
client.delete().guaranteed().forPath(ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// ignore - already deleted (possibly expired session, etc.)
}
}
从上面可以看出,释放锁的关键,即删除锁路径
再来看是否持有锁操作
/**
* Returns true if the mutex is acquired by a thread in this JVM
*
* @return true/false
*/
@Override
public boolean isAcquiredInThisProcess()
{
return (threadData.size() > 0);
}
@Override
public void makeRevocable(RevocationListener<InterProcessMutex> listener)
{
makeRevocable(listener, MoreExecutors.sameThreadExecutor());
}
@Override
public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)
{
internals.makeRevocable
(
new RevocationSpec
(
executor,
new Runnable()
{
@Override
public void run()
{
listener.revocationRequested(InterProcessMutex.this);
}
}
)
);
}
再来看另外一个分布式锁锁DistributedBarrier
分布式屏障锁DistributedBarrier
我们先来看一个示例:
package org.donald.curator.recipes.barrier;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.donald.common.threadpool.TaskExecutors;
import org.donald.constant.ConfigConstant;
import java.util.concurrent.ExecutorService;
/**
* @ClassName: DistributedBarrierSample
* @Description: 使用Curator实现分布式Barrier,实际在分布式环境中使用,待所有应用到达屏障时,移除屏障。
* @Author: Donaldhan
* @Date: 2018-05-16 17:39
*/
@Slf4j
public class DistributedBarrierSample {
/**
* 布式Barrier
*/
private static DistributedBarrier barrier;
private static ExecutorService exec = null;
public static void main(String[] args) throws Exception {
try {
exec = TaskExecutors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
exec.submit(new Runnable() {
@Override
public void run() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(ConfigConstant.IP)
.sessionTimeoutMs(ConfigConstant.SESSION_TIMEOUT)
.connectionTimeoutMs(ConfigConstant.CONNETING_TIMEOUT)
.retryPolicy(retryPolicy)
.build();
client.start();
barrier = new DistributedBarrier(client, ConfigConstant.BARRIER_PATH);
log.info("{} 号barrier设置", Thread.currentThread().getName());
try {
barrier.setBarrier();
barrier.waitOnBarrier();
} catch (Exception e) {
e.printStackTrace();
}
log.info("{} 启动...", Thread.currentThread().getName());
}
});
}
//等待所有分布式屏障节点到达屏障,以免在所有节点到达前,移除屏障
Thread.sleep( 6000 );
barrier.removeBarrier();
log.info("移除分布式屏障...");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (exec != null) {
exec.shutdown();
}
}
}
}
关键点如下:
barrier = new DistributedBarrier(client, ConfigConstant.BARRIER_PATH);
barrier.setBarrier();
barrier.waitOnBarrier();
barrier.removeBarrier();
我们来看DistributedBarrier的定义
public class DistributedBarrier
{
private final CuratorFramework client;
private final String barrierPath;
private final Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
notifyFromWatcher();
}
};
private synchronized void notifyFromWatcher()
{
//唤醒所有等待屏障的线程
notifyAll();
}
/**
* @param client client
* @param barrierPath path to use as the barrier
*/
public DistributedBarrier(CuratorFramework client, String barrierPath)
{
this.client = client;
this.barrierPath = barrierPath;
}
/**
* Utility to set the barrier node
*
* @throws Exception errors
*/
public synchronized void setBarrier() throws Exception
{
try
{
//创建屏障
client.create().creatingParentsIfNeeded().forPath(barrierPath);
}
catch ( KeeperException.NodeExistsException ignore )
{
//忽略创建异常
// ignore
}
}
/**
* Blocks until the barrier node comes into existence
* 阻塞直到屏障节点不存在
* @throws Exception errors
*/
public synchronized void waitOnBarrier() throws Exception
{
waitOnBarrier(-1, null);
}
/**
* Blocks until the barrier no longer exists or the timeout elapses
*
* @param maxWait max time to block
* @param unit time unit
* @return true if the wait was successful, false if the timeout elapsed first
* @throws Exception errors
*/
public synchronized boolean waitOnBarrier(long maxWait, TimeUnit unit) throws Exception
{
long startMs = System.currentTimeMillis();
boolean hasMaxWait = (unit != null);
long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;
boolean result;
for(;;)
{
// 如果节点不存在,则达到屏障
result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null);
if ( result )
{
break;
}
if ( hasMaxWait )
{
long elapsed = System.currentTimeMillis() - startMs;
long thisWaitMs = maxWaitMs - elapsed;
if ( thisWaitMs <= 0 )
{
break;
}
//超时等待
wait(thisWaitMs);
}
else
{
//超时等待
wait();
}
}
return result;
}
/**
* Utility to remove the barrier node
* 移除屏障
* @throws Exception errors
*/
public synchronized void removeBarrier() throws Exception
{
try
{
client.delete().forPath(barrierPath);
}
catch ( KeeperException.NoNodeException ignore )
{
// ignore
}
}
}
从上面可以看出,分布式屏障锁DistributedBarrier,主要根据路径的存在与否来决定屏障是否到达,当路径不存在时,解除屏障。创建路径成功,则到达屏障,等待屏障解除,在屏障路径已经创建,则忽略异常,及到达屏障。
再来看分布式闭锁DistributedDoubleBarrier
分布式闭锁DistributedDoubleBarrier
先来看一个示例:
package org.donald.curator.recipes.barrier;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.donald.constant.ConfigConstant;
/**
* @ClassName: DistributedDoubleBarrierSample
* @Description: 分布式锁, 控制同时进入,同时退出
* @Author: Donaldhan
* @Date: 2018-05-17 8:37
*/
@Slf4j
public class DistributedDoubleBarrierSample {
private static CuratorFramework client = null;
private static DistributedDoubleBarrier barrier;
public static void main(String[] args) {
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(ConfigConstant.BASE_SLEEP_TIMES, ConfigConstant.MAX_RETRIES);
client =
CuratorFrameworkFactory.builder()
.connectString(ConfigConstant.IP)
.sessionTimeoutMs(ConfigConstant.SESSION_TIMEOUT)
.connectionTimeoutMs(ConfigConstant.CONNETING_TIMEOUT)
.retryPolicy(retryPolicy)
.build();
client.start();
barrier = new DistributedDoubleBarrier(client, ConfigConstant.BARRIER_PATH, 3);
log.info("{} 号barrier设置", DistributedDoubleBarrierSample.class.getSimpleName());
//等待所有分布式屏障节点到达屏障
Thread.sleep(6000);
barrier.enter();
log.info("{} 启动...", DistributedDoubleBarrierSample.class.getSimpleName());
//等待所有分布式屏障节点到达屏障
Thread.sleep(6000);
barrier.leave();
log.info( "退出..." );
} catch (Exception e) {
e.printStackTrace();
} finally {
/*if (client != null) {
client.close();
}*/
}
}
}
从上面可以看出,关键点如下:
barrier = new DistributedDoubleBarrier(client, ConfigConstant.BARRIER_PATH, 3);
barrier.enter();
barrier.leave();
我们来看DistributedDoubleBarrier
public class DistributedDoubleBarrier
{
private final CuratorFramework client;
private final String barrierPath;//闭锁路径
private final int memberQty;
private final String ourPath;
private final String readyPath;
private final AtomicBoolean hasBeenNotified = new AtomicBoolean(false);
private final AtomicBoolean connectionLost = new AtomicBoolean(false);
private final Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
connectionLost.set(event.getState() != Event.KeeperState.SyncConnected);
notifyFromWatcher();
}
};
private synchronized void notifyFromWatcher()
{
hasBeenNotified.set(true);
notifyAll();
}
private static final String READY_NODE = "ready";
/**
* Creates the barrier abstraction. <code>memberQty</code> is the number of members in the
* barrier. When {@link #enter()} is called, it blocks until all members have entered. When
* {@link #leave()} is called, it blocks until all members have left.
*
* @param client the client
* @param barrierPath path to use
* @param memberQty the number of members in the barrier. NOTE: more than <code>memberQty</code>
* can enter the barrier. <code>memberQty</code> is a threshold, not a limit
*/
public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)
{
Preconditions.checkState(memberQty > 0, "memberQty cannot be 0");
this.client = client;
this.barrierPath = barrierPath;
this.memberQty = memberQty;
ourPath = ZKPaths.makePath(barrierPath, UUID.randomUUID().toString());
readyPath = ZKPaths.makePath(barrierPath, READY_NODE);
}
/**
* Enter the barrier and block until all members have entered
*
* @throws Exception interruptions, errors, etc.
*/
public void enter() throws Exception
{
enter(-1, null);
}
/**
* Enter the barrier and block until all members have entered or the timeout has
* elapsed
*
* @param maxWait max time to block
* @param unit time unit
* @return true if the entry was successful, false if the timeout elapsed first
* @throws Exception interruptions, errors, etc.
*/
public boolean enter(long maxWait, TimeUnit unit) throws Exception
{
long startMs = System.currentTimeMillis();
boolean hasMaxWait = (unit != null);
long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;
//注册观察器
boolean readyPathExists = (client.checkExists().usingWatcher(watcher).forPath(readyPath) != null);
//创建子序列路径
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(ourPath);
//如果就绪路径已存在,则
boolean result = (readyPathExists || internalEnter(startMs, hasMaxWait, maxWaitMs));
//如果失去连接,则抛出异常
if ( connectionLost.get() )
{
throw new KeeperException.ConnectionLossException();
}
return result;
}
private synchronized boolean internalEnter(long startMs, boolean hasMaxWait, long maxWaitMs) throws Exception
{
boolean result = true;
do
{
//获取当前进入闭锁的子序列节点
List<String> children = getChildrenForEntering();
int count = (children != null) ? children.size() : 0;
if ( count >= memberQty )
{
try
{
//如果到达数,大于等于成员数,则创建路径
client.create().forPath(readyPath);
}
catch ( KeeperException.NodeExistsException ignore )
{
//忽略创建异常
// ignore
}
break;
}
if ( hasMaxWait && !hasBeenNotified.get() )
{
long elapsed = System.currentTimeMillis() - startMs;
long thisWaitMs = maxWaitMs - elapsed;
if ( thisWaitMs <= 0 )
{
result = false;
}
else
{
//超时等待
wait(thisWaitMs);
}
if ( !hasBeenNotified.get() )
{
result = false;
}
}
else
{
//否者等待
wait();
}
} while ( false );
return result;
}
/**
* Leave the barrier and block until all members have left
*
* @throws Exception interruptions, errors, etc.
*/
public synchronized void leave() throws Exception
{
leave(-1, null);
}
/**
* Leave the barrier and block until all members have left or the timeout has
* elapsed
*
* @param maxWait max time to block
* @param unit time unit
* @return true if leaving was successful, false if the timeout elapsed first
* @throws Exception interruptions, errors, etc.
*/
public synchronized boolean leave(long maxWait, TimeUnit unit) throws Exception
{
long startMs = System.currentTimeMillis();
boolean hasMaxWait = (unit != null);
long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;
return internalLeave(startMs, hasMaxWait, maxWaitMs);
}
private boolean internalLeave(long startMs, boolean hasMaxWait, long maxWaitMs) throws Exception
{
//获取路径名
String ourPathName = ZKPaths.getNodeFromPath(ourPath);
boolean ourNodeShouldExist = true;//路径是否存在
boolean result = true;
for(;;)
{
if ( connectionLost.get() )
{
throw new KeeperException.ConnectionLossException();
}
List<String> children;
try
{
//获取闭锁屏障子节点
children = client.getChildren().forPath(barrierPath);
}
catch ( KeeperException.NoNodeException dummy )
{
children = Lists.newArrayList();
}
//排序节点
children = filterAndSortChildren(children);
if ( (children == null) || (children.size() == 0) )
{
break;
}
int ourIndex = children.indexOf(ourPathName);
if ( (ourIndex < 0) && ourNodeShouldExist )
{
//如果屏障锁线程对应的序列目录不存在,则抛出非法状态异常
if ( connectionLost.get() )
{
break; // connection was lost but we've reconnected. However, our ephemeral node is gone
}
else
{
throw new IllegalStateException(String.format("Our path (%s) is missing", ourPathName));
}
}
if ( children.size() == 1 )
{
//如果闭锁屏障为1, 且路径名不同,则抛出非法状态异常
if ( ourNodeShouldExist && !children.get(0).equals(ourPathName) )
{
throw new IllegalStateException(String.format("Last path (%s) is not ours (%s)", children.get(0), ourPathName));
}
//如果闭锁屏障为1, 且路径名相同,则
checkDeleteOurPath(ourNodeShouldExist);
break;
}
Stat stat;
boolean IsLowestNode = (ourIndex == 0);
if ( IsLowestNode )
{
String highestNodePath = ZKPaths.makePath(barrierPath, children.get(children.size() - 1));
//注册观察器
stat = client.checkExists().usingWatcher(watcher).forPath(highestNodePath);
}
else
{
String lowestNodePath = ZKPaths.makePath(barrierPath, children.get(0));
stat = client.checkExists().usingWatcher(watcher).forPath(lowestNodePath);
checkDeleteOurPath(ourNodeShouldExist);
ourNodeShouldExist = false;
}
if ( stat != null )
{
if ( hasMaxWait )
{
long elapsed = System.currentTimeMillis() - startMs;
long thisWaitMs = maxWaitMs - elapsed;
if ( thisWaitMs <= 0 )
{
result = false;
}
else
{
//超时等待
wait(thisWaitMs);
}
}
else
{
//等待
wait();
}
}
}
try
{
//删除锁路径
client.delete().forPath(readyPath);
}
catch ( KeeperException.NoNodeException ignore )
{
//忽略异常,即可重复删除
// ignore
}
return result;
}
private void checkDeleteOurPath(boolean shouldExist) throws Exception
{
if ( shouldExist )
{
client.delete().forPath(ourPath);
}
}
@VisibleForTesting
protected List<String> getChildrenForEntering() throws Exception
{
return client.getChildren().forPath(barrierPath);
}
private List<String> filterAndSortChildren(List<String> children)
{
Iterable<String> filtered = Iterables.filter
(
children,
new Predicate<String>()
{
@Override
public boolean apply(String name)
{
return !name.equals(READY_NODE);
}
}
);
ArrayList<String> filteredList = Lists.newArrayList(filtered);
Collections.sort(filteredList);
return filteredList;
}
}
从上面可以看出,分布式屏障闭锁DistributedDoubleBarrier,实际使用锁目录实现,分布式屏障锁成员进入屏障时,创建对应的临时序列子节点,待所有注册到锁路径的临时序列子节点到达后,清空所目录的所有临时目录, 即分布式屏障闭锁打开。
总结
分布式可重入锁InterProcessMutex主要的成员变量为锁路径basePath,持有分布式映射信息映射threadData(ConcurrentMap<Thread, LockData>),同时一个关键的锁实现LockInternals。
分布式可重入锁,主要提供的获取释放锁,和检查当前线程是否持有锁操作。
Revocable的作用,主要是在锁释放的时候,触发释放锁监听器RevocationListener,同时我们可以使用自己的执行器,触发相关监听器操作。
LockInternals主要的成员变量为客户端框架CuratorFramework,锁内部驱动LockInternalsDriver,检查锁路径是否可用观察器CuratorWatcher,唤醒所有等待锁线程观察器Watcher。
分布式事务锁的InterProcessMutex获取锁操作,首先检查当前线程是否持有锁,如果只有则则重入计数器自增,否则尝试获取锁,如果获取成功,则则将锁数据存放的锁信息映射中threadData(ConcurrentMap<Thread, LockData>),如果获取锁失败, 则等待锁释放,等待锁释放的过程,即注册锁路径是否可用观察器,是否可以获取锁的过程是查看是否可以创建锁目录下的临时序列目录,获取锁异常,则删除锁目录下的临时序列目录。
释放锁的关键,即删除锁路径
分布式屏障锁DistributedBarrier,主要根据路径的存在与否来决定屏障是否到达,当路径不存在时,解除屏障。创建路径成功,则到达屏障,等待屏障解除,在屏障路径已经创建,则忽略异常,及到达屏障。
分布式屏障闭锁DistributedDoubleBarrier,实际使用锁目录实现,分布式屏障锁成员进入屏障时,创建对应的临时序列子节点,待所有注册到锁路径的临时序列子节点到达后,清空所目录的所有临时目录, 即分布式屏障闭锁打开。