# -*- coding: utf-8 -*-

import tempfile
import os

from queue import Queue

from tools import trace, timer

from View.ASubWindow import ASubWindow, ASubMainWindow
from View.ListedSubWindow import ListedSubWindow

from PyQt5.QtGui import (
    QKeySequence,
)

from PyQt5.QtCore import (
    Qt, QVariant, QAbstractTableModel,
    QCoreApplication, QModelIndex, pyqtSlot,
    QRect, QTimer, QProcess,
)

from PyQt5.QtWidgets import (
    QDialogButtonBox, QPushButton, QLineEdit,
    QFileDialog, QTableView, QAbstractItemView,
    QUndoStack, QShortcut, QAction, QItemDelegate,
    QComboBox, QVBoxLayout, QHeaderView, QTabWidget,
    QTextEdit,
)

from View.RunSolver.Log.Window import SolverLogFileWindow

# SIGSTOP / SIGCONT do not exist on every platform (e.g. Windows); when the
# import fails the "pause" action is simply kept disabled.
try:
    from signal import SIGTERM, SIGSTOP, SIGCONT
    _signal = True
except ImportError:
    _signal = False

_translate = QCoreApplication.translate


class SelectSolverWindow(ASubWindow, ListedSubWindow):
    """Dialog used to choose one of the solvers listed in the config.

    After the dialog has been accepted, the chosen solver object is
    available through the ``solver`` property.
    """

    def __init__(self, title="Select solver",
                 study=None, config=None,
                 parent=None):
        self._title = title

        self._study = study
        self._config = config
        self._solver = None

        super(SelectSolverWindow, self).__init__(
            name=self._title, ui="SelectSolver", parent=parent
        )

        self.ui.setWindowTitle(self._title)

        self.setup_combobox()
        self.setup_connections()

    def setup_combobox(self):
        """Fill the combo box with one "<name> - (<type>)" entry per solver."""
        solvers_name = [
            s.name + f" - ({s._type})" for s in self._config.solvers
        ]
        self.combobox_add_items("comboBox", solvers_name)

    def setup_connections(self):
        """Wire the run / cancel buttons to accept / reject."""
        self.find(QPushButton, "pushButton_run").clicked.connect(self.accept)
        self.find(QPushButton, "pushButton_cancel")\
            .clicked.connect(self.reject)

    @property
    def solver(self):
        """Solver selected by the user, or None if nothing was accepted."""
        return self._solver

    def accept(self):
        """Store the solver matching the combo box entry, then accept.

        If no configured solver matches the current entry (for instance
        when the list is empty) the dialog is left open instead of raising
        ``StopIteration`` from inside the slot.
        """
        solver_name = self.get_combobox_text("comboBox")
        # Drop the trailing " - (<type>)" suffix added by setup_combobox();
        # rsplit keeps any " - " that is part of the solver name itself.
        solver_name = solver_name.rsplit(" - ", 1)[0]

        self._solver = next(
            (s for s in self._config.solvers if s.name == solver_name),
            None,
        )
        if self._solver is None:
            return

        super(SelectSolverWindow, self).accept()


