Core Java. Лекция 11

Java Concurrency (окончание). Annotations

Иван Пономарёв, КУРС/МФТИ

Часть 3. Потокобезопасные структуры данных

hydra2

Неблокирующие алгоритмы

  • Блокировка (через synchronized или ReentrantLock) решает вопрос координации действий разных тредов с переменной.

  • Но если много тредов конкурируют за блокировку (high lock contention), затраты ресурсов на координацию тредов становятся значительными.

  • Альтернативой являются неблокирующие алгоритмы, использующие поддержку специальных атомарных машинных инструкций (compare-and-swap).

  • В Java-библиотеке доступны классы атомарных переменных и потокобезопасные коллекции, реализованные в том числе на неблокирующих алгоритмах.

Atomics

  • package java.util.concurrent.atomic

    • AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference.

    • AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray.

  • Могут быть использованы как «улучшенные volatile-переменные», т. к. результат вызова set(…​) виден другим тредам при вызове get(…​)

  • Поддерживают атомарные операции.

Aтомарные операции в классах атомарных переменных

getAndSet(newValue)    compareAndSet(expect, update)

incrementAndGet()      decrementAndGet()

getAndIncrement()      getAndDecrement()

getAndAdd(delta)       addAndGet(delta)

getAndUpdate(updateFunction)
updateAndGet(updateFunction)

getAndAccumulate(x, accumulatorBiFunction)
accumulateAndGet(x, accumulatorBiFunction)

Потокобезопасные коллекции

  • В ранних версиях Java можно было «сделать» коллекцию потокобезопасной, обернув в Collections.synchronizedXXX(…​). Это сериализовывало любой доступ к внутреннему состоянию коллекции. Из-за поддержки обратной совместимости сейчас так тоже можно, но не нужно.

  • Цена такого решения — плохой параллелизм: конкуренция за блокировку (lock contention).

  • С версии 5 появились классы, специально разработанные для потокобезопасности, с меньшим количеством блокировок.

  • Их использование является предпочтительным.

CopyOnWriteArrayList и CopyOnWriteArraySet

class CopyOnWriteArrayList<E> implements List<E>
class CopyOnWriteArraySet<E> extends AbstractSet<E>
  • Структуры данных на основе массива.

  • Пересоздают всё заново при каждой модификации.

  • Это дорого, зато все читающие итераторы стабильны.

  • Хороши, когда на одну операцию записи приходится много операций чтения.

ConcurrentLinkedQueue/Deque

ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>
ConcurrentLinkedDeque<E> extends AbstractCollection<E>
        implements Deque<E>
  • Основаны на неблокирующем алгоритме (CAS-операции)

  • poll() вернёт null, если очередь пуста

  • Под капотом — связные/двусвязные списки

ConcurrentLinkedQueue/Deque

diag 666775d68d2655f19712c887153ecf32
diag 8aa1efc451f81c3628ec1d95951f0d52

Блокирующие очереди: средство реализации producer-consumer pattern

blockingqueue
  • Могут быть ограничены по размеру (capacity constrained).

  • Методы put() и take() ждут, пока не появится возможность положить или взять элемент.

  • PriorityBlockingQueue не лимитируется по capacity.

  • SynchronousQueue не имеет capacity вовсе, передаёт элементы обрабатывающим тредам напрямую.

ConcurrentHashMap

class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>
  • Замена HashMap при разделённом доступе к данным.

  • Не блокируется при чтении и редко блокируется при записи.

  • Не позволяет использовать null в качестве ключа или значения.

  • Полезные методы атомарны:

    • putIfAbsent(key, value)

    • remove(key, value)

    • replace(key, oldValue, newValue)

ConcurrentHashMap: Java 7-

diag 75b946bfeba2674ef90d0eda76495543

ConcurrentHashMap: Java 8+

diag de8c5a19b8c187f0c487097730a7ea44

ConcurrentSkipListMap

class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentNavigableMap<K,V>
  • Замена TreeMap при разделённом доступе к данным.

  • Не позволяет использовать null в качестве ключа или значения.

  • Имеет атомарные методы.

ConcurrentSkipListMap: идея

diag 2cc9358d9bc8084479bef4ca4b30492e

ConcurrentSkipListMap

diag b606e8be52eb288b608f929e19e6cf0d

ConcurrentSkipListMap

diag 680c74d5072e8e2c578e85e01057c382

ConcurrentSkipListMap

