Pamhyr2/src/View/MainWindow.py

1825 lines
54 KiB
Python

# MainWindow.py -- Pamhyr
# Copyright (C) 2023-2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import os
import sys
import time
import logging
import subprocess
from queue import Queue
from functools import reduce
from time import process_time_ns
from numpy.core.multiarray import where
from platformdirs import user_cache_dir
from Solver.AdisTS import AdisTS
from Solver.Mage import Mage8
from tools import logger_exception
from PyQt5 import QtGui
from PyQt5.QtGui import (
QKeySequence, QDesktopServices, QIcon,
)
from PyQt5.QtCore import (
Qt, QTranslator, QEvent, QUrl, QTimer,
QCoreApplication, QMutex,
)
from PyQt5.QtWidgets import (
QMainWindow, QApplication, QAction,
QFileDialog, QShortcut, QMenu, QToolBar,
QMessageBox, QProgressDialog, QTabWidget,
QDialog, QVBoxLayout, QLabel,
)
from PyQt5.uic import loadUi
from Modules import Modules
from View.Tools.ASubWindow import WindowToolKit
from View.Tools.ListedSubWindow import ListedSubWindow
from View.DummyWindow import DummyWindow
from View.Translate import MainTranslate
from View.MainWindowTabInfo import WidgetInfo
from View.MainWindowTabChecker import WidgetChecker
from View.Configure.Window import ConfigureWindow
from View.Study.Window import NewStudyWindow
from View.About.Window import AboutWindow
from View.Network.Window import NetworkWindow
from View.Geometry.Window import GeometryWindow
from View.BoundaryCondition.Window import BoundaryConditionWindow
from View.Reservoir.Window import ReservoirWindow
from View.HydraulicStructures.Window import HydraulicStructuresWindow
from View.LateralContribution.Window import LateralContributionWindow
from View.InitialConditions.Window import InitialConditionsWindow
from View.Stricklers.Window import StricklersWindow
from View.Frictions.Window import FrictionsWindow
from View.SedimentLayers.Window import SedimentLayersWindow
from View.SedimentLayers.Reach.Window import ReachSedimentLayersWindow
from View.AdditionalFiles.Window import AddFileListWindow
from View.REPLines.Window import REPLineListWindow
from View.SolverParameters.Window import SolverParametersWindow
from View.RunSolver.Window import (
SelectSolverWindow,
SolverLogWindow,
CompareSolversWindow
)
from View.Results.Window import ResultsWindow
from View.RunSolver.WindowAdisTS import (
SelectSolverWindowAdisTS,
SolverLogWindowAdisTS
)
from View.CheckList.Window import CheckListWindow
from View.CheckList.WindowAdisTS import CheckListWindowAdisTS
from View.Results.WindowAdisTS import ResultsWindowAdisTS
from View.Results.ReadingResultsDialog import ReadingResultsDialog
from View.Debug.Window import ReplWindow
from View.OutputRKAdisTS.Window import OutputRKAdisTSWindow
from View.Pollutants.Window import PollutantsWindow
from View.D90AdisTS.Window import D90AdisTSWindow
from View.DIFAdisTS.Window import DIFAdisTSWindow
from Solver.Solvers import GenericSolver
from Model.Results.Results import Results
# The internal documentation viewer is optional: it can be left out to
# make the application package lighter.
try:
from View.Doc.Window import DocWindow
_doc = "internal"
except Exception as e:
print("Handle exception: {e}")
_doc = "external"
from Model.Study import Study
logger = logging.getLogger()

# Actions enabled only while no study is open
no_model_action = [
    "action_menu_new",
    "action_menu_open",
    "action_toolBar_open",
]

# Actions enabled only while a study is open
model_action = [
    "action_menu_close",
    "action_menu_edit",
    "action_menu_save",
    "action_menu_save_as",
    "action_toolBar_close",
    "action_toolBar_save",
    "action_menu_numerical_parameter",
    "action_open_results_from_file",
    "action_open_results_adists",
]

# Extra actions enabled while a study is open (not part of `action`)
other_model_action = [
    "action_toolBar_run_solver",
]

# Model edition actions, enabled while a study is open
define_model_action = [
    # Toolbar
    "action_toolBar_network",
    "action_toolBar_geometry",
    "action_toolBar_boundary_cond",
    "action_toolBar_lateral_contrib",
    "action_toolBar_frictions",
    "action_toolBar_initial_cond",
    # Menu
    "action_menu_run_solver",
    "action_menu_numerical_parameter",
    "action_menu_edit_network",
    "action_menu_edit_geometry",
    "action_menu_boundary_conditions",
    "action_menu_initial_conditions",
    "action_menu_edit_friction",
    "action_menu_edit_lateral_contribution",
    "action_menu_run_solver",
    "action_menu_sediment_layers",
    "action_menu_edit_reach_sediment_layers",
    "action_menu_edit_reservoirs",
    "action_menu_edit_hydraulic_structures",
    "action_menu_additional_file",
    "action_menu_results_last",
    "action_open_results_from_file",
    "action_compare_results",
    "action_menu_boundary_conditions_sediment",
    "action_menu_rep_additional_lines",
    "action_menu_output_rk",
    "action_menu_run_adists",
    "action_menu_pollutants",
    "action_menu_d90",
    "action_menu_dif",
]

action = no_model_action + model_action + define_model_action
class ApplicationWindow(QMainWindow, ListedSubWindow, WindowToolKit):
def __init__(self, conf=None):
    """Build the main window

    Loads the UI file, sets up tabs, shortcut, connections, style,
    results and timers, then proposes to reopen the last study when
    the configuration reports that the application was not closed
    correctly.

    Args:
        conf: The application configuration
    """
    super(ApplicationWindow, self).__init__()

    self._close_question = False
    self.conf = conf                # App Configuration
    self._study = None              # Model
    self._trad = MainTranslate()    # Translate

    # UI
    ui_file = os.path.join(
        os.path.dirname(__file__), "ui", "MainWindow.ui"
    )
    self.ui = loadUi(ui_file, self)

    self.setup_debug_mode(init=True)
    self.setup_tab()
    self.setup_sc()
    self.setup_connection()
    self.default_style()
    self.setup_results()
    self.setup_timer()

    if not self.conf.close_correctly and self.conf.last_study != "":
        self.dialog_reopen_study()

    if _doc == "external":
        logger.info("doc: Internal documentation is not available...")
def set_title(self):
    """Set the window title from the debug flag and the study name"""
    prefix = "(dbg) " if self.conf.debug else ""

    if self._study is None:
        name = "Pamhyr2"
    else:
        name = f"Pamhyr2 - {self._study.name}"

    self.setWindowTitle(prefix + name)
def setup_tab(self):
    """Set up the information tab, then the checker tab"""
    self.setup_tab_info()
    self.setup_tab_checker()
def setup_tab_info(self):
    """Create the study information widget and add it as a tab"""
    tabs = self.findChild(QTabWidget, "tabWidget")

    info = WidgetInfo(study=self._study, parent=self)
    self._tab_widget_info = info

    tabs.addTab(info, self._trad["tab_info_name"])
def setup_tab_checker(self):
    """Create the checker widget and add it as a tab"""
    tabs = self.findChild(QTabWidget, "tabWidget")

    checker = WidgetChecker(
        study=self._study, config=self.conf, parent=self
    )
    self._tab_widget_checker = checker

    tabs.addTab(checker, self._trad["tab_checker_name"])
