I am trying to transform gVCF/VCF files in Google Cloud Storage into BigQuery using the gcp-variant-transforms API. When I run the code below, I get the following error:

[2018-06-06 16:46:42,589] {models.py:1428} INFO - Executing on 2018-06-06 21:46:34.252526
[2018-06-06 16:46:42,589] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run GcsToBigQuery gcsToBigquery_ID 2018-06-06T21:46:34.252526 --job_id 168 --raw -sd DAGS_FOLDER/GcsToBigQuery.py']
[2018-06-06 16:46:43,204] {base_task_runner.py:98} INFO - Subtask: [2018-06-06 16:46:43,202] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-06-06 16:46:43,284] {base_task_runner.py:98} INFO - Subtask: [2018-06-06 16:46:43,283] {models.py:189} INFO - Filling up the DagBag from /apps/airflow/dags/GcsToBigQuery.py
[2018-06-06 16:46:43,853] {base_task_runner.py:98} INFO - Subtask: [2018-06-06 16:46:43,852] {gcp_dataflow_hook.py:111} INFO - Start waiting for DataFlow process to complete.
[2018-06-06 16:46:46,931] {base_task_runner.py:98} INFO - Subtask: [2018-06-06 16:46:46,930] {GcsToBigQuery.py:48} ERROR - Status : FAIL : gcsToBigquery: Not able to run: DataFlow failed with return code 1
[2018-06-06 16:46:46,931] {base_task_runner.py:98} INFO - Subtask: [2018-06-06 16:46:46,930] {python_operator.py:90} INFO - Done. Returned value was: None

My code is below. Please help me with this issue. Thanks!

from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
from airflow.operators.python_operator import PythonOperator
import logging

default_args = {
    'owner': 'My Name',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 6),
    'email': ['MY Email'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('GcsToBigQuery', default_args=default_args,
          description='To move GVCF/VCF files from Google Cloud Storage to Big Query',
          schedule_interval='@once',
          start_date=datetime(2018, 6, 6))

dataflow_py_file = 'gcp_variant_transforms.vcf_to_bq'
PY_OPTIONS = ['-m']

DATAFLOW_OPTIONS_PY = {
    "project": "project-Name",
    "input_pattern": "gs://test-gvcf/1000-genomes.vcf",
    "output_table": "trc-mayo-projectsample:1000genomicsID.1000_genomesSamp",
    "staging_location": "gs://test-gvcf/vcftobq/staging",
    "temp_location": "gs://test-gvcf/vcftobq/temp",
    "job_name": "dataflowstarter25",
    # "setup_file": "./setup.py",
    "runner": "DataflowRunner"
}


def gcsToBigquery():
    try:
        dataflowHook = DataFlowHook(gcp_conn_id='google_cloud_platform_id')
        dataflowHook.start_python_dataflow(task_id='dataflowStarter2_ID',
                                           variables=DATAFLOW_OPTIONS_PY,
                                           dataflow=dataflow_py_file,
                                           py_options=PY_OPTIONS)
    except Exception as e:
        # Note: catching the exception here keeps the Airflow task itself from
        # failing, which is why the log ends with "Done. Returned value was: None".
        logging.error("Status : FAIL : gcsToBigquery: Not able to run: " + str(e))

gcsToBigquery_task = PythonOperator(task_id='gcsToBigquery_ID',
                                    python_callable=gcsToBigquery,
                                    dag=dag)

1 Answer
This issue can be avoided by using the DataFlowPythonOperator and installing the gcp-variant-transforms package on the cloud instance.

Command to install the required components:

sudo pip install git+https://github.com/googlegenomics/gcp-variant-transforms.git

In case someone else has this issue, you can take a look at this post, which gives a detailed explanation of the steps that srikanth followed to execute the code successfully; a sketch of what the resulting DAG might look like follows below.
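
For reference, here is a minimal sketch of what the DAG might look like with DataFlowPythonOperator. This is an assumption based on the Airflow 1.x contrib API (the import path and the py_file/py_options/options parameters), reusing the project, bucket, and table names from the question; verify the operator signature against your Airflow version.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

default_args = {
    'owner': 'My Name',
    'start_date': datetime(2018, 6, 6),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    # Pipeline options shared by every Dataflow task in this DAG.
    'dataflow_default_options': {
        'project': 'project-Name',
        'staging_location': 'gs://test-gvcf/vcftobq/staging',
        'temp_location': 'gs://test-gvcf/vcftobq/temp',
    }
}

dag = DAG('GcsToBigQuery', default_args=default_args,
          schedule_interval='@once')

# py_options=['-m'] makes the hook run `python -m gcp_variant_transforms.vcf_to_bq`,
# so the package installed by the pip command above must be importable on the
# Airflow worker. The task_id is kept lowercase here because the hook derives
# the Dataflow job name from it, and Dataflow job names must be lowercase.
vcf_to_bq_task = DataFlowPythonOperator(
    task_id='gcstobigquery',
    py_file='gcp_variant_transforms.vcf_to_bq',
    py_options=['-m'],
    options={
        'input_pattern': 'gs://test-gvcf/1000-genomes.vcf',
        'output_table': 'trc-mayo-projectsample:1000genomicsID.1000_genomesSamp',
        'runner': 'DataflowRunner',
    },
    gcp_conn_id='google_cloud_platform_id',
    dag=dag)

Unlike the try/except in the question, the operator lets a Dataflow failure propagate, so a failed pipeline marks the Airflow task as failed and triggers the configured retries and failure emails.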
