跳至内容

Migration

编写迁移脚本

运行 hetu upgrade 时,框架会针对每个 schema 变更的组件查找同名迁移脚本;找不到时回退到下方默认脚本。脚本必须导出两个模块级函数 prepare()upgrade(...),可参考下方完整源码。

upgrade(...) 中传入的 client 参数是 TableMaintenance 实例,其下方所列方法是迁移期间唯一可用的底层数据库操作。

默认迁移脚本模板

完整源码(hetu/data/default_migration.py),可直接复制后修改:

import logging

import numpy as np

from hetu import BaseComponent
from hetu.data.backend import TableReference
from hetu.data.backend.base import TableMaintenance

logger = logging.getLogger("HeTu.root")

down_component_json = r'<"DOWN_JSON">'
target_component_json = r'<"TARGET_JSON">'
# 设置导出模块变量,表示迁移的源和目标模型
TARGET_COMPONENT_MODEL = BaseComponent.load_json(target_component_json)
DOWN_COMPONENT_MODEL = BaseComponent.load_json(down_component_json)


# 默认迁移脚本用变量
remove_columns = []
add_columns = []
unsafe_convert_columns = []
type_convert_columns = []


def prepare() -> str:
    """
    迁移前的预检查,如果不能迁移在这报错。
    此方法会在upgrade前多次调用,必须幂等。

    Returns
    -------
    str
        - "skip": 组件表结构无变更,无需迁移。
        - "unsafe": 本迁移代码是有损迁移,需要用force指令手动迁移。
        - "ok": 可以安全迁移。
    """
    name = TARGET_COMPONENT_MODEL.name_
    # 检查是否无变更
    down_dtypes = DOWN_COMPONENT_MODEL.dtypes
    target_dtypes = TARGET_COMPONENT_MODEL.dtypes
    if down_dtypes == target_dtypes:
        return "skip"

    logger.warning(
        f"  ⚠️ [💾Redis][{name}组件] 代码定义的Schema与已存的不一致,"
        f"数据库中:\n"
        f"{down_dtypes}\n"
        f"代码定义的:\n"
        f"{target_dtypes}\n "
        f"将尝试数据迁移(只处理新属性,不处理类型变更,改名等等情况):"
    )

    # 准备列检查
    assert down_dtypes.fields and target_dtypes.fields  # for type checker
    down_columns = down_dtypes.fields
    target_columns = target_dtypes.fields

    # 检查是否有属性被删除
    for down_column in down_columns:
        if down_column not in target_columns:
            msg = (
                f"  ⚠️ [💾Redis][{name}组件] "
                f"数据库中的属性 {down_column} 在新的组件定义中不存在,如果改名了需要手动迁移,"
                f"强制执行将丢弃该属性数据。"
            )
            logger.warning(msg)
            remove_columns.append(down_column)

    # 检查是否有属性类型变更且无法自动转换
    for target_column in target_columns:
        if target_column in down_columns:
            old_type = down_dtypes.fields[target_column]
            new_type = target_dtypes.fields[target_column]
            if old_type != new_type:
                type_convert_columns.append(target_column)
                if not np.can_cast(old_type[0], new_type[0]):
                    msg = (
                        f"  ⚠️ [💾Redis][{name}组件] "
                        f"属性 {target_column} 的类型由 {old_type} 变更为 {new_type},"
                        f"无法自动转换类型,需要手动迁移,强制执行将截断/丢弃该属性数据。"
                    )
                    logger.warning(msg)
                    unsafe_convert_columns.append(target_column)

    # 检查新增的属性是否有默认值
    target_props = dict(TARGET_COMPONENT_MODEL.properties_)
    for target_column in target_columns:
        if target_column not in down_columns:
            add_columns.append(target_column)
            logger.warning(
                f"  ⚠️ [💾Redis][{name}组件] "
                f"新的代码定义中多出属性 {target_column},将使用默认值填充。"
            )
            default = target_props[target_column].default
            if default is None:
                msg = (
                    f"  ⚠️ [💾Redis][{name}组件] "
                    f"迁移时尝试新增 {target_column} 属性失败,该属性没有默认值,无法新增。"
                )
                logger.error(msg)
                raise ValueError(msg)

    if remove_columns or unsafe_convert_columns:
        return "unsafe"

    return "ok"


