2025年8月31日星期日

某地区角洲行动任务脚本

1.购买服务器阿里云:服务器购买地址https://t.aliyun.com/U/W6Zbs4若失效,可用地址

1.购买服务器

阿里云:

服务器购买地址

https://t.aliyun.com/U/W6Zbs4

若失效,可用地址

https://www.aliyun.com/minisite/goods?source=5176.29345612&userCode=49hts92d

腾讯云:

https://curl.qcloud.com/wJpWmSfU

若失效,可用地址

https://cloud.tencent.com/act/cps/redirect?redirect=2446&cps_key=ad201ee2ef3b771157f72ee5464b1fea&from=console

华为云

https://activity.huaweicloud.com/cps.html?fromacct=64b5cf7cc11b4840bb4ed2ea0b2f4468&utm_source=V1g3MDY4NTY=&utm_medium=cps&utm_campaign=201905

2.部署教程

2024年最新青龙面板跑脚本教程(一)持续更新中

3.代码如下

import mssimport numpy as npimport win32guiimport win32processimport psutilimport win32confrom PIL import ImageDrawimport detect_moneyimport detect_locationimport threadingimport queueimport timeimport scheduleimport configparserimport pyautoguiimport pytesseractimport osimport sysimport datetimeimport keyboardfrom dataclasses import dataclassfrom mouse_keyboard_controller import MouseKeyboardController
controller = MouseKeyboardController()
# 获取脚本所在目录BASE_DIR = os.path.dirname(os.path.abspath(__file__))# 使用相对路径读取配置文件config_path = os.path.join(BASE_DIR, 'config.ini')
config = configparser.ConfigParser()# 显式指定 UTF-8 编码来读取文件with open(config_path, encoding='utf-8'as f:    config.read_file(f)
# --- 配置参数 ---game_name = config['window']['game_window_name']  # 游戏窗口名称min_width = int(config['window']['min_width'])  # 最小窗口宽度min_height = int(config['window']['min_height'])  # 最小窗口高度expected_price_1 = int(config['limit']['expected_price_1']) # 价格下限expected_price_2 = int(config['limit']['expected_price_2'])  # 价格上限x = int(config['click_location']['x'])  # 收藏物品X坐标y = int(config['click_location']['y'])  # 收藏物品Y坐标execution_time = config['schedule']['execution_time']  # 脚本执行时间execution_time_single = int(config['schedule']['execution_time_single'])  # 单次执行时长(秒)duration = int(config['schedule']['duration'])  # 总运行时长(秒)
# --- 控制标志 ---paused = False  # 控制脚本暂停/恢复should_exit = False  # 标记主程序的运行与结束thread_pause_click = False  # 控制连点线程的暂停thread_running = True  # 控制主程序运行与结束时子线程的运行与结束game_window_hwnd = None  # 游戏主窗口句柄
# --- 线程通信 ---color_check_result = False  # 线程安全变量,存储颜色检测结果color_check_lock = threading.Lock()  # 颜色检测结果的线程锁
# --- 统计数据 ---start_time_single = time.time()  # 计时器初始值consumption = initial_money = end_money = 0  # 消耗的哈夫币统计

class Tee:    """    同时将输出重定向到控制台和日志文件的类
    实现了标准输出的重定向,同时捕获未处理的异常并记录到日志文件    """
    def __init__(self, filename=None):        """        初始化Tee对象,设置日志文件路径并重定向标准输出
        参数:            filename: str - 日志文件名,如果为None则使用当前时间戳命名        """        log_dir = os.path.join(BASE_DIR, 'logs')  # 指定日志保存路径        os.makedirs(log_dir, exist_ok=True)  # 确保目录存在
        # 如果没有提供文件名,则以当前时间命名        if filename is None:            timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")            filename = f"log_{timestamp}.txt"
        self.file = open(os.path.join(log_dir, filename), "a", encoding="utf-8")  # 追加模式        self.stdout = sys.stdout        sys.stdout = self
        # 设置异常钩子,捕获未处理的异常        self.original_excepthook = sys.excepthook        sys.excepthook = self.exception_handler
    def write(self, message):        """        写入消息到标准输出和日志文件
        参数:            message: str - 要写入的消息        """        # 获取当前时间戳,精确到毫秒        timestamp = f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] "        # 在每条日志前添加时间戳        if message.strip():  # 仅对非空行添加时间戳            message = f"{timestamp}{message}"  # 确保时间戳格式完整        self.stdout.write(message)  # 在 CMD 窗口打印        self.file.write(message)  # 同时写入文件        self.flush()  # 确保日志信息立即写入文件
    def flush(self):        """确保日志即时写入文件和标准输出"""        self.stdout.flush()        self.file.flush()
    def exception_handler(self, exc_type, exc_value, exc_traceback):        """        处理未捕获的异常,将异常信息记录到日志文件
        参数:            exc_type: 异常类型            exc_value: 异常值            exc_traceback: 异常的堆栈跟踪        """        # 将异常信息格式化为字符串        import traceback        exception_str = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback))        # 写入日志        self.write("\n*** 捕获到未处理的异常 ***\n")        self.write(f"{exception_str}\n")        self.write("*** 异常信息结束 ***\n")        self.flush()        # 调用原始异常处理器        self.original_excepthook(exc_type, exc_value, exc_traceback)

