Compare commits

..

No commits in common. "fdfbc23d0f3464682cdd5f06c00b164ca8f69cc4" and "3684dca243e2724ee4f97cf1821ad10c7379afb8" have entirely different histories.

39 changed files with 872 additions and 717 deletions

View File

@ -2,10 +2,10 @@ language: python
dist: xenial dist: xenial
python: python:
- "3.5"
- "3.6" - "3.6"
- "3.7" - "3.7"
- "3.8" - "3.8"
# TODO add 3.9
- "3.9-dev" - "3.9-dev"
install: pip install tox-travis tox tox-venv install: pip install tox-travis tox tox-venv

43
CONTRIBUTING.md Normal file
View File

@ -0,0 +1,43 @@
# Contributing to afancontrol
I started afancontrol in 2013 in an attempt to make my custom PC case quiet.
It's been working 24/7 ever since with no issues, and eventually I started using
it on my other machines as well.
I'm quite happy with how this package serves my needs, and I hope
it can be useful for someone else too.
Contributions are welcome, however, keep in mind, that:
* Complex features and large diffs would probably be rejected,
because it would make maintenance more complicated for me,
* I don't have any plans for active development and promotion
of the package.
## Dev workflow
Prepare a virtualenv:
mkvirtualenv afancontrol
make develop
I use [TDD](https://en.wikipedia.org/wiki/Test-driven_development) for development.
Run tests:
make test
Autoformat the code and imports:
make format
Run linters:
make lint
So essentially after writing a small part of code and tests I call these
three commands and fix the errors until they stop failing.
To build docs:
make docs

View File

@ -14,14 +14,14 @@ RUN apt-get update \
RUN mkdir ~/.gnupg && echo "disable-ipv6" >> ~/.gnupg/dirmngr.conf RUN mkdir ~/.gnupg && echo "disable-ipv6" >> ~/.gnupg/dirmngr.conf
# Import the GPG key used to sign the PyPI releases of `afancontrol`: # Import the GPG key used to sign the PyPI releases of `afancontrol`:
RUN gpg --recv-keys "2D3B9C1712FF84F7" RUN gpg --recv-keys "AA7B5406547AF062"
COPY debian /build/afancontrol/debian COPY debian /build/afancontrol/debian
WORKDIR /build/afancontrol/ WORKDIR /build/afancontrol/
RUN mkdir -p debian/upstream \ RUN mkdir -p debian/upstream \
&& gpg --export --export-options export-minimal --armor \ && gpg --export --export-options export-minimal --armor \
'BE3D633AB6792715ECF34D742D3B9C1712FF84F7' \ 'A18FE9F6F570D5B4E1E1853FAA7B5406547AF062' \
> debian/upstream/signing-key.asc > debian/upstream/signing-key.asc
RUN apt-get -y build-dep . RUN apt-get -y build-dep .

View File

@ -1,11 +1,11 @@
.PHONY: format .PHONY: format
format: format:
black src tests *.py && isort -rc src tests *.py black src tests *.py && isort src tests *.py
.PHONY: lint .PHONY: lint
lint: lint:
flake8 src tests *.py && isort --check-only -rc src tests *.py && black --check src tests *.py && mypy src tests flake8 src tests *.py && isort --check-only src tests *.py && black --check src tests *.py && mypy src tests
.PHONY: test .PHONY: test
test: test:

View File

@ -17,7 +17,7 @@ afancontrol
`fancontrol <https://github.com/lm-sensors/lm-sensors/blob/master/prog/pwm/fancontrol>`_ `fancontrol <https://github.com/lm-sensors/lm-sensors/blob/master/prog/pwm/fancontrol>`_
with more advanced configuration abilities. with more advanced configuration abilities.
`afancontrol` measures temperature from the sensors, computes the required `afancontrol` measures temperatures from sensors, computes the required
airflow and sets the PWM fan speeds accordingly. airflow and sets PWM fan speeds accordingly.
The docs are available at `<https://afancontrol.readthedocs.io/>`_. The docs are available at `<https://afancontrol.readthedocs.io/>`_.

11
debian/changelog vendored
View File

@ -1,3 +1,14 @@
afancontrol (3.0.0-1) unstable; urgency=medium
* Drop support for prometheus-client < 0.1.0 (debian stretch)
* Drop support for Python 3.5 (debian stretch)
* Add support for Python 3.9
* config: add `ipmi_sensors` location property
* Add dh-systemd (would automatically (re)start the systemd service upon
package (re)installation)
-- Kostya Esmukov <kostya@esmukov.ru> Sat, 10 Oct 2020 14:43:01 +0000
afancontrol (2.2.1-1) unstable; urgency=medium afancontrol (2.2.1-1) unstable; urgency=medium
* Fix compatibility with py3.5 * Fix compatibility with py3.5

5
debian/control vendored
View File

@ -4,12 +4,13 @@ Priority: optional
Maintainer: Kostya Esmukov <kostya@esmukov.ru> Maintainer: Kostya Esmukov <kostya@esmukov.ru>
Build-Depends: debhelper (>= 9), Build-Depends: debhelper (>= 9),
dh-python, dh-python,
debhelper (>= 9.20160709) | dh-systemd,
python3-all, python3-all,
python3-setuptools python3-setuptools
Build-Depends-Indep: python3-pytest, Build-Depends-Indep: python3-pytest,
python3-requests, python3-requests,
python3-click, python3-click,
python3-prometheus-client, python3-prometheus-client (>= 0.1.0),
python3-serial python3-serial
Standards-Version: 3.9.8 Standards-Version: 3.9.8
Homepage: https://github.com/KostyaEsmukov/afancontrol Homepage: https://github.com/KostyaEsmukov/afancontrol
@ -27,7 +28,7 @@ Depends: ${python3:Depends},
lm-sensors, lm-sensors,
python3-click, python3-click,
python3-pkg-resources, python3-pkg-resources,
python3-prometheus-client, python3-prometheus-client (>= 0.1.0),
python3-serial python3-serial
Suggests: freeipmi-tools, Suggests: freeipmi-tools,
Description: Advanced Fan Control program (Python 3) Description: Advanced Fan Control program (Python 3)

2
debian/rules vendored
View File

@ -9,4 +9,4 @@ export PYBUILD_TEST_PYTEST=1
export PYBUILD_TEST_ARGS={dir}/tests/ export PYBUILD_TEST_ARGS={dir}/tests/
%: %:
dh $@ --with python3 --buildsystem=pybuild dh $@ --with systemd,python3 --buildsystem=pybuild

View File

@ -220,8 +220,7 @@ There's a Dockerfile which can be used to build a Debian `.deb` package:
make deb-from-pypi make deb-from-pypi
# Install the package: # Install the package:
sudo dpkg -i dist/debian/*.deb sudo apt install ./dist/debian/*.deb
sudo apt install -f
Perhaps one day the package might get published to the Debian repos, Perhaps one day the package might get published to the Debian repos,
so a simple ``apt install afancontrol`` would work. But for now, given so a simple ``apt install afancontrol`` would work. But for now, given

View File

@ -10,10 +10,15 @@ logfile = /var/log/afancontrol.log
# Default: 5 # Default: 5
interval = 5 interval = 5
# Hddtemp location. Relevant only when there're `type = hdd` temperature sensors. # Hddtemp location. Used by the `type = hdd` temperature sensors.
# Default: hddtemp # Default: hddtemp
;hddtemp = /usr/local/bin/hddtemp ;hddtemp = /usr/local/bin/hddtemp
# `ipmi-sensors` location from the `freeipmi-tools` package.
# Used by the `type = freeipmi` fans.
# Default: ipmi-sensors
;ipmi_sensors = /usr/local/bin/ipmi-sensors
# Prometheus exporter listening hostname and TCP port. # Prometheus exporter listening hostname and TCP port.
# Default: (empty value) # Default: (empty value)
;exporter_listen_host = 127.0.0.1:8083 ;exporter_listen_host = 127.0.0.1:8083

View File

@ -32,13 +32,12 @@ include_trailing_comma = True
force_grid_wrap = 0 force_grid_wrap = 0
combine_as_imports = True combine_as_imports = True
line_length = 88 line_length = 88
not_skip = __init__.py
[metadata] [metadata]
author = Kostya Esmukov author = Kostya Esmukov
author_email = kostya@esmukov.ru author_email = kostya@esmukov.ru
classifier = classifier =
Development Status :: 4 - Beta Development Status :: 5 - Production/Stable
Intended Audience :: System Administrators Intended Audience :: System Administrators
License :: OSI Approved :: MIT License License :: OSI Approved :: MIT License
Natural Language :: English Natural Language :: English
@ -46,10 +45,10 @@ classifier =
Programming Language :: Python Programming Language :: Python
Programming Language :: Python :: 3 Programming Language :: Python :: 3
Programming Language :: Python :: 3 :: Only Programming Language :: Python :: 3 :: Only
Programming Language :: Python :: 3.5
Programming Language :: Python :: 3.6 Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7 Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8 Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Topic :: System :: Hardware Topic :: System :: Hardware
Topic :: System :: Monitoring Topic :: System :: Monitoring
Topic :: System :: Systems Administration Topic :: System :: Systems Administration
@ -77,7 +76,7 @@ install_requires =
package_dir = package_dir =
= src = src
packages = find: packages = find:
python_requires = >=3.5 python_requires = >=3.6
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =
@ -87,16 +86,16 @@ console_scripts =
arduino = arduino =
pyserial>=3.0 pyserial>=3.0
metrics = metrics =
prometheus-client prometheus-client>=0.1.0
dev = dev =
black==19.10b0; python_version>='3.6' black==20.8b1
coverage==5.1 coverage==5.3
flake8==3.7.9 flake8==3.8.4
isort==4.3.21 isort==5.5.4
mypy==0.770 mypy==0.782
pytest==5.4.2 pytest==6.1.0
requests requests
sphinx==3.0.3 sphinx==3.2.1
wheel wheel
[options.packages.find] [options.packages.find]

View File

@ -1 +1 @@
__version__ = "2.2.1" __version__ = "3.0.0"

View File

@ -5,6 +5,7 @@ import threading
from timeit import default_timer from timeit import default_timer
from typing import TYPE_CHECKING, Any, Dict, NewType, Optional from typing import TYPE_CHECKING, Any, Dict, NewType, Optional
from afancontrol.configparser import ConfigParserSection
from afancontrol.logger import logger from afancontrol.logger import logger
if TYPE_CHECKING: if TYPE_CHECKING:
@ -50,11 +51,22 @@ class ArduinoConnection:
lambda: _StatusProtocol(self), url=serial_url, baudrate=baudrate lambda: _StatusProtocol(self), url=serial_url, baudrate=baudrate
) )
self._context_manager_depth = 0 self._context_manager_depth = 0
self._status = None # type: Optional[Dict[str, Dict[str, int]]] self._status: Optional[Dict[str, Dict[str, int]]] = None
self._status_clock = None # type: Optional[float] self._status_clock: Optional[float] = None
self._status_lock = threading.Lock() self._status_lock = threading.Lock()
self._status_event = threading.Event() self._status_event = threading.Event()
@classmethod
def from_configparser(
cls, section: ConfigParserSection[ArduinoName]
) -> "ArduinoConnection":
return cls(
name=section.name,
serial_url=section["serial_url"],
baudrate=section.getint("baudrate", fallback=DEFAULT_BAUDRATE),
status_ttl=section.getint("status_ttl", fallback=DEFAULT_STATUS_TTL),
)
def __eq__(self, other): def __eq__(self, other):
if isinstance(other, type(self)): if isinstance(other, type(self)):
return ( return (
@ -218,10 +230,10 @@ class _AutoRetriedReaderThread:
def __init__(self, protocol_factory, **serial_for_url_kwargs) -> None: def __init__(self, protocol_factory, **serial_for_url_kwargs) -> None:
self.protocol_factory = protocol_factory self.protocol_factory = protocol_factory
self.serial_for_url_kwargs = serial_for_url_kwargs self.serial_for_url_kwargs = serial_for_url_kwargs
self._reader_thread = None # type: Optional[ReaderThread] self._reader_thread: Optional[ReaderThread] = None
self._transport = None # type: Optional[ReaderThread] self._transport: Optional[ReaderThread] = None
self._watchdog_thread = None # type: Optional[threading.Thread] self._watchdog_thread: Optional[threading.Thread] = None
self._watchdog_queue = queue.Queue() # type: queue.Queue[Any] self._watchdog_queue: queue.Queue[Any] = queue.Queue()
def __enter__(self): # reusable def __enter__(self): # reusable
# TODO ?? maybe clean the _watchdog_queue? # TODO ?? maybe clean the _watchdog_queue?

View File

@ -9,171 +9,120 @@ from typing import (
Sequence, Sequence,
Tuple, Tuple,
TypeVar, TypeVar,
Union,
) )
from afancontrol.arduino import ( import afancontrol.filters
DEFAULT_BAUDRATE, from afancontrol.arduino import ArduinoConnection, ArduinoName
DEFAULT_STATUS_TTL, from afancontrol.configparser import ConfigParserSection, iter_sections
ArduinoConnection, from afancontrol.exec import Programs
ArduinoName, from afancontrol.filters import FilterName, TempFilter
ArduinoPin,
)
from afancontrol.filters import (
MovingMedianFilter,
MovingQuantileFilter,
NullFilter,
TempFilter,
)
from afancontrol.logger import logger from afancontrol.logger import logger
from afancontrol.pwmfan import ( from afancontrol.pwmfan import FanName, ReadonlyFanName
ArduinoFanPWMRead,
ArduinoFanPWMWrite,
ArduinoFanSpeed,
BaseFanPWMRead,
BaseFanPWMWrite,
BaseFanSpeed,
FanInputDevice,
FreeIPMIFanSpeed,
LinuxFanPWMRead,
LinuxFanPWMWrite,
LinuxFanSpeed,
PWMDevice,
PWMValue,
)
from afancontrol.pwmfannorm import PWMFanNorm, ReadonlyPWMFanNorm from afancontrol.pwmfannorm import PWMFanNorm, ReadonlyPWMFanNorm
from afancontrol.temp import CommandTemp, FileTemp, HDDTemp, Temp, TempCelsius from afancontrol.temp import FilteredTemp, TempName
DEFAULT_CONFIG = "/etc/afancontrol/afancontrol.conf" DEFAULT_CONFIG = "/etc/afancontrol/afancontrol.conf"
DEFAULT_PIDFILE = "/run/afancontrol.pid" DEFAULT_PIDFILE = "/run/afancontrol.pid"
DEFAULT_INTERVAL = 5
DEFAULT_FANS_SPEED_CHECK_INTERVAL = 3
DEFAULT_HDDTEMP = "hddtemp"
DEFAULT_REPORT_CMD = ( DEFAULT_REPORT_CMD = (
'printf "Subject: %s\nTo: %s\n\n%b"' 'printf "Subject: %s\nTo: %s\n\n%b"'
' "afancontrol daemon report: %REASON%" root "%MESSAGE%"' ' "afancontrol daemon report: %REASON%" root "%MESSAGE%"'
" | sendmail -t" " | sendmail -t"
) )
DEFAULT_FAN_TYPE = "linux"
DEFAULT_PWM_LINE_START = 100
DEFAULT_PWM_LINE_END = 240
DEFAULT_NEVER_STOP = True
DEFAULT_WINDOW_SIZE = 3
FilterName = NewType("FilterName", str)
TempName = NewType("TempName", str)
FanName = NewType("FanName", str)
ReadonlyFanName = NewType("ReadonlyFanName", str)
AnyFanName = Union[FanName, ReadonlyFanName]
MappingName = NewType("MappingName", str) MappingName = NewType("MappingName", str)
T = TypeVar("T") T = TypeVar("T")
FanSpeedModifier = NamedTuple( class FanSpeedModifier(NamedTuple):
"FanSpeedModifier", fan: FanName
# fmt: off modifier: float # [0..1]
[
("fan", FanName),
("modifier", float), # [0..1]
]
# fmt: on
)
FansTempsRelation = NamedTuple(
"FansTempsRelation",
# fmt: off
[
("temps", Sequence[TempName]),
("fans", Sequence[FanSpeedModifier]),
]
# fmt: on
)
AlertCommands = NamedTuple(
"AlertCommands",
# fmt: off
[
("enter_cmd", Optional[str]),
("leave_cmd", Optional[str]),
]
# fmt: on
)
Actions = NamedTuple( class FansTempsRelation(NamedTuple):
"Actions", temps: Sequence[TempName]
# fmt: off fans: Sequence[FanSpeedModifier]
[
("panic", AlertCommands),
("threshold", AlertCommands),
]
# fmt: on
)
TriggerConfig = NamedTuple( class AlertCommands(NamedTuple):
"TriggerConfig", enter_cmd: Optional[str]
# fmt: off leave_cmd: Optional[str]
[
("global_commands", Actions),
("temp_commands", Mapping[TempName, Actions]),
]
# fmt: on
)
DaemonCLIConfig = NamedTuple( class Actions(NamedTuple):
"DaemonCLIConfig", panic: AlertCommands
# fmt: off threshold: AlertCommands
[
("pidfile", Optional[str]),
("logfile", Optional[str]),
("exporter_listen_host", Optional[str]),
]
# fmt: on
)
DaemonConfig = NamedTuple( @classmethod
"DaemonConfig", def from_configparser(cls, section: ConfigParserSection) -> "Actions":
# fmt: off panic = AlertCommands(
[ enter_cmd=section.get("panic_enter_cmd", fallback=None),
("pidfile", Optional[str]), leave_cmd=section.get("panic_leave_cmd", fallback=None),
("logfile", Optional[str]), )
("interval", int),
("exporter_listen_host", Optional[str]),
]
# fmt: on
)
FilteredTemp = NamedTuple( threshold = AlertCommands(
"FilteredTemp", enter_cmd=section.get("threshold_enter_cmd", fallback=None),
# fmt: off leave_cmd=section.get("threshold_leave_cmd", fallback=None),
[ )
("temp", Temp),
("filter", TempFilter),
]
# fmt: on
)
ParsedConfig = NamedTuple( return cls(panic=panic, threshold=threshold)
"ParsedConfig",
# fmt: off
[ class TriggerConfig(NamedTuple):
("daemon", DaemonConfig), global_commands: Actions
("report_cmd", str), temp_commands: Mapping[TempName, Actions]
("triggers", TriggerConfig),
("arduino_connections", Mapping[ArduinoName, ArduinoConnection]),
("fans", Mapping[FanName, PWMFanNorm]), class DaemonCLIConfig(NamedTuple):
("readonly_fans", Mapping[ReadonlyFanName, ReadonlyPWMFanNorm]), pidfile: Optional[str]
("temps", Mapping[TempName, FilteredTemp]), logfile: Optional[str]
("mappings", Mapping[MappingName, FansTempsRelation]), exporter_listen_host: Optional[str]
]
# fmt: on
) class DaemonConfig(NamedTuple):
pidfile: Optional[str]
logfile: Optional[str]
interval: int
exporter_listen_host: Optional[str]
@classmethod
def from_configparser(
cls, section: ConfigParserSection, daemon_cli_config: DaemonCLIConfig
) -> "DaemonConfig":
pidfile = first_not_none(
daemon_cli_config.pidfile, section.get("pidfile", fallback=DEFAULT_PIDFILE)
)
if pidfile is not None and not pidfile.strip():
pidfile = None
logfile = first_not_none(
daemon_cli_config.logfile, section.get("logfile", fallback=None)
)
interval = section.getint("interval", fallback=5)
exporter_listen_host = first_not_none(
daemon_cli_config.exporter_listen_host,
section.get("exporter_listen_host", fallback=None),
)
return cls(
pidfile=pidfile,
logfile=logfile,
interval=interval,
exporter_listen_host=exporter_listen_host,
)
class ParsedConfig(NamedTuple):
daemon: DaemonConfig
report_cmd: str
triggers: TriggerConfig
arduino_connections: Mapping[ArduinoName, ArduinoConnection]
fans: Mapping[FanName, PWMFanNorm]
readonly_fans: Mapping[ReadonlyFanName, ReadonlyPWMFanNorm]
temps: Mapping[TempName, FilteredTemp]
mappings: Mapping[MappingName, FansTempsRelation]
def parse_config(config_path: Path, daemon_cli_config: DaemonCLIConfig) -> ParsedConfig: def parse_config(config_path: Path, daemon_cli_config: DaemonCLIConfig) -> ParsedConfig:
@ -183,13 +132,13 @@ def parse_config(config_path: Path, daemon_cli_config: DaemonCLIConfig) -> Parse
except Exception as e: except Exception as e:
raise RuntimeError("Unable to parse %s:\n%s" % (config_path, e)) raise RuntimeError("Unable to parse %s:\n%s" % (config_path, e))
daemon, hddtemp = _parse_daemon(config, daemon_cli_config) daemon, programs = _parse_daemon(config, daemon_cli_config)
report_cmd, global_commands = _parse_actions(config) report_cmd, global_commands = _parse_actions(config)
arduino_connections = _parse_arduino_connections(config) arduino_connections = _parse_arduino_connections(config)
filters = _parse_filters(config) filters = _parse_filters(config)
temps, temp_commands = _parse_temps(config, hddtemp, filters) temps, temp_commands = _parse_temps(config, programs, filters)
fans = _parse_fans(config, arduino_connections) fans = _parse_fans(config, arduino_connections)
readonly_fans = _parse_readonly_fans(config, arduino_connections) readonly_fans = _parse_readonly_fans(config, arduino_connections, programs)
_check_fans_namespace(fans, readonly_fans) _check_fans_namespace(fans, readonly_fans)
mappings = _parse_mappings(config, fans, temps) mappings = _parse_mappings(config, fans, temps)
@ -216,110 +165,35 @@ def first_not_none(*parts: Optional[T]) -> Optional[T]:
def _parse_daemon( def _parse_daemon(
config: configparser.ConfigParser, daemon_cli_config: DaemonCLIConfig config: configparser.ConfigParser, daemon_cli_config: DaemonCLIConfig
) -> Tuple[DaemonConfig, str]: ) -> Tuple[DaemonConfig, Programs]:
daemon = config["daemon"] section: ConfigParserSection[str] = ConfigParserSection(config["daemon"])
keys = set(daemon.keys()) daemon_config = DaemonConfig.from_configparser(section, daemon_cli_config)
programs = Programs.from_configparser(section)
section.ensure_no_unused_keys()
pidfile = first_not_none( return daemon_config, programs
daemon_cli_config.pidfile, daemon.get("pidfile"), DEFAULT_PIDFILE
)
if pidfile is not None and not pidfile.strip():
pidfile = None
keys.discard("pidfile")
logfile = first_not_none(daemon_cli_config.logfile, daemon.get("logfile"))
keys.discard("logfile")
interval = daemon.getint("interval", fallback=DEFAULT_INTERVAL)
keys.discard("interval")
exporter_listen_host = first_not_none(
daemon_cli_config.exporter_listen_host, daemon.get("exporter_listen_host")
)
keys.discard("exporter_listen_host")
hddtemp = daemon.get("hddtemp") or DEFAULT_HDDTEMP
keys.discard("hddtemp")
if keys:
raise RuntimeError("Unknown options in the [daemon] section: %s" % (keys,))
return (
DaemonConfig(
pidfile=pidfile,
logfile=logfile,
interval=interval,
exporter_listen_host=exporter_listen_host,
),
hddtemp,
)
def _parse_actions(config: configparser.ConfigParser) -> Tuple[str, Actions]: def _parse_actions(config: configparser.ConfigParser) -> Tuple[str, Actions]:
actions = config["actions"] section: ConfigParserSection[str] = ConfigParserSection(config["actions"])
keys = set(actions.keys()) report_cmd = section.get("report_cmd", fallback=DEFAULT_REPORT_CMD)
actions = Actions.from_configparser(section)
section.ensure_no_unused_keys()
report_cmd = first_not_none(actions.get("report_cmd"), DEFAULT_REPORT_CMD) return report_cmd, actions
assert report_cmd is not None
keys.discard("report_cmd")
panic = AlertCommands(
enter_cmd=first_not_none(actions.get("panic_enter_cmd")),
leave_cmd=first_not_none(actions.get("panic_leave_cmd")),
)
keys.discard("panic_enter_cmd")
keys.discard("panic_leave_cmd")
threshold = AlertCommands(
enter_cmd=first_not_none(actions.get("threshold_enter_cmd")),
leave_cmd=first_not_none(actions.get("threshold_leave_cmd")),
)
keys.discard("threshold_enter_cmd")
keys.discard("threshold_leave_cmd")
if keys:
raise RuntimeError("Unknown options in the [actions] section: %s" % (keys,))
return report_cmd, Actions(panic=panic, threshold=threshold)
def _parse_arduino_connections( def _parse_arduino_connections(
config: configparser.ConfigParser, config: configparser.ConfigParser,
) -> Mapping[ArduinoName, ArduinoConnection]: ) -> Mapping[ArduinoName, ArduinoConnection]:
arduino_connections = {} # type: Dict[ArduinoName, ArduinoConnection] arduino_connections: Dict[ArduinoName, ArduinoConnection] = {}
for section_name in config.sections(): for section in iter_sections(config, "arduino", ArduinoName):
section_name_parts = section_name.split(":", 1) if section.name in arduino_connections:
if section_name_parts[0].strip().lower() != "arduino":
continue
arduino_name = ArduinoName(section_name_parts[1].strip())
arduino = config[section_name]
keys = set(arduino.keys())
serial_url = arduino["serial_url"]
keys.discard("serial_url")
baudrate = arduino.getint("baudrate", fallback=DEFAULT_BAUDRATE)
keys.discard("baudrate")
status_ttl = arduino.getint("status_ttl", fallback=DEFAULT_STATUS_TTL)
keys.discard("status_ttl")
if keys:
raise RuntimeError( raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys) "Duplicate arduino section declaration for '%s'" % section.name
)
if arduino_name in arduino_connections:
raise RuntimeError(
"Duplicate arduino section declaration for '%s'" % arduino_name
)
arduino_connections[arduino_name] = ArduinoConnection(
name=arduino_name,
serial_url=serial_url,
baudrate=baudrate,
status_ttl=status_ttl,
) )
arduino_connections[section.name] = ArduinoConnection.from_configparser(section)
section.ensure_no_unused_keys()
# Empty arduino_connections is ok # Empty arduino_connections is ok
return arduino_connections return arduino_connections
@ -328,49 +202,14 @@ def _parse_arduino_connections(
def _parse_filters( def _parse_filters(
config: configparser.ConfigParser, config: configparser.ConfigParser,
) -> Mapping[FilterName, TempFilter]: ) -> Mapping[FilterName, TempFilter]:
filters = {} # type: Dict[FilterName, TempFilter] filters: Dict[FilterName, TempFilter] = {}
for section_name in config.sections(): for section in iter_sections(config, "filter", FilterName):
section_name_parts = section_name.split(":", 1) if section.name in filters:
if section_name_parts[0].strip().lower() != "filter":
continue
filter_name = FilterName(section_name_parts[1].strip())
filter = config[section_name]
keys = set(filter.keys())
filter_type = filter["type"]
keys.discard("type")
if filter_type == "moving_median":
window_size = filter.getint("window_size", fallback=DEFAULT_WINDOW_SIZE)
keys.discard("window_size")
f = MovingMedianFilter(window_size=window_size) # type: TempFilter
elif filter_type == "moving_quantile":
window_size = filter.getint("window_size", fallback=DEFAULT_WINDOW_SIZE)
keys.discard("window_size")
quantile = filter.getfloat("quantile")
keys.discard("quantile")
f = MovingQuantileFilter(quantile=quantile, window_size=window_size)
else:
raise RuntimeError( raise RuntimeError(
"Unsupported filter type '%s' for filter '%s'. " "Duplicate filter section declaration for '%s'" % section.name
"Supported types: `moving_median`, `moving_quantile`."
% (filter_type, filter_name)
) )
filters[section.name] = afancontrol.filters.from_configparser(section)
if keys: section.ensure_no_unused_keys()
raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys)
)
if filter_name in filters:
raise RuntimeError(
"Duplicate filter section declaration for '%s'" % filter_name
)
filters[filter_name] = f
# Empty filters is ok # Empty filters is ok
return filters return filters
@ -378,98 +217,19 @@ def _parse_filters(
def _parse_temps( def _parse_temps(
config: configparser.ConfigParser, config: configparser.ConfigParser,
hddtemp: str, programs: Programs,
filters: Mapping[FilterName, TempFilter], filters: Mapping[FilterName, TempFilter],
) -> Tuple[Mapping[TempName, FilteredTemp], Mapping[TempName, Actions]]: ) -> Tuple[Mapping[TempName, FilteredTemp], Mapping[TempName, Actions]]:
temps = {} # type: Dict[TempName, FilteredTemp] temps: Dict[TempName, FilteredTemp] = {}
temp_commands = {} # type: Dict[TempName, Actions] temp_commands: Dict[TempName, Actions] = {}
for section_name in config.sections(): for section in iter_sections(config, "temp", TempName):
section_name_parts = section_name.split(":", 1) if section.name in temps:
if section_name_parts[0].strip().lower() != "temp":
continue
temp_name = TempName(section_name_parts[1].strip())
temp = config[section_name]
keys = set(temp.keys())
actions_panic = AlertCommands(
enter_cmd=first_not_none(temp.get("panic_enter_cmd")),
leave_cmd=first_not_none(temp.get("panic_leave_cmd")),
)
keys.discard("panic_enter_cmd")
keys.discard("panic_leave_cmd")
actions_threshold = AlertCommands(
enter_cmd=first_not_none(temp.get("threshold_enter_cmd")),
leave_cmd=first_not_none(temp.get("threshold_leave_cmd")),
)
keys.discard("threshold_enter_cmd")
keys.discard("threshold_leave_cmd")
panic = TempCelsius(temp.getfloat("panic"))
threshold = TempCelsius(temp.getfloat("threshold"))
min = TempCelsius(temp.getfloat("min"))
max = TempCelsius(temp.getfloat("max"))
keys.discard("panic")
keys.discard("threshold")
keys.discard("min")
keys.discard("max")
type = temp["type"]
keys.discard("type")
if type == "file":
t = FileTemp(
temp["path"], min=min, max=max, panic=panic, threshold=threshold
) # type: Temp
keys.discard("path")
elif type == "hdd":
if min is None or max is None:
raise RuntimeError( raise RuntimeError(
"hdd temp '%s' doesn't define the mandatory `min` and `max` temps" "Duplicate temp section declaration for '%s'" % section.name
% temp_name
)
t = HDDTemp(
temp["path"],
min=min,
max=max,
panic=panic,
threshold=threshold,
hddtemp_bin=hddtemp,
)
keys.discard("path")
elif type == "exec":
t = CommandTemp(
temp["command"], min=min, max=max, panic=panic, threshold=threshold
)
keys.discard("command")
else:
raise RuntimeError(
"Unsupported temp type '%s' for temp '%s'" % (type, temp_name)
)
filter_name = temp.get("filter")
keys.discard("filter")
if filter_name is None:
filter = NullFilter()
else:
filter = filters[FilterName(filter_name.strip())].copy()
if keys:
raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys)
)
if temp_name in temps:
raise RuntimeError(
"Duplicate temp section declaration for '%s'" % temp_name
)
temps[temp_name] = FilteredTemp(temp=t, filter=filter)
temp_commands[temp_name] = Actions(
panic=actions_panic, threshold=actions_threshold
) )
temps[section.name] = FilteredTemp.from_configparser(section, filters, programs)
temp_commands[section.name] = Actions.from_configparser(section)
section.ensure_no_unused_keys()
return temps, temp_commands return temps, temp_commands
@ -478,95 +238,14 @@ def _parse_fans(
config: configparser.ConfigParser, config: configparser.ConfigParser,
arduino_connections: Mapping[ArduinoName, ArduinoConnection], arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> Mapping[FanName, PWMFanNorm]: ) -> Mapping[FanName, PWMFanNorm]:
fans = {} # type: Dict[FanName, PWMFanNorm] fans: Dict[FanName, PWMFanNorm] = {}
for section_name in config.sections(): for section in iter_sections(config, "fan", FanName):
section_name_parts = section_name.split(":", 1) if section.name in fans:
if section_name_parts[0].strip().lower() != "fan":
continue
fan_name = FanName(section_name_parts[1].strip())
fan = config[section_name]
keys = set(fan.keys())
fan_type = fan.get("type", fallback=DEFAULT_FAN_TYPE)
keys.discard("type")
if fan_type == "linux":
pwm = PWMDevice(fan["pwm"])
fan_input = FanInputDevice(fan["fan_input"])
keys.discard("pwm")
keys.discard("fan_input")
fan_speed = LinuxFanSpeed(fan_input) # type: BaseFanSpeed
pwm_read = LinuxFanPWMRead(pwm) # type: BaseFanPWMRead
pwm_write = LinuxFanPWMWrite(pwm) # type: BaseFanPWMWrite
elif fan_type == "arduino":
arduino_name = ArduinoName(fan["arduino_name"])
keys.discard("arduino_name")
pwm_pin = ArduinoPin(fan.getint("pwm_pin"))
keys.discard("pwm_pin")
tacho_pin = ArduinoPin(fan.getint("tacho_pin"))
keys.discard("tacho_pin")
if arduino_name not in arduino_connections:
raise ValueError("[arduino:%s] section is missing" % arduino_name)
fan_speed = ArduinoFanSpeed(
arduino_connections[arduino_name], tacho_pin=tacho_pin
)
pwm_read = ArduinoFanPWMRead(
arduino_connections[arduino_name], pwm_pin=pwm_pin
)
pwm_write = ArduinoFanPWMWrite(
arduino_connections[arduino_name], pwm_pin=pwm_pin
)
else:
raise ValueError(
"Unsupported FAN type %s. Supported ones are "
"`linux` and `arduino`." % fan_type
)
never_stop = fan.getboolean("never_stop", fallback=DEFAULT_NEVER_STOP)
keys.discard("never_stop")
pwm_line_start = PWMValue(
fan.getint("pwm_line_start", fallback=DEFAULT_PWM_LINE_START)
)
keys.discard("pwm_line_start")
pwm_line_end = PWMValue(
fan.getint("pwm_line_end", fallback=DEFAULT_PWM_LINE_END)
)
keys.discard("pwm_line_end")
for pwm_value in (pwm_line_start, pwm_line_end):
if not (pwm_read.min_pwm <= pwm_value <= pwm_read.max_pwm):
raise RuntimeError( raise RuntimeError(
"Incorrect PWM value '%s' for fan '%s': it must be within [%s;%s]" "Duplicate fan section declaration for '%s'" % section.name
% (pwm_value, fan_name, pwm_read.min_pwm, pwm_read.max_pwm)
)
if pwm_line_start >= pwm_line_end:
raise RuntimeError(
"`pwm_line_start` PWM value must be less than `pwm_line_end` for fan '%s'"
% (fan_name,)
)
if keys:
raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys)
)
if fan_name in fans:
raise RuntimeError("Duplicate fan section declaration for '%s'" % fan_name)
fans[fan_name] = PWMFanNorm(
fan_speed,
pwm_read,
pwm_write,
pwm_line_start=pwm_line_start,
pwm_line_end=pwm_line_end,
never_stop=never_stop,
) )
fans[section.name] = PWMFanNorm.from_configparser(section, arduino_connections)
section.ensure_no_unused_keys()
return fans return fans
@ -574,75 +253,18 @@ def _parse_fans(
def _parse_readonly_fans( def _parse_readonly_fans(
config: configparser.ConfigParser, config: configparser.ConfigParser,
arduino_connections: Mapping[ArduinoName, ArduinoConnection], arduino_connections: Mapping[ArduinoName, ArduinoConnection],
programs: Programs,
) -> Mapping[ReadonlyFanName, ReadonlyPWMFanNorm]: ) -> Mapping[ReadonlyFanName, ReadonlyPWMFanNorm]:
readonly_fans = {} # type: Dict[ReadonlyFanName, ReadonlyPWMFanNorm] readonly_fans: Dict[ReadonlyFanName, ReadonlyPWMFanNorm] = {}
for section_name in config.sections(): for section in iter_sections(config, "readonly_fan", ReadonlyFanName):
section_name_parts = section_name.split(":", 1) if section.name in readonly_fans:
if section_name_parts[0].strip().lower() != "readonly_fan":
continue
fan_name = ReadonlyFanName(section_name_parts[1].strip())
fan = config[section_name]
keys = set(fan.keys())
fan_type = fan.get("type", fallback=DEFAULT_FAN_TYPE)
keys.discard("type")
if fan_type == "linux":
fan_input = FanInputDevice(fan["fan_input"])
keys.discard("fan_input")
fan_speed = LinuxFanSpeed(fan_input) # type: BaseFanSpeed
pwm_read = None # type: Optional[BaseFanPWMRead]
if "pwm" in fan:
pwm = PWMDevice(fan["pwm"])
keys.discard("pwm")
pwm_read = LinuxFanPWMRead(pwm)
elif fan_type == "arduino":
arduino_name = ArduinoName(fan["arduino_name"])
keys.discard("arduino_name")
tacho_pin = ArduinoPin(fan.getint("tacho_pin"))
keys.discard("tacho_pin")
if arduino_name not in arduino_connections:
raise ValueError("[arduino:%s] section is missing" % arduino_name)
fan_speed = ArduinoFanSpeed(
arduino_connections[arduino_name], tacho_pin=tacho_pin
)
pwm_read = None
if "pwm_pin" in fan:
pwm_pin = ArduinoPin(fan.getint("pwm_pin"))
keys.discard("pwm_pin")
pwm_read = ArduinoFanPWMRead(
arduino_connections[arduino_name], pwm_pin=pwm_pin
)
elif fan_type == "freeipmi":
name = fan["name"]
keys.discard("name")
ipmi_sensors_extra_args = fan.get("ipmi_sensors_extra_args", fallback="")
fan_speed = FreeIPMIFanSpeed(
name, ipmi_sensors_extra_args=ipmi_sensors_extra_args
)
pwm_read = None
else:
raise ValueError(
"Unsupported FAN type %s. Supported ones are "
"`linux`, `arduino`, `freeipmi`." % fan_type
)
if keys:
raise RuntimeError( raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys) "Duplicate readonly_fan section declaration for '%s'" % section.name
) )
readonly_fans[section.name] = ReadonlyPWMFanNorm.from_configparser(
if fan_name in readonly_fans: section, arduino_connections, programs
raise RuntimeError(
"Duplicate readonly_fan section declaration for '%s'" % fan_name
) )
readonly_fans[fan_name] = ReadonlyPWMFanNorm(fan_speed, pwm_read) section.ensure_no_unused_keys()
return readonly_fans return readonly_fans
@ -665,45 +287,35 @@ def _parse_mappings(
temps: Mapping[TempName, FilteredTemp], temps: Mapping[TempName, FilteredTemp],
) -> Mapping[MappingName, FansTempsRelation]: ) -> Mapping[MappingName, FansTempsRelation]:
mappings = {} # type: Dict[MappingName, FansTempsRelation] mappings: Dict[MappingName, FansTempsRelation] = {}
for section_name in config.sections(): for section in iter_sections(config, "mapping", MappingName):
section_name_parts = section_name.split(":", 1)
if section_name_parts[0].lower() != "mapping":
continue
mapping_name = MappingName(section_name_parts[1])
mapping = config[section_name]
keys = set(mapping.keys())
# temps: # temps:
mapping_temps = [ mapping_temps = [
TempName(temp_name.strip()) for temp_name in mapping["temps"].split(",") TempName(temp_name.strip()) for temp_name in section["temps"].split(",")
] ]
mapping_temps = [s for s in mapping_temps if s] mapping_temps = [s for s in mapping_temps if s]
keys.discard("temps")
if not mapping_temps: if not mapping_temps:
raise RuntimeError( raise RuntimeError(
"Temps must not be empty in the '%s' mapping" % mapping_name "Temps must not be empty in the '%s' mapping" % section.name
) )
for temp_name in mapping_temps: for temp_name in mapping_temps:
if temp_name not in temps: if temp_name not in temps:
raise RuntimeError( raise RuntimeError(
"Unknown temp '%s' in mapping '%s'" % (temp_name, mapping_name) "Unknown temp '%s' in mapping '%s'" % (temp_name, section.name)
) )
if len(mapping_temps) != len(set(mapping_temps)): if len(mapping_temps) != len(set(mapping_temps)):
raise RuntimeError( raise RuntimeError(
"There are duplicate temps in mapping '%s'" % mapping_name "There are duplicate temps in mapping '%s'" % section.name
) )
# fans: # fans:
fans_with_speed = [ fans_with_speed = [
fan_with_speed.strip() for fan_with_speed in mapping["fans"].split(",") fan_with_speed.strip() for fan_with_speed in section["fans"].split(",")
] ]
fans_with_speed = [s for s in fans_with_speed if s] fans_with_speed = [s for s in fans_with_speed if s]
keys.discard("fans")
fan_speed_pairs = [ fan_speed_pairs = [
fan_with_speed.split("*") for fan_with_speed in fans_with_speed fan_with_speed.split("*") for fan_with_speed in fans_with_speed
@ -712,7 +324,7 @@ def _parse_mappings(
if len(fan_speed_pair) not in (1, 2): if len(fan_speed_pair) not in (1, 2):
raise RuntimeError( raise RuntimeError(
"Invalid fan specification '%s' in mapping '%s'" "Invalid fan specification '%s' in mapping '%s'"
% (fan_speed_pair, mapping_name) % (fan_speed_pair, section.name)
) )
mapping_fans = [ mapping_fans = [
FanSpeedModifier( FanSpeedModifier(
@ -729,7 +341,7 @@ def _parse_mappings(
if fan_speed_modifier.fan not in fans: if fan_speed_modifier.fan not in fans:
raise RuntimeError( raise RuntimeError(
"Unknown fan '%s' in mapping '%s'" "Unknown fan '%s' in mapping '%s'"
% (fan_speed_modifier.fan, mapping_name) % (fan_speed_modifier.fan, section.name)
) )
if not (0 < fan_speed_modifier.modifier <= 1.0): if not (0 < fan_speed_modifier.modifier <= 1.0):
raise RuntimeError( raise RuntimeError(
@ -737,7 +349,7 @@ def _parse_mappings(
"the allowed range is (0.0;1.0]." "the allowed range is (0.0;1.0]."
% ( % (
fan_speed_modifier.modifier, fan_speed_modifier.modifier,
mapping_name, section.name,
fan_speed_modifier.fan, fan_speed_modifier.fan,
) )
) )
@ -745,21 +357,17 @@ def _parse_mappings(
set(fan_speed_modifier.fan for fan_speed_modifier in mapping_fans) set(fan_speed_modifier.fan for fan_speed_modifier in mapping_fans)
): ):
raise RuntimeError( raise RuntimeError(
"There are duplicate fans in mapping '%s'" % mapping_name "There are duplicate fans in mapping '%s'" % section.name
) )
if keys: if section.name in mappings:
raise RuntimeError( raise RuntimeError(
"Unknown options in the [%s] section: %s" % (section_name, keys) "Duplicate mapping section declaration for '%s'" % section.name
) )
mappings[section.name] = FansTempsRelation(
if mapping_name in fans:
raise RuntimeError(
"Duplicate mapping section declaration for '%s'" % mapping_name
)
mappings[mapping_name] = FansTempsRelation(
temps=mapping_temps, fans=mapping_fans temps=mapping_temps, fans=mapping_fans
) )
section.ensure_no_unused_keys()
unused_temps = set(temps.keys()) unused_temps = set(temps.keys())
unused_fans = set(fans.keys()) unused_fans = set(fans.keys())

View File

@ -0,0 +1,129 @@
import configparser
from typing import Any, Generic, Iterator, Optional, Type, TypeVar, Union, overload
T = TypeVar("T", bound=str)
F = TypeVar("F", None, Any)
_UNSET = object()
def iter_sections(
config: configparser.ConfigParser, section_type: str, name_typevar: Type[T]
) -> Iterator["ConfigParserSection[T]"]:
for section_name in config.sections():
section_name_parts = section_name.split(":", 1)
if section_name_parts[0].strip().lower() != section_type:
continue
name = name_typevar(section_name_parts[1].strip())
section = ConfigParserSection(config[section_name], name)
yield section
class ConfigParserSection(Generic[T]):
def __init__(
self, section: configparser.SectionProxy, name: Optional[T] = None
) -> None:
self.__name = name
self.__section = section
self.__unused_keys = set(section.keys())
@property
def name(self) -> T:
assert self.__name is not None
return self.__name
def ensure_no_unused_keys(self) -> None:
if self.__unused_keys:
raise RuntimeError(
"Unknown options in the [%s] section: %s"
% (self.__section.name, self.__unused_keys)
)
def __contains__(self, key):
return self.__section.__contains__(key)
def __getitem__(self, key):
self.__unused_keys.discard(key)
return self.__section.__getitem__(key)
@overload
def get(self, option: str) -> str:
...
@overload
def get(self, option: str, *, fallback: F) -> Union[str, F]:
...
def get(self, option: str, *, fallback=_UNSET) -> Union[str, F]:
kwargs = {}
if fallback is not _UNSET:
kwargs["fallback"] = fallback
self.__unused_keys.discard(option)
res = self.__section.get(option, **kwargs)
if res is None and fallback is _UNSET:
raise ValueError(
"[%s] %r option is expected to be set" % (self.__section.name, option)
)
return res
@overload
def getint(self, option: str) -> int:
...
@overload
def getint(self, option: str, *, fallback: F) -> Union[int, F]:
...
def getint(self, option: str, *, fallback=_UNSET) -> Union[int, F]:
kwargs = {}
if fallback is not _UNSET:
kwargs["fallback"] = fallback
self.__unused_keys.discard(option)
res = self.__section.getint(option, **kwargs)
if res is None and fallback is _UNSET:
raise ValueError(
"[%s] %r option is expected to be set" % (self.__section.name, option)
)
return res
@overload
def getfloat(self, option: str) -> float:
...
@overload
def getfloat(self, option: str, *, fallback: F) -> Union[float, F]:
...
def getfloat(self, option: str, *, fallback=_UNSET) -> Union[float, F]:
kwargs = {}
if fallback is not _UNSET:
kwargs["fallback"] = fallback
self.__unused_keys.discard(option)
res = self.__section.getfloat(option, **kwargs)
if res is None and fallback is _UNSET:
raise ValueError(
"[%s] %r option is expected to be set" % (self.__section.name, option)
)
return res
@overload
def getboolean(self, option: str) -> bool:
...
@overload
def getboolean(self, option: str, *, fallback: F) -> Union[bool, F]:
...
def getboolean(self, option: str, *, fallback=_UNSET) -> Union[bool, F]:
kwargs = {}
if fallback is not _UNSET:
kwargs["fallback"] = fallback
self.__unused_keys.discard(option)
res = self.__section.getboolean(option, **kwargs)
if res is None and fallback is _UNSET:
raise ValueError(
"[%s] %r option is expected to be set" % (self.__section.name, option)
)
return res

View File

@ -66,9 +66,7 @@ def daemon(
parsed_config = parse_config(config_path, daemon_cli_config) parsed_config = parse_config(config_path, daemon_cli_config)
if parsed_config.daemon.exporter_listen_host: if parsed_config.daemon.exporter_listen_host:
metrics = PrometheusMetrics( metrics: Metrics = PrometheusMetrics(parsed_config.daemon.exporter_listen_host)
parsed_config.daemon.exporter_listen_host
) # type: Metrics
else: else:
metrics = NullMetrics() metrics = NullMetrics()
@ -83,7 +81,7 @@ def daemon(
metrics=metrics, metrics=metrics,
) )
pidfile_instance = None # type: Optional[PidFile] pidfile_instance: Optional[PidFile] = None
if parsed_config.daemon.pidfile is not None: if parsed_config.daemon.pidfile is not None:
pidfile_instance = PidFile(parsed_config.daemon.pidfile) pidfile_instance = PidFile(parsed_config.daemon.pidfile)

View File

@ -1,8 +1,22 @@
import subprocess import subprocess
from typing import NamedTuple
from afancontrol.configparser import ConfigParserSection
from afancontrol.logger import logger from afancontrol.logger import logger
class Programs(NamedTuple):
hddtemp: str
ipmi_sensors: str
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> "Programs":
return cls(
hddtemp=section.get("hddtemp", fallback="hddtemp"),
ipmi_sensors=section.get("ipmi_sensors", fallback="ipmi-sensors"),
)
def exec_shell_command(shell_command: str, timeout: int = 5) -> str: def exec_shell_command(shell_command: str, timeout: int = 5) -> str:
try: try:
p = subprocess.run( p = subprocess.run(

View File

@ -2,8 +2,8 @@ import itertools
from contextlib import ExitStack from contextlib import ExitStack
from typing import Iterator, Mapping, MutableSet, Optional, Tuple, Union, cast from typing import Iterator, Mapping, MutableSet, Optional, Tuple, Union, cast
from afancontrol.config import AnyFanName, FanName, ReadonlyFanName
from afancontrol.logger import logger from afancontrol.logger import logger
from afancontrol.pwmfan import AnyFanName, FanName, ReadonlyFanName
from afancontrol.pwmfannorm import PWMFanNorm, PWMValueNorm, ReadonlyPWMFanNorm from afancontrol.pwmfannorm import PWMFanNorm, PWMValueNorm, ReadonlyPWMFanNorm
from afancontrol.report import Report from afancontrol.report import Report
@ -19,13 +19,13 @@ class Fans:
self.fans = fans self.fans = fans
self.readonly_fans = readonly_fans self.readonly_fans = readonly_fans
self.report = report self.report = report
self._stack = None # type: Optional[ExitStack] self._stack: Optional[ExitStack] = None
# Set of fans marked as failing (which speed is 0) # Set of fans marked as failing (which speed is 0)
self._failed_fans = set() # type: MutableSet[AnyFanName] self._failed_fans: MutableSet[AnyFanName] = set()
# Set of fans that will be skipped on speed check # Set of fans that will be skipped on speed check
self._stopped_fans = set() # type: MutableSet[AnyFanName] self._stopped_fans: MutableSet[AnyFanName] = set()
def is_fan_failing(self, fan_name: AnyFanName) -> bool: def is_fan_failing(self, fan_name: AnyFanName) -> bool:
return fan_name in self._failed_fans return fan_name in self._failed_fans

View File

@ -1,7 +1,7 @@
import abc import abc
import sys import sys
from time import sleep from time import sleep
from typing import NamedTuple, Optional from typing import Optional
import click import click
@ -15,9 +15,6 @@ from afancontrol.pwmfan import (
ArduinoFanPWMRead, ArduinoFanPWMRead,
ArduinoFanPWMWrite, ArduinoFanPWMWrite,
ArduinoFanSpeed, ArduinoFanSpeed,
BaseFanPWMRead,
BaseFanPWMWrite,
BaseFanSpeed,
FanInputDevice, FanInputDevice,
FanValue, FanValue,
LinuxFanPWMRead, LinuxFanPWMRead,
@ -25,6 +22,7 @@ from afancontrol.pwmfan import (
LinuxFanSpeed, LinuxFanSpeed,
PWMDevice, PWMDevice,
PWMValue, PWMValue,
ReadWriteFan,
) )
# Time to wait before measuring fan speed after setting a PWM value. # Time to wait before measuring fan speed after setting a PWM value.
@ -74,15 +72,6 @@ HELP_PWM_STEP_SIZE = (
"faster." "faster."
) )
Fan = NamedTuple(
"Fan",
[
("fan_speed", BaseFanSpeed),
("pwm_read", BaseFanPWMRead),
("pwm_write", BaseFanPWMWrite),
],
)
@click.command() @click.command()
@click.option( @click.option(
@ -156,25 +145,24 @@ def fantest(
) -> None: ) -> None:
"""The PWM fan testing program. """The PWM fan testing program.
This program tests how changing the PWM value of a fan affects its speed. This program tests how changing the PWM value of a fan affects its speed.
In the beginning the fan would be stopped (by setting it to a minimum PWM value), In the beginning the fan would be stopped (by setting it to a minimum PWM value),
and then the PWM value would be increased in small steps, while also and then the PWM value would be increased in small steps, while also
measuring the speed as reported by the fan. measuring the speed as reported by the fan.
This data would help you to find the effective range of values This data would help you to find the effective range of values
for the `pwm_line_start` and `pwm_line_end` settings where the correlation for the `pwm_line_start` and `pwm_line_end` settings where the correlation
between PWM and fan speed is close to linear. Usually its between PWM and fan speed is close to linear. Usually its
`pwm_line_start = 100` and `pwm_line_end = 240`, but it is individual `pwm_line_start = 100` and `pwm_line_end = 240`, but it is individual
for each fan. The allowed range for a PWM value is from 0 to 255. for each fan. The allowed range for a PWM value is from 0 to 255.
Note that the fan would be stopped for some time during the test. If you'll Note that the fan would be stopped for some time during the test. If you'll
feel nervous, press Ctrl+C to stop the test and return the fan to full speed. feel nervous, press Ctrl+C to stop the test and return the fan to full speed.
Before starting the test ensure that no fan control software is currently Before starting the test ensure that no fan control software is currently
controlling the fan you're going to test. controlling the fan you're going to test.
"""
"""
try: try:
if fan_type == "linux": if fan_type == "linux":
if not linux_fan_pwm: if not linux_fan_pwm:
@ -191,7 +179,7 @@ controlling the fan you're going to test.
assert linux_fan_pwm is not None assert linux_fan_pwm is not None
assert linux_fan_input is not None assert linux_fan_input is not None
fan = Fan( fan = ReadWriteFan(
fan_speed=LinuxFanSpeed(FanInputDevice(linux_fan_input)), fan_speed=LinuxFanSpeed(FanInputDevice(linux_fan_input)),
pwm_read=LinuxFanPWMRead(PWMDevice(linux_fan_pwm)), pwm_read=LinuxFanPWMRead(PWMDevice(linux_fan_pwm)),
pwm_write=LinuxFanPWMWrite(PWMDevice(linux_fan_pwm)), pwm_write=LinuxFanPWMWrite(PWMDevice(linux_fan_pwm)),
@ -230,7 +218,7 @@ controlling the fan you're going to test.
) )
assert arduino_pwm_pin is not None assert arduino_pwm_pin is not None
assert arduino_tacho_pin is not None assert arduino_tacho_pin is not None
fan = Fan( fan = ReadWriteFan(
fan_speed=ArduinoFanSpeed( fan_speed=ArduinoFanSpeed(
arduino_connection, tacho_pin=ArduinoPin(arduino_tacho_pin) arduino_connection, tacho_pin=ArduinoPin(arduino_tacho_pin)
), ),
@ -268,7 +256,7 @@ controlling the fan you're going to test.
def run_fantest( def run_fantest(
fan: Fan, pwm_step_size: PWMValue, output: "MeasurementsOutput" fan: ReadWriteFan, pwm_step_size: PWMValue, output: "MeasurementsOutput"
) -> None: ) -> None:
with fan.fan_speed, fan.pwm_read, fan.pwm_write: with fan.fan_speed, fan.pwm_read, fan.pwm_write:
start = fan.pwm_read.min_pwm start = fan.pwm_read.min_pwm

View File

@ -1,10 +1,32 @@
import abc import abc
import collections import collections
from typing import Deque, Optional, TypeVar from typing import TYPE_CHECKING, Deque, NewType, Optional, TypeVar
from afancontrol.temp import TempStatus from afancontrol.configparser import ConfigParserSection
if TYPE_CHECKING:
from afancontrol.temp import TempStatus
T = TypeVar("T") T = TypeVar("T")
FilterName = NewType("FilterName", str)
def from_configparser(section: ConfigParserSection[FilterName]) -> "TempFilter":
filter_type = section["type"]
if filter_type == "moving_median":
window_size = section.getint("window_size", fallback=3)
return MovingMedianFilter(window_size=window_size)
elif filter_type == "moving_quantile":
window_size = section.getint("window_size", fallback=3)
quantile = section.getfloat("quantile")
return MovingQuantileFilter(quantile=quantile, window_size=window_size)
else:
raise RuntimeError(
"Unsupported filter type '%s' for filter '%s'. "
"Supported types: `moving_median`, `moving_quantile`."
% (filter_type, section.name)
)
class TempFilter(abc.ABC): class TempFilter(abc.ABC):
@ -13,7 +35,7 @@ class TempFilter(abc.ABC):
pass pass
@abc.abstractmethod @abc.abstractmethod
def apply(self, status: Optional[TempStatus]) -> Optional[TempStatus]: def apply(self, status: Optional["TempStatus"]) -> Optional["TempStatus"]:
pass pass
def __enter__(self): # reusable def __enter__(self): # reusable
@ -27,7 +49,7 @@ class NullFilter(TempFilter):
def copy(self: T) -> T: def copy(self: T) -> T:
return type(self)() return type(self)()
def apply(self, status: Optional[TempStatus]) -> Optional[TempStatus]: def apply(self, status: Optional["TempStatus"]) -> Optional["TempStatus"]:
return status return status
def __eq__(self, other): def __eq__(self, other):
@ -43,7 +65,7 @@ class NullFilter(TempFilter):
return "%s()" % (type(self).__name__,) return "%s()" % (type(self).__name__,)
def _temp_status_sorting_key(status: Optional[TempStatus]) -> float: def _temp_status_sorting_key(status: Optional["TempStatus"]) -> float:
if status is None: if status is None:
return float("+inf") return float("+inf")
return status.temp return status.temp
@ -53,14 +75,14 @@ class MovingQuantileFilter(TempFilter):
def __init__(self, quantile: float, *, window_size: int) -> None: def __init__(self, quantile: float, *, window_size: int) -> None:
self.quantile = quantile self.quantile = quantile
self.window_size = window_size self.window_size = window_size
self.history = None # type: Optional[Deque[Optional[TempStatus]]] self.history: Optional[Deque[Optional["TempStatus"]]] = None
def copy(self: T) -> T: def copy(self: T) -> T:
return type(self)( # type: ignore return type(self)( # type: ignore
quantile=self.quantile, window_size=self.window_size # type: ignore quantile=self.quantile, window_size=self.window_size # type: ignore
) )
def apply(self, status: Optional[TempStatus]) -> Optional[TempStatus]: def apply(self, status: Optional["TempStatus"]) -> Optional["TempStatus"]:
assert self.history is not None assert self.history is not None
self.history.append(status) self.history.append(status)

View File

@ -41,7 +41,7 @@ class Manager:
self.mappings = mappings self.mappings = mappings
self.triggers = Triggers(triggers_config, report) self.triggers = Triggers(triggers_config, report)
self.metrics = metrics self.metrics = metrics
self._stack = None # type: Optional[ExitStack] self._stack: Optional[ExitStack] = None
def __enter__(self): # reusable def __enter__(self): # reusable
self._stack = ExitStack() self._stack = ExitStack()
@ -88,9 +88,7 @@ class Manager:
for temp_name, temp_status in temps.items() for temp_name, temp_status in temps.items()
} }
fan_speeds = defaultdict( fan_speeds: Dict[FanName, PWMValueNorm] = defaultdict(lambda: PWMValueNorm(0.0))
lambda: PWMValueNorm(0.0)
) # type: Dict[FanName, PWMValueNorm]
for mapping_name, relation in self.mappings.items(): for mapping_name, relation in self.mappings.items():
mapping_speed = max(temp_speeds[temp_name] for temp_name in relation.temps) mapping_speed = max(temp_speeds[temp_name] for temp_name in relation.temps)

View File

@ -1,23 +1,20 @@
import abc import abc
import contextlib import contextlib
import threading import threading
from http.server import BaseHTTPRequestHandler, HTTPServer from http.server import HTTPServer
from socketserver import ThreadingMixIn from socketserver import ThreadingMixIn
from timeit import default_timer from timeit import default_timer
from typing import TYPE_CHECKING, Mapping, Optional, Union from typing import ContextManager, Mapping, Optional, Union
from urllib.parse import parse_qs, urlparse
from afancontrol.arduino import ArduinoConnection, ArduinoName from afancontrol.arduino import ArduinoConnection, ArduinoName
from afancontrol.config import AnyFanName, FanName, ReadonlyFanName, TempName from afancontrol.config import TempName
from afancontrol.fans import Fans from afancontrol.fans import Fans
from afancontrol.logger import logger from afancontrol.logger import logger
from afancontrol.pwmfan import AnyFanName, FanName, ReadonlyFanName
from afancontrol.pwmfannorm import PWMFanNorm, ReadonlyPWMFanNorm from afancontrol.pwmfannorm import PWMFanNorm, ReadonlyPWMFanNorm
from afancontrol.temps import ObservedTempStatus from afancontrol.temps import ObservedTempStatus
from afancontrol.trigger import Triggers from afancontrol.trigger import Triggers
if TYPE_CHECKING:
from typing import ContextManager # Added in 3.6
try: try:
import prometheus_client as prom import prometheus_client as prom
@ -46,7 +43,7 @@ class Metrics(abc.ABC):
pass pass
@abc.abstractmethod @abc.abstractmethod
def measure_tick(self) -> "ContextManager[None]": def measure_tick(self) -> ContextManager[None]:
pass pass
@ -66,7 +63,7 @@ class NullMetrics(Metrics):
) -> None: ) -> None:
pass pass
def measure_tick(self) -> "ContextManager[None]": def measure_tick(self) -> ContextManager[None]:
@contextlib.contextmanager @contextlib.contextmanager
def null_context_manager(): def null_context_manager():
yield yield
@ -85,7 +82,7 @@ class PrometheusMetrics(Metrics):
self._listen_addr, port_str = listen_host.rsplit(":", 1) self._listen_addr, port_str = listen_host.rsplit(":", 1)
self._listen_port = int(port_str) self._listen_port = int(port_str)
self._http_server = None # type: Optional[HTTPServer] self._http_server: Optional[HTTPServer] = None
self._last_metrics_collect_clock = float("nan") self._last_metrics_collect_clock = float("nan")
@ -255,7 +252,7 @@ class PrometheusMetrics(Metrics):
def _start(self): def _start(self):
# `prometheus_client.start_http_server` which persists a server reference # `prometheus_client.start_http_server` which persists a server reference
# so it could be stopped later. # so it could be stopped later.
CustomMetricsHandler = MetricsHandler.factory(self.registry) CustomMetricsHandler = prom.MetricsHandler.factory(self.registry)
httpd = _ThreadingSimpleServer( httpd = _ThreadingSimpleServer(
(self._listen_addr, self._listen_port), CustomMetricsHandler (self._listen_addr, self._listen_port), CustomMetricsHandler
) )
@ -335,7 +332,7 @@ class PrometheusMetrics(Metrics):
self._last_metrics_collect_clock = self._clock() self._last_metrics_collect_clock = self._clock()
def measure_tick(self) -> "ContextManager[None]": def measure_tick(self) -> ContextManager[None]:
return self.tick_duration.time() return self.tick_duration.time()
def _collect_fan_metrics( def _collect_fan_metrics(
@ -393,40 +390,3 @@ class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
# Enabling daemon threads virtually makes ``_ThreadingSimpleServer`` the # Enabling daemon threads virtually makes ``_ThreadingSimpleServer`` the
# same as Python 3.7's ``ThreadingHTTPServer``. # same as Python 3.7's ``ThreadingHTTPServer``.
daemon_threads = True daemon_threads = True
# `MetricsHandler` of `prometheus_client==0.0.18` doesn't support exposing
# a custom registry. This is backported below:
if prometheus_available:
if hasattr(prom.MetricsHandler, "factory"):
MetricsHandler = prom.MetricsHandler
else:
from prometheus_client.exposition import generate_latest, CONTENT_TYPE_LATEST
class MetricsHandler(BaseHTTPRequestHandler): # type: ignore
# https://github.com/prometheus/client_python/blob/31f5557e2e84ca4ffa9a03abf6e3f4d0c8b8c3eb/prometheus_client/exposition.py#L141-L177 # noqa
registry = prom.REGISTRY
def do_GET(self):
registry = self.registry
params = parse_qs(urlparse(self.path).query)
if "name[]" in params:
registry = registry.restricted_registry(params["name[]"])
try:
output = generate_latest(registry)
except Exception:
self.send_error(500, "error generating metric output")
raise
self.send_response(200)
self.send_header("Content-Type", CONTENT_TYPE_LATEST)
self.end_headers()
self.wfile.write(output)
def log_message(self, format, *args):
"""Log nothing."""
@classmethod
def factory(cls, registry):
cls_name = str(cls.__name__)
MyMetricsHandler = type(cls_name, (cls, object), {"registry": registry})
return MyMetricsHandler

View File

@ -1,3 +1,8 @@
from typing import Mapping, NamedTuple, NewType, Optional, Union
from afancontrol.arduino import ArduinoConnection, ArduinoName
from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import Programs
from afancontrol.pwmfan.arduino import ( from afancontrol.pwmfan.arduino import (
ArduinoFanPWMRead, ArduinoFanPWMRead,
ArduinoFanPWMWrite, ArduinoFanPWMWrite,
@ -35,3 +40,92 @@ __all__ = (
"PWMDevice", "PWMDevice",
"PWMValue", "PWMValue",
) )
DEFAULT_FAN_TYPE = "linux"
FanName = NewType("FanName", str)
ReadonlyFanName = NewType("ReadonlyFanName", str)
AnyFanName = Union[FanName, ReadonlyFanName]
class ReadOnlyFan(NamedTuple):
fan_speed: BaseFanSpeed
pwm_read: Optional[BaseFanPWMRead]
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[ReadonlyFanName],
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
programs: Programs,
) -> "ReadOnlyFan":
fan_type = section.get("type", fallback=DEFAULT_FAN_TYPE)
if fan_type == "linux":
return cls(
fan_speed=LinuxFanSpeed.from_configparser(section),
pwm_read=(
LinuxFanPWMRead.from_configparser(section)
if "pwm" in section
else None
),
)
elif fan_type == "arduino":
return cls(
fan_speed=ArduinoFanSpeed.from_configparser(
section, arduino_connections
),
pwm_read=(
ArduinoFanPWMRead.from_configparser(section, arduino_connections)
if "pwm_pin" in section
else None
),
)
elif fan_type == "freeipmi":
return cls(
fan_speed=FreeIPMIFanSpeed.from_configparser(section, programs),
pwm_read=None,
)
else:
raise ValueError(
"Unsupported FAN type %s. Supported ones are "
"`linux`, `arduino`, `freeipmi`." % fan_type
)
class ReadWriteFan(NamedTuple):
fan_speed: BaseFanSpeed
pwm_read: BaseFanPWMRead
pwm_write: BaseFanPWMWrite
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[FanName],
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> "ReadWriteFan":
fan_type = section.get("type", fallback=DEFAULT_FAN_TYPE)
if fan_type == "linux":
return cls(
fan_speed=LinuxFanSpeed.from_configparser(section),
pwm_read=LinuxFanPWMRead.from_configparser(section),
pwm_write=LinuxFanPWMWrite.from_configparser(section),
)
elif fan_type == "arduino":
return cls(
fan_speed=ArduinoFanSpeed.from_configparser(
section, arduino_connections
),
pwm_read=ArduinoFanPWMRead.from_configparser(
section, arduino_connections
),
pwm_write=ArduinoFanPWMWrite.from_configparser(
section, arduino_connections
),
)
else:
raise ValueError(
"Unsupported FAN type %s. Supported ones are "
"`linux` and `arduino`." % fan_type
)

View File

@ -1,4 +1,7 @@
from afancontrol.arduino import ArduinoConnection, ArduinoPin from typing import Mapping
from afancontrol.arduino import ArduinoConnection, ArduinoName, ArduinoPin
from afancontrol.configparser import ConfigParserSection
from afancontrol.pwmfan.base import ( from afancontrol.pwmfan.base import (
BaseFanPWMRead, BaseFanPWMRead,
BaseFanPWMWrite, BaseFanPWMWrite,
@ -8,6 +11,14 @@ from afancontrol.pwmfan.base import (
) )
def _ensure_arduino_connection(
arduino_name: ArduinoName,
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> None:
if arduino_name not in arduino_connections:
raise ValueError("[arduino:%s] section is missing" % arduino_name)
class ArduinoFanSpeed(BaseFanSpeed): class ArduinoFanSpeed(BaseFanSpeed):
__slots__ = "_conn", "_tacho_pin" __slots__ = "_conn", "_tacho_pin"
@ -17,6 +28,17 @@ class ArduinoFanSpeed(BaseFanSpeed):
self._conn = arduino_connection self._conn = arduino_connection
self._tacho_pin = tacho_pin self._tacho_pin = tacho_pin
@classmethod
def from_configparser(
cls,
section: ConfigParserSection,
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> BaseFanSpeed:
arduino_name = ArduinoName(section["arduino_name"])
_ensure_arduino_connection(arduino_name, arduino_connections)
tacho_pin = ArduinoPin(section.getint("tacho_pin"))
return cls(arduino_connections[arduino_name], tacho_pin=tacho_pin)
def get_speed(self) -> FanValue: def get_speed(self) -> FanValue:
return FanValue(self._conn.get_rpm(self._tacho_pin)) return FanValue(self._conn.get_rpm(self._tacho_pin))
@ -35,11 +57,25 @@ class ArduinoFanPWMRead(BaseFanPWMRead):
min_pwm = PWMValue(0) min_pwm = PWMValue(0)
def __init__( def __init__(
self, arduino_connection: ArduinoConnection, *, pwm_pin: ArduinoPin self,
arduino_connection: ArduinoConnection,
*,
pwm_pin: ArduinoPin,
) -> None: ) -> None:
self._conn = arduino_connection self._conn = arduino_connection
self._pwm_pin = pwm_pin self._pwm_pin = pwm_pin
@classmethod
def from_configparser(
cls,
section: ConfigParserSection,
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> BaseFanPWMRead:
arduino_name = ArduinoName(section["arduino_name"])
_ensure_arduino_connection(arduino_name, arduino_connections)
pwm_pin = ArduinoPin(section.getint("pwm_pin"))
return cls(arduino_connections[arduino_name], pwm_pin=pwm_pin)
def get(self) -> PWMValue: def get(self) -> PWMValue:
return PWMValue(int(self._conn.get_pwm(self._pwm_pin))) return PWMValue(int(self._conn.get_pwm(self._pwm_pin)))
@ -57,11 +93,25 @@ class ArduinoFanPWMWrite(BaseFanPWMWrite):
read_cls = ArduinoFanPWMRead read_cls = ArduinoFanPWMRead
def __init__( def __init__(
self, arduino_connection: ArduinoConnection, *, pwm_pin: ArduinoPin self,
arduino_connection: ArduinoConnection,
*,
pwm_pin: ArduinoPin,
) -> None: ) -> None:
self._conn = arduino_connection self._conn = arduino_connection
self._pwm_pin = pwm_pin self._pwm_pin = pwm_pin
@classmethod
def from_configparser(
cls,
section: ConfigParserSection,
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> BaseFanPWMWrite:
arduino_name = ArduinoName(section["arduino_name"])
_ensure_arduino_connection(arduino_name, arduino_connections)
pwm_pin = ArduinoPin(section.getint("pwm_pin"))
return cls(arduino_connections[arduino_name], pwm_pin=pwm_pin)
def _set_raw(self, pwm: PWMValue) -> None: def _set_raw(self, pwm: PWMValue) -> None:
self._conn.set_pwm(self._pwm_pin, pwm) self._conn.set_pwm(self._pwm_pin, pwm)

View File

@ -40,8 +40,8 @@ class BaseFanSpeed(abc.ABC, _SlotsReprMixin):
class BaseFanPWMRead(abc.ABC, _SlotsReprMixin): class BaseFanPWMRead(abc.ABC, _SlotsReprMixin):
max_pwm = None # type: PWMValue max_pwm: PWMValue
min_pwm = None # type: PWMValue min_pwm: PWMValue
def is_stopped(self) -> bool: def is_stopped(self) -> bool:
return type(self).is_pwm_stopped(self.get()) return type(self).is_pwm_stopped(self.get())
@ -62,7 +62,7 @@ class BaseFanPWMRead(abc.ABC, _SlotsReprMixin):
class BaseFanPWMWrite(abc.ABC, _SlotsReprMixin): class BaseFanPWMWrite(abc.ABC, _SlotsReprMixin):
read_cls = None # type: Type[BaseFanPWMRead] read_cls: Type[BaseFanPWMRead]
def set(self, pwm: PWMValue) -> None: def set(self, pwm: PWMValue) -> None:
if not (self.read_cls.min_pwm <= pwm <= self.read_cls.max_pwm): if not (self.read_cls.min_pwm <= pwm <= self.read_cls.max_pwm):

View File

@ -1,7 +1,8 @@
import csv import csv
import io import io
from afancontrol.exec import exec_shell_command from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import Programs, exec_shell_command
from afancontrol.pwmfan.base import BaseFanSpeed, FanValue from afancontrol.pwmfan.base import BaseFanSpeed, FanValue
# TODO maybe switch to `python3-pyghmi`? although it looks like the current version # TODO maybe switch to `python3-pyghmi`? although it looks like the current version
@ -18,6 +19,16 @@ class FreeIPMIFanSpeed(BaseFanSpeed):
self._ipmi_sensors_bin = ipmi_sensors_bin self._ipmi_sensors_bin = ipmi_sensors_bin
self._ipmi_sensors_extra_args = ipmi_sensors_extra_args self._ipmi_sensors_extra_args = ipmi_sensors_extra_args
@classmethod
def from_configparser(
cls, section: ConfigParserSection, programs: Programs
) -> BaseFanSpeed:
return cls(
section["name"],
ipmi_sensors_bin=programs.ipmi_sensors,
ipmi_sensors_extra_args=section.get("ipmi_sensors_extra_args", fallback=""),
)
def get_speed(self) -> FanValue: def get_speed(self) -> FanValue:
out = self._call_ipmi_sensors() out = self._call_ipmi_sensors()
reader = csv.DictReader(io.StringIO(out)) reader = csv.DictReader(io.StringIO(out))

View File

@ -1,6 +1,7 @@
from pathlib import Path from pathlib import Path
from typing import NewType from typing import NewType
from afancontrol.configparser import ConfigParserSection
from afancontrol.pwmfan.base import ( from afancontrol.pwmfan.base import (
BaseFanPWMRead, BaseFanPWMRead,
BaseFanPWMWrite, BaseFanPWMWrite,
@ -19,6 +20,10 @@ class LinuxFanSpeed(BaseFanSpeed):
def __init__(self, fan_input: FanInputDevice) -> None: def __init__(self, fan_input: FanInputDevice) -> None:
self._fan_input = Path(fan_input) self._fan_input = Path(fan_input)
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> BaseFanSpeed:
return cls(FanInputDevice(section["fan_input"]))
def get_speed(self) -> FanValue: def get_speed(self) -> FanValue:
return FanValue(int(self._fan_input.read_text())) return FanValue(int(self._fan_input.read_text()))
@ -32,6 +37,10 @@ class LinuxFanPWMRead(BaseFanPWMRead):
def __init__(self, pwm: PWMDevice) -> None: def __init__(self, pwm: PWMDevice) -> None:
self._pwm = Path(pwm) self._pwm = Path(pwm)
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> BaseFanPWMRead:
return cls(PWMDevice(section["pwm"]))
def get(self) -> PWMValue: def get(self) -> PWMValue:
return PWMValue(int(self._pwm.read_text())) return PWMValue(int(self._pwm.read_text()))
@ -45,6 +54,10 @@ class LinuxFanPWMWrite(BaseFanPWMWrite):
self._pwm = Path(pwm) self._pwm = Path(pwm)
self._pwm_enable = Path(pwm + "_enable") self._pwm_enable = Path(pwm + "_enable")
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> BaseFanPWMWrite:
return cls(PWMDevice(section["pwm"]))
def _set_raw(self, pwm: PWMValue) -> None: def _set_raw(self, pwm: PWMValue) -> None:
self._pwm.write_text(str(int(pwm))) self._pwm.write_text(str(int(pwm)))

View File

@ -1,13 +1,20 @@
import math import math
from contextlib import ExitStack from contextlib import ExitStack
from typing import NewType, Optional from typing import Mapping, NewType, Optional
from afancontrol.arduino import ArduinoConnection, ArduinoName
from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import Programs
from afancontrol.pwmfan import ( from afancontrol.pwmfan import (
BaseFanPWMRead, BaseFanPWMRead,
BaseFanPWMWrite, BaseFanPWMWrite,
BaseFanSpeed, BaseFanSpeed,
FanName,
FanValue, FanValue,
PWMValue, PWMValue,
ReadOnlyFan,
ReadonlyFanName,
ReadWriteFan,
) )
PWMValueNorm = NewType("PWMValueNorm", float) # [0..1] PWMValueNorm = NewType("PWMValueNorm", float) # [0..1]
@ -19,7 +26,19 @@ class ReadonlyPWMFanNorm:
) -> None: ) -> None:
self.fan_speed = fan_speed self.fan_speed = fan_speed
self.pwm_read = pwm_read self.pwm_read = pwm_read
self._stack = None # type: Optional[ExitStack] self._stack: Optional[ExitStack] = None
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[ReadonlyFanName],
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
programs: Programs,
) -> "ReadonlyPWMFanNorm":
readonly_fan = ReadOnlyFan.from_configparser(
section, arduino_connections, programs
)
return cls(readonly_fan.fan_speed, readonly_fan.pwm_read)
def __enter__(self): def __enter__(self):
self._stack = ExitStack() self._stack = ExitStack()
@ -98,7 +117,48 @@ class PWMFanNorm:
"Invalid pwm_line_end. Expected: pwm_line_end <= max_pwm. " "Invalid pwm_line_end. Expected: pwm_line_end <= max_pwm. "
"Got: %s <= %s" % (self.pwm_line_end, type(self.pwm_read).max_pwm) "Got: %s <= %s" % (self.pwm_line_end, type(self.pwm_read).max_pwm)
) )
self._stack = None # type: Optional[ExitStack] self._stack: Optional[ExitStack] = None
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[FanName],
arduino_connections: Mapping[ArduinoName, ArduinoConnection],
) -> "PWMFanNorm":
readwrite_fan = ReadWriteFan.from_configparser(section, arduino_connections)
never_stop = section.getboolean("never_stop", fallback=True)
pwm_line_start = PWMValue(section.getint("pwm_line_start", fallback=100))
pwm_line_end = PWMValue(section.getint("pwm_line_end", fallback=240))
for pwm_value in (pwm_line_start, pwm_line_end):
if not (
readwrite_fan.pwm_read.min_pwm
<= pwm_value
<= readwrite_fan.pwm_read.max_pwm
):
raise RuntimeError(
"Incorrect PWM value '%s' for fan '%s': it must be within [%s;%s]"
% (
pwm_value,
section.name,
readwrite_fan.pwm_read.min_pwm,
readwrite_fan.pwm_read.max_pwm,
)
)
if pwm_line_start >= pwm_line_end:
raise RuntimeError(
"`pwm_line_start` PWM value must be less than `pwm_line_end` for fan '%s'"
% (section.name,)
)
return cls(
readwrite_fan.fan_speed,
readwrite_fan.pwm_read,
readwrite_fan.pwm_write,
pwm_line_start=pwm_line_start,
pwm_line_end=pwm_line_end,
never_stop=never_stop,
)
def __eq__(self, other): def __eq__(self, other):
if isinstance(other, type(self)): if isinstance(other, type(self)):

View File

@ -1,3 +1,8 @@
from typing import Mapping, NamedTuple, NewType
from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import Programs
from afancontrol.filters import FilterName, NullFilter, TempFilter
from afancontrol.temp.base import Temp, TempCelsius, TempStatus from afancontrol.temp.base import Temp, TempCelsius, TempStatus
from afancontrol.temp.command import CommandTemp from afancontrol.temp.command import CommandTemp
from afancontrol.temp.file import FileTemp from afancontrol.temp.file import FileTemp
@ -11,3 +16,40 @@ __all__ = (
"TempCelsius", "TempCelsius",
"TempStatus", "TempStatus",
) )
TempName = NewType("TempName", str)
class FilteredTemp(NamedTuple):
temp: Temp
filter: TempFilter
@classmethod
def from_configparser(
cls,
section: ConfigParserSection[TempName],
filters: Mapping[FilterName, TempFilter],
programs: Programs,
) -> "FilteredTemp":
type = section["type"]
if type == "file":
temp: Temp = FileTemp.from_configparser(section)
elif type == "hdd":
temp = HDDTemp.from_configparser(section, programs)
elif type == "exec":
temp = CommandTemp.from_configparser(section)
else:
raise RuntimeError(
"Unsupported temp type '%s' for temp '%s'" % (type, section.name)
)
filter_name = section.get("filter", fallback=None)
if filter_name is None:
filter: TempFilter = NullFilter()
else:
filter = filters[FilterName(filter_name.strip())].copy()
return cls(temp=temp, filter=filter)

View File

@ -3,18 +3,15 @@ from typing import NamedTuple, NewType, Optional, Tuple
TempCelsius = NewType("TempCelsius", float) TempCelsius = NewType("TempCelsius", float)
TempStatus = NamedTuple(
"TempStatus", class TempStatus(NamedTuple):
[ temp: TempCelsius
("temp", TempCelsius), min: TempCelsius
("min", TempCelsius), max: TempCelsius
("max", TempCelsius), panic: Optional[TempCelsius]
("panic", Optional[TempCelsius]), threshold: Optional[TempCelsius]
("threshold", Optional[TempCelsius]), is_panic: bool
("is_panic", bool), is_threshold: bool
("is_threshold", bool),
],
)
class Temp(abc.ABC): class Temp(abc.ABC):

View File

@ -1,5 +1,6 @@
from typing import Optional, Tuple from typing import Optional, Tuple
from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import exec_shell_command from afancontrol.exec import exec_shell_command
from afancontrol.temp.base import Temp, TempCelsius from afancontrol.temp.base import Temp, TempCelsius
@ -19,6 +20,16 @@ class CommandTemp(Temp):
self._min = min self._min = min
self._max = max self._max = max
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> Temp:
panic = TempCelsius(section.getfloat("panic", fallback=None))
threshold = TempCelsius(section.getfloat("threshold", fallback=None))
min = TempCelsius(section.getfloat("min", fallback=None))
max = TempCelsius(section.getfloat("max", fallback=None))
return cls(
section["command"], min=min, max=max, panic=panic, threshold=threshold
)
def __eq__(self, other): def __eq__(self, other):
if isinstance(other, type(self)): if isinstance(other, type(self)):
return ( return (

View File

@ -3,6 +3,7 @@ import re
from pathlib import Path from pathlib import Path
from typing import Optional, Tuple from typing import Optional, Tuple
from afancontrol.configparser import ConfigParserSection
from afancontrol.temp.base import Temp, TempCelsius from afancontrol.temp.base import Temp, TempCelsius
@ -41,6 +42,14 @@ class FileTemp(Temp):
self._min = min self._min = min
self._max = max self._max = max
@classmethod
def from_configparser(cls, section: ConfigParserSection) -> Temp:
panic = TempCelsius(section.getfloat("panic", fallback=None))
threshold = TempCelsius(section.getfloat("threshold", fallback=None))
min = TempCelsius(section.getfloat("min", fallback=None))
max = TempCelsius(section.getfloat("max", fallback=None))
return cls(section["path"], min=min, max=max, panic=panic, threshold=threshold)
def __eq__(self, other): def __eq__(self, other):
if isinstance(other, type(self)): if isinstance(other, type(self)):
return ( return (

View File

@ -1,6 +1,7 @@
from typing import Optional, Tuple from typing import Optional, Tuple
from afancontrol.exec import exec_shell_command from afancontrol.configparser import ConfigParserSection
from afancontrol.exec import Programs, exec_shell_command
from afancontrol.temp.base import Temp, TempCelsius from afancontrol.temp.base import Temp, TempCelsius
@ -32,6 +33,23 @@ class HDDTemp(Temp):
self._max = max self._max = max
self._hddtemp_bin = hddtemp_bin self._hddtemp_bin = hddtemp_bin
@classmethod
def from_configparser(
cls, section: ConfigParserSection, programs: Programs
) -> Temp:
panic = TempCelsius(section.getfloat("panic", fallback=None))
threshold = TempCelsius(section.getfloat("threshold", fallback=None))
min = TempCelsius(section.getfloat("min"))
max = TempCelsius(section.getfloat("max"))
return cls(
section["path"],
min=min,
max=max,
panic=panic,
threshold=threshold,
hddtemp_bin=programs.hddtemp,
)
def __eq__(self, other): def __eq__(self, other):
if isinstance(other, type(self)): if isinstance(other, type(self)):
return ( return (

View File

@ -7,10 +7,10 @@ from afancontrol.filters import TempFilter
from afancontrol.logger import logger from afancontrol.logger import logger
from afancontrol.temp import Temp, TempStatus from afancontrol.temp import Temp, TempStatus
ObservedTempStatus = NamedTuple(
"ObservedTempStatus", class ObservedTempStatus(NamedTuple):
[("raw", Optional[TempStatus]), ("filtered", Optional[TempStatus])], raw: Optional[TempStatus]
) filtered: Optional[TempStatus]
def filtered_temps( def filtered_temps(
@ -25,8 +25,8 @@ def filtered_temps(
class Temps: class Temps:
def __init__(self, temps: Mapping[TempName, FilteredTemp]) -> None: def __init__(self, temps: Mapping[TempName, FilteredTemp]) -> None:
self.temps = temps self.temps = temps
self._stack = None # type: Optional[ExitStack] self._stack: Optional[ExitStack] = None
self._executor = None # type: Optional[concurrent.futures.Executor] self._executor: Optional[concurrent.futures.Executor] = None
def __enter__(self): # reusable def __enter__(self): # reusable
self._stack = ExitStack() self._stack = ExitStack()
@ -64,7 +64,7 @@ def _get_temp_status(
name: TempName, temp: Temp, filter: TempFilter name: TempName, temp: Temp, filter: TempFilter
) -> ObservedTempStatus: ) -> ObservedTempStatus:
try: try:
sensor_value = temp.get() # type: Optional[TempStatus] sensor_value: Optional[TempStatus] = temp.get()
except Exception as e: except Exception as e:
sensor_value = None sensor_value = None
logger.warning("Temp sensor [%s] has failed: %s", name, e, exc_info=True) logger.warning("Temp sensor [%s] has failed: %s", name, e, exc_info=True)

View File

@ -20,7 +20,7 @@ class Trigger(abc.ABC):
self.global_commands = global_commands self.global_commands = global_commands
self.temp_commands = temp_commands self.temp_commands = temp_commands
self.report = report self.report = report
self._alerting_temps = set() # type: Set[TempName] self._alerting_temps: Set[TempName] = set()
@property @property
@abc.abstractmethod @abc.abstractmethod
@ -184,7 +184,7 @@ class Triggers:
}, },
report=report, report=report,
) )
self._stack = None # type: Optional[ExitStack] self._stack: Optional[ExitStack] = None
def __enter__(self): # reusable def __enter__(self): # reusable
self._stack = ExitStack() self._stack = ExitStack()

View File

@ -430,3 +430,69 @@ fan_input = /sys/class/hwmon/hwmon0/device/fan1_input
}, },
mappings={}, mappings={},
) )
def test_multiline_mapping():
daemon_cli_config = DaemonCLIConfig(
pidfile=None, logfile=None, exporter_listen_host=None
)
config = """
[daemon]
[actions]
[temp:cpu]
type = file
path = /sys/class/hwmon/hwmon0/device/temp1_input
[temp:mobo]
type = file
path = /sys/class/hwmon/hwmon0/device/temp2_input
[fan: case]
pwm = /sys/class/hwmon/hwmon0/device/pwm2
fan_input = /sys/class/hwmon/hwmon0/device/fan2_input
[fan: hdd]
pwm = /sys/class/hwmon/hwmon0/device/pwm2
fan_input = /sys/class/hwmon/hwmon0/device/fan2_input
[mapping:1]
fans =
case*0.6,
hdd,
temps =
mobo,
cpu
"""
parsed = parse_config(path_from_str(config), daemon_cli_config)
assert parsed.mappings == {
MappingName("1"): FansTempsRelation(
temps=[TempName("mobo"), TempName("cpu")],
fans=[
FanSpeedModifier(fan=FanName("case"), modifier=0.6),
FanSpeedModifier(fan=FanName("hdd"), modifier=1.0),
],
)
}
def test_extraneous_keys_raises():
daemon_cli_config = DaemonCLIConfig(
pidfile=None, logfile=None, exporter_listen_host=None
)
config = """
[daemon]
[actions]
[temp: mobo]
type = file
path = /sys/class/hwmon/hwmon0/device/temp1_input
aa = 55
"""
with pytest.raises(RuntimeError) as cm:
parse_config(path_from_str(config), daemon_cli_config)
assert str(cm.value) == "Unknown options in the [temp: mobo] section: {'aa'}"

View File

@ -8,7 +8,6 @@ from click.testing import CliRunner
from afancontrol import fantest from afancontrol import fantest
from afancontrol.fantest import ( from afancontrol.fantest import (
CSVMeasurementsOutput, CSVMeasurementsOutput,
Fan,
HumanMeasurementsOutput, HumanMeasurementsOutput,
MeasurementsOutput, MeasurementsOutput,
fantest as main, fantest as main,
@ -24,6 +23,7 @@ from afancontrol.pwmfan import (
LinuxFanSpeed, LinuxFanSpeed,
PWMDevice, PWMDevice,
PWMValue, PWMValue,
ReadWriteFan,
) )
@ -65,7 +65,7 @@ def test_main_smoke(temp_path):
args, kwargs = mocked_fantest.call_args args, kwargs = mocked_fantest.call_args
assert not args assert not args
assert kwargs.keys() == {"fan", "pwm_step_size", "output"} assert kwargs.keys() == {"fan", "pwm_step_size", "output"}
assert kwargs["fan"] == Fan( assert kwargs["fan"] == ReadWriteFan(
fan_speed=LinuxFanSpeed(FanInputDevice(str(fan_input_path))), fan_speed=LinuxFanSpeed(FanInputDevice(str(fan_input_path))),
pwm_read=LinuxFanPWMRead(PWMDevice(str(pwm_path))), pwm_read=LinuxFanPWMRead(PWMDevice(str(pwm_path))),
pwm_write=LinuxFanPWMWrite(PWMDevice(str(pwm_path))), pwm_write=LinuxFanPWMWrite(PWMDevice(str(pwm_path))),
@ -77,11 +77,11 @@ def test_main_smoke(temp_path):
@pytest.mark.parametrize("pwm_step_size", [5, -5]) @pytest.mark.parametrize("pwm_step_size", [5, -5])
@pytest.mark.parametrize("output_cls", [HumanMeasurementsOutput, CSVMeasurementsOutput]) @pytest.mark.parametrize("output_cls", [HumanMeasurementsOutput, CSVMeasurementsOutput])
def test_fantest(output_cls: Type[MeasurementsOutput], pwm_step_size: PWMValue): def test_fantest(output_cls: Type[MeasurementsOutput], pwm_step_size: PWMValue):
fan = Fan( fan: Any = ReadWriteFan(
fan_speed=MagicMock(spec=BaseFanSpeed), fan_speed=MagicMock(spec=BaseFanSpeed),
pwm_read=MagicMock(spec=BaseFanPWMRead), pwm_read=MagicMock(spec=BaseFanPWMRead),
pwm_write=MagicMock(spec=BaseFanPWMWrite), pwm_write=MagicMock(spec=BaseFanPWMWrite),
) # type: Any )
fan.pwm_read.min_pwm = 0 fan.pwm_read.min_pwm = 0
fan.pwm_read.max_pwm = 255 fan.pwm_read.max_pwm = 255
output = output_cls() output = output_cls()

View File

@ -1,4 +1,5 @@
from contextlib import ExitStack from contextlib import ExitStack
from typing import cast
from unittest.mock import MagicMock, patch, sentinel from unittest.mock import MagicMock, patch, sentinel
import pytest import pytest
@ -68,7 +69,7 @@ def test_manager(report):
manager.tick() manager.tick()
mocked_triggers = manager.triggers # type: MagicMock mocked_triggers = cast(MagicMock, manager.triggers)
assert mocked_triggers.check.call_count == 1 assert mocked_triggers.check.call_count == 1
assert mocked_case_fan.__enter__.call_count == 1 assert mocked_case_fan.__enter__.call_count == 1
assert mocked_metrics.__enter__.call_count == 1 assert mocked_metrics.__enter__.call_count == 1

View File

@ -1,11 +1,7 @@
[tox] [tox]
envlist=py{35,36,37,38,39}{,-arduino,-metrics},lint,check-docs envlist=py{36,37,38,39,310}{,-arduino,-metrics},lint,check-docs
[testenv] [testenv]
deps =
; Python 3.5 still ships an old setuptools version which doesn't support
; declarative setup.cfg format.
setuptools>=41.4.0
extras = extras =
arduino: arduino arduino: arduino
dev dev