Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion feeph/emc2101/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

# the following imports are provided for user convenience
# flake8: noqa: F401
from feeph.emc2101.calibration import calibrate_pwm_fan
from feeph.emc2101.core import CONVERSIONS_PER_SECOND, DEFAULTS, ExternalSensorStatus, SpinUpDuration, SpinUpStrength
from feeph.emc2101.fan_configs import FanConfig, RpmControlMode, Steps, export_fan_config, generic_pwm_fan
from feeph.emc2101.pwm import DeviceConfig, Emc2101_PWM, ExternalTemperatureSensorConfig, FanSpeedUnit, PinSixMode, TemperatureLimitType, calibrate_pwm_fan, emc2101_default_config, ets_2n3904, ets_2n3906
from feeph.emc2101.pwm import DeviceConfig, Emc2101_PWM, ExternalTemperatureSensorConfig, FanSpeedUnit, PinSixMode, TemperatureLimitType, emc2101_default_config, ets_2n3904, ets_2n3906
133 changes: 133 additions & 0 deletions feeph/emc2101/calibration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#!/usr/bin/env python3

import logging
import time

# module busio provides no type hints
import busio # type: ignore

import feeph.emc2101.core
import feeph.emc2101.utilities
from feeph.emc2101.fan_configs import FanConfig, RpmControlMode, Steps

LH = logging.getLogger('feeph.emc2101')


# This function has limited code coverage since it depends on active
# feedback from the underlying device. We will need a mock to be able
# to test the missing lines of code.
def calibrate_pwm_fan(i2c_bus: busio.I2C, model: str, pwm_frequency: int = 22500) -> FanConfig | None:
"""
walk through various settings and determine the fan's configuration
parameters
"""
LH.info("Calibrating fan parameters.")
pwm_d, pwm_f = feeph.emc2101.utilities.calculate_pwm_factors(pwm_frequency=pwm_frequency)
steps_list = list(range(pwm_f * 2))
emc2101 = feeph.emc2101.core.Emc2101_core(i2c_bus=i2c_bus)
emc2101.configure_pin_six_as_tacho()
emc2101.configure_pwm_control(pwm_d=pwm_d, pwm_f=pwm_f, step_max=max(steps_list))
# -----------------------------------------------------------------
LH.debug("Disabling gradual speed rampup.")
# TODO disable gradual rampup
# TODO set initial driver strength to 100%
# -----------------------------------------------------------------
LH.info("Testing if fan responds to PWM signal:")
LH.debug("speed control steps: %s", steps_list)
if len(steps_list) <= 2:
LH.warning("Fan does not have enough steps to calibrate!")
return None
step1 = steps_list[int(len(steps_list) / 2)] # pick something in the middle
step2 = steps_list[-2] # pick the second highest possible setting
emc2101.set_driver_strength(step1)
time.sleep(5)
dutycycle1 = int(step1 * 100 / len(steps_list))
rpm1 = emc2101.get_rpm()
LH.debug("dutycycle: %i%% -> RPM: %i", dutycycle1, rpm1)
emc2101.set_driver_strength(step2)
time.sleep(5)
dutycycle2 = int(step2 * 100 / len(steps_list))
rpm2 = emc2101.get_rpm()
LH.debug("dutycycle: %i%% -> RPM: %i", dutycycle2, rpm2)
if rpm1 is None or rpm2 is None:
LH.error("Unable to get a reliable RPM reading. Aborting.")
return None
if rpm1 * 100 / rpm2 < 96:
LH.info("Yes, it does. Observed an RPM change in response to PWM signal. (%i%%: %i -> %i%%: %i RPM)", dutycycle1, rpm1, dutycycle2, rpm2)
else:
LH.warning("Failed to observe a significant speed change in response to PWM signal! Aborting.")
LH.warning("Please verify wiring and configuration.")
return None
# -----------------------------------------------------------------
LH.info("Mapping PWM dutycycle to RPM. Please wait.")
mappings = list()
for step in steps_list:
dutycycle = int(step * 100 / len(steps_list))
# set fan speed and wait for the speed to settle
emc2101.set_driver_strength(step)
time.sleep(1)
readings = [99999, 99999, 99999]
for i in range(24):
cursor = i % len(readings)
rpm_cur = emc2101.get_rpm()
if rpm_cur is not None:
# order is important! (update readings before calculating the average)
readings[cursor] = rpm_cur
rpm_avg = sum(readings) / len(readings)
# calculate deviation from average
deviation = rpm_cur / rpm_avg
LH.debug("step: %2i i: %2i -> rpm: %4i deviation: %3.2f", step, cursor, rpm_cur, deviation)
if 0.99 <= deviation <= 1.01:
# RPM will never be exact and fluctuates slightly
# -> round to nearest factor of 5
rpm = round(rpm_avg / 5) * 5
LH.debug("Fan has settled: (step: %i -> dutycycle: %3i%%, rpm: %i)", step, dutycycle, rpm)
mappings.append((step, dutycycle, rpm))
break
else:
time.sleep(0.5)
else:
LH.error("Unable to get a reliable RPM reading. Aborting.")
return None
else:
LH.warning("Fan never settled! (step: %i -> dutycycle: %3i%%, rpm: <n/a>)", step, dutycycle)
mappings.append((step, dutycycle, rpm))

