본문 바로가기
ML

postgres db 기반 airflow 배치 작업 FastAPI 만들기

by 김덕배개발자 2025. 1. 15.
728x90

요즘 백엔드 작업을 하면서 자꾸 배치작업 관련 Task 가 주어지면서 아.. 안 할 수 없겠다 느낌이 들었습니다. Java와 다른 아키텍처에서 고통을 받고 있고 현업의 파이썬 코드는 생각보다 날 것 그 자체.. OpenSoruce가 양반이었나 생각이 들 정도로 어지럽습니다. 하여간 Airflow 서버가 안 떠있어서 그냥 나 혼자 DAG 관련 실습 공부 겸 정리해 보았습니다.

airflow

Airflow란 무엇일까요?

Airflow는 복잡한 데이터 파이프라인을 정의하고, 실행하며, 모니터링할 수 있는 강력한 도구입니다. 마치 레고 블록을 쌓듯이, 다양한 작업들을 연결하여 하나의 워크플로우를 만들 수 있죠. 이 워크플로우는 특정 시간에 자동으로 실행되도록 설정할 수 있으며, 작업의 진행 상황을 실시간으로 모니터링할 수 있습니다.

쉽게 말해, Airflow는 배치 작업을 자동화하고 관리하는 데 특화된 도구라고 할 수 있습니다.

왜 Airflow를 사용할까요?

  • 복잡한 워크플로우 시각화: DAG(Directed Acyclic Graph)라는 그래프 형태로 워크플로우를 시각화하여 복잡한 작업 관계를 쉽게 파악할 수 있습니다.
  • 강력한 스케줄링 기능: Crontab과 유사한 방식으로 작업 스케줄을 설정할 수 있으며, 더욱 유연하고 복잡한 스케줄링이 가능합니다.
  • 오픈 소스: 누구나 무료로 사용할 수 있으며, 커뮤니티가 활발하여 다양한 기능과 확장성을 제공합니다.
  • Python 기반: 파이썬으로 작업을 정의하기 때문에, 파이썬 생태계의 다양한 라이브러리를 활용할 수 있습니다.
  • 확장성: 다양한 백엔드(MySQL, PostgreSQL, SQLite 등)와 오 케스트레이터(Kubernetes, Docker 등)를 지원하여 대규모 환경에서도 사용 가능합니다.

 

결국 CI/CD를 구성하기 위해 필수적인 요소라고 할 수 있고, Application을 운영하는 관점에서 업데이트 관리에 용이하다고 할 수 있습니다.

 

간단한 예시

예를 들어, 매일 밤 12시에 특정 데이터베이스에서 데이터를 추출하여 S3에 저장하고, 이 데이터를 기반으로 모델을 학습한 후 결과를 시각화하는 작업을 자동화하고 싶다고 가정해 봅시다. Airflow를 사용하면 이러한 복잡한 워크플로우를 하나의 DAG로 정의하고, 스케줄링하여 자동으로 실행할 수 있습니다.

 

여기서 DAG는 한 워크플로우를 수행하기 위한 파이프라인을 노드와 에지로 표현할 수 있다. LangGraph처럼 하면 된다.. 데이터 흐름 처리를 순환한다고 생각하면 편할 것 같습니다.

 

데이터베이스 설정

psql postgres

CREATE DATABASE airflowdb; #디비 테스트용
\c airflowdb  # 디비 확인

CREATE DATABASE airflow_test;
CREATE USER kdb WITH PASSWORD '1234';
ALTER USER kdb WITH SUPERUSER;
GRANT ALL PRIVILEGES ON DATABASE airflow_test TO kdb;
CREATE DATABASE

 

간단하게 배치작업 메시지를 보내고, SQL 쿼리문을 저장시키는 파이프라인을 만들어볼 생각입니다.

적응의 동물이라 디비버도 이제 점차 익숙해지고 있는 것 같습니다

Airflow 환경 설정 및 설치

pip install apache-airflow
pip install apache-airflow-providers-postgres #posgres 사용을 위한

# Airflow 홈 디렉토리 설정
export AIRFLOW_HOME=~/airflow

# 데이터베이스 초기화
airflow db init

 

의존성문제가 많다고 하니 디버깅을 많이 해야 할 수 있다. airflow.config 파일에 아래와 같이 설정합니다.

