Pamhyr2/src/Checker/Study.py

264 lines
7.1 KiB
Python

# Study.py -- Pamhyr study checkers
# Copyright (C) 2023-2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import time
import logging
from functools import reduce
from PyQt5.QtCore import QCoreApplication
from Modules import Modules
from Checker.Checker import AbstractModelChecker, STATUS
# Short alias for Qt's translation lookup; used below to build the
# checkers' names and descriptions.
_translate = QCoreApplication.translate
# getLogger() is called without a name, so this is the root logger.
logger = logging.getLogger()
class StudyNetworkReachChecker(AbstractModelChecker):
    """Checker verifying that the river network has an enabled reach."""

    def __init__(self):
        super().__init__()

        self._name = _translate("Checker", "Study reach network checker")
        self._description = _translate(
            "Checker", "Check if exists at least one reach exists"
        )
        self._modules = Modules.NETWORK

    def run(self, study):
        """Run the check on ``study``.

        Returns False when ``basic_check`` rejects the study or when
        none of the river edges is enabled (status ERROR, summary
        "no_reach_defined"); returns True otherwise (status OK).
        """
        if not self.basic_check(study):
            return False

        # Keep only the edges that report themselves as enabled.
        enabled = [edge for edge in study.river.edges() if edge.is_enable()]

        if enabled:
            self._summary = "ok"
            self._status = STATUS.OK
            return True

        self._status = STATUS.ERROR
        self._summary = "no_reach_defined"
        return False
class StudyGeometryChecker(AbstractModelChecker):
    """Checker verifying that each enabled reach has a geometry.

    A reach is considered to have no geometry when it holds fewer than
    two profiles. The edges failing the check during the last run are
    kept in ``self._reachs``.
    """

    def __init__(self):
        super(StudyGeometryChecker, self).__init__()

        self._name = _translate("Checker", "Study geometry checker")
        self._description = _translate(
            "Checker", "Check if the geometry of each reach exists"
        )
        self._modules = Modules.GEOMETRY
        # Edges whose reach failed the geometry check (rebuilt by run()).
        self._reachs = []

    def run(self, study):
        """Run the check on ``study``.

        Returns:
            False when ``basic_check`` rejects the study, when there is
            no enabled edge (status ERROR, "no_reach_defined") or when
            at least one enabled reach has fewer than two profiles
            (status ERROR, "no_geometry_defined"); True otherwise.
        """
        # Start from a fresh list: the checker instance may be run more
        # than once and must not accumulate edges from a previous run.
        self._reachs = []

        if not self.basic_check(study):
            return False

        edges = study.river.enable_edges()
        if len(edges) == 0:
            self._status = STATUS.ERROR
            self._summary = "no_reach_defined"
            return False

        # A geometry needs at least two profiles to be usable.
        self._reachs = [
            edge for edge in edges if len(edge.reach.profiles) < 2
        ]

        if self._reachs:
            self._summary = "no_geometry_defined"
            self._status = STATUS.ERROR
            return False

        self._summary = "ok"
        self._status = STATUS.OK
        return True
class StudyInitialConditionsChecker(AbstractModelChecker):
    """Checker comparing initial conditions with the reach profiles.

    For every enabled edge, the number of initial conditions attached
    to its reach is compared with the length of the reach.
    """

    def __init__(self):
        super(StudyInitialConditionsChecker, self).__init__()

        self._name = _translate("Checker", "Study initial conditions checker")
        self._description = _translate(
            "Checker", "Check initial conditions for each node of study"
        )
        self._modules = Modules.INITIAL_CONDITION

    def run(self, study):
        """Run the check on every enabled edge of ``study``.

        Returns:
            False when ``basic_check`` rejects the study or when at
            least one reach is in error; True otherwise (possibly with
            a WARNING status).
        """
        self._summary = "ok"
        self._status = STATUS.OK

        if not self.basic_check(study):
            return False

        ok = True
        for edge in study.river.enable_edges():
            # Every reach is visited, even after a first failure.
            if not self.check_reach(study, edge.reach):
                ok = False

        return ok

    def check_reach(self, study, reach):
        """Check the initial conditions of a single ``reach``.

        - no initial condition for the reach: WARNING, returns True;
        - fewer initial conditions than ``len(reach)``: WARNING,
          returns True;
        - more initial conditions than ``len(reach)``: ERROR,
          returns False.

        A warning never replaces an error already recorded for another
        reach, so the final status stays consistent with the value
        returned by ``run``.
        """
        river = study.river

        if reach not in river.initial_conditions:
            self._set_warning("missing_initial_condition_defined")
            return True

        len_ic = len(river.initial_conditions[reach])
        len_reach = len(reach)

        if len_ic > len_reach:
            self._summary = "more_initial_condition_than_profile"
            self._status = STATUS.ERROR
            return False

        if len_ic < len_reach:
            self._set_warning("initial_condition_missing_profile")

        return True

    def _set_warning(self, summary):
        """Record a warning unless an error has already been recorded."""
        # NOTE(review): assumes STATUS members support `==` -- confirm
        # against Checker.Checker.STATUS.
        if self._status == STATUS.ERROR:
            return

        self._summary = summary
        self._status = STATUS.WARNING