def enable_actions(self, action: str, enable: bool):
    """Enable or disable an action component

    Args:
        action: Object name of the action to enable/disable
        enable: True to enable, or False to disable

    Returns:
        Nothing
    """
    qaction = self.findChild(QAction, action)

    if qaction is None:
        # No QAction with this name was found: only log the request
        logger.warning(f"Set {action} to {enable}")
        return

    qaction.setEnabled(enable)
def setup_sc(self):
    """Create the F6 keyboard shortcut (connected in setup_connection)"""
    self._run_sc = QShortcut(QKeySequence("F6"), self)
def setup_connection(self):
    """Connect each action to its callback function

    Actions that cannot be found in the UI are reported with a
    warning. The run shortcut is connected to the last solver run.

    Returns:
        Nothing
    """
    callbacks = {
        # Menu action
        "action_menu_dif": self.open_dif,
        "action_menu_d90": self.open_d90,
        "action_menu_pollutants": self.open_pollutants,
        "action_menu_run_adists": self.select_run_solver_adists,
        "action_menu_output_rk": self.open_output_rk_adists,
        "action_menu_config": self.open_configure,
        "action_menu_new": self.open_new_study,
        "action_menu_edit": self.open_edit_study,
        "action_menu_open": self.open_model,
        "action_menu_save": self.save_study,
        "action_menu_save_as": self.save_as_study,
        "action_menu_numerical_parameter": self.open_solver_parameters,
        "action_menu_edit_network": self.open_network,
        "action_menu_edit_geometry": self.open_geometry,
        "action_menu_boundary_conditions": self.open_boundary_cond,
        "action_menu_boundary_conditions_sediment":
            self.open_boundary_cond_sed,
        "action_menu_edit_reservoirs": self.open_reservoir,
        "action_menu_edit_hydraulic_structures":
            self.open_hydraulic_structures,
        "action_menu_initial_conditions": self.open_initial_conditions,
        "action_menu_edit_friction": self.open_frictions,
        "action_menu_edit_lateral_contribution": self.open_lateral_contrib,
        "action_menu_run_solver": self.select_and_run_solver,
        "action_menu_sediment_layers": self.open_sediment_layers,
        "action_menu_edit_reach_sediment_layers":
            self.open_reach_sediment_layers,
        "action_menu_additional_file": self.open_additional_files,
        "action_menu_rep_additional_lines": self.open_rep_lines,
        "action_menu_close": self.close_model,
        "action_menu_results_last": self.open_last_results,
        "action_open_results_from_file": self.open_results_from_file,
        "action_compare_results": self.compare_results,
        "action_open_results_adists": self.open_results_adists,
        # Help
        "action_menu_pamhyr_users_wiki": self.open_doc_user,
        "action_menu_pamhyr_developers_pdf":
            lambda: self.open_doc_dev(ext="pdf"),
        "action_menu_pamhyr_developers_html":
            lambda: self.open_doc_dev(ext="html"),
        "action_menu_mage": self.open_doc_mage,
        "action_menu_about": self.open_about,
        # ToolBar action
        "action_toolBar_open": self.open_model,
        "action_toolBar_save": self.save_study,
        "action_toolBar_close": self.close_model,
        "action_toolBar_run_solver": self.run_lasest_solver,
        # Current actions
        "action_toolBar_network": self.open_network,
        "action_toolBar_geometry": self.open_geometry,
        "action_toolBar_boundary_cond": self.open_boundary_cond,
        "action_toolBar_lateral_contrib": self.open_lateral_contrib,
        "action_toolBar_frictions": self.open_frictions,
        "action_toolBar_initial_cond": self.open_initial_conditions,
    }

    for name, callback in callbacks.items():
        qaction = self.findChild(QAction, name)

        if qaction is None:
            logger.warning(f"Setup connection : '{name}' to {callback}")
            continue

        qaction.triggered.connect(callback)

    self._run_sc.activated.connect(self.run_lasest_solver)
def changeEvent(self, event):
    """Retranslate the UI on a language change, then forward the event"""
    language_changed = event.type() == QEvent.LanguageChange
    if language_changed:
        self.retranslateUi()

    super(ApplicationWindow, self).changeEvent(event)
def close(self):
    """Close the window, asking for confirmation on unsaved study

    When the study is not saved, the close dialog is shown and the
    window stays open if the dialog returns a false value.
    """
    unsaved = self._study is not None and not self._study.is_saved

    if unsaved:
        # Flag read by closeEvent so the question is not asked twice
        self._close_question = True
        if not self.dialog_close():
            self._close_question = False
            return

    # PAMHYR is closed correctly (no crash)
    self.conf.set_close_correctly()
    super(ApplicationWindow, self).close()
def closeEvent(self, event):
    """Handle the Qt close event

    Nothing is done when the question was already asked by close().
    Otherwise an unsaved study triggers the close dialog; the event
    is ignored if the dialog returns a false value.

    Args:
        event: The close event
    """
    # NOTE(review): nesting reconstructed from flattened source; the
    # final branch is assumed to belong to the "unsaved study" test.
    if self._close_question:
        return

    unsaved = self._study is not None and not self._study.is_saved
    if not unsaved:
        super(ApplicationWindow, self).closeEvent(event)
        return

    if self.dialog_close():
        # PAMHYR is closed correctly (no crash)
        self.conf.set_close_correctly()
        super(ApplicationWindow, self).closeEvent(event)
    else:
        event.ignore()
def default_style(self):
    """Set default window style

    Updates the enabled state of the actions and sets the window
    icon.

    Returns:
        Nothing
    """
    self.update_enable_action()
    # self.showMaximized()

    here = os.path.dirname(__file__)
    logo = os.path.join(here, "ui", "ressources", "Pamhyr2_logo.png")
    self.setWindowIcon(QIcon(os.path.abspath(logo)))
def set_debug_lvl(self, debug=True):
    """Set the logger level

    Args:
        debug: True for the DEBUG level, False for the INFO level
    """
    if debug:
        level, message = logging.DEBUG, "Set logging level to DEBUG"
    else:
        level, message = logging.INFO, "Set logging level to INFO"

    logger.setLevel(level)
    logger.info(message)
def setup_debug_mode(self, init=False):
    """Add or remove the debug entries of the help menu

    Args:
        init: True on first call, to create the debug actions. In
            that case the actions are never removed from the menu.
    """
    menu = self.findChild(QMenu, "menu_help")
    menu.setToolTipsVisible(True)

    self.set_title()

    if init:
        self.debug_action = QAction("Debug", self)
        self.debug_action.setToolTip(self._trad["open_debug"])
        self.debug_action.triggered.connect(self.open_debug)

        self.debug_sqlite_action = QAction("Debug SQLite", self)
        self.debug_sqlite_action.setToolTip(self._trad["open_debug_sql"])
        self.debug_sqlite_action.triggered.connect(self.open_sqlite)

    if self.conf.debug:
        menu.addAction(self.debug_action)
        menu.addAction(self.debug_sqlite_action)
        self.set_debug_lvl(debug=True)
    elif not init:
        menu.removeAction(self.debug_action)
        menu.removeAction(self.debug_sqlite_action)
        self.set_debug_lvl(debug=False)
def setup_timer(self):
    """Set up the propagation timer, then the backup timer"""
    self.setup_timer_propagation()
    self.setup_timer_backup()
