0xTTEPX

Just do it, deeply...

Follow me on GitHub

JUC包总结

write by donaldhan, 2018-02-01 16:02
JUC

引言

在 Java 5.0 提供了 java.util.concurrent(简称JUC)包, 主要有原子操作CAS,锁机制AQS(可重入锁,闭锁,屏障锁等), 并发集合(Map,Queue等),线程池(线程池执行器,调度线程池执行器和执行器)。

JUC

目录

CountDownLatch使用场景

CountDownLatch同步化的一个辅助工具,允许一个或多个线程等待,直到所有线程中执行完成 比如主线程计算一个复杂的计算表达式,将表达式分为多个子表达式在线程中去计算, 主线程要计算表达式的最后值,必须等所有的线程计算完子表达式计算,方可计算表达式的值; 再比如一个团队赛跑游戏,最后要计算团队赛跑的成绩,主线程计算最后成绩,要等到所有 团队成员跑完,方可计算总成绩。使用情况两种:第一种,所有线程等待一个开始信息号,当开始信息号启动时,所有线程执行,等待所有线程执行完;第二种,所有线程放在线程池中,执行,等待所有线程执行完,方可执行主线程任务方可执行主线程任务。

Lock和synchronized的性能的比较

在公平的锁上,线程按照他们发出请求的顺序获取锁,但在非公平锁上,则允许‘插队’:当一个线程请求非公平锁时,如果在发出请求的同时该锁变成可用状态,那么这个线程会跳过队列中所有的等待线程而获得锁。 非公平的ReentrantLock 并不提倡 插队行为,但是无法防止某个线程在合适的时候进行插队。在公平的锁中,如果有另一个线程持有锁或者有其他线程在等待队列中等待这个所,那么新发出的请求的线程将被放入到队列中。而非公平锁上,只有当锁被某个线程持有时,新发出请求的线程才会被放入队列中。非公平锁性能高于公平锁性能的原因:在恢复一个被挂起的线程与该线程真正运行之间存在着严重的延迟。假设线程A持有一个锁,并且线程B请求这个锁。由于锁被A持有,因此B将被挂起。当A释放锁时,B将被唤醒,因此B会再次尝试获取这个锁。与此同时,如果线程C也请求这个锁,那么C很可能会在B被完全唤醒之前获得、使用以及释放这个锁。这样就是一种双赢的局面:B获得锁的时刻并没有推迟,C更早的获得了锁,并且吞吐量也提高了。当持有锁的时间相对较长或者请求锁的平均时间间隔较长,应该使用公平锁。在这些情况下,插队带来的吞吐量提升(当锁处于可用状态时,线程却还处于被唤醒的过程中)可能不会出现。

AQS详解-CLH队列,线程等待状态

AQS使用CAS原子操作,修改锁的状态state;待锁的线程被放入到等待队列(CLH队列)中,每个线程等待状态用NODE来描述。 NODE有共享模式和独占模式,独占模式为NULL。NODE有CANCELLED,SIGNAL,SIGNAL,PROPAGATE4中状态值。

  1. SIGNAL:节点的后继由于park等原因被阻塞,当节点释放锁或取消时,要unpark后继节点。为了避免竞争,acquire方法必须,首先检查他们是否需要唤醒后继节点,再原子获取锁,获成功,失败,阻塞。简单说,节点释放锁,是否需要唤醒后继节点
  2. CANCELLED:节点由于等待锁超时或者中断等原因,被取消,节点不会停留在这个状态。简单说,单节点处于这个状态,将被移除到等待队列

  3. CONDITION: 处于这个状态的节点线程,放在条件队列中。它永远不会被用作一个同步队列节点,直到等待的条件发生,节点将被转移到同步队列中。(这个状态与其他状态,没有关联,只是一种简化的机制)。

  4. PROPAGATE: 处于此模式下,释放共享锁具有传递性。头节点调用doReleaseShared方法,保证传递释放共享锁,即使有其他的操作干涉。这个时共享模式下的状态。

CLH队列由于虚头节点,队列中线程等待节点有一个前驱和一个后继节点,NODE有一个状态waitStatus,描述线程的当前状态,有一个线程field用于表示当前等待线程,同时还有nextWaiter节点,用于描述,节点时候有等待条件,或共享模式,获取锁时,需要通知其他线程。

Node nextWaiter:节点下一个等待条件或共享锁的节点。当线程持有独占锁时,只需要访问条件队列,所以我们只需要一个简单的连接队列,存储等待条件的线程。当他们转移到主队列时,可以重新获取锁。由于条件可以是互斥的,所以我们用,特殊的值,去表示共享模式。AQS有一个状态state表示锁的状态,一个CLH队列存放等待锁的线程节点。NODE还可以用于描述节点的等待条件节点线程,用nextWaiter去关联,组成的队列是条件队列。条件队列和等待队列并不冲突,当等待条件的线程被唤醒时,可以尝试获取锁,加入到等待对列。当一个等待队列节点线程获取独占锁时,可以访问条件队列,唤醒等待条件的线程。

AQS-Condition详解

ConditionObject是AQS的一个内部类,其实现是基于AQS;ConditionObject中的条件等待队列中的节点与同步队列中的节点基本相同,最大的不同时,同步等待队列中的节点有先驱pre和后继next,而条件等待队列中的节点只有后继nextWaiter。条件等待有两种方法,一种为非中断等待awaitUninterruptibly,在线程等待条件时不可中断线程;另外一种,可中断等待await,等待后,确定中断当前线程,还是抛出异常。非中断模式等待,首先添加新的等待条件线程节点,到等待条件线程队列;释放节点状态,释放节点锁状态过程,待子类扩展,在过程中,同时唤醒等待队列头结点;再判断节点是在同步等待队列上,如果不在,则park当前线程;如果线程已经中断,则取消线程中断状态,即线程非中断等待条件。唤醒有两种,一种是唤醒等待条件队列中的头结点,另一种,唤醒条件等待队列中,所有等待此条件的节点线程。唤醒节点时,首先设置线程等待的节点状态的初始状态0,然后添加到同步等待队列,如果线程取消等待,则移除线程,否则通知节点前驱,当前驱节点线程释放锁时,unpark线程。在唤醒等待条件的线程时,为什么要将,节点移动同步等待线程节点能,这是因为条件时锁的一部分,先创建锁,再创建条件;当线程被唤醒时,只能说明,线程条件发生,能处于park状态,要获取锁,才能unpark线程。

