京东技术解密之配置中心DUCC

松花皮蛋me 2019-04-19 20:52
文章首发于公众号 松花皮蛋的黑板报松花皮蛋的黑板报,作者就职于京东,在稳定性保障、敏捷开发、高级JAVA、微服务架构有深入的理解

一、使用方法

简单说下DUCC的特点

支持多环境(或称分组),分组可以合并

内置强大的基于插件的数据绑定框架,支持多种类型等转换;

支持Log4j、Log4j2、Logback的动态修改日记级别功能。

支持Spring原生注解、支持自定义注解,客户端代码入侵性低

支持客户端多配置源,支持自定义配置,如ZK、Consol扩展

支持配置预案切换


接下来说说怎么用,下面我代码中的ConfiguratorManager就是DUCC的配置管理类

@Configuration
@Log4j2
@Order(value = 0)
public class Config extends PropertyPlaceholderConfigurer {

    //发布系统的配置
    private static Map<String,String> joneProperty =  new HashMap<>();
    //DUCC系统的配置
    private static Map<String,String> duccProperty = new HashMap<>();

    @Bean(initMethod = "start" , destroyMethod = "stop")
    public  ConfiguratorManager configuratorManager()
    {
        ConfiguratorManager configuratorManager = ConfiguratorManager.getInstance() ;
        configuratorManager.setApplication(getJoneProperty("laf.config.manager.application"));
        configuratorManager.addResource(new Resource("ucc",getJoneProperty("laf.config.manager.uri")));
        configuratorManager.addListener(new ConfigurationListener.CustomConfigurationListener("ucc") {
            @Override
            public void onUpdate(com.jd.laf.config.Configuration configuration) {
                List<Property> properties = configuration.getProperties();
                for (Property property:properties) {
                    log.info("duccConfig update key:{}",property.getKey());
                    duccProperty.put(property.getKey(), String.valueOf(property.getValue()));
                }
            }
        });
        return configuratorManager ;
    }

    public String getDuccProperty(String key)
    {
        if(duccProperty.containsKey(key)) {
            return duccProperty.get(key);
        }
        Property property = configuratorManager().getProperty(key);
        if(property==null || String.valueOf(property.getValue()).isEmpty()) {
            log.error("配置降级,key:{}",key);
            return getJoneProperty(key);
        }
        duccProperty.put(key, String.valueOf(property.getValue()));
        return String.valueOf(property.getValue());
    }

    @Override
    protected void processProperties(ConfigurableListableBeanFactory beanFactory, Properties props)throws BeansException {
        super.processProperties(beanFactory, props);
        for (Object key : props.keySet()) {
            String keyStr = key.toString();
            joneProperty.put(keyStr, String.valueOf(props.getProperty(keyStr)));
        }
    }

    public  String getJoneProperty(String key)
    {
        return joneProperty.get(key);
    }

}

二、DUCC各重要模块解读

1、ConfiguratorManager统一配置管理类

主要步骤:

1.1 从服务器获取配置信息

1.2 将配置保存到本地

1.3 启动更新事件消费者

1.4 为每个Resource(可以理解为分组)启动变更监控线程Watcher,监听配置变更

public class ConfiguratorManager implements ConfigurationSupplier, Watchable {


      //通知
    protected Notifier notifier = new Notifier() {
        @Override
        public <M, T extends Listener> void send(M property, List<T> listeners) {
            inform(property, listeners);
        }
    };

    //事件
protected BlockingQueue<Resource> events = new ArrayBlockingQueue<Resource>(5000);


     @Override
    public Property getProperty(final String key) {
         //最终是通过configuration获取的,后面会讲到
        return configuration.getProperty(name);
    }

     @Override
    public boolean addListener(final PropertyListener listener) {

           //判断是否重复添加

           groupResources.addListener(listener.getKey(), listener);
       }

         /**
     * 通知监听器,notify执行send方法时会触发
     *
     * @param property  变更的配置
     * @param listeners 监听器
     */
    protected <M, T extends Listener> void inform(final M property, final List<T> listeners) {
        if (listeners == null) {
            return;
        }
        for (final T listener : listeners) {
            if (!isStarted()) {
                return;
            }
            notifierThreads.execute(new Runnable() {
                @Override
                public void run() {
                    if (isStarted()) {
                        listener.onUpdate(property);
                    }
                }
            });
        }
    }


