airflow/dags/groupuser/01keaxfzxn5ctnx4bsc0ftpq2q.py

121 lines
3.8 KiB
Python

# ----------------------------------------------
# Cheetah Pipeline Template
# ----------------------------------------------
# USER_EDIT_IMPORT_START
# 필요한 라이브러리 import 영역
import os
import json
from airflow.operators.python import PythonOperator
from operators.cheetah_dataset_operator import CheetahDatasetOperator
# USER_EDIT_IMPORT_END
from decorators.cheetah_pipeline_decorator import cheetah_pipeline
from airflow import DAG
from datetime import datetime
@cheetah_pipeline(
upload_outputs=True,
push_to_dataset=False,
s3_bucket=None,
s3_conn_id=None
)
def dag_template():
# ------------------------------------------------
# DAG 기본 설정 (수정 가능)
# ------------------------------------------------
# USER_EDIT_DAG_META_START
start_date = datetime(2026, 1, 1)
schedule = None
catchup = False
tags = ['cheetah', 'test']
# 사용자 정의 로직
# [Helper] 입력 경로 찾기 함수 (표준화)
def get_input_path(upstream_task_id: str, sub_path: str = "") -> str:
"""
이전 Task의 결과물이 마운트된 표준 경로를 반환합니다.
Path: /cheetah/inputs/{upstream_task_id}/output/{sub_path}
"""
base = f"/cheetah/inputs/{upstream_task_id}/output"
return os.path.join(base, sub_path) if sub_path else base
# [Helper] 출력 경로 만들기 함수 (표준화)
def get_output_path(filename: str) -> str:
"""
현재 Task가 결과물을 저장해야 할 표준 경로를 반환합니다.
Path: /cheetah/outputs/{filename}
"""
return os.path.join("/cheetah/outputs", filename)
def _simulate_drm_logic(**context):
# 1. [Input] 이전 Task(download_dataset)가 'raw_data' 폴더에 다운로드 받은 것 가져오기
# 사용자는 "어떤 Task의 결과인가?"만 알면 됨
upstream_id = 'download_dataset'
input_dir = get_input_path(upstream_id, sub_path="raw_data")
print(f"[_simulate_drm] Reading inputs from: {input_dir}")
if os.path.exists(input_dir):
print(f"Files: {os.listdir(input_dir)}")
else:
print("Warning: Input directory not found.")
# 2. [Output] 결과물 생성
# 사용자는 "어떤 파일명으로 저장할까?"만 알면 됨 (/cheetah/outputs 자동 지정)
target_file = get_output_path("decrypted_result.json")
dummy_data = {
"source": input_dir,
"status": "success",
"timestamp": str(datetime.now())
}
with open(target_file, 'w') as f:
json.dump(dummy_data, f, indent=4)
print(f"[_simulate_drm] Saved output to: {target_file}")
# USER_EDIT_DAG_META_END
with DAG(
dag_id='ca_no_cvtd_test',
start_date=start_date,
schedule=schedule,
catchup=catchup,
tags=tags,
is_paused_upon_creation=False
) as dag:
# ------------------------------------------------
# TASK 작성 영역 (수정 가능)
# ------------------------------------------------
# USER_EDIT_TASK_START
# [Task 1] 데이터셋 다운로드
# - output_path='raw_data' 지정 시 -> /cheetah/outputs/raw_data 에 저장됨
t1 = CheetahDatasetOperator(
task_id='download_dataset',
dataset_sn=409,
output_path='raw_data'
)
# [Task 2] DRM 해제 (Python Logic)
# - InitContainer가 t1의 산출물을 /cheetah/inputs/... 로 가져옴
# - 로직 수행 후 /cheetah/outputs 에 결과 저장 -> 자동 업로드
t2 = PythonOperator(
task_id='simulate_drm',
python_callable=_simulate_drm_logic
)
t1 >> t2
# USER_EDIT_TASK_END
return dag
dag = dag_template()