# InitialConditions.py -- Pamhyr # Copyright (C) 2023-2025 INRAE # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # -*- coding: utf-8 -*- import logging from copy import copy, deepcopy from tools import trace, timer from functools import reduce from numpy import interp from Model.Tools.PamhyrDB import SQLSubModel from Model.Scenario import Scenario logger = logging.getLogger() class Data(SQLSubModel): def __init__(self, id: int = -1, name: str = "", comment: str = "", reach=None, section=None, discharge: float = 0.0, height: float = 0.0, status=None, owner_scenario=-1): super(Data, self).__init__( id=id, status=status, owner_scenario=owner_scenario ) self._reach = reach self._name = name self._comment = comment self._section = section self._discharge = discharge self._speed = 0.0 self._elevation = 0.0 self._height = height if self._section is not None: self._update_from_rk() if self._height != 0.0: self._update_from_height() if self._discharge != 0.0: self._update_from_discharge() @classmethod def _db_create(cls, execute, ext=""): execute(f""" CREATE TABLE initial_conditions{ext} ( {cls.create_db_add_pamhyr_id()}, deleted BOOLEAN NOT NULL DEFAULT FALSE, ind INTEGER NOT NULL, name TEXT NOT NULL, comment TEXT NOT NULL, reach INTEGER, section INTEGER, discharge REAL NOT NULL, height REAL NOT NULL, {Scenario.create_db_add_scenario()}, {Scenario.create_db_add_scenario_fk()}, FOREIGN KEY(reach) REFERENCES river_reach(pamhyr_id), PRIMARY KEY(pamhyr_id, scenario) ) """) return cls._create_submodel(execute) @classmethod def _db_update(cls, execute, version, data=None): major, minor, release = version.strip().split(".") if major == minor == "0": if int(release) < 11: execute( "ALTER TABLE initial_conditions RENAME COLUMN kp TO rk" ) cls._db_update_to_0_2_0(execute, data) if major == "0" and minor == "1": if int(release) < 1: cls._db_update_to_0_1_1(execute, data) if int(release) < 2: execute( "ALTER TABLE initial_conditions " + "ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE" ) return cls._update_submodel(execute, version, data) @classmethod def _db_update_to_0_2_0(cls, execute, data): table = "initial_conditions" reachs = data['id2pid']['river_reach'] cls.update_db_add_pamhyr_id(execute, table, data) cls._db_update_to_0_2_0_set_reach_pid(execute, table, reachs) cls._db_update_to_0_1_1( execute, data, origin_version="0.0.*" ) Scenario.update_db_add_scenario(execute, table) cls._db_create(execute, ext="_tmp") execute( f"INSERT INTO {table}_tmp " + "(pamhyr_id, ind, name, comment, reach, section, " + "discharge, height, scenario) " + "SELECT pamhyr_id, ind, name, comment, reach, section, " + "discharge, height, scenario " + f"FROM {table}" ) execute(f"DROP TABLE {table}") execute(f"ALTER TABLE {table}_tmp RENAME TO {table}") @classmethod def _db_update_to_0_1_1(cls, execute, data, origin_version="0.1.0"): execute( "ALTER TABLE initial_conditions " + f"ADD COLUMN section INTEGER" ) cls._db_update_to_0_1_1_assoc_section_from_rk( execute, "initial_conditions", origin_version=origin_version ) @classmethod def _db_load(cls, execute, data=None): new = [] reach = data["reach"] scenario = data["scenario"] loaded = data['loaded_pid'] if scenario is None: return new id = reach.pamhyr_id table = execute( "SELECT pamhyr_id, deleted, ind, name, comment, " + "section, discharge, height, scenario " + "FROM initial_conditions " + f"WHERE reach = {id} " + f"AND scenario = {scenario.id} " + f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) " + "ORDER BY ind ASC" ) ok = True for row in table: it = iter(row) pid = next(it) deleted = (next(it) == 1) ind = next(it) name = next(it) comment = next(it) section_id = next(it) discharge = next(it) height = next(it) owner_scenario = next(it) section = reduce( lambda acc, s: ( s if s.pamhyr_id == section_id else acc ), reach.reach.profiles, None ) if section is None: logger.debug( f"Section ID {section_id} do not exist in reach " + f"{reach.name} ({reach.pamhyr_id})" ) ok = False continue d = cls( id=pid, reach=data["reach"], status=data["status"], name=name, comment=comment, section=section, discharge=discharge, height=height, owner_scenario=owner_scenario ) if deleted: d.set_as_deleted() loaded.add(pid) new.append(d) if not ok: logger.info( "Please regenerate/defined the inital conditions " + f"for reach {reach.name} ({reach.pamhyr_id})" ) # Recution only if no data found if len(new) == 0: data["scenario"] = scenario.parent new += cls._db_load(execute, data) data["scenario"] = scenario return new def _db_save(self, execute, data=None): ind = data["ind"] execute( "INSERT INTO " + "initial_conditions(pamhyr_id, deleted, ind, name, comment, " + "section, discharge, height, reach, scenario) " + "VALUES (" + f"{self.pamhyr_id}, {self._db_format(self.is_deleted())}, " + f"{ind}, '{self._db_format(self.name)}', " + f"'{self._db_format(self._comment)}', " + f"{self._section.id}, {self._discharge}, {self._height}, " + f"{self._reach.id}, {self._status.scenario_id}" + ")" ) return True def copy(self): new = Data( name=self.name, comment=self._comment, section=self._section, discharge=self._discharge, height=self._height, reach=self._reach, status=self._status, ) return new @property def name(self): return self._name def __getitem__(self, key): val = None if key == "name": val = self._name elif key == "comment": val = self._comment elif key == "rk" or key == "section": val = self._section elif key == "speed": val = self._speed elif key == "discharge": val = self._discharge elif key == "elevation": val = self._elevation elif key == "height": val = self._height return val def _update_get_min(self): profile = self._section if profile is not None: min = profile.z_min() else: min = 0.0 return min def _update_from_rk(self): min = self._update_get_min() self._elevation = min + self._height def _update_from_elevation(self): min = self._update_get_min() self._height = self._elevation - min def _update_from_height(self): min = self._update_get_min() self._elevation = self._height + min def _update_from_discharge(self): min = self._update_get_min() # print("TODO") def __setitem__(self, key, value): if key == "name": self._name = str(value) elif key == "comment": self._comment = str(value) elif key == "rk" or key == "section": self._section = value self._update_from_rk() elif key == "speed": # Not supposed to be modified self._speed = float(value) elif key == "discharge": self._discharge = float(value) self._update_from_discharge() elif key == "elevation": self._elevation = float(value) self._update_from_elevation() elif key == "height": self._height = float(value) self._update_from_height() self.modified() class InitialConditions(SQLSubModel): _sub_classes = [ Data ] def __init__(self, reach=None, status=None): super(InitialConditions, self).__init__() self._status = status self._reach = reach self._data = [] @classmethod def _db_create(cls, execute): return cls._create_submodel(execute) @classmethod def _db_update(cls, execute, version, data=None): return cls._update_submodel(execute, version, data) @classmethod def _db_load(cls, execute, data=None): new = cls( reach=data["reach"], status=data["status"] ) new._data = Data._db_load( execute, data=data ) if new._data is not None: return new def _db_save(self, execute, data=None): ok = True ind = 0 for d in self._data: data["ind"] = ind ok &= d._db_save(execute, data) ind += 1 return ok def __len__(self): return len(self.data) def lst(self): return self._data @property def reach(self): return self._reach @reach.setter def reach(self, new): self._reach = reach self.modified() @property def data(self): return list( filter( lambda el: not el.is_deleted(), self._data ) ) @data.setter def data(self, data): self._data = data def get(self, index): return self.data[index] def set(self, index, data): self._data.insert(index, data) self.modified() def new(self, index): n = Data(reach=self._reach, status=self._status) self._data.insert(index, n) self.modified() def new_from_data(self, rk, discharge, elevation): n = Data(reach=self._reach, status=self._status) section = reduce( lambda acc, s: ( s if s.rk == rk else acc ), self._reach.reach.profiles, None ) n['section'] = section n['discharge'] = discharge n['elevation'] = elevation return n def insert(self, index, data): if data in self._data: data.set_as_not_deleted() else: self._data.insert(index, data) self.modified() def delete(self, data): list( map( lambda x: x.set_as_deleted(), filter( lambda x: x in data, self._data ) ) ) self.modified() def delete_i(self, indexes): data = list( map( lambda x: x[1], filter( lambda x: x[0] in indexes, enumerate(self._data) ) ) ) self.delete(data) def sort(self, reverse=False, key=None): self._data.sort(reverse=reverse, key=key) self.modified() def _data_get(self, key): return list( map( lambda d: d[key], self.data ) ) def get_rk(self): return list( map( lambda d: d["rk"].rk, self.data ) ) def get_elevation(self): return self._data_get("elevation") def get_discharge(self): return self._data_get("discharge") def clear_data(self): for data in self._data: data.set_as_deleted() def generate_growing_constant_depth(self, height: float, compute_discharge: bool): profiles = self._reach.reach.profiles.copy() previous_elevation = -99999.99 data_discharge = {} if not compute_discharge: if len(self._data) == 0: for profile in profiles: data_discharge[profile.rk] = 0.0 else: for data in self._data: data_discharge[data["rk"].rk] = data["discharge"] incline = self._reach.reach.get_incline_median_mean() logger.debug(f"incline = {incline}") self.clear_data() for profile in reversed(profiles): width = profile.wet_width(profile.z_min() + height) area = profile.wet_area(profile.z_min() + height) radius = profile.wet_radius(profile.z_min() + height) frictions = self._reach.frictions.frictions strickler = None if frictions is not None: if len(frictions) >= 1: for f in frictions: if f.contains_rk(profile.rk): strickler = f.get_friction(profile.rk)[0] if strickler is None: strickler = 25.0 if not compute_discharge: discharge = data_discharge[profile.rk] else: discharge = ( # ((width * 0.8) # * strickler # * (height ** (5/3)) # * (abs(incline) ** (0.5))) (area * strickler * (radius ** (2/3)) * (abs(incline) ** (0.5))) ) elevation = max( profile.z_min() + height, previous_elevation ) logger.debug(f"({profile.rk}):") logger.debug(f" width = {width}") logger.debug(f" strickler = {strickler}") logger.debug(f" discharge = {discharge}") new = Data(reach=self._reach, status=self._status) new["rk"] = profile new["discharge"] = discharge new["elevation"] = elevation previous_elevation = elevation self._data.append(new) def generate_discharge(self, discharge: float, compute_height: bool): profiles = self._reach.reach.profiles.copy() if profiles is None: return None previous_elevation = -99999.99 data_height = {} if not compute_height: if len(self._data) == 0: for profile in profiles: data_height[profile.rk] = 0.0 else: for data in self._data: data_height[data["rk"].rk] = data["height"] print(data_height) incline = self._reach.reach.get_incline_median_mean() logger.debug(f"incline = {incline}") self.clear_data() for profile in reversed(profiles): width = profile.width_approximation() frictions = self._reach.frictions.frictions strickler = None if frictions is not None: if len(frictions) >= 1: for f in frictions: if f.contains_rk(profile.rk): strickler = f.get_friction(profile.rk)[0] if strickler is None: strickler = 25.0 if not compute_height: height = data_height[profile.rk] else: if abs(incline) <= 0: height = 0.0 else: height = ( discharge / ((width * 0.8) * strickler * (abs(incline) ** (0.5))) ) ** (0.6) elevation = max( profile.z_min() + height, previous_elevation ) logger.debug(f"({profile.rk}):") logger.debug(f" width = {width}") logger.debug(f" strickler = {strickler}") logger.debug(f" height = {height}") new = Data(reach=self._reach, status=self._status) new["section"] = profile new["discharge"] = discharge new["elevation"] = elevation previous_elevation = elevation self._data.append(new) def generate_height(self, elevation1: float, elevation2: float, compute_discharge: bool, discharge: float): profiles = self._reach.reach.profiles.copy() upstream_rk = profiles[0].rk downstream_rk = profiles[-1].rk data_discharge = {} if not compute_discharge: if len(self._data) == 0: for profile in profiles: data_discharge[profile.rk] = 0.0 else: for data in self._data: data_discharge[data["rk"].rk] = data["discharge"] self.clear_data() for profile in profiles: if not compute_discharge: d = data_discharge[profile.rk] else: d = discharge elevation = interp(profile.rk, [upstream_rk, downstream_rk], [elevation1, elevation2]) new = Data(reach=self._reach, status=self._status) new["rk"] = profile new["discharge"] = d new["elevation"] = elevation self._data.append(new)