源码部分只包含重点代码,其余代码忽略

线程与线程池

thread

1
2
3
4
5
6
7
8
9
10
11
/**
* Thread类实现了Runnable接口,所以说线程可以通过继承Thread来实现。
*/
public class Thread implements Runnable {
// 守护线程
private boolean daemon = false;
// 要被执行的方法
private Runnable target;
// 与本线程有关的ThreadLocal值,该映射由ThreadLocal类维护,注意这一点!这里是关于ThreadLocal为什么会内存泄漏的原因之一
ThreadLocal.ThreadLocalMap threadLocals = null;
}

ThreadLocal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class ThreadLocal<T> {
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}


/**
* ThreadLocal 维护在它自己内部类ThreadLocalMap中,这是一个类似HashMap的数据结构。
* 其内部 Entry 对象继承了弱引用类型 WeakReference 也就是说这个对象会在下一次GC时被回收。注意这里是ThreadLocal为什么会内存泄漏之二。
*/
static class ThreadLocalMap {
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}

static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
}
}

正如代码注释山标记了两个“注意”,ThreadLocalMap在Thread中存在一个强引用链,而在ThreadLocal中则是一个弱引用链。如果此时如下图所示,发生了GC会导致弱引用中的Key值被回收,而此时当前线程并没有停止,所以强引用链依旧存在,但是并不能通过get方法获取到value(因为key被回收变成了null)。所以value会一直存在于内存中(如果当前线程一直存活),最后导致内存泄漏。

关于ThreadLocal的使用技巧以及如何保证在使用时不会被过早的GC可以参考这篇文章

JDK建议将ThreadLocal变量定义成private static的,这样的话ThreadLocal的生命周期就更长,由于一直存在ThreadLocal的强引用,所以ThreadLocal也就不会被回收,也就能保证任何时候都能根据ThreadLocal的弱引用访问到Entry的value值,然后remove它,防止内存泄露。

ExecutorService 线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// 工作队列
private final BlockingQueue<Runnable> workQueue;
// 可重入锁
private final ReentrantLock mainLock = new ReentrantLock();
// 工作线程对象
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* 线程对象
* 继承了AQS,说明它自生是带有锁机制的,他所代表的线程状态(即work自己的状态)都是AQS机制的
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker AQS 初始状态
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

public void run() {
runWorker(this);
}
// 当前有效的线程数 & 当前线程池的状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// workerCountOf() 获取当前线程数,如果线程数小于核心线程数,则创建新线程(Worker)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get(); // 执行到这里说明addWorker()操作失败,比如被其他command抢先拉满了核心线程数。
}
// isRunning() 获取当前线程池状态是否为执行中,并且加入工作队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
// 如果当前线程池不在执行状态,但是任务从工作队列移除成功了,则调用拒绝策略
reject(command);
// 如果工作线程总数为0,就创建新工作线程(至少启动一个线程。)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
} else if (!addWorker(command, false)) // 到这说明队列满了,那么就创建扩展线程执行,注意addWorker()第二个参数。
// 到这说明用拓展线程也失败了,则调用拒绝策略
reject(command);
}

// 创建执行线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry: // 重试点
// CAS 操作
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 如果 线程池 停止 并且 没有新任务 队列空 则返回 (说白了就是线程停止并且完全没事干了)
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;

for (;;) {
// 获取工作线程数
int wc = workerCountOf(c);
// 当前工作线程数大于容量(CAPACITY 表示线程池理论容量)或者大于核心/最大设置线程数时 返回
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// cas 添加线程数。成功 则跳出循环(跳出到最外层循环)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // cas 判断当前线程状态是否与上次的状态一致,不一致则到最外层重试。
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 初始化一个工作对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 把worker对象加到workers链中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
/**
* 如果添加成功 那就开始调用 worker的start()方法,后续这个线程会从工作队列中获取任务自己执行。
* 可以看Worker的run方法,调用的runWorker(Worker this),以及 getTask()从队列中获取任务。
*/
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 拒绝策略
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}


}

