Pamhyr2/src/Checker/Mage.py

254 lines
7.1 KiB
Python

# Mage.py -- Pamhyr MAGE checkers
# Copyright (C) 2023-2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import time
from queue import Queue
from tools import flatten, timer
from PyQt5.QtCore import QCoreApplication
from Modules import Modules
from Checker.Checker import AbstractModelChecker, STATUS
from Checker.Study import StudyGeometryChecker
_translate = QCoreApplication.translate
class MageNetworkGraphChecker(AbstractModelChecker):
def __init__(self, connectivity=True, version="mage8"):
super(MageNetworkGraphChecker, self).__init__()
self._mode_conn = connectivity
if connectivity:
mode = "connectivity"
else:
mode = "cycle"
self._name = _translate(
"Checker", f"Mage network graph {mode} checker")
self._description = _translate(
"Checker", "Check if the network graph is valid")
self._solver = version
self._modules = Modules.NETWORK
@timer
def _connectivity(self, summary, status, graph):
# Keep only enabled edges
edges = list(
filter(
lambda e: e.is_enable(),
graph.edges()
)
)
# Get all related nodes
nodes = list(
set(
flatten(
map(
lambda e: [e.node1, e.node2],
edges
)
)
)
)
# Visite graph
q = Queue()
for node in nodes:
if graph.is_upstream_node(node):
q.put(node)
break # We take only one node
if q.qsize() == 0:
summary = "no_upstream_node"
status = STATUS.ERROR
return summary, status
visited = set()
while q.qsize() != 0:
current = q.get()
if current is None:
continue
# Cut potential infinite loop on graph cycle
if current in visited:
continue
# Get next node(s) to visite
nexts = flatten(
map(
lambda e: [e.node1, e.node2],
filter(
lambda e: e.node1 == current or e.node2 == current,
edges
)
)
)
for n in nexts:
q.put(n)
# Visited node
visited.add(current)
if len(visited) != len(nodes):
summary = "network_connectivity"
status = STATUS.ERROR
return summary, status
return summary, status
@timer
def _cycle(self, summary, status, graph):
# Keep only enabled edges
edges = list(
filter(
lambda e: e.is_enable(),
graph.edges()
)
)
for edge in edges:
# Visite graph starting from EDGE source node (INITIAL)
q = Queue()
initial = edge.node1
q.put(initial)
visited = set()
while q.qsize() != 0:
current = q.get()
if current is None:
continue
# Cut potential infinite loop on subgraph cycle
if current in visited:
continue
related_edges = list(
filter(
lambda e: e.node1 == current,
edges
)
)
# Get next node(s) to visite
nexts = list(
map(
lambda e: e.node2,
related_edges
)
)
# The initial node cannot be visited a second time,
# otherelse there is a cycle in the graph
if initial in nexts:
summary = "cycle_detected"
status = STATUS.ERROR
return summary, status
for n in nexts:
q.put(n)
visited.add(current)
return summary, status
def run(self, study):
summary = "ok"
status = STATUS.OK
if study is None:
self._status = STATUS.ERROR
self._summary = "invalid_study"
return False
river = study.river
if river is None:
self._status = STATUS.ERROR
self._summary = "no_river_found"
return False
edges = list(filter(lambda e: e.is_enable(), river.edges()))
if len(edges) == 0:
self._status = STATUS.ERROR
self._summary = "no_reach_defined"
return False
if self._mode_conn:
summary, status = self._connectivity(summary, status, river)
else:
summary, status = self._cycle(summary, status, river)
self._summary = summary
self._status = status
return True
class MageGeometryGuideLineChecker(StudyGeometryChecker):
def __init__(self, version="mage8"):
super(MageGeometryGuideLineChecker, self).__init__()
self._name = _translate("Checker", "Mage geometry guideline checker")
self._description = _translate(
"Checker",
"Check if exists geometry guidelines are correctly defined \
for each reach"
)
self._solver = version
self._modules = Modules.GEOMETRY
self._reachs = []
def run(self, study):
ok = super(MageGeometryGuideLineChecker, self).run(study)
if not ok:
return ok
river = study.river
edges = river.enable_edges()
gls = []
for edge in edges:
comp, incomp = edge.reach.compute_guidelines()
if len(incomp) != 0:
self._status = STATUS.WARNING
self._summary = "incomplete_guideline"
return False
gls.append(comp)
profiles = edge.reach.profiles
for profile in profiles:
if not profile.has_standard_named_points():
self._status = STATUS.WARNING
self._summary = "no_standard_guideline_defined"
return False
for gl in gls[1:]:
if len(gls[0].symmetric_difference(gl)) != 0:
self._status = STATUS.WARNING
self._summary = "no_all_reaches_do_not_have_same_defined"
return False
return ok