diff --git a/fan.py b/fan.py new file mode 100644 index 0000000..9270eb2 --- /dev/null +++ b/fan.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +import os.path +import time +import traceback +import threading + +import gpiod + +import misc + +pin = None + + +class Pwm: + def __init__(self, chip): + self.period_value = None + try: + int(chip) + chip = f'pwmchip{chip}' + except ValueError: + pass + self.filepath = f"/sys/class/pwm/{chip}/pwm0/" + try: + with open(f"/sys/class/pwm/{chip}/export", 'w') as f: + f.write('0') + except OSError: + print("Waring: init pwm error") + traceback.print_exc() + + def period(self, ns: int): + self.period_value = ns + with open(os.path.join(self.filepath, 'period'), 'w') as f: + f.write(str(ns)) + + def period_us(self, us: int): + self.period(us * 1000) + + def enable(self, t: bool): + with open(os.path.join(self.filepath, 'enable'), 'w') as f: + f.write(f"{int(t)}") + + def write(self, duty: float): + assert self.period_value, "The Period is not set." + with open(os.path.join(self.filepath, 'duty_cycle'), 'w') as f: + f.write(f"{int(self.period_value * duty)}") + + +class Gpio: + + def tr(self): + while True: + self.line.set_value(1) + time.sleep(self.value[0]) + self.line.set_value(0) + time.sleep(self.value[1]) + + def __init__(self, period_s): + self.line = gpiod.Chip(os.environ['FAN_CHIP']).get_line(int(os.environ['FAN_LINE'])) + self.line.request(consumer='fan', type=gpiod.LINE_REQ_DIR_OUT) + self.value = [period_s / 2, period_s / 2] + self.period_s = period_s + self.thread = threading.Thread(target=self.tr, daemon=True) + self.thread.start() + + def write(self, duty): + self.value[1] = duty * self.period_s + self.value[0] = self.period_s - self.value[1] + + +def read_temp(): + with open('/sys/class/thermal/thermal_zone0/temp') as f: + t = int(f.read().strip()) / 1000.0 + return t + + +def get_dc(cache={}): + if misc.conf['run'].value == 0: + return 0.999 + + if time.time() - cache.get('time', 0) > 60: + cache['time'] = time.time() + cache['dc'] = misc.fan_temp2dc(read_temp()) + + return cache['dc'] + + +def change_dc(dc, cache={}): + if dc != cache.get('dc'): + cache['dc'] = dc + pin.write(dc) + + +def running(): + global pin + if os.environ['HARDWARE_PWM'] == '1': + chip = os.environ['PWMCHIP'] + pin = Pwm(chip) + pin.period_us(40) + pin.enable(True) + else: + pin = Gpio(0.025) + while True: + change_dc(get_dc()) + time.sleep(1) + + +if __name__ == '__main__': + running() diff --git a/fonts/DejaVuSansMono-Bold.ttf b/fonts/DejaVuSansMono-Bold.ttf new file mode 100644 index 0000000..34314f6 Binary files /dev/null and b/fonts/DejaVuSansMono-Bold.ttf differ diff --git a/fonts/DejaVuSansMono.ttf b/fonts/DejaVuSansMono.ttf new file mode 100644 index 0000000..da5abdd Binary files /dev/null and b/fonts/DejaVuSansMono.ttf differ diff --git a/fonts/NotoSansMono-Bold.ttf b/fonts/NotoSansMono-Bold.ttf new file mode 100644 index 0000000..a2a030f Binary files /dev/null and b/fonts/NotoSansMono-Bold.ttf differ diff --git a/fonts/NotoSansMono-Regular.ttf b/fonts/NotoSansMono-Regular.ttf new file mode 100644 index 0000000..cf8607e Binary files /dev/null and b/fonts/NotoSansMono-Regular.ttf differ diff --git a/main.py b/main.py new file mode 100644 index 0000000..f67c8ee --- /dev/null +++ b/main.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +import queue +import threading +import traceback + +import fan +import misc + +try: + import oled + + top_board = True +except Exception as ex: + traceback.print_exc() + top_board = False + +q = queue.Queue() +lock = threading.Lock() + +action = { + 'none': lambda: 'nothing', + 'slider': lambda: oled.slider(lock), + 'switch': lambda: misc.fan_switch(), + 'reboot': lambda: misc.check_call('reboot'), + 'poweroff': lambda: misc.check_call('poweroff'), +} + + +def receive_key(q): + while True: + func = misc.get_func(q.get()) + action[func]() + + +if __name__ == '__main__': + + if top_board: + oled.welcome() + p0 = threading.Thread(target=receive_key, args=(q,), daemon=True) + p1 = threading.Thread(target=misc.watch_key, args=(q,), daemon=True) + p2 = threading.Thread(target=oled.auto_slider, args=(lock,), daemon=True) + p3 = threading.Thread(target=fan.running, daemon=True) + + p0.start() + p1.start() + p2.start() + p3.start() + try: + p3.join() + except KeyboardInterrupt: + print("GoodBye ~") + oled.goodbye() + + else: + p3 = threading.Thread(target=fan.running, daemon=False) + p3.start() diff --git a/misc.py b/misc.py new file mode 100644 index 0000000..bc25894 --- /dev/null +++ b/misc.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +import re +import os +import time +import subprocess +import multiprocessing as mp +import traceback + +import gpiod +from configparser import ConfigParser +from collections import defaultdict, OrderedDict + +cmds = { + 'blk': "lsblk | awk '{print $1}'", + 'up': "echo Uptime: `uptime | sed 's/.*up \\([^,]*\\), .*/\\1/'`", + 'temp': "cat /sys/class/thermal/thermal_zone0/temp", + 'ip': "hostname -I | awk '{printf \"IP %s\", $1}'", + 'cpu': "uptime | awk '{printf \"CPU Load: %.2f\", $(NF-2)}'", + 'men': "free -m | awk 'NR==2{printf \"Mem: %s/%sMB\", $3,$2}'", + 'disk': "df -h | awk '$NF==\"/\"{printf \"Disk: %d/%dGB %s\", $3,$2,$5}'" +} + +lv2dc = OrderedDict({'lv3': 0, 'lv2': 0.25, 'lv1': 0.5, 'lv0': 0.75}) + + +def check_output(cmd): + return subprocess.check_output(cmd, shell=True).decode().strip() + + +def check_call(cmd): + return subprocess.check_call(cmd, shell=True) + + +def get_blk(): + conf['disk'] = [x for x in check_output(cmds['blk']).strip().split('\n') if x.startswith('sd')] + + +def get_info(s): + return check_output(cmds[s]) + + +def get_cpu_temp(): + t = float(get_info('temp')) / 1000 + if conf['oled']['f-temp']: + temp = "CPU Temp: {:.0f}°F".format(t * 1.8 + 32) + else: + temp = "CPU Temp: {:.1f}°C".format(t) + return temp + + +def read_conf(): + conf = defaultdict(dict) + + try: + cfg = ConfigParser() + cfg.read('/etc/rockpi-penta.conf') + # fan + conf['fan']['lv0'] = cfg.getfloat('fan', 'lv0') + conf['fan']['lv1'] = cfg.getfloat('fan', 'lv1') + conf['fan']['lv2'] = cfg.getfloat('fan', 'lv2') + conf['fan']['lv3'] = cfg.getfloat('fan', 'lv3') + # key + conf['key']['click'] = cfg.get('key', 'click') + conf['key']['twice'] = cfg.get('key', 'twice') + conf['key']['press'] = cfg.get('key', 'press') + # time + conf['time']['twice'] = cfg.getfloat('time', 'twice') + conf['time']['press'] = cfg.getfloat('time', 'press') + # other + conf['slider']['auto'] = cfg.getboolean('slider', 'auto') + conf['slider']['time'] = cfg.getfloat('slider', 'time') + conf['oled']['rotate'] = cfg.getboolean('oled', 'rotate') + conf['oled']['f-temp'] = cfg.getboolean('oled', 'f-temp') + except Exception: + traceback.print_exc() + # fan + conf['fan']['lv0'] = 35 + conf['fan']['lv1'] = 40 + conf['fan']['lv2'] = 45 + conf['fan']['lv3'] = 50 + # key + conf['key']['click'] = 'slider' + conf['key']['twice'] = 'switch' + conf['key']['press'] = 'none' + # time + conf['time']['twice'] = 0.7 # second + conf['time']['press'] = 1.8 + # other + conf['slider']['auto'] = True + conf['slider']['time'] = 10 # second + conf['oled']['rotate'] = False + conf['oled']['f-temp'] = False + + return conf + + +def read_key(pattern, size): + CHIP_NAME = os.environ['BUTTON_CHIP'] + LINE_NUMBER = os.environ['BUTTON_LINE'] + + s = '' + chip = gpiod.Chip(str(CHIP_NAME)) + line = chip.get_line(int(LINE_NUMBER)) + line.request(consumer='hat_button', type=gpiod.LINE_REQ_DIR_OUT) + line.set_value(1) + + while True: + s = s[-size:] + str(line.get_value()) + for t, p in pattern.items(): + if p.match(s): + return t + time.sleep(0.1) + + +def watch_key(q=None): + size = int(conf['time']['press'] * 10) + wait = int(conf['time']['twice'] * 10) + pattern = { + 'click': re.compile(r'1+0+1{%d,}' % wait), + 'twice': re.compile(r'1+0+1+0+1{3,}'), + 'press': re.compile(r'1+0{%d,}' % size), + } + + while True: + q.put(read_key(pattern, size)) + + +def get_disk_info(cache={}): + if not cache.get('time') or time.time() - cache['time'] > 30: + info = {} + cmd = "df -h | awk '$NF==\"/\"{printf \"%s\", $5}'" + info['root'] = check_output(cmd) + for x in conf['disk']: + cmd = "df -Bg | awk '$1==\"/dev/{}\" {{printf \"%s\", $5}}'".format(x) + info[x] = check_output(cmd) + cache['info'] = list(zip(*info.items())) + cache['time'] = time.time() + + return cache['info'] + + +def slider_next(pages): + conf['idx'].value += 1 + return pages[conf['idx'].value % len(pages)] + + +def slider_sleep(): + time.sleep(conf['slider']['time']) + + +def fan_temp2dc(t): + for lv, dc in lv2dc.items(): + if t >= conf['fan'][lv]: + return dc + return 0.999 + + +def fan_switch(): + conf['run'].value = not conf['run'].value + + +def get_func(key): + return conf['key'].get(key, 'none') + + +conf = {'disk': [], 'idx': mp.Value('d', -1), 'run': mp.Value('d', 1)} +conf.update(read_conf()) diff --git a/oled.py b/oled.py new file mode 100644 index 0000000..95af7a8 --- /dev/null +++ b/oled.py @@ -0,0 +1,118 @@ +#!/usr/bin/python3 +import os +import time + +import adafruit_ssd1306 +import board +import digitalio +import busio +from PIL import Image +from PIL import ImageDraw +from PIL import ImageFont +import multiprocessing as mp + +import misc + +font = { + '10': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 10), + '11': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 11), + '12': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 12), + '14': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 14), +} + + +def disp_init(): + RESET = getattr(board.pin, os.environ['OLED_RESET']) + i2c = busio.I2C(getattr(board.pin, os.environ['SCL']), getattr(board.pin, os.environ['SDA'])) + disp = adafruit_ssd1306.SSD1306_I2C(128, 32, i2c, reset=digitalio.DigitalInOut(RESET)) + disp.fill(0) + disp.show() + return disp + + +disp = disp_init() + +image = Image.new('1', (disp.width, disp.height)) +draw = ImageDraw.Draw(image) + + +def disp_show(): + im = image.rotate(180) if misc.conf['oled']['rotate'] else image + disp.image(im) + disp.write_framebuf() + draw.rectangle((0, 0, disp.width, disp.height), outline=0, fill=0) + + +def welcome(): + draw.text((0, 0), 'ROCKPi SATA HAT', font=font['14'], fill=255) + draw.text((32, 16), 'Loading...', font=font['12'], fill=255) + disp_show() + + +def goodbye(): + draw.text((32, 8), 'Good Bye ~', font=font['14'], fill=255) + disp_show() + time.sleep(2) + disp_show() # clear + + +def put_disk_info(): + k, v = misc.get_disk_info() + text1 = 'Disk: {} {}'.format(k[0], v[0]) + + if len(k) == 5: + text2 = '{} {} {} {}'.format(k[1], v[1], k[2], v[2]) + text3 = '{} {} {} {}'.format(k[3], v[3], k[4], v[4]) + page = [ + {'xy': (0, -2), 'text': text1, 'fill': 255, 'font': font['11']}, + {'xy': (0, 10), 'text': text2, 'fill': 255, 'font': font['11']}, + {'xy': (0, 21), 'text': text3, 'fill': 255, 'font': font['11']}, + ] + elif len(k) == 3: + text2 = '{} {} {} {}'.format(k[1], v[1], k[2], v[2]) + page = [ + {'xy': (0, 2), 'text': text1, 'fill': 255, 'font': font['12']}, + {'xy': (0, 18), 'text': text2, 'fill': 255, 'font': font['12']}, + ] + else: + page = [{'xy': (0, 2), 'text': text1, 'fill': 255, 'font': font['14']}] + + return page + + +def gen_pages(): + pages = { + 0: [ + {'xy': (0, -2), 'text': misc.get_info('up'), 'fill': 255, 'font': font['11']}, + {'xy': (0, 10), 'text': misc.get_cpu_temp(), 'fill': 255, 'font': font['11']}, + {'xy': (0, 21), 'text': misc.get_info('ip'), 'fill': 255, 'font': font['11']}, + ], + 1: [ + {'xy': (0, 2), 'text': misc.get_info('cpu'), 'fill': 255, 'font': font['12']}, + {'xy': (0, 18), 'text': misc.get_info('men'), 'fill': 255, 'font': font['12']}, + ], + 2: put_disk_info() + } + + return pages + + +def slider(lock): + with lock: + for item in misc.slider_next(gen_pages()): + draw.text(**item) + disp_show() + + +def auto_slider(lock): + while misc.conf['slider']['auto']: + slider(lock) + misc.slider_sleep() + else: + slider(lock) + + +if __name__ == '__main__': + # for test + lock = mp.Lock() + auto_slider(lock)