"生产者-消费者"场景

在 “生产者-消费者” 场景中,队列通常被视作线程间操作的数据容器,这样,可以对各个模块的业务功能进行解耦,生产者将“生产”出来的数据放置在数据容器中,而消费者仅仅只需要在“数据容器”中进行获取数据即可,这样生产者线程和消费者线程就能够进行解耦,只专注于自己的业务功能即可。

而阻塞队列(BlockingQueue)就被广泛使用在“生产者-消费者”场景中。原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。

BlockingQueue 基本操作

BlockingQueue基本操作总结如下:

Throws exception Special value(true / false) Blocks Time out
Insert add(e) offer(e) put(e) offer(e,time,unit)
Remove remove() poll() take() poll(time,unit)
Examine element() peek() [not applicable] [not applicable]

BlockingQueue继承于Queue接口

插入队列元素

  • add(E e):boolean 往队列插入数据,当队列满时,插入元素时 会抛出IllegalStateException异常
  • offer(E e):boolean 当往队列插入数据时,插入成功返回true,否则则返回false。当队列满时 不会抛出异常
  • put(E e):void 当阻塞队列容量已经满时,往阻塞队列插入数据的线程 会被阻塞,直至阻塞队列已经有空余的容量可供使用
  • offer(E e, long timeout, TimeUnit unit):boolean 若阻塞队列已经满时,同样会阻塞插入数据的线程,直至阻塞队列已经有空余的地方,与put方法不同的是,该方法会有一个超时时间,若超过当前给定的超时时间,插入数据的线程会退出

删除元素

  • remove(Object o):boolean 从队列中删除数据,成功则返回true,否则为false
  • remove():Object 从队列中删除数据,成功则返回元素对象,否则 会抛出NoSuchElementException异常
  • poll:Object 删除并返回数据,当队列为空时,返回null
  • poll(long timeout, TimeUnit unit):Object 删除并返回数据,当队列为空时,返回null
  • take():Object 删除并返回数据,当阻塞队列为空时,获取队头数据的线程 会被阻塞

查看元素

  • element:Object 获取队头元素,如果队列为空时则 抛出NoSuchElementException异常
  • peek:Object 获取队头元素,如果队列为空 则返回null

特点

  • 不接受 null 元素
    试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。

  • 可以是限定容量的
    它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue 总是报告Integer.MAX_VALUE 的剩余容量。

  • 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。
    因此,举例来说,使用 remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常不会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。

  • 实现是线程安全的
    所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和removeAll)没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。

常用的BlockingQueue实现类

实现BlockingQueue接口的有 ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, LinkedTransferQueue, PriorityBlockingQueue, SynchronousQueue

ArrayBlockingQueue

一个由数组支持的有界阻塞队列。
此队列按 FIFO(先进先出)原则对元素进行排序。
队列的头部 是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。
新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。

这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”

1
private BlockingQueue<String> blockingQueue = new ArrayBlockingQueue(10,true);

DelayQueue 延迟队列

Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。
该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null
当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素

延迟队列示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* @author TheEmbers
* @version 1.0
* createTime 2018-09-11 12:23
*/
public class DelayQueueDemo {
static class DelayedTask implements Delayed {
public Long delayTime; // -> 阻塞时间
public TimeUnit delayTimeUnit; // -> 阻塞时间单位
public Long executeTime;//ms // -> 执行时间

public DelayedTask(long delayTime, TimeUnit delayTimeUnit) {
this.delayTime = delayTime;
this.delayTimeUnit = delayTimeUnit;
this.executeTime = System.currentTimeMillis() + delayTimeUnit.toMillis(delayTime); // -> 系统当前时间 + 阻塞时间
}

/**
* 计算当前时间到执行时间之间还有多长时间
*
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

/**
* 判断队列中元素的顺序谁前谁后。当前元素比队列元素后执行时,返回一个正数,比它先执行时返回一个负数,否则返回0
*
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
}
return 0;
}

@Override
public String toString() {
return "DelayedTask{" +
"delayTime=" + delayTime +
", delayTimeUnit=" + delayTimeUnit +
", executeTime=" + executeTime +
'}';
}
}

public static void main(String[] args) {
DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.add(new DelayedTask(4L, TimeUnit.SECONDS));
queue.add(new DelayedTask(2L, TimeUnit.SECONDS));
queue.add(new DelayedTask(3L, TimeUnit.SECONDS));

System.out.println("queue put done");

while (!queue.isEmpty()) {
try {
DelayedTask task = queue.take();
System.out.println(task.toString());

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 输出: 注意输出顺序与插入顺序的区别
// queue put done
// DelayedTask{delayTime=2, delayTimeUnit=SECONDS, executeTime=1536640039477}
// DelayedTask{delayTime=3, delayTimeUnit=SECONDS, executeTime=1536640040477}
// DelayedTask{delayTime=4, delayTimeUnit=SECONDS, executeTime=1536640041477}
}

LinkedBlockingQueue 链阻塞队列

LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。
LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

PriorityBlockingQueue 优先级阻塞队列

一个无界阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作。虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败(导致OutOfMemoryError)。此类不允许使用 null 元素。依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出 ClassCastException)。

此类及其迭代器可以实现 Collection 和 Iterator 接口的所有可选 方法。iterator() 方法中提供的迭代器并不 保证以特定的顺序遍历 PriorityBlockingQueue 的元素。如果需要有序地进行遍历,则应考虑使用 Arrays.sort(pq.toArray())。此外,可以使用方法 drainTo 按优先级顺序移除 全部或部分元素,并将它们放在另一个 collection 中。

在此类上进行的操作不保证具有同等优先级的元素的顺序。如果需要实施某一排序,那么可以定义自定义类或者比较器,比较器可使用修改键断开主优先级值之间的联系。例如,以下是应用先进先出 (first-in-first-out) 规则断开可比较元素之间联系的一个类。要使用该类,则需要插入一个新的 FIFOEntry(anEntry) 来替换普通的条目对象。

SynchronousQueue 同步队列

SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。

BlockingDeque 阻塞双端队列

一个基于已链接节点的、任选范围的阻塞双端队列。
可选的容量范围构造方法参数是一种防止过度膨胀的方式。如果未指定容量,那么容量将等于 Integer.MAX_VALUE。只要插入元素不会使双端队列超出容量,每次插入后都将动态地创建链接节点。
大多数操作都以固定时间运行(不计阻塞消耗的时间)。异常包括 remove、removeFirstOccurrence、removeLastOccurrence、contains、iterator.remove() 以及批量操作,它们均以线性时间运行。