I am trying to code a solution in which a single thread produces I/O-intensive tasks that can be performed in parallel. Each task have significant in-memory data. So I want to be able limit the number of tasks that are pending at a moment.

我正在尝试编写一个解决方案,其中一个线程生成可以并行执行的I/ o密集型任务。每个任务都有重要的内存数据。因此,我希望能够限制正在等待的任务的数量。

If I create ThreadPoolExecutor like this:

如果我创建这样的ThreadPoolExecutor:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

Then the executor.submit(callable) throws RejectedExecutionException when the queue fills up and all the threads are already busy.

然后,当队列填满并且所有的线程都已经繁忙时,executor.submit(可调用)抛出RejectedExecutionException。

What can I do to make executor.submit(callable) block when the queue is full and all threads are busy?

当队列满了,所有线程都很忙时,我可以做什么来让executor.submit(callable)块?

EDIT: I tried this:

编辑:我试着:

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

And it somewhat achieves the effect that I want achieved but in an inelegant way (basically rejected threads are run in the calling thread, so this blocks the calling thread from submitting more).

它在某种程度上达到了我想要的效果,但方式并不优雅(基本上拒绝的线程在调用线程中运行,因此这阻止了调用线程提交更多的内容)。

EDIT: (5 years after asking the question)

编辑:(问问题5年后)

To anyone reading this question and its answers, please don't take the accepted answer as one correct solution. Please read through all answers and comments.

对于任何正在阅读这个问题及其答案的人,请不要把已经接受的答案当作一个正确的答案。请通读所有的答案和评论。

7 个解决方案

#1


50

I have done this same thing. The trick is to create a BlockingQueue where the offer() method is really a put(). (you can use whatever base BlockingQueue impl you want).

我也做过同样的事。诀窍是创建一个block queue,其中offer()方法实际上是put()。(您可以使用您想要的任何base BlockingQueue impl)。

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

Note that this only works for thread pool where corePoolSize==maxPoolSize so be careful there (see comments).

注意,这只适用于corePoolSize==maxPoolSize的线程池(请参见注释)。

更多相关文章

  1. Java多线程系列八——volatile和ThreadLocal
  2. java线程池深入二
  3. Java Executor多线程框架
  4. java线程--volatile实现可见性
  5. Maven:主线程中的NoClassDefFoundError
  6. java线程池使用场景和使用方法较详细文摘
  7. java中多线程安全问题产生&解决方案——同步方法

随机推荐

  1. golang编译器用什么写的
  2. golang 如何结束一个协程
  3. golang闭包有什么用
  4. golang 如何处理大数据
  5. golang 主要做什么
  6. golang 如何安装包
  7. golang 优秀框架有哪些
  8. golang 如何debug
  9. golang 引用和指针的区别
  10. golang 内存泄露的原因