Pamhyr2/src/Model/BoundaryConditionsAdisTS/BoundaryConditionAdisTS.py

553 lines
15 KiB
Python

# BoundaryConditionsAdisTS.py -- Pamhyr
# Copyright (C) 2023-2025 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import logging
from tools import (
trace, timer,
old_pamhyr_date_to_timestamp,
date_iso_to_timestamp,
date_dmy_to_timestamp,
)
from Model.Tools.PamhyrDB import SQLSubModel
from Model.Except import NotImplementedMethodeError
from Model.Scenario import Scenario
logger = logging.getLogger()
class Data(SQLSubModel):
_sub_classes = []
def __init__(self,
data0, data1,
id: int = -1,
types=[float, float],
status=None,
owner_scenario=-1):
super(Data, self).__init__(
id=id, status=status,
owner_scenario=owner_scenario
)
self._types = types
self._data = [data0, data1]
@classmethod
def _db_create(cls, execute, ext=""):
execute(f"""
CREATE TABLE boundary_condition_data_adists{ext}(
{cls.create_db_add_pamhyr_id()},
deleted BOOLEAN NOT NULL DEFAULT FALSE,
data0 TEXT NOT NULL,
data1 TEXT NOT NULL,
bca INTEGER,
{Scenario.create_db_add_scenario()},
{Scenario.create_db_add_scenario_fk()},
FOREIGN KEY(bca) REFERENCES boundary_condition_adists(pamhyr_id)
)
""")
return True
@classmethod
def _db_update(cls, execute, version, data=None):
major, minor, release = version.strip().split(".")
created = False
if major == "0" and int(minor) <= 1:
if int(release) < 7:
cls._db_create(execute)
created = True
if major == "0" and int(minor) < 2:
if not created:
cls._db_update_to_0_2_0(execute, data)
if not created:
return cls._update_submodel(execute, version, data)
return True
@classmethod
def _db_update_to_0_2_0(cls, execute, data):
table = "boundary_condition_data_adists"
cls.update_db_add_pamhyr_id(execute, table, data)
Scenario.update_db_add_scenario(execute, table)
cls._db_create(execute, ext="_tmp")
execute(
f"INSERT INTO {table}_tmp " +
"(pamhyr_id, data0, data1, bca, scenario) " +
"SELECT pamhyr_id, data0, data1, bc, scenario " +
f"FROM {table}"
)
execute(f"DROP TABLE {table}")
execute(f"ALTER TABLE {table}_tmp RENAME TO {table}")
cls._db_update_to_0_2_0_set_bca_pid(execute, data)
@classmethod
def _db_update_to_0_2_0_set_bca_pid(cls, execute, data):
pid_bca = data["id2pid"]["boundary_condition_adists"]
els = execute(
"SELECT pamhyr_id, bca " +
"FROM boundary_condition_data_adists"
)
for row in els:
it = iter(row)
pid = next(it)
bca_id = next(it)
if bca_id == -1:
continue
execute(
f"UPDATE boundary_condition_data_adists " +
f"SET bca = {pid_bca[bca_id]} " +
f"WHERE pamhyr_id = {pid}"
)
@classmethod
def _db_load(cls, execute, data=None):
new = []
status = data['status']
scenario = data["scenario"]
loaded = data['loaded_pid']
if scenario is None:
return new
bca = data["bca"]
table = execute(
"SELECT pamhyr_id, deleted, data0, data1, scenario FROM " +
"boundary_condition_data_adists " +
f"WHERE bca = {bca.id} " +
f"AND scenario = {scenario.id}"
)
if table is not None:
for row in table:
it = iter(row)
pid = next(it)
deleted = (next(it) == 1)
data0 = next(it)
data1 = next(it)
owner_scenario = next(it)
bc = cls(
data0, data1,
id=pid,
status=status,
owner_scenario=owner_scenario
)
if deleted:
bc.set_as_deleted()
loaded.add(pid)
new.append(bc)
data["scenario"] = scenario.parent
new += cls._db_load(execute, data)
data["scenario"] = scenario
return new
def _db_save(self, execute, data=None):
if not self.must_be_saved():
return True
bca = data["bca"]
data0 = self._db_format(self._data[0])
data1 = self._db_format(self._data[1])
execute(
"INSERT INTO " +
"boundary_condition_data_adists(" +
"pamhyr_id, data0, data1, bca, scenario) " +
f"VALUES ({self.id}, '{data0}', {data1}, {bca.id}, " +
f"{self._status.scenario_id})"
)
return True
def __getitem__(self, key):
return self._types[key](self._data[key])
def __setitem__(self, key, value):
self._data[key] = self._types[key](value)
class BoundaryConditionAdisTS(SQLSubModel):
_sub_classes = [Data]
def __init__(self, id: int = -1,
pollutant: int = -1, status=None,
owner_scenario=-1):
super(BoundaryConditionAdisTS, self).__init__(
id=id, status=status,
owner_scenario=owner_scenario
)
self._status = status
self._type = ""
self._node = None
self._pollutant = pollutant
self._data = []
self._header = []
self._types = [self.time_convert, float]
@classmethod
def _db_create(cls, execute, ext=""):
execute(f"""
CREATE TABLE boundary_condition_adists{ext}(
{cls.create_db_add_pamhyr_id()},
deleted BOOLEAN NOT NULL DEFAULT FALSE,
pollutant INTEGER NOT NULL,
type TEXT NOT NULL,
node INTEGER,
{Scenario.create_db_add_scenario()},
{Scenario.create_db_add_scenario_fk()},
FOREIGN KEY(pollutant) REFERENCES Pollutants(pamhyr_id),
FOREIGN KEY(node) REFERENCES river_node(pamhyr_id)
)
""")
if ext != "":
return True
return cls._create_submodel(execute)
@classmethod
def _db_update(cls, execute, version, data=None):
major, minor, release = version.strip().split(".")
created = False
if major == "0" and int(minor) <= 1:
if int(release) < 7:
cls._db_create(execute)
created = True
if major == "0" and int(minor) < 2:
if not created:
cls._db_update_to_0_2_0(execute, data)
if not created:
return cls._update_submodel(execute, version, data)
return True
@classmethod
def _db_update_to_0_2_0(cls, execute, data):
table = "boundary_condition_adists"
nodes = data['id2pid']['river_node']
cls.update_db_add_pamhyr_id(execute, table, data)
Scenario.update_db_add_scenario(execute, table)
cls._db_create(execute, ext="_tmp")
execute(
f"INSERT INTO {table}_tmp " +
"(pamhyr_id, pollutant, type, node, scenario) " +
"SELECT pamhyr_id, pollutant, type, node, scenario " +
f"FROM {table}"
)
execute(f"DROP TABLE {table}")
execute(f"ALTER TABLE {table}_tmp RENAME TO {table}")
cls._db_update_to_0_2_0_set_node_pid(execute, table, nodes)
cls._db_update_to_0_2_0_set_pollutants_pid(execute, data)
@classmethod
def _db_update_to_0_2_0_set_pollutants_pid(cls, execute, data):
pid_pol = data["id2pid"]["Pollutants"]
els = execute(
f"SELECT pamhyr_id, pollutant " +
"FROM boundary_condition_adists"
)
for row in els:
it = iter(row)
pid = next(it)
pol_id = next(it)
if pol_id == -1:
continue
execute(
f"UPDATE boundary_condition_adists " +
f"SET pollutant = {pid_pol[pol_id]} " +
f"WHERE pamhyr_id = {pid}"
)
@classmethod
def _db_load(cls, execute, data=None):
new = []
status = data['status']
nodes = data['nodes']
scenario = data["scenario"]
loaded = data['loaded_pid']
if scenario is None:
return new
table = execute(
"SELECT pamhyr_id, deleted, pollutant, type, node, scenario " +
"FROM boundary_condition_adists " +
f"WHERE scenario = {scenario.id} " +
f"AND pamhyr_id NOT IN ({', '.join(map(str, loaded))}) "
)
if table is not None:
for row in table:
it = iter(row)
pid = next(it)
deleted = (next(it) == 1)
pollutant = next(it)
bc_type = next(it)
node = next(it)
owner_scenario = next(it)
bc = cls(
id=pid,
pollutant=pollutant,
status=status,
owner_scenario=owner_scenario
)
if deleted:
bc.set_as_deleted()
bc.type = bc_type
bc.node = None
if row[3] != -1:
tmp = next(
filter(
lambda n: n.id == node, nodes
),
None
)
if tmp is not None:
bc.node = tmp.id
else:
bc.node = -1
data["bca"] = bc
bc._data = Data._db_load(execute, data=data)
loaded.add(pid)
new.append(bc)
data["scenario"] = scenario.parent
new += cls._db_load(execute, data)
data["scenario"] = scenario
return new
def _db_save(self, execute, data=None):
if not self.must_be_saved():
return True
execute(
"DELETE FROM boundary_condition_adists " +
f"WHERE pamhyr_id = {self.id} " +
f"AND scenario = {self._status.scenario_id} "
)
execute(
"DELETE FROM boundary_condition_data_adists " +
f"WHERE bca = {self.id} " +
f"AND scenario = {self._status.scenario_id} "
)
node = -1
if self._node is not None:
node = self._node
execute(
"INSERT INTO " +
"boundary_condition_adists(" +
"pamhyr_id, deleted, pollutant, type, " +
"node, scenario) " +
"VALUES (" +
f"{self.id}, {self._db_format(self.is_deleted())}, " +
f"{self._pollutant}, " +
f"'{self._db_format(self._type)}', {node}, " +
f"{self._status.scenario_id}" +
")"
)
ind = 0
for d in self._data:
data["ind"] = ind
d._db_save(execute, data)
ind += 1
return True
def __len__(self):
return len(self._data)
@classmethod
def time_convert(cls, data):
if type(data) is str:
if data.count("-") == 2:
return date_iso_to_timestamp(data)
if data.count("/") == 2:
return date_dmy_to_timestamp(data)
if data.count(":") == 3:
return old_pamhyr_date_to_timestamp(data)
if data.count(":") == 2:
return old_pamhyr_date_to_timestamp("00:" + data)
if data.count(".") == 1:
return round(float(data))
return int(data)
@property
def node(self):
return self._node
@node.setter
def node(self, node):
self._node = node
self._status.modified()
@property
def header(self):
return self._header.copy()
@header.setter
def header(self, header):
self._header = header
self._status.modified()
@property
def pollutant(self):
return self._pollutant
@pollutant.setter
def pollutant(self, pollutant):
self._pollutant = pollutant
self._status.modified()
@property
def type(self):
return self._type
@type.setter
def type(self, type):
self._type = type
self._status.modified()
@property
def data(self):
return self._data.copy()
@property
def _default_0(self):
return self._types[0](0)
@property
def _default_1(self):
return self._types[1](0.0)
def new_from_data(self, header, data):
new_0 = self._default_0
new_1 = self._default_1
if len(header) != 0:
for i in [0, 1]:
for j in range(len(header)):
if self._header[i] == header[j]:
if i == 0:
new_0 = self._types[i](data[j].replace(",", "."))
else:
new_1 = self._types[i](data[j].replace(",", "."))
else:
new_0 = self._types[0](data[0].replace(",", "."))
new_1 = self._types[1](data[1].replace(",", "."))
return (new_0, new_1)
def add(self, index: int):
value = (self._default_0, self._default_1)
self._data.insert(index, value)
self._status.modified()
return value
def insert(self, index: int, value):
self._data.insert(index, value)
self._status.modified()
def delete_i(self, indexes):
self._data = list(
map(
lambda e: e[1],
filter(
lambda e: e[0] not in indexes,
enumerate(self.data)
)
)
)
self._status.modified()
def sort(self, _reverse=False, key=None):
if key is None:
self._data.sort(reverse=_reverse)
else:
self._data.sort(reverse=_reverse, key=key)
self._status.modified()
def index(self, bc):
self._data.index(bc)
def get_i(self, index):
return self.data[index]
def get_range(self, _range):
lst = []
for r in _range:
lst.append(r)
return lst
def _set_i_c_v(self, index, column, value):
v = list(self._data[index])
v[column] = self._types[column](value)
self._data[index] = tuple(v)
self._status.modified()
def set_i_0(self, index: int, value):
self._set_i_c_v(index, 0, value)
def set_i_1(self, index: int, value):
self._set_i_c_v(index, 1, value)