Pamhyr2/src/Model/Results/River/River.py

415 lines
12 KiB
Python

# River.py -- Pamhyr
# Copyright (C) 2023-2025 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import struct
import logging
import itertools
from tools import flatten
from functools import reduce
from datetime import datetime
from Model.Scenario import Scenario
from Model.Tools.PamhyrDB import SQLSubModel
# Module-wide logger; getLogger() is called without a name, so this is
# the root logger.
logger = logging.getLogger()
class Profile(SQLSubModel):
    """Simulation results attached to one geometry profile.

    Values are kept in a two-level dictionary indexed first by timestamp
    and then by result key: ``{<ts>: {<key>: <value>, ...}, ...}``.

    Only the keys "Z", "Q", "V" and "sl" are written to and read from the
    ``results_data`` table. Any other key (for example "water_limits",
    which is derived from "Z") is kept in memory only.
    """

    def __init__(self, profile, study):
        super(Profile, self).__init__(
            id=-1, status=study.status,
            owner_scenario=study.status.scenario.id
        )

        self._study = study
        self._profile = profile  # Source profile in the study
        self._data = {}  # Dict of dict {<ts>: {<key>: <value>, ...}, ...}

    def __len__(self):
        """Return the number of timestamps holding data."""
        return len(self._data)

    @property
    def name(self):
        """Name of the source geometry profile."""
        return self._profile.name

    @property
    def rk(self):
        """``rk`` attribute of the source geometry profile."""
        return self._profile.rk

    @property
    def geometry(self):
        """The source geometry profile these results belong to."""
        return self._profile

    def set(self, timestamp, key, data):
        """Store ``data`` under ``key`` for ``timestamp``."""
        self._data.setdefault(timestamp, {})[key] = data

    def get_ts(self, timestamp):
        """Return the ``{key: value}`` dict of ``timestamp``.

        Raises:
            KeyError: if nothing is stored for ``timestamp``.
        """
        return self._data[timestamp]

    def get_key(self, key):
        """Return the values of ``key`` ordered by increasing timestamp.

        Raises:
            KeyError: if at least one timestamp has no value for ``key``.
        """
        return [self._data[ts][key] for ts in sorted(self._data)]

    def get_ts_key(self, timestamp, key):
        """Return the value of ``key`` at ``timestamp``, or None if unset."""
        return self._data.get(timestamp, {}).get(key)

    def has_sediment(self):
        """Return True if any timestamp holds an "sl" entry."""
        return any("sl" in entries for entries in self._data.values())

    @classmethod
    def _db_create(cls, execute, ext=""):
        """Create the ``results_data`` table (suffixed with ``ext``)."""
        execute(f"""
          CREATE TABLE results_data{ext}(
            {cls.create_db_add_pamhyr_id()},
            result INTEGER NOT NULL,
            key TEXT NOT NULL,
            reach INTEGER NOT NULL,
            section INTEGER NOT NULL,
            len_data INTEGER NOT NULL,
            data BLOB NOT NULL,
            {Scenario.create_db_add_scenario()},
            {Scenario.create_db_add_scenario_fk()},
            FOREIGN KEY(result) REFERENCES results(pamhyr_id),
            FOREIGN KEY(reach) REFERENCES river_reach(pamhyr_id),
            FOREIGN KEY(section)
              REFERENCES geometry_profileXYZ(pamhyr_id),
            PRIMARY KEY(pamhyr_id, result, key, scenario)
          )
        """)

        return True

    @classmethod
    def _db_update(cls, execute, version, data=None):
        """Create the table when upgrading from a version before 0.2.1.

        ``version`` is a "major.minor.release" string. The table is
        created for 0.x with x < 2, and for 0.2.r with r < 1.
        """
        major, minor, release = version.strip().split(".")

        if major == "0":
            minor_number = int(minor)
            # `release` is only parsed when it matters (minor == 2).
            if minor_number < 2 or (
                    minor_number == 2 and int(release) < 1):
                cls._db_create(execute)

        return cls._update_submodel(execute, version, data)

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load the stored results of one profile.

        ``data`` must provide the entries 'study', 'reach', 'profile',
        'scenario' and 'timestamps'. The i-th decoded value of each row
        is associated with the i-th entry of ``data['timestamps']``.

        Returns:
            A list holding the single loaded Profile, or an empty list
            when the query yields no row for this profile.
        """
        study = data['study']
        reach = data['reach']
        profile = data['profile']
        scenario = data["scenario"]
        timestamps = data['timestamps']

        rows = execute(
            "SELECT pamhyr_id, result, key, " +
            "len_data, data, scenario " +
            "FROM results_data " +
            f"WHERE scenario = {scenario.id} " +
            f"AND reach = {reach.pamhyr_id} " +
            f"AND section = {profile.pamhyr_id}"
        )

        loaded_profile = None

        # Only `key`, `len_data` and the blob are needed from each row.
        for _pid, _result, key, len_data, blob, _owner_scenario in rows:
            if loaded_profile is None:
                loaded_profile = cls(profile, study)

            if key in ("Z", "Q", "V"):
                # Big-endian sequence of `len_data` 4-byte floats.
                series = struct.unpack(f">{len_data}f", blob)
            elif key == "sl":
                series = cls._db_load_data_sl_format(
                    struct.unpack(f">{len_data}f", blob),
                    len_data, timestamps
                )
            else:
                # Not a key written by _db_save: skip it rather than
                # storing values decoded for another row.
                logger.warning(f"Unknown results key '{key}', skipped")
                continue

            for timestamp, value in zip(timestamps, series):
                loaded_profile.set(timestamp, key, value)

                if key == "Z":
                    loaded_profile.update_water_limits(timestamp, value)

        return [] if loaded_profile is None else [loaded_profile]

    @classmethod
    def _db_load_data_sl_format(cls, values, len_data, timestamps):
        """Rebuild the per-timestamp "sl" structure from a flat sequence.

        Args:
            values: flat sequence of floats read from the database.
            len_data: number of floats in ``values``.
            timestamps: sized collection of timestamps; its length is
                used to compute the number of layers per timestamp.

        Returns:
            A list whose entries have the form ``[[(a, b, c), ...]]``:
            a one-element list wrapping the list of 3-tuples (one per
            layer). This is the shape ``_db_save`` reads back through
            ``v[0]``. An empty list is returned when ``timestamps`` is
            empty.
        """
        tuple_size = 3

        if len(timestamps) == 0:
            # Nothing to associate the values with (and avoids a
            # division by zero below).
            return []

        # Transform the list of values into a list of 3-tuples.
        # Sediment results: (h, d50, sigma)
        triples = [[]]
        for value in values:
            current = triples[-1]
            if len(current) < tuple_size:
                current.append(value)
            else:
                triples.append([value])
        triples = list(map(tuple, triples))

        # Transform the list of 3-tuples into a list of lists of
        # `nb_layer` 3-tuples, one list per timestamp.
        nb_layer = len_data / len(timestamps) / tuple_size
        layers = [[]]
        for triple in triples:
            current = layers[-1]
            if len(current) < nb_layer:
                current.append(triple)
            else:
                layers.append([triple])

        return [[per_timestamp] for per_timestamp in layers]

    def update_water_limits(self, timestamp, z):
        """Store under "water_limits" what the geometry returns for ``z``."""
        limits = self.geometry.get_water_limits(z)
        self.set(
            timestamp, "water_limits", limits
        )

    def get_keys(self):
        """Return the set of keys used by at least one timestamp."""
        return set().union(*(
            entries.keys() for entries in self._data.values()
        ))

    def _db_save(self, execute, data=None):
        """Insert one ``results_data`` row per persisted key.

        ``data`` must provide the entries 'result' and 'reach'. Values
        are packed as big-endian 4-byte floats, ordered by timestamp.
        Keys other than "Z", "Q", "V" and "sl" are not saved.
        """
        pid = self._pamhyr_id
        result = data["result"]

        keys = self.get_keys()
        logger.debug(f"{keys}...")

        for key in keys:
            # The values are only fetched for keys that are actually
            # saved, so an in-memory-only key missing at some timestamp
            # cannot make the save fail.
            if key in ("Z", "Q", "V"):
                values = list(map(float, self.get_key(key)))
            elif key == "sl":
                # Flatten the list (timestamps) of list (layers) of
                # tuples into a flat list of values, and ensure every
                # value is a float.
                values = flatten(
                    flatten([
                        [list(map(float, layer)) for layer in v[0]]
                        for v in self.get_key(key)
                    ])
                )
            else:
                continue

            len_values = len(values)
            data_bytes = struct.pack(f">{len_values}f", *values)

            execute(
                "INSERT INTO " +
                "results_data (pamhyr_id, result, " +
                "reach, section, " +
                "key, len_data, data, " +
                "scenario) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                pid, result,
                data["reach"].pamhyr_id,
                self._profile.pamhyr_id,
                key, len_values, data_bytes,
                self._owner_scenario
            )

        return True
class Reach(SQLSubModel):
    """Simulation results of one reach: a list of Profile results."""

    _sub_classes = [Profile]

    def __init__(self, reach, study, with_init=True):
        """Build the results container of ``reach``.

        Args:
            reach: source reach in the study.
            study: the study these results belong to.
            with_init: when True, one empty Profile is created for each
                profile of ``reach``; when False the profile list starts
                empty (used by ``_db_load``, which fills it afterwards).
        """
        super(Reach, self).__init__(
            id=-1, status=study.status,
            owner_scenario=study.status.scenario.id
        )

        self._study = study
        self._reach = reach  # Source reach in the study
        self._profiles = []

        if with_init:
            self._profiles = [
                Profile(p, self._study) for p in reach.profiles
            ]

        self._profile_mask = self._compute_profile_mask()

    def _compute_profile_mask(self):
        """Return one boolean per profile, False for 'interpol*' names."""
        return [
            not p.name.startswith('interpol') for p in self._profiles
        ]

    def __len__(self):
        """Return the number of profiles."""
        return len(self._profiles)

    @property
    def name(self):
        """Name of the source reach."""
        return self._reach.name

    @property
    def geometry(self):
        """The source reach these results belong to."""
        return self._reach

    @property
    def profiles(self):
        """Shallow copy of the list of Profile results."""
        return self._profiles.copy()

    @property
    def profile_mask(self):
        """List of booleans aligned with the profiles.

        An entry is True when the profile name does not start with
        'interpol'.
        """
        return self._profile_mask

    def profile(self, id):
        """Return the Profile results at index ``id``."""
        return self._profiles[id]

    def set(self, profile_id, timestamp, key, data):
        """Store ``data`` under ``key``/``timestamp`` for one profile."""
        self._profiles[profile_id].set(timestamp, key, data)

    def has_sediment(self):
        """Return True if any profile has sediment ("sl") results."""
        return any(profile.has_sediment() for profile in self._profiles)

    @classmethod
    def _db_create(cls, execute, ext=""):
        return cls._create_submodel(execute)

    @classmethod
    def _db_update(cls, execute, version, data=None):
        return cls._update_submodel(execute, version, data)

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load the results of every profile of ``data["reach"]``.

        ``data["profile"]`` is overwritten with each geometry profile in
        turn before delegating to ``Profile._db_load``.
        """
        reach = data["reach"]

        new_reach = cls(
            data["reach"], data["study"], with_init=False
        )

        for profile in reach.profiles:
            data["profile"] = profile
            new_reach._profiles += Profile._db_load(execute, data)

        # The mask built in __init__ was computed on an empty profile
        # list (with_init=False): rebuild it now that the profiles are
        # loaded, so that it stays aligned with them.
        new_reach._profile_mask = new_reach._compute_profile_mask()

        return new_reach

    def _db_save(self, execute, data=None):
        """Save every profile; sets ``data["profile"]`` to its geometry id."""
        logger.debug("Save reach...")
        for profile in self._profiles:
            data["profile"] = profile.geometry.pamhyr_id
            profile._db_save(execute, data)
