Python案例做沙箱?

wen 网络安全 48

本文目录导读:

Python案例做沙箱?

  1. 什么是Python沙箱?
  2. 基础沙箱实现方案
  3. 安全性检查清单
  4. 注意事项

我来详细解释一下Python沙箱的实现方案。

什么是Python沙箱?

沙箱是一个隔离的执行环境,用于安全地运行不可信代码,限制其对系统资源的访问。

基础沙箱实现方案

使用exec限制命名空间

def basic_sandbox(code, timeout=5):
    """
    基础沙箱 - 限制可访问的模块和函数
    """
    # 限制可用的内置函数
    allowed_builtins = {
        'print': print,
        'len': len,
        'range': range,
        'int': int,
        'str': str,
        'list': list,
        'dict': dict,
        'tuple': tuple,
        'True': True,
        'False': False,
        'None': None,
    }
    # 创建安全的命名空间
    safe_globals = {
        '__builtins__': allowed_builtins,
        '__name__': '__sandbox__',
    }
    safe_locals = {}
    try:
        exec(code, safe_globals, safe_locals)
        return safe_locals
    except Exception as e:
        return f"执行错误: {e}"
# 测试
user_code = """
result = [x**2 for x in range(10)]
print(result)
"""
basic_sandbox(user_code)

使用ast模块解析和验证

import ast
import sys
class SandboxVisitor(ast.NodeVisitor):
    """AST访问者,用于检查代码安全性"""
    def __init__(self):
        self.allowed_nodes = {
            ast.Expr, ast.Assign, ast.Name, ast.Constant,
            ast.BinOp, ast.UnaryOp, ast.Compare, ast.BoolOp,
            ast.List, ast.Tuple, ast.Dict, ast.Set,
            ast.Call, ast.Attribute, ast.Subscript,
            ast.If, ast.For, ast.While, ast.Break,
            ast.Continue, ast.Return, ast.FunctionDef,
            ast.Lambda, ast.FormattedValue, ast.JoinedStr,
        }
        self.dangerous_attributes = [
            '__import__', 'open', 'exec', 'eval', 'compile',
            '__subclasses__', '__globals__', '__builtins__',
            'os.', 'sys.', 'subprocess.', 'shutil.',
        ]
    def visit_Call(self, node):
        """检查函数调用"""
        if isinstance(node.func, ast.Name):
            func_name = node.func.id
            if func_name in ['eval', 'exec', 'compile', 'open', '__import__']:
                raise SecurityError(f"禁止调用: {func_name}")
        if isinstance(node.func, ast.Attribute):
            full_name = self._get_full_name(node.func)
            for dangerous in self.dangerous_attributes:
                if dangerous in full_name.lower():
                    raise SecurityError(f"禁止访问: {full_name}")
        self.generic_visit(node)
    def _get_full_name(self, node):
        """获取属性访问的完整名称"""
        if isinstance(node, ast.Name):
            return node.id
        elif isinstance(node, ast.Attribute):
            return f"{self._get_full_name(node.value)}.{node.attr}"
        return ""
class SecurityError(Exception):
    pass
def ast_sandbox(code):
    """
    使用AST进行代码安全检查
    """
    try:
        tree = ast.parse(code)
        visitor = SandboxVisitor()
        visitor.visit(tree)
        # 编译并执行
        compiled = compile(tree, '<sandbox>', 'exec')
        safe_builtins = {
            'print': print,
            'range': range,
            'len': len,
            'int': int,
            'float': float,
            'str': str,
            'bool': bool,
            'list': list,
            'dict': dict,
            'tuple': tuple,
        }
        exec(compiled, {'__builtins__': safe_builtins})
    except SecurityError as e:
        return f"安全检查失败: {e}"
    except Exception as e:
        return f"执行错误: {e}"
