Skip to content

Python API

Auto-generated Python API documentation for OctoPrint-TempETA.

Main Plugin Class

Bases: PrinterCallbackBase, StartupPluginBase, TemplatePluginBase, SettingsPluginBase, AssetPluginBase, EventHandlerPluginBase, SimpleApiPluginBase

Main plugin implementation for Temperature ETA.

Implements OctoPrint Issue #469: Show estimated time remaining for printer heating (bed, hotend, chamber).

Initialize plugin with temperature history tracking.

Source code in octoprint_temp_eta/__init__.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
def __init__(self):
    """Create the plugin's locks, cached settings and empty history buffers."""
    super().__init__()

    # Heaters that receive a history slot up front. The temperature
    # callback adds slots for any other heater key it encounters.
    builtin_heaters = ("bed", "tool0", "chamber")

    # Guards the shared history containers; taken by the temperature
    # callback and by the event handler when it wipes the histories.
    self._lock = threading.Lock()
    # Makes whole profile switches mutually exclusive: the switch handler
    # holds it across its persist -> id-swap -> history-replace sequence,
    # so two switches (or a switch and its own persist step) never
    # interleave. Persists started from other places do not take it.
    self._profile_switch_lock = threading.Lock()

    # Debug-logging bookkeeping.
    self._debug_logging_enabled = False
    self._last_debug_log_time = 0.0
    self._last_heater_support_decision = {}
    self._heater_supported_cache: dict[str, bool] = {}

    # Stays None until on_after_startup creates the MQTT client (the
    # logger is only available from then on).
    self._mqtt_client: Optional[Any] = None

    # Per-heater sample capacity. The real value comes from the
    # "history_size" setting; it is cached here so the 2Hz callback does
    # not have to read settings.
    self._default_history_size = 60
    self._history_maxlen = self._default_history_size

    # History is persisted per printer profile (one file per profile id).
    # Only the last seconds of history matter for the ETA, so persisted
    # data is bounded by maxlen and by a max-age filter applied on load.
    self._active_profile_id: Optional[str] = None

    # Persist backoff, meant to limit SD card wear when a target is never
    # reached for a long time. After a phase reset the delays go
    # 30s -> 60s -> 120s -> 240s -> 300s (cap).
    self._persist_backoff_reset_s = 30.0
    self._persist_backoff_initial_s = 60.0
    self._persist_backoff_max_s = 300.0
    self._persist_backoff_current_s = self._persist_backoff_initial_s
    self._next_persist_time = 0.0
    self._persist_phase_active = False
    # Protects _persist_phase_active, _persist_backoff_current_s and
    # _next_persist_time, which are changed by the temperature callback,
    # by profile switches and by disconnect events. The temperature
    # callback also reads/writes _last_update_time under this lock. It is
    # deliberately separate from self._lock so these short sections never
    # nest with the history lock used by _persist_current_profile_history.
    self._persist_state_lock = threading.Lock()
    self._last_persist_size_warning_time = 0.0
    self._persist_max_json_bytes = 256 * 1024
    self._persist_max_age_seconds = 180.0
    self._history_dirty = False
    # Incremented every time a new history sample is recorded. A persist
    # remembers the value under the lock and clears the dirty flag only
    # if the counter did not move while it was writing.
    self._history_dirty_epoch = 0

    # Heating samples, one bounded deque per heater.
    self._temp_history = {
        name: deque(maxlen=self._history_maxlen) for name in builtin_heaters
    }

    # Cooldown samples (target == 0) live in their own deques so they do
    # not distort the heat-up ETA fit.
    self._cooldown_history = {
        name: deque(maxlen=self._history_maxlen) for name in builtin_heaters
    }

    # Ambient estimate per heater for ambient-mode cooldown. Without a
    # user-supplied ambient temperature, the lowest reading seen while the
    # heater target is OFF (within a sane range) stands in for ambient.
    self._cooldown_ambient_baseline: dict[str, Optional[float]] = {
        name: None for name in builtin_heaters
    }
    self._last_update_time = 0.0

    # Settings values used in the ~2Hz callback, cached for speed and
    # refreshed on startup and on settings save.
    self._threshold_start_c = 5.0
    self._update_interval_s = 1.0
    self._last_runtime_cache_refresh_time = 0.0

    # Most recent target per heater, needed to spot transitions such as
    # heating -> off (the start of a cooldown).
    self._last_target_by_heater: dict[str, float] = {}

    # True while ETA updates are being withheld because a print job is
    # active; this keeps the UI focused on the pre-print heat-up phase.
    self._suppressing_due_to_print = False
    # Protects _suppressing_due_to_print so concurrent temperature
    # callbacks and events cannot fire the one-shot frontend clear twice.
    self._suppress_lock = threading.Lock()

    self._last_settings_snapshot_log_time = 0.0

on_after_startup

on_after_startup()

Called after OctoPrint startup, register for temperature updates.

Source code in octoprint_temp_eta/__init__.py
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
def on_after_startup(self):
    """Finish plugin setup once OctoPrint has started.

    Registers the plugin for printer callbacks, refreshes the cached debug
    flag and runtime settings, applies the configured history size, forces a
    switch to the active printer profile and, when the MQTT wrapper is
    available, creates and configures the MQTT client.
    """
    self._logger.info("Temperature ETA Plugin started")
    self._printer.register_callback(self)

    self._refresh_debug_logging_flag()
    self._debug_log("Debug logging enabled")

    self._refresh_runtime_caches()

    # Settings can be read now, so apply the configured history size.
    configured_maxlen = self._read_history_maxlen_setting()
    self._set_history_maxlen(configured_maxlen)

    # Pull in the persisted history of the active printer profile.
    self._switch_active_profile_if_needed(force=True)

    # MQTT is optional: without the wrapper there is nothing to set up.
    if MQTTClientWrapper is None:
        self._logger.info("MQTT support disabled: paho-mqtt not available")
        return

    self._mqtt_client = MQTTClientWrapper(self._logger, self._identifier)
    self._configure_mqtt_client()

on_printer_add_temperature

on_printer_add_temperature(data)

Called when new temperature data is available (~2Hz).

Parameters:

Name Type Description Default
data dict

Temperature data from OctoPrint { "bed": {"actual": float, "target": float}, "tool0": {"actual": float, "target": float}, ... }

