Apache Airflow 기초 개념 정리 - 초보자를 위한 가이드

2024년 12월 10일

data-platform

# Airflow# Data Engineering# Workflow# Data Pipeline

들어가며

데이터 파이프라인을 운영하다 보면 복잡한 작업들의 의존성 관리, 스케줄링, 모니터링, 실패 처리 등이 필요합니다. Apache Airflow는 이러한 요구사항을 해결하기 위한 워크플로우 오케스트레이션 플랫폼으로, Airbnb에서 개발되어 현재 Apache 재단의 최상위 프로젝트로 관리되고 있습니다. 이 글에서는 Airflow의 핵심 개념과 아키텍처, 그리고 실전에서 알아야 할 내용들을 다룹니다.

Apache Airflow란?

Apache Airflow는 프로그래매틱하게 워크플로우를 작성, 스케줄링, 모니터링할 수 있는 플랫폼입니다. “Configuration as Code” 철학을 따르며, Python 코드로 워크플로우를 정의합니다.

왜 Airflow인가?

전통적인 cron 기반 스케줄링의 한계:

  • 의존성 관리 부재: 작업 간 의존성을 명시적으로 표현할 수 없음
  • 중앙화된 모니터링 부족: 각 작업의 실행 상태를 한눈에 파악하기 어려움
  • 재시도 로직의 부재: 실패 시 재시도 정책을 일관되게 적용하기 어려움
  • 백프레셔(Backpressure) 처리 불가: 작업이 밀렸을 때 대응 방법이 없음
  • 동적 파이프라인 생성 불가: 런타임에 워크플로우를 동적으로 생성할 수 없음

Airflow는 이러한 문제들을 해결하며, 다음과 같은 특징을 제공합니다:

  • 동적 파이프라인 생성: Python 코드로 동적으로 DAG를 생성할 수 있음
  • 확장성: Task를 분산 처리할 수 있는 다양한 Executor 지원
  • 풍부한 UI: DAG의 시각화, 로그 확인, 재실행 등을 웹 UI에서 수행
  • 플러그인 아키텍처: 커스텀 Operator, Hook, Sensor 등을 쉽게 추가 가능

Airflow 아키텍처

Airflow는 여러 컴포넌트가 유기적으로 동작하는 분산 시스템입니다.

DAG Files

Scheduler

Executor

Workers

Webserver

MetaDB

핵심 컴포넌트

1. Scheduler

Scheduler는 Airflow의 심장부로, 다음과 같은 역할을 수행합니다:

  • DAG 파일을 주기적으로 파싱하여 변경사항 감지
  • 스케줄에 따라 실행할 Task 결정
  • Task를 Executor의 큐에 제출

동작 방식:

ExecutorMetaDBSchedulerDAG FilesExecutorMetaDBSchedulerDAG Filesloop[Every N sec]loop[Continuously]Scan directoryParse DAGsStore metadataQuery tasksReturn tasksSubmit to queue

스케줄러는 기본적으로 단일 프로세스로 실행되지만, Airflow 2.0부터는 HA(High Availability) 구성도 가능합니다.

2. Executor

Executor는 Task를 실제로 실행하는 메커니즘을 정의합니다.

주요 Executor 타입:

SequentialExecutor

  • 단일 프로세스에서 순차적으로 Task를 실행
  • SQLite를 메타데이터 DB로 사용할 때의 기본 Executor
  • 프로덕션 환경에는 부적합 (개발/테스트 용도)

LocalExecutor

  • 같은 머신 내에서 멀티프로세싱을 통해 병렬 실행
  • parallelism 설정으로 동시 실행 Task 수 제한
  • 소규모 프로덕션 환경에 적합
# airflow.cfg
[core]
executor = LocalExecutor
parallelism = 32  # 전체 병렬 실행 가능 Task 수
dag_concurrency = 16  # 하나의 DAG에서 동시 실행 가능 Task 수

CeleryExecutor

  • Celery 기반 분산 실행
  • 여러 Worker 노드에 Task를 분산
  • 메시지 브로커(Redis, RabbitMQ)를 통한 Task 큐잉
  • 대규모 프로덕션 환경에 적합

Scheduler

Message Broker

Worker 1

Worker 2

Worker 3

MetaDB

