引言和概述
在分布式系统中,通信协议的设计直接影响系统的可靠性和可扩展性。Apache Seata的RPC通信协议为各组件间的数据传输提供了基础,对这方面的源码分析是深入理解seata的又一个好方式。在最近的2.2.0版本里,我为Seata的通信机制进行了重构,以支持多版本协议的兼容性,现在改造完成了,我将从传输机制和通信协议两个方面去分析新版本里源码。 本文是第一篇,介绍Seata传输机制。
seata里的RPC通信主角是TC
、TM
、RM
三者,当然过程中还可能会涉及注册中心甚至配置中心等其他网络交互,但这些相对内容所使用的通信机制是相对独立的,本文不作讨论。
接下来我将按照我最早了解源码时的几个直觉提问,带大家进行探索。
Seata中的Netty(谁在传输)
第一个问题:seata通信的底层是什么在进行发送请求报文和接收请求报文?答案是netty,而netty在seata里面是如何工作的呢?我们将去到core包的org.apache.seata.core.rpc.netty去探索

从这个继承关系我们可以看到,AbstractNettyRemoting
作为核心的父类,RM和TM和Server(TC)都实现了他,实际上这个类里面已经实现了核心的发送和接收
在sendSync
实现了同步发送逻辑,异步发送sendAsync
的逻辑相似且更简单这里不再重复,只要拿到channel进行发送即可
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
// 此处省略非关键代码
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);
channelWritableCheck(channel, rpcMessage.getBody());
String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
doBeforeRpcHooks(remoteAddr, rpcMessage);
// netty的写方法(netty write)
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
if (messageFuture1 != null) {
messageFuture1.setResultMessage(future.cause());
}
destroyChannel(future.channel());
}
});
try {
Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
doAfterRpcHooks(remoteAddr, rpcMessage, result);
return result;
} catch (Exception exx) {
// 此处省略非关键代码
}
}
而接收报文的方式,主要在processMessage方法里,这个方法被AbstractNettyRemotingClient.ClientHandler
和AbstractNettyRemotingServer.ServerHandler
这两个类的channelRead调用,这两个内部类都是ChannelDuplexHandler
的子类,他们各自注册在client和server的Bootstrap里(为什么注册到bootstrap就能进行接收操作?这个要移步netty的原理)

收到信息后就会调进父类的processMessage
方法里,我们来看看源码
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
// 此处省略非关键代码
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
// 这里可读性稍微差点,first是Processor,表示普通处理,而second是线程池,表示池化处理。
if (pair.getSecond() != null) {
try {
pair.getSecond().execute(() -> {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
// 此处省略非关键代码
}
} else {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
}
} else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
}
} else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
}
}