This commit is contained in:
2025-06-20 10:35:04 +09:00
parent 3c9b30be1e
commit f53fea652c
5 changed files with 989 additions and 5 deletions

27
package-lock.json generated
View File

@@ -6392,16 +6392,17 @@
}
},
"node_modules/prettier": {
"version": "2.8.7",
"resolved": "https://registry.npmjs.org/prettier/-/prettier-2.8.7.tgz",
"integrity": "sha512-yPngTo3aXUUmyuTjeTUT75txrf+aMh9FiD7q9ZE/i6r0bPb22g4FsE6Y338PQX1bmfy08i9QQCB7/rcUAVntfw==",
"version": "3.5.3",
"resolved": "https://registry.npmjs.org/prettier/-/prettier-3.5.3.tgz",
"integrity": "sha512-QQtaxnoDJeAkDvDKWCLiwIXkTgRhwYDEQCghU9Z6q03iyek/rxRh/2lC3HB7P8sWT2xC/y5JDctPLBIGzHKbhw==",
"license": "MIT",
"optional": true,
"peer": true,
"bin": {
"prettier": "bin-prettier.js"
"prettier": "bin/prettier.cjs"
},
"engines": {
"node": ">=10.13.0"
"node": ">=14"
},
"funding": {
"url": "https://github.com/prettier/prettier?sponsor=1"
@@ -8736,6 +8737,22 @@
"prettier": "2.8.7"
}
},
"node_modules/yaml-language-server/node_modules/prettier": {
"version": "2.8.7",
"resolved": "https://registry.npmjs.org/prettier/-/prettier-2.8.7.tgz",
"integrity": "sha512-yPngTo3aXUUmyuTjeTUT75txrf+aMh9FiD7q9ZE/i6r0bPb22g4FsE6Y338PQX1bmfy08i9QQCB7/rcUAVntfw==",
"license": "MIT",
"optional": true,
"bin": {
"prettier": "bin-prettier.js"
},
"engines": {
"node": ">=10.13.0"
},
"funding": {
"url": "https://github.com/prettier/prettier?sponsor=1"
}
},
"node_modules/yaml-language-server/node_modules/request-light": {
"version": "0.5.8",
"resolved": "https://registry.npmjs.org/request-light/-/request-light-0.5.8.tgz",

View File

@@ -0,0 +1,127 @@
---
title: 'MikroTik: отправка логов в Telegram без внешних серверов'
summary: 'Пошаговая инструкция по настройке автоматической отправки критических логов MikroTik в Telegram через встроенные скрипты RouterOS. Фильтрация событий, предотвращение дублей, советы по безопасности и расширению.'
date: '2025-11-18'
draft: false
tags:
- mikrotik
- telegram
- routeros
- мониторинг
- безопасность
- автоматизация
---
# 🛠️ Инструкция: Настройка отправки логов MikroTik в Telegram
Привет! сегодня научу тебя настраивать мгновенные оповещения о критических событиях с MikroTik прямо в Telegram. Всё делается через родные скрипты RouterOS — никаких внешних серверов!
## 🔍 Что мы сделаем
- Настроим фильтрацию логов по важным событиям
- Отправляем уведомления в Telegram через бота
- Гарантируем уникальность сообщений (без спама)
## ⚙️ Предварительные требования
1. Устройство MikroTik с RouterOS v6+
2. Доступ к интернету (разрешен `api.telegram.org`)
3. Telegram-бот (создай через [@BotFather](https://t.me/BotFather))
4. ID чата (узнай через [@userinfobot](https://t.me/userinfobot))
## 🚀 Пошаговая настройка
### Шаг 1: Создаем скрипт
Зайди в Winbox → `System``Scripts` → Добавь новый скрипт:
```routeros
# Название: telegram-log-sender
:local chat_id "ВАШ_CHAT_ID" # Замени на реальный ID
:local bot_token "ВАШ_ТОКЕН" # Токен бота от @BotFather
:local last_log_time [/log get [find] time]
:local prefixes ("dhcp", "interface", "error") # Твои ключевые префиксы
:local logs ""
:local last_unique_log ""
:foreach prefix in=$prefixes do={
:local prefix_logs [/log find where time>($last_log_time + 1h) and message~($prefix)]
:foreach log in=$prefix_logs do={
:local log_entry [/log get $log]
:local log_message ($log_entry->"message")
# Фильтр дубликатов
:if ($log_message != $last_unique_log) do={
:set logs ($logs . "⚠️ " . $log_message . "\n\n")
:set last_unique_log $log_message
}
}
}
# Отправка в Telegram
:if ($logs != "") do={
:local url ("https://api.telegram.org/bot" . $bot_token . "/sendMessage")
:local data ("chat_id=" . $chat_id . "&text=" . $logs)
/tool fetch url=$url mode=http http-method=post http-data=$data
}
# Обновляем метку времени
:set last_log_time [/log get [find] time]
```
### Шаг 2: Настраиваем планировщик
В Winbox → `System``Scheduler` → Добавь новое задание:
```routeros
Name: telegram-logs
Interval: 5m
On Event: /system script run telegram-log-sender
```
### Шаг 3: Проверяем работу
1. Сымитируй событие в логах:
```bash
/log info message="dhcp: Test lease added"
```
2. Жди до 5 минут
3. Проверь Telegram-чат:
![Пример уведомления](https://i.imgur.com/3JxWqcl.png)
## 🚨 Возможные проблемы и решения
| Симптом | Проверка | Исправление |
|---------|----------|-------------|
| Сообщения не приходят | `:put [:resolve "api.telegram.org"]` | Разреши DNS и доступ |
| "failure" в логах | `/tool fetch url="https://api.telegram.org"` | Проверь токен и chat_id |
| Дубли сообщений | Проверь `last_unique_log` логику | Добавь больше префиксов в фильтр |
| Обрезанные сообщения | `:put [:len $logs]` | Разбей сообщение на части по 4000 символов |
## 💡 Продвинутые настройки
### Добавляем форматирование
Измени блок отправки для Markdown:
```routeros
:local data ("chat_id=" . $chat_id . "&parse_mode=Markdown&text=*Router Alert!*%0A" . $logs)
```
### Фильтр по критичности
Добавь приоритеты в скрипт:
```routeros
:local levels ("error", "warning")
...
and message~($prefix) and topics~($level)
```
### Защита токена (только ROS v7+)
```routeros
:local bot_token [/certificate get telegram_token value-name=key]
# Где telegram_token - имя зашифрованного сертификата
```
## 🌟 Советы
1. **Тестовый режим**: сначала добавь префикс `test` и проверь работу
2. **Ограничение частоты**: при >50 событиях/мин добавь задержку
```routeros
:delay 3s
```
3. **Визуализация**: используй emoji в сообщениях (⚠️🔥🚨)
4. **Резервный канал**: продублируй на почту через `/tool e-mail`
Обновил скрипт или нашел ошибку? Дай знать в обратной связи!

View File

@@ -0,0 +1,308 @@
---
title: 'Автоматизация сбора системной информации и логов с серверов: Ansible + Graylog'
summary: 'Практическое руководство по созданию кроссплатформенной системы автоматического сбора метрик и логов с серверов Windows и Linux с помощью Ansible и отправкой данных в Graylog. Описаны архитектура, примеры скриптов, интеграция и расширяемость.'
date: '2025-11-18'
draft: false
tags:
- ansible
- graylog
- мониторинг
- windows
- linux
- автоматизация
- devops
---
## Универсальная система сбора системной информации с интеграцией в Graylog
### 🔍 Введение
В современной инфраструктуре критически важен **централизованный сбор системной информации** с разнородных серверов. Ручной сбор данных через SSH/WinRM неэффективен, а разрозненные логи затрудняют анализ. Решение — **автоматизированная кроссплатформенная система**, которая:
- Собирает ключевые метрики на Windows/Linux
- Стандартизирует формат логов
- Отправляет данные в Graylog для централизованного анализа
- Работает через Ansible без ручного вмешательства
---
### ⚙️ Технические требования
| Компонент | Windows | Linux |
|-----------------|-----------------------------|---------------------------|
| **Управление** | Ansible 2.9+ | Ansible 2.9+ |
| **Доступ** | WinRM, Python 3.x | SSH, Bash |
| **Целевые хосты**| WinRM включён, порт 5985 | Стандартные утилиты (ip, df) |
| **Сервер логирования** | Graylog с настроенным GELF-input | |
---
### 🧩 Архитектура решения
```mermaid
graph LR
A[Ansible Control Node] --> B(Windows Hosts)
A --> C(Linux Hosts)
B --> D[run_commands.py]
C --> E[collect_info.sh]
D --> F[Graylog]
E --> F
F[Graylog] --> G[Дашборды]
F --> H[Алерты]
```
---
### 🚀 Быстрый старт
**Шаг 1. Клонирование репозитория**
```bash
git clone https://git.fipi.pro/Plata_Upravleniya_RF/commander.git
cd commander
```
**Шаг 2. Настройка инвентаря**
`inventory/hosts.ini`:
```ini
[windows]
win-server1 ansible_host=192.168.1.10
[linux]
linux-server1 ansible_host=192.168.1.20
[windows:vars]
ansible_user=Admin
ansible_password=SecurePass123
ansible_connection=winrm
[linux:vars]
ansible_user=root
ansible_ssh_private_key_file=~/.ssh/id_rsa
```
**Шаг 3. Запуск системы**
```bash
ansible-playbook -i inventory/hosts.ini playbook.yml
```
---
### 🔧 Детали реализации
#### Для Windows
**Основной скрипт:** `roles/graylog_collector/files/run_commands.py`
```python
import subprocess, yaml
def execute_command(cmd):
"""Выполнение команды с перехватом вывода"""
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
return {
'stdout': result.stdout,
'stderr': result.stderr,
'returncode': result.returncode
}
```
**Конфигурация команд:** `commands.yaml`
```yaml
commands:
- name: Network Configuration
command: ipconfig /all
- name: System Information
command: systeminfo
```
#### Для Linux
**Bash-скрипт:** `collect_info.sh`
```bash
#!/bin/bash
commands=(
"ip -br a"
"df -h --output=source,fstype,pcent"
"free -m"
"ss -tuln"
)
```
#### Интеграция с Graylog
**Общий модуль:** `graylog_sender.py`
```python
def send_gelf(log_entry):
"""Отправка лога в формате GELF"""
message = {
"version": "1.1",
"host": socket.gethostname(),
"short_message": "System Snapshot",
"_environment": "production",
"_log": log_entry
}
sock.sendto(json.dumps(message).encode(), (GRAYLOG_HOST, GRAYLOG_PORT))
```
---
### ⚙️ Ansible-интеграция
**Структура проекта:**
```
commander/
├── playbook.yml
└── roles/
└── graylog_collector/
├── tasks/
│ ├── windows.yml
│ └── linux.yml
├── files/
│ ├── run_commands.py
│ ├── collect_info.sh
│ └── graylog_sender.py
└── defaults/
└── main.yml
```
**Плейбук:** `playbook.yml`
```yaml
- name: Deploy Graylog Collector
hosts: all
roles:
- graylog_collector
```
---
### 🔍 Пример использования в Graylog
**Поиск по конкретному серверу:**
```sql
source:"win-server1" AND "Domain Admins"
```
**Анализ ошибок:**
```sql
_full_message:"ERROR" AND _environment:"production"
```
**Дашборд в Grafana:**
```sql
SELECT
host,
COUNT_IF(_log LIKE '%CRITICAL%') AS criticals
FROM graylog_messages
WHERE timestamp > NOW() - INTERVAL 1 DAY
GROUP BY host
```
---
### 🛠️ Кастомизация
**Добавление новых команд:**
1. Для Windows:
```yaml
# commands.yaml
- name: Check DNS Resolution
command: nslookup example.com
```
2. Для Linux:
```bash
# В collect_info.sh
commands+=("lsof -i :80")
```
**Настройка Graylog:**
```yaml
# defaults/main.yml
graylog_host: log-collector.company.com
graylog_port: 12201
windows_log_dir: C:\Monitoring\Logs
linux_log_dir: /var/log/monitoring
```
---
### ⚠️ Решение проблем
**Частые ошибки Windows:**
```powershell
# Разрешить WinRM:
Enable-PSRemoting -Force
New-NetFirewallRule -Name "WinRM-HTTP" -DisplayName "WinRM HTTP" -Protocol TCP -LocalPort 5985 -Action Allow
```
**Ошибки подключения к Linux:**
```bash
sudo systemctl restart sshd
sudo ssh-keygen -A
```
**Проверка Graylog:**
```bash
# Проверка открытости порта
nc -vz graylog.company.com 12201
# Тестовая отправка
echo '{"version":"1.1","host":"test","short_message":"Test"}' | nc -u -w1 graylog.company.com 12201
```
---
### 📈 Расширенные сценарии
**Планирование регулярного сбора:**
```yaml
- name: Schedule Daily Collection
win_scheduled_task:
name: "Daily System Snapshot"
actions:
- path: "python"
arguments: "C:\\Collector\\run_commands.py"
triggers:
- type: daily
start_time: "02:30"
```
**Безопасная обработка данных:**
```python
# Фильтрация чувствительных данных
if "password" in command.lower():
log_entry = "REDACTED: Sensitive command skipped"
```
**Сжатие логов:**
```python
import zlib
compressed = zlib.compress(log_entry.encode())
sock.sendto(compressed, (GRAYLOG_HOST, GRAYLOG_PORT))
```
---
### 📚 Заключение
Представленная система решает ключевую задачу DevOps - **автоматизированный сбор критической информации** с инфраструктуры. Благодаря интеграции с Ansible и Graylog, она обеспечивает:
- Единую точку сбора логов
- Возможность оперативного анализа инцидентов
- Автоматическое обнаружение аномалий
- Исторический аудит изменений
**Полный код доступен на Git:**
https://git.fipi.pro/Plata_Upravleniya_RF/commander.git
Для более простого исполнения есть `/old_script/` это раннее написанные версии скриптов с максимально простой структурой:
## Старая версия
```
old_scripts/
├── run_commands.py # основной скрипт win
├── script.sh # основной скрипт unix
├── commands.yaml # конфигурация команд
└── logs/ # автоматически создаётся, сюда пишутся логи
```
Файл `commands.yaml` Можно добавлять любые команды. Главное — указать поле `name` и `command`.
### 🔍 Пример запуска
#### ** 1. Python версия скрипта для Windows-хостов**
```bash
python run_commands.py
```
В консоли будет отображаться ход выполнения, а весь лог сохраняется в `logs/commands_*.log`.
---
#### **2. Bash-версия скрипта для Linux-хостов**
> Не забудь `chmod +x script.sh`
> Можно запускать из Ansible с `shell` или `script`.
---

View File

@@ -0,0 +1,298 @@
---
title: 'Wi-Fi сканер на Python: консоль и WebSocket-интерфейс'
summary: 'Примеры кода для сканирования Wi-Fi сетей с помощью Python и scapy: консольный сниффер, WebSocket-вывод в браузер и TUI-интерфейс. Практические советы по запуску и расширению.'
date: '2025-11-18'
draft: false
tags:
- wifi
- python
- scapy
- безопасность
- сниффер
- мониторинг
---
Код для Wi-Fi сниффера на Python, использующего `scapy` и интерфейс в режиме мониторинга.
### ✅ Скрипт `scanwf.py`
```python
#!/usr/bin/env python3
import sys
import os
import signal
import subprocess
from scapy.all import *
from scapy.layers.dot11 import Dot11, Dot11Elt
# Завершение по Ctrl+C
def signal_handler(sig, frame):
print("\n=================")
print("Завершение работы по Ctrl+C")
print("=================\n")
sys.exit(0)
# Инструкция
def usage():
print("\nИспользование:")
print("\tsudo python3 scanwf.py -i <интерфейс> [-p]\n")
sys.exit(1)
# Обработка пакетов
def sniffpackets(packet):
if packet.haslayer(Dot11):
mac_src = packet.addr2
mac_dst = packet.addr1
bssid = packet.addr3
ssid = ""
if packet.haslayer(Dot11Elt):
ssid = packet[Dot11Elt].info.decode(errors="ignore")
if packet.type == 0 and packet.subtype == 8:
print(f"[+] SSID: {ssid:30} | BSSID: {bssid} | SRC: {mac_src} | DST: {mac_dst}")
# Перевод интерфейса в режим мониторинга
def setup_monitor(iface):
print(f"[+] Перевод {iface} в режим мониторинга...")
subprocess.call(["ifconfig", iface, "down"])
subprocess.call(["iwconfig", iface, "mode", "monitor"])
subprocess.call(["ifconfig", iface, "up"])
return iface
# Проверка root
def check_root():
if os.geteuid() != 0:
print("[!] Запуск возможен только от root")
sys.exit(1)
# Показ интерфейсов
def show_interfaces():
print("[*] Доступные интерфейсы:\n")
subprocess.call(["ifconfig"])
# Парсинг аргументов
def parse_args():
if "-p" in sys.argv:
show_interfaces()
sys.exit(0)
if "-i" not in sys.argv:
usage()
idx = sys.argv.index("-i")
try:
iface = sys.argv[idx + 1]
except IndexError:
usage()
return iface
# Главная
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
check_root()
iface = parse_args()
setup_monitor(iface)
print(f"[+] Начинаем прослушку интерфейса {iface}...\nНажмите Ctrl+C для выхода.\n")
sniff(iface=iface, prn=sniffpackets, store=0)
```
---
### 🔧 Зависимости
Убедись, что установлен `scapy`:
```bash
pip install scapy
```
---
### 🔐 Пример запуска
```bash
sudo python3 scanwf.py -i wlan0
```
Показать интерфейсы:
```bash
python3 scanwf.py -p
```
---
## ✅ Вариант 1: WebSocket-интерфейс для отображения в браузере
Позволяет в реальном времени выводить данные в браузер через WebSocket.
### 📦 Зависимости:
```bash
pip install scapy flask flask-socketio eventlet
```
### 📁 Структура проекта:
```
wifi_sniffer_ws/
├── app.py # сервер Flask + WebSocket
├── sniffer.py # логика сниффера
└── templates/
└── index.html # браузерный интерфейс
```
### 🔧 `sniffer.py` — сниффер как поток
```python
from scapy.all import *
from scapy.layers.dot11 import Dot11, Dot11Elt, RadioTap
from datetime import datetime
def sniff_wifi(interface, callback):
def handle_packet(packet):
if packet.haslayer(Dot11) and packet.type == 0 and packet.subtype == 8:
ssid = packet[Dot11Elt].info.decode(errors="ignore") if packet.haslayer(Dot11Elt) else "<unknown>"
bssid = packet.addr3
signal = packet.dBm_AntSignal if hasattr(packet, 'dBm_AntSignal') else "?"
ch = "?"
el = packet.getlayer(Dot11Elt)
while el:
if el.ID == 3:
ch = ord(el.info)
break
el = el.payload.getlayer(Dot11Elt)
callback({
"time": datetime.now().strftime("%H:%M:%S"),
"ssid": ssid,
"bssid": bssid,
"channel": ch,
"signal": signal
})
sniff(iface=interface, prn=handle_packet, store=0)
```
### 🚀 `app.py` — Flask + WebSocket-сервер
```python
from flask import Flask, render_template
from flask_socketio import SocketIO
from threading import Thread
from sniffer import sniff_wifi
INTERFACE = "wlan0mon" # заменить на нужный
app = Flask(__name__)
socketio = SocketIO(app)
@app.route('/')
def index():
return render_template('index.html')
def emit_packet(packet):
socketio.emit('packet', packet)
def start_sniffer():
sniff_wifi(INTERFACE, emit_packet)
if __name__ == '__main__':
thread = Thread(target=start_sniffer)
thread.daemon = True
thread.start()
socketio.run(app, host='0.0.0.0', port=5000)
```
### 🌐 `templates/index.html`
```html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Wi-Fi Sniffer</title>
<script src="//cdnjs.cloudflare.com/ajax/libs/socket.io/4.4.1/socket.io.min.js"></script>
</head>
<body>
<h2>📡 Обнаруженные сети:</h2>
<table border="1">
<thead>
<tr><th>Time</th><th>SSID</th><th>BSSID</th><th>Channel</th><th>Signal</th></tr>
</thead>
<tbody id="wifiTable"></tbody>
</table>
<script>
const socket = io();
socket.on('packet', data => {
const row = document.createElement('tr');
row.innerHTML = `<td>${data.time}</td><td>${data.ssid}</td><td>${data.bssid}</td><td>${data.channel}</td><td>${data.signal}</td>`;
document.getElementById('wifiTable').appendChild(row);
});
</script>
</body>
</html>
```
---
## ✅ Вариант 2: Консольный TUI (Text User Interface) на `Textual`
Подходит для терминала, удобно для SSH или серверов без GUI.
### 📦 Установка
```bash
pip install scapy textual rich
```
### 🐍 `sniffer_tui.py` — пример TUI-интерфейса
```python
from scapy.all import sniff, Dot11, Dot11Elt
from textual.app import App, ComposeResult
from textual.widgets import Header, Footer, DataTable
from datetime import datetime
class WifiTUI(App):
def compose(self) -> ComposeResult:
yield Header()
self.table = DataTable()
self.table.add_columns("Time", "SSID", "BSSID", "Channel", "Signal")
yield self.table
yield Footer()
def on_mount(self) -> None:
self.seen = set()
sniff(iface="wlan0mon", prn=self.handle_packet, store=0)
def handle_packet(self, packet):
if packet.haslayer(Dot11) and packet.subtype == 8:
ssid = packet[Dot11Elt].info.decode(errors="ignore") if packet.haslayer(Dot11Elt) else "<unknown>"
bssid = packet.addr3
signal = packet.dBm_AntSignal if hasattr(packet, 'dBm_AntSignal') else "?"
channel = "?"
el = packet.getlayer(Dot11Elt)
while el:
if el.ID == 3:
channel = ord(el.info)
break
el = el.payload.getlayer(Dot11Elt)
key = (ssid, bssid)
if key not in self.seen:
self.seen.add(key)
self.table.add_row(
datetime.now().strftime("%H:%M:%S"),
ssid,
bssid,
str(channel),
str(signal)
)
if __name__ == "__main__":
WifiTUI().run()
```
---
## 🧩 Расширения
|Возможность|Поддержка|
|---|---|
|📡 WebSocket в браузере|✅|
|🖥 TUI в терминале|✅ `Textual`|
|🔒 Graylog интеграция|🔜 (через GELF или syslog)|
|📦 Ansible-интеграция|🔜 (в виде роли)|
---

View File

@@ -0,0 +1,234 @@
---
title: 'Сканирование пользовательских папок в корпоративной сети: автоматизация аудита файлов'
summary: 'Практическое руководство по созданию Python-скрипта для поиска файлов с определёнными расширениями на рабочих станциях сотрудников через административные шары Windows. Описание архитектуры, безопасности и запуска.'
date: '2025-11-18'
draft: false
tags:
- аудит
- безопасность
- python
- windows
- сканирование
- администрирование
---
# Документация для сканера расширений файлов
- Внутри корпаративной сети пользователи частенько хранят что то не нужно, давйте выясним чем сотруднок занимаеться на рабочем месте и каким файлам пользуеться, а после проанализируем что там есть у него на ПК.
## Обзор
Скрипт представляет собой инструмент для сканирования сетевых компьютеров с целью поиска файлов с определенными расширениями в пользовательских директориях.
## Основные возможности
* **Сетевое сканирование**: Подключение к удаленным компьютерам через административные шары (C\$)
* **Поиск файлов**: Обнаружение файлов с заданными расширениями (.exe, .url, .lnk)
* **Фильтрация**: Исключение системных пользователей и нежелательных файлов
* **Логирование**: Детальное ведение журнала операций
* **Экспорт результатов**: Сохранение данных в CSV формате
## Конфигурация
### Основные параметры
```python
EXTENSIONS = (".exe", ".url", ".lnk") # Искомые расширения файлов
EXCLUDE_USERS = ("All Users", "Default", "Default User", "Public", "desktop.ini", "Administrator")
SEARCH_DIRS = ("Desktop", "Downloads", "Documents") # Сканируемые директории
EXCLUDE_FILENAME_KEYWORDS = ("putty", "telegram", "vmware", "anydesk") # Исключаемые ключевые слова
```
### Файлы конфигурации
* `hosts.txt` - список хостов для сканирования (по одному на строку)
* `found_files.csv` - результаты сканирования
* `failed_hosts.csv` - журнал неудачных подключений
* `scanner.log` - детальный лог операций
## Архитектура
### Основные функции
#### `read_hosts(file_path: str) -> List[str]`
Читает список хостов из файла, игнорируя комментарии (строки, начинающиеся с `#`)
#### `is_excluded_file(file: str) -> bool`
Проверяет, должен ли файл быть исключен из результатов на основе:
* Имени файла (`desktop.ini`, `Thumbs.db`)
* Ключевых слов в имени файла
#### `wait_for_access(path: str, timeout: int = 3) -> bool`
Проверяет доступность сетевого пути с таймаутом
#### `scan_host(host: str) -> List[List[str]]`
Основная функция сканирования:
1. Подключается к `\\{host}\c$\Users`
2. Перебирает пользовательские директории
3. Сканирует указанные поддиректории
4. Фильтрует файлы по расширениям и исключениям
5. Возвращает список найденных файлов
#### `write_results(rows: List[List[str]], header: List[str], file_path: str)`
Записывает результаты в CSV файл с разделителем `;`
#### `log_failed_host(host: str, reason: str)`
Записывает информацию о неудачных подключениях
## Структура данных
### Формат результатов CSV
| Поле | Описание |
| --------- | ------------------------- |
| PC Name | Имя компьютера |
| User | Имя пользователя |
| Extension | Расширение файла |
| File Path | Полный путь к файлу |
| Scan Date | Дата и время сканирования |
### Формат журнала ошибок
| Поле | Описание |
| --------- | -------------- |
| Host | Имя хоста |
| Reason | Причина ошибки |
| Timestamp | Время ошибки |
## Безопасность и требования
### Системные требования
* **ОС**: Windows (для доступа к административным шарам)
* **Права**: Административные права на целевых машинах
* **Сеть**: Доступ к портам SMB (445/tcp)
* **Python**: 3.6+ с модулями `os`, `csv`, `pathlib`, `logging`, `datetime`, `typing`
### Рекомендации по безопасности
1. **Ограничение доступа**: Запускать только с учетных записей с минимально необходимыми правами
2. **Сетевая сегментация**: Использовать в изолированных сетевых сегментах
3. **Аудит**: Регулярно проверять логи на предмет подозрительной активности
4. **Шифрование логов**: При необходимости добавьте шифрование логов или безопасную транспортировку логов на удалённый сервер
## Развёртывание и запуск
1. Установите Python 3.8+:
- на MACOS:
```bash
brew install python # или иным способом в зависимости от ОС
```
- на Windows:
```bash
choco install python # или иным способом в зависимости от ОС
```
- на Linux:
```bash
sudo apt-get install python3 # или иным способом в зависимости от ОС
```
2. Подготовьте файл `hosts.txt`:
```text
PC-NAME-1
PC-NAME-2
```
3. Запустите скрипт:
```bash
python3 main.py
```
```python
import os
import csv
import pathlib
import logging
from datetime import datetime
from typing import List
# Конфигурация
EXTENSIONS = (".exe", ".url", ".lnk")
EXCLUDE_USERS = ("All Users", "Default", "Default User", "Public", "desktop.ini", "Administrator")
SEARCH_DIRS = ("Desktop", "Downloads", "Documents")
EXCLUDE_FILENAME_KEYWORDS = ("putty", "telegram", "vmware", "anydesk") # слова в имени файла, а не расширения
EXCLUDE_FILENAMES = ("desktop.ini", "Thumbs.db")
OUTPUT_FILE = "found_files.csv"
FAILED_FILE = "failed_hosts.csv"
HOSTS_FILE = "hosts.txt"
LOG_FILE = "scanner.log"
# Настройка логирования
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
def read_hosts(file_path: str) -> List[str]:
with open(file_path, "r", encoding="utf-8") as f:
return [line.strip() for line in f if line.strip() and not line.strip().startswith("#")]
def is_excluded_file(file: str) -> bool:
file_lower = file.lower()
return (
file in EXCLUDE_FILENAMES
or any(keyword in file_lower for keyword in EXCLUDE_FILENAME_KEYWORDS)
)
def wait_for_access(path: str, timeout: int = 3) -> bool:
try:
return os.path.exists(path)
except Exception:
return False
def scan_host(host: str) -> List[List[str]]:
results = []
base_users_path = f"\\\\{host}\\c$\\Users"
try:
if not wait_for_access(base_users_path):
raise Exception("Недоступен путь к Users")
logging.info(f"Подключение к: {host}")
users = os.listdir(base_users_path)
for user in users:
if user in EXCLUDE_USERS:
continue
user_base = os.path.join(base_users_path, user)
for sub_dir in SEARCH_DIRS:
target_dir = os.path.join(user_base, sub_dir)
if not os.path.exists(target_dir):
continue
for root, _, files in os.walk(target_dir):
for file in files:
if is_excluded_file(file):
continue
if not file.lower().endswith(EXTENSIONS):
continue
full_path = os.path.join(root, file)
ext = pathlib.Path(file).suffix
logging.info(f"[{host}] Обнаружен файл: {full_path}")
results.append([
host, user, ext, full_path, datetime.now().strftime("%Y-%m-%d %H:%M:%S")
])
except Exception as e:
log_failed_host(host, str(e))
logging.error(f"Ошибка при сканировании {host}: {e}")
return results
def write_results(rows: List[List[str]], header: List[str], file_path: str):
file_exists = os.path.exists(file_path)
with open(file_path, mode="a", newline='', encoding="utf-8") as f:
writer = csv.writer(f, delimiter=";")
if not file_exists:
writer.writerow(header)
writer.writerows(rows)
logging.info(f"Записано {len(rows)} строк в {file_path}")
def log_failed_host(host: str, reason: str):
with open(FAILED_FILE, mode="a", newline='', encoding="utf-8") as f:
writer = csv.writer(f, delimiter=";")
writer.writerow([host, reason, datetime.now().strftime("%Y-%m-%d %H:%M:%S")])
def main():
logging.info("=== Запуск сканирования ===")
hosts = read_hosts(HOSTS_FILE)
header = ["PC Name", "User", "Extension", "File Path", "Scan Date"]
for host in hosts:
print(f"Scanning: {host}")
results = scan_host(host)
if results:
write_results(results, header, OUTPUT_FILE)
else:
logging.warning(f"Нет результатов или ошибка доступа: {host}")
logging.info("=== Завершение сканирования ===")
if __name__ == "__main__":
main()
```
## Полезные ресурсы
1. [Официальная документация Python](https://docs.python.org/3/)
2. [Работа с путями в Python](https://realpython.com/python-pathlib/)
3. [Примеры работы с CSV](https://pythonexamples.org/python-csv/)
4. [Сетевые пути Windows](https://learn.microsoft.com/ru-ru/windows/win32/fileio/naming-a-file)