Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config/config.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@
"gpio_slowdown": 3,
"rp1_rio": 0
},
"double_sided": {
"enabled": false,
"copies": 2,
"axis": "horizontal"
},
"display_durations": {},
"use_short_date_format": true,
"vegas_scroll": {
Expand Down
164 changes: 156 additions & 8 deletions src/display_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from contextlib import contextmanager
from PIL import Image, ImageDraw, ImageFont
import time
from typing import Dict, Any, List
from typing import Dict, Any, List, Optional
import logging
import math
import freetype
Expand All @@ -42,6 +42,106 @@
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) # Set to INFO level


class _LogicalMatrix:
"""Proxy that reports a logical (per-screen) size for a physical matrix.

In double-sided mode the physical panel chain shows N identical copies of a
smaller logical screen. Plugins size themselves from ``matrix.width`` /
``matrix.height`` (the documented convention, used at 30+ call sites), so
this proxy reports the logical dimensions while delegating every real
operation — ``CreateFrameCanvas``, ``SwapOnVSync``, ``brightness``,
``Clear`` and so on — to the underlying physical matrix. The duplication
itself happens once per frame in :meth:`DisplayManager.update_display`.
"""

__slots__ = ("_logical_height", "_logical_width", "_matrix")

def __init__(self, matrix: RGBMatrix, logical_width: int, logical_height: int) -> None:
object.__setattr__(self, "_matrix", matrix)
object.__setattr__(self, "_logical_width", logical_width)
object.__setattr__(self, "_logical_height", logical_height)

@property
def width(self) -> int:
"""Logical (per-screen) width reported to plugins."""
return self._logical_width

@property
def height(self) -> int:
"""Logical (per-screen) height reported to plugins."""
return self._logical_height

def __getattr__(self, name: str) -> Any:
"""Forward any non-overridden attribute access to the physical matrix.

Reached only when normal lookup fails (i.e. not width/height/_*).
"""
return getattr(object.__getattribute__(self, "_matrix"), name)

def __setattr__(self, name: str, value: Any) -> None:
"""Forward attribute writes (e.g. ``matrix.brightness = 80``) to it."""
setattr(object.__getattribute__(self, "_matrix"), name, value)


