并发-构建基础模块


同步容器类

Vector , Hashtable , Collections.synchronizedXxx等工厂方法创建

这些类实现线程安全方式: 将它们的状态封装起来,并对每个共有方法都进行同步,是的每次只有一个线程能方法容器状态

  1. 同步容器类上复合操作需要加锁

     //使用客户端加锁的Vector上的复合操作
     public static Object getList(Vector list){
         //线程安全容器类的复合操作需要加锁(与容器类内部锁相同)保证线程安全
         synchronized (list) {
             int lastIndex = list.size() - 1;
             return list.get(lastIndex);
         }
     }
  2. 注意: 隐藏的迭代器

    hashCode 与 equals 等方法会间接的执行迭代操作

    containsAll 丶 removeAll 和 retainAll等方法,以及把容器作为参数的构造方法,都会对容器进行迭代

     //隐藏的迭代器
     public void toStringList(Vector list){
         //容器会进行隐藏的迭代操作,不能保证线程安全
         System.out.println(list);
     }

    并发容器

    通过并发容器(细粒度加锁)来代替同步容器,可以极大提高伸缩性并降低风险

ConcurrentHashMap , CopyOnWriteArrayList , ConcurrentLinkedQueue先进先出队列 , BlockingQueue 阻塞队列

  1. ConcurrentHashMap

    • JDK1.7使用分段锁,JDK1.8+使用CAS+synchronized
    • 当需要加锁Map进行独占访问时,应放弃使用ConcurrentHashMap
    • 如果要增加额外的复合原子操作,则不能使用客户端加锁机制,可以使用组合(详见对象的组合)(实现ConcurrentMap接口)
  2. CopyOnWriteArrayList

    • 读取时不加锁,写入和删除时加锁,每次修改时都会创建并重新发布一个容器副本
    • 只能保证数据的最终一致性,不能保证数据的实时一致性
    • 每当修改时都会复制底层数组,这需要一定开销,仅当迭代操作远远多于修改操作时,才应该使用
  3. 阻塞队列

    • LinkedBlockingQueue 和 ArrayBlockingQueue是FIFO队列 ; PriorityBlockingQueue是优先队列 ;
      BlockingDeque是双端队列(常用于工作窃取算法) ; SynchronousQueue没有容量的无缓冲队列,维护的是一组线程,因为没有存储功能,put与take方法会一直阻塞,直到另一个线程已经准备好参与到交付过程中,数据直接在配对的线程之间传递
    • 阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法,如果队列已经满了,那么put方法将阻塞直到有空间可用;如果队列为空,那么take方法将会阻塞直到有元素可用,队列可以是有界的也可以是无界的,无界队列永远不会充满,因此无界队列上的put方法永远不会阻塞
    • 阻塞队列支持生产者-消费者模式,可以协调生产者和消费者等线程之间的控制流

      同步工具类

  4. 闭锁

    • CountDownLatch , 闭锁可以使一个线程等待其他线程完成各自的工作后再执行(例如确保某个计算在其所需的所有资源都被初始化之后才继续执行)

    • CountDownLatch是通过计数器实现的,计数器初始值为线程的数量,每当一个线程完成自己的任务后,计数器值就会减1,当计数器达到0时,闭锁上等待的线程就可以恢复执行任务

      //闭锁实例: 运动员等待一声命令比赛开始,所有运动员达到终点后比赛结束
      public void latchTest(){
        int players = 3;
        //begin闭锁初始化,计数值1
        CountDownLatch begin = new CountDownLatch(1);
        //end闭锁初始化,计数值2
        CountDownLatch end = new CountDownLatch(players);
        for(int i=0;i<players;++i){
            new Thread(() -> {
                try {
                    begin.await(); //begin闭锁,等待直到计数为0时,开始往后执行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "Arrived");
                end.countDown(); //end闭锁,计数-1
            }).start();
        }
      
        System.out.println("the race begin");
        begin.countDown(); //begin闭锁,计数-1
        try {
            end.await(); //end闭锁,等待直到计数为0,才开始往后执行
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("the race end");
      }
    • FutureTask实现闭锁,FutureTask可用于异步获取执行结果或取消执行任务的场景

      //使用FutureTask来提前加载稍后使用的数据
      class Preloader{
        ProductInfo loadProductInfo(){ return null; }
        private final FutureTask<ProductInfo> future = new FutureTask<>(() -> loadProductInfo() );
        private final Thread thread = new Thread(future);
        public void start(){ thread.start(); }
        public ProductInfo get(){
            ProductInfo info = null;
            try {
                info = future.get(); //等待FutureTask线程任务执行结束后返回执行结果
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                //所有异常都被封装到ExecutionException中,可在此进行处理返回详细异常
                e.printStackTrace();
            }
            return info;
      }
      
      interface ProductInfo{}
      }
  5. 信号量

    • Semaphore类是一个计数信号量,必须由获取它的线程释放,常用于控制同时访问特定资源的线程数目

    • 二值信号量,即初始值为1的Semaphore,可以用做互斥体(mutex),并具备不可重入的加锁语义

    • Semaphore可以实现资源池,有界阻塞容器等

      //使用Semaphore实现有界容器
      class BoundedHashSet<T>{
        private final Set<T> set;
        private final Semaphore semaphore;
      
        //构造函数,传入有界容器大小
        public BoundedHashSet(int bound) {
            this.set = Collections.synchronizedSet(new HashSet<>());
            this.semaphore = new Semaphore(bound);
        }
        //添加时先获取计数信号量(计数+1),如果添加失败则,释放
        public boolean add(T o) throws InterruptedException {
            boolean wasAdded = false;
            semaphore.acquire();
            try {
                wasAdded = set.add(o);
            } finally {
                if(!wasAdded) semaphore.release();
            }
            return wasAdded;
        }
        //删除成功时释放计数信号量(计数-1)
        public boolean remove(T o){
            boolean wasRemoved = set.remove(o);
            if(wasRemoved) semaphore.release();
            return wasRemoved;
        }
      }
  6. 栅栏

    • 栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生,栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置(await()位置),才能继续执行,闭锁用于等待事件(可以是这些线程以外的事件),而栅栏用于等待其他线程(只能在线程中)
    • 栅栏提供了reset方法,可以重复使用,闭锁不能重复使用
    • 另一种形式的栅栏是Exchanger , 它是两方栅栏,用于在各方的栅栏上交换数据
      //栅栏: CyclicBarrier简单实例
      public void barrierTest(){
        //创建一个栅栏对象,初始化为5
        CyclicBarrier barrier = new CyclicBarrier(5);
        for(int i=0;i<5;++i){
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "执行到栅栏位置");
                    barrier.await();    //设置栅栏,线程执行到此会等待
                                        //当其他所有设置栅栏的线程都到达栅栏位置,再往后执行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "通过栅栏");
            }).start();
        }
      }
  7. Exchanger

    • 用于两个线程间信息交换,两个线程使用同一个Exchanger对象,V exchange(V x),x为需要交换的变量,返回结果为交换后的变量

    • 如果超过两个线程使用同一个Exchanger对象,执行exchange方法,得到的结果随机

      @Test
      public void echangerTest(){
        Exchanger<String> exchanger = new Exchanger<>();//使用同一个exchange
        new Thread(() ->{//线程1
            try {
                //传入交换前的值(Tom),返回交换后的值(Jerry)
                System.out.println("Tom交换后: "+exchanger.exchange("Tom"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
      
        new Thread(() ->{//线程2
            try {
                System.out.println("Jerry交换后: "+exchanger.exchange("Jerry"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
      }
      //输出
      //Tom交换后: Jerry
      //Jerry交换后: Tom

文章作者: Bryson
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Bryson !
评论
 上一篇
并发-取消与关闭 并发-取消与关闭
任务取消 取消 java中没有安全的将占方式来停止线程,只有一些协作式的机制 其中一种协作机制,设置某个”已请求取消标志”,而任务将定期的查看该标志,如果设置了这个标志,任务将提前结束,(对于存在阻塞方法的任务,可能永远不会检查到取消标志
2019-08-22
下一篇 
并发-对象的组合 并发-对象的组合
实例封闭 将数据封装在对象内部,可以将数据的访问限制在对象的方法上,从而更容易确保线程在访问数据时总能持有正确的锁 java类库中有很多线程封闭示例: Collections.synchronizedList及其类似方法,这些工厂方法通过
2019-08-18
  目录