Skip to content

Python API

Auto-generated Python API documentation for OctoPrint-TempETA.

Main Plugin Class

Bases: StartupPluginBase, TemplatePluginBase, SettingsPluginBase, AssetPluginBase, EventHandlerPluginBase, SimpleApiPluginBase

Main plugin implementation for Temperature ETA.

Implements OctoPrint Issue #469: Show estimated time remaining for printer heating (bed, hotend, chamber).

Initialize plugin with temperature history tracking.

Source code in octoprint_temp_eta/__init__.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def __init__(self):
    """Create the plugin's in-memory state (histories, caches, persistence bookkeeping).

    Nothing here reads settings; the values assigned below are built-in
    starting values.
    """
    super().__init__()

    # Guards the shared history containers.
    self._lock = threading.Lock()

    # --- debug logging bookkeeping ---------------------------------------
    self._debug_logging_enabled = False
    self._last_debug_log_time = 0.0
    self._last_settings_snapshot_log_time = 0.0
    self._last_heater_support_decision = {}
    self._heater_supported_cache: Dict[str, bool] = {}

    # The MQTT client is created in on_after_startup (a logger is needed).
    self._mqtt_client: Optional[Any] = None

    # --- history sizing ---------------------------------------------------
    # Samples kept per heater. The configurable "history_size" value is
    # cached here so the ~2Hz callback does not have to look it up.
    self._default_history_size = 60
    self._history_maxlen = self._default_history_size

    # --- persistence --------------------------------------------------------
    # History is stored per printer profile (one file per profile id). Only
    # the most recent seconds matter for the ETA, so what is persisted is
    # bounded by maxlen and by a max-age filter on load.
    self._active_profile_id: Optional[str] = None

    # Backoff between writes to reduce SD card wear in long-running phases
    # where the target is never reached.
    # Backoff sequence after phase reset: 30s -> 60s -> 120s -> 240s -> 300s (cap).
    self._persist_backoff_reset_s = 30.0
    self._persist_backoff_initial_s = 60.0
    self._persist_backoff_max_s = 300.0
    self._persist_backoff_current_s = self._persist_backoff_initial_s
    self._next_persist_time = 0.0
    self._persist_phase_active = False
    self._history_dirty = False

    # Limits applied to the persisted payload.
    self._persist_max_json_bytes = 256 * 1024
    self._persist_max_age_seconds = 180.0
    self._last_persist_size_warning_time = 0.0

    # --- per-heater containers ----------------------------------------------
    default_heaters = ("bed", "tool0", "chamber")

    # Heat-up samples.
    self._temp_history = {
        name: deque(maxlen=self._history_maxlen) for name in default_heaters
    }

    # Cooldown samples (target==0) live in their own deques so the heat-up
    # ETA fit is not polluted by cooldown data.
    self._cooldown_history = {
        name: deque(maxlen=self._history_maxlen) for name in default_heaters
    }

    # Ambient baseline per heater for ambient-mode cooldown. Without a
    # user-provided ambient temperature, the lowest temperature observed
    # while the heater target is OFF (within a reasonable range) serves as
    # a proxy for ambient.
    self._cooldown_ambient_baseline: Dict[str, Optional[float]] = {
        name: None for name in default_heaters
    }

    # Last seen target per heater, used to detect transitions such as
    # heating -> off (cooldown start).
    self._last_target_by_heater: Dict[str, float] = {}

    # --- hot-path caches ------------------------------------------------------
    # Settings values cached for the ~2Hz callback; refreshed on startup and
    # on settings save.
    self._threshold_start_c = 5.0
    self._update_interval_s = 1.0
    self._last_runtime_cache_refresh_time = 0.0
    self._last_update_time = 0.0

    # True while ETA updates are being suppressed because a print job is
    # active; keeps the UI focused on the pre-print heat-up phase.
    self._suppressing_due_to_print = False

on_after_startup

on_after_startup()

Called after OctoPrint startup, register for temperature updates.

Source code in octoprint_temp_eta/__init__.py
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
def on_after_startup(self):
    """Finish initialisation once OctoPrint has started.

    Registers this plugin as a printer callback, refreshes the cached
    settings, applies the configured history size, loads the history of the
    active printer profile and, if the MQTT wrapper is available, creates and
    configures the MQTT client.
    """
    self._logger.info("Temperature ETA Plugin started")
    self._printer.register_callback(self)

    self._refresh_debug_logging_flag()
    self._debug_log("Debug logging enabled")
    self._refresh_runtime_caches()

    # Settings are available now, so the configured history size can be applied.
    configured_maxlen = self._read_history_maxlen_setting()
    self._set_history_maxlen(configured_maxlen)

    # Load persisted history for the active printer profile.
    self._switch_active_profile_if_needed(force=True)

    # Without the wrapper there is no MQTT client to set up.
    if MQTTClientWrapper is None:
        self._logger.info("MQTT support disabled: paho-mqtt not available")
        return

    self._mqtt_client = MQTTClientWrapper(self._logger, self._identifier)
    self._configure_mqtt_client()

on_printer_add_temperature

on_printer_add_temperature(data)

Called when new temperature data is available (~2Hz).

Parameters:

Name Type Description Default
data dict

Temperature data from OctoPrint { "bed": {"actual": float, "target": float}, "tool0": {"actual": float, "target": float}, ... }

