Pamhyr2/src/Model/BoundaryCondition/BoundaryCondition.py

386 lines
10 KiB
Python

# BoundaryCondition.py -- Pamhyr
# Copyright (C) 2023-2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import logging
from tools import (
trace, timer,
old_pamhyr_date_to_timestamp,
date_iso_to_timestamp,
date_dmy_to_timestamp,
)
from Model.Tools.PamhyrDB import SQLSubModel
from Model.Except import NotImplementedMethodeError
logger = logging.getLogger()
class BoundaryCondition(SQLSubModel):
_sub_classes = []
_id_cnt = 0
def __init__(self, id: int = -1, name: str = "",
status=None):
super(BoundaryCondition, self).__init__()
self._status = status
if id == -1:
self.id = BoundaryCondition._id_cnt
else:
self.id = id
self._name = name
self._type = ""
self._node = None
self._data = []
self._header = []
self._types = [float, float]
BoundaryCondition._id_cnt = max(BoundaryCondition._id_cnt + 1, self.id)
@classmethod
def _db_create(cls, execute):
execute("""
CREATE TABLE boundary_condition(
id INTEGER NOT NULL PRIMARY KEY,
name TEXT NOT NULL,
type TEXT NOT NULL,
tab TEXT NOT NULL,
node INTEGER,
FOREIGN KEY(node) REFERENCES river_node(id)
)
""")
execute("""
CREATE TABLE boundary_condition_data(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
ind INTEGER NOT NULL,
data0 TEXT NOT NULL,
data1 TEXT NOT NULL,
bc INTEGER,
FOREIGN KEY(bc) REFERENCES boundary_condition(id)
)
""")
return cls._create_submodel(execute)
@classmethod
def _db_update(cls, execute, version):
return True
@classmethod
def _get_ctor_from_type(cls, t):
from Model.BoundaryCondition.BoundaryConditionTypes import (
NotDefined, PonctualContribution,
TimeOverZ, TimeOverDischarge, ZOverDischarge,
Solid,
)
res = NotDefined
if t == "PC":
res = PonctualContribution
elif t == "TZ":
res = TimeOverZ
elif t == "TD":
res = TimeOverDischarge
elif t == "ZD":
res = ZOverDischarge
elif t == "SL":
res = Solid
return res
@classmethod
def _db_load(cls, execute, data=None):
new = []
tab = data["tab"]
table = execute(
"SELECT id, name, type, node " +
"FROM boundary_condition " +
f"WHERE tab = '{tab}'"
)
for row in table:
t = row[2]
ctor = cls._get_ctor_from_type(t)
bc = ctor(
id=row[0],
name=row[1],
status=data['status']
)
bc.node = None
if row[3] != -1:
bc.node = next(filter(lambda n: n.id == row[3], data["nodes"]))
values = execute(
"SELECT ind, data0, data1 FROM boundary_condition_data " +
f"WHERE bc = '{bc.id}'"
)
# Create dummy data list
for _ in values:
bc.add(0)
# Write data
for v in values:
ind = v[0]
data0 = bc._types[0](v[1])
data1 = bc._types[1](v[2])
# Replace data at pos ind
bc._data[ind] = (data0, data1)
new.append(bc)
return new
def _db_save(self, execute, data=None):
tab = data["tab"]
execute(f"DELETE FROM boundary_condition WHERE id = {self.id}")
execute(f"DELETE FROM boundary_condition_data WHERE bc = {self.id}")
node = -1
if self._node is not None:
node = self._node.id
sql = (
"INSERT INTO " +
"boundary_condition(id, name, type, tab, node) " +
"VALUES (" +
f"{self.id}, '{self._db_format(self._name)}', " +
f"'{self._db_format(self._type)}', '{tab}', {node}" +
")"
)
execute(sql)
ind = 0
for d in self._data:
data0 = self._db_format(str(d[0]))
data1 = self._db_format(str(d[1]))
sql = (
"INSERT INTO " +
"boundary_condition_data(ind, data0, data1, bc) " +
f"VALUES ({ind}, '{data0}', {data1}, {self.id})"
)
execute(sql)
ind += 1
return True
def __len__(self):
return len(self._data)
@classmethod
def compatibility(cls):
return ["liquid", "solid", "suspenssion"]
@classmethod
def time_convert(cls, data):
if type(data) is str:
if data.count("-") == 2:
return date_iso_to_timestamp(data)
if data.count("/") == 2:
return date_dmy_to_timestamp(data)
if data.count(":") == 3:
return old_pamhyr_date_to_timestamp(data)
if data.count(":") == 2:
return old_pamhyr_date_to_timestamp("00:" + data)
if data.count(".") == 1:
return round(float(data))
return int(data)
@property
def name(self):
if self._name == "":
return f"B{self.id + 1}"
return self._name
@name.setter
def name(self, name):
self._name = name
self._status.modified()
@property
def bctype(self):
return self._type
@property
def node(self):
return self._node
@node.setter
def node(self, node):
self._node = node
self._status.modified()
def has_node(self):
return self._node is not None
@property
def header(self):
return self._header.copy()
@property
def data(self):
return self._data.copy()
def get_type_column(self, column):
if 0 <= column < 2:
return self._types[column]
return None
@property
def _default_0(self):
return self._types[0](0)
@property
def _default_1(self):
return self._types[1](0.0)
def is_define(self):
return self._data is not None
def new_from_data(self, header, data):
new_0 = self._default_0
new_1 = self._default_1
if len(header) != 0:
for i in [0, 1]:
for j in range(len(header)):
if self._header[i] == header[j]:
if i == 0:
new_0 = self._types[i](data[j].replace(",", "."))
else:
new_1 = self._types[i](data[j].replace(",", "."))
else:
new_0 = self._types[0](data[0].replace(",", "."))
new_1 = self._types[1](data[1].replace(",", "."))
return (new_0, new_1)
def add(self, index: int):
value = (self._default_0, self._default_1)
self._data.insert(index, value)
self._status.modified()
return value
def insert(self, index: int, value):
self._data.insert(index, value)
self._status.modified()
def delete_i(self, indexes):
self._data = list(
map(
lambda e: e[1],
filter(
lambda e: e[0] not in indexes,
enumerate(self.data)
)
)
)
self._status.modified()
def delete(self, els):
self._data = list(
filter(
lambda e: e not in els,
self.data
)
)
self._status.modified()
def sort(self, _reverse=False, key=None):
if key is None:
self._data.sort(reverse=_reverse)
else:
self._data.sort(reverse=_reverse, key=key)
self._status.modified()
def index(self, bc):
self._data.index(bc)
def get_i(self, index):
return self.data[index]
def get_range(self, _range):
lst = []
for r in _range:
lst.append(r)
return lst
def _set_i_c_v(self, index, column, value):
v = list(self._data[index])
v[column] = self._types[column](value)
self._data[index] = tuple(v)
self._status.modified()
def set_i_0(self, index: int, value):
self._set_i_c_v(index, 0, value)
def set_i_1(self, index: int, value):
self._set_i_c_v(index, 1, value)
@timer
def convert(self, cls):
new = cls(name=self.name, status=self._status)
new.node = self.node
for i, _ in enumerate(self.data):
new.add(i)
for i in [0, 1]:
for j in [0, 1]:
if self._header[i] == new.header[j]:
for ind, v in self.data:
try:
new._set_i_c_v(ind, j, v[i])
except Exception as e:
logger.info(e)
return new
def move_up(self, index):
if index < len(self):
next = index - 1
d = self._data
d[index], d[next] = d[next], d[index]
self._status.modified()
def move_down(self, index):
if index >= 0:
prev = index + 1
d = self._data
d[index], d[prev] = d[prev], d[index]
self._status.modified()
def reach(self, river):
edges = []
if self._node != None:
if river != None:
for edge in river.edges():
if edge.node1.name == self._node.name:
edges.append(edge.reach)
if edge.node2.name == self._node.name:
edges.append(edge.reach)
return edges