required
Source code in octoprint_temp_eta/__init__.py
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
def on_printer_add_temperature(self, data):
    """Called when new temperature data is available (~2Hz).

    Records heating samples (target above 0 and the reading at least the
    start threshold below target) and cooldown samples (target at or below
    0) per heater. At most once per update interval it then broadcasts the
    ETA, updates the persistence phase and gives persistence a chance to run.

    A reading whose ``actual`` is missing, non-numeric or non-finite (NaN or
    +/-inf) is skipped entirely. A non-numeric or non-finite ``target`` is
    treated as OFF (0.0).

    Args:
        data (dict): Temperature data from OctoPrint
            {
                "bed": {"actual": float, "target": float},
                "tool0": {"actual": float, "target": float},
                ...
            }
            Entries whose value is not a Mapping are ignored.
    """
    if not self._settings.get_boolean(["enabled"]):
        return

    # Ensure we are tracking the right profile's history.
    self._switch_active_profile_if_needed()

    # If enabled, suppress ETA completely while OctoPrint considers a print job active.
    if self._suppress_while_printing_enabled() and self._is_print_job_active():
        # Clear once when suppression starts to avoid stale countdowns.
        # Check-and-set under the lock so the one-shot clear cannot be
        # triggered twice by concurrent callbacks/events.
        with self._suppress_lock:
            just_started = not self._suppressing_due_to_print
            self._suppressing_due_to_print = True
        if just_started:
            self._clear_all_heaters_frontend()
        return

    # If we were suppressing but the condition no longer applies (e.g. warm-up resumed or print ended), re-enable.
    with self._suppress_lock:
        self._suppressing_due_to_print = False

    current_time = time.time()

    # Keep cached settings reasonably fresh even if settings change outside
    # of the normal save flow (e.g. tests or edge-case integrations).
    if (current_time - self._last_runtime_cache_refresh_time) >= 5.0:
        self._last_runtime_cache_refresh_time = current_time
        self._refresh_runtime_caches()

    # Periodic settings snapshot for debugging.
    self._debug_log_settings_snapshot(current_time)

    threshold = float(getattr(self, "_threshold_start_c", 5.0))
    update_interval = float(getattr(self, "_update_interval_s", 1.0))
    heating_enabled = self._heating_enabled()
    cooldown_enabled = self._cooldown_enabled()

    # Exceptions tolerated when probing the logger or converting raw values.
    conversion_errors = (
        AttributeError,
        KeyError,
        OSError,
        RuntimeError,
        TypeError,
        ValueError,
    )

    # Avoid eager formatting in the hot path: only build the heater list
    # when the logger would actually emit DEBUG (level 10) records.
    try:
        is_logger_debug = bool(getattr(self._logger, "isEnabledFor")(10))  # type: ignore[attr-defined]
    except conversion_errors:
        is_logger_debug = False
    if is_logger_debug:
        heaters_in_data = [k for k, v in data.items() if isinstance(v, Mapping)]
        if heaters_in_data:
            self._logger.debug(
                "Received temperature data for heaters: %s", heaters_in_data
            )

    # Remaining degrees at or below which a heater counts as holding.
    epsilon_hold = 0.2

    # Track whether we are in an active heating phase (at least one heater
    # is above the threshold window and not holding). This drives persistence
    # backoff scheduling.
    phase_active_now = False
    target_changed_in_active_phase = False

    recorded_count = 0
    recorded_cooldown_count = 0
    heaters_seen = 0
    with self._lock:
        # Protect shared history state (OctoPrint may call callbacks from worker threads).
        for heater, temps in data.items():
            # OctoPrint wraps heater entries in frozendict when
            # devel.useFrozenDictForPrinterState is enabled, which is not a
            # dict subclass; accept any Mapping so ETA still works there.
            if not isinstance(temps, Mapping):
                continue
            heater_key = str(heater)
            heaters_seen += 1

            # Record samples only while ETA could be shown (active target and above threshold).
            target_raw = temps.get("target", 0)
            actual_raw = temps.get("actual")
            if actual_raw is None:
                continue
            try:
                actual = float(actual_raw)
            except conversion_errors:
                continue
            # float() accepts NaN/inf. A NaN reading would make every
            # comparison below False and end up stored as a heating sample,
            # so treat a non-finite reading like a missing one.
            if not math.isfinite(actual):
                continue

            try:
                target = float(target_raw or 0)
            except conversion_errors:
                target = math.nan
            if not math.isfinite(target):
                # Some firmwares/virtual printer formats may provide a non-numeric
                # target (e.g. "off"); NaN/inf targets are equally unusable.
                # Treat all of them as OFF for cooldown tracking.
                target = 0.0
                self._debug_log_throttled(
                    current_time,
                    30.0,
                    "Non-numeric target treated as OFF heater=%s target_raw=%r",
                    heater_key,
                    target_raw,
                )

            prev_target = self._last_target_by_heater.get(heater_key)
            self._last_target_by_heater[heater_key] = float(target)

            if target <= 0:
                # Cooldown tracking (target==0).
                if cooldown_enabled:
                    if heater_key not in self._cooldown_history:
                        self._cooldown_history[heater_key] = deque(
                            maxlen=self._history_maxlen
                        )

                    # If we just transitioned from heating to OFF, start a fresh
                    # cooldown history so our linear cooldown fit doesn't include
                    # old OFF samples from before the heat-up phase.
                    if prev_target is not None and prev_target > 0:
                        self._cooldown_history[heater_key].clear()
                        self._debug_log_throttled(
                            current_time,
                            10.0,
                            "Cooldown start detected, cleared history heater=%s prev_target=%.1f actual=%.1f",
                            heater_key,
                            float(prev_target),
                            float(actual),
                        )

                    self._cooldown_history[heater_key].append(
                        (current_time, actual)
                    )
                    recorded_cooldown_count += 1

                    # Track a baseline ambient temp while OFF.
                    # We only learn baseline values in a sane range to
                    # avoid "ambient" being polluted by still-hot cooldown.
                    # (actual is already known to be finite here.)
                    if actual < 120.0:
                        prev = self._cooldown_ambient_baseline.get(heater_key)
                        if prev is None or actual < prev:
                            self._cooldown_ambient_baseline[heater_key] = actual
                continue

            if not heating_enabled:
                continue

            remaining = target - actual
            # Avoid recording while holding near target to prevent constant churn.
            if remaining <= epsilon_hold:
                continue
            if remaining < threshold:
                continue

            # This heater is actively heating within the ETA window.
            phase_active_now = True
            if (
                prev_target is not None
                and math.isfinite(prev_target)
                and prev_target > 0.0
                and abs(prev_target - target) >= 0.5
            ):
                # target is finite and > 0 on this path, so only the previous
                # target needs checking.
                target_changed_in_active_phase = True

            if heater_key not in self._temp_history:
                self._temp_history[heater_key] = deque(maxlen=self._history_maxlen)

            self._temp_history[heater_key].append((current_time, actual, target))
            self._history_dirty = True
            self._history_dirty_epoch += 1
            recorded_count += 1

    # Avoid flooding logs while idle/holding temperature: log far less often
    # when we didn't record any samples.
    debug_interval = 5.0 if recorded_count > 0 else 60.0
    self._debug_log_throttled(
        current_time,
        debug_interval,
        "Temp callback profile=%s heaters=%d recorded=%d cooldown_recorded=%d threshold=%.2f",
        str(self._active_profile_id),
        int(heaters_seen),
        recorded_count,
        recorded_cooldown_count,
        float(threshold),
    )

    # Atomic check-and-set of the broadcast/persist cadence gate so two
    # concurrent callbacks cannot both pass and double-broadcast.
    with self._persist_state_lock:
        due = (current_time - self._last_update_time) >= update_interval
        if due:
            self._last_update_time = current_time
        phase_active_prev = self._persist_phase_active

    if due:
        self._calculate_and_broadcast_eta(data)

        # Update persistence phase state based on transitions. Decide using
        # the phase snapshot taken above so the branch is consistent.
        # NOTE(review): target_changed_in_active_phase is only True in the
        # callback that first sees the new target; when that callback is not
        # `due`, the "target_change" branch is never taken - confirm this is
        # intended.
        if target_changed_in_active_phase:
            self._enter_persist_phase(current_time, "target_change")
        elif phase_active_prev and (not phase_active_now):
            # Active -> inactive transition: schedule a shorter persist.
            self._reset_persist_backoff(current_time, "phase_end")
        elif (not phase_active_prev) and phase_active_now:
            # Inactive -> active transition: start backoff sequence.
            self._enter_persist_phase(current_time, "phase_start")

        self._maybe_persist_history(current_time)