可重入锁ReentrantLock详解

可重入自旋锁,当线程持有锁,可以多次获取锁,但最多只有2^31-1次;获取失败时,添加到同步等待队列自旋,直到获取锁成功;ReentrantLock关联一个同步锁SYNC,内部的SYNC是基于AQS实现的。同步锁SYNC有两种实现,公平锁与非公平锁;ReentrantLock默认创建的是非公平锁。比较非公平锁的尝试获取锁nonfairTryAcquire与公平锁TryAcquire的区别在于,非公平尝试获取锁时,如果锁为打开状态,则锁住锁;而公平锁,则先看有没有前驱节点,有前驱,则不能锁住锁,没有则可锁住。公平锁与公平锁lock,最大的不同是非公平锁,先以CAS的方式锁住锁,在进行acquire操作,而公平锁,直接acquire操作。acquire操作主要过程为,自旋,检查节点的前驱节点是否为头节点,如果是,当前节点为同步队列的第一个节点,则尝试获取锁,如果成功,设置头结点为当前节点,否则判断尝试获取锁失败,是否应该park,如果需要park,则park当前线程,park后,检查是否可中断当前线程,如果可,则中断当前线程。

CountDownLatch详解

CountDownLatch本质上是一个共享锁,是一个多功能的同步工具,可用被用于很多场景。当count初始化为1时,CountDownLatch可以作为一个简单的on/off闭锁,或者可以理解为一扇门:所有调用await的线程,等待这扇门被线程调用countDown打开。CountDownLatch初始化为N时,这种情况,可以用作一下场景:1.一个线程等待,直到N个线程完成工作或任务;2.一个任务被完成N次。CountDownLatch内部有一个基于AQS实现的共享锁,用SYNC的状态status,来表示,锁可以被多少个线程所共享,当锁被所有的线程打开countDown,则其他线程可以获取锁。在锁没有被完全打开之前,其他线程,自旋,尝试获取共享锁,在这个过程中,线程可能被park,或者中断。

CyclicBarrier详解

屏障点思想,当每个线程完成任务时,自旋等待条件Condition trip,释放共享锁,count减1;当线程代的最后一个线程到达屏障点时,唤醒线程代中所有等待的线程, 如果有action,执行action,然后创建下一代线程。如果在线程代未结束之前,有等待线程中断或超时,则结束当前代,唤醒所有等待线程,重置count为parties。

Semaphore详解

信号量,维持着一个许可集。如果许可集中,无许可,线程acquire,将会阻塞,直到其他线程释放许可。线程每一次释放#release,则添加一个许可,潜在地 释放一个阻塞信号获取者。信号量的许可,实际上并不是一对象,仅仅保证一定数量的虚拟许可证。信号量经常被用于,只有一定数量的线程访问一些物理或逻辑资源。比如用信号量控制池对象的获取。当信号量被初始化为1时,作为互斥锁,可以用于最多只有一个 permit可以用的场景。这种方式比较有名的一种是二进制信号量,因为它只有两种状态,1表示可利用,0表示无permits可利用。二进制信号量,有一个属性,锁可以被其他非持有锁的线程释放。这种特性在一些特殊的上下文场景中,比较拥有,比如恢复死锁。信号量中的锁有公平和非公平方式,这个和我们前面讲的可重入锁,有点相似。非公平方式,获取锁,首先检查锁是否可用,可用则获取,公平锁获取锁,先检查有没有前驱节点,有则等待,没有则获取。获取锁的方法,有多种,有公平的和非公平,则个要看需求。

ReadWriteLock实现ConcurrentMap

ReadWriteLock需要严格区分读写操作,如果读操作使用了写入锁,那么降低读操作的吞吐量,如果写操作使用了读取锁,那么就可能发生数据错误。另外ReentrantReadWriteLock还有以下几个特性:

  • 公平性 非公平锁(默认) 这个和独占锁的非公平性一样,由于读线程之间没有锁竞争,所以读操作没有公平性和非公平性,写操作时,由于写操作可能立即获取到锁,所以会推迟一个或多个读操作或者写操作。因此非公平锁的吞吐量要高于公平锁。公平锁 利用AQS的CLH队列,释放当前保持的锁(读锁或者写锁)时,优先为等待时间最长的那个写线程分配写入锁,当前前提是写线程的等待时间要比所有读线程的等待时间要长。同样一个线程持有写入锁或者有一个写线程已经在等待了,那么试图获取公平锁的(非重入)所有线程(包括读写线程)都将被阻塞, 直到最先的写线程释放锁。如果读线程的等待时间比写线程的等待时间还有长,那么一旦上一个写线程释放锁,这一组读线程将获取锁。
  • 重入性 读写锁允许读线程和写线程按照请求锁的顺序重新获取读取锁或者写入锁。当然了只有写线程释放了锁,读线程才能获取重入锁。写线程获取写入锁后可以再次获取读取锁,但是读线程获取读取锁后却不能获取写入锁。另外读写锁最多支持65535个递归写入锁和65535个递归读取锁。
  • 锁降级 写线程获取写入锁后可以获取读取锁,然后释放写入锁,这样就从写入锁变成了读取锁,从而实现锁降级的特性。
  • 锁升级 读取锁是不能直接升级为写入锁的。因为获取一个写入锁需要释放所有读取锁,所以如果有两个读取锁尝试获取写入锁而都不释放读取锁时就会发生死锁。
  • 锁获取中断 读取锁和写入锁都支持获取锁期间被中断。这个和独占锁一致。
  • 条件变量 写入锁提供了条件变量(Condition)的支持,这个和独占锁一致,但是读取锁却不允许获取条件变量,将得到一个UnsupportedOperationException异常。
  • 重入数 读取锁和写入锁的数量最大分别只能是65535(包括重入数)。

