vk_chat_bot/bot.py

383 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import calendar
import datetime
import time
from vkbottle.bot import Bot, Message
from vkbottle_types.objects import MessagesGetConversationMembers
from vkbottle.tools.formatting import bold
import config
from config import config_load
import database
import utils
# Load the configuration before anything reads config.Config
# (presumably config_load() is what populates it - confirm in config.py).
config_load()
# Module-wide bot instance, created with the 'api_token' configuration entry.
bot = Bot(config.Config['api_token'])
def posix_time():
    """Return the current POSIX timestamp as a whole number of seconds.

    Uses ``time.time()`` directly instead of converting the current time to a
    UTC ``struct_time`` and back, which yields the same integer for any
    moment after the epoch.

    Returns:
        int: seconds elapsed since 1970-01-01 00:00:00 UTC.
    """
    return int(time.time())
def vk_user_is_admin(user_id: int, chat_members: MessagesGetConversationMembers):
    """Tell whether ``user_id`` is an administrator among ``chat_members``.

    Args:
        user_id: id of the member to look up.
        chat_members: conversation-members response; its ``items`` are
            scanned for an entry whose ``member_id`` equals ``user_id``.

    Returns:
        bool: the admin flag of the first matching member, or ``False`` when
        the user is not listed at all.
    """
    for member in chat_members.items:
        if member.member_id == user_id:
            # The flag may be unset (None) for ordinary members - presumably
            # VK omits it for non-admins; normalise so callers always get a bool.
            return bool(member.is_admin)
    return False
def create_chat_if_not_exists(chat_id: str):
    """Return the stored record for ``chat_id``, adding the chat first if needed.

    NOTE(review): the parameter is annotated ``str`` while the handlers in
    this file pass ``message.peer_id`` - confirm the intended type.
    """
    existing = database.DB.get_chat(chat_id)
    if existing is not None:
        return existing
    # Unknown chat: insert it, then read the freshly created row back.
    database.DB.add_chat(chat_id)
    return database.DB.get_chat(chat_id)
def create_user_if_not_exists(chat_id: str, user_id: str):
    """Return the stored record of ``user_id`` in ``chat_id``, adding it first if needed.

    NOTE(review): both parameters are annotated ``str`` while the handlers in
    this file pass ids taken from the message - confirm the intended types.
    """
    existing = database.DB.get_user(chat_id, user_id)
    if existing is not None:
        return existing
    # Unknown user for this chat: insert, then read the new row back.
    database.DB.add_user(chat_id, user_id)
    return database.DB.get_user(chat_id, user_id)
# User-facing reply texts shared by the handlers below.

# Sent by command handlers when the chat record has active == 0.
MESSAGE_CHAT_NOT_ACTIVE = 'Извините, но я пока не работаю в этом чате.'
# Sent when the sender fails the vk_user_is_admin() check.
MESSAGE_PERMISSION_DENIED = 'Извините, но о таком меня может попросить только администратор чата.'
# Sent when a command requires message.reply_message but none is present.
MESSAGE_NEED_REPLY = 'Извините, но эту команду нужно вызывать в ответном сообщении.'
# Shown by rules_handler when the chat has no stored rules.
MESSAGE_DEFAULT_RULES = 'Правила не установлены. Просто ведите себя хорошо.'
# Fallback greeting templates; user_join_handler fills {name} via str.format.
MESSAGE_DEFAULT_GREETING_JOIN = 'Добро пожаловать, {name}!'
MESSAGE_DEFAULT_GREETING_REJOIN = 'С возвращением, {name}!'
@bot.on.chat_message(text="!старт")
async def start_handler(message: Message):
    """Activate the bot in the chat the command was sent to.

    The chat record is created if missing. Only a chat administrator may
    activate; anyone else gets the permission-denied reply. On success the
    chat is marked active and every member with a non-negative id gets a
    user record.
    """
    peer = message.peer_id
    create_chat_if_not_exists(peer)

    members = await bot.api.messages.get_conversation_members(peer_id=peer, extended=False)
    if not vk_user_is_admin(message.from_id, members):
        await message.answer(MESSAGE_PERMISSION_DENIED)
        return

    database.DB.chat_update(peer, active=1)
    for participant in members.items:
        # Members with negative ids are bots and get no user record.
        if participant.member_id >= 0:
            create_user_if_not_exists(peer, participant.member_id)

    await message.answer('Готова к работе!')
