I am trying to run a BigQueryOperator on GCC. I have already succeeded in running for BigQueryCreateEmptyTableOperator and BigQueryTableDeleteOperator.
Here is my code for the dag:
import datetime
import os
import logging
from airflow import configuration
from airflow import models
from airflow import DAG
from airflow.operators import email_operator
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators import bigquery_check_operator
from airflow.utils import trigger_rule
from contextlib import suppress
import json
from airflow.operators import python_operator
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
'email_on_failure': True,
'email_on_retry': True,
'project_id' : 'censored',
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
}
bq_dataset_name= 'test_tf_blocket'
bq_githib_table_id = bq_dataset_name + '.trialtable'
# [START composer_quickstart_schedule]
with models.DAG(
dag_id='composer_nicholas',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_quickstart_schedule]
def greeting():
logging.info('Hello World!')
hello_python = python_operator.PythonOperator(
task_id='hello',
python_callable=greeting)
bq_union_query = bigquery_operator.BigQueryOperator(
task_id='bq_union_query',
bql="""
select * from test_tf_blocket.nicholas_union_query;
""",
query_params={})
email_start = email_operator.EmailOperator(
task_id='email_it',
to='nicholas@censored.my',
subject='Sample temail',
html_content="""
Done.
""")
hello_python >> bq_union_query >> email_start
The dag fails when it hits the bigqueryOperator with the error(log) :
*** Reading remote log from gs://asia-south1-staging-b017f2bf-bucket/logs/composer_nicholas/bq_union_query/2019-03-21T14:56:45.453098+00:00/30.log.
[2019-03-22 13:12:54,129] {models.py:1361} INFO - Dependencies all met for <TaskInstance: composer_nicholas.bq_union_query 2019-03-21T14:56:45.453098+00:00 [queued]>
[2019-03-22 13:12:54,167] {models.py:1361} INFO - Dependencies all met for <TaskInstance: composer_nicholas.bq_union_query 2019-03-21T14:56:45.453098+00:00 [queued]>
[2019-03-22 13:12:54,168] {models.py:1573} INFO -
-------------------------------------------------------------------------------
Starting attempt 30 of 3
-------------------------------------------------------------------------------
[2019-03-22 13:12:54,199] {models.py:1595} INFO - Executing <Task(BigQueryOperator): bq_union_query> on 2019-03-21T14:56:45.453098+00:00
[2019-03-22 13:12:54,200] {base_task_runner.py:118} INFO - Running: ['bash', '-c', 'airflow run composer_nicholas bq_union_query 2019-03-21T14:56:45.453098+00:00 --job_id 571 --raw -sd DAGS_FOLDER/nicholas_union_query.py --cfg_path /tmp/tmpn1ic1w_6']
[2019-03-22 13:13:06,400] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:06,400] {settings.py:176} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2019-03-22 13:13:08,433] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:08,431] {default_celery.py:80} WARNING - You have configured a result_backend of redis://airflow-redis-service:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2019-03-22 13:13:08,435] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:08,435] {__init__.py:51} INFO - Using executor CeleryExecutor
[2019-03-22 13:13:09,182] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,181] {app.py:51} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2019-03-22 13:13:09,198] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,198] {configuration.py:516} INFO - Reading the config from /etc/airflow/airflow.cfg
[2019-03-22 13:13:09,210] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,210] {configuration.py:516} INFO - Reading the config from /etc/airflow/airflow.cfg
[2019-03-22 13:13:09,873] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,873] {models.py:271} INFO - Filling up the DagBag from /home/airflow/gcs/dags/nicholas_union_query.py
[2019-03-22 13:13:12,207] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query /usr/local/lib/airflow/airflow/models.py:2412: PendingDeprecationWarning: Invalid arguments were passed to BigQueryOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
[2019-03-22 13:13:12,208] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query *args: ()
[2019-03-22 13:13:12,208] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query **kwargs: {'api_resource_config': {'useQueryCache': True, 'jobType': 'QUERY'}}
[2019-03-22 13:13:12,208] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query category=PendingDeprecationWarning
[2019-03-22 13:13:12,209] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query /usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py:151: DeprecationWarning: Deprecated parameter `bql` used in Task id: bq_union_query. Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow.
[2019-03-22 13:13:12,210] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query category=DeprecationWarning)
[2019-03-22 13:13:16,838] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:16,838] {cli.py:484} INFO - Running <TaskInstance: composer_nicholas.bq_union_query 2019-03-21T14:56:45.453098+00:00 [running]> on host airflow-worker-7c9b9c7f86-xwhg5
[2019-03-22 13:13:17,455] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:17,453] {bigquery_operator.py:159} INFO - Executing:
[2019-03-22 13:13:17,457] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query select * from test_tf_blocket.nicholas_union_query;
[2019-03-22 13:13:17,457] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:17,632] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:17,632] {gcp_api_base_hook.py:92} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2019-03-22 13:13:17,657] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:17,656] {discovery.py:272} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2019-03-22 13:13:18,336] {logging_mixin.py:95} WARNING - /usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py:559: DeprecationWarning: Deprecated parameter `bql` used in `BigQueryBaseCursor.run_query` Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow
category=DeprecationWarning)
[2019-03-22 13:13:18,338] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:18,336] {logging_mixin.py:95} WARNING - /usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py:559: DeprecationWarning: Deprecated parameter `bql` used in `BigQueryBaseCursor.run_query` Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow.
[2019-03-22 13:13:18,338] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query category=DeprecationWarning)
[2019-03-22 13:13:18,360] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:18,359] {discovery.py:873} INFO - URL being requested: POST https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs?alt=json
[2019-03-22 13:13:18,885] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:18,884] {discovery.py:873} INFO - URL being requested: GET https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs/job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS?alt=json
[2019-03-22 13:13:20,341] {models.py:1760} ERROR - ('BigQuery job status check failed. Final error was: %s', 404)
Traceback (most recent call last)
File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1014, in run_with_configuratio
jobId=self.running_job_id).execute(
File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrappe
return wrapped(*args, **kwargs
File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 851, in execut
raise HttpError(resp, content, uri=self.uri
googleapiclient.errors.HttpError: <HttpError 404 when requesting https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs/job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS?alt=json returned "Not found: Job censored-analytics-censored:job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS"
During handling of the above exception, another exception occurred
Traceback (most recent call last)
File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_tas
result = task_copy.execute(context=context
File "/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line 180, in execut
time_partitioning=self.time_partitionin
File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 651, in run_quer
return self.run_with_configuration(configuration
File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1036, in run_with_configuratio
err.resp.status
Exception: ('BigQuery job status check failed. Final error was: %s', 404
[2019-03-22 13:13:20,347] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:20,341] {models.py:1760} ERROR - ('BigQuery job status check failed. Final error was: %s', 404)
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Traceback (most recent call last):
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1014, in run_with_configuration
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query jobId=self.running_job_id).execute()
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query return wrapped(*args, **kwargs)
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 851, in execute
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query raise HttpError(resp, content, uri=self.uri)
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query googleapiclient.errors.HttpError: <HttpError 404 when requesting https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs/job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS?alt=json returned "Not found: Job censored-analytics-censored:job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS">
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:20,349] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query During handling of the above exception, another exception occurred:
[2019-03-22 13:13:20,349] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:20,349] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Traceback (most recent call last):
[2019-03-22 13:13:20,349] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query result = task_copy.execute(context=context)
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line 180, in execute
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query time_partitioning=self.time_partitioning
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 651, in run_query
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query return self.run_with_configuration(configuration)
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1036, in run_with_configuration
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query err.resp.status)
[2019-03-22 13:13:20,351] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Exception: ('BigQuery job status check failed. Final error was: %s', 404)
[2019-03-22 13:13:20,352] {models.py:1783} INFO - Marking task as UP_FOR_RETRY
[2019-03-22 13:13:20,352] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:20,352] {models.py:1783} INFO - Marking task as UP_FOR_RETRY
[2019-03-22 13:13:20,400] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Traceback (most recent call last):
[2019-03-22 13:13:20,400] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1014, in run_with_configuration
[2019-03-22 13:13:20,403] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query jobId=self.running_job_id).execute()
[2019-03-22 13:13:20,405] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
[2019-03-22 13:13:20,406] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query return wrapped(*args, **kwargs)
[2019-03-22 13:13:20,407] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 851, in execute
[2019-03-22 13:13:20,408] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query raise HttpError(resp, content, uri=self.uri)
[2019-03-22 13:13:20,409] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query googleapiclient.errors.HttpError: <HttpError 404 when requesting https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs/job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS?alt=json returned "Not found: Job censored-analytics-censored:job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS">
[2019-03-22 13:13:20,409] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:20,410] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query During handling of the above exception, another exception occurred:
[2019-03-22 13:13:20,411] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:20,411] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Traceback (most recent call last):
[2019-03-22 13:13:20,411] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/bin/airflow", line 7, in <module>
[2019-03-22 13:13:20,412] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query exec(compile(f.read(), __file__, 'exec'))
[2019-03-22 13:13:20,412] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/bin/airflow", line 32, in <module>
[2019-03-22 13:13:20,413] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query args.func(args)
[2019-03-22 13:13:20,414] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/utils/cli.py", line 74, in wrapper
[2019-03-22 13:13:20,414] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query return f(*args, **kwargs)
[2019-03-22 13:13:20,415] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/bin/cli.py", line 490, in run
[2019-03-22 13:13:20,416] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query _run(args, dag, ti)
[2019-03-22 13:13:20,416] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/bin/cli.py", line 406, in _run
[2019-03-22 13:13:20,417] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query pool=args.pool,
[2019-03-22 13:13:20,418] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
[2019-03-22 13:13:20,420] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query return func(*args, **kwargs)
[2019-03-22 13:13:20,421] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
[2019-03-22 13:13:20,421] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query result = task_copy.execute(context=context)
[2019-03-22 13:13:20,421] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line 180, in execute
[2019-03-22 13:13:20,422] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query time_partitioning=self.time_partitioning
[2019-03-22 13:13:20,422] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 651, in run_query
[2019-03-22 13:13:20,425] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query return self.run_with_configuration(configuration)
[2019-03-22 13:13:20,425] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1036, in run_with_configuration
[2019-03-22 13:13:20,427] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query err.resp.status)
[2019-03-22 13:13:20,427] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Exception: ('BigQuery job status check failed. Final error was: %s', 404)
If I had typed a different sql query, for eg ( delete table ), the query would work. I am doing a select query here for simplicity. Point is, the sql query in here works, but the dag fails. It would seem that the dag has failed to retrieve the query history/job history from BQ. I checked if the json file exist, and yes it did. Heres a screen shot
BQ SS
Initially I thought this was a permission issue, but i checked and the cloud composer generated service account has project owner rights and BQ admin rights. I've tried searching around but cannot seem to find an answer.
Any help is appreciated.