Focus on Java


  • 首页

  • 标签

  • 分类

  • 归档

Dubbo源码分析11 -Dubbo 服务引入

发表于 2019-08-31 | 分类于 源码分析 |

计划阅读调试下Dubbo的源码,结合官方源码分析Dubbo,自身再分析总结

本文对应的Dubbo 服务引入

源码分析

invoker

接着上一篇,进入refreshInvokers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
private void refreshInvoker(List<URL> invokerUrls) {
// 如果invokerUrl中仅有empty协议的url
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
// 转化为invokers对象列表
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
// 转化为methd对应的invoker
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
// 关闭oldUrlInvokerMap中不在newUrlInvokerMap存在的
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
// 从url中获取protocol参数
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
// 如注释所说,如果配置了protocol,只接受配置允许的
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
// 跳过空协议
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
// 协议检查
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
+ ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
URL url = mergeUrl(providerUrl);

String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
// 默认enable true
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
// protocol 在这里对应DubboProtocol,生成invoker代理
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}

}

refreshInvoke方法的功能对当前的invoker更新,新建新的provider对应的invoker,并移除不可用的invoker

在toInvokers方法中,protocol#ref 方法中调用了各自protocol方法的逻辑。现在进入到默认的协议,DubboProtocol中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
public class DubboProtocol extends AbstractProtocol {

@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {

optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}

private ExchangeClient[] getClients(URL url) {
// 获取客户端
// whether to share connection
boolean service_share_connect = false;
// 共享客户端
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
service_share_connect = true;
connections = 1;
}

ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
// 获取共享客户端
clients[i] = getSharedClient(url);
} else {
// 初始化客户端
clients[i] = initClient(url);
}
}
return clients;
}

/**
* Get shared connection
*/
private ExchangeClient getSharedClient(URL url) {
String key = url.getAddress();
// 从缓存中取
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
if (!client.isClosed()) {
client.incrementAndGetCount();
return client;
} else {
referenceClientMap.remove(key);
}
}

// 加锁处理,单个线程一个一个处理
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
// 从缓存中获取
if (referenceClientMap.containsKey(key)) {
return referenceClientMap.get(key);
}
// 缓存拿不到,初始化客户端
ExchangeClient exchangeClient = initClient(url);
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
locks.remove(key);
return client;
}
}

/**
* Create new connection
*/
private ExchangeClient initClient(URL url) {

// client type setting.
// 默认客户端类型为netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

// 编码
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
// 心跳
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}

ExchangeClient client;
try {
// connection should be lazy
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 客户端初始化
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
}

public class Exchangers {

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}

public static Exchanger getExchanger(URL url) {
// 默认 HeaderExchanger
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
}

public class HeaderExchanger implements Exchanger {

public static final String NAME = "header";

@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 从数据交换层到传输曾
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
}
public class Transporters {

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
// 没有handler则新建适配器
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
// 多个则创建分发器
handler = new ChannelHandlerDispatcher(handlers);
}
// 默认nettyTransporter
return getTransporter().connect(url, handler);
}

public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
}

// 进入到传输层

public class NettyTransporter implements Transporter {

@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
// 创建netty类型客户端 , 后续涉及handler包装,消息分发等,后面再分析
return new NettyClient(url, listener);
}

}

后续将继续RegistryDirectory余下代码

Dubbo源码分析10 -Dubbo 服务引入

发表于 2019-08-29 | 分类于 源码分析 |

计划阅读调试下Dubbo的源码,结合官方源码分析Dubbo,自身再分析总结

本文对应的Dubbo 服务引入

源码分析

订阅

上一篇分析了Dubbo服务引入的整体流程。我发现一直没有看到DubboProtocol的部分。重新debug之后,发现漏了一些流程

首先先找到DubboProtocol#ref方法的堆栈

调用堆栈

梳理下它的调用链

RegistryProtocol#doRefer -> RegistryDirectory#subscribe -> FailbackRegistry#subscribe -> ZookeeperRegistry#doSubscribe ->

