Core Java. Lecture #11

Java Concurrency (ending). Annotations

Ivan Ponomarev, Synthesized.io/MIPT

Part 3. Thread-safe data structures

Non-blocking algorithms

  • Locking (via synchronized or ReentrantLock) solves the problem of coordinating different threads' access to a shared variable.

  • But if many threads compete for the lock (high lock contention), the cost of coordinating the threads becomes significant.

  • The alternative is non-blocking algorithms that use special atomic machine instructions (compare-and-swap).

  • In the Java library, classes of atomic variables and thread-safe collections are available, including those implementing non-blocking algorithms.

Atomics

  • package java.util.concurrent.atomic

    • AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference.

    • AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray.

  • Can be used as "enhanced volatiles", as the result of calling set(…​) is visible to other threads via get(…​)

  • Support atomic operations.

Atomic operations in atomic variable classes

getAndSet(newValue)    compareAndSet(expect, update)

incrementAndGet()      decrementAndGet()

getAndIncrement()      getAndDecrement()

getAndAdd(delta)       addAndGet(delta)

getAndUpdate(updateFunction)
updateAndGet(updateFunction)

getAndAccumulate(x, accumulatorBiFunction)
accumulateAndGet(x, accumulatorBiFunction)
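
The operations listed above can be sketched as follows. This is a minimal illustration, not part of the original slides: the class AtomicCounterDemo and its methods doubleWithCas and countInParallel are hypothetical names. It shows a lock-free shared counter and an explicit compare-and-swap retry loop, which is roughly what updateAndGet does for you.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {
    // Hypothetical helper: doubles the value using an explicit CAS retry loop.
    static int doubleWithCas(AtomicInteger value) {
        while (true) {
            int current = value.get();
            int next = current * 2;
            // Succeeds only if nobody changed the value since we read it.
            if (value.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    // Increments one shared counter from several threads without any locks.
    static int countInParallel(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countInParallel(4, 10_000)); // 40000
        AtomicInteger v = new AtomicInteger(21);
        System.out.println(doubleWithCas(v));           // 42
        System.out.println(v.updateAndGet(x -> x + 1)); // 43
    }
}
```

With a plain int field and counter++ the first result would usually be lower than 40000, because the read-modify-write sequence is not atomic.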

Thread-safe collections

  • In earlier versions of Java, it was possible to "make" a collection thread-safe by wrapping it in Collections.synchronizedXXX(…​). This serialized any access to the internal state of the collection. Because of backward compatibility support, doing this is still possible, but not recommended.

  • The price of such a solution is high lock contention.

  • Java 5 introduced classes specifically designed for thread safety, with fewer locks.

  • Their use is preferred.

CopyOnWriteArrayList and CopyOnWriteArraySet

class CopyOnWriteArrayList<E> implements List<E>
class CopyOnWriteArraySet<E> extends AbstractSet<E>
  • Array-based data structures.

  • Copy the entire underlying array on each modification.

  • This is expensive, but every iterator works on a stable snapshot and never throws ConcurrentModificationException.

  • Good when there are many read operations per write operation.
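
The snapshot behaviour of iterators can be seen in a small sketch. The class name SnapshotIteratorDemo and its method are hypothetical; List.of assumes Java 9 or later.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIteratorDemo {
    // Returns what an iterator sees when the list is modified after the iterator was created.
    static List<String> iterateWhileModifying() {
        List<String> list = new CopyOnWriteArrayList<>(List.of("a", "b"));
        List<String> seen = new ArrayList<>();
        Iterator<String> it = list.iterator(); // bound to the current array: [a, b]
        list.add("c");                         // writes into a fresh copy of the array
        while (it.hasNext()) {
            seen.add(it.next());               // no ConcurrentModificationException
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileModifying()); // [a, b]
    }
}
```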

ConcurrentLinkedQueue/Deque

ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>
ConcurrentLinkedDeque<E> extends AbstractCollection<E>
        implements Deque<E>
  • Based on non-blocking algorithm (CAS operations)

  • poll() returns null if the queue is empty

  • Under the hood: a singly linked list (queue) or a doubly linked list (deque)

[Figure: two diagrams for ConcurrentLinkedQueue/Deque]

Blocking queues: a means of implementing the producer-consumer pattern

[Figure: blocking queue]
  • May be limited in size (capacity constrained).

  • The put() and take() methods block until an item can be put or taken.

  • PriorityBlockingQueue is not limited by capacity.

  • SynchronousQueue has no capacity at all, passes elements to processing threads directly.
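
A minimal producer-consumer sketch on top of a bounded queue might look like this. The class ProducerConsumerDemo, the method sumViaQueue, and the POISON_PILL end-of-stream marker are illustrative assumptions, not something the slides prescribe.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerDemo {
    private static final int POISON_PILL = -1; // hypothetical end-of-stream marker

    // The producer thread puts 1..count, the calling thread consumes and sums them.
    static int sumViaQueue(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // capacity-constrained
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            while (true) {
                int item = queue.take(); // blocks while the queue is empty
                if (item == POISON_PILL) {
                    break;
                }
                sum += item;
            }
            producer.join();
        } catch (InterruptedException e) {
            producer.interrupt();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumViaQueue(100)); // 5050
    }
}
```

The small capacity forces the producer to wait for the consumer, which is the back-pressure effect a bounded queue is meant to provide.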

ConcurrentHashMap

class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>
  • Replaces HashMap for multithreaded data access.

  • Reads do not block, and writes rarely block.

  • Does not allow the use of null as a key or value.

  • Useful methods are atomic:

    • putIfAbsent(key, value)

    • remove(key, value)

    • replace(key, oldValue, newValue)
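
The return values of these atomic methods tell the caller whether its operation actually took effect. A small sketch (ConcurrentMapDemo is a hypothetical name):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ConcurrentMapDemo {
    static String demo() {
        ConcurrentMap<String, Integer> stock = new ConcurrentHashMap<>();
        StringBuilder log = new StringBuilder();
        log.append(stock.putIfAbsent("apple", 1)).append(' '); // null: key was absent, 1 is stored
        log.append(stock.putIfAbsent("apple", 5)).append(' '); // 1: existing value is kept
        log.append(stock.replace("apple", 1, 2)).append(' ');  // true: 1 -> 2
        log.append(stock.replace("apple", 1, 3)).append(' ');  // false: current value is 2, not 1
        log.append(stock.remove("apple", 1)).append(' ');      // false: value does not match
        log.append(stock.remove("apple", 2));                  // true: entry removed
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // null 1 true false false true
    }
}
```

Each call is a single atomic check-then-act step, so no external locking is needed around it.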

ConcurrentHashMap: Java 7 and earlier

[Figure: ConcurrentHashMap internal structure, Java 7 and earlier]

ConcurrentHashMap: Java 8+

[Figure: ConcurrentHashMap internal structure, Java 8+]

ConcurrentSkipListMap

class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentNavigableMap<K,V>
  • Replaces TreeMap for multithreaded access.

  • Does not allow null as a key or a value.

  • Has atomic methods.
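
As with TreeMap, keys are kept sorted and the navigation methods are available. A short sketch with a hypothetical class SkipListMapDemo:

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class SkipListMapDemo {
    static String demo() {
        ConcurrentNavigableMap<Integer, String> map = new ConcurrentSkipListMap<>();
        map.put(30, "thirty");
        map.put(10, "ten");
        map.put(20, "twenty");
        map.putIfAbsent(20, "ignored"); // atomic: key 20 already exists, nothing changes
        return map.firstKey()               // smallest key
                + " " + map.ceilingKey(15)  // smallest key >= 15
                + " " + map.headMap(30).keySet() // keys strictly less than 30
                + " " + map.get(20);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 10 20 [10, 20] twenty
    }
}
```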

ConcurrentSkipListMap: the idea

[Figure: sequence of nine diagrams illustrating ConcurrentSkipListMap]

Part 4. Executor Framework

Executor Framework

  • A thread is an expensive resource, so we want to:

    • limit the number of our threads so as not to get an OutOfMemoryError,

    • reuse existing threads by submitting new tasks after completing the old ones,

    • if some thread "crashed", automatically create a new one.

  • The standard library provides thread pools for all of this; you do not need to implement anything yourself.

  • Application code rarely uses the Thread API directly.

Executor Framework

//Abstraction of a computational task returning a result
public interface Callable<V> {
  V call() throws Exception;
}

//Abstraction of the "manager of threads"
public interface ExecutorService {
  <T> Future<T> submit(Callable<T> task);
  /*... there are many other methods, will discuss later...*/
}

Future

//Abstraction of an "in progress" result: you can wait
//for it to complete or cancel it
public interface Future<V> {
  V get() throws InterruptedException, ExecutionException;
  V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
  boolean cancel(boolean mayInterruptIfRunning);
  boolean isDone();
  boolean isCancelled();
}

Task, Executor, Future

[Figure: Task, Executor, Future]

How do I create an ExecutorService?

public class Executors {
  //fixed pool size
  public static ExecutorService newFixedThreadPool(int nThreads)
  public static ExecutorService newSingleThreadExecutor()
  //pool grows as needed, keeps inactive thread for 60 seconds
  public static ExecutorService newCachedThreadPool()
  //allows you to perform tasks with a delay or periodically
  public static
    ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
  ...
}
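
Putting Callable, ExecutorService and Future together, a minimal sketch could look like this (SubmitDemo and computeInPool are hypothetical names):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SubmitDemo {
    // Runs a multiplication in a pool thread and waits for the result.
    static int computeInPool(int a, int b) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> future = pool.submit(() -> a * b); // the lambda is a Callable<Integer>
            return future.get(5, TimeUnit.SECONDS);            // blocks until done or timed out
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("task failed or timed out", e);
        } finally {
            pool.shutdown(); // otherwise the pool threads keep the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.println(computeInPool(6, 7)); // 42
    }
}
```

In a real application the pool would normally be created once and shared, not created per call as in this sketch.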

Processing tasks in batches

//Run and wait for everything to finish
//List<Future<T>>, not List<T>, because exceptions are possible
<T> List<Future<T>> invokeAll(
    Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

//Run, return the first successful result
//cancel the rest
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
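
A sketch of both batch methods in use. The class BatchDemo and its methods are hypothetical; the failing task only demonstrates that invokeAny skips unsuccessful tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BatchDemo {
    static int sumOfSquares(List<Integer> numbers) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int n : numbers) {
                tasks.add(() -> n * n);
            }
            int sum = 0;
            // invokeAll returns only after every task has finished.
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                sum += f.get(); // a failed task would surface here as ExecutionException
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    static String firstSuccessful() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(() -> { throw new IllegalStateException("always fails"); });
            tasks.add(() -> "ok");
            return pool.invokeAny(tasks); // result of a task that completed successfully
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e); // thrown only if no task succeeded
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(List.of(1, 2, 3))); // 14
        System.out.println(firstSuccessful());              // ok
    }
}
```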

Cancellation of tasks and "termination of service"

//cancel a task if it has not already started
future.cancel(false)
//request task interruption (details ahead)
future.cancel(true)

//Prevent new tasks from being accepted
executorService.shutdown();
//Wait until the accepted tasks are completed
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
    //Abort the tasks that are still running
    executorService.shutdownNow();
}

Interruption of tasks

  • In early versions of Java, there were (now deprecated) methods for forcibly stopping and suspending/resuming threads, but this turned out to be a bad idea:

    • there is no guarantee that the thread will not be stopped in the middle of an atomic operation,

    • suspension may leave a lock acquired for an indefinite amount of time.

  • All of this is now deprecated in favor of the cooperative interruption mechanism.

Interrupting threads

  • ExecutorService.shutdownNow() typically calls Thread.interrupt() on the worker threads that are running tasks.

  • The Thread.isInterrupted() method returns the interrupt status of the thread.

  • JMM interruption rule: a thread calling interrupt() on another thread happens-before the interrupted thread detects the interrupt (by having InterruptedException thrown, or by invoking isInterrupted() or interrupted()).

Cooperative termination mechanism

  • If the computation runs in a loop, the task must periodically check Thread.currentThread().isInterrupted() and, if the flag is set, log the fact of interruption and exit the method.

  • InterruptedException may be thrown on waiting methods. What should we do with it?

What should we do with InterruptedException

  • If circumstances allow, it is best to declare it in throws and not to handle it at all.

  • If we cannot change the method’s signature (e. g. we are implementing run method of Runnable interface which doesn’t throw checked exceptions) then:

} catch (InterruptedException e) {
    //record the fact of interruption to the log...
    ...
    //restore the interrupted status
    Thread.currentThread().interrupt();
    //exit the interrupted method
    return;
}
  • You should never silently "swallow" an InterruptedException !

  • It is recommended to log the fact of interruption to facilitate debugging.
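
Both rules, checking the flag in a loop and restoring the status after catching InterruptedException, can be combined in one sketch. CooperativeInterruptDemo and the outcome strings are hypothetical; which of the two exit paths is taken depends on timing.

```java
import java.util.concurrent.atomic.AtomicReference;

class CooperativeInterruptDemo {
    // Starts a worker, interrupts it, and reports how the worker noticed the interruption.
    static String runAndInterrupt() {
        AtomicReference<String> outcome = new AtomicReference<>("still running");
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(10); // a waiting method: may throw InterruptedException
                } catch (InterruptedException e) {
                    // record the fact of interruption (a real task would log it)
                    outcome.set("interrupted while waiting");
                    // restore the interrupted status for the code further up the stack
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            outcome.set("interrupt flag observed in loop");
        });
        worker.start();
        worker.interrupt();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndInterrupt());
    }
}
```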

CompletableFuture

  • Introduced in Java 8; implements Future (and CompletionStage).

  • Allows you to explicitly set the result (hence Completable) and assemble a chain of asynchronous calculations.

  • Can be used as follows:

CompletableFuture<Integer> f = new CompletableFuture<>();
executor.execute(() -> {
  int n = workHard(arg);
  f.complete(n); });

executor.execute(() -> {
  int n = workSmart(arg);
  f.complete(n); });

executor.execute(() -> {
  Throwable t = ...;
  f.completeExceptionally(t); });
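
When several tasks race to complete the same future, as in the snippet above, only the first completion takes effect. A single-threaded sketch of that rule (CompleteOnceDemo is a hypothetical name):

```java
import java.util.concurrent.CompletableFuture;

class CompleteOnceDemo {
    static String demo() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        boolean first = f.complete(1);  // true: the future transitions to completed
        boolean second = f.complete(2); // false: already completed, the value stays 1
        boolean third = f.completeExceptionally(new IllegalStateException("too late")); // false
        return first + " " + second + " " + third + " " + f.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true false false 1
    }
}
```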

CompletableFuture composition with action

Method          Parameter                  Description

thenApply       T→U                        Apply a function to the result.
thenAccept      T→void                     Like thenApply, but with void result.
thenCompose     T→CompletableFuture<U>     Invoke the function on the result and execute the returned future.
handle          (T, Throwable)→U           Process the result or error and yield a new result.
whenComplete    (T, Throwable)→void        Like handle, but with void result.

CompletableFuture composition with action

Method               Parameter            Description

completeOnTimeout    T, long, TimeUnit    Yield the given value as the result in case of timeout (Java 9+).
orTimeout            long, TimeUnit       Throw TimeoutException in case of timeout (Java 9+).
thenRun              Runnable             Execute the Runnable with void result.
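
A few of the methods from these tables chained together, as a sketch. ChainDemo, lookupName, greeting and recover are hypothetical names; supplyAsync without an executor argument runs on the common ForkJoinPool.

```java
import java.util.concurrent.CompletableFuture;

class ChainDemo {
    // Hypothetical asynchronous lookup that itself returns a future.
    static CompletableFuture<String> lookupName(int id) {
        return CompletableFuture.supplyAsync(() -> "user" + id);
    }

    static String greeting(int id) {
        return CompletableFuture.supplyAsync(() -> id)
                .thenApply(x -> x + 1)                // T -> U
                .thenCompose(ChainDemo::lookupName)   // T -> CompletableFuture<U>, flattened
                .thenApply(name -> "Hello, " + name)
                .join();
    }

    static String recover() {
        return CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                // handle sees either the value or the error, and yields a new result
                .handle((value, error) -> error == null ? value : "fallback")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(greeting(41)); // Hello, user42
        System.out.println(recover());    // fallback
    }
}
```

With thenApply in place of thenCompose the result would be a nested CompletableFuture<CompletableFuture<String>>.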

Composition of several CompletableFutures

Method            Parameter                          Description

thenCombine       CompletableFuture<U>, (T, U)→V     Execute both and combine the results with the given function.
thenAcceptBoth    CompletableFuture<U>, (T, U)→void  Like thenCombine, but with void result.
runAfterBoth      CompletableFuture<?>, Runnable     Execute the runnable after both complete.

Composition of several CompletableFutures

Method            Parameter                          Description

applyToEither     CompletableFuture<T>, T→V          When a result is available from one or the other, pass it to the given function.
acceptEither      CompletableFuture<T>, T→void       Like applyToEither, but with void result.
runAfterEither    CompletableFuture<?>, Runnable     Execute the runnable after one or the other completes.
static allOf      CompletableFuture<?>…              Complete with void result after all given futures complete.
static anyOf      CompletableFuture<?>…              Complete after any of the given futures completes, with the same result cast to Object.
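
A sketch of combining several futures. CombineDemo and its methods are hypothetical names. Note that allOf carries no values, so the results are read from the original futures once it completes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CombineDemo {
    static int combined() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 40);
        CompletableFuture<Integer> tax = CompletableFuture.supplyAsync(() -> 2);
        return price.thenCombine(tax, Integer::sum).join(); // (T, U) -> V
    }

    static int sumAll() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> 3);
        List<CompletableFuture<Integer>> parts = Arrays.asList(a, b, c);
        CompletableFuture<Void> all = CompletableFuture.allOf(a, b, c);
        // By the time this function runs, every part is complete, so join() does not block.
        return all.thenApply(ignored -> parts.stream().mapToInt(f -> f.join()).sum()).join();
    }

    static Object firstOf() {
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed here
        return CompletableFuture.anyOf(fast, never).join(); // typed as Object
    }

    public static void main(String[] args) {
        System.out.println(combined()); // 42
        System.out.println(sumAll());   // 6
        System.out.println(firstOf());  // fast
    }
}
```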

If that didn’t seem enough…​

  • Most of these methods have a variant with the suffix Async (for example, thenApplyAsync) that runs the action asynchronously: in the default pool (ForkJoinPool.commonPool()) or, if one is passed, in a thread of the specified Executor.
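
A sketch showing that the Async variant with an Executor argument runs the action in that executor's thread. AsyncVariantDemo and the thread name "my-executor" are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncVariantDemo {
    // Returns the name of the thread in which the dependent action was executed.
    static String threadNameOfAsyncStage() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "my-executor"));
        try {
            return CompletableFuture.completedFuture("x")
                    .thenApplyAsync(v -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadNameOfAsyncStage()); // my-executor
    }
}
```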

We just scratched the surface

  • Brian Goetz et al., Java Concurrency in Practice.

  • Released in 2006, in the era of Java 5.0.

  • No new editions were published.

  • A Russian translation was published in 2020!

  • Still the most comprehensive guide to different aspects of multithreaded Java programming.

Code Review Checklist

Acknowledgements

These people gave their feedback and helped to improve the material of this lecture:

@2caco3 @asm0dey @dolzhenko @DrEdwardHyde

@dyer_the @krems5 @LordOfBoredom

@miha_x64 @vaddyacom @vdimir

(all errors and inaccuracies are mine)

Sometimes you feel the smell while reading the code…​

import org.json.JSONObject;
class NaivePersonSerializer {
  String toJSON(Person person) {
    JSONObject result = new JSONObject();
      result.put("firstName", person.getFirstName());
      result.put("lastName", person.getLastName());
      result.put("birthDate",
        person.getBirthDate()
        .format(DateTimeFormatter.ISO_LOCAL_DATE));
      result.put(...)
      result.put(...)
      //20 more fields
      result.put(...)
      return result.toString();
  }
}

There is a feeling that something is wrong…​

public class NaiveController {
  private final Service service;
  public void executeCommand(String command) {
    switch (command) {
      case "foo":
        service.foo(); break;
      case "bar":
        service.bar(); break;
      case ...
      //15 more branches...
      case "help":
      default:
        service.help(); break;
    }
  }
}

Coupling & Cohesion

  • Coupling: the degree of interdependence between software modules; a measure of how closely connected two routines or modules are; the strength of the relationships between modules.

  • Cohesion refers to the degree to which the elements inside a module belong together.

  • Low coupling often correlates with high cohesion, and vice versa.

Coupling vs. Cohesion

[Figure: coupling vs. cohesion]

Questions

  • Why is coupling high and cohesion low in the examples above?

  • What danger does this pose to the project?

How would we like to solve this problem?

//Mark what we want to serialize, do not serialize by default.
//(Option: mark what we do not want to serialize, serialize by default)
public class Person {
    @Published
    private final String firstName;
    @Published
    private final String lastName;
    @Published
    private final LocalDate birthDate;
    //... as many other fields as you like
}

//As an option...
JsonSerializer<Person> ser = new JsonSerializer<>(Person.class);
JSONObject json = ser.serialize(p);

Controller

public class Controller {
  private final Service service;

  @Command("foo")
  void doSomething() { service.foo(); }

  @Command("bar")
  void bar() { service.bar(); }

  //15 more commands

  @Command() //default one
  void help() { service.help(); }
}

new CommandDispatcher(new Controller(srv)).executeCommand("foo");
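
One way such a CommandDispatcher could work is to read the annotations at run time through the Reflection API. Everything below is a hedged sketch under stated assumptions, not the lecture's implementation: the definition of @Command (including its runtime retention and empty-string default), DemoController, and the dispatch logic are all hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Assumed definition: must be retained at run time to be visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Command {
    String value() default ""; // empty name marks the default command
}

// Hypothetical controller that records calls instead of delegating to a service.
class DemoController {
    final StringBuilder log = new StringBuilder();

    @Command("foo")
    void doSomething() { log.append("foo;"); }

    @Command
    void help() { log.append("help;"); }
}

class CommandDispatcher {
    private final Object controller;
    private final Map<String, Method> handlers = new HashMap<>();

    CommandDispatcher(Object controller) {
        this.controller = controller;
        // Collect every method marked with @Command, keyed by command name.
        for (Method m : controller.getClass().getDeclaredMethods()) {
            Command c = m.getAnnotation(Command.class);
            if (c != null) {
                m.setAccessible(true);
                handlers.put(c.value(), m);
            }
        }
    }

    void executeCommand(String command) {
        // Fall back to the default command when the name is unknown.
        Method m = handlers.getOrDefault(command, handlers.get(""));
        if (m == null) {
            throw new IllegalArgumentException("No handler for: " + command);
        }
        try {
            m.invoke(controller);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoController controller = new DemoController();
        CommandDispatcher dispatcher = new CommandDispatcher(controller);
        dispatcher.executeCommand("foo");
        dispatcher.executeCommand("unknown");
        System.out.println(controller.log); // foo;help;
    }
}
```

Adding a new command now means adding one annotated method; the dispatcher itself does not change.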

Annotations

  • Annotations are a way of marking up Java code by adding meta-information to various elements of a Java program.

  • They work approximately like modifiers (public, static), but there is a mechanism for creating your own annotations.

  • Can be processed at three stages:

    • at the stage of code generation before compilation (annotation processing, Language Model API),

    • at the stage of instrumentation of bytecode (Instrumentation API),

    • at the stage of code execution (via Reflection API).

Annotation definition syntax

modifiers @interface AnnotationName {
  type elementName();
  ...
  type elementName() default value;
}

Example

public @interface BugReport {
  String assignedTo() default "";
  int severity();
}

Annotation Interfaces

  • Just as enum classes are a special kind of class, annotation interfaces are a special kind of interface.

  • Implicitly extend java.lang.annotation.Annotation.

  • Cannot be extended by inheritance, cannot be parameterized through generics.

  • Methods cannot have parameters.

Valid return types of annotation methods (JLS 9.6.1)

  • Primitives (int, short, long, byte, char, double, float, boolean),

  • enums,

  • String,

  • Class (with a possible parameter restriction like Class<? extends MyClass>),

  • other annotations,

  • arrays of the above (but not arrays of arrays).

Annotation Interface Definition Example

public @interface BugReport {
  //nested types are valid!
  enum Status {UNCONFIRMED, CONFIRMED, FIXED, NOTABUG}
  boolean showStopper() default false;
  String assignedTo() default "";

  //metaclass, to be explained later
  Class<?> testCase() default Void.class;
  Status status() default Status.UNCONFIRMED;

  //Annotation type. We use a special syntax to construct an instance,
  //to be explained later
  Reference ref() default @Reference(id = "");
  String[] reportedBy();
}

Use of annotations

@AnnotationName(elementName1=value1, elementName2=value2,...)

@AnnotationName(singleValue)

@AnnotationName //no values, no parens

Use of annotations

@BugReport(assignedTo="Harry", severity=10)

//order doesn't matter
@BugReport(severity=10, assignedTo="Harry")

//if default values are set, you can skip the parameters
@BugReport(severity=10)

//if there are no elements or all of them have defaults,
//the parentheses can be omitted
@BugReport

//if an element has no default value, it must be specified
//(otherwise a compilation error); with severity() declared
//without a default, the bare @BugReport above would not compile.

Single-value annotations

public @interface ActionListenerFor {
  String value();
  int priority() default 0;
}

//use
@ActionListenerFor("yellowButton")

//which is equivalent to
@ActionListenerFor(value = "yellowButton")

//at the same time:
@ActionListenerFor(value = "yellowButton", priority = 1)

Arrays and other annotations

//Arrays are specified in curly braces
@BugReport(..., reportedBy = {"Harry", "Carl"})

//Same as {"Harry"}
@BugReport(..., reportedBy = "Harry")

//Constructing nested annotations
@BugReport(..., ref = @Reference(id = "12345"))

//... As you can see, using the allowed types,
//you can specify an arbitrarily complex data structure
An annotation element value cannot be null; null is not allowed even as a default value.
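
To see how such values are read back, here is a sketch that declares simplified versions of the annotations and inspects them through reflection. The trimmed-down BugReport and Reference definitions, the RUNTIME retention, and the classes Buggy and ReadAnnotationDemo are assumptions made for illustration.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;

// Assumed retention: without RUNTIME the annotation is invisible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@interface Reference {
    String id();
}

// Simplified variant of the BugReport annotation from the slides.
@Retention(RetentionPolicy.RUNTIME)
@interface BugReport {
    int severity() default 0;
    String[] reportedBy();
    Reference ref() default @Reference(id = "");
}

// A single array value may be written without curly braces.
@BugReport(severity = 10, reportedBy = "Harry", ref = @Reference(id = "12345"))
class Buggy {
}

class ReadAnnotationDemo {
    static String describe(Class<?> type) {
        BugReport report = type.getAnnotation(BugReport.class);
        if (report == null) {
            return "no report";
        }
        // Element values are read by calling the annotation's methods.
        return report.severity()
                + " " + Arrays.toString(report.reportedBy())
                + " " + report.ref().id();
    }

    public static void main(String[] args) {
        System.out.println(describe(Buggy.class));  // 10 [Harry] 12345
        System.out.println(describe(String.class)); // no report
    }
}
```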