Support-BOT
This commit is contained in:
36
README.md
36
README.md
@@ -183,3 +183,39 @@ PS: Для запуска необходим python 3.9 или выше
|
|||||||
|
|
||||||
```/deleteadmin``` - Удаляет права администратора у пользователя в **чате поддержки**. После удаления прав администратора нужно вручную удалить пользователя из группы телеграм.
|
```/deleteadmin``` - Удаляет права администратора у пользователя в **чате поддержки**. После удаления прав администратора нужно вручную удалить пользователя из группы телеграм.
|
||||||
|
|
||||||
|
## Автозапуск бота
|
||||||
|
1.Создание службы:Создайте файл службы для вашего бота. Например, my_bot.service
|
||||||
|
```
|
||||||
|
sudo nano /etc/systemd/system/my_bot.service
|
||||||
|
```
|
||||||
|
2.Редактирование службы:Внесите следующие настройки в файл службы:
|
||||||
|
```
|
||||||
|
[Unit]
|
||||||
|
Description=My Python Telegram Bot
|
||||||
|
After=network.target
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
User=your_username
|
||||||
|
Group=your_groupname
|
||||||
|
WorkingDirectory=/path/to/your/bot
|
||||||
|
ExecStart=/path/to/your/python /path/to/your/bot/main.py
|
||||||
|
Restart=always
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
||||||
|
```
|
||||||
|
Замените your_username, your_groupname, /path/to/your/bot, и /path/to/your/python на соответствующие значения для вашей среды. Убедитесь, что ExecStart указывает на правильный путь к вашему скрипту Python бота.
|
||||||
|
|
||||||
|
3.Перезагрузка systemd:После того как вы сохранили изменения, перезагрузите systemd для применения новой службы:
|
||||||
|
```
|
||||||
|
sudo systemctl daemon-reload
|
||||||
|
```
|
||||||
|
4.Управление службой:Теперь вы можете управлять вашим ботом как службой. Например, чтобы запустить его и настроить автозапуск при загрузке системы, выполните следующие команды:
|
||||||
|
```
|
||||||
|
sudo systemctl start my_bot
|
||||||
|
sudo systemctl enable my_bot
|
||||||
|
```
|
||||||
|
Чтобы проверить статус вашей службы, выполните:
|
||||||
|
```
|
||||||
|
sudo systemctl status my_bot
|
||||||
|
```
|
||||||
0
__init__.py
Normal file
0
__init__.py
Normal file
10
filter_media.py
Normal file
10
filter_media.py
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
from aiogram.filters import BaseFilter
|
||||||
|
from aiogram.types import Message, ContentType
|
||||||
|
|
||||||
|
|
||||||
|
class SupportedMediaFilter(BaseFilter):
|
||||||
|
async def __call__(self, message: Message) -> bool:
|
||||||
|
return message.content_type in (
|
||||||
|
ContentType.ANIMATION, ContentType.AUDIO, ContentType.DOCUMENT,
|
||||||
|
ContentType.PHOTO, ContentType.VIDEO, ContentType.VOICE
|
||||||
|
)
|
||||||
48
get_reports.py
Normal file
48
get_reports.py
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
from dateutil.relativedelta import relativedelta
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from app.crud.message import crud_message
|
||||||
|
|
||||||
|
|
||||||
|
async def get_report_from_db(session: AsyncSession,
|
||||||
|
from_date=None,
|
||||||
|
to_date=None):
|
||||||
|
"""Получение отчета за интервал времени из базы данных.
|
||||||
|
:param session: Асинхронная сессия к БД
|
||||||
|
:type session: AsyncSession
|
||||||
|
:param from_date: Дата начала
|
||||||
|
:type from_date: str
|
||||||
|
:param to_date: Дата окончания
|
||||||
|
:type to_date: str
|
||||||
|
:return: Возвращает словарь с данными для отчета
|
||||||
|
:rtype: dict
|
||||||
|
"""
|
||||||
|
if not to_date:
|
||||||
|
to_date = datetime.now()
|
||||||
|
if not from_date:
|
||||||
|
from_date = to_date + relativedelta(months=-1)
|
||||||
|
messages = await crud_message.get_by_date_interval(
|
||||||
|
from_date, to_date, session
|
||||||
|
)
|
||||||
|
users_senders = []
|
||||||
|
answers_amount = 0
|
||||||
|
questions_amount = 0
|
||||||
|
|
||||||
|
for mes in messages:
|
||||||
|
if (mes.telegram_user_id not in users_senders
|
||||||
|
and not mes.answer_to_user_id):
|
||||||
|
users_senders.append(mes.telegram_user_id)
|
||||||
|
if mes.answer_to_user_id:
|
||||||
|
answers_amount += 1
|
||||||
|
else:
|
||||||
|
questions_amount += 1
|
||||||
|
users_amount = len(users_senders)
|
||||||
|
report = {
|
||||||
|
'from_date': from_date.strftime('%d.%m.%Y'),
|
||||||
|
'to_date': to_date.strftime('%d.%m.%Y'),
|
||||||
|
'users_amount': users_amount,
|
||||||
|
'answers_amount': answers_amount,
|
||||||
|
'questions_amount': questions_amount,
|
||||||
|
}
|
||||||
|
return report
|
||||||
195
handlers_commands.py
Normal file
195
handlers_commands.py
Normal file
@@ -0,0 +1,195 @@
|
|||||||
|
from aiogram import Bot, F, Router
|
||||||
|
from aiogram.exceptions import TelegramBadRequest
|
||||||
|
from aiogram.filters import Command, CommandObject
|
||||||
|
from aiogram.types import Message
|
||||||
|
|
||||||
|
from app.bot.get_reports import get_report_from_db
|
||||||
|
from app.bot.utils import get_user_name, check_input_date_correct, \
|
||||||
|
stringdate_to_date, check_user_is_banned, \
|
||||||
|
get_telegram_user_from_resend_message, parse_ban_command
|
||||||
|
from app.core.config import settings
|
||||||
|
from app.core.db import get_async_session
|
||||||
|
from app.crud.user import crud_user
|
||||||
|
from app.crud.message import crud_message
|
||||||
|
|
||||||
|
router = Router()
|
||||||
|
|
||||||
|
|
||||||
|
@router.message(Command(commands=["start"]))
|
||||||
|
async def command_start(message: Message):
|
||||||
|
await message.answer(settings.START_MESSAGE)
|
||||||
|
session_generator = get_async_session()
|
||||||
|
session = await session_generator.__anext__()
|
||||||
|
db_user = await crud_user.get_or_create_user_by_tg_message(message,
|
||||||
|
session)
|
||||||
|
if check_user_is_banned(db_user):
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
@router.message(Command(commands="info"),
|
||||||
|
F.chat.id == int(settings.GROUP_ID),
|
||||||
|
F.reply_to_message)
|
||||||
|
async def get_user_info(message: Message, bot: Bot):
|
||||||
|
session_generator = get_async_session()
|
||||||
|
session = await session_generator.__anext__()
|
||||||
|
telegram_user = await get_telegram_user_from_resend_message(message, bot)
|
||||||
|
if not telegram_user:
|
||||||
|
return
|
||||||
|
messages_count = await crud_message.get_count_user_messages(
|
||||||
|
telegram_user.id, session
|
||||||
|
)
|
||||||
|
ans_count = await crud_message.get_count_answers_to_user(
|
||||||
|
telegram_user.id, session
|
||||||
|
)
|
||||||
|
username = f"@{telegram_user.username}" if telegram_user.username else "отсутствует"
|
||||||
|
await message.reply(text=f'Имя: {get_user_name(telegram_user)}\n'
|
||||||
|
f'Id: {telegram_user.id}\n'
|
||||||
|
f'username: {username}\n'
|
||||||
|
f'Сообщений от пользователя: {messages_count}\n'
|
||||||
|
f'Ответов пользователю: {ans_count}\n')
|
||||||
|
|
||||||
|
|
||||||
|
@router.message(Command(commands='report'),
|
||||||
|
F.chat.id == int(settings.GROUP_ID))
|
||||||
|
async def get_report(message: Message,
|
||||||
|
bot: Bot,
|
||||||
|
command: CommandObject):
|
||||||
|
session_generator = get_async_session()
|
||||||
|
session = await session_generator.__anext__()
|
||||||
|
if command.args:
|
||||||
|
if not check_input_date_correct(command.args):
|
||||||
|
answer_text = 'Неверный формат даты'
|
||||||
|
await bot.send_message(
|
||||||
|
chat_id=int(settings.GROUP_ID), text=answer_text
|
||||||
|
)
|
||||||
|
return
|
||||||
|
from_date, to_date = stringdate_to_date(command.args)
|
||||||
|
report = await get_report_from_db(session, from_date, to_date)
|
||||||
|
report['period'] = 'выбранный период'
|
||||||
|
else:
|
||||||
|
report = await get_report_from_db(session)
|
||||||
|
report['period'] = 'последний месяц'
|
||||||
|
|
||||||
|
answer_text = (f"Отчет за {report['period']}, c {report['from_date']} до "
|
||||||
|
f"{report['to_date']}.\n"
|
||||||
|
f"Всего было получено {report['questions_amount']} "
|
||||||
|
f"сообщений от {report['users_amount']} клиентов.\n"
|
||||||
|
f"Количество ответов от администраторов: {report['answers_amount']}")
|
||||||
|
await bot.send_message(chat_id=int(settings.GROUP_ID), text=answer_text)
|
||||||
|
|
||||||
|
|
||||||
|
@router.message(Command(commands='ban'),
|
||||||
|
F.chat.id == int(settings.GROUP_ID))
|
||||||
|
async def handler_ban_user(message: Message,
|
||||||
|
bot: Bot,
|
||||||
|
command: CommandObject):
|
||||||
|
if not command.args and not message.reply_to_message:
|
||||||
|
return await message.reply(
|
||||||
|
text='Команда некорректна. Укажите ID или ответьте на сообщение')
|
||||||
|
session_generator = get_async_session()
|
||||||
|
session = await session_generator.__anext__()
|
||||||
|
if command.args:
|
||||||
|
telegram_user_id = parse_ban_command(command)
|
||||||
|
if not telegram_user_id:
|
||||||
|
return await message.reply(
|
||||||
|
text='Невозможно извлечь id пользователя. '
|
||||||
|
'Нужно ввести id в формате "12345", либо ответить на '
|
||||||
|
'сообщение пользователя, которого хотите забанить')
|
||||||
|
try:
|
||||||
|
telegram_user = await bot.get_chat(telegram_user_id)
|
||||||
|
except TelegramBadRequest:
|
||||||
|
return await message.reply(
|
||||||
|
text='Пользователя с таким id не существует ')
|
||||||
|
|
||||||
|
else:
|
||||||
|
telegram_user = await get_telegram_user_from_resend_message(message, bot)
|
||||||
|
if not telegram_user:
|
||||||
|
return
|
||||||
|
db_user = await crud_user.get_user_by_telegram_id(telegram_user.id,
|
||||||
|
session)
|
||||||
|
await crud_user.ban_user(db_user, session)
|
||||||
|
await message.reply(text=f'Пользователь {db_user.first_name} '
|
||||||
|
f'{db_user.last_name} забанен.'
|
||||||
|
f'Чтобы разбанить, отправьте /unban\n'
|
||||||
|
f'Тикет: #id{db_user.telegram_id}')
|
||||||
|
|
||||||
|
|
||||||
|
@router.message(Command(commands='unban'),
|
||||||
|
F.chat.id == int(settings.GROUP_ID))
|
||||||
|
async def handler_unban_user(message: Message,
|
||||||
|
bot: Bot,
|
||||||
|
command: CommandObject):
|
||||||
|
if not command.args and not message.reply_to_message:
|
||||||
|
return await message.reply(
|
||||||
|
text='Команда некорректна. Укажите ID или ответьте на сообщение')
|
||||||
|
session_generator = get_async_session()
|
||||||
|
session = await session_generator.__anext__()
|
||||||
|
if command.args:
|
||||||
|
telegram_user_id = parse_ban_command(command)
|
||||||
|
if not telegram_user_id:
|
||||||
|
return await message.reply(
|
||||||
|
text='Невозможно извлечь id пользователя. '
|
||||||
|
'Нужно ввести id в формате "12345", либо ответить на '
|
||||||
|
'сообщение пользователя, которого хотите забанить')
|
||||||
|
db_user = await crud_user.get_user_by_telegram_id(telegram_user_id,
|
||||||
|
session)
|
||||||
|
else:
|
||||||
|
telegram_user = await get_telegram_user_from_resend_message(message, bot)
|
||||||
|
if not telegram_user:
|
||||||
|
return
|
||||||
|
db_user = await crud_user.get_user_by_telegram_id(telegram_user.id,
|
||||||
|
session)
|
||||||
|
await crud_user.unban_user(db_user, session)
|
||||||
|
await message.reply(text=f'Пользователь с id '
|
||||||
|
f'{db_user.first_name} {db_user.last_name} разбанен\n'
|
||||||
|
f'Тикет: #id{db_user.telegram_id}')
|
||||||
|
|
||||||
|
|
||||||
|
@router.message(Command(commands='banlist'),
|
||||||
|
F.chat.id == int(settings.GROUP_ID))
|
||||||
|
async def handler_unban_user(message: Message,
|
||||||
|
bot: Bot):
|
||||||
|
session_generator = get_async_session()
|
||||||
|
session = await session_generator.__anext__()
|
||||||
|
banned_users = await crud_user.get_banned_users(session)
|
||||||
|
text = 'Список забанненых пользователей:\n'
|
||||||
|
for user in banned_users:
|
||||||
|
text += f'{user.telegram_id} - {user.first_name} {user.last_name}\n'
|
||||||
|
await message.reply(text=text)
|
||||||
|
|
||||||
|
|
||||||
|
@router.message(Command(commands='registeradmin'),
|
||||||
|
F.chat.id == int(settings.GROUP_ID))
|
||||||
|
async def handle_register_admin(message: Message,
|
||||||
|
bot: Bot):
|
||||||
|
if not message.reply_to_message:
|
||||||
|
return message.reply(text="Введите команду как ответ на сообщение "
|
||||||
|
"пользователя")
|
||||||
|
session_generator = get_async_session()
|
||||||
|
session = await session_generator.__anext__()
|
||||||
|
db_user = await crud_user.get_or_create_user_by_tg_message(
|
||||||
|
message.reply_to_message,
|
||||||
|
session
|
||||||
|
)
|
||||||
|
db_user = await crud_user.register_admin(db_user, session)
|
||||||
|
text = (f'Пользователь {db_user.first_name} {db_user.last_name} теперь '
|
||||||
|
f'администратор')
|
||||||
|
await message.reply(text=text)
|
||||||
|
|
||||||
|
|
||||||
|
@router.message(Command(commands='deleteadmin'),
|
||||||
|
F.chat.id == int(settings.GROUP_ID))
|
||||||
|
async def handle_remove_admin(message: Message,
|
||||||
|
bot: Bot):
|
||||||
|
if not message.reply_to_message:
|
||||||
|
return message.reply(text="Введите команду как ответ на сообщение "
|
||||||
|
"пользователя")
|
||||||
|
session_generator = get_async_session()
|
||||||
|
session = await session_generator.__anext__()
|
||||||
|
db_user = await crud_user.get_or_create_user_by_tg_message(
|
||||||
|
message.reply_to_message,
|
||||||
|
session
|
||||||
|
)
|
||||||
|
db_user = await crud_user.remove_admin(db_user, session)
|
||||||
|
text = f'Администратор {db_user.first_name} {db_user.last_name} удален'
|
||||||
|
await message.reply(text=text)
|
||||||
101
handlers_messages.py
Normal file
101
handlers_messages.py
Normal file
@@ -0,0 +1,101 @@
|
|||||||
|
from aiogram import Bot, F, Router
|
||||||
|
from aiogram.exceptions import TelegramForbiddenError
|
||||||
|
from aiogram.types import Message
|
||||||
|
|
||||||
|
from app.bot.utils import extract_user_id, check_user_is_banned
|
||||||
|
from app.core.config import settings
|
||||||
|
from app.core.db import get_async_session
|
||||||
|
from app.crud.message import crud_message
|
||||||
|
from app.crud.user import crud_user
|
||||||
|
from filter_media import SupportedMediaFilter
|
||||||
|
|
||||||
|
router = Router()
|
||||||
|
|
||||||
|
|
||||||
|
@router.message(F.chat.type == 'private', F.text)
|
||||||
|
async def send_message_to_group(message: Message, bot: Bot):
|
||||||
|
if message.text and len(message.text) > 4000:
|
||||||
|
return await message.reply(text='Пожалуйста, уменьшите размер '
|
||||||
|
'сообщения, чтобы оно было менее '
|
||||||
|
'4000 символов')
|
||||||
|
await bot.send_message(
|
||||||
|
chat_id=settings.GROUP_ID,
|
||||||
|
text=(
|
||||||
|
f'{message.text}\n\n'
|
||||||
|
f'Тикет: #id{message.from_user.id}'
|
||||||
|
),
|
||||||
|
parse_mode='HTML'
|
||||||
|
)
|
||||||
|
session_generator = get_async_session()
|
||||||
|
session = await session_generator.__anext__()
|
||||||
|
db_user = await crud_user.get_or_create_user_by_tg_message(message, session)
|
||||||
|
if check_user_is_banned(db_user):
|
||||||
|
return
|
||||||
|
message_data = {
|
||||||
|
'text': message.text,
|
||||||
|
'telegram_user_id': message.from_user.id,
|
||||||
|
'attachments': False,
|
||||||
|
}
|
||||||
|
|
||||||
|
await crud_message.create(message_data, session)
|
||||||
|
|
||||||
|
|
||||||
|
@router.message(SupportedMediaFilter(), F.chat.type == 'private')
|
||||||
|
async def supported_media(message: Message):
|
||||||
|
if message.caption and len(message.caption) > 1000:
|
||||||
|
return await message.reply(text='Слишком длинное описание. Описание '
|
||||||
|
'не может быть больше 1000 символов')
|
||||||
|
await message.copy_to(
|
||||||
|
chat_id=settings.GROUP_ID,
|
||||||
|
caption=((message.caption or "") +
|
||||||
|
f"\n\n Тикет: #id{message.from_user.id}"),
|
||||||
|
parse_mode="HTML"
|
||||||
|
)
|
||||||
|
session_generator = get_async_session()
|
||||||
|
session = await session_generator.__anext__()
|
||||||
|
db_user = await crud_user.get_or_create_user_by_tg_message(message, session)
|
||||||
|
if check_user_is_banned(db_user):
|
||||||
|
return
|
||||||
|
message_data = {
|
||||||
|
'telegram_user_id': message.from_user.id,
|
||||||
|
'attachments': True,
|
||||||
|
}
|
||||||
|
if message.caption:
|
||||||
|
message_data['text'] = message.caption
|
||||||
|
await crud_message.create(message_data, session)
|
||||||
|
|
||||||
|
|
||||||
|
@router.message(F.chat.id == int(settings.GROUP_ID),
|
||||||
|
F.reply_to_message)
|
||||||
|
async def send_message_answer(message: Message,
|
||||||
|
bot: Bot):
|
||||||
|
if not message.reply_to_message.from_user.is_bot:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
chat_id = extract_user_id(message.reply_to_message)
|
||||||
|
except ValueError as err:
|
||||||
|
return await message.reply(text=f'Не могу извлечь Id. Возможно он '
|
||||||
|
f'некорректный. Текст ошибки:\n'
|
||||||
|
f'{str(err)}')
|
||||||
|
try:
|
||||||
|
await message.copy_to(chat_id)
|
||||||
|
except TelegramForbiddenError:
|
||||||
|
await message.reply(text='Сообщение не доставлено. Бот был '
|
||||||
|
'заблокировн пользователем, '
|
||||||
|
'либо пользователь удален')
|
||||||
|
session_generator = get_async_session()
|
||||||
|
session = await session_generator.__anext__()
|
||||||
|
db_user = await crud_user.get_or_create_user_by_tg_message(message, session)
|
||||||
|
await crud_user.register_admin(db_user, session)
|
||||||
|
message_data = {
|
||||||
|
'telegram_user_id': message.from_user.id,
|
||||||
|
'answer_to_user_id': chat_id,
|
||||||
|
}
|
||||||
|
if message.text:
|
||||||
|
message_data['text'] = message.text
|
||||||
|
else:
|
||||||
|
if message.caption:
|
||||||
|
message_data['text'] = message.caption
|
||||||
|
message_data['attachments'] = True
|
||||||
|
|
||||||
|
await crud_message.create(message_data, session)
|
||||||
82
main.py
82
main.py
@@ -1,43 +1,57 @@
|
|||||||
#load('lib://std/process/v1', 'process')
|
import asyncio
|
||||||
#load('lib://std/file/v1', 'file')
|
|
||||||
#load('lib://std/tool/v1', 'tool')
|
|
||||||
|
|
||||||
import os
|
|
||||||
import zipfile
|
|
||||||
import subprocess
|
|
||||||
import logging
|
import logging
|
||||||
|
import sys
|
||||||
|
|
||||||
cmdkey_cmd = "cmdkey.exe /list"
|
from aiogram import Bot, Dispatcher
|
||||||
|
from aiogram.webhook.aiohttp_server import SimpleRequestHandler
|
||||||
|
from aiohttp import web
|
||||||
|
|
||||||
|
from app.core.config import settings
|
||||||
|
from handlers_commands import router as commands_router
|
||||||
|
from handlers_messages import router as messages_router
|
||||||
|
|
||||||
|
|
||||||
def main(ctx):
|
async def main():
|
||||||
#result, state = tool.blank_result_state()
|
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
|
||||||
|
bot = Bot(token=settings.TELEGRAM_TOKEN, parse_mode="HTML")
|
||||||
|
dp = Dispatcher()
|
||||||
|
dp.include_router(commands_router)
|
||||||
|
dp.include_router(messages_router)
|
||||||
|
|
||||||
#conf = {
|
try:
|
||||||
# 'net group "Администраторы домена" /domain',
|
if not settings.WEBHOOK_DOMAIN:
|
||||||
#}
|
await bot.delete_webhook()
|
||||||
|
await dp.start_polling(
|
||||||
|
bot,
|
||||||
|
allowed_updates=dp.resolve_used_update_types()
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
aiohttp_logger = logging.getLogger('aiohttp.access')
|
||||||
|
aiohttp_logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
cmd = "cmd.exe /c"
|
await bot.set_webhook(
|
||||||
#args = format(cmdkey_cmd, conf)
|
url=settings.WEBHOOK_DOMAIN + settings.WEBHOOK_PATH,
|
||||||
#res = process.run(cmd=cmd, args=tuple([args]), wait=True, marker=True)
|
drop_pending_updates=True,
|
||||||
|
allowed_updates=dp.resolve_used_update_types()
|
||||||
|
)
|
||||||
|
|
||||||
print("cmd")
|
app = web.Application()
|
||||||
#if res.result:
|
SimpleRequestHandler(
|
||||||
# result["data"]["log"] = res.log
|
dispatcher=dp, bot=bot
|
||||||
# state["result"] = True
|
).register(app, path=settings.WEBHOOK_PATH)
|
||||||
# result["message"] = "cmdkey is ok"
|
runner = web.AppRunner(app)
|
||||||
# result["result"] = True
|
await runner.setup()
|
||||||
#else:
|
site = web.TCPSite(runner,
|
||||||
# result.update(message="error running cmdkey")
|
host=settings.APP_HOST,
|
||||||
|
port=settings.APP_PORT
|
||||||
|
)
|
||||||
|
await site.start()
|
||||||
|
await asyncio.Event().wait()
|
||||||
|
except RuntimeError:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
await bot.session.close()
|
||||||
|
|
||||||
#return {"result": result, "state": state}
|
|
||||||
"""
|
|
||||||
def rollback(ctx):
|
|
||||||
result, state = tool.blank_result_state()
|
|
||||||
|
|
||||||
if ctx.rollback_state.get("result", False):
|
if __name__ == "__main__":
|
||||||
result["message"] = "all is fine"
|
asyncio.run(main())
|
||||||
result["result"] = True
|
|
||||||
|
|
||||||
return {"result": result}
|
|
||||||
"""
|
|
||||||
|
|||||||
88
utils.py
Normal file
88
utils.py
Normal file
@@ -0,0 +1,88 @@
|
|||||||
|
import re
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from aiogram import Bot
|
||||||
|
from aiogram.exceptions import TelegramAPIError
|
||||||
|
from aiogram.filters import CommandObject
|
||||||
|
from aiogram.types import Message, Chat
|
||||||
|
from dateutil.relativedelta import relativedelta
|
||||||
|
from app.schemas.user import UserFromDBScheme
|
||||||
|
from app.schemas.user import UserBaseScheme
|
||||||
|
|
||||||
|
DATE_PATTERN = r'^(0?[1-9]|[12][0-9]|3[01]).(0?[1-9]|1[012]).((19|20)\d\d)$'
|
||||||
|
|
||||||
|
|
||||||
|
def extract_user_id(message: Message) -> int:
|
||||||
|
text = message.text if message.text else message.caption
|
||||||
|
if '#id' not in text:
|
||||||
|
return False
|
||||||
|
telegram_user_id = int(text.split(sep='#id')[-1])
|
||||||
|
return telegram_user_id
|
||||||
|
|
||||||
|
|
||||||
|
def parse_ban_command(command: CommandObject) -> int:
|
||||||
|
telegram_user_id = command.args.strip()
|
||||||
|
try:
|
||||||
|
telegram_user_id = int(telegram_user_id)
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
return telegram_user_id
|
||||||
|
|
||||||
|
|
||||||
|
def get_user_name(chat: Chat):
|
||||||
|
"""Получение полного имени пользователя из чата"""
|
||||||
|
if not chat.first_name:
|
||||||
|
return ""
|
||||||
|
if not chat.last_name:
|
||||||
|
return chat.first_name
|
||||||
|
return f"{chat.first_name} {chat.last_name}"
|
||||||
|
|
||||||
|
|
||||||
|
def check_input_date_correct(date_args):
|
||||||
|
"""Проверка интервала дат на соотвествие паттерну"""
|
||||||
|
date_from, date_to = date_args.split()
|
||||||
|
pattern = re.compile(DATE_PATTERN)
|
||||||
|
if not (pattern.match(date_from) and pattern.match(date_to)):
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def stringdate_to_date(date_args):
|
||||||
|
"""Конвертация текстового интервала дат в формат datetime"""
|
||||||
|
from_date, to_date = date_args.split()
|
||||||
|
from_date = datetime.strptime(from_date, '%d.%m.%Y')
|
||||||
|
to_date = datetime.strptime(to_date, '%d.%m.%Y') + relativedelta(days=+1)
|
||||||
|
return from_date, to_date
|
||||||
|
|
||||||
|
|
||||||
|
def get_user_data(message: Message):
|
||||||
|
user_data = {
|
||||||
|
'telegram_id': message.from_user.id,
|
||||||
|
'telegram_username': message.from_user.username,
|
||||||
|
'first_name': message.from_user.first_name,
|
||||||
|
'last_name': message.from_user.last_name,
|
||||||
|
}
|
||||||
|
return user_data
|
||||||
|
|
||||||
|
|
||||||
|
def check_user_is_banned(user: UserBaseScheme):
|
||||||
|
return user.is_banned
|
||||||
|
|
||||||
|
|
||||||
|
def check_user_is_admin(user: UserFromDBScheme):
|
||||||
|
return user.is_admin
|
||||||
|
|
||||||
|
|
||||||
|
async def get_telegram_user_from_resend_message(message: Message, bot: Bot):
|
||||||
|
telegram_user_id = extract_user_id(message.reply_to_message)
|
||||||
|
if not telegram_user_id:
|
||||||
|
return await message.reply(
|
||||||
|
text='Невозможно найти пользователя с таким Id'
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
return await bot.get_chat(telegram_user_id)
|
||||||
|
except TelegramAPIError as err:
|
||||||
|
return await message.reply(
|
||||||
|
text=(f'Невозможно найти пользователя с таким Id. Текст ошибки:\n'
|
||||||
|
f'{err.message}')
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user