Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions nodescraper/plugins/inband/storage/analyzer_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@
# SOFTWARE.
#
###############################################################################
from pydantic import BaseModel
from typing import Optional

from pydantic import BaseModel, Field


class StorageAnalyzerArgs(BaseModel):
min_required_free_space: str = "50G"
min_required_free_space_abs: Optional[str] = None
min_required_free_space_prct: Optional[int] = None
ignore_devices: Optional[list[str]] = Field(default_factory=list)
check_devices: Optional[list[str]] = Field(default_factory=list)
regex_match: bool = False
118 changes: 88 additions & 30 deletions nodescraper/plugins/inband/storage/storage_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
# SOFTWARE.
#
###############################################################################
import re
from typing import Optional

from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus
Expand All @@ -39,6 +40,36 @@ class StorageAnalyzer(DataAnalyzer[StorageDataModel, StorageAnalyzerArgs]):

DATA_MODEL = StorageDataModel

def _matches_device_filter(
self, device_name: str, exp_devices: list[str], regex_match: bool
) -> bool:
"""Check if the device name matches any of the expected devices""

Args:
device_name (str): device name to check
exp_devices (list[str]): list of expected devices to match against
regex_match (bool): if True, use regex matching; otherwise, use exact match

Returns:
bool: True if the device name matches any of the expected devices, False otherwise
"""
for exp_device in exp_devices:
if regex_match:
try:
device_regex = re.compile(exp_device)
except re.error:
self._log_event(
category=EventCategory.STORAGE,
description=f"Invalid regex pattern: {exp_device}",
priority=EventPriority.ERROR,
)
continue
if device_regex.match(device_name):
return True
elif device_name == exp_device:
return True
return False

