From f7787db10e6eea9285bd7f10f6ee21bca86ceeac Mon Sep 17 00:00:00 2001
From: cheetahadmin
Date: Thu, 28 Aug 2025 04:28:51 +0000
Subject: [PATCH] Add dags/soosoo/csv_to_json_git_push.py

---
Review note: the payload below was revised by hand during review, so the
blob id on the "index" line no longer matches the new file content.

 dags/soosoo/csv_to_json_git_push.py | 191 ++++++++++++++++++++++++++++
 1 file changed, 191 insertions(+)
 create mode 100644 dags/soosoo/csv_to_json_git_push.py

diff --git a/dags/soosoo/csv_to_json_git_push.py b/dags/soosoo/csv_to_json_git_push.py
new file mode 100644
index 0000000..2583dfc
--- /dev/null
+++ b/dags/soosoo/csv_to_json_git_push.py
@@ -0,0 +1,191 @@
+"""Airflow DAG: download a CSV over HTTP, convert it to JSON, push it to Git.
+
+The CSV location, target branch and JSON path inside the repository come from
+the DAG params. The repository URL and credentials are read from the Airflow
+connection named ``git``.
+"""
+
+import csv
+import io
+import json
+import logging
+import os
+import tempfile
+from typing import Optional
+from urllib.parse import quote, urlsplit, urlunsplit
+
+import requests
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from git import Repo
+from pendulum import datetime
+
+logger = logging.getLogger(__name__)
+
+# requests applies no timeout by default; bound the download so a stalled
+# server cannot block the task indefinitely.
+HTTP_TIMEOUT_SECONDS = 30
+
+
+def _inject_credentials(repo_url: str, user: str, password: str) -> str:
+    """Return *repo_url* with ``user:password@`` placed in front of the host.
+
+    Only ``http``/``https`` URLs are rewritten; any other URL is returned
+    unchanged. Credentials already present in the URL are replaced, and both
+    parts are percent-encoded.
+    """
+    parts = urlsplit(repo_url)
+    if parts.scheme not in ("http", "https") or not parts.netloc:
+        return repo_url
+    host = parts.netloc.rpartition("@")[2]
+    userinfo = f"{quote(user, safe='')}:{quote(password, safe='')}"
+    return urlunsplit(parts._replace(netloc=f"{userinfo}@{host}"))
+
+
+def _remote_branch_exists(branch_listing: str, branch: str) -> bool:
+    """Tell whether ``origin/<branch>`` appears in ``git branch -r`` output.
+
+    Names are compared exactly, so ``main`` does not match ``origin/main-old``.
+    """
+    listed = {line.split(" -> ")[0].strip() for line in branch_listing.splitlines()}
+    return f"origin/{branch}" in listed
+
+
+def _resolve_repo_path(repo_dir: str, file_path: str) -> str:
+    """Return *file_path* relative to *repo_dir*, checking it stays inside.
+
+    Raises:
+        ValueError: if the path resolves to the repository root, leaves the
+            working tree, or points into ``.git``.
+    """
+    root = os.path.realpath(repo_dir)
+    rel_path = os.path.relpath(os.path.realpath(os.path.join(root, file_path)), root)
+    if rel_path.split(os.sep, 1)[0] in (os.curdir, os.pardir, ".git"):
+        raise ValueError(f"file_path must stay inside the repository: {file_path!r}")
+    return rel_path
+
+
+@dag(
+    dag_id="csv_to_json_git_push",
+    schedule="@once",
+    start_date=datetime(2025, 8, 14),
+    catchup=False,
+    params={
+        "csv_url": "https://raw.githubusercontent.com/woogys/test/main/username.csv",
+        "branch": "main",
+        "file_path": "output/username2.json",
+    },
+    # NOTE(review): "can_dag_run" does not look like one of Airflow's standard
+    # DAG-level actions (can_read / can_edit / can_delete) - confirm that this
+    # deployment accepts it.
+    access_control={
+        "DAG_Test_Access": {
+            "can_read",
+            "can_edit",
+            "can_dag_run",
+        }
+    },
+)
+def csv_to_git():
+    """Fetch a CSV, convert its rows to JSON and push the file to Git."""
+
+    @task
+    def fetch_csv_and_convert_to_json() -> list[dict]:
+        """Download the CSV named by the ``csv_url`` param and parse its rows.
+
+        Returns:
+            One dict per data row, keyed by the header line. Fields are
+            expected to be separated by ``;``.
+
+        Raises:
+            requests.HTTPError: if the server answers with an error status.
+        """
+        ctx = get_current_context()
+        csv_url = ctx["params"]["csv_url"]
+
+        logger.info("Downloading CSV from: %s", csv_url)
+        response = requests.get(csv_url, timeout=HTTP_TIMEOUT_SECONDS)
+        response.raise_for_status()
+
+        # utf-8-sig drops a leading BOM that would otherwise become part of
+        # the first column name; StringIO(newline="") leaves line breaks
+        # inside quoted fields to the csv module.
+        text = response.content.decode("utf-8-sig")
+        reader = csv.DictReader(io.StringIO(text, newline=""), delimiter=";")
+        data = list(reader)
+        logger.info("Converted %d rows to JSON", len(data))
+        return data
+
+    @task
+    def push_to_git(
+        payload: list[dict],
+        branch: Optional[str] = None,
+        file_path: Optional[str] = None,
+    ) -> None:
+        """Write *payload* as JSON into the repository and push the commit.
+
+        Args:
+            payload: Rows to serialise.
+            branch: Target branch; defaults to the ``branch`` param.
+            file_path: JSON path relative to the repository root; defaults
+                to the ``file_path`` param.
+
+        Raises:
+            ValueError: if the ``git`` connection has no repository URL, or
+                if ``branch`` / ``file_path`` are not usable.
+        """
+        from airflow.hooks.base import BaseHook
+
+        ctx = get_current_context()
+        branch = branch or ctx["params"]["branch"]
+        file_path = file_path or ctx["params"]["file_path"]
+        if branch.startswith("-"):
+            # git would parse a leading dash as a command-line option.
+            raise ValueError(f"Invalid branch name: {branch!r}")
+
+        connection = BaseHook.get_connection("git")
+        extra = connection.extra_dejson or {}
+        repo_url = extra.get("repo_url") or connection.host
+        if not repo_url:
+            raise ValueError("git Connection에 repo_url/host가 설정되어 있지 않습니다.")
+
+        authed_url = repo_url
+        if connection.password:
+            authed_url = _inject_credentials(
+                repo_url, connection.login or "oauth2", connection.password
+            )
+
+        with tempfile.TemporaryDirectory() as tmp:
+            repo = Repo.clone_from(authed_url, tmp)
+            try:
+                if _remote_branch_exists(repo.git.branch("-r"), branch):
+                    repo.git.checkout("-B", branch, f"origin/{branch}")
+                else:
+                    repo.git.checkout("-B", branch)
+
+                rel_path = _resolve_repo_path(tmp, file_path)
+                abs_path = os.path.join(os.path.realpath(tmp), rel_path)
+                os.makedirs(os.path.dirname(abs_path), exist_ok=True)
+                with open(abs_path, "w", encoding="utf-8") as f:
+                    json.dump(payload, f, ensure_ascii=False, indent=2)
+
+                # No pull here: the clone is seconds old, and git refuses a
+                # rebase-pull while changes are staged. If someone pushed in
+                # between, the push below fails and so does the task.
+                repo.git.add("--", rel_path)
+                if not repo.is_dirty(untracked_files=True):
+                    logger.info("No changes to commit")
+                    return
+
+                repo.git.config("user.name", "airflow-bot")
+                repo.git.config("user.email", "airflow@local")
+                repo.git.commit("-m", f"Add JSON export ({file_path})")
+                repo.git.push("origin", branch)
+            finally:
+                repo.close()
+
+    data = fetch_csv_and_convert_to_json()
+    push_to_git(data)
+
+
+dag = csv_to_git()