并发-任务执行


Executor框架

  1. Executors工厂类

    • newFixedThreadPool创建一个可重用的固定线程数的线程池,以共享的无界队列方式来运行这些线程,当活动线程到达固定数量时,再次加入任务会加入队列等待其他活动线程运行结束,再执行
    • newCachedThreadPool创建一个可缓存线程池大小不固定(基本大小为0,最大大小为Integer.MAX_VALUE),在使用缓存型池时,先查看池中有没有以前创建的线程,如果有就复用,没有则加入;会终止并从缓存中移除那些已有60秒未被使用的线程
    • newScheduledThreadPool创建一个定长线程池,支持定时及周期性任务执行
    • newSingleThreadExecutor创建一个单线程化的线程池,保证所有任务按照指定顺序(FIFO,LIFO,优先级)执行
    • newWorkStealingPool实际上就是ForkJoinPool,默认并行级别是CPU数目,每个工作线程都维护着一个工作队列(是双端队列),使用工作窃取(work-stealing)算法,完成自己任务而处于空闲的工作线程会从其他仍然忙碌状态的工作线程处窃取等待执行的任务进行处理
    • 其他自定义类型的连接池可通过显示的ThreadPoolExecutor构造函数创建
  2. ExecutorService

    • ExecutorService是一个接口,继承了Executor接口,该接口扩展了一些生命周期的方法

      public interface ExecutorService extends Executor {
        void shutdown(); //平缓的关闭,停止接收新的任务,等待已经提交的任务执行结束(包括还未执行的任务)再关闭
        List<Runnable> shutdownNow();//粗暴的关闭,停止接收新的任务,并尝试停止所有运行中的任务,不在启动队列中尚未开始执行的任务
        boolean isShutdown(); //是否已经关闭
        boolean isTerminated();//若关闭后所有任务都已完成,则返回true;注意除非首先调用shutdown或shutdownNow,否则isTerminated永不为true
        /**
         * @param timeout 超时时间
         * @param unit 时间单位
         * 使线程等待timeout时长,当超过timeout时间后,会监测ExecutorService是否已经关闭,
         * 若关闭则返回true,否则返回false.一般情况下会和shutdown方法组合使用.
         * (awaitTermination不会关闭ExecutorService,只是定时检测一下他是否关闭)
         */
        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
      
        //...其他用于任务提交的便利方法
      }
  3. 示例

     //基于线程池的web服务器
     class TaskExecutionWebServer{
         private static final int NTHREADS = 100;
         //创建固定线程数的线程池
         private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
    
         public static void main(String[] args) throws IOException{
             ServerSocket socket = new ServerSocket(80);
             while(true){
                 final Socket connection = socket.accept();
                 Runnable task = () -> handleRequest(connection);
                 exec.execute(task); //将请求任务加入线程池,超过线程数的线程会被加入队列中等待
             }
         }
         private static void handleRequest(Socket connection) {
             // 请求处理
         }
     }
  4. 延迟任务与周期任务

    • Timer(存在缺陷,不推荐使用)负责管理延迟任务以及周期任务,然而Timer存在一些缺陷,执行所有定时任务时只会创建一个线程,Timer线程不会捕获异常,如果抛出了未检查异常则会导致线程终止,导致整个Timer线程任务取消;Timer对调度的支持是基于绝对时间的,而不是相对时间,所以对系统时间的改变非常敏感

    • ShceduledThreadPoolExecutor用来替代Timer,它基于相对时间(基于System.nanoTime实现相对时间,不会因系统时间的改变而改变)

      //错误的Timer行为(所有定时任务只会创建一个线程)
      public void timerTest() throws Exception {
        Timer timer = new Timer();
        timer.schedule(new ThrowTask(), 1); //定时任务1抛出异常
        Thread.sleep(1000);
        timer.schedule(new ThrowTask(), 1); //整个Timer线程终止(Timer already cancelled),定时任务2不在执行
        Thread.sleep(5000);
      }
      
      class ThrowTask extends TimerTask { //执行任务,抛出异常
        public void run() { throw new RuntimeException(); }
      }
  5. 携带结果的任务Callable与Future

    • Runnable和Callable描述的都是抽象的计算任务,但是Runnable没有返回值,Callable可以有返回值(Callable可以表示无返回值),并可能抛出一个异常

    • Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等(任务的生命周期只能前进,不能后退),其中get方法是可阻塞的

