149 lines
3.9 KiB
Python
Executable File
149 lines
3.9 KiB
Python
Executable File
# -*- coding: utf-8 -*-
|
|
|
|
########################################
#           HXbot LCD module           #
#      EoF 2016    EoF@itphx.ru        #
########################################
|
|
|
|
import HX
|
|
import threading
|
|
import time
|
|
import array
|
|
import struct
|
|
|
|
|
|
class HXlcdClass(object):
    """Driver for the HXbot LCD module attached to an I2C bus.

    A background thread (see ``start``/``stop``) periodically sends
    ``HX.COM_PING`` to the device at ``HX.LCD_ADDRESS`` and maintains:

    * ``online``      -- True after a successful exchange; cleared once
                         ``HX.LCD_ERROR_LIMIT`` consecutive I/O errors have
                         been counted while the device was marked online.
    * ``error_count`` -- consecutive I/O errors counted while online; reset
                         to 0 by every successful ping.
    * ``is_running``  -- True between ``start()`` and ``stop()``.

    Every bus transaction is performed while holding ``lock``, which the
    caller supplies (presumably shared with other users of the same bus).
    """

    def __init__(self, i2c_bus, lock):
        """Store the bus handle and lock; no I/O is performed here.

        i2c_bus -- object providing ``write_word_data(addr, reg, value)`` and
                   ``read_byte(addr)`` (looks like an SMBus handle; confirm
                   against the caller).
        lock    -- context manager guarding access to ``i2c_bus``.
        """
        super(HXlcdClass, self).__init__()

        self.i2c_bus = i2c_bus
        self.lock = lock

        # Stop event and worker thread; both exist only while polling runs.
        self.__e = None
        self.__t = None

        self.online = False
        self.is_running = False
        self.error_count = 0

    def start(self):
        """Start the background ping thread; a no-op if already started."""
        if self.__t is not None:
            # A second start() would orphan the first worker thread.
            return

        self.__e = threading.Event()
        self.__t = threading.Thread(target=self.__process, args=(self.__e, ))

        self.__t.start()
        self.is_running = True

    def stop(self):
        """Stop the ping thread and wait for it; a no-op if never started."""
        if self.__e is None or self.__t is None:
            # Nothing to stop; previously this raised AttributeError.
            return

        self.__e.set()
        self.__t.join()
        self.is_running = False

        self.__e = None
        self.__t = None

    def __process(self, stopRequest):
        """Thread body: ping the device until ``stopRequest`` is set."""
        while not stopRequest.is_set():
            # __send_command catches IOError itself and updates online /
            # error_count. Overriding ``online`` here after the call would
            # force it back to True after every failed ping, so the flag is
            # deliberately left alone.
            self.__send_command(HX.COM_PING)

            # Pace the loop, but wake up immediately when stop() is called.
            stopRequest.wait(HX.LCD_DELAY)

    def __sum_check(self, val, sum_):
        """Return True if XOR of all bytes in ``val`` and HX.XOR_SEQ equals ``sum_``."""
        tmp = 0x00
        for b in val:
            tmp ^= b
        tmp ^= HX.XOR_SEQ
        return tmp == sum_

    def __read_float(self, cmd):
        """Send ``cmd`` and read back a checksummed 4-byte float.

        Returns the decoded float, or None when the checksum does not match.
        The bytes are decoded with struct format ``'f'`` (native byte order).
        I/O errors raised by the bus are not caught here.
        """
        data = array.array('B', [0, 0, 0, 0])
        with self.lock:
            # Send command and give the device time to prepare the answer
            self.__write_byte(cmd)
            time.sleep(HX.LCD_RESPONSE_DELAY)

            # Payload bytes, then the check sum
            for i in range(4):
                data[i] = self.i2c_bus.read_byte(HX.LCD_ADDRESS)
            sum_ = self.i2c_bus.read_byte(HX.LCD_ADDRESS)

        if self.__sum_check(data, sum_):
            return struct.unpack('f', data)[0]
        return None

    def __read_byte(self, cmd):
        """Send ``cmd`` and read back one checksummed byte.

        Returns the byte, or None when ``byte ^ HX.XOR_SEQ`` differs from the
        check sum read after it. I/O errors raised by the bus are not caught.
        """
        with self.lock:
            # Send command and give the device time to prepare the answer
            self.__write_byte(cmd)
            time.sleep(HX.LCD_RESPONSE_DELAY)

            # Payload byte, then the check sum
            data = self.i2c_bus.read_byte(HX.LCD_ADDRESS)
            sum_ = self.i2c_bus.read_byte(HX.LCD_ADDRESS)

        # ``^`` binds tighter than ``==``; parenthesised for readability.
        if (data ^ HX.XOR_SEQ) == sum_:
            return data
        return None

    def __write_byte(self, cmd):
        """Write ``cmd`` and its XOR check byte as one word to register HX.EXT_COM.

        The caller is expected to hold ``self.lock``.
        """
        # High byte: the command; low byte: command XOR HX.XOR_SEQ.
        word = (cmd << 8) | (cmd ^ HX.XOR_SEQ)
        self.i2c_bus.write_word_data(HX.LCD_ADDRESS, HX.EXT_COM, word)

    def send_cmd(self, cmd):
        """Send an extended command and return the device's one-byte response.

        Returns ``HX.IOE_RSP`` if the bus raises IOError. Unlike the ping
        path, this does not touch ``online`` or ``error_count``.
        """
        try:
            with self.lock:
                # Send command
                self.__write_byte(cmd)
                time.sleep(HX.LCD_RESPONSE_DELAY)

                # Get response
                return self.i2c_bus.read_byte(HX.LCD_ADDRESS)
        except IOError:
            return HX.IOE_RSP

    def __send_command(self, command):
        """Send a plain command (word data 0) and return the response byte.

        On IOError: while the device is marked online, count the error and
        mark the device offline once ``HX.LCD_ERROR_LIMIT`` is reached; then
        return ``HX.IOE_RSP``. On success: reset the error counter, mark the
        device online and return the byte read.
        """
        try:
            with self.lock:
                self.i2c_bus.write_word_data(HX.LCD_ADDRESS, command, 0)
                time.sleep(HX.LCD_RESPONSE_DELAY)
                response = self.i2c_bus.read_byte(HX.LCD_ADDRESS)
        except IOError:
            if self.online:
                self.error_count += 1
                if self.error_count >= HX.LCD_ERROR_LIMIT:
                    self.online = False
            return HX.IOE_RSP

        self.error_count = 0
        self.online = True
        return response