Skip to content

数据结构和算法之旅(四)- 队列

约 5431 字大约 18 分钟

数据结构queue(队列)

2017-02-13

以顺序的方式维护一组数据集合,在一端添加数据,从另一个端移除数据。

定义

计算机科学中,queue是以顺序的方式维护一组数据集合,在一端添加数据,从另一个端移除数据。习惯来说,添加的一端称为尾,移除的一端称为头,就如同生活中排队购买商品。

链表实现队列

下面以单向环形含哨兵链表方式来实现队列。

//首先定义队列的接口
package org.example.datastructure;

/**
 * 队列
 */
public interface Queue<E> {
    /**
     * 向队列尾部添加一个元素
     * @param val 待插入值
     * @return 插入成功返回true,否则返回false
     */
    boolean offer(E val);

    /**
     * 从队列头部取出一个元素,并移除
     * @return 如果队列非空,返回队列头部元素,否则返回null
     */
    E poll();

    /**
     * 从队列头部取出一个元素,但不移除
     * @return 如果队列非空,返回队列头部元素,否则返回null
     */
    E peek();

    /**
     * 判断队列是否为空
     * @return 空返回true,否则返回false
     */
    boolean isEmpty();


    /**
     * 判断队列是否已满
     * @return 已满返回true,否则返回false
     */
    boolean isFull();
}
package org.example.datastructure;

import java.util.Iterator;

/**
 * 基于单向环形链表实现的队列
 * <p>
 * 队列只需要操作头,尾部不需要操作,所以只需要一个指针即可
 *
 * @param <E>
 */

public class LinkedListQueue<E> implements Queue<E>, Iterable<E> {

    /**
     * 节点类
     */
    private static class Node<E> {
        E value;
        Node<E> next;

        public Node(E value, Node<E> next) {
            this.value = value;
            this.next = next;
        }
    }

    Node<E> head = new Node<>(null, null);
    Node<E> tail = head;
    private int size;// 节点数
    private int capacity = Integer.MAX_VALUE;// 队列容量

    {
	    //构成环
        tail.next = head;
    }

    public LinkedListQueue(int capacity) {
        this.capacity = capacity;
    }

    public LinkedListQueue() {
    }

    @Override
    public boolean offer(E val) {
        if (isFull()) return false;
        Node<E> added = new Node<>(val, head);
        tail.next = added;
        tail = added;//让新节点作为新的tail
        size++;
        return true;
    }

    @Override
    public E poll() {
        if (isEmpty()) return null;
        Node<E> first = head.next;
        head.next = first.next;
        if (first == tail) tail = head;
        size--;
        return first.value;
    }

    @Override
    public E peek() {
        if (isEmpty()) return null;
        return head.next.value;
    }

    @Override
    public boolean isEmpty() {
        return head == tail;
    }

    @Override
    public boolean isFull() {
        return size == capacity;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            Node<E> p = head.next;

            @Override
            public boolean hasNext() {
                return p != head;
            }

            @Override
            public E next() {
                E value = p.value;
                p = p.next;
                return value;
            }
        };
    }
}

环形数组实现队列

判断头指针和尾指针是否指向同一个就是判空。
当尾指针+1等于头指针就是判断满,(tail+1)%5 == head。

package org.example.datastructure;

import java.util.Iterator;

public class ArrayQueue1<E> implements Queue<E>, Iterable<E> {

    private E[] array;
    private int head;
    private int tail;
    private int size;
    private int capacity;

    @SuppressWarnings("all")
    public ArrayQueue1(int capacity) {
        array = (E[]) new Object[this.capacity + 1];
    }

    @Override
    public boolean offer(E val) {
        if (isFull()) return false;
        array[tail] = val;
        tail = (tail + 1) % array.length;
        return true;
    }

    @Override
    public E poll() {
        if (isEmpty()) return null;
        E val = array[head];
        head = (head + 1) % array.length;
        return val;
    }