KubernetesExecutor

  • Kubernetes Pod로 각 Task를 실행
  • Task마다 독립적인 환경과 리소스 할당 가능
  • Auto-scaling 지원
  • 리소스 격리 및 보안 측면에서 우수

CeleryKubernetesExecutor (Airflow 2.0+)

  • CeleryExecutor와 KubernetesExecutor를 혼용
  • Task 레벨에서 Executor를 선택할 수 있음
# Task 별로 다른 Executor 지정
task1 = BashOperator(
    task_id='task1',
    bash_command='echo "Celery"',
    executor_config={},  # CeleryExecutor 사용
)

task2 = BashOperator(
    task_id='task2',
    bash_command='echo "Kubernetes"',
    executor_config={
        'KubernetesExecutor': {
            'request_memory': '1Gi',
            'limit_memory': '2Gi',
        }
    },
)

3. Webserver

Flask 기반의 웹 애플리케이션으로, 다음 기능을 제공합니다:

  • DAG 시각화 (Graph View, Tree View, Gantt Chart 등)
  • Task 로그 조회
  • DAG 실행 트리거 및 중단
  • Variable, Connection 관리
  • 사용자 인증 및 권한 관리 (RBAC)

4. Metadata Database

PostgreSQL, MySQL 등을 사용하며, 다음 정보를 저장합니다:

  • DAG 정의 및 메타데이터
  • Task 실행 이력
  • 변수(Variables), 연결(Connections)
  • 사용자 및 권한 정보

주요 테이블:

  • dag: DAG 정보
  • dag_run: DAG 실행 인스턴스
  • task_instance: Task 실행 인스턴스
  • xcom: Task 간 데이터 공유를 위한 테이블

핵심 개념

DAG (Directed Acyclic Graph)

DAG는 Airflow에서 워크플로우를 정의하는 핵심 개념입니다.

주요 특성:

  • Directed: Task 간의 의존성에 방향이 있음
  • Acyclic: 순환 의존성이 없음 (A → B → C → A 불가)
  • Graph: Task들이 노드로, 의존성이 엣지로 표현됨

Extract

Transform

Load

Notify

위 다이어그램은 간단한 ETL 파이프라인 DAG를 나타냅니다. 각 노드는 Task이고, 화살표는 의존성을 의미합니다.

DAG 정의 방식

1. Context Manager 방식

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,  # 이전 실행의 성공 여부에 의존하지 않음
    'start_date': datetime(2024, 1, 1),
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,  # 지수 백오프
    'max_retry_delay': timedelta(minutes=30),
}

with DAG(
    dag_id='data_pipeline',
    default_args=default_args,
    description='Daily data processing pipeline',
    schedule_interval='0 2 * * *',  # 매일 02:00
    catchup=False,  # 과거 날짜 자동 실행 비활성화
    max_active_runs=1,  # 동시 실행 가능한 DAG 인스턴스 수
    tags=['production', 'daily'],
) as dag:
    # Task 정의
    pass

2. DAG Decorator 방식 (Airflow 2.0+)

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def my_dag():
    @task
    def extract():
        return {'data': [1, 2, 3]}

    @task
    def transform(data):
        return {'transformed': [x * 2 for x in data['data']]}

    @task
    def load(data):
        print(f"Loading {data}")

    # Task 의존성 정의
    load(transform(extract()))

dag_instance = my_dag()

중요 파라미터

schedule_interval

DAG의 실행 주기를 정의합니다.

# Cron 표현식
schedule_interval='0 2 * * *'  # 매일 02:00

# Preset
schedule_interval='@daily'     # 0 0 * * *
schedule_interval='@hourly'    # 0 * * * *
schedule_interval='@weekly'    # 0 0 * * 0
schedule_interval='@monthly'   # 0 0 1 * *
schedule_interval='@yearly'    # 0 0 1 1 *

# timedelta
schedule_interval=timedelta(hours=3)  # 3시간마다

# None (수동 실행만)
schedule_interval=None

start_date와 execution_date의 이해

Airflow의 스케줄링은 직관적이지 않을 수 있습니다:

start_date = datetime(2024, 1, 1, 2, 0)  # 2024-01-01 02:00
schedule_interval = '@daily'

