Microservice Architecture: The Dubbo Service Framework - Service Export

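松花皮蛋me 2019-05-13 14:19
First published on the WeChat official account 松花皮蛋的黑板报. The author works at JD.com (京东) and has in-depth experience in stability assurance, agile development, advanced Java, and microservice architecture.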

The previous article noted that ServiceBean listens for ContextRefreshedEvent and then exports the service. This article picks up from there.

private static final ScheduledExecutorService delayExportExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true));

    public synchronized void export() {
        checkAndUpdateSubConfigs();
        if (shouldDelay()) {
            delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }
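The two branches of export() above can be tried out in isolation. What follows is only a minimal sketch under stated assumptions: DelayedExportSketch, elapsedUntilExported, and the latch are hypothetical names introduced for illustration, and doExport here does nothing except record that it ran.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ServiceConfig; only the delay decision is modelled.
class DelayedExportSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "DelayExporterSketch");
                t.setDaemon(true); // daemon thread, so it never blocks JVM exit
                return t;
            });
    private final long delayMillis;
    private final CountDownLatch exported = new CountDownLatch(1);

    DelayedExportSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    synchronized void export() {
        if (delayMillis > 0) {
            // Delayed branch: hand the real work to the scheduler.
            executor.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            // Immediate branch: export on the calling thread.
            doExport();
        }
    }

    private void doExport() {
        exported.countDown(); // placeholder for the real export work
    }

    /** Calls export() and returns the milliseconds that passed until doExport ran. */
    static long elapsedUntilExported(long delayMillis) {
        DelayedExportSketch s = new DelayedExportSketch(delayMillis);
        long start = System.nanoTime();
        s.export();
        try {
            if (!s.exported.await(delayMillis + 2000, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("doExport did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}
```

With a delay of 0 the export happens on the caller's thread before export() returns; with a positive delay the caller returns immediately and the work runs later on the scheduler thread.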

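Dubbo can delay service export, which helps services that need time to initialize; enabling this kind of warm-up is recommended. Let's first look at ServiceConfig#checkAndUpdateSubConfigs. Given the "check" prefix, it looks at first glance like a pure validation method, but it also starts the config center and refreshes the configuration.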

public void checkAndUpdateSubConfigs() {
    // Use default configs defined explicitly on global configs
    completeCompoundConfigs();
    // Config Center should always being started first.
    startConfigCenter();
    checkDefault();
    checkApplication();
    checkRegistry();
    checkProtocol();
    this.refresh();
    checkMetadataReport();
}

AbstractInterfaceConfig#startConfigCenter

   void startConfigCenter() {
        if (configCenter == null) {
            ConfigManager.getInstance().getConfigCenter().ifPresent(cc -> this.configCenter = cc);
        }

        if (this.configCenter != null) {
            // TODO there may have duplicate refresh
            this.configCenter.refresh();
            prepareEnvironment();
        }
        // Refresh all configs; properties are injected through their setters, e.g.
        // getApplication().ifPresent(ApplicationConfig::refresh);
        // Java 8 Optional#ifPresent() invokes the Consumer lambda only when a value is present
        ConfigManager.getInstance().refreshAll();
    }

AbstractInterfaceConfig#prepareEnvironment

private void prepareEnvironment() {
        if (configCenter.isValid()) {
            if (!configCenter.checkOrUpdateInited()) {
                return;
            }
            DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl());
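            String configContent = dynamicConfiguration.getConfig(configCenter.getConfigFile(), configCenter.getGroup());
            // ... (abridged: appConfigContent is loaded for the application's own group here)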
            try {
                Environment.getInstance().setConfigCenterFirst(configCenter.isHighestPriority());
                Environment.getInstance().updateExternalConfigurationMap(parseProperties(configContent));
                Environment.getInstance().updateAppExternalConfigurationMap(parseProperties(appConfigContent));
            } catch (IOException e) {
                throw new IllegalStateException("Failed to parse configurations from Config Center.", e);
            }
        }
    }

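AbstractInterfaceConfig#getDynamicConfiguration. Out of the box, Dubbo supports Apollo, Consul, etcd, and ZooKeeper as the config center; the implementation is chosen by the URL protocol through the SPI ExtensionLoader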

  private DynamicConfiguration getDynamicConfiguration(URL url) {
        DynamicConfigurationFactory factories = ExtensionLoader
                .getExtensionLoader(DynamicConfigurationFactory.class)
                .getExtension(url.getProtocol());
        DynamicConfiguration configuration = factories.getDynamicConfiguration(url);
        Environment.getInstance().setDynamicConfiguration(configuration);
        return configuration;
    }    

ZookeeperDynamicConfigurationFactory#createDynamicConfiguration

 @Override
    protected DynamicConfiguration createDynamicConfiguration(URL url) {
        return new ZookeeperDynamicConfiguration(url, zookeeperTransporter);
    }

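ZookeeperDynamicConfiguration#ZookeeperDynamicConfiguration. CountDownLatch is a synchronization utility that lets one or more threads wait until operations in other threads have completed. CyclicBarrier, which is often mentioned alongside it, blocks each thread of a group at a barrier until the last thread arrives; only then does the barrier open and all blocked threads continue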

 ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
         //zookeeper://127.0.0.1:2181/ConfigCenterConfig?address=zookeeper://127.0.0.1:2181&check=true&configFile=.properties&group=&highestPriority=false&namespace=&prefix=.config-center&timeout=3000&valid=true
        this.url = url;
        rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
        initializedLatch = new CountDownLatch(1);
        this.cacheListener = new CacheListener(rootPath, initializedLatch);
        this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addDataListener(rootPath, cacheListener, executor);
        try {
            // Wait for connection
            this.initializedLatch.await();
        } catch (InterruptedException e) {
            logger.warn("Failed to build local cache for config center (zookeeper)." + url);
        }
    }
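The constructor above blocks on a latch until the cache listener reports that the initial data has been loaded. The same pattern can be sketched with nothing but the JDK. LatchInitSketch, onInitialData, and awaitConfig are hypothetical names, and unlike the Dubbo code this sketch waits with a timeout and restores the interrupt flag, which is a deliberate variation rather than what Dubbo does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class LatchInitSketch {
    private final CountDownLatch initialized = new CountDownLatch(1);
    private final AtomicReference<String> cache = new AtomicReference<>();

    // Plays the role of the listener callback that fills the local cache.
    void onInitialData(String data) {
        cache.set(data);
        initialized.countDown(); // releases every thread blocked in awaitConfig
    }

    // Blocks until the first callback arrives; returns null on timeout or interrupt.
    String awaitConfig(long timeoutMillis) {
        try {
            return initialized.await(timeoutMillis, TimeUnit.MILLISECONDS) ? cache.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return null;
        }
    }

    static String demo() {
        LatchInitSketch s = new LatchInitSketch();
        // A second thread stands in for the ZooKeeper event thread.
        new Thread(() -> s.onInitialData("timeout=3000")).start();
        return s.awaitConfig(2000);
    }
}
```

A latch fits here because the wait is one-shot: once the count reaches zero it stays open. A CyclicBarrier would be the choice if a group of threads had to meet repeatedly.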

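Dubbo's ZooKeeper client is Curator, a ZooKeeper client framework open-sourced by Netflix (now an Apache project). It takes care of many low-level details of ZooKeeper client development, including reconnection, re-registering Watchers, and handling NodeExistsException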

CuratorZookeeperClient#CuratorZookeeperClient

 public CuratorZookeeperClient(URL url) {
        super(url);
        try {
            int timeout = url.getParameter(Constants.TIMEOUT_KEY, 5000);
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(url.getBackupAddress())
                    .retryPolicy(new RetryNTimes(1, 1000)) // retry policy
                    .connectionTimeoutMs(timeout);
            // authorization
            String authority = url.getAuthority();
            if (authority != null && authority.length() > 0) {
                builder = builder.authorization("digest", authority.getBytes());
            }
            client = builder.build();
            // notify on connection state changes
            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {

                @Override
                public void stateChanged(CuratorFramework client, ConnectionState state) {
                    if (state == ConnectionState.LOST) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                    } else if (state == ConnectionState.CONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                    } else if (state == ConnectionState.RECONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                    }
                }
            });
            client.start();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

Back to the service export entry point, ServiceConfig#doExportUrls

    private void doExportUrls() {
        // Load the list of registries
        List<URL> registryURLs = loadRegistries(true);
        // Export the service once per configured protocol, to every registry
        for (ProtocolConfig protocolConfig : protocols) {
            // e.g. org.apache.dubbo.demo.DemoService
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

ServiceConfig#doExportUrlsFor1Protocol. If no scope is specified, the service is exported both locally and remotely

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        // ... (abridged: the provider url is assembled from the configs here)

        // Local export, skipped only when scope is "remote"
        if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // Remote export, skipped only when scope is "local"
        // (abridged: the full source runs this once for each registryURL in registryURLs)
        if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

            Exporter<?> exporter = protocol.export(wrapperInvoker);
            exporters.add(exporter);
        }
    }

Local export: proxyFactory#getInvoker creates the service proxy and produces the entry point, namely the Invoker object, and protocol#export then exports it

  private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                //injvm://127.0.0.1/....
            URL local = URLBuilder.from(url)
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(LOCALHOST_VALUE)
                    .setPort(0)
                    .build();
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
        }
    }

proxyFactory is obtained through ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(), and protocol is obtained the same way with Protocol.class

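If getExtension cannot find the extension, it creates it (note: Dubbo uses double-checked locking in many places). injectExtension injects the extension's dependencies through its setter methods. Note: the plain JDK SPI has no IoC-style injection, whereas Dubbo SPI supports setter injection as well as constructor injection for wrapper classes: wrapperClass.getConstructor(type).newInstance(instance)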

  @SuppressWarnings("unchecked")
    private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                    type + ") couldn't be instantiated: " + t.getMessage(), t);
        }
    }
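The wrapper loop at the end of createExtension is easier to follow with a toy extension point. The sketch below is an illustration only: Greeter, PlainGreeter, UpperCaseWrapper, BracketWrapper, and wrap are hypothetical names, and the setter injection done by injectExtension is left out.

```java
import java.util.Arrays;
import java.util.List;

class WrapperSketch {
    public interface Greeter {
        String greet(String name);
    }

    public static class PlainGreeter implements Greeter {
        public String greet(String name) {
            return "hello " + name;
        }
    }

    // A wrapper is any implementation with a public constructor taking the extension type.
    public static class UpperCaseWrapper implements Greeter {
        private final Greeter delegate;
        public UpperCaseWrapper(Greeter delegate) { this.delegate = delegate; }
        public String greet(String name) { return delegate.greet(name).toUpperCase(); }
    }

    public static class BracketWrapper implements Greeter {
        private final Greeter delegate;
        public BracketWrapper(Greeter delegate) { this.delegate = delegate; }
        public String greet(String name) { return "[" + delegate.greet(name) + "]"; }
    }

    // Same reflective call as in createExtension: each wrapper receives the previous instance.
    static <T> T wrap(Class<T> type, T instance, List<Class<? extends T>> wrapperClasses) {
        try {
            for (Class<? extends T> wrapperClass : wrapperClasses) {
                instance = wrapperClass.getConstructor(type).newInstance(instance);
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("wrapper could not be instantiated", e);
        }
    }

    static String demo() {
        Greeter g = wrap(Greeter.class, new PlainGreeter(),
                Arrays.asList(UpperCaseWrapper.class, BracketWrapper.class));
        return g.greet("dubbo");
    }
}
```

The wrapper applied last ends up outermost, so the order of the wrapper set decides the order in which a call passes through them.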

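When the Adaptive annotation is placed on a class, that class is a hand-written adapter; when it is placed on a method, the adapter is generated dynamically. The SPI annotation can specify the default extension name. Note: with the plain JDK SPI you have to loop over all implementations to find the one you want, and a single implementation that fails to load can lead to unexpected results; with Dubbo's own design, getAdaptiveExtension gives flexible access to the right implementation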

    private Class<?> getAdaptiveExtensionClass() {
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        // The adaptive class is generated only when none was found during loading
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }

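getExtensionClasses determines the SPI default implementation and loads the extension files, each named after the fully qualified interface name, from the classpath directories META-INF/dubbo/internal/, META-INF/dubbo/, and META-INF/services/. While loading, it may also set cachedAdaptiveClass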

 private T createAdaptiveExtension() {
        try {
            // Obtain the adaptive extension, then inject its dependencies
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
        }
    }

createAdaptiveExtensionClass generates the source code and then compiles it

private Class<?> createAdaptiveExtensionClass() {
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
        // Obtain the class loader to compile against
        ClassLoader classLoader = findClassLoader();
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }

The generated code looks like the following; pay particular attention to url.getParameter and getExtension. The code for Protocol$Adaptive is not shown

package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements  org.apache.dubbo.rpc.ProxyFactory {

    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0, arg1);
    }
    public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
}
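Stripped of the fully qualified names, the generated class above does two things: it reads an extension name from the URL (falling back to the SPI default) and delegates to the extension with that name. A hand-written miniature of that idea might look like the sketch below. Codec, the "codec" parameter, and the EXTENSIONS map are hypothetical; a plain Map stands in for both the URL and ExtensionLoader#getExtension.

```java
import java.util.HashMap;
import java.util.Map;

class AdaptiveSketch {
    interface Codec {
        String encode(String s);
    }

    // Stand-in for ExtensionLoader#getExtension(name): a name -> implementation registry.
    private static final Map<String, Codec> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("plain", s -> s);
        EXTENSIONS.put("reverse", s -> new StringBuilder(s).reverse().toString());
    }

    // Hand-written counterpart of a generated adaptive class: read a parameter, then delegate.
    static class AdaptiveCodec {
        String encode(Map<String, String> urlParams, String s) {
            // "plain" plays the role of the default name declared on the SPI annotation
            String extName = urlParams.getOrDefault("codec", "plain");
            Codec extension = EXTENSIONS.get(extName);
            if (extension == null) {
                throw new IllegalStateException("No such extension: " + extName);
            }
            return extension.encode(s);
        }
    }

    static String demo(String codecName) {
        Map<String, String> params = new HashMap<>();
        if (codecName != null) {
            params.put("codec", codecName);
        }
        return new AdaptiveCodec().encode(params, "abc");
    }
}
```

Because the choice is made on every call from the data passed in, one adaptive instance can serve URLs that ask for different implementations.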

The default ProxyFactory implementation is org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory; the alternative is org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

The consumer side uses getProxy to obtain the proxy object. InvokerInvocationHandler#invoke filters methods: toString, hashCode, and equals do not go through RPC, while real RPC calls go through invoker.invoke(createInvocation(method, args)).recreate()
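The filtering of toString, hashCode, and equals can be illustrated with a plain JDK dynamic proxy. This is a sketch under assumptions, not Dubbo's handler: DemoService, Handler, and REMOTE_CALLS are hypothetical names, and the "remote call" is simulated by a counter and a string.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

class ProxySketch {
    interface DemoService {
        String sayHello(String name);
    }

    // Counts how many calls would have gone over the wire.
    static final AtomicInteger REMOTE_CALLS = new AtomicInteger();

    static class Handler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            String name = method.getName();
            int argc = args == null ? 0 : args.length;
            // Object methods are answered locally and never counted as remote calls.
            if ("toString".equals(name) && argc == 0) return "DemoService-proxy";
            if ("hashCode".equals(name) && argc == 0) return System.identityHashCode(proxy);
            if ("equals".equals(name) && argc == 1) return proxy == args[0];
            // Everything else would become an RPC; here it is only simulated.
            REMOTE_CALLS.incrementAndGet();
            return "Hello " + args[0];
        }
    }

    static DemoService newProxy() {
        return (DemoService) Proxy.newProxyInstance(
                ProxySketch.class.getClassLoader(),
                new Class<?>[]{DemoService.class},
                new Handler());
    }
}
```

Answering these methods locally matters in practice: logging a proxy or putting it into a HashMap would otherwise trigger network calls.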

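The Invoker cannot call the Service implementation object directly. The ref implementation is first wrapped in a Wrapper: Javassist bytecode generation creates a concrete Wrapper class, and when the Invoker is invoked, its doInvoke() method runs and calls the Wrapper's invokeMethod()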

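Back to protocol#export. It must be idempotent: exporting an Invoker for the same URL twice must be no different from exporting it once. Once the Invoker is passed in, the matching Protocol extension is selected automatically from Invoker.url. The call order of export() is: Protocol$Adaptive => QosProtocolWrapper => ProtocolListenerWrapper => ProtocolFilterWrapper => InjvmProtocol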

ProtocolFilterWrapper#export

 @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // For the registry protocol, export directly without filters
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        // Otherwise build the filter chain first, then export
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

ProtocolFilterWrapper#buildInvokerChain. The default filters include EchoFilter, ClassLoaderFilter, GenericFilter, ContextFilter, TraceFilter, TimeoutFilter, MonitorFilter, and ExceptionFilter

 private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    Result result = filter.invoke(next, invocation);
                    if (result instanceof AsyncRpcResult) {
                        AsyncRpcResult asyncResult = (AsyncRpcResult) result;
                        asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
                        return asyncResult;
                    } else {
                        return filter.onResponse(result, invoker, invocation);
                    }
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}
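The loop in buildInvokerChain walks the filter list backwards so that the first filter ends up outermost. The following reduced sketch shows only that ordering; the Invoker and Filter interfaces here are simplified, hypothetical versions that pass strings instead of Invocation and Result, and the onResponse callback is omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FilterChainSketch {
    interface Invoker {
        String invoke(String request);
    }

    interface Filter {
        String invoke(Invoker next, String request);
    }

    // Wrap from the last filter to the first so that filters.get(0) runs outermost.
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    // Returns the order in which the filters and the target were entered and left.
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        Filter a = (next, req) -> {
            trace.add("A-before");
            String r = next.invoke(req);
            trace.add("A-after");
            return r;
        };
        Filter b = (next, req) -> {
            trace.add("B-before");
            String r = next.invoke(req);
            trace.add("B-after");
            return r;
        };
        Invoker target = req -> {
            trace.add("target");
            return "ok:" + req;
        };
        trace.add(buildChain(target, Arrays.asList(a, b)).invoke("ping"));
        return trace;
    }
}
```

A filter that does not call next.invoke short-circuits the rest of the chain, which is exactly how a rate-limiting filter rejects a request.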

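Let's take the rate-limiting Filter as an example. The others, such as the active-count limit, cache, and exception Filters, are left for you to read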

@Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter {

    private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
            throw new RpcException(
                    "Failed to invoke service " +
                            invoker.getInterface().getName() +
                            "." +
                            invocation.getMethodName() +
                            " because exceed max service tps.");
        }

        return invoker.invoke(invocation);
    }

}

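StatItem#isAllowable. Dubbo uses an AtomicInteger here. It first resets the window once the interval has elapsed, then takes one permit for each incoming request. Common rate-limiting approaches in the industry are the fixed counter, sliding window, leaky bucket, and token bucket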

public boolean isAllowable() {
        long now = System.currentTimeMillis();
        if (now > lastResetTime + interval) {
            token.set(rate);
            lastResetTime = now;
        }

        int value = token.get();
        boolean flag = false;
        while (value > 0 && !flag) {
            flag = token.compareAndSet(value, value - 1);
            value = token.get();
        }

        return flag;
    }
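isAllowable above is a fixed-window counter: the permits are refilled all at once when the window rolls over. For contrast, here is a minimal token bucket sketch, one of the other approaches just listed. It is not part of Dubbo; TokenBucketSketch and its parameters are hypothetical, and the clock is passed in explicitly so that the behaviour is deterministic.

```java
class TokenBucketSketch {
    private final long capacity;         // maximum burst size
    private final double refillPerMilli; // tokens added per millisecond
    private double tokens;
    private long lastRefill;

    TokenBucketSketch(long capacity, long refillPerSecond, long nowMillis) {
        this.capacity = capacity;
        this.refillPerMilli = refillPerSecond / 1000.0;
        this.tokens = capacity; // start with a full bucket
        this.lastRefill = nowMillis;
    }

    // Refill in proportion to the elapsed time, then try to take one token.
    synchronized boolean tryAcquire(long nowMillis) {
        if (nowMillis > lastRefill) {
            tokens = Math.min(capacity, tokens + (nowMillis - lastRefill) * refillPerMilli);
            lastRefill = nowMillis;
        }
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

A fixed window can let through up to twice the configured rate around a window boundary, whereas a token bucket refills continuously and caps bursts at its capacity. In production code the caller would pass System.currentTimeMillis() or a monotonic clock.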

InjvmProtocol#export

  @Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}

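When scope is empty, both local and remote export happen. In the remote export flow, let's first look at QosProtocolWrapper#startQosServer. In Dubbo, QoS is used to query and control services dynamically, acting as a back-office console: for example, listing all services currently provided and consumed, or bringing services online and offline dynamically, that is, registering them with and unregistering them from the registry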

   private void startQosServer(URL url) {
        try {           

            // AtomicBoolean again. compareAndSet performs the compare and the assignment as one atomic step (CAS), so only one caller gets past this check
            if (!hasStarted.compareAndSet(false, true)) {
                return;
            }
            int port = url.getParameter(QOS_PORT, QosConstants.DEFAULT_PORT);
            boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP, "false"));
            Server server = Server.getInstance();
            server.setPort(port);
            server.setAcceptForeignIp(acceptForeignIp);
            server.start();

        } catch (Throwable throwable) {
            logger.warn("Fail to start qos server: ", throwable);
        }
    }
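The compareAndSet guard above is a compact way to make a start method run at most once, even under concurrent calls. A small sketch of just that guard follows; StartOnceSketch and startCount are hypothetical names, and the counter stands in for binding the port.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class StartOnceSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger startCount = new AtomicInteger();

    void start() {
        // Only the caller that flips false -> true proceeds; all others return at once.
        if (!started.compareAndSet(false, true)) {
            return;
        }
        startCount.incrementAndGet(); // stands in for binding the port
    }

    // Calls start() from several threads and reports how often the body actually ran.
    static int demo(int threads) {
        StartOnceSketch s = new StartOnceSketch();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(s::start);
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return s.startCount.get();
    }
}
```

One consequence of this pattern is worth keeping in mind: the flag is set before the work is done, so if the work fails afterwards, later calls still return early unless the flag is reset.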

Server#start

   public void start() throws Throwable {
        if (!started.compareAndSet(false, true)) {
            return;
        }
        boss = new NioEventLoopGroup(1, new DefaultThreadFactory("qos-boss", true));
        worker = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-worker", true));
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        serverBootstrap.childHandler(new ChannelInitializer<Channel>() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new QosProcessHandler(welcome, acceptForeignIp));
            }
        });
        try {
            serverBootstrap.bind(port).sync();
            logger.info("qos-server bind localhost:" + port);
        } catch (Throwable throwable) {
            logger.error("qos-server can not bind localhost:" + port, throwable);
            throw throwable;
        }
    }

Back to the remote export itself: RegistryProtocol#export

 public void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
    }

    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {

        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
         ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }       
          // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);

    }

RegistryProtocol#doLocalExport

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);
        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegete = new InvokerDelegate<>(originInvoker, providerUrl);
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
        });
    }
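doLocalExport relies on computeIfAbsent to export each cache key at most once, which is one way to obtain an idempotent export. The effect can be sketched without any Dubbo types. IdempotentExportSketch, its Exporter class, and serversOpened are hypothetical names for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentExportSketch {
    static final class Exporter {
        final String key;
        Exporter(String key) {
            this.key = key;
        }
    }

    private final ConcurrentMap<String, Exporter> bounds = new ConcurrentHashMap<>();
    final AtomicInteger serversOpened = new AtomicInteger();

    Exporter export(String serviceKey) {
        // The mapping function runs at most once per key, even with concurrent callers.
        return bounds.computeIfAbsent(serviceKey, k -> {
            serversOpened.incrementAndGet(); // stands in for the expensive protocol.export
            return new Exporter(k);
        });
    }
}
```

Exporting the same key again returns the cached Exporter and does not repeat the expensive work; a different key triggers exactly one more export.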

DubboProtocol#export

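    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        // Create the exporter and cache it by service key
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
        // ... (abridged)
        openServer(url);
        optimizeSerialization(url);

        return exporter;
    }

    private ExchangeServer createServer(URL url) {
        // ... (abridged)
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        // ... (abridged)
        return server;
    }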

The flow behind Exchangers.bind(url, requestHandler) above is HeaderExchanger => Transporter$Adaptive => NettyTransporter => NettyServer. Note: Exchanger and Transporter are also loaded through SPI extensions

NettyServer#doOpen

 @Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

Next, service registration: ZookeeperRegistry#doRegister

public class ZookeeperRegistry extends FailbackRegistry {

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(state -> {
            if (state == StateListener.RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }

    @Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}

ZookeeperTransporter

@SPI("curator")
public interface ZookeeperTransporter {
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);
}

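Once started, a service provider writes its own URL under the "/root/service name/type" directory, for example /dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo://172.23.205.2:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=providers:dubbo:org.apache.dubbo.demo.DemoService&default.deprecated=false&default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2&dynamic=false&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=5316&register=true&release=&side=provider&timestamp=1557727085669, creating an EPHEMERAL node. This way, even if the provider shuts down abnormally, the ephemeral node is deleted automatically once the ZooKeeper session times out. Dubbo also wraps the JDK ShutdownHook to implement graceful shutdown: roughly, it marks the provider as no longer accepting new requests and releases resources after the in-flight requests have completed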
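The graceful shutdown flow just described (stop accepting, drain in-flight requests, then release resources) can be sketched as follows. This is a simplified illustration and not Dubbo's shutdown hook: GracefulShutdownSketch, tryBegin, end, and the polling interval are assumptions, and unregistering from the registry is not modelled.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulShutdownSketch {
    private final AtomicBoolean accepting = new AtomicBoolean(true);
    private final AtomicInteger inFlight = new AtomicInteger();

    // Called when a request arrives; returns false once shutdown has begun.
    boolean tryBegin() {
        if (!accepting.get()) {
            return false;
        }
        inFlight.incrementAndGet();
        // Re-check: shutdown may have started between the check and the increment.
        if (!accepting.get()) {
            inFlight.decrementAndGet();
            return false;
        }
        return true;
    }

    // Called when a request has finished.
    void end() {
        inFlight.decrementAndGet();
    }

    // Stops accepting, then waits up to timeoutMillis for in-flight requests to drain.
    boolean shutdown(long timeoutMillis) {
        accepting.set(false);
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (inFlight.get() > 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return inFlight.get() == 0; // true means resources can be released safely
    }

    // Registers the drain logic with the JVM, as a JDK shutdown hook.
    void installHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown(10_000)));
    }
}
```

The timeout keeps a stuck request from blocking JVM exit forever; the 10 second value used in installHook is an arbitrary choice for this sketch.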

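Registration is not the end of the export flow. One more important step remains: listening for configuration changes. ZookeeperRegistry#doSubscribe creates the persistent node "/dubbo/org.apache.dubbo.demo.DemoService/configurators" and registers a listener on it whose callback ends up in FailbackRegistry's notify. When something changes, the local cache file of registry data is updated, and Dubbo then decides whether the service needs to be re-exported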