# River.py -- Pamhyr river model
# Copyright (C) 2023  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

from tools import flatten

from Model.Tools.PamhyrDB import SQLSubModel

from Model.Network.Node import Node
from Model.Network.Edge import Edge
from Model.Network.Graph import Graph

from Model.Geometry.Profile import Profile
from Model.Geometry.Reach import Reach

from Model.BoundaryCondition.BoundaryConditionList import BoundaryConditionList
from Model.LateralContribution.LateralContributionList import (
    LateralContributionList
)
from Model.InitialConditions.InitialConditionsDict import InitialConditionsDict
from Model.Stricklers.StricklersList import StricklersList
from Model.Friction.FrictionList import FrictionList
from Model.SolverParameters.SolverParametersList import SolverParametersList
from Model.SedimentLayer.SedimentLayerList import SedimentLayerList
from Model.Reservoir.ReservoirList import ReservoirList
from Model.HydraulicStructures.HydraulicStructuresList import (
    HydraulicStructureList,
)

from Solver.Solvers import solver_type_list


class RiverNode(Node, SQLSubModel):
    """Network node of the river, persisted in the `river_node` table."""

    _sub_classes = []

    def __init__(self, id: str, name: str,
                 x: float, y: float,
                 status=None):
        """Build a node and start with no locker attached."""
        super().__init__(
            id, name,
            x, y,
            status=status
        )

        self._locker = None

    @classmethod
    def _db_create(cls, execute):
        """Create the `river_node` table, then the sub-model tables."""
        execute("""
          CREATE TABLE river_node(
            id INTEGER NOT NULL PRIMARY KEY,
            name TEXT NOT NULL,
            x REAL NOT NULL,
            y REAL NOT NULL
          )
        """)

        cls._create_submodel(execute)
        return True

    @classmethod
    def _db_update(cls, execute, version):
        """Nothing to migrate for this table; always report success."""
        return True

    @classmethod
    def _db_load(cls, execute, data=None):
        """Return the list of nodes stored in `river_node`.

        `data` must provide a "status" entry; it is forwarded to every
        node that is created.
        """
        nodes = []

        table = execute("SELECT id, name, x, y FROM river_node")
        for row in table:
            # Keep the class-wide id counter ahead of every stored id
            cls._id_cnt = max(cls._id_cnt, row[0])
            # Row columns map positionally onto (id, name, x, y)
            nodes.append(cls(*row, status=data["status"]))

        return nodes

    def _db_save(self, execute, data=None):
        """Insert or replace this node's row in `river_node`."""
        sql = (
            "INSERT OR REPLACE INTO river_node(id, name, x, y) VALUES (" +
            f"{self.id}, '{self._db_format(self.name)}', " +
            f"{self.x}, {self.y}" +
            ")"
        )
        execute(sql)

        return True

    @property
    def locker(self):
        """Object currently stored as this node's locker (None at start)."""
        return self._locker

    @locker.setter
    def locker(self, locker):
        self._locker = locker


