@inponomarev
Ivan Ponomarev, Synthesized.io/MIPT
Locking (via synchronized or ReentrantLock) solves the issue of coordinating the actions of different threads with a variable.
But if many threads compete for the lock (high lock contention), the cost of coordinating the threads becomes significant.
The alternative is non-blocking algorithms that use special atomic machine instructions (compare-and-swap).
In the Java library, classes of atomic variables and thread-safe collections are available, including those implementing non-blocking algorithms.
package java.util.concurrent.atomic
AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference.
AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray.
Can be used as "enhanced volatiles": the result of calling set(…) is visible to other threads via get(…).
Support atomic operations.
getAndSet(newValue) compareAndSet(expect, update)
incrementAndGet() decrementAndGet()
getAndIncrement() getAndDecrement()
getAndAdd(delta) addAndGet(delta)
getAndUpdate(updateFunction)
updateAndGet(updateFunction)
getAndAccumulate(x, accumulatorBiFunction)
accumulateAndGet(x, accumulatorBiFunction)
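As a minimal sketch of how these operations are typically used (the class and method names below are hypothetical, not part of the lecture): the first helper is a classic compare-and-swap retry loop, the second shows several threads incrementing a shared counter without any lock.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {
    // Hypothetical helper: increments the counter but never lets it exceed max.
    // CAS retry loop: read the value, compute the update, try to publish it,
    // and start over if another thread changed the value in the meantime.
    static int incrementUpTo(AtomicInteger counter, int max) {
        while (true) {
            int current = counter.get();
            if (current >= max) {
                return current;
            }
            if (counter.compareAndSet(current, current + 1)) {
                return current + 1;
            }
        }
    }

    // Several threads increment one shared counter; no synchronized block is needed.
    static int countInParallel(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return counter.get();
    }
}
```

With a plain int field and counter++ the second method could lose updates; with incrementAndGet() every increment is applied exactly once.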
In earlier versions of Java, it was possible to "make" a collection thread-safe by wrapping it in Collections.synchronizedXXX(…). This serialized any access to the internal state of the collection. For backward compatibility, doing this is still possible, but not recommended.
The price of such a solution is high lock contention.
Java 5 introduced classes specifically designed for thread safety, with fewer locks.
Their use is preferred.
class CopyOnWriteArrayList<E> implements List<E>
class CopyOnWriteArraySet<E> extends AbstractSet<E>
Array-based data structures.
Recreate themselves from scratch on each modification.
It’s expensive, but all reading iterators are stable.
Good when there are multiple read operations per one write operation.
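The "stable iterator" property can be illustrated with a short sketch (the class and method names are hypothetical): an iterator obtained before a modification keeps traversing the old snapshot of the array and never throws ConcurrentModificationException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIterationDemo {
    // Modifies the list after an iterator was created and returns what the iterator saw.
    static List<String> iterateWhileModifying() {
        List<String> listeners = new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c"));
        Iterator<String> it = listeners.iterator(); // bound to the current backing array

        listeners.add("d");    // creates a new backing array; the iterator is unaffected
        listeners.remove("a"); // same here

        List<String> seen = new ArrayList<>();
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen; // the snapshot taken before the modifications
    }
}
```

The same code with an ArrayList would fail with ConcurrentModificationException on the first it.next() call.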
ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>
ConcurrentLinkedDeque<E> extends AbstractCollection<E>
implements Deque<E>
Based on a non-blocking algorithm (CAS operations).
poll() returns null if the queue is empty.
Under the hood these are linked lists (doubly linked in the case of the deque).
Blocking queues (implementations of BlockingQueue) may be limited in size (capacity constrained).
The put() and take() methods wait until an item can be put or taken.
PriorityBlockingQueue is not limited by capacity.
SynchronousQueue has no capacity at all; it passes elements to processing threads directly.
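A minimal producer/consumer sketch on top of a bounded ArrayBlockingQueue, to show how put() and take() coordinate two threads. The class name and the "poison pill" end-of-stream marker are assumptions made for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerDemo {
    private static final int POISON_PILL = -1; // hypothetical end-of-stream marker

    // Sums the numbers 1..count that a producer thread passes through a bounded queue.
    static long sumThroughQueue(int count, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop producing
            }
        });
        producer.start();

        long sum = 0;
        try {
            while (true) {
                int value = queue.take(); // blocks while the queue is empty
                if (value == POISON_PILL) {
                    break;
                }
                sum += value;
            }
        } catch (InterruptedException e) {
            producer.interrupt();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return sum;
    }
}
```

Because the capacity is small, the producer is regularly suspended in put() until the consumer catches up; this is the back-pressure that a capacity-constrained queue provides.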
class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>
Replaces HashMap for multithreaded data access.
Not blocked when reading and rarely blocked when writing.
Does not allow the use of null as a key or value.
Useful methods are atomic:
putIfAbsent(key, value)
remove(key, value)
replace(key, oldValue, newValue)
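The three methods above are enough to build a lock-free "read, modify, write" on a map entry. The sketch below is one possible way to do it; HitCounter and its methods are hypothetical names, not part of the lecture.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class HitCounter {
    private final ConcurrentMap<String, Integer> hits = new ConcurrentHashMap<>();

    // Atomically increments the counter for a page using putIfAbsent + replace.
    void record(String page) {
        while (true) {
            Integer old = hits.putIfAbsent(page, 1);
            if (old == null) {
                return; // we inserted the first value
            }
            if (hits.replace(page, old, old + 1)) {
                return; // nobody changed the value between our read and our write
            }
            // another thread won the race: retry with the fresh value
        }
    }

    int get(String page) {
        return hits.getOrDefault(page, 0);
    }

    // Removes the entry only if it still holds the expected value.
    boolean resetIfEquals(String page, int expected) {
        return hits.remove(page, expected);
    }

    static HitCounter recordInParallel(int threads, int hitsPerThread) {
        HitCounter counter = new HitCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < hitsPerThread; j++) {
                    counter.record("home");
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return counter;
    }
}
```

In real code the same effect is usually achieved more compactly with merge(key, 1, Integer::sum), which ConcurrentHashMap also performs atomically.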
class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
implements ConcurrentNavigableMap<K,V>
Replaces TreeMap for multithreaded access.
Does not allow null as a key or a value.
Has atomic methods.
A thread is an expensive resource, so we want to:
limit the number of our threads so as not to run out of memory (OutOfMemoryError),
reuse existing threads by submitting new tasks after the old ones complete,
automatically create a new thread if one of them "crashed".
The standard library provides thread pools for all of this; you do not need to implement anything yourself.
Application code rarely needs to use the Thread API directly.
//Abstraction of a computational task returning a result
public interface Callable<V> {
V call() throws Exception;
}
//Abstraction of the "manager of threads"
public interface ExecutorService {
<T> Future<T> submit(Callable<T> task);
/*... there are many other methods, will discuss later...*/
}
//Abstraction of the "in progress" result that you can wait for
//finalization or just cancel
public interface Future<V> {
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
boolean cancel(boolean mayInterruptIfRunning);
boolean isDone();
boolean isCancelled();
}
public class Executors {
//fixed pool size
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
//pool grows as needed, keeps inactive thread for 60 seconds
public static ExecutorService newCachedThreadPool()
//allows you to perform tasks with a delay or periodically
public static
ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
...
}
//Run and wait for everything to finish
//List<Future<T>>, not List<T>, because exceptions are possible
<T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks)
throws InterruptedException;
//Run, return the first successful result
//cancel the rest
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
//cancel a task if it has not already started
future.cancel(false)
//request task interruption (details ahead)
future.cancel(true)
//Prevent new tasks from being accepted
executorService.shutdown();
//Wait until the accepted tasks are completed
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
//Abort tasks
executorService.shutdownNow();
}
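Putting the pieces together, a small end-to-end sketch: create a fixed pool, run a batch of Callable tasks with invokeAll, collect the results from the futures, and shut the pool down. ThreadPoolDemo, its methods, and the pool size of 4 are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ThreadPoolDemo {
    // Computes the squares of 1..n on a fixed pool and returns their sum.
    static long sumOfSquares(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final long x = i;
                tasks.add(() -> x * x);
            }
            long sum = 0;
            for (Future<Long> future : pool.invokeAll(tasks)) { // waits for all tasks
                sum += future.get(); // a failed task surfaces here as ExecutionException
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            shutdownGracefully(pool);
        }
    }

    // The shutdown sequence from the slide, with interruption handled.
    static void shutdownGracefully(ExecutorService pool) {
        pool.shutdown(); // stop accepting new tasks
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt the tasks that are still running
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```

Forgetting the shutdown step is a common mistake: the pool's non-daemon threads then keep the JVM alive after main has returned.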
In early versions of Java, there were methods for forcibly stopping and suspending/resuming threads, but this turned out to be a bad idea:
there is no guarantee that a thread will not be stopped in the middle of an operation that must be atomic,
suspension may leave a lock acquired for an indefinite amount of time.
All of these methods are now deprecated in favor of the cooperative interruption mechanism.
ExecutorService.shutdownNow() calls the Thread.interrupt() method on threads.
The Thread.isInterrupted() method returns the interrupt status of the thread.
JMM Interruption Rule: a call to interrupt() from another thread happens-before the interrupted thread detects that it has been interrupted.
If the calculations run in a loop, the thread is obliged to periodically check Thread.currentThread().isInterrupted() and, if the flag is set, log the fact of interruption and exit the method.
InterruptedException may be thrown by waiting methods. What should we do with it?
InterruptedException
If circumstances allow, it is best to declare it in throws and not to handle it at all.
If we cannot change the method's signature (e.g. we are implementing the run method of the Runnable interface, which does not throw checked exceptions), then:
} catch (InterruptedException e) {
//record the fact of interruption to the log...
...
//restore the interrupted status
Thread.currentThread().interrupt();
//exit the interrupted method
return;
}
You should never silently "swallow" an InterruptedException!
It is recommended to log the fact of interruption to facilitate debugging.
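The rules above can be combined into one small sketch of a cooperatively interruptible task. InterruptibleWorker and interruptAndJoin are hypothetical names; Thread.sleep stands in for any blocking call inside the loop, and System.err stands in for a real logger.

```java
import java.util.concurrent.CountDownLatch;

class InterruptibleWorker implements Runnable {
    final CountDownLatch started = new CountDownLatch(1);
    volatile boolean exited = false;

    @Override
    public void run() {
        started.countDown();
        // Check the flag on every iteration of the loop.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(10); // stands in for a blocking call
            } catch (InterruptedException e) {
                // record the fact of interruption
                System.err.println("Worker interrupted while waiting, exiting");
                // restore the status that was cleared when the exception was thrown
                Thread.currentThread().interrupt();
                break;
            }
        }
        exited = true;
    }

    // Starts a worker, interrupts it, and reports whether it stopped in time.
    static boolean interruptAndJoin() {
        InterruptibleWorker worker = new InterruptibleWorker();
        Thread thread = new Thread(worker);
        thread.start();
        try {
            worker.started.await(); // make sure the worker is actually running
            thread.interrupt();     // cooperative request to stop
            thread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return worker.exited && !thread.isAlive();
    }
}
```

Restoring the flag in the catch block matters because the code further up the call stack (for example, a thread pool's worker loop) may also need to see that an interruption was requested.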
CompletableFuture was introduced in Java 8 and implements Future.
It allows you to explicitly set the result (hence Completable) and to assemble a chain of asynchronous calculations.
Can be used as follows:
CompletableFuture<Integer> f = new CompletableFuture<>();
executor.execute(() -> {
int n = workHard(arg);
f.complete(n); });
executor.execute(() -> {
int n = workSmart(arg);
f.complete(n); });
executor.execute(() -> {
Throwable t = ...;
f.completeExceptionally(t); });
Method | Parameter | Description |
thenApply | T→U | Apply a function to the result. |
thenAccept | T→void | Like thenApply, but with void result. |
thenCompose | T→CompletableFuture<U> | Invoke the function on the result and execute the returned future. |
handle | (T, Throwable)→U | Process the result or error and yield a new result. |
whenComplete | (T, Throwable)→void | Like handle, but with void result. |
Method | Parameter | Description |
completeOnTimeout | T, long, TimeUnit | Yield the given value as the result in case of timeout (Java 9+). |
orTimeout | long, TimeUnit | Complete exceptionally with TimeoutException in case of timeout (Java 9+). |
thenRun | Runnable | Execute the Runnable with void result. |
Method | Parameter | Description |
thenCombine | CompletableFuture<U>, (T, U)→V | Execute both and combine the results with the given function. |
thenAcceptBoth | CompletableFuture<U>, (T, U)→void | Like thenCombine, but with void result. |
runAfterBoth | CompletableFuture<?>, Runnable | Execute the runnable after both complete. |
Method | Parameter | Description |
applyToEither | CompletableFuture<T>, T→V | When a result is available from one or the other, pass it to the given function. |
acceptEither | CompletableFuture<T>, T→void | Like applyToEither, but with void result. |
runAfterEither | CompletableFuture<?>, Runnable | Execute the runnable after one or the other completes. |
static allOf | CompletableFuture<?>… | Complete with void result after all given futures complete. |
static anyOf | CompletableFuture<?>… | Complete after any of the given futures completes, with the same result cast to Object. |
The methods that take a function or a Runnable have a variant with the suffix Async (for example, thenApplyAsync) that performs the additional action in another thread, either of the default pool or of the specified Executor.
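A short sketch of how such a chain may look in practice. The class name, method names, and the numbers are made up for the illustration; supplyAsync without an explicit Executor runs the suppliers in the common ForkJoinPool.

```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureDemo {
    // Two independent asynchronous computations, combined and then transformed.
    static int discountedTotal() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> discount = CompletableFuture.supplyAsync(() -> 15);
        return price
                .thenCombine(discount, (p, d) -> p - d) // runs when both are done
                .thenApply(total -> total * 2)          // transforms the combined result
                .join();                                // waits for the whole chain
    }

    // handle() receives either the value or the error and yields a new result.
    static String recoverFromFailure() {
        CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        return failing
                .handle((value, error) -> error == null ? value : "fallback")
                .join();
    }
}
```

join() is used here instead of get() only because it does not throw checked exceptions; in a fully asynchronous design the chain would normally end with thenAccept rather than with a blocking call.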
Roman Leventov: Code Review Checklist: Java Concurrency
About 100 points to check the concurrency code for common errors
These people gave their feedback and helped to improve the material of this lecture:
@2caco3 @asm0dey @dolzhenko @DrEdwardHyde
@dyer_the @krems5 @LordOfBoredom
@miha_x64 @vaddyacom @vdimir
(all errors and inaccuracies are mine)
import java.time.format.DateTimeFormatter;
import org.json.JSONObject;
class NaivePersonSerializer {
String toJSON(Person person) {
JSONObject result = new JSONObject();
result.put("firstName", person.getFirstName());
result.put("lastName", person.getLastName());
result.put("birthDate",
person.getBirthDate()
.format(DateTimeFormatter.ISO_LOCAL_DATE));
result.put(...)
result.put(...)
//20 more fields
result.put(...)
return result.toString();
}
}
public class NaiveController {
private final Service service;
public void executeCommand(String command) {
switch (command) {
case "foo":
service.foo(); break;
case "bar":
service.bar(); break;
case ...
//15 more branches...
case "help":
default:
service.help(); break;
}
}
}
Coupling: the degree of interdependence between software modules; a measure of how closely connected two routines or modules are; the strength of the relationships between modules.
Cohesion refers to the degree to which the elements inside a module belong together.
Low coupling often correlates with high cohesion, and vice versa.
Why is coupling high and cohesion low in the above examples?
What danger does this pose to the project?
//Mark what we want to serialize, do not serialize by default.
//(Option: mark what we do not want to serialize, serialize by default)
public class Person {
@Published
private final String firstName;
@Published
private final String lastName;
@Published
private final LocalDate birthDate;
//... as many other fields as you like
}
//As an option...
JsonSerializer<Person> ser = new JsonSerializer<>(Person.class);
JSONObject json = ser.serialize(p);
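How such a serializer could discover the marked fields is sketched below. This is not the lecture's JsonSerializer: to stay free of external libraries it collects the values into a Map instead of a JSONObject, and PublishedFields and SamplePerson are hypothetical names. The @Retention(RUNTIME) meta-annotation is required for the annotation to be visible through reflection.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

@Retention(RetentionPolicy.RUNTIME) // keep the annotation available at run time
@Target(ElementType.FIELD)
@interface Published {
}

class PublishedFields {
    // Collects the names and values of all fields marked with @Published.
    static Map<String, Object> toMap(Object bean) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Field field : bean.getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(Published.class)) {
                field.setAccessible(true); // the fields are private
                try {
                    result.put(field.getName(), field.get(bean));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("cannot read " + field.getName(), e);
                }
            }
        }
        return result;
    }
}

// Hypothetical sample class used only for the illustration.
class SamplePerson {
    @Published
    private final String firstName = "Ada";
    @Published
    private final String lastName = "Lovelace";
    private final String password = "secret"; // not marked, so not published
}
```

Adding a new published field now requires touching only the data class; the serializer code does not change.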
public class Controller {
private final Service service;
@Command("foo")
void doSomething() { service.foo(); }
@Command("bar")
void bar() { service.bar(); }
//15 more commands
@Command() //default one
void help() { service.help(); }
}
new CommandDispatcher(new Controller(srv)).executeCommand("foo");
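One possible shape of the CommandDispatcher used above, as a sketch under assumptions: the definition of @Command (an empty value marks the default command), the dispatcher internals, and SampleController are all invented for this illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Command {
    String value() default ""; // assumption: the empty name marks the default command
}

class CommandDispatcher {
    private final Object controller;
    private final Map<String, Method> commands = new HashMap<>();

    CommandDispatcher(Object controller) {
        this.controller = controller;
        // Build the command table once, from the annotations on the controller's methods.
        for (Method method : controller.getClass().getDeclaredMethods()) {
            Command command = method.getAnnotation(Command.class);
            if (command != null) {
                method.setAccessible(true);
                commands.put(command.value(), method);
            }
        }
    }

    void executeCommand(String name) {
        Method method = commands.getOrDefault(name, commands.get(""));
        if (method == null) {
            throw new IllegalArgumentException("Unknown command: " + name);
        }
        try {
            method.invoke(controller);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Command failed: " + name, e);
        }
    }
}

// Hypothetical controller that records which commands were executed.
class SampleController {
    final StringBuilder log = new StringBuilder();

    @Command("foo")
    void doSomething() { log.append("foo;"); }

    @Command("bar")
    void bar() { log.append("bar;"); }

    @Command
    void help() { log.append("help;"); }
}
```

The switch statement disappears: the dispatcher knows nothing about concrete commands, and a new command is added by writing one annotated method.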
Annotations are a way of marking up Java code by adding meta-information to various elements of a Java program.
They work approximately like modifiers (public, static), but there is a mechanism for creating your own annotations.
Can be processed at three stages:
at the stage of code generation before compilation (annotation processing, Language Model API),
at the stage of instrumentation of bytecode (Instrumentation API),
at the stage of code execution (via Reflection API).
modifiers @interface AnnotationName {
type elementName();
...
type elementName() default value;
}
public @interface BugReport {
String assignedTo() default "";
int severity();
}
Just as enum classes are a special kind of class, annotation interfaces are a special kind of interface.
They implicitly extend java.lang.annotation.Annotation.
Cannot be extended by inheritance, cannot be parameterized through generics.
Methods cannot have parameters.
Allowed element types:
primitives (int, short, long, byte, char, double, float, boolean),
enums,
String,
Class (with a possible parameter restriction like Class<? extends MyClass>),
other annotations,
arrays of the above (but not arrays of arrays).
public @interface BugReport {
//nested types are valid!
enum Status {UNCONFIRMED, CONFIRMED, FIXED, NOTABUG}
boolean showStopper() default false;
String assignedTo() default "";
//metaclass, to be explained later
Class<?> testCase() default Void.class;
Status status() default Status.UNCONFIRMED;
//Annotation type. We use a special syntax to construct an instance,
//to be explained later
Reference ref() default @Reference(id = "");
String[] reportedBy();
}
@AnnotationName(elementName1=value1, elementName2=value2,...)
@AnnotationName(singleValue)
@AnnotationName //no values, no parens
@BugReport(assignedTo="Harry", severity=10)
//order doesn't matter
@BugReport(severity=10, assignedTo="Harry")
//if default values are set, you can skip the parameters
@BugReport(severity=10)
//if there are no parameters or all of them have default,
//then the parentheses can be omitted
@BugReport
//if there is no default value for the parameter, it must be defined
//(otherwise a compilation error).
public @interface ActionListenerFor {
String value();
int priority() default 0;
}
//use
@ActionListenerFor("yellowButton")
//which is equivalent to
@ActionListenerFor(value = "yellowButton")
//at the same time:
@ActionListenerFor(value = "yellowButton", priority = 1)
//Arrays are specified in curly braces
@BugReport(..., reportedBy = {"Harry", "Carl"})
//Same as {"Harry"}
@BugReport(..., reportedBy = "Harry")
//Constructing nested annotations
@BugReport(..., ref = @Reference(id = "12345"))
//... As you can see, using the allowed types,
//you can specify an arbitrarily complex data structure
The annotation value cannot be set to null. null is not allowed even for default values.
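Since null is not available, "no value" is usually expressed with a sentinel default, such as an empty string or Void.class. The sketch below shows this convention and how the reading code may translate the sentinel back into "absent"; the Ticket annotation and the other names are hypothetical.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Optional;

@Retention(RetentionPolicy.RUNTIME)
@interface Ticket {
    String NONE = "";                       // sentinel used instead of null

    String assignedTo() default NONE;
    Class<?> testCase() default Void.class; // Void.class plays the role of "no class"
    int severity();                         // no default: must always be specified
}

@Ticket(severity = 10)
class UnassignedFeature {
}

@Ticket(severity = 3, assignedTo = "Harry", testCase = String.class)
class AssignedFeature {
}

class TicketReader {
    // Reads the annotation via reflection and maps the sentinel to an empty Optional.
    static Optional<String> assignee(Class<?> type) {
        Ticket ticket = type.getAnnotation(Ticket.class);
        if (ticket == null || ticket.assignedTo().equals(Ticket.NONE)) {
            return Optional.empty();
        }
        return Optional.of(ticket.assignedTo());
    }
}
```

Writing assignedTo() default null in the annotation would be rejected by the compiler, because element values must be compile-time constants of the allowed types.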