Focus on Java



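Dubbo Source Code Analysis 6: Dubbo Service Export 5

Posted on 2019-08-02 | Category: Source Code Analysis

I plan to read and debug the Dubbo source code, working through the official source-code analysis and then writing up my own analysis and summary.

This post corresponds to the official "Dubbo Service Export" article.

Source Code Analysis

After being exported, most services also need to be registered with a registry, which enables service governance and related features.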

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);

// Subscribe to override configuration
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// Make sure every export call returns a new exporter instance
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}

public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}

/**
* Get the registry instance from the invoker's URL
*
* @param originInvoker
* @return
*/
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = originInvoker.getUrl();
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}
// Obtain the Registry for this URL
return registryFactory.getRegistry(registryUrl);
}
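The protocol swap performed by getRegistry can be illustrated with a small stand-alone sketch. SimpleUrl and toConcreteRegistryUrl are hypothetical names used only for illustration, not Dubbo classes, and the "dubbo" fallback is an assumption about the value of Constants.DEFAULT_DIRECTORY.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of the protocol swap in getRegistry; this is not Dubbo's URL class. */
final class RegistryUrlSketch {

    /** Hypothetical stand-in for a URL: protocol, address and query parameters. */
    static final class SimpleUrl {
        final String protocol;
        final String address;
        final Map<String, String> params;

        SimpleUrl(String protocol, String address, Map<String, String> params) {
            this.protocol = protocol;
            this.address = address;
            this.params = new LinkedHashMap<>(params);
        }
    }

    /** If the protocol is "registry", replace it with the value of the "registry" parameter. */
    static SimpleUrl toConcreteRegistryUrl(SimpleUrl url) {
        if (!"registry".equals(url.protocol)) {
            return url;
        }
        Map<String, String> params = new LinkedHashMap<>(url.params);
        // Assumption: the fallback protocol is "dubbo"
        String protocol = params.getOrDefault("registry", "dubbo");
        // The parameter has served its purpose, so drop it
        params.remove("registry");
        return new SimpleUrl(protocol, url.address, params);
    }
}
```

With this sketch, a URL such as registry://127.0.0.1:2181?registry=zookeeper becomes zookeeper://127.0.0.1:2181, which is what lets the factory pick the ZooKeeper implementation.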

Taking ZooKeeper as an example, the protocol obtained from the registry URL is zookeeper. Next, step into ZookeeperRegistryFactory.

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

private ZookeeperTransporter zookeeperTransporter;

public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}

// Called from the base class
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}

}

public abstract class AbstractRegistryFactory implements RegistryFactory {
// ...
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
// Lock the lookup so that each registry has a single instance
LOCK.lock();
try {
// Check the cache first
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// Call the template method
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
// ...
}
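The combination used by AbstractRegistryFactory (a lock, a cache lookup, and a template method that subclasses implement) can be reduced to the following sketch. CachingFactorySketch and its members are hypothetical names; the real class works on Dubbo URL and Registry objects rather than plain strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch of a caching factory with a template method; names are illustrative only. */
abstract class CachingFactorySketch<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, T> cache = new HashMap<>();

    /** Returns the cached instance for the key, creating it at most once. */
    final T get(String key) {
        lock.lock();
        try {
            T existing = cache.get(key);
            if (existing != null) {
                return existing;
            }
            // Template method: the subclass decides how to build the instance
            T created = create(key);
            if (created == null) {
                throw new IllegalStateException("Can not create instance for " + key);
            }
            cache.put(key, created);
            return created;
        } finally {
            // Always release the lock, even when create() throws
            lock.unlock();
        }
    }

    protected abstract T create(String key);
}
```

Because every call takes the lock, a plain HashMap is enough here; the price is that lookups for different keys are serialized as well.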

The getRegistry logic above is fairly simple. Next, step into ZookeeperRegistry.

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}

The constructor above is also straightforward; the call worth following is zookeeperTransporter#connect.

@SPI("curator") // Defaults to CuratorZookeeperTransporter
public interface ZookeeperTransporter {

@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
ZookeeperClient connect(URL url);

}

Next, step into CuratorZookeeperTransporter.

public class CuratorZookeeperTransporter extends AbstractZookeeperTransporter {
@Override
public ZookeeperClient createZookeeperClient(URL url) {
return new CuratorZookeeperClient(url);
}
}

// The following method belongs to the base class AbstractZookeeperTransporter

@Override
public ZookeeperClient connect(URL url) {
ZookeeperClient zookeeperClient;
List<String> addressList = getURLBackupAddress(url);
// Check the cache first
// The field define the zookeeper server , including protocol, host, port, username, password
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
// avoid creating too many connections, so add lock
// Lock so that only one thread writes and the map gets no duplicate clients
synchronized (zookeeperClientMap) {
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
// Call the subclass's template method to create the client
zookeeperClient = createZookeeperClient(toClientURL(url));
logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
writeToClientMap(addressList, zookeeperClient);
}
return zookeeperClient;
}

The logic above is also fairly simple: connect mainly creates the ZooKeeper client, and everything else is cache handling.

Next, follow the call into CuratorZookeeperClient.

public CuratorZookeeperClient(URL url) {
// The base class constructor only stores the URL
super(url);
try {
// Read the timeout parameter from the URL and build the client
int timeout = url.getParameter(TIMEOUT_KEY, 5000);
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(timeout);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
client = builder.build();

// Connection state listener
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
}
});
client.start();
boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
if (!connected) {
throw new IllegalStateException("zookeeper not connected");
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}

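The logic above is clear: its purpose is to create the ZooKeeper client. Back at the starting point, once the client exists, the next step is to register with ZooKeeper. Step into the register method of ZookeeperRegistry.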


// The actual register method is defined in this class's base class, FailbackRegistry

@Override
public void register(URL url) {
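// The base class records the URLs that have been registered
super.register(url);
// Remove the URL from the failed-registration set
removeFailedRegistered(url);
// Remove the URL from the failed-unregistration set
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
// Template method implemented by the subclass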
doRegister(url);
} catch (Exception e) {
Throwable t = e;

// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}

// Record a failed registration request to a failed list, retry regularly
// Record the failed registration
addFailedRegistered(url);
}
}

Failback here is the strategy of recording a failed request and retrying it periodically.
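As a rough illustration of that record-and-retry idea, the sketch below keeps failed URLs in a set and retries them on demand. FailbackSketch, its methods and the string URLs are hypothetical; the sketch leaves out the check-based fail-fast branch shown in the code above, and in a real registry retry() would be driven by a scheduled task.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Sketch of a record-and-retry registration strategy; names are illustrative only. */
final class FailbackSketch {
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Consumer<String> doRegister;

    FailbackSketch(Consumer<String> doRegister) {
        this.doRegister = doRegister;
    }

    /** Tries to register; on failure the URL is remembered for a later retry. */
    void register(String url) {
        failedRegistered.remove(url);
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            failedRegistered.add(url);
        }
    }

    /** Retries every remembered URL; successful ones are dropped from the set. */
    void retry() {
        for (String url : failedRegistered) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException e) {
                // Still failing: keep the URL for the next round
            }
        }
    }

    int pendingCount() {
        return failedRegistered.size();
    }
}
```

The set returned by ConcurrentHashMap.newKeySet() tolerates removal during iteration, which keeps the retry loop simple.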

Step into ZookeeperRegistry#doRegister.

@Override
public void doRegister(URL url) {
try {
// Create persistent and ephemeral nodes
// The created path is /${group}/${serviceInterface}/providers/${url}
// where /${group}/${serviceInterface}/providers/ consists of persistent nodes
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
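The node path described in the comments can be sketched as follows. ZkPathSketch and its parameters are hypothetical, and the percent-encoding of the provider URL is an assumption made so that the whole URL fits into a single node name; the real toUrlPath is not shown in this post.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Sketch of the /group/interface/providers/url layout; names are illustrative only. */
final class ZkPathSketch {

    static String toUrlPath(String group, String serviceInterface, String providerUrl) {
        // The root always starts with a slash, as in the ZookeeperRegistry constructor
        String root = group.startsWith("/") ? group : "/" + group;
        try {
            // Assumption: the provider URL is encoded so its slashes do not create extra levels
            return root + "/" + serviceInterface + "/providers/"
                    + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Only the last segment corresponds to the provider itself; when the dynamic parameter is true it is the ephemeral node, so it disappears when the provider's session ends.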

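Dubbo Source Code Analysis 5: Dubbo Service Export 4

Posted on 2019-08-01 | Category: Source Code Analysis

I plan to read and debug the Dubbo source code, working through the official source-code analysis and then writing up my own analysis and summary.

This post corresponds to the official "Dubbo Service Export" article.

Source Code Analysis

After finishing the export code in the previous part, I noticed a gap: I had not found where the filter chain is invoked.

The source does, however, clearly contain the Filter interface and several implementations of it.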

@SPI
public interface Filter {

/**
* do invoke filter.
* <p>
* <code>
* // before filter
* Result result = invoker.invoke(invocation);
* // after filter
* return result;
* </code>
*
* @param invoker service
* @param invocation invocation.
* @return invoke result.
* @throws RpcException
* @see com.alibaba.dubbo.rpc.Invoker#invoke(Invocation)
*/
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
}

The built-in Filter implementations:

echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter

The place where the filter chain is assembled then turns up in the protocol's filter wrapper:

public class ProtocolFilterWrapper implements Protocol {

private final Protocol protocol;

public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// Look up the active filters by URL, key and group
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {

@Override
public Class<T> getInterface() {
return invoker.getInterface();
}

@Override
public URL getUrl() {
return invoker.getUrl();
}

@Override
public boolean isAvailable() {
return invoker.isAvailable();
}

@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}

@Override
public void destroy() {
invoker.destroy();
}

@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
// ...

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
// Unrelated code omitted

}
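The chain-building loop in buildInvokerChain is easier to follow in a stripped-down form. In the sketch below, Invoker and Filter are simplified hypothetical interfaces that pass plain strings instead of Invocation and Result objects; the loop itself mirrors the one above.

```java
import java.util.List;

/** Sketch of building a filter chain back to front; the interfaces are simplified stand-ins. */
final class FilterChainSketch {

    interface Invoker {
        String invoke(String request);
    }

    interface Filter {
        String invoke(Invoker next, String request);
    }

    /** Wraps the target so that filters.get(0) runs first and the target runs last. */
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        // Iterate in reverse: the last filter wraps the target, the first filter wraps everything
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }
}
```

Walking the list backwards is what makes the first filter in the list the outermost one, so filters execute in list order when the returned invoker is called.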

But where does this wrapper get wrapped around the protocol?

To answer that, we need to revisit the SPI implementation.

// ExtensionLoader#createExtension
@SuppressWarnings("unchecked")
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
// cachedWrapperClasses holds the classes that have a constructor taking exactly one parameter of type T
// For example, ProtocolFilterWrapper (for Protocol) has the constructor ProtocolFilterWrapper(Protocol protocol)
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
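// For each cached wrapper class, use its matching constructor to wrap the current instance, one wrapper after another
// This explains why what is cached for each Protocol is the wrapped instance rather than the bare implementation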
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}

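Take the default Dubbo protocol as an example: when the protocol is obtained dynamically from the URL, what you actually get is an instance wrapped by the wrapper classes, possibly in more than one layer. For Protocol, the wrappers involved are ProtocolFilterWrapper and ProtocolListenerWrapper.

Both classes are themselves Protocol implementations; their entries in the SPI configuration are: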

filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper

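That ties everything together. The first time the extension classes are loaded through SPI, ProtocolFilterWrapper and ProtocolListenerWrapper are stored in cachedWrapperClasses. When DubboProtocol is then requested, it is wrapped by these two classes layer by layer, and the wrapped DubboProtocol is cached and returned. This is how the Filter chain ends up embedded in DubboProtocol.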
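The wrapping step in createExtension boils down to looking up a constructor that takes the extension type and calling it repeatedly. The sketch below shows that mechanism with hypothetical classes (Protocol, CoreProtocol, FilterWrapper and ListenerWrapper are stand-ins, not Dubbo's types), and it omits the dependency injection that the real method also performs.

```java
import java.util.List;

/** Sketch of wrapping an extension through single-argument constructors; all types are stand-ins. */
final class WrapperSketch {

    public interface Protocol {
        String export(String service);
    }

    public static final class CoreProtocol implements Protocol {
        public String export(String service) {
            return "core(" + service + ")";
        }
    }

    public static final class FilterWrapper implements Protocol {
        private final Protocol delegate;

        public FilterWrapper(Protocol delegate) {
            this.delegate = delegate;
        }

        public String export(String service) {
            return "filter>" + delegate.export(service);
        }
    }

    public static final class ListenerWrapper implements Protocol {
        private final Protocol delegate;

        public ListenerWrapper(Protocol delegate) {
            this.delegate = delegate;
        }

        public String export(String service) {
            return "listener>" + delegate.export(service);
        }
    }

    /** Wraps the instance with each wrapper class in turn, using the Protocol-typed constructor. */
    static Protocol wrap(Protocol instance, List<Class<? extends Protocol>> wrapperClasses)
            throws ReflectiveOperationException {
        Protocol current = instance;
        for (Class<? extends Protocol> wrapperClass : wrapperClasses) {
            current = wrapperClass.getConstructor(Protocol.class).newInstance(current);
        }
        return current;
    }
}
```

The sketch uses a List so the nesting order is predictable. The excerpt above iterates a Set, so the order in which the wrappers nest should not be relied on there.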

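Dubbo Source Code Analysis 4: Dubbo Service Export 2

Posted on 2019-07-24 | Category: Source Code Analysis

I plan to read and debug the Dubbo source code, working through the official source-code analysis and then writing up my own analysis and summary.

This post corresponds to the official "Dubbo Service Export" article.

Source Code Analysis

Continuing from the previous part, this post goes through doExportUrls.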

@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrls() {
// Load the registry URLs
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
// Export the service under each configured protocol
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}

An example registry configuration in XML:

<dubbo:registry address="${zookeeper_url}" client="curator2"
file="dubbo/dubbo.cache.${SERVER_NAME}" />

loadRegistries is implemented as follows:

protected List<URL> loadRegistries(boolean provider) {
// Validate the registry configuration
checkRegistry();
List<URL> registryList = new ArrayList<URL>();
if (registries != null && registries.size() > 0) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (address == null || address.length() == 0) {
// Fall back to 0.0.0.0
address = Constants.ANYHOST_VALUE;
}
// A registry address from the system properties takes precedence
String sysaddress = System.getProperty("dubbo.registry.address");
if (sysaddress != null && sysaddress.length() > 0) {
address = sysaddress;
}
if (address != null && address.length() > 0
&& !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
// appendParameters copies the values exposed by the object's getters into the map
appendParameters(map, application);
appendParameters(map, config);
map.put("path", RegistryService.class.getName());
map.put("dubbo", Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
// If the map has no protocol, use "remote" when a RegistryFactory extension named remote exists, otherwise "dubbo"
if (!map.containsKey("protocol")) {
if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
map.put("protocol", "remote");
} else {
map.put("protocol", "dubbo");
}
}
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
// Set the protocol to registry
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
// (provider && register is true or unset)
// || (not provider && subscribe is true or unset)
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}

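The logic above builds the list of registry URLs, which is then used when exporting the service under each protocol.

The next steps assemble the service URL and export the service locally and remotely.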

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// Default the protocol name to dubbo when it is empty
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}

// Set side, dubbo version, timestamp, pid and similar information
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
// Copy the settings from the config objects into the map
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
// Parse the <dubbo:method> settings
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
// Handle the retry flag, stored in the map under the key <method>.retry
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
// If retry is "false", set <method>.retries to 0
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
// If argument configs are present, parse the <dubbo:argument> settings
if (arguments != null && !arguments.isEmpty()) {
for (ArgumentConfig argument : arguments) {
// The argument type is set
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// Found the configured method
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// argtypes holds the method's parameter types
if (argument.getIndex() != -1) {
// An explicit argument index was configured
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
// The types match, so copy the ArgumentConfig settings into the map
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
// The types differ, so throw an exception
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// The index is -1, i.e. it was not configured
for (int j = 0; j < argtypes.length; j++) {
// Look for a parameter whose type matches
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
// The index is set but the type is empty: copy the ArgumentConfig settings into the map
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}

}
}
} // end of methods for
}

// Generic service
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}

// Generate a wrapper class for the interface
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
// Join the method names with commas
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
// The protocol is injvm
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}

// Get the context path
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}

// Get the host and port
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

// ........
// ........
}

The code above is the URL-assembly part of the export: it builds the Dubbo URL from the configuration objects and system parameters.
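One detail in that method that is easy to misread is the handling of the per-method retry flag. The sketch below isolates it; MethodParamSketch and normalizeRetry are hypothetical names, while the key suffixes ".retry" and ".retries" are taken from the code above.

```java
import java.util.Map;

/** Sketch of how a per-method retry flag is folded into the parameter map. */
final class MethodParamSketch {

    /** Removes "<method>.retry" and, only when it was "false", writes "<method>.retries" = "0". */
    static void normalizeRetry(Map<String, String> map, String methodName) {
        String retryKey = methodName + ".retry";
        if (map.containsKey(retryKey)) {
            // The flag itself never reaches the URL
            String retryValue = map.remove(retryKey);
            if ("false".equals(retryValue)) {
                map.put(methodName + ".retries", "0");
            }
        }
    }
}
```

So a method configured with retry set to false ends up with retries=0 in the URL parameters, while any other value of retry is simply dropped.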

// Second half of doExportUrlsFor1Protocol
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
// Load the ConfiguratorFactory and apply it to the url
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}

String scope = url.getParameter(Constants.SCOPE_KEY);
// If scope is none, do not export at all
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

// Export locally unless scope is remote
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// Export remotely unless scope is local
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
// Load the monitor
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}

// Choose the proxy mechanism, jdk or javassist; javassist is the default
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}

// Create the invoker from the interface and its implementation
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

// Export the service
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// No registry is needed: export the service directly
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);

From the code above, the core step is creating the Invoker, which is the prerequisite for exporting the service.

public class JavassistProxyFactory extends AbstractProxyFactory {
// ............

@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// Generate the wrapper class
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// Return an anonymous invoker
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// Delegate the call to the wrapper
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}

}

The getInvoker code above is simple; the main work is in generating the wrapper class.

public static Wrapper getWrapper(Class<?> c) {
while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
c = c.getSuperclass();

if (c == Object.class)
return OBJECT_WRAPPER;

Wrapper ret = WRAPPER_MAP.get(c);
if (ret == null) {
ret = makeWrapper(c);
WRAPPER_MAP.put(c, ret);
}
return ret;
}

The code above is mostly caching. The class is actually generated in makeWrapper, which is considerably more complex.

private static Wrapper makeWrapper(Class<?> c) {
if (c.isPrimitive())
throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);

String name = c.getName();
ClassLoader cl = ClassHelper.getClassLoader(c);

StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");

c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");

Map<String, Class<?>> pts = new HashMap<String, Class<?>>(); // <property name, property types>
Map<String, Method> ms = new LinkedHashMap<String, Method>(); // <method desc, Method instance>
List<String> mns = new ArrayList<String>(); // method names.
List<String> dmns = new ArrayList<String>(); // declaring method names.

// get all public field.
for (Field f : c.getFields()) {
String fn = f.getName();
Class<?> ft = f.getType();
if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers()))
continue;

c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
pts.put(fn, ft);
}

Method[] methods = c.getMethods();
// get all public method.
boolean hasMethod = hasMethods(methods);
if (hasMethod) {
c3.append(" try{");
}
for (Method m : methods) {
if (m.getDeclaringClass() == Object.class) //ignore Object's method.
continue;

String mn = m.getName();
c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
int len = m.getParameterTypes().length;
c3.append(" && ").append(" $3.length == ").append(len);

boolean override = false;
for (Method m2 : methods) {
if (m != m2 && m.getName().equals(m2.getName())) {
override = true;
break;
}
}
if (override) {
if (len > 0) {
for (int l = 0; l < len; l++) {
c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
.append(m.getParameterTypes()[l].getName()).append("\")");
}
}
}

c3.append(" ) { ");

if (m.getReturnType() == Void.TYPE)
c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
else
c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");

c3.append(" }");

mns.add(mn);
if (m.getDeclaringClass() == c)
dmns.add(mn);
ms.put(ReflectUtils.getDesc(m), m);
}
if (hasMethod) {
c3.append(" } catch(Throwable e) { ");
c3.append(" throw new java.lang.reflect.InvocationTargetException(e); ");
c3.append(" }");
}

c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");

// ...

cc.addMethod(c1.toString());
cc.addMethod(c2.toString());
cc.addMethod(c3.toString());

// ...
}

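The code above generates the Wrapper class. A Wrapper stores information about all the methods of the wrapped class and derives its properties by parsing the getter, setter and is methods.

What matters for now is invokeMethod. Debugging shows what the generated invokeMethod source looks like: it matches the requested method name and parameter count (plus the parameter types for overloaded methods) and then calls the corresponding method on the target directly.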

Javassist itself is outside the scope of this post, so it is not covered here.
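To make the shape of the generated invokeMethod concrete, here is a hand-written equivalent for a hypothetical GreetingService interface. This is only a sketch of what the string-building code in makeWrapper would produce for such an interface; the real method is generated at runtime and refers to its arguments as $1 through $4.

```java
import java.lang.reflect.InvocationTargetException;

/** Hand-written sketch of a generated invokeMethod for a hypothetical interface. */
final class InvokeMethodSketch {

    /** Hypothetical service interface used only for this illustration. */
    public interface GreetingService {
        String sayHello(String name);

        void ping();
    }

    static Object invokeMethod(Object o, String n, Class<?>[] p, Object[] v)
            throws InvocationTargetException, NoSuchMethodException {
        GreetingService w;
        try {
            w = (GreetingService) o;
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            // Dispatch on method name and parameter count, then call the target directly
            if ("sayHello".equals(n) && p.length == 1) {
                return w.sayHello((String) v[0]);
            }
            if ("ping".equals(n) && p.length == 0) {
                w.ping();
                return null;
            }
        } catch (Throwable e) {
            throw new InvocationTargetException(e);
        }
        throw new NoSuchMethodException(
                "Not found method \"" + n + "\" in class " + GreetingService.class.getName() + ".");
    }
}
```

The point of generating this code is that each call becomes an ordinary method invocation guarded by string comparisons, with no use of java.lang.reflect.Method at call time.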

// Scope none: do not export
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

// Export locally unless scope is remote (remote means only the remote service is exported)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// Export remotely unless scope is local (local means only the local service is exported)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
// ...
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
// ...
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
}

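Looking back at the earlier code: the export first exports the service locally and then, depending on the registry settings, exports it remotely.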
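The scope checks can be summarized as a small decision function. ScopeSketch and Target are hypothetical names, and the literal values "none", "remote" and "local" are assumptions about what Constants.SCOPE_NONE, SCOPE_REMOTE and SCOPE_LOCAL contain.

```java
import java.util.EnumSet;
import java.util.Set;

/** Sketch of which exports happen for a given scope value; names are illustrative only. */
final class ScopeSketch {

    enum Target { LOCAL, REMOTE }

    static Set<Target> targetsFor(String scope) {
        Set<Target> targets = EnumSet.noneOf(Target.class);
        // Assumed literal values of the scope constants
        if ("none".equalsIgnoreCase(scope)) {
            return targets;
        }
        if (!"remote".equalsIgnoreCase(scope)) {
            targets.add(Target.LOCAL);
        }
        if (!"local".equalsIgnoreCase(scope)) {
            targets.add(Target.REMOTE);
        }
        return targets;
    }
}
```

An unset scope therefore leads to both a local and a remote export, which matches the order described above.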

private void exportLocal(URL url) {
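// Check whether the protocol is already injvm; if so, skip it because it has already been exported
// There seems to be a problem here, though: if scope is local and the protocol is injvm, the service is not exported at all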
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(NetUtils.LOCALHOST)
.setPort(0);
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}

For injvm, the local export simply creates an InjvmExporter.

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}

The protocols that require registration are the important case. Step into RegistryProtocol.

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// Export the service
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

URL registryUrl = getRegistryUrl(originInvoker);

// Obtain the registry and the provider URL to register
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

// Decide whether to register now or to delay publishing
boolean register = registeredProviderUrl.getParameter("register", true);

ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

if (register) {
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}


final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// Always return a new DestroyableExporter
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

Following the main flow, the first step is exporting the service; step into doLocalExport.

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
// Get the service key, made up of the group, version, interface and port
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
// Past the double-checked lock: wrap the original invoker in a delegate that carries the provider URL
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// Export through the actual protocol, dubbo by default
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
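
The caching in doLocalExport is a double-checked lock around a map. The sketch below reproduces only that pattern with plain JDK types; `ExportCacheSketch`, `getOrExport` and the sample key are hypothetical names for illustration and are not part of Dubbo.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the `bounds` cache in doLocalExport; not Dubbo code.
class ExportCacheSketch {
    private final Map<String, Object> bounds = new ConcurrentHashMap<>();

    // Returns the cached exporter for key, running exportAction at most once per key.
    Object getOrExport(String key, Supplier<Object> exportAction) {
        Object exporter = bounds.get(key);          // first check, without the lock
        if (exporter == null) {
            synchronized (bounds) {
                exporter = bounds.get(key);         // second check, under the lock
                if (exporter == null) {
                    exporter = exportAction.get();  // the expensive export happens here
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

    // Exports the same key twice and reports how often the export action ran.
    static int demo() {
        ExportCacheSketch cache = new ExportCacheSketch();
        AtomicInteger exports = new AtomicInteger();
        Supplier<Object> action = () -> "exporter-" + exports.incrementAndGet();
        String key = "dubbo://10.0.0.1:20880/demo.DemoService";
        Object first = cache.getOrExport(key, action);
        Object second = cache.getOrExport(key, action);
        if (first != second) {
            throw new IllegalStateException("expected the cached exporter");
        }
        return exports.get();
    }

    public static void main(String[] args) {
        System.out.println("export action ran " + demo() + " time(s)");
    }
}
```

The second lookup returns the cached object, so repeated exports of the same provider URL do not open the protocol twice.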

The default protocol is dubbo, so step into DubboProtocol:

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();

// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);

// Local stub event support
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// Start the server
openServer(url);
optimizeSerialization(url);
return exporter;
}
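
DubboProtocol stores each exporter in exporterMap under a service key. The sketch below shows one way such a key can be built and used. The layout `[group/]interface[:version]:port` is an assumption made for illustration, and `ServiceKeySketch` with its methods is hypothetical rather than Dubbo's own implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only; the key layout is an assumption, not copied from Dubbo.
class ServiceKeySketch {
    // Stands in for exporterMap: service key -> exporter (a String here for brevity).
    static final Map<String, String> EXPORTER_MAP = new ConcurrentHashMap<>();

    // Builds a key of the assumed form [group/]interface[:version]:port.
    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(serviceName);
        if (version != null && !version.isEmpty()) {
            buf.append(':').append(version);
        }
        buf.append(':').append(port);
        return buf.toString();
    }

    // Registers an exporter under its service key and returns the key.
    static String export(String exporter, int port, String serviceName, String version, String group) {
        String key = serviceKey(port, serviceName, version, group);
        EXPORTER_MAP.put(key, exporter);
        return key;
    }
}
```

Because the port is part of the key, the same interface exported on two ports occupies two separate entries.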

Next, step into openServer:

private void openServer(URL url) {
String key = url.getAddress();
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
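// No server is bound to this address yet: create one
serverMap.put(key, createServer(url));
} else {
// A server already exists for this address: reset it with the new URL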
server.reset(url);
}
}
}

private ExchangeServer createServer(URL url) {
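// Send a readonly event when the server closes; enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// Heartbeat is enabled by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// Transport implementation for the server; netty by default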
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);

url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
// Create the server
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// Read the client type
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
// Check that a Transporter extension of this type exists
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
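
openServer keeps one server per address (host:port): the first service exported on an address creates the server, and later services on the same address only reset it. The sketch below models that bookkeeping with a fake server type; `ServerReuseSketch` and `FakeServer` are hypothetical names and nothing here opens a real socket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the serverMap bookkeeping in openServer; not Dubbo code.
class ServerReuseSketch {
    // Minimal fake server that only records the URLs it was reset with.
    static class FakeServer {
        final String address;
        final List<String> resets = new ArrayList<>();

        FakeServer(String address) {
            this.address = address;
        }

        void reset(String url) {
            resets.add(url);
        }
    }

    final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();
    int created = 0;

    // Creates a server for a new address, or resets the existing one.
    void openServer(String address, String url) {
        FakeServer server = serverMap.get(address);
        if (server == null) {
            serverMap.put(address, new FakeServer(address));
            created++;
        } else {
            server.reset(url);
        }
    }

    // Exports two services on one address; returns {servers created, resets seen}.
    static int[] demo() {
        ServerReuseSketch protocol = new ServerReuseSketch();
        protocol.openServer("10.0.0.1:20880", "dubbo://10.0.0.1:20880/demo.FooService");
        protocol.openServer("10.0.0.1:20880", "dubbo://10.0.0.1:20880/demo.BarService");
        FakeServer server = protocol.serverMap.get("10.0.0.1:20880");
        return new int[] {protocol.created, server.resets.size()};
    }
}
```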

Exchangers#bind then delegates, by way of the default HeaderExchanger, to Transporters#bind:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}

// Look up the adaptive Transporter through SPI; it resolves to NettyTransporter by default
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

Continue into NettyTransporter:

public class NettyTransporter implements Transporter {

public static final String NAME = "netty";

@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}

@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}

As shown, bind simply creates a NettyServer. NettyServer extends AbstractServer; the main logic lives in the AbstractServer constructor and in the doOpen template method that the subclass implements.

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();

String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}


// NettyServer

@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
// Set up the boss and worker thread pools
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);

final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// Bind the address and start the server
channel = bootstrap.bind(getBindAddress());
}
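
The AbstractServer constructor and NettyServer#doOpen form a template method: the base class resolves the bind address and then calls a hook that the subclass implements. The sketch below shows that shape with JDK types only. `TemplateServerSketch`, `BaseServer` and `RecordingServer` are hypothetical, and the sketch only models the anyhost flag, not the invalid-local-host check from the original constructor.

```java
import java.net.InetSocketAddress;

// Hypothetical sketch of the constructor-plus-doOpen template method; not Dubbo code.
class TemplateServerSketch {
    static final String ANYHOST = "0.0.0.0";

    // Base class: resolves the bind address, then calls the subclass hook.
    abstract static class BaseServer {
        final InetSocketAddress bindAddress;
        boolean opened;

        BaseServer(String host, int port, boolean anyhost) {
            // Bind to all interfaces when anyhost is requested, else to the given host.
            String bindIp = anyhost ? ANYHOST : host;
            bindAddress = new InetSocketAddress(bindIp, port);
            doOpen(); // template step, implemented by the subclass
        }

        abstract void doOpen();
    }

    // Subclass that records the call instead of starting a real transport.
    static class RecordingServer extends BaseServer {
        RecordingServer(String host, int port, boolean anyhost) {
            super(host, port, anyhost);
        }

        @Override
        void doOpen() {
            opened = true;
        }
    }
}
```

One general Java caveat of this design: the hook runs before the subclass constructor body, so the hook should rely only on state that the base class has already set.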

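That completes the remote export process.

To summarize:

  • Listen for ContextRefreshedEvent and export the service in its handler

  • Check the core Dubbo configuration, fill in gaps from defaults or system properties, and assemble the URL

  • Load all registry information

  • For each protocol, export the service and register it with every registry

    • Assemble the parameter map from the core configuration
    • Obtain the invoker
    • Export the service

Several core interfaces are involved in this process:

  • Protocol: the protocol layer
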
/**
* Protocol. (API/SPI, Singleton, ThreadSafe)
*
* @author william.liangf
*/
@SPI("dubbo") // defaults to DubboProtocol
public interface Protocol {

/**
* Returns the default port, used when the user has not configured one.
*
* @return the default port
*/
int getDefaultPort();

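/**
* Exports a service for remote invocation:<br>
* 1. When receiving a request, the protocol should record the caller's address: RpcContext.getContext().setRemoteAddress();<br>
* 2. export() must be idempotent: exporting an Invoker for the same URL twice is no different from exporting it once.<br>
* 3. The Invoker passed to export() is created and supplied by the framework; the protocol need not care about it.<br>
*
* @param <T> the service type
* @param invoker the service invoker
* @return exporter a reference to the exported service, used to unexport it
* @throws RpcException if exporting fails, for example because the port is already in use
*/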
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

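/**
* Refers to a remote service:<br>
* 1. When the user calls invoke() on the Invoker returned by refer(), the protocol must in turn execute invoke() on the Invoker that was passed to export() for the same URL on the remote side.<br>
* 2. The Invoker returned by refer() is implemented by the protocol, which usually sends the remote request from inside it.<br>
* 3. When check=false is set in the URL, a connection failure must not throw; the protocol should recover internally.<br>
*
* @param <T> the service type
* @param type the service interface class
* @param url the URL of the remote service
* @return invoker the local proxy of the service
* @throws RpcException if connecting to the provider fails
*/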
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

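/**
* Destroys the protocol:<br>
* 1. Unexports and unrefers every service this protocol has exported or referred.<br>
* 2. Releases all resources held by the protocol, such as connections and ports.<br>
* 3. After being destroyed, the protocol can still export and refer new services.<br>
*/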
void destroy();

}
  • Transporter: the transport-layer carrier (TCP/UDP…)

    /**
    * Transporter. (SPI, Singleton, ThreadSafe)
    * <p>
    * <a href="http://en.wikipedia.org/wiki/Transport_Layer">Transport Layer</a>
    * <a href="http://en.wikipedia.org/wiki/Client%E2%80%93server_model">Client/Server</a>
    *
    * @see com.alibaba.dubbo.remoting.Transporters
    */
    @SPI("netty")
    public interface Transporter {

    /**
    * Bind a server.
    *
    * @param url server url
    * @param handler
    * @return server
    * @throws RemotingException
    * @see com.alibaba.dubbo.remoting.Transporters#bind(URL, ChannelHandler...)
    */
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;

    /**
    * Connect to a server.
    *
    * @param url server url
    * @param handler
    * @return client
    * @throws RemotingException
    * @see com.alibaba.dubbo.remoting.Transporters#connect(URL, ChannelHandler...)
    */
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;

    }
  • Exchanger: the exchange layer

/**
* Exchanger. (SPI, Singleton, ThreadSafe)
* <p>
* <a href="http://en.wikipedia.org/wiki/Message_Exchange_Pattern">Message Exchange Pattern</a>
* <a href="http://en.wikipedia.org/wiki/Request-response">Request-Response</a>
*/
@SPI(HeaderExchanger.NAME)
public interface Exchanger {

/**
* bind.
*
* @param url
* @param handler
* @return message server
*/
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

/**
* connect.
*
* @param url
* @param handler
* @return message channel
*/
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;

}

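Dubbo Source Code Analysis 3: Dubbo Service Export 1

Posted on 2019-07-23 | Category: Source Code Analysis |

The plan is to read and debug the Dubbo source code, study it alongside the official source code analysis, and then add my own analysis and summary.

This post corresponds to the official Dubbo document on service export.

Introduction

Dubbo exports a service in three parts:

  • Preparation: check parameters, fill in default values, and assemble the URL
  • Export: export the service locally (InjvmExporter) and remotely
  • Registration: register the service with the registry for service discovery

Source Code Analysis

Preparation

The process starts at ServiceBean#onApplicationEvent. ServiceBean is the bridge between Dubbo and Spring; once Dubbo receives the event notification, it exports the service.
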
@Override
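// The event type is Spring's context-refreshed event
public void onApplicationEvent(ContextRefreshedEvent event) {
// Export now only if no delay is configured, the service is not yet exported, and it has not been unexported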
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}

Note that isDelay means the opposite of what its name suggests: it returns true when the export is not delayed.

private boolean isDelay() {
Integer delay = getDelay();
ProviderConfig provider = getProvider();
if (delay == null && provider != null) {
delay = provider.getDelay();
}
return supportedApplicationListener && (delay == null || delay == -1);
}

For example, if the provider configures delay as 10, isDelay returns false; if no delay is configured, it returns true.
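
To make the inverted meaning concrete, here is a minimal sketch of the same boolean expression with the fields turned into parameters. `IsDelaySketch` and its parameter names are hypothetical; only the expression itself comes from the code above.

```java
// Hypothetical restatement of ServiceBean#isDelay with fields passed as parameters.
class IsDelaySketch {
    static boolean isDelay(Integer serviceDelay, Integer providerDelay,
                           boolean supportedApplicationListener) {
        Integer delay = serviceDelay;
        // Fall back to the provider-level delay when the service has none.
        if (delay == null && providerDelay != null) {
            delay = providerDelay;
        }
        // True means "export when the context-refreshed event arrives", i.e. no delay is set.
        return supportedApplicationListener && (delay == null || delay == -1);
    }

    public static void main(String[] args) {
        System.out.println(isDelay(null, 10, true));   // provider delay configured
        System.out.println(isDelay(null, null, true)); // no delay configured
    }
}
```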

Next, step into the export method:

public synchronized void export() {
if (provider != null) {
// Whether the service should be exported at all
if (export == null) {
export = provider.getExport();
}
// Delay before exporting, if any
if (delay == null) {
delay = provider.getDelay();
}
}
// Stop here if export is disabled
if (export != null && !export) {
return;
}

if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
@Override
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
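
The delayed branch of export relies on a scheduled executor. The sketch below shows the same decision with a JDK ScheduledExecutorService and a latch standing in for doExport. `DelayedExportSketch` and its `export` method are hypothetical names, not Dubbo code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "export now or after a delay"; not Dubbo code.
class DelayedExportSketch {
    // Runs a stand-in doExport immediately or after delayMillis; returns the elapsed milliseconds.
    static long export(Integer delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch exported = new CountDownLatch(1);
        Runnable doExport = exported::countDown;
        long start = System.nanoTime();
        try {
            if (delayMillis != null && delayMillis > 0) {
                executor.schedule(doExport, delayMillis, TimeUnit.MILLISECONDS);
            } else {
                doExport.run();
            }
            // Wait until the stand-in export has actually run.
            if (!exported.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("export did not run");
            }
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```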

The code above is straightforward. Continue into doExport:

protected synchronized void doExport() {
// Already unexported
if (unexported) {
throw new IllegalStateException("Already unexported!");
}

// Already exported
if (exported) {
return;
}

exported = true;

// Validate that the interface name is not empty
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}

// Check defaults
checkDefault();
// If provider is set, take application, module, registries, monitor and protocols from it for any field that is still null
if (provider != null) {
if (application == null) {
application = provider.getApplication();
}
if (module == null) {
module = provider.getModule();
}
if (registries == null) {
registries = provider.getRegistries();
}
if (monitor == null) {
monitor = provider.getMonitor();
}
if (protocols == null) {
protocols = provider.getProtocols();
}
}
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {
monitor = module.getMonitor();
}
}
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {
monitor = application.getMonitor();
}
}
// ......

// Is this a generic service? A generic service does not need a concrete service interface
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
try {
// Load the interface class
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
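// Check that the type is an interface and that the configured methods exist on it
checkInterfaceAndMethods(interfaceClass, methods);
// Check that ref is non-null and implements the interface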
checkRef();
generic = Boolean.FALSE.toString();
}
// local and stub both configure the local stub
if (local != null) {
if ("true".equals(local)) {
local = interfaceName + "Local";
}
Class<?> localClass;
try {
// Load the local stub class; when local is "true", the class defaults to <interfaceName>Local
localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
if (stub != null) {
// Same logic as the local branch
}
// Validate application, registry and protocol
checkApplication();
checkRegistry();
checkProtocol();

// Fill properties of this config that are still unset from system properties
appendProperties(this);

// Validate the local stub class and the local mock class
checkStubAndMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
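// Export the URLs
doExportUrls();
// Create the provider model, recording the service name, its config and the implementation; uniqueServiceName is built from group + interface name + version
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
// Put it into the cache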
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}

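The logic above is fairly involved. It mainly does the following:

  • Validates <dubbo:service interface="" ...>

  • Assigns and validates config objects such as application, module and registries

  • Validates ref

  • Validates the stub (local stub) and mock (local mock) classes

  • Exports the URLs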

The core part, doExportUrls, is covered in a later post.
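
The chain of null checks in doExport resolves each setting by precedence: the service's own value, then provider, then module, then application. A minimal sketch of that fallback, using a hypothetical `firstNonNull` helper and plain string lists in place of Dubbo's config objects:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the service -> provider -> module -> application fallback.
class ConfigFallbackSketch {
    // Returns the first candidate that is not null, or null if all are null.
    @SafeVarargs
    static <T> T firstNonNull(T... candidates) {
        for (T candidate : candidates) {
            if (candidate != null) {
                return candidate;
            }
        }
        return null;
    }

    // Resolves the registries with the same precedence as the null checks in doExport.
    static List<String> resolveRegistries(List<String> service, List<String> provider,
                                          List<String> module, List<String> application) {
        return firstNonNull(service, provider, module, application);
    }

    public static void main(String[] args) {
        List<String> fromModule = Arrays.asList("zookeeper://127.0.0.1:2181");
        List<String> fromApplication = Arrays.asList("multicast://224.5.6.7:1234");
        System.out.println(resolveRegistries(null, null, fromModule, fromApplication));
    }
}
```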

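Dubbo Source Code Analysis 2: Dubbo SPI 2

Posted on 2019-07-19 | Category: Source Code Analysis |

The plan is to read and debug the Dubbo source code, study it alongside the official source code analysis, and then add my own analysis and summary.

This post corresponds to the official Dubbo document on Dubbo SPI.

Dubbo IOC

The previous post on SPI left one part uncovered: when an extension instance is created, its dependencies must be injected. This is where Dubbo IOC comes in.
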
@SuppressWarnings("unchecked")
private T createExtension(String name) {
// Read the class name from the config file, load the class, and cache it in the name-to-class map
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// Create the instance by reflection and cache it
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// Inject dependencies (the Dubbo IOC part) through setter methods
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
// Create the wrapper instances in a loop
for (Class<?> wrapperClass : wrapperClasses) {
// Pass the current instance to the wrapper's constructor
// This is the wrapper-class mechanism
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}

The injection itself happens in injectExtension:

private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
// Look for public methods that start with "set" and take exactly one parameter
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
// The parameter type
Class<?> pt = method.getParameterTypes()[0];
try {
// Derive the property name from the setter name
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
// Look up an extension with this type and name
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
// Inject it
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}

Extensions get their dependencies injected through setter methods.
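
A minimal, self-contained sketch of the same setter-injection idea using only JDK reflection. `SetterInjectionSketch`, `Greeter` and the `BiFunction` used as an object factory are hypothetical; in Dubbo the lookup goes through objectFactory.getExtension(type, name).

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.function.BiFunction;

// Hypothetical sketch of setter-based injection; not Dubbo code.
class SetterInjectionSketch {
    // Example extension with one injectable dependency.
    public static class Greeter {
        private String prefix;

        public void setPrefix(String prefix) {
            this.prefix = prefix;
        }

        public String greet(String name) {
            return prefix + name;
        }
    }

    // Calls every public single-argument setter for which the factory supplies a value.
    static <T> T inject(T instance, BiFunction<Class<?>, String, Object> factory) {
        for (Method method : instance.getClass().getMethods()) {
            String name = method.getName();
            if (name.startsWith("set") && name.length() > 3
                    && method.getParameterTypes().length == 1
                    && Modifier.isPublic(method.getModifiers())) {
                Class<?> parameterType = method.getParameterTypes()[0];
                // setPrefix -> prefix
                String property = name.substring(3, 4).toLowerCase() + name.substring(4);
                Object dependency = factory.apply(parameterType, property);
                if (dependency != null) {
                    try {
                        method.invoke(instance, dependency);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException("fail to inject via " + name, e);
                    }
                }
            }
        }
        return instance;
    }

    // Injects a prefix into a Greeter and returns the greeting.
    static String demo() {
        BiFunction<Class<?>, String, Object> factory =
                (type, property) -> type == String.class && property.equals("prefix") ? "Hello, " : null;
        return inject(new Greeter(), factory).greet("Dubbo");
    }
}
```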

private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null)
throw new IllegalArgumentException("Extension type == null");
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type +
") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}

ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}

Here objectFactory is declared as ExtensionFactory, but the constructor shows that it actually looks up the adaptive extension of ExtensionFactory. Adaptive extensions are covered in the next section. In Dubbo, the adaptive ExtensionFactory is AdaptiveExtensionFactory:

@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {

private final List<ExtensionFactory> factories;

public AdaptiveExtensionFactory() {
// Get the ExtensionLoader from the cache
ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
// Iterate over all extension names and load each extension
for (String name : loader.getSupportedExtensions()) {
list.add(loader.getExtension(name));
}
factories = Collections.unmodifiableList(list);
}

@Override
public <T> T getExtension(Class<T> type, String name) {
// Ask each factory in turn and return the first match
for (ExtensionFactory factory : factories) {
T extension = factory.getExtension(type, name);
if (extension != null) {
return extension;
}
}
return null;
}

}

getSupportedExtensions follows the same logic as the SPI part and returns the names (keys) of all extensions:

public Set<String> getSupportedExtensions() {
Map<String, Class<?>> clazzes = getExtensionClasses();
return Collections.unmodifiableSet(new TreeSet<String>(clazzes.keySet()));
}

Back to where we started, the dependency-injection part of injectExtension:

// Look up an extension with this type and name
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
// Inject it
method.invoke(instance, object);
}

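This calls AdaptiveExtensionFactory#getExtension, which searches all current ExtensionFactory implementations for an extension with the given type and name, and then invokes the setter to assign it.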
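
The lookup in AdaptiveExtensionFactory is a simple chain: ask each factory in order and return the first non-null answer. A minimal sketch of that composite, where `Factory`, `CompositeFactory` and the two lambda factories are hypothetical stand-ins rather than Dubbo's classes:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a first-match factory chain; not Dubbo code.
class FactoryChainSketch {
    interface Factory {
        Object getExtension(Class<?> type, String name);
    }

    // Delegates to each factory in order and returns the first non-null result.
    static class CompositeFactory implements Factory {
        private final List<Factory> factories;

        CompositeFactory(List<Factory> factories) {
            this.factories = factories;
        }

        @Override
        public Object getExtension(Class<?> type, String name) {
            for (Factory factory : factories) {
                Object extension = factory.getExtension(type, name);
                if (extension != null) {
                    return extension;
                }
            }
            return null;
        }
    }

    // Builds a two-factory chain and performs one lookup.
    static Object lookup(Class<?> type, String name) {
        // First factory only answers for interfaces; second one answers for anything.
        Factory spiLike = (t, n) -> t.isInterface() ? "spi:" + n : null;
        Factory containerLike = (t, n) -> "bean:" + n;
        return new CompositeFactory(Arrays.asList(spiLike, containerLike)).getExtension(type, name);
    }
}
```

The order of the list decides which factory wins when several could supply the dependency.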

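Dubbo Adaptive Extensions

Usage

As mentioned in the previous post, Dubbo SPI also supports adaptive extensions.

The difference from an ordinary extension is that an adaptive extension selects the concrete extension at run time from URL parameters, rather than fixing it at startup.

Continuing with the example from the previous section:
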
@SPI
public interface Robot {
@Adaptive
void sayHello(URL url);
}

public class Bumblebee implements Robot {

@Override
public void sayHello(URL url) {
System.out.println(this.getClass().getName());
}

}

public class OptimusPrime implements Robot {

@Override
public void sayHello(URL url) {
System.out.println(this.getClass().getName());
}

}

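An interface that needs an adaptive extension must use the @Adaptive annotation. @Adaptive can be placed on a class or on a method: on a class, no proxy class is generated; on a method, a proxy class is generated.
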
public class TestDubbo {

@Test
public void test_adaptive() throws Exception {
ExtensionLoader<Robot> extensionLoader =
ExtensionLoader.getExtensionLoader(Robot.class);
Robot robot = extensionLoader.getAdaptiveExtension();
robot.sayHello(URL.valueOf("test://1.1.1.1:9999?robot=bumblebee"));
robot.sayHello(URL.valueOf("test://1.1.1.1:9999?robot=optimusPrime"));
}
}

Each call on robot picks the extension named by robot=xxx in the URL.
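
What the adaptive proxy does can be written by hand. The sketch below replaces Dubbo's URL with a plain parameter map and ExtensionLoader with a static map, so every name in it (`AdaptiveDispatchSketch`, `AdaptiveRobot`, `EXTENSIONS`) is hypothetical; only the dispatch-by-parameter idea comes from the text above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical hand-written equivalent of an adaptive proxy; not Dubbo code.
class AdaptiveDispatchSketch {
    interface Robot {
        String sayHello(Map<String, String> urlParams);
    }

    // Stand-in for the extensions that ExtensionLoader would discover.
    static final Map<String, Robot> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("bumblebee", params -> "Bumblebee");
        EXTENSIONS.put("optimusPrime", params -> "OptimusPrime");
    }

    // Picks the real extension per call, based on the "robot" parameter.
    static class AdaptiveRobot implements Robot {
        @Override
        public String sayHello(Map<String, String> urlParams) {
            if (urlParams == null) {
                throw new IllegalArgumentException("url == null");
            }
            String extName = urlParams.get("robot");
            if (extName == null) {
                throw new IllegalStateException("Fail to get extension name, use keys([robot])");
            }
            Robot extension = EXTENSIONS.get(extName);
            if (extension == null) {
                throw new IllegalStateException("No such extension: " + extName);
            }
            return extension.sayHello(urlParams);
        }
    }

    // Dispatches one call for the given robot name.
    static String dispatch(String robotName) {
        return new AdaptiveRobot().sayHello(Collections.singletonMap("robot", robotName));
    }
}
```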

Source Code Analysis

The test class from the previous section first enters getAdaptiveExtension:

@SuppressWarnings("unchecked")
public T getAdaptiveExtension() {
// Read from the holder
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
// Still absent after both checks: create it
try {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}

return (T) instance;
}

The logic is simple: fetch the adaptive extension from the cache, and create it on a miss.

@SuppressWarnings("unchecked")
private T createAdaptiveExtension() {
try {
// Get the adaptive extension class, instantiate it, and inject its dependencies
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}

private Class<?> getAdaptiveExtensionClass() {
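// Load all extension classes
getExtensionClasses();
// If an adaptive class was found, that is, a class annotated with @Adaptive, return it directly
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
// Otherwise no class carries @Adaptive, so a proxy class has to be generated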
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

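This logic is also simple: get the adaptive extension class, create an instance by reflection, then inject the instance's dependencies.

If the annotation is on a class, that class is returned directly; otherwise a proxy class must be generated.
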
private Class<?> createAdaptiveExtensionClass() {
// Build the source code of the proxy class
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
// Compile it into a class
return compiler.compile(code, classLoader);
}

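The logic above builds the proxy class source and compiles it. At this point we have met two adaptive extension classes: AdaptiveCompiler and AdaptiveExtensionFactory. AdaptiveCompiler is the adaptive implementation of Compiler. These are also the only two classes in Dubbo that carry the @Adaptive annotation at class level.

The core of the code above is createAdaptiveExtensionClassCode.

First, back to the earlier example. A call to

robot.sayHello(URL.valueOf("test://1.1.1.1:9999?robot=bumblebee"))

actually invokes the sayHello method of the generated Robot$Adaptive class, as debugging shows:
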
package test.dubbo;
import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Robot$Adaptive implements test.dubbo.Robot {
public void sayHello(com.alibaba.dubbo.common.URL arg0) {
// Throw if the URL is null
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
// Read the URL parameter; for a type named AbcDef the parameter key would be abc.def
String extName = url.getParameter("robot");
if(extName == null) throw new IllegalStateException("Fail to get extension(test.dubbo.Robot) name from url(" + url.toString() + ") use keys([robot])");
// Look up the extension by that name and call its sayHello method
test.dubbo.Robot extension = (test.dubbo.Robot)ExtensionLoader.getExtensionLoader(test.dubbo.Robot.class).getExtension(extName);extension.sayHello(arg0);
}
}

That is the generated proxy class. Now read createAdaptiveExtensionClassCode with it in mind.

StringBuilder codeBuilder = new StringBuilder();
Method[] methods = type.getMethods();
boolean hasAdaptiveAnnotation = false;
for (Method m : methods) {
if (m.isAnnotationPresent(Adaptive.class)) {
hasAdaptiveAnnotation = true;
break;
}
}
// no need to generate adaptive class since there's no adaptive method found.
if (!hasAdaptiveAnnotation)
throw new IllegalStateException("No adaptive method on extension " + type.getName() + ", refuse to create the adaptive class!");

codeBuilder.append("package ").append(type.getPackage().getName()).append(";");
codeBuilder.append("\nimport ").append(ExtensionLoader.class.getName()).append(";");
codeBuilder.append("\npublic class ").append(type.getSimpleName()).append("$Adaptive").append(" implements ").append(type.getCanonicalName()).append(" {");

The code above has two parts. It first checks whether any method carries @Adaptive and fails if none does. It then emits the package statement, the import statement and the class declaration.

for (Method method : methods) {
// Basic information about the method
Class<?> rt = method.getReturnType();
Class<?>[] pts = method.getParameterTypes();
Class<?>[] ets = method.getExceptionTypes();

Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class);
StringBuilder code = new StringBuilder(512);
// A method without @Adaptive gets a generated body that throws UnsupportedOperationException
if (adaptiveAnnotation == null) {
code.append("throw new UnsupportedOperationException(\"method ")
.append(method.toString()).append(" of interface ")
.append(type.getName()).append(" is not adaptive method!\");");
} else {
int urlTypeIndex = -1;
// If one of the parameters is of type URL, record its index
for (int i = 0; i < pts.length; ++i) {
if (pts[i].equals(URL.class)) {
urlTypeIndex = i;
break;
}
}
if (urlTypeIndex != -1) {
// Null check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"url == null\");",
urlTypeIndex);
code.append(s);

s = String.format("\n%s url = arg%d;", URL.class.getName(), urlTypeIndex);
code.append(s);
}
else {
// No URL parameter directly: look for a parameter that can supply a URL
String attribMethod = null;
LBL_PTS:
for (int i = 0; i < pts.length; ++i) {
Method[] ms = pts[i].getMethods();
for (Method m : ms) {
String name = m.getName();
// Look for a getter on the parameter type whose return type is URL
if ((name.startsWith("get") || name.length() > 3)
&& Modifier.isPublic(m.getModifiers())
&& !Modifier.isStatic(m.getModifiers())
&& m.getParameterTypes().length == 0
&& m.getReturnType() == URL.class) {
urlTypeIndex = i;
attribMethod = name;
break LBL_PTS;
}
}
}
if (attribMethod == null) {
throw new IllegalStateException("fail to create adaptive class for interface " + type.getName()
+ ": not found url parameter or url attribute in parameters of method " + method.getName());
}

// Null point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"%s argument == null\");",
urlTypeIndex, pts[urlTypeIndex].getName());
code.append(s);
s = String.format("\nif (arg%d.%s() == null) throw new IllegalArgumentException(\"%s argument %s() == null\");",
urlTypeIndex, attribMethod, pts[urlTypeIndex].getName(), attribMethod);
code.append(s);

s = String.format("%s url = arg%d.%s();", URL.class.getName(), urlTypeIndex, attribMethod);
code.append(s);
}

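The logic above is somewhat involved; its job is to locate the URL. There are two cases: the method takes a URL directly, as in our earlier example, or one of its parameters exposes a URL through a getter.

For example: com.alibaba.dubbo.common.URL url = arg0.getUrl();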
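
The second case, finding a getter that returns a URL, can be sketched with JDK reflection. The sketch uses java.net.URI as a stand-in for Dubbo's URL type, and `UrlGetterSketch`, `FakeInvoker` and `findUrlGetter` are hypothetical names. It also requires both the "get" prefix and a name longer than three characters, which is stricter than the `||` in the excerpt above.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URI;

// Hypothetical sketch of locating a URL-returning getter; java.net.URI stands in for Dubbo's URL.
class UrlGetterSketch {
    // Example parameter type that exposes a URL-like value through a getter.
    public static class FakeInvoker {
        public URI getUrl() {
            return URI.create("dubbo://127.0.0.1:20880");
        }

        public String getName() {
            return "demo";
        }
    }

    // Returns the name of a public, non-static, no-arg getter that returns URI, or null if none exists.
    static String findUrlGetter(Class<?> parameterType) {
        for (Method m : parameterType.getMethods()) {
            String name = m.getName();
            if (name.startsWith("get") && name.length() > 3
                    && Modifier.isPublic(m.getModifiers())
                    && !Modifier.isStatic(m.getModifiers())
                    && m.getParameterTypes().length == 0
                    && m.getReturnType() == URI.class) {
                return name;
            }
        }
        return null;
    }
}
```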

// The value array of the @Adaptive annotation
String[] value = adaptiveAnnotation.value();
// If it is empty, derive a default key from the type's simple name by splitting camel case, e.g. AaBbCc becomes aa.bb.cc
if (value.length == 0) {
char[] charArray = type.getSimpleName().toCharArray();
StringBuilder sb = new StringBuilder(128);
for (int i = 0; i < charArray.length; i++) {
if (Character.isUpperCase(charArray[i])) {
if (i != 0) {
sb.append(".");
}
sb.append(Character.toLowerCase(charArray[i]));
} else {
sb.append(charArray[i]);
}
}
value = new String[]{sb.toString()};
}

// Check whether a parameter of type Invocation exists
boolean hasInvocation = false;
for (int i = 0; i < pts.length; ++i) {
if (pts[i].getName().equals("com.alibaba.dubbo.rpc.Invocation")) {
// Null Point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"invocation == null\");", i);
code.append(s);
s = String.format("\nString methodName = arg%d.getMethodName();", i);
code.append(s);
hasInvocation = true;
break;
}
}

// cachedDefaultName is the value of the @SPI annotation
String defaultExtName = cachedDefaultName;
String getNameCode = null;
for (int i = value.length - 1; i >= 0; --i) {
if (i == value.length - 1) {
if (null != defaultExtName) {
if (!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\", \"%s\")", value[i], defaultExtName);
else
// protocol is a dedicated part of the URL, so it needs special handling
getNameCode = String.format("( url.getProtocol() == null ? \"%s\" : url.getProtocol() )", defaultExtName);
} else {
if (!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\")", value[i]);
else
// Special handling for protocol
getNameCode = "url.getProtocol()";
}
} else {
if (!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\", %s)", value[i], getNameCode);
else
getNameCode = String.format("url.getProtocol() == null ? (%s) : url.getProtocol()", getNameCode);
}
}
code.append("\nString extName = ").append(getNameCode).append(";");

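The logic above is fairly involved; it is mostly about reading the extension name from the URL. Three cases matter:

  • Whether there is a default value: if so, it is used as the fallback for the last key

  • protocol: its value has to be read through getProtocol rather than getParameter

  • Whether a parameter of type Invocation is present: if so, the value is read through getMethodParameter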
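
The two string-building steps described above can be tried in isolation. The following is a simplified sketch, assuming no Invocation parameter is present; `AdaptiveKeySketch`, `defaultKey` and `getNameCode` are hypothetical names that mirror the loops in the excerpt rather than Dubbo's actual method layout.

```java
// Hypothetical, simplified re-implementation of the key derivation and name-lookup expression.
class AdaptiveKeySketch {
    // Splits a camel-case simple name into a dotted lower-case key, e.g. AaBbCc -> aa.bb.cc.
    static String defaultKey(String simpleName) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < simpleName.length(); i++) {
            char c = simpleName.charAt(i);
            if (Character.isUpperCase(c)) {
                if (i != 0) {
                    sb.append('.');
                }
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Builds the expression that reads the extension name, walking the keys from last to first.
    static String getNameCode(String[] keys, String defaultExtName) {
        String code = null;
        for (int i = keys.length - 1; i >= 0; --i) {
            boolean last = (i == keys.length - 1);
            if ("protocol".equals(keys[i])) {
                if (last) {
                    code = defaultExtName != null
                            ? String.format("( url.getProtocol() == null ? \"%s\" : url.getProtocol() )", defaultExtName)
                            : "url.getProtocol()";
                } else {
                    code = String.format("url.getProtocol() == null ? (%s) : url.getProtocol()", code);
                }
            } else if (last) {
                code = defaultExtName != null
                        ? String.format("url.getParameter(\"%s\", \"%s\")", keys[i], defaultExtName)
                        : String.format("url.getParameter(\"%s\")", keys[i]);
            } else {
                // Earlier keys fall back to the expression built for the later keys.
                code = String.format("url.getParameter(\"%s\", %s)", keys[i], code);
            }
        }
        return code;
    }

    public static void main(String[] args) {
        System.out.println(defaultKey("AaBbCc"));
        System.out.println(getNameCode(new String[] {"server", "transporter"}, "netty"));
    }
}
```

With two keys the lookups nest, so the first key wins when present and the @SPI default applies only when every key is missing.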

For the cases above:

  • Taking the Protocol interface as an example, the generated code is as follows:

@SPI("dubbo")
public interface Protocol {
...
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
...
}

String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );

  • Taking Exchanger as an example:

@SPI(HeaderExchanger.NAME) // header
public interface Exchanger {

@Adaptive({Constants.EXCHANGER_KEY}) // exchanger
ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

@Adaptive({Constants.EXCHANGER_KEY})
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;

}

String extName = url.getParameter("exchanger", "header");

The generation code then continues:

 code.append("\nString extName = ").append(getNameCode).append(";");
// check extName == null?
String s = String.format("\nif(extName == null) " +
"throw new IllegalStateException(\"Fail to get extension(%s) name from url(\" + url.toString() + \") use keys(%s)\");",
type.getName(), Arrays.toString(value));
code.append(s);

// Get the extension by name
s = String.format("\n%s extension = (%<s)%s.getExtensionLoader(%s.class).getExtension(extName);",
type.getName(), ExtensionLoader.class.getSimpleName(), type.getName());
code.append(s);

// return statement
if (!rt.equals(void.class)) {
code.append("\nreturn ");
}

// Emit the call to the extension's method
s = String.format("extension.%s(", method.getName());
code.append(s);
for (int i = 0; i < pts.length; i++) {
if (i != 0)
code.append(", ");
code.append("arg").append(i);
}
code.append(");");

With the extension name read from the URL, the generated code obtains that extension and invokes the method on it.

What remains is the wrap-up: assembling the method source.

codeBuilder.append("\npublic ").append(rt.getCanonicalName()).append(" ").append(method.getName()).append("(");
for (int i = 0; i < pts.length; i++) {
if (i > 0) {
codeBuilder.append(", ");
}
codeBuilder.append(pts[i].getCanonicalName());
codeBuilder.append(" ");
codeBuilder.append("arg").append(i);
}
codeBuilder.append(")");
if (ets.length > 0) {
codeBuilder.append(" throws ");
for (int i = 0; i < ets.length; i++) {
if (i > 0) {
codeBuilder.append(", ");
}
codeBuilder.append(ets[i].getCanonicalName());
}
}
codeBuilder.append(" {");
codeBuilder.append(code.toString());
codeBuilder.append("\n}");

That concludes the Dubbo SPI part. Many details only become clear with more debugging.

© 2017 — 2020 shiyan