FailbackRegistry#notify -> FailbackRegistry#doNotify -> AbstractRegistry#notify -> RegistryDirectory#notify ->

RegistryDirectory#refreshInvoker -> RegistryDirectory#toInvokers -> Protocol的动态类以及一些Wrapper -> DubboProtocol#refer

调用链是比较复杂的,从registryProtocol的refer ,在经过RegistryDirectory以及其基类之间的调用,最终将其转化为invoker,再最后调用到DubboProtocol#refer做最后的处理。从调用链可以看出RegistryDirectory是作为字典的存在,用来保证注册中心的节点变动实时体现在Invoker上

回到RegistryProtocol#doRefer中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry); // 此处registry对应的是ZookeeperReigistry
directory.setProtocol(protocol);
// 设置directory

// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
//
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
// 注册到注册中心
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// 订阅privider, configurators, routers
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));

Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}

进入directory#subscribe方法内

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
public void subscribe(URL url) {
setConsumerUrl(url);
// 对应的zookeeperRegistry
registry.subscribe(url, this);
}
}

// ZookeeperRegistry对应的基类
public abstract class FailbackRegistry extends AbstractRegistry {
@Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;

List<URL> urls = getCacheUrls(url);
if (urls != null && !urls.isEmpty()) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}

// Record a failed registration request to a failed list, retry regularly
addFailedSubscribed(url, listener);
}
}

// 省去无关代码
}

public class ZookeeperRegistry extends FailbackRegistry {
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 当interface 参数为 * 时
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
// interface 参数有值时
List<URL> urls = new ArrayList<URL>();
// 按照分类 转换路径
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
// 此处zkClient为CuratorZookeeperClient
// 创建非临时节点
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 通知 此处调用父类方法
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}



}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
public abstract class FailbackRegistry extends AbstractRegistry {
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
// 调用即类方法
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list, retry regularly
Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
if (listeners == null) {
failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
listeners = failedNotified.get(url);
}
listeners.put(listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
}

public abstract class AbstractRegistry implements Registry {
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
// null 判断
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty())
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}

// 按照 category
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
for (URL u : urls) {
// 此处判断逻辑为:
// 1、 对比interface:consumerInterface 不为 * 并且 consumeInterface和providerInterface不一致
// 2、 对比category: consumerCategory是否包含*; consumerCategory是否剔除(-)provider的category; consumerCategory是否包含providerCategory
// 3、 providerUrl, consumerUrl中 enable字段为false
// 下面是分别对group, version, classifier的判断。
// 4、 group 对应为 consumerGroup为* 或者 consumer,provider一致或者 consumerGroup包含 providerGroup
// 5、 version 对应为 consumerVersion 为* 或者 两者相等
// 6、 classfier对应为 consumerClassifier 为null 或者 consumerClassifier 为* 或者 两者相等
// 此处url为consumer协议的url
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}

// result中是按照category来分类的

if (result.size() == 0) {
return;
}


Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
// 保存配置
saveProperties(url);
// 进行通知,此处调用回到了RegistryDirectory中
listener.notify(categoryList);
}
}
}


public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
@Override
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
// 对url进行分类
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
// 转化为configurators对象列表
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
// 转化为router列表
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers
// 针对provider刷新invokers,保证注册中心和字典内一致
refreshInvoker(invokerUrls);
}
}

TODO refresh invokers 等后续流程

Dubbo源码分析9 -Dubbo 服务引入

发表于 2019-08-28 | 分类于 源码分析 |

计划阅读调试下Dubbo的源码,结合官方源码分析Dubbo,自身再分析总结

本文对应的Dubbo 服务引入

源码分析

引入服务

