Focus on Java



LeetCode Wildcard Matching: A Dynamic Programming Solution

Posted on 2020-03-24 | Category: leetcode

Original Problem

Given an input string (s) and a pattern (p), implement wildcard pattern matching with support for '?' and '*'.

'?' Matches any single character.
'*' Matches any sequence of characters (including the empty sequence).
The matching should cover the entire input string (not partial).

Note:

s could be empty and contains only lowercase letters a-z.
p could be empty and contains only lowercase letters a-z, and characters like ? or *.

The problem is straightforward: ? matches any single character, and * matches zero or more characters, similar to a regular expression.

From these relations we can see:

  • If s[i] == p[j], then the answer depends on whether s[i+1…n] and p[j+1…m] match.

  • If s[i] != p[j] && p[j] == '?', same as above.

  • If s[i] != p[j] && p[j] == '*', things get more involved:

    • If * matches the empty string, the answer depends on whether s[i…n] and p[j+1…m] match.
    • If * matches a single character equal to s[i], the answer depends on whether s[i+1…n] and p[j+1…m] match.
    • If * keeps matching further characters of s, the answer depends on whether s[i+1…n] and p[j…m] match.
  • If s[i] != p[j] && p[j] != '?' && p[j] != '*', there is no match.

With these relations it is easy to write a recursive, backtracking solution, but it will not pass: when p contains many * characters it times out. The efficiency is very low.

The inefficiency comes from the backtracking approach not remembering intermediate states, so a large number of comparisons are repeated during backtracking, wasting time.

If we record the intermediate states in a 2D array, we can trade space for time and save a great deal of it.

The four cases above are equivalent to the following equations:

  • if s[i] == p[j] then dp[i][j] = dp[i+1][j+1]

  • if s[i] != p[j] && p[j] == '?' then same as above

  • if s[i] != p[j] && p[j] == '*' then dp[i][j] = dp[i+1][j+1] || dp[i+1][j] || dp[i][j+1]

  • if s[i] != p[j] && p[j] != '?' && p[j] != '*' then dp[i][j] = false

These equations make the dependency between one cell's result and the next explicit.

Let's draw a diagram using the example from the problem.

Initialization

Initialization is straightforward; the one cell to watch is 3G: since it matches a *, that cell depends on the cell 4G.

From the initial values and the dependency equations above, we can then derive the values of the remaining cells.

First round

The red arrows mark the dependencies, which are fairly clear; we keep propagating the same way, round after round.

Result

The final diagram shows dp[0][0] = false, so the strings do not match. The dynamic programming approach records every intermediate state, avoiding a potentially large amount of repeated computation, and is therefore much faster.

Code


class Solution {

    public boolean isMatch(String s, String p) {
        if (p.equals("")) {
            return s.equals("");
        }

        return match(s.toCharArray(), s.length(), p.toCharArray(), p.length());
    }

    public boolean match(char[] s, int i, char[] p, int j) {
        boolean[][] dp = new boolean[i + 1][j + 1];
        dp[i][j] = true;
        // a trailing run of '*' in p can still match the empty suffix of s
        for (int jj = j - 1; jj >= 0; --jj) {
            if (p[jj] == '*') {
                dp[i][jj] = dp[i][jj + 1];
            }
        }

        for (int k = i - 1; k >= 0; --k) {        // index into s
            for (int t = j - 1; t >= 0; --t) {    // index into p
                if (s[k] == p[t] || p[t] == '?') {
                    dp[k][t] = dp[k + 1][t + 1];
                } else {
                    if (p[t] == '*') {
                        dp[k][t] = dp[k + 1][t + 1] || dp[k][t + 1] || dp[k + 1][t];
                    } else {
                        dp[k][t] = false;
                    }
                }
            }
        }

        return dp[0][0];
    }
}
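As noted above, plain backtracking times out, but the very same recursion plus a memo table also passes. A minimal top-down sketch (the class name WildcardMemo and the 0/1/2 encoding are mine, not from the post):

```java
public class WildcardMemo {
    // 0 = unknown, 1 = match, 2 = no match
    static int[][] memo;

    public static boolean isMatch(String s, String p) {
        memo = new int[s.length() + 1][p.length() + 1];
        return match(s.toCharArray(), 0, p.toCharArray(), 0);
    }

    static boolean match(char[] s, int i, char[] p, int j) {
        if (memo[i][j] != 0) return memo[i][j] == 1; // already computed
        boolean ans;
        if (j == p.length) {
            ans = (i == s.length);
        } else if (i == s.length) {
            // only '*' can match the empty remainder of s
            ans = (p[j] == '*') && match(s, i, p, j + 1);
        } else if (s[i] == p[j] || p[j] == '?') {
            ans = match(s, i + 1, p, j + 1);
        } else if (p[j] == '*') {
            // '*' matches empty, or consumes one char of s and stays
            ans = match(s, i, p, j + 1) || match(s, i + 1, p, j);
        } else {
            ans = false;
        }
        memo[i][j] = ans ? 1 : 2;
        return ans;
    }

    public static void main(String[] args) {
        System.out.println(isMatch("aa", "*"));       // true
        System.out.println(isMatch("cb", "?a"));      // false
        System.out.println(isMatch("adceb", "*a*b")); // true
    }
}
```

Each (i, j) pair is solved at most once, which is exactly what the bottom-up table achieves.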

Some Thoughts on Value Investing

Posted on 2019-12-27 | Category: Random Notes

I have been trading stocks for several years, and one question always nagged me: how does stock trading generate returns, and whose money are those returns?

At first I bought blindly and, surprisingly, made money. Then I started studying all kinds of technical indicators: MACD, golden and death crosses, KDJ, volume-price signals, and so on. Back then I believed that if I just traded by these indicators, I would make money. Reality turned out rather different: the indicators felt random and lagging.

Later I learned about backtesting. I wrote my own backtesting code and ran it over the last few years of data for every stock, and discovered a fact: whether trading on these indicators makes money depends on the market mood at the time. In a bull market anything works; in a bear market nothing does. That result genuinely surprised me.

I then backtested chasing limit-up boards, using volume-price signals, for example buying the second limit-up after a high-volume first one. The model was simple enough that I ran it directly in the Tonghuashun backtester, and the win rate was only a little over 40%. That confused me even more: is stock trading really a purely random game, like flipping a coin?

Then I came across value investing and defensive investing, and a different idea: "a good stock at a good price." I thought carefully about how stocks and bonds are alike and how they differ. A stock can pay you through dividends; a bond pays you through its coupon, but whether a stock pays dividends depends on the company. A bond repays principal and interest at maturity, and its face value and coupon are fixed, whereas a stock's price can swing widely.

So compared with a bond, a stock has two sources of return: price appreciation and dividends. At bottom they are the same thing: whether a stock makes money is necessarily tied to the company behind it. A good company grows its profit steadily year after year, its intrinsic value keeps rising, it rewards shareholders with dividends, and its continued growth drives the share price up.

Therefore, to earn a high return you must buy the shares of a good company at a fair or low price and hold them for the long term. The core factors: a low price and a good company.

Price is an important factor. Buying a company when it is overvalued is dangerous. A company is usually overvalued because of rapid growth; if that growth continues, the price stays high and all is well, but if the company stumbles, your return takes a large hit. Even if your principal survives, the opportunity cost has to be counted.

Where there is overvaluation there is also undervaluation. Sometimes a short-term event, market sentiment, or market prejudice drives a price sharply down or leaves a company undervalued while its earning power and core value are unchanged. Those cases deserve a closer look.

The company behind the stock is the other core factor. If we hold for the long term, we must be confident the company will thrive; it must be a good company, an industry leader. Leaders have a better chance of sustaining stable, fast growth, handle adversity far better than average companies, and can still do reasonably well when the whole industry is in a winter.

A good stock at a good price is the key to high returns, but good companies and industry leaders rarely come cheap. As an ordinary investor without much professional skill, I don't expect to buy these companies at a low price; a reasonable price is enough, but never an inflated one. That raises the next question: how do you tell a company is overvalued? It is complicated, and I am still learning. An ordinary investor must stay humble and avoid overconfidence. Personally I avoid companies with a P/E above 30; the premium feels too high, and my ability is not yet enough to "handle" such companies. Some industries, such as high tech, simply trade at high multiples across the board, which is why my tech position is tiny and concentrated in relatively low-P/E names. I may miss out, but I know my place: an ordinary, or defensive, investor.

With the analysis above, the two questions I started with answer themselves.

ThreadPoolExecutor

Posted on 2019-10-27 | Category: Java


There are several thread pool implementations: the fixed-size FixedThreadPool, the thread-reusing CachedThreadPool, the scheduling-capable ScheduledThreadPool, and so on.

Combined with the Executor pattern, they provide a simple, usable thread pool.

When using a thread pool, pay close attention to memory usage; it depends heavily on the parameters passed to the ThreadPoolExecutor constructor.

The ThreadPoolExecutor constructor exposes several core parameters:

  • corePoolSize - the number of core threads

  • maximumPoolSize - the maximum number of threads

  • keepAliveTime - how long an idle thread (beyond the core count) survives after finishing its task

  • unit - the time unit of keepAliveTime

  • BlockingQueue - once all core threads are busy, subsequent tasks are placed in this queue

  • ThreadFactory - the factory used to create threads

  • RejectedExecutionHandler - the rejection policy applied once the queue is full and the thread count has reached maximumPoolSize

Parameters in Detail

The relationship between corePoolSize and maximumPoolSize is also important: core threads are created as tasks are submitted, up to corePoolSize; maximumPoolSize caps the extra threads created after the queue has filled up.

The flow is as follows:

At first, the pool creates threads for incoming tasks until there are corePoolSize of them.

Tasks keep trickling in while all corePoolSize threads are busy; from then on, new tasks are stored in the BlockingQueue.

Eventually the BlockingQueue fills up too, and the pool starts creating extra threads to handle tasks. If tasks still keep coming and the thread count reaches maximumPoolSize, subsequent tasks are rejected.

So the queue and maximumPoolSize must be chosen with care, to prevent unbounded queuing or an excessively large maximum pool size.

Example

Let's track the state as a pair through this process.

Say corePoolSize=5, maximumPoolSize=10, a queue of capacity 3, and 30 tasks, assuming each task runs for a long time.

{thread_count, queue_size}

{0, 0} -> {5, 3} -> {6, 3} -> {7, 3} -> {8, 3} -> {9, 3} -> {10, 3}

Tasks from the 14th onward are rejected.

Now suppose there are 12 tasks instead.

The state changes as:

{thread_count, queue_size}

{0, 0} -> {5, 3} -> {6, 3} -> {7, 3} -> {8, 3} -> {9, 3} -> {9, 3}

In the end the queue is full and the thread count stops at 9.
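The 30-task trace above can be reproduced directly. A small sketch (PoolDemo and its latch trick are mine): tasks block on a latch so none can finish, we submit 30, and then read off the pool state:

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolDemo {

    // returns {thread_count, queue_size, rejected_count} after 30 submissions
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 10, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3)); // bounded queue of 3

        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;

        // 30 "long-running" tasks: 10 run, 3 queue, the rest are rejected
        for (int i = 0; i < 30; i++) {
            try {
                pool.execute(() -> {
                    try { release.await(); } catch (InterruptedException ignored) {}
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }

        int[] state = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown(); // let the workers finish
        pool.shutdown();
        return state;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [10, 3, 17]
    }
}
```

The pool settles at 10 threads and 3 queued tasks, and 30 - 13 = 17 tasks are rejected, matching the trace.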

Dubbo Source Code Analysis 13 - InternalThreadLocal

Posted on 2019-10-14 | Category: Source Code Analysis

Dubbo implements its own InternalThreadLocal, whose reads and writes are much faster than the standard one.

ThreadLocal

ThreadLocal stores per-thread variables. Internally it maintains a ThreadLocalMap, a customized hash map; each Thread owns one ThreadLocalMap. For the underlying mechanics, see ThreadLocal.

InternalThreadLocal Overview

InternalThreadLocal is used almost exactly like ThreadLocal, but its internal data structure is different.

ThreadLocal maintains a ThreadLocalMap, i.e. a hash map: each ThreadLocal's slot has to be computed by hashing, and hash collisions have to be resolved; it is a compromise between space and time.

InternalThreadLocal maintains an InternalThreadLocalMap, which is a flat array filled in order: each new variable takes the next slot index and fills it, and remove simply resets the slot to UNSET. InternalThreadLocal never compacts the slots, which means that if removals are frequent, space utilization suffers; but if removals are rare, the speedup is substantial. In a plain set/get microbenchmark on my i5/8 GB machine it was roughly 20x faster than ThreadLocal. It is a space-for-time trade.
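The idea fits in a few lines. A toy version, not Dubbo's code (IndexedLocal is my name; for brevity the per-thread table itself hangs off a plain ThreadLocal): each variable claims a fixed global index at construction, so get/set is a bounds check plus an array access, with no hashing:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Toy index-based thread-local: each variable owns a fixed array slot.
public class IndexedLocal<T> {
    private static final Object UNSET = new Object();
    private static final AtomicInteger NEXT_INDEX = new AtomicInteger();
    private static final ThreadLocal<Object[]> SLOTS =
            ThreadLocal.withInitial(() -> newTable(32));

    private final int index = NEXT_INDEX.getAndIncrement(); // slot is never reused

    private static Object[] newTable(int size) {
        Object[] t = new Object[size];
        Arrays.fill(t, UNSET);
        return t;
    }

    public void set(T value) {
        Object[] t = SLOTS.get();
        if (index >= t.length) { // grow, padding the new tail with UNSET
            Object[] bigger = Arrays.copyOf(t, Integer.highestOneBit(index) << 1);
            Arrays.fill(bigger, t.length, bigger.length, UNSET);
            SLOTS.set(t = bigger);
        }
        t[index] = value;
    }

    @SuppressWarnings("unchecked")
    public T get() {
        Object[] t = SLOTS.get();
        Object v = index < t.length ? t[index] : UNSET;
        return v == UNSET ? null : (T) v;
    }

    public void remove() {
        Object[] t = SLOTS.get();
        if (index < t.length) t[index] = UNSET; // the slot stays allocated
    }
}
```

This also makes the trade-off visible: a removed slot is dead space forever, which is exactly the behavior described above.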

InternalThreadLocal Source Walkthrough

Let's look at a few key methods.

get

public final V get() {
    // fetch the current thread's InternalThreadLocalMap
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    // read straight out of the array
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
    // first access: initialize
    return initialize(threadLocalMap);
}

private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
        v = initialValue();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    threadLocalMap.setIndexedVariable(index, v);
    // record this variable in a Set so that removeAll can clean it up later
    addToVariablesToRemove(threadLocalMap, this);
    return v;
}

private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, InternalThreadLocal<?> variable) {
    Object v = threadLocalMap.indexedVariable(VARIABLES_TO_REMOVE_INDEX);
    Set<InternalThreadLocal<?>> variablesToRemove;
    if (v == InternalThreadLocalMap.UNSET || v == null) {
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<InternalThreadLocal<?>, Boolean>());
        threadLocalMap.setIndexedVariable(VARIABLES_TO_REMOVE_INDEX, variablesToRemove);
    } else {
        variablesToRemove = (Set<InternalThreadLocal<?>>) v;
    }

    variablesToRemove.add(variable);
}

Now into InternalThreadLocalMap:

public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof InternalThread) {
        return fastGet((InternalThread) thread);
    }
    return slowGet();
}

private static InternalThreadLocalMap fastGet(InternalThread thread) {
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
    if (threadLocalMap == null) {
        thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }
    return threadLocalMap;
}

private static InternalThreadLocalMap slowGet() {
    ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = InternalThreadLocalMap.slowThreadLocalMap;
    InternalThreadLocalMap ret = slowThreadLocalMap.get();
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}

The code above shows that each InternalThread maps one-to-one to an InternalThreadLocalMap.

The get path is split into fastGet/slowGet.

fastGet reads the InternalThreadLocalMap directly from a field of InternalThread.

slowGet falls back to fetching the InternalThreadLocalMap from a plain JDK ThreadLocal, for threads that are not InternalThread.

set

public final void set(V value) {
    // setting null/UNSET is equivalent to removing
    if (value == null || value == InternalThreadLocalMap.UNSET) {
        remove();
    } else {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        // write the value at this variable's index
        if (threadLocalMap.setIndexedVariable(index, value)) {
            // track it for the later removeAll
            addToVariablesToRemove(threadLocalMap, this);
        }
    }
}
// InternalThreadLocalMap
public boolean setIndexedVariable(int index, Object value) {
    Object[] lookup = indexedVariables;
    if (index < lookup.length) {
        Object oldValue = lookup[index];
        lookup[index] = value;
        return oldValue == UNSET;
    } else {
        // index beyond capacity: grow the array first
        expandIndexedVariableTableAndSet(index, value);
        return true;
    }
}

// grow the array to the next power of two above index
private void expandIndexedVariableTableAndSet(int index, Object value) {
    Object[] oldArray = indexedVariables;
    final int oldCapacity = oldArray.length;
    int newCapacity = index;
    newCapacity |= newCapacity >>> 1;
    newCapacity |= newCapacity >>> 2;
    newCapacity |= newCapacity >>> 4;
    newCapacity |= newCapacity >>> 8;
    newCapacity |= newCapacity >>> 16;
    newCapacity++;

    Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
    Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
    newArray[index] = value;
    indexedVariables = newArray;
}

So it is now clear: what InternalThreadLocal maintains internally is a flat, index-ordered array.
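The shift-and-or sequence in expandIndexedVariableTableAndSet smears the highest set bit of index into every lower bit, so the final increment lands on the next power of two strictly greater than index. A standalone check (NextPow2 is my name for the extracted trick):

```java
public class NextPow2 {
    // same bit trick as expandIndexedVariableTableAndSet:
    // fill every bit below the highest set bit, then add one
    static int nextPow2(int index) {
        int n = index;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return n + 1;
    }

    public static void main(String[] args) {
        System.out.println(nextPow2(5));  // 8
        System.out.println(nextPow2(8));  // 16: a power of two is bumped to the next one
        System.out.println(nextPow2(33)); // 64
    }
}
```

Note that an index which is already a power of two is bumped to the next one, so the new capacity is always strictly larger than index.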

remove

public final void remove() {
    remove(InternalThreadLocalMap.getIfSet());
}


public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }

    // clear the slot in the InternalThreadLocalMap
    Object v = threadLocalMap.removeIndexedVariable(index);

    // drop this variable from the to-remove set
    removeFromVariablesToRemove(threadLocalMap, this);

    if (v != InternalThreadLocalMap.UNSET) {
        try {
            // removal callback
            onRemoval((V) v);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

private static void removeFromVariablesToRemove(InternalThreadLocalMap threadLocalMap, InternalThreadLocal<?> variable) {

    Object v = threadLocalMap.indexedVariable(VARIABLES_TO_REMOVE_INDEX);

    if (v == InternalThreadLocalMap.UNSET || v == null) {
        return;
    }

    Set<InternalThreadLocal<?>> variablesToRemove = (Set<InternalThreadLocal<?>>) v;
    variablesToRemove.remove(variable);
}
// InternalThreadLocalMap
public Object removeIndexedVariable(int index) {
    Object[] lookup = indexedVariables;
    if (index < lookup.length) {
        Object v = lookup[index];
        lookup[index] = UNSET;
        return v;
    } else {
        return UNSET;
    }
}

So remove simply resets this variable's slot in the InternalThreadLocalMap array to UNSET.

InternalThreadLocal's VARIABLES_TO_REMOVE_INDEX constant is an index into the InternalThreadLocalMap whose slot holds a Set, and that Set contains every InternalThreadLocal variable that is not UNSET.

This Set is used mainly by removeAll:

// InternalThreadLocalMap
public static void removeAll() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
    if (threadLocalMap == null) {
        return;
    }

    try {
        // fetch the Set of live variables
        Object v = threadLocalMap.indexedVariable(VARIABLES_TO_REMOVE_INDEX);
        if (v != null && v != InternalThreadLocalMap.UNSET) {
            Set<InternalThreadLocal<?>> variablesToRemove = (Set<InternalThreadLocal<?>>) v;
            // copy into an array
            InternalThreadLocal<?>[] variablesToRemoveArray =
                    variablesToRemove.toArray(new InternalThreadLocal[variablesToRemove.size()]);
            // clear each variable
            for (InternalThreadLocal<?> tlv : variablesToRemoveArray) {
                tlv.remove(threadLocalMap);
            }
        }
    } finally {
        // finally drop the map itself
        InternalThreadLocalMap.remove();
    }
}

Dubbo Source Code Analysis 12 - The Service Invocation Process

Posted on 2019-09-13 | Category: Source Code Analysis

I plan to read and debug the Dubbo source, following the official source analysis while adding my own notes.

This article covers Dubbo's service invocation process.

The Proxy

Back to the original example:

// consumer side
public class Consumer {

    public static void main(String[] args) {
        //Prevent to get IPV6 address,this way only work in debug mode
        //But you can pass use -Djava.net.preferIPv4Stack=true,then it work well whether in debug mode or not
        System.setProperty("java.net.preferIPv4Stack", "true");
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
        context.start();
        DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
        while (true) {
            try {
                Thread.sleep(1000);
                String hello = demoService.sayHello("world"); // call remote method
                System.out.println(hello); // get result
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
        }
    }
}

What context#getBean returns is a dynamically generated proxy for DemoService, roughly:

public class proxy0
        implements ClassGenerator.DC,
        EchoService,
        DemoService {
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }

    public proxy0() {
    }

    public String sayHello(String string) {
        Object[] arrobject = new Object[]{string};
        Object object = this.handler.invoke(this, methods[0], arrobject);
        return (String) object;
    }

    public Object $echo(Object object) {
        Object[] arrobject = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[1], arrobject);
        return object2;
    }
}
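The generated proxy0 behaves like a JDK dynamic proxy: every method call is funneled into an InvocationHandler. A minimal stand-in (the interface, handler body, and return format here are made up for illustration; Dubbo generates bytecode instead of using java.lang.reflect.Proxy):

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyDemo {
    interface DemoService {
        String sayHello(String name);
    }

    static String demo(String name) {
        // the handler plays the role of InvokerInvocationHandler: it receives
        // the Method and args and could turn them into a remote invocation
        InvocationHandler handler = (proxy, method, methodArgs) ->
                "[" + method.getName() + "] Hello " + methodArgs[0];

        DemoService service = (DemoService) Proxy.newProxyInstance(
                ProxyDemo.class.getClassLoader(),
                new Class<?>[]{DemoService.class},
                handler);
        return service.sayHello(name);
    }

    public static void main(String[] args) {
        System.out.println(demo("world")); // [sayHello] Hello world
    }
}
```

The caller only sees the DemoService interface; everything after that is up to the handler, which is exactly how the consumer stub hides the remote call.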

When sayHello is called, the heart of the method is this.handler.invoke. The InvocationHandler here is an InvokerInvocationHandler.

Into InvokerInvocationHandler:

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}

Next, into MockClusterInvoker#invoke:


public class MockClusterInvoker<T> implements Invoker<T> {

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            //no mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            // mock only, never call the remote side
            if (logger.isWarnEnabled()) {
                logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            // mock only after the real call fails
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    if (logger.isWarnEnabled()) {
                        logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                    }
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }
}

We focus on the no-mock path.

The inner invoker here defaults to FailoverClusterInvoker.

Into FailoverClusterInvoker and its base class AbstractClusterInvoker:

public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
        LoadBalance loadbalance = null;

        // binding attachments into invocation.
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }

        // list all invokers for this invocation from the directory
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && !invokers.isEmpty()) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        }
        // for async calls, attach an invocation id
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // delegate to the subclass
        return doInvoke(invocation, invokers, loadbalance);
    }

}


public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        // if not configured, retries defaults to 2, i.e. 3 attempts in total
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                // check again
                checkInvokers(copyinvokers, invocation);
            }
            // load balancing, to be analyzed separately later
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // invoke; this is a DubboInvoker, wrapped layer by layer in Wrappers
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }

}
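Stripped of logging, reselection, and load balancing, the failover loop above boils down to "try up to retries + 1 invokers, remember the last failure, rethrow it if everyone fails." A reduced sketch (FailoverDemo and the Supplier stand-ins are mine):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class FailoverDemo {
    // try the call up to retries + 1 times; rethrow the last failure
    static <T> T failover(List<Supplier<T>> invokers, int retries) {
        RuntimeException last = null;
        for (int i = 0; i <= retries; i++) {
            // pick the next candidate; a crude stand-in for select()/loadbalance
            Supplier<T> invoker = invokers.get(i % invokers.size());
            try {
                return invoker.get();
            } catch (RuntimeException e) {
                last = e; // keep the last error and move on
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        List<Supplier<String>> invokers = new ArrayList<>();
        invokers.add(() -> { throw new RuntimeException("provider A down"); });
        invokers.add(() -> "pong from B");
        System.out.println(failover(invokers, 2)); // pong from B
    }
}
```

The real code additionally treats business exceptions as non-retryable (isBiz), exactly as in the catch blocks above.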

Into DubboInvoker and its base class AbstractInvoker:

public abstract class AbstractInvoker<T> implements Invoker<T> {

    @Override
    public Result invoke(Invocation inv) throws RpcException {
        // has this invoker already been destroyed?
        if (destroyed.get()) {
            throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
                    + " use dubbo version " + Version.getVersion()
                    + " is DESTROYED, can not be invoked any more!");
        }
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        if (attachment != null && attachment.size() > 0) {
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            /**
             * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
             * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
             * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
             * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
             */
            invocation.addAttachments(contextAttachments);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        // for async calls, attach an invocation id
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        try {
            // delegate to the subclass
            return doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            Throwable te = e.getTargetException();
            if (te == null) {
                return new RpcResult(e);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                }
                return new RpcResult(te);
            }
        } catch (RpcException e) {
            if (e.isBiz()) {
                return new RpcResult(e);
            } else {
                throw e;
            }
        } catch (Throwable e) {
            return new RpcResult(e);
        }
    }
}


public class DubboInvoker<T> extends AbstractInvoker<T> {

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                // one-way: no response or future needed
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                // async: stash the future in the RpcContext
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                // synchronous two-way: block for the response
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}

Transport Layer

Continuing the flow, the next stop is currentClient. These clients were initialized in DubboProtocol#ref.

They correspond to ReferenceCountExchangeClient.


Recall that in DubboProtocol#ref, when the shared client is initialized,

the default HeaderExchanger is obtained first and its connect method is called. Into HeaderExchanger#connect:

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }
}

As you can see, connect returns a HeaderExchangeClient, and the Transporters#connect call inside it resolves to the default NettyTransporter#connect:


public class NettyTransporter implements Transporter {
    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
}

which finally yields a NettyClient.

So the request call flows through ReferenceCountExchangeClient -> HeaderExchangeClient.

Let's keep following HeaderExchangeClient:

public class HeaderExchangeClient implements ExchangeClient {

    private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
    private final Client client;
    private final ExchangeChannel channel;
    // heartbeat timer
    private ScheduledFuture<?> heartbeatTimer;
    // heartbeat(ms), default value is 0 , won't execute a heartbeat.
    private int heartbeat;
    private int heartbeatTimeout;

    public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        this.channel = new HeaderExchangeChannel(client);
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        if (needHeartbeat) {
            startHeartbeatTimer();
        }
    }

    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        return channel.request(request);
    }

}

The actual request is carried out by the ExchangeChannel.

Let's keep going:

final class HeaderExchangeChannel implements ExchangeChannel {

    private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeChannel.class);

    private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL";

    private final Channel channel;

    private volatile boolean closed = false;

    // note: NettyClient is itself an implementation of Channel
    HeaderExchangeChannel(Channel channel) {
        if (channel == null) {
            throw new IllegalArgumentException("channel == null");
        }
        this.channel = channel;
    }

    @Override
    public void send(Object message) throws RemotingException {
        send(message, getUrl().getParameter(Constants.SENT_KEY, false));
    }

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
        }
        if (message instanceof Request
                || message instanceof Response
                || message instanceof String) {
            channel.send(message, sent);
        } else {
            Request request = new Request();
            request.setVersion(Version.getProtocolVersion());
            request.setTwoWay(false);
            request.setData(message);
            channel.send(request, sent);
        }
    }

    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            // send through the channel; by default this ends up in NettyClient's send method
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
}

Finally, into NettyClient and its base class:


public class NettyClient extends AbstractClient {

private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

// ChannelFactory's closure has a DirectMemory leak, using static to avoid
// https://issues.jboss.org/browse/NETTY-424
private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)),
Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)),
Constants.DEFAULT_IO_THREADS);
private ClientBootstrap bootstrap;

private volatile Channel channel; // volatile, please copy reference to use

// channelHandler 对应的是DecodeHandler
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
// 调用基类方法。第二个形参是对handler的包装
super(url, wrapChannelHandler(url, handler));
}


@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());

// nettyHandler 对应的是ChannelHandler
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}

@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);

if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
newChannel.setInterestOps(Channel.OP_READ_WRITE);
try {
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.getCause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
future.cancel();
}
}
}
}
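Note how doConnect() above handles the race between connecting and closing: after installing the new channel it re-checks isClosed(), and if the client was closed concurrently it discards the new channel instead of leaking it. A minimal sketch of that sequence (the Channel is stubbed as a String; all names here are illustrative, not Dubbo's API):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of doConnect()'s "close old channel, install new one,
// re-check closed" sequence, with Channel stubbed as a String.
class ChannelSwap {
    final AtomicReference<String> channel = new AtomicReference<>();
    final AtomicBoolean closed = new AtomicBoolean(false);

    void onConnected(String newChannel) {
        String old = channel.get(); // copy reference, as in the Dubbo code
        if (old != null) {
            // a real implementation would close the old channel here
        }
        if (closed.get()) {
            // client was closed while we were connecting: discard newChannel
            channel.set(null);
        } else {
            channel.set(newChannel);
        }
    }
}
```

The re-check matters because close() can run on another thread between the successful connect and the assignment; without it, a closed client could keep a live channel open forever.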

public abstract class AbstractClient extends AbstractEndpoint implements Client {


public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);

send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);

shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);

// The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);

try {
doOpen();
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
// connect.
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}

executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}

protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
// wrap the handler for message dispatching
return ChannelHandlers.wrap(handler, url);
}


@Override
public void send(Object message, boolean sent) throws RemotingException {
if (send_reconnect && !isConnected()) {
connect();
}
// call the subclass method to obtain the channel
Channel channel = getChannel();
//TODO Can the value returned by getChannel() be null? need improvement.
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
// if sent is true, the send waits up to the timeout for the result
channel.send(message, sent);
}

protected void connect() throws RemotingException {
connectLock.lock();
try {
if (isConnected()) {
return;
}
// initialize the connection status check command
initConnectStatusCheckCommand();
doConnect();
if (!isConnected()) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getTimeout() + "ms.");
} else {
if (logger.isInfoEnabled()) {
logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}
reconnect_count.set(0);
reconnect_error_log_flag.set(false);
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
} finally {
connectLock.unlock();
}
}
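connect() above is a template method: AbstractClient owns the lock, the already-connected check, and the error reporting, while transport subclasses such as NettyClient supply only doConnect(). A minimal sketch of that pattern (class and method names here are illustrative, not Dubbo's API):

```java
import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch of the template-method pattern in AbstractClient:
// the base class owns the lock and the post-connect check,
// subclasses implement the transport-specific doConnect().
abstract class SketchClient {
    private final ReentrantLock connectLock = new ReentrantLock();
    protected volatile boolean connected;

    public void connect() {
        connectLock.lock();
        try {
            if (connected) {
                return; // already connected, nothing to do
            }
            doConnect();
            if (!connected) {
                throw new IllegalStateException("connect wait timeout");
            }
        } finally {
            connectLock.unlock();
        }
    }

    public boolean isConnected() {
        return connected;
    }

    protected abstract void doConnect();
}

class SketchNettyClient extends SketchClient {
    @Override
    protected void doConnect() {
        // a real client would open a socket here; we just flip the flag
        connected = true;
    }
}
```

The lock makes connect()/disconnect()/reconnect() mutually exclusive, and the post-doConnect() check turns a silent connect failure in the subclass into an explicit exception.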

public void disconnect() {
connectLock.lock();
try {
destroyConnectStatusCheckCommand();
try {
Channel channel = getChannel();
if (channel != null) {
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
doDisConnect();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
} finally {
connectLock.unlock();
}
}

@Override
public void reconnect() throws RemotingException {
disconnect();
connect();
}

@Override
public void close() {
try {
if (executor != null) {
ExecutorUtil.shutdownNow(executor, 100);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
super.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
disconnect();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
doClose();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
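close() above deliberately wraps each shutdown step (executor shutdown, super.close(), disconnect(), doClose()) in its own try/catch, so a failure in one step is logged but cannot prevent the remaining cleanup from running. The same log-and-continue shape, sketched with illustrative step contents:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of close()'s "attempt every step, swallow and log failures" shape.
class BestEffortClose {
    final List<String> log = new ArrayList<>();

    void close() {
        try {
            shutdownExecutor();          // step 1 may fail...
        } catch (Throwable e) {
            log.add("warn: " + e.getMessage());
        }
        try {
            disconnect();                // ...but step 2 still runs
        } catch (Throwable e) {
            log.add("warn: " + e.getMessage());
        }
        log.add("closed");
    }

    void shutdownExecutor() { throw new IllegalStateException("executor stuck"); }
    void disconnect() { log.add("disconnected"); }
}
```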

@Override
public void close(int timeout) {
ExecutorUtil.gracefulShutdown(executor, timeout);
close();
}


}
© 2017 — 2020 shiyan