Java:在特定队列大小之后,以提交方式阻塞提交的ExecutorService。
I am trying to code a solution in which a single thread produces I/O-intensive tasks that can be performed in parallel. Each task have significant in-memory data. So I want to be able limit the number of tasks that are pending at a moment.
我正在尝试编写一个解决方案,其中一个线程生成可以并行执行的I/ o密集型任务。每个任务都有重要的内存数据。因此,我希望能够限制正在等待的任务的数量。
If I create ThreadPoolExecutor like this:
如果我创建这样的ThreadPoolExecutor:
ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(maxQueue));
Then the executor.submit(callable)
throws RejectedExecutionException
when the queue fills up and all the threads are already busy.
然后,当队列填满并且所有的线程都已经繁忙时,executor.submit(可调用)抛出RejectedExecutionException。
What can I do to make executor.submit(callable)
block when the queue is full and all threads are busy?
当队列满了,所有线程都很忙时,我可以做什么来让executor.submit(callable)块?
EDIT: I tried this:
编辑:我试着:
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
And it somewhat achieves the effect that I want achieved but in an inelegant way (basically rejected threads are run in the calling thread, so this blocks the calling thread from submitting more).
它在某种程度上达到了我想要的效果,但方式并不优雅(基本上拒绝的线程在调用线程中运行,因此这阻止了调用线程提交更多的内容)。
EDIT: (5 years after asking the question)
编辑:(问问题5年后)
To anyone reading this question and its answers, please don't take the accepted answer as one correct solution. Please read through all answers and comments.
对于任何正在阅读这个问题及其答案的人,请不要把已经接受的答案当作一个正确的答案。请通读所有的答案和评论。
7 个解决方案
#1
50
I have done this same thing. The trick is to create a BlockingQueue where the offer() method is really a put(). (you can use whatever base BlockingQueue impl you want).
我也做过同样的事。诀窍是创建一个block queue,其中offer()方法实际上是put()。(您可以使用您想要的任何base BlockingQueue impl)。
public class LimitedQueue<E> extends LinkedBlockingQueue<E>
{
public LimitedQueue(int maxSize)
{
super(maxSize);
}
@Override
public boolean offer(E e)
{
// turn offer() and add() into a blocking calls (unless interrupted)
try {
put(e);
return true;
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
}
return false;
}
}
Note that this only works for thread pool where corePoolSize==maxPoolSize
so be careful there (see comments).
注意,这只适用于corePoolSize==maxPoolSize的线程池(请参见注释)。
更多相关文章
- Java多线程系列八——volatile和ThreadLocal
- java线程池深入二
- Java Executor多线程框架
- java线程--volatile实现可见性
- Maven:主线程中的NoClassDefFoundError
- java线程池使用场景和使用方法较详细文摘
- java中多线程安全问题产生&解决方案——同步方法