# Pollutants.py -- Pamhyr # Copyright (C) 2023-2025 INRAE # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # -*- coding: utf-8 -*- import logging from tools import ( trace, timer, old_pamhyr_date_to_timestamp, date_iso_to_timestamp, date_dmy_to_timestamp, ) from Model.Tools.PamhyrDB import SQLSubModel from Model.Except import NotImplementedMethodeError from Model.Scenario import Scenario from Model.BoundaryConditionsAdisTS.BoundaryConditionAdisTS import BoundaryConditionAdisTS logger = logging.getLogger() class PollutantCharacteristics(SQLSubModel): _sub_classes = [] def __init__(self, id: int = -1, type=-1, diametre=-1, rho=-1, porosity=-1, cdc_riv=-1, cdc_cas=-1, apd=-1, ac=-1, bc=-1, pollutant=None, status=None, owner_scenario=-1): super(PollutantCharacteristics, self).__init__( id=id, status=status, owner_scenario=owner_scenario ) self._status = status self._type = type self._diametre = diametre self._rho = rho self._porosity = porosity self._cdc_riv = cdc_riv self._cdc_cas = cdc_cas self._apd = apd self._ac = ac self._bc = bc self._pollutant = pollutant # FIXME: This four next method is dirty work, just an hack to keep # the View API working... Must be refactoring def to_list(self): return [ self._type, self._diametre, self._rho, self._porosity, self._cdc_riv, self._cdc_cas, self._apd, self._ac, self._bc, ] def __len__(self): return len(self.to_list()) def __getitem__(self, key): return self.to_list()[key] def __setitem__(self, key, value): if key == 0: self._type = value elif key == 1: self._diametre = value elif key == 2: self._rho = value elif key == 3: self._porosity = value elif key == 4: self._cdc_riv = value elif key == 5: self._cdc_cas = value elif key == 6: self._apd = value elif key == 7: self._ac = value elif key == 8: self._bc = value @classmethod def _db_create(cls, execute, ext=""): execute(f""" CREATE TABLE pollutants_characteristics{ext}( {cls.create_db_add_pamhyr_id()}, deleted BOOLEAN NOT NULL DEFAULT FALSE, type INTEGER NOT NULL, diametre REAL NOT NULL, rho REAL NOT NULL, porosity REAL NOT NULL, cdc_riv REAL NOT NULL, cdc_cas REAL NOT NULL, apd REAL NOT NULL, ac REAL NOT NULL, bc REAL NOT NULL, pollutant INTEGER NOT NULL, {Scenario.create_db_add_scenario()}, {Scenario.create_db_add_scenario_fk()}, FOREIGN KEY(pollutant) REFERENCES pollutants(pamhyr_id) ) """) return True @classmethod def _db_update(cls, execute, version, data=None): major, minor, release = version.strip().split(".") created = False if major == "0" and int(minor) <= 1: if int(release) < 7: cls._db_create(execute) created = True if major == "0" and int(minor) < 2: if not created: cls._db_update_to_0_2_0(execute, data) return cls._update_submodel(execute, version, data) @classmethod def _db_update_to_0_2_0(cls, execute, data): table = "Pollutants_characteristics" table_new = "pollutants_characteristics" cls.update_db_add_pamhyr_id(execute, table, data) Scenario.update_db_add_scenario(execute, table) cls._db_create(execute, ext="_tmp") execute( f"INSERT INTO {table_new}_tmp " + "(pamhyr_id, type, diametre, rho, porosity, " + "cdc_riv, cdc_cas, apd, ac, bc, pollutant, scenario) " + "SELECT pamhyr_id, type, diametre, rho, porosity, " + "cdc_riv, cdc_cas, apd, ac, bc, pollutant, scenario " + f"FROM {table}" ) execute(f"DROP TABLE {table}") execute(f"ALTER TABLE {table_new}_tmp RENAME TO {table_new}") cls._db_update_to_0_2_0_set_pollutants_pid(execute, data) @classmethod def _db_update_to_0_2_0_set_pollutants_pid(cls, execute, data): pid_pol = data["id2pid"]["Pollutants"] els = execute( f"SELECT pamhyr_id, pollutant " + "FROM pollutants_characteristics" ) for row in els: it = iter(row) pid = next(it) pol_id = next(it) if pol_id == -1: continue execute( f"UPDATE pollutants_characteristics " + f"SET pollutant = {pid_pol[pol_id]} " + f"WHERE pamhyr_id = {pid}" ) @classmethod def _db_load(cls, execute, data=None): new = [] status = data["status"] scenario = data["scenario"] loaded = data['loaded_pid'] pollutant = data['pollutant'] if scenario is None: return new table = execute( "SELECT pamhyr_id, deleted, type, diametre, rho, porosity, " + "cdc_riv, cdc_cas, apd, ac, bc, scenario " + "FROM pollutants_characteristics " + f"WHERE pollutant = {pollutant.id} " + f"AND scenario = {scenario.id} " + f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))})" ) if table is not None: for row in table: it = iter(row) pid = next(it) deleted = (next(it) == 1) type = next(it) diametre = next(it) rho = next(it) porosity = next(it) cdc_riv = next(it) cdc_cas = next(it) apd = next(it) ac = next(it) bc = next(it) owner_scenario = next(it) pc = cls( id=pid, type=type, diametre=diametre, rho=rho, porosity=porosity, cdc_riv=cdc_riv, cdc_cas=cdc_cas, apd=apd, ac=ac, bc=bc, pollutant=pollutant, status=status, owner_scenario=owner_scenario ) if deleted: pc.set_as_deleted() loaded.add(pid) new.append(pc) data["scenario"] = scenario.parent new += cls._db_load(execute, data) data["scenario"] = scenario return new def _db_save(self, execute, data=None): if not self.must_be_saved(): return True execute( "INSERT INTO " + "pollutants_characteristics(pamhyr_id, deleted, " + "type, diametre, rho, porosity, " + "cdc_riv, cdc_cas, apd, ac, bc, " + "pollutant, scenario) " + "VALUES (" + f"{self.id}, {self._db_format(self.is_deleted())}, " + f"{self._type}, " + f"{self._diametre}, {self._rho}, " + f"{self._porosity}, " + f"{self._cdc_riv}, {self._cdc_cas}, " + f"{self._apd}, {self._ac}, {self._bc}, " + f"{self._pollutant.id}, " + f"{self._status.scenario_id}" + ")" ) return True class Pollutants(SQLSubModel): _sub_classes = [PollutantCharacteristics, BoundaryConditionAdisTS] def __init__(self, id: int = -1, name: str = "", status=None, owner_scenario=-1): super(Pollutants, self).__init__( id=id, status=status, owner_scenario=owner_scenario ) self._status = status if name is None or name == "": self.name = f"pol{self.id}" else: self._name = str(name) self._enabled = True self._data = [] self._boundary_conditions_adists = [] @property def boundary_conditions_adists(self): return self._boundary_conditions_adists @property def name(self): return self._name @name.setter def name(self, name): self._name = name self._status.modified() @property def data(self): return self._data @data.setter def data(self, data): self._data = [PollutantCharacteristics(type=int(data[0]), diametre=float(data[1]), rho=float(data[2]), porosity=float(data[3]), cdc_riv=float(data[4]), cdc_cas=float(data[5]), apd=float(data[6]), ac=float(data[7]), bc=float(data[8]), pollutant=self, status=self._status ) ] @classmethod def _db_create(cls, execute, ext=""): execute(f""" CREATE TABLE pollutants{ext}( {cls.create_db_add_pamhyr_id()}, deleted BOOLEAN NOT NULL DEFAULT FALSE, name TEXT NOT NULL, {Scenario.create_db_add_scenario()}, {Scenario.create_db_add_scenario_fk()} ) """) if ext != "": return True return cls._create_submodel(execute) @classmethod def _db_update(cls, execute, version, data=None): major, minor, release = version.strip().split(".") created = False if major == "0" and int(minor) <= 1: if int(release) < 7: cls._db_create(execute) created = True if major == "0" and int(minor) < 2: if not created: cls._db_update_to_0_2_0(execute, data) if not created: return cls._update_submodel(execute, version, data) return True @classmethod def _db_update_to_0_2_0(cls, execute, data): table = "Pollutants" table_new = "pollutants" cls.update_db_add_pamhyr_id(execute, table, data) Scenario.update_db_add_scenario(execute, table) cls._db_create(execute, ext="_tmp") execute( f"INSERT INTO {table_new}_tmp " + "(pamhyr_id, name, scenario) " + "SELECT pamhyr_id, name, scenario " + f"FROM {table}" ) execute(f"DROP TABLE {table}") execute(f"ALTER TABLE {table_new}_tmp RENAME TO {table_new}") @classmethod def _db_load(cls, execute, data=None): new = [] status = data["status"] scenario = data["scenario"] loaded = data['loaded_pid'] if scenario is None: return new table = execute( "SELECT pamhyr_id, deleted, name FROM pollutants " + f"WHERE scenario = {scenario.id} " + f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))})" ) if table is not None: for row in table: it = iter(row) pid = next(it) deleted = (next(it) == 1) name = next(it) new_pollutant = cls( id=pid, name=name, status=status ) if deleted: new_pollutant.set_as_deleted() data["pollutant"] = new_pollutant new_pollutant._data = PollutantCharacteristics._db_load( execute, data=data ) new_pollutant._boundary_conditions_adists = BoundaryConditionAdisTS._db_load( execute, data=data ) loaded.add(pid) new.append(new_pollutant) data["scenario"] = scenario.parent new += cls._db_load(execute, data) data["scenario"] = scenario return new def _db_save(self, execute, data=None): if not self.must_be_saved(): return True ok = True execute( "DELETE FROM pollutants " + f"WHERE pamhyr_id = {self.id} " + f"AND scenario = {self._status.scenario_id}" ) execute( "DELETE FROM pollutants_characteristics " + f"WHERE pollutant = {self.id} " + f"AND scenario = {self._status.scenario_id}" ) execute( "INSERT INTO " + "pollutants(pamhyr_id, deleted, name, scenario) " + "VALUES (" + f"{self.id}, {self._db_format(self.is_deleted())}, " + f"'{self._db_format(self._name)}', " + f"{self._status.scenario_id}" + ")" ) for d in self._data: ok &= d._db_save(execute, data) for bc in self._boundary_conditions_adists: ok &= bc._db_save(execute, data) return ok def _data_traversal(self, predicate=lambda obj, data: True, modifier=lambda obj, data: None, data={}): if predicate(self, data): modifier(self, data) for d in self._data: d._data_traversal(predicate, modifier, data) @property def enabled(self): return self._enabled @enabled.setter def enabled(self, enabled): self._enabled = enabled self._status.modified() def is_define(self): return len(self._data) != 0 def new_from_data(self, data): try: new = [int(data[0])] new += [float(d) for d in data[1:]] except Exception as e: logger.error(e) new = None return new def __len__(self): return len(self._data)