Co znajdziesz w tym artykule?
- Thread – zaimplementuj metodę run i uruchom wątek
- Executors – zarządzanie pulami wątków
- Callable – wrzuć zadanie do executora i zaczekaj na wynik
- CompletableFuture – czyli jak składać transformacje w nieblokujący sposób
Thread
Zacznijmy od zdefiniowania klasy rozszerzającej Thread
i zaimplementujmy metodę run
.
import lombok.SneakyThrows; import java.util.concurrent.TimeUnit; public class SleepAndPrintThread extends Thread { private final int sleepInterval; public SleepAndPrintThread(String name, int sleepInterval) { super(name); this.sleepInterval = sleepInterval; } @SneakyThrows @Override public void run() { TimeUnit.SECONDS.sleep(this.sleepInterval); System.out.println("Message from thread " + this.getName() + " after " + this.sleepInterval + " seconds"); } }
Skorzystałem z metody TimeUnit.SECONDS.sleep
aby uśpić wątek na kilka sekund. Metoda wymaga obsłużenia wyjątku InterruptedException
. Do jego obsługi wykorzystałem adnotację @SneakyThrows z projektu Lombok.
Stwórzmy i uruchommy nasz wątek.
public class ThreadsApp { public static void main(String[] args) { Thread sleep3seconds = new SleepAndPrintThread("Thread-001", 3); sleep3seconds.start(); System.out.println("Message from main thread"); } }
Komunikatu na wyjściu:
Message from main thread Message from thread Thread-001 after 3 seconds
Join
Stwórzmy teraz kilka wątków i zmuśmy główny wątek aby poczekał na ich zakończenie.
public class ThreadsApp { public static void main(String[] args) { Thread sleep3seconds = new SleepAndPrintThread("Thread-001", 3); sleep3seconds.start(); Thread sleep5seconds = new SleepAndPrintThread("Thread-001", 5); sleep5seconds.start(); try { sleep5seconds.join(); sleep3seconds.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Message from main thread"); } }
Metoda join
klasy Thread
powoduje dołączenie wątku, względem którego została wykonana do wątku w którym nastąpiło wywołanie. W efekcie czego końcowy komunikat zostanie wyświetlony po zakończeniu działania poprzednich wątków.
Wyjście:
Message from thread Thread-001 after 3 seconds Message from thread Thread-001 after 5 seconds Message from main thread
Możemy skorzystać z wyrażenia lambda aby utworzyć implementację interfejsu Runnable
:
Runnable sleepy = () -> { System.out.println("Message from " + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }; Thread thread = new Thread(sleepy, "Runnable-001"); thread.start();
Executors
Executory pomagają zarządzać wątkami w wygodny sposób. Klasa Executors
dostarcza statycznych metod fabrycznych za pomocą których możemy utworzyć żądany executor.
SingleThreadExecutor
public class SingleExecutorApp { public static void main(String... args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); Runnable finalCountdown = () -> { for (int i = 0; i <= 10; i++) { System.out.println(i); try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } }; Runnable goForward = () -> System.out.println("Go!"); executorService.submit(finalCountdown); executorService.submit(goForward); executorService.shutdown(); } }
SingleThreadExecutor
pozwala na wykonywanie tylko jednego wątku jednocześnie. Wątek goForward
wykona się po zakończeniu wątku finalCountdown
.
FixedThreadPool
Aby stworzyć pulę wątków, które będą wykonywały się jednocześnie skorzystamy z metody fabrycznej newFixedThreadPool
która jako parametr przyjmuje liczbę wątków w puli.
ExecutorService executorService = Executors.newFixedThreadPool(2); Runnable worker1 = () -> { System.out.println("Running 10 secods thread"); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }; Runnable worker2 = () -> { System.out.println("Running 5 seconds thread"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } }; Runnable worker3 = () -> { System.out.println("Running 3 seconds thread"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }; executorService.submit(worker1); executorService.submit(worker2); executorService.submit(worker3); System.out.println("Workers loaded"); executorService.shutdown();
Workers loaded Running 10 secods thread Running 5 seconds thread Running 3 seconds thread
Jak widzisz wątki zostały od razu umieszczone w executorze, wątek worker3
wykona się po zakończeniu dwóch poprzednio wywołanych wątków – utworzyliśmy pulę zdolną obsłużyć tylko dwa wątki jednocześnie.
ScheduledThreadPool
Wykorzystując metodę fabrykującą newScheduledThreadPool
możemy stworzyć executor, za pomocą którego wątki mogą być wykonywane cyklicznie lub też z opóźnieniem.
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ScheduledExecutorApp { public static void main(String... args) throws InterruptedException { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3); Runnable worker1 = () -> { System.out.println("Worker 1 running 3 seconds thread"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }; Runnable worker2 = () -> { System.out.println("Worker 2 running 5 seconds thread"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } }; Runnable worker3 = () -> { System.out.println("Worker 3 running scheduled workder"); }; executorService.submit(worker1); executorService.submit(worker2); executorService.scheduleAtFixedRate(worker3, 0, 1, TimeUnit.SECONDS); System.out.println("All thread loaded"); } }
Wątek worker3
będzie wykonywany cyklicznie co 1 sekundę. Ponieważ nie kończymy pracy executora metodą shutdown
program będzie działał w nieskończonej pętli. Zmodyfikujmy kod o dwie dodatkowe linijki:
executorService.awaitTermination(10, TimeUnit.SECONDS); executorService.shutdown();
Executor będzie czekał na 10 sekund na zakończenie wykonywanych wątków.
Callable
Metoda sumbit
executora pozwala również na umieszczanie zadań z wykorzystaniem interfejsu Callable
.
import java.util.concurrent.*; public class CallableApp { public static void main(String... args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); Callable<Integer> bestNumberEver = () -> { System.out.println("Calculating Best Number Ever"); TimeUnit.SECONDS.sleep(3); return 73; }; Callable<Integer> answerToUltimateQuestionOfLife = () -> { System.out.println("Calculating Answer to the Ultimate Question of Life, the Universe, and Everything"); TimeUnit.SECONDS.sleep(2); return 42; }; Future<Integer> bestNumberFuture = executorService.submit(bestNumberEver); Future<Integer> answerToLifeFuture = executorService.submit(answerToUltimateQuestionOfLife); Integer r = bestNumberFuture.get(); System.out.println(r); r = answerToLifeFuture.get(); System.out.println(r); executorService.shutdown(); } }
Umieszczenie zadania w executorze powoduje rozpoczęcie jego wykonywania i zwrócenie Future<T>
, w naszym wypadku Future<Integer>
ponieważ oba zadania są klasy Callable<Integer>
.
Wywołanie metody get
“czeka” na zakończenie wykonywania zadania. W poniższym kodzie metoda isDone
zwróci true w momencie kiedy zadanie będzie zakończone.
while (!bestNumberFuture.isDone()) { System.out.println("Calculating best number in progress..."); TimeUnit.SECONDS.sleep(1); }
invokeAny
Metoda invokeAny
wykonuje listę zadań i zwraca wynik uzyskany jako pierwszy.
List<Callable<Integer>> callableList = Arrays.asList(bestNumberEver, answerToUltimateQuestionOfLife); Integer result = executorService.invokeAny(callableList); System.out.println(result);
Calculating Best Number Ever Calculating Answer to the Ultimate Question of Life, the Universe, and Everything 42
CompletableFuture
Klasa ta implementuje interfejse Future
oraz CompletationStage
. CompletationStage
pozwala spojrzeć na obliczenia asynchroniczne w sposób “krokowy”.
Metoda runAsync
wykonuje asynchronicznie podany jako parametr interfejs Runnable
.
CompletableFuture.runAsync( () -> System.out.println(Thread.currentThread().getName()));
Jako drugi parametr można przekazać executor. Brak podania executora spowoduje wywołanie wątku w domyślnej puli ForkJoinPool
.
ExecutorService executorService = Executors.newFixedThreadPool(2); CompletableFuture.runAsync( () -> System.out.println(Thread.currentThread().getName()), executorService);
Metoda supplyAsync
zwraca CompletableFuture
, do którego możemy odwołać się metodą get
, pamiętaj że zablokujesz wówczas wykonanie do czasu zakończenia zadania.
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return 73; }); System.out.println(result.get());
Na poniższym przykładzie używamy metod thenApply
oraz thenAccept
w kolejnych transformacjach.
CompletableFuture.supplyAsync(() -> { try { System.out.println(Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return 43; }, executorService) .thenApply(r -> { System.out.println("*2: " + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } return r * 2; }) .thenApply(r -> { System.out.println("+1: " + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return r + 1; } ) .thenAccept(r -> { System.out.println("thenAccept: " + Thread.currentThread().getName()); System.out.println(r); });
Metoda exceptionally
umożliwia obsłużenie wyjątku
CompletableFuture.supplyAsync(() -> { try { System.out.println("Wątek: " + Thread.currentThread().getName()); SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } if (true) { throw new RuntimeException("Something went wrong"); } return 43; }, executorService) .exceptionally(exception -> { System.out.println("Error: " + exception.getMessage()); return -1; }) .thenAccept(r -> { System.out.println(r); }); executorService.shutdown(); }
Metoda thenCompose
umożliwia asynchroniczne wykonanie transformacji.
CompletableFuture<Double> valueFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return 100.00; }); CompletableFuture<Void> taxValueFuture = valueFuture.thenCompose( value -> CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return value * 0.23; } ) .thenAccept(System.out::println) ); taxValueFuture.get();
Metoda thenCombine
umożliwia złączenie transformacji tak aby powstał kolejny wynik
CompletableFuture<Long> calcValue = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { } System.out.println(Thread.currentThread().getName()); return 32L; }); CompletableFuture<Long> calcOtherValue = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { } System.out.println(Thread.currentThread().getName()); return 2L; }); CompletableFuture<Long> combinedValue = calcValue.thenCombine(calcOtherValue, (v1, v2) -> v1 * v2); System.out.println(combinedValue.get());