# 실제 실행 시점:
# execution_date: 2024-01-01 02:00 → 실행 시각: 2024-01-02 02:00
# execution_date: 2024-01-02 02:00 → 실행 시각: 2024-01-03 02:00

2024-01-012024-01-012024-01-012024-01-012024-01-022024-01-022024-01-022024-01-022024-01-032024-01-032024-01-032024-01-032024-01-04execution_date실제 실행 시점execution_date실제 실행 시점DAG RunsAirflow 스케줄링 타임라인

execution_date는 “논리적 날짜”로, 해당 기간의 데이터를 처리한다는 의미입니다. 실제 실행은 다음 인터벌이 시작될 때 발생합니다.

catchup

과거 날짜에 대한 자동 실행 여부를 결정합니다.

# catchup=True인 경우
start_date = datetime(2024, 1, 1)
현재 날짜 = 2024, 1, 10
# → 2024-01-01부터 2024-01-09까지의 DAG Run이 모두 생성되어 실행됨

# catchup=False인 경우
# → 가장 최근 인터벌만 실행됨

Task와 Operator

Operator

Operator는 단일 Task가 수행할 작업의 템플릿입니다.

주요 Operator 타입

1. BashOperator

from airflow.operators.bash import BashOperator

run_script = BashOperator(
    task_id='run_etl_script',
    bash_command='python /scripts/etl.py',
    env={'ENV': 'production'},  # 환경 변수
    append_env=True,  # 기존 환경 변수 유지
    cwd='/tmp',  # 작업 디렉토리
)

2. PythonOperator

from airflow.operators.python import PythonOperator

def process_data(**context):
    # context에서 실행 정보 접근
    execution_date = context['execution_date']
    ti = context['ti']  # TaskInstance

    # XCom으로 이전 Task의 결과 가져오기
    data = ti.xcom_pull(task_ids='extract_data')

    result = {'processed': len(data)}
    return result  # 자동으로 XCom에 push됨

process = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    provide_context=True,  # context 전달
)

3. PythonVirtualenvOperator 독립적인 Python 가상 환경에서 실행:

from airflow.operators.python import PythonVirtualenvOperator

def callable_with_special_deps():
    import pandas as pd  # Worker에 설치되지 않은 패키지도 사용 가능
    # ...

venv_task = PythonVirtualenvOperator(
    task_id='venv_task',
    python_callable=callable_with_special_deps,
    requirements=['pandas==1.5.0', 'numpy==1.23.0'],
    system_site_packages=False,
)

4. Sensor 특정 조건이 만족될 때까지 대기하는 특수한 Operator:

from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor

# 파일 존재 여부 확인
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    poke_interval=30,  # 30초마다 확인
    timeout=600,  # 10분 타임아웃
    mode='poke',  # 'poke' 또는 'reschedule'
)

# 다른 DAG의 Task 완료 대기
wait_for_upstream_dag = ExternalTaskSensor(
    task_id='wait_for_upstream',
    external_dag_id='upstream_dag',
    external_task_id='final_task',
    timeout=3600,
)

Sensor 모드:

  • poke 모드: Worker 슬롯을 점유한 채로 대기 (빠른 응답이 필요한 경우)
  • reschedule 모드: Worker 슬롯을 반환하고 다시 스케줄링 (리소스 효율적)

Task 의존성 정의

# 기본 체이닝
task1 >> task2 >> task3

# 팬아웃 (Fan-out)
task1 >> [task2, task3, task4]

# 팬인 (Fan-in)
[task1, task2, task3] >> task4

# 복잡한 의존성
task1 >> task2
task1 >> task3
[task2, task3] >> task4
task4 >> [task5, task6]
[task5, task6] >> task7

Task 1

Task 2

Task 3

Task 4

Task 5

Task 6

Task 7

조건부 의존성 (Branching):

from airflow.operators.python import BranchPythonOperator

def choose_branch(**context):
    execution_date = context['execution_date']
    if execution_date.weekday() < 5:  # 평일
        return 'weekday_task'
    else:
        return 'weekend_task'

branch = BranchPythonOperator(
    task_id='branch',
    python_callable=choose_branch,
)

weekday_task = BashOperator(task_id='weekday_task', bash_command='echo weekday')
weekend_task = BashOperator(task_id='weekend_task', bash_command='echo weekend')