class RiverReach(Edge, SQLSubModel):
    """Network edge carrying a geometry `Reach` and a `FrictionList`.

    Persisted in the `river_reach` table.
    """

    _sub_classes = [
        Reach,
        FrictionList,
    ]

    def __init__(self, id: str, name: str,
                 node1: RiverNode = None,
                 node2: RiverNode = None,
                 status=None):
        """Build the edge with an empty reach and an empty friction list."""
        super().__init__(
            id, name,
            node1, node2,
            status=status
        )

        self._reach = Reach(status=self._status, parent=self)
        self._frictions = FrictionList(status=self._status)

    @classmethod
    def _db_create(cls, execute):
        """Create the `river_reach` table, then the sub-model tables."""
        execute("""
          CREATE TABLE river_reach(
            id INTEGER NOT NULL PRIMARY KEY,
            name TEXT NOT NULL,
            enable BOOLEAN NOT NULL,
            node1 INTEGER,
            node2 INTEGER,
            FOREIGN KEY(node1) REFERENCES river_node(id),
            FOREIGN KEY(node2) REFERENCES river_node(id)
          )
        """)

        cls._create_submodel(execute)
        return True

    @classmethod
    def _db_update(cls, execute, version):
        """Delegate the migration to the sub-models."""
        return cls._update_submodel(execute, version)

    @classmethod
    def _db_load(cls, execute, data=None):
        """Return the list of reaches stored in `river_reach`.

        `data` must provide "nodes" (the already loaded nodes) and
        "status".  For each row, `data["reach"]` and `data["parent"]`
        are overwritten before the reach geometry and the frictions
        are loaded.

        Raises StopIteration if a row references a node id that is not
        in `data["nodes"]`.
        """
        reachs = []

        if data is None:
            data = {}

        table = execute(
            "SELECT id, name, enable, node1, node2 FROM river_reach"
        )

        for row in table:
            # Keep the class-wide id counter ahead of every stored id
            cls._id_cnt = max(cls._id_cnt, row[0])

            reach_id = row[0]
            name = row[1]
            enable = (row[2] == 1)

            # Resolve the foreign keys to the node objects loaded earlier
            node1 = next(filter(lambda n: n.id == row[3], data["nodes"]))
            node2 = next(filter(lambda n: n.id == row[4], data["nodes"]))

            new = cls(reach_id, name, node1, node2, status=data["status"])
            new.enable(enable=enable)

            # Context read by the sub-model loaders below
            data["reach"] = reach_id
            data["parent"] = new

            new._reach = Reach._db_load(execute, data)
            new._frictions = FrictionList._db_load(execute, data)

            reachs.append(new)

        return reachs

    def _db_save(self, execute, data=None):
        """Insert or replace this reach's row, then save its sub-models.

        `data["reach"]` is set to this object before the reach geometry
        and the frictions are saved.
        """
        sql = (
            "INSERT OR REPLACE INTO " +
            "river_reach(id, name, enable, node1, node2) " +
            "VALUES (" +
            f"{self.id}, '{self._db_format(self._name)}', " +
            f"{self._db_format(self.is_enable())},"
            f"{self.node1.id}, {self.node2.id}" +
            ")"
        )
        execute(sql)

        if data is None:
            data = {}

        data["reach"] = self

        objs = [self._reach, self._frictions]
        return self._save_submodel(execute, objs, data)

    @property
    def reach(self):
        """Geometry `Reach` attached to this edge."""
        return self._reach

    @property
    def frictions(self):
        """`FrictionList` attached to this edge."""
        return self._frictions


