acoused/Model/peacock_uvp/apf04_modbus.py

335 lines
12 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @copyright this code is the property of Ubertone.
# You may use this code for your personal, informational, non-commercial purpose.
# You may not distribute, transmit, display, reproduce, publish, license, create derivative works from, transfer or sell any information, software, products or services based on this code.
# @author Stéphane Fischer, Marie Burckbuchler
import struct # Struct est utilisée pour extraite les données séries
import serial # Utilisé pour récuperer les donnée vennant de la liaison Série RS485
from sys import platform
import traceback
import logging
from time import time, sleep
from Model.peacock_uvp.apf04_exception import apf04_error, apf04_exception
from Model.peacock_uvp.modbus_crc import crc16
def hex_print (_bytes):
""" @brief print a byte array in hexadecimal string
"""
print (''.join('%02x'%i for i in _bytes))
def autodetect_usb_device() :
# In case of a *nux system, we can find the port of the APF04
# automatically thanks to the serial.tools.list_ports library
# and knowing that the RS485 to USB adapter has PID:VID = 0403:6001
usb_device = None
# Known USB chips defined by VID/PID
known_chips ={"0403:6001", "1486:5523", "1A86:5523", "1A86:7523"}
if platform in ["linux","linux2","darwin","cygwin"]: # linux and Mac OS
import serial.tools.list_ports as lPort
reslt = lPort.comports()
for res in reslt:
logging.debug("checking %s / %s"%(res[0],res[2]))
# get USB device id
try:
device_id = res[2].split("VID:PID=")[1].split(" ")[0]
logging.debug("usb_device_id = %s"%device_id)
except:
device_id = None
# check if the device is known
if device_id in known_chips : # dongle USB avec et sans alim
logging.debug("APF04 detected on serial port: %s", res[2])
usb_device = res[0]
elif usb_device == None : #if no device has been detected yet
print("unknown device detected on serial port: %s (the last found will be selected)"%(res))
print("You should add the device manually in 'known_chips' dict")
usb_device = res[0]
# for platform == "cygwin" and "win32", the serial port should be modified manually:
# for example "/dev/ttyS3" on cygwin or "COM10" on Windows
if usb_device is None : # usb device could not be detected
logging.critical("USB device cannot be detected automatically, check the wiring or specify the device port.")
raise apf04_error (1000, "No device port defined.")
return usb_device
class Apf04Modbus ():
""" @brief Modbus communication layer
modbus est en big-endian (défaut)
l'adressage est fait en 16 bits.
"""
def __init__(self, _baudrate=None, _dev=None):
""" @brief Initialisation de la couche communication de l'instrument
@param _baudrate : vitesse de communication, 57600 bits par seconde par defaut
_baudrate peut avoir pour valeur 230400 115200 57600 ...
"""
# Default device address on modbus
self.apf04_addr = 0x04
#la lecture et l'écriture de bloc sont segmentées en blocs de 123 mots
# Modbus limite les blocs à un maximum de 123 mots en ecriture et 125 mots en lecture
self.max_seg_size = 123
logging.debug("Platform is %s", platform)
self.usb_device = _dev
if self.usb_device is None :
print ("Getting the USB device automatically")
self.usb_device = autodetect_usb_device()
logging.debug("usb_device is at %s with baudrate %s"%(self.usb_device, _baudrate))
if _baudrate :
self.connect(_baudrate)
# In order to reduce serial latency of the linux driver, you may set the ASYNC_LOW_LATENCY flag :
# setserial /dev/<tty_name> low_latency
logging.debug("end init")
def connect (self, _baudrate):
try :
# Create an instance of the Peacock's driver at a given baudrate
self.ser = serial.Serial(self.usb_device, _baudrate, timeout=0.5, \
bytesize=8, parity='N', stopbits=1, xonxoff=0, rtscts=0)
# serial timeout is set to 500 ms. This can be changed by setting
# self.ser.timeout to balance between performance and efficiency
except serial.serialutil.SerialException :
raise apf04_error (1005, "Unable to connect to the device.")
def __del__(self):
""" @brief close serial port if necessary """
try : # au cas où le constructeur a planté
self.ser.close()
except :
pass
def autobaud (self):
""" @brief automatically detect the baudrate
@return baudrate if detected, None instead
If the baudrate is found, the connexion to the device is active
WARNING : be carefull, this method is not robust, you may try several times to get the baudrate
"""
# Scan available baudrates for the Peacock UVP
for baudrate in [57600, 115200, 230400, 750000]:
try:
logging.debug("try if baudrate = %d"%baudrate)
self.connect(baudrate)
# Read the firmware version
self.read_i16(0)
except:
# if failed, the baudrate is wrong
self.ser.close()
continue
# if success, the baudrate is correct
return baudrate
logging.debug("Fail to detect the baudrate automatically")
return None
def __check_addr_range(self, _begin, _size):
""" @brief check if the address range is allowed
@param _begin : addresse de début en octets
@param _size : taille du bloc en mots (16 bits)
"""
addr_ram_begin = 0x0000
addr_ram_end = 0x07FF
addr_reg_action = 0xFFFD # also defined as ADDR_ACTION in apf04_addr_cmd.py
# adresses des blocs mémoire :
assert(_begin>=addr_ram_begin)
if _begin>addr_ram_end:
assert _begin!=addr_reg_action and _size!=1, "Warning, access at %d, size= %d bytes not allowed"%(_begin, _size)
def __read__(self, _size, _timeout=0.0):
""" @brief Low level read method
@param _size number of bytes to read
"""
if _size == 0:
raise apf04_error(2002, "ask to read null size data." )
try :
read_data = b''
start_time = time()
# the read of modbus is not interuptible
while (True):
read_data += self.ser.read(_size)
if len (read_data) == _size or time() - start_time > _timeout:
break
except serial.serialutil.SerialException:
#self.log("hardware apparently disconnected")
#read_data = b''
raise apf04_error(1010, "Hardware apparently disconnected." )
if len (read_data) != _size :
if len (read_data) == 0:
logging.debug ("WARNING timeout, no answer from device")
raise apf04_exception(2003, "timeout : device do not answer (please check cable connexion, timeout or baudrate)" )
else :
logging.debug ("WARNING, uncomplete answer from device (%d/%d)"%(len (read_data), _size))
raise apf04_exception(2004, "timeout : uncomplete answer from device (please check timeout or baudrate) (%d/%d)"%(len (read_data), _size))
return read_data
############## Read functions ###############################################
def read_i16 (self, _addr):
""" @brief Read one word (signed 16 bits)
@param _addr : data address (given in bytes)
@return : integer
"""
# les données sont transmises en big endian (octet de poids faible en premier)
return struct.unpack(">h",self.read_seg_16(_addr , 1))[0]
def read_list_i16(self, _addr, _size):
""" @brief Read several words (signed 16 bits)
@param _addr : data address (given in bytes)
@param _size : number of word to read
@return : list of integers
"""
# TODO utiliser read_buf_i16
return struct.unpack(">%dh"%_size,self.read_seg_16(_addr , _size))
# TODO mettre en private
def read_seg_16(self, _addr, _size):
""" @brief Low level read (in a single modbus frame)
@param _addr : data address (given in bytes)
@param _size : number of word to read
@return : byte array
"""
assert (_size <= self.max_seg_size) # segment de 125 mots (max en lecture)
logging.debug ("reading %d words at %d"%(_size, _addr))
# on utilise la fonction modbus 3 pour la lecture des octets
#self.__check_addr_range(_addr, 2 * _size)
# request read
read_query = struct.pack(">BBHh",self.apf04_addr, 0x03, _addr, _size )
read_query += struct.pack(">H",crc16 (read_query) )
#print ("read query = ")
#hex_print(read_query)
try :
self.ser.write(read_query)
except serial.serialutil.SerialException:
#self.log("hardware apparently disconnected")
# TODO traiter les différentes erreurs, se mettre en 3 MBaud sur R0W (bcp de buffer overflow !)
raise apf04_error(1010, "Hardware apparently disconnected." )
# read answer
slave_response = self.__read__(3)
if slave_response[1] != 3:
logging.info ("WARNING error while reading %s"%slave_response)
slave_response += self.__read__(slave_response[2]+2)
#print ("slave answer = ")
#hex_print(slave_response)
# check crc
#print ("%X"%crc16 (slave_response[0:-2]))
#print ("%X"%struct.unpack(">H",slave_response[-2:]))
assert (crc16 (slave_response[0:-2]) == struct.unpack(">H",slave_response[-2:])[0])
return slave_response[3:-2]
def read_buf_i16 (self, _addr , _size):
""" @brief Read buffer
@param _addr : data address (given in bytes)
@param _size : number of word to read
@return : byte array
Note : data are transmitted in big endian
"""
data = b''
addr = _addr
remind = _size
logging.debug ("reading %d words at %d"%(_size, _addr))
while remind :
logging.debug ("remind = %s ; self.max_seg_size = %s ; div : %s"%(remind, self.max_seg_size, remind/self.max_seg_size))
if remind/self.max_seg_size>=1:
logging.debug ("read max_seg_size")
seg_size=self.max_seg_size
else :
seg_size = remind
logging.debug ("read remind")
data+=self.read_seg_16(addr , seg_size)
addr+=seg_size # addr en mots de 16 bits
remind-=seg_size
#print( "__Read_buf : %d readed"%(int(addr - _addr)) )
return data
############## Write functions ##############################################
def write_i16 (self, _value, _addr, _timeout=0.0):
""" @brief Write one word (signed 16 bits)
@param _value : value of the word
@param _addr : destination data address (given in bytes)
"""
try:
self.write_buf_i16 ([_value], _addr, _timeout)
except apf04_exception as ae:
raise ae # apf04_exception are simply raised upper
except :
print(traceback.format_exc())
raise apf04_error(3000, "write_i16 : FAIL to write 0%04x at %d\n"%(_value, _addr))
def write_buf_i16 (self, _data, _addr, _timeout=0.0):
""" @brief Write buffer
@param _data : list of words (max size : 123 words)
@param _addr : data address (given in bytes)
"""
# ATTENTION ici on ne gère pas de boucle sur un "write_seg_16" car on n'a pas besoin d'écrire de gros blocs de données
# segmenter en blocs de 123 mots (max en ecriture)
assert (len(_data)<=self.max_seg_size)
try:
# request read
write_query = struct.pack(">BBHhB%sh"%len(_data),self.apf04_addr, 16, _addr, len(_data), 2*len(_data), *_data )
write_query += struct.pack(">H",crc16 (write_query) )
try:
#print (write_query)
self.ser.write(write_query)
except serial.serialutil.SerialException:
logging.error("hardware apparently disconnected")
raise apf04_error(3004, "write_buf_i16 : hardware apparently disconnected")
# read answer
slave_response = self.__read__(2, _timeout)
# TODO 1 : format de la trame d'erreur et codes d'erreurs effectivement traités
if slave_response[1] == 16 :
slave_response += self.__read__(6)
# TODO sur le principe il faudrait vérifier que le bon nombre de mots a été écrit
else:
# TODO traiter les erreurs selon doc
size = struct.unpack("B",self.__read__(1))[0]
print ("size following : %d"%size)
self.__read__(size)
print("error while writing")
print (slave_response)
except apf04_exception as ae:
raise ae # apf04_exception are simply raised upper
except :
print(traceback.format_exc())
raise apf04_error(3001, "write_buf_i16 : Fail to write")