Как передавать кадры данных во временные таблицы для задач воздушного потока

0

Вопрос

Итак, у меня есть фрагмент кода, который имеет несколько функций на python для рабочего потока, который мы разработали. Этот рабочий поток принимает CSV-файл и пропускает его через фрейм данных. Этот фрейм данных затем передается через несколько функций, которые применяют различные преобразования к фрейму данных.

Однако при написании этого кода в среде airflow из-за того, как фреймы данных работают в виртуальной среде , и из-за того, что данные выполняются на нескольких машинах, я не смогу пропускать свои фреймы данных через каждую функцию и мне придется где-то их хранить?

Кто-нибудь знает, как настроить временную таблицу в bigquery для прохождения через фрейм данных для каждой из моих функций, чтобы я мог запускать ETL для них всех, используя свои задачи воздушного потока?

airflow google-bigquery python
2021-11-21 14:39:06
1

Лучший ответ

1

Если вы ищете задачу воздушного потока, чтобы начать с ввода кадра данных, то вы используете ее неправильно. Если вы хотите выполнить свой скрипт как единое целое, вы можете использовать PythonOperator или BashOperator однако, если вы хотите разбить код на несколько задач, вам, вероятно, потребуется выполнить некоторый рефакторинг.

Чтобы создать BigQuery внешняя таблица из csv на GCS вы можете установить external_table в GCSToBigQueryOperator как:

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
load_csv = GCSToBigQueryOperator(
        task_id='gcs_to_bigquery_example',
        bucket='cloud-samples-data',
        source_objects=['bigquery/us-states/us-states.csv'],
        destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
        schema_fields=[
            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
        ],
        write_disposition='WRITE_TRUNCATE',
        external_table=True,
    )

Я не знаю, какова функциональность фреймов данных в вашем рабочем процессе (я предполагаю, что это должно выполнить какое-то преобразование на csv) для этого вы можете использовать GCSFileTransformOperator (см. Исходный код). Этот оператор копирует данные из исходного местоположения GCS во временное местоположение в локальной файловой системе. Выполняет преобразование этого файла, как указано в сценарии преобразования, и загружает выходные данные в целевую корзину. Если поле вывода не указано, исходный файл будет перезаписан.

Таким образом, возможно, что ваш рабочий процесс может быть:

  1. Подать заявку на землю в GCS
  2. Бежать GCSFileTransformOperator для обработки и очистки записей.
  3. Создайте таблицу в BigQuery с помощью GCSToBigQueryOperator
2021-11-21 15:14:56

Таким образом, это имеет смысл, но будет ли это лучшим решением, как в случае преобразования csv, при каждой функции удаляются столбцы, изменяются имена столбцов, добавляются и удаляются столбцы и т.д. А добавление в таблицу фреймов данных с параметрами столбцов может привести к проблемам при загрузке в большой запрос, поскольку схема всегда находится в движении.
Mizanur Choudhury

@MizanurChoudhury, это зависит от вашего конкретного ETL. Вы также можете очистить все, что сможете, прежде чем сбрасывать необработанный csv в GCS. Это в большей степени вопрос о том, какой компонент вы контролируете и где вы можете вносить изменения. Все это, однако, большие вопросы - это разработка целого процесса, который выходит за рамки вопроса о StackOverflow.
Elad

На других языках

Эта страница на других языках

Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................