ReentrantReadWriteLock详解一

Sync是ReentrantReadWriteLock实现读写锁的基础,Sync是基于AQS的实现,内部有一个各读锁计数器(ThreadLocal),每个线程拥有自己的读写计数器,存储着线程持有读锁的数量。同时有一个缓存计数器,用于记录当前线程拥有的读锁数量。有一个firstReader用于记录第一个获取读锁的线程,firstReaderHoldCount记录第一个获取读锁线程的持有锁数量。SYNC将锁的状态int state分成两部分,分为高16位和低16位,高位表示共享读锁,低位是独占写锁;所以读锁和写锁的最大持有量为65535。线程只有在没有线程持有读锁且写锁状态为打开,即state为打开状态,或当前线程持有写锁的数量小于65535的情况下,获取写锁成功,否则失败。线程只有在其他线程没有持有写锁,且读锁的持有数量未达到65535,或当前线程持有写锁且没有读锁的持有数量未达到65535(即锁降级),则当前线程获取读锁成功,否则自旋等待。

ReentrantReadWriteLock详解后续

ReentrantReadWriteLock的锁机制的实现通过内部SYNC,而SYNC是基于AQS的实现。SYNC有两种实现公平和非公平两个版本。公平锁时先判断队列中是否有线程在等待,有则阻塞获取读锁和写锁。而非公平锁,则直接获取写锁;获取读锁时,判断等待队列第一个节点线程是不是在等待独占模式线程,即如果等待队列的第一个节点线程,在等待写锁,则阻塞锁的获取,否则不阻塞。ReentrantReadWriteLock默认为非公平锁可以提高吞吐量,ReentrantReadWriteLock的构造带有一个公平性参数,用于控制内部锁机制是公平锁还是非公平锁。ReadLock和WriteLock都是通过SYNC来实现,在构造函数中通过ReentrantReadWriteLock,将锁sync交给ReadLock和WriteLock。ReadLock是共享模式锁和我们前面讲的CountDownLatch原理较像,WriteLock是独占锁与ReentrantLock的原理较像。 ReadLock和WriteLock还可以创建Condition,用于控制共享锁和独占锁获取时的条件等待。

Map的简单实现AbstractMap

AbstractMap提供了一个Map的简单实现,不过没有提供put的entrySet方法,这个待子类扩展。Key视图set的所有操作都是与AbstractMap相关联,可以理解,直接是AbstractMap的操作;Value的视图Collection,同样。不可变SimpleImmutableEntry,与一般的SimpleEntry的区别是Value为Final修饰,同时SetValue方法不可用。

HashMap详解

HashMap中有一个Entry数组table用于存储Entry,size描述的是Map中元素的大小,threshold描述的是扩容的临界条件,loadFactor是扩容因子(loadFactor>0),计算临界条件threshold(capacityloadFactor)的。那么容量就是table.length(capacity),也就是数组的大小。换句话说,如果存取的Entry的达到了整个容量threshold,那么就需要扩充容量了。在HashMap中每次扩容就是将扩大数组的一倍,使数组大小为原来的两倍。在初始化Map时,给的容量最好是2的N次方,在构造中我们,看到了怎么回事。扩充过程会导致元素数据的所有元素进行重新hash计算,这个过程也叫rehash。显然这是一个非常耗时的过程,否则扩容都会导致所有元素重新计算hash。因此尽可能的选择合适的初始化大小是有效提高HashMap效率的关键。太大了会导致过多的浪费空间,太小了就可能会导致繁重的rehash过程。在这个过程中loadFactor也可以考虑。举个例子来说,如果要存储1000个元素,采用默认扩容因子0.75,那么1024显然是不够的,因为1000>0.751024了,所以选择2048是必须的,显然浪费了1048个空间。如果确定最多只有1000个元素,那么扩容因子为1,那么1024是不错的选择。另外需要强调的一点是扩容因此越大,从统计学角度讲意味着链表的长度就也大,也就是在查找元素的时候就需要更多次的循环。所以凡事必然是一个平衡的过程。这里可能有人要问题,一旦我将Map的容量扩大后(也就是数组的大小),这个容量还能减小么?比如说刚开始Map中可能有10000个元素,运行一旦时间以后Map的大小永远不会超过10个,那么Map的容量能减小到10个或者16个么?答案就是不能,这个capacity一旦扩大后就不能减小了,只能通过构造一个新的Map来控制capacity了。

put操作:如果key为null则放在Entry table的index为0的上面,如果存在null key则替换旧值;如果key不为null,则获取key的hash值,然后再获取Entry在table中的index,如果对应 的index上有Key对应的Entry,则替换旧值返回,否则添加新的Entry到table的index对应的链表中。当添加Entry时的size达到临界条件,则扩容table为原来的两倍,重新hash旧table中Entry对应的key,并定位Entry应该存放的桶索引index,放到新table中。如果在扩容是旧容量已经达到最大容量,则扩容失败;添加新的Entry到table的index对应的桶链表时,将Entry放在链表头,next指向原始的链表头。 get操作:get操作首先判断key是否为null,如果为null,则获取table索引为0的链表中key为null的值 否则先获取key的hash值,在获取key对应的table index,遍历table的index对应的Entry链表,找出与key相同的Entry,返回Entry对应的值。

遍历:Key视图和Value视图的实现是在Entry的基础上,所以我们遍历HashMap的时候最好用EntrySet。

ConcurrentHashMap解析-Segment