    @Override
    public E peek() {
        if (isEmpty()) return null;
        return array[head];
    }

    @Override
    public boolean isEmpty() {
        return head == tail;
    }

    @Override
    public boolean isFull() {
        return (tail + 1) % array.length == head;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            int p = head;

            @Override
            public boolean hasNext() {
                return p != tail;
            }

            @Override
            public E next() {
                E e = array[p];
                p = (p + 1) % array.length;
                return e;
            }
        };
    }
}

双端队列

双端队列特点:俩端都可以添加、删除

基于双向环形链表实现

package org.example.datastructure;

/**
 * 双端队列
 * 两端都可以操作添加和删除
 * queue 普通队列
 * deque 双端队列   double-ended queue
 */
public interface Deque<E> {


    /**
     * 向头部添加元素
     *
     * @param e 待添加元素
     * @return 添加成功返回true
     */
    boolean offerFirst(E e);

    /**
     * 向尾部添加元素
     *
     * @param e 待添加元素
     * @return 添加成功返回true
     */
    boolean offerLast(E e);

    /**
     * 从队列头部删除元素
     *
     * @return 头部元素
     */
    E pollFirst();


    /**
     * 从尾部获取元素并删除
     *
     * @return 尾部元素
     */
    E pollLast();


    /**
     * 从头部获取元素但不删除
     *
     * @return 头部元素
     */
    E peekFirst();


    /**
     * 从尾部获取元素但不删除
     *
     * @return 尾部元素
     */
    E peekLast();


    /**
     * 队列是否为空
     *
     * @return 空返回true, 否则返回false
     */
    boolean isEmpty();

    /**
     * 队列是否已满
     *
     * @return 满返回true, 否则返回false
     */
    boolean isFull();
}
package org.example.datastructure;

import java.util.Iterator;

/**
 * 基于双向环形链表实现的双端队列
 * <p>
 * 注:双向和双端是两个概念,
 * 双向是指链表节点有两个指针,一个指向前一个节点,一个指向后一个节点;
 * 双端是指队列两端都可以操作添加和删除。
 * <p>
 * 为什么要用双向链表?
 * 因为双端队列需要操作尾端,所以需要尾指针,而单向链表只有头指针,所以需要双向链表。
 * <p>
 * 为什么要用环形链表?
 * 因为环形链表可以用一个哨兵,即充当头指针又充当尾指针,这样就可以省去一个指针。
 */
public class LinkedListDeque<E> implements Deque<E>, Iterable<E> {

    static class Node<E> {
        Node<E> prev;
        E value;
        Node<E> next;

        public Node(Node<E> prev, E value, Node<E> next) {
            this.prev = prev;
            this.value = value;
            this.next = next;
        }
    }

    int capacity = Integer.MAX_VALUE;
    int size;
    Node<E> sentinel = new Node<>(null, null, null);//哨兵

    public LinkedListDeque(int capacity) {
        this.capacity = capacity;
        //初始化哨兵
        sentinel.prev = sentinel;
        sentinel.next = sentinel;
    }

    /**
     * a added b
     * 向头部添加
     * a就是哨兵,b就是哨兵的next
     */
    @Override
    public boolean offerFirst(E e) {
        if (isFull()) return false;
        Node<E> a = sentinel;
        Node<E> b = sentinel.next;
        Node<E> added = new Node<>(a, e, b);
        a.next = added;
        b.prev = added;
        size++;
        return true;
    }

    /**
     * a added b
     * 向尾部添加
     * b就是哨兵,a就是哨兵的prev
     */
    @Override
    public boolean offerLast(E e) {
        if (isFull()) return false;
        Node<E> a = sentinel.prev;
        Node<E> b = sentinel;
        Node<E> added = new Node<>(a, e, b);
        a.next = added;
        b.prev = added;
        size++;
        return true;
    }

