亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定

系統設計:消息重復消除系統

1. 概要

这篇文章解释了一个系统的设计,该系统处理传入的消息,如果消息在10秒内重复到达,则过滤这些重复的消息。主要目标是确保不同的消息仅在其在过去10秒内未被打印的情况下才会打印。此设计适用于实时系统,其中消息去重对于避免冗余处理或记录至关重要。

第二. 功能需求

功能需求:

  • 系统必须按照消息到达的顺序处理传入的消息。
  • 如果消息在10秒内重复出现,则不应进行打印。
  • 超过10秒窗口后到达的消息应被视为新的消息并打印。

非功能规范:

  • 可扩展性: 该系统应该能够处理来自多个生产者的大量消息。
  • 低延迟: 该系统应该确保消息接收和处理之间的延迟尽可能小。
  • 容错性: 该系统应该能够应对故障,确保消息不会丢失或被错误处理。
  • 并发性: 该系统应该支持并发消息处理,避免出现竞争条件。
3. 高级架构

部件:

  1. 消息生产者: 生成并向系统发送消息。
  2. 消息队列: 接收并缓存传入的消息,确保它们按照到达顺序处理。
  3. 消息处理服务: 处理消息,检查重复消息,并决定是否打印它们。
  4. 去重存储(内存缓存): 存储每个消息最后一次出现的时间戳。
  5. 日志/打印服务: 处理通过去重检查的消息并进行打印。
4. 详细设计方案.

4.1 消息发送者

  • 角色: 生成并向消息队列发送消息。
  • 注意事项:
  • 确保消息在创建或接收时加上时间戳以保持顺序。
  • 生产者应该能够处理网络故障,并在消息队列暂时不可用时重试。

4.2 消息队列

  • 职责: 确保消息按其到达的顺序进行处理,如果处理服务较慢,则对其进行暂存。
  • 技术选择:
  • Apache Kafka: 用于分布式、高吞吐量的消息处理。
  • RabbitMQ: 用于可靠的消息传递和多种路由选项支持。
  • AWS SQS: 一种提供可扩展性的全托管队列服务。
  • 注意事项:
  • 保持消息的先进先出处理至关重要。

4.3 消息处理服务

  • 职责: 处理每一条消息,利用去重存储检查重复消息,并在通过检查后打印该消息。
  • 组件:
  • 工作者池: 一组用于并行处理消息的工作节点。
  • 去重逻辑:
  • 对于每一条消息,检查去重存储中最近一次出现的记录。
  • 如果消息在最近10秒内已被打印过,则丢弃这条消息。
  • 如果消息是新的或超出10秒的时间窗口,则打印该消息并更新存储。
  • 并发控制机制: 使用分布式锁或缓存中的原子操作来防止竞态条件。
  • 容错: 实现重试机制以处理消息处理失败。

4.4 去重存储区(内存缓存区)

  • 角色说明: 存储每个消息最后一次被打印的时间戳。
  • 技术选择:
  • Redis: 一个支持TTL(生存时间)的内存键值存储,适合快速查找和自动过期。
  • Memcached: 一个简单的高速缓存替代方案,虽然它不如Redis功能丰富。
  • 工作流:
  • 键: 消息的内容(或其哈希值)。
  • 值: 消息最后一次被打印的时间戳。
  • TTL: 设置为10秒,以确保旧消息自动过期。
  • 考虑因素:
  • 确保低延迟操作,以避免消息处理速度变慢。
  • 缓存应是分布式或可扩展的,以处理大量独特消息。

4.5 日志打印服务

  • 角色: 负责打印或记录所有通过去重检查的消息,以确保信息准确无误。
  • 注意事项:
  • 幂等性: 确保消息不会被重复打印。
  • 可靠性: 消息应被可靠且持久地记录下来。
5. 序列图(序列图)
  1. 消息到达: 生产者发来了一条消息。
  2. 入队: 消息进入消息队列。
  3. 处理: 开始处理消息。
  • 消息处理服务会取出消息。

  • 它检查重复数据删除存储,查看该消息上次打印的时间。

  • 如果10秒内打印过,它将被丢弃。

  • 如果消息是新的,或上次打印的时间超过10秒,则打印该消息,然后更新存储中的时间戳。

4.记录日志: 消息会被打印或记录下来。

6. 性能方面的考虑
  • 吞吐量: 确保系统可以处理高消息量,特别是在高峰期。
  • 延迟: 系统应尽量减少从接收到处理每条消息的时间延迟。
  • 可扩展性: 系统应能够通过增加更多的工作节点来横向扩展,同时拆分消息队列和去重存储。
  • 容错性: 系统应能优雅地从故障中恢复,确保没有消息丢失或处理错误。
7. 示例实现代码(Python)

这里展示了一种使用Redis来进行去重存储的消息处理方式。

    import redis  
    import time  

    class MessageHandler:  
        def __init__(self):  
            self.redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)  
        def process_message(self, message: str):  
            current_time = time.time()  
            last_time = self.redis_client.get(message)  
            if last_time is None or current_time - float(last_time) >= 10:  
                # 打印消息  
                print(message)  
                self.redis_client.set(message, current_time, ex=10)  # ex=10 设置10秒过期时间  
    # 示例用法:  
    handler = MessageHandler()  
    # 模拟收到的消息  
    handler.process_message("Hello")  
    time.sleep(5)  
    handler.process_message("Hello")  # 重发,不应打印  
    time.sleep(6)  
    handler.process_message("Bye")    # 新消息,应打印  
    handler.process_message("Hi")     # 新消息,应打印
點擊查看更多內容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優質文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優惠券免費領

立即參與 放棄機會
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號

舉報

0/150
提交
取消