Pamhyr2/src/Model/Network/Graph.py

246 lines
6.1 KiB
Python

# Graph.py -- Pamhyr
# Copyright (C) 2023-2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
from functools import reduce
from Model.Network.Node import Node
from Model.Network.Edge import Edge
class Graph(object):
def __init__(self, status=None):
super(Graph, self).__init__()
self._status = status
self._node_ctor = Node
self._edge_ctor = Edge
self._nodes = []
self._edges = []
def __repr__(self):
return f"Graph {{nodes: {self._nodes}, edges: {self._edges}}}"
def nodes(self):
return self._nodes
def nodes_names(self):
return list(map(lambda n: n.name, self._nodes))
def edges(self):
return self._edges
def enable_edges(self):
return list(
self._enable_edges()
)
def _enable_edges(self):
"""Return a generator"""
return filter(
lambda e: e.is_enable(),
self._edges
)
def edges_names(self):
return list(map(lambda e: e.name, self._edges))
def nodes_counts(self):
return len(self._nodes)
def edges_counts(self):
return len(self._edges)
def enable_nodes_counts(self):
return reduce(
lambda acc, n: acc + 1 if self.is_enable_node(n) else acc,
self._nodes,
0
)
def enable_edges_counts(self):
return reduce(
lambda acc, e: acc + 1 if e.is_enable() else acc,
self._edges,
0
)
def is_node_exists(self, node_name):
return reduce(
lambda acc, n: (acc or (n.name == node_name)),
self._nodes,
False
)
def is_edge_exists(self, edge_name):
return reduce(
lambda acc, e: (acc or (e.name == edge_name)),
self._edges,
False
)
def node(self, node_name: str):
node = list(
filter(
lambda n: n.name == node_name,
self._nodes
)
)
if len(node) == 0:
return None
return node[0]
def edge(self, edge_name: str):
edge = list(
filter(
lambda e: e.name == edge_name,
self._edges
)
)
if len(edge) == 0:
return None
return edge[0]
def _create_node(self, x: float, y: float):
node = self._node_ctor(
-1,
"",
x=x, y=y,
status=self._status
)
return node
def _add_node(self, node):
self._nodes.append(node)
self._status.modified()
return node
def add_node(self, x: float = 0.0, y: float = 0.0):
node = self._create_node(x, y)
return self._add_node(node)
def insert_node(self, node):
return self._add_node(node)
def remove_node(self, node):
self._nodes.remove(node)
self._remove_associated_edge(node)
self._status.modified()
def _remove_associated_edge(self, node: str):
edges = list(
filter(
lambda e: (e.node1 == node or
e.node2 == node),
self._edges,
)
)
for edge in edges:
self.remove_edge(edge)
def create_node(self, x: float = 0.0, y: float = 0.0):
node = self._create_node(x, y)
return node
def _create_edge(self, n1: Node, n2: Node):
edge = self._edge_ctor(
-1,
"", n1, n2,
status=self._status
)
return edge
def _add_edge(self, edge):
# # This edge already exists ?
# if any(filter(lambda e: (e.node1 == edge.node1 and
# e.node2 == edge.node2),
# self._edges)):
# return None
self._edges.append(edge)
self._status.modified()
return edge
def add_edge(self, n1: Node, n2: Node):
edge = self._create_edge(n1, n2)
return self._add_edge(edge)
def insert_edge(self, edge):
return self._add_edge(edge)
def create_edge(self, n1: Node, n2: Node):
return self._create_edge(n1, n2)
def remove_edge(self, edge):
self._edges.remove(edge)
self._status.modified()
def is_upstream_node(self, node):
return reduce(
lambda acc, e: (acc and (e.node2 != node or not e.enable)),
self._enable_edges(),
True
)
def is_downstream_node(self, node):
return reduce(
lambda acc, e: (acc and (e.node1 != node or not e.enable)),
self._enable_edges(),
True
)
def is_internal_node(self, node):
return not (
self.is_upstream_node(node) or
self.is_downstream_node(node)
)
def is_enable_node(self, node):
return reduce(
lambda acc, e: (
acc or (
(e.node1 == node or
e.node2 == node)
)
),
self._enable_edges(),
False
)
def is_enable_edge(self, edge):
return edge._enable
# def get_edge_id(self, reach):
# for i, e in enumerate(self.enable_edges):
# if e.id == reach.id:
# return i
def get_edge_id(self, reach):
return next(
filter(
lambda e: e[1].id == reach.id,
enumerate(self.enable_edges())
)
)[0]