Tutorial: Chat Room
In this tutorial you’ll build a multi-user chat room. By the end you’ll have exercised every major HeTu concept: typed components, server-side systems, subscription-based realtime updates, permission checks, and lifecycle hooks.
The complete reference implementation is at
examples/chat/server/src/app.py
.
What you’ll build
- A presence component (who’s online)
- A chat-history component
- Login, send-message, and quit RPCs
- A subscription that pushes new messages to every client in real time
- A disconnect hook that cleans up presence
Step 1 — Define ChatMessage
Components are typed tables. Add this to src/app.py:
import time
import numpy as np
import hetu
@hetu.define_component(namespace="Chat", permission=hetu.Permission.EVERYBODY)
class ChatMessage(hetu.BaseComponent):
owner: np.int64 = hetu.property_field(0, index=True)
name: str = hetu.property_field("", dtype="U32")
text: str = hetu.property_field("", dtype="U256")
kind: str = hetu.property_field("chat", dtype="U16")
created_at_ms: np.int64 = hetu.property_field(0, index=True)Notes on this declaration:
permission=Permission.EVERYBODYlets unauthenticated clients read this table. (Writes still go throughSystems, which can have stricter permissions.)index=Trueonownerandcreated_at_msbuilds sorted indexes that clients canrange-query.dtype="U256"declares a fixed-width 256-character UTF-32 column. Strings are stored in NumPy structured arrays, which is why widths are explicit.
Step 2 — Define OnlineUser (presence)
Add a second component for who’s currently connected:
@hetu.define_component(namespace="Chat", permission=hetu.Permission.EVERYBODY)
class OnlineUser(hetu.BaseComponent):
owner: np.int64 = hetu.property_field(0, unique=True)
name: str = hetu.property_field("", unique=True, dtype="U32")
online: bool = hetu.property_field(False)
last_seen_ms: np.int64 = hetu.property_field(0)unique=True on owner and name enforces uniqueness at insert time and
also creates a fast point-lookup index.
Step 3 — user_login System
Systems are async functions that run inside a transaction. They receive a
SystemContext (typically ctx) plus your RPC arguments.
def _now_ms() -> int:
return int(time.time() * 1000)
async def _insert_message(ctx, owner, name, text, kind):
row = ChatMessage.new_row()
row.owner = owner
row.name = name
row.text = text
row.kind = kind
row.created_at_ms = _now_ms()
await ctx.repo[ChatMessage].insert(row)
@hetu.define_system(
namespace="Chat",
components=(OnlineUser, ChatMessage),
permission=hetu.Permission.EVERYBODY,
)
async def user_login(ctx: hetu.SystemContext, user_id: int, name: str):
await hetu.elevate(ctx, int(user_id), kick_logged_in=True)
async with ctx.repo[OnlineUser].upsert(owner=ctx.caller) as row:
row.name = name
row.online = True
row.last_seen_ms = _now_ms()
ctx.user_data["me"] = row
await _insert_message(
ctx, owner=ctx.caller, name=name,
text=f"{name} joined the chat", kind="system",
)Two things worth pointing out:
hetu.elevate(ctx, user_id)promotes this connection from anonymous to user-authenticated. Everything after it on the same connection runs withctx.caller == user_idand passesPermission.USERchecks. (Real applications will validateuser_idagainst an external auth provider before callingelevate.)ctx.user_datais a per-connection dictionary that survives across RPC calls. We stash the user’sOnlineUserrow so later systems don’t have to re-query it.
Step 4 — user_chat System
The actual “send a message” RPC:
@hetu.define_system(
namespace="Chat", components=(ChatMessage,),
permission=hetu.Permission.USER,
)
async def user_chat(ctx: hetu.SystemContext, text: str):
me = ctx.user_data["me"]
assert me and me.online, "call user_login first"
await _insert_message(
ctx, owner=ctx.caller, name=me.name, text=text, kind="chat",
)permission=Permission.USER means only connections that have been through
elevate can call this — anonymous clients get an error before the function
body runs.
Step 5 — user_quit and on_disconnect
Cleanly mark a user offline:
@hetu.define_system(
namespace="Chat", components=(OnlineUser, ChatMessage),
permission=hetu.Permission.USER,
)
async def user_quit(ctx: hetu.SystemContext):
if row := await ctx.repo[OnlineUser].get(owner=ctx.caller):
row.online = False
row.last_seen_ms = _now_ms()
await ctx.repo[OnlineUser].update(row)
await _insert_message(
ctx, owner=ctx.caller, name=row.name,
text=f"{row.name} left the chat", kind="system",
)
@hetu.define_system(
namespace="Chat", components=(OnlineUser,),
depends=("user_quit",), permission=None,
)
async def on_disconnect(ctx: hetu.SystemContext):
await ctx.depend["user_quit"](ctx)on_disconnect is special:
permission=Nonemeans the client cannot call it directly.- HeTu fires it automatically when a websocket connection closes.
depends=("user_quit",)lets us reuseuser_quit’s implementation viactx.depend["user_quit"](ctx).
Step 6 — Run it
Save src/app.py and start the server (SQLite for local dev):
uv run hetu start \
--app-file=./src/app.py \
--db=sqlite:///./chat.db \
--namespace=Chat \
--instance=devThe provided example also ships a config.yml you can use instead:
cd examples/chat/server
uv run hetu start --config=./config.ymlStep 7 — Subscribe from a client
In Unity, subscribe to the chat history and react to new messages:
// Fire and forget connect
// In practice, this should be wrapped within an asynchronous method,
// with a loop controlling the automatic reconnection.
HeTuClient.Instance.Connect("ws://127.0.0.1:2466/hetu/Chat");
// will automatically wait for the connection to be established before sending.
await HeTuClient.Instance.CallSystem("user_login", 1001, "Alice");
var messages = await HeTuClient.Instance.Range<ChatMessage>(
"created_at_ms", 0, long.MaxValue, 1024);
messages.addTo(gameObject);
messages.ObserveAdd()
.Subscribe(msg => Debug.Log($"{msg.name}: {msg.text}"))
.AddTo(ref messages.DisposeBag);
await HeTuClient.Instance.CallSystem("user_chat", "Hello, world!");The subscription is reactive: any new message inserted by any client (not
just yours) flows into ObserveAdd() within milliseconds, no polling.
What you’ve learned
- Components are typed tables stored in Redis (or SQLite/Postgres in dev).
- Systems are async functions that read/write components inside a
transaction. Their
permission=controls who can call them. elevate()promotes a connection to authenticated.- Subscriptions push row-level changes to clients without polling.
- Lifecycle systems (
on_disconnect, periodicFutureCalls, etc.) let the engine call your code on its own schedule.
Where to next
- Concepts — the underlying ECS model, transaction guarantees, and permission system in depth.
- API Reference — every public symbol, with signatures and examples.
- Operations — Docker, Redis topology, and load balancing for production.