# PamhyrDB.py -- Pamhyr abstract model database classes
# Copyright (C) 2023-2024  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import os
import sqlite3
import logging

from pathlib import Path
from functools import reduce

from SQL import SQL
from Model.Except import NotImplementedMethodeError
from Model.Tools.PamhyrID import PamhyrID

logger = logging.getLogger()


# Top level model class

class SQLModel(SQL):
    """Abstract top level model bound to a SQLite database file.

    Concrete models list their sub model classes in ``_sub_classes``
    and implement ``_create``, ``_update``, ``_save``, ``_count`` and
    ``_load``.
    """

    _sub_classes = []

    def _init_db_file(self, db, is_new=True):
        """Open the database file and create or update its scheme.

        Args:
            db: Path of the SQLite database file
            is_new: If True, any existing file is removed and the
                scheme is created with ``_create``; otherwise the
                existing scheme is updated with ``_update``

        Returns:
            Nothing
        """
        # Close a previously opened connection before replacing it
        if self._db is not None:
            self._db.close()

        exists = Path(db).exists()
        if exists and is_new:
            os.remove(db)

        self._db = sqlite3.connect(db, check_same_thread=False)
        self._cur = self._db.cursor()

        if is_new:
            logger.info("Create database")
            self._create()      # Create db
            # self._save()        # Save
        else:
            logger.info("Update database")
            self._update()      # Update db scheme if necessary
            # self._load()        # Load data

    def __init__(self, filename=None):
        # The connection is opened later by '_init_db_file'
        self._db = None

    def _create_submodel(self):
        """Create the data base scheme of every sub model class.

        Requests are executed without intermediate commit; a single
        commit is made once every sub class has been processed.

        Returns:
            True
        """
        def fn(sql):
            return self.execute(
                sql,
                fetch_one=False,
                commit=False
            )

        for cls in self._sub_classes:
            cls._db_create(fn)

        self.commit()
        return True

    def _create(self):
        raise NotImplementedMethodeError(self, self._create)

    def _update_submodel(self, version, data=None):
        """Update the data base scheme of every sub model class.

        Args:
            version: Current database version
            data: Optional data forwarded to each ``_db_update``

        Returns:
            True if every sub class update returned a true value,
            otherwise False
        """
        def fn(sql):
            return self.execute(
                sql,
                fetch_one=False,
                commit=False
            )

        ok = True
        for cls in self._sub_classes:
            ok &= cls._db_update(fn, version, data=data)

        self.commit()
        return ok

    def _update(self):
        raise NotImplementedMethodeError(self, self._update)

    def _save_submodel(self, objs, data=None):
        """Save each object of objs, then commit.

        Args:
            objs: Iterable of objects providing ``_db_save``
            data: Optional callable without argument, called after
                each executed SQL request (progress notification)

        Returns:
            True if every ``_db_save`` returned a true value,
            otherwise False
        """
        progress = data if data is not None else lambda: None

        def fn(sql):
            res = self.execute(
                sql,
                fetch_one=False,
                commit=False
            )
            progress()
            return res

        ok = True
        for obj in objs:
            # Each object receives its own empty data dictionary
            ok &= obj._db_save(fn, data={})

        self.commit()
        return ok

    def _save(self, progress=None):
        raise NotImplementedMethodeError(self, self._save)

    def _count(self):
        raise NotImplementedMethodeError(self, self._count)

    def _save_count(self, objs, data=None):
        """Count the SQL requests a save of objs would issue.

        The ``_db_save`` method of each object is called with a fake
        execute function which only classifies the request (insert,
        update, delete or other) and always returns an empty list:
        nothing is sent to the data base.

        Args:
            objs: Iterable of objects providing ``_db_save``
            data: Optional data forwarded to each ``_db_save``; a new
                empty dictionary is used when omitted

        Returns:
            Total number of requests
        """
        # Use a fresh dictionary for each call instead of a mutable
        # default argument shared between calls
        if data is None:
            data = {}

        counter = {
            "insert": 0,
            "update": 0,
            "delete": 0,
            "other": 0,
        }

        def fn(sql):
            request = sql.lower()

            # First matching keyword wins, in this order
            for kind in ("insert", "update", "delete"):
                if kind in request:
                    counter[kind] += 1
                    break
            else:
                counter["other"] += 1

            return []

        for obj in objs:
            obj._db_save(fn, data=data)

        logger.debug(counter)

        return sum(counter.values())

    @classmethod
    def _load(cls, filename=None):
        raise NotImplementedMethodeError(cls, cls._load)


