1.购买服务器阿里云:服务器购买地址https://t.aliyun.com/U/Bg6shY若失效,可用地址
阿里云:
服务器购买地址
https://t.aliyun.com/U/Bg6shY若失效,可用地址
https://www.aliyun.com/daily-act/ecs/activity_selection?source=5176.29345612&userCode=49hts92d腾讯云:
https://curl.qcloud.com/wJpWmSfU若失效,可用地址
https://cloud.tencent.com/act/cps/redirect?redirect=2446&cps_key=ad201ee2ef3b771157f72ee5464b1fea&from=console华为云
https://activity.huaweicloud.com/cps.html?fromacct=64b5cf7cc11b4840bb4ed2ea0b2f4468&utm_source=V1g3MDY4NTY=&utm_medium=cps&utm_campaign=2019052.部署教程
3.代码如下
import jsonimport osimport sysfrom datetime import datetimefrom pathlib import Pathfrom api import HuaRunTongAPIcurrent_dir = Path(__file__).parentproject_root = current_dir.parent.parent.parentsys.path.insert(0, str(project_root))from notification import send_notification, NotificationSounddef load_config():"""加载统一配置文件"""current_dir = os.path.dirname(os.path.abspath(__file__))config_path = os.path.join(current_dir, '..', '..', '..', 'config', 'token.json')with open(config_path, 'r', encoding='utf-8') as f:return json.load(f)def process_account(account_config):"""处理单个账号的签到"""account_name = account_config.get('account_name', '未知账号')# 初始化结果result_info = {'account_name': account_name,'success': False,'error': None,'message': None,'response': None}print("=" * 50)print(f"账号: {account_name}")print("=" * 50)# 初始化APIapi = HuaRunTongAPI(token=account_config.get("token"),answer_result=account_config.get("answerResult", 1),channel_id=account_config.get("channelId", "APP"),merchant_code=account_config.get("merchantCode", "1641000001532"),store_code=account_config.get("storeCode", "qiandaosonjifen"),sys_id=account_config.get("sysId", "T0000001"),transaction_uuid=account_config.get("transactionUuid", ""),invite_code=account_config.get("inviteCode", ""),user_agent=account_config.get("user_agent"))# 发送请求print("\n发送签到请求...")result = api.sign_in()# 解析结果if result.get('success'):result_info['success'] = Trueresult_info['message'] = result.get('message', '签到成功')result_info['response'] = resultprint(" 签到成功")else:result_info['error'] = result.get('error', '签到失败')result_info['response'] = resultprint(f" 签到失败: {result_info['error']}")print("响应:")print(json.dumps(result, indent=2, ensure_ascii=False))print("\n" + "=" * 50)return result_infodef send_notification_summary(all_results, start_time, end_time):"""发送任务执行结果的推送通知Args:all_results: 所有账号的执行结果列表start_time: 任务开始时间end_time: 任务结束时间"""try:duration = (end_time - start_time).total_seconds()# 统计结果total_count = len(all_results)success_count = sum(1 for r in all_results if r.get('success'))failed_count = total_count - success_count# 构建通知标题if failed_count == 0:title = "华润通签到成功 "sound = NotificationSound.BIRDSONGelif success_count == 0:title = "华润通签到失败 "sound = NotificationSound.ALARMelse:title = "华润通签到部分成功 "sound = NotificationSound.BELL# 构建通知内容content_parts = [" 执行统计:",f" 成功: {success_count} 个账号",f" 失败: {failed_count} 个账号",f" 总计: {total_count} 个账号",""," 详情:"]for result in all_results:account_name = result.get('account_name', '未知账号')if result.get('success'):message = result.get('message', '签到成功')content_parts.append(f" [{account_name}] {message}")else:error = result.get('error', '未知错误')if len(error) > 30:error = error[:30] + "..."content_parts.append(f" [{account_name}] {error}")content_parts.append("")content_parts.append(f" 执行耗时: {int(duration)}秒")content_parts.append(f" 完成时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")content = "\n".join(content_parts)# 发送通知send_notification(title=title,content=content,sound=sound,group="华润通")print(" 推送通知发送成功")except Exception as e:print(f" 推送通知失败: {str(e)}")def main():"""主函数"""# 记录开始时间start_time = datetime.now()print(f"\n{'='*60}")print(f"## 华润通签到任务开始")print(f"## 开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")print(f"{'='*60}\n")# 加载配置config = load_config()accounts = config.get('huaruntong', {}).get('huaruntong_wx', {}).get('accounts', [])if not accounts:print(" 配置文件中没有找到账号信息")return# 收集所有账号的结果all_results = []# 遍历所有账号for account in accounts:if not account.get('token'):print(f" 跳过账号 {account.get('account_name', '未知')}: token 为空")print("=" * 50)all_results.append({'account_name': account.get('account_name', '未知'),'success': False,'error': 'token为空'})continueresult = process_account(account)all_results.append(result)print("\n")# 记录结束时间end_time = datetime.now()duration = (end_time - start_time).total_seconds()print(f"\n{'='*60}")print(f"## 华润通签到任务完成")print(f"## 结束时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")print(f"## 执行耗时: {int(duration)} 秒")print(f"{'='*60}\n")# 发送推送通知send_notification_summary(all_results, start_time, end_time)if __name__ == "__main__":main()
解析
该脚本是华润通签到脚本,主要功能是:
从统一配置文件
config/token.json中读取 多个华润通账号配置;使用封装好的
HuaRunTongAPI,为每个账号 自动发起签到请求;记录每个账号签到是否成功、返回信息等执行结果;
最后通过
notification模块 发送签到结果汇总推送通知(成功/失败数量、各账号详情、耗时等)。
适合挂在定时任务中,每天自动跑一遍华润通签到。
主要方法
1. load_config()
从目录下
config/token.json读取统一配置文件。返回包含
huaruntong.huaruntong_wx.accounts等节点的配置信息,用于后续多账号遍历。
2. process_account(account_config)
处理单个账号的签到流程:
success:是否签到成功message:成功提示error:失败原因response:完整原始响应根据
account_config初始化HuaRunTongAPI(带 token、渠道、门店、系统ID等参数)。调用
api.sign_in()发送签到请求。解析返回结果,填充:
同时在控制台打印当前账号的签到过程与响应内容。
返回该账号的签到结果字典。
3. send_notification_summary(all_results, start_time, end_time)
对所有账号的结果做 统计与汇总通知:
统计成功/失败账号数量及总数;
按情况选择不同通知标题与提示音(全部成功 / 全部失败 / 部分成功);
为每个账号生成一行简要结果(成功则展示 message,失败则展示截断后的 error);
附带任务总耗时、完成时间;
调用
send_notification()发送到通知渠道(分组名为"华润通")。
4. main()
整个脚本的 入口函数,负责调度整体流程:
记录并打印开始时间;
调用
load_config()读取配置,获取huaruntong.huaruntong_wx.accounts下的账号列表;逐个账号调用
process_account()执行签到(token 为空的账号会被跳过并记录失败);记录结束时间与耗时,并打印任务完成信息;
调用
send_notification_summary()发送签到结果汇总推送。
注意:
本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。技术层面需要提供帮助,可以通过打赏的方式进行探讨。
没有评论:
发表评论