Search     or:     and:
 LINUX 
 Language 
 Kernel 
 Package 
 Book 
 Test 
 OS 
 Forum 
iakovlev.org

Распределенное программирование в Erlang

В Эрланге:
создание и уничтожение процессов очень быстрое;
посылка сообщений между процессами очень быстрая;
процессы ведут себя одинаково во всех операционных системах;
может быть очень большое количество процессов;
процессы не разделяют память и являются полностью независимыми;
единственный способ для взаимодействия процессов — это через передачу сообщений.

По этим причинам Эрланг иногда называют языком с чистой передачей сообщений.

В Эрланге программировать процессы легко. Для этого нужно только три примитива: spawn , send, receive .

 Pid = spawn(Fun)
Создаёт новый параллельный процесс, который вычисляет (evaluates) Fun . Новый процесс работает параллельно с вызвавшим его. Spawn возвращает Pid (сокращение для идентификатор процесса). Вы можете использовать Pid для посылки сообщений процессу.
 Pid ! Message
Посылает сообщение Message процессу с идентификатором Pid . Посылка сообщения асинхронна. Отправитель не ждёт, а продолжает делать то, чем занимался. Восклицательный знак ! еще называется оператором send . Pid ! M определяется как M — примитив отправки сообщения ! возвращает само сообщение. Поэтому Pid1 ! Pid2 ! ... ! M означает отправку сообщения M всем процессам — Pid1 , Pid2 и т. д.
 receive ... end
Принимает сообщение, которое было послано процессу. У него следующий синтаксис:
 receive
   Pattern1 [when Guard1] ->
     Expressions1;
   Pattern2 [when Guard2] ->
     Expressions2;
   ...
 end
Когда сообщение прибывает к процессу система пытается сопоставить его с образцом Pattern1 (возможно с учётом условия Guard1 ). Если это выполнилось успешно, то она вычисляет выражение Expression1 . Если первый образец не совпадает, то она использует Pattern2 и т.д. Если ни один из образцов не соответствует, сообщение сохраняется для последующей обработки, а процесс ожидает следующего сообщения.

Напишем небольшую программу, которая создает процесс, в котором вычисляется площадь прямоугольника и круга, и пошлем этому процессу три сообщения:

 -module(area_server0).  
 -compile([export_all]). 
 
 loop() ->
     receive
 	{rectangle, Width, Ht} -> 
 	    io:format("Area of rectangle is ~p~n",[Width * Ht]),
 	    loop();
 	{circle, R} -> 
 	    io:format("Area of circle is ~p~n", [3.14159 * R * R]),
 	    loop();
 	Other ->
 	    io:format("I don't know what the area of a ~p is ~n",[Other]),
 	    loop()
     end.
 
 main() ->
   Pid = spawn(fun area_server0:loop/0),
   Pid ! {rectangle, 6, 10},
   Pid ! {circle, 23},
   Pid ! {triangle,2,4,5},
   io:format("start ~n",[]).
Собрать и запустить такую программу можно двумя командами:
 erlc  area_server0.erl
 erl -noshell -s area_server0 main -s init stop
Архитектуры клиент-сервер центральные в Эрланге. Клиент и сервер в клиент-серверной архитектуре — это раздельные процессы, и для связи между клиентом и сервером используется обычная передача сообщений Эрланга. Как клиент, так и сервер могут работать на одной и той же машине или на двух разных машинах. Слова клиент и сервер ссылаются на роли, которые выполняют эти два процесса. Клиент всегда начинает вычисление отправляя запрос к серверу. Сервер вычисляет ответ и отправляет отзыв клиенту.

Теперь мы слегка переделаем предыдущую программу. Нам нужно послать ответ процессу, который послал первоначальный запрос. Отправитель должен включить обратный адрес в сообщение. Этого можно достичь так:

 Pid ! {self(),{rectangle, 6, 10}}
