-
Notifications
You must be signed in to change notification settings - Fork 44
/
Copy pathriak_core_multinode.console
105 lines (93 loc) · 3.17 KB
/
riak_core_multinode.console
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
%% @doc Interface for {{nodeid}}-admin commands.
-module({{appid}}_console).
-export([join/1,
leave/1,
remove/1,
ringready/1]).
%% @doc Handle the `join' admin command.
%% Expects a one-element argument list holding the name of the node to join,
%% as a string. Prints a one-line outcome and returns `ok' when the join
%% request was sent, or `error' otherwise (including when the argument list
%% does not hold exactly one element).
join([Target]) ->
    Outcome = do_join(Target),
    case Outcome of
        {error, different_ring_sizes} ->
            io:format("Failed: ~s has a different ring_creation_size~n",
                      [Target]),
            error;
        {error, not_reachable} ->
            io:format("Node ~s is not reachable!\n", [Target]),
            error;
        ok ->
            io:format("Sent join request to ~s\n", [Target]),
            ok
    end;
join(_) ->
    io:format("Join requires a node to join with.\n"),
    error.
%% Try to join the cluster that Node belongs to.
%%
%% Accepts the node either as a string (converted with
%% riak_core_util:str_to_node/1) or as an atom.
%%
%% Returns:
%%   * the result of riak_core_gossip:send_ring(Node, node()) when the remote
%%     node answers ping and reports the same ring_creation_size as ours;
%%   * {error, not_reachable} when the ping fails or the RPC itself fails;
%%   * {error, different_ring_sizes} when the remote node reports any other
%%     value for ring_creation_size (including no value at all).
%%
%% Crashes with badmatch if ring_creation_size is not set locally.
do_join(NodeStr) when is_list(NodeStr) ->
    do_join(riak_core_util:str_to_node(NodeStr));
do_join(Node) when is_atom(Node) ->
    {ok, OurRingSize} = application:get_env(riak_core, ring_creation_size),
    case net_adm:ping(Node) of
        pong ->
            case rpc:call(Node,
                          application,
                          get_env,
                          [riak_core, ring_creation_size]) of
                {ok, OurRingSize} ->
                    riak_core_gossip:send_ring(Node, node());
                {badrpc, _Reason} ->
                    %% The call never produced an answer, so nothing is known
                    %% about the remote ring size; do not blame a mismatch.
                    {error, not_reachable};
                _ ->
                    {error, different_ring_sizes}
            end;
        pang ->
            {error, not_reachable}
    end.
%% @doc Handle the `leave' admin command: remove this node from the cluster.
%% Takes an empty argument list. Any other argument list prints a usage
%% message and returns `error' (mirroring join/1) instead of crashing with
%% function_clause.
leave([]) ->
    remove_node(node());
leave(_) ->
    io:format("Leave takes no arguments.\n"),
    error.
%% @doc Handle the `remove' admin command: remove the named node from the
%% cluster. Expects a one-element argument list holding the node name as a
%% string. Any other argument list prints a usage message and returns `error'
%% (mirroring join/1) instead of crashing with function_clause.
remove([Node]) ->
    %% list_to_atom/1 creates an atom for each distinct name it is given;
    %% NOTE(review): fine for an operator-run command, do not expose this to
    %% untrusted callers.
    remove_node(list_to_atom(Node));
remove(_) ->
    io:format("Remove requires a node to remove.\n"),
    error.
%% Ask riak_core_gossip to remove Node from the cluster and print whatever
%% term that call returns. The return value is that of io:format/2.
remove_node(Node) when is_atom(Node) ->
    io:format("~p\n", [riak_core_gossip:remove_from_cluster(Node)]).
%% @doc Handle the `ringready' admin command.
%% Prints a line starting with TRUE when every node agrees on the ring and
%% returns `ok'; otherwise prints a line starting with FALSE that explains
%% the disagreement and returns `error'.
-spec ringready([]) -> ok | error.
ringready([]) ->
    print_ringready(ringready()).

%% Print the outcome computed by ringready/0 and map it to ok | error.
print_ringready({ok, Nodes}) ->
    %% io:format/2 returns ok, which is also this clause's result.
    io:format("TRUE All nodes agree on the ring ~p\n", [Nodes]);
print_ringready({error, {different_owners, N1, N2}}) ->
    io:format("FALSE Node ~p and ~p list different partition owners\n", [N1, N2]),
    error;
print_ringready({error, {nodes_down, Down}}) ->
    io:format("FALSE ~p down.  All nodes need to be up to check.\n", [Down]),
    error.
%% Fetch the ring from every member and check that they all hash to the same
%% owner list.
%%
%% Returns {ok, Nodes} when no node is down and every ring matches the first
%% one, {error, {different_owners, N1, N2}} for the first mismatching node,
%% or {error, {nodes_down, Down}} when any node could not be queried.
-spec ringready() -> {ok, [atom()]} | {error, any()}.
ringready() ->
    case get_rings() of
        {[], Rings} ->
            {FirstNode, FirstRing} = hd(Rings),
            Expected = hash_ring(FirstRing),
            case rings_match(Expected, tl(Rings)) of
                {false, OtherNode} ->
                    {error, {different_owners, FirstNode, OtherNode}};
                true ->
                    {ok, [Node || {Node, _Ring} <- Rings]}
            end;
        {Down, _Rings} ->
            {error, {nodes_down, Down}}
    end.
%% Fetch each member's ring by calling riak_core_ring_manager:get_my_ring/0
%% through riak_core_util:rpc_every_member/4 (30000 is passed as the last
%% argument; presumably a timeout in milliseconds).
%%
%% Returns {Down, Rings}: the sorted second element of the RPC result, and an
%% orddict keyed by each ring's owner node built from the {ok, Ring} replies.
%% Replies of any other shape are dropped.
get_rings() ->
    {Replies, Unreachable} =
        riak_core_util:rpc_every_member(riak_core_ring_manager,
                                        get_my_ring,
                                        [],
                                        30000),
    ByOwner = [{riak_core_ring:owner_node(Ring), Ring} || {ok, Ring} <- Replies],
    Rings = orddict:from_list(ByOwner),
    {lists:sort(Unreachable), Rings}.
%% Hash the owner list of a ring (as returned by riak_core_ring:all_owners/1)
%% so that two rings can be compared by a single integer.
hash_ring(Ring) ->
    Owners = riak_core_ring:all_owners(Ring),
    erlang:phash2(Owners).
%% Compare the hash of every ring in a [{Node, Ring}] list against Expected.
%% Returns true when all of them match (or the list is empty), otherwise
%% {false, Node} for the first node whose ring hashes differently.
rings_match(_Expected, []) ->
    true;
rings_match(Expected, [{Node, Ring} | Remaining]) ->
    case hash_ring(Ring) =:= Expected of
        true ->
            rings_match(Expected, Remaining);
        false ->
            {false, Node}
    end.