Pamhyr2/src/Model/Study.py

324 lines
8.7 KiB
Python

# Study.py -- Pamhyr Study class
# Copyright (C) 2023 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import os
import shutil
import logging
from datetime import datetime
from tools import timer, timestamp
from Model.Tools.PamhyrDB import SQLModel
from Model.Saved import SavedStatus
from Model.Serializable import Serializable
from Model.Except import NotImplementedMethodeError
from Model.River import River
from Checker.Study import *
# Module-level logger shared by everything in this file. Called without a
# name, logging.getLogger() returns the root logger.
logger = logging.getLogger()
class Study(SQLModel):
    """A Pamhyr study: general metadata plus one River, backed by SQL.

    The metadata (version, name, description, time system, reference date,
    creation date and last save date) is stored as key/value text rows in
    the ``info`` table. The river is persisted through the sub-model
    helpers inherited from SQLModel.
    """

    _sub_classes = [
        River,
    ]

    def __init__(self, filename=None, init_new=True):
        """Build a study object.

        Args:
            filename: Path of the study database file, or None.
            init_new: When true, an empty River is created in memory.
                Otherwise ``_init_db_file(filename, is_new=False)`` is
                called and the river is expected to be set by the caller
                (see ``_load``).
        """
        # Metadata
        self._version = "0.0.6"
        self.creation_date = datetime.now()
        self.last_modification_date = datetime.now()
        self.last_save_date = datetime.now()

        self._filename = filename
        super().__init__(filename=filename)

        self.status = SavedStatus()

        # Study general information
        self._name = ""
        self.description = ""

        # Time system: "time" (default) or "date" (see use_time/use_date)
        self._time_system = "time"
        self._date = datetime.fromtimestamp(0)

        if init_new:
            # Study data
            self._river = River(status=self.status)
        else:
            self._init_db_file(filename, is_new=False)

        # Number of backup copies already written by save() on this
        # instance; also used as the suffix of the next backup file.
        self._old_save_id = 0

    @classmethod
    def checkers(cls):
        """Return a new list with one instance of each study checker."""
        return [
            StudyNetworkReachChecker(),
            StudyGeometryChecker(),
            # DummyOK(),
            # DummyWARNING(),
            # DummyERROR(),
        ]

    @property
    def river(self):
        """The River object of this study."""
        return self._river

    @property
    def is_saved(self):
        """Result of ``status.is_saved()`` for this study."""
        return self.status.is_saved()

    def save(self, progress=None):
        """Back up the current database file, then save the study into it.

        A copy of the file is written to ``<dir>/_PAMHYR_/__old__/`` as
        ``<name>.<n>`` where ``n`` counts the saves done on this instance.
        On the first save the backup directory is emptied and recreated.

        Args:
            progress: Optional callable without argument, forwarded to
                ``_save`` and called after each save step.
        """
        fdir, fname = os.path.split(self.filename)

        if self._old_save_id == 0:
            old_dir = os.path.join(fdir, "_PAMHYR_", "__old__")
            if os.name == "nt":
                old_dir = old_dir.replace("/", "\\")

            try:
                os.makedirs(old_dir)
            except FileExistsError:
                # Backups left by a previous run: start from an empty dir.
                shutil.rmtree(old_dir)
                os.makedirs(old_dir)
            except Exception as e:
                # Best effort: the error is only logged here. If the
                # directory is missing, the copy below will raise.
                logger.error(e)

        backup_name = fname + "." + str(self._old_save_id)
        backup_path = os.path.join(fdir, "_PAMHYR_", "__old__", backup_name)
        shutil.copy(self.filename, backup_path)
        self._old_save_id += 1

        # Save
        self.last_save_date = datetime.now()
        self._save(progress=progress)
        self.status.save()

    @property
    def name(self):
        """Name of the study."""
        return self._name

    @name.setter
    def name(self, name):
        self._name = str(name)
        self.status.modified()

    @property
    def filename(self):
        """Path of the study database file (may be None)."""
        return self._filename

    @filename.setter
    def filename(self, filename):
        # Changing the file name also calls _init_db_file(..., is_new=True)
        # on the new path.
        self._filename = str(filename)
        self._init_db_file(filename, is_new=True)
        self.status.modified()

    @property
    def time_system(self):
        """Current time system: ``"time"`` or ``"date"``."""
        return self._time_system

    def use_time(self):
        """Switch the study to the ``"time"`` time system."""
        self._time_system = "time"
        self.status.modified()

    def use_date(self, date: datetime):
        """Switch the study to the ``"date"`` time system.

        Args:
            date: Reference date stored in the study.
        """
        self._time_system = "date"
        self._date = date
        self.status.modified()

    @property
    def date(self):
        """Reference date of the study."""
        return self._date

    @date.setter
    def date(self, timestamp):
        # NOTE: the parameter name shadows the imported ``timestamp``
        # helper inside this setter only; the helper is not used here.
        self._date = timestamp
        self.status.modified()

    @classmethod
    def new(cls, name, description, date=None):
        """Create a new in-memory study.

        Args:
            name: Name of the study.
            description: Free text description.
            date: Optional reference date. When given, the study uses the
                ``"date"`` time system with this date.

        Returns:
            The new study.
        """
        me = cls()
        me.name = name
        me.description = description
        if date is not None:
            # use_date() requires the date and stores it itself.
            me.use_date(date)
        return me

    @classmethod
    def open(cls, filename):
        """Load and return the study stored in ``filename``."""
        return cls._load(filename)

    #######
    # SQL #
    #######

    # NOTE(review): every statement below is built by string formatting.
    # Free text goes through ``_db_format`` first (presumably escaping;
    # verify in SQLModel). Switching to bound parameters needs support
    # from ``SQLModel.execute``, which is not visible from this file.

    def _db_insert_into_info(self, key, value, commit=False):
        """Insert a ``(key, value)`` row into the ``info`` table.

        Args:
            key: Row key, inserted as is.
            value: Row value, passed through ``_db_format``.
            commit: Forwarded to ``execute``.
        """
        self.execute(
            "INSERT INTO info VALUES " +
            f"('{key}', '{self._db_format(value)}')",
            commit=commit
        )

    def _create(self):
        """Create the ``info`` table, fill it, then create sub-models."""
        # Info (metadata)
        self.execute(
            "CREATE TABLE info(key TEXT NOT NULL UNIQUE, value TEXT NOT NULL)")
        self.execute(
            "INSERT INTO info VALUES ('version', " +
            f"'{self._db_format(self._version)}')",
            commit=True
        )
        self.execute("INSERT INTO info VALUES ('name', '')")
        self.execute("INSERT INTO info VALUES ('description', '')")
        self.execute(
            f"INSERT INTO info VALUES ('time_system', '{self._time_system}')")

        # Dates are stored with the same ``timestamp`` helper as in _save,
        # so that _load can parse them back with float().
        self.execute(
            "INSERT INTO info VALUES ('date', " +
            f"'{timestamp(self._date)}')"
        )
        self.execute(
            "INSERT INTO info VALUES ('creation_date', " +
            f"'{timestamp(self.creation_date)}')"
        )
        self.execute(
            "INSERT INTO info VALUES ('last_save_date', " +
            f"'{timestamp(self.last_save_date)}')"
        )

        self._create_submodel()
        self.commit()

    def _update(self):
        """Bring the database to the version of this class.

        Returns:
            True when the stored version already matches or the sub-models
            were updated.

        Raises:
            NotImplementedMethodeError: When the sub-model update fails.
        """
        version = self.execute("SELECT value FROM info WHERE key='version'")

        logger.debug(f"{version[0]} == {self._version}")
        if version[0] == self._version:
            return True

        logger.info("Update database")
        if self._update_submodel(version[0]):
            self.execute(
                f"UPDATE info SET value='{self._version}' WHERE key='version'")
            return True

        logger.info("TODO: update failed")
        raise NotImplementedMethodeError(self, self._update)

    @classmethod
    def _load(cls, filename):
        """Build a study from the database file ``filename``.

        Returns:
            The loaded study, with its metadata and river.
        """
        new = cls(init_new=False, filename=filename)

        def info(key):
            # First element of the result for one ``info`` row.
            return new.execute(
                f"SELECT value FROM info WHERE key='{key}'")[0]

        # TODO: Load metadata
        # NOTE(review): assigning ``name`` goes through the property
        # setter, which calls status.modified() - confirm this is intended
        # for a freshly loaded study.
        new.name = info("name")
        new.description = info("description")
        new._time_system = info("time_system")
        new._date = datetime.fromtimestamp(float(info("date")))
        new.creation_date = datetime.fromtimestamp(
            float(info("creation_date")))
        new.last_save_date = datetime.fromtimestamp(
            float(info("last_save_date")))

        # Load river data
        new._river = River._db_load(
            lambda sql: new.execute(
                sql,
                fetch_one=False,
                commit=True
            ),
            data={"status": new.status}
        )

        return new

    def _save(self, progress=None):
        """Write the metadata rows, then the river, then commit.

        Args:
            progress: Optional callable without argument, called once
                after each metadata row and forwarded to the sub-model
                save.
        """
        progress = progress if progress is not None else lambda: None

        # One UPDATE per row. The number of rows must stay in sync with
        # the constant added in _count().
        rows = (
            ("name", self._db_format(self.name)),
            ("description", self._db_format(self.description)),
            ("time_system", self._time_system),
            ("date", timestamp(self._date)),
            ("creation_date", timestamp(self.creation_date)),
            ("last_save_date", timestamp(self.last_save_date)),
        )
        for key, value in rows:
            self.execute(
                f"UPDATE info SET value='{value}' WHERE key='{key}'"
            )
            progress()

        self._save_submodel([self._river], data=progress)
        self.commit()

    def sql_save_request_count(self):
        """Return the number of steps reported by ``_count``."""
        return self._count()

    def _count(self):
        """Return the river save count plus the 6 metadata updates."""
        cnt = self._save_count([self._river])
        logger.debug(cnt)
        # 6 == number of ``info`` rows updated by _save()
        return cnt + 6

    def close(self):
        """Close db connection

        Returns:
            Nothing.
        """
        self._close()