%% ファイル:amida.erl -module(amida). % -compile(export_all). -export([solve/1, solve_main/1, reduce_main/3]). -export([list_all/1, print_data/1, print_hist/1]). %%%% トレース用の定数/関数 -define(DETAILED, 3). -define(ORDINARY, 2). -define(ROUGH, 1). -define(TRACE_LEVEL, 0). trace(Level, Format, Args) -> if (Level =< ?TRACE_LEVEL) -> io:fwrite(Format, Args); true -> ok end. %%%% マネージャ・プロセス % % 1. 最初のワーカー・プロセスを生成。 % 2. ワーカー・プロセスからのデータ登録要求を受け付ける。 % 3. ワーカー・プロセスの完了通知を受け付ける。 % 4. 任意のクライアントからのデータ閲覧要求を受け付ける。 % solve(AmidaData) -> trace(?DETAILED, "~w: entered into solve, invoking manager.\n", [self()]), spawn(?MODULE, solve_main, [AmidaData]). % 戻りはマネージャ・プロセスPID solve_main(AmidaData) -> timer:sleep(200), % 表示が混じらないように少し休ませる trace(?DETAILED, "~w:(manager) invoking workers.\n", [self()]), % 最初のワーカー・プロセス spawn(?MODULE, reduce_main, [AmidaData, [AmidaData, start], self()]), receiving_loop(). % 待ち受けに入る receiving_loop() -> receive stop -> % プロセス終了コマンド io:fwrite("~w: bye bye.\n", [self()]), exit(ok); {register, WkrPid, Data} -> % ワーカー・プロセスからのデータ登録要求 WkrPid! {return, self(), do_register(Data)}; {finished, WkrPid, Hist} -> % ワーカー・プロセスの完了通知 put(WkrPid, Hist), io:fwrite("===>~w: ~w ~w\n", [WkrPid, lists:reverse(Hist), lists:sum(lists:nth(1, Hist))]); % lists:sum/1 が重さ {list, ClientPid} -> % 任意のクライアントからのデータ閲覧要求 ClientPid! {return, self(), get()}; Other -> % それ以外 io:fwrite("Oops! ~w is ignored.\n", [Other]) end, receiving_loop(). % 無限ループ %% データ登録のマネージャ側処理 do_register(Data) -> case get(Data) of undefined -> put(Data, 1), registered; N -> put(Data, N + 1), % 要求回数を記録 exists end. %%%% ワーカー・プロセス % % 1. 受け取ったデータをマネージャに登録。 % 2. 登録済みならただちに終了。 % 3. 登録成功なら処理を開始。 % 4. アルチン変形を実行して子ワーカーを生成、処理を任す。 % 5. 再帰的(リストを左から右)に処理を続ける。 % reduce_main(Data, Hist, MgrPid) -> trace(?ROUGH, "~w: Try to reduce; Data:~w.\n", [self(), Data]), case request_register(MgrPid, Data) of {ok, exists} -> % 既に登録済み trace(?ROUGH, "~w: Exists ~w, exit.\n", [self(), Data]), exit(ok); % 正常終了 {error, Msg} -> % エラー io:fwrite("~w: FATAL ERROR! ~s\n", [self(), Msg]), exit(ng); % 異常終了 {ok, registered} -> % 登録成功 reduce([], Data, Hist, MgrPid) % 処理開始 end. %% マネージャにデータ登録を依頼 request_register(MgrPid, Data) -> trace(?DETAILED, "~w: entered into request_register To:~w Data:~w.\n", [self(), MgrPid, Data]), MgrPid! {register, self(), Data}, receive {return, MgrPid, Value} -> trace(?DETAILED, "~w: request_register succeeded:~w.\n", [self(), Value]), {ok, Value}; _Other -> {error, "invalid message."} after 2*1000 -> {error, "timeout."} end. %%%% アルチン変形を行う関数 % % 第1引数が処理済み部分、第2引数が未処理部分、 % 第3引数は処理の履歴、第4引数はマネージャ・プロセス。 % % アルチン変形: % 1. 縮約(contraction) n, n → (ナシ) % 2. 交換(interchange) n, m → m, n ただし、nとmの差は2以上 % 3. フリップ(flip) n, m, n → m, n, m ただし、m = n±1 % %% 終了(再帰の基底部分) reduce(_ScanedData, [], Hist, MgrPid) -> trace(?ORDINARY, "~w: FINISHED.\n", [self()]), MgrPid!{finished, self(), Hist}, exit(ok); %% 縮約(contraction)による変形(プロセス生成+再帰ステップ) reduce(ScanedData, [N, M|RestData], Hist, MgrPid) when (N == M) % 適用条件 -> trace(?ORDINARY, "~w: contraction ~w, ~w => (none); Rest:~w.\n", [self(), N, M, RestData]), trace(?DETAILED, "~w: invoking new worker.\n", [self()]), spawn(?MODULE, reduce_main, [ScanedData ++ RestData, [ScanedData ++ RestData, cntr|Hist], MgrPid]), trace(?ORDINARY, "~w: after contraction; skip ~w, go next ~w.\n", [self(), N, [M|RestData]]), trace(?DETAILED, "~w: invoking new worker.\n", [self()]), reduce(ScanedData ++ [N], [M|RestData], Hist, MgrPid); %% 交換(interchange)による変形(プロセス生成+再帰ステップ) reduce(ScanedData, [N, M|RestData], Hist, MgrPid) when (N > M + 1 orelse N < M - 1) % 適用条件 -> trace(?ORDINARY, "~w: interchange ~w, ~w => ~w, ~w; Rest:~w.\n", [self(), N, M, M, N, RestData]), trace(?DETAILED, "~w: invoking new worker.\n", [self()]), spawn(?MODULE, reduce_main, [ScanedData ++ [M, N|RestData], [ScanedData ++ [M, N|RestData], ichg| Hist], MgrPid]), trace(?DETAILED, "~w: after interchange; skip ~w, go next ~w.\n", [self(), N, [M|RestData]]), reduce(ScanedData ++ [N], [M|RestData], Hist, MgrPid); %% フリップ(flip)による変形(プロセス生成+再帰ステップ) reduce(ScanedData, [N, M, L|RestData], Hist, MgrPid) when (N == L andalso (M == N + 1 orelse M == N - 1)) % 適用条件 -> trace(?ORDINARY, "~w: flip ~w, ~w, ~w => ~w, ~w, ~w; Rest:~w.\n", [self(), N, M, L, M, N, M, RestData]), trace(?DETAILED, "~w: invoking new worker.\n", [self()]), spawn(?MODULE, reduce_main, [ScanedData ++ [M, N, M|RestData], [ScanedData ++ [M, N, M|RestData], flip| Hist], MgrPid]), trace(?DETAILED, "~w: after flip; skip ~w, go next ~w.\n", [self(), N, [M, L|RestData]]), reduce(ScanedData ++ [N], [M, L|RestData], Hist, MgrPid); %% その他(再帰ステップのみ) reduce(ScanedData, [N|RestData], Hist, MgrPid) -> trace(?ORDINARY, "~w: else skip ~w, go next ~w.\n", [self(), N, RestData]), reduce(ScanedData ++ [N], RestData, Hist, MgrPid). %%%% マネージャとの通信とプリント %% 全データを取得 list_all(MgrPid) -> MgrPid! {list, self()}, receive {return, MgrPid, List} -> List; _Other -> {error, "invalid message."} after 2*1000 -> {error, "timeout."} end. %% 登録されたデータを表示 print_data(MgrPid) -> case list_all(MgrPid) of {error, Msg} -> io:fwrite("error: ~s\n", [Msg]); List -> % フィルタリング、第2要素は要求回数 List2 = [{Data, M} || {Data, M}<-List, is_number(M)], % 整列、重さが大きい順 List3 = lists:sort( fun({A, _}, {B, _})-> lists:sum(A) > lists:sum(B) end, % fun({_, M}, {_, L})-> M > L end, List2), % 表示 lists:foldl( fun({Data, M}, N)-> io:fwrite("~w ~w ~w ~w\n", [N, Data, lists:sum(Data), M]), N + 1 end, 1, List3), ok end. %% 変形履歴データを表示 print_hist(MgrPid) -> case list_all(MgrPid) of {error, Msg} -> io:fwrite("error: ~s\n", [Msg]); List -> % フィルタリング、第1要素がPID List2 = [H || {P, H}<-List, is_pid(P)], % 整列、重さが大きい順 List3 = lists:sort( fun([A|_], [B|_])-> lists:sum(A) > lists:sum(B) end, List2), % 表示 lists:foldl( fun(Hist, N)-> io:fwrite("~w ~w ~w\n", [N, lists:reverse(Hist), lists:sum(lists:nth(1, Hist))]), N + 1 end, 1, List3), ok end.