如何用Python案例实现端口扫描?

wen python案例 4

本文目录导读:

如何用Python案例实现端口扫描?

  1. 基础TCP连接扫描
  2. 多线程端口扫描器
  3. 高级端口扫描器(带服务识别)
  4. 使用示例
  5. 注意事项

我来为您介绍几种Python实现端口扫描的方法,从简单到高级。

基础TCP连接扫描

import socket
import threading
from datetime import datetime
def scan_port(host, port):
    """扫描单个端口"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)  # 设置超时时间
        result = sock.connect_ex((host, port))
        sock.close()
        if result == 0:
            return True
        return False
    except Exception as e:
        return False
def simple_scan(host, port_range):
    """简单端口扫描"""
    print(f"开始扫描 {host}")
    print(f"扫描范围: {port_range[0]} - {port_range[1]}")
    print("-" * 50)
    start_time = datetime.now()
    open_ports = []
    for port in range(port_range[0], port_range[1] + 1):
        if scan_port(host, port):
            print(f"端口 {port} 开放")
            open_ports.append(port)
    end_time = datetime.now()
    print("-" * 50)
    print(f"扫描完成,耗时: {end_time - start_time}")
    print(f"发现 {len(open_ports)} 个开放端口: {open_ports}")
# 使用示例
if __name__ == "__main__":
    # 扫描本地主机的前10个端口
    simple_scan("127.0.0.1", (1, 10))

多线程端口扫描器

import socket
import threading
from queue import Queue
import sys
class PortScanner:
    def __init__(self, host, start_port, end_port, threads=100):
        self.host = host
        self.start_port = start_port
        self.end_port = end_port
        self.threads = threads
        self.open_ports = []
        self.queue = Queue()
    def scan_port(self, port):
        """线程工作函数"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(0.5)
            result = sock.connect_ex((self.host, port))
            sock.close()
            if result == 0:
                self.open_ports.append(port)
                self.get_port_service(port)
        except:
            pass
    def get_port_service(self, port):
        """获取端口服务名称"""
        try:
            service = socket.getservbyport(port)
        except:
            service = "未知服务"
        print(f"端口 {port} 开放 - {service}")
    def worker(self):
        """工作线程"""
        while not self.queue.empty():
            port = self.queue.get()
            self.scan_port(port)
            self.queue.task_done()
    def run(self):
        """执行扫描"""
        print(f"开始扫描 {self.host}")
        print(f"扫描端口范围: {self.start_port} - {self.end_port}")
        print(f"线程数: {self.threads}")
        print("-" * 50)
        # 创建端口队列
        for port in range(self.start_port, self.end_port + 1):
            self.queue.put(port)
        # 启动线程
        for _ in range(self.threads):
            t = threading.Thread(target=self.worker)
            t.daemon = True
            t.start()
        # 等待所有线程完成
        self.queue.join()
        print("-" * 50)
        print(f"扫描完成! 发现 {len(self.open_ports)} 个开放端口")
# 使用示例
if __name__ == "__main__":
    scanner = PortScanner("127.0.0.1", 1, 1000, threads=200)
    scanner.run()

高级端口扫描器(带服务识别)

