266 lines
11 KiB
Python
266 lines
11 KiB
Python
import json
|
|
import torch
|
|
import numpy as np
|
|
import triton_python_backend_utils as pb_utils
|
|
from typing import List, Dict, Any, Union, Tuple
|
|
|
|
from transformers import (
|
|
AutoModelForCausalLM,
|
|
AutoTokenizer,
|
|
GenerationConfig,
|
|
BitsAndBytesConfig,
|
|
)
|
|
from peft import PeftModel, PeftConfig
|
|
|
|
class TritonPythonModel:
|
|
def initialize(self, args: Dict[str, str]):
|
|
"""
|
|
모델 초기화: 설정 로드, 로거 설정, 모델 및 토크나이저 로드
|
|
"""
|
|
self.logger = pb_utils.Logger
|
|
self.model_config = json.loads(args["model_config"])
|
|
self.model_name = args["model_name"]
|
|
|
|
# 설정 파라미터 로드
|
|
self.base_model_path = self._get_config_param("base_model_path")
|
|
self.is_adapter_model = self._get_config_param("is_adapter_model", "false").lower() == "true"
|
|
self.adapter_model_path = self._get_config_param("adapter_model_path")
|
|
self.quantization = self._get_config_param("quantization")
|
|
self.device = "cuda" if torch.cuda.is_available() else "cpu"
|
|
|
|
# 로그 출력
|
|
self.logger.log_info(f"================ {self.model_name} Setup ================")
|
|
self.logger.log_info(f"Base Model: {self.base_model_path}")
|
|
self.logger.log_info(f"Adapter Mode: {self.is_adapter_model} ({self.adapter_model_path})")
|
|
self.logger.log_info(f"Quantization: {self.quantization}")
|
|
self.logger.log_info(f"Device: {self.device}")
|
|
|
|
self._load_model_and_tokenizer()
|
|
self.logger.log_info(f"Model initialized successfully.")
|
|
|
|
def _load_model_and_tokenizer(self):
|
|
"""모델과 토크나이저를 로드하고 설정합니다."""
|
|
# 1. Quantization 설정
|
|
bnb_config = self._get_bnb_config()
|
|
|
|
# 2. Base Model 로드
|
|
# Adapter 모델인 경우 Config에서 Base 경로를 덮어쓸 수 있음
|
|
load_path = self.base_model_path
|
|
if self.is_adapter_model:
|
|
peft_config = PeftConfig.from_pretrained(self.adapter_model_path)
|
|
load_path = peft_config.base_model_name_or_path
|
|
|
|
try:
|
|
self.model = AutoModelForCausalLM.from_pretrained(
|
|
load_path,
|
|
torch_dtype=torch.float16,
|
|
quantization_config=bnb_config,
|
|
device_map="auto",
|
|
local_files_only=True,
|
|
trust_remote_code=True
|
|
)
|
|
except Exception as e:
|
|
self.logger.log_error(f"Failed to load base model: {e}")
|
|
raise e
|
|
|
|
# 3. Adapter 병합 (필요 시)
|
|
if self.is_adapter_model:
|
|
self.model = PeftModel.from_pretrained(self.model, self.adapter_model_path)
|
|
|
|
# 추론 모드 설정
|
|
self.model.eval()
|
|
|
|
# 4. Tokenizer 로드
|
|
self.tokenizer = AutoTokenizer.from_pretrained(load_path, trust_remote_code=True)
|
|
|
|
# Pad Token 설정 (없을 경우 EOS로 대체)
|
|
if self.tokenizer.pad_token is None:
|
|
self.tokenizer.pad_token = self.tokenizer.eos_token
|
|
self.tokenizer.pad_token_id = self.tokenizer.eos_token_id
|
|
self.logger.log_info("Pad token was None. Set to EOS token.")
|
|
|
|
# Chat Template 지원 여부 확인
|
|
self.supports_chat_template = (
|
|
hasattr(self.tokenizer, "chat_template") and
|
|
self.tokenizer.chat_template is not None
|
|
)
|
|
|
|
def _get_bnb_config(self) -> Union[BitsAndBytesConfig, None]:
|
|
"""양자화 설정 객체를 반환합니다."""
|
|
if self.quantization == "int4":
|
|
return BitsAndBytesConfig(
|
|
load_in_4bit=True,
|
|
bnb_4bit_use_double_quant=True,
|
|
bnb_4bit_quant_type="nf4",
|
|
bnb_4bit_compute_dtype=torch.float16
|
|
)
|
|
elif self.quantization == "int8":
|
|
return BitsAndBytesConfig(
|
|
load_in_8bit=True,
|
|
llm_int8_threshold=6.0,
|
|
llm_int8_has_fp16_weight=True
|
|
)
|
|
return None
|
|
|
|
def execute(self, requests):
|
|
"""Triton Inference Request 처리 메인 루프"""
|
|
responses = []
|
|
|
|
for request in requests:
|
|
try:
|
|
# 1. 입력 데이터 파싱
|
|
input_data, is_chat = self._parse_input(request)
|
|
self.logger.log_info(f"\n>>> [Input]: {input_data}")
|
|
|
|
# 2. Generation Config 생성
|
|
gen_config = self._create_generation_config(request)
|
|
|
|
# 3. 토크나이징
|
|
inputs = self._tokenize(input_data, is_chat)
|
|
|
|
# 4. 모델 추론 (Generate)
|
|
output_text = self._generate(inputs, gen_config)
|
|
|
|
# [LOGGING] 모델 응답 로그 출력
|
|
self.logger.log_info(f"\n<<< [Output]: {output_text}")
|
|
|
|
# 5. 응답 생성
|
|
responses.append(self._create_response(output_text))
|
|
|
|
except Exception as e:
|
|
self.logger.log_error(f"Error during execution: {e}")
|
|
# 에러 발생 시 빈 문자열 또는 에러 메시지 반환 (클라이언트 처리에 따라 변경 가능)
|
|
err_tensor = pb_utils.Tensor("text_output", np.array([str(e).encode('utf-8')], dtype=np.bytes_))
|
|
responses.append(pb_utils.InferenceResponse(output_tensors=[err_tensor]))
|
|
|
|
return responses
|
|
|
|
def _parse_input(self, request) -> Tuple[Union[str, List[Dict]], bool]:
|
|
"""입력 텐서를 파싱하여 텍스트 또는 대화 목록과 타입(채팅 여부)을 반환합니다."""
|
|
input_text = self._get_input_scalar(request, "text_input")
|
|
|
|
try:
|
|
# JSON 형식의 대화 기록인지 시도
|
|
conversation = json.loads(input_text)
|
|
# 리스트 형태여야 채팅 히스토리로 간주
|
|
if isinstance(conversation, list):
|
|
return conversation, True
|
|
except (json.JSONDecodeError, TypeError):
|
|
pass
|
|
|
|
return input_text, False
|
|
|
|
def _tokenize(self, input_data, is_chat: bool):
|
|
"""입력 데이터를 토큰화하여 모델 입력 텐서를 반환합니다."""
|
|
if self.supports_chat_template and is_chat:
|
|
return self.tokenizer.apply_chat_template(
|
|
input_data,
|
|
tokenize=True,
|
|
add_generation_prompt=True,
|
|
return_tensors="pt",
|
|
return_dict=True
|
|
).to(self.device)
|
|
else:
|
|
# 일반 텍스트인 경우
|
|
if is_chat: # Chat 형식이지만 템플릿 미지원 시 문자열 변환 시도
|
|
input_data = str(input_data)
|
|
|
|
return self.tokenizer(
|
|
input_data,
|
|
return_tensors="pt"
|
|
).to(self.device)
|
|
|
|
def _generate(self, inputs, gen_config: GenerationConfig) -> str:
|
|
"""모델 생성 로직 수행 및 디코딩"""
|
|
input_ids = inputs["input_ids"]
|
|
input_len = input_ids.shape[-1]
|
|
|
|
with torch.no_grad():
|
|
outputs = self.model.generate(
|
|
**inputs,
|
|
generation_config=gen_config,
|
|
pad_token_id=self.tokenizer.pad_token_id,
|
|
eos_token_id=self.tokenizer.eos_token_id
|
|
)
|
|
|
|
# 입력 토큰을 제외하고 생성된 토큰만 슬라이싱
|
|
generated_tokens = outputs[0][input_len:]
|
|
decoded_output = self.tokenizer.decode(generated_tokens, skip_special_tokens=True)
|
|
|
|
return decoded_output.strip()
|
|
|
|
def _create_generation_config(self, request) -> GenerationConfig:
|
|
"""Request에서 파라미터를 추출하여 GenerationConfig 객체 생성"""
|
|
# 기본값 설정 및 입력값 추출 Helper
|
|
def get_param(name, default=None, cast_type=None):
|
|
val = self._get_input_scalar(request, name, default)
|
|
if val is not None and cast_type:
|
|
return cast_type(val)
|
|
return val
|
|
|
|
return GenerationConfig(
|
|
max_length=get_param("max_length", 1024, int), # max_length 기본값은 넉넉하게
|
|
max_new_tokens=get_param("max_new_tokens", 256, int),
|
|
temperature=get_param("temperature", 1.0, float),
|
|
do_sample=get_param("do_sample", False, bool),
|
|
top_k=get_param("top_k", 50, int),
|
|
top_p=get_param("top_p", 1.0, float),
|
|
repetition_penalty=get_param("repetition_penalty", 1.0, float),
|
|
# stream=get_param("stream", False, bool) # Python Backend에서 Stream은 별도 구현 필요
|
|
)
|
|
|
|
def _create_response(self, output_text: str):
|
|
"""생성된 텍스트를 Triton Response 객체로 변환"""
|
|
output_tensor = pb_utils.Tensor(
|
|
"text_output",
|
|
np.array([output_text.encode('utf-8')], dtype=np.bytes_)
|
|
)
|
|
return pb_utils.InferenceResponse(output_tensors=[output_tensor])
|
|
|
|
def _get_config_param(self, key: str, default: str = None) -> str:
|
|
"""config.pbtxt 파라미터 조회 Helper"""
|
|
params = self.model_config.get('parameters', {})
|
|
if key in params:
|
|
return params[key].get('string_value', default)
|
|
return default
|
|
|
|
def _get_input_scalar(self, request, name: str, default=None):
|
|
"""입력 텐서에서 스칼라 값을 추출하는 Helper"""
|
|
tensor = pb_utils.get_input_tensor_by_name(request, name)
|
|
if tensor is None:
|
|
return default
|
|
|
|
#value = tensor.as_numpy().item() # item()을 사용하여 스칼라 값 추출
|
|
value = self._np_decoder(tensor.as_numpy()[0])
|
|
|
|
# 바이트 타입 디코딩
|
|
# if isinstance(value, bytes):
|
|
# return value.decode('utf-8')
|
|
return value
|
|
|
|
def _np_decoder(self, obj):
|
|
"""
|
|
NumPy 객체의 데이터 타입을 확인하고 Python 기본 타입으로 변환합니다.
|
|
|
|
Args:
|
|
obj (numpy.ndarray element): 변환할 NumPy 배열의 요소.
|
|
|
|
Returns:
|
|
any: 해당 NumPy 요소에 대응하는 Python 기본 타입 (str, int, float, bool).
|
|
bytes 타입인 경우 UTF-8로 디코딩합니다.
|
|
"""
|
|
if isinstance(obj, bytes):
|
|
return obj.decode('utf-8')
|
|
if np.issubdtype(obj, np.integer):
|
|
return int(obj)
|
|
if np.issubdtype(obj, np.floating):
|
|
return round(float(obj), 3)
|
|
if isinstance(obj, np.bool_):
|
|
return bool(obj)
|
|
|
|
def finalize(self):
|
|
"""리소스 정리"""
|
|
self.logger.log_info(f"Finalizing model {self.model_name}")
|
|
self.model = None
|
|
self.tokenizer = None
|
|
torch.cuda.empty_cache() |