Pamhyr2/src/View/Results/Window.py

1228 lines
41 KiB
Python

# Window.py -- Pamhyr
# Copyright (C) 2023-2025 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import os
import csv
import logging
try:
import rasterio
import rasterio.control
import rasterio.crs
import rasterio.sample
import rasterio.vrt
import rasterio._features
_rasterio_loaded = True
except Exception as e:
print(f"Module 'rasterio' is not available: {e}")
_rasterio_loaded = False
from numpy import sqrt
from datetime import datetime
from tools import trace, timer, logger_exception
from View.Tools.PamhyrWindow import PamhyrWindow
from PyQt5.QtGui import (
QKeySequence, QIcon, QPixmap,
)
from PyQt5.QtCore import (
Qt, QVariant, QAbstractTableModel,
QCoreApplication, QModelIndex, pyqtSlot,
QItemSelectionModel, QTimer,
QSettings
)
from PyQt5.QtWidgets import (
QDialogButtonBox, QPushButton, QLineEdit,
QFileDialog, QTableView, QAbstractItemView,
QUndoStack, QShortcut, QAction, QItemDelegate,
QComboBox, QVBoxLayout, QHeaderView, QTabWidget,
QSlider, QLabel, QWidget, QGridLayout, QTabBar
)
from View.Tools.Plot.PamhyrCanvas import MplCanvas
from View.Tools.Plot.PamhyrToolbar import PamhyrPlotToolbar
from View.Results.PlotXY import PlotXY
from View.Results.PlotAC import PlotAC
from View.Results.PlotRKC import PlotRKC
from View.Results.PlotH import PlotH
from View.Results.PlotSedReach import PlotSedReach
from View.Results.PlotSedProfile import PlotSedProfile
from View.Results.CustomPlot.Plot import CustomPlot
from View.Results.CustomPlot.CustomPlotValuesSelectionDialog import (
CustomPlotValuesSelectionDialog,
)
from View.Results.CustomExport.CustomExport import CustomExportDialog
from View.Results.Table import TableModel
from View.Results.translate import (
ResultsTranslate,
CompareResultsTranslate
)
from View.Results.CoordinatesDialog import CoordinatesDialog
# Module-level shortcut to Qt's translation lookup.
_translate = QCoreApplication.translate
# getLogger() is called without a name, so this is the root logger.
logger = logging.getLogger()
class ResultsWindow(PamhyrWindow):
_pamhyr_ui = "Results"
_pamhyr_name = "Results"
def _path_file(self, filename):
    """Return the absolute path of a file in the UI resources directory.

    The path is resolved relative to this module's directory: one level
    up, then ``ui/ressources/<filename>``.
    """
    module_dir = os.path.dirname(__file__)
    relative = os.path.join(module_dir, "..", "ui", "ressources", filename)
    return os.path.abspath(relative)
def __init__(self, study=None, config=None,
             results=None, parent=None, trad=None):
    """Build the results window for a list of solver results.

    The window title is made of the translated window name, the study
    name and the solver name of every result. Each result's solver,
    repertory and name are appended to the window hash data. If any of
    the setup steps raises, the exception is logged and construction
    stops there (no initial selection is made).

    Args:
        study: study whose results are displayed.
        config: forwarded to the parent window.
        results: list of result objects (iterated and indexed here).
        parent: parent window.
        trad: translation object; a ResultsTranslate is created if None.
    """
    self._results = results
    self._solvers = [res._solver for res in results]
    self._current_results = [0]

    if trad is None:
        trad = ResultsTranslate()

    solver_names = " - ".join([res.solver_name for res in self._results])
    name = " - ".join([trad[self._pamhyr_name], study.name, solver_names])

    super(ResultsWindow, self).__init__(
        title=name,
        study=study,
        config=config,
        trad=trad,
        parent=parent
    )

    # Register what identifies this window: one triple per result.
    for res in results:
        for item in (res._solver, res._repertory, res._name):
            self._hash_data.append(item)

    self._additional_plot = {}

    try:
        self.get_timestamps()
        self.setup_slider()
        self.setup_table()
        self.setup_plots()
        self.setup_statusbar()
        self.setup_connections()
    except Exception as err:
        logger_exception(err)
        return

    # Initial selection: first reach, first profile, first solver.
    self.update(reach_id=0, profile_id=[0])
    self.update_table_selection_solver(0)
def setup_table(self):
    """Create one TableModel per table view and store them by kind.

    Models are stored in ``self._table`` under the keys "reach",
    "profile", "raw_data" and "solver"; each one receives the timestamp
    currently selected by the time slider. The solver table view is
    hidden when there is at most one result.
    """
    self._table = {}

    for kind in ("reach", "profile", "raw_data", "solver"):
        view = self.find(QTableView, f"tableView_{kind}")
        headers = self._trad.get_dict(f"table_headers_{kind}")
        model = TableModel(
            table_view=view,
            table_headers=headers,
            data=self._results,
            undo=self._undo_stack,
            opt_data=kind,
            parent=self
        )
        self._table[kind] = model
        model._timestamp = self._timestamps[self._slider_time.value()]

    if len(self._results) > 1:
        return

    # Nothing to choose from with a single result.
    self.find(QTableView, "tableView_solver").hide()
