Multithreading

  • Post author:
  • Post category:Java

Co znajdziesz w tym artykule?

  1. Thread – zaimplementuj metodę run i uruchom wątek
  2. Executors – zarządzanie pulami wątków
  3. Callable – wrzuć zadanie do executora i zaczekaj na wynik
  4. CompletableFuture – czyli jak składać transformacje w nieblokujący sposób

Thread

Zacznijmy od zdefiniowania klasy rozszerzającej Thread i zaimplementujmy metodę run.

import lombok.SneakyThrows;
import java.util.concurrent.TimeUnit;

public class SleepAndPrintThread extends Thread {

    private final int sleepInterval;

    public SleepAndPrintThread(String name, int sleepInterval) {
        super(name);
        this.sleepInterval = sleepInterval;
    }

    @SneakyThrows
    @Override
    public void run() {
        TimeUnit.SECONDS.sleep(this.sleepInterval);
        System.out.println("Message from thread " + this.getName() + " after " + this.sleepInterval + " seconds");
    }

}

Skorzystałem z metody TimeUnit.SECONDS.sleep aby uśpić wątek na kilka sekund. Metoda wymaga obsłużenia wyjątku InterruptedException. Do jego obsługi wykorzystałem adnotację @SneakyThrows z projektu Lombok.

Stwórzmy i uruchommy nasz wątek.

public class ThreadsApp {

    public static void main(String[] args) {
        Thread sleep3seconds = new SleepAndPrintThread("Thread-001", 3);
        sleep3seconds.start();
        System.out.println("Message from main thread");
    }

}

Komunikatu na wyjściu:

Message from main thread
Message from thread Thread-001 after 3 seconds

Join

Stwórzmy teraz kilka wątków i zmuśmy główny wątek aby poczekał na ich zakończenie.

public class ThreadsApp {

