Imported Upstream version 3.0.0

This commit is contained in:
Mario Fetka
2021-10-26 12:58:36 +02:00
commit 27b4629279
87 changed files with 7722 additions and 0 deletions

0
tests/pwmfan/__init__.py Normal file
View File

View File

@@ -0,0 +1,177 @@
import json
import socket
import threading
import traceback
from contextlib import ExitStack
from time import sleep
from typing import Dict
import pytest
from afancontrol.arduino import (
ArduinoConnection,
ArduinoName,
ArduinoPin,
SetPWMCommand,
pyserial_available,
)
from afancontrol.pwmfan import (
ArduinoFanPWMRead,
ArduinoFanPWMWrite,
ArduinoFanSpeed,
PWMValue,
)
pytestmark = pytest.mark.skipif(
not pyserial_available, reason="pyserial is not installed"
)
class DummyArduino:
"""Emulate an Arduino board, i.e. the other side of the pyserial connection.
Slightly mimics the Arduino program `micro.ino`.
"""
def __init__(self) -> None:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("127.0.0.1", 0))
s.listen(1)
listening_port = s.getsockname()[1]
self.sock = s
self.pyserial_url = "socket://127.0.0.1:%s" % listening_port
self._lock = threading.Lock()
self._loop_iteration_complete = threading.Event()
self._first_loop_iteration_complete = threading.Event()
self._disconnected = threading.Event()
self._thread_error = threading.Event()
self._is_connected = False
self._inner_state_pwms = {"5": 255, "9": 255, "10": 255, "11": 255}
self._inner_state_speeds = {"0": 0, "1": 0, "2": 0, "3": 0, "7": 0}
def set_inner_state_pwms(self, pwms: Dict[str, int]) -> None:
with self._lock:
self._inner_state_pwms.update(pwms)
if self.is_connected:
self._loop_iteration_complete.clear()
assert self._loop_iteration_complete.wait(5) is True
def set_speeds(self, speeds: Dict[str, int]) -> None:
with self._lock:
self._inner_state_speeds.update(speeds)
if self.is_connected:
self._loop_iteration_complete.clear()
assert self._loop_iteration_complete.wait(5) is True
@property
def inner_state_pwms(self):
with self._lock:
copy = self._inner_state_pwms.copy()
return copy
@property
def is_connected(self):
with self._lock:
if not self._is_connected:
return False
assert self._first_loop_iteration_complete.wait(5) is True
return True
def wait_for_disconnected(self):
assert self._disconnected.wait(5) is True
def accept(self):
client, _ = self.sock.accept()
self.sock.close() # Don't accept any more connections
with self._lock:
self._is_connected = True
threading.Thread(target=self._thread_run, args=(client,), daemon=True).start()
def _thread_run(self, sock):
sock.settimeout(0.001)
command_buffer = bytearray()
try:
while True:
# The code in this loop mimics the `loop` function
# in the `micro.ino` program.
try:
command_buffer.extend(sock.recv(1024))
except socket.timeout:
pass
while len(command_buffer) >= 3:
command_raw = command_buffer[:3]
del command_buffer[:3]
command = SetPWMCommand.parse(command_raw)
with self._lock:
self._inner_state_pwms[str(command.pwm_pin)] = command.pwm
sock.sendall(self._make_status())
self._loop_iteration_complete.set()
self._first_loop_iteration_complete.set()
sleep(0.050)
except (ConnectionResetError, BrokenPipeError):
pass
except Exception:
traceback.print_exc()
self._thread_error.set()
finally:
with self._lock:
self._is_connected = False
sock.close()
self._disconnected.set()
def _make_status(self):
with self._lock:
status = {
"fan_inputs": self._inner_state_speeds,
"fan_pwm": self._inner_state_pwms,
}
return (json.dumps(status) + "\n").encode("ascii")
def ensure_no_errors_in_thread(self):
assert self._thread_error.is_set() is not True
@pytest.fixture
def dummy_arduino():
return DummyArduino()
def test_smoke(dummy_arduino):
conn = ArduinoConnection(ArduinoName("test"), dummy_arduino.pyserial_url)
fan_speed = ArduinoFanSpeed(conn, tacho_pin=ArduinoPin(3))
pwm_read = ArduinoFanPWMRead(conn, pwm_pin=ArduinoPin(9))
pwm_write = ArduinoFanPWMWrite(conn, pwm_pin=ArduinoPin(9))
dummy_arduino.set_inner_state_pwms({"9": 42})
with ExitStack() as stack:
assert not dummy_arduino.is_connected
stack.enter_context(fan_speed)
stack.enter_context(pwm_read)
stack.enter_context(pwm_write)
dummy_arduino.accept()
assert dummy_arduino.is_connected
dummy_arduino.set_speeds({"3": 1200})
conn.wait_for_status() # required only for synchronization in the tests
assert fan_speed.get_speed() == 1200
assert pwm_read.get() == 255
assert dummy_arduino.inner_state_pwms["9"] == 255
pwm_write.set(PWMValue(192))
dummy_arduino.set_speeds({"3": 998})
conn.wait_for_status() # required only for synchronization in the tests
assert fan_speed.get_speed() == 998
assert pwm_read.get() == 192
assert dummy_arduino.inner_state_pwms["9"] == 192
dummy_arduino.wait_for_disconnected()
assert dummy_arduino.inner_state_pwms["9"] == 255
assert not dummy_arduino.is_connected
dummy_arduino.ensure_no_errors_in_thread()

41
tests/pwmfan/test_ipmi.py Normal file
View File

@@ -0,0 +1,41 @@
from unittest.mock import patch
import pytest
from afancontrol.pwmfan import FanValue, FreeIPMIFanSpeed
@pytest.fixture
def ipmi_sensors_output():
return """
ID,Name,Type,Reading,Units,Event
17,FAN1,Fan,1400.00,RPM,'OK'
18,FAN2,Fan,1800.00,RPM,'OK'
19,FAN3,Fan,N/A,RPM,N/A
20,FAN4,Fan,N/A,RPM,N/A
21,FAN5,Fan,N/A,RPM,N/A
22,FAN6,Fan,N/A,RPM,N/A
""".lstrip()
def test_fan_speed(ipmi_sensors_output):
fan_speed = FreeIPMIFanSpeed("FAN2")
with patch.object(FreeIPMIFanSpeed, "_call_ipmi_sensors") as mock_call:
mock_call.return_value = ipmi_sensors_output
assert fan_speed.get_speed() == FanValue(1800)
def test_fan_speed_na(ipmi_sensors_output):
fan_speed = FreeIPMIFanSpeed("FAN3")
with patch.object(FreeIPMIFanSpeed, "_call_ipmi_sensors") as mock_call:
mock_call.return_value = ipmi_sensors_output
with pytest.raises(ValueError):
fan_speed.get_speed()
def test_fan_speed_unknown(ipmi_sensors_output):
fan_speed = FreeIPMIFanSpeed("FAN30")
with patch.object(FreeIPMIFanSpeed, "_call_ipmi_sensors") as mock_call:
mock_call.return_value = ipmi_sensors_output
with pytest.raises(RuntimeError):
fan_speed.get_speed()

153
tests/pwmfan/test_linux.py Normal file
View File