接着上一篇,进入createProxy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
// 协议为temp的URL
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
if (url != null && url.length() > 0) {
// 如果url不为空则可能为直连模式
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {

isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
isJvmRefer = isInjvm().booleanValue();
}

if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else {
// 加载注册地址
List<URL> us = loadRegistries(false);
// 加载注册地址url
if (us != null && !us.isEmpty()) {
for (URL u : us) {
// 加载监控
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
// 单个注册地址
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
// 多个注册地址
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) {
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}

Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// 创建代理
return (T) proxyFactory.getProxy(invoker);
}

上面逻辑比较简单,主要是加载注册地址以及对多个注册地址或者直连地址的合并处理。核心部分在refprotocol.refer

invoker

refprotocol在当前环境下对应的是RegistryProtocol。

这段代码分析的时候涉及到多个动态加载的类。在进入分析前先罗列下,后续我再把堆栈打印出来直观展示

RegistryProtocol#refer -> ZookeeperRegistryFactory#getRegistry
-> ZookeeperRegistry -> CuratorZookeeperTransporter#connnect -> CuratorZookeeperClient

RegistryProtocol#refer -> MockClusterWrapper#join -> MockClusterInvoker#join -> FailoverCluster#join

下面继续调试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class RegistryProtocol implements Protocol {
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 获取注册中心协议 此例子用的zookeeper
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 根据url中的protocol获取到registry,对应的是zookeeperRegistryFactory
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}

// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
// group的处理
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 导出服务
return doRefer(cluster, registry, type, url);
}

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// 设置directory

// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
//
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
// 根据最开始分析的,这里的registry会调到CuratorZookeeperClient,之后到注册中心注册
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// 注册privider, configurators, routers目录
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
// 将信息存入配置缓存中
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
}

doRefer方法中的 Invoker invoker = cluster.join(directory); 这里需要特殊说明下,这里是通过Wrapper类进到核心的逻辑类FailoverCluster,至于SPI如何调用到Wrapper的类,可以看下 Dubbo源码分析5-Dubbo 服务导出4

对应的堆栈如下图所示

1

2

进入cluster#join对应的failovercluster中

1
2
3
4
5
6
7
8
9
10
11
public class FailoverCluster implements Cluster {

public final static String NAME = "failover";

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
// 返回failoverclusterinvoke
return new FailoverClusterInvoker<T>(directory);
}

}
proxy

上一节分析了invoker的来源,得到invoker其实是MockClusterInvoker,下面接着分析另外一个核心点,proxyFactory.getProxy(invoker) 是如何创建proxy

proxyFactory是通过自适应SPI来实例的。在服务导出的时候,分析过他getInvoker实现

现在我们使用Arthas来看看,这个动态生成的类是怎么样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.ProxyFactory;
import com.alibaba.dubbo.rpc.RpcException;

public class ProxyFactory$Adpative
implements ProxyFactory {
public Invoker getInvoker(Object object, Class class_, URL uRL) throws RpcException {
if (uRL == null) {
throw new IllegalArgumentException("url == null");
}
URL uRL2 = uRL;
String string = uRL2.getParameter("proxy", "javassist");
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(").append(uRL2.toString()).append(") use keys([proxy])").toString());
}
ProxyFactory proxyFactory = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(string);
return proxyFactory.getInvoker(object, class_, uRL);
}

public Object getProxy(Invoker invoker) throws RpcException {
if (invoker == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
}
if (invoker.getUrl() == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
}
URL uRL = invoker.getUrl();
String string = uRL.getParameter("proxy", "javassist");
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(").append(uRL.toString()).append(") use keys([proxy])").toString());
}
ProxyFactory proxyFactory = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(string);
return proxyFactory.getProxy(invoker);
}
}

可以看出来还是比较简单的,getProxy从invoker中拿到url,然后根据url中的proxy参数来获取到ProxyFactory。这里用的是默认的javassitProxyFactory

我们继续往下看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
// 首先还是从基类分析
public abstract class AbstractProxyFactory implements ProxyFactory {

@Override
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
// 入口
return getProxy(invoker, false);
}

@Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Class<?>[] interfaces = null;
// 这里从url中获取interfaces的参数,但这里没发现哪里存的这个参数,或许是为了扩展
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
// 分割字符
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
// 默认加上invoker的interface,以及echo
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i++) {
// 反射获取
interfaces[i + 1] = ReflectUtils.forName(types[i]);
}
}
}
if (interfaces == null) {
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
}

// 泛化接口的处理
if (!invoker.getInterface().equals(GenericService.class) && generic) {
int len = interfaces.length;
Class<?>[] temp = interfaces;
interfaces = new Class<?>[len + 1];
System.arraycopy(temp, 0, interfaces, 0, len);
interfaces[len] = GenericService.class;
}
// 调用子类方法
return getProxy(invoker, interfaces);
}
}

public class JavassistProxyFactory extends AbstractProxyFactory {

@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
// 动态创建proxy
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

// 省去无关代码
}
public abstract class Proxy {
/**
* Get proxy.
*
* @param ics interface class array.
* @return Proxy instance.
*/
public static Proxy getProxy(Class<?>... ics) {

return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
}

/**
* Get proxy.
*
* @param cl class loader.
* @param ics interface class array.
* @return Proxy instance.
*/
// 动态生成proxy
// 这个方法官网已经讲得非常详细了,照着官方来看
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
// 接口数量校验
if (ics.length > 65535)
throw new IllegalArgumentException("interface limit exceeded");

StringBuilder sb = new StringBuilder();
for (int i = 0; i < ics.length; i++) {
String itf = ics[i].getName();
if (!ics[i].isInterface())
throw new RuntimeException(itf + " is not a interface.");

Class<?> tmp = null;
try {
tmp = Class.forName(itf, false, cl);
} catch (ClassNotFoundException e) {
}

if (tmp != ics[i])
throw new IllegalArgumentException(ics[i] + " is not visible from class loader");

sb.append(itf).append(';');
}
// stringbuilder中存了所有的interface,并用分号分割
// use interface class name list as key.
String key = sb.toString();

// get cache by class loader.
// 根据classloader缓存 map
Map<String, Object> cache;
synchronized (ProxyCacheMap) {
cache = ProxyCacheMap.get(cl);
if (cache == null) {
cache = new HashMap<String, Object>();
ProxyCacheMap.put(cl, cache);
}
}

Proxy proxy = null;
// 加锁
synchronized (cache) {
do {
Object value = cache.get(key);
if (value instanceof Reference<?>) {
proxy = (Proxy) ((Reference<?>) value).get();
if (proxy != null)
return proxy;
}
// 仅允许单线程操作
if (value == PendingGenerationMarker) {
try {
cache.wait();
} catch (InterruptedException e) {
}
} else {
cache.put(key, PendingGenerationMarker);
break;
}
}
while (true);
}
long id = PROXY_CLASS_COUNTER.getAndIncrement();
String pkg = null;
ClassGenerator ccp = null, ccm = null;
try {
ccp = ClassGenerator.newInstance(cl);

Set<String> worked = new HashSet<String>();
List<Method> methods = new ArrayList<Method>();

for (int i = 0; i < ics.length; i++) {
// 如果接口不为空则需要在同一个包下
if (!Modifier.isPublic(ics[i].getModifiers())) {
String npkg = ics[i].getPackage().getName();
if (pkg == null) {
pkg = npkg;
} else {
if (!pkg.equals(npkg))
throw new IllegalArgumentException("non-public interfaces from different packages");
}
}

// ccp
ccp.addInterface(ics[i]);

for (Method method : ics[i].getMethods()) {
String desc = ReflectUtils.getDesc(method);
if (worked.contains(desc))
continue;
worked.add(desc);

int ix = methods.size();
Class<?> rt = method.getReturnType();
Class<?>[] pts = method.getParameterTypes();

StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
for (int j = 0; j < pts.length; j++)
code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
if (!Void.TYPE.equals(rt))
code.append(" return ").append(asArgument(rt, "ret")).append(";");

methods.add(method);
ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
}
}

if (pkg == null)
pkg = PACKAGE_NAME;

// create ProxyInstance class.
String pcn = pkg + ".proxy" + id;
ccp.setClassName(pcn);
ccp.addField("public static java.lang.reflect.Method[] methods;");
ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
ccp.addDefaultConstructor();
Class<?> clazz = ccp.toClass();
clazz.getField("methods").set(null, methods.toArray(new Method[0]));

// create Proxy class.
String fcn = Proxy.class.getName() + id;
ccm = ClassGenerator.newInstance(cl);
ccm.setClassName(fcn);
ccm.addDefaultConstructor();
ccm.setSuperClass(Proxy.class);
ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
Class<?> pc = ccm.toClass();
proxy = (Proxy) pc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
// release ClassGenerator
if (ccp != null)
ccp.release();
if (ccm != null)
ccm.release();
synchronized (cache) {
if (proxy == null)
cache.remove(key);
else
cache.put(key, new WeakReference<Proxy>(proxy));
cache.notifyAll();
}
}
return proxy;
}
}

