acoused/Model/acoustic_data_loader_UBSedi...

251 lines
12 KiB
Python

# ============================================================================== #
# acoustic_data_loader_UBSediFlow.py - AcouSed #
# Copyright (C) 2024 INRAE #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# by Brahim MOUDJED #
# ============================================================================== #
# -*- coding: utf-8 -*-
import numpy as np
from Model.udt_extract.raw_extract import raw_extract
class AcousticDataLoaderUBSediFlow:
    """Load the backscatter raw data of a UB-SediFlow recording.

    The file is decoded by ``raw_extract``. Every (configuration, channel)
    pair found in the returned parameter dictionary contributes one entry
    (one row) to the per-channel attributes.

    Attributes filled by the constructor:
        _date, _hour: date and time parts of the ``time_begin`` value
            returned by ``raw_extract``.
        _freq: the 'f0' parameter of each channel (multiplied by 1e-6 to
            build the "MHz" labels of ``_freq_text``).
        _nb_cells, _cell_size: 'n_cell' and 'r_dcell' of each channel.
        _nb_pings_per_sec, _nb_pings_averaged_per_profile: 'prf' and 'n_p'.
        _gain_rx, _gain_tx: 'a0' and 'a1'.
        _nb_profiles_per_sec: 'n_avg'.
        _pulse_length, _kt: one 0 per channel (no value is read from the file).
        _nb_profiles: number of time columns kept when the channel was added.
        _r: 2D array (channel, cell) of cell positions ``r_dcell * index``.
        _time: 2D array (channel, profile) of seconds elapsed since the
            first profile of the channel.
        _BS_raw_data: 3D array (channel, cell, profile) of the
            'echo_avg_profile' data.
        _time_snr: left empty here.

    Channels may have different numbers of cells or profiles; all stacked
    arrays are truncated to the lengths common to every channel.
    """

    def __init__(self, path_BS_raw_data: str):
        """Read ``path_BS_raw_data`` and build the per-channel arrays.

        Args:
            path_BS_raw_data: path of the raw data file handed to
                ``raw_extract``.
        """
        self.path_BS_raw_data = path_BS_raw_data

        # --- Extract Backscatter acoustic raw data ---
        # Only the start time and the two ultrasound dictionaries are used.
        (_device_name, time_begin, _time_end, param_us_dicts, data_us_dicts,
         _data_dicts, _settings_dict) = raw_extract(self.path_BS_raw_data)

        # --- Date and hour of measurements read on udt data file ---
        self._date = time_begin.date()
        self._hour = time_begin.time()

        self._load_channels(param_us_dicts, data_us_dicts)

    def _load_channels(self, param_us_dicts, data_us_dicts):
        """Fill every per-channel attribute from the extracted dictionaries.

        Args:
            param_us_dicts: ``{config: {channel: parameters}}`` where the
                parameters provide 'f0', 'n_cell', 'r_dcell', 'prf', 'n_p',
                'a0', 'a1' and 'n_avg'.
            data_us_dicts: ``{config: {channel: {'echo_avg_profile':
                {'time': [...], 'data': [...]}}}}`` with one data profile
                per time stamp.
        """
        self._freq = np.array([[]])
        self._nb_profiles = []
        self._nb_profiles_per_sec = []
        self._nb_cells = []
        self._cell_size = []
        self._pulse_length = []
        self._nb_pings_per_sec = []
        self._nb_pings_averaged_per_profile = []
        self._kt = []
        self._gain_rx = []
        self._gain_tx = []
        self._r = np.array([[]])
        self._time = np.array([[]])
        self._time_snr = np.array([[]])
        self._BS_raw_data = np.array([[[]]])

        # The first channel initialises the stacks whatever its config key
        # is; later channels are appended to them.
        first_channel = True

        for config, channels in param_us_dicts.items():
            for channel, params in channels.items():
                echo = data_us_dicts[config][channel]['echo_avg_profile']

                # --- Acquisition parameters ---
                self._freq = np.append(self._freq, params['f0'])
                self._nb_cells = np.append(self._nb_cells, params['n_cell'])
                self._cell_size = np.append(self._cell_size, params['r_dcell'])
                self._pulse_length = np.append(self._pulse_length, 0)
                self._nb_pings_per_sec = np.append(
                    self._nb_pings_per_sec, params['prf'])
                self._nb_pings_averaged_per_profile = np.append(
                    self._nb_pings_averaged_per_profile, params['n_p'])
                self._kt = np.append(self._kt, 0)
                self._gain_rx = np.append(self._gain_rx, params['a0'])
                self._gain_tx = np.append(self._gain_tx, params['a1'])

                # --- Depth for each frequency ---
                depth = [params['r_dcell'] * i for i in range(params['n_cell'])]
                self._r = self._stack_row(
                    self._r, depth, self._r.shape[1] == 0)

                # --- BS time for each frequency ---
                stamps = echo['time']
                elapsed = [(t - stamps[0]).total_seconds() for t in stamps]
                self._time = self._stack_row(self._time, elapsed, first_channel)

                self._nb_profiles = np.append(
                    self._nb_profiles, self._time.shape[1])
                self._nb_profiles_per_sec = np.append(
                    self._nb_profiles_per_sec, params['n_avg'])

                # --- US backscatter raw signal ---
                # One profile per kept time column, so that column k of the
                # block is the profile recorded at time k.
                n_profiles = self._time.shape[1]
                profiles = np.array(
                    [echo['data'][k] for k in range(n_profiles)])
                block = profiles.transpose()[np.newaxis]  # (1, cell, profile)

                if first_channel:
                    self._BS_raw_data = block
                else:
                    self._BS_raw_data = self._stack_block(
                        self._BS_raw_data, block)
                first_channel = False

                # Keep the time axis of both arrays the same length.
                n_common = min(self._time.shape[1], self._BS_raw_data.shape[2])
                self._time = self._time[:, :n_common]
                self._BS_raw_data = self._BS_raw_data[:, :, :n_common]

        self._freq_text = np.array(
            [str(np.round(f * 1e-6, 2)) + " MHz" for f in self._freq])

    @staticmethod
    def _stack_row(stack, row, is_first):
        """Append ``row`` to the 2D ``stack``, truncating to the common length.

        When ``is_first`` is true the stack is replaced by the single row.
        """
        row = np.array([row])
        if is_first:
            return row
        n_kept = min(stack.shape[1], row.shape[1])
        return np.append(stack[:, :n_kept], row[:, :n_kept], axis=0)

    @staticmethod
    def _stack_block(stack, block):
        """Append a (1, cell, profile) block to the 3D stack along axis 0.

        Both arrays are first cut to their common number of cells and
        profiles.
        """
        n_cells = min(stack.shape[1], block.shape[1])
        n_profiles = min(stack.shape[2], block.shape[2])
        return np.append(stack[:, :n_cells, :n_profiles],
                         block[:, :n_cells, :n_profiles],
                         axis=0)

    def reshape_BS_raw_data(self) -> np.ndarray:
        """Return ``_BS_raw_data`` reshaped to (cells * profiles, channels).

        The reshape uses Fortran order.
        """
        # NOTE(review): with order="F" the first (channel) axis varies
        # fastest, so with more than one channel a column of the result is
        # not the data of a single channel - confirm this is intended.
        BS_raw_cross_section = np.reshape(
            self._BS_raw_data,
            (self._r.shape[1] * self._time.shape[1], len(self._freq)),
            order="F")
        return BS_raw_cross_section

    def reshape_r(self) -> np.ndarray:
        """Return the cell positions as a (cells * profiles, channels) array.

        In each column every position is repeated once per profile before
        moving to the next cell.
        """
        r = np.zeros((self._r.shape[1] * self._time.shape[1], len(self._freq)))
        for i, _ in enumerate(self._freq):
            r[:, i] = np.repeat(self._r[i, :], self._time.shape[1])
        return r

    def compute_r_2D(self) -> np.ndarray:
        """Return the cell positions as a (channels, cells, profiles) array.

        The positions of each channel are copied along the profile axis.
        """
        r2D = np.zeros(
            (self._freq.shape[0], self._r.shape[1], self._time.shape[1]))
        for f, _ in enumerate(self._freq):
            # Each channel uses its own row of positions, as reshape_r does.
            r2D[f, :, :] = np.repeat(
                self._r[f, :][:, np.newaxis], self._time.shape[1], axis=1)
        return r2D

    def reshape_t(self) -> np.ndarray:
        """Return the times as a (cells * profiles, channels) array.

        In each column every time is repeated once per cell before moving
        to the next profile.
        """
        # NOTE(review): reshape_r repeats per profile while this repeats per
        # cell, so the two flattened orderings differ - confirm with callers.
        t = np.zeros((self._r.shape[1] * self._time.shape[1], len(self._freq)))
        for i, _ in enumerate(self._freq):
            t[:, i] = np.repeat(self._time[i, :], self._r.shape[1])
        return t

    def reshape_t_snr(self) -> np.ndarray:
        """Return ``_time_snr`` as a (cells * snr profiles, channels) array.

        ``_time_snr`` is left empty by the constructor, so it has to be
        assigned (one row per channel) before this method is called.
        """
        t = np.zeros(
            (self._r.shape[1] * self._time_snr.shape[1], len(self._freq)))
        for i, _ in enumerate(self._freq):
            t[:, i] = np.repeat(self._time_snr[i, :], self._r.shape[1])
        return t
# if __name__ == "__main__":
# AcousticDataLoaderUBSediFlow(path_BS_raw_data0 + filename0)