作为 Python 开发人员,我们经常需要处理数据库中的数据,以便为我们的应用程序存储和检索数据。虽然 SQLAlchemy 是一个强大且广受欢迎的数据库交互库,但在代码中直接使用它会带来不少挑战和麻烦。在本文中,我们将探讨如何使用仓库模式来简化数据库交互并解决这些问题。
直接使用 SQLAlchemy 遇到的问题当我们在Python应用程序中直接使用SQLAlchemy时,经常会碰到以下难题:
- 到处传递会话对象:我们在代码中经常会将会话对象作为参数传递给函数和方法,导致代码混乱,并可能误用会话。
2. 分散的数据库代码 :数据库代码通常分散在应用的不同层级里,这使得代码库更难维护、理解和改动。
3. 处理事务困难:确保数据的一致性和完整性需要适当的事务管理,直接操作SQLAlchemy时,尤其是在处理嵌套事务或进行多个数据库操作时,这可能会比较棘手。
这些痛点以各种方式在我们的代码中体现。例如,我们可能会看到会话对象这样被传递来传递去。
def create_user(session, username, email):
# 创建用户
user = User(username=username, email=email)
session.add(user)
session.commit()
def update_user(session, user_id, new_email):
# 更新用户信息
user = session.query(User).get(user_id)
user.email = new_email
session.commit()
或者我们可能发现数据库查询和操作分散在应用程序的不同部分。
def some_view_function():
session = Session()
# 获取第一个用户并更新其电子邮件。
user = session.query(User).get(1)
user.email = '[email protected]'
session.commit()
session.close()
def some_service_function():
session = Session()
# 查询所有活跃用户并关闭会话。
users = session.query(User).filter(User.active == True).all()
session.close()
return users
直接用SQLAlchemy处理事务时,可能会变得复杂且容易出错。
def 创建用户及其资料(username, email, profile_data):
session = Session()
try:
user = User(username=username, email=email)
session.add(user)
# 刷新会话以确保用户对象已保存
session.flush()
profile = Profile(user_id=user.id, data=profile_data)
session.add(profile)
# 提交更改以保存用户和资料信息
session.commit()
except:
# 回滚会话
session.rollback()
raise
finally:
# 关闭会话
session.close()
这些挑战可能导致代码混乱不堪,降低可维护性,若处理不当,还可能引发潜在的一致性错误。
仓库模式来救驾了仓储模式是一种设计方法,旨在帮助管理应用程序与数据库之间的交互。它充当应用程序代码和数据库之间的中间层,隐藏数据库操作的复杂性,并提供一个简化的操作接口给应用程序使用。
仓储模式里有几点关键原则:
- 封装:数据仓库封装了数据访问的逻辑,隐藏了数据库操作的复杂性,从而使应用程序更易于使用,并为应用程序提供了一个简单的接口,使操作更方便。
- 关注分离:仓库模式将数据访问逻辑与业务逻辑分离,使代码更清晰和易于维护。
- 抽象:仓库提供了一层抽象,让应用可以无需了解底层存储方式的细节即可与数据交互。
通过使用仓库模式,我们可以从仓库模式中获得多种好处:
- 数据库相关代码的封装:通过将数据库相关的代码与应用的其他部分分离,从而实现更清晰、更易于维护的代码库。
- 简洁的服务层接口:存储库为服务层(或其他高层次层)提供了简单且直观的接口来与数据库进行交互,简化了数据库操作的复杂性。
- 增强的可测试性:仓库模式通过在测试时模拟或替换数据访问部分,使得编写应用程序逻辑的单元测试变得更加容易。
设计理念的实际应用 # session_management.py
import os
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
from contextvars import ContextVar
from repository_sqlalchemy.database_config import DatabaseConfig, DatabaseEngineFactory
session_context_var: ContextVar[any] = ContextVar("db_session", default=None)
_engine = None
_Session = None
def get_engine():
global _engine
if _engine is None:
db_type = os.environ.get('DB_TYPE', 'postgresql')
db_config = DatabaseConfig(db_type)
_engine = DatabaseEngineFactory.create_engine(
db_config
)
return _engine
def get_session():
global _Session
if _Session is None:
_Session = sessionmaker(bind=get_engine(), expire_on_commit=False)
return _Session()
# BaseRepository 类
class BaseRepository(Generic[ModelType], metaclass=TransactionalMetaclass):
model = None
@property
def session(self) -> Session:
return session_context_var.get()
def create(self, obj: ModelType) -> ModelType:
self.session.add(obj)
self.session.flush()
self.session.refresh(obj)
return obj
def update(self, instance: ModelType, data: Dict[str, Any]) -> ModelType:
for key, value in data.items():
if hasattr(instance, key):
setattr(instance, key, value)
else:
raise AttributeError(f"{type(instance).__name__} 没有属性 '{key}'")
self.session.flush()
return instance
def delete(self, obj: ModelType) -> None:
self.session.delete(obj)
self.session.flush()
# TransactionalMetaclass
class 事务元类(type):
def __new__(cls, name: str, bases: tuple, attrs: Dict[str, Any]) -> Type:
cls.apply_transactional_wrapper(attrs)
new_class = super().__new__(cls, name, bases, attrs)
cls.set_model_attribute(new_class, bases)
return new_class
@classmethod
def apply_transactional_wrapper(cls, attrs: Dict[str, Any]) -> None:
transactional_prefixes = (
"find",
"创建",
"删除",
)
for attr_name, attr_value in attrs.items():
if callable(attr_value) and any(
attr_name.startswith(prefix) for prefix in transactional_prefixes
):
attrs[attr_name] = cls.add_transactional(attr_value)
# 事务性装饰器
@contextmanager
def transaction():
session = session_context_var.get()
if session is None:
session = get_session()
session_context_var.set(session)
is_nested = session.in_transaction()
try:
if is_nested:
savepoint = session.begin_nested()
yield savepoint
else:
session.begin()
yield session
if is_nested:
savepoint.commit()
else:
session.commit()
session.close()
except Exception as e:
if is_nested:
savepoint.rollback()
else:
session.rollback()
session.close()
raise
finally:
if not is_nested:
session_context_var.set(None)
def transactional(func):
@wraps(func)
def wrapper(*args, **kwargs):
with transaction():
return func(*args, **kwargs)
return wrapper
需要指出的是,在下面的代码中,**sessionmaker**
会话生成器,会话生成器设置为 **expire_on_commit=False**
:
定义_session为一个会话生成器,绑定到get_engine()返回的引擎,并设置expire_on_commit为False。
_Session = sessionmaker(bind=get_engine(), expire_on_commit=False)
这个设置很重要,因为在事务结束后,会话将被关闭。如果将 **expire_on_commit**
设置为 **True**
(默认值),在会话关闭后访问模型的属性会导致 SQLAlchemy 报错。通过将 **expire_on_commit=False**
,在事务结束后模型将脱离会话,即使会话已关闭,也可以继续访问属性值。
上面的代码体现了以下源代码中应用的关键设计理念:
- 应用仓库模式:代码提供了一个使用 SQLAlchemy 封装常见数据库操作的基础仓库类(
BaseRepository
)。可以继承该类来为不同的实体模型创建特定的仓库类(例如)。
2. 使用一个名为TransactionalMetaclass
的元类来实现事务行为:代码使用一个名为TransactionalMetaclass
的元类,根据命名约定自动使仓库方法具有事务性。这确保指定的仓库方法自动在事务中运行,减少了样板代码,保证了代码的一致性。
3. 使用装饰器进行事务管理:代码提供了一个 transaction
事务管理器(transaction
context manager)和装饰器来处理事务。**transactional**
装饰器用于包装仓库方法或其他执行数据库操作的函数,自动开始新的事务或加入现有事务,并根据函数的执行结果来提交或回滚事务。
这些设计思路共同作用,简化数据库交互,减少重复代码,并确保事务管理的一致性。类似于Java中的**@Transactional**
注解,用于事务管理的注解提供了一种声明式的、非侵入性的方式来应用事务性行为,从而提高代码的可读性和维护性。
下面是一个全面的例子,演示如何使用repository_sqlalchemy库:
从 sqlalchemy 导入 Column, Integer, String, ForeignKey
从 sqlalchemy.ext.declarative 导入 declarative_base
从 repository_sqlalchemy 导入 BaseRepository, Base, transaction
从 typing 导入 List, Dict, Any
类 UserModel(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(120), unique=True, nullable=False)
类 PostModel(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(100), nullable=False)
content = Column(String(1000), nullable=False)
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
类 UserRepository(BaseRepository[UserModel]):
def find_by_username(self, username: str) -> UserModel:
返回 self.session.query(self.model).filter_by(username=username).first()
类 PostRepository(BaseRepository[PostModel]):
def find_by_user_id(self, user_id: int) -> List[PostModel]:
返回 self.session.query(self.model).filter_by(user_id=user_id).all()
@transaction()
def create_user_with_posts(username: str, email: str, posts: List[Dict[str, str]]) -> UserModel:
user_repo = UserRepository()
post_repo = PostRepository()
# 创建用户
user = user_repo.create(UserModel(username=username, email=email))
# 为用户创建帖子
对于 post_data 在 posts:
post = PostModel(title=post_data['title'], content=post_data['content'], user_id=user.id)
post_repo.create(post)
返回 user
@transaction()
def update_user_and_posts(username: str, user_data: Dict[str, Any], post_updates: List[Dict[str, Any]]) -> UserModel:
user_repo = UserRepository()
post_repo = PostRepository()
user = user_repo.find_by_username(username)
如果 not user:
抛出 ValueError(f"User {username} not found")
# 更新用户
user_repo.update(user, user_data)
# 更新帖子
posts = post_repo.find_by_user_id(user.id)
对于 post, post_data 在 zip(posts, post_updates):
post_repo.update(post, post_data)
返回 user
# 使用示例
new_user = create_user_with_posts(
"john_doe",
"[email protected]",
[
{"title": "First Post", "content": "Hello, world!"},
{"title": "Second Post", "content": "This is my second post."}
]
)
打印(f"创建了用户: {new_user.username},带有 2 个帖子")
updated_user = update_user_and_posts(
"john_doe",
{"email": "[email protected]"},
[
{"title": "Updated First Post"},
{"content": "Updated content for second post."}
]
)
打印(f"更新了用户: {updated_user.username}, {updated_user.email}")
这个示例展示了用户在基础搭建好后使用repository_sqlalchemy库有多么简单和方便。
定义了UserModel
和PostModel
这两个SQLAlchemy模型。
2. 通过继承 **BaseRepository**
类来创建自定义仓库(如 **UserRepository**
和 **PostRepository**
)。**BaseRepository**
类提供了 session
属性,该属性可以用于在仓库方法中访问当前会话,如在这些自定义仓库方法中使用 self.session
所示。
3. 使用 **@transaction()**
装饰器注解在多个操作间自动管理事务。
4. 在一个事务中执行多个数据记录的创建和更新操作。
通过继承 **BaseRepository**
类,自定义存储库继承了 **session**
属性,该属性提供了对当前会话的访问权限。这省去了手动传递会话对象给存储库方法的步骤,因为可以通过 **self.session**
自动访问,无需手动传递。
通过使用仓储模式和 repository_sqlalchemy 库提供的功能,客户端可以以更简化且直观的方式与数据库进行交互,无需手动管理会话或显式处理事务。
结论部分如下:仓储模式是简化Python应用程序中数据库交互的有力工具。通过封装数据库相关的代码并提供一个简洁的接口,它提高了代码的可维护性和可测试性。
查看源代码:https://github.com/ryan-zheng-teki/repository-sqlalchemy,并考虑在项目中使用仓库模式。您可以随意提出问题或将其作为起点。
编程愉快!
共同學習,寫下你的評論
評論加載中...
作者其他優質文章