class StudyBoundaryConditionChecker(AbstractModelChecker):
    """Checker verifying the liquid boundary conditions of a study.

    Every enabled node that the river reports as upstream or downstream
    must be referenced by a boundary condition of the 'liquid' tab.
    """

    def __init__(self):
        super(StudyBoundaryConditionChecker, self).__init__()

        self._name = _translate("Checker", "Study boundary conditions checker")
        self._description = _translate(
            "Checker", "Check boundary conditions for each node of study"
        )
        self._modules = Modules.BOUNDARY_CONDITION

    def run(self, study):
        """Run the check on ``study``.

        Returns:
            False when ``basic_check`` rejects the study or when
            ``check_liquid`` fails; True otherwise.
        """
        self._summary = "ok"
        self._status = STATUS.OK

        if not self.basic_check(study):
            return False

        return self.check_liquid(study)

    def check_liquid(self, study):
        """Check the liquid boundary conditions of ``study``.

        Sets status ERROR and returns False when the 'liquid' tab is
        empty ("no_boundary_condition_defined") or when an enabled
        upstream/downstream node has no boundary condition
        ("no_boundary_condition_at_boundary_node").
        """
        river = study.river

        bcs = river.boundary_condition.get_tab('liquid')
        if len(bcs) == 0:
            self._status = STATUS.ERROR
            self._summary = "no_boundary_condition_defined"
            return False

        # Only enabled nodes are considered; a node may end up in both
        # lists if the river reports it as upstream and downstream.
        nodes = [n for n in river.nodes() if river.is_enable_node(n)]
        upstream = [n for n in nodes if river.is_upstream_node(n)]
        downstream = [n for n in nodes if river.is_downstream_node(n)]

        # Set of nodes that carry at least one liquid boundary condition.
        bcs_nodes = {bc.node for bc in bcs}

        upstream_ok = self.check_liquid_all_node_has_bc(
            bcs_nodes, upstream
        )
        downstream_ok = self.check_liquid_all_node_has_bc(
            bcs_nodes, downstream
        )

        ok = upstream_ok and downstream_ok
        if not ok:
            self._status = STATUS.ERROR
            self._summary = "no_boundary_condition_at_boundary_node"

        return ok

    def check_liquid_all_node_has_bc(self, bcs_nodes, nodes):
        """Return True when every node of ``nodes`` is in ``bcs_nodes``.

        An empty ``nodes`` yields True.
        """
        return all(node in bcs_nodes for node in nodes)
class DummyOK(AbstractModelChecker):
    """Dummy checker that waits one second and always reports OK."""

    def __init__(self):
        super().__init__()

        self._name = _translate("Checker", "Dummy ok")
        self._description = _translate("Checker", "Dummy ok")

    def run(self, study):
        """Sleep for one second, set an OK status and return True.

        ``study`` is not inspected.
        """
        time.sleep(1)

        self._summary, self._status = "ok", STATUS.OK
        return True
class DummyWARNING(AbstractModelChecker):
    """Dummy checker that waits one second and always reports WARNING."""

    def __init__(self):
        super().__init__()

        self._name = _translate("Checker", "Dummy warning")
        self._description = _translate("Checker", "Dummy warning")

    def run(self, study):
        """Sleep for one second, set a WARNING status and return True.

        ``study`` is not inspected.
        """
        time.sleep(1)

        self._summary, self._status = "Warning detected", STATUS.WARNING
        return True
class DummyERROR(AbstractModelChecker):
    """Dummy checker that waits one second and always reports ERROR."""

    def __init__(self):
        super().__init__()

        self._name = _translate("Checker", "Dummy error")
        self._description = _translate("Checker", "Dummy error")

    def run(self, study):
        """Sleep for one second, set an ERROR status and return True.

        ``study`` is not inspected. Note that the return value is True
        even though the recorded status is ERROR.
        """
        time.sleep(1)

        self._summary, self._status = "Error detected", STATUS.ERROR
        return True