def setup_timer_propagation(self):
    """Start the timer calling _do_propagate_update every 2000 ms"""
    self._init_propagation_keys()

    timer = QTimer(self)
    self._propagation_timer = timer

    timer.start(2000)
    timer.timeout.connect(self._do_propagate_update)
def get_config_backup_freq_to_sec(self):
    """Convert the configured backup frequency to seconds

    The configuration value is split on ":" and its fields are
    weighted, in order, as hours, minutes and seconds.

    Returns:
        The backup period in seconds
    """
    fields = self.conf.backup_frequence.split(":")
    seconds_per_field = (
        60 * 60,  # Hour to sec
        60,       # Minute to sec
        1,        # Sec
    )

    return sum(
        int(field) * int(weight)
        for field, weight in zip(fields, seconds_per_field)
    )
def setup_timer_backup(self):
    """Create the save mutex and start the periodic backup timer"""
    self._save_mutex = QMutex()
    self._backup_timer = QTimer(self)

    period_ms = self.get_config_backup_freq_to_sec() * 1000

    self._backup_timer.start(period_ms)
    self._backup_timer.timeout.connect(self._backup)
def _do_update_config(self):
    """Restart the backup timer with the configured frequency"""
    seconds = self.get_config_backup_freq_to_sec()
    logger.debug(f"Reset backup timer to {seconds} second")

    self._backup_timer.start(seconds * 1000)
def _init_propagation_keys(self):
    """Reset the pending propagation keys to the empty value"""
    self._propagation_keys = Modules(0)
def _propagate_update(self, key=Modules(0)):
    """Add modules to the pending propagation keys

    Args:
        key: The modules to add
    """
    self._propagation_keys = self._propagation_keys | key
    logger.debug(f"Propagation keys: {self._propagation_keys}")
def _do_propagate_update(self):
    """Propagate pending module updates to tabs and sub windows

    Called on propagation timer timeout. The main window is always
    updated; the rest is skipped when no key is pending.
    """
    self.update()

    pending = self._propagation_keys
    if pending is Modules.NONE:
        return

    # Reset now: the pending keys are handled from the local copy
    self._init_propagation_keys()

    if Modules.WINDOW_LIST in pending:
        self._do_update_window_list()

    if Modules.CONFIG in pending:
        self._do_update_config()

    self._do_propagate_update_info_tab(pending)

    logger.debug(f"Propagation of {pending}")
    for _, sub_window in self.sub_win_list:
        sub_window._propagated_update(key=pending)
        self._do_propagate_update_rec(sub_window, pending)

    self._tab_widget_checker.update(modules=pending)
def _do_propagate_update_info_tab(self, keys):
    """Update the information tab if a relevant module is in keys

    Args:
        keys: The modules being propagated
    """
    watched = Modules.modelling_list()
    watched.append(Modules.STUDY)

    if any(module in keys for module in watched):
        self._tab_widget_info.update()
def _do_propagate_update_rec(self, window, keys):
for _, win in window.sub_win_list:
win._propagated_update(key=keys)
self._do_propagate_update_rec(win, keys)
def update(self):
    """Refresh the title and resynchronise the tabs with the study"""
    self.set_title()

    info = self._tab_widget_info
    if info.study != self._study:
        info.study = self._study
        info.update()

    checker = self._tab_widget_checker
    if checker.study != self._study:
        checker.study = self._study
        checker.update(modules=Modules.STUDY)
#########
# MODEL #
#########
def get_model(self):
    """Return the current study (None if no study is open)"""
    return self._study
def set_model(self, model):
    """Set the current study

    Also updates the actions state, records the study file name as
    last study in the configuration and refreshes the window.

    Args:
        model: The new current study
    """
    self._study = model

    self.update_enable_action()
    self.conf.set_last_study(self._study.filename)
    self.update()
def close_model(self):
    """Close the current study after the close dialog confirmation"""
    confirmed = self.dialog_close()
    if not confirmed:
        return

    self._study = None

    self.update_enable_action()
    self.conf.set_close_correctly()
    self.update()
    self._close_sub_window()
def update_enable_action(self):
    """Update status of action components

    Enable or disable the action components depending on whether a
    study is currently open.

    Returns:
        Nothing
    """
    has_model = self._study is not None

    for name in no_model_action:
        self.enable_actions(name, not has_model)

    with_model = define_model_action + other_model_action + model_action
    for name in with_model:
        self.enable_actions(name, has_model)
def setup_results(self):
    """Select the last used solver from the configuration

    The solver whose name is the configured last solver name is
    selected; otherwise the solver named "default-mage" is used if
    it was met in the list, else None.
    """
    self._last_solver = None
    self._last_results = None

    fallback = None

    for solver in self.conf.solvers:
        if solver.name == "default-mage":
            fallback = solver

        if solver.name != self.conf.last_solver_name:
            continue

        self._last_solver = solver
        if self._study is not None:
            self.enable_actions("action_menu_results_last", True)
        return

    # Last solver not found, use default-mage if it exists
    self._last_solver = fallback
def set_results(self, solver, results):
    """Record the last solver and results, enable the results action

    Args:
        solver: The solver that produced the results
        results: The results
    """
    self._last_solver, self._last_results = solver, results
    self.enable_actions("action_menu_results_last", True)
############
# FEATURES #
############
def open_study(self, filename):
    """Open a study

    Args:
        filename: The study path

    Returns:
        Nothing
    """
    study = Study.open(filename)
    self.set_model(study)

    logger.info(f"Open Study - {self._study.name}")
    self.set_title()
def save_study(self):
    """Save current study

    Save current study; if the study has no associated file, open a
    file dialog first. Nothing is done if that dialog is cancelled
    or if the study is already saved.

    Returns:
        Nothing
    """
    if self._study.filename is None or self._study.filename == "":
        options = QFileDialog.Options()
        options |= QFileDialog.DontUseNativeDialog
        file_name, _ = QFileDialog.getSaveFileName(
            self, "Save File",
            "", "Pamhyr(*.pamhyr)",
            options=options,
        )

        # Dialog cancelled: without this guard the study would be
        # saved in a file named ".pamhyr"
        if file_name == "":
            return

        if file_name.rsplit(".", 1)[-1] == "pamhyr":
            self._study.filename = file_name
        else:
            self._study.filename = file_name + ".pamhyr"

    if self._study.is_saved:
        return

    # Keep the backup timer quiet while the save is running
    self._backup_timer.blockSignals(True)
    self._save_mutex.lock()

    try:
        sql_request_count = self._study.sql_save_request_count()
        progress = QProgressDialog(
            "Saving...", None,
            0, sql_request_count,
            parent=self
        )
        progress.setWindowModality(Qt.WindowModal)
        progress.setValue(0)

        status = f"Save ({self._study.filename})..."
        self.statusbar.showMessage(status, 3000)
        logger.info(status)

        self._study.save(
            progress=lambda: progress.setValue(progress.value() + 1)
        )

        status += " Done"
        logger.info(status)
        self.statusbar.showMessage(status, 3000)

        self.conf.set_last_study(self._study.filename)
    finally:
        # Always release the mutex and the timer, even if the save
        # failed, so later saves and backups are not blocked
        self._save_mutex.unlock()
        self._backup_timer.blockSignals(False)
