Files
Parser_Telegram_Message/main.py
2025-05-23 13:05:20 +08:00

449 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
# Конфигурационные данные
api_id = '1******1' # Замените на ваш API ID
api_hash = 'd******************************a' # Замените на ваш API Hash
phone_number = '+7999*******' # Замените на ваш номер телефона
channel_username_or_id = '-100**********' # Замените на username или ID группы
"""
import asyncio
import csv
import re
import os
import datetime
from telethon import TelegramClient, events
from telethon.tl.functions.channels import JoinChannelRequest
from telethon.tl.types import Channel, ChatForbidden, PeerChannel
from telethon.errors import ChannelPrivateError # Добавлен импорт
import logging
from tqdm import tqdm
import getpass
from dotenv import load_dotenv
load_dotenv()
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("telegram_parser.log", encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Конфигурация из .env файла
API_ID = os.getenv('API_ID')
API_HASH = os.getenv('API_HASH')
PHONE_NUMBER = os.getenv('PHONE_NUMBER')
# CHANNEL_IDENTIFIER теперь используется как значение по умолчанию при выборе диалога
CONFIG_CHANNEL_IDENTIFIER = os.getenv('CONFIG_CHANNEL_IDENTIFIER')
CSV_FILENAME = os.getenv('CSV_FILENAME', 'telegram_data.csv')
# Дополнительные настройки
LIMIT_MESSAGES_STR = os.getenv('LIMIT_MESSAGES')
LIMIT_MESSAGES = int(LIMIT_MESSAGES_STR) if LIMIT_MESSAGES_STR and LIMIT_MESSAGES_STR.strip() else None
DAYS_LIMIT_STR = os.getenv('DAYS_LIMIT') # None для всех сообщений, или число для ограничения
DAYS_LIMIT = int(DAYS_LIMIT_STR) if DAYS_LIMIT_STR and DAYS_LIMIT_STR.strip() else None # None для всех сообщений, или число дней назад для ограничения
async def parse_message(text: str) -> dict | None:
""" Парсит текстовое сообщение с использованием регулярных выражений.
Args:
text: Текст сообщения.
Returns:
Словарь с извлеченными данными, или None, если не удалось найти необходимые поля. """
raw_text_preview = text[:250] + ('...' if len(text) > 250 else '') # Немного увеличил превью
result = {'Raw Message': raw_text_preview}
# Определяем тип сообщения
is_order = "Order #" in text and "Purchaser information:" in text
is_request_details = "Request details:" in text
# --- Общая информация (обычно в конце) ---
additional_info_text = ""
# Ищем блок "Additional information:"
additional_info_match = re.search(r"Additional information:\s*\n(.*?)(?:-----|$)", text, re.DOTALL | re.IGNORECASE)
if additional_info_match:
additional_info_text = additional_info_match.group(1)
else:
# Если нет явного блока "Additional information:", но есть URL в конце,
# попробуем взять текст после последнего "-----\n" если он есть, или весь текст для поиска URL
parts = text.split("-----")
if len(parts) > 1:
potential_additional_info = parts[-1]
if "Transaction ID:" in potential_additional_info or "Block ID:" in potential_additional_info or "Form Name:" in potential_additional_info:
additional_info_text = potential_additional_info
if additional_info_text:
transaction_id_match = re.search(r"Transaction ID:([^\n]+)", additional_info_text, re.IGNORECASE)
if transaction_id_match: result['Transaction ID'] = transaction_id_match.group(1).strip()
block_id_match = re.search(r"Block ID:([^\n]+)", additional_info_text, re.IGNORECASE)
if block_id_match: result['Block ID'] = block_id_match.group(1).strip()
form_name_match = re.search(r"Form Name:([^\n]+)", additional_info_text, re.IGNORECASE)
if form_name_match: result['Form Name'] = form_name_match.group(1).strip()
# URL часто идет после Form Name или как отдельная строка в этом блоке
url_match_after_form_name = re.search(r"Form Name:[^\n]+\n\s*(`?(https?://\S+)`?)", additional_info_text, re.IGNORECASE)
if url_match_after_form_name:
result['URL'] = url_match_after_form_name.group(1).strip('`')
else:
# Ищем URL в любой строке блока Additional Information
url_fallback_match = re.search(r"^`?(https?://\S+)`?$", additional_info_text, re.MULTILINE | re.IGNORECASE)
if url_fallback_match:
result['URL'] = url_fallback_match.group(1).strip('`')
if is_order:
result['Type'] = 'Order'
order_id_match = re.search(r"Order #(\S+)", text, re.IGNORECASE)
if order_id_match: result['Order ID'] = order_id_match.group(1).strip()
payment_amount_match = re.search(r"Payment Amount:([^\n]+)", text, re.IGNORECASE)
if payment_amount_match: result['Payment Amount'] = payment_amount_match.group(1).strip()
# Информация о покупателе
purchaser_info_match = re.search(r"Purchaser information:\s*\n(.*?)(?:Additional information:|$)", text, re.DOTALL | re.IGNORECASE)
if purchaser_info_match:
purchaser_text = purchaser_info_match.group(1)
name_match = re.search(r"(?:Name|Имя|ФИО|фио)[\s:]*([^\n]+)", purchaser_text, re.IGNORECASE)
if name_match: result['Name'] = name_match.group(1).strip()
phone_match = re.search(r"(?:Phone|телефон|тел|номер)[\s:]*(\+?[0-9\s()\-]{10,})", purchaser_text, re.IGNORECASE)
if phone_match: result['Phone'] = phone_match.group(1).strip()
email_match = re.search(r"(?:Email|почта|e-mail)[\s:]*([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)", purchaser_text, re.IGNORECASE)
if email_match: result['Email'] = email_match.group(1).strip()
# Дата рождения или просто дата
date_match = re.search(r"(?:Дата_Рождения|Date)[\s:]*(\d{1,2}[-./]\d{1,2}[-./]\d{2,4})", purchaser_text, re.IGNORECASE)
if date_match: result['Date'] = date_match.group(1).strip()
elif is_request_details: # Явно указано "Request details:"
result['Type'] = 'Request'
# Извлекаем текст из секции "Request details:"
request_details_section_text = text
match_rd_section = re.search(r"Request details:\s*\n(.*?)(?:Additional information:|$)", text, re.DOTALL | re.IGNORECASE)
if match_rd_section:
request_details_section_text = match_rd_section.group(1)
# Паттерны для "Request details"
request_patterns = {
'Name': r'(?:Name|имя|ФИО|фио|namestep)[\s:]*([^\n]+)',
'Phone': r'(?:Phone|телефон|тел|номер|phonenamber|phonestep)[\s:]*(\+?[0-9\s()\-]{10,})',
'Email': r'(?:Email|почта|e-mail)[\s:]*([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)',
'Date': r'(?:Date|дата)[\s:]*(\d{1,2}[-./]\d{1,2}[-./]\d{2,4})',
'Checkbox': r'(?:Checkbox|чекбокс|галочка)[\s:]*([^\n]+)',
'Selectbox': r'Selectbox[\s:]*([^\n]+)',
'Request Type': r'^type[\s:]*([^\n]+)', # Добавил ^ для точности, если 'type' в начале строки
'Persons': r'^Person[\s:]*([^\n]+)',
'Helper': r'^helper[\s:]*([^\n]+)',
'Connection': r'^connection[\s:]*([^\n]+)',
'Comment': r'(?:comment|комментарий)[\s:]*([^\n]+)',
'Address': r'(?:address|адрес)[\s:]*([^\n]+)',
'Product': r'(?:product|товар|продукт)[\s:]*([^\n]+)',
'Price': r'(?:price|цена|стоимость)[\s:]*([^\n]+)'
}
for key, pattern_regex in request_patterns.items():
match = re.search(pattern_regex, request_details_section_text, re.IGNORECASE | re.MULTILINE)
if match:
result[key] = match.group(1).strip()
else:
# Если нет явных признаков "Order" или "Request details",
# но есть поля, похожие на контактные данные в начале.
# Это может быть случай, когда "Request details:" отсутствует.
# Попробуем общие паттерны для имени и телефона.
# logger.debug(f"Сообщение без явных 'Order #' или 'Request details:'. Попытка общего парсинга: {raw_text_preview}")
# Паттерны для "Request details"
fallback_patterns = {
'Name': r'(?:Name|имя|ФИО|фио|namestep)[\s:]*([^\n]+)',
'Phone': r'(?:Phone|телефон|тел|номер|phonenamber|phonestep)[\s:]*(\+?[0-9\s()\-]{10,})',
'Email': r'(?:Email|почта|e-mail)[\s:]*([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)',
'Date': r'(?:Date|дата)[\s:]*(\d{1,2}[-./]\d{1,2}[-./]\d{2,4})',
}
found_in_fallback = False
for key, pattern_regex in fallback_patterns.items():
match = re.search(pattern_regex, text, re.IGNORECASE | re.MULTILINE)
if match:
# Проверяем, чтобы значение не было частью "Additional Information" если оно уже разобрано
if key not in result: # Чтобы не перезаписывать уже найденное из Additional Info
result[key] = match.group(1).strip()
found_in_fallback = True
if found_in_fallback:
result['Type'] = 'Request (Fallback)'
# Если URL не был найден в "Additional Information", попробуем найти его в основном тексте
if 'URL' not in result:
url_global_fallback_match = re.search(r"`?(https?://\S+)`?", text)
if url_global_fallback_match:
result['URL'] = url_global_fallback_match.group(1).strip('`')
# Проверяем, есть ли хотя бы имя или телефон, или это заказ с ID
if result.get('Name') or result.get('Phone') or (is_order and result.get('Order ID')):
if 'Checkbox' in result and isinstance(result['Checkbox'], str):
result['Checkbox'] = result['Checkbox'].lower()
return result
return None
async def choose_dialog(client, configured_identifier):
"""
Позволяет пользователю выбрать целевой чат/канал.
"""
logger.info("Выберите целевой чат/канал:")
print("\nКак выбрать целевой чат/канал?")
print(f"1. Использовать ID из конфигурации: {configured_identifier}")
print("2. Ввести ID, username (@имя) или полную ссылку на чат/канал вручную.")
print("3. Выбрать из списка недавних диалогов (до 20).")
choice = input("Ваш выбор (1-3): ")
identifier_to_use = None
if choice == '1':
identifier_to_use = configured_identifier
if not identifier_to_use:
logger.warning("ID в конфигурации не указан.")
# Можно предложить ввести вручную или выбрать из списка
choice = '2' # Переходим к ручному вводу
if choice == '2': # Также используется, если выбор '1' не удался
identifier_to_use = input("Введите ID, username (@имя) или ссылку: ").strip()
elif choice == '3':
try:
dialogs = await client.get_dialogs(limit=20)
if not dialogs:
logger.warning("Не найдено недавних диалогов.")
return None
print("\nПоследние 20 диалогов:")
print("-" * 70)
print(f"{'':<3} | {'Тип':<15} | {'Название':<30} | ID")
print("-" * 70)
for i, dialog in enumerate(dialogs):
entity_type = "Канал" if dialog.is_channel else "Группа" if dialog.is_group else "Пользователь"
name = dialog.name if hasattr(dialog, 'name') and dialog.name else "N/A"
name_short = name[:27] + "..." if len(name) > 30 else name
print(f"{i+1:<3} | {entity_type:<15} | {name_short:<30} | {dialog.id}")
print("-" * 70)
dialog_choice_str = input(f"Выберите номер диалога (1-{len(dialogs)}) или 0 для отмены: ")
dialog_choice_num = int(dialog_choice_str)
if 1 <= dialog_choice_num <= len(dialogs):
identifier_to_use = dialogs[dialog_choice_num-1].id # Используем ID напрямую
elif dialog_choice_num == 0:
logger.info("Выбор отменен.")
return None
else:
logger.warning("Неверный номер диалога.")
return None
except ValueError:
logger.warning("Неверный ввод для номера диалога.")
return None
except Exception as e:
logger.error(f"Ошибка при получении списка диалогов: {e}")
return None
if not identifier_to_use: # Если ни один из вариантов не дал идентификатор
logger.error("Идентификатор чата/канала не был указан или выбран.")
return None
try:
logger.info(f"Попытка получить информацию о сущности: {identifier_to_use}")
# Telethon get_entity очень гибкий: принимает int (ID), str (username, link, phone)
# Если это строка, которая выглядит как число (особенно отрицательное для channel_id), преобразуем в int
if isinstance(identifier_to_use, str):
if identifier_to_use.startswith('-') and identifier_to_use[1:].isdigit():
entity = await client.get_entity(int(identifier_to_use))
elif identifier_to_use.isdigit(): # Для обычных user ID или chat ID
entity = await client.get_entity(int(identifier_to_use))
else: # Для username или https://t.me/joinchat/... или https://t.me/channel_name
entity = await client.get_entity(identifier_to_use)
else: # Если это уже int (например, из dialog.id)
entity = await client.get_entity(identifier_to_use)
return entity
except ValueError as e:
logger.error(f"Не удалось найти сущность по '{identifier_to_use}'. Убедитесь, что ID/имя пользователя/ссылка верны и у вас есть доступ. Ошибка: {e}")
return None
except TypeError as e: # Например, если передан None в get_entity
logger.error(f"Некорректный тип идентификатора для get_entity: {identifier_to_use}. Ошибка: {e}")
return None
except Exception as e:
logger.error(f"Непредвиденная ошибка при получении сущности '{identifier_to_use}': {e}")
return None
async def main():
"""
Основная функция для работы с Telegram и парсинга данных.
"""
logger.info("Запуск парсера Telegram")
os.makedirs('sessions', exist_ok=True)
client = TelegramClient('sessions/telegram_session', API_ID, API_HASH)
try:
await client.connect()
logger.info("Подключение к Telegram API установлено")
if not await client.is_user_authorized():
logger.info("Требуется авторизация")
await client.send_code_request(PHONE_NUMBER)
code = input('Введите код из Telegram: ')
try:
await client.sign_in(PHONE_NUMBER, code)
logger.info("Авторизация с кодом успешна")
except Exception as e: # Изначально ловим общее исключение
if "SESSION_PASSWORD_NEEDED" in str(e) or "Two-steps verification is enabled" in str(e): # Более точная проверка
logger.info("Требуется двухфакторная аутентификация (пароль)")
password = getpass.getpass('Введите пароль двухфакторной аутентификации: ')
try:
await client.sign_in(password=password)
logger.info("Авторизация с паролем успешна")
except Exception as e_pw:
logger.error(f"Ошибка авторизации с паролем: {e_pw}")
return
else:
logger.error(f"Ошибка авторизации с кодом: {e}")
return
entity = await choose_dialog(client, CONFIG_CHANNEL_IDENTIFIER)
if not entity:
logger.error("Не удалось определить целевой чат/канал. Завершение работы.")
return
try:
# Проверка типа сущности и членства
if isinstance(entity, Channel):
logger.info(f"Тип: Канал/Супергруппа")
logger.info(f"Название: {entity.title}")
logger.info(f"ID: {entity.id}")
# logger.info(f"Доступ: {'публичный' if entity.megagroup else 'приватный'}") # megagroup не всегда показатель публичности
# Проверка членства в группе/канале
# Для каналов/супергрупп, `entity.left` может быть True, если вы не участник.
# Для приватных каналов, к которым вы не присоединились, get_entity может не вернуть полную информацию или выдать ошибку ранее.
# Попытка присоединиться, если необходимо и возможно
try:
# Попытка получить полную информацию, чтобы проверить 'left' атрибут
full_entity = await client.get_input_entity(entity) # или get_dialogs с entity
# Проверка на участие может быть сложной, так как 'left' не всегда есть или актуален
# Простой способ - попытаться прочитать сообщения. Если не получится - значит нет доступа.
logger.info("Проверка доступа к сообщениям...")
async for _ in client.iter_messages(entity, limit=1):
break
logger.info("Доступ к сообщениям есть.")
except (ChatForbidden, ChannelPrivateError) as e: # Telethon specific errors
logger.warning(f"Нет доступа к {entity.title} или он приватный и вы не участник: {e}")
logger.info(f"Попытка присоединиться к {entity.title}...")
try:
await client(JoinChannelRequest(entity))
logger.info(f"Успешно присоединились к {entity.title}.")
except Exception as e_join:
logger.error(f"Ошибка при попытке присоединиться: {e_join}")
return
except Exception as e_check:
logger.warning(f"Не удалось точно определить статус участия, но сущность получена. Ошибка: {e_check}")
elif isinstance(entity, ChatForbidden): # Это обычно означает, что get_entity не смогла получить доступ
logger.error(f"ОШИБКА: Нет доступа к сущности '{entity.title if hasattr(entity, 'title') else CONFIG_CHANNEL_IDENTIFIER}' (запрещено или не существует)")
return
else: # User, Chat (обычная группа)
logger.info(f"Тип: {type(entity).__name__}, Название: {getattr(entity, 'title', getattr(entity, 'username', entity.id))}, ID: {entity.id}")
date_filter = None
if DAYS_LIMIT:
date_filter = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=DAYS_LIMIT)
logger.info(f"Будут обработаны сообщения не старше {DAYS_LIMIT} дней (с {date_filter.strftime('%d.%m.%Y %H:%M:%S %Z')})")
# Настройка для прогресс-бара
messages_iterator = client.iter_messages(entity, limit=LIMIT_MESSAGES, reverse=True)
# Если LIMIT_MESSAGES is None, tqdm total будет None, что нормально.
# Если LIMIT_MESSAGES задан, используем его.
progress_bar_total = LIMIT_MESSAGES
if LIMIT_MESSAGES is None:
logger.info("Обработка всех сообщений. Общее количество для прогресс-бара будет неизвестно.")
else:
logger.info(f"Будет обработано до {LIMIT_MESSAGES} сообщений.")
parsed_data = []
total_messages_processed = 0
skipped_messages_by_date = 0
with tqdm(total=progress_bar_total, desc="Обработка сообщений", unit="сообщ.") as progress_bar:
async for message in messages_iterator:
progress_bar.update(1)
if date_filter and message.date < date_filter:
skipped_messages_by_date += 1
continue
if message.text:
parsed = await parse_message(message.text)
if parsed:
parsed['Message Date'] = message.date.strftime('%d.%m.%Y %H:%M:%S')
parsed_data.append(parsed)
total_messages_processed += 1
logger.info(f"\nСтатистика:")
logger.info(f"Всего обработано сообщений в итераторе: {total_messages_processed}")
if DAYS_LIMIT:
logger.info(f"Пропущено сообщений (старше {DAYS_LIMIT} дней): {skipped_messages_by_date}")
logger.info(f"Найдено подходящих записей для CSV: {len(parsed_data)}")
if parsed_data:
all_fields = set()
for entry in parsed_data:
all_fields.update(entry.keys())
fieldnames = sorted(list(all_fields))
# Перемещаем основные поля в начало
preferred_order = [
'Message Date', 'Type', 'Name', 'Phone', 'Email', 'Date',
'Form Name', 'Order ID', 'Payment Amount', 'Product', 'Price',
'Selectbox', 'Checkbox', 'Request Type', 'Persons', 'Helper', 'Connection',
'Transaction ID', 'Block ID', 'Address', 'Comment', 'URL', 'Raw Message'
]
final_fieldnames = [f for f in preferred_order if f in fieldnames] + \
[f for f in fieldnames if f not in preferred_order]
with open(CSV_FILENAME, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=final_fieldnames)
writer.writeheader()
writer.writerows(parsed_data)
logger.info(f"Файл сохранен: {os.path.abspath(CSV_FILENAME)}")
else:
logger.warning("Нет данных для сохранения. Проверьте сообщения в чате или настройки парсера.")
except Exception as e:
logger.error(f"Произошла ошибка при обработке сообщений: {e}", exc_info=True)
except Exception as e:
logger.error(f"Произошла ошибка при подключении или начальной настройке: {e}", exc_info=True)
finally:
if client.is_connected():
await client.disconnect()
logger.info("Работа парсера завершена")
if __name__ == '__main__':
# Для Windows может потребоваться это для asyncio с tqdm в некоторых случаях
if os.name == 'nt':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Работа программы прервана пользователем")
except Exception as e:
logger.error(f"Критическая ошибка в __main__: {e}", exc_info=True)