Here is a Python version of fetching the ECR authorization token for an AWS repository.

There is nothing to install, and everything runs smoothly from within the Airflow Docker containers.

To make the script reusable, you need to create an Airflow Variable called “aws_region_name” and set it to the correct region, for example “eu-central-1”.

"""
You need to create an Airflow Variable called "aws_region_name" and set it to the correct region, for example "eu-central-1".
"""
from datetime import datetime, timedelta

import json
from datetime import datetime

from airflow.decorators import dag, task

from airflow import settings
from airflow.models import Connection

# Baseline arguments handed to the DAG via ``default_args`` below.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=0,
    retry_delay=timedelta(minutes=1),
)

# Cron "0 */10 * * *" fires once at minute 0 of hours 0, 10 and 20. The previous
# "* */10 * * *" fired every minute during those hours. ECR authorization
# tokens are valid for 12 hours (per the AWS ECR documentation), so this
# cadence keeps the stored credentials fresh.
@dag(default_args=default_args, schedule_interval='0 */10 * * *', max_active_runs=1, start_date=datetime(2021, 1, 1), catchup=False, tags=['airflow'])
def refresh_docker_token_DAG():
    """Refresh the ``docker_default`` Airflow connection with a new ECR token.

    The DAG has two tasks: ``extract`` requests an authorization token from
    ECR for the region stored in the ``aws_region_name`` Airflow Variable, and
    ``set_token`` writes the decoded credentials into the ``docker_default``
    connection, creating it if it does not exist yet.
    """

    @task(multiple_outputs=True)
    def extract():
        """Request an ECR authorization token.

        Returns:
            A dict with ``token`` (the base64-encoded authorization token) and
            ``registry_url`` (the registry proxy endpoint).
        """
        # Imported lazily so that parsing the DAG file does not require boto3.
        import boto3
        from airflow.models import Variable

        aws_region_name = Variable.get("aws_region_name")
        ecr = boto3.client('ecr', region_name=aws_region_name)
        response = ecr.get_authorization_token()
        auth_data = response['authorizationData'][0]

        # NOTE(review): the token is handed to the next task through the
        # task's return value; confirm that storing it there is acceptable.
        return {
            "token": auth_data['authorizationToken'],
            "registry_url": auth_data['proxyEndpoint'],
        }

    @task()
    def set_token(token: str, registry_url: str):
        """Create or update the ``docker_default`` connection.

        Args:
            token: Base64-encoded ``<user>:<password>`` string from ECR.
            registry_url: Registry endpoint stored as the connection host.

        Raises:
            ValueError: If the decoded token has no ``:`` separator.
            Exception: Any database error is logged and re-raised so the task
                is marked failed instead of silently succeeding.
        """
        import base64
        import logging

        logger = logging.getLogger(__name__)

        connection_name = "docker_default"
        conn_type = "docker"
        host = registry_url
        port = None
        schema = ""
        extra = ""

        # Decode once and split on the first colon only, so a password that
        # itself contains ":" is kept intact.
        user, separator, password = base64.b64decode(token).decode().partition(":")
        if not separator:
            # Deliberately does not echo the token contents.
            raise ValueError("ECR authorization token is not in '<user>:<password>' form")

        session = settings.Session()
        try:
            connection = (
                session.query(Connection)
                .filter(Connection.conn_id == connection_name)
                .one_or_none()
            )
            if connection is None:
                connection = Connection(conn_id=connection_name, conn_type=conn_type, host=host, port=port,
                                        login=user, password=password, schema=schema, extra=extra)
            else:
                connection.host = host
                connection.login = user
                connection.schema = schema
                connection.port = port
                connection.extra = extra
                connection.set_password(password)
            session.add(connection)
            session.commit()
        except Exception:
            # Undo the partial transaction and surface the failure: a stale
            # token must not be reported as a successful refresh.
            session.rollback()
            logger.exception("Failed creating connection")
            raise
        finally:
            session.close()

    data = extract()
    set_token(data["token"], data["registry_url"])

# Call the @dag-decorated function when this module is imported and keep the
# result in a module-level name.
refresh_docker_token_dag = refresh_docker_token_DAG()



References