52 lines
1.6 KiB
Python
52 lines
1.6 KiB
Python
import io
|
|
from typing import Optional
|
|
|
|
from aiogram import Bot
|
|
from aiogram.enums import ContentType
|
|
from aiogram.types import User, PhotoSize, Message
|
|
|
|
import ai_agent
|
|
import utils
|
|
|
|
|
|
async def get_user_name_for_ai(user: User):
    """Choose the name under which a Telegram user is presented to the AI.

    Preference order:
        1. ``first_name`` and ``last_name`` joined by a single space,
        2. ``first_name`` alone,
        3. ``username``,
        4. the numeric ``id`` rendered as a string.

    The coroutine awaits nothing itself; it is declared ``async`` and must
    therefore be awaited by callers.

    Args:
        user: The user whose display name is needed.

    Returns:
        The first non-empty candidate from the list above.
    """
    given_name = user.first_name
    if given_name:
        family_name = user.last_name
        return f"{given_name} {family_name}" if family_name else given_name

    # No first name: fall back to the username, then to the numeric id.
    return user.username or str(user.id)
|
|
|
|
|
|
async def download_photo(photo: PhotoSize, bot: Bot) -> bytes:
    """Download a photo through the bot and return its raw content.

    Args:
        photo: Photo size whose ``file_id`` identifies the file to fetch.
        bot: Bot instance used to perform the download.

    Returns:
        The bytes held by the buffer that ``bot.download`` yields.
    """
    # No destination is passed, so the awaited result is used as an
    # in-memory buffer (io.BytesIO) and read out in full.
    # noinspection PyTypeChecker,PyUnresolvedReferences
    return (await bot.download(photo.file_id)).getvalue()
|
|
|
|
|
|
def get_message_text(message: Message) -> Optional[str]:
    """Return the textual part of a message, if it has one.

    Args:
        message: The incoming message.

    Returns:
        ``message.text`` for text messages, ``message.caption`` for photo
        messages, and ``None`` for every other content type.
    """
    kind = message.content_type

    if kind == ContentType.TEXT:
        return message.text
    if kind == ContentType.PHOTO:
        # Photos carry their text in the caption field.
        return message.caption
    return None
|
|
|
|
|
|
async def create_ai_message(message: Message, bot: Bot) -> ai_agent.Message:
    """Convert an incoming message into an ``ai_agent.Message``.

    The message id and the sender's display name are always copied. Text
    messages contribute their text; photo messages that are not part of a
    media group contribute their caption and the downloaded image bytes.

    Args:
        message: The incoming message to convert.
        bot: Bot instance used to download photo content.

    Returns:
        The populated ``ai_agent.Message``.

    Raises:
        utils.UnsupportedContentType: If the message is neither text nor a
            photo, or if it is a photo that belongs to a media group.
    """
    converted = ai_agent.Message()
    converted.message_id = message.message_id
    converted.user_name = await get_user_name_for_ai(message.from_user)

    kind = message.content_type

    if kind == ContentType.TEXT:
        converted.text = message.text
        return converted

    # Anything that is not a stand-alone photo (including photos that belong
    # to a media group) is rejected.
    if kind != ContentType.PHOTO or message.media_group_id is not None:
        raise utils.UnsupportedContentType()

    converted.text = message.caption
    # The last entry of message.photo is used; presumably the largest
    # available size - confirm against the Telegram Bot API.
    converted.image = await download_photo(message.photo[-1], bot)
    return converted
|