vk_chat_bot/bot.py

210 lines
7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import calendar
import datetime
import time
from vkbottle.bot import Bot, Message
from vkbottle_types.objects import MessagesGetConversationMembers
import config
from config import db_load, db_save
# Run the config module's loader before anything below reads config.DB.
# NOTE(review): presumably this populates config.DB from persistent storage - confirm in config.
db_load()
# Immediately write the state back out.
# NOTE(review): looks like this is meant to create/normalise the storage on first run - confirm in config.
db_save()
# Bot client, authorised with the token stored under config.DB['api_token'].
bot = Bot(config.DB['api_token'])
def posix_time():
    """Return the current time as whole seconds since the Unix epoch."""
    utc_now = time.gmtime()
    return calendar.timegm(utc_now)
async def get_user_info(user_id):
    """Look up a single user through the ``users.get`` API call.

    Returns the first entry of the response, or ``None`` when the response
    is empty.
    """
    profiles = await bot.api.users.get(user_ids=[user_id])
    return profiles[0] if len(profiles) > 0 else None
def user_is_admin(user_id: int, chat_members: MessagesGetConversationMembers):
    """Tell whether ``user_id`` is an administrator of the conversation.

    Args:
        user_id: Id of the member to look up.
        chat_members: Conversation members response; its ``items`` are
            scanned for an entry whose ``member_id`` equals ``user_id``.

    Returns:
        ``True`` when the first matching member has a truthy ``is_admin``
        flag, ``False`` when the flag is falsy/unset or the user is not
        listed at all.
    """
    for member in chat_members.items:
        if member.member_id == user_id:
            # The flag may be left unset (None) for ordinary members;
            # normalise it so callers always receive a real bool.
            return bool(member.is_admin)
    return False
@bot.on.chat_message(text="!старт")
async def stats_handler(message: Message):
    """Handle ``!старт``: register the chat and its current members.

    The sender must be a conversation administrator; otherwise a refusal is
    sent and nothing is stored. Every member with a non-negative id that is
    not yet known gets a zeroed statistics record, the database is saved and
    a confirmation is sent.
    """
    chat_members = await bot.api.messages.get_conversation_members(peer_id=message.peer_id, extended=False)
    if not user_is_admin(message.from_id, chat_members):
        await message.answer('О таком меня может попросить только администратор беседы.')
        return

    chat_key = str(message.peer_id)
    known_users = config.DB['chats'].setdefault(chat_key, {'users': {}})['users']
    for member in chat_members.items:
        # Negative member ids are skipped (presumably communities/bots - VK convention).
        if member.member_id >= 0:
            known_users.setdefault(str(member.member_id), {
                'last_message': 0,
                'messages_today': 0,
                'messages_month': 0,
            })

    db_save()
    await message.answer('Готова к работе!')
@bot.on.chat_message(text="!сегодня")
async def stats_today_handler(message: Message):
    """Handle ``!сегодня``: reply with today's per-user message ranking.

    Users are ordered by ``messages_today`` (highest first); users with a
    zero counter are omitted. The reply always starts with the header line,
    even when nobody has written anything.
    """
    chat_id = str(message.peer_id)
    # A chat with no stored entry simply has no statistics yet; treat it as
    # empty instead of failing with KeyError.
    users = config.DB['chats'].get(chat_id, {}).get('users', {})
    ranking = sorted(
        users.items(),
        key=lambda entry: entry[1].get('messages_today', 0),
        reverse=True,
    )

    lines = ['* Статистика за сегодня:\n']
    for place, (user_id, stats) in enumerate(ranking, start=1):
        count = stats.get('messages_today', 0)
        if count == 0:
            # Sorted in descending order: everyone after this is at zero too.
            break
        info = await get_user_info(user_id)
        if info is None:
            # get_user_info returns None for an empty API response; skip the
            # user rather than crash on a missing profile.
            continue
        lines.append('{}. {} {} - {}\n'.format(place, info.first_name, info.last_name, count))
    await message.answer(''.join(lines))
@bot.on.chat_message(text="!месяц")
async def stats_month_handler(message: Message):
    """Handle ``!месяц``: reply with this month's per-user message ranking.

    Users are ordered by ``messages_month`` (highest first); users with a
    zero counter are omitted. The reply always starts with the header line,
    even when nobody has written anything.
    """
    chat_id = str(message.peer_id)
    # A chat with no stored entry simply has no statistics yet; treat it as
    # empty instead of failing with KeyError.
    users = config.DB['chats'].get(chat_id, {}).get('users', {})
    ranking = sorted(
        users.items(),
        key=lambda entry: entry[1].get('messages_month', 0),
        reverse=True,
    )

    lines = ['\n* Статистика за месяц:\n']
    for place, (user_id, stats) in enumerate(ranking, start=1):
        count = stats.get('messages_month', 0)
        if count == 0:
            # Sorted in descending order: everyone after this is at zero too.
            break
        info = await get_user_info(user_id)
        if info is None:
            # get_user_info returns None for an empty API response; skip the
            # user rather than crash on a missing profile.
            continue
        lines.append('{}. {} {} - {}\n'.format(place, info.first_name, info.last_name, count))
    await message.answer(''.join(lines))