def setup_slider(self):
    """Configure the time slider, the playback buttons and the timer.

    The slider range covers every index of ``self._timestamps`` and the
    slider starts on the last one. The play button starts with the
    "start" icon; the timer is only created here, not started.
    """
    last_index = len(self._timestamps) - 1

    self._slider_time = self.find(QSlider, "horizontalSlider_time")
    self._slider_time.setMaximum(last_index)
    self._slider_time.setValue(last_index)

    self._icon_start = QIcon()
    self._icon_start.addPixmap(
        QPixmap(self._path_file("media-playback-start.png"))
    )

    self._icon_pause = QIcon()
    self._icon_pause.addPixmap(
        QPixmap(self._path_file("media-playback-pause.png"))
    )

    self._button_play = self.find(QPushButton, "playButton")
    self._button_play.setIcon(self._icon_start)
    self._button_back = self.find(QPushButton, "backButton")
    self._button_next = self.find(QPushButton, "nextButton")
    self._button_first = self.find(QPushButton, "firstButton")
    self._button_last = self.find(QPushButton, "lastButton")

    self._timer = QTimer(self)
def setup_plots(self):
    """Create the four built-in plots, each with its canvas and toolbar.

    For every panel a canvas and a toolbar are created, added to the
    panel layout (toolbar first), then the plot is built and drawn.
    The resulting objects are stored as ``canvas``/``toolbar``/
    ``plot_layout`` (suffixed ``_2``, ``_3``, ``_4`` for the later
    panels) and ``plot_xy``, ``plot_rkc``, ``plot_ac``, ``plot_h``.
    Tabs are made closable, except the first three whose close button
    is removed.
    """
    tab_widget = self.find(QTabWidget, "tabWidget")
    tab_widget.setTabsClosable(True)
    tab_widget.tabCloseRequested.connect(self.delete_tab)
    for fixed_tab in range(3):
        tab_widget.tabBar().setTabButton(fixed_tab, QTabBar.RightSide, None)

    # (attribute suffix, layout name, plot attribute, plot class,
    #  initial profile id, extra plot keyword arguments)
    panels = (
        ("", "verticalLayout", "plot_xy", PlotXY, 0,
         {"display_current": True}),
        ("_2", "verticalLayout_2", "plot_rkc", PlotRKC, 0, {}),
        ("_3", "verticalLayout_3", "plot_ac", PlotAC, 0, {}),
        ("_4", "verticalLayout_hydrograph", "plot_h", PlotH, [0], {}),
    )

    for suffix, layout_name, plot_attr, plot_cls, profile_id, extra in panels:
        canvas = MplCanvas(width=5, height=4, dpi=100)
        setattr(self, f"canvas{suffix}", canvas)
        canvas.setObjectName(f"canvas{suffix}")

        toolbar = PamhyrPlotToolbar(
            canvas, self, items=[
                "home", "move", "zoom", "save",
                "iso", "back/forward"
            ]
        )
        setattr(self, f"toolbar{suffix}", toolbar)

        layout = self.find(QVBoxLayout, layout_name)
        setattr(self, f"plot_layout{suffix}", layout)
        layout.addWidget(toolbar)
        layout.addWidget(canvas)

        plot = plot_cls(
            canvas=canvas,
            results=self._results,
            reach_id=0,
            profile_id=profile_id,
            res_id=self._current_results,
            trad=self._trad,
            toolbar=toolbar,
            parent=self,
            **extra
        )
        setattr(self, plot_attr, plot)
        plot.draw()
def closeEvent(self, event):
    """Stop the playback timer, then let the parent class handle the close.

    A failure while stopping the timer is logged and does not prevent
    the window from closing.
    """
    try:
        self._timer.stop()
    except Exception as err:
        logger_exception(err)

    super(ResultsWindow, self).closeEvent(event)
def _compute_status_label(self):
    """Build the status bar text for the current selection.

    Returns:
        A string made of the selected reach name, the selected
        cross-section (its name, or its rk when the name is empty) and
        the slider's current timestamp, shown both as an elapsed
        duration and in seconds. The first reach / profile is used
        when nothing is selected in the corresponding table.
    """
    from datetime import timedelta

    # Timestamp: format it as an elapsed duration. A timedelta is used
    # directly so the result does not depend on the local time zone.
    ts = self._timestamps[self._slider_time.value()]
    fts = str(timedelta(seconds=float(ts)))
    # str.replace returns a new string: the result must be kept.
    # Only one of "days"/"day" can appear, so translate just that one.
    if "days" in fts:
        fts = fts.replace("days", _translate("Results", "days"))
    else:
        fts = fts.replace("day", _translate("Results", "day"))

    # Reach
    selected = self.find(QTableView, "tableView_reach").selectedIndexes()
    reach_row = selected[0].row() if len(selected) != 0 else 0
    reach = self._study.river.enable_edges()[reach_row]

    # Profile
    selected = self.find(QTableView, "tableView_profile").selectedIndexes()
    profile_row = selected[0].row() if len(selected) != 0 else 0
    profile = reach.reach.profile(profile_row)

    pname = profile.name if profile.name != "" else profile.rk

    return (f"{self._trad['reach']}: {reach.name} | " +
            f"{self._trad['cross_section']}: {pname} | " +
            f"{self._trad['unit_time_s']} : {fts} ({ts} sec)")
