1. 概要
这篇文章解释了一个系统的设计,该系统处理传入的消息,如果消息在10秒内重复到达,则过滤这些重复的消息。主要目标是确保不同的消息仅在其在过去10秒内未被打印的情况下才会打印。此设计适用于实时系统,其中消息去重对于避免冗余处理或记录至关重要。
功能需求:
- 系统必须按照消息到达的顺序处理传入的消息。
- 如果消息在10秒内重复出现,则不应进行打印。
- 超过10秒窗口后到达的消息应被视为新的消息并打印。
非功能规范:
- 可扩展性: 该系统应该能够处理来自多个生产者的大量消息。
- 低延迟: 该系统应该确保消息接收和处理之间的延迟尽可能小。
- 容错性: 该系统应该能够应对故障,确保消息不会丢失或被错误处理。
- 并发性: 该系统应该支持并发消息处理,避免出现竞争条件。
部件:
- 消息生产者: 生成并向系统发送消息。
- 消息队列: 接收并缓存传入的消息,确保它们按照到达顺序处理。
- 消息处理服务: 处理消息,检查重复消息,并决定是否打印它们。
- 去重存储(内存缓存): 存储每个消息最后一次出现的时间戳。
- 日志/打印服务: 处理通过去重检查的消息并进行打印。
4.1 消息发送者
- 角色: 生成并向消息队列发送消息。
- 注意事项:
- 确保消息在创建或接收时加上时间戳以保持顺序。
- 生产者应该能够处理网络故障,并在消息队列暂时不可用时重试。
4.2 消息队列
- 职责: 确保消息按其到达的顺序进行处理,如果处理服务较慢,则对其进行暂存。
- 技术选择:
- Apache Kafka: 用于分布式、高吞吐量的消息处理。
- RabbitMQ: 用于可靠的消息传递和多种路由选项支持。
- AWS SQS: 一种提供可扩展性的全托管队列服务。
- 注意事项:
- 保持消息的先进先出处理至关重要。
4.3 消息处理服务
- 职责: 处理每一条消息,利用去重存储检查重复消息,并在通过检查后打印该消息。
- 组件:
- 工作者池: 一组用于并行处理消息的工作节点。
- 去重逻辑:
- 对于每一条消息,检查去重存储中最近一次出现的记录。
- 如果消息在最近10秒内已被打印过,则丢弃这条消息。
- 如果消息是新的或超出10秒的时间窗口,则打印该消息并更新存储。
- 并发控制机制: 使用分布式锁或缓存中的原子操作来防止竞态条件。
- 容错: 实现重试机制以处理消息处理失败。
4.4 去重存储区(内存缓存区)
- 角色说明: 存储每个消息最后一次被打印的时间戳。
- 技术选择:
- Redis: 一个支持TTL(生存时间)的内存键值存储,适合快速查找和自动过期。
- Memcached: 一个简单的高速缓存替代方案,虽然它不如Redis功能丰富。
- 工作流:
- 键: 消息的内容(或其哈希值)。
- 值: 消息最后一次被打印的时间戳。
- TTL: 设置为10秒,以确保旧消息自动过期。
- 考虑因素:
- 确保低延迟操作,以避免消息处理速度变慢。
- 缓存应是分布式或可扩展的,以处理大量独特消息。
4.5 日志打印服务
- 角色: 负责打印或记录所有通过去重检查的消息,以确保信息准确无误。
- 注意事项:
- 幂等性: 确保消息不会被重复打印。
- 可靠性: 消息应被可靠且持久地记录下来。
- 消息到达: 生产者发来了一条消息。
- 入队: 消息进入消息队列。
- 处理: 开始处理消息。
-
消息处理服务会取出消息。
-
它检查重复数据删除存储,查看该消息上次打印的时间。
-
如果10秒内打印过,它将被丢弃。
- 如果消息是新的,或上次打印的时间超过10秒,则打印该消息,然后更新存储中的时间戳。
4.记录日志: 消息会被打印或记录下来。
6. 性能方面的考虑- 吞吐量: 确保系统可以处理高消息量,特别是在高峰期。
- 延迟: 系统应尽量减少从接收到处理每条消息的时间延迟。
- 可扩展性: 系统应能够通过增加更多的工作节点来横向扩展,同时拆分消息队列和去重存储。
- 容错性: 系统应能优雅地从故障中恢复,确保没有消息丢失或处理错误。
这里展示了一种使用Redis来进行去重存储的消息处理方式。
import redis
import time
class MessageHandler:
def __init__(self):
self.redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
def process_message(self, message: str):
current_time = time.time()
last_time = self.redis_client.get(message)
if last_time is None or current_time - float(last_time) >= 10:
# 打印消息
print(message)
self.redis_client.set(message, current_time, ex=10) # ex=10 设置10秒过期时间
# 示例用法:
handler = MessageHandler()
# 模拟收到的消息
handler.process_message("Hello")
time.sleep(5)
handler.process_message("Hello") # 重发,不应打印
time.sleep(6)
handler.process_message("Bye") # 新消息,应打印
handler.process_message("Hi") # 新消息,应打印
點擊查看更多內容
為 TA 點贊
評論
評論
共同學習,寫下你的評論
評論加載中...
作者其他優質文章
正在加載中
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