MWAA on EKS Fargate and DAGs
EKS, GKE, and plain Kubernetes differ slightly in API structure, but you do not end up locked into a single vendor.
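A minimal sketch of that portability claim: the same KubernetesPodOperator task can target EKS, GKE, or a self-managed cluster just by pointing config_file and cluster_context at a different kubeconfig entry. The DAG id, context name, and paths below are illustrative assumptions, not values taken from the references that follow.

from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

with DAG('portable_pod_example', start_date=datetime(2021, 1, 1), schedule_interval=None) as portability_dag:
    portable_task = KubernetesPodOperator(
        task_id='portable-pod',
        name='portable-pod',
        namespace='default',
        image='ubuntu:18.04',
        cmds=['bash'],
        arguments=['-c', 'echo hello'],
        # Any cluster reachable through this kubeconfig works; swap the context
        # (for example to a GKE context name) and the task definition stays the same.
        config_file='/usr/local/airflow/dags/kube_config.yaml',
        cluster_context='aws',
        in_cluster=False,
        get_logs=True,
    )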
Patch and Pray, Installing Airflow 2.0 using Helm: patchandpray.com/post/installing-airflow-2-using-helm/
Serverless Kubernetes Cluster on AWS with EKS on Fargate (why it is a game-changer and how to use it): https://betterprogramming.pub/serverless-kubernetes-cluster-on-aws-with-eks-on-fargate-a7545cf179be
infinitelambda/apache_airflow_on_eks (GitHub): github.com/infinitelambda/apache_airflow_on_eks
Apache Airflow on EKS: Swiss Army Knife Deployment (Infinite Lambda): infinitelambda.com/post/swiss-army-knife-airflow-on-eks/
Working with Amazon EKS and Amazon Managed Workflows for Apache Airflow v2.x (dev.to): https://dev.to/aws/working-with-amazon-eks-and-amazon-managed-workflows-for-apache-airflow-v2-x-k12
** MWAA environment setup and sample code
Using Amazon MWAA with Amazon EKS: https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-eks-example.html (when you use an eksctl command, you can include --profile to specify a profile other than the default)
Apache Airflow versions on Amazon Managed Workflows for Apache Airflow (MWAA): docs.aws.amazon.com
** Airflow 2.0.2 KubernetesPodOperator
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
    'owner': 'aws',
    'depends_on_past': False,
    'start_date': datetime(2019, 2, 20),
    'provide_context': True
}

dag = DAG(
    'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

# use a kube_config stored in the s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
    namespace="mwaa",
    image="ubuntu:18.04",
    cmds=["bash"],
    arguments=["-c", "ls"],
    labels={"foo": "bar"},
    name="mwaa-pod-test",
    task_id="pod-task",
    get_logs=True,
    dag=dag,
    # keep the finished pod around so it can be inspected with kubectl
    is_delete_operator_pod=False,
    config_file=kube_config_path,
    in_cluster=False,
    cluster_context='aws'
)
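The task above lands on whatever capacity the EKS cluster provides. For the Fargate part of the title, a hedged sketch: if the cluster has a Fargate profile whose selector matches the pod's namespace (and, optionally, its labels), EKS schedules the pod onto Fargate instead of EC2 nodes. The selector values below ("mwaa" namespace, compute=fargate label) are assumptions about how such a profile might be configured, not values from the AWS example.

# Sketch only: assumes a Fargate profile exists whose selector matches
# namespace "mwaa" and the label {"compute": "fargate"}.
fargate_pod = KubernetesPodOperator(
    namespace="mwaa",
    image="ubuntu:18.04",
    cmds=["bash"],
    arguments=["-c", "echo running on Fargate"],
    labels={"compute": "fargate"},  # must match the Fargate profile's label selector
    name="mwaa-fargate-pod-test",
    task_id="fargate-pod-task",
    get_logs=True,
    dag=dag,
    is_delete_operator_pod=True,
    config_file=kube_config_path,
    in_cluster=False,
    cluster_context='aws'
)

podRun >> fargate_pod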
** Airflow 1.10.12 KubernetesPodOperator
from airflow import DAG
from datetime import datetime
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

default_args = {
    'owner': 'aws',
    'depends_on_past': False,
    'start_date': datetime(2019, 2, 20),
    'provide_context': True
}

dag = DAG(
    'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

# use a kube_config stored in the s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
    namespace="mwaa",
    image="ubuntu:18.04",
    cmds=["bash"],
    arguments=["-c", "ls"],
    labels={"foo": "bar"},
    name="mwaa-pod-test",
    task_id="pod-task",
    get_logs=True,
    dag=dag,
    is_delete_operator_pod=False,
    config_file=kube_config_path,
    in_cluster=False,
    cluster_context='aws'
)
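The workload snippets below (PySpark, Hadoop, Python, Hive/Beeline, Sqoop) reference names that are defined elsewhere in their original project: dag, slack_channel, execution_date, from_date, to_date, SlackTool, the various *CommandTool helpers, and private images such as spark_client_1:1.0. A minimal sketch of the surrounding context they appear to assume follows; every value in it is an illustrative assumption, not something taken from those snippets.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
    'owner': 'data-platform',            # assumption
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),  # assumption
}

dag = DAG('k8s_workloads_example', default_args=default_args, schedule_interval='@daily')

slack_channel = '#data-alerts'           # assumption: channel used by the SlackTool retry callbacks
execution_date = '{{ ds }}'              # assumption: templated date handed to the command builders
from_date, to_date = '{{ ds }}', '{{ next_ds }}'  # assumption: date range used by the Sqoop snippet

# SlackTool and the *CommandTool classes are project-specific helpers that are
# not shown in this post; they are used as-is in the snippets below.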
PySpark application
pyspark_submit_sample = KubernetesPodOperator(
    task_id='pyspark_submit_sample',
    name='pyspark_submit_sample',
    namespace='airflow',
    image='spark_client_1:1.0',
    arguments=["pyspark", "pyspark_sample.py"],
    hostnetwork=True,
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag
)
Hadoop control (distcp)
hadoop_distcp_sample = KubernetesPodOperator(
    task_id='hadoop_distcp_sample',
    name='hadoop_distcp_sample',
    namespace='airflow',
    image='hadoop_client_1:1.0',
    arguments=["hadoop", HadoopDistcpCommandTool.getCommand(HadoopDistcpCommandTool(),
                                                            src='src_path',
                                                            dst='dst_path',
                                                            options={
                                                                '-Dmapred.job.queue.name=root.default',
                                                                '-overwrite',
                                                                '-strategy dynamic'},
                                                            args={
                                                                'YESTERDAY': execution_date})],
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag)
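HadoopDistcpCommandTool itself is not shown in the post. Judging only from the options and paths passed above, the string it returns is presumably a standard distcp invocation; a rough, assumed illustration:

# Illustrative assumption only; the real HadoopDistcpCommandTool may work differently.
options = ['-Dmapred.job.queue.name=root.default', '-overwrite', '-strategy dynamic']
distcp_command = "distcp {} {} {}".format(" ".join(options), 'src_path', 'dst_path')
# -> "distcp -Dmapred.job.queue.name=root.default -overwrite -strategy dynamic src_path dst_path"
# with templated args such as YESTERDAY substituted into the paths beforehand.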
Python applications
python2_app_sample = KubernetesPodOperator(
    task_id='python2_app_sample',
    name='python2_app_sample',
    namespace='airflow',
    image='python2:1.0',
    arguments=["python2", "python2_app_sample.py"],
    hostnetwork=True,
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag
)

python3_app_sample = KubernetesPodOperator(
    task_id='python3_app_sample',
    name='python3_app_sample',
    namespace='airflow',
    image='python3:1.0',
    arguments=["python3", "python3_app_sample.py"],
    hostnetwork=True,
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag
)
Hive or Beeline
beeline_sample = KubernetesPodOperator(
    task_id='beeline_sample',
    name='beeline_sample',
    namespace='airflow',
    image='hive_client:1.0',
    arguments=["beeline", BeelineCommandTool.getCommand(BeelineCommandTool(),
                                                        servicename='fintech_service',
                                                        filename='hv_daily_sample.sql',
                                                        conn_id='jdbc_conn_id',
                                                        args={
                                                            'YESTERDAY': execution_date})],
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag)
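BeelineCommandTool is likewise project-specific. Given the parameters above (a JDBC connection id, a HiveQL file, and a templated date), the command it builds is presumably an ordinary beeline invocation; a rough, assumed illustration using beeline's real -u, -f, and --hivevar flags:

# Illustrative assumption only; the real BeelineCommandTool is not shown in the post.
jdbc_url = 'jdbc:hive2://hiveserver2.example.com:10000/default'  # assumption
beeline_command = "beeline -u '{}' -f {} --hivevar YESTERDAY={}".format(
    jdbc_url, 'hv_daily_sample.sql', '{{ ds }}')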
Sqoop
sqoop_sample = KubernetesPodOperator(
    task_id='sqoop_sample',
    name='sqoop_sample',
    namespace='airflow',
    image='sqoop_client:1.0',
    arguments=["sqoop", SqoopCommandTool.getCommand(SqoopCommandTool(),
                                                    servicename='fintech_service',
                                                    filename='sqoop_info_definition.yml',
                                                    args={
                                                        'YESTERDAY': execution_date,
                                                        'FROM': from_date,
                                                        'TO': to_date})],
    hostnetwork=True,
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag)
** Fargate cost savings
Saving money a pod at a time with EKS, Fargate, and AWS Compute Savings Plans (AWS blog): aws.amazon.com