Je souhaite recevoir une notification par e-mail lorsqu'un seul opérateur de flux d'air échoue. J'en ai besoin car l'échec de certaines tâches ne doit pas définir l'ensemble du pipeline comme ayant échoué.

Pour simuler l'erreur, j'ai défini un compartiment source comme compartiment non existant.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

__author__ = "Stefano Giostra"
__credits__ = "Stefano Giostra"
__maintainer__ = "Stefano Giostra"
__version__ = "0.9.3"
__status__ = "Dev"


from airflow.models import Variable, DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
# from lib.bb_utils import *
import logging
from airflow.utils import dates
from datetime import timedelta
from functools import partial
from lib.bb_utils import load_json_file
from airflow.utils.email import send_email


ROOT_PATH = '/home/airflow/gcs/dags'
logger = logging.getLogger("dag_demo_2")


def notify_email(context, config):        # **kwargs
    """Send custom email alerts."""

    alerting_email_address = config.get('email_address')
    print("---> notify_email -------------------")
    print(context)
    print(f"-->{alerting_email_address}")
    print("<------------------------------------")
    # print(context['dag'])
    # email title.
    # title = "Airflow alert: {task_name} Failed".format(context)
    #
    # # email contents
    # body = """
    # Hi, <br><br>
    # There's been an error in the {task_name} job.<br>
    # <br>
    # Forever yours,<br>
    # Airflow bot <br>
    # """.format(**contextDict)
    # for dest in dest_email:
    #     send_email(dest, title, body)


# ----------------------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------
# Dizionario dati con le chiavi richieste dai DAG di AirFlow
my_email = 'abc@xyz.com'
default_args = {
    "owner": 'SG',
    "depends_on_past": False,
    "start_date": dates.days_ago(1),
    "end_date": None,
    "email_on_failure": 'my_email',
    "email_on_retry": False,
    "email": [my_email],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "max_active_runs": 1,
    "on_failure_callback": partial(notify_email, config={'email_address': my_email})
}

dag_name = 'SG-DagDemo-Once'
with DAG(dag_id=dag_name, default_args=default_args, schedule_interval="@once") as ldag:
    project = Variable.get("PROJECT")
    source_bucket = 'sg-dev'
    source_object = 'covid19_italy/national_trends_2.csv'
    bq_dataset = "covid19_italy"
    bq_table_name = "national_trends"
    bq_task_id = f'gcs_to_bq_load_{bq_table_name}'
    schema_fields = load_json_file(f"{ROOT_PATH}/source/{bq_dataset}/{bq_table_name}_tabschema.json")

    t = GoogleCloudStorageToBigQueryOperator(
        dag=ldag,
        task_id=bq_task_id,
        bucket=source_bucket,
        source_objects=[source_object],
        destination_project_dataset_table="{0}.{1}.{2}".format(project, bq_dataset, bq_table_name),
        schema_fields=schema_fields,
        source_format='CSV',
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE"
    )
0
Stefano G. 9 nov. 2020 à 20:38

1 réponse

Meilleure réponse

Pour invoquer notify_email() en cas d'échec, il suffira de régler default_args avec :

"on_failure_callback": notify_email

Alors default_args doit être inclus dans la phrase de création du DAG :

with DAG(dag_id='SG-DagDemo-Once', default_args=default_args) as dag:

Vous pouvez essayer quelque chose comme ce qui suit pour appeler la fonction notify_email() en cas d'échec de l'opérateur ; chaque opérateur appellera la même fonction (exemple tiré de gcs_to_bq ) :

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    'on_failure_callback': notify_email
}

dag_name = 'SG-DagDemo-Once'
with DAG(dag_id=dag_name, default_args=args, schedule_interval=None) as dag:
    create_test_dataset = bash_operator.BashOperator(
        task_id='create_airflow_test_dataset',
        bash_command='bq mk airflow_test')

    # [START howto_operator_gcs_to_bq]
    load_csv = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_example',
        bucket='cloud-samples-data',
        source_objects=['bigquery/us-states/us-states.csv'],
        destination_project_dataset_table='airflow_test.gcs_to_bq_table',
        schema_fields=[
            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
        ],
        write_disposition='WRITE_TRUNCATE')
    # [END howto_operator_gcs_to_bq]

    delete_test_dataset = bash_operator.BashOperator(
        task_id='delete_airflow_test_dataset',
        bash_command='bq rm -r -f -d airflow_test')

    create_test_dataset >> load_csv >> delete_test_dataset

Vous pouvez simuler une erreur en modifiant un élément de configuration sur chaque opérateur. Et vous devrez terminer la configuration pour l'envoi de l'e-mail en notify_email().

1
rsantiago 10 nov. 2020 à 00:18