class SolverLogWindow(ASubMainWindow, ListedSubWindow):
    """Window that exports a study, runs a solver on it and shows its log.

    Messages are read from a queue filled by the solver and appended to the
    text widget by ``update()``, which is triggered by a periodic timer.
    """

    def __init__(self, title="Solver logs",
                 study=None, config=None,
                 solver=None, parent=None):
        self._title = title
        self._parent = parent

        self._study = study
        self._config = config
        self._solver = solver

        super(SolverLogWindow, self).__init__(
            name=self._title, ui="SolverLog", parent=parent
        )

        self.ui.setWindowTitle(self._title)

        self.setup_action()
        self.setup_alarm()
        self.setup_connections()

        # The working directory must be a path string: it is handed to
        # QProcess.setWorkingDirectory(), os.path.join() and solver.export().
        self._tmpdir = None
        if self._study.filename == "":
            # Keep a reference to the TemporaryDirectory object so the
            # directory is not removed while this window is alive.
            self._tmpdir = tempfile.TemporaryDirectory()
            self._workdir = self._tmpdir.name
        else:
            self._workdir = os.path.join(
                os.path.dirname(self._study.filename),
                "_PAMHYR_",
                self._study.name.replace(" ", "_"),
                self._solver.name.replace(" ", "_"),
            )
            os.makedirs(self._workdir, exist_ok=True)

        # Create the queue before starting the timer: update() reads it.
        self._output = Queue()
        self._alarm.start(500)

        self._process = self.new_process(parent)

        self._log(f" *** Export study {self._solver.name}", color="blue")
        self._solver.export(self._study, self._workdir, qlog=self._output)
        self.update()

        self._log(f" *** Run solver {self._solver.name}", color="blue")
        self._solver.run(
            process=self._process,
            output_queue=self._output
        )

    def new_process(self, parent):
        """Return a new QProcess whose working directory is the work dir."""
        new = QProcess(parent)
        new.setWorkingDirectory(self._workdir)
        return new

    def _set_actions(self, start, pause, stop):
        """Enable or disable the start / pause / stop actions."""
        self.find(QAction, "action_start").setEnabled(start)
        self.find(QAction, "action_pause").setEnabled(pause)
        self.find(QAction, "action_stop").setEnabled(stop)

    def _set_running_actions(self):
        """Action state while the solver runs; the log file is not ready."""
        # Pausing is only offered when the stop/continue signals exist.
        self._set_actions(start=False, pause=_signal, stop=True)
        self.find(QAction, "action_log_file").setEnabled(False)

    def _set_stopped_actions(self):
        """Action state once the solver is stopped."""
        self._set_actions(start=True, pause=False, stop=False)
        if self._solver.log_file() != "":
            self.find(QAction, "action_log_file").setEnabled(True)

    def setup_action(self):
        """Put the actions in their initial (running) state."""
        self._set_running_actions()

    def setup_alarm(self):
        """Create the timer used to poll the solver output."""
        self._alarm = QTimer()

    def setup_connections(self):
        """Connect the actions and the polling timer to their handlers."""
        self.find(QAction, "action_start").triggered.connect(self.start)
        self.find(QAction, "action_pause").triggered.connect(self.pause)
        self.find(QAction, "action_stop").triggered.connect(self.stop)
        self.find(QAction, "action_log_file")\
            .triggered.connect(self.log_file)

        self._alarm.timeout.connect(self.update)

    @staticmethod
    def _colorize(text, color):
        """Wrap text in a rich-text font tag so it is shown in color."""
        return f"<font color=\"{color}\">{text}</font>"

    def _log(self, msg, color=None):
        """Append a message to the log widget.

        A string is shown as is (first line only), optionally colored.
        An integer is treated as the solver exit code: it is shown in blue
        when it is 0 and in red otherwise. Other types are ignored.
        """
        text_edit = self.find(QTextEdit, "textEdit")

        if isinstance(msg, str):
            msg = msg.rsplit('\n')[0]
            if color is not None:
                msg = self._colorize(msg, color)
            text_edit.append(msg)
        elif isinstance(msg, int) and not isinstance(msg, bool):
            # bool is excluded to keep the original exact-type behaviour.
            color = "blue" if msg == 0 else "red"
            text_edit.append(
                self._colorize(f" *** Finished with code {msg}", color)
            )

    def update(self):
        """Refresh the action state and flush queued solver output.

        NOTE(review): if the base window derives from QWidget this method
        shadows QWidget.update() -- confirm that this is intended.
        """
        if self._solver.is_stoped():
            self._set_stopped_actions()

        while self._output.qsize() != 0:
            s = self._output.get()
            self._log(s)

    def start(self):
        """Start the solver, re-exporting the study if it was stopped."""
        if self._solver.is_stoped():
            self._log(f" *** Export study {self._solver.name}", color="blue")
            self._solver.export(self._study, self._workdir, qlog=self._output)
            self.update()

            # A stopped solver needs a fresh process; otherwise the
            # existing one is reused.
            self._process = self.new_process(self._parent)

        self._log(" *** Start", color="blue")
        self._solver.start(self._process)

        self._set_running_actions()

    def pause(self):
        """Pause the solver and allow it to be started again or stopped."""
        self._log(" *** Pause", color="blue")
        self._solver.pause()

        self._set_actions(start=True, pause=False, stop=True)

    def stop(self):
        """Kill the solver and switch the actions to the stopped state."""
        self._log(" *** Stop", color="blue")
        self._solver.kill()

        self._set_stopped_actions()

    def log_file(self):
        """Open the solver log file in a dedicated window."""
        file_name = os.path.join(self._workdir, self._solver.log_file())
        log = SolverLogFileWindow(
            file_name=file_name,
            study=self._study,
            config=self._config,
            solver=self._solver,
            parent=self,
        )
        log.show()