org.apache.catalina.tribes.group.interceptors

Class NonBlockingCoordinator

public class NonBlockingCoordinator extends ChannelInterceptorBase

Title: Auto-merging leader election algorithm

Description: Implementation of a simple coordinator algorithm that not only selects a coordinator but also merges groups automatically when members are discovered that weren't part of the group.

This algorithm is non-blocking, meaning it allows transactions to proceed while the coordination phase is in progress.

This implementation is based on a home-brewed algorithm that uses the AbsoluteOrder of a membership to pass a token around a ring formed by the current membership.
This is not the same as just using AbsoluteOrder! Consider the following scenario:
Nodes A, B, C, D, E are on a network, in that priority order. AbsoluteOrder only works if every node receives pings from all the other nodes, that is, node{i} receives pings from node{all}-node{i}.
But if a multicast problem occurs, the following could happen:
A has members {B,C,D}
B has members {A,C}
C has members {D,E}
D has members {A,B,C,E}
E has members {A,C,D}
This is possible because the default Tribes membership implementation relies on multicast packets arriving at all nodes correctly, and nothing guarantees that they will.
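The token ring only needs a "next in line" rule: a node hands the token to the next lower-priority member it knows of, wrapping around to the highest-priority member. The Java sketch below is a minimal illustration under stated assumptions, not the code behind getNextInLine: members are plain strings, and alphabetical order stands in for AbsoluteOrder, so "A" has the highest priority. TokenRing and nextInLine are hypothetical names.

```java
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch: strings stand in for Tribes members and alphabetical
// order stands in for AbsoluteOrder ("A" has the highest priority).
final class TokenRing {

    /** Returns the member that follows {@code local} in priority order, wrapping around. */
    static String nextInLine(String local, List<String> membership) {
        TreeSet<String> ordered = new TreeSet<>(membership);
        ordered.add(local);                            // the local node is always part of its own ring
        String next = ordered.higher(local);           // next lower-priority member, if any
        return next != null ? next : ordered.first();  // otherwise wrap around to the top
    }

    public static void main(String[] args) {
        System.out.println(nextInLine("A", List.of("B", "C", "D")));      // B
        System.out.println(nextInLine("C", List.of("D", "E")));           // D
        System.out.println(nextInLine("D", List.of("A", "B", "C", "E"))); // E
        System.out.println(nextInLine("E", List.of("A", "C", "D")));      // A
    }
}
```

With the partial views above, this rule yields the first hops of the walkthrough that follows: A hands its token to B, C to D, D to E, and E wraps around to A.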

To explain how this algorithm works, let's take the example above. For simplicity, we assume that a send operation takes the same O(1) time at every node, although the algorithm also works when messages overlap, since they all depend on absolute order.
Scenario 1: A, B, C, D, E all come online at the same time.
Eval phase: A thinks of itself as leader, B thinks of A as leader, C thinks of itself as leader, and D and E think of A as leader.
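The eval-phase outcome follows directly from the partial views: each node picks the highest-priority member among itself and the members it can see. A minimal Java sketch, again assuming string member names with alphabetical order as a stand-in for AbsoluteOrder; EvalPhase and leaderFor are hypothetical names, not Tribes API.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the eval phase: a node's leader candidate is the
// highest-priority member among itself and the members it currently sees.
final class EvalPhase {

    static String leaderFor(String self, List<String> visible) {
        TreeSet<String> candidates = new TreeSet<>(visible);
        candidates.add(self);
        return candidates.first(); // lowest name == highest priority in this sketch
    }

    public static void main(String[] args) {
        // The partial views from the multicast-failure scenario.
        Map<String, List<String>> views = new TreeMap<>();
        views.put("A", List.of("B", "C", "D"));
        views.put("B", List.of("A", "C"));
        views.put("C", List.of("D", "E"));
        views.put("D", List.of("A", "B", "C", "E"));
        views.put("E", List.of("A", "C", "D"));
        views.forEach((node, visible) ->
                System.out.println(node + " -> leader " + leaderFor(node, visible)));
    }
}
```

Running main prints A as the leader for A, B, D and E, and C as the leader for C, which is exactly the split that the token phase has to resolve.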
Token phase:
(1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)
(1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)
(2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C
(2) D receives Y{C-ldr, C-src, mbrs-C,D,E}; D is aware of A,B, so it sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E
(3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, adds E, and sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D
(3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} and sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A
(4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E} and sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A
(4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, and adds E to its list of members
(5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E}
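Every hop in the token phase applies the same rule: the receiver merges itself and its own view into the token's membership and re-derives the leader, while the token's id and source stay unchanged. This hypothetical Java sketch replays step (2) at D. Token and receive are illustrative names, not Tribes API, and alphabetical order again stands in for AbsoluteOrder.

```java
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch of one token hop (not the Tribes implementation).
final class TokenHop {

    static final class Token {
        final String id;                  // view id, e.g. "X" or "Y"
        final String leader;              // current leader suggestion
        final String source;              // node that started this token
        final SortedSet<String> members;  // membership carried by the token

        Token(String id, String leader, String source, Set<String> members) {
            this.id = id;
            this.leader = leader;
            this.source = source;
            this.members = new TreeSet<>(members);
        }

        @Override
        public String toString() {
            return id + "{" + leader + "-ldr, " + source + "-src, mbrs-" + String.join(",", members) + "}";
        }
    }

    /** Merges the receiver and its view into the token; id and source are kept. */
    static Token receive(Token in, String self, Set<String> selfView) {
        TreeSet<String> merged = new TreeSet<>(in.members);
        merged.add(self);
        merged.addAll(selfView);
        return new Token(in.id, merged.first(), in.source, merged); // first() == highest priority
    }

    public static void main(String[] args) {
        Token y = new Token("Y", "C", "C", Set.of("C", "D", "E"));
        // Step (2): D receives Y and knows about A, B, C and E.
        System.out.println(receive(y, "D", Set.of("A", "B", "C", "E")));
        // prints Y{A-ldr, C-src, mbrs-A,B,C,D,E}
    }
}
```

Keeping the source unchanged is what lets the token keep travelling until it reaches the node that started it, even after the leader suggestion has moved to A.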
At this point, the state looks like
A - {A-ldr, mbrs-A,B,C,D,E, id=X}
B - {A-ldr, mbrs-A,B,C,D, id=X}
C - {A-ldr, mbrs-A,B,C,D,E, id=X}
D - {A-ldr, mbrs-A,B,C,D,E, id=X}
E - {A-ldr, mbrs-A,B,C,D,E, id=Y}

A message doesn't stop until it reaches its original sender, unless it's dropped by a higher-priority leader. As you can see, E still thinks the viewId is Y, which is not correct. But at this point we have arrived at the same membership, and all nodes are informed of each other.
To synchronize the rest we simply perform the following check at A when A receives X:
Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}
Since the condition is false, A resends the token: A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B. When A receives X again, the token is complete.
Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E who then install and accept the view.
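The check A performs when its own token returns can be written as a small function. In this sketch, TokenCompletion, Outcome and onOwnTokenReturned are hypothetical names; the function only compares the membership the source sent out with the membership that came back.

```java
import java.util.Set;

// Hypothetical sketch of the completion check at the token's source node.
final class TokenCompletion {

    enum Outcome { RESEND, COMPLETE }

    static Outcome onOwnTokenReturned(Set<String> sent, Set<String> arrived) {
        // Set.equals compares contents, independent of iteration order.
        return sent.equals(arrived) ? Outcome.COMPLETE : Outcome.RESEND;
    }

    public static void main(String[] args) {
        Set<String> firstRound = Set.of("A", "B", "C", "D");
        Set<String> arrived = Set.of("A", "B", "C", "D", "E");
        System.out.println(onOwnTokenReturned(firstRound, arrived)); // RESEND
        System.out.println(onOwnTokenReturned(arrived, arrived));    // COMPLETE
    }
}
```

The first call corresponds to A's first round (E was picked up on the way), the second to the resent token coming back unchanged.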

Let's assume that C1 arrives; C1 has lower priority than C but higher priority than D.
Let's also assume that C1 sees the following view: {B,D,E}.
C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.
In the scenario where C1 sees {D,E} and A,B,C cannot see C1, no token will ever arrive.
In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D
D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E
E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A
A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again. At that time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E
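C1's behaviour, waiting for a token and starting its own election if none arrives, can be sketched with a bounded wait on a CountDownLatch. This is an assumption-laden illustration: TokenWait, onToken and awaitTokenOrElect are hypothetical names, and the timeout only plays a role similar to the waitForCoordMsgTimeout field; the interceptor itself may wait differently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: wait a bounded time for a token, else start an election.
final class TokenWait {

    private final CountDownLatch tokenArrived = new CountDownLatch(1);

    /** Called by the (hypothetical) receive path when any election token arrives. */
    void onToken() {
        tokenArrived.countDown();
    }

    /** Returns true if this node had to start its own election. */
    boolean awaitTokenOrElect(long timeoutMillis) throws InterruptedException {
        if (tokenArrived.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false; // another node's token reached us; take part in that round
        }
        startOwnElection();
        return true;
    }

    private void startOwnElection() {
        // In the scenario above, C1 would send Z{C1-ldr, C1-src, mbrs-C1,D,E} to D here.
        System.out.println("no token received, starting own election");
    }

    public static void main(String[] args) throws InterruptedException {
        TokenWait isolated = new TokenWait();
        System.out.println(isolated.awaitTokenOrElect(100)); // prints the message, then true

        TokenWait reached = new TokenWait();
        reached.onToken();
        System.out.println(reached.awaitTokenOrElect(100));  // false
    }
}
```

A latch is used here because it is counted down once and stays open, so a token that arrives just before the wait starts is not lost.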

To ensure that the view gets implemented at all nodes at the same time, A will send out a VIEW_CONF message, this is the 'confirmed' message that is optional above.
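The effect of VIEW_CONF can be sketched as two-step installation: a token passing through a node only records a suggested view, and the installed view changes only when the confirmation arrives, so every node switches to the same view id and membership. The field names loosely mirror suggestedviewId, suggestedView, viewId and view, but ViewInstaller and its methods are hypothetical and this is not the Tribes code.

```java
import java.util.List;

// Hypothetical sketch of two-step view installation driven by VIEW_CONF.
final class ViewInstaller {

    private String suggestedViewId;       // non-null while this node takes part in an election
    private List<String> suggestedView;
    private String viewId;                // installed view, changed only by a confirmation
    private List<String> view = List.of();

    /** A token passed through this node: remember the suggestion, do not install it. */
    void onToken(String id, List<String> members) {
        suggestedViewId = id;
        suggestedView = List.copyOf(members);
    }

    /** VIEW_CONF from the leader: install the confirmed view and end the election. */
    void onViewConf(String id, List<String> members) {
        viewId = id;
        view = List.copyOf(members);
        suggestedViewId = null;
        suggestedView = null;
    }

    String viewId() { return viewId; }
    List<String> view() { return view; }
    boolean electionPending() { return suggestedViewId != null; }

    public static void main(String[] args) {
        ViewInstaller e = new ViewInstaller();              // plays node E
        e.onToken("Y", List.of("A", "B", "C", "D", "E"));   // E last saw token Y
        System.out.println(e.viewId() + " " + e.view() + " " + e.electionPending()); // null [] true
        e.onViewConf("X", List.of("A", "B", "C", "D", "E"));
        System.out.println(e.viewId() + " " + e.view() + " " + e.electionPending()); // X [A, B, C, D, E] false
    }
}
```

In the walkthrough, this is how E ends up on view X even though the last token it saw was Y.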

Ideally, the interceptor below this one would be the TcpFailureDetector, to ensure correct memberships.

The example above can, of course, be simplified with a finite state machine.
But I suck at writing state machines; my head gets all confused. One day I will document this algorithm, though.
Maybe I'll do a state diagram :)

State Diagrams

Initiate an election

Receive an election message

Version: 1.0

Author: Filip Hanik

Nested Class Summary
static class NonBlockingCoordinator.CoordinationEvent
static class NonBlockingCoordinator.CoordinationMessage
Field Summary
protected AtomicBoolean coordMsgReceived
protected static byte[] COORD_ALIVE
Alive message
protected static byte[] COORD_CONF
Coordination confirmation, for blocking installations
protected static byte[] COORD_HEADER
header for a coordination message
protected static byte[] COORD_REQUEST
Coordination request
protected Object electionMutex
protected Membership membership
Our non-blocking membership
protected boolean started
protected int startsvc
protected UniqueId suggestedviewId
indicates that we are running an election and this is the one we are running
protected Membership suggestedView
protected Membership view
Our current view
protected UniqueId viewId
Our current viewId
protected long waitForCoordMsgTimeout
Time to wait for a coordination message before timing out
Constructor Summary
NonBlockingCoordinator()
Method Summary
boolean accept(ChannelMessage msg)
protected boolean alive(Member mbr)
ChannelData createData(NonBlockingCoordinator.CoordinationMessage msg, MemberImpl local)
void fireInterceptorEvent(InterceptorEvent event)
Member getCoordinator()
Returns the coordinator, if one is available
Member getLocalMember(boolean incAlive)
Return the member that represents this node.
Member getMember(Member mbr)
Member[] getMembers()
Get all current cluster members
Member getNextInLine(MemberImpl local, MemberImpl[] others)
Member[] getView()
UniqueId getViewId()
protected void halt()
Block in/out messages while an election is going on
protected void handleMyToken(MemberImpl local, NonBlockingCoordinator.CoordinationMessage msg, Member sender, Membership merged)
protected void handleOtherToken(MemberImpl local, NonBlockingCoordinator.CoordinationMessage msg, Member sender, Membership merged)
protected void handleToken(NonBlockingCoordinator.CoordinationMessage msg, Member sender, Membership merged)
protected void handleViewConf(NonBlockingCoordinator.CoordinationMessage msg, Member sender, Membership merged)
protected boolean hasHigherPriority(Member[] complete, Member[] local)
boolean hasMembers()
Returns true if the channel has any members
void heartbeat()
boolean isCoordinator()
boolean isHighest()
protected boolean isViewConf(NonBlockingCoordinator.CoordinationMessage msg)
void memberAdded(Member member)
void memberAdded(Member member, boolean elect)
void memberDisappeared(Member member)
protected Membership mergeOnArrive(NonBlockingCoordinator.CoordinationMessage msg, Member sender)
void messageReceived(ChannelMessage msg)
protected void processCoordMessage(NonBlockingCoordinator.CoordinationMessage msg, Member sender)
protected void release()
Release the lock on in/out messages once the election is completed
protected void sendElectionMsg(MemberImpl local, MemberImpl next, NonBlockingCoordinator.CoordinationMessage msg)
protected void sendElectionMsgToNextInline(MemberImpl local, NonBlockingCoordinator.CoordinationMessage msg)
void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
protected void setupMembership()
void start(int svc)
void startElection(boolean force)
void stop(int svc)
protected void viewChange(UniqueId viewId, Member[] view)
protected void waitForRelease()
Wait for an election to end

Field Detail

coordMsgReceived

protected AtomicBoolean coordMsgReceived

COORD_ALIVE

protected static final byte[] COORD_ALIVE
Alive message

COORD_CONF

protected static final byte[] COORD_CONF
Coordination confirmation, for blocking installations

COORD_HEADER

protected static final byte[] COORD_HEADER
header for a coordination message

COORD_REQUEST

protected static final byte[] COORD_REQUEST
Coordination request

electionMutex

protected Object electionMutex

membership

protected Membership membership
Our non-blocking membership

started

protected boolean started

startsvc

protected final int startsvc

suggestedviewId

protected UniqueId suggestedviewId
indicates that we are running an election and this is the one we are running

suggestedView

protected Membership suggestedView

view

protected Membership view
Our current view

viewId

protected UniqueId viewId
Our current viewId

waitForCoordMsgTimeout

protected long waitForCoordMsgTimeout
Time to wait for a coordination message before timing out

Constructor Detail

NonBlockingCoordinator

public NonBlockingCoordinator()

Method Detail

accept

public boolean accept(ChannelMessage msg)

alive

protected boolean alive(Member mbr)

createData

public ChannelData createData(NonBlockingCoordinator.CoordinationMessage msg, MemberImpl local)

fireInterceptorEvent

public void fireInterceptorEvent(InterceptorEvent event)

getCoordinator

public Member getCoordinator()
Returns the coordinator, if one is available

Returns: Member

getLocalMember

public Member getLocalMember(boolean incAlive)
Return the member that represents this node.

Returns: Member

getMember

public Member getMember(Member mbr)

Parameters: mbr Member

Returns: Member

getMembers

public Member[] getMembers()
Get all current cluster members

Returns: all members or empty array

getNextInLine

public Member getNextInLine(MemberImpl local, MemberImpl[] others)

getView

public Member[] getView()

getViewId

public UniqueId getViewId()

halt

protected void halt()
Block in/out messages while an election is going on

handleMyToken

protected void handleMyToken(MemberImpl local, NonBlockingCoordinator.CoordinationMessage msg, Member sender, Membership merged)

handleOtherToken

protected void handleOtherToken(MemberImpl local, NonBlockingCoordinator.CoordinationMessage msg, Member sender, Membership merged)

handleToken

protected void handleToken(NonBlockingCoordinator.CoordinationMessage msg, Member sender, Membership merged)

handleViewConf

protected void handleViewConf(NonBlockingCoordinator.CoordinationMessage msg, Member sender, Membership merged)

hasHigherPriority

protected boolean hasHigherPriority(Member[] complete, Member[] local)

hasMembers

public boolean hasMembers()
Returns true if the channel has any members

heartbeat

public void heartbeat()

isCoordinator

public boolean isCoordinator()

isHighest

public boolean isHighest()

isViewConf

protected boolean isViewConf(NonBlockingCoordinator.CoordinationMessage msg)

memberAdded

public void memberAdded(Member member)

memberAdded

public void memberAdded(Member member, boolean elect)

memberDisappeared

public void memberDisappeared(Member member)

mergeOnArrive

protected Membership mergeOnArrive(NonBlockingCoordinator.CoordinationMessage msg, Member sender)

messageReceived

public void messageReceived(ChannelMessage msg)

processCoordMessage

protected void processCoordMessage(NonBlockingCoordinator.CoordinationMessage msg, Member sender)

release

protected void release()
Release the lock on in/out messages once the election is completed

sendElectionMsg

protected void sendElectionMsg(MemberImpl local, MemberImpl next, NonBlockingCoordinator.CoordinationMessage msg)

sendElectionMsgToNextInline

protected void sendElectionMsgToNextInline(MemberImpl local, NonBlockingCoordinator.CoordinationMessage msg)

sendMessage

public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)

setupMembership

protected void setupMembership()

start

public void start(int svc)

startElection

public void startElection(boolean force)

stop

public void stop(int svc)

viewChange

protected void viewChange(UniqueId viewId, Member[] view)

waitForRelease

protected void waitForRelease()
Wait for an election to end
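The halt(), release() and waitForRelease() descriptions amount to a simple gate. A minimal sketch using a plain monitor object, in the spirit of the electionMutex field; the class ElectionGate and its boolean flag are hypothetical, and this is not the Tribes implementation.

```java
// Hypothetical sketch of a halt/release/waitForRelease gate on a monitor object.
final class ElectionGate {

    private final Object mutex = new Object();
    private boolean halted;

    /** Block in/out messages while an election is going on. */
    void halt() {
        synchronized (mutex) {
            halted = true;
        }
    }

    /** Release the gate once the election is completed. */
    void release() {
        synchronized (mutex) {
            halted = false;
            mutex.notifyAll(); // wake every thread parked in waitForRelease()
        }
    }

    /** Wait for an election to end; the loop guards against spurious wake-ups. */
    void waitForRelease() throws InterruptedException {
        synchronized (mutex) {
            while (halted) {
                mutex.wait();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ElectionGate gate = new ElectionGate();
        gate.halt();
        Thread sender = new Thread(() -> {
            try {
                gate.waitForRelease();
                System.out.println("message sent after election");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();
        Thread.sleep(100); // the sender stays parked while the gate is halted
        gate.release();
        sender.join();
    }
}
```

Using notifyAll rather than notify matters here, because several sending threads may be parked on the same election.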
Copyright © 2000-2011 Apache Software Foundation. All Rights Reserved.