# ============================================================================== #
#  acoustic_data_loder_UBSediFlow.py - AcouSed                                   #
#  Copyright (C) 2024  INRAE                                                     #
#                                                                                #
#  This program is free software: you can redistribute it and/or modify          #
#  it under the terms of the GNU General Public License as published by          #
#  the Free Software Foundation, either version 3 of the License, or             #
#  (at your option) any later version.                                           #
#                                                                                #
#  This program is distributed in the hope that it will be useful,               #
#  but WITHOUT ANY WARRANTY; without even the implied warranty of                #
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the                 #
#  GNU General Public License for more details.                                  #
#                                                                                #
#  You should have received a copy of the GNU General Public License             #
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.        #
#                                                                                #
#  by Brahim MOUDJED                                                             #
# ============================================================================== #

# -*- coding: utf-8 -*-

import numpy as np

from Model.udt_extract.raw_extract import raw_extract


class AcousticDataLoaderUBSediFlow:
    """Load acoustic backscatter data from a UB-SediFlow raw (udt) file.

    The file is decoded by ``raw_extract``; every (configuration, channel)
    pair it reports is treated as one frequency. After construction the
    per-frequency arrays are aligned on a common grid:

    * ``_freq``        -- 1-D array, one ``f0`` value per channel.
    * ``_r``           -- 2-D array (frequency, cell): cell index times
      ``r_dcell``, trimmed to the smallest cell count.
    * ``_time``        -- 2-D array (frequency, profile): seconds elapsed
      since the first profile of that channel, trimmed to the smallest
      profile count.
    * ``_BS_raw_data`` -- 3-D array (frequency, cell, profile) holding the
      ``echo_avg_profile`` data, trimmed like ``_r`` and ``_time``.

    Args:
        path_BS_raw_data: Path of the raw data file handed to
            ``raw_extract``.
    """

    def __init__(self, path_BS_raw_data: str):
        self.path_BS_raw_data = path_BS_raw_data

        # --- Extract Backscatter acoustic raw data ---
        # Only the start time and the ultrasound parameter/data dictionaries
        # are used here; the other returned values are ignored.
        (_device_name, time_begin, _time_end, param_us_dicts, data_us_dicts,
         _data_dicts, _settings_dict) = raw_extract(self.path_BS_raw_data)

        # --- Date and Hour of measurements read on udt data file ---
        self._date = time_begin.date()
        self._hour = time_begin.time()

        self._freq = np.array([[]])
        self._nb_profiles = []
        self._nb_profiles_per_sec = []
        self._nb_cells = []
        self._cell_size = []
        self._pulse_length = []
        self._nb_pings_per_sec = []
        self._nb_pings_averaged_per_profile = []
        self._kt = []
        self._gain_rx = []
        self._gain_tx = []

        self._r = np.array([[]])
        self._time = np.array([[]])
        # Never filled in this class.
        # NOTE(review): presumably populated by the caller before
        # reshape_t_snr() is used -- confirm.
        self._time_snr = np.array([[]])
        self._BS_raw_data = np.array([[[]]])

        # The first processed channel initialises the stacked arrays; every
        # later one is appended. Tracking this explicitly (rather than
        # testing the configuration key) keeps the arrays consistent when a
        # configuration holds several channels or keys do not start at 1.
        first_channel = True

        for config in param_us_dicts:
            for channel in param_us_dicts[config]:
                params = param_us_dicts[config][channel]
                echo = data_us_dicts[config][channel]['echo_avg_profile']

                # --- Acquisition settings (np.append flattens to 1-D) ---
                self._freq = np.append(self._freq, params['f0'])
                self._nb_cells = np.append(self._nb_cells, params['n_cell'])
                self._cell_size = np.append(self._cell_size,
                                            params['r_dcell'])
                # Pulse length and kt are not read from the file; a 0
                # placeholder is stored for each channel.
                self._pulse_length = np.append(self._pulse_length, 0)
                self._nb_pings_per_sec = np.append(self._nb_pings_per_sec,
                                                   params['prf'])
                self._nb_pings_averaged_per_profile = np.append(
                    self._nb_pings_averaged_per_profile, params['n_p'])
                self._kt = np.append(self._kt, 0)
                self._gain_rx = np.append(self._gain_rx, params['a0'])
                self._gain_tx = np.append(self._gain_tx, params['a1'])

                # --- Depth and elapsed time for this frequency ---
                depth = [params['r_dcell'] * i
                         for i in range(params['n_cell'])]
                stamps = echo['time']
                elapsed = [(t - stamps[0]).total_seconds() for t in stamps]

                if first_channel:
                    self._r = np.array([depth])
                    self._time = np.array([elapsed])
                else:
                    self._r = self._stack_row(self._r, depth)
                    self._time = self._stack_row(self._time, elapsed)

                self._nb_profiles = np.append(self._nb_profiles,
                                              self._time.shape[1])
                # NOTE(review): 'n_avg' is stored as the number of profiles
                # per second -- confirm this is the intended parameter.
                self._nb_profiles_per_sec = np.append(
                    self._nb_profiles_per_sec, params['n_avg'])

                # --- US Backscatter raw signal ---
                # Take exactly one profile per retained time stamp so that
                # column k of the backscatter matches self._time[:, k].
                n_profiles = self._time.shape[1]
                profiles = echo['data']
                BS_data = np.asarray(
                    [profiles[i] for i in range(n_profiles)])
                # (profile, cell) -> (1, cell, profile)
                BS_data = BS_data.transpose()[np.newaxis, :, :]

                if first_channel:
                    self._BS_raw_data = BS_data
                else:
                    # Trim both sides to the common number of cells and
                    # profiles before stacking along the frequency axis.
                    n_cells = min(self._BS_raw_data.shape[1],
                                  BS_data.shape[1])
                    n_times = min(self._BS_raw_data.shape[2],
                                  BS_data.shape[2])
                    self._BS_raw_data = np.append(
                        self._BS_raw_data[:, :n_cells, :n_times],
                        BS_data[:, :n_cells, :n_times],
                        axis=0)

                # Keep the time axis and the backscatter in step.
                n_common = min(self._time.shape[1],
                               self._BS_raw_data.shape[2])
                self._time = self._time[:, :n_common]
                self._BS_raw_data = self._BS_raw_data[:, :, :n_common]

                first_channel = False

        # Label built from f * 1e-6 rounded to 2 decimals.
        # NOTE(review): assumes 'f0' is expressed in Hz -- confirm.
        self._freq_text = np.array(
            [str(np.round(f * 1e-6, 2)) + " MHz" for f in self._freq])

    @staticmethod
    def _stack_row(stack, row):
        """Append *row* to the 2-D *stack*, trimming both to a common width.

        Args:
            stack: 2-D array with one row per channel already loaded.
            row: Sequence of values for the new channel.

        Returns:
            A new 2-D array with one more row and ``min(width, len(row))``
            columns.
        """
        width = min(stack.shape[1], len(row))
        return np.append(stack[:, :width], np.array([row[:width]]), axis=0)

    def reshape_BS_raw_data(self):
        """Return the backscatter reshaped to (cells * profiles, frequencies).

        The reshape uses Fortran ordering (``order="F"``).
        NOTE(review): with a (frequency, cell, profile) input, Fortran order
        makes the frequency index vary fastest -- verify that the resulting
        row ordering matches reshape_r() / reshape_t().
        """
        BS_raw_cross_section = np.reshape(
            self._BS_raw_data,
            (self._r.shape[1] * self._time.shape[1], len(self._freq)),
            order="F")
        return BS_raw_cross_section

    def reshape_r(self):
        """Return depths as a (cells * profiles, frequencies) array.

        Column ``i`` holds row ``i`` of ``_r`` with every depth repeated once
        per profile.
        """
        n_profiles = self._time.shape[1]
        r = np.zeros((self._r.shape[1] * n_profiles, len(self._freq)))
        for i, _ in enumerate(self._freq):
            r[:, i] = np.repeat(self._r[i, :], n_profiles)
        return r

    def compute_r_2D(self):
        """Return depths as a (frequencies, cells, profiles) array.

        Each frequency uses its own row of ``_r``, copied along the profile
        axis, so channels with different cell sizes keep distinct depths.
        """
        n_profiles = self._time.shape[1]
        r2D = np.zeros((self._freq.shape[0], self._r.shape[1], n_profiles))
        for f, _ in enumerate(self._freq):
            r2D[f, :, :] = np.repeat(self._r[f, :][:, np.newaxis],
                                     n_profiles, axis=1)
        return r2D

    def reshape_t(self):
        """Return times as a (cells * profiles, frequencies) array.

        Column ``i`` holds row ``i`` of ``_time`` with every time value
        repeated once per cell.
        """
        n_cells = self._r.shape[1]
        t = np.zeros((n_cells * self._time.shape[1], len(self._freq)))
        for i, _ in enumerate(self._freq):
            t[:, i] = np.repeat(self._time[i, :], n_cells)
        return t

    def reshape_t_snr(self):
        """Return ``_time_snr`` as a (cells * profiles, frequencies) array.

        Same layout as reshape_t(), but built from ``_time_snr``.
        NOTE(review): ``_time_snr`` is left empty with a single row by
        ``__init__``; it presumably has to be assigned (one row per
        frequency) before this method is called -- confirm.
        """
        n_cells = self._r.shape[1]
        t = np.zeros((n_cells * self._time_snr.shape[1], len(self._freq)))
        for i, _ in enumerate(self._freq):
            t[:, i] = np.repeat(self._time_snr[i, :], n_cells)
        return t


# if __name__ == "__main__":
#     AcousticDataLoaderUBSediFlow(path_BS_raw_data0 + filename0)