on_printer_send_current_data

on_printer_send_current_data(_data)

Stub: Called when current printer data is sent (required by callback interface).

Source code in octoprint_temp_eta/__init__.py
1407
1408
1409
def on_printer_send_current_data(self, _data):
    """No-op: current printer data is ignored.

    Exists only because the printer callback interface requires it; always
    returns None.
    """

on_printer_add_log

on_printer_add_log(_data)

Stub: Called when log entry is added (required by callback interface).

Source code in octoprint_temp_eta/__init__.py
1411
1412
1413
def on_printer_add_log(self, _data):
    """No-op: printer log entries are ignored.

    Exists only because the printer callback interface requires it; always
    returns None.
    """

on_printer_add_message

on_printer_add_message(_data)

Stub: Called when printer message is added (required by callback interface).

Source code in octoprint_temp_eta/__init__.py
1415
1416
1417
def on_printer_add_message(self, _data):
    """No-op: printer messages are ignored.

    Exists only because the printer callback interface requires it; always
    returns None.
    """

on_event

on_event(event, _payload)

Handle OctoPrint events to keep UI state consistent.

Clears all ETAs immediately on disconnect or printer errors so the navbar/tab do not keep showing stale countdowns.

Parameters:

Name Type Description Default
event str

OctoPrint event name

required
_payload dict

Event payload

required
Source code in octoprint_temp_eta/__init__.py
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
def on_event(self, event, _payload):
    """React to OctoPrint events so the UI never shows stale countdowns.

    On ``Disconnected``, ``Error`` or ``Shutdown`` the current history is
    persisted, the persist backoff is reset, every heating and cooldown
    history is emptied and clear messages are sent for all tracked heaters.
    On ``Shutdown`` the MQTT client, if there is one, is disconnected too.
    On print job lifecycle events the suppression flag is reset.

    Args:
        event (str): OctoPrint event name
        _payload (dict): Event payload (not used)
    """
    connection_lost_events = ("Disconnected", "Error", "Shutdown")
    job_lifecycle_events = (
        "PrintStarted",
        "PrintResumed",
        "PrintDone",
        "PrintFailed",
        "PrintCancelled",
    )

    if event in connection_lost_events:
        # Save first, wipe afterwards.
        self._persist_current_profile_history()
        self._reset_persist_backoff(time.time(), "disconnect_or_error")

        with self._lock:
            heaters = list(self._temp_history)
            for samples in self._temp_history.values():
                samples.clear()
            for samples in self._cooldown_history.values():
                samples.clear()

        self._send_clear_messages(heaters)

        # Work on a local reference so the None check and the call see the
        # same object even if the attribute is reassigned concurrently.
        client = self._mqtt_client
        if event == "Shutdown" and client is not None:
            client.disconnect()

    # Only the flag is reset here; whether to suppress is decided again in
    # the temperature callback.
    if event in job_lifecycle_events:
        with self._suppress_lock:
            self._suppressing_due_to_print = False

get_settings_defaults

get_settings_defaults()

Return the default settings for the plugin.

Returns:

Name Type Description
dict

Dictionary containing default plugin settings.

Source code in octoprint_temp_eta/__init__.py
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
def get_settings_defaults(self):
    """Build the plugin's default settings.

    The defaults are assembled from thematic groups and merged in a fixed
    order into one flat dictionary.

    Returns:
        dict: Mapping of setting name to default value.
    """
    general = {
        "enabled": True,
        "enable_heating_eta": True,
        "suppress_while_printing": False,
        "show_in_sidebar": True,
        "show_in_navbar": True,
        "show_in_tab": True,
        "show_progress_bars": True,
        "show_historical_graph": True,
        "historical_graph_window_seconds": 180,
        "temp_display": "octoprint",
        "threshold_unit": "octoprint",
        "debug_logging": False,
        "threshold_start": 5.0,
        "algorithm": "linear",
        "update_interval": 1.0,
        "history_size": 60,
    }
    # Advanced persistence knobs; protect SD cards on long-running phases.
    persistence = {
        "persist_backoff_reset_s": 30.0,
        "persist_backoff_initial_s": 60.0,
        "persist_backoff_max_s": 300.0,
        "persist_max_json_bytes": 256 * 1024,
    }
    cooldown = {
        "enable_cooldown_eta": True,
        "cooldown_mode": "threshold",
        "cooldown_target_tool0": 50.0,
        "cooldown_target_bed": 40.0,
        "cooldown_target_chamber": 30.0,
        "cooldown_ambient_temp": None,
        "cooldown_hysteresis_c": 1.0,
        "cooldown_fit_window_seconds": 120,
    }
    status_colors = {
        "color_mode": "bands",
        "color_heating": "#5cb85c",
        "color_cooling": "#337ab7",
        "color_idle": "#777777",
    }
    sound_alerts = {
        "sound_enabled": False,
        "sound_target_reached": False,
        "sound_cooldown_finished": False,
        "sound_volume": 0.5,
        "sound_min_interval_s": 10.0,
    }
    # Browser (toast) notifications.
    notifications = {
        "notification_enabled": False,
        "notification_target_reached": False,
        "notification_cooldown_finished": False,
        "notification_timeout_s": 6.0,
        "notification_min_interval_s": 10.0,
    }
    mqtt = {
        "mqtt_enabled": False,
        "mqtt_broker_host": "",
        "mqtt_broker_port": 1883,
        "mqtt_username": "",
        "mqtt_password": "",
        "mqtt_use_tls": False,
        "mqtt_tls_insecure": False,
        "mqtt_base_topic": "octoprint/temp_eta",
        "mqtt_use_appearance_name": True,
        "mqtt_custom_identifier": "",
        "mqtt_qos": 0,
        "mqtt_retain": False,
        "mqtt_publish_interval": 1.0,
    }
    return {
        **general,
        **persistence,
        **cooldown,
        **status_colors,
        **sound_alerts,
        **notifications,
        **mqtt,
    }

is_template_autoescaped

is_template_autoescaped()

Enable autoescaping for all plugin templates.

