Zakleszczenie wątków (Deadlock)

  • Post author:
  • Post category:Java

Poniższy artykuł powstał na podstawie książki “The Well-Grounded Java Developer” autorstwa M. Verburg, J. Clark, B. Evens. Wykorzystałem w nim również fragmenty kodu opisywanego w tejże książce.

W naszym przykładzie zbudujemy system bankowy, którego zadaniem będzie wykonywanie przelewów pomiędzy kontami. Zdefiniujmy klasę Account, która przechowuje informacje o saldzie konta, oraz udostępnia metodę transferTo służącą do transferu środków na konto odbiorcy.

public class Account {
    private double balance;

    public Account(double openingBalance) {
        this.balance = openingBalance;
    }

    public synchronized boolean withdraw(final int amount) {
        if (this.balance >= amount) {
            this.balance = this.balance - amount;
            return true;
        }
        return false;
    }

    public synchronized void deposit(final int amount) {
        this.balance = this.balance + amount;
    }

    public synchronized double getBalance() {
        return this.balance;
    }

    public synchronized boolean transferTo(Account other, int amount) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException __) {
        }
        if (this.balance >= amount) {
            this.balance = this.balance - amount;
            other.deposit(amount);
            return true;
        }
        return false;
    }
}

Metoda transferTo symuluje 10 ms opóźnienie, po czym wykonuje metodę deposit na przekazanym jako parametr koncie odbiorcy. Obie metody opatrzone są słowem kluczowym synchronized. Wyglądają więc na “bezpieczne” wątkowo – tylko jeden wątek może wykonywać metodę, kolejny zostanie zablokowany, aż do momentu wyjścia z metody przez poprzedni wątek.

Utworzymy teraz dwa wątki, które będą wykonywać operację transferu pomiędzy kontami A i B.

public class Main {
    private static final int MAX_TRANSFERS = 100;

    public static void main(String[] args) throws InterruptedException {
        Account accountA = new Account(10_000);
        Account accountB = new Account(10_000);

        Thread threadA = new Thread(() -> {
            for (int i = 0; i < MAX_TRANSFERS; i = i + 1) {
                boolean ok = accountA.transferTo(accountB, 1);
                if (!ok) {
                    System.out.println("Thread A failed at " + i);
                }
            }
        });

        Thread threadB = new Thread(() -> {
            for (int i = 0; i < MAX_TRANSFERS; i = i + 1) {
                boolean ok = accountB.transferTo(accountA, 1);
                if (!ok) {
                    System.out.println("Thread B failed at " + i);
                }
            }
        });

        threadA.start();
        threadB.start();
        threadA.join();
        threadB.join();

        System.out.println(accountA.getBalance());
        System.out.println(accountB.getBalance());
    }
}

Po uruchomieniu programu czeka nas niemiła niespodzianka. Program zawiśnie. Możesz to sprawdzić za pomocą narzędzia JDK Mission Control. Oba wątki utknęły w metodzie deposit.

Dlaczego tak się dzieje? Wątek A rozpoczyna metodę transferTo. Słowo synchronized powoduje utworzenie blokady konta A dla innych wątków. W tym samym czasie wątek B również rozpoczyna pracę i blokuje konto B. Wątek A próbuje wykonać metodę deposit na koncie B. Tutaj pojawia się problem - konto B zostało zablokowane przez wątek B. Również wątek B nie może wykonać metody deposit na koncie A. Mamy więc przypadek zakleszczenia, deadlock.

Jak rozwiązać problem? Blokady muszą być zakładane w tej samej kolejności przez oba wątki. Aby to osiągnąć musimy przyjąć kryterium, które zostanie użyte do rozstrzygnięcia czy najpierw założyć blokadę na koncie nadawcy czy odbiorcy.

Wprowadźmy w tym celu identyfikator konta accountId. Będzie on nadawany automatycznie podczas tworzenia konta.

public class Account {
    private static int nextId = 0;

    private final int accountId;
    private double balance;

    public Account(double openingBalance) {
        this.balance = openingBalance;
        this.accountId = getAndIncrementId();
    }

    private synchronized int getAndIncrementId() {
        nextId = nextId + 1;
        return nextId;
    }

    public synchronized void deposit(final int amount) {
        this.balance = this.balance + amount;
    }

    public synchronized double getBalance() {
        return this.balance;
    }

