Dubbo 内核SPI源码学习

  |  
阅读次数
  |  
字数 8,673
  |  
时长 ≈ 39 分钟

先从配置文件开始,dubbo基于Spring的可扩展Schema提供了一套简明的自定义配置标签,用于简化配置流程。

具体原理可参考以下链接:
基于Spring可扩展Schema提供自定义配置支持

于是,当我们配置了以下一段代码的时候其实最后通过解析之后就相当于变成了一个个的<bean/>配置,生成了实例。

1
2
3
4
5
<!-- service implementation, as same as regular local bean -->
<bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>

<!-- declare the service interface to be exported -->
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService"/>

当加载以上配置 <dubbo:service/> 的时候,基于 dubbo.jar 内的 META-INF/spring.handlers 配置,Spring在遇到dubbo命名空间时,会回调 DubboNamespaceHandler。

所有dubbo的标签,都统一用 DubboBeanDefinitionParser 进行解析,基于一对一属性映射,将XML标签解析为一个 ServiceConfig Bean 对象。

在 ServiceConfig.export() 或者 ReferenceConfig.get() 初始化时,将会将bean对象转化为URL格式,所有的Bean属性转化为URL的参数,然后将URL传给协议扩展点,基于扩展点的扩展点自适应机制,根据前面转化的URL的协议头,进行不同协议的服务暴露或者引用。

所有扩展点参数都包含 URL 参数,URL 作为上下文信息贯穿整个扩展点设计体系。
URL 采用标准格式:protocol://username:password@host:port/path?key=value&key=value

1)SPI内核实现

前言:

SPI机制主要用于系统集成,dubbo提供了许多扩展点,允许开发者基于内置的扩展点对框架功能作特性定制,
dubbo的SPI机制就是将不同的实现方案进行集成装配,这样可以消除模块集成的代码,平等看待第三方扩展,以增强框架的扩展性。
dubbo的SPI机制参考了JDK中标准的SPI机制思想,加入一些增强功能,以适应dubbo特殊化的应用场景。

上面说到,所有Bean都将转化为URL对象,传给扩展点,基于自适应机制进行不同协议的暴露或引用。

这句话可才分为以下步骤:

1
2
3
4
5
6
7
8
// 1.转化为URL
这里是上面的步骤

// 2.传给扩展点,(所以这里需要生成一个扩展点,代码如下)
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

// 3.基于自适应机制进行不同协议的暴露或引用(上面已经获取到一个扩展点的代理类,他将通过代理类方法里面的URL自适应不同协议,然后执行具体逻辑)
protocol.export(wrapperInvoker);

所以这里重点解析第二步:生成扩展点的实现原理。

dubbo spi机制代码比较晦涩难懂,所以在分析dubbo spi机制之前,让我们先来看一个简单的示例说明下dubbo spi机制的特性。

1.1)SPI示例

1.1.1)

新建maven项目dubbo-spi-demo,并加入dubbo-common依赖

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.5.8</version>
</dependency>
</dependencies>

1.1.2)

编写Calculator接口扩展点和两个不同的算法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 计算接口类
*/
// 默认加法
@SPI("sum")
public interface Calculator {
// 以URL的calType属性指定需要选择的算法
@Adaptive("calType")
int calculate(URL url, int a, int b);
}
/**
* 加法
*/
public class Sum implements Calculator {
@Override
public int calculate(URL url, int a, int b) {
return a + b;
}
}
/**
* 减法
*/
public class Subtraction implements Calculator {
@Override
public int calculate(URL url, int a, int b) {
return a - b;
}
}

1.1.3)

编写两个wrapper类(装饰模式),wrapper1打印入参,wrapper2打印结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 打印入参
*/
public class CalculatorWrapper1 implements Calculator {
private Calculator calculator;
public CalculatorWrapper1(Calculator calculator) {
this.calculator = calculator;
}
@Override
public int calculate(URL url, int a, int b) {
System.out.println("wrapper1--打印入参: a = " + a + ", b = " + b);
int result = calculator.calculate(url, a, b);
return result;
}
}

