Poniższy artykuł powstał na podstawie książki “The Well-Grounded Java Developer” autorstwa M. Verburg, J. Clark, B. Evens. Wykorzystałem w nim również fragmenty kodu opisywanego w tejże książce.
W naszym przykładzie zbudujemy system bankowy, którego zadaniem będzie wykonywanie przelewów pomiędzy kontami. Zdefiniujmy klasę Account
, która przechowuje informacje o saldzie konta, oraz udostępnia metodę transferTo
służącą do transferu środków na konto odbiorcy.
public class Account {
private double balance;
public Account(double openingBalance) {
this.balance = openingBalance;
}
public synchronized boolean withdraw(final int amount) {
if (this.balance >= amount) {
this.balance = this.balance - amount;
return true;
}
return false;
}
public synchronized void deposit(final int amount) {
this.balance = this.balance + amount;
}
public synchronized double getBalance() {
return this.balance;
}
public synchronized boolean transferTo(Account other, int amount) {
try {
Thread.sleep(10);
} catch (InterruptedException __) {
}
if (this.balance >= amount) {
this.balance = this.balance - amount;
other.deposit(amount);
return true;
}
return false;
}
}
Metoda transferTo
symuluje 10 ms opóźnienie, po czym wykonuje metodę deposit
na przekazanym jako parametr koncie odbiorcy. Obie metody opatrzone są słowem kluczowym synchronized
. Wyglądają więc na “bezpieczne” wątkowo – tylko jeden wątek może wykonywać metodę, kolejny zostanie zablokowany, aż do momentu wyjścia z metody przez poprzedni wątek.
Utworzymy teraz dwa wątki, które będą wykonywać operację transferu pomiędzy kontami A i B.
public class Main {
private static final int MAX_TRANSFERS = 100;
public static void main(String[] args) throws InterruptedException {
Account accountA = new Account(10_000);
Account accountB = new Account(10_000);
Thread threadA = new Thread(() -> {
for (int i = 0; i < MAX_TRANSFERS; i = i + 1) {
boolean ok = accountA.transferTo(accountB, 1);
if (!ok) {
System.out.println("Thread A failed at " + i);
}
}
});
Thread threadB = new Thread(() -> {
for (int i = 0; i < MAX_TRANSFERS; i = i + 1) {
boolean ok = accountB.transferTo(accountA, 1);
if (!ok) {
System.out.println("Thread B failed at " + i);
}
}
});
threadA.start();
threadB.start();
threadA.join();
threadB.join();
System.out.println(accountA.getBalance());
System.out.println(accountB.getBalance());
}
}
Po uruchomieniu programu czeka nas niemiła niespodzianka. Program zawiśnie. Możesz to sprawdzić za pomocą narzędzia JDK Mission Control. Oba wątki utknęły w metodzie deposit
.
Dlaczego tak się dzieje? Wątek A rozpoczyna metodę transferTo
. Słowo synchronized
powoduje utworzenie blokady konta A dla innych wątków. W tym samym czasie wątek B również rozpoczyna pracę i blokuje konto B. Wątek A próbuje wykonać metodę deposit
na koncie B. Tutaj pojawia się problem - konto B zostało zablokowane przez wątek B. Również wątek B nie może wykonać metody deposit
na koncie A. Mamy więc przypadek zakleszczenia, deadlock.
Jak rozwiązać problem? Blokady muszą być zakładane w tej samej kolejności przez oba wątki. Aby to osiągnąć musimy przyjąć kryterium, które zostanie użyte do rozstrzygnięcia czy najpierw założyć blokadę na koncie nadawcy czy odbiorcy.
Wprowadźmy w tym celu identyfikator konta accountId
. Będzie on nadawany automatycznie podczas tworzenia konta.
public class Account {
private static int nextId = 0;
private final int accountId;
private double balance;
public Account(double openingBalance) {
this.balance = openingBalance;
this.accountId = getAndIncrementId();
}
private synchronized int getAndIncrementId() {
nextId = nextId + 1;
return nextId;
}
public synchronized void deposit(final int amount) {
this.balance = this.balance + amount;
}
public synchronized double getBalance() {
return this.balance;
}
public int getAccountId() {
return accountId;
}
public boolean transferTo(Account other, int amount) {
if (this.accountId == other.getAccountId()) {
return false;
}
if (this.accountId < other.accountId) {
synchronized (this) {
if (balance >= amount) {
balance = balance - amount;
synchronized (other) {
other.deposit(amount);
}
return true;
} else {
return false;
}
}
} else {
synchronized (other) {
synchronized (this) {
if (this.balance >= amount) {
this.balance = this.balance - amount;
other.deposit(amount);
return true;
} else {
return false;
}
}
}
}
}
}
Zwróć uwagę na zmiany w metodzie transferTo
. Identyfikator konta używany jest do ustalenia które konto ma zostać zablokowane w pierwszej kolejności.
Rozważmy jeszcze raz co się dokładnie będzie działo. Załóżmy że konto A zostało utworzone pierwsze i jego accountId
jest mniejszy niż konta B.
- Wątek A rozpoczyna operację transferu z konta A do B.
- Chwilę później wątek B rozpoczyna operację transferu z konta B do A.
- Wątek A sprawdza warunek
this.accountId < other.accountId
i zakłada blokadę na koncie A -synchronized (this)
- Wątek B sprawdza ten sam warunek i próbuje założyć blokadę na koncie o mniejszym id, czyli A -
synchronized (other)
. Operacja ta nie udaje się - wątek A zdążył już założyć blokadę w pkt. 3. Następuje zawieszenie wątku B. - Wątek A zakłada blokadę na koncie B -
synchronized (other)
i wykonuje metodęother.deposit(amount)
. Operacja założenia blokady na koncie B udaje się. - Wątek A zwalnia blokadę na koncie B, a następnie zwalnia blokadę na koncie A.
- Wątek B, który utknął przy zakładaniu blokady na koncie A zostaje odblokowany i kontynuuje pracę.
Jak widzisz cały proces jest dosyć skomplikowany, a mamy tu tylko prosty, podręcznikowy przykład.
Problemem w tym podejściu jest fakt, iż nasza metoda transferTo
modyfikuje nie tylko stan konta na rzecz którego została wykonana, ale również modyfikuje konto odbiorcy. Zdecydowanie lepszym rozwiązaniem będzie zmiana podejścia i zrzucenie odpowiedzialności za transfer środków z obiektu konta.
Wyobraźmy sobie sytuację, w której to nie obiekt Account
dokonuje transferu środków, ale istnieje jakiś specjalny serwis, który otrzymuje zadania przelewu, kolejkuje je, a następnie modyfikuje konta odbiorcy i nadawcy. W naszej przykładowej aplikacji wątki A i B będą tworzyć zadania przelewu TransferTask
a odrębny serwis AccountManager
będzie odpowiadał za ich przetworzenie.
Zdefiniujmy zadanie transferu środków, w którym oprócz kwoty przelewu amount
znajdą się informacje o koncie nadawcy sender
i odbiorcy receiver
.
public record TransferTask(
Account sender,
Account receiver,
int amount) {
}
Powyższą klasę zdefiniowałem z wykorzystaniem rekordów, wprowadzonych w JDK 14. Mamy więc pewność że obiekt tej klasy będzie niezmienny.
Działanie serwisu AccountManager
oparte będzie na dwóch wątkach. Jeden z nich odbierał będzie zadanie z kolejki zadań oczekujących pendingTask
, następnie pomniejszy saldo konta zlecającego transfer sender
, po czym umieści zadanie zdeponowania środków na koncie odbiorcy, do tego celu użyje kolejki forDeposit
.
Drugi z wątków odpowiedzialny będzie za obsługę kolejki forDeposit
. Po pobraniu z niej zadania, zwiększy saldo konta odbiorcy receiver
.
Zadania, które nie zostały wykonane (z powodu braku środków lub wystąpienia wyjątków) zostaną umieszczone w kolejce failed
. Obsługę tej kolejki pominiemy.
public class AccountManager {
private ConcurrentHashMap<Integer, Account> accounts = new ConcurrentHashMap<>();
private volatile boolean shutdown = false;
private BlockingQueue<TransferTask> pending = new LinkedBlockingQueue<>();
private BlockingQueue<TransferTask> forDeposit = new LinkedBlockingQueue<>();
private BlockingQueue<TransferTask> failed = new LinkedBlockingQueue<>();
private Thread withdrawals;
private Thread deposits;
public Account createAccount(int balance) {
var account = new Account(balance);
accounts.put(account.getId(), account);
return account;
}
public void submit(TransferTask transfer) {
try {
pending.put(transfer);
} catch (InterruptedException e) {
System.err.println("Pending queue error");
try {
failed.put(transfer);
} catch (InterruptedException ex) {
System.err.println("Failed queue error");
}
}
}
public void init() {
Runnable withdraw = () -> {
while (!shutdown) {
try {
var task = pending.poll(1, TimeUnit.SECONDS);
if (task != null) {
var sender = task.sender();
if (sender.withdraw(task.amount())) {
forDeposit.put(task);
} else {
failed.put(task);
}
}
} catch (InterruptedException e) {
System.err.println("Withdraw error");
}
}
};
Runnable deposit = () -> {
while (!shutdown) {
TransferTask task;
try {
task = forDeposit.poll(1, TimeUnit.SECONDS);
if (task != null) {
var receiver = task.receiver();
receiver.deposit(task.amount());
}
} catch (InterruptedException e) {
System.err.println("Deposit error");
}
}
};
init(withdraw, deposit);
}
private void init(Runnable withdraw, Runnable deposit) {
withdrawals = new Thread(withdraw);
deposits = new Thread(deposit);
withdrawals.start();
deposits.start();
}
public void await() {
try {
withdrawals.join();
deposits.join();
} catch (InterruptedException e) {
e.printStackTrace();
System.exit(1);
}
}
public void shutdown() {
shutdown = true;
}
}
Metoda submit
służy do umieszczenia zadania w kolejce zadań oczekujących pending
.
W metodzie init
tworzone są wątki: withdraw
- odpowiedzialny za pobranie kwoty z konta nadawcy oraz deposit
, który zwiększa saldo konta odbiorcy.
Do pobrania zadania z kolejek wykorzystałem metodę poll
, która pobiera zadanie z listy, jeśli lista nie jest pusta, lub czeka określoną liczbę czasu na to, aż pojawi się nowe zadanie. Jeśli po tym czasie nadal brak zadania to zwraca wartość null
. Zamiast poll
możnaby użyć metody take
, jest to jednak metoda blokująca - jej działanie nie zakończy się dopóki nie zostanie zwrócone zadanie, co oznaczałoby dla nas problem z zakończeniem wątku - po prostu utknąłby w metodzie oczekując na dalsze zadania.
Pole shutdown
, zdefiniowane jako volatile
umożliwia zakończenie działania obu wątków. Mówiąc najprościej volatile
oznacza, że zmiany dokonywane na tej zmiennej będą bezpośrednio zapisywane w pamięci, czyli każda zmiana tej zmiennej będzie od razu widoczna w innych wątkach.
Metoda await
czeka, aż oba wątki zakończą swoje działanie.
Poniżej przykład aplikacji, która wykonuje operacje transferu pomiędzy kontami A i B za pomocą dwóch wątków.
public class Main {
private static final int MAX_TRANSFERS = 10_000;
public static void main(String[] args) throws InterruptedException {
var manager = new AccountManager();
manager.init();
var accountA = manager.createAccount(10_000);
var accountB = manager.createAccount(20_000);
Thread threadA = new Thread(() -> {
for (int i = 0; i < MAX_TRANSFERS; i = i + 1) {
var transfer = new TransferTask(accountA, accountB, 1);
manager.submit(transfer);
}
});
Thread threadB = new Thread(() -> {
for (int i = 0; i < MAX_TRANSFERS; i = i + 1) {
var transfer = new TransferTask(accountB, accountA, 1);
manager.submit(transfer);
}
});
threadA.start();
threadB.start();
threadA.join();
threadB.join();
TimeUnit.SECONDS.sleep(3);
manager.shutdown();
manager.await();
System.out.println(accountA.getBalance());
System.out.println(accountB.getBalance());
}
}
Kod opisywany w artykule znajdziesz w moim repozytorium https://github.com/javascratches/deadlock