required
Source code in octoprint_temp_eta/__init__.py
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
def on_printer_add_temperature(self, data):
    """Called when new temperature data is available (~2Hz).

    Records one sample per heater into either the heating history (target > 0
    and the remaining delta is at least the start threshold) or the cooldown
    history (target <= 0 and cooldown ETA enabled). At most once per update
    interval it then triggers the ETA calculation/broadcast and the
    persistence scheduling.

    Returns early without recording anything when the plugin is disabled or
    when suppression during an active print job applies.

    Args:
        data (dict): Temperature data from OctoPrint
            {
                "bed": {"actual": float, "target": float},
                "tool0": {"actual": float, "target": float},
                ...
            }
            Entries whose value is not a dict are ignored.

    Returns:
        None
    """
    if not self._settings.get_boolean(["enabled"]):
        return

    # Ensure we are tracking the right profile's history.
    self._switch_active_profile_if_needed()

    # If enabled, suppress ETA completely while OctoPrint considers a print job active.
    if self._suppress_while_printing_enabled() and self._is_print_job_active():
        # Clear once when suppression starts to avoid stale countdowns.
        if not self._suppressing_due_to_print:
            self._suppressing_due_to_print = True
            self._clear_all_heaters_frontend()
        return

    # If we were suppressing but the condition no longer applies (e.g. warm-up resumed or print ended), re-enable.
    if self._suppressing_due_to_print:
        self._suppressing_due_to_print = False

    # Single timestamp shared by every sample recorded in this callback.
    current_time = time.time()

    # Keep cached settings reasonably fresh even if settings change outside
    # of the normal save flow (e.g. tests or edge-case integrations).
    if (current_time - self._last_runtime_cache_refresh_time) >= 5.0:
        self._last_runtime_cache_refresh_time = current_time
        self._refresh_runtime_caches()

    # Periodic settings snapshot for debugging.
    self._debug_log_settings_snapshot(current_time)

    # Read the cached hot-path values; getattr falls back to the built-in
    # defaults if the attributes are missing on this instance.
    threshold = float(getattr(self, "_threshold_start_c", 5.0))
    update_interval = float(getattr(self, "_update_interval_s", 1.0))
    heating_enabled = self._heating_enabled()
    cooldown_enabled = self._cooldown_enabled()

    # Avoid eager f-string formatting in the hot path.
    # 10 is the numeric value of logging.DEBUG.
    try:
        is_logger_debug = bool(getattr(self._logger, "isEnabledFor")(10))  # type: ignore[attr-defined]
    except Exception:
        is_logger_debug = False
    if is_logger_debug:
        heaters_in_data = [k for k, v in data.items() if isinstance(v, dict)]
        if heaters_in_data:
            self._logger.debug(
                "Received temperature data for heaters: %s", heaters_in_data
            )

    # Remaining delta (in degrees) at or below which a heater counts as
    # holding its target; no samples are recorded in that state.
    epsilon_hold = 0.2

    # Track whether we are in an active heating phase (at least one heater
    # is above the threshold window and not holding). This drives persistence
    # backoff scheduling.
    phase_active_now = False
    target_changed_in_active_phase = False

    # Counters used only for the throttled debug log below.
    recorded_count = 0
    recorded_cooldown_count = 0
    heaters_seen = 0
    with self._lock:
        # Protect shared history state (OctoPrint may call callbacks from worker threads).
        for heater, temps in data.items():
            if not isinstance(temps, dict):
                continue
            heater_key = str(heater)
            # Counted before the "actual" value is validated, so heaters with
            # a missing or non-numeric reading are still included.
            heaters_seen += 1

            # Record samples only while ETA could be shown (active target and above threshold).
            target_raw = temps.get("target", 0)
            actual_raw = temps.get("actual")
            if actual_raw is None:
                continue
            try:
                actual = float(actual_raw)
            except Exception:
                continue

            try:
                # A target of None (or any other falsy value) is treated as 0.
                target = float(target_raw or 0)
            except Exception:
                # Some firmwares/virtual printer formats may provide a non-numeric
                # target (e.g. "off"). Treat that as OFF for cooldown tracking.
                target = 0.0
                self._debug_log_throttled(
                    current_time,
                    30.0,
                    "Non-numeric target treated as OFF heater=%s target_raw=%r",
                    str(heater),
                    target_raw,
                )

            # Remember the previous target before overwriting it so that
            # transitions (heating -> off, target change) can be detected.
            prev_target = self._last_target_by_heater.get(str(heater))
            self._last_target_by_heater[heater_key] = float(target)

            if target <= 0:
                # Cooldown tracking (target==0).
                if cooldown_enabled:
                    if heater_key not in self._cooldown_history:
                        self._cooldown_history[heater_key] = deque(
                            maxlen=self._history_maxlen
                        )

                    # If we just transitioned from heating to OFF, start a fresh
                    # cooldown history so our linear cooldown fit doesn't include
                    # old OFF samples from before the heat-up phase.
                    if prev_target is not None and prev_target > 0:
                        self._cooldown_history[heater_key].clear()
                        self._debug_log_throttled(
                            current_time,
                            10.0,
                            "Cooldown start detected, cleared history heater=%s prev_target=%.1f actual=%.1f",
                            str(heater),
                            float(prev_target),
                            float(actual),
                        )

                    # Cooldown samples are (timestamp, actual) pairs.
                    self._cooldown_history[heater_key].append(
                        (current_time, actual)
                    )
                    recorded_cooldown_count += 1

                    # Track a baseline ambient temp while OFF.
                    # We only learn baseline values in a sane range to
                    # avoid "ambient" being polluted by still-hot cooldown.
                    if math.isfinite(actual) and actual < 120.0:
                        prev = self._cooldown_ambient_baseline.get(heater_key)
                        if prev is None or actual < prev:
                            self._cooldown_ambient_baseline[heater_key] = actual
                # A heater that is OFF never contributes a heating sample.
                continue

            if not heating_enabled:
                continue

            remaining = target - actual
            # Avoid recording while holding near target to prevent constant churn.
            if remaining <= epsilon_hold:
                continue
            # Skip while the remaining delta is smaller than the start threshold.
            if remaining < threshold:
                continue

            # This heater is actively heating within the ETA window.
            phase_active_now = True
            # A change of at least 0.5 between two positive targets counts as
            # a target change during an active phase.
            if (
                prev_target is not None
                and math.isfinite(prev_target)
                and math.isfinite(target)
                and prev_target > 0.0
                and target > 0.0
                and abs(prev_target - target) >= 0.5
            ):
                target_changed_in_active_phase = True

            if heater_key not in self._temp_history:
                self._temp_history[heater_key] = deque(maxlen=self._history_maxlen)

            # Heating samples are (timestamp, actual, target) triples.
            self._temp_history[heater_key].append((current_time, actual, target))
            self._history_dirty = True
            recorded_count += 1

    # Avoid flooding logs while idle/holding temperature: log far less often
    # when we didn't record any samples.
    debug_interval = 5.0 if recorded_count > 0 else 60.0
    self._debug_log_throttled(
        current_time,
        debug_interval,
        "Temp callback profile=%s heaters=%d recorded=%d cooldown_recorded=%d threshold=%.2f",
        str(self._active_profile_id),
        int(heaters_seen),
        recorded_count,
        recorded_cooldown_count,
        float(threshold),
    )

    # Rate-limit the ETA broadcast and the persistence bookkeeping to once
    # per update interval, even though samples are recorded on every call.
    if (current_time - self._last_update_time) >= update_interval:
        self._last_update_time = current_time
        self._calculate_and_broadcast_eta(data)

        # Update persistence phase state based on transitions.
        if target_changed_in_active_phase:
            self._enter_persist_phase(current_time, "target_change")
        elif self._persist_phase_active and (not phase_active_now):
            # Active -> inactive transition: schedule a shorter persist.
            self._reset_persist_backoff(current_time, "phase_end")
        elif (not self._persist_phase_active) and phase_active_now:
            # Inactive -> active transition: start backoff sequence.
            self._enter_persist_phase(current_time, "phase_start")

        self._maybe_persist_history(current_time)

on_printer_send_current_data

on_printer_send_current_data(data)

Stub: Called when current printer data is sent (required by callback interface).

