
Fork/Join 模式看作并行版本的 Divide and Conquer 策略,仅仅关注如何划分任务和组合中间结果,将剩下的事情丢给 Fork/Join 框架。fork-join任务图但是Fork/Join并行计算框架,并不是银弹,并不能解决所有应用程序在超多核心处理器上的并发问题。
其本质上就是分治法的典型实现,将一个复杂的任务分解为若干个子任务,然后结合各个子任务,形成最终的结果信息。
比如计算1+2+。fork-join任务图。+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。Fork/Join的运行流程图如下:

言而总之,就是分治法的经典实现案例。
2. 工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:


那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
3. 主要的组成类
Fork/Join使用两个类来完成以上两件事情:
ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:
RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。
ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。其本身也实现了ExecutorService的接口,提供了类似Executor的服务。
3. 案例介绍

计算给定函数y=1/x 在定义域 [1,100]上围城与X轴围成的面积,计算步长0.01
首先我们来分析一下, 计算面积是目标,我们按照步长0.01的维度来做为计算单元;每个步长所产生的面积由两个部分组成: 一个矩形和一个三角形。 矩形的边长为: 0.01(宽), 1/(x+0.01) (x为起点坐标)。在矩形上方有一个三角形,其面积为(1/x - 1/(x+0.01), 另一个边为0.01, 具体信息如下所示:

4. 算法设计
我们选定RecursiveTask做为定义的对象实体,其中定义了我们最为核心的逻辑,且我们需要分解task之后的单个结果;然后将其放入ForkJoinPool队列中。
在这里,我们需要充分利用ForkJoin的特性,对任务进行分解和迭代。
5. 代码实现
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 计算给定函数 y=1/x 在定义域 [1,100]上围城与X轴围成的面积,计算步长0.01
*
* @author junfengchen
*
*/
public class AreaCalc {
public static class NodeArea extends RecursiveTask<Float> {
private static final long serialVersionUID = -8437583391117009271L;
public static final float THRESHOLD = 0.01f;
public static final int SLIC_NUM = 10;
public static AtomicInteger counter = new AtomicInteger(0);
private float startLoc = 0.0f;
private float endLoc = 0.0f;
public NodeArea(float startLoc, float endLoc) {
this.startLoc = startLoc;
this.endLoc = endLoc;
}
@Override
protected Float compute() {
float sum = 0.0f;
boolean canCompute = (endLoc - startLoc)<= THRESHOLD;
if (canCompute) {
sum = calc(startLoc, endLoc);
counter.incrementAndGet();
System.out.println(counter + "--> calc result between " + (startLoc ) + " -- " + (endLoc));
}
else {
float step = (endLoc - startLoc)/SLIC_NUM;
for (int i=0; i<SLIC_NUM; i++) {
NodeArea task = new NodeArea(startLoc + i*step, startLoc+ (i+1)*step);
task.fork();
sum += task.join();
}
}
return sum;
}
private float calc(float startXLoc, float endXLoc) {
float rectArea = 1/(endXLoc) * THRESHOLD;
float trigleArea = 0.5f * (1/startXLoc - 1/endXLoc)*0.01f;
return rectArea + trigleArea;
}
}
public static void main(String[] args) {
NodeArea nodeA1 = new NodeArea(1, 100);
ForkJoinPool pool = new ForkJoinPool();
Future<Float> result = pool.submit(nodeA1);
long startPoint = System.currentTimeMillis();
try {
System.out.println("Final Area:" + result.get());
System.out.println("The whole process consumes " + (System.currentTimeMillis() - startPoint) + " ms");
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
代码执行结果展示:

这里有个问题,是关于计算次数;常规来说基于0.01的步长,从1到100,总共应该计算9900次,但是由于精度的问题,在某些情况下会出现在某一个区间多次计算的问题,比如如下图示:
本文来自电脑杂谈,转载请注明本文网址:
http://www.pc-fly.com/a/jisuanjixue/article-40536-1.html
新老板当然有些失落
婚姻制度是封建社会的残余