# RubarBE.py -- Pamhyr # Copyright (C) 2024-2025 INRAE # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # -*- coding: utf-8 -*- import os import logging import numpy as np from tools import timer, trace, old_pamhyr_date_to_timestamp, logger_exception from Solver.CommandLine import CommandLineSolver from Model.Results.Results import Results from Model.Results.River.River import River, Reach, Profile logger = logging.getLogger() class Rubar3(CommandLineSolver): _type = "rubar3" def __init__(self, name): super(Rubar3, self).__init__(name) self._type = "rubar3" self._cmd_input = "" self._cmd_solver = "@path @args @input" self._cmd_output = "" @classmethod def default_parameters(cls): # lst = super(Rubar3, cls).default_parameters() lst = [ ("rubarbe_cfl", "0.50000E+00"), ("rubarbe_condam", "1"), ("rubarbe_condav", "3"), ("rubarbe_regime", "0"), ("rubarbe_iodev", "n"), ("rubarbe_iodebord", ""), ("rubarbe_iostockage", ""), ("rubarbe_iopdt", "y"), ("rubarbe_iovis", "n"), ("rubarbe_rep", "n"), ("rubarbe_tinit", "000:00:00:00"), ("rubarbe_tmax", "999:99:99:00"), ("rubarbe_tiopdt", "000:00:00:00"), ("rubarbe_dt", "5.0"), ("rubarbe_ts", "999:99:99:00"), ("rubarbe_dtsauv", "00:00:00:05"), ("rubarbe_psave", "00:00:00:05"), ("rubarbe_fdeb1", "1"), ("rubarbe_fdeb2", "10"), ("rubarbe_fdeb3", "100"), ("rubarbe_tf_1", "y"), ("rubarbe_tf_2", "y"), ("rubarbe_tf_3", "y"), ("rubarbe_tf_4", "y"), ("rubarbe_tf_5", "y"), ("rubarbe_tf_6", "n"), ("rubarbe_trased", "n"), ("rubarbe_optfpc", "0"), # trased parameters ("rubarbe_ros", "2650.0"), ("rubarbe_dm", "0.1"), ("rubarbe_segma", "1.0"), # Sediment parameters ("rubarbe_sediment_ros", "2650.0"), ("rubarbe_sediment_por", "0.4"), ("rubarbe_sediment_dcharg", "0.0"), ("rubarbe_sediment_halfa", "1.0"), ("rubarbe_sediment_mult_1", "1.0"), ("rubarbe_sediment_mult_2", ""), ("rubarbe_sediment_mult_3", ""), ("rubarbe_sediment_mult_4", ""), ("rubarbe_sediment_mult_5", ""), ("rubarbe_sediment_visc", "0.047"), ("rubarbe_sediment_opts", "6"), ("rubarbe_sediment_odchar", "0"), ("rubarbe_sediment_unisol", "1"), ("rubarbe_sediment_typdef", "3"), ("rubarbe_sediment_depot", "2"), ("rubarbe_sediment_choixc", "2"), ("rubarbe_sediment_option", "2"), ("rubarbe_sediment_capsol", "1"), ("rubarbe_sediment_bmiu", "0.85"), ("rubarbe_sediment_demix", "0"), ("rubarbe_sediment_defond", "1"), ("rubarbe_sediment_varcons", "1"), ("rubarbe_sediment_dchard", "0.0"), ("rubarbe_sediment_dchars", "0.0"), ] return lst @classmethod def checkers(cls): lst = [ ] return lst ########## # Export # ########## def cmd_args(self, study): lst = super(Rubar3, self).cmd_args(study) return lst def input_param(self): name = self._study.name return f"{name}" def output_param(self): name = self._study.name return f"profil.{name}" def log_file(self): name = self._study.name return f"geomac-i.{name}" def export(self, study, repertory, qlog=None): self._study = study name = study.name.replace(" ", "_") self._export_donnee(study, repertory, qlog, name=name) self._export_ts(study, repertory, qlog, name=name) self._export_geomac_i(study, repertory, qlog, name=name) self._export_mail(study, repertory, qlog, name=name) self._export_devers(study, repertory, qlog, name=name) self._export_condin(study, repertory, qlog, name=name) self._export_stricklers(study, repertory, qlog, name=name) self._export_hydro(study, repertory, qlog, name=name) self._export_condav(study, repertory, qlog, name=name) # self._export_abshyd(study, repertory, qlog, name=name) return True def _export_donnee(self, study, repertory, qlog, name="0"): if qlog is not None: qlog.put("Export DONNEE file") with open( os.path.join( repertory, f"donnee.{name}" ), "w+" ) as f: params = filter( lambda p: "rubarbe_sediment_" not in p.name, study.river.get_params(self._type).parameters ) it = iter(params) line = 0 while line < 27: param = next(it) name = param.name value = param.value if "all_" in name: continue if value != "": # Value format if value.count(':') == 3: value = old_pamhyr_date_to_timestamp(value) value = f"{value:>12.5e}".upper() if value.count('.') == 1: value = f"{float(value):>12.5e}".upper() if value == "y" or value == "n": value = "O" if value == "y" else "N" # Write value f.write(f"{name:<50}{value}") # Add values of 'rubarbe_iodebord' and # 'rubarbe_iostockage' if name == "rubarbe_iodev": v2 = next(it).value v3 = next(it).value # f.write(f"{v2}{v3}") # New line f.write(f"\n") line += 1 def _export_ts(self, study, repertory, qlog, name="0"): if qlog is not None: qlog.put("Export TS file") with open( os.path.join( repertory, f"ts.{name}" ), "w+" ) as f: def float_format(string): if "." in string: return f"{float(string):>10.0f}" return "" params = filter( lambda p: "rubarbe_sediment_" in p.name, study.river.get_params(self.type).parameters ) it = iter(params) line = 0 while line < 20: param = next(it) name = param.name value = param.value if value != "": # Value format if value.count('.') == 1: value = f"{float_format(value)}" else: value = f"{value:>10}" # Write value f.write(f"{name:<50}{value}") # Add values of 'rubarbe_iodebord' and # 'rubarbe_iostockage' if name == "rubarbe_sediment_mult_1": m2 = f"{float_format(next(it).value)}" m3 = f"{float_format(next(it).value)}" m4 = f"{float_format(next(it).value)}" m5 = f"{float_format(next(it).value)}" f.write(f"{m2}{m3}{m4}{m5}") # New line f.write(f"\n") line += 1 def _export_geomac_i(self, study, repertory, qlog, name="0"): if qlog is not None: qlog.put("Export GEOMAC-i file") with open( os.path.join( repertory, f"geomac-i.{name}" ), "w+" ) as f: for edge in study.river.enable_edges(): reach = edge.reach n_profiles = len(reach) time = 0.0 f.write(f"{n_profiles:>5} {time:>11.3f}\n") ind = 1 for profile in reach.profiles: rk = profile.rk n_points = len(profile) f.write(f"{ind:>4} {rk:>11.3f} {n_points:>4}\n") station = profile.get_station() for i, point in enumerate(profile.points): label = point.name.lower() if label != "": if label[0] == "r": label = label[1].upper() else: label = " " else: label = " " y = station[i] z = point.z dcs = 1.0 scs = 1.0 tmcs = 0.0 f.write( f"{label} {y:>11.5f}" + f"{z:>13.5f}{dcs:>15.10f}" + f"{scs:>15.10f}{tmcs:>15.5f}" + "\n" ) ind += 1 def _export_mail(self, study, repertory, qlog, name="0"): if qlog is not None: qlog.put("Export MAIL file") with open( os.path.join( repertory, f"mail.{name}" ), "w+" ) as f: for edge in study.river.enable_edges(): reach = edge.reach lm = len(reach) + 1 f.write(f"{lm:>13}\n") for mails in [reach.inter_profiles_rk(), reach.get_rk()]: ind = 0 for mail in mails: f.write(f"{mail:15.3f}") ind += 1 if ind % 3 == 0: f.write("\n") if ind % 3 != 0: f.write("\n") def _export_abshyd(self, study, repertory, qlog, name="0"): if qlog is not None: qlog.put("Export ABSHYD file") with open( os.path.join( repertory, f"abshyd.{name}" ), "w+" ) as f: reach_ind = 1 for edge in study.river.enable_edges(): reach = edge.reach lm = len(reach) + 1 f.write(f"{lm:>13}\n") ind = 1 for mail in reach.inter_profiles_rk(): f.write(f"{ind:>4} {mail:15.3f} {reach_ind:>4}\n") ind += 1 reach_ind += 1 def _export_devers(self, study, repertory, qlog, name="0"): if qlog is not None: qlog.put("Export DEVERS file") with open( os.path.join( repertory, f"devers.{name}" ), "w+" ) as f: reach_ind = 1 for edge in study.river.enable_edges(): reach = edge.reach lm = len(reach) + 1 f.write(f"{lm:>13}\n") ind = 1 for mail in reach.inter_profiles_rk(): f.write(f"{ind:>4} {0.0:15.3f} {0.0:15.3f} {1.0:>15.3f}\n") ind += 1 reach_ind += 1 def _export_stricklers(self, study, repertory, qlog, name="0"): self._export_frot(study, repertory, qlog, name=name, version="") self._export_frot(study, repertory, qlog, name=name, version="2") def _export_frot(self, study, repertory, qlog, name="0", version=""): if qlog is not None: qlog.put(f"Export FROT{version} file") with open( os.path.join( repertory, f"frot{version}.{name}" ), "w+" ) as f: for edge in study.river.enable_edges(): reach = edge.reach lm = len(reach) + 1 f.write(f"{lm:>6}\n") lst = list(filter( lambda f: f.is_full_defined(), edge.frictions.frictions )) rk_min = 9999999.9 rk_max = -9999999.9 coeff_min = -1.0 coeff_max = -1.0 for s in lst: # TODO optimise ? if s.begin_rk > rk_max: rk_max = s.begin_rk coeff_max = s.begin_strickler if s.begin_rk < rk_min: rk_min = s.begin_rk coeff_min = s.begin_strickler if s.end_rk > rk_max: rk_max = s.end_rk coeff_max = s.end_strickler if s.end_rk < rk_min: rk_min = s.end_rk coeff_min = s.end_strickler def get_stricklers_from_rk(rk, lst): coeff = None if rk > rk_max: coeff = coeff_max elif rk < rk_min: coeff = coeff_min else: for s in lst: print(s.begin_rk, s.end_rk) if (rk >= s.begin_rk and rk <= s.end_rk or rk <= s.begin_rk and rk >= s.end_rk): coeff = s.begin_strickler # TODO: inerpolate break # TODO interpolation if rk is not in frictons if coeff is None: logger.error( "Study frictions are not fully defined" ) return None return coeff.medium if version == "2" else coeff.minor ind = 1 for mail in edge.reach.inter_profiles_rk(): coef = get_stricklers_from_rk(mail, lst) if coef is not None: f.write(f"{ind:>6} {coef:>12.5f}") ind += 1 f.write("\n") def _export_condin(self, study, repertory, qlog, name="0"): if qlog is not None: qlog.put("Export CONDIN file") with open( os.path.join( repertory, f"condin.{name}" ), "w+" ) as f: for edge in study.river.enable_edges(): reach = edge.reach f.write(f"{0.0}\n") ics = study.river.initial_conditions.get(edge) data = self._export_condin_init_data(ics) profiles = reach.profiles first = profiles[0] last = profiles[-1] if first not in data or last not in data: print(data) logger.error( "Study initial condition is not fully defined" ) return f_h_s = self._export_condin_profile_height_speed(first, data) l_h_s = self._export_condin_profile_height_speed(last, data) # First mail f.write(f"{1:>5} {f_h_s[0]} {f_h_s[1]}\n") ind = 2 it = iter(profiles) prev = next(it) prev_h, prev_s = f_h_s for profile in it: if profile not in data: ind += 1 continue cur_h, cur_s = self._export_condin_profile_height_speed( profile, data ) # Mean of height and speed h = (prev_h + cur_h) / 2 s = (prev_s + cur_s) / 2 f.write(f"{ind:>5} {h} {s}\n") prev_h, prev_s = cur_h, cur_s ind += 1 # Last mail f.write(f"{ind:>5} {f_h_s[0]} {f_h_s[1]}\n") def _export_condin_init_data(self, ics): data = {} for d in ics.data: data[d['rk']] = ( d['elevation'], d['discharge'], ) return data def _export_condin_profile_height_speed(self, profile, data): z = data[profile][0] q = data[profile][1] height = z - profile.z_min() speed = profile.speed(q, z) return height, speed def _export_hydro(self, study, repertory, qlog, name="0"): if qlog is not None: qlog.put("Export HYDRO file") with open( os.path.join( repertory, f"hydro.{name}" ), "w+" ) as f: bcs = [] for edge in study.river.enable_edges(): for bound in study.river.boundary_condition.get_tab("liquid"): # BC is an hydrogramme if bound.bctype == "TD" or bound.bctype == "PC": # BC is on input node of this reach if bound.node == edge.node1: bcs.append(bound) for bc in bcs: f.write(f"{len(bc)}\n") for d0, d1 in bc.data: f.write(f"{d1} {d0}\n") def _export_condav(self, study, repertory, qlog, name="0"): if qlog is not None: qlog.put("Export CONDAV file") with open( os.path.join( repertory, f"condav.{name}" ), "w+" ) as f: bcs = [] for edge in study.river.enable_edges(): for bound in study.river.boundary_condition.get_tab("liquid"): # BC is an hydrogramme if bound.bctype == "ZD" or bound.bctype == "TZ": # BC is on input node of this reach if bound.node == edge.node2: bcs.append(bound) for bc in bcs: f.write(f"{len(bc)}\n") for d0, d1 in bc.data: f.write(f"{d1} {d0}\n") def read_profil(self, study, fname, results, qlog=None, name="0"): logger.info(f"read: profil.{name}") with open(fname, "r") as f: reachs = [] for i, edge in enumerate(study.river.enable_edges()): reach = edge.reach # Add results reach to reach list res_reach = results.river.add(i) reachs.append((res_reach, len(reach) + 1)) def read_data_line(f): line = f.readline().split() return tuple(map(float, line)) def set_and_compute_limites(reach, ind, z, q, s): reach.set(ind, timestamp, "Z", z) reach.set(ind, timestamp, "Q", q) reach.set(ind, timestamp, "V", s) profile = reach.profile(ind) limits = profile.geometry.get_water_limits(z) reach.set( ind, timestamp, "water_limits", limits ) ts = set() end = False while True: line = f.readline() if line == "": results.set("timestamps", ts) return timestamp = float(line) ts.add(timestamp) logger.info(f"read: profil.{name}: timestamp = {timestamp}") for reach, lm in reachs: # First profile h, s, q, z = read_data_line(f) set_and_compute_limites(reach, 0, z+h, q, s) # For each profile ind = 1 ph, ps, pq, pz = read_data_line(f) while ind < lm - 2: h, s, q, z = read_data_line(f) set_and_compute_limites( reach, ind, (ph + pz + ph + h) / 2, (pz + s) / 2, (pq + q) / 2 ) ph = h ps = s pq = q pz = z ind += 1 # Last profile h, s, q, z = read_data_line(f) set_and_compute_limites(reach, ind, z+h, q, s) @timer def results(self, study, repertory, qlog=None, name="0"): results = Results( study=study, solver=self, repertory=repertory, name=name, ) results_file = f"profil.{name}" fname = os.path.join(repertory, results_file) if not os.path.isfile(fname): logger.warning(f"Result file {results_file} does not exist") return None try: self.read_profil(study, fname, results, qlog, name=name) except Exception as e: logger.error(f"Failed to read results") logger_exception(e) return None return results class RubarBE(Rubar3): _type = "rubarbe" def __init__(self, name): super(RubarBE, self).__init__(name) self._type = "rubarbe" self._cmd_input = "" self._cmd_solver = "@path @args @input" self._cmd_output = ""