В его строке документа, elasticsearch.helpers.async_bulk
описывает себя как
Помощник для :метамфетамина:
~elasticsearch.AsyncElasticsearch.bulk
api, который обеспечивает более удобный для человека интерфейс - он использует итератор действий и отправляет их в elasticsearch по частям. Источник
Контекст
Я использую AsyncElasticsearch.bulk()
успешно отправить фреймы данных pandas в какой-либо экземпляр ES
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
yield (json.dumps(record, default=int))
async def send_to_elasticsearch(self, df: DataFrame):
logger.info(f"{self.stage_name} sending batch to elastic")
await self.elastic_client.bulk(self._rec_to_actions(df))
Вопрос
Однако, когда дело доходит до async_bulk
, я получаю index is missing
ошибки.
async def send_to_elasticsearch(self, df: DataFrame):
await async_bulk(self.elastic_client, self._rec_to_actions(df))
Пытался настроиться _rec_to_actions()
несколькими способами без особого эффекта.
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
record["index"] = self.index
yield (json.dumps(record, default=int))
Я думаю, главная проблема в том, что я не совсем уверен, что знаю, что такое действие в контексте elasticsearch. Это понятие присутствует повсюду в документации, но в исходном коде этой библиотеки нет четкой структуры данных (во всяком случае, я не смог ее найти).
Что именно является действием и как я должен настроить свой генератор для отправки данных df в self.index
?
Окружающая среда
- python = "3.9.5"
- elasticsearch = "7.14.1"