在 SQLAlchemy 中实现数据处理的时候,实现表自引用、多对多、联合查询,有序id等常见的一些经验总结

有时候,我们在使用SQLAlchemy操作某些表的时候,需要使用外键关系来实现一对多或者多对多的关系引用,以及对多表的联合查询,有序列的uuid值或者自增id值,字符串的分拆等常见处理操作。

1、在 SQLAlchemy 中定义具有嵌套 children 关系的表

要在 SQLAlchemy 中定义具有嵌套 children 关系的表,如表中包含 idpid 字段,可以使用 relationshipForeignKey 来建立父子关系。

首先,你需要定义一个模型类,其中包含 idpid 字段。id 是主键,pid 是指向父记录的外键。然后,你使用 relationship 来建立父子关系。

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, declarative_base
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class DictTypeInfo(Base):
    __tablename__ = 'dict_type_info'
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    code = Column(String)
    remark = Column(String)
    seq = Column(Integer)
    pid = Column(Integer, ForeignKey('dict_type_info.id'))  # 外键指向父节点的 id

    # 定义 parent 关系
    parent = relationship("DictTypeInfo", remote_side=[id], back_populates="children")

    # 定义 children 关系
    children = relationship("DictTypeInfo", back_populates="parent")

例子使用代码如下所示。

# 创建异步引擎和会话
DATABASE_URL = "mysql+asyncmy://username:password@localhost/mydatabase"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)

async def init_db():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

# 示例:如何插入数据并进行查询
async def example_usage():
    async with AsyncSessionLocal() as session:
        async with session.begin():
            # 插入数据
            parent_node = DictTypeInfo(name="Parent", code="P001", remark="Parent Node", seq=1)
            child_node1 = DictTypeInfo(name="Child1", code="C001", remark="First Child", seq=1, parent=parent_node)
            child_node2 = DictTypeInfo(name="Child2", code="C002", remark="Second Child", seq=2, parent=parent_node)
            session.add(parent_node)
            session.add(child_node1)
            session.add(child_node2)
            
        # 查询数据
        async with session.begin():
            result = await session.execute(
                "SELECT * FROM dict_type_info WHERE pid IS NULL"
            )
            parent_nodes = result.scalars().all()
            for node in parent_nodes:
                print(f"Parent Node: {node.name}, Children: {[child.name for child in node.children]}")

代码说明

  1. 定义模型类 (DictTypeInfo):

    • id: 主键。
    • pid: 外键,指向同一个表的 id,表示父节点。
    • parent: 父关系,通过 remote_side 设定本模型的外键指向自身的主键。
    • children: 子关系,back_populates 用于双向关系的映射。
  2. 创建异步引擎和会话:

    • 使用 create_async_engineAsyncSession 创建数据库引擎和会话,以支持异步操作。
  3. 插入和查询数据:

    • 插入数据示例展示了如何创建父节点和子节点,并将子节点关联到父节点。
    • 查询数据示例展示了如何查询所有父节点以及它们的子节点。

注意事项

  • remote_side: 在 relationship 中,remote_side 是指定哪些字段是远程的一方(即子节点关系的目标)。
  • 确保在模型中定义了正确的外键约束。在你提供的模型中,pid 列需要指向同一表中的 id 列。确保 ForeignKey 设置正确。
  • 异步操作: 使用 AsyncSessionasyncio 进行异步数据库操作。
  • 创建表: 在初始化数据库时,确保表结构是正确的。

要使用 selectinload 加载某个 pid 下的对象及其子列表,可以通过 SQLAlchemy 的 selectinload 来优化加载子关系。selectinload 可以减少 SQL 查询的数量,特别是在加载具有层次结构的数据时。

async def get_tree(pid: int):
    async with AsyncSessionLocal() as session:
        # 通过 selectinload 加载所有子节点
        stmt = select(DictTypeInfo).filter(DictTypeInfo.pid == pid).options(selectinload(DictTypeInfo.children))
        result = await session.execute(stmt)
        nodes = result.scalars().all()
        
        return nodes

这样,调用 get_tree 函数获取指定 pid 的节点及其子节点,代码如下。

async def example_usage():
    nodes = await get_tree(pid=1)
    for node in nodes:
        print(f"Node: {node.name}, Children: {[child.name for child in node.children]}")

selectinload: selectinload 可以减少 N+1 查询问题,它通过一条额外的查询来加载相关对象。这适合用于层次结构数据的加载。通过这种方式,你可以使用 SQLAlchemy 的 selectinload 来高效地加载具有父子关系的对象,并优化数据库查询性能。

 

同样,我们在 SQLAlchemy 中实现多对多关系也是类似的处理方式。

 在 SQLAlchemy 中,实现多对多关系通常需要创建一个关联表(association table),该表将存储两个相关联表的外键,从而实现多对多关系。以下是一个实现多对多关系的详细步骤。

1) 定义多对多关系的关联表

首先,需要定义一个关联表,该表包含两个外键,分别指向两端的主表。这通常使用 Table 对象来实现。

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

