""" 抢购核心服务 — 优化版 优化点: 1. 提前预热:开售前就打开页面并保持连接 2. 提前刷新:开售前 500ms 发起 reload,用 commit 级别不等渲染 3. 并发多 tab:同时开 2 个 tab 竞争抢购,谁先成功算谁的 4. 极速点击:不等 sleep,用 waitForSelector 替代固定等待 5. 重试间隔极短:50ms 级别快速重试 """ import asyncio from playwright.async_api import async_playwright from utils.stealth import stealth_async from utils.timer import PrecisionTimer from server.services.auth_service import get_browser_context, has_auth from server.database import get_db from datetime import datetime # 并发 tab 数量 CONCURRENT_TABS = 2 # 最大重试次数(每个 tab) MAX_RETRIES = 5 # 购买按钮文案 BUY_TEXTS = ["立即抢购", "立即购买", "马上抢", "立即秒杀"] async def run_snatch(task_id): """执行单个抢购任务""" db = get_db() task = db.execute('SELECT * FROM tasks WHERE id = ?', (task_id,)).fetchone() if not task: return account_id = task['account_id'] if not has_auth(account_id): _update_task(db, task_id, 'failed', '账号未登录') return target_url = task['target_url'] if not target_url or not target_url.strip(): _update_task(db, task_id, 'failed', '商品链接为空,请检查购物车同步是否获取到了 itemID') return _update_task(db, task_id, 'running', '正在准备...') timer = PrecisionTimer() timer.sync_time() try: async with async_playwright() as p: browser, context = await get_browser_context( p, account_id, headless=True) # ── 1. 预热:打开商品页面 ── _update_task(db, task_id, 'running', '预热:打开商品页面...') page = await context.new_page() await stealth_async(page) await page.goto(target_url, wait_until='networkidle', timeout=20000) # 检查页面状态 if 'login' in page.url.lower(): _update_task(db, task_id, 'failed', '登录态已过期') await browser.close() return body_text = await page.locator('body').text_content() if '商品不存在' in (body_text or '') or '已下架' in (body_text or ''): _update_task(db, task_id, 'failed', '商品不存在或已下架') await browser.close() return # ── 2. 等待抢购时间 ── snatch_time = task['snatch_time'] if snatch_time: _update_task(db, task_id, 'running', f'等待开售: {snatch_time}') # 提前 500ms 触发,因为 reload 本身需要时间 await timer.wait_until_early(snatch_time, early_ms=500) # ── 3. 并发抢购 ── _update_task(db, task_id, 'running', '开始抢购...') # 创建多个 tab 并发竞争 pages = [page] for _ in range(CONCURRENT_TABS - 1): try: p2 = await context.new_page() await stealth_async(p2) await p2.goto(target_url, wait_until='commit', timeout=10000) pages.append(p2) except Exception: pass # 所有 tab 并发执行抢购 tasks_coro = [_do_purchase_fast(pg, i) for i, pg in enumerate(pages)] results = await asyncio.gather(*tasks_coro, return_exceptions=True) # 取第一个成功的结果 result = None for r in results: if isinstance(r, str) and ('已提交' in r or '已发送' in r): result = r break if not result: # 没有成功的,取最后一个非异常结果 for r in results: if isinstance(r, str): result = r if not result: result = f"抢购失败: {results}" if '已提交' in result or '已发送' in result: _update_task(db, task_id, 'completed', result) db.execute( 'INSERT INTO orders (task_id, account_id, status, detail)' ' VALUES (?, ?, ?, ?)', (task_id, account_id, 'submitted', result)) else: _update_task(db, task_id, 'failed', result) db.execute( 'INSERT INTO orders (task_id, account_id, status, detail)' ' VALUES (?, ?, ?, ?)', (task_id, account_id, 'failed', result)) db.commit() await asyncio.sleep(3) await browser.close() except Exception as e: _update_task(db, task_id, 'failed', str(e)) finally: db.close() async def _do_purchase_fast(page, tab_index=0): """ 极速购买流程(单个 tab): 1. reload 用 commit 级别,不等完整渲染 2. 用 locator.wait_for 替代固定 sleep 3. 重试间隔极短 """ for attempt in range(MAX_RETRIES): try: # ── 刷新页面 ── # 用 commit 级别:收到第一个字节就继续,不等 DOM 完整加载 await page.reload(wait_until='commit', timeout=8000) # 短暂等待让关键 DOM 出现(比 networkidle 快很多) await asyncio.sleep(0.3) # ── 点击购买按钮 ── buy_btn = None for text in BUY_TEXTS: loc = page.get_by_text(text, exact=False) try: await loc.first.wait_for(state="visible", timeout=1500) buy_btn = loc.first break except Exception: continue if not buy_btn: # 按钮没出现,可能页面还没加载完或还没开售 if attempt < MAX_RETRIES - 1: await asyncio.sleep(0.05) # 50ms 后重试 continue return f"tab{tab_index}: 未找到购买按钮" await buy_btn.click(timeout=2000) # ── 处理 SKU 弹窗 ── try: confirm_btn = page.get_by_text("确定", exact=True) await confirm_btn.first.wait_for(state="visible", timeout=1500) # 选第一个可用 SKU sku_sel = ('.sku-item:not(.disabled), ' '.sku_item:not(.disabled), ' '[class*="sku"] [class*="item"]' ':not([class*="disabled"])') sku_items = page.locator(sku_sel) if await sku_items.count() > 0: await sku_items.first.click() await asyncio.sleep(0.1) await confirm_btn.first.click(timeout=2000) except Exception: # 没有 SKU 弹窗,直接继续 pass # ── 提交订单 ── submit_btn = page.get_by_text("提交订单") await submit_btn.wait_for(state="visible", timeout=6000) await submit_btn.click() return f"tab{tab_index}: 抢购请求已提交" except Exception as e: if attempt < MAX_RETRIES - 1: await asyncio.sleep(0.05) continue return f"tab{tab_index}: 抢购失败: {e}" return f"tab{tab_index}: 重试次数用尽" def _update_task(db, task_id, status, result): db.execute( "UPDATE tasks SET status = ?, result = ?, updated_at = ? WHERE id = ?", (status, result, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), task_id)) db.commit()