Compare commits

..

15 Commits

Author SHA1 Message Date
tanner 03730ec60f refactor: Use asyncio for non-blocking display animations
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2026-03-28 17:41:33 -06:00
tanner eb003853e6 fix: Reconnect MQTT client on disconnection errors
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2026-03-28 17:36:39 -06:00
tanner 7567214289 Integrate display code 2026-03-22 01:47:25 +00:00
tanner d2b33d0719 Add Tim's display test code 2026-03-21 23:09:50 +00:00
tanner e3cfd0c5ee Print images from dev too 2025-12-31 04:42:05 +00:00
tanner 461844ef70 Switch to prod 2025-12-30 21:14:36 -07:00
tanner e602c47808 Fixes 2025-12-30 20:53:17 -07:00
tanner 6ff5f33ba1 fix: Flatten RGBA images before printing
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 21:34:56 -07:00
tanner 259cb0cf1d chore: Remove commented-out image resize logic 2025-12-09 21:34:54 -07:00
tanner 6c0be6ed95 fix: Initialize printer hardware before each print
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 21:13:21 -07:00
tanner 56574b4c14 refactor: Disable image resizing before dithering 2025-12-09 21:13:19 -07:00
tanner a8132f9653 fix: Replace unsupported p.feed with newlines for paper feed
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 20:58:47 -07:00
tanner c38549c7d7 fix: Feed paper after image for easier tearing
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 20:54:36 -07:00
tanner 5fc4811c09 fix: Init printer once, pass object, and remove redundant main block
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 20:48:16 -07:00
tanner a4e0fb9623 feat: Implement MQTT-based image printing 2025-12-09 20:48:11 -07:00
2 changed files with 484 additions and 33 deletions
+380
View File
@@ -0,0 +1,380 @@
# 7 Segment display test file
#
# By Tim
from smbus2 import SMBus
import time
import math
import random
import asyncio
"""
Each segment of the 14segment display is represented by a single bit
in a 16bit integer. We use a dictionary to map segment names ("a", "b", ...)
to their corresponding bitmask.
Bitmask example:
1 << 0 = 0000 0000 0000 0001
1 << 1 = 0000 0000 0000 0010
1 << 2 = 0000 0000 0000 0100
"""
SEG = {
"a": 1 << 0,
"b": 1 << 1,
"c": 1 << 2,
"d": 1 << 3,
"e": 1 << 4,
"f": 1 << 5,
"g1": 1 << 6,
"g2": 1 << 7,
"h": 1 << 8,
"i": 1 << 9,
"j": 1 << 10,
"k": 1 << 11,
"l": 1 << 12,
"m": 1 << 13,
"dp": 1 << 14,
}
# A convenience list used by some effects
ALL_SEGMENTS = list(SEG.values())
"""
This function takes any number of segment names and returns a combined
bitmask. It lets us write:
seg("a", "b", "c")
instead of manually OR-ing bits together.
"""
def seg(*names: str) -> int:
value = 0
for n in names:
value |= SEG[n] # bitwise OR
return value
"""
FONT is a dictionary mapping characters ("A", "b", "3", etc.)
to the bitmask that lights up the correct segments.
This is a canonical HPstyle font, designed for clarity and legibility.
"""
FONT = {
# punctuation
" ": 0,
"-": seg("g1","g2"),
"_": seg("d"),
".": seg("dp"),
":": seg("i","l"),
"!": seg("b","dp"),
"?": seg("a","b","g2","k","dp"),
"/": seg("j","k"),
"\\": seg("h","k"),
"'": seg("h"),
"\"": seg("h","j"),
",": seg("m"),
# digits
"0": seg("a","b","c","d","e","f"),
"1": seg("b","c"),
"2": seg("a","b","g1","g2","e","d"),
"3": seg("a","b","c","d","g1","g2"),
"4": seg("f","g1","g2","b","c"),
"5": seg("a","f","g1","g2","c","d"),
"6": seg("a","f","e","d","c","g1","g2"),
"7": seg("a","b","c"),
"8": seg("a","b","c","d","e","f","g1","g2"),
"9": seg("a","b","c","d","f","g1","g2"),
# uppercase
"A": seg("a","b","c","e","f","g1","g2"),
"B": seg("a","j","m","g1","d","e","f"),
"C": seg("a","f","e","d"),
"D": seg("a","b","c","d","i","l"),
"E": seg("a","f","e","d","g1","g2"),
"F": seg("a","f","e","g1","g2"),
"G": seg("a","f","e","d","c","g2"),
"H": seg("f","e","b","c","g1","g2"),
"I": seg("a","d","i","l"),
"J": seg("b","c","d","e"),
"K": seg("f","e","g1","j","m"),
"L": seg("f","e","d"),
"M": seg("f","e","b","c","h","j"),
"N": seg("f","e","b","c","h","m"),
"O": seg("a","b","c","d","e","f"),
"P": seg("a","b","f","e","g1","g2"),
"Q": seg("a","b","c","d","e","f","m"),
"R": seg("a","b","f","e","g1","g2","m"),
"S": seg("a","f","g1","g2","c","d"),
"T": seg("a","i","l"),
"U": seg("f","e","b","c","d"),
"V": seg("f","e","j","k"),
"W": seg("f","e","b","c","k","m"),
"X": seg("h","j","k","m"),
"Y": seg("b","f","g1","g2","l"),
"Z": seg("a","j","k","d"),
# lowercase
"a": seg("l","g2","c","d"),
"b": seg("f","e","d","c","g1","g2"),
"c": seg("g1","g2","e","d"),
"d": seg("b","c","d","e","g1","g2"),
"e": seg("g1","g2","e","d","c"),
"f": seg("a","f","e","g1"),
"g": seg("m","c","d","g2"),
"h": seg("f","e","g1","g2","c"),
"i": seg("i","l"),
"j": seg("b","c","d"),
"k": seg("f","e","g1","j","k"),
"l": seg("f","e"),
"m": seg("e","l","c","h","j"),
"n": seg("e","g1","g2","c"),
"o": seg("g1","g2","c","d","e"),
"p": seg("a","b","f","e","g1","g2"),
"q": seg("a","b","c","d","g1","g2"),
"r": seg("e","g1"),
"s": seg("a","f","g1","g2","c","d"),
"t": seg("f","e","d","g1"),
"u": seg("e","c","d"),
"v": seg("e","c","j"),
"w": seg("e","c","k","m"),
"x": seg("h","j","k","m"),
"y": seg("h","g2","b","c","d"),
"z": seg("a","b","d","e","k"),
}
# Helper to fetch glyphs
def glyph_for(ch: str, dot=False) -> int:
value = FONT.get(ch, 0)
if dot:
value |= SEG["dp"]
return value
"""
This class represents the entire display system.
A class bundles:
• data (the display buffer)
• behavior (methods like scroll_text, chase, rainbow)
Every method that begins with "self" operates on the display instance.
"""
class StarburstHT16K33:
# Constructor: runs when you create the object
def __init__(self, bus=1, addr0=0x70, addr1=0x71, brightness=15):
"""
__init__ is the constructor. It sets up the I²C bus and
initializes the display chips.
"""
self.bus = SMBus(bus)
self.addr = [addr0, addr1] # two HT16K33 chips
self.buffer = [0] * 8 # 8 digits
self.brightness = max(0, min(15, brightness))
# Initialize each chip
for ht16k55_addr in self.addr:
self._cmd(ht16k55_addr, 0x21) # turn on oscillator
self._cmd(ht16k55_addr, 0xE0 | self.brightness)
self._cmd(ht16k55_addr, 0x81) # display on
# Low-level I²C helpers
def _cmd(self, address, cmd):
"""Send a single command byte to the chip."""
self.bus.write_byte(address, cmd)
def _write_digit_block(self, address, values):
"""
Write 4 digits (8 bytes) to one HT16K33 chip.
Each digit is 16 bits, so we split into two bytes.
"""
data = []
for v in values:
data.append(v & 0xFF) # low byte
data.append((v >> 8) & 0xFF) # high byte
self.bus.write_i2c_block_data(address, 0x00, data)
# Basic display operations
def set_brightness(self, brightness):
"""Set brightness 015."""
self.brightness = max(0, min(15, brightness))
for a in self.addr:
self._cmd(a, 0xE0 | self.brightness)
def set_char(self, pos, ch, dot=False):
"""Write a single character to a position."""
if 0 <= pos < 8:
self.buffer[pos] = glyph_for(ch, dot)
def set_raw(self, pos, mask):
"""Write a raw bitmask to a digit."""
if 0 <= pos < 8:
self.buffer[pos] = mask
def clear(self):
"""Clear the display buffer."""
for i in range(8):
self.buffer[i] = 0
def show(self):
"""Send the buffer to the hardware."""
self._write_digit_block(self.addr[0], self.buffer[0:4])
self._write_digit_block(self.addr[1], self.buffer[4:8])
# Static text writer
def write_text(self, text: str, align: str = "left"):
"""
Write up to 8 characters of text.
Demonstrates:
• slicing
• alignment logic
• enumerate()
"""
text = text[:8]
self.clear()
if align == "right":
start = max(0, 8 - len(text))
elif align == "center":
start = max(0, (8 - len(text)) // 2)
else:
start = 0
for i, ch in enumerate(text):
self.set_char(start + i, ch)
self.show()
# Animated scrolling text with easing curves
async def scroll_text(self, text, scroll_speed=8, pulse=False, loop=False, easing="linear"):
"""
Demonstrates:
• nested functions
• closures
• easing curves
• animation loops
• time-based effects
"""
scroll_speed = max(0, min(15, scroll_speed))
base_delay = 0.35 * (0.85 ** scroll_speed)
# Local function: easing curve
def ease(t):
if easing == "linear": return t
if easing == "in": return t * t
if easing == "out": return 1 - (1 - t)**2
if easing == "inout": return 0.5 * (1 - math.cos(math.pi * t))
return t
pad = " " * 8
s = pad + text + pad
n = len(s)
pulse_phase = 0.0
pulse_step = 0.25 + (scroll_speed / 60)
base_brightness = self.brightness
while True:
for offset in range(n - 7):
t = offset / (n - 8) if n > 8 else 0
delay = base_delay * (0.5 + ease(t))
window = s[offset:offset + 8]
self.clear()
for i, ch in enumerate(window):
self.set_char(i, ch)
self.show()
if pulse:
pulse_phase += pulse_step
wave = 0.5 * (1 - math.cos(pulse_phase))
b = 1 + int(wave * 14)
self.set_brightness(b)
await asyncio.sleep(delay)
if not loop:
break
self.set_brightness(base_brightness)
# Marquee wrapper
async def marquee(self, text, scroll_speed=8, pulse=False, cycles=1, easing="linear"):
"""Just calls scroll_text() multiple times."""
for _ in range(cycles):
await self.scroll_text(text, scroll_speed, pulse, False, easing)
# Per-segment chase effect
async def chase(self, speed=8, cycles=2):
"""Lights one segment at a time across all digits."""
delay = 0.15 * (0.85 ** speed)
for _ in range(cycles):
for segmask in ALL_SEGMENTS:
for pos in range(8):
self.buffer[pos] = segmask
self.show()
await asyncio.sleep(delay)
# Rainbow brightness sweep
async def rainbow(self, cycles=2, speed=8):
"""Brightness follows a sine wave."""
steps = 60
delay = 0.05 * (0.85 ** speed)
for _ in range(cycles):
for i in range(steps):
phase = i / steps
b = 1 + int(14 * 0.5 * (1 - math.cos(2 * math.pi * phase)))
self.set_brightness(b)
self.show()
await asyncio.sleep(delay)
# Sparkle effect
async def sparkle(self, duration=2.0, density=0.15, speed=8):
"""Random segments flicker like stars."""
delay = 0.05 * (0.85 ** speed)
end = time.time() + duration
while time.time() < end:
self.clear()
for pos in range(8):
if random.random() < density:
self.buffer[pos] = random.choice(ALL_SEGMENTS)
self.show()
await asyncio.sleep(delay)
async def main():
"""
Demonstrates:
• object creation
• calling methods
• sequencing animations
"""
disp = StarburstHT16K33(brightness=10)
disp.write_text("HELLO")
await asyncio.sleep(1)
await disp.scroll_text("Learning Python!", scroll_speed=1, easing="inout", pulse=True)
await disp.marquee("HP FONT DEMO ", scroll_speed=8, cycles=2)
await disp.marquee("Learning Python! ", scroll_speed=1, cycles=5)
await disp.chase(speed=8)
disp.write_text("RAINBOW")
await disp.rainbow()
disp.write_text("SPARKLE")
await disp.sparkle(duration=2.0)
disp.write_text("DONE")
await asyncio.sleep(1)
disp.clear()
disp.show()
if __name__ == "__main__":
asyncio.run(main())
+91 -20
View File
@@ -1,36 +1,46 @@
import os, logging
DEBUG = os.environ.get('DEBUG')
logging.basicConfig(
#filename='protovax.log', encoding='utf-8',
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
import io
import time
import requests
import asyncio
import aiomqtt
from escpos.printer import Usb
from PIL import Image, ImageEnhance
from aiomqtt import Client
from display import StarburstHT16K33
VENDOR_ID = 0x0416
PRODUCT_ID = 0x5011
IMAGE_URL = "https://static.spaceport.dns.t0.vc/drawing.png"
PRINTER_WIDTH = 384 # Set to your printer's pixel width (common: 384 or 576)
PROD_STATIC_URL = 'https://static.my.protospace.ca/'
DEV_STATIC_URL = 'https://static.spaceport.dns.t0.vc/'
PRINTER_WIDTH = 384
def main():
# Initialize printer
p = Usb(VENDOR_ID, PRODUCT_ID, interface=0, in_ep=0x81, out_ep=0x03)
last_image_content = None
def print_picture(topic, filename, p):
logging.info('New picture submitted: %s', filename)
if topic.startswith('dev_'):
url = DEV_STATIC_URL
else:
url = PROD_STATIC_URL
while True:
try:
response = requests.get(IMAGE_URL, timeout=5)
#p.hw('INIT')
response = requests.get(url + filename, timeout=5)
response.raise_for_status()
if response.content != last_image_content:
print("New image detected, printing...")
logging.info('New image detected, printing...')
last_image_content = response.content
img = Image.open(io.BytesIO(response.content))
# Resize first
wpercent = (PRINTER_WIDTH / float(img.size[0]))
hsize = int((float(img.size[1]) * float(wpercent)))
img = img.resize((PRINTER_WIDTH, hsize), Image.LANCZOS)
# Convert with dithering
img = img.convert('1', dither=Image.FLOYDSTEINBERG)
@@ -39,11 +49,72 @@ def main():
p.cut()
except requests.exceptions.RequestException as e:
print(f"Error downloading image: {e}")
time.sleep(5)
logging.info(f'Error downloading image: {e}')
if __name__ == "__main__":
main()
async def process_mqtt(message, p):
text = message.payload.decode()
topic = message.topic.value
logging.debug('MQTT topic: %s, message: %s', topic, text)
if 'spaceport/drawing/new' in topic:
print_picture(topic, text, p)
else:
logging.debug('Invalid topic, returning')
return
async def fetch_mqtt(p):
await asyncio.sleep(3)
while True:
try:
async with Client(
hostname='172.17.17.181',
port=1883,
) as client:
logging.info('MQTT client connected')
await client.subscribe('#')
async for message in client.messages:
loop = asyncio.get_event_loop()
loop.create_task(process_mqtt(message, p))
except aiomqtt.MqttError as e:
logging.warning('MQTT error: %s. Reconnecting in 5 seconds...', e)
await asyncio.sleep(5)
async def manage_display(disp):
logging.info('Starting display loop...')
while True:
await asyncio.sleep(2)
await disp.scroll_text('WELCOME TO PROTOSPACE!', scroll_speed=2, easing='inout', pulse=True)
disp.set_brightness(15)
await disp.marquee('PRESENTING...', scroll_speed=8, cycles=1)
await disp.marquee('THE BASH REGISTER', scroll_speed=2, cycles=1)
disp.set_brightness(15)
await disp.chase(speed=8)
disp.write_text('SEND TO')
await disp.rainbow()
disp.set_brightness(15)
await disp.marquee('PROTOSPACE.CA/SIGN', scroll_speed=2, cycles=3)
if __name__ == '__main__':
logging.info('')
logging.info('==========================')
logging.info('Booting up...')
p = Usb(VENDOR_ID, PRODUCT_ID, interface=0, in_ep=0x81, out_ep=0x03)
disp = StarburstHT16K33(brightness=10)
loop = asyncio.get_event_loop()
loop.create_task(manage_display(disp))
loop.create_task(fetch_mqtt(p))
loop.run_forever()