
I want to create a task that will update column rows and send a mail for every line in a data table. At the moment I have created a task that downloads the data from the main table, but I cannot create tasks for every line in the temp data table. Could you tell me what I am doing wrong and how I can generate and run tasks in a loop?

from datetime import datetime, timedelta

import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

from airflow.contrib.operators.bigquery_get_data import BigQueryGetDataOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryValueCheckOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'cmap',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}


with DAG('dq_bigquery_test',
         max_active_runs=1,
         schedule_interval='@once',
         catchup=False,
         default_args=default_args) as dag:

    query = "SELECT * from `dbce-bi-prod-e6fd.dev_dataquality.data_logging_inc` where MailRequired = false"
    insert = "INSERT into dbce-bi-prod-e6fd.dev_dataquality.data_logging_inc (DataTimeStamp, Robot, Status) Values (CURRENT_TIMESTAMP(), 'TestRobot', 'Test')"

    my_bq_task = BigQueryOperator(
                    task_id='query_exc_on_teste',
                    sql=query,
                    write_disposition='WRITE_TRUNCATE',
                    create_disposition='CREATE_IF_NEEDED',
                    bigquery_conn_id='google_cloud_dbce_bi_prod',
                    use_legacy_sql=False,
                    destination_dataset_table='dev_dataquality.testTable')



    get_data = BigQueryGetDataOperator(
        task_id='get_data_from_query',
        project_id='dbce-bi-prod-e6fd',
        dataset_id='dev_dataquality',
        table_id='testTable',
        max_results='100',
        selected_fields='Robot,Status,MailRequired',
        bigquery_conn_id='google_cloud_dbce_bi_prod'
        )

    def process_data_from_bq(**kwargs):


        ti = kwargs['ti']
        update_column = []
        bq_data = ti.xcom_pull(task_ids='get_data_from_query')
        print(bq_data)
        # Now bq_data here would have your data in Python list
        for index, i in enumerate(bq_data):


            update_query = "UPDATE `dbce-bi-prod-e6fd.dev_dataquality.data_logging_inc` SET MailSent = True WHERE Robot = '{}'".format(i[0])

            print(update_query)
            update_column.append(BigQueryOperator(
                    task_id='update_column_{}'.format(index),
                    sql=update_query,
                    write_disposition='WRITE_EMPTY',
                    create_disposition='CREATE_IF_NEEDED',
                    bigquery_conn_id='google_cloud_dbce_bi_prod',
                    use_legacy_sql=False,
                    dag=dag
                    ))
            if index not in [0]:
                update_column[index-1] >> update_column[index]                    


    process_data = PythonOperator(
        task_id='process_data_from_bq',
        python_callable=process_data_from_bq,
        provide_context=True
        )



    my_bq_task >> get_data >> process_data

Thank you for your help!

  • At least 2 things are wrong: [1] the python_callable of your process_data task is itself generating tasks; it doesn't work that way (a task cannot generate tasks; tasks must be generated at top level, alongside DAG object creation). [2] generating tasks based on XCom is not possible (because XCom is materialized at DAG runtime, not at DAG generation / parsing): link1, link2. A sketch of the working approach follows these comments. Commented May 11, 2020 at 3:27
  • Okay, so I should create the task outside of the process_data task; that is clear to me. How can I parse the data from that task and use the list when creating tasks outside of process_data? Commented May 11, 2020 at 9:07
  • Are you running your code inside Cloud Composer? Commented May 11, 2020 at 14:31
  • Yes, I run every DAG via Cloud Composer. Commented May 11, 2020 at 15:31
  • @Kagemar do understand that "you can't generate downstream tasks based on the output of an upstream task" (there's a catch). Read these for clarification: link1 link2 link3. But if you are feeling adventurous, have a look at this [I strongly suggest you refrain from using this unless you've developed a fair bit of understanding of Airflow]. Commented May 11, 2020 at 16:52
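
Following up on the comments: since the operators created inside process_data_from_bq are never registered with the DAG, the usual workaround is to do the per-row work directly inside a single PythonOperator and let that one task loop over the XCom result. Below is a minimal sketch that would replace the process_data task inside the same with DAG(...) block. It assumes the Airflow 1.10 contrib BigQueryHook available on Cloud Composer (hook.get_conn().cursor().run_query(...)); adjust the method names to your Airflow version, and the mail step is only indicated as a comment.

from airflow.contrib.hooks.bigquery_hook import BigQueryHook

def update_rows_from_bq(**kwargs):
    ti = kwargs['ti']
    # List of rows returned by BigQueryGetDataOperator, e.g. [['TestRobot', 'Test', 'false'], ...]
    bq_data = ti.xcom_pull(task_ids='get_data_from_query') or []

    # One hook and cursor for all rows; use_legacy_sql=False matches the DML syntax used above.
    hook = BigQueryHook(bigquery_conn_id='google_cloud_dbce_bi_prod',
                        use_legacy_sql=False)
    cursor = hook.get_conn().cursor()

    for row in bq_data:
        robot = row[0]  # first selected field is Robot
        update_query = (
            "UPDATE `dbce-bi-prod-e6fd.dev_dataquality.data_logging_inc` "
            "SET MailSent = True WHERE Robot = '{}'".format(robot))
        cursor.run_query(sql=update_query)
        # Send the per-row notification mail here, e.g. with
        # airflow.utils.email.send_email(...) (left out of this sketch).

update_rows = PythonOperator(
    task_id='update_rows_from_bq',
    python_callable=update_rows_from_bq,
    provide_context=True)

my_bq_task >> get_data >> update_rows

If one Airflow task per row is really required, those tasks have to be created at module level when the DAG file is parsed, so the row list must be available at parse time (for example from an Airflow Variable or a static configuration), not from XCom.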