/**
* 打印结果
*/
public class CalculatorWrapper2 implements Calculator {
private Calculator calculator;
public CalculatorWrapper2(Calculator calculator) {
this.calculator = calculator;
}
@Override
public int calculate(URL url, int a, int b) {
int result = calculator.calculate(url, a, b);
System.out.println("wrapper2--打印结果: " + result);
return result;
}
}

1.1.4)编写SPI配置文件

resources目录下新建com.kileto.dubbo.spi.Calculator文件(Calculator全限定名),文件内容为:

1
2
3
4
sum=com.kileto.dubbo.spi.impl.Sum
subtraction=com.kileto.dubbo.spi.impl.Subtraction
wrapper1=com.kileto.dubbo.spi.wrapper.CalculatorWrapper1
wrapper2=com.kileto.dubbo.spi.wrapper.CalculatorWrapper2

1.1.5)测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Main {
public static void main(String[] args) {
// 这里仅仅是为了创建URL,无需关注dubbo、127.0.0.1和8080的含义
URL url = new URL("dubbo", "127.0.0.1", 8080);
// 获取扩展点对应的加载器
ExtensionLoader<Calculator> extensionLoader = ExtensionLoader.getExtensionLoader(Calculator.class);

// 获取calculator适配类(该类已被wrap和执行过setter注入)
Calculator calculator = extensionLoader.getAdaptiveExtension();

// 1处:默认采用加法
calculator.calculate(url, 1, 2);

// 2处:指定采用减法
url = url.addParameter("calType", "subtraction");
calculator.calculate(url, 1, 2);

// 3处:指定采用sum算法
calculator = extensionLoader.getExtension("sum");
calculator.calculate(url, 1, 2);
}
}

1.1.6)执行结果分析

代码1处结果:
wrapper1–打印入参: a = 1, b = 2
wrapper2–打印结果: 3
这个结果说明两个方面:

获取的calculate已经被加入了wrapper1和wrapper2的代码,说明dubbo spi支持多层wrap的特性;
采用的是加法运算,这是因为@SPI(“sum”)指定了默认算法为sum;
代码2处结果:
wrapper1–打印入参: a = 1, b = 2
wrapper2–打印结果: -1
说明通过calType参数成功控制了算法的选择,dubbo是通过生成一个Calculator的动态代理来实现的。

代码3处结果:
wrapper1–打印入参: a = 1, b = 2
wrapper2–打印结果: 3
这个结果和代码1处的结果是一致的,说明sum算法已经加入了wrapper1和wrapper2。

上面示例展示了dubbo的特性:wrapper(构造注入)动态代理,除此之外,dubbo spi还支持setter、分组排序等特性没有展示出来。

1.2)Dubbo SPI场景

dubbo需要运行时动态选择扩展点实现(即对扩展点作动态代理,类似于AOP)。比如以多种协议发布服务,需要根据URL获取protocol参数,动态获取具体实现然后调用。

dubbo使用了多层wrapper(装饰模式,基于构造方法注入)和setter注入特性。wrapper和注入的都是适配类,而不是某个具体的实现,后文会有说明。

dubbo的扩展点实现需要支持条件激活和分组排序的特性。比如dubbo中的Filter,需要在provider或者consumer端激活,并且需要将多个Filter排序。

除此之外,java spi机制还存在天然的缺陷(参考dubbo说明),所以dubbo基于java spi机制的思想作了扩展。

1.3)Dubbo SPI原理

dubbo中SPI的实现在dubbo-common模块下的(com.alibaba.dubbo.common.extension)包下,包含以下几个核心类:

@SPI@Adaptive@ActivateExtensionFactory和核心实现类ExtensionLoader

1.3.1)@SPI注解

1
2
3
4
5
6
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface SPI {
String value() default "";
}

该注解作用于接口上,表示接口是dubbo框架一个扩展点,dubbo会扫描加了@SPI注解的接口,并扫描spi配置文件,加载该扩展点接口的实现类,value属性指定默认的扩展点实现名称。

1.3.1.1)spi配置文件路径