def setup_statusbar(self):
    """Create the status label and add it permanently to the status bar."""
    self._status_label = QLabel(self._compute_status_label())
    self.statusbar.addPermanentWidget(self._status_label)
def update_statusbar(self):
    """Recompute the status text and show it in the status label."""
    text = self._compute_status_label()
    label = self._status_label
    label.setText(text)
def setup_connections(self):
    """Connect actions, table selections, playback controls and tabs.

    The GeoTIFF action is connected only when rasterio was imported,
    otherwise it is disabled. The reload action is disabled when more
    than one result is displayed (it stays connected).
    """
    # Actions
    handlers = {
        "action_reload": self._reload,
        "action_add": self._add_custom_plot,
        "action_export": self._export,
    }

    if not _rasterio_loaded:
        self.find(QAction, "action_Geo_tiff").setEnabled(False)
    else:
        handlers["action_Geo_tiff"] = self.import_geotiff

    if len(self._results) > 1:
        self.find(QAction, "action_reload").setEnabled(False)

    for action_name, handler in handlers.items():
        self.find(QAction, action_name).triggered.connect(handler)

    # Tables and plots: react to selection and data changes
    on_change = {
        "reach": self._set_current_reach,
        "profile": self._set_current_profile,
        "raw_data": self._set_current_profile_raw_data,
        "solver": self._set_current_results,
    }

    for kind, callback in on_change.items():
        view = self.find(QTableView, f"tableView_{kind}")
        view.selectionModel().selectionChanged.connect(callback)
        self._table[kind].dataChanged.connect(callback)

    # Time slider and playback controls
    self._slider_time.valueChanged.connect(self._set_current_timestamp)
    self._button_play.setChecked(False)

    clicks = (
        (self._button_play, self._pause),
        (self._button_back, self._back),
        (self._button_next, self._next),
        (self._button_first, self._first),
        (self._button_last, self._last),
    )
    for button, slot in clicks:
        button.clicked.connect(slot)

    self._timer.timeout.connect(self._update_slider)

    # Tabs
    self.find(QTabWidget, "tabWidget").currentChanged.connect(
        self.tab_changed
    )
def update_table_selection_reach(self, ind):
    """Select row ``ind`` of the reach table and refresh dependent tables.

    The row is selected (replacing any previous selection) and scrolled
    into view, then the "profile" and "raw_data" models are updated
    with the same index.
    """
    view = self.find(QTableView, "tableView_reach")
    selection = view.selectionModel()
    target = view.model().index(ind, 0)

    flags = (
        QItemSelectionModel.Rows |
        QItemSelectionModel.ClearAndSelect |
        QItemSelectionModel.Select
    )
    selection.select(target, flags)
    view.scrollTo(target)

    for dependent in ("profile", "raw_data"):
        self._table[dependent].update(ind)
def update_table_selection_profile(self, ind):
    """Select row ``ind`` in both the profile and the raw data tables.

    In each table the row replaces any previous selection and is
    scrolled into view.
    """
    for kind in ("profile", "raw_data"):
        view = self.find(QTableView, f"tableView_{kind}")
        selection = view.selectionModel()
        target = view.model().index(ind, 0)

        flags = (
            QItemSelectionModel.Rows |
            QItemSelectionModel.ClearAndSelect |
            QItemSelectionModel.Select
        )
        selection.select(target, flags)
        view.scrollTo(target)
def update_table_selection_solver(self, ind):
    """Select row ``ind`` of the solver table and scroll it into view."""
    view = self.find(QTableView, "tableView_solver")
    selection = view.selectionModel()
    target = view.model().index(ind, 0)

    flags = (
        QItemSelectionModel.Rows |
        QItemSelectionModel.ClearAndSelect |
        QItemSelectionModel.Select
    )
    selection.select(target, flags)
    view.scrollTo(target)
