同步容器类
Vector , Hashtable , Collections.synchronizedXxx等工厂方法创建
这些类实现线程安全方式: 将它们的状态封装起来,并对每个共有方法都进行同步,是的每次只有一个线程能方法容器状态
同步容器类上复合操作需要加锁
//使用客户端加锁的Vector上的复合操作 public static Object getList(Vector list){ //线程安全容器类的复合操作需要加锁(与容器类内部锁相同)保证线程安全 synchronized (list) { int lastIndex = list.size() - 1; return list.get(lastIndex); } }
注意: 隐藏的迭代器
hashCode 与 equals 等方法会间接的执行迭代操作
containsAll 丶 removeAll 和 retainAll等方法,以及把容器作为参数的构造方法,都会对容器进行迭代
//隐藏的迭代器 public void toStringList(Vector list){ //容器会进行隐藏的迭代操作,不能保证线程安全 System.out.println(list); }
并发容器
通过并发容器(细粒度加锁)来代替同步容器,可以极大提高伸缩性并降低风险
ConcurrentHashMap , CopyOnWriteArrayList , ConcurrentLinkedQueue先进先出队列 , BlockingQueue 阻塞队列
ConcurrentHashMap
- JDK1.7使用分段锁,JDK1.8+使用CAS+synchronized
- 当需要加锁Map进行独占访问时,应放弃使用ConcurrentHashMap
- 如果要增加额外的复合原子操作,则不能使用客户端加锁机制,可以使用组合(详见对象的组合)(实现ConcurrentMap接口)
CopyOnWriteArrayList
- 读取时不加锁,写入和删除时加锁,每次修改时都会创建并重新发布一个容器副本
- 只能保证数据的最终一致性,不能保证数据的实时一致性
- 每当修改时都会复制底层数组,这需要一定开销,仅当迭代操作远远多于修改操作时,才应该使用
阻塞队列
- LinkedBlockingQueue 和 ArrayBlockingQueue是FIFO队列 ; PriorityBlockingQueue是优先队列 ;
BlockingDeque是双端队列(常用于工作窃取算法) ; SynchronousQueue没有容量的无缓冲队列,维护的是一组线程,因为没有存储功能,put与take方法会一直阻塞,直到另一个线程已经准备好参与到交付过程中,数据直接在配对的线程之间传递 - 阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法,如果队列已经满了,那么put方法将阻塞直到有空间可用;如果队列为空,那么take方法将会阻塞直到有元素可用,队列可以是有界的也可以是无界的,无界队列永远不会充满,因此无界队列上的put方法永远不会阻塞
- 阻塞队列支持生产者-消费者模式,可以协调生产者和消费者等线程之间的控制流
同步工具类
- LinkedBlockingQueue 和 ArrayBlockingQueue是FIFO队列 ; PriorityBlockingQueue是优先队列 ;
闭锁
CountDownLatch , 闭锁可以使一个线程等待其他线程完成各自的工作后再执行(例如确保某个计算在其所需的所有资源都被初始化之后才继续执行)
CountDownLatch是通过计数器实现的,计数器初始值为线程的数量,每当一个线程完成自己的任务后,计数器值就会减1,当计数器达到0时,闭锁上等待的线程就可以恢复执行任务
//闭锁实例: 运动员等待一声命令比赛开始,所有运动员达到终点后比赛结束 public void latchTest(){ int players = 3; //begin闭锁初始化,计数值1 CountDownLatch begin = new CountDownLatch(1); //end闭锁初始化,计数值2 CountDownLatch end = new CountDownLatch(players); for(int i=0;i<players;++i){ new Thread(() -> { try { begin.await(); //begin闭锁,等待直到计数为0时,开始往后执行 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "Arrived"); end.countDown(); //end闭锁,计数-1 }).start(); } System.out.println("the race begin"); begin.countDown(); //begin闭锁,计数-1 try { end.await(); //end闭锁,等待直到计数为0,才开始往后执行 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("the race end"); }
FutureTask实现闭锁,FutureTask可用于异步获取执行结果或取消执行任务的场景
//使用FutureTask来提前加载稍后使用的数据 class Preloader{ ProductInfo loadProductInfo(){ return null; } private final FutureTask<ProductInfo> future = new FutureTask<>(() -> loadProductInfo() ); private final Thread thread = new Thread(future); public void start(){ thread.start(); } public ProductInfo get(){ ProductInfo info = null; try { info = future.get(); //等待FutureTask线程任务执行结束后返回执行结果 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { //所有异常都被封装到ExecutionException中,可在此进行处理返回详细异常 e.printStackTrace(); } return info; } interface ProductInfo{} }
信号量
Semaphore类是一个计数信号量,必须由获取它的线程释放,常用于控制同时访问特定资源的线程数目
二值信号量,即初始值为1的Semaphore,可以用做互斥体(mutex),并具备不可重入的加锁语义
Semaphore可以实现资源池,有界阻塞容器等
//使用Semaphore实现有界容器 class BoundedHashSet<T>{ private final Set<T> set; private final Semaphore semaphore; //构造函数,传入有界容器大小 public BoundedHashSet(int bound) { this.set = Collections.synchronizedSet(new HashSet<>()); this.semaphore = new Semaphore(bound); } //添加时先获取计数信号量(计数+1),如果添加失败则,释放 public boolean add(T o) throws InterruptedException { boolean wasAdded = false; semaphore.acquire(); try { wasAdded = set.add(o); } finally { if(!wasAdded) semaphore.release(); } return wasAdded; } //删除成功时释放计数信号量(计数-1) public boolean remove(T o){ boolean wasRemoved = set.remove(o); if(wasRemoved) semaphore.release(); return wasRemoved; } }
栅栏
- 栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生,栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置(await()位置),才能继续执行,闭锁用于等待事件(可以是这些线程以外的事件),而栅栏用于等待其他线程(只能在线程中)
- 栅栏提供了reset方法,可以重复使用,闭锁不能重复使用
- 另一种形式的栅栏是Exchanger , 它是两方栅栏,用于在各方的栅栏上交换数据
//栅栏: CyclicBarrier简单实例 public void barrierTest(){ //创建一个栅栏对象,初始化为5 CyclicBarrier barrier = new CyclicBarrier(5); for(int i=0;i<5;++i){ new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "执行到栅栏位置"); barrier.await(); //设置栅栏,线程执行到此会等待 //当其他所有设置栅栏的线程都到达栅栏位置,再往后执行 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "通过栅栏"); }).start(); } }
Exchanger
用于两个线程间信息交换,两个线程使用同一个Exchanger对象,V exchange(V x),x为需要交换的变量,返回结果为交换后的变量
如果超过两个线程使用同一个Exchanger对象,执行exchange方法,得到的结果随机
@Test public void echangerTest(){ Exchanger<String> exchanger = new Exchanger<>();//使用同一个exchange new Thread(() ->{//线程1 try { //传入交换前的值(Tom),返回交换后的值(Jerry) System.out.println("Tom交换后: "+exchanger.exchange("Tom")); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() ->{//线程2 try { System.out.println("Jerry交换后: "+exchanger.exchange("Jerry")); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } //输出 //Tom交换后: Jerry //Jerry交换后: Tom