def save_as_study(self):
    """Save current study as new file

    Open a file dialog to select the new file, then save the study
    in it. Nothing is done if the dialog is cancelled.

    Returns:
        Nothing
    """
    options = QFileDialog.Options()
    options |= QFileDialog.DontUseNativeDialog
    file_name, _ = QFileDialog.getSaveFileName(
        self, "Save File",
        "", "Pamhyr(*.pamhyr)",
        options=options,
    )

    logger.debug(f"Save study as : {repr(file_name)}")
    if file_name == "":
        return

    # Keep the backup timer quiet while the save is running
    self._backup_timer.blockSignals(True)
    self._save_mutex.lock()

    try:
        if file_name.rsplit(".", 1)[-1] == "pamhyr":
            logger.debug(
                "Pamhyr extention is present : " +
                f"{repr(file_name)}"
            )
            self._study.filename = file_name
        else:
            logger.debug(
                "Pamhyr extention is not present : " +
                f"{repr(file_name + '.pamhyr')}"
            )
            self._study.filename = file_name + ".pamhyr"

        sql_request_count = self._study.sql_save_request_count()
        progress = QProgressDialog(
            "Saving...", None,
            0, sql_request_count,
            parent=self
        )
        progress.setWindowModality(Qt.WindowModal)
        progress.setValue(0)

        status = f"Save ({self._study.filename})..."
        self.statusbar.showMessage(status, 3000)
        logger.info(status)

        self._study.save(
            progress=lambda: progress.setValue(progress.value() + 1)
        )

        status += " Done"
        logger.info(status)
        self.statusbar.showMessage(status, 3000)

        self.conf.set_last_study(self._study.filename)
    finally:
        # Always release the mutex and the timer, even if the save
        # failed, so later saves and backups are not blocked
        self._save_mutex.unlock()
        self._backup_timer.blockSignals(False)
def _backup(self):
    """Save the unsaved current study in a backup file

    Connected to the backup timer timeout. The study file name is
    temporarily replaced by the backup file name during the save,
    then restored. Exceptions raised by the save are logged.
    """
    logger.debug("Backup signal...")

    if (not self.conf.backup_enable
            or self._study is None
            or self._study.is_saved):
        return

    self._save_mutex.lock()

    old = self._study.filename
    if old == "" or old is None:
        # Study never saved: backup in the user cache directory
        file_name = os.path.join(
            user_cache_dir("pamhyr"),
            "unsaved.pamhyr.backup"
        )
        # Set backup at last study open
        self.conf.set_last_study(file_name)
    else:
        file_name = self._study.filename + ".backup"

    self._study.filename = file_name

    try:
        status = f"Backup ({file_name})..."
        logger.info(status)
        self.statusbar.showMessage(status, 3000)

        self._study.save()

        status += " Done"
        logger.info(status)
        self.statusbar.showMessage(status, 3000)
    except Exception as e:
        logger_exception(e)

    self._study.filename = old
    self._save_mutex.unlock()
##################
# MSG AND DIALOG #
##################
def msg_select_reach(self):
    """Show the "select a reach" message, then open the network"""
    trad = self._trad

    self.message_box(
        window_title=trad["Warning"],
        text=trad["mb_select_reach_title"],
        informative_text=trad["mb_select_reach_msg"]
    )
    self.open_network()
def dialog_reopen_study(self):
    """Ask whether the last study must be reopened

    Returns:
        True if a study opening was requested, False if the answer
        is No
    """
    dlg = QMessageBox(self)

    dlg.setWindowTitle(self._trad["mb_last_open_title"])
    dlg.setText(self._trad["mb_last_open_msg"])
    dlg.setStandardButtons(
        QMessageBox.No | QMessageBox.Yes  # | QMessageBox.Open
    )
    dlg.setIcon(QMessageBox.Question)

    dlg.button(QMessageBox.Yes).setText(self._trad["Yes"])
    dlg.button(QMessageBox.No).setText(self._trad["No"])

    answer = dlg.exec()

    if answer == QMessageBox.Yes:
        last = self.conf.last_study
        to_open = self.dialog_reopen_study_select_backup(
            self.conf.last_study
        )
        self.open_study(to_open)
        # The opened file may be the backup: restore the study name
        self._study.filename = last
        self.conf.set_last_study(last)
        return True

    if answer == QMessageBox.Open:
        self.open_model()
        return True

    if answer == QMessageBox.No:
        return False
def dialog_reopen_study_select_backup(self, filename):
    """Select the file to reopen between a study and its backup

    Args:
        filename: The study path

    Returns:
        The backup path (filename + ".backup") if the backup exists
        and is more recent than the study file, or if the study
        file does not exist; filename otherwise
    """
    file = filename
    backup = filename + ".backup"

    if not os.path.exists(backup):
        return file

    # Compare the raw modification timestamps: time.ctime() strings
    # start with the day name, so they do not sort by date.
    if (not os.path.exists(file)
            or os.path.getmtime(backup) > os.path.getmtime(file)):
        logger.info(f"Select backup ({backup})")
        file = backup

    return file
def dialog_close(self):
    """Ask what to do with the study before closing

    Returns:
        True for Save (after calling save_study) and for Close,
        False for Cancel
    """
    dlg = QMessageBox(self)

    dlg.setWindowTitle(self._trad["mb_close_title"])
    dlg.setText(self._trad["mb_close_msg"])
    dlg.setStandardButtons(
        QMessageBox.Save | QMessageBox.Close | QMessageBox.Cancel
    )
    dlg.setIcon(QMessageBox.Warning)

    dlg.button(QMessageBox.Save).setText(self._trad["Save"])
    dlg.button(QMessageBox.Close).setText(self._trad["Close"])
    dlg.button(QMessageBox.Cancel).setText(self._trad["Cancel"])

    answer = dlg.exec()

    if answer == QMessageBox.Save:
        self.save_study()
        return True

    if answer == QMessageBox.Close:
        return True

    if answer == QMessageBox.Cancel:
        return False
#########################
# SUB WINDOWS MENU LIST #
#########################
def _activate_window(self, window_hash):
self._try_activate_window_for_window(self, window_hash)
def _try_activate_window_for_window(self,
source_window,
window_hash):
try:
window = source_window.get_sub_win(window_hash)
if window is not None:
window.activateWindow()
else:
for _, win in source_window.sub_win_list:
self._try_activate_window_for_window(
win,
window_hash
)
except Exception:
return
def _update_window_list(self):
    """Add WINDOW_LIST to the pending propagation keys"""
    self._propagation_keys = self._propagation_keys | Modules.WINDOW_LIST
def _do_update_window_list(self):
    """Rebuild the windows menu from the current sub windows"""
    windows_menu = self.findChild(QMenu, "menu_windows")
    windows_menu.setToolTipsVisible(True)

    # Remove all actions
    windows_menu.clear()

    self._do_update_window_list_add_action(
        windows_menu, self.sub_win_list
    )
def _do_update_window_list_add_action(self, menu, win_list):
    """Add a menu action for each window of win_list, recursively

    Args:
        menu: The menu receiving the actions
        win_list: Iterable of pairs whose second item is a window
    """
    for _, window in win_list:
        self._do_update_window_list_add_action_for_window(menu, window)

        try:
            # Failures while handling the sub windows are ignored
            self._do_update_window_list_add_action(
                menu, window.sub_win_list
            )
        except Exception:
            continue
