ssung_데이터 엔지니어링/9주차_Airflow

Airflow_(2)

ssungcohol 2023. 12. 12. 19:55

트랜잭션

  • Atomic하게 실행되어야 하는 SQL들을 묶어서 하나의 작업처럼 처리하는 방법
    • BEGIN과 END (COMMIT) 사이에 해당하는 SQL들을 사용
    • ROLLBACK은 BEGIN 이전의 상태로 돌아가라는 SQL 명령어

트랜잭션 구현

  • 두 가지 종류의 트랜잭션이 존재
    • 레코드 변경/삭제/추가를 바로 반영하는지의 여부는 autocommit이라는 파라미터로 조절가능
  • autocommit = True
    • 기본적으로 모든 SQL statement가 바로 물리 테이블에 커밋
    • 이를 바꾸고 싶다면 BEGIN;END (COMMIT)을 사용 (or ROLLBACK)
  • autocommit = False
    • 기본적으로 모든 SQL statement가 커밋되지 않음. 즉, 모두 스테이징 상태로 존재
    • 커넥션 객체의 .commit()과 .rollback()함수로 커밋할지 말지 결정
  • Python의 경우 try / catch와 같이 사용하는 것이 일반적
    • 에러 발생 시 rollback을 명시적으로 실행. 정상 = commit 실행
    • except에서 raise를 호출하면 발생한 원래 exception이 위로 전파
       - ETL을 관리하는 입장에서 어떤 에러가 감춰지는 것보다는 명확하게 드러나는 것이 좋음
       - except의 뒤에 raise를 붙여 호출하는 것이 좋음

Airflow 코드의 기본 구조

  • DAG를 대표하는 객체를 먼저 만듦
    • DAG 이름, 실행주기, 실행날짜, 오너 ....
  • 다음으로 DAG를 구성하는 테스크들을 만듦
    • 테스크 별로 적합한 오퍼레이터를 선택
    • 테스크 ID를 부여 후 해야할 작업의 세부사항 지정
  • 최종적으로 테스크들간의 실행 순서를 결정
  • DAG 설정 예
     - 아래의 코드에서 'retry_dealy'는 task level에 적용 (on_failure_callback or on_success_callback)
     - 뒤에서 DAG 객체를 만들 때 지정
from datetime import datetime, timedelta

default_args = {
    'owner': 'ssungcohol',
    'email': ['ssungcohol@maelong.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=3),
}
  • DAG 설정 예2
     - catchup의 의미를 이해하는 것이 중요
from airflow import DAG

dag = DAG(
    "dag_v1", #DAG name
    start_date=datetime(2020,8,7,hour=0,minute=00),
    schedule='0***",
    tags=["exanple"],
    catchup=False,
    # common settings
    default_args=default_args
)

 

728x90

'ssung_데이터 엔지니어링 > 9주차_Airflow' 카테고리의 다른 글

Airflow_(4)  (0) 2023.12.14
Airflow_(3)  (0) 2023.12.13
Airflow_(1)  (0) 2023.12.12