diag ae7dc25ab9f27a11bcda0d982bd8da44

ConcurrentSkipListMap

diag eccbf6ed36aa23fa571317cab7437339

ConcurrentSkipListMap

diag 3bedfe9b3655832d804a930fc0b7e7cc

ConcurrentSkipListMap

diag 48b14a62d6be88fd521f42d2150da41f

ConcurrentSkipListMap

diag 52de3fc564297968055f1815120e6258

ConcurrentSkipListMap

diag e527c2f59f60c54f5551afa4e3de4088

Часть 4. Executor Framework

hydra2

Executor Framework

  • Тред — дорогой ресурс, поэтому мы хотим:

    • ограничивать количество наших тредов, чтобы не устроить Out of Memory,

    • переиспользовать имеющиеся треды, подавая им новые задачи после завершения старых,

    • но если какой-то тред «вылетел» — автоматически создавать новый.

  • В стандартной библиотеке для этого есть Thread Pools, не надо ничего делать самостоятельно.

  • Никто не использует Thread API напрямую.

Executor Framework

//Абстракция вычислительной задачи, возвращающей результат
public interface Callable<V> {
  V call() throws Exception;
}

//Абстракция «менджера тредов»
public interface ExecutorService {
  <T> Future<T> submit(Callable<T> task);
  /*...есть и много другого, речь впереди...*/
}

Future

//Абстракция результата "in progress", который можно ждать,
//а можно и отменить
public interface Future<V> {
  V get() throws InterruptedException, ExecutionException;
  V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
  boolean cancel(boolean mayInterruptIfRunning);
  boolean isDone();
  boolean isCancelled();
}

Task, Executor, Future

taskexecfuture

Как создать ExecutorService?

public class  Executors {
  //фиксированный размер пула
  public static ExecutorService newFixedThreadPool(int nThreads)
  public static ExecutorService newSingleThreadExecutor()
  //пул растёт по необходимости, держит неактивный тред 60 секунд
  public static ExecutorService newCachedThreadPool()
  //позволяет выполнять задачи с задержкой или периодичностью
  public static
    ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
  ...
}

Обработка задач «пачками»

//Запускаем и ждём, пока все выполнятся
//List<Future<T>>, а не List<T>, т. к. возможны исключения
<T> List<Future<T>> invokeAll(
    Collection<? extends Callable<T>> tasks)
        throws InterruptedException

//Запускаем, возвращаем первый успешный результат,
//отменяем остальные
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

Отмена задач и «прекращение обслуживания»

//отменить задачу, если она ещё не начала выполняться
future.cancel(false)
//запросить прерывание задачи (подробности впереди)
future.cancel(true)

//Запретить приём новых задач
executorService.shutdown();
//Подождать, пока принятые задачи завершатся
if (!service.awaitTermination(10, TimeUnit.SECONDS)){
    //Прервать выполнение задач
    service.shutdownNow();
}

Прерывание задач

  • В ранних версиях Java существовали (ныне deprecated) методы принудительной остановки и приостановки/возобновления тредов, но это оказалось плохой идеей:

    • нет гарантий, что тред не остановится посередине атомарной операции.

    • приостановка может на неопределённое время "завесить" блокировку

  • В итоге, имеется кооперативный механизм прерывания.

Прерывание тредов

  • ExecutorService.shutdownNow() вызывает метод Thread.interrupt() на потоках выполнения.

  • метод Thread.isInterrupted() возвращает статус прерывания треда.

  • JMM Interruption Rule: вызов метода interrupt() внешним потоком happens-before прерываемый поток узнаёт о том, что он прерван.

Кооперативный механизм прерывания

  • Если вычисления в цикле, тред обязан периодически проверять статус Thread.currentThread().isInterrupted() и, если флаг выставлен, записав в лог факт прерывания, выходить из метода.

  • На ждущих методах может быть выброшен InterruptedException. Что с ним делать?

Что делать с InterruptedException

  • Если контекст позволяет, его всегда следует пробрасывать выше (декларируя в throws).

  • Если выше пробрасывать нельзя (например, мы находимся в методе run интерфейса Runnable), то:

} catch (InterruptedException e) {
    //записываем факт прерывания в лог ...
    ...
    //восстанавливаем interrupted-статус
    Thread.currentThread().interrupt();
    //выходим из прерванной процедуры
    return;
}
  • Просто так "проглатывать" InterruptedException ни в коем случае нельзя!

  • Рекомендуется записывать факт прерывания в лог, для прозрачности отладки.