def _do_update_window_list_add_action_for_window(self, menu, window):
    """Add to menu an action activating window

    Args:
        menu: The menu receiving the action
        window: The window to activate when the action is triggered
    """
    def make_activator(window_hash):
        # One closure per window, each bound to its own hash
        return lambda: self._activate_window(window_hash)

    entry = QAction(window._title, self)
    entry.setToolTip(self._trad["active_window"])

    activator = make_activator(window.hash())
    entry.triggered.connect(activator)

    menu.addAction(entry)
###############
# SUB WINDOWS #
###############
def open_d90(self):
    """Open the D90 AdisTS window

    Uses the first D90 AdisTS data of the river, or a new one
    created at index 0 if there is none.
    """
    if len(self._study.river.d90_adists.lst) == 0:
        d90_default = self._study.river.d90_adists.new(0)
    else:
        d90_default = self._study.river.d90_adists.lst[0]

    already_open = self.sub_window_exists(
        D90AdisTSWindow,
        data=[self._study, None, d90_default]
    )
    if already_open:
        return

    window = D90AdisTSWindow(
        study=self._study,
        parent=self,
        data=d90_default
    )
    window.show()
def open_dif(self):
    """Open the DIF AdisTS window

    Uses the first DIF AdisTS data of the river, or a new one
    created at index 0 if there is none.
    """
    if len(self._study.river.dif_adists.lst) == 0:
        dif_default = self._study.river.dif_adists.new(0)
    else:
        dif_default = self._study.river.dif_adists.lst[0]

    already_open = self.sub_window_exists(
        DIFAdisTSWindow,
        data=[self._study, None, dif_default]
    )
    if already_open:
        return

    window = DIFAdisTSWindow(
        study=self._study,
        parent=self,
        data=dif_default
    )
    window.show()
def open_pollutants(self):
    """Open the pollutants window unless it already exists"""
    already_open = self.sub_window_exists(
        PollutantsWindow,
        data=[self._study, None]
    )
    if already_open:
        return

    window = PollutantsWindow(study=self._study, parent=self)
    window.show()
def open_output_rk_adists(self):
    """Open the AdisTS output RK window unless it already exists"""
    already_open = self.sub_window_exists(
        OutputRKAdisTSWindow,
        data=[self._study, None]
    )
    if already_open:
        return

    window = OutputRKAdisTSWindow(study=self._study, parent=self)
    window.show()
def open_configure(self):
    """Open configure window

    Open the Pamhyr configure window unless it already exists

    Returns:
        Nothing
    """
    already_open = self.sub_window_exists(
        ConfigureWindow,
        data=[None, self.conf]
    )
    if already_open:
        return

    window = ConfigureWindow(config=self.conf, parent=self)
    self.config = window
    window.show()
def open_about(self):
    """Open about window

    Open the about window unless it already exists

    Returns:
        Nothing
    """
    already_open = self.sub_window_exists(
        AboutWindow,
        data=[None, None]
    )
    if already_open:
        return

    window = AboutWindow(parent=self)
    self.about = window
    window.show()
def open_model(self):
    """Open file dialog to select a saved study

    Nothing is done if a study is already open.

    Returns:
        Nothing
    """
    if self._study is not None:
        return

    options = QFileDialog.Options()
    options |= QFileDialog.DontUseNativeDialog

    dialog = QFileDialog(self, options=options)
    dialog.setFileMode(QFileDialog.FileMode.ExistingFile)
    dialog.setDefaultSuffix(".pamhyr")
    # dialog.setFilter(dialog.filter() | QtCore.QDir.Hidden)
    dialog.setNameFilters([
        'Pamhyr2 (*.pamhyr)',
        'Pamhyr2 backup (*.pamhyr.backup)'
    ])

    # Start in the directory of the last opened study
    dialog.setDirectory(os.path.dirname(self.conf.last_study))

    if dialog.exec_():
        selected = dialog.selectedFiles()
        self.open_study(selected[0])
def open_new_study(self):
    """Open dialog to set a new study

    Nothing is done if a study is already open.

    Returns:
        Nothing
    """
    if self._study is not None:
        return

    if self.sub_window_exists(NewStudyWindow, data=[None, None]):
        return

    window = NewStudyWindow(parent=self)
    self.new_study = window
    window.show()
def open_edit_study(self):
    """Open dialog to edit the current study

    Nothing is done if no study is open.

    Returns:
        Nothing
    """
    if self._study is None:
        return

    if self.sub_window_exists(NewStudyWindow, data=[self._study, None]):
        return

    window = NewStudyWindow(study=self._study, parent=self)
    self.new_study = window
    window.show()
def open_network(self):
    """Open network dialog

    Nothing is done if no study is open.

    Returns:
        Nothing
    """
    if self._study is None:
        return

    if self.sub_window_exists(NetworkWindow, data=[self._study, None]):
        return

    window = NetworkWindow(study=self._study, parent=self)
    self.network = window
    window.show()
def open_geometry(self):
    """Open geometry window

    Without open study or current reach, the "select a reach"
    message is shown instead.

    Returns:
        Nothing
    """
    if self._study is None or not self._study.river.has_current_reach():
        self.msg_select_reach()
        return

    reach = self._study.river.current_reach().reach

    already_open = self.sub_window_exists(
        GeometryWindow,
        data=[self._study, self.conf, reach]
    )
    if already_open:
        return

    window = GeometryWindow(
        study=self._study,
        config=self.conf,
        reach=reach,
        parent=self
    )
    window.show()
def open_boundary_cond_sed(self):
    """Open the boundary conditions window on tab 1"""
    self.open_boundary_cond(tab=1)
def open_boundary_cond(self, tab=0):
    """Open the boundary conditions window on a given tab

    If the window already exists, only its active tab is changed.

    Args:
        tab: Index of the tab to activate
    """
    already_open = self.sub_window_exists(
        BoundaryConditionWindow,
        data=[self._study, None]
    )

    if already_open:
        window = self.get_sub_window(
            BoundaryConditionWindow,
            data=[self._study, None]
        )
    else:
        window = BoundaryConditionWindow(study=self._study, parent=self)
        window.show()

    window.set_active_tab(tab)
def open_reservoir(self):
    """Open the reservoir window unless it already exists"""
    already_open = self.sub_window_exists(
        ReservoirWindow,
        data=[self._study, None]
    )
    if already_open:
        return

    window = ReservoirWindow(study=self._study, parent=self)
    window.show()
def open_hydraulic_structures(self):
    """Open the hydraulic structures window unless it already exists"""
    already_open = self.sub_window_exists(
        HydraulicStructuresWindow,
        data=[self._study, None]
    )
    if already_open:
        return

    window = HydraulicStructuresWindow(study=self._study, parent=self)
    window.show()
def open_lateral_contrib(self):
    """Open the lateral contribution window unless it already exists"""
    already_open = self.sub_window_exists(
        LateralContributionWindow,
        data=[self._study, None]
    )
    if already_open:
        return

    window = LateralContributionWindow(study=self._study, parent=self)
    window.show()
def open_stricklers(self):
    """Open the Stricklers window unless it already exists"""
    already_open = self.sub_window_exists(
        StricklersWindow,
        data=[self._study, self.conf]
    )
    if already_open:
        return

    window = StricklersWindow(
        study=self._study,
        config=self.conf,
        parent=self
    )
    window.show()
