Как превратить Моно в действительно асинхронный (не реактивный!) вызов метода?

0

Вопрос

У меня есть метод

@Service
public class MyService {
    public Mono<Integer> processData() {
        ... // very long reactive operation
    }
}

В обычном потоке программы я вызываю этот метод асинхронно через событие Кафки.

Для целей тестирования мне нужно представить метод как веб-службу, но метод должен быть представлен как асинхронный: возвращать только HTTP-код 200 OK ("запрос принят") и продолжать обработку данных в фоновом режиме.

Нормально ли (= нет ли у этого каких-либо нежелательных побочных эффектов) просто позвонить Mono#subscribe() и вернуться из метода контроллера?

@RestController
@RequiredArgsConstructor
public class MyController {
    private final MyService service;

    @GetMapping
    public void processData() {
        service.processData()
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe();
    }
}

Или лучше сделать это так (здесь меня смущает предупреждение от IntelliJ, может быть, то же самое, что https://youtrack.jetbrains.com/issue/IDEA-276018 ?):

public Mono<Void> processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe(); // IntelliJ complains "Inappropriate 'subscribe' call" but I think it's a false alarm in my case(?)
    return Mono.empty();
}

Или какое-то другое решение?

2

Лучший ответ

3

Нормально ли (= нет ли у этого каких-либо нежелательных побочных эффектов) просто вызвать Mono#subscribe() и вернуться из метода контроллера?

Есть побочные эффекты, но вы можете спокойно жить с ними:

  • Это действительно огонь и забудь, что означает, что, хотя вы никогда не будете уведомлены об успехе (что осознает большинство людей), вы также никогда не будете уведомлены о неудаче (что осознает гораздо меньше людей).
  • Если процесс по какой-то причине зависнет, этот издатель никогда не завершится, и у вас не будет возможности узнать об этом. Поскольку вы подписываетесь на ограниченный пул эластичных потоков, он также свяжет один из этих ограниченных потоков на неопределенный срок.

Первый момент, с которым вас может быть все в порядке, или вы, возможно, захотите каким-то образом протоколировать некоторые ошибки дальше по этой реактивной цепочке в качестве побочного эффекта, чтобы у вас, по крайней мере, было внутреннее уведомление, если что-то пойдет не так.

Что касается второго пункта - я бы рекомендовал установить (щедрый) тайм-аут для вызова метода, чтобы он, по крайней мере, был отменен, если он не был завершен в установленное время и больше не зависал, потребляя ресурсы. Если вы выполняете асинхронную задачу, то это не является серьезной проблемой, так как она просто потребляет немного памяти. Если вы обертываете блокирующий вызов в эластичном планировщике, то это, однако, еще хуже, поскольку вы связываете поток в этом пуле потоков на неопределенный срок.

Я бы также задался вопросом, зачем вам вообще нужно использовать ограниченный эластичный планировщик здесь - он используется для переноса блокирующих вызовов, что, похоже, не является основой этого варианта использования. (Чтобы было ясно, если ваша служба блокируется, вы должны полностью завернуть ее в эластичный планировщик, но если нет, то для этого нет причин.)

Наконец, этот пример:

public Mono<Void> processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();
    return Mono.empty();
}

...это блестящий пример того, чего не следует делать, поскольку вы создаете своего рода "самозваный реактивный метод" - кто-то может очень разумно подписаться на этого возвращенного издателя, думая, что он завершится, когда основной издатель завершит, что, очевидно, здесь не происходит. Используя void возвращаемый тип и, следовательно, ничего не возвращать-это правильное решение в этом сценарии.

2021-11-23 16:54:58
1

Ваш вариант со следующим кодом на самом деле в порядке:

@GetMapping
public void processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();
}

На самом деле это то, что вы делаете в @Scheduled метод, который просто ничего не возвращает, и вы явно подписываетесь на Mono или Flux так что элементы испускаются.

2021-11-23 08:36:44

На других языках

Эта страница на других языках

Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................