        /**
     * 启动
     */
    public void start() throws Exception {
         //验证降级文件路径可写等
        validate();
        synchronized (mutex) {
            if (started.compareAndSet(false, true)) {
                ExecutorService executorService = null;
                try {
                //资源URL解析,排序合并
                initializeResource()
                resource.setConfigurator(configurator)
                //远程加载,初始化上下文context
                startRemote(resources, executorService)         
                //通知,最后还是会执行到notifier.send()方法
                inform(groupResource, groupConfiguration);                   
                //启动变更监听
                consumer = new Thread(new EventConsumer());
                consumer.start();
                //所有资源已经初始化过,安全启动监听器
                //支持多个分组,推荐重复度高的单独抽出来
                for (Resource resource : groupResources) {
                    if (resource.isReady()) {
                        resource.watch();
                    }
                }
               } catch (Exception e) {
                stop();
                throw e;
            } finally {
                if (executorService != null) {
                    executorService.shutdownNow();
                }
            }
            }
        }

     }


    protected class EventConsumer implements Runnable {
            @Override
            public void run() {
                while (isStarted() && !Thread.currentThread().isInterrupted()) {
                    try {
                        Resource event = events.poll(1000, TimeUnit.MICROSECONDS);
                        if (event != null) {
                            onUpdateConfig(event);
                        }
                    } catch (InterruptedException e) {
                       //必须重置中断信号标记
                        Thread.currentThread().interrupt();
                    }
                }
        }

        /**
         * 资源-配置发生变更
         *
         * @param resource
         */
        protected void onUpdateConfig(final Resource resource) {
           //省略很多行
           /获取旧的配置项
            //获取新的配置项
            //判断是否变更或者新增或者有删除
            //通知配置监听器,最后还是会执行到notifier.send()
            inform(newProperty, listeners);
        }
 }

2、监听器容器Observer

Observer包含了各种Listener,同时拥有一个ConfiguratorManager的成员变量,Lister最终会传递到该变量中

/**
 * 监听器容器
 */
public class Observer implements InitializingBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {

     protected ConfiguratorManager manager;

     /**
     *
     *   省略其他
     */
     public void addPropertyListener(final PropertyListener listener) {
        propertyListeners.add(listener);
    }

        /**
     * 添加监听器
     */
    public synchronized void execute() {
        if (!processed) {
            if (manager != null) {
                for (PropertyListener listener : propertyListeners) {
                    manager.addListener(listener);
                }
                for (ConfigurationListener listener : configurationListeners) {
                    manager.addListener(listener);
                }
                for (Map.Entry<String, String> entry : scriptListeners.entrySet()) {
                    manager.addListener(new JavaScriptListener(entry.getKey(), entry.getValue(), context));
                }
                //针对@ConfigurationPropties注解
                for (ConfigPropertiesBean cfgBean : beans) {
                    String prefix = cfgBean.getPrefix();
                    Object bean = cfgBean.getBean();
                    if (bean == null && context.containsBean(cfgBean.getBeanName())) {
                        bean = context.getBean(cfgBean.getBeanName());
                    }
                    if (bean != null) {
                        //添加监听器
                        process(bean, cfgBean.getBeanName(), prefix);
                    }
                }
            }
            processed = true;
        }
    }

    /**
    * 当BeanFactory设置完Bean属性后会调用此方法,可以添加初始化方法
    ***/

      @Override
    public void afterPropertiesSet() throws Exception {

        //在BeanDefinition中定义注入的
        if (fieldListeners != null) {
            for (FieldListener fieldListener : fieldListeners) {
                addFieldListener(fieldListener);
            }
        }

        if (methodListeners != null) {
            for (MethodListener methodListener : methodListeners) {
                addMethodListener(methodListener);
            }
        }
    }

    /**
     * 获取上下文的引用,需要实现ApplicationContextAware接口
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.context = applicationContext;
    }

    @Override
    public void onApplicationEvent(final ContextRefreshedEvent event) {
        if (event.getSource() == context) {
            execute();
        }
    }

}

如果在上下文中部署一个实现了ApplicationListener接口的Bean,那么每当在一个ApplicationEvent发布到ApplicationContext时,这个Bean会得到通知,其实这就是标准的Oberver设计模式

当ApplicationContext实例完成后,会调用onApplicationEvent()方法,执行execute()方法,然后将PropertyListener/ConfigurationLister添加到ConfiguratorManager实例中

3、Bean实例化后置处理器ConfigPostProcessor

/**
 * Bean实例化后置处理器,保存ConfiguratorManager实例,处理Bean的配置
 *
 */
public class ConfigPostProcessor implements BeanPostProcessor, PriorityOrdered, ApplicationContextAware {

    protected ApplicationContext applicationContext;

    protected Observer observer;

