# WindowAdisTS.py -- Pamhyr
# Copyright (C) 2023-2024  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import os
import logging
import tempfile

from queue import Queue
from tools import trace, timer, logger_exception

from View.Tools.PamhyrWindow import PamhyrDialog, PamhyrWindow

from PyQt5.QtGui import (
    QKeySequence,
)

from PyQt5.QtCore import (
    Qt, QVariant, QAbstractTableModel,
    QCoreApplication, QModelIndex, pyqtSlot,
    QRect, QTimer, QProcess,
)

from PyQt5.QtWidgets import (
    QDialogButtonBox, QPushButton, QLineEdit,
    QFileDialog, QTableView, QAbstractItemView,
    QUndoStack, QShortcut, QAction, QItemDelegate,
    QComboBox, QVBoxLayout, QHeaderView, QTabWidget,
    QTextEdit,
)

from View.RunSolver.Log.Window import SolverLogFileWindow
from View.Results.ReadingResultsDialog import ReadingResultsDialog

try:
    from signal import SIGTERM, SIGSTOP, SIGCONT
    _signal = True
except Exception:
    # These signals are not importable on every platform; the pause
    # action is disabled when they are missing.
    _signal = False

_translate = QCoreApplication.translate

logger = logging.getLogger()


class SelectSolverWindowAdisTS(PamhyrDialog):
    """Dialog used to pick a solver and a Mage result directory.

    After the dialog is accepted, the choice is available through the
    `solver` and `mage_rep` properties.
    """
    _pamhyr_ui = "SelectSolverAdisTS"
    _pamhyr_name = "Select solver"

    def __init__(self, study=None, config=None, parent=None):
        self._solver = None
        # Defined up front so that `mage_rep` can be read safely even
        # if the dialog was never accepted.
        self._mage_result_rep = None

        name = _translate("Solver", "Select solver")
        super(SelectSolverWindowAdisTS, self).__init__(
            title=name,
            study=study,
            config=config,
            options=[],
            parent=parent
        )

        self.setup_combobox()
        self.setup_connections()
        self.select_last_solver()

    def setup_combobox(self):
        """Fill the solver combobox and the Mage directory combobox.

        "comboBox" receives every configured solver whose type does not
        contain "mage". "comboBoxRepMage" receives the entries of the
        study directory whose name equals the name of a configured
        Mage solver.
        """
        solvers = [
            s for s in self._config.solvers
            if "mage" not in s._type.lower()
        ]
        solvers_name = [self._format_solver_name(s) for s in solvers]

        solvers_mage_names = {
            s._name for s in self._config.solvers
            if "mage" in s._type.lower()
        }

        # An unsaved study has no file name: fall back on a relative
        # path, the listing below then simply finds nothing.
        solvers_dir = os.path.join(
            os.path.dirname(self._study.filename or ""),
            "_PAMHYR_",
            self._study.name.replace(" ", "_"),
        )

        try:
            dir_solvers_list = os.listdir(solvers_dir)
        except OSError as e:
            # No solver has been run for this study yet (or the
            # directory is not readable): nothing to propose.
            logger.warning(
                f"Unable to list solver directories in "
                f"'{solvers_dir}': {e}"
            )
            dir_solvers_list = []

        # NOTE(review): run directories are created from the solver
        # name with spaces replaced by "_" while the comparison here
        # uses the raw name -- confirm Mage solver names never contain
        # spaces.
        display_mage_names = [
            d for d in dir_solvers_list
            if d in solvers_mage_names
        ]

        self.combobox_add_items("comboBox", solvers_name)
        self.combobox_add_items("comboBoxRepMage", display_mage_names)

    def setup_connections(self):
        """Connect the run / cancel buttons to accept / reject."""
        self.find(QPushButton, "pushButton_run").clicked.connect(self.accept)
        self.find(QPushButton, "pushButton_cancel") \
            .clicked.connect(self.reject)

    def select_last_solver(self):
        """Preselect the last used solver when it is still configured."""
        last = self._config.last_solver_name
        solver = next(
            (s for s in self._config.solvers if s.name == last),
            None
        )

        if solver is not None:
            self.set_combobox_text(
                "comboBox",
                self._format_solver_name(solver)
            )

    def _format_solver_name(self, solver):
        """Return the combobox label for SOLVER: "<name> - (<type>)"."""
        return f"{solver.name} - ({solver._type})"

    @property
    def solver(self):
        """Solver selected by `accept`, None before that."""
        return self._solver

    @property
    def mage_rep(self):
        """Text selected in "comboBoxRepMage" by `accept`, else None."""
        return self._mage_result_rep

    def accept(self):
        """Store the selection and close the dialog.

        The dialog is left open when the selected text does not match
        any configured solver (e.g. empty combobox).
        """
        text = self.get_combobox_text("comboBox") or ""
        # Labels are "<name> - (<type>)": strip the type suffix.
        solver_name = text.rsplit(" - ", 1)[0]

        solver = next(
            (s for s in self._config.solvers if s.name == solver_name),
            None
        )
        if solver is None:
            logger.error(f"No solver named '{solver_name}' in configuration")
            return

        self._mage_result_rep = self.get_combobox_text("comboBoxRepMage")
        self._solver = solver
        self._config.update_last_solver_used(solver_name)

        super(SelectSolverWindowAdisTS, self).accept()


