Source code for duck.utils.codesandbox

"""
Secure Python Code Sandbox:

This script defines a class `SafeCodeSandbox` that provides a secure execution environment
for running arbitrary Python code. It ensures isolation by:
- Running code in a subprocess.
- Blocking dangerous imports and built-ins.
- Restricting system resources like CPU and memory.
- Providing execution time and memory limits to prevent abuse.

Note:
- This is only available for Linux environment.
"""
import subprocess
import os
import resource
import traceback
import tempfile
import json
import sys
import re


[docs] class SafeCodeSandbox: """ Enhanced Secure Python Code Sandbox: This class runs Python code in a highly secure, isolated environment. It blocks unsafe imports, restricts resource usage (CPU, memory), and executes the code in a subprocess with limited system call access to prevent abuse. """
[docs] def execute(self, code: str, variable_name: str): """ Executes the provided Python code in a secure sandbox environment and retrieves the value of a specific variable. Args: code (str): The Python code to be executed in the sandbox. variable_name (str): The name of the variable whose value we want to retrieve. Returns: dict: A dictionary with either: - "success": the value of the specified variable. - "error": error message including full traceback if an exception occurs. """ try: sanitized_code = self.sanitize_code(code) # Run the code in the subprocess with chroot isolation with tempfile.TemporaryDirectory() as temp_dir: result = self.run_in_subprocess(sanitized_code, variable_name, temp_dir) return result except ValueError as ve: return {"error": str(ve)} except Exception as e: return {"error": traceback.format_exc()}
[docs] def sanitize_code(self, code: str) -> str: blocked_imports = [ "os", "re", "builtins", "sys", "subprocess", "shutil", "socket", "ctypes", "pathlib", "threading", "multiprocessing", "warnings", "dotenv", "django", ] blocked_builtins = [ "exec", "eval", "compile", "open", "__builtins__", "__import__", ] for banned in blocked_imports: if re.search(rf'\b(import|from)\s+{banned}\b', code): raise ValueError(f"Blocked import: {banned}") for banned in blocked_builtins: if re.search(rf'\b{banned}\b', code): raise ValueError(f"Blocked usage: {banned}") return code
[docs] def run_in_subprocess(self, py_code: str, variable_name: str, temp_dir: str) -> dict: try: # `chroot` will ensure the subprocess is isolated to the temp_dir command = [ sys.executable, '-c', f""" import json import resource import sys import os import traceback # Enforce resource limits resource.setrlimit(resource.RLIMIT_CPU, (1, 1)) # Limit CPU time resource.setrlimit(resource.RLIMIT_AS, (128 * 1024 * 1024, 128 * 1024 * 1024)) # Limit RAM (128MB) # Override os functions to prevent changing directories def safe_chdir(path): raise PermissionError("Changing directories is not allowed") def safe_getcwd(): return "{temp_dir}" # Create read-only open function to block write access def safe_open(path, mode='r', *args, **kwargs): if 'w' in mode or 'a' in mode or '+' in mode or 'x' in mode: raise PermissionError("Write access is forbidden") if not os.path.isfile(path): raise FileNotFoundError("Only regular files are accessible") if path.startswith(('/etc', '/proc', '/sys', '/dev', '/bin', '/lib', '/boot')): raise PermissionError("Access to system directories is not allowed") return open(path, mode, *args, **kwargs) # Safe built-ins safe_builtins = {{ "range": range, "len": len, "int": int, "float": float, "str": str, "print": print, "bool": bool, "dict": dict, "list": list, "set": set, "tuple": tuple, "abs": abs, "min": min, "max": max, "sum": sum, "open": safe_open, "Exception": Exception, "chdir": safe_chdir, "getcwd": safe_getcwd, "__import__": __import__, }} def enforce_chroot(): # Enforce chroot os.system("chroot {temp_dir}") os.chdir('/') enforce_chroot() sandbox_globals = {{}} py_code = '''{py_code}''' try: exec(py_code, {{"__builtins__": safe_builtins}}, sandbox_globals) result = sandbox_globals.get("{variable_name}", "Variable '{variable_name}' not found") print(json.dumps({{"success": result}})) except MemoryError: print(json.dumps({{"error": "MemoryError: Exceeded memory limit"}})) except Exception as e: print(json.dumps({{"error": "".join(traceback.format_exception(type(e), e, e.__traceback__))}})) """ ] # Create the chroot environment for isolation process = subprocess.run( command, cwd=temp_dir, capture_output=True, text=True, timeout=2, ) output = process.stdout.strip() if output: return json.loads(output) else: return {"error": f"No output. stderr: {process.stderr.strip()}"} except subprocess.TimeoutExpired: return {"error": "Execution time limit exceeded"} except json.JSONDecodeError: return {"error": "Invalid JSON output from subprocess"} except Exception: return {"error": traceback.format_exc()}