Java 实现生产者消费者模式

使用阻塞队列实现
package com.zhf.study.test;

import org.junit.jupiter.api.Test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

/**
 * Producer/consumer demo built on a bounded {@link BlockingQueue}.
 *
 * <p>Ten producer threads each put ten integers into a queue of capacity ten, while five
 * consumer threads take and print them. The test finishes once every produced item has
 * been consumed, which is tracked with a {@link CountDownLatch}.
 */
public class ProducerConsumerTest {

    private static final int QUEUE_CAPACITY = 10;
    private static final int PRODUCER_COUNT = 10;
    private static final int CONSUMER_COUNT = 5;
    private static final int ITEMS_PER_PRODUCER = 10;

    /** Bounded buffer; {@code put} blocks when full and {@code take} blocks when empty. */
    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    /** One count per produced item, so the latch opens exactly when everything was consumed. */
    private final CountDownLatch countDownLatch =
            new CountDownLatch(PRODUCER_COUNT * ITEMS_PER_PRODUCER);

    /** Puts {@code ITEMS_PER_PRODUCER} consecutive integers, starting at 0, into the queue. */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
                try {
                    queue.put(i);
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can still observe the interrupt.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /** Takes items from the queue forever, printing each one and counting the latch down. */
    class Consume implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    System.out.println("consume" + queue.take());
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can still observe the interrupt.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                countDownLatch.countDown();
            }
        }
    }

    /**
     * Starts all producers and consumers and blocks until every produced item was consumed.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting on the latch
     */
    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            Thread t = new Thread(new Producer());
            t.start();
        }
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            Thread consumer = new Thread(new Consume());
            // Consumers never leave their loop; daemon status keeps them from blocking JVM exit
            // once the test has finished.
            consumer.setDaemon(true);
            consumer.start();
        }
        countDownLatch.await();
    }

}
使用线程通信实现
package com.zhf.study.test;

import org.junit.jupiter.api.Test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

/**
 * Producer/consumer demo built on intrinsic locking with {@code wait}/{@code notifyAll}.
 *
 * <p>A single producer increments a shared counter up to {@code CAPACITY} and a single
 * consumer decrements it down to zero. Both threads coordinate through the monitor of
 * {@link #lock}.
 */
public class ProducerConsumerTest {

    /** Upper bound of the simulated buffer. */
    private static final int CAPACITY = 10;

    /** Pause after each produce/consume step, in milliseconds. */
    private static final long STEP_PAUSE_MILLIS = 1000;

    /** Number of items currently in the simulated buffer; guarded by {@link #lock}. */
    private int count = 0;

    private final Object lock = new Object();

    /**
     * Runs one producer and one consumer for 100 seconds.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping
     */
    @Test
    public void test2() throws InterruptedException {

        Thread producer = new Thread(() -> {
            while (true) {
                synchronized (lock) {
                    // Re-check in a loop: wait() may return spuriously or after the state
                    // has changed again, so a single `if` is not sufficient.
                    while (count >= CAPACITY) {
                        awaitOnLock();
                    }
                    count++;
                    System.out.println("produce:" + count);
                    lock.notifyAll();
                }
                // Sleep outside the monitor so the consumer can run in the meantime.
                pause();
            }
        });

        Thread consumer = new Thread(() -> {
            while (true) {
                synchronized (lock) {
                    while (count <= 0) {
                        awaitOnLock();
                    }
                    System.out.println("consume:" + count);
                    count--;
                    lock.notifyAll();
                }
                pause();
            }
        });

        // Both workers loop forever; daemon status lets the JVM exit when the test ends.
        producer.setDaemon(true);
        consumer.setDaemon(true);

        producer.start();
        consumer.start();
        Thread.sleep(100000);
    }

    /** Waits on {@link #lock}; the caller must already hold its monitor. */
    private void awaitOnLock() {
        try {
            lock.wait();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Sleeps for one step, converting an interrupt into an unchecked exception. */
    private void pause() {
        try {
            Thread.sleep(STEP_PAUSE_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
使用 Condition 实现
package com.zhf.study.test;

import org.junit.jupiter.api.Test;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


/**
 * Producer/consumer demo built on a {@link ReentrantLock} with two {@link Condition}s.
 *
 * <p>Ten producer threads each add ten integers to a queue bounded at {@code capacity},
 * while five consumer threads poll and print them. The test finishes once every produced
 * item has been consumed, which is tracked with a {@link CountDownLatch}.
 */
public class ProducerConsumerTest {

    private static final int PRODUCER_COUNT = 10;
    private static final int CONSUMER_COUNT = 5;
    private static final int ITEMS_PER_PRODUCER = 10;

    private final int capacity = 10;

    /** Shared buffer; every access is guarded by {@link #lock}. */
    private final Queue<Integer> queue = new LinkedList<>();
    private final ReentrantLock lock = new ReentrantLock();

    /** Producers wait here while the queue is full; signalled after an item is removed. */
    private final Condition notFull = lock.newCondition();

    /** Consumers wait here while the queue is empty; signalled after an item is added. */
    private final Condition notEmpty = lock.newCondition();

    /** One count per produced item, so the latch opens exactly when everything was consumed. */
    private final CountDownLatch countDownLatch =
            new CountDownLatch(PRODUCER_COUNT * ITEMS_PER_PRODUCER);

    /** Adds {@code ITEMS_PER_PRODUCER} consecutive integers, starting at 0, to the queue. */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
                lock.lock();
                try {
                    while (queue.size() == capacity) {
                        notFull.await();
                    }
                    queue.add(i);
                    System.out.println(Thread.currentThread() + "produer good" + i);
                    notEmpty.signalAll();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } finally {
                    // Always release the lock; otherwise an exception above would leave it
                    // held forever and block every other producer and consumer.
                    lock.unlock();
                }
            }
        }
    }

    /** Polls items from the queue forever, printing each one and counting the latch down. */
    class Consume implements Runnable {
        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (queue.isEmpty()) {
                        notEmpty.await();
                    }
                    System.out.println(Thread.currentThread() + "consumer good" + queue.poll());
                    countDownLatch.countDown();
                    notFull.signalAll();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /**
     * Starts all producers and consumers and blocks until every produced item was consumed.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting on the latch
     */
    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            Thread t = new Thread(new Producer());
            t.start();
        }
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            Thread consumer = new Thread(new Consume());
            // Consumers never leave their loop; daemon status keeps them from blocking JVM exit
            // once the test has finished.
            consumer.setDaemon(true);
            consumer.start();
        }
        countDownLatch.await();
    }

}

results matching ""

    No results matching ""