Source code for status_view

# Copyright (C) 2020  Alex Sonea

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import subprocess
import time
import psutil
import rospy
from typing import Tuple

from snack import Grid, Label
from view_ui import View, NameValueScale, NameStatValue, SnackScreen

[docs]class RobotStatusView(View): """View that presents the overview of robot's hardware (excluding servos). This version includes the following information: - battery voltage - voltage for 5V railing - voltage for 3.3V railing - battery statistics (on battery for..., battery remaining...); these are claculated in the code here and are based on monitoring the discharge of the battery and the last time the battery was changed - processor temperature - fan status (on, off) - CPU frequency - CPU governor - CPU load (1 minute, 5 minutes, 15 minutes average) - memory used (in %) - WiFi AP status (IP address if on) - WiFi dongle status (IP address is connected to infrastructure) - LAN status (IP address is connected to infrastructure) """ batt_last_change: float """Keeps the time that the battery was changed last.""" batt_last_change_value: float """The last value for the battery voltage when battery was replaced.""" batt_last_value: float """Last read battery voltage.""" batt_last_estimate: float """Time when the latest estimate about battery life was done.""" on_batt_str: str """String showing hh:mm time on battery from last change (or start).""" rem_batt_str: str """String showing hh:mm time remaining on battery based on last estimate."""
[docs] def __init__(self, screen:SnackScreen, timer: int, title: str = 'Robot Status'): """Constructor for the status view. Initializes the battery statistics. Parameters ---------- screen : snack.SnackScreen The screen where the display will be made. timer : int [description] title : str, optional Title to be printed for the view, by default 'Robot Status' """ super().__init__(screen, timer, title) self.batt_last_change = None self.batt_last_change_value = None self.batt_last_value = None self.batt_last_estimate = None self.on_batt_str = '0:00' self.rem_batt_str = 'Calc'
[docs] def create_content(self) -> Grid: """Creates a snack.Grid that contains the items to be displayed and initializes the values for these elements. Returns ------- snack.Grid The initialized Grid to be used by MainUI. """ grid = Grid(3, 19) w = [16, 6, 14] # widths for columns row = 0 # current row # Voltage # grid.setField(Label('Voltage'), 0, row) # row += 1 self.battery = NameValueScale('Battery', 'V', grid, row, w, 9.0, 12.6) self.battery.update_value(12.2) row += 1 self.on_battery = NameStatValue('On battery for', '', grid, row, w) self.on_battery.update_value(self.on_batt_str) row += 1 self.rem_battery = NameStatValue('Batt remaining', '', grid, row, w) self.rem_battery.update_value(self.rem_batt_str) row += 1 self.rpi5v = NameStatValue('5V rail', 'V', grid, row, w) self.rpi5v.update_value('5.0') row += 1 self.rpi3v = NameStatValue('3.3V rail', 'V', grid, row, w) self.rpi3v.update_value('3.2') row += 1 grid.setField(Label(''), 0, row) # Temperature # row += 1 # grid.setField(Label('Temperature'), 0, row) row += 1 self.temp = NameValueScale('RPi temperature', '°', grid, row, w, 25.0, 85.0) self.temp.update_value(45.0) row += 1 self.fan = NameStatValue('Fan', '', grid, row, w) self.fan.update_value('Off') row += 1 grid.setField(Label(''), 0, row) # CPU # row += 1 # grid.setField(Label('CPU'), 0, row) row += 1 self.cpu_speed = NameValueScale('CPU Speed', 'G', grid, row, w, 0.6, 1.5) self.cpu_speed.update_value(1.2) row += 1 self.cpu_gov = NameStatValue('CPU Governor', '', grid, row, w) self.cpu_gov.update_value('', '') row += 1 self.cpu_1m = NameValueScale('Load [1m]', '', grid, row, w, 0.0, 4.0) self.cpu_1m.update_value(2.5) row += 1 self.cpu_5m = NameValueScale('Load [5m]', '', grid, row, w, 0.0, 4.0) self.cpu_5m.update_value(2.5) row += 1 self.cpu_15m = NameValueScale('Load [15m]', '', grid, row, w, 0.0, 4.0) self.cpu_15m.update_value(2.5) row += 1 # max_mem_str = self.shell_cmd('cat /proc/meminfo | grep MemTotal') # self.max_mem = int(max_mem_str.split()[1]) / 1000000.0 self.cpu_mem = NameValueScale('Memory', '', grid, row, w, 0.0, 100.0) self.cpu_mem.update_value(0.0) row += 1 grid.setField(Label(''), 0, row) # WiFi # row += 1 # grid.setField(Label('Wi-Fi'), 0, row) row += 1 # get the interface name from AP config ap_config = self.shell_cmd('cat /etc/hostapd/hostapd.conf | grep interface') self.ap_interf = ap_config.strip().split('=')[1] self.ap_stat = NameStatValue('AP', '', grid, row, w) self.ap_stat.update_value('', '') row += 1 nets = self.shell_cmd('ls /sys/class/net/ | grep wl*').split('\n') nets.remove(self.ap_interf) if nets: self.wf_interf = nets[0] else: self.wf_interf = '' self.wf_stat = NameStatValue('WAN', '', grid, row, w) self.wf_stat.update_value('On', '') row += 1 self.eth_stat = NameStatValue('Eth', '', grid, row, w) self.eth_stat.update_value('', '') # row += 1 # grid.setField(Label(''), 0, row) return grid
[docs] def shell_cmd(self, command:str) -> str: """Convenience function for running a Shell command an returning the result. Parameters ---------- command : str Command to be execcuted (ex. ``ifconfig wlan0 | grep "inet "``). Returns ------- str The result of running the command or empty string is errors occurred. """ comm = subprocess.run(command, shell=True, stdout=subprocess.PIPE, encoding='utf-8') if comm.returncode == 0: return comm.stdout.strip() else: return ''
[docs] def read_sysfs(self, file:str) -> str: """Reads the content of a ``sysfs`` parameter and returns the value stripped. The method is provided as a faster alternative to using the :meth:`shell_cmd`` because no shell will need to be spun. Parameters ---------- file : str The ``sysfs`` access (ex. ``/sys/class/thermal/thermal_zone0/temp``). Please note that the function does not handle any exceptions, so if the file does not exist or the user does not have authorization to read the value an exception will be raised and needs to be handled by the calling program. Returns ------- str The result of reading that ``sysfs`` parameter. """ # try: raw = open(file, 'r').read() val = raw.strip() return val
[docs] def get_interf_status(self, interf:str) -> Tuple[str, str]: """Convenience function for getting the status and the IP address of an interface. Parameters ---------- interf : str The name of the interface (ex. ``wlan0``) Returns ------- tuple(str, str) Returns the status of the interface in the first string as "On" or "Off" and the IP address in the second string if connected or empty string if not connected. """ inet_line = self.shell_cmd(f'ifconfig {interf} | grep "inet "') if inet_line: return 'On', inet_line.split(' ')[1] else: return 'Off', ''
[docs] def update_content(self) -> None: """Reads the information for each of the elements in the screen and updates their content. This is triggered by the timer that is setup by the :class:`view_ui.View` class. """ # voltage # 5V railing raw = self.read_sysfs("/sys/class/i2c-dev/i2c-1/device/1-0048/iio:device0/in_voltage1_raw") raw = float(raw) scale = self.read_sysfs("/sys/class/i2c-dev/i2c-1/device/1-0048/iio:device0/in_voltage1_scale") scale = float(scale) value = raw * scale / 1000.0 * 2.0 # 1:2 divisor on ADC1 self.rpi5v.update_value(f'{value:4.1f}') # 3.3V railing # in_voltage1_raw is giving some bizare readings and we will use the voltage0-voltage1 reading instead raw = self.read_sysfs("/sys/class/i2c-dev/i2c-1/device/1-0048/iio:device0/in_voltage0-voltage1_raw") raw = float(raw) scale = self.read_sysfs("/sys/class/i2c-dev/i2c-1/device/1-0048/iio:device0/in_voltage0-voltage1_scale") scale = float(scale) # value contains the 5V calculation from earlier # raw * scale is giving howmuch more ADC0 input is vs. ADC1; # ADC1 is the previous 5V railing / 2 (because of divisor) value = value / 2.0 + (raw * scale) / 1000.0 self.rpi3v.update_value(f'{value:4.1f}') # Battery railing raw = self.read_sysfs("/sys/class/i2c-dev/i2c-1/device/1-0048/iio:device0/in_voltage2_raw") raw = float(raw) scale = self.read_sysfs("/sys/class/i2c-dev/i2c-1/device/1-0048/iio:device0/in_voltage2_scale") scale = float(scale) value = raw * scale / 1000.0 * 4.0 # 1:4 divisor on ADC2 self.battery.update_value(value) # estimates # if self.batt_last_change_value is None: self.batt_last_change_value = value self.batt_last_estimate = time.time() self.batt_last_change = time.time() # if self.batt_last_value is None: self.batt_last_value = value now = time.time() if value >= self.batt_last_value + 0.5: # battery changed self.batt_last_change = now self.batt_last_estimate = now self.batt_last_change_value = value self.on_batt_str = '0:00' self.rem_batt_str = 'Calc' if now > self.batt_last_estimate + 60: # update battery status every 60 seconds self.batt_last_estimate = now on_batt = now - self.batt_last_change self.on_batt_str = time.strftime("%-H:%M", time.gmtime(on_batt)) v_used = self.batt_last_change_value - value if v_used > 0.1: v_rem = max(value - 9.0, 0) t_rem = v_rem * on_batt / v_used self.rem_batt_str = time.strftime("%-H:%M", time.gmtime(t_rem)) else: self.rem_batt_str = 'Calc' self.on_battery.update_value(self.on_batt_str) self.rem_battery.update_value(self.rem_batt_str) self.batt_last_value = value # temperature temp_str = self.read_sysfs('/sys/class/thermal/thermal_zone0/temp') self.temp.update_value(int(temp_str)/1000.0) fan_str = self.read_sysfs('/sys/class/thermal/cooling_device0/cur_state') if fan_str == '1': self.fan.update_value('On') else: self.fan.update_value('Off') # CPU freq_str = self.read_sysfs('/sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq') self.cpu_speed.update_value(float(freq_str)/1000000.0) gov = self.read_sysfs('/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor') self.cpu_gov.update_value('', gov) load_str = self.read_sysfs('/proc/loadavg').split() self.cpu_1m.update_value(float(load_str[0]), format='4.2f') self.cpu_5m.update_value(float(load_str[1]), format='4.2f') self.cpu_15m.update_value(float(load_str[2]), format='4.2f') # memory mem_str = psutil.virtual_memory().percent # mem_str = self.shell_cmd('cat /proc/meminfo | grep MemFree') # mem = int(mem_str.split()[1]) / 1000000.0 self.cpu_mem.update_value(float(mem_str)) # wi-fi if self.ap_interf: stat, ip_addr = self.get_interf_status(self.ap_interf) self.ap_stat.update_value(stat, ip_addr) else: self.ap_stat.update_value('N/A') if self.wf_interf: stat, ip_addr = self.get_interf_status(self.wf_interf) self.wf_stat.update_value(stat, ip_addr) else: self.wf_stat.update_value('N/A') stat, ip_addr = self.get_interf_status('eth0') self.eth_stat.update_value(stat, ip_addr)