self() - это PID клиентского процесса. Процесс, который посылает начальный запрос называется клиентом. Процесс, который принимает запрос и отправляет ответ называется сервером. Мы добавили маленькую полезную функцию, названную rpc (сокращение для remote procedure call — удалённый вызов процедуры), которая включает в себя посылку запроса на сервер и ожидание ответа: Между клиентом и сервером устанавливается диалог, в который не смогут вмешаться другие процессы:
 -module(area_server1).  
 -compile([export_all]). 
 
 
 start() -> spawn(fun loop/0).
 
 area(Pid, What) ->
     rpc(Pid, What).
 
 rpc(Pid, Request) ->
     Pid ! {self(), Request},
     receive
 	{Pid, Response} ->
 	    Response
     end.
 
 
 loop() ->
     receive
 	{From, {rectangle, Width, Ht}} -> 
 	    From ! {self(), Width * Ht},
 	    io:format("Area of rectangle is ~p~n",[Width * Ht]),
 	    loop();
 	{From, {circle, R}} -> 
 	    From !  {self(), 3.14159 * R * R},
 	    io:format("Area of circle is ~p~n", [3.14159 * R * R]),
 	    loop();
 	{From, Other} ->
 	    From ! {self(), {error,Other}},
 	    io:format("ups ...\n", []),
 	    loop()
     end.
 
     
 main() ->
   io:format("start area_server1\n",[]),
   Pid = area_server1:start(),
   area_server1:area(Pid, {rectangle, 10, 8}),
   area_server1:area(Pid, {blablabla, 10, 8}),
   area_server1:area(Pid, {circle, 4}).
Теперь напишем тестовую программу, которая определит, сколько вообще на вашей машине можно создавать эрланговских процессов:
 -module(processes).
 -compile([export_all]). 
 
 max(N) ->
     Max = erlang:system_info(process_limit),
     io:format("Maximum allowed processes:~p~n",[Max]),
     statistics(runtime),
     statistics(wall_clock),
     L = for(1, N, fun() -> spawn(fun() -> wait() end) end),
     {_, Time1} = statistics(runtime),
     {_, Time2} = statistics(wall_clock),
     lists:foreach(fun(Pid) -> Pid ! die end, L),
     U1 = Time1 * 1000 / N,
     U2 = Time2 * 1000 / N,
     io:format("Process spawn time=~p (~p) microseconds~n",
 	      [U1, U2]).
 
 wait() ->
     receive
 	die -> void
     end.
 
 for(N, N, F) -> [F()];
 for(I, N, F) -> [F()|for(I+1, N, F)].
 
 
 main() ->
   io:format("start ~n",[]),
   max(20000),
   max(10000),
   io:format("finish ~n",[]).
Когда я запустил у себя эту программу, то получил следующее:
 start 
 Maximum allowed processes:32768
 Process spawn time=6.0 (7.5) microseconds
 Maximum allowed processes:32768
 Process spawn time=5.0 (6.0) microseconds
 finish 
На моей машинке создание 20000 процессов требует 6 микросекунд. По умолчанию в эрланге установлен минимум в 32768 процессов. Эрлангу можно сказать при запуске увеличить этот минимум, например:
  erl +P 500000
В эрланге на стороне приема можно организовать таймер.
 -module(stimer).
 -compile([export_all]). 
 
 start(Time, Fun) -> spawn(fun() -> timer(Time, Fun) end).
 
 cancel(Pid) -> Pid ! cancel.
 
 timer(Time, Fun) ->
     receive
 	cancel ->
 	    void
     after Time ->
 	    Fun()
     end.
 
     
 main() ->
   Pid = stimer:start(5000, fun() -> io:format("timer event~n") end),
   io:format("Pid = ~w~n",[Pid]),
   Pid1 = stimer:start(25000, fun() -> io:format("timer event~n") end),
   io:format("Pid1 = ~w~n",[Pid1]),
   stimer:cancel(Pid1).
Первый раз мы ждем пять секунд, чтобы сработал таймер. Потом запускаем второй таймер и тут же отменяем его.

Каждый процесс в Эрланге имеет свой собственный почтовый ящик. Когда вы посылаете сообщение процессу, это сообщение помещается в почтовый ящик. Почтовый ящик проверяется только тогда, когда программа вычисляет оператор receive:

 receive
   Pattern1 [when Guard1] ->
     Expressions1;
   Pattern2 [when Guard1] ->
     Expressions1;
   ...
   after Time ->
     ExpressionTimeout
 end.