    public static void main(String[] args)  {
        Thread sleep3seconds = new SleepAndPrintThread("Thread-001", 3);
        sleep3seconds.start();

        Thread sleep5seconds = new SleepAndPrintThread("Thread-001", 5);
        sleep5seconds.start();

        try {
            sleep5seconds.join();
            sleep3seconds.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Message from main thread");
    }

}

Metoda join klasy Thread powoduje dołączenie wątku, względem którego została wykonana do wątku w którym nastąpiło wywołanie. W efekcie czego końcowy komunikat zostanie wyświetlony po zakończeniu działania poprzednich wątków.

Wyjście:

Message from thread Thread-001 after 3 seconds
Message from thread Thread-001 after 5 seconds
Message from main thread

Możemy skorzystać z wyrażenia lambda aby utworzyć implementację interfejsu Runnable:

Runnable sleepy = () -> {
    System.out.println("Message from " + Thread.currentThread().getName());
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread = new Thread(sleepy, "Runnable-001");
thread.start();

Executors

Executory pomagają zarządzać wątkami w wygodny sposób. Klasa Executors dostarcza statycznych metod fabrycznych za pomocą których możemy utworzyć żądany executor.

SingleThreadExecutor

public class SingleExecutorApp {

    public static void main(String... args) {

        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Runnable finalCountdown = () -> {
            for (int i = 0; i <= 10; i++) {
                System.out.println(i);
                try {
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        Runnable goForward = () -> System.out.println("Go!");

        executorService.submit(finalCountdown);
        executorService.submit(goForward);
        executorService.shutdown();
    }

}

SingleThreadExecutor pozwala na wykonywanie tylko jednego wątku jednocześnie. Wątek goForward wykona się po zakończeniu wątku finalCountdown.

FixedThreadPool

Aby stworzyć pulę wątków, które będą wykonywały się jednocześnie skorzystamy z metody fabrycznej newFixedThreadPool która jako parametr przyjmuje liczbę wątków w puli.

ExecutorService executorService = Executors.newFixedThreadPool(2);

    Runnable worker1 = () -> {
        System.out.println("Running 10 secods thread");
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    Runnable worker2 = () -> {
        System.out.println("Running 5 seconds thread");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    Runnable worker3 = () -> {
        System.out.println("Running 3 seconds thread");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    executorService.submit(worker1);
    executorService.submit(worker2);
    executorService.submit(worker3);
    System.out.println("Workers loaded");
    executorService.shutdown();
Workers loaded
Running 10 secods thread
Running 5 seconds thread
Running 3 seconds thread

Jak widzisz wątki zostały od razu umieszczone w executorze, wątek worker3 wykona się po zakończeniu dwóch poprzednio wywołanych wątków – utworzyliśmy pulę zdolną obsłużyć tylko dwa wątki jednocześnie.

ScheduledThreadPool

Wykorzystując metodę fabrykującą newScheduledThreadPool możemy stworzyć executor, za pomocą którego wątki mogą być wykonywane cyklicznie lub też z opóźnieniem.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorApp {

    public static void main(String... args) throws InterruptedException {

        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);

        Runnable worker1 = () -> {
            System.out.println("Worker 1 running 3 seconds thread");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        Runnable worker2 = () -> {
            System.out.println("Worker 2 running 5 seconds thread");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        Runnable worker3 = () -> {
            System.out.println("Worker 3 running scheduled workder");
        };

        executorService.submit(worker1);
        executorService.submit(worker2);
        executorService.scheduleAtFixedRate(worker3, 0, 1, TimeUnit.SECONDS);
        System.out.println("All thread loaded");
    }

}

Wątek worker3 będzie wykonywany cyklicznie co 1 sekundę. Ponieważ nie kończymy pracy executora metodą shutdown program będzie działał w nieskończonej pętli. Zmodyfikujmy kod o dwie dodatkowe linijki:

executorService.awaitTermination(10, TimeUnit.SECONDS);
executorService.shutdown();

Executor będzie czekał na 10 sekund na zakończenie wykonywanych wątków.

Callable

Metoda sumbit executora pozwala również na umieszczanie zadań z wykorzystaniem interfejsu Callable.

import java.util.concurrent.*;

public class CallableApp {

    public static void main(String... args) throws ExecutionException, InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        Callable<Integer> bestNumberEver = () -> {
            System.out.println("Calculating Best Number Ever");
            TimeUnit.SECONDS.sleep(3);
            return 73;
        };

        Callable<Integer> answerToUltimateQuestionOfLife = () -> {
            System.out.println("Calculating Answer to the Ultimate Question of Life, the Universe, and Everything");
            TimeUnit.SECONDS.sleep(2);
            return 42;
        };

        Future<Integer> bestNumberFuture = executorService.submit(bestNumberEver);
        Future<Integer> answerToLifeFuture = executorService.submit(answerToUltimateQuestionOfLife);

        Integer r = bestNumberFuture.get();
        System.out.println(r);

        r =  answerToLifeFuture.get();
        System.out.println(r);

        executorService.shutdown();
    }

}

Umieszczenie zadania w executorze powoduje rozpoczęcie jego wykonywania i zwrócenie Future<T>, w naszym wypadku Future<Integer> ponieważ oba zadania są klasy Callable<Integer>.

Wywołanie metody get “czeka” na zakończenie wykonywania zadania. W poniższym kodzie metoda isDone zwróci true w momencie kiedy zadanie będzie zakończone.

    while (!bestNumberFuture.isDone()) {
        System.out.println("Calculating best number in progress...");
        TimeUnit.SECONDS.sleep(1);
    }

invokeAny

Metoda invokeAny wykonuje listę zadań i zwraca wynik uzyskany jako pierwszy.

        List<Callable<Integer>> callableList = Arrays.asList(bestNumberEver, answerToUltimateQuestionOfLife);
        Integer result = executorService.invokeAny(callableList);
        System.out.println(result);
Calculating Best Number Ever
Calculating Answer to the Ultimate Question of Life, the Universe, and Everything
42

CompletableFuture

Klasa ta implementuje interfejse Future oraz CompletationStage. CompletationStage pozwala spojrzeć na obliczenia asynchroniczne w sposób “krokowy”.

Metoda runAsync wykonuje asynchronicznie podany jako parametr interfejs Runnable.

CompletableFuture.runAsync(
  () -> System.out.println(Thread.currentThread().getName()));

Jako drugi parametr można przekazać executor. Brak podania executora spowoduje wywołanie wątku w domyślnej puli ForkJoinPool.

ExecutorService executorService = Executors.newFixedThreadPool(2);

CompletableFuture.runAsync(
        () -> System.out.println(Thread.currentThread().getName()),
        executorService);

Metoda supplyAsync zwraca CompletableFuture, do którego możemy odwołać się metodą get, pamiętaj że zablokujesz wówczas wykonanie do czasu zakończenia zadania.

CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 73;
});

System.out.println(result.get());

Na poniższym przykładzie używamy metod thenApply oraz thenAccept w kolejnych transformacjach.

CompletableFuture.supplyAsync(() -> {
try {
    System.out.println(Thread.currentThread().getName());
    TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
    e.printStackTrace();
}
return 43;
}, executorService)
    .thenApply(r -> {
        System.out.println("*2: " + Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return r * 2;
    })
    .thenApply(r -> {
                System.out.println("+1: " + Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return r + 1;
            }
    )
    .thenAccept(r -> {
        System.out.println("thenAccept: " + Thread.currentThread().getName());
        System.out.println(r);
    });

Metoda exceptionally umożliwia obsłużenie wyjątku

    CompletableFuture.supplyAsync(() -> {
        try {
            System.out.println("Wątek: " + Thread.currentThread().getName());
            SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (true) {
            throw new RuntimeException("Something went wrong");
        }
        return 43;
    }, executorService)
            .exceptionally(exception -> {
                System.out.println("Error: " + exception.getMessage());
                return -1;
            })
            .thenAccept(r -> {
                System.out.println(r);
            });

    executorService.shutdown();
}

Metoda thenCompose umożliwia asynchroniczne wykonanie transformacji.

CompletableFuture<Double> valueFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 100.00;
});

CompletableFuture<Void> taxValueFuture = valueFuture.thenCompose(
        value -> CompletableFuture.supplyAsync(() -> {
                    System.out.println(Thread.currentThread().getName());
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return value * 0.23;
                }
        )
                .thenAccept(System.out::println)
);
taxValueFuture.get();

Metoda thenCombine umożliwia złączenie transformacji tak aby powstał kolejny wynik

CompletableFuture<Long> calcValue = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
    }
    System.out.println(Thread.currentThread().getName());
    return 32L;
});

CompletableFuture<Long> calcOtherValue = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    try {
        TimeUnit.SECONDS.sleep(4);
    } catch (InterruptedException e) {
    }
    System.out.println(Thread.currentThread().getName());
    return 2L;
});

CompletableFuture<Long> combinedValue = calcValue.thenCombine(calcOtherValue, (v1, v2) -> v1 * v2);
System.out.println(combinedValue.get());