Here is a Python version of getting the ECR token for an AWS ECR repository.
There is nothing to install and everything runs smoothly from inside the Airflow Docker containers.
To make the script reusable, you need to create an Airflow Variable called "aws_region_name" and set it to the correct region, for example "eu-central-1" (a small snippet for creating it follows below).
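As a minimal sketch (assuming the Airflow metadata database is reachable from where you run it), the Variable can be created programmatically; the same thing can be done from the UI under Admin -> Variables, or with the CLI command `airflow variables set aws_region_name eu-central-1`:

# Minimal sketch: create the "aws_region_name" Variable used by the DAG below.
from airflow.models import Variable

Variable.set("aws_region_name", "eu-central-1")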
"""
You need to create a variable called "aws_region_name" and set it to the correct region, for example "eu-central-1"
"""
from datetime import datetime, timedelta
import json
from datetime import datetime
from airflow.decorators import dag, task
from airflow import settings
from airflow.models import Connection
#Default settings applied to all tasks
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1)
}
@dag(default_args=default_args, schedule_interval='* */10 * * *', max_active_runs=1, start_date=datetime(2021, 1, 1), catchup=False, tags=['airflow'])
def refresh_docker_token_DAG():
@task(multiple_outputs=True)
def extract():
import boto3
from airflow.models import Variable
aws_region_name = Variable.get("aws_region_name")
ecr = boto3.client('ecr', region_name=aws_region_name)
response = ecr.get_authorization_token()
token = response['authorizationData'][0]['authorizationToken']
registry_url = response['authorizationData'][0]['proxyEndpoint']
return {"token": token, "registry_url": registry_url}
@task()
def set_token(token: str, registry_url: str):
import logging
import base64
logger = logging.getLogger(__name__)
connection_name = "docker_default"
conn_type = "docker"
host = registry_url
port = None
user = base64.b64decode(token).decode().split(":")[0]
password = base64.b64decode(token).decode().split(":")[1]
schema = ""
extra = ""
session = settings.Session
try:
connection_query = session.query(Connection).filter(Connection.conn_id == connection_name)
connection_query_result = connection_query.one_or_none()
if not connection_query_result:
connection = Connection(conn_id=connection_name, conn_type=conn_type, host=host, port=port,
login=user, password=password, schema=schema, extra=extra)
session.add(connection)
session.commit()
else:
connection_query_result.host = host
connection_query_result.login = user
connection_query_result.schema = schema
connection_query_result.port = port
connection_query_result.extra = extra
connection_query_result.set_password(password)
session.add(connection_query_result)
session.commit()
except Exception as e:
logger.info("Failed creating connection")
logger.info(e)
data = extract()
set_token(data["token"], data["registry_url"])
refresh_docker_token_dag = refresh_docker_token_DAG()
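Once this DAG has run at least once, other DAGs can pull images from the private registry through the refreshed docker_default connection. Below is a minimal, hypothetical sketch using the DockerOperator from the apache-airflow-providers-docker package; the image URI and account ID are placeholders, not values from the article.

# Hypothetical example: run a container from the private ECR registry using
# the docker_default connection refreshed by the DAG above.
from datetime import datetime

from airflow.decorators import dag
from airflow.providers.docker.operators.docker import DockerOperator


@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['airflow'])
def use_private_ecr_image_DAG():
    DockerOperator(
        task_id='run_private_image',
        # Placeholder image URI; replace with your own ECR repository and tag.
        image='123456789012.dkr.ecr.eu-central-1.amazonaws.com/my-image:latest',
        command='echo hello',
        docker_conn_id='docker_default',  # the connection kept fresh by refresh_docker_token_DAG
        auto_remove=True,
    )


use_private_ecr_image_dag = use_private_ecr_image_DAG()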
References
- The first article I read about this approach, which also includes an idea of how to clean up old images.
- Using the ECR Airflow plugin