Source code in octoprint_temp_eta/__init__.py
1239
1240
1241
def on_printer_send_current_data(self, data):
    """No-op hook for current printer data.

    Exists only because the callback interface requires it; the argument is
    ignored.
    """
    return None

on_printer_add_log

on_printer_add_log(data)

Stub: Called when log entry is added (required by callback interface).

Source code in octoprint_temp_eta/__init__.py
1243
1244
1245
def on_printer_add_log(self, data):
    """No-op hook for new log entries.

    Exists only because the callback interface requires it; the argument is
    ignored.
    """
    return None

on_event

on_event(event, payload)

Handle OctoPrint events to keep UI state consistent.

Clears all ETAs immediately on disconnect or printer errors so the navbar/tab do not keep showing stale countdowns.

Parameters:

Name Type Description Default
event str

OctoPrint event name

required
payload dict

Event payload

required
Source code in octoprint_temp_eta/__init__.py
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
def on_event(self, event, payload):
    """React to OctoPrint events so the UI never shows stale countdowns.

    On "Disconnected", "Error" or "Shutdown" the current history is persisted,
    the persistence backoff is reset, all heating and cooldown histories are
    emptied and clear messages are sent for the known heaters. On "Shutdown"
    the MQTT client (if any) is disconnected as well.

    On print job lifecycle events the suppression flag is reset; whether to
    suppress is decided again in the temperature callback.

    Args:
        event (str): OctoPrint event name
        payload (dict): Event payload (not used)
    """
    connection_loss_events = ("Disconnected", "Error", "Shutdown")
    job_lifecycle_events = (
        "PrintStarted",
        "PrintResumed",
        "PrintDone",
        "PrintFailed",
        "PrintCancelled",
    )

    if event in connection_loss_events:
        # Save what we have before it is wiped.
        self._persist_current_profile_history()
        self._reset_persist_backoff(time.time(), "disconnect_or_error")

        with self._lock:
            # Heater names are taken from the heating history only.
            heaters = list(self._temp_history)
            for samples in self._temp_history.values():
                samples.clear()
            for samples in self._cooldown_history.values():
                samples.clear()

        self._send_clear_messages(heaters)

        # The MQTT connection is only torn down when OctoPrint shuts down.
        if event == "Shutdown" and self._mqtt_client is not None:
            self._mqtt_client.disconnect()

    if event in job_lifecycle_events:
        self._suppressing_due_to_print = False

get_settings_defaults

get_settings_defaults()

Return the default settings for the plugin.

Returns:

Name Type Description
dict

Dictionary containing default plugin settings.

Source code in octoprint_temp_eta/__init__.py
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
def get_settings_defaults(self):
    """Build the plugin's default settings.

    Returns:
        dict: Mapping of setting name to its default value.
    """
    defaults = {
        # General behaviour and display
        "enabled": True,
        "enable_heating_eta": True,
        "suppress_while_printing": False,
        "show_in_sidebar": True,
        "show_in_navbar": True,
        "show_in_tab": True,
        "show_progress_bars": True,
        "show_historical_graph": True,
        "historical_graph_window_seconds": 180,
        "temp_display": "octoprint",
        "threshold_unit": "octoprint",
        "debug_logging": False,
        "threshold_start": 5.0,
        "algorithm": "linear",
        "update_interval": 1.0,
        "history_size": 60,
        # Persistence (advanced; protects SD cards on long-running phases)
        "persist_backoff_reset_s": 30.0,
        "persist_backoff_initial_s": 60.0,
        "persist_backoff_max_s": 300.0,
        "persist_max_json_bytes": 256 * 1024,
        # Cool Down ETA
        "enable_cooldown_eta": True,
        "cooldown_mode": "threshold",
        "cooldown_target_tool0": 50.0,
        "cooldown_target_bed": 40.0,
        "cooldown_target_chamber": 30.0,
        "cooldown_ambient_temp": None,
        "cooldown_hysteresis_c": 1.0,
        "cooldown_fit_window_seconds": 120,
        # Extended settings: status colors
        "color_mode": "bands",
        "color_heating": "#5cb85c",
        "color_cooling": "#337ab7",
        "color_idle": "#777777",
        # Extended settings: sound alerts
        "sound_enabled": False,
        "sound_target_reached": False,
        "sound_cooldown_finished": False,
        "sound_volume": 0.5,
        "sound_min_interval_s": 10.0,
        # Extended settings: browser notifications (toast)
        "notification_enabled": False,
        "notification_target_reached": False,
        "notification_cooldown_finished": False,
        "notification_timeout_s": 6.0,
        "notification_min_interval_s": 10.0,
        # MQTT settings
        "mqtt_enabled": False,
        "mqtt_broker_host": "",
        "mqtt_broker_port": 1883,
        "mqtt_username": "",
        "mqtt_password": "",
        "mqtt_use_tls": False,
        "mqtt_tls_insecure": False,
        "mqtt_base_topic": "octoprint/temp_eta",
        "mqtt_qos": 0,
        "mqtt_retain": False,
        "mqtt_publish_interval": 1.0,
    }
    return defaults

is_template_autoescaped

is_template_autoescaped()

Enable autoescaping for all plugin templates.

Opt-in to OctoPrint's template autoescaping (OctoPrint 1.11+) to reduce XSS risk from unescaped injected variables.

Returns:

Name Type Description
bool bool

True to enable autoescaping.

Source code in octoprint_temp_eta/__init__.py
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
def is_template_autoescaped(self) -> bool:  # pyright: ignore
    """Opt in to template autoescaping for every plugin template.

    Uses OctoPrint's template autoescaping (OctoPrint 1.11+) to lower the
    XSS risk posed by unescaped injected variables.

    Returns:
        bool: Always True, i.e. autoescaping is enabled.
    """
    autoescape = True
    return autoescape

get_template_configs

get_template_configs()

Configure which templates to use and how to bind them.

Returns:

Name Type Description
list

List of template configuration dictionaries.

Source code in octoprint_temp_eta/__init__.py
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
def get_template_configs(self):
    """Describe the plugin's templates and how each one is bound.

    Returns:
        list: One configuration dictionary per template, in the order
        navbar, sidebar, settings, tab.
    """
    navbar = {"type": "navbar", "custom_bindings": True}
    sidebar = {
        "type": "sidebar",
        "custom_bindings": False,
        "name": gettext("Temperature ETA"),
        "icon": "fa fa-clock",
    }
    # NOTE(review): an earlier comment here said the settings template uses
    # OctoPrint's default settingsViewModel binding, yet custom_bindings is
    # True for it - confirm which of the two is intended.
    settings = {"type": "settings", "custom_bindings": True}
    tab = {"type": "tab", "custom_bindings": False}
    return [navbar, sidebar, settings, tab]

on_settings_save

on_settings_save(data)

