mirror of https://gitlab.com/pamhyr/pamhyr2
627 lines
17 KiB
Python
627 lines
17 KiB
Python
# ProfileXYZ.py -- Pamhyr
|
|
# Copyright (C) 2023-2024 INRAE
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import logging
|
|
import numpy as np
|
|
from typing import List
|
|
from functools import reduce
|
|
|
|
from tools import timer
|
|
from shapely import geometry
|
|
|
|
from Model.Tools.PamhyrDB import SQLSubModel
|
|
from Model.Except import ClipboardFormatError
|
|
from Model.Geometry.Profile import Profile
|
|
from Model.Geometry.PointXYZ import PointXYZ
|
|
from Model.Geometry.Vector_1d import Vector1d
|
|
|
|
logger = logging.getLogger()
|
|
|
|
|
|
class ProfileXYZ(Profile, SQLSubModel):
|
|
_sub_classes = [
|
|
PointXYZ,
|
|
]
|
|
|
|
def __init__(self,
|
|
id: int = -1,
|
|
name: str = "",
|
|
kp: float = 0.,
|
|
reach=None,
|
|
num=0,
|
|
nb_point: int = 0,
|
|
code1: int = 0, code2: int = 0,
|
|
status=None):
|
|
"""ProfileXYZ constructor
|
|
|
|
Args:
|
|
num: The number of this profile
|
|
code1: The interpolation code 1
|
|
code2: The interpolation code 2
|
|
kp: Kilometer point
|
|
name: The name of profile
|
|
|
|
Returns:
|
|
Nothing.
|
|
"""
|
|
super(ProfileXYZ, self).__init__(
|
|
id=id,
|
|
num=num,
|
|
name=name,
|
|
kp=kp,
|
|
code1=code1, code2=code2,
|
|
_type="XYZ",
|
|
reach=reach,
|
|
status=status,
|
|
)
|
|
|
|
@classmethod
|
|
def _db_create(cls, execute):
|
|
execute("""
|
|
CREATE TABLE geometry_profileXYZ(
|
|
id INTEGER NOT NULL PRIMARY KEY,
|
|
ind INTEGER NOT NULL,
|
|
name TEXT,
|
|
reach INTEGER NOT NULL,
|
|
kp REAL NOT NULL,
|
|
num INTEGER NOT NULL,
|
|
code1 INTEGER NOT NULL,
|
|
code2 INTEGER NOT NULL,
|
|
sl INTEGER,
|
|
FOREIGN KEY(reach) REFERENCES river_reach(id),
|
|
FOREIGN KEY(sl) REFERENCES sedimentary_layer(id)
|
|
)
|
|
""")
|
|
|
|
return cls._create_submodel(execute)
|
|
|
|
@classmethod
|
|
def _db_update(cls, execute, version):
|
|
major, minor, release = version.strip().split(".")
|
|
if major == minor == "0":
|
|
if int(release) < 2:
|
|
execute(
|
|
"""
|
|
ALTER TABLE geometry_profileXYZ
|
|
ADD COLUMN sl INTEGER
|
|
REFERENCES sedimentary_layer(id)
|
|
"""
|
|
)
|
|
|
|
return cls._update_submodel(execute, version)
|
|
|
|
@classmethod
|
|
def _db_load(cls, execute, data=None):
|
|
profiles = []
|
|
status = data["status"]
|
|
reach = data["reach"]
|
|
|
|
table = execute(
|
|
"SELECT id, ind, name, kp, num, code1, code2, sl " +
|
|
"FROM geometry_profileXYZ " +
|
|
f"WHERE reach = {reach.id}"
|
|
)
|
|
|
|
for row in table:
|
|
id = row[0]
|
|
ind = row[1]
|
|
name = row[2]
|
|
kp = row[3]
|
|
num = row[5]
|
|
code1 = row[5]
|
|
code2 = row[6]
|
|
sl = row[7]
|
|
|
|
new = cls(
|
|
id=id, num=num,
|
|
name=name, kp=kp,
|
|
code1=code1, code2=code2,
|
|
reach=reach,
|
|
status=status
|
|
)
|
|
|
|
if sl == -1 or sl is None:
|
|
new._sl = None
|
|
else:
|
|
new._sl = next(
|
|
filter(
|
|
lambda s: s.id == sl,
|
|
data["sediment_layers_list"].sediment_layers
|
|
)
|
|
)
|
|
|
|
data["profile"] = new
|
|
new._points = PointXYZ._db_load(execute, data.copy())
|
|
|
|
yield ind, new
|
|
|
|
def _db_save(self, execute, data=None):
|
|
ok = True
|
|
ind = data["ind"]
|
|
|
|
sl = self._sl.id if self._sl is not None else -1
|
|
|
|
sql = (
|
|
"INSERT OR REPLACE INTO " +
|
|
"geometry_profileXYZ(id, ind, name, reach, " +
|
|
"kp, num, code1, code2, sl) " +
|
|
"VALUES (" +
|
|
f"{self.id}, {ind}, '{self._db_format(self._name)}', " +
|
|
f"{self.reach.id}, {self.kp}, {self.num}, " +
|
|
f"{self.code1}, {self.code1}, {sl}" +
|
|
")"
|
|
)
|
|
execute(sql)
|
|
|
|
points = self.points
|
|
|
|
data["profile"] = self
|
|
execute(f"DELETE FROM geometry_pointXYZ WHERE profile = {self.id}")
|
|
|
|
ind = 0
|
|
for point in points:
|
|
data["ind"] = ind
|
|
ok &= point._db_save(execute, data)
|
|
ind += 1
|
|
|
|
return ok
|
|
|
|
@classmethod
|
|
def from_data(cls, header, data):
|
|
profile = None
|
|
try:
|
|
if len(header) == 0:
|
|
name = data[0]
|
|
kp = data[1]
|
|
reach = data[2]
|
|
status = data[3]
|
|
|
|
profile = cls(
|
|
id=-1,
|
|
name=name,
|
|
kp=kp,
|
|
reach=reach,
|
|
status=status
|
|
)
|
|
else:
|
|
valid_header = {'name', 'reach', 'kp', 'status'}
|
|
d = {}
|
|
for i, v in enumerate(data):
|
|
h = header[i].strip().lower().split(' ')[0]
|
|
if h in valid_header:
|
|
d[h] = v
|
|
|
|
profile = cls(**d)
|
|
except Exception as e:
|
|
logger.error(e)
|
|
raise ClipboardFormatError(header, data)
|
|
|
|
return profile
|
|
|
|
def point_from_data(self, header, data):
|
|
def float_format(s: str):
|
|
return float(
|
|
s.replace(",", ".")
|
|
)
|
|
|
|
point = None
|
|
try:
|
|
if len(header) == 0:
|
|
x = float_format(data[0])
|
|
y = float_format(data[1])
|
|
z = float_format(data[2])
|
|
name = data[3] if len(data) == 4 else ""
|
|
|
|
point = PointXYZ(
|
|
x, y, z, name, profile=self, status=self._status
|
|
)
|
|
else:
|
|
valid_header = {'name', 'x', 'y', 'z'}
|
|
d = {"status": self._status, "profile": self}
|
|
for i, v in enumerate(data):
|
|
h = header[i].strip().lower().split(' ')[0]
|
|
if h in valid_header:
|
|
d[h] = v
|
|
|
|
point = PointXYZ(**d)
|
|
except Exception as e:
|
|
raise ClipboardFormatError(header=header, data=data)
|
|
|
|
return point
|
|
|
|
def x(self):
|
|
return [point.x for point in self.points]
|
|
|
|
def y(self):
|
|
return [point.y for point in self.points]
|
|
|
|
def z(self):
|
|
return [point.z for point in self.points]
|
|
|
|
def names(self):
|
|
return [point.name for point in self.points]
|
|
|
|
def x_max(self):
|
|
return max(self.filter_isnan(self.x()))
|
|
|
|
def x_min(self):
|
|
return min(self.filter_isnan(self.x()))
|
|
|
|
def y_max(self):
|
|
return max(self.filter_isnan(self.y()))
|
|
|
|
def y_min(self):
|
|
return min(self.filter_isnan(self.y()))
|
|
|
|
def z_max(self):
|
|
return max(self.filter_isnan(self.z()))
|
|
|
|
def z_min(self):
|
|
return min(self.filter_isnan(self.z()))
|
|
|
|
def import_points(self, list_points: list):
|
|
"""Import a list of points to profile
|
|
|
|
Args:
|
|
list_points: Liste of PointXYZ
|
|
|
|
Returns:
|
|
Nothing.
|
|
"""
|
|
for point in list_points:
|
|
pt = PointXYZ(*point, profile=self, status=self._status)
|
|
self.points.append(pt)
|
|
self._status.modified()
|
|
|
|
def get_point_i(self, index: int) -> PointXYZ:
|
|
"""Get point at index.
|
|
|
|
Args:
|
|
index: Index of point.
|
|
|
|
Returns:
|
|
The point.
|
|
"""
|
|
try:
|
|
return self.points[index]
|
|
except IndexError:
|
|
raise IndexError(f"Invalid point index: {index}")
|
|
|
|
def get_point_by_name(self, name: str) -> PointXYZ:
|
|
"""Get point by name.
|
|
|
|
Args:
|
|
name: Point name.
|
|
|
|
Returns:
|
|
The point.
|
|
"""
|
|
try:
|
|
n_name = name.lower().strip()
|
|
return next(
|
|
filter(
|
|
lambda p: p.name.lower().strip() == n_name,
|
|
self.points
|
|
)
|
|
)
|
|
except Exception as e:
|
|
logger.debug(f"{e}")
|
|
raise IndexError(
|
|
f"Invalid point name: {name} " +
|
|
f"for profile ({self.id}) kp = {self.kp}"
|
|
)
|
|
|
|
def has_standard_named_points(self):
|
|
l, r = reduce(
|
|
lambda acc, n: (
|
|
(acc[0] | (n == "rg")),
|
|
(acc[1] | (n == "rd"))
|
|
),
|
|
map(lambda p: p.name.lower().strip(),
|
|
self.points),
|
|
(False, False)
|
|
)
|
|
|
|
return l & r
|
|
|
|
def add(self):
|
|
"""Add a new PointXYZ to profile.
|
|
|
|
Returns:
|
|
Nothing.
|
|
"""
|
|
point_xyz = PointXYZ(0., 0., 0., profile=self, status=self._status)
|
|
self.points.append(point_xyz)
|
|
self._status.modified()
|
|
|
|
def insert(self, index: int):
|
|
"""Insert a new point at index.
|
|
|
|
Args:
|
|
index: The index of new profile.
|
|
|
|
Returns:
|
|
The new point.
|
|
"""
|
|
point = PointXYZ(0., 0., 0., profile=self, status=self._status)
|
|
self.points.insert(index, point)
|
|
self._status.modified()
|
|
return point
|
|
|
|
def filter_isnan(self, lst):
|
|
"""Returns the input list without 'nan' element
|
|
|
|
Args:
|
|
lst: The list to filter
|
|
|
|
Returns:
|
|
The list without 'nan'
|
|
"""
|
|
return [x for x in lst if not np.isnan(x)]
|
|
|
|
def speed(self, q, z):
|
|
area = self.wet_area(z)
|
|
|
|
if area == 0:
|
|
return 0
|
|
|
|
return q / area
|
|
|
|
def width_approximation(self):
|
|
if self.has_standard_named_points():
|
|
rg = self.get_point_by_name("rg")
|
|
rd = self.get_point_by_name("rd")
|
|
else:
|
|
rg = self.points[0]
|
|
rd = self.points[-1]
|
|
|
|
return abs(rg.dist(rd))
|
|
|
|
def wet_perimeter(self, z):
|
|
poly = self.wet_polygon(z)
|
|
|
|
if poly is None:
|
|
return 0
|
|
|
|
return poly.length
|
|
|
|
def wet_area(self, z):
|
|
poly = self.wet_polygon(z)
|
|
|
|
if poly is None:
|
|
return 0
|
|
|
|
return poly.area
|
|
|
|
def wet_polygon(self, z):
|
|
points = self.wet_points(z)
|
|
if len(points) < 3:
|
|
return None
|
|
|
|
z = map(lambda p: p.z, points)
|
|
station = self._get_station(points)
|
|
|
|
poly = geometry.Polygon(list(zip(station, z)))
|
|
return poly
|
|
|
|
def wet_points(self, z):
|
|
left, right = self.get_water_limits(z)
|
|
|
|
points = list(filter(lambda p: p.z < z, self._points))
|
|
points = [left] + points + [right]
|
|
points = sorted(points, key=lambda p: p.x)
|
|
|
|
return points
|
|
|
|
def get_water_limits(self, z):
|
|
"""
|
|
Determine left and right limits of water elevation.
|
|
"""
|
|
|
|
# Get the index of first point with elevation lesser than water
|
|
# elevation (for the right and left river side)
|
|
i_left = -1
|
|
i_right = -1
|
|
|
|
for i in range(self.number_points):
|
|
if self.point(i).z <= z:
|
|
i_left = i
|
|
break
|
|
|
|
for i in reversed(range(self.number_points)):
|
|
if self.point(i).z <= z:
|
|
i_right = i
|
|
break
|
|
|
|
# Interpolate points at river left side
|
|
if (i_left > 0):
|
|
x = np.interp(
|
|
z,
|
|
[self.point(i_left).z, self.point(i_left - 1).z],
|
|
[self.point(i_left).x, self.point(i_left - 1).x]
|
|
)
|
|
y = np.interp(
|
|
z,
|
|
[self.point(i_left).z, self.point(i_left - 1).z],
|
|
[self.point(i_left).y, self.point(i_left - 1).y]
|
|
)
|
|
pt_left = PointXYZ(x, y, z, name="wl_left")
|
|
else:
|
|
pt_left = self.point(0)
|
|
|
|
# Interpolate points at river right side
|
|
if (i_right < self.number_points - 1):
|
|
x = np.interp(
|
|
z,
|
|
[self.point(i_right).z, self.point(i_right + 1).z],
|
|
[self.point(i_right).x, self.point(i_right + 1).x]
|
|
)
|
|
y = np.interp(
|
|
z,
|
|
[self.point(i_right).z, self.point(i_right + 1).z],
|
|
[self.point(i_right).y, self.point(i_right + 1).y]
|
|
)
|
|
pt_right = PointXYZ(x, y, z, name="wl_right")
|
|
else:
|
|
pt_right = self.point(self.number_points - 1)
|
|
|
|
return pt_left, pt_right
|
|
|
|
def get_station(self):
|
|
"""Projection of the points of the profile on a plane.
|
|
|
|
Args:
|
|
self: The profile
|
|
|
|
Returns:
|
|
Projection of the points of the profile on a plane.
|
|
"""
|
|
if self.nb_points < 3:
|
|
return None
|
|
else:
|
|
return self._get_station(self.points)
|
|
|
|
@timer
|
|
def _get_station(self, points):
|
|
first_named_point = None
|
|
index_first_named_point = None
|
|
last_named_point = None
|
|
|
|
first_point_not_nan = self._first_point_not_nan(points)
|
|
last_point_not_nan = self._last_point_not_nan(points)
|
|
|
|
for index, point in enumerate(points):
|
|
if point.point_is_named():
|
|
index_first_named_point = index
|
|
first_named_point = point
|
|
break
|
|
|
|
for point in reversed(points):
|
|
if point.point_is_named():
|
|
last_named_point = point
|
|
break
|
|
|
|
station = []
|
|
constant = 0.0
|
|
|
|
if (first_named_point is not None
|
|
and last_named_point is not None):
|
|
if (first_named_point != last_named_point
|
|
and first_named_point.x != last_named_point.x):
|
|
vector = Vector1d(first_named_point, last_named_point)
|
|
norm_dir_vec = vector.normalized_direction_vector()
|
|
else:
|
|
vector = Vector1d(first_point_not_nan, last_point_not_nan)
|
|
norm_dir_vec = vector.normalized_direction_vector()
|
|
|
|
for point in points:
|
|
xi = point.x - first_named_point.x
|
|
yi = point.y - first_named_point.y
|
|
station_i = (norm_dir_vec[0] * xi +
|
|
norm_dir_vec[1] * yi)
|
|
station.append(station_i)
|
|
|
|
constant = station[index_first_named_point]
|
|
elif first_named_point is None:
|
|
vector = Vector1d(first_point_not_nan, last_point_not_nan)
|
|
norm_dir_vec = vector.normalized_direction_vector()
|
|
|
|
for point in points:
|
|
xi = point.x - first_point_not_nan.x
|
|
yi = point.y - first_point_not_nan.y
|
|
station_i = (norm_dir_vec[0] * xi +
|
|
norm_dir_vec[1] * yi)
|
|
station.append(station_i)
|
|
|
|
z_min = self.z_min()
|
|
index_profile_z_min = next(
|
|
filter(
|
|
lambda z: z[1] == z_min,
|
|
enumerate(self.z())
|
|
)
|
|
)
|
|
constant = station[index_profile_z_min[0]]
|
|
|
|
return list(map(lambda s: s - constant, station))
|
|
|
|
def _first_point_not_nan(self, points):
|
|
first_point = None
|
|
|
|
for point in points:
|
|
if not point.is_nan():
|
|
first_point = point
|
|
break
|
|
|
|
return first_point
|
|
|
|
def _last_point_not_nan(self, points):
|
|
last_point = None
|
|
|
|
for point in reversed(points):
|
|
if not point.is_nan():
|
|
last_point = point
|
|
break
|
|
|
|
return last_point
|
|
|
|
def purge(self, np_purge):
|
|
"""
|
|
Remove points to keep at most np_purge points.
|
|
"""
|
|
if (self.nb_points <= np_purge):
|
|
return
|
|
|
|
nb_named = 2 # we consider the first and last point as named
|
|
area = [0.0]
|
|
|
|
for i in range(1, self.nb_points-1):
|
|
if self.point(i).point_is_named():
|
|
area.append(9999999.999)
|
|
nb_named += 1
|
|
else:
|
|
area.append(
|
|
PointXYZ.areatriangle3d(
|
|
self.point(i-1),
|
|
self.point(i),
|
|
self.point(i+1))
|
|
)
|
|
|
|
area.append(0.0)
|
|
|
|
while self.nb_points > max(np_purge, nb_named):
|
|
to_rm = np.argmin(area[1:self.nb_points - 1]) + 1
|
|
|
|
self.delete_i([to_rm])
|
|
area.pop(to_rm)
|
|
|
|
for i in [to_rm-1, to_rm]:
|
|
if (i == 0):
|
|
continue
|
|
|
|
if (i == self.nb_points - 1):
|
|
continue
|
|
|
|
if self.point(i).point_is_named():
|
|
area[i] = 9999999.999
|
|
else:
|
|
area[i] = PointXYZ.areatriangle3d(
|
|
self.point(i-1),
|
|
self.point(i),
|
|
self.point(i+1)
|
|
)
|