def analyze_data(
self, data: StorageDataModel, args: Optional[StorageAnalyzerArgs] = None
) -> TaskResult:
Expand All @@ -52,43 +83,70 @@ def analyze_data(
TaskResult: Result of the storage analysis containing the status and message.
"""
if args is None:
args = StorageAnalyzerArgs()
args = StorageAnalyzerArgs(min_required_free_space_prct=10)
elif args.min_required_free_space_abs is None and args.min_required_free_space_prct is None:
args.min_required_free_space_prct = 10
self.logger.warning(
"No thresholds provided for storage analyzer arguments; defaulting to 10% free"
)

min_free = convert_to_bytes(args.min_required_free_space)
if not data.storage_data:
self.result.message = "No storage data available"
self.result.status = ExecutionStatus.NOT_RAN
return self.result

self.result.status = ExecutionStatus.OK
passing_devices = []
failing_devices = []
for device_name, device_data in data.storage_data.items():
free = convert_to_bytes(str(device_data.free))
if free > min_free:
self.result.message = f"'{device_name}' has {bytes_to_human_readable(device_data.free)} available, {device_data.percent}% used"
self.result.status = ExecutionStatus.OK
break
else:
self.result.message = "Not enough disk storage!"
if args.check_devices:
if not self._matches_device_filter(
Comment thread
landrews-amd marked this conversation as resolved.
device_name, args.check_devices, args.regex_match
):
continue
elif args.ignore_devices:
if self._matches_device_filter(device_name, args.ignore_devices, args.regex_match):
continue

condition = False
if args.min_required_free_space_abs:
min_free_abs = convert_to_bytes(args.min_required_free_space_abs)
free_abs = convert_to_bytes(str(device_data.free))
if free_abs and free_abs > min_free_abs:
condition = True
else:
condition = True

if args.min_required_free_space_prct:
free_prct = 100 - device_data.percent
condition = condition and (free_prct > args.min_required_free_space_prct)

if condition:
passing_devices.append(device_name)
else:
device = convert_to_bytes(str(device_data.total))
prct = device_data.percent
failing_devices.append(device_name)
event_data = {
"offending_device": {
"device": device_name,
"total": device_data.total,
"free": device_data.free,
"percent": device_data.percent,
},
}
self._log_event(
category=EventCategory.STORAGE,
description=f"Insufficient disk space: {bytes_to_human_readable(device)} and {prct}%, used on {device_name}",
data=event_data,
priority=EventPriority.CRITICAL,
console_log=True,
)
if failing_devices:
self.result.message = f"Insufficient disk space on " f"[{', '.join(failing_devices)}]"
self.result.status = ExecutionStatus.ERROR
# find the device with the largest total storage, and its free space
largest_device = max(
data.storage_data,
key=lambda x: convert_to_bytes(str(data.storage_data[x].total)),
)
largest_free = data.storage_data[largest_device].free
largest_percent = data.storage_data[largest_device].percent
event_data = {
"largest_device": {
"device": largest_device,
"total": data.storage_data[largest_device].total,
"free": largest_free,
"percent": largest_percent,
},
}
self._log_event(
category=EventCategory.STORAGE,
description=f"{self.result.message} {largest_percent}% used on {largest_device}",
data=event_data,
priority=EventPriority.CRITICAL,
console_log=True,
else:
self.result.message = (
f"Sufficient disk space available on " f"[{', '.join(passing_devices)}]"
)
return self.result
106 changes: 86 additions & 20 deletions test/unit/plugin/test_storage_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,36 +51,79 @@ def model_obj():
)


@pytest.fixture
def multiple_dev():
return StorageDataModel(
storage_data={
"dev1": DeviceStorageData(total=100, free=90, used=10, percent=10),
"dev2": DeviceStorageData(total=100, free=5, used=95, percent=95),
}
)


@pytest.fixture
def analyzer(system_info):
return StorageAnalyzer(system_info=system_info)


def test_nominal_with_config(analyzer, model_obj):
args = StorageAnalyzerArgs(min_required_free_space="600G")
result = analyzer.analyze_data(model_obj, args)
assert result.status == ExecutionStatus.OK
assert result.message == "'/dev/nvme0n1p2' has 869.8GB available, 3.0% used"
assert len(result.events) == 0
def test_filter_exact_match(analyzer):
assert analyzer._matches_device_filter("foo", ["foo", "bar"], regex_match=False)
assert not analyzer._matches_device_filter("baz", ["foo", "bar"], regex_match=False)


def test_filter_regex_match(analyzer):
assert analyzer._matches_device_filter("disk0", [r"disk\d"], regex_match=True)
assert not analyzer._matches_device_filter("diskA", [r"disk\d"], regex_match=True)


def test_filter_invalid_regex(analyzer, monkeypatch):
calls = []
monkeypatch.setattr(analyzer, "_log_event", lambda **kw: calls.append(kw))
assert not analyzer._matches_device_filter("somestring", ["[invalid"], regex_match=True)
assert len(calls) == 1
event = calls[0]
assert event["category"] == EventCategory.STORAGE
assert event["priority"] == EventPriority.ERROR
assert "Invalid regex pattern" in event["description"]


def test_check_devices_only(analyzer, multiple_dev):
args = StorageAnalyzerArgs(min_required_free_space_prct=50, check_devices=["dev1"])
res = analyzer.analyze_data(multiple_dev, args)
assert res.status == ExecutionStatus.OK


def test_ignore_devices(analyzer, multiple_dev):
args = StorageAnalyzerArgs(min_required_free_space_prct=50, ignore_devices=["dev2"])
res = analyzer.analyze_data(multiple_dev, args)
assert res.status == ExecutionStatus.OK

def test_nominal_no_config(analyzer, model_obj):
result = analyzer.analyze_data(model_obj)

def test_check_overrides_ignore(analyzer, multiple_dev):
args = StorageAnalyzerArgs(
min_required_free_space_prct=50, check_devices=["dev2"], ignore_devices=["dev2", "dev1"]
)
res = analyzer.analyze_data(multiple_dev, args)
assert res.status == ExecutionStatus.ERROR
assert any(e.category == EventCategory.STORAGE.value for e in res.events)


def test_only_absolute_threshold_fails(analyzer, model_obj):
args = StorageAnalyzerArgs(min_required_free_space_abs="800GB")
result = analyzer.analyze_data(model_obj, args)
assert result.status == ExecutionStatus.OK
assert result.message == "'/dev/nvme0n1p2' has 869.8GB available, 3.0% used"
assert len(result.events) == 0
assert "Sufficient disk space available on [/dev/nvme0n1p2]" in result.message


def test_insufficient_free_storage(analyzer, model_obj):
args = StorageAnalyzerArgs(min_required_free_space="1TB")
def test_only_percentage_threshold_fails(analyzer, model_obj):
args = StorageAnalyzerArgs(min_required_free_space_prct=99)
result = analyzer.analyze_data(model_obj, args)
assert result.status == ExecutionStatus.ERROR
for event in result.events:
assert event.category == EventCategory.STORAGE.value
assert event.priority == EventPriority.CRITICAL
assert any(event.category == EventCategory.STORAGE.value for event in result.events)
assert any(event.priority == EventPriority.CRITICAL for event in result.events)


def test_windows_nominal(system_info):
def test_both_abs_and_prct_fail(system_info):
system_info.os_family = OSFamily.WINDOWS
analyzer = StorageAnalyzer(system_info=system_info)

Expand All @@ -94,9 +137,32 @@ def test_windows_nominal(system_info):
)
}
)
args = StorageAnalyzerArgs(min_required_free_space="10GB")

args = StorageAnalyzerArgs(min_required_free_space_abs="10GB", min_required_free_space_prct=96)
result = analyzer.analyze_data(model, args)
assert result.status == ExecutionStatus.OK
assert result.message == "'C:' has 466.44GB available, 53.97% used"
assert len(result.events) == 0
assert result.status == ExecutionStatus.ERROR
assert "Insufficient disk space" in result.message
assert len(result.events) == 1
assert any(e.category == EventCategory.STORAGE.value for e in result.events)
assert any(e.priority == EventPriority.CRITICAL for e in result.events)

args2 = StorageAnalyzerArgs(min_required_free_space_prct=40, min_required_dree_space_abs="1GB")
result2 = analyzer.analyze_data(model, args2)
assert result2.status == ExecutionStatus.OK


def test_device_filter(analyzer, model_obj):
model_obj.storage_data["some_device"] = DeviceStorageData(
total=1000, free=100, used=900, percent=90
)

args = StorageAnalyzerArgs(min_required_free_space_prct="20")
result = analyzer.analyze_data(model_obj, args)
assert result.status == ExecutionStatus.ERROR
assert len(result.events) == 1

args2 = StorageAnalyzerArgs(
min_required_free_space_prct="20", ignore_devices=["some_device"], regex_match=True
)
result2 = analyzer.analyze_data(model_obj, args2)
assert result2.status == ExecutionStatus.OK