META-INF/services/:应用级扩展配置,如果你的扩展是应用级别的,可以放入此目录,第一节的小示例就是放在这个目录下。
META-INF/dubbo/:框架外部扩展配置,如果你需要对dubbo框架做扩展,可以放入此目录。

META-INF/dubbo/internal/:框架内置扩展配置,用于dubbo框架内部扩展实现。
Protocol实现类如DubboProtocol、HttpProtocol、HessionProtocol等,都是dubbo内置实现,所以放入META-INF/dubbo/internal/目录。
spi配置文件格式:

1.3.1.2)新建一个以扩展点全限定名命名的文件

比如Protocol类的配置文件名为:com.alibaba.dubbo.rpc.Protocol
在配置文件中以name=value形式指定实现类,name指定实现类简称,比如:

1
2
3
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol

1.3.1.3)

通过ExtensionLoader.getExtension(String name)方法,可依name获取到扩展点实现。

ExtensionLoader.getExtension(String name)方法获取的并非是原生(row)实现,过程分为以下几步:

  1. 获取原生扩展点实现impl;

  2. 对impl执行setter注入;

  3. 对impl执行wrap操作(如果存在wrap类,如果多个wrap类则多次嵌套wrap操作);

  4. 对wrapper类执行setter注入;

  5. 返回最终的wrapper类;

1.3.2)@Adaptive注解

1
2
3
4
5
6
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
String[] value() default {};
}

该注解可加在实现类或扩展点接口方法上,表示实现类是一个适配类,方法是一个适配方法。如果加在接口方法上,dubbo利用javasist为该扩展点生成适配类。

#1.3.2.1)关于适配类的理解

适配类本质上是扩展点接口的代理类,dubbo中对扩展点接口的调用都是代理给这个适配类,适配类会根据URL中的参数,动态选择扩展点的实现。

比如Protocol类的export用于暴露服务,如果某个服务以dubbo和hession两种协议发布,在调用export方法时,需要从URL中获取protocol参数的值(分别为dubbo和hession),通过ExtensionLoader.getExtension(name)来获取具体实现(分别为DubboProtocol和HessionProtocol),最后调用具体实现的export方法。

1.3.2.2)关于value属性理解

指定URL上对应的key,以key值作为需要动态选择的扩展点实现名称。

不指定默认为扩展点接口的点分隔小写字串,比如Protocol的export方法的@Adaptive并没有指定key,那么默认为protocol,表示从URL获取protocol参数来作为具体实现名称。如果是com.test.YyyInvokerWrapper,默认名称为yyy.invoker,wrapper。

如果指定value数组,将按顺序从URL获取这些key的value,以value作为名称找到对应的扩展点实现,如果都没有找到value,那么使用缺省扩展名(@SPI指定),如果未指定缺省扩展,抛出IllegalStateException。

1.3.2.3)加在类上

表示这个实现类已经是某个扩展点的适配类,dubbo无需再为这个扩展点再生成适配类,比如AdaptiveExtensionFactory、AdaptiveCompiler加了@Adaptive注解,
那么就不必在ExtensionFactory接口的方法中再打上@Adaptive注解了,dubbo会自动把AdaptiveExtensionFactory作为ExtensionFactory接口的适配类擦,
参考下文ExtensionFactory的说明

1.3.2.4)一般是加在方法上面

有以下几个结论:

  1. dubbo会以javasist为该接口生成适配类,每个接口有且仅有一个适配类;

  2. 打上@Adaptive的方法必须有URL类型入参,或者入参中要有URL属性(即提供getUrl方法),这是因为适配类要从URL获取具体实现名称;

  3. 非适配方法:扩展点接口中没有打上@Adaptive注解的方法(如Protcol#getDefault),表示方法不需要适配,在生成的适配类中,非适配直接抛出异常,所以不能通过适配类调用非适配方法,只能通过ExtensionLoader.getExtension来获取具体实现类来调用,尽管这样会导致调用代码不够优雅,事实上dubbo中非适配方法很少,扩展点接口中基本都是适配方法;

  4. 适配方法:扩展点接口中打上@Adaptive注解的方法,适配类内部通过ExtensionLoader从URL中获取相应key,动态选择实现类;

1.3.2.5)

