1.购买服务器阿里云:服务器购买地址https://t.aliyun.com/U/DT4XYh若失效,可用地址
阿里云:
服务器购买地址
https://t.aliyun.com/U/DT4XYh
若失效,可用地址
https://www.aliyun.com/activity/wuying/dj?source=5176.29345612&userCode=49hts92d
腾讯云:
https://curl.qcloud.com/wJpWmSfU
若失效,可用地址
https://cloud.tencent.com/act/cps/redirect?redirect=2446&cps_key=ad201ee2ef3b771157f72ee5464b1fea&from=console
华为云
https://activity.huaweicloud.com/cps.html?fromacct=64b5cf7cc11b4840bb4ed2ea0b2f4468&utm_source=V1g3MDY4NTY=&utm_medium=cps&utm_campaign=201905
2.部署教程
3.代码如下
# 联通云盘积分+抽奖
# cron: 0 0 0,8 * * *
import logging
import asyncio
import platform
import sys
import os
from typing import Tuple, Optional, Dict, Callable
from dataclasses import dataclass
from functools import wraps
# 占位符将在文件创建时被替换
encrypted_data_dict = {'3.9': 'AzxkksOri/wj1RMLiUL6DopAr3B8sG='}
function_name = 'main'
# 配置日志格式
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
class Config:
"""配置类"""
SCRIPT_NAME: str = "ScriptName" # 脚本名称
PROXY_URL: str = ''
DEBIAN_URL: str = ''
ALPINE_URL: str = ''
def download_url(self, distro: str) -> str:
"""获取下载URL"""
IF_ALPINE = "alpine" in distro.lower()
do_url = self.DEBIAN_URL if not IF_ALPINE else self.ALPINE_URL
if not self.PROXY_URL:
return do_url
proxy = self.PROXY_URL if self.PROXY_URL.endswith('/') else f"{self.PROXY_URL}/"
return f"{proxy}{do_url}"
def handle_errors(func: Callable) -> Callable:
"""
异常处理装饰器,用于统一捕获和处理异常
"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f"函数 {func.__name__} 执行出错: {str(e)}")
raise # 可以选择是否重新抛出异常
return wrapper
def get_linux_distro():
try:
with open('/etc/os-release') as f:
info = {}
for line in f:
if '=' in line:
k, v = line.strip().split('=', 1)
info[k] = v.strip('"')
return info.get('NAME', 'Unknown'), info.get('VERSION_ID', '')
except FileNotFoundError:
return "Not Linux", ""
class EnvironmentChecker:
"""环境检查类"""
SUPPORTED_PYTHON_VERSIONS = {9, 10, 11, 12} # Python 3.x 支持的小版本
SUPPORTED_OS = "Linux"
SUPPORTED_ARCHITECTURES = {'x86_64', 'aarch64', 'armv8', 'armv7l'}
@handle_errors
async def check_system(self) -> Tuple[bool, Optional[str], Optional[Dict[str, str]]]:
"""检查系统环境"""
v = sys.version_info
os_type = platform.system()
arch = platform.machine()
system_info = {
"python_version": f"{v.major}.{v.minor}.{v.micro}",
"os_type": os_type,
"architecture": arch
}
logger.info(f"系统信息: Python版本={system_info['python_version']}, "
f"操作系统={system_info['os_type']}, 处理器架构={system_info['architecture']}")
if v.minor not in self.SUPPORTED_PYTHON_VERSIONS:
return False, f"Python版本必须是3.{',3.'.join(map(str, self.SUPPORTED_PYTHON_VERSIONS))}中的一种", None
if os_type != self.SUPPORTED_OS:
return False, f"操作系统必须是{self.SUPPORTED_OS}", None
if arch not in self.SUPPORTED_ARCHITECTURES:
return False, f"处理器架构必须是{', '.join(self.SUPPORTED_ARCHITECTURES)}中的一种", None
if arch in {'armv8', 'armv7l'}:
logger.info("ARMv7,ARMv8请自行尝试")
return True, None, system_info
class FileManager:
"""文件管理类"""
def __init__(self, config: Config):
self.config = config
def get_so_filename(self, py_version: int, cpu_info: str) -> str:
"""获取.so文件名"""
base_name = "loader"
if cpu_info in ['aarch64', 'armv8']:
arch = 'aarch64'
elif cpu_info == 'x86_64':
arch = cpu_info
elif 'armv7' in cpu_info:
arch = 'armv7'
else:
raise ValueError(f"不支持的CPU架构: {cpu_info}")
return f"{base_name}_3{py_version}_{arch}.so"
@handle_errors
async def download_file(self, filename: str) -> bool:
"""下载文件"""
distro, version = get_linux_distro()
logger.info(f"当前系统: {distro} {version}")
logger.info(f"开始下载文件: {filename}")
url = f"{self.config.download_url(distro=distro)}/{filename}"
command = ['curl', '-#', '-o', 'loader.so', '-w', '%{http_code}', url]
process = await asyncio.create_subprocess_exec(*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, _ = await process.communicate()
status_code = stdout.decode().strip()
if status_code == '200' and process.returncode == 0:
logger.info("文件下载成功: loader.so")
return True
else:
logger.error(f"文件下载失败: HTTP状态码={status_code}")
if os.path.exists('loader.so'):
os.remove('loader.so')
return False
def execute_code(code_to_exec: str, name: str):
"""执行同步代码"""
import loader
loader.sync_code_loader(code_to_exec, name)
def main():
"""主函数"""
config = Config()
checker = EnvironmentChecker()
file_manager = FileManager(config)
# 在同步函数中,我们需要一个事件循环来运行异步的check_system
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
is_valid, error_msg, system_info = loop.run_until_complete(checker.check_system())
if not is_valid:
logger.error(f"环境检查失败: {error_msg}")
return
py_version_str = f"3.{sys.version_info.minor}"
encrypted_code = encrypted_data_dict.get(py_version_str)
if not encrypted_code:
logger.error(f"当前Python版本 {py_version_str} 不在支持的加密版本列表中: {list(encrypted_data_dict.keys())}")
return
if os.path.exists('loader.so'):
logger.info("本地发现loader.so文件,准备执行")
execute_code(encrypted_code, function_name)
return
py_version = sys.version_info.minor
cpu_info = platform.machine()
filename = file_manager.get_so_filename(py_version, cpu_info)
logger.info(f"开始下载文件: {filename}")
if loop.run_until_complete(file_manager.download_file(filename)):
execute_code(encrypted_code, function_name)
if __name__ == '__main__':
main()
解析
该脚本为联通云盘积分+抽奖"脚本。主要作用包括:
环境自检:确认在 Linux、受支持的 Python 3.x 小版本(3.9/3.10/3.11/3.12)和受支持的 CPU 架构(x86_64/aarch64/armv7)上运行。
按环境拉取原生扩展
loader.so
:从预置的地址下载与当前 Python 小版本+CPU 架构匹配的.so
文件。执行加密载荷:根据当前 Python 小版本,从
encrypted_data_dict
中取出对应的加密代码片段,交由loader.so
的loader.sync_code_loader()
解密并执行,入口函数名固定为main
。
也就是说,"联通云盘积分 + 抽奖"的具体逻辑是加密隐藏在
encrypted_data_dict
里,由下载的loader.so
负责解密并运行。
主要方法 / 类的作用
Config
(配置类)保存下载源配置:
DEBIAN_URL
、ALPINE_URL
与可选的PROXY_URL
。download_url(distro)
:根据是否为 Alpine 系统拼出最终下载前缀(若设置了代理则加到最前)。get_linux_distro()
读取
/etc/os-release
,返回系统名称与版本,用来判断是否走 Alpine 路径。EnvironmentChecker
白名单:
SUPPORTED_PYTHON_VERSIONS
、SUPPORTED_OS
、SUPPORTED_ARCHITECTURES
。check_system()
(异步):打印系统信息并校验是否满足上述白名单要求;不满足则返回错误原因。FileManager
get_so_filename(py_version, cpu_info)
:把架构规范化(x86_64 / aarch64 / armv7),生成对应文件名:loader_3{pyMinor}_{arch}.so
。download_file(filename)
(异步):依据发行版选择下载前缀,用curl
下载到本地loader.so
,只在 HTTP 200 时视为成功,否则清理残留文件并报错。execute_code(code_to_exec, name)
import loader
后,调用loader.sync_code_loader(encrypted_code, function_name)
,把内置密文交给.so
解密并执行。main()
(入口)跑环境检查;
依据当前 Python 小版本从
encrypted_data_dict
取密文(本脚本仅给了 3.9 的密文示例);若本地已有
loader.so
直接执行;否则按版本与架构生成文件名→下载→执行。handle_errors
(装饰器)给若干函数统一包一层异常捕获与日志输出,便于定位问题。
注意:
本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。技术层面需要提供帮助,可以通过打赏的方式进行探讨。
没有评论:
发表评论