    /**
     * a removed b
     * 移除头部
     * a就是哨兵,b就是哨兵的next
     */
    @Override
    public E pollFirst() {
        if (isEmpty()) return null;
        Node<E> a = sentinel;
        Node<E> removed = sentinel.next;
        Node<E> b = removed.next;
        a.next = b;
        b.prev = a;
        size--;
        return removed.value;
    }

    /**
     * a removed b
     * 移除尾部
     * b就是哨兵,a就是哨兵的prev
     */
    @Override
    public E pollLast() {
        if (isEmpty()) return null;
        Node<E> b = sentinel;
        Node<E> removed = sentinel.prev;
        Node<E> a = removed.prev;
        a.next = b;
        b.prev = a;
        size--;
        return removed.value;
    }

    @Override
    public E peekFirst() {
        if (isEmpty()) return null;
        return sentinel.next.value;
    }

    @Override
    public E peekLast() {
        if (isEmpty()) return null;
        return sentinel.prev.value;
    }

    @Override
    public boolean isEmpty() {
        return size == 0;
    }

    @Override
    public boolean isFull() {
        return size == capacity;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            Node<E> p = sentinel.next;

            @Override
            public boolean hasNext() {
                return p != sentinel;
            }

            @Override
            public E next() {
                E value = p.value;
                p = p.next;
                return value;
            }
        };
    }

}

基于数组实现

package org.example.datastructure;

import java.util.Iterator;

/**
 * 基于循环数组实现的双端队列
 * 循环数组动的是头尾指针,不动的是数组元素
 * <p>
 * tail 停下来的位置不存储,会浪费一个位置
 * <p>
 * <p>
 * h - head
 * tail - tail
 * <p>
 * h
 * t
 * 0  1  2  3
 * a  b     c
 * offerLast(a)     先添加元素 tail++
 * offerLast(b)
 * offerFirst(c)    先head-- 再添加元素
 * <p>
 * pollFirst()      先取元素 head++
 * pollLast()       先tail-- 再取元素
 * <p>
 * head==tail 空
 * head~tail == 数组长度-1 满
 */
public class ArrayDeque1<E> implements Deque<E>, Iterable<E> {

    E[] array;
    int head;
    int tail;

    @SuppressWarnings("all")
    public ArrayDeque1(int capacity) {
        array = (E[]) new Object[capacity + 1];
    }

    static int inc(int i, int length) {
        if (i + 1 >= length)
            return 0;
        return i + 1;
    }

    static int dec(int i, int length) {
        if (i - 1 < 0)
            return length - 1;
        return i - 1;
    }

    @Override
    public boolean offerFirst(E e) {
        if (isFull()) return false;
        head = dec(head, array.length);
        array[head] = e;
        return true;
    }

    @Override
    public boolean offerLast(E e) {
        if (isFull()) return false;
        array[tail] = e;
        tail = inc(tail, array.length);
        return true;
    }

    @Override
    public E pollFirst() {
        if (isEmpty()) return null;
        E e = array[head];
        array[head] = null;//help gc
        head = inc(head, array.length);
        return e;
    }

    @Override
    public E pollLast() {
        if (isEmpty()) return null;
        tail = dec(tail, array.length);
        E e = array[tail];
        array[tail] = null;//help gc
        return e;
    }

    @Override
    public E peekFirst() {
        if (isEmpty())
            return null;
        return array[head];
    }

    @Override
    public E peekLast() {
        if (isEmpty())
            return null;
        return array[dec(tail, array.length)];
    }

    @Override
    public boolean isEmpty() {
        return head == tail;
    }

    @Override
    public boolean isFull() {
        if (tail > head) {
            return tail - head == array.length - 1;
        } else if (tail < head) {
            return head - tail == 1;
        } else {
            return false;
        }
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            int p = head;

            @Override
            public boolean hasNext() {
                return p != tail;
            }

            @Override
            public E next() {
                E e = array[p];
                p = inc(p, array.length);
                return e;
            }
        };
    }
}

