Python API
Auto-generated Python API documentation for OctoPrint-TempETA.
Main Plugin Class
Bases: StartupPluginBase, TemplatePluginBase, SettingsPluginBase, AssetPluginBase, EventHandlerPluginBase, SimpleApiPluginBase
Main plugin implementation for Temperature ETA.
Implements OctoPrint Issue #469: Show estimated time remaining
for printer heating (bed, hotend, chamber).
Initialize plugin with temperature history tracking.
Source code in octoprint_temp_eta/__init__.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249 | def __init__(self):
"""Initialize plugin with temperature history tracking."""
super().__init__()
self._lock = threading.Lock()
self._debug_logging_enabled = False
self._last_debug_log_time = 0.0
self._last_heater_support_decision = {}
self._heater_supported_cache: Dict[str, bool] = {}
# MQTT client (initialized in on_after_startup when logger is available)
self._mqtt_client: Optional[Any] = None
# Number of temperature samples to keep per heater.
# This is configurable via settings (history_size). We cache the active
# value here for fast access in the 2Hz callback.
self._default_history_size = 60
self._history_maxlen = self._default_history_size
# Persist history per printer profile (file per profile id).
# Note: ETA uses only the most recent seconds of history, so we keep
# persistence bounded by both maxlen and a max-age filter on load.
self._active_profile_id: Optional[str] = None
# Persistence backoff to reduce SD card wear in long-running phases
# where the target is never reached.
# Backoff sequence after phase reset: 30s -> 60s -> 120s -> 240s -> 300s (cap).
self._persist_backoff_reset_s = 30.0
self._persist_backoff_initial_s = 60.0
self._persist_backoff_max_s = 300.0
self._persist_backoff_current_s = self._persist_backoff_initial_s
self._next_persist_time = 0.0
self._persist_phase_active = False
self._last_persist_size_warning_time = 0.0
self._persist_max_json_bytes = 256 * 1024
self._persist_max_age_seconds = 180.0
self._history_dirty = False
self._temp_history = {
"bed": deque(maxlen=self._history_maxlen),
"tool0": deque(maxlen=self._history_maxlen),
"chamber": deque(maxlen=self._history_maxlen),
}
# Cooldown history (target==0). Kept separate from heating history so
# the heat-up ETA fit doesn't get polluted by cooldown samples.
self._cooldown_history = {
"bed": deque(maxlen=self._history_maxlen),
"tool0": deque(maxlen=self._history_maxlen),
"chamber": deque(maxlen=self._history_maxlen),
}
# Ambient baseline per heater for ambient-mode cooldown.
# If the user doesn't provide an ambient temperature, we rely on the
# lowest temperature observed while the heater target is OFF (within a
# reasonable range) as a proxy for ambient.
self._cooldown_ambient_baseline: Dict[str, Optional[float]] = {
"bed": None,
"tool0": None,
"chamber": None,
}
self._last_update_time = 0.0
# Cache hot-path settings values for fast access in the ~2Hz callback.
# Refreshed on startup and on settings save.
self._threshold_start_c = 5.0
self._update_interval_s = 1.0
self._last_runtime_cache_refresh_time = 0.0
# Track last seen target per heater so we can detect transitions like
# heating -> off (cooldown start).
self._last_target_by_heater: Dict[str, float] = {}
# When enabled, suppress ETA updates while a print job is active.
# This keeps the UI focused on the pre-print heat-up phase.
self._suppressing_due_to_print = False
self._last_settings_snapshot_log_time = 0.0
|
on_after_startup
Called after OctoPrint startup, register for temperature updates.
Source code in octoprint_temp_eta/__init__.py
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384 | def on_after_startup(self):
"""Called after OctoPrint startup, register for temperature updates."""
self._logger.info("Temperature ETA Plugin started")
self._printer.register_callback(self)
self._refresh_debug_logging_flag()
self._debug_log("Debug logging enabled")
self._refresh_runtime_caches()
# Apply configured history size now that settings are available.
self._set_history_maxlen(self._read_history_maxlen_setting())
# Load persisted history for the active printer profile.
self._switch_active_profile_if_needed(force=True)
# Initialize MQTT client
if MQTTClientWrapper is not None:
self._mqtt_client = MQTTClientWrapper(self._logger, self._identifier)
self._configure_mqtt_client()
else:
self._logger.info("MQTT support disabled: paho-mqtt not available")
|
on_printer_add_temperature
on_printer_add_temperature(data)
Called when new temperature data is available (~2Hz).
Parameters:
| Name |
Type |
Description |
Default |
data
|
dict
|
Temperature data from OctoPrint
{
"bed": {"actual": float, "target": float},
"tool0": {"actual": float, "target": float},
...
}
|
required
|
Source code in octoprint_temp_eta/__init__.py
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237 | def on_printer_add_temperature(self, data):
"""Called when new temperature data is available (~2Hz).
Args:
data (dict): Temperature data from OctoPrint
{
"bed": {"actual": float, "target": float},
"tool0": {"actual": float, "target": float},
...
}
"""
if not self._settings.get_boolean(["enabled"]):
return
# Ensure we are tracking the right profile's history.
self._switch_active_profile_if_needed()
# If enabled, suppress ETA completely while OctoPrint considers a print job active.
if self._suppress_while_printing_enabled() and self._is_print_job_active():
# Clear once when suppression starts to avoid stale countdowns.
if not self._suppressing_due_to_print:
self._suppressing_due_to_print = True
self._clear_all_heaters_frontend()
return
# If we were suppressing but the condition no longer applies (e.g. warm-up resumed or print ended), re-enable.
if self._suppressing_due_to_print:
self._suppressing_due_to_print = False
current_time = time.time()
# Keep cached settings reasonably fresh even if settings change outside
# of the normal save flow (e.g. tests or edge-case integrations).
if (current_time - self._last_runtime_cache_refresh_time) >= 5.0:
self._last_runtime_cache_refresh_time = current_time
self._refresh_runtime_caches()
# Periodic settings snapshot for debugging.
self._debug_log_settings_snapshot(current_time)
threshold = float(getattr(self, "_threshold_start_c", 5.0))
update_interval = float(getattr(self, "_update_interval_s", 1.0))
heating_enabled = self._heating_enabled()
cooldown_enabled = self._cooldown_enabled()
# Avoid eager f-string formatting in the hot path.
try:
is_logger_debug = bool(getattr(self._logger, "isEnabledFor")(10)) # type: ignore[attr-defined]
except Exception:
is_logger_debug = False
if is_logger_debug:
heaters_in_data = [k for k, v in data.items() if isinstance(v, dict)]
if heaters_in_data:
self._logger.debug(
"Received temperature data for heaters: %s", heaters_in_data
)
epsilon_hold = 0.2
# Track whether we are in an active heating phase (at least one heater
# is above the threshold window and not holding). This drives persistence
# backoff scheduling.
phase_active_now = False
target_changed_in_active_phase = False
recorded_count = 0
recorded_cooldown_count = 0
heaters_seen = 0
with self._lock:
# Protect shared history state (OctoPrint may call callbacks from worker threads).
for heater, temps in data.items():
if not isinstance(temps, dict):
continue
heater_key = str(heater)
heaters_seen += 1
# Record samples only while ETA could be shown (active target and above threshold).
target_raw = temps.get("target", 0)
actual_raw = temps.get("actual")
if actual_raw is None:
continue
try:
actual = float(actual_raw)
except Exception:
continue
try:
target = float(target_raw or 0)
except Exception:
# Some firmwares/virtual printer formats may provide a non-numeric
# target (e.g. "off"). Treat that as OFF for cooldown tracking.
target = 0.0
self._debug_log_throttled(
current_time,
30.0,
"Non-numeric target treated as OFF heater=%s target_raw=%r",
str(heater),
target_raw,
)
prev_target = self._last_target_by_heater.get(str(heater))
self._last_target_by_heater[heater_key] = float(target)
if target <= 0:
# Cooldown tracking (target==0).
if cooldown_enabled:
if heater_key not in self._cooldown_history:
self._cooldown_history[heater_key] = deque(
maxlen=self._history_maxlen
)
# If we just transitioned from heating to OFF, start a fresh
# cooldown history so our linear cooldown fit doesn't include
# old OFF samples from before the heat-up phase.
if prev_target is not None and prev_target > 0:
self._cooldown_history[heater_key].clear()
self._debug_log_throttled(
current_time,
10.0,
"Cooldown start detected, cleared history heater=%s prev_target=%.1f actual=%.1f",
str(heater),
float(prev_target),
float(actual),
)
self._cooldown_history[heater_key].append(
(current_time, actual)
)
recorded_cooldown_count += 1
# Track a baseline ambient temp while OFF.
# We only learn baseline values in a sane range to
# avoid "ambient" being polluted by still-hot cooldown.
if math.isfinite(actual) and actual < 120.0:
prev = self._cooldown_ambient_baseline.get(heater_key)
if prev is None or actual < prev:
self._cooldown_ambient_baseline[heater_key] = actual
continue
if not heating_enabled:
continue
remaining = target - actual
# Avoid recording while holding near target to prevent constant churn.
if remaining <= epsilon_hold:
continue
if remaining < threshold:
continue
# This heater is actively heating within the ETA window.
phase_active_now = True
if (
prev_target is not None
and math.isfinite(prev_target)
and math.isfinite(target)
and prev_target > 0.0
and target > 0.0
and abs(prev_target - target) >= 0.5
):
target_changed_in_active_phase = True
if heater_key not in self._temp_history:
self._temp_history[heater_key] = deque(maxlen=self._history_maxlen)
self._temp_history[heater_key].append((current_time, actual, target))
self._history_dirty = True
recorded_count += 1
# Avoid flooding logs while idle/holding temperature: log far less often
# when we didn't record any samples.
debug_interval = 5.0 if recorded_count > 0 else 60.0
self._debug_log_throttled(
current_time,
debug_interval,
"Temp callback profile=%s heaters=%d recorded=%d cooldown_recorded=%d threshold=%.2f",
str(self._active_profile_id),
int(heaters_seen),
recorded_count,
recorded_cooldown_count,
float(threshold),
)
if (current_time - self._last_update_time) >= update_interval:
self._last_update_time = current_time
self._calculate_and_broadcast_eta(data)
# Update persistence phase state based on transitions.
if target_changed_in_active_phase:
self._enter_persist_phase(current_time, "target_change")
elif self._persist_phase_active and (not phase_active_now):
# Active -> inactive transition: schedule a shorter persist.
self._reset_persist_backoff(current_time, "phase_end")
elif (not self._persist_phase_active) and phase_active_now:
# Inactive -> active transition: start backoff sequence.
self._enter_persist_phase(current_time, "phase_start")
self._maybe_persist_history(current_time)
|
on_printer_send_current_data
on_printer_send_current_data(data)
Stub: Called when current printer data is sent (required by callback interface).
Source code in octoprint_temp_eta/__init__.py
| def on_printer_send_current_data(self, data):
"""Stub: Called when current printer data is sent (required by callback interface)."""
pass
|
on_printer_add_log
Stub: Called when log entry is added (required by callback interface).
Source code in octoprint_temp_eta/__init__.py
| def on_printer_add_log(self, data):
"""Stub: Called when log entry is added (required by callback interface)."""
pass
|
on_event
Handle OctoPrint events to keep UI state consistent.
Clears all ETAs immediately on disconnect or printer errors so the navbar/tab
do not keep showing stale countdowns.
Parameters:
| Name |
Type |
Description |
Default |
event
|
str
|
|
required
|
payload
|
dict
|
|
required
|
Source code in octoprint_temp_eta/__init__.py
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287 | def on_event(self, event, payload):
"""Handle OctoPrint events to keep UI state consistent.
Clears all ETAs immediately on disconnect or printer errors so the navbar/tab
do not keep showing stale countdowns.
Args:
event (str): OctoPrint event name
payload (dict): Event payload
"""
if event in (
"Disconnected",
"Error",
"Shutdown",
): # clear UI on connection loss
# Persist what we have before clearing.
self._persist_current_profile_history()
self._reset_persist_backoff(time.time(), "disconnect_or_error")
with self._lock:
heaters = list(self._temp_history.keys())
for h in heaters:
self._temp_history[h].clear()
for h in list(self._cooldown_history.keys()):
self._cooldown_history[h].clear()
self._send_clear_messages(heaters)
# Disconnect MQTT on shutdown
if event == "Shutdown" and self._mqtt_client is not None:
self._mqtt_client.disconnect()
# Reset suppression flag on job lifecycle changes; actual suppression is decided in the temperature callback.
if event in (
"PrintStarted",
"PrintResumed",
"PrintDone",
"PrintFailed",
"PrintCancelled",
):
self._suppressing_due_to_print = False
|
get_settings_defaults
Return the default settings for the plugin.
Returns:
| Name | Type |
Description |
dict |
|
Dictionary containing default plugin settings.
|
Source code in octoprint_temp_eta/__init__.py
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812 | def get_settings_defaults(self):
"""Return the default settings for the plugin.
Returns:
dict: Dictionary containing default plugin settings.
"""
return dict(
enabled=True,
enable_heating_eta=True,
suppress_while_printing=False,
show_in_sidebar=True,
show_in_navbar=True,
show_in_tab=True,
show_progress_bars=True,
show_historical_graph=True,
historical_graph_window_seconds=180,
temp_display="octoprint",
threshold_unit="octoprint",
debug_logging=False,
threshold_start=5.0,
algorithm="linear",
update_interval=1.0,
history_size=60,
# Persistence (advanced; protects SD cards on long-running phases)
persist_backoff_reset_s=30.0,
persist_backoff_initial_s=60.0,
persist_backoff_max_s=300.0,
persist_max_json_bytes=256 * 1024,
# Cool Down ETA
enable_cooldown_eta=True,
cooldown_mode="threshold",
cooldown_target_tool0=50.0,
cooldown_target_bed=40.0,
cooldown_target_chamber=30.0,
cooldown_ambient_temp=None,
cooldown_hysteresis_c=1.0,
cooldown_fit_window_seconds=120,
# Extended settings: status colors
color_mode="bands",
color_heating="#5cb85c",
color_cooling="#337ab7",
color_idle="#777777",
# Extended settings: sound alerts
sound_enabled=False,
sound_target_reached=False,
sound_cooldown_finished=False,
sound_volume=0.5,
sound_min_interval_s=10.0,
# Extended settings: browser notifications (toast)
notification_enabled=False,
notification_target_reached=False,
notification_cooldown_finished=False,
notification_timeout_s=6.0,
notification_min_interval_s=10.0,
# MQTT settings
mqtt_enabled=False,
mqtt_broker_host="",
mqtt_broker_port=1883,
mqtt_username="",
mqtt_password="",
mqtt_use_tls=False,
mqtt_tls_insecure=False,
mqtt_base_topic="octoprint/temp_eta",
mqtt_qos=0,
mqtt_retain=False,
mqtt_publish_interval=1.0,
)
|
is_template_autoescaped
is_template_autoescaped()
Enable autoescaping for all plugin templates.
Opt-in to OctoPrint's template autoescaping (OctoPrint 1.11+) to reduce
XSS risk from unescaped injected variables.
Returns:
| Name | Type |
Description |
bool |
bool
|
True to enable autoescaping.
|
Source code in octoprint_temp_eta/__init__.py
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824 | def is_template_autoescaped(self) -> bool: # pyright: ignore
"""Enable autoescaping for all plugin templates.
Opt-in to OctoPrint's template autoescaping (OctoPrint 1.11+) to reduce
XSS risk from unescaped injected variables.
Returns:
bool: True to enable autoescaping.
"""
return True
|
get_template_configs
Configure which templates to use and how to bind them.
Returns:
| Name | Type |
Description |
list |
|
List of template configuration dictionaries.
|
Source code in octoprint_temp_eta/__init__.py
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843 | def get_template_configs(self):
"""Configure which templates to use and how to bind them.
Returns:
list: List of template configuration dictionaries.
"""
return [
dict(type="navbar", custom_bindings=True),
dict(
type="sidebar",
custom_bindings=False,
name=gettext("Temperature ETA"),
icon="fa fa-clock",
),
# Use OctoPrint's default settingsViewModel binding for settings UI.
dict(type="settings", custom_bindings=True),
dict(type="tab", custom_bindings=False),
]
|
on_settings_save
Persist settings and clear UI/state when disabling the plugin.
OctoPrint applies settings changes only on save. When the plugin is disabled,
we actively clear any previously shown countdowns in navbar/tab so the UI
does not keep showing stale values.
Parameters:
| Name |
Type |
Description |
Default |
data
|
dict
|
Settings data posted from the UI.
|
required
|
Source code in octoprint_temp_eta/__init__.py
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884 | def on_settings_save(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Persist settings and clear UI/state when disabling the plugin.
OctoPrint applies settings changes only on save. When the plugin is disabled,
we actively clear any previously shown countdowns in navbar/tab so the UI
does not keep showing stale values.
Args:
data (dict): Settings data posted from the UI.
"""
if not getattr(self, "_settings", None):
return {}
self._sanitize_settings_payload(data)
was_enabled = bool(self._settings.get_boolean(["enabled"]))
old_debug = bool(getattr(self, "_debug_logging_enabled", False))
old_history_maxlen = self._read_history_maxlen_setting()
saved = octoprint.plugin.SettingsPlugin.on_settings_save(self, data)
is_enabled = bool(self._settings.get_boolean(["enabled"]))
self._refresh_debug_logging_flag()
self._refresh_runtime_caches()
if old_debug != bool(self._debug_logging_enabled):
self._logger.info(
"Debug logging %s",
"enabled" if self._debug_logging_enabled else "disabled",
)
new_history_maxlen = self._read_history_maxlen_setting()
if new_history_maxlen != old_history_maxlen:
self._set_history_maxlen(new_history_maxlen)
if was_enabled and not is_enabled:
self._clear_all_heaters_frontend()
# Reconfigure MQTT client with new settings
self._configure_mqtt_client()
return saved if isinstance(saved, dict) else {}
|
get_assets
Return static assets (JS, CSS, LESS) to be included.
Returns:
| Name | Type |
Description |
dict |
|
Dictionary with asset types and their file paths.
|
Source code in octoprint_temp_eta/__init__.py
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029 | def get_assets(self):
"""Return static assets (JS, CSS, LESS) to be included.
Returns:
dict: Dictionary with asset types and their file paths.
"""
return dict(
js=["js/temp_eta.js"],
less=["less/temp_eta.less"],
)
|
is_api_protected
Whether the Simple API requires an authenticated user.
OctoPrint's default for this will switch from False to True in the
future. We explicitly opt in to avoid relying on defaults.
Source code in octoprint_temp_eta/__init__.py
2032
2033
2034
2035
2036
2037
2038 | def is_api_protected(self) -> bool: # type: ignore[override]
"""Whether the Simple API requires an authenticated user.
OctoPrint's default for this will switch from False to True in the
future. We explicitly opt in to avoid relying on defaults.
"""
return True
|
is_api_adminonly
Whether the Simple API is restricted to admin users.
Source code in octoprint_temp_eta/__init__.py
| def is_api_adminonly(self) -> bool: # type: ignore[override]
"""Whether the Simple API is restricted to admin users."""
return True
|
get_api_commands
Return supported Simple API commands for the plugin.
Source code in octoprint_temp_eta/__init__.py
2044
2045
2046
2047
2048
2049 | def get_api_commands(self) -> Dict[str, list]: # type: ignore[override]
"""Return supported Simple API commands for the plugin."""
return {
"reset_profile_history": [],
"reset_settings_defaults": [],
}
|
on_api_command
on_api_command(command, data)
Handle Simple API commands.
Source code in octoprint_temp_eta/__init__.py
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088 | def on_api_command(self, command: str, data: Dict[str, Any]): # type: ignore[override]
"""Handle Simple API commands."""
if command == "reset_profile_history":
deleted_count = self._reset_all_profile_histories()
profile_id = self._get_current_profile_id()
logger = getattr(self, "_logger", None)
if logger is not None:
logger.info(
"Reset persisted history for all profiles (trigger_profile=%s deleted_files=%d)",
str(profile_id),
int(deleted_count),
)
return jsonify(
{
"success": True,
"profile_id": profile_id,
"deleted_files": deleted_count,
}
)
if command == "reset_settings_defaults":
self._reset_user_settings_to_defaults()
logger = getattr(self, "_logger", None)
if logger is not None:
logger.info("Restored plugin settings defaults (user-editable keys)")
# Notify all connected clients so the settings UI can refresh via requestData().
if getattr(self, "_plugin_manager", None):
try:
self._plugin_manager.send_plugin_message(
self._identifier, {"type": "settings_reset"}
)
except Exception:
pass
return jsonify({"success": True, "message": gettext("Defaults restored.")})
return jsonify({"success": False, "error": "unknown_command"})
|
Provide update information for the Software Update plugin.
Returns:
| Name | Type |
Description |
dict |
|
Update configuration for the plugin.
|
Source code in octoprint_temp_eta/__init__.py
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109 | def get_update_information(self):
"""Provide update information for the Software Update plugin.
Returns:
dict: Update configuration for the plugin.
"""
return dict(
temp_eta=dict(
displayName="Temperature ETA Plugin",
displayVersion=self._plugin_version,
# version check: github repository
type="github_release",
user="Ajimaru",
repo="OctoPrint-TempETA",
current=self._plugin_version,
# update method: pip
pip="https://github.com/Ajimaru/OctoPrint-TempETA/archive/{target_version}.zip",
)
)
|
Calculator Module
Temperature ETA calculation algorithms.
This module provides pure calculation logic for estimating time remaining
until a printer heater reaches its target temperature or cools down.
All functions are stateless and independent of OctoPrint plugin mechanics,
making them easy to test and maintain.
calculate_linear_eta
calculate_linear_eta(history, target, window_seconds=10.0)
Calculate ETA assuming constant heating rate.
Uses linear regression on recent temperature samples to estimate
the rate of temperature change and predict time to target.
Parameters:
| Name |
Type |
Description |
Default |
history
|
deque
|
Deque of (timestamp, actual_temp, target_temp) tuples
|
required
|
target
|
float
|
Target temperature in degrees
|
required
|
window_seconds
|
float
|
Time window for rate calculation (default: 10s)
|
10.0
|
Returns:
| Type |
Description |
Optional[float]
|
Estimated seconds to target, or None if insufficient data
|
Source code in octoprint_temp_eta/calculator.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 | def calculate_linear_eta(
history: deque, target: float, window_seconds: float = 10.0
) -> Optional[float]:
"""Calculate ETA assuming constant heating rate.
Uses linear regression on recent temperature samples to estimate
the rate of temperature change and predict time to target.
Args:
history: Deque of (timestamp, actual_temp, target_temp) tuples
target: Target temperature in degrees
window_seconds: Time window for rate calculation (default: 10s)
Returns:
Estimated seconds to target, or None if insufficient data
"""
# Validate inputs
if not math.isfinite(target):
return None
if not math.isfinite(window_seconds) or window_seconds <= 0:
return None
if not history or len(history) < 2:
return None
# Use last N seconds of data for rate calculation (anchored to history)
last_ts = max(
(
ts
for ts, actual, _target in history
if math.isfinite(ts) and math.isfinite(actual)
),
default=None,
)
if last_ts is None:
return None
cutoff = last_ts - window_seconds
t0 = None
temp0 = None
t1 = None
temp1 = None
for ts, actual, _target in history:
# Validate data from history
if not (math.isfinite(ts) and math.isfinite(actual)):
continue
if ts <= cutoff:
continue
if t0 is None:
t0 = ts
temp0 = actual
t1 = ts
temp1 = actual
if t0 is None or t1 is None or temp0 is None or temp1 is None:
return None
time_diff = t1 - t0
temp_diff = temp1 - temp0
if time_diff <= 0 or temp_diff <= 0:
return None
# rate = ΔT / Δt (°C per second)
rate = temp_diff / time_diff
remaining = target - temp1
if remaining <= 0:
return None
eta = remaining / rate
return max(0.0, eta)
|
calculate_exponential_eta
calculate_exponential_eta(history, target, window_seconds=30.0)
Calculate ETA accounting for thermal asymptotic behavior.
Uses exponential model: T(t) = T_final - (T_final - T_0) * e^(-t/tau)
Falls back to linear estimation when insufficient data or poor fit.
Parameters:
| Name |
Type |
Description |
Default |
history
|
deque
|
Deque of (timestamp, actual_temp, target_temp) tuples
|
required
|
target
|
float
|
Target temperature in degrees
|
required
|
window_seconds
|
float
|
Time window for exponential fit (default: 30s)
|
30.0
|
Returns:
| Type |
Description |
Optional[float]
|
Estimated seconds to target, or None if insufficient data
|
Source code in octoprint_temp_eta/calculator.py
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229 | def calculate_exponential_eta(
history: deque, target: float, window_seconds: float = 30.0
) -> Optional[float]:
"""Calculate ETA accounting for thermal asymptotic behavior.
Uses exponential model: T(t) = T_final - (T_final - T_0) * e^(-t/tau)
Falls back to linear estimation when insufficient data or poor fit.
Args:
history: Deque of (timestamp, actual_temp, target_temp) tuples
target: Target temperature in degrees
window_seconds: Time window for exponential fit (default: 30s)
Returns:
Estimated seconds to target, or None if insufficient data
"""
# Validate inputs
if not math.isfinite(target):
return None
if not math.isfinite(window_seconds) or window_seconds <= 0:
return None
if not history or len(history) < 3:
return None
# Use a recent window for the fit
last_ts = None
for ts, temp, tgt in history:
if math.isfinite(ts) and math.isfinite(temp):
last_ts = ts if (last_ts is None or ts > last_ts) else last_ts
if last_ts is None:
return None
cutoff = last_ts - window_seconds
recent = sorted(
(
(ts, temp, tgt)
for (ts, temp, tgt) in history
if math.isfinite(ts) and math.isfinite(temp) and ts > cutoff
),
key=lambda x: x[0],
)
# Drop duplicate timestamps to avoid zero-span/unstable fits
deduped = []
last_ts = None
for ts, temp, tgt in recent:
if last_ts is not None and ts == last_ts:
continue
deduped.append((ts, temp, tgt))
last_ts = ts
recent = deduped
if len(recent) < 6:
return calculate_linear_eta(history, target)
# Current sample
t_now, temp_now, _ = recent[-1]
remaining_now = target - temp_now
if remaining_now <= 0:
return None
# We model the approach to target as asymptotic.
# Reaching the target exactly takes infinite time; use an epsilon band.
epsilon_c = 0.5
if remaining_now <= epsilon_c:
return 0.0
# Build regression data for ln(target - T).
# Exclude points too close to target (noise dominates) and invalid samples.
t0 = recent[0][0]
xs = []
ys = []
for ts, temp, _ in recent:
delta = target - temp
if delta <= epsilon_c:
continue
x = ts - t0
if x < 0:
continue
xs.append(x)
ys.append(math.log(delta))
if len(xs) < 6:
return calculate_linear_eta(history, target)
span = xs[-1] - xs[0]
if span < 5:
return calculate_linear_eta(history, target)
# Require we are actually heating in this window.
if (recent[-1][1] - recent[0][1]) <= 0.2:
return None
# Linear regression: y = a + b*x, where b should be negative.
x_mean = sum(xs) / len(xs)
y_mean = sum(ys) / len(ys)
sxx = 0.0
sxy = 0.0
for x, y in zip(xs, ys):
dx = x - x_mean
dy = y - y_mean
sxx += dx * dx
sxy += dx * dy
if sxx <= 0:
return calculate_linear_eta(history, target)
slope = sxy / sxx
if slope >= -1e-4:
# Not decaying fast enough or unstable -> fallback.
return calculate_linear_eta(history, target)
tau = -1.0 / slope
if tau <= 0 or tau > 2000:
return calculate_linear_eta(history, target)
# ETA to reach epsilon band.
try:
eta = tau * math.log(remaining_now / epsilon_c)
except ValueError as e:
# Log mathematische Fehler für bessere Nachvollziehbarkeit
import logging
logging.getLogger("octoprint_temp_eta").debug(
"Exponential ETA math error: %s", e
)
return calculate_linear_eta(history, target)
if eta < 0:
eta = 0.0
# Protect against spikes: if exponential estimate is wildly larger than
# the linear estimate on the same data, trust the linear estimate.
linear_eta = calculate_linear_eta(history, target)
if linear_eta is not None and eta > (linear_eta * 5):
return linear_eta
return eta
|
calculate_cooldown_linear_eta
calculate_cooldown_linear_eta(cooldown_history, goal_c, window_seconds=60.0)
Linear cooldown ETA from recent slope.
Uses linear regression on recent cooldown samples to estimate
the rate of temperature decrease and predict time to goal.
Parameters:
| Name |
Type |
Description |
Default |
cooldown_history
|
deque
|
Deque of (timestamp, temp) tuples
|
required
|
goal_c
|
float
|
Target cooldown temperature in degrees
|
required
|
window_seconds
|
float
|
Time window for fit (default: 60s)
|
60.0
|
Returns:
| Type |
Description |
Optional[float]
|
Estimated seconds to goal, or None if insufficient data
|
Source code in octoprint_temp_eta/calculator.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297 | def calculate_cooldown_linear_eta(
cooldown_history: deque, goal_c: float, window_seconds: float = 60.0
) -> Optional[float]:
"""Linear cooldown ETA from recent slope.
Uses linear regression on recent cooldown samples to estimate
the rate of temperature decrease and predict time to goal.
Args:
cooldown_history: Deque of (timestamp, temp) tuples
goal_c: Target cooldown temperature in degrees
window_seconds: Time window for fit (default: 60s)
Returns:
Estimated seconds to goal, or None if insufficient data
"""
# Validate inputs
if not math.isfinite(goal_c):
return None
if not math.isfinite(window_seconds) or window_seconds <= 0:
return None
if not cooldown_history:
return None
last_ts = None
for ts, temp in cooldown_history:
if math.isfinite(ts) and math.isfinite(temp):
last_ts = ts if (last_ts is None or ts > last_ts) else last_ts
if last_ts is None:
return None
cutoff = last_ts - window_seconds
recent = sorted(
(
(ts, temp)
for ts, temp in cooldown_history
if ts > cutoff and math.isfinite(ts) and math.isfinite(temp)
),
key=lambda x: x[0],
)
if len(recent) < 2:
return None
t0, temp0 = recent[0]
t1, temp1 = recent[-1]
dt = t1 - t0
dtemp = temp1 - temp0
if dt <= 0:
return None
slope = dtemp / dt
if slope >= -1e-3:
# Not cooling fast enough
return None
remaining = temp1 - goal_c
if remaining <= 0:
return None
eta = remaining / (-slope)
if not math.isfinite(eta) or eta < 0:
return None
# Cap at 24 hours
return float(min(eta, 24 * 3600))
|
calculate_cooldown_exponential_eta
calculate_cooldown_exponential_eta(cooldown_history, ambient_c, goal_c, window_seconds=60.0)
Exponential cooldown ETA (Newton's law of cooling).
Models cooldown as: T(t) = T_ambient + (T_0 - T_ambient) * e^(-t/tau)
Source code in octoprint_temp_eta/calculator.py
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395 | def calculate_cooldown_exponential_eta(
cooldown_history: deque,
ambient_c: float,
goal_c: float,
window_seconds: float = 60.0,
) -> Optional[float]:
"""Exponential cooldown ETA (Newton's law of cooling).
Models cooldown as: T(t) = T_ambient + (T_0 - T_ambient) * e^(-t/tau)
"""
if not (math.isfinite(ambient_c) and math.isfinite(goal_c)):
return None
if not math.isfinite(window_seconds) or window_seconds <= 0:
return None
if goal_c <= ambient_c:
return None
if not cooldown_history or len(cooldown_history) < 4:
return None
last_ts = None
for ts, temp in cooldown_history:
if math.isfinite(ts) and math.isfinite(temp):
last_ts = ts if (last_ts is None or ts > last_ts) else last_ts
if last_ts is None:
return None
cutoff = last_ts - window_seconds
recent = [
(ts, temp)
for ts, temp in cooldown_history
if ts > cutoff and math.isfinite(ts) and math.isfinite(temp)
]
if len(recent) < 6:
return calculate_cooldown_linear_eta(cooldown_history, goal_c, window_seconds)
_t_now, temp_now = recent[-1]
if temp_now <= goal_c:
return None
epsilon = 0.5
t0 = recent[0][0]
xs = []
ys = []
for ts, temp in recent:
delta = temp - ambient_c
if delta <= epsilon:
continue
x = ts - t0
if x < 0:
continue
xs.append(x)
ys.append(math.log(delta))
if len(xs) < 4:
return None
x_mean = sum(xs) / float(len(xs))
y_mean = sum(ys) / float(len(ys))
sxx = 0.0
sxy = 0.0
for x, y in zip(xs, ys):
dx = x - x_mean
dy = y - y_mean
sxx += dx * dx
sxy += dx * dy
if sxx <= 0:
return None
slope = sxy / sxx
if slope >= -1e-4:
return None
tau = -1.0 / slope
if tau <= 0 or tau > 20000:
return None
numerator = temp_now - ambient_c
denominator = goal_c - ambient_c
if numerator <= 0 or denominator <= 0:
return None
try:
eta = tau * math.log(numerator / denominator)
except (ValueError, ZeroDivisionError) as e:
import logging
logging.getLogger("octoprint_temp_eta").debug(
"Exponential ETA math error: %s", e
)
return None
if not math.isfinite(eta) or eta < 0:
return None
return float(min(eta, 24 * 3600))
|
MQTT Client Module
Thread-safe MQTT client wrapper for the Temperature ETA plugin.
Manages connection lifecycle, automatic reconnection, and message publishing.
All MQTT operations are non-blocking to avoid impacting the temperature callback.
Initialize MQTT client wrapper.
Parameters:
| Name |
Type |
Description |
Default |
logger
|
Any
|
Logger instance for debug/info messages
|
required
|
identifier
|
str
|
Plugin identifier for topic prefixes
|
required
|
Source code in octoprint_temp_eta/mqtt_client.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 | def __init__(self, logger: Any, identifier: str):
"""Initialize MQTT client wrapper.
Args:
logger: Logger instance for debug/info messages
identifier: Plugin identifier for topic prefixes
"""
self._logger = logger
self._identifier = identifier
self._lock = threading.Lock()
self._client: Optional[Any] = None
self._enabled = False
self._connected = False
self._connecting = False
# Connection settings
self._broker_host = ""
self._broker_port = 1883
self._username = ""
self._password = ""
self._use_tls = False
self._tls_insecure = False
# Publishing settings
self._base_topic = "octoprint/temp_eta"
self._qos = 0
self._retain = False
self._publish_interval = 1.0
# State tracking for state transition events
self._last_published_time = 0.0
self._last_heater_state: Dict[str, Optional[str]] = {}
# Connection retry logic
self._last_connect_attempt = 0.0
self._connect_retry_interval = 30.0
# Avoid log spam when paho-mqtt is not installed.
# Use a dedicated lock to avoid re-entrantly acquiring self._lock.
self._mqtt_unavailable_lock = threading.Lock()
self._mqtt_unavailable_warned = False
|
Update MQTT configuration from plugin settings.
Parameters:
| Name |
Type |
Description |
Default |
settings
|
Dict[str, Any]
|
Dictionary with MQTT configuration keys
|
required
|
Source code in octoprint_temp_eta/mqtt_client.py
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116 | def configure(self, settings: Dict[str, Any]) -> None:
"""Update MQTT configuration from plugin settings.
Args:
settings: Dictionary with MQTT configuration keys
"""
with self._lock:
old_enabled = self._enabled
self._enabled = bool(settings.get("mqtt_enabled", False))
self._broker_host = str(settings.get("mqtt_broker_host", "")).strip()
self._broker_port = int(settings.get("mqtt_broker_port", 1883))
self._username = str(settings.get("mqtt_username", "")).strip()
self._password = str(settings.get("mqtt_password", "")).strip()
self._use_tls = bool(settings.get("mqtt_use_tls", False))
self._tls_insecure = bool(settings.get("mqtt_tls_insecure", False))
self._base_topic = str(
settings.get("mqtt_base_topic", "octoprint/temp_eta")
).strip()
self._qos = int(settings.get("mqtt_qos", 0))
self._retain = bool(settings.get("mqtt_retain", False))
self._publish_interval = float(settings.get("mqtt_publish_interval", 1.0))
# Reconnect if settings changed and enabled
if self._enabled and (not old_enabled or not self._connected):
self._schedule_connect()
elif not self._enabled and old_enabled:
self._disconnect_internal()
|
disconnect
Disconnect MQTT client gracefully.
Source code in octoprint_temp_eta/mqtt_client.py
| def disconnect(self) -> None:
"""Disconnect MQTT client gracefully."""
with self._lock:
self._disconnect_internal()
|
publish_eta_update
publish_eta_update(heater, eta, eta_kind, target, actual, cooldown_target=None)
Publish ETA update for a heater.
Parameters:
| Name |
Type |
Description |
Default |
heater
|
str
|
Heater name (bed, tool0, chamber)
|
required
|
eta
|
Optional[float]
|
|
required
|
eta_kind
|
Optional[str]
|
"heating", "cooling", or None
|
required
|
target
|
Optional[float]
|
|
required
|
actual
|
Optional[float]
|
|
required
|
cooldown_target
|
Optional[float]
|
Cooldown target temperature (if applicable)
|
None
|
Source code in octoprint_temp_eta/mqtt_client.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355 | def publish_eta_update(
self,
heater: str,
eta: Optional[float],
eta_kind: Optional[str],
target: Optional[float],
actual: Optional[float],
cooldown_target: Optional[float] = None,
) -> None:
"""Publish ETA update for a heater.
Args:
heater: Heater name (bed, tool0, chamber)
eta: ETA in seconds, or None
eta_kind: "heating", "cooling", or None
target: Target temperature
actual: Actual temperature
cooldown_target: Cooldown target temperature (if applicable)
"""
with self._lock:
if not self._enabled or not self._connected:
return
# Check if we should publish based on interval
now = time.time()
if (now - self._last_published_time) < self._publish_interval:
return
self._last_published_time = now
# Determine state for transition detection
current_state = None
if eta_kind == "heating" and eta is not None:
current_state = "heating"
elif eta_kind == "cooling" and eta is not None:
current_state = "cooling"
elif target is not None and actual is not None:
if abs(target - actual) <= 1.0:
current_state = "at_target"
elif cooldown_target is not None and actual is not None:
if abs(cooldown_target - actual) <= 1.0:
current_state = "cooled_down"
# Detect state transitions
last_state = self._last_heater_state.get(heater)
state_changed = last_state != current_state
self._last_heater_state[heater] = current_state
# Build payload
payload = {
"heater": heater,
"eta_seconds": eta,
"eta_kind": eta_kind,
"target": target,
"actual": actual,
"cooldown_target": cooldown_target,
"timestamp": now,
"state": current_state,
}
# Publish ETA data
topic = f"{self._base_topic}/{heater}/eta"
self._publish_message(topic, payload)
# Publish state transition event if state changed
if state_changed and current_state is not None:
event_payload = {
"heater": heater,
"state": current_state,
"previous_state": last_state,
"timestamp": now,
"actual": actual,
"target": target,
}
event_topic = f"{self._base_topic}/{heater}/state_change"
self._publish_message(event_topic, event_payload)
self._logger.info(
"MQTT: %s state changed from %s to %s",
heater,
last_state or "unknown",
current_state,
)
|
is_connected
Check if MQTT client is connected.
Returns:
| Name | Type |
Description |
bool |
bool
|
|
Source code in octoprint_temp_eta/mqtt_client.py
384
385
386
387
388
389
390
391 | def is_connected(self) -> bool:
"""Check if MQTT client is connected.
Returns:
bool: True if connected
"""
with self._lock:
return self._connected
|
Usage Examples
Using the Calculator
from octoprint_temp_eta.calculator import calculate_linear_eta, calculate_exponential_eta
from collections import deque
import time
# Create temperature history
history = deque()
for i in range(10):
timestamp = time.time() + i
temperature = 25 + i * 0.2 # small ramp
target = 200.0
history.append((timestamp, temperature, target))
# Calculate ETA using linear estimator
eta_seconds = calculate_linear_eta(history, target)
print(f"Linear ETA: {eta_seconds}")
# Calculate ETA using exponential estimator (fallbacks to linear if needed)
eta_exp = calculate_exponential_eta(history, target)
print(f"Exponential ETA: {eta_exp}")
Using the MQTT Client
from octoprint_temp_eta.mqtt_client import MQTTClientWrapper
import logging
# Create logger
logger = logging.getLogger(__name__)
# Instantiate wrapper (note: the wrapper expects a logger and plugin identifier)
mqtt_client = MQTTClientWrapper(logger, "temp_eta")
# Configure client via settings-like dict
mqtt_client.configure({
"mqtt_enabled": True,
"mqtt_broker_host": "localhost",
"mqtt_broker_port": 1883,
"mqtt_username": "",
"mqtt_password": "",
"mqtt_use_tls": False,
"mqtt_base_topic": "octoprint/temp_eta",
"mqtt_qos": 0,
"mqtt_retain": False,
"mqtt_publish_interval": 1.0,
})
# Publish a sample ETA update (heater name, eta seconds, eta kind, target, actual)
mqtt_client.publish_eta_update(
heater="tool0",
eta=120.0,
eta_kind="heating",
target=200.0,
actual=50.0,
)
# Disconnect when done
mqtt_client.disconnect()
Plugin Integration
import octoprint.plugin
class MyPlugin(octoprint.plugin.OctoPrintPlugin):
def on_after_startup(self):
# Access TempETA plugin
temp_eta = self._plugin_manager.get_plugin("temp_eta")
if temp_eta:
# Get current ETA
eta_data = temp_eta.get_current_eta()
self._logger.info(f"Current ETA: {eta_data}")
Threading Considerations
All public methods are thread-safe when accessed through the plugin instance. However, when using calculator or MQTT client directly, ensure proper synchronization:
import threading
class SafeCalculator:
def __init__(self):
self._calculator = ETACalculator()
self._lock = threading.RLock()
def calculate(self, history, target):
with self._lock:
return self._calculator.calculate_eta(history, target)
Error Handling
All methods handle errors gracefully and return None or default values on failure:
try:
eta = calculator.calculate_eta(history, target)
if eta is None:
print("Insufficient data for ETA calculation")
else:
print(f"ETA: {eta:.1f}s")
except Exception as e:
logger.error(f"Calculation failed: {e}")
Type Hints
The codebase uses Python type hints for better IDE support:
from typing import Optional, Deque, Tuple
def calculate_eta(
history: Deque[Tuple[float, float, float]],
target: float
) -> Optional[float]:
"""
Calculate ETA to target temperature.
Args:
history: Temperature history deque
target: Target temperature
Returns:
ETA in seconds, or None if calculation fails
"""
pass
Logging
Use the provided logger for all log messages:
self._logger.debug("Debug message")
self._logger.info("Info message")
self._logger.warning("Warning message")
self._logger.error("Error message")
self._logger.exception("Exception with traceback")
Next Steps