# PointXYZ.py -- Pamhyr # Copyright (C) 2023-2025 INRAE # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # -*- coding: utf-8 -*- import logging from math import dist import numpy as np from Model.Tools.PamhyrDB import SQLSubModel from Model.Scenario import Scenario from Model.Geometry.Point import Point logger = logging.getLogger() class PointXYZ(Point): _sub_classes = [] def __init__(self, id: int = -1, x: float = 0.0, y: float = 0.0, z: float = 0.0, name: str = "", profile=None, status=None, owner_scenario=-1): super(PointXYZ, self).__init__( id=id, name=name, profile=profile, status=status, owner_scenario=owner_scenario ) self._x = float(x) self._y = float(y) self._z = float(z) @classmethod def _db_create(cls, execute, ext=""): execute(f""" CREATE TABLE geometry_pointXYZ{ext} ( {cls.create_db_add_pamhyr_id()}, deleted BOOLEAN NOT NULL DEFAULT FALSE, ind INTEGER NOT NULL, name TEXT, x INTEGER NOT NULL, y INTEGER NOT NULL, z INTEGER NOT NULL, profile INTEGER NOT NULL, sl INTEGER, {Scenario.create_db_add_scenario()}, {Scenario.create_db_add_scenario_fk()}, FOREIGN KEY(profile) REFERENCES geometry_profileXYZ(pamhyr_id), FOREIGN KEY(sl) REFERENCES sedimentary_layer(pamhyr_id), PRIMARY KEY(pamhyr_id, scenario) ) """) return cls._create_submodel(execute) @classmethod def _db_update(cls, execute, version, data=None): major, minor, release = version.strip().split(".") if major == minor == "0": if int(release) < 2: execute( """ ALTER TABLE geometry_pointXYZ ADD COLUMN sl INTEGER REFERENCES sedimentary_layer(id) """ ) cls._db_update_to_0_2_0(execute, data) if major == "0" and minor == "1": if int(release) < 2: execute( "ALTER TABLE geometry_pointXYZ " + "ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE" ) return cls._update_submodel(execute, version, data) @classmethod def _db_update_to_0_2_0(cls, execute, data): table = "geometry_pointXYZ" id2pid = data['id2pid'] profiles = id2pid['geometry_profileXYZ'] cls.update_db_add_pamhyr_id(execute, table, data) Scenario.update_db_add_scenario(execute, table) cls._db_create(execute, ext="_tmp") execute( f"INSERT INTO {table}_tmp " + "(pamhyr_id, ind, name, x, y, z, profile, sl, scenario) " + "SELECT pamhyr_id, ind, name, x, y, z, profile, sl, scenario " + f"FROM {table}" ) execute(f"DROP TABLE {table}") execute(f"ALTER TABLE {table}_tmp RENAME TO {table}") cls._db_update_to_0_2_0_set_profile_pid(execute, table, profiles) if 'sedimentary_layer' in id2pid: sl = id2pid['sedimentary_layer'] cls._db_update_to_0_2_0_set_sl_pid(execute, table, sl) @classmethod def _db_update_to_0_2_0_set_profile_pid(cls, execute, table, profiles): els = execute( f"SELECT pamhyr_id, profile FROM {table}" ) for row in els: it = iter(row) pid = next(it) profile_id = next(it) if profile_id not in profiles: logger.warning( f"This point as no associated profile ({profile_id})" ) continue execute( f"UPDATE {table} " + f"SET profile = {profiles[profile_id]} " + f"WHERE pamhyr_id = {pid}" ) @classmethod def _db_update_to_0_2_0_set_sl_pid(cls, execute, table, sl): els = execute( f"SELECT pamhyr_id, sl FROM {table}" ) for row in els: it = iter(row) pid = next(it) sl_id = next(it) if sl_id == -1: continue execute( f"UPDATE {table} " + f"SET sl = {sl[sl_id]} " + f"WHERE pamhyr_id = {pid}" ) @classmethod def _db_load(cls, execute, data=None): new = [] status = data["status"] profile = data["profile"] scenario = data["scenario"] loaded = data['loaded_pid'] if scenario is None: return new table = execute( "SELECT pamhyr_id, ind, deleted, " + "name, x, y, z, sl, scenario " + "FROM geometry_pointXYZ " + f"WHERE profile = {profile.pamhyr_id} " + f"AND scenario = {scenario.id} " + f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) " + "ORDER BY ind ASC" ) for row in table: it = iter(row) pid = next(it) ind = next(it) deleted = (next(it) == 1) name = next(it) x = next(it) y = next(it) z = next(it) sl = next(it) owner_scenario = next(it) point = cls( id=pid, name=name, x=x, y=y, z=z, profile=profile, status=status, owner_scenario=owner_scenario ) if deleted: point.set_as_deleted() if sl == -1 or sl is None: point._sl = None else: point._sl = next( filter( lambda s: s.pamhyr_id == sl, data["sediment_layers_list"].sediment_layers ) ) loaded.add(pid) new.append((ind, point)) data["scenario"] = scenario.parent new += cls._db_load(execute, data) data["scenario"] = scenario return new def _db_save(self, execute, data=None): if not self.must_be_saved(): return True profile = data["profile"] ind = data["ind"] sl = self._sl.pamhyr_id if self._sl is not None else -1 execute( "INSERT INTO " + "geometry_pointXYZ(pamhyr_id, deleted, ind, name, " + "x, y, z, profile, sl, scenario) " + "VALUES (" + f"{self.pamhyr_id}, {self._db_format(self.is_deleted())}, " + f"{ind}, '{self._db_format(self._name)}', " + f"{self.x}, {self.y}, {self.z}, " + f"{profile.pamhyr_id}, {sl}, " + f"{self._status.scenario_id}" + ")" ) return True @classmethod def from_data(cls, header, data): point = None try: if len(header) == 0: point = cls( *data ) else: valid_header = {'name', 'x', 'y', 'z', 'profile'} d = {} for i, v in enumerate(data): h = header[i].strip().lower().split(' ')[0] if h in valid_header: d[h] = v point = cls(**d) except Exception as e: raise ClipboardFormatError(header, data) return point def copy(self): new_p = PointXYZ( id=-1, name=self.name, x=self.x, y=self.y, z=self.z, profile=self.profile, status=self._status ) if self.is_deleted(): new_p.set_as_deleted() new_p._sl = self.sl new_p.modified() return new_p def __repr__(self): return f"({self._x}, {self._y}, {self._z}, {self._name})" @property def x(self): return self._x @x.setter def x(self, value): self._x = float(value) self.modified() @property def y(self): return self._y @y.setter def y(self, value): self._y = float(value) self.modified() @property def z(self): return self._z @z.setter def z(self, value): self._z = float(value) self.modified() def get_coordinate(self): return (self._x, self._y, self._z) def is_nan(self): """ Returns: True if at least one coordinate is as np.nan """ return (np.isnan(self.x) or np.isnan(self.y) or np.isnan(self.z)) def dist(self, p2): return PointXYZ.distance(self, p2) def dist_2d(self, p2): return PointXYZ.distance_2d(self, p2) @staticmethod def distance_2d(p1, p2): """Euclidean distance between p1 and p2. Args: p1: A XYZ Point p2: A XYZ Point Returns: Euclidean 2D distance between the two points """ return dist((p1.x, p1.y), (p2.x, p2.y)) @staticmethod def distance(p1, p2): """Euclidean distance between p1 and p2. Args: p1: A XYZ Point p2: A XYZ Point Returns: Euclidean 3D distance between the two points """ return dist((p1.x, p1.y, p1.z), (p2.x, p2.y, p2.z)) @staticmethod def areatriangle3d(p1, p2, p3): a = PointXYZ.distance(p1, p2) b = PointXYZ.distance(p2, p3) c = PointXYZ.distance(p3, p1) s = float((a + b + c) / 2) res = ( s * abs(s - a) * abs(s - b) * abs(s - c) ) ** 0.5 return res