优先级队列

优先级队列特点:一端进,另一端出,但出的时候按照优先级出!

基于无序数组实现

无序数组实现的优势:
入队就直接加入对尾。
出队就遍历找到最大值,并删除,然后把后续的往前顺延。

package priorityqueue;  
  
/**  
 * 队列接口
 */  
public interface Queue<E> {  
    /**  
     * 向队列尾部添加一个元素  
     * @param val 待插入值  
     * @return 插入成功返回true,否则返回false  
     */    boolean offer(E val);  
  
    /**  
     * 从队列头部取出一个元素,并移除  
     * @return 如果队列非空,返回队列头部元素,否则返回null  
     */    E poll();  
  
    /**  
     * 从队列头部取出一个元素,但不移除  
     * @return 如果队列非空,返回队列头部元素,否则返回null  
     */    E peek();  
  
    /**  
     * 判断队列是否为空  
     * @return 空返回true,否则返回false  
     */    boolean isEmpty();  
  
  
    /**  
     * 判断队列是否已满  
     * @return 已满返回true,否则返回false  
     */    boolean isFull();  
}
package priorityqueue;  
/**
* 优先级接口
*/
public interface Priority {  
    /**  
     * 返回对象的优先级,约定数字越大,优先级越高  
     * @return  
     */  
    int priority();  
}
package priorityqueue;  
  
/**  
 * 基于无序数组实现  
 * 入队:把元素加到数组末尾  
 * 出队:找到最大优先级元素,删除,相当于进行了一次选择排序  
 */  
public class PriorityQueue<E extends Priority> implements Queue<E> {  
  
    Priority[] array;  
    int size;  
  
  
    public PriorityQueue(int capacity) {  
        array = new Priority[capacity];  
    }  
  
    //O(1)  
    @Override  
    public boolean offer(E e) {  
        if (isFull()) return false;  
        array[size++] = e;  
        return true;  
    }  
  
    //返回优先级最高的索引值  
    private int selectMax() {  
        int max = 0;  
        for (int i = 0; i < size; i++) {  
            if (array[i].priority() > array[max].priority()) {  
                max = i;  
            }  
        }  
        return max;  
    }  
  
    private void remove(int index) {  
        if (index < size - 1) {  
            //移动  
            System.arraycopy(array, index + 1, array, index, size - 1 - index);  
        }  
        array[--size]=null;//help GC 
    }  
  
    //O(n)  
    @Override  
    public E poll() {  
        if (isEmpty()) return null;  
        int max = selectMax();  
        E e = (E) array[max];  
        remove(max);  
        return e;  
    }  
  
    @Override  
    public E peek() {  
        if (isEmpty()) return null;  
        int max = selectMax();  
        return (E) array[max];  
    }  
  
    @Override  
    public boolean isEmpty() {  
        return size == 0;  
    }  
  
    @Override  
    public boolean isFull() {  
        return size == array.length;  
    }  
}

基于有序数组实现

package priorityqueue;  
  
/**  
 * 基于有序数组实现  
 * 优先级最高的在队尾,优点是出队操作变的简单,直接size--就行。  
 * 缺点是,入队操作需要做一次排序,插入到正确位置。相当于一次插入排序。  
 */  
public class PriorityQueue2<E extends Priority> implements Queue<E> {  
  
    Priority[] array;  
    int size;  
  
  
    public PriorityQueue2(int capacity) {  
        array = new Priority[capacity];  
    }  
  
    //O(n)  
    @Override  
    public boolean offer(E e) {  
        if (isFull()) return false;  
//        array[size++] = e;  
        insert(e);  
        size++;  
        return true;  
    }  
  
    /**  
     * 插入排序  
     */  
    private void insert(E e) {  
        int i = size - 1;  
        while (i >= 0 && array[i].priority() > e.priority()) {  
            array[i + 1] = array[i];  
            i--;  
        }  
        array[i + 1] = e;  
    }  
  