ConcurrentHashMap是线程安全的,可并发访问,不允许key或value的值为null,默认的容量为16,负载因子为0.75,并发访问量为16。ConcurrentHashMap中有一个Segment数组,默认数组大小为16,Segment中有一个HashEntry数组类似于HashMap中的table,Segment继承了可重入锁ReentrantLock,Segment的修改其hash table的操作都要使用lock。Segment的put操作,首先尝试获取锁,如果获取锁失败,则Key在片段hash table中的索引,遍历索引对应的Hash Entry链,如找不到key对应HashEntry,则创建一个HashEntry,这都是在尝试次数小于最大尝试次数MAX_SCAN_RETRIES情况下,MAX_SCAN_RETRIES默认为2。这样做的目的是为确保,进行put操作时,仍持有锁。然后定位key在片段table中的索引,并放在链头,如果实际size达到临界条件,则重新hash,创建2倍原始容量的hash table,重新建立hash table。Segment的remove操作,首先尝试获取锁失败,则继续尝试获取锁,在获取锁的过程中,定位key在片段table的HashEntry链表索引,遍历链表,如果找到对应HashEntry,则移除,如果尝试次数超过最大尝试次数,则lock,则遍历链表,找到对应的HashEntry,则移除。replace与remove的基本思路相同,唯一的区别是,当替换值时,修改计数要自增1。put,remove和replace操作是锁住片段table中,key对应的索引HashEntry链表,而Clear为锁住整个table。ConcurrentHashMap通过将所有HashEntry分散在不同的Segment,及锁机制实现了并发访问。

ConcurrentHashMap解析后续

put,remove,replace操作来看,基本思路相同,先定位定位Segment,再将操作委托给Segment相应的操作。get操作是先定位定位Segment,再遍历key对应的HashEntry链表,找到,则返回值。视图基本思路为定位片段,在定位片段的hash table的Hash Entry链,遍历完当前片段,再遍历下一个片段的Hash table。

AbstractQueue简介

AbstractQueue的add,remove,element方法分别基于offer,poll,peek的实现,但是当队列为null时,抛出异常,而不是返回false或null。offer,poll,peek,并没有实现待子类扩展。清空,循环poll,直到为空。addAll为循环遍历集合元素,add到队列。

ConcurrentLinkedQueue解析

ConcurrentLinkedQueue一个基于链接节点线程安全的单向无界队列。队列的元素顺序为FIFO。队列的头部,是在队列上时间最久的元素。队列的尾元素是在队列中时间最短的元素。新元素添加到队列的尾部,队列获取元素,从队列头部获取ConcurrentLinkedQueue适用于多个线程需要同时访问一个相同集合的场景。与大多数并发集合一样,不允许插入null元素。Iterators是弱一致性,只反映了队列在某一点的状态,比如创建iterator的时间点。Iterators不会抛出异常,可以处理其他的并发操作。在队列创建Iterators时,队列中的元素,全都在Iterators中。不像大多数的集合,size操作时间复杂度不是一个常量,因为队列异步操作的天性,决定了遍历队列元素时,有可能其他线程修改队列,导致最终size的数量可能不准确。另外addAll,removeAll,retainAll,containsAll,equals,toArray都不能保证原子性。比如遍历操作和addAll操作同时发生,可能会看到一些相同的新增元素。

LinkedBlockingQueue解析

LinkedBlockingQueue是一个线程安全的阻塞并发队列,队列的顺序为FIFO,队列的中节点包装者原始元素E,有一个后继链接,所以队列是单向的,队列的队头head和队尾节点last是傀儡节点,元素为null。队列有两把锁一个是takeLock,一个为putLock;消费者锁takeLock可以被take,poll等操作持有;生产者锁putLock,可以被put,offer等操作持有;同时有两个条件notEmpty和notFull,notEmpty是takeLock锁的条件,当队列为null,消费者等待的队列非空条件notEmpty;notFull为putLock的条件,为当队列满时,生产者等待队列条件,即队列非满条件notFull。队列中有一个AtomicInteger类型count,用于记录当前队列中元素的个数。

put操作首先以可中断方式获取锁,如果成功,则判断队列是否已满,如果队列已满,则等待队列非满条件notFull,否则,添加元素节点到队列,再次判断判断队列是否已满,如果没满,则唤醒一个等待notFull条件的put线程;释放putLock。如果添加元素成功,获取takeLock锁,成功,则唤醒一个等待队列为notEmpty的take线程。put操作和offer操作的区别时,put是先获取putLock锁,再判断队列是否已满,已满则等待notFull条件;而offer是先判断队列是否已满,如果已满,则返回false,未满则获取putLock,后续操作相同。从分析来看,在我们向队列中添加元素时,如果使用offer,当队列已满的情况下,我们需要重新将元素放入队列,而put不需要我们再次这样操作,当队列满时,等待队列nullFull条件。具体选哪一种,根据具体的场景去选择。offer(E e, long timeout, TimeUnit unit)与put(E e,)更像,区别在于当超时offer获取putLock锁成功后,如果队列已满,则超时等待notFull条件。

take操作,首先获取当前队列容量计数器,并以可中断方式获取takeLock,获取锁成功,则判断队列是否为空,如果为空,则等待notEmpty条件,否则从队头取出元素,容量计数器减1,如果队列中还有元素,则唤醒一个等待非空条件的take线程;如果在take前,队列容量已满,则成功take后,唤醒等待notFull条件的put线程。超时poll方法,与take方法的区别在于,当队列为null,进行超时等待。poll方法,与taket方法的最大区别为先检查先检查队列是否为空,为空,则返回null;不为空,剩下的操作与take相同。具体选哪一种,根据具体的场景去选择。
peek操作首先检查队列是否为空,为空,则返回null,否则获取takeLock锁,取出队头元素,并返回。

remove方法需要获取takeLock和putLock锁,遍历队列,比较元素是否相等,相等则移除,则唤醒一个等待put的线程,最后释放takeLock和putLock锁。为什么要获取两把锁呢,主要防止在移除的过程中,有线程消费元素,或生产元素,带来的不缺定性结果。contain操作与remove思路一样。 drainTo操作首先获取takeLock锁,从队头take,n个元素,并添加集合中,如果成功移除元素,则唤醒等待put的线程。

