diff --git a/src/Model/GeoTIFF/GeoTIFF.py b/src/Model/GeoTIFF/GeoTIFF.py new file mode 100644 index 00000000..4bd17632 --- /dev/null +++ b/src/Model/GeoTIFF/GeoTIFF.py @@ -0,0 +1,298 @@ +# GeoTIFF.py -- Pamhyr +# Copyright (C) 2024-2025 INRAE +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +# -*- coding: utf-8 -*- + +import os +import struct +import logging + +from functools import reduce + +from tools import trace, timer + +from Model.Tools.PamhyrDB import SQLSubModel +from Model.Except import NotImplementedMethodeError +from Model.Scenario import Scenario + +try: + import rasterio + import rasterio.control + import rasterio.crs + import rasterio.sample + import rasterio.vrt + import rasterio._features + _rasterio_loaded = True +except Exception as e: + print(f"Module 'rasterio' is not available: {e}") + _rasterio_loaded = False + + +class GeoTIFF(SQLSubModel): + _sub_classes = [] + + def __init__(self, id: int = -1, enabled=True, + name="", description="", + path="", coordinates=None, + status=None, owner_scenario=-1): + super(GeoTIFF, self).__init__( + id=id, status=status, + owner_scenario=owner_scenario + ) + + self._enabled = enabled + self._name = f"GeoTIFF #{self._pamhyr_id}" if name == "" else name + self._description = text + + self._file_bytes = b'' + self._coordinates = coordinates + + if path != "": + self.read_file(path) + + self._memfile = None + + def __getitem__(self, key): + value = None + + if key == "enabled": + value = self._enabled + elif key == "name": + value = self.name + elif key == "description": + value = self.description + elif key == "file_name": + value = self.file_name + elif key == "coordinates": + value = self.coordinates + elif key == "memfile": + value = self.memfile + + return value + + def __setitem__(self, key, value): + if key == "enabled": + self.enabled = value + elif key == "name": + self.name = value + elif key == "description": + self.description = value + elif key == "file_name": + self.file_name = value + elif key == "coordinates": + self.coordinates = value + + self.modified() + + @property + def enabled(self): + return self._enabled + + @enabled.setter + def enabled(self, enabled): + self._enabled = enabled + self.modified() + + def is_enabled(self): + return self._enabled + + @property + def name(self): + return self._name + + @name.setter + def name(self, name): + self._name = name + self.modified() + + @property + def description(self): + return self._description + + @description.setter + def description(self, description): + self._description = description + self.modified() + + @property + def file_name(self): + return self._file_name + + @file_name.setter + def file_name(self, file_name): + self._file_name = file_name + self.modified() + + @property + def coordinates(self): + return self._coordinates + + @coordinates.setter + def coordinates(self, coordinates): + self._coordinates = coordinates + self.modified() + + @property + def memfile(self): + if not _rasterio_loaded: + return None + + if self._file_bytes == b'': + return None + + if self._memfile == None: + self._memfile = MemoryFile() + self._memfile.write(self._file_bytes) + + return self._memfile + + def read_file(self, path): + self._file_name = path + self._file_bytes = b'' + self._memfile = None + + with open(path, "rb") as f: + while True: + data = f.read(4096) + if not data: + break + self._file_bytes += data + + def write_file(self, path): + with open(path, "w+b") as f: + f.write(self._file_bytes) + + @classmethod + def _db_create(cls, execute, ext=""): + execute(f""" + CREATE TABLE geotiff{ext} ( + {cls.create_db_add_pamhyr_id()}, + enabled BOOLEAN NOT NULL, + deleted BOOLEAN NOT NULL DEFAULT FALSE, + name TEXT NOT NULL, + description TEXT NOT NULL, + file_name TEXT NOT NULL, + file_bytes BLOB NOT NULL, + coordinates_bottom REAL NOT NULL, + coordinates_top REAL NOT NULL, + coordinates_left REAL NOT NULL, + coordinates_right REAL NOT NULL, + {Scenario.create_db_add_scenario()}, + {Scenario.create_db_add_scenario_fk()}, + PRIMARY KEY(pamhyr_id, scenario) + ) + """) + + return cls._create_submodel(execute) + + @classmethod + def _db_update(cls, execute, version, data=None): + major, minor, release = version.strip().split(".") + + if major == "0" and int(minor) <= 2: + if int(release) < 3: + cls._create_submodel(execute) + + return True + + @classmethod + def _db_load(cls, execute, data=None): + new = [] + scenario = data["scenario"] + loaded = data['loaded_pid'] + + if scenario is None: + return new + + table = execute( + "SELECT pamhyr_id, enabled, deleted, " + + "name, description, file_name, file_bytes, " + + "coordinates_bottom, coordinates_top, " + + "coordinates_left, coordinates_right, " + + "scenario " + + "FROM geotiff " + + f"WHERE scenario = {scenario.id} " + + f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))})" + ) + + for row in table: + it = iter(row) + + id = next(it) + enabled = (next(it) == 1) + deleted = (next(it) == 1) + name = next(it) + description = next(it) + file_name = next(it) + file_bytes = next(it) + coordinates_bottom = next(it) + coordinates_top = next(it) + coordinates_left = next(it) + coordinates_right = next(it) + owner_scenario = next(it) + + f = cls( + id=id, enabled=enabled, name=name, + description=description, coordinates={ + "bottom": coordinates_bottom, + "top": coordinates_top, + "left": coordinates_left, + "right": coordinates_right, + }, + status=data['status'], + owner_scenario=owner_scenario + ) + if deleted: + f.set_as_deleted() + + f._file_bytes = file_bytes + + loaded.add(id) + new.append(f) + + data["scenario"] = scenario.parent + new += cls._db_load(execute, data) + data["scenario"] = scenario + + return new + + def _db_save(self, execute, data=None): + if not self.must_be_saved(): + return True + + execute( + "INSERT INTO geotiff (" + + "pamhyr_id, enabled, deleted, " + + "name, description, file_name, file_bytes, " + + "coordinates_bottom, coordinates_top, " + + "coordinates_left, coordinates_right, " + + "scenario) " + + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", + self._pamhyr_id, + self._enabled, + self.is_deleted(), + self.name, + self.description, + self.file_name, + self.file_bytes + self.coordinates['bottom'], + self.coordinates['top'], + self.coordinates['left'], + self.coordinates['right'], + self._status.scenario_id, + ) + + return True diff --git a/src/Model/GeoTIFF/GeoTIFFList.py b/src/Model/GeoTIFF/GeoTIFFList.py new file mode 100644 index 00000000..9e0fafb6 --- /dev/null +++ b/src/Model/GeoTIFF/GeoTIFFList.py @@ -0,0 +1,59 @@ +# GeoTIFFList.py -- Pamhyr +# Copyright (C) 2024-2025 INRAE +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +# -*- coding: utf-8 -*- + +from tools import trace, timer + +from Model.Except import NotImplementedMethodeError +from Model.Tools.PamhyrListExt import PamhyrModelList +from Model.AdditionalFile.GeoTIFF import GeoTIFF + + +class GeoTIFFList(PamhyrModelList): + _sub_classes = [GeoTIFF] + + @classmethod + def _db_load(cls, execute, data=None): + new = cls(status=data["status"]) + + new._lst = GeoTIFF._db_load(execute, data) + + return new + + def _db_save(self, execute, data=None): + ok = True + + # Delete previous data + execute( + "DELETE FROM geotiff " + + f"WHERE scenario = {self._status.scenario_id}" + ) + + for af in self._lst: + ok &= af._db_save(execute, data) + + return ok + + @property + def files(self): + return self.lst + + def new(self, index): + n = GeoTIFF(status=self._status) + self.insert(index, n) + self._status.modified() + return n