    //O(1)  
    @Override  
    public E poll() {  
        if (isEmpty()) return null;  
//        int max = selectMax();  
//        E e = (E) array[max];  
//        remove(max);  
        E e = (E) array[size - 1];  
        array[--size] = null;//help GC  
        return e;  
    }  
	//O(1)  
    @Override  
    public E peek() {  
        if (isEmpty()) return null;  
//        int max = selectMax();  
//        return (E) array[max];  
        return (E) array[size - 1];  
    }  
  
    @Override  
    public boolean isEmpty() {  
        return size == 0;  
    }  
  
    @Override  
    public boolean isFull() {  
        return size == array.length;  
    }  
}

基于无序和基于有序数组实现俩种的差别在于:
无序:入队O(1),出队O(n)
有序:入队O(n),出队O(1)

BUT,这俩种都不是最优的实现方式,接下来看看更优解。

基于堆实现

堆是一种基于树的数据结构,通常用完全二叉树实现。堆的特性如下:

  • 在大顶堆中,任意节点C与它的父节点P符合P.value >= C.value。
  • 在小顶堆中,任意节点C与它的父节点P符合P.vlaue <= C.value。
  • 最顶层节点(没有父亲)称之为root根节点。

二叉树,简单理解就是一个节点最多有俩个子节点。

完全二叉树,就是说树的每一层除了最后一层都是填满的(都是俩个子节点)。 另外,向完全二叉树加入节点时,必须从左开始添加。

它是非线性的数据结构,但是存储的时候可以使用线性的数组结构存储。

这种数据结构具备一些特征,前辈总结了一些规律公式,可以在已知父节点的时候快速找到子节点,或者已知子节点的时候快速找到父节点。
堆存在数组中有俩种方式,一种是从索引0直接存根,另一种是索引0空着从1开始存根节点。
- 如果从索引0开始存储节点数据
1. 节点 i 的父节点为 (i-1)/2,当i>0时。
2. 节点 i 的左子节点为 2i+1 ,右子节点为 2i+2,当然它们得小于size。
- 如果从索引1开始存储节点数据
1. 节点i的父节点为 i/2 ,当 i>1 时。
2. 节点 i 的左子节点为 2i ,右子节点为 2i+1 ,同样得小于size。

package priorityqueue;  
  
/**  
 * 基于大顶堆实现  
 * <p>  
 * 入队  
 * 1.入堆新元素,加入到数组末尾  
 * 2.不断比较新元素与它父节点优先级  
 * (上浮)  
 * - 如果父节点优先级低,则向下移动,并找到下一个parent  
 * - 直至父节点优先级更高或child==0为止。  
 * <p>  
 * 出队  
 * 1.交换堆顶和尾部元素,让尾部出队  
 * 2.(下潜)  
 * - 从堆顶开始,将父元素与俩个孩子较大者交换  
 * - 直到父元素大于俩个孩子,或者没有孩子为止。  
 */  
public class PriorityQueue3<E extends Priority> implements Queue<E> {  
  
    Priority[] array;  
    int size;  
  
  
    public PriorityQueue3(int capacity) {  
        array = new Priority[capacity];  
    }  
  
    //O(logn)  
    @Override  
    public boolean offer(E offered) {  
        if (isFull()) return false;  
        int child = size++;//确定索引,先不同填充值,可能要调整堆  
        int parent = (child - 1) / 2;//找到父节点  
        while (child > 0 && offered.priority() > array[parent].priority()) {  
            array[child] = array[parent];//下移  
            child = parent;  
            parent = (child - 1) / 2;  
        }  
        //循环结束找到目标待插入节点  
        array[child] = offered;  
        return true;  
    }  
  
    /**  
     * 交换  
     */  
    private void swap(int i, int j) {  
        Priority t = array[i];  
        array[i] = array[j];  
        array[j] = t;  
    }  
  
