# InitialConditionsAdisTS.py -- Pamhyr
# Copyright (C) 2023-2024  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import logging
from functools import reduce

from tools import trace, timer, old_pamhyr_date_to_timestamp

from Model.Tools.PamhyrDB import SQLSubModel
from Model.Except import NotImplementedMethodeError

from Model.InitialConditionsAdisTS.InitialConditionsAdisTSSpec \
    import ICAdisTSSpec

logger = logging.getLogger()


class InitialConditionsAdisTS(SQLSubModel):
    """Initial conditions record stored in `initial_conditions_adists`.

    One instance holds the scalar values saved in that table
    (pollutant id, name, concentration, eg, em, ed, enabled) plus a
    list of ICAdisTSSpec children kept in `self._data`.

    Every property setter and every list mutator calls
    `self._status.modified()`, so `status` must be an object exposing
    a `modified()` method before any of them is used.
    """

    _sub_classes = [
        ICAdisTSSpec,
    ]
    # Next id handed out when the constructor is called with id == -1.
    _id_cnt = 0

    def __init__(self, id: int = -1, name: str = "default",
                 pollutant: int = -1, status=None):
        """Create a record.

        Args:
            id: Explicit identifier, or -1 to take the next free one.
            name: Record name.
            pollutant: Pollutant identifier (-1 when unset).
            status: Object notified through `modified()` on changes.
        """
        super(InitialConditionsAdisTS, self).__init__()

        self._status = status

        if id == -1:
            self.id = InitialConditionsAdisTS._id_cnt
        else:
            self.id = id

        self._name = name
        self._pollutant = pollutant
        self._concentration = None
        self._eg = None
        self._em = None
        self._ed = None
        self._enabled = True
        self._data = []

        # Keep the counter strictly above every id seen so far.  The
        # former `max(_id_cnt + 1, self.id)` left the counter equal to
        # an explicitly given id, so the next automatic id collided
        # with it.
        InitialConditionsAdisTS._id_cnt = max(
            InitialConditionsAdisTS._id_cnt,
            self.id
        ) + 1

    @classmethod
    def _db_create(cls, execute):
        """Create the `initial_conditions_adists` table and submodels.

        Args:
            execute: Callable running one SQL statement.

        Returns:
            Whatever `cls._create_submodel(execute)` returns.
        """
        execute("""
          CREATE TABLE initial_conditions_adists(
            id INTEGER NOT NULL PRIMARY KEY,
            pollutant INTEGER NOT NULL,
            name TEXT NOT NULL,
            concentration REAL NOT NULL,
            eg REAL NOT NULL,
            em REAL NOT NULL,
            ed REAL NOT NULL,
            enabled BOOLEAN NOT NULL,
            FOREIGN KEY(pollutant) REFERENCES Pollutants(id)
          )
        """)

        return cls._create_submodel(execute)

    @classmethod
    def _db_update(cls, execute, version):
        """Create the table when upgrading a database older than 0.0.6.

        Args:
            execute: Callable running one SQL statement.
            version: Version string of the form "major.minor.release".

        Returns:
            Always True.
        """
        major, minor, release = version.strip().split(".")

        # The version parts are strings: the former `major == 0` test
        # compared a str with an int and was therefore never true, so
        # the table was never created on upgrade.
        if major == "0" and minor == "0":
            if int(release) < 6:
                cls._db_create(execute)

        return True

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load every record of `initial_conditions_adists`.

        Args:
            execute: Callable running one SQL statement and returning
                the selected rows (or None).
            data: Dict providing 'status'; 'ic_default_id' is written
                into it before the ICAdisTSSpec children are loaded.

        Returns:
            List of loaded InitialConditionsAdisTS instances.
        """
        new = []

        table = execute(
            "SELECT id, pollutant, name, concentration, eg, em, ed, " +
            "enabled " +
            "FROM initial_conditions_adists"
        )

        if table is None:
            return new

        for row in table:
            IC_id = row[0]

            IC = cls(
                id=IC_id,
                name=row[2],
                status=data['status']
            )

            # Assigned through the property setters, as before, so the
            # status object is notified for each loaded value.
            IC.pollutant = row[1]
            IC.concentration = row[3]
            IC.eg = row[4]
            IC.em = row[5]
            IC.ed = row[6]
            IC.enabled = (row[7] == 1)

            # The spec loader receives the parent id through `data`.
            data['ic_default_id'] = IC_id
            IC._data = ICAdisTSSpec._db_load(execute, data)

            new.append(IC)

        return new

    def _db_save(self, execute, data=None):
        """Replace this record and its specs in the database.

        Unset (None) values are stored as -1 because every column is
        declared NOT NULL.

        Args:
            execute: Callable running one SQL statement.
            data: Dict; 'ic_default_id' is set to this record's id
                before the children are saved.

        Returns:
            Always True.
        """
        execute(f"DELETE FROM initial_conditions_adists WHERE id = {self.id}")

        pollutant = -1 if self.pollutant is None else self.pollutant
        concentration = -1. if self.concentration is None \
            else self.concentration
        eg = -1. if self.eg is None else self.eg
        em = -1. if self.em is None else self.em
        ed = -1. if self.ed is None else self.ed

        # NOTE(review): `self._enabled` is rendered as the text
        # True/False in the statement; `_db_load` reads it back by
        # comparing the stored value with 1.
        sql = (
            "INSERT INTO " +
            "initial_conditions_adists(" +
            "id, pollutant, name, concentration, " +
            "eg, em, ed, enabled" +
            ") " +
            "VALUES (" +
            f"{self.id}, {pollutant}, '{self._db_format(self._name)}', " +
            f"{concentration}, {eg}, {em}, {ed}, {self._enabled}" +
            ")"
        )
        execute(sql)

        data['ic_default_id'] = self.id

        # Children are rewritten from scratch on every save.
        execute(
            "DELETE FROM initial_conditions_spec " +
            f"WHERE ic_default = {self.id}"
        )

        for ic_spec in self._data:
            ic_spec._db_save(execute, data)

        return True

    def __len__(self):
        """Return the number of ICAdisTSSpec children."""
        return len(self._data)

    @property
    def name(self):
        """Record name."""
        return self._name

    @name.setter
    def name(self, name):
        self._name = name
        self._status.modified()

    @property
    def pollutant(self):
        """Pollutant identifier."""
        return self._pollutant

    @pollutant.setter
    def pollutant(self, pollutant):
        self._pollutant = pollutant
        self._status.modified()

    @property
    def concentration(self):
        """Value of the `concentration` column (None when unset)."""
        return self._concentration

    @concentration.setter
    def concentration(self, concentration):
        self._concentration = concentration
        self._status.modified()

    @property
    def eg(self):
        """Value of the `eg` column (None when unset)."""
        return self._eg

    @eg.setter
    def eg(self, eg):
        self._eg = eg
        self._status.modified()

    @property
    def em(self):
        """Value of the `em` column (None when unset)."""
        return self._em

    @em.setter
    def em(self, em):
        self._em = em
        self._status.modified()

    @property
    def ed(self):
        """Value of the `ed` column (None when unset)."""
        return self._ed

    @ed.setter
    def ed(self, ed):
        self._ed = ed
        self._status.modified()

    @property
    def enabled(self):
        """Whether this record is enabled."""
        return self._enabled

    @enabled.setter
    def enabled(self, enabled):
        self._enabled = enabled
        self._status.modified()

    def new(self, index):
        """Create an ICAdisTSSpec, insert it at `index` and return it."""
        n = ICAdisTSSpec(status=self._status)
        self._data.insert(index, n)
        self._status.modified()
        return n

    def delete(self, data):
        """Remove every child that is contained in `data`."""
        self._data = [spec for spec in self._data if spec not in data]
        self._status.modified()

    def delete_i(self, indexes):
        """Remove the children located at the given positions.

        All indexes refer to the list as it is *before* the call.
        Duplicates are ignored and negative indexes count from the
        end.

        Args:
            indexes: Iterable of integer positions.

        Raises:
            IndexError: If an index is out of range; the list is left
                untouched in that case.
        """
        size = len(self._data)
        positions = set()

        for ind in indexes:
            pos = ind + size if ind < 0 else ind
            if not 0 <= pos < size:
                raise IndexError(f"index {ind} out of range")
            positions.add(pos)

        # Delete from the end: removing a lower position first would
        # shift every later element and make the remaining indexes
        # point at the wrong items.
        for pos in sorted(positions, reverse=True):
            del self._data[pos]

        self._status.modified()

    def insert(self, index, data):
        """Insert an existing child `data` at position `index`."""
        self._data.insert(index, data)
        self._status.modified()