2025年9月25日星期四

MSEC任务脚本

1.购买服务器阿里云:服务器购买地址https://t.aliyun.com/U/DT4XYh若失效,可用地址

1.购买服务器

阿里云:

服务器购买地址

https://t.aliyun.com/U/DT4XYh

若失效,可用地址

https://www.aliyun.com/activity/wuying/dj?source=5176.29345612&userCode=49hts92d

腾讯云:

https://curl.qcloud.com/wJpWmSfU

若失效,可用地址

https://cloud.tencent.com/act/cps/redirect?redirect=2446&cps_key=ad201ee2ef3b771157f72ee5464b1fea&from=console

华为云

https://activity.huaweicloud.com/cps.html?fromacct=64b5cf7cc11b4840bb4ed2ea0b2f4468&utm_source=V1g3MDY4NTY=&utm_medium=cps&utm_campaign=201905

2.部署教程

2024年最新青龙面板跑脚本教程(一)持续更新中

3.代码如下

import argparseimport jsonimport osimport reimport sysimport timefrom datetime import datetimefrom typing import AnyDictOptionalTuple
import requestsfrom apscheduler.schedulers.background import BackgroundSchedulerfrom apscheduler.triggers.cron import CronTriggerfrom apscheduler.triggers.interval import IntervalTriggerfrom pytz import timezonefrom urllib.parse import urlparse
CHECKIN_URL = 'https://msec.nsfocus.com/backend_api/checkin/checkin'POINTS_URL = 'https://msec.nsfocus.com/backend_api/point/common/get'