def _resolve_double_sided(physical_width: int, physical_height: int,
ds_config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Validate the ``display.double_sided`` config against the physical size.

Returns a dict ``{copies, axis, logical_width, logical_height}`` when the
feature is enabled and the physical panel divides evenly into ``copies``
along the chosen axis, otherwise ``None`` (single-screen behaviour). Bad
config is logged and disabled rather than raised — a misconfigured panel
should still light up.
"""
if not isinstance(ds_config, dict) or not ds_config.get('enabled', False):
return None

copies = ds_config.get('copies', 2)
if not isinstance(copies, int) or copies < 2:
logger.warning(
"double_sided: 'copies' must be an integer >= 2 (got %r); "
"disabling double-sided mode", copies)
return None

axis = ds_config.get('axis', 'horizontal')
if axis not in ('horizontal', 'vertical'):
logger.warning(
"double_sided: 'axis' must be 'horizontal' or 'vertical' "
"(got %r); defaulting to 'horizontal'", axis)
axis = 'horizontal'

# Horizontal splits the chain (panels side by side); vertical splits the
# parallel outputs (panels stacked). The split axis must divide evenly.
if axis == 'horizontal':
if physical_width % copies != 0:
logger.warning(
"double_sided: physical width %d is not divisible by copies "
"%d; disabling double-sided mode", physical_width, copies)
return None
logical_width = physical_width // copies
logical_height = physical_height
else:
if physical_height % copies != 0:
logger.warning(
"double_sided: physical height %d is not divisible by copies "
"%d; disabling double-sided mode", physical_height, copies)
return None
logical_width = physical_width
logical_height = physical_height // copies

logger.info(
"double_sided enabled: %d copies on %s axis — logical screen %dx%d "
"tiled across physical %dx%d", copies, axis, logical_width,
logical_height, physical_width, physical_height)
return {
'copies': copies,
'axis': axis,
'logical_width': logical_width,
'logical_height': logical_height,
}


class DisplayManager:
"""
Singleton hardware abstraction layer for the RGB LED matrix.
Expand Down Expand Up @@ -76,6 +176,10 @@ def __init__(self, config: Dict[str, Any] = None, force_fallback: bool = False,
self._suppress_test_pattern = suppress_test_pattern
# When True, update_display() and clear() skip hardware writes (used during off-screen content capture)
self._capture_mode_active = False
# Double-sided mode state (resolved in _setup_matrix). When disabled,
# the logical image is blitted to the matrix unchanged.
self._double_sided = None # dict {copies, axis, logical_width, logical_height} or None
self._physical_image = None # full-chain buffer reused each frame when tiling
# Text-width measurement cache: (text, id(font)) -> pixel_width
# Avoids re-measuring the same string+font on every display() call.
# Cleared on _load_fonts() so stale entries don't survive a font reload.
Expand Down Expand Up @@ -168,13 +272,26 @@ def _setup_matrix(self):
# Initialize the matrix
self.matrix = RGBMatrix(options=options)
logger.info("RGB Matrix initialized successfully")

# Create double buffer for smooth updates

# Create double buffer for smooth updates. The canvases are always
# full physical size — they back the real chain regardless of mode.
self.offscreen_canvas = self.matrix.CreateFrameCanvas()
self.current_canvas = self.matrix.CreateFrameCanvas()
logger.info("Frame canvases created successfully")

# Create image with full chain width

# Double-sided mode: wrap the physical matrix so plugins see the
# logical (per-screen) size, and keep a full-chain buffer to tile
# the rendered screen into once per frame.
ds_config = self.config.get('display', {}).get('double_sided', {})
ds = _resolve_double_sided(self.matrix.width, self.matrix.height, ds_config)
self._double_sided = ds
if ds is not None:
self._physical_image = Image.new(
'RGB', (self.matrix.width, self.matrix.height))
self.matrix = _LogicalMatrix(
self.matrix, ds['logical_width'], ds['logical_height'])

# Create image with the (logical) display dimensions
self.image = Image.new('RGB', (self.matrix.width, self.matrix.height))
self.draw = ImageDraw.Draw(self.image)
logger.info(f"Image canvas created with dimensions: {self.matrix.width}x{self.matrix.height}")
Expand All @@ -201,8 +318,16 @@ def _setup_matrix(self):
rows = int(hardware_config.get('rows', 32))
cols = int(hardware_config.get('cols', 64))
chain_length = int(hardware_config.get('chain_length', 2))
parallel = int(hardware_config.get('parallel', 1))
fallback_width = max(1, cols * chain_length)
fallback_height = max(1, rows)
fallback_height = max(1, rows * parallel)
# Mirror double-sided in fallback so the preview shows one screen.
ds_config = self.config.get('display', {}).get('double_sided', {}) if self.config else {}
ds = _resolve_double_sided(fallback_width, fallback_height, ds_config)
self._double_sided = ds
if ds is not None:
fallback_width = ds['logical_width']
fallback_height = ds['logical_height']
except Exception:
fallback_width, fallback_height = 128, 32

Expand Down Expand Up @@ -364,6 +489,25 @@ def capture_mode(self):
finally:
self._capture_mode_active = False

def _composite_double_sided(self):
"""Tile the logical screen across the full physical chain.

Renders once into ``self._physical_image`` by pasting the rendered
logical image ``copies`` times along the configured axis. The paste is
a single memcpy per copy, so the per-frame cost is negligible and the
plugin render path is untouched.
"""
ds = self._double_sided
phys = self._physical_image
lw = ds['logical_width']
lh = ds['logical_height']
for i in range(ds['copies']):
if ds['axis'] == 'vertical':
phys.paste(self.image, (0, i * lh))
else:
phys.paste(self.image, (i * lw, 0))
return phys

def update_display(self):
"""Update the display using double buffering with proper sync."""
try:
Expand All @@ -377,8 +521,12 @@ def update_display(self):
if self._capture_mode_active:
return # Skip hardware write — content is being captured off-screen

# Copy the current image to the offscreen canvas
self.offscreen_canvas.SetImage(self.image)
# Copy the current image to the offscreen canvas. In double-sided
# mode the logical screen is first tiled across the full chain.
if self._double_sided is not None:
self.offscreen_canvas.SetImage(self._composite_double_sided())
else:
self.offscreen_canvas.SetImage(self.image)

# Swap buffers immediately
self.matrix.SwapOnVSync(self.offscreen_canvas)
Expand Down
107 changes: 105 additions & 2 deletions test/test_display_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,114 @@ def test_draw_image(self, test_config, mock_rgb_matrix):

class TestDisplayManagerResourceManagement:
"""Test resource management."""

def test_cleanup(self, test_config, mock_rgb_matrix):
"""Test cleanup operation."""
with patch.dict('os.environ', {'EMULATOR': 'false'}):
dm = DisplayManager(test_config)
dm.cleanup()

dm.matrix.Clear.assert_called()


class TestDisplayManagerDoubleSided:
"""Double-sided mode: render once at logical size, tile across the chain."""

def _config(self, **double_sided):
"""Build a config (physical 128x32) with the given double_sided block."""
return {
'display': {
'hardware': {
'rows': 32, 'cols': 64, 'chain_length': 2, 'parallel': 1,
'hardware_mapping': 'adafruit-hat-pwm', 'brightness': 90,
},
'runtime': {'gpio_slowdown': 2},
'double_sided': double_sided,
},
'timezone': 'UTC',
'plugin_system': {'plugins_directory': 'plugins'},
}

def _captured_physical(self, mock_rgb_matrix):
"""Return the image handed to the canvas on the last update_display()."""
canvas = mock_rgb_matrix['matrix_instance'].CreateFrameCanvas.return_value
return canvas.SetImage.call_args[0][0]

def test_horizontal_reports_logical_dimensions(self, mock_rgb_matrix):
"""Plugins see the per-screen size, not the full physical chain."""
DisplayManager._instance = None
with patch.dict('os.environ', {'EMULATOR': 'false'}):
dm = DisplayManager(self._config(enabled=True, copies=2, axis='horizontal'),
suppress_test_pattern=True)
# Physical chain is 128x32; two side-by-side copies -> logical 64x32.
assert dm.matrix.width == 64
assert dm.matrix.height == 32
assert (dm.width, dm.height) == (64, 32)
assert dm.image.size == (64, 32)

def test_horizontal_tiles_image_across_chain(self, mock_rgb_matrix):
"""The logical screen is duplicated left/right into a full-chain frame."""
from PIL import Image
DisplayManager._instance = None
with patch.dict('os.environ', {'EMULATOR': 'false'}):
dm = DisplayManager(self._config(enabled=True, copies=2, axis='horizontal'),
suppress_test_pattern=True)
logical = Image.new('RGB', (64, 32), (0, 0, 0))
logical.putpixel((5, 5), (255, 0, 0))
dm.image = logical
dm.update_display()

physical = self._captured_physical(mock_rgb_matrix)
assert physical.size == (128, 32)
assert physical.getpixel((5, 5)) == (255, 0, 0)
assert physical.getpixel((69, 5)) == (255, 0, 0) # copy shifted +64

def test_vertical_axis_tiles_stacked(self, mock_rgb_matrix):
"""Vertical axis stacks copies (for panels on parallel outputs)."""
from PIL import Image
DisplayManager._instance = None
with patch.dict('os.environ', {'EMULATOR': 'false'}):
dm = DisplayManager(self._config(enabled=True, copies=2, axis='vertical'),
suppress_test_pattern=True)
# 128x32 split vertically -> logical 128x16.
assert (dm.matrix.width, dm.matrix.height) == (128, 16)
logical = Image.new('RGB', (128, 16), (0, 0, 0))
logical.putpixel((10, 3), (0, 255, 0))
dm.image = logical
dm.update_display()

physical = self._captured_physical(mock_rgb_matrix)
assert physical.size == (128, 32)
assert physical.getpixel((10, 3)) == (0, 255, 0)
assert physical.getpixel((10, 19)) == (0, 255, 0) # copy shifted +16

def test_indivisible_dimension_disables_mode(self, mock_rgb_matrix):
"""A physical size that doesn't divide evenly falls back to single."""
DisplayManager._instance = None
with patch.dict('os.environ', {'EMULATOR': 'false'}):
dm = DisplayManager(self._config(enabled=True, copies=3, axis='horizontal'),
suppress_test_pattern=True)
assert dm._double_sided is None # 128 % 3 != 0
assert dm.matrix.width == 128
assert dm.image.size == (128, 32)

def test_disabled_blits_logical_image_unchanged(self, mock_rgb_matrix):
"""With the feature off, the rendered image is sent through untouched."""
from PIL import Image
DisplayManager._instance = None
with patch.dict('os.environ', {'EMULATOR': 'false'}):
dm = DisplayManager(self._config(enabled=False), suppress_test_pattern=True)
assert dm._double_sided is None
img = Image.new('RGB', (128, 32))
dm.image = img
dm.update_display()
assert self._captured_physical(mock_rgb_matrix) is img

def test_brightness_write_forwards_through_proxy(self, mock_rgb_matrix):
"""Setting brightness via the proxy reaches the real matrix."""
DisplayManager._instance = None
with patch.dict('os.environ', {'EMULATOR': 'false'}):
dm = DisplayManager(self._config(enabled=True, copies=2, axis='horizontal'),
suppress_test_pattern=True)
assert dm.set_brightness(70) is True
assert mock_rgb_matrix['matrix_instance'].brightness == 70
Loading
Loading