Fork/Join并行计算框架

Fork/Join并行计算框架

简介

Fork/Join框架:在必要时将一个大任务进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个

小任务的运行结果进行join合并,一般都是在大数据搜索中使用

Fork/Join采用“工作窃取” 模式(work-stealing) :

当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中。在程序执行过程中如果有闲置线程,闲置线程会把其他线程的线程队列末尾的任务偷偷拿到自己的线程中执行,注意线程队列是双关,即队列里最前面的任务会被直接,最后的任务会被闲置线程拿走去执行。这种方式减少了线程的等待时间,提高了性能

avatar

具体使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class Test1 {
@Test
public void test01() {
//需求:计算1-10000000L之和

//创建并发线程池对象
ForkJoinPool pool = new ForkJoinPool();
//提交任务并得到结果
Long sum = pool.invoke(new ForkJoinCalculateTask(0,10000000L));
System.out.println(sum);
}
}
//计算任务类
//class ForkJoinCalculateTask extends RecursiveAction{//无返回值的任务类
class ForkJoinCalculateTask extends RecursiveTask<Long>{//有返回值的任务类

private static final long serialVersionUID = 1975909145028788652L;
private long startNum;//开始数字
private long endNum;//结束数字
private static final long THRESHOLD = 10000;//临界值,每到10000就分成小任务

public ForkJoinCalculateTask(long startNum, long endNum) {
this.startNum = startNum;
this.endNum = endNum;
}

@Override
protected Long compute() {

long len = endNum-startNum;

if(len <= THRESHOLD){
long sum = 0;
for (long i = startNum; i <= endNum; i++) {
sum += i;
}
return sum;
}else{

long mid = (startNum+endNum)/2;

ForkJoinCalculateTask left = new ForkJoinCalculateTask(startNum, mid);
left.fork();//拆分成子任务并压入新队列

ForkJoinCalculateTask right = new ForkJoinCalculateTask(mid+1, endNum);
right.fork();//拆分成子任务并压入新队列

//把结果合并返回
return left.join() + right.join();
}
}
}