# DIFAdisTS.py -- Pamhyr
# Copyright (C) 2023-2025  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see https://www.gnu.org/licenses/.

# -*- coding: utf-8 -*-

import logging
from functools import reduce

from tools import trace, timer, old_pamhyr_date_to_timestamp

from Model.Tools.PamhyrDB import SQLSubModel
from Model.Except import NotImplementedMethodeError
from Model.Scenario import Scenario

from Model.DIFAdisTS.DIFAdisTSSpec import DIFAdisTSSpec

logger = logging.getLogger()


class DIFAdisTS(SQLSubModel):
    """One row of the ``dif_adists`` table plus its spec entries.

    The object holds a name, a method, three numeric parameters
    (``dif``, ``b``, ``c``), an ``enabled`` flag and a list of
    :class:`DIFAdisTSSpec` children stored in ``dif_adists_spec``.

    NOTE(review): the physical meaning of ``dif``, ``b`` and ``c`` is
    not visible from this file -- confirm against the solver doc.
    """

    _sub_classes = [
        DIFAdisTSSpec,
    ]

    def __init__(self, id: int = -1, name: str = "default",
                 status=None, owner_scenario=-1):
        """Create an enabled, empty entry.

        Args:
            id: Identifier forwarded to the parent class.
            name: Display name of the entry.
            status: Study status object; kept in ``self._status`` and
                used to read ``scenario_id`` when saving.
            owner_scenario: Owner scenario forwarded to the parent.
        """
        super(DIFAdisTS, self).__init__(
            id=id, status=status,
            owner_scenario=owner_scenario
        )

        self._status = status

        self._name = name
        self._method = None
        self._dif = None
        self._b = None
        self._c = None
        self._enabled = True
        # Method names exposed through the ``types`` property.
        self._types = ["iwasa", "fisher", "elder", "constante", "generique"]
        # Child DIFAdisTSSpec objects.
        self._data = []

    @classmethod
    def _db_create(cls, execute, ext=""):
        """Create the ``dif_adists{ext}`` table.

        Args:
            execute: Callable running one SQL statement.
            ext: Suffix appended to the table name. A non-empty suffix
                is used for temporary tables, in which case the
                sub-model tables are not created.

        Returns:
            ``True`` for a suffixed table, otherwise the result of
            ``cls._create_submodel(execute)``.
        """
        execute(f"""
          CREATE TABLE dif_adists{ext}(
            {cls.create_db_add_pamhyr_id()},
            name TEXT NOT NULL,
            method TEXT NOT NULL,
            dif REAL NOT NULL,
            b REAL,
            c REAL,
            enabled BOOLEAN NOT NULL,
            {Scenario.create_db_add_scenario()},
            {Scenario.create_db_add_scenario_fk()}
          )
        """)

        if ext != "":
            return True

        return cls._create_submodel(execute)

    @classmethod
    def _db_update(cls, execute, version, data=None):
        """Upgrade the table from database version *version*.

        Args:
            execute: Callable running one SQL statement.
            version: Version string of the form ``"major.minor.release"``.
            data: Shared update context forwarded to the helpers.

        Returns:
            ``True`` when the table had to be created from scratch,
            otherwise the result of ``cls._update_submodel``.
        """
        major, minor, release = version.strip().split(".")
        created = False

        # Databases older than 0.x.6 (x <= 1) do not have the table yet.
        if major == "0" and int(minor) <= 1:
            if int(release) < 6:
                cls._db_create(execute)
                created = True

        # A freshly created table already has the 0.2.0 layout.
        if major == "0" and int(minor) < 2:
            if not created:
                cls._db_update_to_0_2_0(execute, data)

        if not created:
            return cls._update_submodel(execute, version, data)

        return True

    @classmethod
    def _db_update_to_0_2_0(cls, execute, data):
        """Migrate ``dif_adists`` to the 0.2.0 layout.

        Adds the pamhyr id and scenario columns, then rebuilds the table
        through a ``_tmp`` copy so that it matches ``_db_create``.
        """
        table = "dif_adists"

        cls.update_db_add_pamhyr_id(execute, table, data)
        Scenario.update_db_add_scenario(execute, table)

        cls._db_create(execute, ext="_tmp")

        execute(
            f"INSERT INTO {table}_tmp " +
            "(pamhyr_id, name, method, dif, b, c, enabled, scenario) " +
            "SELECT pamhyr_id, name, method, dif, b, c, enabled, scenario " +
            f"FROM {table}"
        )

        execute(f"DROP TABLE {table}")
        execute(f"ALTER TABLE {table}_tmp RENAME TO {table}")

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load the entries visible from ``data['scenario']``.

        Rows of the current scenario whose pamhyr id is not already in
        ``data['loaded_pid']`` are loaded first, then the method
        recurses on ``scenario.parent`` until it is ``None``.

        Args:
            execute: Callable running one SQL statement and returning
                the rows (or ``None``).
            data: Load context. Reads ``'status'``, ``'scenario'`` and
                ``'loaded_pid'``; writes ``'dif_default_id'`` before
                loading the children and restores ``'scenario'`` before
                returning.

        Returns:
            List of loaded :class:`DIFAdisTS` objects.
        """
        new = []
        status = data['status']
        scenario = data["scenario"]
        loaded = data['loaded_pid']

        if scenario is None:
            return new

        table = execute(
            "SELECT pamhyr_id, name, method, dif, b, c, enabled, scenario " +
            "FROM dif_adists " +
            f"WHERE scenario = {scenario.id} " +
            f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) "
        )

        if table is not None:
            for row in table:
                it = iter(row)

                pid = next(it)
                name = next(it)
                method = next(it)
                dif = next(it)
                b = next(it)
                c = next(it)
                enabled = (next(it) == 1)
                owner_scenario = next(it)

                # Keep the instance in its own variable: reusing ``dif``
                # would overwrite the column value read above.
                new_dif = cls(
                    id=pid,
                    name=name,
                    status=status,
                    owner_scenario=owner_scenario,
                )

                new_dif.method = method
                new_dif.dif = dif
                new_dif.b = b
                new_dif.c = c
                new_dif.enabled = enabled

                # Children are selected through this context key.
                data['dif_default_id'] = pid
                new_dif._data = DIFAdisTSSpec._db_load(execute, data)

                loaded.add(pid)
                new.append(new_dif)

        # Complete with entries inherited from the parent scenario.
        data["scenario"] = scenario.parent
        new += cls._db_load(execute, data)
        data["scenario"] = scenario

        return new

    def _db_save(self, execute, data=None):
        """Write this entry and its children for the current scenario.

        The existing row and the existing ``dif_adists_spec`` rows for
        this entry and scenario are deleted, then re-inserted.

        Args:
            execute: Callable running one SQL statement.
            data: Save context; ``'dif_default_id'`` is set to this
                entry's id before the children are saved.

        Returns:
            ``True``.
        """
        if not self.must_be_saved():
            return True

        execute(
            "DELETE FROM dif_adists " +
            f"WHERE pamhyr_id = {self.id} " +
            f"AND scenario = {self._status.scenario_id} "
        )

        # Unset values are stored as "" / -1. so that no ``None`` ends
        # up in the SQL text.
        method = ""
        if self.method is not None:
            method = self.method

        dif = -1.
        if self.dif is not None:
            dif = self.dif

        b = -1.
        if self.b is not None:
            b = self.b

        c = -1.
        if self.c is not None:
            c = self.c

        sql = (
            "INSERT INTO " +
            "dif_adists(" +
            "pamhyr_id, name, method, dif, b, c, enabled, scenario" +
            ") " +
            "VALUES (" +
            f"{self.id}, '{self._db_format(self._name)}', " +
            f"'{self._db_format(method)}', " +
            f"{dif}, {b}, {c}, {self._enabled}, " +
            f"{self._status.scenario_id}" +
            ")"
        )

        execute(sql)

        data['dif_default_id'] = self.id
        execute(
            "DELETE FROM dif_adists_spec " +
            f"WHERE dif_default = {self.id} " +
            f"AND scenario = {self._status.scenario_id} "
        )

        for dif_spec in self._data:
            dif_spec._db_save(execute, data)

        return True

    def _data_traversal(self,
                        predicate=lambda obj, data: True,
                        modifier=lambda obj, data: None,
                        data=None):
        """Apply *modifier* to this object and its children.

        Args:
            predicate: Called as ``predicate(obj, data)``; the modifier
                is applied to this object only when it returns true.
            modifier: Called as ``modifier(obj, data)``.
            data: Context passed to both callables. A fresh dict is
                used when omitted, so state never leaks between calls.
        """
        if data is None:
            data = {}

        if predicate(self, data):
            modifier(self, data)

        for d in self._data:
            d._data_traversal(predicate, modifier, data)

    def __len__(self):
        """Return the number of child spec entries."""
        return len(self._data)

    @property
    def name(self):
        """Name of the entry."""
        return self._name

    @name.setter
    def name(self, name):
        self._name = name
        self.modified()

    @property
    def method(self):
        """Method name, or ``None`` when unset."""
        return self._method

    @method.setter
    def method(self, method):
        self._method = method
        self.modified()

    @property
    def types(self):
        """List of known method names."""
        return self._types

    @property
    def dif(self):
        """Value of the ``dif`` column, or ``None`` when unset."""
        return self._dif

    @dif.setter
    def dif(self, dif):
        self._dif = dif
        self.modified()

    @property
    def b(self):
        """Value of the ``b`` column, or ``None`` when unset."""
        return self._b

    @b.setter
    def b(self, b):
        self._b = b
        self.modified()

    @property
    def c(self):
        """Value of the ``c`` column, or ``None`` when unset."""
        return self._c

    @c.setter
    def c(self, c):
        self._c = c
        self.modified()

    @property
    def enabled(self):
        """Whether the entry is enabled."""
        return self._enabled

    @enabled.setter
    def enabled(self, enabled):
        self._enabled = enabled
        self.modified()

    def new(self, index):
        """Create a child spec, insert it at *index* and return it."""
        n = DIFAdisTSSpec(status=self._status)
        self._data.insert(index, n)
        self.modified()
        return n

    def delete(self, data):
        """Flag every element of *data* as deleted."""
        for x in data:
            x.set_as_deleted()

        self.modified()

    def delete_i(self, indexes):
        """Flag the children whose position is in *indexes* as deleted."""
        for i, x in enumerate(self._data):
            if i in indexes:
                x.set_as_deleted()

        self.modified()

    def insert(self, index, data):
        """Insert *data* at *index*, or undelete it if already a child."""
        if data in self._data:
            self.undelete([data])
        else:
            self._data.insert(index, data)

        self.modified()

    def undelete(self, lst):
        """Clear the deleted flag on every element of *lst*."""
        for x in lst:
            x.set_as_not_deleted()

        self.modified()