Persist settings and clear UI/state when disabling the plugin.

OctoPrint applies settings changes only on save. When the plugin is disabled, we actively clear any previously shown countdowns in navbar/tab so the UI does not keep showing stale values.

Parameters:

Name Type Description Default
data dict

Settings data posted from the UI.

required
Source code in octoprint_temp_eta/__init__.py
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
def on_settings_save(self, data: Dict[str, Any]) -> Dict[str, Any]:
    """Save settings, then bring cached state and the UI in line with them.

    OctoPrint applies settings changes only on save. After saving, the debug
    flag and runtime caches are refreshed, a changed history size is applied,
    the MQTT client is reconfigured and - if the plugin has just been
    disabled - any countdowns still shown in the frontend are cleared so the
    UI does not keep showing stale values.

    Args:
        data (dict): Settings data posted from the UI.

    Returns:
        dict: The value returned by the base SettingsPlugin implementation
        when it is a dict, otherwise an empty dict. An empty dict is also
        returned when no settings object is available.
    """
    if not getattr(self, "_settings", None):
        return {}

    self._sanitize_settings_payload(data)

    # Capture the state before saving so changes can be detected afterwards.
    enabled_before = bool(self._settings.get_boolean(["enabled"]))
    debug_before = bool(getattr(self, "_debug_logging_enabled", False))
    maxlen_before = self._read_history_maxlen_setting()

    result = octoprint.plugin.SettingsPlugin.on_settings_save(self, data)
    enabled_after = bool(self._settings.get_boolean(["enabled"]))

    self._refresh_debug_logging_flag()
    self._refresh_runtime_caches()
    if debug_before != bool(self._debug_logging_enabled):
        state_label = "enabled" if self._debug_logging_enabled else "disabled"
        self._logger.info("Debug logging %s", state_label)

    maxlen_after = self._read_history_maxlen_setting()
    if maxlen_after != maxlen_before:
        self._set_history_maxlen(maxlen_after)

    # Plugin was switched off by this save: wipe what the frontend shows.
    if enabled_before and not enabled_after:
        self._clear_all_heaters_frontend()

    # Pick up any changed MQTT settings.
    self._configure_mqtt_client()

    if isinstance(result, dict):
        return result
    return {}

get_assets

get_assets()

Return static assets (JS, CSS, LESS) to be included.

Returns:

Name Type Description
dict

Dictionary with asset types and their file paths.

Source code in octoprint_temp_eta/__init__.py
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
def get_assets(self):
    """List the static assets the plugin ships.

    Returns:
        dict: Asset type ("js", "less") mapped to a list of file paths.
    """
    return {
        "js": ["js/temp_eta.js"],
        "less": ["less/temp_eta.less"],
    }

is_api_protected

is_api_protected()

Whether the Simple API requires an authenticated user.

OctoPrint's default for this will switch from False to True in the future. We explicitly opt in to avoid relying on defaults.

Source code in octoprint_temp_eta/__init__.py
2032
2033
2034
2035
2036
2037
2038
def is_api_protected(self) -> bool:  # type: ignore[override]
    """Report that the Simple API requires an authenticated user.

    OctoPrint's default for this will switch from False to True in the
    future; returning True explicitly avoids depending on that default.

    Returns:
        bool: Always True.
    """
    requires_authentication = True
    return requires_authentication

is_api_adminonly

is_api_adminonly()

Whether the Simple API is restricted to admin users.

Source code in octoprint_temp_eta/__init__.py
2040
2041
2042
def is_api_adminonly(self) -> bool:  # type: ignore[override]
    """Report that the Simple API is restricted to admin users.

    Returns:
        bool: Always True.
    """
    admin_only = True
    return admin_only

get_api_commands

get_api_commands()

Return supported Simple API commands for the plugin.

Source code in octoprint_temp_eta/__init__.py
2044
2045
2046
2047
2048
2049
def get_api_commands(self) -> Dict[str, list]:  # type: ignore[override]
    """List the Simple API commands this plugin accepts.

    Returns:
        dict: Command name mapped to its list of parameter names; both
        commands take none, so each list is empty.
    """
    command_names = ("reset_profile_history", "reset_settings_defaults")
    # A comprehension gives every command its own fresh list object.
    return {command_name: [] for command_name in command_names}

on_api_command

on_api_command(command, data)

Handle Simple API commands.

Source code in octoprint_temp_eta/__init__.py
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
def on_api_command(self, command: str, data: Dict[str, Any]):  # type: ignore[override]
    """Handle Simple API commands.

    Supported commands:

    * ``reset_profile_history`` - deletes the persisted history of all
      profiles and reports how many files were removed.
    * ``reset_settings_defaults`` - restores the user-editable settings to
      their defaults and notifies connected clients with a
      ``settings_reset`` plugin message.

    Args:
        command (str): Name of the command to execute.
        data (dict): Command payload (not used by the current commands).

    Returns:
        The JSON response produced by ``jsonify``. Any other command name
        yields ``{"success": False, "error": "unknown_command"}``.
    """
    logger = getattr(self, "_logger", None)

    if command == "reset_profile_history":
        deleted_count = self._reset_all_profile_histories()
        profile_id = self._get_current_profile_id()
        if logger is not None:
            logger.info(
                "Reset persisted history for all profiles (trigger_profile=%s deleted_files=%d)",
                str(profile_id),
                int(deleted_count),
            )
        return jsonify(
            {
                "success": True,
                "profile_id": profile_id,
                "deleted_files": deleted_count,
            }
        )

    if command == "reset_settings_defaults":
        self._reset_user_settings_to_defaults()
        if logger is not None:
            logger.info("Restored plugin settings defaults (user-editable keys)")

        # Notify all connected clients so the settings UI can refresh via requestData().
        if getattr(self, "_plugin_manager", None):
            try:
                self._plugin_manager.send_plugin_message(
                    self._identifier, {"type": "settings_reset"}
                )
            except Exception:
                # The notification is best-effort: the reset itself already
                # succeeded, so the failure must not fail the request. It is
                # logged instead of being dropped silently.
                if logger is not None:
                    logger.warning(
                        "Failed to notify clients about settings reset",
                        exc_info=True,
                    )

        return jsonify({"success": True, "message": gettext("Defaults restored.")})

    return jsonify({"success": False, "error": "unknown_command"})

get_update_information

get_update_information()

Provide update information for the Software Update plugin.

Returns:

Name Type Description
dict

Update configuration for the plugin.

Source code in octoprint_temp_eta/__init__.py
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
def get_update_information(self):
    """Describe how the Software Update plugin should update this plugin.

    Returns:
        dict: Update configuration keyed by "temp_eta".
    """
    installed_version = self._plugin_version
    update_config = {
        "displayName": "Temperature ETA Plugin",
        "displayVersion": installed_version,
        # version check: github repository
        "type": "github_release",
        "user": "Ajimaru",
        "repo": "OctoPrint-TempETA",
        "current": installed_version,
        # update method: pip
        "pip": "https://github.com/Ajimaru/OctoPrint-TempETA/archive/{target_version}.zip",
    }
    return {"temp_eta": update_config}

