Pamhyr2/src/Model/SedimentLayer/SedimentLayer.py

555 lines
15 KiB
Python

# SedimentLayer.py -- Pamhyr
# Copyright (C) 2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
from functools import reduce
from tools import trace, timer
from Model.Tools.PamhyrDB import SQLSubModel
from Model.Scenario import Scenario
from Model.Except import NotImplementedMethodeError
class Layer(SQLSubModel):
    """A single sediment layer, stored in ``sedimentary_layer_layer``.

    A layer carries a name, a type, a height, a d50, a sigma and a
    critical constraint. Database rows are keyed by
    ``(pamhyr_id, scenario)``.
    """

    _sub_classes = []

    def __init__(self,
                 id: int = -1, name: str = "",
                 type="",
                 height=0.0, d50=0.0, sigma=0.0,
                 critical_constraint=0.0,
                 sl=None, status=None,
                 owner_scenario=-1):
        """Build a layer from its raw attribute values.

        ``id``, ``status`` and ``owner_scenario`` are forwarded to the
        parent constructor. ``sl`` is accepted but is not stored by this
        constructor.
        """
        super().__init__(
            id=id, status=status,
            owner_scenario=owner_scenario,
        )

        # Values are stored as given: unlike the setters below, the
        # constructor performs no float() conversion.
        self._name = name
        self._type = type
        self._height = height
        self._d50 = d50
        self._sigma = sigma
        self._critical_constraint = critical_constraint

    # NOTE(review): none of the setters below calls self.modified(),
    # whereas the SedimentLayer setters do -- confirm this is intended.

    @property
    def name(self):
        """Layer name, or ``Layer #<pamhyr_id>`` when no name is set."""
        if self._name == "":
            return f"Layer #{self.pamhyr_id}"

        return self._name

    @name.setter
    def name(self, name):
        self._name = name

    @property
    def type(self):
        """Layer type, as stored."""
        return self._type

    @type.setter
    def type(self, type):
        self._type = type

    @property
    def height(self):
        """Layer height."""
        return self._height

    @height.setter
    def height(self, height):
        # Assigned values are coerced to float.
        self._height = float(height)

    @property
    def d50(self):
        """Layer d50."""
        return self._d50

    @d50.setter
    def d50(self, d50):
        # Assigned values are coerced to float.
        self._d50 = float(d50)

    @property
    def sigma(self):
        """Layer sigma."""
        return self._sigma

    @sigma.setter
    def sigma(self, sigma):
        # Assigned values are coerced to float.
        self._sigma = float(sigma)

    @property
    def critical_constraint(self):
        """Layer critical constraint."""
        return self._critical_constraint

    @critical_constraint.setter
    def critical_constraint(self, critical_constraint):
        # Assigned values are coerced to float.
        self._critical_constraint = float(critical_constraint)

    @classmethod
    def _db_create(cls, execute, ext=""):
        """Create the ``sedimentary_layer_layer{ext}`` table.

        ``ext`` is appended to the table name; the migration code uses
        ``"_tmp"`` to build a temporary copy. Returns the result of
        ``_create_submodel``.
        """
        query = f"""
          CREATE TABLE sedimentary_layer_layer{ext} (
            {cls.create_db_add_pamhyr_id()},
            deleted BOOLEAN NOT NULL DEFAULT FALSE,
            ind INTEGER NOT NULL,
            name TEXT NOT NULL,
            type TEXT NOT NULL,
            height REAL NOT NULL,
            d50 REAL NOT NULL,
            sigma REAL NOT NULL,
            critical_constraint REAL NOT NULL,
            sl INTEGER,
            {Scenario.create_db_add_scenario()},
            {Scenario.create_db_add_scenario_fk()},
            FOREIGN KEY(sl) REFERENCES sedimentary_layer(pamhyr_id),
            PRIMARY KEY(pamhyr_id, scenario)
          )
        """
        execute(query)

        return cls._create_submodel(execute)

    @classmethod
    def _db_update(cls, execute, version, data=None):
        """Upgrade the layer table from database ``version``.

        ``version`` is a ``"major.minor.release"`` string.

        - 0.0.x with release < 2: the table is created and the method
          returns ``True`` immediately.
        - 0.0.x with release >= 2: the existing table is migrated with
          ``_db_update_to_0_2_0``.
        - 0.1.x with release < 2: the ``deleted`` column is added.

        NOTE(review): the ``else`` of the 0.0.x branch is paired here
        with the release check; confirm against the reference file.
        """
        major, minor, release = version.strip().split(".")

        if major == "0" and minor == "0":
            if int(release) < 2:
                cls._db_create(execute)
                return True

            cls._db_update_to_0_2_0(execute, data)

        if major == "0" and minor == "1" and int(release) < 2:
            execute(
                "ALTER TABLE sedimentary_layer_layer "
                "ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE"
            )

        return cls._update_submodel(execute, version, data)

    @classmethod
    def _db_update_to_0_2_0(cls, execute, data):
        """Rebuild the layer table with the current schema.

        The pamhyr id and scenario columns are added to the old table,
        its rows are copied into a freshly created ``_tmp`` table which
        then replaces the old one, and finally the ``sl`` column is
        rewritten through ``data['id2pid']['sedimentary_layer']``.
        """
        table = "sedimentary_layer_layer"
        sl = data['id2pid']['sedimentary_layer']

        cls.update_db_add_pamhyr_id(execute, table, data)
        Scenario.update_db_add_scenario(execute, table)

        cls._db_create(execute, ext="_tmp")

        # The same column list is used on both sides of the copy.
        columns = (
            "pamhyr_id, ind, name, type, height, "
            "d50, sigma, critical_constraint, sl, scenario"
        )
        execute(
            f"INSERT INTO {table}_tmp ({columns}) "
            f"SELECT {columns} "
            f"FROM {table}"
        )

        execute(f"DROP TABLE {table}")
        execute(f"ALTER TABLE {table}_tmp RENAME TO {table}")

        cls._db_update_to_0_2_0_set_sl_pid(execute, table, sl)

    @classmethod
    def _db_update_to_0_2_0_set_sl_pid(cls, execute, table, sl):
        """Replace each row's ``sl`` value by ``sl[<old value>]``.

        ``sl`` is indexed with the value currently stored in the ``sl``
        column and the result is written back for that ``pamhyr_id``.
        """
        rows = execute(f"SELECT pamhyr_id, sl FROM {table}")

        for pid, sl_id in rows:
            execute(
                f"UPDATE {table} "
                f"SET sl = {sl[sl_id]} "
                f"WHERE pamhyr_id = {pid}"
            )

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load the layers of ``data["sl"]`` visible from a scenario.

        Rows are read for ``data["scenario"]`` and then, recursively,
        for its parent scenarios. A ``pamhyr_id`` already present in
        ``data['loaded_pid']`` is skipped, so a row found in a scenario
        is not loaded again from one of its ancestors. Rows of one
        scenario are ordered by ``ind``.

        Returns the list of loaded layers; ``data["scenario"]`` is
        restored before returning.
        """
        layers = []
        sl = data["sl"]
        scenario = data["scenario"]
        loaded = data['loaded_pid']

        # End of the scenario chain.
        if scenario is None:
            return layers

        already_loaded = ', '.join(map(str, loaded))
        rows = execute(
            "SELECT pamhyr_id, deleted, name, type, height, "
            "d50, sigma, critical_constraint, scenario "
            "FROM sedimentary_layer_layer "
            f"WHERE sl = {sl} "
            f"AND scenario = {scenario.id} "
            f"AND pamhyr_id NOT IN ({already_loaded}) "
            "ORDER BY ind ASC"
        )

        for (pid, deleted_flag, name, layer_type, height,
             d50, sigma, critical_constraint, owner_scenario) in rows:
            layer = cls(
                id=pid, name=name,
                type=layer_type, height=height,
                d50=d50, sigma=sigma,
                critical_constraint=critical_constraint,
                sl=sl, status=data['status'],
                owner_scenario=owner_scenario
            )
            if deleted_flag == 1:
                layer.set_as_deleted()

            loaded.add(pid)
            layers.append(layer)

        # Continue with the parent scenario, then put the caller's
        # scenario back in the shared dict.
        data["scenario"] = scenario.parent
        layers += cls._db_load(execute, data)
        data["scenario"] = scenario

        return layers

    def _db_save(self, execute, data=None):
        """Insert this layer into ``sedimentary_layer_layer``.

        Nothing is written when ``must_be_saved()`` is false. The row
        position is read from ``data["ind"]`` and the owning sediment
        layer from ``data["sl"]``. Always returns ``True``.
        """
        if not self.must_be_saved():
            return True

        ind = data["ind"]
        sl = data["sl"]

        columns = (
            "pamhyr_id, deleted, ind, name, type, height, "
            "d50, sigma, critical_constraint, sl, scenario"
        )
        # Text values go through _db_format and are single-quoted.
        values = ", ".join([
            f"{self.pamhyr_id}",
            f"{self._db_format(self.is_deleted())}",
            f"{ind}",
            f"'{self._db_format(self._name)}'",
            f"'{self._db_format(self._type)}'",
            f"{self._height}",
            f"{self._d50}",
            f"{self._sigma}",
            f"{self._critical_constraint}",
            f"{sl.pamhyr_id}",
            f"{self._status.scenario_id}",
        ])

        execute(
            f"INSERT INTO sedimentary_layer_layer({columns}) "
            f"VALUES ({values})"
        )

        return True
class SedimentLayer(SQLSubModel):
    """A named, ordered stack of :class:`Layer` objects.

    Stored in the ``sedimentary_layer`` table; its layers are stored by
    :class:`Layer`. Layers flagged as deleted stay in the internal list
    but are hidden from ``layers``, ``len()``, ``height()``, ``names()``
    and ``get()``.
    """

    _sub_classes = [Layer]

    def __init__(self, id: int = -1,
                 name: str = "", comment: str = "",
                 status=None, owner_scenario=-1):
        """Build an empty sediment layer stack.

        ``id``, ``status`` and ``owner_scenario`` are forwarded to the
        parent constructor.
        """
        super().__init__(
            id=id, status=status,
            owner_scenario=owner_scenario
        )

        self._name = name
        self._comment = comment
        # Raw list of layers, deleted ones included.
        self._layers = []

    def __str__(self):
        """Return ``"<name> (<layer count>)"`` plus the comment if any."""
        s = f"{self.name} ({len(self)})"

        if self.comment != "":
            s += f" - {self.comment}"

        return s

    def __len__(self):
        """Number of layers that are not flagged as deleted."""
        return sum(1 for el in self._layers if not el.is_deleted())

    @property
    def layers(self):
        """New list of the layers that are not flagged as deleted."""
        return [el for el in self._layers if not el.is_deleted()]

    def height(self):
        """List of the heights of the non-deleted layers, in order."""
        return [layer.height for layer in self.layers]

    @property
    def name(self):
        """Stack name, or ``SL #<pamhyr_id>`` when no name is set."""
        if self._name == "":
            return f"SL #{self.pamhyr_id}"

        return self._name

    @name.setter
    def name(self, name):
        self._name = name
        self.modified()

    def names(self):
        """List of the names of the non-deleted layers, in order."""
        return [layer.name for layer in self.layers]

    @property
    def comment(self):
        """Free-text comment attached to the stack."""
        return self._comment

    @comment.setter
    def comment(self, comment):
        self._comment = comment
        self.modified()

    @classmethod
    def _db_create(cls, execute, ext=""):
        """Create the ``sedimentary_layer{ext}`` table.

        With ``ext == "_tmp"`` (temporary table used by the migration)
        only this table is created and ``True`` is returned; otherwise
        the sub-model tables are created through ``_create_submodel``.
        """
        execute(f"""
          CREATE TABLE sedimentary_layer{ext} (
            {cls.create_db_add_pamhyr_id()},
            deleted BOOLEAN NOT NULL DEFAULT FALSE,
            name TEXT NOT NULL,
            comment TEXT NOT NULL,
            {Scenario.create_db_add_scenario()},
            {Scenario.create_db_add_scenario_fk()},
            PRIMARY KEY(pamhyr_id, scenario)
          )
        """)

        if ext == "_tmp":
            return True

        return cls._create_submodel(execute)

    @classmethod
    def _db_update(cls, execute, version, data=None):
        """Upgrade the sediment layer table from database ``version``.

        ``version`` is a ``"major.minor.release"`` string.

        - 0.0.x with release < 2: the table is created and the method
          returns ``True`` immediately.
        - 0.0.x with release >= 2: the existing table is migrated with
          ``_db_update_to_0_2_0``.
        - 0.1.x with release < 2: the ``deleted`` column is added.

        NOTE(review): the ``else`` of the 0.0.x branch is paired here
        with the release check; confirm against the reference file.
        """
        major, minor, release = version.strip().split(".")

        if major == minor == "0":
            if int(release) < 2:
                cls._db_create(execute)
                return True
            else:
                cls._db_update_to_0_2_0(execute, data)

        if major == "0" and minor == "1":
            if int(release) < 2:
                execute(
                    "ALTER TABLE sedimentary_layer " +
                    "ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE"
                )

        return cls._update_submodel(execute, version, data)

    @classmethod
    def _db_update_to_0_2_0(cls, execute, data):
        """Rebuild the sediment layer table with the current schema.

        The pamhyr id and scenario columns are added to the old table,
        its rows are copied into a freshly created ``_tmp`` table, and
        that table then replaces the old one.
        """
        table = "sedimentary_layer"

        cls.update_db_add_pamhyr_id(execute, table, data)
        Scenario.update_db_add_scenario(execute, table)

        cls._db_create(execute, ext="_tmp")

        execute(
            f"INSERT INTO {table}_tmp " +
            "(pamhyr_id, name, comment, scenario) " +
            "SELECT pamhyr_id, name, comment, scenario " +
            f"FROM {table}"
        )

        execute(f"DROP TABLE {table}")
        execute(f"ALTER TABLE {table}_tmp RENAME TO {table}")

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load the sediment layers visible from ``data["scenario"]``.

        Rows are read for the scenario and then, recursively, for its
        parent scenarios. A ``pamhyr_id`` already present in
        ``data['loaded_pid']`` is skipped, so a row found in a scenario
        is not loaded again from one of its ancestors. The layers of
        each stack are loaded through ``Layer._db_load`` with
        ``data["sl"]`` set to the stack's ``id``.

        Returns the list of loaded stacks; ``data["scenario"]`` is
        restored before returning.
        """
        new = []
        scenario = data["scenario"]
        loaded = data['loaded_pid']

        # End of the scenario chain.
        if scenario is None:
            return new

        table = execute(
            "SELECT pamhyr_id, deleted, name, comment, scenario " +
            "FROM sedimentary_layer " +
            f"WHERE scenario = {scenario.id} " +
            f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) "
        )

        for pid, deleted, name, comment, owner_scenario in table:
            sl = cls(
                id=pid,
                name=name,
                comment=comment,
                status=data['status'],
                owner_scenario=owner_scenario
            )
            if deleted == 1:
                sl.set_as_deleted()

            data["sl"] = sl.id
            sl._layers = Layer._db_load(execute, data)

            loaded.add(pid)
            new.append(sl)

        # Continue with the parent scenario, then put the caller's
        # scenario back in the shared dict.
        data["scenario"] = scenario.parent
        new += cls._db_load(execute, data)
        data["scenario"] = scenario

        return new

    def _db_save(self, execute, data=None):
        """Insert this stack, then save each of its layers.

        Nothing is written when ``must_be_saved()`` is false. Each layer
        (deleted ones included) is saved with ``data["sl"]`` set to this
        object and ``data["ind"]`` set to its position in the raw list.
        Always returns ``True``.
        """
        if not self.must_be_saved():
            return True

        execute(
            "INSERT INTO sedimentary_layer (" +
            "pamhyr_id, deleted, name, comment, scenario" +
            ") " +
            "VALUES (" +
            f"{self._pamhyr_id}, {self._db_format(self.is_deleted())}, " +
            f"'{self._db_format(self._name)}', " +
            f"'{self._db_format(self._comment)}', " +
            f"{self._status.scenario_id}" +
            ")"
        )

        data["sl"] = self

        for ind, layer in enumerate(self._layers):
            data["ind"] = ind
            layer._db_save(execute, data)

        return True

    def get(self, index):
        """Return the ``index``-th layer among the non-deleted ones."""
        return self.layers[index]

    # def set(self, index, new):
    #     self._layers[index] = new
    #     self.modified()

    def insert(self, index, new):
        """Insert layer ``new`` at ``index`` of the raw layer list."""
        self._layers.insert(index, new)
        self.modified()

    def new(self, index):
        """Create a default layer, insert it at ``index``, return it."""
        n = Layer(sl=self, status=self._status)
        self.insert(index, n)
        self.modified()
        return n

    def undelete(self, els):
        """Clear the deleted flag of every layer in ``els``."""
        for el in els:
            el.set_as_not_deleted()

        self.modified()

    def delete(self, els):
        """Flag every layer in ``els`` as deleted."""
        for el in els:
            el.set_as_deleted()

        self.modified()

    def delete_i(self, indexes):
        """Flag as deleted the layers whose raw-list index is in
        ``indexes`` (deleted layers count in the numbering)."""
        for i, layer in enumerate(self._layers):
            if i in indexes:
                layer.set_as_deleted()

        self.modified()

    def move_up(self, index):
        """Swap the layer at ``index`` with the one just before it.

        ``index`` refers to the raw layer list. Nothing happens for the
        first layer or for an out-of-range index: without the lower
        bound, index 0 would be swapped with index -1, i.e. with the
        last layer.
        """
        if 0 < index < len(self._layers):
            above = index - 1

            lst = self._layers
            lst[index], lst[above] = lst[above], lst[index]
            self.modified()

    def move_down(self, index):
        """Swap the layer at ``index`` with the one just after it.

        ``index`` refers to the raw layer list. Nothing happens for the
        last layer or for an out-of-range index: without the lower
        bound, index -1 would be swapped with index 0.
        """
        if 0 <= index < len(self._layers) - 1:
            below = index + 1

            lst = self._layers
            lst[index], lst[below] = lst[below], lst[index]
            self.modified()

    def compute_height_from_bottom(self, bottom_elevation: list):
        """Stack the layer heights below ``bottom_elevation``.

        Returns a list of lists: the first entry is ``bottom_elevation``
        itself, and each following entry is the previous one with the
        height of the next non-deleted layer subtracted from every
        value. The result therefore has ``len(self) + 1`` entries.
        """
        z_sl = [bottom_elevation]

        for layer_height in self.height():
            previous = z_sl[-1]
            z_sl.append([z - layer_height for z in previous])

        return z_sl