def open_frictions(self):
    """Open the frictions window for the current reach

    Nothing is done if no study is open. Without current reach, the
    "select a reach" message is shown instead.
    """
    # NOTE(review): nesting reconstructed from flattened source; the
    # message is assumed to be shown only when a study is open.
    if self._study is None:
        return

    if not self._study.river.has_current_reach():
        self.msg_select_reach()
        return

    reach = self._study.river.current_reach()

    already_open = self.sub_window_exists(
        FrictionsWindow,
        data=[self._study, None, reach]
    )
    if already_open:
        return

    window = FrictionsWindow(study=self._study, parent=self)
    window.show()
def open_initial_conditions(self):
    """Open the initial conditions window for the current reach

    Without current reach, the "select a reach" message is shown
    instead.
    """
    if not self._study.river.has_current_reach():
        self.msg_select_reach()
        return

    reach = self._study.river.current_reach()

    already_open = self.sub_window_exists(
        InitialConditionsWindow,
        data=[self._study, self.conf, reach]
    )
    if already_open:
        return

    window = InitialConditionsWindow(
        study=self._study,
        config=self.conf,
        reach=reach,
        parent=self
    )
    window.show()
def open_additional_files(self):
    """Open the additional files window (needs an open study)"""
    if self._study is None:
        return

    if self.sub_window_exists(AddFileListWindow, data=[self._study, None]):
        return

    window = AddFileListWindow(study=self._study, parent=self)
    self.additonal_files = window
    window.show()
def open_rep_lines(self):
    """Open the REP lines window (needs an open study)"""
    if self._study is None:
        return

    if self.sub_window_exists(REPLineListWindow, data=[self._study, None]):
        return

    window = REPLineListWindow(study=self._study, parent=self)
    self.rep_lines = window
    window.show()
def open_solver_parameters(self):
    """Open the solver parameters window unless it already exists"""
    already_open = self.sub_window_exists(
        SolverParametersWindow,
        data=[self._study, None]
    )
    if already_open:
        return

    window = SolverParametersWindow(study=self._study, parent=self)
    window.show()
def open_sediment_layers(self):
    """Open the sediment layers window unless it already exists"""
    already_open = self.sub_window_exists(
        SedimentLayersWindow,
        data=[self._study, None]
    )
    if already_open:
        return

    window = SedimentLayersWindow(study=self._study, parent=self)
    window.show()
def open_reach_sediment_layers(self):
    """Open the sediment layers window of the current reach

    Without current reach, the "select a reach" message is shown
    instead, as done by the other reach-dependent windows.
    """
    # Guard added: the current reach was dereferenced unconditionally
    if not self._study.river.has_current_reach():
        self.msg_select_reach()
        return

    reach = self._study.river.current_reach().reach

    if self.sub_window_exists(
        ReachSedimentLayersWindow,
        data=[self._study, None, reach]
    ):
        return

    sl = ReachSedimentLayersWindow(
        study=self._study,
        reach=reach,
        parent=self
    )
    sl.show()
def run_lasest_solver(self):
    """Run the last used solver, if any"""
    solver = self._last_solver
    if solver is not None:
        self.run_solver(solver)
def select_and_run_solver(self):
    """Open the solver selection dialog and run the selected solver"""
    if self._study is None:
        return

    dialog = SelectSolverWindow(
        study=self._study,
        config=self.conf,
        parent=self
    )
    accepted = dialog.exec()
    if accepted:
        self.run_solver(dialog.solver)
def select_run_solver_adists(self):
    """Open the AdisTS solver selection dialog and run the solver"""
    if self._study is None:
        return

    dialog = SelectSolverWindowAdisTS(
        study=self._study,
        config=self.conf,
        parent=self
    )
    accepted = dialog.exec()
    if accepted:
        self.run_solver_adists(dialog.solver, dialog.mage_rep)
def run_solver_adists(self, solver, mage_rep):
    """Open the AdisTS check list window for a solver

    Args:
        solver: The solver to run
        mage_rep: Passed through to the check list window
    """
    if self._study is None:
        return

    already_open = self.sub_window_exists(
        CheckListWindowAdisTS,
        data=[self._study, self.conf, solver, mage_rep]
    )
    if already_open:
        return

    window = CheckListWindowAdisTS(
        study=self._study,
        config=self.conf,
        solver=solver,
        parent=self,
        mage_rep=mage_rep,
    )
    window.show()
def run_solver(self, solver):
    """Open the check list window for a solver

    Args:
        solver: The solver to run
    """
    if self._study is None:
        return

    already_open = self.sub_window_exists(
        CheckListWindow,
        data=[self._study, self.conf, solver]
    )
    if already_open:
        return

    window = CheckListWindow(
        study=self._study,
        config=self.conf,
        solver=solver,
        parent=self
    )
    window.show()
def solver_log_adists(self, solver, mage_rep):
    """Open the AdisTS solver log window

    Args:
        solver: The solver
        mage_rep: Passed through to the log window
    """
    window = SolverLogWindowAdisTS(
        study=self._study,
        config=self.conf,
        solver=solver,
        parent=self,
        mage_rep=mage_rep,
    )
    window.show()
def solver_log(self, solver):
    """Open the solver log window

    Args:
        solver: The solver
    """
    window = SolverLogWindow(
        study=self._study,
        config=self.conf,
        solver=solver,
        parent=self
    )
    window.show()
def open_solver_results(self, solver, results=None):
    """Read solver results and open them in a results window

    Args:
        solver: The solver used to read the results
        results: A results object, the path (str) of a results
            file, or None to use the last results (read from the
            solver working directory if there are none)
    """
    if type(results) is str:
        # Open from file
        logger.info(f"Open results from {os.path.dirname(results)}")
        name = os.path.basename(results).replace(".BIN", "")

        def reading_fn():
            self._tmp_results = solver.results(
                self._study,
                os.path.dirname(results),
                name=name
            )
    elif results is not None:
        def reading_fn():
            self._tmp_results = results
    elif self._last_results is not None:
        # No specific results, get last results
        def reading_fn():
            self._tmp_results = self._last_results
    else:
        def reading_fn():
            self._tmp_results = solver.results(
                self._study,
                self._solver_workdir(solver),
            )

    dlg = ReadingResultsDialog(reading_fn=reading_fn, parent=self)
    dlg.exec_()
    results = self._tmp_results

    # No results available
    if results is None:
        return

    # Results without values, for example if the geometry mismatches
    if not results.is_valid:
        return

    # Window already opened
    already_open = self.sub_window_exists(
        ResultsWindow,
        data=[
            self._study,
            None,  # No config
            results._solver,
            results._repertory,
            results._name,
        ]
    )
    if already_open:
        return

    window = ResultsWindow(
        study=self._study,
        results=[results],
        parent=self
    )
    window.show()