    /**  
     * O(logn)     * 下潜  
     * 因为可以通过公式,已知父节点可以得到子节点,所以参数一个即可  
     */  
    private void down(int parent) {  
        int left = 2 * parent + 1;  
        int right = left + 1;//等价于 2*parent+2  
        int max = parent;//假设父元素优先级更高  
        if (left < size && array[left].priority() > array[max].priority()) {//left<size因为通过公式得到的不一定存在  
            max = left;  
        }  
        if (right < size && array[right].priority() > array[max].priority()) {  
            max = right;  
        }  
        if (max != parent) {  
            //说明被更新了  
            swap(max, parent);  
            //递归调用  
            down(max);  
        }  
    }  
  
    @Override  
    public E poll() {  
        if (isEmpty()) return null;  
        swap(0, size - 1);//交换堆顶和尾元素  
        size--;//移除数组尾部元素,就是把size减1即可  
        Priority e = array[size];  
        array[size] = null;//help GC  
        //下潜  
        down(0);  
        return (E) e;  
    }  
  
    //O(1)  
    @Override  
    public E peek() {  
        if (isEmpty()) return null;  
        return (E) array[0];  
    }  
  
    @Override  
    public boolean isEmpty() {  
        return size == 0;  
    }  
  
    @Override  
    public boolean isFull() {  
        return size == array.length;  
    }  
}

三种实现区别
无序:入队O(1),出队O(n)
有序:入队O(n),出队O(1)
堆:入队O(logn),出队O(logn) 推荐

阻塞队列

目前队列存在的问题 1.很多场景要求分离生产者、消费者俩个角色,它们由不同的线程来担当,而之前的实现并没有考虑线程安全问题。 2.队列为空,那么在之前的实现里会返回null,如果就是硬要拿到一个元素呢?只能不能循环尝试。 3.队列为满,那么在之前的实现里会返回false,如果就是硬要塞入一个元素呢?只能不断循环尝试。

单锁实现

package blockqueue;  
  
/**  
 * 目前队列存在的问题  
 * > 1.很多场景要求分离生产者、消费者俩个角色,它们由不同的线程来担当,而之前的实现并没有考虑线程安全问题。  
 * > 2.队列为空,那么在之前的实现里会返回null,如果就是硬要拿到一个元素呢?只能不能循环尝试。  
 * > 3.队列为满,那么在之前的实现里会返回false,如果就是硬要塞入一个元素呢?只能不断循环尝试。  
 * <p>  
 * 解决方法:  
 * 1.用锁保证线程安全  
 * 2.用条件变量让poll或offer线程进入等待状态  
 */  
  
public interface BlockQueue<E> {//阻塞队列  
  
    /**  
     * 入队不再需要返回值,因为会进入阻塞状态  
     */  
    void offer(E e) throws InterruptedException;  
  
    boolean offer(E e, long timeout) throws InterruptedException;  
  
    E poll() throws InterruptedException;  
}
package blockqueue;  
  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.ReentrantLock;  
  
/**  
 * 单锁实现  
 *  
 * @param <E>  
 */  
public class BlockQueue1<E> implements BlockQueue<E> {  
  
    private final E[] array;  
    private int head;  
    private int tail;  
    private int size;  
  
    private ReentrantLock lock = new ReentrantLock();  
    private Condition headWaits = lock.newCondition();//配合poll方法使用 因为队列先入先出,出队的时候就用head表示  
    private Condition tailWaits = lock.newCondition();//配合offer方法使用 入队  
  
    private boolean isEmpty() {  
        return size == 0;  
    }  
  
    private boolean isFull() {  
        return size == array.length;  
    }  
  
    public BlockQueue1(int capacity) {  
        array = (E[]) new Object[capacity];  
    }  
  
