• Home
  • About
    • back
    • 谢志平 photo

      谢志平

      程序员

    • Learn More
    • Email
    • Github
  • Posts
    • back
    • All Posts
    • All Tags
  • Projects

JDK并发

14 Sep 2018

Reading time ~1 minute

1 synchronized与RenntrantLock

1.1 简介

synchronized关键字是一种最简单的并发控制方法,它决定了一个线程是否可以访问临界区资源。同时,Object.wait()和Object.notify()方法起到了线程等待和线程通知(唤醒)的作用。

可重入锁ReentrantLock提供了一种显式的操作过程,需要手动为临界区资源加锁,并需要手动将锁释放。ReentrantLock提供了几个重要的方法:

  • lock:获得锁,如果锁已经被占用,则等待
  • lockInterruptibly():获得锁,但优先响应中断
  • tryLock():尝试获得锁,如果成功,返回true,失败返回false。该方法不等待,立即返回
  • tryLock(long time , TimeUnit unit):在给定时间内尝试获得锁
  • unlock():释放锁
  • ReentrantLock(Boolean fair):是否为公平锁。公平锁不会产生饥饿现象,只要排队等待,最终可以获得等待的资源(获得锁,并进入临界区)

1.2 区别

  • 实现层面:synchronized基于JVM实现,ReentrantLock基于JDK实现,代码层面实现
  • 中断锁等候:ReentrantLock的lockInterruptibly()方法可以中断锁的等候
  • 定时锁等待:tryLock(long time , TimeUnit unit)可以定时锁的等待时间
  • 公平锁:ReentrantLock锁可以设置是否为公平锁
  • 锁的等待与唤醒(通知):synchronized的等待与唤醒是通过Object.wait()和Object.notify()实现,ReentrantLock的等待与唤醒是通过Conditions对象的await()和signal()方法实现。

2 锁

2.1 Semophore 信号量

(1)描述:Semophore可以指定多个线程同时访问某一个临界区资源。

(2)构造函数

public Semophore(int permits)
public Semophore(int permits, Boolean fair)

(3)重要方法

public void acquire():尝试获得一个准入许可
public void acquireUninterruptibly():尝试获得一个准入许可,但不响应中断
public boolean tryAcquire():尝试获得准入许可,成功true,失败false
public boolean tryAcquire(long time, TimeUnit unit):设置等待时间
public void release():释放一个许可

2.2 ReadWriteLock 读写分离锁

(1)描述:ReadWriteLock适用于读操作远大于写操作的场景,读-读操作时不会阻塞线程,减少锁竞争,提高系统性能。

(2)重要方法

ReentrantReadWriteLock类提供的readLock()和writeLock()方法来提供读写锁

2.3 CountDownLatch 倒计时器

(1)描述:CountDownLatch用于控制线程的等待,一般用于某个线程A等待若干(count)其他线程执行任务之后,A才执行

(2)构造函数

public CountDownLatch(int count)

(3)重要方法

public void countdown():通知CountDownLatch,一个线程已经执行完毕,count-1
public void await():要求当前线程等待所有count个检查任务(count个线程)全部完成后,才继续执行

2.4 CyclicBarrier 循环栅栏

(1)描述:CyclicBarrier一般用于一组线程相互等待至某个状态,然后这组线程再同时执行

(2)构造函数

public CyclicBarrier(int parties, Runnable barrierAction)
parties为计数总数,也就是参与的线程总数

(3)重要方法

CyclicBarrier.await()方法表示等待parties线程都到达某一个状态后,才一直执行,会进行下一次计数,当这个计数到了parties后就是执行所有线程。

3 线程池

3.1 Executor框架提供的线程池工厂方法

public static ExecutorService newFixedThreadPool(int nThread):固定线程数量的线程池
public static ExecutorService newSingleThreadExecutor():只有一条线程的线程池
public static ExecutorService newCachedThreadPool():可根据实际情况调整线程数量的线程池
public static ScheduledExecutorService newSingleThreadScheduledExecutor():线程池数量为1,可以根据时间需要对线程进行调度
public static ScheduledExecutorServic newScheduledThreadPool(int corePoolSize):线程池数量可以指定,可以根据时间需要对线程进行调度

备注:

(1)newFixedThreadPool:该线程池数量始终不变,当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲,便处理在任务队列中的任务。(LinkedBlockingQueue阻塞队列)

(2)newSingleThreadExecutor:若多个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。(LinkedBlockingQueue阻塞队列)

(3)newCachedThreadPool:线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池中进行复用。(SynchronousQueue阻塞队列)

3.2 ThreadPoolExecutor类

Public ThreadPoolExecutor(

    int corePoolSize,       //线程池中的线程数量

    int maximumPoolSize,  //线程池中最大的线程数量

    Long keepAliveTime,  //当线程池数量超过corePoolSize时,多余的空闲线程的存活时间

    TimeUnit unit,     //keepalivetime的单位

    BlockingQueue<Runnable> workQueue,  //任务队列,被提交但是尚未被执行的任务

    ThreadFactory threadFactory,   //线程工厂,用于创建线程,一般用默认的即可

    RejectExecutionHandle handler   //拒绝策略。当任务太多来不及处理,如何拒绝任务(线程池中的线程数量已经达到maximumPoolSize并且等待队列已经满了)

)

