"""Airflow DAG: export dataset rows from MySQL and commit them as JSON to a Git repo.

Flow: ``fetch_from_mysql`` (reads up to 10 rows from the ``dataset`` table)
feeds ``push_to_git`` (writes the rows as a JSON file into a clone of the
configured repository and pushes the commit to ``origin``).
"""

import json
import logging
import os
import tempfile
from urllib.parse import quote

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from pendulum import datetime

logger = logging.getLogger(__name__)


@dag(
    dag_id="mysql_fetch_and_push_to_git",
    schedule="@once",
    start_date=datetime(2025, 8, 12),
    catchup=False,
    params={"branch": "main", "file_path": "output/output_dataset.json"},
    access_control={
        "DAG_Test_2_Access": {
            "can_read",
            "can_edit",
            "can_dag_run",
        },
    },
)
def mysql_to_git_flow():
    """Wire the two TaskFlow tasks: MySQL fetch -> Git push."""

    @task
    def fetch_from_mysql():
        """Return up to 10 rows of the ``dataset`` table as a list of dicts.

        Uses the Airflow connection ``cheetah_db``. Each row becomes
        ``{"datasetSn": ..., "datasetName": ...}``.
        """
        from airflow.providers.mysql.hooks.mysql import MySqlHook

        hook = MySqlHook(mysql_conn_id="cheetah_db")
        rows = hook.get_records(
            """
            SELECT
                DATASET_SN   AS datasetSn,
                DATASET_NAME AS datasetName
            FROM dataset
            LIMIT 10
            """
        )
        payload = [dict(zip(["datasetSn", "datasetName"], row)) for row in rows]
        logger.info("Fetched %d rows", len(payload))
        return payload

    @task
    def push_to_git(
        payload: list[dict],
        branch: str | None = None,
        file_path: str | None = None,
    ):
        """Write *payload* as JSON at *file_path* on *branch* and push to origin.

        The Airflow connection ``git`` supplies the repository URL via
        ``extra.repo_url`` (or ``host``); ``login``/``password`` are injected
        into HTTP(S) URLs as basic-auth credentials (password URL-quoted).

        Raises:
            ValueError: if branch/file_path are empty or contain an
                un-rendered template, or if no repo URL is configured.
        """
        from airflow.hooks.base import BaseHook
        from git import Repo

        ctx = get_current_context()
        p_branch = branch or ctx["params"]["branch"]
        p_file_path = file_path or ctx["params"]["file_path"]

        # Guard against un-rendered Jinja templates ("{{ ... }}") leaking
        # into git commands.
        if not p_branch or "{" in p_branch:
            raise ValueError(f"Invalid branch value: {p_branch!r}")
        if not p_file_path or "{" in p_file_path:
            raise ValueError(f"Invalid file_path value: {p_file_path!r}")

        connection = BaseHook.get_connection("git")
        extra = connection.extra_dejson or {}
        repo_url = extra.get("repo_url") or connection.host
        if not repo_url:
            raise ValueError("git Connection에 repo_url/host가 설정되어 있지 않습니다.")

        # BUG FIX: the original accepted any "http" scheme in the guard but
        # only rewrote "https://", so plain-http remotes were pushed without
        # credentials. Inject user:token for both schemes.
        authed_url = repo_url
        if repo_url.startswith(("http://", "https://")) and connection.password:
            user = connection.login or "oauth2"
            token = quote(connection.password, safe="")
            scheme, _, rest = repo_url.partition("://")
            authed_url = f"{scheme}://{user}:{token}@{rest}"

        with tempfile.TemporaryDirectory() as tmp:
            repo = Repo.clone_from(authed_url, tmp)

            remote_branches = repo.git.branch("-r")
            if f"origin/{p_branch}" in remote_branches:
                # BUG FIX: the original passed None as a positional argument
                # when the remote branch was missing; GitPython rejects None
                # command arguments. Branch explicitly instead.
                repo.git.checkout("-B", p_branch, f"origin/{p_branch}")
                # BUG FIX: the original ran `pull --rebase ... --ff-only`
                # AFTER staging the new file: the flags are mutually
                # exclusive in modern git, and rebase refuses to run with a
                # dirty index, so the sync always failed and was swallowed.
                # Fast-forward BEFORE writing anything instead (best-effort).
                try:
                    repo.git.pull("--ff-only", "origin", p_branch)
                except Exception as e:
                    logger.info("pull skipped: %s", e)
            else:
                repo.git.checkout("-B", p_branch)

            abs_path = os.path.join(tmp, p_file_path)
            os.makedirs(os.path.dirname(abs_path), exist_ok=True)
            with open(abs_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)

            repo.git.add(p_file_path)
            repo.git.config("user.name", "airflow-bot")
            repo.git.config("user.email", "airflow@local")

            # Commit/push only when something actually changed.
            if repo.is_dirty(untracked_files=True):
                repo.git.commit("-m", f"Add dataset export ({p_file_path})")
                repo.git.push("origin", p_branch)
            else:
                logger.info("No changes; skipping push")

    data = fetch_from_mysql()
    push_to_git(data)


dag = mysql_to_git_flow()