Fork/Join框架

  • Fork/Join框架是ExecutorService接口的一种具体实现,目的是为了更好地利用多个处理器,能够递归的拆解子任务(用新线程处理fork),需要实现其compute方法,在其中实现分割子任务

    • ForkJoinPool 的每个工作线程都维护着一个工作队列(是双端队列),使用工作窃取(work-stealing)算法,完成自己任务而处于空闲的工作线程会从其他仍然忙碌状态的工作线程处窃取等待执行的任务进行处理
    • 如果想对大任务进行分割成子任务,可以选择继承RecursiveTask(有返回值)或者RecursiveAction(无返回值),重写compute方法并在内部进行分割子任务,然后向ForkJoinPool线程池中提交自定义的ForkJoinTask任务即可
    • 提交ForkJoinTask任务时,invoke是同步执行,submit是异步执行
    • fork与invokeAll区别,fork是建立分支,直接将子任务交于其他线程处理,invokeAll会将其中第一个任务交给当前线程处理,其他的任务交于其他线程处理
  1. 示例: 多线程归并排序
@Test
public void forkJoinTest() throws Exception{
    //创建数组
    Integer[] array = new Random().ints(100000000,0,Integer.MAX_VALUE)
            .boxed().toArray(Integer[]::new);
    new Merge<>(array).parallelSort();
    //new Merge<>(array).sort();
    System.out.println(SortTemplate.isSorted(array));//判断是否有序
}
//使用Fork/Join进行归并排序
class Merge<E> extends RecursiveAction{
    private E[] elements;
    private E[] aux;//辅助数组
    private Comparator<E> comparator; //比较器
    private int lo;
    private int hi;

    public Merge(E[] elements, Comparator<E> comparator) {
        this.elements = elements;
        this.aux = (E[])new Object[elements.length];
        this.comparator = comparator;
        //准备数据
        this.lo = 0;
        this.hi = elements.length-1;
    }
    public Merge(E[] elements){
        this.elements = elements;
        this.aux = (E[])new Object[elements.length];
        this.comparator = (E x,E y) -> {
            if(x instanceof Comparable) return ((Comparable) x).compareTo(y);
            else throw new RuntimeException("需要传入Comparator或者实现Comparable");
        };
        //准备数据
        this.lo = 0;
        this.hi = elements.length-1;
    }
    //私有构造方法用于分支调用
    private Merge(E[] elements, E[] aux, Comparator<E> comparator, int lo, int hi) {
        this.elements = elements;
        this.aux = aux;
        this.comparator = comparator;
        this.lo = lo;
        this.hi = hi;
    }

    //单线程归并
    public void sort(){
        merge(elements,aux,lo,hi);
    }
    //多线程并行归并
    public void parallelSort(){
        ForkJoinPool forkJoin = (ForkJoinPool) Executors.newWorkStealingPool();
        try {
            forkJoin.submit(this).get();//提交任务并获取等待结束
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        forkJoin.shutdown();
    }

    @Override
    protected void compute() {
        if(hi - lo <= 10000) merge(elements,aux,lo,hi);
        else{
            //分成两部分,分别进行归并操作
            int mid = lo + (hi-lo)/2;
            Merge<E> mergeTask1 = new Merge<>(elements,aux,comparator,lo,mid);
            Merge<E> mergeTask2 = new Merge<>(elements,aux,comparator,mid+1,hi);
            //mergeTask1.fork(); //分支(异步): 子任务交给其他线程处理
            //mergeTask2.fork();
            //mergeTask1.join(); //获取(同步): 阻塞当前线程并等待获取结果
            //mergeTask2.join();
            invokeAll(mergeTask1,mergeTask2); //批量提交子任务,会阻塞到结果返回: 与fork不同,mergeTask1会分配给当前线程
            inPlaceMerge(elements,lo,mid,hi);
        }
    }

    //归并排序(对arr数组[lo,hi]部分进行归并排序)
    private void merge(E[] arr,E[] aux,int lo,int hi){
        for(int i=lo;i<=hi;++i)
            aux[i] = arr[i];
        for(int i=1,len=hi-lo+1;i<len;i<<=1)
            for(int j = lo;j<=hi-i;j += i<<1)
                inPlaceMerge(arr,j,j+i-1,Math.min(j+(i<<1)-1,hi));

    }
    private void inPlaceMerge(E[] elements,int lo,int mid,int hi){
        for(int i=lo;i<=hi;++i)
            aux[i] = elements[i];
        int i = lo,j = mid+1;
        for(int k=lo;k<=hi;++k) {
            if (i > mid) elements[k] = aux[j++];
            else if (j > hi) elements[k] = aux[i++];
            else if (((Comparable)aux[i]).compareTo(aux[j]) > 0) elements[k] = aux[j++];
            else elements[k] = aux[i++];
        }
    }
}
  1. 示例: 多线程快排
public class Quick<E> extends RecursiveAction{
    private E[] elements;
    private Comparator<E> comparator; //比较器
    private int lo;
    private int hi;

