import tkinter as tk from tkinter import ttk, scrolledtext, messagebox import threading import queue import paramiko import time import os import logging class NetworkConfiguratorApp: def __init__(self, root): self.root = root self.root.title("Network Device Configurator") self.root.geometry("1000x800") self.root.resizable(True, True) # Настройка логгирования logging.basicConfig( filename='network_configurator.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) # Очереди для межпоточного взаимодействия self.log_queue = queue.Queue() self.error_queue = queue.Queue() # Списки для хранения данных self.error_ips = [] # Создаем интерфейс self.create_widgets() # Запускаем обработчик очередей self.root.after(100, self.process_queues) def create_widgets(self): # Основной фрейм main_frame = ttk.Frame(self.root, padding=10) main_frame.pack(fill=tk.BOTH, expand=True) # Панель с вкладками notebook = ttk.Notebook(main_frame) notebook.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # Вкладка 1: Учетные данные cred_frame = ttk.Frame(notebook, padding=10) notebook.add(cred_frame, text="Учетные данные") self.create_credentials_tab(cred_frame) # Вкладка 2: IP-адреса ip_frame = ttk.Frame(notebook, padding=10) notebook.add(ip_frame, text="IP-адреса") self.create_ip_tab(ip_frame) # Вкладка 3: Команды cmd_frame = ttk.Frame(notebook, padding=10) notebook.add(cmd_frame, text="Команды") self.create_commands_tab(cmd_frame) # Вкладка 4: Выполнение exec_frame = ttk.Frame(notebook, padding=10) notebook.add(exec_frame, text="Выполнение") self.create_execution_tab(exec_frame) # Вкладка 5: Ошибки error_frame = ttk.Frame(notebook, padding=10) notebook.add(error_frame, text="Ошибки") self.create_errors_tab(error_frame) # Панель управления control_frame = ttk.Frame(main_frame) control_frame.pack(fill=tk.X, padx=5, pady=5) self.start_btn = ttk.Button(control_frame, text="Старт", command=self.start_execution) self.start_btn.pack(side=tk.LEFT, padx=5) self.stop_btn = ttk.Button(control_frame, text="Остановить", command=self.stop_execution, state=tk.DISABLED) self.stop_btn.pack(side=tk.LEFT, padx=5) def create_credentials_tab(self, parent): frame = ttk.LabelFrame(parent, text="Учетные данные для подключения") frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # Логин ttk.Label(frame, text="Логин:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) self.username = tk.StringVar() ttk.Entry(frame, textvariable=self.username, width=30).grid(row=0, column=1, padx=5, pady=5, sticky=tk.W) # Пароль ttk.Label(frame, text="Пароль:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) self.password = tk.StringVar() ttk.Entry(frame, textvariable=self.password, show="*", width=30).grid(row=1, column=1, padx=5, pady=5, sticky=tk.W) # Кнопка тестового подключения ttk.Button( frame, text="Тестовое подключение", command=self.test_connection ).grid(row=2, column=0, columnspan=2, padx=5, pady=10) def test_connection(self): """Тестовое подключение к устройству""" username = self.username.get().strip() password = self.password.get().strip() if not username or not password: messagebox.showerror("Ошибка", "Введите логин и пароль") return # Получаем первый IP из списка ips = self.ip_text.get("1.0", tk.END).strip().split("\n") ips = [ip.strip() for ip in ips if ip.strip()] if not ips: messagebox.showerror("Ошибка", "Введите хотя бы один IP-адрес") return test_ip = ips[0] try: self.log(f"\nПопытка тестового подключения к {test_ip}...") # Создание SSH-клиента client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect( test_ip, username=username, password=password, timeout=10, banner_timeout=20 ) # Проверка соединения _, stdout, _ = client.exec_command("show version", timeout=10) output = stdout.read().decode('utf-8', 'ignore') self.log(f"Успешное подключение к {test_ip}") self.log(f"Версия ПО:\n{output[:500]}...") # Выводим первые 500 символов client.close() messagebox.showinfo("Успех", f"Успешное подключение к {test_ip}") except Exception as e: self.log(f"Ошибка тестового подключения: {str(e)}") messagebox.showerror("Ошибка", f"Не удалось подключиться: {str(e)}") def create_ip_tab(self, parent): frame = ttk.LabelFrame(parent, text="Список IP-адресов (один на строку)") frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) self.ip_text = scrolledtext.ScrolledText(frame, wrap=tk.WORD, height=10) self.ip_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # Пример IP-адресов self.ip_text.insert(tk.END, "192.168.1.1\n192.168.1.2\n192.168.1.3") def create_commands_tab(self, parent): frame = ttk.LabelFrame(parent, text="Команды для выполнения (одна на строку)") frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) self.cmd_text = scrolledtext.ScrolledText(frame, wrap=tk.WORD, height=15) self.cmd_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # Пример команд example_commands = [ "configure terminal", "radius server RS", "address ipv4 10.151.3.66 auth-port 1812 acct-port 1813", "end", "wr" ] self.cmd_text.insert(tk.END, "\n".join(example_commands)) def create_execution_tab(self, parent): frame = ttk.LabelFrame(parent, text="Ход выполнения") frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # Настройки выполнения settings_frame = ttk.Frame(frame) settings_frame.pack(fill=tk.X, padx=5, pady=5) ttk.Label(settings_frame, text="Макс. страниц:").pack(side=tk.LEFT, padx=5) self.max_pages = tk.IntVar(value=50) ttk.Entry(settings_frame, textvariable=self.max_pages, width=5).pack(side=tk.LEFT, padx=5) ttk.Label(settings_frame, text="Таймаут (сек):").pack(side=tk.LEFT, padx=5) self.command_timeout = tk.IntVar(value=30) ttk.Entry(settings_frame, textvariable=self.command_timeout, width=5).pack(side=tk.LEFT, padx=5) # Логи выполнения self.log_text = scrolledtext.ScrolledText( frame, wrap=tk.WORD, height=20, state=tk.DISABLED ) self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) def create_errors_tab(self, parent): frame = ttk.LabelFrame(parent, text="Ошибки подключения") frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # Текст с ошибками self.error_text = scrolledtext.ScrolledText( frame, wrap=tk.WORD, height=10, state=tk.DISABLED ) self.error_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # Кнопка сохранения ошибок btn_frame = ttk.Frame(frame) btn_frame.pack(fill=tk.X, padx=5, pady=5) ttk.Button( btn_frame, text="Сохранить ошибки в файл", command=self.save_errors ).pack(side=tk.RIGHT, padx=5) def log(self, message): """Добавление сообщения в очередь логов""" self.log_queue.put(message) def log_error(self, ip, message): """Добавление ошибки в очередь ошибок""" self.error_queue.put((ip, message)) def process_queues(self): """Обработка сообщений из очередей""" # Обработка логов while not self.log_queue.empty(): message = self.log_queue.get_nowait() self.log_text.config(state=tk.NORMAL) self.log_text.insert(tk.END, message + "\n") self.log_text.config(state=tk.DISABLED) self.log_text.see(tk.END) # Обработка ошибок while not self.error_queue.empty(): ip, error = self.error_queue.get_nowait() self.error_ips.append((ip, error)) self.error_text.config(state=tk.NORMAL) self.error_text.insert(tk.END, f"{ip}: {error}\n") self.error_text.config(state=tk.DISABLED) self.error_text.see(tk.END) # Продолжаем обработку self.root.after(100, self.process_queues) def start_execution(self): """Запуск выполнения команд""" # Получаем учетные данные username = self.username.get().strip() password = self.password.get().strip() if not username or not password: messagebox.showerror("Ошибка", "Введите логин и пароль") return # Получаем IP-адреса ips = self.ip_text.get("1.0", tk.END).strip().split("\n") ips = [ip.strip() for ip in ips if ip.strip()] if not ips: messagebox.showerror("Ошибка", "Введите хотя бы один IP-адрес") return # Получаем команды commands = self.cmd_text.get("1.0", tk.END).strip().split("\n") commands = [cmd.strip() for cmd in commands if cmd.strip()] if not commands: messagebox.showerror("Ошибка", "Введите команды для выполнения") return # Очищаем логи и ошибки self.log_text.config(state=tk.NORMAL) self.log_text.delete("1.0", tk.END) self.log_text.config(state=tk.DISABLED) self.error_text.config(state=tk.NORMAL) self.error_text.delete("1.0", tk.END) self.error_text.config(state=tk.DISABLED) self.error_ips = [] # Меняем состояние кнопок self.start_btn.config(state=tk.DISABLED) self.stop_btn.config(state=tk.NORMAL) # Запускаем выполнение в отдельном потоке self.stop_event = threading.Event() threading.Thread( target=self.execute_commands, args=(ips, username, password, commands), daemon=True ).start() def stop_execution(self): """Остановка выполнения команд""" if hasattr(self, 'stop_event'): self.stop_event.set() self.log("Выполнение остановлено пользователем") self.start_btn.config(state=tk.NORMAL) self.stop_btn.config(state=tk.DISABLED) def execute_commands(self, ips, username, password, commands): """Выполнение команд на устройствах с обработкой --More--""" self.log(f"Начато выполнение для {len(ips)} устройств") # Различные варианты промпта "More" more_prompts = [ "--More--", " -- More -- ", "-More-", "Press any key to continue", "続けるには何かキーを押してください . . ." ] # Стандартные промпты для определения конца вывода end_prompts = [">", "#", "$", ":"] for ip in ips: if self.stop_event.is_set(): self.log("Выполнение прервано") break try: self.log(f"\nПодключение к {ip}...") # Создание SSH-клиента client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect( ip, username=username, password=password, timeout=10, banner_timeout=20 ) # Открытие интерактивного сеанса shell = client.invoke_shell() time.sleep(1) # Даем время на установление сессии # Получаем начальный вывод (приглашение) initial_output = "" start_time = time.time() while time.time() - start_time < 5: # Таймаут 5 секунд if shell.recv_ready(): initial_output += shell.recv(4096).decode('utf-8', 'ignore') else: time.sleep(0.1) # Выполнение команд for command in commands: if self.stop_event.is_set(): break # Отправка команды shell.send(command + "\n") time.sleep(1) # Сбор вывода с обработкой --More-- output = "" max_pages = self.max_pages.get() page_count = 0 end_detected = False start_time = time.time() while not end_detected and time.time() - start_time < 30: # Таймаут 30 секунд на команду if self.stop_event.is_set(): break # Проверяем, есть ли данные для чтения if shell.recv_ready(): chunk = shell.recv(4096).decode('utf-8', 'ignore') output += chunk # Проверяем наличие промпта More if any(prompt in chunk for prompt in more_prompts): # Симулируем нажатие пробела shell.send(" ") time.sleep(0.5) page_count += 1 self.log(f"[{ip}] Обнаружен промпт More, отправка пробела ({page_count}/{max_pages})") # Прерываем если слишком много страниц if page_count > max_pages: self.log(f"[{ip}] Превышено максимальное количество страниц для команды: {command}") break else: # Проверяем, закончился ли вывод (по приглашению) if any(prompt in output for prompt in end_prompts): end_detected = True else: # Если данных нет, делаем небольшую паузу time.sleep(0.5) # Проверяем завершение по таймауту if time.time() - start_time > 5: # 5 секунд без данных break # Логируем команду и вывод self.log(f"[{ip}] >>> {command}") self.log(f"[{ip}] <<< {output}") # Проверка на ошибки error_checks = [ ("Password:", "Запрос пароля (возможно, неверный уровень доступа)"), ("Authentication failed", "Сбой аутентификации на устройстве"), ("Invalid input", f"Неверная команда: '{command}'"), ("Connection timed out", "Таймаут соединения"), ("Error", f"Ошибка выполнения команды: {output.strip()}"), ("Failed", f"Ошибка выполнения команды: {output.strip()}") ] for pattern, message in error_checks: if pattern in output: self.log_error(ip, message) break self.log(f"Выполнение команд на {ip} завершено") except paramiko.AuthenticationException: self.log_error(ip, "Ошибка аутентификации: неверные учетные данные") except paramiko.SSHException as e: self.log_error(ip, f"Ошибка SSH: {str(e)}") except Exception as e: self.log_error(ip, f"Ошибка подключения: {str(e)}") finally: try: client.close() except: pass self.log("\nВыполнение завершено") self.start_btn.config(state=tk.NORMAL) self.stop_btn.config(state=tk.DISABLED) def save_errors(self): """Сохранение ошибок в файл""" if not self.error_ips: messagebox.showinfo("Информация", "Нет ошибок для сохранения") return try: with open("network_errors.txt", "w", encoding="utf-8") as f: f.write("Список устройств с ошибками подключения:\n\n") for ip, error in self.error_ips: f.write(f"{ip}: {error}\n") self.log("Ошибки сохранены в файл: network_errors.txt") messagebox.showinfo("Успех", "Ошибки сохранены в файл network_errors.txt") except Exception as e: self.log(f"Ошибка при сохранении файла: {str(e)}") messagebox.showerror("Ошибка", f"Не удалось сохранить файл: {str(e)}") if __name__ == "__main__": root = tk.Tk() app = NetworkConfiguratorApp(root) root.mainloop()