BigData

Airflow Glue

Kyle79 2021. 5. 10. 18:35

 

 

 

 

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from datetime import datetime, timedelta
import boto3, json, time
# Module-level Glue client shared by all task callables; region is hard-coded.
glue_client = boto3.client('glue', region_name='us-east-1')

# DAG-level defaults applied to every task in the 'dataload' DAG.
default_args = {
    'owner': 'SAM',
    'start_date': datetime(2019, 5, 27) #  days_ago(n=1)
}
# Cron expression: run daily at 15:00 (UTC by default in Airflow).
default_schedule_interval = "0 15 * * *" # '@daily'

def run_customer_job(random_base):
    """Start the 'customer' Glue job and block until it reaches a terminal state.

    Args:
        random_base: Seconds to sleep between status polls.

    Raises:
        RuntimeError: If the job run ends in a non-success terminal state.
    """
    job = glue_client.start_job_run(JobName='customer')
    # BUG FIX: start_job_run returns only {'JobRunId': ...}; the original
    # job['Name'] raised KeyError — the job name must be passed explicitly.
    while True:
        status = glue_client.get_job_run(JobName='customer', RunId=job['JobRunId'])
        state = status['JobRun']['JobRunState']
        if state == 'SUCCEEDED':
            break
        # Previously FAILED/STOPPED/TIMEOUT looped forever; fail fast instead.
        if state in ('FAILED', 'STOPPED', 'TIMEOUT', 'ERROR'):
            raise RuntimeError("Glue job 'customer' ended in state %s" % state)
        time.sleep(random_base)

def run_product_job(random_base):
    """Start the 'product' Glue job and block until it reaches a terminal state.

    Args:
        random_base: Seconds to sleep between status polls.

    Raises:
        RuntimeError: If the job run ends in a non-success terminal state.
    """
    job = glue_client.start_job_run(JobName='product')
    # BUG FIX: start_job_run returns only {'JobRunId': ...}; the original
    # job['Name'] raised KeyError — the job name must be passed explicitly.
    while True:
        status = glue_client.get_job_run(JobName='product', RunId=job['JobRunId'])
        state = status['JobRun']['JobRunState']
        if state == 'SUCCEEDED':
            break
        # Previously FAILED/STOPPED/TIMEOUT looped forever; fail fast instead.
        if state in ('FAILED', 'STOPPED', 'TIMEOUT', 'ERROR'):
            raise RuntimeError("Glue job 'product' ended in state %s" % state)
        time.sleep(random_base)

with DAG('dataload',
         default_args=default_args,
         schedule_interval=default_schedule_interval) as dag:
    # Upstream task: load customer data, polling Glue every 10 seconds.
    task1 = PythonOperator(
        task_id='task1',
        python_callable=run_customer_job,
        op_kwargs={'random_base': 10},
    )

    # Fan out five product-load tasks, each downstream of the customer load.
    for idx in range(5):
        product_task = PythonOperator(
            task_id='task2_' + str(idx),
            python_callable=run_product_job,
            op_kwargs={'random_base': float(idx) / 10},
        )
        task1 >> product_task




 

'BigData' 카테고리의 다른 글

appstream sagemaker notebook  (0) 2021.05.25
NIFI Define Cluster Spec  (0) 2021.05.19
Apache Airflow & BigQuery  (0) 2021.05.10
Airflow Slack notifications  (0) 2021.05.10
Airflow/EMR on EKS Using CDK (spark job, aws sso)  (0) 2021.05.10