扩展点不一定要适配,比如Filter,只需要扫描所有的Filter实现,并按顺序整理成一条链即可,这里并没有“动态选择实现”的语义,也就不需要适配类了。

下面以Protocol和Transporter来说明。

Protocol类的export和refer方法上都打上了@Adaptive注解,为什么要加这个注解呢?

比如对DemoService用dubbo和hession两种协议发布服务,应该代理给DubboProtocol或者HessionProtocol来发布。

1.3.2.5.1)

Protocol接口

1
2
3
4
5
6
7
8
9
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
Exporter export(Invoker invoker) throws RpcException;
@Adaptive
Invoker refer(Class type, URL url) throws RpcException;
void destroy();
}

Protocol适配类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
// 没有打上@Adaptive注解的方法直接抛出异常
public void destroy() {
throw new UnsupportedOperationException("...");
}
// 没有打上@Adaptive注解的方法直接抛出异常
public int getDefaultPort() {
throw new UnsupportedOperationException("...");
}
// export打上了@Adaptive注解
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("...");
// 有URL类型的入参,或者入参中有URL属性
if (arg0.getUrl() == null) throw new IllegalArgumentException("...");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
// 默认扩展点实现名称是dubbo
// 根据前面的URL协议规则,protocol是作为单独一部分存在的所以这里直接获取Protocol
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null) throw new IllegalStateException("...");
// 根据名称获取具体扩展点实现
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
// 调用具体扩展点实现的export方法
return extension.export(arg0);
}
// refer打上了@Adaptive注解
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("...");
// 有URL类型的入参,或者入参中有URL属性
com.alibaba.dubbo.common.URL url = arg1;
// 默认扩展点实现名称是dubbo
// 根据前面的URL协议规则,protocol是作为单独一部分存在的所以这里直接获取Protocol
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null) throw new IllegalStateException("...");
// 根据名称获取具体扩展点实现
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
// 调用具体扩展点实现的export方法
return extension.refer(arg0, arg1);
}
}

1.3.2.5.2)

以下是Transporter和对应的适配类:

Transporter接口

1
2
3
4
5
6
7
@SPI("netty")
public interface Transporter {
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
Server bind(URL url, ChannelHandler handler) throws RemotingException;
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

Transporter适配类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Transporter$Adaptive implements com.alibaba.dubbo.remoting.Transporter {
public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
//先以client为key获取value,获取不到再以transporter为key获取,获取不到使用默认值netty
// 根据前面URL协议规则,除了protocol动态扩展点为单独部分,其他均为参数体组成部分,所以这里生成的代理类,动态扩展代码生成的时候是在url参数拿的
String extName = url.getParameter("client", url.getParameter("transporter", "netty"));
if(extName == null) throw new IllegalStateException("...");
com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
return extension.connect(arg0, arg1);
}
public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0,com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
//先以server为key获取value,获取不到再以transporter为key获取,获取不到使用默认值netty
// 根据前面URL协议规则,除了protocol动态扩展点为单独部分,其他均为参数体组成部分,所以这里生成的代理类,动态扩展代码生成的时候是在url参数拿的
String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
if(extName == null) throw new IllegalStateException("...");
com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
return extension.bind(arg0, arg1);
}
}

1.3.3)@Activate注解

对扩展点实现进行条件激活和分组排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
// 指定分组过滤条件,比如provider指定在提供端激活扩展,consumer指定在消费端激活扩展。
String[] group() default {};
// 指定对应URL中key的过滤条件,当URL中存在这些key时,则启用该扩展。比如@Activate(“cache,validatioin”),如果URL中有名cache或validation的key,那么返回该扩展。
String[] value() default {};
// 指定扩展点加载的顺序
String[] before() default {};
String[] after() default {};
int order() default 0;
}

1
2
3
//表示当前扩展可应用在在provider或者consumer端,并且当URL中有cache这个key时激活
@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
public class CacheFilter implements Filter {}

1.3.4)ExtensionFactory

工厂类,用于对扩展点setter注入和构造方法注入。

