上节我讲了枚举排序的Erlang实现:Erlang实战:并行枚举排序,大家有兴趣可以看看!本节我将用Erlang多进程方式实现快速排序,快速排序采用的是分治的思想,即将原问题分解为若干个规模更小但结构与原问题相似的子问题。递归地解这些子问题,然后将这些子问题的解组合为原问题的解。通过对比快速排序的串行算法,我们将此串行算法改进为并行算法。
首先,来看看快速排序的串行算法:
- 从数列中挑出一个元素,称为 "基准"(pivot);
- 重新排序数列,所有元素比基准值小的摆放在基准前面,所有元素比基准值大的摆在基准的后面(相同的数可以到任一边)。在这个分割结束之后,该基准就处于数列的中间位置。这个称为分割(partition)操作;
- 递归地(recursive)把小于基准值元素的子数列和大于基准值元素的子数列排序。
此思想网上到处都有,现在来看看快速排序的并行算法:
其中的一个简单思想是,对每次划分过后得到的两个序列分别使用两个处理器完成递归排序。例如对一个长为n的序列,首先划分得到两个长为n/2的序列,将其交给两个处理器分别处理;而后进一步划分得到四个长为n/4的序列,再分别交给四个处理器处理:如此递归下去最终得到排序好的序列。
当然,这里是理想的划分情况,如果划分步骤不能达到平均分配的目的,那么排序效率会相对较差。
跟上节讲得并行枚举排序进程调用的区别在于,枚举排序一次性将数据传给所有节点进程,而快速排序进程调用在分割数据的过程中呈现的是树状形式。
并行快排的伪代码如下:
1 //快速排序的并行算法
2
3 输入:无序数组data[1,n],使用的处理器个数为2^m
4 输出:有序数组data[1,n]
5
6 Begin
7 para_quicksort(data,1,n,m,0)
8 End
9
10 procedure para_quicksort(data,i,j,m,id)
11 Begin
12 if (j-i) <&#61;k or m&#61;0 then
13 Pid call quicksort(data,i,j)
14 else
15 Pid:r &#61; patition(data,i,j)
16 Pid send data[r&#43;1,m-1] to Pid&#43;2 ^(m-1)
17 para_quicksort(data,i,r-1,m-1,id)
18 par_quicksort(data,r&#43;1,j,m-1,id&#43;2^&#xff08;m-1) )
19 Pid&#43;2 ^ &#xff08;m-1) send data[r&#43;1,m-1] back to Pid
20 end if
21 End
现在&#xff0c;就来看看Erlang代码实现并行快排&#xff1a;
设置启动接口&#xff0c;启动多个进程之后&#xff0c;在总控端调用Dict &#61; store([&#39;node1&#64;pc1305&#39;,&#39;node2&#64;pc1305&#39;]...)存储各节点进程的字典表&#xff0c;然后调用start([3,4,512,1,2,5,……],2,Dict)进行快排&#xff0c;其中第二参数表示需要最多2^2&#61;4个进程来完成任务&#xff0c;总控端也作为一个排序的节点。
1 %% 启动排序时&#xff0c;需存储各节点信息&#xff1a;使用Dict &#61; store([&#39;node1&#64;pc1305&#39;,&#39;node2&#64;pc1305&#39;]...)方法
2 %%然后&#xff0c;便可启动start(Data,M,Dict) 进行排序,M为启动的进程个数指标&#xff0c;Dict为上述存储的节点
3
4 -module(para_qsort).
5 -export([para_qsort/3,start/3,store/1,lookup/2]).
6
7 store(L) -> store(L,[]).
8
9 store([],Ret) -> Ret;
10 store(L,Ret) ->
11 Value &#61; lists:nth(1,L),
12 Key &#61; length(Ret)&#43;1,
13 io:format("Key&#61;~p Value&#61;~p~n",[Key,Value]),
14 New &#61; [{Key,Value} | Ret],
15 store(lists:delete(Value,L) , New).
16
17 lookup(Key,Dict) ->
18 {K,V} &#61; lists:nth(1,Dict),
19 if
20 K &#61;:&#61; Key ->21 V;
22 K &#61;/&#61; Key ->
23 Filter &#61; lists:delete({K,V},Dict),
24 lookup(Key,Filter)
25 end.
26
27 start(Data,M,Dict) ->
28 register(monitor, spawn(fun() -> wait([]) end)),
29 put(monitor, node()),
30 NewDict &#61; [{monitor, get(monitor)} | Dict],
31 para_qsort(Data,M,NewDict).
wait/1函数主要对各节点排序好的结果进行归并整理&#xff0c;然后输出最终结果&#xff0c;代码如下&#xff1a;
1 %%服务器节点监听
2 wait(Ret) ->
3 Len1 &#61; parser(Ret)&#43;length(Ret)-1,
4 Len2 &#61; len(Ret),
5 if
6 (Len1 &#61;:&#61; Len2) and (Len2 &#61;/&#61;0) ->
7 SortRet&#61; merge(Ret,[],1),
8 io:format("Para qsort result:~n",[]),
9 io:format("~p~n",[SortRet]);
10 (Len1 &#61;/&#61; Len2) or (Len2 &#61;:&#61; 0) ->
11 receive
12 {Node,Pid,L} ->
13 {Data,I,J} &#61; L,
14 Temp &#61; [{Node,Data,I,J}|Ret],
15 wait(Temp)
16 end;
17
18 die ->
19 quit
20 end.
21
22 len([]) -> 0;
23 len([H|T])->
24 {Node,Data,I,J} &#61; H,
25 length(Data).
26
27 parser([]) -> 0;
28 parser([H|T]) ->
29 {Node,Data,I,J}&#61;H,
30 J-I&#43;1&#43;parser(T).
31
32 merge([],Ret,Len) -> Ret;
33 merge(T,Ret,Start) ->
34 H &#61; lists:nth(1,T),
35 {Node,Data,I,J} &#61; H,
36 if
37 I &#61;:&#61; Start ->
38 ListBetween &#61; lists:sublist(Data,I, J-I&#43;1),
39 case (I&#61;:&#61;1) of
40 true ->
41 Temp &#61; lists:append(Ret,ListBetween);
42 false ->
43 Pivo &#61; lists:append(Ret,[lists:nth(I-1,Data)]),
44 Temp &#61; lists:append(Pivo,ListBetween)
45 end,
46 io:format("~n-----<<~p >> processing data[~p,~p]~n-------------------------Result&#61;~p~n",[Node,I,J,Temp]),
47 Len &#61; Start&#43;J-I&#43;2,
48 NewData &#61; lists:delete(H,T),
49 merge(NewData,Temp,Len);
50 I &#61;/&#61; Start ->
51 NewData &#61; lists:delete(H,T),
52 Temp &#61; lists:append(NewData,[H]),
53 merge(Temp,Ret,Start)
54 end.
para_qsort完成对排序任务的分发&#xff0c;代码如下&#xff1a;
1 para_qsort(Data,M,Dict) ->
2 %%进程个数最多为2^M次方
3 I &#61; 1,
4 J &#61; length(Data),
5 ID &#61; 0,
6 para_qsort(Data,I,J,M,ID,Dict).
7
8 para_qsort(Data,I,J, M, ID,Dict) ->
9 %% 阀值&#xff0c;列表排序个数小于K时直接调用qsort直接快排
10 K &#61; 4,
11 Value1 &#61; (J-I) < K,
12 Value2 &#61; (M &#61;:&#61; 0),
13 Value &#61; Value1 or Value2,
14 if
15 Value &#61;:&#61; true ->
16 %io:format("~nCurrent node: (~p) ~nData&#61; ~p I&#61;~p J&#61;~p~n",[node(),Data,I,J]),
17 ListBefore &#61; lists:sublist(Data, I-1),
18 ListBetween &#61; lists:sublist(Data,I, J-I&#43;1),
19 ListAfter &#61; lists:sublist(Data,J&#43;1,length(Data)-J),
20 Ret &#61; lists:sort(ListBetween),
21 Temp &#61; lists:append(ListBefore,Ret),
22 NewData &#61; lists:append(Temp, ListAfter),
23 {monitor, lookup(monitor,Dict) } ! {node(),self(),{NewData,I,J} },
24 io:format("-------------------------------------------------------------");
25 Value &#61;:&#61; false ->
26 {NewData,R} &#61; partition(Data,I,J),
27 send(NewData, R&#43;1, J, M-1,ID &#43; math:pow(2 ,(M-1)), Dict ),
28 para_qsort(NewData,I,R-1,M-1,ID, Dict)
29 end.
其中patition用于将data按基准值划分为两部分&#xff0c;代码如下&#xff1a;
1 partition(Data, K, L) ->
2 Pivo&#61; lists:nth(L, Data),
3 ListBefore &#61; lists:sublist(Data, K-1),
4 ListBetween &#61; lists:sublist(Data,K, L-K),
5 ListAfter &#61; lists:sublist(Data,L&#43;1,length(Data)-L),
6 Left &#61; [X|| X <- ListBetween,X &#61;<Pivo],
7 Right &#61; [X || X <- ListBetween,X > Pivo],
8 Ret &#61; lists:append(Left,[Pivo|Right]),
9 Temp &#61; lists:append(ListBefore,Ret),
10 NewData &#61; lists:append(Temp, ListAfter),
11 Len &#61; length(ListBefore)&#43;length(Left)&#43;1,
12 {NewData, Len}.
任务分割完后&#xff0c;会将此任务发送给下个节点进程进行处理&#xff0c;send代码如下&#xff1a;
1 send( Data, I, J, M,No ,Dict) ->
2 ID &#61; trunc(No),
3 Node &#61; lookup(ID,Dict),
4 Pid &#61; spawn(Node, fun() -> loop() end),
5 Pid ! {node(),self(), {Data,I,J,M,ID,Dict}}.
6
7 %%客户节点N准备接受排序数据
8 loop() ->
9 receive
10 {Node,Pid,die} ->
11 disconnect_node(Node),
12 io:format("Node (~p) disconnected~n",[Node]),
13 quit;
14 {Node,Pid,L} ->
15 {Data,I,J,M,ID,Dict} &#61; L,
16 %io:format("4----:Current node:~p server node:~p~n",[node(),lookup(monitor,Dict)]),
17 para_qsort(Data,I,J,M,ID,Dict)
18 %loop()
19 end.
思想比较简单&#xff0c;我只是没对它进行进一步优化&#xff0c;最终运行结果如下&#xff1a;
大家可以看到&#xff0c;此data的划分就是不均匀的&#xff0c;node1、node3只处理了一个数据&#xff0c;node2处理了3个数据&#xff0c;其余的都是server处理的&#xff0c;因此排序的效率问题很差。此程序只是展示如何使用Erlang实现并行快排&#xff0c;并不是学习如何优化问题。
大致就说这么多了&#xff0c;以后我将实现PSRS并行算法的Erlang实现&#xff0c;这个算法Erlang实现起来比较麻烦&#xff0c;大家可以先去看看。
注&#xff1a;如无特殊说明&#xff0c;本博客内容纯属原创&#xff0c;转载请注明&#xff1a;http://www.cnblogs.com/itfreer/ | IT闲人&#xff0c;谢谢&#xff01;