上面的代码主要涉及ccp、ccm两部分,其中ccp用于生成interface的实现类,ccm用于生成proxy的基类

ccp生成的是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.demo.DemoService;
import com.alibaba.dubbo.rpc.service.EchoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class proxy0
implements ClassGenerator.DC,
EchoService,
DemoService {
public static Method[] methods;
private InvocationHandler handler;

public proxy0(InvocationHandler invocationHandler) {
this.handler = invocationHandler;
}

public proxy0() {
}

public String sayHello(String string) {
Object[] arrobject = new Object[]{string};
Object object = this.handler.invoke(this, methods[0], arrobject);
return (String)object;
}

public Object $echo(Object object) {
Object[] arrobject = new Object[]{object};
Object object2 = this.handler.invoke(this, methods[1], arrobject);
return object2;
}
}

ccm生成的是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.common.bytecode.Proxy;
import com.alibaba.dubbo.common.bytecode.proxy0;
import java.lang.reflect.InvocationHandler;

public class Proxy0
extends Proxy
implements ClassGenerator.DC {
public Object newInstance(InvocationHandler invocationHandler) {
return new proxy0(invocationHandler);
}
}

最终我们获取到了生成的Proxy0,大功告成

Dubbo源码分析8 -Dubbo 引入

发表于 2019-08-27 | 分类于 源码分析 |

计划阅读调试下Dubbo的源码,结合官方源码分析Dubbo,自身再分析总结

本文对应的Dubbo 服务引入

开始

上一节说了Dubbo服务导出。这一节学习和服务导出类似的Dubbo服务引入

Dubbo服务引入是客户端-服务端模型中的客户端。Dubbo有两种方式:1、直连模式,2、注册中心

直连是通过指定ip以及端口来连接,注册中心,见名知意,是通过注册中心连接,线上环境应该都是注册中心连接,利于服务治理

源码分析

入口

代码的入口在ReferenceBean的getObject方法中

getObject方法是Spring的FactoryBean接口的实现,getObject是在获取reference时候被调用。可见,这是一种懒汉式的实现方式

此外,getObject还被afterPropertiesSet方法调用。afterPropertiesSet是InitializingBean接口的方法,将在初始化Bean时候被调用,这是饿汉式实现方式。

1
2
3
4
5
6
7
8
9
10
11
12
@Override
@SuppressWarnings({"unchecked"})
public void afterPropertiesSet() throws Exception {
//....省略无关代码
Boolean b = isInit();
if (b == null && getConsumer() != null) {
b = getConsumer().isInit();
}
if (b != null && b.booleanValue()) {
getObject();
}
}

reference的init参数被设置的时候会在Bean初始化的时候才会加载Bean

加载

现在回到getObject方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public Object getObject() throws Exception {
return get();
}

public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}

这里用IDEA调试会有点问题 具体见 #2754

