System
SessionRepository
SessionRepository(session: Session, comp_cls: type[BaseComponent]) -> NoneSource: hetu/data/backend/repo.py:36
从数据库查询数据并放入Session缓存,System的ctx.repo[...]返回的便是此类。
Attributes
ref(TableReference) —TableReference对象,表示当前repo关联的数据库Table信息session(Session) — 获取所属的内部Session对象。
Methods
remote_has_unique_conflicts_
remote_has_unique_conflicts_(row: numpy.record, fields: set) -> str | NoneSource: hetu/data/backend/repo.py:64
内部方法,在远程数据库中检查Unique索引冲突。
is_unique_conflicts
is_unique_conflicts(
row: numpy.record,
insert=False,
ignore: str | None = None,
) -> str | NoneSource: hetu/data/backend/repo.py:104
检查一行数据的Unique索引在本地和远程数据库中是否有冲突。
Parameters
row(np.record) — 待检查的行数据,必须是c-struct格式。insert(bool) — 如果为True,表示这是一个插入操作,否则是更新操作,要求之前已获取过旧数据。ignore(str | None) — 排除检查远程数据库中的某列冲突。用于upsert,因为upsert会先自己检查。
get_by_id
get_by_id(row_id: numpy.int64 | int) -> numpy.record | NoneSource: hetu/data/backend/repo.py:142
从数据库获取单行数据,并放入Session缓存。
本指令如果命中缓存,不会去数据库查询。
get
get(
index_name: str | None = None,
query_value: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool | None = None,
**kwargs: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool,
) -> numpy.record | NoneSource: hetu/data/backend/repo.py:164
从数据库获取单行数据,并放入Session缓存。 推荐通过"id"主键查询,这样无须查询索引,如果缓存命中,不会去数据库查询;否则会执行1-2次查询。
Parameters
index_name(Any) — 辅助参数,如果不便使用kwargs参数时使用。query_value(Any) — 辅助参数,如果不便使用kwargs参数时使用。kwargs(Any) — 查询字段和值,例如id=1234567890。只能查询一个字段,且该字段必须有索引。
Returns
如果未查询到匹配数据,则返回 None。如果查询到数据,则返回查询到的第一行数据。 返回 np.record (c-struct) 格式。
Examples
::
item = await session.using(Item).get(id=1234567890)
range
range(
index_name: str | None = None,
_left: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool | None = None,
_right: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool | None = None,
limit: int = 10,
desc: bool = False,
**kwargs: tuple[numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool, numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool],
) -> numpy.rec.recarraySource: hetu/data/backend/repo.py:223
从数据库查询索引,返回区间内数据,限制 limit 条。
本指令会去数据库执行 1+(limit-缓存命中) 次查询,至少要进行1次数据库查询。
与 get 不同,本方法的区间匹配只读取已提交的数据,不会读取当前事务中未提交
的修改:当前事务内新 insert 的行、或索引字段被改动的行,不会反映在返回结果里
(但已 delete 的行仍会被正确排除)。如需读取事务内新插入的行,请改用 get。
Parameters
index_name(Any) — 辅助参数,如果不便使用kwargs参数时使用。_left(Any) — 辅助参数,如果不便使用kwargs参数时使用。_right(Any) — 辅助参数,如果不便使用kwargs参数时使用。kwargs(Any) — 查询字段和区间,例如level=(1, 10)。只能查询一个字段,且该字段必须有索引。 默认闭区间,如果要自定义区间,请转换为字符串并开头指定(或[。
- 如果要查询的字段和参数冲突,请使用辅助参数方式。
limit(Any) — 限制返回的行数,越少越快。负数表示不限制行数。desc(Any) — 是否降序排列
Returns
返回 numpy.recarray,如果没有查询到数据,返回空 numpy.recarray。
numpy.recarray 是一种 c-struct array。
Notes
如何复合条件查询? 请利用python的特性,先在数据库上筛选出最少量的数据,然后本地二次筛选::
items = await session.using(Item).range(level=(10, 20), limit=100)
few_items = items[items.amount < 10]
由于python numpy支持SIMD,比直接在数据库复合查询快。
insert
insert(row: numpy.record, ignore: str | None = None) -> NoneSource: hetu/data/backend/repo.py:320
向Session中添加一行待插入数据。
Parameters
row(Any) — 待插入的行数据,必须是c-struct格式。ignore(Any) — 排除remote检测Unique冲突的index,内部使用,请勿设置。
update
update(row: numpy.record) -> NoneSource: hetu/data/backend/repo.py:341
向Session中添加一行待更新数据。
Parameters
row(np.record) — 待更新的行数据,必须是c-struct格式。
upsert
upsert(
**kwargs: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool,
) -> hetu.data.backend.repo.UpsertContextSource: hetu/data/backend/repo.py:371
使用async with语法,根据Unique索引,查询并返回一行数据,如果不存在则返回新行数据。 在退出上下文时,自动插入新行,或是更新已有行。
Parameters
kwargs(Any) — 查询字段和值,例如id=1234567890。只能查询一个字段,且该字段必须为unique索引。
Examples
::
async with session.using(Order).upsert(id=1234567890) as order:
order.status = "completed"
delete
delete(row_id: int) -> NoneSource: hetu/data/backend/repo.py:400
向Session中添加一行待删除数据。
Parameters
row_id(int) — 待删除行的主键ID。
SystemClusters
SystemClusters(*args, **kwargs)Source: hetu/system/definer.py:43
储存所有System,并分类成共置簇。可以通过它查询System的定义信息,和所属簇id。簇id每次服务器 启动时按簇大小排序分配,每个namespace下的簇id从0重新分配。 System之间components有交集的,表示这些System互相之间可能有事务冲突,形成一个共置簇, 簇和簇之间无交集,绝不会有事务冲突。
此类只负责储存定义,调度器通过此类查询System信息。
Attributes
main_namespace(Any) — No description.
Methods
get_startup_systems
get_startup_systems(namespace: str | None = None) -> list[str]Source: hetu/system/definer.py:96
返回所有标记了 on_start=True 的 System 名字(按定义顺序)。
Return names of all systems marked with on_start=True (in definition order).
get_components
get_components(namespace: str | None = None) -> dict[type[BaseComponent], int]Source: hetu/system/definer.py:109
返回所有被System引用过的Component及其所属簇id
build_endpoints
build_endpoints()Source: hetu/system/definer.py:265
把System定义复制到EndpointDefines中,作为Endpoint使用
SystemContext
SystemContext(
caller: int,
connection_id: int,
address: str,
group: str,
user_data: dict[str, typing.Any],
timestamp: float,
request: Request,
systems: SystemCaller,
guard_state: dict[str, typing.Any] = <factory>,
client_limits: list[list[int]] = <factory>,
server_limits: list[list[int]] = <factory>,
max_row_sub: int = 0,
max_index_sub: int = 0,
race_count: int = 0,
repo: dict[type[BaseComponent], SessionRepository] = <factory>,
depend: dict[str, typing.Callable] = <factory>,
) -> NoneSource: hetu/system/context.py:18
Bases: EndpointContext
System调用时的上下文,继承自 Context 并添加事务相关属性。
每次System执行时由engine创建并作为第一个参数 ctx 传入。
Attributes
race_count(int) — 当前事务因RaceCondition重试的次数,初始为0。注意:timeout引发的再次 触发会从0重新计数。repo(dict[type[BaseComponent],SessionRepository]) — 当前事务的repo字典,键为Component类,值为对应的SessionRepository。 通过ctx.repo[ComponentClass]取得并执行get/range/upsert/update/delete等CRUD操作;事务结束时统一提交。depend(dict[str, Callable]) —depends依赖中声明的父事务函数字典,键为System名(可带":副本名"后缀)。 通过ctx.depend["system_name"](ctx, ...)调用以在同一事务中执行。
Methods
session_commit
session_commit() -> NoneSource: hetu/system/context.py:38
提前显式结束事务,提交所有写入操作。如果遇到事务冲突,会抛出异常,因此后续的代码行不会执行。
注意:调用完 session_commit,ctx 将不再能够读写 repo 。且后续不再属于事务,
也就是说遇到宕机/crash可能导致整个函数执行不完全。
Returns
如果有任何其他失败,抛出以下异常:redis.exceptions,RaceCondition。 异常一般无需特别处理,系统的默认处理方式为:遇到RaceCondition异常,上游系统会自动重试。 其他任何异常会记录日志并断开客户端连接。
session_discard
session_discard() -> NoneSource: hetu/system/context.py:55
提前显式结束事务,放弃所有写入操作。
注意:调用完 session_discard,ctx 将不再能够读写 repo 。且后续不再属于事务,
也就是说遇到宕机/crash可能导致整个函数执行不完全。
Table
Table(
comp_cls: type[BaseComponent],
instance_name: str,
cluster_id: int,
backend: Backend,
) -> NoneSource: hetu/data/backend/table.py:60
Bases: TableReference
Table表的地址信息加上所属的Backend,能够让你直接去数据库中操作此表。
Attributes
backend(Backend) — 内部数据库连接管理实例
Methods
servant_get
servant_get(
row_id: int,
row_format=<RowFormat.STRUCT: 1>,
) -> numpy.record | dict[str, Any] | NoneSource: hetu/data/backend/base.py:180
从数据库直接获取单行数据。
Parameters
row_id(Any) — row id主键row_format(Any) — 返回数据解码格式,见 “Returns”
Returns
如果未查询到匹配数据,则返回 None。
否则根据 row_format 参数返回以下格式之一:
- RowFormat.STRUCT - 默认值 返回 np.record (c-struct) 的单行数据
- RowFormat.RAW 返回无类型的原始数据 (dict[str, str])
- RowFormat.TYPED_DICT 返回符合Component定义的,有格式的dict类型。
servant_range
servant_range(
index_name: str,
left: int | float | str | bytes | bool,
right: int | float | str | bytes | bool | None = None,
limit: int = 10,
desc: bool = False,
row_format=<RowFormat.STRUCT: 1>,
)Source: hetu/data/backend/base.py:265
从数据库直接查询索引 index_name,返回在 [left, right] 闭区间内数据。
如果 right 为 None,则查询等于 left 的数据,限制 limit 条。
Parameters
index_name(Any) — 查询Component中的哪条索引left(Any) — 查询范围,闭区间。字符串查询时,可以在开头指定是[闭区间,还是(开区间。 如果right不填写,则精确查询等于left的数据。right(Any) — 查询范围,闭区间。字符串查询时,可以在开头指定是[闭区间,还是(开区间。 如果right不填写,则精确查询等于left的数据。limit(Any) — 限制返回的行数,越少越快。负数表示不限制行数。desc(Any) — 是否降序排列row_format(Any) — 返回数据解码格式,见 “Returns”
Returns
根据 row_format 参数返回以下格式之一:
- RowFormat.STRUCT - 默认值
返回
numpy.recarray,如果没有查询到数据,返回空numpy.recarray。numpy.recarray是一种 c-struct array。 - RowFormat.RAW 返回无类型的原始数据 (dict[str, str]) 列表,如果没有查询到数据,返回空list
- RowFormat.TYPED_DICT 返回符合Component定义的,有格式的dict类型列表,如果没有查询到数据,返回空list
- RowFormat.ID_LIST 返回查询到的 row id 列表,如果没有查询到数据,返回空list
Notes
如何复合条件查询? 请利用python的特性,先在数据库上筛选出最少量的数据,然后本地二次筛选::
items = client.range(ref, "owner", player_id, limit=100)
few_items = items[items.amount < 10]
由于python numpy支持SIMD,比直接在数据库复合查询快。
direct_set
direct_set(id_: int, **kwargs: str) -> NoneSource: hetu/data/backend/base.py:334
UNSAFE! 只用于易失数据! 不会做类型检查!
直接写入属性到数据库,避免session必须要执行get+事务2条指令。 仅支持非索引字段,索引字段更新是非原子性的,必须使用事务。 注意此方法可能导致写入数据到已删除的行,请确保逻辑。
一些系统级别的临时数据,使用直接写入的方式效率会更高,但不保证数据一致性。
TableReference
TableReference(
comp_cls: type[BaseComponent],
instance_name: str,
cluster_id: int,
) -> NoneSource: hetu/data/backend/table.py:33
Table表的地址信息,在后端,组件持久化的目标称为表。 组件实际储存在数据库中时,需要实例名和cluster id等信息,此类封装了这些信息。
Attributes
comp_cls(type[BaseComponent]) — 该表所属的组件类instance_name(str) — 该表所属服务器实例名cluster_id(int) — 该表所属的System簇IDcomp_name(str) — 获得组件名称
Methods
is_same_txn_group
is_same_txn_group(other: hetu.data.backend.table.TableReference) -> boolSource: hetu/data/backend/table.py:47
内部方法,判断和另一个TableReference
是否可以在同一事务中执行
cancel_future_call
cancel_future_call(ctx: hetu.system.context.SystemContext, key: str) -> boolSource: hetu/system/future.py:287
按 key 删除 ensure_future_call 创建的未来调用(停止 / 重配循环任务)。
返回 True 表示存在并已删除,False 表示该 key 没有对应的未来调用。重配间隔等参数: 先 cancel 再 ensure(ensure 是 ensure-exists,不会就地改参数)。
Cancel a keyed future call created by ensure_future_call. Returns True if it existed and was deleted, False otherwise. To reconfigure (e.g. change interval): cancel then ensure again.
Parameters
ctx(Any) — System默认变量key(Any) — 要删除的未来调用的幂等键,与 ensure_future_call 的 key 一致。
Returns
是否存在并删除
create_future_call
create_future_call(
ctx: hetu.system.context.SystemContext,
at: float,
system: str,
*args,
timeout: int = 60,
recurring: bool = False,
)Source: hetu/system/future.py:147
创建一个未来调用任务,到约定时间后会由内部进程执行该System。 未来调用储存在FutureCalls组件中,服务器重启不会丢失。 timeout不为0时,则保证目标System事务一定成功,且只执行一次。 只执行一次的保证通过call_lock引发的事务冲突实现,会强制要求定义System时开启call_lock。
Parameters
ctx(Any) — System默认变量at(Any) — 正数是执行的绝对时间(POSIX时间戳);负数是相对时间,表示延后几秒执行。system(Any) — 未来调用的目标system名*args(Any) — 目标system的参数,注意,只支持可以通过repr转义为string并不丢失信息的参数,比如基础类型。timeout(Any) — 再次调用时间(秒)。如果超过这个时间System调用依然没有成功,就会再次触发调用。 注意:代码错误/数据库错误也会引发timeout重试。如果是代码错误,虽然重试大概率还是失败, 但任务并不会丢失,等程序员修复完代码任务会再次伟大
如果设为0,则不重试,因此不保证任务成功,甚至会丢失。执行时遇到任何错误/程序关闭/Crash, 则未来调用丢失。
如果timeout再次触发时前一次执行还未完成,会引起事务竞态,其中一个事务会被抛弃。 如果前一次已经成功执行,call_lock会触发,跳过执行。
- 注意:抛弃的只有事务(所有ctx.repo[components]的操作),修改全局变量、写入文件等操作是永久的
- 注意:
ctx.race_count只是事务冲突的计数,timeout引起的再次触发会从0重新计数
recurring(Any) — 设置后,将永不删除此未来调用,每次执行后按timeout时间再次执行。
Returns
返回未来调用的uuid: int
Examples
>>> import hetu
>>> @hetu.define_system(namespace='test', permission=None)
... async def test_future_call(ctx: hetu.SystemContext, *args):
... # do ctx.repo[...] operations
... print('Future call test', args)
>>> @hetu.define_system(namespace='test', permission=hetu.Permission.USER, depends=('create_future_call:test',) )
... async def test_future_create(ctx: hetu.SystemContext):
... await ctx.depend['create_future_call:test'](ctx, -10, 'test_future_call', 'arg1', 'arg2', timeout=5)示例中,depends依赖使用’:‘符号创建了create_future_call的test副本。
继承System会和对方的簇合并,而create_future_call是常用System,所以使用副本避免System簇过于集中,
增加backend的扩展性,具体参考簇相关的文档。
Notes
- System执行时的Context是内部服务,而不是用户连接,无法获取用户ID,要自己作为参数传入
- 触发精度<=1秒,由每个Worker每秒运行一次循环检查并触发
ensure_future_call
ensure_future_call(
ctx: hetu.system.context.SystemContext,
key: str,
at: float,
system: str,
*args,
timeout: int = 60,
recurring: bool = False,
)Source: hetu/system/future.py:217
按 key 等幂地确保一个未来调用存在;已存在则原样保留(不更新参数),返回其 id。
与 create_future_call 相同,但用 key 做幂等:同一 key 多次调用只会创建一条。 适合在 on_start System 里播种"开机即起"的全局循环任务(recurring=True), 服务器重启多次也不会重复堆积。
幂等通过 key 的确定性 id 复用 FutureCalls 主键唯一性实现:已存在则直接返回(不写入, 事务空提交),并发同 key 的多余插入会撞主键引发事务竞态并自动重试,最终只保留一条。
Idempotently ensure a single future call exists, keyed by key; if it already
exists, keep it as-is (params are NOT updated) and return its id. Useful for seeding
a server-wide recurring background task from an on_start system without piling up
duplicates across restarts.
Parameters
ctx(Any) — System默认变量key(Any) — 幂等键。同一 key 只会存在一条未来调用。at(Any) — 同 create_future_call:正数为绝对 POSIX 时间戳;负数为相对延后秒数。system(Any) — 未来调用的目标 system 名。*args(Any) — 目标 system 的参数(须能 repr 往返还原,如基础类型)。timeout(Any) — 再次调用时间(秒),含义同 create_future_call;recurring=True 时不能为 0。recurring(Any) — 设置后永不删除,按 timeout 周期重复触发。
Returns
返回未来调用的 id: int(由 key 推导的确定性负数 id)
Examples
>>> import hetu
>>> @hetu.define_system(namespace='game', permission=None, call_lock=True,
... components=(World,))
... async def world_tick(ctx): ...
>>> @hetu.define_system(namespace='game', permission=None, on_start=True,
... depends=('ensure_future_call:game',))
... async def boot(ctx):
... await ctx.depend['ensure_future_call:game'](
... ctx, 'game:world_tick', -30, 'world_tick', recurring=True, timeout=30)开服时 on_start 跑一次 → ensure 幂等 → 重启 N 次也只有一条 → 由 future_call_task 每 ~1 秒轮询、全局只一个 worker 执行、timeout 重试、重启不丢。