hxbot/HXbot/HXpower.py

587 lines
15 KiB
Python
Executable File

# -*- coding: utf-8 -*-
########################################
# HXbot power control module #
# EoF 2016 EoF@itphx.ru #
########################################
import HX
import threading
import time
import array
import struct
class HXpowerClass():
def __init__(self, i2c_bus, lock):
super(HXpowerClass, self).__init__()
self.i2c_bus = i2c_bus
self.__e = None
self.__t = None
self.lock = lock
# MV
self.__mv_enable_time = 0.0
self.mv_blocked = True
# HXpower data
self.online = False
self.is_running = False
self.error_count = 0
self.status = 0x000000
self.vcc = 0.0
self.vin = 0.0
self.vdc = 0.0
self.vbt = 0.0
self.vba = 0.0
self.vpf = 0.0
self.vzu = 0.0
self.abt = 0.0
self.aba = 0.0
self.ain = 0.0
self.ahx = 0.0
self.vcc_ok = False
self.vin_ok = False
self.vdc_ok = False
self.vbt_ok = False
self.vba_ok = False
self.vpf_ok = False
self.vzu_ok = False
self.abt_ok = False
self.aba_ok = False
self.ain_ok = False
self.ahx_ok = False
self.bt_enabled = False
self.ba_enabled = False
self.bt_blocked = False
self.ba_blocked = False
self.vcc_low = False
self.vcc_high = False
self.vin_low = False
self.vin_high = False
self.vdc_low = False
self.vdc_high = False
self.vbt_low = False
self.vbt_full = False
self.vba_low = False
self.vba_full = False
self.vpf_low = False
self.vpf_high = False
self.vzu_low = False
self.vzu_high = False
self.abt_low = False
self.abt_high = False
self.aba_low = False
self.aba_high = False
self.ain_low = False
self.ain_high = False
self.ahx_low = False
self.ahx_high = False
self.mv_enabled = False
self.us_enabled = False
self.in_plugged = False
self.ba_full = False
self.bt_full = False
def start(self):
self.__e = threading.Event()
self.__t = threading.Thread(target=self.__process, args=(self.__e, ))
self.__t.start()
self.is_running = True
def stop(self):
self.__e.set()
self.__t.join()
self.is_running = False
self.__e = None
self.__t = None
def __process(self, stopRequest):
stat1 = 0x00
stat2 = 0x00
stat3 = 0x00
# Process loop
while not stopRequest.is_set():
try:
self.status = 0x0000
# Get status 1
tmp = self.__read_byte(HX.COM_GET_STAT1)
if tmp is not None:
stat1 = tmp << 8
else:
if self.online:
self.error_count += 1
# Get status 2
tmp = self.__read_byte(HX.COM_GET_STAT2)
if tmp is not None:
stat2 = tmp
else:
if self.online:
self.error_count += 1
# Get status 3
tmp = self.__read_byte(HX.COM_GET_STAT3)
if tmp is not None:
stat3 = tmp << 16
else:
if self.online:
self.error_count += 1
self.status |= stat1
self.status |= stat2
self.status |= stat3
# Get Voltages
tmp = self.__read_float(HX.COM_GET_VCC)
if tmp is not None:
self.vcc = tmp
tmp = self.__read_float(HX.COM_GET_VIN)
if tmp is not None:
self.vin = tmp
tmp = self.__read_float(HX.COM_GET_VDC)
if tmp is not None:
self.vdc = tmp
tmp = self.__read_float(HX.COM_GET_VBT)
if tmp is not None:
self.vbt = tmp
tmp = self.__read_float(HX.COM_GET_VBA)
if tmp is not None:
self.vba = tmp
tmp = self.__read_float(HX.COM_GET_VPF)
if tmp is not None:
self.vpf = tmp
tmp = self.__read_float(HX.COM_GET_VZU)
if tmp is not None:
self.vzu = tmp
tmp = self.__read_float(HX.COM_GET_ABT)
if tmp is not None:
self.abt = tmp
tmp = self.__read_float(HX.COM_GET_ABA)
if tmp is not None:
self.aba = tmp
tmp = self.__read_float(HX.COM_GET_AIN)
if tmp is not None:
self.ain = tmp
tmp = self.__read_float(HX.COM_GET_AHX)
if tmp is not None:
self.ahx = tmp
self.online = True
self.error_count = 0
except IOError:
if self.online:
self.error_count += 1
# Decode Status
if self.status & 0x0100:
self.vcc_ok = True
else:
self.vcc_ok = False
if self.status & 0x0200:
self.vin_ok = True
else:
self.vin_ok = False
if self.status & 0x0400:
self.vdc_ok = True
else:
self.vdc_ok = False
if self.status & 0x0800:
self.vbt_ok = True
else:
self.vbt_ok = False
if self.status & 0x1000:
self.vba_ok = True
else:
self.vba_ok = False
if self.status & 0x0010:
self.vpf_ok = True
else:
self.vpf_ok = False
if self.status & 0x0020:
self.vzu_ok = True
else:
self.vzu_ok = False
if self.status & 0x2000:
self.bt_enabled = True
else:
self.bt_enabled = False
if self.status & 0x4000:
self.ba_enabled = True
else:
self.ba_enabled = False
if self.status & 0x0040:
self.mv_enabled = True
else:
self.mv_enabled = False
if self.status & 0x0080:
self.us_enabled = True
else:
self.us_enabled = False
if self.status & 0x8000:
self.in_plugged = True
else:
self.in_plugged = False
if self.status & 0x0001:
self.bt_blocked = True
else:
self.bt_blocked = False
if self.status & 0x0002:
self.ba_blocked = True
else:
self.ba_blocked = False
if self.status & 0x0004:
self.bt_full = True
else:
self.bt_full = False
if self.status & 0x0008:
self.ba_full = True
else:
self.ba_full = False
if self.status & 0x010000:
self.abt_ok = True
else:
self.abt_ok = False
if self.status & 0x020000:
self.aba_ok = True
else:
self.aba_ok = False
if self.status & 0x040000:
self.ain_ok = True
else:
self.ain_ok = False
if self.status & 0x080000:
self.ahx_ok = True
else:
self.ahx_ok = False
# LOW | HIGH
if self.vcc < HX.VCC_LOW:
self.vcc_low = True
else:
self.vcc_low = False
if self.vcc > HX.VCC_HIGH:
self.vcc_high = True
else:
self.vcc_high = False
if self.vin < HX.VIN_LOW:
self.vin_low = True
else:
self.vin_low = False
if self.vin > HX.VIN_HIGH:
self.vin_high = True
else:
self.vin_high = False
if self.vdc < HX.VDC_LOW:
self.vdc_low = True
else:
self.vdc_low = False
if self.vdc > HX.VDC_HIGH:
self.vdc_high = True
else:
self.vdc_high = False
if self.vbt < HX.VBT_LOW:
self.vbt_low = True
else:
self.vbt_low = False
if self.vbt > HX.VBT_FULL:
self.vbt_full = True
else:
self.vbt_full = False
if self.vba < HX.VBA_LOW:
self.vba_low = True
else:
self.vba_low = False
if self.vba > HX.VBA_FULL:
self.vba_full = True
else:
self.vba_full = False
if self.vpf < HX.VPF_LOW:
self.vpf_low = True
else:
self.vpf_low = False
if self.vpf > HX.VPF_HIGH:
self.vpf_high = True
else:
self.vpf_high = False
if self.vzu < HX.VZU_LOW:
self.vzu_low = True
else:
self.vzu_low = False
if self.vzu > HX.VZU_HIGH:
self.vzu_high = True
else:
self.vzu_high = False
if self.abt < HX.ABT_LOW:
self.abt_low = True
else:
self.abt_low = False
if self.abt > HX.ABT_HIGH:
self.abt_high = True
else:
self.abt_high = False
if self.aba < HX.ABA_LOW:
self.aba_low = True
else:
self.aba_low = False
if self.aba > HX.ABA_HIGH:
self.aba_high = True
else:
self.aba_high = False
if self.ain < HX.AIN_LOW:
self.ain_low = True
else:
self.ain_low = False
if self.ain > HX.AIN_HIGH:
self.ain_high = True
else:
self.ain_high = False
if self.ahx < HX.AHX_LOW:
self.ahx_low = True
else:
self.ahx_low = False
if self.ahx > HX.AHX_HIGH:
self.ahx_high = True
else:
self.ahx_high = False
# Too many errors
if self.error_count >= HX.POWER_ERROR_LIMIT:
self.online = False
# Unblock PF
if (self.mv_enabled and time.clock() - self.__mv_enable_time >=
HX.POWER_MV_TIMEOUT / 100):
self.mv_blocked = False
# Delay
time.sleep(HX.POWER_DELAY)
def __sum_check(self, val, sum_):
tmp = 0x00
for b in val:
tmp ^= b
tmp ^= HX.XOR_SEQ
if tmp == sum_:
return True
else:
return False
def __read_float(self, cmd):
# Byte array
data = array.array('B', [0, 0, 0, 0])
with self.lock:
# Send command
self.__write_byte(cmd)
time.sleep(HX.POWER_RESPONSE_DELAY)
# Get bytes
data[0] = self.i2c_bus.read_byte(HX.POWER_ADDRESS)
data[1] = self.i2c_bus.read_byte(HX.POWER_ADDRESS)
data[2] = self.i2c_bus.read_byte(HX.POWER_ADDRESS)
data[3] = self.i2c_bus.read_byte(HX.POWER_ADDRESS)
# Get check sum
sum_ = self.i2c_bus.read_byte(HX.POWER_ADDRESS)
# Check and return value
if self.__sum_check(data, sum_):
return struct.unpack('f', data)[0]
else:
return None
def __read_byte(self, cmd):
with self.lock:
# Send command
self.__write_byte(cmd)
time.sleep(HX.POWER_RESPONSE_DELAY)
# Get byte
data = self.i2c_bus.read_byte(HX.POWER_ADDRESS)
# Get check sum
sum_ = self.i2c_bus.read_byte(HX.POWER_ADDRESS)
# Check and return value
if data ^ HX.XOR_SEQ == sum_:
return data
else:
return None
def __write_byte(self, cmd):
cmd = (cmd << 8) | (cmd ^ HX.XOR_SEQ)
self.i2c_bus.write_word_data(HX.POWER_ADDRESS, HX.EXT_COM, cmd)
def send_cmd(self, cmd):
try:
with self.lock:
# Send command
self.__write_byte(cmd)
time.sleep(HX.POWER_RESPONSE_DELAY)
# Get response
response = self.i2c_bus.read_byte(HX.POWER_ADDRESS)
# Return
return response
except IOError:
return HX.IOE_RSP
def enable_bt(self):
# Enable power from BT
if not self.bt_enabled:
# Run command
result = self.send_cmd(HX.COM_ENABLE_BT)
# Return
return result
else:
# Already enabled? OK!
return HX.OK_RSP
def disable_bt(self):
# Disable power from BT
if self.bt_enabled:
# Run command
result = self.send_cmd(HX.COM_DISABLE_BT)
# Return
return result
else:
# Already disabled? OK!
return HX.OK_RSP
def enable_us(self):
# Enable USB power
if not self.us_enabled:
# Run command
result = self.send_cmd(HX.COM_ENABLE_US)
# Return
return result
else:
# Already enabled? OK!
return HX.OK_RSP
def disable_us(self):
# Disable USB power
if self.us_enabled:
# Run command
result = self.send_cmd(HX.COM_DISABLE_US)
# Return
return result
else:
# Already disabled? OK!
return HX.OK_RSP
def enable_mv(self):
# Enable PowerFunctions and servos
if not self.mv_enabled:
# Save MV enable time
self.__mv_enable_time = time.clock()
self.mv_blocked = True
# Run command
result = self.send_cmd(HX.COM_ENABLE_MV)
# Return
return result
else:
# Already enabled? OK!
return HX.OK_RSP
def disable_mv(self):
# Disable PowerFunctions and servos
if self.mv_enabled:
# Run command
result = self.send_cmd(HX.COM_DISABLE_MV)
# Return
return result
else:
# Already disabled? OK!
return HX.OK_RSP
def enable_ba(self):
# Enable BA
if not self.ba_enabled:
# Run command
result = self.send_cmd(HX.COM_ENABLE_BA)
# Return
return result
else:
# Already enabled? OK!
return HX.OK_RSP
def disable_ba(self):
# Disable BA
if self.ba_enabled:
# Run command
result = self.send_cmd(HX.COM_DISABLE_BA)
# Return
return result
else:
# Already disabled? OK!
return HX.OK_RSP