Opt-in to OctoPrint's template autoescaping (OctoPrint 1.11+) to reduce XSS risk from unescaped injected variables.

Returns:

Name Type Description
bool bool

True to enable autoescaping.

Source code in octoprint_temp_eta/__init__.py
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
def is_template_autoescaped(self) -> bool:  # pyright: ignore
    """Opt all plugin templates into autoescaping.

    Uses OctoPrint's template autoescaping opt-in (OctoPrint 1.11+) to lower
    the XSS risk from injected variables that are not escaped.

    Returns:
        bool: Always True, i.e. autoescaping is enabled.
    """
    return True

get_template_configs

get_template_configs()

Configure which templates to use and how to bind them.

Returns:

Name Type Description
list

List of template configuration dictionaries.

Source code in octoprint_temp_eta/__init__.py
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
def get_template_configs(self):
    """Describe the plugin's templates and their binding mode.

    Returns:
        list: Template configuration dictionaries in the order navbar,
        sidebar, settings, tab.
    """
    navbar = {"type": "navbar", "custom_bindings": True}
    sidebar = {
        "type": "sidebar",
        "custom_bindings": False,
        "name": gettext("Temperature ETA"),
        "icon": "fa fa-clock",
    }
    # NOTE(review): custom_bindings is True here - confirm whether the
    # settings template is meant to bind to a plugin view model or to
    # OctoPrint's default settingsViewModel.
    settings = {"type": "settings", "custom_bindings": True}
    tab = {"type": "tab", "custom_bindings": False}
    return [navbar, sidebar, settings, tab]

on_settings_save

on_settings_save(data)

Persist settings and clear UI/state when disabling the plugin.

OctoPrint applies settings changes only on save. When the plugin is disabled, we actively clear any previously shown countdowns in navbar/tab so the UI does not keep showing stale values.

Parameters:

Name Type Description Default
data dict

Settings data posted from the UI.

required
Source code in octoprint_temp_eta/__init__.py
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
def on_settings_save(self, data: dict[str, Any]) -> dict[str, Any]:
    """Save settings and bring cached state and the UI in line with them.

    OctoPrint applies settings changes only on save. If the save disables the
    plugin, countdowns that were already shown in navbar/tab are cleared so
    the UI does not keep displaying stale values. The cached debug flag,
    runtime caches, history size and MQTT configuration are refreshed as well.

    Args:
        data (dict): Settings data posted from the UI.

    Returns:
        dict: The value returned by the base ``SettingsPlugin`` save when it
        is a dict, otherwise an empty dict. An empty dict is also returned,
        without saving anything, when no settings object is available.
    """
    if not getattr(self, "_settings", None):
        return {}

    self._sanitize_settings_payload(data)

    # Capture the state before saving so changes can be detected afterwards.
    enabled_before = bool(self._settings.get_boolean(["enabled"]))
    debug_before = bool(getattr(self, "_debug_logging_enabled", False))
    maxlen_before = self._read_history_maxlen_setting()

    result = octoprint.plugin.SettingsPlugin.on_settings_save(self, data)
    enabled_after = bool(self._settings.get_boolean(["enabled"]))

    self._refresh_debug_logging_flag()
    self._refresh_runtime_caches()
    debug_after = bool(self._debug_logging_enabled)
    if debug_before != debug_after:
        state_word = "enabled" if self._debug_logging_enabled else "disabled"
        self._logger.info("Debug logging %s", state_word)

    maxlen_after = self._read_history_maxlen_setting()
    if maxlen_after != maxlen_before:
        self._set_history_maxlen(maxlen_after)

    # The plugin was just switched off: remove what the frontend still shows.
    if enabled_before and not enabled_after:
        self._clear_all_heaters_frontend()

    # Reconfigure MQTT client with new settings
    self._configure_mqtt_client()

    if isinstance(result, dict):
        return result
    return {}

get_assets

get_assets()

Return static assets (JS, CSS, LESS) to be included.

Returns:

Name Type Description
dict

Dictionary with asset types and their file paths.

Source code in octoprint_temp_eta/__init__.py
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
def get_assets(self):
    """List the static assets the plugin contributes to the page.

    Returns:
        dict: Asset type ("js", "css") mapped to a list of file paths.
    """
    assets = {"js": ["js/temp_eta.js"]}
    # Pre-compiled CSS is shipped instead of LESS. OctoPrint compiles LESS
    # only if a server-side compiler (lesscpy / Node lessc) is installed,
    # and a pip-installed plugin cannot count on that; without a compiler
    # the LESS is silently left out of the bundle and none of the plugin's
    # styling (graph colours included) reaches the page. The .less file
    # remains in the source tree as the editable origin; after editing it,
    # regenerate temp_eta.css with
    # `npx less less/temp_eta.less css/temp_eta.css`.
    assets["css"] = ["css/temp_eta.css"]
    return assets

is_api_protected

is_api_protected()

Whether the Simple API requires an authenticated user.

OctoPrint's default for this will switch from False to True in the future. We explicitly opt in to avoid relying on defaults.

Source code in octoprint_temp_eta/__init__.py
2275
2276
2277
2278
2279
2280
2281
def is_api_protected(self) -> bool:  # type: ignore[override]
    """Require an authenticated user for the Simple API.

    Returning True explicitly avoids depending on OctoPrint's default, which
    is expected to change from False to True in a future release.

    Returns:
        bool: Always True.
    """
    return True

is_api_adminonly

is_api_adminonly()

Whether the Simple API is restricted to admin users.

Source code in octoprint_temp_eta/__init__.py
2283
2284
2285
def is_api_adminonly(self) -> bool:  # type: ignore[override]
    """Restrict the Simple API to admin users.

    Returns:
        bool: Always True.
    """
    return True

get_api_commands

get_api_commands()

Return supported Simple API commands for the plugin.

Source code in octoprint_temp_eta/__init__.py
2287
2288
2289
2290
2291
2292
def get_api_commands(self) -> dict[str, list]:  # type: ignore[override]
    """List the Simple API commands this plugin accepts.

    Returns:
        dict: Command name mapped to its (empty) list of parameters.
    """
    command_names = ("reset_profile_history", "reset_settings_defaults")
    return {name: [] for name in command_names}

on_api_get

on_api_get(_request)

Return plugin status for the Simple API GET endpoint.

Currently exposes whether the MQTT integration is enabled and connected to a broker so the settings UI can show a live status.

Source code in octoprint_temp_eta/__init__.py
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
def on_api_get(self, _request: Any):  # type: ignore[override]
    """Report MQTT status for the Simple API GET endpoint.

    The response tells the settings UI whether MQTT support is available,
    whether the integration is enabled and whether it is connected to a
    broker, so a live status can be shown.

    Args:
        _request: Incoming request (not used).

    Returns:
        The ``jsonify`` response with the keys ``mqtt_available``,
        ``mqtt_enabled`` and ``mqtt_connected``.
    """
    mqtt_client = self._mqtt_client
    mqtt_enabled = bool(self._settings.get_boolean(["mqtt_enabled"]))

    # "connected" is reported only while MQTT is enabled. Right after the
    # feature is switched off a client may still be mid-disconnect, and the
    # UI should show the configured state rather than a lingering socket.
    mqtt_connected = False
    if mqtt_enabled and mqtt_client is not None:
        mqtt_connected = bool(mqtt_client.is_connected())

    status = {
        "mqtt_available": MQTTClientWrapper is not None,
        "mqtt_enabled": mqtt_enabled,
        "mqtt_connected": mqtt_connected,
    }
    return jsonify(status)

