# -*- coding: utf-8 -*- ######################################## # HXbot LCD module # # EoF 2016 EoF@itphx.ru # ######################################## import HX import threading import time import array import struct class HXlcdClass(): def __init__(self, i2c_bus, lock): super(HXlcdClass, self).__init__() self.i2c_bus = i2c_bus self.__e = None self.__t = None self.lock = lock self.online = False self.is_running = False self.error_count = 0 def start(self): self.__e = threading.Event() self.__t = threading.Thread(target=self.__process, args=(self.__e, )) self.__t.start() self.is_running = True def stop(self): self.__e.set() self.__t.join() self.is_running = False self.__e = None self.__t = None def __process(self, stopRequest): while not stopRequest.is_set(): try: self.__send_command(HX.COM_PING) self.online = True except IOError: if self.online: self.error_count += 1 # Too many errors if self.error_count >= HX.LCD_ERROR_LIMIT: self.online = False # Delay time.sleep(HX.LCD_DELAY) def __sum_check(self, val, sum_): tmp = 0x00 for b in val: tmp ^= b tmp ^= HX.XOR_SEQ if tmp == sum_: return True else: return False def __read_float(self, cmd): # Byte array data = array.array('B', [0, 0, 0, 0]) with self.lock: # Send command self.__write_byte(cmd) time.sleep(HX.LCD_RESPONSE_DELAY) # Get bytes data[0] = self.i2c_bus.read_byte(HX.LCD_ADDRESS) data[1] = self.i2c_bus.read_byte(HX.LCD_ADDRESS) data[2] = self.i2c_bus.read_byte(HX.LCD_ADDRESS) data[3] = self.i2c_bus.read_byte(HX.LCD_ADDRESS) # Get check sum sum_ = self.i2c_bus.read_byte(HX.LCD_ADDRESS) # Check and return value if self.__sum_check(data, sum_): return struct.unpack('f', data)[0] else: return None def __read_byte(self, cmd): with self.lock: # Send command self.__write_byte(cmd) time.sleep(HX.LCD_RESPONSE_DELAY) # Get byte data = self.i2c_bus.read_byte(HX.LCD_ADDRESS) # Get check sum sum_ = self.i2c_bus.read_byte(HX.LCD_ADDRESS) # Check and return value if data ^ HX.XOR_SEQ == sum_: return data else: return None def __write_byte(self, cmd): cmd = (cmd << 8) | (cmd ^ HX.XOR_SEQ) self.i2c_bus.write_word_data(HX.LCD_ADDRESS, HX.EXT_COM, cmd) def send_cmd(self, cmd): try: with self.lock: # Send command self.__write_byte(cmd) time.sleep(HX.LCD_RESPONSE_DELAY) # Get response response = self.i2c_bus.read_byte(HX.LCD_ADDRESS) # Return return response except IOError: return HX.IOE_RSP def __send_command(self, command): try: with self.lock: self.i2c_bus.write_word_data(HX.LCD_ADDRESS, command, 0) time.sleep(HX.LCD_RESPONSE_DELAY) response = self.i2c_bus.read_byte(HX.LCD_ADDRESS) except IOError: if self.online: self.error_count += 1 if self.error_count >= HX.LCD_ERROR_LIMIT: self.online = False return HX.IOE_RSP else: self.error_count = 0 self.online = True return response return HX.NO_RSP