2025年8月29日星期五

某学习网站自动刷视频脚本

1.购买服务器阿里云:服务器购买地址https://t.aliyun.com/U/W6Zbs4若失效,可用地址

1.购买服务器

阿里云:

服务器购买地址

https://t.aliyun.com/U/W6Zbs4

若失效,可用地址

https://www.aliyun.com/minisite/goods?source=5176.29345612&userCode=49hts92d

腾讯云:

https://curl.qcloud.com/wJpWmSfU

若失效,可用地址

https://cloud.tencent.com/act/cps/redirect?redirect=2446&cps_key=ad201ee2ef3b771157f72ee5464b1fea&from=console

华为云

https://activity.huaweicloud.com/cps.html?fromacct=64b5cf7cc11b4840bb4ed2ea0b2f4468&utm_source=V1g3MDY4NTY=&utm_medium=cps&utm_campaign=201905

2.部署教程

2024年最新青龙面板跑脚本教程(一)持续更新中

3.代码如下

import loggingimport randomimport timeimport osimport datetimeimport jsonimport pickleimport base64import mathimport warningsfrom io import BytesIOimport sys
# 抑制警告信息warnings.filterwarnings("ignore")
# 抑制Selenium的一些警告os.environ['WDM_LOG_LEVEL'] = '0'os.environ['WDM_PRINT_FIRST_LINE'] = 'False'
from PIL import Image
from selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.edge.options import Optionsfrom selenium.webdriver.edge.service import Servicefrom selenium.webdriver.support import expected_conditions as ECfrom selenium.webdriver.support.ui import WebDriverWait
# 全局配置ARTICLE_READ_TIME = 70  # 阅读文章时间(秒)VIDEO_WATCH_TIME = 180  # 观看视频时间(秒)WAIT_TIMEOUT = 30  # 等待元素超时时间(秒)EDGE_DRIVER_PATH = None  # 可以手动指定Edge驱动路径