CompletableFuture

  • Появились в Java 8, расширяют Future.

  • Позволяют явно задать результат (отсюда 'Completable') и собрать цепочку асинхронных вычислений.

  • Могут быть использованы так:

CompletableFuture<Integer> f = new CompletableFuture<>();
executor.execute(() -> {
  int n = workHard(arg);
  f.complete(n); });

executor.execute(() -> {
  int n = workSmart(arg);
  f.complete(n); });

executor.execute(() -> {
  Throwable t = ...;
  f.completeExceptionally(t); });

Композиция CompletableFuture с действием

Method

Parameter

Description

thenApply

T→U

Apply a function to the result.

thenAccept

T→void

Like thenApply, but with void result.

thenCompose

T→CompletableFuture<U>

Invoke the function on the result and execute the returned future.

handle

(T, Throwable) → U

Process the result or error and yield a new result.

whenComplete

(T, Throwable) → void

Like handle, but with void result.

Композиция CompletableFuture с действием

Method

Parameter

Description

completeOnTimeout

T, long, TimeUnit

Yield the given value as the result in case of timeout (Java 9+)

orTimeout

long, TimeUnit

Throw `TimeoutException ` in case of timeout (Java 9+)

thenRun

Runnable

Execute the Runnable with void result.

Композиция нескольких CompletableFuture

Method

Parameter

Description

thenCombine

ComletableFuture<U>,
(T, U)→V

Execute both and combine the results with the given function.

thenAcceptBoth

ComletableFuture<U>,
(T, U)→void

Like thenCombine, but with void result.

runAfterBoth

CompletableFuture<?>,
Runnable

Execute the runnable after both complete.

Композиция нескольких CompletableFuture

Method

Parameter

Description

applyToEither

CompletableFuture<T>,
T→V

When a result is available from one or the other, pass it to the given function.

acceptEither

CompletableFuture<T>,
T→void

Like applyToEither, but with void result.

runAfterEither

CompletableFuture<?>,
Runnable

Execute the runnable after one or the other completes.

static allOf

CompletableFuture<?>…​

Complete with void result after all given futeres complete.

static anyOf

CompletableFuture<?>…​

Complete after any of the given futures completes, with the same result cast to Object.

Если этого показалось мало…​

  • Каждый из этих методов имеет вариант с постфиксом Async (например, thenApplyAsync), позволяющий выполнить дополнительное действие в другом треде заданного Executor-a.

Мы только прошлись по основам

jcip
  • Brian Goetz et al., Java Concurrency in Practice.

  • Вышла в 2006-м году, в эпоху Java 5.0.

  • Новых изданий не выходило.

  • На русском языке вышла в 2020-м году!

  • Всё ещё самое полное руководство по разным аспектам многопоточного программирования на Java.

Code Review Checklist

  • Roman Leventov: Code Review Checklist: Java Concurrency

  • Порядка 100 пунктов, по которым можно проверить concurrency код на распространённые ошибки

Благодарности

Эти люди дали свой фидбэк и помогли улучшить материал этой лекции:

@2caco3 @asm0dey @dolzhenko @DrEdwardHyde

@dyer_the @krems5 @LordOfBoredom

@miha_x64 @vaddyacom @vdimir

(все ошибки и неточности — мои)

Иногда при чтении кода начинает рябить в глазах…​

import org.json.JSONObject;
class NaivePersonSerializer {
  String toJSON(Person person) {
    JSONObject result = new JSONObject();
      result.put("firstName", person.getFirstName());
      result.put("lastName", person.getLastName());
      result.put("birthDate",
        person.getBirthDate()
        .format(DateTimeFormatter.ISO_LOCAL_DATE));
      result.put(...)
      result.put(...)
      //ещё 20 полей
      result.put(...)
      return result.toString();
  }
}

Возникает ощущение, что что-то не так…​

public class NaiveController {
  private final Service service;
  public void executeCommand(String command) {
    switch (command) {
      case "foo":
        service.foo(); break;
      case "bar":
        service.bar(); break;
      case ...
      //Ещё 15 веток...
      case "help":
      default:
        service.help(); break;
    }
  }
}

Coupling & Cohesion

  • Coupling: the degree of interdependence between software modules; a measure of how closely connected two routines or modules are; the strength of the relationships between modules.

  • Cohesion refers to the degree to which the elements inside a module belong together.

  • Low coupling often correlates with high cohesion, and vice versa.

