Uploaded image for project: 'JDK'
  1. JDK
  2. JDK-8277129

Structured Concurrency (Preview)

    XMLWordPrintable

    Details

    • Type: JEP
    • Status: Draft
    • Priority: P4
    • Resolution: Unresolved
    • Fix Version/s: None
    • Component/s: core-libs
    • Labels:
      None
    • Author:
      Ron Pressler
    • JEP Type:
      Feature
    • Exposure:
      Open
    • Scope:
      SE

      Description

      Summary

      Simplify concurrent programs by adding elementary library support for structured concurrency, a simple principle that states that when the flow of execution splits into multiple concurrent flows, they rejoin in the same code block. That discipline makes dealing with failures, cancellation, and deadlines in concurrent code more manageable, as well as provides the runtime with useful information about the relationships among concurrent tasks that can be reflected by observability tools. This is a Preview Feature.

      Goals

      • Add a high-level user-facing API that helps write clear and correct concurrent code based on the principles of structured concurrency.
      • Add a primitive, low-level construct that allows organizing a cohort of threads. This construct will support high-level structured concurrency constructs -- provided either by the JDK or third-party libraries -- on the one hand, and plug into the platform's management for observability, on the other.
      • Add a management API, to be used by tools, for observing the relationships among threads as defined by the primitive thread-cohort construct.

      Non-Goals

      • It is not a goal to replace other concurrency constructs in the JDK, such as ExecutorService and Future.
      • It is not a goal to provide a definitive structured concurrency API for Java, because structured concurrency is a relatively new concept that might well evolve.
      • It is not a goal to propose a new task/thread cancellation mechanism to supersede the existing interruption mechanism. This might be addressed by a future JEP.
      • It is not a goal to propose a mechanism for sharing streams of data among threads (channels). This might be addressed by a future JEP.

      Motivation

      In the early decades of computer programming, code in languages such as Assembly and Fortran suffered from common pitfalls. Pervasive use of jmp / GO TO instructions made the flow of execution unclear and hard to reason about; operation failures were often ignored, leading to incorrect assumptions and inscrutable bugs that manifested far away from their cause. With experience, best practices were devised to avoid those problems with a more disciplined and structured use of control flow and error handling. Later, these conventions were codified and built into programming languages as basic constructs: the program was broken into subroutines with well-defined entry and exit points; structured loops represented the pattern of a sequence of operations that could be canceled with a break instruction; exceptions ensured that all failures will be noticed and handled. These constructs expressing structure make the program easier to understand not only as code but also during execution. For example, subroutines allow us to analyze the operation of distinct code units in isolation, but they also allow us to obtain meaningful information about what a running program is doing in the form of a stack-trace. These capabilities stem from a basic principle at the core of structured programming that guides the program's control flow: a block of code has a single point of entry, and its points of exit are clearly defined.

      Concurrent code still suffers from common pitfalls, some of which are analogous to those in early sequential programming. For example, a method might spawn a thread to run some task, and then return, leaving the task running in the background, its lifetime unclear from the code; a concurrent task might fail, leaving its failure unnoticed and ignored. These pitfalls arise from a fire-and-forget style, where a concurrent task is spawned and then left orphaned. Those who've gained experience writing concurrent code have developed some best practices, such as never spawning a concurrent task without hanging on to a Future, that is then clearly joined and its possible failures handled. But even when such care and discipline are exercised, only the programmer knows the logical relationship among the concurrent tasks – for example, that a specific method waiting on a Future is not just interested in the task's outcome, but is also its logical "owner." Threads serve as reasonable units for concurrent code, but we don't have any constructs for expressing natural relationships among different threads analogous to the caller/callee relationship we have in sequential code, relationships that will be clear not only to the programmer, but also to the runtime.

      Example

      To illustrate all the work sequential constructs do for us consider the following example. Method foo is called in a server program as part of handling a request, and to do its work it needs to call on two microservices. We choose to split foo's work into two helper methods, bar and baz, each calling one of the services:

      String foo() throws IOException, InterruptedException {
          int bar = bar(); // throws IOException, InterruptedException
          String baz = baz(); // ditto
          return baz + bar;
      }

      Let's assume our program is now running bar() on one of its threads. Thanks to the built-in sequential constructs, we automatically get the following:

      1. If bar fails with an exception, it will automatically be propagated from foo, which will fail as well, and skip invoking baz.
      2. If some other thread wants to cancel the transaction foo is carrying out by interrupting its thread, bar will be interrupted, which will abort foo (skipping baz) as well.
      3. If we obtain a thread dump to observe the application, we will find bar on a thread's stack trace, where we'll see it's been called by foo.

      We all now take it for granted, but structure does a lot of work for us here. It organizes code into logical units and makes them, and the relationships among them, clear both as text and at runtime. It also automatically aborts the execution of a code unit upon an error in one of the sub-units.

      We then notice that bar and baz are independent, and, wishing to reduce foo's total latency, we choose to try and query both microservices concurrently by writing the following code:

      String foo() throws IOException, InterruptedException {
          Future<Integer> bar = myExecutorService.submit(() -> bar());
          Future<String> baz = myExecutorService.submit(() -> baz());
          try {
              return baz.get() + bar.get();
          } catch (ExecutionException e) {
              if (e.getCause() instanceof IOException ioe) throw ioe;
              throw new RuntimeException(e);
          }
      }

      This code is mostly correct and even idiomatic. But what happens to the three properties we got with the sequential code?

      1. Even though we were disciplined in our handling of errors, if bar fails quickly, we will still wait for baz to complete, even though foo is now certain to fail, and if baz fails quickly, foo will throw, leaving bar running (an implementation of ExecutorService.invokeAll might take care of such detail for us, but it is cumbersome, especially when the results are not all of the same type, as is the case here).
      2. If the thread running foo is interrupted, bar and baz will still attempt to complete even though foo will abort (having Future.get automatically propagate interruptions will not work here, as foo might get interrupted during baz.get(), but the thread running bar will not be interrupted; it also does not make sense as awaiting a future does not imply exclusive ownership of the task, even though, in this case we do have such ownership).
      3. Even though bar and baz still perform their operation on behalf of foo, a thread-dump will show foo, bar, and baz on the stacks of different threads, without expressing their logical hierarchy.

      This happens because even though the programmer knows that bar and baz work as a unit to perform tasks as part of foo, those relationships -- bar and baz operate as a unit, which, in turn, is "owned" by foo -- are lost, as the tasks submitted to the ExecutorService are treated as independent of each other and of foo. The JDK's concurrency building blocks are too primitive to express these high-level relationships, and as we gain concurrency, we lose the benefits structure gives us in sequential code.

      Just as best practices in sequential programming were codified and reified into built-in programming constructs, it is time for some best practices in concurrent programming to be codified. Those will not only make code easier to write correctly, but will also make the logical relationship among concurrent operations known to the runtime, which will then be able to present them when we observe the running program.

      Structured Concurrency

      Because virtual threads are cheap enough to be spawned for each task, they can also serve to represent relationships, something not possible for more primitive constructs such as Runnable or Callable, and so can serve as a foundation for powerful, yet simple, constructs that express intent and role in concurrent code.

      Structured Concurrency, a term coined by Martin Sústrik and later popularized in a blog post by Nathaniel J. Smith , offers an attractive blueprint for such a construct. Its core principle can be described as follows:

      When the flow of execution splits into multiple concurrent flows, they rejoin in the same code block.

      This ensures that lifetime of a concurrent operation is confined by a syntax block, just like that of a sequential operation in structured programming. It also yields a hierarchy of operations, where "sibling" operations work as a unit, and can be canceled as a unit, on behalf of a "parent" that awaits their completion and monitors them for failures. Like in the case of structured programming, the power of structured concurrency comes from well-defined entry and exit points for the flow of execution through a block of code.

      It's common for concurrent operations to have some logical grouping and a hierarchy. For example, suppose our task was to render a web page, and we decide to obtain all the referenced images from their respective URLs concurrently. If we fail to load one of the images, we might want to declare the entire operation failed and cancel all other attempts, or we might want to return a partial result; furthermore, we might want to cancel the entire operation if it does not complete by a certain deadline, or we might want to return only those images obtained by that deadline and cancel other attempts. Whatever policy we choose, all image loading attempts are treated as a group. Furthermore, for each of the URLs we might want to concurrently try connecting to multiple IP addresses the DNS resolves the URL to, retaining the first successfully connected socket, and canceling all other attempts. These attempts, too, form a group, that is further down in the hierarchy, as it splits from each of the image-URL tasks. Structured concurrency expresses those groups and hierarchies of concurrent tasks, just as method calls and loops do so for sequential code. Moreover, it makes the various cancellation and error-handling policies – which are generally more varied than for sequential code – easier to express in a clear way. It aligns the shape of the code with the shape of the hierarchy and policy.

      Structured concurrency is a counterpart to parallel streams and (their underlying mechanism) ForkJoinPool. Those are concerned with data-parallelism and computation, and ForkJoinPool also employs "structured parallelism" where forks are followed by joins. But as concurrency focuses more on interaction – through I/O and/or message passing – than pure data processing, structured concurrency places a special emphasis on handling cancellation and partial failures, and its goal isn't just to assist in writing a correct algorithm, but also to express the application's logical unit in a manner that is reflected both in the code's structure as well as in runtime observation with various service tools.

      Description

      StructuredExecutor

      There are many ways to implement structured concurrency. We've opted to offer an API that introduces Java programmers to the concepts of structured concurrency on the one hand, while looking familiar by resembling existing constructs, like ExecutorService on the other.

      Using StructuredExecutor, the motivating example above would be rewritten as follows:

      String foo() throws IOException, InterruptedException {
          try (var s = StructuredExecutor.open()) {
              var handler = new StructuredExecutor.ShutdownOnFailure(); 
              Future<Integer> bar = s.fork(() -> bar(), handler); 
              Future<String> baz = s.fork(() -> baz(), handler); 
      
              s.join(); 
              handler.throwIfFailed();
      
              return baz.resultNow() + bar.resultNow(); 
          } catch (ExecutionException e) {
              if (e.getCause() instanceof IOException ioe) throw ioe;
              throw new RuntimeException(e);
         }
      }

      This will regain the three structure properties: 1. If either bar or baz fail, the other will be cancelled if it hasn't terminated yet, as stated by a clear policy object (handler). 2. If the thread running foo is interrupted before or during the call to join, join will cancel both child tasks. 3. A thread dump of the kind described below will show the hierarchy among the tasks, presenting the threads running bar and baz as children of the thread running foo.

      The StructuredExecutor class is an AutoCloseable intended to be used in a session confined to a try-with-resources block with a regular lifecycle:

      1. The executor session is first opened, and the thread that opens the executor becomes its owner.
      2. Tasks are then forked, returning a Future. To run each forked task, the executor will spawn a new thread created by a ThreadFactory. Each task can fork more tasks, or even create its own, nested,StructuredExecutors.
      3. The owner then joins the forks by calling StructuredExecutor.join. This is mandatory. If the executor has not been joined, it will throw an exception when closing.
      4. The results of the forks are then processed, or their exceptions possibly propagated.
      5. The owner thread closes the executor session (implicitly, by the try-with-resources mechanism).

      Under some circumstances, we may wish to terminate the entire computation early. This is ordinarily a complex and error-prone process. StructuredExecutor provides a mechanism to shut down the entire computation in an orderly fashion. Any thread in the session, or the owner, may call StructuredExecutor::shutdown, which will interrupt all forks and cause join to immediately wake up and return. This is done when the executor's operation is known to have completed its goal, either successfully or unsuccessfully, even before all forks terminate, and is the concurrent analog to the break statement in a sequential loop: it signifies that the entire operation is done -- either successfully or unsuccessfully -- possibly before all iterations (or, in the concurrent case, forks) have completed.

      If the owning thread is interrupted while or before joining, or if the deadline expires before the forks terminate or shutdown is called, join will throw an appropriate exception, as there is no point in processing incomplete results.

      Closing a structured executor will interrupt and then await any straggler forked threads, such as those that have already been interrupted due to a call to shutdown but not yet terminated, or if join or joinUntil throws due to interruption or expiration. The forked tasks should, like all concurrent tasks, be responsive to interruption.

      When forking, we can optionally provide a completion handler that will be invoked when the fork terminates. The completion handler encapsulates a policy for early completion and, possibly, result aggregation. Two such handlers are included out of the box: ShutdownOnFailure and ShutdownOnSuccess.

      Once join returns, all futures are known to have completed; they've either succeeded, failed, or have been cancelled. Their result or exception can be then obtained, without blocking, using either the new resultNow or exceptionNow methods without any additional blocking (those methods throw an IllegalStateException if called before the Future completes).

      The threads spawned by StructuredExecutor are grouped in a way that informs the runtime of the relationships among them, and they can be presented in thread dumps or by other tools (see below). This also results in all ScopeLocal bindings that are in effect when the executor session is opened to be captured and bound in the forks. Here is how we can use StructuredExecutor with a shutdown-on-failure policy to concurrently run a collection of tasks and fail if any of them fails:

      <T> List<T> runAll(List<Callable<T>> tasks) throws Throwable {
          try (var s = StructuredExecutor.open()) {
              var handler = new StructuredExecutor.ShutdownOnFailure(); 
              List<Future<T>> futures = tasks.stream().map(t -> s.fork(t, handler)).toList(); 
      
              s.join(); 
              handler.throwIfFailed(e -> e); // propagate exception as-is
      
              return futures.stream().map(Future::resultNow).toList();
          } 
      } 

      Sometimes we'd like to complete the concurrent session early not if any fork fails but, rather, if one succeeds – e.g. if we want to get any result from a collection of redundant services – employing a shutdown-on-success policy. Here is an example of that, combined with a deadline that will cancel all live tasks when elapsed.

      <T> race(List<Callable<T>> tasks, Duration timeout) throws Throwable {
          try (var s = StructuredExecutor.open()) {
              var handler = new StructuredExecutor.ShutdownOnSuccess<T>(); 
              for (var t : tasks)
                  s.fork(t, handler);
      
              s.joinUntil(Instant.now().plus(timeout))); 
      
              return handler.result(ex -> ex); // will throw if none of the forks completed successfully 
          } 
      }

      StructuredExecutor is more than a utility that allows waiting for multiple futures as a group, and possibly canceling them as a group. Every forked thread is owned by the executor that forked it and the executor, in turn, is owned by the thread that opened it. This creates a tree where a parent starts and then waits for its children. This structure is both reflected in the code's organization, and is tracked by the runtime to allow tools to examine it. Internally, this is done by the mechanism below.

      Low-level API for organizing groups of threads

      StrcuturedExecutor is built on top of a low-level API, which can be used to implement other structured concurrency constructs in libraries, or perhaps in the future, other structured APIs in the Java Platform.

      This section is work in progress.

      Thread dumps

      The new thread dump introduced by JEP: Virtual Threads (Preview) is extended to support "structure". If the jcmd command (or the HotSpotDiagnosticMXBean API) is used to create a thread dump in JSON format, for example

      jcmd <pid> JavaThread.dump -format=json <file>

      then the thread dump will include an object for each StructuredExecutor. The object contains an array of its threads forked and their stack traces. The owner of a StructuredExecutor will typically be blocked in the join method waiting for sub-tasks to complete. The thread dump will make it easier to see what the threads executing the sub-tasks are doing. The object for a StructuredExecutor in the thread dump also has a reference to its parent so that the structure of the program can be reconstituted from the thread dump.

      Management API

      A new tooling/management API for observing the relationships among threads is added.

      This section is work in progress.

      Dependences

      JEP xxx: Virtual Threads (Preview)

        Attachments

          Activity

            People

            Assignee:
            rpressler Ron Pressler
            Reporter:
            alanb Alan Bateman
            Owner:
            Ron Pressler Ron Pressler
            Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue

              Dates

              Created:
              Updated: