86 lines
3.2 KiB
Python
86 lines
3.2 KiB
Python
from airflow.decorators import dag, task
|
|
from airflow.operators.python import get_current_context
|
|
from pendulum import datetime
|
|
import json, os, tempfile, logging
|
|
from urllib.parse import quote
|
|
|
|
def _build_authed_url(repo_url: str, login: str | None, password: str | None) -> str:
    """Return *repo_url* with basic-auth credentials embedded for HTTP(S) remotes.

    Non-HTTP(S) URLs (e.g. ``git@host:org/repo.git``) and calls without a
    password are returned unchanged. Any user info already present in the URL
    is replaced rather than duplicated. Both login and password are
    percent-encoded so characters such as ``@`` or ``:`` cannot break the URL.
    """
    from urllib.parse import urlsplit, urlunsplit

    parts = urlsplit(repo_url)
    if parts.scheme not in ("http", "https") or not password:
        return repo_url
    user = quote(login or "oauth2", safe="")
    token = quote(password, safe="")
    # Keep host[:port] exactly as written, dropping any existing "user@" prefix.
    host = parts.netloc.rpartition("@")[2]
    return urlunsplit(parts._replace(netloc=f"{user}:{token}@{host}"))


def _resolve_repo_path(root: str, rel_path: str) -> tuple[str, str]:
    """Return ``(absolute_path, repo_relative_path)`` for *rel_path* inside *root*.

    ``file_path`` comes from DAG-run params. It must not be able to write
    outside the cloned work tree (absolute paths, ``..``, symlinks) or into
    ``.git`` (e.g. planting a hook that would run on commit).

    Raises:
        ValueError: if *rel_path* is absolute, resolves to *root* itself,
            escapes *root*, or points into ``.git``.
    """
    if os.path.isabs(rel_path):
        raise ValueError(f"Invalid file_path value: {rel_path!r}")
    root_real = os.path.realpath(root)
    abs_path = os.path.realpath(os.path.join(root_real, rel_path))
    rel = os.path.relpath(abs_path, root_real)
    first = rel.split(os.sep, 1)[0]
    if rel == os.curdir or first == os.pardir or first == ".git":
        raise ValueError(f"Invalid file_path value: {rel_path!r}")
    return abs_path, rel


def _remote_branch_exists(remote_listing: str, branch: str) -> bool:
    """Return True if ``origin/<branch>`` appears in ``git branch -r`` output.

    Compares whole ref names rather than substrings, so ``main`` does not
    match ``origin/main-old``. A symbolic entry such as
    ``origin/HEAD -> origin/main`` contributes only its left-hand name.
    """
    names = {line.strip().split(" -> ", 1)[0] for line in remote_listing.splitlines()}
    return f"origin/{branch}" in names


@dag(
    dag_id="mysql_fetch_and_push_to_git",
    schedule="@once",
    start_date=datetime(2025, 8, 12),
    catchup=False,
    params={"branch": "main", "file_path": "output/output_dataset.json"},
)
def mysql_to_git_flow():
    """Export rows from the ``dataset`` table and commit them as JSON to a Git repo.

    DAG params ``branch`` and ``file_path`` supply the defaults that
    ``push_to_git`` uses when it is not given explicit arguments.
    """

    @task
    def fetch_from_mysql():
        """Return up to 10 ``dataset`` rows as ``{"datasetSn", "datasetName"}`` dicts."""
        from airflow.providers.mysql.hooks.mysql import MySqlHook

        hook = MySqlHook(mysql_conn_id="cheetah_db")
        # ORDER BY makes the LIMIT select a stable set of rows in a stable order.
        # Otherwise re-runs could export different rows and cause spurious commits.
        rows = hook.get_records("""
            SELECT
                DATASET_SN AS datasetSn,
                DATASET_NAME AS datasetName
            FROM dataset
            ORDER BY DATASET_SN
            LIMIT 10
        """)
        columns = ["datasetSn", "datasetName"]
        payload = [dict(zip(columns, row)) for row in rows]
        logging.info("Fetched %d rows", len(payload))
        return payload

    @task
    def push_to_git(payload: list[dict], branch: str | None = None, file_path: str | None = None):
        """Write *payload* as JSON to *file_path* on *branch* and push it to ``origin``.

        Explicit arguments take precedence over the DAG params. The remote comes
        from the ``git`` Airflow connection (``extra.repo_url``, else ``host``).
        For HTTP(S) remotes with a password, the connection's login/password are
        embedded as credentials. If the branch is not on ``origin``, it is
        created from the clone's checked-out HEAD. Nothing is committed or
        pushed when the written file leaves the work tree unchanged.

        Raises:
            ValueError: on a missing, unrendered (contains ``{``) or invalid
                branch or file path, or a connection without a repository URL.
        """
        from airflow.hooks.base import BaseHook
        from git import GitCommandError, Repo

        ctx = get_current_context()
        p_branch = branch or ctx["params"]["branch"]
        p_file_path = file_path or ctx["params"]["file_path"]

        # "{" means a Jinja template reached us unrendered. A leading "-" would
        # be parsed by git as an option (e.g. `git push origin --force`).
        if not p_branch or "{" in p_branch or p_branch.startswith("-"):
            raise ValueError(f"Invalid branch value: {p_branch!r}")
        if not p_file_path or "{" in p_file_path:
            raise ValueError(f"Invalid file_path value: {p_file_path!r}")

        connection = BaseHook.get_connection("git")
        extra = connection.extra_dejson or {}
        repo_url = extra.get("repo_url") or connection.host
        if not repo_url:
            raise ValueError("git Connection에 repo_url/host가 설정되어 있지 않습니다.")
        authed_url = _build_authed_url(repo_url, connection.login, connection.password)

        with tempfile.TemporaryDirectory() as tmp:
            repo = Repo.clone_from(authed_url, tmp)

            # Let git itself reject names it would not accept as a branch.
            try:
                repo.git.check_ref_format("--branch", p_branch)
            except GitCommandError as err:
                raise ValueError(f"Invalid branch value: {p_branch!r}") from err

            if _remote_branch_exists(repo.git.branch("-r"), p_branch):
                repo.git.checkout("-B", p_branch, f"origin/{p_branch}")
            else:
                repo.git.checkout("-B", p_branch)

            abs_path, rel_path = _resolve_repo_path(tmp, p_file_path)
            os.makedirs(os.path.dirname(abs_path), exist_ok=True)
            with open(abs_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)

            repo.git.add("--", rel_path)
            repo.git.config("user.name", "airflow-bot")
            repo.git.config("user.email", "airflow@local")

            # There is deliberately no pull here. The working copy is a clone made
            # moments ago. A `pull --rebase` after `git add` is refused by git
            # whenever there is something staged. If someone pushes in between,
            # `git push` is rejected and the task fails loudly instead of
            # overwriting their work.
            if repo.is_dirty(untracked_files=True):
                repo.git.commit("-m", f"Add dataset export ({p_file_path})")
                repo.git.push("origin", p_branch)
            else:
                logging.info("No changes; skipping push")

    data = fetch_from_mysql()
    push_to_git(data)


dag = mysql_to_git_flow()