branch >> [weekday_task, weekend_task]

평일주말

Start

Branch

Weekday

Weekend

End

TaskGroup (Airflow 2.0+)

관련된 Task들을 논리적으로 그룹화:

from airflow.utils.task_group import TaskGroup

with TaskGroup('data_processing') as processing_group:
    extract = BashOperator(task_id='extract', ...)
    transform = BashOperator(task_id='transform', ...)
    load = BashOperator(task_id='load', ...)

    extract >> transform >> load

start >> processing_group >> end

Processing

Extract

Transform

Load

Start

End

XCom (Cross-Communication)

Task 간 데이터를 공유하는 메커니즘입니다.

Task2XComTask1Task2XComTask1{'count': 100}ProcessPush dataPull data{'count': 100}

사용 방법

# Push
def push_function(**context):
    context['ti'].xcom_push(key='my_key', value={'data': [1, 2, 3]})
    # 또는 return으로 자동 push
    return {'data': [1, 2, 3]}  # 자동으로 'return_value' key로 저장

# Pull
def pull_function(**context):
    ti = context['ti']
    data = ti.xcom_pull(task_ids='push_task', key='my_key')
    # key를 지정하지 않으면 'return_value' 가져옴
    data = ti.xcom_pull(task_ids='push_task')

XCom의 한계와 주의사항

  1. 크기 제한: XCom은 메타데이터 DB에 저장되므로 큰 데이터에는 부적합
  2. 직렬화: pickle로 직렬화되므로 복잡한 객체는 문제가 될 수 있음
  3. 권장 사항: 큰 데이터는 S3, GCS 등 외부 스토리지에 저장하고 경로만 XCom으로 전달
# 나쁜 예: 큰 DataFrame을 XCom으로 전달
def bad_practice():
    df = pd.read_csv('large_file.csv')  # 100MB
    return df  # XCom에 저장 시도 → 문제 발생

# 좋은 예: 경로만 전달
def good_practice():
    df = pd.read_csv('large_file.csv')
    s3_path = upload_to_s3(df)
    return s3_path  # 경로만 XCom에 저장

Variables와 Connections

Variables

전역 변수를 저장하고 DAG에서 사용:

from airflow.models import Variable

# 설정 (UI 또는 CLI)
# airflow variables set my_var my_value

# 가져오기
my_var = Variable.get('my_var')
my_json_var = Variable.get('my_json_var', deserialize_json=True)

# 기본값 지정
my_var = Variable.get('my_var', default_var='default_value')

Connections

외부 시스템 연결 정보를 저장:

from airflow.hooks.base import BaseHook

# Connection 가져오기
conn = BaseHook.get_connection('my_postgres_conn')
host = conn.host
port = conn.port
login = conn.login
password = conn.password

# Hook 사용 (권장)
from airflow.providers.postgres.hooks.postgres import PostgresHook

pg_hook = PostgresHook(postgres_conn_id='my_postgres_conn')
records = pg_hook.get_records("SELECT * FROM users")

동적 DAG 생성

Python의 동적 특성을 활용한 DAG 생성:

# 설정 기반 DAG 생성
configs = [
    {'name': 'pipeline_a', 'schedule': '@daily', 'table': 'table_a'},
    {'name': 'pipeline_b', 'schedule': '@hourly', 'table': 'table_b'},
]

for config in configs:
    with DAG(
        dag_id=f"etl_{config['name']}",
        schedule_interval=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
    ) as dag:
        extract = BashOperator(
            task_id='extract',
            bash_command=f"echo 'Extracting from {config['table']}'",
        )

        load = BashOperator(
            task_id='load',
            bash_command=f"echo 'Loading to {config['table']}'",
        )

        extract >> load

        # DAG를 globals()에 등록 (중요!)
        globals()[f"etl_{config['name']}"] = dag

실전 베스트 프랙티스

1. Idempotency (멱등성)

Task는 여러 번 실행되어도 같은 결과를 내야 합니다:

# 나쁜 예: 멱등하지 않음
INSERT INTO table VALUES (1, 'data');

# 좋은 예: 멱등함
INSERT INTO table VALUES (1, 'data')
ON CONFLICT (id) DO UPDATE SET value = 'data';

