Java 特殊隊列

2018-02-20 03:01 更新

Java集合教程 - Java特殊隊列


阻塞隊列

阻塞隊列通過添加兩組方法來擴展隊列:

  • 一組方法無限期地阻塞
  • 另一組方法允許您指定要阻止的時間段。

BlockingQueue 接口的實例表示阻塞隊列。 BlockingQueue 接口繼承自 Queue 接口。

put() offer()方法在阻塞隊列的尾部添加一個元素。如果阻塞隊列已滿,則put()方法將無限期阻塞,直到隊列中的空間可用。offer()方法允許您指定等待空間可用的時間段。 如果指定的元素添加成功,則返回true; 否則為假。

take()和poll()方法檢索和刪除阻塞隊列的頭。如果阻塞隊列為空,take()方法將無限期阻塞。poll()方法允許您指定在阻塞隊列為空時要等待的時間段; 如果在元素可用之前過去了指定的時間,則返回null。

來自 BlockingQueue Queue 接口的方法就像使用 Queue 。

BlockingQueue 被設計為線程安全的并且可以使用在生產者/消費者的情況下。

阻塞隊列不允許空元素和可以是有界的或無界的。

BlockingQueue 中的 remainingCapacity()返回可以添加到阻止隊列中而不阻塞的元素數(shù)。

BlockingQueue 可以控制多個線程被阻塞時的公平性。 如果阻塞隊列是公平的,它可以選擇最長的等待線程來執(zhí)行操作。如果阻塞隊列不公平,則不指定選擇的順序。

BlockingQueue 接口及其所有實現(xiàn)類都在 java.util.concurrent 包中。 以下是 BlockingQueue 接口的實現(xiàn)類:

由數(shù)組支持的 ArrayBlockingQueue BlockingQueue 的有界實現(xiàn)類。 我們可以在其構造函數(shù)中指定阻塞隊列的公平性。 默認情況下,它不公平。

LinkedBlockingQueue 可以用作有界或無界阻塞隊列。 它不允許為阻塞隊列指定公平規(guī)則。

PriorityBlockingQueue BlockingQueue 的無界實現(xiàn)類。 它的工作方式與 PriortyQueue 相同,用于排序阻塞隊列中的元素,并將阻塞特性添加到 PriorityQueue 中。

SynchronousQueue 實現(xiàn) BlockingQueue ,沒有任何容量。 put操作等待take操作以獲取元素。 它可以在兩個線程之間進行握手,并在兩個線程之間交換對象。 它的isEmpty()方法總是返回true。

DelayQueue是BlockingQueue的無界實現(xiàn)類。它保持一個元素,直到該元素經過指定的延遲。 如果有超過一個元素的延遲已經過去,那么其延遲最早傳遞的元素將被放置在隊列的頭部。


例子

以下代碼顯示了如何在生產者/消費者應用程序中使用阻塞隊列。

import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BQProducer extends Thread {
  private final BlockingQueue<String> queue;
  private final String name;
  public BQProducer(BlockingQueue<String> queue, String name) {
    this.queue = queue;
    this.name = name;
  }
  @Override
  public void run() {
    while (true) {
      try {
        this.queue.put(UUID.randomUUID().toString());
        Thread.sleep(4000);
      }
      catch (InterruptedException e) {
        e.printStackTrace();
        break;
      }
    }
  }
}
class BQConsumer extends Thread {
  private final BlockingQueue<String> queue;
  private final String name;
  public BQConsumer(BlockingQueue<String> queue, String name) {
    this.queue = queue;
    this.name = name;
  }

  @Override
  public void run() {
    while (true) {
      try {
        String str = this.queue.take();
        System.out.println(name + "  took: " + str);
        Thread.sleep(3000);
      } catch (InterruptedException e) {
        e.printStackTrace();
        break;
      }
    }
  }
}

public class Main {
  public static void main(String[] args) {
    int capacity = 5;
    boolean fair = true;
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity, fair);

    new BQProducer(queue, "Producer1").start();
    new BQProducer(queue, "Producer2").start();
    new BQProducer(queue, "Producer3").start();
    new BQConsumer(queue, "Consumer1").start();
    new BQConsumer(queue, "Consumer2").start();
  }
}

上面的代碼生成以下結果。


延遲隊列

DelayQueue 實現(xiàn) BlockingQueue 接口。 DelayQueue 中的元素必須保留一定的時間。

DelayQueue 使用一個名為 Delayed 的接口來獲取延遲時間。

該接口在java.util.concurrent包中。 其聲明如下:

public interface  Delayed  extends Comparable<Delayed>  {
   long  getDelay(TimeUnit timeUnit);
}