@bot.on.chat_message(text="!правила")
async def rules_handler(message: Message):
    """Show the chat rules, or replace them when used as a reply.

    Without a replied-to message the stored rules (or the default text) are
    sent. With one, the sender must be a chat administrator; the text of the
    replied-to message then becomes the new rules.
    """
    chat_id = message.peer_id
    chat = create_chat_if_not_exists(chat_id)
    if chat['active'] == 0:
        await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
        return

    replied = message.reply_message
    if replied is None:
        # Plain command: just display the current rules.
        rules_text = chat['rules']
        if rules_text is None:
            await message.answer(MESSAGE_DEFAULT_RULES)
        else:
            await message.answer(bold('Правила чата') + '\n' + rules_text)
        return

    # Command sent as a reply: update the rules (administrators only).
    members = await bot.api.messages.get_conversation_members(peer_id=chat_id, extended=False)
    if not vk_user_is_admin(message.from_id, members):
        await message.answer(MESSAGE_PERMISSION_DENIED)
        return

    database.DB.chat_update(chat_id, rules=replied.text)
    await message.answer('Правила чата изменены.')
@bot.on.chat_message(text="!приветствие")
async def set_greeting_join_handler(message: Message):
    """Store the replied-to message text as the chat's join greeting.

    Requires an active chat, an administrator as sender and a replied-to
    message, checked in that order; each failed check answers with the
    matching notice and stops.
    """
    chat_id = message.peer_id
    if create_chat_if_not_exists(chat_id)['active'] == 0:
        await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
        return

    members = await bot.api.messages.get_conversation_members(peer_id=chat_id, extended=False)
    if not vk_user_is_admin(message.from_id, members):
        await message.answer(MESSAGE_PERMISSION_DENIED)
        return

    source = message.reply_message
    if source is None:
        await message.answer(MESSAGE_NEED_REPLY)
        return

    database.DB.chat_update(chat_id, greeting_join=source.text)
    await message.answer('Приветствие изменено.')
@bot.on.chat_message(text="!возвращение")
async def set_greeting_rejoin_handler(message: Message):
    """Store the replied-to message text as the chat's rejoin greeting.

    Requires an active chat, an administrator as sender and a replied-to
    message, checked in that order; each failed check answers with the
    matching notice and stops.
    """
    chat_id = message.peer_id
    if create_chat_if_not_exists(chat_id)['active'] == 0:
        await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
        return

    members = await bot.api.messages.get_conversation_members(peer_id=chat_id, extended=False)
    if not vk_user_is_admin(message.from_id, members):
        await message.answer(MESSAGE_PERMISSION_DENIED)
        return

    source = message.reply_message
    if source is None:
        await message.answer(MESSAGE_NEED_REPLY)
        return

    database.DB.chat_update(chat_id, greeting_rejoin=source.text)
    await message.answer('Приветствие при возвращении изменено.')
@bot.on.chat_message(text="!предупреждение")
async def warning_handler(message: Message):
    """Give a warning to the author of the replied-to message.

    Requires an active chat, an administrator as sender and a replied-to
    message. The author's warning counter is incremented and, when the user
    lookup returns exactly one profile, the new total is announced.
    """
    chat_id = message.peer_id
    if create_chat_if_not_exists(chat_id)['active'] == 0:
        await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
        return

    members = await bot.api.messages.get_conversation_members(peer_id=chat_id, extended=False)
    if not vk_user_is_admin(message.from_id, members):
        await message.answer(MESSAGE_PERMISSION_DENIED)
        return

    if message.reply_message is None:
        await message.answer(MESSAGE_NEED_REPLY)
        return

    offender_id = message.reply_message.from_id
    create_user_if_not_exists(chat_id, offender_id)
    database.DB.user_increment_warnings(chat_id, offender_id)
    # Re-read the record so the announced total includes this warning.
    record = database.DB.get_user(chat_id, offender_id)

    # name_case='gen' asks VK for the name in a grammatical case that fits
    # the sentence below (presumably genitive - see the VK API docs).
    profiles = await bot.api.users.get(user_ids=[offender_id], name_case='gen')
    if len(profiles) != 1:
        return

    profile = profiles[0]
    total = record['warnings']
    await message.answer('У {} {} {} {}.'.format(
        profile.first_name,
        profile.last_name,
        total,
        utils.make_word_agree_with_number(total, 'предупреждение'),
    ))
@bot.on.chat_message(text="!предупреждения")
async def warnings_handler(message: Message):
    """List the members returned by ``get_top_warnings`` with their counts.

    Rows are numbered in the order the database returns them; rows whose
    user id has no matching profile in the ``users.get`` response are
    skipped and do not consume a number.
    """
    chat_id = message.peer_id
    if create_chat_if_not_exists(chat_id)['active'] == 0:
        await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
        return

    rows = database.DB.get_top_warnings(chat_id)
    if len(rows) == 0:
        await message.answer('Пока все спокойно. Продолжайте в том же духе.')
        return

    profiles = await bot.api.users.get(user_ids=[row['user_id'] for row in rows])
    # Index profiles by id; setdefault keeps the first profile for an id.
    profile_by_id = {}
    for profile in profiles:
        profile_by_id.setdefault(profile.id, profile)

    response = bold('Участники с предупреждениями') + '\n'
    position = 0
    for row in rows:
        profile = profile_by_id.get(row['user_id'])
        if profile is None:
            continue
        position += 1
        response += '{}. {} {} - {}\n'.format(
            position, profile.first_name, profile.last_name, row['warnings'])
    await message.answer(response)
