From 1461c39400bf59de2e91b13c190c54fa2f77ae3c Mon Sep 17 00:00:00 2001 From: Dejvino Date: Sat, 22 Apr 2023 19:14:21 +0200 Subject: [PATCH] SDCard data loading --- Makefile | 11 ++ display.py | 124 ++++++++++++++++++++++ main.py | 52 +++++++++ sdcard.py | 307 +++++++++++++++++++++++++++++++++++++++++++++++++++++ storage.py | 47 ++++++++ test.py | 68 +++++++++--- 6 files changed, 595 insertions(+), 14 deletions(-) create mode 100644 Makefile create mode 100644 display.py create mode 100644 main.py create mode 100644 sdcard.py create mode 100644 storage.py diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..7bf37fc --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ + +all: images download + +images: + cd gallery && for F in *.jpg; do ./convert_image.sh $F $F.bmp + +download: jokes.json + +jokes.json: + curl -o jokes.json https://github.com/wiz64/superfun/raw/main/database/ichd/jokes-file.json + diff --git a/display.py b/display.py new file mode 100644 index 0000000..a3ef00d --- /dev/null +++ b/display.py @@ -0,0 +1,124 @@ +import epaper +import microbmp +import time +import random +import os +import storage +import gc + +epd_colormap = [ + [0x00, 0x00, 0x00], # black + [0xff, 0xff, 0xff], # white + [0x00, 0x90, 0x10], # green + [0x00, 0x00, 0xee], # blue + [0xff, 0x00, 0x00], # red + [0xff, 0xdd, 0x00], # yellow + [0xff, 0x77, 0x00], # orange + ] + +max_free_height = 200 + +def init_display(): + #print("ePaper init ", str(time.localtime())) + epd = epaper.EPD_5in65() + epd.fill(epd.White) + return epd + +def draw_image(epd, filename): + global epd_colormap + global free_x, free_y, free_width, free_height + offset_x = 0 + offset_y = 0 + free_x = 0 + free_y = 0 + free_width = 0 + free_height = 0 + color_map = {} + def header_callback(header): + #print("header callback: ", str(header)) + #global epd + global offset_x, offset_y + global free_x, free_y, free_width, free_height + global max_free_height + w = header[0] + h = header[1] + rest_x = (epd.width - w) + rest_y = (epd.height - h) + free_x = 0 + free_y = max(h, epd.height - max_free_height) + free_width = epd.width + free_height = min(rest_y, max_free_height) + offset_x = rest_x//2 + offset_y = max(0, rest_y - max_free_height)//2 + def pixel_callback(x, y, color): + #global epd + global epd_colormap + global offset_x, offset_y + # translate color to color index + color_index = 0 + color_key = color[0] + color[1] << 8 + color[2] << 16 + if color_key in color_map: + color_index = color_map[color_key] + else: + # search for the best color + best_index = 0 + best_score = 1000 + for index in range(len(epd_colormap)): + c1 = epd_colormap[index] + c2 = color + score = abs(c1[0] - c2[0]) + abs(c1[1] - c2[1]) + abs(c1[2] - c2[2]) + if score < best_score: + best_score = score + best_index = index + if score < 10: + break + color_index = best_index + color_map[color_key] = color_index + epd.pixel(offset_x + x, offset_y + y, color_index) + + #print(str(time.localtime()), " BMP ", filename, " loading") + time_start = time.ticks_ms() + epd.fill(epd.White) + microbmp.MicroBMP(header_callback=header_callback, data_callback=pixel_callback).load(filename) + time_loaded = time.ticks_ms() + print(" time to load: ", (time_loaded - time_start) / 1000, " s") + #print(str(time.localtime()), " BMP loaded") + #epd.EPD_5IN65F_Display(epd.buffer) + #print(str(time.localtime()), " ePaper printed") + return [free_x, free_y, free_width, free_height] + +def draw_pattern(epd): + free_x = 0 + free_y = 400 + free_width = epd.width + free_height = epd.height - free_y + epd.fill_rect(0, 0, epd.width, free_y, random.randrange(2,7)) + return [free_x, free_y, free_width, free_height] + +def print_text(epd, text, region, center=False): + fnt = 8 + x = region[0] + y = region[1] + w = region[2] + h = region[3] + if center: + x = (w - len(text)*fnt) // 2 + epd.text(text, x, y, epd.Black) + +def draw_extra(epd, region, caption): + fnt = 8 + region[0] += 5 + region[1] += 8 + if region[2] < fnt or region[3] < fnt: + print("Not enough space for extra: ", str(region)) + return + #epd.rect(region[0], region[1], region[2], region[3], epd.Black) + #region[0] += random.randrange(50) + #print_text(str(time.localtime()), region, center=True) + if caption is not None: + #region[1] += fnt * 2 + print_text(epd, caption, region) + print("Caption: ", caption) + else: + print("No caption") + \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..a41c3d5 --- /dev/null +++ b/main.py @@ -0,0 +1,52 @@ +import epaper +import microbmp +import time +import random +import os +import storage +import display +import gc + +gc.enable() + +epd = display.init_display() +disk = storage.Storage() + +while True: + disk.mount() + images = list(filter(lambda x : x.endswith(".bmp"), os.listdir(disk.get_root_path()))) + epd.EPD_5IN65F_Init() + epd.fill(epd.White) + filename = random.choice(images) + print("TV drawing image ", filename) + free_space = [0,0,0,0] + try: + free_space = display.draw_image(epd, disk.get_root_path() + "/" + filename) + except Exception as e: + print("Failed drawing image from disk: ", e) + try: + free_space = display.draw_image(epd, "tiny.bmp") + except Exception as e: + print("Failed drawing fallback image: ", e) + free_space = display.draw_pattern(epd) + caption = None + try: + caption = storage.load_joke(disk.get_root_path()) + except Exception as e: + print("Failed loading a joke: ", e) + try: + display.draw_extra(epd, free_space, caption) + except Exception as e: + print("Failed drawing extra: ", e) + time_render_start = time.ticks_ms() + epd.EPD_5IN65F_Display(epd.buffer) + time_render_stop = time.ticks_ms() + print(" time to render: ", (time_render_stop - time_render_start) / 1000, " s") + print("TV showing ", filename) + epd.Sleep() + disk.umount() + + print("") + gc.collect() + epd.delay_ms(10000 * 1) + diff --git a/sdcard.py b/sdcard.py new file mode 100644 index 0000000..f723d1f --- /dev/null +++ b/sdcard.py @@ -0,0 +1,307 @@ +""" +MicroPython driver for SD cards using SPI bus. + +Requires an SPI bus and a CS pin. Provides readblocks and writeblocks +methods so the device can be mounted as a filesystem. + +Example usage on pyboard: + + import pyb, sdcard, os + sd = sdcard.SDCard(pyb.SPI(1), pyb.Pin.board.X5) + pyb.mount(sd, '/sd2') + os.listdir('/') + +Example usage on ESP8266: + + import machine, sdcard, os + sd = sdcard.SDCard(machine.SPI(1), machine.Pin(15)) + os.mount(sd, '/sd') + os.listdir('/') + +S.ource: https://raw.githubusercontent.com/micropython/micropython-lib/master/micropython/drivers/storage/sdcard/sdcard.py +""" + +from micropython import const +import time + + +_CMD_TIMEOUT = const(100) + +_R1_IDLE_STATE = const(1 << 0) +# R1_ERASE_RESET = const(1 << 1) +_R1_ILLEGAL_COMMAND = const(1 << 2) +# R1_COM_CRC_ERROR = const(1 << 3) +# R1_ERASE_SEQUENCE_ERROR = const(1 << 4) +# R1_ADDRESS_ERROR = const(1 << 5) +# R1_PARAMETER_ERROR = const(1 << 6) +_TOKEN_CMD25 = const(0xFC) +_TOKEN_STOP_TRAN = const(0xFD) +_TOKEN_DATA = const(0xFE) + + +class SDCard: + def __init__(self, spi, cs, baudrate=1320000): + self.spi = spi + self.cs = cs + + self.cmdbuf = bytearray(6) + self.dummybuf = bytearray(512) + self.tokenbuf = bytearray(1) + for i in range(512): + self.dummybuf[i] = 0xFF + self.dummybuf_memoryview = memoryview(self.dummybuf) + + # initialise the card + self.init_card(baudrate) + + def init_spi(self, baudrate): + try: + master = self.spi.MASTER + except AttributeError: + # on ESP8266 + self.spi.init(baudrate=baudrate, phase=0, polarity=0) + else: + # on pyboard + self.spi.init(master, baudrate=baudrate, phase=0, polarity=0) + + def init_card(self, baudrate): + # init CS pin + self.cs.init(self.cs.OUT, value=1) + + # init SPI bus; use low data rate for initialisation + self.init_spi(100000) + + # clock card at least 100 cycles with cs high + for i in range(16): + self.spi.write(b"\xff") + + # CMD0: init card; should return _R1_IDLE_STATE (allow 5 attempts) + for _ in range(5): + if self.cmd(0, 0, 0x95) == _R1_IDLE_STATE: + break + else: + raise OSError("no SD card") + + # CMD8: determine card version + r = self.cmd(8, 0x01AA, 0x87, 4) + if r == _R1_IDLE_STATE: + self.init_card_v2() + elif r == (_R1_IDLE_STATE | _R1_ILLEGAL_COMMAND): + self.init_card_v1() + else: + raise OSError("couldn't determine SD card version") + + # get the number of sectors + # CMD9: response R2 (R1 byte + 16-byte block read) + if self.cmd(9, 0, 0, 0, False) != 0: + raise OSError("no response from SD card") + csd = bytearray(16) + self.readinto(csd) + if csd[0] & 0xC0 == 0x40: # CSD version 2.0 + self.sectors = ((csd[8] << 8 | csd[9]) + 1) * 1024 + elif csd[0] & 0xC0 == 0x00: # CSD version 1.0 (old, <=2GB) + c_size = (csd[6] & 0b11) << 10 | csd[7] << 2 | csd[8] >> 6 + c_size_mult = (csd[9] & 0b11) << 1 | csd[10] >> 7 + read_bl_len = csd[5] & 0b1111 + capacity = (c_size + 1) * (2 ** (c_size_mult + 2)) * (2**read_bl_len) + self.sectors = capacity // 512 + else: + raise OSError("SD card CSD format not supported") + # print('sectors', self.sectors) + + # CMD16: set block length to 512 bytes + if self.cmd(16, 512, 0) != 0: + raise OSError("can't set 512 block size") + + # set to high data rate now that it's initialised + self.init_spi(baudrate) + + def init_card_v1(self): + for i in range(_CMD_TIMEOUT): + time.sleep_ms(50) + self.cmd(55, 0, 0) + if self.cmd(41, 0, 0) == 0: + # SDSC card, uses byte addressing in read/write/erase commands + self.cdv = 512 + # print("[SDCard] v1 card") + return + raise OSError("timeout waiting for v1 card") + + def init_card_v2(self): + for i in range(_CMD_TIMEOUT): + time.sleep_ms(50) + self.cmd(58, 0, 0, 4) + self.cmd(55, 0, 0) + if self.cmd(41, 0x40000000, 0) == 0: + self.cmd(58, 0, 0, -4) # 4-byte response, negative means keep the first byte + ocr = self.tokenbuf[0] # get first byte of response, which is OCR + if not ocr & 0x40: + # SDSC card, uses byte addressing in read/write/erase commands + self.cdv = 512 + else: + # SDHC/SDXC card, uses block addressing in read/write/erase commands + self.cdv = 1 + # print("[SDCard] v2 card") + return + raise OSError("timeout waiting for v2 card") + + def cmd(self, cmd, arg, crc, final=0, release=True, skip1=False): + self.cs(0) + + # create and send the command + buf = self.cmdbuf + buf[0] = 0x40 | cmd + buf[1] = arg >> 24 + buf[2] = arg >> 16 + buf[3] = arg >> 8 + buf[4] = arg + buf[5] = crc + self.spi.write(buf) + + if skip1: + self.spi.readinto(self.tokenbuf, 0xFF) + + # wait for the response (response[7] == 0) + for i in range(_CMD_TIMEOUT): + self.spi.readinto(self.tokenbuf, 0xFF) + response = self.tokenbuf[0] + if not (response & 0x80): + # this could be a big-endian integer that we are getting here + # if final<0 then store the first byte to tokenbuf and discard the rest + if final < 0: + self.spi.readinto(self.tokenbuf, 0xFF) + final = -1 - final + for j in range(final): + self.spi.write(b"\xff") + if release: + self.cs(1) + self.spi.write(b"\xff") + return response + + # timeout + self.cs(1) + self.spi.write(b"\xff") + return -1 + + def readinto(self, buf): + self.cs(0) + + # read until start byte (0xff) + for i in range(_CMD_TIMEOUT): + self.spi.readinto(self.tokenbuf, 0xFF) + if self.tokenbuf[0] == _TOKEN_DATA: + break + time.sleep_ms(1) + else: + self.cs(1) + raise OSError("timeout waiting for response") + + # read data + mv = self.dummybuf_memoryview + if len(buf) != len(mv): + mv = mv[: len(buf)] + self.spi.write_readinto(mv, buf) + + # read checksum + self.spi.write(b"\xff") + self.spi.write(b"\xff") + + self.cs(1) + self.spi.write(b"\xff") + + def write(self, token, buf): + self.cs(0) + + # send: start of block, data, checksum + self.spi.read(1, token) + self.spi.write(buf) + self.spi.write(b"\xff") + self.spi.write(b"\xff") + + # check the response + if (self.spi.read(1, 0xFF)[0] & 0x1F) != 0x05: + self.cs(1) + self.spi.write(b"\xff") + return + + # wait for write to finish + while self.spi.read(1, 0xFF)[0] == 0: + pass + + self.cs(1) + self.spi.write(b"\xff") + + def write_token(self, token): + self.cs(0) + self.spi.read(1, token) + self.spi.write(b"\xff") + # wait for write to finish + while self.spi.read(1, 0xFF)[0] == 0x00: + pass + + self.cs(1) + self.spi.write(b"\xff") + + def readblocks(self, block_num, buf): + # workaround for shared bus, required for (at least) some Kingston + # devices, ensure MOSI is high before starting transaction + self.spi.write(b"\xff") + + nblocks = len(buf) // 512 + assert nblocks and not len(buf) % 512, "Buffer length is invalid" + if nblocks == 1: + # CMD17: set read address for single block + if self.cmd(17, block_num * self.cdv, 0, release=False) != 0: + # release the card + self.cs(1) + raise OSError(5) # EIO + # receive the data and release card + self.readinto(buf) + else: + # CMD18: set read address for multiple blocks + if self.cmd(18, block_num * self.cdv, 0, release=False) != 0: + # release the card + self.cs(1) + raise OSError(5) # EIO + offset = 0 + mv = memoryview(buf) + while nblocks: + # receive the data and release card + self.readinto(mv[offset : offset + 512]) + offset += 512 + nblocks -= 1 + if self.cmd(12, 0, 0xFF, skip1=True): + raise OSError(5) # EIO + + def writeblocks(self, block_num, buf): + # workaround for shared bus, required for (at least) some Kingston + # devices, ensure MOSI is high before starting transaction + self.spi.write(b"\xff") + + nblocks, err = divmod(len(buf), 512) + assert nblocks and not err, "Buffer length is invalid" + if nblocks == 1: + # CMD24: set write address for single block + if self.cmd(24, block_num * self.cdv, 0) != 0: + raise OSError(5) # EIO + + # send the data + self.write(_TOKEN_DATA, buf) + else: + # CMD25: set write address for first block + if self.cmd(25, block_num * self.cdv, 0) != 0: + raise OSError(5) # EIO + # send the data + offset = 0 + mv = memoryview(buf) + while nblocks: + self.write(_TOKEN_CMD25, mv[offset : offset + 512]) + offset += 512 + nblocks -= 1 + self.write_token(_TOKEN_STOP_TRAN) + + def ioctl(self, op, arg): + if op == 4: # get number of blocks + return self.sectors + if op == 5: # get block size in bytes + return 512 diff --git a/storage.py b/storage.py new file mode 100644 index 0000000..50a97b4 --- /dev/null +++ b/storage.py @@ -0,0 +1,47 @@ +import machine +import sdcard +import uos +import random + +class Storage: + def __init__(self): + self.cs = machine.Pin(17, machine.Pin.OUT) + self.spi = machine.SPI(0, + baudrate=1000000, + polarity=0, + phase=0, + bits=8, + firstbit=machine.SPI.MSB, + sck=machine.Pin(18), + mosi=machine.Pin(19), + miso=machine.Pin(16)) + self.sd = None + self.vfs = None + self.sd_path = "/sd" + def mount(self): + self.sd = sdcard.SDCard(self.spi, self.cs) + self.vfs = uos.VfsFat(self.sd) + uos.mount(self.vfs, self.sd_path) + def umount(self): + uos.umount(self.sd_path) + def get_root_path(self): + return self.sd_path + + +def load_joke(root): + filename = root + "/jokes.json" + num_lines = sum(1 for line in open(filename)) + with open(filename, mode="r") as f: + joke = None + start_pattern = "\"joke\": \"" + end_pattern = "\"" + for i in range(random.randrange(num_lines)): + line = f.readline() + for i in range(10): + line = f.readline() + joke_start = line.find(start_pattern) + joke_end = line.rfind(end_pattern) + if joke_start >= 0 and joke_end >= 0: + joke = line[joke_start+len(start_pattern):joke_end] + return joke + return None diff --git a/test.py b/test.py index af9da70..07d53e8 100644 --- a/test.py +++ b/test.py @@ -3,6 +3,7 @@ import microbmp import time import random import os +import storage import gc gc.enable() @@ -18,7 +19,6 @@ epd_colormap = [ [0xff, 0x77, 0x00], # orange ] -images = list(filter(lambda x : x.endswith(".bmp"), os.listdir())) max_free_height = 200 def init_display(): @@ -107,6 +107,25 @@ def draw_pattern(): epd.fill_rect(0, 0, epd_resolution[0], free_y, random.randrange(2,7)) return [free_x, free_y, free_width, free_height] +def load_joke(root): + filename = root + "/jokes.json" + num_lines = sum(1 for line in open(filename)) + with open(filename, mode="r") as f: + joke = None + start_pattern = "\"joke\": \"" + end_pattern = "\"" + for i in range(random.randrange(num_lines)): + line = f.readline() + for i in range(10): + line = f.readline() + joke_start = line.find(start_pattern) + joke_end = line.rfind(end_pattern) + if joke_start >= 0 and joke_end >= 0: + joke = line[joke_start+len(start_pattern):joke_end] + return joke + return None + + def print_text(text, region, center=False): global epd fnt = 8 @@ -118,40 +137,61 @@ def print_text(text, region, center=False): x = (w - len(text)*fnt) // 2 epd.text(text, x, y, epd.Black) -def draw_extra(region): +def draw_extra(region, caption): fnt = 8 region[0] += 5 - region[1] += 5 + region[1] += 8 if region[2] < fnt or region[3] < fnt: print("Not enough space for extra: ", str(region)) return #epd.rect(region[0], region[1], region[2], region[3], epd.Black) #region[0] += random.randrange(50) - print_text(str(time.localtime()), region, center=True) - region[1] += fnt * 3 - print_text(" Today: Rainy, 24 C", region) - - region[1] += fnt * 2 - print_text("Tomorrow: Sunshine, 26 C", region) + #print_text(str(time.localtime()), region, center=True) + if caption is not None: + #region[1] += fnt * 2 + print_text(caption, region) + print("Caption: ", caption) + else: + print("No caption") # MAIN epd = init_display() - -while True: +disk = storage.Storage() + +while True: + disk.mount() + images = list(filter(lambda x : x.endswith(".bmp"), os.listdir(disk.get_root_path()))) epd.EPD_5IN65F_Init() epd.fill(epd.White) filename = random.choice(images) print("TV drawing image ", filename) - free_space = draw_image(filename) - #free_space = draw_pattern() - draw_extra(free_space) + free_space = [0,0,0,0] + try: + free_space = draw_image(disk.get_root_path() + "/" + filename) + except Exception as e: + print("Failed drawing image from disk: ", e) + try: + free_space = draw_image("tiny.bmp") + except Exception as e: + print("Failed drawing fallback image: ", e) + free_space = draw_pattern() + caption = None + try: + caption = load_joke(disk.get_root_path()) + except Exception as e: + print("Failed loading a joke: ", e) + try: + draw_extra(free_space, caption) + except Exception as e: + print("Failed drawing extra: ", e) time_render_start = time.ticks_ms() epd.EPD_5IN65F_Display(epd.buffer) time_render_stop = time.ticks_ms() print(" time to render: ", (time_render_stop - time_render_start) / 1000, " s") print("TV showing ", filename) epd.Sleep() + disk.umount() print("") gc.collect()