# LateralContributionAdisTS.py -- Pamhyr
# Copyright (C) 2023-2025  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import logging

from tools import (
    trace, timer,
    old_pamhyr_date_to_timestamp,
    date_iso_to_timestamp,
    date_dmy_to_timestamp,
)

from Model.Tools.PamhyrDB import SQLSubModel
from Model.Except import NotImplementedMethodeError
from Model.Scenario import Scenario

logger = logging.getLogger()


class Data(SQLSubModel):
    """One two-column row attached to a LateralContributionAdisTS.

    The row is persisted in the ``lateral_contribution_data_adists``
    table; its ``lca`` column references the owning contribution.
    """

    _sub_classes = []

    def __init__(self, data0, data1,
                 id: int = -1,
                 types=None,
                 status=None,
                 owner_scenario=-1):
        """Create a data row.

        Args:
            data0: Raw value of the first column.
            data1: Raw value of the second column.
            id: Identifier forwarded to the parent constructor.
            types: Two callables used to convert each column on access;
                defaults to ``[float, float]``.
            status: Status object forwarded to the parent constructor.
            owner_scenario: Owner scenario forwarded to the parent
                constructor.
        """
        super(Data, self).__init__(
            id=id, status=status,
            owner_scenario=owner_scenario
        )

        # A fresh list per instance: a list literal used as default
        # argument would be shared by every instance.
        self._types = [float, float] if types is None else types
        self._data = [data0, data1]

    @classmethod
    def _db_create(cls, execute, ext=""):
        """Create the data table, with `ext` appended to its name."""
        execute(f"""
          CREATE TABLE lateral_contribution_data_adists{ext}(
            {cls.create_db_add_pamhyr_id()},
            deleted BOOLEAN NOT NULL DEFAULT FALSE,
            data0 TEXT NOT NULL,
            data1 TEXT NOT NULL,
            lca INTEGER,
            {Scenario.create_db_add_scenario()},
            {Scenario.create_db_add_scenario_fk()},
            FOREIGN KEY(lca)
              REFERENCES lateral_contribution_adists(pamhyr_id)
          )
        """)

    @classmethod
    def _db_update(cls, execute, version, data=None):
        """Create or migrate the data table from schema `version`.

        `version` is a "major.minor.release" string. Always returns True.
        """
        major, minor, release = version.strip().split(".")
        created = False

        if major == "0" and int(minor) <= 1:
            if int(release) < 7:
                cls._db_create(execute)
                created = True

        if major == "0" and int(minor) < 2:
            # A table created just above already has the new layout.
            if not created:
                cls._db_update_to_0_2_0(execute, data)

        return True

    @classmethod
    def _db_update_to_0_2_0(cls, execute, data):
        """Rebuild the data table with the 0.2.0 layout.

        Rows are copied into a temporary table (the old ``lc`` column
        becomes ``lca``), which then replaces the original table.
        """
        table = "lateral_contribution_data_adists"

        cls.update_db_add_pamhyr_id(execute, table, data)
        Scenario.update_db_add_scenario(execute, table)

        cls._db_create(execute, ext="_tmp")

        execute(
            f"INSERT INTO {table}_tmp " +
            "(pamhyr_id, data0, data1, lca, scenario) " +
            "SELECT pamhyr_id, data0, data1, lc, scenario " +
            f"FROM {table}"
        )

        execute(f"DROP TABLE {table}")
        execute(f"ALTER TABLE {table}_tmp RENAME TO {table}")

        cls._db_update_to_0_2_0_set_lca_pid(execute, data)

    @classmethod
    def _db_update_to_0_2_0_set_lca_pid(cls, execute, data):
        """Rewrite each row's ``lca`` value through the id2pid mapping.

        Rows whose ``lca`` is -1 are left untouched.
        """
        pid_lca = data["id2pid"]["lateral_contribution_adists"]
        els = execute(
            "SELECT pamhyr_id, lca " +
            "FROM lateral_contribution_data_adists"
        )

        for row in els:
            it = iter(row)
            pid = next(it)
            lca_id = next(it)

            if lca_id == -1:
                continue

            execute(
                f"UPDATE lateral_contribution_data_adists " +
                f"SET lca = {pid_lca[lca_id]} " +
                f"WHERE pamhyr_id = {pid}"
            )

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load the rows of ``data["lca"]`` for ``data["scenario"]``.

        Rows whose pamhyr_id is already in ``data["loaded_pid"]`` are
        skipped; loaded ids are added to that set. The method then
        recurses on the parent scenario and restores
        ``data["scenario"]`` before returning.

        Returns:
            The list of Data objects loaded.
        """
        new = []
        status = data['status']
        scenario = data["scenario"]
        loaded = data['loaded_pid']

        if scenario is None:
            return new

        # The owning contribution is put in the context by the caller.
        lca = data["lca"]

        table = execute(
            "SELECT pamhyr_id, deleted, data0, data1, scenario " +
            "FROM lateral_contribution_data_adists " +
            f"WHERE scenario = {scenario.id} " +
            f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) " +
            f"AND lca = {lca.id}"
        )

        if table is not None:
            for row in table:
                it = iter(row)

                pid = next(it)
                deleted = (next(it) == 1)
                data0 = next(it)
                data1 = next(it)
                owner_scenario = next(it)

                # Named `new_data` so the `data` context dict is not
                # shadowed: it is still needed after the loop.
                new_data = cls(
                    data0, data1,
                    id=pid, status=status,
                    owner_scenario=owner_scenario
                )
                if deleted:
                    new_data.set_as_deleted()

                loaded.add(pid)
                new.append(new_data)

        data["scenario"] = scenario.parent
        new += cls._db_load(execute, data)
        data["scenario"] = scenario

        return new

    def _db_save(self, execute, data=None):
        """Insert this row into the data table.

        Nothing is written when ``must_be_saved()`` is false. The owning
        contribution is read from ``data["lca"]``. Always returns True.
        """
        if not self.must_be_saved():
            return True

        lca = data["lca"]

        # data0 / data1 are TEXT columns, hence the quotes: an unquoted
        # date-like value would be evaluated as an arithmetic expression.
        execute(
            "INSERT INTO " +
            "lateral_contribution_data_adists(pamhyr_id, deleted," +
            "data0, data1, lca, scenario) " +
            "VALUES (" +
            f"{self.id}, {self._db_format(self.is_deleted())}, " +
            f"'{self._data[0]}', '{self._data[1]}', " +
            f"{lca.id}, {self._status.scenario_id}" +
            ")"
        )

        return True

    def __getitem__(self, key):
        """Return column `key` converted with its type callable."""
        return self._types[key](self._data[key])

    def __setitem__(self, key, value):
        """Store `value` in column `key` after converting it."""
        self._data[key] = self._types[key](value)


class LateralContributionAdisTS(SQLSubModel):
    """Lateral contribution of a pollutant between two rk on a reach.

    Persisted in the ``lateral_contribution_adists`` table; its rows of
    values are Data objects.
    """

    _sub_classes = [Data]

    def __init__(self, id: int = -1, pollutant: int = -1,
                 name: str = "", status=None,
                 owner_scenario=-1):
        """Create an empty contribution for `pollutant`.

        `name` is accepted but not used by this constructor.
        """
        super(LateralContributionAdisTS, self).__init__(
            id=id, status=status,
            owner_scenario=owner_scenario
        )

        self._status = status

        self._pollutant = pollutant
        self._reach = None
        self._begin_rk = 0.0
        self._end_rk = 0.0
        self._data = []
        self._header = ["time", "rate"]
        self._types = [self.time_convert, float]

    @classmethod
    def _db_create(cls, execute, ext=""):
        """Create the contribution table, with `ext` appended to its name.

        Returns True.
        """
        execute(f"""
          CREATE TABLE lateral_contribution_adists{ext}(
            {cls.create_db_add_pamhyr_id()},
            deleted BOOLEAN NOT NULL DEFAULT FALSE,
            pollutant INTEGER NOT NULL,
            reach INTEGER NOT NULL,
            begin_rk REAL NOT NULL,
            end_rk REAL NOT NULL,
            {Scenario.create_db_add_scenario()},
            {Scenario.create_db_add_scenario_fk()},
            FOREIGN KEY(pollutant) REFERENCES Pollutants(pamhyr_id),
            FOREIGN KEY(reach) REFERENCES river_reach(pamhyr_id)
          )
        """)

        return True

    @classmethod
    def _db_update(cls, execute, version, data=None):
        """Create or migrate the table from schema `version`.

        `version` is a "major.minor.release" string. Returns the result
        of ``_update_submodel``.
        """
        major, minor, release = version.strip().split(".")
        created = False

        if major == "0" and int(minor) <= 1:
            if int(release) < 7:
                cls._db_create(execute)
                created = True

        if major == "0" and int(minor) < 2:
            # A table created just above already has the new layout.
            if not created:
                cls._db_update_to_0_2_0(execute, data)

        return cls._update_submodel(execute, version, data)

    @classmethod
    def _db_update_to_0_2_0(cls, execute, data):
        """Rebuild the contribution table with the 0.2.0 layout.

        Rows are copied into a temporary table (the old ``edge`` column
        becomes ``reach``), which then replaces the original table;
        reach and pollutant references are then remapped.
        """
        table = "lateral_contribution_adists"
        reachs = data['id2pid']['river_reach']

        cls.update_db_add_pamhyr_id(execute, table, data)
        Scenario.update_db_add_scenario(execute, table)

        cls._db_create(execute, ext="_tmp")

        execute(
            f"INSERT INTO {table}_tmp " +
            "(pamhyr_id, pollutant, reach, begin_rk, end_rk, scenario) " +
            "SELECT pamhyr_id, pollutant, edge, begin_rk, end_rk, scenario " +
            f"FROM {table}"
        )

        execute(f"DROP TABLE {table}")
        execute(f"ALTER TABLE {table}_tmp RENAME TO {table}")

        cls._db_update_to_0_2_0_set_reach_pid(execute, table, reachs)
        cls._db_update_to_0_2_0_set_pollutants_pid(execute, data)

    @classmethod
    def _db_update_to_0_2_0_set_pollutants_pid(cls, execute, data):
        """Rewrite each row's ``pollutant`` through the id2pid mapping.

        Rows whose ``pollutant`` is -1 are left untouched.
        """
        pid_pol = data["id2pid"]["Pollutants"]
        els = execute(
            f"SELECT pamhyr_id, pollutant " +
            "FROM lateral_contribution_adists"
        )

        for row in els:
            it = iter(row)
            pid = next(it)
            pol_id = next(it)

            if pol_id == -1:
                continue

            execute(
                f"UPDATE lateral_contribution_adists " +
                f"SET pollutant = {pid_pol[pol_id]} " +
                f"WHERE pamhyr_id = {pid}"
            )

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load the contributions stored for ``data["scenario"]``.

        Rows whose pamhyr_id is already in ``data["loaded_pid"]`` are
        skipped; loaded ids are added to that set. For each contribution
        the Data rows are loaded with ``data["lca"]`` set to it. The
        method then recurses on the parent scenario and restores
        ``data["scenario"]`` before returning.

        Returns:
            The list of LateralContributionAdisTS objects loaded.
        """
        new = []
        status = data['status']
        scenario = data["scenario"]
        loaded = data['loaded_pid']

        if scenario is None:
            return new

        table = execute(
            "SELECT pamhyr_id, deleted, pollutant, reach, " +
            "begin_rk, end_rk, scenario " +
            "FROM lateral_contribution_adists " +
            f"WHERE scenario = {scenario.id} " +
            f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) "
        )

        if table is not None:
            for row in table:
                it = iter(row)

                pid = next(it)
                deleted = (next(it) == 1)
                pollutant = next(it)
                reach = next(it)
                brk = next(it)
                erk = next(it)
                owner_scenario = next(it)

                lca = cls(
                    id=pid,
                    pollutant=pollutant,
                    status=status,
                    owner_scenario=owner_scenario
                )
                if deleted:
                    lca.set_as_deleted()

                # NOTE(review): these setters call self.modified();
                # confirm that flagging a freshly loaded object as
                # modified is intended.
                lca.reach = reach
                lca.begin_rk = brk
                lca.end_rk = erk

                # Data._db_load reads the owning contribution from the
                # context.
                data["lca"] = lca
                lca._data = Data._db_load(execute, data=data)

                loaded.add(pid)
                new.append(lca)

        data["scenario"] = scenario.parent
        new += cls._db_load(execute, data)
        data["scenario"] = scenario

        return new

    def _db_save(self, execute, data=None):
        """Replace this contribution and its data rows in the database.

        Nothing is written when ``must_be_saved()`` is false. Existing
        rows of the current scenario are deleted first, then the
        contribution and each of its Data rows are inserted.
        Always returns True.
        """
        if not self.must_be_saved():
            return True

        execute(
            f"DELETE FROM lateral_contribution_adists " +
            f"WHERE pamhyr_id = {self.id} " +
            f"AND scenario = {self._status.scenario_id}"
        )
        execute(
            f"DELETE FROM lateral_contribution_data_adists " +
            f"WHERE lca = {self.id} " +
            f"AND scenario = {self._status.scenario_id}"
        )

        # The key column is `pamhyr_id` (see _db_create), and every
        # value, including end_rk, must be followed by a separator.
        execute(
            "INSERT INTO " +
            "lateral_contribution_adists(pamhyr_id, deleted," +
            "pollutant, reach, begin_rk, end_rk, scenario) " +
            "VALUES (" +
            f"{self.id}, {self._db_format(self.is_deleted())}, " +
            f"{self._pollutant}, {self.reach}, " +
            f"{self._begin_rk}, {self._end_rk}, " +
            f"{self._status.scenario_id}" +
            ")"
        )

        if data is None:
            data = {}

        # Data._db_save reads the owning contribution from the context.
        data["lca"] = self

        for ind, d in enumerate(self._data):
            data["ind"] = ind
            d._db_save(execute, data)

        return True

    def __len__(self):
        """Return the number of data rows not flagged as deleted."""
        return sum(1 for el in self._data if not el.is_deleted())

    @classmethod
    def time_convert(cls, data):
        """Convert a time value to an integer timestamp.

        For strings, the format is guessed from separator counts:
        two "-" -> ISO date, two "/" -> day/month/year date,
        three or two ":" -> old Pamhyr date (two ":" is first prefixed
        with "00:"), one "." -> float rounded to the nearest integer.
        Anything else is passed to ``int``.
        """
        if isinstance(data, str):
            if data.count("-") == 2:
                return date_iso_to_timestamp(data)
            if data.count("/") == 2:
                return date_dmy_to_timestamp(data)
            if data.count(":") == 3:
                return old_pamhyr_date_to_timestamp(data)
            if data.count(":") == 2:
                return old_pamhyr_date_to_timestamp("00:" + data)
            if data.count(".") == 1:
                return round(float(data))

        return int(data)

    @property
    def reach(self):
        """Reach of this contribution, as stored by the setter."""
        return self._reach

    @reach.setter
    def reach(self, reach):
        self._reach = reach
        self.modified()

    @property
    def header(self):
        """Copy of the column header list."""
        return self._header.copy()

    @header.setter
    def header(self, header):
        self._header = header
        self.modified()

    @property
    def pollutant(self):
        """Pollutant given at construction."""
        return self._pollutant

    @property
    def data(self):
        """Shallow copy of the list of data rows."""
        return self._data.copy()

    @property
    def begin_rk(self):
        """Begin rk of the contribution."""
        return self._begin_rk

    @begin_rk.setter
    def begin_rk(self, begin_rk):
        self._begin_rk = begin_rk
        self.modified()

    @property
    def end_rk(self):
        """End rk of the contribution."""
        return self._end_rk

    @end_rk.setter
    def end_rk(self, end_rk):
        self._end_rk = end_rk
        self.modified()