# LateralContribution.py -- Pamhyr # Copyright (C) 2023-2025 INRAE # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # -*- coding: utf-8 -*- import logging from tools import ( trace, timer, old_pamhyr_date_to_timestamp, date_iso_to_timestamp, date_dmy_to_timestamp, ) from Model.Tools.PamhyrDB import SQLSubModel from Model.Except import NotImplementedMethodeError from Model.Scenario import Scenario logger = logging.getLogger() class Data(SQLSubModel): _sub_classes = [] def __init__(self, data0, data1, id: int = -1, types=[float, float], status=None, owner_scenario=-1): super(Data, self).__init__( id=id, status=status, owner_scenario=owner_scenario ) self._types = types self._data = [data0, data1] @classmethod def _db_create(cls, execute, ext=""): execute(f""" CREATE TABLE lateral_contribution_data{ext} ( {cls.create_db_add_pamhyr_id()}, deleted BOOLEAN NOT NULL DEFAULT FALSE, ind INTEGER NOT NULL, data0 TEXT NOT NULL, data1 TEXT NOT NULL, lc INTEGER, {Scenario.create_db_add_scenario()}, {Scenario.create_db_add_scenario_fk()}, FOREIGN KEY(lc) REFERENCES lateral_contribution(pamhyr_id), PRIMARY KEY(pamhyr_id, scenario) ) """) return cls._create_submodel(execute) @classmethod def _db_update(cls, execute, version, data=None): major, minor, release = version.strip().split(".") if major == minor == "0": cls._db_update_to_0_2_0(execute, data) if major == "0" and minor == "1": if int(release) < 2: execute( "ALTER TABLE boundary_condition_data " + "ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE" ) return cls._update_submodel(execute, version, data) @classmethod def _db_update_to_0_2_0(cls, execute, data=None): table = "lateral_contribution_data" lcs = data['id2pid']['lateral_contribution'] cls.update_db_add_pamhyr_id(execute, table, data) Scenario.update_db_add_scenario(execute, table) cls._db_create(execute, ext="_tmp") execute( f"INSERT INTO {table} " + "(pamhyr_id, ind, data0, data1, lc, scenario) " + "SELECT pamhyr_id, ind, data0, data1, lc, scenario " + f"FROM {table}" ) execute(f"DROP TABLE {table}") execute(f"ALTER TABLE {table}_tmp RENAME TO {table}") cls._db_update_to_0_2_0_set_lc_pid(execute, table, lcs) @classmethod def _db_update_to_0_2_0_set_lc_pid(cls, execute, table, lcs): els = execute( f"SELECT pamhyr_id, lc FROM {table}" ) for row in els: it = iter(row) pid = next(it) lc_id = next(it) execute( f"UPDATE {table} " + f"SET lc = {lcs[lc_id]} " + f"WHERE pamhyr_id = {pid}" ) @classmethod def _db_load(cls, execute, data=None): new = [] lc = data["lc"] status = data['status'] scenario = data["scenario"] loaded = data['loaded_pid'] if scenario is None: return new values = execute( "SELECT pamhyr_id, deleted, " + "data0, data1, scenario " + "FROM lateral_contribution_data " + f"WHERE lc = {lc._pamhyr_id} " + f"AND scenario = {scenario.id} " + f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) " + "ORDER BY ind ASC" ) for v in values: it = iter(v) pid = next(it) delete = next(it) data0 = bc._types[0](next(it)) data1 = bc._types[1](next(it)) owner_scenario = next(it) nd = cls( data0, data1, id=pid, types=lc._types, status=status, owner_scenario=owner_scenario ) if deleted: nd.set_as_deleted() loaded.add(pid) new.append(nd) data["scenario"] = scenario.parent new += cls._db_load(execute, data) data["scenario"] = scenario return new def _db_save(self, execute, data=None): pid = self._pamhyr_id ind = data["ind"] data0 = self._db_format(str(self[0])) data1 = self._db_format(str(self[1])) lc = data["lc"] execute( "INSERT INTO " + "lateral_contribution_data (pamhyr_id, deleted, ind, " + "data0, data1, lc, scenario) " + f"VALUES ({pid}, {self._db_format(self.is_deleted())}, " + f"{ind}, '{self._db_format(data0)}', {self._db_format(data1)}, " + f"{lc._pamhyr_id}, {self._status.scenario_id}" + ")" ) return True def __getitem__(self, key): return self._types[key](self._data[key]) def __setitem__(self, key, value): self._data[key] = self._types[key](value) class LateralContribution(SQLSubModel): _sub_classes = [Data] def __init__(self, id: int = -1, name: str = "", status=None, owner_scenario=-1): super(LateralContribution, self).__init__( id=id, status=status, owner_scenario=owner_scenario ) self._name = name self._type = "" self._reach = None self._begin_section = None self._end_section = None self._data = [] self._header = [] self._types = [float, float] @classmethod def _db_create(cls, execute, ext=""): execute(f""" CREATE TABLE lateral_contribution{ext}( {cls.create_db_add_pamhyr_id()}, deleted BOOLEAN NOT NULL DEFAULT FALSE, name TEXT NOT NULL, type TEXT NOT NULL, tab TEXT NOT NULL, reach INTEGER, begin_section INTEGER, end_section INTEGER, {Scenario.create_db_add_scenario()}, {Scenario.create_db_add_scenario_fk()}, FOREIGN KEY(reach) REFERENCES river_reach(pamhyr_id), FOREIGN KEY(begin_section) REFERENCES geometry_profileXYZ(pamhyr_id), FOREIGN KEY(end_section) REFERENCES geometry_profileXYZ(pamhyr_id), PRIMARY KEY(pamhyr_id, scenario) ) """) if ext == "_tmp": return True return cls._create_submodel(execute) @classmethod def _db_update(cls, execute, version, data=None): major, minor, release = version.strip().split(".") if major == minor == "0": if int(release) < 11: execute( """ ALTER TABLE lateral_contribution RENAME COLUMN begin_kp TO begin_rk """ ) execute( """ ALTER TABLE lateral_contribution RENAME COLUMN end_kp TO end_rk """ ) execute( """ ALTER TABLE lateral_contribution RENAME COLUMN edge TO reach """ ) cls._db_update_to_0_2_0(execute, data) if major == "0" and minor == "1": if int(release) < 2: execute( "ALTER TABLE lateral_contribution " + "ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE" ) return cls._update_submodel(execute, version, data) @classmethod def _db_update_to_0_2_0(cls, execute, data): table = "lateral_contribution" reachs = data['id2pid']['river_reach'] cls.update_db_add_pamhyr_id(execute, table, data) Scenario.update_db_add_scenario(execute, table) cls._db_create(execute, ext="_tmp") execute( f"INSERT INTO {table} " + "(pamhyr_id, name, type, tab, reach, scenario) " + "SELECT pamhyr_id, name, type, tab, reach, scenario " + f"FROM {table}" ) execute(f"DROP TABLE {table}") execute(f"ALTER TABLE {table}_tmp RENAME TO {table}") cls._db_update_to_0_2_0_set_reach_pid(execute, table, reachs) @classmethod def _get_ctor_from_type(cls, t): from Model.LateralContribution.LateralContributionTypes import ( NotDefined, LateralContrib, Rain, Evaporation ) res = NotDefined if t == "LC": res = LateralContrib elif t == "RA": res = Rain elif t == "EV": res = Evaporation return res @classmethod def _db_load(cls, execute, data=None): new = [] tab = data["tab"] status = data['status'] edges = data["edges"] scenario = data["scenario"] loaded = data['loaded_pid'] if scenario is None: return new table = execute( "SELECT pamhyr_id, deleted, name, type, " + "reach, begin_section, end_section, scenario " + "FROM lateral_contribution " + f"WHERE tab = '{tab}'" + f"AND scenario = {scenario.id} " + f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) " ) for row in table: it = iter(row) pid = next(it) deleted = next(it) name = next(it) t = next(it) reach = next(it) b_section = next(it) e_section = next(it) owner_scenario = next(it) ctor = cls._get_ctor_from_type(t) lc = ctor( id=pid, name=name, status=status, owner_scenario=owner_scenario ) lc.reach = None lc._begin_section = None lc._end_section = None if row[3] != -1: lc.reach = next( filter( lambda e: e.id == reach, edges ) ) lc._begin_section = next( filter( lambda p: p.id == b_section, lc.reach.reach.profiles ) ) lc._end_section = next( filter( lambda p: p.id == e_section, lc.reach.reach.profiles ) ) data["lc"] = lc lc._data = Data._db_load(execute, data=data) loaded.add(pid) new.append(lc) data["scenario"] = scenario.parent new += cls._db_load(execute, data) data["scenario"] = scenario return new def _db_save(self, execute, data=None): if not self.must_be_saved(): return True tab = data["tab"] data["lc"] = self execute( "DELETE FROM lateral_contribution_data " + f"WHERE lc = {self._pamhyr_id} " + f"AND scenario = {self._status.scenario_id}" ) reach = -1 begin_section = -1 end_section = -1 if self._reach is not None: reach = self._reach._pamhyr_id begin_section = self._begin_section._pamhyr_id end_section = self._end_section._pamhyr_id execute( "INSERT INTO " + "lateral_contribution(" + "pamhyr_id, deleted, name, type, tab, " + "reach, begin_section, end_section, scenario) " + "VALUES (" + f"{self.id}, {self._db_format(self.is_deleted())}, " + f"'{self._db_format(self._name)}', " + f"'{self._db_format(self._type)}', '{tab}', {reach}, " + f"{begin_section}, {end_section}, " + f"{self._status.scenario_id}" + ")" ) ind = 0 for d in self._data: data["ind"] = ind d._db_save(execute, data) ind += 1 return True def _data_traversal(self, predicate=lambda obj, data: True, modifier=lambda obj, data: None, data={}): if predicate(self, data): modifier(self, data) for d in self._data: d._data_traversal(predicate, modifier, data) def __len__(self): return len( list( filter( lambda el: not el.is_deleted(), self._data ) ) ) @classmethod def compatibility(cls): return ["liquid", "solid", "suspenssion"] @classmethod def time_convert(cls, data): if type(data) is str: if data.count("-") == 2: return date_iso_to_timestamp(data) if data.count("/") == 2: return date_dmy_to_timestamp(data) if data.count(":") == 3: return old_pamhyr_date_to_timestamp(data) if data.count(":") == 2: return old_pamhyr_date_to_timestamp("00:" + data) if data.count(".") == 1: return round(float(data)) return int(data) @property def name(self): if self._name == "": return f"LC #{self.id}" return self._name @name.setter def name(self, name): self._name = name self.modified() @property def lctype(self): return self._type @property def reach(self): return self._reach @reach.setter def reach(self, reach): self._reach = reach if reach is not None: self._begin_section = self._reach.reach.profiles[0] self._end_section = self._reach.reach.profiles[-1] self.modified() def has_reach(self): return self._reach is not None @property def begin_rk(self): if self._begin_section is None: return 0 return self._begin_section.rk @property def begin_section(self): return self._begin_section @begin_section.setter def begin_section(self, section): self._begin_section = section self.modified() @property def end_rk(self): if self._end_section is None: return 0 return self._end_section.rk @property def end_section(self): return self._end_section @end_section.setter def end_section(self, section): self._end_section = section self.modified() @property def header(self): return self._header.copy() @property def data(self): return list( filter( lambda el: not el.is_deleted(), self._data ) ) def get_type_column(self, column): if 0 <= column < 2: return self._types[column] return None @property def _default_0(self): return self._types[0](0) @property def _default_1(self): return self._types[1](0.0) def is_define(self): return self._data is not None def new_from_data(self, header, data): new_0 = self._default_0 new_1 = self._default_1 if len(header) != 0: for i in [0, 1]: for j in range(len(header)): if self._header[i] == header[j]: if i == 0: new_0 = self._types[i](data[j].replace(",", ".")) else: new_1 = self._types[i](data[j].replace(",", ".")) else: new_0 = self._types[0](data[0].replace(",", ".")) new_1 = self._types[1](data[1].replace(",", ".")) return Data(new_0, new_1, status=self._status) def add(self, index: int): value = Data(self._default_0, self._default_1, status=self._status) self._data.insert(index, value) self.modified() return value def insert(self, index: int, value): self._data.insert(index, value) self.modified() def delete_i(self, indexes): self._data = list( map( lambda e: e[1], filter( lambda e: e[0] not in indexes, enumerate(self.data) ) ) ) self.modified() def delete(self, els): self._data = list( filter( lambda e: e not in els, self.data ) ) self.modified() def sort(self, _reverse=False, key=None): if key is None: self._data.sort(reverse=_reverse) else: self._data.sort(reverse=_reverse, key=key) self.modified() def get_i(self, index): return self.data[index] def get_range(self, _range): lst = [] for r in _range: lst.append(r) return lst def _set_i_c_v(self, index, column, value): v = self._data[index] v[column] = self._types[column](value) self._data[index] = v self.modified() def set_i_0(self, index: int, value): self._set_i_c_v(index, 0, value) def set_i_1(self, index: int, value): self._set_i_c_v(index, 1, value) @timer def convert(self, cls): new = cls(name=self.name, status=self._status) new.reach = self.reach new.begin_section = self.begin_section new.end_section = self.end_section for i, _ in enumerate(self.data): new.add(i) for i in [0, 1]: for j in [0, 1]: if self._header[i] == new.header[j]: for ind, v in self.data: try: new._set_i_c_v(ind, j, v[i]) except Exception as e: logger.info(e) self.modified() return new def move_up(self, index): if index < len(self): next = index - 1 d = self._data d[index], d[next] = d[next], d[index] self.modified() def move_down(self, index): if index >= 0: prev = index + 1 d = self._data d[index], d[prev] = d[prev], d[index] self.modified()