From 2427237116342a6a7eb6603818f60268f1fdf813 Mon Sep 17 00:00:00 2001 From: Jeason <1710884619@qq.com> Date: Wed, 18 Mar 2026 13:58:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E4=BB=BB=E5=8A=A1=EF=BC=8C?= =?UTF-8?q?=E6=89=8B=E5=8A=A8=E8=A7=A6=E5=8F=91=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/task_scheduler/app/main.py | 45 ++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/backend/task_scheduler/app/main.py b/backend/task_scheduler/app/main.py index cb2e269..d7d2c74 100644 --- a/backend/task_scheduler/app/main.py +++ b/backend/task_scheduler/app/main.py @@ -44,6 +44,9 @@ _registered_tasks: dict = {} # task_id -> cron_expression SIGNIN_LOG_RETAIN_DAYS = int(os.getenv("SIGNIN_LOG_RETAIN_DAYS", "30")) CLEANUP_BATCH_SIZE = 1000 +# Redis 订阅线程是否运行 +_redis_listener_running = False + WEIBO_HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " @@ -384,10 +387,49 @@ async def _async_cleanup(): return {"deleted": total_deleted, "cutoff": cutoff.isoformat()} +# =============== Redis 订阅(实时同步任务变更) =============== + +def _start_redis_listener(): + """启动 Redis pub/sub 监听线程,收到 task_updates 消息后立即同步。""" + global _redis_listener_running + if not shared_settings.USE_REDIS or not shared_settings.REDIS_URL: + logger.info("⚠️ Redis 未启用,任务变更仅靠 5 分钟轮询同步") + return + + import threading + import redis + + def _listen(): + global _redis_listener_running + _redis_listener_running = True + logger.info("📡 Redis 订阅线程启动,监听 task_updates 频道") + while _redis_listener_running: + try: + r = redis.from_url(shared_settings.REDIS_URL, decode_responses=True) + pubsub = r.pubsub() + pubsub.subscribe("task_updates") + for message in pubsub.listen(): + if not _redis_listener_running: + break + if message["type"] == "message": + logger.info(f"📡 收到任务变更通知: {message['data'][:200]}") + sync_db_tasks() + pubsub.close() + r.close() + except Exception as e: + logger.warning(f"Redis 订阅异常,5 秒后重连: {e}") + _time.sleep(5) + + t = threading.Thread(target=_listen, daemon=True, name="redis-listener") + t.start() + + # =============== 启动入口 =============== def _shutdown(signum, frame): + global _redis_listener_running logger.info(f"收到信号 {signum},正在关闭调度器...") + _redis_listener_running = False scheduler.shutdown(wait=False) sys.exit(0) @@ -407,6 +449,9 @@ if __name__ == "__main__": # 首次同步 DB 任务 sync_db_tasks() + # 启动 Redis 订阅,实时接收任务变更通知 + _start_redis_listener() + # 每 5 分钟重新同步 DB(处理新增/删除/修改的任务) scheduler.add_job( sync_db_tasks,