class XueXiQiangGuoAssistant:    """学习强国助手类"""
    def __init__(self):        self.driver = None        self.logger = self._setup_logger()
    def _setup_logger(self):        """设置日志记录器"""        logger = logging.getLogger('XueXiQiangGuoAssistant')        logger.setLevel(logging.INFO)
        if not logger.handlers:            handler = logging.StreamHandler()            formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')            handler.setFormatter(formatter)            logger.addHandler(handler)
        return logger
    def _get_edge_driver_path(self):        """获取Edge驱动路径,支持离线模式"""        try:            # 尝试自动下载(在线模式)            try:                from webdriver_manager.microsoft import EdgeChromiumDriverManager                return EdgeChromiumDriverManager(                    url="https://msedgedriver.microsoft.com/",                    latest_release_url="https://msedgedriver.microsoft.com/LATEST_RELEASE"                ).install()            except ImportError:                self.logger.warning("webdriver_manager 未安装,尝试使用系统路径中的驱动")            except Exception as e:                self.logger.warning(f"自动下载驱动失败: {e}")
            # 离线模式:检查常见路径            possible_paths = [                EDGE_DRIVER_PATH,                "msedgedriver",  # 当前目录                "/usr/local/bin/msedgedriver",  # Linux                "/usr/bin/msedgedriver",  # Linux                "C:\\Program Files\\EdgeDriver\\msedgedriver.exe",  # Windows                "C:\\Windows\\System32\\msedgedriver.exe",  # Windows            ]
            for path in possible_paths:                if path and os.path.exists(path):                    self.logger.info(f"使用Edge驱动: {path}")                    return path
            # 如果都没找到,提示用户手动指定            self.logger.error("未找到Edge驱动,请手动安装并指定路径")            manual_path = input("请输入Edge驱动的完整路径(或按Enter退出): ").strip()            if manual_path and os.path.exists(manual_path):                return manual_path            else:                raise Exception("未找到有效的Edge驱动路径")
        except Exception as e:            self.logger.error(f"获取Edge驱动路径失败: {e}")            return None
    def initialize_driver(self):        """初始化WebDriver,支持离线模式"""        try:            # 设置Edge选项            edge_options = Options()
            # GPU和WebGL相关选项            edge_options.add_argument("--disable-gpu")            edge_options.add_argument("--disable-gpu-sandbox")            edge_options.add_argument("--disable-software-rasterizer")            edge_options.add_argument("--disable-webgl")            edge_options.add_argument("--disable-webgl2")            edge_options.add_argument("--disable-3d-apis")            edge_options.add_argument("--disable-accelerated-2d-canvas")            edge_options.add_argument("--disable-accelerated-video-decode")
            # 安全和性能选项            edge_options.add_argument("--no-sandbox")            edge_options.add_argument("--disable-dev-shm-usage")            edge_options.add_argument("--disable-extensions")            edge_options.add_argument("--disable-plugins")            edge_options.add_argument("--disable-images")            edge_options.add_argument("--disable-javascript-harmony-shipping")
            # 窗口和显示选项            edge_options.add_argument("--window-size=1920,1080")            edge_options.add_argument("--disable-web-security")            edge_options.add_argument("--allow-running-insecure-content")
            # 日志级别设置(减少错误信息输出)            edge_options.add_argument("--log-level=3")            edge_options.add_argument("--silent")            edge_options.add_experimental_option('excludeSwitches', ['enable-logging'])            edge_options.add_experimental_option('useAutomationExtension'False)
            # 可选:取消注释以下行以启用无头模式            edge_options.add_argument("--headless")
            # 获取驱动路径            driver_path = self._get_edge_driver_path()            if not driver_path:                return False
            # 初始化WebDriver            self.logger.info("正在初始化浏览器...")            service = Service(executable_path=driver_path)            self.driver = webdriver.Edge(service=service, options=edge_options)
            # 设置页面加载超时            self.driver.set_page_load_timeout(60)            self.driver.implicitly_wait(10)
            self.logger.info("浏览器初始化成功")            return True
        except Exception as e:            self.logger.error(f"初始化WebDriver时发生错误: {e}")            return False
    def check_network_connection(self):        """检查网络连接状态"""        try:            import socket            socket.create_connection(("www.baidu.com"80), timeout=5)            self.logger.info("网络连接正常")            return True        except OSError:            self.logger.warning("网络连接异常,请检查网络设置")            return False
    def extract_login_qrcode(self, output_path=None):        """        提取学习强国登录页面的二维码图片并保存到文件        """        if not self.driver:            self.logger.error("浏览器未初始化,请先调用 initialize_driver()")            return None
        try:            if output_path is None:                output_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "login_qrcode.png")
            # 确保页面已加载到登录页            if "login.html" not in self.driver.current_url:                self.logger.info("正在跳转到登录页面...")                self.driver.get("https://pc.xuexi.cn/points/login.html")                time.sleep(3)
            # 切换到登录iframe            wait = WebDriverWait(self.driver, WAIT_TIMEOUT)            wait.until(EC.frame_to_be_available_and_switch_to_it((By.ID, "ddlogin-iframe")))            self.logger.info("已切换到登录iframe")
            # 查找二维码图片元素            try:                # 尝试多种可能的选择器                qr_selectors = [                    '//*[@id="app"]/div/div[1]/div/div[1]/div[1]/img',                    '//img[contains(@src, "base64")]',                    '//div[contains(@class, "qrcode")]//img'                ]
                qr_element = None                for selector in qr_selectors:                    try:                        qr_element = wait.until(EC.presence_of_element_located((By.XPATH, selector)))                        if qr_element:                            break                    except:                        continue
                if not qr_element:                    self.logger.error("未找到二维码元素")                    return None
                # 获取图片元素的src属性                src = qr_element.get_attribute('src')
                if src and 'base64,' in src:                    base64_data = src.split('base64,')[1]                    img_data = base64.b64decode(base64_data)                    img = Image.open(BytesIO(img_data))                    img.save(output_path)
                    if os.path.exists(output_path):                        self.logger.info(f"二维码已保存到: {output_path}")                        return output_path                else:                    self.logger.warning("图片元素不包含base64编码的数据")
            except Exception as e:                self.logger.error(f"提取二维码时出错: {e}")
            finally:                self.driver.switch_to.default_content()
        except Exception as e:            self.logger.error(f"提取二维码时发生错误: {e}")            try:                self.driver.switch_to.default_content()            except:                pass
        return None
    def check_login_status(self):        """通过cookie检测登录状态"""        if not self.driver:            self.logger.error("浏览器未初始化,请先调用 initialize_driver()")            return False
        try:            # 确保在xuexi.cn域名下            current_url = self.driver.current_url            if "xuexi.cn" not in current_url:                # 导航到学习强国主页来检查cookie                self.driver.get("https://www.xuexi.cn")                time.sleep(2)
            # 获取xuexi.cn域名下的所有cookie            cookies = self.driver.get_cookies()
            # 检查是否存在token相关的cookie            token_found = False            for cookie in cookies:                cookie_name = cookie.get('name''').lower()                cookie_value = cookie.get('value''')
                # 检查常见的token cookie名称                if any(token_key in cookie_name for token_key in ['token''access_token''auth''session''login']):                    if cookie_value and len(cookie_value) > 10:  # token通常比较长                        self.logger.info(f"发现有效token: {cookie_name}")                        token_found = True                        break
            if token_found:                # 进一步验证:尝试访问需要登录的页面                try:                    self.driver.get("https://pc.xuexi.cn/points/my-points.html")                    time.sleep(3)
                    # 检查是否被重定向到登录页面                    current_url = self.driver.current_url                    if "login.html" in current_url:                        self.logger.info("虽然有token但被重定向到登录页,token可能已过期")                        return False
                    self.logger.info("通过cookie验证登录成功")                    return True
                except Exception as e:                    self.logger.warning(f"验证登录状态时发生错误: {e}")                    return False            else:                self.logger.info("未找到有效的登录token")                return False
        except Exception as e:            self.logger.error(f"检查登录状态时出错: {e}")            return False
    def wait_for_login(self):        """等待用户登录成功"""        try:            self.logger.info("请使用学习强国APP扫描二维码登录...")            self.logger.info("等待登录成功...")
            timeout = 300  # 5分钟超时            wait_end_time = time.time() + timeout            check_count = 0
            while time.time() < wait_end_time:                # 使用更严格的登录状态检查                if self.check_login_status():                    self.logger.info("登录验证成功!")                    return True
                # 每10秒检查一次                time.sleep(10)                check_count += 1
                # 每30秒提醒一次                if check_count % 3 == 0:                    self.logger.info("仍在等待登录...如果已登录成功,请输入 'y' 确认")                    user_input = input("已登录成功?(y/n): ").lower().strip()                    if user_input == 'y':                        # 用户确认后也要验证登录状态                        if self.check_login_status():                            self.logger.info("用户确认并验证登录成功!")                            return True                        else:                            self.logger.warning("用户确认登录,但验证失败,请重新登录")
            # 超时处理            self.logger.warning("登录等待超时")            user_input = input("是否已成功登录?(y/n): ").lower().strip()            if user_input == 'y':                # 最后验证一次                if self.check_login_status():                    return True                else:                    self.logger.error("登录验证失败,请重新登录")                    return False            return False
        except Exception as e:            self.logger.error(f"检测登录状态时出错: {e}")            user_input = input("登录状态检测出错,是否已成功登录?(y/n): ").lower().strip()            if user_input == 'y':                return self.check_login_status()            return False
    def launch_xuexi_website(self):        """启动学习强国网站"""        try:            # 检查网络连接            if not self.check_network_connection():                self.logger.warning("继续尝试,但网络可能不稳定...")
            # 初始化浏览器            if not self.initialize_driver():                return
            # 打开学习强国登录页面            self.logger.info("正在打开学习强国...")            self.driver.get("https://www.xuexi.cn")            time.sleep(3)
            # 检查是否已经登录(使用更严格的检查)            if self.check_login_status():                self.logger.info("检测到已登录状态,直接进入学习页面")                self.show_menu()                return
            # 未登录,跳转到登录页面            self.logger.info("未检测到登录状态,跳转到登录页面")            self.driver.get("https://pc.xuexi.cn/points/login.html")            time.sleep(3)
            # 提取并显示二维码            qr_path = self.extract_login_qrcode()            if qr_path:                try:                    img = Image.open(qr_path)                    img.show()                    self.logger.info("二维码已显示,请使用学习强国APP扫描")                except:                    self.logger.info(f"无法自动显示图片,请手动查看: {qr_path}")
            # 等待登录            if self.wait_for_login():                # 清理二维码文件                if qr_path and os.path.exists(qr_path):                    os.remove(qr_path)                self.show_menu()            else:                self.logger.error("登录失败或超时")
        except Exception as e:            self.logger.error(f"启动学习强国时发生错误: {e}")        finally:            self.quit_driver()
    def read_articles(self, num_articles=6, start_index=0):        """        阅读文章获取积分
        参数:            num_articles: 要阅读的文章数量            start_index: 从文章列表的第几篇文章开始阅读        """        if not self.driver:            self.logger.error("浏览器未初始化,请先调用 initialize_driver()")            return False
        try:            # 跳转到新闻页面            self.logger.info("正在跳转到新闻页面...")            self.driver.get("https://www.xuexi.cn")
            time.sleep(2)
            # 等待文章列表加载            article_links = WebDriverWait(self.driver, 30).until(                EC.presence_of_all_elements_located((By.XPATH, "//div[@class='text-link-item-title']"))            )
            # 阅读指定数量的文章            read_count = min(len(article_links), num_articles)            self.logger.info(f"找到{len(article_links)}篇文章,计划阅读{read_count}篇,从第{start_index + 1}篇开始")
            for i in range(read_count):                # 重新获取文章列表,避免StaleElementReferenceException                article_links = WebDriverWait(self.driver, 30).until(                    EC.presence_of_all_elements_located((By.XPATH, "//div[@class='text-link-item-title']"))                )
                # 计算实际的文章索引,使用模运算确保不会超出范围                actual_index = (i + start_index) % len(article_links)                self.logger.info(f"正在阅读第 {actual_index + 1}/{len(article_links)} 篇文章")
                # 点击对应索引的文章                article_links[actual_index].click()
                # 切换到新窗口                self.driver.switch_to.window(self.driver.window_handles[-1])
                # 模拟阅读行为,随机滚动页面                read_time = 70 + random.randint(-1010)  # 阅读文章时间(秒)                self.logger.info(f"阅读时间:{read_time}秒")
                end_time = time.time() + read_time                while time.time() < end_time:                    # 随机滚动页面                    scroll_height = random.randint(100500)                    self.driver.execute_script(f"window.scrollBy(0, {scroll_height});")                    time.sleep(random.uniform(25))
                # 关闭当前文章窗口,回到文章列表                self.driver.close()                self.driver.switch_to.window(self.driver.window_handles[0])
                time.sleep(1)
            self.logger.info("文章阅读完成!")            return True        except Exception as e:            self.logger.error(f"阅读文章时发生错误: {e}")            return False
    def watch_videos(self, num_videos=6, start_index=0):        """        观看视频获取积分
        参数:            num_videos: 要观看的视频数量            start_index: 从视频列表的第几个视频开始观看        """        if not self.driver:            self.logger.error("浏览器未初始化,请先调用 initialize_driver()")            return False
        try:            # 跳转到视频页面            self.logger.info("正在跳转到视频页面...")            self.driver.get("https://www.xuexi.cn/4426aa87b0b64ac671c96379a3a8bd26/db086044562a57b441c24f2af1c8e101.html")
            time.sleep(3)
            # 等待视频列表加载 - 调整选择器以匹配视频列表项            self.logger.info("等待视频列表加载...")
            # 尝试多种选择器            selector_options = [                {"type""xpath""value""//div[contains(@class, 'thePic')][@data-link-target]"},                {"type""xpath""value""//div[contains(@class, 'textWrapper')][@data-link-target]"},                {"type""xpath""value""//div[contains(@class, 'grid-cell')]//div[contains(@class, 'innerPic')]"},                {"type""css""value"".grid-gr .grid-cell"}            ]
            # 尝试每个选择器            current_selector = None            for selector in selector_options:                try:                    if selector["type"] == "xpath":                        video_links = WebDriverWait(self.driver, 30).until(                            EC.presence_of_all_elements_located((By.XPATH, selector["value"]))                        )                    else:                        video_links = WebDriverWait(self.driver, 30).until(                            EC.presence_of_all_elements_located((By.CSS_SELECTOR, selector["value"]))                        )
                    if video_links and len(video_links) > 0:                        self.logger.info(f"找到 {len(video_links)} 个视频,使用选择器: {selector['value']}")                        current_selector = selector  # 保存成功的选择器                        break                except Exception as e:                    self.logger.debug(f"选择器 {selector['value']} 未找到元素")
            if not current_selector:                self.logger.error("无法找到视频列表,任务无法完成")                return False
            # 观看指定数量的视频            watch_count = min(len(video_links), num_videos)            self.logger.info(f"计划观看{watch_count}个视频,从第{start_index + 1}个开始")
            for i in range(watch_count):                # 重新获取视频列表,使用成功的选择器                try:                    if current_selector["type"] == "xpath":                        video_links = WebDriverWait(self.driver, 30).until(                            EC.presence_of_all_elements_located((By.XPATH, current_selector["value"]))                        )                    else:                        video_links = WebDriverWait(self.driver, 30).until(                            EC.presence_of_all_elements_located((By.CSS_SELECTOR, current_selector["value"]))                        )
                    # 计算实际的视频索引,使用模运算确保不会超出范围                    actual_index = (i + start_index) % len(video_links)                    self.logger.info(f"正在观看第 {actual_index + 1}/{len(video_links)} 个视频")
                    # 确保元素可点击                    try:                        if current_selector["type"] == "xpath":                            WebDriverWait(self.driver, 10).until(                                EC.element_to_be_clickable((By.XPATH, current_selector["value"]))                            )                        else:                            WebDriverWait(self.driver, 10).until(                                EC.element_to_be_clickable((By.CSS_SELECTOR, current_selector["value"]))                            )                    except Exception as e:                        self.logger.debug(f"等待元素可点击时出错: {e}")
                    # 使用JavaScript点击元素                    try:                        self.driver.execute_script("arguments[0].click();", video_links[actual_index])                    except Exception as e:                        self.logger.warning(f"点击视频时出错,尝试替代方法: {e}")                        try:                            video_links[actual_index].click()                        except:                            self.logger.error("替代点击方法也失败,跳过此视频")                            continue
                    # 切换到新窗口                    try:                        if len(self.driver.window_handles) > 1:                            self.driver.switch_to.window(self.driver.window_handles[-1])                        else:                            self.logger.info("没有新窗口打开,继续处理当前页面")                    except Exception as e:                        self.logger.error(f"切换窗口时出错: {e}")                        continue
                    # 等待视频加载并播放                    try:                        # 尝试多个可能的视频选择器                        video_selectors = ["//video""//div[contains(@class,'outter')]//video""//div[@id='ji-player']"]                        video_player = None
                        for selector in video_selectors:                            try:                                video_player = WebDriverWait(self.driver, 30).until(                                    EC.presence_of_element_located((By.XPATH, selector))                                )                                if video_player:                                    break                            except:                                continue
                        if video_player:                            # 设置视频静音                            self.driver.execute_script("arguments[0].muted = true;", video_player)                            self.logger.info("已将视频设为静音模式")
                            # 确保视频开始播放                            self.driver.execute_script("arguments[0].play();", video_player)
                            # 等待视频加载并获取时长                            video_duration = 0                            wait_duration_time = time.time() + 10                            while time.time() < wait_duration_time:                                try:                                    video_duration = self.driver.execute_script("return arguments[0].duration", video_player)                                    if video_duration and video_duration > 0 and not math.isnan(video_duration):                                        break                                except:                                    pass                                time.sleep(1)
                            # 根据视频时长决定观看时间                            if video_duration and video_duration > 0:                                minutes = int(video_duration // 60)                                seconds = int(video_duration % 60)                                self.logger.info(f"检测到视频时长: {minutes}{seconds}秒 ({video_duration:.1f}秒)")
                                watch_time = int(video_duration) + random.randint(510)
                                if watch_time > 300:  # 如果超过5分钟                                    watch_time = 300    # 直接设置为5分钟                            else:                                self.logger.info("无法获取视频时长,使用默认观看时间")                                watch_time = 180 + random.randint(-1515)
                            # 检查视频是否真的在播放                            is_playing = self.driver.execute_script(                                "return arguments[0].paused === false && arguments[0].currentTime > 0",                                video_player                            )
                            if not is_playing:                                self.logger.info("尝试手动开始播放视频")                                play_buttons = self.driver.find_elements(By.XPATH, "//div[contains(@class, 'play')]")                                if play_buttons:                                    play_buttons[0].click()                    except Exception as e:                        self.logger.error(f"播放视频时出错: {e}")                        watch_time = 180 + random.randint(-1515)
                    self.logger.info(f"观看时间:{watch_time}秒")
                    # 观看视频,并定期检查播放状态                    end_time = time.time() + watch_time                    while time.time() < end_time:                        remaining_time = end_time - time.time()
                        # 只在距离结束还有超过30秒时进行滚动                        if remaining_time > 30:                            scroll_height = random.randint(100400)                            self.driver.execute_script(f"window.scrollBy(0, {scroll_height});")
                            time.sleep(random.uniform(25))                            if random.random() > 0.5:  # 50%的概率滚回一些距离                                back_scroll = random.randint(50, scroll_height)                                self.driver.execute_script(f"window.scrollBy(0, -{back_scroll});")
                        # 每隔15-30秒检查一次视频是否仍在播放                        check_interval = random.uniform(1530)                        check_interval = min(check_interval, remaining_time)                        time.sleep(check_interval)
                        try:                            if video_player:                                is_paused = self.driver.execute_script("return arguments[0].paused", video_player)                                if is_paused:                                    self.logger.info("视频已暂停,尝试继续播放")                                    self.driver.execute_script("arguments[0].play();", video_player)                        except:                            pass
                    # 观看结束,确保视频在可见区域                    try:                        self.driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", video_player)                    except Exception as e:                        self.logger.debug(f"滚动到视频位置失败: {e}")
                    # 关闭当前视频窗口,回到视频列表                    try:                        self.driver.close()                        self.driver.switch_to.window(self.driver.window_handles[0])                    except Exception as e:                        self.logger.error(f"关闭视频窗口时出错: {e}")                        if len(self.driver.window_handles) > 0:                            self.driver.switch_to.window(self.driver.window_handles[0])                    time.sleep(1)                except Exception as e:                    self.logger.error(f"重新获取视频列表时出错: {e}")                    continue
            self.logger.info("视频观看完成!")            return True        except Exception as e:            self.logger.error(f"观看视频时发生错误: {e}")            if len(self.driver.window_handles) > 0:                self.driver.switch_to.window(self.driver.window_handles[0])            return False
    def check_score(self, verbose=False):        """        查看当前学习积分,并返回文章和视频的积分状态        verbose: 是否显示详细信息        """        if not self.driver:            self.logger.error("浏览器未初始化,请先调用 initialize_driver()")            return {                'article': {'current'0'target'12},                'video': {'current'0'target'12}            }
        try:            # 跳转到积分页面            self.logger.info("正在检查积分状态...")            self.driver.get("https://pc.xuexi.cn/points/my-points.html")
            time.sleep(3)
            # 等待积分数据加载            WebDriverWait(self.driver, 30).until(                EC.presence_of_element_located((By.CLASS_NAME, "my-points-content"))            )
            # 提取各项积分详情            article_points = {'current'0'target'12}            video_points = {'current'0'target'12}
            try:                score_cards = self.driver.find_elements(By.CLASS_NAME, "my-points-card")
                if verbose:                    self.logger.info(f"积分详情: 找到 {len(score_cards)} 个积分卡片")
                # 只有在详细模式下才打印所有卡片                if verbose:                    self.logger.info("所有积分卡片标题:")                    for i, card in enumerate(score_cards):                        try:                            title = card.find_element(By.CLASS_NAME, "my-points-card-title").text                            progress = card.find_element(By.CLASS_NAME, "my-points-card-text").text                            self.logger.info(f"{i + 1}{title}{progress}")                        except:                            pass
                # 解析积分详情                for card in score_cards:                    try:                        title = card.find_element(By.CLASS_NAME, "my-points-card-title").text                        progress = card.find_element(By.CLASS_NAME, "my-points-card-text").text
                        # 提取文章和视频的积分情况                        if "选读文章" in title or "阅读文章" in title or "我要选读文章" in title:                            try:                                current, target = progress.split("/")                                # 移除非数字字符再转换                                current_clean = ''.join(filter(str.isdigit, current))                                target_clean = ''.join(filter(str.isdigit, target))
                                article_points['current'] = int(current_clean)                                article_points['target'] = int(target_clean)                            except Exception as e:                                if verbose:                                    self.logger.warning(f"解析文章积分失败: {progress}, 错误: {e}")                        elif ("视听学习" in title or "视频" in title) and (                                "时长" in title or "分钟" in title or "我要" in title):                            try:                                current, target = progress.split("/")                                # 移除非数字字符再转换                                current_clean = ''.join(filter(str.isdigit, current))                                target_clean = ''.join(filter(str.isdigit, target))
                                video_points['current'] = int(current_clean)                                video_points['target'] = int(target_clean)                            except Exception as e:                                if verbose:                                    self.logger.warning(f"解析视频积分失败: {progress}, 错误: {e}")                    except Exception as e:                        if verbose:                            self.logger.warning(f"获取积分卡片详情失败: {e}")
                # 简洁的积分汇总                self.logger.info(f"积分进度: 文章 {article_points['current']}/{article_points['target']} | " +                      f"视频 {video_points['current']}/{video_points['target']}")            except Exception as e:                if verbose:                    self.logger.error(f"获取积分详情失败: {e}")
            return {                'article': article_points,                'video': video_points            }        except Exception as e:            if verbose:                self.logger.error(f"查看积分时发生错误: {e}")            return {                'article': {'current'0'target'12},                'video': {'current'0'target'12}            }
    def show_menu(self):        """显示功能菜单并处理用户选择"""        while True:            print("\n=== 学习强国助手菜单 ===")            print("1. 阅读文章(获取积分)")            print("2. 观看视频(获取积分)")            print("3. 查看我的积分")            print("4. 阅读文章+观看视频(全自动)")            print("0. 退出程序")
            choice = input("\n请选择功能 (0-4): ").strip()
            if choice == '1':                num = input("请输入要阅读的文章数量 (默认12篇): ").strip()                num = int(num) if num.isdigit() else 12                self.read_articles(num)            elif choice == '2':                num = input("请输入要观看的视频数量 (默认12个): ").strip()                num = int(num) if num.isdigit() else 12                self.watch_videos(num)            elif choice == '3':                self.check_score(verbose=True)            elif choice == '4':                self.run_automatic_learning()            elif choice == '0':                self.logger.info("正在退出程序...")                break            else:                print("无效选择,请重新输入")
    def run_automatic_learning(self):        """全自动学习"""        if not self.driver:            self.logger.error("浏览器未初始化,请先调用 initialize_driver()")            return False
        try:            self.logger.info("===== 开始全自动学习 =====")
            # 初始化检查积分状态            score_status = self.check_score(verbose=False)
            # 持续检查直到所有任务完成            while True:                # 计算所需的阅读文章和观看视频数量                article_target = score_status['article']['target']                article_current = score_status['article']['current']                article_remaining = max(0, article_target - article_current)
                video_target = score_status['video']['target']                video_current = score_status['video']['current']                video_remaining = max(0, video_target - video_current)
                # 显示完成百分比                article_percent = min(100int(article_current / article_target * 100))                video_percent = min(100int(video_current / video_target * 100))                self.logger.info(f"当前进度: 文章 {article_percent}% | 视频 {video_percent}%")
                # 如果两种任务都已完成,退出循环                if article_remaining <= 0 and video_remaining <= 0:                    self.logger.info(" 所有学习任务已完成!")                    break
                # 自动完成文章阅读任务,从已阅读的文章数量开始                if article_remaining > 0:                    batch_articles = min(6, article_remaining)                    # 传递已获得的文章积分作为起始索引                    self.read_articles(batch_articles, article_current)
                # 简短地检查积分状态                score_status = self.check_score(verbose=False)
                # 如果已完成所有任务,提前退出                if score_status['article']['current'] >= article_target and score_status['video']['current'] >= video_target:                    self.logger.info(" 所有学习任务已完成!")                    break
                # 自动完成视频观看任务,从已观看的视频数量开始                if video_remaining > 0:                    batch_videos = min(6, video_remaining)                    # 传递已获得的视频积分作为起始索引                    self.watch_videos(batch_videos, video_current)
                # 再次检查积分状态                score_status = self.check_score(verbose=False)
            return True        except Exception as e:            self.logger.error(f"全自动学习过程中发生错误: {e}")            return False
    def quit_driver(self):        """关闭浏览器"""        if self.driver:            try:                self.driver.quit()                self.logger.info("浏览器已关闭")            except:                pass

