Избранное сообщение

Фетісов В. С. Комп’ютерні технології в тестуванні. Навчально-методичний посібник. 2-ге видання, перероблене та доповнене / Мои публикации

В 10-х годах я принимал участие в программе Европейского Союза Tempus "Освітні вимірювання, адаптовані до стандартів ЄС". В рамк...

Благодаря Интернету количество писателей и поэтов увеличивается в геометрической прогрессии. Поголовье читателей начинает заметно отставать.

понедельник, 28 мая 2018 г.

Java и Project Reactor. Эпизод 2 / Блог компании FunCorp / Хабр / Программирование на Java

Привет! Удивительно, но первая часть статьи даже кому-то понравилась.
Отдельное спасибо за ваши отзывы и комментарии. У меня для вас
плохая хорошая новость: нам ещё есть о чём поговорить! А если точнее, то о некоторых деталях работы Reactor.

Я отрекаюсь от магии

Для дальнейшего углубления в Reactor не будет лишним описать некоторые принципы его работы. Что же тщательно скрывается от нас за внешним слоем из Flux и Mono?


Официальная документация предлагает сравнивать Reactor с конвейером. Publisher выдаёт какие-то данные (материалы). Данные идут по цепочке из операторов (конвейерной ленте), обрабатываются, в конце получается готовый продукт, который передаётся в нужный Consumer/Subscriber и употребляется уже там.

Как работают операторы Reactor?
Рецепт усреднённый, потому что вариаций масса.Попытаемся дать грубое описание.

У каждого оператора есть какая-то
тактика реализация в виде объекта. Вызов оператора у Flux/Mono возвращает объект, реализующий этот оператор. Например, вызов flatMap() вернёт объект типа FluxFlatMap (наследник Flux).

Т.е. оператор — это Publisher, который, помимо какой-то своей логики, содержит ссылку на исходный (source) Publisher, к которому применяется. Вызовы операторов создают цепочку из Publisher.

При вызове subscribe() создаётся исходный Subscriber, он передаётся назад по нашей цепочке из паблишеров, каждый Publisher может завернуть Subscriber в другой Subscriber, таким образом создавая ещё одну цепочку, которая и передаётся в исходный Publisher для выполнения.

Логично, что всё это несёт какой-то оверхед, поэтому рекомендуется воздержаться от написания обычного (синхронного) кода через Flux или Mono.

Schedulers | Планировщики


Reactor не заботит модель исполнения вашей программы, но он любезно предоставляет необходимый для управления исполнением инструментарий. Разработчик
самурайволен самостоятельно выбирать
свою судьбу модель исполнения.

Модель исполнения и её детали определяются имплементацией интерфейса Scheduler (т.е. планировщика). Есть статические методы для ряда случаев жизни, позволяющие указать контекст выполнения:

  • Schedulers.immediate(). Выполнение будет происходить в текущем потоке;
  • Schedulers.single(). Выполнение в выделенном потоке. Осторожно! Он и в самом деле single, обращение не создаст новый scheduler/поток, а вернёт кешированное значение. Для создания выделенного потока/scheduler на каждый вызов используйте Schedulers.newSingle();
  • Schedulers.elastic(). Уже упоминался в прошлой статье. Выполнение задач списывает на workers (работяг, «воркеров»), которых сам же и создаёт. В случае idle (бездействия) worker прибивается. В качестве воркера выступает ExecutorService. Используется для блокирующих задач, например I/O. По умолчанию — unbounded, если нужно ограничение на количество воркеров — используйте Schedulers.newElastic();
  • Schedulers.parallel(). N воркеров, оптимизированных для параллельной работы. По умолчанию N = количеству доступных ядер, т.е. Runtime.getRuntime().availableProcessors(). Осторожно! Внутри Docker этот метод может нагло вам врать.

Стоит отметить, что коробочные Schedulers.single() и Schedulers.parallel() выбрасывают IllegalStateException при попытке запустить в них блокирующий оператор: block(), blockLast(), toIterable(), toStream(). Такое нововведение появилось в релизе 3.1.6.