@bot.on.chat_message(text="!молчуны")
async def silent_handler(message: Message):
    """Handle ``!молчуны``: list members silent for at least 14 full days.

    Users are ordered by ``last_message`` ascending, so members that never
    wrote (``last_message == 0``) come first, followed by the longest-silent
    ones. Listing stops at the first user whose silence is shorter than
    14 days. The reply always starts with the header line.
    """
    chat_id = str(message.peer_id)
    # A chat with no stored entry has nobody to report; treat it as empty
    # instead of failing with KeyError.
    users = config.DB['chats'].get(chat_id, {}).get('users', {})
    by_last_message = sorted(users.items(), key=lambda entry: entry[1].get('last_message', 0))

    now = posix_time()
    lines = ['* Молчуны чата (не писали больше 2 недель):\n']
    for place, (user_id, stats) in enumerate(by_last_message, start=1):
        last_message = stats.get('last_message', 0)
        if last_message == 0:
            silence = 'никогда'
        else:
            # Count whole elapsed days: rounding would report 13.5 days as 14
            # and list a user who has not actually been silent for two weeks.
            days_silent = (now - last_message) // 86400
            if days_silent < 14:
                # Sorted ascending by last message: everyone after this user
                # wrote even more recently.
                break
            silence = '{} дней'.format(days_silent)
        info = await get_user_info(user_id)
        if info is None:
            # Apply the missing-profile guard to both branches, not only to
            # the "never wrote" one.
            continue
        lines.append('{}. {} {} - {}\n'.format(place, info.first_name, info.last_name, silence))
    await message.answer(''.join(lines))
@bot.on.chat_message()
async def any_message_handler(message: Message):
    """Record activity for a chat message (registered without a text filter).

    Messages whose ``from_id`` is negative are ignored. For any other sender
    the chat and user records are created on demand, ``last_message`` is set
    to the current POSIX time, both message counters are incremented and the
    database is saved.
    """
    if message.from_id < 0:
        return

    chat_key = str(message.peer_id)
    sender_key = str(message.from_id)
    chat_entry = config.DB['chats'].setdefault(chat_key, {'users': {}})
    stats = chat_entry['users'].setdefault(sender_key, {})

    stats['last_message'] = posix_time()
    for counter in ('messages_today', 'messages_month'):
        stats[counter] = stats.get(counter, 0) + 1
    db_save()
# @bot.on.raw_event(GroupEventType.GROUP_JOIN, GroupTypes.GroupJoin)
# async def group_join_handler(event: GroupTypes.GroupJoin):
# chat_id = str(event.group_id)
# user_id = str(event.object.user_id)
# if user_id == bot.api.users.get()
async def wait_until(target_time: datetime.datetime):
    """Sleep until ``target_time``; return at once if it has already passed.

    The current time is taken in ``target_time``'s own timezone, so both
    naive and aware targets are compared like with like.
    """
    remaining = (target_time - datetime.datetime.now(target_time.tzinfo)).total_seconds()
    if remaining > 0:
        await asyncio.sleep(remaining)
def _reset_counter(counter_name):
    """Set ``counter_name`` to zero for every user of every chat in config.DB."""
    for chat in config.DB['chats'].values():
        for stats in chat['users'].values():
            stats[counter_name] = 0


async def counters_reset_task():
    """Reset message counters every day at 06:00 in a fixed UTC+3 zone.

    Runs forever. At each 06:00 it zeroes ``messages_today`` for all users;
    when that moment falls on the first day of a month it also zeroes
    ``messages_month``. The database is saved after every reset.
    """
    tz = datetime.timezone(datetime.timedelta(hours=3), name="MSK")
    target_time = datetime.time(6, 0, 0, tzinfo=tz)
    now = datetime.datetime.now(tz)

    # Compare full datetimes rather than only hour/minute: a start at e.g.
    # 06:00:30 must wait for tomorrow instead of picking a target that is
    # already in the past and wiping the counters immediately.
    target_datetime = datetime.datetime.combine(now.date(), target_time)
    if now > target_datetime:
        target_datetime += datetime.timedelta(days=1)

    while True:
        await wait_until(target_datetime)
        print('Resetting daily counters...')
        _reset_counter('messages_today')
        if target_datetime.day == 1:
            print('Resetting monthly counters...')
            _reset_counter('messages_month')
        db_save()
        target_datetime = target_datetime + datetime.timedelta(days=1)
async def startup_task():
    """Startup hook: report on stdout that the bot is up."""
    banner = "Bot started."
    print(banner)
# Hand the startup coroutine to the loop wrapper's on_startup list.
# NOTE(review): presumably vkbottle awaits these once when the loop starts - confirm against the installed version.
bot.loop_wrapper.on_startup.append(startup_task())
# Register the endless counter-reset coroutine as a task alongside the bot.
bot.loop_wrapper.add_task(counters_reset_task())
# Start the bot. This runs at import time (there is no __main__ guard).
# NOTE(review): presumably this call blocks until the process is stopped - confirm.
bot.run_forever()