在上面所有的操作中,我们看所有的唤醒都是signal而不是signalAll,那么为什么不总是使用signalAll替换signal呢? 假设有N个线程在条件队列中等待,调用signalAll会唤醒所有线程,然后这N个线程竞争同一个锁,最多只有一个线程能够得到锁,于是其它线程又回到挂起状态。这意味每一次唤醒操作可能带来大量的上下文切换(如果N比较大的话),同时有大量的竞争锁的请求。这对于频繁的唤醒操作而言性能上可能是一种灾难。如果说总是只有一个线程被唤醒后能够拿到锁,那么为什么不使用signal呢?所以某些情况下使用signal的性能是要高于signalAll的。如果满足下面的条件,可以使用单一的signal取代signalAll操作: 相同的等待者,也就是说等待条件变量的线程操作相同,每一个从wait条件发生时,执行相同的逻辑,同时一个条件变量的通知至多只能唤醒一个线程。

ArrayBlockingQueue解析

ArrayBlockingQueue是一个有界的线程安全FIFO队列,队列元素放在一个元素数组中,一个takeIndex,表示下一个take,poll,peek,remove元素的数组index。一个putIndex,表示下一个put, offer, or add元素的数组index,一个int Count,表示当前队列元素数量,一把锁ReentrantLock用于访问控制,一个队列非空条件notEmpty,一个队列非满条件notFull,这两个条件都是由ReentrantLock创建。

put操作首先检查元素是否为null,然后以可中断方式,获取锁,如果队列已满,等待notFull条件,否则插入元素;putIndex索引自增,容量count自增,唤醒等待消费的线程。offer操作与put操作的区别,当队列满时,返回false;超时offer与put操作没有太大的区别,区别是当队列满时超时等待notFull条件,插入成功,则返回true。具体选择何中操作,视具体的场景需求。

take操作首先以可中断方式获取锁,当队列为空,等待非空条件,否则返回队列中takeIndex,索引对应的元素,akeIndex索引自增,容量count自减,唤醒等待notFull条件的线程。超时poll的唯一区别是当当队列为空,超时等待非空条件。poll操作与take操作的区别为当队列为空,返回null。具体选择何中操作,视具体的场景需求。peek操作首先获取锁,如果队列为空,则返回null,否则,返回takeIndex所对应的元素。

remove操作,首先获取锁,遍历队列,找到元素相等,移除。

ArrayBlockingQueue与LinkedBlockingQueue区别是LinkedBlockingQueue中元素以节点链表来存储,而ArrayBlockingQueue是放在数组中;LinkedBlockingQueue中有两把锁分别为put和take锁,读写分离,而ArrayBlockingQueue只有一把锁控制take和put。

PriorityBlockingQueue解析

offer操作,获取锁,如果队列已满,释放锁,获取扩容锁,成功则扩容,扩容后则重新获取锁,将原始队列拷贝的新的队列中。获取当前队列中的尾元素的父节点,将当前要添加的元素与父节点比较,确定存储的位置。比较,确定元素存储的位置。最后容量自增,唤醒等待take的线程。从offer操作来看,数组队列存放的逻辑结构实际上是一个平衡二叉树(堆排序)。无论是add,put,还是超时offer操作,都是委托给offer操作。

take操首先以可中断方式获取锁,如果获取成功,则从队列头部获取元素,并重新调整平衡二叉树,如果从队列头取的元素为null,则等待非空条件notEmpty。 poll操作与take的区别时,直接从队列头部获取元素为null,直接返回,而不是等待非空条件notEmpty。超时poll与take的区别为当队列为空时,超时等待非空条件notEmpty。 peek操作获取锁,返回队头元素。 remove操作,首先获取锁,再次定位元素的位置,移除元素,调整平衡二叉树。 序列化与反序列主要是将元素放在一个PriorityQueue中,进行序列化与反序列操作。

SynchronousQueue解析上-TransferStack

SynchronousQueue阻塞队列,每次插入操作必须等待一个协同的移除线程,反之亦然。SynchronousQueue同步队列没有容量,可以说,没有一个容量。由于队列中只有在消费线程,尝试消费元素的时候,才会出现元素,所以不能进行peek操作;不能用任何方法,生产元素,除非有消费者在尝试消费元素,同时由于队列中没有元素,所以不能迭代。head是第一个生产线程尝试生产的元素;如果没有这样的生产线程,那么没有元素可利用,remove和poll操作将会返回null。SynchronousQueue实际一个空集合类。同时同步队列不允许为null。同步队列支持生产者和消费者等待的公平性策略。默认情况下,不能保证生产消费的顺序。如果一个同步队列构造为公平性,则可以线程以FIFO访问队列元素。当时非公平策略用的是TransferStack,公平策略用的是TransferQueue;TransferStack和TransferQueue是存放等待操作线程的描述,从TransferStack中Snode节点可以看出:节点关联一个等待线程waiter,后继next,匹配节点match,节点元素item和模式mode;模式由三种,REQUEST节点表示消费者等待消费资源,DATA表示生产者等待生产资源。FULFILLING节点表示生产者正在给等待资源的消费者补给资源,或生产者在等待消费者消费资源。当有线程take/put操作时,查看栈头,如果是空队列,或栈头节点的模式与要放入的节点模式相同;如果是超时等待,判断时间是否小于0,小于0则取消节点等待;如果非超时,则将创建的新节点入栈成功,即放在栈头,自旋等待匹配节点(timed决定超时,不超时);如果匹配返回的是自己,节点取消等待,从栈中移除,并遍历栈移除取消等待的节点;匹配成功,两个节点同时出栈,REQUEST模式返回,匹配到的节点元素(DATA),DATA模式返回返回当前节点元素)。如果与栈头节点的模式不同且不为FULFILLING,匹配节点,成功者,两个节点同时出栈,REQUEST模式返回,匹配到的节点元素(DATA),DATA(put)模式返回返回当前节点元素。如果栈头为FULFILLING,找出栈头的匹配节点,栈头与匹配到的节点同时出栈。从分析非公平模式下的TransferStack,可以看出一个REQUEST操作必须同时伴随着一个DATA操作,一个DATA操作必须同时伴随着一个REQUEST操作,这也是同步队列的命名中含Synchronous原因。这也应了这句话,SynchronousQueue像一个管道,一个操作必须等待另一个操作的发生。

