梳理一团乱麻似的代码是一项每个资深工程师都曾面临的挑战。这是一段充满挫败感的经历,一场与复杂性斗争的较量。这时,大家引入了仓库模式,为那些面对混乱的代码的工程团队提供了一份指导,尤其是在业务逻辑和数据存储纠缠不清的情况下。在这篇文章里,我将展示这个模式如何帮助我们重构旧代码,使之更加清晰,并为未来的扩展和维护打好基础。
先来简单说一下背景
这一切始于2021年春天,我们开始了一个项目,旨在为遇到入站数据同步问题时的客户提供更多实用警报。
我们用来发现这些问题并通过向客户发出警告的系统叫做 IssueLog
,名为 IssueLog
的系统。
IssueLog
系统会在用户的数据同步出现问题时向用户发送警告。然而,这些警告提供的信息很少,无法帮助用户解决问题。例如,信息“您的定期同步不再正常工作”并不能指导用户如何解决问题。我们的目标是让这些警告更有操作性。例如,信息为“您的定期同步出现身份验证错误。您可以按照以下方法解决此问题。”
我们仔细阅读了产品规格,审查了代码,很快发现自己处于经典的Tidy First?场景中:除非我们先花时间重构IssueLog
代码,否则这些看似微小的改动会花费大量工程资源。例如,我们的业务逻辑和数据访问逻辑之间耦合紧密,虽然功能相似但差异细微的许多公共服务方法使得很难确定该使用哪些方法。
我们开始了一项任务来优化IssueLog代码的组织和结构,这将使添加新功能和编写这些功能的自动化测试变得更加简单。
我们知道我们想要转向一种分层结构(类似于产品中的其他许多部分),这种结构将代码组织成水平的分层结构,每一层在应用程序中都扮演特定的角色。
多年来,IssueLog
代码多年来一直被忽视——当你看到来自我们 CEO AB 的提交时,你就知道你正在处理老旧的代码了。它已经成为 Klaviyo 最古老的代码之一,并且变得非常混乱,并因此变成了一个[Big Ball of Mud]。我们的主要目标是为其提供结构、模块化以及职责分离。
代码库也早于我们在数据访问层上的采用。而在Klaviyo中,数据访问层(DAL)是一个常见的模式。我们广泛使用DAL,但这也导致了代码库中该模式实现的一致性问题,我们希望解决这个问题。我们正在寻求一致性。
进入仓库模式概念部分。
什么是仓库模式(Repository Pattern)?我们第一次在我们每周的读书小组中遇到了仓储模式,当时我们正在读一本名为《用Python构建大型软件的架构模式》的书。这是一本非常好的书,我强烈推荐给所有想提升Python技能的朋友们。这本书有免费在线版这里。书中如是说,
“仓库模式是对持久存储的一种抽象层。它通过假装所有的数据都在内存里,来隐藏数据访问的那些繁琐的细节。”(《Python架构模式》第二章中)
仓库模式的核心思想是[依赖倒置]。像Django ORM或SQLAlchemy这样的对象关系映射器增加了领域模型与其在数据库中的表示(即数据库模式)之间的耦合。仓库模式倒置了这种依赖关系,认为领域模型不应该依赖任何其他部分,而ORM应该依赖领域模型,而不是相反的方向。
当你应用层需要读写数据时,会传递领域模型对象给仓储层,而仓储层负责处理数据库交互。因此,你的应用层永远不会直接处理原始ORM模型(例如Django)。这样做有以下好处:职责分离和代码隔离及封装,这不仅有利于服务边界定义,也有利于领域分解。
(Python架构模式,第二章)(https://www.oreilly.com/library/view/architecture-patterns-with/9781492052197/)
我们一步步来看一个例子我们可以用一个虚构的例子来说明这种模式。假设我们有一个名为“Product”的数据模型,它包含库存保管制代码(SKU)、名称以及库存数量。
class Product(BaseModel):
sku: 字符串字段()
名称: 字符串字段()
库存数量: 整数字段()
假设我们有一个处理我们产品的服务。该服务提供一个公共方法,在有订单时更新库存中的产品。
class ProductService:
def update_product_inventory(self, sku: str, amount: int):
product = Product.objects.get(sku=sku)
# 确保库存充足。
if product.inventory_quantity - amount >= 0:
product.inventory_quantity = product.inventory_quantity - amount
product.save(update_fields=["inventory_quantity"])
else:
# 引发一个异常
...
这个道理其实挺简单的:
- 检索具有给定SKU的产品。
- 如果当前库存数量与给定数量之间的差值大于或等于零,则更新库存数量并将对象保存到数据库中。
- 否则,抛出一个异常。
现在,让我们将其转换为使用仓库模式(Repository Pattern)。首先,我们需要一个领域模型对象。
@dataclass
class 产品模型:
sku: str
商品名: str
库存量: int
id: int | None = None
我们使用 Python 的 dataclasses
来定义领域模型对象,当然,还有其他的库,例如 attrs
,效果也很不错。接下来,我们来创建仓库。
class ProductRepository:
def get(self, sku: str) -> ProductDomainModel:
product = Product.objects.get(sku=sku)
# 将数据库模型转换为领域模型
return self._to_domain(product)
def create_or_update(self, product: ProductDomainModel) -> ProductDomainModel:
# 忽略 update_or_create 的第二个输出
product, _ = Product.objects.update_or_create(
id=product.id,
defaults={
"sku": product.sku,
"name": product.name,
"inventory_quantity": product.inventory_quantity,
}
)
# 将数据库模型转换为领域模型
return self._to_domain(product)
def _to_domain(self, django_model: Product) -> ProductDomainModel:
# 将数据库模型转换为领域模型
return ProductDomainModel(
id=django_model.id,
sku=django_model.sku,
name=django_model.name,
inventory_quantity=django_model.inventory_quantity
)
注意,每个方法都返回一个 ProductDomainModel 对象,这意味着这个仓库类是我们操作 ORM 唯一的地方。
还有一个值得一提的地方是 _to_domain
方法。在使用仓库模式的情况下,你需要将领域模型对象映射到 ORM 模型对象。许多ORM,比如 SQLAlchemy,自带了映射工具。然而 Django ORM 却没有内置这种映射工具。
从现在起,上游服务类不再需要直接与ORM打交道。
class ProductService:
def __init__(self, repository: AbstractProductRepository):
self._repository = repository
def update_product_inventory(self, sku: str, amount: int) -> None:
product = self._repository.get(sku=sku)
# 确保库存充足,如果库存数量减去数量大于等于0,
if product.inventory_quantity - amount >= 0:
product.inventory_quantity = product.inventory_quantity - amount
self._repository.create_or_update(product)
else:
# 抛异常
...
它也很精简,仅包含业务逻辑。服务类并不了解产品存储的具体细节;这些职责已经委托给仓储抽象处理,并被隐藏起来。
Repository 模式的好处有哪些 支持域分离使用仓库模式的第一个也是最明显的原因是实现领域分离。比如我们使用的是Django,不让应用程序代码访问底层的ORM模型有助于建立服务边界。你的高层服务,也就是业务逻辑所在的地方,只能执行仓库指定的存储操作,而不知道数据是如何存储的。应用程序层只知道领域模型及其相关的业务逻辑,而仓库层则负责封装所有的存储相关操作。
让代码更容易测试仓库模式让测试应用程序层变得更加容易,因为它允许利用依赖注入在测试中创建模拟的数据库交互。假设你有一个应用程序级别的服务,它从数据库读取并执行一些业务逻辑。你不必弄清楚具体的数据库调用并模拟这些调用来返回测试所需的数据,而是可以实现一个返回你需要的数据的模拟仓库,并将这个模拟仓库传递给你的服务类。
比如说,我们想为 ProductService.update_product_inventory
方法编写一个单元测试。首先,我们要创建一个实现与我们的 ProductRepository
相同接口的模拟仓库:
class FakeProductRepository:
def get(self, sku: str) -> ProductDomainModel:
return ProductDomainModel(
sku=sku,
name="Fake Product",
inventory_quantity=1,
)
def create_or_update(self, product: ProductDomainModel) -> ProductDomainModel:
return product
注意,我们的假代码库实际上并不与数据库交互,它仅仅返回虚构的数据。
我们的测试大概会是这样的:
def test_update_product_inventory():
product_service = ProductService(repository=FakeProductRepository())
# 测试确保按照业务规则正确更新产品库存等
...
更进一步了解仓库模式如何帮助测试,请参考Architecture Patterns with Python。
简化数据存储技术更换工作仓库层提供了一套读写数据的接口,应用层只需与仓库层进行交互。例如,如果你决定将持久层从 MySQL 切换到 Postgres,你只需要创建一个与 MySQL 仓库接口相同的 Postgres 仓库即可。除了更换仓库外,应用代码无需做任何其他改动。
日志重构问题到目前为止,我们已经介绍了仓库模式背后的理论,并给出了一例人为编造的例子。现在让我们来看看我们如何改造我们的代码库,让IssueLog
系统焕然一新,从而轻松实现客户所看到的功能:提醒他们如何解决入站数据同步问题的提醒功能。
记住,这次重构的目的是让代码结构更清晰,从而更容易添加新功能。有无数种不同的设计模式可以用来重构混乱的代码库。当我们第一次读到仓库模式时,我们觉得这更像是我们在代码库中使用的DAL的一个更高级且更严格的版本。我们已经熟悉了DAL,希望向分层架构迈进,并看看仓库模式中设定的规则是否有助于我们在整个代码库中实现DAL的一致性。
数据模型IssueLog
系统采用了一个简单的数据模型:一个Issue
表用来记录客户数据同步中出现的错误,以及一个Log
表用来记录该问题的具体发生情况。一个Issue
可以关联多个Log
,每个Log
都对应特定的Issue
。以下是Django ORM定义这些模型的简化版本:
# 代表一个问题的类
class Issue(BaseModel):
# 问题代码
code = models.IntegerField()
# 服务ID
service_id = models.IntegerField()
# 最新的日志ID
latest_log_id = models.IntegerField()
# 状态,默认为开放状态
status = models.IntegerField(default=STATUS_OPEN) # 表示问题的状态为开放状态
# 创建时间
created = models.DateTimeField(auto_now_add=True)
# 更新时间, 自动更新
updated = models.DateTimeField(db_index=True, auto_now=True)
# 代表日志的类
class Log(BaseModel):
# 服务ID
service_id = models.IntegerField()
# 日志来源
origin = models.CharField(max_length=255, null=True)
# 关联的问题ID
issue_id = models.IntegerField(db_index=True, null=True)
# 存储日志数据的JSON字段
data = JSONField()
# 创建时间
created = models.DateTimeField(auto_now_add=True)
使用仓库模式(Repository Pattern)进行代码重构
在重构之前,我们有几种不同的服务类,每个服务类都直接调用Django ORM来读写问题和日志。正如我在上面提到的,这部分代码已经很久没有得到关注了,因此我们积累了一些技术债务。总的来说,有31个公共方法分布在几个类里,都在进行类似的数据库查询。
1. IssueLogService记录自定义集成错误
2. IssueLogService生成自定义问题日志负载
3. IssueLogService保存自定义集成错误并可能发送警报
4. IssueLogService应记录
5. IssueLogService记录集成错误
6. IssueLogService屏蔽敏感键
7. IssueLogService创建统计缺口问题
8. LogService获取
9. LogService日志
10. LogService为问题获取日志
11. LogService自某时起的日志
12. LogService最新日志
13. LogService计数
14. LogService通过ID列表获取所有日志
15. LogService创建
16. LogService更新日志
17. IssueService获取问题或无
18. IssueService问题
19. IssueService最新问题
20. IssueService未解决的问题
21. IssueService创建错误
22. IssueService创建警告
23. IssueService创建
24. IssueService更新
25. IssueService最近是否记录了采样问题
26. IssueService标记已被记录的采样问题
27. IssueService修剪问题日志
28. IssueService解决
29. IssueService为集成解决
30. IssueService最近是否自动解决
31. IssueService带有问题的公司集成
这个挑战是要将这段代码修改得易于添加新功能和进行改动。然而,阅读所有31个公共方法并尝试理解每个方法的功能及是否可以合并代码等,这个过程非常困难,让人感到不知所措。因此,我们采用自下而上的方式开始。
- 首先,查找我们与数据库的交互的所有地方(通过ORM)。
- 然后,为这些交互中的每一个创建仓库方法,确保尽可能合并逻辑,使得最终的方法数量相对较少,这些方法通常对应于CRUD操作。
- 最后,用新的仓库方法调用替换所有的直接ORM调用。
我们来看看几个例子:record_new_issue
(以前叫 log_integration_error
)和 most_recent_issue
。
我们从 record_new_issue
开始。这个方法负责创建一个新的 Issue
记录,并且还会创建一个新的 Log
记录与之相关。
在重构之前,它大致是这样的:
def record_new_issue(
self, service_id, ex,
):
data = ex.data
code = ex.code
# 添加一个日志条目
log_entry = Log(
service_id=service_id,
data=data,
)
log_entry.save()
# 检查是否已有该问题
issue = Issue.objects.filter(service_id=service_id, code=code, status=STATUS_OPEN).first()
# 若问题不存在,则创建;否则更新现有问题
if not issue:
issue = Issue(
code=code, service_id=service_id, latest_log_id=log_entry.id, status=STATUS_OPEN
)
issue.save()
else:
issue.latest_log_id = log_entry.id
issue.save()
# 将新的日志条目与问题相关联
log_entry.issue_id = issue.id
log_entry.save()
return issue
完成领域模型的创建并将所有数据库操作移到 Repository
类(一种技术术语)之后:
def record_new_issue(self, service_id, ex):
# 生成一条日志
log = LogDomainModel(
service_id=service_id,
data=ex.data,
)
LogRepository().create_or_update(log)
# 获取或创建问题记录
issue = IssueRepository().get_or_create(service_id, ex, ex.code)
# 设置最新日志ID
issue.latest_log_id = log.id
IssueRepository().create_or_update(issue)
# 将日志与问题关联起来
log.issue_id = issue.id
LogRepository().create_or_update(log)
接下来,让我们重构most_recent_issue
方法。这个方法负责根据给定的service_id
检索最新的记录。原来的代码是这样的:
def 获取最近的问题(self,
service_id, historical=False,
):
问题列表 = Issue.objects.filter(
service_id=service_id,
level=IntegrationIssue.LEVEL_ERROR,
status=IntegrationIssue.STATUS_OPEN,
).order_by("-updated")
# 以防止运行大量的SQL查询。
最近的非定期同步问题 = 问题列表.filter(
monitor_item_id__isnull=True
)[:10]
for 记录 in 最近的非定期同步问题:
日志记录查询 = Log.objects.filter(
service_id=service_id,
issue_id=记录.id,
)
日志记录是否源于历史同步 = 日志记录查询[0].is_origin_historical_sync
# 如果它是历史性的,那么日志记录的来源也必须是历史性的;如果不是,则来源不是历史性的。
if (
historical and 日志记录是否源于历史同步
) or not 日志记录是否源于历史同步:
return 记录
return 问题列表.first()
重构完了以后,那么...
def get_most_recent_issue(
self, service_id: int, is_historical: bool
) -> IntegrationIssue:
issues = IssueRepository().get_many_with_order_and_limit(
service_id=service_id,
order_by=IssueRepository.ORDER_BY_UPDATED,
status=IssueStatus.OPEN,
limit=20,
)
for issue in issues:
logs = LogRepository().get_many_with_order_and_limit(
issue_id=issue.id, limit=1
)
log = next(logs, None)
if is_historical == log.is_origin_historical_sync:
return issue
我们就这样一直做下去,直到所有的服务方法都调到了我们的仓库类。完成之后,我们把业务逻辑和数据存储逻辑彻底分开了。
所以,你只是稍微调整了一下代码。这有什么了不起,大惊小怪?
好了,当我们完成了将所有ORM调用转换为使用新的仓库方法之后,一切都开始变得井井有条。现在所有的数据访问逻辑都集中到了我们的Repository
类中,业务逻辑则是通过传递给这些Repository
类的公共方法的参数实现的,我们注意到,许多公共服务方法之间的唯一区别在于传递给数据访问方法的参数不同。
例如,在我们的 IssueService
中,有三种不同的方式来记录一个新的议题:
1. 记录自定义集成的问题
2. 保存自定义集成的错误并可能发送警报
3. 记录集成的错误
一旦我们将数据存储逻辑从这些方法中移出后,并且就可以更轻松地识别它们之间细微的业务逻辑差异,我们就可以把它们合并为一个名为 record_new_issue
的单一方法。
将这一点应用到这31个方法上,它扩展得很顺畅。重构之后,我们将这些公共服务方法的数量从31个减少到只有10个。这种简化让我们可以把所有的服务方法都合并到一个类IssueLogService
中,这成为了IssueLog
系统的新的公共接口。
这是我们重构后的公共接口们的样子。
1. IssueLogService.get_most_recent_logs_for_issue_id // IssueLogService.获取最近的issue_id日志
2. IssueLogService.get_issue_by_id // IssueLogService.根据ID获取问题
3. IssueLogService.get_most_recent_periodic_sync_issue // IssueLogService.获取最近的周期性同步问题
4. IssueLogService.get_most_recent_issue // IssueLogService.获取最近的问题
5. IssueLogService.get_most_recent_issues_and_logs // IssueLogService.获取最近的问题和日志
6. IssueLogService.resolve_issues // IssueLogService.解决问题
7. IssueLogService.record_new_issue // IssueLogService.记录新问题
8. IssueLogService.delete_issue_logs_before_date // IssueLogService.删除日期之前的日志记录
9. IssueLogService.has_auto_resolved_recently // IssueLogService.最近自动解决了吗
10. IssueLogService.get_data_from_exception // IssueLogService.从异常中获取数据
我觉得这个界面比我们之前的更简洁,也更友好。
additional好处 of the 仓储模式 便于优化数据访问将所有数据访问逻辑统一到一个 Repository
类中,使性能优化更容易进行,例如添加缓存或优化与数据存储的交互方式。
在重构之前,如果我们想要优化查询MySQL数据库的方式,需要在多个不同地方进行修改。由于缺乏集中,也更难识别并追踪那些表现不佳的查询。现在所有的数据访问都集中在一个地方,我们对查询模式有了更清晰的认识,并且优化SQL查询和添加索引变得更加容易。我们还能够轻松地为所有数据访问操作添加监控,而如果数据访问方法分散在许多不同的方法中,这项任务会更难完成。
比如這個 Grafana 仪表板:
另一个副作用是,将所有数据存储操作移到仓库层后,添加错误处理变得更加容易。在网络交互中,瞬时错误(瞬态错误)是很常见的。处理这些错误的一个常见方法是捕获错误并重试,这需要一些额外的代码来实现。
在此之前,增加错误处理机制需要我们在每次访问数据的地方做出相应修改。现在,我们可以在Repository
类中的一个单独位置处理所有错误逻辑。
我们已经采用了仓库模式,并在此基础上我们已经在代码库中引入了多种新用法。然而,每个设计模式都有其权衡利弊,如果我不提及我们在实际应用中遇到的一些问题,比如在性能、维护或扩展性方面遇到的问题,那我就会显得不够全面。
模型对象的映射关系如上面的示例代码所示,我们实现的仓储模式要求我们维护 Django 模型字段和领域模型中的字段之间的显式映射。这意味着每当添加一个字段时,需要在三个不同的地方进行更新。这种做法容易出错,对于刚开始熟悉该模式的工程师来说,这已经引起了相当的困惑。
其他ORM(例如SQLAlchemy)也提供了自动执行这种映射的工具。将来我们可能还会选择为Django ORM开发自己的映射工具,这样就无需显式地维护这些映射了。
复杂性这种模式引入了一层额外的间接性。间接性可能会让代码更难理解,特别是对于新加入的工程师而言。对于简单的服务来说,这种额外的复杂性可能没有必要,然而,随着服务的增长,它在维护成本方面将会体现出其价值。
结尾实现仓库模式来重构混乱且容易出错的代码是一项策略性的举措,可以将混乱且容易出错的代码库转变为干净、可维护且灵活的代码。通过将数据访问层与应用程序的业务逻辑解耦,仓库模式不仅促进了职责分离,还提高了代码的可测试性、可重用性以及数据源的灵活性。这种方法为数据管理和查询操作提供了一个结构良好的解决方案,从而产生一个更高效、更有序的代码库,长期更易工作和维护。采用仓库模式不仅仅是使代码更干净,它还是一种对更可扩展和更可持续的软件架构的承诺。
我们团队致力于这种模式,这体现在我们将它用于所有的新项目,并且还重构了其他系统(包括IssueLog
)等,使它们也采用该模式。
_PS:我们在读《Python中的架构模式》这本书时发现了这种模式。这本书也在这篇博客文章《掌握构建大规模软件的十大书籍》中提到,这篇文章还列出了对我们设计和构建Klaviyo可扩展软件有所帮助的其他书籍。欢迎大家去看看!_
共同學習,寫下你的評論
評論加載中...
作者其他優質文章