ssung_데이터 엔지니어링/12주차_3차 프로젝트

3차 프로젝트_(3)

ssungcohol 2024. 1. 11. 18:23

DB (Mysql)연동하여 Slack 메시지로 보내주기

 

1. Test를 위한 Mysql workbench를 사용해 Local 환경 구성해주기

 

2. Airflow에서 Admin 창에 들어가 Connection 설정 해주기

  • Connection Id - 설정하고자 하는 이름 입력
  • Connection Type - Mysql 선택
  • Host - 해당 PC의 IP 주소 입력 (cmd 창 -> ipconfig 입력 -> IPv4 주소 입력)
  • Login - Local mysql 아이디 입력
  • Password - 설정 비밀번호 입력
  • Port - 3306
  • 위의 정보들을 입력 후 Test 버튼을 눌러 연결 확인 후 Save

3. Slack DM을 보내고자 하는 Slack channel 생성

4. api.slack.com 접속 -> apps 클릭 (우상단) -> create apps -> scratch (초기 생성) 선택 -> incomming webhook 생성

5. cmd 환경에서 test 진행 (아래 이미지 그대로 입력)

6.  test 성공 시 해당 메시지가 생성한 channel에 전송됨

 

7. Airflow Admin에서 Connection에 들어가 Slack connection 연결

 

8. 이후 작성하고자 하는 DAG에 들어가 코드 작성 해주기

 

from datetime import datetime, timedelta
from email.policy import default
from textwrap import dedent
import pymysql

from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

sql_read_data = """
    SELECT jobinfo_test.table_1.company, position, location, review
    FROM jobinfo_test.table_1
    JOIN jobinfo_test.table_2
    ON jobinfo_test.table_1.company = jobinfo_test.table_2.company;
"""

def execute_sql_and_return_result(**kwargs):
    
    connection = pymysql.connect(
        host='XXX.XXX.XXX.XXX',
        user='ssungcohol',
        password='XXXXXXXXXXXXX',
        database='jobinfo_test'
    )

    try:
        # 커서 생성
        with connection.cursor() as cursor:
            # SQL 쿼리 실행
            sql_query = """
                SELECT jobinfo_test.table_1.company, position, location, review
                FROM jobinfo_test.table_1
                JOIN jobinfo_test.table_2
                ON jobinfo_test.table_1.company = jobinfo_test.table_2.company;
            """
            cursor.execute(sql_query)

            # 결과 가져오기
            result = cursor.fetchall()
    finally:
        # 연결 닫기
        connection.close()

    return result

with DAG(
    'slack_test2',
    default_args=default_args,
    schedule_interval='@once',
    start_date=datetime(2024, 1, 1),
    tags=['mysql', 'local', 'test', 'company']
) as dag:
    t1 = MySqlOperator(
        task_id="create_employees_table",
        mysql_conn_id="mysql_local_test",
        sql=sql_read_data,
        dag=dag
    )

    t2 = PythonOperator(
        task_id='execute_sql_and_return_result',
        python_callable=execute_sql_and_return_result,
        provide_context=True,
        dag=dag
    )

    t3 = SlackWebhookOperator(
        task_id='send_slack',
        http_conn_id='slack_conn',
        message='오늘의 채용 공고 + {{ task_instance.xcom_pull(task_ids="execute_sql_and_return_result") }}',
        dag=dag
    )

    t1 >> t2 >> t3
728x90

'ssung_데이터 엔지니어링 > 12주차_3차 프로젝트' 카테고리의 다른 글

3차 프로젝트_(5)  (0) 2024.01.13
3차 프로젝트_(4)  (0) 2024.01.11
3차 프로젝트_(2)  (0) 2024.01.10
3차 프로젝트_(1)  (0) 2024.01.08