High Performance Linux

Saturday, March 24, 2012

Simple Cluster Management Using Erlang Mnesia

    If you need to run some application in cluster to get high availability (through redundancy) or high performance (though scaling out), then you need some cluster management system. Basically such system performs two operations: load balancing between the cluster nodes and nodes failovering. Also usually dynamic cluster reconfiguration is required, i.e. we desire to be able to introduce a new node to the cluster or shutdown a node at any given time.

    The logic for load balancing and failovering itself is not trivial, but there is also one not obvious problem - synchronization between the cluster nodes. The problem is crucial in shared-nothing cluster architecture with shared state, i.e. there is no single physical point of failure in the cluster, but all the nodes operate on a shared data. Lets consider that you have a cluster of working nodes and you have a jobs on all of the nodes, which you need to reassign depending on each node load. Traditionally you'd do this with some director node - this node collects load factors from all the nodes and reschedule the jobs between the nodes. The director itself is a cluster of at least two machines for redundancy. But this works only for large clusters - it has not so much sense to have two directors for small cluster of two or little bit more working nodes which works in active-active scenario. For such small clusters we would wish that all the nodes are able to participate in cluster load balancing and failovering itself.

    Let's consider a quick example. You have cluster of two active working nodes, A and B, and both the nodes are loaded equally. Let J1 and J2 be a jobs (e.g. client connections) on node A and J3 and J4 a jobs on node B respectively. At some point of time clients which makes jobs J3 and J4 reduced their activity, but J1 and J2 increased (i.e. load factors produced by the jobs are increased). You would expect that the nodes will reschedule the jobs/connections between the nodes in manner like J1 and J3 to node A and J2 and J4 to node B. To do this both the nodes have to track current load on itself and other node and reassign the jobs in accordance with the loads ratio. Since both the nodes work independently, then we can not guarantee that the node won't try to reassign the jobs in different ways at the same time (e.g. node A tries to assign J1 and J3 to itself and J2 and J4 to B while B is trying to assign J1 and J4 to itself and J2 and J3 to A). If we have a cluster of bit more more machines (say 3) and the machines can fail at any time then we also have a change that some node crash during the jobs reassignment and we loose some jobs.

    To synchronize such kind of communications between the nodes usually we use algorithms of distributed state machines like Paxos or Virtual Synchrony. The last is greatly implemented in Spread Toolkit, which you can use with C/C++ or Perl bindings to build reliable cluster management system.

    However Erlang provides distributed database, Mnesia, from the box, which is useful in solving such issues. In fact, Mnesia have distributed transactions, but it is not a heavy disk-bases database. Mnesia has special kind of tables, ram_copies, which is actually just a hash tables. So the key point: with Erlang and Mnesia we can atomically and reliably execute distributed operations on hash tables stored in RAM on different cluster nodes.

    Now lets have a look how to solve the jobs scheduling problem with Erlang (I suppose that the reader is familiar with basic Erlang syntax). First of all let's create two table with assigned jobs and current load:

        -record(jobs, {job_id, weight, node_name}).
        -record(cluster_view, {node_name}).

        mnesia:create_table(cluster_view,
            [{ram_copies, [node()]},
             {attributes, record_info(fields, cluster_view)}
            ]).
        mnesia:create_table(jobs,

            [{ram_copies, [node()]},
             {attributes, record_info(fields, jobs)},
             {index, [node_name]}
            ]).


    (I defined secondary index for node_name in jobs table to be able to select all jobs assigned to a node).

    Each node periodically updates its load status in transactional manner:


        mnesia:transaction(fun() ->
            mnesia:write(#cluster_view{node_name = node(), 
                                       load = CurrentLoad})
        end).

    where CurrentLoad is the value of current load of the node. I don't do any error handling here for brevity, but it should be done in production code.

    And load balancing can be done in following way (this is not the most effective method, but simple enough):


        mnesia:transaction(fun() ->
            % QLC Query Handler to get sorted list of Jobs by
            % weight field in descending order.
            % Returns list of tuples of form {weight, job_id}
            JLQH = qlc:sort(qlc:q([{J#jobs.weight, J#jobs.job_id}
                                   || J <- mnesia:table(jobs)]),
                            [{order, descending}]).

            % Get two lists of jobs assigned to each node.
            % E.g. if it returns {10,[1],8,[2,4,3]}, then
            % job 1 with weight 10 has to be assigned to
            % node A and jobs 1, 4 and 3 with total weight 8
            % to node B.
            qlc:fold(fun(J, D) ->
                    if element(1, D) =< element(3, D) ->
                        {element(1, D) + element(1, J),
                         lists:append(element(2, D),[element(2, J)]),
                         element(3, D),
                    
        element(4, D)};
               
        true ->
                        {element(1, D),
                         element(2, D),
        
                     element(3, D) + element(1, J),
            
                 lists:append(element(4, D), [element(2, J)])}
                    end 
                end,
                {0, [], 0, []},
               
    JLQH)
        end).

    This way only one node at each given time can check current nodes load and redistribute jobs atomically.

    Thus, distributed Mnesia transactions are very useful to build simple cluster management system for small clusters (distributed transactions are very expensive in general, so they are absolutely not suitable for large clusters with intensive management operations), but it has number of drawbacks which make it hard to develop flexible cluster operations. The one of such things is that Mnesia does not have normal mechanism to detach the cluster. So if you need to detach a node from the cluster such that it will keep current cluster data snapshot, but won't replicate to and from other cluster nodes, then you have to dump all tables with dump_tables() to local text file, recreate local database schema and load the tables back. Other thing is that you have no enough flexibility in Mnesia setting to manage database schema in node failure case.

    P.S. Ulf Wiger has given very good presentation about Mnesia ablilities in cluster environment.

    No comments:

    Post a Comment