1.购买服务器阿里云:服务器购买地址https://t.aliyun.com/U/W9mv4W若失效,可用地址
阿里云:
服务器购买地址
https://t.aliyun.com/U/W9mv4W
若失效,可用地址
https://www.aliyun.com/minisite/goods?source=5176.29345612&userCode=49hts92d
腾讯云:
https://curl.qcloud.com/wJpWmSfU
若失效,可用地址
https://cloud.tencent.com/act/cps/redirect?redirect=2446&cps_key=ad201ee2ef3b771157f72ee5464b1fea&from=console
华为云
https://activity.huaweicloud.com/cps.html?fromacct=64b5cf7cc11b4840bb4ed2ea0b2f4468&utm_source=V1g3MDY4NTY=&utm_medium=cps&utm_campaign=201905
2.部署教程
3.代码如下
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
OceanBase 社区自动签到脚本
适用于青龙面板定时任务
"""
import random
import time
import json
import os
import requests
import pytz
import base64
from datetime import datetime
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
def bj_time():
"""获取北京时间"""
return datetime.now(pytz.timezone('Asia/Shanghai'))
def fmt_now():
"""格式化当前时间"""
return bj_time().strftime("%Y-%m-%d %H:%M:%S")
class OceanBaseClient:
def __init__(self, user, pwd, pushplus_token=None):
self.user = user
self.pwd = pwd
self.pushplus_token = pushplus_token
self.user_name = user # 用于消息推送中显示用户名
self.session = requests.Session()
self.session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:139.0) Gecko/20100101 Firefox/139.0",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Content-Type": "application/json"
})
self.public_key = None
def log(self, message, level='INFO'):
"""日志输出"""
timestamp = fmt_now()
print(f"[{timestamp}] [{level}] {message}")
def send_notification(self, title, content):
"""PushPlus消息推送"""
if not self.pushplus_token:
self.log(" 未配置PushPlus Token,跳过消息推送")
return
attempts = 3
pushplus_url = "http://www.pushplus.plus/send"
# 在标题和内容中加入用户名称
title_with_user = "[{}] {}".format(self.user_name, title)
content_with_user = " 账号: {}\n\n{}".format(self.user_name, content)
for attempt in range(attempts):
try:
response = requests.post(
pushplus_url,
data=json.dumps({
"token": self.pushplus_token,
"title": title_with_user,
"content": content_with_user
}).encode('utf-8'),
headers={'Content-Type': 'application/json'},
timeout=10
)
response.raise_for_status()
self.log(" PushPlus响应: {}".format(response.text))
break
except requests.exceptions.RequestException as e:
self.log(" PushPlus推送失败: {}".format(e), 'ERROR')
if attempt < attempts - 1:
sleep_time = random.randint(30, 60)
self.log("将在 {} 秒后重试...".format(sleep_time))
time.sleep(sleep_time)
def get_public_key(self):
"""获取RSA公钥"""
try:
self.log("获取公钥...")
public_key_url = "https://oceanbase.com/webapi/aciamweb/config/publicKey"
headers = {
'Host': 'obiamweb.oceanbase.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:141.0) Gecko/20100101 Firefox/141.0',
'Accept': 'application/json',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Referer': 'https://www.oceanbase.com/',
'Content-Type': 'application/json',
'Origin': 'https://www.oceanbase.com',
'DNT': '1',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'Priority': 'u=4'
}
response = self.session.get(public_key_url, headers=headers)
self.log(f"公钥接口响应状态码: {response.status_code}")
if response.status_code == 200:
result = response.json()
# 检查响应格式,公钥在data字段中
if result.get('data'):
self.public_key = result['data']
self.log("获取公钥成功")
return self.public_key
elif result.get('result') and result['result'].get('data'):
self.public_key = result['result']['data']
self.log("获取公钥成功")
return self.public_key
else:
self.log(f"公钥响应格式异常: {result}", 'ERROR')
return None
else:
self.log(f"获取公钥失败,状态码: {response.status_code}", 'ERROR')
return None
except Exception as e:
self.log(f"获取公钥异常: {str(e)}", 'ERROR')
return None
def encrypt_password(self, password, public_key):
"""使用RSA公钥加密密码"""
try:
self.log("开始加密密码...")
# 限制密码长度为230字符(参考前端逻辑)
if len(password) > 230:
password = password[:230]
# 解析公钥
if public_key.startswith('-----BEGIN PUBLIC KEY-----'):
# 如果已经是完整的PEM格式
key = RSA.import_key(public_key)
else:
# 如果只是公钥内容,需要添加PEM头尾
pem_key = f"-----BEGIN PUBLIC KEY-----\n{public_key}\n-----END PUBLIC KEY-----"
key = RSA.import_key(pem_key)
# 使用PKCS1_v1_5进行加密
cipher = PKCS1_v1_5.new(key)
# 重试机制,确保加密结果长度为344(参考前端逻辑)
for i in range(10):
encrypted = cipher.encrypt(password.encode('utf-8'))
encrypted_b64 = base64.b64encode(encrypted).decode('utf-8')
self.log(f"第{i+1}次加密,结果长度: {len(encrypted_b64)}")
# 前端期望加密结果长度为344
if len(encrypted_b64) == 344:
self.log("密码加密成功")
return encrypted_b64
# 如果10次都没有得到344长度的结果,返回最后一次的结果
self.log(f"密码加密完成,最终长度: {len(encrypted_b64)}")
return encrypted_b64
except Exception as e:
self.log(f"密码加密失败: {str(e)}", 'ERROR')
return None
def login(self):
"""登录OceanBase论坛"""
try:
self.log("开始登录...")
# 第一步:访问登录页面获取初始cookie
self.session.get("https://www.oceanbase.com/ob/login/password")
# 第二步:获取RSA公钥
public_key = self.get_public_key()
if not public_key:
self.log("获取公钥失败,无法继续登录", 'ERROR')
return False
# 第三步:使用公钥加密密码
encrypted_password = self.encrypt_password(self.pwd, public_key)
if not encrypted_password:
self.log("密码加密失败,无法继续登录", 'ERROR')
return False
# 第四步:执行登录
login_url = "https://obiamweb.oceanbase.com/webapi/aciamweb/login/publicLogin"
headers = {
'Host': 'obiamweb.oceanbase.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:141.0) Gecko/20100101 Firefox/141.0',
'Accept': 'application/json',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Referer': 'https://www.oceanbase.com/',
'Content-Type': 'application/json',
'Authorization': '',
'Security-Code': '',
'X-Aciamweb-Tenant': '',
'X-Aciamweb-Tenant-Id': '',
'X-From-Aciamweb': 'true',
'Origin': 'https://www.oceanbase.com',
'DNT': '1',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'Priority': 'u=4',
'TE': 'trailers'
}
# 使用RSA加密后的密码
login_data = {
"passAccountName": self.user,
"password": encrypted_password, # 使用RSA加密后的密码
"registerFrom": 0,
"aliyunMpToken": None,
"mpToken": None,
"mpChannel": None
}
response = self.session.post(login_url, json=login_data, headers=headers)
self.log(f"登录响应状态码: {response.status_code}")
if response.status_code == 200:
result = response.json()
# 检查登录是否成功,从抓包看成功时data字段会有内容
if result.get('data') and isinstance(result['data'], dict):
# 第五步:获取token信息
token_url = "https://webapi.oceanbase.com/api/links/token"
token_headers = {
'Host': 'webapi.oceanbase.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:141.0) Gecko/20100101 Firefox/141.0',
'Accept': 'application/json',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Referer': 'https://www.oceanbase.com/',
'Content-Type': 'application/json',
'Authorization': '',
'Security-Code': '',
'X-Aciamweb-Tenant': '',
'X-Aciamweb-Tenant-Id': '',
'X-From-Aciamweb': 'true',
'Origin': 'https://www.oceanbase.com',
'DNT': '1',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'Priority': 'u=4',
'TE': 'trailers'
}
token_response = self.session.post(token_url, json={}, headers=token_headers)
self.log(f"Token响应状态码: {token_response.status_code}")
if token_response.status_code == 200:
token_result = token_response.json()
if token_result.get('success'):
self.log("登录成功")
return True
self.log("登录成功但获取token失败")
return True
else:
self.log(f"登录失败: {result}", 'ERROR')
return False
else:
self.log(f"登录请求失败,状态码: {response.status_code}", 'ERROR')
return False
except Exception as e:
self.log(f"登录异常: {str(e)}", 'ERROR')
return False
def checkin(self):
"""执行签到操作"""
try:
self.log("开始签到...")
# 第一步:登录
try:
login_success = self.login()
if not login_success:
return {
"message": "签到失败",
"details": "登录失败"
}
except Exception as e:
self.log(f"签到时登录异常: {str(e)}", 'ERROR')
return {
"message": "签到失败",
"details": "登录异常"
}
time.sleep(2)
# 第二步:执行签到
checkin_url = "https://openwebapi.oceanbase.com/api/integral/signUp/insertOrUpdateSignUp"
checkin_headers = {
'Host': 'openwebapi.oceanbase.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:141.0) Gecko/20100101 Firefox/141.0',
'Accept': 'application/json',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Referer': 'https://open.oceanbase.com/user/coin',
'Content-Type': 'application/json; charset=utf-8',
'Origin': 'https://open.oceanbase.com',
'DNT': '1',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'Priority': 'u=0'
}
checkin_response = self.session.post(checkin_url, json={}, headers=checkin_headers)
self.log(f"签到接口响应状态码: {checkin_response.status_code}")
# 第三步:查询签到状态
query_url = "https://openwebapi.oceanbase.com/api/integral/signUp/queryUserSignUpDays"
query_headers = {
'Host': 'openwebapi.oceanbase.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:141.0) Gecko/20100101 Firefox/141.0',
'Accept': 'application/json',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Referer': 'https://open.oceanbase.com/user/coin',
'Content-Type': 'application/json; charset=utf-8',
'Origin': 'https://open.oceanbase.com',
'DNT': '1',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'Priority': 'u=0'
}
query_response = self.session.post(query_url, json={}, headers=query_headers)
self.log(f"最终查询接口响应状态码: {query_response.status_code}")
if checkin_response.status_code == 200:
checkin_result = checkin_response.json()
if checkin_result.get('code') == 200:
if query_response.status_code == 200:
final_result = query_response.json()
if final_result.get('code') == 200 and final_result.get('data'):
data = final_result['data']
total_days = data.get('currentTotalDays', 0)
sign_flag = data.get('signUpFlag', 0)
if sign_flag == 1:
return {
"message": "签到成功",
"details": f"OceanBase 签到成功,累计签到 {total_days} 天"
}
else:
return {
"message": "签到失败",
"details": "OceanBase 签到失败,签到状态异常"
}
return {
"message": "签到成功",
"details": "OceanBase 签到成功"
}
elif checkin_result.get('code') == 500 and "已签到" in str(checkin_result.get('message', '')):
if query_response.status_code == 200:
final_result = query_response.json()
if final_result.get('code') == 200 and final_result.get('data'):
data = final_result['data']
total_days = data.get('currentTotalDays', 0)
sign_flag = data.get('signUpFlag', 0)
if sign_flag == 1:
return {
"message": "今日已签到",
"details": f"累计签到 {total_days} 天"
}
else:
return {
"message": "签到失败",
"details": "OceanBase 签到失败,签到状态异常"
}
return {
"message": "签到成功",
"details": "今日已签到"
}
else:
error_msg = checkin_result.get('message', '签到失败')
return {
"message": "签到失败",
"details": f"OceanBase 签到失败: {error_msg}"
}
else:
return {
"message": "签到失败",
"details": f"OceanBase 签到请求失败,状态码: {checkin_response.status_code}"
}
except Exception as e:
self.log(f"签到失败: {str(e)}", 'ERROR')
return {
"message": "签到失败",
"details": f"OceanBase 签到异常: {str(e)}"
}
def run_checkin(self):
"""执行签到任务"""
self.log("=== 开始 OceanBase 社区签到任务 ===")
try:
result = self.checkin()
today = bj_time().strftime("%Y-%m-%d")
title = f"OceanBase 社区签到结果 - {today}"
if isinstance(result, dict):
message = result.get("message", "未知状态")
details = result.get("details", "")
if "成功" in message or "已签到" in message:
content = f" {message}\n\n 详情:{details}"
self.log("签到成功")
else:
content = f" {message}\n\n 详情:{details}"
self.log(f"签到失败: {details}", 'ERROR')
else:
content = f" OceanBase 签到成功:{result}"
self.log("签到成功")
except Exception as e:
today = bj_time().strftime("%Y-%m-%d")
title = f"OceanBase 社区签到结果 - {today}"
content = f" OceanBase 签到失败:{str(e)}"
self.log(f"签到失败: {str(e)}", 'ERROR')
self.log("=== 任务完成,准备推送结果 ===")
self.send_notification(title, content)
self.log("OceanBase 签到任务完成")
return content
def random_delay():
delay_minutes = random.randint(0, 60)
delay_seconds = delay_minutes * 60
if delay_minutes > 0:
from datetime import timedelta
current_time = bj_time()
estimated_start = current_time + timedelta(minutes=delay_minutes)
print(f" 随机延迟 {delay_minutes} 分钟后开始执行任务...")
print(f" 预计开始时间: {estimated_start.strftime('%H:%M:%S')}")
time.sleep(delay_seconds)
print(f" 延迟结束,开始执行 OceanBase 签到任务")
else:
print(f" 无需延迟,立即开始执行 OceanBase 签到任务")
def main():
"""主函数"""
try:
random_delay()
ob_users = os.environ.get("OCEANBASE_USER", "").split("#")
ob_pwds = os.environ.get("OCEANBASE_PWD", "").split("#")
pushplus_token = os.environ.get("PUSH_PLUS_TOKEN")
if not ob_users or not ob_users[0]:
print(" 错误:未配置 OCEANBASE_USER 环境变量")
return
if not ob_pwds or not ob_pwds[0]:
print(" 错误:未配置 OCEANBASE_PWD 环境变量")
return
# 处理多账号情况
for ob_user, ob_pwd in zip(ob_users, ob_pwds):
if not ob_user or not ob_pwd:
continue
print(f"\n{'='*50}")
print(f"开始处理账号: {ob_user}")
print(f"{'='*50}")
client = OceanBaseClient(ob_user, ob_pwd, pushplus_token)
result = client.run_checkin()
print(f"\n账号 {ob_user} 处理完成")
print(f"结果: {result}")
except Exception as e:
print(f" 程序执行异常: {str(e)}")
if 'pushplus_token' in locals() and pushplus_token:
try:
error_title = "OceanBase 签到任务异常"
error_content = f" 程序执行异常: {str(e)}"
requests.post(
"http://www.pushplus.plus/send",
data=json.dumps({
"token": pushplus_token,
"title": error_title,
"content": error_content
}).encode('utf-8'),
headers={'Content-Type': 'application/json'},
timeout=10
)
except:
pass
if __name__ == "__main__":
main()
解析
该脚本为OceanBase 社区自动签到脚本,脚本主要作用:
自动登录 OceanBase 社区(获取登录所需 RSA 公钥、加密密码并提交登录)。
自动执行每日签到(调用积分签到接口,并查询签到天数状态)。
结果通知(可选)通过 PushPlus 把签到结果推送到微信服务号。
多账号支持(
OCEANBASE_USER
/OCEANBASE_PWD
用#
分隔),适配 青龙面板 定时运行。运行前可设置随机延迟,模拟自然行为。
工具函数
bj_time()
:返回东八区(北京时间)的datetime
。fmt_now()
:返回格式化的当前北京时间字符串。
类:OceanBaseClient
__init__(user, pwd, pushplus_token=None)
:初始化会话、通用请求头与推送配置。log(message, level='INFO')
:简单带时间的日志输出。send_notification(title, content)
:使用 PushPlus 将签到结果推送(如未配置 Token 则跳过)。get_public_key()
:请求publicKey
接口,获取登录所需 RSA 公钥。encrypt_password(password, public_key)
:用获取的公钥按 PKCS1_v1_5 加密密码(带长度校验与重试)。login()
:预热获取初始 Cookie;
拉取公钥并加密密码;
调用登录接口;
成功后再调用
links/token
获取/确认登录链路可用。checkin()
:确保已登录;
调用
insertOrUpdateSignUp
执行签到;再调用
queryUserSignUpDays
查询累积天数与今日状态;统一返回"签到成功 / 今日已签到 / 失败"的结构化结果。
run_checkin()
:一次完整任务编排:执行checkin()
→ 组织标题/内容 →send_notification()
→ 返回最终文本。
运行辅助
random_delay()
:0–60 分钟随机等待后再执行(可营造更自然的执行时间分布)。main()
:读取环境变量OCEANBASE_USER
、OCEANBASE_PWD
、PUSH_PLUS_TOKEN
,按多账号循环创建OceanBaseClient
并调用run_checkin()
,输出每个账号的执行结果。
注意:
本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。技术层面需要提供帮助,可以通过打赏的方式进行探讨。
没有评论:
发表评论