def update(self,
           reach_id=None,
           profile_id=None,
           solver_id=None,
           timestamp=None):
    """Propagate a selection change to every plot, table and the status bar.

    Only the arguments that are not None are applied, in this order:
    reach, profile, solver, timestamp. The status bar is refreshed on
    every call.

    Args:
        reach_id: index of the reach to display.
        profile_id: list of profile indexes; the hydrograph plot gets
            the whole list, every other plot only the first element.
        solver_id: list of result indexes; stored as the current results.
        timestamp: timestamp to display.
    """
    def all_plots():
        # Built-in plots first, then the user-added ones.
        yield self.plot_xy
        yield self.plot_ac
        yield self.plot_rkc
        yield self.plot_h
        yield from self._additional_plot.values()

    if reach_id is not None:
        for plot in all_plots():
            plot.set_reach(reach_id)

        self.update_table_selection_reach(reach_id)
        self.update_table_selection_profile(0)

    if profile_id is not None:
        first_profile = profile_id[0]
        for plot in all_plots():
            if plot is self.plot_h:
                plot.set_profile(profile_id)
            else:
                plot.set_profile(first_profile)

        # Leave the table selection alone while tab 2 is shown.
        if self.find(QTabWidget, "tabWidget").currentIndex() != 2:
            self.update_table_selection_profile(profile_id[0])

    if solver_id is not None:
        self._current_results = solver_id
        for plot in all_plots():
            plot.set_result(solver_id)

    if timestamp is not None:
        for plot in all_plots():
            plot.set_timestamp(timestamp)

        self._table["raw_data"].timestamp = timestamp

    self.update_statusbar()
def _get_current_reach(self):
    """Return the row of the first selected reach, or 0 if none is selected."""
    selected = self.find(QTableView, "tableView_reach").selectedIndexes()
    return selected[0].row() if len(selected) != 0 else 0
def _get_current_profile(self):
    """Return the row of the first selected profile, or 0 if none is selected."""
    selected = self.find(QTableView, "tableView_profile").selectedIndexes()
    return selected[0].row() if len(selected) != 0 else 0
def _get_current_profiles_list(self):
    """Return the row of every selected index of the profile table.

    The list is empty when nothing is selected.
    """
    selected = self.find(QTableView, "tableView_profile").selectedIndexes()
    return [index.row() for index in selected]
def _get_current_timestamp(self):
    """Return the timestamp at the time slider's current position."""
    position = self._slider_time.value()
    return self._timestamps[position]
def _set_current_reach(self):
    """Display the first reach selected in the reach table, if any."""
    selected = self.find(QTableView, "tableView_reach").selectedIndexes()
    if len(selected) != 0:
        self.update(reach_id=selected[0].row())
def _set_current_profile(self):
    """Display every profile whose row is selected in the profile table.

    Nothing happens when no row is selected.
    """
    view = self.find(QTableView, "tableView_profile")
    rows = view.selectionModel().selectedRows()
    if len(rows) != 0:
        self.update(profile_id=[index.row() for index in rows])
def _set_current_profile_raw_data(self):
    """Do nothing: changes in the raw data table trigger no update."""
    return None
def _set_current_results(self):
    """Display the results selected in the solver table, if any."""
    selected = self.find(QTableView, "tableView_solver").selectedIndexes()
    if len(selected) != 0:
        self.update(solver_id=[index.row() for index in selected])
def _set_current_timestamp(self):
    """Display the timestamp at the time slider's current position."""
    position = self._slider_time.value()
    self.update(timestamp=self._timestamps[position])
def _reload_plots(self):
    """Give the current results to the four built-in plots, then redraw them.

    All plots receive the results before the first one is drawn.
    """
    plots = (self.plot_xy, self.plot_ac, self.plot_rkc, self.plot_h)

    for plot in plots:
        plot.results = self._results

    for plot in plots:
        plot.draw()
def _reload_slider(self):
    """Resize the time slider to the current timestamps and go to the last one."""
    last_index = len(self._timestamps) - 1
    self._slider_time = self.find(QSlider, "horizontalSlider_time")
    self._slider_time.setMaximum(last_index)
    self._slider_time.setValue(last_index)
def _reload(self):
    """Reload the results and refresh timestamps, plots and slider.

    The result object itself is only reloaded when exactly one result
    is displayed; the refresh steps run in every case.
    """
    logger.debug("Reload results...")

    if len(self._results) == 1:
        self._results[0] = self._results[0].reload()

    refresh_steps = (
        self.get_timestamps,
        self._reload_plots,
        self._reload_slider,
    )
    for step in refresh_steps:
        step()
def _add_custom_plot(self):
    """Ask the user which values to plot and open the matching custom tab.

    The profile selection is first reduced to the current profile.
    Nothing is created when the dialog is not accepted.
    """
    self.update_table_selection_profile(self._get_current_profile())

    dialog = CustomPlotValuesSelectionDialog(parent=self)
    if not dialog.exec():
        return

    x, y, envelop = dialog.value
    self.create_new_tab_custom_plot(x, y, envelop)