Если всё-таки хотите заниматься подобными извращениями — используйте Shchedulers.newSingle() и Schedulers.newParallel(). Но лучшей практикой для блокирующих операторов считается использование Schedulers.elastic() или Schedulers.newElastic().

Экземпляры Scheduler так же можно инициализировать из ExecutorService с помощью Schedulers.fromExecutorService(). Из Executor тоже можно, но не рекомендуется.

Некоторые операторы из Flux и Mono запускаются сразу на конкретном Scheduler (но можно передать и свой). К примеру, уже знакомый Flux.interval() по умолчанию запускается на Schedulers.parallel().

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

Контекст исполнения


Как же сменить контекст исполнения? Нужно прибегнуть к одному из уже знакомых нам операторов:

  • publishOn();
  • subscribeOn().

Они оба принимают Scheduler в качестве аргумента и позволяют изменить контекст выполнения на указанный Scheduler.
Но почему их два и в чём же разница?

В случае с publishOn этот оператор применяется так же, как и любой другой, посреди цепочки вызовов. Все последующие Subscriber будут выполняться в контексте указанного Scheduler.

В случае с subscribeOn оператор «глобальный», срабатывает сразу на всю цепочку Subscriber. После вызова subscribe() контекстом выполнения будет указанный Scheduler. Далее контекст может изменяться с помощью оператора publishOn. Последующие вызовы subscribeOn игнорируются.


Flux.just("a", "b", "c") 
      
      .doOnNext(v -> System.out.println("before publishOn: " + Thread.currentThread().getName()))
      .publishOn(Schedulers.elastic())
      
      .doOnNext(v -> System.out.println("after publishOn: " + Thread.currentThread().getName()))
      .subscribeOn(Schedulers.parallel())
      .subscribe(v -> System.out.println("received " + v + " on " + Thread.currentThread().getName()));

Thread.sleep(5000);

выведет следующий результат:

before publishOn: parallel-1
before publishOn: parallel-1
after publishOn: elastic-2
before publishOn: parallel-1
received a on elastic-2
after publishOn: elastic-2
received b on elastic-2
after publishOn: elastic-2
received c on elastic-2

Обработка ошибок


В Reactor исключения воспринимаются как terminal event (терминальное событие).
Если где-то произошло исключение, значит, что-то пошло не так, наш конвейер останавливается, а ошибка прокидывается до финального Subscriber и его метода onError.

Любимая картинка

Почему так? Reactor не знает о серьёзности возникшего исключения и понятия не имеет, что с ним делать. Подобные ситуации должны как-то обрабатываться на уровне приложения. Для этого у Subscriber есть прекрасный метод onError(). Reactor вынуждает нас его переопределять и как-то реагировать на исключение, в противном случае мы будем получать UnsupportedOperationException при ошибках.


Философия try/catch


Что обычно делается внутри catch-блока в Java? Ну, не считая всеми любимых пустых catch-блоков.

  1. Static Fallback Value. Вернуть какое-то статическое значение по умолчанию:
    try {
       return fromRemoteAndUnstableSource();
    } catch(Throwable e) {
       return DEFAULT_VALUE;
    }
  2. Fallback Method. Вызов альтернативного метода в случае ошибки:

    try {
       return fromRemoteAndUnstableSource();
    } catch(Throwable e) {
       return loadValueFromCache();
    }
  3. Dynamic Fallback Value. Вернуть какое-то динамическое значение в зависимости от исключения:

    try {
       return fromRemoteAndUnstableSource();
    } catch(Throwable e) {
       if (e instanceof TimeoutException) {
           return loadValueFromCache();
       }
       return DEFAULT_VALUE;
    }
  4. Catch and Rethrow. Обернуть в какое-то доменное исключение и пробросить исключение дальше:

    try {
       return fromRemoteAndUnstableSource();
    } catch(Throwable e) {
       throw new BusinessException(e);
    }
  5. Log or React on the Side. Залогировать ошибку и пробросить исключение дальше:

    try {
       return fromRemoteAndUnstableSource();
    } catch(Throwable e) {
       logger.error(e.getMessage(), e);
       throw e;
    }
  6. Using Resources and the Finally Block. Освобождение ресурсов в finally-блоке или с помощью try-with-resources.
    try {
       return fromRemoteAndUnstableSource();
    } catch(Throwable e) {
       
    } finally {
       cleanAllStuff();
    }

