This commit is contained in:
Mario Fetka 2021-10-27 08:44:51 +02:00
parent 38ddc53c2c
commit 6efd4c00f8
39 changed files with 872 additions and 722 deletions

View File

@ -2,10 +2,10 @@ language: python
dist: xenial
python:
- "3.5"
- "3.6"
- "3.7"
- "3.8"
# TODO add 3.9
- "3.9-dev"
install: pip install tox-travis tox tox-venv

43
CONTRIBUTING.md Normal file
View File

@ -0,0 +1,43 @@
# Contributing to afancontrol
I started afancontrol in 2013 in an attempt to make my custom PC case quiet.
It's been working 24/7 ever since with no issues, and eventually I started using
it on my other machines as well.
I'm quite happy with how this package serves my needs, and I hope
it can be useful for someone else too.
Contributions are welcome, however, keep in mind, that:
* Complex features and large diffs would probably be rejected,
because it would make maintenance more complicated for me,
* I don't have any plans for active development and promotion
of the package.
## Dev workflow
Prepare a virtualenv:
mkvirtualenv afancontrol
make develop
I use [TDD](https://en.wikipedia.org/wiki/Test-driven_development) for development.
Run tests:
make test
Autoformat the code and imports:
make format
Run linters:
make lint
So essentially after writing a small part of code and tests I call these
three commands and fix the errors until they stop failing.
To build docs:
make docs

View File

@ -14,14 +14,14 @@ RUN apt-get update \
RUN mkdir ~/.gnupg && echo "disable-ipv6" >> ~/.gnupg/dirmngr.conf
# Import the GPG key used to sign the PyPI releases of `afancontrol`:
RUN gpg --recv-keys "2D3B9C1712FF84F7"
RUN gpg --recv-keys "AA7B5406547AF062"
COPY debian /build/afancontrol/debian
WORKDIR /build/afancontrol/
RUN mkdir -p debian/upstream \
&& gpg --export --export-options export-minimal --armor \
'BE3D633AB6792715ECF34D742D3B9C1712FF84F7' \
'A18FE9F6F570D5B4E1E1853FAA7B5406547AF062' \
> debian/upstream/signing-key.asc
RUN apt-get -y build-dep .

View File

@ -1,11 +1,11 @@
.PHONY: format
format:
black src tests *.py && isort -rc src tests *.py
black src tests *.py && isort src tests *.py
.PHONY: lint
lint:
flake8 src tests *.py && isort --check-only -rc src tests *.py && black --check src tests *.py && mypy src tests
flake8 src tests *.py && isort --check-only src tests *.py && black --check src tests *.py && mypy src tests
.PHONY: test
test:

View File

@ -17,7 +17,7 @@ afancontrol
`fancontrol <https://github.com/lm-sensors/lm-sensors/blob/master/prog/pwm/fancontrol>`_
with more advanced configuration abilities.
`afancontrol` measures temperature from the sensors, computes the required
airflow and sets the PWM fan speeds accordingly.
`afancontrol` measures temperatures from sensors, computes the required
airflow and sets PWM fan speeds accordingly.
The docs are available at `<https://afancontrol.readthedocs.io/>`_.

11
debian/changelog vendored
View File

@ -1,3 +1,14 @@
afancontrol (3.0.0-1) unstable; urgency=medium
* Drop support for prometheus-client < 0.1.0 (debian stretch)
* Drop support for Python 3.5 (debian stretch)
* Add support for Python 3.9
* config: add `ipmi_sensors` location property
* Add dh-systemd (would automatically (re)start the systemd service upon
package (re)installation)
-- Kostya Esmukov <kostya@esmukov.ru> Sat, 10 Oct 2020 14:43:01 +0000
afancontrol (2.2.1-1) unstable; urgency=medium
* Fix compatibility with py3.5

5
debian/control vendored
View File

@ -4,12 +4,13 @@ Priority: optional
Maintainer: Kostya Esmukov <kostya@esmukov.ru>
Build-Depends: debhelper (>= 9),
dh-python,
debhelper (>= 9.20160709) | dh-systemd,
python3-all,
python3-setuptools
Build-Depends-Indep: python3-pytest,
python3-requests,
python3-click,
python3-prometheus-client,
python3-prometheus-client (>= 0.1.0),
python3-serial
Standards-Version: 3.9.8
Homepage: https://github.com/KostyaEsmukov/afancontrol
@ -27,7 +28,7 @@ Depends: ${python3:Depends},
lm-sensors,
python3-click,
python3-pkg-resources,
python3-prometheus-client,
python3-prometheus-client (>= 0.1.0),
python3-serial
Suggests: freeipmi-tools,
Description: Advanced Fan Control program (Python 3)

7
debian/rules vendored
View File

@ -8,10 +8,5 @@ export PYBUILD_NAME=afancontrol
export PYBUILD_TEST_PYTEST=1
export PYBUILD_TEST_ARGS={dir}/tests/
override_dh_auto_test:
# the test suite does not 100% pass at present,
# but the output is useful documentation for users
(dh_auto_test; echo $?) > test-results
%:
dh $@ --with python3 --buildsystem=pybuild
dh $@ --with systemd,python3 --buildsystem=pybuild

View File

@ -220,8 +220,7 @@ There's a Dockerfile which can be used to build a Debian `.deb` package:
make deb-from-pypi
# Install the package:
sudo dpkg -i dist/debian/*.deb
sudo apt install -f
sudo apt install ./dist/debian/*.deb
Perhaps one day the package might get published to the Debian repos,
so a simple ``apt install afancontrol`` would work. But for now, given

View File

@ -10,10 +10,15 @@ logfile = /var/log/afancontrol.log
# Default: 5
interval = 5
# Hddtemp location. Relevant only when there're `type = hdd` temperature sensors.
# Hddtemp location. Used by the `type = hdd` temperature sensors.
# Default: hddtemp
;hddtemp = /usr/local/bin/hddtemp
# `ipmi-sensors` location from the `freeipmi-tools` package.
# Used by the `type = freeipmi` fans.
# Default: ipmi-sensors
;ipmi_sensors = /usr/local/bin/ipmi-sensors
# Prometheus exporter listening hostname and TCP port.
# Default: (empty value)
;exporter_listen_host = 127.0.0.1:8083

View File

@ -32,13 +32,12 @@ include_trailing_comma = True
force_grid_wrap = 0
combine_as_imports = True
line_length = 88
not_skip = __init__.py
[metadata]
author = Kostya Esmukov
author_email = kostya@esmukov.ru
classifier =
Development Status :: 4 - Beta
Development Status :: 5 - Production/Stable
Intended Audience :: System Administrators
License :: OSI Approved :: MIT License
Natural Language :: English
@ -46,10 +45,10 @@ classifier =
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3 :: Only
Programming Language :: Python :: 3.5
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Topic :: System :: Hardware
Topic :: System :: Monitoring
Topic :: System :: Systems Administration
@ -77,7 +76,7 @@ install_requires =
package_dir =
= src
packages = find:
python_requires = >=3.5
python_requires = >=3.6
[options.entry_points]
console_scripts =
@ -87,16 +86,16 @@ console_scripts =
arduino =
pyserial>=3.0
metrics =
prometheus-client
prometheus-client>=0.1.0
dev =
black==19.10b0; python_version>='3.6'
coverage==5.1
flake8==3.7.9
isort==4.3.21
mypy==0.770
pytest==5.4.2
black==20.8b1
coverage==5.3
flake8==3.8.4
isort==5.5.4
mypy==0.782
pytest==6.1.0
requests
sphinx==3.0.3
sphinx==3.2.1
wheel
[options.packages.find]

View File

@ -1 +1 @@
__version__ = "2.2.1"
__version__ = "3.0.0"

View File

@ -5,6 +5,7 @@ import threading
from timeit import default_timer
from typing import TYPE_CHECKING, Any, Dict, NewType, Optional
from afancontrol.configparser import ConfigParserSection
from afancontrol.logger import logger
if TYPE_CHECKING:
@ -50,11 +51,22 @@ class ArduinoConnection:
lambda: _StatusProtocol(self), url=serial_url, baudrate=baudrate
)
self._context_manager_depth = 0
self._status = None # type: Optional[Dict[str, Dict[str, int]]]
self._status_clock = None # type: Optional[float]
self._status: Optional[Dict[str, Dict[str, int]]] = None
self._status_clock: Optional[float] = None
self._status_lock = threading.Lock()
self._status_event = threading.Event()
@classmethod
def from_configparser(
cls, section: ConfigParserSection[ArduinoName]
) -> "ArduinoConnection":
return cls(
name=section.name,
serial_url=section["serial_url"],
baudrate=section.getint("baudrate", fallback=DEFAULT_BAUDRATE),
status_ttl=section.getint("status_ttl", fallback=DEFAULT_STATUS_TTL),
)
def __eq__(self, other):
if isinstance(other, type(self)):
return (
@ -218,10 +230,10 @@ class _AutoRetriedReaderThread:
def __init__(self, protocol_factory, **serial_for_url_kwargs) -> None:
self.protocol_factory = protocol_factory
self.serial_for_url_kwargs = serial_for_url_kwargs
self._reader_thread = None # type: Optional[ReaderThread]
self._transport = None # type: Optional[ReaderThread]
self._watchdog_thread = None # type: Optional[threading.Thread]
self._watchdog_queue = queue.Queue() # type: queue.Queue[Any]
self._reader_thread: Optional[ReaderThread] = None
self._transport: Optional[ReaderThread] = None
self._watchdog_thread: Optional[threading.Thread] = None
self._watchdog_queue: queue.Queue[Any] = queue.Queue()
def __enter__(self): # reusable
# TODO ?? maybe clean the _watchdog_queue?

View File

@ -9,171 +9,120 @@ from typing import (
Sequence,
Tuple,
TypeVar,
Union,
)
from afancontrol.arduino import (
DEFAULT_BAUDRATE,
DEFAULT_STATUS_TTL,
ArduinoConnection,
ArduinoName,
ArduinoPin,
)
from afancontrol.filters import (
MovingMedianFilter,
MovingQuantileFilter,
NullFilter,
TempFilter,
)
import afancontrol.filters
from afancontrol.arduino import ArduinoConnection, ArduinoName
from afancontrol.configparser import ConfigParserSection, iter_sections
from afancontrol.exec import Programs
from afancontrol.filters import FilterName, TempFilter
from afancontrol.logger import logger
from afancontrol.pwmfan import (
ArduinoFanPWMRead,
ArduinoFanPWMWrite,
ArduinoFanSpeed,
BaseFanPWMRead,
BaseFanPWMWrite,
BaseFanSpeed,
FanInputDevice,
FreeIPMIFanSpeed,
LinuxFanPWMRead,
LinuxFanPWMWrite,
LinuxFanSpeed,
PWMDevice,
PWMValue,
)
from afancontrol.pwmfan import FanName, ReadonlyFanName
from afancontrol.pwmfannorm import PWMFanNorm, ReadonlyPWMFanNorm
from afancontrol.temp import CommandTemp, FileTemp, HDDTemp, Temp, TempCelsius
from afancontrol.temp import FilteredTemp, TempName
DEFAULT_CONFIG = "/etc/afancontrol/afancontrol.conf"
DEFAULT_PIDFILE = "/run/afancontrol.pid"
DEFAULT_INTERVAL = 5
DEFAULT_FANS_SPEED_CHECK_INTERVAL = 3
DEFAULT_HDDTEMP = "hddtemp"
DEFAULT_REPORT_CMD = (
'printf "Subject: %s\nTo: %s\n\n%b"'
' "afancontrol daemon report: %REASON%" root "%MESSAGE%"'
" | sendmail -t"
)
DEFAULT_FAN_TYPE = "linux"
DEFAULT_PWM_LINE_START = 100
DEFAULT_PWM_LINE_END = 240
DEFAULT_NEVER_STOP = True
DEFAULT_WINDOW_SIZE = 3
FilterName = NewType("FilterName", str)
TempName = NewType("TempName", str)
FanName = NewType("FanName", str)
ReadonlyFanName = NewType("ReadonlyFanName", str)
AnyFanName = Union[FanName, ReadonlyFanName]
MappingName = NewType("MappingName", str)
T = TypeVar("T")
FanSpeedModifier = NamedTuple(
"FanSpeedModifier",
# fmt: off
[
("fan", FanName),
("modifier", float), # [0..1]
]
# fmt: on
)
FansTempsRelation = NamedTuple(
"FansTempsRelation",
# fmt: off
[
("temps", Sequence[TempName]),
("fans", Sequence[FanSpeedModifier]),
]
# fmt: on
)
AlertCommands = NamedTuple(
"AlertCommands",
# fmt: off
[
("enter_cmd", Optional[str]),
("leave_cmd", Optional[str]),
]
# fmt: on
)
class FanSpeedModifier(NamedTuple):
fan: FanName
modifier: float # [0..1]
Actions = NamedTuple(
"Actions",
# fmt: off
[
("panic", AlertCommands),
("threshold", AlertCommands),
]
# fmt: on
)
class FansTempsRelation(NamedTuple):
temps: Sequence[TempName]
fans: Sequence[FanSpeedModifier]
TriggerConfig = NamedTuple(
"TriggerConfig",
# fmt: off
[
("global_commands", Actions),
("temp_commands", Mapping[TempName, Actions]),
]
# fmt: on
)
class AlertCommands(NamedTuple):
enter_cmd: Optional[str]
leave_cmd: Optional[str]
DaemonCLIConfig = NamedTuple(
"DaemonCLIConfig",
# fmt: off
[
("pidfile", Optional[str]),
("logfile", Optional[str]),
("exporter_listen_host", Optional[str]),
]
# fmt: on
)
class Actions(NamedTuple):
panic: AlertCommands
threshold: AlertCommands
DaemonConfig = NamedTuple(
"DaemonConfig",
# fmt: off
[
("pidfile", Optional[str]),
("logfile", Optional[str]),
("interval", int),
("exporter_listen_host", Optional[str]),
]
# fmt: on
)
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> "Actions":
panic = AlertCommands(
enter_cmd=section.get("panic_enter_cmd", fallback=None),
leave_cmd=section.get("panic_leave_cmd", fallback=None),
)
FilteredTemp = NamedTuple(
"FilteredTemp",
# fmt: off
[
("temp", Temp),
("filter", TempFilter),
]
# fmt: on
)
threshold = AlertCommands(
enter_cmd=section.get("threshold_enter_cmd", fallback=None),
leave_cmd=section.get("threshold_leave_cmd", fallback=None),
)
ParsedConfig = NamedTuple(
"ParsedConfig",
# fmt: off
[
("daemon", DaemonConfig),
("report_cmd", str),
("triggers", TriggerConfig),
("arduino_connections", Mapping[ArduinoName, ArduinoConnection]),
("fans", Mapping[FanName, PWMFanNorm]),
("readonly_fans", Mapping[ReadonlyFanName, ReadonlyPWMFanNorm]),
("temps", Mapping[TempName, FilteredTemp]),
("mappings", Mapping[MappingName, FansTempsRelation]),
]
# fmt: on
)
return cls(panic=panic, threshold=threshold)
class TriggerConfig(NamedTuple):
global_commands: Actions
temp_commands: Mapping[TempName, Actions]
class DaemonCLIConfig(NamedTuple):
pidfile: Optional[str]
logfile: Optional[str]
exporter_listen_host: Optional[str]
class DaemonConfig(NamedTuple):
pidfile: Optional[str]
logfile: Optional[str]
interval: int
exporter_listen_host: Optional[str]
@classmethod
def from_configparser(
cls, section: ConfigParserSection, daemon_cli_config: DaemonCLIConfig
) -> "DaemonConfig":
pidfile = first_not_none(
daemon_cli_config.pidfile, section.get("pidfile", fallback=DEFAULT_PIDFILE)
)
if pidfile is not None and not pidfile.strip():
pidfile = None
logfile = first_not_none(
daemon_cli_config.logfile, section.get("logfile", fallback=None)
)
interval = section.getint("interval", fallback=5)
exporter_listen_host = first_not_none(
daemon_cli_config.exporter_listen_host,
section.get("exporter_listen_host", fallback=None),
)
return cls(
pidfile=pidfile,
logfile=logfile,
interval=interval,
exporter_listen_host=exporter_listen_host,
)
class ParsedConfig(NamedTuple):
daemon: DaemonConfig
report_cmd: str
triggers: TriggerConfig
arduino_connections: Mapping[ArduinoName, ArduinoConnection]
fans: Mapping[FanName, PWMFanNorm]
readonly_fans: Mapping[ReadonlyFanName, ReadonlyPWMFanNorm]
temps: Mapping[TempName, FilteredTemp]
mappings: Mapping[MappingName, FansTempsRelation]
def parse_config(config_path: Path, daemon_cli_config: DaemonCLIConfig) -> ParsedConfig:
@ -183,13 +132,13 @@ def parse_config(config_path: Path, daemon_cli_config: DaemonCLIConfig) -> Parse
except Exception as e:
raise RuntimeError("Unable to parse %s:\n%s" % (config_path, e))
daemon, hddtemp = _parse_daemon(config, daemon_cli_config)
daemon, programs = _parse_daemon(config, daemon_cli_config)
report_cmd, global_commands = _parse_actions(config)
arduino_connections = _parse_arduino_connections(config)
filters = _parse_filters(config)
temps, temp_commands = _parse_temps(config, hddtemp, filters)
temps, temp_commands = _parse_temps(config, programs, filters)
fans = _parse_fans(config, arduino_connections)
readonly_fans = _parse_readonly_fans(config, arduino_connections)
readonly_fans = _parse_readonly_fans(config, arduino_connections, programs)
_check_fans_namespace(fans, readonly_fans)
mappings = _parse_mappings(config, fans, temps)
@ -216,110 +165,35 @@ def first_not_none(*parts: Optional[T]) -> Optional[T]:
def _parse_daemon(
config: configparser.ConfigParser, daemon_cli_config: DaemonCLIConfig
) -> Tuple[DaemonConfig, str]:
daemon = config["daemon"]
keys = set(daemon.keys())
) -> Tuple[DaemonConfig, Programs]:
section: ConfigParserSection[str] = ConfigParserSection(config["daemon"])
daemon_config = DaemonConfig.from_configparser(section, daemon_cli_config)
programs = Programs.from_configparser(section)
section.ensure_no_unused_keys()
pidfile = first_not_none(
daemon_cli_config.pidfile, daemon.get("pidfile"), DEFAULT_PIDFILE
)
if pidfile is not None and not pidfile.strip():
pidfile = None
keys.discard("pidfile")
logfile = first_not_none(daemon_cli_config.logfile, daemon.get("logfile"))
keys.discard("logfile")
interval = daemon.getint("interval", fallback=DEFAULT_INTERVAL)
keys.discard("interval")
exporter_listen_host = first_not_none(
daemon_cli_config.exporter_listen_host, daemon.get("exporter_listen_host")
)
keys.discard("exporter_listen_host")
hddtemp = daemon.get("hddtemp") or DEFAULT_HDDTEMP
keys.discard("hddtemp")
if keys:
raise RuntimeError("Unknown options in the [daemon] section: %s" % (keys,))
return (
DaemonConfig(
pidfile=pidfile,
logfile=logfile,
interval=interval,
exporter_listen_host=exporter_listen_host,
),
hddtemp,
)
return daemon_config, programs
def _parse_actions(config: configparser.ConfigParser) -> Tuple[str, Actions]:
actions = config["actions"]
keys = set(actions.keys())
section: ConfigParserSection[str] = ConfigParserSection(config["actions"])
report_cmd = section.get("report_cmd", fallback=DEFAULT_REPORT_CMD)
actions = Actions.from_configparser(section)
section.ensure_no_unused_keys()
report_cmd = first_not_none(actions.get("report_cmd"), DEFAULT_REPORT_CMD)
assert report_cmd is not None
keys.discard("report_cmd")
panic = AlertCommands(
enter_cmd=first_not_none(actions.get("panic_enter_cmd")),
leave_cmd=first_not_none(actions.get("panic_leave_cmd")),
)
keys.discard("panic_enter_cmd")
keys.discard("panic_leave_cmd")
threshold = AlertCommands(
enter_cmd=first_not_none(actions.get("threshold_enter_cmd")),
leave_cmd=first_not_none(actions.get("threshold_leave_cmd")),
)
keys.discard("threshold_enter_cmd")
keys.discard("threshold_leave_cmd")
if keys:
raise RuntimeError("Unknown options in the [actions] section: %s" % (keys,))
return report_cmd, Actions(panic=panic, threshold=threshold)
return report_cmd, actions
def _parse_arduino_connections(
config: configparser.ConfigParser,
) -> Mapping[ArduinoName, ArduinoConnection]:
arduino_connections = {} # type: Dict[ArduinoName, ArduinoConnection]
for section_name in config.sections():
section_name_parts = section_name.split(":", 1)
if section_name_parts[0].strip().lower() != "arduino":
continue
arduino_name = ArduinoName(section_name_parts[1].strip())
arduino = config[section_name]
keys = set(arduino.keys())
serial_url = arduino["serial_url"]
keys.discard("serial_url")
baudrate = arduino.getint("baudrate", fallback=DEFAULT_BAUDRATE)
keys.discard("baudrate")
status_ttl = arduino.getint("status_ttl", fallback=DEFAULT_STATUS_TTL)
keys.discard("status_ttl")
if keys:
arduino_connections: Dict[ArduinoName, ArduinoConnection] = {}
for section in iter_sections(config, "arduino", ArduinoName):
if section.name in arduino_connections:
raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys)
"Duplicate arduino section declaration for '%s'" % section.name
)
if arduino_name in arduino_connections:
raise RuntimeError(
"Duplicate arduino section declaration for '%s'" % arduino_name
)
arduino_connections[arduino_name] = ArduinoConnection(
name=arduino_name,
serial_url=serial_url,
baudrate=baudrate,
status_ttl=status_ttl,
)
arduino_connections[section.name] = ArduinoConnection.from_configparser(section)
section.ensure_no_unused_keys()
# Empty arduino_connections is ok
return arduino_connections
@ -328,49 +202,14 @@ def _parse_arduino_connections(
def _parse_filters(
config: configparser.ConfigParser,
) -> Mapping[FilterName, TempFilter]:
filters = {} # type: Dict[FilterName, TempFilter]
for section_name in config.sections():
section_name_parts = section_name.split(":", 1)
if section_name_parts[0].strip().lower() != "filter":
continue
filter_name = FilterName(section_name_parts[1].strip())
filter = config[section_name]
keys = set(filter.keys())
filter_type = filter["type"]
keys.discard("type")
if filter_type == "moving_median":
window_size = filter.getint("window_size", fallback=DEFAULT_WINDOW_SIZE)
keys.discard("window_size")
f = MovingMedianFilter(window_size=window_size) # type: TempFilter
elif filter_type == "moving_quantile":
window_size = filter.getint("window_size", fallback=DEFAULT_WINDOW_SIZE)
keys.discard("window_size")
quantile = filter.getfloat("quantile")
keys.discard("quantile")
f = MovingQuantileFilter(quantile=quantile, window_size=window_size)
else:
filters: Dict[FilterName, TempFilter] = {}
for section in iter_sections(config, "filter", FilterName):
if section.name in filters:
raise RuntimeError(
"Unsupported filter type '%s' for filter '%s'. "
"Supported types: `moving_median`, `moving_quantile`."
% (filter_type, filter_name)
"Duplicate filter section declaration for '%s'" % section.name
)
if keys:
raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys)
)
if filter_name in filters:
raise RuntimeError(
"Duplicate filter section declaration for '%s'" % filter_name
)
filters[filter_name] = f
filters[section.name] = afancontrol.filters.from_configparser(section)
section.ensure_no_unused_keys()
# Empty filters is ok
return filters
@ -378,98 +217,19 @@ def _parse_filters(
def _parse_temps(
config: configparser.ConfigParser,
hddtemp: str,
programs: Programs,
filters: Mapping[FilterName, TempFilter],
) -> Tuple[Mapping[TempName, FilteredTemp], Mapping[TempName, Actions]]:
temps = {} # type: Dict[TempName, FilteredTemp]
temp_commands = {} # type: Dict[TempName, Actions]
for section_name in config.sections():
section_name_parts = section_name.split(":", 1)
if section_name_parts[0].strip().lower() != "temp":
continue
temp_name = TempName(section_name_parts[1].strip())
temp = config[section_name]
keys = set(temp.keys())
actions_panic = AlertCommands(
enter_cmd=first_not_none(temp.get("panic_enter_cmd")),
leave_cmd=first_not_none(temp.get("panic_leave_cmd")),
)
keys.discard("panic_enter_cmd")
keys.discard("panic_leave_cmd")
actions_threshold = AlertCommands(
enter_cmd=first_not_none(temp.get("threshold_enter_cmd")),
leave_cmd=first_not_none(temp.get("threshold_leave_cmd")),
)
keys.discard("threshold_enter_cmd")
keys.discard("threshold_leave_cmd")
panic = TempCelsius(temp.getfloat("panic"))
threshold = TempCelsius(temp.getfloat("threshold"))
min = TempCelsius(temp.getfloat("min"))
max = TempCelsius(temp.getfloat("max"))
keys.discard("panic")
keys.discard("threshold")
keys.discard("min")
keys.discard("max")
type = temp["type"]
keys.discard("type")
if type == "file":
t = FileTemp(
temp["path"], min=min, max=max, panic=panic, threshold=threshold
) # type: Temp
keys.discard("path")
elif type == "hdd":
if min is None or max is None:
raise RuntimeError(
"hdd temp '%s' doesn't define the mandatory `min` and `max` temps"
% temp_name
)
t = HDDTemp(
temp["path"],
min=min,
max=max,
panic=panic,
threshold=threshold,
hddtemp_bin=hddtemp,
)
keys.discard("path")
elif type == "exec":
t = CommandTemp(
temp["command"], min=min, max=max, panic=panic, threshold=threshold
)
keys.discard("command")
else:
temps: Dict[TempName, FilteredTemp] = {}
temp_commands: Dict[TempName, Actions] = {}
for section in iter_sections(config, "temp", TempName):
if section.name in temps:
raise RuntimeError(
"Unsupported temp type '%s' for temp '%s'" % (type, temp_name)
"Duplicate temp section declaration for '%s'" % section.name
)
filter_name = temp.get("filter")
keys.discard("filter")
if filter_name is None:
filter = NullFilter()
else:
filter = filters[FilterName(filter_name.strip())].copy()
if keys:
raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys)
)
if temp_name in temps:
raise RuntimeError(
"Duplicate temp section declaration for '%s'" % temp_name
)
temps[temp_name] = FilteredTemp(temp=t, filter=filter)
temp_commands[temp_name] = Actions(
panic=actions_panic, threshold=actions_threshold
)
temps[section.name] = FilteredTemp.from_configparser(section, filters, programs)
temp_commands[section.name] = Actions.from_configparser(section)
section.ensure_no_unused_keys()
return temps, temp_commands
@ -478,95 +238,14 @@ def _parse_fans(
config: configparser.ConfigParser,
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> Mapping[FanName, PWMFanNorm]:
fans = {} # type: Dict[FanName, PWMFanNorm]
for section_name in config.sections():
section_name_parts = section_name.split(":", 1)
if section_name_parts[0].strip().lower() != "fan":
continue
fan_name = FanName(section_name_parts[1].strip())
fan = config[section_name]
keys = set(fan.keys())
fan_type = fan.get("type", fallback=DEFAULT_FAN_TYPE)
keys.discard("type")
if fan_type == "linux":
pwm = PWMDevice(fan["pwm"])
fan_input = FanInputDevice(fan["fan_input"])
keys.discard("pwm")
keys.discard("fan_input")
fan_speed = LinuxFanSpeed(fan_input) # type: BaseFanSpeed
pwm_read = LinuxFanPWMRead(pwm) # type: BaseFanPWMRead
pwm_write = LinuxFanPWMWrite(pwm) # type: BaseFanPWMWrite
elif fan_type == "arduino":
arduino_name = ArduinoName(fan["arduino_name"])
keys.discard("arduino_name")
pwm_pin = ArduinoPin(fan.getint("pwm_pin"))
keys.discard("pwm_pin")
tacho_pin = ArduinoPin(fan.getint("tacho_pin"))
keys.discard("tacho_pin")
if arduino_name not in arduino_connections:
raise ValueError("[arduino:%s] section is missing" % arduino_name)
fan_speed = ArduinoFanSpeed(
arduino_connections[arduino_name], tacho_pin=tacho_pin
)
pwm_read = ArduinoFanPWMRead(
arduino_connections[arduino_name], pwm_pin=pwm_pin
)
pwm_write = ArduinoFanPWMWrite(
arduino_connections[arduino_name], pwm_pin=pwm_pin
)
else:
raise ValueError(
"Unsupported FAN type %s. Supported ones are "
"`linux` and `arduino`." % fan_type
)
never_stop = fan.getboolean("never_stop", fallback=DEFAULT_NEVER_STOP)
keys.discard("never_stop")
pwm_line_start = PWMValue(
fan.getint("pwm_line_start", fallback=DEFAULT_PWM_LINE_START)
)
keys.discard("pwm_line_start")
pwm_line_end = PWMValue(
fan.getint("pwm_line_end", fallback=DEFAULT_PWM_LINE_END)
)
keys.discard("pwm_line_end")
for pwm_value in (pwm_line_start, pwm_line_end):
if not (pwm_read.min_pwm <= pwm_value <= pwm_read.max_pwm):
raise RuntimeError(
"Incorrect PWM value '%s' for fan '%s': it must be within [%s;%s]"
% (pwm_value, fan_name, pwm_read.min_pwm, pwm_read.max_pwm)
)
if pwm_line_start >= pwm_line_end:
fans: Dict[FanName, PWMFanNorm] = {}
for section in iter_sections(config, "fan", FanName):
if section.name in fans:
raise RuntimeError(
"`pwm_line_start` PWM value must be less than `pwm_line_end` for fan '%s'"
% (fan_name,)
"Duplicate fan section declaration for '%s'" % section.name
)
if keys:
raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys)
)
if fan_name in fans:
raise RuntimeError("Duplicate fan section declaration for '%s'" % fan_name)
fans[fan_name] = PWMFanNorm(
fan_speed,
pwm_read,
pwm_write,
pwm_line_start=pwm_line_start,
pwm_line_end=pwm_line_end,
never_stop=never_stop,
)
fans[section.name] = PWMFanNorm.from_configparser(section, arduino_connections)
section.ensure_no_unused_keys()
return fans
@ -574,75 +253,18 @@ def _parse_fans(
def _parse_readonly_fans(
config: configparser.ConfigParser,
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
programs: Programs,
) -> Mapping[ReadonlyFanName, ReadonlyPWMFanNorm]:
readonly_fans = {} # type: Dict[ReadonlyFanName, ReadonlyPWMFanNorm]
for section_name in config.sections():
section_name_parts = section_name.split(":", 1)
if section_name_parts[0].strip().lower() != "readonly_fan":
continue
fan_name = ReadonlyFanName(section_name_parts[1].strip())
fan = config[section_name]
keys = set(fan.keys())
fan_type = fan.get("type", fallback=DEFAULT_FAN_TYPE)
keys.discard("type")
if fan_type == "linux":
fan_input = FanInputDevice(fan["fan_input"])
keys.discard("fan_input")
fan_speed = LinuxFanSpeed(fan_input) # type: BaseFanSpeed
pwm_read = None # type: Optional[BaseFanPWMRead]
if "pwm" in fan:
pwm = PWMDevice(fan["pwm"])
keys.discard("pwm")
pwm_read = LinuxFanPWMRead(pwm)
elif fan_type == "arduino":
arduino_name = ArduinoName(fan["arduino_name"])
keys.discard("arduino_name")
tacho_pin = ArduinoPin(fan.getint("tacho_pin"))
keys.discard("tacho_pin")
if arduino_name not in arduino_connections:
raise ValueError("[arduino:%s] section is missing" % arduino_name)
fan_speed = ArduinoFanSpeed(
arduino_connections[arduino_name], tacho_pin=tacho_pin
)
pwm_read = None
if "pwm_pin" in fan:
pwm_pin = ArduinoPin(fan.getint("pwm_pin"))
keys.discard("pwm_pin")
pwm_read = ArduinoFanPWMRead(
arduino_connections[arduino_name], pwm_pin=pwm_pin
)
elif fan_type == "freeipmi":
name = fan["name"]
keys.discard("name")
ipmi_sensors_extra_args = fan.get("ipmi_sensors_extra_args", fallback="")
fan_speed = FreeIPMIFanSpeed(
name, ipmi_sensors_extra_args=ipmi_sensors_extra_args
)
pwm_read = None
else:
raise ValueError(
"Unsupported FAN type %s. Supported ones are "
"`linux`, `arduino`, `freeipmi`." % fan_type
)
if keys:
readonly_fans: Dict[ReadonlyFanName, ReadonlyPWMFanNorm] = {}
for section in iter_sections(config, "readonly_fan", ReadonlyFanName):
if section.name in readonly_fans:
raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys)
"Duplicate readonly_fan section declaration for '%s'" % section.name
)
if fan_name in readonly_fans:
raise RuntimeError(
"Duplicate readonly_fan section declaration for '%s'" % fan_name
)
readonly_fans[fan_name] = ReadonlyPWMFanNorm(fan_speed, pwm_read)
readonly_fans[section.name] = ReadonlyPWMFanNorm.from_configparser(
section, arduino_connections, programs
)
section.ensure_no_unused_keys()
return readonly_fans
@ -665,45 +287,35 @@ def _parse_mappings(
temps: Mapping[TempName, FilteredTemp],
) -> Mapping[MappingName, FansTempsRelation]:
mappings = {} # type: Dict[MappingName, FansTempsRelation]
for section_name in config.sections():
section_name_parts = section_name.split(":", 1)
if section_name_parts[0].lower() != "mapping":
continue
mapping_name = MappingName(section_name_parts[1])
mapping = config[section_name]
keys = set(mapping.keys())
mappings: Dict[MappingName, FansTempsRelation] = {}
for section in iter_sections(config, "mapping", MappingName):
# temps:
mapping_temps = [
TempName(temp_name.strip()) for temp_name in mapping["temps"].split(",")
TempName(temp_name.strip()) for temp_name in section["temps"].split(",")
]
mapping_temps = [s for s in mapping_temps if s]
keys.discard("temps")
if not mapping_temps:
raise RuntimeError(
"Temps must not be empty in the '%s' mapping" % mapping_name
"Temps must not be empty in the '%s' mapping" % section.name
)
for temp_name in mapping_temps:
if temp_name not in temps:
raise RuntimeError(
"Unknown temp '%s' in mapping '%s'" % (temp_name, mapping_name)
"Unknown temp '%s' in mapping '%s'" % (temp_name, section.name)
)
if len(mapping_temps) != len(set(mapping_temps)):
raise RuntimeError(
"There are duplicate temps in mapping '%s'" % mapping_name
"There are duplicate temps in mapping '%s'" % section.name
)
# fans:
fans_with_speed = [
fan_with_speed.strip() for fan_with_speed in mapping["fans"].split(",")
fan_with_speed.strip() for fan_with_speed in section["fans"].split(",")
]
fans_with_speed = [s for s in fans_with_speed if s]
keys.discard("fans")
fan_speed_pairs = [
fan_with_speed.split("*") for fan_with_speed in fans_with_speed
@ -712,7 +324,7 @@ def _parse_mappings(
if len(fan_speed_pair) not in (1, 2):
raise RuntimeError(
"Invalid fan specification '%s' in mapping '%s'"
% (fan_speed_pair, mapping_name)
% (fan_speed_pair, section.name)
)
mapping_fans = [
FanSpeedModifier(
@ -729,7 +341,7 @@ def _parse_mappings(
if fan_speed_modifier.fan not in fans:
raise RuntimeError(
"Unknown fan '%s' in mapping '%s'"
% (fan_speed_modifier.fan, mapping_name)
% (fan_speed_modifier.fan, section.name)
)
if not (0 < fan_speed_modifier.modifier <= 1.0):
raise RuntimeError(
@ -737,7 +349,7 @@ def _parse_mappings(
"the allowed range is (0.0;1.0]."
% (
fan_speed_modifier.modifier,
mapping_name,
section.name,
fan_speed_modifier.fan,
)
)
@ -745,21 +357,17 @@ def _parse_mappings(
set(fan_speed_modifier.fan for fan_speed_modifier in mapping_fans)
):
raise RuntimeError(
"There are duplicate fans in mapping '%s'" % mapping_name
"There are duplicate fans in mapping '%s'" % section.name
)
if keys:
if section.name in mappings:
raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys)
"Duplicate mapping section declaration for '%s'" % section.name
)
if mapping_name in fans:
raise RuntimeError(
"Duplicate mapping section declaration for '%s'" % mapping_name
)
mappings[mapping_name] = FansTempsRelation(
mappings[section.name] = FansTempsRelation(
temps=mapping_temps, fans=mapping_fans
)
section.ensure_no_unused_keys()
unused_temps = set(temps.keys())
unused_fans = set(fans.keys())

View File

@ -0,0 +1,129 @@
import configparser
from typing import Any, Generic, Iterator, Optional, Type, TypeVar, Union, overload
T = TypeVar("T", bound=str)
F = TypeVar("F", None, Any)
_UNSET = object()
def iter_sections(
config: configparser.ConfigParser, section_type: str, name_typevar: Type[T]
) -> Iterator["ConfigParserSection[T]"]:
for section_name in config.sections():
section_name_parts = section_name.split(":", 1)
if section_name_parts[0].strip().lower() != section_type:
continue
name = name_typevar(section_name_parts[1].strip())
section = ConfigParserSection(config[section_name], name)
yield section
class ConfigParserSection(Generic[T]):
def __init__(
self, section: configparser.SectionProxy, name: Optional[T] = None
) -> None:
self.__name = name
self.__section = section
self.__unused_keys = set(section.keys())
@property
def name(self) -> T:
assert self.__name is not None
return self.__name
def ensure_no_unused_keys(self) -> None:
if self.__unused_keys:
raise RuntimeError(
"Unknown options in the [%s] section: %s"
% (self.__section.name, self.__unused_keys)
)
def __contains__(self, key):
return self.__section.__contains__(key)
def __getitem__(self, key):
self.__unused_keys.discard(key)
return self.__section.__getitem__(key)
@overload
def get(self, option: str) -> str:
...
@overload
def get(self, option: str, *, fallback: F) -> Union[str, F]:
...
def get(self, option: str, *, fallback=_UNSET) -> Union[str, F]:
kwargs = {}
if fallback is not _UNSET:
kwargs["fallback"] = fallback
self.__unused_keys.discard(option)
res = self.__section.get(option, **kwargs)
if res is None and fallback is _UNSET:
raise ValueError(
"[%s] %r option is expected to be set" % (self.__section.name, option)
)
return res
@overload
def getint(self, option: str) -> int:
...
@overload
def getint(self, option: str, *, fallback: F) -> Union[int, F]:
...
def getint(self, option: str, *, fallback=_UNSET) -> Union[int, F]:
kwargs = {}
if fallback is not _UNSET:
kwargs["fallback"] = fallback
self.__unused_keys.discard(option)
res = self.__section.getint(option, **kwargs)
if res is None and fallback is _UNSET:
raise ValueError(
"[%s] %r option is expected to be set" % (self.__section.name, option)
)
return res
@overload
def getfloat(self, option: str) -> float:
...
@overload
def getfloat(self, option: str, *, fallback: F) -> Union[float, F]:
...
def getfloat(self, option: str, *, fallback=_UNSET) -> Union[float, F]:
kwargs = {}
if fallback is not _UNSET:
kwargs["fallback"] = fallback
self.__unused_keys.discard(option)
res = self.__section.getfloat(option, **kwargs)
if res is None and fallback is _UNSET:
raise ValueError(
"[%s] %r option is expected to be set" % (self.__section.name, option)
)
return res
@overload
def getboolean(self, option: str) -> bool:
...
@overload
def getboolean(self, option: str, *, fallback: F) -> Union[bool, F]:
...
def getboolean(self, option: str, *, fallback=_UNSET) -> Union[bool, F]:
kwargs = {}
if fallback is not _UNSET:
kwargs["fallback"] = fallback
self.__unused_keys.discard(option)
res = self.__section.getboolean(option, **kwargs)
if res is None and fallback is _UNSET:
raise ValueError(
"[%s] %r option is expected to be set" % (self.__section.name, option)
)
return res

View File

@ -66,9 +66,7 @@ def daemon(
parsed_config = parse_config(config_path, daemon_cli_config)
if parsed_config.daemon.exporter_listen_host:
metrics = PrometheusMetrics(
parsed_config.daemon.exporter_listen_host
) # type: Metrics
metrics: Metrics = PrometheusMetrics(parsed_config.daemon.exporter_listen_host)
else:
metrics = NullMetrics()
@ -83,7 +81,7 @@ def daemon(
metrics=metrics,
)
pidfile_instance = None # type: Optional[PidFile]
pidfile_instance: Optional[PidFile] = None
if parsed_config.daemon.pidfile is not None:
pidfile_instance = PidFile(parsed_config.daemon.pidfile)

View File

@ -1,8 +1,22 @@
import subprocess
from typing import NamedTuple
from afancontrol.configparser import ConfigParserSection
from afancontrol.logger import logger
class Programs(NamedTuple):
hddtemp: str
ipmi_sensors: str
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> "Programs":
return cls(
hddtemp=section.get("hddtemp", fallback="hddtemp"),
ipmi_sensors=section.get("ipmi_sensors", fallback="ipmi-sensors"),
)
def exec_shell_command(shell_command: str, timeout: int = 5) -> str:
try:
p = subprocess.run(

View File

@ -2,8 +2,8 @@ import itertools
from contextlib import ExitStack
from typing import Iterator, Mapping, MutableSet, Optional, Tuple, Union, cast
from afancontrol.config import AnyFanName, FanName, ReadonlyFanName
from afancontrol.logger import logger
from afancontrol.pwmfan import AnyFanName, FanName, ReadonlyFanName
from afancontrol.pwmfannorm import PWMFanNorm, PWMValueNorm, ReadonlyPWMFanNorm
from afancontrol.report import Report
@ -19,13 +19,13 @@ class Fans:
self.fans = fans
self.readonly_fans = readonly_fans
self.report = report
self._stack = None # type: Optional[ExitStack]
self._stack: Optional[ExitStack] = None
# Set of fans marked as failing (which speed is 0)
self._failed_fans = set() # type: MutableSet[AnyFanName]
self._failed_fans: MutableSet[AnyFanName] = set()
# Set of fans that will be skipped on speed check
self._stopped_fans = set() # type: MutableSet[AnyFanName]
self._stopped_fans: MutableSet[AnyFanName] = set()
def is_fan_failing(self, fan_name: AnyFanName) -> bool:
return fan_name in self._failed_fans

View File

@ -1,7 +1,7 @@
import abc
import sys
from time import sleep
from typing import NamedTuple, Optional
from typing import Optional
import click
@ -15,9 +15,6 @@ from afancontrol.pwmfan import (
ArduinoFanPWMRead,
ArduinoFanPWMWrite,
ArduinoFanSpeed,
BaseFanPWMRead,
BaseFanPWMWrite,
BaseFanSpeed,
FanInputDevice,
FanValue,
LinuxFanPWMRead,
@ -25,6 +22,7 @@ from afancontrol.pwmfan import (
LinuxFanSpeed,
PWMDevice,
PWMValue,
ReadWriteFan,
)
# Time to wait before measuring fan speed after setting a PWM value.
@ -74,15 +72,6 @@ HELP_PWM_STEP_SIZE = (
"faster."
)
Fan = NamedTuple(
"Fan",
[
("fan_speed", BaseFanSpeed),
("pwm_read", BaseFanPWMRead),
("pwm_write", BaseFanPWMWrite),
],
)
@click.command()
@click.option(
@ -156,25 +145,24 @@ def fantest(
) -> None:
"""The PWM fan testing program.
This program tests how changing the PWM value of a fan affects its speed.
This program tests how changing the PWM value of a fan affects its speed.
In the beginning the fan would be stopped (by setting it to a minimum PWM value),
and then the PWM value would be increased in small steps, while also
measuring the speed as reported by the fan.
In the beginning the fan would be stopped (by setting it to a minimum PWM value),
and then the PWM value would be increased in small steps, while also
measuring the speed as reported by the fan.
This data would help you to find the effective range of values
for the `pwm_line_start` and `pwm_line_end` settings where the correlation
between PWM and fan speed is close to linear. Usually its
`pwm_line_start = 100` and `pwm_line_end = 240`, but it is individual
for each fan. The allowed range for a PWM value is from 0 to 255.
This data would help you to find the effective range of values
for the `pwm_line_start` and `pwm_line_end` settings where the correlation
between PWM and fan speed is close to linear. Usually its
`pwm_line_start = 100` and `pwm_line_end = 240`, but it is individual
for each fan. The allowed range for a PWM value is from 0 to 255.
Note that the fan would be stopped for some time during the test. If you'll
feel nervous, press Ctrl+C to stop the test and return the fan to full speed.
Note that the fan would be stopped for some time during the test. If you'll
feel nervous, press Ctrl+C to stop the test and return the fan to full speed.
Before starting the test ensure that no fan control software is currently
controlling the fan you're going to test.
"""
Before starting the test ensure that no fan control software is currently
controlling the fan you're going to test.
"""
try:
if fan_type == "linux":
if not linux_fan_pwm:
@ -191,7 +179,7 @@ controlling the fan you're going to test.
assert linux_fan_pwm is not None
assert linux_fan_input is not None
fan = Fan(
fan = ReadWriteFan(
fan_speed=LinuxFanSpeed(FanInputDevice(linux_fan_input)),
pwm_read=LinuxFanPWMRead(PWMDevice(linux_fan_pwm)),
pwm_write=LinuxFanPWMWrite(PWMDevice(linux_fan_pwm)),
@ -230,7 +218,7 @@ controlling the fan you're going to test.
)
assert arduino_pwm_pin is not None
assert arduino_tacho_pin is not None
fan = Fan(
fan = ReadWriteFan(
fan_speed=ArduinoFanSpeed(
arduino_connection, tacho_pin=ArduinoPin(arduino_tacho_pin)
),
@ -268,7 +256,7 @@ controlling the fan you're going to test.
def run_fantest(
fan: Fan, pwm_step_size: PWMValue, output: "MeasurementsOutput"
fan: ReadWriteFan, pwm_step_size: PWMValue, output: "MeasurementsOutput"
) -> None:
with fan.fan_speed, fan.pwm_read, fan.pwm_write:
start = fan.pwm_read.min_pwm

View File

@ -1,10 +1,32 @@
import abc
import collections
from typing import Deque, Optional, TypeVar
from typing import TYPE_CHECKING, Deque, NewType, Optional, TypeVar
from afancontrol.temp import TempStatus
from afancontrol.configparser import ConfigParserSection
if TYPE_CHECKING:
from afancontrol.temp import TempStatus
T = TypeVar("T")
FilterName = NewType("FilterName", str)
def from_configparser(section: ConfigParserSection[FilterName]) -> "TempFilter":
filter_type = section["type"]
if filter_type == "moving_median":
window_size = section.getint("window_size", fallback=3)
return MovingMedianFilter(window_size=window_size)
elif filter_type == "moving_quantile":
window_size = section.getint("window_size", fallback=3)
quantile = section.getfloat("quantile")
return MovingQuantileFilter(quantile=quantile, window_size=window_size)
else:
raise RuntimeError(
"Unsupported filter type '%s' for filter '%s'. "
"Supported types: `moving_median`, `moving_quantile`."
% (filter_type, section.name)
)
class TempFilter(abc.ABC):
@ -13,7 +35,7 @@ class TempFilter(abc.ABC):
pass
@abc.abstractmethod
def apply(self, status: Optional[TempStatus]) -> Optional[TempStatus]:
def apply(self, status: Optional["TempStatus"]) -> Optional["TempStatus"]:
pass
def __enter__(self): # reusable
@ -27,7 +49,7 @@ class NullFilter(TempFilter):
def copy(self: T) -> T:
return type(self)()
def apply(self, status: Optional[TempStatus]) -> Optional[TempStatus]:
def apply(self, status: Optional["TempStatus"]) -> Optional["TempStatus"]:
return status
def __eq__(self, other):
@ -43,7 +65,7 @@ class NullFilter(TempFilter):
return "%s()" % (type(self).__name__,)
def _temp_status_sorting_key(status: Optional[TempStatus]) -> float:
def _temp_status_sorting_key(status: Optional["TempStatus"]) -> float:
if status is None:
return float("+inf")
return status.temp
@ -53,14 +75,14 @@ class MovingQuantileFilter(TempFilter):
def __init__(self, quantile: float, *, window_size: int) -> None:
self.quantile = quantile
self.window_size = window_size
self.history = None # type: Optional[Deque[Optional[TempStatus]]]
self.history: Optional[Deque[Optional["TempStatus"]]] = None
def copy(self: T) -> T:
return type(self)( # type: ignore
quantile=self.quantile, window_size=self.window_size # type: ignore
)
def apply(self, status: Optional[TempStatus]) -> Optional[TempStatus]:
def apply(self, status: Optional["TempStatus"]) -> Optional["TempStatus"]:
assert self.history is not None
self.history.append(status)

View File

@ -41,7 +41,7 @@ class Manager:
self.mappings = mappings
self.triggers = Triggers(triggers_config, report)
self.metrics = metrics
self._stack = None # type: Optional[ExitStack]
self._stack: Optional[ExitStack] = None
def __enter__(self): # reusable
self._stack = ExitStack()
@ -88,9 +88,7 @@ class Manager:
for temp_name, temp_status in temps.items()
}
fan_speeds = defaultdict(
lambda: PWMValueNorm(0.0)
) # type: Dict[FanName, PWMValueNorm]
fan_speeds: Dict[FanName, PWMValueNorm] = defaultdict(lambda: PWMValueNorm(0.0))
for mapping_name, relation in self.mappings.items():
mapping_speed = max(temp_speeds[temp_name] for temp_name in relation.temps)

View File

@ -1,23 +1,20 @@
import abc
import contextlib
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from http.server import HTTPServer
from socketserver import ThreadingMixIn
from timeit import default_timer
from typing import TYPE_CHECKING, Mapping, Optional, Union
from urllib.parse import parse_qs, urlparse
from typing import ContextManager, Mapping, Optional, Union
from afancontrol.arduino import ArduinoConnection, ArduinoName
from afancontrol.config import AnyFanName, FanName, ReadonlyFanName, TempName
from afancontrol.config import TempName
from afancontrol.fans import Fans
from afancontrol.logger import logger
from afancontrol.pwmfan import AnyFanName, FanName, ReadonlyFanName
from afancontrol.pwmfannorm import PWMFanNorm, ReadonlyPWMFanNorm
from afancontrol.temps import ObservedTempStatus
from afancontrol.trigger import Triggers
if TYPE_CHECKING:
from typing import ContextManager # Added in 3.6
try:
import prometheus_client as prom
@ -46,7 +43,7 @@ class Metrics(abc.ABC):
pass
@abc.abstractmethod
def measure_tick(self) -> "ContextManager[None]":
def measure_tick(self) -> ContextManager[None]:
pass
@ -66,7 +63,7 @@ class NullMetrics(Metrics):
) -> None:
pass
def measure_tick(self) -> "ContextManager[None]":
def measure_tick(self) -> ContextManager[None]:
@contextlib.contextmanager
def null_context_manager():
yield
@ -85,7 +82,7 @@ class PrometheusMetrics(Metrics):
self._listen_addr, port_str = listen_host.rsplit(":", 1)
self._listen_port = int(port_str)
self._http_server = None # type: Optional[HTTPServer]
self._http_server: Optional[HTTPServer] = None
self._last_metrics_collect_clock = float("nan")
@ -255,7 +252,7 @@ class PrometheusMetrics(Metrics):
def _start(self):
# `prometheus_client.start_http_server` which persists a server reference
# so it could be stopped later.
CustomMetricsHandler = MetricsHandler.factory(self.registry)
CustomMetricsHandler = prom.MetricsHandler.factory(self.registry)
httpd = _ThreadingSimpleServer(
(self._listen_addr, self._listen_port), CustomMetricsHandler
)
@ -335,7 +332,7 @@ class PrometheusMetrics(Metrics):
self._last_metrics_collect_clock = self._clock()
def measure_tick(self) -> "ContextManager[None]":
def measure_tick(self) -> ContextManager[None]:
return self.tick_duration.time()
def _collect_fan_metrics(
@ -393,40 +390,3 @@ class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
# Enabling daemon threads virtually makes ``_ThreadingSimpleServer`` the
# same as Python 3.7's ``ThreadingHTTPServer``.
daemon_threads = True
# `MetricsHandler` of `prometheus_client==0.0.18` doesn't support exposing
# a custom registry. This is backported below:
if prometheus_available:
if hasattr(prom.MetricsHandler, "factory"):
MetricsHandler = prom.MetricsHandler
else:
from prometheus_client.exposition import generate_latest, CONTENT_TYPE_LATEST
class MetricsHandler(BaseHTTPRequestHandler): # type: ignore
# https://github.com/prometheus/client_python/blob/31f5557e2e84ca4ffa9a03abf6e3f4d0c8b8c3eb/prometheus_client/exposition.py#L141-L177 # noqa
registry = prom.REGISTRY
def do_GET(self):
registry = self.registry
params = parse_qs(urlparse(self.path).query)
if "name[]" in params:
registry = registry.restricted_registry(params["name[]"])
try:
output = generate_latest(registry)
except Exception:
self.send_error(500, "error generating metric output")
raise
self.send_response(200)
self.send_header("Content-Type", CONTENT_TYPE_LATEST)
self.end_headers()
self.wfile.write(output)
def log_message(self, format, *args):
"""Log nothing."""
@classmethod
def factory(cls, registry):
cls_name = str(cls.__name__)
MyMetricsHandler = type(cls_name, (cls, object), {"registry": registry})
return MyMetricsHandler

View File

@ -1,3 +1,8 @@
from typing import Mapping, NamedTuple, NewType, Optional, Union
from afancontrol.arduino import ArduinoConnection, ArduinoName
from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import Programs
from afancontrol.pwmfan.arduino import (
ArduinoFanPWMRead,
ArduinoFanPWMWrite,
@ -35,3 +40,92 @@ __all__ = (
"PWMDevice",
"PWMValue",
)
DEFAULT_FAN_TYPE = "linux"
FanName = NewType("FanName", str)
ReadonlyFanName = NewType("ReadonlyFanName", str)
AnyFanName = Union[FanName, ReadonlyFanName]
class ReadOnlyFan(NamedTuple):
fan_speed: BaseFanSpeed
pwm_read: Optional[BaseFanPWMRead]
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[ReadonlyFanName],
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
programs: Programs,
) -> "ReadOnlyFan":
fan_type = section.get("type", fallback=DEFAULT_FAN_TYPE)
if fan_type == "linux":
return cls(
fan_speed=LinuxFanSpeed.from_configparser(section),
pwm_read=(
LinuxFanPWMRead.from_configparser(section)
if "pwm" in section
else None
),
)
elif fan_type == "arduino":
return cls(
fan_speed=ArduinoFanSpeed.from_configparser(
section, arduino_connections
),
pwm_read=(
ArduinoFanPWMRead.from_configparser(section, arduino_connections)
if "pwm_pin" in section
else None
),
)
elif fan_type == "freeipmi":
return cls(
fan_speed=FreeIPMIFanSpeed.from_configparser(section, programs),
pwm_read=None,
)
else:
raise ValueError(
"Unsupported FAN type %s. Supported ones are "
"`linux`, `arduino`, `freeipmi`." % fan_type
)
class ReadWriteFan(NamedTuple):
fan_speed: BaseFanSpeed
pwm_read: BaseFanPWMRead
pwm_write: BaseFanPWMWrite
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[FanName],
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> "ReadWriteFan":
fan_type = section.get("type", fallback=DEFAULT_FAN_TYPE)
if fan_type == "linux":
return cls(
fan_speed=LinuxFanSpeed.from_configparser(section),
pwm_read=LinuxFanPWMRead.from_configparser(section),
pwm_write=LinuxFanPWMWrite.from_configparser(section),
)
elif fan_type == "arduino":
return cls(
fan_speed=ArduinoFanSpeed.from_configparser(
section, arduino_connections
),
pwm_read=ArduinoFanPWMRead.from_configparser(
section, arduino_connections
),
pwm_write=ArduinoFanPWMWrite.from_configparser(
section, arduino_connections
),
)
else:
raise ValueError(
"Unsupported FAN type %s. Supported ones are "
"`linux` and `arduino`." % fan_type
)

View File

@ -1,4 +1,7 @@
from afancontrol.arduino import ArduinoConnection, ArduinoPin
from typing import Mapping
from afancontrol.arduino import ArduinoConnection, ArduinoName, ArduinoPin
from afancontrol.configparser import ConfigParserSection
from afancontrol.pwmfan.base import (
BaseFanPWMRead,
BaseFanPWMWrite,
@ -8,6 +11,14 @@ from afancontrol.pwmfan.base import (
)
def _ensure_arduino_connection(
arduino_name: ArduinoName,
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> None:
if arduino_name not in arduino_connections:
raise ValueError("[arduino:%s] section is missing" % arduino_name)
class ArduinoFanSpeed(BaseFanSpeed):
__slots__ = "_conn", "_tacho_pin"
@ -17,6 +28,17 @@ class ArduinoFanSpeed(BaseFanSpeed):
self._conn = arduino_connection
self._tacho_pin = tacho_pin
@classmethod
def from_configparser(
cls,
section: ConfigParserSection,
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> BaseFanSpeed:
arduino_name = ArduinoName(section["arduino_name"])
_ensure_arduino_connection(arduino_name, arduino_connections)
tacho_pin = ArduinoPin(section.getint("tacho_pin"))
return cls(arduino_connections[arduino_name], tacho_pin=tacho_pin)
def get_speed(self) -> FanValue:
return FanValue(self._conn.get_rpm(self._tacho_pin))
@ -35,11 +57,25 @@ class ArduinoFanPWMRead(BaseFanPWMRead):
min_pwm = PWMValue(0)
def __init__(
self, arduino_connection: ArduinoConnection, *, pwm_pin: ArduinoPin
self,
arduino_connection: ArduinoConnection,
*,
pwm_pin: ArduinoPin,
) -> None:
self._conn = arduino_connection
self._pwm_pin = pwm_pin
@classmethod
def from_configparser(
cls,
section: ConfigParserSection,
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> BaseFanPWMRead:
arduino_name = ArduinoName(section["arduino_name"])
_ensure_arduino_connection(arduino_name, arduino_connections)
pwm_pin = ArduinoPin(section.getint("pwm_pin"))
return cls(arduino_connections[arduino_name], pwm_pin=pwm_pin)
def get(self) -> PWMValue:
return PWMValue(int(self._conn.get_pwm(self._pwm_pin)))
@ -57,11 +93,25 @@ class ArduinoFanPWMWrite(BaseFanPWMWrite):
read_cls = ArduinoFanPWMRead
def __init__(
self, arduino_connection: ArduinoConnection, *, pwm_pin: ArduinoPin
self,
arduino_connection: ArduinoConnection,
*,
pwm_pin: ArduinoPin,
) -> None:
self._conn = arduino_connection
self._pwm_pin = pwm_pin
@classmethod
def from_configparser(
cls,
section: ConfigParserSection,
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> BaseFanPWMWrite:
arduino_name = ArduinoName(section["arduino_name"])
_ensure_arduino_connection(arduino_name, arduino_connections)
pwm_pin = ArduinoPin(section.getint("pwm_pin"))
return cls(arduino_connections[arduino_name], pwm_pin=pwm_pin)
def _set_raw(self, pwm: PWMValue) -> None:
self._conn.set_pwm(self._pwm_pin, pwm)

View File

@ -40,8 +40,8 @@ class BaseFanSpeed(abc.ABC, _SlotsReprMixin):
class BaseFanPWMRead(abc.ABC, _SlotsReprMixin):
max_pwm = None # type: PWMValue
min_pwm = None # type: PWMValue
max_pwm: PWMValue
min_pwm: PWMValue
def is_stopped(self) -> bool:
return type(self).is_pwm_stopped(self.get())
@ -62,7 +62,7 @@ class BaseFanPWMRead(abc.ABC, _SlotsReprMixin):
class BaseFanPWMWrite(abc.ABC, _SlotsReprMixin):
read_cls = None # type: Type[BaseFanPWMRead]
read_cls: Type[BaseFanPWMRead]
def set(self, pwm: PWMValue) -> None:
if not (self.read_cls.min_pwm <= pwm <= self.read_cls.max_pwm):

View File

@ -1,7 +1,8 @@
import csv
import io
from afancontrol.exec import exec_shell_command
from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import Programs, exec_shell_command
from afancontrol.pwmfan.base import BaseFanSpeed, FanValue
# TODO maybe switch to `python3-pyghmi`? although it looks like the current version
@ -18,6 +19,16 @@ class FreeIPMIFanSpeed(BaseFanSpeed):
self._ipmi_sensors_bin = ipmi_sensors_bin
self._ipmi_sensors_extra_args = ipmi_sensors_extra_args
@classmethod
def from_configparser(
cls, section: ConfigParserSection, programs: Programs
) -> BaseFanSpeed:
return cls(
section["name"],
ipmi_sensors_bin=programs.ipmi_sensors,
ipmi_sensors_extra_args=section.get("ipmi_sensors_extra_args", fallback=""),
)
def get_speed(self) -> FanValue:
out = self._call_ipmi_sensors()
reader = csv.DictReader(io.StringIO(out))

View File

@ -1,6 +1,7 @@
from pathlib import Path
from typing import NewType
from afancontrol.configparser import ConfigParserSection
from afancontrol.pwmfan.base import (
BaseFanPWMRead,
BaseFanPWMWrite,
@ -19,6 +20,10 @@ class LinuxFanSpeed(BaseFanSpeed):
def __init__(self, fan_input: FanInputDevice) -> None:
self._fan_input = Path(fan_input)
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> BaseFanSpeed:
return cls(FanInputDevice(section["fan_input"]))
def get_speed(self) -> FanValue:
return FanValue(int(self._fan_input.read_text()))
@ -32,6 +37,10 @@ class LinuxFanPWMRead(BaseFanPWMRead):
def __init__(self, pwm: PWMDevice) -> None:
self._pwm = Path(pwm)
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> BaseFanPWMRead:
return cls(PWMDevice(section["pwm"]))
def get(self) -> PWMValue:
return PWMValue(int(self._pwm.read_text()))
@ -45,6 +54,10 @@ class LinuxFanPWMWrite(BaseFanPWMWrite):
self._pwm = Path(pwm)
self._pwm_enable = Path(pwm + "_enable")
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> BaseFanPWMWrite:
return cls(PWMDevice(section["pwm"]))
def _set_raw(self, pwm: PWMValue) -> None:
self._pwm.write_text(str(int(pwm)))

View File

@ -1,13 +1,20 @@
import math
from contextlib import ExitStack
from typing import NewType, Optional
from typing import Mapping, NewType, Optional
from afancontrol.arduino import ArduinoConnection, ArduinoName
from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import Programs
from afancontrol.pwmfan import (
BaseFanPWMRead,
BaseFanPWMWrite,
BaseFanSpeed,
FanName,
FanValue,
PWMValue,
ReadOnlyFan,
ReadonlyFanName,
ReadWriteFan,
)
PWMValueNorm = NewType("PWMValueNorm", float) # [0..1]
@ -19,7 +26,19 @@ class ReadonlyPWMFanNorm:
) -> None:
self.fan_speed = fan_speed
self.pwm_read = pwm_read
self._stack = None # type: Optional[ExitStack]
self._stack: Optional[ExitStack] = None
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[ReadonlyFanName],
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
programs: Programs,
) -> "ReadonlyPWMFanNorm":
readonly_fan = ReadOnlyFan.from_configparser(
section, arduino_connections, programs
)
return cls(readonly_fan.fan_speed, readonly_fan.pwm_read)
def __enter__(self):
self._stack = ExitStack()
@ -98,7 +117,48 @@ class PWMFanNorm:
"Invalid pwm_line_end. Expected: pwm_line_end <= max_pwm. "
"Got: %s <= %s" % (self.pwm_line_end, type(self.pwm_read).max_pwm)
)
self._stack = None # type: Optional[ExitStack]
self._stack: Optional[ExitStack] = None
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[FanName],
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> "PWMFanNorm":
readwrite_fan = ReadWriteFan.from_configparser(section, arduino_connections)
never_stop = section.getboolean("never_stop", fallback=True)
pwm_line_start = PWMValue(section.getint("pwm_line_start", fallback=100))
pwm_line_end = PWMValue(section.getint("pwm_line_end", fallback=240))
for pwm_value in (pwm_line_start, pwm_line_end):
if not (
readwrite_fan.pwm_read.min_pwm
<= pwm_value
<= readwrite_fan.pwm_read.max_pwm
):
raise RuntimeError(
"Incorrect PWM value '%s' for fan '%s': it must be within [%s;%s]"
% (
pwm_value,
section.name,
readwrite_fan.pwm_read.min_pwm,
readwrite_fan.pwm_read.max_pwm,
)
)
if pwm_line_start >= pwm_line_end:
raise RuntimeError(
"`pwm_line_start` PWM value must be less than `pwm_line_end` for fan '%s'"
% (section.name,)
)
return cls(
readwrite_fan.fan_speed,
readwrite_fan.pwm_read,
readwrite_fan.pwm_write,
pwm_line_start=pwm_line_start,
pwm_line_end=pwm_line_end,
never_stop=never_stop,
)
def __eq__(self, other):
if isinstance(other, type(self)):

View File

@ -1,3 +1,8 @@
from typing import Mapping, NamedTuple, NewType
from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import Programs
from afancontrol.filters import FilterName, NullFilter, TempFilter
from afancontrol.temp.base import Temp, TempCelsius, TempStatus
from afancontrol.temp.command import CommandTemp
from afancontrol.temp.file import FileTemp
@ -11,3 +16,40 @@ __all__ = (
"TempCelsius",
"TempStatus",
)
TempName = NewType("TempName", str)
class FilteredTemp(NamedTuple):
temp: Temp
filter: TempFilter
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[TempName],
filters: Mapping[FilterName, TempFilter],
programs: Programs,
) -> "FilteredTemp":
type = section["type"]
if type == "file":
temp: Temp = FileTemp.from_configparser(section)
elif type == "hdd":
temp = HDDTemp.from_configparser(section, programs)
elif type == "exec":
temp = CommandTemp.from_configparser(section)
else:
raise RuntimeError(
"Unsupported temp type '%s' for temp '%s'" % (type, section.name)
)
filter_name = section.get("filter", fallback=None)
if filter_name is None:
filter: TempFilter = NullFilter()
else:
filter = filters[FilterName(filter_name.strip())].copy()
return cls(temp=temp, filter=filter)

View File

@ -3,18 +3,15 @@ from typing import NamedTuple, NewType, Optional, Tuple
TempCelsius = NewType("TempCelsius", float)
TempStatus = NamedTuple(
"TempStatus",
[
("temp", TempCelsius),
("min", TempCelsius),
("max", TempCelsius),
("panic", Optional[TempCelsius]),
("threshold", Optional[TempCelsius]),
("is_panic", bool),
("is_threshold", bool),
],
)
class TempStatus(NamedTuple):
temp: TempCelsius
min: TempCelsius
max: TempCelsius
panic: Optional[TempCelsius]
threshold: Optional[TempCelsius]
is_panic: bool
is_threshold: bool
class Temp(abc.ABC):

View File

@ -1,5 +1,6 @@
from typing import Optional, Tuple
from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import exec_shell_command
from afancontrol.temp.base import Temp, TempCelsius
@ -19,6 +20,16 @@ class CommandTemp(Temp):
self._min = min
self._max = max
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> Temp:
panic = TempCelsius(section.getfloat("panic", fallback=None))
threshold = TempCelsius(section.getfloat("threshold", fallback=None))
min = TempCelsius(section.getfloat("min", fallback=None))
max = TempCelsius(section.getfloat("max", fallback=None))
return cls(
section["command"], min=min, max=max, panic=panic, threshold=threshold
)
def __eq__(self, other):
if isinstance(other, type(self)):
return (

View File

@ -3,6 +3,7 @@ import re
from pathlib import Path
from typing import Optional, Tuple
from afancontrol.configparser import ConfigParserSection
from afancontrol.temp.base import Temp, TempCelsius
@ -41,6 +42,14 @@ class FileTemp(Temp):
self._min = min
self._max = max
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> Temp:
panic = TempCelsius(section.getfloat("panic", fallback=None))
threshold = TempCelsius(section.getfloat("threshold", fallback=None))
min = TempCelsius(section.getfloat("min", fallback=None))
max = TempCelsius(section.getfloat("max", fallback=None))
return cls(section["path"], min=min, max=max, panic=panic, threshold=threshold)
def __eq__(self, other):
if isinstance(other, type(self)):
return (

View File

@ -1,6 +1,7 @@
from typing import Optional, Tuple
from afancontrol.exec import exec_shell_command
from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import Programs, exec_shell_command
from afancontrol.temp.base import Temp, TempCelsius
@ -32,6 +33,23 @@ class HDDTemp(Temp):
self._max = max
self._hddtemp_bin = hddtemp_bin
@classmethod
def from_configparser(
cls, section: ConfigParserSection, programs: Programs
) -> Temp:
panic = TempCelsius(section.getfloat("panic", fallback=None))
threshold = TempCelsius(section.getfloat("threshold", fallback=None))
min = TempCelsius(section.getfloat("min"))
max = TempCelsius(section.getfloat("max"))
return cls(
section["path"],
min=min,
max=max,
panic=panic,
threshold=threshold,
hddtemp_bin=programs.hddtemp,
)
def __eq__(self, other):
if isinstance(other, type(self)):
return (

View File

@ -7,10 +7,10 @@ from afancontrol.filters import TempFilter
from afancontrol.logger import logger
from afancontrol.temp import Temp, TempStatus
ObservedTempStatus = NamedTuple(
"ObservedTempStatus",
[("raw", Optional[TempStatus]), ("filtered", Optional[TempStatus])],
)
class ObservedTempStatus(NamedTuple):
raw: Optional[TempStatus]
filtered: Optional[TempStatus]
def filtered_temps(
@ -25,8 +25,8 @@ def filtered_temps(
class Temps:
def __init__(self, temps: Mapping[TempName, FilteredTemp]) -> None:
self.temps = temps
self._stack = None # type: Optional[ExitStack]
self._executor = None # type: Optional[concurrent.futures.Executor]
self._stack: Optional[ExitStack] = None
self._executor: Optional[concurrent.futures.Executor] = None
def __enter__(self): # reusable
self._stack = ExitStack()
@ -64,7 +64,7 @@ def _get_temp_status(
name: TempName, temp: Temp, filter: TempFilter
) -> ObservedTempStatus:
try:
sensor_value = temp.get() # type: Optional[TempStatus]
sensor_value: Optional[TempStatus] = temp.get()
except Exception as e:
sensor_value = None
logger.warning("Temp sensor [%s] has failed: %s", name, e, exc_info=True)

View File

@ -20,7 +20,7 @@ class Trigger(abc.ABC):
self.global_commands = global_commands
self.temp_commands = temp_commands
self.report = report
self._alerting_temps = set() # type: Set[TempName]
self._alerting_temps: Set[TempName] = set()
@property
@abc.abstractmethod
@ -184,7 +184,7 @@ class Triggers:
},
report=report,
)
self._stack = None # type: Optional[ExitStack]
self._stack: Optional[ExitStack] = None
def __enter__(self): # reusable
self._stack = ExitStack()

View File

@ -430,3 +430,69 @@ fan_input = /sys/class/hwmon/hwmon0/device/fan1_input
},
mappings={},
)
def test_multiline_mapping():
daemon_cli_config = DaemonCLIConfig(
pidfile=None, logfile=None, exporter_listen_host=None
)
config = """
[daemon]
[actions]
[temp:cpu]
type = file
path = /sys/class/hwmon/hwmon0/device/temp1_input
[temp:mobo]
type = file
path = /sys/class/hwmon/hwmon0/device/temp2_input
[fan: case]
pwm = /sys/class/hwmon/hwmon0/device/pwm2
fan_input = /sys/class/hwmon/hwmon0/device/fan2_input
[fan: hdd]
pwm = /sys/class/hwmon/hwmon0/device/pwm2
fan_input = /sys/class/hwmon/hwmon0/device/fan2_input
[mapping:1]
fans =
case*0.6,
hdd,
temps =
mobo,
cpu
"""
parsed = parse_config(path_from_str(config), daemon_cli_config)
assert parsed.mappings == {
MappingName("1"): FansTempsRelation(
temps=[TempName("mobo"), TempName("cpu")],
fans=[
FanSpeedModifier(fan=FanName("case"), modifier=0.6),
FanSpeedModifier(fan=FanName("hdd"), modifier=1.0),
],
)
}
def test_extraneous_keys_raises():
daemon_cli_config = DaemonCLIConfig(
pidfile=None, logfile=None, exporter_listen_host=None
)
config = """
[daemon]
[actions]
[temp: mobo]
type = file
path = /sys/class/hwmon/hwmon0/device/temp1_input
aa = 55
"""
with pytest.raises(RuntimeError) as cm:
parse_config(path_from_str(config), daemon_cli_config)
assert str(cm.value) == "Unknown options in the [temp: mobo] section: {'aa'}"

View File

@ -8,7 +8,6 @@ from click.testing import CliRunner
from afancontrol import fantest
from afancontrol.fantest import (
CSVMeasurementsOutput,
Fan,
HumanMeasurementsOutput,
MeasurementsOutput,
fantest as main,
@ -24,6 +23,7 @@ from afancontrol.pwmfan import (
LinuxFanSpeed,
PWMDevice,
PWMValue,
ReadWriteFan,
)
@ -65,7 +65,7 @@ def test_main_smoke(temp_path):
args, kwargs = mocked_fantest.call_args
assert not args
assert kwargs.keys() == {"fan", "pwm_step_size", "output"}
assert kwargs["fan"] == Fan(
assert kwargs["fan"] == ReadWriteFan(
fan_speed=LinuxFanSpeed(FanInputDevice(str(fan_input_path))),
pwm_read=LinuxFanPWMRead(PWMDevice(str(pwm_path))),
pwm_write=LinuxFanPWMWrite(PWMDevice(str(pwm_path))),
@ -77,11 +77,11 @@ def test_main_smoke(temp_path):
@pytest.mark.parametrize("pwm_step_size", [5, -5])
@pytest.mark.parametrize("output_cls", [HumanMeasurementsOutput, CSVMeasurementsOutput])
def test_fantest(output_cls: Type[MeasurementsOutput], pwm_step_size: PWMValue):
fan = Fan(
fan: Any = ReadWriteFan(
fan_speed=MagicMock(spec=BaseFanSpeed),
pwm_read=MagicMock(spec=BaseFanPWMRead),
pwm_write=MagicMock(spec=BaseFanPWMWrite),
) # type: Any
)
fan.pwm_read.min_pwm = 0
fan.pwm_read.max_pwm = 255
output = output_cls()

View File

@ -1,4 +1,5 @@
from contextlib import ExitStack
from typing import cast
from unittest.mock import MagicMock, patch, sentinel
import pytest
@ -68,7 +69,7 @@ def test_manager(report):
manager.tick()
mocked_triggers = manager.triggers # type: MagicMock
mocked_triggers = cast(MagicMock, manager.triggers)
assert mocked_triggers.check.call_count == 1
assert mocked_case_fan.__enter__.call_count == 1
assert mocked_metrics.__enter__.call_count == 1

View File

@ -1,11 +1,7 @@
[tox]
envlist=py{35,36,37,38,39}{,-arduino,-metrics},lint,check-docs
envlist=py{36,37,38,39,310}{,-arduino,-metrics},lint,check-docs
[testenv]
deps =
; Python 3.5 still ships an old setuptools version which doesn't support
; declarative setup.cfg format.
setuptools>=41.4.0
extras =
arduino: arduino
dev