65 lines
2.1 KiB
Python
65 lines
2.1 KiB
Python
from typing import List
|
|
|
|
import aiohttp
|
|
|
|
from vkbottle import API
|
|
from vkbottle.bot import Message
|
|
from vkbottle_types.codegen.objects import PhotosPhotoSizes
|
|
from vkbottle_types.objects import MessagesMessageAttachmentType
|
|
|
|
import ai_agent
|
|
import utils
|
|
|
|
|
|
class MyAPI(API):
|
|
def __init__(self, bot_id: int, api_token: str):
|
|
super().__init__(api_token)
|
|
self.bot_id = bot_id
|
|
|
|
|
|
def get_bot_id(api: API) -> int:
|
|
my_api: MyAPI = api
|
|
return my_api.bot_id
|
|
|
|
|
|
async def get_user_name_for_ai(api: API, user_id: int):
|
|
user = await api.users.get(user_ids=[user_id])
|
|
if len(user) == 1:
|
|
return "{} {}".format(user[0].first_name, user[0].last_name)
|
|
else:
|
|
return '@id' + str(user_id)
|
|
|
|
|
|
async def download_photo(photos: List[PhotosPhotoSizes]) -> bytes:
|
|
max_photo_size = 16*1024*1024
|
|
async with aiohttp.ClientSession() as session:
|
|
for size_type in ['w', 'z', 'y', 'x', 'm', 's']:
|
|
for photo in photos:
|
|
if photo.type != size_type:
|
|
continue
|
|
async with session.head(photo.url) as response:
|
|
if response.status != 200 or response.content_length > max_photo_size:
|
|
break
|
|
async with session.get(photo.url) as response:
|
|
if response.status == 200:
|
|
return await response.read()
|
|
break
|
|
|
|
raise RuntimeError(f"Failed to download photo. Status code: {response.status}")
|
|
|
|
|
|
async def create_ai_message(message: Message) -> ai_agent.Message:
|
|
ai_message = ai_agent.Message()
|
|
ai_message.message_id = message.conversation_message_id
|
|
ai_message.user_name = await get_user_name_for_ai(message.ctx_api, message.from_id)
|
|
if message.text is not None:
|
|
ai_message.text = message.text
|
|
for attachment in message.attachments:
|
|
if attachment.type == MessagesMessageAttachmentType.PHOTO:
|
|
ai_message.image = await download_photo(attachment.photo.sizes)
|
|
break
|
|
else:
|
|
continue
|
|
if ai_message.text is None and ai_message.image is None:
|
|
raise utils.UnsupportedContentType()
|
|
return ai_message
|