# HydraulicStructures.py -- Pamhyr
# Copyright (C) 2023-2025  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import logging
from functools import reduce

from tools import trace, timer, old_pamhyr_date_to_timestamp

from Model.Tools.PamhyrDB import SQLSubModel
from Model.Scenario import Scenario
from Model.Except import NotImplementedMethodeError

from Model.HydraulicStructures.Basic.HydraulicStructures import BasicHS
from Model.HydraulicStructures.Basic.Types import (
    NotDefined,
)

logger = logging.getLogger()


class HydraulicStructure(SQLSubModel):
    """Hydraulic structure stored in the `hydraulic_structures` table.

    A structure has a name, an enabled flag, an input/output reach, an
    input/output section, and an ordered list of basic hydraulic
    structures (`BasicHS`) kept in `self._data`.

    Basic structures can be soft-deleted: `self._data` keeps every
    element, while `lst` only exposes the ones whose `is_deleted()` is
    false.
    """

    _sub_classes = [
        BasicHS,
    ]

    def __init__(self, id: int = -1, name: str = "",
                 status=None, owner_scenario=-1):
        """Create an empty, enabled structure without reach or section.

        Args:
            id: Identifier forwarded to the `SQLSubModel` constructor.
            name: User-given name; an empty string makes `name` fall
                back to a generated label.
            status: Study status object forwarded to the base class.
            owner_scenario: Owning scenario forwarded to the base class.
        """
        super().__init__(
            id=id, status=status,
            owner_scenario=owner_scenario
        )

        self._name = name

        self._input_section = None
        self._output_section = None
        self._input_reach = None
        self._output_reach = None

        self._enabled = True
        # Every basic structure, soft-deleted ones included.
        self._data = []

    @classmethod
    def _db_create(cls, execute, ext=""):
        """Create the `hydraulic_structures{ext}` table.

        Args:
            execute: Callable running one SQL statement.
            ext: Suffix appended to the table name. With `"_tmp"` only
                the table is created and sub-models are left alone
                (this is how the 0.2.0 migration builds its temporary
                table).

        Returns:
            True for the `"_tmp"` table, otherwise the result of
            `_create_submodel`.
        """
        execute(f"""
          CREATE TABLE hydraulic_structures{ext} (
            {cls.create_db_add_pamhyr_id()},
            deleted BOOLEAN NOT NULL DEFAULT FALSE,
            name TEXT NOT NULL,
            enabled BOOLEAN NOT NULL,
            input_reach INTEGER,
            output_reach INTEGER,
            input_section INTEGER,
            output_section INTEGER,
            {Scenario.create_db_add_scenario()},
            {Scenario.create_db_add_scenario_fk()},
            FOREIGN KEY(input_reach) REFERENCES river_reach(pamhyr_id),
            FOREIGN KEY(output_reach) REFERENCES river_reach(pamhyr_id),
            FOREIGN KEY(input_section)
              REFERENCES geometry_profileXYZ(pamhyr_id),
            FOREIGN KEY(output_section)
              REFERENCES geometry_profileXYZ(pamhyr_id),
            PRIMARY KEY(pamhyr_id, scenario)
          )
        """)

        if ext == "_tmp":
            return True

        return cls._create_submodel(execute)

    @classmethod
    def _db_update(cls, execute, version, data=None):
        """Migrate the table from database `version` to the current one.

        Args:
            execute: Callable running one SQL statement.
            version: Version string of the form "major.minor.release".
            data: Migration context forwarded to the helpers.

        Returns:
            True when the table had to be created from scratch,
            otherwise the result of `_update_submodel`.
        """
        major, minor, release = version.strip().split(".")
        rl = int(release)

        if major == minor == "0":
            if rl < 6:
                # Table is created directly in its current layout;
                # nothing else to migrate.
                cls._db_create(execute)
                return True

            if rl < 11:
                # Columns were named *_kp before being renamed *_rk.
                for v in ["input", "output"]:
                    execute(
                        f"""
                        ALTER TABLE hydraulic_structures
                        RENAME COLUMN {v}_kp TO {v}_rk
                        """
                    )

            cls._db_update_to_0_2_0(execute, data)

        if major == "0" and minor == "1":
            if rl < 1:
                cls._db_update_to_0_1_1(execute, data)

            if rl < 2:
                execute(
                    "ALTER TABLE hydraulic_structures " +
                    "ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE"
                )

        return cls._update_submodel(execute, version, data)

    @classmethod
    def _db_update_to_0_2_0(cls, execute, data):
        """Migrate a 0.0.x table to the 0.2.0 layout.

        Adds the pamhyr id, rewrites reach references to pamhyr ids,
        adds the section columns and the scenario column, then rebuilds
        the table through a temporary copy so that it gets the schema
        produced by `_db_create`.

        Args:
            execute: Callable running one SQL statement.
            data: Migration context; `data['id2pid']['river_reach']`
                is indexed by the old reach id to get its pamhyr id.
        """
        table = "hydraulic_structures"
        reachs = data['id2pid']['river_reach']

        cls.update_db_add_pamhyr_id(execute, table, data)
        cls._db_update_to_0_2_0_set_reach_pid(execute, table, reachs)
        cls._db_update_to_0_1_1(
            execute, data,
            origin_version="0.0.*"
        )
        Scenario.update_db_add_scenario(execute, table)

        # Copy into a freshly created table, then swap it in.
        cls._db_create(execute, ext="_tmp")

        execute(
            f"INSERT INTO {table}_tmp " +
            "(pamhyr_id, name, enabled, input_section, output_section, " +
            "input_reach, output_reach, scenario) " +
            "SELECT pamhyr_id, name, enabled, " +
            "input_section, output_section, " +
            "input_reach, output_reach, scenario " +
            f"FROM {table}"
        )

        execute(f"DROP TABLE {table}")
        execute(f"ALTER TABLE {table}_tmp RENAME TO {table}")

    @classmethod
    def _db_update_to_0_2_0_set_reach_pid(cls, execute, table, reachs):
        """Replace old reach ids by pamhyr ids in every row of `table`.

        Rows whose input reach is -1 are left untouched. A row whose
        output reach is -1 gets its input reach as output reach.

        Args:
            execute: Callable running one SQL statement.
            table: Name of the table to update.
            reachs: Mapping indexed by old reach id, giving the reach
                pamhyr id.
        """
        els = execute(
            f"SELECT pamhyr_id, input_reach, output_reach FROM {table}"
        )

        for row in els:
            it = iter(row)
            pid = next(it)
            in_reach_id = next(it)
            out_reach_id = next(it)

            if in_reach_id == -1:
                # Skip only this row: returning here would leave every
                # remaining structure with its old reach ids.
                continue

            if out_reach_id == -1:
                out_reach_id = in_reach_id

            execute(
                f"UPDATE {table} " +
                f"SET input_reach = {reachs[in_reach_id]}, " +
                f"output_reach = {reachs[out_reach_id]} " +
                f"WHERE pamhyr_id = {pid}"
            )

    @classmethod
    def _db_update_to_0_1_1(cls, execute, data, origin_version="0.1.0"):
        """Add the input/output section columns and fill them.

        For each side, the `{side}_section` column is added, then
        `_db_update_to_0_1_1_assoc_section_from_rk` is called with the
        matching reach, rk and section column names (helper not defined
        in this file; presumably it derives the section from reach and
        rk -- confirm in the base class).

        Args:
            execute: Callable running one SQL statement.
            data: Migration context (not used directly here).
            origin_version: Version the database is migrated from,
                forwarded to the association helper.
        """
        for v in ["input", "output"]:
            execute(
                "ALTER TABLE hydraulic_structures " +
                f"ADD COLUMN {v}_section INTEGER"
            )

            cls._db_update_to_0_1_1_assoc_section_from_rk(
                execute, "hydraulic_structures",
                reach_column=f"{v}_reach",
                rk_column=f"{v}_rk",
                section_column=f"{v}_section",
                origin_version=origin_version
            )

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load the structures of a scenario and of its ancestors.

        Rows of `data["scenario"]` are loaded first, then the method
        recurses on `scenario.parent`. Ids already present in
        `data['loaded_pid']` are excluded from the query, so a row
        found in a scenario hides the same id in its ancestors.

        Args:
            execute: Callable running one SQL statement and returning
                the resulting rows.
            data: Loading context; reads "scenario" and "loaded_pid",
                and is forwarded to `_db_load_new`. "scenario" is
                temporarily replaced during the recursion and restored
                before returning.

        Returns:
            List of loaded `HydraulicStructure`, empty when the
            scenario is None.
        """
        new = []
        scenario = data["scenario"]
        loaded = data['loaded_pid']

        if scenario is None:
            return []

        table = execute(
            "SELECT pamhyr_id, deleted, name, enabled, " +
            "input_section, output_section, " +
            "input_reach, output_reach, scenario " +
            "FROM hydraulic_structures " +
            f"WHERE scenario = {scenario.id} " +
            f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))})"
        )

        for row in table:
            new.append(cls._db_load_new(execute, data, row))

        data["scenario"] = scenario.parent
        new += cls._db_load(execute, data)
        data["scenario"] = scenario

        return new

    @staticmethod
    def _pick_by_pid(items, first_pid, second_pid):
        """Return the items whose `pamhyr_id` equal the two given ids.

        Args:
            items: Iterable of objects exposing `pamhyr_id`.
            first_pid: Id looked up for the first returned element.
            second_pid: Id looked up for the second returned element.

        Returns:
            Tuple `(first, second)`; an element is None when no item
            matches. If several items share an id, the last one wins.
        """
        first, second = None, None

        for item in items:
            if item.pamhyr_id == first_pid:
                first = item
            if item.pamhyr_id == second_pid:
                second = item

        return first, second

    @classmethod
    def _db_load_new(cls, execute, data, row):
        """Build one structure from a `hydraulic_structures` row.

        Args:
            execute: Callable running one SQL statement, forwarded to
                `BasicHS._db_load`.
            data: Loading context; reads "status", "loaded_pid" and
                "edges", and writes "hs_id" before loading the basic
                structures.
            row: Row in the column order selected by `_db_load`.

        Returns:
            The new `HydraulicStructure`, with its id added to
            `data['loaded_pid']`.
        """
        it = iter(row)

        status = data["status"]
        loaded = data['loaded_pid']

        hs_id = next(it)
        deleted = (next(it) == 1)
        name = next(it)
        enabled = (next(it) == 1)

        # A NULL section is mapped to -1 for the lookup below.
        input_section_id = next(it)
        if input_section_id is None:
            input_section_id = -1

        output_section_id = next(it)
        if output_section_id is None:
            output_section_id = -1

        input_reach_id = next(it)
        output_reach_id = next(it)
        owner_scenario = next(it)

        hs = cls(
            id=hs_id,
            name=name,
            status=status,
            owner_scenario=owner_scenario
        )
        if deleted:
            hs.set_as_deleted()

        hs.enabled = enabled

        hs.input_reach, hs.output_reach = cls._pick_by_pid(
            data["edges"], input_reach_id, output_reach_id
        )

        # Sections are searched in the profiles of both reaches.
        sections = []
        if hs.input_reach is not None:
            sections += hs.input_reach.reach.profiles

        if hs.output_reach is not None:
            sections += hs.output_reach.reach.profiles

        hs.input_section, hs.output_section = cls._pick_by_pid(
            sections, input_section_id, output_section_id
        )

        loaded.add(hs_id)

        # BasicHS._db_load is given the parent structure id through
        # the shared context.
        data['hs_id'] = hs_id
        hs._data = BasicHS._db_load(execute, data)

        return hs

    def _db_save(self, execute, data=None):
        """Write this structure and its basic structures to the DB.

        The row for (pamhyr_id, current scenario) is deleted and
        inserted again; the `hydraulic_structures_basic` rows of this
        structure for the current scenario are deleted, then every
        element of `self._data` saves itself.

        Args:
            execute: Callable running one SQL statement.
            data: Saving context; "hs_id" is set to this structure's
                pamhyr id before saving the basic structures.
                NOTE(review): the default None fails on that
                assignment -- callers apparently always pass a dict.

        Returns:
            True.
        """
        execute(
            "DELETE FROM hydraulic_structures " +
            f"WHERE pamhyr_id = {self.pamhyr_id} " +
            f"AND scenario = {self._status.scenario_id}"
        )

        # Missing reaches are stored as -1, missing sections as NULL.
        input_reach_id = -1
        if self._input_reach is not None:
            input_reach_id = self._input_reach.pamhyr_id

        output_reach_id = -1
        if self._output_reach is not None:
            output_reach_id = self._output_reach.pamhyr_id

        input_section = 'NULL'
        if self.input_section is not None:
            input_section = self.input_section.pamhyr_id

        output_section = 'NULL'
        if self.output_section is not None:
            output_section = self.output_section.pamhyr_id

        execute(
            "INSERT INTO " +
            "hydraulic_structures(" +
            "  pamhyr_id, deleted, name, enabled, " +
            "  input_section, output_section, " +
            "  input_reach, output_reach, " +
            "  scenario" +
            ") " +
            "VALUES (" +
            f"{self.pamhyr_id}, " +
            f"{self._db_format(self.is_deleted())}, " +
            f"'{self._db_format(self._name)}', " +
            f"{self._db_format(self.enabled)}, " +
            f"{input_section}, {output_section}, " +
            f"{input_reach_id}, {output_reach_id}, " +
            f"{self._status.scenario_id}"
            ")"
        )

        data['hs_id'] = self.pamhyr_id
        execute(
            "DELETE FROM hydraulic_structures_basic " +
            f"WHERE hs = {self.pamhyr_id} " +
            f"AND scenario = {self._status.scenario_id}"
        )

        for basic in self._data:
            basic._db_save(execute, data)

        return True

    def __len__(self):
        """Return the number of non-deleted basic structures."""
        return len(self.lst)

    @property
    def lst(self):
        """New list of the basic structures that are not deleted."""
        return [bhs for bhs in self._data if not bhs.is_deleted()]

    @property
    def name(self):
        """Name of the structure, or "HS #<pamhyr_id>" when empty."""
        if self._name == "":
            return f"HS #{self.pamhyr_id}"

        return self._name

    @name.setter
    def name(self, name):
        self._name = name
        self.modified()

    @property
    def input_rk(self):
        """Input section, or None when unset.

        NOTE(review): despite its name this returns the section object
        itself, not a rk value -- confirm what callers expect.
        """
        if self._input_section is None:
            return None

        return self._input_section

    @property
    def output_rk(self):
        """Output section, or None when unset.

        NOTE(review): despite its name this returns the section object
        itself, not a rk value -- confirm what callers expect.
        """
        if self._output_section is None:
            return None

        return self._output_section

    @property
    def input_section(self):
        """Input section (None when unset)."""
        return self._input_section

    @input_section.setter
    def input_section(self, input_section):
        self._input_section = input_section
        self.modified()

    @property
    def output_section(self):
        """Output section (None when unset)."""
        return self._output_section

    @output_section.setter
    def output_section(self, output_section):
        self._output_section = output_section
        self.modified()

    @property
    def enabled(self):
        """Whether the structure is enabled."""
        return self._enabled

    @enabled.setter
    def enabled(self, enabled):
        self._enabled = enabled
        self.modified()

    @property
    def input_reach(self):
        """Input reach (None when unset)."""
        return self._input_reach

    @input_reach.setter
    def input_reach(self, input_reach):
        self._input_reach = input_reach
        self.modified()

    @property
    def output_reach(self):
        """Output reach (None when unset)."""
        return self._output_reach

    @output_reach.setter
    def output_reach(self, output_reach):
        self._output_reach = output_reach
        self.modified()

    @property
    def basic_structures(self):
        """New list of the non-deleted basic structures."""
        # `lst` already builds a fresh list on every access.
        return self.lst

    def basic_structure(self, index: int):
        """Return the non-deleted basic structure at `index`."""
        return self.lst[index]

    def add(self, index: int):
        """Insert a new `NotDefined` basic structure and return it.

        `index` is a position in the full list (`self._data`),
        soft-deleted elements included.
        """
        value = NotDefined(status=self._status)
        self._data.insert(index, value)
        self.modified()
        return value

    def insert(self, index: int, value: BasicHS):
        """Insert `value` at `index`, or restore it if already stored.

        When `value` is already in `self._data` it is only marked as
        not deleted and its position is left unchanged.
        """
        if value in self._data:
            value.set_as_not_deleted()
        else:
            self._data.insert(index, value)

        self.modified()

    def hard_delete_i(self, indexes):
        """Remove the elements at `indexes` from the full list.

        Indexes refer to `self._data` (soft-deleted elements included),
        unlike `delete_i`.
        """
        self._data = [
            el for i, el in enumerate(self._data)
            if i not in indexes
        ]
        self.modified()

    def delete_i(self, indexes):
        """Mark as deleted the elements at `indexes`.

        Indexes refer to `lst`, i.e. to the non-deleted elements only.
        """
        for i, el in enumerate(self.lst):
            if i in indexes:
                el.set_as_deleted()

        self.modified()

    def delete(self, els):
        """Mark every element of `els` as deleted."""
        for el in els:
            el.set_as_deleted()

        self.modified()

    def sort(self, _reverse=False, key=None):
        """Sort the full list of basic structures in place.

        Args:
            _reverse: Sort in descending order when true.
            key: Optional key function; None compares the elements
                directly.
        """
        # list.sort treats key=None as "no key function".
        self._data.sort(reverse=_reverse, key=key)
        self.modified()