# Study.py -- Pamhyr Study class
# Copyright (C) 2023-2025  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import os
import shutil
import logging
from datetime import datetime

from tools import timer, timestamp
from functools import reduce

from Model.Tools.PamhyrDB import SQLModel
from Model.Scenarios import Scenarios
from Model.Scenario import Scenario
from Model.Status import StudyStatus
from Model.Except import NotImplementedMethodeError
from Model.River import River

from Checker.Study import *

logger = logging.getLogger()


class Study(SQLModel):
    """Pamhyr study: metadata, scenarios and river data backed by a DB file."""

    _version = "0.2.1"

    _sub_classes = [
        Scenario,
        River,
    ]

    def __init__(self, filename=None, init_new=True, copy=False):
        """Build a study.

        Args:
            filename: Path of the study database file (may be None).
            init_new: When True, create a default scenario and an empty
                river; otherwise open ``filename`` as an existing database.
            copy: When True, return immediately without initialising
                anything; the caller (see ``_copy``) fills the attributes.
        """
        if copy:
            return

        # Metadata
        self.creation_date = datetime.now()
        self.last_modification_date = datetime.now()
        self.last_save_date = datetime.now()

        self._filename = filename
        super(Study, self).__init__(filename=filename)

        self.status = StudyStatus()

        # Study general information
        self._name = ""
        self.description = ""

        # Time system
        self._time_system = "time"
        self._date = datetime.fromtimestamp(0)

        if init_new:
            # Study data
            s0 = Scenario(
                id=0, name='default',
                description='Default scenario',
            )
            self.scenarios = Scenarios(status=self.status)
            self.scenarios[0] = s0
            self.status.scenario = s0

            self._river = River(status=self.status)
        else:
            self._init_db_file(filename, is_new=False)

        # Index of the next backup copy written by save()
        self._old_save_id = 0
        # Rivers already loaded by load_scenario(), keyed by scenario
        self._river_scenario_cache = {}

    @classmethod
    def checkers(cls):
        """Return the list of checker instances applied to a study."""
        lst = [
            StudyNetworkReachChecker(),
            StudyGeometryChecker(),
            StudyInitialConditionsChecker(),
            # StudyBoundaryConditionChecker(),
            # DummyOK(),
            # DummyWARNING(),
            # DummyERROR(),
        ]

        return lst

    @property
    def river(self):
        """River object of the current study."""
        return self._river

    def is_editable(self):
        """Return the editable flag reported by the study status."""
        return self.status.is_editable()

    def is_read_only(self):
        """Return the read-only flag reported by the study status."""
        return self.status.is_read_only()

    @property
    def is_saved(self):
        """Saved flag reported by the study status."""
        return self.status.is_saved()

    def save(self, progress=None):
        """Back up the existing database file, then save the study.

        A copy of the file currently at ``self.filename`` (if any) is
        written to ``<dir>/_PAMHYR_/__old__/<name>.<n>`` before saving.

        Args:
            progress: Optional zero-argument callable forwarded to
                ``_save``, which calls it after each step.
        """
        # Save a copy of database
        fdir, fname = os.path.split(self.filename)

        if self._old_save_id == 0:
            # First save of this session: start from an empty backup dir
            old_dir = os.path.join(fdir, "_PAMHYR_", "__old__")
            if os.name == "nt":
                old_dir = old_dir.replace("/", "\\")

            try:
                os.makedirs(old_dir)
            except FileExistsError:
                shutil.rmtree(old_dir)
                os.makedirs(old_dir)
            except Exception as e:
                logger.error(e)

        fname = fname + "." + str(self._old_save_id)
        if os.path.exists(self.filename):
            filename = os.path.join(fdir, "_PAMHYR_", "__old__", fname)
            logger.debug(f"Backup previous version copy: {filename}")
            shutil.copy(self.filename, filename)
            self._old_save_id += 1

        is_new = not os.path.exists(self.filename)

        # NOTE(review): the indentation of the two calls below was lost in
        # the flattened source; they are placed after the existence check
        # because an ``is_new`` flag is only meaningful when it can still
        # be False here. Confirm against the upstream file.
        self._init_db_file(self.filename, is_new=is_new)
        self.commit()

        # Save
        self.last_save_date = datetime.now()
        self._save(progress=progress)
        self.status.save()

    @property
    def name(self):
        """Study name."""
        return self._name

    @name.setter
    def name(self, name):
        self._name = str(name)
        self.status.modified()

    @property
    def filename(self):
        """Path of the study database file."""
        return self._filename

    @filename.setter
    def filename(self, filename):
        if filename is None:
            self._filename = None
            self.status.modified()
            return

        if self._filename is not None and self._filename != "":
            # A file name was already set: only record the new path
            self._filename = str(filename)
            self.status.modified()
            return

        # First file name given to this study: initialise the database
        self._filename = str(filename)
        self._init_db_file(filename, is_new=True)
        self.status.modified()

    @property
    def time_system(self):
        """Current time system, ``"time"`` or ``"date"``."""
        return self._time_system

    def use_time(self):
        """Switch the study to the ``"time"`` time system."""
        self._time_system = "time"
        self.status.modified()

    def use_date(self, date: datetime):
        """Switch the study to the ``"date"`` time system using ``date``."""
        self._time_system = "date"
        self._date = date
        self.status.modified()

    @property
    def date(self):
        """Reference date of the study."""
        return self._date

    @date.setter
    def date(self, timestamp):
        self._date = timestamp
        self.status.modified()

    # @classmethod
    # def new(cls):
    #     return cls()

    @classmethod
    def new(cls, name, description, date=None):
        """Create a new study.

        Args:
            name: Study name.
            description: Study description.
            date: Optional reference date; when given the study uses the
                ``"date"`` time system.

        Returns:
            The new study.
        """
        me = cls()
        me.name = name
        me.description = description
        if date is not None:
            # use_date() requires the date; it also stores it in _date
            me.use_date(date)
        return me

    @classmethod
    def open(cls, filename):
        """Load and return the study stored in ``filename``."""
        me = cls._load(filename)
        return me

    #######
    # SQL #
    #######

    def _db_insert_into_info(self, key, value, commit=False):
        """Insert a ``(key, value)`` row into the ``info`` table."""
        self.execute(
            "INSERT INTO info VALUES " +
            f"('{key}', '{self._db_format(value)}')",
            commit=commit
        )

    def _create(self):
        """Create the ``info`` table, fill it, then create the sub-models."""
        # Info (metadata)
        self.execute(
            "CREATE TABLE info(key TEXT NOT NULL UNIQUE, value TEXT NOT NULL)"
        )
        self.execute(
            "INSERT INTO info VALUES ('current_scenario', '0')"
        )
        self.execute(
            "INSERT INTO info VALUES ('version', " +
            f"'{self._db_format(self._version)}')",
            commit=True
        )
        self.execute("INSERT INTO info VALUES ('name', '')")
        self.execute("INSERT INTO info VALUES ('description', '')")
        self.execute(
            f"INSERT INTO info VALUES ('time_system', '{self._time_system}')"
        )
        self.execute(
            "INSERT INTO info VALUES ('date', " +
            f"'{timestamp(self._date)}')"
        )

        self.execute(
            "INSERT INTO info VALUES ('creation_date', " +
            f"'{timestamp(self.creation_date)}')"
        )
        self.execute(
            "INSERT INTO info VALUES ('last_save_date', " +
            f"'{timestamp(self.last_save_date)}')"
        )

        self._create_submodel()
        self.commit()

    def _update(self):
        """Update the database scheme to ``self._version`` if needed.

        Returns:
            True when the database is already at, or was updated to, the
            current version.

        Raises:
            NotImplementedMethodeError: when the sub-model update fails.
        """
        version = self.execute(f"SELECT value FROM info WHERE key='version'")

        logger.debug(f"{version[0]} == {self._version}")
        if version[0] == self._version:
            return True

        logger.info(f"Update database from {version[0]} to {self._version}")

        major, minor, release = version[0].split('.')
        if major == "0" and minor == "0" and int(release) < 10:
            self.execute(
                "INSERT INTO info VALUES ('study_release', '0')"
            )

        before_0_2 = major == "0" and int(minor) < 2

        if before_0_2:
            self._add_into_info_if_not_exists('current_scenario', '0')

            # Need to temporarily disable the SQLite foreign keys
            # checking to update the db and change the table id fk to
            # table pamhyr_id fk
            self.execute(
                "PRAGMA foreign_keys = OFF;"
            )

        try:
            ok = self._update_submodel(version[0], data={})
        finally:
            if before_0_2:
                # Reactivate foreign keys checking, even if the
                # sub-model update raised
                self.execute(
                    "PRAGMA foreign_keys = ON;"
                )

        if ok:
            self.execute(
                f"UPDATE info SET value='{self._version}' WHERE key='version'"
            )
            logger.info("Update database done.")
            return True

        logger.info("Update failed!")
        raise NotImplementedMethodeError(self, self._update)

    def _add_into_info_if_not_exists(self, key, value):
        """Insert ``(key, value)`` into ``info`` unless ``key`` is present."""
        rows = self.execute(f"SELECT value FROM info WHERE key='{key}'")

        if rows is None or len(rows) == 0:
            self.execute(
                "INSERT INTO info VALUES " +
                f"('{key}', '{self._db_format(value)}')"
            )

    def _has_child_scenario(self, scenario):
        """Return True if any known scenario has ``scenario`` as parent."""
        return any(s.parent is scenario for s in self.scenarios.lst)

    @classmethod
    def _load(cls, filename):
        """Open ``filename`` and rebuild the study from its database.

        Returns:
            The loaded study, with its status marked as saved.
        """
        new = cls(init_new=False, filename=filename)

        def sql_exec(sql):
            return new.execute(
                sql, fetch_one=False, commit=True
            )

        version = new.execute(
            "SELECT value FROM info WHERE key='study_release'"
        )
        # The query may yield no row (see _add_into_info_if_not_exists)
        if version is not None and len(version) > 0:
            new.status.version = int(version[0])

        # TODO: Load metadata
        new.name = new.execute("SELECT value FROM info WHERE key='name'")[0]
        new.description = new.execute(
            "SELECT value FROM info WHERE key='description'")[0]
        new._time_system = new.execute(
            "SELECT value FROM info WHERE key='time_system'")[0]
        new._date = datetime.fromtimestamp(
            float(new.execute("SELECT value FROM info WHERE key='date'")[0])
        )

        new.creation_date = datetime.fromtimestamp(
            float(
                new.execute(
                    "SELECT value FROM info WHERE key='creation_date'"
                )[0]
            )
        )
        new.last_save_date = datetime.fromtimestamp(
            float(
                new.execute(
                    "SELECT value FROM info WHERE key='last_save_date'"
                )[0]
            )
        )

        data = {
            "study": new,
            "status": new.status
        }

        # Scenarios
        new.scenarios = Scenarios._db_load(
            sql_exec,
            data=data
        )

        scenario_id = new.execute(
            "SELECT value FROM info WHERE key='current_scenario'"
        )
        logger.debug(f"Load with scenario {scenario_id[0]}")

        scenario = new.scenarios[int(scenario_id[0])]
        new.status.scenario = scenario

        # A scenario that is the parent of another one is read only
        if new._has_child_scenario(scenario):
            new.status.set_as_read_only()

        data["scenario"] = scenario
        data["loaded_pid"] = set()

        # Load river data
        new._river = River._db_load(
            sql_exec,
            data=data
        )
        new._river._db_load_results(sql_exec, data=data)

        new.status._set_as_saved()
        return new

    def _save(self, progress=None):
        """Write the study metadata and sub-models to the database.

        Args:
            progress: Optional zero-argument callable invoked after each
                metadata update and forwarded to ``_save_submodel``.
        """
        progress = progress if progress is not None else lambda: None

        self.execute(
            "INSERT OR REPLACE INTO info VALUES ('study_release', " +
            f"'{self.status.version}')"
        )

        self.execute(
            f"UPDATE info SET " +
            f"value='{self._db_format(self.name)}' WHERE key='name'"
        )
        progress()
        self.execute(
            f"UPDATE info SET " +
            f"value='{self._db_format(self.description)}' " +
            "WHERE key='description'"
        )
        progress()
        self.execute(
            f"UPDATE info SET " +
            f"value='{self._time_system}' WHERE key='time_system'"
        )
        progress()
        self.execute(
            f"UPDATE info SET " +
            f"value='{timestamp(self._date)}' WHERE key='date'"
        )
        progress()
        self.execute(
            f"UPDATE info SET " +
            f"value='{timestamp(self.creation_date)}' " +
            "WHERE key='creation_date'"
        )
        progress()
        self.execute(
            f"UPDATE info SET " +
            f"value='{timestamp(self.last_save_date)}' " +
            "WHERE key='last_save_date'"
        )
        progress()
        self.execute(
            f"UPDATE info SET " +
            f"value='{self.status.scenario_id}' " +
            "WHERE key='current_scenario'"
        )
        progress()

        self._save_submodel(
            [self.scenarios, self._river],
            data=progress
        )
        self.commit()

    def sql_save_request_count(self, *args, **kargs):
        """Return the number of progress steps of a save (see ``_count``)."""
        return self._count()

    def _count(self):
        """Return the sub-model save count plus the 7 steps of ``_save``."""
        cnt = self._save_count(
            [self.scenarios, self._river]
        )
        logger.debug(cnt)
        # 7 == number of progress() calls made directly by _save()
        return cnt + 7

    def close(self):
        """Close db connection

        Returns:
            Nothing.
        """
        self._close()

    def new_scenario_from_current(self, switch=True):
        """Create a scenario whose parent is the current scenario.

        Args:
            switch: When True, make the new scenario the current one.

        Returns:
            The new scenario.
        """
        new = self.scenarios.new(self.status.scenario)

        if switch:
            self.status.scenario = new
            self.status.set_as_editable()

        return new

    def reload_from_scenario(self, scenario):
        """Make ``scenario`` the current scenario and load its river."""
        river = self.load_scenario(scenario)

        self._river = river
        self.status.scenario = scenario

        # A scenario that is the parent of another one is read only
        if self._has_child_scenario(scenario):
            self.status.set_as_read_only()
        else:
            self.status.set_as_editable()

    def load_scenario(self, scenario):
        """Return the river of ``scenario``, loading it on first request.

        The current scenario of ``self.status`` is switched to
        ``scenario`` during loading and restored afterwards, including
        when loading raises.
        """
        if scenario in self._river_scenario_cache:
            return self._river_scenario_cache[scenario]

        def sql_exec(sql):
            return self.execute(
                sql, fetch_one=False, commit=True
            )

        old_scenario = self.status.scenario
        self.status.scenario = scenario

        try:
            data = {
                "status": self.status,
                "loaded_pid": set(),
                "scenario": scenario
            }

            # Load river data
            river = River._db_load(
                sql_exec,
                data=data
            )

            data["study"] = self
            river._db_load_results(sql_exec, data=data)

            self._river_scenario_cache[scenario] = river
        finally:
            self.status.scenario = old_scenario

        return river

    def copy_from_scenario(self, scenario):
        """Return a shell copy of the study holding the river of ``scenario``.

        Every object visited by the river data traversal gets its
        ``_status`` set to the status of the copy.
        """
        new = self._copy()

        new._river = self.load_scenario(scenario)

        def set_status(obj):
            obj._status = new.status

        new.river._data_traversal(
            modifier=lambda obj, data: set_status(obj),
        )

        return new

    def _copy(self):
        """
        This method makes a copy of the current study. This copy is like
        an empty shell, it is not fully functional. A Study object uses
        an SQLite connection to a file, this copy has no valid
        connection.

        (!) Please use this copy as a read only object!
        """
        new = Study(copy=True)

        new._filename = ""
        new.creation_date = self.creation_date
        new.last_modification_date = self.last_modification_date
        new.last_save_date = self.last_save_date

        new._name = self._name
        new.description = self.description

        new._time_system = self._time_system
        new._date = self._date

        new.status = StudyStatus()
        new.scenarios = self.scenarios
        new.status.scenario = self.status.scenario

        new._river = self._river
        new._river_scenario_cache = self._river_scenario_cache

        return new

    def duplicate_current_scenario(self):
        """Duplicate the current scenario as a sibling and switch to it.

        The new scenario shares the parent of the current one, is placed
        100 units further on x and y, and is named ``"<name> (copy)"``.
        Objects owned by the source scenario have ``set_owner_scenario()``
        called on them once the new scenario is current.

        Returns:
            The new scenario.
        """
        source = self.status.scenario_id
        new = self.scenarios.new(
            self.status.scenario.parent
        )
        new.set_pos(self.status.scenario.x + 100,
                    self.status.scenario.y + 100)
        new.name = self.status.scenario.name + " (copy)"
        self.status.scenario = new

        self.river._data_traversal(
            predicate=lambda obj, data: obj._owner_scenario == source,
            modifier=lambda obj, data: obj.set_owner_scenario(),
            data={}
        )

        self.status.set_as_editable()

        return new

    @property
    def results(self):
        """Results held by the river."""
        return self._river.results

    @results.setter
    def results(self, results):
        self._river.results = results