Core Java. Lecture #11

Java Concurrency (ending). Annotations

Ivan Ponomarev, Synthesized.io/MIPT

Part 3. Thread-safe data structures

Non-blocking algorithms

  • Locking (via synchronized or ReentrantLock) solves the problem of coordinating access by different threads to a shared variable.

  • But if many threads compete for the lock (high lock contention), the cost of coordinating the threads becomes significant.

  • The alternative is non-blocking algorithms that use special atomic machine instructions (compare-and-swap).

  • In the Java library, classes of atomic variables and thread-safe collections are available, including those implementing non-blocking algorithms.
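The compare-and-swap idea above can be sketched as a retry loop: read the current value, compute the new one, and publish it only if nobody changed the value in between. This is a minimal illustration, not how AtomicInteger is implemented internally; the class name CasCounter and the driver method runDemo are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CasCounter {
    private final AtomicInteger value = new AtomicInteger();

    // CAS retry loop: read, compute, try to publish, retry on conflict.
    int increment() {
        while (true) {
            int current = value.get();
            int next = current + 1;
            if (value.compareAndSet(current, next)) {
                return next;
            }
            // Another thread changed the value in between: loop and retry.
        }
    }

    int get() {
        return value.get();
    }

    // Hypothetical driver: several threads increment one counter without locks.
    static int runDemo(int threads, int incrementsPerThread) throws InterruptedException {
        CasCounter counter = new CasCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(4, 10_000)); // no increments are lost: 40000
    }
}
```

No thread ever waits for a lock here; a thread that loses the race simply repeats its attempt.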

Atomics

  • package java.util.concurrent.atomic

    • AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference.

    • AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray.

  • Can be used as "enhanced volatiles", as the result of calling set(…​) is visible to other threads via get(…​)

  • Support atomic operations.

Atomic operations in atomic variable classes

getAndSet(newValue)    compareAndSet(expect, update)

incrementAndGet()      decrementAndGet()

getAndIncrement()      getAndDecrement()

getAndAdd(delta)       addAndGet(delta)

getAndUpdate(updateFunction)
updateAndGet(updateFunction)

getAndAccumulate(x, accumulatorBiFunction)
accumulateAndGet(x, accumulatorBiFunction)
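A small sketch of how the return values of these operations differ (the "getAndX" forms return the old value, the "XAndGet" forms return the new one). The class name AtomicOpsDemo is hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

final class AtomicOpsDemo {
    static int[] run() {
        AtomicInteger a = new AtomicInteger(10);
        int old = a.getAndIncrement();                // returns 10, value becomes 11
        int now = a.addAndGet(5);                     // value becomes 16, returns 16
        int before = a.getAndUpdate(x -> x * 2);      // returns 16, value becomes 32
        int max = a.accumulateAndGet(100, Math::max); // value becomes max(32, 100) = 100
        boolean swapped = a.compareAndSet(100, 0);    // expected value matches: true
        return new int[] {old, now, before, max, swapped ? 1 : 0, a.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [10, 16, 16, 100, 1, 0]
    }
}
```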

Thread-safe collections

  • In earlier versions of Java, it was possible to "make" a collection thread-safe by wrapping it in Collections.synchronizedXXX(…​). This serialized any access to the internal state of the collection. Because of backward compatibility support, doing this is still possible, but not recommended.

  • The price of such a solution is high lock contention.

  • Java 5 introduced classes specifically designed for thread safety, with fewer locks.

  • Their use is preferred.

CopyOnWriteArrayList and CopyOnWriteArraySet

class CopyOnWriteArrayList<E> implements List<E>
class CopyOnWriteArraySet<E> extends AbstractSet<E>
  • Array-based data structures.

  • Copy the entire underlying array on each modification.

  • This is expensive, but iterators are stable: each iterator works on a snapshot and never throws ConcurrentModificationException.

  • Good when there are multiple read operations per one write operation.
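The snapshot behaviour of the iterators can be observed even in a single thread. The following sketch (class and method names are hypothetical) modifies the list while iterating over it:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class CopyOnWriteDemo {
    // Returns the elements seen by an iterator while the list is being modified.
    static List<String> iterateWhileModifying() {
        List<String> list = new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c"));
        List<String> seen = new ArrayList<>();
        for (String s : list) {   // the iterator works on a snapshot of the array
            seen.add(s);
            list.add(s + "!");    // each add copies the array; the snapshot is untouched
        }
        return seen;              // only the three original elements
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileModifying()); // [a, b, c]
    }
}
```

With a plain ArrayList the same loop would fail with ConcurrentModificationException.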

ConcurrentLinkedQueue/Deque

ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>
ConcurrentLinkedDeque<E> extends AbstractCollection<E>
        implements Deque<E>
  • Based on a non-blocking algorithm (CAS operations).

  • poll() returns null if the queue is empty.

  • Under the hood these are linked lists (doubly linked in the case of the deque).

Blocking queues: a means of implementing the producer-consumer pattern

  • May be limited in size (capacity constrained).

  • The put() and take() methods wait until you can put or take an item.

  • PriorityBlockingQueue is not limited by capacity.

  • SynchronousQueue has no capacity at all, passes elements to processing threads directly.
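A minimal producer-consumer sketch on top of a bounded queue. The class name, the capacity of 2 and the "poison pill" end marker are assumptions made for the illustration; they are not part of the BlockingQueue API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class ProducerConsumerDemo {
    private static final int POISON_PILL = -1; // hypothetical end-of-stream marker

    static int sumViaQueue(int itemCount) throws InterruptedException {
        // A small capacity forces the producer to wait for the consumer.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        int[] sum = new int[1];

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= itemCount; i++) {
                    queue.put(i);          // blocks while the queue is full
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int item = queue.take(); // blocks while the queue is empty
                    if (item == POISON_PILL) {
                        break;
                    }
                    sum[0] += item;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();   // join() makes the consumer's writes visible here
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumViaQueue(100)); // 5050
    }
}
```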

ConcurrentHashMap

class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>
  • Replaces HashMap for multithreaded data access.

  • Reads do not block; writes rarely block.

  • Does not allow the use of null as a key or value.

  • Useful methods are atomic:

    • putIfAbsent(key, value)

    • remove(key, value)

    • replace(key, oldValue, newValue)
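The atomic methods listed above replace the unsafe "check, then act" sequences (for example containsKey followed by put). A short sketch of their return values; the class name is hypothetical:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ConcurrentMapDemo {
    static String run() {
        ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
        Integer prev1 = map.putIfAbsent("a", 1);    // null: key was absent, 1 is stored
        Integer prev2 = map.putIfAbsent("a", 2);    // 1: key is present, nothing changes
        boolean replaced = map.replace("a", 1, 10); // true: old value matched
        boolean removedWrong = map.remove("a", 1);  // false: current value is 10
        boolean removed = map.remove("a", 10);      // true
        return prev1 + "," + prev2 + "," + replaced + ","
                + removedWrong + "," + removed + "," + map.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(run()); // null,1,true,false,true,true
    }
}
```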

ConcurrentHashMap: Java 7-

Diagram

ConcurrentHashMap: Java 8+

Diagram

ConcurrentSkipListMap

class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentNavigableMap<K,V>
  • Replaces TreeMap for multithreaded access.

  • Does not allow null as a key or a value.

  • Has atomic methods.
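A brief sketch showing that the map keeps its keys sorted and offers the same navigation methods as TreeMap, plus the atomic methods of ConcurrentMap. The class name is hypothetical.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class SkipListMapDemo {
    static String run() {
        ConcurrentNavigableMap<Integer, String> map = new ConcurrentSkipListMap<>();
        map.put(30, "c");
        map.put(10, "a");
        map.put(20, "b");
        map.putIfAbsent(20, "ignored");       // atomic; key 20 already exists
        return map.firstKey() + " "           // smallest key
                + map.ceilingKey(15) + " "    // smallest key >= 15
                + map.headMap(30).keySet() + " " // keys strictly below 30, in order
                + map.get(20);
    }

    public static void main(String[] args) {
        System.out.println(run()); // 10 20 [10, 20] b
    }
}
```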

ConcurrentSkipListMap: the principle

Diagram

Part 4. Executor Framework

Executor Framework

  • Thread is an expensive resource, so we want to:

    • limit the number of our threads so as not to get an OutOfMemoryError,

    • reuse existing threads by submitting new tasks after completing the old ones,

    • if some thread "crashed" - automatically create a new one.

  • The standard library provides thread pools for all of this; you do not need to implement anything yourself.

  • Application code rarely uses the Thread API directly.

Executor Framework

//Abstraction of a computational task returning a result
public interface Callable<V> {
  V call() throws Exception;
}

//Abstraction of the "manager of threads"
public interface ExecutorService {
  <T> Future<T> submit(Callable<T> task);
  /*... there are many other methods, will discuss later...*/
}

Future

//Abstraction of a result that is still "in progress":
//you can wait for its completion or cancel the computation
public interface Future<V> {
  V get() throws InterruptedException, ExecutionException;
  V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
  boolean cancel(boolean mayInterruptIfRunning);
  boolean isDone();
  boolean isCancelled();
}
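A minimal sketch of how the three abstractions fit together: a Callable is submitted to an ExecutorService, and the returned Future is used to obtain either the result or the exception thrown by the task. The class name SubmitDemo and its helper are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SubmitDemo {
    // get() blocks until the task finishes; a task failure arrives
    // wrapped in ExecutionException.
    private static String describe(Future<Integer> future) throws InterruptedException {
        try {
            return "result " + future.get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    static String run() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> good = () -> 6 * 7;
            Callable<Integer> bad = () -> {
                throw new IllegalStateException("boom");
            };
            Future<Integer> f1 = pool.submit(good);
            Future<Integer> f2 = pool.submit(bad);
            return describe(f1) + "; " + describe(f2);
        } finally {
            pool.shutdown(); // otherwise the pool threads keep the JVM alive
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // result 42; failed: boom
    }
}
```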

Task, Executor, Future

How do I create an ExecutorService?

public class Executors {
  //fixed pool size
  public static ExecutorService newFixedThreadPool(int nThreads)
  public static ExecutorService newSingleThreadExecutor()
  //pool grows as needed, keeps inactive thread for 60 seconds
  public static ExecutorService newCachedThreadPool()
  //allows you to perform tasks with a delay or periodically
  public static
    ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
  ...
}

Processing tasks in batches

//Run and wait for everything to finish
//List<Future<T>>, not List<T>, because exceptions are possible
<T> List<Future<T>> invokeAll(
    Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

//Run, return the first successful result
//cancel the rest
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
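Both batch methods can be tried out with a few trivial tasks. This is a sketch under the assumption of a fixed pool of four threads; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BatchDemo {
    // invokeAll: returns only after every task has finished.
    static int sumOfSquares(List<Integer> inputs)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int n : inputs) {
                tasks.add(() -> n * n);
            }
            int sum = 0;
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                sum += f.get(); // would throw ExecutionException for a failed task
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }

    // invokeAny: failed tasks are skipped as long as one task succeeds.
    static String firstSuccessful() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(() -> {
                throw new IllegalStateException("failed");
            });
            tasks.add(() -> "ok");
            return pool.invokeAny(tasks);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> inputs = new ArrayList<>();
        inputs.add(1);
        inputs.add(2);
        inputs.add(3);
        System.out.println(sumOfSquares(inputs)); // 14
        System.out.println(firstSuccessful());    // ok
    }
}
```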

Cancellation of tasks and "termination of service"

//cancel a task if it has not already started
future.cancel(false)
//request task interruption (details ahead)
future.cancel(true)

//Prevent new tasks from being accepted
executorService.shutdown();
//Wait until the accepted tasks are completed
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
    //Abort the remaining tasks
    executorService.shutdownNow();
}
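The shutdown sequence above can be wrapped in a reusable helper. The following is a sketch, not a library method: the names GracefulShutdown, shutdownGracefully and demo are hypothetical, and the choice to wait a second time after shutdownNow() is an assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class GracefulShutdown {
    // Returns true if the service terminated within the given time limits.
    static boolean shutdownGracefully(ExecutorService service, long timeout, TimeUnit unit) {
        service.shutdown(); // stop accepting new tasks
        try {
            if (service.awaitTermination(timeout, unit)) {
                return true;
            }
            service.shutdownNow(); // interrupt the tasks that are still running
            return service.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            // The waiting thread itself was interrupted: abort and restore the flag.
            service.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static boolean demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> { /* a short task */ });
        return shutdownGracefully(pool, 5, TimeUnit.SECONDS) && pool.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true
    }
}
```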

Interruption of tasks

  • Early versions of Java had methods for forcibly stopping and suspending/resuming threads (now deprecated). This turned out to be a bad idea:

    • there is no guarantee that the thread will not be stopped in the middle of an operation that must be atomic,

    • a suspended thread may hold an acquired lock for an indefinite amount of time.

  • All of this is now deprecated in favor of the cooperative interruption mechanism.

Interrupting threads

  • ExecutorService.shutdownNow() calls the Thread.interrupt() method on threads.

  • The Thread.isInterrupted() method returns the interrupt status of the thread.

  • JMM interruption rule: a call to interrupt() on a thread happens-before the interrupted thread detects the interruption (by catching InterruptedException or by calling isInterrupted() or interrupted()).

Cooperative termination mechanism

  • If the computation runs in a loop, the thread must periodically check Thread.currentThread().isInterrupted() and, if the flag is set, log the interruption and exit the method.

  • InterruptedException may be thrown on waiting methods. What should we do with it?

What should we do with InterruptedException

  • If circumstances allow, it is best to declare it in throws and not to handle it at all.

  • If we cannot change the method’s signature (e.g. we are implementing the run method of the Runnable interface, which cannot throw checked exceptions), then:

} catch (InterruptedException e) {
    //record the fact of interruption to the log...
    ...
    //restore the interrupted status
    Thread.currentThread().interrupt();
    //exit the interrupted method
    return;
}
  • You should never silently "swallow" an InterruptedException!

  • It is recommended to log the fact of interruption to facilitate debugging.
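Putting the rules together: a worker that checks the flag in its loop, handles InterruptedException from a blocking call, and restores the interrupt status before exiting. The class name and the use of Thread.sleep as the "blocking call" are assumptions for the sake of a runnable sketch.

```java
final class InterruptDemo {
    // Starts a worker, interrupts it and reports whether the worker
    // exited with its interrupt status set.
    static boolean runAndInterrupt() throws InterruptedException {
        boolean[] statusAtExit = new boolean[1];
        Thread worker = new Thread(() -> {
            try {
                // Cooperative check on every iteration.
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(1000); // a waiting method: may throw InterruptedException
                }
            } catch (InterruptedException e) {
                // Record the fact of interruption...
                System.out.println("worker was interrupted, exiting");
                // ...and restore the status, which was cleared when the exception was thrown.
                Thread.currentThread().interrupt();
            }
            statusAtExit[0] = Thread.currentThread().isInterrupted();
        });
        worker.start();
        worker.interrupt();
        worker.join(); // join() makes the worker's write visible here
        return statusAtExit[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndInterrupt()); // true
    }
}
```

Whether the interrupt arrives before or during sleep(), the worker leaves the loop and the status is still set for any caller further up the stack.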

CompletableFuture

  • Introduced in Java 8; implements Future (and CompletionStage).

  • Allows you to explicitly set the result (hence Completable) and to assemble a chain of asynchronous computations.

  • Can be used as follows:

CompletableFuture<Integer> f = new CompletableFuture<>();
executor.execute(() -> {
  int n = workHard(arg);
  f.complete(n); });

executor.execute(() -> {
  int n = workSmart(arg);
  f.complete(n); });

executor.execute(() -> {
  Throwable t = ...;
  f.completeExceptionally(t); });
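When several tasks try to complete the same future, only the first attempt wins; later calls return false and do not change the result. The effect can be seen even without threads (the class name is hypothetical):

```java
import java.util.concurrent.CompletableFuture;

final class CompleteDemo {
    static String run() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        boolean first = f.complete(1);  // true: the future is now completed with 1
        boolean second = f.complete(2); // false: already completed, value stays 1
        boolean third = f.completeExceptionally(new IllegalStateException()); // false as well
        return first + "," + second + "," + third + "," + f.join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // true,false,false,1
    }
}
```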

CompletableFuture composition with action

Method          Parameter                  Description

thenApply       T→U                        Apply a function to the result.
thenAccept      T→void                     Like thenApply, but with void result.
thenCompose     T→CompletableFuture<U>     Invoke the function on the result and execute the returned future.
handle          (T, Throwable)→U           Process the result or error and yield a new result.
whenComplete    (T, Throwable)→void        Like handle, but with void result.
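A sketch of a chain built from thenApply, thenCompose and handle, once for a successful computation and once for a failing one. The class name and the helper describe are hypothetical; the helper unwraps CompletionException because errors from earlier stages may arrive wrapped in it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ChainDemo {
    // Used with handle(): exactly one of value/error is meaningful.
    private static String describe(Integer value, Throwable error) {
        if (error == null) {
            return "value=" + value;
        }
        Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause()
                : error;
        return "failed: " + cause.getMessage();
    }

    static String run() {
        CompletableFuture<String> ok = CompletableFuture.supplyAsync(() -> 20)
                .thenApply(n -> n + 1)                                         // 21
                .thenCompose(n -> CompletableFuture.supplyAsync(() -> n * 2)) // 42
                .handle(ChainDemo::describe);

        CompletableFuture<String> failed = CompletableFuture.<Integer>supplyAsync(() -> {
                    throw new IllegalArgumentException("boom");
                })
                .thenApply(n -> n + 1)         // skipped: the previous stage failed
                .handle(ChainDemo::describe);  // receives the error instead of a value

        return ok.join() + "; " + failed.join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // value=42; failed: boom
    }
}
```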

CompletableFuture composition with action

Method               Parameter            Description

completeOnTimeout    T, long, TimeUnit    Yield the given value as the result in case of timeout (Java 9+).
orTimeout            long, TimeUnit       Complete exceptionally with TimeoutException in case of timeout (Java 9+).
thenRun              Runnable             Execute the Runnable with void result.
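Both timeout methods can be tried on a future that nobody ever completes. This sketch requires Java 9 or later; the class name and the 50 ms timeout are arbitrary choices.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutDemo {
    static String run() {
        // Nobody completes this future, so the fallback value is used.
        CompletableFuture<String> never1 = new CompletableFuture<>();
        String fallback = never1
                .completeOnTimeout("fallback", 50, TimeUnit.MILLISECONDS)
                .join();

        // Here the future is completed exceptionally with TimeoutException;
        // join() reports it wrapped in CompletionException.
        CompletableFuture<String> never2 = new CompletableFuture<>();
        String outcome;
        try {
            never2.orTimeout(50, TimeUnit.MILLISECONDS).join();
            outcome = "completed";
        } catch (CompletionException e) {
            outcome = (e.getCause() instanceof TimeoutException) ? "timeout" : "other";
        }
        return fallback + "," + outcome;
    }

    public static void main(String[] args) {
        System.out.println(run()); // fallback,timeout
    }
}
```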

Composition of several CompletableFutures

Method            Parameter                            Description

thenCombine       CompletableFuture<U>, (T, U)→V       Execute both and combine the results with the given function.
thenAcceptBoth    CompletableFuture<U>, (T, U)→void    Like thenCombine, but with void result.
runAfterBoth      CompletableFuture<?>, Runnable       Execute the runnable after both complete.
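For instance, two independent asynchronous computations can be combined as follows (a sketch; the class name and the numbers are made up):

```java
import java.util.concurrent.CompletableFuture;

final class CombineDemo {
    static double totalPrice() {
        // Two independent computations that may run in parallel.
        CompletableFuture<Integer> quantity = CompletableFuture.supplyAsync(() -> 4);
        CompletableFuture<Double> unitPrice = CompletableFuture.supplyAsync(() -> 2.5);
        // The function runs once both results are available.
        return quantity.thenCombine(unitPrice, (q, p) -> q * p).join();
    }

    public static void main(String[] args) {
        System.out.println(totalPrice()); // 10.0
    }
}
```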

Composition of several CompletableFutures

Method            Parameter                        Description

applyToEither     CompletableFuture<T>, T→V        When a result is available from one or the other, pass it to the given function.
acceptEither      CompletableFuture<T>, T→void     Like applyToEither, but with void result.
runAfterEither    CompletableFuture<?>, Runnable   Execute the runnable after one or the other completes.
static allOf      CompletableFuture<?>…​            Complete with void result after all given futures complete.
static anyOf      CompletableFuture<?>…​            Complete after any of the given futures completes, with the same result cast to Object.
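Since allOf yields a void result, the individual values still have to be read from the original futures. A common way to do this is sketched below; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AllAnyDemo {
    // Waits for all futures, then collects their results in the original order.
    static List<Integer> collectAll() {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            int n = i;
            futures.add(CompletableFuture.supplyAsync(() -> n * 10));
        }
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return all.thenApply(ignored -> {
            List<Integer> results = new ArrayList<>();
            for (CompletableFuture<Integer> f : futures) {
                results.add(f.join()); // does not block: every future is already done
            }
            return results;
        }).join();
    }

    // anyOf completes as soon as one of the futures does; the result type is Object.
    static Object firstOf() {
        CompletableFuture<String> never = new CompletableFuture<>();
        CompletableFuture<String> ready = CompletableFuture.completedFuture("ready");
        return CompletableFuture.anyOf(never, ready).join();
    }

    public static void main(String[] args) {
        System.out.println(collectAll()); // [10, 20, 30]
        System.out.println(firstOf());    // ready
    }
}
```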

If that didn’t seem enough…​

  • Most of these composition methods have a variant with the suffix Async (for example, thenApplyAsync) that runs the action in a thread of the specified Executor, or of the default executor if none is given.
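To see where an Async action runs, the following sketch passes an executor with a recognizable thread name to thenApplyAsync. The class name and the thread name "demo-pool" are made up for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncVariantDemo {
    static String threadOfAsyncStage() {
        // A single-thread executor whose thread carries a recognizable name.
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-pool"));
        try {
            return CompletableFuture.completedFuture("x")
                    // The function is executed by the given executor,
                    // not by the thread that set up the chain.
                    .thenApplyAsync(s -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadOfAsyncStage()); // demo-pool
    }
}
```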

We just scratched the surface

  • Brian Goetz et al., Java Concurrency in Practice.

  • Released in 2006, in the era of Java 5.0.

  • No newer editions have been published.

  • A Russian translation was published in 2020!

  • Still the most comprehensive guide to different aspects of multithreaded Java programming.

Code Review Checklist

Acknowledgements

These people gave their feedback and helped to improve the material of this lecture:

@2caco3 @asm0dey @dolzhenko @DrEdwardHyde

@dyer_the @krems5 @LordOfBoredom

@miha_x64 @vaddyacom @vdimir

(all errors and inaccuracies are mine)

Sometimes you feel the smell while reading the code…​

import org.json.JSONObject;
class NaivePersonSerializer {
  String toJSON(Person person) {
    JSONObject result = new JSONObject();
    result.put("firstName", person.getFirstName());
    result.put("lastName", person.getLastName());
    result.put("birthDate",
      person.getBirthDate()
        .format(DateTimeFormatter.ISO_LOCAL_DATE));
    result.put(...)
    result.put(...)
    //20 more fields
    result.put(...)
    return result.toString();
  }
}

There is a feeling that something is wrong…​

public class NaiveController {
  private final Service service;
  public void executeCommand(String command) {
    switch (command) {
      case "foo":
        service.foo(); break;
      case "bar":
        service.bar(); break;
      case ...
      //15 more branches...
      case "help":
      default:
        service.help(); break;
    }
  }
}

Coupling & Cohesion

  • Coupling: the degree of interdependence between software modules; a measure of how closely connected two routines or modules are; the strength of the relationships between modules.

  • Cohesion refers to the degree to which the elements inside a module belong together.

  • Low coupling often correlates with high cohesion, and vice versa.

Coupling vs. Cohesion

Questions

  • Why is coupling high and cohesion low in the examples above?

  • What danger does this pose to the project?

How would we like to solve this problem?

//Mark what we want to serialize, do not serialize by default.
//(Option: mark what we do not want to serialize, serialize by default)
public class Person {
    @Published
    private final String firstName;
    @Published
    private final String lastName;
    @Published
    private final LocalDate birthDate;
    //... as many other fields as you like
}

//As an option...
JsonSerializer<Person> ser = new JsonSerializer<>(Person.class);
JSONObject json = ser.serialize(p);
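One possible way to implement such a serializer is to read the annotation through the Reflection API. The sketch below makes several assumptions that are not in the slides: @Published is declared with runtime retention, the result is a plain Map rather than a JSONObject, and the classes PersonSketch and PublishedSerializer are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Assumption: the annotation must be retained at runtime to be visible via reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Published {
}

final class PersonSketch {
    @Published
    private final String firstName;
    @Published
    private final String lastName;
    private final String password; // not marked, hence not serialized

    PersonSketch(String firstName, String lastName, String password) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.password = password;
    }
}

final class PublishedSerializer {
    // Collects the values of all fields marked with @Published.
    static Map<String, Object> serialize(Object obj) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Field field : obj.getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(Published.class)) {
                field.setAccessible(true); // the fields are private
                try {
                    result.put(field.getName(), field.get(obj));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(serialize(new PersonSketch("Ada", "Lovelace", "secret")));
    }
}
```

Adding a field to the class no longer requires touching the serializer: marking the field is enough.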

Controller

public class Controller {
  private final Service service;

  @Command("foo")
  void doSomething() { service.foo(); }

  @Command("bar")
  void bar() { service.bar(); }

  //15 more commands

  @Command() //default one
  void help() { service.help(); }
}

new CommandDispatcher(new Controller(srv)).executeCommand("foo");
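A dispatcher like this can be sketched with reflection: it scans the controller for methods marked with @Command and invokes the matching one. Everything below is an assumption about how it might look, not the lecture's implementation: the annotation has runtime retention, an empty value marks the default command, and CommandDemoController is a stand-in controller that records calls in a list.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Command {
    String value() default ""; // empty name marks the default command (assumption)
}

final class CommandDispatcher {
    private final Object controller;
    private final Map<String, Method> commands = new HashMap<>();
    private Method defaultCommand;

    CommandDispatcher(Object controller) {
        this.controller = controller;
        for (Method method : controller.getClass().getDeclaredMethods()) {
            Command command = method.getAnnotation(Command.class);
            if (command == null) {
                continue; // not a command method
            }
            method.setAccessible(true);
            if (command.value().isEmpty()) {
                defaultCommand = method;
            } else {
                commands.put(command.value(), method);
            }
        }
    }

    void executeCommand(String name) {
        Method method = commands.getOrDefault(name, defaultCommand);
        if (method == null) {
            throw new IllegalArgumentException("Unknown command: " + name);
        }
        try {
            method.invoke(controller);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}

// Hypothetical controller used only to exercise the dispatcher.
final class CommandDemoController {
    final List<String> log = new ArrayList<>();

    @Command("foo")
    void doSomething() { log.add("foo"); }

    @Command
    void help() { log.add("help"); }

    public static void main(String[] args) {
        CommandDemoController controller = new CommandDemoController();
        CommandDispatcher dispatcher = new CommandDispatcher(controller);
        dispatcher.executeCommand("foo");
        dispatcher.executeCommand("no-such-command"); // falls back to help()
        System.out.println(controller.log); // [foo, help]
    }
}
```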

Annotations

  • Annotation is a way of marking up Java code by adding meta-information to various elements of a Java program.

  • They work approximately like modifiers (public, static), but there is a mechanism for creating your own annotations.

  • Can be processed at three stages:

    • at the stage of code generation before compilation (annotation processing, Language Model API),

    • at the stage of instrumentation of bytecode (Instrumentation API),

    • at the stage of code execution (via Reflection API).

Annotation definition syntax

modifiers @interface AnnotationName {
  type elementName();
  ...
  type elementName() default value;
}

Example

public @interface BugReport {
  String assignedTo() default "";
  int severity();
}

Annotation Interfaces

  • Just as enum classes are a special kind of class, annotation interfaces are a special kind of interface.

  • Implicitly extend java.lang.annotation.Annotation.

  • Cannot be extended by inheritance, cannot be parameterized through generics.

  • Methods cannot have parameters.

Valid return types of annotation methods (JLS 9.6.1)

  • Primitives (int, short, long, byte, char, double, float, boolean),

  • enums,

  • String,

  • Class (with a possible parameter restriction like Class<? extends MyClass>),

  • other annotations,

  • arrays of the above (but not arrays of arrays).

Annotation Interface Definition Example

public @interface BugReport {
  //nested types are valid!
  enum Status {UNCONFIRMED, CONFIRMED, FIXED, NOTABUG}
  boolean showStopper() default false;
  String assignedTo() default "";

  //metaclass, to be explained later
  Class<?> testCase() default Void.class;
  Status status() default Status.UNCONFIRMED;

  //Annotation type. We use a special syntax to construct an instance,
  //to be explained later
  Reference ref() default @Reference(id = "");
  String[] reportedBy();
}

Use of annotations

@AnnotationName(elementName1=value1, elementName2=value2,...)

@AnnotationName(singleValue)

@AnnotationName //no values, no parens

Use of annotations

@BugReport(assignedTo="Harry", severity=10)

//order doesn't matter
@BugReport(severity=10, assignedTo="Harry")

//if default values are set, you can skip the parameters
@BugReport(severity=10)

//if there are no parameters or all of them have default,
//then brackets can be omitted
@BugReport

//if there is no default value for the parameter, it must be defined
//(otherwise a compilation error).

Single-value annotations

public @interface ActionListenerFor {
  String value();
  int priority() default 0;
}

//use
@ActionListenerFor("yellowButton")

//which is equivalent to
@ActionListenerFor(value = "yellowButton")

//at the same time:
@ActionListenerFor(value = "yellowButton", priority = 1)

Arrays and other annotations

//Arrays are specified in curly braces
@BugReport(..., reportedBy = {"Harry", "Carl"})

//Same as {"Harry"}
@BugReport(..., reportedBy = "Harry")

//Constructing nested annotations
@BugReport(..., ref = @Reference(id = "12345"))

//... As you can see, using the allowed types,
//you can specify an arbitrarily complex data structure
An annotation value cannot be set to null; null is not allowed even as a default value.
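Because null is not available, "no value" is usually expressed with a sentinel default such as an empty string or Void.class. The sketch below uses a reduced version of the BugReport annotation and reads it back via reflection; the runtime retention, the classes BuggyClass and BugReportReader, and the interpretation of the sentinels are assumptions made for the example.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;

@Retention(RetentionPolicy.RUNTIME)
@interface Reference {
    String id();
}

@Retention(RetentionPolicy.RUNTIME)
@interface BugReport {
    String assignedTo() default "";           // "" stands for "nobody"
    Class<?> testCase() default Void.class;   // Void.class stands for "no test case"
    Reference ref() default @Reference(id = "");
    String[] reportedBy();
}

// Single array element without braces, nested annotation, two defaults left untouched.
@BugReport(reportedBy = "Harry", ref = @Reference(id = "12345"))
final class BuggyClass {
}

final class BugReportReader {
    static String describe(Class<?> type) {
        BugReport report = type.getAnnotation(BugReport.class);
        if (report == null) {
            return "no report";
        }
        // Translate the sentinel defaults back into "absent".
        String assignee = report.assignedTo().isEmpty() ? "<unassigned>" : report.assignedTo();
        String test = report.testCase() == Void.class
                ? "<none>"
                : report.testCase().getSimpleName();
        return assignee + " " + test + " " + report.ref().id()
                + " " + Arrays.toString(report.reportedBy());
    }

    public static void main(String[] args) {
        System.out.println(describe(BuggyClass.class)); // <unassigned> <none> 12345 [Harry]
        System.out.println(describe(String.class));     // no report
    }
}
```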