高并发设计模式
在 MiroFish 项目中,定时新闻采集涉及大量的并发任务处理。本文梳理常见的高并发解决方案及其在项目中的实践。
项目实践: 高并发方案贯穿多个项目——云智汇的优惠券领取(RabbitMQ + Redisson)、播放进度合并写库(Redis Hash + DelayQueue)、hm-dianping的缓存三连防护和秒杀系统。
消息队列解耦
核心思想
将"任务的生产"和"任务的执行"解耦,通过消息队列作为中间缓冲层:
生产者(APScheduler) → 消息队列(RabbitMQ) → 消费者(Celery Worker)优势
- 削峰填谷:突发流量被队列缓冲,消费者按自身能力处理
- 异步处理:生产者无需等待任务完成,提高响应速度
- 可扩展:可以随时增加消费者实例,水平扩展处理能力
RabbitMQ 关键概念
| 概念 | 说明 |
|---|---|
| Exchange | 接收生产者消息并路由到队列 |
| Queue | 存储消息的缓冲区 |
| Binding | Exchange 和 Queue 之间的路由规则 |
| Consumer | 监听队列并处理消息的程序 |
Exchange 类型:
- Direct:精确匹配 routing key
- Topic:通配符匹配 routing key
- Fanout:广播到所有绑定的队列
- Headers:基于消息头属性匹配
分布式任务调度
APScheduler + Celery 协作
APScheduler(定时触发)
↓ 触发
Celery Producer(投递任务)
↓ 投递
RabbitMQ(消息队列)
↓ 分发
Celery Worker × N(分布式执行)APScheduler 的角色: 定时触发器,按照 Cron 或间隔表达式定时向 Celery 投递采集任务。
Celery 的角色: 分布式任务执行框架,管理 Worker 进程池,处理任务的分发、执行、重试、结果存储。
Celery 关键配置
python
# 任务路由
task_routes = {
'tasks.collect_news': {'queue': 'news_collect'},
'tasks.process_data': {'queue': 'data_process'},
}
# 并发控制
worker_concurrency = 4 # 每个 Worker 的并发数
# 任务超时
task_time_limit = 300 # 单任务最长执行时间(秒)
task_soft_time_limit = 240 # 软超时,抛出异常而非强制终止
# 重试策略
autoretry_for = (Exception,)
max_retries = 3
retry_backoff = True # 指数退避重试缓存策略
多级缓存架构
请求 → 本地缓存(Guava/Caffeine) → Redis → MySQL- L1 本地缓存:毫秒级响应,容量有限,适合热点数据
- L2 Redis:共享缓存,支持集群,适合大部分查询场景
- L3 MySQL:持久化存储,数据全量
缓存更新策略
- Cache Aside:读时填充,写时失效(最常用)
- Write Through:写缓存时同步写数据库
- Write Behind:写缓存后异步批量写数据库
限流与降级
限流算法
| 算法 | 原理 | 适用场景 |
|---|---|---|
| 令牌桶 | 以固定速率往桶中放令牌,请求消耗令牌 | 允许突发流量 |
| 漏桶 | 以固定速率从桶中流出请求 | 平滑输出 |
| 计数器 | 固定时间窗口内限制请求数 | 简单场景 |
| 滑动窗口 | 细分时间窗口,更精确的限流 | 精确控制 |
降级策略
当系统压力过大时,主动降低服务质量以保障核心功能:
- 返回缓存的旧数据而非实时查询
- 关闭非核心功能(推荐、统计等)
- 返回降级响应(如"系统繁忙,请稍后再试")
MiroFish 中的实践
采集链路的并发设计
APScheduler 每30分钟触发
↓
按关键词拆分为多个子任务
↓
并行投递到 RabbitMQ(不同关键词不同 routing key)
↓
多个 Celery Worker 并行采集
↓
结果汇总 → 去重 → 入库去重三层防线
- MD5 内容哈希:
hashlib.md5(content.encode()).hexdigest(),完全相同的内容直接跳过 - 时间窗口去重:同一来源 5 分钟内的重复数据合并
- 语义去重:基于向量余弦相似度,threshold > 0.95 视为重复
面试回答要点
面试官问:你的采集链路怎么处理并发?
我用 APScheduler 做定时触发,Celery + RabbitMQ 做分布式任务队列。APScheduler 按间隔触发采集任务,把任务拆分为按关键词的子任务并行投递到 RabbitMQ,多个 Celery Worker 消费处理。这种架构天然支持水平扩展——增加 Worker 数量就能提升吞吐量。同时通过三层去重(MD5 哈希、时间窗口、语义相似度)保证数据质量。