on_api_command

on_api_command(command, _data)

Handle Simple API commands.

Source code in octoprint_temp_eta/__init__.py
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
def on_api_command(self, command: str, _data: dict[str, Any]):  # type: ignore[override]
    """Handle Simple API commands.

    Supported commands:

    * ``reset_profile_history`` - delete the persisted history of all
      profiles and report how many files were removed.
    * ``reset_settings_defaults`` - restore the user-editable settings to
      their defaults and notify connected clients.

    Args:
        command: Name of the requested command.
        _data: Request payload; unused by the commands handled here.

    Returns:
        A JSON response. Known commands answer with ``success: True``; any
        other command answers with ``success: False`` and
        ``error: "unknown_command"``.
    """
    # The logger may be absent (e.g. when the plugin is not fully injected),
    # so every use below is guarded.
    logger = getattr(self, "_logger", None)

    if command == "reset_profile_history":
        deleted_count = self._reset_all_profile_histories()
        profile_id = self._get_current_profile_id()
        if logger is not None:
            logger.info(
                "Reset persisted history for all profiles (trigger_profile=%s deleted_files=%d)",
                str(profile_id),
                int(deleted_count),
            )
        return jsonify(
            {
                "success": True,
                "profile_id": profile_id,
                "deleted_files": deleted_count,
            }
        )

    if command == "reset_settings_defaults":
        self._reset_user_settings_to_defaults()
        if logger is not None:
            logger.info("Restored plugin settings defaults (user-editable keys)")

        # Notify all connected clients so the settings UI can refresh via requestData().
        plugin_manager = getattr(self, "_plugin_manager", None)
        if plugin_manager:
            try:
                plugin_manager.send_plugin_message(
                    self._identifier, {"type": "settings_reset"}
                )
            except (
                AttributeError,
                KeyError,
                OSError,
                RuntimeError,
                TypeError,
                ValueError,
            ) as exc:
                # Best effort: the reset itself already succeeded, so a failed
                # notification must not fail the request. Leave a trace so the
                # missing UI refresh can be diagnosed.
                if logger is not None:
                    logger.debug(
                        "Could not notify clients about the settings reset: %s", exc
                    )

        return jsonify({"success": True, "message": gettext("Defaults restored.")})

    return jsonify({"success": False, "error": "unknown_command"})

get_update_information

get_update_information()

Provide update information for the Software Update plugin.

Returns:

Name Type Description
dict

Update configuration for the plugin.

Source code in octoprint_temp_eta/__init__.py
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
def get_update_information(self):
    """Describe how the Software Update plugin checks and updates this plugin.

    Returns:
        dict: A single ``"temp_eta"`` entry holding the display name, the
        installed version, the GitHub release check configuration and the
        pip archive URL template.
    """
    installed_version = self._plugin_version

    # Version check: releases of the GitHub repository.
    entry = {
        "displayName": "Temperature ETA Plugin",
        "displayVersion": installed_version,
        "type": "github_release",
        "user": "Ajimaru",
        "repo": "OctoPrint-TempETA",
        "current": installed_version,
    }
    # Update method: pip install of the release archive.
    entry["pip"] = (
        "https://github.com/Ajimaru/OctoPrint-TempETA/archive/{target_version}.zip"
    )
    return {"temp_eta": entry}

Calculator Module

Temperature ETA calculation algorithms.

This module provides pure calculation logic for estimating time remaining until a printer heater reaches its target temperature or cools down.

All functions are stateless and independent of OctoPrint plugin mechanics, making them easy to test and maintain.

calculate_linear_eta

calculate_linear_eta(history, target, window_seconds=10.0)

Calculate ETA assuming constant heating rate.

Estimates the rate of temperature change from the first and last samples in the recent window and uses it to predict the time to target.

Parameters:

Name Type Description Default
history deque

Deque of (timestamp, actual_temp, target_temp) tuples

required
target float

Target temperature in degrees

required
window_seconds float

Time window for rate calculation (default: 10s)

10.0

Returns:

Type Description
Optional[float]

Estimated seconds to target, or None if insufficient data

Source code in octoprint_temp_eta/calculator.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def calculate_linear_eta(
    history: deque, target: float, window_seconds: float = 10.0
) -> Optional[float]:
    """
    Estimate heating ETA from the average rate across the recent window.

    The rate is the temperature gained between the first and the last
    sample of the recent window divided by the time between them; the
    remaining degrees to the target are then divided by that rate.

    Args:
        history: Deque of (timestamp, actual_temp, target_temp) tuples
        target: Target temperature in degrees
        window_seconds: Time window for rate calculation (default: 10s)

    Returns:
        Estimated seconds to target. None when the inputs fail validation,
        fewer than two samples are available, the window shows no elapsed
        time or no temperature gain, or the latest temperature is already
        at or above the target.
    """
    inputs_ok = _validate_scalar(target) and _validate_window(window_seconds)
    if not inputs_ok:
        return None
    if not history or len(history) < 2:
        return None

    newest_ts = _find_last_ts(history)
    if newest_ts is None:
        return None

    # The window is anchored at the newest sample's timestamp.
    window = _filter_recent(history, newest_ts - window_seconds)
    if len(window) < 2:
        return None

    oldest, newest = window[0], window[-1]
    elapsed = newest[0] - oldest[0]
    gained = newest[1] - oldest[1]
    # Only a strictly rising temperature over a positive time span yields a rate.
    if elapsed <= 0 or gained <= 0:
        return None

    degrees_left = target - newest[1]
    if degrees_left <= 0:
        return None

    rate = gained / elapsed
    return max(0.0, degrees_left / rate)

calculate_exponential_eta

calculate_exponential_eta(history, target, window_seconds=30.0)

Calculate ETA accounting for thermal asymptotic behavior.

Uses the exponential model T(t) = T_final - (T_final - T_0) * e^(-t/tau). Falls back to linear estimation when there is insufficient data or the fit is poor.

Parameters:

Name Type Description Default
history deque

Deque of (timestamp, actual_temp, target_temp) tuples

required
target float

Target temperature in degrees

required
window_seconds float

Time window for exponential fit (default: 30s)

30.0

Returns:

Type Description
Optional[float]

Estimated seconds to target, or None if insufficient data

