Dag Dependencies
- Dag 실행 방법
- 주기적 실행 : schedule로 지정
- 다른 Dag에 의해 트리거
- Explicit Trigger : Dag A가 분명하게 Dag B를 트리거 (TriggerDagRunOperator)
- Reactive Trigger : Dag B가 Dag A가 끝나기를 대기 (ExternalTaskSensor)
- 상황에 따라 다른 테스크 실행 방식
- BranchPythonOperator : 조건에 따라 다른 테스크로 분기
- LatestOnlyOperator : 과거 데이터 Backfill 시에는 불필요한 테스크 처리
- 앞 단의 테스크들의 상황에 따라 실패해도 후위의 테스크가 동작해야하는 경우가 있을 수 있음
Explicit trigger
- TriggerDagRunOperator
- Dag A가 명시적으로 Dag B를 트리거 하는 방법
- 이 때, Dag A는 Trigger Dag Run Operator로 구현
- 코드 예시
from airflow.operators.trigger_dagrun import TriggerDagRunOpertor
trigger_B = TriggerDagRunOperator(
task_id="trigger_B",
Trigger_dag_id="트리거하려는 DAG이름"
)

Reactive trigger
- ExternalTaskSensor
- Dag B가 Dag A의 테스크가 끝나기를 대기 (Dag A는 Dag B의 동작 여부를 모름)

Jinja Template
- Jinja 템플릿은 Python에서 널리 사용되는 템플릿 엔진
- Django 템플릿 엔진에서 영감을 받아 개발
- Jinja를 사용하면 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML 생성
- Flask에서 주로 사용
- 변수는 이중 중괄호 {{ }}로 감싸서 사용 (중괄호 안에는 앞뒤로 띄어쓰기가 있어야 함)
Ex_) <h1> 안녕하세요, {{ name }}님! </h1> - 제어문은 퍼센트 기호 {% %}로 표시
Ex_)
<ul>
{% for item in items %}
<li>{{ item }}</li>
{% endfor %}
</ul>
Jinja Template + Airflow
- Airflow에서 Jinja 템플릿을 사용하면 작업 이름, 파라미터 또는 SQL 쿼리와 같은 작업 매개변수를 템플릿화 된 문자열로 정의 가능 -> 이를 통해 재사용이 가능하고 사용자 정의가 가능한 워크플로우 생성
- execution_date를 코드 내에서 쉽게 사용 : {{ ds }}
# BashOperator 를 사용하여 템플릿 작업 정의
-------------------------------------------------------------
task1 = BashOperator(
task_id='task1',
bash_command='echo "{{ ds }}"',
dag = dag
) - 파라미터 등으로 넘어온 변수를 쉽게 사용 가능
# 동적 매개변수가 있는 다른 템플릿 작업 정의
-------------------------------------------------------------
task2 = BashOperator(
task_id='task2',
bash_command='echo "안녕하세요, {{ params.name }}!"',
params={'name': 'John'}, # 사용자 정의가 가능한 매개변수
dag = dag
)
- execution_date를 코드 내에서 쉽게 사용 : {{ ds }}
Sensor
- Sensor는 특정 조건이 충족될 때까지 대기하는 Operator
- 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용
- Airflow는 몇 가지의 내장된 Sensor를 제공
- FileSensor : 지정된 위치에 파일이 생길 때까지 대기
- HttpSensor : HTTP 요청을 수행하고 지정된 응답이 대기
- SqlSensor : SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
- TimeSensor : 특정 시간에 도달할 때까지 워크플로우를 일시 중지
- ExternalTaskSensor : 다른 Airflow DAG의 특정 작업 완료를 대기
- 기본으로는 주기적으로 poke를 하는 것
- worker를 하나 붙잡고 poke 간에 sleep를 할지 아니면 worker를 릴리스하고 다시 잡아서 poke할지 결정해주는 파라미터가 존재 = mode (mode 값은 reschedule 또는 poke)
ExternalTaskSensor
- DAG B의 ExternalTaskSensor 테스크가 DAG A의 특정 테스크가 끝난는지 체크
- 먼저 동일한 shcedule_interval을 사용
- 해당 경우 두 테스크들의 Execution Date가 동일해야함. 아니면 매칭이 되지 않음
- DAG A와 DAG B가 서로 다른 schedule interval을 가질경우
- DAG A가 DAG B보다 5분 먼저 실행된다면?
- execution_delta를 사용
- execution_date_fn을 사용하면 조금 더 복잡하게 컨트롤 가능 - 만일 두 개의 DAG가 서로 다른 frequency를 갖고 있다면 이 경우는 ExternalTaskSensor 사용불가
- DAG A가 DAG B보다 5분 먼저 실행된다면?
- 조건이 까다로워 잘 사용하지 않음
BranchPythonOperator
- 상황에 따라 뒤에 실행되어야 할 테스크를 동적으로 결정해주는 Operator
- 미리 정해준 Operator들 중에 선택하는 형태로 돌아감
- TriggerDagOperator 앞에 이 오퍼레이터를 사용하는 경우도 있음

LatestOnlyOperator
- Time-sensitive 한 테스크들이 과거 데이터의 Backfill 시 실행되는 것을 막기 위함
- 현재 시간이 지금 테스크가 처리하는 execution_date보다 미래이고 다음 execution_date보다는 과거인 경우에만 뒤로 실행을 이어가고 아니면 여기서 중단됨

Trigger Rules
- Upstream 테스크의 성공실패 상황에 따라 뒷단 테스크의 실행여부를 결정하고 싶다면?
- 보통의 경우 앞단이 하나라도 실패하면 뒷 단의 테스크는 실행불가
- Operator에 trigger_rule 이란 파라미터로 결정 가능
- trigger_rule은 테스크에 주어지는 파라미터로 다음과 같은 값이 가능
- all_success (기본값), all_failed, all_done, one_failed, one_success, none_failed, none_failed_min_one_success
- Trigger Rule의 가능값 (airflow.utils.trigger_rule.TriggerRule)
- ALL_SUCCESS : (default)
- ALL_FAILED
- ALL_DONE
- ONE_FAILED
- ONE_SUCCESS
- NONE_FAILED
- NONE_FAILED_MIN_ONE_SUCCESS
Dynamic Dag
- 템플릿과 YAML을 기반으로 DAG를 동적으로 작성
- Jinja를 기반으로 DAG 자체의 템플릿의 디자인하고 YAML을 통해 앞서 만든 템플릿에 파라미터를 제공
- 이를 통해 비슷한 DAG를 계속해서 메뉴얼하게 개발하는 것을 방지
- DAG를 계속해서 만드는 것과 한 DAG 안에서 테스크를 늘리는 것 사이의 밸런스가 필요
- 오너가 다르거나 테스크의 수가 너무 커지는 경우 DAG를 복제해 나가는 것이 더 좋음
728x90
'ssung_데이터 엔지니어링 > 11주차_Airflow 고급' 카테고리의 다른 글
Airflow 고급_(5) (0) | 2024.01.05 |
---|---|
Airflow_고급(4) (1) | 2024.01.04 |
Airflow 고급_(1), (2) (2) | 2024.01.03 |