    /**
     * PostProcessBeforeInitialization 方法会在Bean构造完成后(构造方法执行完成),初始化方法(init-method)方法调用之前被调用
     * */
    @Override
        public Object postProcessBeforeInitialization(final Object bean, final String beanName) throws BeansException {
    //处理配置对象
    if (!process(bean, beanName, observer)) {
        //处理属性监听器
        doWithFields(bean.getClass(), new FieldCallback() {
            @Override
            public void doWith(final Field field) {
                process(field, bean, beanName, observer);
            }
        }, new FieldFilter() {
            @Override
            public boolean matches(final Field field) {
                return !Modifier.isFinal(field.getModifiers()) && !Modifier.isStatic(field.getModifiers());
            }
        });

        //处理方法生成的配置对象
        doWithMethods(bean.getClass(), new MethodCallback() {
            @Override
            public void doWith(Method method) {
                process(method, bean, beanName, observer);
            }
        });
    }

    return bean;
        }

    /**
     * 处理方法
     *
     * @param method
     * @param bean
     * @param beanName
     */
    protected void process(final Method method, final Object bean, final String beanName, final Observer observer) {

    }

    /**
     * 处理字段
     *
     * @param field
     * @param bean
     * @param beanName
     */
    protected void process(final Field field, final Object bean, final String beanName, final Observer observer) {
        //SPI服务发现
        for (FieldProcessor processor : FIELD.extensions()) {
            processor.process(bean, beanName, field, observer);
        }
    }

    /**
     * 处理Bean
     *
     * @param bean
     * @param beanName
     * @return
     */
    protected boolean process(final Object bean, final String beanName, final Observer observer) {

    }

    /**
    *维持Observer的成员变量
    **/
    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (bean instanceof ConfiguratorManager && observer.getManager() == null) {
            observer.setManager((ConfiguratorManager) bean);
        }
        if (bean instanceof ConfigurationListener) {
            observer.addConfigurationListener((ConfigurationListener) bean);
        }
        if (bean instanceof PropertyListener) {
            observer.addPropertyListener((PropertyListener) bean);
        }
        return bean;
    }


    @Override
    public int getOrder() {
         //优先级要放最低
        return Ordered.LOWEST_PRECEDENCE;
    }


    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        this.applicationContext = context;
        this.observer = context.getBean(Observer.class);
    }
}

我们可以看到ConfigPostProcessor加载通过SPI服务发现的方法\字段处理类,然后执行process方法,其内部封装了Observer.addPropertyListener

4、Bean工厂后置处理器PropertySourcesFactorPostProcess

/**
 * 注册PropertySource
 */
public class PropertySourcesFactoryPostProcessor implements     BeanFactoryPostProcessor, EnvironmentAware, PriorityOrdered {

    protected ConfigurableEnvironment environment;

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = (ConfigurableEnvironment) environment;
    }

    @Override
    public int getOrder() {
        //最低优先级,先处理Spring内置的PropertySources
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public void postProcessBeanFactory(final ConfigurableListableBeanFactory factory) throws BeansException {
        //其它的PropertySource已经加载了,可以安全创建ConfiguratorManager
        ConfiguratorManager manager = factory.getBean(CONFIGURATOR_MANAGER, ConfiguratorManager.class);
        Observer observer = factory.getBean(OBSERVER_BEAN_NAME, Observer.class);
        //将manager设置为observer的成员变量
        if (observer.getManager() == null) {
            observer.setManager(manager);
        }
        // 注册 Spring 属性配置
        environment.getPropertySources().addFirst(new ConfigSource(manager));
        //Bean的定义已经注册,由于ConfiguratorManager延迟加载,存在Bean定义中的占位符没有被替换的情况。
        //用ConfiguratorManager替换一下剩余的占位符
        resolvePlaceHolder(factory, manager);
    }

    /**
     * 解析Bean的变量
     *
     * @param factory
     * @param manager
     */
    protected void resolvePlaceHolder(final ConfigurableListableBeanFactory factory, final ConfiguratorManager manager) {
        StringValueResolver valueResolver = new ConfigResolver(manager);
        BeanDefinitionVisitor visitor = new BeanDefinitionVisitor(valueResolver);

        String[] beanNames = factory.getBeanDefinitionNames();
        for (String beanName : beanNames) {
            if (!factory.containsSingleton(beanName)) {
                //Bean已经实例化为单例
                BeanDefinition bd = factory.getBeanDefinition(beanName);
                try {
                    visitor.visitBeanDefinition(bd);
                } catch (Exception ex) {
                    throw new BeanDefinitionStoreException(bd.getResourceDescription(), beanName, ex.getMessage(), ex);
                }
            }
        }

        // New in Spring 2.5: resolve placeholders in alias target names and aliases as well.
        factory.resolveAliases(valueResolver);
        // New in Spring 3.0: resolve placeholders in embedded values such as annotation attributes.
        factory.addEmbeddedValueResolver(valueResolver);
    }
}

可以看到在PostProcessBeanFactory方法中,实例化了ConfiguratorManager和Observer,并把Manager设置为Observer的成员变量。另外构造了一个包含Manager的配置属性源propertrySources(属性集合,内部封装个多个k/v),并放到Sping属性源的第一个

