并行流水线
现在要生产一批小玩偶.小玩偶的制作分为四个步骤:
1.组装身体;2.在身体上安装四肢和头部;3.给组装完成的玩偶穿上一件漂亮的衣服;4.包装出 货。
为了加快制作进度,我们不可能叫四个人同时加工一个玩具,因为这四个步骤有着严 重的依赖关系。如果没有身体,就没有地方安装四肢;如果没有组装完成,就不能穿衣服; 如果没有穿上衣服,就不能包装发货。因此,找四个人来做一个玩偶是毫无意义的。
但是,如果你现在要制作的不是1个玩偶,而是1万个玩偶,那情况就不同了。你可以找四个人,第一个人只负责组装身体,完成后交给第二个人;第二个人只负责安装头部 和四肢,完成后交付第三人;第三人只负责穿衣服,完成后交付第四人:第四人只负责包 装发货。
这样所有人都可以一起工作,共同完成任务,而整个时间周期也能缩短到原来的 14左右,这就是流水线的思想。一旦流水线满载,每次只需要一步(假设一个玩偶需要四 步)就可以产生一个玩偶
在多核或者分布式场景中,这种设计思路可以有效地将有依赖关系的操作分配在不同的线程中进行计算,尽可能利用多核优势.
示例:
P1:A=B+C P2:D=A×B P3:D=D/2 。上述步骤中的P1、P2和P3均在单独的线程中计算,并且每个线程只负责自己的工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| public class Msg {
public double i; public double j; public String orgStr = null;
}
public class Plus implements Runnable {
public static BlockingDeque<Msg> blockingDeque = new LinkedBlockingDeque<Msg>();
@Override public void run() { while (true) { Msg msg = null; try { msg = blockingDeque.take(); msg.i = msg.j + msg.i; Multiply.blockingDeque.add(msg); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public class Multiply implements Runnable { public static BlockingDeque<Msg> blockingDeque = new LinkedBlockingDeque<Msg>();
@Override public void run() { while (true) { Msg msg = null; try { msg = blockingDeque.take(); msg.i = msg.j * msg.i; Div.blockingDeque.add(msg); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public class Div implements Runnable { public static BlockingDeque<Msg> blockingDeque = new LinkedBlockingDeque<Msg>();
@Override public void run() { while (true) { Msg msg = null; try { msg = blockingDeque.take(); msg.i = msg.i / 2; System.out.println(msg.orgStr + "=" + msg.i); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public class PStreamMain { public static void main(String args[]) {
new Thread(new Plus()).start(); new Thread(new Multiply()).start(); new Thread(new Div()).start(); for (int i = 0; i <= 1000; i++) { for (int j = 0; j <= 1000; j++) { Msg msg = new Msg(); msg.i = i; msg.j = j; msg.orgStr = "((" + i + "+" + j + ")*" + i + ")/2"; Plus.blockingDeque.add(msg); } } } }
|
并性搜索
就是将数据分段进行检索;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
|
public class SearchDemo { static int[] arr={1,22,2,3,4,5,344,6,7,8,10,9}; static ExecutorService pool= Executors.newCachedThreadPool(); static final int thread_num=2; static AtomicInteger result=new AtomicInteger(-1); public static int search(int searchValue ,int beginPos,int endPos){ int j=0; for (j = beginPos; j <endPos ; j++) { if (result.get()>=0){ return result.get(); } if (arr[j]==searchValue){ if (!result.compareAndSet(-1,j)){ return result.get(); } return j; } } return -1; } public static class SearchTask implements Callable<Integer>{ int begin,end,searchValue; public SearchTask(int searchValue,int begin,int end){ this.begin=begin; this.end=end; this.searchValue=searchValue; } public Integer call() throws Exception { int re=search(searchValue,begin,end); return re; } } public static int psearch(int searchValue) throws InterruptedException,ExecutionException{ int subArrSize=arr.length/thread_num+1; List<Future<Integer>> re=new ArrayList<Future<Integer>>(); for (int i = 0; i <arr.length ; i+=subArrSize) { int end=i+subArrSize; if (end>=arr.length) end=arr.length; re.add(pool.submit(new SearchTask(searchValue,i,end))); } for (Future<Integer> fu: re) { if (fu.get()>=0) return (fu.get()); } return -1; } public static void main(String[] args) { try { System.out.println(psearch(9)); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
|
并行排序
奇偶交换排序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
|
public class OddEvenSort {
static int[] arr= {25,48,65,87,123,233,456,666,777,8999,55555};
static ExecutorService pool = Executors.newCachedThreadPool();
static int exchFlag = 1;
public static synchronized int getExchFlag() { return exchFlag; }
public static synchronized void setExchFlag(int exchFlag) { OddEvenSort.exchFlag = exchFlag; }
public static void pOddEvenSort(int[] arr) throws InterruptedException { int start = 0;
while(getExchFlag() == 1 || start ==1) { setExchFlag(0); CountDownLatch latch = new CountDownLatch(arr.length/2 -(arr.length%2 == 0 ? start :0)); for(int i =start; i < arr.length-1; i+=2) { pool.submit(new OddEvenSortTask(i,latch)); } latch.await(); start = start ==0 ? 1 : 0; } }
static class OddEvenSortTask implements Runnable{ int i; CountDownLatch latch;
public OddEvenSortTask(int i, CountDownLatch latch) { super(); this.i = i; this.latch = latch; }
@Override public void run() { if(arr[i] > arr[i+1]) { int temp = arr[i]; arr[i] = arr[i+1]; arr[i+1] = temp; setExchFlag(1); } latch.countDown(); } }
public static void main(String[] args) throws InterruptedException { pOddEvenSort(arr); for (int i : arr) { System.out.print(i+" "); } } }
|
希尔排序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
|
public class ShellSortS {
static int[] arr= {25,48,65,87,123,233,456,666,777,8999,55555};
static ExecutorService pool = Executors.newCachedThreadPool();
static class ShellSortTask implements Runnable{
int i = 0; int h = 0; CountDownLatch latch;
public ShellSortTask(int i, int h, CountDownLatch latch) { super(); this.i = i; this.h = h; this.latch = latch; }
@Override public void run() { if(arr[i] < arr[i-h]) { int temp = arr[i]; int j = i - h; while(j >= 0 && arr[j] > temp) { arr[j+h] = arr[j]; j -= h; } arr[j+h] = temp; } latch.countDown(); }
}
public static void pShellSort(int[] arr) throws InterruptedException { int h = 1; CountDownLatch latch = null; while( h <= arr.length/3) { h = h*3+1; } while(h > 0) { if(h >= 4) latch = new CountDownLatch(arr.length - h); for(int i = h; i < arr.length; i++) { if(h >= 4) { pool.execute(new ShellSortTask(i, h, latch)); }else { if(arr[i] < arr[i-h]) { int temp = arr[i]; int j = i -h; while(j >= 0 && arr[j] > temp) { arr[j+h] = arr[j]; j -= h; } arr[j+h] = temp; } } } latch.await(); h = (h-1) / 3; } }
public static void main(String[] args) throws InterruptedException { pShellSort(arr); for (int i : arr) { System.out.print(i+" "); } }
}
|