EKS, GKE, and plain K8s differ slightly in API structure, but they don't lock you into a vendor.
patchandpray.com/post/installing-airflow-2-using-helm/
https://betterprogramming.pub/serverless-kubernetes-cluster-on-aws-with-eks-on-fargate-a7545cf179be
github.com/infinitelambda/apache_airflow_on_eks
infinitelambda.com/post/swiss-army-knife-airflow-on-eks/
https://dev.to/aws/working-with-amazon-eks-and-amazon-managed-workflows-for-apache-airflow-v2-x-k12
** MWAA environment setup and sample code
https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-eks-example.html
** Airflow 2.0.2 KubernetesPodOperator
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
    'owner': 'aws',
    'depends_on_past': False,
    'start_date': datetime(2019, 2, 20),
    # 'provide_context' was dropped here: it is no longer needed in Airflow 2
}

dag = DAG(
    'kubernetes_pod_example',
    default_args=default_args,
    schedule_interval=None,
)

# use a kube_config stored in the S3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
    namespace="mwaa",
    image="ubuntu:18.04",
    cmds=["bash"],
    arguments=["-c", "ls"],
    labels={"foo": "bar"},
    name="mwaa-pod-test",
    task_id="pod-task",
    get_logs=True,
    dag=dag,
    is_delete_operator_pod=False,
    config_file=kube_config_path,
    in_cluster=False,
    cluster_context='aws',
)
** Airflow 1.10.12 KubernetesPodOperator
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

default_args = {
    'owner': 'aws',
    'depends_on_past': False,
    'start_date': datetime(2019, 2, 20),
    'provide_context': True,
}

dag = DAG(
    'kubernetes_pod_example',
    default_args=default_args,
    schedule_interval=None,
)

# use a kube_config stored in the S3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
    namespace="mwaa",
    image="ubuntu:18.04",
    cmds=["bash"],
    arguments=["-c", "ls"],
    labels={"foo": "bar"},
    name="mwaa-pod-test",
    task_id="pod-task",
    get_logs=True,
    dag=dag,
    is_delete_operator_pod=False,
    config_file=kube_config_path,
    in_cluster=False,
    cluster_context='aws',
)
PySpark application

# snippet from a larger DAG: assumes `dag`, `slack_channel`, `SlackTool`
# and `from datetime import timedelta` are defined above
pyspark_submit_sample = KubernetesPodOperator(
    task_id='pyspark_submit_sample',
    name='pyspark_submit_sample',
    namespace='airflow',
    image='spark_client_1:1.0',
    arguments=["pyspark", "pyspark_sample.py"],
    hostnetwork=True,
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag,
)
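`SlackTool` above is an in-house helper whose implementation isn't shown. As a rough sketch of what it might do, assuming a plain Slack incoming-webhook URL (the names `make_retry_handler`, `build_retry_payload`, and `webhook_url` are all assumptions, not the author's actual API):

```python
import json
import urllib.request


def build_retry_payload(channel, dag_id, task_id, try_number,
                        color='warning', message='Retry Task'):
    """Build a Slack incoming-webhook payload describing the retried task."""
    return {
        'channel': channel,
        'attachments': [{
            'color': color,
            'text': f'{message}: {dag_id}.{task_id} (try {try_number})',
        }],
    }


def make_retry_handler(webhook_url, channel, color='warning', message='Retry Task'):
    """Return a callback usable as on_retry_callback.

    Airflow invokes retry/failure callbacks with a single `context` dict;
    the running task instance is available under the 'ti' key.
    """
    def handler(context):
        ti = context['ti']
        payload = build_retry_payload(channel, ti.dag_id, ti.task_id,
                                      ti.try_number, color=color, message=message)
        req = urllib.request.Request(
            webhook_url,
            data=json.dumps(payload).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
        )
        urllib.request.urlopen(req)  # fire-and-forget notification
    return handler
```

Returning a closure like this lets one factory produce per-channel callbacks for every operator in the DAG.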
Hadoop control

hadoop_distcp_sample = KubernetesPodOperator(
    task_id='hadoop_distcp_sample',
    name='hadoop_distcp_sample',
    namespace='airflow',
    image='hadoop_client_1:1.0',
    arguments=["hadoop", HadoopDistcpCommandTool().getCommand(
        src='src_path',
        dst='dst_path',
        options={
            '-Dmapred.job.queue.name=root.default',
            '-overwrite',
            '-strategy dynamic'},
        args={'YESTERDAY': execution_date})],
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag,
)
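`HadoopDistcpCommandTool` is another in-house helper whose source isn't shown. A minimal sketch of what such a builder might do, assuming it substitutes `args` into templated paths and joins the `options` set into a `distcp` command line (the class body here is a guess, not the author's code):

```python
class HadoopDistcpCommandTool:
    """Sketch of a distcp command-line builder (hypothetical implementation)."""

    def getCommand(self, src, dst, options=None, args=None):
        # substitute template variables like {YESTERDAY} into the paths
        for key, value in (args or {}).items():
            src = src.replace('{%s}' % key, str(value))
            dst = dst.replace('{%s}' % key, str(value))
        parts = ['distcp']
        # options arrives as a set, so sort it for a deterministic command line
        parts.extend(sorted(options or []))
        parts.extend([src, dst])
        return ' '.join(parts)
```

The returned string is what the DAG passes as the second element of `arguments=["hadoop", ...]`, so the container entrypoint ends up running `hadoop distcp ...`.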
Python applications

python2_app_sample = KubernetesPodOperator(
    task_id='python2_app_sample',
    name='python2_app_sample',
    namespace='airflow',
    image='python2:1.0',
    arguments=["python2", "python2_app_sample.py"],
    hostnetwork=True,
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag,
)

python3_app_sample = KubernetesPodOperator(
    task_id='python3_app_sample',
    name='python3_app_sample',
    namespace='airflow',
    image='python3:1.0',
    arguments=["python3", "python3_app_sample.py"],
    hostnetwork=True,
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag,
)
Hive or Beeline

beeline_sample = KubernetesPodOperator(
    task_id='beeline_sample',
    name='beeline_sample',
    namespace='airflow',
    image='hive_client:1.0',
    arguments=["beeline", BeelineCommandTool().getCommand(
        servicename='fintech_service',
        filename='hv_daily_sample.sql',
        conn_id='jdbc_conn_id',
        args={'YESTERDAY': execution_date})],
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag,
)
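`BeelineCommandTool` follows the same in-house builder pattern. A plausible sketch, assuming it resolves `conn_id` to a JDBC URL, passes `args` as `--hivevar` definitions, and locates SQL files per service inside the image (the class body, the URL table, and the `/sql/...` layout are all assumptions):

```python
class BeelineCommandTool:
    """Sketch of a beeline command-line builder (hypothetical implementation)."""

    # in a real setup this mapping would come from an Airflow connection;
    # the host and port below are placeholders
    JDBC_URLS = {'jdbc_conn_id': 'jdbc:hive2://hive-server:10000/default'}

    def getCommand(self, servicename, filename, conn_id, args=None):
        parts = ['-u', self.JDBC_URLS[conn_id]]
        # expose template variables to the SQL script as hivevars
        for key, value in sorted((args or {}).items()):
            parts.extend(['--hivevar', '%s=%s' % (key, value)])
        # SQL files assumed to be laid out per service inside the image
        parts.extend(['-f', '/sql/%s/%s' % (servicename, filename)])
        return ' '.join(parts)
```

`-u`, `--hivevar`, and `-f` are standard beeline options, so the container entrypoint ends up running `beeline -u <jdbc-url> --hivevar YESTERDAY=... -f <script>`.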
Sqoop
sqoop_sample = KubernetesPodOperator(
    task_id='sqoop_sample',
    name='sqoop_sample',
    namespace='airflow',
    image='sqoop_client:1.0',
    arguments=["sqoop", ScoopCommandTool().getCommand(
        servicename='fintech_service',
        filename='sqoop_info_definition.yml',
        args={
            'YESTERDAY': execution_date,
            'FROM': from_date,
            'TO': to_date})],
    hostnetwork=True,
    in_cluster=True,
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    execution_timeout=timedelta(minutes=120),
    retries=2,
    retry_delay=timedelta(minutes=2),
    image_pull_policy='IfNotPresent',
    service_account_name='airflow',
    on_retry_callback=SlackTool.make_handler(slack_channel,
                                             color='warning',
                                             message="Retry Task"),
    dag=dag,
)
** Fargate cost-saving effect