mirror of https://gitlab.com/pamhyr/pamhyr2
395 lines
9.8 KiB
Python
395 lines
9.8 KiB
Python
# InitialConditions.py -- Pamhyr
|
|
# Copyright (C) 2023 INRAE
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from copy import copy, deepcopy
|
|
from tools import trace, timer
|
|
from functools import reduce
|
|
|
|
from Model.Tools.PamhyrDB import SQLSubModel
|
|
|
|
|
|
class Data(SQLSubModel):
|
|
def __init__(self, name: str = "",
|
|
comment: str = "", reach=None,
|
|
kp: float = 0.0, discharge: float = 0.0,
|
|
height: float = 0.0,
|
|
status=None):
|
|
super(Data, self).__init__()
|
|
|
|
self._status = status
|
|
|
|
self._reach = reach
|
|
|
|
self._name = name
|
|
self._comment = comment
|
|
|
|
self._kp = kp
|
|
self._discharge = discharge
|
|
self._speed = 0.0
|
|
self._elevation = 0.0
|
|
self._height = height
|
|
|
|
if self._kp != 0.0:
|
|
self._update_from_kp()
|
|
if self._height != 0.0:
|
|
self._update_from_height()
|
|
if self._discharge != 0.0:
|
|
self._update_from_discharge()
|
|
|
|
@classmethod
|
|
def _db_create(cls, execute):
|
|
execute("""
|
|
CREATE TABLE initial_conditions(
|
|
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
|
ind INTEGER NOT NULL,
|
|
name TEXT NOT NULL,
|
|
comment TEXT NOT NULL,
|
|
reach INTEGER,
|
|
kp REAL NOT NULL,
|
|
discharge REAL NOT NULL,
|
|
height REAL NOT NULL,
|
|
FOREIGN KEY(reach) REFERENCES river_reach(id)
|
|
)
|
|
""")
|
|
|
|
return cls._create_submodel(execute)
|
|
|
|
@classmethod
|
|
def _db_update(cls, execute, version):
|
|
return cls._update_submodel(execute, version)
|
|
|
|
@classmethod
|
|
def _db_load(cls, execute, data=None):
|
|
id = data["reach"].id
|
|
table = execute(
|
|
"SELECT ind, name, comment, kp, discharge, height " +
|
|
"FROM initial_conditions " +
|
|
f"WHERE reach = {id}"
|
|
)
|
|
|
|
new = []
|
|
|
|
for _ in table:
|
|
new.append(None)
|
|
|
|
for row in table:
|
|
ind = row[0]
|
|
name = row[1]
|
|
comment = row[2]
|
|
kp = row[3]
|
|
discharge = row[4]
|
|
height = row[5]
|
|
|
|
d = cls(
|
|
reach=data["reach"],
|
|
status=data["status"],
|
|
name=name,
|
|
comment=comment,
|
|
kp=kp,
|
|
discharge=discharge,
|
|
height=height,
|
|
)
|
|
|
|
new[ind] = d
|
|
|
|
return new
|
|
|
|
def _db_save(self, execute, data=None):
|
|
ind = data["ind"]
|
|
|
|
execute(
|
|
"INSERT INTO " +
|
|
"initial_conditions(ind, name, comment, kp, " +
|
|
"discharge, height, reach) " +
|
|
"VALUES (" +
|
|
f"{ind}, '{self._db_format(self.name)}', " +
|
|
f"'{self._db_format(self._comment)}', " +
|
|
f"{self._kp}, {self._discharge}, {self._height}, " +
|
|
f"{self._reach.id}" +
|
|
")"
|
|
)
|
|
|
|
return True
|
|
|
|
def copy(self):
|
|
new = Data(
|
|
name=self.name,
|
|
comment=self._comment,
|
|
kp=self._kp,
|
|
discharge=self._discharge,
|
|
height=self._height,
|
|
reach=self._reach,
|
|
status=self._status,
|
|
)
|
|
|
|
return new
|
|
|
|
@property
|
|
def name(self):
|
|
return self._name
|
|
|
|
def __getitem__(self, key):
|
|
val = None
|
|
|
|
if key == "name":
|
|
val = self._name
|
|
elif key == "comment":
|
|
val = self._comment
|
|
elif key == "kp":
|
|
val = self._kp
|
|
elif key == "speed":
|
|
val = self._speed
|
|
elif key == "discharge":
|
|
val = self._discharge
|
|
elif key == "elevation":
|
|
val = self._elevation
|
|
elif key == "height":
|
|
val = self._height
|
|
|
|
return val
|
|
|
|
def _update_get_min(self):
|
|
profile = self._reach.reach.get_profiles_from_kp(self._kp)
|
|
if len(profile) > 0:
|
|
min = profile[0].z_min()
|
|
else:
|
|
min = 0.0
|
|
|
|
return min
|
|
|
|
def _update_from_kp(self):
|
|
min = self._update_get_min()
|
|
self._elevation = min + self._height
|
|
|
|
def _update_from_elevation(self):
|
|
min = self._update_get_min()
|
|
self._height = self._elevation - min
|
|
|
|
def _update_from_height(self):
|
|
min = self._update_get_min()
|
|
self._elevation = self._height + min
|
|
|
|
def _update_from_discharge(self):
|
|
min = self._update_get_min()
|
|
# print("TODO")
|
|
|
|
def __setitem__(self, key, value):
|
|
if key == "name":
|
|
self._name = str(value)
|
|
elif key == "comment":
|
|
self._comment = str(value)
|
|
elif key == "kp":
|
|
self._kp = float(value)
|
|
self._update_from_kp()
|
|
elif key == "speed":
|
|
# Not supposed to be modified
|
|
self._speed = float(value)
|
|
elif key == "discharge":
|
|
self._discharge = float(value)
|
|
self._update_from_discharge()
|
|
elif key == "elevation":
|
|
self._elevation = float(value)
|
|
self._update_from_elevation()
|
|
elif key == "height":
|
|
self._height = float(value)
|
|
self._update_from_height()
|
|
|
|
self._status.modified()
|
|
|
|
|
|
class InitialConditions(SQLSubModel):
|
|
_sub_classes = [
|
|
Data
|
|
]
|
|
|
|
def __init__(self, reach=None, status=None):
|
|
super(InitialConditions, self).__init__()
|
|
|
|
self._status = status
|
|
|
|
self._reach = reach
|
|
self._data = []
|
|
|
|
@classmethod
|
|
def _db_create(cls, execute):
|
|
return cls._create_submodel(execute)
|
|
|
|
@classmethod
|
|
def _db_update(cls, execute, version):
|
|
return cls._update_submodel(execute, version)
|
|
|
|
@classmethod
|
|
def _db_load(cls, execute, data=None):
|
|
new = cls(
|
|
reach=data["reach"],
|
|
status=data["status"]
|
|
)
|
|
|
|
new._data = Data._db_load(
|
|
execute,
|
|
data=data
|
|
)
|
|
|
|
if new._data is not None:
|
|
yield new
|
|
|
|
def _db_save(self, execute, data=None):
|
|
ok = True
|
|
|
|
ind = 0
|
|
for d in self._data:
|
|
data["ind"] = ind
|
|
ok &= d._db_save(execute, data)
|
|
ind += 1
|
|
|
|
return ok
|
|
|
|
def __len__(self):
|
|
return len(self._data)
|
|
|
|
@property
|
|
def reach(self):
|
|
return self._reach
|
|
|
|
@reach.setter
|
|
def reach(self, new):
|
|
self._reach = reach
|
|
self._status.modified()
|
|
|
|
@property
|
|
def data(self):
|
|
return self._data.copy()
|
|
|
|
@data.setter
|
|
def data(self, data):
|
|
self._data = data
|
|
|
|
def get(self, index):
|
|
return self._data[index]
|
|
|
|
def set(self, index, data):
|
|
self._data.insert(index, data)
|
|
self._status.modified()
|
|
|
|
def new(self, index):
|
|
n = Data(reach=self._reach, status=self._status)
|
|
self._data.insert(index, n)
|
|
self._status.modified()
|
|
|
|
def insert(self, index, data):
|
|
self._data.insert(index, data)
|
|
self._status.modified()
|
|
|
|
def delete(self, data):
|
|
self._data = list(
|
|
filter(
|
|
lambda x: x not in data,
|
|
self._data
|
|
)
|
|
)
|
|
self._status.modified()
|
|
|
|
def delete_i(self, indexes):
|
|
data = list(
|
|
map(
|
|
lambda x: x[1],
|
|
filter(
|
|
lambda x: x[0] in indexes,
|
|
enumerate(self._data)
|
|
)
|
|
)
|
|
)
|
|
self.delete(data)
|
|
|
|
def sort(self, reverse=False, key=None):
|
|
self._data.sort(reverse=reverse, key=key)
|
|
self._status.modified()
|
|
|
|
def _data_get(self, key):
|
|
return list(
|
|
map(
|
|
lambda d: d[key],
|
|
self._data
|
|
)
|
|
)
|
|
|
|
def get_kp(self):
|
|
return self._data_get("kp")
|
|
|
|
def get_elevation(self):
|
|
return self._data_get("elevation")
|
|
|
|
def get_discharge(self):
|
|
return self._data_get("discharge")
|
|
|
|
def _sort_by_z_and_kp(self, profiles):
|
|
profiles.sort(
|
|
reverse=False,
|
|
key=lambda p: p.kp
|
|
)
|
|
|
|
first_z = profiles[0].z()
|
|
last_z = profiles[-1].z()
|
|
|
|
if first_z > last_z:
|
|
profiles.sort(
|
|
reverse=True,
|
|
key=lambda p: p.kp
|
|
)
|
|
|
|
def generate_growing_constante_height(self, height: float):
|
|
self._data = []
|
|
|
|
profiles = self._reach.reach.profiles
|
|
self._sort_by_z_and_kp(profiles)
|
|
|
|
prev = None
|
|
for profile in profiles:
|
|
new = Data(reach=self._reach, status=self._status)
|
|
new["kp"] = profile.kp
|
|
|
|
if prev is None:
|
|
new["elevation"] = profile.z_min() + height
|
|
else:
|
|
new["elevation"] = max(
|
|
profile.z_min() + height,
|
|
prev["elevation"]
|
|
)
|
|
|
|
self._data.append(new)
|
|
prev = new
|
|
|
|
is_reverse = False
|
|
if profiles[0].kp > profiles[-1].kp:
|
|
is_reverse = True
|
|
|
|
self._data.sort(
|
|
reverse=not is_reverse,
|
|
key=lambda d: d['kp']
|
|
)
|
|
|
|
def generate_discharge(self, discharge: float):
|
|
self._new = []
|
|
|
|
for d in self._data:
|
|
n = d.copy()
|
|
n['discharge'] = discharge
|
|
self._new.append(n)
|
|
|
|
self._data = self._new
|