# 测试
test_codes = [
    "print('Hello, World!')",
    "x = [i**2 for i in range(10)]",
    "import os  # 应该被阻止",
    "__import__('os').system('ls')",  # 应该被阻止
]
for code in test_codes:
    print(f"执行: {code}")
    print(f"结果: {ast_sandbox(code)}")
    print("-" * 40)

限制资源使用

import resource
import signal
import threading
class ResourceLimitSandbox:
    """资源受限的沙箱"""
    def __init__(self, cpu_time=1, memory_mb=100):
        self.cpu_time = cpu_time
        self.memory_bytes = memory_mb * 1024 * 1024
    def run(self, code, globals_dict=None):
        """在资源限制下运行代码"""
        if globals_dict is None:
            globals_dict = {
                '__builtins__': self._get_safe_builtins()
            }
        result = []
        error = None
        def execute():
            nonlocal error
            try:
                # 设置资源限制
                resource.setrlimit(resource.RLIMIT_CPU, 
                                 (self.cpu_time, self.cpu_time))
                resource.setrlimit(resource.RLIMIT_AS, 
                                 (self.memory_bytes, self.memory_bytes))
                exec(code, globals_dict)
                result.append(True)
            except Exception as e:
                error = str(e)
                result.append(False)
        thread = threading.Thread(target=execute)
        thread.daemon = True
        thread.start()
        thread.join(timeout=self.cpu_time)
        if thread.is_alive():
            return "超时: 代码执行时间过长"
        if not result[0]:
            return f"执行错误: {error}"
        return "执行成功"
    def _get_safe_builtins(self):
        """获取安全的内置函数"""
        return {
            'print': print,
            'len': len,
            'range': range,
            'int': int,
            'str': str,
            'list': list,
            'dict': dict,
            'tuple': tuple,
            'True': True,
            'False': False,
            'None': None,
        }
# 测试
sandbox = ResourceLimitSandbox(cpu_time=1, memory_mb=100)
# 测试正常代码
print(sandbox.run("print('Hello')"))
# 测试无限循环
print(sandbox.run("while True: pass"))
# 测试内存溢出
print(sandbox.run("data = [0] * (1024**3)"))

使用子进程隔离

import subprocess
import json
import tempfile
import os
def subprocess_sandbox(code, timeout=5):
    """
    使用子进程实现完全隔离
    """
    # 创建临时文件
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', 
                                    delete=False) as f:
        f.write(f"""
import sys
import json
def safe_exec():
    try:
        {code}
    except Exception as e:
        print(f"Error: {{e}}", file=sys.stderr)
        sys.exit(1)
safe_exec()
""")
        temp_file = f.name
    try:
        # 在子进程中执行
        result = subprocess.run(
            [sys.executable, temp_file],
            capture_output=True,
            text=True,
            timeout=timeout,
            # 限制进程权限(Unix)
            preexec_fn=lambda: os.setuid(65534) if os.name == 'posix' else None
        )
        if result.returncode == 0:
            return result.stdout
        else:
            return f"错误: {result.stderr}"
    except subprocess.TimeoutExpired:
        return "超时: 代码执行时间过长"
    except Exception as e:
        return f"执行错误: {e}"
    finally:
        # 清理临时文件
        try:
            os.unlink(temp_file)
        except:
            pass
# 测试
print(subprocess_sandbox("print('Hello from subprocess!')"))

使用Docker容器隔离(高级方案)

