Skip to content

System

SessionRepository

SessionRepository(session: Session, comp_cls: type[BaseComponent]) -> None

Source: hetu/data/backend/repo.py:36

从数据库查询数据并放入Session缓存,Systemctx.repo[...]返回的便是此类。

Attributes

Methods

remote_has_unique_conflicts_

remote_has_unique_conflicts_(row: numpy.record, fields: set) -> str | None

Source: hetu/data/backend/repo.py:64

内部方法,在远程数据库中检查Unique索引冲突。

is_unique_conflicts

is_unique_conflicts(
    row: numpy.record,
    insert=False,
    ignore: str | None = None,
) -> str | None

Source: hetu/data/backend/repo.py:104

检查一行数据的Unique索引在本地和远程数据库中是否有冲突。

Parameters

  • row (np.record) — 待检查的行数据,必须是 c-struct 格式。

  • insert (bool) — 如果为True,表示这是一个插入操作,否则是更新操作,要求之前已获取过旧数据。

  • ignore (str | None) — 排除检查远程数据库中的某列冲突。用于upsert,因为upsert会先自己检查。

get_by_id

get_by_id(row_id: numpy.int64 | int) -> numpy.record | None

Source: hetu/data/backend/repo.py:142

从数据库获取单行数据,并放入Session缓存。 本指令如果命中缓存,不会去数据库查询。

get

get(
    index_name: str | None = None,
    query_value: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool | None = None,
    **kwargs: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool,
) -> numpy.record | None

Source: hetu/data/backend/repo.py:164

从数据库获取单行数据,并放入Session缓存。 推荐通过"id"主键查询,这样无须查询索引,如果缓存命中,不会去数据库查询;否则会执行1-2次查询。

Parameters

  • index_name (Any) — 辅助参数,如果不便使用kwargs参数时使用。

  • query_value (Any) — 辅助参数,如果不便使用kwargs参数时使用。

  • kwargs (Any) — 查询字段和值,例如 id=1234567890。只能查询一个字段,且该字段必须有索引。

Returns

如果未查询到匹配数据,则返回 None。如果查询到数据,则返回查询到的第一行数据。 返回 np.record (c-struct) 格式。

Examples

::

item = await session.using(Item).get(id=1234567890)

range

range(
    index_name: str | None = None,
    _left: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool | None = None,
    _right: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool | None = None,
    limit: int = 10,
    desc: bool = False,
    **kwargs: tuple[numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool, numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool],
) -> numpy.rec.recarray

Source: hetu/data/backend/repo.py:220

从数据库查询索引,返回区间内数据,限制 limit 条。 本指令会去数据库执行 1+(limit-缓存命中) 次查询,至少要进行1次数据库查询。

