262 lines
11 KiB
Python
262 lines
11 KiB
Python
from functools import partial
|
||
from typing import List, Any
|
||
|
||
from vkbottle import bold, italic, API
|
||
from vkbottle.bot import Message
|
||
from vkbottle.framework.labeler import BotLabeler
|
||
|
||
import ai_agent
|
||
import utils
|
||
from messages import *
|
||
|
||
import vk.vk_database as database
|
||
from vk.utils import get_user_name_for_ai
|
||
|
||
labeler = BotLabeler()
|
||
|
||
|
||
# top_users - массив sqlite3.Row с двумя столбцами: user_id и value
|
||
async def format_rating(top_users: List[Any], api: API) -> str:
|
||
top_user_ids = [user['user_id'] for user in top_users]
|
||
users_info = await api.users.get(user_ids=top_user_ids)
|
||
|
||
result = ''
|
||
i = 1
|
||
for user in top_users:
|
||
for info in users_info:
|
||
if info.id == user['user_id']:
|
||
result += '{}. {} {} - {}\n'.format(i, info.first_name, info.last_name, user['value'])
|
||
i = i + 1
|
||
break
|
||
|
||
return result
|
||
|
||
|
||
def calculate_total_messages(top_users: List[Any]) -> int:
|
||
total = 0
|
||
for user in top_users:
|
||
total += user['value']
|
||
return total
|
||
|
||
|
||
# noinspection SpellCheckingInspection
|
||
@labeler.chat_message(text="!помощь")
|
||
async def rules_handler(message: Message):
|
||
chat_id = message.peer_id
|
||
chat = database.DB.create_chat_if_not_exists(chat_id)
|
||
if chat['active'] == 0:
|
||
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
|
||
return
|
||
|
||
response = bold('Команды для всех') + '\n'
|
||
response += '!я - показать свою визитку\n'
|
||
response += '!я* - изменить свою визитку\n'
|
||
response += '!кто* - показать визитку участника\n'
|
||
response += '!правила - вывести правила\n'
|
||
response += '!сегодня - статистика сообщений за сегодня\n'
|
||
response += '!месяц - статистика сообщений за месяц\n'
|
||
response += '!молчуны - список молчунов\n'
|
||
response += '!предупреждения - список участников с предупреждениями\n'
|
||
response += '!поздравление - запретить/разрешить поздравлять с днем рождения\n'
|
||
response += '!проверка* - проверить сообщения на нарушение правил\n'
|
||
response += '\n'
|
||
response += bold('Команды для администраторов') + '\n'
|
||
response += '!старт - начать работу в чате\n'
|
||
response += '!правила* - изменить правила\n'
|
||
response += '!приветствие* - изменить приветствие новичков\n'
|
||
response += '!возвращение* - изменить приветствие при возвращении\n'
|
||
response += '!деньрождения* - изменить уведомление о дне рождения\n'
|
||
response += '!личность* - изменить описание личности ИИ\n'
|
||
response += '!предупреждение* - выдать предупреждение участнику\n'
|
||
response += '!исключить* - навсегда исключить участника\n'
|
||
response += '\n'
|
||
response += italic('Команды с пометкой * нужно вызывать в ответном сообщении.')
|
||
|
||
await message.answer(response)
|
||
|
||
|
||
@labeler.chat_message(text="!я")
|
||
async def about_handler(message: Message):
|
||
chat_id = message.peer_id
|
||
chat = database.DB.create_chat_if_not_exists(chat_id)
|
||
if chat['active'] == 0:
|
||
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
|
||
return
|
||
|
||
user_id = message.from_id
|
||
user = database.DB.create_user_if_not_exists(chat_id, user_id)
|
||
if message.reply_message is None:
|
||
info = await message.ctx_api.users.get(user_ids=[user_id])
|
||
if len(info) == 1:
|
||
if user['about'] is not None:
|
||
await message.answer(italic(f'{info[0].first_name} {info[0].last_name}') + '\n' + user['about'])
|
||
else:
|
||
await message.answer('Вы не установили визитку.')
|
||
else:
|
||
await message.answer('Не могу получить информацию об участнике.')
|
||
else:
|
||
if len(message.reply_message.text) > 0:
|
||
database.DB.user_update(chat_id, user_id, about=message.reply_message.text)
|
||
await message.answer('Визитка изменена.')
|
||
else:
|
||
await message.answer('Извините, но я понимаю только текст.')
|
||
|
||
|
||
@labeler.chat_message(text="!кто")
|
||
async def whois_handler(message: Message):
|
||
chat_id = message.peer_id
|
||
chat = database.DB.create_chat_if_not_exists(chat_id)
|
||
if chat['active'] == 0:
|
||
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
|
||
return
|
||
|
||
if message.reply_message is None:
|
||
await message.answer(MESSAGE_NEED_REPLY)
|
||
return
|
||
|
||
target_user_id = message.reply_message.from_id
|
||
info = await message.ctx_api.users.get(user_ids=[target_user_id])
|
||
if len(info) == 1:
|
||
user = database.DB.get_user(chat_id, target_user_id)
|
||
if user is not None and user['about'] is not None:
|
||
await message.answer(italic(f'{info[0].first_name} {info[0].last_name}') + '\n' + user['about'])
|
||
else:
|
||
await message.answer(f'У {info[0].first_name} {info[0].last_name} нет визитки.')
|
||
else:
|
||
await message.answer('Не могу получить информацию об участнике.')
|
||
|
||
|
||
@labeler.chat_message(text="!сегодня")
|
||
async def stats_today_handler(message: Message):
|
||
chat_id = message.peer_id
|
||
chat = database.DB.create_chat_if_not_exists(chat_id)
|
||
if chat['active'] == 0:
|
||
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
|
||
return
|
||
|
||
top_users = database.DB.get_top_messages_today(chat_id)
|
||
if len(top_users) == 0:
|
||
await message.answer('Сегодня еще никто не писал.')
|
||
return
|
||
|
||
response = bold('Статистика за сегодня') + '\n'
|
||
response += 'Всего сообщений - {}\n'.format(calculate_total_messages(top_users))
|
||
response += await format_rating(top_users, message.ctx_api)
|
||
await message.answer(response)
|
||
|
||
|
||
@labeler.chat_message(text="!месяц")
|
||
async def stats_month_handler(message: Message):
|
||
chat_id = message.peer_id
|
||
chat = database.DB.create_chat_if_not_exists(chat_id)
|
||
if chat['active'] == 0:
|
||
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
|
||
return
|
||
|
||
top_users = database.DB.get_top_messages_month(chat_id)
|
||
if len(top_users) == 0:
|
||
await message.answer('В этом месяце еще никто не писал.')
|
||
return
|
||
|
||
response = bold('Статистика за месяц') + '\n'
|
||
response += 'Всего сообщений - {}\n'.format(calculate_total_messages(top_users))
|
||
response += await format_rating(top_users, message.ctx_api)
|
||
await message.answer(response)
|
||
|
||
|
||
@labeler.chat_message(text="!молчуны")
|
||
async def silent_handler(message: Message):
|
||
chat_id = message.peer_id
|
||
chat = database.DB.create_chat_if_not_exists(chat_id)
|
||
if chat['active'] == 0:
|
||
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
|
||
return
|
||
|
||
top_users = database.DB.get_top_silent(chat_id, 14)
|
||
if len(top_users) == 0:
|
||
await message.answer('Молчунов нет. Все молодцы!')
|
||
return
|
||
|
||
response = bold('Молчуны чата') + ' (не писали более 14 дней)\n'
|
||
response += await format_rating(top_users, message.ctx_api)
|
||
await message.answer(response)
|
||
|
||
|
||
@labeler.chat_message(text="!предупреждения")
|
||
async def warnings_handler(message: Message):
|
||
chat_id = message.peer_id
|
||
chat = database.DB.create_chat_if_not_exists(chat_id)
|
||
if chat['active'] == 0:
|
||
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
|
||
return
|
||
|
||
top_users = database.DB.get_top_warnings(chat_id)
|
||
if len(top_users) == 0:
|
||
await message.answer('Пока все спокойно. Продолжайте в том же духе.')
|
||
return
|
||
|
||
response = bold('Участники с предупреждениями') + '\n'
|
||
response += await format_rating(top_users, message.ctx_api)
|
||
await message.answer(response)
|
||
|
||
|
||
@labeler.chat_message(text="!поздравление")
|
||
async def no_birthday_handler(message: Message):
|
||
chat_id = message.peer_id
|
||
chat = database.DB.create_chat_if_not_exists(chat_id)
|
||
if chat['active'] == 0:
|
||
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
|
||
return
|
||
|
||
user_id = message.from_id
|
||
user = database.DB.create_user_if_not_exists(chat_id, user_id)
|
||
happy_birthday = 1 if user['happy_birthday'] == 0 else 0
|
||
database.DB.user_toggle_happy_birthday(chat_id, user_id, happy_birthday)
|
||
|
||
if happy_birthday == 1:
|
||
await message.answer('Хорошо, я буду поздравлять тебя с днем рождения, если его дата не скрыта.')
|
||
else:
|
||
await message.answer('Хорошо, я не буду поздравлять тебя с днем рождения.')
|
||
|
||
|
||
@labeler.chat_message(text="!проверка")
|
||
async def check_rules_violation_handler(message: Message):
|
||
chat_id = message.peer_id
|
||
chat = database.DB.create_chat_if_not_exists(chat_id)
|
||
if chat['active'] == 0:
|
||
await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
|
||
return
|
||
|
||
chat_rules = chat['rules']
|
||
if chat_rules is None:
|
||
await message.answer(MESSAGE_DEFAULT_CHECK_RULES)
|
||
return
|
||
|
||
prompt = 'В чате действуют следующие правила:\n'
|
||
prompt += chat_rules + '\n\n'
|
||
prompt += 'Проверь, не нарушают ли правила следующие сообщения (если нарушают, то укажи пункты правил):'
|
||
|
||
chat_prompt = chat['ai_prompt']
|
||
|
||
ai_message = ai_agent.Message(user_name=await get_user_name_for_ai(message.ctx_api, message.from_id), text=prompt)
|
||
ai_fwd_messages: list[ai_agent.Message] = []
|
||
if message.reply_message is not None and len(message.reply_message.text) > 0:
|
||
ai_fwd_messages.append(
|
||
ai_agent.Message(user_name=await get_user_name_for_ai(message.ctx_api, message.reply_message.from_id),
|
||
text=message.reply_message.text))
|
||
else:
|
||
for fwd_message in message.fwd_messages:
|
||
if len(fwd_message.text) > 0:
|
||
ai_fwd_messages.append(
|
||
ai_agent.Message(user_name=await get_user_name_for_ai(message.ctx_api, fwd_message.from_id),
|
||
text=fwd_message.text))
|
||
|
||
if len(ai_fwd_messages) == 0:
|
||
await message.answer(MESSAGE_NEED_REPLY_OR_FORWARD)
|
||
return
|
||
|
||
await message.answer(
|
||
await utils.run_with_progress(
|
||
partial(ai_agent.agent.get_group_chat_reply, chat_id, chat_prompt, ai_message, ai_fwd_messages),
|
||
partial(message.ctx_api.messages.set_activity, peer_id=chat_id, type='typing'),
|
||
interval=4))
|