47 lines
1.3 KiB
Python
47 lines
1.3 KiB
Python
import asyncio
|
|
from calendar import timegm
|
|
from typing import Awaitable, Callable, Coroutine, Optional
|
|
|
|
from pymorphy3 import MorphAnalyzer
|
|
from time import gmtime
|
|
|
|
|
|
_morph = MorphAnalyzer()
|
|
|
|
|
|
def make_word_agree_with_number(n: int, word: str) -> str:
|
|
w = _morph.parse(word)[0]
|
|
return w.make_agree_with_number(n).word
|
|
|
|
|
|
def posix_time():
|
|
return timegm(gmtime())
|
|
|
|
|
|
def full_name(first_name: str, last_name: Optional[str]) -> str:
|
|
if last_name is not None:
|
|
return f"{first_name} {last_name}"
|
|
return first_name
|
|
|
|
|
|
async def run_with_progress(main_func: Callable[[], Coroutine], progress_func: Callable[[], Awaitable], interval: int):
|
|
completion_event = asyncio.Event()
|
|
|
|
async def progress():
|
|
while not completion_event.is_set():
|
|
await progress_func()
|
|
wait_event_task = asyncio.create_task(completion_event.wait())
|
|
wait_timer_task = asyncio.create_task(asyncio.sleep(interval))
|
|
await asyncio.wait([wait_event_task, wait_timer_task],
|
|
return_when=asyncio.FIRST_COMPLETED)
|
|
if completion_event.is_set():
|
|
wait_timer_task.cancel()
|
|
|
|
progress_task = asyncio.create_task(progress())
|
|
main_task = asyncio.create_task(main_func())
|
|
|
|
result = await main_task
|
|
completion_event.set()
|
|
await progress_task
|
|
|
|
return result
|