1 synchronized与RenntrantLock
1.1 简介
synchronized关键字是一种最简单的并发控制方法,它决定了一个线程是否可以访问临界区资源。同时,Object.wait()和Object.notify()方法起到了线程等待和线程通知(唤醒)的作用。
可重入锁ReentrantLock提供了一种显式的操作过程,需要手动为临界区资源加锁,并需要手动将锁释放。ReentrantLock提供了几个重要的方法:
- lock:获得锁,如果锁已经被占用,则等待
- lockInterruptibly():获得锁,但优先响应中断
- tryLock():尝试获得锁,如果成功,返回true,失败返回false。该方法不等待,立即返回
- tryLock(long time , TimeUnit unit):在给定时间内尝试获得锁
- unlock():释放锁
- ReentrantLock(Boolean fair):是否为公平锁。公平锁不会产生饥饿现象,只要排队等待,最终可以获得等待的资源(获得锁,并进入临界区)
1.2 区别
- 实现层面:synchronized基于JVM实现,ReentrantLock基于JDK实现,代码层面实现
- 中断锁等候:ReentrantLock的lockInterruptibly()方法可以中断锁的等候
- 定时锁等待:tryLock(long time , TimeUnit unit)可以定时锁的等待时间
- 公平锁:ReentrantLock锁可以设置是否为公平锁
- 锁的等待与唤醒(通知):synchronized的等待与唤醒是通过Object.wait()和Object.notify()实现,ReentrantLock的等待与唤醒是通过Conditions对象的await()和signal()方法实现。
2 锁
2.1 Semophore 信号量
(1)描述:Semophore可以指定多个线程同时访问某一个临界区资源。
(2)构造函数
public Semophore(int permits)
public Semophore(int permits, Boolean fair)
(3)重要方法
public void acquire():尝试获得一个准入许可
public void acquireUninterruptibly():尝试获得一个准入许可,但不响应中断
public boolean tryAcquire():尝试获得准入许可,成功true,失败false
public boolean tryAcquire(long time, TimeUnit unit):设置等待时间
public void release():释放一个许可
2.2 ReadWriteLock 读写分离锁
(1)描述:ReadWriteLock适用于读操作远大于写操作的场景,读-读操作时不会阻塞线程,减少锁竞争,提高系统性能。
(2)重要方法
ReentrantReadWriteLock类提供的readLock()和writeLock()方法来提供读写锁
2.3 CountDownLatch 倒计时器
(1)描述:CountDownLatch用于控制线程的等待,一般用于某个线程A等待若干(count)其他线程执行任务之后,A才执行
(2)构造函数
public CountDownLatch(int count)
(3)重要方法
public void countdown():通知CountDownLatch,一个线程已经执行完毕,count-1
public void await():要求当前线程等待所有count个检查任务(count个线程)全部完成后,才继续执行
2.4 CyclicBarrier 循环栅栏
(1)描述:CyclicBarrier一般用于一组线程相互等待至某个状态,然后这组线程再同时执行
(2)构造函数
public CyclicBarrier(int parties, Runnable barrierAction)
parties为计数总数,也就是参与的线程总数
(3)重要方法
CyclicBarrier.await()方法表示等待parties线程都到达某一个状态后,才一直执行,会进行下一次计数,当这个计数到了parties后就是执行所有线程。
3 线程池
3.1 Executor框架提供的线程池工厂方法
public static ExecutorService newFixedThreadPool(int nThread):固定线程数量的线程池
public static ExecutorService newSingleThreadExecutor():只有一条线程的线程池
public static ExecutorService newCachedThreadPool():可根据实际情况调整线程数量的线程池
public static ScheduledExecutorService newSingleThreadScheduledExecutor():线程池数量为1,可以根据时间需要对线程进行调度
public static ScheduledExecutorServic newScheduledThreadPool(int corePoolSize):线程池数量可以指定,可以根据时间需要对线程进行调度
备注:
(1)newFixedThreadPool:该线程池数量始终不变,当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲,便处理在任务队列中的任务。(LinkedBlockingQueue阻塞队列)
(2)newSingleThreadExecutor:若多个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。(LinkedBlockingQueue阻塞队列)
(3)newCachedThreadPool:线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池中进行复用。(SynchronousQueue阻塞队列)
3.2 ThreadPoolExecutor类
Public ThreadPoolExecutor(
int corePoolSize, //线程池中的线程数量
int maximumPoolSize, //线程池中最大的线程数量
Long keepAliveTime, //当线程池数量超过corePoolSize时,多余的空闲线程的存活时间
TimeUnit unit, //keepalivetime的单位
BlockingQueue<Runnable> workQueue, //任务队列,被提交但是尚未被执行的任务
ThreadFactory threadFactory, //线程工厂,用于创建线程,一般用默认的即可
RejectExecutionHandle handler //拒绝策略。当任务太多来不及处理,如何拒绝任务(线程池中的线程数量已经达到maximumPoolSize并且等待队列已经满了)
)
3.3 任务队列 BlockingQueue
- SynchronousQueue:直接提交队列,newCachedThreadPool()。提交的任务不会被真实地保存,而总是将新的任务提交到线程执行,如果没有空闲的线程,则尝试创建新的线程,若干线程数量已经达到最大值,则执行拒绝策略。
- ArrayBlockingQueue:有界的任务队列,若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会优先创建新的线程,若大于corePoolSize,则会将新任务加入到等待队列。若等待队列已满,无法加入,则在总线程数不大于maximumPoolSize的前提下创建新的线程执行任务。若大于maximumPoolSize,则执行拒绝策略。
- LinkedBlockingQueue:无界的任务队列,newFixedThreadPool()、newSingleThreadExecutor()。若有新的任务提交,如果系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就不会继续增加。若后续有新的任务提交,而又没有空闲的线程资源,则任务直接进入队列等待。
- PriorityBlockingQueue:优先任务队列。带有执行优先级的队列
4 JDK并发容器
4.1 ConcurrentHashMap
(1)描述:线程安全的HashMap集合,高效的并发HashMap
(2)问题:HashTable也是线程安全的Key-Value集合,HashTable与ConcurrentHashMap的区别?
- 两者主要是在性能上的差异,HashTable的所有操作都会锁住整个对象,虽然能够保证线程安全,但是性能上较差;ConcurrentHashMap内部使用Segment数组,每个Segment类似于HashTable,在“写”线程或者部分特殊的“读”线程中锁住的是整个Segment对象,其他的线程能够并发执行其它的Segment对象。
(3)实现:(锁粒度小)
ConcurrentHashMap的内部进一步细分了若干个小的HashMap,称之为段(Segment),默认情况下,一个ConcurrentHashMap被进一步细分为16个段(默认的数组大小为16)。
如果需要在ConcurrentHashMap中增加一个新的表项,并不是将整个HashMap加锁,而是首先根据hashCode得到该表项应该被存放在哪个段中,然后对该段进行加锁,并完成put操作。在多线程环境中,如果多个线程同时进行put操作,只要被加入的表项不存放在同一个段中,则线程间便可以做到真正的并行。
问题:如果系统需要取得全局锁时,其消耗的资源会比较多,因为必须获得所有segment段的锁,才能进行后续的全局性的操作,例如获得ConcurrentHashMap的size()。
4.2 CopyOnWriteArrayList
(1)描述:实现高效读取的线程安全ArrayList,内部在实现“读-写/读”操作时不会阻塞线程,只有在“写-写”操作时才会进行阻塞。
(2)实现:
“读-写”操作时,不需要阻塞线程,对共享资源用关键字volatile修饰,实现资源的透明,当发现写操作的时候会从原始数据copy出一份数据副本,“写”操作只对数据副本进行操作,完成写操作后再将副本数据替换原始数据,实现“写”操作。
4.3 ConcurrentLinkedQueue
线程安全的高并发队列Queue,其内部使用链表作为其数据结构
4.4 BlockingQueue
多线程之间数据的共享通道,Blocking是阻塞的意思,当服务线程(服务线程指不断获取队列中的消息,进行处理的线程)处理完成队列中所有的消息后,它会让服务线程在队列为空时,进行等待,当有新的消息进入队列后,自动将线程唤醒。(其等待唤醒使用可重入锁ReentrantLock的Condition对象的await()和signal()方法来实现)
4.5 ConcurrentSkipListMap
跳表,实现对元素的快速查找。
Collections工具类
(1)线程安全List
public static List<String> list = Collections.synchronizedList(new LinkedList<String>());
(2)线程安全Map
public static Map<K, V> map = Collections.synchronizedMap(new HashMap<K, V>());
(3)线程安全SortedSet
public static SortedSet<String> sortedSet = Collections.synchronizedSortedSet(new SortedSet<String>());
(4)线程安全SortedMap
public static SortedMap<K, V> sortedMap = Collections.synchronizedSortedMap(new SortedMap<K ,V>());
5 Java并发包
java.util.concurrent包下的类为线程安全类。该路径下有一些常用的类,如下:
ConcurrentHashap
CopyOnWriteArrayList
CopyOnWriteArraySet
Semaphore信号量
CountDownLatch倒计时器
CyclicBarrier循环栅栏
Executors、ExecutorService、ThreadPoolExecutor(线程池相关)
BlockingQueue(SynchronousQueue、ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue)
java.util.concurrent.atomic(AtomicBoolean、AtomicInteger、AtomicLong,无锁基于CAS实现)
java.util.concurrent.locks(Lock可重入锁、Condition可重入锁的等待await和唤醒signal、ReadWriteLock读写分离锁、ReentrantReadWriteLock)