1.购买服务器阿里云:服务器购买地址https://t.aliyun.com/U/DT4XYh若失效,可用地址
阿里云:
服务器购买地址
https://t.aliyun.com/U/DT4XYh若失效,可用地址
https://www.aliyun.com/activity/wuying/dj?source=5176.29345612&userCode=49hts92d腾讯云:
https://curl.qcloud.com/wJpWmSfU若失效,可用地址
https://cloud.tencent.com/act/cps/redirect?redirect=2446&cps_key=ad201ee2ef3b771157f72ee5464b1fea&from=console华为云
https://activity.huaweicloud.com/cps.html?fromacct=64b5cf7cc11b4840bb4ed2ea0b2f4468&utm_source=V1g3MDY4NTY=&utm_medium=cps&utm_campaign=2019052.部署教程
3.代码如下
# 联通云盘积分+抽奖# cron: 0 0 0,8 * * *import loggingimport asyncioimport platformimport sysimport osfrom typing import Tuple, Optional, Dict, Callablefrom dataclasses import dataclassfrom functools import wraps# 占位符将在文件创建时被替换encrypted_data_dict = {'3.9': 'AzxkksOri/wj1RMLiUL6DopAr3B8sG='}function_name = 'main'# 配置日志格式logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',datefmt='%Y-%m-%d %H:%M:%S')logger = logging.getLogger(__name__)class Config:"""配置类"""SCRIPT_NAME: str = "ScriptName" # 脚本名称PROXY_URL: str = ''DEBIAN_URL: str = ''ALPINE_URL: str = ''def download_url(self, distro: str) -> str:"""获取下载URL"""IF_ALPINE = "alpine" in distro.lower()do_url = self.DEBIAN_URL if not IF_ALPINE else self.ALPINE_URLif not self.PROXY_URL:return do_urlproxy = self.PROXY_URL if self.PROXY_URL.endswith('/') else f"{self.PROXY_URL}/"return f"{proxy}{do_url}"def handle_errors(func: Callable) -> Callable:"""异常处理装饰器,用于统一捕获和处理异常"""@wraps(func)def wrapper(*args, **kwargs):try:return func(*args, **kwargs)except Exception as e:logger.error(f"函数 {func.__name__} 执行出错: {str(e)}")raise # 可以选择是否重新抛出异常return wrapperdef get_linux_distro():try:with open('/etc/os-release') as f:info = {}for line in f:if '=' in line:k, v = line.strip().split('=', 1)info[k] = v.strip('"')return info.get('NAME', 'Unknown'), info.get('VERSION_ID', '')except FileNotFoundError:return "Not Linux", ""class EnvironmentChecker:"""环境检查类"""SUPPORTED_PYTHON_VERSIONS = {9, 10, 11, 12} # Python 3.x 支持的小版本SUPPORTED_OS = "Linux"SUPPORTED_ARCHITECTURES = {'x86_64', 'aarch64', 'armv8', 'armv7l'}@handle_errorsasync def check_system(self) -> Tuple[bool, Optional[str], Optional[Dict[str, str]]]:"""检查系统环境"""v = sys.version_infoos_type = platform.system()arch = platform.machine()system_info = {"python_version": f"{v.major}.{v.minor}.{v.micro}","os_type": os_type,"architecture": arch}logger.info(f"系统信息: Python版本={system_info['python_version']}, "f"操作系统={system_info['os_type']}, 处理器架构={system_info['architecture']}")if v.minor not in self.SUPPORTED_PYTHON_VERSIONS:return False, f"Python版本必须是3.{',3.'.join(map(str, self.SUPPORTED_PYTHON_VERSIONS))}中的一种", Noneif os_type != self.SUPPORTED_OS:return False, f"操作系统必须是{self.SUPPORTED_OS}", Noneif arch not in self.SUPPORTED_ARCHITECTURES:return False, f"处理器架构必须是{', '.join(self.SUPPORTED_ARCHITECTURES)}中的一种", Noneif arch in {'armv8', 'armv7l'}:logger.info("ARMv7,ARMv8请自行尝试")return True, None, system_infoclass FileManager:"""文件管理类"""def __init__(self, config: Config):self.config = configdef get_so_filename(self, py_version: int, cpu_info: str) -> str:"""获取.so文件名"""base_name = "loader"if cpu_info in ['aarch64', 'armv8']:arch = 'aarch64'elif cpu_info == 'x86_64':arch = cpu_infoelif 'armv7' in cpu_info:arch = 'armv7'else:raise ValueError(f"不支持的CPU架构: {cpu_info}")return f"{base_name}_3{py_version}_{arch}.so"@handle_errorsasync def download_file(self, filename: str) -> bool:"""下载文件"""distro, version = get_linux_distro()logger.info(f"当前系统: {distro} {version}")logger.info(f"开始下载文件: {filename}")url = f"{self.config.download_url(distro=distro)}/{filename}"command = ['curl', '-#', '-o', 'loader.so', '-w', '%{http_code}', url]process = await asyncio.create_subprocess_exec(*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)stdout, _ = await process.communicate()status_code = stdout.decode().strip()if status_code == '200' and process.returncode == 0:logger.info("文件下载成功: loader.so")return Trueelse:logger.error(f"文件下载失败: HTTP状态码={status_code}")if os.path.exists('loader.so'):os.remove('loader.so')return Falsedef execute_code(code_to_exec: str, name: str):"""执行同步代码"""import loaderloader.sync_code_loader(code_to_exec, name)def main():"""主函数"""config = Config()checker = EnvironmentChecker()file_manager = FileManager(config)# 在同步函数中,我们需要一个事件循环来运行异步的check_systemtry:loop = asyncio.get_running_loop()except RuntimeError:loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)is_valid, error_msg, system_info = loop.run_until_complete(checker.check_system())if not is_valid:logger.error(f"环境检查失败: {error_msg}")returnpy_version_str = f"3.{sys.version_info.minor}"encrypted_code = encrypted_data_dict.get(py_version_str)if not encrypted_code:logger.error(f"当前Python版本 {py_version_str} 不在支持的加密版本列表中: {list(encrypted_data_dict.keys())}")returnif os.path.exists('loader.so'):logger.info("本地发现loader.so文件,准备执行")execute_code(encrypted_code, function_name)returnpy_version = sys.version_info.minorcpu_info = platform.machine()filename = file_manager.get_so_filename(py_version, cpu_info)logger.info(f"开始下载文件: {filename}")if loop.run_until_complete(file_manager.download_file(filename)):execute_code(encrypted_code, function_name)if __name__ == '__main__':main()
解析
该脚本为联通云盘积分+抽奖"脚本。主要作用包括:
环境自检:确认在 Linux、受支持的 Python 3.x 小版本(3.9/3.10/3.11/3.12)和受支持的 CPU 架构(x86_64/aarch64/armv7)上运行。
按环境拉取原生扩展
loader.so:从预置的地址下载与当前 Python 小版本+CPU 架构匹配的.so文件。执行加密载荷:根据当前 Python 小版本,从
encrypted_data_dict中取出对应的加密代码片段,交由loader.so的loader.sync_code_loader()解密并执行,入口函数名固定为main。
也就是说,"联通云盘积分 + 抽奖"的具体逻辑是加密隐藏在
encrypted_data_dict里,由下载的loader.so负责解密并运行。
主要方法 / 类的作用
Config(配置类)保存下载源配置:
DEBIAN_URL、ALPINE_URL与可选的PROXY_URL。download_url(distro):根据是否为 Alpine 系统拼出最终下载前缀(若设置了代理则加到最前)。get_linux_distro()读取
/etc/os-release,返回系统名称与版本,用来判断是否走 Alpine 路径。EnvironmentChecker白名单:
SUPPORTED_PYTHON_VERSIONS、SUPPORTED_OS、SUPPORTED_ARCHITECTURES。check_system()(异步):打印系统信息并校验是否满足上述白名单要求;不满足则返回错误原因。FileManagerget_so_filename(py_version, cpu_info):把架构规范化(x86_64 / aarch64 / armv7),生成对应文件名:loader_3{pyMinor}_{arch}.so。download_file(filename)(异步):依据发行版选择下载前缀,用curl下载到本地loader.so,只在 HTTP 200 时视为成功,否则清理残留文件并报错。execute_code(code_to_exec, name)import loader后,调用loader.sync_code_loader(encrypted_code, function_name),把内置密文交给.so解密并执行。main()(入口)跑环境检查;
依据当前 Python 小版本从
encrypted_data_dict取密文(本脚本仅给了 3.9 的密文示例);若本地已有
loader.so直接执行;否则按版本与架构生成文件名→下载→执行。handle_errors(装饰器)给若干函数统一包一层异常捕获与日志输出,便于定位问题。
注意:
本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。技术层面需要提供帮助,可以通过打赏的方式进行探讨。
没有评论:
发表评论