1.购买服务器阿里云:服务器购买地址https://t.aliyun.com/U/DT4XYh若失效,可用地址
阿里云:
服务器购买地址
https://t.aliyun.com/U/DT4XYh若失效,可用地址
https://www.aliyun.com/activity/wuying/dj?source=5176.29345612&userCode=49hts92d腾讯云:
https://curl.qcloud.com/wJpWmSfU若失效,可用地址
https://cloud.tencent.com/act/cps/redirect?redirect=2446&cps_key=ad201ee2ef3b771157f72ee5464b1fea&from=console华为云
https://activity.huaweicloud.com/cps.html?fromacct=64b5cf7cc11b4840bb4ed2ea0b2f4468&utm_source=V1g3MDY4NTY=&utm_medium=cps&utm_campaign=2019052.部署教程
3.代码如下
import argparseimport jsonimport osimport reimport sysimport timefrom datetime import datetimefrom typing import Any, Dict, Optional, Tupleimport requestsfrom apscheduler.schedulers.background import BackgroundSchedulerfrom apscheduler.triggers.cron import CronTriggerfrom apscheduler.triggers.interval import IntervalTriggerfrom pytz import timezonefrom urllib.parse import urlparseCHECKIN_URL = 'https://msec.nsfocus.com/backend_api/checkin/checkin'POINTS_URL = 'https://msec.nsfocus.com/backend_api/point/common/get'def load_config(filepath: str) -> Dict[str, str]:with open(filepath, 'r', encoding='utf-8') as f:content = f.read().strip()try:obj = json.loads(content)return {'Authorization': obj.get('Authorization') or obj.get('authorization') or '','FEISHU_WEBHOOK': obj.get('FEISHU_WEBHOOK') or obj.get('feishu') or obj.get('feishu_token') or '','LARK_WEBHOOK': obj.get('LARK_WEBHOOK') or obj.get('lark') or obj.get('lark_token') or '',}except json.JSONDecodeError:cfg: Dict[str, str] = {'Authorization': '', 'FEISHU_WEBHOOK': '', 'LARK_WEBHOOK': ''}for line in content.splitlines():line = line.strip()if not line or line.startswith('#'):continueif '=' in line:k, v = line.split('=', 1)key = k.strip()val = v.strip()if key in cfg:cfg[key] = valreturn cfgdef build_headers(authorization: str) -> Dict[str, str]:headers: Dict[str, str] = {'Accept': '*/*','Content-Type': 'application/json','Origin': 'https://msec.nsfocus.com','Referer': 'https://msec.nsfocus.com/','User-Agent': 'Mozilla/5.0',}if authorization:headers['Authorization'] = authorizationreturn headersDEFAULT_QD = """POST /backend_api/checkin/checkin HTTP/1.1Host: msec.nsfocus.comContent-Type: application/json{}"""DEFAULT_CX = """POST /backend_api/point/common/get HTTP/1.1Host: msec.nsfocus.comContent-Type: application/json{}"""def parse_request_file(filepath: str) -> Tuple[str, str, Dict[str, str], Optional[str]]:with open(filepath, 'r', encoding='utf-8') as f:text = f.read().strip()if not text:raise ValueError(f"Empty request file: {filepath}")try:obj = json.loads(text)method = (obj.get('method') or 'GET').upper()url = obj['url']headers = obj.get('headers') or {}body = obj.get('body')if isinstance(body, (dict, list)):body = json.dumps(body, ensure_ascii=False)return method, url, headers, bodyexcept Exception:passif text.lower().startswith('curl '):method = 'GET'headers: Dict[str, str] = {}body: Optional[str] = Noneurl_match = re.search(r"(https?://[^\s'\"]+)", text)if url_match:url = url_match.group(1)else:raise ValueError('curl missing URL')m = re.search(r"-X\s+([A-Z]+)", text)if m:method = m.group(1).upper()for hk, hv in re.findall(r"-H\s+'([^:]+):\s*([^']*)'", text):headers[hk.strip()] = hv.strip()for hk, hv in re.findall(r' -H\s+"([^:]+):\s*([^"]*)"', text):headers[hk.strip()] = hv.strip()d = re.search(r"-d\s+'([^']*)'", text, re.S)if not d:d = re.search(r'-d\s+"([^"]*)"', text, re.S)if d:body = d.group(1)return method, url, headers, bodylines = text.splitlines()first = lines[0].strip()m = re.match(r"([A-Z]+)\s+(https?://\S+)", first)if m:method = m.group(1).upper()url = m.group(2)headers: Dict[str, str] = {}body_lines: list[str] = []in_body = Falsefor line in lines[1:]:if not in_body and not line.strip():in_body = Truecontinueif not in_body:if ':' in line:k, v = line.split(':', 1)headers[k.strip()] = v.strip()else:body_lines.append(line)body = '\n'.join(body_lines) if body_lines else Nonereturn method, url, headers, bodym2 = re.match(r"([A-Z]+)\s+(/\S+)(?:\s+HTTP/\d\.\d)?", first)if m2:method = m2.group(1).upper()path_only = m2.group(2)headers: Dict[str, str] = {}body_lines: list[str] = []in_body = Falsefor line in lines[1:]:if not in_body and not line.strip():in_body = Truecontinueif not in_body:if ':' in line:k, v = line.split(':', 1)headers[k.strip()] = v.strip()else:body_lines.append(line)host = headers.get('Host') or headers.get('host')scheme = 'https'if not host:raise ValueError('Raw HTTP request missing Host header')url = f"{scheme}://{host}{path_only}"body = '\n'.join(body_lines) if body_lines else Nonereturn method, url, headers, bodyif first.startswith('http://') or first.startswith('https://'):return 'GET', first, {}, Noneraise ValueError(f"Unrecognized request file format: {filepath}")def parse_request_text(text: str) -> Tuple[str, str, Dict[str, str], Optional[str]]:tmp = "/tmp/_req.txt"with open(tmp, 'w', encoding='utf-8') as f:f.write(text)return parse_request_file(tmp)def load_request(path: str, fallback_text: str) -> Tuple[str, str, Dict[str, str], Optional[str]]:if path and os.path.exists(path):return parse_request_file(path)return parse_request_text(fallback_text)def merge_headers(base: Dict[str, str], auth_headers: Dict[str, str]) -> Dict[str, str]:merged = {k.strip(): v.strip() for k, v in base.items()}if auth_headers.get('Authorization'):merged['Authorization'] = auth_headers['Authorization']for hk in list(merged.keys()):lk = hk.lower()if lk in ('host', 'content-length'):merged.pop(hk, None)return mergeddef send_webhook(text: str, lark_webhook: Optional[str], feishu_webhook: Optional[str]) -> None:payload = {"msg_type": "text", "content": {"text": text[:19000]}}headers = {"Content-Type": "application/json"}for name, url in [("lark", lark_webhook), ("feishu", feishu_webhook)]:if not url:continuetry:resp = requests.post(url, headers=headers, data=json.dumps(payload, ensure_ascii=False, separators=(",", ":")), timeout=10)if resp.status_code >= 300:print(f"Webhook {name} failed: HTTP {resp.status_code} body={resp.text[:200]}")except Exception as e:print(f"Webhook {name} exception: {e}")def perform_request(method: str, url: str, headers: Dict[str, str], json_body: Optional[Dict[str, Any]] = None) -> requests.Response:method = method.upper()resp = requests.request(method, url, headers=headers, json=json_body, timeout=30)return respdef try_json(resp: requests.Response) -> Optional[Dict[str, Any]]:try:return resp.json()except Exception:return Nonedef sign_in(req_headers: Dict[str, str]) -> Tuple[bool, str, Optional[Dict[str, Any]]]:resp = perform_request('POST', CHECKIN_URL, req_headers, json_body={})j = try_json(resp)success = Falsemsg = f"Sign-in HTTP {resp.status_code}"if j:status = j.get('status')message = j.get('message') or ''data = j.get('data')msg = f"Sign-in status={status} message={message}"success = (status == 200) or ('成功' in str(message))return success, msg, jdef confirm_signed(req_headers: Dict[str, str]) -> Tuple[bool, str]:delays = [1.5, 3.0, 5.0]attempt = 0last_msg = ''while True:resp = perform_request('POST', CHECKIN_URL, req_headers, json_body={})j = try_json(resp)if j:status = j.get('status')message = j.get('message') or ''data = j.get('data')if status == 400 and ('已经签到' in str(data) or '已经签到' in message):txt = str(data or message)return True, f"已签到:{txt}"if status == 429:last_msg = f"确认响应 status=429 message={message or '请求过于频繁'}"else:return False, f"确认响应 status={status} message={message}"else:if resp.status_code == 429:last_msg = f"Confirm HTTP 429"else:return False, f"Confirm HTTP {resp.status_code}"if attempt >= len(delays):return False, last_msg or '确认失败'time.sleep(delays[attempt])attempt += 1def query_points(req_headers: Dict[str, str]) -> Tuple[Optional[int], Optional[int], str]:resp = perform_request('POST', POINTS_URL, req_headers, json_body={})j = try_json(resp)if j and j.get('status') == 200 and isinstance(j.get('data'), dict):accrued = j['data'].get('accrued')total = j['data'].get('total')return accrued, total, 'OK'return None, None, f"Query failed: HTTP {resp.status_code} body={resp.text[:200]}"def now_str(tz_name: str) -> str:tz = timezone(tz_name)return datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S %Z')def main() -> None:parser = argparse.ArgumentParser(description='Daily sign-in and points checker with Lark/Feishu notifications')parser.add_argument('--config-file', default=os.path.join(os.getcwd(), 'config.json'), help='Path to config.json (default: $PWD/config.json)')parser.add_argument('--lark', default='', help='Lark webhook token or full URL (overrides config)')parser.add_argument('--feishu', default='', help='Feishu webhook token or full URL (overrides config)')parser.add_argument('--tz', default=os.environ.get('TZ', 'Asia/Shanghai'), help='Timezone, default Asia/Shanghai')args, unknown = parser.parse_known_args()for u in unknown:if u.startswith('lark=') and not args.lark:args.lark = u.split('=', 1)[1]if u.startswith('feishu=') and not args.feishu:args.feishu = u.split('=', 1)[1]def normalize_webhook(value: str, base: str) -> str:if not value:return ''s = (value or '').strip().strip('"').strip("'")if s.startswith('http://') or s.startswith('https://'):u = urlparse(s)if u.scheme in ('http', 'https') and u.netloc and '.' not in u.netloc and (u.path == '' or u.path == '/'):token = u.netlocreturn base + tokenreturn stoken = s.lstrip('/').lstrip('\\')if re.fullmatch(r'[A-Za-z0-9\-]{8,}', token):return base + tokenif '/open-apis/bot/v2/hook/' in token:idx = token.rfind('/open-apis/bot/v2/hook/')maybe = token[idx + len('/open-apis/bot/v2/hook/') :]if maybe:return base + maybereturn base + tokenLARK_BASE = 'https://open.larksuite.com/open-apis/bot/v2/hook/'FEISHU_BASE = 'https://open.feishu.cn/open-apis/bot/v2/hook/'if os.path.exists(args.config_file):cfg = load_config(args.config_file)else:cfg = {'Authorization': os.environ.get('Authorization') or os.environ.get('AUTHORIZATION') or '','LARK_WEBHOOK': os.environ.get('LARK_WEBHOOK') or '','FEISHU_WEBHOOK': os.environ.get('FEISHU_WEBHOOK') or '',}authorization = cfg.get('Authorization', '')if not args.lark:args.lark = cfg.get('LARK_WEBHOOK', '')if not args.feishu:args.feishu = cfg.get('FEISHU_WEBHOOK', '')if not args.lark:args.lark = os.environ.get('LARK_WEBHOOK', '')if not args.feishu:args.feishu = os.environ.get('FEISHU_WEBHOOK', '')args.lark = normalize_webhook(args.lark, LARK_BASE)args.feishu = normalize_webhook(args.feishu, FEISHU_BASE)if not authorization:msg = '未找到有效的 Authorization(请提供 config.json 或设置环境变量)'print(msg)send_webhook(f"[签到服务] 启动失败:{msg}", args.lark, args.feishu)sys.exit(1)req_headers = build_headers(authorization)def do_sign_in_flow(trigger: str) -> None:nonlocal req_headerstry:ok, first_msg, first_json = sign_in(req_headers)time.sleep(1.2)confirmed, confirm_msg = confirm_signed(req_headers)accrued, total, qmsg = query_points(req_headers)final_ok = bool(confirmed or ok)status_emoji = '✅' if final_ok else '❌'timestamp = now_str(args.tz)result_text = '签到成功' if final_ok else '签到失败'if accrued is not None:points_text = f"积分情况:当前积分{accrued}, 累计积分{total}"else:points_text = "积分情况:查询失败"note = "\n".join([f"状态:{status_emoji} {result_text}",points_text,f"签到时间:{timestamp}",])print(note)send_webhook(note, args.lark, args.feishu)except requests.RequestException as e:err = f"[{trigger}] 网络请求错误: {e} @ {now_str(args.tz)}"print(err)send_webhook(err, args.lark, args.feishu)except Exception as e:err = f"[{trigger}] 程序错误: {e} @ {now_str(args.tz)}"print(err)send_webhook(err, args.lark, args.feishu)def do_points_check() -> None:nonlocal req_headerstry:accrued, total, qmsg = query_points(req_headers)if accrued is None:warn = f"[积分检查] 失败,可能 Authorization 失效:{qmsg} @ {now_str(args.tz)}"print(warn)send_webhook(warn, args.lark, args.feishu)else:print(f"[积分检查] 当前 {accrued}, 累计 {total} @ {now_str(args.tz)}")except Exception as e:warn = f"[积分检查] 异常:{e} @ {now_str(args.tz)}"print(warn)send_webhook(warn, args.lark, args.feishu)do_sign_in_flow('启动签到')sched = BackgroundScheduler(timezone=timezone(args.tz))sched.add_job(lambda: do_sign_in_flow('定时签到'), CronTrigger(hour=8, minute=0))sched.add_job(do_points_check, IntervalTrigger(minutes=10))sched.start()print(f"Scheduler started. TZ={args.tz}")try:while True:time.sleep(60)except KeyboardInterrupt:passif __name__ == '__main__':main()
这是一个绿盟科技 MSEC每日签到与积分查询的自动化脚本,特点是:
读取配置中的 Authorization(登录鉴权)后,立刻尝试签到一次。
之后用 APScheduler 常驻运行:
每天 08:00 定时再签到一次;
每 10 分钟查询一次积分,若失败会用 webhook 提醒你可能失效。
支持把结果通过 飞书/企业版 Lark 机器人 webhook 推送通知(成功/失败、积分数、时间等)。
时区可配(默认
Asia/Shanghai)。
主要方法
配置与请求头
load_config(filepath)
从config.json或类.env文本读取配置,主要拿到:Authorization(必须)FEISHU_WEBHOOK/LARK_WEBHOOK(可选)build_headers(authorization)
生成请求头(含Authorization)用于后续 API 调用。parse_request_file()/parse_request_text()/load_request()
兼容多种格式(JSON、curl、原始HTTP报文、URL)解析出method、url、headers、body。merge_headers(base, auth_headers)
合并并清理头字段(移除 Host/Content-Length 等)。
注:这些解析工具和
DEFAULT_QD/DEFAULT_CX是通用组件,本脚本实际并未调用,可看作保留能力。
Webhook 通知
send_webhook(text, lark_webhook, feishu_webhook)
以统一文本格式向 Lark/飞书机器人发送消息;异常会在控制台提示。
HTTP 与 JSON 辅助
perform_request(method, url, headers, json_body)
用requests.request()发送 HTTP 请求(30s 超时)。try_json(resp)
安全解析响应 JSON,失败返回None。
业务核心:签到与积分
sign_in(req_headers)
调用签到接口POST /backend_api/checkin/checkin,返回是否成功与提示信息。confirm_signed(req_headers)
二次确认是否"已签到":
循环重试最多 3 次(1.5s/3s/5s 退避):如果 API 返回 400 且信息含"已经签到",视为确认成功;
429(限流)会继续退避重试;
其他情况按失败返回。
query_points(req_headers)
调POST /backend_api/point/common/get查询 当前积分 accrued 与 累计积分 total。
其他
now_str(tz_name)
以指定时区输出当前时间字符串。main()(程序入口)do_sign_in_flow(trigger):执行一次完整签到流程(签到→确认→查积分→推送)。do_points_check():只查积分并按结果提醒。内部闭包:
CronTrigger(hour=8, minute=0):每日 08:00 自动签到;IntervalTrigger(minutes=10):每 10 分钟查积分;解析命令行与
config.json/ 环境变量,规范化 webhook 地址;构建请求头;
首次立即签到并发通知;
启动 APScheduler:
常驻循环;
Ctrl+C可退出。
注意:
本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。技术层面需要提供帮助,可以通过打赏的方式进行探讨。
没有评论:
发表评论