트랜잭션
- Atomic하게 실행되어야 하는 SQL들을 묶어서 하나의 작업처럼 처리하는 방법
- BEGIN과 END (COMMIT) 사이에 해당하는 SQL들을 사용
- ROLLBACK은 BEGIN 이전의 상태로 돌아가라는 SQL 명령어
트랜잭션 구현
- 두 가지 종류의 트랜잭션이 존재
- 레코드 변경/삭제/추가를 바로 반영하는지의 여부는 autocommit이라는 파라미터로 조절가능
- autocommit = True
- 기본적으로 모든 SQL statement가 바로 물리 테이블에 커밋
- 이를 바꾸고 싶다면 BEGIN;END (COMMIT)을 사용 (or ROLLBACK)
- autocommit = False
- 기본적으로 모든 SQL statement가 커밋되지 않음. 즉, 모두 스테이징 상태로 존재
- 커넥션 객체의 .commit()과 .rollback()함수로 커밋할지 말지 결정
- Python의 경우 try / catch와 같이 사용하는 것이 일반적
- 에러 발생 시 rollback을 명시적으로 실행. 정상 = commit 실행
- except에서 raise를 호출하면 발생한 원래 exception이 위로 전파
- ETL을 관리하는 입장에서 어떤 에러가 감춰지는 것보다는 명확하게 드러나는 것이 좋음
- except의 뒤에 raise를 붙여 호출하는 것이 좋음
Airflow 코드의 기본 구조
- DAG를 대표하는 객체를 먼저 만듦
- DAG 이름, 실행주기, 실행날짜, 오너 ....
- 다음으로 DAG를 구성하는 테스크들을 만듦
- 테스크 별로 적합한 오퍼레이터를 선택
- 테스크 ID를 부여 후 해야할 작업의 세부사항 지정
- 최종적으로 테스크들간의 실행 순서를 결정
- DAG 설정 예
- 아래의 코드에서 'retry_dealy'는 task level에 적용 (on_failure_callback or on_success_callback)
- 뒤에서 DAG 객체를 만들 때 지정
from datetime import datetime, timedelta
default_args = {
'owner': 'ssungcohol',
'email': ['ssungcohol@maelong.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
- DAG 설정 예2
- catchup의 의미를 이해하는 것이 중요
from airflow import DAG
dag = DAG(
"dag_v1", #DAG name
start_date=datetime(2020,8,7,hour=0,minute=00),
schedule='0***",
tags=["exanple"],
catchup=False,
# common settings
default_args=default_args
)
728x90
'ssung_데이터 엔지니어링 > 9주차_Airflow' 카테고리의 다른 글
Airflow_(4) (0) | 2023.12.14 |
---|---|
Airflow_(3) (0) | 2023.12.13 |
Airflow_(1) (0) | 2023.12.12 |