Migration
编写迁移脚本
运行 hetu upgrade 时,框架会针对每个 schema 变更的组件查找同名迁移脚本;找不到时回退到下方默认脚本。脚本必须导出两个模块级函数 prepare() 和 upgrade(...),可参考下方完整源码。
upgrade(...) 中传入的 client 参数是 TableMaintenance
实例,其下方所列方法是迁移期间唯一可用的底层数据库操作。
默认迁移脚本模板
完整源码(hetu/data/default_migration.py),可直接复制后修改:
import logging
import numpy as np
from hetu import BaseComponent
from hetu.data.backend import TableReference
from hetu.data.backend.base import TableMaintenance
logger = logging.getLogger("HeTu.root")
down_component_json = r'<"DOWN_JSON">'
target_component_json = r'<"TARGET_JSON">'
# 设置导出模块变量,表示迁移的源和目标模型
TARGET_COMPONENT_MODEL = BaseComponent.load_json(target_component_json)
DOWN_COMPONENT_MODEL = BaseComponent.load_json(down_component_json)
# 默认迁移脚本用变量
remove_columns = []
add_columns = []
unsafe_convert_columns = []
type_convert_columns = []
def prepare() -> str:
"""
迁移前的预检查,如果不能迁移在这报错。
此方法会在upgrade前多次调用,必须幂等。
Returns
-------
str
- "skip": 组件表结构无变更,无需迁移。
- "unsafe": 本迁移代码是有损迁移,需要用force指令手动迁移。
- "ok": 可以安全迁移。
"""
name = TARGET_COMPONENT_MODEL.name_
# 检查是否无变更
down_dtypes = DOWN_COMPONENT_MODEL.dtypes
target_dtypes = TARGET_COMPONENT_MODEL.dtypes
if down_dtypes == target_dtypes:
return "skip"
logger.warning(
f" ⚠️ [💾Redis][{name}组件] 代码定义的Schema与已存的不一致,"
f"数据库中:\n"
f"{down_dtypes}\n"
f"代码定义的:\n"
f"{target_dtypes}\n "
f"将尝试数据迁移(只处理新属性,不处理类型变更,改名等等情况):"
)
# 准备列检查
assert down_dtypes.fields and target_dtypes.fields # for type checker
down_columns = down_dtypes.fields
target_columns = target_dtypes.fields
# 检查是否有属性被删除
for down_column in down_columns:
if down_column not in target_columns:
msg = (
f" ⚠️ [💾Redis][{name}组件] "
f"数据库中的属性 {down_column} 在新的组件定义中不存在,如果改名了需要手动迁移,"
f"强制执行将丢弃该属性数据。"
)
logger.warning(msg)
remove_columns.append(down_column)
# 检查是否有属性类型变更且无法自动转换
for target_column in target_columns:
if target_column in down_columns:
old_type = down_dtypes.fields[target_column]
new_type = target_dtypes.fields[target_column]
if old_type != new_type:
type_convert_columns.append(target_column)
if not np.can_cast(old_type[0], new_type[0]):
msg = (
f" ⚠️ [💾Redis][{name}组件] "
f"属性 {target_column} 的类型由 {old_type} 变更为 {new_type},"
f"无法自动转换类型,需要手动迁移,强制执行将截断/丢弃该属性数据。"
)
logger.warning(msg)
unsafe_convert_columns.append(target_column)
# 检查新增的属性是否有默认值
target_props = dict(TARGET_COMPONENT_MODEL.properties_)
for target_column in target_columns:
if target_column not in down_columns:
add_columns.append(target_column)
logger.warning(
f" ⚠️ [💾Redis][{name}组件] "
f"新的代码定义中多出属性 {target_column},将使用默认值填充。"
)
default = target_props[target_column].default
if default is None:
msg = (
f" ⚠️ [💾Redis][{name}组件] "
f"迁移时尝试新增 {target_column} 属性失败,该属性没有默认值,无法新增。"
)
logger.error(msg)
raise ValueError(msg)
if remove_columns or unsafe_convert_columns:
return "unsafe"
return "ok"
def upgrade(
row_ids: list[int],
down_tables: dict[str, TableReference],
target_table: TableReference,
client: TableMaintenance, # 负责直接写入数据的,专供迁移使用的客户端
) -> None:
"""实际执行升级迁移的操作,本操作不可失败。"""
# 一些属性信息
assert DOWN_COMPONENT_MODEL.name_ == TARGET_COMPONENT_MODEL.name_
table_name = DOWN_COMPONENT_MODEL.name_
target_columns = dict(TARGET_COMPONENT_MODEL.properties_)
down_columns = dict(DOWN_COMPONENT_MODEL.properties_)
down_table = down_tables[table_name]
# 修改老的table名, 老的表读完后就删除
renamed_down_component = DOWN_COMPONENT_MODEL.duplicate(
DOWN_COMPONENT_MODEL.namespace_, "__temp__"
)
renamed_down_tbl = TableReference(
renamed_down_component, down_table.instance_name, down_table.cluster_id
)
client.do_rename_table_(down_table, renamed_down_tbl)
# 创建表,开始schema迁移
client.do_create_table_(target_table)
for row_id in row_ids:
down_row = client.get(renamed_down_tbl, row_id)
assert down_row
up_row = TARGET_COMPONENT_MODEL.new_row(down_row.id)
# 复制共有列
for col in target_columns:
if col in down_columns:
up_row[col] = down_row[col]
# 如果有新增列,不用管,new_row已经自动填充了默认值
# 如果有删除列,不用管,up_row已经不包含了
# 如果有类型变更,也不用管,前面在复制共有列时自动完成了
client.upsert_row(target_table, up_row)
# 删除老的表
client.do_drop_table_(renamed_down_tbl)TableMaintenance
TableMaintenance(master: hetu.data.backend.base.BackendClient)Source: hetu/data/backend/base.py:377
组件表维护类,继承此类实现具体的维护逻辑。
服务器启动时会用check_table检查各个组件表的状态,并会调用create_table创建新表。
其他方法仅在CLI相关命令时才会启用。
Attributes
client(Any) — No description.
Methods
get
get(ref: TableReference, row_id: int) -> numpy.record | NoneSource: hetu/data/backend/base.py:395
获取指定表的指定行数据
range
range(ref: TableReference, index_name: str, left: Any, right: Any = None) -> list[int]Source: hetu/data/backend/base.py:399
按索引范围查询指定表的数据
get_all_row_id
get_all_row_id(ref: TableReference) -> list[int]Source: hetu/data/backend/base.py:405
获取指定表的所有row id
delete_row
delete_row(ref: TableReference, row_id: int)Source: hetu/data/backend/base.py:409
删除指定表的指定行数据
upsert_row
upsert_row(ref: TableReference, row_data: numpy.record)Source: hetu/data/backend/base.py:413
更新指定表的一行数据,如果不存在就插入
read_meta
read_meta(
instance_name: str,
comp_cls: type[BaseComponent],
) -> hetu.data.backend.base.TableMaintenance.TableMeta | NoneSource: hetu/data/backend/base.py:417
读取组件表在数据库中的meta信息,如果不存在则返回None
get_lock
get_lock() -> contextlib.AbstractContextManagerSource: hetu/data/backend/base.py:423
获得一个可以锁整个数据库的with锁,在获得锁之前堵塞,获得锁之后可以安全的进行表结构变更等操作,操作完成后释放锁
do_create_table_
do_create_table_(
table_ref: TableReference,
) -> hetu.data.backend.base.TableMaintenance.TableMetaSource: hetu/data/backend/base.py:427
实际创建组件表的逻辑实现,返回创建后的TableMeta
do_rename_table_
do_rename_table_(from_: TableReference, to_: TableReference) -> NoneSource: hetu/data/backend/base.py:431
修改表名的实现,迁移组件表cluster_id用的就是这个,因为水平分片根据表名决定
do_drop_table_
do_drop_table_(table_ref: TableReference) -> intSource: hetu/data/backend/base.py:435
实际drop组件表数据的逻辑实现,返回删除的行数
do_rebuild_index_
do_rebuild_index_(table_ref: TableReference) -> intSource: hetu/data/backend/base.py:439
实际重建组件表索引的逻辑实现,返回重建的行数