SynchronousQueue解析下-TransferQueue

TransferQueue在执行take/put操作时,首先根据元素是否判断当前节点的模式,如果元素为null则为REQUEST(take)模式,否则为DATA模式(put)。然后自旋匹配节点,如果队列头或尾节点没有初始化,则跳出本次自旋,如果队列为空,或当前节点与队尾模式相同,自旋或阻塞直到节点被fulfilled;如果队列不为空,且与队头的模式不同,及匹配成功,出队列,如果是REQUEST操作,返回匹配到节点的元素,如果为DATA操作,返回当前节点元素。TransferQueue相对于TransferStack来说,操作匹配过程更简单,TransferStack为非公平策略下的实现LIFO,TransferQueue是公平策略下的实现FIFO。TransferQueue中的QNODE与TransferStack的SNODE节点有所不同处理后继next,等待线程,节点元素外,SNODE还有一个对应的模式REQUEST,DATA或FULFILLING,而QNODE中用一个布尔值isData来表示模式,这个模式的判断主要根据是元素是否为null,如果为null,则为REQUEST(take)模式,否则为DATA模式(put)。SynchronousQueue根据构造公平参数,确定transferer为TransferStack还是TransferQueue,默认为TransferStack,SynchronousQueue的put/offer和take/poll统一委托给transferer,即通过TransferStack和TransferQueue的transfer(Object e, boolean timed, long nanos) 方法。由于同步队列一个take伴随着一个put,反之亦然,所有队列总是为空,所以size为0.剩余容量为0,peek返回false,contains返回false,remove返回false。

DelayQueue解析

DelayQueue内在一个优先级队列PriorityQueue,用于存放元素,一个available条件(ReentrantLock)用于通知一个队列头的元素可以用,或一个新线程成为leader事件。
offer操作首先加锁,入队列操作委托给内部优先级队列的offer操作;入队列后,检查队头元素,如果队头元素为当前先添加的元素,则设置leader为null,唤醒等待available条件的线程。add,put,超时offer操作都是委托给offer操作。
take操作首先以可中断方式获取锁,自旋等待,直到有过期元素可以用;自旋的过程为获取队头元素,如果队头为null,则等待available条件,如果队头元素不为空,则获取元素延时时间,如果过期,则出队列,如果未过期,且leader不为null,则等待available条件,如果未过期,且leader为null,则选举当前线程为leader,以元素的delay为超时时间,超时等待触发available条件,超时时间过后触发available条件,最后判断当前线程是否为leader,如果当前线程成为leader,超时等待后,take成功,则重置leader为null;在自旋结束后,如果leader为null,则队头元素不为null,触发available,唤醒等待available条件的线程。超时poll与take逻辑上基本一致,不同的是在等待available条件上,take为等待,而超时poll为超时等待。poll操作为如果队列为空,或队列头元素为过期,则返回null,否则返回队头的过期元素。peek和remove,clear,size操作直接委托给内部优先级队列。drainTo操作是有peek和poll操作协作完成。

JAVA集合类简单综述

CopyOnWriteArrayList主要使用可重入锁ReentrantLock保证线程安全,在add和remove操作中有加锁,而get没有,即读写分离;用volatile保证数组元素的可见性。 CopyOnWriteArraySet是基于CopyOnWriteArrayList实现的。 CopyOnWriteArraySet/List是jdk1.5以后的线程安全的List与Set的实现。UnmodifiableSet不可变集合的实现是通过委托UnmodifiableCollectionfinal修饰集合类保证不可变性。SynchronizedSet等线程安全的集合类是SynchronizedCollection的内置同步对象(final Object mutex)实现线程安全。SynchronizedSet/List/Map是jdk1.2之后,jdk1.5之前的线程安全集合的实现Vector是jdk1.0的list的线程安全版本,主要通synchronized修饰方法保证线程安全。

FutureTask解析

FutureTask内部关联一个同步器Sync,Sync主要用于控制FutureTask的运行状态,状态一共有4中,准备就绪READY,正在运行RUNNING,运行完RAN,取消CANCELLED;任务线程结束可能有两个原因,运行完RAN或取消CANCELLED。Sync内部有一个线程runner用于执行任务,当任务线程执行结束时,runner为null。取消操作,首先自旋设置任务线程运行状态为CANCELLED,如果任务处于运行状态可以中断,任务运行线程不为null,则中断任务运行线程,释放锁,做一些任务执行结束工作(默认为空)。FutureTask的相关操作主要通过Sync来完成。

ExecutorCompletionService解析

ExecutorCompletionService内部关联一个执行器AbstractExecutorService和 一个阻塞的任务完成队列,默认为LinkedBlockingQueue。当提交任务,则包装成QueueingFuture,QueueingFuture 扩展了FutureTask,重写done方法,即在任务执行结束时,添加任务执行结果到完成队列。 而take,poll,超时poll直接委托给完成队列。

AbstractExecutorService解析

无论是提交Runnable任务,还是Callable都是创建FutureTask执行任务,然后执行,返回结果。执行Callable任务集,遍历任务集合,创建相应的RunnableFuture任务,并添加到结果集;遍历结果集,等待所有任务执行完。超时执行Callable任务集,与非超时执行任务集不同的点是,第一点:在每次执行任务,判断是否超时,超时则返回结果集;第二点:在等待线程任务结束时,为超时等待。invokeAny的任务集,主要通ExecutorCompletionService去执行,当有任务执行结束时,获取执行结果,并取消其他任务。

ThreadPoolExecutor解析一(核心线程池数量、线程池状态等)