# determine maximum RPM
rpm_max = max([rpm for (_, _, rpm) in mappings])
LH.info("Maximum RPM: %i", rpm_max)

# prune steps
# - multiple steps may result in the same RPM (e.g. minimum RPM)
# - ensure each step is significantly different from the previous
# - ensure each step increases RPM
prune = list()
rpm_delta_min = rpm_max * 0.011
for i in range(len(mappings) - 1):
step, _, rpm_this = mappings[i]
_, _, rpm_next = mappings[i + 1]
if rpm_this + rpm_delta_min <= rpm_next:
# significantly different from next element -> keep it
pass
else:
# within range of next element -> prune it
prune.append(step)

steps: Steps = dict()
for step, dutycycle, rpm in mappings:
rpm_percent = rpm * 100 / rpm_max
LH.info("step: %2i dutycycle: %3i%% -> RPM: %5i (%3.0f%%)", step, dutycycle, rpm, rpm_percent)
if step not in prune:
steps[step] = (dutycycle, rpm)

fan_profile = FanConfig(
model=model,
rpm_control_mode=RpmControlMode.PWM,
pwm_frequency=pwm_frequency,
minimum_duty_cycle=min([dutycycle for (_, (dutycycle, _)) in steps.items()]), # e.g. 20%
maximum_duty_cycle=max([dutycycle for (_, (dutycycle, _)) in steps.items()]), # typically 100%
minimum_rpm=min([rpm for (_, (_, rpm)) in steps.items() if rpm is not None]),
maximum_rpm=max([rpm for (_, (_, rpm)) in steps.items() if rpm is not None]),
steps=steps,
)
return fan_profile
52 changes: 15 additions & 37 deletions feeph/emc2101/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,29 +169,29 @@ def describe_device(self):
# ---------------------------------------------------------------------

def configure_pin_six_as_alert(self) -> bool:
# set 0x03.2 to 0
with BurstHandler(i2c_bus=self._i2c_bus, i2c_adr=self._i2c_adr) as bh:
cfg_register_value = bh.read_register(0x03)
if cfg_register_value is not None:
try:
# set 0x03.2 to 0
with BurstHandler(i2c_bus=self._i2c_bus, i2c_adr=self._i2c_adr) as bh:
cfg_register_value = bh.read_register(0x03)
bh.write_register(0x03, cfg_register_value & 0b1111_1011)
# clear spin up behavior settings
# (spin up is unavailable when pin 6 is in alert mode),
bh.write_register(0x4B, 0b0000_0000)
return True
else:
LH.error("Unable to read config register!")
return False
except RuntimeError:
LH.error("Unable to read config register!")
return False

def configure_pin_six_as_tacho(self) -> bool:
# set 0x03.2 to 1
with BurstHandler(i2c_bus=self._i2c_bus, i2c_adr=self._i2c_adr) as bh:
cfg_register_value = bh.read_register(0x03)
if cfg_register_value is not None:
try:
# set 0x03.2 to 1
with BurstHandler(i2c_bus=self._i2c_bus, i2c_adr=self._i2c_adr) as bh:
cfg_register_value = bh.read_register(0x03)
bh.write_register(0x03, cfg_register_value | 0b0000_0100)
return True
else:
LH.error("Unable to read config register!")
return False
except RuntimeError:
LH.error("Unable to read config register!")
return False

def configure_dac_control(self, step_max: int):
# enable DAC control (set 0x03.4 to 1)
Expand Down Expand Up @@ -577,41 +577,19 @@ def configure_external_temperature_sensor(self, dif: int, bcf: int) -> bool:
dev_status = bh.read_register(0x02)
if not dev_status & 0b0000_0100:
LH.debug("The diode fault bit is clear.")
bh.write_register(0x12, dif)
bh.write_register(0x17, dif)
bh.write_register(0x18, bcf)
return True
else:
LH.error("The diode fault bit is set: Sensor is faulty or missing.")
return False

def _uses_alert_mode(self) -> bool:
return not self._uses_tacho_mode()