association_table = Table('association', Base.metadata,
    Column('left_id', Integer, ForeignKey('left_table.id')),
    Column('right_id', Integer, ForeignKey('right_table.id'))
)

在这个例子中,association_table 是一个包含两个外键的中间表:left_idright_id 分别指向 left_tableright_table 的主键。

2)定义两端的模型并添加关系

在两端的模型中,使用 relationship 来定义多对多关系,并指定 secondary 参数为关联表。

from sqlalchemy.orm import relationship

class LeftModel(Base):
    __tablename__ = 'left_table'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    rights = relationship("RightModel", secondary=association_table, back_populates="lefts")

class RightModel(Base):
    __tablename__ = 'right_table'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    lefts = relationship("LeftModel", secondary=association_table, back_populates="rights")
  • rightsLeftModel 中定义的关系属性,它将连接到 RightModel
  • leftsRightModel 中定义的关系属性,它将连接到 LeftModel
  • secondary=association_table 告诉 SQLAlchemy 使用 association_table 作为连接表。
  • back_populates 用于双向关系的对称引用。

3)创建数据库并插入数据

下面的代码展示了如何创建数据库、插入数据并查询多对多关系。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 创建数据库引擎
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

# 创建模型实例
left1 = LeftModel(name="Left 1")
right1 = RightModel(name="Right 1")
right2 = RightModel(name="Right 2")

# 设置多对多关系
left1.rights = [right1, right2]

# 添加到会话并提交
session.add(left1)
session.commit()

# 查询并打印关系
for right in left1.rights:
    print(right.name)  # 输出: Right 1, Right 2

for left in right1.lefts:
    print(left.name)  # 输出: Left 1

你可以像操作普通列表一样来处理这些关系,例如添加、删除关联等:

# 添加关系
left1.rights.append(RightModel(name="Right 3"))
session.commit()

# 删除关系
left1.rights.remove(right2)
session.commit()

 通过这些步骤,你可以在 SQLAlchemy 中实现和操作多对多关系。

 

2、在 SQLAlchemy 中联合多个表进行记录关联查询

例如,在我的框架中,字典大类和字典项目是不同的表进管理的,因此如果需要根据大类名称进行字典项目的查询,那么就需要联合两个表进行处理。

具体操作如下:创建一个查询,将 DictDataInfo 表与 DictTypeInfo 表联接(通过 DictType_IDId 列)

from sqlalchemy.future import select
from sqlalchemy.orm import aliased
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker

# 假设你的数据库模型是 DictDataInfo 和 DictTypeInfo
# 需要提前定义好这两个模型类

DATABASE_URL = "mysql+asyncmy://username:password@localhost/mydatabase"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)

async def get_dict_data(dict_type_name: str):
    async with AsyncSessionLocal() as session:
        # 创建别名
        DictData = aliased(DictDataInfo)
        DictType = aliased(DictTypeInfo)

        # 联合查询并根据条件过滤
        stmt = (
            select(DictData)
            .join(DictType, DictData.DictType_ID == DictType.id)
            .filter(DictType.name == dict_type_name)
        )

        result = await session.execute(stmt)
        dict_data = result.scalars().all()

        return dict_data

# 示例用法
import asyncio

async def example_usage():
    dict_type_name = "some_type_name"
    dict_data = await get_dict_data(dict_type_name)
    for data in dict_data:
        print(data)

代码说明

  1. aliased: 使用 aliased 创建表的别名,这样可以方便地在查询中引用这些表。

  2. join: 使用 join 进行表连接。这里 DictDataInfo 表的 DictType_ID 列与 DictTypeInfo 表的 id 列连接。

  3. filter: 使用 filter 来添加条件筛选,筛选出 DictTypeInfo 表中 name 列等于 dict_type_name 的记录。

  4. select: 使用 select 语句来选择 DictDataInfo 表中的记录,这对应于 Select(d => d)

  5. 异步操作: 由于使用的是 SQLAlchemy 的异步模式,所有数据库操作都在 async withawait 语句中进行,以确保异步执行。

如果我们需要将获得的数据进行对象转换,我们可以使用下面的处理代码实现。

# 定义 CListItem 类
class CListItem:
    def __init__(self, name, value):
        self.name = name
        self.value = value

# 定义示例列表和转换操作
def convert_list_items(list_items):
    dict_list = []
    if list_items:  # 确保 list_items 不是 None
        for info in list_items.Items:
            dict_list.append(CListItem(info.Name, info.Value))
    return dict_list

 

3、使用sqlalchemy插入数据的时候,如何判断为非自增类型的时候,id赋值一个有序列的uuid值

有时候,我们的数据表主键是用字符串的,这种适用于很广的用途,比较容易在插入的时候就确定好id键的值,从而可以处理相关的内容。

但是,有时候我们可以让后端进行确定一个有序的ID值,那么使用SQLAlchemy 我们应该如何实现?