class SolverLogWindowAdisTS(PamhyrWindow):
    """Window that exports the study, runs the solver and shows its log.

    The export and the run are started from the constructor. Messages
    pushed by the solver in the output queue are displayed by `update`,
    which is called periodically by a timer.
    """
    _pamhyr_ui = "SolverLogAdisTS"
    _pamhyr_name = "Solver Log"

    def __init__(self, study=None, config=None,
                 solver=None, parent=None, mage_rep=None):
        self._solver = solver
        self._results = None
        self._results_mage = None
        self._mage_rep = mage_rep
        # Keeps the temporary directory object alive when the study
        # has no file name (see `setup_workdir`).
        self._tmpdir = None

        name = _translate("Solver", "Select log")
        super(SolverLogWindowAdisTS, self).__init__(
            title=name,
            study=study,
            config=config,
            options=[],
            parent=parent
        )

        self.setup_action()
        self.setup_alarm()
        self.setup_connections()
        self.setup_workdir()
        self.setup_process()

        ok = self.export()
        if ok:
            self.run()
        else:
            self._log(
                f" *** Failed to export study to {self._solver._type}",
                color="red"
            )

    def _set_actions_enabled(self, **states):
        """Enable or disable tool bar actions.

        Each keyword is the action name without its "action_" prefix
        (e.g. `start=True` enables "action_start").
        """
        for name, enabled in states.items():
            self.find(QAction, f"action_{name}").setEnabled(enabled)

    def setup_action(self):
        """Set the actions state for a solver that is about to run."""
        self._set_actions_enabled(
            start=False,
            # Pausing needs the signals imported at module level.
            pause=_signal,
            stop=True,
            log_file=False,
            results=False,
        )

    def setup_alarm(self):
        """Create the timer used to refresh the window."""
        self._alarm = QTimer()

    def setup_connections(self):
        """Connect the actions and the refresh timer to their handlers."""
        self.find(QAction, "action_start").triggered.connect(self.start)
        self.find(QAction, "action_pause").triggered.connect(self.pause)
        self.find(QAction, "action_stop").triggered.connect(self.stop)
        self.find(QAction, "action_log_file").triggered.connect(self.log_file)
        self.find(QAction, "action_results").triggered.connect(self.results)
        self.find(QAction, "action_results_Mage").triggered.connect(
            self.resultsMage
        )

        self._alarm.timeout.connect(self.update)

    def setup_workdir(self):
        """Compute and create the solver working directory.

        A temporary directory is used when the study has no file name,
        otherwise the directory is
        "<study dir>/_PAMHYR_/<study name>/<solver name>" (spaces
        replaced by "_").
        """
        if not self._study.filename:
            # Keep a reference on the TemporaryDirectory object: the
            # directory is removed when the object is collected.
            self._tmpdir = tempfile.TemporaryDirectory()
            self._workdir = self._tmpdir.name
        else:
            self._workdir = os.path.join(
                os.path.dirname(self._study.filename),
                "_PAMHYR_",
                self._study.name.replace(" ", "_"),
                self._solver.name.replace(" ", "_"),
            )

        os.makedirs(self._workdir, exist_ok=True)

    def setup_process(self):
        """Start the refresh timer and create the queue and process."""
        self._alarm.start(100)
        self._output = Queue()
        self._process = self.new_process(self._parent)

    def new_process(self, parent):
        """Return a new QProcess working in the solver directory.

        Standard output and standard error are merged.
        """
        new = QProcess(parent)
        new.setWorkingDirectory(self._workdir)
        new.setProcessChannelMode(QProcess.MergedChannels)
        return new

    def export(self):
        """Export the study for the solver and return the solver status."""
        self._log(f" *** Export study {self._solver.name}", color="blue")
        ok = self._solver.export(
            self._study, self._workdir, self._mage_rep,
            qlog=self._output
        )
        self.update()

        return ok

    def closeEvent(self, event):
        """Stop the refresh timer before closing the window."""
        self._alarm.stop()
        super(SolverLogWindowAdisTS, self).closeEvent(event)

    def _copy(self):
        """Copy the selected log text to the clipboard."""
        self.find(QTextEdit, "textEdit").copy()

    #######
    # LOG #
    #######

    def _log(self, msg, color=None):
        """Append MSG to the log view.

        A string is displayed as is (see `_log_str`), an integer is
        displayed as a return code (see `_log_int`). Bytes are decoded
        as UTF-8 first. Any other type is ignored.
        """
        if isinstance(msg, bytes):
            msg = msg.decode("utf-8", errors="replace")

        if isinstance(msg, str):
            self._log_str(msg, color)
        elif isinstance(msg, int):
            self._log_int(msg, color)

    def _log_str(self, msg, color=None):
        """Log the string MSG, optionally displayed with COLOR."""
        if msg == "":
            return

        logger.info(f"solver: {msg}")
        # Drop what follows the last line break (trailing newline).
        msg = msg.rsplit('\n', 1)[0]

        if color is not None:
            msg = f"<font color=\"{color}\">" + msg + "</font>"

        self.find(QTextEdit, "textEdit").append(msg)

    def _log_int(self, int_code, color=None):
        """Log the return code INT_CODE.

        COLOR is ignored: the message is blue for code 0 and red
        otherwise. The status bar is updated accordingly.
        """
        logger.info(f"solver: Returns {int_code}")
        color = "blue" if int_code == 0 else "red"

        self.find(QTextEdit, "textEdit")\
            .append(
                f"<font color=\"{color}\">" +
                f" *** Finished with code {int_code}" +
                "</font>"
            )

        self.statusbar.showMessage(
            "Done" if int_code == 0 else "Failed",
            3000
        )

    ##########
    # UPDATE #
    ##########

    def update(self):
        """Refresh the actions state and display the pending messages."""
        if self._solver.is_stoped():
            self._set_actions_enabled(
                start=True,
                pause=False,
                stop=False,
                results=True,
            )

            if self._solver.log_file() != "":
                self._set_actions_enabled(log_file=True)

            # self._update_get_results()

        self._update_logs_all()

    def _update_get_results(self):
        """Read the solver results once and give them to the parent."""
        if self._results is None:
            def reading_fn():
                try:
                    self._results = self._solver.results(
                        self._study, self._workdir, qlog=self._output
                    )
                    self._parent.set_results(self._solver, self._results)
                except Exception as e:
                    logger.error("Failed to open results")
                    logger_exception(e)

            dlg = ReadingResultsDialog(reading_fn=reading_fn, parent=self)
            dlg.exec_()

    def _update_logs_all(self):
        """Display every message waiting in the output queue.

        Strings containing "[ERROR]" are displayed in red.
        """
        while not self._output.empty():
            s = self._output.get()

            try:
                if isinstance(s, str) and "[ERROR]" in s:
                    # Keep the message as a string: `_log` dispatches
                    # on the message type.
                    self._log(s, color="red")
                else:
                    self._log(s)
            except Exception as e:
                logger_exception(e)

    ####################
    # Process controle #
    ####################

    def run(self):
        """Run the solver with the current process and output queue."""
        self._log(f" *** Run solver {self._solver.name}", color="blue")
        self._solver.run(
            self._study,
            process=self._process,
            output_queue=self._output
        )

    def start(self):
        """Start the solver.

        When the solver is stopped, the study is exported again and a
        new process is created first; nothing is started if the export
        fails.
        """
        if self._solver.is_stoped():
            self._log(f" *** Export study {self._solver.name}", color="blue")

            ok = self._solver.export(
                self._study, self._workdir, self._mage_rep,
                qlog=self._output
            )

            self.update()
            if not ok:
                self._log(" *** Failed to export", color="red")
                self.update()
                return

            self._process = self.new_process(self._parent)

        self._log(" *** Start", color="blue")
        self._results = None
        self._solver.start(self._study, process=self._process)

        self._set_actions_enabled(
            start=False,
            pause=_signal,
            stop=True,
            log_file=False,
            results=False,
        )

    def pause(self):
        """Pause the solver."""
        self._log(" *** Pause", color="blue")
        self._solver.pause()

        self._set_actions_enabled(
            start=True,
            pause=False,
            stop=True,
            results=False,
        )

    def stop(self):
        """Kill the solver."""
        self._log(" *** Stop", color="blue")
        self._solver.kill()

        self._set_actions_enabled(
            start=True,
            pause=False,
            stop=False,
            results=True,
        )

        if self._solver.log_file() != "":
            self._set_actions_enabled(log_file=True)

    ###########
    # Results #
    ###########

    def results(self):
        """Read the solver results if needed and open them.

        Results are read from the "resultats" sub-directory of the
        working directory when it exists, else from the working
        directory itself.
        """
        if self._results is None:
            dir_path = os.path.join(self._workdir, "resultats")
            if not os.path.isdir(dir_path):
                dir_path = self._workdir

            def reading_fn():
                self._results = self._solver.results(
                    self._study, dir_path, qlog=self._output
                )

            dlg = ReadingResultsDialog(reading_fn=reading_fn, parent=self)
            dlg.exec_()

        self._parent.set_results(self._solver, self._results)
        self._parent.open_solver_results_adists(self._solver, self._results)

        self._solver.has_results_loaded()

    def resultsMage(self):
        """Read the results of the selected Mage solver and open them.

        The Mage solver is the configured solver whose name equals the
        `mage_rep` given to the constructor. Nothing is opened when no
        such solver exists.
        """
        # Resolved on every call: the solver is needed below even when
        # the results are already loaded.
        mage_solver = next(
            (s for s in self._config.solvers if s._name == self._mage_rep),
            None
        )
        if mage_solver is None:
            self._log(
                f" *** No Mage solver named '{self._mage_rep}'",
                color="red"
            )
            return

        if self._results_mage is None:
            workdir_mage = os.path.join(
                os.path.dirname(self._study.filename or ""),
                "_PAMHYR_",
                self._study.name.replace(" ", "_"),
                mage_solver.name.replace(" ", "_"),
            )

            def reading_fn():
                self._results_mage = mage_solver.results(
                    self._study, workdir_mage, qlog=self._output
                )

            dlg = ReadingResultsDialog(reading_fn=reading_fn, parent=self)
            dlg.exec_()

        self._parent.set_results(mage_solver, self._results_mage)
        self._parent.open_solver_results(mage_solver, self._results_mage)

        mage_solver.has_results_loaded()

    def log_file(self):
        """Open the solver log file in a dedicated window."""
        file_name = os.path.join(self._workdir, self._solver.log_file())
        log = SolverLogFileWindow(
            file_name=file_name,
            study=self._study,
            config=self._config,
            solver=self._solver,
            parent=self,
        )
        log.show()