BlockingQueue实现生产者消费者模型

1
2
3
4
5
6
7
package me.learn.treads;

import java.util.concurrent.BlockingQueue;

public interface Consumer extends Runnable {
void consume(BlockingQueue queue) throws Exception;
}
1
2
3
4
5
6
7
package me.learn.treads;

import java.util.concurrent.BlockingQueue;

public interface Producer extends Runnable {
void produce(BlockingQueue queue) throws Exception;
}
1
2
3
4
5
package me.learn.treads;

public abstract class Task {
protected abstract void run() throws Exception;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package me.learn.treads;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public abstract class AbstractConsumer implements Consumer {

protected abstract BlockingQueue attachQueue();
protected final AtomicInteger increaseInt = new AtomicInteger();
@Override
public void run() {
Thread.currentThread().setName("consumer#" + increaseInt.incrementAndGet());
System.out.println("consumer at thread$"+Thread.currentThread().getName());
while (true) {
try {
consume(attachQueue());
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package me.learn.treads;

import java.util.concurrent.BlockingQueue;

public abstract class AbstractProducer implements Producer {

protected abstract BlockingQueue attachQueue();

@Override
public void run() {
System.out.println("producer at Thread#"+Thread.currentThread().getName());
while (true) {
try {
produce(attachQueue());
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package me.learn.treads;

import java.util.concurrent.BlockingQueue;

public class MyConsumer extends AbstractConsumer {

private BlockingQueue queue;
public MyConsumer(BlockingQueue queue) {
this.queue = queue;
}

@Override
protected BlockingQueue attachQueue() {
return queue;
}

@Override
public void consume(BlockingQueue queue) throws Exception {
BlockQueueProducerConsumerModel.MyTask task = (BlockQueueProducerConsumerModel.MyTask) queue.take();
if (task != null) {
System.out.print(Thread.currentThread().getName());
task.run();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package me.learn.treads;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class MyProducer extends AbstractProducer {

private final AtomicInteger increaseNumber = new AtomicInteger();

private BlockingQueue queue;

private final Random random = new Random();

public MyProducer(BlockingQueue queue) {
this.queue = queue;
}

@Override
protected BlockingQueue attachQueue() {
return queue;
}

@Override
public void produce(BlockingQueue queue) throws Exception {
queue.offer(BlockQueueProducerConsumerModel.MyTask.newTask(() -> increaseNumber.incrementAndGet()));
System.out.println("queue size " + queue.size());
Thread.sleep(500);
}

private int getRandomMillisecond() {
return random.ints(0,5).findAny().getAsInt() * 1000;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package me.learn.treads;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public final class BlockQueueProducerConsumerModel {
private static final BlockingQueue<Task> myBlockQueue = new LinkedBlockingQueue<>();
private final AbstractProducer producer;
private final AbstractConsumer consumer;

public static final AtomicInteger increaseInt = new AtomicInteger();

private static final Random random = new Random();

public BlockQueueProducerConsumerModel(AbstractProducer producer, AbstractConsumer consumer) {
this.producer = producer;
this.consumer = consumer;
}

public void start() {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(producer);
executorService.execute(consumer);
executorService.execute(consumer);
executorService.execute(consumer);
executorService.execute(consumer);
executorService.execute(consumer);
executorService.execute(consumer);
}

public static void main(String[] argv) {
BlockQueueProducerConsumerModel model = new BlockQueueProducerConsumerModel(
new MyProducer(myBlockQueue),
new MyConsumer(myBlockQueue));
model.start();
}

static class MyTask extends Task {

private Supplier supplier;

private MyTask(Supplier supplier) {
this.supplier = supplier;
}

public static MyTask newTask(Supplier supplier) {
return new MyTask(supplier);
}

@Override
protected void run() throws Exception {
System.out.println("running #"+this.supplier.get());
Thread.sleep(getRandomMillisecond());
}

private int getRandomMillisecond() {
return random.ints(0,5).findAny().getAsInt() * 1000;
}
}
}