diff --git a/.gitignore b/.gitignore index 6461d53..c8151b4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ **/.pytest_cache/ **/__pycache__/ */.pytest_cache/ -*/__pycache__/ \ No newline at end of file +*/__pycache__/ +*.log \ No newline at end of file diff --git a/blueprints/__init__.py b/blueprints/__init__.py index 0ee473a..37ddbb0 100644 --- a/blueprints/__init__.py +++ b/blueprints/__init__.py @@ -1,37 +1,81 @@ from flask import Blueprint, jsonify, request from ..utils.CheckKeys import get_verified_msg +from ..utils.Container import Container from ..services.container_service import ( create_container, remove_container, add_collaborator, remove_collaborator, - update_role + update_role, + start_container, + stop_container, + restart_container ) +import threading +from .. import extensions +from ..constant import ROLE +import docker # debug -success = 1 +# success = 1 api_bp = Blueprint("api", __name__, url_prefix="/api") +# 为的是将contaienr_status检查的特殊情况局限在创作过程 +creation_status = {} +# 通用的操作状态追踪(start/stop/restart) +action_status = {} + +def _set_action_status(container_name: str, action: str, status: str, error_reason: str | None = None): + action_status[container_name] = {"action": action, "status": status, "error_reason": error_reason} + +def _get_action_status(container_name: str): + return action_status.get(container_name) + +def _clear_action_status(container_name: str): + action_status.pop(container_name, None) + + +# Helper wrappers to mirror create behavior and keep special-case handling centralized +def mark_creation_status(container_name: str, status: str, error_reason: str | None = None): + creation_status[container_name] = {"status": status, "error_reason": error_reason} + +def clear_creation_status(container_name: str): + creation_status.pop(container_name, None) + +def mark_start_status(container_name: str, status: str, error_reason: str | None = None): + _set_action_status(container_name, 'start', status, error_reason) + +def mark_stop_status(container_name: str, status: str, error_reason: str | None = None): + _set_action_status(container_name, 'stop', status, error_reason) + +def mark_restart_status(container_name: str, status: str, error_reason: str | None = None): + _set_action_status(container_name, 'restart', status, error_reason) + + ''' 通信数据格式: 发送格式: { "message":{ + "owner_name":"xxxx", "config": { - "gpu_list":[0,1,2,...], + "gpu_list":[0,1,2,...], #字段为空就是CPU机器 "cpu_number":20, "memory":16,#GB - "user_name":'example', + "swap_memory":32,#GB + "name":'example', "port":0, "image":"ubuntu24.04" } + "public_key":"xxxx" # 可选,提供用户公钥以便容器内配置免密登录 }, "signature":"xxxxxx" } 返回格式: { + success: [0|1], "container_id": container_id, "container_name": container_name } @@ -41,29 +85,201 @@ def Create_container(): print("Create_container Called") recived_data = request.get_json(silent=True) if not recived_data: - return jsonify({"error":"invalid json"}), 400 - + return jsonify({"error":"invalid json", "error_reason": "invalid_json"}), 400 + + # 使用 get_verified_msg 函数解密并验证 verified_msg = get_verified_msg(recived_data) - + if not verified_msg: - return jsonify({"error": "invalid_signature or decryption failed"}), 401 - + return jsonify({"error": "invalid_signature or decryption failed", "error_reason": "invalid_signature"}), 401 + # 提取消息配置 + owner_name = verified_msg.get("owner_name") config = verified_msg.get("config") - - # container_id, container_name = create_container(**config) - container_id = "test_id" - container_name = "test_name" + public_key = verified_msg.get("public_key", None) + + try: + cfg = Container.Config_info(**config) + except Exception as e: + return jsonify({"error": f"invalid config: {e}", "error_reason": "invalid_config"}), 400 + # ensure docker client so we can pre-check container name collisions + if extensions.docker_client is None: + try: + extensions.init_docker() + except Exception as e: + return jsonify({"error": f"docker init failed: {e}", "error_reason": "docker_init_failed"}), 500 + + # 额外预检:检查是否已存在同名容器,避免创建后才发现冲突 + try: + existing = None + try: + existing = extensions.docker_client.containers.get(cfg.name) + except docker.errors.NotFound: + existing = None + if existing is not None: + return jsonify({"success": 0, "error": f"container {cfg.name} already exists", "error_reason": "container_exists", "container_name": cfg.name}), 409 + except Exception as e: + # if we can't contact docker, return an error + return jsonify({"success": 0, "error": f"docker check failed: {e}", "error_reason": "docker_check_failed"}), 500 + # spawn background thread to perform actual creation and return early + def _bg_create(o_name, cfg_obj): + try: + # mark as creating + creation_status[cfg_obj.name] = {"status": "creating"} + create_container(o_name, cfg_obj, public_key=public_key) + # creation succeeded -> remove tracking entry + creation_status.pop(cfg_obj.name, None) # 使得创建后的容器状态查询可以直接从docker获取最新状态,而不是被卡在"creating"里 + except Exception as e: + # record failure so /container_status can surface it to the controller + print("create_container error:", e) + creation_status[cfg_obj.name] = {"status": "failed", "error_reason": str(e)} + + + try: + t = threading.Thread(target=_bg_create, args=(owner_name, cfg)) + t.daemon = True + t.start() + except Exception as e: + print(e) + return jsonify({"success": 0, "error": str(e), "error_reason": "background_thread_failed"}), 500 + print("SUCCESS") - + return jsonify({ - "container_id": container_id, - "container_name": container_name, - "decrypted_message": verified_msg + "success": 1, + "container_status": "creating", + "container_name": cfg.name }), 200 +# 由于部分内容需要api这个地方直接调用。并未将这个方法单独放到services里 +@api_bp.post("/container_status") +def Container_status(): + ''' + 通信数据格式: + 发送格式: + { + "message":{ + "config": + { + "container_name":"xxxx" + } + }, + "signature":"xxxxxx" + } + ''' + recived_data = request.get_json(silent=True) + if not recived_data: + return jsonify({"success": 0, "error":"invalid json"}), 400 + + verified_msg = get_verified_msg(recived_data) + if not verified_msg: + return jsonify({"success": 0, "error": "invalid_signature or decryption failed", "error_reason": "invalid_signature"}), 401 + config = verified_msg.get("config") or {} + container_name = config.get("container_name") or config.get("name") + if not container_name: + return jsonify({"success": 0, "error": "missing container_name", "error_reason": "missing_container_name"}), 400 + + # ensure docker client + if extensions.docker_client is None: + try: + extensions.init_docker() + except Exception as e: + return jsonify({"success": 0, "error": f"docker init failed: {e}"}), 500 + + try: + # docker SDK allows get by name + # if there is an async failure/ongoing action recorded for this container name, return that first + status_info = creation_status.get(container_name) + if status_info is not None: + if status_info.get("status") == "failed": + return jsonify({"success": 0, "container_status": "failed", "error": "creation failed", "error_reason": status_info.get("error_reason")}), 200 + elif status_info.get("status") == "creating": + return jsonify({"success": 1, "container_status": "creating", "container_name": container_name}), 200 + + # check start/stop/restart async actions + ainfo = _get_action_status(container_name) + if ainfo is not None: + st = ainfo.get('status') + act = ainfo.get('action') + if st == 'failed': + return jsonify({"success": 0, "container_status": "failed", "error": f"{act} failed", "error_reason": ainfo.get('error_reason')}), 200 + else: + # return the in-progress or terminal status reported by the action tracker + return jsonify({"success": 1, "container_status": st, "container_name": container_name}), 200 + + container = extensions.docker_client.containers.get(container_name) + state = None + try: + state = container.attrs.get('State', {}).get('Status') + except Exception: + state = getattr(container, 'status', None) + + if state is None: + status_out = "unknown" + elif state.lower() == 'running': + # additional readiness checks: ensure sshd is listening and authorized_keys exists + def _exec_check(cmd: str) -> bool: + try: + r = container.exec_run(["/bin/sh", "-c", cmd], user="root") + return getattr(r, 'exit_code', r[0]) == 0 + except Exception: + return False + + # try multiple ways to detect ssh listening (ss/netstat/ps/pgrep) + ssh_listening = False + for c in ["ss -ltn | grep :22", "netstat -ltn | grep :22", "pgrep -f sshd", "ps aux | grep [s]shd"]: + if _exec_check(c): + ssh_listening = True + break + + # check authorized_keys exists and is non-empty + #auth_ok = _exec_check("test -s /root/.ssh/authorized_keys") + + if ssh_listening: + status_out = "online" + else: + # container running but service not yet ready + status_out = "starting" + elif state.lower() in ('created', 'restarting', 'starting'): + status_out = "starting" + elif state.lower() in ('exited', 'dead'): + status_out = "offline" + else: + status_out = str(state).lower() + + print(f"Container '{container_name}' status: {status_out}") + + return jsonify({"success": 1, "container_status": status_out, "container_name": container_name}), 200 + except docker.errors.NotFound: + return jsonify({"success": 0, "error": "container not found", "error_reason": "not_found", "container_name": container_name}), 404 + except Exception as e: + return jsonify({"success": 0, "error": str(e), "error_reason": "internal_error"}), 500 + + +# Minimal machine status endpoint for controller health checks +@api_bp.post("/machine_status") +def Machine_status(): + recived_data = request.get_json(silent=True) + if not recived_data: + return jsonify({"success": 0, "error": "invalid json"}), 400 + + verified_msg = get_verified_msg(recived_data) + if not verified_msg: + return jsonify({"success": 0, "error": "invalid_signature or decryption failed", "error_reason": "invalid_signature"}), 401 + + # Best-effort: ensure docker client initialized; if it fails, still respond but mark non-ideal + try: + if extensions.docker_client is None: + extensions.init_docker() + except Exception as e: + # return success but indicate docker init failed + return jsonify({"success": 0, "error": f"docker init failed: {e}", "error_reason": "docker_init_failed"}), 500 + + # If everything looks OK, report online. Keep response minimal to be fast. + return jsonify({"success": 1, "machine_status": "online"}), 200 + ''' 通信数据格式: 发送格式: @@ -71,7 +287,7 @@ def Create_container(): "message":{ "config": { - "container_id":"xxxx" + "container_name":"xxxx" } }, "signature":"xxxxxx" @@ -86,25 +302,50 @@ def Create_container(): def Remove_container(): recived_data = request.get_json(silent=True) if not recived_data: - return jsonify({"error":"invalid json"}), 400 + return jsonify({"error":"invalid json", "error_reason": "invalid_json"}), 400 # 使用 get_verified_msg 函数解密并验证 verified_msg = get_verified_msg(recived_data) if not verified_msg: - return jsonify({"error": "invalid_signature or decryption failed"}), 401 - - # 提取消息类型和配置 - config = verified_msg.get("config") - - - - # success = remove_container(**config) - - return jsonify({ - "success": success, - "decrypted_message": verified_msg - }), 200 + return jsonify({"error": "invalid_signature or decryption failed", "error_reason": "invalid_signature"}), 401 + + # 提取消息类型和配置(防御性处理:可能没有 config) + config = verified_msg.get("config") or {} + container_name = config.get("container_name") or config.get("name") + if not container_name: + return jsonify({"error": "missing container_name", "error_reason": "missing_container_name"}), 400 + + try: + # 防止失败后删不掉 + status_info = creation_status.get(container_name) + if status_info is not None and status_info.get("status") == "failed": + # clear the recorded failed state and return success + creation_status.pop(container_name, None) + return jsonify({ + "success": 1 + }), 200 + + success = remove_container(container_name) + except Exception as e: + print(e) + return jsonify({"success": 0, "error": str(e)}), 500 + if success == 0: + return jsonify({ + "success": 1 + }), 200 + elif success == 1: + return jsonify({ + "success": 0, + "error": "container not found", + "error_reason": "not_found" + }), 404 + else: + return jsonify({ + "success": 0, + "error": "failed to remove container", + "error_reason": "remove_failed" + }), 500 ''' 通信数据格式: @@ -113,7 +354,7 @@ def Remove_container(): "message":{ "config": { - "container_id":"xxxx", + "container_name":"xxxx", "user_name":"xxxx", "role":['admin'|'collaborator'] } @@ -129,26 +370,162 @@ def Remove_container(): def Add_collaborator(): recived_data = request.get_json(silent=True) if not recived_data: - return jsonify({"error":"invalid json"}), 400 + return jsonify({"error":"invalid json", "error_reason": "invalid_json"}), 400 # 使用 get_verified_msg 函数解密并验证 verified_msg = get_verified_msg(recived_data) if not verified_msg: - return jsonify({"error": "invalid_signature or decryption failed"}), 401 + return jsonify({"error": "invalid_signature or decryption failed", "error_reason": "invalid_signature"}), 401 # 提取消息类型和配置 config = verified_msg.get("config") - - - - # success = add_collaborator(**config) + if not config: + return jsonify({"success": 0, "error": "missing config", "error_reason": "missing_config"}), 400 + container_name = config.get("container_name") + if not container_name: + return jsonify({"success": 0, "error": "missing container_name", "error_reason": "missing_container_name"}), 400 + user_name = config.get("user_name") + if not user_name: + return jsonify({"success": 0, "error": "missing user_name", "error_reason": "missing_user_name"}), 400 + role_str = config.get("role").lower() + if role_str not in ('admin', 'collaborator'): + return jsonify({"success": 0, "error": "invalid role, must be 'admin' or 'collaborator'", "error_reason": "invalid_role"}), 400 + + # map string role to ROLE enum + if role_str == 'admin': + role_val = ROLE.ADMIN + else: + role_val = ROLE.COLLABORATOR + + try: + success = add_collaborator(container_name, user_name, role_val) + except Exception as e: + print(e) + return jsonify({"success": 0, "error": str(e), "error_reason": "internal_error"}), 500 return jsonify({ "success": success, "decrypted_message": verified_msg }), 200 +''' +通信数据格式: +发送格式: +{ + "message":{ + "config": + { + "container_name":"xxxx" + } + }, + signature":"xxxxxx" +} +返回格式: +{ + "success": [0|1], +} +''' +@api_bp.post("/start_container") +def Start_container_api(): + recived_data = request.get_json(silent=True) + if not recived_data: + return jsonify({"error":"invalid json", "error_reason": "invalid_json"}), 400 + + verified_msg = get_verified_msg(recived_data) + if not verified_msg: + return jsonify({"error": "invalid_signature or decryption failed", "error_reason": "invalid_signature"}), 401 + + + config = verified_msg.get("config") or {} + container_name = config.get("container_name") + if not container_name: + return jsonify({"error": "missing container_name", "error_reason": "missing_container_name"}), 400 + + # 早返回 表征请求已接受,实际的启动操作在后台线程执行,避免阻塞API响应 + def _bg_start(name: str): + try: + mark_start_status(name, 'starting') + ok = start_container(name) + if ok: + mark_start_status(name, 'online') + # clear tracking after short grace period so subsequent /container_status queries read from docker + try: + threading.Timer(5.0, lambda: _clear_action_status(name)).start() + except Exception: + pass + else: + mark_start_status(name, 'failed', 'start_failed') + except Exception as e: + print('bg start error:', e) + mark_start_status(name, 'failed', str(e)) + + try: + t = threading.Thread(target=_bg_start, args=(container_name,)) + t.daemon = True + t.start() + except Exception as e: + print(e) + return jsonify({"success": 0, "error": str(e), "error_reason": "background_thread_failed"}), 500 + + return jsonify({"success": 1, "container_status": "starting", "container_name": container_name}), 200 +''' +通信数据格式: +发送格式: +{ + "message":{ + "config": + { + "container_name":"xxxx", + # 虽然设计了timeout参数,但在此不将其控制器下放给用户 + } + }, + "signature":"xxxxxx" +} +''' +@api_bp.post("/stop_container") +def Stop_container_api(): + recived_data = request.get_json(silent=True) + if not recived_data: + return jsonify({"error":"invalid json", "error_reason": "invalid_json"}), 400 + + + verified_msg = get_verified_msg(recived_data) + if not verified_msg: + return jsonify({"error": "invalid_signature or decryption failed", "error_reason": "invalid_signature"}), 401 + + + config = verified_msg.get("config") or {} + container_name = config.get("container_name") + if not container_name: + return jsonify({"error": "missing container_name", "error_reason": "missing_container_name"}), 400 + + # spawn background worker to stop container and return early + def _bg_stop(name: str): + try: + mark_stop_status(name, 'stoping') + ok = stop_container(name) + if ok: + mark_stop_status(name, 'offline') + try: + threading.Timer(5.0, lambda: _clear_action_status(name)).start() + except Exception: + pass + else: + mark_stop_status(name, 'failed', 'stop_failed') + except Exception as e: + print('bg stop error:', e) + mark_stop_status(name, 'failed', str(e)) + + try: + t = threading.Thread(target=_bg_stop, args=(container_name,)) + t.daemon = True + t.start() + except Exception as e: + print(e) + return jsonify({"success": 0, "error": str(e), "error_reason": "background_thread_failed"}), 500 + + return jsonify({"success": 1, "container_status": "stoping", "container_name": container_name}), 200 ''' 通信数据格式: @@ -157,7 +534,71 @@ def Add_collaborator(): "message":{ "config": { - "container_id":"xxxx", + "container_name":"xxxx", + # 虽然设计了timeout参数,但在此不将其控制器下放给用户 + } + }, + "signature":"xxxxxx" +} +返回格式: +{ + "success": [0|1], +} +''' +@api_bp.post("/restart_container") +def Restart_container_api(): + recived_data = request.get_json(silent=True) + if not recived_data: + return jsonify({"error":"invalid json", "error_reason": "invalid_json"}), 400 + + + verified_msg = get_verified_msg(recived_data) + if not verified_msg: + return jsonify({"error": "invalid_signature or decryption failed", "error_reason": "invalid_signature"}), 401 + + + config = verified_msg.get("config") or {} + container_name = config.get("container_name") + if not container_name: + return jsonify({"error": "missing container_name", "error_reason": "missing_container_name"}), 400 + + # spawn background worker to restart container and return early + def _bg_restart(name: str): + try: + # On restart we initially treat it as stopping + mark_restart_status(name, 'stoping') + ok = restart_container(name) + if ok: + mark_restart_status(name, 'online') + try: + threading.Timer(5.0, lambda: _clear_action_status(name)).start() + except Exception: + pass + else: + mark_restart_status(name, 'failed', 'restart_failed') + except Exception as e: + print('bg restart error:', e) + mark_restart_status(name, 'failed', str(e)) + + try: + t = threading.Thread(target=_bg_restart, args=(container_name,)) + t.daemon = True + t.start() + except Exception as e: + print(e) + return jsonify({"success": 0, "error": str(e), "error_reason": "background_thread_failed"}), 500 + + return jsonify({"success": 1, "container_status": "stoping", "container_name": container_name}), 200 + + +''' +通信数据格式: +发送格式: +{ + "message":{ + "config": + { + "container_name":"xxxx", "user_name":"xxxx", } }, @@ -172,22 +613,37 @@ def Add_collaborator(): def Remove_collaborator(): recived_data = request.get_json(silent=True) if not recived_data: - return jsonify({"error":"invalid json"}), 400 + return jsonify({"error":"invalid json", "error_reason": "invalid_json"}), 400 # 使用 get_verified_msg 函数解密并验证 verified_msg = get_verified_msg(recived_data) - + if not verified_msg: - return jsonify({"error": "invalid_signature or decryption failed"}), 401 + return jsonify({"error": "invalid_signature or decryption failed", "error_reason": "invalid_signature"}), 401 # 提取消息类型和配置 - config = verified_msg.get("config") - - - # success = remove_collaborator(**config) + try: + config = verified_msg.get("config") + if not config: + return jsonify({"success": 0, "error": "missing config", "error_reason": "missing_config"}), 400 + + container_name = config.get("container_name") + except Exception: + return jsonify({"success": 0, "error": "invalid config format", "error_reason": "invalid_config_format"}), 400 + if not container_name: + return jsonify({"success": 0, "error": "missing container_name", "error_reason": "missing_container_name"}), 400 + user_name = config.get("user_name") + if not user_name: + return jsonify({"success": 0, "error": "missing user_name", "error_reason": "missing_user_name"}), 400 + + try: + success = remove_collaborator(container_name, user_name) + except Exception as e: + print(e) + return jsonify({"success": 0, "error": str(e), "error_reason": "internal_error"}), 500 return jsonify({ - "success": success, + "success": 1, "decrypted_message": verified_msg }), 200 @@ -199,7 +655,7 @@ def Remove_collaborator(): "message":{ "config": { - "container_id":"xxxx", + "container_name":"xxxx", "user_name":"xxxx", "updated_role":"xxxx" } @@ -225,9 +681,31 @@ def Update_role(): # 提取消息类型和配置 config = verified_msg.get("config") - - - # success = update_role(**config) + if not config: + return jsonify({"success": 0, "error": "missing config", "error_reason": "missing_config"}), 400 + container_name = config.get("container_name") + if not container_name: + return jsonify({"success": 0, "error": "missing container_name", "error_reason": "missing_container_name"}), 400 + user_name = config.get("user_name") + if not user_name: + return jsonify({"success": 0, "error": "missing user_name", "error_reason": "missing_user_name"}), 400 + updated_role_str = config.get("updated_role").lower() + if updated_role_str not in ('admin', 'collaborator', 'root'): + return jsonify({"success": 0, "error": "invalid updated_role, must be 'admin', 'collaborator' or 'root'", "error_reason": "invalid_updated_role"}), 400 + + # map string role to ROLE enum + if updated_role_str == 'admin': + updated_role_val = ROLE.ADMIN + elif updated_role_str == 'collaborator': + updated_role_val = ROLE.COLLABORATOR + else: + updated_role_val = ROLE.ROOT + + try: + success = update_role(container_name, user_name, updated_role_val) + except Exception as e: + print(e) + return jsonify({"error": str(e), "error_reason": "internal_error"}), 500 return jsonify({ "success": success, diff --git a/constant.py b/constant.py index 702d374..e80ef3a 100644 --- a/constant.py +++ b/constant.py @@ -14,9 +14,13 @@ class MachineTypes(Enum): class ContainerStatus(Enum): ONLINE = "online" OFFLINE = "offline" - MAINTENANCE = "maintenance" + CREATING = "creating" + STARTING = "starting" + STOPPING = "stopping" + FAILED = "failed" class ROLE(Enum): ADMIN="admin" - COLLABORATOR="collaborator" \ No newline at end of file + COLLABORATOR="collaborator" + ROOT = "root" \ No newline at end of file diff --git a/create_container_params.md b/create_container_params.md new file mode 100644 index 0000000..f589323 --- /dev/null +++ b/create_container_params.md @@ -0,0 +1,89 @@ +{ + "create_container": + { + "required_parameters": + { + "config": "Container.Config_info", + "cpu_number": "int", + "memory": "number (GB)", + "gpu_list": "list[int] | None", + "image": "str", + "port": "int", + "user_name": "str" + } + "returned_parameters": + { + "container_id": "str", + "container_name": "str" + } + } +} + +########### + +{ + "remove_container": + { + "required_parameters": + { + "container_id": "str" + }, + "returned_parameters": + { + "result_code": "int (RemoveContinaerReturn.SUCCESS | RemoveContinaerReturn.NOTFOUND | RemoveContinaerReturn.FAILED)" + } + } +} + +########### + +{ + "add_collaborator": + { + "required_parameters": + { + "container_id": "int", + "user_name": "str", + "role": "ROLE" + }, + "returned_parameters": + { + "success": "bool" + } + } +} + +########### + +{ + "remove_collaborator": + { + "required_parameters": + { + "container_id": "str", + "user_name": "str" + }, + "returned_parameters": + { + "success": "bool" + } + } +} + +########### + +{ + "update_role": + { + "required_parameters": + { + "container_id": "str", + "user_name": "str", + "updated_role": "str" + }, + "returned_parameters": + { + "success": "bool" + } + } +} diff --git a/docker_operates/container_service.py b/docker_operates/container_service.py index eac6151..630402f 100644 --- a/docker_operates/container_service.py +++ b/docker_operates/container_service.py @@ -1,137 +1,137 @@ -#TODO:完成实现 +# # DUPLICATE CODE SCHEDULED FOR DELETION, IGNORE THIS FILE -from ..constant import * -from typing import TypedDict -from config import KeyConfig -from ..utils.CheckKeys import load_keys -from ..utils.Container import Container -import requests -from cryptography.hazmat.primitives.asymmetric import padding -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey -from ..extensions import docker_client -import docker -from typing import NamedTuple +# from ..constant import * +# from typing import TypedDict +# from config import KeyConfig +# from ..utils.CheckKeys import load_keys +# from ..utils.Container import Container +# import requests +# from cryptography.hazmat.primitives.asymmetric import padding +# from cryptography.hazmat.primitives import hashes +# from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey +# from ..extensions import docker_client +# import docker +# from typing import NamedTuple -#Return API Definition -#################################################### -class CreateContainerReturn(NamedTuple): - container_id:str - container_name:str +# #Return API Definition +# #################################################### +# class CreateContainerReturn(NamedTuple): +# container_id:str +# container_name:str -class RemoveContinaerReturn: - SUCCESS=0 - NOTFOUND=1 - FAILED=2 -#################################################### +# class RemoveContinaerReturn: +# SUCCESS=0 +# NOTFOUND=1 +# FAILED=2 +# #################################################### -#Function Implementation -#################################################### +# #Function Implementation +# #################################################### -# 将user_name作为admin,创建port新容器 -def create_container(config:Container.Config_info)->CreateContainerReturn: - cpu_quota = config.cpu_number * 100000 - mem_limit = f"{config.memory}g" - device_requests = None - if config.gpu_list: - device_requests = [ - docker.types.DeviceRequest( - count=len(config.gpu_list), - device_ids=[str(x) for x in config.gpu_list], - capabilities=[["gpu"]] - ) - ] +# # 将user_name作为admin,创建port新容器 +# def create_container(config:Container.Config_info)->CreateContainerReturn: +# cpu_quota = config.cpu_number * 100000 +# mem_limit = f"{config.memory}g" +# device_requests = None +# if config.gpu_list: +# device_requests = [ +# docker.types.DeviceRequest( +# count=len(config.gpu_list), +# device_ids=[str(x) for x in config.gpu_list], +# capabilities=[["gpu"]] +# ) +# ] - container = docker_client.containers.run( - config.image, - "tail -f /dev/null", # 保证容器一直运行 - detach=True, - tty=True, - ports={"22/tcp": config.port}, # ssh端口映射 - mem_limit=mem_limit, - cpu_quota=cpu_quota, - device_requests=device_requests - ) - name = f"{config.user_name}_{container.short_id}" - container.rename(name) - container.exec_run("apt-get update && apt-get install -y openssh-server", user="root") - container.exec_run("service ssh start", user="root") - # 设置 root 密码为 root123 - container.exec_run("echo 'root:root123' | chpasswd", user="root") - # 修改 sshd_config,允许 root 密码登录 - container.exec_run("sed -i 's/^#*PermitRootLogin.*/PermitRootLogin yes/' /etc/ssh/sshd_config", user="root") - container.exec_run("sed -i 's/^#*PasswordAuthentication.*/PasswordAuthentication yes/' /etc/ssh/sshd_config", user="root") - - # 重启 ssh 服务 - container.exec_run("service ssh restart", user="root") - return CreateContainerReturn(container.id,container.name) - -#删除容器并删除其所有者记录 -def remove_container(container_id: str) -> int: - try: - container = docker_client.containers.get(container_id) - container.remove(force=True) # force=True 避免容器在运行时报错 - return RemoveContinaerReturn.SUCCESS - except docker.errors.NotFound: - print(f"Container {container_id} not found.") - return RemoveContinaerReturn.NOTFOUND - except Exception as e: - print(f"Failed to remove container {container_id}: {e}") - return RemoveContinaerReturn.FAILED - -#将container_id对应的容器新增user_id作为collaborator,其权限为role -def add_collaborator(container_id:int,user_name:str,role:ROLE)->bool: - try: - container=docker_client.containers.get(container_id) - cmd = f"useradd -m -s /bin/bash {user_name} && echo '{user_name}:{user_name}' | chpasswd" - if role == ROLE.ADMIN: - cmd += f" && usermod -aG sudo {user_name}" - result = container.exec_run(cmd, user="root") - return result.exit_code == 0 - except Exception as e: - print(f"failed to add collaborator:{e}") - return False - - -#从container_id中移除user_id对应的用户访问权 -def remove_collaborator(container_id: str, user_name: str) -> bool: - try: - container = docker_client.containers.get(container_id) - - # 删除用户,并且一并删除家目录 (-r) - cmd = f"userdel -r {user_name}" - - result = container.exec_run(cmd, user="root") - return result.exit_code == 0 - - except Exception as e: - print(f"Failed to remove collaborator: {e}") - return False - -def update_role(container_id: str, user_name: str, updated_role: str) -> bool: - try: - container = docker_client.containers.get(container_id) - - if updated_role == ROLE.ADMIN: - cmd = f"usermod -aG sudo {user_name}" - elif updated_role == ROLE.COLLABORATOR: - cmd = f"deluser {user_name} sudo" - else: - raise ValueError(f"Unknown role: {updated_role}") - - result = container.exec_run(cmd, user="root") - return result.exit_code == 0 - - except Exception as e: - print(f"Failed to update role: {e}") - return False - - -#################################################### +# container = docker_client.containers.run( +# config.image, +# "tail -f /dev/null", # 保证容器一直运行 +# detach=True, +# tty=True, +# ports={"22/tcp": config.port}, # ssh端口映射 +# mem_limit=mem_limit, +# cpu_quota=cpu_quota, +# device_requests=device_requests +# ) +# name = f"{config.user_name}_{container.short_id}" +# container.rename(name) +# container.exec_run("apt-get update && apt-get install -y openssh-server", user="root") +# container.exec_run("service ssh start", user="root") +# # 设置 root 密码为 root123 +# container.exec_run("echo 'root:root123' | chpasswd", user="root") +# # 修改 sshd_config,允许 root 密码登录 +# container.exec_run("sed -i 's/^#*PermitRootLogin.*/PermitRootLogin yes/' /etc/ssh/sshd_config", user="root") +# container.exec_run("sed -i 's/^#*PasswordAuthentication.*/PasswordAuthentication yes/' /etc/ssh/sshd_config", user="root") + +# # 重启 ssh 服务 +# container.exec_run("service ssh restart", user="root") +# return CreateContainerReturn(container.id,container.name) + +# #删除容器并删除其所有者记录 +# def remove_container(container_id: str) -> int: +# try: +# container = docker_client.containers.get(container_id) +# container.remove(force=True) # force=True 避免容器在运行时报错 +# return RemoveContinaerReturn.SUCCESS +# except docker.errors.NotFound: +# print(f"Container {container_id} not found.") +# return RemoveContinaerReturn.NOTFOUND +# except Exception as e: +# print(f"Failed to remove container {container_id}: {e}") +# return RemoveContinaerReturn.FAILED + +# #将container_id对应的容器新增user_id作为collaborator,其权限为role +# def add_collaborator(container_id:int,user_name:str,role:ROLE)->bool: +# try: +# container=docker_client.containers.get(container_id) +# cmd = f"useradd -m -s /bin/bash {user_name} && echo '{user_name}:{user_name}' | chpasswd" +# if role == ROLE.ADMIN: +# cmd += f" && usermod -aG sudo {user_name}" +# result = container.exec_run(cmd, user="root") +# return result.exit_code == 0 +# except Exception as e: +# print(f"failed to add collaborator:{e}") +# return False + + +# #从container_id中移除user_id对应的用户访问权 +# def remove_collaborator(container_id: str, user_name: str) -> bool: +# try: +# container = docker_client.containers.get(container_id) + +# # 删除用户,并且一并删除家目录 (-r) +# cmd = f"userdel -r {user_name}" + +# result = container.exec_run(cmd, user="root") +# return result.exit_code == 0 + +# except Exception as e: +# print(f"Failed to remove collaborator: {e}") +# return False + +# def update_role(container_id: str, user_name: str, updated_role: str) -> bool: +# try: +# container = docker_client.containers.get(container_id) + +# if updated_role == ROLE.ADMIN: +# cmd = f"usermod -aG sudo {user_name}" +# elif updated_role == ROLE.COLLABORATOR: +# cmd = f"deluser {user_name} sudo" +# else: +# raise ValueError(f"Unknown role: {updated_role}") + +# result = container.exec_run(cmd, user="root") +# return result.exit_code == 0 + +# except Exception as e: +# print(f"Failed to update role: {e}") +# return False + + +# #################################################### diff --git a/requirements.txt b/requirements.txt index 809039d..9545da6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,7 @@ Flask-Migrate>=4.0.0 Flask-Caching>=2.0.0 Flask-Login>=0.6.3 marshmallow>=3.19.0 -docker \ No newline at end of file +docker +cryptography +pydantic +pymysql \ No newline at end of file diff --git a/run.py b/run.py index c02e4be..5b57872 100644 --- a/run.py +++ b/run.py @@ -8,5 +8,5 @@ if __name__ == '__main__': app = create_app('development') - app.run(host = '0.0.0.0', port=5001, debug=True) + app.run(host = '0.0.0.0', port=5789, debug=True) diff --git a/services/container_service.py b/services/container_service.py index c67a480..1a8867e 100644 --- a/services/container_service.py +++ b/services/container_service.py @@ -1,10 +1,10 @@ -#TODO:完成实现 +# 与他们相关的参数有必要被严格验证和过滤,或者改用更安全的方式(如直接传递参数列表而不是 shell 命令字符串) -from FuxiYu_NodeKernel.constant import * -from FuxiYu_NodeKernel.config import KeyConfig -from FuxiYu_NodeKernel.utils.Container import Container -from FuxiYu_NodeKernel import extensions -from FuxiYu_NodeKernel.utils.CheckKeys import load_keys +from ..constant import * +from ..config import KeyConfig +from ..utils.Container import Container +from .. import extensions +from ..utils.CheckKeys import load_keys # from ..constant import * from typing import TypedDict # from ..config import KeyConfig @@ -14,9 +14,12 @@ from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey +import base64 # from ..extensions import docker_client import docker +import os from typing import NamedTuple +from ..utils import sanitizer as _sanitizer @@ -40,90 +43,188 @@ class RemoveContinaerReturn: #Function Implementation #################################################### -# 将user_name作为admin,创建port新容器 -def create_container(config:Container.Config_info)->CreateContainerReturn: +# 将owner_name作为root,创建port新容器 +def create_container(owner_name: str, config:Container.Config_info, public_key: str | None = None)->CreateContainerReturn: if extensions.docker_client is None: extensions.init_docker() - cpu_quota = config.cpu_number * 100000 + print(f"Creating container for owner={owner_name} with config={config} and public_key={public_key}") + # validate owner_name early because it's used as host path component + try: + _sanitizer.validate_username(owner_name) + except Exception as e: + raise RuntimeError(f"unsafe owner_name: {e}") + # 补CPU LIST 从 0 开始编号,如果 cpu_number=4 就是 [0,1,2,3] + cpu_count = int(getattr(config, 'cpu_number', 0) or 0) + cpu_list = list(range(cpu_count)) if cpu_count > 0 else [] + cpuset_cpus = ",".join(str(x) for x in cpu_list) if cpu_list else None mem_limit = f"{config.memory}g" + + # 构建 memswap_limit 参数,如果 swap_memory 大于0,则 memswap_limit = memory + swap_memory;如果 swap_memory 不大于0,则不设置 memswap_limit(默认为和 memory 一样,禁止使用 swap) + swap_amt = int(getattr(config, 'swap_memory', 0) or 0) + memswap_limit = f"{config.memory + swap_amt}g" if swap_amt and swap_amt >= 0 else None + + # GPU LIST为空则是CPU机器,不接受GPU请求。device_requests只用于GPU资源分配 + gpu_list = getattr(config, 'gpu_list', None) device_requests = None - if config.gpu_list: + if gpu_list is not None and isinstance(gpu_list, (list, tuple)) and len(gpu_list) > 0: + # When specific GPU ids are provided, do NOT set 'count' because + # Docker rejects DeviceRequest with both Count and DeviceIDs set. device_requests = [ docker.types.DeviceRequest( - count=len(config.gpu_list), - device_ids=[str(x) for x in config.gpu_list], + device_ids=[str(x) for x in gpu_list], capabilities=[["gpu"]] ) ] - + + print(f"DEBUG: cpu_list={cpu_list}, gpu_list={gpu_list}, mem_limit={mem_limit}, memswap_limit={memswap_limit}, device_requests={device_requests}") + name = f"{config.name}" # 名字自定义 + # prepare host directory to mount as container /root + host_root_mount = os.path.join("/home", owner_name, "containers", name) + try: + os.makedirs(host_root_mount, exist_ok=True) + except Exception as e: + raise RuntimeError(f"failed to ensure host mount path {host_root_mount}: {e}") + # avoid creating a random-name container: check if a container with the desired name already exists + try: + existing = extensions.docker_client.containers.get(name) + print(f"Container with name {name} already exists: id={existing.id} status={existing.status}") + raise RuntimeError(f"container {name} already exists on this host") + except docker.errors.NotFound: + # good, proceed to create with explicit name + pass + container = extensions.docker_client.containers.run( config.image, "tail -f /dev/null", # 保证容器一直运行 detach=True, tty=True, + name=name, ports={"22/tcp": config.port}, # ssh端口映射 mem_limit=mem_limit, - cpu_quota=cpu_quota, - device_requests=device_requests + memswap_limit=memswap_limit, + cpuset_cpus=cpuset_cpus, + device_requests=device_requests, + volumes={host_root_mount: {'bind': '/root', 'mode': 'rw'}} ) - name = f"{config.user_name}_{container.short_id}" - container.rename(name) + print(f"Container created with ID={container.id} and name={name}") container.reload() - # container.exec_run("apt-get update && apt-get install -y openssh-server", user="root") - # container.exec_run("service ssh start", user="root") - # # 设置 root 密码为 root123 - # container.exec_run("echo 'root:root123' | chpasswd", user="root") - # # 修改 sshd_config,允许 root 密码登录 - # container.exec_run("sed -i 's/^#*PermitRootLogin.*/PermitRootLogin yes/' /etc/ssh/sshd_config", user="root") - # container.exec_run("sed -i 's/^#*PasswordAuthentication.*/PasswordAuthentication yes/' /etc/ssh/sshd_config", user="root") - - # # 重启 ssh 服务 + print(f"Container status after creation: {container.status}") + # container.exec_run("service ssh restart", user="root") - def _run(container, cmd: str): - r = container.exec_run(["/bin/sh", "-c", cmd], user="root") - if r.exit_code != 0: - raise RuntimeError(f"cmd failed: {cmd}\nexit={r.exit_code}\noutput={r.output!r}") + def _run(container, cmd: str, timeout_sec: int = 120): + # 这里用一个 shell wrapper 来实现命令超时,避免某些命令(如 apt-get)在容器内卡死导致 exec_run 永远不返回的问题 + wrapped = ( + "( " + cmd + " ) & pid=$!; (sleep " + str(timeout_sec) + "; kill -9 $pid 2>/dev/null) & wait $pid" + ) + print(f"Running command in container {container.id}: {cmd} (wrapped timeout={timeout_sec}s)") + r = container.exec_run(["/bin/sh", "-c", wrapped], user="root") + out = None + try: + out = r.output.decode('utf-8', errors='ignore') + except Exception: + out = str(r.output) + # determine exit code in a backward-compatible way + if hasattr(r, 'exit_code'): + exit_code = r.exit_code + else: + try: + # r may be a tuple like (exit_code, output) + exit_code = int(r[0]) + except Exception: + exit_code = 0 + print(f"Executed command: {cmd}\nExit code: {exit_code}\nOutput: {out}") + if exit_code != 0: + raise RuntimeError(f"cmd failed: {cmd}\nexit={exit_code}\noutput={out}") return r - - _run(container, "apt-get update") - _run(container, "DEBIAN_FRONTEND=noninteractive apt-get install -y openssh-server") - _run(container, "mkdir -p /run/sshd") - _run(container, "ssh-keygen -A") - - _run(container, "echo 'root:root123' | chpasswd") + # 下面的命令执行可能会比较慢,所以设置了较长的超时时间(120秒), + # 以避免某些环境下 apt-get 卡死导致的问题。apt-get 有时会因为签名/证书 + # 问题失败(例如镜像环境或时间不同步),因此在失败时尝试一次回退策略, + # 但不要因为安装失败就删除已创建的容器——只记录并继续。 + try: + _run(container, "apt-get update") + _run(container, "DEBIAN_FRONTEND=noninteractive apt-get install -y openssh-server") + _run(container, "mkdir -p /run/sshd") + _run(container, "ssh-keygen -A") + except Exception as e: + print(f"apt-get update/install failed: {e}\nAttempting fallback sequence (clean + relaxed update + allow-unauthenticated install)") + try: + _run(container, "apt-get clean") + _run(container, "rm -rf /var/lib/apt/lists/*") + _run(container, "apt-get update -o Acquire::AllowInsecureRepositories=true -o Acquire::Check-Valid-Until=false") + _run(container, "DEBIAN_FRONTEND=noninteractive apt-get install -y --allow-unauthenticated openssh-server") + _run(container, "mkdir -p /run/sshd") + _run(container, "ssh-keygen -A") + except Exception as e2: + print(f"Fallback apt-get sequence also failed: {e2}. Continuing without openssh-server; container created but SSH may be unavailable.") + # 初始密码为 owner_name + "123",用户可以登录后再改密码(也可以直接提供公钥登录) + _run(container, f"echo 'root:{owner_name}123' | chpasswd") + try: + _sanitizer.validate_username(owner_name) + except Exception as e: + raise RuntimeError(f"unsafe owner_name: {e}") _run(container, "sed -i 's/^#*PermitRootLogin.*/PermitRootLogin yes/' /etc/ssh/sshd_config") _run(container, "sed -i 's/^#*PasswordAuthentication.*/PasswordAuthentication yes/' /etc/ssh/sshd_config") # 不用 service(容器里不一定有 init),直接启动 sshd(会后台守护) - _run(container, "/usr/sbin/sshd") + try: + _run(container, "/usr/sbin/sshd") + except Exception as e: + print(f"Failed to start sshd inside container: {e}. SSH may be unavailable.") + # 使得公钥可选 (如果提供了公钥则安装,否则只用密码登录) + if public_key: + try: + # Use base64 to avoid shell-quoting issues when writing the key + # basic safety check on provided public key text before encoding + _sanitizer.validate_shell_arg(public_key) + b64 = base64.b64encode(public_key.encode('utf-8')).decode('ascii') + cmd = ( + "mkdir -p /root/.ssh && chmod 700 /root/.ssh && " + f"echo '{b64}' | base64 -d > /root/.ssh/authorized_keys && " + "chmod 600 /root/.ssh/authorized_keys && chown -R root:root /root/.ssh" + ) + _run(container, cmd) + except Exception as e: + print(f"Failed to install public_key into container: {e}") return CreateContainerReturn(container.id,container.name) #删除容器并删除其所有者记录 -def remove_container(container_id: str) -> int: +def remove_container(container_name: str) -> int: try: - container = extensions.docker_client.containers.get(container_id) + if extensions.docker_client is None: + try: + extensions.init_docker() + except Exception as e: + print(f"Failed to init docker client: {e}") + raise RuntimeError(f"docker init failed: {e}") + + container = extensions.docker_client.containers.get(container_name) container.remove(force=True) # force=True 避免容器在运行时报错 return RemoveContinaerReturn.SUCCESS except docker.errors.NotFound: - print(f"Container {container_id} not found.") + print(f"Container {container_name} not found.") return RemoveContinaerReturn.NOTFOUND except Exception as e: - print(f"Failed to remove container {container_id}: {e}") + print(f"Failed to remove container {container_name}: {e}") return RemoveContinaerReturn.FAILED #将container_id对应的容器新增user_id作为collaborator,其权限为role -def add_collaborator(container_id:int,user_name:str,role:ROLE)->bool: +def add_collaborator(container_name: str, user_name: str, role: ROLE) -> bool: try: if extensions.docker_client is None: extensions.init_docker() - container=extensions.docker_client.containers.get(container_id) - cmd = f"useradd -m -s /bin/bash {user_name} && echo '{user_name}:{user_name}' | chpasswd" + container=extensions.docker_client.containers.get(container_name) + print(f"Adding collaborator {user_name} with role {role} to container {container_name}") + # validate inputs to reduce injection risk + _sanitizer.validate_username(container_name) + _sanitizer.validate_username(user_name) + cmd = f"useradd -m -s /bin/bash {user_name} && echo '{user_name}:{user_name}123' | chpasswd" if role == ROLE.ADMIN: - cmd += f" && usermod -aG sudo {user_name}" + cmd += f" && (usermod -aG sudo {user_name} || usermod -aG wheel {user_name})" result = container.exec_run(["/bin/sh", "-c", cmd], user="root") + print(f"Executed command to add collaborator: {cmd}\nExit code: {result.exit_code}\nOutput: {result.output.decode('utf-8', errors='ignore')}") return result.exit_code == 0 except Exception as e: print(f"failed to add collaborator:{e}") @@ -131,36 +232,137 @@ def add_collaborator(container_id:int,user_name:str,role:ROLE)->bool: #从container_id中移除user_id对应的用户访问权 -def remove_collaborator(container_id: str, user_name: str) -> bool: +def remove_collaborator(container_name: str, user_name: str) -> bool: try: - container = extensions.docker_client.containers.get(container_id) + if extensions.docker_client is None: + extensions.init_docker() + container = extensions.docker_client.containers.get(container_name) - # 删除用户,并且一并删除家目录 (-r) - cmd = f"userdel -r {user_name}" + # 删除用户,并且一并删除home目录 (-r) + _sanitizer.validate_username(container_name) + _sanitizer.validate_username(user_name) + cmd = f"userdel -r {user_name} || deluser {user_name}" - result = container.exec_run(cmd, user="root") + result = container.exec_run(["/bin/sh", "-c", cmd], user="root") + print(f"Executed command to remove collaborator: {cmd}\nExit code: {result.exit_code}\nOutput: {result.output.decode('utf-8', errors='ignore')}") return result.exit_code == 0 except Exception as e: print(f"Failed to remove collaborator: {e}") return False -def update_role(container_id: str, user_name: str, updated_role: str) -> bool: + +def update_role(container_name: str, user_name: str, updated_role: ROLE) -> bool: try: - container = extensions.docker_client.containers.get(container_id) + if extensions.docker_client is None: + extensions.init_docker() + container = extensions.docker_client.containers.get(container_name) if updated_role == ROLE.ADMIN: - cmd = f"usermod -aG sudo {user_name}" - elif updated_role == ROLE.COLLABORATOR: - cmd = f"deluser {user_name} sudo" + #先验证用户存在(如果不存在就创建),再添加到sudo组 + _sanitizer.validate_username(container_name) + _sanitizer.validate_username(user_name) + cmd = f"id -u {user_name} || useradd -m -s /bin/bash {user_name} && echo '{user_name}:{user_name}123' | chpasswd" + cmd += f" && (usermod -aG sudo {user_name} || usermod -aG wheel {user_name})" + elif updated_role == ROLE.COLLABORATOR: # 直接从sudo组里删除用户(如果存在的话),但不删除用户账号 + _sanitizer.validate_username(container_name) + _sanitizer.validate_username(user_name) + cmd = f"deluser {user_name} sudo || deluser {user_name} wheel" + elif updated_role == ROLE.ROOT: + # 直接让root的密码为user_name123 + _sanitizer.validate_username(container_name) + _sanitizer.validate_username(user_name) + cmd = f"echo 'root:{user_name}123' | chpasswd" + # 不论是collaborator还是admin都要把原来的权限去掉,避免出现权限叠加的情况(虽然现在设计上collaborator和admin是互斥的,但以防万一) + # 先删sudo/wheel + cmd += f" && deluser {user_name} sudo || deluser {user_name} wheel" + # 再删掉用户(如果存在的话),避免出现同名用户导致的权限问题 + cmd += f" && userdel -r {user_name} || deluser {user_name}" + else: raise ValueError(f"Unknown role: {updated_role}") - result = container.exec_run(cmd, user="root") + result = container.exec_run(["/bin/sh", "-c", cmd], user="root") + print(f"Executed command to update role: {cmd}\nExit code: {result.exit_code}\nOutput: {result.output.decode('utf-8', errors='ignore')}") return result.exit_code == 0 except Exception as e: print(f"Failed to update role: {e}") + raise e + + +def start_container(container_name: str) -> bool: + """Start a stopped container by name. Returns True on success, False otherwise.""" + try: + if extensions.docker_client is None: + extensions.init_docker() + _sanitizer.validate_username(container_name) + container = extensions.docker_client.containers.get(container_name) + # 已开启的容器再次调用 start() 会报错,所以先检查状态避免这个问题 + try: + container.reload() + except Exception: + pass + status = getattr(container, 'status', None) + if status == 'running' or status == 'online': + print(f"Container {container_name} already running (status={status}).") + return True + container.start() + container.reload() + print(f"Started container {container_name}, new status={getattr(container, 'status', None)}") + return True + except docker.errors.NotFound: + print(f"Container {container_name} not found when trying to start.") + return False + except Exception as e: + print(f"Failed to start container {container_name}: {e}") + return False + +# 这里虽然写了timeout参数,但是暂时直接让取默认的10 +def stop_container(container_name: str, timeout: int = 10) -> bool: + """Stop a running container by name. Returns True on success, False otherwise.""" + try: + if extensions.docker_client is None: + extensions.init_docker() + _sanitizer.validate_username(container_name) + container = extensions.docker_client.containers.get(container_name) + try: + container.reload() + except Exception: + pass + status = getattr(container, 'status', None) + if status != 'running' and status != 'online': + print(f"Container {container_name} is not running (status={status}); nothing to stop.") + return True + container.stop(timeout=timeout) + container.reload() + print(f"Stopped container {container_name}, new status={getattr(container, 'status', None)}") + return True + except docker.errors.NotFound: + print(f"Container {container_name} not found when trying to stop.") + return False + except Exception as e: + print(f"Failed to stop container {container_name}: {e}") + return False + +# 这里虽然写了timeout参数,但是暂时直接让取默认的10 +def restart_container(container_name: str, timeout: int = 10) -> bool: + """Restart a container by name. Returns True on success, False otherwise.""" + try: + if extensions.docker_client is None: + extensions.init_docker() + _sanitizer.validate_username(container_name) + container = extensions.docker_client.containers.get(container_name) + container.restart(timeout=timeout) + container.reload() + container.exec_run("service ssh restart", user="root") + print(f"Restarted container {container_name}, new status={getattr(container, 'status', None)}") + return True + except docker.errors.NotFound: + print(f"Container {container_name} not found when trying to restart.") + return False + except Exception as e: + print(f"Failed to restart container {container_name}: {e}") return False diff --git a/utils/CheckKeys.py b/utils/CheckKeys.py index 6e98815..478443b 100644 --- a/utils/CheckKeys.py +++ b/utils/CheckKeys.py @@ -2,10 +2,13 @@ from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey from cryptography.hazmat.primitives.asymmetric import rsa from ..config import KeyConfig +# cryptography imports for hybrid encryption from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.ciphers.aead import AESGCM import json import base64 +import os # 加载公钥和私钥,返回公钥和私钥对象 def load_keys(private_key_path:str,pub_key_path:str,pub_key_control_path)->tuple[RSAPrivateKey,RSAPublicKey,RSAPublicKey]: with open(private_key_path, "rb") as f: @@ -47,18 +50,28 @@ def write_keys(path:str,key): ) ) -#加密信息 +#加密信息 这里因为可能会有较大数据,所以采用混合加密,消息体用AES-GCM对称加密,AES密钥用RSA非对称加密 def encryption(message:str)->bytes: - _,_,PUBLIC_KEY_B=load_keys(KeyConfig.PRIVATE_KEY_PATH,KeyConfig.PUBLIC_KEY_PATH,KeyConfig.PUBLIC_KEY_PATH) + # Hybrid encryption: AES-GCM + RSA-OAEP for AES key + _,_,PUBLIC_KEY_B = load_keys(KeyConfig.PRIVATE_KEY_PATH, KeyConfig.PUBLIC_KEY_PATH, KeyConfig.PUBLIC_KEY_PATH) if isinstance(message, str): message = message.encode('utf-8') - ciphertext = PUBLIC_KEY_B.encrypt( - message, + aes_key = AESGCM.generate_key(bit_length=128) + aesgcm = AESGCM(aes_key) + nonce = os.urandom(12) + ciphertext = aesgcm.encrypt(nonce, message, None) + enc_key = PUBLIC_KEY_B.encrypt( + aes_key, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), - algorithm=hashes.SHA256(), - label=None) + algorithm=hashes.SHA256(), + label=None) ) - return ciphertext + payload = { + "enc_key": base64.b64encode(enc_key).decode('utf-8'), + "nonce": base64.b64encode(nonce).decode('utf-8'), + "ciphertext": base64.b64encode(ciphertext).decode('utf-8') + } + return json.dumps(payload).encode('utf-8') #签名信息 def signature(message:str)->bytes: @@ -74,13 +87,35 @@ def signature(message:str)->bytes: #解密信息 def decryption(ciphertext:bytes)->bytes: PRIVATE_KEY_A,_,_=load_keys(KeyConfig.PRIVATE_KEY_PATH,KeyConfig.PUBLIC_KEY_PATH,KeyConfig.PUBLIC_KEY_PATH) - plaintext = PRIVATE_KEY_A.decrypt( - ciphertext, - padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), - algorithm=hashes.SHA256(), - label=None) - ) - return plaintext + # Attempt hybrid decryption first + try: + raw = ciphertext.decode('utf-8') + payload = json.loads(raw) + enc_key = base64.b64decode(payload.get('enc_key')) + nonce = base64.b64decode(payload.get('nonce')) + ct = base64.b64decode(payload.get('ciphertext')) + aes_key = PRIVATE_KEY_A.decrypt( + enc_key, + padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None) + ) + aesgcm = AESGCM(aes_key) + plaintext = aesgcm.decrypt(nonce, ct, None) + return plaintext + except Exception: + # fallback to legacy RSA decrypt + try: + plaintext = PRIVATE_KEY_A.decrypt( + ciphertext, + padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None) + ) + return plaintext + except Exception as e: + print("[Decryption error fallback failed]", e) + return b"" #验证签名 def verify_signature(message:bytes, signature:bytes)->bool: diff --git a/utils/Container.py b/utils/Container.py index 6daafa3..acfda10 100644 --- a/utils/Container.py +++ b/utils/Container.py @@ -5,15 +5,17 @@ class Config_info(BaseModel): gpu_list:list cpu_number:int memory:int - user_name:str + swap_memory:int + name:str port:int image:str #gpu_list:显卡编号,cpu_number:需要用到的cpu核数,memory:申请的内存大小(GB) - def __init__(self,gpu_list:list,cpu_number:int,memory:int,user_name:str,image:str,port:int=0): + def __init__(self,gpu_list:list,cpu_number:int,memory:int,name:str,image:str,port:int=0,swap_memory:int=0): self.GPU_LIST=gpu_list self.CPU_NUMBER=cpu_number self.MEMORY=memory - self.USER_NAME=user_name + self.SWAP_MEMORY=swap_memory + self.NAME=name self.__PORT=port self.image=image return @@ -30,7 +32,8 @@ def get_config(self)->Config_info: "gpu_list":self.GPU_LIST, "cpu_number":self.CPU_NUMBER, "memory":self.MEMORY, - "user_name":self.USER_NAME, + "swap_memory":self.SWAP_MEMORY, + "name":self.NAME, "port":self.__PORT, "image":self.image } diff --git a/utils/sanitizer.py b/utils/sanitizer.py new file mode 100644 index 0000000..10ba5ae --- /dev/null +++ b/utils/sanitizer.py @@ -0,0 +1,60 @@ +import re + +# Reject shell metacharacters and line breaks +_META_RE = re.compile(r"[;&|`$<>\\\n]") +# Reject obvious dangerous command keywords +_DANGEROUS_WORDS = re.compile(r"\b(rm|shutdown|reboot|init|mkfs|dd|curl|wget|nc|ncat|perl|python|bash|sh)\b", re.IGNORECASE) + + +def validate_shell_arg(value: str) -> bool: + """Raise ValueError if the value looks like it could be used in shell injection. + + This is a conservative heuristic: it rejects values containing shell metacharacters + or obvious dangerous command words. It does NOT guarantee safety but helps + catch common cases. + """ + if value is None: + return True + if not isinstance(value, str): + raise ValueError("invalid argument type") + if _META_RE.search(value): + raise ValueError("argument contains shell metacharacters") + if _DANGEROUS_WORDS.search(value): + raise ValueError("argument contains dangerous keyword") + return True + + +def is_valid_name(value: str) -> bool: + """Return True if value is a strict name token (letters/digits/underscore/hyphen).""" + if not isinstance(value, str): + return False + return bool(re.fullmatch(r"[A-Za-z0-9_\-]+", value)) + + +def is_valid_image_name(value: str) -> bool: + """Return True if value looks like a container image name (allow dots, slashes, colon tags).""" + if not isinstance(value, str): + return False + return bool(re.fullmatch(r"[A-Za-z0-9]+(?:[A-Za-z0-9._\-\/]*)?(?::[A-Za-z0-9._\-]+)?", value)) + + +def validate_username(username: str) -> bool: + """Validate username/container-name-like tokens: allow letters, digits, underscore, hyphen.""" + if username is None: + return True + if not isinstance(username, str): + raise ValueError("invalid username type") + if not is_valid_name(username): + raise ValueError("invalid characters in username") + return True + + +def validate_image_name(image_name: str) -> bool: + """Validate container image names; raises ValueError if invalid.""" + if image_name is None: + return True + if not isinstance(image_name, str): + raise ValueError("invalid image name type") + if not is_valid_image_name(image_name): + raise ValueError("invalid image name") + return True