System
SessionRepository
SessionRepository(session: Session, comp_cls: type[BaseComponent]) -> NoneSource: hetu/data/backend/repo.py:36
从数据库查询数据并放入Session缓存,System的ctx.repo[...]返回的便是此类。
Attributes
ref(TableReference) —TableReference对象,表示当前repo关联的数据库Table信息session(Session) — 获取所属的内部Session对象。
Methods
remote_has_unique_conflicts_
remote_has_unique_conflicts_(row: numpy.record, fields: set) -> str | NoneSource: hetu/data/backend/repo.py:64
内部方法,在远程数据库中检查Unique索引冲突。
is_unique_conflicts
is_unique_conflicts(
row: numpy.record,
insert=False,
ignore: str | None = None,
) -> str | NoneSource: hetu/data/backend/repo.py:104
检查一行数据的Unique索引在本地和远程数据库中是否有冲突。
Parameters
row(np.record) — 待检查的行数据,必须是c-struct格式。insert(bool) — 如果为True,表示这是一个插入操作,否则是更新操作,要求之前已获取过旧数据。ignore(str | None) — 排除检查远程数据库中的某列冲突。用于upsert,因为upsert会先自己检查。
get_by_id
get_by_id(row_id: numpy.int64 | int) -> numpy.record | NoneSource: hetu/data/backend/repo.py:142
从数据库获取单行数据,并放入Session缓存。
本指令如果命中缓存,不会去数据库查询。
get
get(
index_name: str | None = None,
query_value: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool | None = None,
**kwargs: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool,
) -> numpy.record | NoneSource: hetu/data/backend/repo.py:164
从数据库获取单行数据,并放入Session缓存。 推荐通过"id"主键查询,这样无须查询索引,如果缓存命中,不会去数据库查询;否则会执行1-2次查询。
Parameters
index_name(Any) — 辅助参数,如果不便使用kwargs参数时使用。query_value(Any) — 辅助参数,如果不便使用kwargs参数时使用。kwargs(Any) — 查询字段和值,例如id=1234567890。只能查询一个字段,且该字段必须有索引。
Returns
如果未查询到匹配数据,则返回 None。如果查询到数据,则返回查询到的第一行数据。 返回 np.record (c-struct) 格式。
Examples
::
item = await session.using(Item).get(id=1234567890)
range
range(
index_name: str | None = None,
_left: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool | None = None,
_right: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool | None = None,
limit: int = 10,
desc: bool = False,
**kwargs: tuple[numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool, numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool],
) -> numpy.rec.recarraySource: hetu/data/backend/repo.py:220
从数据库查询索引,返回区间内数据,限制 limit 条。
本指令会去数据库执行 1+(limit-缓存命中) 次查询,至少要进行1次数据库查询。
Parameters
index_name(Any) — 辅助参数,如果不便使用kwargs参数时使用。_left(Any) — 辅助参数,如果不便使用kwargs参数时使用。_right(Any) — 辅助参数,如果不便使用kwargs参数时使用。kwargs(Any) — 查询字段和区间,例如level=(1, 10)。只能查询一个字段,且该字段必须有索引。 默认闭区间,如果要自定义区间,请转换为字符串并开头指定(或[。
- 如果要查询的字段和参数冲突,请使用辅助参数方式。
limit(Any) — 限制返回的行数,越少越快。负数表示不限制行数。desc(Any) — 是否降序排列
Returns
返回 numpy.recarray,如果没有查询到数据,返回空 numpy.recarray。
numpy.recarray 是一种 c-struct array。
Notes
如何复合条件查询? 请利用python的特性,先在数据库上筛选出最少量的数据,然后本地二次筛选::
items = await session.using(Item).range(level=(10, 20), limit=100)
few_items = items[items.amount < 10]
由于python numpy支持SIMD,比直接在数据库复合查询快。
insert
insert(row: numpy.record, ignore: str | None = None) -> NoneSource: hetu/data/backend/repo.py:310
向Session中添加一行待插入数据。
Parameters
row(Any) — 待插入的行数据,必须是c-struct格式。ignore(Any) — 排除remote检测Unique冲突的index,内部使用,请勿设置。
update
update(row: numpy.record) -> NoneSource: hetu/data/backend/repo.py:331
向Session中添加一行待更新数据。
Parameters
row(np.record) — 待更新的行数据,必须是c-struct格式。
upsert
upsert(
**kwargs: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool,
) -> hetu.data.backend.repo.UpsertContextSource: hetu/data/backend/repo.py:361
使用async with语法,根据Unique索引,查询并返回一行数据,如果不存在则返回新行数据。 在退出上下文时,自动插入新行,或是更新已有行。
Parameters
kwargs(Any) — 查询字段和值,例如id=1234567890。只能查询一个字段,且该字段必须为unique索引。
Examples
::
async with session.using(Order).upsert(id=1234567890) as order:
order.status = "completed"
delete
delete(row_id: int) -> NoneSource: hetu/data/backend/repo.py:390
向Session中添加一行待删除数据。
Parameters
row_id(int) — 待删除行的主键ID。
SystemClusters
SystemClusters(*args, **kwargs)Source: hetu/system/definer.py:41
储存所有System,并分类成共置簇。可以通过它查询System的定义信息,和所属簇id。簇id每次服务器 启动时按簇大小排序分配,每个namespace下的簇id从0重新分配。 System之间components有交集的,表示这些System互相之间可能有事务冲突,形成一个共置簇, 簇和簇之间无交集,绝不会有事务冲突。
此类只负责储存定义,调度器通过此类查询System信息。
Attributes
main_namespace(Any) — No description.
Methods
get_components
get_components(namespace: str | None = None) -> dict[type[BaseComponent], int]Source: hetu/system/definer.py:99
返回所有被System引用过的Component及其所属簇id
build_endpoints
build_endpoints()Source: hetu/system/definer.py:255
把System定义复制到EndpointDefines中,作为Endpoint使用
SystemContext
SystemContext(
caller: int,
connection_id: int,
address: str,
group: str,
user_data: dict[str, typing.Any],
timestamp: float,
request: Request,
systems: SystemCaller,
client_limits: list[list[int]] = <factory>,
server_limits: list[list[int]] = <factory>,
max_row_sub: int = 0,
max_index_sub: int = 0,
race_count: int = 0,
repo: dict[type[BaseComponent], SessionRepository] = <factory>,
depend: dict[str, typing.Callable] = <factory>,
) -> NoneSource: hetu/system/context.py:18
Bases: EndpointContext
System调用时的上下文,继承自 Context 并添加事务相关属性。
每次System执行时由engine创建并作为第一个参数 ctx 传入。
Attributes
race_count(int) — 当前事务因RaceCondition重试的次数,初始为0。注意:timeout引发的再次 触发会从0重新计数。repo(dict[type[BaseComponent],SessionRepository]) — 当前事务的repo字典,键为Component类,值为对应的SessionRepository。 通过ctx.repo[ComponentClass]取得并执行get/range/upsert/update/delete等CRUD操作;事务结束时统一提交。depend(dict[str, Callable]) —depends依赖中声明的父事务函数字典,键为System名(可带":副本名"后缀)。 通过ctx.depend["system_name"](ctx, ...)调用以在同一事务中执行。
Methods
session_commit
session_commit() -> NoneSource: hetu/system/context.py:38
提前显式结束事务,提交所有写入操作。如果遇到事务冲突,会抛出异常,因此后续的代码行不会执行。
注意:调用完 session_commit,ctx 将不再能够读写 repo 。且后续不再属于事务,
也就是说遇到宕机/crash可能导致整个函数执行不完全。
Returns
如果有任何其他失败,抛出以下异常:redis.exceptions,RaceCondition。 异常一般无需特别处理,系统的默认处理方式为:遇到RaceCondition异常,上游系统会自动重试。 其他任何异常会记录日志并断开客户端连接。
session_discard
session_discard() -> NoneSource: hetu/system/context.py:55
提前显式结束事务,放弃所有写入操作。
注意:调用完 session_discard,ctx 将不再能够读写 repo 。且后续不再属于事务,
也就是说遇到宕机/crash可能导致整个函数执行不完全。
Table
Table(
comp_cls: type[BaseComponent],
instance_name: str,
cluster_id: int,
backend: Backend,
) -> NoneSource: hetu/data/backend/table.py:60
Bases: TableReference
Table表的地址信息加上所属的Backend,能够让你直接去数据库中操作此表。
Attributes
backend(Backend) — 内部数据库连接管理实例
Methods
servant_get
servant_get(
row_id: int,
row_format=<RowFormat.STRUCT: 1>,
) -> numpy.record | dict[str, Any] | NoneSource: hetu/data/backend/base.py:180
从数据库直接获取单行数据。
Parameters
row_id(Any) — row id主键row_format(Any) — 返回数据解码格式,见 “Returns”
Returns
如果未查询到匹配数据,则返回 None。
否则根据 row_format 参数返回以下格式之一:
- RowFormat.STRUCT - 默认值 返回 np.record (c-struct) 的单行数据
- RowFormat.RAW 返回无类型的原始数据 (dict[str, str])
- RowFormat.TYPED_DICT 返回符合Component定义的,有格式的dict类型。
servant_range
servant_range(
index_name: str,
left: int | float | str | bytes | bool,
right: int | float | str | bytes | bool | None = None,
limit: int = 10,
desc: bool = False,
row_format=<RowFormat.STRUCT: 1>,
)Source: hetu/data/backend/base.py:265
从数据库直接查询索引 index_name,返回在 [left, right] 闭区间内数据。
如果 right 为 None,则查询等于 left 的数据,限制 limit 条。
Parameters
index_name(Any) — 查询Component中的哪条索引left(Any) — 查询范围,闭区间。字符串查询时,可以在开头指定是[闭区间,还是(开区间。 如果right不填写,则精确查询等于left的数据。right(Any) — 查询范围,闭区间。字符串查询时,可以在开头指定是[闭区间,还是(开区间。 如果right不填写,则精确查询等于left的数据。limit(Any) — 限制返回的行数,越少越快。负数表示不限制行数。desc(Any) — 是否降序排列row_format(Any) — 返回数据解码格式,见 “Returns”
Returns
根据 row_format 参数返回以下格式之一:
- RowFormat.STRUCT - 默认值
返回
numpy.recarray,如果没有查询到数据,返回空numpy.recarray。numpy.recarray是一种 c-struct array。 - RowFormat.RAW 返回无类型的原始数据 (dict[str, str]) 列表,如果没有查询到数据,返回空list
- RowFormat.TYPED_DICT 返回符合Component定义的,有格式的dict类型列表,如果没有查询到数据,返回空list
- RowFormat.ID_LIST 返回查询到的 row id 列表,如果没有查询到数据,返回空list
Notes
如何复合条件查询? 请利用python的特性,先在数据库上筛选出最少量的数据,然后本地二次筛选::
items = client.range(ref, "owner", player_id, limit=100)
few_items = items[items.amount < 10]
由于python numpy支持SIMD,比直接在数据库复合查询快。
direct_set
direct_set(id_: int, **kwargs: str) -> NoneSource: hetu/data/backend/base.py:334
UNSAFE! 只用于易失数据! 不会做类型检查!
直接写入属性到数据库,避免session必须要执行get+事务2条指令。 仅支持非索引字段,索引字段更新是非原子性的,必须使用事务。 注意此方法可能导致写入数据到已删除的行,请确保逻辑。
一些系统级别的临时数据,使用直接写入的方式效率会更高,但不保证数据一致性。
TableReference
TableReference(
comp_cls: type[BaseComponent],
instance_name: str,
cluster_id: int,
) -> NoneSource: hetu/data/backend/table.py:33
Table表的地址信息,在后端,组件持久化的目标称为表。 组件实际储存在数据库中时,需要实例名和cluster id等信息,此类封装了这些信息。
Attributes
comp_cls(type[BaseComponent]) — 该表所属的组件类instance_name(str) — 该表所属服务器实例名cluster_id(int) — 该表所属的System簇IDcomp_name(str) — 获得组件名称
Methods
is_same_txn_group
is_same_txn_group(other: hetu.data.backend.table.TableReference) -> boolSource: hetu/data/backend/table.py:47
内部方法,判断和另一个TableReference
是否可以在同一事务中执行
create_future_call
create_future_call(
ctx: hetu.system.context.SystemContext,
at: float,
system: str,
*args,
timeout: int = 60,
recurring: bool = False,
)Source: hetu/system/future.py:48
创建一个未来调用任务,到约定时间后会由内部进程执行该System。 未来调用储存在FutureCalls组件中,服务器重启不会丢失。 timeout不为0时,则保证目标System事务一定成功,且只执行一次。 只执行一次的保证通过call_lock引发的事务冲突实现,会强制要求定义System时开启call_lock。
Parameters
ctx(Any) — System默认变量at(Any) — 正数是执行的绝对时间(POSIX时间戳);负数是相对时间,表示延后几秒执行。system(Any) — 未来调用的目标system名*args(Any) — 目标system的参数,注意,只支持可以通过repr转义为string并不丢失信息的参数,比如基础类型。timeout(Any) — 再次调用时间(秒)。如果超过这个时间System调用依然没有成功,就会再次触发调用。 注意:代码错误/数据库错误也会引发timeout重试。如果是代码错误,虽然重试大概率还是失败, 但任务并不会丢失,等程序员修复完代码任务会再次伟大
如果设为0,则不重试,因此不保证任务成功,甚至会丢失。执行时遇到任何错误/程序关闭/Crash, 则未来调用丢失。
如果timeout再次触发时前一次执行还未完成,会引起事务竞态,其中一个事务会被抛弃。 如果前一次已经成功执行,call_lock会触发,跳过执行。
- 注意:抛弃的只有事务(所有ctx.repo[components]的操作),修改全局变量、写入文件等操作是永久的
- 注意:
ctx.race_count只是事务冲突的计数,timeout引起的再次触发会从0重新计数
recurring(Any) — 设置后,将永不删除此未来调用,每次执行后按timeout时间再次执行。
Returns
返回未来调用的uuid: int
Examples
>>> import hetu
>>> @hetu.define_system(namespace='test', permission=None)
... def test_future_call(ctx: hetu.SystemContext, *args):
... # do ctx.repo[...] operations
... print('Future call test', args)
>>> @hetu.define_system(namespace='test', permission=hetu.Permission.USER, depends=('create_future_call:test') )
... def test_future_create(ctx: hetu.SystemContext):
... ctx.depend['create_future_call:test'](ctx, -10, 'test_future_call', 'arg1', 'arg2', timeout=5)示例中,depends依赖使用’:‘符号创建了create_future_call的test副本。
继承System会和对方的簇合并,而create_future_call是常用System,所以使用副本避免System簇过于集中,
增加backend的扩展性,具体参考簇相关的文档。
Notes
- System执行时的Context是内部服务,而不是用户连接,无法获取用户ID,要自己作为参数传入
- 触发精度<=1秒,由每个Worker每秒运行一次循环检查并触发