Приятная новость: всё это есть в Reactor в виде эквивалентных операторов.

Менее приятная новость: в случае ошибки ваша прекрасная последовательность данных всё равно завершится (terminal event), несмотря на оператора обработки ошибок.
Подобные операторы используются скорее для создания новой, резервной (fallback) последовательности на замену завершившейся.

Приведём пример:

Flux<String> s = Flux.range(1, 10)
   .map(v -> doSomethingDangerous(v))
   .map(v -> doSecondTransform(v));
s.subscribe(value -> System.out.println("RECEIVED " + value), error -> System.err.println("CAUGHT " + error));

Можно сравнить это с похожим блоком try / catch:

try {
   for (int i = 1; i < 11; i++) {
       String v1 = doSomethingDangerous(i);
       String v2 = doSecondTransform(v1);
       System.out.println("RECEIVED " + v2);
   }
} catch (Throwable t) {
   System.err.println("CAUGHT " + t);
}

Обратите внимание: for прерывается!

Ещё пример завершения последовательности в случае ошибки:

Flux<String> flux = Flux.interval(Duration.ofMillis(250))
   .map(input -> {
       if (input < 3) return "tick " + input;
       throw new RuntimeException("boom");
   })
   .onErrorReturn("Uh oh");

flux.subscribe(System.out::println);
Thread.sleep(2100);

На экране получим:

tick 0
tick 1
tick 2
Uh oh

Реализация try/catch

Static Fallback Value

Используя оператор onErrorReturn:

Flux.just(10)
   .map(this::doSomethingDangerous)
   .onErrorReturn("RECOVERED");

Можно добавить предикат, чтобы оператор выполнялся не для всех исключений:

Flux.just(10)
   .map(this::doSomethingDangerous)
   .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");

Fallback Method

Используя оператор onErrorResume,

Flux.just("key1", "key2")
   .flatMap(k -> callExternalService(k)) 
   .onErrorResume(e -> getFromCache(k)); 

можно добавить предикат, чтобы оператор выполнялся не для всех исключений:

Flux.just("timeout1", "unknown", "key2")
   .flatMap(k -> callExternalService(k))
   .onErrorResume(TimeoutException.class, getFromCache(k))
   .onErrorResume((Predicate<Throwable>) error -> error instanceof UnknownKeyException, registerNewEntry(k, "DEFAULT"));

Аналогично:

Flux.just("timeout1", "unknown", "key2")
   .flatMap(k -> callExternalService(k))
   .onErrorResume(error -> {
       if (error instanceof TimeoutException)
           return getFromCache(k);
       else if (error instanceof UnknownKeyException) 
           return registerNewEntry(k, "DEFAULT");
       else
           return Flux.error(error);
   });

Dynamic Fallback Value

Всё тот же onErrorResume:

erroringFlux.onErrorResume(error -> Mono.just(
   myWrapper.fromError(error); 
));

Catch and Rethrow


Можно сделать двумя способами. Первый — с оператором onErrorResume:

Flux.just("timeout1")
   .flatMap(k -> callExternalService(k))
   .onErrorResume(original -> Flux.error(
       new BusinessException("oops, SLA exceeded", original)
   );

И более лаконично — с помощью onErrorMap:

Flux.just("timeout1")
   .flatMap(k -> callExternalService(k))
   .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));

Log or React on the Side

Добавить какой-то side effect (метрики, логирование) можно с помощью оператора doOnError