receive работает следующим образом:
1. Когда мы входим в оператор receive , мы запускаем таймер (но только, если в выражении присутствует секция after ).
2. Взять первое сообщение из почтового ящика и попытаться соотнести его с образцами Pattern1 , Pattern2 и т.д. Если соответствие успешно, то сообщение удаляется из почтового ящика и вычисляется выражение, следующее за образцом.
3. Если ни один из образцов в операторе receive не соответствует первому сообщению из почтового ящика, то первое сообщение удаляется из ящика и помещается в «отложенную очередь» (save queue). Затем так же проверяется второе сообщение. Эта процедура повторяется до тех пор, пока не будет найдено совпадающее сообщение, либо не будут проверены все сообщения из почтового ящика.
4. Если ни одно сообщение из почтового ящика не соответствует, процесс приостанавливается и ждёт до тех пор, пока новое сообщение не будет помещено в почтовый ящик. Заметьте, что когда новое сообщение прибывает, сообщения из отложенной очереди не проверяются заново на соответствие образцам. Проверяется только новое сообщение.
5. Как только сообщение совпало с образцом, сразу после этого все сообщения из отложенной очереди помещаются обратно в почтовый ящик в том же порядке, в каком они прибыли к процессу. Если был установлен таймер, то он очищается.
6. Если таймер истёк, пока мы ждали сообщение, то выполнится выражение ExpressionTimeout, после чего все отложенные сообщения поместятся обратно в почтовый ящик в том же порядке, в каком они прибыли к процессу.

У Эрланга есть метод публикации идентификатора процесса, так что любой процесс в системе может общаться с этим процессом. Такой процесс называется зарегистрированным процессом. Есть четыре встроенные функции (BIF) для управления зарегистрированными процессами:

 register(AnAtom, Pid)
 
- зарегистрировать процесс Pid с именем AnAtom .
 unregister(AnAtom)
- удалить любые регистрации, связанные с AnAtom .

Следующий шаблон полезен для написания параллельных программ:

 -module(ctemplate).
 -compile(export_all).
 
 start() ->
     spawn(fun() -> loop([]) end).
 
 rpc(Pid, Request) ->
     Pid ! {self(), Request},
     receive
 	{Pid, Response} ->
 	    Response
     end.
 	    
 loop(X) ->
     receive
 	Any ->
 	    io:format("Received:~p~n",[Any]),
 	    loop(X)
     end.
Здесь цикл приёма — это просто пустой цикл, который принимает и печатает все сообщения,

В эрланге один процесс может проверить статус другого процесса - жив тот или нет. Для этого есть встроенная функция Эрланга link, в которой параметром является идентификатор процесса Pid. После установления связи оба процесса неявно следят друг за другом. Если умрёт процесс А , то процесс B получит сигнал выхода (exit signal) . И наоборот — если умрёт B , то такой сигнал получит A. Это работает как для одной локальной машины, так и для сетевых машин. Можно выполнить некое действие, когда процесс завершается. Можно написать функцию on_exit(Pid,Fun) , которая устанавливает связь с процессом Pid . Если Pid умирает с причиной Why , то вычисляется функция Fun(Why):

 on_exit(Pid, Fun) ->
   spawn(fun() ->
     process_flag(trap_exit,true),
     link(Pid),
     receive
       {'EXIT', Pid, Why} ->
 	Fun(Why)
     end
   end).
Следующий пример создает процесс, устанавливаем обработчик on_exit для мониторинга этого процесса, Сначала определим функцию F , которая ждёт единственное сообщение X и затем вычисляет list_to_atom(X), затем создаем процесс, устанавливаем обработчик on_exit, затем посылаем этому процессу не список, который он ожидает, а просто атом, в результате процесс умирает по ошибке, и тут же автоматически вызывается on_exit.
 -module(on_exit).
 -compile([export_all]). 
 
 on_exit(Pid, Fun) ->
   spawn(fun() ->
     process_flag(trap_exit,true),
     io:format(" I died with:~p~n ...",[Pid]),
     link(Pid),
     receive
       {'EXIT', Pid, Why} ->
 	Fun(Why)
     end
   end).
 
 
 main() ->
    F = fun() ->  receive    X -> list_to_atom(X)  end end,
    Pid = spawn(F),
    on_exit(Pid, fun(Why) -> io:format(" ~p died with:~p~n",[Pid, Why])end),
    Pid ! hello.
Когда родительский процесс создает дочерний процесс, и дочерний процесс по какой-то причине падает, для дальнейшего поведения родителя есть следующий 3 варианта:
1. Если родитель никак не хочет реагировать на падение дочернего процесса, то дочерний процес создается с помощью команды
 Pid = spawn(fun() -> ... end)
2. Если нужно, чтобы родитель умер сразу после того, как умрет дочерний процесс, то последний должен быть создан следующей командой:
 Pid = spawn_link(fun() -> ... end)
