1.购买服务器
阿里云:
服务器购买地址
https://t.aliyun.com/U/nxyLKd若失效,可用地址
https://www.aliyun.com/activity/wuying/dj?source=5176.29345612&userCode=49hts92d腾讯云:
https://curl.qcloud.com/wJpWmSfU若失效,可用地址
https://cloud.tencent.com/act/cps/redirect?redirect=2446&cps_key=ad201ee2ef3b771157f72ee5464b1fea&from=console2.部署教程
3.代码如下
# !/usr/bin/python3# -- coding: utf-8 --# cron "20 8 * * *" script-path=xxx.py,tag=匹配cron用# const $ = new Env('春茧未来荟小程序')import hashlibimport jsonimport osimport randomimport timefrom datetime import datetime, time as timesimport requestsfrom requests.packages.urllib3.exceptions import InsecureRequestWarning# 禁用安全请求警告requests.packages.urllib3.disable_warnings(InsecureRequestWarning)IS_DEV = os.path.isfile('DEV_ENV.py')if IS_DEV:import DEV_ENVif os.path.isfile('notify.py'):from notify import sendprint("加载通知服务成功!")else:print("加载通知服务失败!")send_msg = ''one_msg = ''def Log(cont=''):global send_msg, one_msgprint(cont)if cont:one_msg += f'{cont}\n'send_msg += f'{cont}\n'class RUN:def __init__(self, info, index):global one_msgone_msg = ''split_info = info.split('@')self.X_XSRF_TOKEN = split_info[0]self.cookies = split_info[1]len_split_info = len(split_info)last_info = split_info[len_split_info - 1]self.send_UID = Noneif len_split_info > 0 and "UID_" in last_info:print('检测到设置了UID')print(last_info)self.send_UID = last_infoself.index = index + 1# print(self.access_token)self.UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a1b) XWEB/9129"PROXY_URL = os.environ.get(f'{ENV_NAME}_PROXY_URL', False)if PROXY_URL:print(f'【已设置反代】:{PROXY_URL}✅')self.baseUrl = f'{PROXY_URL}szbay/api/services/app/'else:print(f'【未设置反代,使用官方域名】')self.baseUrl = 'https://program.springcocoon.com/szbay/api/services/app/'self.headers ={'Host': self.baseUrl.split('/')[2],'Accept':'application/json, text/javascript, */*; q=0.01','X-Requested-With':'XMLHttpRequest','Accept-Language':'zh-CN,zh-Hans;q=0.9','Accept-Encoding':'gzip, deflate, br','X-XSRF-TOKEN':self.X_XSRF_TOKEN,'Content-Type':'application/x-www-form-urlencoded','Origin':'https://program.springcocoon.com','site':'program.springcocoon.com','User-Agent':self.UA,'Referer':'https://program.springcocoon.com/szbay/AppInteract/SignIn/Index?isWeixinRegister=true','Connection':'keep-alive','Cookie':self.cookies}self.s = requests.session()self.s.verify = False# self.baseUrl = 'https://proxy.cherwin.workers.dev/szbay/api/services/app/'def make_request(self, url, method='post', headers={}, json_data={}, params=None, data=None):if headers == {}:headers = self.headerstry:if method.lower() == 'get':response = self.s.get(url, headers=headers, verify=False, params=params)elif method.lower() == 'post':response = self.s.post(url, headers=headers, json=json_data, data=data, params=params, verify=False)else:raise ValueError("不支持的请求方法❌: " + method)return response.json()except requests.exceptions.RequestException as e:print("请求异常❌:", e)except ValueError as e:print("值错误或不支持的请求方法❌:", e)except Exception as e:print("发生了未知错误❌:", e)def get_user_info(self, END=False):act_name = '获取用户信息'if END:Log(f'\n====== {act_name} ======')else:print(f'\n====== {act_name} ======')url = f"{self.baseUrl}CRCWeixinEmpMerge/QueryMergeDataAsync"data = {'isGetEmpMoney': 'true','isGetEmpPoint': 'true','isGetEmpGrowth': 'true','isGetCouponNum': 'false','isShowPerfectEmpInfo': 'false','isShowSignIn': 'false','isShowWeixinEmpSubscribe': 'false',}response = self.make_request(url, data=data)if response.get('result', False) != False:print(f'{act_name}成功!✅')result = response.get('result', {})empID = result.get('empID', '')empDisplayName = result.get('empDisplayName', '')phone = result.get('phone', '')empPoint = result.get('empPoint', '')mobile = phone[:3] + "*" * 4 + phone[7:]if END:Log(f"> 执行后积分:{empPoint}")else:Log(f"> 用户名:{empDisplayName}\n> 手机号:{mobile}\n> 当前积分:{empPoint}")return Trueelif not response:print(f"> 账号 {self.index}: ck过期 请重新抓取❌")return Falseelse:print(response)return Falsedef GetSignInRecordAsync(self):act_name = '获取签到状态'Log(f'\n====== {act_name} ======')url = f"{self.baseUrl}SignInRecord/GetSignInRecordAsync"data = {'id': '6c3a00f6-b9f0-44a3-b8a0-d5d709de627d','webApiUniqueID': '54f08e2f-c832-8b82-8077-4473aea84800'}response = self.make_request(url, data=data)if response.get('result', False) != False:print(f'{act_name}成功!✅')result = response.get('result', {})isSignInToday = result.get('isSignInToday', '')if isSignInToday:Log(f"> 今日已签到!✅")else:Log(f"> 今日未签到!❌")self.SignInAsync()return Trueelif not response:print(f"> 账号 {self.index}: ck过期 请重新抓取❌")return Falseelse:print(response)return Falsedef SignInAsync(self):act_name = '签到'Log(f'====== {act_name} ======')url = f"{self.baseUrl}SignInRecord/SignInAsync"data = {'id': '6c3a00f6-b9f0-44a3-b8a0-d5d709de627d','webApiUniqueID': 'f2cca2a7-c327-1d76-d375-ec92cdd296cd'}response = self.make_request(url, data=data)success = response.get('success', False)if success != False:result = response.get('result', {})point = result.get('listSignInRuleData', [])[0]['point']print(f'{act_name}成功!✅')Log(f'> 获得【{point}】万象星')return Trueelif success == False:error = response.get('error', {})msg = error.get('message', '')print(f'> {act_name}失败!❌')print(f'> 【{msg}】')return Falseelse:print(response)return Falsedef main(self):Log(f"\n开始执行第{self.index}个账号--------------->>>>>")if self.get_user_info():self.GetSignInRecordAsync()# self.SignInAsync()self.sendMsg()return Trueelse:self.sendMsg()return Falsedef sendMsg(self):if self.send_UID:push_res = CHERWIN_TOOLS.wxpusher(self.send_UID, one_msg, APP_NAME)print(push_res)def random_delay(min_delay=1, max_delay=5):"""在min_delay和max_delay之间产生一个随机的延时时间,然后暂停执行。参数:min_delay (int/float): 最小延时时间(秒)max_delay (int/float): 最大延时时间(秒)"""delay = random.uniform(min_delay, max_delay)print(f">本次随机延迟:【{delay:.2f}】 秒.....")time.sleep(delay)def down_file(filename, file_url):print(f'开始下载:{filename},下载地址:{file_url}')try:response = requests.get(file_url, verify=False, timeout=10)response.raise_for_status()with open(filename + '.tmp', 'wb') as f:f.write(response.content)print(f'【{filename}】下载完成!')# 检查临时文件是否存在temp_filename = filename + '.tmp'if os.path.exists(temp_filename):# 删除原有文件if os.path.exists(filename):os.remove(filename)# 重命名临时文件os.rename(temp_filename, filename)print(f'【{filename}】重命名成功!')return Trueelse:print(f'【{filename}】临时文件不存在!')return Falseexcept Exception as e:print(f'【{filename}】下载失败:{str(e)}')return Falsedef import_Tools():global CHERWIN_TOOLS, ENV, APP_INFO, TIPS, TIPS_HTML, cjwlhCodeimport CHERWIN_TOOLSENV, APP_INFO, TIPS, TIPS_HTML, cjwlhCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,local_version)if __name__ == '__main__':APP_NAME = '春茧未来荟小程序'ENV_NAME = 'CJWLH'CK_URL = 'program.springcocoon.com请求头'CK_NAME = 'X-XSRF-TOKEN@Cookie'CK_EX = '[email protected]_SessionId=l2cxxxxxxxxxxx'print(f'''✨✨✨ {APP_NAME}脚本✨✨✨✨ 功能:积分签到✨ 抓包步骤:打开{APP_NAME}授权登陆打开抓包工具找{CK_URL}{CK_NAME}参数示例:{CK_EX}✨ ✨✨wxpusher一对一推送功能,✨需要定义变量export WXPUSHER=wxpusher的app_token,不设置则不启用wxpusher一对一推送✨需要在{ENV_NAME}变量最后添加@wxpusher的UID✨ 设置青龙变量:export {ENV_NAME}='{CK_NAME}参数值'多账号#或&分割export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启✨ ✨ 注意:抓完CK没事儿别打开小程序,重新打开小程序请重新抓包✨ 推荐cron:20 8 * * *''')local_script_name = os.path.basename(__file__)local_version = '1.0.1'if IS_DEV:import_Tools()else:if os.path.isfile('CHERWIN_TOOLS.py'):import_Tools()else:if down_file('cjwlh.py', 'cjwlh.py'):print('脚本检测正常')import_Tools()else:print('脚本检测失败')exit()print(TIPS)token = ''token = ENV if ENV else tokenif not token:print(f"未填写{ENV_NAME}变量\n青龙可在环境变量设置 {ENV_NAME} 或者在本脚本文件上方将{CK_NAME}填入token =''")exit()tokens = CHERWIN_TOOLS.ENV_SPLIT(token)# print(tokens)if len(tokens) > 0:print(f"\n>>>>>>>>>>共获取到{len(tokens)}个账号<<<<<<<<<<")access_token = []for index, infos in enumerate(tokens):run_result = RUN(infos, index).main()if not run_result: continueif send: send(f'{APP_NAME}挂机通知', send_msg + TIPS_HTML)
解析
该脚本的主要作用是 自动化签到,通过模拟HTTP请求,定时为用户执行 春茧未来荟小程序中的签到任务,并根据签到情况获取积分奖励。脚本支持多账号管理,并且可以通过微信推送通知用户签到的状态。
关键功能
自动签到:检查是否已签到,如果未签到,执行签到操作并获取积分奖励。
支持多账号:通过环境变量传入多个账号的
cookie和X-XSRF-TOKEN,自动为多个账号执行签到任务。推送通知:支持通过
wxpusher向指定用户发送通知,告知签到状态及获得的积分。随机延迟:在请求间添加随机延迟,模拟人工操作,减少被检测为机器人操作的风险。
日志记录:记录签到过程中的各项信息,便于调试和用户查看。
脚本的运行流程:
从环境变量中读取配置,获取多个账号的信息。
对每个账号,执行以下操作:
获取用户信息,包括积分、手机号等。
检查当天是否已签到,若未签到则执行签到。
在每次签到后,将签到结果(包括获得的积分)通过
wxpusher推送通知给用户。支持定时执行,例如通过
cron定时任务每天自动执行。
主要方法
get_user_info():获取用户的基本信息,包括当前积分、手机号码等。GetSignInRecordAsync():获取并检查用户当天的签到状态。SignInAsync():执行签到操作并记录签到结果。sendMsg():通过wxpusher推送签到结果通知用户。random_delay():在请求之间添加随机延迟,模拟人工操作,避免频繁请求带来的封禁风险。
适用场景:
自动签到:适用于需要定期签到的小程序,通过脚本实现自动化签到,避免人工操作。
多账号支持:适合用户有多个账号时批量处理多个账号的签到任务。
实时通知:通过微信推送通知用户签到结果,方便及时了解签到状态。
注意:
本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。技术层面需要提供帮助,可以通过打赏的方式进行探讨。
没有评论:
发表评论