Pamhyr2/src/Model/Study.py

542 lines
15 KiB
Python

# Study.py -- Pamhyr Study class
# Copyright (C) 2023-2025 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import os
import shutil
import logging
from datetime import datetime
from tools import timer, timestamp
from functools import reduce
from Model.Tools.PamhyrDB import SQLModel
from Model.Scenarios import Scenarios
from Model.Scenario import Scenario
from Model.Status import StudyStatus
from Model.Except import NotImplementedMethodeError
from Model.River import River
from Checker.Study import *
# Module-level logger. Calling getLogger() without a name returns the
# root logger, so records emitted here go through the root configuration.
logger = logging.getLogger()
class Study(SQLModel):
    """Pamhyr study: metadata, scenarios and river data stored in a database.

    The study keeps its general information (name, description, time
    system, dates) in an ``info`` key/value table and delegates the rest
    of the persistence to its sub-models (scenarios and river).
    """

    _version = "0.2.1"

    _sub_classes = [
        Scenario,
        River,
    ]

    # Keys of the ``info`` table rewritten by `_save`, in write order.
    # One progress step is reported per key, so `_count` uses the length
    # of this tuple to stay in sync with `_save`.
    _saved_info_keys = (
        "name",
        "description",
        "time_system",
        "date",
        "creation_date",
        "last_save_date",
        "current_scenario",
    )

    def __init__(self, filename=None, init_new=True):
        """Create a study.

        Args:
            filename: Path of the study database file, or None.
            init_new: When true, create a default scenario (id 0) and an
                empty river. Otherwise open the existing database file
                with ``_init_db_file(filename, is_new=False)``; scenarios
                and river are then expected to be filled in by `_load`.
        """
        # Metadata
        self.creation_date = datetime.now()
        self.last_modification_date = datetime.now()
        self.last_save_date = datetime.now()

        self._filename = filename
        super().__init__(filename=filename)

        self.status = StudyStatus()

        # Study general information
        self._name = ""
        self.description = ""

        # Time system
        self._time_system = "time"
        self._date = datetime.fromtimestamp(0)

        if init_new:
            # Study data
            default_scenario = Scenario(
                id=0, name='default',
                description='Default scenario',
            )
            self.scenarios = Scenarios(status=self.status)
            self.scenarios[0] = default_scenario
            self.status.scenario = default_scenario

            self._river = River(status=self.status)
        else:
            self._init_db_file(filename, is_new=False)

        # Number of backup copies made by `save` for this instance
        self._old_save_id = 0
        # River data already loaded, keyed by scenario
        self._river_scenario_cache = {}

    @classmethod
    def checkers(cls):
        """Return the list of checker instances applied to a study."""
        return [
            StudyNetworkReachChecker(),
            StudyGeometryChecker(),
            StudyInitialConditionsChecker(),
            # StudyBoundaryConditionChecker(),
            # DummyOK(),
            # DummyWARNING(),
            # DummyERROR(),
        ]

    @property
    def river(self):
        """River data currently attached to the study."""
        return self._river

    def is_editable(self):
        """Return the editable flag reported by the study status."""
        return self.status.is_editable()

    def is_read_only(self):
        """Return the read-only flag reported by the study status."""
        return self.status.is_read_only()

    @property
    def is_saved(self):
        """Saved flag reported by the study status."""
        return self.status.is_saved()

    def save(self, progress=None):
        """Back up the existing database file, then save the study.

        If the study file already exists it is first copied to
        ``<study dir>/_PAMHYR_/__old__/<file name>.<n>`` where ``n`` is
        the number of backups already made by this instance. The backup
        directory is emptied before the first backup.

        Args:
            progress: Optional callable invoked without arguments after
                each save step.
        """
        # Save a copy of database
        fdir, fname = os.path.split(self.filename)

        if self._old_save_id == 0:
            # No backup made yet by this instance: start from an empty
            # backup directory.
            old_dir = os.path.join(fdir, "_PAMHYR_", "__old__")
            if os.name == "nt":
                old_dir = old_dir.replace("/", "\\")

            try:
                os.makedirs(old_dir)
            except FileExistsError:
                shutil.rmtree(old_dir)
                os.makedirs(old_dir)
            except Exception as e:
                logger.error(e)

        file_exists = os.path.exists(self.filename)
        if file_exists:
            backup_name = fname + "." + str(self._old_save_id)
            filename = os.path.join(fdir, "_PAMHYR_", "__old__", backup_name)
            logger.debug(f"Backup previous version copy: {filename}")
            shutil.copy(self.filename, filename)
            self._old_save_id += 1

        # The database is new only when there was no file to back up
        self._init_db_file(self.filename, is_new=not file_exists)
        self.commit()

        # Save
        self.last_save_date = datetime.now()
        self._save(progress=progress)
        self.status.save()

    @property
    def name(self):
        """Study name."""
        return self._name

    @name.setter
    def name(self, name):
        self._name = str(name)
        self.status.modified()

    @property
    def filename(self):
        """Path of the study database file (may be None)."""
        return self._filename

    @filename.setter
    def filename(self, filename):
        # The database file is initialised only when the study gets its
        # first filename; a later rename just records the new path.
        had_filename = self._filename is not None and self._filename != ""

        if filename is None:
            self._filename = None
        else:
            self._filename = str(filename)
            if not had_filename:
                self._init_db_file(filename, is_new=True)

        self.status.modified()

    @property
    def time_system(self):
        """Current time system: ``"time"`` or ``"date"``."""
        return self._time_system

    def use_time(self):
        """Switch the study to the ``"time"`` time system."""
        self._time_system = "time"
        self.status.modified()

    def use_date(self, date: datetime):
        """Switch the study to the ``"date"`` time system.

        Args:
            date: Value stored as the study date.
        """
        self._time_system = "date"
        self._date = date
        self.status.modified()

    @property
    def date(self):
        """Study date."""
        return self._date

    @date.setter
    def date(self, date):
        self._date = date
        self.status.modified()

    @classmethod
    def new(cls, name, description, date=None):
        """Create a new study.

        Args:
            name: Study name.
            description: Study description.
            date: Optional study date. When given, the study uses the
                ``"date"`` time system with this value.

        Returns:
            The new study.
        """
        me = cls()
        me.name = name
        me.description = description

        if date is not None:
            # use_date() requires the date and stores it itself
            me.use_date(date)

        return me

    @classmethod
    def open(cls, filename):
        """Load and return the study stored in ``filename``."""
        return cls._load(filename)

    #######
    # SQL #
    #######

    def _db_select_info(self, key):
        """Run the ``info`` table lookup for ``key`` and return its result."""
        return self.execute(f"SELECT value FROM info WHERE key='{key}'")

    def _db_insert_into_info(self, key, value, commit=False):
        """Insert a ``(key, value)`` row into the ``info`` table.

        The value goes through ``_db_format`` before being embedded in
        the SQL statement.
        """
        self.execute(
            "INSERT INTO info VALUES " +
            f"('{key}', '{self._db_format(value)}')",
            commit=commit
        )

    def _create(self):
        """Create the ``info`` table, fill it and create the sub-models."""
        # Info (metadata)
        self.execute(
            "CREATE TABLE info(key TEXT NOT NULL UNIQUE, value TEXT NOT NULL)"
        )
        self.execute(
            "INSERT INTO info VALUES ('current_scenario', '0')"
        )
        self.execute(
            "INSERT INTO info VALUES ('version', " +
            f"'{self._db_format(self._version)}')",
            commit=True
        )
        self.execute("INSERT INTO info VALUES ('name', '')")
        self.execute("INSERT INTO info VALUES ('description', '')")
        self.execute(
            f"INSERT INTO info VALUES ('time_system', '{self._time_system}')"
        )
        self.execute(
            "INSERT INTO info VALUES ('date', " +
            f"'{timestamp(self._date)}')"
        )
        self.execute(
            "INSERT INTO info VALUES ('creation_date', " +
            f"'{timestamp(self.creation_date)}')"
        )
        self.execute(
            "INSERT INTO info VALUES ('last_save_date', " +
            f"'{timestamp(self.last_save_date)}')"
        )

        self._create_submodel()
        self.commit()

    def _update(self):
        """Update the database schema to the current `_version`.

        Returns:
            True when the database is already up to date or has been
            updated.

        Raises:
            NotImplementedMethodeError: If the sub-model update reports a
                failure.
        """
        version = self._db_select_info('version')

        logger.debug(f"{version[0]} == {self._version}")
        if version[0] == self._version:
            return True

        logger.info(f"Update database from {version[0]} to {self._version}")

        major, minor, release = version[0].split('.')
        if major == "0" and minor == "0" and int(release) < 10:
            self.execute(
                "INSERT INTO info VALUES ('study_release', '0')"
            )

        before_0_2 = major == "0" and int(minor) < 2
        if before_0_2:
            self._add_into_info_if_not_exists('current_scenario', '0')

            # Need to temporarily disable the sqlite foreign keys
            # checking to update the db and change the table id fk to
            # table pamhyr_id fk
            self.execute(
                "PRAGMA foreign_keys = OFF;"
            )

        try:
            ok = self._update_submodel(version[0], data={})
        finally:
            if before_0_2:
                # Reactivate foreign keys checking, even if the
                # sub-model update raised
                self.execute(
                    "PRAGMA foreign_keys = ON;"
                )

        if ok:
            self.execute(
                f"UPDATE info SET value='{self._version}' WHERE key='version'"
            )
            logger.info("Update database done.")
            return True

        logger.error("Update failed!")
        raise NotImplementedMethodeError(self, self._update)

    def _add_into_info_if_not_exists(self, key, value):
        """Insert ``(key, value)`` into ``info`` unless ``key`` is present."""
        rows = self._db_select_info(key)

        if rows is None or len(rows) == 0:
            self.execute(
                f"INSERT INTO info VALUES ('{key}', '{value}')"
            )

    def _has_child_scenario(self, scenario):
        """Return True if any known scenario has ``scenario`` as parent."""
        return any(s.parent is scenario for s in self.scenarios.lst)

    @classmethod
    def _load(cls, filename):
        """Build a study from the database file ``filename``.

        Loads the ``info`` metadata, the scenarios, then the river data
        and results of the scenario recorded as current. The study is
        set read-only when that scenario is the parent of another one,
        and is marked as saved before being returned.
        """
        new = cls(init_new=False, filename=filename)

        def sql_exec(sql):
            return new.execute(
                sql, fetch_one=False, commit=True
            )

        release = new._db_select_info('study_release')
        if release is not None:
            new.status.version = int(release[0])

        # TODO: Load metadata
        new.name = new._db_select_info('name')[0]
        new.description = new._db_select_info('description')[0]
        new._time_system = new._db_select_info('time_system')[0]
        new._date = datetime.fromtimestamp(
            float(new._db_select_info('date')[0])
        )
        new.creation_date = datetime.fromtimestamp(
            float(new._db_select_info('creation_date')[0])
        )
        new.last_save_date = datetime.fromtimestamp(
            float(new._db_select_info('last_save_date')[0])
        )

        data = {
            "study": new,
            "status": new.status
        }

        # Scenarios
        new.scenarios = Scenarios._db_load(
            sql_exec,
            data=data
        )

        scenario_id = new._db_select_info('current_scenario')
        logger.debug(f"Load with scenario {scenario_id[0]}")

        scenario = new.scenarios[int(scenario_id[0])]
        new.status.scenario = scenario

        if new._has_child_scenario(scenario):
            new.status.set_as_read_only()

        data["scenario"] = scenario
        data["loaded_pid"] = set()

        # Load river data
        new._river = River._db_load(
            sql_exec, data=data
        )
        new._river._db_load_results(sql_exec, data=data)

        new.status._set_as_saved()
        return new

    def _save(self, progress=None):
        """Write the study metadata and sub-models to the database.

        Args:
            progress: Optional callable invoked without arguments after
                each metadata row; it is also handed to the sub-models
                as their ``data`` argument.
        """
        progress = progress if progress is not None else lambda: None

        self.execute(
            "INSERT OR REPLACE INTO info VALUES ('study_release', " +
            f"'{self.status.version}')"
        )

        values = {
            "name": self._db_format(self.name),
            "description": self._db_format(self.description),
            "time_system": self._time_system,
            "date": timestamp(self._date),
            "creation_date": timestamp(self.creation_date),
            "last_save_date": timestamp(self.last_save_date),
            "current_scenario": self.status.scenario_id,
        }

        for key in self._saved_info_keys:
            self.execute(
                f"UPDATE info SET value='{values[key]}' WHERE key='{key}'"
            )
            progress()

        self._save_submodel(
            [self.scenarios, self._river],
            data=progress
        )
        self.commit()

    def sql_save_request_count(self, *args, **kargs):
        """Return the number of progress steps of a save (see `_count`)."""
        return self._count()

    def _count(self):
        """Return the sub-model save count plus one step per info key."""
        cnt = self._save_count(
            [self.scenarios, self._river]
        )
        logger.debug(cnt)

        return cnt + len(self._saved_info_keys)

    def close(self):
        """Close db connection

        Returns:
            Nothing.
        """
        self._close()

    def new_scenario_from_current(self, switch=True):
        """Create a scenario whose parent is the current scenario.

        Args:
            switch: When true, make the new scenario the current one and
                set the study as editable.

        Returns:
            The new scenario.
        """
        new = self.scenarios.new(self.status.scenario)

        if switch:
            self.status.scenario = new
            self.status.set_as_editable()

        return new

    def reload_from_scenario(self, scenario):
        """Make ``scenario`` current and attach its river data.

        River data is taken from the per-scenario cache when available,
        otherwise it is loaded from the database and cached. The study
        becomes read-only when ``scenario`` is the parent of another
        scenario, editable otherwise.
        """
        if scenario in self._river_scenario_cache:
            self._river = self._river_scenario_cache[scenario]
            self.status.scenario = scenario
        else:
            def sql_exec(sql):
                return self.execute(
                    sql, fetch_one=False, commit=True
                )

            self.status.scenario = scenario
            data = {
                "status": self.status,
                "loaded_pid": set(),
                "scenario": scenario
            }

            # Reload river data
            river = River._db_load(
                sql_exec, data=data
            )
            data["study"] = self
            river._db_load_results(sql_exec, data=data)

            self._river_scenario_cache[scenario] = river
            self._river = river

        if self._has_child_scenario(scenario):
            self.status.set_as_read_only()
        else:
            self.status.set_as_editable()

    def duplicate_current_scenario(self):
        """Create a sibling copy of the current scenario and switch to it.

        The copy shares the current scenario's parent, is placed 100
        units further on both axes and is named ``"<name> (copy)"``.
        River objects whose owner scenario is the source scenario get
        ``set_owner_scenario()`` called on them once the copy is current.

        Returns:
            The new scenario.
        """
        current = self.status.scenario
        source = self.status.scenario_id

        new = self.scenarios.new(current.parent)
        new.set_pos(current.x + 100, current.y + 100)
        new.name = current.name + " (copy)"

        self.status.scenario = new

        self.river._data_traversal(
            predicate=lambda obj, data: obj._owner_scenario == source,
            modifier=lambda obj, data: obj.set_owner_scenario(),
            data={}
        )

        self.status.set_as_editable()
        return new

    @property
    def results(self):
        """Results held by the river data."""
        return self._river.results

    @results.setter
    def results(self, results):
        self._river.results = results