def check_dependencies():    """检查必要的依赖"""    try:        from PIL import Image        from selenium import webdriver        return True    except ImportError as e:        print(f"缺少必要依赖: {e}")        print("请安装所需包: pip install selenium pillow")        return False

def main():    """主函数"""    print("=" * 50)    print("学习强国自动化助手")    print("=" * 50)
    # 检查依赖    if not check_dependencies():        return
    # 运行助手    assistant = XueXiQiangGuoAssistant()    assistant.launch_xuexi_website()

if __name__ == "__main__":    main()

解析

这份脚本做的事一眼看懂:用 Selenium(Edge,无头)自动登录学习强国,读取/观看若干文章与视频拿分,并可查看积分进度;还支持把登录二维码从 iframe 里取出、base64 解码后保存/展示,供手机扫码登录。

主要功能

  • 启动 Edge(可自动下载或走本地驱动),大量禁用项提升稳定/隐蔽性(GPU、WebGL、扩展、图片等),默认 无头模式

  • 打开学习强国:

    • 若未登录:跳到登录页 → 切换到 ddlogin-iframe → 抓取二维码 <img> → 保存并展示图片 → 轮询/人工确认 → 用访问「我的积分页」验证是否登录。

    • 若已登录:直接展示功能菜单。

  • 功能菜单:阅读文章、看视频、查积分、全自动(先查进度,再补足文章/视频目标)。

  • 阅读文章/看视频:打开列表页,按索引点击进入详情页/视频页,模拟滚动、尝试播放、按时长或默认时长停留,再关闭标签回到列表。

  • 查询积分:抓取各积分卡片标题和进度文案,解析"当前/目标"。