ThreadPoolExecutor的变量主要有核心线程池数量corePoolSize和最大线程池数量maximumPoolSize,即在当前任务线程数大于核心线程数量时,是否(allowCoreThreadTimeOut)允许空闲任务线程等,保活keepAliveTime时间,等待新任务的到来。一个线程工厂ThreadFactory用于创建任务线程,一个拒绝任务处理器RejectedExecutionHandler,默认的拒绝任务策略为AbortPolicy,抛出运行时异常,当然还有直接丢弃策略DiscardPolicy,丢弃旧的任务DiscardOldestPolicy,还有调用者执行任务策略CallerRunsPolicy。上面的变量为volatile,以便线程池执行操作时,可以使用最新的变量。 一个阻塞的任务队列final BlockingQueue workQueue,阻塞队列可以为Linked,Array,Delay,SynchronousQueue等阻塞类型,具体可以根据场景选择。默认为LinkedBlockingQueue队列,一般判断队列是否为空,用isEmpty方法,LinkedBlockingQueue一般用于任务相互之间独立,没有交叉,可独立执行。如果用SynchronousQueue,则可用poll方法判断,同步队列一般用于任务之间有依赖的关系的场景,一个任务执行依赖于另一个任务的结果。DelayQueue队列用于定时任务。ArrayBlockingQueue队列一般可以用于 资源有限情况,可以避免资源被耗尽。一个AtomicInteger的ctl用于包装线程状态runState和任务线程数workerCount;低29位保存任务线程数,高两位用于存储线程池状态,线程池状态已用有四种RUNNING,SHUTDOWN ,STOP,TIDYING ,TERMINATED。

  • RUNNING:接受新的任务,处理队列任务;
  • SHUTDOWN:不在接受新的任务,处理队列任务;
  • STOP:不在接受新任务,不处理队列任务,中断正在执行的任务线程;
  • TIDYING:所有的任务已经结束,任务线程为0,线程转换到TIDYING;
  • TERMINATED:线程池已结束,即terminated()方法执行完。
  • 线程的状态是可以数字化比较的。

一个任务线程集final HashSet workers,largestPoolSize记录线程池的最大任务线程数,completedTaskCount为完成任务计数器,在任务线程结束时,更新。一个可重入锁mainLock,用于保护非线程安全的变量如workers,largestPoolSize,completedTaskCount。 一个等待线程池结束条件termination,用于控制超时等待线程池关闭。

ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等)

默认的线程池拒绝任务处理策略AbortPolicy,直接抛出RejectedExecutionException;直接丢弃策略DiscardPolicy,丢弃旧的任务DiscardOldestPolicy,调用者执行任务CallerRunsPolicy。DiscardOldestPolicy和CallerRunsPolicy都是在线程池没关闭时,策略才生效,否则关闭直接丢弃任务。拒绝策略都为ThreadPoolExecutor的内部类。

默认的线程工厂DefaultThreadFactory为Executors的内部类, 用于创建线程,工厂创建分组相同的线程,交由执行器执行。如果有java.lang.SecurityManager,则用System#getSecurityManager线程组,否则用调用者的线程组。创建的新线程为非守护模式,优先级在 MIN_PRIORITY和MAX_PRIORITY之间,默认为NORM_PRIORITY。可以通过Thread#getName获取线程name,默认为pool-N-thread-M,N为线程池编号,M为线程编号。 Worker包装了任务线程,主要是为了维护中断控制状态和其他次要状态记录,及任务的执行。Worker同时继承了AQS,在任务线程执行前lock,任务执行完unlock。加锁的目的主要是保护任务线程的执行,线程池唤醒一个任务线程等待任务,而不是中断当前正在执行任务的线程去执行任务。Worker使用了一个 非重入互质锁,而不是ReentrantLock,这样做的目的是以防在任务执行的过程,线程池控制方法的改变,对任务线程执行的影响,比如setCorePoolSize方法。另外为了防止任务线程在实际执行前被中断,我们初始化锁状态为-1,在runWorker方法中,我们会清除它。runWorker执行任务时,首先释放锁,此时锁打开,允许中断,如果线程池正在stop,确保线程池已中断,否则做执行前工作,执行任务,做执行后工作,如果任务被中断,则工作线程数量减1;如果任务完成,则更新完成任务数量,从工作任务集中移除工作线程,尝试结束线程池。尝试结束线程池,首先检查线程池运行状态如果为运行中,关闭但任务队列不为空,或线程池工作线程为0,任务队列为空,则直接返回;否则查看工作线程是否为0,不为0,则根据onlyOne参数确定中断多少空闲线程,如果onlyOne为true,中断一个,否则中断所有空闲线程。

ThreadPoolExecutor解析三(线程池执行提交任务)

执行任务的过程为,如果工作线程数量小于核心线程池数量,添加工作线程,执行任务;如果添加工作线程失败,则添加任务到队列,并进行双检查,如果在上述期间,线程池关闭,回滚任务,从队列中移除任务;如果任务入队列失败,根据工作线程数量是否大于最大线程池数量,来判断是否应该添加工作线程执行任务;如果工作线程小于最大线程池数量,则CAS操作workCount,成功创建工作线程执行任务。添加工作线程的过程为,如果应该添加工作线程,则CAS更新工作线程数,如果更新成功,则创建工作线程,执行任务。如果线程是已关闭或正在关闭,则添加工作线程失败。如果线程工厂创建线程失败,则返回false,如果由于线程工厂返回null或OutOfMemoryError等原因,执行回滚清除工作。回滚清除工作主要是工作线程和工作线程数。最后检查线程是是否关闭,如果线程池正在运行,或正在关闭且队列不为空,则直接返回,否则及线程池已关闭,检查工作线程是否为0,不为零根据ONLY_ONE判断中断一个空闲线程还是多个。

ThreadPoolExecutor解析四(线程池关闭)

关闭线程池,首先检查工作线程运行时访问权限,更新线程状态为SHUTDOWN,中断空闲工作线程,最后尝试关闭线程池。立即关闭线程与关闭线程池的不同是,对于关闭线程池,先前提交的任务将会被工作线程执行,新的线程将会被拒绝;而立即关闭线程,尝试停止正在执行的任务,停止等待任务线程的处理,任务队列将会被排空,并返回任务队列中的任务集。这两个方法都不会等待任务执行完或任务结束。我们可以用awaitTermination来等待任务执行完。

ScheduledThreadPoolExecutor解析一(调度任务,任务队列)

