# PlotSedAdisDx.py -- Pamhyr
# Copyright (C) 2023-2024  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import logging
import numpy as np

from functools import reduce
from datetime import datetime

from tools import timer, trace
from View.Tools.PamhyrPlot import PamhyrPlot

from PyQt5.QtCore import (
    QCoreApplication
)

_translate = QCoreApplication.translate

logger = logging.getLogger()


def _latest_timestamp(results):
    """Return the greatest timestamp of `results`, or None without results.

    `results` is only required to expose ``get("timestamps")`` returning
    an iterable accepted by ``max``.
    """
    if results is None:
        return None
    return max(results.get("timestamps"))


class PlotAdis_dx(PamhyrPlot):
    """Plot a pollutant value along a reach for the current timestamp.

    The x values come from ``reach.geometry.get_rk()``; the y values are
    read from each profile's "pols" data, indexed by the current
    pollutant id and by the column selected through `key`
    ("C" -> column 0, "M" -> column 1).
    """

    def __init__(self, canvas=None, trad=None, toolbar=None,
                 results=None, reach_id=0, profile_id=0,
                 pol_id=0, key="C",
                 parent=None):
        """Store the current selection and prepare labels.

        Parameters:
            canvas, trad, toolbar, parent: forwarded to PamhyrPlot.
            results: results object, forwarded to PamhyrPlot as `data`.
            reach_id: index of the reach to display.
            profile_id: index of the highlighted profile.
            pol_id: index of the pollutant to display.
            key: "C" or "M", selects the plotted column and its labels.
        """
        super(PlotAdis_dx, self).__init__(
            canvas=canvas,
            trad=trad,
            data=results,
            toolbar=toolbar,
            parent=parent
        )

        # `results` defaults to None and draw() copes with that case, so
        # the constructor must not fail on it either.
        self._current_timestamp = _latest_timestamp(results)
        self._current_reach_id = reach_id
        self._current_profile_id = profile_id
        self._current_pol_id = pol_id
        self._key = key

        self.label_x = self._trad["unit_pk"]
        self._available_values_y = self._trad.get_dict("values_y_pol")

        # Legend label of the current-timestamp curve, per key.
        self.label = {}
        self.label["C"] = _translate("Results", "Concentration")
        self.label["M"] = _translate("Results", "Mass")

        # Legend label of the maximum curve, per key.
        self.label_max = {}
        self.label_max["C"] = _translate("Results", "Max Concentration")
        self.label_max["M"] = _translate("Results", "Max Mass")

        # Column read in each pollutant record, per key.
        self.sed_id = {}
        self.sed_id["C"] = 0
        self.sed_id["M"] = 1

        self.label_y = self._available_values_y["unit_" + self._key]

        self._isometric_axis = False

    @property
    def results(self):
        """Results object currently displayed (stored in `self.data`)."""
        return self.data

    @results.setter
    def results(self, results):
        """Replace the results and jump to their greatest timestamp."""
        self.data = results
        self._current_timestamp = _latest_timestamp(results)

    @timer
    def draw(self, highlight=None):
        """Redraw the whole plot from scratch.

        Does nothing beyond resetting the axes when there are no
        results; marks the plot as not initialised when the reach has
        no profile. `highlight` is accepted but not used here.
        """
        self.init_axes()

        if self.results is None:
            return

        reach = self.results.river.reach(self._current_reach_id)

        # Check for an empty reach before asking for a profile in it.
        if reach.geometry.number_profiles == 0:
            self._init = False
            return

        profile = reach.profile(self._current_profile_id)

        self.draw_max(reach)
        self.draw_data(reach, profile)
        self.draw_current(reach)
        self.draw_profiles_hs(reach)

        self.enable_legend()
        # Re-apply the y range after every artist has been added.
        self.set_y_lim()

        self.idle()
        self.update_current()
        self._init = True

    def draw_profiles_hs(self, reach):
        """Draw a dashed vertical marker for each hydraulic structure.

        Only the structures whose input reach is the geometry of
        `reach` are drawn, each at its `input_rk` with its name as an
        annotation.
        """
        structures = [
            hs
            for hs in self.results.study.river.hydraulic_structures.lst
            if hs._input_reach is not None
            and hs._input_reach.reach is reach.geometry
        ]
        if not structures:
            return

        # NOTE(review): the marker bounds come from the geometry z
        # values while the y axis shows the plotted pollutant value --
        # confirm this is intended.
        z_low = min(reach.geometry.get_z_min())
        z_high = max(reach.geometry.get_z_max())

        for hs in structures:
            x = hs.input_rk

            self.canvas.axes.plot(
                [x, x],
                [z_low, z_high],
                linestyle="--",
                lw=1.,
                color=self.color_plot_previous,
            )

            self.canvas.axes.annotate(
                " > " + hs.name,
                (x, z_high),
                horizontalalignment='left',
                verticalalignment='top',
                annotation_clip=True,
                fontsize=9,
                color=self.color_plot_previous,
            )

    def draw_data(self, reach, profile):
        """Plot the current-timestamp curve and keep it in `self._line`.

        `profile` is accepted for interface compatibility; the curve is
        built from every profile of `reach`.
        """
        pol = self._current_pol_id
        column = self.sed_id[self._key]
        timestamp = self._current_timestamp

        x = reach.geometry.get_rk()
        y = [
            p.get_ts_key(timestamp, "pols")[pol][column]
            for p in reach.profiles
        ]

        # Also sets self.y_min / self.y_max, needed by draw_current().
        self.set_y_lim()

        self._line, = self.canvas.axes.plot(
            x, y,
            label=self.label[self._key],
            color=self.color_plot,
            **self.plot_default_kargs
        )

    def draw_current(self, reach):
        """Plot the vertical dashed line marking the current profile.

        Relies on `self.y_min` / `self.y_max` set by set_y_lim().
        """
        rk = reach.geometry.get_rk()
        current_rk = rk[self._current_profile_id]

        self._current, = self.canvas.axes.plot(
            [current_rk, current_rk],
            [self.y_max, self.y_min],
            color="grey",
            linestyle="dashed",
            lw=1.
        )

    def draw_max(self, reach):
        """Plot, for each profile, the maximum over its "pols" records.

        The resulting dotted curve is kept in `self._line_max`.
        """
        pol = self._current_pol_id
        column = self.sed_id[self._key]

        x = reach.geometry.get_rk()
        y = [
            np.max([data_el[pol][column] for data_el in p.get_key("pols")])
            for p in reach.profiles
        ]

        self._line_max, = self.canvas.axes.plot(
            x, y,
            label=self.label_max[self._key],
            color=self.color_plot_highlight,
            linestyle='dotted',
            lw=1.,
            markersize=0
        )

    def set_reach(self, reach_id):
        """Select another reach, reset the profile to 0 and redraw."""
        self._current_reach_id = reach_id
        self._current_profile_id = 0
        self.draw()

    def set_profile(self, profile_id):
        """Select another profile and move the current-profile marker."""
        self._current_profile_id = profile_id
        self.update_current()
        self.update_idle()

    def set_timestamp(self, timestamp):
        """Select another timestamp and refresh the curve."""
        self._current_timestamp = timestamp
        self.update()

    def update(self):
        """Refresh the plot, drawing it first if it was never drawn."""
        if not self._init:
            self.draw()

        # draw() can bail out (no results, empty reach); in that case
        # the artists updated below do not exist.
        if not self._init:
            return

        self.update_data()
        self.update_idle()

    def update_data(self):
        """Push the current-timestamp values into the existing curve."""
        reach = self.results.river.reach(self._current_reach_id)
        pol = self._current_pol_id
        column = self.sed_id[self._key]
        timestamp = self._current_timestamp

        x = reach.geometry.get_rk()
        y = [
            p.get_ts_key(timestamp, "pols")[pol][column]
            for p in reach.profiles
        ]

        self._line.set_data(x, y)

        self.update_current()

        # Hide the marker while rescaling so that relim(visible_only=True)
        # ignores it.
        self.hide_current()
        self.canvas.axes.relim(visible_only=True)
        self.canvas.axes.autoscale_view()
        self.show_current()

    def update_current(self):
        """Move the current-profile marker and request a canvas redraw."""
        reach = self.results.river.reach(self._current_reach_id)
        rk = reach.geometry.get_rk()
        cid = self._current_profile_id

        self._current.set_data(
            [rk[cid], rk[cid]],
            [self.y_max, self.y_min]
        )
        self.canvas.figure.canvas.draw_idle()

    def hide_current(self):
        """Hide the current-profile marker."""
        self._current.set_visible(False)

    def show_current(self):
        """Show the current-profile marker."""
        self._current.set_visible(True)

    def set_pollutant(self, pol_id):
        """Select another pollutant and redraw if it actually changed.

        A full redraw is needed: the maximum curve and the y range both
        depend on the pollutant, and update() refreshes neither.
        """
        if pol_id != self._current_pol_id:
            self._current_pol_id = pol_id
            self.draw()

    def set_y_lim(self):
        """Compute `y_min` / `y_max` over the "pols" records and apply them.

        When the range is narrower than 1 it is widened to exactly 1,
        centred on the middle of the data range.
        """
        reach = self.results.river.reach(self._current_reach_id)
        pol = self._current_pol_id
        column = self.sed_id[self._key]

        y = [
            [data_el[pol][column] for data_el in p.get_key("pols")]
            for p in reach.profiles
        ]

        self.y_max = np.max(y)
        self.y_min = np.min(y)

        if self.y_max - self.y_min < 1:
            # Take the midpoint once: deriving y_max from the already
            # updated y_min would give an off-centre, too narrow window.
            middle = (self.y_max + self.y_min) / 2
            self.y_min = middle - 0.5
            self.y_max = middle + 0.5

        self.canvas.axes.set_ylim(self.y_min, self.y_max)