BigData

MWAA on EKS Fargate and DAGs

Kyle79 2021. 4. 30. 14:57

EKS, GKE, and plain Kubernetes differ slightly in their API structure, but none of them locks you into a single vendor.

 

 

 

patchandpray.com/post/installing-airflow-2-using-helm/

Patch and Pray: Installing Airflow 2.0 using Helm

 

 

https://betterprogramming.pub/serverless-kubernetes-cluster-on-aws-with-eks-on-fargate-a7545cf179be

Better Programming: Serverless Kubernetes Cluster on AWS with EKS on Fargate (why it is a game-changer and how to use it)

 

 

 

github.com/infinitelambda/apache_airflow_on_eks

GitHub: infinitelambda/apache_airflow_on_eks

 

 

 

infinitelambda.com/post/swiss-army-knife-airflow-on-eks/

Infinite Lambda: Apache Airflow on EKS, a Swiss Army Knife deployment (optimising for both flexibility and operating cost)

 

https://dev.to/aws/working-with-amazon-eks-and-amazon-managed-workflows-for-apache-airflow-v2-x-k12

dev.to: Working with Amazon EKS and Amazon Managed Workflows for Apache Airflow v2.x

 

 

** MWAA configuration and sample code

https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-eks-example.html

AWS docs: Using Amazon MWAA with Amazon EKS
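For reference, that guide is where the kube_config.yaml used by the DAGs below comes from. A minimal sketch of the two steps, with the region, cluster name, and bucket as placeholders (the --alias aws matches the cluster_context used in the DAGs):

aws eks update-kubeconfig --region us-west-2 --name mwaa-eks --kubeconfig kube_config.yaml --alias aws
aws s3 cp kube_config.yaml s3://<your-mwaa-bucket>/dags/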

 

https://docs.aws.amazon.com/mwaa/latest/userguide/airflow-versions.html#airflow-versions-installing-202

AWS docs: Apache Airflow versions on Amazon MWAA

 

** Airflow 2.0.2 KubernetesPodOperator

from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
   'owner': 'aws',
   'depends_on_past': False,
   'start_date': datetime(2019, 2, 20),
   'provide_context': True  # legacy Airflow 1.x setting; KubernetesPodOperator ignores it
}

dag = DAG(
   'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

# Use a kube_config stored in the S3 dags folder for now.
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
                       namespace="mwaa",
                       image="ubuntu:18.04",
                       cmds=["bash"],
                       arguments=["-c", "ls"],
                       labels={"foo": "bar"},
                       name="mwaa-pod-test",
                       task_id="pod-task",
                       get_logs=True,
                       dag=dag,
                       is_delete_operator_pod=False,  # keep finished pods around for inspection
                       config_file=kube_config_path,
                       in_cluster=False,              # the MWAA worker runs outside the EKS cluster
                       cluster_context='aws'          # alias set by `aws eks update-kubeconfig --alias aws`
                       )
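A few flags worth calling out: in_cluster=False together with config_file is what lets the MWAA worker, which runs outside the EKS cluster, authenticate against it, and cluster_context='aws' has to match the context alias baked into the kubeconfig. is_delete_operator_pod=False leaves completed pods in place, which is handy for debugging but means they must be cleaned up manually.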

 

 

** Airflow 1.10.12 KubernetesPodOperator

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator  # 1.10.x contrib path

default_args = {
   'owner': 'aws',
   'depends_on_past': False,
   'start_date': datetime(2019, 2, 20),
   'provide_context': True
}

dag = DAG(
   'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

# Use a kube_config stored in the S3 dags folder for now.
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
                       namespace="mwaa",
                       image="ubuntu:18.04",
                       cmds=["bash"],
                       arguments=["-c", "ls"],
                       labels={"foo": "bar"},
                       name="mwaa-pod-test",
                       task_id="pod-task",
                       get_logs=True,
                       dag=dag,
                       is_delete_operator_pod=False,
                       config_file=kube_config_path,
                       in_cluster=False,
                       cluster_context='aws'
                       )
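The only real difference from the 2.0.2 version above is the import path: Airflow 1.10.x ships the operator under airflow.contrib, while Airflow 2 moved it into the cncf.kubernetes provider package.

The fragments that follow come from a larger DAG file and omit their surrounding setup; unlike the MWAA examples above, they run with in_cluster=True, i.e. from an Airflow deployed inside the cluster itself. A minimal sketch of the context they assume (SlackTool and the *CommandTool classes are custom helpers from the original source and are not shown; the concrete values are illustrative):

from datetime import datetime, timedelta

from airflow import DAG
# Airflow 2 import path; on 1.10.x use the contrib path shown above
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

dag = DAG('k8s_client_samples', start_date=datetime(2021, 4, 1), schedule_interval=None)

slack_channel = '#data-alerts'                      # channel consumed by the SlackTool retry callback
execution_date = '{{ ds }}'                         # assumed: templated run date handed to the command tools
from_date, to_date = '{{ ds }}', '{{ next_ds }}'    # assumed: date range parameters for the Sqoop sample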

 

PySpark application

pyspark_submit_sample = KubernetesPodOperator(
    task_id='pyspark_submit_sample',
    name='pyspark_submit_sample',
    namespace='airflow',
    image='spark_client_1:1.0',
    arguments=["pyspark","pyspark_sample.py"],
    hostnetwork=True,
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag
)
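Note that hostnetwork=True runs the pod in the node's network namespace; client images that need to be directly reachable from an external Hadoop/Spark cluster often require this, but it can be dropped when ordinary pod networking suffices.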

 

Hadoop control

hadoop_distcp_sample = KubernetesPodOperator(
    task_id='hadoop_distcp_sample',
    name='hadoop_distcp_sample',
    namespace='airflow',
    image='hadoop_client_1:1.0',
    arguments=["hadoop", HadoopDistcpCommandTool.getCommand(HadoopDistcpCommandTool(),
                         src='src_path',
                         dst='dst_path',
                         options={
                            '-Dmapred.job.queue.name=root.default',
                            '-overwrite',
                            '-strategy dynamic'},
                        args={
                            'YESTERDAY': execution_date})],
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag)
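The pattern in these samples is that the image only carries the client binaries, while a small *CommandTool helper renders the actual command line: options holds raw distcp flags, and args supplies values (such as YESTERDAY) that are substituted into the command template. The Beeline and Sqoop samples below follow the same structure.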

 

Python applications

python2_app_sample = KubernetesPodOperator(
    task_id='python2_app_sample',
    name='python2_app_sample',
    namespace='airflow',
    image='python2:1.0',
    arguments=["python2","python2_app_sample.py"],
    hostnetwork=True,
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag
)
 
python3_app_sample = KubernetesPodOperator(
    task_id='python3_app_sample',
    name='python3_app_sample',
    namespace='airflow',
    image='python3:1.0',
    arguments=["python3","python3_app_sample.py"],
    hostnetwork=True,
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag
)

 

Hive or Beeline

beeline_sample = KubernetesPodOperator(
    task_id='beeline_sample',
    name='beeline_sample',
    namespace='airflow',
    image='hive_client:1.0',
    arguments=["beeline", BeelineCommandTool().getCommand(
                          servicename='fintech_service',
                          filename='hv_daily_sample.sql',
                          conn_id='jdbc_conn_id',
                          args={
                              'YESTERDAY': execution_date})],
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag)
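Here conn_id='jdbc_conn_id' presumably names an Airflow connection that the helper resolves into a Beeline JDBC URL; that detail lives in BeelineCommandTool, which is not shown.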

 

 

Sqoop

sqoop_sample = KubernetesPodOperator(
    task_id='sqoop_sample',
    name='sqoop_sample',
    namespace='airflow',
    image='sqoop_client:1.0',
    arguments=["sqoop", ScoopCommandTool.getCommand(ScoopCommandTool(),
                        servicename='fintech_service',
                        filename='sqoop_info_definition.yml',
                        args={
                             'YESTERDAY': execution_date,
                             'FROM': from_date,
                             'TO': to_date})],
    hostnetwork=True,
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag)

 

 

 

 

** Fargate cost savings

 

aws.amazon.com/ko/blogs/containers/saving-money-pod-at-time-with-eks-fargate-and-aws-compute-savings-plans/

AWS blog: Saving money a pod at a time with EKS, Fargate, and AWS Compute Savings Plans

 

 

 

 

 

 

 
