Python SDK (Preview)
Early preview; interfaces will align with other SDKs.
Install
pip install seshat-sdk
Initialize Client
from seshat_sdk import SeshatClient

client = SeshatClient(
    api_key="<key>",
    secret="<secret>",
    host="localhost"  # seshat host
)
print(client.sensors.find())
Sensors
res = client.sensors.find(name="temp", tag="critical", page=1, limit=50)
for s in res.get("data", []):
    pass  # process each sensor dict
Functions overview: find returns a single page (items plus totals). find_each_page iterates sequential pages, invoking your handler until the pages are exhausted or the handler returns False. find_handler (totals-only) yields pagination metadata (total/page/limit) for progress bars when all_pages(True) is set. Filters combine with AND; empty or None values are omitted from the query string. Catch exceptions inside handlers so a failure on one page does not abort iteration.
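For example, a handler can stop pagination early by returning False and guard its own work so one bad page does not abort iteration; a minimal sketch using the sensors service (the sensor name is illustrative):
def handler(items, total, page, limit):
    try:
        # Stop paging as soon as a sensor named "temp-7" appears (illustrative name).
        if any(s.get("name") == "temp-7" for s in items):
            return False
    except Exception as e:
        print(f"page {page} failed: {e}")  # keep iterating despite per-page errors
    return True  # continue to the next page

opts = client.sensors.find_opts().name_is("temp").all_pages(True)
client.sensors.find_each_page(opts, handler)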
Returns
find / find_each_page callbacks return sensor dictionaries and pagination info.
- data (list[dict]): sensor list
- total (int): total count across all pages
- page (int), limit (int): current page metadata
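For instance, total and limit can drive manual paging; a short sketch assuming the filter and paging keywords shown above:
res = client.sensors.find(name="temp", page=1, limit=50)
pages = (res.get("total", 0) + 49) // 50  # total page count at limit=50
for p in range(2, pages + 1):
    nxt = client.sensors.find(name="temp", page=p, limit=50)
    # process nxt.get("data", [])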
Sensor fields (Python)
- id, name, key
- company, machine, collector
- tags (list[str])
- is_deleted (bool)
- creation_date (int)
- data_type (int)
- latest_value (dict | None)
- latest_change_value (dict | None)
- generative (bool)
- send_only_change (bool)
latest_value fields
date (str, ISO), type (int), value (Any)
FindOptions parameters
- name_is(str): Case-insensitive substring match on names.
- key_is(str): Exact key filter.
- tag_is(str): Comma-separated tags; matches any (e.g. "prod,critical").
- machine_is(str): Filter by owning machine id or name.
- limit_is(int): Page size (ignored when all_pages(True)).
- page_is(int): 1-based page index for manual paging.
- all_pages(bool): Auto-paginate across all pages.
Notes: filters combine with AND; server may cap page size; identical options apply to the Machines service.
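A sketch chaining several of these setters on the sensors service (the filter values are illustrative):
opts = (client.sensors.find_opts()
        .name_is("temp")
        .tag_is("prod,critical")   # matches any of the listed tags
        .machine_is("press-07")    # illustrative machine id or name
        .limit_is(25)
        .all_pages(True))
client.sensors.find_each_page(opts, lambda items, total, page, limit: (
    print(f"page={page} sensors={len(items)}") or True
))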
Machines
# Builder style
mo = client.machines.find_opts().tag_is("production").all_pages(True)
client.machines.find_each_page(mo, lambda items, total, page, limit: (
print(f"page={page} machines={len(items)} total={total}") or True
))
# Single page
mres = client.machines.find(tag="edge", page=1, limit=50)
print(mres.get("total", 0))
Returns
- machines.find(...) → dict with keys data (list), total (int)
- machines.find_each_page(opts, handler) → handler(items, total, page, limit) for iteration (page/limit are runtime metadata)
- machines.find_handler(opts, handler) → totals-only metadata (total, page, limit)
Machine struct fields (Python)
- id (str)
- collector (str)
- company (str)
- name (str)
- key (str)
- tags (list[str])
- created_by (str)
- creation_date (int)
- send_mqtt_direct (bool)
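These fields come back on each machine dictionary; a small sketch printing a few of them from a single-page query:
mres = client.machines.find(tag="edge", page=1, limit=50)
for m in mres.get("data", []):
    print(m.get("name"), m.get("key"), m.get("tags"))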
Pagination helpers
# FindOptions + find_each_page
fo = client.sensors.find_opts().name_is("temp").tag_is("critical").all_pages(True)
client.sensors.find_each_page(fo, lambda items, total, page, limit: (
print(f"page {page}/{(total+limit-1)//limit}: {len(items)} items") or True
))
# find_handler: totals-only progress callback
fo = client.sensors.find_opts().all_pages(True).limit_is(100)
client.sensors.find_handler(fo, lambda total, page, limit: (
print(f"progress: page={page} limit={limit} total={total}") or True
))
IoT Data (builder-style)
- Sensors: find + IoT endpoints (builder-style options)
- Events: trigger/list
- Connections: write operations
- Streaming: planned
q = client.sensors.iot_query_options() \
.sensor_id("sensor123") \
.per_page_count(50) \
.sort_date_desc()
resp = client.sensors.iot_data(q)
for item in resp.get("items", []):
_ = item.get("value")
Functions: sensors.iot_query_options() creates a builder with fluent setters (sensor_id, start_date/end_date, min_value/max_value, page_num/per_page_count, sort_by/sort_order or shortcuts). iot_data(opts) and iot_data_changes(opts) accept the builder; for backward compatibility, a plain sensor_id string is also accepted.
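A sketch combining the range and paging setters named above; the ISO-8601 strings passed to start_date/end_date are an assumed format:
# Date- and value-filtered query; the string date format is an assumption.
q = (client.sensors.iot_query_options()
     .sensor_id("sensor123")
     .start_date("2024-01-01T00:00:00Z")
     .end_date("2024-01-31T23:59:59Z")
     .min_value(10)
     .max_value(90)
     .page_num(1)
     .per_page_count(100))
changes = client.sensors.iot_data_changes(q)

# Backward compatible: a plain sensor_id string is also accepted.
data = client.sensors.iot_data("sensor123")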
Returns
- iot_data(opts) → dict with keys items, ok, page, per_page, total, sensor_ids
- iot_data_changes(opts) → same shape (change-focused records)
IoT data record (Python)
timestamp (str, ISO), value (Any), metadata (dict | None)
metadata fields
- company_id (str), machine (str), sensor (str)
- change (bool)
- before_change_data (dict | None), change_data (dict | None)
- together_trigger (str)
- together_values (dict[str, Any])
change meta (before_change_data / change_data)
- start (str), end (str)
- duration_ms (int)
- start_value (Any), end_value (Any)
- inc_value (float), dec_value (float)
- start_id (str)
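A sketch that walks change-focused records using the metadata and change fields listed above:
q = client.sensors.iot_query_options().sensor_id("sensor123").per_page_count(50)
resp = client.sensors.iot_data_changes(q)
for rec in resp.get("items", []):
    meta = rec.get("metadata") or {}
    if meta.get("change"):
        cd = meta.get("change_data") or {}
        print(rec.get("timestamp"),
              cd.get("start_value"), "->", cd.get("end_value"),
              f"({cd.get('duration_ms')} ms)")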
Live
Data Types (type id → Python value)
When streaming, each item includes a type integer describing the decoded Python value form:
- 1 → int, 2 → int (uint mapped)
- 3 → int (int8), 4 → int (uint8)
- 5 → int (int16), 6 → int (uint16)
- 7 → int (int32), 8 → int (uint32)
- 9 → int (int64), 10 → int (uint64)
- 11 → float
- 12 → datetime.datetime (parsed RFC3339 / ISO)
- 13 → str
- 14 → list[int]
- 15 → bytes
- 16 → bool
- 17 → dict[str, Any]
- 18 → list[dict[str, Any]]
- 19 → Any (raw JSON)
- 22 → str
Unsigned numeric types are represented as Python int; Python automatically handles arbitrary precision.
for batch in client.sensors.live(["sensor1", "sensor2"], limit=25):
    for item in batch:
        handle(item)  # your per-item processing function
Python live streaming yields batches of sensor values (size governed by limit). Provide either a single sensor id or a list. Exceptions inside your handle function should be caught to avoid losing remaining batch items.
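A defensive sketch of the handle function used above: it guards each item and branches on the type id from the table (treating each streamed item as a dict with "type" and "value" keys is an assumption):
import datetime

def handle(item):
    try:
        type_id = item.get("type")   # type id per the table above (assumed key)
        value = item.get("value")    # decoded Python value (assumed key)
        if type_id == 12 and isinstance(value, datetime.datetime):
            print("timestamp:", value.isoformat())
        elif type_id == 16:
            print("bool:", value)
        else:
            print("value:", value)
    except Exception as e:
        print(f"failed to handle item: {e}")  # don't lose the rest of the batch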
Connections
Each sensor is tied to a connection. The endpoints a sensor reads from or writes to are called connections (e.g., an MQTT client or a PostgreSQL database). To write to an MQTT connection with parameters like topic, send explicit item dictionaries (mirroring proto WriteItem fields).
# Advanced: boolean write with 'topic' parameter
item = {"value": True, "type": 16, "parameters": [{"key": "topic", "value": b"67333c8d615f8c6cd6f07c38/67333c8d615f8c6cd6f07c39"}]}
client.connections.write_items("mqtt://0.0.0.0:8888", [item])
# Restart connection
client.connections.restart("mqtt://0.0.0.0:8888")
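A defensive sketch combining the two calls above: if a write fails, restart the connection and retry once (the retry/backoff policy is illustrative, not prescribed by the SDK):
import time

conn = "mqtt://0.0.0.0:8888"
item = {"value": True, "type": 16,
        "parameters": [{"key": "topic", "value": b"67333c8d615f8c6cd6f07c38/67333c8d615f8c6cd6f07c39"}]}
try:
    client.connections.write_items(conn, [item])
except Exception:
    client.connections.restart(conn)  # restart the connection, then retry once
    time.sleep(1.0)                   # illustrative backoff
    client.connections.write_items(conn, [item])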
Events
resp = client.events.trigger("rebuild-cache", {"fast": True})
events = client.events.list()
Error handling
# Basic request failure: log, then retry with backoff
try:
    res = client.sensors.find()
except Exception as e:
    print(f"find failed: {e}")  # log and retry/backoff

# IoT query failures surface as exceptions (network / parse errors)
try:
    q = client.sensors.iot_query_options().sensor_id("sensor123").per_page_count(50)
    data = client.sensors.iot_data(q)
except Exception as e:
    print(f"iot_data failed: {e}")  # network / parse error

# Live streaming: RuntimeError when streaming is not configured
try:
    for batch in client.sensors.live("sensor123", limit=10):
        pass
except RuntimeError as e:
    print(f"streaming not configured: {e}")
except Exception as e:
    print(f"streaming failure: {e}")