Dubbo源码分析13 -InternalThreadLocal

Dubbo 内部实现了InternalThreadLocal 其读写速度比一般的要快很多

ThreadLocal

ThreadLocal的用来存储线程间局部变量。其内部维护了一个ThreadLocalMap,ThreadLocalMap是一个定制化的hashmap。每个Thread对应一个ThreadLocalMap,其原理可以参考 ThreadLocal

InternalThreadLocal 介绍

InternalThreadLocal用法和ThreadLocal基本一致。但是其内部的数据结构是不同的。

ThreadLocal内部是维护了ThreadLocalMap也就是一个hashmap,每一个ThreadLocal对应的slot都是需要经过hash计算出来,并且也要解决hash碰撞等等,是一种空间时间折中的方案。

InternalThreadLocal内部是维护一个ThreadLocalMap,而ThreadLocalMap是一个有序的数组,每次增加都会获取下一个slot的位置,然后填充,每次remove的时候将slot置为UNSET。但是InternalThreadLocal并没有对slot重新排位的操作,这就意味着如果remove的次数比较多,空间利用率会比较低,但是如果remove操作不多,那么将会带来极大速度提升,如果仅仅测试set/get,我的i5 8G 机器相比ThreadLocal大概有20倍提升。算是一种空间换取时间的方案。

InternalThreadLocal 源码解读

进入几个关键方法看下

get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public final V get() {
// 获取当前线程的InternalThreadLocalMap
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
// 从数组中读取
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
// 初始化
return initialize(threadLocalMap);
}

private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
v = initialValue();
} catch (Exception e) {
throw new RuntimeException(e);
}

threadLocalMap.setIndexedVariable(index, v);
// 这一步是将所有的ThreadLocal对象存储到Set中,便于后续的removeAll操作
addToVariablesToRemove(threadLocalMap, this);
return v;
}

private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, InternalThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(VARIABLES_TO_REMOVE_INDEX);
Set<InternalThreadLocal<?>> variablesToRemove;
if (v == InternalThreadLocalMap.UNSET || v == null) {
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<InternalThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariable(VARIABLES_TO_REMOVE_INDEX, variablesToRemove);
} else {
variablesToRemove = (Set<InternalThreadLocal<?>>) v;
}

variablesToRemove.add(variable);
}

继续进入ThreadLocalMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof InternalThread) {
return fastGet((InternalThread) thread);
}
return slowGet();
}

private static InternalThreadLocalMap fastGet(InternalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}

private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = InternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}

从上面的代码看出来,InternalThread其实和ThreadLocalMap是一一对应的关系

其中get方法分了fastGet/slowGet

fastGet是通过InternalThreadLocal直接读取其内部变量InternalThreadLocalMap

slowGet通过原生的ThreadLocal中获取InternalThreadLocalMap

set

1
2
3
4
5
6
7
8
9
10
11
12
13
public final void set(V value) {
// 如果设置为UNSET 相当于删除
if (value == null || value == InternalThreadLocalMap.UNSET) {
remove();
} else {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
// 设置到目标index 的值
if (threadLocalMap.setIndexedVariable(index, value)) {
// 加入到待删除的中,
addToVariablesToRemove(threadLocalMap, this);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// thread local map
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
// 扩展
expandIndexedVariableTableAndSet(index, value);
return true;
}
}

// 扩展数组
private void expandIndexedVariableTableAndSet(int index, Object value) {
Object[] oldArray = indexedVariables;
final int oldCapacity = oldArray.length;
int newCapacity = index;
newCapacity |= newCapacity >>> 1;
newCapacity |= newCapacity >>> 2;
newCapacity |= newCapacity >>> 4;
newCapacity |= newCapacity >>> 8;
newCapacity |= newCapacity >>> 16;
newCapacity++;

Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
newArray[index] = value;
indexedVariables = newArray;
}

到这里就明确了,InternalThreadLocal内部维护的其实是一个有序的数组

remove

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public final void remove() {
remove(InternalThreadLocalMap.getIfSet());
}


public final void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}

//从ThreadLocalMap中移除
Object v = threadLocalMap.removeIndexedVariable(index);

//从待移除的列表中移除
removeFromVariablesToRemove(threadLocalMap, this);

if (v != InternalThreadLocalMap.UNSET) {
try {
// 回调
onRemoval((V) v);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

private static void removeFromVariablesToRemove(InternalThreadLocalMap threadLocalMap, InternalThreadLocal<?> variable) {

Object v = threadLocalMap.indexedVariable(VARIABLES_TO_REMOVE_INDEX);

if (v == InternalThreadLocalMap.UNSET || v == null) {
return;
}

Set<InternalThreadLocal<?>> variablesToRemove = (Set<InternalThreadLocal<?>>) v;
variablesToRemove.remove(variable);
}
1
2
3
4
5
6
7
8
9
10
11
// InternalThreadLocalMap
public Object removeIndexedVariable(int index) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object v = lookup[index];
lookup[index] = UNSET;
return v;
} else {
return UNSET;
}
}

remove操作就是把threadlocal在threadlocalmap中的数组置为UNSET

InternalThreadLocal中VARIABLES_TO_REMOVE_INDEX的变量,这个会存一个在InternalThreadLocalMap中的索引,这个索引对应了一个Set,而这个Set中存储了所有的非UNSET的InternlThreadLocal变量

这个Set主要是用在removeAll函数中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// InternalThreadLocalMap
public static void removeAll() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return;
}

try {
// 获取到Set
Object v = threadLocalMap.indexedVariable(VARIABLES_TO_REMOVE_INDEX);
if (v != null && v != InternalThreadLocalMap.UNSET) {
Set<InternalThreadLocal<?>> variablesToRemove = (Set<InternalThreadLocal<?>>) v;
// 转化为数组
InternalThreadLocal<?>[] variablesToRemoveArray =
variablesToRemove.toArray(new InternalThreadLocal[variablesToRemove.size()]);
// 对每一个执行清空操作
for (InternalThreadLocal<?> tlv : variablesToRemoveArray) {
tlv.remove(threadLocalMap);
}
}
} finally {
// 最后删除ThreadLocalMap操作
InternalThreadLocalMap.remove();
}
}