# InitialConditionsAdisTS.py -- Pamhyr
# Copyright (C) 2023-2025  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import logging
from functools import reduce

from tools import trace, timer, old_pamhyr_date_to_timestamp

from Model.Tools.PamhyrDB import SQLSubModel
from Model.Except import NotImplementedMethodeError
from Model.Scenario import Scenario

from Model.InitialConditionsAdisTS.InitialConditionsAdisTSSpec \
    import ICAdisTSSpec

logger = logging.getLogger()


class InitialConditionsAdisTS(SQLSubModel):
    """Default AdisTS initial conditions for one pollutant.

    Instances are persisted in the ``initial_conditions_adists`` table
    and own a list of :class:`ICAdisTSSpec` entries (``self._data``)
    that are loaded and saved together with them.
    """

    _sub_classes = [
        ICAdisTSSpec,
    ]

    def __init__(self, id: int = -1, name: str = "default",
                 pollutant: int = -1, status=None,
                 owner_scenario=-1):
        """Create an initial-conditions object with empty values.

        Args:
            id: Identifier forwarded to the parent constructor.
            name: Name of the initial conditions.
            pollutant: Identifier of the associated pollutant
                (``-1`` by default).
            status: Study status object, forwarded to the parent
                constructor and kept on the instance.
            owner_scenario: Owner scenario, forwarded to the parent
                constructor.
        """
        super(InitialConditionsAdisTS, self).__init__(
            id=id, status=status,
            owner_scenario=owner_scenario
        )

        self._status = status

        self._name = name
        self._pollutant = pollutant
        self._concentration = None
        self._eg = None
        self._em = None
        self._ed = None
        self._enabled = True
        # List of ICAdisTSSpec entries attached to these conditions.
        self._data = []

    @classmethod
    def _db_create(cls, execute, ext=""):
        """Create the ``initial_conditions_adists{ext}`` table.

        Args:
            execute: Callable running one SQL statement.
            ext: Suffix appended to the table name (the 0.2.0
                migration uses ``"_tmp"``).

        Returns:
            The result of ``cls._create_submodel(execute)``.
        """
        execute(f"""
          CREATE TABLE initial_conditions_adists{ext}(
            {cls.create_db_add_pamhyr_id()},
            deleted BOOLEAN NOT NULL DEFAULT FALSE,
            pollutant INTEGER NOT NULL,
            name TEXT NOT NULL,
            concentration REAL NOT NULL,
            eg REAL NOT NULL,
            em REAL NOT NULL,
            ed REAL NOT NULL,
            enabled BOOLEAN NOT NULL,
            {Scenario.create_db_add_scenario()},
            {Scenario.create_db_add_scenario_fk()},
            FOREIGN KEY(pollutant) REFERENCES Pollutants(pamhyr_id)
          )
        """)

        return cls._create_submodel(execute)

    @classmethod
    def _db_update(cls, execute, version, data=None):
        """Bring the table up to date from database ``version``.

        Args:
            execute: Callable running one SQL statement.
            version: Database version as ``"major.minor.release"``.
            data: Shared migration data, forwarded to the 0.2.0
                migration step.

        Returns:
            Always ``True``.
        """
        major, minor, release = version.strip().split(".")
        created = False

        # NOTE(review): the release threshold is tested for every minor
        # <= 1, whatever the minor value is -- confirm this matches the
        # version in which the table was introduced.
        if major == "0" and int(minor) <= 1:
            if int(release) < 6:
                cls._db_create(execute)
                created = True

        # A freshly created table already has the current layout, so
        # only pre-existing tables go through the 0.2.0 migration.
        if major == "0" and int(minor) < 2:
            if not created:
                cls._db_update_to_0_2_0(execute, data)

        return True

    @classmethod
    def _db_update_to_0_2_0(cls, execute, data):
        """Migrate the table to the 0.2.0 layout.

        Adds the pamhyr id and scenario columns to the existing table,
        copies its rows into a new ``_tmp`` table created with the
        current schema, swaps the two tables, then rewrites the
        pollutant references.

        Args:
            execute: Callable running one SQL statement.
            data: Shared migration data (must provide ``"id2pid"``).
        """
        table = "initial_conditions_adists"

        cls.update_db_add_pamhyr_id(execute, table, data)
        Scenario.update_db_add_scenario(execute, table)

        cls._db_create(execute, ext="_tmp")

        execute(
            f"INSERT INTO {table}_tmp " +
            "(pamhyr_id, pollutant, name, concentration, " +
            "eg, em, ed, enabled, scenario) " +
            "SELECT pamhyr_id, pollutant, name, concentration, " +
            "eg, em, ed, enabled, scenario " +
            f"FROM {table}"
        )

        execute(f"DROP TABLE {table}")
        execute(f"ALTER TABLE {table}_tmp RENAME TO {table}")

        cls._db_update_to_0_2_0_set_pollutants_pid(execute, data)

    @classmethod
    def _db_update_to_0_2_0_set_pollutants_pid(cls, execute, data):
        """Replace each stored pollutant value using the id2pid mapping.

        Args:
            execute: Callable running one SQL statement.
            data: Shared migration data; ``data["id2pid"]["Pollutants"]``
                maps the stored pollutant value to its replacement.
        """
        pid_pol = data["id2pid"]["Pollutants"]

        els = execute(
            "SELECT pamhyr_id, pollutant " +
            "FROM initial_conditions_adists"
        )

        for row in els:
            pid, pol_id = row

            # -1 is the constructor's default pollutant value; such
            # rows are left untouched.
            if pol_id == -1:
                continue

            execute(
                "UPDATE initial_conditions_adists " +
                f"SET pollutant = {pid_pol[pol_id]} " +
                f"WHERE pamhyr_id = {pid}"
            )

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load the initial conditions of a scenario and its ancestors.

        Rows of ``data["scenario"]`` are loaded first, then the method
        recurses on ``scenario.parent``. Ids already present in
        ``data["loaded_pid"]`` are excluded from the query, and every
        loaded id is added to it.

        Args:
            execute: Callable running one SQL statement and returning
                the selected rows.
            data: Shared loading data; must provide ``"status"``,
                ``"scenario"`` and ``"loaded_pid"``.

        Returns:
            The list of loaded :class:`InitialConditionsAdisTS`.
        """
        new = []
        status = data['status']
        scenario = data["scenario"]
        loaded = data['loaded_pid']

        # End of the scenario chain: nothing more to load.
        if scenario is None:
            return new

        table = execute(
            "SELECT pamhyr_id, deleted, " +
            "pollutant, name, concentration, eg, em, ed, " +
            "enabled, scenario " +
            "FROM initial_conditions_adists " +
            f"WHERE scenario = {scenario.id} " +
            f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))})"
        )

        if table is not None:
            for row in table:
                # Same order as the SELECT column list above.
                (pid, deleted, pollutant, name, concentration,
                 eg, em, ed, enabled, owner_scenario) = row

                ic = cls(
                    id=pid,
                    name=name,
                    status=status,
                    owner_scenario=owner_scenario
                )

                if deleted == 1:
                    ic.set_as_deleted()

                # Assigned through the property setters, which also
                # call ic.modified().
                ic.pollutant = pollutant
                ic.concentration = concentration
                ic.eg = eg
                ic.em = em
                ic.ed = ed
                ic.enabled = (enabled == 1)

                # Tell the sub-model loader which parent is being read.
                data['ic_default_id'] = pid
                ic._data = ICAdisTSSpec._db_load(execute, data)

                loaded.add(pid)
                new.append(ic)

        # Continue with the parent scenario, then restore the entry so
        # the caller sees `data` unchanged.
        data["scenario"] = scenario.parent
        new += cls._db_load(execute, data)
        data["scenario"] = scenario

        return new

    def _db_save(self, execute, data=None):
        """Write this object and its specific entries to the database.

        The row of the current scenario is deleted and re-inserted;
        unset (``None``) values are stored as ``-1``. The matching rows
        of ``initial_conditions_adists_spec`` are deleted before each
        entry of ``self._data`` is saved.

        Args:
            execute: Callable running one SQL statement.
            data: Shared saving data; ``"ic_default_id"`` is set to
                this object's id for the sub-model save.

        Returns:
            Always ``True``.
        """
        if not self.must_be_saved():
            return True

        execute(
            "DELETE FROM initial_conditions_adists " +
            f"WHERE pamhyr_id = {self.id} " +
            f"AND scenario = {self._status.scenario_id}"
        )

        # Columns are NOT NULL: fall back to -1 for unset values.
        pollutant = -1 if self.pollutant is None else self.pollutant
        concentration = (
            -1. if self.concentration is None else self.concentration
        )
        eg = -1. if self.eg is None else self.eg
        em = -1. if self.em is None else self.em
        ed = -1. if self.ed is None else self.ed

        sql = (
            "INSERT INTO " +
            "initial_conditions_adists(" +
            "pamhyr_id, deleted, pollutant, name, concentration, " +
            "eg, em, ed, enabled, scenario" +
            ") " +
            "VALUES (" +
            f"{self.id}, {self.is_deleted()}, " +
            f"{pollutant}, '{self._db_format(self._name)}', " +
            f"{concentration}, {eg}, {em}, {ed}, {self._enabled}, " +
            f"{self._status.scenario_id}" +
            ")"
        )

        execute(sql)

        data['ic_default_id'] = self.id

        execute(
            "DELETE FROM initial_conditions_adists_spec " +
            f"WHERE ic_default = {self.id} " +
            f"AND scenario = {self._status.scenario_id}"
        )

        for ic_spec in self._data:
            ic_spec._db_save(execute, data)

        return True

    def __len__(self):
        """Return the number of specific entries held in ``_data``."""
        return len(self._data)

    @property
    def name(self):
        """Name of the initial conditions; setting it calls modified()."""
        return self._name

    @name.setter
    def name(self, name):
        self._name = name
        self.modified()

    @property
    def pollutant(self):
        """Pollutant identifier; setting it calls modified()."""
        return self._pollutant

    @pollutant.setter
    def pollutant(self, pollutant):
        self._pollutant = pollutant
        self.modified()

    @property
    def concentration(self):
        """Concentration value; setting it calls modified()."""
        return self._concentration

    @concentration.setter
    def concentration(self, concentration):
        self._concentration = concentration
        self.modified()

    @property
    def eg(self):
        """The ``eg`` value; setting it calls modified()."""
        return self._eg

    @eg.setter
    def eg(self, eg):
        self._eg = eg
        self.modified()

    @property
    def em(self):
        """The ``em`` value; setting it calls modified()."""
        return self._em

    @em.setter
    def em(self, em):
        self._em = em
        self.modified()

    @property
    def ed(self):
        """The ``ed`` value; setting it calls modified()."""
        return self._ed

    @ed.setter
    def ed(self, ed):
        self._ed = ed
        self.modified()

    @property
    def enabled(self):
        """Enabled flag; setting it calls modified()."""
        return self._enabled

    @enabled.setter
    def enabled(self, enabled):
        self._enabled = enabled
        self.modified()

    def new(self, index):
        """Create an ICAdisTSSpec, insert it at ``index`` and return it."""
        n = ICAdisTSSpec(status=self._status)
        self._data.insert(index, n)
        self.modified()
        return n

    def delete(self, data):
        """Mark every element of ``data`` as deleted."""
        for spec in data:
            spec.set_as_deleted()

        self.modified()

    def delete_i(self, indexes):
        """Mark as deleted the entries whose position is in ``indexes``."""
        for index, spec in enumerate(self._data):
            if index in indexes:
                spec.set_as_deleted()

        self.modified()

    def insert(self, index, data):
        """Insert ``data`` at ``index``, or undelete it if already held."""
        if data in self._data:
            self.undelete([data])
        else:
            self._data.insert(index, data)

        self.modified()

    def undelete(self, lst):
        """Mark every element of ``lst`` as not deleted."""
        for x in lst:
            x.set_as_not_deleted()

        self.modified()