由此可见:

  1. 线程池初始化线程其实超麻烦的,所以在一些核心场景下可以考虑优先初始化热身(把线程拉满)
  2. keepAliveTime是表示 当前线程池有任务要执行,但是现在开启的线程池太多了,导致一些现场始终抢不到任务,所以超过这个时间没抢到任务的线程就会被回收
  3. 任务被执行完后(工作队列没任务了),则会主动调取processWorkerExit()方法来回出发回收线程操作。并且会保证只存活核心线程数或者只有一个线程(ONLY_ONE)

附上线程池工作流程图:

线程池工作流程.png线程池工作流程.png

AQS

AQS -> AbstractQueuedSynchronizer 是一种实现锁和以来等待队列的同步器框架。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
// 用于标记线程被终止的状态值
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
// 用于标记线下需要被唤醒的状态值
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
// 用于标记线程为等待状态
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
// 表示可以传播下一个共享状态
static final int PROPAGATE = -3;
// 共享状态 使用 volatile 修饰 保证了状态在多线程场景下的可见性
volatile int waitStatus;

// Node 节点的两端引用
volatile Node prev;
volatile Node next;

// 当前node排队时的线程实例
volatile Thread thread;

Node nextWaiter;
}
}

AQS数据模型AQS数据模型

JUC

ConcurrentHashMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/**
* 继承 AbstractMap 并实现 ConcurrentMap 接口
* AbstractMap 实现了 map 对象 的基本方法 (HashMap 也继承了这个抽象类)
* ConcurrentMap 定义了 并发 Map 的一些方法。
*/
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
private static final int MAXIMUM_CAPACITY = 1 << 30; // 最大可能容量

private static final int DEFAULT_CAPACITY = 16; // 默认容量

static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // 最大数组长度

/**
* The default concurrency level for this table. Unused but
* defined for compatibility with previous versions of this class.
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 默认并发级别,在jdk1.8中有说明 不被使用,只是作为和之前的版本兼容

private static final float LOAD_FACTOR = 0.75f; // 默认负载因子

static final int TREEIFY_THRESHOLD = 8; // 链表转红黑树的阈值

static final int MOVED = -1; // 表示正在转移
static final int TREEBIN = -2; // 表示已经转换成树
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

/**
* 用来控制表初始化和扩容的,默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,大小为数组的0.75
* 当为负的时候,说明表正在初始化或扩张,
* -1表示初始化
* -(1+n) n:表示活动的扩张线程
*/
private transient volatile int sizeCtl;

/**
* Node 与 hashMap 一致
*/
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
/**
* 红黑树节点
*/
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;

TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
}

transient volatile Node<K,V>[] table; // node 数组 2的幂,初始化懒载

private transient volatile Node<K,V>[] nextTable; // 扩容时用来使用


/**
* 先看 put 再看 get
*/
public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
// kv都不可以为null
if (key == null || value == null) throw new NullPointerException();
// 获取key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
// 死亡for循环!
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) // table为空则初始化
tab = initTable(); // 这里不提供 initTable() 的源码 ,但需要知道的是 与 HashMap 不一样,这里的 initTable()使用的是CAS操作。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 首节点 cas 插入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果正在迁移(resize ing..) 则进行 helpTransfer()方法进行插入。helpTransfer 此处不提供源码,原理是通过cas操作直接把node 插入 nextTable 里。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 常规加锁插入节点, 除了加锁 过程与 hashmap 大致相同
else {
V oldVal = null;
// 给当前节点加锁,所以锁粒度是节点,注意与1.7中segment的不同!
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

// get方法是无锁操作,所以它是支持并发的,但不代表线程安全(即 在时间序列上的读写请求顺序是不能保证顺序的,得看哪个线程先抢到锁)
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

}