# -*- coding: utf-8 -*-
########################################
# HXbot power control module           #
# EoF 2016 EoF@itphx.ru                #
########################################

import HX
import threading
import time
import array
import struct


class HXpowerClass():
    """Background poller and command front-end for the HXpower board.

    A worker thread repeatedly reads three status bytes and eleven float
    measurements from the device at ``HX.POWER_ADDRESS`` on the supplied
    I2C bus, and publishes the decoded result as plain instance attributes
    (``vcc``, ``vcc_ok``, ``vcc_low``, ``bt_enabled``, ...).  The
    ``enable_*`` / ``disable_*`` methods send single-byte commands to the
    same device.

    Every bus transaction is performed while holding ``lock``.
    """

    def __init__(self, i2c_bus, lock):
        """Store the bus and lock and reset all published state.

        :param i2c_bus: bus object providing ``read_byte(addr)`` and
            ``write_word_data(addr, reg, value)``.
        :param lock: context manager held around each bus transaction.
        """
        super(HXpowerClass, self).__init__()
        self.i2c_bus = i2c_bus
        self.__e = None  # stop-request Event of the worker thread
        self.__t = None  # worker Thread, None while stopped
        self.lock = lock

        # MV
        self.__mv_enable_time = 0.0
        self.mv_blocked = True

        # HXpower data
        self.online = False
        self.is_running = False
        self.error_count = 0
        self.status = 0x000000

        # Last measurements read from the device
        self.vcc = 0.0
        self.vin = 0.0
        self.vdc = 0.0
        self.vbt = 0.0
        self.vba = 0.0
        self.vpf = 0.0
        self.vzu = 0.0
        self.abt = 0.0
        self.aba = 0.0
        self.ain = 0.0
        self.ahx = 0.0

        # Flags decoded from the status word
        self.vcc_ok = False
        self.vin_ok = False
        self.vdc_ok = False
        self.vbt_ok = False
        self.vba_ok = False
        self.vpf_ok = False
        self.vzu_ok = False
        self.abt_ok = False
        self.aba_ok = False
        self.ain_ok = False
        self.ahx_ok = False
        self.bt_enabled = False
        self.ba_enabled = False
        self.bt_blocked = False
        self.ba_blocked = False

        # Flags computed locally by comparing measurements with HX limits
        self.vcc_low = False
        self.vcc_high = False
        self.vin_low = False
        self.vin_high = False
        self.vdc_low = False
        self.vdc_high = False
        self.vbt_low = False
        self.vbt_full = False
        self.vba_low = False
        self.vba_full = False
        self.vpf_low = False
        self.vpf_high = False
        self.vzu_low = False
        self.vzu_high = False
        self.abt_low = False
        self.abt_high = False
        self.aba_low = False
        self.aba_high = False
        self.ain_low = False
        self.ain_high = False
        self.ahx_low = False
        self.ahx_high = False

        # More flags decoded from the status word
        self.mv_enabled = False
        self.us_enabled = False
        self.in_plugged = False
        self.ba_full = False
        self.bt_full = False

    def start(self):
        """Start the polling thread; a no-op if it is already running."""
        if self.__t is not None:
            # A second thread would be orphaned (stop() only joins one).
            return
        self.__e = threading.Event()
        self.__t = threading.Thread(target=self.__process,
                                    args=(self.__e, ))
        self.__t.start()
        self.is_running = True

    def stop(self):
        """Ask the polling thread to exit and wait for it to finish.

        Safe to call when the poller was never started.
        """
        if self.__t is None:
            return
        self.__e.set()
        self.__t.join()
        self.is_running = False
        self.__e = None
        self.__t = None

    def __process(self, stopRequest):
        """Polling loop run by the worker thread until *stopRequest* is set.

        :param stopRequest: ``threading.Event`` that ends the loop.
        """
        # (command, left shift) for each byte of the 24-bit status word.
        status_cmds = (
            (HX.COM_GET_STAT1, 8),
            (HX.COM_GET_STAT2, 0),
            (HX.COM_GET_STAT3, 16),
        )
        # (attribute, command) for every float measurement.
        readings = (
            ('vcc', HX.COM_GET_VCC),
            ('vin', HX.COM_GET_VIN),
            ('vdc', HX.COM_GET_VDC),
            ('vbt', HX.COM_GET_VBT),
            ('vba', HX.COM_GET_VBA),
            ('vpf', HX.COM_GET_VPF),
            ('vzu', HX.COM_GET_VZU),
            ('abt', HX.COM_GET_ABT),
            ('aba', HX.COM_GET_ABA),
            ('ain', HX.COM_GET_AIN),
            ('ahx', HX.COM_GET_AHX),
        )
        # Already-shifted status bytes.  They persist across iterations so
        # a byte whose read fails its checksum keeps its previous value.
        stat = [0x00, 0x00, 0x00]

        # Process loop
        while not stopRequest.is_set():
            try:
                self.status = 0x0000
                status_ok = True

                # Get status bytes
                for idx, (cmd, shift) in enumerate(status_cmds):
                    tmp = self.__read_byte(cmd)
                    if tmp is not None:
                        stat[idx] = tmp << shift
                    else:
                        status_ok = False
                        if self.online:
                            self.error_count += 1

                self.status |= stat[0]
                self.status |= stat[1]
                self.status |= stat[2]

                # Get voltages and currents; a failed checksum keeps the
                # previously stored value.
                for attr, cmd in readings:
                    tmp = self.__read_float(cmd)
                    if tmp is not None:
                        setattr(self, attr, tmp)

                # Only a cycle whose status reads all passed their checksum
                # proves the link healthy.  Resetting unconditionally would
                # wipe the errors counted above before they could ever
                # reach HX.POWER_ERROR_LIMIT.
                if status_ok:
                    self.online = True
                    self.error_count = 0

            except IOError:
                if self.online:
                    self.error_count += 1

            self.__decode_status()
            self.__check_limits()

            # Too many errors
            if self.error_count >= HX.POWER_ERROR_LIMIT:
                self.online = False

            # Unblock PF
            # time.clock() was removed in Python 3.8; process_time() is its
            # documented replacement and, like clock() on Unix, measures
            # CPU time.
            # NOTE(review): this is CPU time, not wall-clock time;
            # presumably the "/ 100" scaling compensates - confirm before
            # switching to time.monotonic().
            if (self.mv_enabled and
                    time.process_time() - self.__mv_enable_time >=
                    HX.POWER_MV_TIMEOUT / 100):
                self.mv_blocked = False

            # Delay; waiting on the event lets stop() return promptly
            # instead of blocking for a whole polling period.
            stopRequest.wait(HX.POWER_DELAY)

    def __decode_status(self):
        """Publish each bit of ``self.status`` as a boolean attribute."""
        status = self.status

        # Bits 0..7 (status byte 2)
        self.bt_blocked = bool(status & 0x0001)
        self.ba_blocked = bool(status & 0x0002)
        self.bt_full = bool(status & 0x0004)
        self.ba_full = bool(status & 0x0008)
        self.vpf_ok = bool(status & 0x0010)
        self.vzu_ok = bool(status & 0x0020)
        self.mv_enabled = bool(status & 0x0040)
        self.us_enabled = bool(status & 0x0080)

        # Bits 8..15 (status byte 1)
        self.vcc_ok = bool(status & 0x0100)
        self.vin_ok = bool(status & 0x0200)
        self.vdc_ok = bool(status & 0x0400)
        self.vbt_ok = bool(status & 0x0800)
        self.vba_ok = bool(status & 0x1000)
        self.bt_enabled = bool(status & 0x2000)
        self.ba_enabled = bool(status & 0x4000)
        self.in_plugged = bool(status & 0x8000)

        # Bits 16..19 (status byte 3)
        self.abt_ok = bool(status & 0x010000)
        self.aba_ok = bool(status & 0x020000)
        self.ain_ok = bool(status & 0x040000)
        self.ahx_ok = bool(status & 0x080000)

    def __check_limits(self):
        """Compare the last measurements with the limits defined in HX."""
        # LOW | HIGH
        self.vcc_low = self.vcc < HX.VCC_LOW
        self.vcc_high = self.vcc > HX.VCC_HIGH
        self.vin_low = self.vin < HX.VIN_LOW
        self.vin_high = self.vin > HX.VIN_HIGH
        self.vdc_low = self.vdc < HX.VDC_LOW
        self.vdc_high = self.vdc > HX.VDC_HIGH
        self.vbt_low = self.vbt < HX.VBT_LOW
        self.vbt_full = self.vbt > HX.VBT_FULL
        self.vba_low = self.vba < HX.VBA_LOW
        self.vba_full = self.vba > HX.VBA_FULL
        self.vpf_low = self.vpf < HX.VPF_LOW
        self.vpf_high = self.vpf > HX.VPF_HIGH
        self.vzu_low = self.vzu < HX.VZU_LOW
        self.vzu_high = self.vzu > HX.VZU_HIGH
        self.abt_low = self.abt < HX.ABT_LOW
        self.abt_high = self.abt > HX.ABT_HIGH
        self.aba_low = self.aba < HX.ABA_LOW
        self.aba_high = self.aba > HX.ABA_HIGH
        self.ain_low = self.ain < HX.AIN_LOW
        self.ain_high = self.ain > HX.AIN_HIGH
        self.ahx_low = self.ahx < HX.AHX_LOW
        self.ahx_high = self.ahx > HX.AHX_HIGH

    def __sum_check(self, val, sum_):
        """Return True if *sum_* is the XOR of all bytes in *val* and
        ``HX.XOR_SEQ``.
        """
        checksum = HX.XOR_SEQ
        for b in val:
            checksum ^= b
        return checksum == sum_

    def __read_float(self, cmd):
        """Send *cmd* and read back a 4-byte float followed by a checksum.

        :returns: the decoded float, or ``None`` on checksum mismatch.
        :raises IOError: propagated from the bus (not caught here).
        """
        # Byte array
        data = array.array('B', [0, 0, 0, 0])

        with self.lock:
            # Send command
            self.__write_byte(cmd)
            time.sleep(HX.POWER_RESPONSE_DELAY)

            # Get bytes
            for i in range(4):
                data[i] = self.i2c_bus.read_byte(HX.POWER_ADDRESS)
            # Get check sum
            sum_ = self.i2c_bus.read_byte(HX.POWER_ADDRESS)

        # Check and return value
        if not self.__sum_check(data, sum_):
            return None
        # 'f' decodes the four bytes in native byte order.
        return struct.unpack('f', data)[0]

    def __read_byte(self, cmd):
        """Send *cmd* and read back one byte followed by a checksum.

        :returns: the byte, or ``None`` on checksum mismatch.
        :raises IOError: propagated from the bus (not caught here).
        """
        with self.lock:
            # Send command
            self.__write_byte(cmd)
            time.sleep(HX.POWER_RESPONSE_DELAY)

            # Get byte
            data = self.i2c_bus.read_byte(HX.POWER_ADDRESS)
            # Get check sum
            sum_ = self.i2c_bus.read_byte(HX.POWER_ADDRESS)

        # Check and return value
        if (data ^ HX.XOR_SEQ) == sum_:
            return data
        return None

    def __write_byte(self, cmd):
        """Write *cmd* and its XOR checksum as one word to ``HX.EXT_COM``.

        The caller is expected to hold ``self.lock``.
        """
        # High byte: command, low byte: command XOR HX.XOR_SEQ
        word = (cmd << 8) | (cmd ^ HX.XOR_SEQ)
        self.i2c_bus.write_word_data(HX.POWER_ADDRESS, HX.EXT_COM, word)

    def send_cmd(self, cmd):
        """Send *cmd* to the device and return its one-byte response.

        :returns: the response byte, or ``HX.IOE_RSP`` if the bus raised
            ``IOError``.
        """
        try:
            with self.lock:
                # Send command
                self.__write_byte(cmd)
                time.sleep(HX.POWER_RESPONSE_DELAY)

                # Get response
                return self.i2c_bus.read_byte(HX.POWER_ADDRESS)
        except IOError:
            return HX.IOE_RSP

    def enable_bt(self):
        """Enable power from BT; returns the response code."""
        if self.bt_enabled:
            # Already enabled? OK!
            return HX.OK_RSP
        return self.send_cmd(HX.COM_ENABLE_BT)

    def disable_bt(self):
        """Disable power from BT; returns the response code."""
        if not self.bt_enabled:
            # Already disabled? OK!
            return HX.OK_RSP
        return self.send_cmd(HX.COM_DISABLE_BT)

    def enable_us(self):
        """Enable USB power; returns the response code."""
        if self.us_enabled:
            # Already enabled? OK!
            return HX.OK_RSP
        return self.send_cmd(HX.COM_ENABLE_US)

    def disable_us(self):
        """Disable USB power; returns the response code."""
        if not self.us_enabled:
            # Already disabled? OK!
            return HX.OK_RSP
        return self.send_cmd(HX.COM_DISABLE_US)

    def enable_mv(self):
        """Enable PowerFunctions and servos; returns the response code.

        Also records the enable time and sets ``mv_blocked``; the polling
        loop clears ``mv_blocked`` once the timeout has elapsed.
        """
        if self.mv_enabled:
            # Already enabled? OK!
            return HX.OK_RSP

        # Save MV enable time.  process_time() replaces time.clock(),
        # which was removed in Python 3.8, and matches the clock the
        # polling loop compares against.
        self.__mv_enable_time = time.process_time()
        self.mv_blocked = True

        return self.send_cmd(HX.COM_ENABLE_MV)

    def disable_mv(self):
        """Disable PowerFunctions and servos; returns the response code."""
        if not self.mv_enabled:
            # Already disabled? OK!
            return HX.OK_RSP
        return self.send_cmd(HX.COM_DISABLE_MV)

    def enable_ba(self):
        """Enable BA; returns the response code."""
        if self.ba_enabled:
            # Already enabled? OK!
            return HX.OK_RSP
        return self.send_cmd(HX.COM_ENABLE_BA)

    def disable_ba(self):
        """Disable BA; returns the response code."""
        if not self.ba_enabled:
            # Already disabled? OK!
            return HX.OK_RSP
        return self.send_cmd(HX.COM_DISABLE_BA)