# Source listing metadata: 94 lines, 3.1 KiB, Python.
import csv
import io
import json
import logging
import os
import tempfile
from urllib.parse import quote

import requests
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from git import GitCommandError, Repo
from pendulum import datetime


# (connect, read) timeouts in seconds for the CSV download. Without a timeout
# ``requests`` waits indefinitely on an unresponsive server.
_HTTP_TIMEOUT = (10, 60)


def _with_credentials(repo_url, login, password):
    """Return *repo_url* with basic-auth credentials embedded for HTTP(S).

    The URL is returned unchanged when there is no password or when the
    scheme is not ``http``/``https`` (e.g. SSH remotes). Both the user name
    and the password are percent-encoded, and any user-info already present
    in the URL is replaced rather than duplicated.
    """
    scheme, sep, rest = repo_url.partition("://")
    if not password or not sep or scheme.lower() not in ("http", "https"):
        return repo_url
    if scheme.lower() == "http":
        logging.warning("Sending git credentials over plain HTTP")

    netloc, slash, path = rest.partition("/")
    host = netloc.rpartition("@")[2]  # drop pre-existing "user[:pass]@"
    user = quote(login or "oauth2", safe="")
    token = quote(password, safe="")
    return f"{scheme}://{user}:{token}@{host}{slash}{path}"


def _resolve_in_repo(repo_dir, rel_path):
    """Return the absolute path of *rel_path* inside *repo_dir*.

    Raises:
        ValueError: if the resolved path is the repository root itself or
            lies outside of it (absolute paths, ``..`` traversal).
    """
    root = os.path.realpath(repo_dir)
    target = os.path.realpath(os.path.join(root, rel_path))
    if target == root or os.path.commonpath([root, target]) != root:
        raise ValueError(
            f"file_path must point to a file inside the repository: {rel_path!r}"
        )
    return target


@dag(
    dag_id="csv_to_json_git_push",
    schedule="@once",
    start_date=datetime(2025, 8, 14),
    catchup=False,
    params={
        "csv_url": "https://raw.githubusercontent.com/woogys/test/main/username.csv",
        "branch": "main",
        "file_path": "output/username2.json",
    },
)
def csv_to_git():
    """Download a ``;``-delimited CSV, convert it to JSON and push it to git.

    DAG params:
        csv_url: URL of the CSV file to download.
        branch: branch to commit to.
        file_path: path of the JSON file, relative to the repository root.
    """

    @task
    def fetch_csv_and_convert_to_json() -> list[dict]:
        """Download the CSV named by the ``csv_url`` param and parse it.

        Returns:
            One dict per CSV row, keyed by the header row.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        ctx = get_current_context()
        csv_url = ctx["params"]["csv_url"]

        logging.info("Downloading CSV from: %s", csv_url)
        response = requests.get(csv_url, timeout=_HTTP_TIMEOUT)
        response.raise_for_status()

        # "utf-8-sig" strips a leading BOM so it does not end up in the first
        # column name. Feeding the csv module a file-like object opened with
        # newline="" lets it handle quoted fields that contain line breaks.
        text = response.content.decode("utf-8-sig")
        reader = csv.DictReader(io.StringIO(text, newline=""), delimiter=";")
        data = list(reader)
        logging.info("Converted %d rows to JSON", len(data))
        return data

    @task
    def push_to_git(payload: list[dict], branch: str = None, file_path: str = None) -> None:
        """Write *payload* as JSON into a fresh clone and push the commit.

        Args:
            payload: rows to serialise.
            branch: target branch; defaults to the ``branch`` DAG param.
            file_path: repository-relative output path; defaults to the
                ``file_path`` DAG param.

        Raises:
            ValueError: if the ``git`` connection has no repository URL or
                *file_path* resolves outside the clone.
        """
        from airflow.hooks.base import BaseHook

        ctx = get_current_context()
        branch = branch or ctx["params"]["branch"]
        file_path = file_path or ctx["params"]["file_path"]

        connection = BaseHook.get_connection("git")
        extra = connection.extra_dejson or {}
        repo_url = extra.get("repo_url") or connection.host
        if not repo_url:
            raise ValueError("git Connection에 repo_url/host가 설정되어 있지 않습니다.")

        authed_url = _with_credentials(repo_url, connection.login, connection.password)

        with tempfile.TemporaryDirectory() as tmp:
            repo = Repo.clone_from(authed_url, tmp)
            try:
                # Compare whole ref names; a substring test would let
                # "origin/main" match "origin/main-old".
                remote_ref = f"origin/{branch}"
                remote_refs = {
                    line.split("->")[0].strip()
                    for line in repo.git.branch("-r").splitlines()
                }
                on_remote = remote_ref in remote_refs
                if on_remote:
                    repo.git.checkout("-B", branch, remote_ref)
                else:
                    repo.git.checkout("-B", branch)

                abs_path = _resolve_in_repo(tmp, file_path)
                os.makedirs(os.path.dirname(abs_path), exist_ok=True)
                with open(abs_path, "w", encoding="utf-8") as f:
                    json.dump(payload, f, ensure_ascii=False, indent=2)

                # "--" keeps a path starting with "-" from being read as an option.
                repo.git.add("--", file_path)
                repo.git.config("user.name", "airflow-bot")
                repo.git.config("user.email", "airflow@local")

                if not repo.is_dirty(untracked_files=True):
                    logging.info("No changes to commit")
                    return

                repo.git.commit("-m", f"Add JSON export ({file_path})")

                # Rebase only after committing (a rebase refuses to run with
                # staged changes) and only when the branch exists upstream.
                # Best effort: if it fails, the push below reports the problem.
                if on_remote:
                    try:
                        repo.git.pull("--rebase", "origin", branch)
                    except GitCommandError as e:
                        logging.warning("Git pull skipped: %s", e)

                repo.git.push("origin", branch)
            finally:
                # Release GitPython's handles before the temp dir is removed.
                repo.close()

    data = fetch_csv_and_convert_to_json()
    push_to_git(data)


# Instantiate the DAG at import time so that it is bound to a module-level name.
dag = csv_to_git()