# 또는 날짜 기반 파티셔닝
DELETE FROM table WHERE date = '{{ ds }}';
INSERT INTO table SELECT * FROM source WHERE date = '{{ ds }}';

2. Templating

Jinja 템플릿을 활용한 동적 파라미터 전달:

# 사용 가능한 템플릿 변수
run_query = BashOperator(
    task_id='run_query',
    bash_command="""
        echo "Execution Date: {{ ds }}"
        echo "Execution Date (no dashes): {{ ds_nodash }}"
        echo "Previous Execution Date: {{ prev_ds }}"
        echo "Tomorrow: {{ tomorrow_ds }}"
        echo "DAG ID: {{ dag.dag_id }}"
        echo "Task ID: {{ task.task_id }}"
        echo "Run ID: {{ run_id }}"
    """,
)

# 커스텀 매크로
def custom_macro(ds):
    return f"processed_{ds}"

dag.user_defined_macros = {
    'custom_macro': custom_macro
}

task = BashOperator(
    task_id='task',
    bash_command='echo {{ custom_macro(ds) }}',
)

3. Connection Pool 설정

데이터베이스 연결 최적화:

# airflow.cfg
[core]
sql_alchemy_pool_size = 5  # Connection pool 크기
sql_alchemy_pool_recycle = 3600  # 1시간마다 연결 재생성
sql_alchemy_max_overflow = 10  # 추가로 생성 가능한 연결 수

4. Task 실행 시간 제한

task = BashOperator(
    task_id='task',
    bash_command='long_running_script.sh',
    execution_timeout=timedelta(hours=2),  # 2시간 제한
)

5. SLA (Service Level Agreement)

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    # SLA 위반 시 호출
    print(f"SLA missed for {task_list}")

with DAG(
    'my_dag',
    default_args={'sla': timedelta(hours=2)},  # 2시간 내 완료 기대
    sla_miss_callback=sla_miss_callback,
) as dag:
    task = BashOperator(...)

6. Pool을 통한 리소스 제어

동시 실행 Task 수를 제한:

# UI 또는 CLI로 Pool 생성
# airflow pools set database_pool 5 "Database connection pool"

task = BashOperator(
    task_id='db_task',
    bash_command='...',
    pool='database_pool',  # Pool 지정
    priority_weight=10,  # 우선순위 (높을수록 먼저 실행)
)

실전 예제: 복잡한 ETL 파이프라인

실제 프로덕션에서 사용할 만한 복잡한 파이프라인을 구현해봅시다:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['alerts@company.com'],
}

with DAG(
    'complex_etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['production', 'etl'],
) as dag:

    # 파일 도착 대기
    wait_for_file = FileSensor(
        task_id='wait_for_source_file',
        filepath='/data/input/{{ ds }}.csv',
        poke_interval=300,  # 5분마다 체크
        timeout=3600,  # 1시간 타임아웃
        mode='reschedule',
    )

    # 데이터 검증
    def validate_data(**context):
        import pandas as pd
        ds = context['ds']
        df = pd.read_csv(f'/data/input/{ds}.csv')

        if len(df) == 0:
            raise ValueError("Empty dataset")
        if df.isnull().sum().sum() > len(df) * 0.1:
            raise ValueError("Too many null values")

        return {'row_count': len(df), 'file_path': f'/data/input/{ds}.csv'}

    validate = PythonOperator(
        task_id='validate_data',
        python_callable=validate_data,
    )

    # 데이터 크기에 따라 분기
    def choose_processing_method(**context):
        ti = context['ti']
        result = ti.xcom_pull(task_ids='validate_data')
        row_count = result['row_count']

        if row_count > 1000000:
            return 'process_large_dataset'
        else:
            return 'process_small_dataset'

    branch = BranchPythonOperator(
        task_id='branch_by_size',
        python_callable=choose_processing_method,
    )

    # 작은 데이터셋 처리
    process_small = BashOperator(
        task_id='process_small_dataset',
        bash_command='python /scripts/process_small.py {{ ds }}',
    )

    # 큰 데이터셋 처리 (Spark)
    process_large = BashOperator(
        task_id='process_large_dataset',
        bash_command='spark-submit /scripts/process_large.py {{ ds }}',
        executor_config={
            'KubernetesExecutor': {
                'request_memory': '4Gi',
                'request_cpu': '2',
            }
        },
    )

    # 데이터 품질 체크 그룹
    with TaskGroup('quality_checks') as quality_group:
        check_nulls = PythonOperator(
            task_id='check_nulls',
            python_callable=lambda: print("Checking nulls..."),
        )

        check_duplicates = PythonOperator(
            task_id='check_duplicates',
            python_callable=lambda: print("Checking duplicates..."),
        )

        check_schema = PythonOperator(
            task_id='check_schema',
            python_callable=lambda: print("Checking schema..."),
        )

        [check_nulls, check_duplicates, check_schema]

    # 데이터 로드
    load_to_warehouse = BashOperator(
        task_id='load_to_warehouse',
        bash_command='python /scripts/load_data.py {{ ds }}',
        trigger_rule='none_failed',  # 이전 Task 중 실패가 없으면 실행
    )

    # 알림
    send_notification = BashOperator(
        task_id='send_notification',
        bash_command='python /scripts/send_slack.py "ETL completed for {{ ds }}"',
    )

    # 의존성 정의
    wait_for_file >> validate >> branch
    branch >> [process_small, process_large]
    [process_small, process_large] >> quality_group
    quality_group >> load_to_warehouse >> send_notification

