from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from pendulum import datetime
import json
import logging
import os
import tempfile
from urllib.parse import quote


@dag(
    dag_id="mysql_fetch_and_push_to_git",
    schedule="@once",
    start_date=datetime(2025, 8, 12),
    catchup=False,
    params={"branch": "main", "file_path": "output/output_dataset.json"},
    access_control={
        "DAG_Test_2_Access": {"can_read", "can_edit", "can_dag_run"},
    },
)
def mysql_to_git_flow():
    @task
    def fetch_from_mysql():
        from airflow.providers.mysql.hooks.mysql import MySqlHook

        hook = MySqlHook(mysql_conn_id="cheetah_db")
        rows = hook.get_records("""
            SELECT DATASET_SN AS datasetSn, DATASET_NAME AS datasetName
            FROM dataset
            LIMIT 10
        """)
        payload = [dict(zip(["datasetSn", "datasetName"], r)) for r in rows]
        logging.info("Fetched %d rows", len(payload))
        return payload

    @task
    def push_to_git(payload: list[dict], branch: str | None = None, file_path: str | None = None):
        from airflow.hooks.base import BaseHook
        from git import Repo

        # Fall back to DAG params when no explicit arguments are passed,
        # and guard against unrendered template values (e.g. "{{ params.branch }}").
        ctx = get_current_context()
        p_branch = branch or ctx["params"]["branch"]
        p_file_path = file_path or ctx["params"]["file_path"]
        if not p_branch or "{" in p_branch:
            raise ValueError(f"Invalid branch value: {p_branch!r}")
        if not p_file_path or "{" in p_file_path:
            raise ValueError(f"Invalid file_path value: {p_file_path!r}")

        connection = BaseHook.get_connection("git")
        extra = connection.extra_dejson or {}
        repo_url = extra.get("repo_url") or connection.host
        if not repo_url:
            raise ValueError("repo_url/host is not set on the 'git' Connection.")

        # Embed credentials in the HTTPS URL when a token/password is configured.
        authed_url = repo_url
        if repo_url.startswith("http") and connection.password:
            user = connection.login or "oauth2"
            token = quote(connection.password, safe="")
            authed_url = repo_url.replace("https://", f"https://{user}:{token}@")

        with tempfile.TemporaryDirectory() as tmp:
            repo = Repo.clone_from(authed_url, tmp)

            # Check out the target branch, tracking the remote branch if it already exists;
            # otherwise create it locally from the clone's default HEAD.
            remote_branches = repo.git.branch("-r")
            if f"origin/{p_branch}" in remote_branches:
                repo.git.checkout("-B", p_branch, f"origin/{p_branch}")
            else:
                repo.git.checkout("-B", p_branch)

            # Write the payload to the requested path inside the working tree.
            abs_path = os.path.join(tmp, p_file_path)
            os.makedirs(os.path.dirname(abs_path), exist_ok=True)
            with open(abs_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)

            repo.git.add(p_file_path)
            repo.git.config("user.name", "airflow-bot")
            repo.git.config("user.email", "airflow@local")

            # Best-effort sync with the remote; if the index already has staged
            # changes, git refuses a rebase pull and the exception is logged and skipped.
            try:
                repo.git.pull("--rebase", "origin", p_branch)
            except Exception as e:
                logging.info("pull skipped: %s", e)

            if repo.is_dirty(untracked_files=True):
                repo.git.commit("-m", f"Add dataset export ({p_file_path})")
                repo.git.push("origin", p_branch)
            else:
                logging.info("No changes; skipping push")

    data = fetch_from_mysql()
    push_to_git(data)


dag = mysql_to_git_flow()
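
# Illustrative usage sketch (assumption, not part of the DAG itself): because
# "branch" and "file_path" are declared as DAG params, a run can override them
# with a JSON conf when triggering, e.g. from the Airflow CLI:
#
#   airflow dags trigger mysql_fetch_and_push_to_git \
#       --conf '{"branch": "main", "file_path": "output/output_dataset.json"}'
#
# The values shown in --conf are examples only.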