# Mage.py -- Pamhyr
# Copyright (C) 2023-2024  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import os
import logging
import tempfile

from ctypes import (
    cdll,
    byref, Structure,
    c_char_p, c_wchar_p,
    create_string_buffer,
    POINTER, c_void_p,
    c_int, c_double, c_bool
)

from PyQt5.QtCore import QProcess

from tools import logger_color_red, logger_color_reset, logger_exception
from Meshing.AMeshingTool import AMeshingTool

logger = logging.getLogger()


class MeshingWithMage(AMeshingTool):
    """Meshing tool driving the ``libbief`` shared library through ctypes.

    The library is loaded when the object is constructed; each exported
    ``c_*`` symbol used here is bound once with explicit ``argtypes`` and
    a ``None`` return type.
    """

    def __init__(self):
        super(MeshingWithMage, self).__init__()

        self._init_bief_lib()

    def _init_bief_lib(self):
        """Load the shared library and bind every C entry point used."""
        self._bief_lib = cdll.LoadLibrary(self._lib_path())

        self._init_c_init_bief_from_geo_file()
        self._init_c_get_nb_sections()
        self._init_c_get_nb_points_section()
        self._init_c_set_bief_name()
        self._init_c_st_to_m_compl()
        self._init_c_interpolate_profils_pas_transversal()
        self._init_c_purge()
        self._init_c_output_bief()

    @classmethod
    def _lib_path(cls):
        """Return the absolute path of ``libbief`` (``.so`` on POSIX,
        ``.dll`` otherwise), resolved relative to this source file."""
        ext = "so" if os.name == "posix" else "dll"

        return os.path.abspath(
            os.path.join(
                os.path.dirname(__file__),
                "..", "..", "..", "mage", f"libbief.{ext}"
            )
        )

    @classmethod
    def _path(cls):
        """Return the path of the backend used by this tool.

        This class only defines ``_lib_path``; the previous body called
        ``cls._exe_path()`` unconditionally. An inherited ``_exe_path`` is
        still honoured when one exists, otherwise the library path is
        returned instead of raising ``AttributeError``.
        """
        exe_path = getattr(cls, "_exe_path", None)
        if exe_path is not None:
            return exe_path()

        return cls._lib_path()

    def _bind(self, symbol, argtypes):
        """Look up ``symbol`` in the loaded library, declare its argument
        types, mark it as returning nothing, and return the function."""
        func = getattr(self._bief_lib, symbol)
        func.argtypes = argtypes
        func.restype = None
        return func

    def _init_c_init_bief_from_geo_file(self):
        """Bind ``c_init_bief_from_geo_file(char*, int*, int*)``."""
        self._c_init_bief_from_geo_file = self._bind(
            'c_init_bief_from_geo_file',
            [c_char_p, POINTER(c_int), POINTER(c_int)]
        )

    def _init_c_get_nb_sections(self):
        """Bind ``c_get_nb_sections(int*)``."""
        self._c_get_nb_sections = self._bind(
            'c_get_nb_sections',
            [POINTER(c_int)]
        )

    def _init_c_get_nb_points_section(self):
        """Bind ``c_get_nb_points_section(int*, int*)``."""
        self._c_get_nb_points_section = self._bind(
            'c_get_nb_points_section',
            [POINTER(c_int), POINTER(c_int)]
        )

    def _init_c_set_bief_name(self):
        """Bind ``c_set_bief_name(char*)``."""
        self._c_set_bief_name = self._bind(
            'c_set_bief_name',
            [c_char_p]
        )

    def _init_c_st_to_m_compl(self):
        """Bind ``c_st_to_m_compl(int*, char*, char*)``."""
        self._c_st_to_m_compl = self._bind(
            'c_st_to_m_compl',
            [POINTER(c_int), c_char_p, c_char_p]
        )

    def _init_c_interpolate_profils_pas_transversal(self):
        """Bind ``c_interpolate_profils_pas_transversal`` (two int*, two
        char*, double*, bool*, int*, bool*)."""
        self._c_interpolate_profils_pas_transversal = self._bind(
            'c_interpolate_profils_pas_transversal',
            [
                POINTER(c_int), POINTER(c_int),
                c_char_p, c_char_p,
                POINTER(c_double),
                POINTER(c_bool),
                POINTER(c_int),
                POINTER(c_bool)
            ]
        )

    def _init_c_purge(self):
        """Bind ``c_purge`` (no argument types declared)."""
        self._c_purge = self._bind('c_purge', None)

    def _init_c_output_bief(self):
        """Bind ``c_output_bief(char*)``."""
        self._c_output_bief = self._bind(
            'c_output_bief',
            [c_char_p]
        )

    #####################
    # Binding functions #
    #####################

    def init_bief_from_geo_file(self, name, with_charriage, with_water):
        """Call ``c_init_bief_from_geo_file`` with the encoded file name
        and both flags passed by reference as C ints."""
        cname = create_string_buffer(name.encode())
        self._c_init_bief_from_geo_file(
            cname,
            byref(c_int(with_charriage)),
            byref(c_int(with_water))
        )

    def get_nb_sections(self):
        """Return the value the library writes through
        ``c_get_nb_sections``."""
        nb_sections = c_int(0)
        self._c_get_nb_sections(byref(nb_sections))
        return nb_sections.value

    def get_nb_points_section(self, section):
        """Return the value the library writes through
        ``c_get_nb_points_section`` for the given section index."""
        nb_points = c_int(0)
        self._c_get_nb_points_section(
            byref(c_int(section)), byref(nb_points)
        )
        return nb_points.value

    def set_bief_name(self, name):
        """Call ``c_set_bief_name`` with the encoded name."""
        cname = create_string_buffer(name.encode())
        self._c_set_bief_name(cname)

    def st_to_m_compl(self, npoints, tag1=' ', tag2=' '):
        """Call ``c_st_to_m_compl`` with a point count (by reference) and
        two encoded tags."""
        cnpoints = c_int(npoints)
        ctag1 = create_string_buffer(tag1.encode())
        ctag2 = create_string_buffer(tag2.encode())

        self._c_st_to_m_compl(byref(cnpoints), ctag1, ctag2)

    def interpolate_profils_pas_transversal(self, limite1, limite2,
                                            directrice1, directrice2,
                                            pas, lplan=False,
                                            lm=3, lineaire=False):
        """Call ``c_interpolate_profils_pas_transversal``.

        Numeric and boolean arguments are wrapped in ctypes values and
        passed by reference; the two guide-line names are passed as
        encoded string buffers.
        """
        climite1 = c_int(limite1)
        climite2 = c_int(limite2)
        cpas = c_double(pas)
        clplan = c_bool(lplan)
        clm = c_int(lm)
        clineaire = c_bool(lineaire)
        cdirectrice1 = create_string_buffer(directrice1.encode())
        cdirectrice2 = create_string_buffer(directrice2.encode())

        self._c_interpolate_profils_pas_transversal(
            byref(climite1), byref(climite2),
            cdirectrice1, cdirectrice2,
            byref(cpas), byref(clplan),
            byref(clm), byref(clineaire)
        )

    def output_bief(self, name):
        """Call ``c_output_bief`` with the encoded output file name."""
        cname = create_string_buffer(name.encode())
        self._c_output_bief(cname)

    ###########
    # Meshing #
    ###########

    def meshing(self, reach, step: float = 50):
        """Mesh ``reach`` through the library and return it.

        The reach is exported to a temporary ST file, loaded by the
        library, completed along its guide lines, interpolated with the
        given ``step``, written to a ``.M`` file and re-imported into
        ``reach``. A ``None`` reach or a reach without profiles is
        returned untouched.
        """
        if reach is None or len(reach.profiles) == 0:
            return reach

        with tempfile.TemporaryDirectory() as tmp:
            st_file = self.export_reach_to_st(reach, tmp)
            m_file = st_file.rsplit(".ST", 1)[0] + ".M"

            self.load_st_file(st_file)

            ns, npts_max = self.get_reach_stat()

            gl = reach.compute_guidelines()

            # we make sure that the lines are in the left-to-right order
            guide_list = [
                x.name
                for x in reach.profiles[0].named_points()
                if x.name in gl[0]
            ]

            self.complete_cross_section(guide_list)
            self.interpolate_cross_section(ns, step)
            self.purge()

            self.export_to_m(m_file)

            # Must happen before the temporary directory is removed.
            self.import_m_file(reach, m_file)
            return reach

    def export_reach_to_st(self, reach, tmp):
        """Export ``reach`` to ``meshing.ST`` inside ``tmp`` and return
        the path of the written file."""
        tmp_st = os.path.join(tmp, "meshing.ST")

        logger.debug(f"meshing: Export ST to {tmp_st}")

        reach.export_reach(tmp_st)
        return tmp_st

    def load_st_file(self, st):
        """Load the ST file ``st`` in the library (both flags set to 0)
        and name the loaded data "tmp"."""
        self.init_bief_from_geo_file(st, 0, 0)
        self.set_bief_name("tmp")

    def get_reach_stat(self):
        """Return ``(ns, npts_max)``: the number of sections and the
        largest point count found over sections ``1..ns``.

        Section indices start at 1, so the last section is ``ns``
        itself. ``npts_max`` is 0 when there is no section.
        """
        ns = self.get_nb_sections()
        npts_max = max(
            (self.get_nb_points_section(i) for i in range(1, ns + 1)),
            default=0
        )

        return ns, npts_max

    def complete_cross_section(self, gl=None):
        """Call ``st_to_m_compl`` for every consecutive pair of the
        sequence "un", *gl..., "np".

        ``gl`` is an optional sequence of guide-line names; ``None`` is
        treated as empty.
        """
        names = [] if gl is None else list(gl)
        gl1 = ["un"] + names
        gl2 = names + ["np"]

        for first, second in zip(gl1, gl2):
            self.st_to_m_compl(0, first, second)

    def interpolate_cross_section(self, ns, step: float):
        """Interpolate from section 1 to section ``ns`` between the "un"
        and "np" guide lines with the given ``step``."""
        self.interpolate_profils_pas_transversal(
            1, ns,
            'un', 'np',
            step
        )

    def purge(self):
        """Call the library ``c_purge`` entry point."""
        self._c_purge()

    def export_to_m(self, m):
        """Ask the library to write its output to the file ``m``."""
        self.output_bief(m)

    def import_m_file(self, reach, m):
        """Replace the geometry of ``reach`` with the content of ``m``.

        The reach is only purged once the file is known to exist, so a
        failed export does not wipe the current geometry; a missing file
        is logged as an error.
        """
        if os.path.exists(m):
            reach.purge()

            logger.debug(f"meshing: Import geometry from {m}")
            reach.import_geometry(m)
        else:
            logger.error(f"meshing: Meshed geometry file {m} do not exists")