跑之前你要准备

  • 依赖:seleniumPillow(已自检),若在线自动下载驱动需 webdriver-manager(脚本里做了兜底)。

  • EdgeDriver:联网可自动下;离线环境请把路径填到 EDGE_DRIVER_PATH

  • 无头环境想看二维码:脚本已把二维码保存到本地 login_qrcode.png,但 img.show() 在无头服务器通常打不开图形界面,直接用文件即可。

无头 + "禁用图片/视频"的矛盾

    • 你加了 --disable-images--disable-accelerated-video-decode 等,这可能导致文章缩略图/视频节点渲染异常,进而找不到元素或视频无法播放。

    • 建议:阅读文章可保留 --disable-images看视频前动态移除该策略或使用单独的 WebDriver 实例用于视频(不禁图/不禁视频硬解)。

二维码 iframe / 选择器易变

    • 登录页 iframe id ddlogin-iframe 和二维码 XPATH 可能变动。

    • 建议:增加多个稳健选择器(你已做多套 XPATH,很好),再补一个 CSS 选择器 备选,并在失败时截图 page_source 到文件方便排查。

无头扫码后的可视化反馈

    • 目前靠轮询和"访问积分页是否跳登录"判断;如果登录慢会反复告警。

    • 后期可优化:加一个 二维码过期重刷(登录页通常支持刷新),或定时重新抓取二维码并覆盖保存。