@bot.on.chat_message(text="!сегодня")
async def stats_today_handler(message: Message):
    """Post today's message statistics for the chat.

    Rows from ``get_top_messages_today`` are numbered in database order;
    rows whose user id has no matching profile in the ``users.get`` response
    are skipped and do not consume a number.
    """
    chat_id = message.peer_id
    if create_chat_if_not_exists(chat_id)['active'] == 0:
        await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
        return

    rows = database.DB.get_top_messages_today(chat_id)
    if len(rows) == 0:
        await message.answer('Сегодня еще никто не писал. Вы можете стать первым!')
        return

    profiles = await bot.api.users.get(user_ids=[row['user_id'] for row in rows])
    # Index profiles by id; setdefault keeps the first profile for an id.
    profile_by_id = {}
    for profile in profiles:
        profile_by_id.setdefault(profile.id, profile)

    response = bold('Статистика за сегодня') + '\n'
    position = 0
    for row in rows:
        profile = profile_by_id.get(row['user_id'])
        if profile is None:
            continue
        position += 1
        response += '{}. {} {} - {}\n'.format(
            position, profile.first_name, profile.last_name, row['messages_today'])
    await message.answer(response)
@bot.on.chat_message(text="!месяц")
async def stats_month_handler(message: Message):
    """Post this month's message statistics for the chat.

    Rows from ``get_top_messages_month`` are numbered in database order;
    rows whose user id has no matching profile in the ``users.get`` response
    are skipped and do not consume a number.
    """
    chat_id = message.peer_id
    if create_chat_if_not_exists(chat_id)['active'] == 0:
        await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
        return

    rows = database.DB.get_top_messages_month(chat_id)
    if len(rows) == 0:
        await message.answer('В этом месяце еще никто не писал. Вы можете стать первым!')
        return

    profiles = await bot.api.users.get(user_ids=[row['user_id'] for row in rows])
    # Index profiles by id; setdefault keeps the first profile for an id.
    profile_by_id = {}
    for profile in profiles:
        profile_by_id.setdefault(profile.id, profile)

    response = bold('Статистика за месяц') + '\n'
    position = 0
    for row in rows:
        profile = profile_by_id.get(row['user_id'])
        if profile is None:
            continue
        position += 1
        response += '{}. {} {} - {}\n'.format(
            position, profile.first_name, profile.last_name, row['messages_month'])
    await message.answer(response)
@bot.on.chat_message(text="!молчуны")
async def silent_handler(message: Message):
    """List chat members selected by ``get_top_silent`` for a 14-day threshold.

    For each listed member the reply shows either "never" (stored
    ``last_message`` equal to 0) or the rounded number of days since the
    last message. Rows without a matching profile in the ``users.get``
    response are skipped and do not consume a number.
    """
    chat_id = message.peer_id
    if create_chat_if_not_exists(chat_id)['active'] == 0:
        await message.answer(MESSAGE_CHAT_NOT_ACTIVE)
        return

    current = posix_time()
    # Timestamp 14 days (in seconds) before now, handed to the query.
    cutoff = current - 14 * 24 * 3600
    rows = database.DB.get_top_silent(chat_id, cutoff)
    if len(rows) == 0:
        await message.answer('Молчунов нет. Все молодцы!')
        return

    profiles = await bot.api.users.get(user_ids=[row['user_id'] for row in rows])
    # Index profiles by id; setdefault keeps the first profile for an id.
    profile_by_id = {}
    for profile in profiles:
        profile_by_id.setdefault(profile.id, profile)

    response = bold('Молчуны чата') + ' (не писали больше 2 недель)\n'
    position = 0
    for row in rows:
        profile = profile_by_id.get(row['user_id'])
        if profile is None:
            continue
        position += 1
        last_seen = row['last_message']
        if last_seen == 0:
            response += '{}. {} {} - никогда\n'.format(
                position, profile.first_name, profile.last_name)
        else:
            idle_days = round((current - last_seen) / 3600 / 24)
            response += '{}. {} {} - {} дней\n'.format(
                position, profile.first_name, profile.last_name, idle_days)
    await message.answer(response)