이 DAG를 시각화하면 다음과 같습니다:

SmallLargeQuality Check

Nulls

Duplicates

Schema

Wait File

Validate

Size?

Process

Spark

Load

Notify

모니터링과 로깅

로그 설정

# airflow.cfg
[logging]
base_log_folder = /var/log/airflow
remote_logging = True
remote_log_conn_id = my_s3_conn
remote_base_log_folder = s3://my-bucket/airflow-logs

커스텀 로깅

from airflow.utils.log.logging_mixin import LoggingMixin

class MyClass(LoggingMixin):
    def my_method(self):
        self.log.info("This is an info message")
        self.log.warning("This is a warning")
        self.log.error("This is an error")

Metrics

StatsD를 통한 메트릭 수집:

# airflow.cfg
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

성능 최적화

1. DAG 파싱 최적화

# DAG 파일 상단에 조기 리턴 조건 추가
if not AIRFLOW_VAR:  # 파싱만 필요한 경우
    from airflow import DAG
    dag = DAG('my_dag', ...)
else:
    # 무거운 import는 여기서
    import heavy_module

2. Parallelism 설정

# airflow.cfg
[core]
parallelism = 32  # Airflow 전체 병렬 실행 Task 수
dag_concurrency = 16  # DAG당 병렬 실행 Task 수
max_active_runs_per_dag = 3  # DAG당 최대 동시 실행 Run 수

[celery]
worker_concurrency = 16  # Celery Worker당 병렬 처리 수

3. Task 인스턴스 정리

# 오래된 Task 인스턴스 자동 정리
# airflow.cfg
[scheduler]
max_tis_per_query = 512
clean_tis_without_dagrun = True

정리

Apache Airflow는 단순한 스케줄러를 넘어 복잡한 데이터 파이프라인을 관리할 수 있는 강력한 플랫폼입니다. 핵심을 요약하면:

핵심 컴포넌트:

  • Scheduler: DAG를 모니터링하고 Task를 스케줄링
  • Executor: Task의 실제 실행 방식을 정의 (Local, Celery, Kubernetes 등)
  • Webserver: UI를 통한 모니터링 및 관리
  • Metadata DB: 모든 실행 정보와 상태 저장

핵심 개념:

  • DAG: 방향성 비순환 그래프로 워크플로우를 정의
  • Operator: Task가 수행할 작업의 템플릿
  • Task: DAG 내의 개별 실행 단위
  • XCom: Task 간 데이터 공유 메커니즘

실전 팁:

  • Task는 멱등하게 설계
  • 큰 데이터는 XCom 대신 외부 스토리지 활용
  • Pool을 통한 리소스 제어
  • 적절한 Executor 선택 (규모에 따라)
  • 동적 DAG 생성으로 반복 줄이기

Airflow를 처음 시작한다면 로컬에 설치하여 간단한 DAG부터 만들어보고, 점진적으로 복잡한 파이프라인을 구축하는 것을 권장합니다.

© 2025, 미나리와 함께 만들었음