0xTTEPX

Just do it, deeply...


Dubbo Framework Design Source Code Walkthrough, Part 1 (Service and Reference Bean Initialization)

Written by valuewithTime, 2020-11-17 23:11

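Introduction

As internet applications keep growing, microservices emerged to keep the user experience acceptable under heavy traffic and to improve availability. Microservices give each piece of business logic higher cohesion while decoupling the overall application into services. Besides asynchronous communication over MQ, microservices commonly talk to each other through synchronous RPC, for example gRPC (which runs over HTTP/2), bRPC, and Dubbo, which is widely used in China. I have been using Dubbo for a long time and have a rough understanding of its components. I had planned to read the source code but never found the time; recently I finally managed to squeeze some in and take a closer look at Dubbo.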


How to read the Dubbo source

First, clone the project that integrates Dubbo with Spring Boot from the incubator-dubbo-spring-boot-project GitHub (vt) repository.

Then clone the Dubbo repository:

dubbo github vt

In the incubator-dubbo-spring-boot-project project, open META-INF/spring.factories in the dubbo-spring-boot-autoconfigure-compatible module.

It lists the corresponding auto-configuration classes:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.dubbo.spring.boot.autoconfigure.DubboAutoConfiguration,\
org.apache.dubbo.spring.boot.autoconfigure.DubboRelaxedBindingAutoConfiguration
org.springframework.context.ApplicationListener=\
org.apache.dubbo.spring.boot.context.event.OverrideDubboConfigApplicationListener,\
org.apache.dubbo.spring.boot.context.event.WelcomeLogoApplicationListener,\
org.apache.dubbo.spring.boot.context.event.AwaitingNonWebApplicationListener
org.springframework.boot.env.EnvironmentPostProcessor=\
org.apache.dubbo.spring.boot.env.DubboDefaultPropertiesEnvironmentPostProcessor
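
The listing above is in the standard Java properties format: a trailing backslash continues a value onto the next line, and each value is a comma-separated list of class names. The sketch below only illustrates that file format with java.util.Properties; the class FactoriesSketch and its method are hypothetical helpers, not Spring or Dubbo APIs, and the content is a shortened copy of the entries above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Spring or Dubbo.
final class FactoriesSketch {

    // A shortened copy of the spring.factories entries shown above.
    static final String CONTENT =
            "org.springframework.boot.autoconfigure.EnableAutoConfiguration=\\\n"
          + "org.apache.dubbo.spring.boot.autoconfigure.DubboAutoConfiguration,\\\n"
          + "org.apache.dubbo.spring.boot.autoconfigure.DubboRelaxedBindingAutoConfiguration\n"
          + "org.springframework.boot.env.EnvironmentPostProcessor=\\\n"
          + "org.apache.dubbo.spring.boot.env.DubboDefaultPropertiesEnvironmentPostProcessor\n";

    // Returns the class names listed under the given key (empty list if the key is absent).
    static List<String> classNamesFor(String content, String key) {
        Properties props = new Properties();
        try {
            // Properties.load joins lines that end with a backslash.
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> names = new ArrayList<String>();
        for (String name : props.getProperty(key, "").split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String key = "org.springframework.boot.autoconfigure.EnableAutoConfiguration";
        for (String name : classNamesFor(CONTENT, key)) {
            System.out.println(name);
        }
    }
}
```

Running main prints the two auto-configuration class names registered under the EnableAutoConfiguration key.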

The key one is DubboAutoConfiguration. It declares two beans, ServiceAnnotationBeanPostProcessor and ReferenceAnnotationBeanPostProcessor, which are the bean post-processors for ServiceBean and ReferenceBean respectively. These are our entry points for analyzing Dubbo's framework design from its source.

DubboAutoConfiguration

ServiceAnnotationBeanPostProcessor

ServiceBean

ReferenceAnnotationBeanPostProcessor

ReferenceBean

org.apache.dubbo.config.spring.ReferenceBean#getObject

Below is a brief, high-level summary of the Dubbo framework from the source code's point of view.

Framework design overview

dubbo-framework

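The Dubbo framework consists mainly of serialization, a message layer, a transport layer, and a protocol layer.

Serialization covers request and response messages, for example serialization based on Java's ObjectOutputStream/ObjectInputStream or on JSON. The message layer encodes and decodes the request messages a consumer sends to call a service and the response messages that carry the provider's results. The transport layer establishes the communication channel between consumer and provider and carries the request and response data; Netty-based and Mina-based implementations exist, with Netty as the default.

The protocol layer first exposes service providers (and consumers) through export according to the relevant protocol, that is, it registers them in the Registry. Consumers subscribe to the corresponding providers through the Registry, and when a consumer discovers a provider it establishes a connection to it. Registry protocols include Zookeeper-based and Redis-based ones. The registry protocol also has a registry directory service, which supplies the lists of consumers and providers and selects a provider according to the load-balancing strategy. After a provider receives a consumer's request, it calls the corresponding Invoker according to the protocol. The RPC call protocol between consumer and provider is actually implemented in DubboProtocol: the protocol first exports the service, the consumer sends an RPC request, and the Invoker held in the Exporter container is invoked.

Next, let's analyze Dubbo's component modules from the source code.

Source code analysis

First, let's look at the post-processing for ServiceBean and ReferenceBean.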
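
The register, subscribe, and invoke flow described above can be sketched in a few lines. This is a minimal in-memory model under stated assumptions: every type here (Invoker, Registry, Consumer) is a simplified, hypothetical stand-in rather than Dubbo's real interface, there is no network transport or serialization, and round-robin is just one possible load-balancing strategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical stand-ins; these are not Dubbo's real interfaces.
final class RpcFlowSketch {

    /** Wraps a callable service, loosely mirroring the role of an Invoker. */
    interface Invoker {
        Object invoke(String method, Object arg);
    }

    /** In-memory registry: service name -> exported invokers. */
    static final class Registry {
        private final Map<String, List<Invoker>> providers =
                new ConcurrentHashMap<String, List<Invoker>>();

        // Provider side: export a service by registering its invoker.
        void register(String service, Invoker invoker) {
            providers.computeIfAbsent(service, k -> new CopyOnWriteArrayList<Invoker>())
                     .add(invoker);
        }

        // Consumer side: obtain a snapshot of the current providers.
        List<Invoker> subscribe(String service) {
            List<Invoker> found = providers.get(service);
            return found == null ? new ArrayList<Invoker>() : new ArrayList<Invoker>(found);
        }
    }

    /** Consumer: picks a provider round-robin, then invokes it. */
    static final class Consumer {
        private final List<Invoker> invokers;
        private final AtomicInteger next = new AtomicInteger();

        Consumer(Registry registry, String service) {
            this.invokers = registry.subscribe(service);
        }

        Object call(String method, Object arg) {
            if (invokers.isEmpty()) {
                throw new IllegalStateException("No provider available");
            }
            int index = Math.floorMod(next.getAndIncrement(), invokers.size());
            return invokers.get(index).invoke(method, arg);
        }
    }

    // A fake provider that echoes which address handled the call.
    static Invoker echoProvider(final String address) {
        return new Invoker() {
            public Object invoke(String method, Object arg) {
                return address + ":" + method + "(" + arg + ")";
            }
        };
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register("demo.HelloService", echoProvider("10.0.0.1"));
        registry.register("demo.HelloService", echoProvider("10.0.0.2"));
        Consumer consumer = new Consumer(registry, "demo.HelloService");
        System.out.println(consumer.call("sayHello", "a"));
        System.out.println(consumer.call("sayHello", "b"));
    }
}
```

In this sketch the two calls in main are served by the two registered providers in turn.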

ServiceAnnotationBeanPostProcessor

/**
 * {@link Service} Annotation
 * {@link BeanDefinitionRegistryPostProcessor Bean Definition Registry Post Processor}
 *
 * @since 2.5.8
 */
public class ServiceAnnotationBeanPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware,
        ResourceLoaderAware, BeanClassLoaderAware {
@Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        // Resolve the packages to scan against the environment, for scan paths that contain placeholders
        Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);

        if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
            // Register the service beans
            registerServiceBeans(resolvedPackagesToScan, registry);
        } else {
            if (logger.isWarnEnabled()) {
                logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
            }
        }

    }


    /**
     * Registers beans whose classes are annotated with {@link Service}
     * @param packagesToScan The base packages to scan
     * @param registry       {@link BeanDefinitionRegistry}
     */
    private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {

        DubboClassPathBeanDefinitionScanner scanner =
                new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);

        BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);

        scanner.setBeanNameGenerator(beanNameGenerator);
        // Add an include filter for the @Service annotation
        scanner.addIncludeFilter(new AnnotationTypeFilter(Service.class));

        for (String packageToScan : packagesToScan) {

            // Registers @Service Bean first (package scan)
            scanner.scan(packageToScan);

            // Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
            Set<BeanDefinitionHolder> beanDefinitionHolders =
                    findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);

            if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {

                for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
                    // Register the service bean
                    registerServiceBean(beanDefinitionHolder, registry, scanner);
                }

                if (logger.isInfoEnabled()) {
                    logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +
                            beanDefinitionHolders +
                            " } were scanned under package[" + packageToScan + "]");
                }

            } else {

                if (logger.isWarnEnabled()) {
                    logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["
                            + packageToScan + "]");
                }

            }

        }

    }
    /**
     * Registers {@link ServiceBean} from new annotated {@link Service} {@link BeanDefinition}
     * @param beanDefinitionHolder
     * @param registry
     * @param scanner
     * @see ServiceBean
     * @see BeanDefinition
     */
    private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,
                                     DubboClassPathBeanDefinitionScanner scanner) {

        Class<?> beanClass = resolveClass(beanDefinitionHolder);
        // Look up the @Service annotation
        Service service = findAnnotation(beanClass, Service.class);

        Class<?> interfaceClass = resolveServiceInterfaceClass(beanClass, service);

        String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();
        // Build the ServiceBean definition
        AbstractBeanDefinition serviceBeanDefinition =
                buildServiceBeanDefinition(service, interfaceClass, annotatedServiceBeanName);

        // ServiceBean Bean name
        String beanName = generateServiceBeanName(service, interfaceClass, annotatedServiceBeanName);

        if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean
            // Register the bean definition
            registry.registerBeanDefinition(beanName, serviceBeanDefinition);

            if (logger.isWarnEnabled()) {
                logger.warn("The BeanDefinition[" + serviceBeanDefinition +
                        "] of ServiceBean has been registered with name : " + beanName);
            }

        } else {

            if (logger.isWarnEnabled()) {
                logger.warn("The Duplicated BeanDefinition[" + serviceBeanDefinition +
                        "] of ServiceBean[ bean name : " + beanName +
                        "] was be found , Did @DubboComponentScan scan to same package in many times?");
            }

        }

    }
    /**
     * Builds the ServiceBean bean definition.
     * @param service
     * @param interfaceClass
     * @param annotatedServiceBeanName
     * @return
     */
    private AbstractBeanDefinition buildServiceBeanDefinition(Service service, Class<?> interfaceClass,
                                                              String annotatedServiceBeanName) {

        BeanDefinitionBuilder builder = rootBeanDefinition(ServiceBean.class);

        AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();

        MutablePropertyValues propertyValues = beanDefinition.getPropertyValues();

        String[] ignoreAttributeNames = of("provider", "monitor", "application", "module", "registry", "protocol", "interface", "interfaceName");

        propertyValues.addPropertyValues(new AnnotationPropertyValuesAdapter(service, environment, ignoreAttributeNames));

        // References "ref" property to annotated-@Service Bean (the service implementation)
        addPropertyReference(builder, "ref", annotatedServiceBeanName);
        // Set interface
        builder.addPropertyValue("interface", interfaceClass.getName());

        /**
         * Add {@link org.apache.dubbo.config.ProviderConfig} Bean reference
         */
        String providerConfigBeanName = service.provider();
        if (StringUtils.hasText(providerConfigBeanName)) {
            addPropertyReference(builder, "provider", providerConfigBeanName);
        }

        /**
         * Add {@link org.apache.dubbo.config.MonitorConfig} Bean reference
         */
        String monitorConfigBeanName = service.monitor();
        if (StringUtils.hasText(monitorConfigBeanName)) {
            addPropertyReference(builder, "monitor", monitorConfigBeanName);
        }

        /**
         * Add {@link org.apache.dubbo.config.ApplicationConfig} Bean reference
         */
        String applicationConfigBeanName = service.application();
        if (StringUtils.hasText(applicationConfigBeanName)) {
            addPropertyReference(builder, "application", applicationConfigBeanName);
        }

        /**
         * Add {@link org.apache.dubbo.config.ModuleConfig} Bean reference
         */
        String moduleConfigBeanName = service.module();
        if (StringUtils.hasText(moduleConfigBeanName)) {
            addPropertyReference(builder, "module", moduleConfigBeanName);
        }


        /**
         * Add {@link org.apache.dubbo.config.RegistryConfig} Bean reference
         */
        String[] registryConfigBeanNames = service.registry();

        List<RuntimeBeanReference> registryRuntimeBeanReferences = toRuntimeBeanReferences(registryConfigBeanNames);

        if (!registryRuntimeBeanReferences.isEmpty()) {
            builder.addPropertyValue("registries", registryRuntimeBeanReferences);
        }

        /**
         * Add {@link org.apache.dubbo.config.ProtocolConfig} Bean reference
         */
        String[] protocolConfigBeanNames = service.protocol();

        List<RuntimeBeanReference> protocolRuntimeBeanReferences = toRuntimeBeanReferences(protocolConfigBeanNames);

        if (!protocolRuntimeBeanReferences.isEmpty()) {
            builder.addPropertyValue("protocols", protocolRuntimeBeanReferences);
        }

        return builder.getBeanDefinition();

    }
...
}

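As shown above, ServiceAnnotationBeanPostProcessor mainly scans the application for beans annotated with @Service, builds a ServiceBean definition for each one, and registers it in the bean registry.

Next, look at ServiceBean, which is the entry point for service export.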
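
To make the scan-and-register step concrete, here is a minimal sketch that does not depend on Spring. It rests on several assumptions: DemoService is a hypothetical stand-in for Dubbo's @Service, a plain map stands in for the BeanDefinitionRegistry, the candidate classes are passed in directly instead of being found by classpath scanning, the service interface is taken to be the first implemented interface, and the bean-name format is only illustrative.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins: DemoService plays the role of Dubbo's @Service,
// and the map plays the role of Spring's BeanDefinitionRegistry.
final class ServiceScanSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DemoService {
        String version() default "";
    }

    interface GreetingService {
        String greet(String name);
    }

    @DemoService(version = "1.0.0")
    static class GreetingServiceImpl implements GreetingService {
        public String greet(String name) {
            return "Hello, " + name;
        }
    }

    // Not annotated, so the scan skips it.
    static class PlainHelper {
    }

    /** Registers one entry per annotated class; duplicates are skipped. */
    static Map<String, Class<?>> registerServiceBeans(Class<?>... candidates) {
        Map<String, Class<?>> registry = new LinkedHashMap<String, Class<?>>();
        for (Class<?> candidate : candidates) {
            DemoService service = candidate.getAnnotation(DemoService.class);
            Class<?>[] interfaces = candidate.getInterfaces();
            if (service == null || interfaces.length == 0) {
                continue; // the include filter: only annotated implementations pass
            }
            // Illustrative name format (an assumption, not Dubbo's exact rule).
            String beanName = "ServiceBean:" + interfaces[0].getName() + ":" + service.version();
            if (registry.putIfAbsent(beanName, candidate) != null) {
                System.out.println("Duplicate definition skipped: " + beanName);
            }
        }
        return registry;
    }

    public static void main(String[] args) {
        Map<String, Class<?>> registry = registerServiceBeans(
                GreetingServiceImpl.class, PlainHelper.class, GreetingServiceImpl.class);
        System.out.println(registry.keySet());
    }
}
```

The duplicate check plays the same role as scanner.checkCandidate in the listing above: scanning the same package twice must not register the same definition twice.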

ServiceBean

/**
 * ServiceFactoryBean
 *
 * @export
 */
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
...
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            // Export the service
            export();
        }
    }
    ...
}

//ServiceConfig

/**
 * ServiceConfig
 *
 * @export
 */
public class ServiceConfig<T> extends AbstractServiceConfig {
    
    /**
     * @see ExtensionLoader#DUBBO_DIRECTORY
     * @see ExtensionLoader#getAdaptiveExtension()
     */
    private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

    /**
     * @see ExtensionLoader#DUBBO_INTERNAL_DIRECTORY
     * @see ExtensionLoader#getAdaptiveExtension()
     */
    private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

    private static final Map<String, Integer> RANDOM_PORT_MAP = new HashMap<String, Integer>();

    /**
     *
     */
    private static final ScheduledExecutorService delayExportExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true));
    /**
     *
     */
    private final List<URL> urls = new ArrayList<URL>();
    /**
     *
     */
    private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();
    /**
     * interface type
     */
    private String interfaceName;
    /**
     *
     */
    private Class<?> interfaceClass;
    /**
     * reference to interface impl
     */
    private T ref;
    /**
     * service name
     */
    private String path;
    /**
     * method configuration
     */
    private List<MethodConfig> methods;
    /**
     *
     */
    private ProviderConfig provider;
    private transient volatile boolean exported;

    private transient volatile boolean unexported;

    private volatile String generic;
    ...
     /**
     * Exports the service.
     */
    public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && !export) {
            return;
        }

        if (delay != null && delay > 0) {
            // Schedule the delayed export
            delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }
    /**
     * Exports the service.
     */
    protected synchronized void doExport() {
        ...
        checkApplication();
        checkRegistry();
        checkProtocol();
        appendProperties(this);
        checkStub(interfaceClass);
        checkMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }
        // Provider model
        ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass);
        // Initialize the provider model
        ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
        // Export the service URLs
        doExportUrls();
    }
     /**
     * Exports the service URLs.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void doExportUrls() {
        // Load the registry URLs
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
    /**
     * @param protocolConfig
     * @param registryURLs
     */
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (name == null || name.length() == 0) {
            name = "dubbo";
        }
    ...
     String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        //dubbo
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        String scope = url.getParameter(Constants.SCOPE_KEY);
        // don't export when none is configured
        if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                // Export the service locally
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            // Export the service to the remote registry
            if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && !registryURLs.isEmpty()) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(Constants.PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                        }
                        // Create the proxy Invoker (JavassistProxyFactory)
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        //ProtocolFilterWrapper ,ZookeeperRegistry ,RestProtocol,RmiProtocol,DubboProtocol
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

    /**
     * Exports the service to the local registry.
     * @param url
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(LOCALHOST)
                    .setPort(0);
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
        }
    }
}

The key part is the following lines:

// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
    registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
// Create the proxy Invoker (JavassistProxyFactory)
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// ProtocolFilterWrapper, ZookeeperRegistry, RestProtocol, RmiProtocol, DubboProtocol, RegistryProtocol
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);

As shown above, exporting the service is actually delegated to the corresponding protocol, RegistryProtocol. We will cover RegistryProtocol later.

Now let's look at how ReferenceBean is handled.
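
Why the call lands in RegistryProtocol can be illustrated with a small model of protocol selection by URL. This is only a sketch under assumptions: SimpleUrl, Protocol, and AdaptiveProtocol are hypothetical simplifications, not Dubbo's ExtensionLoader or URL classes, and the "export" parameter here carries just a protocol name, whereas the listing above stores the full encoded provider URL under Constants.EXPORT_KEY. The ordering inside the "registry" branch (export through the provider protocol, then register) reflects my reading of the design and should be checked against RegistryProtocol itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of protocol selection; not Dubbo's ExtensionLoader.
final class AdaptiveProtocolSketch {

    /** Minimal URL: a protocol name plus string parameters. */
    static final class SimpleUrl {
        final String protocol;
        final Map<String, String> params = new HashMap<String, String>();

        SimpleUrl(String protocol) {
            this.protocol = protocol;
        }

        SimpleUrl with(String key, String value) {
            params.put(key, value);
            return this;
        }
    }

    interface Protocol {
        // Returns a description of the exporter; steps are appended to log.
        String export(SimpleUrl url, List<String> log);
    }

    /** Dispatches to the implementation named by url.protocol. */
    static final class AdaptiveProtocol implements Protocol {
        private final Map<String, Protocol> extensions = new HashMap<String, Protocol>();

        void add(String name, Protocol protocol) {
            extensions.put(name, protocol);
        }

        public String export(SimpleUrl url, List<String> log) {
            Protocol target = extensions.get(url.protocol);
            if (target == null) {
                throw new IllegalStateException("No protocol extension named " + url.protocol);
            }
            return target.export(url, log);
        }
    }

    static AdaptiveProtocol newAdaptiveProtocol() {
        final AdaptiveProtocol adaptive = new AdaptiveProtocol();
        // "dubbo": pretend to open a server for the service.
        adaptive.add("dubbo", new Protocol() {
            public String export(SimpleUrl url, List<String> log) {
                log.add("dubbo: open server for " + url.params.get("interface"));
                return "exporter[" + url.params.get("interface") + "]";
            }
        });
        // "registry": export through the provider protocol, then register.
        adaptive.add("registry", new Protocol() {
            public String export(SimpleUrl url, List<String> log) {
                SimpleUrl providerUrl = new SimpleUrl(url.params.get("export"))
                        .with("interface", url.params.get("interface"));
                String exporter = adaptive.export(providerUrl, log);
                log.add("registry: register " + providerUrl.params.get("interface"));
                return exporter;
            }
        });
        return adaptive;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<String>();
        SimpleUrl registryUrl = new SimpleUrl("registry")
                .with("export", "dubbo")
                .with("interface", "demo.HelloService");
        System.out.println(newAdaptiveProtocol().export(registryUrl, log));
        System.out.println(log);
    }
}
```

Because the URL passed to export carries the registry protocol, the dispatcher picks the registry implementation first, which in turn re-dispatches the embedded provider URL.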

ReferenceAnnotationBeanPostProcessor

/**
 * {@link org.springframework.beans.factory.config.BeanPostProcessor} implementation
 * that Consumer service {@link Reference} annotated fields
 *
 * @since 2.5.7
 */
public class ReferenceAnnotationBeanPostProcessor extends InstantiationAwareBeanPostProcessorAdapter
        implements MergedBeanDefinitionPostProcessor, PriorityOrdered, ApplicationContextAware, BeanClassLoaderAware,
        DisposableBean {
            ...
            @Override
    public PropertyValues postProcessPropertyValues(
            PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException {

        InjectionMetadata metadata = findReferenceMetadata(beanName, bean.getClass(), pvs);
        try {
            metadata.inject(bean, beanName, pvs);
        } catch (BeanCreationException ex) {
            throw ex;
        } catch (Throwable ex) {
            throw new BeanCreationException(beanName, "Injection of @Reference dependencies failed", ex);
        }
        return pvs;
    }
     @Override
    public void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName) {
        if (beanType != null) {
            InjectionMetadata metadata = findReferenceMetadata(beanName, beanType, null);
            metadata.checkConfigMembers(beanDefinition);
        }
    }
    /**
     * {@link Reference} {@link InjectionMetadata} implementation
     * Injection metadata for references
     * @since 2.5.11
     */
    private static class ReferenceInjectionMetadata extends InjectionMetadata {

        /**
         * Field-level reference elements
         */
        private final Collection<ReferenceFieldElement> fieldElements;

        /**
         * Method-level reference elements
         */
        private final Collection<ReferenceMethodElement> methodElements;


        public ReferenceInjectionMetadata(Class<?> targetClass, Collection<ReferenceFieldElement> fieldElements,
                                          Collection<ReferenceMethodElement> methodElements) {
            super(targetClass, combine(fieldElements, methodElements));
            this.fieldElements = fieldElements;
            this.methodElements = methodElements;
        }

        private static <T> Collection<T> combine(Collection<? extends T>... elements) {
            List<T> allElements = new ArrayList<T>();
            for (Collection<? extends T> e : elements) {
                allElements.addAll(e);
            }
            return allElements;
        }

        public Collection<ReferenceFieldElement> getFieldElements() {
            return fieldElements;
        }

        public Collection<ReferenceMethodElement> getMethodElements() {
            return methodElements;
        }
    }
    /**
     * {@link Reference} {@link Method} {@link InjectionMetadata.InjectedElement}
     */
    private class ReferenceMethodElement extends InjectionMetadata.InjectedElement {

        private final Method method;

        private final Reference reference;

        private volatile ReferenceBean<?> referenceBean;

        protected ReferenceMethodElement(Method method, PropertyDescriptor pd, Reference reference) {
            super(method, pd);
            this.method = method;
            this.reference = reference;
        }

        @Override
        protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {

            Class<?> referenceClass = pd.getPropertyType();

            referenceBean = buildReferenceBean(reference, referenceClass);

            ReflectionUtils.makeAccessible(method);

            method.invoke(bean, referenceBean.getObject());

        }

    }
    /**
     * {@link Reference} {@link Field} {@link InjectionMetadata.InjectedElement}
     */
    private class ReferenceFieldElement extends InjectionMetadata.InjectedElement {

        /**
         *
         */
        private final Field field;

        /**
         *
         */
        private final Reference reference;

        private volatile ReferenceBean<?> referenceBean;

        protected ReferenceFieldElement(Field field, Reference reference) {
            super(field, null);
            this.field = field;
            this.reference = reference;
        }

        @Override
        protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {

            Class<?> referenceClass = field.getType();

            referenceBean = buildReferenceBean(reference, referenceClass);

            ReflectionUtils.makeAccessible(field);

            field.set(bean, referenceBean.getObject());

        }

    }

    /**
     * Builds the reference bean.
     * @param reference
     * @param referenceClass
     * @return
     * @throws Exception
     */
    private ReferenceBean<?> buildReferenceBean(Reference reference, Class<?> referenceClass) throws Exception {

        String referenceBeanCacheKey = generateReferenceBeanCacheKey(reference, referenceClass);

        ReferenceBean<?> referenceBean = referenceBeansCache.get(referenceBeanCacheKey);

        if (referenceBean == null) {

            ReferenceBeanBuilder beanBuilder = ReferenceBeanBuilder
                    .create(reference, classLoader, applicationContext)
                    .interfaceClass(referenceClass);

            referenceBean = beanBuilder.build();

            referenceBeansCache.putIfAbsent(referenceBeanCacheKey, referenceBean);

        }

        return referenceBean;

    }


}

//ReferenceBean

/**
 * ReferenceFactoryBean
 */
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {
    ...
     @Override
    public Object getObject() {
        return get();
    }
}

//ReferenceConfig

public class ReferenceConfig<T> extends AbstractReferenceConfig {

    private static final long serialVersionUID = -5864351140409987595L;

    /**
     *
     */
    private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

    private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();

    private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
    private final List<URL> urls = new ArrayList<URL>();
    // interface name
    private String interfaceName;
    private Class<?> interfaceClass;
    private Class<?> asyncInterfaceClass;
    // client type
    private String client;
    // url for peer-to-peer invocation
    private String url;
    // method configs
    private List<MethodConfig> methods;
    // default config
    private ConsumerConfig consumer;
    private String protocol;
    // interface proxy reference
    private transient volatile T ref;
    private transient volatile Invoker<?> invoker;
    private transient volatile boolean initialized;
    private transient volatile boolean destroyed;
    ...
    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }
     /**
     *
     */
    private void init() {
        if (initialized) {
            return;
        }
        initialized = true;
        ...
         // Resolve the IP address to register with the registry
        String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
        if (hostToRegistry == null || hostToRegistry.length() == 0) {
            // If empty, fall back to the local host
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

        ref = createProxy(map);
        // Build the consumer model
        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), interfaceClass, ref, interfaceClass.getMethods(), attributes);
        // Initialize the consumer model
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }
    /**
     * @param map
     * @return
     */
    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
        ...
         if (urls.size() == 1) {
                // Create the Invoker proxy (Netty client)
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        ...
    }
}

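As shown above, the ReferenceBean post-processor mainly scans for fields and methods annotated with @Reference and builds a ReferenceBean for each. ReferenceBean calls the provider's service through an Invoker, which wraps the service and is actually created by the corresponding protocol.

That's it for today; the rest will be covered in later chapters.

Summary

The Dubbo framework consists mainly of a message layer, a transport layer, and a protocol layer. The message layer encodes and decodes the request messages a consumer sends to call a service and the response messages that carry the provider's results. The transport layer establishes the communication channel between consumer and provider and carries the request and response data. The protocol layer first exposes service providers (and consumers) through export according to the relevant protocol, that is, it registers them in the Registry; consumers subscribe to the corresponding providers through the Registry, and when a consumer discovers a provider it establishes a connection to it. After a provider receives a consumer's request, it calls the corresponding Invoker according to the protocol.

ServiceAnnotationBeanPostProcessor mainly scans the application for beans annotated with @Service, builds a ServiceBean definition for each one, and registers it in the bean registry. Exporting the service is actually delegated to the corresponding protocol, RegistryProtocol.

The ReferenceBean post-processor mainly scans for fields and methods annotated with @Reference and builds a ReferenceBean for each. ReferenceBean calls the provider's service through an Invoker, which wraps the service and is actually created by the corresponding protocol.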
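
The consumer side can be sketched with a JDK dynamic proxy: the reference is created lazily on first get(), cached per key, and every interface call is forwarded to an invoker. All names here (LazyReference, Invoker, HelloService, referenceFor) are hypothetical; Dubbo creates its proxies through ProxyFactory and its invokers through the Protocol, which this sketch replaces with java.lang.reflect.Proxy and a lambda.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a lazily created, cached service reference.
final class ReferenceSketch {

    /** Stand-in for the object that would perform the remote call. */
    interface Invoker {
        Object invoke(String methodName, Object[] args);
    }

    interface HelloService {
        String sayHello(String name);
    }

    /** Creates the proxy on first use, similar in spirit to ReferenceConfig#get(). */
    static final class LazyReference<T> {
        private final Class<T> interfaceClass;
        private final Invoker invoker;
        private volatile T ref;

        LazyReference(Class<T> interfaceClass, Invoker invoker) {
            this.interfaceClass = interfaceClass;
            this.invoker = invoker;
        }

        synchronized T get() {
            if (ref == null) {
                ref = createProxy();
            }
            return ref;
        }

        private T createProxy() {
            InvocationHandler handler = new InvocationHandler() {
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    if (method.getDeclaringClass() == Object.class) {
                        return method.invoke(this, args); // toString, hashCode, equals
                    }
                    return invoker.invoke(method.getName(), args);
                }
            };
            Object proxy = Proxy.newProxyInstance(
                    interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, handler);
            return interfaceClass.cast(proxy);
        }
    }

    private static final ConcurrentMap<String, LazyReference<?>> CACHE =
            new ConcurrentHashMap<String, LazyReference<?>>();

    /** Returns the cached reference for the key, creating it if absent. */
    @SuppressWarnings("unchecked")
    static <T> LazyReference<T> referenceFor(String key, Class<T> type, Invoker invoker) {
        LazyReference<?> existing = CACHE.get(key);
        if (existing == null) {
            LazyReference<T> created = new LazyReference<T>(type, invoker);
            existing = CACHE.putIfAbsent(key, created);
            if (existing == null) {
                existing = created; // this thread won the race
            }
        }
        return (LazyReference<T>) existing;
    }

    public static void main(String[] args) {
        Invoker fakeRemote = (method, arguments) -> method + " -> Hello, " + arguments[0];
        HelloService service =
                referenceFor("demo.HelloService", HelloService.class, fakeRemote).get();
        System.out.println(service.sayHello("Dubbo"));
    }
}
```

Unlike the buildReferenceBean listing above, this sketch uses the return value of putIfAbsent, so two threads racing on the same key end up sharing one reference.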

Dubbo official site
dubbo github
dubbo github vt
incubator-dubbo-spring-boot-project github vt