Pamhyr2/src/View/RunSolver/Window.py

297 lines
9.3 KiB
Python

# Window.py -- Pamhyr
# Copyright (C) 2023 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import os
import logging
import tempfile
from queue import Queue
from tools import trace, timer
from View.Tools.PamhyrWindow import PamhyrDialog, PamhyrWindow
from PyQt5.QtGui import (
QKeySequence,
)
from PyQt5.QtCore import (
Qt, QVariant, QAbstractTableModel,
QCoreApplication, QModelIndex, pyqtSlot,
QRect, QTimer, QProcess,
)
from PyQt5.QtWidgets import (
QDialogButtonBox, QPushButton, QLineEdit,
QFileDialog, QTableView, QAbstractItemView,
QUndoStack, QShortcut, QAction, QItemDelegate,
QComboBox, QVBoxLayout, QHeaderView, QTabWidget,
QTextEdit,
)
from View.RunSolver.Log.Window import SolverLogFileWindow
try:
from signal import SIGTERM, SIGSTOP, SIGCONT
_signal = True
except Exception:
_signal = False
_translate = QCoreApplication.translate
logger = logging.getLogger()
class SelectSolverWindow(PamhyrDialog):
_pamhyr_ui = "SelectSolver"
_pamhyr_name = "Select solver"
def __init__(self, study=None, config=None,
parent=None):
self._solver = None
super(SelectSolverWindow, self).__init__(
title=self._pamhyr_name,
study=study,
config=config,
options=[],
parent=parent
)
self.setup_combobox()
self.setup_connections()
def setup_combobox(self):
solvers = self._config.solvers
solvers_name = list(map(lambda s: s.name + f" - ({s._type})", solvers))
self.combobox_add_items("comboBox", solvers_name)
def setup_connections(self):
self.find(QPushButton, "pushButton_run").clicked.connect(self.accept)
self.find(QPushButton, "pushButton_cancel")\
.clicked.connect(self.reject)
@property
def solver(self):
return self._solver
def accept(self):
solver_name = self.get_combobox_text("comboBox")
solver_name = solver_name.rsplit(" - ", 1)[0]
self._solver = next(
filter(
lambda s: s.name == solver_name,
self._config.solvers
)
)
super(SelectSolverWindow, self).accept()
class SolverLogWindow(PamhyrWindow):
_pamhyr_ui = "SolverLog"
_pamhyr_name = "Solver Log"
def __init__(self, study=None, config=None,
solver=None, parent=None):
self._solver = solver
self._results = None
super(SolverLogWindow, self).__init__(
title=self._pamhyr_name,
study=study,
config=config,
options=[],
parent=parent
)
self.setup_action()
self.setup_alarm()
self.setup_connections()
self._workdir = ""
if self._study.filename == "":
self._workdir = tempfile.TemporaryDirectory()
else:
self._workdir = os.path.join(
os.path.dirname(self._study.filename),
"_PAMHYR_",
self._study.name.replace(" ", "_"),
self._solver.name.replace(" ", "_"),
)
os.makedirs(self._workdir, exist_ok=True)
self._alarm.start(500)
self._output = Queue()
self._process = self.new_process(parent)
self._log(f" *** Export study {self._solver.name}", color="blue")
self._solver.export(self._study, self._workdir, qlog=self._output)
self.update()
self._log(f" *** Run solver {self._solver.name}", color="blue")
self._solver.run(
study,
process=self._process,
output_queue=self._output
)
def new_process(self, parent):
new = QProcess(parent)
new.setWorkingDirectory(self._workdir)
new.setProcessChannelMode(QProcess.MergedChannels)
return new
def setup_action(self):
self.find(QAction, "action_start").setEnabled(False)
if _signal:
self.find(QAction, "action_pause").setEnabled(True)
else:
self.find(QAction, "action_pause").setEnabled(False)
self.find(QAction, "action_stop").setEnabled(True)
self.find(QAction, "action_log_file").setEnabled(False)
self.find(QAction, "action_results").setEnabled(False)
def setup_alarm(self):
self._alarm = QTimer()
def setup_connections(self):
self.find(QAction, "action_start").triggered.connect(self.start)
self.find(QAction, "action_pause").triggered.connect(self.pause)
self.find(QAction, "action_stop").triggered.connect(self.stop)
self.find(QAction, "action_log_file").triggered.connect(self.log_file)
self.find(QAction, "action_results").triggered.connect(self.results)
self._alarm.timeout.connect(self.update)
def _log(self, msg, color=None):
if type(msg) is str:
logger.info(f"solver: {msg}")
msg = msg.rsplit('\n')[0]
if color is not None:
msg = f"<font color=\"{color}\">" + msg + "</font>"
self.find(QTextEdit, "textEdit").append(msg)
elif type(msg) is int:
logger.info(f"solver: Returns {msg}")
color = "blue" if msg == 0 else "red"
self.find(QTextEdit, "textEdit")\
.append(f"<font color=\"{color}\">" +
f" *** Finished with code {msg}" +
"</font>")
self.statusbar.showMessage(
"Done" if msg == 0 else "Failed",
3000
)
def update(self):
if self._solver.is_stoped():
self.find(QAction, "action_start").setEnabled(True)
self.find(QAction, "action_pause").setEnabled(False)
self.find(QAction, "action_stop").setEnabled(False)
self.find(QAction, "action_results").setEnabled(True)
if self._solver.log_file() != "":
self.find(QAction, "action_log_file").setEnabled(True)
self._update_get_results()
self._update_logs_all()
def _update_get_results(self):
if self._results is None:
self._results = self._solver.results(
self._study, self._workdir, qlog=self._output
)
self._parent.set_results(self._solver, self._results)
def _update_logs_all(self):
while self._output.qsize() != 0:
s = self._output.get()
if type(s) is str and "[ERROR]" in s:
self._log(s, color="red")
else:
self._log(s)
def start(self):
if self._solver.is_stoped():
self._log(f" *** Export study {self._solver.name}", color="blue")
self._solver.export(self._study, self._workdir, qlog=self._output)
self.update()
self._process = self.new_process(self._parent)
self._log(" *** Start", color="blue")
self._results = None
self._solver.start(self._study, process=self._process)
self.find(QAction, "action_start").setEnabled(False)
if _signal:
self.find(QAction, "action_pause").setEnabled(True)
else:
self.find(QAction, "action_pause").setEnabled(False)
self.find(QAction, "action_stop").setEnabled(True)
self.find(QAction, "action_log_file").setEnabled(False)
self.find(QAction, "action_results").setEnabled(False)
def pause(self):
self._log(" *** Pause", color="blue")
self._solver.pause()
self.find(QAction, "action_start").setEnabled(True)
self.find(QAction, "action_pause").setEnabled(False)
self.find(QAction, "action_stop").setEnabled(True)
self.find(QAction, "action_results").setEnabled(False)
def stop(self):
self._log(" *** Stop", color="blue")
self._solver.kill()
self.find(QAction, "action_start").setEnabled(True)
self.find(QAction, "action_pause").setEnabled(False)
self.find(QAction, "action_stop").setEnabled(False)
self.find(QAction, "action_results").setEnabled(True)
if self._solver.log_file() != "":
self.find(QAction, "action_log_file").setEnabled(True)
def results(self):
if self._results is None:
self._results = self._solver.results(
self._study, self._workdir, qlog=self._output)
self._parent.set_results(self._solver, self._results)
self._parent.open_solver_results(self._solver, self._results)
def log_file(self):
file_name = os.path.join(self._workdir, self._solver.log_file())
log = SolverLogFileWindow(
file_name=file_name,
study=self._study,
config=self._config,
solver=self._solver,
parent=self,
)
log.show()