mirror of https://gitlab.com/pamhyr/pamhyr2
368 lines
10 KiB
Python
368 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import numpy as np
|
|
from typing import List
|
|
|
|
from tools import timer
|
|
|
|
from Model.DB import SQLSubModel
|
|
from Model.Except import ClipboardFormatError
|
|
from Model.Geometry.Profile import Profile
|
|
from Model.Geometry.PointXYZ import PointXYZ
|
|
from Model.Geometry.Vector_1d import Vector1d
|
|
|
|
class ProfileXYZ(Profile, SQLSubModel):
    """Profile whose geometry is an ordered list of PointXYZ.

    The profile itself is persisted in the ``geometry_profileXYZ`` table;
    loading and saving of its points is delegated to ``PointXYZ``.
    """

    _sub_classes = [
        PointXYZ,
    ]

    def __init__(self,
                 id: int = -1,
                 name: str = "",
                 kp: float = 0.,
                 reach=None,
                 num=0,
                 nb_point: int = 0,
                 code1: int = 0, code2: int = 0,
                 status=None):
        """ProfileXYZ constructor

        Args:
            id: The identifier of this profile
            name: The name of profile
            kp: Kilometer point
            reach: The reach that owns this profile
            num: The number of this profile
            nb_point: Not used by this constructor (kept so existing
                positional/keyword callers keep working)
            code1: The interpolation code 1
            code2: The interpolation code 2
            status: Study status object, forwarded to the parent class

        Returns:
            Nothing.
        """
        super().__init__(
            id=id,
            num=num,
            name=name,
            kp=kp,
            code1=code1, code2=code2,
            _type="XYZ",
            reach=reach,
            status=status,
        )

    @classmethod
    def _sql_create(cls, execute):
        """Create the profile table, then the sub-model tables.

        Args:
            execute: Callable running one SQL statement

        Returns:
            The result of the sub-model creation.
        """
        execute("""
          CREATE TABLE geometry_profileXYZ(
            id INTEGER NOT NULL PRIMARY KEY,
            ind INTEGER NOT NULL,
            name TEXT,
            reach INTEGER NOT NULL,
            kp REAL NOT NULL,
            num INTEGER NOT NULL,
            code1 INTEGER NOT NULL,
            code2 INTEGER NOT NULL,
            FOREIGN KEY(reach) REFERENCES river_reach(id)
          )
        """)

        return cls._create_submodel(execute)

    @classmethod
    def _sql_update(cls, execute, version):
        """Delegate the schema update to the sub-models.

        Args:
            execute: Callable running one SQL statement
            version: Version value forwarded to the sub-models

        Returns:
            The result of the sub-model update.
        """
        return cls._update_submodel(execute, version)

    @classmethod
    def _sql_load(cls, execute, data=None):
        """Load every profile of one reach, together with its points.

        Args:
            execute: Callable running one SQL statement and returning rows
            data: Dict providing "status", "reach" (value used in the
                WHERE clause) and "parent" (object given to each profile
                as its reach). The key "profile" is overwritten for each
                loaded profile before the points are loaded.

        Returns:
            A list of profiles where each profile sits at the position
            given by its ``ind`` column.
        """
        status = data["status"]
        reach = data["reach"]

        # Materialise the result once: it is needed both to size the
        # output list and to fill it, and a one-shot cursor would be
        # exhausted after the first pass.
        rows = list(execute(
            "SELECT id, ind, name, kp, num, code1, code2 " +
            "FROM geometry_profileXYZ " +
            f"WHERE reach = {reach}"
        ))

        profiles = [None] * len(rows)

        # Column order follows the SELECT above (num is column 4).
        for profile_id, ind, name, kp, num, code1, code2 in rows:
            new = cls(
                id=profile_id, num=num,
                name=name, kp=kp,
                code1=code1, code2=code2,
                reach=data["parent"],
                status=status
            )

            # PointXYZ._sql_load receives the current profile id
            # through the shared data dict.
            data["profile"] = profile_id
            new._points = PointXYZ._sql_load(execute, data)

            profiles[ind] = new

        return profiles

    def _sql_save(self, execute, data=None):
        """Save this profile and all of its points.

        Args:
            execute: Callable running one SQL statement
            data: Dict providing "ind", the index of this profile. The
                keys "profile" and "ind" are overwritten while the
                points are saved.

        Returns:
            The combined (and-ed) result of every point save.
        """
        ok = True
        ind = data["ind"]

        sql = (
            "INSERT OR REPLACE INTO " +
            "geometry_profileXYZ(id, ind, name, reach, kp, num, code1, code2) " +
            "VALUES (" +
            f"{self.id}, {ind}, '{self._sql_format(self._name)}', " +
            f"{self.reach.id}, {self.kp}, {self.num}, " +
            f"{self.code1}, {self.code2}" +
            ")"
        )
        execute(sql)

        # Points are rewritten from scratch: drop the stored ones first.
        data["profile"] = self.id
        execute(f"DELETE FROM geometry_pointXYZ WHERE profile = {self.id}")

        for point_ind, point in enumerate(self._points):
            data["ind"] = point_ind
            ok &= point._sql_save(execute, data)

        return ok

    @staticmethod
    def _clipboard_fields(header, data, valid_header):
        """Map data values to the accepted header names.

        The name of column ``i`` is the first space-separated word of
        ``header[i]``, stripped and lower-cased. Columns whose name is
        not in ``valid_header`` are dropped.

        Args:
            header: Column titles, one per value of ``data``
            data: The values
            valid_header: Set of accepted column names

        Returns:
            A dict {column name: value}.
        """
        fields = {}
        for i, value in enumerate(data):
            key = header[i].strip().lower().split(' ')[0]
            if key in valid_header:
                fields[key] = value

        return fields

    @classmethod
    def from_data(cls, header, data):
        """Build a profile from clipboard-like data.

        Args:
            header: Column titles; when empty, ``data`` is passed
                positionally to the constructor
            data: The values

        Returns:
            The new profile.

        Raises:
            ClipboardFormatError: if the profile cannot be built.
        """
        profile = None
        try:
            if len(header) == 0:
                profile = cls(
                    *data
                )
            else:
                valid_header = {'name', 'reach', 'kp'}
                profile = cls(
                    **cls._clipboard_fields(header, data, valid_header)
                )
        except Exception as e:
            raise ClipboardFormatError(header, data) from e

        return profile

    def point_from_data(self, header, data):
        """Build a point from clipboard-like data.

        The point is created with the status of this profile; it is not
        added to the profile.

        Args:
            header: Column titles; when empty, ``data`` is passed
                positionally to the PointXYZ constructor
            data: The values

        Returns:
            The new point.

        Raises:
            ClipboardFormatError: if the point cannot be built.
        """
        point = None
        try:
            if len(header) == 0:
                point = PointXYZ(
                    *data, status=self._status
                )
            else:
                valid_header = {'name', 'x', 'y', 'z'}
                fields = self._clipboard_fields(header, data, valid_header)
                point = PointXYZ(status=self._status, **fields)
        except Exception as e:
            raise ClipboardFormatError(header=header, data=data) from e

        return point

    def x(self):
        """Returns the list of x of each point."""
        return [point.x for point in self._points]

    def y(self):
        """Returns the list of y of each point."""
        return [point.y for point in self._points]

    def z(self):
        """Returns the list of z of each point."""
        return [point.z for point in self._points]

    def names(self):
        """Returns the list of name of each point."""
        return [point.name for point in self._points]

    def x_max(self):
        """Returns the greatest x, 'nan' values ignored."""
        return max(self.filter_isnan(self.x()))

    def x_min(self):
        """Returns the smallest x, 'nan' values ignored."""
        return min(self.filter_isnan(self.x()))

    def y_max(self):
        """Returns the greatest y, 'nan' values ignored."""
        return max(self.filter_isnan(self.y()))

    def y_min(self):
        """Returns the smallest y, 'nan' values ignored."""
        return min(self.filter_isnan(self.y()))

    def z_max(self):
        """Returns the greatest z, 'nan' values ignored."""
        return max(self.filter_isnan(self.z()))

    def z_min(self):
        """Returns the smallest z, 'nan' values ignored."""
        return min(self.filter_isnan(self.z()))

    def import_points(self, list_points: list):
        """Import a list of points to profile

        Args:
            list_points: List of point data; each element is unpacked
                as the positional arguments of PointXYZ

        Returns:
            Nothing.
        """
        for point in list_points:
            pt = PointXYZ(*point, status=self._status)
            self._points.append(pt)

        self._status.modified()

    def get_point_i(self, index: int) -> PointXYZ:
        """Get point at index.

        Args:
            index: Index of point.

        Returns:
            The point.

        Raises:
            IndexError: if there is no point at this index.
        """
        try:
            return self._points[index]
        except IndexError:
            raise IndexError(f"Invalid point index: {index}")

    def add(self):
        """Add a new PointXYZ (0, 0, 0) at the end of the profile.

        Returns:
            Nothing.
        """
        point_xyz = PointXYZ(0., 0., 0., status=self._status)
        self._points.append(point_xyz)
        self._status.modified()

    def insert(self, index: int):
        """Insert a new point (0, 0, 0) at index.

        Args:
            index: The index of new point.

        Returns:
            The new point.
        """
        point = PointXYZ(0., 0., 0., status=self._status)
        self._points.insert(index, point)
        self._status.modified()
        return point

    def filter_isnan(self, lst):
        """Returns the input list without 'nan' element

        Args:
            lst: The list to filter

        Returns:
            The list without 'nan'
        """
        return [x for x in lst if not np.isnan(x)]

    def _first_point_not_nan(self):
        """Returns the first point that is not 'nan'.

        Falls back to the first point when every point is 'nan'.
        """
        return next(
            (point for point in self._points if not point.is_nan()),
            self._points[0]
        )

    def _last_point_not_nan(self):
        """Returns the last point that is not 'nan'.

        Falls back to the last point when every point is 'nan'.
        """
        return next(
            (point for point in reversed(self._points)
             if not point.is_nan()),
            self._points[-1]
        )

    @timer
    def get_station(self):
        """Projection of the points of the profile on a plane.

        Each point is projected (dot product on x, y) on a normalized
        direction vector. The direction goes from the first to the last
        named point when those are two different points with different
        x; otherwise from the first to the last point that is not 'nan'.

        The returned stations are shifted so that the reference point
        is at 0: the first named point when the profile has one,
        otherwise the first point holding the minimal z.

        Returns:
            The list of stations (one per point), or None if the
            profile has less than 3 points.
        """
        if self.nb_points < 3:
            return None

        points = self._points

        first_point_not_nan = self._first_point_not_nan()
        last_point_not_nan = self._last_point_not_nan()

        index_first_named_point = next(
            (index for index, point in enumerate(points)
             if point.point_is_named()),
            None
        )

        if index_first_named_point is not None:
            first_named_point = points[index_first_named_point]
            # A first named point exists, so the backward search
            # always finds one too (possibly the same point).
            last_named_point = next(
                point for point in reversed(points)
                if point.point_is_named()
            )

            if (first_named_point != last_named_point and
                    first_named_point.x != last_named_point.x):
                vector = Vector1d(first_named_point, last_named_point)
            else:
                vector = Vector1d(first_point_not_nan, last_point_not_nan)

            origin = first_named_point
        else:
            vector = Vector1d(first_point_not_nan, last_point_not_nan)
            origin = first_point_not_nan

        normalized_direction_vec = vector.normalized_direction_vector()

        station = [
            (normalized_direction_vec[0] * (point.x - origin.x) +
             normalized_direction_vec[1] * (point.y - origin.y))
            for point in points
        ]

        if index_first_named_point is not None:
            constant = station[index_first_named_point]
        else:
            # Without named point, the reference is the first point
            # holding the lowest z of the profile.
            z_min = self.z_min()
            index_profile_z_min = next(
                index for index, z in enumerate(self.z())
                if z == z_min
            )
            constant = station[index_profile_z_min]

        return [s - constant for s in station]
|