Calculator Module

Temperature ETA calculation algorithms.

This module provides pure calculation logic for estimating time remaining until a printer heater reaches its target temperature or cools down.

All functions are stateless and independent of OctoPrint plugin mechanics, making them easy to test and maintain.

calculate_linear_eta

calculate_linear_eta(history, target, window_seconds=10.0)

Calculate ETA assuming constant heating rate.

Estimates the rate of temperature change from the first and last valid samples inside the recent time window (a two-point slope, not a least-squares regression) and uses that rate to predict the time to target.

Parameters:

Name Type Description Default
history deque

Deque of (timestamp, actual_temp, target_temp) tuples

required
target float

Target temperature in degrees

required
window_seconds float

Time window for rate calculation (default: 10s)

10.0

Returns:

Type Description
Optional[float]

Estimated seconds to target, or None if insufficient data

Source code in octoprint_temp_eta/calculator.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def calculate_linear_eta(
    history: deque, target: float, window_seconds: float = 10.0
) -> Optional[float]:
    """Estimate heating ETA from a constant-rate extrapolation.

    The heating rate is the slope between the first and the last valid
    sample (in history order) that lies inside the recent window. The
    window is anchored to the newest valid timestamp in the history, not
    to the wall clock.

    Args:
        history: Deque of (timestamp, actual_temp, target_temp) tuples
        target: Target temperature in degrees
        window_seconds: Time window for rate calculation (default: 10s)

    Returns:
        Estimated seconds to target, or None if the inputs are invalid,
        there is too little data, the temperature is not rising, or the
        target has already been reached
    """
    # Reject unusable arguments before looking at any samples.
    if not math.isfinite(target):
        return None
    if not math.isfinite(window_seconds) or window_seconds <= 0:
        return None
    if not history or len(history) < 2:
        return None

    # Keep only samples whose timestamp and temperature are finite numbers.
    usable = [
        (ts, actual)
        for ts, actual, _target in history
        if math.isfinite(ts) and math.isfinite(actual)
    ]
    if not usable:
        return None

    newest_ts = max(ts for ts, _actual in usable)
    cutoff = newest_ts - window_seconds
    in_window = [sample for sample in usable if sample[0] > cutoff]
    if not in_window:
        return None

    first_ts, first_temp = in_window[0]
    last_ts, last_temp = in_window[-1]

    elapsed = last_ts - first_ts
    gained = last_temp - first_temp
    if elapsed <= 0 or gained <= 0:
        # A single usable sample, or the heater is not warming up.
        return None

    # Heating rate in degrees per second.
    heating_rate = gained / elapsed
    still_needed = target - last_temp
    if still_needed <= 0:
        return None

    return max(0.0, still_needed / heating_rate)

calculate_exponential_eta

calculate_exponential_eta(history, target, window_seconds=30.0)

Calculate ETA accounting for thermal asymptotic behavior.

Uses the exponential model T(t) = T_final - (T_final - T_0) * e^(-t/tau). Falls back to linear estimation when there is insufficient data or the fit is poor.

Parameters:

Name Type Description Default
history deque

Deque of (timestamp, actual_temp, target_temp) tuples

required
target float

Target temperature in degrees

required
window_seconds float

Time window for exponential fit (default: 30s)

30.0

Returns:

Type Description
Optional[float]

Estimated seconds to target, or None if insufficient data

Source code in octoprint_temp_eta/calculator.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def calculate_exponential_eta(
    history: deque, target: float, window_seconds: float = 30.0
) -> Optional[float]:
    """Estimate heating ETA with an asymptotic (exponential) approach model.

    The gap to the target is modelled as decaying exponentially:
    T(t) = T_final - (T_final - T_0) * e^(-t/tau). The time constant tau
    comes from a least-squares line through ln(target - T) over the recent
    window. When the window holds too little data or the fit is not
    plausible, the result of calculate_linear_eta is returned instead.

    Args:
        history: Deque of (timestamp, actual_temp, target_temp) tuples
        target: Target temperature in degrees
        window_seconds: Time window for exponential fit (default: 30s)

    Returns:
        Estimated seconds until the temperature is within 0.5 degrees of
        the target, or None if no estimate can be made
    """

    def linear_fallback():
        # Every "cannot fit" branch below defers to the linear estimator.
        return calculate_linear_eta(history, target)

    # Reject unusable arguments before looking at any samples.
    if not math.isfinite(target):
        return None
    if not math.isfinite(window_seconds) or window_seconds <= 0:
        return None
    if not history or len(history) < 3:
        return None

    # Keep only samples whose timestamp and temperature are finite numbers.
    usable = [
        (ts, temp)
        for ts, temp, _tgt in history
        if math.isfinite(ts) and math.isfinite(temp)
    ]
    if not usable:
        return None

    # The window is anchored to the newest valid sample.
    newest_ts = max(ts for ts, _temp in usable)
    cutoff = newest_ts - window_seconds

    # Sort the window by time and keep only the first sample per timestamp,
    # which avoids zero-span/unstable fits.
    ordered = sorted(
        (entry for entry in usable if entry[0] > cutoff),
        key=lambda entry: entry[0],
    )
    samples = []
    for sample in ordered:
        if samples and sample[0] == samples[-1][0]:
            continue
        samples.append(sample)

    if len(samples) < 6:
        return linear_fallback()

    first_ts, first_temp = samples[0]
    _newest, current_temp = samples[-1]
    gap_now = target - current_temp
    if gap_now <= 0:
        return None

    # An asymptotic approach never reaches the target exactly, so "arrived"
    # means being inside this band below the target.
    band = 0.5
    if gap_now <= band:
        return 0.0

    # Regression input: x = seconds since the window start, y = ln(gap).
    # Samples inside the band are skipped because noise dominates there.
    xs = []
    ys = []
    for ts, temp in samples:
        gap = target - temp
        offset = ts - first_ts
        if gap <= band or offset < 0:
            continue
        xs.append(offset)
        ys.append(math.log(gap))

    if len(xs) < 6:
        return linear_fallback()
    if xs[-1] - xs[0] < 5:
        return linear_fallback()

    # Only estimate while the heater is actually warming up in this window.
    if (current_temp - first_temp) <= 0.2:
        return None

    # Least-squares slope of y = a + b*x; b must be negative for a decay.
    x_mean = sum(xs) / len(xs)
    y_mean = sum(ys) / len(ys)
    sxx = 0.0
    sxy = 0.0
    for x_val, y_val in zip(xs, ys):
        x_dev = x_val - x_mean
        y_dev = y_val - y_mean
        sxx += x_dev * x_dev
        sxy += x_dev * y_dev

    if sxx <= 0:
        return linear_fallback()

    slope = sxy / sxx
    if slope >= -1e-4:
        # Gap is not shrinking fast enough (or the fit is unstable).
        return linear_fallback()

    tau = -1.0 / slope
    if tau <= 0 or tau > 2000:
        return linear_fallback()

    # Time for the gap to decay from its current size down to the band.
    try:
        eta = tau * math.log(gap_now / band)
    except ValueError as e:
        # Log math errors so that the fallback can be traced afterwards.
        import logging

        logging.getLogger("octoprint_temp_eta").debug(
            "Exponential ETA math error: %s", e
        )
        return linear_fallback()

    if eta < 0:
        eta = 0.0

    # Spike protection: an exponential estimate more than five times the
    # linear one on the same data is distrusted in favour of the linear one.
    linear_eta = linear_fallback()
    if linear_eta is not None and eta > (linear_eta * 5):
        return linear_eta

    return eta

