mirror of https://gitlab.com/pamhyr/pamhyr2
1142 lines
32 KiB
Python
1142 lines
32 KiB
Python
# ProfileXYZ.py -- Pamhyr
|
|
# Copyright (C) 2023-2025 INRAE
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import queue
|
|
import random
|
|
import logging
|
|
import numpy as np
|
|
from typing import List
|
|
from functools import reduce
|
|
from dataclasses import dataclass
|
|
|
|
from tools import timer, flatten
|
|
from shapely import geometry
|
|
|
|
from Model.Tools.PamhyrDB import SQLSubModel
|
|
from Model.Except import ClipboardFormatError
|
|
from Model.Scenario import Scenario
|
|
from Model.Geometry.Profile import Profile
|
|
from Model.Geometry.PointXYZ import PointXYZ
|
|
from Model.Geometry.Vector_1d import Vector1d
|
|
|
|
logger = logging.getLogger()
|
|
|
|
|
|
@dataclass
|
|
class Tabulation:
|
|
z: np.array([])
|
|
A: np.array([])
|
|
L: np.array([])
|
|
|
|
|
|
class ProfileXYZ(Profile, SQLSubModel):
|
|
_sub_classes = [
|
|
PointXYZ,
|
|
]
|
|
|
|
def __init__(self,
|
|
id: int = -1,
|
|
ind: int = -1,
|
|
name: str = "",
|
|
rk: float = 0.,
|
|
reach=None,
|
|
num=0,
|
|
nb_point: int = 0,
|
|
code1: int = 0, code2: int = 0,
|
|
status=None, owner_scenario=-1):
|
|
"""ProfileXYZ constructor
|
|
|
|
Args:
|
|
num: The number of this profile
|
|
code1: The interpolation code 1
|
|
code2: The interpolation code 2
|
|
rk: Kilometer point
|
|
name: The name of profile
|
|
|
|
Returns:
|
|
Nothing.
|
|
"""
|
|
super(ProfileXYZ, self).__init__(
|
|
id=id,
|
|
name=name,
|
|
num=num, rk=rk,
|
|
code1=code1, code2=code2,
|
|
_type="XYZ",
|
|
reach=reach,
|
|
status=status,
|
|
owner_scenario=owner_scenario
|
|
)
|
|
|
|
self._db_ind = ind
|
|
self.tab = Tabulation([], [], [])
|
|
self.tab_up_to_date = False
|
|
self.time_z = 0.0
|
|
self.time_A = 0.0
|
|
self.time_l = 0.0
|
|
self._station = []
|
|
self.station_up_to_date = False
|
|
|
|
@classmethod
|
|
def _db_create(cls, execute, ext=""):
|
|
execute(f"""
|
|
CREATE TABLE geometry_profileXYZ{ext} (
|
|
{cls.create_db_add_pamhyr_id()},
|
|
deleted BOOLEAN NOT NULL DEFAULT FALSE,
|
|
ind INTEGER NOT NULL,
|
|
name TEXT,
|
|
reach INTEGER NOT NULL,
|
|
rk REAL NOT NULL,
|
|
num INTEGER NOT NULL,
|
|
code1 INTEGER NOT NULL,
|
|
code2 INTEGER NOT NULL,
|
|
sl INTEGER,
|
|
{Scenario.create_db_add_scenario()},
|
|
{Scenario.create_db_add_scenario_fk()},
|
|
FOREIGN KEY(reach) REFERENCES river_reach(pamhyr_id),
|
|
FOREIGN KEY(sl) REFERENCES sedimentary_layer(pamhyr_id),
|
|
PRIMARY KEY(pamhyr_id, scenario)
|
|
)
|
|
""")
|
|
|
|
if ext == "_tmp":
|
|
return True
|
|
|
|
return cls._create_submodel(execute)
|
|
|
|
@classmethod
|
|
def _db_update(cls, execute, version, data=None):
|
|
major, minor, release = version.strip().split(".")
|
|
if major == minor == "0":
|
|
rl = int(release)
|
|
if rl < 2:
|
|
execute(
|
|
"""
|
|
ALTER TABLE geometry_profileXYZ
|
|
ADD COLUMN sl INTEGER
|
|
REFERENCES sedimentary_layer(id)
|
|
"""
|
|
)
|
|
|
|
if rl < 11:
|
|
execute(
|
|
"""
|
|
ALTER TABLE geometry_profileXYZ
|
|
RENAME COLUMN kp TO rk
|
|
"""
|
|
)
|
|
|
|
cls._db_update_to_0_2_0(execute, data)
|
|
|
|
if major == "0" and minor == "1":
|
|
if int(release) < 2:
|
|
execute(
|
|
"ALTER TABLE geometry_profileXYZ " +
|
|
"ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE"
|
|
)
|
|
|
|
return cls._update_submodel(execute, version, data)
|
|
|
|
@classmethod
|
|
def _db_update_to_0_2_0(cls, execute, data):
|
|
table = "geometry_profileXYZ"
|
|
id2pid = data['id2pid']
|
|
reachs = id2pid['river_reach']
|
|
|
|
cls._db_update_cleanup_points(execute)
|
|
|
|
cls.update_db_add_pamhyr_id(execute, table, data)
|
|
Scenario.update_db_add_scenario(execute, table)
|
|
|
|
cls._db_create(execute, ext="_tmp")
|
|
|
|
execute(
|
|
f"INSERT INTO {table}_tmp " +
|
|
"(pamhyr_id, ind, name, reach, rk, " +
|
|
"num, code1, code2, sl, scenario) " +
|
|
"SELECT pamhyr_id, ind, name, reach, rk, " +
|
|
"num, code1, code2, sl, scenario " +
|
|
f"FROM {table}"
|
|
)
|
|
|
|
execute(f"DROP TABLE {table}")
|
|
execute(f"ALTER TABLE {table}_tmp RENAME TO {table}")
|
|
|
|
cls._db_update_to_0_2_0_set_reach_pid(execute, table, reachs)
|
|
|
|
if 'sedimentary_layer' in id2pid:
|
|
sl = id2pid['sedimentary_layer']
|
|
cls._db_update_to_0_2_0_set_sl_pid(execute, table, sl)
|
|
|
|
@classmethod
|
|
def _db_update_cleanup_points(cls, execute):
|
|
profiles = set(
|
|
map(
|
|
str,
|
|
flatten(
|
|
execute(
|
|
"SELECT id FROM geometry_profileXYZ"
|
|
)
|
|
)
|
|
)
|
|
)
|
|
|
|
execute(
|
|
"DELETE FROM geometry_pointXYZ " +
|
|
f"WHERE profile NOT IN ({', '.join(profiles)})"
|
|
)
|
|
|
|
@classmethod
|
|
def _db_update_to_0_2_0_set_sl_pid(cls, execute, table, sl):
|
|
els = execute(
|
|
f"SELECT pamhyr_id, sl FROM {table}"
|
|
)
|
|
|
|
for row in els:
|
|
it = iter(row)
|
|
pid = next(it)
|
|
sl_id = next(it)
|
|
|
|
if sl_id == -1:
|
|
continue
|
|
|
|
execute(
|
|
f"UPDATE {table} " +
|
|
f"SET sl = {sl[sl_id]} " +
|
|
f"WHERE pamhyr_id = {pid}"
|
|
)
|
|
|
|
@classmethod
|
|
def _db_load(cls, execute, data=None):
|
|
new = []
|
|
|
|
status = data["status"]
|
|
scenario = data["scenario"]
|
|
loaded = data['loaded_pid']
|
|
reach = data["reach"]
|
|
|
|
if scenario is None:
|
|
return new
|
|
|
|
table = execute(
|
|
"SELECT pamhyr_id, ind, deleted, name, rk, num, " +
|
|
"code1, code2, sl, scenario " +
|
|
"FROM geometry_profileXYZ " +
|
|
f"WHERE reach = {reach.id} " +
|
|
f"AND scenario = {scenario.id} " +
|
|
f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) " +
|
|
"ORDER BY ind ASC"
|
|
)
|
|
|
|
for row in table:
|
|
it = iter(row)
|
|
|
|
pid = next(it)
|
|
ind = next(it)
|
|
deleted = (next(it) == 1)
|
|
name = next(it)
|
|
rk = next(it)
|
|
num = next(it)
|
|
code1 = next(it)
|
|
code2 = next(it)
|
|
sl = next(it)
|
|
owner_scenario = next(it)
|
|
|
|
profile = cls(
|
|
id=pid, ind=ind, num=num,
|
|
name=name, rk=rk,
|
|
code1=code1, code2=code2,
|
|
reach=reach,
|
|
status=status,
|
|
owner_scenario=owner_scenario
|
|
)
|
|
if deleted:
|
|
profile.set_as_deleted()
|
|
|
|
if sl == -1 or sl is None:
|
|
profile._sl = None
|
|
else:
|
|
profile._sl = next(
|
|
filter(
|
|
lambda s: s.id == sl,
|
|
data["sediment_layers_list"].sediment_layers
|
|
),
|
|
None
|
|
)
|
|
|
|
loaded_save = data['loaded_pid']
|
|
data['loaded_pid'] = set()
|
|
data["profile"] = profile
|
|
|
|
profile._points = list(
|
|
map(
|
|
lambda ip: ip[1],
|
|
sorted(
|
|
PointXYZ._db_load(execute, data.copy()),
|
|
key=lambda ip: ip[0]
|
|
)
|
|
)
|
|
)
|
|
|
|
data['loaded_pid'] = loaded_save
|
|
|
|
loaded.add(pid)
|
|
new.append(profile)
|
|
|
|
data["scenario"] = scenario.parent
|
|
new += cls._db_load(execute, data)
|
|
data["scenario"] = scenario
|
|
|
|
return new
|
|
|
|
def _db_save(self, execute, data=None):
|
|
if not self.must_be_saved():
|
|
return True
|
|
|
|
ok = True
|
|
ind = data["ind"]
|
|
|
|
sl = self._sl.pamhyr_id if self._sl is not None else -1
|
|
|
|
execute(
|
|
"INSERT OR REPLACE INTO " +
|
|
"geometry_profileXYZ(pamhyr_id, deleted, ind, name, reach, " +
|
|
"rk, num, code1, code2, sl, scenario) " +
|
|
"VALUES (" +
|
|
f"{self.pamhyr_id}, {self._db_format(self.is_deleted())}, " +
|
|
f"{ind}, '{self._db_format(self._name)}', " +
|
|
f"{self.reach.pamhyr_id}, {self.rk}, {self.num}, " +
|
|
f"{self.code1}, {self.code1}, {sl}, {self._status.scenario_id}" +
|
|
")"
|
|
)
|
|
|
|
data["profile"] = self
|
|
execute(
|
|
"DELETE FROM geometry_pointXYZ " +
|
|
f"WHERE profile = {self.pamhyr_id} " +
|
|
f"AND scenario = {self._status.scenario_id}"
|
|
)
|
|
|
|
ind = 0
|
|
for point in self._points:
|
|
data["ind"] = ind
|
|
ok &= point._db_save(execute, data)
|
|
ind += 1
|
|
|
|
return ok
|
|
|
|
def _data_traversal(self,
|
|
predicate=lambda obj, data: True,
|
|
modifier=lambda obj, data: None,
|
|
data={}):
|
|
if predicate(self, data):
|
|
modifier(self, data)
|
|
|
|
for p in self._points:
|
|
p._data_traversal(predicate, modifier, data)
|
|
|
|
@classmethod
|
|
def from_data(cls, header, data):
|
|
profile = None
|
|
try:
|
|
if len(header) == 0:
|
|
name = data[0]
|
|
rk = data[1]
|
|
reach = data[2]
|
|
status = data[3]
|
|
|
|
profile = cls(
|
|
id=-1,
|
|
name=name,
|
|
rk=rk,
|
|
reach=reach,
|
|
status=status
|
|
)
|
|
else:
|
|
valid_header = {'name', 'reach', 'rk', 'status'}
|
|
d = {}
|
|
for i, v in enumerate(data):
|
|
h = header[i].strip().lower().split(' ')[0]
|
|
if h in valid_header:
|
|
d[h] = v
|
|
|
|
profile = cls(**d)
|
|
except Exception as e:
|
|
logger.error(e)
|
|
raise ClipboardFormatError(header, data)
|
|
|
|
return profile
|
|
|
|
def copy(self):
|
|
new_p = ProfileXYZ(
|
|
id=-1,
|
|
name=self.name,
|
|
rk=self.rk,
|
|
reach=self.reach,
|
|
status=self._status
|
|
)
|
|
if self.is_deleted():
|
|
new_p.set_as_deleted()
|
|
|
|
new_p._sl = self.sl
|
|
|
|
for point in self._points:
|
|
p = point.copy()
|
|
print(p)
|
|
new_p._points.append(p)
|
|
|
|
new_p.modified()
|
|
return new_p
|
|
|
|
def cloned_for(self, new_reach):
|
|
new_p = ProfileXYZ(
|
|
id=-1,
|
|
name=self.name,
|
|
rk=self.rk,
|
|
reach=new_reach,
|
|
status=self._status
|
|
)
|
|
if self.is_deleted():
|
|
new_p.set_as_deleted()
|
|
|
|
new_p._sl = self.sl
|
|
|
|
for point in self._points:
|
|
p = point.cloned_for(new_p)
|
|
new_p._points.append(p)
|
|
|
|
new_p.modified()
|
|
return new_p
|
|
|
|
def point_from_data(self, header, data):
|
|
def float_format(s: str):
|
|
return float(
|
|
s.replace(",", ".")
|
|
)
|
|
|
|
point = None
|
|
try:
|
|
if len(header) == 0:
|
|
x = float_format(data[0])
|
|
y = float_format(data[1])
|
|
z = float_format(data[2])
|
|
name = data[3] if len(data) == 4 else ""
|
|
|
|
point = PointXYZ(
|
|
x, y, z, name, profile=self, status=self._status
|
|
)
|
|
else:
|
|
valid_header = {'name', 'x', 'y', 'z'}
|
|
d = {"status": self._status, "profile": self}
|
|
for i, v in enumerate(data):
|
|
h = header[i].strip().lower().split(' ')[0]
|
|
if h in valid_header:
|
|
d[h] = v
|
|
|
|
point = PointXYZ(**d)
|
|
except Exception as e:
|
|
raise ClipboardFormatError(header=header, data=data)
|
|
|
|
return point
|
|
|
|
def display_name(self):
|
|
name = ""
|
|
if self.name != "":
|
|
name += f"{self.name} "
|
|
|
|
name += f"({self.rk})"
|
|
|
|
return name
|
|
|
|
def modified(self):
|
|
super(ProfileXYZ, self).modified()
|
|
|
|
self.tab_up_to_date = False
|
|
self.station_up_to_date = False
|
|
|
|
def x(self):
|
|
return [point.x for point in self.points]
|
|
|
|
def y(self):
|
|
return [point.y for point in self.points]
|
|
|
|
def z(self):
|
|
return [point.z for point in self.points]
|
|
|
|
def names(self):
|
|
return [point.name for point in self.points]
|
|
|
|
def x_max(self):
|
|
return max(self.filter_isnan(self.x()))
|
|
|
|
def x_min(self):
|
|
return min(self.filter_isnan(self.x()))
|
|
|
|
def y_max(self):
|
|
return max(self.filter_isnan(self.y()))
|
|
|
|
def y_min(self):
|
|
return min(self.filter_isnan(self.y()))
|
|
|
|
def z_max(self):
|
|
return max(self.filter_isnan(self.z()))
|
|
|
|
def z_min(self):
|
|
return min(self.filter_isnan(self.z()))
|
|
|
|
def import_points(self, list_points: list):
|
|
"""Import a list of points to profile
|
|
|
|
Args:
|
|
list_points: Liste of PointXYZ
|
|
|
|
Returns:
|
|
Nothing.
|
|
"""
|
|
for point in list_points:
|
|
named_args = {
|
|
"x": point[0],
|
|
"y": point[1],
|
|
"z": point[2],
|
|
"name": point[3],
|
|
}
|
|
|
|
pt = PointXYZ(
|
|
id=-1, **named_args, profile=self,
|
|
status=self._status
|
|
)
|
|
self._points.append(pt)
|
|
|
|
self.modified()
|
|
|
|
def get_point_i(self, index: int) -> PointXYZ:
|
|
"""Get point at index.
|
|
|
|
Args:
|
|
index: Index of point.
|
|
|
|
Returns:
|
|
The point.
|
|
"""
|
|
try:
|
|
return self.points[index]
|
|
except IndexError:
|
|
raise IndexError(f"Invalid point index: {index}")
|
|
|
|
def get_point_by_name(self, name: str) -> PointXYZ:
|
|
"""Get point by name.
|
|
|
|
Args:
|
|
name: Point name.
|
|
|
|
Returns:
|
|
The point.
|
|
"""
|
|
try:
|
|
n_name = name.lower().strip()
|
|
return next(
|
|
filter(
|
|
lambda p: p.name.lower().strip() == n_name,
|
|
self.points
|
|
)
|
|
)
|
|
except Exception as e:
|
|
logger.debug(f"{e}")
|
|
raise IndexError(
|
|
f"Invalid point name: {name} " +
|
|
f"for profile ({self.id}) rk = {self.rk}"
|
|
)
|
|
|
|
def has_standard_named_points(self):
|
|
l, r = reduce(
|
|
lambda acc, n: (
|
|
(acc[0] | (n == "rg")),
|
|
(acc[1] | (n == "rd"))
|
|
),
|
|
map(lambda p: p.name.lower().strip(),
|
|
self.points),
|
|
(False, False)
|
|
)
|
|
|
|
return l & r
|
|
|
|
def add(self):
|
|
"""Add a new PointXYZ to profile.
|
|
|
|
Returns:
|
|
Nothing.
|
|
"""
|
|
point_xyz = PointXYZ(
|
|
x=0., y=0., z=0., profile=self,
|
|
status=self._status
|
|
)
|
|
self.points.append(point_xyz)
|
|
|
|
self.modified()
|
|
|
|
def insert(self, index: int):
|
|
"""Insert a new point at index.
|
|
|
|
Args:
|
|
index: The index of new profile.
|
|
|
|
Returns:
|
|
The new point.
|
|
"""
|
|
x, y, z = (0., 0., 0.)
|
|
|
|
if 0 < len(self._points) >= index:
|
|
x, y, z = self._points[index - 1].get_coordinate()
|
|
|
|
point = PointXYZ(x=x, y=y, z=z, profile=self, status=self._status)
|
|
self._points.insert(index, point)
|
|
|
|
self.modified()
|
|
|
|
return point
|
|
|
|
def filter_isnan(self, lst):
|
|
"""Returns the input list without 'nan' element
|
|
|
|
Args:
|
|
lst: The list to filter
|
|
|
|
Returns:
|
|
The list without 'nan'
|
|
"""
|
|
return [x for x in lst if not np.isnan(x)]
|
|
|
|
def speed(self, q, z):
|
|
area = self.wet_area(z)
|
|
|
|
if area == 0:
|
|
return 0
|
|
|
|
return q / area
|
|
|
|
def width_approximation(self):
|
|
if self.has_standard_named_points():
|
|
rg = self.get_point_by_name("rg")
|
|
rd = self.get_point_by_name("rd")
|
|
else:
|
|
rg = self.points[0]
|
|
rd = self.points[-1]
|
|
|
|
return abs(rg.dist(rd))
|
|
|
|
def compute_wet_width(self, z):
|
|
start, end = self.get_all_water_limits_ac(z)
|
|
|
|
if len(start) == 0:
|
|
return 0
|
|
|
|
length = 0.0
|
|
for s, e in zip(start, end):
|
|
length += abs(s - e)
|
|
return length
|
|
|
|
def wet_width(self, z):
|
|
if self.tab_up_to_date:
|
|
if z > self.tab.z[-1]:
|
|
return self.tab.L[-1]
|
|
elif z < self.tab.z[0]:
|
|
return 0.0
|
|
else:
|
|
return np.interp(z, self.tab.z, self.tab.L)
|
|
else:
|
|
return self.compute_wet_width(z)
|
|
|
|
def wet_perimeter(self, z):
|
|
lines = self.wet_lines(z)
|
|
|
|
if lines is None:
|
|
return 0
|
|
|
|
length = 0.0
|
|
for line in lines:
|
|
length += line.length
|
|
return length
|
|
|
|
def compute_wet_area(self, z):
|
|
area = 0.0
|
|
if len(self.tab.L) > 0:
|
|
if z < self.tab.z[0]:
|
|
return 0.0
|
|
i = np.searchsorted([z], self.tab.z, side='right')[0]
|
|
for j in range(i-1):
|
|
area += (self.tab.L[j] + self.tab.L[j+1]) * (
|
|
self.tab.z[j+1] - self.tab.z[j]) / 2.0
|
|
area += (self.tab.L[i-1] + self.wet_width(z))
|
|
else:
|
|
lines = self.wet_lines(z)
|
|
|
|
if lines is None:
|
|
return 0.0
|
|
for line in lines:
|
|
if len(line.coords) > 2:
|
|
poly = geometry.Polygon(line)
|
|
area += poly.area
|
|
return area
|
|
|
|
def wet_area(self, z):
|
|
if self.tab_up_to_date:
|
|
if z > self.tab.z[-1]:
|
|
return self.tab.A[-1] + self.tab.L[-1] * (z - self.tab.z[-1])
|
|
elif z < self.tab.z[0]:
|
|
return 0.0
|
|
else:
|
|
return np.interp(z, self.tab.z, self.tab.A)
|
|
else:
|
|
lines = self.wet_lines(z)
|
|
|
|
if lines is None:
|
|
return 0.0
|
|
|
|
area = 0.0
|
|
for line in lines:
|
|
if len(line.coords) > 2:
|
|
poly = geometry.Polygon(line)
|
|
area += poly.area
|
|
return area
|
|
|
|
def wet_radius(self, z):
|
|
p = self.wet_perimeter(z)
|
|
a = self.wet_area(z)
|
|
|
|
if p == 0:
|
|
return 0
|
|
|
|
return a/p
|
|
|
|
def wet_line(self, z):
|
|
points = self.wet_points(z)
|
|
if len(points) < 3:
|
|
return None
|
|
|
|
zz = map(lambda p: p.z, points)
|
|
station = self._get_station(points)
|
|
|
|
line = geometry.LineString(list(zip(station, zz)))
|
|
return line
|
|
|
|
def wet_lines(self, z):
|
|
points = self._points
|
|
if len(points) < 3:
|
|
return None
|
|
|
|
lines = []
|
|
|
|
zz = list(map(lambda p: p.z, points))
|
|
station = self._get_station(points)
|
|
|
|
line = []
|
|
for i in range(self.number_points-1):
|
|
|
|
if zz[i] >= z and zz[i+1] < z:
|
|
y = np.interp(
|
|
z,
|
|
[zz[i+1], zz[i]],
|
|
[station[i+1], station[i]]
|
|
)
|
|
line.append([y, z])
|
|
|
|
if zz[i] < z:
|
|
line.append([station[i], zz[i]])
|
|
|
|
if zz[i] <= z and zz[i+1] >= z:
|
|
y = np.interp(
|
|
z,
|
|
[zz[i], zz[i+1]],
|
|
[station[i], station[i+1]]
|
|
)
|
|
line.append([y, z])
|
|
if len(line) > 2:
|
|
lines.append(geometry.LineString(line))
|
|
line = []
|
|
|
|
if zz[self.number_points-1] < z:
|
|
line.append([station[self.number_points-1], z])
|
|
if len(line) > 2:
|
|
lines.append(geometry.LineString(line))
|
|
line = []
|
|
|
|
return lines
|
|
|
|
def max_water_depth(self, z):
|
|
return z - self.z_min()
|
|
|
|
def mean_water_depth(self, z):
|
|
a = self.wet_area(z)
|
|
w = self.wet_width(z)
|
|
|
|
if w == 0:
|
|
return 0
|
|
|
|
return a/w
|
|
|
|
def wet_polygon(self, z):
|
|
points = self.wet_points(z)
|
|
if len(points) < 3:
|
|
return None
|
|
|
|
zz = map(lambda p: p.z, points)
|
|
station = self._get_station(points)
|
|
|
|
poly = geometry.Polygon(list(zip(station, zz)))
|
|
return poly
|
|
|
|
def wet_points(self, z):
|
|
left, right = self.get_water_limits(z)
|
|
|
|
points = list(filter(lambda p: p.z < z, self._points))
|
|
points = [left] + points + [right]
|
|
points = sorted(points, key=lambda p: p.x)
|
|
|
|
return points
|
|
|
|
def get_nb_wet_areas(self, z):
|
|
|
|
n_zones = 0
|
|
points = self._points
|
|
if points[0].z <= z:
|
|
n_zones += 1
|
|
|
|
for i in range(self.number_points-1):
|
|
if points[i].z > z and points[i+1].z <= z:
|
|
n_zones += 1
|
|
|
|
return n_zones
|
|
|
|
def get_all_water_limits_ac(self, z):
|
|
"""
|
|
Determine all water limits for z elevation.
|
|
"""
|
|
|
|
points = self._points
|
|
if len(points) < 3:
|
|
return None
|
|
|
|
zz = list(map(lambda p: p.z, points))
|
|
station = self.get_station()
|
|
|
|
start = []
|
|
if points[0].z <= z:
|
|
start.append(station[0])
|
|
|
|
for i in range(self.number_points-1):
|
|
if zz[i] > z and zz[i+1] <= z:
|
|
fact = (z - zz[i]) / (zz[i+1] - zz[i])
|
|
y = station[i] + fact * (station[i+1] - station[i])
|
|
start.append(y)
|
|
|
|
end = []
|
|
if points[-1].z <= z:
|
|
end.append(station[-1])
|
|
|
|
for i in reversed(range(self.number_points-1)):
|
|
if zz[i] <= z and zz[i+1] > z:
|
|
fact = (z - zz[i]) / (zz[i+1] - zz[i])
|
|
y = station[i] + fact * (station[i+1] - station[i])
|
|
end.append(y)
|
|
|
|
if len(start) != len(end):
|
|
logger.error(f"ERROR in get_all_water_limits_ac")
|
|
return [], []
|
|
|
|
return start, list(reversed(end))
|
|
|
|
def get_water_limits(self, z):
|
|
"""
|
|
Determine left and right limits of water elevation.
|
|
"""
|
|
|
|
# Get the index of first point with elevation lesser than water
|
|
# elevation (for the right and left river side)
|
|
i_left = -1
|
|
i_right = -1
|
|
|
|
for i in range(self.number_points):
|
|
if self.point(i).z <= z:
|
|
i_left = i
|
|
break
|
|
|
|
for i in reversed(range(self.number_points)):
|
|
if self.point(i).z <= z:
|
|
i_right = i
|
|
break
|
|
|
|
# Interpolate points at river left side
|
|
if (i_left > 0):
|
|
if abs(self.point(i_left - 1).z - self.point(i_left).z) < 1e-20:
|
|
pt_left = self.point(i_left)
|
|
else:
|
|
fact = (z - self.point(i_left).z) / (self.point(i_left - 1).z
|
|
- self.point(i_left).z)
|
|
x = self.point(i_left).x + fact * (self.point(i_left - 1).x
|
|
- self.point(i_left).x)
|
|
y = self.point(i_left).y + fact * (self.point(i_left - 1).y
|
|
- self.point(i_left).y)
|
|
pt_left = PointXYZ(x=x, y=y, z=z, name="wl_left")
|
|
else:
|
|
pt_left = self.point(0)
|
|
|
|
# Interpolate points at river right side
|
|
if (i_right < self.number_points - 1):
|
|
if abs(self.point(i_right + 1).z - self.point(i_right).z) < 1e-20:
|
|
pt_right = self.point(i_right)
|
|
else:
|
|
fact = (z - self.point(i_right).z) / \
|
|
(self.point(i_right + 1).z - self.point(i_right).z)
|
|
x = self.point(i_right).x + fact * \
|
|
(self.point(i_right + 1).x - self.point(i_right).x)
|
|
y = self.point(i_right).y + fact * \
|
|
(self.point(i_right + 1).y - self.point(i_right).y)
|
|
pt_right = PointXYZ(x=x, y=y, z=z, name="wl_right")
|
|
else:
|
|
pt_right = self.point(self.number_points - 1)
|
|
|
|
return pt_left, pt_right
|
|
|
|
def compute_tabulation(self):
|
|
sorted_points = sorted(self._points, key=lambda p: p.z)
|
|
self.tab.z = np.array([p.z for p in sorted_points], np.float64)
|
|
self.tab.L = np.zeros(len(self.tab.z), np.float64)
|
|
self.tab.A = np.zeros(len(self.tab.z), np.float64)
|
|
for i in range(1, len(self.tab.z)):
|
|
self.tab.L[i] = self.compute_wet_width(self.tab.z[i])
|
|
dx = (self.tab.L[i] + self.tab.L[i-1])/2
|
|
dz = self.tab.z[i] - self.tab.z[i-1]
|
|
self.tab.A[i] = self.tab.A[i-1] + dz * dx
|
|
|
|
self.tab_up_to_date = True
|
|
|
|
def get_station(self):
|
|
"""Projection of the points of the profile on a plane.
|
|
|
|
Args:
|
|
self: The profile
|
|
|
|
Returns:
|
|
Projection of the points of the profile on a plane.
|
|
"""
|
|
if self.nb_points < 2:
|
|
return [0.0]
|
|
else:
|
|
if self.station_up_to_date:
|
|
return self._station
|
|
else:
|
|
self._station = self._get_station(self.points)
|
|
self.station_up_to_date = True
|
|
return self._station
|
|
|
|
@timer
|
|
def _get_station(self, points):
|
|
first_named_point = None
|
|
index_first_named_point = None
|
|
last_named_point = None
|
|
|
|
first_point_not_nan = self._first_point_not_nan(points)
|
|
last_point_not_nan = self._last_point_not_nan(points)
|
|
|
|
for index, point in enumerate(points):
|
|
if point.is_named():
|
|
index_first_named_point = index
|
|
first_named_point = point
|
|
break
|
|
|
|
for point in reversed(points):
|
|
if point.is_named():
|
|
last_named_point = point
|
|
break
|
|
|
|
station = []
|
|
constant = 0.0
|
|
|
|
if (first_named_point is not None
|
|
and last_named_point is not None):
|
|
if (first_named_point != last_named_point
|
|
and first_named_point.x != last_named_point.x):
|
|
vector = Vector1d(first_named_point, last_named_point)
|
|
norm_dir_vec = vector.normalized_direction_vector()
|
|
else:
|
|
vector = Vector1d(first_point_not_nan, last_point_not_nan)
|
|
norm_dir_vec = vector.normalized_direction_vector()
|
|
|
|
for point in points:
|
|
xi = point.x - first_named_point.x
|
|
yi = point.y - first_named_point.y
|
|
station_i = (norm_dir_vec[0] * xi +
|
|
norm_dir_vec[1] * yi)
|
|
station.append(station_i)
|
|
|
|
constant = station[index_first_named_point]
|
|
elif first_named_point is None:
|
|
vector = Vector1d(first_point_not_nan, last_point_not_nan)
|
|
norm_dir_vec = vector.normalized_direction_vector()
|
|
|
|
for point in points:
|
|
xi = point.x - first_point_not_nan.x
|
|
yi = point.y - first_point_not_nan.y
|
|
station_i = (norm_dir_vec[0] * xi +
|
|
norm_dir_vec[1] * yi)
|
|
station.append(station_i)
|
|
|
|
z_min = self.z_min()
|
|
index_profile_z_min = next(
|
|
filter(
|
|
lambda z: z[1] == z_min,
|
|
enumerate(self.z())
|
|
)
|
|
)
|
|
constant = station[index_profile_z_min[0]]
|
|
|
|
return list(map(lambda s: s - constant, station))
|
|
|
|
def _first_point_not_nan(self, points):
|
|
first_point = None
|
|
|
|
for point in points:
|
|
if not point.is_nan():
|
|
first_point = point
|
|
break
|
|
|
|
return first_point
|
|
|
|
def _last_point_not_nan(self, points):
|
|
last_point = None
|
|
|
|
for point in reversed(points):
|
|
if not point.is_nan():
|
|
last_point = point
|
|
break
|
|
|
|
return last_point
|
|
|
|
def purge(self, np_purge):
|
|
"""
|
|
Remove points to keep at most np_purge points.
|
|
"""
|
|
if self.nb_points <= np_purge:
|
|
return
|
|
|
|
nb_named = 0 # we consider the first and last point as named
|
|
area = []
|
|
|
|
for i, p in enumerate(self.points):
|
|
if p.is_named() or i == 0 or i == self.nb_points - 1:
|
|
area.append((9999999.999, p))
|
|
nb_named += 1
|
|
else:
|
|
area.append((
|
|
PointXYZ.areatriangle3d(
|
|
self.point(i-1),
|
|
p,
|
|
self.point(i+1)
|
|
),
|
|
p
|
|
))
|
|
|
|
while self.nb_points > max(np_purge, nb_named):
|
|
ind, (min_area, p) = min(
|
|
enumerate(area),
|
|
key=lambda t: t[1][0]
|
|
)
|
|
|
|
p.set_as_deleted()
|
|
area.pop(ind)
|
|
|
|
for i in [ind-1, ind]:
|
|
p = self.point(i)
|
|
|
|
if p.is_named() or i == 0 or i == self.nb_points - 1:
|
|
area[i] = (9999999.999, p)
|
|
else:
|
|
area[i] = (
|
|
PointXYZ.areatriangle3d(
|
|
self.point(i-1), p, self.point(i+1)
|
|
),
|
|
p
|
|
)
|
|
|
|
def shift(self, x, y, z):
|
|
for p in self._points:
|
|
p.x = p.x + x
|
|
p.y = p.y + y
|
|
p.z = p.z + z
|
|
|
|
self.modified()
|
|
|
|
def modified(self):
|
|
self.tab_up_to_date = False
|
|
self.station_up_to_date = False
|
|
|
|
def add_npoints(self, npoints):
|
|
# add npoints in a profile
|
|
for k in range(npoints):
|
|
disti = 0.0
|
|
i = 1
|
|
for j in range(len(self.points)-1):
|
|
distj = self.point(j).dist(self.point(j+1))
|
|
if distj > disti:
|
|
disti = distj
|
|
i = j
|
|
self.insert_point(i, self.point(i).copy())
|
|
self.point(i+1).name = ''
|
|
self.point(i+1).x = 0.5 * self.point(i).x + 0.5 * self.point(i+2).x
|
|
self.point(i+1).y = 0.5 * self.point(i).y + 0.5 * self.point(i+2).y
|
|
self.point(i+1).z = 0.5 * self.point(i).z + 0.5 * self.point(i+2).z
|
|
|
|
def copy(self):
|
|
p = ProfileXYZ(id=self.id,
|
|
name=self.name,
|
|
rk=self.rk,
|
|
reach=self.reach,
|
|
num=self.num,
|
|
status=self._status)
|
|
for i, k in enumerate(self.points):
|
|
p.insert_point(i, k.copy())
|
|
|
|
return p
|
|
|
|
def purge_aligned_points(self):
|
|
|
|
align = True
|
|
while (align):
|
|
align = False
|
|
for i in range(1, self.number_points - 1):
|
|
d = self.point(i).dist_from_seg(self.point(i-1),
|
|
self.point(i+1))
|
|
if d < 0.00001 and self.point(i).name == "":
|
|
align = True
|
|
self.delete_i([i])
|
|
break
|
|
|
|
def hard_purge_aligned_points(self):
|
|
align = True
|
|
points = self.points
|
|
while (align):
|
|
align = False
|
|
for i in range(1, len(points) - 1):
|
|
d = points[i].dist_from_seg(points[i-1],
|
|
points[i+1])
|
|
if d < 0.00001 and points[i].name == "":
|
|
align = True
|
|
points.pop(i)
|
|
break
|
|
self._points = points
|
|
self.modified()
|