acoused/Model/peacock_uvp/apf04_driver.py

178 lines
6.9 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @copyright this code is the property of Ubertone.
# You may use this code for your personal, informational, non-commercial purpose.
# You may not distribute, transmit, display, reproduce, publish, license, create derivative works from, transfer or sell any information, software, products or services based on this code.
# @author Stéphane Fischer
from datetime import datetime
import struct
import logging
from Model.peacock_uvp.apf04_modbus import Apf04Modbus
from Model.peacock_uvp.apf04_addr_cmd import *
from Model.peacock_uvp.apf04_config_hw import ConfigHw
from Model.peacock_uvp.apf_timestamp import encode_timestamp
from Model.peacock_uvp.apf04_exception import apf04_exception
# TODO gérer ici les erreur spécifiques au HW
class Apf04Driver (Apf04Modbus):
""" @brief gère l'instrument APF04
"""
# TODO : tester la com dans un init par une lecture de la version
def __init__(self, _baudrate, _f_sys, _dev=None, _addr_dict=None):
self.f_sys=_f_sys
Apf04Modbus.__init__(self, _baudrate, _dev)
self.addr = _addr_dict
def new_config (self):
""" @brief create an empty config
"""
# TODO pourrait aussi s'appeler create_config ou empty_config @marie : un avis ?
return ConfigHw(self.f_sys)
def read_config (self, _id_config=0):
""" @brief lecture des paramètres d'une configuration
@param _id_config : identifiant de la configuration [0..2] (par défaut la config n°1/3)
principalement utilisé pour relire la config après un check_config
"""
self.config = ConfigHw(self.f_sys)
self.config.id_config = _id_config
# tous les paramètres des settings sont en signé
self.config.from_list(self.read_list_i16(int(self.addr["ADDR_CONFIG"])+_id_config*int(self.addr["OFFSET_CONFIG"]), int(self.addr["SIZE_CONFIG"]))) # en mots
return self.config
# TODO .to_list() à faire par l'appelant ? APF04Driver ne connait pas config_hw ou passer config_hw en self.config (actuellement au niveau au dessus) ?
def write_config (self, _config, _id_config):
""" @brief écriture des paramètres d'une configuration
@param _config : configuration (de type ConfigHw)
@param _id_config : identifiant de la configuration [0..2]
"""
logging.debug("%s"%(_config.to_list()))
self.write_buf_i16(_config.to_list(), self.addr["ADDR_CONFIG"]+_id_config*self.addr["OFFSET_CONFIG"])
# DEFINI LA CONFIG 0 UTILISEE PAR L'APPAREIL
# _config = [0..2]
def select_config (self, _id_config):
logging.debug("selecting config %d [0..N-1]"%(_id_config))
self.write_i16(_id_config, self.addr["ADDR_CONFIG_ID"])
def read_version (self):
""" @brief Lecture des versions C et VHDL
"""
self.version_vhdl = self.read_i16(ADDR_VERSION_VHDL)
self.version_c = self.read_i16(ADDR_VERSION_C)
logging.debug("Version VHDL=%s", self.version_vhdl)
logging.debug("Version C=%s", self.version_c)
if self.version_c < 45:
print ("WARNING firmware version %d do not provide noise measurements in profile's header" % self.version_c)
self.model = 0
self.year = 2018
self.serial_num = 0
else :
model_year = self.read_i16(ADDR_MODEL_YEAR)
self.model = (model_year & 0xFF00)>>8
self.year = 2000 + (model_year & 0x00FF)
if self.model == 0x01 :
logging.debug("Model is Peacock UVP")
else :
logging.info("Warning, model (id %s) is not defined"%self.model)
logging.debug("Year of production = %s", self.year)
self.serial_num = self.read_i16(ADDR_SERIAL_NUM)
logging.debug("Serial number=%s", self.serial_num)
return self.version_vhdl, self.version_c
def write_sound_speed (self, sound_speed=1480, sound_speed_auto=False):
""" @brief Writing of the sound speed global parameter in RAM
"""
addr_ss_auto = self.addr["ADDR_SOUND_SPEED_AUTO"]
addr_ss_set = self.addr["ADDR_SOUND_SPEED_SET"]
# fix for firmware prior to 45
if self.version_c < 45:
addr_ss_auto -= 2
addr_ss_set -= 2
if sound_speed_auto:
self.write_i16(1, addr_ss_auto)
else:
self.write_i16(0, addr_ss_auto)
self.write_i16(sound_speed, addr_ss_set)
def __action_cmd__(self, _cmd, _timeout=0.0):
""" @brief generic action function
send a command asking for a given action. Unless specific case,
the function is released when the action is finished. The timeout
should be set consequently. """
try:
self.write_i16(_cmd, ADDR_ACTION, _timeout)
except apf04_exception as ae:
logging.info("apf04_exception catched with command %s with timeout %e"%(_cmd, _timeout))
raise ae
def act_stop (self):
""" @brief Stop the measurement (only in non blocking mode)"""
self.__action_cmd__(CMD_STOP, 5.0)
def act_meas_I2C (self):
""" @brief Make one measure of pitch, roll and temp. Those values are then updated in the RAM.
"""
self.__action_cmd__(CMD_TEST_I2C, 2.0)
def act_test_led (self):
self.__action_cmd__(CMD_TEST_LED, 1.5)
# timeout set to 1.5 seconds to let the Led blink
def act_meas_IQ (self):
self.__action_cmd__(CMD_PROFILE_IQ) # TODO timeout
def act_meas_profile (self, _timeout=0.):
""" @brief start to measure a block of profils
@param _timeout maximum delay to get an answer from the board
"""
# get UTC timestamp just before strating the measurements
self.timestamp_profile = datetime.utcnow()
logging.debug ("setting timeout to %f"%_timeout)
self.__action_cmd__(CMD_PROFILE_BLOCKING, _timeout)
def act_check_config (self):
self.__action_cmd__(CMD_CHECK_CONFIG, 0.2)
def act_start_auto_mode (self):
self.__action_cmd__(CMD_START_AUTO) # TODO timeout
def read_temp (self):
return self.read_i16(self.addr["ADDR_TEMP_MOY"])
def read_pitch (self):
return self.read_i16(self.addr["ADDR_TANGAGE"])
def read_roll (self):
return self.read_i16(self.addr["ADDR_ROULIS"])
def read_profile (self, _n_vol):
logging.debug("timestamp: %s"%self.timestamp_profile)
#logging.debug("pitch: %s, roll: %s,"%(self.read_i16(self.addr["ADDR_TANGAGE"]), self.read_i16(self.addr["ADDR_ROULIS"])))
#logging.debug("pitch: %s, roll: %s, temps: %s, sound_speed: %s, ca0: %s, ca1: %s"%(self.read_i16(self.addr["ADDR_TANGAGE"]), self.read_i16(self.addr["ADDR_ROULIS"]), self.read_i16(self.addr["ADDR_TEMP_MOY"]), self.read_i16(self.addr["ADDR_SOUND_SPEED"]), self.read_i16(self.addr["ADDR_GAIN_CA0"]), self.read_i16(self.addr["ADDR_GAIN_CA1"])))
data_list = self.read_buf_i16(self.addr["ADDR_PROFILE_HEADER"], self.addr["SIZE_PROFILE_HEADER"] + _n_vol*4)
logging.debug("processing+transfert delay = %fs"%(datetime.utcnow()-self.timestamp_profile).total_seconds())
# on passe en litte endian (les données initiales sont en big endian)
# traitement < 1ms pour 50 cellules sur macbook pro
data_packed = struct.pack('<%sh'%int(len(data_list)/2), \
*struct.unpack('>%sh'%int(len(data_list)/2), data_list))
logging.debug("pack string = '%s'"%'>%sh'%int(len(data_list)/2))
logging.debug("processing+transfert+swap delay = %fs"%(datetime.utcnow()-self.timestamp_profile).total_seconds())
return encode_timestamp(self.timestamp_profile) + data_packed