# Window.py -- Pamhyr # Copyright (C) 2023-2025 INRAE # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # -*- coding: utf-8 -*- import os import csv import logging try: import rasterio import rasterio.control import rasterio.crs import rasterio.sample import rasterio.vrt import rasterio._features _rasterio_loaded = True except Exception as e: print(f"Module 'rasterio' is not available: {e}") _rasterio_loaded = False from functools import reduce from numpy import sqrt from datetime import datetime from tools import trace, timer, logger_exception from View.Tools.PamhyrWindow import PamhyrWindow from PyQt5.QtGui import ( QKeySequence, QIcon, QPixmap, ) from PyQt5.QtCore import ( Qt, QVariant, QAbstractTableModel, QCoreApplication, QModelIndex, pyqtSlot, QItemSelectionModel, QTimer, QSettings ) from PyQt5.QtWidgets import ( QDialogButtonBox, QPushButton, QLineEdit, QFileDialog, QTableView, QAbstractItemView, QUndoStack, QShortcut, QAction, QItemDelegate, QComboBox, QVBoxLayout, QHeaderView, QTabWidget, QSlider, QLabel, QWidget, QGridLayout, QTabBar, QInputDialog ) from Model.Results.Results import AdditionalData from View.Tools.Plot.PamhyrCanvas import MplCanvas from View.Tools.Plot.PamhyrToolbar import PamhyrPlotToolbar from View.Results.PlotXY import PlotXY from View.Results.PlotAC import PlotAC from View.Results.PlotRKC import PlotRKC from View.Results.PlotH import PlotH from View.Results.PlotSedReach import PlotSedReach 
from View.Results.PlotSedProfile import PlotSedProfile
from View.Results.CustomPlot.Plot import CustomPlot
from View.Results.CustomPlot.CustomPlotValuesSelectionDialog import (
    CustomPlotValuesSelectionDialog,
)
from View.Results.CustomExport.CustomExport import CustomExportDialog
from View.Results.Table import TableModel
from View.Results.translate import (
    ResultsTranslate, CompareResultsTranslate
)
from View.Results.CoordinatesDialog import CoordinatesDialog

_translate = QCoreApplication.translate

logger = logging.getLogger()


def _difference(first, second):
    """Return the element-wise ``first - second`` of two sequences.

    The result stops at the shortest input.
    """
    return [a - b for a, b in zip(first, second)]