class River(SQLSubModel):
    """Top-level results container: an ordered list of Reach results."""

    _sub_classes = [Reach]

    def __init__(self, study):
        super().__init__(
            id=-1,
            status=study.status,
            owner_scenario=study.status.scenario.id,
        )

        self._study = study

        # List of Reach results, in insertion order
        self._reachs = []

    def __len__(self):
        """Return the number of reaches holding results."""
        return len(self._reachs)

    @property
    def reachs(self):
        """Shallow copy of the list of Reach results."""
        return list(self._reachs)

    def reach(self, id):
        """Return the Reach results at index ``id``."""
        return self._reachs[id]

    def has_reach(self, id):
        """Return True if ``id`` is a valid non-negative index."""
        return 0 <= id and id < len(self._reachs)

    def add(self, reach_id):
        """Create, register and return results for an enabled edge.

        ``reach_id`` indexes the sequence returned by the study river's
        ``enable_edges()``.
        """
        edges = self._study.river.enable_edges()
        reach_results = Reach(edges[reach_id].reach, self._study)
        self._reachs.append(reach_results)
        return reach_results

    def get_reach_by_geometry(self, geometry_reach):
        """Return the first Reach whose geometry *is* ``geometry_reach``.

        Raises:
            StopIteration: if no reach matches.
        """
        return next(
            candidate
            for candidate in self._reachs
            if candidate.geometry is geometry_reach
        )

    @classmethod
    def _db_create(cls, execute, ext=""):
        return cls._create_submodel(execute)

    @classmethod
    def _db_update(cls, execute, version, data=None):
        return cls._update_submodel(execute, version, data)

    @classmethod
    def _db_load(cls, execute, data=None):
        """Load the results of every reach of ``data["study"]``.

        ``data["reach"]`` is overwritten with each source reach in turn
        before delegating to ``Reach._db_load``.
        """
        study = data["study"]
        river = cls(study)

        for edge in study.river.reachs():
            data["reach"] = edge.reach
            loaded = Reach._db_load(execute, data)
            river._reachs.append(loaded)

        return river

    def _db_save(self, execute, data=None):
        """Save every reach; sets ``data["reach"]`` to its geometry."""
        logger.debug("Save river...")
        for reach_results in self._reachs:
            data["reach"] = reach_results.geometry
            reach_results._db_save(execute, data)