Pamhyr2/src/Solver/Mage.py

440 lines
12 KiB
Python

# -*- coding: utf-8 -*-
import os
from tools import timer
from Solver.ASolver import AbstractSolver
from Checker.Mage import MageNetworkGraphChecker
class Mage(AbstractSolver):
_type = "mage"
def __init__(self, name):
super(Mage, self).__init__(name)
self._type = "mage"
self._cmd_input = ""
self._cmd_solver = "@path @input -o @output"
self._cmd_output = ""
@classmethod
def default_parameters(cls):
lst = super(Mage, cls).default_parameters()
lst += [
("mage_min_timestep", "1.0"),
("mage_timestep_tra", "3600"),
("mage_timestep_bin", "0"),
# ("mage_timestep_melissa", "0"),
("mage_implication", "0.70"),
("mage_continuity_discretization", "S"),
("mage_qsj_discretization", "B"),
("mage_stop_criterion_iterations", "R"),
("mage_iter_type", "0"),
("mage_smooth_coef", "0"),
("mage_cfl_max", "-1."),
("mage_min_height", "0.1"),
("mage_max_niter", "10"),
("mage_timestep_reduction_factor", "2"),
("mage_precision_reduction_factor_Z", "1"),
("mage_precision_reduction_factor_Q", "1"),
("mage_niter_max_precision", "99"),
("mage_niter_before_switch", "99"),
("mage_max_froude", "1.5"),
("mage_diffluence_node_height_balance", "-1"),
("mage_compute_reach_volume_balance", "y"),
("mage_max_reach_volume_balance", "0.001"),
("mage_min_reach_volume_to_check", "1000.0"),
]
return lst
@classmethod
def checkers(cls):
lst = [
MageNetworkGraphChecker(connectivity = True),
MageNetworkGraphChecker(connectivity = False)
]
return lst
##########
# Export #
##########
def input_param(self):
return "0.REP"
def log_file(self):
return "0.TRA"
@timer
def _export_ST(self, study, repertory, qlog):
files = []
if qlog is not None:
qlog.put("Export ST file")
# Write header
edges = study.river.edges()
edges = list(
filter(
lambda e: e.is_enable(),
edges
)
)
for edge in edges:
name = edge.name.replace(" ", "_")
if edge._name == "":
name = f"Reach_{edge.id}"
with open(os.path.join(repertory, f"{name}.ST"), "w+") as f:
files.append(f"{name}.ST")
f.write("* This file is generate by PAMHYR, please don't modify\n")
for profile in edge.reach.profiles:
num = f"{profile.num:>6}"
c1 = f"{profile.code1:>6}"
c2 = f"{profile.code2:>6}"
t = f"{len(profile.points):>6}"
kp = f"{profile.kp:>13.4f}"
name = profile.name
f.write(f"{num}{c1}{c2}{t}{kp} {name}\n")
for point in profile.points:
x = f"{point.x:>13.4f}"
y = f"{point.y:>13.4f}"
z = f"{point.z:>13.4f}"
n = point.name
f.write(f"{x}{y}{z} {n}\n")
f.write(f" 999.9990 999.9990 999.9990\n")
return files
@timer
def _export_BC(self, t, bounds, repertory, qlog):
files = []
if len(bounds) == 0:
return files
if qlog is not None:
qlog.put(f"Export {t} file")
with open(os.path.join(repertory, f"0.{t}"), "w+") as f:
files.append(f"0.{t}")
f.write("* This file is generate by PAMHYR, please don't modify\n")
for bound in bounds:
name = f"{bound.node.id:3}".replace(" ", "x")
f.write(f"* {bound.node.name} ({name}) {bound.bctype}\n")
f.write(f"${name}\n")
header = bound.header
f.write(f"*{header[0]:>9}|{header[1]:>10}\n")
for d in bound.data:
f.write(f"{d[0]:10.3f}{d[1]:10.3f}\n")
return files
@timer
def _export_bound_cond(self, study, repertory, qlog):
files = []
lst = study.river.boundary_condition
AVA = []
HYD = []
LIM = []
for tab in ["liquid", "solid", "suspenssion"]:
for bound in lst.get_tab(tab):
if bound.bctype == "ZD":
AVA.append(bound)
elif bound.bctype == "TD" or bound.bctype == "PC":
HYD.append(bound)
elif bound.bctype == "TZ":
LIM.append(bound)
files = files + self._export_BC("AVA", AVA, repertory, qlog)
files = files + self._export_BC("HYD", HYD, repertory, qlog)
files = files + self._export_BC("LIM", LIM, repertory, qlog)
return files
# @timer
# def _export_LC(self, lateral, repertory, qlog):
# files = []
# if qlog is not None:
# qlog.put(f"Export LAT file")
# with open(os.path.join(repertory, f"0.LAT"), "w+") as f:
# files.append(f"0.LAT")
# f.write("* This file is generate by PAMHYR, please don't modify\n")
# name = f"{lateral.node.id:3}".replace(" ", "x")
# f.write(f"* {lateral.node.name} ({name}) {lateral.bctype}\n")
# f.write(f"${name}\n")
# header = lateral.header
# f.write(f"*{header[0]:>9}|{header[1]:>10}\n")
# for d in lateral.data:
# f.write(f"{d[0]:10.3f}{d[1]:10.3f}\n")
# return files
# @timer
# def _export_lateral_contrib(self, study, repertory, qlog):
# files = []
# lst = study.river.lateral_contribution
# for tab in ["liquid", "solid", "suspenssion"]:
# for lateral in lst.get_tab(tab):
# files = files + self._export_LC(lateral, repertory, qlog)
# return files
@timer
def _export_RUG(self, study, repertory, qlog):
files = []
if qlog is not None:
qlog.put("Export RUG file")
# Write header
with open(os.path.join(repertory, "0.RUG"), "w+") as f:
files.append("0.RUG")
f.write("* This file is generate by PAMHYR, please don't modify\n")
edges = study.river.edges()
edges = list(
filter(
lambda e: e.is_enable(),
edges
)
)
id = 1
for edge in edges:
frictions = edge.frictions
for friction in frictions.frictions:
num = f"{id:>3}"
bkp = f"{friction.begin_kp:>10.3f}"
ekp = f"{friction.end_kp:>10.3f}"
# if friction.begin_kp != friction.end_kp:
# print("TODO")
strickler = friction.begin_strickler
coef_1 = f"{strickler.minor:>10.3f}"
coef_2 = f"{strickler.medium:>10.3f}"
f.write(f"K{num} {bkp}{ekp}{coef_1}{coef_2}\n")
id += 1
return files
@timer
def _export_INI(self, study, repertory, qlog):
files = []
if qlog is not None:
qlog.put("Export INI file")
# Write header
with open(os.path.join(repertory, "0.INI"), "w+") as f:
has_ini = False
id = 1
reachs = study.river.edges()
reachs = list(
filter(
lambda e: e.is_enable(),
reachs
)
)
f.write("* This file is generate by PAMHYR, please don't modify\n")
# TODO put real date...
f.write(f"$ date en minutes : 0.00\n")
f.write(f"* IB IS discharge elevation kp\n")
id = 1
for reach in reachs:
cond = study.river.initial_conditions.get(reach)
data = cond.data
if len(data) == 0:
continue
has_ini = True
id_sec = 1
for d in data:
IR = f"{id:>3}"
IS = f"{id_sec:>3}"
discharge = f"{d['discharge']:>10.5f}"
elevation = f"{d['elevation']:>11.6f}"
kp = f"{d['kp']:>9.2f}"
f.write(f" {IR} {IS} {discharge}{elevation} {kp}\n")
id_sec += 1
id += 1
if has_ini:
files.append("0.INI")
return files
@timer
def _export_REP(self, study, repertory, files, qlog):
if qlog is not None:
qlog.put("Export REP file")
# Write header
with open(os.path.join(repertory, f"0.REP"), "w+") as f:
f.write("confirmation=non\n")
f.write("* This file is generate by PAMHYR, please don't modify\n")
for file in files:
EXT = file.split('.')[1]
f.write(f"{EXT} {file}\n")
f.write("* OUTPUT\n")
f.write(f"TRA 0.TRA\n")
f.write(f"BIN 0.BIN\n")
@timer
def export(self, study, repertory, qlog = None):
self._export_ST(study, repertory, qlog)
return True
##########
# MAGE 7 #
##########
class Mage7(Mage):
_type = "mage7"
def __init__(self, name):
super(Mage7, self).__init__(name)
self._type = "mage7"
@classmethod
def default_parameters(cls):
lst = super(Mage7, cls).default_parameters()
return lst
##########
# MAGE 8 #
##########
class Mage8(Mage):
_type = "mage8"
def __init__(self, name):
super(Mage8, self).__init__(name)
self._type = "mage8"
@classmethod
def default_parameters(cls):
lst = super(Mage8, cls).default_parameters()
# Insert new parameters at specific position
names = list(map(lambda t: t[0], lst))
i = names.index("mage_precision_reduction_factor_Q")
lst.insert(i+1, ("mage_precision_reduction_factor_r", "1"))
return lst
##########
# Export #
##########
@timer
def _export_PAR(self, study, repertory, qlog = None):
files = []
if qlog is not None:
qlog.put("Export PAR file")
with open(os.path.join(repertory, "0.PAR"), "w+") as f:
files.append("0.PAR")
f.write("* This file is generate by PAMHYR, please don't modify\n")
params = study.river.get_params(self.type).parameters
for p in params:
name = p.name\
.replace("all_", "")\
.replace("mage_", "")
value = p.value
f.write(f"{name} {value}\n")
return files
@timer
def _export_NET(self, study, repertory, qlog = None):
files = []
if qlog is not None:
qlog.put("Export NET file")
with open(os.path.join(repertory, "0.NET"), "w+") as f:
files.append("0.NET")
f.write("* This file is generate by PAMHYR, please don't modify\n")
edges = study.river.edges()
edges = list(
filter(
lambda e: e.is_enable(),
edges
)
)
for e in edges:
name = e.name.replace(" ", "_")
if e._name == "":
name = f"Reach_{e.id}"
id = f"Bief_{e.id+1}"
n1 = f"{e.node1.id:3}".replace(" ", "x")
n2 = f"{e.node2.id:3}".replace(" ", "x")
file = name + ".ST"
f.write(f"{id} {n1} {n2} {file}\n")
return files
@timer
def export(self, study, repertory, qlog = None):
files = []
self._export_ST(study, repertory, qlog)
files = files + self._export_PAR(study, repertory, qlog)
files = files + self._export_NET(study, repertory, qlog)
files = files + self._export_bound_cond(study, repertory, qlog)
files = files + self._export_RUG(study, repertory, qlog)
files = files + self._export_INI(study, repertory, qlog)
self._export_REP(study, repertory, files, qlog)
return True