Compare commits

..

2 Commits

Author SHA1 Message Date
Mario Fetka
fdfbc23d0f build for stretch and jessie 2021-10-26 14:14:08 +02:00
Mario Fetka
1d2c07ae63 Imported Upstream version 2.2.1 2021-10-26 14:11:39 +02:00
39 changed files with 720 additions and 875 deletions

View File

@ -2,10 +2,10 @@ language: python
dist: xenial dist: xenial
python: python:
- "3.5"
- "3.6" - "3.6"
- "3.7" - "3.7"
- "3.8" - "3.8"
# TODO add 3.9
- "3.9-dev" - "3.9-dev"
install: pip install tox-travis tox tox-venv install: pip install tox-travis tox tox-venv

View File

@ -1,43 +0,0 @@
# Contributing to afancontrol
I started afancontrol in 2013 in an attempt to make my custom PC case quiet.
It's been working 24/7 ever since with no issues, and eventually I started using
it on my other machines as well.
I'm quite happy with how this package serves my needs, and I hope
it can be useful for someone else too.
Contributions are welcome, however, keep in mind, that:
* Complex features and large diffs would probably be rejected,
because it would make maintenance more complicated for me,
* I don't have any plans for active development and promotion
of the package.
## Dev workflow
Prepare a virtualenv:
mkvirtualenv afancontrol
make develop
I use [TDD](https://en.wikipedia.org/wiki/Test-driven_development) for development.
Run tests:
make test
Autoformat the code and imports:
make format
Run linters:
make lint
So essentially after writing a small part of code and tests I call these
three commands and fix the errors until they stop failing.
To build docs:
make docs

View File

@ -14,14 +14,14 @@ RUN apt-get update \
RUN mkdir ~/.gnupg && echo "disable-ipv6" >> ~/.gnupg/dirmngr.conf RUN mkdir ~/.gnupg && echo "disable-ipv6" >> ~/.gnupg/dirmngr.conf
# Import the GPG key used to sign the PyPI releases of `afancontrol`: # Import the GPG key used to sign the PyPI releases of `afancontrol`:
RUN gpg --recv-keys "AA7B5406547AF062" RUN gpg --recv-keys "2D3B9C1712FF84F7"
COPY debian /build/afancontrol/debian COPY debian /build/afancontrol/debian
WORKDIR /build/afancontrol/ WORKDIR /build/afancontrol/
RUN mkdir -p debian/upstream \ RUN mkdir -p debian/upstream \
&& gpg --export --export-options export-minimal --armor \ && gpg --export --export-options export-minimal --armor \
'A18FE9F6F570D5B4E1E1853FAA7B5406547AF062' \ 'BE3D633AB6792715ECF34D742D3B9C1712FF84F7' \
> debian/upstream/signing-key.asc > debian/upstream/signing-key.asc
RUN apt-get -y build-dep . RUN apt-get -y build-dep .

View File

@ -1,11 +1,11 @@
.PHONY: format .PHONY: format
format: format:
black src tests *.py && isort src tests *.py black src tests *.py && isort -rc src tests *.py
.PHONY: lint .PHONY: lint
lint: lint:
flake8 src tests *.py && isort --check-only src tests *.py && black --check src tests *.py && mypy src tests flake8 src tests *.py && isort --check-only -rc src tests *.py && black --check src tests *.py && mypy src tests
.PHONY: test .PHONY: test
test: test:

View File

@ -17,7 +17,7 @@ afancontrol
`fancontrol <https://github.com/lm-sensors/lm-sensors/blob/master/prog/pwm/fancontrol>`_ `fancontrol <https://github.com/lm-sensors/lm-sensors/blob/master/prog/pwm/fancontrol>`_
with more advanced configuration abilities. with more advanced configuration abilities.
`afancontrol` measures temperatures from sensors, computes the required `afancontrol` measures temperature from the sensors, computes the required
airflow and sets PWM fan speeds accordingly. airflow and sets the PWM fan speeds accordingly.
The docs are available at `<https://afancontrol.readthedocs.io/>`_. The docs are available at `<https://afancontrol.readthedocs.io/>`_.

11
debian/changelog vendored
View File

@ -1,14 +1,3 @@
afancontrol (3.0.0-1) unstable; urgency=medium
* Drop support for prometheus-client < 0.1.0 (debian stretch)
* Drop support for Python 3.5 (debian stretch)
* Add support for Python 3.9
* config: add `ipmi_sensors` location property
* Add dh-systemd (would automatically (re)start the systemd service upon
package (re)installation)
-- Kostya Esmukov <kostya@esmukov.ru> Sat, 10 Oct 2020 14:43:01 +0000
afancontrol (2.2.1-1) unstable; urgency=medium afancontrol (2.2.1-1) unstable; urgency=medium
* Fix compatibility with py3.5 * Fix compatibility with py3.5

5
debian/control vendored
View File

@ -4,13 +4,12 @@ Priority: optional
Maintainer: Kostya Esmukov <kostya@esmukov.ru> Maintainer: Kostya Esmukov <kostya@esmukov.ru>
Build-Depends: debhelper (>= 9), Build-Depends: debhelper (>= 9),
dh-python, dh-python,
debhelper (>= 9.20160709) | dh-systemd,
python3-all, python3-all,
python3-setuptools python3-setuptools
Build-Depends-Indep: python3-pytest, Build-Depends-Indep: python3-pytest,
python3-requests, python3-requests,
python3-click, python3-click,
python3-prometheus-client (>= 0.1.0), python3-prometheus-client,
python3-serial python3-serial
Standards-Version: 3.9.8 Standards-Version: 3.9.8
Homepage: https://github.com/KostyaEsmukov/afancontrol Homepage: https://github.com/KostyaEsmukov/afancontrol
@ -28,7 +27,7 @@ Depends: ${python3:Depends},
lm-sensors, lm-sensors,
python3-click, python3-click,
python3-pkg-resources, python3-pkg-resources,
python3-prometheus-client (>= 0.1.0), python3-prometheus-client,
python3-serial python3-serial
Suggests: freeipmi-tools, Suggests: freeipmi-tools,
Description: Advanced Fan Control program (Python 3) Description: Advanced Fan Control program (Python 3)

2
debian/rules vendored
View File

@ -9,4 +9,4 @@ export PYBUILD_TEST_PYTEST=1
export PYBUILD_TEST_ARGS={dir}/tests/ export PYBUILD_TEST_ARGS={dir}/tests/
%: %:
dh $@ --with systemd,python3 --buildsystem=pybuild dh $@ --with python3 --buildsystem=pybuild

View File

@ -220,7 +220,8 @@ There's a Dockerfile which can be used to build a Debian `.deb` package:
make deb-from-pypi make deb-from-pypi
# Install the package: # Install the package:
sudo apt install ./dist/debian/*.deb sudo dpkg -i dist/debian/*.deb
sudo apt install -f
Perhaps one day the package might get published to the Debian repos, Perhaps one day the package might get published to the Debian repos,
so a simple ``apt install afancontrol`` would work. But for now, given so a simple ``apt install afancontrol`` would work. But for now, given

View File

@ -10,15 +10,10 @@ logfile = /var/log/afancontrol.log
# Default: 5 # Default: 5
interval = 5 interval = 5
# Hddtemp location. Used by the `type = hdd` temperature sensors. # Hddtemp location. Relevant only when there're `type = hdd` temperature sensors.
# Default: hddtemp # Default: hddtemp
;hddtemp = /usr/local/bin/hddtemp ;hddtemp = /usr/local/bin/hddtemp
# `ipmi-sensors` location from the `freeipmi-tools` package.
# Used by the `type = freeipmi` fans.
# Default: ipmi-sensors
;ipmi_sensors = /usr/local/bin/ipmi-sensors
# Prometheus exporter listening hostname and TCP port. # Prometheus exporter listening hostname and TCP port.
# Default: (empty value) # Default: (empty value)
;exporter_listen_host = 127.0.0.1:8083 ;exporter_listen_host = 127.0.0.1:8083

View File

@ -32,12 +32,13 @@ include_trailing_comma = True
force_grid_wrap = 0 force_grid_wrap = 0
combine_as_imports = True combine_as_imports = True
line_length = 88 line_length = 88
not_skip = __init__.py
[metadata] [metadata]
author = Kostya Esmukov author = Kostya Esmukov
author_email = kostya@esmukov.ru author_email = kostya@esmukov.ru
classifier = classifier =
Development Status :: 5 - Production/Stable Development Status :: 4 - Beta
Intended Audience :: System Administrators Intended Audience :: System Administrators
License :: OSI Approved :: MIT License License :: OSI Approved :: MIT License
Natural Language :: English Natural Language :: English
@ -45,10 +46,10 @@ classifier =
Programming Language :: Python Programming Language :: Python
Programming Language :: Python :: 3 Programming Language :: Python :: 3
Programming Language :: Python :: 3 :: Only Programming Language :: Python :: 3 :: Only
Programming Language :: Python :: 3.5
Programming Language :: Python :: 3.6 Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7 Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8 Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Topic :: System :: Hardware Topic :: System :: Hardware
Topic :: System :: Monitoring Topic :: System :: Monitoring
Topic :: System :: Systems Administration Topic :: System :: Systems Administration
@ -76,7 +77,7 @@ install_requires =
package_dir = package_dir =
= src = src
packages = find: packages = find:
python_requires = >=3.6 python_requires = >=3.5
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =
@ -86,16 +87,16 @@ console_scripts =
arduino = arduino =
pyserial>=3.0 pyserial>=3.0
metrics = metrics =
prometheus-client>=0.1.0 prometheus-client
dev = dev =
black==20.8b1 black==19.10b0; python_version>='3.6'
coverage==5.3 coverage==5.1
flake8==3.8.4 flake8==3.7.9
isort==5.5.4 isort==4.3.21
mypy==0.782 mypy==0.770
pytest==6.1.0 pytest==5.4.2
requests requests
sphinx==3.2.1 sphinx==3.0.3
wheel wheel
[options.packages.find] [options.packages.find]

View File

@ -1 +1 @@
__version__ = "3.0.0" __version__ = "2.2.1"

View File

@ -5,7 +5,6 @@ import threading
from timeit import default_timer from timeit import default_timer
from typing import TYPE_CHECKING, Any, Dict, NewType, Optional from typing import TYPE_CHECKING, Any, Dict, NewType, Optional
from afancontrol.configparser import ConfigParserSection
from afancontrol.logger import logger from afancontrol.logger import logger
if TYPE_CHECKING: if TYPE_CHECKING:
@ -51,22 +50,11 @@ class ArduinoConnection:
lambda: _StatusProtocol(self), url=serial_url, baudrate=baudrate lambda: _StatusProtocol(self), url=serial_url, baudrate=baudrate
) )
self._context_manager_depth = 0 self._context_manager_depth = 0
self._status: Optional[Dict[str, Dict[str, int]]] = None self._status = None # type: Optional[Dict[str, Dict[str, int]]]
self._status_clock: Optional[float] = None self._status_clock = None # type: Optional[float]
self._status_lock = threading.Lock() self._status_lock = threading.Lock()
self._status_event = threading.Event() self._status_event = threading.Event()
@classmethod
def from_configparser(
cls, section: ConfigParserSection[ArduinoName]
) -> "ArduinoConnection":
return cls(
name=section.name,
serial_url=section["serial_url"],
baudrate=section.getint("baudrate", fallback=DEFAULT_BAUDRATE),
status_ttl=section.getint("status_ttl", fallback=DEFAULT_STATUS_TTL),
)
def __eq__(self, other): def __eq__(self, other):
if isinstance(other, type(self)): if isinstance(other, type(self)):
return ( return (
@ -230,10 +218,10 @@ class _AutoRetriedReaderThread:
def __init__(self, protocol_factory, **serial_for_url_kwargs) -> None: def __init__(self, protocol_factory, **serial_for_url_kwargs) -> None:
self.protocol_factory = protocol_factory self.protocol_factory = protocol_factory
self.serial_for_url_kwargs = serial_for_url_kwargs self.serial_for_url_kwargs = serial_for_url_kwargs
self._reader_thread: Optional[ReaderThread] = None self._reader_thread = None # type: Optional[ReaderThread]
self._transport: Optional[ReaderThread] = None self._transport = None # type: Optional[ReaderThread]
self._watchdog_thread: Optional[threading.Thread] = None self._watchdog_thread = None # type: Optional[threading.Thread]
self._watchdog_queue: queue.Queue[Any] = queue.Queue() self._watchdog_queue = queue.Queue() # type: queue.Queue[Any]
def __enter__(self): # reusable def __enter__(self): # reusable
# TODO ?? maybe clean the _watchdog_queue? # TODO ?? maybe clean the _watchdog_queue?

View File

@ -9,120 +9,171 @@ from typing import (
Sequence, Sequence,
Tuple, Tuple,
TypeVar, TypeVar,
Union,
) )
import afancontrol.filters from afancontrol.arduino import (
from afancontrol.arduino import ArduinoConnection, ArduinoName DEFAULT_BAUDRATE,
from afancontrol.configparser import ConfigParserSection, iter_sections DEFAULT_STATUS_TTL,
from afancontrol.exec import Programs ArduinoConnection,
from afancontrol.filters import FilterName, TempFilter ArduinoName,
ArduinoPin,
)
from afancontrol.filters import (
MovingMedianFilter,
MovingQuantileFilter,
NullFilter,
TempFilter,
)
from afancontrol.logger import logger from afancontrol.logger import logger
from afancontrol.pwmfan import FanName, ReadonlyFanName from afancontrol.pwmfan import (
ArduinoFanPWMRead,
ArduinoFanPWMWrite,
ArduinoFanSpeed,
BaseFanPWMRead,
BaseFanPWMWrite,
BaseFanSpeed,
FanInputDevice,
FreeIPMIFanSpeed,
LinuxFanPWMRead,
LinuxFanPWMWrite,
LinuxFanSpeed,
PWMDevice,
PWMValue,
)
from afancontrol.pwmfannorm import PWMFanNorm, ReadonlyPWMFanNorm from afancontrol.pwmfannorm import PWMFanNorm, ReadonlyPWMFanNorm
from afancontrol.temp import FilteredTemp, TempName from afancontrol.temp import CommandTemp, FileTemp, HDDTemp, Temp, TempCelsius
DEFAULT_CONFIG = "/etc/afancontrol/afancontrol.conf" DEFAULT_CONFIG = "/etc/afancontrol/afancontrol.conf"
DEFAULT_PIDFILE = "/run/afancontrol.pid" DEFAULT_PIDFILE = "/run/afancontrol.pid"
DEFAULT_INTERVAL = 5
DEFAULT_FANS_SPEED_CHECK_INTERVAL = 3
DEFAULT_HDDTEMP = "hddtemp"
DEFAULT_REPORT_CMD = ( DEFAULT_REPORT_CMD = (
'printf "Subject: %s\nTo: %s\n\n%b"' 'printf "Subject: %s\nTo: %s\n\n%b"'
' "afancontrol daemon report: %REASON%" root "%MESSAGE%"' ' "afancontrol daemon report: %REASON%" root "%MESSAGE%"'
" | sendmail -t" " | sendmail -t"
) )
DEFAULT_FAN_TYPE = "linux"
DEFAULT_PWM_LINE_START = 100
DEFAULT_PWM_LINE_END = 240
DEFAULT_NEVER_STOP = True
DEFAULT_WINDOW_SIZE = 3
FilterName = NewType("FilterName", str)
TempName = NewType("TempName", str)
FanName = NewType("FanName", str)
ReadonlyFanName = NewType("ReadonlyFanName", str)
AnyFanName = Union[FanName, ReadonlyFanName]
MappingName = NewType("MappingName", str) MappingName = NewType("MappingName", str)
T = TypeVar("T") T = TypeVar("T")
class FanSpeedModifier(NamedTuple): FanSpeedModifier = NamedTuple(
fan: FanName "FanSpeedModifier",
modifier: float # [0..1] # fmt: off
[
("fan", FanName),
("modifier", float), # [0..1]
]
# fmt: on
)
FansTempsRelation = NamedTuple(
"FansTempsRelation",
# fmt: off
[
("temps", Sequence[TempName]),
("fans", Sequence[FanSpeedModifier]),
]
# fmt: on
)
AlertCommands = NamedTuple(
"AlertCommands",
# fmt: off
[
("enter_cmd", Optional[str]),
("leave_cmd", Optional[str]),
]
# fmt: on
)
class FansTempsRelation(NamedTuple): Actions = NamedTuple(
temps: Sequence[TempName] "Actions",
fans: Sequence[FanSpeedModifier] # fmt: off
[
("panic", AlertCommands),
("threshold", AlertCommands),
]
# fmt: on
)
class AlertCommands(NamedTuple): TriggerConfig = NamedTuple(
enter_cmd: Optional[str] "TriggerConfig",
leave_cmd: Optional[str] # fmt: off
[
("global_commands", Actions),
("temp_commands", Mapping[TempName, Actions]),
]
# fmt: on
)
class Actions(NamedTuple): DaemonCLIConfig = NamedTuple(
panic: AlertCommands "DaemonCLIConfig",
threshold: AlertCommands # fmt: off
[
("pidfile", Optional[str]),
("logfile", Optional[str]),
("exporter_listen_host", Optional[str]),
]
# fmt: on
)
@classmethod DaemonConfig = NamedTuple(
def from_configparser(cls, section: ConfigParserSection) -> "Actions": "DaemonConfig",
panic = AlertCommands( # fmt: off
enter_cmd=section.get("panic_enter_cmd", fallback=None), [
leave_cmd=section.get("panic_leave_cmd", fallback=None), ("pidfile", Optional[str]),
) ("logfile", Optional[str]),
("interval", int),
("exporter_listen_host", Optional[str]),
]
# fmt: on
)
threshold = AlertCommands( FilteredTemp = NamedTuple(
enter_cmd=section.get("threshold_enter_cmd", fallback=None), "FilteredTemp",
leave_cmd=section.get("threshold_leave_cmd", fallback=None), # fmt: off
) [
("temp", Temp),
("filter", TempFilter),
]
# fmt: on
)
return cls(panic=panic, threshold=threshold) ParsedConfig = NamedTuple(
"ParsedConfig",
# fmt: off
class TriggerConfig(NamedTuple): [
global_commands: Actions ("daemon", DaemonConfig),
temp_commands: Mapping[TempName, Actions] ("report_cmd", str),
("triggers", TriggerConfig),
("arduino_connections", Mapping[ArduinoName, ArduinoConnection]),
class DaemonCLIConfig(NamedTuple): ("fans", Mapping[FanName, PWMFanNorm]),
pidfile: Optional[str] ("readonly_fans", Mapping[ReadonlyFanName, ReadonlyPWMFanNorm]),
logfile: Optional[str] ("temps", Mapping[TempName, FilteredTemp]),
exporter_listen_host: Optional[str] ("mappings", Mapping[MappingName, FansTempsRelation]),
]
# fmt: on
class DaemonConfig(NamedTuple): )
pidfile: Optional[str]
logfile: Optional[str]
interval: int
exporter_listen_host: Optional[str]
@classmethod
def from_configparser(
cls, section: ConfigParserSection, daemon_cli_config: DaemonCLIConfig
) -> "DaemonConfig":
pidfile = first_not_none(
daemon_cli_config.pidfile, section.get("pidfile", fallback=DEFAULT_PIDFILE)
)
if pidfile is not None and not pidfile.strip():
pidfile = None
logfile = first_not_none(
daemon_cli_config.logfile, section.get("logfile", fallback=None)
)
interval = section.getint("interval", fallback=5)
exporter_listen_host = first_not_none(
daemon_cli_config.exporter_listen_host,
section.get("exporter_listen_host", fallback=None),
)
return cls(
pidfile=pidfile,
logfile=logfile,
interval=interval,
exporter_listen_host=exporter_listen_host,
)
class ParsedConfig(NamedTuple):
daemon: DaemonConfig
report_cmd: str
triggers: TriggerConfig
arduino_connections: Mapping[ArduinoName, ArduinoConnection]
fans: Mapping[FanName, PWMFanNorm]
readonly_fans: Mapping[ReadonlyFanName, ReadonlyPWMFanNorm]
temps: Mapping[TempName, FilteredTemp]
mappings: Mapping[MappingName, FansTempsRelation]
def parse_config(config_path: Path, daemon_cli_config: DaemonCLIConfig) -> ParsedConfig: def parse_config(config_path: Path, daemon_cli_config: DaemonCLIConfig) -> ParsedConfig:
@ -132,13 +183,13 @@ def parse_config(config_path: Path, daemon_cli_config: DaemonCLIConfig) -> Parse
except Exception as e: except Exception as e:
raise RuntimeError("Unable to parse %s:\n%s" % (config_path, e)) raise RuntimeError("Unable to parse %s:\n%s" % (config_path, e))
daemon, programs = _parse_daemon(config, daemon_cli_config) daemon, hddtemp = _parse_daemon(config, daemon_cli_config)
report_cmd, global_commands = _parse_actions(config) report_cmd, global_commands = _parse_actions(config)
arduino_connections = _parse_arduino_connections(config) arduino_connections = _parse_arduino_connections(config)
filters = _parse_filters(config) filters = _parse_filters(config)
temps, temp_commands = _parse_temps(config, programs, filters) temps, temp_commands = _parse_temps(config, hddtemp, filters)
fans = _parse_fans(config, arduino_connections) fans = _parse_fans(config, arduino_connections)
readonly_fans = _parse_readonly_fans(config, arduino_connections, programs) readonly_fans = _parse_readonly_fans(config, arduino_connections)
_check_fans_namespace(fans, readonly_fans) _check_fans_namespace(fans, readonly_fans)
mappings = _parse_mappings(config, fans, temps) mappings = _parse_mappings(config, fans, temps)
@ -165,35 +216,110 @@ def first_not_none(*parts: Optional[T]) -> Optional[T]:
def _parse_daemon( def _parse_daemon(
config: configparser.ConfigParser, daemon_cli_config: DaemonCLIConfig config: configparser.ConfigParser, daemon_cli_config: DaemonCLIConfig
) -> Tuple[DaemonConfig, Programs]: ) -> Tuple[DaemonConfig, str]:
section: ConfigParserSection[str] = ConfigParserSection(config["daemon"]) daemon = config["daemon"]
daemon_config = DaemonConfig.from_configparser(section, daemon_cli_config) keys = set(daemon.keys())
programs = Programs.from_configparser(section)
section.ensure_no_unused_keys()
return daemon_config, programs pidfile = first_not_none(
daemon_cli_config.pidfile, daemon.get("pidfile"), DEFAULT_PIDFILE
)
if pidfile is not None and not pidfile.strip():
pidfile = None
keys.discard("pidfile")
logfile = first_not_none(daemon_cli_config.logfile, daemon.get("logfile"))
keys.discard("logfile")
interval = daemon.getint("interval", fallback=DEFAULT_INTERVAL)
keys.discard("interval")
exporter_listen_host = first_not_none(
daemon_cli_config.exporter_listen_host, daemon.get("exporter_listen_host")
)
keys.discard("exporter_listen_host")
hddtemp = daemon.get("hddtemp") or DEFAULT_HDDTEMP
keys.discard("hddtemp")
if keys:
raise RuntimeError("Unknown options in the [daemon] section: %s" % (keys,))
return (
DaemonConfig(
pidfile=pidfile,
logfile=logfile,
interval=interval,
exporter_listen_host=exporter_listen_host,
),
hddtemp,
)
def _parse_actions(config: configparser.ConfigParser) -> Tuple[str, Actions]: def _parse_actions(config: configparser.ConfigParser) -> Tuple[str, Actions]:
section: ConfigParserSection[str] = ConfigParserSection(config["actions"]) actions = config["actions"]
report_cmd = section.get("report_cmd", fallback=DEFAULT_REPORT_CMD) keys = set(actions.keys())
actions = Actions.from_configparser(section)
section.ensure_no_unused_keys()
return report_cmd, actions report_cmd = first_not_none(actions.get("report_cmd"), DEFAULT_REPORT_CMD)
assert report_cmd is not None
keys.discard("report_cmd")
panic = AlertCommands(
enter_cmd=first_not_none(actions.get("panic_enter_cmd")),
leave_cmd=first_not_none(actions.get("panic_leave_cmd")),
)
keys.discard("panic_enter_cmd")
keys.discard("panic_leave_cmd")
threshold = AlertCommands(
enter_cmd=first_not_none(actions.get("threshold_enter_cmd")),
leave_cmd=first_not_none(actions.get("threshold_leave_cmd")),
)
keys.discard("threshold_enter_cmd")
keys.discard("threshold_leave_cmd")
if keys:
raise RuntimeError("Unknown options in the [actions] section: %s" % (keys,))
return report_cmd, Actions(panic=panic, threshold=threshold)
def _parse_arduino_connections( def _parse_arduino_connections(
config: configparser.ConfigParser, config: configparser.ConfigParser,
) -> Mapping[ArduinoName, ArduinoConnection]: ) -> Mapping[ArduinoName, ArduinoConnection]:
arduino_connections: Dict[ArduinoName, ArduinoConnection] = {} arduino_connections = {} # type: Dict[ArduinoName, ArduinoConnection]
for section in iter_sections(config, "arduino", ArduinoName): for section_name in config.sections():
if section.name in arduino_connections: section_name_parts = section_name.split(":", 1)
if section_name_parts[0].strip().lower() != "arduino":
continue
arduino_name = ArduinoName(section_name_parts[1].strip())
arduino = config[section_name]
keys = set(arduino.keys())
serial_url = arduino["serial_url"]
keys.discard("serial_url")
baudrate = arduino.getint("baudrate", fallback=DEFAULT_BAUDRATE)
keys.discard("baudrate")
status_ttl = arduino.getint("status_ttl", fallback=DEFAULT_STATUS_TTL)
keys.discard("status_ttl")
if keys:
raise RuntimeError( raise RuntimeError(
"Duplicate arduino section declaration for '%s'" % section.name "Unknown options in the [%s] section: %s" % (section_name, keys)
) )
arduino_connections[section.name] = ArduinoConnection.from_configparser(section)
section.ensure_no_unused_keys() if arduino_name in arduino_connections:
raise RuntimeError(
"Duplicate arduino section declaration for '%s'" % arduino_name
)
arduino_connections[arduino_name] = ArduinoConnection(
name=arduino_name,
serial_url=serial_url,
baudrate=baudrate,
status_ttl=status_ttl,
)
# Empty arduino_connections is ok # Empty arduino_connections is ok
return arduino_connections return arduino_connections
@ -202,14 +328,49 @@ def _parse_arduino_connections(
def _parse_filters( def _parse_filters(
config: configparser.ConfigParser, config: configparser.ConfigParser,
) -> Mapping[FilterName, TempFilter]: ) -> Mapping[FilterName, TempFilter]:
filters: Dict[FilterName, TempFilter] = {} filters = {} # type: Dict[FilterName, TempFilter]
for section in iter_sections(config, "filter", FilterName): for section_name in config.sections():
if section.name in filters: section_name_parts = section_name.split(":", 1)
if section_name_parts[0].strip().lower() != "filter":
continue
filter_name = FilterName(section_name_parts[1].strip())
filter = config[section_name]
keys = set(filter.keys())
filter_type = filter["type"]
keys.discard("type")
if filter_type == "moving_median":
window_size = filter.getint("window_size", fallback=DEFAULT_WINDOW_SIZE)
keys.discard("window_size")
f = MovingMedianFilter(window_size=window_size) # type: TempFilter
elif filter_type == "moving_quantile":
window_size = filter.getint("window_size", fallback=DEFAULT_WINDOW_SIZE)
keys.discard("window_size")
quantile = filter.getfloat("quantile")
keys.discard("quantile")
f = MovingQuantileFilter(quantile=quantile, window_size=window_size)
else:
raise RuntimeError( raise RuntimeError(
"Duplicate filter section declaration for '%s'" % section.name "Unsupported filter type '%s' for filter '%s'. "
"Supported types: `moving_median`, `moving_quantile`."
% (filter_type, filter_name)
) )
filters[section.name] = afancontrol.filters.from_configparser(section)
section.ensure_no_unused_keys() if keys:
raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys)
)
if filter_name in filters:
raise RuntimeError(
"Duplicate filter section declaration for '%s'" % filter_name
)
filters[filter_name] = f
# Empty filters is ok # Empty filters is ok
return filters return filters
@ -217,19 +378,98 @@ def _parse_filters(
def _parse_temps( def _parse_temps(
config: configparser.ConfigParser, config: configparser.ConfigParser,
programs: Programs, hddtemp: str,
filters: Mapping[FilterName, TempFilter], filters: Mapping[FilterName, TempFilter],
) -> Tuple[Mapping[TempName, FilteredTemp], Mapping[TempName, Actions]]: ) -> Tuple[Mapping[TempName, FilteredTemp], Mapping[TempName, Actions]]:
temps: Dict[TempName, FilteredTemp] = {} temps = {} # type: Dict[TempName, FilteredTemp]
temp_commands: Dict[TempName, Actions] = {} temp_commands = {} # type: Dict[TempName, Actions]
for section in iter_sections(config, "temp", TempName): for section_name in config.sections():
if section.name in temps: section_name_parts = section_name.split(":", 1)
raise RuntimeError(
"Duplicate temp section declaration for '%s'" % section.name if section_name_parts[0].strip().lower() != "temp":
continue
temp_name = TempName(section_name_parts[1].strip())
temp = config[section_name]
keys = set(temp.keys())
actions_panic = AlertCommands(
enter_cmd=first_not_none(temp.get("panic_enter_cmd")),
leave_cmd=first_not_none(temp.get("panic_leave_cmd")),
)
keys.discard("panic_enter_cmd")
keys.discard("panic_leave_cmd")
actions_threshold = AlertCommands(
enter_cmd=first_not_none(temp.get("threshold_enter_cmd")),
leave_cmd=first_not_none(temp.get("threshold_leave_cmd")),
)
keys.discard("threshold_enter_cmd")
keys.discard("threshold_leave_cmd")
panic = TempCelsius(temp.getfloat("panic"))
threshold = TempCelsius(temp.getfloat("threshold"))
min = TempCelsius(temp.getfloat("min"))
max = TempCelsius(temp.getfloat("max"))
keys.discard("panic")
keys.discard("threshold")
keys.discard("min")
keys.discard("max")
type = temp["type"]
keys.discard("type")
if type == "file":
t = FileTemp(
temp["path"], min=min, max=max, panic=panic, threshold=threshold
) # type: Temp
keys.discard("path")
elif type == "hdd":
if min is None or max is None:
raise RuntimeError(
"hdd temp '%s' doesn't define the mandatory `min` and `max` temps"
% temp_name
)
t = HDDTemp(
temp["path"],
min=min,
max=max,
panic=panic,
threshold=threshold,
hddtemp_bin=hddtemp,
) )
temps[section.name] = FilteredTemp.from_configparser(section, filters, programs) keys.discard("path")
temp_commands[section.name] = Actions.from_configparser(section) elif type == "exec":
section.ensure_no_unused_keys() t = CommandTemp(
temp["command"], min=min, max=max, panic=panic, threshold=threshold
)
keys.discard("command")
else:
raise RuntimeError(
"Unsupported temp type '%s' for temp '%s'" % (type, temp_name)
)
filter_name = temp.get("filter")
keys.discard("filter")
if filter_name is None:
filter = NullFilter()
else:
filter = filters[FilterName(filter_name.strip())].copy()
if keys:
raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys)
)
if temp_name in temps:
raise RuntimeError(
"Duplicate temp section declaration for '%s'" % temp_name
)
temps[temp_name] = FilteredTemp(temp=t, filter=filter)
temp_commands[temp_name] = Actions(
panic=actions_panic, threshold=actions_threshold
)
return temps, temp_commands return temps, temp_commands
@ -238,14 +478,95 @@ def _parse_fans(
config: configparser.ConfigParser, config: configparser.ConfigParser,
arduino_connections: Mapping[ArduinoName, ArduinoConnection], arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> Mapping[FanName, PWMFanNorm]: ) -> Mapping[FanName, PWMFanNorm]:
fans: Dict[FanName, PWMFanNorm] = {} fans = {} # type: Dict[FanName, PWMFanNorm]
for section in iter_sections(config, "fan", FanName): for section_name in config.sections():
if section.name in fans: section_name_parts = section_name.split(":", 1)
raise RuntimeError(
"Duplicate fan section declaration for '%s'" % section.name if section_name_parts[0].strip().lower() != "fan":
continue
fan_name = FanName(section_name_parts[1].strip())
fan = config[section_name]
keys = set(fan.keys())
fan_type = fan.get("type", fallback=DEFAULT_FAN_TYPE)
keys.discard("type")
if fan_type == "linux":
pwm = PWMDevice(fan["pwm"])
fan_input = FanInputDevice(fan["fan_input"])
keys.discard("pwm")
keys.discard("fan_input")
fan_speed = LinuxFanSpeed(fan_input) # type: BaseFanSpeed
pwm_read = LinuxFanPWMRead(pwm) # type: BaseFanPWMRead
pwm_write = LinuxFanPWMWrite(pwm) # type: BaseFanPWMWrite
elif fan_type == "arduino":
arduino_name = ArduinoName(fan["arduino_name"])
keys.discard("arduino_name")
pwm_pin = ArduinoPin(fan.getint("pwm_pin"))
keys.discard("pwm_pin")
tacho_pin = ArduinoPin(fan.getint("tacho_pin"))
keys.discard("tacho_pin")
if arduino_name not in arduino_connections:
raise ValueError("[arduino:%s] section is missing" % arduino_name)
fan_speed = ArduinoFanSpeed(
arduino_connections[arduino_name], tacho_pin=tacho_pin
) )
fans[section.name] = PWMFanNorm.from_configparser(section, arduino_connections) pwm_read = ArduinoFanPWMRead(
section.ensure_no_unused_keys() arduino_connections[arduino_name], pwm_pin=pwm_pin
)
pwm_write = ArduinoFanPWMWrite(
arduino_connections[arduino_name], pwm_pin=pwm_pin
)
else:
raise ValueError(
"Unsupported FAN type %s. Supported ones are "
"`linux` and `arduino`." % fan_type
)
never_stop = fan.getboolean("never_stop", fallback=DEFAULT_NEVER_STOP)
keys.discard("never_stop")
pwm_line_start = PWMValue(
fan.getint("pwm_line_start", fallback=DEFAULT_PWM_LINE_START)
)
keys.discard("pwm_line_start")
pwm_line_end = PWMValue(
fan.getint("pwm_line_end", fallback=DEFAULT_PWM_LINE_END)
)
keys.discard("pwm_line_end")
for pwm_value in (pwm_line_start, pwm_line_end):
if not (pwm_read.min_pwm <= pwm_value <= pwm_read.max_pwm):
raise RuntimeError(
"Incorrect PWM value '%s' for fan '%s': it must be within [%s;%s]"
% (pwm_value, fan_name, pwm_read.min_pwm, pwm_read.max_pwm)
)
if pwm_line_start >= pwm_line_end:
raise RuntimeError(
"`pwm_line_start` PWM value must be less than `pwm_line_end` for fan '%s'"
% (fan_name,)
)
if keys:
raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys)
)
if fan_name in fans:
raise RuntimeError("Duplicate fan section declaration for '%s'" % fan_name)
fans[fan_name] = PWMFanNorm(
fan_speed,
pwm_read,
pwm_write,
pwm_line_start=pwm_line_start,
pwm_line_end=pwm_line_end,
never_stop=never_stop,
)
return fans return fans
@ -253,18 +574,75 @@ def _parse_fans(
def _parse_readonly_fans( def _parse_readonly_fans(
config: configparser.ConfigParser, config: configparser.ConfigParser,
arduino_connections: Mapping[ArduinoName, ArduinoConnection], arduino_connections: Mapping[ArduinoName, ArduinoConnection],
programs: Programs,
) -> Mapping[ReadonlyFanName, ReadonlyPWMFanNorm]: ) -> Mapping[ReadonlyFanName, ReadonlyPWMFanNorm]:
readonly_fans: Dict[ReadonlyFanName, ReadonlyPWMFanNorm] = {} readonly_fans = {} # type: Dict[ReadonlyFanName, ReadonlyPWMFanNorm]
for section in iter_sections(config, "readonly_fan", ReadonlyFanName): for section_name in config.sections():
if section.name in readonly_fans: section_name_parts = section_name.split(":", 1)
raise RuntimeError(
"Duplicate readonly_fan section declaration for '%s'" % section.name if section_name_parts[0].strip().lower() != "readonly_fan":
continue
fan_name = ReadonlyFanName(section_name_parts[1].strip())
fan = config[section_name]
keys = set(fan.keys())
fan_type = fan.get("type", fallback=DEFAULT_FAN_TYPE)
keys.discard("type")
if fan_type == "linux":
fan_input = FanInputDevice(fan["fan_input"])
keys.discard("fan_input")
fan_speed = LinuxFanSpeed(fan_input) # type: BaseFanSpeed
pwm_read = None # type: Optional[BaseFanPWMRead]
if "pwm" in fan:
pwm = PWMDevice(fan["pwm"])
keys.discard("pwm")
pwm_read = LinuxFanPWMRead(pwm)
elif fan_type == "arduino":
arduino_name = ArduinoName(fan["arduino_name"])
keys.discard("arduino_name")
tacho_pin = ArduinoPin(fan.getint("tacho_pin"))
keys.discard("tacho_pin")
if arduino_name not in arduino_connections:
raise ValueError("[arduino:%s] section is missing" % arduino_name)
fan_speed = ArduinoFanSpeed(
arduino_connections[arduino_name], tacho_pin=tacho_pin
) )
readonly_fans[section.name] = ReadonlyPWMFanNorm.from_configparser( pwm_read = None
section, arduino_connections, programs if "pwm_pin" in fan:
) pwm_pin = ArduinoPin(fan.getint("pwm_pin"))
section.ensure_no_unused_keys() keys.discard("pwm_pin")
pwm_read = ArduinoFanPWMRead(
arduino_connections[arduino_name], pwm_pin=pwm_pin
)
elif fan_type == "freeipmi":
name = fan["name"]
keys.discard("name")
ipmi_sensors_extra_args = fan.get("ipmi_sensors_extra_args", fallback="")
fan_speed = FreeIPMIFanSpeed(
name, ipmi_sensors_extra_args=ipmi_sensors_extra_args
)
pwm_read = None
else:
raise ValueError(
"Unsupported FAN type %s. Supported ones are "
"`linux`, `arduino`, `freeipmi`." % fan_type
)
if keys:
raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys)
)
if fan_name in readonly_fans:
raise RuntimeError(
"Duplicate readonly_fan section declaration for '%s'" % fan_name
)
readonly_fans[fan_name] = ReadonlyPWMFanNorm(fan_speed, pwm_read)
return readonly_fans return readonly_fans
@ -287,35 +665,45 @@ def _parse_mappings(
temps: Mapping[TempName, FilteredTemp], temps: Mapping[TempName, FilteredTemp],
) -> Mapping[MappingName, FansTempsRelation]: ) -> Mapping[MappingName, FansTempsRelation]:
mappings: Dict[MappingName, FansTempsRelation] = {} mappings = {} # type: Dict[MappingName, FansTempsRelation]
for section in iter_sections(config, "mapping", MappingName): for section_name in config.sections():
section_name_parts = section_name.split(":", 1)
if section_name_parts[0].lower() != "mapping":
continue
mapping_name = MappingName(section_name_parts[1])
mapping = config[section_name]
keys = set(mapping.keys())
# temps: # temps:
mapping_temps = [ mapping_temps = [
TempName(temp_name.strip()) for temp_name in section["temps"].split(",") TempName(temp_name.strip()) for temp_name in mapping["temps"].split(",")
] ]
mapping_temps = [s for s in mapping_temps if s] mapping_temps = [s for s in mapping_temps if s]
keys.discard("temps")
if not mapping_temps: if not mapping_temps:
raise RuntimeError( raise RuntimeError(
"Temps must not be empty in the '%s' mapping" % section.name "Temps must not be empty in the '%s' mapping" % mapping_name
) )
for temp_name in mapping_temps: for temp_name in mapping_temps:
if temp_name not in temps: if temp_name not in temps:
raise RuntimeError( raise RuntimeError(
"Unknown temp '%s' in mapping '%s'" % (temp_name, section.name) "Unknown temp '%s' in mapping '%s'" % (temp_name, mapping_name)
) )
if len(mapping_temps) != len(set(mapping_temps)): if len(mapping_temps) != len(set(mapping_temps)):
raise RuntimeError( raise RuntimeError(
"There are duplicate temps in mapping '%s'" % section.name "There are duplicate temps in mapping '%s'" % mapping_name
) )
# fans: # fans:
fans_with_speed = [ fans_with_speed = [
fan_with_speed.strip() for fan_with_speed in section["fans"].split(",") fan_with_speed.strip() for fan_with_speed in mapping["fans"].split(",")
] ]
fans_with_speed = [s for s in fans_with_speed if s] fans_with_speed = [s for s in fans_with_speed if s]
keys.discard("fans")
fan_speed_pairs = [ fan_speed_pairs = [
fan_with_speed.split("*") for fan_with_speed in fans_with_speed fan_with_speed.split("*") for fan_with_speed in fans_with_speed
@ -324,7 +712,7 @@ def _parse_mappings(
if len(fan_speed_pair) not in (1, 2): if len(fan_speed_pair) not in (1, 2):
raise RuntimeError( raise RuntimeError(
"Invalid fan specification '%s' in mapping '%s'" "Invalid fan specification '%s' in mapping '%s'"
% (fan_speed_pair, section.name) % (fan_speed_pair, mapping_name)
) )
mapping_fans = [ mapping_fans = [
FanSpeedModifier( FanSpeedModifier(
@ -341,7 +729,7 @@ def _parse_mappings(
if fan_speed_modifier.fan not in fans: if fan_speed_modifier.fan not in fans:
raise RuntimeError( raise RuntimeError(
"Unknown fan '%s' in mapping '%s'" "Unknown fan '%s' in mapping '%s'"
% (fan_speed_modifier.fan, section.name) % (fan_speed_modifier.fan, mapping_name)
) )
if not (0 < fan_speed_modifier.modifier <= 1.0): if not (0 < fan_speed_modifier.modifier <= 1.0):
raise RuntimeError( raise RuntimeError(
@ -349,7 +737,7 @@ def _parse_mappings(
"the allowed range is (0.0;1.0]." "the allowed range is (0.0;1.0]."
% ( % (
fan_speed_modifier.modifier, fan_speed_modifier.modifier,
section.name, mapping_name,
fan_speed_modifier.fan, fan_speed_modifier.fan,
) )
) )
@ -357,17 +745,21 @@ def _parse_mappings(
set(fan_speed_modifier.fan for fan_speed_modifier in mapping_fans) set(fan_speed_modifier.fan for fan_speed_modifier in mapping_fans)
): ):
raise RuntimeError( raise RuntimeError(
"There are duplicate fans in mapping '%s'" % section.name "There are duplicate fans in mapping '%s'" % mapping_name
) )
if section.name in mappings: if keys:
raise RuntimeError( raise RuntimeError(
"Duplicate mapping section declaration for '%s'" % section.name "Unknown options in the [%s] section: %s" % (section_name, keys)
) )
mappings[section.name] = FansTempsRelation(
if mapping_name in fans:
raise RuntimeError(
"Duplicate mapping section declaration for '%s'" % mapping_name
)
mappings[mapping_name] = FansTempsRelation(
temps=mapping_temps, fans=mapping_fans temps=mapping_temps, fans=mapping_fans
) )
section.ensure_no_unused_keys()
unused_temps = set(temps.keys()) unused_temps = set(temps.keys())
unused_fans = set(fans.keys()) unused_fans = set(fans.keys())

View File

@ -1,129 +0,0 @@
import configparser
from typing import Any, Generic, Iterator, Optional, Type, TypeVar, Union, overload
T = TypeVar("T", bound=str)
F = TypeVar("F", None, Any)
_UNSET = object()
def iter_sections(
config: configparser.ConfigParser, section_type: str, name_typevar: Type[T]
) -> Iterator["ConfigParserSection[T]"]:
for section_name in config.sections():
section_name_parts = section_name.split(":", 1)
if section_name_parts[0].strip().lower() != section_type:
continue
name = name_typevar(section_name_parts[1].strip())
section = ConfigParserSection(config[section_name], name)
yield section
class ConfigParserSection(Generic[T]):
def __init__(
self, section: configparser.SectionProxy, name: Optional[T] = None
) -> None:
self.__name = name
self.__section = section
self.__unused_keys = set(section.keys())
@property
def name(self) -> T:
assert self.__name is not None
return self.__name
def ensure_no_unused_keys(self) -> None:
if self.__unused_keys:
raise RuntimeError(
"Unknown options in the [%s] section: %s"
% (self.__section.name, self.__unused_keys)
)
def __contains__(self, key):
return self.__section.__contains__(key)
def __getitem__(self, key):
self.__unused_keys.discard(key)
return self.__section.__getitem__(key)
@overload
def get(self, option: str) -> str:
...
@overload
def get(self, option: str, *, fallback: F) -> Union[str, F]:
...
def get(self, option: str, *, fallback=_UNSET) -> Union[str, F]:
kwargs = {}
if fallback is not _UNSET:
kwargs["fallback"] = fallback
self.__unused_keys.discard(option)
res = self.__section.get(option, **kwargs)
if res is None and fallback is _UNSET:
raise ValueError(
"[%s] %r option is expected to be set" % (self.__section.name, option)
)
return res
@overload
def getint(self, option: str) -> int:
...
@overload
def getint(self, option: str, *, fallback: F) -> Union[int, F]:
...
def getint(self, option: str, *, fallback=_UNSET) -> Union[int, F]:
kwargs = {}
if fallback is not _UNSET:
kwargs["fallback"] = fallback
self.__unused_keys.discard(option)
res = self.__section.getint(option, **kwargs)
if res is None and fallback is _UNSET:
raise ValueError(
"[%s] %r option is expected to be set" % (self.__section.name, option)
)
return res
@overload
def getfloat(self, option: str) -> float:
...
@overload
def getfloat(self, option: str, *, fallback: F) -> Union[float, F]:
...
def getfloat(self, option: str, *, fallback=_UNSET) -> Union[float, F]:
kwargs = {}
if fallback is not _UNSET:
kwargs["fallback"] = fallback
self.__unused_keys.discard(option)
res = self.__section.getfloat(option, **kwargs)
if res is None and fallback is _UNSET:
raise ValueError(
"[%s] %r option is expected to be set" % (self.__section.name, option)
)
return res
@overload
def getboolean(self, option: str) -> bool:
...
@overload
def getboolean(self, option: str, *, fallback: F) -> Union[bool, F]:
...
def getboolean(self, option: str, *, fallback=_UNSET) -> Union[bool, F]:
kwargs = {}
if fallback is not _UNSET:
kwargs["fallback"] = fallback
self.__unused_keys.discard(option)
res = self.__section.getboolean(option, **kwargs)
if res is None and fallback is _UNSET:
raise ValueError(
"[%s] %r option is expected to be set" % (self.__section.name, option)
)
return res

View File

@ -66,7 +66,9 @@ def daemon(
parsed_config = parse_config(config_path, daemon_cli_config) parsed_config = parse_config(config_path, daemon_cli_config)
if parsed_config.daemon.exporter_listen_host: if parsed_config.daemon.exporter_listen_host:
metrics: Metrics = PrometheusMetrics(parsed_config.daemon.exporter_listen_host) metrics = PrometheusMetrics(
parsed_config.daemon.exporter_listen_host
) # type: Metrics
else: else:
metrics = NullMetrics() metrics = NullMetrics()
@ -81,7 +83,7 @@ def daemon(
metrics=metrics, metrics=metrics,
) )
pidfile_instance: Optional[PidFile] = None pidfile_instance = None # type: Optional[PidFile]
if parsed_config.daemon.pidfile is not None: if parsed_config.daemon.pidfile is not None:
pidfile_instance = PidFile(parsed_config.daemon.pidfile) pidfile_instance = PidFile(parsed_config.daemon.pidfile)

View File

@ -1,22 +1,8 @@
import subprocess import subprocess
from typing import NamedTuple
from afancontrol.configparser import ConfigParserSection
from afancontrol.logger import logger from afancontrol.logger import logger
class Programs(NamedTuple):
hddtemp: str
ipmi_sensors: str
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> "Programs":
return cls(
hddtemp=section.get("hddtemp", fallback="hddtemp"),
ipmi_sensors=section.get("ipmi_sensors", fallback="ipmi-sensors"),
)
def exec_shell_command(shell_command: str, timeout: int = 5) -> str: def exec_shell_command(shell_command: str, timeout: int = 5) -> str:
try: try:
p = subprocess.run( p = subprocess.run(

View File

@ -2,8 +2,8 @@ import itertools
from contextlib import ExitStack from contextlib import ExitStack
from typing import Iterator, Mapping, MutableSet, Optional, Tuple, Union, cast from typing import Iterator, Mapping, MutableSet, Optional, Tuple, Union, cast
from afancontrol.config import AnyFanName, FanName, ReadonlyFanName
from afancontrol.logger import logger from afancontrol.logger import logger
from afancontrol.pwmfan import AnyFanName, FanName, ReadonlyFanName
from afancontrol.pwmfannorm import PWMFanNorm, PWMValueNorm, ReadonlyPWMFanNorm from afancontrol.pwmfannorm import PWMFanNorm, PWMValueNorm, ReadonlyPWMFanNorm
from afancontrol.report import Report from afancontrol.report import Report
@ -19,13 +19,13 @@ class Fans:
self.fans = fans self.fans = fans
self.readonly_fans = readonly_fans self.readonly_fans = readonly_fans
self.report = report self.report = report
self._stack: Optional[ExitStack] = None self._stack = None # type: Optional[ExitStack]
# Set of fans marked as failing (which speed is 0) # Set of fans marked as failing (which speed is 0)
self._failed_fans: MutableSet[AnyFanName] = set() self._failed_fans = set() # type: MutableSet[AnyFanName]
# Set of fans that will be skipped on speed check # Set of fans that will be skipped on speed check
self._stopped_fans: MutableSet[AnyFanName] = set() self._stopped_fans = set() # type: MutableSet[AnyFanName]
def is_fan_failing(self, fan_name: AnyFanName) -> bool: def is_fan_failing(self, fan_name: AnyFanName) -> bool:
return fan_name in self._failed_fans return fan_name in self._failed_fans

View File

@ -1,7 +1,7 @@
import abc import abc
import sys import sys
from time import sleep from time import sleep
from typing import Optional from typing import NamedTuple, Optional
import click import click
@ -15,6 +15,9 @@ from afancontrol.pwmfan import (
ArduinoFanPWMRead, ArduinoFanPWMRead,
ArduinoFanPWMWrite, ArduinoFanPWMWrite,
ArduinoFanSpeed, ArduinoFanSpeed,
BaseFanPWMRead,
BaseFanPWMWrite,
BaseFanSpeed,
FanInputDevice, FanInputDevice,
FanValue, FanValue,
LinuxFanPWMRead, LinuxFanPWMRead,
@ -22,7 +25,6 @@ from afancontrol.pwmfan import (
LinuxFanSpeed, LinuxFanSpeed,
PWMDevice, PWMDevice,
PWMValue, PWMValue,
ReadWriteFan,
) )
# Time to wait before measuring fan speed after setting a PWM value. # Time to wait before measuring fan speed after setting a PWM value.
@ -72,6 +74,15 @@ HELP_PWM_STEP_SIZE = (
"faster." "faster."
) )
Fan = NamedTuple(
"Fan",
[
("fan_speed", BaseFanSpeed),
("pwm_read", BaseFanPWMRead),
("pwm_write", BaseFanPWMWrite),
],
)
@click.command() @click.command()
@click.option( @click.option(
@ -145,24 +156,25 @@ def fantest(
) -> None: ) -> None:
"""The PWM fan testing program. """The PWM fan testing program.
This program tests how changing the PWM value of a fan affects its speed. This program tests how changing the PWM value of a fan affects its speed.
In the beginning the fan would be stopped (by setting it to a minimum PWM value), In the beginning the fan would be stopped (by setting it to a minimum PWM value),
and then the PWM value would be increased in small steps, while also and then the PWM value would be increased in small steps, while also
measuring the speed as reported by the fan. measuring the speed as reported by the fan.
This data would help you to find the effective range of values This data would help you to find the effective range of values
for the `pwm_line_start` and `pwm_line_end` settings where the correlation for the `pwm_line_start` and `pwm_line_end` settings where the correlation
between PWM and fan speed is close to linear. Usually its between PWM and fan speed is close to linear. Usually its
`pwm_line_start = 100` and `pwm_line_end = 240`, but it is individual `pwm_line_start = 100` and `pwm_line_end = 240`, but it is individual
for each fan. The allowed range for a PWM value is from 0 to 255. for each fan. The allowed range for a PWM value is from 0 to 255.
Note that the fan would be stopped for some time during the test. If you'll Note that the fan would be stopped for some time during the test. If you'll
feel nervous, press Ctrl+C to stop the test and return the fan to full speed. feel nervous, press Ctrl+C to stop the test and return the fan to full speed.
Before starting the test ensure that no fan control software is currently Before starting the test ensure that no fan control software is currently
controlling the fan you're going to test. controlling the fan you're going to test.
"""
"""
try: try:
if fan_type == "linux": if fan_type == "linux":
if not linux_fan_pwm: if not linux_fan_pwm:
@ -179,7 +191,7 @@ def fantest(
assert linux_fan_pwm is not None assert linux_fan_pwm is not None
assert linux_fan_input is not None assert linux_fan_input is not None
fan = ReadWriteFan( fan = Fan(
fan_speed=LinuxFanSpeed(FanInputDevice(linux_fan_input)), fan_speed=LinuxFanSpeed(FanInputDevice(linux_fan_input)),
pwm_read=LinuxFanPWMRead(PWMDevice(linux_fan_pwm)), pwm_read=LinuxFanPWMRead(PWMDevice(linux_fan_pwm)),
pwm_write=LinuxFanPWMWrite(PWMDevice(linux_fan_pwm)), pwm_write=LinuxFanPWMWrite(PWMDevice(linux_fan_pwm)),
@ -218,7 +230,7 @@ def fantest(
) )
assert arduino_pwm_pin is not None assert arduino_pwm_pin is not None
assert arduino_tacho_pin is not None assert arduino_tacho_pin is not None
fan = ReadWriteFan( fan = Fan(
fan_speed=ArduinoFanSpeed( fan_speed=ArduinoFanSpeed(
arduino_connection, tacho_pin=ArduinoPin(arduino_tacho_pin) arduino_connection, tacho_pin=ArduinoPin(arduino_tacho_pin)
), ),
@ -256,7 +268,7 @@ def fantest(
def run_fantest( def run_fantest(
fan: ReadWriteFan, pwm_step_size: PWMValue, output: "MeasurementsOutput" fan: Fan, pwm_step_size: PWMValue, output: "MeasurementsOutput"
) -> None: ) -> None:
with fan.fan_speed, fan.pwm_read, fan.pwm_write: with fan.fan_speed, fan.pwm_read, fan.pwm_write:
start = fan.pwm_read.min_pwm start = fan.pwm_read.min_pwm

View File

@ -1,32 +1,10 @@
import abc import abc
import collections import collections
from typing import TYPE_CHECKING, Deque, NewType, Optional, TypeVar from typing import Deque, Optional, TypeVar
from afancontrol.configparser import ConfigParserSection from afancontrol.temp import TempStatus
if TYPE_CHECKING:
from afancontrol.temp import TempStatus
T = TypeVar("T") T = TypeVar("T")
FilterName = NewType("FilterName", str)
def from_configparser(section: ConfigParserSection[FilterName]) -> "TempFilter":
filter_type = section["type"]
if filter_type == "moving_median":
window_size = section.getint("window_size", fallback=3)
return MovingMedianFilter(window_size=window_size)
elif filter_type == "moving_quantile":
window_size = section.getint("window_size", fallback=3)
quantile = section.getfloat("quantile")
return MovingQuantileFilter(quantile=quantile, window_size=window_size)
else:
raise RuntimeError(
"Unsupported filter type '%s' for filter '%s'. "
"Supported types: `moving_median`, `moving_quantile`."
% (filter_type, section.name)
)
class TempFilter(abc.ABC): class TempFilter(abc.ABC):
@ -35,7 +13,7 @@ class TempFilter(abc.ABC):
pass pass
@abc.abstractmethod @abc.abstractmethod
def apply(self, status: Optional["TempStatus"]) -> Optional["TempStatus"]: def apply(self, status: Optional[TempStatus]) -> Optional[TempStatus]:
pass pass
def __enter__(self): # reusable def __enter__(self): # reusable
@ -49,7 +27,7 @@ class NullFilter(TempFilter):
def copy(self: T) -> T: def copy(self: T) -> T:
return type(self)() return type(self)()
def apply(self, status: Optional["TempStatus"]) -> Optional["TempStatus"]: def apply(self, status: Optional[TempStatus]) -> Optional[TempStatus]:
return status return status
def __eq__(self, other): def __eq__(self, other):
@ -65,7 +43,7 @@ class NullFilter(TempFilter):
return "%s()" % (type(self).__name__,) return "%s()" % (type(self).__name__,)
def _temp_status_sorting_key(status: Optional["TempStatus"]) -> float: def _temp_status_sorting_key(status: Optional[TempStatus]) -> float:
if status is None: if status is None:
return float("+inf") return float("+inf")
return status.temp return status.temp
@ -75,14 +53,14 @@ class MovingQuantileFilter(TempFilter):
def __init__(self, quantile: float, *, window_size: int) -> None: def __init__(self, quantile: float, *, window_size: int) -> None:
self.quantile = quantile self.quantile = quantile
self.window_size = window_size self.window_size = window_size
self.history: Optional[Deque[Optional["TempStatus"]]] = None self.history = None # type: Optional[Deque[Optional[TempStatus]]]
def copy(self: T) -> T: def copy(self: T) -> T:
return type(self)( # type: ignore return type(self)( # type: ignore
quantile=self.quantile, window_size=self.window_size # type: ignore quantile=self.quantile, window_size=self.window_size # type: ignore
) )
def apply(self, status: Optional["TempStatus"]) -> Optional["TempStatus"]: def apply(self, status: Optional[TempStatus]) -> Optional[TempStatus]:
assert self.history is not None assert self.history is not None
self.history.append(status) self.history.append(status)

View File

@ -41,7 +41,7 @@ class Manager:
self.mappings = mappings self.mappings = mappings
self.triggers = Triggers(triggers_config, report) self.triggers = Triggers(triggers_config, report)
self.metrics = metrics self.metrics = metrics
self._stack: Optional[ExitStack] = None self._stack = None # type: Optional[ExitStack]
def __enter__(self): # reusable def __enter__(self): # reusable
self._stack = ExitStack() self._stack = ExitStack()
@ -88,7 +88,9 @@ class Manager:
for temp_name, temp_status in temps.items() for temp_name, temp_status in temps.items()
} }
fan_speeds: Dict[FanName, PWMValueNorm] = defaultdict(lambda: PWMValueNorm(0.0)) fan_speeds = defaultdict(
lambda: PWMValueNorm(0.0)
) # type: Dict[FanName, PWMValueNorm]
for mapping_name, relation in self.mappings.items(): for mapping_name, relation in self.mappings.items():
mapping_speed = max(temp_speeds[temp_name] for temp_name in relation.temps) mapping_speed = max(temp_speeds[temp_name] for temp_name in relation.temps)

View File

@ -1,20 +1,23 @@
import abc import abc
import contextlib import contextlib
import threading import threading
from http.server import HTTPServer from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn from socketserver import ThreadingMixIn
from timeit import default_timer from timeit import default_timer
from typing import ContextManager, Mapping, Optional, Union from typing import TYPE_CHECKING, Mapping, Optional, Union
from urllib.parse import parse_qs, urlparse
from afancontrol.arduino import ArduinoConnection, ArduinoName from afancontrol.arduino import ArduinoConnection, ArduinoName
from afancontrol.config import TempName from afancontrol.config import AnyFanName, FanName, ReadonlyFanName, TempName
from afancontrol.fans import Fans from afancontrol.fans import Fans
from afancontrol.logger import logger from afancontrol.logger import logger
from afancontrol.pwmfan import AnyFanName, FanName, ReadonlyFanName
from afancontrol.pwmfannorm import PWMFanNorm, ReadonlyPWMFanNorm from afancontrol.pwmfannorm import PWMFanNorm, ReadonlyPWMFanNorm
from afancontrol.temps import ObservedTempStatus from afancontrol.temps import ObservedTempStatus
from afancontrol.trigger import Triggers from afancontrol.trigger import Triggers
if TYPE_CHECKING:
from typing import ContextManager # Added in 3.6
try: try:
import prometheus_client as prom import prometheus_client as prom
@ -43,7 +46,7 @@ class Metrics(abc.ABC):
pass pass
@abc.abstractmethod @abc.abstractmethod
def measure_tick(self) -> ContextManager[None]: def measure_tick(self) -> "ContextManager[None]":
pass pass
@ -63,7 +66,7 @@ class NullMetrics(Metrics):
) -> None: ) -> None:
pass pass
def measure_tick(self) -> ContextManager[None]: def measure_tick(self) -> "ContextManager[None]":
@contextlib.contextmanager @contextlib.contextmanager
def null_context_manager(): def null_context_manager():
yield yield
@ -82,7 +85,7 @@ class PrometheusMetrics(Metrics):
self._listen_addr, port_str = listen_host.rsplit(":", 1) self._listen_addr, port_str = listen_host.rsplit(":", 1)
self._listen_port = int(port_str) self._listen_port = int(port_str)
self._http_server: Optional[HTTPServer] = None self._http_server = None # type: Optional[HTTPServer]
self._last_metrics_collect_clock = float("nan") self._last_metrics_collect_clock = float("nan")
@ -252,7 +255,7 @@ class PrometheusMetrics(Metrics):
def _start(self): def _start(self):
# `prometheus_client.start_http_server` which persists a server reference # `prometheus_client.start_http_server` which persists a server reference
# so it could be stopped later. # so it could be stopped later.
CustomMetricsHandler = prom.MetricsHandler.factory(self.registry) CustomMetricsHandler = MetricsHandler.factory(self.registry)
httpd = _ThreadingSimpleServer( httpd = _ThreadingSimpleServer(
(self._listen_addr, self._listen_port), CustomMetricsHandler (self._listen_addr, self._listen_port), CustomMetricsHandler
) )
@ -332,7 +335,7 @@ class PrometheusMetrics(Metrics):
self._last_metrics_collect_clock = self._clock() self._last_metrics_collect_clock = self._clock()
def measure_tick(self) -> ContextManager[None]: def measure_tick(self) -> "ContextManager[None]":
return self.tick_duration.time() return self.tick_duration.time()
def _collect_fan_metrics( def _collect_fan_metrics(
@ -390,3 +393,40 @@ class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
# Enabling daemon threads virtually makes ``_ThreadingSimpleServer`` the # Enabling daemon threads virtually makes ``_ThreadingSimpleServer`` the
# same as Python 3.7's ``ThreadingHTTPServer``. # same as Python 3.7's ``ThreadingHTTPServer``.
daemon_threads = True daemon_threads = True
# `MetricsHandler` of `prometheus_client==0.0.18` doesn't support exposing
# a custom registry. This is backported below:
if prometheus_available:
if hasattr(prom.MetricsHandler, "factory"):
MetricsHandler = prom.MetricsHandler
else:
from prometheus_client.exposition import generate_latest, CONTENT_TYPE_LATEST
class MetricsHandler(BaseHTTPRequestHandler): # type: ignore
# https://github.com/prometheus/client_python/blob/31f5557e2e84ca4ffa9a03abf6e3f4d0c8b8c3eb/prometheus_client/exposition.py#L141-L177 # noqa
registry = prom.REGISTRY
def do_GET(self):
registry = self.registry
params = parse_qs(urlparse(self.path).query)
if "name[]" in params:
registry = registry.restricted_registry(params["name[]"])
try:
output = generate_latest(registry)
except Exception:
self.send_error(500, "error generating metric output")
raise
self.send_response(200)
self.send_header("Content-Type", CONTENT_TYPE_LATEST)
self.end_headers()
self.wfile.write(output)
def log_message(self, format, *args):
"""Log nothing."""
@classmethod
def factory(cls, registry):
cls_name = str(cls.__name__)
MyMetricsHandler = type(cls_name, (cls, object), {"registry": registry})
return MyMetricsHandler

View File

@ -1,8 +1,3 @@
from typing import Mapping, NamedTuple, NewType, Optional, Union
from afancontrol.arduino import ArduinoConnection, ArduinoName
from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import Programs
from afancontrol.pwmfan.arduino import ( from afancontrol.pwmfan.arduino import (
ArduinoFanPWMRead, ArduinoFanPWMRead,
ArduinoFanPWMWrite, ArduinoFanPWMWrite,
@ -40,92 +35,3 @@ __all__ = (
"PWMDevice", "PWMDevice",
"PWMValue", "PWMValue",
) )
DEFAULT_FAN_TYPE = "linux"
FanName = NewType("FanName", str)
ReadonlyFanName = NewType("ReadonlyFanName", str)
AnyFanName = Union[FanName, ReadonlyFanName]
class ReadOnlyFan(NamedTuple):
fan_speed: BaseFanSpeed
pwm_read: Optional[BaseFanPWMRead]
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[ReadonlyFanName],
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
programs: Programs,
) -> "ReadOnlyFan":
fan_type = section.get("type", fallback=DEFAULT_FAN_TYPE)
if fan_type == "linux":
return cls(
fan_speed=LinuxFanSpeed.from_configparser(section),
pwm_read=(
LinuxFanPWMRead.from_configparser(section)
if "pwm" in section
else None
),
)
elif fan_type == "arduino":
return cls(
fan_speed=ArduinoFanSpeed.from_configparser(
section, arduino_connections
),
pwm_read=(
ArduinoFanPWMRead.from_configparser(section, arduino_connections)
if "pwm_pin" in section
else None
),
)
elif fan_type == "freeipmi":
return cls(
fan_speed=FreeIPMIFanSpeed.from_configparser(section, programs),
pwm_read=None,
)
else:
raise ValueError(
"Unsupported FAN type %s. Supported ones are "
"`linux`, `arduino`, `freeipmi`." % fan_type
)
class ReadWriteFan(NamedTuple):
fan_speed: BaseFanSpeed
pwm_read: BaseFanPWMRead
pwm_write: BaseFanPWMWrite
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[FanName],
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> "ReadWriteFan":
fan_type = section.get("type", fallback=DEFAULT_FAN_TYPE)
if fan_type == "linux":
return cls(
fan_speed=LinuxFanSpeed.from_configparser(section),
pwm_read=LinuxFanPWMRead.from_configparser(section),
pwm_write=LinuxFanPWMWrite.from_configparser(section),
)
elif fan_type == "arduino":
return cls(
fan_speed=ArduinoFanSpeed.from_configparser(
section, arduino_connections
),
pwm_read=ArduinoFanPWMRead.from_configparser(
section, arduino_connections
),
pwm_write=ArduinoFanPWMWrite.from_configparser(
section, arduino_connections
),
)
else:
raise ValueError(
"Unsupported FAN type %s. Supported ones are "
"`linux` and `arduino`." % fan_type
)

View File

@ -1,7 +1,4 @@
from typing import Mapping from afancontrol.arduino import ArduinoConnection, ArduinoPin
from afancontrol.arduino import ArduinoConnection, ArduinoName, ArduinoPin
from afancontrol.configparser import ConfigParserSection
from afancontrol.pwmfan.base import ( from afancontrol.pwmfan.base import (
BaseFanPWMRead, BaseFanPWMRead,
BaseFanPWMWrite, BaseFanPWMWrite,
@ -11,14 +8,6 @@ from afancontrol.pwmfan.base import (
) )
def _ensure_arduino_connection(
arduino_name: ArduinoName,
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> None:
if arduino_name not in arduino_connections:
raise ValueError("[arduino:%s] section is missing" % arduino_name)
class ArduinoFanSpeed(BaseFanSpeed): class ArduinoFanSpeed(BaseFanSpeed):
__slots__ = "_conn", "_tacho_pin" __slots__ = "_conn", "_tacho_pin"
@ -28,17 +17,6 @@ class ArduinoFanSpeed(BaseFanSpeed):
self._conn = arduino_connection self._conn = arduino_connection
self._tacho_pin = tacho_pin self._tacho_pin = tacho_pin
@classmethod
def from_configparser(
cls,
section: ConfigParserSection,
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> BaseFanSpeed:
arduino_name = ArduinoName(section["arduino_name"])
_ensure_arduino_connection(arduino_name, arduino_connections)
tacho_pin = ArduinoPin(section.getint("tacho_pin"))
return cls(arduino_connections[arduino_name], tacho_pin=tacho_pin)
def get_speed(self) -> FanValue: def get_speed(self) -> FanValue:
return FanValue(self._conn.get_rpm(self._tacho_pin)) return FanValue(self._conn.get_rpm(self._tacho_pin))
@ -57,25 +35,11 @@ class ArduinoFanPWMRead(BaseFanPWMRead):
min_pwm = PWMValue(0) min_pwm = PWMValue(0)
def __init__( def __init__(
self, self, arduino_connection: ArduinoConnection, *, pwm_pin: ArduinoPin
arduino_connection: ArduinoConnection,
*,
pwm_pin: ArduinoPin,
) -> None: ) -> None:
self._conn = arduino_connection self._conn = arduino_connection
self._pwm_pin = pwm_pin self._pwm_pin = pwm_pin
@classmethod
def from_configparser(
cls,
section: ConfigParserSection,
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> BaseFanPWMRead:
arduino_name = ArduinoName(section["arduino_name"])
_ensure_arduino_connection(arduino_name, arduino_connections)
pwm_pin = ArduinoPin(section.getint("pwm_pin"))
return cls(arduino_connections[arduino_name], pwm_pin=pwm_pin)
def get(self) -> PWMValue: def get(self) -> PWMValue:
return PWMValue(int(self._conn.get_pwm(self._pwm_pin))) return PWMValue(int(self._conn.get_pwm(self._pwm_pin)))
@ -93,25 +57,11 @@ class ArduinoFanPWMWrite(BaseFanPWMWrite):
read_cls = ArduinoFanPWMRead read_cls = ArduinoFanPWMRead
def __init__( def __init__(
self, self, arduino_connection: ArduinoConnection, *, pwm_pin: ArduinoPin
arduino_connection: ArduinoConnection,
*,
pwm_pin: ArduinoPin,
) -> None: ) -> None:
self._conn = arduino_connection self._conn = arduino_connection
self._pwm_pin = pwm_pin self._pwm_pin = pwm_pin
@classmethod
def from_configparser(
cls,
section: ConfigParserSection,
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> BaseFanPWMWrite:
arduino_name = ArduinoName(section["arduino_name"])
_ensure_arduino_connection(arduino_name, arduino_connections)
pwm_pin = ArduinoPin(section.getint("pwm_pin"))
return cls(arduino_connections[arduino_name], pwm_pin=pwm_pin)
def _set_raw(self, pwm: PWMValue) -> None: def _set_raw(self, pwm: PWMValue) -> None:
self._conn.set_pwm(self._pwm_pin, pwm) self._conn.set_pwm(self._pwm_pin, pwm)

View File

@ -40,8 +40,8 @@ class BaseFanSpeed(abc.ABC, _SlotsReprMixin):
class BaseFanPWMRead(abc.ABC, _SlotsReprMixin): class BaseFanPWMRead(abc.ABC, _SlotsReprMixin):
max_pwm: PWMValue max_pwm = None # type: PWMValue
min_pwm: PWMValue min_pwm = None # type: PWMValue
def is_stopped(self) -> bool: def is_stopped(self) -> bool:
return type(self).is_pwm_stopped(self.get()) return type(self).is_pwm_stopped(self.get())
@ -62,7 +62,7 @@ class BaseFanPWMRead(abc.ABC, _SlotsReprMixin):
class BaseFanPWMWrite(abc.ABC, _SlotsReprMixin): class BaseFanPWMWrite(abc.ABC, _SlotsReprMixin):
read_cls: Type[BaseFanPWMRead] read_cls = None # type: Type[BaseFanPWMRead]
def set(self, pwm: PWMValue) -> None: def set(self, pwm: PWMValue) -> None:
if not (self.read_cls.min_pwm <= pwm <= self.read_cls.max_pwm): if not (self.read_cls.min_pwm <= pwm <= self.read_cls.max_pwm):

View File

@ -1,8 +1,7 @@
import csv import csv
import io import io
from afancontrol.configparser import ConfigParserSection from afancontrol.exec import exec_shell_command
from afancontrol.exec import Programs, exec_shell_command
from afancontrol.pwmfan.base import BaseFanSpeed, FanValue from afancontrol.pwmfan.base import BaseFanSpeed, FanValue
# TODO maybe switch to `python3-pyghmi`? although it looks like the current version # TODO maybe switch to `python3-pyghmi`? although it looks like the current version
@ -19,16 +18,6 @@ class FreeIPMIFanSpeed(BaseFanSpeed):
self._ipmi_sensors_bin = ipmi_sensors_bin self._ipmi_sensors_bin = ipmi_sensors_bin
self._ipmi_sensors_extra_args = ipmi_sensors_extra_args self._ipmi_sensors_extra_args = ipmi_sensors_extra_args
@classmethod
def from_configparser(
cls, section: ConfigParserSection, programs: Programs
) -> BaseFanSpeed:
return cls(
section["name"],
ipmi_sensors_bin=programs.ipmi_sensors,
ipmi_sensors_extra_args=section.get("ipmi_sensors_extra_args", fallback=""),
)
def get_speed(self) -> FanValue: def get_speed(self) -> FanValue:
out = self._call_ipmi_sensors() out = self._call_ipmi_sensors()
reader = csv.DictReader(io.StringIO(out)) reader = csv.DictReader(io.StringIO(out))

View File

@ -1,7 +1,6 @@
from pathlib import Path from pathlib import Path
from typing import NewType from typing import NewType
from afancontrol.configparser import ConfigParserSection
from afancontrol.pwmfan.base import ( from afancontrol.pwmfan.base import (
BaseFanPWMRead, BaseFanPWMRead,
BaseFanPWMWrite, BaseFanPWMWrite,
@ -20,10 +19,6 @@ class LinuxFanSpeed(BaseFanSpeed):
def __init__(self, fan_input: FanInputDevice) -> None: def __init__(self, fan_input: FanInputDevice) -> None:
self._fan_input = Path(fan_input) self._fan_input = Path(fan_input)
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> BaseFanSpeed:
return cls(FanInputDevice(section["fan_input"]))
def get_speed(self) -> FanValue: def get_speed(self) -> FanValue:
return FanValue(int(self._fan_input.read_text())) return FanValue(int(self._fan_input.read_text()))
@ -37,10 +32,6 @@ class LinuxFanPWMRead(BaseFanPWMRead):
def __init__(self, pwm: PWMDevice) -> None: def __init__(self, pwm: PWMDevice) -> None:
self._pwm = Path(pwm) self._pwm = Path(pwm)
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> BaseFanPWMRead:
return cls(PWMDevice(section["pwm"]))
def get(self) -> PWMValue: def get(self) -> PWMValue:
return PWMValue(int(self._pwm.read_text())) return PWMValue(int(self._pwm.read_text()))
@ -54,10 +45,6 @@ class LinuxFanPWMWrite(BaseFanPWMWrite):
self._pwm = Path(pwm) self._pwm = Path(pwm)
self._pwm_enable = Path(pwm + "_enable") self._pwm_enable = Path(pwm + "_enable")
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> BaseFanPWMWrite:
return cls(PWMDevice(section["pwm"]))
def _set_raw(self, pwm: PWMValue) -> None: def _set_raw(self, pwm: PWMValue) -> None:
self._pwm.write_text(str(int(pwm))) self._pwm.write_text(str(int(pwm)))

View File

@ -1,20 +1,13 @@
import math import math
from contextlib import ExitStack from contextlib import ExitStack
from typing import Mapping, NewType, Optional from typing import NewType, Optional
from afancontrol.arduino import ArduinoConnection, ArduinoName
from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import Programs
from afancontrol.pwmfan import ( from afancontrol.pwmfan import (
BaseFanPWMRead, BaseFanPWMRead,
BaseFanPWMWrite, BaseFanPWMWrite,
BaseFanSpeed, BaseFanSpeed,
FanName,
FanValue, FanValue,
PWMValue, PWMValue,
ReadOnlyFan,
ReadonlyFanName,
ReadWriteFan,
) )
PWMValueNorm = NewType("PWMValueNorm", float) # [0..1] PWMValueNorm = NewType("PWMValueNorm", float) # [0..1]
@ -26,19 +19,7 @@ class ReadonlyPWMFanNorm:
) -> None: ) -> None:
self.fan_speed = fan_speed self.fan_speed = fan_speed
self.pwm_read = pwm_read self.pwm_read = pwm_read
self._stack: Optional[ExitStack] = None self._stack = None # type: Optional[ExitStack]
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[ReadonlyFanName],
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
programs: Programs,
) -> "ReadonlyPWMFanNorm":
readonly_fan = ReadOnlyFan.from_configparser(
section, arduino_connections, programs
)
return cls(readonly_fan.fan_speed, readonly_fan.pwm_read)
def __enter__(self): def __enter__(self):
self._stack = ExitStack() self._stack = ExitStack()
@ -117,48 +98,7 @@ class PWMFanNorm:
"Invalid pwm_line_end. Expected: pwm_line_end <= max_pwm. " "Invalid pwm_line_end. Expected: pwm_line_end <= max_pwm. "
"Got: %s <= %s" % (self.pwm_line_end, type(self.pwm_read).max_pwm) "Got: %s <= %s" % (self.pwm_line_end, type(self.pwm_read).max_pwm)
) )
self._stack: Optional[ExitStack] = None self._stack = None # type: Optional[ExitStack]
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[FanName],
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> "PWMFanNorm":
readwrite_fan = ReadWriteFan.from_configparser(section, arduino_connections)
never_stop = section.getboolean("never_stop", fallback=True)
pwm_line_start = PWMValue(section.getint("pwm_line_start", fallback=100))
pwm_line_end = PWMValue(section.getint("pwm_line_end", fallback=240))
for pwm_value in (pwm_line_start, pwm_line_end):
if not (
readwrite_fan.pwm_read.min_pwm
<= pwm_value
<= readwrite_fan.pwm_read.max_pwm
):
raise RuntimeError(
"Incorrect PWM value '%s' for fan '%s': it must be within [%s;%s]"
% (
pwm_value,
section.name,
readwrite_fan.pwm_read.min_pwm,
readwrite_fan.pwm_read.max_pwm,
)
)
if pwm_line_start >= pwm_line_end:
raise RuntimeError(
"`pwm_line_start` PWM value must be less than `pwm_line_end` for fan '%s'"
% (section.name,)
)
return cls(
readwrite_fan.fan_speed,
readwrite_fan.pwm_read,
readwrite_fan.pwm_write,
pwm_line_start=pwm_line_start,
pwm_line_end=pwm_line_end,
never_stop=never_stop,
)
def __eq__(self, other): def __eq__(self, other):
if isinstance(other, type(self)): if isinstance(other, type(self)):

View File

@ -1,8 +1,3 @@
from typing import Mapping, NamedTuple, NewType
from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import Programs
from afancontrol.filters import FilterName, NullFilter, TempFilter
from afancontrol.temp.base import Temp, TempCelsius, TempStatus from afancontrol.temp.base import Temp, TempCelsius, TempStatus
from afancontrol.temp.command import CommandTemp from afancontrol.temp.command import CommandTemp
from afancontrol.temp.file import FileTemp from afancontrol.temp.file import FileTemp
@ -16,40 +11,3 @@ __all__ = (
"TempCelsius", "TempCelsius",
"TempStatus", "TempStatus",
) )
TempName = NewType("TempName", str)
class FilteredTemp(NamedTuple):
temp: Temp
filter: TempFilter
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[TempName],
filters: Mapping[FilterName, TempFilter],
programs: Programs,
) -> "FilteredTemp":
type = section["type"]
if type == "file":
temp: Temp = FileTemp.from_configparser(section)
elif type == "hdd":
temp = HDDTemp.from_configparser(section, programs)
elif type == "exec":
temp = CommandTemp.from_configparser(section)
else:
raise RuntimeError(
"Unsupported temp type '%s' for temp '%s'" % (type, section.name)
)
filter_name = section.get("filter", fallback=None)
if filter_name is None:
filter: TempFilter = NullFilter()
else:
filter = filters[FilterName(filter_name.strip())].copy()
return cls(temp=temp, filter=filter)

View File

@ -3,15 +3,18 @@ from typing import NamedTuple, NewType, Optional, Tuple
TempCelsius = NewType("TempCelsius", float) TempCelsius = NewType("TempCelsius", float)
TempStatus = NamedTuple(
class TempStatus(NamedTuple): "TempStatus",
temp: TempCelsius [
min: TempCelsius ("temp", TempCelsius),
max: TempCelsius ("min", TempCelsius),
panic: Optional[TempCelsius] ("max", TempCelsius),
threshold: Optional[TempCelsius] ("panic", Optional[TempCelsius]),
is_panic: bool ("threshold", Optional[TempCelsius]),
is_threshold: bool ("is_panic", bool),
("is_threshold", bool),
],
)
class Temp(abc.ABC): class Temp(abc.ABC):

View File

@ -1,6 +1,5 @@
from typing import Optional, Tuple from typing import Optional, Tuple
from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import exec_shell_command from afancontrol.exec import exec_shell_command
from afancontrol.temp.base import Temp, TempCelsius from afancontrol.temp.base import Temp, TempCelsius
@ -20,16 +19,6 @@ class CommandTemp(Temp):
self._min = min self._min = min
self._max = max self._max = max
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> Temp:
panic = TempCelsius(section.getfloat("panic", fallback=None))
threshold = TempCelsius(section.getfloat("threshold", fallback=None))
min = TempCelsius(section.getfloat("min", fallback=None))
max = TempCelsius(section.getfloat("max", fallback=None))
return cls(
section["command"], min=min, max=max, panic=panic, threshold=threshold
)
def __eq__(self, other): def __eq__(self, other):
if isinstance(other, type(self)): if isinstance(other, type(self)):
return ( return (

View File

@ -3,7 +3,6 @@ import re
from pathlib import Path from pathlib import Path
from typing import Optional, Tuple from typing import Optional, Tuple
from afancontrol.configparser import ConfigParserSection
from afancontrol.temp.base import Temp, TempCelsius from afancontrol.temp.base import Temp, TempCelsius
@ -42,14 +41,6 @@ class FileTemp(Temp):
self._min = min self._min = min
self._max = max self._max = max
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> Temp:
panic = TempCelsius(section.getfloat("panic", fallback=None))
threshold = TempCelsius(section.getfloat("threshold", fallback=None))
min = TempCelsius(section.getfloat("min", fallback=None))
max = TempCelsius(section.getfloat("max", fallback=None))
return cls(section["path"], min=min, max=max, panic=panic, threshold=threshold)
def __eq__(self, other): def __eq__(self, other):
if isinstance(other, type(self)): if isinstance(other, type(self)):
return ( return (

View File

@ -1,7 +1,6 @@
from typing import Optional, Tuple from typing import Optional, Tuple
from afancontrol.configparser import ConfigParserSection from afancontrol.exec import exec_shell_command
from afancontrol.exec import Programs, exec_shell_command
from afancontrol.temp.base import Temp, TempCelsius from afancontrol.temp.base import Temp, TempCelsius
@ -33,23 +32,6 @@ class HDDTemp(Temp):
self._max = max self._max = max
self._hddtemp_bin = hddtemp_bin self._hddtemp_bin = hddtemp_bin
@classmethod
def from_configparser(
cls, section: ConfigParserSection, programs: Programs
) -> Temp:
panic = TempCelsius(section.getfloat("panic", fallback=None))
threshold = TempCelsius(section.getfloat("threshold", fallback=None))
min = TempCelsius(section.getfloat("min"))
max = TempCelsius(section.getfloat("max"))
return cls(
section["path"],
min=min,
max=max,
panic=panic,
threshold=threshold,
hddtemp_bin=programs.hddtemp,
)
def __eq__(self, other): def __eq__(self, other):
if isinstance(other, type(self)): if isinstance(other, type(self)):
return ( return (

View File

@ -7,10 +7,10 @@ from afancontrol.filters import TempFilter
from afancontrol.logger import logger from afancontrol.logger import logger
from afancontrol.temp import Temp, TempStatus from afancontrol.temp import Temp, TempStatus
ObservedTempStatus = NamedTuple(
class ObservedTempStatus(NamedTuple): "ObservedTempStatus",
raw: Optional[TempStatus] [("raw", Optional[TempStatus]), ("filtered", Optional[TempStatus])],
filtered: Optional[TempStatus] )
def filtered_temps( def filtered_temps(
@ -25,8 +25,8 @@ def filtered_temps(
class Temps: class Temps:
def __init__(self, temps: Mapping[TempName, FilteredTemp]) -> None: def __init__(self, temps: Mapping[TempName, FilteredTemp]) -> None:
self.temps = temps self.temps = temps
self._stack: Optional[ExitStack] = None self._stack = None # type: Optional[ExitStack]
self._executor: Optional[concurrent.futures.Executor] = None self._executor = None # type: Optional[concurrent.futures.Executor]
def __enter__(self): # reusable def __enter__(self): # reusable
self._stack = ExitStack() self._stack = ExitStack()
@ -64,7 +64,7 @@ def _get_temp_status(
name: TempName, temp: Temp, filter: TempFilter name: TempName, temp: Temp, filter: TempFilter
) -> ObservedTempStatus: ) -> ObservedTempStatus:
try: try:
sensor_value: Optional[TempStatus] = temp.get() sensor_value = temp.get() # type: Optional[TempStatus]
except Exception as e: except Exception as e:
sensor_value = None sensor_value = None
logger.warning("Temp sensor [%s] has failed: %s", name, e, exc_info=True) logger.warning("Temp sensor [%s] has failed: %s", name, e, exc_info=True)

View File

@ -20,7 +20,7 @@ class Trigger(abc.ABC):
self.global_commands = global_commands self.global_commands = global_commands
self.temp_commands = temp_commands self.temp_commands = temp_commands
self.report = report self.report = report
self._alerting_temps: Set[TempName] = set() self._alerting_temps = set() # type: Set[TempName]
@property @property
@abc.abstractmethod @abc.abstractmethod
@ -184,7 +184,7 @@ class Triggers:
}, },
report=report, report=report,
) )
self._stack: Optional[ExitStack] = None self._stack = None # type: Optional[ExitStack]
def __enter__(self): # reusable def __enter__(self): # reusable
self._stack = ExitStack() self._stack = ExitStack()

View File

@ -430,69 +430,3 @@ fan_input = /sys/class/hwmon/hwmon0/device/fan1_input
}, },
mappings={}, mappings={},
) )
def test_multiline_mapping():
daemon_cli_config = DaemonCLIConfig(
pidfile=None, logfile=None, exporter_listen_host=None
)
config = """
[daemon]
[actions]
[temp:cpu]
type = file
path = /sys/class/hwmon/hwmon0/device/temp1_input
[temp:mobo]
type = file
path = /sys/class/hwmon/hwmon0/device/temp2_input
[fan: case]
pwm = /sys/class/hwmon/hwmon0/device/pwm2
fan_input = /sys/class/hwmon/hwmon0/device/fan2_input
[fan: hdd]
pwm = /sys/class/hwmon/hwmon0/device/pwm2
fan_input = /sys/class/hwmon/hwmon0/device/fan2_input
[mapping:1]
fans =
case*0.6,
hdd,
temps =
mobo,
cpu
"""
parsed = parse_config(path_from_str(config), daemon_cli_config)
assert parsed.mappings == {
MappingName("1"): FansTempsRelation(
temps=[TempName("mobo"), TempName("cpu")],
fans=[
FanSpeedModifier(fan=FanName("case"), modifier=0.6),
FanSpeedModifier(fan=FanName("hdd"), modifier=1.0),
],
)
}
def test_extraneous_keys_raises():
daemon_cli_config = DaemonCLIConfig(
pidfile=None, logfile=None, exporter_listen_host=None
)
config = """
[daemon]
[actions]
[temp: mobo]
type = file
path = /sys/class/hwmon/hwmon0/device/temp1_input
aa = 55
"""
with pytest.raises(RuntimeError) as cm:
parse_config(path_from_str(config), daemon_cli_config)
assert str(cm.value) == "Unknown options in the [temp: mobo] section: {'aa'}"

View File

@ -8,6 +8,7 @@ from click.testing import CliRunner
from afancontrol import fantest from afancontrol import fantest
from afancontrol.fantest import ( from afancontrol.fantest import (
CSVMeasurementsOutput, CSVMeasurementsOutput,
Fan,
HumanMeasurementsOutput, HumanMeasurementsOutput,
MeasurementsOutput, MeasurementsOutput,
fantest as main, fantest as main,
@ -23,7 +24,6 @@ from afancontrol.pwmfan import (
LinuxFanSpeed, LinuxFanSpeed,
PWMDevice, PWMDevice,
PWMValue, PWMValue,
ReadWriteFan,
) )
@ -65,7 +65,7 @@ def test_main_smoke(temp_path):
args, kwargs = mocked_fantest.call_args args, kwargs = mocked_fantest.call_args
assert not args assert not args
assert kwargs.keys() == {"fan", "pwm_step_size", "output"} assert kwargs.keys() == {"fan", "pwm_step_size", "output"}
assert kwargs["fan"] == ReadWriteFan( assert kwargs["fan"] == Fan(
fan_speed=LinuxFanSpeed(FanInputDevice(str(fan_input_path))), fan_speed=LinuxFanSpeed(FanInputDevice(str(fan_input_path))),
pwm_read=LinuxFanPWMRead(PWMDevice(str(pwm_path))), pwm_read=LinuxFanPWMRead(PWMDevice(str(pwm_path))),
pwm_write=LinuxFanPWMWrite(PWMDevice(str(pwm_path))), pwm_write=LinuxFanPWMWrite(PWMDevice(str(pwm_path))),
@ -77,11 +77,11 @@ def test_main_smoke(temp_path):
@pytest.mark.parametrize("pwm_step_size", [5, -5]) @pytest.mark.parametrize("pwm_step_size", [5, -5])
@pytest.mark.parametrize("output_cls", [HumanMeasurementsOutput, CSVMeasurementsOutput]) @pytest.mark.parametrize("output_cls", [HumanMeasurementsOutput, CSVMeasurementsOutput])
def test_fantest(output_cls: Type[MeasurementsOutput], pwm_step_size: PWMValue): def test_fantest(output_cls: Type[MeasurementsOutput], pwm_step_size: PWMValue):
fan: Any = ReadWriteFan( fan = Fan(
fan_speed=MagicMock(spec=BaseFanSpeed), fan_speed=MagicMock(spec=BaseFanSpeed),
pwm_read=MagicMock(spec=BaseFanPWMRead), pwm_read=MagicMock(spec=BaseFanPWMRead),
pwm_write=MagicMock(spec=BaseFanPWMWrite), pwm_write=MagicMock(spec=BaseFanPWMWrite),
) ) # type: Any
fan.pwm_read.min_pwm = 0 fan.pwm_read.min_pwm = 0
fan.pwm_read.max_pwm = 255 fan.pwm_read.max_pwm = 255
output = output_cls() output = output_cls()

View File

@ -1,5 +1,4 @@
from contextlib import ExitStack from contextlib import ExitStack
from typing import cast
from unittest.mock import MagicMock, patch, sentinel from unittest.mock import MagicMock, patch, sentinel
import pytest import pytest
@ -69,7 +68,7 @@ def test_manager(report):
manager.tick() manager.tick()
mocked_triggers = cast(MagicMock, manager.triggers) mocked_triggers = manager.triggers # type: MagicMock
assert mocked_triggers.check.call_count == 1 assert mocked_triggers.check.call_count == 1
assert mocked_case_fan.__enter__.call_count == 1 assert mocked_case_fan.__enter__.call_count == 1
assert mocked_metrics.__enter__.call_count == 1 assert mocked_metrics.__enter__.call_count == 1

View File

@ -1,7 +1,11 @@
[tox] [tox]
envlist=py{36,37,38,39,310}{,-arduino,-metrics},lint,check-docs envlist=py{35,36,37,38,39}{,-arduino,-metrics},lint,check-docs
[testenv] [testenv]
deps =
; Python 3.5 still ships an old setuptools version which doesn't support
; declarative setup.cfg format.
setuptools>=41.4.0
extras = extras =
arduino: arduino arduino: arduino
dev dev