Seshat Python SDK

Python SDK (Preview)

Early preview; interfaces will align with other SDKs.

Install

pip install seshat-sdk

Initialize Client

from seshat_sdk import SeshatClient

client = SeshatClient(
  api_key="<key>", secret="<secret>",
  host="localhost"  # seshat host
)
print(client.sensors.find())

Sensors

res = client.sensors.find(name="temp", tag="critical", page=1, limit=50)
for s in res.get("data", []):
  pass  # process each sensor dict here

Functions overview: find returns a single page (items plus totals). find_each_page iterates sequential pages, invoking your handler until the results are exhausted or the handler returns False. find_handler (totals only) yields pagination metadata (total/page/limit), useful for progress bars when all_pages(True) is set. Filters combine with AND; empty or None values are omitted from the query string. Catch exceptions inside handlers so they do not abort iteration.
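
For instance, a handler can stop pagination early by returning False; a minimal sketch using the builder methods shown elsewhere in this document (the three-page cutoff is arbitrary):

# Stop after three pages; find_each_page ends when the handler returns False.
opts = client.sensors.find_opts().name_is("temp").all_pages(True)

def on_page(items, total, page, limit):
    print(f"page={page} received={len(items)} total={total}")
    return page < 3  # returns False once three pages have been handled

client.sensors.find_each_page(opts, on_page)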

Returns

find returns sensor dictionaries plus pagination info; find_each_page passes the same data to your callback.

Sensor fields (Python)

latest_value fields

FindOptions parameters

Notes: filters combine with AND; server may cap page size; identical options apply to the Machines service.

Machines

# Builder style
mo = client.machines.find_opts().tag_is("production").all_pages(True)
client.machines.find_each_page(mo, lambda items, total, page, limit: (
    print(f"page={page} machines={len(items)} total={total}") or True
))
# Single page
mres = client.machines.find(tag="edge", page=1, limit=50)
print(mres.get("total", 0))

Returns

Machine fields (Python)

Pagination helpers

# FindOptions + find_each_page
fo = client.sensors.find_opts().name_is("temp").tag_is("critical").all_pages(True)
client.sensors.find_each_page(fo, lambda items, total, page, limit: (
    print(f"page {page}/{(total+limit-1)//limit}: {len(items)} items") or True
))

# find_handler: totals-only progress callback
fo = client.sensors.find_opts().all_pages(True).limit_is(100)
client.sensors.find_handler(fo, lambda total, page, limit: (
    print(f"progress: page={page} limit={limit} total={total}") or True
))

IoT Data (builder-style)

q = client.sensors.iot_query_options() \
  .sensor_id("sensor123") \
  .per_page_count(50) \
  .sort_date_desc()
resp = client.sensors.iot_data(q)
for item in resp.get("items", []):
  _ = item.get("value")  # read each record's value

Functions: sensors.iot_query_options() creates a builder with fluent setters (sensor_id, start_date/end_date, min_value/max_value, page_num/per_page_count, sort_by/sort_order or shortcuts). iot_data(opts) and iot_data_changes(opts) accept the builder; for backward compatibility, a plain sensor_id string is also accepted.
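
A further sketch combining the documented setters with iot_data_changes and the backward-compatible string form; the ISO-8601 date strings and the numeric bounds are illustrative assumptions, not values the SDK requires:

# Change history for one sensor within a window (date strings assumed ISO-8601)
q = client.sensors.iot_query_options() \
  .sensor_id("sensor123") \
  .start_date("2024-01-01T00:00:00Z") \
  .end_date("2024-01-31T23:59:59Z") \
  .min_value(0) \
  .max_value(100) \
  .page_num(1) \
  .per_page_count(100)
changes = client.sensors.iot_data_changes(q)

# Backward-compatible call: a plain sensor id string instead of a builder
resp = client.sensors.iot_data("sensor123")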

Returns

IoT data record (Python)

metadata fields
change meta (before_change_data / change_data)

Live

Data Types (type id → Python value)

When streaming, each item includes a type integer describing the decoded Python value form:

Unsigned numeric types are represented as Python int, which supports arbitrary precision natively.

for batch in client.sensors.live(["sensor1", "sensor2"], limit=25):
    for item in batch:
        handle(item)

Python live streaming yields batches of sensor values (size governed by limit). Provide either a single sensor id or a list. Exceptions inside your handle function should be caught to avoid losing remaining batch items.
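
A minimal sketch of that per-item guard; handle is a placeholder for your own processing, and the assumption that each item is a dict with a value field follows the IoT data examples above:

import logging

def handle(item):
    ...  # your processing logic

for batch in client.sensors.live(["sensor1", "sensor2"], limit=25):
    for item in batch:
        try:
            handle(item)
        except Exception:
            # log and continue so one bad item does not drop the rest of the batch
            logging.exception("failed to process item value=%r", item.get("value"))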

Connections

Each sensor is tied to a connection: the endpoint it reads from or writes to (e.g., an MQTT client or a PostgreSQL database). To write to an MQTT connection with parameters such as topic, send explicit item dictionaries mirroring the proto WriteItem fields.

# Advanced: boolean write with 'topic' parameter
item = {
    "value": True,
    "type": 16,
    "parameters": [
        {"key": "topic", "value": b"67333c8d615f8c6cd6f07c38/67333c8d615f8c6cd6f07c39"},
    ],
}
client.connections.write_items("mqtt://0.0.0.0:8888", [item])
# Restart connection
client.connections.restart("mqtt://0.0.0.0:8888")

Events

resp = client.events.trigger("rebuild-cache", {"fast": True})  # trigger an event by name with a payload
events = client.events.list()  # list events

Error handling

try:
  res = client.sensors.find()
except Exception as e:
  print(f"find failed: {e}")  # log and retry/backoff

try:
  q = client.sensors.iot_query_options().sensor_id("sensor123").per_page_count(50)
  data = client.sensors.iot_data(q)
except Exception as e:
  print(f"iot_data failed: {e}")  # network / parse error

try:
  for batch in client.sensors.live("sensor123", limit=10):
    pass
except RuntimeError as e:
  print(f"streaming not configured: {e}")
except Exception as e:
  print(f"streaming failure: {e}")