Merge branch 'RubarBE' into v0.0.8.

setup.py
Pierre-Antoine Rouby 2024-05-07 14:43:58 +02:00
commit f4561494f6
6 changed files with 627 additions and 93 deletions

View File

@ -209,6 +209,14 @@ class Friction(SQLSubModel):
self._status.modified()
def __contains__(self, kp):
return self.contains_kp(kp)
def contains_kp(self, kp):
return (
self._begin_kp <= kp <= self._end_kp
)
@property
def begin_strickler(self):
return self._begin_strickler

View File

@ -19,6 +19,7 @@
import logging
from tools import timer
from shapely import geometry
from Model.Geometry.Point import Point
from Model.Except import NotImplementedMethodeError
@ -341,3 +342,12 @@ class Profile(object):
# Abstract method for width approximation
def get_water_limits(self, z):
raise NotImplementedMethodeError(self, self.get_water_limits)
def wet_points(self, z):
raise NotImplementedMethodeError(self, self.wet_point)
def wet_perimeter(self, z):
raise NotImplementedMethodeError(self, self.wet_perimeter)
def wet_area(self, z):
raise NotImplementedMethodeError(self, self.wet_area)

View File

@ -22,6 +22,7 @@ from typing import List
from functools import reduce
from tools import timer
from shapely import geometry
from Model.Tools.PamhyrDB import SQLSubModel
from Model.Except import ClipboardFormatError
@ -373,99 +374,13 @@ class ProfileXYZ(Profile, SQLSubModel):
"""
return [x for x in lst if not np.isnan(x)]
def _first_point_not_nan(self):
first_point = self.points[0]
def speed(self, q, z):
area = self.wet_area(z)
for point in self.points:
if not point.is_nan():
first_point = point
break
if area == 0:
return 0
return first_point
def _last_point_not_nan(self):
last_point = self.points[-1]
for point in self.points[::-1]:
if not point.is_nan():
last_point = point
break
return last_point
@timer
def get_station(self) -> np.ndarray:
"""Projection of the points of the profile on a plane.
Args:
profile: The profile
Returns:
Projection of the points of the profile on a plane.
"""
if self.nb_points < 3:
return None
else:
first_named_point = None
index_first_named_point = None
last_named_point = None
first_point_not_nan = self._first_point_not_nan()
last_point_not_nan = self._last_point_not_nan()
for index, point in enumerate(self.points):
if point.point_is_named():
index_first_named_point = index
first_named_point = point
break
for point in reversed(self.points):
if point.point_is_named():
last_named_point = point
break
station = []
constant = 0.0
if (first_named_point is not None and
last_named_point is not None):
if (first_named_point != last_named_point and
first_named_point.x != last_named_point.x):
vector = Vector1d(first_named_point, last_named_point)
norm_dir_vec = vector.normalized_direction_vector()
else:
vector = Vector1d(first_point_not_nan, last_point_not_nan)
norm_dir_vec = vector.normalized_direction_vector()
for point in self.points:
xi = point.x - first_named_point.x
yi = point.y - first_named_point.y
station_i = (norm_dir_vec[0] * xi +
norm_dir_vec[1] * yi)
station.append(station_i)
constant = station[index_first_named_point]
elif first_named_point is None:
vector = Vector1d(first_point_not_nan, last_point_not_nan)
norm_dir_vec = vector.normalized_direction_vector()
for point in self.points:
xi = point.x - first_point_not_nan.x
yi = point.y - first_point_not_nan.y
station_i = (norm_dir_vec[0] * xi +
norm_dir_vec[1] * yi)
station.append(station_i)
z_min = self.z_min()
index_profile_z_min = next(
filter(
lambda z: z[1] == z_min,
enumerate(self.z())
)
)
constant = station[index_profile_z_min[0]]
return list(map(lambda s: s - constant, station))
return q / area
def width_approximation(self):
if self.has_standard_named_points():
@ -477,6 +392,42 @@ class ProfileXYZ(Profile, SQLSubModel):
return abs(rg.dist(rd))
def wet_perimeter(self, z):
poly = self.wet_polygon(z)
if poly is None:
return 0
return poly.length
def wet_area(self, z):
poly = self.wet_polygon(z)
if poly is None:
return 0
return poly.area
def wet_polygon(self, z):
points = self.wet_points(z)
if len(points) < 3:
return None
z = map(lambda p: p.z, points)
station = self._get_station(points)
poly = geometry.Polygon(list(zip(station, z)))
return poly
def wet_points(self, z):
left, right = self.get_water_limits(z)
points = list(filter(lambda p: p.z < z, self._points))
points = [left] + points + [right]
points = sorted(points, key=lambda p: p.x)
return points
def get_water_limits(self, z):
"""
Determine left and right limits of water elevation.
@ -509,7 +460,7 @@ class ProfileXYZ(Profile, SQLSubModel):
[self.point(i_left).z, self.point(i_left - 1).z],
[self.point(i_left).y, self.point(i_left - 1).y]
)
pt_left = PointXYZ(x, y, z)
pt_left = PointXYZ(x, y, z, name="wl_left")
else:
pt_left = self.point(0)
@ -525,8 +476,105 @@ class ProfileXYZ(Profile, SQLSubModel):
[self.point(i_right).z, self.point(i_right + 1).z],
[self.point(i_right).y, self.point(i_right + 1).y]
)
pt_right = PointXYZ(x, y, z)
pt_right = PointXYZ(x, y, z, name="wl_right")
else:
pt_right = self.point(self.number_points - 1)
return pt_left, pt_right
def get_station(self):
"""Projection of the points of the profile on a plane.
Args:
self: The profile
Returns:
Projection of the points of the profile on a plane.
"""
if self.nb_points < 3:
return None
else:
return self._get_station(self.points)
@timer
def _get_station(self, points):
first_named_point = None
index_first_named_point = None
last_named_point = None
first_point_not_nan = self._first_point_not_nan(points)
last_point_not_nan = self._last_point_not_nan(points)
for index, point in enumerate(points):
if point.point_is_named():
index_first_named_point = index
first_named_point = point
break
for point in reversed(points):
if point.point_is_named():
last_named_point = point
break
station = []
constant = 0.0
if (first_named_point is not None and
last_named_point is not None):
if (first_named_point != last_named_point and
first_named_point.x != last_named_point.x):
vector = Vector1d(first_named_point, last_named_point)
norm_dir_vec = vector.normalized_direction_vector()
else:
vector = Vector1d(first_point_not_nan, last_point_not_nan)
norm_dir_vec = vector.normalized_direction_vector()
for point in points:
xi = point.x - first_named_point.x
yi = point.y - first_named_point.y
station_i = (norm_dir_vec[0] * xi +
norm_dir_vec[1] * yi)
station.append(station_i)
constant = station[index_first_named_point]
elif first_named_point is None:
vector = Vector1d(first_point_not_nan, last_point_not_nan)
norm_dir_vec = vector.normalized_direction_vector()
for point in points:
xi = point.x - first_point_not_nan.x
yi = point.y - first_point_not_nan.y
station_i = (norm_dir_vec[0] * xi +
norm_dir_vec[1] * yi)
station.append(station_i)
z_min = self.z_min()
index_profile_z_min = next(
filter(
lambda z: z[1] == z_min,
enumerate(self.z())
)
)
constant = station[index_profile_z_min[0]]
return list(map(lambda s: s - constant, station))
def _first_point_not_nan(self, points):
first_point = None
for point in points:
if not point.is_nan():
first_point = point
break
return first_point
def _last_point_not_nan(self, points):
last_point = None
for point in reversed(points):
if not point.is_nan():
last_point = point
break
return last_point