class River(Graph, SQLSubModel):
    """River network graph together with all the study data lists."""

    _sub_classes = [
        RiverNode,
        RiverReach,
        BoundaryConditionList,
        LateralContributionList,
        InitialConditionsDict,
        StricklersList,
        SolverParametersList,
        SedimentLayerList,
        ReservoirList,
        HydraulicStructureList,
    ]

    def __init__(self, status=None):
        """Build an empty river: no current reach, empty data lists."""
        super().__init__(status=status)

        # Replace Node and Edge ctor by custom ctor
        self._node_ctor = RiverNode
        self._edge_ctor = RiverReach

        self._current_reach = None
        self._boundary_condition = BoundaryConditionList(status=self._status)
        self._lateral_contribution = LateralContributionList(
            status=self._status
        )
        self._initial_conditions = InitialConditionsDict(status=self._status)
        self._stricklers = StricklersList(status=self._status)
        # Solver parameters, keyed by solver and filled on demand
        # by get_params()
        self._parameters = {}
        self._sediment_layers = SedimentLayerList(status=self._status)
        self._reservoir = ReservoirList(status=self._status)
        self._hydraulic_structures = HydraulicStructureList(
            status=self._status
        )

    @classmethod
    def _db_create(cls, execute):
        """Create the tables of every sub-model."""
        cls._create_submodel(execute)
        return True

    @classmethod
    def _db_update(cls, execute, version):
        """Run the sub-model migrations; always report success."""
        cls._update_submodel(execute, version)
        return True

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load a complete river from the database.

        `data` must provide a "status" entry.  It is enriched along the
        way with "stricklers", "sediment_layers_list", "nodes" and
        "edges", which the loaders called afterwards receive.
        """
        new = cls(data["status"])

        # Stricklers (loaded first because the reaches need them)
        new._stricklers = StricklersList._db_load(
            execute,
            data
        )
        data["stricklers"] = new._stricklers

        # Sediment layers
        new._sediment_layers = SedimentLayerList._db_load(
            execute,
            data
        )
        data["sediment_layers_list"] = new._sediment_layers

        # Network
        new._nodes = RiverNode._db_load(
            execute,
            data
        )
        data["nodes"] = new.nodes()

        new._edges = RiverReach._db_load(
            execute,
            data
        )
        data["edges"] = new.edges()

        # Boundary Condition
        new._boundary_condition = BoundaryConditionList._db_load(
            execute,
            data
        )

        # Lateral Contribution
        new._lateral_contribution = LateralContributionList._db_load(
            execute,
            data
        )

        # Initial conditions
        new._initial_conditions = InitialConditionsDict._db_load(
            execute,
            data
        )

        # Reservoir
        new._reservoir = ReservoirList._db_load(
            execute,
            data
        )

        # Hydraulic Structures
        new._hydraulic_structures = HydraulicStructureList._db_load(
            execute,
            data
        )

        # Parameters
        new._parameters = SolverParametersList._db_load(
            execute,
            data
        )

        return new

    def _db_save(self, execute, data=None):
        """Save the nodes, the edges and every data list of the river."""
        # Concatenation builds a new list: self._nodes is left untouched
        objs = (self._nodes + self._edges)
        objs.append(self._boundary_condition)
        objs.append(self._initial_conditions)
        objs.append(self._lateral_contribution)
        objs.append(self._sediment_layers)
        objs.append(self._stricklers)
        objs.append(self._reservoir)
        objs.append(self._hydraulic_structures)
        objs.extend(self._parameters.values())

        self._save_submodel(execute, objs, data)
        return True

    @property
    def boundary_condition(self):
        """Boundary condition list of the river."""
        return self._boundary_condition

    @property
    def lateral_contribution(self):
        """Lateral contribution list of the river."""
        return self._lateral_contribution

    @property
    def initial_conditions(self):
        """Initial conditions dictionary of the river."""
        return self._initial_conditions

    @property
    def sediment_layers(self):
        """Sediment layer list of the river."""
        return self._sediment_layers

    @property
    def stricklers(self):
        """Stricklers list of the river."""
        return self._stricklers

    def strickler(self, name):
        """Return the first strickler matching `name`, or None.

        A strickler matches when either its `name` attribute or its
        `str()` form equals `name`.
        """
        return next(
            (
                s for s in self._stricklers.stricklers
                if s.name == name or str(s) == name
            ),
            None
        )

    @property
    def reservoir(self):
        """Reservoir list of the river."""
        return self._reservoir

    @property
    def hydraulics_structures(self):
        """Hydraulic structure list of the river."""
        # The backing attribute is spelled `_hydraulic_structures`
        # (no "s" after "hydraulic"), unlike this property's name.
        return self._hydraulic_structures

    @property
    def parameters(self):
        """Solver parameters dictionary, keyed by solver."""
        return self._parameters

    def get_params(self, solver):
        """Return the parameters of `solver`, creating them if missing.

        When a new parameter list is created, the study status is
        flagged as modified.
        """
        if solver in self._parameters:
            return self._parameters[solver]

        new = SolverParametersList(
            solver_type=solver_type_list[solver],
            status=self._status
        )
        self._parameters[solver] = new
        self._status.modified()

        return self._parameters[solver]

    def has_current_reach(self):
        """Tell whether current_reach() has a reach to return.

        True when exactly one edge is enabled, or when a current reach
        has been set explicitly.
        """
        if self.enable_edges_counts() == 1:
            return True

        return self._current_reach is not None

    def current_reach(self):
        """Return the only enabled edge if there is exactly one,
        otherwise the reach set with set_current_reach() (maybe None).
        """
        ee = self.enable_edges()
        if len(ee) == 1:
            return ee[0]

        return self._current_reach

    def set_current_reach(self, reach):
        """Record `reach` as the current reach."""
        self._current_reach = reach

    def has_sediment(self):
        """Tell whether the river holds sediment data.

        True when the sediment layer list is not empty and at least one
        profile of one edge has a sediment layer (`sl` is not None).
        """
        if len(self._sediment_layers) == 0:
            return False

        profiles = flatten(
            map(lambda e: e.reach.profiles, self.edges())
        )

        # Test the predicate itself, not the truthiness of the profile
        return any(p.sl is not None for p in profiles)