def create_new_tab_custom_plot(self, x: str, y: list, envelop: bool):
    """Open a tab with a custom plot of ``y`` against ``x``.

    The tab title is ``"<x>: <y joined by commas>"``, suffixed with
    ``_envelop`` when ``envelop`` is set and ``x`` is "rk". If a tab
    with that title already exists it is simply given the focus;
    otherwise a new tab holding a toolbar and a canvas is created, its
    plot is drawn and registered in ``self._additional_plot`` under the
    tab title.

    Args:
        x: name of the abscissa value.
        y: list of names of the ordinate values.
        envelop: whether envelope values are requested.
    """
    name = f"{x}: {','.join(y)}"
    if envelop and x == "rk":
        name += "_envelop"
    wname = f"tab_custom_{x}_{y}"

    tab_widget = self.find(QTabWidget, "tabWidget")

    # This plot already exists: focus its tab. The tab is located by
    # its title because the widget name does not carry the envelope
    # suffix and is therefore shared by the two variants of a plot.
    for tab_index in range(tab_widget.count()):
        if tab_widget.tabText(tab_index) == name:
            tab_widget.setCurrentIndex(tab_index)
            return

    widget = QWidget()
    grid = QGridLayout()
    widget.setObjectName(wname)

    canvas = MplCanvas(width=5, height=4, dpi=100)
    canvas.setObjectName(f"canvas_{x}_{y}")
    toolbar = PamhyrPlotToolbar(
        canvas, self
    )

    plot = CustomPlot(
        x, y, envelop,
        self._get_current_reach(),
        self._get_current_profile(),
        self._get_current_timestamp(),
        data=self._results,
        canvas=canvas,
        res_id=self._current_results,
        toolbar=toolbar,
        parent=self,
    )
    plot.draw()

    # Register the plot so that selection changes reach it too
    self._additional_plot[name] = plot

    grid.addWidget(toolbar, 0, 0)
    grid.addWidget(canvas, 1, 0)
    widget.setLayout(grid)
    tab_widget.addTab(widget, name)
    tab_widget.setCurrentWidget(widget)
def _copy(self):
    """Copy the selected rows of the raw data table into the clipboard.

    Only done when the first tab is the current one; otherwise a TODO
    message is logged. A cell whose value lookup raises AttributeError
    is copied as an empty string.
    """
    if self.find(QTabWidget, "tabWidget").currentIndex() != 0:
        logger.info("TODO: copy")
        return

    # Focus on raw data table
    view = self.find(QTableView, "tableView_raw_data")
    model = self._table["raw_data"]

    rows = []
    for selected in sorted(view.selectionModel().selectedRows()):
        row = selected.row()
        cells = []
        for column in range(model.columnCount()):
            cell_index = model.createIndex(row, column)
            try:
                cells.append(model.data(cell_index))
            except AttributeError:
                cells.append("")
        rows.append(cells)

    self.copyTableIntoClipboard(rows)
def _paste(self):
    """Paste is not implemented: only log a TODO message."""
    logger.info("TODO: paste")
def _undo(self):
    """Undo the last command of the undo stack shared by the tables.

    ``self._table`` is a dict of table models and has no ``undo``
    method, so the call goes to the undo stack that every model was
    created with.
    """
    self._undo_stack.undo()
def _redo(self):
    """Redo the last undone command of the undo stack shared by the tables.

    ``self._table`` is a dict of table models and has no ``redo``
    method, so the call goes to the undo stack that every model was
    created with.
    """
    self._undo_stack.redo()
# play / pause buttons
def _update_slider(self):
    """Advance the time slider by one step, wrapping to the start at the end."""
    slider = self._slider_time
    if slider.value() != slider.maximum():
        slider.setValue(slider.value() + 1)
    else:
        slider.setValue(slider.minimum())
def _next(self):
    """Move the time slider one step forward."""
    slider = self._slider_time
    slider.setValue(slider.value() + 1)
def _back(self):
    """Move the time slider one step backward."""
    slider = self._slider_time
    slider.setValue(slider.value() - 1)
def _first(self):
    """Move the time slider to its minimum."""
    slider = self._slider_time
    slider.setValue(slider.minimum())
def _last(self):
    """Move the time slider to its maximum."""
    slider = self._slider_time
    slider.setValue(slider.maximum())
def _pause(self):
    """Start or stop the animation according to the play button state.

    When the play button is checked, the navigation buttons are
    disabled, the timer is started with a 100 ms interval and the
    pause icon is shown. Otherwise the timer is stopped, the navigation
    buttons are enabled again and the start icon is shown.
    """
    navigation = (
        self._button_next,
        self._button_back,
        self._button_first,
        self._button_last,
    )

    if self._button_play.isChecked():
        for button in navigation:
            button.setEnabled(False)
        self._timer.start(100)
        self._button_play.setIcon(self._icon_pause)
        return

    self._timer.stop()
    for button in navigation:
        button.setEnabled(True)
    self._button_play.setIcon(self._icon_start)