窗口与句柄管理

    • 多次打开/关闭标签页后,偶尔会句柄错乱。

    • 点击前记录 main = self.driver.current_window_handle,打开后用 for h in self.driver.window_handles: if h != main: switch;关闭后 switch_to.window(main),并在异常时做兜底。

视频播放检测

    • 目前用 JS 检查 paused/currentTime,不错。但很多视频页是 H5 包装器或自定义播放器。

    • 再加一层兜底——每隔 20–30s 触发一次 arguments[0].play() 和 "scrollIntoView";若 video 元素获取不到,尝试点击可见的播放按钮(你已做)并兼容 aria-label="播放"/含"播放"字样的元素。

积分解析健壮性

    • 标题/文案经常改名("选读文章/我要选读文章/时长/分钟"等)。

    • 关键词列表配置化;解析 "当前/目标" 时优先用 正则提取数字,不要依赖斜杠 / 分隔。

驱动获取的离线兜底

    • 你已做多路径检测+手输路径,非常好。

    • 在日志里打印 Edge 版本 与 驱动版本 不一致时的提示,减少"版本不匹配"坑。

异常截图 & 页面源保存

    • 定位问题很关键:在 except 分支里保存 screenshot.png 和 page.html 到工作目录,日志里打印保存路径。

登录态持久化(可选)

    • 指定 用户数据目录edge_options.add_argument(f'--user-data-dir={path}')),保持登录 cookie;

    • 或导出/导入 cookie(注意同源策略与有效期)。



注意

本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。技术层面需要提供帮助,可以通过打赏的方式进行探讨。


历史脚本txt文件获取>>
服务器搭建,人工服务咨询>>


没有评论:

发表评论

内地创业者注册香港公司之优势解析

随着我国改革开放和市场经济的深入发展,越来越多的内地创业者将目光投向了香港这片充满机遇的土地。 随着我国改革开放和市场经济的深入发展,越来越多的内地创业者将目光投向了香港这片充满机遇的土地。香港作为国际金融中心,其独特的地理位置、开放的市场环境以及优惠的税收政策,使得注册香港...