1.购买服务器
阿里云:
服务器购买地址
https://t.aliyun.com/U/nxyLKd若失效,可用地址
https://www.aliyun.com/activity/wuying/dj?source=5176.29345612&userCode=49hts92d腾讯云:
https://curl.qcloud.com/wJpWmSfU若失效,可用地址
https://cloud.tencent.com/act/cps/redirect?redirect=2446&cps_key=ad201ee2ef3b771157f72ee5464b1fea&from=console华为云
https://activity.huaweicloud.com/cps.html?fromacct=64b5cf7cc11b4840bb4ed2ea0b2f4468&utm_source=V1g3MDY4NTY=&utm_medium=cps&utm_campaign=2019052.部署教程
3.代码如下
# !/usr/bin/env python3# -*- coding: utf-8 -*-"""new Env('夸克自动追更');0 8,18,20 * * * quark.py"""import osimport reimport sysimport jsonimport timeimport randomimport requestsimport importlibfrom datetime import datetime# 兼容青龙try:from treelib import Treeexcept:print("正在尝试自动安装依赖...")os.system("pip3 install treelib &> /dev/null")from treelib import TreeCONFIG_DATA = {}NOTIFYS = []GH_PROXY = ''MAGIC_REGEX = {"$TV": {"pattern": r".*?(?<!\d)([Ss]\d{1,2})?([Ee]?[Pp]?[Xx]?\d{1,3})(?!\d).*?\.(mp4|mkv)","replace": r"\1\2.\3",},}# 发送通知消息def send_ql_notify(title, body):try:# 导入通知模块import notify# 如未配置 push_config 则使用青龙环境通知设置if CONFIG_DATA.get("push_config"):notify.push_config = CONFIG_DATA["push_config"].copy()notify.push_config["CONSOLE"] = notify.push_config.get("CONSOLE", True)notify.send(title, body)except Exception as e:if e:print("发送通知消息失败!")# 添加消息def add_notify(text):global NOTIFYSNOTIFYS.append(text)print("📢", text)return textclass Config:# 下载配置def download_file(url, save_path):response = requests.get(url)if response.status_code == 200:with open(save_path, "wb") as file:file.write(response.content)return Trueelse:return False# 读取CKdef get_cookies(cookie_val):if isinstance(cookie_val, list):return cookie_valelif cookie_val:if "\n" in cookie_val:return cookie_val.split("\n")else:return [cookie_val]else:return Falsedef load_plugins(plugins_config={}, plugins_dir="plugins"):PLUGIN_FLAGS = os.environ.get("PLUGIN_FLAGS", "").split(",")plugins_available = {}task_plugins_config = {}all_modules = [f.replace(".py", "") for f in os.listdir(plugins_dir) if f.endswith(".py")]# 调整模块优先级priority_path = os.path.join(plugins_dir, "_priority.json")try:with open(priority_path, encoding="utf-8") as f:priority_modules = json.load(f)if priority_modules:all_modules = [module for module in priority_modules if module in all_modules] + [module for module in all_modules if module not in priority_modules]except (FileNotFoundError, json.JSONDecodeError):priority_modules = []for module_name in all_modules:if f"-{module_name}" in PLUGIN_FLAGS:continuetry:module = importlib.import_module(f"{plugins_dir}.{module_name}")ServerClass = getattr(module, module_name.capitalize())# 检查配置中是否存在该模块的配置if module_name in plugins_config:plugin = ServerClass(**plugins_config[module_name])plugins_available[module_name] = pluginelse:plugin = ServerClass()plugins_config[module_name] = plugin.default_config# 检查插件是否支持单独任务配置if hasattr(plugin, "default_task_config"):task_plugins_config[module_name] = plugin.default_task_configexcept (ImportError, AttributeError) as e:print(f"载入模块 {module_name} 失败: {e}")print()return plugins_available, plugins_config, task_plugins_configdef breaking_change_update(config_data):if config_data.get("emby"):print("🔼 Update config v0.3.6.1 to 0.3.7")config_data.setdefault("media_servers", {})["emby"] = {"url": config_data["emby"]["url"],"token": config_data["emby"]["apikey"],}del config_data["emby"]for task in config_data.get("tasklist", {}):task["media_id"] = task.get("emby_id", "")if task.get("emby_id"):del task["emby_id"]if config_data.get("media_servers"):print("🔼 Update config v0.3.8 to 0.3.9")config_data["plugins"] = config_data.get("media_servers")del config_data["media_servers"]for task in config_data.get("tasklist", {}):task["addition"] = {"emby": {"media_id": task.get("media_id", ""),}}if task.get("media_id"):del task["media_id"]class Quark:BASE_URL = "https://drive-pc.quark.cn"BASE_URL_APP = "https://drive-m.quark.cn"USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) quark-cloud-drive/3.14.2 Chrome/112.0.5615.165 Electron/24.1.3.8 Safari/537.36 Channel/pckk_other_ch"def __init__(self, cookie, index=None):self.cookie = cookie.strip()self.index = index + 1self.is_active = Falseself.nickname = ""self.mparam = self._match_mparam_form_cookie(cookie)self.savepath_fid = {"/": "0"}def _match_mparam_form_cookie(self, cookie):mparam = {}kps_match = re.search(r"(?<!\w)kps=([a-zA-Z0-9%+/=]+)[;&]?", cookie)sign_match = re.search(r"(?<!\w)sign=([a-zA-Z0-9%+/=]+)[;&]?", cookie)vcode_match = re.search(r"(?<!\w)vcode=([a-zA-Z0-9%+/=]+)[;&]?", cookie)if kps_match and sign_match and vcode_match:mparam = {"kps": kps_match.group(1).replace("%25", "%"),"sign": sign_match.group(1).replace("%25", "%"),"vcode": vcode_match.group(1).replace("%25", "%"),}return mparamdef _send_request(self, method, url, **kwargs):headers = {"cookie": self.cookie,"content-type": "application/json","user-agent": self.USER_AGENT,}if "headers" in kwargs:headers = kwargs["headers"]del kwargs["headers"]if self.mparam and "share" in url and self.BASE_URL in url:url = url.replace(self.BASE_URL, self.BASE_URL_APP)kwargs["params"].update({"device_model": "M2011K2C","entry": "default_clouddrive","_t_group": "0%3A_s_vp%3A1","dmn": "Mi%2B11","fr": "android","pf": "3300","bi": "35937","ve": "7.4.5.680","ss": "411x875","mi": "M2011K2C","nt": "5","nw": "0","kt": "4","pr": "ucpro","sv": "release","dt": "phone","data_from": "ucapi","kps": self.mparam.get("kps"),"sign": self.mparam.get("sign"),"vcode": self.mparam.get("vcode"),"app": "clouddrive","kkkk": "1",})del headers["cookie"]try:response = requests.request(method, url, headers=headers, **kwargs)# print(f"{response.text}")# response.raise_for_status() # 检查请求是否成功,但返回非200也会抛出异常return responseexcept Exception as e:print(f"_send_request error:\n{e}")fake_response = requests.Response()fake_response.status_code = 500fake_response._content = b'{"status": 500, "message": "request error"}'return fake_responsedef init(self):account_info = self.get_account_info()if account_info:self.is_active = Trueself.nickname = account_info["nickname"]return account_infoelse:return Falsedef get_account_info(self):url = "https://pan.quark.cn/account/info"querystring = {"fr": "pc", "platform": "pc"}response = self._send_request("GET", url, params=querystring).json()if response.get("data"):return response["data"]else:return Falsedef get_growth_info(self):url = f"{self.BASE_URL_APP}/1/clouddrive/capacity/growth/info"querystring = {"pr": "ucpro","fr": "android","kps": self.mparam.get("kps"),"sign": self.mparam.get("sign"),"vcode": self.mparam.get("vcode"),}headers = {"content-type": "application/json",}response = self._send_request("GET", url, headers=headers, params=querystring).json()if response.get("data"):return response["data"]else:return Falsedef get_growth_sign(self):url = f"{self.BASE_URL_APP}/1/clouddrive/capacity/growth/sign"querystring = {"pr": "ucpro","fr": "android","kps": self.mparam.get("kps"),"sign": self.mparam.get("sign"),"vcode": self.mparam.get("vcode"),}payload = {"sign_cyclic": True,}headers = {"content-type": "application/json",}response = self._send_request("POST", url, json=payload, headers=headers, params=querystring).json()if response.get("data"):return True, response["data"]["sign_daily_reward"]else:return False, response["message"]# 可验证资源是否失效def get_stoken(self, pwd_id, passcode=""):url = f"{self.BASE_URL}/1/clouddrive/share/sharepage/token"querystring = {"pr": "ucpro", "fr": "pc"}payload = {"pwd_id": pwd_id, "passcode": passcode}response = self._send_request("POST", url, json=payload, params=querystring).json()if response.get("status") == 200:return True, response["data"]["stoken"]else:return False, response["message"]def get_detail(self, pwd_id, stoken, pdir_fid, _fetch_share=0):list_merge = []page = 1while True:url = f"{self.BASE_URL}/1/clouddrive/share/sharepage/detail"querystring = {"pr": "ucpro","fr": "pc","pwd_id": pwd_id,"stoken": stoken,"pdir_fid": pdir_fid,"force": "0","_page": page,"_size": "50","_fetch_banner": "0","_fetch_share": _fetch_share,"_fetch_total": "1","_sort": "file_type:asc,updated_at:desc",}response = self._send_request("GET", url, params=querystring).json()if response["data"]["list"]:list_merge += response["data"]["list"]page += 1else:breakif len(list_merge) >= response["metadata"]["_total"]:breakresponse["data"]["list"] = list_mergereturn response["data"]def get_fids(self, file_paths):fids = []while True:url = f"{self.BASE_URL}/1/clouddrive/file/info/path_list"querystring = {"pr": "ucpro", "fr": "pc"}payload = {"file_path": file_paths[:50], "namespace": "0"}response = self._send_request("POST", url, json=payload, params=querystring).json()if response["code"] == 0:fids += response["data"]file_paths = file_paths[50:]else:print(f"获取目录ID:失败, {response['message']}")breakif len(file_paths) == 0:breakreturn fidsdef ls_dir(self, pdir_fid, **kwargs):file_list = []page = 1while True:url = f"{self.BASE_URL}/1/clouddrive/file/sort"querystring = {"pr": "ucpro","fr": "pc","uc_param_str": "","pdir_fid": pdir_fid,"_page": page,"_size": "50","_fetch_total": "1","_fetch_sub_dirs": "0","_sort": "file_type:asc,updated_at:desc","_fetch_full_path": kwargs.get("fetch_full_path", 0),}response = self._send_request("GET", url, params=querystring).json()if response["data"]["list"]:file_list += response["data"]["list"]page += 1else:breakif len(file_list) >= response["metadata"]["_total"]:breakreturn file_listdef save_file(self, fid_list, fid_token_list, to_pdir_fid, pwd_id, stoken):url = f"{self.BASE_URL}/1/clouddrive/share/sharepage/save"querystring = {"pr": "ucpro","fr": "pc","uc_param_str": "","app": "clouddrive","__dt": int(random.uniform(1, 5) * 60 * 1000),"__t": datetime.now().timestamp(),}payload = {"fid_list": fid_list,"fid_token_list": fid_token_list,"to_pdir_fid": to_pdir_fid,"pwd_id": pwd_id,"stoken": stoken,"pdir_fid": "0","scene": "link",}response = self._send_request("POST", url, json=payload, params=querystring).json()return responsedef query_task(self, task_id):retry_index = 0while True:url = f"{self.BASE_URL}/1/clouddrive/task"querystring = {"pr": "ucpro","fr": "pc","uc_param_str": "","task_id": task_id,"retry_index": retry_index,"__dt": int(random.uniform(1, 5) * 60 * 1000),"__t": datetime.now().timestamp(),}response = self._send_request("GET", url, params=querystring).json()if response["data"]["status"] != 0:if retry_index > 0:print()breakelse:if retry_index == 0:print(f"正在等待[{response['data']['task_title']}]执行结果",end="",flush=True,)else:print(".", end="", flush=True)retry_index += 1time.sleep(0.500)return responsedef download(self, fids):url = f"{self.BASE_URL}/1/clouddrive/file/download"querystring = {"pr": "ucpro", "fr": "pc", "uc_param_str": ""}payload = {"fids": fids}response = self._send_request("POST", url, json=payload, params=querystring)set_cookie = response.cookies.get_dict()cookie_str = "; ".join([f"{key}={value}" for key, value in set_cookie.items()])return response.json(), cookie_strdef mkdir(self, dir_path):url = f"{self.BASE_URL}/1/clouddrive/file"querystring = {"pr": "ucpro", "fr": "pc", "uc_param_str": ""}payload = {"pdir_fid": "0","file_name": "","dir_path": dir_path,"dir_init_lock": False,}response = self._send_request("POST", url, json=payload, params=querystring).json()return responsedef rename(self, fid, file_name):url = f"{self.BASE_URL}/1/clouddrive/file/rename"querystring = {"pr": "ucpro", "fr": "pc", "uc_param_str": ""}payload = {"fid": fid, "file_name": file_name}response = self._send_request("POST", url, json=payload, params=querystring).json()return responsedef delete(self, filelist):url = f"{self.BASE_URL}/1/clouddrive/file/delete"querystring = {"pr": "ucpro", "fr": "pc", "uc_param_str": ""}payload = {"action_type": 2, "filelist": filelist, "exclude_fids": []}response = self._send_request("POST", url, json=payload, params=querystring).json()return responsedef recycle_list(self, page=1, size=30):url = f"{self.BASE_URL}/1/clouddrive/file/recycle/list"querystring = {"_page": page,"_size": size,"pr": "ucpro","fr": "pc","uc_param_str": "",}response = self._send_request("GET", url, params=querystring).json()return response["data"]["list"]def recycle_remove(self, record_list):url = f"{self.BASE_URL}/1/clouddrive/file/recycle/remove"querystring = {"uc_param_str": "", "fr": "pc", "pr": "ucpro"}payload = {"select_mode": 2,"record_list": record_list,}response = self._send_request("POST", url, json=payload, params=querystring).json()return response# ↑ 请求函数# ↓ 操作函数# 魔法正则匹配def magic_regex_func(self, pattern, replace, taskname=None):magic_regex = CONFIG_DATA.get("magic_regex") or MAGIC_REGEX or {}keyword = patternif keyword in magic_regex:pattern = magic_regex[keyword]["pattern"]if replace == "":replace = magic_regex[keyword]["replace"]if taskname:replace = replace.replace("$TASKNAME", taskname)return pattern, replacedef get_id_from_url(self, url):url = url.replace("https://pan.quark.cn/s/", "")pattern = r"(\w+)(\?pwd=(\w+))?(#/list/share.*/(\w+))?"match = re.search(pattern, url)if match:pwd_id = match.group(1)passcode = match.group(3) if match.group(3) else ""pdir_fid = match.group(5) if match.group(5) else 0return pwd_id, passcode, pdir_fidelse:return Nonedef update_savepath_fid(self, tasklist):dir_paths = [re.sub(r"/{2,}", "/", f"/{item['savepath']}")for item in tasklistif not item.get("enddate")or (datetime.now().date()<= datetime.strptime(item["enddate"], "%Y-%m-%d").date())]if not dir_paths:return Falsedir_paths_exist_arr = self.get_fids(dir_paths)dir_paths_exist = [item["file_path"] for item in dir_paths_exist_arr]# 比较创建不存在的dir_paths_unexist = list(set(dir_paths) - set(dir_paths_exist) - set(["/"]))for dir_path in dir_paths_unexist:mkdir_return = self.mkdir(dir_path)if mkdir_return["code"] == 0:new_dir = mkdir_return["data"]dir_paths_exist_arr.append({"file_path": dir_path, "fid": new_dir["fid"]})print(f"创建文件夹:{dir_path}")else:print(f"创建文件夹:{dir_path} 失败, {mkdir_return['message']}")# 储存目标目录的fidfor dir_path in dir_paths_exist_arr:self.savepath_fid[dir_path["file_path"]] = dir_path["fid"]# print(dir_paths_exist_arr)def do_save_check(self, shareurl, savepath):try:pwd_id, passcode, pdir_fid = self.get_id_from_url(shareurl)is_sharing, stoken = self.get_stoken(pwd_id, passcode)share_file_list = self.get_detail(pwd_id, stoken, pdir_fid)["list"]fid_list = [item["fid"] for item in share_file_list]fid_token_list = [item["share_fid_token"] for item in share_file_list]file_name_list = [item["file_name"] for item in share_file_list]if not fid_list:returnget_fids = self.get_fids([savepath])to_pdir_fid = (get_fids[0]["fid"] if get_fids else self.mkdir(savepath)["data"]["fid"])save_file = self.save_file(fid_list, fid_token_list, to_pdir_fid, pwd_id, stoken)if save_file["code"] == 41017:returnelif save_file["code"] == 0:dir_file_list = self.ls_dir(to_pdir_fid)del_list = [item["fid"]for item in dir_file_listif (item["file_name"] in file_name_list)and ((datetime.now().timestamp() - item["created_at"]) < 60)]if del_list:self.delete(del_list)recycle_list = self.recycle_list()record_id_list = [item["record_id"]for item in recycle_listif item["fid"] in del_list]self.recycle_remove(record_id_list)return save_fileelse:return Falseexcept Exception as e:if os.environ.get("DEBUG") == True:print(f"转存测试失败: {str(e)}")def do_save_task(self, task):# 判断资源失效记录if task.get("shareurl_ban"):print(f"《{task['taskname']}》:{task['shareurl_ban']}")return# 链接转换所需参数pwd_id, passcode, pdir_fid = self.get_id_from_url(task["shareurl"])# print("match: ", pwd_id, pdir_fid)# 获取stoken,同时可验证资源是否失效is_sharing, stoken = self.get_stoken(pwd_id, passcode)if not is_sharing:add_notify(f"❌《{task['taskname']}》:{stoken}\n")task["shareurl_ban"] = stokenreturn# print("stoken: ", stoken)updated_tree = self.dir_check_and_save(task, pwd_id, stoken, pdir_fid)if updated_tree.size(1) > 0:add_notify(f"✅《{task['taskname']}》添加追更:\n{updated_tree}")return updated_treeelse:print(f"任务结束:没有新的转存任务")return Falsedef dir_check_and_save(self, task, pwd_id, stoken, pdir_fid="", subdir_path=""):tree = Tree()# 获取分享文件列表share_file_list = self.get_detail(pwd_id, stoken, pdir_fid)["list"]# print("share_file_list: ", share_file_list)if not share_file_list:if subdir_path == "":task["shareurl_ban"] = "分享为空,文件已被分享者删除"add_notify(f"❌《{task['taskname']}》:{task['shareurl_ban']}\n")return treeelif (len(share_file_list) == 1and share_file_list[0]["dir"]and subdir_path == ""): # 仅有一个文件夹print("🧠 该分享是一个文件夹,读取文件夹内列表")share_file_list = self.get_detail(pwd_id, stoken, share_file_list[0]["fid"])["list"]# 获取目标目录文件列表savepath = re.sub(r"/{2,}", "/", f"/{task['savepath']}{subdir_path}")if not self.savepath_fid.get(savepath):if get_fids := self.get_fids([savepath]):self.savepath_fid[savepath] = get_fids[0]["fid"]else:print(f"❌ 目录 {savepath} fid获取失败,跳过转存")return treeto_pdir_fid = self.savepath_fid[savepath]dir_file_list = self.ls_dir(to_pdir_fid)# print("dir_file_list: ", dir_file_list)tree.create_node(savepath,pdir_fid,data={"is_dir": True,},)# 需保存的文件清单need_save_list = []# 添加符合的for share_file in share_file_list:if share_file["dir"] and task.get("update_subdir", False):pattern, replace = task["update_subdir"], ""else:pattern, replace = self.magic_regex_func(task["pattern"], task["replace"], task["taskname"])# 正则文件名匹配if re.search(pattern, share_file["file_name"]):# 替换后的文件名save_name = (re.sub(pattern, replace, share_file["file_name"])if replace != ""else share_file["file_name"])# 忽略后缀if task.get("ignore_extension") and not share_file["dir"]:compare_func = lambda a, b1, b2: (os.path.splitext(a)[0] == os.path.splitext(b1)[0]or os.path.splitext(a)[0] == os.path.splitext(b2)[0])else:compare_func = lambda a, b1, b2: (a == b1 or a == b2)# 判断目标目录文件是否存在file_exists = any(compare_func(dir_file["file_name"], share_file["file_name"], save_name)for dir_file in dir_file_list)if not file_exists:share_file["save_name"] = save_nameneed_save_list.append(share_file)elif share_file["dir"]:# 存在并是一个文件夹if task.get("update_subdir", False):if re.search(task["update_subdir"], share_file["file_name"]):print(f"检查子文件夹:{savepath}/{share_file['file_name']}")subdir_tree = self.dir_check_and_save(task,pwd_id,stoken,share_file["fid"],f"{subdir_path}/{share_file['file_name']}",)if subdir_tree.size(1) > 0:# 合并子目录树tree.create_node("📁" + share_file["file_name"],share_file["fid"],parent=pdir_fid,data={"is_dir": share_file["dir"],},)tree.merge(share_file["fid"], subdir_tree, deep=False)# 指定文件开始订阅/到达指定文件(含)结束历遍if share_file["fid"] == task.get("startfid", ""):breakfid_list = [item["fid"] for item in need_save_list]fid_token_list = [item["share_fid_token"] for item in need_save_list]if fid_list:save_file_return = self.save_file(fid_list, fid_token_list, to_pdir_fid, pwd_id, stoken)err_msg = Noneif save_file_return["code"] == 0:task_id = save_file_return["data"]["task_id"]query_task_return = self.query_task(task_id)if query_task_return["code"] == 0:# 建立目录树for index, item in enumerate(need_save_list):icon = ("📁"if item["dir"] == Trueelse "🎞️" if item["obj_category"] == "video" else "")tree.create_node(f"{icon}{item['save_name']}",item["fid"],parent=pdir_fid,data={"fid": f"{query_task_return['data']['save_as']['save_as_top_fids'][index]}","path": f"{savepath}/{item['save_name']}","is_dir": item["dir"],},)else:err_msg = query_task_return["message"]else:err_msg = save_file_return["message"]if err_msg:add_notify(f"❌《{task['taskname']}》转存失败:{err_msg}\n")return treedef do_rename_task(self, task, subdir_path=""):pattern, replace = self.magic_regex_func(task["pattern"], task["replace"], task["taskname"])if not pattern or not replace:return 0savepath = re.sub(r"/{2,}", "/", f"/{task['savepath']}{subdir_path}")if not self.savepath_fid.get(savepath):self.savepath_fid[savepath] = self.get_fids([savepath])[0]["fid"]dir_file_list = self.ls_dir(self.savepath_fid[savepath])dir_file_name_list = [item["file_name"] for item in dir_file_list]is_rename_count = 0for dir_file in dir_file_list:if dir_file["dir"]:is_rename_count += self.do_rename_task(task, f"{subdir_path}/{dir_file['file_name']}")if re.search(pattern, dir_file["file_name"]):save_name = (re.sub(pattern, replace, dir_file["file_name"])if replace != ""else dir_file["file_name"])if save_name != dir_file["file_name"] and (save_name not in dir_file_name_list):rename_return = self.rename(dir_file["fid"], save_name)if rename_return["code"] == 0:print(f"重命名:{dir_file['file_name']} → {save_name}")is_rename_count += 1else:print(f"重命名:{dir_file['file_name']} → {save_name} 失败,{rename_return['message']}")return is_rename_count > 0def verify_account(account):# 验证账号print(f"▶️ 验证第{account.index}个账号")if "__uid" not in account.cookie:print(f"💡 不存在cookie必要参数,判断为仅签到")return Falseelse:account_info = account.init()if not account_info:add_notify(f"👤 第{account.index}个账号登录失败,cookie无效❌")return Falseelse:print(f"👤 账号昵称: {account_info['nickname']}✅")return Truedef format_bytes(size_bytes: int) -> str:units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")i = 0while size_bytes >= 1024 and i < len(units) - 1:size_bytes /= 1024i += 1return f"{size_bytes:.2f} {units[i]}"def do_sign(account):if not account.mparam:print("⏭️ 移动端参数未设置,跳过签到")print()return# 每日领空间growth_info = account.get_growth_info()if growth_info:growth_message = f"💾 {'88VIP' if growth_info['88VIP'] else '普通用户'} 总空间:{format_bytes(growth_info['total_capacity'])},签到累计获得:{format_bytes(growth_info['cap_composition'].get('sign_reward', 0))}"if growth_info["cap_sign"]["sign_daily"]:sign_message = f"📅 签到记录: 今日已签到+{int(growth_info['cap_sign']['sign_daily_reward']/1024/1024)}MB,连签进度({growth_info['cap_sign']['sign_progress']}/{growth_info['cap_sign']['sign_target']})✅"message = f"{sign_message}\n{growth_message}"print(message)else:sign, sign_return = account.get_growth_sign()if sign:sign_message = f"📅 执行签到: 今日签到+{int(sign_return/1024/1024)}MB,连签进度({growth_info['cap_sign']['sign_progress']+1}/{growth_info['cap_sign']['sign_target']})✅"message = f"{sign_message}\n{growth_message}"if (str(CONFIG_DATA.get("push_config", {}).get("QUARK_SIGN_NOTIFY")).lower()== "false"or os.environ.get("QUARK_SIGN_NOTIFY") == "false"):print(message)else:message = message.replace("今日", f"[{account.nickname}]今日")add_notify(message)else:print(f"📅 签到异常: {sign_return}")print()def do_save(account, tasklist=[]):print(f"🧩 载入插件")plugins, CONFIG_DATA["plugins"], task_plugins_config = Config.load_plugins(CONFIG_DATA.get("plugins", {}))print(f"转存账号: {account.nickname}")# 获取全部保存目录fidaccount.update_savepath_fid(tasklist)def check_date(task):return (not task.get("enddate")or (datetime.now().date()<= datetime.strptime(task["enddate"], "%Y-%m-%d").date())) and (not task.get("runweek")# 星期一为0,星期日为6or (datetime.today().weekday() + 1 in task.get("runweek")))# 执行任务for index, task in enumerate(tasklist):# 判断任务期限if check_date(task):print()print(f"#{index+1}------------------")print(f"任务名称: {task['taskname']}")print(f"分享链接: {task['shareurl']}")print(f"保存路径: {task['savepath']}")print(f"正则匹配: {task['pattern']}")print(f"正则替换: {task['replace']}")if task.get("enddate"):print(f"任务截止: {task['enddate']}")if task.get("ignore_extension"):print(f"忽略后缀: {task['ignore_extension']}")if task.get("update_subdir"):print(f"更子目录: {task['update_subdir']}")print()is_new_tree = account.do_save_task(task)is_rename = account.do_rename_task(task)# 补充任务的插件配置def merge_dicts(a, b):result = a.copy()for key, value in b.items():if (key in resultand isinstance(result[key], dict)and isinstance(value, dict)):result[key] = merge_dicts(result[key], value)elif key not in result:result[key] = valuereturn resulttask["addition"] = merge_dicts(task.get("addition", {}), task_plugins_config)# 调用插件if is_new_tree or is_rename:print(f"🧩 调用插件")for plugin_name, plugin in plugins.items():if plugin.is_active and (is_new_tree or is_rename):task = (plugin.run(task, account=account, tree=is_new_tree) or task)print()def main():global CONFIG_DATAstart_time = datetime.now()print(f"===============程序开始===============")print(f"⏰ 执行时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")print()# 读取启动参数config_path = sys.argv[1] if len(sys.argv) > 1 else "quark_config.json"task_index = int(sys.argv[2]) if len(sys.argv) > 2 and sys.argv[2].isdigit() else ""# 检查本地文件是否存在,如果不存在就下载if not os.path.exists(config_path):if os.environ.get("QUARK_COOKIE"):print(f"⚙️ 读取到 QUARK_COOKIE 环境变量,仅签到领空间。如需执行转存,请删除该环境变量后配置 {config_path} 文件")cookie_val = os.environ.get("QUARK_COOKIE")cookie_form_file = Falseelse:print(f"⚙️ 配置文件 {config_path} 不存在❌,正远程从下载配置模版")config_url = f"{GH_PROXY}quark_config.json"if Config.download_file(config_url, config_path):print("⚙️ 配置模版下载成功✅,请到程序目录中手动配置")returnelse:print(f"⚙️ 正从 {config_path} 文件中读取配置")with open(config_path, "r", encoding="utf-8") as file:CONFIG_DATA = json.load(file)Config.breaking_change_update(CONFIG_DATA)cookie_val = CONFIG_DATA.get("cookie")if not CONFIG_DATA.get("magic_regex"):CONFIG_DATA["magic_regex"] = MAGIC_REGEXcookie_form_file = True# 获取cookiecookies = Config.get_cookies(cookie_val)if not cookies:print("❌ cookie 未配置")returnaccounts = [Quark(cookie, index) for index, cookie in enumerate(cookies)]# 签到print(f"===============签到任务===============")if type(task_index) is int:verify_account(accounts[0])else:for account in accounts:verify_account(account)do_sign(account)print()# 转存if accounts[0].is_active and cookie_form_file:print(f"===============转存任务===============")# 任务列表tasklist = CONFIG_DATA.get("tasklist", [])if type(task_index) is int:do_save(accounts[0], [tasklist[task_index]])else:do_save(accounts[0], tasklist)print()# 通知if NOTIFYS:notify_body = "\n".join(NOTIFYS)print(f"===============推送通知===============")send_ql_notify("【夸克自动追更】", notify_body)print()if cookie_form_file:# 更新配置with open(config_path, "w", encoding="utf-8") as file:json.dump(CONFIG_DATA, file, ensure_ascii=False, sort_keys=False, indent=2)print(f"===============程序结束===============")duration = datetime.now() - start_timeprint(f"😃 运行时长: {round(duration.total_seconds(), 2)}s")print()if __name__ == "__main__":main()
解析
这个脚本是用于自动管理和更新夸克云盘(Quark Cloud Drive)上的文件和目录的Python脚本。其主要功能包括自动下载、更新、重命名、删除文件和文件夹,以及处理和发送通知,可以在特定的时间间隔内运行,根据配置文件进行操作。
主要功能
1. Quark 类:
__init__:初始化类,设置cookie和其他基本信息。get_account_info、get_growth_info等:获取账户信息和成长信息。save_file、download:保存和下载文件。delete、rename:删除和重命名文件或文件夹。recycle_list、recycle_remove:处理回收站中的文件。do_save_task:执行保存任务,包括验证资源是否失效和更新文件。do_rename_task:重命名任务,根据特定的正则表达式规则修改文件名。
2. Config 类:
download_file:下载文件。get_cookies:处理和获取cookie。load_plugins:加载并初始化插件。breaking_change_update:处理配置文件中的重大更新。
3. 全局方法:
main:程序的主入口点,处理基本的配置读取、任务执行流程。verify_account:验证账户有效性。do_sign:执行签到任务,可能包括获取额外的存储空间。send_ql_notify:发送通知。
配置和任务调度
脚本使用quark_config.json文件来存储用户配置,如cookie、任务列表和其他设置。脚本通过配置中指定的时间来定时执行(如8点、18点、20点)时间可以通过cron配置进行调整。
历史文章:夸克网盘任务脚本
注意:
本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。技术层面需要提供帮助,可以通过打赏的方式进行探讨。
没有评论:
发表评论