initial commit
This commit is contained in:
108
fan.py
Normal file
108
fan.py
Normal file
@@ -0,0 +1,108 @@
|
||||
#!/usr/bin/env python3
|
||||
import os.path
|
||||
import time
|
||||
import traceback
|
||||
import threading
|
||||
|
||||
import gpiod
|
||||
|
||||
import misc
|
||||
|
||||
pin = None
|
||||
|
||||
|
||||
class Pwm:
|
||||
def __init__(self, chip):
|
||||
self.period_value = None
|
||||
try:
|
||||
int(chip)
|
||||
chip = f'pwmchip{chip}'
|
||||
except ValueError:
|
||||
pass
|
||||
self.filepath = f"/sys/class/pwm/{chip}/pwm0/"
|
||||
try:
|
||||
with open(f"/sys/class/pwm/{chip}/export", 'w') as f:
|
||||
f.write('0')
|
||||
except OSError:
|
||||
print("Waring: init pwm error")
|
||||
traceback.print_exc()
|
||||
|
||||
def period(self, ns: int):
|
||||
self.period_value = ns
|
||||
with open(os.path.join(self.filepath, 'period'), 'w') as f:
|
||||
f.write(str(ns))
|
||||
|
||||
def period_us(self, us: int):
|
||||
self.period(us * 1000)
|
||||
|
||||
def enable(self, t: bool):
|
||||
with open(os.path.join(self.filepath, 'enable'), 'w') as f:
|
||||
f.write(f"{int(t)}")
|
||||
|
||||
def write(self, duty: float):
|
||||
assert self.period_value, "The Period is not set."
|
||||
with open(os.path.join(self.filepath, 'duty_cycle'), 'w') as f:
|
||||
f.write(f"{int(self.period_value * duty)}")
|
||||
|
||||
|
||||
class Gpio:
|
||||
|
||||
def tr(self):
|
||||
while True:
|
||||
self.line.set_value(1)
|
||||
time.sleep(self.value[0])
|
||||
self.line.set_value(0)
|
||||
time.sleep(self.value[1])
|
||||
|
||||
def __init__(self, period_s):
|
||||
self.line = gpiod.Chip(os.environ['FAN_CHIP']).get_line(int(os.environ['FAN_LINE']))
|
||||
self.line.request(consumer='fan', type=gpiod.LINE_REQ_DIR_OUT)
|
||||
self.value = [period_s / 2, period_s / 2]
|
||||
self.period_s = period_s
|
||||
self.thread = threading.Thread(target=self.tr, daemon=True)
|
||||
self.thread.start()
|
||||
|
||||
def write(self, duty):
|
||||
self.value[1] = duty * self.period_s
|
||||
self.value[0] = self.period_s - self.value[1]
|
||||
|
||||
|
||||
def read_temp():
|
||||
with open('/sys/class/thermal/thermal_zone0/temp') as f:
|
||||
t = int(f.read().strip()) / 1000.0
|
||||
return t
|
||||
|
||||
|
||||
def get_dc(cache={}):
|
||||
if misc.conf['run'].value == 0:
|
||||
return 0.999
|
||||
|
||||
if time.time() - cache.get('time', 0) > 60:
|
||||
cache['time'] = time.time()
|
||||
cache['dc'] = misc.fan_temp2dc(read_temp())
|
||||
|
||||
return cache['dc']
|
||||
|
||||
|
||||
def change_dc(dc, cache={}):
|
||||
if dc != cache.get('dc'):
|
||||
cache['dc'] = dc
|
||||
pin.write(dc)
|
||||
|
||||
|
||||
def running():
|
||||
global pin
|
||||
if os.environ['HARDWARE_PWM'] == '1':
|
||||
chip = os.environ['PWMCHIP']
|
||||
pin = Pwm(chip)
|
||||
pin.period_us(40)
|
||||
pin.enable(True)
|
||||
else:
|
||||
pin = Gpio(0.025)
|
||||
while True:
|
||||
change_dc(get_dc())
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
running()
|
||||
BIN
fonts/DejaVuSansMono-Bold.ttf
Normal file
BIN
fonts/DejaVuSansMono-Bold.ttf
Normal file
Binary file not shown.
BIN
fonts/DejaVuSansMono.ttf
Normal file
BIN
fonts/DejaVuSansMono.ttf
Normal file
Binary file not shown.
BIN
fonts/NotoSansMono-Bold.ttf
Normal file
BIN
fonts/NotoSansMono-Bold.ttf
Normal file
Binary file not shown.
BIN
fonts/NotoSansMono-Regular.ttf
Normal file
BIN
fonts/NotoSansMono-Regular.ttf
Normal file
Binary file not shown.
56
main.py
Normal file
56
main.py
Normal file
@@ -0,0 +1,56 @@
|
||||
#!/usr/bin/env python3
|
||||
import queue
|
||||
import threading
|
||||
import traceback
|
||||
|
||||
import fan
|
||||
import misc
|
||||
|
||||
try:
|
||||
import oled
|
||||
|
||||
top_board = True
|
||||
except Exception as ex:
|
||||
traceback.print_exc()
|
||||
top_board = False
|
||||
|
||||
q = queue.Queue()
|
||||
lock = threading.Lock()
|
||||
|
||||
action = {
|
||||
'none': lambda: 'nothing',
|
||||
'slider': lambda: oled.slider(lock),
|
||||
'switch': lambda: misc.fan_switch(),
|
||||
'reboot': lambda: misc.check_call('reboot'),
|
||||
'poweroff': lambda: misc.check_call('poweroff'),
|
||||
}
|
||||
|
||||
|
||||
def receive_key(q):
|
||||
while True:
|
||||
func = misc.get_func(q.get())
|
||||
action[func]()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
if top_board:
|
||||
oled.welcome()
|
||||
p0 = threading.Thread(target=receive_key, args=(q,), daemon=True)
|
||||
p1 = threading.Thread(target=misc.watch_key, args=(q,), daemon=True)
|
||||
p2 = threading.Thread(target=oled.auto_slider, args=(lock,), daemon=True)
|
||||
p3 = threading.Thread(target=fan.running, daemon=True)
|
||||
|
||||
p0.start()
|
||||
p1.start()
|
||||
p2.start()
|
||||
p3.start()
|
||||
try:
|
||||
p3.join()
|
||||
except KeyboardInterrupt:
|
||||
print("GoodBye ~")
|
||||
oled.goodbye()
|
||||
|
||||
else:
|
||||
p3 = threading.Thread(target=fan.running, daemon=False)
|
||||
p3.start()
|
||||
167
misc.py
Normal file
167
misc.py
Normal file
@@ -0,0 +1,167 @@
|
||||
#!/usr/bin/env python3
|
||||
import re
|
||||
import os
|
||||
import time
|
||||
import subprocess
|
||||
import multiprocessing as mp
|
||||
import traceback
|
||||
|
||||
import gpiod
|
||||
from configparser import ConfigParser
|
||||
from collections import defaultdict, OrderedDict
|
||||
|
||||
cmds = {
|
||||
'blk': "lsblk | awk '{print $1}'",
|
||||
'up': "echo Uptime: `uptime | sed 's/.*up \\([^,]*\\), .*/\\1/'`",
|
||||
'temp': "cat /sys/class/thermal/thermal_zone0/temp",
|
||||
'ip': "hostname -I | awk '{printf \"IP %s\", $1}'",
|
||||
'cpu': "uptime | awk '{printf \"CPU Load: %.2f\", $(NF-2)}'",
|
||||
'men': "free -m | awk 'NR==2{printf \"Mem: %s/%sMB\", $3,$2}'",
|
||||
'disk': "df -h | awk '$NF==\"/\"{printf \"Disk: %d/%dGB %s\", $3,$2,$5}'"
|
||||
}
|
||||
|
||||
lv2dc = OrderedDict({'lv3': 0, 'lv2': 0.25, 'lv1': 0.5, 'lv0': 0.75})
|
||||
|
||||
|
||||
def check_output(cmd):
|
||||
return subprocess.check_output(cmd, shell=True).decode().strip()
|
||||
|
||||
|
||||
def check_call(cmd):
|
||||
return subprocess.check_call(cmd, shell=True)
|
||||
|
||||
|
||||
def get_blk():
|
||||
conf['disk'] = [x for x in check_output(cmds['blk']).strip().split('\n') if x.startswith('sd')]
|
||||
|
||||
|
||||
def get_info(s):
|
||||
return check_output(cmds[s])
|
||||
|
||||
|
||||
def get_cpu_temp():
|
||||
t = float(get_info('temp')) / 1000
|
||||
if conf['oled']['f-temp']:
|
||||
temp = "CPU Temp: {:.0f}°F".format(t * 1.8 + 32)
|
||||
else:
|
||||
temp = "CPU Temp: {:.1f}°C".format(t)
|
||||
return temp
|
||||
|
||||
|
||||
def read_conf():
|
||||
conf = defaultdict(dict)
|
||||
|
||||
try:
|
||||
cfg = ConfigParser()
|
||||
cfg.read('/etc/rockpi-penta.conf')
|
||||
# fan
|
||||
conf['fan']['lv0'] = cfg.getfloat('fan', 'lv0')
|
||||
conf['fan']['lv1'] = cfg.getfloat('fan', 'lv1')
|
||||
conf['fan']['lv2'] = cfg.getfloat('fan', 'lv2')
|
||||
conf['fan']['lv3'] = cfg.getfloat('fan', 'lv3')
|
||||
# key
|
||||
conf['key']['click'] = cfg.get('key', 'click')
|
||||
conf['key']['twice'] = cfg.get('key', 'twice')
|
||||
conf['key']['press'] = cfg.get('key', 'press')
|
||||
# time
|
||||
conf['time']['twice'] = cfg.getfloat('time', 'twice')
|
||||
conf['time']['press'] = cfg.getfloat('time', 'press')
|
||||
# other
|
||||
conf['slider']['auto'] = cfg.getboolean('slider', 'auto')
|
||||
conf['slider']['time'] = cfg.getfloat('slider', 'time')
|
||||
conf['oled']['rotate'] = cfg.getboolean('oled', 'rotate')
|
||||
conf['oled']['f-temp'] = cfg.getboolean('oled', 'f-temp')
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
# fan
|
||||
conf['fan']['lv0'] = 35
|
||||
conf['fan']['lv1'] = 40
|
||||
conf['fan']['lv2'] = 45
|
||||
conf['fan']['lv3'] = 50
|
||||
# key
|
||||
conf['key']['click'] = 'slider'
|
||||
conf['key']['twice'] = 'switch'
|
||||
conf['key']['press'] = 'none'
|
||||
# time
|
||||
conf['time']['twice'] = 0.7 # second
|
||||
conf['time']['press'] = 1.8
|
||||
# other
|
||||
conf['slider']['auto'] = True
|
||||
conf['slider']['time'] = 10 # second
|
||||
conf['oled']['rotate'] = False
|
||||
conf['oled']['f-temp'] = False
|
||||
|
||||
return conf
|
||||
|
||||
|
||||
def read_key(pattern, size):
|
||||
CHIP_NAME = os.environ['BUTTON_CHIP']
|
||||
LINE_NUMBER = os.environ['BUTTON_LINE']
|
||||
|
||||
s = ''
|
||||
chip = gpiod.Chip(str(CHIP_NAME))
|
||||
line = chip.get_line(int(LINE_NUMBER))
|
||||
line.request(consumer='hat_button', type=gpiod.LINE_REQ_DIR_OUT)
|
||||
line.set_value(1)
|
||||
|
||||
while True:
|
||||
s = s[-size:] + str(line.get_value())
|
||||
for t, p in pattern.items():
|
||||
if p.match(s):
|
||||
return t
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
def watch_key(q=None):
|
||||
size = int(conf['time']['press'] * 10)
|
||||
wait = int(conf['time']['twice'] * 10)
|
||||
pattern = {
|
||||
'click': re.compile(r'1+0+1{%d,}' % wait),
|
||||
'twice': re.compile(r'1+0+1+0+1{3,}'),
|
||||
'press': re.compile(r'1+0{%d,}' % size),
|
||||
}
|
||||
|
||||
while True:
|
||||
q.put(read_key(pattern, size))
|
||||
|
||||
|
||||
def get_disk_info(cache={}):
|
||||
if not cache.get('time') or time.time() - cache['time'] > 30:
|
||||
info = {}
|
||||
cmd = "df -h | awk '$NF==\"/\"{printf \"%s\", $5}'"
|
||||
info['root'] = check_output(cmd)
|
||||
for x in conf['disk']:
|
||||
cmd = "df -Bg | awk '$1==\"/dev/{}\" {{printf \"%s\", $5}}'".format(x)
|
||||
info[x] = check_output(cmd)
|
||||
cache['info'] = list(zip(*info.items()))
|
||||
cache['time'] = time.time()
|
||||
|
||||
return cache['info']
|
||||
|
||||
|
||||
def slider_next(pages):
|
||||
conf['idx'].value += 1
|
||||
return pages[conf['idx'].value % len(pages)]
|
||||
|
||||
|
||||
def slider_sleep():
|
||||
time.sleep(conf['slider']['time'])
|
||||
|
||||
|
||||
def fan_temp2dc(t):
|
||||
for lv, dc in lv2dc.items():
|
||||
if t >= conf['fan'][lv]:
|
||||
return dc
|
||||
return 0.999
|
||||
|
||||
|
||||
def fan_switch():
|
||||
conf['run'].value = not conf['run'].value
|
||||
|
||||
|
||||
def get_func(key):
|
||||
return conf['key'].get(key, 'none')
|
||||
|
||||
|
||||
conf = {'disk': [], 'idx': mp.Value('d', -1), 'run': mp.Value('d', 1)}
|
||||
conf.update(read_conf())
|
||||
118
oled.py
Normal file
118
oled.py
Normal file
@@ -0,0 +1,118 @@
|
||||
#!/usr/bin/python3
|
||||
import os
|
||||
import time
|
||||
|
||||
import adafruit_ssd1306
|
||||
import board
|
||||
import digitalio
|
||||
import busio
|
||||
from PIL import Image
|
||||
from PIL import ImageDraw
|
||||
from PIL import ImageFont
|
||||
import multiprocessing as mp
|
||||
|
||||
import misc
|
||||
|
||||
font = {
|
||||
'10': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 10),
|
||||
'11': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 11),
|
||||
'12': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 12),
|
||||
'14': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 14),
|
||||
}
|
||||
|
||||
|
||||
def disp_init():
|
||||
RESET = getattr(board.pin, os.environ['OLED_RESET'])
|
||||
i2c = busio.I2C(getattr(board.pin, os.environ['SCL']), getattr(board.pin, os.environ['SDA']))
|
||||
disp = adafruit_ssd1306.SSD1306_I2C(128, 32, i2c, reset=digitalio.DigitalInOut(RESET))
|
||||
disp.fill(0)
|
||||
disp.show()
|
||||
return disp
|
||||
|
||||
|
||||
disp = disp_init()
|
||||
|
||||
image = Image.new('1', (disp.width, disp.height))
|
||||
draw = ImageDraw.Draw(image)
|
||||
|
||||
|
||||
def disp_show():
|
||||
im = image.rotate(180) if misc.conf['oled']['rotate'] else image
|
||||
disp.image(im)
|
||||
disp.write_framebuf()
|
||||
draw.rectangle((0, 0, disp.width, disp.height), outline=0, fill=0)
|
||||
|
||||
|
||||
def welcome():
|
||||
draw.text((0, 0), 'ROCKPi SATA HAT', font=font['14'], fill=255)
|
||||
draw.text((32, 16), 'Loading...', font=font['12'], fill=255)
|
||||
disp_show()
|
||||
|
||||
|
||||
def goodbye():
|
||||
draw.text((32, 8), 'Good Bye ~', font=font['14'], fill=255)
|
||||
disp_show()
|
||||
time.sleep(2)
|
||||
disp_show() # clear
|
||||
|
||||
|
||||
def put_disk_info():
|
||||
k, v = misc.get_disk_info()
|
||||
text1 = 'Disk: {} {}'.format(k[0], v[0])
|
||||
|
||||
if len(k) == 5:
|
||||
text2 = '{} {} {} {}'.format(k[1], v[1], k[2], v[2])
|
||||
text3 = '{} {} {} {}'.format(k[3], v[3], k[4], v[4])
|
||||
page = [
|
||||
{'xy': (0, -2), 'text': text1, 'fill': 255, 'font': font['11']},
|
||||
{'xy': (0, 10), 'text': text2, 'fill': 255, 'font': font['11']},
|
||||
{'xy': (0, 21), 'text': text3, 'fill': 255, 'font': font['11']},
|
||||
]
|
||||
elif len(k) == 3:
|
||||
text2 = '{} {} {} {}'.format(k[1], v[1], k[2], v[2])
|
||||
page = [
|
||||
{'xy': (0, 2), 'text': text1, 'fill': 255, 'font': font['12']},
|
||||
{'xy': (0, 18), 'text': text2, 'fill': 255, 'font': font['12']},
|
||||
]
|
||||
else:
|
||||
page = [{'xy': (0, 2), 'text': text1, 'fill': 255, 'font': font['14']}]
|
||||
|
||||
return page
|
||||
|
||||
|
||||
def gen_pages():
|
||||
pages = {
|
||||
0: [
|
||||
{'xy': (0, -2), 'text': misc.get_info('up'), 'fill': 255, 'font': font['11']},
|
||||
{'xy': (0, 10), 'text': misc.get_cpu_temp(), 'fill': 255, 'font': font['11']},
|
||||
{'xy': (0, 21), 'text': misc.get_info('ip'), 'fill': 255, 'font': font['11']},
|
||||
],
|
||||
1: [
|
||||
{'xy': (0, 2), 'text': misc.get_info('cpu'), 'fill': 255, 'font': font['12']},
|
||||
{'xy': (0, 18), 'text': misc.get_info('men'), 'fill': 255, 'font': font['12']},
|
||||
],
|
||||
2: put_disk_info()
|
||||
}
|
||||
|
||||
return pages
|
||||
|
||||
|
||||
def slider(lock):
|
||||
with lock:
|
||||
for item in misc.slider_next(gen_pages()):
|
||||
draw.text(**item)
|
||||
disp_show()
|
||||
|
||||
|
||||
def auto_slider(lock):
|
||||
while misc.conf['slider']['auto']:
|
||||
slider(lock)
|
||||
misc.slider_sleep()
|
||||
else:
|
||||
slider(lock)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# for test
|
||||
lock = mp.Lock()
|
||||
auto_slider(lock)
|
||||
Reference in New Issue
Block a user