Pamhyr2/src/Model/LateralContribution/LateralContribution.py

687 lines
19 KiB
Python

# LateralContribution.py -- Pamhyr
# Copyright (C) 2023-2025 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import logging
from tools import (
trace, timer,
old_pamhyr_date_to_timestamp,
date_iso_to_timestamp,
date_dmy_to_timestamp,
)
from Model.Tools.PamhyrDB import SQLSubModel
from Model.Except import NotImplementedMethodeError
from Model.Scenario import Scenario
logger = logging.getLogger()
class Data(SQLSubModel):
_sub_classes = []
def __init__(self,
data0, data1,
id: int = -1,
types=[float, float],
status=None,
owner_scenario=-1):
super(Data, self).__init__(
id=id, status=status,
owner_scenario=owner_scenario
)
self._types = types
self._data = [data0, data1]
@classmethod
def _db_create(cls, execute, ext=""):
execute(f"""
CREATE TABLE lateral_contribution_data{ext} (
{cls.create_db_add_pamhyr_id()},
deleted BOOLEAN NOT NULL DEFAULT FALSE,
ind INTEGER NOT NULL,
data0 TEXT NOT NULL,
data1 TEXT NOT NULL,
lc INTEGER,
{Scenario.create_db_add_scenario()},
{Scenario.create_db_add_scenario_fk()},
FOREIGN KEY(lc) REFERENCES lateral_contribution(pamhyr_id),
PRIMARY KEY(pamhyr_id, scenario)
)
""")
return cls._create_submodel(execute)
@classmethod
def _db_update(cls, execute, version, data=None):
major, minor, release = version.strip().split(".")
if major == minor == "0":
cls._db_update_to_0_2_0(execute, data)
if major == "0" and minor == "1":
if int(release) < 2:
execute(
"ALTER TABLE boundary_condition_data " +
"ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE"
)
return cls._update_submodel(execute, version, data)
@classmethod
def _db_update_to_0_2_0(cls, execute, data=None):
table = "lateral_contribution_data"
lcs = data['id2pid']['lateral_contribution']
cls.update_db_add_pamhyr_id(execute, table, data)
Scenario.update_db_add_scenario(execute, table)
cls._db_create(execute, ext="_tmp")
execute(
f"INSERT INTO {table} " +
"(pamhyr_id, ind, data0, data1, lc, scenario) " +
"SELECT pamhyr_id, ind, data0, data1, lc, scenario " +
f"FROM {table}"
)
execute(f"DROP TABLE {table}")
execute(f"ALTER TABLE {table}_tmp RENAME TO {table}")
cls._db_update_to_0_2_0_set_lc_pid(execute, table, lcs)
@classmethod
def _db_update_to_0_2_0_set_lc_pid(cls, execute, table, lcs):
els = execute(
f"SELECT pamhyr_id, lc FROM {table}"
)
for row in els:
it = iter(row)
pid = next(it)
lc_id = next(it)
execute(
f"UPDATE {table} " +
f"SET lc = {lcs[lc_id]} " +
f"WHERE pamhyr_id = {pid}"
)
@classmethod
def _db_load(cls, execute, data=None):
new = []
lc = data["lc"]
status = data['status']
scenario = data["scenario"]
loaded = data['loaded_pid']
if scenario is None:
return new
values = execute(
"SELECT pamhyr_id, deleted, " +
"data0, data1, scenario " +
"FROM lateral_contribution_data " +
f"WHERE lc = {lc._pamhyr_id} " +
f"AND scenario = {scenario.id} " +
f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) " +
"ORDER BY ind ASC"
)
for v in values:
it = iter(v)
pid = next(it)
deleted = next(it)
data0 = lc._types[0](next(it))
data1 = lc._types[1](next(it))
owner_scenario = next(it)
nd = cls(
data0, data1,
id=pid,
types=lc._types,
status=status,
owner_scenario=owner_scenario
)
if deleted:
nd.set_as_deleted()
loaded.add(pid)
new.append(nd)
data["scenario"] = scenario.parent
new += cls._db_load(execute, data)
data["scenario"] = scenario
return new
def _db_save(self, execute, data=None):
pid = self._pamhyr_id
ind = data["ind"]
data0 = self._db_format(str(self[0]))
data1 = self._db_format(str(self[1]))
lc = data["lc"]
execute(
"INSERT INTO " +
"lateral_contribution_data (pamhyr_id, deleted, ind, " +
"data0, data1, lc, scenario) " +
f"VALUES ({pid}, {self._db_format(self.is_deleted())}, " +
f"{ind}, '{self._db_format(data0)}', {self._db_format(data1)}, " +
f"{lc._pamhyr_id}, {self._status.scenario_id}" +
")"
)
return True
def __getitem__(self, key):
return self._types[key](self._data[key])
def __setitem__(self, key, value):
self._data[key] = self._types[key](value)
class LateralContribution(SQLSubModel):
_sub_classes = [Data]
def __init__(self, id: int = -1,
name: str = "",
status=None, owner_scenario=-1):
super(LateralContribution, self).__init__(
id=id, status=status,
owner_scenario=owner_scenario
)
self._name = name
self._type = ""
self._reach = None
self._begin_section = None
self._end_section = None
self._data = []
self._header = []
self._types = [float, float]
@classmethod
def _db_create(cls, execute, ext=""):
execute(f"""
CREATE TABLE lateral_contribution{ext}(
{cls.create_db_add_pamhyr_id()},
deleted BOOLEAN NOT NULL DEFAULT FALSE,
name TEXT NOT NULL,
type TEXT NOT NULL,
tab TEXT NOT NULL,
reach INTEGER,
begin_section INTEGER,
end_section INTEGER,
{Scenario.create_db_add_scenario()},
{Scenario.create_db_add_scenario_fk()},
FOREIGN KEY(reach) REFERENCES river_reach(pamhyr_id),
FOREIGN KEY(begin_section)
REFERENCES geometry_profileXYZ(pamhyr_id),
FOREIGN KEY(end_section)
REFERENCES geometry_profileXYZ(pamhyr_id),
PRIMARY KEY(pamhyr_id, scenario)
)
""")
if ext == "_tmp":
return True
return cls._create_submodel(execute)
@classmethod
def _db_update(cls, execute, version, data=None):
major, minor, release = version.strip().split(".")
if major == minor == "0":
if int(release) < 11:
execute(
"""
ALTER TABLE lateral_contribution
RENAME COLUMN begin_kp TO begin_rk
"""
)
execute(
"""
ALTER TABLE lateral_contribution
RENAME COLUMN end_kp TO end_rk
"""
)
execute(
"""
ALTER TABLE lateral_contribution
RENAME COLUMN edge TO reach
"""
)
cls._db_update_to_0_2_0(execute, data)
if major == "0" and minor == "1":
if int(release) < 2:
execute(
"ALTER TABLE lateral_contribution " +
"ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE"
)
return cls._update_submodel(execute, version, data)
@classmethod
def _db_update_to_0_2_0(cls, execute, data):
table = "lateral_contribution"
reachs = data['id2pid']['river_reach']
cls.update_db_add_pamhyr_id(execute, table, data)
Scenario.update_db_add_scenario(execute, table)
cls._db_create(execute, ext="_tmp")
execute(
f"INSERT INTO {table} " +
"(pamhyr_id, name, type, tab, reach, scenario) " +
"SELECT pamhyr_id, name, type, tab, reach, scenario " +
f"FROM {table}"
)
execute(f"DROP TABLE {table}")
execute(f"ALTER TABLE {table}_tmp RENAME TO {table}")
cls._db_update_to_0_2_0_set_reach_pid(execute, table, reachs)
@classmethod
def _get_ctor_from_type(cls, t):
from Model.LateralContribution.LateralContributionTypes import (
NotDefined, LateralContrib, Rain, Evaporation
)
res = NotDefined
if t == "LC":
res = LateralContrib
elif t == "RA":
res = Rain
elif t == "EV":
res = Evaporation
return res
@classmethod
def _db_load(cls, execute, data=None):
new = []
tab = data["tab"]
status = data['status']
edges = data["edges"]
scenario = data["scenario"]
loaded = data['loaded_pid']
if scenario is None:
return new
table = execute(
"SELECT pamhyr_id, deleted, name, type, " +
"reach, begin_section, end_section, scenario " +
"FROM lateral_contribution " +
f"WHERE tab = '{tab}'" +
f"AND scenario = {scenario.id} " +
f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) "
)
for row in table:
it = iter(row)
pid = next(it)
deleted = next(it)
name = next(it)
t = next(it)
reach = next(it)
b_section = next(it)
e_section = next(it)
owner_scenario = next(it)
ctor = cls._get_ctor_from_type(t)
lc = ctor(
id=pid, name=name,
status=status,
owner_scenario=owner_scenario
)
lc.reach = None
lc._begin_section = None
lc._end_section = None
if row[3] != -1:
lc.reach = next(
filter(
lambda e: e.id == reach,
edges
)
)
lc._begin_section = next(
filter(
lambda p: p.id == b_section,
lc.reach.reach.profiles
)
)
lc._end_section = next(
filter(
lambda p: p.id == e_section,
lc.reach.reach.profiles
)
)
data["lc"] = lc
lc._data = Data._db_load(execute, data=data)
loaded.add(pid)
new.append(lc)
data["scenario"] = scenario.parent
new += cls._db_load(execute, data)
data["scenario"] = scenario
return new
def _db_save(self, execute, data=None):
if not self.must_be_saved():
return True
tab = data["tab"]
data["lc"] = self
execute(
"DELETE FROM lateral_contribution_data " +
f"WHERE lc = {self._pamhyr_id} " +
f"AND scenario = {self._status.scenario_id}"
)
reach = -1
begin_section = -1
end_section = -1
if self._reach is not None:
reach = self._reach._pamhyr_id
begin_section = self._begin_section._pamhyr_id
end_section = self._end_section._pamhyr_id
execute(
"INSERT INTO " +
"lateral_contribution(" +
"pamhyr_id, deleted, name, type, tab, " +
"reach, begin_section, end_section, scenario) " +
"VALUES (" +
f"{self.id}, {self._db_format(self.is_deleted())}, " +
f"'{self._db_format(self._name)}', " +
f"'{self._db_format(self._type)}', '{tab}', {reach}, " +
f"{begin_section}, {end_section}, " +
f"{self._status.scenario_id}" +
")"
)
ind = 0
for d in self._data:
data["ind"] = ind
d._db_save(execute, data)
ind += 1
return True
def _data_traversal(self,
predicate=lambda obj, data: True,
modifier=lambda obj, data: None,
data={}):
if predicate(self, data):
modifier(self, data)
for d in self._data:
d._data_traversal(predicate, modifier, data)
def __len__(self):
return len(
list(
filter(
lambda el: not el.is_deleted(),
self._data
)
)
)
@classmethod
def compatibility(cls):
return ["liquid", "solid", "suspenssion"]
@classmethod
def time_convert(cls, data):
if type(data) is str:
if data.count("-") == 2:
return date_iso_to_timestamp(data)
if data.count("/") == 2:
return date_dmy_to_timestamp(data)
if data.count(":") == 3:
return old_pamhyr_date_to_timestamp(data)
if data.count(":") == 2:
return old_pamhyr_date_to_timestamp("00:" + data)
if data.count(".") == 1:
return round(float(data))
return int(data)
@property
def name(self):
if self._name == "":
return f"LC #{self.id}"
return self._name
@name.setter
def name(self, name):
self._name = name
self.modified()
@property
def lctype(self):
return self._type
@property
def reach(self):
return self._reach
@reach.setter
def reach(self, reach):
self._reach = reach
if reach is not None:
self._begin_section = self._reach.reach.profiles[0]
self._end_section = self._reach.reach.profiles[-1]
self.modified()
def has_reach(self):
return self._reach is not None
@property
def begin_rk(self):
if self._begin_section is None:
return 0
return self._begin_section.rk
@property
def begin_section(self):
return self._begin_section
@begin_section.setter
def begin_section(self, section):
self._begin_section = section
self.modified()
@property
def end_rk(self):
if self._end_section is None:
return 0
return self._end_section.rk
@property
def end_section(self):
return self._end_section
@end_section.setter
def end_section(self, section):
self._end_section = section
self.modified()
@property
def header(self):
return self._header.copy()
@property
def data(self):
return list(
filter(
lambda el: not el.is_deleted(),
self._data
)
)
def get_type_column(self, column):
if 0 <= column < 2:
return self._types[column]
return None
@property
def _default_0(self):
return self._types[0](0)
@property
def _default_1(self):
return self._types[1](0.0)
def is_define(self):
return self._data is not None
def new_from_data(self, header, data):
new_0 = self._default_0
new_1 = self._default_1
if len(header) != 0:
for i in [0, 1]:
for j in range(len(header)):
if self._header[i] == header[j]:
if i == 0:
new_0 = self._types[i](data[j].replace(",", "."))
else:
new_1 = self._types[i](data[j].replace(",", "."))
else:
new_0 = self._types[0](data[0].replace(",", "."))
new_1 = self._types[1](data[1].replace(",", "."))
return Data(new_0, new_1, status=self._status)
def add(self, index: int):
value = Data(self._default_0, self._default_1, status=self._status)
self._data.insert(index, value)
self.modified()
return value
def insert(self, index: int, value):
self._data.insert(index, value)
self.modified()
def delete_i(self, indexes):
self._data = list(
map(
lambda e: e[1],
filter(
lambda e: e[0] not in indexes,
enumerate(self.data)
)
)
)
self.modified()
def delete(self, els):
self._data = list(
filter(
lambda e: e not in els,
self.data
)
)
self.modified()
def sort(self, _reverse=False, key=None):
if key is None:
self._data.sort(reverse=_reverse)
else:
self._data.sort(reverse=_reverse, key=key)
self.modified()
def get_i(self, index):
return self.data[index]
def get_range(self, _range):
lst = []
for r in _range:
lst.append(r)
return lst
def _set_i_c_v(self, index, column, value):
v = self._data[index]
v[column] = self._types[column](value)
self._data[index] = v
self.modified()
def set_i_0(self, index: int, value):
self._set_i_c_v(index, 0, value)
def set_i_1(self, index: int, value):
self._set_i_c_v(index, 1, value)
@timer
def convert(self, cls):
new = cls(name=self.name, status=self._status)
new.reach = self.reach
new.begin_section = self.begin_section
new.end_section = self.end_section
for i, _ in enumerate(self.data):
new.add(i)
for i in [0, 1]:
for j in [0, 1]:
if self._header[i] == new.header[j]:
for ind, v in self.data:
try:
new._set_i_c_v(ind, j, v[i])
except Exception as e:
logger.info(e)
self.modified()
return new
def move_up(self, index):
if index < len(self):
next = index - 1
d = self._data
d[index], d[next] = d[next], d[index]
self.modified()
def move_down(self, index):
if index >= 0:
prev = index + 1
d = self._data
d[index], d[prev] = d[prev], d[index]
self.modified()