들어가며
데이터 파이프라인을 운영하다 보면 복잡한 작업들의 의존성 관리, 스케줄링, 모니터링, 실패 처리 등이 필요합니다. Apache Airflow는 이러한 요구사항을 해결하기 위한 워크플로우 오케스트레이션 플랫폼으로, Airbnb에서 개발되어 현재 Apache 재단의 최상위 프로젝트로 관리되고 있습니다. 이 글에서는 Airflow의 핵심 개념과 아키텍처, 그리고 실전에서 알아야 할 내용들을 다룹니다.
Apache Airflow란?
Apache Airflow는 프로그래매틱하게 워크플로우를 작성, 스케줄링, 모니터링할 수 있는 플랫폼입니다. “Configuration as Code” 철학을 따르며, Python 코드로 워크플로우를 정의합니다.
왜 Airflow인가?
전통적인 cron 기반 스케줄링의 한계:
- 의존성 관리 부재: 작업 간 의존성을 명시적으로 표현할 수 없음
- 중앙화된 모니터링 부족: 각 작업의 실행 상태를 한눈에 파악하기 어려움
- 재시도 로직의 부재: 실패 시 재시도 정책을 일관되게 적용하기 어려움
- 백프레셔(Backpressure) 처리 불가: 작업이 밀렸을 때 대응 방법이 없음
- 동적 파이프라인 생성 불가: 런타임에 워크플로우를 동적으로 생성할 수 없음
Airflow는 이러한 문제들을 해결하며, 다음과 같은 특징을 제공합니다:
- 동적 파이프라인 생성: Python 코드로 동적으로 DAG를 생성할 수 있음
- 확장성: Task를 분산 처리할 수 있는 다양한 Executor 지원
- 풍부한 UI: DAG의 시각화, 로그 확인, 재실행 등을 웹 UI에서 수행
- 플러그인 아키텍처: 커스텀 Operator, Hook, Sensor 등을 쉽게 추가 가능
Airflow 아키텍처
Airflow는 여러 컴포넌트가 유기적으로 동작하는 분산 시스템입니다.
핵심 컴포넌트
1. Scheduler
Scheduler는 Airflow의 심장부로, 다음과 같은 역할을 수행합니다:
- DAG 파일을 주기적으로 파싱하여 변경사항 감지
- 스케줄에 따라 실행할 Task 결정
- Task를 Executor의 큐에 제출
동작 방식:
스케줄러는 기본적으로 단일 프로세스로 실행되지만, Airflow 2.0부터는 HA(High Availability) 구성도 가능합니다.
2. Executor
Executor는 Task를 실제로 실행하는 메커니즘을 정의합니다.
주요 Executor 타입:
SequentialExecutor
- 단일 프로세스에서 순차적으로 Task를 실행
- SQLite를 메타데이터 DB로 사용할 때의 기본 Executor
- 프로덕션 환경에는 부적합 (개발/테스트 용도)
LocalExecutor
- 같은 머신 내에서 멀티프로세싱을 통해 병렬 실행
parallelism설정으로 동시 실행 Task 수 제한- 소규모 프로덕션 환경에 적합
# airflow.cfg
[core]
executor = LocalExecutor
parallelism = 32 # 전체 병렬 실행 가능 Task 수
dag_concurrency = 16 # 하나의 DAG에서 동시 실행 가능 Task 수CeleryExecutor
- Celery 기반 분산 실행
- 여러 Worker 노드에 Task를 분산
- 메시지 브로커(Redis, RabbitMQ)를 통한 Task 큐잉
- 대규모 프로덕션 환경에 적합
KubernetesExecutor
- Kubernetes Pod로 각 Task를 실행
- Task마다 독립적인 환경과 리소스 할당 가능
- Auto-scaling 지원
- 리소스 격리 및 보안 측면에서 우수
CeleryKubernetesExecutor (Airflow 2.0+)
- CeleryExecutor와 KubernetesExecutor를 혼용
- Task 레벨에서 Executor를 선택할 수 있음
# Task 별로 다른 Executor 지정
task1 = BashOperator(
task_id='task1',
bash_command='echo "Celery"',
executor_config={}, # CeleryExecutor 사용
)
task2 = BashOperator(
task_id='task2',
bash_command='echo "Kubernetes"',
executor_config={
'KubernetesExecutor': {
'request_memory': '1Gi',
'limit_memory': '2Gi',
}
},
)3. Webserver
Flask 기반의 웹 애플리케이션으로, 다음 기능을 제공합니다:
- DAG 시각화 (Graph View, Tree View, Gantt Chart 등)
- Task 로그 조회
- DAG 실행 트리거 및 중단
- Variable, Connection 관리
- 사용자 인증 및 권한 관리 (RBAC)
4. Metadata Database
PostgreSQL, MySQL 등을 사용하며, 다음 정보를 저장합니다:
- DAG 정의 및 메타데이터
- Task 실행 이력
- 변수(Variables), 연결(Connections)
- 사용자 및 권한 정보
주요 테이블:
dag: DAG 정보dag_run: DAG 실행 인스턴스task_instance: Task 실행 인스턴스xcom: Task 간 데이터 공유를 위한 테이블
핵심 개념
DAG (Directed Acyclic Graph)
DAG는 Airflow에서 워크플로우를 정의하는 핵심 개념입니다.
주요 특성:
- Directed: Task 간의 의존성에 방향이 있음
- Acyclic: 순환 의존성이 없음 (A → B → C → A 불가)
- Graph: Task들이 노드로, 의존성이 엣지로 표현됨
위 다이어그램은 간단한 ETL 파이프라인 DAG를 나타냅니다. 각 노드는 Task이고, 화살표는 의존성을 의미합니다.
DAG 정의 방식
1. Context Manager 방식
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False, # 이전 실행의 성공 여부에 의존하지 않음
'start_date': datetime(2024, 1, 1),
'email': ['alerts@company.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True, # 지수 백오프
'max_retry_delay': timedelta(minutes=30),
}
with DAG(
dag_id='data_pipeline',
default_args=default_args,
description='Daily data processing pipeline',
schedule_interval='0 2 * * *', # 매일 02:00
catchup=False, # 과거 날짜 자동 실행 비활성화
max_active_runs=1, # 동시 실행 가능한 DAG 인스턴스 수
tags=['production', 'daily'],
) as dag:
# Task 정의
pass2. DAG Decorator 방식 (Airflow 2.0+)
from airflow.decorators import dag, task
from datetime import datetime
@dag(
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
)
def my_dag():
@task
def extract():
return {'data': [1, 2, 3]}
@task
def transform(data):
return {'transformed': [x * 2 for x in data['data']]}
@task
def load(data):
print(f"Loading {data}")
# Task 의존성 정의
load(transform(extract()))
dag_instance = my_dag()중요 파라미터
schedule_interval
DAG의 실행 주기를 정의합니다.
# Cron 표현식
schedule_interval='0 2 * * *' # 매일 02:00
# Preset
schedule_interval='@daily' # 0 0 * * *
schedule_interval='@hourly' # 0 * * * *
schedule_interval='@weekly' # 0 0 * * 0
schedule_interval='@monthly' # 0 0 1 * *
schedule_interval='@yearly' # 0 0 1 1 *
# timedelta
schedule_interval=timedelta(hours=3) # 3시간마다
# None (수동 실행만)
schedule_interval=Nonestart_date와 execution_date의 이해
Airflow의 스케줄링은 직관적이지 않을 수 있습니다:
start_date = datetime(2024, 1, 1, 2, 0) # 2024-01-01 02:00
schedule_interval = '@daily'
# 실제 실행 시점:
# execution_date: 2024-01-01 02:00 → 실행 시각: 2024-01-02 02:00
# execution_date: 2024-01-02 02:00 → 실행 시각: 2024-01-03 02:00execution_date는 “논리적 날짜”로, 해당 기간의 데이터를 처리한다는 의미입니다. 실제 실행은 다음 인터벌이 시작될 때 발생합니다.
catchup
과거 날짜에 대한 자동 실행 여부를 결정합니다.
# catchup=True인 경우
start_date = datetime(2024, 1, 1)
현재 날짜 = 2024, 1, 10
# → 2024-01-01부터 2024-01-09까지의 DAG Run이 모두 생성되어 실행됨
# catchup=False인 경우
# → 가장 최근 인터벌만 실행됨Task와 Operator
Operator
Operator는 단일 Task가 수행할 작업의 템플릿입니다.
주요 Operator 타입
1. BashOperator
from airflow.operators.bash import BashOperator
run_script = BashOperator(
task_id='run_etl_script',
bash_command='python /scripts/etl.py',
env={'ENV': 'production'}, # 환경 변수
append_env=True, # 기존 환경 변수 유지
cwd='/tmp', # 작업 디렉토리
)2. PythonOperator
from airflow.operators.python import PythonOperator
def process_data(**context):
# context에서 실행 정보 접근
execution_date = context['execution_date']
ti = context['ti'] # TaskInstance
# XCom으로 이전 Task의 결과 가져오기
data = ti.xcom_pull(task_ids='extract_data')
result = {'processed': len(data)}
return result # 자동으로 XCom에 push됨
process = PythonOperator(
task_id='process_data',
python_callable=process_data,
provide_context=True, # context 전달
)3. PythonVirtualenvOperator 독립적인 Python 가상 환경에서 실행:
from airflow.operators.python import PythonVirtualenvOperator
def callable_with_special_deps():
import pandas as pd # Worker에 설치되지 않은 패키지도 사용 가능
# ...
venv_task = PythonVirtualenvOperator(
task_id='venv_task',
python_callable=callable_with_special_deps,
requirements=['pandas==1.5.0', 'numpy==1.23.0'],
system_site_packages=False,
)4. Sensor 특정 조건이 만족될 때까지 대기하는 특수한 Operator:
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
# 파일 존재 여부 확인
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/data/input.csv',
poke_interval=30, # 30초마다 확인
timeout=600, # 10분 타임아웃
mode='poke', # 'poke' 또는 'reschedule'
)
# 다른 DAG의 Task 완료 대기
wait_for_upstream_dag = ExternalTaskSensor(
task_id='wait_for_upstream',
external_dag_id='upstream_dag',
external_task_id='final_task',
timeout=3600,
)Sensor 모드:
- poke 모드: Worker 슬롯을 점유한 채로 대기 (빠른 응답이 필요한 경우)
- reschedule 모드: Worker 슬롯을 반환하고 다시 스케줄링 (리소스 효율적)
Task 의존성 정의
# 기본 체이닝
task1 >> task2 >> task3
# 팬아웃 (Fan-out)
task1 >> [task2, task3, task4]
# 팬인 (Fan-in)
[task1, task2, task3] >> task4
# 복잡한 의존성
task1 >> task2
task1 >> task3
[task2, task3] >> task4
task4 >> [task5, task6]
[task5, task6] >> task7조건부 의존성 (Branching):
from airflow.operators.python import BranchPythonOperator
def choose_branch(**context):
execution_date = context['execution_date']
if execution_date.weekday() < 5: # 평일
return 'weekday_task'
else:
return 'weekend_task'
branch = BranchPythonOperator(
task_id='branch',
python_callable=choose_branch,
)
weekday_task = BashOperator(task_id='weekday_task', bash_command='echo weekday')
weekend_task = BashOperator(task_id='weekend_task', bash_command='echo weekend')
branch >> [weekday_task, weekend_task]TaskGroup (Airflow 2.0+)
관련된 Task들을 논리적으로 그룹화:
from airflow.utils.task_group import TaskGroup
with TaskGroup('data_processing') as processing_group:
extract = BashOperator(task_id='extract', ...)
transform = BashOperator(task_id='transform', ...)
load = BashOperator(task_id='load', ...)
extract >> transform >> load
start >> processing_group >> endXCom (Cross-Communication)
Task 간 데이터를 공유하는 메커니즘입니다.
사용 방법
# Push
def push_function(**context):
context['ti'].xcom_push(key='my_key', value={'data': [1, 2, 3]})
# 또는 return으로 자동 push
return {'data': [1, 2, 3]} # 자동으로 'return_value' key로 저장
# Pull
def pull_function(**context):
ti = context['ti']
data = ti.xcom_pull(task_ids='push_task', key='my_key')
# key를 지정하지 않으면 'return_value' 가져옴
data = ti.xcom_pull(task_ids='push_task')XCom의 한계와 주의사항
- 크기 제한: XCom은 메타데이터 DB에 저장되므로 큰 데이터에는 부적합
- 직렬화: pickle로 직렬화되므로 복잡한 객체는 문제가 될 수 있음
- 권장 사항: 큰 데이터는 S3, GCS 등 외부 스토리지에 저장하고 경로만 XCom으로 전달
# 나쁜 예: 큰 DataFrame을 XCom으로 전달
def bad_practice():
df = pd.read_csv('large_file.csv') # 100MB
return df # XCom에 저장 시도 → 문제 발생
# 좋은 예: 경로만 전달
def good_practice():
df = pd.read_csv('large_file.csv')
s3_path = upload_to_s3(df)
return s3_path # 경로만 XCom에 저장Variables와 Connections
Variables
전역 변수를 저장하고 DAG에서 사용:
from airflow.models import Variable
# 설정 (UI 또는 CLI)
# airflow variables set my_var my_value
# 가져오기
my_var = Variable.get('my_var')
my_json_var = Variable.get('my_json_var', deserialize_json=True)
# 기본값 지정
my_var = Variable.get('my_var', default_var='default_value')Connections
외부 시스템 연결 정보를 저장:
from airflow.hooks.base import BaseHook
# Connection 가져오기
conn = BaseHook.get_connection('my_postgres_conn')
host = conn.host
port = conn.port
login = conn.login
password = conn.password
# Hook 사용 (권장)
from airflow.providers.postgres.hooks.postgres import PostgresHook
pg_hook = PostgresHook(postgres_conn_id='my_postgres_conn')
records = pg_hook.get_records("SELECT * FROM users")동적 DAG 생성
Python의 동적 특성을 활용한 DAG 생성:
# 설정 기반 DAG 생성
configs = [
{'name': 'pipeline_a', 'schedule': '@daily', 'table': 'table_a'},
{'name': 'pipeline_b', 'schedule': '@hourly', 'table': 'table_b'},
]
for config in configs:
with DAG(
dag_id=f"etl_{config['name']}",
schedule_interval=config['schedule'],
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
extract = BashOperator(
task_id='extract',
bash_command=f"echo 'Extracting from {config['table']}'",
)
load = BashOperator(
task_id='load',
bash_command=f"echo 'Loading to {config['table']}'",
)
extract >> load
# DAG를 globals()에 등록 (중요!)
globals()[f"etl_{config['name']}"] = dag실전 베스트 프랙티스
1. Idempotency (멱등성)
Task는 여러 번 실행되어도 같은 결과를 내야 합니다:
# 나쁜 예: 멱등하지 않음
INSERT INTO table VALUES (1, 'data');
# 좋은 예: 멱등함
INSERT INTO table VALUES (1, 'data')
ON CONFLICT (id) DO UPDATE SET value = 'data';
# 또는 날짜 기반 파티셔닝
DELETE FROM table WHERE date = '{{ ds }}';
INSERT INTO table SELECT * FROM source WHERE date = '{{ ds }}';2. Templating
Jinja 템플릿을 활용한 동적 파라미터 전달:
# 사용 가능한 템플릿 변수
run_query = BashOperator(
task_id='run_query',
bash_command="""
echo "Execution Date: {{ ds }}"
echo "Execution Date (no dashes): {{ ds_nodash }}"
echo "Previous Execution Date: {{ prev_ds }}"
echo "Tomorrow: {{ tomorrow_ds }}"
echo "DAG ID: {{ dag.dag_id }}"
echo "Task ID: {{ task.task_id }}"
echo "Run ID: {{ run_id }}"
""",
)
# 커스텀 매크로
def custom_macro(ds):
return f"processed_{ds}"
dag.user_defined_macros = {
'custom_macro': custom_macro
}
task = BashOperator(
task_id='task',
bash_command='echo {{ custom_macro(ds) }}',
)3. Connection Pool 설정
데이터베이스 연결 최적화:
# airflow.cfg
[core]
sql_alchemy_pool_size = 5 # Connection pool 크기
sql_alchemy_pool_recycle = 3600 # 1시간마다 연결 재생성
sql_alchemy_max_overflow = 10 # 추가로 생성 가능한 연결 수4. Task 실행 시간 제한
task = BashOperator(
task_id='task',
bash_command='long_running_script.sh',
execution_timeout=timedelta(hours=2), # 2시간 제한
)5. SLA (Service Level Agreement)
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
# SLA 위반 시 호출
print(f"SLA missed for {task_list}")
with DAG(
'my_dag',
default_args={'sla': timedelta(hours=2)}, # 2시간 내 완료 기대
sla_miss_callback=sla_miss_callback,
) as dag:
task = BashOperator(...)6. Pool을 통한 리소스 제어
동시 실행 Task 수를 제한:
# UI 또는 CLI로 Pool 생성
# airflow pools set database_pool 5 "Database connection pool"
task = BashOperator(
task_id='db_task',
bash_command='...',
pool='database_pool', # Pool 지정
priority_weight=10, # 우선순위 (높을수록 먼저 실행)
)실전 예제: 복잡한 ETL 파이프라인
실제 프로덕션에서 사용할 만한 복잡한 파이프라인을 구현해봅시다:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
'email': ['alerts@company.com'],
}
with DAG(
'complex_etl_pipeline',
default_args=default_args,
schedule_interval='0 2 * * *',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['production', 'etl'],
) as dag:
# 파일 도착 대기
wait_for_file = FileSensor(
task_id='wait_for_source_file',
filepath='/data/input/{{ ds }}.csv',
poke_interval=300, # 5분마다 체크
timeout=3600, # 1시간 타임아웃
mode='reschedule',
)
# 데이터 검증
def validate_data(**context):
import pandas as pd
ds = context['ds']
df = pd.read_csv(f'/data/input/{ds}.csv')
if len(df) == 0:
raise ValueError("Empty dataset")
if df.isnull().sum().sum() > len(df) * 0.1:
raise ValueError("Too many null values")
return {'row_count': len(df), 'file_path': f'/data/input/{ds}.csv'}
validate = PythonOperator(
task_id='validate_data',
python_callable=validate_data,
)
# 데이터 크기에 따라 분기
def choose_processing_method(**context):
ti = context['ti']
result = ti.xcom_pull(task_ids='validate_data')
row_count = result['row_count']
if row_count > 1000000:
return 'process_large_dataset'
else:
return 'process_small_dataset'
branch = BranchPythonOperator(
task_id='branch_by_size',
python_callable=choose_processing_method,
)
# 작은 데이터셋 처리
process_small = BashOperator(
task_id='process_small_dataset',
bash_command='python /scripts/process_small.py {{ ds }}',
)
# 큰 데이터셋 처리 (Spark)
process_large = BashOperator(
task_id='process_large_dataset',
bash_command='spark-submit /scripts/process_large.py {{ ds }}',
executor_config={
'KubernetesExecutor': {
'request_memory': '4Gi',
'request_cpu': '2',
}
},
)
# 데이터 품질 체크 그룹
with TaskGroup('quality_checks') as quality_group:
check_nulls = PythonOperator(
task_id='check_nulls',
python_callable=lambda: print("Checking nulls..."),
)
check_duplicates = PythonOperator(
task_id='check_duplicates',
python_callable=lambda: print("Checking duplicates..."),
)
check_schema = PythonOperator(
task_id='check_schema',
python_callable=lambda: print("Checking schema..."),
)
[check_nulls, check_duplicates, check_schema]
# 데이터 로드
load_to_warehouse = BashOperator(
task_id='load_to_warehouse',
bash_command='python /scripts/load_data.py {{ ds }}',
trigger_rule='none_failed', # 이전 Task 중 실패가 없으면 실행
)
# 알림
send_notification = BashOperator(
task_id='send_notification',
bash_command='python /scripts/send_slack.py "ETL completed for {{ ds }}"',
)
# 의존성 정의
wait_for_file >> validate >> branch
branch >> [process_small, process_large]
[process_small, process_large] >> quality_group
quality_group >> load_to_warehouse >> send_notification이 DAG를 시각화하면 다음과 같습니다:
모니터링과 로깅
로그 설정
# airflow.cfg
[logging]
base_log_folder = /var/log/airflow
remote_logging = True
remote_log_conn_id = my_s3_conn
remote_base_log_folder = s3://my-bucket/airflow-logs커스텀 로깅
from airflow.utils.log.logging_mixin import LoggingMixin
class MyClass(LoggingMixin):
def my_method(self):
self.log.info("This is an info message")
self.log.warning("This is a warning")
self.log.error("This is an error")Metrics
StatsD를 통한 메트릭 수집:
# airflow.cfg
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow성능 최적화
1. DAG 파싱 최적화
# DAG 파일 상단에 조기 리턴 조건 추가
if not AIRFLOW_VAR: # 파싱만 필요한 경우
from airflow import DAG
dag = DAG('my_dag', ...)
else:
# 무거운 import는 여기서
import heavy_module2. Parallelism 설정
# airflow.cfg
[core]
parallelism = 32 # Airflow 전체 병렬 실행 Task 수
dag_concurrency = 16 # DAG당 병렬 실행 Task 수
max_active_runs_per_dag = 3 # DAG당 최대 동시 실행 Run 수
[celery]
worker_concurrency = 16 # Celery Worker당 병렬 처리 수3. Task 인스턴스 정리
# 오래된 Task 인스턴스 자동 정리
# airflow.cfg
[scheduler]
max_tis_per_query = 512
clean_tis_without_dagrun = True정리
Apache Airflow는 단순한 스케줄러를 넘어 복잡한 데이터 파이프라인을 관리할 수 있는 강력한 플랫폼입니다. 핵심을 요약하면:
핵심 컴포넌트:
- Scheduler: DAG를 모니터링하고 Task를 스케줄링
- Executor: Task의 실제 실행 방식을 정의 (Local, Celery, Kubernetes 등)
- Webserver: UI를 통한 모니터링 및 관리
- Metadata DB: 모든 실행 정보와 상태 저장
핵심 개념:
- DAG: 방향성 비순환 그래프로 워크플로우를 정의
- Operator: Task가 수행할 작업의 템플릿
- Task: DAG 내의 개별 실행 단위
- XCom: Task 간 데이터 공유 메커니즘
실전 팁:
- Task는 멱등하게 설계
- 큰 데이터는 XCom 대신 외부 스토리지 활용
- Pool을 통한 리소스 제어
- 적절한 Executor 선택 (규모에 따라)
- 동적 DAG 생성으로 반복 줄이기
Airflow를 처음 시작한다면 로컬에 설치하여 간단한 DAG부터 만들어보고, 점진적으로 복잡한 파이프라인을 구축하는 것을 권장합니다.