本文目录导读:

我来为您介绍几个Python扫描相关的实用案例。
端口扫描器
import socket
import threading
from queue import Queue
import time
class PortScanner:
def __init__(self, target, ports_range=(1, 1000), threads=50):
self.target = target
self.start_port = ports_range[0]
self.end_port = ports_range[1]
self.threads = threads
self.queue = Queue()
self.open_ports = []
def scan_port(self, port):
"""扫描单个端口"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((self.target, port))
if result == 0:
service = self.get_service_name(port)
self.open_ports.append((port, service))
print(f"端口 {port} 开放 - {service}")
sock.close()
except:
pass
def get_service_name(self, port):
"""获取常见服务名称"""
common_ports = {
21: 'FTP', 22: 'SSH', 23: 'Telnet',
25: 'SMTP', 53: 'DNS', 80: 'HTTP',
110: 'POP3', 143: 'IMAP', 443: 'HTTPS',
3306: 'MySQL', 3389: 'RDP', 5432: 'PostgreSQL',
6379: 'Redis', 8080: 'HTTP-Proxy'
}
return common_ports.get(port, 'Unknown')
def worker(self):
"""工作线程"""
while not self.queue.empty():
port = self.queue.get()
self.scan_port(port)
self.queue.task_done()
def scan(self):
"""执行扫描"""
print(f"开始扫描 {self.target}...")
print(f"扫描范围: 端口 {self.start_port}-{self.end_port}")
# 创建任务队列
for port in range(self.start_port, self.end_port + 1):
self.queue.put(port)
# 启动线程池
start_time = time.time()
threads = []
for _ in range(self.threads):
t = threading.Thread(target=self.worker)
t.start()
threads.append(t)
# 等待所有线程完成
self.queue.join()
for t in threads:
t.join()
elapsed_time = time.time() - start_time
print(f"\n扫描完成!耗时: {elapsed_time:.2f}秒")
print(f"发现 {len(self.open_ports)} 个开放端口")
return self.open_ports
# 使用示例
if __name__ == "__main__":
scanner = PortScanner("localhost", (1, 100), threads=20)
results = scanner.scan()
print(f"开放端口列表: {results}")
网络扫描器
import subprocess
import ipaddress
import concurrent.futures
import platform
class NetworkScanner:
def __init__(self, network):
self.network = network
self.active_hosts = []
def ping_host(self, ip):
"""Ping测试主机是否在线"""
param = '-n' if platform.system().lower() == 'windows' else '-c'
try:
result = subprocess.run(
['ping', param, '1', str(ip)],
capture_output=True,
timeout=2
)
if result.returncode == 0:
print(f"主机 {ip} 在线")
return str(ip)
except:
pass
return None
def scan(self):
"""扫描网络中的活跃主机"""
print(f"扫描网络: {self.network}")
# 创建IP地址范围
network = ipaddress.ip_network(self.network, strict=False)
# 使用线程池并行扫描
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
futures = []
for ip in network.hosts():
futures.append(executor.submit(self.ping_host, ip))
# 收集结果
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
self.active_hosts.append(result)
self.active_hosts.sort(key=lambda x: ipaddress.ip_address(x))
print(f"\n发现 {len(self.active_hosts)} 个活跃主机")
return self.active_hosts
# 使用示例
scanner = NetworkScanner("192.168.1.0/24")
results = scanner.scan()
文件扫描器
import os
import hashlib
from pathlib import Path
class FileScanner:
def __init__(self, path):
self.path = Path(path)
self.scanned_files = []
self.file_hashes = {}
self.duplicates = []
def get_file_hash(self, filepath, algorithm='md5'):
"""计算文件哈希值"""
hash_obj = hashlib.new(algorithm)
try:
with open(filepath, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b''):
hash_obj.update(chunk)
return hash_obj.hexdigest()
except:
return None
def scan(self, pattern='*.*', include_hidden=False):
"""扫描文件"""
print(f"扫描目录: {self.path}")
for item in self.path.rglob(pattern):
if item.is_file():
# 跳过隐藏文件(可选)
if not include_hidden and item.name.startswith('.'):
continue
file_info = {
'path': str(item),
'name': item.name,
'size': item.stat().st_size,
'modified': item.stat().st_mtime,
'extension': item.suffix
}
self.scanned_files.append(file_info)
print(f"找到 {len(self.scanned_files)} 个文件")
return self.scanned_files
def find_duplicates(self):
"""查找重复文件"""
print("正在查找重复文件...")
# 按文件大小分组
size_groups = {}
for file in self.scanned_files:
size = file['size']
if size not in size_groups:
size_groups[size] = []
size_groups[size].append(file['path'])
# 检查相同大小的文件(只检查大小大于0的文件)
for size, files in size_groups.items():
if len(files) > 1 and size > 0:
# 计算哈希值
for filepath in files:
file_hash = self.get_file_hash(filepath)
if file_hash:
if file_hash not in self.file_hashes:
self.file_hashes[file_hash] = []
self.file_hashes[file_hash].append(filepath)
# 找出重复文件
for file_hash, paths in self.file_hashes.items():
if len(paths) > 1:
self.duplicates.append({
'hash': file_hash,
'paths': paths,
'count': len(paths)
})
print(f"发现 {len(self.duplicates)} 组重复文件")
return self.duplicates
# 使用示例
scanner = FileScanner("/path/to/directory")
files = scanner.scan(pattern="*.txt")
duplicates = scanner.find_duplicates()
for dup in duplicates:
print(f"\n重复文件组 (哈希: {dup['hash'][:8]}...):")
for path in dup['paths']:
print(f" - {path}")
目录扫描器(Web应用)
import requests
from urllib.parse import urljoin
import concurrent.futures
class WebDirectoryScanner:
def __init__(self, base_url, wordlist, extensions=['.php', '.asp', '.html', '.txt']):
self.base_url = base_url.rstrip('/')
self.wordlist = wordlist
self.extensions = extensions
self.found_items = []
def check_url(self, path):
"""检查URL是否存在"""
url = urljoin(self.base_url, path)
try:
response = requests.get(url, timeout=5, allow_redirects=True)
if response.status_code in [200, 301, 302, 403]:
info = {
'url': url,
'status': response.status_code,
'size': len(response.content)
}
self.found_items.append(info)
print(f"[{response.status_code}] {url} ({info['size']} bytes)")
return info
except:
pass
return None
def scan(self, max_workers=10):
"""执行扫描"""
print(f"开始扫描: {self.base_url}")
print(f"使用 {max_workers} 个线程")
# 构建路径列表
paths = []
for word in self.wordlist:
# 直接路径
paths.append(f"/{word}")
# 带扩展名的路径
for ext in self.extensions:
paths.append(f"/{word}{ext}")
# 常见组合
paths.append(f"/{word}/")
# 并行扫描
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(self.check_url, path) for path in paths]
concurrent.futures.wait(futures)
print(f"\n扫描完成!找到 {len(self.found_items)} 个有效路径")
return self.found_items
# 使用示例
wordlist = ['admin', 'login', 'config', 'backup', 'test', 'api', 'upload']
scanner = WebDirectoryScanner("http://example.com", wordlist)
results = scanner.scan()
综合使用示例
def main():
print("="*50)
print("Python 扫描工具集")
print("="*50)
while True:
print("\n选择扫描类型:")
print("1. 端口扫描")
print("2. 网络扫描")
print("3. 文件扫描")
print("4. Web目录扫描")
print("0. 退出")
choice = input("请输入选择 (0-4): ")
if choice == '0':
break
elif choice == '1':
target = input("请输入目标地址: ")
scanner = PortScanner(target, (1, 1000), threads=50)
results = scanner.scan()
elif choice == '2':
network = input("请输入网络地址(CIDR格式,如192.168.1.0/24): ")
scanner = NetworkScanner(network)
results = scanner.scan()
elif choice == '3':
path = input("请输入要扫描的目录路径: ")
scanner = FileScanner(path)
files = scanner.scan()
find_dup = input("是否查找重复文件? (y/n): ")
if find_dup.lower() == 'y':
duplicates = scanner.find_duplicates()
elif choice == '4':
url = input("请输入目标URL: ")
wordlist_file = input("请输入字典文件路径(留空使用默认): ")
if wordlist_file:
with open(wordlist_file, 'r') as f:
wordlist = [line.strip() for line in f if line.strip()]
else:
wordlist = ['admin', 'login', 'config', 'backup', 'test']
scanner = WebDirectoryScanner(url, wordlist)
results = scanner.scan()
else:
print("无效选择,请重试")
if __name__ == "__main__":
main()
注意事项
- 合法使用:扫描他人的系统或网络需要获得授权
- 性能影响:大量扫描可能对目标系统造成影响
- 防火墙限制:防火墙可能阻止扫描请求
- 网络速度:扫描速度受网络条件影响
这些扫描工具可以用于:
- 网络安全评估
- 系统维护和监控
- 文件管理
- 网络故障排查
请确保您在合法和授权的环境中使用这些工具。