import docker
import uuid
class DockerSandbox:
    """使用Docker容器实现完全隔离"""
    def __init__(self, image='python:3.9-slim'):
        self.client = docker.from_env()
        self.image = image
    def run_code(self, code, timeout=30):
        """在Docker容器中运行代码"""
        container_name = f"sandbox_{uuid.uuid4().hex[:8]}"
        # 创建代码文件
        code_file = f"/tmp/{container_name}.py"
        with open(code_file, 'w') as f:
            f.write(code)
        try:
            container = self.client.containers.run(
                self.image,
                f"python /code/{container_name}.py",
                volumes={
                    code_file: {
                        'bind': f'/code/{container_name}.py',
                        'mode': 'ro'
                    }
                },
                name=container_name,
                detach=True,
                # 限制资源
                mem_limit='100m',
                cpu_period=100000,
                cpu_quota=50000,  # 0.5 CPU
                network_disabled=True,  # 禁用网络
                read_only=True,  # 只读文件系统
                remove=True,  # 自动删除容器
            )
            # 等待结果
            result = container.wait(timeout=timeout)
            logs = container.logs(stdout=True, stderr=True)
            if result['StatusCode'] == 0:
                return logs.decode('utf-8')
            else:
                return f"错误: {logs.decode('utf-8')}"
        except docker.errors.APIError as e:
            return f"Docker错误: {e}"
        except Exception as e:
            return f"执行错误: {e}"
        finally:
            # 清理
            try:
                os.remove(code_file)
            except:
                pass
# 使用示例(需要安装docker和python-docker库)
# sandbox = DockerSandbox()
# print(sandbox.run_code("print('Hello from Docker!')"))

安全性检查清单

class SecurityChecklist:
    """安全检查清单"""
    @staticmethod
    def check_imports(code):
        """检查是否导入危险模块"""
        dangerous_modules = [
            'os', 'sys', 'subprocess', 'shutil',
            'ctypes', 'socket', 'requests',
            'pickle', 'marshal', 'shelve',
        ]
        for module in dangerous_modules:
            if f"import {module}" in code or f"from {module}" in code:
                return False, f"禁止导入模块: {module}"
        return True, "通过"
    @staticmethod
    def check_dangerous_functions(code):
        """检查危险函数调用"""
        dangerous_funcs = [
            'eval(', 'exec(', 'compile(',
            'open(', '__import__(',
            'getattr(', 'setattr(', 'delattr(',
        ]
        for func in dangerous_funcs:
            if func in code:
                return False, f"禁止调用函数: {func}"
        return True, "通过"
    @staticmethod
    def check_file_operations(code):
        """检查文件操作"""
        file_operations = [
            'file(', 'open(',
            'write(', 'read(',
            'os.remove', 'os.unlink',
            'shutil.',
        ]
        for op in file_operations:
            if op in code:
                return False, f"禁止文件操作: {op}"
        return True, "通过"
# 综合沙箱
def comprehensive_sandbox(code, timeout=5, memory_limit=100):
    """综合安全检查沙箱"""
    # 1. 静态代码检查
    checks = [
        SecurityChecklist.check_imports,
        SecurityChecklist.check_dangerous_functions,
        SecurityChecklist.check_file_operations,
    ]
    for check in checks:
        passed, message = check(code)
        if not passed:
            return f"安全拦截: {message}"
    # 2. AST验证
    try:
        tree = ast.parse(code)
        visitor = SandboxVisitor()
        visitor.visit(tree)
    except SyntaxError as e:
        return f"语法错误: {e}"
    except SecurityError as e:
        return f"安全错误: {e}"
    # 3. 资源限制执行
    sandbox = ResourceLimitSandbox(cpu_time=timeout, memory_mb=memory_limit)
    return sandbox.run(code)
# 使用示例
user_code = """
def calculate():
    result = sum(range(100))
    return result
print(f"计算结果: {calculate()}")
"""
print(comprehensive_sandbox(user_code))

注意事项

  1. 没有绝对安全:Python的沙箱很难做到100%安全
  2. 使用现成方案:考虑使用PyPy、pybox等成熟方案
  3. 生产环境:强烈建议使用容器化方案(Docker)
  4. 性能考虑:沙箱会带来额外性能开销
  5. 定制化需求:根据实际需求选择合适的隔离级别

选择哪种沙箱方案取决于你的具体需求:安全级别、性能要求、部署环境等,对于生产环境,推荐使用Docker等容器化方案。

抱歉,评论功能暂时关闭!