# Sub model class

class SQLSubModel(PamhyrID):
    """Abstract sub model object which can be saved in the data base.

    Concrete sub models list their own sub classes in ``_sub_classes``
    and implement ``_db_create``, ``_db_update``, ``_db_load`` and
    ``_db_save``.
    """

    _sub_classes = []

    def __init__(self, id: int = -1, status=None,
                 owner_scenario=-1, **kwargs):
        super(SQLSubModel, self).__init__(id=id, **kwargs)

        self._status = status

        # Deletion status of the model object. This status MUST be set
        # to True if an object that exists in a parent scenario is
        # deleted in current scenario.
        self._deleted = False

        # The 'owner_scenario' is the id of the scenario to which the
        # object belongs. This id CAN be different to current
        # scenario, but in case of object modification, this id MUST
        # be set to current scenario id. (This action is made in
        # 'modified' method.)
        self._owner_scenario = 0
        if owner_scenario == -1:
            if status is not None:
                self._owner_scenario = self._status.scenario_id
        else:
            self._owner_scenario = owner_scenario

    def must_be_saved(self):
        """Return True if this object MUST be saved in the save file.

        Returns:
            True if the object owner scenario is the current scenario
            of the study status, otherwise False
        """
        return self._owner_scenario == self._status.scenario_id

    def modified(self):
        """Set study status to modified and update the object
        owner_scenario to current scenario

        Nothing is done if the object has no status.

        Returns:
            Nothing
        """
        if self._status is None:
            return

        self._owner_scenario = self._status.scenario_id
        self._status.modified()

    def is_deleted(self):
        """This object is deleted?

        Returns:
            True if this object is deleted, otherwise False
        """
        return self._deleted

    def set_as_deleted(self):
        """Set object deleted flag to True.

        Returns:
            Nothing
        """
        self._deleted = True
        self.modified()

    def set_as_not_deleted(self):
        """Set object deleted flag to False.

        Returns:
            Nothing
        """
        self._deleted = False
        self.modified()

    def _db_format(self, value):
        """Format a Python value to be inserted in a SQL request.

        Args:
            value: The value to format

        Returns:
            The string with single quotes doubled for a str, 'TRUE'
            or 'FALSE' for a bool, "NULL" for None, otherwise the
            value unchanged
        """
        if isinstance(value, str):
            # Double each single quote (standard SQL escaping) so the
            # value cannot close the enclosing string literal: this
            # prevents broken requests and SQL injection
            return value.replace("'", "''")

        if isinstance(value, bool):
            return 'TRUE' if value else 'FALSE'

        if value is None:
            return "NULL"

        return value

    @classmethod
    def _create_submodel(cls, execute):
        """Create the data base scheme of every sub class.

        Args:
            execute: Function to exec SQL request
        """
        for sc in cls._sub_classes:
            sc._db_create(execute)

    @classmethod
    def _db_create(cls, execute):
        """Create data base scheme

        Args:
            execute: Function to exec SQL request

        Returns:
            Return true, otherwise false if an issue appears
        """
        raise NotImplementedMethodeError(cls, cls._db_create)

    @classmethod
    def _update_submodel(cls, execute, version, data=None):
        """Update the data base scheme of every sub class.

        Args:
            execute: Function to exec SQL request
            version: Current database version
            data: Optional data forwarded to each ``_db_update``

        Returns:
            True if every sub class update returned a true value,
            otherwise False
        """
        ok = True
        for sc in cls._sub_classes:
            ok &= sc._db_update(execute, version, data)

        return ok

    @classmethod
    def _db_update(cls, execute, version, data=None):
        """Update data base scheme

        Args:
            execute: Function to exec SQL request
            version: Current database version

        Returns:
            Return true, otherwise false if an issue appears
        """
        raise NotImplementedMethodeError(cls, cls._db_update)

    @classmethod
    def _db_update_to_0_1_0_set_node_pid(cls, execute, table, nodes):
        """Replace the value of column 'node' using the nodes mapping.

        Rows whose node value is -1 are left untouched.

        Args:
            execute: Function to exec SQL request
            table: Name of the table to update
            nodes: Mapping from current node value to new node value
        """
        els = execute(
            f"SELECT pamhyr_id, node FROM {table}"
        )

        for pid, node_id in els:
            if node_id == -1:
                continue

            execute(
                f"UPDATE {table} " +
                f"SET node = {nodes[node_id]} " +
                f"WHERE pamhyr_id = {pid}"
            )

    @classmethod
    def _db_update_to_0_1_0_set_reach_pid(cls, execute, table, reachs):
        """Replace the value of column 'reach' using the reachs mapping.

        Rows whose reach value is -1 are left untouched.

        Args:
            execute: Function to exec SQL request
            table: Name of the table to update
            reachs: Mapping from current reach value to new reach value
        """
        els = execute(
            f"SELECT pamhyr_id, reach FROM {table}"
        )

        for pid, reach_id in els:
            if reach_id == -1:
                continue

            execute(
                f"UPDATE {table} " +
                f"SET reach = {reachs[reach_id]} " +
                f"WHERE pamhyr_id = {pid}"
            )

    @classmethod
    def _db_update_to_0_1_1_assoc_section_from_rk(
            cls, execute, table,
            reach_column="reach",
            rk_column="rk",
            section_column="section",
            origin_version="0.1.0"):
        """Set the section column from the (reach, rk) pair of each row.

        For each row of table, the 'geometry_profileXYZ' table is
        searched for a profile with the same reach and rk. The
        pamhyr_id of the first result (or -1 if there is none) is
        written in section_column. Rows whose reach is -1 or NULL are
        left untouched.

        Args:
            execute: Function to exec SQL request
            table: Name of the table to update
            reach_column: Name of the reach column in table
            rk_column: Name of the rk column in table
            section_column: Name of the column to set in table
            origin_version: Version of the scheme of table; with
                "0.0.*" the row key column is 'id' instead of
                'pamhyr_id'
        """
        kid = "id" if origin_version == "0.0.*" else "pamhyr_id"

        els = execute(
            "SELECT " +
            f"{kid}, {reach_column}, {rk_column} " +
            f"FROM {table}"
        )

        for pid, reach_id, rk in els:
            if reach_id == -1 or reach_id is None:
                continue

            section = execute(
                f"SELECT pamhyr_id FROM geometry_profileXYZ " +
                f"WHERE reach == {reach_id} AND rk == {rk}"
            )

            # -1 means that no profile matches this reach and rk
            section_id = section[0][0] if section else -1

            logger.info(
                f"Update reach rk {rk}({reach_id}) to pid {section_id}"
            )

            execute(
                f"UPDATE {table} " +
                f"SET {section_column} = {section_id} " +
                f"WHERE {kid} = {pid}"
            )

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load instance of this class from SQL data base

        Args:
            execute: Function to exec SQL request
            data: Optional data for the class constructor

        Returns:
            Return new instance of class
        """
        raise NotImplementedMethodeError(cls, cls._db_load)

    def _save_submodel(self, execute, objs, data=None):
        """Save each object of objs with the given execute function.

        Args:
            execute: Function to exec SQL request
            objs: Iterable of objects providing ``_db_save``
            data: Optional additional information for save

        Returns:
            True if every ``_db_save`` returned a true value,
            otherwise False
        """
        ok = True
        for o in objs:
            ok &= o._db_save(execute, data=data)

        return ok

    def _db_save(self, execute, data=None):
        """Save class data to data base

        Args:
            execute: Function to exec SQL request
            data: Optional additional information for save

        Returns:
            Return true, otherwise false if an issue appears during
            save
        """
        raise NotImplementedMethodeError(self, self._db_save)