微服务架构之服务冶理Dubbo-Netty流程

博客首页文章列表 松花皮蛋me 2019-06-26 16:03
文章首发于公众号 松花皮蛋的黑板报松花皮蛋的黑板报,作者就职于京东,在稳定性保障、敏捷开发、高级JAVA、微服务架构有深入的理解

一、服务消费者

服务引用时流程会走到DubboProtocol#refer方法,之前篇章中没有提及Netty环节,本节补上

在getClients时如果有共享链接则直接使用,反之,通过信息交换层Exchangers进行connect得到客户端client实例

可以看到信息交换层Exchanger是个SPI扩展点,默认实现HeaderExchanger

谈谈为什么心跳探测要在客户端发起,而不是服务端去探测?如何应对网络抖动情况下的节点管理?动态注册中心可以把异常的节点及时去除然后通知到消费端,但是如果是因为网络抖动误判就比较麻烦了,可以设置比例但是消费端感知会有时延。那么换个思路解决这个问题,服务消费者并不严格以注册中心的服务节点为准,而是根据实际情况来判断提供者节点是否可用

接下来看下传输层Transporters.connect中的业务逻辑

Transporter是个SPI自适应扩展点,通过URL中的值获取相应的实现类,默认是NettyClient,系统支持GrizzlyClient、MinaClient、NettyClient、NettyClient4

channelHandler参数是包装了HeaderExchangeHandler的DecodeHandler先看下静态变量中的nioEventLoopGroup字段,它设置的线程数是Math.min(Runtime.getRuntime().availableProcessors() + 1, 32),”CPU核数加1″或”32″,大家平时研发线程数设置可以参考这个,另外线程数不是越多越好,业界推荐的计算方法是最佳线程数=CPU核数*[1+(I/O耗时/CPU耗时)]

另外在NettyClient的父类构造方法中进行了handler包装可以看到包装成了包括心跳处理器的MuiltMessageHandler

这里的请求调度器Dispatcher是通过自适应扩展器得到的,默认实现是AllDispatcher

回到NettyClient中,在抽象类AbstractClient的构造方法中调用了模板方法doOpen和doConnect,那么我们先看下NettyClient的doOpen方法

主要业务逻辑是进行创建Netty客户端,客户端只需要创建一个 EventLoopGroup即可,然后将编解码、心跳、业务处理器注册到pipeline事件流中。另外connect方法无非就是调用ChannelFuture future = bootstrap.connect(getConnectAddress());得到future然后阻塞等待结果。另外配置ChannelOption选项时,通过TCP_NODELAY控制禁用了Nagle优化纳格算法的工作方式是合并一定数量的输出数据后一次提交。特别的是,只要有已提交的数据包尚未确认,发送者会持续缓冲数据包,直到累积一定数量的数据才提交,不过该算法与 TCP延迟确认会有不好的相互作用

我们知道服务引用时会调用DubboInvoker#doInvoke方法

HeaderExchangeChannel#request,这里需要注意的是,Request中有一个通过AtomicLong得到的mId用来来唯一表示请求对象,DefaultFuture靠的就是这个mId来关联请求和应答消息,DefaultFuture中有两个很重要的属性:FUTURS和CHANNELS,它们类型都是ConcurrentHashMap,key为mId,在新建DefaultFuture对象时会把mId和相关的Future和Channel塞到这两个Map中;还有一个ReentrantLock类型的lock属性,用于阻塞来等待应答

DefaultFuture#received

DefaultFuture#doReceived,当获取结果时也就是调用DefaultFuture#get时,会阻塞一段时间,直到有结果返回或者超时时唤醒线程


回到请求流程中,send方法实际上是NettyChannel#send方法

这里需要注意两点:

1、writeAndFlush写队列并刷新,实际上netty会把要发送的消息保存在ChannelOutboundBuffer里面,如果网络对方处理速度比较慢或者消息发送比较快或者消息发送量过大都有可能导致内存溢出;。建议优化方式:通过启动项channelOption设置发送队列的长度,或者通过-D启动参数配置长度

2、nettyChannel#send方法中没有直接进行回推发送失败的消息,因为Dubbo提供了容错补偿机制,如果在日常中使用Netty需要自己处理发送失败的消息,不可生吞异常


二、服务提供者


流程复习
RegistryProtocol#export=>DubboProtocol#export=>DubboProtocol#openServer=>DubboProtocol#createServer=>Exchangers#bind=>HeaderExchanger#bind=>Transporters#bind=>NettyServer#init=>AbstractServer#init=>NettyServer#doOpen

初始化EventLoopGroup-bossGroup,它只负责认证授权、连接然后将socket注册到IO连接池中的某个channel,线程数为1,不过Netty推荐使用的是线程池。初始化EventLoopGroup-workerGroup,负责IO读写。线程数:Math.min(Runtime.getRuntime().availableProcessors() + 1, 32)。将EventLoopGroup注册到bootstrap并处理连接,可以看到使用的是一主多从线程模型,Netty根据group参数设置不多的reactor线程模型,默认支持单线程、多线程模型、主从多线程模型,配置非常灵活。最后配置channel,配置连接处理器,然后绑定,阻塞等待关闭


当接收到客户端的消息时,NettyServerHandler#channelRead

AllChannelHandler#received,从WrappedChannelHandler的getExecutorService方法获取线程池,如果通过SPI自适应扩展器无法获取实例对象,就使用DubboSharedHandler线程池。然后提交标识为received的线程任务,提交任务的时候可能会抛出RejectedExecutionException,因为IO线程池是无界的(0-Integer.MAX_VALUE),但服务调用线程池是有界的。抛出异常后将进入caught方法来处理,而该方法使用的仍然是业务线程池,所以很有可能这时业务线程池还是满的。所以生产环境中为了减少在Provider线程池打满时整个系统雪崩的风险,建议将Dispatcher设置成message

FixedThreadPool线程池,默认处理业务的线程数是200,处理消息的队列长度不限制

提交ChannelEventRunnable线程任务,根据state不同进行处理,接收信息、连接、断连、发送、异常

DecodeHandler#received

HeaderExchangeHandler#received=>this#handleRequest。流程复习:DubboProtocol#createServer=>Exchangers#bind=>HeaderExchanger#bind=>HeaderExchangeHandler#init,所以构造方法中的handler参数其实是DubboProtcol中的requestHandler

HeaderExchangerHandler#handleRequest

DubboProtocol.requestHandler#reply

Invoker.invoke调用链大概是echoFilter=>genericFilter=>contextFilter=>traceFilter=>timeoutFilter=>exceptionFilter=>RegistryProrocol$invokerdelegate=>JavassistProxyFactory#getInvoker#doinvoke


通过wrapper包装类执行invokeMethod得到结果,然后通过netty的channel响应消息给服务引用者