ScheduledFutureTask用一个序列号标识延时任务的执行编号,以保证任务的调度按照FIFO的顺序,用time记录任务执行的系统时间,period是任务执行间隔时间,用于计算下一次任务执行系统时间,outerTask为实际的调度任务,heapIndex为任务在队列的索引。调度线程池执行器用DelayedWorkQueue来存储调度任务,DelayedWorkQueue与延时队列DelayedQueue有点像,一个可重入锁控制队列的并发访问,一个available条件控制队列中是否有任务可用,leader为当前正在等待队列头任务可用(队列不为空,队列头任务过期)的线程,当队列不为空或leader被释放,才会触发available条件。DelayedWorkQueue是特为存放ScheduledFutureTask调度任务而定制的。

ScheduledThreadPoolExecutor解析二(任务调度)

从调度线程池执行器的构造来看,核心线程池数量是必须设置的,线程工厂和拒绝策略可选,默认最大线程池数量为 Integer.MAX_VALUE,保活时间为0,即不存在空闲的任务线程, 任务队列为DelayedWorkQueue。

scheduleAtFixedRate方法首先根据任务command和任务执行系统时间,及任务间隔时间period,构造调度任务,简单包装调度任务,延时执行调度任务, 延时执行调度任务。在延时执行调度任务时,添加任务到延时DelayedWorkQueue,同时添加一个空任务工作线程,空任务工作线程执行时, 如果任务为null,则从任务队列中取任务。调度任务的执行,如果任务为ScheduledFutureTask,在运行的时候,从新计算任务下一次执行的系统时间,重置任务线程状态为READY,添加任务到队列。

scheduleWithFixedDelay与scheduleAtFixedRate不同点在构造ScheduledFutureTask时间间隔为-delay。时间间隔p为正,以固定的频率调度任务即scheduleAtFixedRate,每隔p时间执行一次任务,无论上一次任务是否执行完,具体任务能否执行,调度线程池无法保证,这要看是否有工作线程可用;当时间间隔p为负,以固定的间隔时间调度任务,即scheduleWithFixedDelay, 当前任务执行完后,等待p时间,再执行下一个任务。

ScheduledThreadPoolExecutor解析三(关闭线程池)

关闭操作,与线程池执行器的关闭基本相同,不同的是,在onShutdown方法,调度线程池执行器,重写了这个方法,这个方法主要是根据线程池关闭间歇性任务和延时任务的处理策略,确定是否以不可中断方式取消任务。

Executors解析

Executors创建固定线程池,实际为ThreadPoolExecutor,核心线程池数量和最大线程池数量相等并且固定,任务队列为LinkedBlockingQueue;创建单线程执行器,通过单线程执行器代理,实际为线程池执行器的ThreadPoolExecutor静态代理,核心线程池数量和最大线程池数量相等并且为1,任务队列为LinkedBlockingQueue。newCachedThreadPool方法来看,核心线程池为0,最大线程池为Integer.MAX_VALUE,工作线程可空闲时间为60秒,任务队列为SynchronousQueue。newCachedThreadPool方法创建的线程池执行器适合需要执行大量执行时间短的异步任务场景。单线程调度器实际为调度执行器静态代理,实际的调度执行器为ScheduledThreadPoolExecutor。newScheduledThreadPool返回的调度执行器为ScheduledThreadPoolExecutor。不可重新配置的线程池执行器和调度线程池执行器,通过线程池执行器和调度线程池执行器的静态代理来实现。

总结

JUC包装的所有锁,信号量等实现都是基于AQS和Unsafe,锁的模式由公平和非公平两种,非公平锁可以提供系统的吞度量。对于读写锁,写锁可以降级为读锁,反之不可,同时线程只有在没有线程持有读锁且写锁状态为打开,即state为打开状态,或当前线程持有写锁的数量小于65535的情况下,获取写锁成功,否则失败。线程只有在其他线程没有持有写锁,且读锁的持有数量未达到65535,或当前线程持有写锁且没有读锁的持有数量未达到65535(即锁降级),则当前线程获取读锁成功,否则自旋等待。

ConcurrentHashMap有Segments数组组成,并发访问数为Segments的数量,而每个Segments内部是一个HashEntry数组,ConcurrentHashMap是节点线程安全的。 ConcurrentLinkedQueue是基于Unsafe的节点线程安全的。LinkedBlockingQueue内部两把锁,一个是takeLock,一个为putLock;消费者锁takeLock可以被take,poll等操作持有;生产者锁putLock,可以被put,offer等操作持有;同时有两个条件notEmpty和notFull,notEmpty是takeLock锁的条件,当队列为null,消费者等待的队列非空条件notEmpty;notFull为putLock的条件,为当队列满时,生产者等待队列条件,即队列非满条件notFull。ArrayBlockingQueue是一个有界的线程安全FIFO队列,内部有一把锁ReentrantLock用于访问控制,一个队列非空条件notEmpty,一个队列非满条件notFull,这两个条件都是由ReentrantLock创建。 SynchronousQueue阻塞队列,每次插入操作必须等待一个协同的移除线程,反之亦然。SynchronousQueue同步队列没有容量,可以说,队列内始终为空。DelayQueue内部同样是由一把锁ReentrantLock用于访问控制,一个available条件预示队列中是否有元素可用。

任务调度执行器ScheduledThreadPoolExecutor的scheduleWithFixedDelay方法和scheduleAtFixedRate不方法最大的不同是,scheduleWithFixedDelay需要当前任务执行完,,等待延时时间,再执行下一个任务,而scheduleAtFixedRate不需要等待当前任务执行完,待延时时间到达后,无论当前任务有没有执行完,立即执行下一个任务。Executors为创建线程池执行器的辅助类,创建固定线程池,实际为ThreadPoolExecutor任务队列为LinkedBlockingQueue;创建单线程执行器,任务队列为LinkedBlockingQueue。newCachedThreadPool方法,创建的线程池执行器,任务队列为SynchronousQueue,此方法创建的线程池执行器适合需要执行大量执行时间短的异步任务场景。