3. Если родитель должен обработать ошибки в случае падения дочернего процесса, нужно использовать следующую схему:
 process_flag(trap_exit, true),
 Pid = spawn_link(fun() -> ... end),
 ...
 loop(...).
 
 loop(State) ->
   receive 
     {'EXIT', SomePid, Reason} ->
 	%% do something with the error
 	loop(State1);
     ...
 end.
Теперь процесс, вычисляющий loop , перехватывает выход и не умрёт, если упадёт связанный с ним другой процесс. Он увидит все сигналы выхода (преобразованные в сообщения) от умирающего процесса и сможет предпринять все необходимые действия, когда обнаружит сбой.

Монитор — это однонаправленная связь. Если процесс A мониторит процесс B , и процесс B умирает, то к А будет послан сигнал выхода. Однако, если А умирает, то к B не будет послано никакого сигнала выхода. Полное описание возможностей монитора можно найти в руководстве по Эрлангу.

Пример

Теперь пришло время написать простейшую систему клиент-сервер. Это будет сервер имен - программа, которая, получив имя, возвращает значение, связанное с этим именем. Мы также можем менять значение, связанное с определённым именем:
 -module(kvs).
 -compile([export_all]). 
 
 start() -> register(kvs, spawn(fun() -> loop() end)).
 
 store(Key, Value) -> rpc({store, Key, Value}).
 
 lookup(Key) -> rpc({lookup, Key}).
 
 rpc(Q) ->
     kvs ! {self(), Q},
     receive
 	{kvs, Reply} ->
 	    Reply
     end.
 
 loop() ->
     receive
 	{From, {store, Key, Value}} ->
 	    put(Key, {ok, Value}),
 	    From ! {kvs, true},
 	    loop();
 	{From, {lookup, Key}} ->
 	    From ! {kvs, get(Key)},
 	    loop()
     end.
 
 
 main() ->
    io:format(" node:~p~n",[node()]).
    start(),
    store({location, joe}, "Stockholm"),
    store(weather, raining),
    io:format(" lookup:~p~n",[lookup(weather)]),
    io:format(" lookup:~p~n",[lookup({location, joe})]),
    io:format(" lookup:~p~n",[lookup({location, jane})]).
Его нужно запустить с помощью следующего батника - вместо serg вы можете поставить любое имя: Имя узла имеет вид Name@Host. Name и Host — это атомы и если они содержат какие-либо не атомные символы, то такие атомы должны быть в одинарных кавычках.
 erlc  kvs.erl
 erl -sname serg -noshell -s kvs main 
После запуска сервера терминал выведет что-то типа:
  node:serg@blablabla
Вам вот это вот имя ноды serg@blablabla понадобится для того, чтобы вставить его в клиентский код.

Теперь напишем простого клиента, который будет делать запросы к этому серверу. Не забудьте вставить имя ноды. Выглядит он так-

 -module(kvc).
 -compile([export_all]). 
 
 main() ->
       io:format(" node:~p~n",[node()]),
       io:format(" lookup:~p~n",[rpc:call(serg@blablabla, kvs, store, [weather, fine])]),
       io:format(" lookup:~p~n",[rpc:call(serg@blablabla, kvs, lookup, [weather])]).
Для запуска клиента вам понадобится - внимание - второй терминал на этой же машине, чтобы полностью смоделировать распределенную среду. Теперь осталось запустить батник из командной строки, который соберет клиента и запустит его:
 erlc  kvc.erl
 erl -sname serg2 -noshell -s kvc main -s init stop
Вывод клиента должен быть типа:
 node:serg2@blablabla
 lookup:true
 lookup:{ok,fine}
Так на одной локальной машине можно тестировать распределенное эрланговское приложение.

Если клиент и сервер расположены на разных машинах в пределах одной локальной сети, интерпретатор нужно запускать на них с одинаковыми куками:

 erl -name serg -noshell -setcookie blablabla -s kvs main 
 erl -name serg2 -noshell -setcookie blablabla -s kvc main -s init stop
Параметр sname можно использовать, когда машины в одной подсети, параметр name - в разных подсетях. У каждого узла куки должны быть одинаковы в целях безопасности. Набор соединённых узлов, имеющих одинаковые куки, образует эрланговый кластер.

Оставьте свой комментарий !

Ваше имя:
Комментарий:
Оба поля являются обязательными

 Автор  Комментарий к данной статье