1.从源码全面解析 LinkedBlockingQueue的来龙去脉
2.还不了解Java的5大BlockingQueue阻塞队列源码,看这篇文章就够了
3.LinkedBlockingQueue
4.LinkedBlockingDeque
5.阿里P9整理Java 高频面试题聊一聊 JUC 下的 LinkedBlockingQueue
6.喜提JDK的BUG一枚!多线程的情况下请谨慎使用这个类的stream遍历。
从源码全面解析 LinkedBlockingQueue的来龙去脉
并发编程是互联网技术的核心,面试官常在此领域对求职者进行深入考察。为了帮助读者在面试中占据优势,courrenthashmap源码本文将解析 LinkedBlockingQueue 的工作原理。
阻塞队列是并发编程中常见的数据结构,它在生产者和消费者模型中扮演重要角色。生产者负责向队列中添加元素,而消费者则从队列中取出元素。LinkedBlockingQueue 是 Java 中的一种高效阻塞队列实现,它底层基于链表结构。
在初始化阶段,LinkedBlockingQueue 不需要指定队列大小。除了基本成员变量,它还包含两把锁,分别用于读取和写入操作。有读者疑惑,为何需要两把锁,而其他队列只用一把?本文后续将揭晓答案。
生产者使用 `add()`、`offer()`、`offer(time)` 和 `put()` 方法向队列中添加元素。消费者则通过 `remove()`、医疗源码`poll()`、`poll(time)` 和 `take()` 方法从队列中获取元素。
在解析源码时,发现 LinkedBlockingQueue 与 ArrayBlockingQueue 在锁的使用上有所不同。ArrayBlockingQueue 使用互斥锁,而 LinkedBlockingQueue 使用读锁和写锁。这是否意味着 ArrayBlockingQueue 可以使用相同类型的锁?答案是肯定的,且使用两把锁的 ArrayBlockingQueue 在性能上有所提升。
流程图展示了 LinkedBlockingQueue 和 ArrayBlockingQueue 之间的相似之处。有兴趣的读者可以自行绘制。
总结而言,LinkedBlockingQueue 是一种高效的阻塞队列实现,其底层结构基于链表。它通过读锁和写锁管理线程安全,为生产者和消费者提供了并发支持。通过优化锁的使用,LinkedBlockingQueue 在某些场景下展现出更好的性能。
互联网寒冬虽在,但学习和分享是抵御寒冬的最佳方式。通过交流经验,可以减少弯路,提高效率。如果你对后端架构和中间件源码感兴趣,欢迎与我交流,彩38源码共同进步。
还不了解Java的5大BlockingQueue阻塞队列源码,看这篇文章就够了
引言
本文将详细解读Java中常见的5种BlockingQueue阻塞队列,包括它们的优缺点、区别以及典型应用场景,以帮助深入理解这5种队列的独特性质和使用场合。
常见的BlockingQueue有以下5种:
1. **基于数组实现的阻塞队列**:创建时需指定容量大小,是有限队列。
2. **基于链表实现的阻塞队列**:默认无界,可自定义容量。
3. **无缓冲阻塞队列**:生产的数据需立即被消费,无缓冲。
4. **优先级阻塞队列**:支持元素按照大小排序,无界。
5. **延迟阻塞队列**:基于PriorityQueue实现,无界。
**BlockingQueue简介
**BlockingQueue作为接口,定义了放数据和取数据的多组方法,适用于并发多线程环境,特别适合生产者-消费者模式。
**应用场景
**BlockingQueue的作用类似于消息队列,用于解耦、异步处理和削峰,适用于线程池的编译源码包核心功能实现。
**区别与比较
**- **ArrayBlockingQueue**:基于数组实现,容量可自定义。
- **LinkedBlockingQueue**:基于链表实现,无界或自定义容量。
- **SynchronousQueue**:同步队列,生产者和消费者直接交互,无需缓冲。
- **PriorityBlockingQueue**:实现优先级排序,无界队列。
- **DelayQueue**:本地延迟队列,支持元素延迟执行。
在选择使用哪种队列时,需考虑具体任务的特性、吞吐量需求以及是否需要优先级排序或延迟执行。
本文旨在提供全面理解Java中BlockingQueue的指南,从源码剖析到应用场景,帮助开发者更好地应用这些工具于实际项目中。
LinkedBlockingQueue
LinkedBlockingDequeå¨ç»æä¸æå«äºä¹å讲解è¿çé»å¡éåï¼å®ä¸æ¯Queueèæ¯Dequeï¼ä¸æç¿»è¯æå端éåï¼å端éåæå¯ä»¥ä»ä»»æä¸ç«¯å ¥éæè åºéå ç´ çéåï¼å®ç°äºå¨éå头åéåå°¾çé«ææå ¥å移é¤LinkedBlockingDequeæ¯é¾è¡¨å®ç°ç线ç¨å®å ¨çæ ççåæ¶æ¯æFIFOãLIFOçå端é»å¡éåï¼å¯ä»¥å顾ä¸ä¹åçLinkedBlockingQueueé»å¡éåç¹ç¹ï¼æ¬è´¨ä¸æ¯ç±»ä¼¼çï¼ä½æ¯åæäºä¸åï¼
QueueåDequeçå ³ç³»æç¹ç±»ä¼¼äºåé¾è¡¨åååé¾è¡¨ï¼LinkedBlockingQueueåLinkedBlockingDequeçå é¨ç»ç¹å®ç°å°±æ¯åé¾è¡¨åååé¾è¡¨çåºå«ï¼å ·ä½å¯åèæºç ã
å¨ç¬¬äºç¹ä¸å¯è½æäºäººæäºçé®ï¼ä¸¤ä¸ªäºæ¥éåä¸ä¸ªäºæ¥éçåºå«å¨åªéï¼æ们å¯ä»¥èè以ä¸åºæ¯ï¼
A线ç¨å è¿è¡å ¥éæä½ï¼B线ç¨éåè¿è¡åºéæä½ï¼å¦ææ¯LinkedBlockingQueueï¼A线ç¨å ¥éè¿ç¨è¿æªç»æï¼å·²è·å¾éè¿æªéæ¾ï¼ï¼B线ç¨åºéæä½ä¸ä¼è¢«é»å¡çå¾ ï¼éä¸åï¼ï¼å¦ææ¯LinkedBlockingDequeåB线ç¨ä¼è¢«é»å¡çå¾ ï¼åä¸æéï¼A线ç¨å®ææä½æ继ç»æ§è¡
LinkedBlockingQueueä¸è¬çæä½æ¯è·åä¸æéå°±å¯ä»¥ï¼ä½æäºæä½ä¾å¦removeæä½ï¼åéè¦åæ¶è·å两æéï¼ä¹åçLinkedBlockingQueue讲解æ¾ç»è¯´æè¿
LinkedBlockingQueue ç±äºæ¯åé¾è¡¨ç»æï¼åªè½ä¸ç«¯æä½ï¼è¯»åªè½å¨å¤´ï¼ååªè½å¨å°¾ï¼å æ¤ä¸¤æéæçæ´é«ãLinkedBlockingDeque ç±äºæ¯åé¾è¡¨ç»æï¼ä¸¤ç«¯å¤´å°¾é½è½è¯»åï¼å æ¤åªè½ç¨ä¸æéä¿è¯ååæ§ã å½ç¶æçä¹å°±æ´ä½
ArrayBlockingQueue
LinkedBlockingQueue
é®é¢ï¼ä¸ºä»ä¹ArrayBlockingQueue ä¸è½ç¨ä¸¤æé
å 为ååºåï¼ArrayBlockingQueue çå ç´ éè¦åå移å¨ã
LinkedBlockingQueueå é¨ç±åé¾è¡¨å®ç°ï¼åªè½ä»headåå ç´ ï¼ä»tailæ·»å å ç´ ãæ·»å å ç´ åè·åå ç´ é½æç¬ç«çéï¼ä¹å°±æ¯è¯´LinkedBlockingQueueæ¯è¯»åå离çï¼è¯»åæä½å¯ä»¥å¹¶è¡æ§è¡ãLinkedBlockingQueueéç¨å¯éå ¥é(ReentrantLock)æ¥ä¿è¯å¨å¹¶åæ åµä¸ç线ç¨å®å ¨ã
LinkedBlockingQueueä¸å ±æä¸ä¸ªæé å¨ï¼åå«æ¯æ åæé å¨ãå¯ä»¥æå®å®¹éçæé å¨ãå¯ä»¥ç©¿å ¥ä¸ä¸ªå®¹å¨çæé å¨ãå¦æå¨å建å®ä¾çæ¶åè°ç¨çæ¯æ åæé å¨ï¼LinkedBlockingQueueçé»è®¤å®¹éæ¯Integer.MAX_VALUEï¼è¿æ ·åå¾å¯è½ä¼å¯¼è´éåè¿æ²¡æ满ï¼ä½æ¯å åå´å·²ç»æ»¡äºçæ åµï¼å å溢åºï¼ã
size()æ¹æ³ä¼éåæ´ä¸ªéåï¼æ¶é´å¤æ度为O(n),æ以æ好éç¨isEmtpy
1.å¤æå ç´ æ¯å¦ä¸ºnullï¼ä¸ºnullæåºå¼å¸¸
2.å é(å¯ä¸æé)
3.å¤æéåé¿åº¦æ¯å¦å°è¾¾å®¹éï¼å¦æå°è¾¾ä¸ç´çå¾
4.å¦æ没æé满ï¼enqueue()å¨éå°¾å å ¥å ç´
5.éåé¿åº¦å 1ï¼æ¤æ¶å¦æéåè¿æ²¡æ满ï¼è°ç¨signalå¤éå ¶ä»å µå¡éå
1.å é(ä¾æ§æ¯ReentrantLock)ï¼æ³¨æè¿éçéååå ¥æ¯ä¸åç两æé
2.å¤æéåæ¯å¦ä¸ºç©ºï¼å¦æ为空就ä¸ç´çå¾
3.éè¿dequeueæ¹æ³åå¾æ°æ®
3.åèµ°å ç´ åéåæ¯å¦ä¸ºç©ºï¼å¦æä¸ä¸ºç©ºå¤éå ¶ä»çå¾ ä¸çéå
åçï¼å¨éå°¾æå ¥ä¸ä¸ªå ç´ ï¼ å¦æéå没满ï¼ç«å³è¿åtrueï¼ å¦æéå满äºï¼ç«å³è¿åfalseã
åçï¼å¦æ没æå ç´ ï¼ç´æ¥è¿ånullï¼å¦ææå ç´ ï¼åºé
1ãå ·ä½å ¥éä¸åºéçåçå¾ï¼
å¾ä¸æ¯ä¸ä¸ªèç¹ååé¨å表示å°è£ çæ°æ®xï¼åè¾¹ç表示æåçä¸ä¸ä¸ªå¼ç¨ã
1.1ãåå§å
åå§åä¹åï¼åå§åä¸ä¸ªæ°æ®ä¸ºnullï¼ä¸headålastèç¹é½æ¯è¿ä¸ªèç¹ã
1.2ãå ¥é两个å ç´ è¿å
1.3ãåºéä¸ä¸ªå ç´ å
表é¢ä¸çï¼åªæ¯å°å¤´èç¹çnextæéæåäºè¦å é¤çx1.nextï¼äºå®ä¸è¿æ ·æè§çå°±å®å ¨å¯ä»¥ï¼ä½æ¯jdkå®é ä¸æ¯å°åæ¥çheadèç¹å é¤äºï¼èä¸è¾¹çå°çè¿ä¸ªheadèç¹ï¼æ£æ¯åååºéçx1èç¹ï¼åªæ¯å ¶å¼è¢«ç½®ç©ºäºã
2ãä¸ç§å ¥é对æ¯ï¼
3ãä¸ç§åºé对æ¯ï¼
LinkedBlockingDeque
LinkedBlockingDequeï¼ ç±ååé¾è¡¨ç»æçæçé»å¡éåï¼éå容é大å°å¯éï¼é»è®¤å¤§å°ä¸ºInteger.MAX_VALUEãé头é¨åéå°¾é½å¯ä»¥åå ¥å移é¤å ç´ ï¼å 为å¤äºä¸ä¸ªæä½éåçå ¥å£ï¼å¨å¤çº¿ç¨åæ¶å ¥éæ¶ï¼ä¹å°±åå°äºä¸åéçç«äº ãLinkedBlockingDequeæ¯ååé¾è¡¨å®ç°çé»å¡éåã该é»å¡éååæ¶æ¯æFIFOåFILO两ç§æä½æ¹å¼ï¼å³å¯ä»¥ä»éåç头åå°¾åæ¶æä½(æå ¥/å é¤)ï¼
å¨ä¸è½å¤æå ¥å ç´ æ¶ï¼å®å°é»å¡ä½è¯å¾æå ¥å ç´ ç线ç¨ï¼å¨ä¸è½å¤æ½åå ç´ æ¶ï¼å®å°é»å¡ä½è¯å¾æ½åç线ç¨ãï¼
LinkedBlockingDequeè¿æ¯å¯é容éç,é²æ¢è¿åº¦è¨èï¼é»è®¤çäºInteger.MAX_VALUEãï¼
LinkedBlockingDuque没æè¿è¡è¯»åéçå离ï¼å æ¤åä¸æ¶é´åªè½æä¸ä¸ªçº¿ç¨å¯¹å ¶æä½ï¼å æ¤å¨é«å¹¶ååºç¨ä¸ï¼å®çæ§è½è¦è¿è¿ä½äºLinkedBlockingQueueã
Dequeç¹æ§: é头åéå°¾é½å¯ä»¥æå ¥å移é¤å ç´ ï¼æ¯æFIFOåFILOã
ç¸æ¯äºå ¶ä»é»å¡éåï¼LinkedBlockingDequeå¤äºaddFirst()ãaddLast()ãpeekFirst()ãpeekLast()çæ¹æ³ï¼ä»¥XXXFirstç»å°¾çæ¹æ³ï¼è¡¨ç¤ºæå ¥ãè·åè·ç§»é¤å端éåçé头å ç´ ã以xxxLastç»å°¾çæ¹æ³ï¼è¡¨ç¤ºæå ¥ãè·åè·ç§»é¤å端éåçéå°¾å ç´ ã
å设:æå¤ä¸ªæ¶è´¹è ï¼æ¯ä¸ªæ¶è´¹è æèªå·±çä¸ä¸ªæ¶æ¯éåï¼ç产è ä¸æçç产æ°æ®æå°éåä¸ï¼æ¶è´¹è æ¶è´¹æ°æ®æå¿«ææ ¢ã为äºæåæçï¼é度快çæ¶è´¹è å¯ä»¥ä»å ¶å®æ¶è´¹è éåçéå°¾åºéå ç´ æ¾å°èªå·±çæ¶æ¯éåä¸ï¼ç±äºæ¯ä»å ¶å®éåçéå°¾åºéï¼è¿æ ·å¯ä»¥åå°å¹¶åå²çªï¼å ¶å®æ¶è´¹è ä»éé¦åºéå ç´ ï¼ï¼åè½æåæ´ä¸ªç³»ç»çååéãè¿å ¶å®æ¯ä¸ç§âå·¥ä½çªåç®æ³âçæè·¯ã
BlockingDequeç¸å¯¹äºBlockingQueueï¼æ大çç¹ç¹å°±æ¯å¢å äºå¨éé¦å ¥é/éå°¾åºéçé»å¡æ¹æ³ãä¸é¢æ¯ä¸¤ä¸ªæ¥å£çæ¯è¾ï¼
å¯ä»¥çfirstæåéé¦èç¹ï¼lastæåéå°¾èç¹ãå©ç¨ReentrantLockæ¥ä¿è¯çº¿ç¨å®å ¨ï¼ææ对éåçä¿®æ¹æä½é½éè¦å è·åå ¨å±é
åå§ï¼
éé¦æå ¥èç¹nodeï¼
åå§ï¼
éå°¾æå ¥èç¹nodeï¼
å¯ååºä¸æé»å¡å¼éé¦å ¥é-void putFirst(E e)
å¯ååºä¸æé»å¡å¼éå°¾å ¥é-void putLast(E e)
éé»å¡å¼éé¦å ¥é-boolean offerFirst(E e)
éé»å¡å¼éå°¾å ¥é-boolean offerLast(E e)
å¯ååºä¸æéæ¶é»å¡éé¦å ¥é-boolean offerFirst(E e, long timeout, TimeUnit unit)
å¯ååºä¸æéæ¶é»å¡éå°¾å ¥é-boolean offerLast(E e, long timeout, TimeUnit unit)
éé»å¡éé¦å ¥é(æå¼å¸¸)-void addFirst(E e)
éé»å¡éå°¾å ¥é(æå¼å¸¸)-void addLast(E e)
éé»å¡å¼éå°¾å ¥é-boolean add(E e)
å¯ååºä¸æé»å¡å¼éå°¾å ¥é-void put(E e)
éé»å¡å¼éå°¾å ¥é-boolean offer(E e)
å¯ååºä¸æéæ¶é»å¡éå°¾å ¥é-boolean offer(E,long,TimeUnit)
åå§ï¼
å é¤éé¦èç¹ï¼
ä»éå°¾åºélastèç¹ï¼ä¸å®ä¼å°prevæéæåèªèº«ï¼ä»¥åºå«äºéåºéæ¶lastæåå ·ä½èç¹(å³prevæéä¸ä¼æåèªèº«ï¼å¯è½ä¸ºçå®èç¹ænullï¼ã
åå§ï¼
å é¤éå°¾èç¹ï¼
åºéæ ¸å¿æ¹æ³-void unlink(Node x)
å¯ååºä¸æé»å¡å¼éé¦åºé-E takeFirst()
å¯ååºä¸æé»å¡å¼éå°¾åºé-E takeLast()
éé»å¡å¼éé¦åºé-E pollFirst()
éé»å¡å¼éå°¾åºé-E pollLast()
å¯ååºä¸æéæ¶é»å¡éé¦åºé- E pollFirst(long timeout, TimeUnit unit)
å¯ååºä¸æéæ¶é»å¡éå°¾åºé- E pollLast(long timeout, TimeUnit unit)
ä¸ç§»é¤å ç´ éé¦åºé-E peekFirst()
ä¸ç§»é¤å ç´ éå°¾åºé-E peekLast()
éé»å¡éé¦åºé(æå¼å¸¸)-E removeFirst()
éé»å¡éå°¾åºé(æå¼å¸¸)âE removeLast()
ä»éé¦åå移é¤æå®å ç´ -boolean removeFirstOccurrence(Object o)
ä»éå°¾åå移é¤æå®å ç´ -boolean removeLastOccurrence(Object o)
å¯ååºä¸æé»å¡å¼éé¦åºé-E take()
éé»å¡å¼åºé-E poll()
é»å¡å¼è¶ æ¶åºé-E poll(timeout, unit)
é»å¡å¼åºé-E peek()
移é¤å ç´ - E remove()
æ¤è¿ä»£å¨æ¯å¼±ä¸è´æ§çãå 为å³ä½¿èç¹è¢«å é¤ï¼ è¿ä»£å¨ä¹ä¼ç §æ ·è¿å被å é¤èç¹çitem ã
æ£åè¿ä»£å¨åååè¿ä»£å¨ åªéè¦å®ç°2个æ½è±¡æ¹æ³ã
LinkedBlockingDequeæ¯ä¸ä¸ª æ çé»å¡å端éå ï¼ç¸æ¯æ®ééåï¼ä¸»è¦æ¯å¤äºãéå°¾åºéå ç´ ã/ãéé¦å ¥éå ç´ ãçåè½ã
阿里P9整理Java 高频面试题聊一聊 JUC 下的 LinkedBlockingQueue
本文将深入探讨Java并发包(JUC)中的LinkedBlockingQueue队列,首先介绍LinkedBlockingQueue的主要特性和实现方式。
LinkedBlockingQueue是一个双端队列,其继承自AbstractQueue类,并实现了BlockingQueue接口。它具有以下特性:
1. LinkedBlockingQueue允许线程安全地向队列中添加或删除元素。
2. 队列中的比鸡源码元素按插入顺序进行存储。
3. 队列的容量可以设置。
4. 支持元素的等待和通知操作。
5. 可以被多个线程安全地访问。
LinkedBlockingQueue的常用方法有put()和take()。下面将分别解析这两个方法的实现。
put()方法用于将元素添加到队列中。其核心逻辑如下:
1. 当队列未满时,直接将元素添加到队尾。
2. 当队列已满时,调用线程将进入等待状态,等待队列中的元素被消费或队列容量增加。
3. 当有元素被消费或队列容量增加时,等待线程将被唤醒并继续执行。
offer()方法类似于put()方法,但不阻塞线程,如果队列已满,则返回false。
take()方法用于从队列中获取元素。其核心逻辑如下:
1. 当队列为空时,调用线程将进入等待状态,等待队列中有元素被添加。
2. 当队列中有元素时,线程将被唤醒并获取队列中的元素。
3. 如果需要,可以设置超时等待时间。
LinkedBlockingQueue的其他方法,如poll()、peek()等,实现方式与put()和take()类似,提供不同类型的获取操作。
总结,LinkedBlockingQueue是一个功能强大的并发队列,其通过实现put()和take()方法提供了线程安全的元素添加和获取操作。通过理解其内部机制,开发者可以更有效地使用并发技术,提高程序的性能和稳定性。希望本文的解析能够帮助你更好地理解LinkedBlockingQueue,并在面试或工作中发挥重要作用。
为了进一步提升你的Java技能,我整理了一份涵盖基础知识、高级技术、面试技巧等内容的全面学习资料。这份资料包括性能优化、微服务架构、并发编程、开源框架、分布式系统等多个领域的内容,以及技术+人事面试的全面指导。通过系统学习这份资料,你将能够应对各大公司技术面试,甚至在求职过程中脱颖而出。获取这份资料的链接如下:[资料分享链接]。
喜提JDK的BUG一枚!多线程的情况下请谨慎使用这个类的stream遍历。
在探讨问题之前,我们先回顾一下 LinkedBlockingQueue 的线程安全性。在传统的观点中,LinkedBlockingQueue 是线程安全的,因为它内部使用了 ReentrantLock。然而,就在 RocketMQ 的讨论版中,一个问题揭示了 LinkedBlockingQueue 在特定情况下的线程不安全性,引发了我们的好奇心。
核心问题在于 LinkedBlockingQueue 的 stream 遍历方式,在多线程环境下可能出现死循环。我们通过一个简单的 demo 来深入分析这一现象。首先,引入了一个链接,其中详细展示了如何在多线程环境下复现这一 Bug。
在分析代码之前,让我们先明确 demo 的基本逻辑:创建了 个线程,每个线程不断调用 offer 和 remove 方法。主线程则通过 stream 对 queue 进行遍历,目标是找到队列中的第一个非空元素。这看似是一个简单的遍历操作,但事实并非如此。
关键点在于 tryAdvance 方法,看似平凡的遍历操作隐藏了陷阱。当运行代码时,预期的输出并未出现,而是陷入了一个死循环,控制台仅输出了一行信息或交替输出几次后停止。
我们的疑问指向了 JDK 版本,尤其是 JDK 8。通过替换为 JDK ,我们观察到交替输出的效果。这使得我们大胆推测,这可能是 JDK 8 版本的 Bug。为了验证这一假设,我们进行了详细的分析。
通过线程 dump 文件,我们发现主线程始终处于可运行状态,似乎没有被锁阻塞。然而,从控制台的输出来看,它似乎处于阻塞状态。这一现象让我们联想到一个经典的场景:线程陷入死循环。
通过深入源码分析,我们发现了死循环的根源。在 stream 遍历的关键方法 tryAdvance 中,存在一个 while 循环,其条件始终满足,导致死循环。而问题的核心在于移除队列头部元素的代码逻辑,当有其他线程不断调用 remove 方法时,可能会形成特定的节点结构,触发死循环。
经过详细的分析,我们揭示了这一 Bug 的原理,并通过简化代码演示了整个过程。通过将实例代码简化,我们揭示了死循环是如何在多线程环境下产生的。这不仅有助于理解 Bug 的本质,也为后续的 Bug 修复提供了思路。
为了验证解决方案的正确性,我们对比了 JDK 8 和 JDK 的源码差异。在 JDK 中,通过引入了一个名为 succ 的方法,成功解决了死循环问题。这一方法通过确保节点不会指向自身,从而避免了死循环的产生。
通过这篇文章的分析,我们不仅揭示了 LinkedBlockingQueue 在特定条件下的线程不安全性,还探讨了如何通过升级 JDK 版本、避免使用 stream 遍历,以及使用 synchronized 修饰符等方式来规避此类问题。同时,我们还延伸至其他数据结构,如 ConcurrentHashMap,讨论了它们在不同使用场景下的线程安全性问题。
最后,我们再次强调在多线程环境下,LinkedBlockingQueue 的 stream 遍历方式可能存在一定的问题,可能会导致死循环。理解并解决这类 Bug,对于确保代码的健壮性和性能至关重要。
java多线程关于消费者和生产者,求源程序,求大神解答。愿意提高报酬
自己看代码体会吧import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();
System.out.println("blockingQueue now contains " + blockingQueue.size() + " unit");
service.submit(new Consumer1(blockingQueue));
gap(blockingQueue);
service.submit(new Productor2(blockingQueue));
gap(blockingQueue);
service.submit(new Productor3(blockingQueue));
gap(blockingQueue);
service.submit(new Productor4(blockingQueue));
gap(blockingQueue);
service.submit(new Productor5(blockingQueue));
gap(blockingQueue);
service.shutdown();
}
private static void gap(BlockingQueue<String> blockingQueue) {
try {
Thread.sleep();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("blockingQueue now contains " + blockingQueue.size() + " unit");
}
}
class Consumer1 implements Runnable{
BlockingQueue<String> blockingQueue;
public Consumer1(BlockingQueue<String> blockingQueue) {
super();
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("Consumer1 start: need units");
for(int i = 0; i < ; i++){
try {
blockingQueue.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("Consumer1 end: has got units");
}
}
class Productor2 implements Runnable{
BlockingQueue<String> blockingQueue;
public Productor2(BlockingQueue<String> blockingQueue) {
super();
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("Productor2 start: put 5 units");
for(int i = 0; i < 5; i++){
try {
blockingQueue.put("Object");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("Productor2 end: has put 5 units");
}
}
class Productor3 implements Runnable{
BlockingQueue<String> blockingQueue;
public Productor3(BlockingQueue<String> blockingQueue) {
super();
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("Productor3 start: put 5 units");
for(int i = 0; i < 5; i++){
try {
blockingQueue.put("Object");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("Productor3 end: has put 5 units");
}
}
class Productor4 implements Runnable{
BlockingQueue<String> blockingQueue;
public Productor4(BlockingQueue<String> blockingQueue) {
super();
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("Productor4 start: put units");
for(int i = 0; i < ; i++){
try {
blockingQueue.put("Object");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("Productor4 end: has put units");
}
}
class Productor5 implements Runnable{
BlockingQueue<String> blockingQueue;
public Productor5(BlockingQueue<String> blockingQueue) {
super();
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("Productor5 start: put units");
for(int i = 0; i < ; i++){
try {
blockingQueue.put("Object");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("Productor5 end: has put units");
}
}
每个线程是隔了1s启动的, 结果
blockingQueue now contains 0 unit
Consumer1 start: need units
blockingQueue now contains 0 unit
Productor2 start: put 5 units
Productor2 end: has put 5 units
blockingQueue now contains 0 unit
Productor3 start: put 5 units
Productor3 end: has put 5 units
Consumer1 end: has got units
blockingQueue now contains 0 unit
Productor4 start: put units
Productor4 end: has put units
blockingQueue now contains unit
Productor5 start: put units
blockingQueue now contains unit