from functools import partial from typing import List, Any from vkbottle import bold, italic, API from vkbottle.bot import Message from vkbottle.framework.labeler import BotLabeler import ai_agent import utils from messages import * import vk.vk_database as database from vk.utils import get_user_name_for_ai labeler = BotLabeler() # top_users - массив sqlite3.Row с двумя столбцами: user_id и value async def format_rating(top_users: List[Any], api: API) -> str: top_user_ids = [user['user_id'] for user in top_users] users_info = await api.users.get(user_ids=top_user_ids) result = '' i = 1 for user in top_users: for info in users_info: if info.id == user['user_id']: result += '{}. {} {} - {}\n'.format(i, info.first_name, info.last_name, user['value']) i = i + 1 break return result def calculate_total_messages(top_users: List[Any]) -> int: total = 0 for user in top_users: total += user['value'] return total # noinspection SpellCheckingInspection @labeler.chat_message(text="!помощь") async def rules_handler(message: Message): chat_id = message.peer_id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return response = bold('Команды для всех') + '\n' response += '!правила - вывести правила\n' response += '!сегодня - статистика сообщений за сегодня\n' response += '!месяц - статистика сообщений за месяц\n' response += '!молчуны - список молчунов\n' response += '!предупреждения - список участников с предупреждениями\n' response += '!поздравление - запретить/разрешить поздравлять с днем рождения\n' response += '!проверка* - проверить сообщения на нарушение правил\n' response += '\n' response += bold('Команды для администраторов') + '\n' response += '!старт - начать работу в чате\n' response += '!правила* - изменить правила\n' response += '!приветствие* - изменить приветствие новичков\n' response += '!возвращение* - изменить приветствие при возвращении\n' response += '!деньрождения* - изменить уведомление о дне рождения\n' response += '!личность* - изменить описание личности ИИ\n' response += '!предупреждение* - выдать предупреждение участнику\n' response += '!исключить* - навсегда исключить участника\n' response += '\n' response += italic('Команды с пометкой * нужно вызывать в ответном сообщении.') await message.answer(response) @labeler.chat_message(text="!сегодня") async def stats_today_handler(message: Message): chat_id = message.peer_id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return top_users = database.DB.get_top_messages_today(chat_id) if len(top_users) == 0: await message.answer('Сегодня еще никто не писал.') return response = bold('Статистика за сегодня') + '\n' response += 'Всего сообщений - {}\n'.format(calculate_total_messages(top_users)) response += await format_rating(top_users, message.ctx_api) await message.answer(response) @labeler.chat_message(text="!месяц") async def stats_month_handler(message: Message): chat_id = message.peer_id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return top_users = database.DB.get_top_messages_month(chat_id) if len(top_users) == 0: await message.answer('В этом месяце еще никто не писал.') return response = bold('Статистика за месяц') + '\n' response += 'Всего сообщений - {}\n'.format(calculate_total_messages(top_users)) response += await format_rating(top_users, message.ctx_api) await message.answer(response) @labeler.chat_message(text="!молчуны") async def silent_handler(message: Message): chat_id = message.peer_id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return top_users = database.DB.get_top_silent(chat_id, 14) if len(top_users) == 0: await message.answer('Молчунов нет. Все молодцы!') return response = bold('Молчуны чата') + ' (не писали более 14 дней)\n' response += await format_rating(top_users, message.ctx_api) await message.answer(response) @labeler.chat_message(text="!предупреждения") async def warnings_handler(message: Message): chat_id = message.peer_id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return top_users = database.DB.get_top_warnings(chat_id) if len(top_users) == 0: await message.answer('Пока все спокойно. Продолжайте в том же духе.') return response = bold('Участники с предупреждениями') + '\n' response += await format_rating(top_users, message.ctx_api) await message.answer(response) @labeler.chat_message(text="!поздравление") async def no_birthday_handler(message: Message): chat_id = message.peer_id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return user_id = message.from_id user = database.DB.create_user_if_not_exists(chat_id, user_id) happy_birthday = 1 if user['happy_birthday'] == 0 else 0 database.DB.user_toggle_happy_birthday(chat_id, user_id, happy_birthday) if happy_birthday == 1: await message.answer('Хорошо, я буду поздравлять тебя с днем рождения, если его дата не скрыта.') else: await message.answer('Хорошо, я не буду поздравлять тебя с днем рождения.') @labeler.chat_message(text="!проверка") async def check_rules_violation_handler(message: Message): chat_id = message.peer_id chat = database.DB.create_chat_if_not_exists(chat_id) if chat['active'] == 0: await message.answer(MESSAGE_CHAT_NOT_ACTIVE) return chat_rules = chat['rules'] if chat_rules is None: await message.answer(MESSAGE_DEFAULT_CHECK_RULES) return prompt = 'В чате действуют следующие правила:\n' prompt += chat_rules + '\n\n' prompt += 'Проверь, не нарушают ли правила следующие сообщения (если нарушают, то укажи пункты правил):' chat_prompt = chat['ai_prompt'] ai_message = ai_agent.Message(user_name=await get_user_name_for_ai(message.ctx_api, message.from_id), text=prompt) ai_fwd_messages: list[ai_agent.Message] = [] if message.reply_message is not None and len(message.reply_message.text) > 0: ai_fwd_messages.append( ai_agent.Message(user_name=await get_user_name_for_ai(message.ctx_api, message.reply_message.from_id), text=message.reply_message.text)) else: for fwd_message in message.fwd_messages: if len(fwd_message.text) > 0: ai_fwd_messages.append( ai_agent.Message(user_name=await get_user_name_for_ai(message.ctx_api, fwd_message.from_id), text=fwd_message.text)) if len(ai_fwd_messages) == 0: await message.answer(MESSAGE_NEED_REPLY_OR_FORWARD) return await message.answer( await utils.run_with_progress( partial(ai_agent.agent.get_group_chat_reply, chat_id, chat_prompt, ai_message, ai_fwd_messages), partial(message.ctx_api.messages.set_activity, peer_id=chat_id, type='typing'), interval=4))