并行流水线

现在要生产一批小玩偶.小玩偶的制作分为四个步骤:

1.组装身体;2.在身体上安装四肢和头部;3.给组装完成的玩偶穿上一件漂亮的衣服;4.包装出 货。

为了加快制作进度,我们不可能叫四个人同时加工一个玩具,因为这四个步骤有着严 重的依赖关系。如果没有身体,就没有地方安装四肢;如果没有组装完成,就不能穿衣服; 如果没有穿上衣服,就不能包装发货。因此,找四个人来做一个玩偶是毫无意义的。

但是,如果你现在要制作的不是1个玩偶,而是1万个玩偶,那情况就不同了。你可以找四个人,第一个人只负责组装身体,完成后交给第二个人;第二个人只负责安装头部 和四肢,完成后交付第三人;第三人只负责穿衣服,完成后交付第四人:第四人只负责包 装发货。

这样所有人都可以一起工作,共同完成任务,而整个时间周期也能缩短到原来的 14左右,这就是流水线的思想。一旦流水线满载,每次只需要一步(假设一个玩偶需要四 步)就可以产生一个玩偶

clipboard.png

在多核或者分布式场景中,这种设计思路可以有效地将有依赖关系的操作分配在不同的线程中进行计算,尽可能利用多核优势.

示例:

