Что делать, если потребитель Кафки обрабатывает сообщение слишком долго? Будет ли Кафка повторно передавать этот раздел другому потребителю, и сообщение будет обработано дважды?

0

Вопрос

Предполагать Kafka, 1 partition, 2 consumers.(2-й потребитель простаивает)

Предположим, что 1-й из них получил сообщение, обрабатывает его с помощью 3 других служб и внезапно подключается к одной из них и пропускает тайм-аут Кафки.

Будет ли Кафка повторно назначать раздел 2-му потребителю, и сообщение будет обработано дважды (предположим, что 1-й в конечном итоге завершится успешно)?

1

Лучший ответ

1

Что делать, если потребитель Кафки обрабатывает сообщение слишком долго? Будет ли Кафка повторно передавать этот раздел другому потребителю, и сообщение будет обработано дважды?

Да, это верно. Если потребителю Kafka потребуется слишком много времени для обработки сообщения, а последующий опрос() будет отложен, Kafka повторно назначит этот раздел другому потребителю, и сообщение будет обработано снова (и снова).

Для большей ясности, сначала нам нужно решить и определить " Как долго это слишком долго?".

Это определяется свойством max.poll.interval.ms. Из документов,

Максимальная задержка между вызовами функции опроса() при использовании управления группами потребителей. Это накладывает верхнюю границу на количество времени, в течение которого потребитель может бездействовать, прежде чем получать дополнительные записи. Если функция опроса() не будет вызвана до истечения этого тайм-аута, то пользователь считается сбойным, и группа выполнит перебалансировку, чтобы переназначить разделы другому участнику.

Группа потребителей балансируется заново, если в течение этого времени нет вызовов для опроса ().

Есть еще одно свойство auto.commit.interval.ms. Проверка смещения автоматической фиксации будет вызываться только во время опроса - она проверяет, больше ли прошедшее время, чем заданное время интервала автоматической фиксации, и если результат "да", смещение зафиксировано.

Если потребитель Kafka слишком долго обрабатывает записи, то последующий вызов poll() также задерживается, и смещения, возвращенные в последнем опросе (), не фиксируются. Если в это время произойдет перебалансировка, новый клиент-потребитель, назначенный этому разделу, снова начнет обработку сообщений.

Изменения баланса группы потребителей и последующего переназначения разделов можно избежать, увеличив это значение. Это увеличит допустимый интервал между опросами и даст больше времени потребителям для обработки записей, возвращенных из опроса (). Потребители присоединятся к перебалансировке только внутри вызова для опроса, поэтому увеличение максимального интервала опроса также задержит групповую перебалансировку.

Есть еще одна проблема в увеличении максимального интервала опроса до большого значения. Если потребитель умирает по какой-либо другой причине, это занимает больше времени, чем настроено max.poll.interval.ms интервал для обнаружения сбоя.

session.timeout.ms и heartbeat.interval.ms доступны в этом случае, чтобы как можно раньше обнаружить полный сбой.

Для получения более подробной информации об этих параметрах:

Пожалуйста, обратите внимание, что значения, настроенные для session.timeout.ms должно находиться в допустимом диапазоне, заданном в конфигурации брокера по свойствам

  • group.min.session.timeout.ms
  • group.max.session.timeout.ms

В противном случае при запуске клиента-потребителя возникнет следующее исключение.

Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)

Обновление: Чтобы избежать повторной обработки сообщений

В классе KafkaConsumer есть еще один метод commitAsync() чтобы запустить операцию смещения фиксации.

ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();

Для получения более подробной информации о commitSync() и commitAsync(), пожалуйста, проверьте этот поток

Фиксация смещения вручную - это действие, означающее, что смещение было обработано, чтобы Кафка больше не отправлял зафиксированные записи для одного и того же раздела. Когда смещения фиксируются вручную, важно отметить, что если потребитель по какой-либо причине умирает до обработки записей, есть вероятность, что эти записи не будут обработаны снова.

2021-11-25 07:04:25

Спасибо, все ясно. Есть ли какие-либо способы избежать повторного обращения?
J.J. Beam

@J. J. Beam обновил ответ со ссылками и образцом
arunkvelu

На других языках

Эта страница на других языках

Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................