def _uses_tacho_mode(self) -> bool:
with BurstHandler(i2c_bus=self._i2c_bus, i2c_adr=self._i2c_adr) as bh:
status_register = bh.read_register(0x03)
return bool(status_register & 0b0000_0100)


# def parse_fanconfig_register(value: int) -> dict[str, Any]:
# # 0b00000000
# # ^^-- tachometer input mode
# # ^---- clock frequency override
# # ^----- clock select
# # ^------ polarity (0 = 100->0, 1 = 0->100)
# # ^------- configure lookup table (0 = on, 1 = off)
# config = {
# "tachometer input mode": value & 0b0000_0011,
# "clock frequency override": 'use frequency divider' if value & 0b0000_0100 else 'use clock select',
# "clock select base frequency": '1.4kHz' if value & 0b0000_1000 else '360kHz',
# "polarity": '0x00 = 100%, 0xFF = 0%' if value & 0b0001_0000 else '0x00 = 0%, 0xFF = 100%',
# "configure lookup table": 'allow dutycycle update' if value & 0b0010_0000 else 'disable dutycycle update',
# "external temperature setting": 'override external temperature' if value & 0b0100_0000 else 'measure external temperature',
# # the highest bit is unused
# }
# return config


def _convert_rpm2tach(rpm: int) -> tuple[int, int]:
# due to the way the conversion works the RPM can never
# be less than 82
Expand Down
109 changes: 58 additions & 51 deletions feeph/emc2101/fan_configs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
#!/usr/bin/env python3
"""
"""

from enum import Enum
from typing import TypedDict
Expand All @@ -21,17 +19,21 @@ class FanConfig:
def __init__(self, model: str, rpm_control_mode: RpmControlMode, minimum_duty_cycle: int | None, maximum_duty_cycle: int | None, minimum_rpm: int, maximum_rpm: int, pwm_frequency: int | None = None, steps: Steps | None = None):
self.model = model
self.rpm_control_mode = rpm_control_mode
self.pwm_frequency = 0
if rpm_control_mode == RpmControlMode.PWM:
if pwm_frequency is not None:
self.pwm_frequency = pwm_frequency
else:
raise ValueError("must provide a PWM frequency for PWM fans")
self.steps = steps
raise ValueError('must provide a PWM frequency for PWM fans')
if steps is None:
self.steps = dict()
else:
self.steps = steps
self.minimum_duty_cycle: int | None = None
self.maximum_duty_cycle: int | None = None
if minimum_duty_cycle is not None and maximum_duty_cycle is not None:
if minimum_duty_cycle > maximum_duty_cycle:
raise ValueError("minimum duty cycle must be smaller than maximum duty cycle")
raise ValueError('minimum duty cycle must be smaller than maximum duty cycle')
if minimum_duty_cycle >= 0:
self.minimum_duty_cycle = minimum_duty_cycle
else:
Expand All @@ -52,74 +54,79 @@ class FanConfigArgs(TypedDict):
minimum_rpm: int
maximum_rpm: int
pwm_frequency: int | None
steps: Steps | None
steps: Steps


def export_fan_config(fan_config: FanConfig) -> dict[str, str | int | Steps | None]:
def export_fan_config(fan_config: FanConfig) -> dict[str, str | int | dict[int, dict[str, int | None]] | None]:
steps: dict[int, dict[str, int | None]] = dict()
for step, (dutycycle, rpm) in fan_config.steps.items():
steps[step] = {
'dutycycle': dutycycle,
'rpm': rpm,
}
if fan_config.rpm_control_mode == RpmControlMode.VOLTAGE:
return {
"model": fan_config.model,
"control_mode": "VOLTAGE",
"minimum_rpm": fan_config.minimum_rpm,
"maximum_rpm": fan_config.maximum_rpm,
"steps": fan_config.steps,
'model': fan_config.model,
'control_mode': 'VOLTAGE',
'minimum_rpm': fan_config.minimum_rpm,
'maximum_rpm': fan_config.maximum_rpm,
'steps': steps,
}
elif fan_config.rpm_control_mode == RpmControlMode.PWM:
steps: dict[int, dict[str, int | None]] | None = None
if fan_config.steps is not None:
steps = {}
for step, (dutycycle, rpm) in fan_config.steps.items():
steps[step] = {
"dutycycle": dutycycle,
"rpm": rpm,
}
return {
"model": fan_config.model,
"control_mode": "PWM",
"pwm_frequency": fan_config.pwm_frequency,
"minimum_duty_cycle": fan_config.minimum_duty_cycle,
"maximum_duty_cycle": fan_config.maximum_duty_cycle,
"minimum_rpm": fan_config.minimum_rpm,
"maximum_rpm": fan_config.maximum_rpm,
"steps": steps, # type: ignore
'model': fan_config.model,
'control_mode': 'PWM',
'pwm_frequency': fan_config.pwm_frequency,
'minimum_duty_cycle': fan_config.minimum_duty_cycle,
'maximum_duty_cycle': fan_config.maximum_duty_cycle,
'minimum_rpm': fan_config.minimum_rpm,
'maximum_rpm': fan_config.maximum_rpm,
'steps': steps,
}
else:
raise ValueError("unknown control type")
raise ValueError('unknown control type')