def open_solver_results_adists(self, solver, results=None):
    """Load AdisTS results through a ``ReadingResultsDialog`` and show them.

    ``results`` selects where the results come from:

    - ``None``: reuse ``self._last_results``; when that is also ``None``,
      read them with ``solver.results`` from the solver working
      directory (``self._solver_workdir(solver)``).
    - ``str``: path of a results file; they are read with
      ``solver.results`` from the file directory, the file base name
      (with ".BIN" removed) being passed as ``name``.
    - any other object: used directly as the results.

    Nothing is opened when no results are obtained or when
    ``sub_window_exists`` reports a ``ResultsWindowAdisTS`` for the same
    solver and results.
    """
    if results is None:
        # No specific results, get last results...
        if self._last_results is None:
            # ... or read them from the solver working directory
            def reading_fn():
                self._tmp_results = solver.results(
                    self._study,
                    self._solver_workdir(solver),
                )
        else:
            def reading_fn():
                self._tmp_results = self._last_results
    elif isinstance(results, str):
        # Open from file
        directory = os.path.dirname(results)
        logger.info(f"Open results from {directory}")
        name = os.path.basename(results).replace(".BIN", "")

        def reading_fn():
            self._tmp_results = solver.results(
                self._study,
                directory,
                name=name
            )
    else:
        def reading_fn():
            self._tmp_results = results

    # Forget what a previous call left behind: if `reading_fn` never
    # assigns (NOTE(review): depends on how ReadingResultsDialog runs
    # it), this is seen as "no results" instead of stale data.
    self._tmp_results = None

    dlg = ReadingResultsDialog(reading_fn=reading_fn, parent=self)
    dlg.exec_()

    # A separate name keeps the `results` seen by the closures above
    # unchanged.
    loaded = self._tmp_results

    # No results available
    if loaded is None:
        return

    # Window already opened
    if self.sub_window_exists(
        ResultsWindowAdisTS,
        data=[
            self._study,
            None,  # No config
            solver,
            loaded._repertory,
            loaded._name,
        ]
    ):
        return

    res = ResultsWindowAdisTS(
        study=self._study,
        solver=solver,
        results=loaded,
        parent=self
    )
    res.show()
def _solver_workdir(self, solver):
    """Return the working directory of ``solver`` for the current study.

    The path is ``<study file directory>/_PAMHYR_/<study name>/<solver
    name>``, spaces in both names being replaced by underscores.
    """
    study = self._study
    study_dir = os.path.dirname(study.filename)
    sub_dirs = [
        label.replace(" ", "_")
        for label in (study.name, solver.name)
    ]
    return os.path.join(study_dir, "_PAMHYR_", *sub_dirs)
def open_last_results(self):
    """Open ``self._last_results`` with the viewer of the last solver.

    The ``_type`` of ``self._last_solver`` selects the method: "mage8"
    uses ``open_solver_results`` and "adistswc" uses
    ``open_solver_results_adists``. Nothing happens when there is no
    last solver or when its type is neither of those.
    """
    solver = self._last_solver
    if solver is None:
        return

    solver_type = solver._type
    if solver_type == "mage8":
        opener = self.open_solver_results
    elif solver_type == "adistswc":
        opener = self.open_solver_results_adists
    else:
        return

    opener(solver, self._last_results)
def open_results_from_file(self):
    """Ask for a Mage ".BIN" results file and open the selected one.

    The file dialog starts in the working directory of the last solver
    or, when there is none, in the directory of the study file; no
    start directory is set when the study has no file name. The
    selected file is opened with ``open_solver_results`` and a new
    ``Mage8("Mage")`` solver. Nothing happens when no study is loaded.
    """
    if self._study is None:
        return

    options = QFileDialog.Options()
    options |= QFileDialog.DontUseNativeDialog

    dialog = QFileDialog(self, options=options)
    dialog.setFileMode(QFileDialog.FileMode.ExistingFile)
    dialog.setDefaultSuffix(".BIN")
    # dialog.setFilter(dialog.filter() | QtCore.QDir.Hidden)
    dialog.setNameFilters(['Mage (*.BIN)'])

    study_file = self._study.filename
    if study_file is not None:
        last_solver = self._last_solver
        if last_solver is not None:
            start_dir = self._solver_workdir(last_solver)
        else:
            start_dir = os.path.dirname(study_file)
        dialog.setDirectory(start_dir)

    # Dialog cancelled
    if not dialog.exec_():
        return

    file_name = dialog.selectedFiles()
    logger.info(f"Select results: {file_name}")
    self.open_solver_results(
        Mage8("Mage"),
        results=file_name[0]
    )
def compare_results(self):
    """Let the user pick two solvers and show their compared results.

    A ``CompareSolversWindow`` provides the two solvers, which are
    passed to ``diff_results``; the returned results are displayed in a
    ``ResultsWindow``. Nothing is opened when no study is loaded, when
    the selection dialog is not accepted, when ``diff_results`` returns
    ``None``, or when ``sub_window_exists`` reports a window for the
    same results.
    """
    study = self._study
    if study is None:
        return

    chooser = CompareSolversWindow(
        study=study,
        config=self.conf,
        parent=self
    )
    if not chooser.exec():
        return

    results = self.diff_results(chooser.solver1, chooser.solver2)

    # At least one result not available
    if results is None:
        return

    # Data identifying an already opened window: study, no config,
    # then the solvers, the repertories and the names of all results
    window_key = [study, None]
    window_key.extend(r._solver for r in results)
    window_key.extend(r._repertory for r in results)
    window_key.extend(r._name for r in results)

    if self.sub_window_exists(ResultsWindow, data=window_key):
        return

    viewer = ResultsWindow(
        study=study,
        results=results,
        parent=self
    )
    viewer.show()
def diff_results(self, solver1, solver2):
    """Build the results of two solvers and their difference.

    The results of ``solver1`` and ``solver2`` are read from their
    working directories (``self._solver_workdir``). Three new
    ``Results`` are then filled for the timestamps present in both:

    - a copy of the first results (attached to ``solver1``),
    - a copy of the second results (attached to ``solver2``),
    - the difference "first - second" of the keys "Z", "Q" and "V",
      attached to a ``GenericSolver`` named
      "<solver1 name> - <solver2 name>".

    Returns:
        The list ``[first, second, difference]``, or ``None`` when a
        solver is ``None``, when one of the results cannot be read, or
        when the "nb_reach" or "nb_profile" values of the two results
        differ.
    """
    if solver1 is None or solver2 is None:
        # TODO message
        return None

    result1 = solver1.results(
        self._study,
        self._solver_workdir(solver1),
    )
    if result1 is None:
        # TODO message
        return None

    result2 = solver2.results(
        self._study,
        self._solver_workdir(solver2),
    )
    if result2 is None:
        # TODO message
        return None

    nb_reach = result1.get("nb_reach")
    nb_profile = result1.get("nb_profile")

    if result2.get("nb_reach") != nb_reach:
        # TODO message
        return None

    if result2.get("nb_profile") != nb_profile:
        # TODO message
        return None

    solver3 = GenericSolver(solver1.name+" - "+solver2.name)

    result3 = Results(self._study, solver3)  # difference
    result4 = Results(self._study, solver1)  # copy of result1
    result5 = Results(self._study, solver2)  # copy of result2
    new_results = (result3, result4, result5)

    # Only the timestamps known by both results can be compared
    ts = list(result1.get("timestamps").intersection(
        result2.get("timestamps")))

    for result in new_results:
        result.set("nb_reach", nb_reach)
        result.set("nb_profile", nb_profile)
        result.set("timestamps", ts)

    reach_count = int(nb_reach)

    # Add reach to results reach list
    for i in range(reach_count):
        for result in new_results:
            result.river.add(i)

    # The reaches do not depend on the timestamp: look them up once
    reaches = [
        (
            result1.river.reach(r),
            result2.river.reach(r),
            result3.river.reach(r),
            result4.river.reach(r),
            result5.river.reach(r),
        )
        for r in range(reach_count)
    ]

    for timestamp in result3.get("timestamps"):
        for reach1, reach2, reach3, reach4, reach5 in reaches:
            for p, (profile1, profile2) in enumerate(zip(
                    reach1.profiles, reach2.profiles)):
                for key in ["Z", "Q", "V"]:
                    d1 = profile1.get_ts_key(timestamp, key)
                    d2 = profile2.get_ts_key(timestamp, key)

                    reach3.set(p, timestamp, key, d1-d2)
                    reach4.set(p, timestamp, key, d1)
                    reach5.set(p, timestamp, key, d2)

                # Water limits of the difference are computed from the
                # profile geometry and the "Z" value just stored
                diff_profile = reach3.profile(p)
                limits = diff_profile.geometry.get_water_limits(
                    diff_profile.get_ts_key(timestamp, "Z")
                )
                reach3.set(p, timestamp, "water_limits", limits)

                # The copies keep the water limits of their source
                limits = profile1.get_ts_key(timestamp, "water_limits")
                reach4.set(p, timestamp, "water_limits", limits)

                limits = profile2.get_ts_key(timestamp, "water_limits")
                reach5.set(p, timestamp, "water_limits", limits)

    return [result4, result5, result3]