    public int getAccountId() {
        return accountId;
    }

    public boolean transferTo(Account other, int amount) {
        if (this.accountId == other.getAccountId()) {
            return false;
        }

        if (this.accountId < other.accountId) {
            synchronized (this) {
                if (balance >= amount) {
                    balance = balance - amount;
                    synchronized (other) {
                        other.deposit(amount);
                    }
                    return true;
                } else {
                    return false;
                }
            }
        } else {
            synchronized (other) {
                synchronized (this) {
                    if (this.balance >= amount) {
                        this.balance = this.balance - amount;
                        other.deposit(amount);
                        return true;
                    } else {
                        return false;
                    }
                }
            }
        }
    }
}

Zwróć uwagę na zmiany w metodzie transferTo. Identyfikator konta używany jest do ustalenia które konto ma zostać zablokowane w pierwszej kolejności.

Rozważmy jeszcze raz co się dokładnie będzie działo. Załóżmy że konto A zostało utworzone pierwsze i jego accountId jest mniejszy niż konta B.

  1. Wątek A rozpoczyna operację transferu z konta A do B.
  2. Chwilę później wątek B rozpoczyna operację transferu z konta B do A.
  3. Wątek A sprawdza warunek this.accountId < other.accountId i zakłada blokadę na koncie A - synchronized (this)
  4. Wątek B sprawdza ten sam warunek i próbuje założyć blokadę na koncie o mniejszym id, czyli A - synchronized (other). Operacja ta nie udaje się - wątek A zdążył już założyć blokadę w pkt. 3. Następuje zawieszenie wątku B.
  5. Wątek A zakłada blokadę na koncie B - synchronized (other) i wykonuje metodę other.deposit(amount). Operacja założenia blokady na koncie B udaje się.
  6. Wątek A zwalnia blokadę na koncie B, a następnie zwalnia blokadę na koncie A.
  7. Wątek B, który utknął przy zakładaniu blokady na koncie A zostaje odblokowany i kontynuuje pracę.

Jak widzisz cały proces jest dosyć skomplikowany, a mamy tu tylko prosty, podręcznikowy przykład.

Problemem w tym podejściu jest fakt, iż nasza metoda transferTo modyfikuje nie tylko stan konta na rzecz którego została wykonana, ale również modyfikuje konto odbiorcy. Zdecydowanie lepszym rozwiązaniem będzie zmiana podejścia i zrzucenie odpowiedzialności za transfer środków z obiektu konta.

Wyobraźmy sobie sytuację, w której to nie obiekt Account dokonuje transferu środków, ale istnieje jakiś specjalny serwis, który otrzymuje zadania przelewu, kolejkuje je, a następnie modyfikuje konta odbiorcy i nadawcy. W naszej przykładowej aplikacji wątki A i B będą tworzyć zadania przelewu TransferTask a odrębny serwis AccountManager będzie odpowiadał za ich przetworzenie.

Zdefiniujmy zadanie transferu środków, w którym oprócz kwoty przelewu amount znajdą się informacje o koncie nadawcy sender i odbiorcy receiver.

public record TransferTask(
        Account sender,
        Account receiver,
        int amount) {
}

Powyższą klasę zdefiniowałem z wykorzystaniem rekordów, wprowadzonych w JDK 14. Mamy więc pewność że obiekt tej klasy będzie niezmienny.

Działanie serwisu AccountManager oparte będzie na dwóch wątkach. Jeden z nich odbierał będzie zadanie z kolejki zadań oczekujących pendingTask, następnie pomniejszy saldo konta zlecającego transfer sender, po czym umieści zadanie zdeponowania środków na koncie odbiorcy, do tego celu użyje kolejki forDeposit.

Drugi z wątków odpowiedzialny będzie za obsługę kolejki forDeposit. Po pobraniu z niej zadania, zwiększy saldo konta odbiorcy receiver.

Zadania, które nie zostały wykonane (z powodu braku środków lub wystąpienia wyjątków) zostaną umieszczone w kolejce failed. Obsługę tej kolejki pominiemy.

public class AccountManager {
    private ConcurrentHashMap<Integer, Account> accounts = new ConcurrentHashMap<>();
    private volatile boolean shutdown = false;
    private BlockingQueue<TransferTask> pending = new LinkedBlockingQueue<>();
    private BlockingQueue<TransferTask> forDeposit = new LinkedBlockingQueue<>();
    private BlockingQueue<TransferTask> failed = new LinkedBlockingQueue<>();
    private Thread withdrawals;
    private Thread deposits;