LongAdder failureStat = new LongAdder();
Flux<String> flux = Flux.just("unknown")
   .flatMap(k -> callExternalService(k))
   .doOnError(e -> {
       failureStat.increment();
       log("uh oh, falling back, service failed for key " + k);
   })
   .onErrorResume(e -> getFromCache(k));

Using Resources and the Finally Block

Итак, как же получить аналог try-with-resources или блок finally? На выручку нам приходит оператор Flux.using().

Для начала нужно ознакомиться с интерфейсом Disposable. Он заставляет нас реализовать метод dispose(). Вызов этого метода должен отменять или завершать какую-то задачу или последовательность. Вызовы метода должны быть идемпотентными. Использованные ресурсы должны быть освобождены.

AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
   @Override
   public void dispose() {
       isDisposed.set(true);
   }

   @Override
   public String toString() {
       return "DISPOSABLE";
   }
};

Flux<String> flux = Flux.using(
       () -> disposableInstance,  
       disposable -> Flux.just(disposable.toString()), 
       Disposable::dispose  
);

Повторение | Retrying

При повторе (retry) наблюдается похожее поведение, оригинальная последовательность завершается (terminate event), мы повторно подписываемся (re-subscribing) на Flux.

Разберём на примере. Код

Flux.interval(Duration.ofMillis(250))
   .map(input -> {
       if (input < 3) return "tick " + input;
       throw new RuntimeException("boom");
   })
   .elapsed()
   .retry(1)
   .subscribe(System.out::println, System.err::println);

Thread.sleep(2100);

выведет

259,tick 0
249,tick 1
251,tick 2
506,tick 0
248,tick 1
253,tick 2
java.lang.RuntimeException: boom

Более сложная логика повторов доступна с использованием оператора retryWhen().

Заключение

Надеюсь, этой небольшой заметке удалось пролить свет на некоторые особенности работы Reactor.

Подведём итоги:
  • контекстом выполнения можно манипулировать с помощью операторов publishOn, subscribeOn и Schedulers;
  • для обработки исключительных ситуаций есть множество операторов на все случаи жизни;
  • посылание terminate signal приводит к завершению оригинальной «последовательности»;
  • для освобождения ресурсов используется интерфейс Dispose.
Спасибо за внимание!

Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.
Источник: https://habr.com/company/funcorp/blog/359194/?utm_source=habrahabr&utm_medium=rss&utm_campaign=359194

Смотри также:

Зачем нужна Java. http://fetisovvs.blogspot.com/2014/07/java.html
Разбор основных концепций параллелизма. http://fetisovvs.blogspot.com/2018/04/java.html

