ssung_데이터 엔지니어링/11주차_Airflow 고급

Airflow 고급_(3)

ssungcohol 2024. 1. 3. 20:18

Dag Dependencies

  • Dag 실행 방법
    • 주기적 실행 : schedule로 지정
    • 다른 Dag에 의해 트리거
      • Explicit Trigger : Dag A가 분명하게 Dag B를 트리거 (TriggerDagRunOperator)
      • Reactive Trigger : Dag B가 Dag A가 끝나기를 대기 (ExternalTaskSensor)
    • 상황에 따라 다른 테스크 실행 방식
      • BranchPythonOperator : 조건에 따라 다른 테스크로 분기
      • LatestOnlyOperator : 과거 데이터 Backfill 시에는 불필요한 테스크 처리
      • 앞 단의 테스크들의 상황에 따라 실패해도 후위의 테스크가 동작해야하는 경우가 있을 수 있음

Explicit trigger

  • TriggerDagRunOperator
  • Dag A가 명시적으로 Dag B를 트리거 하는 방법
  • 이 때, Dag A는 Trigger Dag Run Operator로 구현
     - 코드 예시
    from airflow.operators.trigger_dagrun import TriggerDagRunOpertor

    trigger_B = TriggerDagRunOperator(
        task_id="trigger_B",
        Trigger_dag_id="트리거하려는 DAG이름"
    )

Explicit trigger

 

Reactive trigger

  • ExternalTaskSensor
  • Dag B가 Dag A의 테스크가 끝나기를 대기 (Dag A는 Dag B의 동작 여부를 모름)

Reactive trigger


Jinja Template

  • Jinja 템플릿은 Python에서 널리 사용되는 템플릿 엔진
    • Django 템플릿 엔진에서 영감을 받아 개발
    • Jinja를 사용하면 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML 생성
    • Flask에서 주로 사용
  • 변수는 이중 중괄호 {{ }}로 감싸서 사용 (중괄호 안에는 앞뒤로 띄어쓰기가 있어야 함)
    Ex_) <h1> 안녕하세요, {{ name }}님! </h1>
  • 제어문은 퍼센트 기호 {% %}로 표시
    Ex_)
    <ul>
    {% for item in items %}
      <li>{{ item }}</li>
    {% endfor %}
    </ul>

Jinja Template + Airflow

  • Airflow에서 Jinja 템플릿을 사용하면 작업 이름, 파라미터 또는 SQL 쿼리와 같은 작업 매개변수를 템플릿화 된 문자열로 정의 가능 -> 이를 통해 재사용이 가능하고 사용자 정의가 가능한 워크플로우 생성
    • execution_date를 코드 내에서 쉽게 사용 : {{ ds }}
      # BashOperator 를 사용하여 템플릿 작업 정의
      -------------------------------------------------------------
      task1 = BashOperator(
        task_id='task1',
        bash_command='echo "{{ ds }}"',
        dag = dag
      )
    • 파라미터 등으로 넘어온 변수를 쉽게 사용 가능
      # 동적 매개변수가 있는 다른 템플릿 작업 정의
      -------------------------------------------------------------
      task2 = BashOperator(
        task_id='task2',
        bash_command='echo "안녕하세요, {{ params.name }}!"',
        params={'name': 'John'}, # 사용자 정의가 가능한 매개변수
        dag = dag
      )

Sensor

  • Sensor는 특정 조건이 충족될 때까지 대기하는 Operator
  • 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용
  • Airflow는 몇 가지의 내장된 Sensor를 제공
    • FileSensor : 지정된 위치에 파일이 생길 때까지 대기
    • HttpSensor : HTTP 요청을 수행하고 지정된 응답이 대기
    • SqlSensor : SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
    • TimeSensor : 특정 시간에 도달할 때까지 워크플로우를 일시 중지
    • ExternalTaskSensor : 다른 Airflow DAG의 특정 작업 완료를 대기
  • 기본으로는 주기적으로 poke를 하는 것
    • worker를 하나 붙잡고 poke 간에 sleep를 할지 아니면 worker를 릴리스하고 다시 잡아서 poke할지 결정해주는 파라미터가 존재 = mode (mode 값은 reschedule 또는 poke)

ExternalTaskSensor

  • DAG B의 ExternalTaskSensor 테스크가 DAG A의 특정 테스크가 끝난는지 체크
    • 먼저 동일한 shcedule_interval을 사용
    • 해당 경우 두 테스크들의 Execution Date가 동일해야함. 아니면 매칭이 되지 않음
  • DAG A와 DAG B가 서로 다른 schedule interval을 가질경우
    • DAG A가 DAG B보다 5분 먼저 실행된다면?
       - execution_delta를 사용
       - execution_date_fn을 사용하면 조금 더 복잡하게 컨트롤 가능
    • 만일 두 개의 DAG가 서로 다른 frequency를 갖고 있다면 이 경우는 ExternalTaskSensor 사용불가
  • 조건이 까다로워 잘 사용하지 않음

BranchPythonOperator

  • 상황에 따라 뒤에 실행되어야 할 테스크를 동적으로 결정해주는 Operator
    • 미리 정해준 Operator들 중에 선택하는 형태로 돌아감
  • TriggerDagOperator 앞에 이 오퍼레이터를 사용하는 경우도 있음


LatestOnlyOperator

  • Time-sensitive 한 테스크들이 과거 데이터의 Backfill 시 실행되는 것을 막기 위함
  • 현재 시간이 지금 테스크가 처리하는 execution_date보다 미래이고 다음 execution_date보다는 과거인 경우에만 뒤로 실행을 이어가고 아니면 여기서 중단됨

LatestOnlyOperator


Trigger Rules

  • Upstream 테스크의 성공실패 상황에 따라 뒷단 테스크의 실행여부를 결정하고 싶다면?
    • 보통의 경우 앞단이 하나라도 실패하면 뒷 단의 테스크는 실행불가
  • Operator에 trigger_rule 이란 파라미터로 결정 가능
    • trigger_rule은 테스크에 주어지는 파라미터로 다음과 같은 값이 가능
    • all_success (기본값), all_failed, all_done, one_failed, one_success, none_failed, none_failed_min_one_success
  • Trigger Rule의 가능값 (airflow.utils.trigger_rule.TriggerRule)
    • ALL_SUCCESS : (default)
    • ALL_FAILED
    • ALL_DONE
    • ONE_FAILED
    • ONE_SUCCESS
    • NONE_FAILED
    • NONE_FAILED_MIN_ONE_SUCCESS

Dynamic Dag

  • 템플릿과 YAML을 기반으로 DAG를 동적으로 작성
    • Jinja를 기반으로 DAG 자체의 템플릿의 디자인하고 YAML을 통해 앞서 만든 템플릿에 파라미터를 제공
  • 이를 통해 비슷한 DAG를 계속해서 메뉴얼하게 개발하는 것을 방지
  • DAG를 계속해서 만드는 것과 한 DAG 안에서 테스크를 늘리는 것 사이의 밸런스가 필요
    • 오너가 다르거나 테스크의 수가 너무 커지는 경우 DAG를 복제해 나가는 것이 더 좋음
728x90

'ssung_데이터 엔지니어링 > 11주차_Airflow 고급' 카테고리의 다른 글

Airflow 고급_(5)  (0) 2024.01.05
Airflow_고급(4)  (1) 2024.01.04
Airflow 고급_(1), (2)  (2) 2024.01.03