Свободные функции перемещены в конец файла.

This commit is contained in:
Kirill Kirilenko 2026-03-01 22:59:35 +03:00
parent 11367c6c86
commit e05fb28246

View file

@ -40,65 +40,6 @@ class Message:
message_id: int = None message_id: int = None
def _add_message_prefix(text: Optional[str], username: Optional[str] = None) -> str:
current_time = datetime.datetime.now().strftime("%d.%m.%Y %H:%M")
prefix = f"[{current_time}, {username}]" if username is not None else f"[{current_time}]"
return f"{prefix}: {text}" if text is not None else f"{prefix}:"
def _encode_image(image: bytes) -> str:
encoded_image = base64.b64encode(image).decode('utf-8')
return f"data:image/jpeg;base64,{encoded_image}"
def _serialize_message(role: str, text: Optional[str], image: Optional[bytes]) -> dict:
serialized = {"role": role, "content": []}
if text is not None:
serialized["content"].append({"type": "text", "text": text})
if image is not None:
serialized["content"].append({"type": "image_url", "image_url": {"url": _encode_image(image)}})
return serialized
def _remove_none_recursive(data: Union[dict, list, any]) -> Union[dict, list, any]:
if isinstance(data, dict):
return {
k: _remove_none_recursive(v)
for k, v in data.items()
if v is not None
}
elif isinstance(data, list):
return [
_remove_none_recursive(item)
for item in data
if item is not None
]
else:
return data
def _serialize_assistant_message(message: AssistantMessage) -> AssistantMessageTypedDict:
return _remove_none_recursive(message.model_dump(by_alias=True))
def _get_resolution_for_aspect_ratio(aspect_ratio: str) -> Tuple[int, int]:
aspect_ratio_resolution_map = {
"1:1": (1280, 1280),
"4:3": (1280, 1024),
"3:4": (1024, 1280),
"16:9": (1280, 720),
"9:16": (720, 1280)
}
return aspect_ratio_resolution_map.get(aspect_ratio, (1280, 1024))
def _convert_image_to_jpeg(image: bytes) -> bytes:
img = Image.open(BytesIO(image)).convert("RGB")
output = BytesIO()
img.save(output, format='JPEG', quality=95, optimize=True)
return output.getvalue()
class AiAgent: class AiAgent:
def __init__(self, def __init__(self,
openrouter_token: str, openrouter_model: str, openrouter_token: str, openrouter_model: str,
@ -423,3 +364,62 @@ def create_ai_agent(openrouter_token: str, openrouter_model: str,
db: BasicDatabase, platform: str): db: BasicDatabase, platform: str):
global agent global agent
agent = AiAgent(openrouter_token, openrouter_model, fal_token, replicate_token, db, platform) agent = AiAgent(openrouter_token, openrouter_model, fal_token, replicate_token, db, platform)
def _add_message_prefix(text: Optional[str], username: Optional[str] = None) -> str:
current_time = datetime.datetime.now().strftime("%d.%m.%Y %H:%M")
prefix = f"[{current_time}, {username}]" if username is not None else f"[{current_time}]"
return f"{prefix}: {text}" if text is not None else f"{prefix}:"
def _encode_image(image: bytes) -> str:
encoded_image = base64.b64encode(image).decode('utf-8')
return f"data:image/jpeg;base64,{encoded_image}"
def _serialize_message(role: str, text: Optional[str], image: Optional[bytes]) -> dict:
serialized = {"role": role, "content": []}
if text is not None:
serialized["content"].append({"type": "text", "text": text})
if image is not None:
serialized["content"].append({"type": "image_url", "image_url": {"url": _encode_image(image)}})
return serialized
def _remove_none_recursive(data: Union[dict, list, any]) -> Union[dict, list, any]:
if isinstance(data, dict):
return {
k: _remove_none_recursive(v)
for k, v in data.items()
if v is not None
}
elif isinstance(data, list):
return [
_remove_none_recursive(item)
for item in data
if item is not None
]
else:
return data
def _serialize_assistant_message(message: AssistantMessage) -> AssistantMessageTypedDict:
return _remove_none_recursive(message.model_dump(by_alias=True))
def _get_resolution_for_aspect_ratio(aspect_ratio: str) -> Tuple[int, int]:
aspect_ratio_resolution_map = {
"1:1": (1280, 1280),
"4:3": (1280, 1024),
"3:4": (1024, 1280),
"16:9": (1280, 720),
"9:16": (720, 1280)
}
return aspect_ratio_resolution_map.get(aspect_ratio, (1280, 1024))
def _convert_image_to_jpeg(image: bytes) -> bytes:
img = Image.open(BytesIO(image)).convert("RGB")
output = BytesIO()
img.save(output, format='JPEG', quality=95, optimize=True)
return output.getvalue()