继续往下看

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
private void init() {
// 防止重复校验
if (initialized) {
return;
}
initialized = true;
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
}
// get consumer's global configuration
checkDefault();
//
appendProperties(this);
if (getGeneric() == null && getConsumer() != null) {
// 校验是否是泛化接口
setGeneric(getConsumer().getGeneric());
}

// 泛化接口处理逻辑
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
}

// 映射文件处理
String resolve = System.getProperty(interfaceName);
String resolveFile = null;
if (resolve == null || resolve.length() == 0) {
// 从系统变量中获取
resolveFile = System.getProperty("dubbo.resolve.file");
if (resolveFile == null || resolveFile.length() == 0) {
File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
if (userResolveFile.exists()) {
resolveFile = userResolveFile.getAbsolutePath();
}
}
if (resolveFile != null && resolveFile.length() > 0) {
Properties properties = new Properties();
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(resolveFile));
properties.load(fis);
} catch (IOException e) {
throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
} finally {
try {
if (null != fis) fis.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
resolve = properties.getProperty(interfaceName);
}
}
if (resolve != null && resolve.length() > 0) {
url = resolve;
if (logger.isWarnEnabled()) {
if (resolveFile != null) {
logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
} else {
logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
}
}
}
// 核心配置设置
if (consumer != null) {
if (application == null) {
application = consumer.getApplication();
}
if (module == null) {
module = consumer.getModule();
}
if (registries == null) {
registries = consumer.getRegistries();
}
if (monitor == null) {
monitor = consumer.getMonitor();
}
}
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {
monitor = module.getMonitor();
}
}
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {
monitor = application.getMonitor();
}
}
// 校验核心参数
checkApplication();
// 校验本地存根
checkStubAndMock(interfaceClass);
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (!isGeneric()) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
// 很熟悉的代码,对此interface动态生成wrapper类
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
} else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// 核心配置类的操作
map.put(Constants.INTERFACE_KEY, interfaceName);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
String prefix = StringUtils.getServiceKey(map);
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
// 重试次数操作
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
appendAttributes(attributes, method, prefix + "." + method.getName());
checkAndConvertImplicitConfig(method, map, attributes);
}
}

// host配置
String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry == null || hostToRegistry.length() == 0) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}

// 注册中心配置
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

//attributes are stored by system context.
StaticContext.getSystemContext().putAll(attributes);

// 根据map内容生成proxy
ref = createProxy(map);
// 将ref interface 关联
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
// 存入ApplicationModel
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}

这里的代码和服务导出的代码太像了。核心流程就是对map填充以及核心配置填充以及校验,这里大胆猜测,这个map同样是为了组装url。

后续的核心逻辑集中在createProxy

Dubbo源码分析7 Dubbo架构

发表于 2019-08-05 | 分类于 源码分析 |

计划阅读调试下Dubbo的源码,结合官方源码分析Dubbo,自身再分析总结

本文对应的Dubbo框架设计

官方图例

框架

分析了这么久,一直没有从整体框架看下Dubbo。没有阅读源码前,对于一个我这样一个Dubbo使用者,是迫不及待去阅读支撑它的基层实现,对于宏观的架构还没有太大的兴趣。但是在阅读了Dubbo SPI,服务导出之后,第一感觉是整个代码是有序的,但是对于阅读者涉及类太多,分支也比较多,我觉得再继续这样阅读下去会没有一个整体的概念,导致各模块不能在脑中好好组装起来,所以必须要停下来,看下我已经阅读的部分在整个框架中的位置,以及与上下层的衔接。

分层

  • config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类

  • proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory

  • registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService

  • cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance

  • monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService

  • protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter

  • exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer

  • transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec

  • serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool

从官网摘抄下来的部分,回顾之前的阅读的部分,集中在provider端的transport、exchange、protocol、registry、proxy部分。

123…7

shiyan

31 日志
4 分类
14 标签
GitHub E-Mail
© 2017 — 2020 shiyan
由 Hexo 强力驱动
|
主题 — NexT.Mist v5.1.4