5、资源配置器

public abstract class AbstractConfigurator implements Configurator, Prototype {

    /**
     * 初始化
     *
     * @param context
     * @throws Exception
     */
    Configurator setup(Context context) throws Exception {
    }

    /**
     * 拉取配置
     *
     * @param version     当前版本
     * @param longPolling 长轮询时间
     * @return
     * @throws Exception
     */
    Configuration pull(long version, int longPolling) throws Exception {
    }

    /**
     * 监听配置变化
     *
     * @param version 当前版本
     * @return
     */
    boolean watch(long version) {
         //这里只判断properties是否有更新,不涉及更新的具体类型
        if (!configuration.equals(resource.getConfiguration())) {
            resource.setConfiguration(configuration);
            //最终调用的是events.add(resource),也就是manager中的那个events
            context.fire(resource);
        }
    }

    /**
     * 停止监听
     */
    void stop() {}

    /**
     * 资源来源
     *
     * @return 资源来源。
     */
    Source source() {}

    /**
     * 从URL中返回资源名称
     *
     * @param url URL
     * @return 资源名称
     */
    String name(URL url){}

}

Configuration主要是对properties进行操作,Resource封装了Configuration。DUCC通过SPI服务发现将FileConfigurator、SystemConfigurator等extends了AbstractConfigurator的类自动加载进来从而达到可插拨扩展其他配置源的效果,也是通过这种机制支持所有数据格式、适配其他操作系统、实现方法字段属性配置化

6、SPI服务发现

Java SPI 实际上是“基于接口的编程+策略模式+配置文件”组合实现的动态加载机制,常见的JDBC、SLF4门面就是通过这个实现的,下面是DUCC的运用

    /**
 * 插件管理器
 */
public interface Plugin {
    /**
     * 字段处理器扩展点
     */
    ExtensionPoint<FieldProcessor, String> FIELD = new ExtensionPointLazy<FieldProcessor, String>(FieldProcessor.class, SpiLoader.INSTANCE, null, null);
    /**
     * Bean处理器扩展点
     */
    ExtensionPoint<BeanProcessor, String> BEAN = new ExtensionPointLazy<BeanProcessor, String>(BeanProcessor.class, SpiLoader.INSTANCE, null, null);
    /**
     * 方法处理器扩展点
     */
    ExtensionPoint<MethodProcessor, String> METHOD = new ExtensionPointLazy<MethodProcessor, String>(MethodProcessor.class, SpiLoader.INSTANCE, null, null);
}



public class SpiLoader implements ExtensionLoader {
    public static final ExtensionLoader INSTANCE = new SpiLoader();

    public SpiLoader() {
    }

    public <T> Collection<Plugin<T>> load(Class<T> clazz) {
        if (clazz == null) {
            return null;
        } else {
            List<Plugin<T>> result = new LinkedList();
            ServiceLoader<T> plugins = ServiceLoader.load(clazz);
            Iterator var4 = plugins.iterator();

            while(var4.hasNext()) {
                //这里的plugin不是上面的那个plugin
                T plugin = var4.next();
                result.add(new Plugin(new Name(plugin.getClass()), plugin, this));
            }

            return result;
        }
    }
}

通过SPI机制将接口的实现类全部加载并实例化一遍,前提是实现类名称放在”META-INF/services/接口名”。当在ConfigPostProcessor中执行METHOD.extensions()时会将实现了MethodProcessor接口的实例取出来

三、SpingBoot注解(@EnableLafConfig)流程总结

DUCC通过实现ImportBeanDefinitionRegistrar接口,将指定的类注册到Spring Boot容器中,另外必须定义一个Java配置类(带有注解@Configuration)通过@Import指定ImportBeanDefinitionRegistrar的实现类。在registerBeanDefinitions()完成了以下几个类的BeanDefinition的注册:

  1. 1、配置管理器ConfiguratorManager,监听器容器Observer
  2. 2、Bean实例化后置处理器ConfigPostProcessor
  3. 3、Bean工厂后置处理器PropertySourcesFactoryPostProcessor

四、Spring初始化流程

主要流程在AbstractApplicationContext.refresh()中

流程图备注

  1. 1、InvokeBeanFactoryPostProcessors()

    在Bean未开始实例化时,执行工厂后置处理器,会查找所有BeanFactoryPostProcessor实现类Bean,并且调用方法PostProcessBeanDefinitionRegistry,修改Definition的定义

    注意:DUCC中PropertySourcesFactoryPostProcessor实现了BeanFactoryPostProcessor把ConfiguratorManager赋值给Observer
  2. 2、RegisterBeanPostProcessors()

    将BeanPostProcessors的实现类添加到工厂的RegisterBeanPostProcessors中

五、Bean实例生命周期


阅读 171 次