calculate_cooldown_linear_eta

calculate_cooldown_linear_eta(cooldown_history, goal_c, window_seconds=60.0)

Linear cooldown ETA from recent slope.

Estimates the rate of temperature decrease from the earliest and latest valid samples in the recent window (a two-point slope) and extrapolates the time to goal.

Parameters:

Name Type Description Default
cooldown_history deque

Deque of (timestamp, temp) tuples

required
goal_c float

Target cooldown temperature in degrees

required
window_seconds float

Time window for fit (default: 60s)

60.0

Returns:

Type Description
Optional[float]

Estimated seconds to goal, or None if insufficient data

Source code in octoprint_temp_eta/calculator.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
def calculate_cooldown_linear_eta(
    cooldown_history: deque, goal_c: float, window_seconds: float = 60.0
) -> Optional[float]:
    """Estimate cooldown ETA from a constant-rate extrapolation.

    The cooling rate is the slope between the earliest and the latest valid
    sample inside the recent window. The window is anchored to the newest
    valid timestamp in the history.

    Args:
        cooldown_history: Deque of (timestamp, temp) tuples
        goal_c: Target cooldown temperature in degrees
        window_seconds: Time window for fit (default: 60s)

    Returns:
        Estimated seconds to goal (capped at 24 hours), or None if the
        inputs are invalid, there is too little data, the temperature is
        not falling fast enough, or the goal has already been reached
    """
    # Reject unusable arguments before looking at any samples.
    if not math.isfinite(goal_c):
        return None
    if not math.isfinite(window_seconds) or window_seconds <= 0:
        return None
    if not cooldown_history:
        return None

    # Keep only samples whose timestamp and temperature are finite numbers.
    usable = [
        (ts, temp)
        for ts, temp in cooldown_history
        if math.isfinite(ts) and math.isfinite(temp)
    ]
    if not usable:
        return None

    newest_ts = max(ts for ts, _temp in usable)
    cutoff = newest_ts - window_seconds
    window = sorted(
        (sample for sample in usable if sample[0] > cutoff),
        key=lambda sample: sample[0],
    )
    if len(window) < 2:
        return None

    (start_ts, start_temp), (end_ts, end_temp) = window[0], window[-1]
    elapsed = end_ts - start_ts
    if elapsed <= 0:
        return None

    # Degrees per second; negative while cooling.
    slope = (end_temp - start_temp) / elapsed
    if slope >= -1e-3:
        # Not cooling fast enough
        return None

    above_goal = end_temp - goal_c
    if above_goal <= 0:
        return None

    eta = above_goal / (-slope)
    if not math.isfinite(eta) or eta < 0:
        return None

    # Never report more than 24 hours.
    day_seconds = 24 * 3600
    return float(min(eta, day_seconds))

calculate_cooldown_exponential_eta

calculate_cooldown_exponential_eta(cooldown_history, ambient_c, goal_c, window_seconds=60.0)

