I have BigQuery connectors all running, but I have some existing scripts in Docker containers I wish to schedule on Cloud Composer instead of App Engine Flexible.
I have the script below, which seems to follow the examples I can find:
import datetime
from airflow import DAG
from airflow import models
from airflow.operators.docker_operator import DockerOperator
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())
default_args = {
    # Setting start date as yesterday starts the DAG immediately
    'start_date': yesterday,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}
schedule_interval = '45 09 * * *'
dag = DAG('xxx-merge', default_args=default_args, schedule_interval=schedule_interval)
hfan = DockerOperator(
    task_id='hfan',
    image='gcr.io/yyyyy/xxxx',
    dag=dag  # attach the task to the DAG defined above
)
...but when I try to run it, the web UI tells me:
Broken DAG: [/home/airflow/gcs/dags/xxxx.py] No module named docker
Is it perhaps that Docker is not configured to work inside the Kubernetes cluster that Cloud Composer runs on? Or am I just missing something in the syntax?
As noted in tobi6's answer, you need to have the PyPI package for docker installed in your Composer environment. There are instructions here for installing PyPI packages in your environment at a particular package version.
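For reference, the command-line route looks roughly like this, assuming the docker-py==1.10.6 pin that ended up working further down; the environment name and location are placeholders for your own setup:
gcloud composer environments update my-composer-env \
    --location us-central1 \
    --update-pypi-package=docker-py==1.10.6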
This means: wherever your Airflow instance is installed, the Python package named docker is missing. If I were configuring my personal machine, I could install missing packages with pip (e.g. pip install docker); on Cloud Composer the package has to be added through the environment's PyPI packages configuration instead.
EDIT
Within the source code of the docker operator (https://airflow.incubator.apache.org/_modules/airflow/operators/docker_operator.html) there is an import statement:
from docker import Client, tls
So the new error
cannot import name Client
seems to me to be connected to a broken install or a wrong version of the docker package.

I got it resolved by installing docker-py==1.10.6 in the PyPI section of Composer.
However, getting DockerOperator to work properly requires a bit more effort, because the Composer workers do not have access to the Docker daemon. Head to the GCP console and perform the following steps after getting cluster credentials (a sketch of that command is shown just below).
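Fetching the credentials looks roughly like this; the cluster name, zone, and project are placeholders for your Composer environment's GKE cluster:
gcloud container clusters get-credentials my-composer-gke-cluster \
    --zone us-central1-a \
    --project my-project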
Export current deployment config to file
kubectl get deployment airflow-worker -o yaml --export > airflow-worker-config.yaml
Edit airflow-worker-config.yaml (example link) to mount docker.sock and the docker binary, and grant privileged access to airflow-worker so it can run docker commands; a sketch of the relevant section follows these steps
Apply deployment settings
kubectl apply -f airflow-worker-config.yaml
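As mentioned in the second step, the edit to airflow-worker-config.yaml looks roughly like the excerpt below. This is an illustrative sketch of the pod spec inside the exported deployment; the volume names and the docker binary path are assumptions, not a verified drop-in:
containers:
- name: airflow-worker
  securityContext:
    privileged: true              # let the worker talk to the node's Docker daemon
  volumeMounts:
  - name: docker-sock
    mountPath: /var/run/docker.sock
  - name: docker-bin
    mountPath: /usr/bin/docker
volumes:
- name: docker-sock
  hostPath:
    path: /var/run/docker.sock
- name: docker-bin
  hostPath:
    path: /usr/bin/docker
With those mounts and the privileged flag applied, DockerOperator tasks running on the workers can reach the node's Docker daemon through the mounted socket.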