3.3 任务队列 BlockingQueue

  • SynchronousQueue:直接提交队列,newCachedThreadPool()。提交的任务不会被真实地保存,而总是将新的任务提交到线程执行,如果没有空闲的线程,则尝试创建新的线程,若干线程数量已经达到最大值,则执行拒绝策略。
  • ArrayBlockingQueue:有界的任务队列,若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会优先创建新的线程,若大于corePoolSize,则会将新任务加入到等待队列。若等待队列已满,无法加入,则在总线程数不大于maximumPoolSize的前提下创建新的线程执行任务。若大于maximumPoolSize,则执行拒绝策略。
  • LinkedBlockingQueue:无界的任务队列,newFixedThreadPool()、newSingleThreadExecutor()。若有新的任务提交,如果系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就不会继续增加。若后续有新的任务提交,而又没有空闲的线程资源,则任务直接进入队列等待。
  • PriorityBlockingQueue:优先任务队列。带有执行优先级的队列

4 JDK并发容器

4.1 ConcurrentHashMap

(1)描述:线程安全的HashMap集合,高效的并发HashMap

(2)问题:HashTable也是线程安全的Key-Value集合,HashTable与ConcurrentHashMap的区别?

  • 两者主要是在性能上的差异,HashTable的所有操作都会锁住整个对象,虽然能够保证线程安全,但是性能上较差;ConcurrentHashMap内部使用Segment数组,每个Segment类似于HashTable,在“写”线程或者部分特殊的“读”线程中锁住的是整个Segment对象,其他的线程能够并发执行其它的Segment对象。

(3)实现:(锁粒度小)

ConcurrentHashMap的内部进一步细分了若干个小的HashMap,称之为段(Segment),默认情况下,一个ConcurrentHashMap被进一步细分为16个段(默认的数组大小为16)。

如果需要在ConcurrentHashMap中增加一个新的表项,并不是将整个HashMap加锁,而是首先根据hashCode得到该表项应该被存放在哪个段中,然后对该段进行加锁,并完成put操作。在多线程环境中,如果多个线程同时进行put操作,只要被加入的表项不存放在同一个段中,则线程间便可以做到真正的并行。

问题:如果系统需要取得全局锁时,其消耗的资源会比较多,因为必须获得所有segment段的锁,才能进行后续的全局性的操作,例如获得ConcurrentHashMap的size()。

4.2 CopyOnWriteArrayList

(1)描述:实现高效读取的线程安全ArrayList,内部在实现“读-写/读”操作时不会阻塞线程,只有在“写-写”操作时才会进行阻塞。

(2)实现:

“读-写”操作时,不需要阻塞线程,对共享资源用关键字volatile修饰,实现资源的透明,当发现写操作的时候会从原始数据copy出一份数据副本,“写”操作只对数据副本进行操作,完成写操作后再将副本数据替换原始数据,实现“写”操作。

4.3 ConcurrentLinkedQueue

线程安全的高并发队列Queue,其内部使用链表作为其数据结构

4.4 BlockingQueue

多线程之间数据的共享通道,Blocking是阻塞的意思,当服务线程(服务线程指不断获取队列中的消息,进行处理的线程)处理完成队列中所有的消息后,它会让服务线程在队列为空时,进行等待,当有新的消息进入队列后,自动将线程唤醒。(其等待唤醒使用可重入锁ReentrantLock的Condition对象的await()和signal()方法来实现)

4.5 ConcurrentSkipListMap

跳表,实现对元素的快速查找。

Collections工具类

(1)线程安全List

public static List<String> list = Collections.synchronizedList(new LinkedList<String>());

(2)线程安全Map

public static Map<K, V> map = Collections.synchronizedMap(new HashMap<K, V>());

(3)线程安全SortedSet

public static SortedSet<String> sortedSet = Collections.synchronizedSortedSet(new SortedSet<String>());

(4)线程安全SortedMap

public static SortedMap<K, V> sortedMap = Collections.synchronizedSortedMap(new SortedMap<K ,V>());

5 Java并发包

java.util.concurrent包下的类为线程安全类。该路径下有一些常用的类,如下:

ConcurrentHashap
CopyOnWriteArrayList
CopyOnWriteArraySet
Semaphore信号量
CountDownLatch倒计时器
CyclicBarrier循环栅栏
Executors、ExecutorService、ThreadPoolExecutor(线程池相关)
BlockingQueue(SynchronousQueue、ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue)
java.util.concurrent.atomic(AtomicBoolean、AtomicInteger、AtomicLong,无锁基于CAS实现)
java.util.concurrent.locks(Lock可重入锁、Condition可重入锁的等待await和唤醒signal、ReadWriteLock读写分离锁、ReentrantReadWriteLock)


并发编程 Share Tweet +1