# pemmican: notifies users of Raspberry Pi 5 power issues
#
# Copyright (c) 2024 Dave Jones <dave.jones@canonical.com>
# Copyright (c) 2024 Canonical Ltd.
#
# SPDX-License-Identifier: GPL-3.0
"""
This module contains the "main" entry points for the :program:`pemmican-reset`
and :program:`pemmican-mon` applications, :class:`ResetApplication` and
:class:`MonitorApplication`, respectively. Both are derived from an abstract
base class, :class:`NotifierApplication` containing the logic common to both.
"""
import os
import sys
import html
import webbrowser
from time import monotonic, sleep
from abc import ABC, abstractmethod
import gi
gi.require_version('Gio', '2.0')
gi.require_version('GLib', '2.0')
from gi.repository import Gio, GLib
from dbus.mainloop.glib import DBusGMainLoop
from dbus.exceptions import DBusException
from pyudev import Context, Monitor
from pyudev.glib import MonitorObserver
from . import lang
from .power import reset_brownout, psu_max_current
from .notify import Notifications
from .const import (
XDG_CONFIG_DIRS,
XDG_CONFIG_HOME,
RPI_PSU_URL,
BROWNOUT_INHIBIT,
MAX_CURRENT_INHIBIT,
UNDERVOLT_INHIBIT,
OVERCURRENT_INHIBIT,
)
[docs]
class NotifierApplication(ABC):
"""
Base class for a GLib GApplication which needs to talk to the freedestkop
`notification service`_. An instance of this class can be called as a
"main" function, optionally passing in the command line parameters.
As a GApplication with an identifier (see :attr:`APP_ID`), only one
instance is typically permitted to run. Additional instances will exit
before activation, but will signal the original instance to activate
instead. The XDG directories, particularly those related to configuration
(:envvar:`XDG_CONFIG_HOME` and :envvar:`XDG_CONFIG_DIRS`) are expected in
the environment.
The application will terminate early with a non-zero exit code if
:envvar:`DISPLAY` or :envvar:`WAYLAND_DISPLAY` are missing from the
environment. Finally, if the freedesktop notification service does not show
up within 1 minute of the application starting, the application will also
terminate with a non-zero exit code.
This is an abstract class; descendents need to implement the :meth:`run`
method.
.. attribute:: APP_ID
The application's identifier, in the typical form of a reverse
domain-name. This should be overridden at the class-level in each
descendent.
.. _notification service: https://specifications.freedesktop.org/notification-spec/
"""
APP_ID = 'com.canonical.pemmican'
def __init__(self):
super().__init__()
self.title = ''
self.app = None
self.main_loop = None
self.notifier = None
self._run_start = None
def __call__(self, args=None):
# Bail if we don't have DISPLAY or WAYLAND_DISPLAY set in the
# environment (which are required to launch the browser on "more
# information"); the service will retry us later (when hopefully
# they've shown up...)
if not os.environ.keys() & {'DISPLAY', 'WAYLAND_DISPLAY'}:
print('Missing DISPLAY / WAYLAND_DISPLAY',
file=sys.stderr, flush=True)
return 1
# Integrate dbus-python with GLib; the set_as_default parameter means
# we don't have to pass around the NativeMainLoop object this returns
# whenever connecting to a bus -- it'll be the default anyway
DBusGMainLoop(set_as_default=True)
# Yes, it's unconventional to use GApplication instead of
# GtkApplication but we've no need of any GUI stuff here; we just want
# to talk to DBus notifications (if necessary) and quit
self.app = Gio.Application()
self.app.set_application_id(self.APP_ID)
self.app.connect('activate', self.do_activate)
with lang.init():
self.title = lang._('Raspberry Pi PMIC Monitor')
return self.app.run(sys.argv if args is None else args)
[docs]
def do_activate(self, user_data):
"""
Application activation. This starts the GLib main loop; any set up
which should be performed before entering the main loop should be done
here.
The application's main logic (in the abstract :meth:`run` method) is
ultimately executed as a one-shot idle handler from the GLib main loop,
configured here.
"""
self.main_loop = GLib.MainLoop()
GLib.idle_add(self._call_run)
self._run_start = monotonic()
self.main_loop.run()
def _call_run(self):
"""
This method is run as an idle handler from the GLib main loop. If the
freedesktop notification service is available, this executes the
abstract :meth:`run` method. Otherwise, provided we haven't yet timed
out, it schedules a future retry.
"""
try:
self.notifier = Notifications()
except DBusException as err:
if err.get_dbus_name() in (
'org.freedesktop.DBus.Error.NameHasNoOwner',
'org.freedesktop.DBus.Error.ServiceUnknown',
):
if monotonic() - self._run_start > 60:
raise
else:
sleep(1)
return True
else:
raise
self._run_id = None
self.run()
return False
[docs]
@abstractmethod
def run(self):
"""
This abstract method should be overridden in descendents to provide the
main logic of the application.
"""
raise NotImplementedError()
[docs]
class ResetApplication(NotifierApplication):
"""
Checks the Raspberry Pi 5's power status and reports, via the freedesktop
notification mechanism, if the last reset occurred due to a brownout
(undervolt) situation, or if the current power supply failed to negotiate a
5A supply. This script is intended to be run from a systemd user slice as
part of the :file:`graphical-session.target`.
"""
APP_ID = 'com.canonical.pemmican.ResetGui'
def __init__(self):
super().__init__()
self.inhibit = None
[docs]
def run(self):
self.notifier.on_closed = self.do_notification_closed
self.notifier.on_action = self.do_notification_action
self.do_check()
[docs]
def do_notification_closed(self, msg_id, reason):
"""
Callback executed when the user dismisses a notification by any
mechanism (explicit close, timeout, action activation, etc). As a
oneshot application, which can only ever show one notification, we
just quit if it's closed.
"""
self.main_loop.quit()
[docs]
def do_notification_action(self, msg_id, action_key):
"""
Callback executed when the user activates an action on one of our
pending notifications. This launches the web-browser for the "More
information" action, or touches the appropriate file for the "Don't
show again" action.
"""
if action_key == 'moreinfo':
webbrowser.open_new_tab(RPI_PSU_URL)
else: # action_key == 'suppress'
inhibit_path = XDG_CONFIG_HOME / __package__ / self.inhibit
inhibit_path.parent.mkdir(parents=True, exist_ok=True)
inhibit_path.touch()
[docs]
def do_check(self):
"""
This method is the bulk of the :program:`pemmican-reset` application.
It runs the checks on the device-tree nodes and, if notifications are
required, queries the notification service's capabilities to format the
notifications accordingly.
"""
try:
brownout = reset_brownout() and not any(
(p / __package__ / BROWNOUT_INHIBIT).exists()
for p in [XDG_CONFIG_HOME] + XDG_CONFIG_DIRS)
max_current = (psu_max_current() < 5000) and not any(
(p / __package__ / MAX_CURRENT_INHIBIT).exists()
for p in [XDG_CONFIG_HOME] + XDG_CONFIG_DIRS)
except OSError:
# We're probably not on a Pi 5; just exit
brownout = max_current = False
if not brownout and not max_current:
self.main_loop.quit()
return
caps = self.notifier.get_capabilities()
escape = html.escape if 'body-markup' in caps else lambda s: s
# Customize what we notify based on the notification system's
# capabilities; if we have actions, use them, otherwise tack "more
# info" URLs onto the notification itself, using hyperlinks if capable
if 'actions' in caps:
actions = [
('moreinfo', lang._('More information')),
('suppress', lang._("Don't show again")),
]
suffix = ''
elif 'body-hyperlinks' in caps:
actions = []
suffix = (
f'<a href="{escape(RPI_PSU_URL)}">' +
escape(lang._("More information")) +
'</a>')
else:
actions = []
suffix = escape(lang._('See {RPI_PSU_URL} for more information')
.format(RPI_PSU_URL=RPI_PSU_URL))
# Check for brownout initially. If brownout caused a reset, don't
# bother double-warning about an inadequate PSU
if brownout:
self.inhibit = BROWNOUT_INHIBIT
body=escape(lang._(
'Reset due to low power; please check your power supply')) + (
'. ' + suffix if suffix else '')
self.notifier.notify(
self.title, body=body,
hints={'urgency': 2}, actions=actions)
else: # max_current
self.inhibit = MAX_CURRENT_INHIBIT
body=escape(lang._(
'This power supply is not capable of supplying 5A; power '
'to peripherals will be restricted')) + (
'. ' + suffix if suffix else '')
self.notifier.notify(
self.title, body=body,
hints={'urgency': 1}, actions=actions)
# If nothing is pending (already!), just exit immediately
if not self.notifier.pending:
self.main_loop.quit()
[docs]
class MonitorApplication(NotifierApplication):
"""
Monitors the Raspberry Pi 5's power supply for reports of undervolt
(deficient power supply), or overcurrent (excessive draw by USB
peripherals). Issues are reported via the freedesktop notification
mechanism. This script is intended to be run from a systemd user slice as
part of the :file:`graphical-session.target`.
"""
APP_ID = 'com.canonical.pemmican.MonitorGui'
def __init__(self):
super().__init__()
self.overcurrent_monitor = None
self.overcurrent_observer = None
self.overcurrent_handle = None
self.overcurrent_msg_id = 0
self.overcurrent_counts = {}
self.undervolt_monitor = None
self.undervolt_observer = None
self.undervolt_handle = None
self.undervolt_msg_id = 0
[docs]
def run(self):
check_undervolt = not any(
(p / __package__ / UNDERVOLT_INHIBIT).exists()
for p in [XDG_CONFIG_HOME] + XDG_CONFIG_DIRS)
check_overcurrent = not any(
(p / __package__ / OVERCURRENT_INHIBIT).exists()
for p in [XDG_CONFIG_HOME] + XDG_CONFIG_DIRS)
if not check_undervolt and not check_overcurrent:
self.main_loop.quit()
return
self.notifier.on_closed = self.do_notification_closed
self.notifier.on_action = self.do_notification_action
context = Context()
if check_overcurrent:
self.overcurrent_monitor = Monitor.from_netlink(context)
self.overcurrent_monitor.filter_by(subsystem='usb')
self.overcurrent_observer = MonitorObserver(self.overcurrent_monitor)
self.overcurrent_handle = self.overcurrent_observer.connect(
'device-event', self.do_usb_device)
self.overcurrent_monitor.start()
if check_undervolt:
self.undervolt_monitor = Monitor.from_netlink(context)
self.undervolt_monitor.filter_by(subsystem='hwmon')
self.undervolt_observer = MonitorObserver(self.undervolt_monitor)
self.undervolt_handle = self.undervolt_observer.connect(
'device-event', self.do_hwmon_device)
self.undervolt_monitor.start()
[docs]
def do_usb_device(self, observer, device):
"""
Callback registered for USB device events. This method performs further
filtering to determine if this is actually an overcurrent event, and
dispatches a notification if it is.
"""
try:
if device.action == 'change':
port = device.properties['OVER_CURRENT_PORT']
count = int(device.properties['OVER_CURRENT_COUNT'])
else:
return
except KeyError:
return
if (
# Only display a notification if there's no active notification
# already; this works around an issue in GNOME that replacing a
# notification loses its actions
self.overcurrent_msg_id == 0 and
port and count > self.overcurrent_counts.get(port, 0)
):
self.overcurrent_counts[port] = count
self.overcurrent_msg_id = self.notify(
'overcurrent',
lang._('USB overcurrent; please check your connected USB devices'),
replaces_id=self.overcurrent_msg_id)
[docs]
def do_hwmon_device(self, observer, device):
"""
Callback registered for hardware monitoring events. This performs
further filtering to determine if this is actually an undervolt event,
and dispatches a notification if it is.
"""
try:
if device.action == 'change':
name = device.attributes.asstring('name')
alarm = device.attributes.asint('in0_lcrit_alarm')
else:
return
except KeyError:
return
# See note in method above
if self.undervolt_msg_id == 0 and name == 'rpi_volt' and alarm:
self.undervolt_msg_id = self.notify(
'undervolt',
lang._('Low voltage warning; please check your power supply'),
replaces_id=self.undervolt_msg_id)
[docs]
def notify(self, key, msg, *, replaces_id=0):
"""
This method is called by the monitoring callbacks
(:meth:`do_usb_device` and :meth:`do_hwmon_device`) to format and
dispatch a notification according to the capabilities of the system's
notification mechanism.
"""
caps = self.notifier.get_capabilities()
escape = html.escape if 'body-markup' in caps else lambda s: s
# Customize what we notify based on the notification system's
# capabilities; if we have actions, use them, otherwise tack "more
# info" URLs onto the notification itself, using hyperlinks if capable
if 'actions' in caps:
actions = [
('moreinfo', lang._('More information')),
(f'suppress_{key}', lang._("Don't show again")),
]
suffix = ''
elif 'body-hyperlinks' in caps:
actions = []
suffix = (
f'<a href="{escape(RPI_PSU_URL)}">' +
escape(lang._("More information")) +
'</a>')
else:
actions = []
suffix = escape(lang._('See {RPI_PSU_URL} for more information')
.format(RPI_PSU_URL=RPI_PSU_URL))
return self.notifier.notify(
self.title, body=escape(msg) + ('. ' + suffix if suffix else ''),
hints={'urgency': 2}, actions=actions, replaces_id=replaces_id)
[docs]
def do_notification_closed(self, msg_id, reason):
"""
Callback executed when the user dismisses a notification by any
mechanism (explicit close, timeout, action activation, etc).
"""
if msg_id == self.undervolt_msg_id:
self.undervolt_msg_id = 0
elif msg_id == self.overcurrent_msg_id:
self.overcurrent_msg_id = 0
[docs]
def do_notification_action(self, msg_id, action_key):
"""
Callback executed when the user activates an action on one of our
pending notifications. This launches the web-browser for the "More
information" action, or touches the appropriate file for the "Don't
show again" action.
"""
if action_key == 'suppress_undervolt':
inhibit = XDG_CONFIG_HOME / __package__ / UNDERVOLT_INHIBIT
inhibit.parent.mkdir(parents=True, exist_ok=True)
inhibit.touch()
self.undervolt_observer.disconnect(self.undervolt_handle)
self.undervolt_handle = None
self.undervolt_observer = None
self.undervolt_monitor = None
elif action_key == 'suppress_overcurrent':
inhibit = XDG_CONFIG_HOME / __package__ / OVERCURRENT_INHIBIT
inhibit.parent.mkdir(parents=True, exist_ok=True)
inhibit.touch()
self.overcurrent_observer.disconnect(self.overcurrent_handle)
self.overcurrent_handle = None
self.overcurrent_observer = None
self.overcurrent_monitor = None
else: # action_key == 'moreinfo'
webbrowser.open_new_tab(RPI_PSU_URL)
if self.undervolt_monitor is None and self.overcurrent_monitor is None:
self.main_loop.quit()
reset_main = ResetApplication()
monitor_main = MonitorApplication()