# River.py -- Pamhyr river model # Copyright (C) 2023-2025 INRAE # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # -*- coding: utf-8 -*- import logging from tools import flatten, logger_exception from Model.Tools.PamhyrDB import SQLSubModel from Model.Scenario import Scenario from Model.Network.Node import Node from Model.Network.Edge import Edge from Model.Network.Graph import Graph from Model.Geometry.Profile import Profile from Model.Geometry.Reach import Reach from Model.BoundaryCondition.BoundaryConditionList import BoundaryConditionList from Model.LateralContribution.LateralContributionList import ( LateralContributionList ) from Model.InitialConditions.InitialConditionsDict import InitialConditionsDict from Model.Stricklers.StricklersList import StricklersList from Model.Friction.FrictionList import FrictionList from Model.SolverParameters.SolverParametersList import SolverParametersList from Model.SedimentLayer.SedimentLayerList import SedimentLayerList from Model.Reservoir.ReservoirList import ReservoirList from Model.HydraulicStructures.HydraulicStructuresList import ( HydraulicStructureList, ) from Model.AdditionalFile.AddFileList import AddFileList from Model.REPLine.REPLineList import REPLineList from Solver.Solvers import solver_type_list from Model.OutputRKAdists.OutputRKListAdists import OutputRKAdistsList from Model.Pollutants.PollutantsList import PollutantsList from Model.InitialConditionsAdisTS.InitialConditionsAdisTSList \ import InitialConditionsAdisTSList from Model.BoundaryConditionsAdisTS.BoundaryConditionsAdisTSList \ import BoundaryConditionsAdisTSList from Model.LateralContributionsAdisTS.LateralContributionsAdisTSList \ import LateralContributionsAdisTSList from Model.D90AdisTS.D90AdisTSList import D90AdisTSList from Model.DIFAdisTS.DIFAdisTSList import DIFAdisTSList from Model.Results.Results import Results logger = logging.getLogger() class RiverNode(Node): _sub_classes = [] def __init__(self, id: int = -1, name: str = "", x: float = 0.0, y: float = 0.0, status=None, owner_scenario=-1): super(RiverNode, self).__init__( id=id, name=name, x=x, y=y, status=status, owner_scenario=owner_scenario ) self._locker = None @classmethod def _db_create(cls, execute, ext=""): execute(f""" CREATE TABLE river_node{ext}( {cls.create_db_add_pamhyr_id()}, deleted BOOLEAN NOT NULL DEFAULT FALSE, name TEXT NOT NULL, x REAL NOT NULL, y REAL NOT NULL, {Scenario.create_db_add_scenario()}, {Scenario.create_db_add_scenario_fk()}, PRIMARY KEY(pamhyr_id, scenario) ) """) if ext == "_tmp": return True return cls._create_submodel(execute) @classmethod def _db_update(cls, execute, version, data=None): major, minor, release = version.strip().split(".") if major == "0" and int(minor) < 2: cls._db_update_to_0_2_0(execute, data=data) if major == "0" and minor == "1": if int(release) < 2: execute( "ALTER TABLE river_node " + "ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE" ) return cls._update_submodel(execute, version, data) @classmethod def _db_update_to_0_2_0(cls, execute, data=None): table = "river_node" cls.update_db_add_pamhyr_id(execute, table, data) Scenario.update_db_add_scenario(execute, table) cls._db_create(execute, ext="_tmp") # Copy table execute( f"INSERT INTO {table}_tmp " + f"(pamhyr_id, name, x, y, scenario) " + "SELECT pamhyr_id, name, x, y, scenario " + f"FROM {table}" ) execute(f"DROP TABLE {table}") execute(f"ALTER TABLE {table}_tmp RENAME TO {table}") @classmethod def _db_load(cls, execute, data=None): nodes = [] scenario = data["scenario"] loaded = data['loaded_pid'] if scenario is None: return nodes table = execute( "SELECT pamhyr_id, deleted, name, x, y, scenario " + "FROM river_node " + f"WHERE scenario = {scenario.id} " + f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) " ) for row in table: it = iter(row) pid = next(it) deleted = (next(it) == 1) name = next(it) x = next(it) y = next(it) owner_scenario = next(it) node = cls( id=pid, name=name, x=x, y=y, status=data["status"], owner_scenario=owner_scenario ) if deleted: node.set_as_deleted() loaded.add(pid) nodes.append(node) data["scenario"] = scenario.parent nodes += cls._db_load(execute, data) data["scenario"] = scenario return nodes def _db_save(self, execute, data=None): execute( "INSERT OR REPLACE INTO river_node(" + "pamhyr_id, deleted, name, x, y, scenario" + ") " + "VALUES (" + f"{self._pamhyr_id}, {self._db_format(self.is_deleted())}, " + f"'{self._db_format(self.name)}', " + f"{self.x}, {self.y}, {self._status.scenario_id}" + ")" ) return True def _data_traversal(self, predicate=lambda obj, data: True, modifier=lambda obj, data: None, data={}): if predicate(self, data): modifier(self, data) def is_deleted(self): return self._deleted @property def locker(self): return self._locker @locker.setter def locker(self, locker): self._locker = locker class RiverReach(Edge): _sub_classes = [ Reach, FrictionList, ] def __init__(self, id: str = -1, name: str = "", node1: RiverNode = None, node2: RiverNode = None, status=None, owner_scenario=-1): super(RiverReach, self).__init__( id=id, name=name, node1=node1, node2=node2, status=status, owner_scenario=owner_scenario ) self._reach = Reach(status=self._status, parent=self) self._frictions = FrictionList(status=self._status) @classmethod def _db_create(cls, execute, ext=""): execute(f""" CREATE TABLE river_reach{ext} ( {cls.create_db_add_pamhyr_id()}, deleted BOOLEAN NOT NULL DEFAULT FALSE, name TEXT NOT NULL, enabled BOOLEAN NOT NULL, node1 INTEGER, node2 INTEGER, {Scenario.create_db_add_scenario()}, {Scenario.create_db_add_scenario_fk()}, FOREIGN KEY(node1) REFERENCES river_node(pamhyr_id), FOREIGN KEY(node2) REFERENCES river_node(pamhyr_id), PRIMARY KEY(pamhyr_id, scenario) ) """) if ext == "_tmp": return True return cls._create_submodel(execute) @classmethod def _db_update(cls, execute, version, data=None): major, minor, release = version.strip().split(".") if major == minor == "0": cls._db_update_to_0_2_0(execute, data) if major == "0" and minor == "1": if int(release) < 2: execute( "ALTER TABLE river_reach " + "ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE" ) return cls._update_submodel(execute, version, data) @classmethod def _db_update_to_0_2_0(cls, execute, data=None): table = "river_reach" nodes = data['id2pid']['river_node'] cls.update_db_add_pamhyr_id(execute, table, data) Scenario.update_db_add_scenario(execute, table) cls._db_create(execute, ext="_tmp") # Copy table execute( f"INSERT INTO {table}_tmp " + f"(pamhyr_id, name, enabled, node1, node2, scenario) " + "SELECT pamhyr_id, name, enable, node1, node2, scenario " + f"FROM {table}" ) execute(f"DROP TABLE {table}") execute(f"ALTER TABLE {table}_tmp RENAME TO {table}") cls._db_update_to_0_2_0_set_node_pid(execute, table, nodes) @classmethod def _db_update_to_0_2_0_set_node_pid(cls, execute, table, nodes): bcs = execute( f"SELECT pamhyr_id, node1, node2 FROM {table}" ) for row in bcs: it = iter(row) pid = next(it) node1_id = next(it) node2_id = next(it) execute( f"UPDATE {table} " + f"SET node1 = {nodes[node1_id]}, " + f"node2 = {nodes[node2_id]} " + f"WHERE pamhyr_id = {pid}" ) @classmethod def _db_load(cls, execute, data=None): reachs = [] scenario = data["scenario"] loaded = data['loaded_pid'] if scenario is None: return reachs table = execute( "SELECT pamhyr_id, deleted, name, enabled, " + "node1, node2, scenario " + "FROM river_reach " + f"WHERE scenario = {scenario.id} " + f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))})" ) for row in table: it = iter(row) pid = next(it) deleted = (next(it) == 1) name = next(it) enabled = (next(it) == 1) node1_pid = next(it) node2_pid = next(it) owner_scenario = next(it) # Get nodes corresponding to db foreign key id node1 = next( filter( lambda n: n.pamhyr_id == node1_pid, data["nodes"] ) ) node2 = next( filter( lambda n: n.pamhyr_id == node2_pid, data["nodes"] ) ) new = cls( id=pid, name=name, node1=node1, node2=node2, status=data["status"], owner_scenario=owner_scenario ) new.enable(enable=enabled) if deleted: nd.set_as_deleted() data["reach"] = new new._reach = Reach._db_load(execute, data) new._frictions = FrictionList._db_load(execute, data) loaded.add(pid) reachs.append(new) data["scenario"] = scenario.parent reachs += cls._db_load(execute, data) data["scenario"] = scenario return reachs def _db_save(self, execute, data=None): execute( "INSERT OR REPLACE INTO " + "river_reach(" + "pamhyr_id, deleted, name, enabled, " + "node1, node2, scenario" + ") " + "VALUES (" + f"{self.pamhyr_id}, {self._db_format(self.is_deleted())}, " + f"'{self._db_format(self._name)}', " + f"{self._db_format(self.is_enable())}," f"{self.node1.pamhyr_id}, {self.node2.pamhyr_id}, " + f"{self._status.scenario_id}" + ")" ) data["reach"] = self objs = [self._reach, self._frictions] return self._save_submodel(execute, objs, data) def _data_traversal(self, predicate=lambda obj, data: True, modifier=lambda obj, data: None, data={}): if predicate(self, data): modifier(self, data) self._reach._data_traversal(predicate, modifier, data) self._frictions._data_traversal(predicate, modifier, data) def is_deleted(self): return self._deleted @property def reach(self): return self._reach @property def frictions(self): return self._frictions class River(Graph): _sub_classes = [ StricklersList, SedimentLayerList, RiverNode, RiverReach, BoundaryConditionList, LateralContributionList, InitialConditionsDict, SolverParametersList, ReservoirList, HydraulicStructureList, AddFileList, REPLineList, OutputRKAdistsList, PollutantsList, InitialConditionsAdisTSList, BoundaryConditionsAdisTSList, LateralContributionsAdisTSList, D90AdisTSList, DIFAdisTSList, Results ] def __init__(self, status=None): super(River, self).__init__( status=status ) # Replace Node and Edge ctor by custom ctor self._node_ctor = RiverNode self._edge_ctor = RiverReach self._current_reach = None self._boundary_condition = BoundaryConditionList(status=self._status) self._lateral_contribution = LateralContributionList( status=self._status) self._initial_conditions = InitialConditionsDict(status=self._status) self._stricklers = StricklersList(status=self._status) self._parameters = {} self._sediment_layers = SedimentLayerList(status=self._status) self._reservoir = ReservoirList(status=self._status) self._hydraulic_structures = HydraulicStructureList( status=self._status ) self._additional_files = AddFileList(status=self._status) self._rep_lines = REPLineList(status=self._status) self._Output_rk_adists = OutputRKAdistsList(status=self._status) self._Pollutants = PollutantsList(status=self._status) self._InitialConditionsAdisTS = InitialConditionsAdisTSList( status=self._status) self._BoundaryConditionsAdisTS = BoundaryConditionsAdisTSList( status=self._status) self._LateralContributionsAdisTS = LateralContributionsAdisTSList( status=self._status) self._D90AdisTS = D90AdisTSList(status=self._status) self._DIFAdisTS = DIFAdisTSList(status=self._status) self._results = None @classmethod def _db_create(cls, execute): cls._create_submodel(execute) return True @classmethod def _db_update(cls, execute, version, data=None): return cls._update_submodel(execute, version, data) @classmethod def _db_load(cls, execute, data=None): new = cls(status=data["status"]) # Stricklers (Stricklers is load in first because it's needed # for reachs) new._stricklers = StricklersList._db_load( execute, data ) data["stricklers"] = new._stricklers data['loaded_pid'] = set() # Initial conditions new._sediment_layers = SedimentLayerList._db_load( execute, data ) data["sediment_layers_list"] = new._sediment_layers data['loaded_pid'] = set() # Network new._nodes = RiverNode._db_load( execute, data ) data["nodes"] = new.nodes() data['loaded_pid'] = set() new._edges = RiverReach._db_load( execute, data ) data["edges"] = new.edges() data['loaded_pid'] = set() # Boundary Condition new._boundary_condition = BoundaryConditionList._db_load( execute, data ) data['loaded_pid'] = set() # Lateral Contribution new._lateral_contribution = LateralContributionList._db_load( execute, data ) data['loaded_pid'] = set() # Initial conditions new._initial_conditions = InitialConditionsDict._db_load( execute, data ) data['loaded_pid'] = set() # Reservoir new._reservoir = ReservoirList._db_load( execute, data ) data['loaded_pid'] = set() # Hydraulic Structures new._hydraulic_structures = HydraulicStructureList._db_load( execute, data ) data['loaded_pid'] = set() # Parameters new._parameters = SolverParametersList._db_load( execute, data ) data['loaded_pid'] = set() # Additional Files new._additional_files = AddFileList._db_load( execute, data ) new._rep_lines = REPLineList._db_load(execute, data) data['loaded_pid'] = set() new._Pollutants = PollutantsList._db_load(execute, data) new._Output_rk_adists = OutputRKAdistsList._db_load( execute, data ) new._InitialConditionsAdisTS = InitialConditionsAdisTSList._db_load( execute, data) new._BoundaryConditionsAdisTS = BoundaryConditionsAdisTSList._db_load( execute, data) new._LateralContributionsAdisTS = \ LateralContributionsAdisTSList._db_load(execute, data) new._D90AdisTS = D90AdisTSList._db_load(execute, data) new._DIFAdisTS = DIFAdisTSList._db_load(execute, data) return new def _db_load_results(self, execute, data=None): self._results = Results._db_load(execute, data) def _db_save(self, execute, data=None): self._db_save_delete_artefact(execute, data) objs = (self._nodes + self._edges) objs.append(self._boundary_condition) objs.append(self._initial_conditions) objs.append(self._lateral_contribution) objs.append(self._sediment_layers) objs.append(self._stricklers) objs.append(self._reservoir) objs.append(self._hydraulic_structures) objs.append(self._additional_files) objs.append(self._rep_lines) for solver in self._parameters: objs.append(self._parameters[solver]) objs.append(self._Output_rk_adists) objs.append(self._Pollutants) objs.append(self._InitialConditionsAdisTS) objs.append(self._BoundaryConditionsAdisTS) objs.append(self._LateralContributionsAdisTS) objs.append(self._D90AdisTS) objs.append(self._DIFAdisTS) if self._results is not None: objs.append(self._results) self._save_submodel(execute, objs, data) return True def _db_save_delete_artefact(self, execute, data=None): self._db_save_delete_artefact_where_not_id( execute, data, "river_node", self._nodes ) self._db_save_delete_artefact_where_not_id( execute, data, "river_reach", self._edges ) def _db_save_delete_artefact_where_not_id(self, execute, data, table: str, els: list): if len(els) == 0: return try: execute( f"DELETE FROM {table} " + "WHERE " + f"scenario = {self._status.scenario_id} AND (" + " OR ".join( map( lambda n: f"( pamhyr_id <> {n.pamhyr_id} )", els ) ) + ")" ) except Exception as e: logger_exception(e) def _data_traversal(self, predicate=lambda obj, data: True, modifier=lambda obj, data: None, data={}): """Traversal data and execute modifier fonction if predicate true Args: predicate: Function predicate, take current obj and data as input modifier: Function modifier, take current obj and data as input Returns: Nothing """ if predicate(self, data): modifier(self, data) for obj in self._get_objs_list(): obj._data_traversal(predicate, modifier, data) def _get_objs_list(self): objs = (self._nodes + self._edges) objs += [ self._boundary_condition, self._initial_conditions, self._lateral_contribution, self._sediment_layers, self._stricklers, self._reservoir, self._hydraulic_structures, self._additional_files, self._rep_lines, self._Output_rk_adists, self._Pollutants, self._InitialConditionsAdisTS, self._BoundaryConditionsAdisTS, self._LateralContributionsAdisTS, self._D90AdisTS, self._DIFAdisTS, ] for solver in self._parameters: objs.append(self._parameters[solver]) return objs def init_default(self): self.init_default_network() self.init_default_sediment() self.init_default_additional_files() def init_default_network(self): n1 = self.add_node(880.0, 950.0) n2 = self.add_node(1120.0, 1020.0) e = self.add_edge(n1, n2) def init_default_sediment(self): sediment = self._sediment_layers default = sediment.new(0) default_0 = default.new(0) default.name = "default" default.comment = "Default sediment layers" default_0.name = "L0" default_0.height = 1.0 default_0.d50 = 0.002 default_0.sigma = 1.0 default_0.critical_constraint = 0.047 def init_default_additional_files(self): add_file = self._additional_files.new(0) add_file.name = "Pamhyr2 stamp file" add_file.path = "Pamhyr2.txt" add_file.text = """This repository has been generated by Pamhyr2 \ version "@version" ! All hand made file modification could be erased by the next solver execution... Last export at: @date.""" def reachs(self): return self.edges() @property def boundary_condition(self): return self._boundary_condition @property def lateral_contribution(self): return self._lateral_contribution @property def initial_conditions(self): return self._initial_conditions @property def sediment_layers(self): return self._sediment_layers @property def stricklers(self): return self._stricklers def strickler(self, name): ret = list( filter( lambda s: s.name == name or str(s) == name, self._stricklers.stricklers ) ) if len(ret) == 0: return None return ret[0] @property def reservoir(self): return self._reservoir @property def hydraulic_structures(self): return self._hydraulic_structures @property def additional_files(self): return self._additional_files @property def rep_lines(self): return self._rep_lines @property def parameters(self): return self._parameters @property def Output_rk_adists(self): return self._Output_rk_adists @property def Pollutants(self): return self._Pollutants @property def ic_adists(self): return self._InitialConditionsAdisTS @property def boundary_conditions_adists(self): return self._BoundaryConditionsAdisTS @property def lateral_contributions_adists(self): return self._LateralContributionsAdisTS @property def d90_adists(self): return self._D90AdisTS @property def dif_adists(self): return self._DIFAdisTS def get_params(self, solver): if solver in self._parameters: return self._parameters[solver] new = SolverParametersList( solver_type=solver_type_list[solver], status=self._status ) self._parameters[solver] = new self._status.modified() return self._parameters[solver] def has_current_reach(self): if self.enable_edges_counts() == 1: return True return self._current_reach is not None def current_reach(self): ee = self.enable_edges() if len(ee) == 1: return ee[0] return self._current_reach def set_current_reach(self, reach): self._current_reach = reach def has_sediment(self): has = len(self._sediment_layers) != 0 has &= any( filter( lambda p: p.sl is not None, flatten( map(lambda e: e.reach.profiles, self.edges()) ) ) ) return has