Problem
When we use the Leader and Followers model, we need to ensure that messages between the leader and each follower are kept in order, and that any lost messages are retried. We also need to keep the cost of new connections low, so that opening them does not add to the system's latency.
Solution
Fortunately, the long-established and widely used TCP protocol provides all of these characteristics: it delivers bytes in order and retransmits lost segments. We can therefore get the communication we need by making sure that all communication between a follower and its leader goes through a single socket channel. The follower then serializes the updates from the leader using a Singular Update Queue.
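To make the follower side concrete, here is a minimal sketch of a Singular Update Queue, assuming a single worker thread draining a LinkedBlockingQueue. The names SingularUpdateQueue and SingularUpdateQueueDemo are hypothetical and not taken from the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch of a Singular Update Queue: any thread may submit updates, but a single
// worker thread applies them one at a time, in the order they were submitted.
class SingularUpdateQueue<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final Thread worker;
    private volatile boolean running = true;

    SingularUpdateQueue(Consumer<T> handler) {
        worker = new Thread(() -> {
            // Keep draining until shutdown has been requested and the queue is empty.
            while (running || !queue.isEmpty()) {
                try {
                    T update = queue.poll(10, TimeUnit.MILLISECONDS);
                    if (update != null) {
                        handler.accept(update); // only this thread ever applies updates
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        worker.start();
    }

    void submit(T update) {
        queue.add(update);
    }

    // Requests shutdown and waits until everything already queued has been applied.
    void shutdown() throws InterruptedException {
        running = false;
        worker.join();
    }
}

class SingularUpdateQueueDemo {
    // Submits 0..count-1 as stand-in "updates from the leader" and returns the applied order.
    static List<Integer> applyInOrder(int count) {
        List<Integer> applied = new ArrayList<>(); // mutated only by the worker thread
        SingularUpdateQueue<Integer> updates = new SingularUpdateQueue<>(applied::add);
        for (int i = 0; i < count; i++) {
            updates.submit(i);
        }
        try {
            updates.shutdown(); // join() makes the worker's writes visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return applied;
    }
}
```

Because only one thread applies updates, the handler's state needs no further locking, and the order in which the leader's messages were read from the socket is the order in which they take effect.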
A node never closes the connection once it is open, and keeps reading new requests from it. Each connection gets a dedicated thread that reads requests and writes responses; if non-blocking I/O is used, a thread per connection is not necessary. A simple thread-based implementation looks like this:
class SocketHandlerThread...
    @Override
    public void run() {
        try {
            // Continues to read/write to the socket connection until it is closed;
            // an exception (for example, end of stream) ends the loop.
            while (true) {
                handleRequest();
            }
        } catch (Exception e) {
            getLogger().debug(e);
        }
    }

    private void handleRequest() {
        RequestOrResponse request = readRequestFrom(clientSocket);
        RequestId requestId = RequestId.valueOf(request.getRequestId());
        // Hand the request, together with the socket to reply on, to the update queue.
        requestConsumer.accept(new Message<>(request, requestId, clientSocket));
    }
The node reads requests and submits them to a Singular Update Queue for processing. Once it has processed a request, it writes the response back to the socket.
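The flow described so far, one reader thread per connection feeding a single processing thread that writes responses back on the same socket, might look roughly like the following self-contained sketch. It assumes a single-threaded ExecutorService as a stand-in for the Singular Update Queue, string payloads, and a 4-byte length prefix in front of each message; ThreadPerConnectionServer, readFrame, writeFrame and roundTrip are hypothetical names.

```java
import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;

// Hypothetical sketch: a dedicated reader thread per connection; one executor thread
// (standing in for a Singular Update Queue) processes requests in arrival order
// and writes each response back to the socket the request came from.
class ThreadPerConnectionServer implements Closeable {
    private final ServerSocket serverSocket;
    private final ExecutorService updateQueue = Executors.newSingleThreadExecutor();

    ThreadPerConnectionServer() throws IOException {
        serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        startDaemon(this::acceptLoop);
    }

    int port() { return serverSocket.getLocalPort(); }

    private static void startDaemon(Runnable task) {
        Thread t = new Thread(task);
        t.setDaemon(true);
        t.start();
    }

    private void acceptLoop() {
        try {
            while (true) {
                Socket client = serverSocket.accept();
                startDaemon(() -> handleConnection(client)); // one thread per connection
            }
        } catch (IOException e) {
            // serverSocket was closed: stop accepting
        }
    }

    private void handleConnection(Socket client) {
        try (client) {
            DataInputStream in = new DataInputStream(client.getInputStream());
            DataOutputStream out = new DataOutputStream(client.getOutputStream());
            while (true) { // the connection stays open; keep reading requests
                byte[] request = readFrame(in);
                updateQueue.submit(() -> {
                    String reply = "ack:" + new String(request, StandardCharsets.UTF_8);
                    writeFrame(out, reply.getBytes(StandardCharsets.UTF_8));
                    return null;
                });
            }
        } catch (Exception e) {
            // End of stream, I/O error or executor shut down: this connection is finished
        }
    }

    static void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length); // 4-byte length prefix
        out.write(payload);
        out.flush();
    }

    static byte[] readFrame(DataInputStream in) throws IOException {
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        return payload;
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
        updateQueue.shutdownNow();
    }

    // Client side: sends every message over ONE socket, waiting for each reply.
    static List<String> roundTrip(String... messages) {
        try (ThreadPerConnectionServer server = new ThreadPerConnectionServer();
             Socket socket = new Socket(InetAddress.getLoopbackAddress(), server.port())) {
            DataInputStream in = new DataInputStream(socket.getInputStream());
            DataOutputStream out = new DataOutputStream(socket.getOutputStream());
            List<String> replies = new ArrayList<>();
            for (String message : messages) {
                writeFrame(out, message.getBytes(StandardCharsets.UTF_8));
                replies.add(new String(readFrame(in), StandardCharsets.UTF_8));
            }
            return replies;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because all processing happens on one executor thread, responses for a connection are written in the same order its requests were read.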
Whenever a node establishes communication with another node, it opens a single socket connection that is used for all requests to that node.
class SingleSocketChannel...
public class SingleSocketChannel implements Closeable {
    final InetAddressAndPort address;
    final int heartbeatIntervalMs;
    private Socket clientSocket;
    private final OutputStream socketOutputStream;
    private final InputStream inputStream;

    public SingleSocketChannel(InetAddressAndPort address, int heartbeatIntervalMs) throws IOException {
        this.address = address;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        clientSocket = new Socket();
        clientSocket.connect(new InetSocketAddress(address.getAddress(), address.getPort()), heartbeatIntervalMs);
        clientSocket.setSoTimeout(heartbeatIntervalMs * 10); // set socket read timeout to be more than heartbeat
        socketOutputStream = clientSocket.getOutputStream();
        inputStream = clientSocket.getInputStream();
    }

    // synchronized: one request/response exchange at a time, so each response
    // read belongs to the request just written.
    public synchronized RequestOrResponse blockingSend(RequestOrResponse request) throws IOException {
        writeRequest(request);
        byte[] responseBytes = readResponse();
        return deserialize(responseBytes);
    }

    private void writeRequest(RequestOrResponse request) throws IOException {
        var dataStream = new DataOutputStream(socketOutputStream);
        byte[] messageBytes = serialize(request);
        dataStream.writeInt(messageBytes.length); // length prefix
        dataStream.write(messageBytes);
    }

    // readResponse(), serialize(), deserialize() and close() are not shown here.
}
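The listing calls readResponse() without showing it. Assuming responses use the same framing as writeRequest (a 4-byte length followed by the payload), a reader could look like this sketch. LengthPrefixedFrames, readFrame and trickle are hypothetical names; trickle exists only to show why DataInputStream.readFully, rather than a single read, is needed when TCP delivers a message in pieces.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical framing helpers: a frame is a 4-byte big-endian length
// followed by that many payload bytes.
class LengthPrefixedFrames {
    static void writeFrame(OutputStream os, byte[] messageBytes) throws IOException {
        DataOutputStream dataStream = new DataOutputStream(os);
        dataStream.writeInt(messageBytes.length);
        dataStream.write(messageBytes);
        dataStream.flush();
    }

    // A sketch of what readResponse() could do with the channel's inputStream.
    static byte[] readFrame(InputStream is) throws IOException {
        DataInputStream dataStream = new DataInputStream(is); // unbuffered wrapper: safe to recreate per call
        int length = dataStream.readInt(); // blocks until 4 bytes arrive (or SO_TIMEOUT expires on a socket)
        if (length < 0) {
            throw new IOException("Corrupt frame length: " + length);
        }
        byte[] responseBytes = new byte[length];
        dataStream.readFully(responseBytes); // a single read() may return only part of the frame
        return responseBytes;
    }

    // Returns at most one byte per bulk read, mimicking data arriving in small pieces.
    static InputStream trickle(InputStream in) {
        return new FilterInputStream(in) {
            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                return super.read(b, off, Math.min(len, 1));
            }
        };
    }

    // Writes all messages into one in-memory "wire", then reads them back frame by frame.
    static List<String> roundTrip(String... messages) {
        try {
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            for (String m : messages) {
                writeFrame(wire, m.getBytes(StandardCharsets.UTF_8));
            }
            InputStream in = trickle(new ByteArrayInputStream(wire.toByteArray()));
            List<String> received = new ArrayList<>();
            for (int i = 0; i < messages.length; i++) {
                received.add(new String(readFrame(in), StandardCharsets.UTF_8));
            }
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The length prefix is what lets the reader find message boundaries on a byte stream that may be split arbitrarily.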
It is important to set a timeout on the connection so that it does not block indefinitely when an error occurs. We use HeartBeat to send requests periodically over the socket channel to keep it alive. The timeout is generally set to a multiple of the heartbeat interval, to allow for the network round-trip time and some possible network delays. Setting the socket read timeout to, say, 10 times the heartbeat interval is reasonable.
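A sketch of the heartbeat side, assuming a ScheduledExecutorService drives the periodic sends and using the multiplier of 10 suggested above. HeartbeatScheduler and the sendHeartbeat callback are hypothetical; in a real node the callback would write a heartbeat request onto the socket channel.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: periodic heartbeats plus a read timeout derived
// from the heartbeat interval.
class HeartbeatScheduler implements AutoCloseable {
    static final int TIMEOUT_MULTIPLIER = 10;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Leaves room for several delayed or lost heartbeats before a read times out.
    static int readTimeoutMs(int heartbeatIntervalMs) {
        return heartbeatIntervalMs * TIMEOUT_MULTIPLIER;
    }

    // sendHeartbeat stands in for writing a heartbeat request on the channel.
    void start(int heartbeatIntervalMs, Runnable sendHeartbeat) {
        scheduler.scheduleAtFixedRate(sendHeartbeat, 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    // Runs the scheduler until `expected` heartbeats were "sent" or the deadline passes.
    static boolean sendsHeartbeats(int intervalMs, int expected, long deadlineMs) {
        CountDownLatch sent = new CountDownLatch(expected);
        try (HeartbeatScheduler heartbeats = new HeartbeatScheduler()) {
            heartbeats.start(intervalMs, sent::countDown);
            return sent.await(deadlineMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Since the peer normally sees traffic at least once per heartbeat interval, a read that has waited ten intervals is a strong signal that the connection or the peer has failed.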
class SocketListener...
    private void setReadTimeout(Socket clientSocket) throws SocketException {
        // Read timeout: 10 times the heartbeat interval.
        clientSocket.setSoTimeout(config.getHeartBeatIntervalMs() * 10);
    }
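The effect of the read timeout can be shown on a loopback connection where the peer never writes: instead of blocking forever, the read fails with java.net.SocketTimeoutException once SO_TIMEOUT expires, and the socket itself remains valid. ReadTimeoutDemo is a hypothetical name; its setReadTimeout mirrors the method above but takes the interval as a parameter.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;

// Sketch: a read on a socket with SO_TIMEOUT set fails instead of
// blocking forever when the peer goes silent (no heartbeats arrive).
class ReadTimeoutDemo {
    static final int TIMEOUT_MULTIPLIER = 10;

    static void setReadTimeout(Socket socket, int heartbeatIntervalMs) throws SocketException {
        socket.setSoTimeout(heartbeatIntervalMs * TIMEOUT_MULTIPLIER);
    }

    // Returns true if a read on a silent connection timed out.
    static boolean readTimesOut(int heartbeatIntervalMs) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            setReadTimeout(accepted, heartbeatIntervalMs);
            accepted.getInputStream().read(); // the client never writes anything
            return false;
        } catch (SocketTimeoutException e) {
            return true; // the socket is still usable; the caller decides whether to close it
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```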
Sending all requests over a single channel can cause head-of-line blocking: a slow request holds up every request queued behind it. To avoid this, we can use Request Pipeline.
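As a rough illustration only, a pipelined channel writes each request as soon as it is submitted and completes a future when the matching response arrives, instead of holding a lock across the whole round trip as blockingSend does. The correlation id in each frame and the names PipelinedChannel, send and onResponse are assumptions for this sketch, not details taken from Request Pipeline itself.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: each request gets a correlation id and is written immediately;
// a separate reader matches responses to pending requests by that id.
class PipelinedChannel {
    private final DataOutputStream out;
    private final AtomicInteger nextId = new AtomicInteger();
    private final Map<Integer, CompletableFuture<byte[]>> pending = new ConcurrentHashMap<>();

    PipelinedChannel(OutputStream socketOutputStream) {
        this.out = new DataOutputStream(socketOutputStream);
    }

    // Writes frame [id][length][payload] and returns without waiting for the response.
    CompletableFuture<byte[]> send(byte[] payload) throws IOException {
        int id = nextId.incrementAndGet();
        CompletableFuture<byte[]> response = new CompletableFuture<>();
        pending.put(id, response);
        try {
            synchronized (out) { // frames from concurrent senders must not interleave
                out.writeInt(id);
                out.writeInt(payload.length);
                out.write(payload);
                out.flush();
            }
        } catch (IOException e) {
            pending.remove(id); // nothing will ever answer a request that was not sent
            throw e;
        }
        return response;
    }

    // Invoked by the connection's reader thread for each decoded response frame.
    void onResponse(int id, byte[] payload) {
        CompletableFuture<byte[]> response = pending.remove(id);
        if (response != null) {
            response.complete(payload);
        }
    }

    int inFlight() {
        return pending.size();
    }

    // Demo: three requests go out back-to-back before any response arrives.
    static String demo() {
        PipelinedChannel channel = new PipelinedChannel(new ByteArrayOutputStream());
        try {
            List<CompletableFuture<byte[]>> futures = new ArrayList<>();
            for (String r : List.of("r1", "r2", "r3")) {
                futures.add(channel.send(r.getBytes(StandardCharsets.UTF_8)));
            }
            StringBuilder summary = new StringBuilder("inFlight=" + channel.inFlight());
            for (int id = 1; id <= 3; id++) { // simulated responses for ids 1..3
                channel.onResponse(id, ("ok" + id).getBytes(StandardCharsets.UTF_8));
            }
            for (CompletableFuture<byte[]> f : futures) {
                summary.append(',').append(new String(f.join(), StandardCharsets.UTF_8));
            }
            return summary.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the demo all three requests are written before any response is handled, so no request waits on the previous one's round trip.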