它擴展了 Comparable 接口,它的 compareTo()方法接受一個Delayed對象。

DelayQueue 調用每個元素的 getDelay()方法來獲取元素必須保留多長時間。 DelayQueue 將傳遞 TimeUnit 到此方法。

getDelay()方法返回一個零或一個負數(shù)時,是元素離開隊列的時間。

隊列通過調用元素的 compareTo()方法確定要彈出的那個。 此方法確定要從隊列中刪除的過期元素的優(yōu)先級。

以下代碼顯示了如何使用DelayQueue。

import static java.time.temporal.ChronoUnit.MILLIS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.time.Instant;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedJob implements Delayed {
  private Instant scheduledTime;
  String jobName;

  public DelayedJob(String jobName, Instant scheduledTime) {
    this.scheduledTime = scheduledTime;
    this.jobName = jobName;
  }

  @Override
  public long getDelay(TimeUnit unit) {
    long delay = MILLIS.between(Instant.now(), scheduledTime);
    long returnValue = unit.convert(delay, MILLISECONDS);
    return returnValue;
  }

  @Override
  public int compareTo(Delayed job) {
    long currentJobDelay = this.getDelay(MILLISECONDS);
    long jobDelay = job.getDelay(MILLISECONDS);

    int diff = 0;
    if (currentJobDelay > jobDelay) {
      diff = 1;
    } else if (currentJobDelay < jobDelay) {
      diff = -1;
    }
    return diff;
  }

  @Override
  public String toString() {
    String str = this.jobName + ", " + "Scheduled Time:  "
        + this.scheduledTime;
    return str;
  }
}
public class Main {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<DelayedJob> queue = new DelayQueue<>();
    Instant now = Instant.now();

    queue.put(new DelayedJob("A", now.plusSeconds(9)));
    queue.put(new DelayedJob("B", now.plusSeconds(3)));
    queue.put(new DelayedJob("C", now.plusSeconds(6)));
    queue.put(new DelayedJob("D", now.plusSeconds(1)));

    while (queue.size() > 0) {
      System.out.println("started...");
      DelayedJob job = queue.take();
      System.out.println("Job: " + job);
    }
    System.out.println("Finished.");
  }
}

上面的代碼生成以下結果。

傳輸隊列

傳輸隊列擴展阻塞隊列。

生產者使用 TransferQueue transfer(E element)方法將元素傳遞給消費者。

當生產者調用傳遞(E元素)方法時,它等待直到消費者獲取其元素。 tryTransfer()方法提供了該方法的非阻塞和超時版本。

getWaitingConsumerCount()方法返回等待消費者的數(shù)量。

如果有一個等待消費者, hasWaitingConsumer()方法返回true; 否則,返回false。 LinkedTransferQueue TransferQueue 接口的實現(xiàn)類。 它提供了一個無界的 TransferQueue

以下代碼顯示如何使用 TransferQueue 。

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;

class TQProducer extends Thread {
  private String name;
  private TransferQueue<Integer> tQueue;
  private AtomicInteger sequence;
  public TQProducer(String name, TransferQueue<Integer> tQueue,
      AtomicInteger sequence) {
    this.name = name;
    this.tQueue = tQueue;
    this.sequence = sequence;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(4000);
        int nextNum = this.sequence.incrementAndGet();
        if (nextNum % 2 == 0) {
          System.out.format("%s:  Enqueuing: %d%n", name, nextNum);
          tQueue.put(nextNum); // Enqueue
        } else {
          System.out.format("%s: Handing  off: %d%n", name, nextNum);
          System.out.format("%s: has  a  waiting  consumer: %b%n", name,
              tQueue.hasWaitingConsumer());
          tQueue.transfer(nextNum); // A hand off
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}

class TQConsumer extends Thread {
  private final String name;
  private final TransferQueue<Integer> tQueue;

  public TQConsumer(String name, TransferQueue<Integer> tQueue) {
    this.name = name;
    this.tQueue = tQueue;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(3000);

        int item = tQueue.take();
        System.out.format("%s removed:  %d%n", name, item);

      }
      catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}

public class Main {
  public static void main(String[] args) {
    final TransferQueue<Integer> tQueue = new LinkedTransferQueue<>();
    final AtomicInteger sequence = new AtomicInteger();

    for (int i = 0; i < 5; i++) {
      try {
        tQueue.put(sequence.incrementAndGet());
        System.out.println("Initial queue: " + tQueue);

        new TQProducer("Producer-1", tQueue, sequence).start();
        new TQConsumer("Consumer-1", tQueue).start();

      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

  }
}

上面的代碼生成以下結果。

以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號