from functools import partial from typing import List, Any from aiogram import Bot, Router, F from aiogram.enums import ContentType from aiogram.types import Message from aiogram.utils.formatting import Bold, Italic import ai_agent import utils from messages import * import tg.tg_database as database from tg.utils import get_user_name_for_ai router = Router() # top_users - массив sqlite3.Row с двумя столбцами: user_id и value async def format_rating(chat_id: int, top_users: List[Any], bot: Bot) -> str: result = '' i = 1 for user in top_users: info = await bot.get_chat_member(chat_id, user['user_id']) result += '{}. {} - {}\n'.format(i, utils.full_name(info.user.first_name, info.user.last_name), user['value']) i = i + 1 return result def calculate_total_messages(top_users: List[Any]) -> int: total = 0 for user in top_users: total += user['value'] return total @router.message(F.text == '!помощь') async def help_handler(message: Message): chat_id = message.chat.id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return response = Bold('Команды для всех') + '\n' response += '!я - показать свою визитку\n' response += '!я* - изменить свою визитку\n' response += '!кто* - показать визитку участника\n' response += '!правила - вывести правила\n' response += '!сегодня - статистика сообщений за сегодня\n' response += '!месяц - статистика сообщений за месяц\n' response += '!молчуны - список молчунов\n' response += '!предупреждения - список участников с предупреждениями\n' response += '!проверка* - проверить сообщения на нарушение правил\n' response += '\n' response += Bold('Команды для администраторов') + '\n' response += '!старт - начать работу в чате\n' response += '!правила* - изменить правила\n' response += '!приветствие* - изменить приветствие новичков\n' response += '!личность* - изменить описание личности ИИ\n' response += '!предупреждение* - выдать предупреждение участнику\n' response += '\n' response += Italic('Команды с пометкой * нужно вызывать в ответном сообщении.') await message.answer(**response.as_kwargs()) @router.message(F.text == "!я") async def about_handler(message: Message): chat_id = message.chat.id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return target_user = message.from_user user = database.DB.create_user_if_not_exists(chat_id, target_user.id) if message.reply_to_message is None: if user['about'] is not None: response = Italic(utils.full_name(target_user.first_name, target_user.last_name)) + '\n' + user['about'] await message.answer(**response.as_kwargs()) else: await message.answer('Вы не установили визитку.') else: if message.reply_to_message.content_type == ContentType.TEXT: database.DB.user_update(chat_id, target_user.id, about=message.reply_to_message.text) await message.answer('Визитка изменена.') else: await message.answer('Извините, но я понимаю только текст.') @router.message(F.text == "!кто") async def whois_handler(message: Message): chat_id = message.chat.id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return if message.reply_to_message is None: await message.answer(MESSAGE_NEED_REPLY) return target_user = message.reply_to_message.from_user full_name = utils.full_name(target_user.first_name, target_user.last_name) user = database.DB.get_user(chat_id, target_user.id) if user is not None and user['about'] is not None: response = Italic(full_name) + '\n' + user['about'] await message.answer(**response.as_kwargs()) else: await message.answer(f'У {full_name} нет визитки.') @router.message(F.text == "!сегодня") async def stats_today_handler(message: Message, bot: Bot): chat_id = message.chat.id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return top_users = database.DB.get_top_messages_today(chat_id) if len(top_users) == 0: await message.answer('Сегодня еще никто не писал.') return response = Bold('Статистика за сегодня') + '\n' response += 'Всего сообщений - {}\n'.format(calculate_total_messages(top_users)) response += await format_rating(chat_id, top_users, bot) await message.answer(**response.as_kwargs()) @router.message(F.text == "!месяц") async def stats_month_handler(message: Message, bot: Bot): chat_id = message.chat.id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return top_users = database.DB.get_top_messages_month(chat_id) if len(top_users) == 0: await message.answer('В этом месяце еще никто не писал.') return response = Bold('Статистика за месяц') + '\n' response += 'Всего сообщений - {}\n'.format(calculate_total_messages(top_users)) response += await format_rating(chat_id, top_users, bot) await message.answer(**response.as_kwargs()) @router.message(F.text == "!молчуны") async def silent_handler(message: Message, bot: Bot): chat_id = message.chat.id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return top_users = database.DB.get_top_messages_today(chat_id) if len(top_users) == 0: await message.answer('Молчунов нет. Все молодцы!') return response = Bold('Молчуны чата') + ' (не писали более 14 дней)\n' response += await format_rating(chat_id, top_users, bot) await message.answer(**response.as_kwargs()) @router.message(F.text == "!предупреждения") async def warnings_handler(message: Message, bot: Bot): chat_id = message.chat.id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return top_users = database.DB.get_top_warnings(chat_id) if len(top_users) == 0: await message.answer('Пока все спокойно. Продолжайте в том же духе.') return response = Bold('Участники с предупреждениями') + '\n' response += await format_rating(chat_id, top_users, bot) await message.answer(**response.as_kwargs()) @router.message(F.text == "!проверка") async def check_rules_violation_handler(message: Message, bot: Bot): chat_id = message.chat.id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return if message.reply_to_message is None or message.reply_to_message.content_type != ContentType.TEXT: await message.answer(MESSAGE_NEED_REPLY) return chat_rules = chat['rules'] if chat_rules is None: await message.answer(MESSAGE_DEFAULT_CHECK_RULES) return prompt = 'В чате действуют следующие правила:\n' prompt += chat_rules + '\n\n' prompt += 'Проверь, не нарушают ли правила следующие сообщения (если нарушают, то укажи пункты правил):' chat_prompt = chat['ai_prompt'] ai_message = ai_agent.Message(user_name=await get_user_name_for_ai(message.from_user), text=prompt) ai_fwd_messages = [ai_agent.Message(user_name=await get_user_name_for_ai(message.reply_to_message.from_user), text=message.reply_to_message.text)] await message.answer( await utils.run_with_progress( partial(ai_agent.agent.get_group_chat_reply, chat_id, chat_prompt, ai_message, ai_fwd_messages), partial(bot.send_chat_action, chat_id, 'typing'), interval=4))