dubbo中注入的都是适配类(除非该扩展点不需要适配),而不是某个具体的扩展点实现。

1.3.4.1)ExtensionFactory有三个实现类

  1. SpiExtensionFactory:基于dubbo的SPI机制获取扩展点适配类

  2. pringExtensionFactory:从spring容器中获取扩展点适配类bean

  3. AdaptiveExtensionFactory:内部维护了所有的ExtensionFactory,从上面两个工厂类中获取适配类。这个类打上了@Adaptive注解,所以ExtensionFactory中就不用再打@Adaptive注解了,dubbo会自动把该类作为ExtensionFactory的适配类。

1.3.5)多层Wrapper

比如ProtocolFilterWrapper是对Protocol的装饰模式

ProtocolFilterWrapper

1
2
3
4
5
6
7
8
9
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
}

ProtocolFilterWrapper是Protocol的一个扩展点实现,通过构造方法注入protocol实例,注入的逻辑和setter注入类似,都是注入了Protocol$Adaptive,在使用protocol实例是,实际上使用的是适配类,适配类再选择具体的扩展点实现。

如果不止一个wrapper类,会执行多次wrap。

1.3.6)setter注入

比如CacheFilter用于缓存执行结果:

CacheFilter

1
2
3
4
5
6
public class CacheFilter implements Filter {
private CacheFactory cacheFactory;
public void setCacheFactory(CacheFactory cacheFactory) {
this.cacheFactory = cacheFactory;
}
}

CacheFilter本身是Filter的一个扩展点实现,通过setter方法注入CacheFactory对象,CacheFactory本身也是一个扩展点接口,dubbo在加载CacheFilter时,检测到setter方法,通过SpiExtensionFactory获取到CacheFactory的适配类(CacheFactory$Adaptive),然后将这个适配类注入,在使用cacheFactory时,实际上使用的是适配类,适配类再选择具体的扩展点实现。

1.3.7)ExtensionLoader

在dubbo SPI中最关键的类是ExtensionLoader。

每个定义的spi的接口都会构建一个ExtensionLoader实例,
存储在ExtensionLoader对象的

1
ConcurrentMap<Class<?>,ExtensionLoader<?>> EXTENSION_LOADERS

这个map对象中。

获取SPI对象的典型方式为,最开始引入的那句代码:

1
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

1.3.7.1)ExtensionLoader核心方法

1
2
3
4
5
6
7
8
9
10
// 为扩展点创建ExtensionLoader
public static ExtensionLoader getExtensionLoader(Class type);
// 根据名称,获取具体的扩展点实现(该实现已经被wrapper并且被执行过setter注入)
public T getExtension(String name);
// 获取扩展点适配类实例(对应@Adaptive注解)
public T getAdaptiveExtension();
// 获取可被激活的扩展点实现(对应@Activate注解)
public List getActivateExtension(URL url, String[] values, String group);
// 获取已经加载的所有扩展点实现名称
public Set getSupportedExtensions();

我们来解析下他的运行原理和机制:

1.3.7.1.1)getExtensionLoader

就是为该接口new 一个ExtensionLoader,然后缓存起来。并为非ExtensionFactory的对象创建一个objectFactory用来依赖注入。

com.alibaba.dubbo.common.extension.ExtensionLoader#getExtensionLoader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null)
throw new IllegalArgumentException("Extension type == null");
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
// 必须加上SPI注解
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type +
") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}
// 优先从本地缓存获取
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
// 如果本地缓存没有则新建一个
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}

为SPI接口创建一个ExtensionLoader对象,用于获取扩展对象。

com.alibaba.dubbo.common.extension.ExtensionLoader#ExtensionLoader

1
2
3
4
5
private ExtensionLoader(Class<?> type) {
this.type = type;
// 也是利用扩展实现,后面会看到在注入其他扩展点或bean到当前扩展时使用
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

在创建ExtensionLoader对象的时候,如果当前对象不是ExtensionFactory,为当前SPI接口创建一个ExtensionFactory对象。

在当调用ExtensionLoader#injectExtension方法的时候进行依赖注入。

1.3.7.1.2)getAdaptiveExtension

