Python API
Auto-generated Python API documentation for OctoPrint-TempETA.
Main Plugin Class
Bases: PrinterCallbackBase, StartupPluginBase, TemplatePluginBase, SettingsPluginBase, AssetPluginBase, EventHandlerPluginBase, SimpleApiPluginBase
Main plugin implementation for Temperature ETA.
Implements OctoPrint Issue #469: Show estimated time remaining
for printer heating (bed, hotend, chamber).
Initialize plugin with temperature history tracking.
Source code in octoprint_temp_eta/__init__.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
def __init__(self):
    """Create the plugin's locks, cached settings and per-heater buffers.

    Only in-memory state is set up here. Values that depend on OctoPrint
    settings (for example the history size) start at built-in defaults and
    are refreshed later, once settings are available.
    """
    super().__init__()

    # Heaters that get their buffers allocated up front.
    builtin_heaters = ("bed", "tool0", "chamber")

    # ---- Locks and debug state -------------------------------------------
    self._lock = threading.Lock()
    # Profile switches run persist -> id-swap -> history-replace inside the
    # switch handler. This lock serializes that whole sequence so two
    # concurrent switches (and the switch's own internal persist step)
    # cannot interleave. Persists triggered elsewhere do not take it.
    self._profile_switch_lock = threading.Lock()

    self._debug_logging_enabled = False
    self._last_debug_log_time = 0.0
    self._last_heater_support_decision = {}
    self._heater_supported_cache: dict[str, bool] = {}

    # The MQTT client is created in on_after_startup, when the logger exists.
    self._mqtt_client: Optional[Any] = None

    # ---- History sizing --------------------------------------------------
    # Samples kept per heater. Configurable through the ``history_size``
    # setting; the active value is cached here for the 2Hz callback.
    self._default_history_size = 60
    self._history_maxlen = self._default_history_size

    # ---- Persistence -----------------------------------------------------
    # History is persisted per printer profile (one file per profile id).
    # The ETA only needs the most recent seconds of history, so persisted
    # data is bounded by maxlen and by a max-age filter applied on load.
    self._active_profile_id: Optional[str] = None

    # Backoff between persists, to limit SD card wear during long phases in
    # which the target is never reached.
    # Sequence after a phase reset: 30s -> 60s -> 120s -> 240s -> 300s (cap).
    self._persist_backoff_reset_s = 30.0
    self._persist_backoff_initial_s = 60.0
    self._persist_backoff_max_s = 300.0
    self._persist_backoff_current_s = self._persist_backoff_initial_s
    self._next_persist_time = 0.0
    self._persist_phase_active = False

    # Protects the persist-backoff triplet (_persist_phase_active,
    # _persist_backoff_current_s, _next_persist_time), which is written from
    # the temperature callback, profile switches and disconnect events.
    # It is a dedicated lock rather than self._lock so these short sections
    # never nest with the history lock taken by
    # _persist_current_profile_history.
    self._persist_state_lock = threading.Lock()

    self._last_persist_size_warning_time = 0.0
    self._persist_max_json_bytes = 256 * 1024
    self._persist_max_age_seconds = 180.0

    self._history_dirty = False
    # Monotonic counter, incremented whenever new history data is recorded.
    # A persist captures it under the lock and clears the dirty flag only
    # if the value is unchanged after the write (no concurrent update).
    self._history_dirty_epoch = 0

    # ---- Sample buffers --------------------------------------------------
    self._temp_history = {
        name: deque(maxlen=self._history_maxlen) for name in builtin_heaters
    }
    # Cooldown samples (target == 0) live in their own buffers so cooldown
    # data does not pollute the heat-up ETA fit.
    self._cooldown_history = {
        name: deque(maxlen=self._history_maxlen) for name in builtin_heaters
    }
    # Ambient baseline per heater for ambient-mode cooldown. Without a
    # user-provided ambient temperature, the lowest temperature observed
    # while the heater target is OFF (within a reasonable range) is used as
    # a proxy for ambient.
    self._cooldown_ambient_baseline: dict[str, Optional[float]] = {
        name: None for name in builtin_heaters
    }

    # ---- Hot-path caches and timers --------------------------------------
    self._last_update_time = 0.0
    # Settings values cached for the ~2Hz callback; refreshed on startup
    # and on settings save.
    self._threshold_start_c = 5.0
    self._update_interval_s = 1.0
    self._last_runtime_cache_refresh_time = 0.0

    # Last target seen per heater, used to detect transitions such as
    # heating -> off (cooldown start).
    self._last_target_by_heater: dict[str, float] = {}

    # ---- Print suppression -----------------------------------------------
    # When enabled, ETA updates are suppressed while a print job is active,
    # keeping the UI focused on the pre-print heat-up phase.
    self._suppressing_due_to_print = False
    # Guards _suppressing_due_to_print so the one-shot frontend clear is not
    # triggered twice by concurrent temperature callbacks and events.
    self._suppress_lock = threading.Lock()

    self._last_settings_snapshot_log_time = 0.0
|
on_after_startup
Called after OctoPrint startup, register for temperature updates.
Source code in octoprint_temp_eta/__init__.py
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
def on_after_startup(self):
    """Finish initialisation once OctoPrint has started.

    Registers the plugin for printer callbacks, refreshes the debug flag and
    cached settings, applies the configured history size, loads the history
    of the active printer profile and sets up the MQTT client when the MQTT
    wrapper is available.
    """
    self._logger.info("Temperature ETA Plugin started")
    self._printer.register_callback(self)

    self._refresh_debug_logging_flag()
    self._debug_log("Debug logging enabled")
    self._refresh_runtime_caches()

    # Settings are available now, so the configured history size can be applied.
    configured_maxlen = self._read_history_maxlen_setting()
    self._set_history_maxlen(configured_maxlen)

    # Load persisted history for the active printer profile.
    self._switch_active_profile_if_needed(force=True)

    # MQTT is optional: without the wrapper there is nothing to configure.
    if MQTTClientWrapper is None:
        self._logger.info("MQTT support disabled: paho-mqtt not available")
        return

    self._mqtt_client = MQTTClientWrapper(self._logger, self._identifier)
    self._configure_mqtt_client()
|
on_printer_add_temperature
on_printer_add_temperature(data)
Called when new temperature data is available (~2Hz).
Parameters:
| Name |
Type |
Description |
Default |
data
|
dict
|
Temperature data from OctoPrint
{
"bed": {"actual": float, "target": float},
"tool0": {"actual": float, "target": float},
...
}
|
required
|
Source code in octoprint_temp_eta/__init__.py
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
def on_printer_add_temperature(self, data):
    """Record temperature samples and drive ETA broadcast and persistence.

    Called when new temperature data is available (~2Hz). Per heater entry
    the sample is either stored in the cooldown history (target <= 0), stored
    in the heating history (target active and more than the start threshold
    away), or ignored. Afterwards the ETA broadcast and the persistence
    backoff state are updated.

    Samples whose ``actual`` or ``target`` is NaN or infinite are ignored:
    ``float()`` accepts such values, and they would otherwise pass every
    threshold comparison and end up in the heating history.

    Args:
        data (dict): Temperature data from OctoPrint
            {
                "bed": {"actual": float, "target": float},
                "tool0": {"actual": float, "target": float},
                ...
            }
    """
    if not self._settings.get_boolean(["enabled"]):
        return

    # Ensure we are tracking the right profile's history.
    self._switch_active_profile_if_needed()

    # If enabled, suppress ETA completely while OctoPrint considers a print job active.
    if self._suppress_while_printing_enabled() and self._is_print_job_active():
        # Clear once when suppression starts to avoid stale countdowns.
        # Check-and-set under the lock so the one-shot clear cannot be
        # triggered twice by concurrent callbacks/events.
        with self._suppress_lock:
            just_started = not self._suppressing_due_to_print
            self._suppressing_due_to_print = True
        if just_started:
            self._clear_all_heaters_frontend()
        return

    # Suppression no longer applies (e.g. warm-up resumed or print ended).
    with self._suppress_lock:
        self._suppressing_due_to_print = False

    current_time = time.time()

    # Keep cached settings reasonably fresh even if settings change outside
    # of the normal save flow (e.g. tests or edge-case integrations).
    if (current_time - self._last_runtime_cache_refresh_time) >= 5.0:
        self._last_runtime_cache_refresh_time = current_time
        self._refresh_runtime_caches()

    # Periodic settings snapshot for debugging.
    self._debug_log_settings_snapshot(current_time)

    threshold = float(getattr(self, "_threshold_start_c", 5.0))
    update_interval = float(getattr(self, "_update_interval_s", 1.0))
    heating_enabled = self._heating_enabled()
    cooldown_enabled = self._cooldown_enabled()

    # Exceptions tolerated around best-effort conversions and logger probing.
    tolerated_errors = (
        AttributeError,
        KeyError,
        OSError,
        RuntimeError,
        TypeError,
        ValueError,
    )

    # Avoid eager formatting in the hot path: only build the heater list
    # when the logger would emit it. 10 is the numeric DEBUG level.
    try:
        is_logger_debug = bool(getattr(self._logger, "isEnabledFor")(10))  # type: ignore[attr-defined]
    except tolerated_errors:
        is_logger_debug = False
    if is_logger_debug:
        heaters_in_data = [k for k, v in data.items() if isinstance(v, Mapping)]
        if heaters_in_data:
            self._logger.debug(
                "Received temperature data for heaters: %s", heaters_in_data
            )

    epsilon_hold = 0.2

    # Track whether we are in an active heating phase (at least one heater
    # is above the threshold window and not holding). This drives persistence
    # backoff scheduling.
    phase_active_now = False
    target_changed_in_active_phase = False
    recorded_count = 0
    recorded_cooldown_count = 0
    heaters_seen = 0

    with self._lock:
        # Protect shared history state (OctoPrint may call callbacks from worker threads).
        for heater, temps in data.items():
            # OctoPrint wraps heater entries in frozendict when
            # devel.useFrozenDictForPrinterState is enabled, which is not a
            # dict subclass; accept any Mapping so ETA still works there.
            if not isinstance(temps, Mapping):
                continue

            heater_key = str(heater)
            heaters_seen += 1

            target_raw = temps.get("target", 0)
            actual_raw = temps.get("actual")
            if actual_raw is None:
                continue
            try:
                actual = float(actual_raw)
            except tolerated_errors:
                continue
            # float() happily parses NaN/inf; such a reading carries no
            # usable temperature, so treat it like a missing one.
            if not math.isfinite(actual):
                continue

            try:
                target = float(target_raw or 0)
            except tolerated_errors:
                # Some firmwares/virtual printer formats may provide a non-numeric
                # target (e.g. "off"). Treat that as OFF for cooldown tracking.
                target = 0.0
                self._debug_log_throttled(
                    current_time,
                    30.0,
                    "Non-numeric target treated as OFF heater=%s target_raw=%r",
                    heater_key,
                    target_raw,
                )
            # A NaN/inf target is neither "off" nor a usable setpoint; skip
            # the sample and leave the last known target untouched.
            if not math.isfinite(target):
                continue

            prev_target = self._last_target_by_heater.get(heater_key)
            self._last_target_by_heater[heater_key] = float(target)

            if target <= 0:
                # Cooldown tracking (target==0).
                if cooldown_enabled:
                    if heater_key not in self._cooldown_history:
                        self._cooldown_history[heater_key] = deque(
                            maxlen=self._history_maxlen
                        )
                    # If we just transitioned from heating to OFF, start a fresh
                    # cooldown history so our linear cooldown fit doesn't include
                    # old OFF samples from before the heat-up phase.
                    if prev_target is not None and prev_target > 0:
                        self._cooldown_history[heater_key].clear()
                        self._debug_log_throttled(
                            current_time,
                            10.0,
                            "Cooldown start detected, cleared history heater=%s prev_target=%.1f actual=%.1f",
                            heater_key,
                            float(prev_target),
                            float(actual),
                        )
                    self._cooldown_history[heater_key].append(
                        (current_time, actual)
                    )
                    recorded_cooldown_count += 1

                    # Track a baseline ambient temp while OFF.
                    # We only learn baseline values in a sane range to
                    # avoid "ambient" being polluted by still-hot cooldown.
                    if actual < 120.0:
                        prev = self._cooldown_ambient_baseline.get(heater_key)
                        if prev is None or actual < prev:
                            self._cooldown_ambient_baseline[heater_key] = actual
                continue

            if not heating_enabled:
                continue

            remaining = target - actual
            # Avoid recording while holding near target to prevent constant churn.
            if remaining <= epsilon_hold:
                continue
            if remaining < threshold:
                continue

            # This heater is actively heating within the ETA window.
            phase_active_now = True
            if (
                prev_target is not None
                and math.isfinite(prev_target)
                and math.isfinite(target)
                and prev_target > 0.0
                and target > 0.0
                and abs(prev_target - target) >= 0.5
            ):
                target_changed_in_active_phase = True

            if heater_key not in self._temp_history:
                self._temp_history[heater_key] = deque(maxlen=self._history_maxlen)
            self._temp_history[heater_key].append((current_time, actual, target))
            self._history_dirty = True
            self._history_dirty_epoch += 1
            recorded_count += 1

    # Avoid flooding logs while idle/holding temperature: log far less often
    # when we didn't record any samples.
    debug_interval = 5.0 if recorded_count > 0 else 60.0
    self._debug_log_throttled(
        current_time,
        debug_interval,
        "Temp callback profile=%s heaters=%d recorded=%d cooldown_recorded=%d threshold=%.2f",
        str(self._active_profile_id),
        int(heaters_seen),
        recorded_count,
        recorded_cooldown_count,
        float(threshold),
    )

    # Atomic check-and-set of the broadcast/persist cadence gate so two
    # concurrent callbacks cannot both pass and double-broadcast.
    with self._persist_state_lock:
        due = (current_time - self._last_update_time) >= update_interval
        if due:
            self._last_update_time = current_time
        phase_active_prev = self._persist_phase_active

    if due:
        self._calculate_and_broadcast_eta(data)

        # Update persistence phase state based on transitions. Decide using
        # the phase snapshot taken above so the branch is consistent.
        if target_changed_in_active_phase:
            self._enter_persist_phase(current_time, "target_change")
        elif phase_active_prev and (not phase_active_now):
            # Active -> inactive transition: schedule a shorter persist.
            self._reset_persist_backoff(current_time, "phase_end")
        elif (not phase_active_prev) and phase_active_now:
            # Inactive -> active transition: start backoff sequence.
            self._enter_persist_phase(current_time, "phase_start")

        self._maybe_persist_history(current_time)
|
on_printer_send_current_data
on_printer_send_current_data(_data)
Stub: Called when current printer data is sent (required by callback interface).
Source code in octoprint_temp_eta/__init__.py
def on_printer_send_current_data(self, _data):
    """No-op hook for current printer data; exists to satisfy the callback interface."""
    return
|
on_printer_add_log
on_printer_add_log(_data)
Stub: Called when log entry is added (required by callback interface).
Source code in octoprint_temp_eta/__init__.py
def on_printer_add_log(self, _data):
    """No-op hook for added log entries; exists to satisfy the callback interface."""
    return
|
on_printer_add_message
on_printer_add_message(_data)
Stub: Called when printer message is added (required by callback interface).
Source code in octoprint_temp_eta/__init__.py
def on_printer_add_message(self, _data):
    """No-op hook for added printer messages; exists to satisfy the callback interface."""
    return
|
on_event
on_event(event, _payload)
Handle OctoPrint events to keep UI state consistent.
Clears all ETAs immediately on disconnect or printer errors so the navbar/tab
do not keep showing stale countdowns.
Parameters:
| Name |
Type |
Description |
Default |
event
|
str
|
OctoPrint event name.
|
required
|
_payload
|
dict
|
Event payload (not used by this handler).
|
required
|
Source code in octoprint_temp_eta/__init__.py
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
def on_event(self, event, _payload):
    """React to OctoPrint events so the UI never shows stale countdowns.

    On disconnect, printer error or shutdown the current history is
    persisted, the persist backoff is reset, all in-memory histories are
    emptied and clear messages are sent for the known heaters. On shutdown
    the MQTT client is disconnected as well. Print job lifecycle events only
    reset the suppression flag; whether to suppress is decided in the
    temperature callback.

    Args:
        event (str): OctoPrint event name.
        _payload (dict): Event payload; not used by this handler.
    """
    connection_lost_events = ("Disconnected", "Error", "Shutdown")
    job_lifecycle_events = (
        "PrintStarted",
        "PrintResumed",
        "PrintDone",
        "PrintFailed",
        "PrintCancelled",
    )

    if event in connection_lost_events:
        # Save what we have before it is wiped.
        self._persist_current_profile_history()
        self._reset_persist_backoff(time.time(), "disconnect_or_error")

        with self._lock:
            heaters = list(self._temp_history)
            for samples in self._temp_history.values():
                samples.clear()
            for samples in self._cooldown_history.values():
                samples.clear()
        self._send_clear_messages(heaters)

        # Take a local reference so the None check and the call below see
        # the same object even if the attribute is reassigned concurrently.
        mqtt_client = self._mqtt_client
        if mqtt_client is not None and event == "Shutdown":
            mqtt_client.disconnect()

    if event in job_lifecycle_events:
        with self._suppress_lock:
            self._suppressing_due_to_print = False
|
get_settings_defaults
Return the default settings for the plugin.
Returns:
| Name | Type |
Description |
dict |
|
Dictionary containing default plugin settings.
|
Source code in octoprint_temp_eta/__init__.py
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
def get_settings_defaults(self):
    """Build the plugin's default settings.

    The defaults are assembled from thematic groups and merged in a fixed
    order, so the resulting key order matches the order of the groups below.

    Returns:
        dict: Dictionary containing default plugin settings.
    """
    general = {
        "enabled": True,
        "enable_heating_eta": True,
        "suppress_while_printing": False,
        "show_in_sidebar": True,
        "show_in_navbar": True,
        "show_in_tab": True,
        "show_progress_bars": True,
        "show_historical_graph": True,
        "historical_graph_window_seconds": 180,
        "temp_display": "octoprint",
        "threshold_unit": "octoprint",
        "debug_logging": False,
        "threshold_start": 5.0,
        "algorithm": "linear",
        "update_interval": 1.0,
        "history_size": 60,
    }
    # Advanced: protects SD cards during long-running phases.
    persistence = {
        "persist_backoff_reset_s": 30.0,
        "persist_backoff_initial_s": 60.0,
        "persist_backoff_max_s": 300.0,
        "persist_max_json_bytes": 256 * 1024,
    }
    cooldown = {
        "enable_cooldown_eta": True,
        "cooldown_mode": "threshold",
        "cooldown_target_tool0": 50.0,
        "cooldown_target_bed": 40.0,
        "cooldown_target_chamber": 30.0,
        "cooldown_ambient_temp": None,
        "cooldown_hysteresis_c": 1.0,
        "cooldown_fit_window_seconds": 120,
    }
    status_colors = {
        "color_mode": "bands",
        "color_heating": "#5cb85c",
        "color_cooling": "#337ab7",
        "color_idle": "#777777",
    }
    sound_alerts = {
        "sound_enabled": False,
        "sound_target_reached": False,
        "sound_cooldown_finished": False,
        "sound_volume": 0.5,
        "sound_min_interval_s": 10.0,
    }
    # Browser notifications (toast).
    notifications = {
        "notification_enabled": False,
        "notification_target_reached": False,
        "notification_cooldown_finished": False,
        "notification_timeout_s": 6.0,
        "notification_min_interval_s": 10.0,
    }
    mqtt = {
        "mqtt_enabled": False,
        "mqtt_broker_host": "",
        "mqtt_broker_port": 1883,
        "mqtt_username": "",
        "mqtt_password": "",
        "mqtt_use_tls": False,
        "mqtt_tls_insecure": False,
        "mqtt_base_topic": "octoprint/temp_eta",
        "mqtt_use_appearance_name": True,
        "mqtt_custom_identifier": "",
        "mqtt_qos": 0,
        "mqtt_retain": False,
        "mqtt_publish_interval": 1.0,
    }
    return {
        **general,
        **persistence,
        **cooldown,
        **status_colors,
        **sound_alerts,
        **notifications,
        **mqtt,
    }
|
is_template_autoescaped
is_template_autoescaped()
Enable autoescaping for all plugin templates.
Opt-in to OctoPrint's template autoescaping (OctoPrint 1.11+) to reduce
XSS risk from unescaped injected variables.
Returns:
| Name | Type |
Description |
bool |
bool
|
True to enable autoescaping.
|
Source code in octoprint_temp_eta/__init__.py
2034
2035
2036
2037
2038
2039
2040
2041
2042
def is_template_autoescaped(self) -> bool:  # pyright: ignore
    """Opt in to autoescaping for all plugin templates.

    OctoPrint 1.11+ offers template autoescaping; enabling it reduces the
    XSS risk from injected variables that are rendered unescaped.

    Returns:
        bool: Always True, which enables autoescaping.
    """
    autoescape = True
    return autoescape
|
get_template_configs
Configure which templates to use and how to bind them.
Returns:
| Name | Type |
Description |
list |
|
List of template configuration dictionaries.
|
Source code in octoprint_temp_eta/__init__.py
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
def get_template_configs(self):
    """Describe the plugin's templates and how each one is bound.

    Returns:
        list: Template configuration dictionaries in the order navbar,
            sidebar, settings, tab.
    """
    navbar = {"type": "navbar", "custom_bindings": True}
    sidebar = {
        "type": "sidebar",
        "custom_bindings": False,
        "name": gettext("Temperature ETA"),
        "icon": "fa fa-clock",
    }
    # NOTE(review): the original comment here said the settings UI uses
    # OctoPrint's default settingsViewModel binding, but custom_bindings is
    # True, which OctoPrint documents as "the plugin supplies its own
    # binding". Confirm which of the two is intended.
    settings = {"type": "settings", "custom_bindings": True}
    tab = {"type": "tab", "custom_bindings": False}
    return [navbar, sidebar, settings, tab]
|
on_settings_save
Persist settings and clear UI/state when disabling the plugin.
OctoPrint applies settings changes only on save. When the plugin is disabled,
we actively clear any previously shown countdowns in navbar/tab so the UI
does not keep showing stale values.
Parameters:
| Name |
Type |
Description |
Default |
data
|
dict
|
Settings data posted from the UI.
|
required
|
Source code in octoprint_temp_eta/__init__.py
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
def on_settings_save(self, data: dict[str, Any]) -> dict[str, Any]:
    """Save settings, then bring runtime state in line with the new values.

    OctoPrint applies settings changes only on save. After saving, the debug
    flag and cached settings are refreshed, a changed history size is
    applied, the MQTT client is reconfigured, and, if the plugin was just
    disabled, all countdowns shown in the frontend are cleared so no stale
    values remain.

    Args:
        data (dict): Settings data posted from the UI.

    Returns:
        dict: The result of the base ``SettingsPlugin.on_settings_save`` call
            when it is a dict, otherwise an empty dict. Also an empty dict
            when no settings object is available.
    """
    if not getattr(self, "_settings", None):
        return {}

    self._sanitize_settings_payload(data)

    # Snapshot the values we need to compare against after the save.
    enabled_before = bool(self._settings.get_boolean(["enabled"]))
    debug_before = bool(getattr(self, "_debug_logging_enabled", False))
    maxlen_before = self._read_history_maxlen_setting()

    saved = octoprint.plugin.SettingsPlugin.on_settings_save(self, data)

    enabled_after = bool(self._settings.get_boolean(["enabled"]))
    self._refresh_debug_logging_flag()
    self._refresh_runtime_caches()

    if bool(self._debug_logging_enabled) != debug_before:
        state_label = "enabled" if self._debug_logging_enabled else "disabled"
        self._logger.info("Debug logging %s", state_label)

    maxlen_after = self._read_history_maxlen_setting()
    if maxlen_after != maxlen_before:
        self._set_history_maxlen(maxlen_after)

    just_disabled = enabled_before and not enabled_after
    if just_disabled:
        self._clear_all_heaters_frontend()

    # Reconfigure MQTT client with new settings.
    self._configure_mqtt_client()

    if isinstance(saved, dict):
        return saved
    return {}
|
get_assets
Return static assets (JS, CSS, LESS) to be included.
Returns:
| Name | Type |
Description |
dict |
|
Dictionary with asset types and their file paths.
|
Source code in octoprint_temp_eta/__init__.py
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
def get_assets(self):
    """List the static assets OctoPrint should include for this plugin.

    Returns:
        dict: Asset type mapped to a list of file paths.
    """
    scripts = ["js/temp_eta.js"]
    # Pre-compiled CSS is shipped instead of LESS. OctoPrint compiles LESS
    # only when a server-side compiler (lesscpy / Node lessc) is present,
    # which a pip-installed plugin cannot rely on; without one the LESS is
    # silently dropped from the bundle and none of the plugin's styling
    # (graph colours included) reaches the page. The .less file remains in
    # the source tree as the editable origin; regenerate temp_eta.css with
    # `npx less less/temp_eta.less css/temp_eta.css` after editing it.
    stylesheets = ["css/temp_eta.css"]
    return {"js": scripts, "css": stylesheets}
|
is_api_protected
Whether the Simple API requires an authenticated user.
OctoPrint's default for this will switch from False to True in the
future. We explicitly opt in to avoid relying on defaults.
Source code in octoprint_temp_eta/__init__.py
2275
2276
2277
2278
2279
2280
def is_api_protected(self) -> bool:  # type: ignore[override]
    """Require an authenticated user for the Simple API.

    OctoPrint's default for this will switch from False to True in the
    future; opting in explicitly avoids depending on that default.

    Returns:
        bool: Always True.
    """
    requires_login = True
    return requires_login
|
is_api_adminonly
Whether the Simple API is restricted to admin users.
Source code in octoprint_temp_eta/__init__.py
def is_api_adminonly(self) -> bool:  # type: ignore[override]
    """Restrict the Simple API to admin users.

    Returns:
        bool: Always True.
    """
    admin_only = True
    return admin_only
|
get_api_commands
Return supported Simple API commands for the plugin.
Source code in octoprint_temp_eta/__init__.py
2287
2288
2289
2290
2291
def get_api_commands(self) -> dict[str, list]:  # type: ignore[override]
    """List the Simple API commands this plugin accepts.

    Returns:
        dict: Command name mapped to its list of parameters; both commands
            take none.
    """
    command_names = ("reset_profile_history", "reset_settings_defaults")
    # A fresh list per command, so the entries never share one object.
    return {name: [] for name in command_names}
|
on_api_get
Return plugin status for the Simple API GET endpoint.
Currently exposes whether the MQTT integration is enabled and
connected to a broker so the settings UI can show a live status.
Source code in octoprint_temp_eta/__init__.py
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
def on_api_get(self, _request: Any):  # type: ignore[override]
    """Report MQTT status for the Simple API GET endpoint.

    The response tells the settings UI whether the MQTT wrapper is
    available, whether MQTT is enabled in the settings and whether the
    client is currently connected.
    """
    client = self._mqtt_client
    enabled = bool(self._settings.get_boolean(["mqtt_enabled"]))

    # "connected" is reported only while MQTT is enabled: right after the
    # feature is switched off a client may still be mid-disconnect, and the
    # UI should show the configured state rather than a lingering socket.
    if enabled and client is not None:
        connected = bool(client.is_connected())
    else:
        connected = False

    status = {
        "mqtt_available": MQTTClientWrapper is not None,
        "mqtt_enabled": enabled,
        "mqtt_connected": connected,
    }
    return jsonify(status)
|
on_api_command
on_api_command(command, _data)
Handle Simple API commands.
Source code in octoprint_temp_eta/__init__.py
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
def on_api_command(self, command: str, _data: dict[str, Any]):  # type: ignore[override]
    """Handle Simple API commands.

    Supported commands:
        reset_profile_history: delete the persisted history of all profiles
            and report how many files were deleted.
        reset_settings_defaults: restore the user-editable settings to their
            defaults and notify connected clients so the settings UI can
            refresh.

    Args:
        command (str): Name of the command to execute.
        _data (dict): Command payload; not used by any command.

    Returns:
        The ``jsonify`` response for the command. Unknown commands yield
        ``{"success": False, "error": "unknown_command"}``.
    """
    logger = getattr(self, "_logger", None)

    if command == "reset_profile_history":
        deleted_count = self._reset_all_profile_histories()
        profile_id = self._get_current_profile_id()
        if logger is not None:
            logger.info(
                "Reset persisted history for all profiles (trigger_profile=%s deleted_files=%d)",
                str(profile_id),
                int(deleted_count),
            )
        return jsonify(
            {
                "success": True,
                "profile_id": profile_id,
                "deleted_files": deleted_count,
            }
        )

    if command == "reset_settings_defaults":
        self._reset_user_settings_to_defaults()
        if logger is not None:
            logger.info("Restored plugin settings defaults (user-editable keys)")

        # Notify all connected clients so the settings UI can refresh via requestData().
        if getattr(self, "_plugin_manager", None):
            try:
                self._plugin_manager.send_plugin_message(
                    self._identifier, {"type": "settings_reset"}
                )
            except (
                AttributeError,
                KeyError,
                OSError,
                RuntimeError,
                TypeError,
                ValueError,
            ) as exc:
                # The notification is best-effort: the reset itself already
                # succeeded, so record the failure instead of raising.
                if logger is not None:
                    logger.debug(
                        "Could not notify clients about settings reset: %s", exc
                    )
        return jsonify({"success": True, "message": gettext("Defaults restored.")})

    return jsonify({"success": False, "error": "unknown_command"})
|
get_update_information
Provide update information for the Software Update plugin.
Returns:
| Name | Type |
Description |
dict |
|
Update configuration for the plugin.
|
Source code in octoprint_temp_eta/__init__.py
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
def get_update_information(self):
    """Describe how the Software Update plugin can check for and install updates.

    Returns:
        dict: Update configuration, keyed by ``temp_eta``.
    """
    installed_version = self._plugin_version
    update_config = {
        "displayName": "Temperature ETA Plugin",
        "displayVersion": installed_version,
        # Version check against GitHub releases of the repository.
        "type": "github_release",
        "user": "Ajimaru",
        "repo": "OctoPrint-TempETA",
        "current": installed_version,
        # Updates are installed with pip from the release archive.
        "pip": "https://github.com/Ajimaru/OctoPrint-TempETA/archive/{target_version}.zip",
    }
    return {"temp_eta": update_config}
|
Calculator Module
Temperature ETA calculation algorithms.
This module provides pure calculation logic for estimating time remaining
until a printer heater reaches its target temperature or cools down.
All functions are stateless and independent of OctoPrint plugin mechanics,
making them easy to test and maintain.
calculate_linear_eta
calculate_linear_eta(history, target, window_seconds=10.0)
Calculate ETA assuming constant heating rate.
Uses the slope between the oldest and the newest sample inside the recent
time window to estimate the heating rate and predict the time to target.
Parameters:
| Name |
Type |
Description |
Default |
history
|
deque
|
Deque of (timestamp, actual_temp, target_temp) tuples
|
required
|
target
|
float
|
Target temperature in degrees
|
required
|
window_seconds
|
float
|
Time window for rate calculation (default: 10s)
|
10.0
|
Returns:
| Type |
Description |
Optional[float]
|
Estimated seconds to target, or None if insufficient data
|
Source code in octoprint_temp_eta/calculator.py
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def calculate_linear_eta(
    history: deque, target: float, window_seconds: float = 10.0
) -> Optional[float]:
    """
    Calculate ETA assuming constant heating rate.

    The rate is the slope between the oldest and the newest sample inside
    the recent time window (a two-point estimate, not a regression over all
    samples). The remaining temperature difference divided by that rate
    gives the ETA.

    Args:
        history: Deque of (timestamp, actual_temp, target_temp) tuples
        target: Target temperature in degrees
        window_seconds: Time window for rate calculation (default: 10s)

    Returns:
        Estimated seconds to target, or None if the inputs are invalid, there
        are fewer than two usable samples, the temperature is not rising,
        the target is already reached, or the result is not a positive number
    """
    if not _validate_scalar(target) or not _validate_window(window_seconds):
        return None
    if not history or len(history) < 2:
        return None

    last_ts = _find_last_ts(history)
    if last_ts is None:
        return None

    recent = _filter_recent(history, last_ts - window_seconds)
    if len(recent) < 2:
        return None

    first_ts, first_temp = recent[0][0], recent[0][1]
    latest_ts, latest_temp = recent[-1][0], recent[-1][1]
    time_diff = latest_ts - first_ts
    temp_diff = latest_temp - first_temp
    if time_diff <= 0 or temp_diff <= 0:
        return None

    remaining = target - latest_temp
    if remaining <= 0:
        return None

    eta = remaining / (temp_diff / time_diff)
    # NaN samples slip through the "<= 0" guards above because every
    # comparison with NaN is False. The former max(0.0, eta) turned such a
    # result into 0.0, i.e. "target reached". Report "no estimate" instead.
    if not eta > 0.0:
        return None
    return eta
|
calculate_exponential_eta
calculate_exponential_eta(history, target, window_seconds=30.0)
Calculate ETA accounting for thermal asymptotic behavior.
Uses exponential model: T(t) = T_final - (T_final - T_0) * e^(-t/tau)
Falls back to linear estimation when insufficient data or poor fit.
Parameters:
| Name |
Type |
Description |
Default |
history
|
deque
|
Deque of (timestamp, actual_temp, target_temp) tuples
|
required
|
target
|
float
|
Target temperature in degrees
|
required
|
window_seconds
|
float
|
Time window for exponential fit (default: 30s)
|
30.0
|
Returns:
| Type |
Description |
Optional[float]
|
Estimated seconds to target, or None if insufficient data
|
Source code in octoprint_temp_eta/calculator.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def calculate_exponential_eta(
    history: deque, target: float, window_seconds: float = 30.0
) -> Optional[float]:
    """
    Calculate ETA accounting for thermal asymptotic behavior.

    Model: T(t) = T_final - (T_final - T_0) * e^(-t/tau)

    The linear estimate is used instead when the window holds fewer than six
    distinct-timestamp samples, when the exponential fit fails or yields no
    result, or when the exponential ETA exceeds five times the linear one.

    Args:
        history: Deque of (timestamp, actual_temp, target_temp) tuples
        target: Target temperature in degrees
        window_seconds: Time window for exponential fit (default: 30s)

    Returns:
        Estimated seconds to target, or None if insufficient data
    """
    inputs_ok = _validate_scalar(target) and _validate_window(window_seconds)
    if not inputs_ok:
        return None
    if not history or len(history) < 3:
        return None

    last_ts = _find_last_ts(history)
    if last_ts is None:
        return None

    window_start = last_ts - window_seconds
    recent = _dedupe_by_ts(_filter_recent(history, window_start))
    if len(recent) < 6:
        return calculate_linear_eta(history, target)

    newest_temp = recent[-1][1]
    oldest_temp = recent[0][1]

    remaining_now = target - newest_temp
    if remaining_now <= 0:
        return None

    # The window has to show meaningful heating; without it no ETA exists.
    if (newest_temp - oldest_temp) <= 0.2:
        return None

    epsilon_c = 0.5
    if remaining_now <= epsilon_c:
        return 0.0

    try:
        eta = _exponential_fit(recent, target, epsilon_c)
    except (ValueError, ArithmeticError) as exc:
        _LOG.debug("Exponential ETA math error: %s", exc)
        eta = None

    # Every remaining path needs the linear estimate, either as the fallback
    # or as the sanity bound for the exponential result.
    linear_eta = calculate_linear_eta(history, target)
    if eta is None:
        return linear_eta
    if linear_eta is not None and eta > linear_eta * 5:
        return linear_eta
    return eta
|
calculate_cooldown_linear_eta
calculate_cooldown_linear_eta(cooldown_history, goal_c, window_seconds=60.0)
Linear cooldown ETA from recent slope.
Uses the slope between the oldest and newest samples in the recent window
to estimate the rate of temperature decrease and predict time to goal.
Parameters:
| Name |
Type |
Description |
Default |
cooldown_history
|
deque
|
Deque of (timestamp, temp) tuples
|
required
|
goal_c
|
float
|
Target cooldown temperature in degrees
|
required
|
window_seconds
|
float
|
Time window for fit (default: 60s)
|
60.0
|
Returns:
| Type |
Description |
Optional[float]
|
Estimated seconds to goal, or None if insufficient data
|
Source code in octoprint_temp_eta/calculator.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def calculate_cooldown_linear_eta(
    cooldown_history: deque, goal_c: float, window_seconds: float = 60.0
) -> Optional[float]:
    """Linear cooldown ETA from the recent slope.

    The cooling rate is taken from the first and last sample inside the
    window (a two-point slope), then extrapolated down to ``goal_c``.

    Args:
        cooldown_history: Deque of (timestamp, temp) tuples
        goal_c: Target cooldown temperature in degrees
        window_seconds: Time window for the slope (default: 60s)

    Returns:
        Estimated seconds to goal, capped at 24 hours, or ``None`` when the
        input is invalid, fewer than two samples fall in the window, the
        temperature is not dropping faster than 0.001 degrees per second,
        or the goal has already been reached.
    """
    if not (_validate_scalar(goal_c) and _validate_window(window_seconds)):
        return None
    if not cooldown_history:
        return None

    newest_ts = _find_last_ts(cooldown_history)
    if newest_ts is None:
        return None

    window = _filter_recent(cooldown_history, newest_ts - window_seconds)
    if len(window) < 2:
        return None

    (first_ts, first_temp), (last_ts, last_temp) = window[0], window[-1]
    elapsed = last_ts - first_ts
    if elapsed <= 0:
        return None

    # Positive while the heater is cooling; anything at or below the noise
    # threshold is treated as "not cooling".
    cooling_rate = (first_temp - last_temp) / elapsed
    if cooling_rate <= 1e-3:
        return None

    excess = last_temp - goal_c
    if excess <= 0:
        return None

    eta = excess / cooling_rate
    if eta < 0 or not math.isfinite(eta):
        return None

    one_day_seconds = 24 * 3600
    return float(min(eta, one_day_seconds))
|
calculate_cooldown_exponential_eta
calculate_cooldown_exponential_eta(cooldown_history, ambient_c, goal_c, window_seconds=60.0)
Exponential cooldown ETA (Newton's law of cooling).
Models cooldown as: T(t) = T_ambient + (T_0 - T_ambient) * e^(-t/tau)
Source code in octoprint_temp_eta/calculator.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
def calculate_cooldown_exponential_eta(
    cooldown_history: deque,
    ambient_c: float,
    goal_c: float,
    window_seconds: float = 60.0,
) -> Optional[float]:
    """Exponential cooldown ETA (Newton's law of cooling).

    Model: T(t) = T_ambient + (T_0 - T_ambient) * e^(-t/tau)

    Args:
        cooldown_history: Deque of cooldown samples; index 1 of each sample
            is read as the temperature.
        ambient_c: Ambient temperature in degrees
        goal_c: Cooldown goal in degrees; must be above ``ambient_c``
        window_seconds: Time window for the fit (default: 60s)

    Returns:
        The value produced by ``_cooldown_exponential_fit`` for the recent
        window; the linear cooldown ETA when the window holds fewer than
        six samples; ``None`` for invalid input, a goal at or below
        ambient, fewer than four samples in total, a latest temperature
        already at or below the goal, or a math error during the fit.
    """
    if not _validate_scalar(ambient_c) or not _validate_scalar(goal_c):
        return None
    if not _validate_window(window_seconds):
        return None
    if goal_c <= ambient_c:
        return None
    if not cooldown_history or len(cooldown_history) <= 3:
        return None

    newest_ts = _find_last_ts(cooldown_history)
    if newest_ts is None:
        return None

    window = _filter_recent(cooldown_history, newest_ts - window_seconds)
    if len(window) < 6:
        # Not enough points for the exponential fit; use the linear slope.
        return calculate_cooldown_linear_eta(cooldown_history, goal_c, window_seconds)

    latest_temp = window[-1][1]
    if latest_temp <= goal_c:
        return None

    try:
        eta = _cooldown_exponential_fit(window, ambient_c, goal_c, epsilon=0.5)
    except (ValueError, ArithmeticError) as exc:
        _LOG.debug("Exponential ETA math error: %s", exc)
        eta = None
    return eta
|
MQTT Client Module
Thread-safe MQTT client wrapper for the Temperature ETA plugin.
Manages connection lifecycle, automatic reconnection, and message publishing.
All MQTT operations are non-blocking to avoid impacting the temperature callback.
Initialize MQTT client wrapper.
Parameters:
| Name |
Type |
Description |
Default |
logger
|
Any
|
Logger instance for debug/info messages
|
required
|
identifier
|
str
|
Plugin identifier for topic prefixes
|
required
|
Source code in octoprint_temp_eta/mqtt_client.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def __init__(self, logger: Any, identifier: str):
    """Set up the wrapper with default settings and no active client.

    Args:
        logger: Logger instance for debug/info messages
        identifier: Plugin identifier for topic prefixes
    """
    self._logger = logger
    self._identifier = identifier

    # Guards the attributes below.
    self._lock = threading.Lock()

    # Client handle and connection flags; nothing is connected yet.
    self._client: Optional[Any] = None
    self._enabled = self._connected = self._connecting = False
    # True once a paho-mqtt 2.x (callback API v2) client was created.
    # Set in _create_new_client; informational/diagnostic only.
    self._callback_api_v2 = False

    # Broker connection defaults (replaced by configure()).
    self._broker_host = ""
    self._broker_port = 1883
    self._username = ""
    self._password = ""  # nosec B105 - empty default, overwritten from settings
    self._use_tls = self._tls_insecure = False

    # Publishing defaults.
    self._base_topic = "octoprint/temp_eta"
    self._qos = 0
    self._retain = False
    self._publish_interval = 1.0

    # Publish throttling and per-heater state used for transition events.
    self._last_published_time = 0.0
    self._last_heater_state: dict[str, Optional[str]] = {}

    # Connection retry bookkeeping.
    self._last_connect_attempt = 0.0
    self._connect_retry_interval = 30.0

    # Avoid log spam when paho-mqtt is not installed. A separate lock is
    # used so the warning path never re-acquires self._lock.
    self._mqtt_unavailable_lock = threading.Lock()
    self._mqtt_unavailable_warned = False
|
Update MQTT configuration from plugin settings.
Parameters:
| Name |
Type |
Description |
Default |
settings
|
dict[str, Any]
|
Dictionary with MQTT configuration keys
|
required
|
Source code in octoprint_temp_eta/mqtt_client.py
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def configure(self, settings: dict[str, Any]) -> None:
    """Update MQTT configuration from plugin settings.

    All values are parsed before any state is touched, so a malformed
    entry can no longer leave the wrapper half-configured. Text settings
    that are ``None`` are treated as unset instead of becoming the literal
    string ``"None"``; numeric settings that cannot be parsed fall back to
    their defaults and a warning is logged.

    Args:
        settings: Dictionary with MQTT configuration keys
    """

    def _text(key: str, default: str = "") -> str:
        """Return the stripped string for ``key``; ``None`` means unset."""
        value = settings.get(key)
        return default if value is None else str(value).strip()

    def _number(key: str, default, cast):
        """Parse a numeric setting, falling back to ``default`` if unusable."""
        raw = settings.get(key)
        if raw is None:
            return default
        try:
            return cast(raw)
        except (TypeError, ValueError, OverflowError):
            self._logger.warning(
                "MQTT: invalid value %r for %s, using default %r", raw, key, default
            )
            return default

    enabled = bool(settings.get("mqtt_enabled", False))
    broker_host = _text("mqtt_broker_host")
    broker_port = _number("mqtt_broker_port", 1883, int)
    username = _text("mqtt_username")
    password = _text("mqtt_password")
    use_tls = bool(settings.get("mqtt_use_tls", False))
    tls_insecure = bool(settings.get("mqtt_tls_insecure", False))
    base_topic = _text("mqtt_base_topic", "octoprint/temp_eta")
    use_appearance_name = bool(settings.get("mqtt_use_appearance_name", True))
    appearance_name = str(settings.get("mqtt_appearance_name") or "").strip()
    custom_identifier = _text("mqtt_custom_identifier")
    qos = _number("mqtt_qos", 0, int)
    retain = bool(settings.get("mqtt_retain", False))
    publish_interval = _number("mqtt_publish_interval", 1.0, float)

    with self._lock:
        # Build final topic with optional identifier suffix. Done before
        # any assignment so a failure here leaves the old config intact.
        final_topic = self._build_final_topic(
            base_topic, use_appearance_name, appearance_name, custom_identifier
        )

        old_enabled = self._enabled
        self._enabled = enabled
        self._broker_host = broker_host
        self._broker_port = broker_port
        self._username = username
        self._password = password
        self._use_tls = use_tls
        self._tls_insecure = tls_insecure
        self._base_topic = final_topic
        self._qos = qos
        self._retain = retain
        self._publish_interval = publish_interval

        # Connect when MQTT was just enabled or is enabled but not
        # connected; disconnect when it was just disabled.
        # NOTE(review): changing broker settings while already connected
        # does not trigger a reconnect here - confirm that is intended.
        if self._enabled and (not old_enabled or not self._connected):
            self._schedule_connect()
        elif not self._enabled and old_enabled:
            self._disconnect_internal()
|
disconnect
Disconnect MQTT client gracefully.
Source code in octoprint_temp_eta/mqtt_client.py
def disconnect(self) -> None:
    """Disconnect the MQTT client.

    Takes the client lock and delegates the teardown to
    ``_disconnect_internal``.
    """
    with self._lock:
        self._disconnect_internal()
|
publish_eta_update
publish_eta_update(heater, eta, eta_kind, target, actual, cooldown_target=None)
Publish ETA update for a heater.
Parameters:
| Name |
Type |
Description |
Default |
heater
|
str
|
Heater name (bed, tool0, chamber)
|
required
|
eta
|
Optional[float]
|
ETA in seconds, or None
|
required
|
eta_kind
|
Optional[str]
|
"heating", "cooling", or None
|
required
|
target
|
Optional[float]
|
Target temperature
|
required
|
actual
|
Optional[float]
|
Actual temperature
|
required
|
cooldown_target
|
Optional[float]
|
Cooldown target temperature (if applicable)
|
None
|
Source code in octoprint_temp_eta/mqtt_client.py
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
def publish_eta_update(
    self,
    heater: str,
    eta: Optional[float],
    eta_kind: Optional[str],
    target: Optional[float],
    actual: Optional[float],
    cooldown_target: Optional[float] = None,
) -> None:
    """Publish ETA update for a heater.

    Publishes the ETA payload to ``<base_topic>/<heater>/eta`` and, when
    the derived heater state differs from the last published one, a
    transition event to ``<base_topic>/<heater>/state_change``. Nothing is
    published while the client is disabled or not connected.

    Publishing is rate-limited per heater by the configured publish
    interval, so an update for one heater does not suppress the others.

    Args:
        heater: Heater name (bed, tool0, chamber)
        eta: ETA in seconds, or None
        eta_kind: "heating", "cooling", or None
        target: Target temperature
        actual: Actual temperature
        cooldown_target: Cooldown target temperature (if applicable)
    """
    with self._lock:
        if not self._enabled or not self._connected:
            return

        now = time.time()

        # Throttle per heater. A single shared timestamp would let the
        # first heater published in an interval block every other heater.
        # The map is created lazily so this method does not depend on any
        # other initialization.
        last_by_heater = getattr(self, "_last_published_by_heater", None)
        if last_by_heater is None:
            last_by_heater = {}
            self._last_published_by_heater = last_by_heater
        last_for_heater = last_by_heater.get(heater)
        if (
            last_for_heater is not None
            and (now - last_for_heater) < self._publish_interval
        ):
            return
        last_by_heater[heater] = now
        # Kept up to date for any other code that reads the shared value.
        self._last_published_time = now

        # Determine state for transition detection. The conditions are
        # flat so a target that is set but not yet reached (for example
        # 0.0 while the heater is off) still lets the cooldown check run.
        current_state = None
        if eta_kind == "heating" and eta is not None:
            current_state = "heating"
        elif eta_kind == "cooling" and eta is not None:
            current_state = "cooling"
        elif (
            target is not None
            and actual is not None
            and abs(target - actual) <= 1.0
        ):
            current_state = "at_target"
        elif (
            cooldown_target is not None
            and actual is not None
            and abs(cooldown_target - actual) <= 1.0
        ):
            current_state = "cooled_down"

        # Detect state transitions
        last_state = self._last_heater_state.get(heater)
        state_changed = last_state != current_state
        self._last_heater_state[heater] = current_state

        # Publish ETA data
        payload = {
            "heater": heater,
            "eta_seconds": eta,
            "eta_kind": eta_kind,
            "target": target,
            "actual": actual,
            "cooldown_target": cooldown_target,
            "timestamp": now,
            "state": current_state,
        }
        topic = f"{self._base_topic}/{heater}/eta"
        self._publish_message(topic, payload)

        # Publish state transition event if state changed
        if state_changed and current_state is not None:
            event_payload = {
                "heater": heater,
                "state": current_state,
                "previous_state": last_state,
                "timestamp": now,
                "actual": actual,
                "target": target,
            }
            event_topic = f"{self._base_topic}/{heater}/state_change"
            self._publish_message(event_topic, event_payload)
            self._logger.info(
                "MQTT: %s state changed from %s to %s",
                heater,
                last_state or "unknown",
                current_state,
            )
|
is_connected
Check if MQTT client is connected.
Returns:
| Name | Type |
Description |
bool |
bool
|
True if connected
|
Source code in octoprint_temp_eta/mqtt_client.py
515
516
517
518
519
520
521
def is_connected(self) -> bool:
    """Report the wrapper's connection flag.

    Returns:
        bool: The current value of the internal connected flag, read while
        holding the client lock.
    """
    with self._lock:
        connected = self._connected
    return connected
|
Usage Examples
Using the Calculator
import time
from collections import deque

from octoprint_temp_eta.calculator import (
    calculate_exponential_eta,
    calculate_linear_eta,
)

# Build a short synthetic history of (timestamp, actual, target) samples.
target = 200.0
history = deque()
for step in range(10):
    history.append((time.time() + step, 25 + step * 0.2, target))  # small ramp

# Linear estimator.
eta_seconds = calculate_linear_eta(history, target)
print(f"Linear ETA: {eta_seconds}")

# Exponential estimator (falls back to linear if needed).
eta_exp = calculate_exponential_eta(history, target)
print(f"Exponential ETA: {eta_exp}")
Using the MQTT Client
import logging

from octoprint_temp_eta.mqtt_client import MQTTClientWrapper

# The wrapper expects a logger and the plugin identifier.
logger = logging.getLogger(__name__)
mqtt_client = MQTTClientWrapper(logger, "temp_eta")

# Settings-like dict using the keys that configure() reads.
mqtt_settings = {
    "mqtt_enabled": True,
    "mqtt_broker_host": "localhost",
    "mqtt_broker_port": 1883,
    "mqtt_username": "",
    "mqtt_password": "",
    "mqtt_use_tls": False,
    "mqtt_base_topic": "octoprint/temp_eta",
    "mqtt_qos": 0,
    "mqtt_retain": False,
    "mqtt_publish_interval": 1.0,
}
mqtt_client.configure(mqtt_settings)

# Publish a sample ETA update. The call returns without publishing while
# the client is disabled or not yet connected.
mqtt_client.publish_eta_update(
    heater="tool0",
    eta=120.0,
    eta_kind="heating",
    target=200.0,
    actual=50.0,
)

# Disconnect when done.
mqtt_client.disconnect()
Plugin Integration
import octoprint.plugin


class MyPlugin(octoprint.plugin.OctoPrintPlugin):
    """Example plugin that reads the current ETA from TempETA at startup."""

    def on_after_startup(self):
        # Look up the TempETA plugin; do nothing when it is not available.
        temp_eta = self._plugin_manager.get_plugin("temp_eta")
        if not temp_eta:
            return

        eta_data = temp_eta.get_current_eta()
        self._logger.info(f"Current ETA: {eta_data}")
Threading Considerations
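All public methods are thread-safe when accessed through the plugin instance, and the MQTT client wrapper guards its own state with an internal lock. The calculator functions take no locks of their own, so when calling them directly make sure the history deque is not modified by another thread during the call: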
import threading


class SafeCalculator:
    """Serialize ETA calculations behind a re-entrant lock."""

    def __init__(self):
        self._lock = threading.RLock()
        self._calculator = ETACalculator()

    def calculate(self, history, target):
        """Run one ETA calculation while holding the lock."""
        with self._lock:
            result = self._calculator.calculate_eta(history, target)
        return result
Error Handling
All methods handle errors gracefully and return None or default values on failure:
try:
    eta = calculator.calculate_eta(history, target)
    # None signals "no estimate available" rather than an error.
    if eta is not None:
        print(f"ETA: {eta:.1f}s")
    else:
        print("Insufficient data for ETA calculation")
except Exception as e:
    logger.error(f"Calculation failed: {e}")
Type Hints
The codebase uses Python type hints for better IDE support:
from typing import Optional, Deque, Tuple


def calculate_eta(
    history: Deque[Tuple[float, float, float]],
    target: float
) -> Optional[float]:
    """Calculate ETA to target temperature (signature illustration only).

    Args:
        history: Temperature history deque
        target: Target temperature

    Returns:
        ETA in seconds, or None if calculation fails
    """
    # Placeholder body: this example only demonstrates the type hints.
    return None
Logging
Use the provided logger for all log messages:
self._logger.debug("Debug message")
self._logger.info("Info message")
self._logger.warning("Warning message")
self._logger.error("Error message")
self._logger.exception("Exception with traceback")
Next Steps