P1:A=B+C P2:D=A×B P3:D=D/2 。上述步骤中的P1、P2和P3均在单独的线程中计算,并且每个线程只负责自己的工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// 信息载体
public class Msg {

public double i;
public double j;
public String orgStr = null;

}
// p1加法
public class Plus implements Runnable {

public static BlockingDeque<Msg> blockingDeque = new LinkedBlockingDeque<Msg>();

@Override
public void run() {
while (true) {
Msg msg = null;
try {
msg = blockingDeque.take();
msg.i = msg.j + msg.i;
Multiply.blockingDeque.add(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// p2乘法
public class Multiply implements Runnable {
public static BlockingDeque<Msg> blockingDeque = new LinkedBlockingDeque<Msg>();

@Override
public void run() {
while (true) {
Msg msg = null;
try {
msg = blockingDeque.take();
msg.i = msg.j * msg.i;
Div.blockingDeque.add(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// p3除法
public class Div implements Runnable {
public static BlockingDeque<Msg> blockingDeque = new LinkedBlockingDeque<Msg>();

@Override
public void run() {
while (true) {
Msg msg = null;
try {
msg = blockingDeque.take();
msg.i = msg.i / 2;
System.out.println(msg.orgStr + "=" + msg.i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 主程序
public class PStreamMain {
public static void main(String args[]) {

new Thread(new Plus()).start();
new Thread(new Multiply()).start();
new Thread(new Div()).start();

for (int i = 0; i <= 1000; i++) {
for (int j = 0; j <= 1000; j++) {
Msg msg = new Msg();
msg.i = i;
msg.j = j;
msg.orgStr = "((" + i + "+" + j + ")*" + i + ")/2";
Plus.blockingDeque.add(msg); // 开启流水线的计算
}
}
}
}

并性搜索

就是将数据分段进行检索;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* 适用场景:有很多系统变量需要查询,或者查询redis这样数据量大得list
* web适用场景比较少,但是作为平台软件是很常用的
*/
public class SearchDemo {
//定义我们需要查询的无序数组
static int[] arr={1,22,2,3,4,5,344,6,7,8,10,9};
//定义线程池数据,已经存放结果的Result
static ExecutorService pool= Executors.newCachedThreadPool();
static final int thread_num=2;
static AtomicInteger result=new AtomicInteger(-1);//初始值定位-1

public static int search(int searchValue ,int beginPos,int endPos){
int j=0;
for (j = beginPos; j <endPos ; j++) {
if (result.get()>=0){
return result.get();
}

if (arr[j]==searchValue){
//如果设置失败,表示其他现场已经先找到了
if (!result.compareAndSet(-1,j)){
//如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
//-1当前值为-1就返回
return result.get();
}
return j;
}

}
return -1;
}

public static class SearchTask implements Callable<Integer>{
int begin,end,searchValue;
public SearchTask(int searchValue,int begin,int end){
this.begin=begin;
this.end=end;
this.searchValue=searchValue;
}

public Integer call() throws Exception {
int re=search(searchValue,begin,end);
return re;
}
}

public static int psearch(int searchValue) throws InterruptedException,ExecutionException{
int subArrSize=arr.length/thread_num+1;
List<Future<Integer>> re=new ArrayList<Future<Integer>>();

for (int i = 0; i <arr.length ; i+=subArrSize) {
int end=i+subArrSize;
if (end>=arr.length) end=arr.length;
re.add(pool.submit(new SearchTask(searchValue,i,end)));
}
for (Future<Integer> fu:
re) {
if (fu.get()>=0) return (fu.get());

}
return -1;
}

public static void main(String[] args) {
try {
System.out.println(psearch(9));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

并行排序

奇偶交换排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 奇偶交换排序
* 奇交换:比较奇数索引以及其相邻的后续元素。
* 偶交换:比较偶数索引和其相邻的后续元素。
* 两种交换成对出现,保证比较和交换涉及到数组中的每一个元素。
* @author wsz
* @date 2018年1月4日
*/
public class OddEvenSort {

static int[] arr= {25,48,65,87,123,233,456,666,777,8999,55555};

static ExecutorService pool = Executors.newCachedThreadPool();

static int exchFlag = 1; //记录当前迭代是否发生了数据交换

public static synchronized int getExchFlag() {
return exchFlag;
}

public static synchronized void setExchFlag(int exchFlag) {
OddEvenSort.exchFlag = exchFlag;
}

public static void pOddEvenSort(int[] arr) throws InterruptedException {
int start = 0; //记录交换类型。0为偶交换,1为奇交换

//如果上次比较发生了数据交换,或者当前正在进行奇交换,循环不会停止;
//直到程序不再发生交换,或者当前进行的是偶交换,即奇偶交换已经成对出现。
while(getExchFlag() == 1 || start ==1) {
setExchFlag(0);
//偶数的数值长度,当start=1时,只有len/2-1个线程。倒计时线程数。
CountDownLatch latch = new CountDownLatch(arr.length/2 -(arr.length%2 == 0 ? start :0));
for(int i =start; i < arr.length-1; i+=2) {
pool.submit(new OddEvenSortTask(i,latch));
}
latch.await();//等待所有线程结束
start = start ==0 ? 1 : 0;
}
}

static class OddEvenSortTask implements Runnable{
int i;
CountDownLatch latch;

public OddEvenSortTask(int i, CountDownLatch latch) {
super();
this.i = i;
this.latch = latch;
}

@Override
public void run() {
if(arr[i] > arr[i+1]) {
int temp = arr[i];
arr[i] = arr[i+1];
arr[i+1] = temp;
setExchFlag(1); //数据进行了交换
}
latch.countDown();//结束当前线程的任务,倒计时器-1
}
}

public static void main(String[] args) throws InterruptedException {
pOddEvenSort(arr);
for (int i : arr) {
System.out.print(i+" ");
}
}
}

希尔排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
* 并行情况下的希尔排序
*/
public class ShellSortS {

static int[] arr= {25,48,65,87,123,233,456,666,777,8999,55555};

static ExecutorService pool = Executors.newCachedThreadPool();

static class ShellSortTask implements Runnable{

int i = 0;
int h = 0;
CountDownLatch latch;

public ShellSortTask(int i, int h, CountDownLatch latch) {
super();
this.i = i;
this.h = h;
this.latch = latch;
}

@Override
public void run() {
if(arr[i] < arr[i-h]) {
int temp = arr[i];
int j = i - h;
while(j >= 0 && arr[j] > temp) {
arr[j+h] = arr[j];
j -= h;
}
arr[j+h] = temp;
}
latch.countDown();
}

}

public static void pShellSort(int[] arr) throws InterruptedException {
int h = 1;
CountDownLatch latch = null;
while( h <= arr.length/3) {
h = h*3+1;
}
while(h > 0) {
if(h >= 4)
latch = new CountDownLatch(arr.length - h);
for(int i = h; i < arr.length; i++) {
if(h >= 4) { // h大于4时使用并行线程
pool.execute(new ShellSortTask(i, h, latch));
}else { // h小于4时退化为传统的插入排序
if(arr[i] < arr[i-h]) {
int temp = arr[i];
int j = i -h;
while(j >= 0 && arr[j] > temp) {
arr[j+h] = arr[j];
j -= h;
}
arr[j+h] = temp;
}
}
}
latch.await();
h = (h-1) / 3;
}
}

public static void main(String[] args) throws InterruptedException {
pShellSort(arr);
for (int i : arr) {
System.out.print(i+" ");
}
}

}