    public Quick(E[] elements, Comparator<E> comparator) {
        this.elements = elements;
        this.comparator = comparator;
        this.lo = 0;
        this.hi = elements.length - 1;
    }
    public Quick(E[] elements) {
        this.elements = elements;
        this.comparator = (E x,E y) -> {
            if(x instanceof Comparable) return ((Comparable) x).compareTo(y);
            else throw new RuntimeException("需要传入Comparator或者实现Comparable");
        };
        this.lo = 0;
        this.hi = elements.length - 1;
    }
    private Quick(E[] elements,int lo,int hi){
        this.elements = elements;
        this.lo = lo;
        this.hi = hi;
    }

    //快排:单线程
    public void sort(){ quickMedian(elements,lo,hi); }
    //快排:多线程并行
    public void parallelSort(){
        ForkJoinPool forkJoin = (ForkJoinPool) Executors.newWorkStealingPool();
        try {
            forkJoin.submit(this).get();//提交任务并获取等待结束
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        forkJoin.shutdown();
    }

    @Override
    protected void compute() {
        if(hi - lo <= 10000) quickMedian(elements,lo,hi);
        else{
            //分切
            int j = partitionMedian(elements,lo,hi);
            //剩余两部分作为两个分支任务放入ForkJoinPool
            Quick<E> left = new Quick<>(elements,lo,j-1);
            Quick<E> right = new Quick<>(elements,j+1,hi);
            invokeAll(left,right);
        }
    }

    //快速排序: 对[lo,hi]部分进行快速排序
    private void quickMedian(E[] arr,int lo,int hi){
        int len = hi-lo+1;
        if(len <= 10){ //1. 数组规模小于10时使用插入排序
            insertion(arr,lo,hi);
            return;
        }
        int j = partitionMedian(arr,lo,hi);
        quickMedian(arr,lo,j-1);
        quickMedian(arr,j+1,hi);
    }
    private int partitionMedian(E[] arr,int lo,int hi){
        int len = hi-lo+1;
        setMedian(arr,lo,hi); //2. 从lo,hi,中间处mid三个数中找出中位数与lo交换,最大数与hi交换
        E v = arr[lo]; //分切元素
        int i = lo, j = hi+1;
        while(true){
            while(((Comparable)v).compareTo(arr[--j]) < 0);
            while(((Comparable)arr[++i]).compareTo(v) < 0); // if(i == j) break; 取中位数时,每次最大数与hi交换,因此不需要进行边界判断
            if(i >= j) break;
            //交换
            exch(arr,i,j);
        }
        exch(arr,lo,j);
        return j;
    }
    private void setMedian(E[] arr,int lo,int hi){
        int mid = (lo+hi)/2;
        int max = lo;
        if(((Comparable)arr[max]).compareTo(arr[mid]) < 0) max = mid;
        if(((Comparable)arr[max]).compareTo(arr[hi]) < 0) max = hi;
        exch(arr,max,hi);
        if(((Comparable)arr[lo]).compareTo(arr[mid]) < 0) exch(arr,lo,mid);
    }
    private void exch(E[] arr,int i,int j){
        if(i == j) return;
        E temp = arr[i];
        arr[i] = arr[j];
        arr[j] = temp;
    }
    private void insertion(E[] arr,int lo,int hi){
        for(int i=lo+1;i<=hi;++i)
            for(int j=i;j>lo && ((Comparable)arr[j]).compareTo(arr[j-1]) < 0;--j)
                exch(arr,j,j-1);
    }
}

文章作者: Bryson
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Bryson !
评论
 上一篇
并发-线程池的使用 并发-线程池的使用
线程饥饿死锁 在线程池中,如果任务依赖于其他任务,那么可能产生死锁 饥饿: 一个线程在无限的等待另外一个或多个线程相互传递使用并且永不会释放的资源 死锁: 可以认为是两个线程或者进程在请求对方占有的资源 运行时间较长的任务 如果任务阻塞
2019-08-26
下一篇 
并发-取消与关闭 并发-取消与关闭
任务取消 取消 java中没有安全的将占方式来停止线程,只有一些协作式的机制 其中一种协作机制,设置某个”已请求取消标志”,而任务将定期的查看该标志,如果设置了这个标志,任务将提前结束,(对于存在阻塞方法的任务,可能永远不会检查到取消标志
2019-08-22
  目录