Parameters

  • index_name (Any) — 辅助参数,如果不便使用kwargs参数时使用。

  • _left (Any) — 辅助参数,如果不便使用kwargs参数时使用。

  • _right (Any) — 辅助参数,如果不便使用kwargs参数时使用。

  • kwargs (Any) — 查询字段和区间,例如 level=(1, 10)。只能查询一个字段,且该字段必须有索引。 默认闭区间,如果要自定义区间,请转换为字符串并开头指定 ([

  • 如果要查询的字段和参数冲突,请使用辅助参数方式。
  • limit (Any) — 限制返回的行数,越少越快。负数表示不限制行数。

  • desc (Any) — 是否降序排列

Returns

返回 numpy.recarray,如果没有查询到数据,返回空 numpy.recarraynumpy.recarray 是一种 c-struct array。

Notes

如何复合条件查询? 请利用python的特性,先在数据库上筛选出最少量的数据,然后本地二次筛选::

items = await session.using(Item).range(level=(10, 20), limit=100)
few_items = items[items.amount < 10]

由于python numpy支持SIMD,比直接在数据库复合查询快。

insert

insert(row: numpy.record, ignore: str | None = None) -> None

Source: hetu/data/backend/repo.py:310

向Session中添加一行待插入数据。

Parameters

  • row (Any) — 待插入的行数据,必须是 c-struct 格式。

  • ignore (Any) — 排除remote检测Unique冲突的index,内部使用,请勿设置。

update

update(row: numpy.record) -> None

Source: hetu/data/backend/repo.py:331

向Session中添加一行待更新数据。

Parameters

  • row (np.record) — 待更新的行数据,必须是 c-struct 格式。

upsert

upsert(
    **kwargs: numpy.integer | numpy.floating | numpy.str_ | numpy.bytes_ | numpy.bool | float | str | bytes | bool,
) -> hetu.data.backend.repo.UpsertContext

Source: hetu/data/backend/repo.py:361

使用async with语法,根据Unique索引,查询并返回一行数据,如果不存在则返回新行数据。 在退出上下文时,自动插入新行,或是更新已有行。

Parameters

  • kwargs (Any) — 查询字段和值,例如 id=1234567890。只能查询一个字段,且该字段必须为unique索引。

Examples

::

async with session.using(Order).upsert(id=1234567890) as order:
    order.status = "completed"

delete

delete(row_id: int) -> None

Source: hetu/data/backend/repo.py:390

向Session中添加一行待删除数据。

Parameters

  • row_id (int) — 待删除行的主键ID。

SystemClusters

SystemClusters(*args, **kwargs)

Source: hetu/system/definer.py:41

储存所有System,并分类成共置簇。可以通过它查询System的定义信息,和所属簇id。簇id每次服务器 启动时按簇大小排序分配,每个namespace下的簇id从0重新分配。 System之间components有交集的,表示这些System互相之间可能有事务冲突,形成一个共置簇, 簇和簇之间无交集,绝不会有事务冲突。

此类只负责储存定义,调度器通过此类查询System信息。

Attributes

  • main_namespace (Any) — No description.

Methods

get_components

get_components(namespace: str | None = None) -> dict[type[BaseComponent], int]

Source: hetu/system/definer.py:99

返回所有被System引用过的Component及其所属簇id

build_endpoints

build_endpoints()

Source: hetu/system/definer.py:255

把System定义复制到EndpointDefines中,作为Endpoint使用


SystemContext

SystemContext(
    caller: int,
    connection_id: int,
    address: str,
    group: str,
    user_data: dict[str, typing.Any],
    timestamp: float,
    request: Request,
    systems: SystemCaller,
    client_limits: list[list[int]] = <factory>,
    server_limits: list[list[int]] = <factory>,
    max_row_sub: int = 0,
    max_index_sub: int = 0,
    race_count: int = 0,
    repo: dict[type[BaseComponent], SessionRepository] = <factory>,
    depend: dict[str, typing.Callable] = <factory>,
) -> None

Source: hetu/system/context.py:18

Bases: EndpointContext

System调用时的上下文,继承自 Context 并添加事务相关属性。 每次System执行时由engine创建并作为第一个参数 ctx 传入。

Attributes

  • race_count (int) — 当前事务因 RaceCondition 重试的次数,初始为0。注意:timeout 引发的再次 触发会从0重新计数。

  • repo (dict[type[BaseComponent ], SessionRepository ]) — 当前事务的repo字典,键为Component类,值为对应的 SessionRepository 。 通过 ctx.repo[ComponentClass] 取得并执行 get / range / upsert / update / delete 等CRUD操作;事务结束时统一提交。

  • depend (dict[str, Callable]) — depends 依赖中声明的父事务函数字典,键为System名(可带":副本名"后缀)。 通过 ctx.depend["system_name"](ctx, ...) 调用以在同一事务中执行。

Methods

session_commit

session_commit() -> None

Source: hetu/system/context.py:38

提前显式结束事务,提交所有写入操作。如果遇到事务冲突,会抛出异常,因此后续的代码行不会执行。 注意:调用完 session_commitctx 将不再能够读写 repo 。且后续不再属于事务, 也就是说遇到宕机/crash可能导致整个函数执行不完全。

Returns

如果有任何其他失败,抛出以下异常:redis.exceptions,RaceCondition。 异常一般无需特别处理,系统的默认处理方式为:遇到RaceCondition异常,上游系统会自动重试。 其他任何异常会记录日志并断开客户端连接。

session_discard

session_discard() -> None

Source: hetu/system/context.py:55

提前显式结束事务,放弃所有写入操作。 注意:调用完 session_discardctx 将不再能够读写 repo 。且后续不再属于事务, 也就是说遇到宕机/crash可能导致整个函数执行不完全。


Table

Table(
    comp_cls: type[BaseComponent],
    instance_name: str,
    cluster_id: int,
    backend: Backend,
) -> None

Source: hetu/data/backend/table.py:60

Bases: TableReference

Table表的地址信息加上所属的Backend,能够让你直接去数据库中操作此表。

Attributes

  • backend (Backend) — 内部数据库连接管理实例

Methods

servant_get

servant_get(
    row_id: int,
    row_format=<RowFormat.STRUCT: 1>,
) -> numpy.record | dict[str, Any] | None

Source: hetu/data/backend/base.py:180

从数据库直接获取单行数据。

Parameters

  • row_id (Any) — row id主键

  • row_format (Any) — 返回数据解码格式,见 “Returns”

Returns

如果未查询到匹配数据,则返回 None。 否则根据 row_format 参数返回以下格式之一:

  • RowFormat.STRUCT - 默认值 返回 np.record (c-struct) 的单行数据
  • RowFormat.RAW 返回无类型的原始数据 (dict[str, str])
  • RowFormat.TYPED_DICT 返回符合Component定义的,有格式的dict类型。

servant_range

servant_range(
    index_name: str,
    left: int | float | str | bytes | bool,
    right: int | float | str | bytes | bool | None = None,
    limit: int = 10,
    desc: bool = False,
    row_format=<RowFormat.STRUCT: 1>,
)

Source: hetu/data/backend/base.py:265

从数据库直接查询索引 index_name,返回在 [left, right] 闭区间内数据。 如果 rightNone,则查询等于 left 的数据,限制 limit 条。

Parameters

  • index_name (Any) — 查询Component中的哪条索引

  • left (Any) — 查询范围,闭区间。字符串查询时,可以在开头指定是[闭区间,还是(开区间。 如果right不填写,则精确查询等于left的数据。

  • right (Any) — 查询范围,闭区间。字符串查询时,可以在开头指定是[闭区间,还是(开区间。 如果right不填写,则精确查询等于left的数据。

  • limit (Any) — 限制返回的行数,越少越快。负数表示不限制行数。

  • desc (Any) — 是否降序排列

  • row_format (Any) — 返回数据解码格式,见 “Returns”

Returns

根据 row_format 参数返回以下格式之一:

  • RowFormat.STRUCT - 默认值 返回 numpy.recarray,如果没有查询到数据,返回空 numpy.recarraynumpy.recarray 是一种 c-struct array。
  • RowFormat.RAW 返回无类型的原始数据 (dict[str, str]) 列表,如果没有查询到数据,返回空list
  • RowFormat.TYPED_DICT 返回符合Component定义的,有格式的dict类型列表,如果没有查询到数据,返回空list
  • RowFormat.ID_LIST 返回查询到的 row id 列表,如果没有查询到数据,返回空list

Notes

如何复合条件查询? 请利用python的特性,先在数据库上筛选出最少量的数据,然后本地二次筛选::

items = client.range(ref, "owner", player_id, limit=100)
few_items = items[items.amount < 10]

由于python numpy支持SIMD,比直接在数据库复合查询快。

direct_set

direct_set(id_: int, **kwargs: str) -> None

Source: hetu/data/backend/base.py:334

UNSAFE! 只用于易失数据! 不会做类型检查!

直接写入属性到数据库,避免session必须要执行get+事务2条指令。 仅支持非索引字段,索引字段更新是非原子性的,必须使用事务。 注意此方法可能导致写入数据到已删除的行,请确保逻辑。

一些系统级别的临时数据,使用直接写入的方式效率会更高,但不保证数据一致性。


TableReference

TableReference(
    comp_cls: type[BaseComponent],
    instance_name: str,
    cluster_id: int,
) -> None

Source: hetu/data/backend/table.py:33

Table表的地址信息,在后端,组件持久化的目标称为表。 组件实际储存在数据库中时,需要实例名和cluster id等信息,此类封装了这些信息。

Attributes

  • comp_cls (type[BaseComponent ]) — 该表所属的组件类

  • instance_name (str) — 该表所属服务器实例名

  • cluster_id (int) — 该表所属的System簇ID

  • comp_name (str) — 获得组件名称

Methods

is_same_txn_group

is_same_txn_group(other: hetu.data.backend.table.TableReference) -> bool

Source: hetu/data/backend/table.py:47

内部方法,判断和另一个TableReference 是否可以在同一事务中执行


create_future_call

create_future_call(
    ctx: hetu.system.context.SystemContext,
    at: float,
    system: str,
    *args,
    timeout: int = 60,
    recurring: bool = False,
)

Source: hetu/system/future.py:48

创建一个未来调用任务,到约定时间后会由内部进程执行该System。 未来调用储存在FutureCalls组件中,服务器重启不会丢失。 timeout不为0时,则保证目标System事务一定成功,且只执行一次。 只执行一次的保证通过call_lock引发的事务冲突实现,会强制要求定义System时开启call_lock。

Parameters

  • ctx (Any) — System默认变量

  • at (Any) — 正数是执行的绝对时间(POSIX时间戳);负数是相对时间,表示延后几秒执行。

  • system (Any) — 未来调用的目标system名

  • *args (Any) — 目标system的参数,注意,只支持可以通过repr转义为string并不丢失信息的参数,比如基础类型。

  • timeout (Any) — 再次调用时间(秒)。如果超过这个时间System调用依然没有成功,就会再次触发调用。 注意:代码错误/数据库错误也会引发timeout重试。如果是代码错误,虽然重试大概率还是失败, 但任务并不会丢失,等程序员修复完代码任务会再次伟大

如果设为0,则不重试,因此不保证任务成功,甚至会丢失。执行时遇到任何错误/程序关闭/Crash, 则未来调用丢失。

如果timeout再次触发时前一次执行还未完成,会引起事务竞态,其中一个事务会被抛弃。 如果前一次已经成功执行,call_lock会触发,跳过执行。

  • 注意:抛弃的只有事务(所有ctx.repo[components]的操作),修改全局变量、写入文件等操作是永久的
  • 注意:ctx.race_count只是事务冲突的计数,timeout引起的再次触发会从0重新计数
  • recurring (Any) — 设置后,将永不删除此未来调用,每次执行后按timeout时间再次执行。

Returns

返回未来调用的uuid: int

Examples

>>> import hetu
>>> @hetu.define_system(namespace='test', permission=None)
... def test_future_call(ctx: hetu.SystemContext, *args):
...     # do ctx.repo[...] operations
...     print('Future call test', args)
>>> @hetu.define_system(namespace='test', permission=hetu.Permission.USER, depends=('create_future_call:test') )
... def test_future_create(ctx: hetu.SystemContext):
...     ctx.depend['create_future_call:test'](ctx, -10, 'test_future_call', 'arg1', 'arg2', timeout=5)

示例中,depends依赖使用’:‘符号创建了create_future_call的test副本。 继承System会和对方的簇合并,而create_future_call是常用System,所以使用副本避免System簇过于集中, 增加backend的扩展性,具体参考簇相关的文档。

Notes

  • System执行时的Context是内部服务,而不是用户连接,无法获取用户ID,要自己作为参数传入
  • 触发精度<=1秒,由每个Worker每秒运行一次循环检查并触发