MapReduce in Elixir Cluster.

2015-09-10, Thursday
elixir

Today I'll show how to run a MapReduce job on an Elixir cluster.
This post is purely a conceptual demo; a real production deployment has many additional things to take care of.

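First, let's see how to connect two machines. If you don't have two machines, you can use the example below as is.
If you do, replace the FQDN (localhost) in the example with the actual address of each machine. Note that --sname only accepts short host names, so with fully qualified names or IP addresses you would start iex with --name instead.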

# shell
iex --sname m1@localhost --cookie masato
iex --sname m2@localhost --cookie masato

# Connect the nodes together

iex(m1@localhost)1> Node.connect( :m2@localhost )
true
iex(m1@localhost)2> Node.list
[:m2@localhost]
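
Node.connect/1 does not always return true: it returns false when the connection fails and :ignored when the local node was not started as a distributed node. The following is a minimal sketch of checking several nodes at once. ClusterCheck and connect_all/1 are hypothetical names used only for illustration; they are not part of the original program.

```elixir
defmodule ClusterCheck do
  # Hypothetical helper: try to connect to each node and pair the node
  # name with the outcome. Node.connect/1 returns true, false, or :ignored.
  def connect_all(nodes) do
    Enum.map(nodes, fn name -> {name, Node.connect(name)} end)
  end
end

# Assumes the node names from the example above.
results = ClusterCheck.connect_all([:m2@localhost])
IO.inspect(results)
```

Anything other than true for a node means the later Node.spawn calls against it will not do useful work, so it is worth checking before sending jobs.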

Ask the remote node to reply with its pid.

# Get our own pid
iex(m1@localhost)3> self()
#PID<0.64.0>
iex(m1@localhost)9> Node.spawn(:m2@localhost, fn -> self() end)
#PID<8051.71.0>
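
The pid printed by Node.spawn is the pid of the newly spawned process; the value returned by the function itself is discarded. To actually get a value back, the spawned process has to send a message to the caller. A minimal sketch, assuming a made-up message shape {:remote_info, node, pid}; it uses node() (the local node) as the target so it runs without a cluster, and you would swap in :m2@localhost to run it remotely:

```elixir
parent = self()

# node() is the local node; replace it with :m2@localhost to run remotely.
Node.spawn(node(), fn ->
  # Report where this process ran and what its pid is.
  send(parent, {:remote_info, node(), self()})
end)

{remote_node, remote_pid} =
  receive do
    {:remote_info, n, pid} -> {n, pid}
  after
    # Give up instead of blocking forever if the node never answers.
    5_000 -> raise "no reply within 5 seconds"
  end

IO.puts("process #{inspect(remote_pid)} ran on #{remote_node}")
```

This send-and-receive pattern is the same one the word count program below relies on to collect results.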

Great! Now we can start giving it work to do.

iex(m1@localhost)11> 1 .. 10 |> Enum.filter(&(rem(&1,2) == 0 ))
[2, 4, 6, 8, 10]
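# Run it on the remote machine! (The output still shows up in the local shell because the spawned process inherits the caller's group leader.)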
iex(m1@localhost)19> Node.spawn(:m2@localhost, fn -> 1 .. 10 |> Enum.filter(&(rem(&1,2) == 0 ) ) |> Enum.each(&IO.puts(&1)) end)
#PID<8051.74.0>
2
4
6
8
10

Let's write a small word count program.

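defmodule MapReduce do
  # Mapper: wait for a :map message, count the occurrences of each word in
  # the given file, send the counts back to the client, then wait again.
  def map() do
    receive do
      {:map, client, filename} ->
        file = File.read!(filename)
        word_count = file |> String.split() |> create_map()
        send(client, {:result, word_count})
        map()
    end
  end

  # Block until one {:result, counts} message arrives and return the counts.
  # (Rebinding a variable inside the receive clause does not leak out in
  # current Elixir, so the value is returned from the clause directly.)
  def receiveme() do
    receive do
      {:result, word_count} -> word_count
    end
  end

  # Reducer: merge two word-count maps, summing the counts of shared words.
  # Map replaces the deprecated Dict module.
  def reduce(dict1, dict2) do
    Map.merge(dict1, dict2, fn _k, v1, v2 -> v1 + v2 end)
  end

  # Build a %{word => count} map from a list of words.
  def create_map(words), do: create_map(words, %{})
  def create_map([], dict), do: dict
  def create_map([word | rest], dict) do
    create_map(rest, Map.update(dict, word, 1, &(&1 + 1)))
  end
end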
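
To see the map and reduce steps without files or message passing, here is a small self-contained sketch. WordCountSketch is a hypothetical module, not the program above, and it swaps the hand-written recursion for Enum.frequencies/1, which assumes Elixir 1.10 or later.

```elixir
defmodule WordCountSketch do
  # "Map" step: turn a piece of text into a %{word => count} map.
  def count(text), do: text |> String.split() |> Enum.frequencies()

  # "Reduce" step: merge two count maps, adding counts for shared words.
  def merge(counts1, counts2) do
    Map.merge(counts1, counts2, fn _word, x, y -> x + y end)
  end
end

a = WordCountSketch.count("the quick the")
b = WordCountSketch.count("the lazy dog")
merged = WordCountSketch.merge(a, b)
IO.inspect(merged)
```

Because merging is associative, partial results can be combined in whatever order they arrive from the nodes.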

You can get the test data and the program from the links below.

curl "http://pastebin.com/raw.php?i=c4Dp7qLT" > data.txt
curl "http://pastebin.com/raw.php?i=i3XuPMb0" > mapreduce.ex

Next, compile the module on both nodes, then run the remaining commands on one of them (m1 here).

iex(m1@localhost)20> c("mapreduce.ex")
iex(m2@localhost)1> c("mapreduce.ex")

# Start a mapper on each node
n1 = Node.spawn(:m1@localhost, &MapReduce.map/0)
n2 = Node.spawn(:m2@localhost, &MapReduce.map/0)

# Send the map message to the mapper processes
send(n1, {:map, self(), "data.txt"})
send(n2, {:map, self(), "data.txt"})

# Receive the result from each node
d1 = MapReduce.receiveme()
d2 = MapReduce.receiveme()

# Reduce!
result = MapReduce.reduce(d1, d2)
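
The same steps can be generalized from two hard-coded nodes to every connected node. The sketch below is only an illustration under some assumptions: ClusterWordCount and run/2 are hypothetical names, the input is a list of in-memory text chunks rather than files, Enum.frequencies/1 needs Elixir 1.10 or later, and the module would have to be compiled on every node (as with c(...) above) because the spawned function belongs to it. With no other nodes connected, Node.list() is empty and everything runs locally.

```elixir
defmodule ClusterWordCount do
  # Hypothetical helper, not part of the original program.
  # Counts words across text chunks, one spawned mapper process per chunk.
  def run(chunks, timeout \\ 5_000) do
    parent = self()
    nodes = [node() | Node.list()]

    # Assign chunks to nodes round-robin and start one mapper per chunk.
    chunks
    |> Enum.zip(Stream.cycle(nodes))
    |> Enum.each(fn {chunk, target} ->
      Node.spawn(target, fn ->
        counts = chunk |> String.split() |> Enum.frequencies()
        send(parent, {:result, counts})
      end)
    end)

    # Collect exactly one result per chunk and merge them as they arrive.
    Enum.reduce(chunks, %{}, fn _chunk, acc ->
      receive do
        {:result, counts} -> Map.merge(acc, counts, fn _word, a, b -> a + b end)
      after
        timeout -> raise "a mapper did not reply within #{timeout} ms"
      end
    end)
  end
end

totals = ClusterWordCount.run(["a b a", "b c"])
IO.inspect(totals)
```

The timeout is one example of the extra care a real deployment needs: without it, a crashed node would leave the caller waiting forever.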

Congratulations! You have just completed the "Hello World" of MapReduce on an Elixir cluster: word count!

Reference for this post: Connecting Elixir nodes in Azure