@bot.on.chat_message(action=['chat_invite_user', 'chat_invite_user_by_link'])
async def user_join_handler(message: Message):
    """Greet a user who has just joined (or re-joined) an active chat.

    A join by invite link, or an invitation issued by somebody else, counts
    as a first join and uses the chat's ``greeting_join`` template. A user
    who adds themselves back gets ``greeting_rejoin``. Missing templates
    fall back to the module defaults. Nothing is sent for inactive chats,
    for negative (bot) ids, or when the profile lookup returns nothing.
    """
    chat_id = message.peer_id
    chat = create_chat_if_not_exists(chat_id)
    if chat['active'] == 0:
        return

    # The action type may be an enum member or a plain string depending on
    # the library version (TODO confirm); compare by value so both work.
    action_type = getattr(message.action.type, 'value', message.action.type)
    if action_type == 'chat_invite_user_by_link':
        user_id = message.from_id
        first_join = True
    else:
        user_id = message.action.member_id
        first_join = (user_id != message.from_id)

    # Filter out bots *before* asking the API for a profile: looking up a
    # negative id yields no user, and indexing the empty result would raise.
    if user_id is None or user_id < 0:
        return

    profiles = await bot.api.users.get(user_ids=[user_id])
    if not profiles:
        return
    user_info = profiles[0]

    if first_join:
        template = chat['greeting_join'] or MESSAGE_DEFAULT_GREETING_JOIN
    else:
        template = chat['greeting_rejoin'] or MESSAGE_DEFAULT_GREETING_REJOIN

    mention = f'@id{user_id} ({user_info.first_name})'
    try:
        response = template.format(name=mention)
    except (KeyError, IndexError, ValueError):
        # Templates are free text supplied by chat admins; stray or unknown
        # braces must not break the greeting, so only {name} is substituted.
        response = template.replace('{name}', mention)
    await message.answer(response)
@bot.on.chat_message()
async def any_message_handler(message: Message):
    """Record activity for a chat message that reaches this catch-all handler.

    For active chats, stores the current timestamp as the sender's last
    message time and increments the sender's message counters.
    """
    chat_id = message.peer_id
    if create_chat_if_not_exists(chat_id)['active'] == 0:
        return

    # Ignore bots (negative sender ids) and service messages (with an action).
    if message.from_id < 0 or message.action is not None:
        return

    sender_id = message.from_id
    create_user_if_not_exists(chat_id, sender_id)
    database.DB.user_set_last_message(chat_id, sender_id, posix_time())
    database.DB.user_increment_messages(chat_id, sender_id)
async def wait_until(target_time: datetime.datetime):
    """Suspend the calling coroutine until the wall clock reaches ``target_time``.

    Args:
        target_time: moment to wait for. The current time is taken in
            ``target_time.tzinfo`` (naive local time when that is ``None``),
            so naive and aware values are never mixed.

    Returns immediately when ``target_time`` is already in the past.
    """
    while True:
        now = datetime.datetime.now(target_time.tzinfo)
        remaining = (target_time - now).total_seconds()
        if remaining <= 0:
            return
        # The sleep is timed by the event loop's clock, which need not agree
        # exactly with the wall clock; loop and re-check so the function
        # never returns before the target time has actually been reached.
        await asyncio.sleep(remaining)
async def counters_reset_task():
    """Reset message counters every day at 06:00 in a fixed UTC+3 zone.

    Runs forever: waits for the next scheduled moment, resets the daily
    counters, additionally resets the monthly counters when the scheduled
    date is the first day of a month, then schedules the following day.

    Only hours and minutes are compared when choosing the first run, so a
    start during the 06:00 minute schedules today's (already passed) moment
    and the first reset happens immediately.
    """
    msk = datetime.timezone(datetime.timedelta(hours=3), name="MSK")
    reset_at = datetime.time(6, 0, 0, tzinfo=msk)
    one_day = datetime.timedelta(days=1)

    started = datetime.datetime.now(msk)
    first_day = started.date()
    if (started.hour, started.minute) > (reset_at.hour, reset_at.minute):
        # Today's slot is over; begin with tomorrow's.
        first_day = first_day + one_day
    next_reset = datetime.datetime.combine(first_day, reset_at)

    while True:
        await wait_until(next_reset)
        print('Resetting daily counters...')
        database.DB.reset_messages_today()
        if next_reset.day == 1:
            print('Resetting monthly counters...')
            database.DB.reset_messages_month()
        next_reset = next_reset + one_day
async def startup_task():
    """Print a one-line notice that the bot has started."""
    print('Bot started.')
# Register the startup notice with the bot's loop wrapper.
bot.loop_wrapper.on_startup.append(startup_task())
# Add the periodic counter reset as a task on the same loop wrapper.
bot.loop_wrapper.add_task(counters_reset_task())
# Start the bot (presumably blocks until the process is stopped).
bot.run_forever()