Если вы ищете задачу воздушного потока, чтобы начать с ввода кадра данных, то вы используете ее неправильно. Если вы хотите выполнить свой скрипт как единое целое, вы можете использовать PythonOperator
или BashOperator
однако, если вы хотите разбить код на несколько задач, вам, вероятно, потребуется выполнить некоторый рефакторинг.
Чтобы создать BigQuery
внешняя таблица из csv
на GCS
вы можете установить external_table
в GCSToBigQueryOperator
как:
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
load_csv = GCSToBigQueryOperator(
task_id='gcs_to_bigquery_example',
bucket='cloud-samples-data',
source_objects=['bigquery/us-states/us-states.csv'],
destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
schema_fields=[
{'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
],
write_disposition='WRITE_TRUNCATE',
external_table=True,
)
Я не знаю, какова функциональность фреймов данных в вашем рабочем процессе (я предполагаю, что это должно выполнить какое-то преобразование на csv
) для этого вы можете использовать GCSFileTransformOperator
(см. Исходный код). Этот оператор копирует данные из исходного местоположения GCS во временное местоположение в локальной файловой системе. Выполняет преобразование этого файла, как указано в
сценарии преобразования, и загружает выходные данные в целевую корзину. Если поле вывода не указано, исходный файл будет перезаписан.
Таким образом, возможно, что ваш рабочий процесс может быть:
- Подать заявку на землю в GCS
- Бежать
GCSFileTransformOperator
для обработки и очистки записей.
- Создайте таблицу в BigQuery с помощью
GCSToBigQueryOperator