def _export(self):
    """Ask which values to export, then ask for a CSV file to write them to.

    The profile selection is first reduced to the current profile.
    Nothing is exported when the value selection dialog is not accepted.
    """
    self.update_table_selection_profile(self._get_current_profile())

    dialog = CustomExportDialog(parent=self)
    if not dialog.exec():
        return
    x, y, envelop, solver_id = dialog.value

    logger.debug(
        "Export custom plot for: " +
        f"{x} -> {','.join(y)}"
    )

    def write_selected(files):
        # The file dialog hands over a sequence; its first item is used.
        self.export_to(files[0], x, y, envelop, solver_id)

    self.file_dialog(
        select_file="AnyFile",
        callback=write_selected,
        default_suffix=".csv",
        file_filter=["CSV (*.csv)"],
    )
def export_to(self, filename, x, y, envelop, solver_id):
    """Write the selected values of the current reach to a CSV file.

    The first row describes the export (study, reach, then the time for
    an "rk" export or the profile for a "time" export), the second row
    holds the column labels, and one row follows per entry of the
    abscissa column.

    Args:
        filename: path of the file to write.
        x: abscissa kind, "rk" or "time".
        y: names of the values to export.
        envelop: whether envelope values are requested ("rk" only).
        solver_id: index of the result to export.
    """
    results = self._results[solver_id]
    reach = results.river.reachs[self._get_current_reach()]

    first_line = [
        f"Study: {results.study.name}",
        f"Reach: {reach.name}",
    ]

    if x == "rk":
        timestamp = self._get_current_timestamp()
        first_line.append(f"Time: {timestamp}s")
        val_dict = self._export_rk(timestamp, y, envelop, solver_id)
    elif x == "time":
        profile_id = self._get_current_profile()
        profile = reach.profile(profile_id)
        pname = profile.name if profile.name != "" else profile.rk
        first_line.append(f"Profile: {pname}")
        val_dict = self._export_time(profile_id, y, solver_id)

    with open(filename, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',',
                            quotechar='|', quoting=csv.QUOTE_MINIMAL)
        dict_x = self._trad.get_dict("values_x")

        writer.writerow(first_line)
        writer.writerow(list(val_dict.keys()))

        # The abscissa column fixes the number of rows.
        columns = list(val_dict.values())
        for row in range(len(val_dict[dict_x[x]])):
            writer.writerow([column[row] for column in columns])
def export_all(self, reach, directory, timestamps):
    """Dump the raw data table into ``<directory>/reach_<name>.csv``.

    ``<name>`` is the reach name with spaces replaced by dashes,
    followed by ``_t<timestamp>`` when exactly one timestamp is given.
    The file holds the raw data table headers, then one row per table
    row.

    Args:
        reach: reach whose name is used for the file name.
        directory: directory in which the file is created.
        timestamps: list of timestamps; only used for the file name, so
            an empty list is accepted.
    """
    name = reach.name.replace(" ", "-")
    if len(timestamps) == 1:
        name = f"{name}_t{timestamps[0]}"

    file_name = os.path.join(directory, f"reach_{name}.csv")
    table = self._table["raw_data"]

    with open(file_name, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',',
                            quotechar='|', quoting=csv.QUOTE_MINIMAL)
        writer.writerow(table._headers)

        for row in range(table.rowCount()):
            writer.writerow([
                table.data(table.index(row, column))
                for column in range(table.columnCount())
            ])
def export_current(self):
    """Ask for a directory and export the current raw data table into it."""
    def export_into(selection):
        # The file dialog hands over a sequence; its first item is used.
        self.export_current_to(selection[0])

    self.file_dialog(
        select_file="Directory",
        callback=export_into
    )
def export_current_to(self, directory):
    """Export the current reach at the current timestamp into ``directory``.

    ``self._current_results`` holds a list of result indexes; the first
    one designates the result whose reach is exported. A plain index
    is accepted as well.
    """
    current = self._current_results
    if isinstance(current, (list, tuple)):
        # A list cannot be used as a list index: take the first result.
        current = current[0]

    results = self._results[current]
    reach = results.river.reachs[self._get_current_reach()]

    self.export_all(reach, directory, [self._get_current_timestamp()])
def delete_tab(self, index):
    """Forget the custom plot shown in tab ``index`` and remove that tab.

    The plot is looked up in ``self._additional_plot`` by the tab title.
    """
    tabs = self.find(QTabWidget, "tabWidget")
    title = tabs.tabText(index)
    self._additional_plot.pop(title)
    tabs.removeTab(index)