    public Account createAccount(int balance) {
        var account = new Account(balance);
        accounts.put(account.getId(), account);
        return account;
    }

    public void submit(TransferTask transfer) {
        try {
            pending.put(transfer);
        } catch (InterruptedException e) {
            System.err.println("Pending queue error");
            try {
                failed.put(transfer);
            } catch (InterruptedException ex) {
                System.err.println("Failed queue error");
            }
        }
    }

    public void init() {
        Runnable withdraw = () -> {
            while (!shutdown) {
                try {
                    var task = pending.poll(1, TimeUnit.SECONDS);
                    if (task != null) {
                        var sender = task.sender();
                        if (sender.withdraw(task.amount())) {
                            forDeposit.put(task);
                        } else {
                            failed.put(task);
                        }
                    }
                } catch (InterruptedException e) {
                    System.err.println("Withdraw error");
                }
            }
        };

        Runnable deposit = () -> {
            while (!shutdown) {
                TransferTask task;
                try {
                    task = forDeposit.poll(1, TimeUnit.SECONDS);
                    if (task != null) {
                        var receiver = task.receiver();
                        receiver.deposit(task.amount());
                    }
                } catch (InterruptedException e) {
                    System.err.println("Deposit error");
                }
            }

        };

        init(withdraw, deposit);
    }

    private void init(Runnable withdraw, Runnable deposit) {
        withdrawals = new Thread(withdraw);
        deposits = new Thread(deposit);
        withdrawals.start();
        deposits.start();
    }

    public void await() {
        try {
            withdrawals.join();
            deposits.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void shutdown() {
        shutdown = true;
    }

}

Metoda submit służy do umieszczenia zadania w kolejce zadań oczekujących pending.

W metodzie init tworzone są wątki: withdraw - odpowiedzialny za pobranie kwoty z konta nadawcy oraz deposit, który zwiększa saldo konta odbiorcy.

Do pobrania zadania z kolejek wykorzystałem metodę poll, która pobiera zadanie z listy, jeśli lista nie jest pusta, lub czeka określoną liczbę czasu na to, aż pojawi się nowe zadanie. Jeśli po tym czasie nadal brak zadania to zwraca wartość null. Zamiast poll możnaby użyć metody take, jest to jednak metoda blokująca - jej działanie nie zakończy się dopóki nie zostanie zwrócone zadanie, co oznaczałoby dla nas problem z zakończeniem wątku - po prostu utknąłby w metodzie oczekując na dalsze zadania.

Pole shutdown, zdefiniowane jako volatile umożliwia zakończenie działania obu wątków. Mówiąc najprościej volatile oznacza, że zmiany dokonywane na tej zmiennej będą bezpośrednio zapisywane w pamięci, czyli każda zmiana tej zmiennej będzie od razu widoczna w innych wątkach.

Metoda await czeka, aż oba wątki zakończą swoje działanie.

Poniżej przykład aplikacji, która wykonuje operacje transferu pomiędzy kontami A i B za pomocą dwóch wątków.

public class Main {
    private static final int MAX_TRANSFERS = 10_000;

    public static void main(String[] args) throws InterruptedException {
        var manager = new AccountManager();
        manager.init();

        var accountA = manager.createAccount(10_000);
        var accountB = manager.createAccount(20_000);

        Thread threadA = new Thread(() -> {
            for (int i = 0; i < MAX_TRANSFERS; i = i + 1) {
                var transfer = new TransferTask(accountA, accountB, 1);
                manager.submit(transfer);
            }
        });

        Thread threadB = new Thread(() -> {
            for (int i = 0; i < MAX_TRANSFERS; i = i + 1) {
                var transfer = new TransferTask(accountB, accountA, 1);
                manager.submit(transfer);
            }
        });

        threadA.start();
        threadB.start();
        threadA.join();
        threadB.join();

        TimeUnit.SECONDS.sleep(3);
        manager.shutdown();
        manager.await();

        System.out.println(accountA.getBalance());
        System.out.println(accountB.getBalance());
    }
}

Kod opisywany w artykule znajdziesz w moim repozytorium https://github.com/javascratches/deadlock