View File

@ -366,6 +366,30 @@ class Reach(SQLSubModel):
else:
return 0.0
def inter_profiles_kp(self):
profiles = sorted(self.profiles, key=lambda p: p.kp)
first = profiles[0]
last = profiles[-1]
res_kp = [first.kp]
profiles = iter(profiles)
previous = next(profiles)
for profile in profiles:
prev = previous.kp
curr = profile.kp
diff = abs(previous.kp - profile.kp)
new = previous.kp + (diff / 2.0)
res_kp.append(new)
previous = profile
res_kp.append(last.kp)
return res_kp
# Sediment Layers
def get_sl(self):

441
src/Solver/RubarBE.py Normal file
View File

@ -0,0 +1,441 @@
# RubarBE.py -- Pamhyr
# Copyright (C) 2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import os
import logging
import numpy as np
from tools import timer, trace, old_pamhyr_date_to_timestamp
from Solver.CommandLine import CommandLineSolver
from Model.Results.Results import Results
from Model.Results.River.River import River, Reach, Profile
logger = logging.getLogger()
class RubarBE(CommandLineSolver):
_type = "rubarbe"
def __init__(self, name):
super(RubarBE, self).__init__(name)
self._type = "rubarbe"
self._cmd_input = ""
self._cmd_solver = "@path @input -o @output"
self._cmd_output = ""
@classmethod
def default_parameters(cls):
lst = super(RubarBE, cls).default_parameters()
lst += [
("rubarbe_cfl", "0.50000E+00"),
("rubarbe_condam", "1"),
("rubarbe_condav", "3"),
("rubarbe_regime", "0"),
("rubarbe_iodev", "n"),
("rubarbe_iodebord", ""),
("rubarbe_iostockage", ""),
("rubarbe_iopdt", "y"),
("rubarbe_iovis", "n"),
("rubarbe_rep", "n"),
("rubarbe_tinit", "000:00:00:00"),
("rubarbe_tmax", "999:99:99:00"),
("rubarbe_tiopdt", "000:00:00:00"),
("rubarbe_dt", "3000.0"),
("rubarbe_ts", "999:99:99:00"),
("rubarbe_dtsauv", "999:99:99:00"),
("rubarbe_psave", "999:99:99:00"),
("rubarbe_fdeb1", "1"),
("rubarbe_fdeb2", "10"),
("rubarbe_fdeb3", "100"),
("rubarbe_tf_1", "y"),
("rubarbe_tf_2", "y"),
("rubarbe_tf_3", "y"),
("rubarbe_tf_4", "y"),
("rubarbe_tf_5", "y"),
("rubarbe_tf_6", "n"),
("rubarbe_trased", "y"),
("rubarbe_optfpc", "0"),
("rubarbe_ros", "2650.0"),
("rubarbe_dm", "0.1"),
("rubarbe_segma", "1.0"),
# Sediment parameters
("rubarbe_sediment_ros", "2650.0"),
("rubarbe_sediment_por", "0.4"),
("rubarbe_sediment_dcharg", "0.0"),
("rubarbe_sediment_halfa", "1.0"),
("rubarbe_sediment_mult_1", "1.0"),
("rubarbe_sediment_mult_2", ""),
("rubarbe_sediment_mult_3", ""),
("rubarbe_sediment_mult_4", ""),
("rubarbe_sediment_mult_5", ""),
("rubarbe_sediment_visc", "0.047"),
("rubarbe_sediment_opts", "6"),
("rubarbe_sediment_odchar", "0"),
("rubarbe_sediment_unisol", "1"),
("rubarbe_sediment_typdef", "3"),
("rubarbe_sediment_depot", "2"),
("rubarbe_sediment_choixc", "2"),
("rubarbe_sediment_option", "2"),
("rubarbe_sediment_capsol", "1"),
("rubarbe_sediment_bmiu", "0.85"),
("rubarbe_sediment_demix", "0"),
("rubarbe_sediment_defond", "1"),
("rubarbe_sediment_varcons", "1"),
("rubarbe_sediment_dchard", "0.0"),
("rubarbe_sediment_dchars", "0.0"),
]
return lst
@classmethod
def checkers(cls):
lst = [
]
return lst
##########
# Export #
##########
def cmd_args(self, study):
lst = super(RubarBE, self).cmd_args(study)
return lst
def input_param(self):
name = self._study.name
return f"{name}.REP"
def output_param(self):
name = self._study.name
return f"{name}.BIN"
def log_file(self):
name = self._study.name
return f"{name}.TRA"
def export(self, study, repertory, qlog=None):
self._study = study
name = study.name.replace(" ", "_")
self._export_donnee(study, repertory, qlog, name=name)
self._export_ts(study, repertory, qlog, name=name)
self._export_geomac_i(study, repertory, qlog, name=name)
self._export_mail(study, repertory, qlog, name=name)
self._export_tps(study, repertory, qlog, name=name)
self._export_stricklers(study, repertory, qlog, name=name)
def _export_donnee(self, study, repertory, qlog, name="0"):
if qlog is not None:
qlog.put("Export DONNEE file")
with open(
os.path.join(
repertory, f"donnee.{name}"
), "w+"
) as f:
params = filter(
lambda p: "rubarbe_sediment_" not in p.name,
study.river.get_params(self._type).parameters
)
it = iter(params)
line = 0
while line < 29:
param = next(it)
name = param.name
value = param.value
if value != "":
# Value format
if value.count(':') == 3:
value = old_pamhyr_date_to_timestamp(value)
value = f"{value:>12.5e}".upper()
if value.count('.') == 1:
value = f"{float(value):>12.5e}".upper()
if value == "y" or value == "n":
value = "O" if value == "y" else "N"
# Write value
f.write(f"{name:<50}{value}")
# Add values of 'rubarbe_iodebord' and
# 'rubarbe_iostockage'
if name == "rubarbe_iodev":
v2 = next(it).value
v3 = next(it).value
f.write(f"{v2}{v3}")
# New line
f.write(f"\n")
line += 1
def _export_ts(self, study, repertory, qlog, name="0"):
if qlog is not None:
qlog.put("Export TS file")
with open(
os.path.join(
repertory, f"ts.{name}"
), "w+"
) as f:
def float_format(string):
if "." in string:
return f"{float(string):>10.0f}"
return ""
params = filter(
lambda p: "rubarbe_sediment_" in p.name,
study.river.get_params(self.type).parameters
)
it = iter(params)
line = 0
while line < 20:
param = next(it)
name = param.name
value = param.value
if value != "":
# Value format
if value.count('.') == 1:
value = f"{float_format(value)}"
else:
value = f"{value:>10}"
# Write value
f.write(f"{name:<50}{value}")
# Add values of 'rubarbe_iodebord' and
# 'rubarbe_iostockage'
if name == "rubarbe_sediment_mult_1":
m2 = f"{float_format(next(it).value)}"
m3 = f"{float_format(next(it).value)}"
m4 = f"{float_format(next(it).value)}"
m5 = f"{float_format(next(it).value)}"
f.write(f"{m2}{m3}{m4}{m5}")
# New line
f.write(f"\n")
line += 1
def _export_geomac_i(self, study, repertory, qlog, name="0"):
if qlog is not None:
qlog.put("Export GEOMAC-i file")
with open(
os.path.join(
repertory, f"geomac-i.{name}"
), "w+"
) as f:
for edge in study.river.enable_edges():
reach = edge.reach
n_profiles = len(reach)
time = 0.0
f.write(f"{n_profiles:>5} {time:>11.3f}\n")
ind = 1
for profile in reach.profiles:
kp = profile.kp
n_points = len(profile)
f.write(f"{ind:>4} {kp:>11.3f} {n_points:>4}\n")
for point in profile.points:
label = point.name.lower()
if label != "":
if label[0] == "r":
label = label[1].upper()
else:
label = lable[0]
y = point.y
z = point.z
dcs = 0.001
scs = 1.0
tmcs = 0.0
f.write(
f"{label} {y:>11.5f}" +
f"{z:>13.5f}{dcs:>15.10f}" +
f"{scs:>15.10f}{tmcs:>15.5f}" +
"\n"
)
ind += 1
def _export_mail(self, study, repertory, qlog, name="0"):
if qlog is not None:
qlog.put("Export MAIL file")
with open(
os.path.join(
repertory, f"mail.{name}"
), "w+"
) as f:
for edge in study.river.enable_edges():
reach = edge.reach
lm = len(reach) + 1
f.write(f"{lm:>13}\n")
for mails in [reach.inter_profiles_kp(),
reach.get_kp()]:
ind = 0
for mail in mails:
f.write(f"{mail:15.3f}")
ind += 1
if ind % 3 == 0:
f.write("\n")
if ind % 3 != 0:
f.write("\n")
def _export_stricklers(self, study, repertory, qlog, name="0"):
self._export_frot(study, repertory, qlog, name=name, version="")
self._export_frot(study, repertory, qlog, name=name, version="2")
def _export_frot(self, study, repertory, qlog, name="0", version=""):
if qlog is not None:
qlog.put(f"Export FROT{version} file")
with open(
os.path.join(
repertory, f"frot{version}.{name}"
), "w+"
) as f:
for edge in study.river.enable_edges():
reach = edge.reach
lm = len(reach) + 1
f.write(f"{lm:>6}\n")
def get_stricklers_from_kp(kp):
return next(
map(
lambda s: (
s.begin_strickler.medium if version == "2"
else s.begin_strickler.minor
),
filter(
lambda f: kp in f,
edge.frictions.lst
)
)
)
ind = 1
for mail in edge.reach.inter_profiles_kp():
coef = get_stricklers_from_kp(mail)
f.write(f"{ind:>6} {coef:>12.5f}")
ind += 1
f.write("\n")
def _export_tps(self, study, repertory, qlog, name="0"):
if qlog is not None:
qlog.put("Export TPS file")
with open(
os.path.join(
repertory, f"tps.{name}"
), "w+"
) as f:
for edge in study.river.enable_edges():
reach = edge.reach
f.write(f"0.0\n")
ics = study.river.initial_conditions.get(edge)
data = self._export_tps_init_data(ics)
profiles = reach.profiles
first = profiles[0]
last = profiles[-1]
if first.kp not in data or last.kp not in data:
logger.error("Study initial condition is not fully defined")
return
f_h_s = self._export_tps_profile_height_speed(first, data)
l_h_s = self._export_tps_profile_height_speed(last, data)
# First mail
f.write(f"{1:>5} {f_h_s[0]} {f_h_s[1]}")
ind = 2
it = iter(profiles)
prev = next(it)
prev_h, prev_s = f_h_s
for profile in it:
if profile.kp not in data:
ind += 1
continue
cur_h, cur_s = self._export_tps_profile_height_speed(
profile, data
)
# Mean of height and speed
h = (prev_h + cur_h) / 2
s = (prev_s + cur_s) / 2
f.write(f"{ind:>5} {h} {s}\n")
prev_h, prev_s = cur_h, cur_s
ind += 1
# Last mail
f.write(f"{ind:>5} {f_h_s[0]} {f_h_s[1]}")
def _export_tps_init_data(self, ics):
data = {}
for d in ics.data:
data[d['kp']] = (
d['elevation'],
d['discharge'],
)
return data
def _export_tps_profile_height_speed(self, profile, data):
z = data[profile.kp][0]
q = data[profile.kp][1]
height = z - profile.z_min()
speed = profile.speed(q, z)
return height, speed

View File

@ -22,6 +22,7 @@ from Solver.GenericSolver import GenericSolver
from Solver.Mage import (
Mage7, Mage8, MageFake7,
)
from Solver.RubarBE import RubarBE
_translate = QCoreApplication.translate
@ -30,6 +31,7 @@ solver_long_name = {
# "mage7": "Mage v7",
"mage8": "Mage v8",
# "mage_fake7": "Mage fake v7",
"rubarbe": "RubarBE",
}
solver_type_list = {
@ -37,4 +39,5 @@ solver_type_list = {
# "mage7": Mage7,
"mage8": Mage8,
# "mage_fake7": MageFake7,
"rubarbe": RubarBE,
}