Source code in octoprint_temp_eta/calculator.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def calculate_exponential_eta(
    history: deque, target: float, window_seconds: float = 30.0
) -> Optional[float]:
    """
    Estimate heating ETA with an exponential (asymptotic) model.

    Model: T(t) = T_final - (T_final - T_0) * e^(-t/tau)

    The linear estimate (called without ``window_seconds``, so its own
    default applies) is returned instead when fewer than six de-duplicated
    samples fall in the window, when the fit raises a math error or yields
    no result, or when the fitted ETA exceeds five times the linear one.

    Args:
        history: Deque of (timestamp, actual_temp, target_temp) tuples
        target: Target temperature in degrees
        window_seconds: Time window for exponential fit (default: 30s)

    Returns:
        Estimated seconds to target; 0.0 when the latest temperature is
        within 0.5 degrees of the target; None when the inputs fail
        validation, fewer than three samples exist, the target is already
        reached, or the window shows at most 0.2 degrees of heating.
    """
    inputs_ok = _validate_scalar(target) and _validate_window(window_seconds)
    if not inputs_ok:
        return None
    if not history or len(history) < 3:
        return None

    newest_ts = _find_last_ts(history)
    if newest_ts is None:
        return None

    samples = _dedupe_by_ts(_filter_recent(history, newest_ts - window_seconds))
    if len(samples) < 6:
        return calculate_linear_eta(history, target)

    current_temp = samples[-1][1]
    gap = target - current_temp
    if gap <= 0:
        return None

    # Without meaningful heating across the window no ETA can be derived.
    if (samples[-1][1] - samples[0][1]) <= 0.2:
        return None

    tolerance_c = 0.5
    if gap <= tolerance_c:
        return 0.0

    try:
        fitted = _exponential_fit(samples, target, tolerance_c)
    except (ValueError, ArithmeticError) as exc:
        _LOG.debug("Exponential ETA math error: %s", exc)
        fitted = None

    linear = calculate_linear_eta(history, target)
    if fitted is None:
        return linear

    # Distrust a fit that is far more pessimistic than the linear estimate.
    if linear is not None and fitted > linear * 5:
        return linear

    return fitted

calculate_cooldown_linear_eta

calculate_cooldown_linear_eta(cooldown_history, goal_c, window_seconds=60.0)

Linear cooldown ETA from recent slope.

Estimates the rate of temperature decrease from the first and last samples in the recent window and uses it to predict the time to goal.

Parameters:

Name Type Description Default
cooldown_history deque

Deque of (timestamp, temp) tuples

required
goal_c float

Target cooldown temperature in degrees

required
window_seconds float

Time window for fit (default: 60s)

60.0

Returns:

Type Description
Optional[float]

Estimated seconds to goal, or None if insufficient data

Source code in octoprint_temp_eta/calculator.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def calculate_cooldown_linear_eta(
    cooldown_history: deque, goal_c: float, window_seconds: float = 60.0
) -> Optional[float]:
    """
    Estimate cooldown ETA from the slope across the recent window.

    The slope is taken between the first and the last sample of the recent
    window; the degrees still above the goal are divided by its magnitude.

    Args:
        cooldown_history: Deque of (timestamp, temp) tuples
        goal_c: Target cooldown temperature in degrees
        window_seconds: Time window for fit (default: 60s)

    Returns:
        Estimated seconds to goal, capped at 24 hours. None when the inputs
        fail validation, fewer than two samples fall in the window, the
        temperature is not dropping faster than 0.001 degrees per second,
        or the latest temperature is already at or below the goal.
    """
    if not (_validate_scalar(goal_c) and _validate_window(window_seconds)):
        return None
    if not cooldown_history:
        return None

    newest_ts = _find_last_ts(cooldown_history)
    if newest_ts is None:
        return None

    window = _filter_recent(cooldown_history, newest_ts - window_seconds)
    if len(window) < 2:
        return None

    start_ts, start_temp = window[0]
    end_ts, end_temp = window[-1]
    elapsed = end_ts - start_ts
    if elapsed <= 0:
        return None

    # Negative slope means cooling; anything flatter than -0.001 deg/s is
    # treated as "not cooling".
    slope = (end_temp - start_temp) / elapsed
    if slope >= -1e-3:
        return None

    degrees_left = end_temp - goal_c
    if degrees_left <= 0:
        return None

    seconds_left = degrees_left / (-slope)
    if not math.isfinite(seconds_left) or seconds_left < 0:
        return None

    one_day = 24 * 3600
    return float(min(seconds_left, one_day))

calculate_cooldown_exponential_eta

calculate_cooldown_exponential_eta(cooldown_history, ambient_c, goal_c, window_seconds=60.0)

