多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
RPC即远程过程调用,它的提出旨在消除通信细节、屏蔽繁杂且易错的底层网络通信操作,像调用本地服务一般地调用远程服务,让业务开发者更多关注业务开发而不必考虑网络、硬件、系统的异构复杂环境。 先看看集群中RPC的整个通信过程,假设从节点node1开始一个RPC调用,①先将待传递的数据放到NIO集群通信框架(这里使用的是tribes框架)中;②由于使用的是NIO模式,线程无需阻塞直接返回;③由于与集群其他节点通信需要花销若干时间,为了提高CPU使用率当前线程应该放弃CPU的使用权进行等待操作;④NIO集群通信框架tribes接收到node2节点的响应消息,并将消息封装成Response对象保存至响应数组;⑤tribes接收到node4节点的响应消息,由于是使用了并行通信,所以node4可能比node3先返回消息,并将消息封装成Response对象保存至响应数组;⑥tribes最后接收到node3节点的响应消息,并将消息封装成Response对象保存至响应数组;⑦现在所有节点的响应都已经收集完毕,是时候通知刚刚被阻塞的那条线程了,原来的线程被notify醒后拿到所有节点的响应Response[]进行处理,至此完成了整个集群RPC过程。  ![](https://box.kancloud.cn/2016-01-15_5698bd91574dc.jpg) 上面整个过程是在只有一条线程的情况下,一切看起来没什么问题,但如果有多条线程并发调用则会导致一个问题:线程与响应的对应关系将被打乱,无法确定哪个线程对应哪几个响应。因为NIO通信框架不会每个线程都独自使用一个socket通道,为提高性能一般都是使用长连接,所有线程公用一个socket通道,这时就算线程一比线程二先放入tribes也不能保证响应一比响应二先接收到,所以接收到响应一后不知道该通知线程一还是线程二。只有解决了这个问题才能保证RPC调用的正确性。 要解决线程与响应对应的问题就需要维护一个线程响应关系列表,响应从关系列表中就能查找对应的线程,如下图,在发送之前生成一个UUID标识,此标识要保证同socket中唯一,再把UUID与线程对象关系对应起来,可使用Map数据结构实现,UUID的值作为key,线程对应的锁对象为value。接着制定一个协议报文,UUID作为报文的其中一部分,报文发往另一个节点node2后将响应信息message放入报文中并返回,node1对接收到的报文进行解包根据UUID去查找并唤起对应的线程,告诉它“你要的消息已经收到,往下处理吧”。但在集群环境下,我们更希望是集群中所有节点的消息都接收到了才往下处理,如下图下半部分,一个UUID1的请求报文会发往node2、node3和node4三个节点,这时假如只接收到一个响应则不唤起线程,直到node2、node3对应UUID1的响应报文都接收到后才唤起对应线程往下执行。同样地,UUID2、UUID3的报文消息都是如此处理,最后集群中对应的响应都能正确回到各自线程上。  ![](https://box.kancloud.cn/2016-01-15_5698bd916e0fc.jpg) 用简单代码实现一个RPC例子,选择一个集群通信框架负责底层通信,这里使用tribes,接着往下: ①定义一个RPC接口,这些方法是预留提供给上层具体逻辑处理的入口,replyRequest方法用于处理响应逻辑,leftOver方法用于残留请求的逻辑处理。 ~~~ public interface RpcCallback {     public Serializable replyRequest(Serializable msg, Member sender);     public void leftOver(Serializable msg, Member sender); } ~~~ ②定义通信消息协议,实现Externalizable接口自定义序列化和反序列化,message用于存放响应消息,uuid标识用于关联线程,rpcId用于标识RPC实例,reply表示是否回复。 ~~~ public class RpcMessage implements Externalizable { protected Serializable message; protected byte[] uuid; protected byte[] rpcId; protected boolean reply = false; public RpcMessage() { } public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) { this.rpcId = rpcId; this.uuid = uuid; this.message = message; } ~~~ @Override ~~~ public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException { reply = in.readBoolean(); int length = in.readInt(); uuid = new byte[length]; in.readFully(uuid); length = in.readInt(); rpcId = new byte[length]; in.readFully(rpcId); message = (Serializable) in.readObject(); } ~~~ @Override ~~~ public void writeExternal(ObjectOutput out) throws IOException { out.writeBoolean(reply); out.writeInt(uuid.length); out.write(uuid, 0, uuid.length); out.writeInt(rpcId.length); out.write(rpcId, 0, rpcId.length); out.writeObject(message); } } ~~~ ③响应类型,提供多种唤起线程的条件,一共四种类型,分别表示接收到第一个响应就唤起线程、接收到集群中大多数节点的响应就唤起线程、接收到集群中所有节点的响应才唤起线程、无需等待响应的无响应模式。 ~~~ public class RpcResponseType { public static final int FIRST_REPLY = 1; public static final int MAJORITY_REPLY = 2; public static final int ALL_REPLY = 3; public static final int NO_REPLY = 4; } ~~~ ④响应对象,用于封装接收到的消息,Member在通信框架tribes是节点的抽象,这里用来表示来源节点。 ~~~ public class RpcResponse { private Member source; private Serializable message; public RpcResponse() { } public RpcResponse(Member source, Serializable message) { this.source = source; this.message = message; } public void setSource(Member source) { this.source = source; } public void setMessage(Serializable message) { this.message = message; } public Member getSource() { return source; } public Serializable getMessage() { return message; } } ~~~ ⑤RPC响应集,用于存放同个UUID的所有响应。 ~~~ public class RpcCollector {     public ArrayList responses = new ArrayList();      public byte[] key;     public int options;     public int destcnt;     public RpcCollector(byte[] key, int options, int destcnt) {         this.key = key;         this.options = options;         this.destcnt = destcnt;     }     public void addResponse(Serializable message, Member sender){      RpcResponse resp = new RpcResponse(sender,message);         responses.add(resp);     }     public boolean isComplete() {         if ( destcnt <= 0 ) return true;         switch (options) {             case RpcResponseType.ALL_REPLY:                 return destcnt == responses.size();             case RpcResponseType.MAJORITY_REPLY:             {                 float perc = ((float)responses.size()) / ((float)destcnt);                 return perc >= 0.50f;             }             case RpcResponseType.FIRST_REPLY:                 return responses.size()>0;             default:                 return false;         }     }     public RpcResponse[] getResponses() {         return responses.toArray(new RpcResponse[responses.size()]);     } } ~~~ ⑥RPC核心类,是整个RPC的抽象,它要实现tribes框架的ChannelListener接口,在messageReceived方法中处理接收到的消息。因为所有的消息都会通过此方法,所以它必须要根据key去处理对应的线程,同时它也要负责调用RpcCallback接口定义的相关的方法,例如响应请求的replyRequest方法和处理残留的响应leftOver方法,残留响应是指有时我们在接收到第一个响应后就唤起线程。 ~~~ public class RpcChannel implements ChannelListener { private Channel channel; private RpcCallback callback; private byte[] rpcId; private int replyMessageOptions = 0; private HashMap responseMap = new HashMap(); public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) { this.rpcId = rpcId; this.channel = channel; this.callback = callback; channel.addChannelListener(this); } public RpcResponse[] send(Member[] destination, Serializable message, int rpcOptions, int channelOptions, long timeout) throws ChannelException { int sendOptions = channelOptions& ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK; byte[] key = UUIDGenerator.randomUUID(false); RpcCollector collector = new RpcCollector(key, rpcOptions, destination.length); try { synchronized (collector) { if (rpcOptions != RpcResponseType.NO_REPLY) responseMap.put(key, collector); RpcMessage rmsg = new RpcMessage(rpcId, key, message); channel.send(destination, rmsg, sendOptions); if (rpcOptions != RpcResponseType.NO_REPLY) collector.wait(timeout); } } catch (InterruptedException ix) { Thread.currentThread().interrupt(); } finally { responseMap.remove(key); } return collector.getResponses(); } ~~~ @Override ~~~ public void messageReceived(Serializable msg, Member sender) { RpcMessage rmsg = (RpcMessage) msg; byte[] key = rmsg.uuid; if (rmsg.reply) { RpcCollector collector = responseMap.get(key); if (collector == null) { callback.leftOver(rmsg.message, sender); } else { synchronized (collector) { if (responseMap.containsKey(key)) { collector.addResponse(rmsg.message, sender); if (collector.isComplete()) collector.notifyAll(); } else { callback.leftOver(rmsg.message, sender); } } } } else { Serializable reply = callback.replyRequest(rmsg.message, sender); rmsg.reply = true; rmsg.message = reply; try { channel.send(new Member[] { sender }, rmsg, replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); } catch (Exception x) { } } } ~~~ @Override ~~~ public boolean accept(Serializable msg, Member sender) { if (msg instanceof RpcMessage) { RpcMessage rmsg = (RpcMessage) msg; return Arrays.equals(rmsg.rpcId, rpcId); } else return false; } } ~~~ ⑦自定义一个RPC,它要实现RpcCallback接口,分别对请求处理和残留响应处理,这里请求处理仅仅是简单返回“hello,response for you!”作为响应消息,残留响应处理则是简单输出“receive a leftover message!”。假如整个集群有五个节点,由于接收模式设置成了FIRST_REPLY,所以每个只会接受一个响应消息,其他的响应都被当做残留响应处理。 ~~~ public class MyRPC implements RpcCallback { @Override public Serializable replyRequest(Serializable msg, Member sender) { RpcMessage mapmsg = (RpcMessage) msg; mapmsg.message = "hello,response for you!"; return mapmsg; } @Override public void leftOver(Serializable msg, Member sender) { System.out.println("receive a leftover message!"); } public static void main(String[] args) { MyRPC myRPC = new MyRPC(); byte[] rpcId = new byte[] { 1, 1, 1, 1 }; byte[] key = new byte[] { 0, 0, 0, 0 }; String message = "hello"; int sendOptions = Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | Channel.SEND_OPTIONS_USE_ACK; RpcMessage msg = new RpcMessage(rpcId, key, (Serializable) message); RpcChannel rpcChannel = new RpcChannel(rpcId, channel, myRPC); RpcResponse[] resp = rpcChannel.send(channel.getMembers(), msg, RpcResponseType.FIRST_REPLY, sendOptions, 3000);        while(true) Thread.currentThread().sleep(1000); } } ~~~ 可以看到通过上面的RPC封装后,上层可以把更多的精力关注到消息逻辑处理上面了,而不必关注具体的网络IO如何实现,屏蔽了繁杂重复的网络传输操作,为上层提供了很大的方便。