Первый контакт с «var» в Java 10. http://fetisovvs.blogspot.com/2018/01/var-java-10-java.html
JAVA 9. Что нового? http://fetisovvs.blogspot.com/2017/10/java-9-java.html
Концепции объектно-ориентированного программирования — ООП в Java. http://fetisovvs.blogspot.com/2017/01/java-java.html
Анимации в Android по полочкам (Часть 1. Базовые анимации). http://fetisovvs.blogspot.com/2018/02/android-1-java.html
Двести пятьдесят русскоязычных обучающих видео докладов и лекций о Java. http://fetisovvs.blogspot.com/2015/12/java-5-java-java.html
Абстрактные классы и методы. http://fetisovvs.blogspot.com/2017/02/java.html
Полное руководство по Java Reflection API. Рефлексия на примерах. http://fetisovvs.blogspot.com/2017/02/java-reflection-api-java.html
Микросервисы для Java программистов. Практическое введение во фреймворки и контейнеры. http://fetisovvs.blogspot.com/2017/10/java-java.html
Микросервисы для Java программистов. Практическое введение во фреймворки и контейнеры. (Часть 3). http://fetisovvs.blogspot.com/2017/10/java-3-java.html
ТОП-3 способа конвертировать массив в ArrayList. Пример на Java. http://fetisovvs.blogspot.com/2016/09/3-arraylist-java-java.html
Ввод–вывод в Java. http://fetisovvs.blogspot.com/2016/05/java-java_28.html
Enum-Всемогущий. http://fetisovvs.blogspot.com/2017/02/enum-java.html
Массивы в Java. Создание и обработка. http://fetisovvs.blogspot.com/2017/10/java-java_18.html
Arrays, Collections: Алгоритмический минимум. http://fetisovvs.blogspot.com/2017/12/arrays-collections.html
Популярные методы для работы с Java массивами. http://fetisovvs.blogspot.com/2016/09/java-java_29.html
Пример использования метода replace в Java. Как заменить символ в строке? http://fetisovvs.blogspot.com/2017/01/replace-java-java.html
Класс Scanner в Java — описание и пример использования. http://fetisovvs.blogspot.com/2017/01/scanner-java-java.html
Пример использования метода trim в Java: как удалить пробелы в начале и конце строки? http://fetisovvs.blogspot.com/2017/01/trim-java-java.html
Spark — Потрясающий веб-микрофреймворк для Java. http://fetisovvs.blogspot.com/2017/10/spark-java-java.html
Чтение и запись CSV файла с помощью SuperCSV. http://fetisovvs.blogspot.com/2017/01/csv-supercsv-java-java.html
Конструкция try/catch/finally (исключения). http://fetisovvs.blogspot.com/2017/01/trycatchfinally-java.html
1000+ часов видео по Java на русском. http://fetisovvs.blogspot.nl/2017/06/1000-java-java.html
Шпаргалка Java программиста 7.1 Типовые задачи: Оптимальный путь преобразования InputStream в строку. http://fetisovvs.blogspot.com/2016/04/java-71-inputstream-java.html
Шпаргалки Java программиста 10: Lombok. http://fetisovvs.blogspot.nl/2017/12/java-10-lombok-java.html
Шпаргалки Java программиста 9: Java SE — Шпаргалка для собеседований и повторений. http://fetisovvs.blogspot.com/2017/12/java-9-java-se-java.html
Шпаргалка Java программиста 8. Библиотеки для работы с Json (Gson, Fastjson,
LoganSquare, Jackson, JsonPath и другие). http://fetisovvs.blogspot.com/2016/04/java-8-json-gson-fastjson-logansquare.html
Java 8 и паттерн Стратегия. http://fetisovvs.blogspot.com/2018/03/java-8-java.html
Реализация ООП-наследования в классах, работающих с SQL и MS Entity Framework. http://fetisovvs.blogspot.com/2017/02/sql-ms-entity-framework.html
Как установить соединение с СУБД MySQL в IntelliJ IDEA в редакции Community. http://fetisovvs.blogspot.com/2016/04/mysql-intellij-idea-community-java.html
Как с помощью maven работать с библиотеками, которых в maven нет. http://fetisovvs.blogspot.com/2017/03/maven-maven-java.html
Проекты по созданию компиляторов из Java в JavaScript и исполняемые файлы. http://fetisovvs.blogspot.com/2018/01/java-javascript-java.html
Диагностика утечек памяти в Java. http://fetisovvs.blogspot.com/2017/03/java-java_18.html
Spring AOP и JavaConfig в плагинах для Atlassian Jira. http://fetisovvs.blogspot.com/2018/04/spring-aop-javaconfig-atlassian-jira.html
Блеск и нищета Java для настольных систем. http://fetisovvs.blogspot.com/2018/04/java-haulmont-java.html
Разбор задачек от Одноклассников на JPoint 2018. http://fetisovvs.blogspot.com/2018/04/jpoint-2018-java.html
Программируем… выход из лабиринта. http://fetisovvs.blogspot.com/2015/10/java.html
Основы работы с IntelliJ IDEA. Интерфейс программы. http://fetisovvs.blogspot.com/2016/09/intellij-idea-java.html
Ускоряем время сборки и доставки java web приложения. http://fetisovvs.blogspot.com/2018/03/java-web-java.html

Комментариев нет:

Отправить комментарий