synchronized void myJoin() throws InterruptedException {
    System.out.println(this.getClass());
    System.out.println("current thread: " + Thread.currentThread().getName() + "\t my join start");
    while (this.isAlive()) {
        System.out.println("current thread: " + Thread.currentThread().getName());
        // wait(0) releases the lock on this Consumer object, so main blocks here while the
        // consumer thread runs; when the consumer thread terminates it notifies on its own
        // Thread object, which wakes main up so the isAlive() check can exit the loop.
        wait(0);
    }
    System.out.println("current thread: " + Thread.currentThread().getName() + "\t my join end");
}
}
Result:
consumer start
start sleep 2000
consumer end
class thread.Consumer
current thread: main   my join start
current thread: main
current thread: main   my join end
main end
producer start
producer end
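For reference, a minimal driver sketch for myJoin(); the Consumer run() body and the producer thread seen in the sample output are not shown in the original, so everything outside myJoin() is an assumed reconstruction and the exact output ordering may differ:

// Assumed reconstruction; only myJoin() comes from the original text.
class Consumer extends Thread {
    @Override
    public void run() {
        System.out.println("consumer start");
        try {
            System.out.println("start sleep 2000");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("consumer end");
        // When run() returns, the terminating thread notifies on its own Thread object,
        // which wakes anyone blocked in wait() inside myJoin().
    }

    synchronized void myJoin() throws InterruptedException {
        System.out.println(this.getClass());
        System.out.println("current thread: " + Thread.currentThread().getName() + "\t my join start");
        while (this.isAlive()) {
            System.out.println("current thread: " + Thread.currentThread().getName());
            wait(0);
        }
        System.out.println("current thread: " + Thread.currentThread().getName() + "\t my join end");
    }
}

public class MyJoinDemo {
    public static void main(String[] args) throws InterruptedException {
        Consumer consumer = new Consumer();
        consumer.start();
        Thread.sleep(100);   // assumed delay so the consumer starts running before myJoin is called
        consumer.myJoin();   // main blocks here until the consumer thread terminates
        System.out.println("main end");
    }
}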
BlockingQueue
BlockingQueue implementations are designed to be used primarily for producer-consumer queues, but additionally support the java.util.Collection interface.
A BlockingQueue additionally supports operations that wait for the queue to become non-empty when retrieving an element, and that wait for space to become available in the queue when storing an element.
A BlockingQueue does not accept null elements. Implementations throw NullPointerException on attempts to add, put or offer a null. A null is used as a sentinel value to indicate failure of poll operations.
BlockingQueue implementations are thread-safe. However, the bulk Collection operations addAll, containsAll, retainAll and removeAll are not necessarily performed atomically unless specified otherwise in an implementation.
BlockingQueue methods come in four forms:
throws an exception
returns a special value (either null or false, depending on the operation)
blocks the current thread indefinitely until the operation can succeed
blocks for only a given maximum time limit before giving up.
Summary of BlockingQueue methods
           Throws exception   Special value   Blocks           Times out
Insert     add(e)             offer(e)        put(e)           offer(e, time, unit)
Remove     remove()           poll()          take()           poll(time, unit)
Examine    element()          peek()          not applicable   not applicable
When using a capacity-restricted queue, it is generally preferable to insert elements with offer rather than add: add throws an IllegalStateException when the queue is full, whereas offer simply returns false (see the sketch below).
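A minimal sketch of the difference (class name and capacity are illustrative, not from the original):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class OfferVsAddDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");                                    // succeeds; the queue is now full

        System.out.println(queue.offer("second"));             // false, returns immediately
        System.out.println(queue.offer("second", 100, TimeUnit.MILLISECONDS)); // false after waiting 100 ms

        try {
            queue.add("second");                               // throws IllegalStateException: Queue full
        } catch (IllegalStateException e) {
            System.out.println("add on a full queue: " + e);
        }
    }
}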
ArrayBlockingQueue
A bounded BlockingQueue backed by an array. This queue orders elements FIFO (first-in-first-out): elements are taken from the head and inserted at the tail.
This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to {@code true} grants threads access in FIFO order. Fairness generally decreases throughput but reduces variability and avoids starvation.
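A hedged usage sketch of a fair ArrayBlockingQueue in a simple producer-consumer pair (thread bodies and element counts are illustrative, not from the original):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FairQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // capacity 2, fair = true: blocked producers and consumers are served in FIFO order
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2, true);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put(i);                               // blocks while the queue is full
                    System.out.println("put " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    System.out.println("take " + queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}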
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if {@code capacity} is less than
     *         {@code c.size()}, or less than 1
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
        this(capacity, fair);
        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i; // putIndex --> tail
        } finally {
            lock.unlock();
        }
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); // Acquires the lock unless the current thread is interrupted.
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e); // insert
        } finally {
            lock.unlock();
        }
    }

    // insert the element
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
}
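Because take() acquires the lock with lockInterruptibly() and then parks in notEmpty.await(), a thread blocked on an empty queue can be woken by interruption. A small sketch (class name and timing are illustrative):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class InterruptTakeDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        Thread consumer = new Thread(() -> {
            try {
                queue.take();                       // queue is empty: blocks in notEmpty.await()
            } catch (InterruptedException e) {
                System.out.println("take() was interrupted while waiting");
            }
        });

        consumer.start();
        Thread.sleep(500);      // let the consumer block
        consumer.interrupt();   // wakes it up with an InterruptedException
        consumer.join();
    }
}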
LinkedBlockingQueue
FIFO: elements are taken from the head and inserted at the tail. Linked queues typically have higher throughput than array-based queues, but less predictable performance in most concurrent applications.
The optional capacity bound constructor argument serves as a way to prevent excessive queue expansion. The capacity, if unspecified, is equal to Integer.MAX_VALUE. Linked nodes are dynamically created upon each insertion unless this would bring the queue above capacity.
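For example (capacities are illustrative), the bounded and unbounded constructors behave as follows:

import java.util.concurrent.LinkedBlockingQueue;

public class LinkedQueueConstruction {
    public static void main(String[] args) {
        // Unbounded (capacity defaults to Integer.MAX_VALUE): put() effectively never blocks,
        // so an unchecked producer can exhaust memory.
        LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();

        // Bounded: put() blocks and offer() returns false once 1000 elements are queued.
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(1000);

        System.out.println(unbounded.remainingCapacity()); // 2147483647
        System.out.println(bounded.remainingCapacity());   // 1000
    }
}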
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /*
     * A variant of the "two lock queue" algorithm. The putLock gates
     * entry to put (and offer), and has an associated condition for waiting puts.
     * Similarly for the takeLock.
     */

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e)); // enqueue(node) => { last = last.next = node; }
                ++n;
            }
            count.set(n); // AtomicInteger count
        } finally {
            putLock.unlock();
        }
    }
    public void put(E e) throws InterruptedException {
        if (e == null)
            throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is not protected by lock.
             * This works because count can only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity.
             * Similarly for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
}
/**
 * Decrements the count of the latch, releasing all waiting threads if
 * the count reaches zero.
 *
 * <p>If the current count is greater than zero then it is decremented.
 * If the new count is zero then all waiting threads are re-enabled for
 * thread scheduling purposes.
 *
 * <p>If the current count equals zero then nothing happens.
 */
public void countDown() {
    sync.releaseShared(1); // private final Sync sync;
}

/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
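For context, a minimal CountDownLatch usage sketch (worker count and messages are illustrative): each worker calls countDown(), which delegates to sync.releaseShared(1), and the main thread's await() returns once the count reaches zero.

import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int workers = 3;                                    // illustrative worker count
        CountDownLatch latch = new CountDownLatch(workers);

        for (int i = 0; i < workers; i++) {
            final int id = i;
            new Thread(() -> {
                System.out.println("worker " + id + " done");
                latch.countDown();                          // releaseShared(1): decrements the count
            }).start();
        }

        latch.await();                                      // blocks until the count reaches zero
        System.out.println("all workers finished");
    }
}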