Exponential cooldown ETA (Newton's law of cooling).

Models cooldown as: T(t) = T_ambient + (T_0 - T_ambient) * e^(-t/tau)

Source code in octoprint_temp_eta/calculator.py
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
def calculate_cooldown_exponential_eta(
    cooldown_history: deque,
    ambient_c: float,
    goal_c: float,
    window_seconds: float = 60.0,
) -> Optional[float]:
    """Exponential cooldown ETA (Newton's law of cooling).

    Models cooldown as: T(t) = T_ambient + (T_0 - T_ambient) * e^(-t/tau)

    The time constant tau comes from a least-squares line through
    ln(T - T_ambient) over the samples in the recent window. The window is
    anchored to the newest valid timestamp and is sorted by time, so the
    order of the samples in the history does not affect the result.

    Args:
        cooldown_history: Deque of (timestamp, temp) tuples
        ambient_c: Ambient temperature the heater decays towards
        goal_c: Target cooldown temperature; must be above ambient_c
        window_seconds: Time window for the fit (default: 60s)

    Returns:
        Estimated seconds to goal (capped at 24 hours), or None if the
        inputs are invalid, the goal has already been reached, or no
        plausible decay can be fitted. With fewer than six samples in the
        window the result of calculate_cooldown_linear_eta is returned.
    """
    if not (math.isfinite(ambient_c) and math.isfinite(goal_c)):
        return None
    if not math.isfinite(window_seconds) or window_seconds <= 0:
        return None
    if goal_c <= ambient_c:
        # The model only approaches ambient asymptotically; a goal at or
        # below ambient would never be reached.
        return None
    if not cooldown_history or len(cooldown_history) < 4:
        return None

    last_ts = max(
        (
            ts
            for ts, temp in cooldown_history
            if math.isfinite(ts) and math.isfinite(temp)
        ),
        default=None,
    )
    if last_ts is None:
        return None
    cutoff = last_ts - window_seconds

    # Sort by timestamp so that recent[0] / recent[-1] really are the oldest
    # and newest samples even when the history is not in time order.
    recent = sorted(
        (
            (ts, temp)
            for ts, temp in cooldown_history
            if math.isfinite(ts) and math.isfinite(temp) and ts > cutoff
        ),
        key=lambda sample: sample[0],
    )
    if len(recent) < 6:
        return calculate_cooldown_linear_eta(cooldown_history, goal_c, window_seconds)

    _t_now, temp_now = recent[-1]
    if temp_now <= goal_c:
        return None

    # Regression input: x = seconds since the window start,
    # y = ln(T - ambient). Samples within epsilon of ambient are skipped.
    epsilon = 0.5
    t0 = recent[0][0]
    xs = []
    ys = []
    for ts, temp in recent:
        delta = temp - ambient_c
        if delta <= epsilon:
            continue
        x = ts - t0
        if x < 0:
            continue
        xs.append(x)
        ys.append(math.log(delta))

    if len(xs) < 4:
        return None

    # Least-squares slope of y = a + b*x; b must be negative for a decay.
    x_mean = sum(xs) / float(len(xs))
    y_mean = sum(ys) / float(len(ys))
    sxx = 0.0
    sxy = 0.0
    for x, y in zip(xs, ys):
        dx = x - x_mean
        dy = y - y_mean
        sxx += dx * dx
        sxy += dx * dy

    if sxx <= 0:
        return None

    slope = sxy / sxx
    if slope >= -1e-4:
        return None

    tau = -1.0 / slope
    if tau <= 0 or tau > 20000:
        return None

    numerator = temp_now - ambient_c
    denominator = goal_c - ambient_c
    if numerator <= 0 or denominator <= 0:
        return None

    # Solve goal = ambient + (T_now - ambient) * e^(-t/tau) for t.
    try:
        eta = tau * math.log(numerator / denominator)
    except (ValueError, ZeroDivisionError) as e:
        import logging

        logging.getLogger("octoprint_temp_eta").debug(
            "Exponential ETA math error: %s", e
        )
        return None

    if not math.isfinite(eta) or eta < 0:
        return None

    # Cap at 24 hours
    return float(min(eta, 24 * 3600))

MQTT Client Module

Thread-safe MQTT client wrapper for the Temperature ETA plugin.

Manages connection lifecycle, automatic reconnection, and message publishing. All MQTT operations are non-blocking to avoid impacting the temperature callback.

Initialize MQTT client wrapper.

Parameters:

Name Type Description Default
logger Any

Logger instance for debug/info messages

required
identifier str

Plugin identifier for topic prefixes

required
Source code in octoprint_temp_eta/mqtt_client.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def __init__(self, logger: Any, identifier: str):
    """Initialize MQTT client wrapper.

    Args:
        logger: Logger instance for debug/info messages
        identifier: Plugin identifier for topic prefixes
    """
    self._logger = logger
    self._identifier = identifier
    self._lock = threading.Lock()

    self._client: Optional[Any] = None
    self._enabled = False
    self._connected = False
    self._connecting = False

    # Connection settings
    self._broker_host = ""
    self._broker_port = 1883
    self._username = ""
    self._password = ""
    self._use_tls = False
    self._tls_insecure = False

    # Publishing settings
    self._base_topic = "octoprint/temp_eta"
    self._qos = 0
    self._retain = False
    self._publish_interval = 1.0

    # State tracking for state transition events
    self._last_published_time = 0.0
    self._last_heater_state: Dict[str, Optional[str]] = {}

    # Connection retry logic
    self._last_connect_attempt = 0.0
    self._connect_retry_interval = 30.0

    # Avoid log spam when paho-mqtt is not installed.
    # Use a dedicated lock to avoid re-entrantly acquiring self._lock.
    self._mqtt_unavailable_lock = threading.Lock()
    self._mqtt_unavailable_warned = False

configure

configure(settings)

Update MQTT configuration from plugin settings.

Parameters:

Name Type Description Default
settings Dict[str, Any]

Dictionary with MQTT configuration keys

required
Source code in octoprint_temp_eta/mqtt_client.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def configure(self, settings: Dict[str, Any]) -> None:
    """Update MQTT configuration from plugin settings.

    A key that is missing or whose value is None falls back to its default.
    Without this, a None value would be stored as the literal text "None"
    for string settings and would raise for numeric ones.

    Args:
        settings: Dictionary with MQTT configuration keys

    Raises:
        ValueError: If a numeric setting (port, QoS, publish interval)
            holds a value that cannot be converted to a number.
    """

    def setting(key: str, default: Any) -> Any:
        # Treat an explicit None exactly like a missing key.
        value = settings.get(key, default)
        return default if value is None else value

    with self._lock:
        old_enabled = self._enabled

        self._enabled = bool(setting("mqtt_enabled", False))
        self._broker_host = str(setting("mqtt_broker_host", "")).strip()
        self._broker_port = int(setting("mqtt_broker_port", 1883))
        self._username = str(setting("mqtt_username", "")).strip()
        self._password = str(setting("mqtt_password", "")).strip()
        self._use_tls = bool(setting("mqtt_use_tls", False))
        self._tls_insecure = bool(setting("mqtt_tls_insecure", False))

        self._base_topic = str(
            setting("mqtt_base_topic", "octoprint/temp_eta")
        ).strip()
        self._qos = int(setting("mqtt_qos", 0))
        self._retain = bool(setting("mqtt_retain", False))
        self._publish_interval = float(setting("mqtt_publish_interval", 1.0))

        # Connect when MQTT is enabled and was either just switched on or is
        # not connected yet; tear down when it was just switched off.
        # NOTE(review): changed broker settings do not trigger a reconnect
        # while an existing connection is up.
        if self._enabled and (not old_enabled or not self._connected):
            self._schedule_connect()
        elif not self._enabled and old_enabled:
            self._disconnect_internal()

disconnect

disconnect()

Disconnect MQTT client gracefully.

Source code in octoprint_temp_eta/mqtt_client.py
268
269
270
271
def disconnect(self) -> None:
    """Tear down the MQTT connection while holding the client lock."""
    guard = self._lock
    with guard:
        self._disconnect_internal()

publish_eta_update

publish_eta_update(heater, eta, eta_kind, target, actual, cooldown_target=None)

Publish ETA update for a heater.

Parameters:

Name Type Description Default
heater str

Heater name (bed, tool0, chamber)

required
eta Optional[float]

ETA in seconds, or None

required
eta_kind Optional[str]

"heating", "cooling", or None

required
target Optional[float]

Target temperature

required
actual Optional[float]

Actual temperature

required
cooldown_target Optional[float]

Cooldown target temperature (if applicable)

None
Source code in octoprint_temp_eta/mqtt_client.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
def publish_eta_update(
    self,
    heater: str,
    eta: Optional[float],
    eta_kind: Optional[str],
    target: Optional[float],
    actual: Optional[float],
    cooldown_target: Optional[float] = None,
) -> None:
    """Publish ETA update for a heater.

    Does nothing while MQTT is disabled or disconnected, or when the last
    publish happened less than the configured publish interval ago.
    Otherwise it publishes the ETA payload to ``<base_topic>/<heater>/eta``.
    When the heater's state differs from the previously recorded one and is
    not None, it also publishes to ``<base_topic>/<heater>/state_change``
    and logs the transition.

    Args:
        heater: Heater name (bed, tool0, chamber)
        eta: ETA in seconds, or None
        eta_kind: "heating", "cooling", or None
        target: Target temperature
        actual: Actual temperature
        cooldown_target: Cooldown target temperature (if applicable)
    """
    with self._lock:
        if not self._enabled or not self._connected:
            return

        # Check if we should publish based on interval
        now = time.time()
        if (now - self._last_published_time) < self._publish_interval:
            return

        self._last_published_time = now

        # Determine state for transition detection. An active ETA wins;
        # otherwise the state is derived from how close the actual
        # temperature is (within 1 degree) to the heating target or, failing
        # that, to the cooldown target. The cooldown check does not require
        # a heating target, so a heater with no target set can still be
        # reported as cooled down.
        current_state = None
        if eta_kind == "heating" and eta is not None:
            current_state = "heating"
        elif eta_kind == "cooling" and eta is not None:
            current_state = "cooling"
        elif actual is not None:
            if target is not None and abs(target - actual) <= 1.0:
                current_state = "at_target"
            elif (
                cooldown_target is not None
                and abs(cooldown_target - actual) <= 1.0
            ):
                current_state = "cooled_down"

        # Detect state transitions
        last_state = self._last_heater_state.get(heater)
        state_changed = last_state != current_state
        self._last_heater_state[heater] = current_state

        # Build payload
        payload = {
            "heater": heater,
            "eta_seconds": eta,
            "eta_kind": eta_kind,
            "target": target,
            "actual": actual,
            "cooldown_target": cooldown_target,
            "timestamp": now,
            "state": current_state,
        }

        # Publish ETA data
        topic = f"{self._base_topic}/{heater}/eta"
        self._publish_message(topic, payload)

        # Publish state transition event if state changed
        if state_changed and current_state is not None:
            event_payload = {
                "heater": heater,
                "state": current_state,
                "previous_state": last_state,
                "timestamp": now,
                "actual": actual,
                "target": target,
            }
            event_topic = f"{self._base_topic}/{heater}/state_change"
            self._publish_message(event_topic, event_payload)

            self._logger.info(
                "MQTT: %s state changed from %s to %s",
                heater,
                last_state or "unknown",
                current_state,
            )

is_connected

is_connected()

Check if MQTT client is connected.

Returns:

Name Type Description
bool bool

True if connected

Source code in octoprint_temp_eta/mqtt_client.py
384
385
386
387
388
389
390
391
def is_connected(self) -> bool:
    """Report the connection flag, read while holding the client lock.

    Returns:
        bool: True if connected
    """
    with self._lock:
        connected = self._connected
    return connected

Usage Examples

Using the Calculator

import time
from collections import deque

from octoprint_temp_eta.calculator import (
    calculate_exponential_eta,
    calculate_linear_eta,
)

# Build a short synthetic history of (timestamp, actual, target) samples:
# ten samples about one second apart, rising by 0.2 degrees each.
target = 200.0
history = deque()
for step in range(10):
    history.append((time.time() + step, 25 + step * 0.2, target))

# ETA from the linear estimator
eta_seconds = calculate_linear_eta(history, target)
print(f"Linear ETA: {eta_seconds}")

# ETA from the exponential estimator (falls back to linear if needed)
eta_exp = calculate_exponential_eta(history, target)
print(f"Exponential ETA: {eta_exp}")

Using the MQTT Client

import logging

from octoprint_temp_eta.mqtt_client import MQTTClientWrapper

# The wrapper needs a logger and the plugin identifier.
logger = logging.getLogger(__name__)
mqtt_client = MQTTClientWrapper(logger, "temp_eta")

# Settings use the same keys as the plugin's own settings.
mqtt_settings = dict(
    mqtt_enabled=True,
    mqtt_broker_host="localhost",
    mqtt_broker_port=1883,
    mqtt_username="",
    mqtt_password="",
    mqtt_use_tls=False,
    mqtt_base_topic="octoprint/temp_eta",
    mqtt_qos=0,
    mqtt_retain=False,
    mqtt_publish_interval=1.0,
)
mqtt_client.configure(mqtt_settings)

# Publish one sample ETA update: tool0 is heating from 50 to 200 degrees
# with an estimated 120 seconds remaining.
mqtt_client.publish_eta_update(
    heater="tool0",
    eta=120.0,
    eta_kind="heating",
    target=200.0,
    actual=50.0,
)

# Close the connection once no more updates will be sent.
mqtt_client.disconnect()

Plugin Integration

import octoprint.plugin


class MyPlugin(octoprint.plugin.StartupPlugin):
    """Example plugin that reads the current ETA from TempETA on startup."""

    def on_after_startup(self):
        # get_plugin() would return the plugin *module*; the implementation
        # instance is exposed on the PluginInfo object instead.
        info = self._plugin_manager.get_plugin_info("temp_eta")
        temp_eta = info.implementation if info is not None else None

        if temp_eta:
            # Get current ETA
            # NOTE(review): confirm get_current_eta() is part of the TempETA
            # plugin's public API.
            eta_data = temp_eta.get_current_eta()
            self._logger.info("Current ETA: %s", eta_data)

Threading Considerations

All public methods are thread-safe when accessed through the plugin instance. However, when using the calculator or the MQTT client directly, ensure proper synchronization:

import threading

from octoprint_temp_eta.calculator import calculate_exponential_eta


class SafeCalculator:
    """Serialize ETA calculations on a history shared between threads."""

    def __init__(self):
        # Code that appends to the shared history must hold this same lock,
        # otherwise the deque can change while it is being iterated.
        self._lock = threading.RLock()

    def calculate(self, history, target):
        """Return the ETA in seconds (or None), computed under the lock."""
        with self._lock:
            return calculate_exponential_eta(history, target)

Error Handling

All methods handle errors gracefully and return None or default values on failure:

from octoprint_temp_eta.calculator import calculate_exponential_eta

try:
    # The estimators return None instead of raising when no ETA can be
    # computed, so None must be handled as a regular result.
    eta = calculate_exponential_eta(history, target)
    if eta is None:
        print("Insufficient data for ETA calculation")
    else:
        print(f"ETA: {eta:.1f}s")
except Exception as e:
    logger.error("Calculation failed: %s", e)

Type Hints

The codebase uses Python type hints for better IDE support:

from typing import Optional, Deque, Tuple

def calculate_eta(
    history: Deque[Tuple[float, float, float]], target: float
) -> Optional[float]:
    """Calculate ETA to target temperature.

    Signature-only example: the body is a placeholder that returns None.

    Args:
        history: Temperature history deque
        target: Target temperature

    Returns:
        ETA in seconds, or None if calculation fails
    """
    return None

Logging

Use the provided logger for all log messages:

# One call per severity level, from least to most severe.
self._logger.debug("Debug message")
self._logger.info("Info message")
self._logger.warning("Warning message")
self._logger.error("Error message")
# With a standard logging.Logger, exception() logs at ERROR level and appends
# the active traceback, so it is meant to be called from an except block.
self._logger.exception("Exception with traceback")

Next Steps