# Results.py -- Pamhyr # Copyright (C) 2023-2025 INRAE # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import struct import logging import itertools import numpy as np from copy import deepcopy from datetime import datetime from Model.Scenario import Scenario from Model.Tools.PamhyrDB import SQLSubModel from Model.Results.River.River import River logger = logging.getLogger() class AdditionalData(SQLSubModel): _sub_classes = [] def __init__(self, id=-1, study=None, data=None): super(AdditionalData, self).__init__( id=id, status=study.status, owner_scenario=study.status.scenario.id ) self._study = study self._data = data @property def data(self): return self._data @classmethod def _db_create(cls, execute, ext=""): execute(f""" CREATE TABLE results_add_data{ext} ( {cls.create_db_add_pamhyr_id()}, result INTEGER NOT NULL, type_x TEXT NOT NULL, type_y TEXT NOT NULL, legend TEXT NOT NULL, unit TEXT NOT NULL, data_len INTEGER NOT NULL, x BLOB NOT NULL, y BLOB NOT NULL, {Scenario.create_db_add_scenario()}, {Scenario.create_db_add_scenario_fk()}, FOREIGN KEY(result) REFERENCES results(pamhyr_id), PRIMARY KEY(pamhyr_id, result, scenario) ) """) if ext != "": return True return cls._create_submodel(execute) @classmethod def _db_update(cls, execute, version, data=None): major, minor, release = version.strip().split(".") if major == "0" and int(minor) == 2 and int(release) <= 1: cls._db_create(execute) return cls._update_submodel(execute, version, data) @classmethod def _db_load(cls, execute, data=None): new = [] study = data['study'] status = data['status'] scenario = data["scenario"] table = execute( "SELECT pamhyr_id, type_x, type_y, " + "legend, unit, data_len, x, y, " + "scenario " + "FROM results_add_data " + f"WHERE scenario = {scenario.id}" ) if table is None: return new for v in table: it = iter(v) pid = next(it) type_x = next(it) type_y = next(it) legend = next(it) unit = next(it) data_len = next(it) bx = next(it) by = next(it) owner_scenario = next(it) data_format = ">" + ''.join(itertools.repeat("d", data_len)) x = struct.unpack(data_format, bx) y = struct.unpack(data_format, by) data = { 'type_x': type_x, 'type_y': type_y, 'legend': legend, 'unit': unit, 'x': x, 'y': y } new_data = cls(study=study) new_data._data = data new.append(new_data) return new def _db_save(self, execute, data=None): if self._status.scenario.id != self._owner_scenario: return pid = self._pamhyr_id data_len = len(self._data["x"]) data_format = ">" + ''.join(itertools.repeat("d", data_len)) bx = struct.pack(data_format, *self._data["x"]) by = struct.pack(data_format, *self._data["y"]) execute( "INSERT INTO " + "results_add_data (pamhyr_id, result, " + "type_x, type_y, " + "legend, unit, data_len, x, y, " + "scenario) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", self._pamhyr_id, data["result"], self._data["type_x"], self._data["type_y"], self._data["legend"], self._data["unit"], data_len, bx, by, self._owner_scenario ) return True class Results(SQLSubModel): _sub_classes = [River, AdditionalData] def __init__(self, id=-1, study=None, solver=None, repertory="", name="0"): super(Results, self).__init__( id=id, status=study.status, owner_scenario=study.status.scenario.id ) self._name = name self._study = study self._river = River(self._study) self._solver = solver self._repertory = repertory self._meta_data = { # Keep results creation date "creation_date": datetime.now(), "study_revision": study.status.version, "additional_data": [], } if solver is not None: self.set("solver_type", solver._type) @property def date(self): date = self._meta_data["creation_date"] return f"{date.isoformat(sep=' ')}" @property def river(self): return self._river @property def study(self): return self._study def set(self, key, value): self._meta_data[key] = value @property def is_valid(self): return ("timestamps" in self._meta_data) @property def solver_name(self): if self._solver is None: return self._meta_data["solver_name"] return self._solver.name def get(self, key): return self._meta_data[key] def reload(self): return self._solver.results( self._study, self._repertory, qlog=None, ) def bufferize(self, key): if self.is_valid: for reach in self._river._reachs: reach.bufferize(self._meta_data["timestamps"], key) def timestamps_to_struct(self): ts = self._meta_data["timestamps"] sf = ">" + ''.join(itertools.repeat("d", len(ts))) return struct.pack(sf, ts) @classmethod def _db_create(cls, execute, ext=""): execute(f""" CREATE TABLE results{ext} ( {cls.create_db_add_pamhyr_id()}, solver_name TEXT NOT NULL, solver_type TEXT NOT NULL, study_revision INTEGER NOT NULL, creation_data DATE NOT NULL, nb_timestamps INTEGER NOT NULL, timestamps BLOB NOT NULL, {Scenario.create_db_add_scenario()}, {Scenario.create_db_add_scenario_fk()}, PRIMARY KEY(pamhyr_id, scenario) ) """) if ext != "": return True return cls._create_submodel(execute) @classmethod def _db_update(cls, execute, version, data=None): major, minor, release = version.strip().split(".") create = False if major == "0" and int(minor) < 2: cls._db_create(execute) create = True if major == "0" and int(minor) == 2: if int(release) < 1 and not create: cls._db_create(execute) create = True return cls._update_submodel(execute, version, data) @classmethod def _db_load(cls, execute, data=None): new = None study = data['study'] status = data['status'] scenario = data["scenario"] table = execute( "SELECT pamhyr_id, solver_name, solver_type, " + "study_revision, creation_data, nb_timestamps, timestamps, " + "scenario " + "FROM results " + f"WHERE scenario = {scenario.id}" ) if table is None: yield if len(table) > 1: logger.warning("Multiple results for this scenario") for v in table: it = iter(v) pid = next(it) solver_name = next(it) solver_type = next(it) revision = next(it) creation_date = next(it) nb_timestamps = next(it) timestamps_bytes = next(it) owner_scenario = next(it) new_results = cls(study=study) new_results.set("solver_name", solver_name) new_results.set("solver_type", solver_type) new_results.set("study_revision", revision) new_results.set("creation_date", creation_date) sf = ">" + ''.join(itertools.repeat("d", nb_timestamps)) ts = struct.unpack(sf, timestamps_bytes) new_results.set("timestamps", ts) data["timestamps"] = sorted(ts) new_results._river = River._db_load(execute, data) new_results.set( "additional_data", AdditionalData._db_load(execute, data) ) yield (solver_type, new_results) def _db_save_clear(self, execute, solver_type, data=None): old_pid = execute( "SELECT pamhyr_id FROM results " + f"WHERE scenario = {self._owner_scenario} " + f"AND solver_type = '{solver_type}'" ) if len(old_pid) != 0: for pid in old_pid: pid = pid[0] execute( "DELETE FROM results " + f"WHERE scenario = {self._owner_scenario} " + f"AND solver_type = '{solver_type}'" ) execute( "DELETE FROM results_data " + f"WHERE scenario = {self._owner_scenario} " + f"AND result = {pid}" ) execute( "DELETE FROM results_add_data " + f"WHERE scenario = {self._owner_scenario} " + f"AND result = {pid}" ) def _db_save(self, execute, data=None): if self._status.scenario.id != self._owner_scenario: return True pid = self._pamhyr_id if self._solver is None: solver_name = self.get("solver_name") solver_type = self.get("solver_type") else: solver_name = self._solver._name solver_type = self._solver._type self._db_save_clear(execute, solver_type, data=data) ts = sorted(self.get("timestamps")) sf = ">" + ''.join(itertools.repeat("d", len(ts))) execute( "INSERT INTO " + "results (pamhyr_id, solver_name, solver_type, " + "study_revision, creation_data, " + "nb_timestamps, timestamps, " + "scenario) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", self._pamhyr_id, solver_name, solver_type, self._status.version, self.get("creation_date"), len(ts), struct.pack(sf, *ts), self._owner_scenario ) data["result"] = self._pamhyr_id self._river._db_save(execute, data) for add_data in self.get("additional_data"): add_data._db_save(execute, data) return True