    @Override  
    public void offer(E e) throws InterruptedException {  
        lock.lockInterruptibly();  
        try {  
            while (isFull()) {  
                //不能用if,防止虚假唤醒  
                tailWaits.await();//进入阻塞状态  
            }  
            //不满,可以添加  
            array[tail] = e;  
            if (++tail == array.length) {  
                tail = 0;  
            }  
            size++;  
            //此时,poll可能在等待队列非空要取数据,所以要去唤醒  
            headWaits.signal();  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /**  
     * 可以设置等待时间,上面的offer会一直等  
     */  
    @Override  
    public boolean offer(E e, long timeout) throws InterruptedException {  
        lock.lockInterruptibly();  
        try {  
            long t = TimeUnit.MILLISECONDS.toNanos(timeout);//传过来毫秒转换一下  
            while (isFull()) {  
                if (t <= 0) {  
                    return false;  
                }  
                t = tailWaits.awaitNanos(t);//最多等待多少纳秒 1s 4s 返回值代表剩余时间  
            }  
            //不满,可以添加  
            array[tail] = e;  
            if (++tail == array.length) {  
                tail = 0;  
            }  
            size++;  
            //此时,poll可能在等待队列非空要取数据,所以要去唤醒  
            headWaits.signal();  
            return true;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    @Override  
    public E poll() throws InterruptedException {  
        lock.lockInterruptibly();  
        try {  
            while (isEmpty()) {  
                headWaits.await();  
            }  
            E e = array[head];  
            array[head] = null;//help GC  
            if (++head == array.length) {  
                head = 0;  
            }  
            size--;  
            //此时,offer可能在等待队列不满要取数据,所以要去唤醒  
            tailWaits.signal();  
            return e;  
        } finally {  
            lock.unlock();  
        }  
    }  
}

双锁实现

上面单锁实现加锁是为了多线程下产生指令交错,结果达不到预期,所以加了一一把锁来实现。 加锁来保证多行代码的原子性,保证左边执行完,右边再执行。这是为什么加锁。

但是加锁后,会产生新的问题,offer和poll都需要加锁,而且用到同一把锁。 offer和poll线程是互相影响的,offer在加锁状态下,poll加不上锁会处于阻塞状态,得等offer执行完了解锁,poll才能获得锁。读写不能同时进行!这样是不合理的。一个操作头,一个操作尾,按理来讲应该互不干扰。所以可以用双锁来实现。

package blockqueue;  
  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.atomic.AtomicInteger;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.ReentrantLock;  
  
/**  
 * 双锁实现  
 *  
 */public class BlockQueue2<E> implements BlockQueue<E> {  
  
    private final E[] array;  
    private int head;  
    private int tail;  
    //    private int size;  
    private AtomicInteger size;  
  
    //    private ReentrantLock lock = new ReentrantLock();  
    private ReentrantLock tailLock = new ReentrantLock();//给offer用  
    private Condition tailWaits = tailLock.newCondition();  
  
    private ReentrantLock headLock = new ReentrantLock();//给poll方法用  
    private Condition headWaits = headLock.newCondition();  
  
//    private Condition headWaits = lock.newCondition();//配合poll方法使用 因为队列先入先出,出队的时候就用head表示  
//    private Condition tailWaits = lock.newCondition();//配合offer方法使用 入队  
  
    private boolean isEmpty() {  
        return size.get() == 0;  
    }  
  
    private boolean isFull() {  
        return size.get() == array.length;  
    }  
  
    public BlockQueue2(int capacity) {  
        array = (E[]) new Object[capacity];  
    }  
  
    @Override  
    public void offer(E e) throws InterruptedException {  
        int c;//添加前元素个数 c=0的时候进行加锁和唤醒操作  
        tailLock.lockInterruptibly();  
        try {  
            while (isFull()) {  
                //不能用if,防止虚假唤醒  
                tailWaits.await();//进入阻塞状态  
            }  
            //不满,可以添加  
            array[tail] = e;  
            if (++tail == array.length) {  
                tail = 0;  
            }  
            //修改size  
            /*             * 1.读取成员变量size的值  
             * 2.自增  
             * 3.结果写会成员变量size  
             */            c = size.getAndIncrement();//size++ 但能保证原子性  
            if (c + 1 < array.length) {  
                tailWaits.signal();  
            }  
  
        } finally {  
            tailLock.unlock();  
        }  
        if (c > 0) {  
            //解决死锁的方法很简单,不要写成嵌套的方式即可  
            headLock.lock();  
            try {  
                headWaits.signal();//signal和await必须搭配对应的锁  
            } finally {  
                headLock.unlock();  
            }  
        }  
    }  
  
    /**  
     * 可以设置等待时间,上面的offer会一直等  
     */  
    @Override  
    public boolean offer(E e, long timeout) throws InterruptedException {  
        int c;//添加前元素个数 c=0的时候进行加锁和唤醒操作  
        tailLock.lockInterruptibly();  
        try {  
            long t = TimeUnit.MILLISECONDS.toNanos(timeout);//传过来毫秒转换一下  
            while (isFull()) {  
                if (t <= 0) {  
                    return false;  
                }  
                t = tailWaits.awaitNanos(t);//最多等待多少纳秒 1s 4s 返回值代表剩余时间  
            }  
            //不满,可以添加  
            array[tail] = e;  
            if (++tail == array.length) {  
                tail = 0;  
            }  
            //修改size  
            /*             * 1.读取成员变量size的值  
             * 2.自增  
             * 3.结果写会成员变量size  
             *///            size++;  
            c = size.getAndIncrement();//size++ 但能保证原子性  
            //此时,poll可能在等待队列非空要取数据,所以要去唤醒  
//            headWaits.signal();  
            if (c + 1 < array.length) {//如果不是从满到不满,自己唤醒剩余的  
                tailWaits.signal();  
            }  
  
        } finally {  
            tailLock.unlock();  
        }  
  
        if (c == 0) {  
            //唤醒等待的poll线程  
            headLock.lock();// offer_3 offer_3  
            try {  
                headWaits.signal();// offer_1 让offer_1唤醒一次,offer_2和3不做唤醒操作,让poll自己内部唤醒,减少锁的使用  
            } finally {  
                headLock.unlock();  
            }  
        }  
        return true;  
    }  
  
    @Override  
    public E poll() throws InterruptedException {  
        E e;  
        int c;//取走前的元素个数  
        headLock.lockInterruptibly();  
        try {  
            while (isEmpty()) {  
                headWaits.await();// poll_1 poll_2 poll_3  .   poll_1被唤醒后,自己唤醒poll2和3,这种思想叫做级联通知。  
            }  
            e = array[head];  
            array[head] = null;//help GC  
            if (++head == array.length) {  
                head = 0;  
            }  
            //修改size  
            /*             * 1.读取成员变量size的值  
             * 2.自增  
             * 3.结果写会成员变量size  
             *///            size--;  
            c = size.getAndDecrement();  
            if (c > 1) {//如果还有剩余元素,级联通知去唤醒剩下的  
                headWaits.signal();  
            }  
        } finally {  
            headLock.unlock();  
        }  
        //但是这样写,可能会产生死锁!!offer和poll中tailLock和headLock交叉引用了!  
        //所以要改成平级的,不要嵌套的。  
        //唤醒等待的offer线程  
        if (c == array.length) {//从队列满到不满时,由poll唤醒等待不满offer线程  
            tailLock.lock();  
            try {  
                tailWaits.signal();//signal和await必须搭配对应的锁  
            } finally {  
                tailLock.unlock();  
            }  
        }  
  
        return e;  
    }  
}

其实这也是阻塞队列的源码,我们自己实现之后,就会更加清晰。双锁实现的阻塞队列的设计思想是值得反复去斟酌学习的。