首先,确保你已经导入了 uuid 库,这是用于生成 UUID 的 Python 标准库。

有序 UUID 通常是基于时间的 UUID。你可以使用 uuid.uuid1() 来生成基于时间的 UUID。

def generate_sequential_uuid():
    return uuid.uuid1()  # 基于时间生成有序UUID

在定义 SQLAlchemy 模型时,可以将 id 字段设置为使用该函数生成的 UUID。通常在模型中通过 default 参数设置默认值。

from sqlalchemy import Column, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class MyModel(Base):
    __tablename__ = 'my_table'

    id = Column(String(36), primary_key=True, default=generate_sequential_uuid, nullable=False)
    # 其他字段...

在插入新数据时,如果 id 字段为空,它将自动使用 generate_sequential_uuid 函数生成一个基于时间的 UUID。

这样就可以确保在插入数据时,非自增类型的 id 字段会被赋值为一个有序列的 UUID 值。

对于自增的整型 id,SQLAlchemy 提供了自动处理机制。你只需要在模型中将 id 字段定义为 Integer 类型,并设置 primary_key=True,SQLAlchemy 就会自动为该字段设置自增属性。

from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class MyModel(Base):
    __tablename__ = 'my_table'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50))
    # 其他字段..

默认情况下,SQLAlchemy 会使用数据库的原生自增机制(如 MySQL 的 AUTO_INCREMENT 或 PostgreSQL 的 SERIAL)。如果你需要使用自定义的自增策略,可以通过设置 Sequence 来实现(适用于支持 Sequence 的数据库,如 PostgreSQL)。

from sqlalchemy import Sequence

class MyModel(Base):
    __tablename__ = 'my_table'

    id = Column(Integer, Sequence('my_sequence'), primary_key=True)
    name = Column(String(50))

在上述代码中,Sequence('my_sequence') 定义了一个序列,SQLAlchemy 将使用该序列生成自增的 id 值。

通过这些步骤,你可以轻松处理整型自增 id 字段,SQLAlchemy 会自动为每个新记录分配唯一的自增 id

 

4、在插入记录的时候,对字符串的数据处理

在批量插入数据字典的时候,我希望根据用户输入内容(多行数据)进行转化,把每行的数据分拆进行判断,如果符合条件的进行处理插入。

在 Python 中,可以使用字符串的 splitlines() 方法来实现相同的功能。

# 假设 Data 和 input.Seq 是从输入中获取的
Data = "example\nline1\nline2\n"  # 示例数据
input_seq = "123"  # 示例序列字符串

# 将 Data 按行拆分,并移除空行
array_items = [line for line in Data.splitlines() if line]

# 初始化变量
int_seq = -1
seq_length = 3
str_seq = input_seq

# 尝试将 str_seq 转换为整数
if str_seq.isdigit():
    int_seq = int(str_seq)
    seq_length = len(str_seq)

# 打印结果
print(f"Array Items: {array_items}")
print(f"int_seq: {int_seq}")
print(f"seq_length: {seq_length}")
  • Python 的 splitlines() 方法将字符串按行分割,同时自动处理各种换行符(包括 \n\r\n)。
  • 列表推导式 [line for line in Data.splitlines() if line] 移除了空行,类似于 C# 中的 StringSplitOptions.RemoveEmptyEntries
  • 使用 str_seq.isdigit() 检查 str_seq 是否全部由数字组成,这类似于 C# 的 int.TryParse

在 Python 中,可以使用 re.split() 函数来按照正则表达式分割字符串。以下是对应的 Python 代码:

import re

# 假设 info 是一个包含 Name 和 Value 属性的对象
class Info:
    def __init__(self):
        self.Name = ""
        self.Value = ""

info = Info()

# dictData 是输入的字符串
dict_data = "example_name example_value"

# 使用正则表达式按照空白字符分割字符串
array = re.split(r'\s+', dict_data)

# 赋值给 info 对象的属性
info.Name = array[0]
info.Value = array[1] if len(array) > 1 else array[0]

# 打印结果
print(f"Name: {info.Name}")
print(f"Value: {info.Value}")

使用 re.split() 函数根据空白字符(包括空格、制表符等)分割字符串 dict_datar'\s+' 是一个正则表达式,表示一个或多个空白字符。

如果你需要根据多个分隔符来分割字符串,同样可以使用正则表达式(re 模块)的 re.split() 方法。

str_item = " 1,2,3;4;5/6/7、8、9;10 "

import re

result = re.split(r"[;,|/,;、]+", str_item.strip())
print(result)

结果输出:['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']

解释:

  • re.split(r'[;,|/,;、]', text) 中的 r'[;,|/,;、]' 是一个正则表达式模式:
    [] 表示字符类,表示匹配字符类中的任意一个字符。
    ;,|/,;、 分别表示分号、逗号,竖线,中文逗号,中文分号,和空格,这些字符都将作为分隔符。

使用正则表达式可以灵活处理多个分隔符,适用于更复杂的分割需求。