这个方法是dubbo SPI里最核心的方法。dubbo通过这个方法来获取到SPI接口的对应扩展代理类。

com.alibaba.dubbo.common.extension.ExtensionLoader#getAdaptiveExtension

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
// 创建对应的扩展代理类
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}

return (T) instance;
}
`

当SPI接口首次调用这个方法的时候,扩展类还没有创建好,所以它就会直接访问createAdaptiveExtension方法。

com.alibaba.dubbo.common.extension.ExtensionLoader#createAdaptiveExtension

1
2
3
4
5
6
7
8
private T createAdaptiveExtension() {
try {
// 创建代理类,然后进行setter,构造器注入
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}

通过getAdaptiveExtensionClass获取到SPI扩展对象Class的实例,然后通过反射方法newInstance()创建这个对象。最后通过最开始介绍的ExtensionLoader#getExtensionLoader创建的objectFactory进行依赖注入。

com.alibaba.dubbo.common.extension.ExtensionLoader#getAdaptiveExtensionClass

1
2
3
4
5
6
7
8
9
private Class<?> getAdaptiveExtensionClass() {
// @Adaptive 注解在类上
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
// @Adaptive注解在SPI接口方法上
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

这里就是上面说的,如果@Adaptive接口标注在@SPI接口的实现类上面就会直接返回这个对象的Class实例。

如果标注在@SPI接口的方法上,就会通过dubbo中的字节码Compiler接口通过动态代理来创建SPI接口的实例。

1.3.7.1.2.1)@Adaptive在类上

下面我们就来分析一下@Adaptive标注在SPI接口的实现类上。SPI扩展的创建过程。
com.alibaba.dubbo.common.extension.ExtensionLoader#getExtensionClasses

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 加载扩展类
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}

加载扩展类,并把扩展类Class实例设置到cachedClasses中。

com.alibaba.dubbo.common.extension.ExtensionLoader#loadExtensionClasses

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private Map<String, Class<?>> loadExtensionClasses() {
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
// 只允许有一个默认扩展
if (names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
// 设置扩展类的默认名称
if (names.length == 1) cachedDefaultName = names[0];
}
}

Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
// 加载三种配置文件夹中的dubbo SPI文件,后交由apache开源,所以加上apache目录
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
return extensionClasses;
}

从代码里面可以看到,在loadExtensionClasses中首先会检测扩展点在@SPI注解中配置的默认扩展实现的名称,
并将其赋值给cachedDefaultName属性进行缓存,后面想要获取该扩展点的默认实现名称就可以直接通过访问cachedDefaultName字段来完成,
比如getDefaultExtensionName()和后面createAdaptiveExtensionClassCode()创建代理类的时候指定默认的扩展点实现。
从这里的代码中又可以看到,具体的扩展实现类型,是通过调用loadFile方法来加载,分别从一下三个地方加载:

1
2
3
META-INF/dubbo/internal/
META-INF/dubbo/
META-INF/services/

那么这个loadDirectory方法则至关重要了,看看其源代码:
com.alibaba.dubbo.common.extension.ExtensionLoader#loadDirectory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type) {
// SPI目录与SPI接口
String fileName = dir + type;
try {
Enumeration<java.net.URL> urls;
ClassLoader classLoader = findClassLoader();
// 扫描classpath下面的当前SPI接口的扩展
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL resourceURL = urls.nextElement();
loadResource(extensionClasses, classLoader, resourceURL);
}
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", description file: " + fileName + ").", t);
}
}

com.alibaba.dubbo.common.extension.ExtensionLoader#loadResource

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), "utf-8"));
try {
String line;
while ((line = reader.readLine()) != null) {
final int ci = line.indexOf('#');
// 读取SPI文件中的SPI扩展
if (ci >= 0) line = line.substring(0, ci);
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
if (line.length() > 0) {
loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
}
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", class file: " + resourceURL + ") in " + resourceURL, t);
}
}

com.alibaba.dubbo.common.extension.ExtensionLoader#loadClass

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error when load extension class(interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + "is not subtype of interface.");
}
// 如果Adaptive注解标注在这个对象上,设置cachedAdaptiveClass值为当前对象
if (clazz.isAnnotationPresent(Adaptive.class)) {
if (cachedAdaptiveClass == null) {
cachedAdaptiveClass = clazz;
} else if (!cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("More than 1 adaptive class found: "
+ cachedAdaptiveClass.getClass().getName()
+ ", " + clazz.getClass().getName());
}
} else if (isWrapperClass(clazz)) {
// 判断这个SPI扩展是否以当前SPI接口为构造器,使用装饰器模式增强这个类
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
wrappers.add(clazz);
} else {
// 没有以当前SPI接口为构造器
clazz.getConstructor();
if (name == null || name.length() == 0) {
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
}
}
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
// 标注了Activate注解,保存在cachedActivates属性中
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
cachedActivates.put(names[0], activate);
}
for (String n : names) {
// 把扩展名称与扩展类的映射关系保存在cachedNames中
if (!cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, n);
}
Class<?> c = extensionClasses.get(n);
// 把扩展名称与扩展类的映射关系保存在extensionClasses中,用于返回
if (c == null) {
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
}
}
}
}
}

这里就把@Adaptive的对象放置到cachedAdaptiveClass属性中,把有SPI接口为构造的包装对象放置在cachedWrapperClasses属性中,
把没有SPI接口为构造的对象放置在cachedNames属性中。

1.3.7.1.2.2)@Adaptive在方法上

如果@Adaptive标注在SPI接口的方法上,那么dubbo就会通过SPI接口Compiler进行字节码操作生成代理对象。默认使用Javassist字节码框架生成代理对象。

com.alibaba.dubbo.common.extension.ExtensionLoader#createAdaptiveExtensionClass

1
2
3
4
5
6
private Class<?> createAdaptiveExtensionClass() {
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}

其中createAdaptiveExtensionClassCode()方法为代理类生成模板拼接方法,具体生成的类可看上面Protocol和Transporter生成的适配类。

通过getAdaptiveExtensionClass获取到SPI扩展对象Class的实例的过程到此为止就完成了。

下面就来看下扩展点自动注入的实现injectExtension方法。

com.alibaba.dubbo.common.extension.ExtensionLoader#injectExtension

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
// 处理所有set方法
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
// 获取set方法参数类型
Class<?> pt = method.getParameterTypes()[0];
try {
// 获取setter对应的property名称
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
// 根据类型,名称信息从ExtensionFactory获取生成需要注入的SPI接口扩展对象
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
// 如果不为空,说set方法的参数是扩展点类型,那么进行注入
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}

这里可以看到,扩展点自动注入的一句就是根据setter方法对应的参数类型和property名称从ExtensionFactory中查询,
如果有返回扩展点实例,那么就进行注入操作。到这里getAdaptiveExtension方法就分析完毕了。

1.3.7.1.3)getExtension

这个方法的主要作用是用来获取ExtensionLoader实例代表的扩展的指定实现。
已扩展实现的名字作为参数,结合前面学习getAdaptiveExtension的代码,我们可以推测,这方法中也使用了。

在生成的扩展点实现代理类中,有类似代码

1
2
com.alibaba.dubbo.rpc.Protocol extension = 
(com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);

其中就是使用了getExtension来获取具体的扩展实现,然后调用具体方法。
com.alibaba.dubbo.common.extension.ExtensionLoader#getExtension

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
// 判断是使用默认的扩展 还是根据名称来创建相应的SPI扩展。
if ("true".equals(name)) {
return getDefaultExtension();
}
// 缓存
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
// 没有缓存实例则创建
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}

com.alibaba.dubbo.common.extension.ExtensionLoader#createExtension

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private T createExtension(String name) {
// 获取解析Adaptive标注了@Adaptive的对象缓存在属性cachedNames
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
// 从已创建Extension实例缓存中获取
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 注入使用setter方法的SPI扩展接口
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
for (Class<?> wrapperClass : wrapperClasses) {
// 对wrapper类进行注入setter方法的SPI扩展接口
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}

这里或许有一个疑问:
从代码中看,不论instance是否存在于EXTENSION_INSTANCE,都会进行扩展点注入和Wrap操作。
那么如果对于同一个扩展点,调用了两次createExtension方法的话,那不就进行了两次Wrap操作么?

如果外部能够直接调用createExtension方法,那么确实可能出现这个问题。
但是由于createExtension方法是private的,因此外部无法直接调用。
而在ExtensionLoader类中调用它的getExtension方法(只有它这一处调用),内部自己做了缓存(cachedInstances),
因此当getExtension方法内部调用了一次createExtension方法之后,后面对getExtension方法执行同样的调用时,
会直接使用cachedInstances缓存而不会再去调用createExtension方法了。

1.3.7.1.4)getActivateExtension

getActivateExtension方法主要获取当前扩展的所有可自动激活的实现。

可根据入参(values)调整指定实现的顺序,在这个方法里面也使用到getExtensionClasses方法中收集的缓存数据。

com.alibaba.dubbo.common.extension.ExtensionLoader#getActivateExtension(com.alibaba.dubbo.common.URL, java.lang.String[], java.lang.String)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<T>();
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
// 如果未配置"-default",则加载所有Activates扩展(names指定的扩展)
if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
getExtensionClasses();
// 遍历当前扩展所有的@Activate扩展
for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Activate activate = entry.getValue();
// 判断group是否满足,group为null则直接返回true
if (isMatchGroup(group, activate.group())) {
// 获取扩展示例
T ext = getExtension(name);
// 排除names指定的扩展;并且如果names中没有指定移除该扩展(-name),且当前url匹配结果显示可激活才进行使用
if (!names.contains(name)
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)
&& isActive(activate, url)) {
exts.add(ext);
}
}
}
Collections.sort(exts, ActivateComparator.COMPARATOR);
}
// 对names指定的扩展进行专门的处理
List<T> usrs = new ArrayList<T>();
// 遍历names指定的扩展名
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)
// 未设置移除该扩展
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
if (Constants.DEFAULT_KEY.equals(name)) {
// default表示上面已经加载并且排序的exts,将排在default之前的Activate扩展放置到default组之前,例如:ext1,default,ext2
// 如果此时user不为空,则user中存放的是配置在default之前的Activate扩展
if (!usrs.isEmpty()) {
// 注意index是0,放在default前面
exts.addAll(0, usrs);
// 放到default之前,然后清空
usrs.clear();
}
} else {
T ext = getExtension(name);
usrs.add(ext);
}
}
}
// 这里留下的都是配置在default之后的
if (!usrs.isEmpty()) {
// 添加到default排序之后
exts.addAll(usrs);
}
return exts;
}

1.4)总结

上面把SPI的思路以及源代码分析了一遍,有几点可能需要注意的地方:

  1. 每个ExtensionLoader实例只负责加载一个特定扩展点实现
  2. 每个扩展点对应最多只有一个ExtensionLoader实例
  3. 对于每个扩展点实现,最多只会有一个实例
  4. 一个扩展点实现可以对应多个名称(逗号分隔)
  5. 对于需要等到运行时才能决定使用哪一个具体实现的扩展点,应获取其自使用扩展点实现(AdaptiveExtension)
  6. @Adaptive注解要么注释在扩展点@SPI的方法上,要么注释在其实现类的类定义上
  7. 如果@Adaptive注解注释在@SPI接口的方法上,那么原则上该接口所- - 有方法都应该加@Adaptive注解(自动生成的实现中默认为注解的方法抛异常)
  8. 每个扩展点最多只能有一个被AdaptiveExtension
  9. 每个扩展点可以有多个可自动激活的扩展点实现(使用@Activate注解)
  10. 由于每个扩展点实现最多只有一个实例,因此扩展点实现应保证线程安全
  11. 如果扩展点有多个Wrapper,那么最终其执行的顺序不确定(内部使用ConcurrentHashSet存储)

1.5)参考文章:

dubbo中SPI机制的使用与设计分析
dubbo源码分析之内核SPI实现