class ResultsWindow(PamhyrWindow):
    """Window displaying solver results: tables, time slider and plots."""

    _pamhyr_ui = "Results"
    _pamhyr_name = "Results"

    def _path_file(self, filename):
        """Return the absolute path of ``filename`` in ``../ui/ressources``."""
        return os.path.abspath(
            os.path.join(
                os.path.dirname(__file__),
                "..", "ui", "ressources", filename
            )
        )

    def __init__(self, study=None, config=None,
                 results=None, parent=None, trad=None):
        """Build the window for ``results`` (a sequence of result sets).

        If any setup step raises, the exception is logged and the window
        is left partially initialised (no initial selection is done).
        """
        self._solvers = [r._solver for r in results]
        self._results = results
        # Rows currently selected in the solver table (list of indexes).
        self._current_results = [0]

        if trad is None:
            trad = ResultsTranslate()

        name = (
            trad[self._pamhyr_name] + " - "
            + study.name + " - "
            + " - ".join([r.solver_name for r in self._results])
        )

        super(ResultsWindow, self).__init__(
            title=name,
            study=study,
            config=config,
            trad=trad,
            parent=parent
        )

        for r in results:
            self._hash_data.append(r._solver)
            self._hash_data.append(r._repertory)
            self._hash_data.append(r._name)

        # Custom plots opened in extra tabs, keyed by tab title.
        self._additional_plot = {}

        try:
            self.get_timestamps()
            self.setup_slider()
            self.setup_table()
            self.setup_plots()
            self.setup_statusbar()
            self.setup_connections()
        except Exception as e:
            logger_exception(e)
            return

        self.update(reach_id=0, profile_id=[0])
        self.update_table_selection_solver(0)

    def setup_table(self):
        """Create one TableModel per table view of the window."""
        self._table = {}
        for t in ["reach", "profile", "raw_data", "solver"]:
            table = self.find(QTableView, f"tableView_{t}")
            self._table[t] = TableModel(
                table_view=table,
                table_headers=self._trad.get_dict(f"table_headers_{t}"),
                data=self._results,
                undo=self._undo_stack,
                opt_data=t,
                parent=self
            )
            self._table[t]._timestamp = self._timestamps[
                self._slider_time.value()]

        # The solver table is only useful when several results are shown.
        if len(self._results) <= 1:
            self.find(QTableView, "tableView_solver").hide()

    def setup_slider(self):
        """Initialise the time slider, the player buttons and the timer."""
        self._reload_slider()

        self._icon_start = QIcon()
        self._icon_start.addPixmap(
            QPixmap(self._path_file("media-playback-start.png"))
        )
        self._icon_pause = QIcon()
        self._icon_pause.addPixmap(
            QPixmap(self._path_file("media-playback-pause.png"))
        )

        self._button_play = self.find(QPushButton, "playButton")
        self._button_play.setIcon(self._icon_start)
        self._button_back = self.find(QPushButton, "backButton")
        self._button_next = self.find(QPushButton, "nextButton")
        self._button_first = self.find(QPushButton, "firstButton")
        self._button_last = self.find(QPushButton, "lastButton")

        self._timer = QTimer(self)

    def _new_plot_area(self, canvas_name, layout_name):
        """Create a canvas and its toolbar and add both to a layout.

        Returns the ``(canvas, toolbar, layout)`` triple.
        """
        canvas = MplCanvas(width=5, height=4, dpi=100)
        canvas.setObjectName(canvas_name)
        toolbar = PamhyrPlotToolbar(
            canvas, self,
            items=[
                "home", "move", "zoom", "save",
                "iso", "back/forward"
            ]
        )
        layout = self.find(QVBoxLayout, layout_name)
        layout.addWidget(toolbar)
        layout.addWidget(canvas)
        return canvas, toolbar, layout

    def setup_plots(self):
        """Create the four built-in plots and draw them once."""
        tab_widget = self.find(QTabWidget, "tabWidget")
        tab_widget.setTabsClosable(True)
        tab_widget.tabCloseRequested.connect(self.delete_tab)
        # The first three tabs lose their close button.
        for fixed_tab in range(3):
            tab_widget.tabBar().setTabButton(
                fixed_tab, QTabBar.RightSide, None
            )

        self.canvas, self.toolbar, self.plot_layout = self._new_plot_area(
            "canvas", "verticalLayout"
        )
        self.plot_xy = PlotXY(
            canvas=self.canvas,
            results=self._results,
            reach_id=0,
            profile_id=0,
            res_id=self._current_results,
            trad=self._trad,
            toolbar=self.toolbar,
            display_current=True,
            parent=self
        )
        self.plot_xy.draw()

        (self.canvas_2, self.toolbar_2,
         self.plot_layout_2) = self._new_plot_area(
            "canvas_2", "verticalLayout_2"
        )
        self.plot_rkc = PlotRKC(
            canvas=self.canvas_2,
            results=self._results,
            reach_id=0,
            profile_id=0,
            res_id=self._current_results,
            trad=self._trad,
            toolbar=self.toolbar_2,
            parent=self
        )
        self.plot_rkc.draw()

        (self.canvas_3, self.toolbar_3,
         self.plot_layout_3) = self._new_plot_area(
            "canvas_3", "verticalLayout_3"
        )
        self.plot_ac = PlotAC(
            canvas=self.canvas_3,
            results=self._results,
            reach_id=0,
            profile_id=0,
            res_id=self._current_results,
            trad=self._trad,
            toolbar=self.toolbar_3,
            parent=self
        )
        self.plot_ac.draw()

        (self.canvas_4, self.toolbar_4,
         self.plot_layout_4) = self._new_plot_area(
            "canvas_4", "verticalLayout_hydrograph"
        )
        self.plot_h = PlotH(
            canvas=self.canvas_4,
            results=self._results,
            reach_id=0,
            profile_id=[0],
            res_id=self._current_results,
            trad=self._trad,
            toolbar=self.toolbar_4,
            parent=self
        )
        self.plot_h.draw()

    def closeEvent(self, event):
        """Stop the animation timer before closing the window."""
        try:
            self._timer.stop()
        except Exception as e:
            logger_exception(e)

        super(ResultsWindow, self).closeEvent(event)

    def _compute_status_label(self):
        """Return the status bar text: reach, cross-section and time."""
        from datetime import timedelta

        # Timestamp
        ts = self._timestamps[self._slider_time.value()]
        # A timedelta is used directly: subtracting two naive local
        # datetimes would be off by the DST offset between both dates.
        fts = str(timedelta(seconds=float(ts)))
        # str.replace returns a new string, the result must be kept.
        fts = fts.replace("days", _translate("Results", "days"))\
                 .replace("day", _translate("Results", "day"))

        # Reach (first one when nothing is selected)
        edges = self._study.river.enable_edges()
        reach = edges[self._get_current_reach()]

        # Profile (first one when nothing is selected)
        profile = reach.reach.profile(self._get_current_profile())
        pname = profile.name if profile.name != "" else profile.rk

        return (f"{self._trad['reach']}: {reach.name} | " +
                f"{self._trad['cross_section']}: {pname} | " +
                f"{self._trad['unit_time_s']} : {fts} ({ts} sec)")

    def setup_statusbar(self):
        """Add the permanent status label to the status bar."""
        txt = self._compute_status_label()
        self._status_label = QLabel(txt)
        self.statusbar.addPermanentWidget(self._status_label)

    def update_statusbar(self):
        """Refresh the text of the status label."""
        txt = self._compute_status_label()
        self._status_label.setText(txt)

    def setup_connections(self):
        """Connect actions, tables, slider, buttons and tabs to slots."""
        # Action (export_current is not wired to any action here)
        actions = {
            "action_reload": self._reload,
            "action_add": self._add_custom_plot,
            "action_export": self._export,
            "action_import_data": self.import_data
        }

        if _rasterio_loaded:
            actions["action_Geo_tiff"] = self.import_geotiff
        else:
            self.find(QAction, "action_Geo_tiff").setEnabled(False)

        if len(self._results) > 1:
            self.find(QAction, "action_reload").setEnabled(False)

        for action, slot in actions.items():
            self.find(QAction, action).triggered.connect(slot)

        # Table and Plot
        fun = {
            "reach": self._set_current_reach,
            "profile": self._set_current_profile,
            "raw_data": self._set_current_profile_raw_data,
            "solver": self._set_current_results,
        }

        for t in ["reach", "profile", "raw_data", "solver"]:
            table = self.find(QTableView, f"tableView_{t}")
            table.selectionModel()\
                 .selectionChanged\
                 .connect(fun[t])
            self._table[t].dataChanged.connect(fun[t])

        self._slider_time.valueChanged.connect(self._set_current_timestamp)
        self._button_play.setChecked(False)
        self._button_play.clicked.connect(self._pause)
        self._button_back.clicked.connect(self._back)
        self._button_next.clicked.connect(self._next)
        self._button_first.clicked.connect(self._first)
        self._button_last.clicked.connect(self._last)
        self._timer.timeout.connect(self._update_slider)

        # tabs
        tab_widget = self.find(QTabWidget, "tabWidget")
        tab_widget.currentChanged.connect(self.tab_changed)

    def _select_table_row(self, table_name, ind):
        """Select row ``ind`` of ``tableView_<table_name>`` and show it."""
        table = self.find(QTableView, f"tableView_{table_name}")
        index = table.model().index(ind, 0)
        table.selectionModel().select(
            index,
            QItemSelectionModel.Rows |
            QItemSelectionModel.ClearAndSelect |
            QItemSelectionModel.Select
        )
        table.scrollTo(index)

    def update_table_selection_reach(self, ind):
        """Select reach ``ind`` and refresh the tables depending on it."""
        self._select_table_row("reach", ind)

        self._table["profile"].update(ind)
        self._table["raw_data"].update(ind)

    def update_table_selection_profile(self, ind):
        """Select row ``ind`` in the profile and raw data tables."""
        for t in ["profile", "raw_data"]:
            self._select_table_row(t, ind)

    def update_table_selection_solver(self, ind):
        """Select row ``ind`` in the solver table."""
        self._select_table_row("solver", ind)

    def update(self, reach_id=None, profile_id=None,
               solver_id=None, timestamp=None):
        """Propagate a new reach/profile/solver/timestamp to the views.

        Only the arguments that are not None are applied. ``profile_id``
        and ``solver_id`` are lists of row indexes.
        """
        if reach_id is not None:
            self.plot_xy.set_reach(reach_id)
            self.plot_ac.set_reach(reach_id)
            self.plot_rkc.set_reach(reach_id)
            self.plot_h.set_reach(reach_id)

            for plot in self._additional_plot.values():
                plot.set_reach(reach_id)

            self.update_table_selection_reach(reach_id)
            self.update_table_selection_profile(0)

        if profile_id is not None:
            self.plot_xy.set_profile(profile_id[0])
            self.plot_ac.set_profile(profile_id[0])
            self.plot_rkc.set_profile(profile_id[0])
            # The hydrograph plot receives the whole selection.
            self.plot_h.set_profile(profile_id)

            for plot in self._additional_plot.values():
                plot.set_profile(profile_id[0])

            tab_widget = self.find(QTabWidget, "tabWidget")
            if tab_widget.currentIndex() != 2:
                self.update_table_selection_profile(profile_id[0])

        if solver_id is not None:
            self._current_results = solver_id
            self.plot_xy.set_result(solver_id)
            self.plot_ac.set_result(solver_id)
            self.plot_rkc.set_result(solver_id)
            self.plot_h.set_result(solver_id)

            for plot in self._additional_plot.values():
                plot.set_result(solver_id)

        if timestamp is not None:
            self.plot_xy.set_timestamp(timestamp)
            self.plot_ac.set_timestamp(timestamp)
            self.plot_rkc.set_timestamp(timestamp)
            self.plot_h.set_timestamp(timestamp)

            for plot in self._additional_plot.values():
                plot.set_timestamp(timestamp)

            self._table["raw_data"].timestamp = timestamp

        self.update_statusbar()

    def _get_current_reach(self):
        """Return the selected reach row, 0 when nothing is selected."""
        table = self.find(QTableView, "tableView_reach")
        indexes = table.selectedIndexes()
        if len(indexes) == 0:
            return 0

        return indexes[0].row()

    def _get_current_profile(self):
        """Return the selected profile row, 0 when nothing is selected."""
        table = self.find(QTableView, "tableView_profile")
        indexes = table.selectedIndexes()
        if len(indexes) == 0:
            return 0

        return indexes[0].row()

    def _get_current_profiles_list(self):
        """Return the row of every selected index of the profile table."""
        table = self.find(QTableView, "tableView_profile")
        indexes = table.selectedIndexes()
        if len(indexes) == 0:
            return []

        return [i.row() for i in indexes]

    def _get_current_timestamp(self):
        """Return the timestamp pointed to by the time slider."""
        return self._timestamps[
            self._slider_time.value()
        ]

    def _set_current_reach(self):
        """Slot: apply the reach selected in the reach table."""
        table = self.find(QTableView, "tableView_reach")
        indexes = table.selectedIndexes()
        if len(indexes) == 0:
            return

        self.update(reach_id=indexes[0].row())

    def _set_current_profile(self):
        """Slot: apply the rows selected in the profile table."""
        table = self.find(QTableView, "tableView_profile")
        indexes = table.selectionModel().selectedRows()
        if len(indexes) == 0:
            return

        self.update(profile_id=[i.row() for i in indexes])

    def _set_current_profile_raw_data(self):
        """Slot: selection in the raw data table is ignored."""
        return

    def _set_current_results(self):
        """Slot: apply the rows selected in the solver table."""
        table = self.find(QTableView, "tableView_solver")
        indexes = table.selectedIndexes()
        if len(indexes) == 0:
            return

        self.update(solver_id=[i.row() for i in indexes])

    def _set_current_timestamp(self):
        """Slot: apply the timestamp selected with the slider."""
        timestamp = self._timestamps[self._slider_time.value()]
        self.update(timestamp=timestamp)

    def _reload_plots(self):
        """Give the current results to the built-in plots and redraw."""
        plots = (self.plot_xy, self.plot_ac, self.plot_rkc, self.plot_h)
        for plot in plots:
            plot.results = self._results

        for plot in plots:
            plot.draw()

    def _reload_slider(self):
        """Resize the time slider and move it to the last timestamp."""
        self._slider_time = self.find(QSlider, "horizontalSlider_time")
        self._slider_time.setMaximum(len(self._timestamps) - 1)
        self._slider_time.setValue(len(self._timestamps) - 1)

    def _reload(self):
        """Reload the results (single result only) and refresh the views."""
        logger.debug("Reload results...")
        if len(self._results) == 1:
            self._results[0] = self._results[0].reload()

        self.get_timestamps()
        self._reload_plots()
        self._reload_slider()

    def _add_custom_plot(self):
        """Ask for the values to plot and open them in a new tab."""
        profile_id = self._get_current_profile()
        self.update_table_selection_profile(profile_id)

        dlg = CustomPlotValuesSelectionDialog(parent=self)
        if dlg.exec():
            x, y, envelop = dlg.value
            self.create_new_tab_custom_plot(x, y, envelop)

    def create_new_tab_custom_plot(self, x: str, y: list, envelop: bool):
        """Open a custom plot of ``y`` against ``x`` in a new tab.

        If a tab with the same title already exists, it is activated
        instead of creating a duplicate.
        """
        name = f"{x}: {','.join(y)}"
        if envelop and x == "rk":
            name += "_envelop"
        wname = f"tab_custom_{x}_{y}"

        tab_widget = self.find(QTabWidget, "tabWidget")

        # This plot already exists: look the tab up by its title, the
        # object name does not distinguish envelope and plain variants.
        for tab_index in range(tab_widget.count()):
            if tab_widget.tabText(tab_index) == name:
                tab_widget.setCurrentIndex(tab_index)
                return

        widget = QWidget()
        grid = QGridLayout()

        widget.setObjectName(wname)

        canvas = MplCanvas(width=5, height=4, dpi=100)
        canvas.setObjectName(f"canvas_{x}_{y}")
        toolbar = PamhyrPlotToolbar(
            canvas, self
        )

        plot = CustomPlot(
            x, y, envelop,
            self._get_current_reach(),
            self._get_current_profile(),
            self._get_current_timestamp(),
            data=self._results,
            canvas=canvas,
            res_id=self._current_results,
            toolbar=toolbar,
            parent=self,
        )
        plot.draw()

        # Add plot to additional plot
        self._additional_plot[name] = plot

        grid.addWidget(toolbar, 0, 0)
        grid.addWidget(canvas, 1, 0)
        widget.setLayout(grid)
        tab_widget.addTab(widget, name)
        tab_widget.setCurrentWidget(widget)

    def _copy(self):
        """Copy the selected raw data rows into the clipboard."""
        # focus on raw data table
        if self.find(QTabWidget, "tabWidget").currentIndex() != 0:
            logger.info("TODO: copy")
            return

        table = self.find(QTableView, "tableView_raw_data")
        model = self._table["raw_data"]

        coltext = []
        for index in sorted(table.selectionModel().selectedRows()):
            row = index.row()
            rowtext = []
            for column in range(model.columnCount()):
                i = model.createIndex(row, column)
                try:
                    rowtext.append(model.data(i))
                except AttributeError:
                    rowtext.append("")
            coltext.append(rowtext)

        self.copyTableIntoClipboard(coltext)

    def _paste(self):
        logger.info("TODO: paste")

    def _undo(self):
        # NOTE(review): self._table is assigned a dict in setup_table,
        # so this call looks like it raises AttributeError -- confirm
        # which model (or the undo stack) should be used.
        self._table.undo()

    def _redo(self):
        # NOTE(review): same remark as _undo.
        self._table.redo()

    # play / pause buttons
    def _update_slider(self):
        """Timer slot: move to the next timestamp, wrapping at the end."""
        if self._slider_time.value() == self._slider_time.maximum():
            self._slider_time.setValue(self._slider_time.minimum())
        else:
            self._slider_time.setValue(self._slider_time.value()+1)

    def _next(self):
        self._slider_time.setValue(self._slider_time.value()+1)

    def _back(self):
        self._slider_time.setValue(self._slider_time.value()-1)

    def _first(self):
        self._slider_time.setValue(self._slider_time.minimum())

    def _last(self):
        self._slider_time.setValue(self._slider_time.maximum())

    def _pause(self):
        """Start or stop the animation according to the play button."""
        navigation = (
            self._button_next, self._button_back,
            self._button_first, self._button_last,
        )

        if self._button_play.isChecked():
            for button in navigation:
                button.setEnabled(False)
            self._timer.start(100)
            self._button_play.setIcon(self._icon_pause)
        else:
            self._timer.stop()
            for button in navigation:
                button.setEnabled(True)
            self._button_play.setIcon(self._icon_start)

    def _export(self):
        """Ask what to export, then ask for the CSV file to write."""
        profile_id = self._get_current_profile()
        self.update_table_selection_profile(profile_id)

        dlg = CustomExportDialog(parent=self)
        if not dlg.exec():
            return

        x, y, envelop, solver_id = dlg.value

        logger.debug(
            "Export custom plot for: " +
            f"{x} -> {','.join(y)}"
        )

        # NOTE(review): import_data reads the same key from self._trad;
        # confirm that self._dict exists on this window.
        self.file_dialog(
            select_file="AnyFile",
            callback=lambda f: self.export_to(f[0], x, y,
                                              envelop, solver_id),
            default_suffix=".csv",
            file_filter=[self._dict["file_csv"]],
        )

    def export_to(self, filename, x, y, envelop, solver_id):
        """Write the values ``y`` against ``x`` into the CSV ``filename``.

        ``x`` is "rk" (values along the reach at the current timestamp)
        or "time" (values of the current profile over time). Any other
        value is logged and nothing is written.
        """
        results = self._results[solver_id]
        reach = results.river.reachs[self._get_current_reach()]
        first_line = [f"Study: {results.study.name}",
                      f"Reach: {reach.name}"]

        if x == "rk":
            timestamp = self._get_current_timestamp()
            first_line.append(f"Time: {timestamp}s")
            val_dict = self._export_rk(timestamp, y, envelop, solver_id)
        elif x == "time":
            profile_id = self._get_current_profile()
            profile = reach.profile(profile_id)
            pname = profile.name if profile.name != "" else profile.rk
            first_line.append(f"Profile: {pname}")
            val_dict = self._export_time(profile_id, y, solver_id)
        else:
            # Without this guard val_dict would be unbound below.
            logger.error(f"Export: unknown abscissa '{x}'")
            return

        with open(filename, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=',',
                                quotechar='|', quoting=csv.QUOTE_MINIMAL)
            dict_x = self._trad.get_dict("values_x")
            writer.writerow(first_line)
            writer.writerow(list(val_dict.keys()))

            columns = list(val_dict.values())
            # One row per abscissa value.
            for row in range(len(val_dict[dict_x[x]])):
                writer.writerow([column[row] for column in columns])

    def export_all(self, reach, directory, timestamps):
        """Write the raw data table into ``directory`` as a CSV file.

        The file is named after the reach, and after the timestamp when
        ``timestamps`` holds exactly one value.
        """
        name = reach.name.replace(" ", "-")
        if len(timestamps) == 1:
            name = f"{name}_t{timestamps[0]}"

        file_name = os.path.join(
            directory,
            f"reach_{name}.csv"
        )

        with open(file_name, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=',',
                                quotechar='|', quoting=csv.QUOTE_MINIMAL)
            model = self._table["raw_data"]
            writer.writerow(model._headers)
            for row in range(model.rowCount()):
                writer.writerow([
                    model.data(model.index(row, column))
                    for column in range(model.columnCount())
                ])

    def export_current(self):
        """Ask for a directory and export the raw data table into it."""
        self.file_dialog(
            select_file="Directory",
            callback=lambda d: self.export_current_to(d[0])
        )

    def export_current_to(self, directory):
        """Export the current reach at the current timestamp."""
        # _current_results is a list of rows: use the first one, as the
        # other methods of this class do.
        results = self._results[self._current_results[0]]
        reach = results.river.reachs[self._get_current_reach()]

        self.export_all(reach, directory, [self._get_current_timestamp()])

    def delete_tab(self, index):
        """Close tab ``index`` and forget the custom plot it holds."""
        tab_widget = self.find(QTabWidget, "tabWidget")
        # Default value: tolerate a tab without registered custom plot.
        self._additional_plot.pop(tab_widget.tabText(index), None)
        tab_widget.removeTab(index)

    def _export_rk(self, timestamp, y, envelop, solver_id):
        """Return the columns to export along the current reach.

        The result maps a translated column title to a list holding one
        value per profile, for each name of ``y`` that is supported.
        Insertion order is the column order of the exported file.
        """
        results = self._results[solver_id]
        reach = results.river.reachs[self._get_current_reach()]
        dict_x = self._trad.get_dict("values_x")
        # Copy: the envelope titles are merged below and must not be
        # written into the dictionary owned by the translator.
        dict_y = dict(self._trad.get_dict("values_y"))

        # NOTE(review): _current_results is assigned lists elsewhere in
        # this class, so this comparison looks never true -- confirm
        # whether the compare branch is still reachable.
        compare = self._current_results == 2
        if compare:
            envelop = False
            reach1 = results[0].river.reachs[self._get_current_reach()]
            reach2 = results[1].river.reachs[self._get_current_reach()]

        if envelop:
            dict_y.update(self._trad.get_dict("values_y_envelop"))

        def at_timestamp(key):
            # Value of `key` at `timestamp` for each profile.
            return [p.get_ts_key(timestamp, key) for p in reach.profiles]

        def over_time(fun, key):
            # `fun` (min or max) of the whole series of each profile.
            return [fun(p.get_key(key)) for p in reach.profiles]

        def per_profile(fun, target):
            # fun(profile, water elevation at `timestamp`) per profile.
            return [
                fun(p, p.get_ts_key(timestamp, "Z"))
                for p in target.profiles
            ]

        def compared(fun):
            # Plain values, or reach1 - reach2 in compare mode.
            if not compare:
                return per_profile(fun, reach)
            return _difference(
                per_profile(fun, reach1),
                per_profile(fun, reach2)
            )

        my_dict = {}
        my_dict[dict_x["rk"]] = reach.geometry.get_rk()

        if "bed_elevation" in y:
            if not compare:
                zmin = reach.geometry.get_z_min()
            else:
                zmin = _difference(
                    reach1.geometry.get_z_min(),
                    reach2.geometry.get_z_min()
                )
            my_dict[dict_y["bed_elevation"]] = zmin

        if "discharge" in y:
            my_dict[dict_y["discharge"]] = at_timestamp("Q")
            if envelop:
                my_dict[dict_y["min_discharge"]] = over_time(min, "Q")
                my_dict[dict_y["max_discharge"]] = over_time(max, "Q")

        if "water_elevation" in y:
            my_dict[dict_y["water_elevation"]] = at_timestamp("Z")
            if envelop:
                my_dict[dict_y["min_water_elevation"]] = over_time(min, "Z")
                my_dict[dict_y["max_water_elevation"]] = over_time(max, "Z")

        if "velocity" in y:
            my_dict[dict_y["velocity"]] = at_timestamp("V")
            if envelop:
                my_dict[dict_y["min_velocity"]] = over_time(min, "V")
                my_dict[dict_y["max_velocity"]] = over_time(max, "V")

        if "depth" in y:
            my_dict[dict_y["depth"]] = compared(
                lambda p, z: p.geometry.max_water_depth(z)
            )
            if envelop:
                # Extreme water elevation minus bed elevation.
                my_dict[dict_y["min_depth"]] = _difference(
                    over_time(min, "Z"), reach.geometry.get_z_min()
                )
                my_dict[dict_y["max_depth"]] = _difference(
                    over_time(max, "Z"), reach.geometry.get_z_min()
                )

        if "mean_depth" in y:
            my_dict[dict_y["mean_depth"]] = compared(
                lambda p, z: p.geometry.mean_water_depth(z)
            )

        if "froude" in y:
            my_dict[dict_y["froude"]] = compared(
                lambda p, z: p.get_ts_key(timestamp, "V") /
                sqrt(9.81 * (
                    p.geometry.wet_area(z) /
                    p.geometry.wet_width(z)
                ))
            )

        if "wet_area" in y:
            my_dict[dict_y["wet_area"]] = compared(
                lambda p, z: p.geometry.wet_area(z)
            )

        return my_dict

    def _export_time(self, profile, y, solver_id):
        """Return the columns to export over time for one profile.

        ``profile`` is the index of the profile in the current reach.
        The result maps a translated column title to a list holding one
        value per timestamp, for each name of ``y`` that is supported.
        """
        results = self._results[solver_id]
        reach = results.river.reachs[self._get_current_reach()]
        profile = reach.profile(profile)
        dict_x = self._trad.get_dict("values_x")
        dict_y = self._trad.get_dict("values_y")

        my_dict = {}
        my_dict[dict_x["time"]] = self._timestamps
        z = profile.get_key("Z")
        q = profile.get_key("Q")
        v = profile.get_key("V")

        # NOTE(review): same remark as in _export_rk, this comparison
        # looks never true. The branch also reads self._reach and
        # self._profile, which are not assigned in this class -- confirm.
        compare = self._current_results == 2
        if compare:
            reach1 = self._results[0].river.reach(self._reach)
            reach2 = self._results[1].river.reach(self._reach)
            profile1 = reach1.profile(self._profile)
            profile2 = reach2.profile(self._profile)

            z1 = profile1.get_key("Z")
            v1 = profile1.get_key("V")
            z2 = profile2.get_key("Z")
            v2 = profile2.get_key("V")

        def compared(fun):
            # fun(profile, elevations, velocities) -> list of values;
            # plain values, or profile1 - profile2 in compare mode.
            if not compare:
                return fun(profile, z, v)
            return _difference(
                fun(profile1, z1, v1),
                fun(profile2, z2, v2)
            )

        if "bed_elevation" in y:
            if not compare:
                z_min = [profile.geometry.z_min()] * len(self._timestamps)
            else:
                delta = profile1.geometry.z_min() - profile2.geometry.z_min()
                z_min = [delta for _ in self._timestamps]
            my_dict[dict_y["bed_elevation"]] = z_min

        if "discharge" in y:
            my_dict[dict_y["discharge"]] = q

        if "water_elevation" in y:
            my_dict[dict_y["water_elevation"]] = z

        if "velocity" in y:
            my_dict[dict_y["velocity"]] = v

        if "depth" in y:
            my_dict[dict_y["depth"]] = compared(
                lambda p, zs, vs: [
                    p.geometry.max_water_depth(e) for e in zs
                ]
            )

        if "mean_depth" in y:
            my_dict[dict_y["mean_depth"]] = compared(
                lambda p, zs, vs: [
                    p.geometry.mean_water_depth(e) for e in zs
                ]
            )

        if "froude" in y:
            my_dict[dict_y["froude"]] = compared(
                lambda p, zs, vs: [
                    s / sqrt(9.81 * (
                        p.geometry.wet_area(e) /
                        p.geometry.wet_width(e))
                    )
                    for e, s in zip(zs, vs)
                ]
            )

        if "wet_area" in y:
            my_dict[dict_y["wet_area"]] = compared(
                lambda p, zs, vs: [p.geometry.wet_area(e) for e in zs]
            )

        return my_dict

    def get_timestamps(self):
        """Store the sorted timestamps of the first result set."""
        self._timestamps = sorted(self._results[0].get("timestamps"))

    def tab_changed(self, i):
        """Slot: keep a single selected profile outside of tab 2."""
        if i != 2 and len(self._get_current_profiles_list()) > 1:
            # unselect all profiles but the first one
            profile_id = self._get_current_profile()
            self.update_table_selection_profile(profile_id)

    def import_geotiff(self):
        """Ask for a raster file and draw it on the first canvas.

        When the bounds read from the file are not usable, the user is
        asked for the extent to use. The axis limits are restored
        afterwards.
        """
        options = QFileDialog.Options()
        options |= QFileDialog.DontUseNativeDialog

        file_types = [
            self._trad["file_geotiff"],
            self._trad["file_all"],
        ]

        filename, _ = QFileDialog.getOpenFileName(
            self,
            self._trad["open_file"],
            "",
            ";; ".join(file_types),
            options=options
        )

        if filename == "":
            return

        with rasterio.open(filename) as data:
            img = data.read()
            # b[0] left, b[1] bottom, b[2] right, b[3] top
            b = data.bounds[:]

        xlim = self.canvas.axes.get_xlim()
        ylim = self.canvas.axes.get_ylim()

        if b[2] > b[0] and b[1] < b[3]:
            self.canvas.axes.imshow(img.transpose((1, 2, 0)),
                                    extent=[b[0], b[2], b[1], b[3]])
        else:
            dlg = CoordinatesDialog(
                xlim,
                ylim,
                trad=self._trad,
                parent=self
            )
            if dlg.exec():
                self.canvas.axes.imshow(img.transpose((1, 2, 0)),
                                        extent=dlg.values)

        self.plot_xy.idle()
        self.canvas.axes.set_xlim(xlim)
        self.canvas.axes.set_ylim(ylim)
        return

    def import_data(self):
        """Ask for a CSV file and import it as additional data."""
        file_types = [
            self._trad["file_csv"],
            self._trad["file_all"],
        ]

        self.file_dialog(
            select_file="Existing_file",
            callback=lambda f: self.read_csv_file(f[0]),
            default_suffix=".csv",
            file_filter=file_types,
        )

    def read_csv_file(self, filename):
        """Read ``filename`` and add its content to the current results.

        Nothing is added when the filename is empty or when the user
        cancels one of the dialogs describing the data.
        """
        if filename == "":
            return

        x, y = self.read_csv_file_data(filename)
        data = self.read_csv_file_format(x, y)
        if data is None:
            # Dialog cancelled: do not register an empty data set.
            return

        results = self._results[self._current_results[0]]
        data_lst = results.get("additional_data")
        data_lst.append(
            AdditionalData(
                study=self._study,
                data=data
            )
        )

        self.update_plot_additional_data()

    def read_csv_file_data(self, filename):
        """Return the two first columns of ``filename`` as float lists.

        Lines starting with ``*``, ``#`` or ``$``, lines with fewer than
        two comma separated fields and lines whose two first fields are
        not numbers are skipped.
        """
        sep = ","
        x = []
        y = []

        with open(filename, 'r', newline='') as f:
            for line in f:
                if line.startswith(("*", "#", "$")):
                    continue

                row = line.split(sep)
                if len(row) < 2:
                    continue

                try:
                    fx, fy = float(row[0]), float(row[1])
                except ValueError:
                    continue

                x.append(fx)
                y.append(fy)

        return x, y

    def read_csv_file_format(self, x, y):
        """Ask the type and legend of imported data and describe them.

        Returns the description dictionary, or None when the user
        cancels one of the two dialogs.
        """
        data_type_lst = ['Q(t)', 'Z(t)', 'Z(x)']
        data_type, ok = QInputDialog.getItem(
            self, 'Data type', 'Chose the type of data:', data_type_lst
        )
        if not ok:
            return None

        legend, ok = QInputDialog.getText(self, 'Legend', 'Legend:')
        if not ok:
            return None

        if legend.strip() == '':
            legend = '*'

        tmp_dict = {'Z': 'water_elevation',
                    'Q': 'discharge',
                    'x': 'rk',
                    't': 'time'}

        tmp_unit = {'Z': ' (m)',
                    'Q': ' (m³/s)'}

        # data_type is like 'Q(t)': first character is the ordinate,
        # third character is the abscissa.
        return {
            'type_x': tmp_dict[data_type[2]],
            'type_y': tmp_dict[data_type[0]],
            'legend': legend,
            'unit': tmp_unit[data_type[0]],
            'x': x,
            'y': y
        }

    def update_plot_additional_data(self):
        """Draw the additional data of the current results on the plots."""
        results = self._results[self._current_results[0]]
        for data in results.get("additional_data"):
            data = data._data
            x, y = data['x'], data['y']
            legend = data['legend']
            unit = data['unit']

            # NOTE(review): read_csv_file_format stores 'rk' or 'time'
            # in type_x and the quantity in type_y, so the two tests
            # below look swapped and never true -- confirm the intended
            # mapping before changing them.
            if (
                    data['type_x'] == 'water_elevation' and
                    data['type_y'] == 'time'
            ):
                line = self.canvas_2.axes.plot(
                    x, y, marker="+",
                    label=legend + ' ' + unit
                )
                self.plot_rkc.canvas.draw_idle()
                self.plot_rkc.update_idle()

            if data['type_x'] == 'discharge' and data['type_y'] == 'time':
                line = self.canvas_4.axes.plot(
                    x, y, marker="+",
                    label=legend + ' ' + unit
                )
                self.plot_h._line.append(line)
                self.plot_h.enable_legend()
                self.plot_h.canvas.draw_idle()
                self.plot_h.update_idle()

            for plot in self._additional_plot.values():
                plot.add_imported_plot(data)