Airflow Glue
Kyle79
2021. 5. 10. 18:35
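A small Airflow DAG that triggers AWS Glue jobs through boto3: each PythonOperator callable starts a job run with start_job_run and polls get_job_run until the run reaches a terminal state. One customer task runs first, then five product tasks fan out behind it.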
from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # in Airflow 2.x: from airflow.operators.python import PythonOperator
from datetime import datetime
import boto3
import time

# Glue client shared by the polling callables below
glue_client = boto3.client('glue', region_name='us-east-1')
default_args = {
    'owner': 'SAM',
    'start_date': datetime(2019, 5, 27),  # or days_ago(1)
}

# Daily at 15:00 UTC (midnight KST); '@daily' would fire at 00:00 UTC instead
default_schedule_interval = '0 15 * * *'
def run_glue_job(job_name, random_base):
    """Start the named Glue job and poll every random_base seconds until it finishes."""
    job = glue_client.start_job_run(JobName=job_name)
    while True:
        # start_job_run returns only a JobRunId, so the job name must be passed explicitly
        status = glue_client.get_job_run(JobName=job_name, RunId=job['JobRunId'])
        state = status['JobRun']['JobRunState']
        if state == 'SUCCEEDED':
            break
        if state in ('FAILED', 'STOPPED', 'TIMEOUT', 'ERROR'):
            raise RuntimeError(f'Glue job {job_name} ended in state {state}')
        time.sleep(random_base)

def run_customer_job(random_base):
    run_glue_job('customer', random_base)

def run_product_job(random_base):
    run_glue_job('product', random_base)
with DAG('dataload',
         default_args=default_args,
         schedule_interval=default_schedule_interval,
         catchup=False) as dag:  # catchup=False avoids backfilling from the 2019 start_date
    task1 = PythonOperator(
        task_id='task1',
        python_callable=run_customer_job,
        op_kwargs={'random_base': 10},
    )

    for i in range(5):
        task2 = PythonOperator(
            task_id='task2_' + str(i),
            python_callable=run_product_job,
            op_kwargs={'random_base': float(i) / 10},  # poll interval in seconds (0 when i=0)
        )
        # link inside the loop so every product task runs after the customer task
        task1 >> task2
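The hand-rolled polling above can also be delegated to the Glue operator that ships with the Amazon provider package, which starts the run and waits for completion itself. Below is a minimal sketch, assuming apache-airflow-providers-amazon is installed (older provider releases named the operator AwsGlueJobOperator); the job names, region, and schedule are carried over from the DAG above.

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from datetime import datetime

with DAG('dataload_glue_operator',
         start_date=datetime(2019, 5, 27),
         schedule_interval='0 15 * * *',
         catchup=False) as dag:
    customer = GlueJobOperator(
        task_id='customer',
        job_name='customer',        # existing Glue job, as above
        region_name='us-east-1',
        wait_for_completion=True,   # the operator polls the job run itself
    )
    products = [
        GlueJobOperator(
            task_id='product_' + str(i),
            job_name='product',
            region_name='us-east-1',
            wait_for_completion=True,
        )
        for i in range(5)
    ]
    customer >> products

Note that five simultaneous runs of the same product job (here as in the DAG above) only work if the Glue job's maximum concurrency setting allows more than one run.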