[database]
sql_alchemy_conn = postgresql+psycopg2://kdb:1234@localhost:5432/airflow_test

[core]
dags_folder = /Users/kdb/Desktop/airflow/test/dags
load_examples = False

 

터미널에 이제 airflow 계정을 생성해서 웹에서 접속할 수 있도록 해줍니다.(여러 개를 만들어서 다른 사람도 접속을 할 수 있겠지?)

airflow users create \
    --username kdb \
    --firstname kim \
    --lastname deokbae \
    --role Admin \
    --email kim.db@kt.com \
    --password 1234

 

설정이 끝났다 이제 들어가 보면 웹페이지가 뜹니다.

# 웹서버 시작
airflow webserver --port 8080 -D

# 스케줄러 시작
airflow scheduler -D

DAG의 현황관리가 쉽게 표현이 가능하다. DAG.py 파이프라인 파일을 만들고 등록해 주면 됩니다.

 

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta
import pandas as pd
import random
import logging

default_args = {
    'owner': 'kdb',
    'depends_on_past': True,  # 순차적 실행을 위해 True로 설정
    'start_date': datetime(2024, 8, 1),  # 23/24 시즌 시작
    'end_date': datetime(2025, 5, 19),   # 시즌 종료
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def get_postgres_conn():
    pg_hook = PostgresHook(postgres_conn_id='postgres_custom')
    return pg_hook.get_conn()

def create_epl_tables():
    conn = get_postgres_conn()
    cur = conn.cursor()
    
    # EPL 팀 테이블
    cur.execute("""
        CREATE TABLE IF NOT EXISTS epl_teams (
            team_id SERIAL PRIMARY KEY,
            team_name VARCHAR(100),
            stadium VARCHAR(100),
            manager VARCHAR(100)
        )
    """)
    
    # EPL 경기 테이블
    cur.execute("""
        CREATE TABLE IF NOT EXISTS epl_matches (
            match_id SERIAL PRIMARY KEY,
            matchday INTEGER,
            match_date DATE,
            home_team_id INTEGER REFERENCES epl_teams(team_id),
            away_team_id INTEGER REFERENCES epl_teams(team_id),
            home_score INTEGER,
            away_score INTEGER,
            status VARCHAR(20),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    
    # EPL 순위표
    cur.execute("""
        CREATE TABLE IF NOT EXISTS epl_standings (
            standing_id SERIAL PRIMARY KEY,
            team_id INTEGER REFERENCES epl_teams(team_id),
            matchday INTEGER,
            played INTEGER,
            won INTEGER,
            drawn INTEGER,
            lost INTEGER,
            goals_for INTEGER,
            goals_against INTEGER,
            goal_difference INTEGER,
            points INTEGER,
            position INTEGER,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    
    conn.commit()
    cur.close()
    conn.close()
    logging.info("EPL 테이블이 생성되었습니다.")

def initialize_epl_teams():
    teams = [
        ("Manchester City", "Etihad Stadium", "Pep Guardiola"),
        ("Arsenal", "Emirates Stadium", "Mikel Arteta"),
        ("Manchester United", "Old Trafford", "Erik ten Hag"),
        ("Liverpool", "Anfield", "Jurgen Klopp"),
        ("Newcastle", "St James' Park", "Eddie Howe"),
        ("Brighton", "Amex Stadium", "Roberto De Zerbi"),
        ("Aston Villa", "Villa Park", "Unai Emery"),
        ("Tottenham", "Tottenham Hotspur Stadium", "Ange Postecoglou"),
        ("Chelsea", "Stamford Bridge", "Mauricio Pochettino"),
        ("Crystal Palace", "Selhurst Park", "Roy Hodgson"),
        ("West Ham", "London Stadium", "David Moyes"),
        ("Fulham", "Craven Cottage", "Marco Silva"),
        ("Brentford", "Gtech Community Stadium", "Thomas Frank"),
        ("Nottingham Forest", "City Ground", "Steve Cooper"),
        ("Everton", "Goodison Park", "Sean Dyche"),
        ("Bournemouth", "Vitality Stadium", "Andoni Iraola"),
        ("Wolves", "Molineux", "Gary O'Neil"),
        ("Sheffield United", "Bramall Lane", "Paul Heckingbottom"),
        ("Luton Town", "Kenilworth Road", "Rob Edwards"),
        ("Burnley", "Turf Moor", "Vincent Kompany")
    ]
    
    conn = get_postgres_conn()
    cur = conn.cursor()
    
    for team_name, stadium, manager in teams:
        cur.execute(
            """
            INSERT INTO epl_teams (team_name, stadium, manager) 
            VALUES (%s, %s, %s) ON CONFLICT DO NOTHING
            """,
            (team_name, stadium, manager)
        )
    
    conn.commit()
    cur.close()
    conn.close()
    logging.info("EPL 팀 데이터가 초기화되었습니다.")

def generate_matchday_fixtures():
    conn = get_postgres_conn()
    cur = conn.cursor()
    
    execution_date = context['execution_date'].date()
    matchday = ((execution_date - datetime(2023, 8, 1).date()).days // 7) + 1
    
    if matchday > 38:  # EPL 시즌 종료
        return
    
    # 해당 매치데이의 경기가 이미 있는지 확인
    cur.execute("SELECT COUNT(*) FROM epl_matches WHERE matchday = %s", (matchday,))
    if cur.fetchone()[0] > 0:
        logging.info(f"매치데이 {matchday}의 경기가 이미 존재합니다.")
        return
    
    # 팀 목록 가져오기
    cur.execute("SELECT team_id FROM epl_teams")
    team_ids = [r[0] for r in cur.fetchall()]
    
    # 매치데이별 경기 생성 (각 팀이 한 번씩만 경기하도록)
    random.shuffle(team_ids)
    matches = []
    for i in range(0, len(team_ids), 2):
        home_score = random.randint(0, 4)
        away_score = random.randint(0, 4)
        
        cur.execute(
            """
            INSERT INTO epl_matches (matchday, match_date, home_team_id, away_team_id, 
                                   home_score, away_score, status)
            VALUES (%s, %s, %s, %s, %s, %s, 'COMPLETED')
            """,
            (matchday, execution_date, team_ids[i], team_ids[i+1], 
             home_score, away_score)
        )
    
    conn.commit()
    cur.close()
    conn.close()
    logging.info(f"매치데이 {matchday}의 경기가 생성되었습니다.")

def update_standings():
    conn = get_postgres_conn()
    cur = conn.cursor()
    
    execution_date = context['execution_date'].date()
    matchday = ((execution_date - datetime(2023, 8, 1).date()).days // 7) + 1
    
    if matchday > 38:  # EPL 시즌 종료
        return
    
    # 모든 팀의 현재 매치데이 성적 계산
    cur.execute("""
        WITH match_results AS (
            SELECT 
                team_id,
                COUNT(*) as played,
                SUM(CASE WHEN (team_id = home_team_id AND home_score > away_score) OR 
                           (team_id = away_team_id AND away_score > home_score) THEN 1 ELSE 0 END) as won,
                SUM(CASE WHEN (team_id = home_team_id AND home_score = away_score) OR 
                           (team_id = away_team_id AND away_score = home_score) THEN 1 ELSE 0 END) as drawn,
                SUM(CASE WHEN (team_id = home_team_id AND home_score < away_score) OR 
                           (team_id = away_team_id AND away_score < home_score) THEN 1 ELSE 0 END) as lost,
                SUM(CASE WHEN team_id = home_team_id THEN home_score 
                        ELSE away_score END) as goals_for,
                SUM(CASE WHEN team_id = home_team_id THEN away_score 
                        ELSE home_score END) as goals_against
            FROM (
                SELECT home_team_id as team_id, home_score, away_score, home_team_id, away_team_id 
                FROM epl_matches WHERE matchday <= %s
                UNION ALL
                SELECT away_team_id as team_id, away_score, home_score, home_team_id, away_team_id 
                FROM epl_matches WHERE matchday <= %s
            ) all_matches
            GROUP BY team_id
        )
        SELECT 
            team_id,
            played,
            won,
            drawn,
            lost,
            goals_for,
            goals_against,
            goals_for - goals_against as goal_difference,
            won * 3 + drawn as points
        FROM match_results
        ORDER BY points DESC, goal_difference DESC, goals_for DESC
    """, (matchday, matchday))
    
    standings = cur.fetchall()
    
    # 순위 업데이트
    for position, row in enumerate(standings, 1):
        team_id, played, won, drawn, lost, goals_for, goals_against, goal_difference, points = row
        
        cur.execute("""
            INSERT INTO epl_standings 
            (team_id, matchday, played, won, drawn, lost, goals_for, goals_against, 
             goal_difference, points, position)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (team_id, matchday) DO UPDATE SET
                played = EXCLUDED.played,
                won = EXCLUDED.won,
                drawn = EXCLUDED.drawn,
                lost = EXCLUDED.lost,
                goals_for = EXCLUDED.goals_for,
                goals_against = EXCLUDED.goals_against,
                goal_difference = EXCLUDED.goal_difference,
                points = EXCLUDED.points,
                position = EXCLUDED.position,
                updated_at = CURRENT_TIMESTAMP
        """, (team_id, matchday, played, won, drawn, lost, goals_for, goals_against, 
              goal_difference, points, position))
    
    conn.commit()
    cur.close()
    conn.close()
    logging.info(f"매치데이 {matchday}의 순위가 업데이트되었습니다.")

with DAG(
    'epl_season_2425_dag',
    default_args=default_args,
    description='2023/24 EPL 시즌 시뮬레이션',
    schedule_interval=timedelta(days=7),  # 매주 실행
    catchup=True  # 과거 날짜부터 실행
) as dag:

    create_tables_task = PythonOperator(
        task_id='create_epl_tables',
        python_callable=create_epl_tables,
    )

    initialize_teams_task = PythonOperator(
        task_id='initialize_epl_teams',
        python_callable=initialize_epl_teams,
    )

    generate_fixtures_task = PythonOperator(
        task_id='generate_matchday_fixtures',
        python_callable=generate_matchday_fixtures,
    )

    update_standings_task = PythonOperator(
        task_id='update_standings',
        python_callable=update_standings,
    )

    # 태스크 순서 정의
    create_tables_task >> initialize_teams_task >> generate_fixtures_task >> update_standings_task

 

저는 배치를 받아올게 뭐가 있을까 생각하다가 축구 관련된 epl2425_realtime_dag를 만들어줍니다.

 

실시간 DAG 실행하기

이제 설정한 DAG를 실행해 볼 차례입니다. Airflow UI에서 DAG을 활성화하고 실행하면 되는데, 여기서 몇 가지 주의할 점이 있습니다.

  1. PostgreSQL 연결 설정 우선 Airflow UI의 Admin -> Connections 메뉴에서 PostgreSQL 연결을 설정해야 합니다. 아래 정보로 새로운 Connection을 추가합니다:

 

Conn Id: postgres_custom
Conn Type: Postgres
Host: localhost
Schema: airflow_test
Login: kdb
Password: 1234
Port: 5432

 

DAG 활성화 웹 UI에서 DAG을 활성화하면, 설정한 스케줄에 따라 자동으로 실행됩니다. 하지만 처음에는 수동으로 실행해 보는 것이 좋습니다. 실행 결과 확인 실행이 완료되면 PostgreSQL에서 다음 쿼리로 결과를 확인할 수 있습니다.

 

 

위처럼 실행하니 생성이 되고, 천천히 스케줄링에 맞게 배치가 도는 것 같습니다

 

 

테이블이 생성되었다 보다시피 이건 예제라 대충 했지만 정확한 파이프라인에 규격에 맞춰서 잘 만들어야겠다는 생각이 들었습니다. 하나라도 컷 나면 안 돌기 때문입니다. 사실 airflow 내부에서 로깅이 가능하긴 하지만 첫 코드부터 잘 짜는 게 맞는 것 같습니다. 나머지는 다음 포스팅에서 다른 서비스인 FAST API 와 연동을 하면서 더 깊숙하게 적어보겠습니다.

 

 

728x90

'ML' 카테고리의 다른 글

Entropy 는 무엇인가?  (0) 2024.03.06
(머신러닝) Soft Max 기법  (0) 2022.11.09
(머신러닝) 로지스틱 리그레션 정리  (0) 2022.11.09
머신러닝의 정의 와 개념 및 원리 성능 향상  (0) 2022.11.09
데이터분석한다?  (0) 2022.11.07