from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import boto3, json, time
glue_client = boto3.client('glue', region_name='us-east-1')
default_args = {
    'owner': 'SAM',
    'start_date': datetime(2019, 5, 27)  # days_ago(n=1)
}
default_schedule_interval = "0 15 * * *"  # '@daily'
def run_customer_job(random_base):
    # Start the 'customer' Glue job, then poll get_job_run until it reaches a terminal state.
    job = glue_client.start_job_run(JobName='customer')
    while True:
        # start_job_run returns only the JobRunId, so the job name must be passed again here
        status = glue_client.get_job_run(JobName='customer', RunId=job['JobRunId'])
        state = status['JobRun']['JobRunState']
        if state == 'SUCCEEDED':
            break
        if state in ('FAILED', 'STOPPED', 'TIMEOUT'):
            raise RuntimeError('Glue job customer ended in state ' + state)
        time.sleep(random_base)
def run_product_job(random_base):
    # Same polling pattern for the 'product' Glue job.
    job = glue_client.start_job_run(JobName='product')
    while True:
        status = glue_client.get_job_run(JobName='product', RunId=job['JobRunId'])
        state = status['JobRun']['JobRunState']
        if state == 'SUCCEEDED':
            break
        if state in ('FAILED', 'STOPPED', 'TIMEOUT'):
            raise RuntimeError('Glue job product ended in state ' + state)
        time.sleep(random_base)
with DAG('dataload', default_args=default_args, schedule_interval=default_schedule_interval) as dag:
    task1 = PythonOperator(task_id='task1',
                           python_callable=run_customer_job,
                           op_kwargs={'random_base': 10},
                           )
    for i in range(5):
        task2 = PythonOperator(task_id='task2_' + str(i),
                               python_callable=run_product_job,
                               op_kwargs={'random_base': float(i) / 10},
                               )
        task1 >> task2
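
Since run_customer_job and run_product_job differ only in the Glue job name, the polling logic could also be factored into a single callable and parameterized through op_kwargs. A minimal sketch under that assumption, reusing the glue_client and imports above (run_glue_job is an illustrative name, not part of the DAG as written):

def run_glue_job(job_name, random_base):
    # Start the named Glue job and block until it reaches a terminal state.
    job = glue_client.start_job_run(JobName=job_name)
    while True:
        status = glue_client.get_job_run(JobName=job_name, RunId=job['JobRunId'])
        state = status['JobRun']['JobRunState']
        if state == 'SUCCEEDED':
            return
        if state in ('FAILED', 'STOPPED', 'TIMEOUT'):
            raise RuntimeError('Glue job ' + job_name + ' ended in state ' + state)
        time.sleep(random_base)

# e.g. PythonOperator(task_id='task1', python_callable=run_glue_job,
#                     op_kwargs={'job_name': 'customer', 'random_base': 10})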