Coupling vs. Cohesion

coupling cohesion

Вопросы

  • За счёт чего в приведённых примерах high coupling + low cohesion?

  • Чем это опасно для проекта?

Как бы мы хотели решить эту проблему?

//Помечаем то, что хотим сериализовывать, не сериализуем по умолчанию.
//Вариант: наоборот, помечать то, что сериализовывать не хотим.
public class Person {
    @Published
    private final String firstName;
    @Published
    private final String lastName;
    @Published
    private final LocalDate birthDate;
    //...сколько угодно ещё полей
}

//Как вариант...
JsonSerializer<Person> ser = new JsonSerializer<>(Person.class);
JSONObject json = ser.serialize(p);

Контроллер

public class Controller {
  private final Service service;

  @Command("foo")
  void doSomething() { service.foo(); }

  @Command("bar")
  void bar() { service.bar(); }

  //ещё 15 команд

  @Command() //дефолтная
  void help() { service.help(); }
}

new CommandDispatcher(new Controller(srv)).executeCommand("foo");

Аннотации

  • Аннотация — способ разметки кода на Java, через добавление метаинформации к различным элементам программы на Java.

  • Работают приблизительно как модификаторы (public, static), но существует развитый механизм создания собственных аннотаций.

  • Могут быть обработаны на трёх этапах:

    • на этапе кодогенерации перед компиляцией (annotation processing, Language Model API),

    • на этапе инструментации байт-кода (Instrumentation API),

    • на этапе выполнения кода (через Reflection API).

Синтаксис определения аннотаций

modifiers @interface AnnotationName {
  type elementName();
  ...
  type elementName default value;
}

Пример

public @interface BugReport {
  String assignedTo() default "";
  int severity();
}

Annotation Interfaces

  • Подобно тому, как enum-классы — это специальный вид классов, annotation-интерфейсы — это специальный вид интерфейсов.

  • Наследуются от java.lang.annotation.Annotation.

  • Нельзя расширять наследованием, нельзя параметризовать через generics.

  • Методы не могут иметь параметров.

Допустимые типы методов аннотации (JLS 9.6.1)

  • Примитивы (int, short, long, byte, char, double, float, boolean),

  • enum-ы,

  • String,

  • Class (с возможным ограничителем параметра, вроде Class<? extends MyClass>),

  • другие аннотации,

  • массивы из вышеперечисленного (но не массивы массивов).

Пример определения интерфейса аннотации

public @interface BugReport {
  //вложенные типы допустимы!
  enum Status {UNCONFIRMED, CONFIRMED, FIXED, NOTABUG}
  boolean showStopper() default false;
  String assignedTo() default "";

  //метакласс, речь впереди
  Class<?> testCase() default Void.class;
  Status status() default Status.UNCONFIRMED;

  //Тип аннотации. При конструировании используется синтаксис
  //аннотации, речь впереди
  Reference ref() default @Reference(id = "");
  String[] reportedBy();
}

Использование аннотаций

@AnnotationName(elementName1=value1, elementName2=value2,...)

@AnnotationName(singleValue)

@AnnotationName //no values, no parens

Использование аннотаций

@BugReport(assignedTo="Harry", severity=10)

//порядок не имеет значения
@BugReport(severity=10, assignedTo="Harry")

//если заданы значения по умолчанию, то можно пропускать
@BugReport(severity=10)

//если параметров нет или все имеют умолчания,
//то можно не ставить скобки
@BugReport

//Если на параметр не задан default, его надо  обязательно определять
//(иначе ошибка компиляции).

Single-value annotations

public @interface ActionListenerFor {
  String value();
  int priority() default 0;
}

//использование
@ActionListenerFor("yellowButton")

//что эквивалентно
@ActionListenerFor(value = "yellowButton")

//в то же время:
@ActionListenerFor(value = "yellowButton", priority = 1)

Массивы и другие аннотации

//Массивы задаются в фигурных скобках
@BugReport(..., reportedBy = {"Harry", "Carl"})

//То же самое, что {"Harry"}
@BugReport(..., reportedBy = "Harry")

//Конструирование вложенных аннотаций
@BugReport(..., ref = @Reference(id = "12345"))

//...Как видим, используя разрешённые типы,
//можно задать сколь угодно сложную структуру данных
Значение аннотации не может быть установлено в null. Не допускатся null даже в default-значениях.