@dataclass(frozen=True)class PurchaseEvent:    kind: str              # 'six_digits' | 'no_items' | 'seven_sep'    data: int | None = None
class PurchaseStateMonitor:    """    并行监测三种状态,任一命中产生事件;随后进入失效态,    待检测到"三种状态均不命中"连续 N 次后再重武装。    """    def __init__(self, poll_interval: float = 0.03, rearm_clear_consecutive: int = 1):        self.poll_interval = poll_interval        self.rearm_clear_consecutive = rearm_clear_consecutive
        self._stop = threading.Event()        self._armed = True        self._armed_lock = threading.Lock()
        self._present = {'six'False'no'False'seven'False}        self._present_lock = threading.Lock()
        self._q: "queue.Queue[PurchaseEvent]" = queue.Queue(maxsize=1)        self._threads: list[threading.Thread] = []
    def start(self):        self._threads = [            threading.Thread(target=self._watch_six_digits, daemon=True),            threading.Thread(target=self._watch_no_items, daemon=True),            threading.Thread(target=self._watch_seven_sep, daemon=True),            threading.Thread(target=self._watch_rearm_all_clear, daemon=True),        ]        for t in self._threads:            t.start()
    def stop(self):        self._stop.set()        for t in self._threads:            t.join(timeout=1.0)
    def get_event(self, timeout: float | None = None) -> PurchaseEvent:        return self._q.get(timeout=timeout)

    def clear_pending(self) -> None:        """        清空待处理事件,避免消费到上一次循环的残留事件。        采用与投递相同的锁以避免竞态。        """        with self._armed_lock:            while True:                try:                    self._q.get_nowait()                except queue.Empty:                    break    def _emit_if_armed(self, evt: PurchaseEvent) -> bool:        """        若当前处于武装态,投递事件并转入失效态;返回 True 表示成功投递(可打印一次性日志)。        """        with self._armed_lock:            if not self._armed:                return False            # 清理可能残留的旧事件,确保只保留最新命中的            while not self._q.empty():                try:                    self._q.get_nowait()                except queue.Empty:                    break            self._q.put(evt)            self._armed = False            return True
    def _watch_six_digits(self):        while not self._stop.is_set():            val = detect_money.main()            hit = isinstance(val, intand 100000 <= val <= 999999            with self._present_lock:                self._present['six'] = hit            if hit:                self._emit_if_armed(PurchaseEvent('six_digits', val))            time.sleep(self.poll_interval)
    def _watch_no_items(self):        while not self._stop.is_set():            hit = is_color_similar(1630889, (757982), 10)            with self._present_lock:                self._present['no'] = hit            if hit:                self._emit_if_armed(PurchaseEvent('no_items'None))            time.sleep(self.poll_interval)
    def _watch_seven_sep(self):        while not self._stop.is_set():            hit = is_color_similar(313193, (179181183), 10)            with self._present_lock:                self._present['seven'] = hit            if hit:                self._emit_if_armed(PurchaseEvent('seven_sep'None))            time.sleep(self.poll_interval)
    def _watch_rearm_all_clear(self):        clear_cnt = 0        while not self._stop.is_set():            with self._armed_lock:                armed = self._armed            if armed:                clear_cnt = 0                time.sleep(self.poll_interval)                continue
            with self._present_lock:                any_hit = self._present['six'or self._present['no'or self._present['seven']
            if not any_hit:                clear_cnt += 1                if clear_cnt >= self.rearm_clear_consecutive:                    with self._armed_lock:                        self._armed = True                    clear_cnt = 0            else:                clear_cnt = 0            time.sleep(self.poll_interval)

def take_screenshot(price):    """    截取当前屏幕并保存,包含鼠标指针位置
    参数:        price: int - 当前识别到的价格,将添加到文件名中
    功能:        1. 创建包含时间戳和价格的文件名        2. 截取全屏        3. 在截图上标记当前鼠标位置        4. 保存截图到指定目录    """    # 获取当前时间戳,精确到毫秒    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S.%f")[:-3]    # 将价格添加到文件名中    screenshot_path = os.path.join(BASE_DIR, 'screenshots'f"screenshot_{timestamp}_price_{price}.png")    os.makedirs(os.path.dirname(screenshot_path), exist_ok=True)  # 确保目录存在
    # 使用pyautogui截取全屏    screenshot = pyautogui.screenshot()
    # 获取鼠标位置,用于在截图上标记鼠标位置    cursor_pos = win32gui.GetCursorPos()
    # 获取当前使用的鼠标光标    cursor_info = win32gui.GetCursorInfo()    cursor = cursor_info[1]  # 获取光标句柄
    # 获取光标信息    if cursor:        try:            # 将光标绘制到截图上            draw = ImageDraw.Draw(screenshot)
            # 绘制一个红色圆点表示光标位置,便于后续分析问题            draw.ellipse((cursor_pos[0] - 2, cursor_pos[1] - 2,                          cursor_pos[0] + 2, cursor_pos[1] + 2), fill='red')
        except Exception as e:            print(f"无法添加鼠标指针: {str(e)}")
    # 保存图片    screenshot.save(screenshot_path)    print(f"全屏幕截屏(含鼠标指针)已保存到 {screenshot_path}")

def get_window_normal_size(hwnd):    """    获取窗口的正常尺寸,即使它当前是最小化的
    参数:        hwnd: int - 窗口句柄
    返回:        tuple: (width, height) - 窗口的宽度和高度
    功能:        1. 检查窗口是否最小化        2. 如果是最小化状态,获取其正常位置信息        3. 如果不是最小化状态,直接获取当前尺寸        4. 返回窗口的宽度和高度    """    minimized = win32gui.IsIconic(hwnd)
    if minimized:        # 获取窗口的正常位置信息(即使窗口是最小化的)        window_placement = win32gui.GetWindowPlacement(hwnd)        normal_rect = window_placement[4]  # normalPosition属性        width = normal_rect[2] - normal_rect[0]        height = normal_rect[3] - normal_rect[1]        return width, height    else:        # 窗口未最小化,直接获取当前尺寸        rect = win32gui.GetWindowRect(hwnd)        width = rect[2] - rect[0]        height = rect[3] - rect[1]        return width, height

def find_game_window():    """    查找游戏窗口,通过尺寸区分游戏本体和启动器
    返回:        int: 游戏窗口句柄,若未找到则返回0
    功能:        1. 枚举所有窗口,筛选出标题包含游戏名称的窗口        2. 获取这些窗口的尺寸和进程信息        3. 根据最小尺寸要求筛选出符合条件的窗口        4. 选择尺寸最大的窗口作为游戏主窗口    """    global game_window_hwnd
    windows = []
    def callback(hwnd, extra):        if win32gui.IsWindowVisible(hwnd):            title = win32gui.GetWindowText(hwnd)            if game_name.lower() in title.lower():                # 获取窗口正常尺寸(即使最小化)                width, height = get_window_normal_size(hwnd)                # 获取进程信息                try:                    pid = win32process.GetWindowThreadProcessId(hwnd)[1]                    process = psutil.Process(pid)                    exe_path = process.exe()                    proc_name = process.name()                except (psutil.NoSuchProcess, psutil.AccessDenied):                    exe_path = "未知"                    proc_name = "未知"
                windows.append({                    "hwnd": hwnd,                    "title": title,                    "width": width,                    "height": height,                    "size": width * height,                    "exe_path": exe_path,                    "process": proc_name                })
    win32gui.EnumWindows(callback, None)
    # 按窗口尺寸排序,筛选出符合最小尺寸条件的窗口    suitable_windows = [w for w in windows if w["width"] >= min_width and w["height"] >= min_height]
    if suitable_windows:        # 按尺寸降序排序,选择最大的窗口        suitable_windows.sort(key=lambda w: w["size"], reverse=True)        game_window_hwnd = suitable_windows[0]["hwnd"]        print(f"已找到游戏窗口: '{suitable_windows[0]['title']}'")        print(f"窗口大小: {suitable_windows[0]['width']}x{suitable_windows[0]['height']}")        print(f"进程: {suitable_windows[0]['process']} ({suitable_windows[0]['exe_path']})")        return game_window_hwnd    elif windows:        print(f"找到的窗口均小于最小尺寸要求({min_width}x{min_height}),可能为启动器窗口:")        for w in windows:            print(f"- '{w['title']}' ({w['width']}x{w['height']}) - {w['process']}")        return 0    else:        print(f"未找到包含'{game_name}'的窗口")        return 0

def set_window_topmost(hwnd):    """    设置窗口置顶,如果窗口最小化则先恢复
    参数:        hwnd: int - 窗口句柄
    返回:        bool: 设置成功返回True,失败返回False
    功能:        1. 检查窗口是否最小化,如果是则先恢复        2. 调用Win32 API设置窗口为置顶状态    """    try:        # 检查窗口是否最小化        if win32gui.IsIconic(hwnd):            # 先恢复窗口            print("窗口已最小化,先恢复窗口")            win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)            # 给窗口一点时间恢复            time.sleep(0.3)
        # 设置窗口置顶        win32gui.SetWindowPos(hwnd, win32con.HWND_TOPMOST, 0000,                              win32con.SWP_NOMOVE | win32con.SWP_NOSIZE)        print("已将窗口设置为置顶")        return True    except Exception as e:        print(f"设置窗口置顶失败: {e}")        return False

def unset_window_topmost(hwnd):    """    取消窗口置顶
    参数:        hwnd: int - 窗口句柄
    返回:        bool: 设置成功返回True,失败返回False
    功能:        调用Win32 API取消窗口的置顶状态,使其恢复普通窗口层级    """    try:        win32gui.SetWindowPos(hwnd, win32con.HWND_NOTOPMOST, 0000,                              win32con.SWP_NOMOVE | win32con.SWP_NOSIZE)        print("已取消窗口置顶")        return True    except Exception as e:        print(f"取消窗口置顶失败: {e}")        return False

def toggle_pause():    """    切换脚本暂停状态,同时控制窗口置顶状态
    功能:        1. 切换全局暂停标志        2. 根据暂停状态设置或取消窗口置顶        3. 输出暂停/恢复状态信息    """    global paused, game_window_hwnd    paused = not paused
    if paused:        print("脚本已暂停")        # 暂停时取消窗口置顶        # noinspection PyUnreachableCode        if game_window_hwnd:            unset_window_topmost(game_window_hwnd)    else:        print("脚本已恢复")        # 恢复时重新置顶窗口        # noinspection PyUnreachableCode        if game_window_hwnd:            set_window_topmost(game_window_hwnd)

# 监听快捷键 Ctrl+Pkeyboard.add_hotkey('ctrl+p', toggle_pause)

def view_money(location, region):    """    识别并返回当前账号拥有的哈夫币数量
    参数:        location: tuple - 哈夫币图标位置坐标(x, y)        region: tuple - 哈夫币数量区域(x, y, width, height)
    返回:        int 或 None: 识别到的哈夫币数量,识别失败返回None    """    # 移动鼠标到哈夫币图标位置,触发显示哈夫币数量的悬浮窗    pyautogui.moveTo(location)    time.sleep(0.5)  # 等待悬浮窗完全显示
    # 截取哈夫币数量区域的图像    screenshot = pyautogui.screenshot(region=region)
    # 使用OCR识别图像中的数字,限制识别字符集为数字和逗号    custom_config = r'--psm 6 -c tessedit_char_whitelist=0123456789,'    money = pytesseract.image_to_string(screenshot, config=custom_config)
    try:        # 输出识别结果并返回处理后的整数值        print(f"当前哈夫币数量为{money.strip()}")        # 移除换行符、空格和逗号,转换为整数        return int(money.strip().replace("\n""").replace(" """).replace(","""))    except ValueError:        # 如果转换失败(通常是因为OCR识别不准确),返回None        return None

def is_color_similar(a, b, target_color, threshold=30):    """    使用 mss 截取 1x1 区域获取像素颜色    读取失败返回 False。    """    try:        with mss.mss() as sct:            region = {"top": b, "left": a, "width"1"height"1}            img = np.array(sct.grab(region))  # BGRA            bgr = img[00, :3]            pixel_color = (int(bgr[2]), int(bgr[1]), int(bgr[0]))  # 转 RGB    except Exception:        return False
    dr = pixel_color[0] - target_color[0]    dg = pixel_color[1] - target_color[1]    db = pixel_color[2] - target_color[2]    return (dr * dr + dg * dg + db * db) ** 0.5 < threshold

def check_chi(region, content):    """    识别指定区域内的汉字是否与预期内容匹配
    参数:        region: tuple - 截图区域 (x, y, width, height)        content: str - 预期匹配的中文内容
    返回:        bool: 匹配成功返回True,否则返回False    """    # 截取指定区域    screenshot = pyautogui.screenshot(region=region)
    # 使用中文简体模型进行OCR识别    check_result = pytesseract.image_to_string(screenshot, config='--psm 6', lang='chi_sim')
    try:        # 比较OCR结果是否与预期内容完全匹配        if check_result.strip() == content:            return True        return False    except ValueError:        # 处理异常情况        return False

def refresh_operation():    """    刷新交易行状态,防止界面卡顿
    返回:        bool: 当且仅当本次确实执行了刷新流程时返回 True,否则 False。    """    global thread_pause_click, start_time_single
    # 检查是否达到刷新时间间隔(配置文件默认180秒)    if time.time() - start_time_single > execution_time_single:        # 暂停线程        thread_pause_click = True
        time.sleep(0.1)
        # flag用于标记是否已经从全面战场切换回烽火地带模式        flag = False
        print("刷新交易行状态")        # 处理各种可能的界面状态,循环直到成功回到交易行界面        while True:            time.sleep(0.5)            if check_chi((8144771921), '为'):                # 识别到"禁止使用市场..."界面提示,按ESC关闭                controller.key_press('esc')
            elif is_color_similar(1771362, (234235235)):                # 识别到交易行购买子弹的二级界面,按ESC返回一级界面                controller.key_press('esc')
            elif is_color_similar(180106, (191195195)) or is_color_similar(180106, (818485)):                # 识别到交易行一级界面,按ESC关闭                controller.key_press('esc')
            elif is_color_similar(14591043, (677072)):                # 识别到烽火地带开始游戏界面                if flag:                    # 如果之前已执行过切换模式操作,返回交易行                    pyautogui.moveTo(72080)  # 移动到交易行按钮位置下方                    if not is_color_similar(72077, (91197146)):                        pyautogui.move(0, -200.1)  # 上移选择菜单项                        time.sleep(0.2)                        pyautogui.click()                        time.sleep(0.1)                        pyautogui.move(0200.1)  # 重置鼠标位置                    time.sleep(0.5)
                    # 点击收藏一号位,避免界面位移问题                    for _ in range(3):                        controller.mouse_click(660240)                        time.sleep(0.2)                    controller.key_press('esc')                    time.sleep(0.5)
                    break  # 成功返回交易行,退出循环                else:                    # 否则先离开烽火地带                    controller.key_press('esc')
            elif is_color_similar(14151053, (828688)):                # 识别到全面战场开始游戏界面,按ESC离开                controller.key_press('esc')
            elif is_color_similar(104330, (233234234)) and is_color_similar(104550, (9910099)):                # 识别切换模式界面(此时在烽火地带)                # 通过检查左侧菜单栏的颜色状态来判断当前游戏模式                pyautogui.moveTo(250380)  # 移动到模式选择菜单                # 切换到全面战场模式                for _ in range(3):  # 通过多次点击确保成功选择                    pyautogui.move(0200.1)  # 下移选择菜单项                    time.sleep(0.2)                    pyautogui.click()                    time.sleep(0.1)                    pyautogui.move(0, -200.1)  # 重置鼠标位置                time.sleep(0.5)                controller.key_press('space'# 关闭活动广告
            elif is_color_similar(104330, (888889)) and is_color_similar(104550, (234235235)):                # 识别切换模式界面(此时在全面战场)                pyautogui.moveTo(250380)  # 移动到模式选择菜单                # 切换到烽火地带模式                for _ in range(3):  # 通过多次点击确保成功选择                    pyautogui.move(0, -200.1)  # 上移选择菜单项                    time.sleep(0.2)                    pyautogui.click()                    time.sleep(0.1)                    pyautogui.move(0200.1)  # 重置鼠标位置                time.sleep(0.5)                controller.key_press('space')  # 关闭活动广告                flag = True  # 标记已经执行了从全面战场到烽火地带的切换操作
        start_time_single = time.time()        # 恢复线程        thread_pause_click = False        return True
    return False

def continuous_click_worker():    """    连续鼠标点击线程函数
    功能:        以高频率持续点击指定位置        可通过全局变量暂停和恢复        用于快速刷新交易行物品列表    """    global thread_running, thread_pause_click
    while thread_running:        # 检查线程是否需要暂停        if thread_pause_click:            time.sleep(0.05)  # 暂停状态下降低CPU使用率            continue
        controller.mouse_click(x, y)  # 点击当前目标位置        time.sleep(0.2)

def run_for_duration(duration_time):    """    在指定时间内执行交易行监控与操作:    - 采用并发状态监测 + 消抖(武装/失效/重武装)    - 保留定期刷新交易行、暂停/恢复连点、界面状态检查、二次检查价格等逻辑    """    global paused, should_exit, thread_running, thread_pause_click, start_time_single, \        consumption, initial_money, end_money
    # 置顶窗口    hwnd = find_game_window()    if hwnd:        set_window_topmost(hwnd)    else:        print("警告: 定时执行开始时未找到游戏窗口,无法置顶")
    # 初始资金与定位    location, region = detect_location.main()    initial_money = view_money(location, region)
    start_time = start_time_single = time.time()
    # 点击收藏一号位,避免界面位移    for _ in range(3):        controller.mouse_click(660240)        time.sleep(0.2)    controller.key_press('esc')    time.sleep(0.5)
    # 启动线程    thread_running = True    thread_pause_click = False
    click_thread = threading.Thread(target=continuous_click_worker, daemon=True)    click_thread.start()
    # 启动并发状态监测(六位价/暂无/七位分隔符)    monitor = PurchaseStateMonitor(poll_interval=0.03, rearm_clear_consecutive=1)    monitor.start()
    try:        while time.time() - start_time < duration_time:            # 暂停控制            if paused:                thread_pause_click = True                while paused:                    time.sleep(0.1)                thread_pause_click = False                continue
            # 定期刷新交易行            refreshed = refresh_operation()            if refreshed:                monitor.clear_pending() # 清空待处理事件,避免消费到上一次循环的残留事件
            # 取事件(带短超时,便于循环做其它工作)            try:                evt = monitor.get_event(timeout=0.2)            except queue.Empty:                continue
            # 处理事件            if evt.kind == 'six_digits':                price = evt.data                if expected_price_1 <= price <= expected_price_2:                    print(f"识别到价格{price}")                    # 暂停连点,避免干扰购买操作                    thread_pause_click = True                    controller.mouse_moveTo(1746900)                    controller.mouse_move(010)                    controller.mouse_click()
                    # take_screenshot(price)                    time.sleep(0.5)                    thread_pause_click = False
                controller.key_press('esc')
            elif evt.kind in ('no_items''seven_sep'):                # 无货或七位分隔符,直接返回                controller.key_press('esc')
    finally:        # 停止监测与线程        monitor.stop()        thread_running = False        click_thread.join(timeout=1.0)
        # 统计最终消耗        time.sleep(1)        location, region = detect_location.main()        end_money = view_money(location, region)        if end_money is not None:            consumption_delta = initial_money - end_money if initial_money is not None else 0            consumption_total = consumption + consumption_delta            consumption_str = "{:,}".format(consumption_total)        else:            print("最终哈夫币数量无法识别")            consumption_str = "识别失败"
        print(f"时间到,总计消耗哈夫币:{consumption_str}")        should_exit = True

def main():    """    主函数,调度整个脚本的执行
    功能:        1. 设置定时任务        2. 等待定时任务执行        3. 处理退出信号    """    global game_window_hwnd, should_exit
    # 查找游戏窗口(在定时执行时置顶)    game_window_hwnd = find_game_window()
    # 输出脚本即将执行的时间和持续时长    print(f"{execution_time}开始执行,执行{duration}秒")
    # 设置定时任务,在指定时间执行run_for_duration函数    schedule.every().day.at(execution_time).do(run_for_duration, duration_time=duration)
    # 持续运行,直到收到退出信号    try:        while not should_exit:            # 检查并执行到期的定时任务            schedule.run_pending()            # 每秒检查一次,降低CPU占用            time.sleep(1)    finally:        # 脚本结束时,取消窗口置顶        if game_window_hwnd:            unset_window_topmost(game_window_hwnd)

if __name__ == "__main__":    # 创建Tee实例,重定向输出到日志文件    tee = Tee()    sys.stdout = tee
    # 记录当前时间作为日志标题    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")    print(f"\n====== 运行时间: {timestamp} ======\n")
    try:        # 运行主程序        main()    finally:        # 确保线程停止        thread_running = False        # 给线程一点时间退出        time.sleep(0.5)
        # 关闭日志文件并恢复标准输出        sys.excepthook = tee.original_excepthook        sys.stdout = tee.stdout        tee.file.close()
        print(f"日志已保存到 {tee.file.name}")

解析

这是一个三角洲游戏交易行自动化选购装备脚本,主要作用为:

  • 高频刷新交易行界面,自动识别装备价格;

  • 如果检测到低价装备(在设定区间内),立即自动点击购买;

  • 记录和统计消耗的虚拟货币(哈夫币);

  • 支持定时执行、暂停/恢复、日志记录和自动截图;

  • 提供异常处理和日志持久化,便于后期分析。

主要方法

1. Tee 类

  • 功能:把所有输出同时写到 控制台和日志文件

  • 还会捕获未处理的异常,自动写入日志文件。

2. PurchaseStateMonitor 类

  • 功能:并行监测三种交易状态,命中时触发事件:

    • six_digits: 识别到六位数字价格;

    • no_items: 无货提示;

    • seven_sep: 七位数价格分隔符。

  • 内部机制:

    • 多线程轮询,实时检测游戏界面;

    • 事件触发后进入"失效态",必须等界面恢复正常(防抖)才会再次触发。

3. take_screenshot(price)

  • 截图全屏并保存,文件名带上价格和时间戳;

  • 在截图中标记鼠标位置,便于复盘。

4. find_game_window() / set_window_topmost() / unset_window_topmost()

  • 查找并锁定游戏窗口,避免误操作启动器;

  • 设置或取消游戏窗口置顶;

  • 在运行时保证脚本始终操作正确的窗口。

5. toggle_pause()

  • 切换脚本暂停/恢复状态;

  • 绑定快捷键 Ctrl+P

  • 暂停时取消窗口置顶,恢复时重新置顶。

6. view_money(location, region)

  • OCR 识别玩家当前的哈夫币数量;

  • 用于记录初始/最终余额,统计消耗。

7. is_color_similar() / check_chi()

  • is_color_similar: 用像素点颜色检测界面状态;

  • check_chi: OCR 检测界面中文提示(如"禁止使用市场")。

8. refresh_operation()

  • 定期刷新交易行界面,防止卡顿;

  • 自动判断当前游戏界面并执行返回/切换操作;

  • 确保脚本始终停留在交易行主界面。

9. continuous_click_worker()

  • 子线程函数;

  • 在指定坐标高频点击,用于刷新交易行列表。

10. run_for_duration(duration_time)

  • 脚本核心执行流程

    • six_digits: 判断价格是否在设定区间,若符合则点击购买;

    • no_items / seven_sep: 无货或非目标价格,返回上一级界面;

    • 启动高频点击线程和状态监测;

    • 循环运行直到设定时长;

    • 根据事件类型做操作:

    • 定期刷新交易行;

    • 最终统计消耗金额。

11. main()

  • 主入口函数;

  • 读取配置(执行时间、持续时间、点击位置等);

  • 注册定时任务,到点自动执行 run_for_duration

  • 脚本退出时清理状态、取消窗口置顶。


注意

本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。技术层面需要提供帮助,可以通过打赏的方式进行探讨。


历史脚本txt文件获取>>
服务器搭建,人工服务咨询>>

没有评论:

发表评论

太好了!比特币和以太坊都跌了!

本周总结干货: (1)离岸人民币创去年11月以来新高; (2)美债收益曲线陡峭化,预示长期经济前景乐观; (3)加密货币下跌回调。 01 离岸人民币创年内新高 截至8月30日, 美元指数97.84,与上周相比,微涨0.13%。 离岸人民币汇率较上周微涨0.69%,达到 7.1...