Pamhyr2/src/Model/Geometry/Reach.py

894 lines
24 KiB
Python

# Reach.py -- Pamhyr
# Copyright (C) 2023-2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import os
import logging
import shapefile
import numpy as np
from time import time
from typing import List
from copy import deepcopy
from operator import itemgetter
from functools import reduce
from tools import flatten, timer, trace, logger_exception
from Model.Tools.PamhyrDB import SQLSubModel
from Model.Geometry.Profile import Profile
from Model.Geometry.ProfileXYZ import ProfileXYZ
from Model.Except import FileFormatError, exception_message_box
logger = logging.getLogger()
class Reach(SQLSubModel):
_sub_classes = [
ProfileXYZ,
]
def __init__(self, status=None, parent=None):
self.id = parent.id if parent is not None else 0
self._status = status
self._parent = parent
self._profiles: List[Profile] = []
self._guidelines_is_valid = False
self._guidelines = {}
@classmethod
def _db_create(cls, execute):
return cls._create_submodel(execute)
@classmethod
def _db_update(cls, execute, version):
return cls._update_submodel(execute, version)
@classmethod
def _db_load(cls, execute, data=None):
new = cls(status=data["status"], parent=data["reach"])
new._profiles = ProfileXYZ._db_load(
execute,
data=data.copy()
)
return new
def _db_save(self, execute, data=None):
profiles = self.profiles
# Delete old data
execute(f"DELETE FROM geometry_profileXYZ WHERE reach = {self.id}")
if data is None:
data = {}
ind = 0
for profile in profiles:
data["ind"] = ind
profile._db_save(execute, data)
ind += 1
return True
def profile(self, i):
"""Returns profile at index i
Args:
i: The index of profile
Returns:
The profile at index i
"""
if i < len(self.profiles):
return self.profiles[i]
return None
@property
def name(self):
if self._parent is None:
return ""
return self._parent.name
def _get_profiles_list(self):
# Profiles list generator is type (int, Point) with the first
# element the index of the Point in list
logger.debug(f"Load profiles from reach {self.name}")
return list(
map(
lambda p: p[1],
sorted(
self._profiles,
key=lambda p: p[0]
)
)
)
def __len__(self):
if not isinstance(self._profiles, list):
self._profiles = self._get_profiles_list()
return len(self._profiles)
@property
def profiles(self):
if not isinstance(self._profiles, list):
self._profiles = self._get_profiles_list()
return self._profiles
@profiles.setter
def profiles(self, profiles):
self._profiles = profiles
def get_profiles_from_rk(self, rk):
return list(
filter(
lambda p: p.rk == rk, self.profiles
)
)
@property
def number_profiles(self):
"""
Returns:
Number of profiles
"""
return len(self.profiles)
def get_geometry(self) -> List[Profile]:
"""
Returns:
The profiles list.
"""
return self.profiles
def _update_profile_numbers(self):
"""Update profiles index
Returns:
Nothing.
"""
for ind, profile in enumerate(self.get_geometry()):
profile.num = ind + 1
def insert(self, index: int):
"""Insert new profile in list
Args:
index: The position of the new profile.
Returns:
The new profile.
"""
profile = ProfileXYZ(reach=self, status=self._status)
self.profiles.insert(index, profile)
self._update_profile_numbers()
self._status.modified()
return profile
def insert_profile(self, index: int, profile: Profile):
"""Insert new profile in list
Args:
index: The position of the new profile.
Returns:
Nothing.
"""
self.profiles.insert(index, profile)
self._update_profile_numbers()
self._status.modified()
def delete(self, indexes):
"""Delete some elements in profile list
Args:
indexes: The list of index to delete
Returns:
Nothing.
"""
profiles = set(
map(
lambda e: e[1],
filter(
lambda e: e[0] in indexes,
enumerate(self.profiles)
)
)
)
self._profiles = list(
filter(
lambda p: p not in profiles,
self.profiles
)
)
self._update_profile_numbers()
self._status.modified()
def delete_profiles(self, profiles):
"""Delete some elements in profile list
Args:
profiles: The list of profile to delete
Returns:
Nothing.
"""
self._profiles = list(
filter(
lambda p: p not in profiles,
self.profiles
)
)
self._update_profile_numbers()
self._status.modified()
def purge(self):
self._profiles = []
self._update_profile_numbers()
self._status.modified()
def move_up_profile(self, index: int):
if index < len(self.profiles):
next = index - 1
p = self.profiles
p[index], p[next] = p[next], p[index]
self._status.modified()
def move_down_profile(self, index: int):
if index >= 0:
prev = index + 1
p = self.profiles
p[index], p[prev] = p[prev], p[index]
self._status.modified()
def get_x(self):
return list(
map(
lambda profile: profile.x(),
filter(
lambda profile: len(profile) > 0,
self.profiles
)
)
)
def get_y(self):
return list(
map(
lambda profile: profile.y(),
filter(
lambda profile: len(profile) > 0,
self.profiles
)
)
)
def get_z(self):
return list(
map(
lambda profile: profile.z(),
filter(
lambda profile: len(profile) > 0,
self.profiles
)
)
)
def get_z_min(self):
"""List of z min for each profile
Returns:
List of z min for each profile
"""
return list(
map(
lambda profile: profile.z_min(),
filter(
lambda profile: len(profile) > 0,
self.profiles
)
)
)
def get_z_max(self):
"""List of z max for each profile
Returns:
List of z max for each profile
"""
return list(
map(
lambda profile: profile.z_max(),
filter(
lambda profile: len(profile) > 0,
self.profiles
)
)
)
def get_rk(self):
"""List of profiles rk
Returns:
List of profiles rk
"""
return [profile.rk for profile in self.profiles]
def get_rk_complete_profiles(self):
return list(
map(
lambda profile: profile.rk,
filter(
lambda profile: len(profile) > 0,
self.profiles
)
)
)
def get_rk_min(self):
if len(self.get_rk()) > 0:
return min(self.get_rk())
else:
return 0.0
def get_rk_max(self):
if len(self.get_rk()) > 0:
return max(self.get_rk())
else:
return 0.0
def inter_profiles_rk(self):
profiles = sorted(self.profiles, key=lambda p: p.rk)
if len(profiles) == 0:
return []
first = profiles[0]
last = profiles[-1]
res_rk = [first.rk]
profiles = iter(profiles)
previous = next(profiles)
for profile in profiles:
prev = previous.rk
curr = profile.rk
diff = abs(previous.rk - profile.rk)
new = previous.rk + (diff / 2.0)
res_rk.append(new)
previous = profile
res_rk.append(last.rk)
return res_rk
# Sediment Layers
def get_sl(self):
"""Get sediment layer height of profile
Get sediment layer of profile (without spesific point sl)
Returns:
List of sediment layers height
"""
res = []
psl = [profile.sl for profile in self.profiles]
# Compute max number of layers
sl_max = 0
for sl in psl:
n = 0 if sl is None else len(sl)
sl_max = max(n, sl_max)
# Create list of height for each sl and each layer
for i in range(0, sl_max):
cur = []
# Compute new layer line for each sl
for sl in psl:
if sl is not None and i < len(sl):
cur.append(sl.get(i).height)
else:
cur.append(0)
# Add layer line to result
res.append(cur)
return res
# Guidelines
@timer
def _compute_guidelines_cache(self, guide_set, named_points,
complete, incomplete):
# Reset guide lines cache
self._guidelines = {}
self._complete_guidelines = complete.copy()
self._incomplete_guidelines = incomplete.copy()
self._guidelines_is_valid = len(incomplete) == 0
# Make a list of point for each guideline
for guide in guide_set:
self._guidelines[guide] = flatten(
map(
lambda lst: list(
# Filter point with name (we assume we have
# only one point by profile)
filter(
lambda p: p.name == guide,
lst
)
),
named_points
)
)
@timer
def compute_guidelines(self):
"""Compute reach guidelines
Returns:
Tuple of complete and incomplete guidelines name.
"""
# Get all point contained into a guideline
named_points = [profile.named_points() for profile in self.profiles]
points_name = list(
map(
lambda lst: list(map(lambda p: p.name, lst)),
named_points
)
)
# Get all guide line name
guide_set = reduce(
lambda acc, x: set(x).union(acc),
points_name,
set()
)
# Get incomplete guideline
incomplete = set(
reduce(
lambda acc, h: acc + h,
map(
lambda lst: list(
set(lst).symmetric_difference(guide_set)
),
points_name
),
[]
)
)
complete = guide_set - incomplete
# Compute guideline and put data in cache
self._compute_guidelines_cache(guide_set, named_points,
complete, incomplete)
self._status.modified()
return (complete, incomplete)
def _map_guidelines_points(self, func, full=False):
if len(self._guidelines) == 0:
_ = self.compute_guidelines()
return list(
# Map for each guideline
map(
lambda k: list(
# Apply function FUNC on each point of guideline
map(
func,
self._guidelines[k],
)
),
# Get only guide lines if FULL is False
self._guidelines if full else filter(
lambda x: x in self._complete_guidelines,
self._guidelines
)
)
)
def _complete_filter(self, gl):
return gl in self._complete_guidelines
def get_guidelines_x(self):
return self._map_guidelines_points(lambda p: p.x)
def get_guidelines_y(self):
return self._map_guidelines_points(lambda p: p.y)
def get_guidelines_z(self):
return self._map_guidelines_points(lambda p: p.z)
# Sort
@timer
def sort(self, is_reversed: bool = False):
self.profiles = sorted(
self.profiles,
key=lambda profile: profile.rk,
reverse=is_reversed
)
self._status.modified()
@timer
def sort_with_indexes(self, indexes: list):
if len(self.profiles) != len(indexes):
logger.critical("Indexes list do not correspond to profile list")
self.profiles = list(
map(
lambda x: x[1],
sorted(
enumerate(self.profiles),
key=lambda x: indexes[x[0]]
)
)
)
self._status.modified()
# Import/Export
def import_geometry(self, file_path_name: str):
ext = file_path_name.split(".")
if len(ext) > 0:
ext = ext[-1].lower()
if ext in ['st', 'm']:
return self.import_geometry_st(file_path_name)
if ext in ['shp']:
return self.import_geometry_shp(file_path_name)
raise FileFormatError(
file_path_name,
f"Geometry file extention ('.{ext}') unkown"
)
@timer
def import_geometry_shp(self, file_path_name: str):
sf = shapefile.Reader(
os.path.abspath(file_path_name)
)
if sf.shapeType != shapefile.POLYLINEZ:
raise FileFormatError(
file_path_name,
"Shapefile is not a POLYLINEZ"
)
profiles = []
for i in range(len(sf)):
s_profile = sf.shape(i)
z = s_profile.z
ind = 0
points = []
for point in s_profile.points:
points.append([
point[0],
point[1],
z[ind]
])
ind += 1
prof = ProfileXYZ(
reach=self, status=self._status
)
prof.import_points(points)
profiles.append(prof)
self.profiles = profiles + self.profiles
self._update_profile_numbers()
self._recompute_rk()
return profiles
@timer
def import_geometry_st(self, file_path_name: str):
"""Import a geometry from file (.ST or .st)
Args:
file_path_name: The absolute path of geometry file (.ST or .st)
to import.
Returns:
Nothing.
"""
imported_profiles = []
try:
list_profile, list_header = self.read_file_st(str(file_path_name))
profile_header = ["num", "code1", "code2",
"nb_point", "rk", "name"]
if list_profile and list_header:
for ind, profile in enumerate(list_profile):
d = {}
for i, data in enumerate(list_header[ind]):
d[profile_header[i]] = data
prof = ProfileXYZ(
**d, reach=self, status=self._status
)
prof.import_points(profile)
imported_profiles.append(prof)
self._status.modified()
except FileNotFoundError as e:
logger.error(e)
exception_message_box(e)
except FileFormatError as e:
logger.error(e)
e.alert()
finally:
self.profiles = imported_profiles + self.profiles
self._update_profile_numbers()
return imported_profiles
@timer
def read_file_st(self, filename):
"""Read the ST file
Returns:
List of profiles and list of headers.
"""
t0 = time()
line_is_header = True
list_point_profile = []
list_profile = []
list_header = []
stop_code = "999.999"
logger.debug(f"Import geometry from {filename}")
with open(filename, encoding="utf-8") as file_st:
for line in file_st:
if not (line.startswith("#") or line.startswith("*")):
line = line.split()
if line_is_header:
if len(line) >= 6:
list_header.append(line[:6])
elif len(line) == 5:
line.append("")
list_header.append(line)
line_is_header = False
else:
# Read until "999.9990 999.9990" as found
if len(line) == 3:
x, y, z = line
if stop_code in x and stop_code in y:
line_is_header = True
list_profile.append(list_point_profile)
list_point_profile = []
else:
line.append("")
list_point_profile.append(line)
elif len(line) == 4:
x, y, z, ld = line
if stop_code in x and stop_code in y:
list_profile.append(list_point_profile)
list_point_profile = []
line_is_header = True
else:
list_point_profile.append(line)
elif len(line) > 4:
x, y, z = line[:3]
if stop_code in x and stop_code in y:
line_is_header = True
list_profile.append(list_point_profile)
list_point_profile = []
else:
line.append("")
list_point_profile.append(line[:3])
else:
pass
return list_profile, list_header
# TODO: Move this function to model reach
def export_reach(self, filename):
with open(f"{filename}", "w") as st:
cnt = 0
for profile in self.profiles:
num = f"{cnt:>6}"
c1 = f"{profile.code1:>6}"
c2 = f"{profile.code2:>6}"
t = f"{len(profile.points):>6}"
rk = f"{profile.rk:>12f}"[0:12]
pname = profile.name
if profile.name == "":
pname = f"p{profile.id:>3}".replace(" ", "0")
st.write(f"{num}{c1}{c2}{t} {rk} {pname}\n")
for point in profile.points:
x = f"{point.x:<12.4f}"[0:12]
y = f"{point.y:<12.4f}"[0:12]
z = f"{point.z:<12.4f}"[0:12]
n = f"{point.name:<3}"
st.write(f"{x} {y} {z} {n}\n")
st.write(" 999.9990 999.9990 999.9990")
st.write("\n")
cnt += 1
def get_incline(self):
if len(self.profiles) == 0:
return 0.0
first = self.profile(0)
last = self.profile(len(self) - 1)
z_first = first.z_min()
rk_first = first.rk
z_last = last.z_min()
rk_last = last.rk
return (
(z_last - z_first)
/
(rk_last - rk_first)
)
def get_incline_mean(self):
profiles = self.profiles
if len(profiles) == 0:
return 0.0
previous = profiles[0]
incline_acc = 0
for profile in profiles[1:]:
z_first = previous.z_min()
rk_first = previous.rk
z_last = profile.z_min()
rk_last = profile.rk
incline_acc += (
(z_last - z_first)
/
(rk_last - rk_first)
)
previous = profile
return (incline_acc / (len(profiles) - 1))
def get_incline_median(self):
profiles = self.profiles
if len(profiles) == 0:
return 0.0
previous = profiles[0]
incline_acc = []
for profile in profiles[1:]:
z_first = previous.z_min()
rk_first = previous.rk
z_last = profile.z_min()
rk_last = profile.rk
incline_acc += [
(z_last - z_first)
/
(rk_last - rk_first)
]
previous = profile
incline_acc.sort()
return incline_acc[round(len(profiles)/2)]
def get_incline_median_mean(self):
profiles = self.profiles
if len(profiles) == 0:
return 0.0
previous = profiles[0]
incline_acc = []
for profile in profiles[1:]:
z_first = previous.z_min()
rk_first = previous.rk
z_last = profile.z_min()
rk_last = profile.rk
incline_acc += [
(z_last - z_first)
/
(rk_last - rk_first)
]
previous = profile
incline_acc.sort()
marge = round(len(incline_acc) * 0.1)
if marge > 0:
incline_set = incline_acc[marge:-marge]
else:
incline_set = incline_acc
logger.debug(f"+{incline_acc}")
logger.debug(f"-{incline_set}")
try:
return (
reduce(
lambda acc, x: acc + x,
incline_set,
0.0
) / (len(incline_set))
)
except Exception as e:
logger_exception(e)
return 0
def _recompute_rk(self, offset=0.0):
self._recompute_rk_no_gl(offset=offset)
def _recompute_rk_no_gl(self, offset=0.0):
profiles = iter(self.profiles)
previous = next(profiles)
previous.rk = offset
for profile in profiles:
prev_points = previous.points
curr_points = profile.points
dist = (
prev_points[0].dist_2d(curr_points[0]) +
prev_points[-1].dist_2d(curr_points[-1])
) / 2.0
profile.rk = previous.rk + dist
previous = profile