def _export_rk(self, timestamp, y, envelop, solver_id):
    """Collect the values to export along the current reach.

    Args:
        timestamp: timestamp at which the values are read.
        y: names of the requested values, among "bed_elevation",
            "discharge", "water_elevation", "velocity", "depth",
            "mean_depth", "froude" and "wet_area".
        envelop: when true, minimum and maximum over the whole series
            are added for discharge, water elevation, velocity and
            depth.
        solver_id: index of the result the values are read from.

    Returns:
        A dict mapping a translated label to a list with one value per
        profile. The rk column comes first, then the requested values
        in the fixed order listed above.

    In compare mode (``self._current_results == 2``) the envelope is
    disabled and bed elevation, depth, mean depth, Froude number and
    wet area are exported as the difference between the first and the
    second result.
    """
    results = self._results[solver_id]
    reach_id = self._get_current_reach()
    reach = results.river.reachs[reach_id]

    dict_x = self._trad.get_dict("values_x")
    # Work on a copy: the envelope labels are merged in below and must
    # not end up in the dictionary handed out by the translator.
    dict_y = dict(self._trad.get_dict("values_y"))

    # NOTE(review): this class only ever stores lists in
    # _current_results, for which this test is false -- confirm which
    # condition is meant to enable the compare mode.
    compare = self._current_results == 2
    if compare:
        envelop = False
        reach1 = self._results[0].river.reachs[reach_id]
        reach2 = self._results[1].river.reachs[reach_id]

    if envelop:
        dict_y.update(self._trad.get_dict("values_y_envelop"))

    def at_timestamp(key):
        # Value of `key` at `timestamp` for every profile of the reach.
        return [p.get_ts_key(timestamp, key) for p in reach.profiles]

    def over_series(reduce, key):
        # `reduce` (min or max) of the whole `key` series, per profile.
        return [reduce(p.get_key(key)) for p in reach.profiles]

    def from_water_level(method_name):
        # Geometry method applied to the water level at `timestamp`.
        def compute(target_reach):
            return [
                getattr(p.geometry, method_name)(
                    p.get_ts_key(timestamp, "Z"))
                for p in target_reach.profiles
            ]
        return compute

    def froude(target_reach):
        # V / sqrt(g * wet area / wet width), per profile.
        values = []
        for p in target_reach.profiles:
            velocity = p.get_ts_key(timestamp, "V")
            level = p.get_ts_key(timestamp, "Z")
            values.append(
                velocity /
                sqrt(9.81 * (
                    p.geometry.wet_area(level) /
                    p.geometry.wet_width(level)
                ))
            )
        return values

    def value_or_difference(compute):
        # Plain values, or first minus second result in compare mode.
        if not compare:
            return compute(reach)
        return [
            first - second
            for first, second in zip(compute(reach1), compute(reach2))
        ]

    my_dict = {}
    my_dict[dict_x["rk"]] = reach.geometry.get_rk()

    if "bed_elevation" in y:
        my_dict[dict_y["bed_elevation"]] = value_or_difference(
            lambda r: r.geometry.get_z_min()
        )

    if "discharge" in y:
        my_dict[dict_y["discharge"]] = at_timestamp("Q")
        if envelop:
            my_dict[dict_y["min_discharge"]] = over_series(min, "Q")
            my_dict[dict_y["max_discharge"]] = over_series(max, "Q")

    if "water_elevation" in y:
        my_dict[dict_y["water_elevation"]] = at_timestamp("Z")
        if envelop:
            my_dict[dict_y["min_water_elevation"]] = over_series(min, "Z")
            my_dict[dict_y["max_water_elevation"]] = over_series(max, "Z")

    if "velocity" in y:
        my_dict[dict_y["velocity"]] = at_timestamp("V")
        if envelop:
            velocities = [p.get_key("V") for p in reach.profiles]
            my_dict[dict_y["min_velocity"]] = [min(v) for v in velocities]
            my_dict[dict_y["max_velocity"]] = [max(v) for v in velocities]

    if "depth" in y:
        my_dict[dict_y["depth"]] = value_or_difference(
            from_water_level("max_water_depth")
        )
        if envelop:
            # Extreme water level minus the bed elevation.
            my_dict[dict_y["min_depth"]] = [
                level - bed for level, bed in zip(
                    over_series(min, "Z"), reach.geometry.get_z_min())
            ]
            my_dict[dict_y["max_depth"]] = [
                level - bed for level, bed in zip(
                    over_series(max, "Z"), reach.geometry.get_z_min())
            ]

    if "mean_depth" in y:
        my_dict[dict_y["mean_depth"]] = value_or_difference(
            from_water_level("mean_water_depth")
        )

    if "froude" in y:
        my_dict[dict_y["froude"]] = value_or_difference(froude)

    if "wet_area" in y:
        my_dict[dict_y["wet_area"]] = value_or_difference(
            from_water_level("wet_area")
        )

    return my_dict