import socket
import argparse
import concurrent.futures
import ipaddress
from colorama import init, Fore, Style
init(autoreset=True)
class AdvancedPortScanner:
    def __init__(self):
        self.services = {
            20: "FTP-DATA", 21: "FTP", 22: "SSH", 23: "Telnet",
            25: "SMTP", 53: "DNS", 80: "HTTP", 110: "POP3",
            143: "IMAP", 443: "HTTPS", 3306: "MySQL", 3389: "RDP",
            5432: "PostgreSQL", 6379: "Redis", 8080: "HTTP-Proxy",
            8443: "HTTPS-Alt"
        }
        self.common_ports = list(self.services.keys())
    def scan_port(self, host, port, timeout=1):
        """扫描端口并尝试识别服务"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            result = sock.connect_ex((host, port))
            if result == 0:
                # 获取服务信息
                service = self.get_service_banner(sock, port)
                sock.close()
                return port, True, service
            sock.close()
            return port, False, None
        except Exception as e:
            return port, False, str(e)
    def get_service_banner(self, sock, port):
        """尝试获取服务banner"""
        service_name = self.services.get(port, "Unknown")
        # 尝试发送HTTP请求获取信息
        if port in [80, 8080, 443, 8443]:
            try:
                sock.send(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n")
                banner = sock.recv(1024)
                return f"{service_name} ({banner[:50].decode(errors='ignore').strip()})"
            except:
                pass
        # 尝试获取SSH版本信息
        if port == 22:
            try:
                banner = sock.recv(1024)
                return f"{service_name} ({banner[:50].decode(errors='ignore').strip()})"
            except:
                pass
        return service_name
    def scan_ports_concurrent(self, host, ports, max_workers=100):
        """并发扫描端口"""
        open_ports = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # 创建扫描任务
            future_to_port = {
                executor.submit(self.scan_port, host, port): port 
                for port in ports
            }
            # 处理结果
            for future in concurrent.futures.as_completed(future_to_port):
                port = future_to_port[future]
                try:
                    port_num, is_open, service = future.result()
                    if is_open:
                        open_ports.append((port_num, service))
                        print(f"{Fore.GREEN}[开放] 端口 {port_num}: {service}")
                    else:
                        print(f"{Fore.RED}[关闭] 端口 {port_num}", end="\r")
                except Exception as e:
                    print(f"{Fore.YELLOW}[错误] 端口 {port}: {e}")
        return sorted(open_ports)
    def scan_target(self, target, port_range=None):
        """扫描目标"""
        # 解析目标
        try:
            # 处理IP地址或域名
            host = socket.gethostbyname(target)
            print(f"{Fore.CYAN}目标解析: {target} -> {host}")
        except:
            print(f"{Fore.RED}无法解析目标: {target}")
            return
        # 确定扫描端口
        if port_range:
            ports = list(range(port_range[0], port_range[1] + 1))
        else:
            ports = self.common_ports
        print(f"{Fore.CYAN}开始扫描 {host}")
        print(f"{Fore.CYAN}扫描端口数: {len(ports)}")
        print("=" * 60)
        open_ports = self.scan_ports_concurrent(host, ports)
        print("=" * 60)
        print(f"{Fore.GREEN}扫描完成!")
        print(f"{Fore.YELLOW}发现 {len(open_ports)} 个开放端口:")
        for port, service in open_ports:
            print(f"  {Fore.WHITE}{port}/tcp  {Fore.BLUE}{service}")
def main():
    parser = argparse.ArgumentParser(description="高级端口扫描器")
    parser.add_argument("target", help="扫描目标 (IP或域名)")
    parser.add_argument("-p", "--ports", help="端口范围 (如: 1-1000)")
    parser.add_argument("-t", "--threads", type=int, default=100, help="线程数 (默认: 100)")
    args = parser.parse_args()
    # 解析端口范围
    port_range = None
    if args.ports:
        try:
            start, end = map(int, args.ports.split("-"))
            port_range = (start, end)
        except:
            print("端口格式错误, 使用常见端口")
    scanner = AdvancedPortScanner()
    scanner.scan_target(args.target, port_range)
if __name__ == "__main__":
    main()

使用示例

基础使用:

# 扫描常见端口
python port_scanner.py 127.0.0.1
# 扫描指定端口范围
python port_scanner.py 192.168.1.1 -p 1-1000
# 并行扫描(增加速度)
python port_scanner.py example.com -p 1-65535 -t 500

Python代码直接调用:

# 使用基础扫描器
from port_scanner import simple_scan
simple_scan("192.168.1.1", (1, 100))
# 使用高级扫描器
from port_scanner_advanced import AdvancedPortScanner
scanner = AdvancedPortScanner()
scanner.scan_target("scanme.nmap.org")

注意事项

  1. 合法性:只扫描自己拥有或授权的系统
  2. 限制速度:过快扫描可能被防火墙检测或触发安全警报
  3. 异常处理:网络连接可能不稳定,需要适当的超时和重试机制
  4. 性能优化:使用线程池或异步IO提高扫描效率

这些示例覆盖了从基础到高级的端口扫描实现,您可以根据实际需求选择合适的方案。

抱歉,评论功能暂时关闭!