mirror of https://gitlab.com/pamhyr/pamhyr2
882 lines
24 KiB
Python
882 lines
24 KiB
Python
# ProfileXYZ.py -- Pamhyr
|
|
# Copyright (C) 2023-2024 INRAE
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import logging
|
|
import numpy as np
|
|
from typing import List
|
|
from functools import reduce
|
|
from dataclasses import dataclass
|
|
|
|
from tools import timer
|
|
from shapely import geometry
|
|
|
|
from Model.Tools.PamhyrDB import SQLSubModel
|
|
from Model.Except import ClipboardFormatError
|
|
from Model.Geometry.Profile import Profile
|
|
from Model.Geometry.PointXYZ import PointXYZ
|
|
from Model.Geometry.Vector_1d import Vector1d
|
|
|
|
logger = logging.getLogger()
|
|
|
|
|
|
@dataclass
|
|
class Tabulation:
|
|
z: np.array([])
|
|
A: np.array([])
|
|
L: np.array([])
|
|
|
|
|
|
class ProfileXYZ(Profile, SQLSubModel):
|
|
_sub_classes = [
|
|
PointXYZ,
|
|
]
|
|
|
|
def __init__(self,
|
|
id: int = -1,
|
|
name: str = "",
|
|
rk: float = 0.,
|
|
reach=None,
|
|
num=0,
|
|
nb_point: int = 0,
|
|
code1: int = 0, code2: int = 0,
|
|
status=None):
|
|
"""ProfileXYZ constructor
|
|
|
|
Args:
|
|
num: The number of this profile
|
|
code1: The interpolation code 1
|
|
code2: The interpolation code 2
|
|
rk: Kilometer point
|
|
name: The name of profile
|
|
|
|
Returns:
|
|
Nothing.
|
|
"""
|
|
super(ProfileXYZ, self).__init__(
|
|
id=id,
|
|
num=num,
|
|
name=name,
|
|
rk=rk,
|
|
code1=code1, code2=code2,
|
|
_type="XYZ",
|
|
reach=reach,
|
|
status=status,
|
|
)
|
|
|
|
self.tab = Tabulation([], [], [])
|
|
self.tab_up_to_date = False
|
|
self.time_z = 0.0
|
|
self.time_A = 0.0
|
|
self.time_l = 0.0
|
|
self._station = []
|
|
self.station_up_to_date = False
|
|
|
|
@classmethod
|
|
def _db_create(cls, execute):
|
|
execute("""
|
|
CREATE TABLE geometry_profileXYZ(
|
|
id INTEGER NOT NULL PRIMARY KEY,
|
|
ind INTEGER NOT NULL,
|
|
name TEXT,
|
|
reach INTEGER NOT NULL,
|
|
rk REAL NOT NULL,
|
|
num INTEGER NOT NULL,
|
|
code1 INTEGER NOT NULL,
|
|
code2 INTEGER NOT NULL,
|
|
sl INTEGER,
|
|
FOREIGN KEY(reach) REFERENCES river_reach(id),
|
|
FOREIGN KEY(sl) REFERENCES sedimentary_layer(id)
|
|
)
|
|
""")
|
|
|
|
return cls._create_submodel(execute)
|
|
|
|
@classmethod
|
|
def _db_update(cls, execute, version):
|
|
major, minor, release = version.strip().split(".")
|
|
if major == minor == "0":
|
|
rl = int(release)
|
|
if rl < 2:
|
|
execute(
|
|
"""
|
|
ALTER TABLE geometry_profileXYZ
|
|
ADD COLUMN sl INTEGER
|
|
REFERENCES sedimentary_layer(id)
|
|
"""
|
|
)
|
|
|
|
if rl < 11:
|
|
execute(
|
|
"""
|
|
ALTER TABLE geometry_profileXYZ
|
|
RENAME COLUMN kp TO rk
|
|
"""
|
|
)
|
|
|
|
return cls._update_submodel(execute, version)
|
|
|
|
@classmethod
|
|
def _db_load(cls, execute, data=None):
|
|
profiles = []
|
|
status = data["status"]
|
|
reach = data["reach"]
|
|
|
|
table = execute(
|
|
"SELECT id, ind, name, rk, num, code1, code2, sl " +
|
|
"FROM geometry_profileXYZ " +
|
|
f"WHERE reach = {reach.id}"
|
|
)
|
|
|
|
for row in table:
|
|
id = row[0]
|
|
ind = row[1]
|
|
name = row[2]
|
|
rk = row[3]
|
|
num = row[5]
|
|
code1 = row[5]
|
|
code2 = row[6]
|
|
sl = row[7]
|
|
|
|
new = cls(
|
|
id=id, num=num,
|
|
name=name, rk=rk,
|
|
code1=code1, code2=code2,
|
|
reach=reach,
|
|
status=status
|
|
)
|
|
|
|
if sl == -1 or sl is None:
|
|
new._sl = None
|
|
else:
|
|
new._sl = next(
|
|
filter(
|
|
lambda s: s.id == sl,
|
|
data["sediment_layers_list"].sediment_layers
|
|
)
|
|
)
|
|
|
|
data["profile"] = new
|
|
new._points = PointXYZ._db_load(execute, data.copy())
|
|
|
|
yield ind, new
|
|
|
|
def _db_save(self, execute, data=None):
|
|
ok = True
|
|
ind = data["ind"]
|
|
|
|
sl = self._sl.id if self._sl is not None else -1
|
|
|
|
sql = (
|
|
"INSERT OR REPLACE INTO " +
|
|
"geometry_profileXYZ(id, ind, name, reach, " +
|
|
"rk, num, code1, code2, sl) " +
|
|
"VALUES (" +
|
|
f"{self.id}, {ind}, '{self._db_format(self._name)}', " +
|
|
f"{self.reach.id}, {self.rk}, {self.num}, " +
|
|
f"{self.code1}, {self.code1}, {sl}" +
|
|
")"
|
|
)
|
|
execute(sql)
|
|
|
|
points = self.points
|
|
|
|
data["profile"] = self
|
|
execute(f"DELETE FROM geometry_pointXYZ WHERE profile = {self.id}")
|
|
|
|
ind = 0
|
|
for point in points:
|
|
data["ind"] = ind
|
|
ok &= point._db_save(execute, data)
|
|
ind += 1
|
|
|
|
return ok
|
|
|
|
@classmethod
|
|
def from_data(cls, header, data):
|
|
profile = None
|
|
try:
|
|
if len(header) == 0:
|
|
name = data[0]
|
|
rk = data[1]
|
|
reach = data[2]
|
|
status = data[3]
|
|
|
|
profile = cls(
|
|
id=-1,
|
|
name=name,
|
|
rk=rk,
|
|
reach=reach,
|
|
status=status
|
|
)
|
|
else:
|
|
valid_header = {'name', 'reach', 'rk', 'status'}
|
|
d = {}
|
|
for i, v in enumerate(data):
|
|
h = header[i].strip().lower().split(' ')[0]
|
|
if h in valid_header:
|
|
d[h] = v
|
|
|
|
profile = cls(**d)
|
|
except Exception as e:
|
|
logger.error(e)
|
|
raise ClipboardFormatError(header, data)
|
|
|
|
return profile
|
|
|
|
def point_from_data(self, header, data):
|
|
def float_format(s: str):
|
|
return float(
|
|
s.replace(",", ".")
|
|
)
|
|
|
|
point = None
|
|
try:
|
|
if len(header) == 0:
|
|
x = float_format(data[0])
|
|
y = float_format(data[1])
|
|
z = float_format(data[2])
|
|
name = data[3] if len(data) == 4 else ""
|
|
|
|
point = PointXYZ(
|
|
x, y, z, name, profile=self, status=self._status
|
|
)
|
|
else:
|
|
valid_header = {'name', 'x', 'y', 'z'}
|
|
d = {"status": self._status, "profile": self}
|
|
for i, v in enumerate(data):
|
|
h = header[i].strip().lower().split(' ')[0]
|
|
if h in valid_header:
|
|
d[h] = v
|
|
|
|
point = PointXYZ(**d)
|
|
except Exception as e:
|
|
raise ClipboardFormatError(header=header, data=data)
|
|
|
|
return point
|
|
|
|
def display_name(self):
|
|
name = ""
|
|
if self.name != "":
|
|
name += f"{self.name} "
|
|
|
|
name += f"({self.rk})"
|
|
|
|
return name
|
|
|
|
def x(self):
|
|
return [point.x for point in self.points]
|
|
|
|
def y(self):
|
|
return [point.y for point in self.points]
|
|
|
|
def z(self):
|
|
return [point.z for point in self.points]
|
|
|
|
def names(self):
|
|
return [point.name for point in self.points]
|
|
|
|
def x_max(self):
|
|
return max(self.filter_isnan(self.x()))
|
|
|
|
def x_min(self):
|
|
return min(self.filter_isnan(self.x()))
|
|
|
|
def y_max(self):
|
|
return max(self.filter_isnan(self.y()))
|
|
|
|
def y_min(self):
|
|
return min(self.filter_isnan(self.y()))
|
|
|
|
def z_max(self):
|
|
return max(self.filter_isnan(self.z()))
|
|
|
|
def z_min(self):
|
|
return min(self.filter_isnan(self.z()))
|
|
|
|
def import_points(self, list_points: list):
|
|
"""Import a list of points to profile
|
|
|
|
Args:
|
|
list_points: Liste of PointXYZ
|
|
|
|
Returns:
|
|
Nothing.
|
|
"""
|
|
for point in list_points:
|
|
pt = PointXYZ(*point, profile=self, status=self._status)
|
|
self.points.append(pt)
|
|
self._status.modified()
|
|
self.tab_up_to_date = False
|
|
self.station_up_to_date = False
|
|
|
|
def get_point_i(self, index: int) -> PointXYZ:
|
|
"""Get point at index.
|
|
|
|
Args:
|
|
index: Index of point.
|
|
|
|
Returns:
|
|
The point.
|
|
"""
|
|
try:
|
|
return self.points[index]
|
|
except IndexError:
|
|
raise IndexError(f"Invalid point index: {index}")
|
|
|
|
def get_point_by_name(self, name: str) -> PointXYZ:
|
|
"""Get point by name.
|
|
|
|
Args:
|
|
name: Point name.
|
|
|
|
Returns:
|
|
The point.
|
|
"""
|
|
try:
|
|
n_name = name.lower().strip()
|
|
return next(
|
|
filter(
|
|
lambda p: p.name.lower().strip() == n_name,
|
|
self.points
|
|
)
|
|
)
|
|
except Exception as e:
|
|
logger.debug(f"{e}")
|
|
raise IndexError(
|
|
f"Invalid point name: {name} " +
|
|
f"for profile ({self.id}) rk = {self.rk}"
|
|
)
|
|
|
|
def has_standard_named_points(self):
|
|
l, r = reduce(
|
|
lambda acc, n: (
|
|
(acc[0] | (n == "rg")),
|
|
(acc[1] | (n == "rd"))
|
|
),
|
|
map(lambda p: p.name.lower().strip(),
|
|
self.points),
|
|
(False, False)
|
|
)
|
|
|
|
return l & r
|
|
|
|
def add(self):
|
|
"""Add a new PointXYZ to profile.
|
|
|
|
Returns:
|
|
Nothing.
|
|
"""
|
|
point_xyz = PointXYZ(0., 0., 0., profile=self, status=self._status)
|
|
self.points.append(point_xyz)
|
|
self._status.modified()
|
|
self.tab_up_to_date = False
|
|
self.station_up_to_date = False
|
|
|
|
def insert(self, index: int):
|
|
"""Insert a new point at index.
|
|
|
|
Args:
|
|
index: The index of new profile.
|
|
|
|
Returns:
|
|
The new point.
|
|
"""
|
|
point = PointXYZ(0., 0., 0., profile=self, status=self._status)
|
|
self.points.insert(index, point)
|
|
self._status.modified()
|
|
self.tab_up_to_date = False
|
|
self.station_up_to_date = False
|
|
return point
|
|
|
|
def filter_isnan(self, lst):
|
|
"""Returns the input list without 'nan' element
|
|
|
|
Args:
|
|
lst: The list to filter
|
|
|
|
Returns:
|
|
The list without 'nan'
|
|
"""
|
|
return [x for x in lst if not np.isnan(x)]
|
|
|
|
def speed(self, q, z):
|
|
area = self.wet_area(z)
|
|
|
|
if area == 0:
|
|
return 0
|
|
|
|
return q / area
|
|
|
|
def width_approximation(self):
|
|
if self.has_standard_named_points():
|
|
rg = self.get_point_by_name("rg")
|
|
rd = self.get_point_by_name("rd")
|
|
else:
|
|
rg = self.points[0]
|
|
rd = self.points[-1]
|
|
|
|
return abs(rg.dist(rd))
|
|
|
|
def compute_wet_width(self, z):
|
|
start, end = self.get_all_water_limits_ac(z)
|
|
|
|
if len(start) == 0:
|
|
return 0
|
|
|
|
length = 0.0
|
|
for s, e in zip(start, end):
|
|
length += abs(s - e)
|
|
return length
|
|
|
|
def wet_width(self, z):
|
|
if self.tab_up_to_date:
|
|
if z > self.tab.z[-1]:
|
|
return self.tab.L[-1]
|
|
elif z < self.tab.z[0]:
|
|
return 0.0
|
|
else:
|
|
return np.interp(z, self.tab.z, self.tab.L)
|
|
else:
|
|
return self.compute_wet_width(z)
|
|
|
|
def wet_perimeter(self, z):
|
|
lines = self.wet_lines(z)
|
|
|
|
if lines is None:
|
|
return 0
|
|
|
|
length = 0.0
|
|
for line in lines:
|
|
length += line.length
|
|
return length
|
|
|
|
def compute_wet_area(self, z):
|
|
area = 0.0
|
|
if len(self.tab.L) > 0:
|
|
if z < self.tab.z[0]:
|
|
return 0.0
|
|
i = np.searchsorted([z], self.tab.z, side='right')[0]
|
|
for j in range(i-1):
|
|
area += (self.tab.L[j] + self.tab.L[j+1]) * (
|
|
self.tab.z[j+1] - self.tab.z[j]) / 2.0
|
|
area += (self.tab.L[i-1] + self.wet_width(z))
|
|
else:
|
|
lines = self.wet_lines(z)
|
|
|
|
if lines is None:
|
|
return 0.0
|
|
for line in lines:
|
|
if len(line.coords) > 2:
|
|
poly = geometry.Polygon(line)
|
|
area += poly.area
|
|
return area
|
|
|
|
def wet_area(self, z):
|
|
if self.tab_up_to_date:
|
|
if z > self.tab.z[-1]:
|
|
return self.tab.A[-1] + self.tab.L[-1] * (z - self.tab.z[-1])
|
|
elif z < self.tab.z[0]:
|
|
return 0.0
|
|
else:
|
|
return np.interp(z, self.tab.z, self.tab.A)
|
|
else:
|
|
lines = self.wet_lines(z)
|
|
|
|
if lines is None:
|
|
return 0.0
|
|
|
|
area = 0.0
|
|
for line in lines:
|
|
if len(line.coords) > 2:
|
|
poly = geometry.Polygon(line)
|
|
area += poly.area
|
|
return area
|
|
|
|
def wet_radius(self, z):
|
|
p = self.wet_perimeter(z)
|
|
a = self.wet_area(z)
|
|
|
|
if p == 0:
|
|
return 0
|
|
|
|
return a/p
|
|
|
|
def wet_line(self, z):
|
|
points = self.wet_points(z)
|
|
if len(points) < 3:
|
|
return None
|
|
|
|
zz = map(lambda p: p.z, points)
|
|
station = self._get_station(points)
|
|
|
|
line = geometry.LineString(list(zip(station, zz)))
|
|
return line
|
|
|
|
def wet_lines(self, z):
|
|
points = self._points
|
|
if len(points) < 3:
|
|
return None
|
|
|
|
lines = []
|
|
|
|
zz = list(map(lambda p: p.z, points))
|
|
station = self._get_station(points)
|
|
|
|
line = []
|
|
for i in range(self.number_points-1):
|
|
|
|
if zz[i] >= z and zz[i+1] < z:
|
|
y = np.interp(
|
|
z,
|
|
[zz[i], zz[i+1]],
|
|
[station[i], station[i+1]]
|
|
)
|
|
line.append([y, z])
|
|
|
|
if zz[i] < z:
|
|
line.append([station[i], zz[i]])
|
|
|
|
if zz[i] <= z and zz[i+1] >= z:
|
|
y = np.interp(
|
|
z,
|
|
[zz[i], zz[i+1]],
|
|
[station[i], station[i+1]]
|
|
)
|
|
line.append([y, z])
|
|
if len(line) > 2:
|
|
lines.append(geometry.LineString(line))
|
|
line = []
|
|
|
|
if zz[self.number_points-1] < z:
|
|
line.append([station[self.number_points-1], z])
|
|
if len(line) > 2:
|
|
lines.append(geometry.LineString(line))
|
|
line = []
|
|
|
|
return lines
|
|
|
|
def max_water_depth(self, z):
|
|
return z - self.z_min()
|
|
|
|
def mean_water_depth(self, z):
|
|
a = self.wet_area(z)
|
|
w = self.wet_width(z)
|
|
|
|
if w == 0:
|
|
return 0
|
|
|
|
return a/w
|
|
|
|
def wet_polygon(self, z):
|
|
points = self.wet_points(z)
|
|
if len(points) < 3:
|
|
return None
|
|
|
|
zz = map(lambda p: p.z, points)
|
|
station = self._get_station(points)
|
|
|
|
poly = geometry.Polygon(list(zip(station, zz)))
|
|
return poly
|
|
|
|
def wet_points(self, z):
|
|
left, right = self.get_water_limits(z)
|
|
|
|
points = list(filter(lambda p: p.z < z, self._points))
|
|
points = [left] + points + [right]
|
|
points = sorted(points, key=lambda p: p.x)
|
|
|
|
return points
|
|
|
|
def get_nb_wet_areas(self, z):
|
|
|
|
n_zones = 0
|
|
points = self._points
|
|
if points[0].z <= z:
|
|
n_zones += 1
|
|
|
|
for i in range(self.number_points-1):
|
|
if points[i].z > z and points[i+1].z <= z:
|
|
n_zones += 1
|
|
|
|
return n_zones
|
|
|
|
def get_all_water_limits_ac(self, z):
|
|
"""
|
|
Determine all water limits for z elevation.
|
|
"""
|
|
|
|
points = self._points
|
|
if len(points) < 3:
|
|
return None
|
|
|
|
zz = list(map(lambda p: p.z, points))
|
|
station = self.get_station()
|
|
|
|
start = []
|
|
if points[0].z <= z:
|
|
start.append(station[0])
|
|
|
|
for i in range(self.number_points-1):
|
|
if zz[i] > z and zz[i+1] <= z:
|
|
y = np.interp(
|
|
z,
|
|
[zz[i], zz[i+1]],
|
|
[station[i], station[i+1]]
|
|
)
|
|
start.append(y)
|
|
|
|
end = []
|
|
if points[-1].z <= z:
|
|
end.append(station[-1])
|
|
|
|
for i in reversed(range(self.number_points-1)):
|
|
if zz[i] <= z and zz[i+1] > z:
|
|
y = np.interp(
|
|
z,
|
|
[zz[i], zz[i+1]],
|
|
[station[i], station[i+1]]
|
|
)
|
|
end.append(y)
|
|
|
|
if len(start) != len(end):
|
|
logger.error(f"ERROR in get_all_water_limits_ac")
|
|
return [], []
|
|
|
|
return start, list(reversed(end))
|
|
|
|
def get_water_limits(self, z):
|
|
"""
|
|
Determine left and right limits of water elevation.
|
|
"""
|
|
|
|
# Get the index of first point with elevation lesser than water
|
|
# elevation (for the right and left river side)
|
|
i_left = -1
|
|
i_right = -1
|
|
|
|
for i in range(self.number_points):
|
|
if self.point(i).z <= z:
|
|
i_left = i
|
|
break
|
|
|
|
for i in reversed(range(self.number_points)):
|
|
if self.point(i).z <= z:
|
|
i_right = i
|
|
break
|
|
|
|
# Interpolate points at river left side
|
|
if (i_left > 0):
|
|
x = np.interp(
|
|
z,
|
|
[self.point(i_left).z, self.point(i_left - 1).z],
|
|
[self.point(i_left).x, self.point(i_left - 1).x]
|
|
)
|
|
y = np.interp(
|
|
z,
|
|
[self.point(i_left).z, self.point(i_left - 1).z],
|
|
[self.point(i_left).y, self.point(i_left - 1).y]
|
|
)
|
|
pt_left = PointXYZ(x, y, z, name="wl_left")
|
|
else:
|
|
pt_left = self.point(0)
|
|
|
|
# Interpolate points at river right side
|
|
if (i_right < self.number_points - 1):
|
|
x = np.interp(
|
|
z,
|
|
[self.point(i_right).z, self.point(i_right + 1).z],
|
|
[self.point(i_right).x, self.point(i_right + 1).x]
|
|
)
|
|
y = np.interp(
|
|
z,
|
|
[self.point(i_right).z, self.point(i_right + 1).z],
|
|
[self.point(i_right).y, self.point(i_right + 1).y]
|
|
)
|
|
pt_right = PointXYZ(x, y, z, name="wl_right")
|
|
else:
|
|
pt_right = self.point(self.number_points - 1)
|
|
|
|
return pt_left, pt_right
|
|
|
|
def compute_tabulation(self):
|
|
sorted_points = sorted(self._points, key=lambda p: p.z)
|
|
self.tab.z = np.array([p.z for p in sorted_points], np.float64)
|
|
self.tab.L = np.zeros(len(self.tab.z), np.float64)
|
|
self.tab.A = np.zeros(len(self.tab.z), np.float64)
|
|
for i in range(1, len(self.tab.z)):
|
|
self.tab.L[i] = self.compute_wet_width(self.tab.z[i])
|
|
dx = (self.tab.L[i] + self.tab.L[i-1])/2
|
|
dz = self.tab.z[i] - self.tab.z[i-1]
|
|
self.tab.A[i] = self.tab.A[i-1] + dz * dx
|
|
|
|
self.tab_up_to_date = True
|
|
|
|
def get_station(self):
|
|
"""Projection of the points of the profile on a plane.
|
|
|
|
Args:
|
|
self: The profile
|
|
|
|
Returns:
|
|
Projection of the points of the profile on a plane.
|
|
"""
|
|
if self.nb_points < 2:
|
|
return [0.0]
|
|
else:
|
|
if self.station_up_to_date:
|
|
return self._station
|
|
else:
|
|
self._station = self._get_station(self.points)
|
|
self.station_up_to_date = True
|
|
return self._station
|
|
|
|
@timer
|
|
def _get_station(self, points):
|
|
first_named_point = None
|
|
index_first_named_point = None
|
|
last_named_point = None
|
|
|
|
first_point_not_nan = self._first_point_not_nan(points)
|
|
last_point_not_nan = self._last_point_not_nan(points)
|
|
|
|
for index, point in enumerate(points):
|
|
if point.point_is_named():
|
|
index_first_named_point = index
|
|
first_named_point = point
|
|
break
|
|
|
|
for point in reversed(points):
|
|
if point.point_is_named():
|
|
last_named_point = point
|
|
break
|
|
|
|
station = []
|
|
constant = 0.0
|
|
|
|
if (first_named_point is not None
|
|
and last_named_point is not None):
|
|
if (first_named_point != last_named_point
|
|
and first_named_point.x != last_named_point.x):
|
|
vector = Vector1d(first_named_point, last_named_point)
|
|
norm_dir_vec = vector.normalized_direction_vector()
|
|
else:
|
|
vector = Vector1d(first_point_not_nan, last_point_not_nan)
|
|
norm_dir_vec = vector.normalized_direction_vector()
|
|
|
|
for point in points:
|
|
xi = point.x - first_named_point.x
|
|
yi = point.y - first_named_point.y
|
|
station_i = (norm_dir_vec[0] * xi +
|
|
norm_dir_vec[1] * yi)
|
|
station.append(station_i)
|
|
|
|
constant = station[index_first_named_point]
|
|
elif first_named_point is None:
|
|
vector = Vector1d(first_point_not_nan, last_point_not_nan)
|
|
norm_dir_vec = vector.normalized_direction_vector()
|
|
|
|
for point in points:
|
|
xi = point.x - first_point_not_nan.x
|
|
yi = point.y - first_point_not_nan.y
|
|
station_i = (norm_dir_vec[0] * xi +
|
|
norm_dir_vec[1] * yi)
|
|
station.append(station_i)
|
|
|
|
z_min = self.z_min()
|
|
index_profile_z_min = next(
|
|
filter(
|
|
lambda z: z[1] == z_min,
|
|
enumerate(self.z())
|
|
)
|
|
)
|
|
constant = station[index_profile_z_min[0]]
|
|
|
|
return list(map(lambda s: s - constant, station))
|
|
|
|
def _first_point_not_nan(self, points):
|
|
first_point = None
|
|
|
|
for point in points:
|
|
if not point.is_nan():
|
|
first_point = point
|
|
break
|
|
|
|
return first_point
|
|
|
|
def _last_point_not_nan(self, points):
|
|
last_point = None
|
|
|
|
for point in reversed(points):
|
|
if not point.is_nan():
|
|
last_point = point
|
|
break
|
|
|
|
return last_point
|
|
|
|
def purge(self, np_purge):
|
|
"""
|
|
Remove points to keep at most np_purge points.
|
|
"""
|
|
if (self.nb_points <= np_purge):
|
|
return
|
|
|
|
nb_named = 2 # we consider the first and last point as named
|
|
area = [0.0]
|
|
|
|
for i in range(1, self.nb_points-1):
|
|
if self.point(i).point_is_named():
|
|
area.append(9999999.999)
|
|
nb_named += 1
|
|
else:
|
|
area.append(
|
|
PointXYZ.areatriangle3d(
|
|
self.point(i-1),
|
|
self.point(i),
|
|
self.point(i+1))
|
|
)
|
|
|
|
area.append(0.0)
|
|
|
|
while self.nb_points > max(np_purge, nb_named):
|
|
to_rm = np.argmin(area[1:self.nb_points - 1]) + 1
|
|
|
|
self.delete_i([to_rm])
|
|
area.pop(to_rm)
|
|
|
|
for i in [to_rm-1, to_rm]:
|
|
if (i == 0):
|
|
continue
|
|
|
|
if (i == self.nb_points - 1):
|
|
continue
|
|
|
|
if self.point(i).point_is_named():
|
|
area[i] = 9999999.999
|
|
else:
|
|
area[i] = PointXYZ.areatriangle3d(
|
|
self.point(i-1),
|
|
self.point(i),
|
|
self.point(i+1)
|
|
)
|
|
|
|
def shift(self, x, y, z):
|
|
for p in self.points:
|
|
p.x = p.x + x
|
|
p.y = p.y + y
|
|
p.z = p.z + z
|