class MeshingWithMageMailleurTT(AMeshingTool):
    """Meshing tool running the ``mailleurTT`` executable in a
    ``QProcess`` on a temporary ST export of the reach."""

    def __init__(self):
        super(MeshingWithMageMailleurTT, self).__init__()

    @classmethod
    def _exe_path(cls):
        """Return the absolute path of ``mailleurTT`` (with ``.exe`` on
        non-POSIX systems), resolved relative to this source file."""
        ext = "" if os.name == "posix" else ".exe"

        return os.path.abspath(
            os.path.join(
                os.path.dirname(__file__),
                "..", "..", "mage8", f"mailleurTT{ext}"
            )
        )

    @classmethod
    def _path(cls):
        """Return the path of the executable used by this tool."""
        return cls._exe_path()

    ###########
    # Meshing #
    ###########

    def meshing(self, reach,
                step: float = 50,
                limites=(-1, -1),
                origin=0,
                directrices=('un', 'np'),
                lplan: bool = False,
                lm: int = 3,
                linear: bool = False,
                origin_value=0.0):
        """Run the "mesh" command of mailleurTT on ``reach``.

        ``limites`` and ``origin`` are 0-based here and shifted by one
        before being handed to the executable; ``lplan`` and ``linear``
        are sent as 0/1. The arguments are never modified in place.
        Returns ``reach`` (untouched when it is ``None`` or has no
        profile, or when the process exits with a non-zero code).
        """
        if reach is None or len(reach.profiles) == 0:
            return reach

        return self._run_mailleur(
            reach, "mesh", step, limites, origin, directrices,
            1 if lplan else 0, lm, linear, origin_value
        )

    def update_rk(self, reach,
                  step: float = 50,
                  limites=(-1, -1),
                  origin=0,
                  directrices=('un', 'np'),
                  lplan: bool = False,
                  lm: int = 3,
                  linear: bool = False,
                  origin_value=0.0,
                  orientation=0):
        """Run the "update_kp" command of mailleurTT on ``reach``.

        Same conventions as :meth:`meshing`, except that ``orientation``
        is sent in the argument slot used for ``lplan`` by "mesh";
        ``lplan`` itself is accepted for signature compatibility and not
        forwarded. Returns ``reach``.
        """
        if reach is None or len(reach.profiles) == 0:
            return reach

        return self._run_mailleur(
            reach, "update_kp", step, limites, origin, directrices,
            orientation, lm, linear, origin_value,
            sync=True
        )

    def _run_mailleur(self, reach, command, step, limites, origin,
                      directrices, mode, lm, linear, origin_value,
                      sync=False):
        """Export ``reach``, run mailleurTT with ``command`` and import
        the resulting ``.M`` file back into ``reach``.

        ``mode`` is the value placed right after the two guide-line
        names on the command line. When ``sync`` is true, ``os.sync()``
        is called after the export if the platform provides it.
        """
        with tempfile.TemporaryDirectory() as tmp:
            logger.debug(f"temp file: {tmp}")

            st_file = self.export_reach_to_st(reach, tmp)
            m_file = st_file.rsplit(".ST", 1)[0] + ".M"

            # os.sync() only exists on Unix.
            if sync and hasattr(os, "sync"):
                os.sync()

            proc = QProcess()
            proc.setWorkingDirectory(tmp)

            # Mage section indices start at 1. Work on local values so
            # that neither the defaults nor the caller's sequences are
            # altered.
            args = [
                str(arg) for arg in (
                    st_file, m_file,
                    command, step,
                    limites[0] + 1, limites[1] + 1,
                    directrices[0], directrices[1],
                    mode, lm, 1 if linear else 0,
                    origin + 1, origin_value
                )
            ]
            exe = self._exe_path()

            # Log exactly what is executed.
            logger.info(f"! {exe} " + " ".join(args))

            proc.start(exe, args)
            proc.waitForFinished()

            if proc.exitCode() != 0:
                logger.error(
                    f"{logger_color_red()}" +
                    f"MailleurTT error: {proc.exitCode()}" +
                    f"{logger_color_reset()}"
                )
                outputs = proc.readAllStandardOutput()
                logger.debug(outputs)
                errors = proc.readAllStandardError()
                logger.error(
                    f"{logger_color_red()}{errors}{logger_color_reset()}"
                )
                return reach

            # Must happen before the temporary directory is removed.
            self.import_m_file(reach, m_file)
            return reach

    def export_reach_to_st(self, reach, tmp):
        """Export ``reach`` to ``meshing.ST`` inside ``tmp`` and return
        the path of the written file."""
        tmp_st = os.path.join(tmp, "meshing.ST")

        logger.debug(f"meshing: Export ST to {tmp_st}")

        reach.export_reach(tmp_st)
        return tmp_st

    def import_m_file(self, reach, m):
        """Replace the geometry of ``reach`` with the content of ``m``
        when that file exists; otherwise log an error and leave the
        reach as it is."""
        if os.path.exists(m):
            reach.purge()

            logger.debug(f"meshing: Import geometry from {m}")
            reach.import_geometry(m)
        else:
            logger.error(f"meshing: Meshed geometry file {m} do not exists")