Uploaded image for project: 'JDK'
  1. JDK
  2. JDK-8241094

Updating core poolsize of a ThreadPoolExecutor throws RejectedExecutionException

    Details

      Description

      ADDITIONAL SYSTEM INFORMATION :
      Ubuntu 16.04 LTS 64 bit

      Java 8:
      openjdk version "1.8.0_242"
      OpenJDK Runtime Environment (build 1.8.0_242-8u242-b08-0ubuntu3~16.04-b08)
      OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)

      Java 13:
      openjdk version "13.0.2" 2020-01-14
      OpenJDK Runtime Environment (build 13.0.2+8)
      OpenJDK 64-Bit Server VM (build 13.0.2+8, mixed mode, sharing)

      Java 14 EA:
      openjdk version "14" 2020-03-17
      OpenJDK Runtime Environment (build 14+36-1461)
      OpenJDK 64-Bit Server VM (build 14+36-1461, mixed mode, sharing)


      A DESCRIPTION OF THE PROBLEM :
      I am running into an issue where if I attempt to resize a ThreadPoolExecutor's core pool size to a different number after the pool has been created, then intermittently, some tasks are rejected with a RejectedExecutionException even though I never submit more than queueSize + maxPoolSize number of tasks.

      The problem that I am trying to solve is to extend ThreadPoolExecutor that resizes its core threads based on the pending executions sitting in the thread pool's queue. I need this because by default a ThreadPoolExecutor will create a new Thread only if the queue is full.

      Here is a small self-contained Pure Java 8 program that demonstrates the problem.

      ```
      import static java.lang.Math.max;
      import static java.lang.Math.min;

      import java.util.concurrent.CompletableFuture;
      import java.util.concurrent.Executors;
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.concurrent.ScheduledExecutorService;
      import java.util.concurrent.ThreadPoolExecutor;
      import java.util.concurrent.TimeUnit;

      public class ThreadPoolResizeTest {

          public static void main(String[] args) throws Exception {
              // increase the number of iterations if unable to reproduce
              // for me 100 iterations have been enough
              int numberOfExecutions = 100;

              for (int i = 1; i <= numberOfExecutions; i++) {
                  executeOnce();
              }
          }

          private static void executeOnce() throws Exception {
              int minThreads = 1;
              int maxThreads = 5;
              int queueCapacity = 10;

              ThreadPoolExecutor pool = new ThreadPoolExecutor(
                      minThreads, maxThreads,
                      0, TimeUnit.SECONDS,
                      new LinkedBlockingQueue<Runnable>(queueCapacity),
                      new ThreadPoolExecutor.AbortPolicy()
              );

              ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
              scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
                      0, 10, TimeUnit.MILLISECONDS);
              CompletableFuture<Void> taskBlocker = new CompletableFuture<>();

              try {
                  int totalTasksToSubmit = queueCapacity + maxThreads;

                  for (int i = 1; i <= totalTasksToSubmit; i++) {
                      // following line sometimes throws a RejectedExecutionException
                      pool.submit(() -> {
                          // block the thread and prevent it from completing the task
                          taskBlocker.join();
                      });
                      // Thread.sleep(10); //enabling even a small sleep makes the problem go away
                  }
              } finally {
                  taskBlocker.complete(null);
                  scheduler.shutdown();
                  pool.shutdown();
              }
          }

          /**
           * Resize the thread pool if the number of pending tasks are non-zero.
           */
          private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
              int pendingExecutions = pool.getQueue().size();
              int approximateRunningExecutions = pool.getActiveCount();

              /*
               * New core thread count should be the sum of pending and currently executing tasks
               * with an upper bound of maxThreads and a lower bound of minThreads.
               */
              int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));

              pool.setCorePoolSize(newThreadCount);
              pool.prestartAllCoreThreads();
          }
      }
      ```

      Why should the pool ever throw a RejectedExecutionException if I never submit more that the queueCapacity+maxThreads. I am never changing the max threads so by ThreadPoolExecutor's definition, it should either accommodate the task in a Thread or to the queue.

      Of course, if I never resize the pool, then the thread pool never rejects any submissions. This is also hard to debug since adding any sort of delays in the submissions makes the problem go away. Is there a race condition in ThreadPoolExecutor?

      STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
      Run the provided Java program to repro the issue. With 100 iterations it is extremely reproducible but with lower number of iterations, does not seem to be.

      EXPECTED VERSUS ACTUAL BEHAVIOR :
      EXPECTED -
      Program should exit normally always without throwing any RejectedExecutionException.
      ACTUAL -
      Program throws RejectedExecutionException

      ---------- BEGIN SOURCE ----------
      import static java.lang.Math.max;
      import static java.lang.Math.min;

      import java.util.concurrent.CompletableFuture;
      import java.util.concurrent.Executors;
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.concurrent.ScheduledExecutorService;
      import java.util.concurrent.ThreadPoolExecutor;
      import java.util.concurrent.TimeUnit;

      public class ThreadPoolResizeTest {

          public static void main(String[] args) throws Exception {
              // increase the number of iterations if unable to reproduce
              // for me 100 iterations have been enough
              int numberOfExecutions = 100;

              for (int i = 1; i <= numberOfExecutions; i++) {
                  executeOnce();
              }
          }

          private static void executeOnce() throws Exception {
              int minThreads = 1;
              int maxThreads = 5;
              int queueCapacity = 10;

              ThreadPoolExecutor pool = new ThreadPoolExecutor(
                      minThreads, maxThreads,
                      0, TimeUnit.SECONDS,
                      new LinkedBlockingQueue<Runnable>(queueCapacity),
                      new ThreadPoolExecutor.AbortPolicy()
              );

              ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
              scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
                      0, 10, TimeUnit.MILLISECONDS);
              CompletableFuture<Void> taskBlocker = new CompletableFuture<>();

              try {
                  int totalTasksToSubmit = queueCapacity + maxThreads;

                  for (int i = 1; i <= totalTasksToSubmit; i++) {
                      // following line sometimes throws a RejectedExecutionException
                      pool.submit(() -> {
                          // block the thread and prevent it from completing the task
                          taskBlocker.join();
                      });
                      // Thread.sleep(10); //enabling even a small sleep makes the problem go away
                  }
              } finally {
                  taskBlocker.complete(null);
                  scheduler.shutdown();
                  pool.shutdown();
              }
          }

          /**
           * Resize the thread pool if the number of pending tasks are non-zero.
           */
          private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
              int pendingExecutions = pool.getQueue().size();
              int approximateRunningExecutions = pool.getActiveCount();

              /*
               * New core thread count should be the sum of pending and currently executing tasks
               * with an upper bound of maxThreads and a lower bound of minThreads.
               */
              int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));

              pool.setCorePoolSize(newThreadCount);
              pool.prestartAllCoreThreads();
          }
      }

      ---------- END SOURCE ----------

      CUSTOMER SUBMITTED WORKAROUND :
      Adding a Thread.sleep() for a small amount, like > 10ms seems to get around the problem although it could just be a coincidence

      FREQUENCY : always


        Attachments

          Activity

            People

            • Assignee:
              martin Martin Buchholz
              Reporter:
              webbuggrp Webbug Group
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated: