# SedimentLayer.py -- Pamhyr
# Copyright (C) 2024  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

from functools import reduce

from tools import trace, timer

from Model.Tools.PamhyrDB import SQLSubModel
from Model.Scenario import Scenario
from Model.Except import NotImplementedMethodeError


class Layer(SQLSubModel):
    """One layer of a sediment layer stack.

    Stored in the ``sedimentary_layer_layer`` table; each row references
    its owning stack through the ``sl`` column.
    """

    _sub_classes = []

    def __init__(self,
                 id: int = -1, name: str = "",
                 type="",
                 height=0.0, d50=0.0, sigma=0.0,
                 critical_constraint=0.0,
                 sl=None, status=None):
        """Create a layer.

        Args:
            id: Identifier forwarded to ``SQLSubModel.__init__``.
            name: Layer name; when empty, :attr:`name` returns a
                generated label.
            type: Layer type, stored as given.
            height: Layer height, stored as given.
            d50: Layer d50 value, stored as given.
            sigma: Layer sigma value, stored as given.
            critical_constraint: Critical constraint value, stored as
                given.
            sl: Owning sediment layer stack. Accepted for call-site
                compatibility; it is not stored on the instance.
            status: Study status object stored on the instance.
        """
        super(Layer, self).__init__(id)

        self._status = status

        self._name = name
        self._type = type

        self._height = height
        self._d50 = d50
        self._sigma = sigma
        self._critical_constraint = critical_constraint

    @property
    def name(self):
        """Layer name, or ``"Layer #<pamhyr_id>"`` when no name is set."""
        if self._name == "":
            return f"Layer #{self.pamhyr_id}"

        return self._name

    @name.setter
    def name(self, name):
        self._name = name

    @property
    def type(self):
        """Layer type, as stored."""
        return self._type

    @type.setter
    def type(self, type):
        self._type = type

    @property
    def height(self):
        """Layer height; values set through the setter are cast to float."""
        return self._height

    @height.setter
    def height(self, height):
        self._height = float(height)

    @property
    def d50(self):
        """Layer d50; values set through the setter are cast to float."""
        return self._d50

    @d50.setter
    def d50(self, d50):
        self._d50 = float(d50)

    @property
    def sigma(self):
        """Layer sigma; values set through the setter are cast to float."""
        return self._sigma

    @sigma.setter
    def sigma(self, sigma):
        self._sigma = float(sigma)

    @property
    def critical_constraint(self):
        """Critical constraint; the setter casts the value to float."""
        return self._critical_constraint

    @critical_constraint.setter
    def critical_constraint(self, critical_constraint):
        self._critical_constraint = float(critical_constraint)

    @classmethod
    def _db_create(cls, execute, ext=""):
        """Create the ``sedimentary_layer_layer{ext}`` table.

        Args:
            execute: Callable running one SQL statement.
            ext: Suffix appended to the table name (``"_tmp"`` is used
                during migration).

        Returns:
            The result of ``cls._create_submodel(execute)``.
        """
        execute(f"""
          CREATE TABLE sedimentary_layer_layer{ext} (
            {cls.create_db_add_pamhyr_id()},
            ind INTEGER NOT NULL,
            name TEXT NOT NULL,
            type TEXT NOT NULL,
            height REAL NOT NULL,
            d50 REAL NOT NULL,
            sigma REAL NOT NULL,
            critical_constraint REAL NOT NULL,
            sl INTEGER,
            {Scenario.create_db_add_scenario()},
            {Scenario.create_db_add_scenario_fk()},
            FOREIGN KEY(sl) REFERENCES sedimentary_layer(pamhyr_id),
            PRIMARY KEY(pamhyr_id, scenario)
          )
        """)

        return cls._create_submodel(execute)

    @classmethod
    def _db_update(cls, execute, version, data=None):
        """Bring the layer table up to date from database ``version``.

        Args:
            execute: Callable running one SQL statement.
            version: Database version string, ``"major.minor.release"``.
            data: Migration context forwarded to the migration helpers.

        Returns:
            The result of ``cls._update_submodel(execute, version, data)``.
        """
        major, minor, release = version.strip().split(".")

        if major == minor == "0":
            if int(release) < 2:
                # 0.0.0 / 0.0.1: the table is created from scratch.
                cls._db_create(execute)
            else:
                # Later 0.0.x: the existing table is migrated.
                cls._db_update_to_0_1_0(execute, data)

        return cls._update_submodel(execute, version, data)

    @classmethod
    def _db_update_to_0_1_0(cls, execute, data):
        """Migrate the layer table to the 0.1.0 layout.

        Adds the pamhyr id and scenario columns, copies the rows into a
        freshly created ``_tmp`` table, swaps it in place of the old
        table, then rewrites the ``sl`` references.

        Args:
            execute: Callable running one SQL statement.
            data: Migration context; ``data['id2pid']['sedimentary_layer']``
                is used to translate the ``sl`` column values.
        """
        table = "sedimentary_layer_layer"
        sl = data['id2pid']['sedimentary_layer']

        cls.update_db_add_pamhyr_id(execute, table, data)
        Scenario.update_db_add_scenario(execute, table)

        cls._db_create(execute, ext="_tmp")

        execute(
            f"INSERT INTO {table}_tmp " +
            "(pamhyr_id, ind, name, type, height, " +
            "d50, sigma, critical_constraint, sl, scenario) " +
            "SELECT pamhyr_id, ind, name, type, height, " +
            "d50, sigma, critical_constraint, sl, scenario " +
            f"FROM {table}"
        )

        execute(f"DROP TABLE {table}")
        execute(f"ALTER TABLE {table}_tmp RENAME TO {table}")

        cls._db_update_to_0_1_0_set_sl_pid(execute, table, sl)

    @classmethod
    def _db_update_to_0_1_0_set_sl_pid(cls, execute, table, sl):
        """Replace each row's ``sl`` value with ``sl[<old value>]``.

        Args:
            execute: Callable running one SQL statement.
            table: Name of the table to update.
            sl: Mapping indexed by the current ``sl`` column value and
                giving the value to store instead.
        """
        rows = execute(
            f"SELECT pamhyr_id, sl FROM {table}"
        )

        for pid, sl_id in rows:
            execute(
                f"UPDATE {table} " +
                f"SET sl = {sl[sl_id]} " +
                f"WHERE pamhyr_id = {pid}"
            )

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load the layers of one sediment layer stack, ordered by ``ind``.

        Args:
            execute: Callable running one SQL statement and returning
                the selected rows.
            data: Load context; ``data["sl"]`` is the value matched
                against the ``sl`` column and ``data['status']`` is
                passed to every created layer.

        Returns:
            The list of loaded :class:`Layer` objects.
        """
        new = []
        sl = data["sl"]

        table = execute(
            "SELECT pamhyr_id, name, type, height, " +
            "d50, sigma, critical_constraint " +
            "FROM sedimentary_layer_layer " +
            f"WHERE sl = {sl} " +
            "ORDER BY ind ASC"
        )

        for row in table:
            (pid, name, layer_type, height,
             d50, sigma, critical_constraint) = row

            layer = cls(
                id=pid, name=name,
                type=layer_type, height=height,
                d50=d50, sigma=sigma,
                critical_constraint=critical_constraint,
                sl=sl, status=data['status']
            )

            new.append(layer)

        return new

    def _db_save(self, execute, data=None):
        """Insert this layer into ``sedimentary_layer_layer``.

        Args:
            execute: Callable running one SQL statement.
            data: Save context; ``data["ind"]`` is the layer position
                and ``data["sl"]`` the owning stack (its ``pamhyr_id``
                is written to the ``sl`` column).

        Returns:
            True.
        """
        ind = data["ind"]
        sl = data["sl"]

        execute(
            "INSERT INTO " +
            "sedimentary_layer_layer(pamhyr_id, ind, name, type, height, " +
            "d50, sigma, critical_constraint, sl) " +
            "VALUES (" +
            f"{self.pamhyr_id}, {ind}, '{self._db_format(self._name)}', " +
            f"'{self._db_format(self._type)}', {self._height}, " +
            f"{self._d50}, {self._sigma}, {self._critical_constraint}, " +
            f"{sl.pamhyr_id}" +
            ")"
        )

        return True


class SedimentLayer(SQLSubModel):
    """A named, ordered stack of :class:`Layer` objects.

    Stored in the ``sedimentary_layer`` table; its layers are stored by
    :class:`Layer`.
    """

    _sub_classes = [Layer]

    def __init__(self, id: int = -1,
                 name: str = "", comment: str = "",
                 status=None):
        """Create an empty sediment layer stack.

        Args:
            id: Identifier forwarded to ``SQLSubModel.__init__``.
            name: Stack name; when empty, :attr:`name` returns a
                generated label.
            comment: Free comment.
            status: Study status object; its ``modified()`` method is
                called by every mutating method of this class.
        """
        super(SedimentLayer, self).__init__(id)

        self._status = status

        self._name = name
        self._comment = comment
        self._layers = []

    def __str__(self):
        text = f"{self.name} ({len(self)})"

        if self.comment != "":
            text += f" - {self.comment}"

        return text

    def __len__(self):
        return len(self._layers)

    @property
    def layers(self):
        """Shallow copy of the layer list."""
        return self._layers.copy()

    def height(self):
        """Return the list of layer heights, in layer order."""
        return [layer.height for layer in self._layers]

    @property
    def name(self):
        """Stack name, or ``"SL #<pamhyr_id>"`` when no name is set."""
        if self._name == "":
            return f"SL #{self.pamhyr_id}"

        return self._name

    @name.setter
    def name(self, name):
        self._name = name

    def names(self):
        """Return the list of layer names, in layer order."""
        return [layer.name for layer in self._layers]

    @property
    def comment(self):
        """Free comment attached to the stack."""
        return self._comment

    @comment.setter
    def comment(self, comment):
        self._comment = comment

    @classmethod
    def _db_create(cls, execute, ext=""):
        """Create the ``sedimentary_layer{ext}`` table.

        Args:
            execute: Callable running one SQL statement.
            ext: Suffix appended to the table name.

        Returns:
            True when ``ext`` is ``"_tmp"`` (sub-model tables are not
            created in that case), otherwise the result of
            ``cls._create_submodel(execute)``.
        """
        execute(f"""
          CREATE TABLE sedimentary_layer{ext} (
            {cls.create_db_add_pamhyr_id()},
            name TEXT NOT NULL,
            comment TEXT NOT NULL,
            {Scenario.create_db_add_scenario()},
            {Scenario.create_db_add_scenario_fk()},
            PRIMARY KEY(pamhyr_id, scenario)
          )
        """)

        if ext == "_tmp":
            return True

        return cls._create_submodel(execute)

    @classmethod
    def _db_update(cls, execute, version, data=None):
        """Bring the stack table up to date from database ``version``.

        Args:
            execute: Callable running one SQL statement.
            version: Database version string, ``"major.minor.release"``.
            data: Migration context forwarded to the migration helpers.

        Returns:
            The result of ``cls._update_submodel(execute, version, data)``.
        """
        major, minor, release = version.strip().split(".")

        if major == minor == "0":
            if int(release) < 2:
                # 0.0.0 / 0.0.1: the table is created from scratch.
                cls._db_create(execute)
            else:
                # Later 0.0.x: the existing table is migrated.
                cls._db_update_to_0_1_0(execute, data)

        return cls._update_submodel(execute, version, data)

    @classmethod
    def _db_update_to_0_1_0(cls, execute, data):
        """Migrate the stack table to the 0.1.0 layout.

        Adds the pamhyr id and scenario columns, copies the rows into a
        freshly created ``_tmp`` table and swaps it in place of the old
        table.

        Args:
            execute: Callable running one SQL statement.
            data: Migration context forwarded to
                ``update_db_add_pamhyr_id``.
        """
        table = "sedimentary_layer"

        cls.update_db_add_pamhyr_id(execute, table, data)
        Scenario.update_db_add_scenario(execute, table)

        cls._db_create(execute, ext="_tmp")

        execute(
            f"INSERT INTO {table}_tmp " +
            "(pamhyr_id, name, comment, scenario) " +
            "SELECT pamhyr_id, name, comment, scenario " +
            f"FROM {table}"
        )

        execute(f"DROP TABLE {table}")
        execute(f"ALTER TABLE {table}_tmp RENAME TO {table}")

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load every sediment layer stack together with its layers.

        Args:
            execute: Callable running one SQL statement and returning
                the selected rows.
            data: Load context; ``data['status']`` is passed to every
                created object and ``data["sl"]`` is overwritten for
                each stack before its layers are loaded.

        Returns:
            The list of loaded :class:`SedimentLayer` objects.
        """
        new = []

        table = execute(
            "SELECT pamhyr_id, name, comment " +
            "FROM sedimentary_layer "
        )

        for row in table:
            sl = cls(
                id=row[0],
                name=row[1],
                comment=row[2],
                status=data['status']
            )

            # Layer._db_load filters its rows on data["sl"].
            data["sl"] = sl.id
            sl._layers = Layer._db_load(execute, data)

            new.append(sl)

        return new

    def _db_save(self, execute, data=None):
        """Insert this stack, then each of its layers with its position.

        Args:
            execute: Callable running one SQL statement.
            data: Save context; created when None. ``data["sl"]`` and
                ``data["ind"]`` are overwritten.

        Returns:
            True.
        """
        if data is None:
            data = {}

        execute(
            "INSERT INTO sedimentary_layer (pamhyr_id, name, comment) " +
            f"VALUES ({self.pamhyr_id}, " +
            f"'{self._db_format(self._name)}', " +
            f"'{self._db_format(self._comment)}')"
        )

        data["sl"] = self

        for ind, layer in enumerate(self._layers):
            data["ind"] = ind
            layer._db_save(execute, data)

        return True

    def get(self, index):
        """Return the layer at ``index``."""
        return self._layers[index]

    def set(self, index, new):
        """Replace the layer at ``index`` with ``new``."""
        self._layers[index] = new
        self._status.modified()

    def insert(self, index, new):
        """Insert layer ``new`` at position ``index``."""
        self._layers.insert(index, new)
        self._status.modified()

    def new(self, index):
        """Create a default layer, insert it at ``index`` and return it."""
        layer = Layer(sl=self, status=self._status)
        self.insert(index, layer)
        self._status.modified()
        return layer

    def delete(self, els):
        """Remove every layer of ``els`` from the stack."""
        for el in els:
            self._layers.remove(el)

        self._status.modified()

    def delete_i(self, indexes):
        """Remove the layers located at the positions in ``indexes``."""
        els = [
            layer
            for position, layer in enumerate(self._layers)
            if position in indexes
        ]

        self.delete(els)

    def move_up(self, index):
        """Swap the layer at ``index`` with the one just before it.

        Does nothing when ``index`` designates the first layer or is
        outside the list, so the first layer is never wrapped around to
        the end of the stack.
        """
        if 0 < index < len(self._layers):
            above = index - 1

            lst = self._layers
            lst[index], lst[above] = lst[above], lst[index]
            self._status.modified()

    def move_down(self, index):
        """Swap the layer at ``index`` with the one just after it.

        Does nothing when ``index`` designates the last layer or is
        outside the list (negative indexes included).
        """
        if 0 <= index < len(self._layers) - 1:
            below = index + 1

            lst = self._layers
            lst[index], lst[below] = lst[below], lst[index]
            self._status.modified()

    def compute_height_from_bottom(self, bottom_elevation: list):
        """Stack the layer heights below ``bottom_elevation``.

        Args:
            bottom_elevation: One elevation value per point.

        Returns:
            A list whose first entry is ``bottom_elevation`` itself and
            whose following entries (one per layer, in layer order) are
            the previous entry minus that layer's height, element-wise.
        """
        z_sl = [bottom_elevation]

        for layer_height in self.height():
            z_sl.append([z - layer_height for z in z_sl[-1]])

        return z_sl