# Results.py -- Pamhyr # Copyright (C) 2023-2025 INRAE # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import struct import logging import itertools import numpy as np from copy import deepcopy from datetime import datetime from Model.Scenario import Scenario from Model.Tools.PamhyrDB import SQLSubModel from Model.Results.River.River import River logger = logging.getLogger() class Results(SQLSubModel): _sub_classes = [River] def __init__(self, id=-1, study=None, solver=None, repertory="", name="0"): super(Results, self).__init__( id=id, status=study.status, owner_scenario=study.status.scenario.id ) self._name = name self._study = study self._river = River(self._study) self._solver = solver self._repertory = repertory self._meta_data = { # Keep results creation date "creation_date": datetime.now(), "study_revision": study.status.version, } if solver is not None: self.set("solver_type", solver._type) @property def date(self): date = self._meta_data["creation_date"] return f"{date.isoformat(sep=' ')}" @property def river(self): return self._river @property def study(self): return self._study def set(self, key, value): self._meta_data[key] = value @property def is_valid(self): return ("timestamps" in self._meta_data) @property def solver_name(self): if self._solver is None: return self._meta_data["solver_name"] return self._solver.name def get(self, key): return self._meta_data[key] def reload(self): return self._solver.results( self._study, self._repertory, qlog=None, ) def timestamps_to_struct(self): ts = self._meta_data["timestamps"] sf = ">" + ''.join(itertools.repeat("d", len(ts))) return struct.pack(sf, ts) @classmethod def _db_create(cls, execute, ext=""): execute(f""" CREATE TABLE results{ext} ( {cls.create_db_add_pamhyr_id()}, solver_name TEXT NOT NULL, solver_type TEXT NOT NULL, study_revision INTEGER NOT NULL, creation_data DATE NOT NULL, nb_timestamps INTEGER NOT NULL, timestamps BLOB NOT NULL, {Scenario.create_db_add_scenario()}, {Scenario.create_db_add_scenario_fk()}, PRIMARY KEY(pamhyr_id, scenario) ) """) if ext != "": return True return cls._create_submodel(execute) @classmethod def _db_update(cls, execute, version, data=None): major, minor, release = version.strip().split(".") create = False if major == "0" and int(minor) < 2: cls._db_create(execute) create = True if major == "0" and int(minor) == 2: if int(release) < 1 and not create: cls._db_create(execute) create = True return cls._update_submodel(execute, version, data) @classmethod def _db_load(cls, execute, data=None): new = None study = data['study'] status = data['status'] scenario = data["scenario"] table = execute( "SELECT pamhyr_id, solver_name, solver_type, " + "study_revision, creation_data, nb_timestamps, timestamps, " + "scenario " + "FROM results " + f"WHERE scenario = {scenario.id}" ) if table is None: yield if len(table) > 1: logger.warning("Multiple results for this scenario") for v in table: it = iter(v) pid = next(it) solver_name = next(it) solver_type = next(it) revision = next(it) creation_date = next(it) nb_timestamps = next(it) timestamps_bytes = next(it) owner_scenario = next(it) new_results = cls(study=study) new_results.set("solver_name", solver_name) new_results.set("solver_type", solver_type) new_results.set("study_revision", revision) new_results.set("creation_date", creation_date) sf = ">" + ''.join(itertools.repeat("d", nb_timestamps)) ts = struct.unpack(sf, timestamps_bytes) new_results.set("timestamps", ts) data["timestamps"] = sorted(ts) new_results._river = River._db_load(execute, data) yield (solver_type, new_results) def _db_save_clear(self, execute, solver_type, data=None): old_pid = execute( "SELECT pamhyr_id FROM results " + f"WHERE scenario = {self._owner_scenario} " + f"AND solver_type = '{solver_type}'" ) if len(old_pid) != 0: for pid in old_pid: pid = pid[0] execute( "DELETE FROM results " + f"WHERE scenario = {self._owner_scenario} " + f"AND solver_type = '{solver_type}'" ) execute( "DELETE FROM results_data " + f"WHERE scenario = {self._owner_scenario} " + f"AND result = {pid}" ) def _db_save(self, execute, data=None): if self._status.scenario.id != self._owner_scenario: return pid = self._pamhyr_id if self._solver is None: solver_name = self.get("solver_name") solver_type = self.get("solver_type") else: solver_name = self._solver._name solver_type = self._solver._type self._db_save_clear(execute, solver_type, data=data) ts = sorted(self.get("timestamps")) sf = ">" + ''.join(itertools.repeat("d", len(ts))) execute( "INSERT INTO " + "results (pamhyr_id, solver_name, solver_type, " + "study_revision, creation_data, " + "nb_timestamps, timestamps, " + "scenario) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", self._pamhyr_id, solver_name, solver_type, self._status.version, self.get("creation_date"), len(ts), struct.pack(sf, *ts), self._owner_scenario ) data["result"] = self._pamhyr_id self._river._db_save(execute, data) return True