Что делать, если потребитель Кафки обрабатывает сообщение слишком долго? Будет ли Кафка повторно передавать этот раздел другому потребителю, и сообщение будет обработано дважды?
Да, это верно. Если потребителю Kafka потребуется слишком много времени для обработки сообщения, а последующий опрос() будет отложен, Kafka повторно назначит этот раздел другому потребителю, и сообщение будет обработано снова (и снова).
Для большей ясности, сначала нам нужно решить и определить " Как долго это слишком долго?".
Это определяется свойством max.poll.interval.ms
. Из документов,
Максимальная задержка между вызовами функции опроса() при использовании управления группами потребителей. Это накладывает верхнюю границу на количество времени, в течение которого потребитель может бездействовать, прежде чем получать дополнительные записи. Если функция опроса() не будет вызвана до истечения этого тайм-аута, то пользователь считается сбойным, и группа выполнит перебалансировку, чтобы переназначить разделы другому участнику.
Группа потребителей балансируется заново, если в течение этого времени нет вызовов для опроса ().
Есть еще одно свойство auto.commit.interval.ms
. Проверка смещения автоматической фиксации будет вызываться только во время опроса - она проверяет, больше ли прошедшее время, чем заданное время интервала автоматической фиксации, и если результат "да", смещение зафиксировано.
Если потребитель Kafka слишком долго обрабатывает записи, то последующий вызов poll() также задерживается, и смещения, возвращенные в последнем опросе (), не фиксируются. Если в это время произойдет перебалансировка, новый клиент-потребитель, назначенный этому разделу, снова начнет обработку сообщений.
Изменения баланса группы потребителей и последующего переназначения разделов можно избежать, увеличив это значение. Это увеличит допустимый интервал между опросами и даст больше времени потребителям для обработки записей, возвращенных из опроса (). Потребители присоединятся к перебалансировке только внутри вызова для опроса, поэтому увеличение максимального интервала опроса также задержит групповую перебалансировку.
Есть еще одна проблема в увеличении максимального интервала опроса до большого значения. Если потребитель умирает по какой-либо другой причине, это занимает больше времени, чем настроено max.poll.interval.ms
интервал для обнаружения сбоя.
session.timeout.ms
и heartbeat.interval.ms
доступны в этом случае, чтобы как можно раньше обнаружить полный сбой.
Для получения более подробной информации об этих параметрах:
Пожалуйста, обратите внимание, что значения, настроенные для session.timeout.ms
должно находиться в допустимом диапазоне, заданном в конфигурации брокера по свойствам
- group.min.session.timeout.ms
- group.max.session.timeout.ms
В противном случае при запуске клиента-потребителя возникнет следующее исключение.
Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)
Обновление: Чтобы избежать повторной обработки сообщений
В классе KafkaConsumer есть еще один метод commitAsync()
чтобы запустить операцию смещения фиксации.
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();
Для получения более подробной информации о commitSync() и commitAsync(), пожалуйста, проверьте этот поток
Фиксация смещения вручную - это действие, означающее, что смещение было обработано, чтобы Кафка больше не отправлял зафиксированные записи для одного и того же раздела. Когда смещения фиксируются вручную, важно отметить, что если потребитель по какой-либо причине умирает до обработки записей, есть вероятность, что эти записи не будут обработаны снова.