Exponential cooldown ETA (Newton's law of cooling).

Models cooldown as: T(t) = T_ambient + (T_0 - T_ambient) * e^(-t/tau)

Source code in octoprint_temp_eta/calculator.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
def calculate_cooldown_exponential_eta(
    cooldown_history: deque,
    ambient_c: float,
    goal_c: float,
    window_seconds: float = 60.0,
) -> Optional[float]:
    """
    Estimate cooldown ETA with Newton's law of cooling.

    Model: T(t) = T_ambient + (T_0 - T_ambient) * e^(-t/tau)

    Args:
        cooldown_history: Deque of cooldown samples; the temperature is read
            from index 1 of each sample
        ambient_c: Ambient temperature in degrees
        goal_c: Target cooldown temperature; must be above ``ambient_c``
        window_seconds: Time window for the fit (default: 60s)

    Returns:
        Result of the exponential fit; the linear cooldown estimate when
        fewer than six samples fall in the window; None when the inputs
        fail validation, the goal is not above ambient, fewer than four
        samples exist, the latest temperature is already at or below the
        goal, or the fit raises a math error.
    """
    inputs_ok = (
        _validate_scalar(ambient_c)
        and _validate_scalar(goal_c)
        and _validate_window(window_seconds)
    )
    if not inputs_ok:
        return None
    if goal_c <= ambient_c:
        return None
    if not cooldown_history or len(cooldown_history) < 4:
        return None

    newest_ts = _find_last_ts(cooldown_history)
    if newest_ts is None:
        return None

    window = _filter_recent(cooldown_history, newest_ts - window_seconds)
    if len(window) < 6:
        # Too few points for a fit; use the linear slope instead.
        return calculate_cooldown_linear_eta(cooldown_history, goal_c, window_seconds)

    current_temp = window[-1][1]
    if current_temp <= goal_c:
        return None

    try:
        eta = _cooldown_exponential_fit(window, ambient_c, goal_c, epsilon=0.5)
    except (ValueError, ArithmeticError) as exc:
        _LOG.debug("Exponential ETA math error: %s", exc)
        return None
    return eta

MQTT Client Module

Thread-safe MQTT client wrapper for the Temperature ETA plugin.

Manages connection lifecycle, automatic reconnection, and message publishing. All MQTT operations are non-blocking to avoid impacting the temperature callback.

Initialize MQTT client wrapper.

Parameters:

Name Type Description Default
logger Any

Logger instance for debug/info messages

required
identifier str

Plugin identifier for topic prefixes

required
Source code in octoprint_temp_eta/mqtt_client.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def __init__(self, logger: Any, identifier: str):
    """Set up the wrapper in its disabled, disconnected default state.

    Args:
        logger: Logger instance for debug/info messages
        identifier: Plugin identifier for topic prefixes
    """
    self._identifier = identifier
    self._logger = logger

    # Locks. The "unavailable" flag gets its own lock so it can be updated
    # without re-entrantly acquiring self._lock; the flag itself prevents
    # log spam when paho-mqtt is not installed.
    self._lock = threading.Lock()
    self._mqtt_unavailable_lock = threading.Lock()
    self._mqtt_unavailable_warned = False

    # Client object and lifecycle flags.
    self._client: Optional[Any] = None
    self._enabled = False
    self._connecting = False
    self._connected = False
    # Becomes True once a paho-mqtt 2.x (callback API v2) client was created
    # in _create_new_client; informational/diagnostic only.
    self._callback_api_v2 = False

    # Broker connection parameters.
    self._broker_host = ""
    self._broker_port = 1883
    self._use_tls = False
    self._tls_insecure = False
    self._username = ""
    self._password = ""  # nosec B105 - empty default, overwritten from settings

    # Publishing parameters.
    self._base_topic = "octoprint/temp_eta"
    self._retain = False
    self._qos = 0
    self._publish_interval = 1.0

    # Bookkeeping for publish throttling and state transition events.
    self._last_heater_state: dict[str, Optional[str]] = {}
    self._last_published_time = 0.0

    # Connection retry pacing.
    self._connect_retry_interval = 30.0
    self._last_connect_attempt = 0.0

configure

configure(settings)

Update MQTT configuration from plugin settings.

Parameters:

Name Type Description Default
settings dict[str, Any]

Dictionary with MQTT configuration keys

required
Source code in octoprint_temp_eta/mqtt_client.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def configure(self, settings: dict[str, Any]) -> None:
    """Update MQTT configuration from plugin settings.

    A key that is missing or explicitly ``None`` falls back to its built-in
    default, so an unset setting never turns into the literal string
    ``"None"`` and never makes a numeric conversion fail.

    Args:
        settings: Dictionary with MQTT configuration keys

    Raises:
        ValueError, TypeError: If a numeric setting holds a non-None value
            that ``int()``/``float()`` cannot convert.
    """

    def _text(key: str, default: str = "") -> str:
        # Stripped string value of a setting; None counts as "not set".
        value = settings.get(key)
        return str(default if value is None else value).strip()

    def _number(key: str, default: Any, cast: Any) -> Any:
        # Numeric value of a setting; None counts as "not set".
        value = settings.get(key)
        return cast(default if value is None else value)

    with self._lock:
        old_enabled = self._enabled

        self._enabled = bool(settings.get("mqtt_enabled", False))
        self._broker_host = _text("mqtt_broker_host")
        self._broker_port = _number("mqtt_broker_port", 1883, int)
        self._username = _text("mqtt_username")
        self._password = _text("mqtt_password")
        self._use_tls = bool(settings.get("mqtt_use_tls", False))
        self._tls_insecure = bool(settings.get("mqtt_tls_insecure", False))

        base_topic = _text("mqtt_base_topic", "octoprint/temp_eta")
        use_appearance_name = bool(settings.get("mqtt_use_appearance_name", True))
        appearance_name = str(settings.get("mqtt_appearance_name") or "").strip()
        custom_identifier = _text("mqtt_custom_identifier")

        # Build final topic with optional identifier suffix
        self._base_topic = self._build_final_topic(
            base_topic, use_appearance_name, appearance_name, custom_identifier
        )

        self._qos = _number("mqtt_qos", 0, int)
        self._retain = bool(settings.get("mqtt_retain", False))
        self._publish_interval = _number("mqtt_publish_interval", 1.0, float)

        # Connect when MQTT was just enabled, or is enabled but not currently
        # connected; tear the connection down when MQTT was just disabled.
        # NOTE(review): changing broker settings while already connected does
        # not trigger a reconnect here - confirm that this is intended.
        if self._enabled and (not old_enabled or not self._connected):
            self._schedule_connect()
        elif not self._enabled and old_enabled:
            self._disconnect_internal()

disconnect

disconnect()

Disconnect MQTT client gracefully.

Source code in octoprint_temp_eta/mqtt_client.py
400
401
402
403
def disconnect(self) -> None:
    """Shut the MQTT connection down while holding the wrapper lock."""
    guard = self._lock
    with guard:
        self._disconnect_internal()

publish_eta_update

publish_eta_update(heater, eta, eta_kind, target, actual, cooldown_target=None)

Publish ETA update for a heater.

Parameters:

Name Type Description Default
heater str

Heater name (bed, tool0, chamber)

required
eta Optional[float]

ETA in seconds, or None

required
eta_kind Optional[str]

"heating", "cooling", or None

required
target Optional[float]

Target temperature

required
actual Optional[float]

Actual temperature

required
cooldown_target Optional[float]

Cooldown target temperature (if applicable)

None
Source code in octoprint_temp_eta/mqtt_client.py
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def publish_eta_update(
    self,
    heater: str,
    eta: Optional[float],
    eta_kind: Optional[str],
    target: Optional[float],
    actual: Optional[float],
    cooldown_target: Optional[float] = None,
) -> None:
    """Publish ETA update for a heater.

    Nothing is published while MQTT is disabled or disconnected. Updates are
    throttled to one per ``_publish_interval`` seconds *per heater*, so
    reporting several heaters back-to-back does not starve all but the
    first one. A state-change event is published in addition whenever the
    derived state of the heater differs from the previously published one.

    Args:
        heater: Heater name (bed, tool0, chamber)
        eta: ETA in seconds, or None
        eta_kind: "heating", "cooling", or None
        target: Target temperature
        actual: Actual temperature
        cooldown_target: Cooldown target temperature (if applicable)
    """
    with self._lock:
        if not self._enabled or not self._connected:
            return

        # Per-heater throttle bookkeeping, created on first use.
        published_at = getattr(self, "_last_published_by_heater", None)
        if published_at is None:
            published_at = {}
            self._last_published_by_heater = published_at

        # Check if we should publish based on interval. _last_published_time
        # always holds the most recent publish of any heater, so the min()
        # normally equals the per-heater value; if other code lowers
        # _last_published_time (e.g. resets it to 0.0), that reset takes
        # effect for every heater.
        now = time.time()
        last_publish = min(published_at.get(heater, 0.0), self._last_published_time)
        if (now - last_publish) < self._publish_interval:
            return

        published_at[heater] = now
        self._last_published_time = now

        # Determine state for transition detection
        # NOTE(review): "cooled_down" is only evaluated when a target is
        # present - confirm whether heaters without a reported target
        # should reach that state as well.
        current_state = None
        if eta_kind == "heating" and eta is not None:
            current_state = "heating"
        elif eta_kind == "cooling" and eta is not None:
            current_state = "cooling"
        elif target is not None and actual is not None:
            if abs(target - actual) <= 1.0:
                current_state = "at_target"
            elif cooldown_target is not None:
                if abs(cooldown_target - actual) <= 1.0:
                    current_state = "cooled_down"

        # Detect state transitions
        last_state = self._last_heater_state.get(heater)
        state_changed = last_state != current_state
        self._last_heater_state[heater] = current_state

        # Build payload
        payload = {
            "heater": heater,
            "eta_seconds": eta,
            "eta_kind": eta_kind,
            "target": target,
            "actual": actual,
            "cooldown_target": cooldown_target,
            "timestamp": now,
            "state": current_state,
        }

        # Publish ETA data
        topic = f"{self._base_topic}/{heater}/eta"
        self._publish_message(topic, payload)

        # Publish state transition event if state changed
        if state_changed and current_state is not None:
            event_payload = {
                "heater": heater,
                "state": current_state,
                "previous_state": last_state,
                "timestamp": now,
                "actual": actual,
                "target": target,
            }
            event_topic = f"{self._base_topic}/{heater}/state_change"
            self._publish_message(event_topic, event_payload)

            self._logger.info(
                "MQTT: %s state changed from %s to %s",
                heater,
                last_state or "unknown",
                current_state,
            )

is_connected

is_connected()

Check if MQTT client is connected.

Returns:

Name Type Description
bool bool

True if connected

Source code in octoprint_temp_eta/mqtt_client.py
515
516
517
518
519
520
521
522
def is_connected(self) -> bool:
    """Report the wrapper's connection flag.

    Returns:
        bool: The current value of the connected flag, read under the
        wrapper lock.
    """
    with self._lock:
        connected = self._connected
    return connected

Usage Examples

Using the Calculator

from octoprint_temp_eta.calculator import calculate_linear_eta, calculate_exponential_eta
from collections import deque
import time

# Target temperature the heater is ramping towards. Defined before the loop
# so it is bound even if no samples are generated.
target = 200.0

# Create temperature history: ten samples spaced exactly one second apart.
# A single base timestamp keeps the spacing exact; reading the clock for
# every sample would add call-to-call jitter.
start = time.time()
history = deque()
for i in range(10):
    timestamp = start + i
    temperature = 25 + i * 0.2  # small ramp
    history.append((timestamp, temperature, target))

# Calculate ETA using linear estimator
eta_seconds = calculate_linear_eta(history, target)
print(f"Linear ETA: {eta_seconds}")

# Calculate ETA using exponential estimator (falls back to linear if needed)
eta_exp = calculate_exponential_eta(history, target)
print(f"Exponential ETA: {eta_exp}")

Using the MQTT Client

from octoprint_temp_eta.mqtt_client import MQTTClientWrapper
import logging
import time

# Create logger
logger = logging.getLogger(__name__)

# Instantiate wrapper (note: the wrapper expects a logger and plugin identifier)
mqtt_client = MQTTClientWrapper(logger, "temp_eta")

# Configure client via settings-like dict
mqtt_client.configure({
    "mqtt_enabled": True,
    "mqtt_broker_host": "localhost",
    "mqtt_broker_port": 1883,
    "mqtt_username": "",
    "mqtt_password": "",
    "mqtt_use_tls": False,
    "mqtt_base_topic": "octoprint/temp_eta",
    "mqtt_qos": 0,
    "mqtt_retain": False,
    "mqtt_publish_interval": 1.0,
})

# publish_eta_update() returns without publishing while the client is not
# connected, and configure() only schedules the connection attempt
# (presumably asynchronous - confirm). Wait briefly for the connection.
deadline = time.monotonic() + 10.0
while not mqtt_client.is_connected() and time.monotonic() < deadline:
    time.sleep(0.1)

# Publish a sample ETA update (heater name, eta seconds, eta kind, target, actual)
mqtt_client.publish_eta_update(
    heater="tool0",
    eta=120.0,
    eta_kind="heating",
    target=200.0,
    actual=50.0,
)

# Disconnect when done
mqtt_client.disconnect()

Plugin Integration

import octoprint.plugin


class MyPlugin(octoprint.plugin.StartupPlugin):
    """Example plugin that reads the current ETA from TempETA after startup."""

    def on_after_startup(self):
        # get_plugin_info() returns the registration record of a plugin;
        # its ``implementation`` attribute is the plugin instance, whereas
        # get_plugin() would return the plugin's module.
        info = self._plugin_manager.get_plugin_info("temp_eta")
        temp_eta = info.implementation if info is not None else None

        if temp_eta:
            # Get current ETA
            # NOTE(review): get_current_eta() is assumed to exist on the
            # TempETA plugin instance - confirm against the plugin class.
            eta_data = temp_eta.get_current_eta()
            self._logger.info("Current ETA: %s", eta_data)

Threading Considerations

All public methods are thread-safe when accessed through the plugin instance. However, when using the calculator functions or the MQTT client directly, make sure that access to shared data (such as the temperature history) is properly synchronized:

import threading

from octoprint_temp_eta.calculator import calculate_exponential_eta


class SafeCalculator:
    """Run ETA calculations under a lock.

    The calculator module consists of plain functions, so the lock exists
    to guard the shared ``history`` deque: code that appends samples from
    another thread must hold the same lock for this to be effective.
    """

    def __init__(self):
        self._calculator = calculate_exponential_eta
        self._lock = threading.RLock()

    def calculate(self, history, target):
        """Return the ETA in seconds for ``history``/``target``, or None."""
        with self._lock:
            return self._calculator(history, target)

Error Handling

All methods handle errors gracefully and return None or default values on failure:

from octoprint_temp_eta.calculator import calculate_exponential_eta

# ``history``, ``target`` and ``logger`` are expected to be defined by the
# surrounding code.
try:
    eta = calculate_exponential_eta(history, target)
    if eta is None:
        print("Insufficient data for ETA calculation")
    else:
        print(f"ETA: {eta:.1f}s")
except Exception as e:
    # logger.exception() logs at ERROR level and appends the traceback;
    # %-style arguments keep the formatting lazy.
    logger.exception("Calculation failed: %s", e)

Type Hints

The codebase uses Python type hints for better IDE support:

from typing import Optional, Deque, Tuple

def calculate_eta(
    history: Deque[Tuple[float, float, float]],
    target: float
) -> Optional[float]:
    """
    Estimate the time needed to reach the target temperature.

    This is a signature-only illustration of the type hints; the body
    performs no calculation.

    Args:
        history: Temperature history deque
        target: Target temperature

    Returns:
        ETA in seconds, or None if calculation fails
    """
    return None

Logging

Use the provided logger for all log messages:

# One call per log level, shown in increasing order of severity.
# (Assumes self._logger is a standard logging.Logger - confirm.)
self._logger.debug("Debug message")
self._logger.info("Info message")
self._logger.warning("Warning message")
self._logger.error("Error message")
# exception() is meant to be called from inside an ``except`` block, where
# it can attach the active traceback to the message.
self._logger.exception("Exception with traceback")

Next Steps