# SedimentLayer.py -- Pamhyr # Copyright (C) 2024-2025 INRAE # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # -*- coding: utf-8 -*- from functools import reduce from tools import trace, timer from Model.Tools.PamhyrDB import SQLSubModel from Model.Scenario import Scenario from Model.Except import NotImplementedMethodeError class Layer(SQLSubModel): _sub_classes = [] def __init__(self, id: int = -1, name: str = "", type="", height=0.0, d50=0.0, sigma=0.0, critical_constraint=0.0, sl=None, status=None, owner_scenario=-1): super(Layer, self).__init__( id=id, status=status, owner_scenario=owner_scenario ) self._sl = sl self._name = name self._type = type self._height = height self._d50 = d50 self._sigma = sigma self._critical_constraint = critical_constraint @property def name(self): if self._name == "": return f"Layer #{self.pamhyr_id}" return self._name @name.setter def name(self, name): self._name = name self.modified() @property def type(self): return self._type @type.setter def type(self, type): self._type = type self.modified() @property def height(self): return self._height @height.setter def height(self, height): self._height = float(height) self.modified() @property def d50(self): return self._d50 @d50.setter def d50(self, d50): self._d50 = float(d50) self.modified() @property def sigma(self): return self._sigma @sigma.setter def sigma(self, sigma): self._sigma = float(sigma) self.modified() @property def critical_constraint(self): return self._critical_constraint @critical_constraint.setter def critical_constraint(self, critical_constraint): self._critical_constraint = float(critical_constraint) self.modified() @classmethod def _db_create(cls, execute, ext=""): execute(f""" CREATE TABLE sedimentary_layer_layer{ext} ( {cls.create_db_add_pamhyr_id()}, deleted BOOLEAN NOT NULL DEFAULT FALSE, ind INTEGER NOT NULL, name TEXT NOT NULL, type TEXT NOT NULL, height REAL NOT NULL, d50 REAL NOT NULL, sigma REAL NOT NULL, critical_constraint REAL NOT NULL, sl INTEGER, {Scenario.create_db_add_scenario()}, {Scenario.create_db_add_scenario_fk()}, FOREIGN KEY(sl) REFERENCES sedimentary_layer(pamhyr_id), PRIMARY KEY(pamhyr_id, scenario) ) """) return cls._create_submodel(execute) @classmethod def _db_update(cls, execute, version, data=None): major, minor, release = version.strip().split(".") if major == minor == "0": if int(release) < 2: cls._db_create(execute) return True else: cls._db_update_to_0_2_0(execute, data) if major == "0" and minor == "1": if int(release) < 2: execute( "ALTER TABLE sedimentary_layer_layer " + "ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE" ) return cls._update_submodel(execute, version, data) @classmethod def _db_update_to_0_2_0(cls, execute, data): table = "sedimentary_layer_layer" sl = data['id2pid']['sedimentary_layer'] cls.update_db_add_pamhyr_id(execute, table, data) Scenario.update_db_add_scenario(execute, table) cls._db_create(execute, ext="_tmp") execute( f"INSERT INTO {table}_tmp " + "(pamhyr_id, ind, name, type, height, " + "d50, sigma, critical_constraint, sl, scenario) " + "SELECT pamhyr_id, ind, name, type, height, " + "d50, sigma, critical_constraint, sl, scenario " + f"FROM {table}" ) execute(f"DROP TABLE {table}") execute(f"ALTER TABLE {table}_tmp RENAME TO {table}") cls._db_update_to_0_2_0_set_sl_pid(execute, table, sl) @classmethod def _db_update_to_0_2_0_set_sl_pid(cls, execute, table, sl): els = execute( f"SELECT pamhyr_id, sl FROM {table}" ) if els is None: return True for row in els: it = iter(row) pid = next(it) sl_id = next(it) execute( f"UPDATE {table} " + f"SET sl = {sl[sl_id]} " + f"WHERE pamhyr_id = {pid}" ) @classmethod def _db_load(cls, execute, data=None): new = [] sl = data["sl"] scenario = data["scenario"] loaded = data['loaded_pid'] if scenario is None: return new table = execute( "SELECT pamhyr_id, deleted, name, type, height, " + "d50, sigma, critical_constraint, scenario " + "FROM sedimentary_layer_layer " + f"WHERE sl = {sl.pamhyr_id} " + f"AND scenario = {scenario.id} " + f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) " + "ORDER BY ind ASC" ) for row in table: it = iter(row) pid = next(it) deleted = (next(it) == 1) name = next(it) type = next(it) height = next(it) d50 = next(it) sigma = next(it) critical_constraint = next(it) owner_scenario = next(it) layer = cls( id=pid, name=name, type=type, height=height, d50=d50, sigma=sigma, critical_constraint=critical_constraint, sl=sl, status=data['status'], owner_scenario=owner_scenario ) if deleted: layer.set_as_deleted() loaded.add(pid) new.append(layer) data["scenario"] = scenario.parent new += cls._db_load(execute, data) data["scenario"] = scenario return new def _db_save(self, execute, data=None): if not self.must_be_saved(): return True ind = data["ind"] sl = data["sl"] execute( "INSERT INTO " + "sedimentary_layer_layer(" + "pamhyr_id, deleted, ind, name, type, height, " + "d50, sigma, critical_constraint, sl, scenario" + ") " + "VALUES (" + f"{self.pamhyr_id}, {self._db_format(self.is_deleted())}, " + f"{ind}, '{self._db_format(self._name)}', " + f"'{self._db_format(self._type)}', {self._height}, " + f"{self._d50}, {self._sigma}, {self._critical_constraint}, " + f"{sl.pamhyr_id}, " + f"{self._status.scenario_id}" + ")" ) return True def modified(self): super(Layer, self).modified() self._sl.modified() class SedimentLayer(SQLSubModel): _sub_classes = [Layer] def __init__(self, id: int = -1, name: str = "", comment: str = "", status=None, owner_scenario=-1): super(SedimentLayer, self).__init__( id=id, status=status, owner_scenario=owner_scenario ) self._name = name self._comment = comment self._layers = [] def __str__(self): s = f"{self.name} ({len(self)})" if self.comment != "": s += f" - {self.comment}" return s def __len__(self): return len( list( filter( lambda el: not el.is_deleted(), self._layers ) ) ) @property def layers(self): return list( filter( lambda el: not el.is_deleted(), self._layers ) ) def height(self): return list( map(lambda layer: layer.height, self.layers) ) @property def name(self): if self._name == "": return f"SL #{self.pamhyr_id}" return self._name @name.setter def name(self, name): self._name = name self.modified() def names(self): return list( map( lambda layer: layer.name, self.layers ) ) @property def comment(self): return self._comment @comment.setter def comment(self, comment): self._comment = comment self.modified() @classmethod def _db_create(cls, execute, ext=""): execute(f""" CREATE TABLE sedimentary_layer{ext} ( {cls.create_db_add_pamhyr_id()}, deleted BOOLEAN NOT NULL DEFAULT FALSE, name TEXT NOT NULL, comment TEXT NOT NULL, {Scenario.create_db_add_scenario()}, {Scenario.create_db_add_scenario_fk()}, PRIMARY KEY(pamhyr_id, scenario) ) """) if ext == "_tmp": return True return cls._create_submodel(execute) @classmethod def _db_update(cls, execute, version, data=None): major, minor, release = version.strip().split(".") if major == minor == "0": if int(release) < 2: cls._db_create(execute) return True else: cls._db_update_to_0_2_0(execute, data) if major == "0" and minor == "1": if int(release) < 2: execute( "ALTER TABLE sedimentary_layer " + "ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE" ) return cls._update_submodel(execute, version, data) @classmethod def _db_update_to_0_2_0(cls, execute, data): table = "sedimentary_layer" cls.update_db_add_pamhyr_id(execute, table, data) Scenario.update_db_add_scenario(execute, table) cls._db_create(execute, ext="_tmp") execute( f"INSERT INTO {table}_tmp " + "(pamhyr_id, name, comment, scenario) " + "SELECT pamhyr_id, name, comment, scenario " + f"FROM {table}" ) execute(f"DROP TABLE {table}") execute(f"ALTER TABLE {table}_tmp RENAME TO {table}") @classmethod def _db_load(cls, execute, data=None): new = [] scenario = data["scenario"] loaded = data['loaded_pid'] if scenario is None: return new table = execute( "SELECT pamhyr_id, deleted, name, comment, scenario " + "FROM sedimentary_layer " + f"WHERE scenario = {scenario.id} " + f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) " ) if table is not None: for row in table: it = iter(row) pid = next(it) deleted = (next(it) == 1) name = next(it) comment = next(it) owner_scenario = next(it) sl = cls( id=pid, name=name, comment=comment, status=data['status'], owner_scenario=owner_scenario ) if deleted: sl.set_as_deleted() data["sl"] = sl sl._layers = Layer._db_load(execute, data) loaded.add(pid) new.append(sl) data["scenario"] = scenario.parent new += cls._db_load(execute, data) data["scenario"] = scenario return new def _db_save(self, execute, data=None): if not self.must_be_saved(): return True execute( "INSERT INTO sedimentary_layer (" + "pamhyr_id, deleted, name, comment, scenario" + ") " + "VALUES (" + f"{self._pamhyr_id}, {self._db_format(self.is_deleted())}, " + f"'{self._db_format(self._name)}', " + f"'{self._db_format(self._comment)}', " + f"{self._status.scenario_id}" + ")" ) data["sl"] = self ind = 0 for layer in self._layers: data["ind"] = ind layer._db_save(execute, data) ind += 1 return True def _data_traversal(self, predicate=lambda obj, data: True, modifier=lambda obj, data: None, data={}): if predicate(self, data): modifier(self, data) for layer in self._layers: layer._data_traversal(predicate, modifier, data) def get(self, index): return self.layers[index] # def set(self, index, new): # self._layers[index] = new # self.modified() def insert(self, index, new): self._layers.insert(index, new) self.modified() def new(self, index): n = Layer(sl=self, status=self._status) self.insert(index, n) self.modified() return n def undelete(self, els): for el in els: el.set_as_not_deleted() self.modified() def delete(self, els): for el in els: el.set_as_deleted() self.modified() def delete_i(self, indexes): list( map( lambda e: e[1].set_as_deleted(), filter( lambda e: e[0] in indexes, enumerate(self._layers) ) ) ) self.modified() def move_up(self, index): if index >= 0: next = index - 1 lst = self._layers lst[index], lst[next] = lst[next], lst[index] self.modified() def move_down(self, index): if index + 1 < len(self._layers): prev = index + 1 lst = self._layers lst[index], lst[prev] = lst[prev], lst[index] self.modified() def compute_height_from_bottom(self, bottom_elevation: list): sl_height = self.height() sl_height_by_profile = [] for i in range(len(sl_height)): cur_profile = [] for _ in bottom_elevation: cur_profile.append(sl_height[i]) sl_height_by_profile.append(cur_profile) z_sl = reduce( lambda acc, current_sl: acc + [ list( map( lambda cur_sl_h, prev_sl_h: prev_sl_h - cur_sl_h, current_sl, acc[-1] ) ) ], sl_height_by_profile, [bottom_elevation] ) return z_sl