def load_config(filepath: str) -> Dict[strstr]:    with open(filepath, 'r', encoding='utf-8'as f:        content = f.read().strip()    try:        obj = json.loads(content)        return {            'Authorization': obj.get('Authorization'or obj.get('authorization'or '',            'FEISHU_WEBHOOK': obj.get('FEISHU_WEBHOOK'or obj.get('feishu'or obj.get('feishu_token'or '',            'LARK_WEBHOOK': obj.get('LARK_WEBHOOK'or obj.get('lark'or obj.get('lark_token'or '',        }    except json.JSONDecodeError:        cfg: Dict[strstr] = {'Authorization''''FEISHU_WEBHOOK''''LARK_WEBHOOK'''}        for line in content.splitlines():            line = line.strip()            if not line or line.startswith('#'):                continue            if '=' in line:                k, v = line.split('='1)                key = k.strip()                val = v.strip()                if key in cfg:                    cfg[key] = val        return cfg

def build_headers(authorization: str) -> Dict[strstr]:    headers: Dict[strstr] = {        'Accept''*/*',        'Content-Type''application/json',        'Origin''https://msec.nsfocus.com',        'Referer''https://msec.nsfocus.com/',        'User-Agent''Mozilla/5.0',    }    if authorization:        headers['Authorization'] = authorization    return headers
DEFAULT_QD = """POST /backend_api/checkin/checkin HTTP/1.1Host: msec.nsfocus.comContent-Type: application/json
{}"""
DEFAULT_CX = """POST /backend_api/point/common/get HTTP/1.1Host: msec.nsfocus.comContent-Type: application/json
{}"""
def parse_request_file(filepath: str) -> Tuple[strstrDict[strstr], Optional[str]]:    with open(filepath, 'r', encoding='utf-8'as f:        text = f.read().strip()    if not text:        raise ValueError(f"Empty request file: {filepath}")
    try:        obj = json.loads(text)        method = (obj.get('method'or 'GET').upper()        url = obj['url']        headers = obj.get('headers'or {}        body = obj.get('body')        if isinstance(body, (dictlist)):            body = json.dumps(body, ensure_ascii=False)        return method, url, headers, body    except Exception:        pass
    if text.lower().startswith('curl '):        method = 'GET'        headers: Dict[strstr] = {}        body: Optional[str] = None        url_match = re.search(r"(https?://[^\s'\"]+)", text)        if url_match:            url = url_match.group(1)        else:            raise ValueError('curl missing URL')        m = re.search(r"-X\s+([A-Z]+)", text)        if m:            method = m.group(1).upper()        for hk, hv in re.findall(r"-H\s+'([^:]+):\s*([^']*)'", text):            headers[hk.strip()] = hv.strip()        for hk, hv in re.findall(r' -H\s+"([^:]+):\s*([^"]*)"', text):            headers[hk.strip()] = hv.strip()        d = re.search(r"-d\s+'([^']*)'", text, re.S)        if not d:            d = re.search(r'-d\s+"([^"]*)"', text, re.S)        if d:            body = d.group(1)        return method, url, headers, body
    lines = text.splitlines()    first = lines[0].strip()    m = re.match(r"([A-Z]+)\s+(https?://\S+)", first)    if m:        method = m.group(1).upper()        url = m.group(2)        headers: Dict[strstr] = {}        body_lines: list[str] = []        in_body = False        for line in lines[1:]:            if not in_body and not line.strip():                in_body = True                continue            if not in_body:                if ':' in line:                    k, v = line.split(':'1)                    headers[k.strip()] = v.strip()            else:                body_lines.append(line)        body = '\n'.join(body_lines) if body_lines else None        return method, url, headers, body
    m2 = re.match(r"([A-Z]+)\s+(/\S+)(?:\s+HTTP/\d\.\d)?", first)    if m2:        method = m2.group(1).upper()        path_only = m2.group(2)        headers: Dict[strstr] = {}        body_lines: list[str] = []        in_body = False        for line in lines[1:]:            if not in_body and not line.strip():                in_body = True                continue            if not in_body:                if ':' in line:                    k, v = line.split(':'1)                    headers[k.strip()] = v.strip()            else:                body_lines.append(line)        host = headers.get('Host'or headers.get('host')        scheme = 'https'        if not host:            raise ValueError('Raw HTTP request missing Host header')        url = f"{scheme}://{host}{path_only}"        body = '\n'.join(body_lines) if body_lines else None        return method, url, headers, body
    if first.startswith('http://'or first.startswith('https://'):        return 'GET', first, {}, None
    raise ValueError(f"Unrecognized request file format: {filepath}")

def parse_request_text(text: str) -> Tuple[strstrDict[strstr], Optional[str]]:    tmp = "/tmp/_req.txt"    with open(tmp, 'w', encoding='utf-8'as f:        f.write(text)    return parse_request_file(tmp)

def load_request(path: str, fallback_text: str) -> Tuple[strstrDict[strstr], Optional[str]]:    if path and os.path.exists(path):        return parse_request_file(path)    return parse_request_text(fallback_text)

def merge_headers(base: Dict[strstr], auth_headers: Dict[strstr]) -> Dict[strstr]:    merged = {k.strip(): v.strip() for k, v in base.items()}    if auth_headers.get('Authorization'):        merged['Authorization'] = auth_headers['Authorization']    for hk in list(merged.keys()):        lk = hk.lower()        if lk in ('host''content-length'):            merged.pop(hk, None)    return merged

def send_webhook(text: str, lark_webhook: Optional[str], feishu_webhook: Optional[str]) -> None:    payload = {"msg_type""text""content": {"text": text[:19000]}}    headers = {"Content-Type""application/json"}    for name, url in [("lark", lark_webhook), ("feishu", feishu_webhook)]:        if not url:            continue        try:            resp = requests.post(url, headers=headers, data=json.dumps(payload, ensure_ascii=False, separators=(","":")), timeout=10)            if resp.status_code >= 300:                print(f"Webhook {name} failed: HTTP {resp.status_code} body={resp.text[:200]}")        except Exception as e:            print(f"Webhook {name} exception: {e}")

def perform_request(method: str, url: str, headers: Dict[strstr], json_body: Optional[Dict[strAny]] = None) -> requests.Response:    method = method.upper()    resp = requests.request(method, url, headers=headers, json=json_body, timeout=30)    return resp

def try_json(resp: requests.Response) -> Optional[Dict[strAny]]:    try:        return resp.json()    except Exception:        return None

def sign_in(req_headers: Dict[strstr]) -> Tuple[boolstrOptional[Dict[strAny]]]:    resp = perform_request('POST', CHECKIN_URL, req_headers, json_body={})    j = try_json(resp)    success = False    msg = f"Sign-in HTTP {resp.status_code}"    if j:        status = j.get('status')        message = j.get('message'or ''        data = j.get('data')        msg = f"Sign-in status={status} message={message}"        success = (status == 200or ('成功' in str(message))    return success, msg, j

def confirm_signed(req_headers: Dict[strstr]) -> Tuple[boolstr]:    delays = [1.53.05.0]    attempt = 0    last_msg = ''    while True:        resp = perform_request('POST', CHECKIN_URL, req_headers, json_body={})        j = try_json(resp)        if j:            status = j.get('status')            message = j.get('message'or ''            data = j.get('data')            if status == 400 and ('已经签到' in str(data) or '已经签到' in message):                txt = str(data or message)                return Truef"已签到:{txt}"            if status == 429:                last_msg = f"确认响应 status=429 message={message or '请求过于频繁'}"            else:                return Falsef"确认响应 status={status} message={message}"        else:            if resp.status_code == 429:                last_msg = f"Confirm HTTP 429"            else:                return Falsef"Confirm HTTP {resp.status_code}"        if attempt >= len(delays):            return False, last_msg or '确认失败'        time.sleep(delays[attempt])        attempt += 1

def query_points(req_headers: Dict[strstr]) -> Tuple[Optional[int], Optional[int], str]:    resp = perform_request('POST', POINTS_URL, req_headers, json_body={})    j = try_json(resp)    if j and j.get('status') == 200 and isinstance(j.get('data'), dict):        accrued = j['data'].get('accrued')        total = j['data'].get('total')        return accrued, total, 'OK'    return NoneNonef"Query failed: HTTP {resp.status_code} body={resp.text[:200]}"

def now_str(tz_name: str) -> str:    tz = timezone(tz_name)    return datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S %Z')

def main() -> None:    parser = argparse.ArgumentParser(description='Daily sign-in and points checker with Lark/Feishu notifications')    parser.add_argument('--config-file', default=os.path.join(os.getcwd(), 'config.json'), help='Path to config.json (default: $PWD/config.json)')    parser.add_argument('--lark', default=''help='Lark webhook token or full URL (overrides config)')    parser.add_argument('--feishu', default=''help='Feishu webhook token or full URL (overrides config)')    parser.add_argument('--tz', default=os.environ.get('TZ''Asia/Shanghai'), help='Timezone, default Asia/Shanghai')    args, unknown = parser.parse_known_args()
    for u in unknown:        if u.startswith('lark='and not args.lark:            args.lark = u.split('='1)[1]        if u.startswith('feishu='and not args.feishu:            args.feishu = u.split('='1)[1]
    def normalize_webhook(value: str, base: str) -> str:        if not value:            return ''        s = (value or '').strip().strip('"').strip("'")        if s.startswith('http://'or s.startswith('https://'):            u = urlparse(s)            if u.scheme in ('http''https'and u.netloc and '.' not in u.netloc and (u.path == '' or u.path == '/'):                token = u.netloc                return base + token            return s        token = s.lstrip('/').lstrip('\\')        if re.fullmatch(r'[A-Za-z0-9\-]{8,}', token):            return base + token        if '/open-apis/bot/v2/hook/' in token:            idx = token.rfind('/open-apis/bot/v2/hook/')            maybe = token[idx + len('/open-apis/bot/v2/hook/') :]            if maybe:                return base + maybe        return base + token
    LARK_BASE = 'https://open.larksuite.com/open-apis/bot/v2/hook/'    FEISHU_BASE = 'https://open.feishu.cn/open-apis/bot/v2/hook/'
    if os.path.exists(args.config_file):        cfg = load_config(args.config_file)    else:        cfg = {            'Authorization': os.environ.get('Authorization'or os.environ.get('AUTHORIZATION'or '',            'LARK_WEBHOOK': os.environ.get('LARK_WEBHOOK'or '',            'FEISHU_WEBHOOK': os.environ.get('FEISHU_WEBHOOK'or '',        }    authorization = cfg.get('Authorization''')    if not args.lark:        args.lark = cfg.get('LARK_WEBHOOK''')    if not args.feishu:        args.feishu = cfg.get('FEISHU_WEBHOOK''')    if not args.lark:        args.lark = os.environ.get('LARK_WEBHOOK''')    if not args.feishu:        args.feishu = os.environ.get('FEISHU_WEBHOOK''')    args.lark = normalize_webhook(args.lark, LARK_BASE)    args.feishu = normalize_webhook(args.feishu, FEISHU_BASE)    if not authorization:        msg = '未找到有效的 Authorization(请提供 config.json 或设置环境变量)'        print(msg)        send_webhook(f"[签到服务] 启动失败:{msg}", args.lark, args.feishu)        sys.exit(1)
    req_headers = build_headers(authorization)
    def do_sign_in_flow(trigger: str) -> None:        nonlocal req_headers        try:            ok, first_msg, first_json = sign_in(req_headers)            time.sleep(1.2)            confirmed, confirm_msg = confirm_signed(req_headers)            accrued, total, qmsg = query_points(req_headers)
            final_ok = bool(confirmed or ok)            status_emoji = '✅' if final_ok else '❌'            timestamp = now_str(args.tz)
            result_text = '签到成功' if final_ok else '签到失败'            if accrued is not None:                points_text = f"积分情况:当前积分{accrued}, 累计积分{total}"            else:                points_text = "积分情况:查询失败"            note = "\n".join([                f"状态:{status_emoji} {result_text}",                points_text,                f"签到时间:{timestamp}",            ])
            print(note)            send_webhook(note, args.lark, args.feishu)        except requests.RequestException as e:            err = f"[{trigger}] 网络请求错误: {e} @ {now_str(args.tz)}"            print(err)            send_webhook(err, args.lark, args.feishu)        except Exception as e:            err = f"[{trigger}] 程序错误: {e} @ {now_str(args.tz)}"            print(err)            send_webhook(err, args.lark, args.feishu)
    def do_points_check() -> None:        nonlocal req_headers        try:            accrued, total, qmsg = query_points(req_headers)            if accrued is None:                warn = f"[积分检查] 失败,可能 Authorization 失效:{qmsg} @ {now_str(args.tz)}"                print(warn)                send_webhook(warn, args.lark, args.feishu)            else:                print(f"[积分检查] 当前 {accrued}, 累计 {total} @ {now_str(args.tz)}")        except Exception as e:            warn = f"[积分检查] 异常:{e} @ {now_str(args.tz)}"            print(warn)            send_webhook(warn, args.lark, args.feishu)
    do_sign_in_flow('启动签到')
    sched = BackgroundScheduler(timezone=timezone(args.tz))    sched.add_job(lambda: do_sign_in_flow('定时签到'), CronTrigger(hour=8, minute=0))    sched.add_job(do_points_check, IntervalTrigger(minutes=10))    sched.start()
    print(f"Scheduler started. TZ={args.tz}")    try:        while True:            time.sleep(60)    except KeyboardInterrupt:        pass

if __name__ == '__main__':    main()

解析

这是一个绿盟科技 MSEC每日签到与积分查询的自动化脚本,特点是:

  • 读取配置中的 Authorization(登录鉴权)后,立刻尝试签到一次。

  • 之后用 APScheduler 常驻运行:

    • 每天 08:00 定时再签到一次;

    • 每 10 分钟查询一次积分,若失败会用 webhook 提醒你可能失效。

  • 支持把结果通过 飞书/企业版 Lark 机器人 webhook 推送通知(成功/失败、积分数、时间等)。

  • 时区可配(默认 Asia/Shanghai)。

主要方法

配置与请求头

  • load_config(filepath)
    从 config.json 或类 .env 文本读取配置,主要拿到:

    • Authorization(必须)

    • FEISHU_WEBHOOK / LARK_WEBHOOK(可选)

  • build_headers(authorization)
    生成请求头(含 Authorization)用于后续 API 调用。

  • parse_request_file() / parse_request_text() / load_request()
    兼容多种格式(JSON、curl、原始HTTP报文、URL)解析出 method、url、headers、body

  • merge_headers(base, auth_headers)
    合并并清理头字段(移除 Host/Content-Length 等)。

注:这些解析工具和 DEFAULT_QD/DEFAULT_CX 是通用组件,本脚本实际并未调用,可看作保留能力。

Webhook 通知

  • send_webhook(text, lark_webhook, feishu_webhook)
    以统一文本格式向 Lark/飞书机器人发送消息;异常会在控制台提示。

HTTP 与 JSON 辅助

  • perform_request(method, url, headers, json_body)
    用 requests.request() 发送 HTTP 请求(30s 超时)。

  • try_json(resp)
    安全解析响应 JSON,失败返回 None

业务核心:签到与积分

  • sign_in(req_headers)
    调用签到接口 POST /backend_api/checkin/checkin,返回是否成功与提示信息。

  • confirm_signed(req_headers)
    二次确认是否"已签到":
    循环重试最多 3 次(1.5s/3s/5s 退避):

    • 如果 API 返回 400 且信息含"已经签到",视为确认成功;

    • 429(限流)会继续退避重试;

    • 其他情况按失败返回。

  • query_points(req_headers)
    调 POST /backend_api/point/common/get 查询 当前积分 accrued 与 累计积分 total

其他

  • now_str(tz_name)
    以指定时区输出当前时间字符串。

  • main()(程序入口)

    • do_sign_in_flow(trigger):执行一次完整签到流程(签到→确认→查积分→推送)。

    • do_points_check():只查积分并按结果提醒。

    • 内部闭包:

    • CronTrigger(hour=8, minute=0):每日 08:00 自动签到;

    • IntervalTrigger(minutes=10):每 10 分钟查积分;

    1. 解析命令行与 config.json / 环境变量,规范化 webhook 地址;

    2. 构建请求头;

    3. 首次立即签到并发通知;

    4. 启动 APScheduler

    5. 常驻循环;Ctrl+C 可退出


注意

本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。技术层面需要提供帮助,可以通过打赏的方式进行探讨。


历史脚本txt文件获取>>
服务器搭建,人工服务咨询>>

没有评论:

发表评论

2025年Shopify独立站建站保姆级教程

Shopify是一个SaaS平台,即独立站。 Shopify是一个SaaS平台,即独立站。 一、注册Shopify账户(免费试用3天,前 3 个月 每月仅需1 $) 打开Shopify中文官网,点击主页右上角"开始免费试用",进入注册页面,可使用电子邮件、...