def upgrade(
    row_ids: list[int],
    down_tables: dict[str, TableReference],
    target_table: TableReference,
    client: TableMaintenance,  # 负责直接写入数据的,专供迁移使用的客户端
) -> None:
    """实际执行升级迁移的操作,本操作不可失败。"""
    # 一些属性信息
    assert DOWN_COMPONENT_MODEL.name_ == TARGET_COMPONENT_MODEL.name_
    table_name = DOWN_COMPONENT_MODEL.name_
    target_columns = dict(TARGET_COMPONENT_MODEL.properties_)
    down_columns = dict(DOWN_COMPONENT_MODEL.properties_)
    down_table = down_tables[table_name]

    # 修改老的table名, 老的表读完后就删除
    renamed_down_component = DOWN_COMPONENT_MODEL.duplicate(
        DOWN_COMPONENT_MODEL.namespace_, "__temp__"
    )
    renamed_down_tbl = TableReference(
        renamed_down_component, down_table.instance_name, down_table.cluster_id
    )
    client.do_rename_table_(down_table, renamed_down_tbl)
    # 创建表,开始schema迁移
    client.do_create_table_(target_table)

    for row_id in row_ids:
        down_row = client.get(renamed_down_tbl, row_id)
        assert down_row

        up_row = TARGET_COMPONENT_MODEL.new_row(down_row.id)

        # 复制共有列
        for col in target_columns:
            if col in down_columns:
                up_row[col] = down_row[col]

        # 如果有新增列,不用管,new_row已经自动填充了默认值
        # 如果有删除列,不用管,up_row已经不包含了
        # 如果有类型变更,也不用管,前面在复制共有列时自动完成了

        client.upsert_row(target_table, up_row)

    # 删除老的表
    client.do_drop_table_(renamed_down_tbl)

TableMaintenance

TableMaintenance(master: hetu.data.backend.base.BackendClient)

Source: hetu/data/backend/base.py:377

组件表维护类,继承此类实现具体的维护逻辑。

服务器启动时会用check_table检查各个组件表的状态,并会调用create_table创建新表。

其他方法仅在CLI相关命令时才会启用。

Attributes

  • client (Any) — No description.

Methods

get

get(ref: TableReference, row_id: int) -> numpy.record | None

Source: hetu/data/backend/base.py:395

获取指定表的指定行数据

range

range(ref: TableReference, index_name: str, left: Any, right: Any = None) -> list[int]

Source: hetu/data/backend/base.py:399

按索引范围查询指定表的数据

get_all_row_id

get_all_row_id(ref: TableReference) -> list[int]

Source: hetu/data/backend/base.py:405

获取指定表的所有row id

delete_row

delete_row(ref: TableReference, row_id: int)

Source: hetu/data/backend/base.py:409

删除指定表的指定行数据

upsert_row

upsert_row(ref: TableReference, row_data: numpy.record)

Source: hetu/data/backend/base.py:413

更新指定表的一行数据,如果不存在就插入

read_meta

read_meta(
    instance_name: str,
    comp_cls: type[BaseComponent],
) -> hetu.data.backend.base.TableMaintenance.TableMeta | None

Source: hetu/data/backend/base.py:417

读取组件表在数据库中的meta信息,如果不存在则返回None

get_lock

get_lock() -> contextlib.AbstractContextManager

Source: hetu/data/backend/base.py:423

获得一个可以锁整个数据库的with锁,在获得锁之前堵塞,获得锁之后可以安全的进行表结构变更等操作,操作完成后释放锁

do_create_table_

do_create_table_(
    table_ref: TableReference,
) -> hetu.data.backend.base.TableMaintenance.TableMeta

Source: hetu/data/backend/base.py:427

实际创建组件表的逻辑实现,返回创建后的TableMeta

do_rename_table_

do_rename_table_(from_: TableReference, to_: TableReference) -> None

Source: hetu/data/backend/base.py:431

修改表名的实现,迁移组件表cluster_id用的就是这个,因为水平分片根据表名决定

do_drop_table_

do_drop_table_(table_ref: TableReference) -> int

Source: hetu/data/backend/base.py:435

实际drop组件表数据的逻辑实现,返回删除的行数

do_rebuild_index_

do_rebuild_index_(table_ref: TableReference) -> int

Source: hetu/data/backend/base.py:439

实际重建组件表索引的逻辑实现,返回重建的行数