JDK-8209007

Deadlock in nested ForkJoin Stream.parallel

    Details

      Description

      ADDITIONAL SYSTEM INFORMATION :
      MacBook Pro (15-inch, 2017)
      3.1 GHz Intel Core i7
      16 GB 2133 MHz LPDDR3

      $ uname -a
      Darwin XXXXX 16.7.0 Darwin Kernel Version 16.7.0: Thu Jun 21 20:07:39 PDT 2018; root:xnu-3789.73.14~1/RELEASE_X86_64 x86_64

      $ java --version
      java 10.0.2 2018-07-17
      Java(TM) SE Runtime Environment 18.3 (build 10.0.2+13)
      Java HotSpot(TM) 64-Bit Server VM 18.3 (build 10.0.2+13, mixed mode)

      A DESCRIPTION OF THE PROBLEM :
      Nested Stream.parallel operations that update a ConcurrentHashMap deadlock, and the stack traces show ConcurrentHashMap.compute being re-entered on the same thread.

      I am building a data storage library in which I would like to use Stream.parallel. The application that uses this library also uses Stream.parallel and concurrent data structures to manipulate data. The combination leads to a deadlock with some very strange stack traces in which the compute method called in the outer loop appears to be reentrant.
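
      One way to confirm the apparent reentrancy without reading thread dumps is a per-thread depth counter wrapped around the suspect call. The following is only a sketch: ReentrancyProbe, guarded and maxDepth are hypothetical names that do not appear in the attached source, and the nested call in main merely simulates re-entry.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ReentrancyProbe {
    // Per-thread nesting depth of guarded sections.
    private static final ThreadLocal<int[]> DEPTH = ThreadLocal.withInitial(() -> new int[1]);
    // Deepest nesting observed on any thread so far.
    private static final AtomicInteger MAX_DEPTH = new AtomicInteger();

    /** Runs body and records how deeply guarded sections are nested on the calling thread. */
    public static <T> T guarded(Supplier<T> body) {
        int[] depth = DEPTH.get();
        depth[0]++;
        MAX_DEPTH.accumulateAndGet(depth[0], Math::max);
        try {
            return body.get();
        } finally {
            depth[0]--;
        }
    }

    public static int maxDepth() {
        return MAX_DEPTH.get();
    }

    public static void main(String[] args) {
        // Simulated re-entry: a guarded section that enters another guarded section on the same thread.
        int result = guarded(() -> guarded(() -> 42));
        System.out.println("result=" + result + " maxDepth=" + maxDepth());
    }
}
```

      In the reproducer, the call concurrentMap.compute(key, biFunction(val)) could be wrapped in guarded(...). A maximum depth greater than 1 would indicate that a worker thread entered compute again before its earlier call returned.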

      STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
      Run the attached source code in either of the two parallel execution modes that fail (lines 24 and 25 of the source).
      Three additional modes of execution succeed (lines 28, 29 and 30).

      EXPECTED VERSUS ACTUAL BEHAVIOR :
      EXPECTED -
      A thread's stack trace should never contain line 87 (the concurrentMap.compute call in streamParallelOuterTask) more than once. The code should not deadlock.
      ACTUAL -
      Found one Java-level deadlock:
      =============================
      "outer pool1":
        waiting to lock monitor 0x00007ff6ca8aba00 (object 0x00000006cfb8a370, a java.util.concurrent.ConcurrentHashMap$ReservationNode),
        which is held by "outer pool2"
      "outer pool2":
        waiting to lock monitor 0x00007ff6ca085800 (object 0x00000006cfb30b58, a java.util.concurrent.ConcurrentHashMap$ReservationNode),
        which is held by "outer pool1"

      Java stack information for the threads listed above:
      ===================================================
      "outer pool1":
      at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1938)
      - waiting to lock <0x00000006cfb8a370> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
      at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
      at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
      at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
      at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
      at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
      at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
      at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
      at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
      at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
      at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
      at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
      at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
      at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
      at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
      at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
      at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1961)
      - locked <0x00000006cf9a8c60> (a java.util.concurrent.ConcurrentHashMap$Node)
      at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
      at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
      at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
      at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
      at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
      at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
      at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
      at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
      at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
      at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
      at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
      at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
      at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
      at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
      at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
      at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1961)
      - locked <0x00000006cf81c3b0> (a java.util.concurrent.ConcurrentHashMap$Node)
      at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
      at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
      at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
      at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
      at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
      at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
      at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
      at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
      at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
      at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
      at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
      at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
      at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
      at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
      at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
      at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1961)
      - locked <0x00000006cf81c1c0> (a java.util.concurrent.ConcurrentHashMap$Node)
      at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
      at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
      at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
      at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
      at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
      at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
      at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
      at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
      at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
      at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
      at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
      at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
      at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
      at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
      at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
      at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1922)
      - locked <0x00000006cfb31d80> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
      at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
      at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
      at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
      at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
      at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
      at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
      at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
      at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
      at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
      at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
      at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
      at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
      at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
      at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
      at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
      at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1961)
      - locked <0x00000006cfa26d78> (a java.util.concurrent.ConcurrentHashMap$Node)
      at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
      at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
      at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
      at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
      at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
      at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
      at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
      at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
      at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
      at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
      at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
      at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
      at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
      at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
      at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
      at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1922)
      - locked <0x00000006cfb30b58> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
      at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
      at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
      at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
      at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
      at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
      at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
      at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
      at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
      at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
      at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
      at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
      at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
      at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
      at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
      at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
      at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1922)
      - locked <0x00000006cfb195f0> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
      at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
      at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
      at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
      at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
      at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
      at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
      at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
      at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
      at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
      at java.util.concurrent.ForkJoinTask.doInvoke(java.base@10.0.2/ForkJoinTask.java:408)
      at java.util.concurrent.ForkJoinTask.invoke(java.base@10.0.2/ForkJoinTask.java:736)
      at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(java.base@10.0.2/ForEachOps.java:159)
      at java.util.stream.ForEachOps$ForEachOp$OfLong.evaluateParallel(java.base@10.0.2/ForEachOps.java:209)
      at java.util.stream.AbstractPipeline.evaluate(java.base@10.0.2/AbstractPipeline.java:233)
      at java.util.stream.LongPipeline.forEach(java.base@10.0.2/LongPipeline.java:421)
      at java.util.stream.LongPipeline$Head.forEach(java.base@10.0.2/LongPipeline.java:579)
      at com.upserve.NestedParallel.lambda$streamParallelOuterTask$5(NestedParallel.java:84)
      at com.upserve.NestedParallel$$Lambda$5/2101440631.run(Unknown Source)
      at java.util.concurrent.ForkJoinTask$AdaptedRunnableAction.exec(java.base@10.0.2/ForkJoinTask.java:1407)
      at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
      at java.util.concurrent.ForkJoinPool.runWorker(java.base@10.0.2/ForkJoinPool.java:1603)
      at java.util.concurrent.ForkJoinWorkerThread.run(java.base@10.0.2/ForkJoinWorkerThread.java:177)
      "outer pool2":
      at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1938)
      - waiting to lock <0x00000006cfb30b58> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
      at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
      at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
      at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
      at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
      at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
      at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
      at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
      at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
      at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
      at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
      at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
      at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
      at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
      at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
      at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
      at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1922)
      - locked <0x00000006cfb8a370> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
      at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
      at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
      at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
      at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
      at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
      at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
      at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
      at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
      at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
      at java.util.concurrent.ForkJoinPool.runWorker(java.base@10.0.2/ForkJoinPool.java:1603)
      at java.util.concurrent.ForkJoinWorkerThread.run(java.base@10.0.2/ForkJoinWorkerThread.java:177)

      Found 1 deadlock.
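
      The same kind of deadlock report can be obtained from inside the JVM through the standard java.lang.management API, which may help when attaching an external tool is inconvenient. This is a minimal sketch: the class name DeadlockWatch and the report format are assumptions, not part of the attached source.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class DeadlockWatch {
    /** Returns one line per deadlocked thread, or an empty string if no deadlock is found. */
    public static String report() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        // Covers both object monitors and ownable synchronizers; returns null when nothing is deadlocked.
        long[] ids = bean.findDeadlockedThreads();
        if (ids == null) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : bean.getThreadInfo(ids)) {
            if (info == null) {
                continue; // the thread terminated between the two calls
            }
            sb.append(info.getThreadName())
              .append(" waiting on ").append(info.getLockName())
              .append(" held by ").append(info.getLockOwnerName())
              .append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String report = report();
        System.out.println(report.isEmpty() ? "no deadlock found" : report);
    }
}
```

      To use it with the reproducer, report() would have to be called from a separate watchdog thread, because the main thread is blocked in future.get() once the deadlock occurs.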


      ---------- BEGIN SOURCE ----------
      package com.upserve;

      import java.util.*;
      import java.util.concurrent.*;
      import java.util.concurrent.atomic.*;
      import java.util.function.*;
      import java.util.stream.*;

      public class NestedParallel implements Runnable {

          static final String STREAM_PARALLEL = "STREAM PARALLEL";
          static final String LOOP_PARALLEL = "LOOP PARALLEL";
          static final String SERIAL = "SERIAL";

          final ConcurrentMap<String, Long> concurrentMap;
          final ExecutorService outerPool;
          final ExecutorService innerPool;
          final Random random;
          final String innerLoop;
          final String outerLoop;

          public static void main(String[] args){
              // DEADLOCKS
              NestedParallel nestedParallel = new NestedParallel(STREAM_PARALLEL, STREAM_PARALLEL);
              // NestedParallel nestedParallel = new NestedParallel(STREAM_PARALLEL, LOOP_PARALLEL);

              // SUCCESS
              // NestedParallel nestedParallel = new NestedParallel(STREAM_PARALLEL, SERIAL);
              // NestedParallel nestedParallel = new NestedParallel(LOOP_PARALLEL, STREAM_PARALLEL);
              // NestedParallel nestedParallel = new NestedParallel(SERIAL, LOOP_PARALLEL); // SLOW!

              System.out.println("starting");
              nestedParallel.run();
              System.out.println("finished");
          }

          public NestedParallel(String outerLoop, String innerLoop) {

              concurrentMap = new ConcurrentHashMap<>();

              this.innerLoop = innerLoop;
              this.outerLoop = outerLoop;

              Function<String, ForkJoinPool.ForkJoinWorkerThreadFactory> threadFactoryFunction = name -> pool ->
              {
                  final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
                  worker.setName(name + worker.getPoolIndex());
                  return worker;
              };

              Function<String, ForkJoinPool> forkJoinPoolFunction = name -> new ForkJoinPool(
                      Runtime.getRuntime().availableProcessors(),
                      threadFactoryFunction.apply(name),
                      (t, e) -> {
                          System.out.println("In pool " + name + ", thread " + t + " threw exception: " + e);
                      },
                      true
              );

              outerPool = forkJoinPoolFunction.apply("outer pool");
              innerPool = forkJoinPoolFunction.apply("inner pool");

              random = new Random();
          }

          @Override
          public void run() {
              if (STREAM_PARALLEL.equals(outerLoop)) {
                  streamParallelOuterTask();
              } else if (LOOP_PARALLEL.equals(outerLoop)) {
                  loopParallelOuterTask();
              } else if (SERIAL.equals(outerLoop)) {
                  serialOuterTask();
              } else {
                  throw new RuntimeException("Invalid command: " + outerLoop);
              }
          }

          private void streamParallelOuterTask(){
              Future future = outerPool.submit(() -> {
                  random
                          .longs(1_000_000, 0, 1_000)
                          .parallel()
                          .forEach(val -> {
                                      if (val % 1017 == 0) System.out.println("Outer Task Thread:" + Thread.currentThread().getName());
                                      String key = String.valueOf(val);
                                      concurrentMap.compute(key, biFunction(val));
                                  }
                          );
              });

              try {
                  future.get();
              } catch (InterruptedException e) {
                  throw new RuntimeException("Interrupted " + Thread.currentThread().getName(), e);
              } catch (ExecutionException e) {
                  throw new RuntimeException("Failed " + Thread.currentThread().getName(), e);
              }
          }

          private void loopParallelOuterTask(){
              List<Long> vals = random.longs(1_000_000, 0, 1_000).boxed().collect(Collectors.toList());

              List<Future> futures = new ArrayList<>();
              for(Long val: vals) {
                  futures.add(outerPool.submit(() -> {
                      if (val % 1000 == 0) System.out.println("Outer Task Thread:" + Thread.currentThread().getName());
                      String key = String.valueOf(val);
                      concurrentMap.compute(key, biFunction(val));
                  }));
              }

              futures.forEach(f -> {
                  try {
                      f.get();
                  } catch (InterruptedException e) {
                      throw new RuntimeException("Interrupted " + Thread.currentThread().getName(), e);
                  } catch (ExecutionException e) {
                      throw new RuntimeException("Failed " + Thread.currentThread().getName(), e);
                  }
              });
          }

          private void serialOuterTask() {
              random
                      .longs(1_000_000, 0, 1_000)
                      .forEach(val -> {
                                  if (val % 1000 == 0) System.out.println("Outer Task Thread:" + Thread.currentThread().getName());
                                  String key = String.valueOf(val);
                                  concurrentMap.compute(key, biFunction(val));
                              }
                      );
          }

          private BiFunction<String, Long, Long> biFunction(long val) {
              return (k, v) -> {
                  if (v == null) {
                      v = val;
                  } else {
                      v += val;
                  }

                  final Long vMod = v % 10_000;

                  // Do some naive task
                  if (STREAM_PARALLEL.equals(innerLoop)) {
                      return streamParallelInnerTask(vMod);

                  } else if (LOOP_PARALLEL.equals(innerLoop)) {
                      return loopParallelInnerTask(vMod);

                  } else if (SERIAL.equals(innerLoop)) {
                      return serialInnerTask(vMod);
                  } else {
                      throw new RuntimeException("Invalid command: " + innerLoop);
                  }
              };
          }

          private Long streamParallelInnerTask(long vMod) {
              LongAdder longAdder = new LongAdder();
              // If outerPool is used again, no deadlock but process will livelock after a while
              Future future = innerPool.submit(() -> {
                  LongStream.range(1, vMod)
                          .parallel()
                          .forEach(value -> {
                              // System.out.println("Inner Task Thread: " +Thread.currentThread().getName());
                              longAdder.add(value);
                          });
              });
              try {
                  future.get();
              } catch (InterruptedException e) {
                  throw new RuntimeException("Interrupted " + Thread.currentThread().getName(), e);
              } catch (ExecutionException e) {
                  throw new RuntimeException("Failed " + Thread.currentThread().getName(), e);
              }
              return longAdder.longValue();
          }

          private Long loopParallelInnerTask(long vMod) {
              LongAdder longAdder = new LongAdder();
              List<Future> futures = new ArrayList<>();
              for (long i=0; i<vMod; i++) {
                  final Long innerVal = i;
                  // Cannot use outerPool again here: it causes livelock with all threads waiting
                  futures.add(innerPool.submit(() -> {
                      // System.out.println("Inner Task Thread: " +Thread.currentThread().getName());
                      longAdder.add(innerVal);
                  }));
              }

              futures.forEach(f -> {
                  try {
                      f.get();
                  } catch (InterruptedException e) {
                      throw new RuntimeException("Interrupted " + Thread.currentThread().getName(), e);
                  } catch (ExecutionException e) {
                      throw new RuntimeException("Failed " + Thread.currentThread().getName(), e);
                  }
              });
              return longAdder.longValue();
          }

          private Long serialInnerTask(long vMod) {
              LongAdder longAdder = new LongAdder();
              LongStream.range(1, vMod)
                      .forEach(value -> {
                          // System.out.println("Inner Task Thread: " +Thread.currentThread().getName());
                          longAdder.add(value);
                      });
              return longAdder.longValue();
          }
      }

      ---------- END SOURCE ----------

      CUSTOMER SUBMITTED WORKAROUND :
      Be aware of where Stream.parallel may be used internally by a library.
      When library code uses Stream.parallel internally, parallelize the higher levels of the application by other means.
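
      A different mitigation, not proposed by the reporter, is to keep the parallel work out of the function passed to ConcurrentHashMap.compute. The ConcurrentHashMap documentation asks that the computation be short and simple, and the stack traces above show a worker blocked in ForkJoinTask.get inside compute going on to run another outer task. The sketch below runs the expensive step without holding a bin lock and publishes the result with putIfAbsent/replace in a retry loop. OptimisticUpdate and update are hypothetical names, a missing value is treated as 0, and the step may run more than once under contention, so this only fits steps that are safe to repeat.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.LongUnaryOperator;
import java.util.stream.LongStream;

public class OptimisticUpdate {
    /**
     * Applies step to the current value for key and stores the result.
     * The step runs outside any map lock; if another thread changes the
     * mapping in the meantime, the step is recomputed and the store retried.
     */
    public static long update(ConcurrentMap<String, Long> map, String key, LongUnaryOperator step) {
        while (true) {
            Long old = map.get(key);
            long next = step.applyAsLong(old == null ? 0L : old);
            boolean stored = (old == null)
                    ? map.putIfAbsent(key, next) == null
                    : map.replace(key, old, next);
            if (stored) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Long> map = new ConcurrentHashMap<>();
        long val = 7;
        // Same shape as the reproducer: add val, reduce modulo 10_000, then sum a range in parallel.
        long result = update(map, String.valueOf(val),
                v -> LongStream.range(1, (v + val) % 10_000).parallel().sum());
        System.out.println("stored value: " + result);
    }
}
```

      Unlike the reproducer, the inner stream here runs in the common pool rather than in a dedicated inner pool. The point of the sketch is only that no map lock is held while the inner stream is evaluated.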

      FREQUENCY : always


        People

            • Assignee: Doug Lea (dl)
            • Reporter: Webbug Group (webbuggrp)
            • Votes: 0
            • Watchers: 7