@@ -0,0 +1,153 @@
from contextlib import ExitStack
from unittest.mock import MagicMock
import pytest
from afancontrol.pwmfan import (
FanInputDevice,
LinuxFanPWMRead,
LinuxFanPWMWrite,
LinuxFanSpeed,
PWMDevice,
PWMValue,
)
from afancontrol.pwmfannorm import PWMFanNorm
@pytest.fixture
def pwm_path(temp_path):
# pwm = /sys/class/hwmon/hwmon0/pwm2
pwm_path = temp_path / "pwm2"
pwm_path.write_text("0\n")
return pwm_path
@pytest.fixture
def pwm_enable_path(temp_path):
pwm_enable_path = temp_path / "pwm2_enable"
pwm_enable_path.write_text("0\n")
return pwm_enable_path
@pytest.fixture
def fan_input_path(temp_path):
# fan_input = /sys/class/hwmon/hwmon0/fan2_input
fan_input_path = temp_path / "fan2_input"
fan_input_path.write_text("1300\n")
return fan_input_path
@pytest.fixture
def fan_speed(fan_input_path):
return LinuxFanSpeed(fan_input=FanInputDevice(str(fan_input_path)))
@pytest.fixture
def pwm_read(pwm_path):
return LinuxFanPWMRead(pwm=PWMDevice(str(pwm_path)))
@pytest.fixture
def pwm_write(pwm_path):
pwm_write = LinuxFanPWMWrite(pwm=PWMDevice(str(pwm_path)))
# We write to the pwm_enable file values without newlines,
# but when they're read back, they might contain newlines.
# This hack below is to simulate just that: the written values should
# contain newlines.
original_pwm_enable = pwm_write._pwm_enable
pwm_enable = MagicMock(wraps=original_pwm_enable)
pwm_enable.write_text = lambda text: original_pwm_enable.write_text(text + "\n")
pwm_write._pwm_enable = pwm_enable
return pwm_write
@pytest.fixture
def pwmfan_norm(fan_speed, pwm_read, pwm_write):
return PWMFanNorm(
fan_speed,
pwm_read,
pwm_write,
pwm_line_start=PWMValue(100),
pwm_line_end=PWMValue(240),
never_stop=False,
)
@pytest.mark.parametrize("pwmfan_fixture", ["fan_speed", "pwmfan_norm"])
def test_get_speed(pwmfan_fixture, fan_speed, pwmfan_norm, fan_input_path):
fan = locals()[pwmfan_fixture]
fan_input_path.write_text("721\n")
assert 721 == fan.get_speed()
@pytest.mark.parametrize("pwmfan_fixture", ["pwm_write", "pwmfan_norm"])
@pytest.mark.parametrize("raises", [True, False])
def test_enter_exit(
raises, pwmfan_fixture, pwm_write, pwmfan_norm, pwm_enable_path, pwm_path
):
fan = locals()[pwmfan_fixture]
class Exc(Exception):
pass
with ExitStack() as stack:
if raises:
stack.enter_context(pytest.raises(Exc))
stack.enter_context(fan)
assert "1" == pwm_enable_path.read_text().strip()
assert "255" == pwm_path.read_text()
value = dict(pwm_write=100, pwmfan_norm=0.39)[pwmfan_fixture] # 100/255 ~= 0.39
fan.set(value)
assert "1" == pwm_enable_path.read_text().strip()
assert "100" == pwm_path.read_text()
if raises:
raise Exc()
assert "0" == pwm_enable_path.read_text().strip()
assert "100" == pwm_path.read_text() # `fancontrol` doesn't reset speed
def test_get_set_pwmfan(pwm_read, pwm_write, pwm_path):
pwm_write.set(142)
assert "142" == pwm_path.read_text()
pwm_path.write_text("132\n")
assert 132 == pwm_read.get()
pwm_write.set_full_speed()
assert "255" == pwm_path.read_text()
with pytest.raises(ValueError):
pwm_write.set(256)
with pytest.raises(ValueError):
pwm_write.set(-1)
def test_get_set_pwmfan_norm(pwmfan_norm, pwm_path):
pwmfan_norm.set(0.42)
assert "101" == pwm_path.read_text()
pwm_path.write_text("132\n")
assert pytest.approx(0.517, 0.01) == pwmfan_norm.get()
pwmfan_norm.set_full_speed()
assert "255" == pwm_path.read_text()
assert 238 == pwmfan_norm.set(0.99)
assert "238" == pwm_path.read_text()
assert 255 == pwmfan_norm.set(1.0)
assert "255" == pwm_path.read_text()
assert 255 == pwmfan_norm.set(1.1)
assert "255" == pwm_path.read_text()
assert 0 == pwmfan_norm.set(-0.1)
assert "0" == pwm_path.read_text()