Skip to content

高并发设计模式

⏱ 阅读约 12 分钟 ⭐ 核心概念 🔗 项目应用:云智汇 + hm-dianping + sky-take-out

在 MiroFish 项目中,定时新闻采集涉及大量的并发任务处理。本文梳理常见的高并发解决方案及其在项目中的实践。

项目实践: 高并发方案贯穿多个项目——云智汇的优惠券领取(RabbitMQ + Redisson)、播放进度合并写库(Redis Hash + DelayQueue)、hm-dianping的缓存三连防护和秒杀系统。

消息队列解耦

核心思想

将"任务的生产"和"任务的执行"解耦,通过消息队列作为中间缓冲层:

生产者(APScheduler) → 消息队列(RabbitMQ) → 消费者(Celery Worker)

优势

  • 削峰填谷:突发流量被队列缓冲,消费者按自身能力处理
  • 异步处理:生产者无需等待任务完成,提高响应速度
  • 可扩展:可以随时增加消费者实例,水平扩展处理能力

RabbitMQ 关键概念

概念说明
Exchange接收生产者消息并路由到队列
Queue存储消息的缓冲区
BindingExchange 和 Queue 之间的路由规则
Consumer监听队列并处理消息的程序

Exchange 类型:

  • Direct:精确匹配 routing key
  • Topic:通配符匹配 routing key
  • Fanout:广播到所有绑定的队列
  • Headers:基于消息头属性匹配

分布式任务调度

APScheduler + Celery 协作

APScheduler(定时触发)
    ↓ 触发
Celery Producer(投递任务)
    ↓ 投递
RabbitMQ(消息队列)
    ↓ 分发
Celery Worker × N(分布式执行)

APScheduler 的角色: 定时触发器,按照 Cron 或间隔表达式定时向 Celery 投递采集任务。

Celery 的角色: 分布式任务执行框架,管理 Worker 进程池,处理任务的分发、执行、重试、结果存储。

Celery 关键配置

python
# 任务路由
task_routes = {
    'tasks.collect_news': {'queue': 'news_collect'},
    'tasks.process_data': {'queue': 'data_process'},
}

# 并发控制
worker_concurrency = 4  # 每个 Worker 的并发数

# 任务超时
task_time_limit = 300  # 单任务最长执行时间(秒)
task_soft_time_limit = 240  # 软超时,抛出异常而非强制终止

# 重试策略
autoretry_for = (Exception,)
max_retries = 3
retry_backoff = True  # 指数退避重试

缓存策略

多级缓存架构

请求 → 本地缓存(Guava/Caffeine) → Redis → MySQL
  • L1 本地缓存:毫秒级响应,容量有限,适合热点数据
  • L2 Redis:共享缓存,支持集群,适合大部分查询场景
  • L3 MySQL:持久化存储,数据全量

缓存更新策略

  • Cache Aside:读时填充,写时失效(最常用)
  • Write Through:写缓存时同步写数据库
  • Write Behind:写缓存后异步批量写数据库

限流与降级

限流算法

算法原理适用场景
令牌桶以固定速率往桶中放令牌,请求消耗令牌允许突发流量
漏桶以固定速率从桶中流出请求平滑输出
计数器固定时间窗口内限制请求数简单场景
滑动窗口细分时间窗口,更精确的限流精确控制

降级策略

当系统压力过大时,主动降低服务质量以保障核心功能:

  • 返回缓存的旧数据而非实时查询
  • 关闭非核心功能(推荐、统计等)
  • 返回降级响应(如"系统繁忙,请稍后再试")

MiroFish 中的实践

采集链路的并发设计

APScheduler 每30分钟触发

按关键词拆分为多个子任务

并行投递到 RabbitMQ(不同关键词不同 routing key)

多个 Celery Worker 并行采集

结果汇总 → 去重 → 入库

去重三层防线

  1. MD5 内容哈希hashlib.md5(content.encode()).hexdigest(),完全相同的内容直接跳过
  2. 时间窗口去重:同一来源 5 分钟内的重复数据合并
  3. 语义去重:基于向量余弦相似度,threshold > 0.95 视为重复

面试回答要点

面试官问:你的采集链路怎么处理并发?

我用 APScheduler 做定时触发,Celery + RabbitMQ 做分布式任务队列。APScheduler 按间隔触发采集任务,把任务拆分为按关键词的子任务并行投递到 RabbitMQ,多个 Celery Worker 消费处理。这种架构天然支持水平扩展——增加 Worker 数量就能提升吞吐量。同时通过三层去重(MD5 哈希、时间窗口、语义相似度)保证数据质量。

AI 应用开发 / Agent 开发实习生