ssung_데이터 엔지니어링/12주차_3차 프로젝트

3차 프로젝트_(5)

ssungcohol 2024. 1. 13. 00:48

pythonOperator에서의 op_args, op_kwargs

  • 두 가지의 공통점은 변수를 전달해주는 역할
  • op_args
    •  list 형태의 인수 전달
    • operator를 생성할 때 op_args 변수로 전달
  • op_kwargs
    • Dictionary 형태의 인수 전달
    • operator에 전달 되는 모든 변수를 전달
    • op_kwargs의 키에 맞는 변수를 선언하면 자동으로 전달
    • params는 airflow DAG를 실행할 때 전달한 파라미터 (Trigger DAG w / config 사용)

op_args

  • 예제 코드
t1 = PythonOperator(
    task_id='print_test',
    python_callable=print,
    op_args=["hello", "world"],
    dag=dag
)
  • 결과 값
더보기

hello world


op_kwargs

  • 예제코드
def print_test_2(**kwargs):
    print(f"{kwargs['name']} is {kwargs['job']}.")
  
t1 = PythonOperator(
    task_id='print_test_2',
    python_callable=print_test_2,
    op_kwargs={"name" : "ssungcohol", "job" : "blogger"},
    dag=dag
)
  • 결과 값
더보기

ssungcohol is blogger.


Dictionary

  • PythonOperator는 **kwargs가 아닌 딕셔너리를 통해 매개변수로 받고 있음
  • op_kwargs가 아닌 params 형태로 데이터를 전달 할 수도 있음

Catch Up

  • start_date가 지금으로부터 1년 전이라고 할 때 (2023.01.13) start date부터 스케줄링 되어 실행된 dag가 비어있음
  • 이 때, catch up 의 인자값을 True로 설정하면 비어있던 부분을 순차적으로 채워주게 됨 (기본값 = False)
  • 이를 활용해 과거 시점부터 스케줄링을 통해 작업을 배치 형식으로 진행해야 하는 경우에 사용
  • 과거에 스케줄링 되어있던 Dag를 중단했다가 다시 시작하는 경우에도 사용
  • 주의사항은 Dag들이 한 번에 실행되기 때문에 서버에 과부하가 올 가능성이 있음
  • 이를 방지하기 위한 다양한 속성값들이 존재
    • max_active_runs : dag 작업 수준에서 설정, catch up 중에 dagrun이 얼마나 실행될 수 있는지를 설정
    • depends_on_past : dag 작업 수준에서 설정, 가장 최근에 dag에서 동일한 작업이 실행됐을 때 해당 dag가 실행될 수 있도록 제약을 걸 수 있음
    • wait_for_downstream : dag 수준에서 설정, 다음 dag를 실행하려면 전체 task가 수행되어야 실행될 수 있도록 설정
    • catchup_by_default : config 파일에서 설정이 가능, dag를 만들 때 기본값으로 True, False를 설정 가능
728x90

'ssung_데이터 엔지니어링 > 12주차_3차 프로젝트' 카테고리의 다른 글

3차 프로젝트_(4)  (0) 2024.01.11
3차 프로젝트_(3)  (0) 2024.01.11
3차 프로젝트_(2)  (0) 2024.01.10
3차 프로젝트_(1)  (0) 2024.01.08