Pamhyr2/src/Checker/Mage.py

179 lines
4.7 KiB
Python

# -*- coding: utf-8 -*-
import time
from collections import deque
from queue import Queue

from PyQt5.QtCore import QCoreApplication

from Checker.Checker import AbstractModelChecker, STATUS
from tools import flatten, timer
# Module-level alias for Qt's translation lookup; it is called below as
# _translate(context, source_text).
_translate = QCoreApplication.translate
class MageNetworkGraphChecker(AbstractModelChecker):
    """Checker validating the network graph of a study's river.

    Depending on the ``connectivity`` flag given at construction, ``run``
    either verifies that every node used by an enabled edge is reachable
    from one upstream node (connectivity mode), or that following enabled
    edges from ``node1`` to ``node2`` never leads back to the starting
    node (cycle mode).
    """

    def __init__(self, connectivity=True):
        """Create the checker.

        Args:
            connectivity: When true, ``run`` performs the connectivity
                check; otherwise it performs the cycle check.
        """
        super(MageNetworkGraphChecker, self).__init__()

        self._mode_conn = connectivity

        # Plain literals (rather than an f-string) so that translation
        # extraction tools can see the complete source text. The text
        # itself is the same as before.
        if connectivity:
            self._name = _translate(
                "Checker", "Mage network graph connectivity checker"
            )
        else:
            self._name = _translate(
                "Checker", "Mage network graph cycle checker"
            )
        self._description = _translate(
            "Checker", "Check if the network graph is valid"
        )

    @timer
    def _connectivity(self, summary, status, graph):
        """Check that the enabled edges form a single connected network.

        Edge direction is ignored here: an edge links its two end nodes
        both ways. The traversal starts from the first node for which
        ``graph.is_upstream_node`` is true.

        Args:
            summary: Summary returned unchanged when the check passes.
            status: Status returned unchanged when the check passes.
            graph: Object exposing ``edges()`` and
                ``is_upstream_node(node)``.

        Returns:
            A ``(summary, status)`` tuple. On failure the summary is
            ``"no_upstream_node"`` or ``"network_connectivity"`` and the
            status is ``STATUS.ERROR``.
        """
        # Keep only enabled edges
        edges = [edge for edge in graph.edges() if edge.is_enable()]

        # Undirected adjacency map, built once so the traversal does not
        # rescan the whole edge list for every visited node. Its keys are
        # exactly the nodes referenced by the enabled edges.
        neighbours = {}
        for edge in edges:
            neighbours.setdefault(edge.node1, []).append(edge.node2)
            neighbours.setdefault(edge.node2, []).append(edge.node1)

        # Visit the graph from a single upstream node
        pending = deque()
        for node in neighbours:
            if graph.is_upstream_node(node):
                pending.append(node)
                break  # One starting node is enough

        if not pending:
            return "no_upstream_node", STATUS.ERROR

        visited = set()
        while pending:
            current = pending.popleft()

            # Skip missing nodes and cut infinite loops on graph cycles
            if current is None or current in visited:
                continue

            visited.add(current)
            pending.extend(
                node for node in neighbours[current]
                if node not in visited
            )

        # Every referenced node must have been reached
        if len(visited) != len(neighbours):
            return "network_connectivity", STATUS.ERROR

        return summary, status

    @timer
    def _cycle(self, summary, status, graph):
        """Check that the enabled edges contain no directed cycle.

        Edges are followed from ``node1`` to ``node2``. For each node that
        is the source of at least one enabled edge, the graph is traversed
        from that node; reaching the node again means a cycle exists.

        Args:
            summary: Summary returned unchanged when the check passes.
            status: Status returned unchanged when the check passes.
            graph: Object exposing ``edges()``.

        Returns:
            A ``(summary, status)`` tuple. When a cycle is found the
            summary is ``"cycle_detected"`` and the status is
            ``STATUS.ERROR``.
        """
        # Keep only enabled edges
        edges = [edge for edge in graph.edges() if edge.is_enable()]

        # Directed adjacency map (source node -> target nodes), built once.
        # Iterating its keys also avoids repeating the same traversal for
        # several edges that share a source node.
        successors = {}
        for edge in edges:
            successors.setdefault(edge.node1, []).append(edge.node2)

        for initial in successors:
            # Visit the graph starting from this source node (INITIAL)
            pending = deque([initial])
            visited = set()

            while pending:
                current = pending.popleft()

                # Skip missing nodes and cut infinite loops on sub-cycles
                # that do not go through INITIAL
                if current is None or current in visited:
                    continue

                visited.add(current)
                nexts = successors.get(current, ())

                # The initial node cannot be visited a second time,
                # otherwise there is a cycle in the graph
                if initial in nexts:
                    return "cycle_detected", STATUS.ERROR

                pending.extend(nexts)

        return summary, status

    def run(self, study):
        """Run the configured check on ``study`` and store the result.

        The outcome is written to ``self._summary`` and ``self._status``.

        Args:
            study: Object exposing a ``river`` attribute, or ``None``.

        Returns:
            ``False`` when the check could not be performed (no study, no
            river, or no enabled edge); ``True`` once the check has run,
            whether or not it reported an error.
        """
        if study is None:
            self._status = STATUS.ERROR
            self._summary = "invalid_study"
            return False

        river = study.river
        if river is None:
            self._status = STATUS.ERROR
            self._summary = "no_river_found"
            return False

        if not any(edge.is_enable() for edge in river.edges()):
            self._status = STATUS.ERROR
            self._summary = "no_reach_defined"
            return False

        check = self._connectivity if self._mode_conn else self._cycle
        summary, status = check("ok", STATUS.OK, river)

        self._summary = summary
        self._status = status
        return True