airflow/dags/soosoo/csv_to_json_git_push.py

from airflow.decorators import dag, task
from pendulum import datetime
from airflow.operators.python import get_current_context
import csv
import json
import tempfile
import os
import logging
import requests
from git import Repo
from urllib.parse import quote
@dag(
    dag_id="csv_to_json_git_push",
    schedule="@once",
    start_date=datetime(2025, 8, 14),
    catchup=False,
    params={
        "csv_url": "https://raw.githubusercontent.com/woogys/test/main/username.csv",
        "branch": "main",
        "file_path": "output/username2.json",
    },
    access_control={
        "DAG_Test_Access": {
            "can_read",
            "can_edit",
            "can_dag_run",
        }
    },
)
def csv_to_git():
    @task
    def fetch_csv_and_convert_to_json() -> list[dict]:
        """Download the CSV named in the DAG params and parse it into a list of row dicts."""
        ctx = get_current_context()
        csv_url = ctx["params"]["csv_url"]
        logging.info("Downloading CSV from: %s", csv_url)
        response = requests.get(csv_url, timeout=30)  # timeout so the task fails instead of hanging
        response.raise_for_status()
        decoded_content = response.content.decode("utf-8").splitlines()
        reader = csv.DictReader(decoded_content, delimiter=";")
        data = list(reader)
        logging.info("Converted %d rows to JSON", len(data))
        return data
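    # Illustration of the XCom payload handed to push_to_git (the column names
    # below are assumptions; the real keys come from the downloaded CSV header
    # row). For a ";"-delimited file such as
    #   username;email
    #   alice;alice@example.com
    # fetch_csv_and_convert_to_json returns
    #   [{"username": "alice", "email": "alice@example.com"}]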
    @task
    def push_to_git(payload: list[dict], branch: str | None = None, file_path: str | None = None):
        from airflow.hooks.base import BaseHook

        ctx = get_current_context()
        branch = branch or ctx["params"]["branch"]
        file_path = file_path or ctx["params"]["file_path"]

        # Resolve the repository URL from the "git" Airflow Connection;
        # extra["repo_url"] takes precedence over the host field.
        connection = BaseHook.get_connection("git")
        extra = connection.extra_dejson or {}
        repo_url = extra.get("repo_url") or connection.host
        if not repo_url:
            raise ValueError("Neither repo_url nor host is set on the 'git' Connection.")

        # Embed credentials into the HTTPS remote URL when a password/token is configured.
        authed_url = repo_url
        if repo_url.startswith("https://") and connection.password:
            user = connection.login or "oauth2"
            token = quote(connection.password, safe="")
            authed_url = repo_url.replace("https://", f"https://{user}:{token}@")

        with tempfile.TemporaryDirectory() as tmp:
            repo = Repo.clone_from(authed_url, tmp)

            # Check out the target branch, tracking origin/<branch> if it already exists.
            remote_branches = repo.git.branch("-r")
            if f"origin/{branch}" in remote_branches:
                repo.git.checkout("-B", branch, f"origin/{branch}")
            else:
                repo.git.checkout("-B", branch)

            # Write the JSON payload into the working tree.
            abs_path = os.path.join(tmp, file_path)
            os.makedirs(os.path.dirname(abs_path), exist_ok=True)
            with open(abs_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)

            repo.git.add(file_path)
            repo.git.config("user.name", "airflow-bot")
            repo.git.config("user.email", "airflow@local")

            if repo.is_dirty(untracked_files=True):
                repo.git.commit("-m", f"Add JSON export ({file_path})")
                # Rebase onto the latest remote state before pushing; best effort,
                # since the branch may not exist on the remote yet.
                try:
                    repo.git.pull("--rebase", "origin", branch)
                except Exception as e:
                    logging.warning("Git pull skipped: %s", e)
                repo.git.push("origin", branch)
            else:
                logging.info("No changes to commit")

    data = fetch_csv_and_convert_to_json()
    push_to_git(data)


dag = csv_to_git()
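
# --- Connection setup (illustrative sketch, not part of the DAG definition) ---
# push_to_git reads the Airflow Connection with conn_id "git"; the fields it
# uses are login, password, and extra["repo_url"] (falling back to host). A
# minimal sketch, assuming a GitHub HTTPS remote and a personal access token;
# the repository URL and token below are placeholders, not values taken from
# this project:
#
#   from airflow.models import Connection
#   git_conn = Connection(
#       conn_id="git",
#       conn_type="generic",
#       login="oauth2",
#       password="<personal-access-token>",
#       extra='{"repo_url": "https://github.com/<org>/<repo>.git"}',
#   )
#   # Airflow can also read the connection from the environment, e.g. by
#   # setting AIRFLOW_CONN_GIT to git_conn.get_uri().

# Local debugging hook (assumes Airflow 2.5+, which provides DAG.test()):
# `python csv_to_json_git_push.py` runs the DAG in-process without a scheduler.
if __name__ == "__main__":
    dag.test()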