本文共 2444 字,大约阅读时间需要 8 分钟。
“分而治之”是处理大数据的方法,著名的MapReduce就是采用这种分而治之的思路,简单点说,如果要处理1000个数据,但是不具备处理1000个数据的能力,可以只处理10个数据,可以把1000个数据分阶段处理100次,每次处理10个,把100次的处理结果进行合成,形成最后这1000个数据。
把大任务调用fork()方法分解成小的任务,把小的任务结果进行join()合并为大任务的结果
系统还对ForkJoinPool线程池进行了优化,提交的任务数量与线程的数量不一定是一对一的关系,在多数情况下一个物理线程实际上需要处理多个逻辑任务。
ForkJoinPool就是线程池中最常用的方法就是:
publicForkJoinTask submit(ForkJoinTask task)
向线程池提交一个ForkJoinTask,支持fork()与Join()等待任务。ForkJoinTask有两个重要的子类RecursiveAction 和RecursiveTask,他们的区别在于RecursiveAction任务没有返回值,而RecursiveTask带有返回值
演示ForkJoinPool线程池的使用
package com;import java.util.ArrayList;import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;import java.util.concurrent.RecursiveTask;public class Text10 { private static class Contask extends RecursiveTask{ private static final int Threshold=10000;//数据阈值 private static final int TaskSum=100;//定义每次把大人物分解成100个小任务 private long start;//计算起始值 private long end; public Contask(long start,long end) { this.start=start; this.end=end; } @Override protected Long compute() { long sum=0; //当数量超过阈值就继续分解 if(end-start list=new ArrayList<>(); long pos=start; for (int i = 0; i < TaskSum; i++) { long lastOne=pos+step;//每个任务结束位置 if(lastOne>end) { lastOne=end; } //创建子任务 Contask contask=new Contask(pos,lastOne); //把任务添加到集合 list.add(contask); //调用fork提交子任务 contask.fork(); //调整下个任务起始位置 pos+=step+1; //等待所有子任务结束后 计算结果 } for (Contask task:list) { sum+=task.join();//join会一直等待子任务执行完毕返回结果 } } return sum; } } public static void main(String[] args) throws ExecutionException, InterruptedException { //创建线程池 ForkJoinPool forkJoinPool=new ForkJoinPool(); //创建一个大的任务 Contask contask=new Contask(0L,200000L); //把大人物提交给线程池 ForkJoinTask result=forkJoinPool.submit(contask); System.out.println("结果为"+result.get()); }}
代码的意思是把0-20000之间的数字进行求和计算,把20000个数字分成了100组任务,每组任务进行2000个数字求和计算并提交,全部计算完返回结果值
转载地址:http://zfakz.baihongyu.com/