# Mage.py -- Pamhyr
# Copyright (C) 2023-2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import os
import logging
import numpy as np
from tools import timer, trace, logger_exception
from Solver.CommandLine import CommandLineSolver
from Checker.Mage import (
MageNetworkGraphChecker, MageGeometryGuideLineChecker,
)
from Model.Results.Results import Results
from Model.Results.River.River import River, Reach, Profile
logger = logging.getLogger()
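# Helper to open Mage text files: when opening in write mode, a one-line
# header is emitted first ('#' comment marker for .ST geometry files,
# '*' for the other Mage input files) to flag the file as generated by
# PAMHYR.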
def mage_file_open(filepath, mode):
f = open(filepath, mode)
if "w" in mode:
# Write header
comment = "*"
if ".ST" in filepath:
comment = "#"
f.write(
f"{comment} " +
"This file is generated by PAMHYR, please don't modify\n"
)
return f
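# Base class for the Mage solvers: command line layout, default numerical
# parameters, network/geometry checkers, and export of the study to the
# text files read by Mage.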
class Mage(CommandLineSolver):
_type = "mage"
def __init__(self, name):
super(Mage, self).__init__(name)
self._type = "mage"
self._cmd_input = ""
self._cmd_solver = "@path @input -o @output"
self._cmd_output = ""
@classmethod
def default_parameters(cls):
lst = super(Mage, cls).default_parameters()
lst += [
("mage_min_timestep", "1.0"),
("mage_timestep_tra", "3600"),
("mage_timestep_bin", "3600"),
# ("mage_timestep_melissa", "0"),
("mage_implicitation", "0.70"),
("mage_continuity_discretization", "S"),
("mage_qsj_discretization", "B"),
("mage_stop_criterion_iterations", "R"),
("mage_iteration_type", "0"),
("mage_smooth_coef", "0"),
("mage_cfl_max", "-1."),
("mage_min_height", "0.1"),
("mage_max_niter", "99"),
("mage_timestep_reduction_factor", "2"),
("mage_precision_reduction_factor_Z", "1"),
("mage_precision_reduction_factor_Q", "1"),
("mage_niter_max_precision", "10"),
("mage_niter_before_switch", "99"),
("mage_max_froude", "1.5"),
("mage_diffluence_node_height_balance", "-1"),
("mage_compute_reach_volume_balance", "y"),
("mage_max_reach_volume_balance", "0.001"),
("mage_min_reach_volume_to_check", "1000.0"),
("mage_init_internal", "n"),
]
return lst
@classmethod
def checkers(cls):
lst = [
MageNetworkGraphChecker(connectivity=True, version=cls._type),
MageNetworkGraphChecker(connectivity=False, version=cls._type),
MageGeometryGuideLineChecker(version=cls._type),
]
return lst
##########
# Export #
##########
def cmd_args(self, study):
lst = super(Mage, self).cmd_args(study)
lst.append("-r")
return lst
def input_param(self):
name = self._study.name
return f"{name}.REP"
def output_param(self):
name = self._study.name
return f"{name}.BIN"
def log_file(self):
name = self._study.name
return f"{name}.TRA"
def _export_REP_additional_lines(self, study, rep_file):
lines = filter(
lambda line: line.is_enabled(),
study.river.rep_lines.lines
)
for line in lines:
rep_file.write(line.line)
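# Write one .ST geometry file per enabled reach under the 'net/'
# sub-directory: for each profile a header line, one line per point, then
# the '999.9990 999.9990 999.9990' end-of-profile sentinel line.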
def _export_ST(self, study, repertory, qlog, name="0"):
files = []
if qlog is not None:
qlog.put("Export ST file")
os.makedirs(os.path.join(repertory, "net"), exist_ok=True)
# Write header
edges = study.river.enable_edges()
for edge in edges:
name = f"Reach_{edge.id + 1:>3}".replace(" ", "0")
with mage_file_open(
os.path.join(repertory, "net", f"{name}.ST"),
"w+"
) as f:
files.append(str(os.path.join("net", f"{name}.ST")))
cnt_num = 1
for profile in edge.reach.profiles:
self._export_ST_profile_header(
f, files, profile, cnt_num
)
cnt_num += 1
# Points
for point in profile.points:
self._export_ST_point_line(
f, files, point
)
# Profile last line
f.write(f" 999.9990 999.9990 999.9990\n")
return files
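# Profile header line of the ST file: fixed-width columns for the profile
# index, the two profile codes, the number of points, the rk, the profile
# name and, when a sediment layer list is attached, the number of layers
# followed by each layer's height, d50, sigma and critical constraint.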
def _export_ST_profile_header(self, wfile, files,
profile, cnt):
num = f"{cnt:>6}"
c1 = f"{profile.code1:>6}"
c2 = f"{profile.code2:>6}"
t = f"{len(profile.points):>6}"
rk = f"{profile.rk:>12f}"[0:12]
pname = profile.name
if profile.name == "":
# Generate name from profile id prefixed with
# 'p' (and replace space char with '0' char)
pname = f"p{profile.id:>3}".replace(" ", "0")
name = f"{pname:<19}"
# Generate sediment additional data if available
sediment = ""
if profile.sl is not None:
if not any(filter(lambda f: ".GRA" in f, files)):
files.append(self._gra_file)
# Number of layers
nl = len(profile.sl)
sediment = f" {nl:>3}"
# Layers data
for layer in profile.sl.layers:
sediment += (
f" {layer.height:>10} {layer.d50:>10} " +
f"{layer.sigma:>10} " +
f"{layer.critical_constraint:>10}"
)
# Profile header line
wfile.write(f"{num}{c1}{c2}{t} {rk} {pname} {sediment}\n")
def _export_ST_point_line(self, wfile, files, point):
x = f"{point.x:<12.4f}"[0:12]
y = f"{point.y:<12.4f}"[0:12]
z = f"{point.z:<12.4f}"[0:12]
n = f"{point.name:<3}"
# Generate sediment additional data if available
sediment = ""
prev = point.z
if point.sl is not None:
# Number of layers
nl = len(point.sl)
sediment = f"{nl:>3}"
# Layers data
for layer in point.sl.layers:
prev = round(prev - layer.height, 5)
sediment += (
f" {prev:>10} {layer.d50:>10} " +
f"{layer.sigma:>10} " +
f"{layer.critical_constraint:>10}"
)
# Point line
wfile.write(f"{x} {y} {z} {n} {sediment}\n")
def _export_BC(self, t, bounds, repertory, qlog, name="0"):
files = []
if len(bounds) == 0:
return files
if qlog is not None:
qlog.put(f"Export {t} file")
with mage_file_open(os.path.join(repertory, f"{name}.{t}"), "w+") as f:
files.append(f"{name}.{t}")
for bound in bounds:
if bound.node is None:
continue
name = f"{bound.node.id:3}".replace(" ", "x")
f.write(f"* {bound.node.name} ({name}) {bound.bctype}\n")
f.write(f"${name}\n")
header = bound.header
f.write(f"*{header[0]:>9}|{header[1]:>10}\n")
for d in bound.data:
v0 = d[0]
v1 = d[1]
if t in ["HYD", "QSO", "LIM"]:
v0 /= 60 # Convert first column to minute
f.write(f"{v0:9} {v1:9} \n")
return files
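# Dispatch the study boundary conditions by type: 'ZD' goes to the AVA
# file, 'TD' and 'PC' to HYD, 'TZ' to LIM and 'SL' to the QSO solid
# discharge file.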
def _export_bound_cond(self, study, repertory, qlog, name="0"):
files = []
lst = study.river.boundary_condition
AVA = []
HYD = []
LIM = []
QSO = []
for tab in ["liquid", "solid", "suspenssion"]:
for bound in lst.get_tab(tab):
if bound.node is None:
continue
if not study.river.is_enable_node(bound.node):
continue
if bound.bctype == "ZD":
AVA.append(bound)
elif bound.bctype == "TD" or bound.bctype == "PC":
HYD.append(bound)
elif bound.bctype == "TZ":
LIM.append(bound)
elif bound.bctype == "SL":
QSO.append(bound)
files = files + self._export_BC("AVA", AVA, repertory, qlog, name=name)
files = files + self._export_BC("HYD", HYD, repertory, qlog, name=name)
files = files + self._export_BC("LIM", LIM, repertory, qlog, name=name)
files = files + self._export_QSO(QSO, repertory, qlog, name=name)
return files
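# Lateral contributions are gathered from the liquid, solid and suspension
# tabs and written to a single LAT file, one '$<reach index> begin_rk
# end_rk' block per contribution ('EV' entries have their second column
# negated).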
def _export_lateral_contrib(self, study, repertory, qlog, name="0"):
files = []
lst = study.river.lateral_contribution
with mage_file_open(
os.path.join(repertory, f"{name}.LAT"),
"w+"
) as f:
if qlog is not None:
qlog.put(f"Export LAT file")
files.append(f"{name}.LAT")
for tab in ["liquid", "solid", "suspenssion"]:
for lateral in lst.get_tab(tab):
self._export_LC(study, lateral, f, qlog)
return files
def _export_LC(self, study, lateral, f, qlog, name="0"):
if lateral.edge is None:
return
edges = study.river.enable_edges()
if lateral.edge not in edges:
return
eid, _ = next(
filter(
lambda e: e[1] == lateral.edge,
enumerate(edges)
)
)
name = f"{eid+1:>3}"
# name = f"Reach_{lateral.edge.id + 1:>3}".replace(" ", "0")
f.write(f"* {lateral.edge.name} ({name}) {lateral.lctype}\n")
f.write(
f"${name} " +
f"{lateral.begin_rk:>10.4f} {lateral.end_rk:>10.4f}\n"
)
header = lateral.header
f.write(f"*{header[0]:>9}|{header[1]:>10}\n")
for d in lateral.data:
if lateral.lctype in ["EV"]:
f.write(f"{d[0]:10.3f}{-d[1]:10.3f}\n")
else:
f.write(f"{d[0]:10.3f}{d[1]:10.3f}\n")
def _export_RUG(self, study, repertory, qlog, name="0"):
files = []
if qlog is not None:
qlog.put("Export RUG file")
# Write header
with mage_file_open(os.path.join(repertory, f"{name}.RUG"), "w+") as f:
files.append(f"{name}.RUG")
edges = study.river.enable_edges()
id = 1
for edge in edges:
frictions = edge.frictions
for friction in frictions.frictions:
if friction.begin_strickler is None:
continue
num = f"{id:>3}"
brk = f"{friction.begin_rk:>10.3f}"
erk = f"{friction.end_rk:>10.3f}"
# if friction.begin_rk != friction.end_rk:
# print("TODO")
strickler = friction.begin_strickler
coef_1 = f"{strickler.minor:>10.3f}"
coef_2 = f"{strickler.medium:>10.3f}"
f.write(f"K{num} {brk}{erk}{coef_1}{coef_2}\n")
id += 1
return files
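# INI initial-conditions file: one line per section with the reach index,
# section index, discharge, elevation and rk; the file is only listed in
# the returned files when at least one reach has initial condition data.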
def _export_INI(self, study, repertory, qlog, name="0"):
files = []
if qlog is not None:
qlog.put("Export INI file")
# Write header
with mage_file_open(os.path.join(repertory, f"{name}.INI"), "w+") as f:
has_ini = False
id = 1
reachs = study.river.enable_edges()
# TODO put real date...
f.write(f"$ date en minutes : 0.00\n")
f.write(f"* IB IS discharge elevation rk\n")
id = 1
for reach in reachs:
cond = study.river.initial_conditions.get(reach)
data = cond.data
if len(data) == 0:
continue
has_ini = True
id_sec = 1
for d in data:
IR = f"{id}"
IS = f"{id_sec}"
discharge = f"{d['discharge']:>10.5f}"
elevation = f"{d['elevation']:>11.6f}"
rk = f"{d['rk']:>9.2f}"
f.write(f"{IR} {IS} {discharge} {elevation} {rk}\n")
id_sec += 1
id += 1
if has_ini:
files.append(f"{name}.INI")
return files
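# CAS reservoir file: one '$<node id>' block per reservoir with its
# elevation/area rating table.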
def _export_CAS(self, study, repertory, qlog, name="0"):
files = []
reservoirs = study.river.reservoir.lst
if len(reservoirs) == 0:
return files
if qlog is not None:
qlog.put("Export CAS file")
with mage_file_open(os.path.join(repertory, f"{name}.CAS"), "w+") as f:
files.append(f"{name}.CAS")
for reservoir in reservoirs:
if reservoir.node is None:
continue
reservoir.sort()
node = reservoir.node
name = f"{node.id:3}".replace(" ", "x")
f.write(f"* {node.name} ({name}) Reservoir\n")
f.write(f"${name}\n")
f.write(f"*{'Elev(m)':>9}|{'Area(ha)':>10}\n")
for d in reservoir.data:
v0 = d[0]
v1 = d[1]
f.write(f"{v0:>10.3f}{v1:>10.3f}\n")
return files
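# SIN hydraulic-structures file: sin_dict maps the Pamhyr structure types
# to the one-letter codes expected by Mage; each enabled structure on an
# enabled reach is written as a comment line followed by one line per
# basic structure with its five parameters.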
def _export_SIN(self, study, repertory, qlog, name="0"):
files = []
sin_dict = {
"ND": "*",
"S1": "D", "S2": "T", "S3": "T",
"OR": "O", "OC": "B", "OV": "F",
"CV": "O", # CheckValve
"V1": "V", "V2": "W",
"BO": "A",
"UD": "X",
"PO": "P",
}
hydraulic_structures = study.river.hydraulic_structures.lst
if len(hydraulic_structures) == 0:
return files
if qlog is not None:
qlog.put("Export SIN file")
with mage_file_open(os.path.join(repertory, f"{name}.SIN"), "w+") as f:
files.append(f"{name}.SIN")
for hs in hydraulic_structures:
if hs.input_reach is None:
continue
if not hs.input_reach.is_enable():
continue
if not hs.enabled:
continue
if hs.input_rk is None:
continue
f.write(
'* ouvrage au pk ' +
f"{float(hs.input_rk):>12.1f}" + ' ' +
hs.name + '\n'
)
self._export_SIN_bhs(study, sin_dict, hs, f)
return files
def _export_SIN_bhs(self, study, sin_dict, hs, f):
for bhs in hs.basic_structures:
if bhs.enabled:
reach_id = study.river.get_edge_id(hs.input_reach) + 1
param_str = ' '.join(
[
f'{p:>10.3f}'
for p in self._export_SIN_parameters(bhs)
]
)
name = bhs.name
if name == "":
name = f"HS_{bhs.id:>3}".replace(" ", "0")
else:
name = name.replace(" ", "_")
f.write(
f"{sin_dict[bhs._type]} " +
f"{reach_id} {float(hs.input_rk):>12.3f} " +
f"{param_str} {name}\n"
)
def _export_SIN_parameters(self, bhs):
res = [9999.999] * 5
if len(bhs) == 5:
res = self._export_SIN_parameters_5(bhs)
elif len(bhs) == 4:
res = self._export_SIN_parameters_4(bhs)
elif len(bhs) == 3:
res = self._export_SIN_parameters_3(bhs)
return res
def _export_SIN_parameters_5(self, bhs):
# S2, OR, V1, V2, UD, CV
return [
bhs._data[0].value,
bhs._data[1].value,
bhs._data[2].value,
bhs._data[3].value,
bhs._data[4].value,
]
def _export_SIN_parameters_4(self, bhs):
# S3, OC
res = [
bhs._data[0].value,
bhs._data[1].value,
bhs._data[2].value,
bhs._data[3].value,
0.0,
]
if bhs._type == "T": # S3
res = [0.0] + res[:-1]
return res
def _export_SIN_parameters_3(self, bhs):
# S1, BO
if bhs._type == "S1":
res = [
bhs._data[0].value,
bhs._data[1].value,
0.0,
bhs._data[2].value,
9999.99,
]
else:
res = [
bhs._data[0].value,
bhs._data[1].value,
bhs._data[2].value,
0.0,
0.0,
]
return res
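# VAR file: only written when at least one enabled structure contains a
# check valve ('CV'); each check valve gets a '$<name> clapet' entry.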
def _export_VAR(self, study, repertory, qlog, name="0"):
files = []
hydraulic_structures = study.river.hydraulic_structures.lst
if len(hydraulic_structures) == 0:
return files
if qlog is not None:
qlog.put("Export VAR file")
nb_cv = 0
for hs in hydraulic_structures:
if hs.input_reach is None:
continue
if not hs.input_reach.is_enable():
continue
if not hs.enabled:
continue
for bhs in hs.basic_structures:
if bhs.enabled:
logger.info(bhs._type)
if bhs._type == "CV":
nb_cv += 1
if nb_cv != 0:
with mage_file_open(os.path.join(
repertory, f"{name}.VAR"), "w+") as f:
files.append(f"{name}.VAR")
for hs in hydraulic_structures:
if hs.input_reach is None:
continue
if not hs.input_reach.is_enable():
continue
if not hs.enabled:
continue
for bhs in hs.basic_structures:
logger.info(bhs._type)
if bhs._type != "CV":
continue
name = bhs.name
if name == "":
name = f"HS_{bhs.id:>3}".replace(" ", "0")
f.write(
f"${name} clapet\n"
)
return files
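# DEV file: one 'YD' and one 'YG' entry per enabled reach (presumably the
# right and left bank guide lines declared for Mage).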
def _export_DEV(self, study, repertory, qlog, name="0"):
files = []
if qlog is not None:
qlog.put("Export DEV file")
with mage_file_open(
os.path.join(
repertory, f"{name}.DEV"
), "w+"
) as f:
reachs = study.river.enable_edges()
id = 1
for reach in reachs:
f.write(f"YD{id:3}\n")
f.write(f"YG{id:3}\n")
id += 1
files.append(f"{name}.DEV")
return files
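# REP master file: lists every generated input file by extension (ST and
# GRA are excluded from the input block), then the TRA/BIN/ERR output
# files, the GRA files and any user-defined additional REP lines.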
def _export_REP(self, study, repertory, files, qlog, name="0"):
if qlog is not None:
qlog.put("Export REP file")
# Write header
with mage_file_open(
os.path.join(
repertory, f"{name}.REP"
), "w+"
) as f:
f.write("confirmation=non\n")
for file in files:
EXT = file.split('.')[1]
if EXT not in ["ST", "GRA"]:
f.write(f"{EXT} {file}\n")
f.write("* OUTPUT\n")
f.write(f"TRA {name}.TRA\n")
f.write(f"BIN {name}.BIN\n")
f.write(f"ERR {name}.ERR\n")
for file in files:
EXT = file.split('.')[1]
if EXT in ["GRA"]:
f.write(f"{EXT} {file}\n")
self._export_REP_additional_lines(study, f)
@timer
def export(self, study, repertory, qlog=None):
self._study = study
name = study.name.replace(" ", "_")
# Define GRA file name
self._gra_file = f"{name}.GRA"
self._bin_file = f"{name}.BIN"
self._export_ST(study, repertory, qlog, name=name)
self.export_additional_files(study, repertory, qlog, name=name)
self.export_study_description(study, repertory, qlog, name=name)
return True
###########
# RESULTS #
###########
def read_bin(self, study, fname, results, qlog=None, name="0"):
return
@timer
def results(self, study, repertory, qlog=None, name="0"):
results = Results(
study=study,
solver=self,
repertory=repertory,
name=name,
)
fname = os.path.join(repertory, f"{name}.BIN")
if not os.path.isfile(fname):
logger.info(f"Result file {name}.BIN does not exist")
return None
try:
self.read_bin(study, fname, results, qlog, name=name)
except Exception as e:
logger.error(f"Failed to read results")
logger_exception(e)
return None
return results
##########
# MAGE 7 #
##########
class Mage7(Mage):
_type = "mage7"
def __init__(self, name):
super(Mage7, self).__init__(name)
self._type = "mage7"
@classmethod
def default_parameters(cls):
lst = super(Mage7, cls).default_parameters()
return lst
##########
# MAGE 8 #
##########
class Mage8(Mage):
_type = "mage8"
def __init__(self, name):
super(Mage8, self).__init__(name)
self._type = "mage8"
@classmethod
def default_parameters(cls):
lst = super(Mage8, cls).default_parameters()
# Insert new parameters at specific position
names = list(map(lambda t: t[0], lst))
i = names.index("mage_precision_reduction_factor_Q")
lst.insert(i+1, ("mage_precision_reduction_factor_r", "1"))
# Mage parameter for sediment module (added in DB 0.0.4)
lst.append(("mage_sediment_masse_volumique", "2650.0"))
lst.append(("mage_sediment_angle_repos", "40.0"))
lst.append(("mage_sediment_porosity", "0.40"))
lst.append(("mage_distance_Han", "0.0"))
lst.append(("mage_distance_chargement_d50", "100.0"))
lst.append(("mage_distance_chargement_sigma", "100.0"))
lst.append(("mage_methode_modification_geometrie", "1"))
lst.append(("mage_shields_critique", "1"))
lst.append(("mage_shields_correction", "1"))
lst.append(("mage_capacite_solide", "1"))
lst.append(("mage_pas_de_temps_charriage", "1"))
lst.append(("mage_facteur_multiplicateur", "1.0"))
return lst
##########
# Export #
##########
def cmd_args(self, study):
lst = super(Mage8, self).cmd_args(study)
if study.river.has_sediment():
lst.append("-c=3")
return lst
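# PAR parameter file: every study parameter is written as 'name value'
# with the 'all_'/'mage_' prefixes stripped; boolean-like values are
# converted to the flags expected by Mage.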
def _export_PAR(self, study, repertory, qlog=None, name="0"):
files = []
if qlog is not None:
qlog.put("Export PAR file")
with mage_file_open(os.path.join(repertory, f"{name}.PAR"), "w+") as f:
files.append(f"{name}.PAR")
params = study.river.get_params(self.type).parameters
for p in params:
name = p.name\
.replace("all_", "")\
.replace("mage_", "")
value = p.value
if name in ["command_line_arguments"]:
continue
if name == "compute_reach_volume_balance":
value = "O" if value.lower() == "y" else "N"
if name == "init_internal":
value = ("p" if value.lower() in ["y", "yes", "true", "o"]
else "")
logger.debug(
f"export: PAR: {name}: {value} ({p.value})"
)
f.write(f"{name} {value}\n")
return files
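# NET network file: one line per enabled reach with its generated name,
# the ids of its two end nodes and the relative path of its ST file.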
def _export_NET(self, study, repertory, qlog=None, name="0"):
files = []
if qlog is not None:
qlog.put("Export NET file")
with mage_file_open(os.path.join(repertory, f"{name}.NET"), "w+") as f:
files.append(f"{name}.NET")
edges = study.river.enable_edges()
for e in edges:
name = f"Reach_{e.id + 1:>3}".replace(" ", "0")
id = name
n1 = f"{e.node1.id:3}".replace(" ", "x")
n2 = f"{e.node2.id:3}".replace(" ", "x")
file = os.path.join("net", name + ".ST")
f.write(f"{id} {n1} {n2} {file}\n")
return files
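# QSO solid boundary-condition file: one '$<node id> d50 sigma' block per
# bound ('default' appended when no table data is given), followed by the
# two-column table.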
def _export_QSO(self, bounds, repertory, qlog, name="0"):
files = []
if len(bounds) == 0:
return files
if qlog is not None:
qlog.put(f"Export QSO file")
with mage_file_open(os.path.join(repertory, f"{name}.QSO"), "w+") as f:
files.append(f"{name}.QSO")
for bound in bounds:
# File header
name = f"{bound.node.id:3}".replace(" ", "x")
f.write(f"* {bound.node.name} ({name}) {bound.bctype}\n")
d50 = bound.d50
sigma = bound.sigma
if len(bound.data) == 0:
f.write(f"${name} {d50} {sigma} default\n")
else:
f.write(f"${name} {d50} {sigma}\n")
# Table header
header = bound.header
f.write(f"*{header[0]:>9}|{header[1]:>10}\n")
# Data
for d in bound.data:
f.write(f"{d[0]:10.3f}{d[1]:10.3f}\n")
return files
def export_func_dict(self):
return [
self._export_ST, self._export_PAR,
self._export_NET, self._export_bound_cond,
self._export_RUG, self._export_INI,
self._export_SIN, self._export_VAR,
self._export_CAS, self._export_DEV,
self._export_lateral_contrib,
]
@timer
def export(self, study, repertory, qlog=None, name="0"):
self._study = study
name = study.name.replace(" ", "_")
# Define GRA file name
self._gra_file = f"{name}.GRA"
self._bin_file = f"{name}.BIN"
# Generate files
files = []
try:
for func in self.export_func_dict():
files = files + func(study, repertory, qlog, name=name)
self.export_additional_files(study, repertory, qlog, name=name)
self.export_study_description(study, repertory, qlog, name=name)
self._export_REP(study, repertory, files, qlog, name=name)
return True
except Exception as e:
logger.error(f"Failed to export study to {self._type}")
logger_exception(e)
return False
###########
# RESULTS #
###########
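# The BIN result file appears to be a Fortran unformatted sequential file:
# each record is framed by int32 length markers (consumed here by
# newline() and endline()). The first records hold the number of reaches
# and profiles and the Mage version, the first/last profile index of each
# reach, then the X and Z/Y geometry; the remaining records hold, per
# timestamp, a vector of values keyed 'Z' (elevation) or 'Q' (discharge)
# for every profile.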
@timer
def read_bin(self, study, fname, results, qlog=None, name="0"):
logger.info(f"read_bin: Start reading '{fname}' ...")
with mage_file_open(fname, "r") as f:
def newline(): return np.fromfile(f, dtype=np.int32, count=1)
def endline(): return np.fromfile(f, dtype=np.int32, count=1)
def read_int(size): return np.fromfile(
f, dtype=np.int32, count=size)
def read_float(size): return np.fromfile(
f, dtype=np.float32, count=size)
def read_float64(size): return np.fromfile(
f, dtype=np.float64, count=size)
# Meta data (1st line)
newline()
data = read_int(3)
nb_reach = data[0]
nb_profile = data[1]
mage_version = data[2]
logger.debug(f"read_bin: nb_reach = {nb_reach}")
logger.debug(f"read_bin: nb_profile = {nb_profile}")
logger.debug(f"read_bin: mage_version = {mage_version}")
if mage_version <= 80:
msg = (
"Read BIN files: " +
f"Possible incompatible mage version '{mage_version}', " +
"please check your solver configuration..."
)
logger.warning(msg)
if qlog is not None:
qlog.put("[WARNING] " + msg)
results.set("solver_version", f"Mage8 ({mage_version})")
results.set("nb_reach", f"{nb_reach}")
results.set("nb_profile", f"{nb_profile}")
endline()
# Reach information (2nd line)
newline()
reachs = []
iprofiles = {}
profile_len = []
reach_offset = {}
data = read_int(2*nb_reach)
for i in range(nb_reach):
# Add results reach to reach list
r = results.river.add(i)
reachs.append(r)
# ID of first and last reach profiles
i1 = data[2*i] - 1
i2 = data[2*i+1] - 1
# Add profile id correspondance to reach
key = (i1, i2)
iprofiles[key] = r
# Profile ID offset
reach_offset[r] = i1
profile_len.append(i2-i1+1)
logger.debug(f"read_bin: iprofiles = {iprofiles}")
endline()
# X (3rd line)
newline()
_ = read_float(nb_profile)
endline()
# Z and Y (4th line)
newline()
_ = read_float(3*nb_profile)
endline()
# Data
newline()
def ip_to_r(i): return iprofiles[
next(
filter(
lambda k: k[0] <= i <= k[1],
iprofiles
)
)
]
def ip_to_ri(r, i): return i - reach_offset[r]
# check results and geometry consistency
for i, r in enumerate(reachs):
lp = len(r.profiles)
pl = profile_len[i]
if lp != pl:
logger.warning(f"geometry reach {i} has {lp} profiles")
logger.warning(f"results reach {i} has {pl} values")
return
ts = set()
end = False
while not end:
n = read_int(1)[0]
timestamp = read_float64(1)[0]
key = bytearray(np.fromfile(
f, dtype=np.byte, count=1)).decode()
data = read_float(n)
# logger.debug(f"read_bin: timestamp = {timestamp} sec")
ts.add(timestamp)
if key in ["Z", "Q"]:
for i, d in enumerate(data):
# Get reach corresponding to profile ID
reach = ip_to_r(i)
# Get profile id in reach
ri = ip_to_ri(reach, i)
# Set data for profile RI
reach.set(ri, timestamp, key, d)
if key == "Z":
profile = reach.profile(ri)
limits = profile.geometry.get_water_limits(d)
reach.set(
ri, timestamp,
"water_limits",
limits
)
endline()
end = newline().size <= 0
results.set("timestamps", ts)
ts_list = sorted(ts)
logger.info(f"compute tab...")
for r in reachs:
for p in r.profiles:
if not p.geometry.tab_up_to_date:
p.geometry.compute_tabulation()
logger.info(f"compute velocily...")
for r in reachs:
for t in ts_list:
for i, p in enumerate(r.profiles):
v = p.geometry.speed(
p.get_ts_key(t, "Q"),
p.get_ts_key(t, "Z")
)
r.set(i, t, "V", v)
logger.info(f"read_bin: ... end with {len(ts)} timestamp read")
@timer
def read_gra(self, study, repertory, results, qlog=None, name="0"):
if not study.river.has_sediment():
return
fname = os.path.join(repertory, f"{name}.GRA")
logger.info(f"read_gra: Start reading '{fname}' ...")
with mage_file_open(fname, "r") as f:
def newline(): return np.fromfile(f, dtype=np.int32, count=1)
def endline(): return np.fromfile(f, dtype=np.int32, count=1)
def read_int(size): return np.fromfile(
f, dtype=np.int32, count=size)
def read_float(size): return np.fromfile(
f, dtype=np.float32, count=size)
def read_float64(size): return np.fromfile(
f, dtype=np.float64, count=size)
# Meta data (1st line)
newline()
data = read_int(3)
nb_reach = data[0]
nb_profile = data[1]
mage_version = data[2]
logger.debug(f"read_gra: nb_reach = {nb_reach}")
logger.debug(f"read_gra: nb_profile = {nb_profile}")
logger.debug(f"read_gra: mage_version = {mage_version}")
if mage_version < 80:
msg = (
"Read GRA files: " +
f"Possible incompatible mage version '{mage_version}', " +
"please check your solver configuration..."
)
logger.warning(msg)
if qlog is not None:
qlog.put("[WARNING] " + msg)
results.set("gra_solver_version", f"Mage8 ({mage_version})")
results.set("gra_nb_reach", f"{nb_reach}")
results.set("gra_nb_profile", f"{nb_profile}")
endline()
# Reach information (2nd line)
newline()
reachs = []
iprofiles = {}
reach_offset = {}
data = read_int(2*nb_reach)
for i in range(nb_reach):
# Get results reach to reach list
r = results.river.reach(i)
reachs.append(r)
# ID of first and last reach profiles
i1 = data[2*i] - 1
i2 = data[2*i+1] - 1
# Add profile id correspondance to reach
key = (i1, i2)
iprofiles[key] = r
# Profile ID offset
reach_offset[r] = i1
logger.debug(f"read_gra: iprofiles = {iprofiles}")
endline()
# X (3rd line)
newline()
_ = read_float(nb_profile)
endline()
# npts (4th line)
newline()
_ = read_int(nb_profile)
endline()
# Data
def ip_to_r(i): return iprofiles[
next(
filter(
lambda k: k[0] <= i <= k[1],
iprofiles
)
)
]
def ip_to_ri(r, i): return i - reach_offset[r]
ts = set()
end = False
newline()
while not end:
n = read_int(1)[0]
timestamp = read_float64(1)[0]
with_bedload = read_int(1)[0]
logger.debug(f"read_gra: Number of cross section: {n}")
logger.debug(f"read_gra: Timestamp: {timestamp}")
logger.debug(f"read_gra: Type of bedload: {with_bedload}")
endline()
npts = [1] * n
if with_bedload == 1:
newline()
npts = read_int(n)
endline()
sum_npts = sum(npts)
logger.debug(f"read_gra: Number of points: {sum_npts}")
newline()
nsl = read_int(sum_npts)
logger.debug(f"read_gra: Number of sedimentary layers: {nsl}")
endline()
newline()
data = read_float64(sum(nsl) * 3)
endline()
ts.add(timestamp)
i_pts = 0
i_data = 0
# Loop on cross section
for i in range(n):
sec_sl = []
reach = ip_to_r(i)
p_i = ip_to_ri(reach, i)
# Loop on cross section points
for j in range(npts[i]):
sl = []
# Loop on sediment layers
for k in range(nsl[i_pts]):
h = data[i_data]
d50 = data[i_data + 1]
sigma = data[i_data + 2]
sl.append((h, d50, sigma))
i_data += 3
i_pts += 1
sec_sl.append(sl)
reach.set(p_i, timestamp, "sl", sec_sl)
logger.debug(
f"read_gra: data size = {len(data)} ({i_data} readed)"
)
end = newline().size <= 0
results.set("sediment_timestamps", ts)
logger.info(f"read_gra: ... end with {len(ts)} timestamp read")
@timer
def results(self, study, repertory,
qlog=None, name=None,
with_gra=True):
self._study = study
if name is None:
name = study.name.replace(" ", "_")
results = super(Mage8, self).results(study, repertory, qlog, name=name)
if results is None:
return None
if with_gra:
self.read_gra(study, repertory, results, qlog, name=name)
return results
class MageFake7(Mage8):
_type = "mage_fake7"
def __init__(self, name):
super(MageFake7, self).__init__(name)
self._type = "mage_fake7"
@timer
def results(self, study, repertory,
qlog=None, name=None,
with_gra=False):
results = super(MageFake7, self).results(
study, repertory,
qlog, name=name, with_gra=with_gra
)
return results