JDK-8176155

SubmissionPublisher closeExceptionally() may override close()

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: P3
    • Resolution: Fixed
    • Affects Version/s: 9
    • Fix Version/s: 9
    • Component/s: core-libs
    • Labels: None

        Description

        As reported by Dávid Karnok on concurrency-interest
        http://markmail.org/thread/p2hlhe6i4fdntfpe


        The following test fails (after a few rounds for me) because concurrent calls to close() and closeExceptionally() race to set the terminal condition, so different subscribers may observe different terminal states:


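        // Imports needed by this snippet. The test-framework imports for @Test,
        // assertTrue and assertEquals are omitted; they depend on the framework in use.
        import java.util.concurrent.CountDownLatch;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        import java.util.concurrent.Flow;
        import java.util.concurrent.SubmissionPublisher;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.atomic.AtomicInteger;

        Flow.Subscriber<Integer> createSub(Object[] result, int index, CountDownLatch cdl) {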
            return new Flow.Subscriber<Integer>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {

                }

                @Override
                public void onNext(Integer item) {

                }

                @Override
                public void onError(Throwable throwable) {
                    result[index] = throwable;
                    cdl.countDown();
                }

                @Override
                public void onComplete() {
                    result[index] = "complete";
                    cdl.countDown();
                }
            };
        }

        @Test
        public void closeErrorRace() throws Exception {
            ExecutorService exec = Executors.newSingleThreadExecutor();
            try {
                for (int i = 0; i < 1000; i++) {
                    System.out.println("Round " + i);
                    SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();

                    CountDownLatch cdl = new CountDownLatch(2);

                    Object[] result = { null, null };

                    sp.subscribe(createSub(result, 0, cdl));

                    Flow.Subscriber<Integer> sb2 = createSub(result, 1, cdl);

                    Throwable ex = new RuntimeException();

                    AtomicInteger wip = new AtomicInteger(2);

                    Runnable r1 = () -> {
                        wip.decrementAndGet();
                        while (wip.get() != 0) ;

                        sp.closeExceptionally(ex);
                        sp.subscribe(sb2);
                    };

                    exec.submit(r1);

                    wip.decrementAndGet();
                    while (wip.get() != 0) ;

                    sp.close();

                    assertTrue(cdl.await(5, TimeUnit.SECONDS));

                    assertEquals(result[0], result[1]);
                }
            } finally {
                exec.shutdownNow();
            }
        }

        I'm assuming consistency is desirable here, and the fix could be an if statement inside the synchronized block of closeExceptionally() so that the first terminal call wins:

        if (!closed) {
            BufferedSubscription b;
            synchronized (this) {
                b = clients;
                if (b == null) { // or if (closed) {
                    return;
                }
                clients = null;
                closed = true;
                closedException = error;
            }
            // the rest
        }
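
The idea behind the proposed check can be shown in isolation. The following is a minimal sketch, not the SubmissionPublisher source: the class FirstTerminalWins and its methods are hypothetical names introduced only for illustration. It assumes that the terminal state is guarded by the object's monitor and that whichever terminal call takes the lock first decides the outcome, so a later call becomes a no-op instead of overwriting it.

```java
// Hypothetical illustration of "first terminal signal wins".
// This is NOT the actual SubmissionPublisher implementation.
final class FirstTerminalWins {
    private boolean closed;             // guarded by "this"
    private Throwable closedException;  // guarded by "this"; null means normal completion

    /** Normal completion. Returns true only if this call set the terminal state. */
    synchronized boolean close() {
        if (closed) {
            return false;               // an earlier terminal call already won
        }
        closed = true;
        return true;
    }

    /** Exceptional completion. Returns true only if this call set the terminal state. */
    synchronized boolean closeExceptionally(Throwable error) {
        if (error == null) {
            throw new NullPointerException("error");
        }
        if (closed) {
            return false;               // do not override an earlier close()
        }
        closed = true;
        closedException = error;
        return true;
    }

    synchronized boolean isClosed() {
        return closed;
    }

    synchronized Throwable closedException() {
        return closedException;
    }

    public static void main(String[] args) {
        FirstTerminalWins state = new FirstTerminalWins();
        boolean normalWon = state.close();
        boolean errorWon = state.closeExceptionally(new RuntimeException("late"));
        // prints "true false null"
        System.out.println(normalWon + " " + errorWon + " " + state.closedException());
    }
}
```

Because both methods test and set the closed flag under the same lock, every observer that reads the state afterwards sees a single, consistent terminal outcome, which is the property the failing test above checks for.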

          Activity

          martin Martin Buchholz added a comment - http://cr.openjdk.java.net/~martin/webrevs/openjdk9/jsr166-jdk9-integration/SubmissionPublisher/
          hgupdate HG Updates added a comment -
          URL: http://hg.openjdk.java.net/jdk9/dev/jdk/rev/4b8e662483a1
          User: martin
          Date: 2017-03-07 18:20:21 +0000
          hgupdate HG Updates added a comment -
          URL: http://hg.openjdk.java.net/jdk9/jdk9/jdk/rev/4b8e662483a1
          User: lana
          Date: 2017-03-15 14:49:32 +0000

            People

            • Assignee: Martin Buchholz (martin)
            • Reporter: Martin Buchholz (martin)
            • Votes: 0
            • Watchers: 4