def open_results_adists(self):
    """Ask for an AdisTS results directory and open its results.

    The directory dialog starts in the working directory of the last
    solver or, when there is none, in the directory of the study file.
    When the selected directory contains a "resultats" entry, that
    sub-directory is used instead. The solver is the configured one
    (``self.conf.solvers``) whose name matches the parent directory
    name of the results directory.

    The results are opened with ``open_solver_results_adists`` when the
    directory holds at least two "bin" entries including
    "total_sediment.bin"; otherwise an "AdisTS Results not found"
    dialog is shown. Nothing happens when no study is loaded, when the
    study has no file name, when the dialog is cancelled, when the
    directory holds no "bin" entry, or when no configured solver
    matches.
    """
    if self._study is None:
        return

    # The file name is checked against None by open_results_from_file,
    # so None is rejected here as well as the empty string
    if not self._study.filename:
        return

    options = QFileDialog.Options()
    options |= QFileDialog.DontUseNativeDialog

    dialog = QFileDialog(self, options=options)
    dialog.setFileMode(QFileDialog.DirectoryOnly)

    if self._last_solver is None:
        dialog.setDirectory(
            os.path.dirname(self._study.filename)
        )
    else:
        dialog.setDirectory(
            self._solver_workdir(self._last_solver)
        )

    # Dialog cancelled
    if not dialog.exec_():
        return

    dir_path = dialog.selectedFiles()[0]
    dir_list = os.listdir(dir_path)
    if "resultats" in dir_list:
        dir_path = os.path.join(dir_path, "resultats")
        dir_list = os.listdir(dir_path)

    bin_list = [entry for entry in dir_list if "bin" in entry]
    if not bin_list:
        # TODO message
        return

    # The parent directory of the results directory names the solver
    solver_name = os.path.normpath(dir_path).split(os.sep)[-2]

    solvers = list(self.conf.solvers)
    solver_results = next(
        (s for s in solvers if s.name == solver_name),
        None
    )
    if solver_results is None:
        # _solver_workdir replaces spaces by underscores in the
        # directory name, so compare with the same transformation
        solver_results = next(
            (
                s for s in solvers
                if s.name.replace(" ", "_") == solver_name
            ),
            None
        )
    if solver_results is None:
        # TODO message
        logger.warning(
            f"No solver found for results directory: {dir_path}"
        )
        return

    solver_results_adists = solver_results.results(
        self._study,
        repertory=dir_path, qlog=None)  # self._output)

    logger.info(f"Select results: {dir_path}")

    if len(bin_list) >= 2 and ("total_sediment.bin" in bin_list):
        self.open_solver_results_adists(
            solver_results,
            results=solver_results_adists,
        )
    else:
        dlg = QDialog(self)
        dlg.setWindowTitle("AdisTS Results")
        layout = QVBoxLayout()
        message = QLabel("AdisTS Results not found")
        layout.addWidget(message)
        dlg.setLayout(layout)
        dlg.exec()
#################
# DOCUMENTATION #
#################
def _doc_path_file(self, filename):
    """Return the absolute path of ``filename`` in the "doc" directory.

    The "doc" directory is looked up two levels above the directory
    that contains this module.
    """
    # A former test on `".py" in sys.argv[0]` selected between two
    # strictly identical expressions; only one is kept.
    return os.path.abspath(
        os.path.join(
            os.path.dirname(__file__),
            "..", "..", "doc", filename
        )
    )
def open_doc(self, filename):
    """Open a documentation resource.

    - A ``filename`` containing "https://" is opened as an URL with
      ``QDesktopServices``.
    - When the module level ``_doc`` value is "external", the file
      located by ``self._doc_path_file`` is opened with
      ``QDesktopServices``.
    - Otherwise a ".odt" file located by ``DocWindow._path_file`` is
      opened with ``QDesktopServices`` and any other file is shown in
      a ``DocWindow``.
    """
    if "https://" in filename:
        url = QUrl(filename)
        QDesktopServices.openUrl(url)
        return

    if _doc == "external":
        # fromLocalFile builds a valid "file:" URL whatever the
        # platform path syntax or the characters of the path
        url = QUrl.fromLocalFile(self._doc_path_file(filename))
        QDesktopServices.openUrl(url)
        return

    if ".odt" in filename:
        url = QUrl.fromLocalFile(DocWindow._path_file(filename))
        QDesktopServices.openUrl(url)
        return

    doc = DocWindow(
        filename=filename,
        parent=self
    )
    doc.show()
def open_doc_user(self):
    """Open the user documentation (project wiki URL) with ``open_doc``."""
    wiki_url = "https://gitlab.irstea.fr/theophile.terraz/pamhyr/-/wikis/home"
    self.open_doc(wiki_url)
def open_doc_dev(self, ext="pdf"):
    """Open the "Pamhyr2-dev" document with extension ``ext``."""
    document = f"Pamhyr2-dev.{ext}"
    self.open_doc(document)
def open_doc_mage(self):
    """Open the "mage8.pdf" document with ``open_doc``."""
    document = "mage8.pdf"
    self.open_doc(document)
#########
# DEBUG #
#########
def open_debug(self):
    """Create and show a ``ReplWindow`` on the current study.

    The window receives the current study, the configuration and this
    window as parent.
    """
    window_args = dict(
        study=self._study,
        config=self.conf,
        parent=self,
    )
    repl_window = ReplWindow(**window_args)
    repl_window.show()
def open_sqlite(self):
    """Start the external "sqlitebrowser" program on the study file.

    Nothing is started when no study is loaded. The program is started
    without a file when the study has no file name. A failure to start
    the program is logged and not raised.
    """
    if self._study is None:
        logger.debug("No study open for sql debuging...")
        return

    # An argument list (no shell) passes the path as a single
    # argument even with spaces or shell special characters in it
    command = ["sqlitebrowser"]
    db_path = self._study.filename
    if db_path:
        command.append(db_path)

    try:
        _ = subprocess.Popen(command)
    except OSError as error:
        # Without a shell, a missing program raises here
        logger.warning(f"Unable to start sqlitebrowser: {error}")