Java 实现生产者消费者模式
使用阻塞队列实现
package com.zhf.study.test;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
/**
 * Producer/consumer demo built on a bounded {@link BlockingQueue}.
 *
 * <p>Ten producer threads each put ten integers into a queue of capacity 10 while five consumer
 * threads take and print them. A {@link CountDownLatch} sized to the total number of items lets
 * the test wait until every produced item has been consumed; the consumers are then interrupted
 * so that no thread outlives the test.
 */
public class ProducerConsumerTest {
    private static final int PRODUCER_COUNT = 10;
    private static final int CONSUMER_COUNT = 5;
    private static final int ITEMS_PER_PRODUCER = 10;

    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
    /** Counts down once per consumed item; reaches zero when all items have been consumed. */
    private final CountDownLatch countDownLatch =
            new CountDownLatch(PRODUCER_COUNT * ITEMS_PER_PRODUCER);

    /** Puts the integers {@code 0..ITEMS_PER_PRODUCER-1} into the shared queue, blocking when it is full. */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
                try {
                    queue.put(i);
                } catch (InterruptedException e) {
                    // Nobody is expected to interrupt a producer: keep the flag and fail loudly.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /** Takes items from the shared queue and prints them until the thread is interrupted. */
    class Consume implements Runnable {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    System.out.println("consume" + queue.take());
                } catch (InterruptedException e) {
                    // Interruption is the shutdown signal sent by the test once the latch opens.
                    Thread.currentThread().interrupt();
                    return;
                }
                countDownLatch.countDown();
            }
        }
    }

    /**
     * Starts the producers and consumers, waits until every item has been consumed, then stops
     * the consumers.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            new Thread(new Producer()).start();
        }

        Thread[] consumers = new Thread[CONSUMER_COUNT];
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            consumers[i] = new Thread(new Consume());
            consumers[i].start();
        }

        try {
            countDownLatch.await();
        } finally {
            // Consumers would otherwise block in take() forever once the queue drains.
            for (Thread consumer : consumers) {
                consumer.interrupt();
            }
        }
        for (Thread consumer : consumers) {
            consumer.join();
        }
    }
}
使用线程通信（wait/notify）实现
package com.zhf.study.test;
import org.junit.jupiter.api.Test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
/**
 * Producer/consumer demo built on intrinsic locking with {@code wait()}/{@code notifyAll()}.
 *
 * <p>A single counter stands in for the buffer: the producer increments it up to
 * {@link #CAPACITY}, the consumer decrements it down to zero. All access to {@code count} is
 * guarded by {@code lock}.
 */
public class ProducerConsumerTest {
    private static final int CAPACITY = 10;

    /** Number of items currently "in the buffer"; guarded by {@code lock}. */
    private int count = 0;
    private final Object lock = new Object();

    /**
     * Runs one producer and one consumer for 100 seconds, then interrupts both and waits for them
     * to finish.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping or joining
     */
    @Test
    public void test2() throws InterruptedException {
        Thread producer = new Thread(() -> {
            try {
                while (true) {
                    synchronized (lock) {
                        // Re-check in a loop: wait() may return spuriously or after another
                        // thread has already changed the state again.
                        while (count >= CAPACITY) {
                            lock.wait();
                        }
                        count++;
                        System.out.println("produce:" + count);
                        lock.notifyAll();
                    }
                    // Simulate work outside the monitor so the consumer is not blocked meanwhile.
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Interruption is the shutdown signal; keep the flag and let the thread end.
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    synchronized (lock) {
                        while (count <= 0) {
                            lock.wait();
                        }
                        System.out.println("consume:" + count);
                        count--;
                        lock.notifyAll();
                    }
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            Thread.sleep(100000);
        } finally {
            // Both workers loop forever; stop them so they do not outlive the test.
            producer.interrupt();
            consumer.interrupt();
        }
        producer.join();
        consumer.join();
    }
}
使用 Condition 实现
package com.zhf.study.test;
import org.junit.jupiter.api.Test;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Producer/consumer demo built on a {@link ReentrantLock} with two {@link Condition}s.
 *
 * <p>Ten producer threads each add ten integers to a queue bounded at {@code capacity}, while
 * five consumer threads poll and print them. Producers wait on {@code notFull} while the queue
 * is at capacity; consumers wait on {@code notEmpty} while it is empty. A {@link CountDownLatch}
 * sized to the total number of items lets the test wait until everything has been consumed.
 */
public class ProducerConsumerTest {
    private static final int PRODUCER_COUNT = 10;
    private static final int CONSUMER_COUNT = 5;
    private static final int ITEMS_PER_PRODUCER = 10;

    private final int capacity = 10;
    /** Shared buffer; every access happens while holding {@code lock}. */
    private final Queue<Integer> queue = new LinkedList<>();
    private final ReentrantLock lock = new ReentrantLock();
    /** Signalled by consumers after removing an item; producers await it while the queue is full. */
    private final Condition notFull = lock.newCondition();
    /** Signalled by producers after adding an item; consumers await it while the queue is empty. */
    private final Condition notEmpty = lock.newCondition();
    private final CountDownLatch countDownLatch =
            new CountDownLatch(PRODUCER_COUNT * ITEMS_PER_PRODUCER);

    /** Adds the integers {@code 0..ITEMS_PER_PRODUCER-1} to the queue, waiting while it is full. */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
                lock.lock();
                try {
                    while (queue.size() == capacity) {
                        notFull.await();
                    }
                    queue.add(i);
                    System.out.println(Thread.currentThread() + "produer good" + i);
                    notEmpty.signalAll();
                } catch (InterruptedException e) {
                    // Nobody is expected to interrupt a producer: keep the flag and fail loudly.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } finally {
                    // Always release, otherwise an exception would block every other thread.
                    lock.unlock();
                }
            }
        }
    }

    /** Polls and prints items from the queue until the thread is interrupted. */
    class Consume implements Runnable {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                lock.lock();
                try {
                    while (queue.isEmpty()) {
                        notEmpty.await();
                    }
                    System.out.println(Thread.currentThread() + "consumer good" + queue.poll());
                    countDownLatch.countDown();
                    notFull.signalAll();
                } catch (InterruptedException e) {
                    // Interruption is the shutdown signal sent by the test once the latch opens.
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /**
     * Starts the producers and consumers, waits until every item has been consumed, then stops
     * the consumers.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            new Thread(new Producer()).start();
        }

        Thread[] consumers = new Thread[CONSUMER_COUNT];
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            consumers[i] = new Thread(new Consume());
            consumers[i].start();
        }

        try {
            countDownLatch.await();
        } finally {
            // Consumers would otherwise wait on notEmpty forever once the queue drains.
            for (Thread consumer : consumers) {
                consumer.interrupt();
            }
        }
        for (Thread consumer : consumers) {
            consumer.join();
        }
    }
}