def _export_time(self, profile, y, solver_id):
    """Collect the time series to export for one profile of the current reach.

    Args:
        profile: index of the profile in the current reach.
        y: names of the requested values, among "bed_elevation",
            "discharge", "water_elevation", "velocity", "depth",
            "mean_depth", "froude" and "wet_area".
        solver_id: index of the result the values are read from.

    Returns:
        A dict mapping a translated label to a list of values. The
        time column (``self._timestamps``) comes first, then the
        requested values in the fixed order listed above.

    In compare mode (``self._current_results == 2``) bed elevation,
    depth, mean depth, Froude number and wet area are exported as the
    difference between the first and the second result, taken on the
    same reach and profile.
    """
    # Keep the index: `profile` is rebound to the profile object below
    # and the compare mode needs the same profile in the other results.
    profile_id = profile
    reach_id = self._get_current_reach()

    results = self._results[solver_id]
    reach = results.river.reachs[reach_id]
    profile = reach.profile(profile_id)

    dict_x = self._trad.get_dict("values_x")
    dict_y = self._trad.get_dict("values_y")

    my_dict = {}
    my_dict[dict_x["time"]] = self._timestamps

    z = profile.get_key("Z")
    q = profile.get_key("Q")
    v = profile.get_key("V")

    # NOTE(review): this class only ever stores lists in
    # _current_results, for which this test is false -- confirm which
    # condition is meant to enable the compare mode.
    compare = self._current_results == 2
    if compare:
        profile1 = self._results[0].river.reachs[reach_id].profile(profile_id)
        profile2 = self._results[1].river.reachs[reach_id].profile(profile_id)
        z1 = profile1.get_key("Z")
        v1 = profile1.get_key("V")
        z2 = profile2.get_key("Z")
        v2 = profile2.get_key("V")

    def series(compute):
        # Plain series, or first minus second result in compare mode.
        if not compare:
            return compute(profile, z, v)
        first = compute(profile1, z1, v1)
        second = compute(profile2, z2, v2)
        return [a - b for a, b in zip(first, second)]

    def from_water_level(method_name):
        # Geometry method applied to every water level of the series.
        def compute(target, levels, velocities):
            method = getattr(target.geometry, method_name)
            return [method(level) for level in levels]
        return compute

    def bed_elevation(target, levels, velocities):
        # Constant over time: repeated once per timestamp.
        return [target.geometry.z_min()] * len(self._timestamps)

    def froude(target, levels, velocities):
        # V / sqrt(g * wet area / wet width), per timestamp.
        return [
            velocity / sqrt(9.81 * (
                target.geometry.wet_area(level) /
                target.geometry.wet_width(level)
            ))
            for level, velocity in zip(levels, velocities)
        ]

    if "bed_elevation" in y:
        my_dict[dict_y["bed_elevation"]] = series(bed_elevation)
    if "discharge" in y:
        my_dict[dict_y["discharge"]] = q
    if "water_elevation" in y:
        my_dict[dict_y["water_elevation"]] = z
    if "velocity" in y:
        my_dict[dict_y["velocity"]] = v
    if "depth" in y:
        my_dict[dict_y["depth"]] = series(
            from_water_level("max_water_depth"))
    if "mean_depth" in y:
        my_dict[dict_y["mean_depth"]] = series(
            from_water_level("mean_water_depth"))
    if "froude" in y:
        my_dict[dict_y["froude"]] = series(froude)
    if "wet_area" in y:
        my_dict[dict_y["wet_area"]] = series(from_water_level("wet_area"))

    return my_dict
def get_timestamps(self):
    """Store the sorted timestamps of the first result in ``self._timestamps``."""
    first_result = self._results[0]
    self._timestamps = sorted(first_result.get("timestamps"))
def tab_changed(self, i):
    """Reduce a multiple profile selection to one profile outside tab 2.

    Args:
        i: index of the tab that became current.
    """
    if i == 2:
        return
    if len(self._get_current_profiles_list()) <= 1:
        return

    # Unselect all profiles but the first one
    self.update_table_selection_profile(self._get_current_profile())
def import_geotiff(self):
    """Ask for a GeoTIFF file and draw it on the first canvas.

    The image is placed using the bounds read from the file when they
    are consistent (right > left and top > bottom); otherwise the user
    is asked for the coordinates, and nothing is drawn if that dialog
    is not accepted. The axes limits in effect before the import are
    restored afterwards. Nothing happens when no file is chosen.
    """
    options = QFileDialog.Options()
    settings = QSettings(QSettings.IniFormat,
                         QSettings.UserScope, 'MyOrg', )
    options |= QFileDialog.DontUseNativeDialog

    file_types = [
        self._trad["file_geotiff"],
        self._trad["file_all"],
    ]

    filename, _ = QFileDialog.getOpenFileName(
        self,
        self._trad["open_file"],
        "",
        ";; ".join(file_types),
        options=options
    )

    if filename == "":
        return

    axes = self.canvas.axes
    with rasterio.open(filename) as data:
        img = data.read()
        left, bottom, right, top = data.bounds[:]

        # Remember the current view so the image does not change it.
        xlim = axes.get_xlim()
        ylim = axes.get_ylim()

        if right > left and bottom < top:
            axes.imshow(img.transpose((1, 2, 0)),
                        extent=[left, right, bottom, top])
        else:
            dlg = CoordinatesDialog(
                xlim,
                ylim,
                trad=self._trad,
                parent=self
            )
            if dlg.exec():
                axes.imshow(img.transpose((1, 2, 0)),
                            extent=dlg.values)

        self.plot_xy.idle()
        axes.set_xlim(xlim)
        axes.set_ylim(ylim)