01k6f98y6pqgdxm7baky7anbdy.py committed by groupuser
parent cb29fc5015
commit 3e06a47f70
@@ -3,13 +3,12 @@ import os
 from datetime import datetime
 from airflow import DAG
 from airflow.operators.python import PythonOperator
-from decorators.cheetah_dag import cheetah_dag
+from decorators.push_to_cheetah_dataset import push_to_cheetah_dataset
 
 KST = pendulum.timezone("Asia/Seoul")
 
 SHARED_DIR = "/opt/airflow/shared"
 LOG_FILES = [f"{SHARED_DIR}/task1.log", f"{SHARED_DIR}/task2.log"]
 AGGREGATED_PATH = f"{SHARED_DIR}/aggregated_logs.txt"
 OUTPUT_PATH = f"{SHARED_DIR}/test_output.txt"
 
-
@@ -21,51 +20,25 @@ def log_task(task_name, log_path, **context):
     print(f"[{task_name}] wrote log to {log_path}")
 
 
-def aggregate_logs(**context):
-    os.makedirs(os.path.dirname(AGGREGATED_PATH), exist_ok=True)
-    logs = []
-    for log_path in LOG_FILES:
-        if os.path.exists(log_path):
-            with open(log_path, "r", encoding="utf-8") as f:
-                logs.append(f.read().strip())
-        else:
-            logs.append(f"[Missing] {log_path}")
-    content = "\n".join(logs)
-    with open(AGGREGATED_PATH, "w", encoding="utf-8") as f:
-        f.write(content)
-    os.system(f"cp {AGGREGATED_PATH} {OUTPUT_PATH}")
-    print(f"[aggregate_logs] copied to {OUTPUT_PATH}")
-
-
-@cheetah_dag()
-def dp_cj_dag():
-    with DAG(
-        dag_id='dp-cj',
-        default_args={"retries": 1},
-        schedule="@once",
-        start_date=datetime(2025, 8, 11, tzinfo=KST),
-        catchup=False,
-    ) as dag:
-
-        t1 = PythonOperator(
-            task_id="task1",
-            python_callable=log_task,
-            op_kwargs={"task_name": "Task 1", "log_path": LOG_FILES[0]},
-        )
-
-        t2 = PythonOperator(
-            task_id="task2",
-            python_callable=log_task,
-            op_kwargs={"task_name": "Task 2", "log_path": LOG_FILES[1]},
-        )
-
-        aggregate = PythonOperator(
-            task_id="aggregate_logs",
-            python_callable=aggregate_logs,
-        )
-
-        t1 >> t2 >> aggregate
-
-    return dag
-
-dag = dp_cj_dag()
+@push_to_cheetah_dataset()  # automatically hook up a Cheetah Dataset push when the DAG completes
+with DAG(
+    dag_id="dp-cj",
+    default_args={"retries": 1},
+    schedule="@once",
+    start_date=datetime(2025, 8, 11, tzinfo=KST),
+    catchup=False,
+) as dag:
+
+    t1 = PythonOperator(
+        task_id="task1",
+        python_callable=log_task,
+        op_kwargs={"task_name": "Task 1", "log_path": LOG_FILES[0]},
+    )
+
+    t2 = PythonOperator(
+        task_id="task2",
+        python_callable=log_task,
+        op_kwargs={"task_name": "Task 2", "log_path": LOG_FILES[1]},
+    )
+
+    t1 >> t2
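Review note: Python's grammar only lets a decorator precede a def or class statement, so the added @push_to_cheetah_dataset() line sitting directly above with DAG(...) raises a SyntaxError as soon as the scheduler imports the module. The removed @cheetah_dag version already had a legal shape: the decorator wrapped a factory function that returns the DAG. Below is a minimal sketch of that shape for the new decorator, assuming it likewise accepts a DAG-returning factory; the stub only stands in for the internal decorators.push_to_cheetah_dataset, whose real behavior this diff does not show, and the DAG body reuses the module's existing imports:

import functools


def push_to_cheetah_dataset():
    # Hypothetical stand-in for decorators.push_to_cheetah_dataset: it wraps
    # a factory that returns a DAG; the real decorator would presumably hook
    # the Cheetah Dataset push onto DAG completion here.
    def wrap(factory):
        @functools.wraps(factory)
        def inner(*args, **kwargs):
            return factory(*args, **kwargs)
        return inner
    return wrap


@push_to_cheetah_dataset()  # legal: the decorator now precedes a def
def dp_cj_dag():
    with DAG(
        dag_id="dp-cj",
        default_args={"retries": 1},
        schedule="@once",
        start_date=datetime(2025, 8, 11, tzinfo=KST),
        catchup=False,
    ) as dag:
        ...  # task definitions exactly as in the diff above
    return dag


dag = dp_cj_dag()  # module-level name so Airflow's DagBag still discovers the DAG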
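A smaller review note on the deleted aggregate_logs helper, should it be restored later: os.system(f"cp {AGGREGATED_PATH} {OUTPUT_PATH}") spawns a shell, silently ignores cp's exit code, and is not portable. A sketch of the same copy step using only the standard library:

import shutil

# Same effect as the removed `cp` call, but it raises on failure and uses
# no shell, so unusual characters in either path are harmless.
shutil.copyfile(AGGREGATED_PATH, OUTPUT_PATH)
print(f"[aggregate_logs] copied to {OUTPUT_PATH}")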