vk_chat_bot/vk/handlers/user.py

262 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from functools import partial
from typing import List, Any
from vkbottle import bold, italic, API
from vkbottle.bot import Message
from vkbottle.framework.labeler import BotLabeler
import ai_agent
import utils
from messages import *
import vk.vk_database as database
from vk.utils import get_user_name_for_ai
labeler = BotLabeler()
# top_users - массив sqlite3.Row с двумя столбцами: user_id и value
async def format_rating(top_users: List[Any], api: API) -> str:
top_user_ids = [user['user_id'] for user in top_users]
users_info = await api.users.get(user_ids=top_user_ids)
result = ''
i = 1
for user in top_users:
for info in users_info:
if info.id == user['user_id']:
result += '{}. {} {} - {}\n'.format(i, info.first_name, info.last_name, user['value'])
i = i + 1
break
return result
def calculate_total_messages(top_users: List[Any]) -> int:
total = 0
for user in top_users:
total += user['value']
return total
# noinspection SpellCheckingInspection
@labeler.chat_message(text="!помощь")
async def rules_handler(message: Message):
chat_id = message.peer_id
chat = database.DB.create_chat_if_not_exists(chat_id)
if chat['active'] == 0:
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
return
response = bold('Команды для всех') + '\n'
response += '!я - показать свою визитку\n'
response += '!я* - изменить свою визитку\n'
response += '!кто* - показать визитку участника\n'
response += '!правила - вывести правила\n'
response += '!сегодня - статистика сообщений за сегодня\n'
response += '!месяц - статистика сообщений за месяц\n'
response += '!молчуны - список молчунов\n'
response += '!предупреждения - список участников с предупреждениями\n'
response += '!поздравление - запретить/разрешить поздравлять с днем рождения\n'
response += '!проверка* - проверить сообщения на нарушение правил\n'
response += '\n'
response += bold('Команды для администраторов') + '\n'
response += '!старт - начать работу в чате\n'
response += '!правила* - изменить правила\n'
response += '!приветствие* - изменить приветствие новичков\n'
response += '!возвращение* - изменить приветствие при возвращении\n'
response += '!деньрождения* - изменить уведомление о дне рождения\n'
response += '!личность* - изменить описание личности ИИ\n'
response += '!предупреждение* - выдать предупреждение участнику\n'
response += '!исключить* - навсегда исключить участника\n'
response += '\n'
response += italic('Команды с пометкой * нужно вызывать в ответном сообщении.')
await message.answer(response)
@labeler.chat_message(text="")
async def about_handler(message: Message):
chat_id = message.peer_id
chat = database.DB.create_chat_if_not_exists(chat_id)
if chat['active'] == 0:
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
return
user_id = message.from_id
user = database.DB.create_user_if_not_exists(chat_id, user_id)
if message.reply_message is None:
info = await message.ctx_api.users.get(user_ids=[user_id])
if len(info) == 1:
if user['about'] is not None:
await message.answer(italic(f'{info[0].first_name} {info[0].last_name}') + '\n' + user['about'])
else:
await message.answer('Вы не установили визитку.')
else:
await message.answer('Не могу получить информацию об участнике.')
else:
if len(message.reply_message.text) > 0:
database.DB.user_update(chat_id, user_id, about=message.reply_message.text)
await message.answer('Визитка изменена.')
else:
await message.answer('Извините, но я понимаю только текст.')
@labeler.chat_message(text="!кто")
async def whois_handler(message: Message):
chat_id = message.peer_id
chat = database.DB.create_chat_if_not_exists(chat_id)
if chat['active'] == 0:
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
return
if message.reply_message is None:
await message.answer(MESSAGE_NEED_REPLY)
return
target_user_id = message.reply_message.from_id
info = await message.ctx_api.users.get(user_ids=[target_user_id])
if len(info) == 1:
user = database.DB.get_user(chat_id, target_user_id)
if user is not None and user['about'] is not None:
await message.answer(italic(f'{info[0].first_name} {info[0].last_name}') + '\n' + user['about'])
else:
await message.answer(f'У {info[0].first_name} {info[0].last_name} нет визитки.')
else:
await message.answer('Не могу получить информацию об участнике.')
@labeler.chat_message(text="!сегодня")
async def stats_today_handler(message: Message):
chat_id = message.peer_id
chat = database.DB.create_chat_if_not_exists(chat_id)
if chat['active'] == 0:
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
return
top_users = database.DB.get_top_messages_today(chat_id)
if len(top_users) == 0:
await message.answer('Сегодня еще никто не писал.')
return
response = bold('Статистика за сегодня') + '\n'
response += 'Всего сообщений - {}\n'.format(calculate_total_messages(top_users))
response += await format_rating(top_users, message.ctx_api)
await message.answer(response)
@labeler.chat_message(text="!месяц")
async def stats_month_handler(message: Message):
chat_id = message.peer_id
chat = database.DB.create_chat_if_not_exists(chat_id)
if chat['active'] == 0:
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
return
top_users = database.DB.get_top_messages_month(chat_id)
if len(top_users) == 0:
await message.answer('В этом месяце еще никто не писал.')
return
response = bold('Статистика за месяц') + '\n'
response += 'Всего сообщений - {}\n'.format(calculate_total_messages(top_users))
response += await format_rating(top_users, message.ctx_api)
await message.answer(response)
@labeler.chat_message(text="!молчуны")
async def silent_handler(message: Message):
chat_id = message.peer_id
chat = database.DB.create_chat_if_not_exists(chat_id)
if chat['active'] == 0:
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
return
top_users = database.DB.get_top_silent(chat_id, 14)
if len(top_users) == 0:
await message.answer('Молчунов нет. Все молодцы!')
return
response = bold('Молчуны чата') + ' (не писали более 14 дней)\n'
response += await format_rating(top_users, message.ctx_api)
await message.answer(response)
@labeler.chat_message(text="!предупреждения")
async def warnings_handler(message: Message):
chat_id = message.peer_id
chat = database.DB.create_chat_if_not_exists(chat_id)
if chat['active'] == 0:
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
return
top_users = database.DB.get_top_warnings(chat_id)
if len(top_users) == 0:
await message.answer('Пока все спокойно. Продолжайте в том же духе.')
return
response = bold('Участники с предупреждениями') + '\n'
response += await format_rating(top_users, message.ctx_api)
await message.answer(response)
@labeler.chat_message(text="!поздравление")
async def no_birthday_handler(message: Message):
chat_id = message.peer_id
chat = database.DB.create_chat_if_not_exists(chat_id)
if chat['active'] == 0:
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
return
user_id = message.from_id
user = database.DB.create_user_if_not_exists(chat_id, user_id)
happy_birthday = 1 if user['happy_birthday'] == 0 else 0
database.DB.user_toggle_happy_birthday(chat_id, user_id, happy_birthday)
if happy_birthday == 1:
await message.answer('Хорошо, я буду поздравлять тебя с днем рождения, если его дата не скрыта.')
else:
await message.answer('Хорошо, я не буду поздравлять тебя с днем рождения.')
@labeler.chat_message(text="!проверка")
async def check_rules_violation_handler(message: Message):
chat_id = message.peer_id
chat = database.DB.create_chat_if_not_exists(chat_id)
if chat['active'] == 0:
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
return
chat_rules = chat['rules']
if chat_rules is None:
await message.answer(MESSAGE_DEFAULT_CHECK_RULES)
return
prompt = 'В чате действуют следующие правила:\n'
prompt += chat_rules + '\n\n'
prompt += 'Проверь, не нарушают ли правила следующие сообщения (если нарушают, то укажи пункты правил):'
chat_prompt = chat['ai_prompt']
ai_message = ai_agent.Message(user_name=await get_user_name_for_ai(message.ctx_api, message.from_id), text=prompt)
ai_fwd_messages: list[ai_agent.Message] = []
if message.reply_message is not None and len(message.reply_message.text) > 0:
ai_fwd_messages.append(
ai_agent.Message(user_name=await get_user_name_for_ai(message.ctx_api, message.reply_message.from_id),
text=message.reply_message.text))
else:
for fwd_message in message.fwd_messages:
if len(fwd_message.text) > 0:
ai_fwd_messages.append(
ai_agent.Message(user_name=await get_user_name_for_ai(message.ctx_api, fwd_message.from_id),
text=fwd_message.text))
if len(ai_fwd_messages) == 0:
await message.answer(MESSAGE_NEED_REPLY_OR_FORWARD)
return
await message.answer(
await utils.run_with_progress(
partial(ai_agent.agent.get_group_chat_reply, chat_id, chat_prompt, ai_message, ai_fwd_messages),
partial(message.ctx_api.messages.set_activity, peer_id=chat_id, type='typing'),
interval=4))