def import_fan_config(fan_config: dict[str, str | int | Steps]) -> FanConfig:
if fan_config["control_mode"] == "VOLTAGE":
def import_fan_config(fan_config: dict[str, str | int | dict[int, dict[str, int | None]]]) -> FanConfig:
steps: Steps = dict()
if 'steps' in fan_config and isinstance(fan_config['steps'], dict):
for step, step_record in fan_config['steps'].items():
if step_record['dutycycle'] is not None:
steps[step] = (step_record['dutycycle'], step_record['rpm'])
else:
raise ValueError("dutycycle can't be empty")
if fan_config['control_mode'] == 'VOLTAGE':
params_dac: FanConfigArgs = {
"model": str(fan_config["model"]),
"rpm_control_mode": RpmControlMode.VOLTAGE,
"pwm_frequency": None,
"minimum_duty_cycle": None,
"maximum_duty_cycle": None,
"minimum_rpm": fan_config["minimum_rpm"], # type: ignore
"maximum_rpm": fan_config["maximum_rpm"], # type: ignore
"steps": fan_config["steps"], # type: ignore
'model': str(fan_config['model']),
'rpm_control_mode': RpmControlMode.VOLTAGE,
'pwm_frequency': None,
'minimum_duty_cycle': None,
'maximum_duty_cycle': None,
'minimum_rpm': fan_config['minimum_rpm'], # type: ignore
'maximum_rpm': fan_config['maximum_rpm'], # type: ignore
'steps': steps,
}
return FanConfig(**params_dac)
elif fan_config["control_mode"] == "PWM":
elif fan_config['control_mode'] == 'PWM':
params_pwm: FanConfigArgs = {
"model": str(fan_config["model"]),
"rpm_control_mode": RpmControlMode.PWM,
"pwm_frequency": fan_config["pwm_frequency"], # type: ignore
"minimum_duty_cycle": fan_config["minimum_duty_cycle"], # type: ignore
"maximum_duty_cycle": fan_config["maximum_duty_cycle"], # type: ignore
"minimum_rpm": fan_config["minimum_rpm"], # type: ignore
"maximum_rpm": fan_config["maximum_rpm"], # type: ignore
"steps": fan_config["steps"], # type: ignore
'model': str(fan_config['model']),
'rpm_control_mode': RpmControlMode.PWM,
'pwm_frequency': fan_config['pwm_frequency'], # type: ignore
'minimum_duty_cycle': fan_config['minimum_duty_cycle'], # type: ignore
'maximum_duty_cycle': fan_config['maximum_duty_cycle'], # type: ignore
'minimum_rpm': fan_config['minimum_rpm'], # type: ignore
'maximum_rpm': fan_config['maximum_rpm'], # type: ignore
'steps': steps,
}
return FanConfig(**params_pwm)
else:
raise ValueError("unknown control type")
raise ValueError('unknown control type')


# provide reasonable default configurations for DC and PWM fans

# probably a bad idea to provide less than 50% supply voltage (fan might fail to start properly)
generic_dc_fan = FanConfig(model="generic DC fan", rpm_control_mode=RpmControlMode.VOLTAGE, minimum_duty_cycle=50, maximum_duty_cycle=100, minimum_rpm=100, maximum_rpm=2000)
generic_dc_fan = FanConfig(model='generic DC fan', rpm_control_mode=RpmControlMode.VOLTAGE, minimum_duty_cycle=50, maximum_duty_cycle=100, minimum_rpm=100, maximum_rpm=2000)

# some fans treat a duty cycle of less than 20% as 'no signal' and go full speed instead
generic_pwm_fan = FanConfig(model="generic PWM fan", rpm_control_mode=RpmControlMode.PWM, minimum_duty_cycle=20, maximum_duty_cycle=100, minimum_rpm=100, maximum_rpm=2000, pwm_frequency=22500)
generic_pwm_fan = FanConfig(model='generic PWM fan', rpm_control_mode=RpmControlMode.PWM, minimum_duty_cycle=20, maximum_duty_cycle=100, minimum_rpm=100, maximum_rpm=2000, pwm_frequency=22500)
Loading