# Pamhyr2/src/View/Results/CustomPlot/Plot.py
#
# 820 lines
# 23 KiB
# Python

# Plot.py -- Pamhyr
# Copyright (C) 2023-2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import logging
from functools import reduce
from datetime import datetime
from numpy import sqrt
from tools import timer
from View.Tools.PamhyrPlot import PamhyrPlot
from View.Results.CustomPlot.Translate import CustomPlotTranslate
logger = logging.getLogger()

# Maps each plottable quantity to the key of the y axis it is drawn on.
# Quantities with the same value share one axis. CustomPlot sorts these
# keys, so the numeric prefix fixes the order of the axes.
unit = {
    quantity: axis
    for axis, quantities in (
        ("0-meter", (
            "bed_elevation",
            "water_elevation",
            "water_elevation_envelop",
        )),
        ("1-m3s", ("discharge", "discharge_envelop")),
        ("2-ms", ("velocity", "velocity_envelop")),
        ("3-meter", ("depth", "depth_envelop", "mean_depth")),
        ("4-dimensionless", ("froude",)),
        ("5-m2", ("wet_area",)),
    )
    for quantity in quantities
}
class CustomPlot(PamhyrPlot):
    """Plot user-selected result quantities against "rk" or "time".

    With ``x == "rk"`` every selected quantity is drawn with one value per
    profile of the current reach, taken at the current timestamp. With
    ``x == "time"`` it is drawn with one value per result timestamp for
    the current profile. Quantities whose ``unit`` entry is identical
    share a y axis: the first axis key (in sorted order) uses the canvas'
    main axes and each further key gets a ``twinx()`` of it.
    """

    # Quantities computed from a profile geometry and its Q/Z values, in
    # the order they are drawn and refreshed.
    _DERIVED = ("velocity", "depth", "mean_depth", "froude", "wet_area")

    # Line style of each derived quantity, identical in both x modes.
    _DERIVED_STYLE = {
        "velocity": {"color": 'g', "lw": 1.},
        "depth": {"color": 'brown', "lw": 1.},
        "mean_depth": {"color": 'orange', "lw": 1.},
        "froude": {"color": 'black', "linestyle": '--', "lw": 1.},
        "wet_area": {"color": 'blue', "linestyle": '--', "lw": 1.},
    }

    def __init__(self, x, y, reach, profile, timestamp,
                 data=None, canvas=None, trad=None,
                 toolbar=None, parent=None):
        """Store the selection and derive the list of y axes.

        Args:
            x: Abscissa of the plot, ``"rk"`` or ``"time"``.
            y: Iterable of quantity names; each must be a key of ``unit``.
            reach: Reach identifier passed to ``data.river.reach()``.
            profile: Profile identifier passed to ``reach.profile()``.
            timestamp: Timestamp at which "rk" curves are evaluated and
                where the marker is drawn in "time" mode.
            data: Results object to plot; only the grid is drawn while it
                is ``None``.
            canvas: Forwarded to ``PamhyrPlot``.
            trad: Accepted for signature compatibility but ignored: a
                ``CustomPlotTranslate`` instance is always used.
            toolbar: Forwarded to ``PamhyrPlot``.
            parent: Forwarded to ``PamhyrPlot``.
        """
        super(CustomPlot, self).__init__(
            canvas=canvas,
            trad=CustomPlotTranslate(),
            data=data,
            toolbar=toolbar,
            parent=parent
        )

        self._x = x
        self._y = y
        self._reach = reach
        self._profile = profile
        self._timestamp = timestamp

        logger.debug(
            "Create custom plot for: " +
            f"{x} -> {','.join(y)}: " +
            f"reach={reach}, profile={profile}, " +
            f"timestamp={timestamp}"
        )

        # One y axis per distinct unit key, in sorted key order.
        self._y_axes = sorted({unit[name] for name in self._y})
        # Axis key -> matplotlib axes, filled by draw_static().
        self._axes = {}

    def _shift_axes_spines(self):
        """Offset the y-axis spines so that extra axes do not overlap.

        The first axis (sorted by key) keeps its left spine in place; each
        other axis has its right spine moved outward, 60 further for every
        additional axis.
        """
        shift = 0
        for rank, key in enumerate(sorted(self._axes)):
            if rank == 0:
                self._axes[key].spines['left'].set_position(
                    ('outward', shift)
                )
            else:
                self._axes[key].spines['right'].set_position(
                    ('outward', shift)
                )
                shift += 60

    def _plot_derived(self, name, x, values):
        """Plot derived quantity *name* on its axis and register its line."""
        ax = self._axes[unit[name]]
        self.lines[name] = ax.plot(x, values, **self._DERIVED_STYLE[name])

    def _plot_envelop(self, name, x, upper, lower, color):
        """Draw the dotted upper and lower envelope curves of a quantity.

        Only the upper curve is stored in ``self.lines`` (under *name*), so
        the envelope gets a single legend entry.
        """
        ax = self._axes[unit[name]]
        self.lines[name] = ax.plot(
            x, upper, lw=1., color=color, linestyle='dotted',
        )
        ax.plot(x, lower, lw=1., color=color, linestyle='dotted')

    def _draw_custom_legend(self):
        """Build the legend of the main axes from every registered line."""
        handles = [
            artist
            for artists in self.lines.values()
            for artist in artists
        ]
        labels = [self._trad[name] for name in self.lines]
        self.canvas.axes.legend(handles, labels, loc="best")

    @staticmethod
    def _froude_number(geometry, q, z):
        """Return speed(q, z) / sqrt(9.81 * wet_area(z) / wet_width(z))."""
        return geometry.speed(q, z) / sqrt(
            9.81 * (geometry.wet_area(z) / geometry.wet_width(z))
        )

    def _rk_series(self, name, profiles, q, z):
        """Return derived quantity *name* for each profile.

        Args:
            name: One of ``_DERIVED``.
            profiles: Profiles of the reach.
            q: Q value of each profile at the current timestamp.
            z: Z value of each profile at the current timestamp.

        Raises:
            ValueError: If *name* is not a derived quantity.
        """
        if name == "velocity":
            return [
                p.geometry.speed(qi, zi)
                for p, qi, zi in zip(profiles, q, z)
            ]
        if name == "depth":
            return [
                p.geometry.max_water_depth(zi)
                for p, zi in zip(profiles, z)
            ]
        if name == "mean_depth":
            return [
                p.geometry.mean_water_depth(zi)
                for p, zi in zip(profiles, z)
            ]
        if name == "froude":
            return [
                self._froude_number(p.geometry, qi, zi)
                for p, qi, zi in zip(profiles, q, z)
            ]
        if name == "wet_area":
            return [
                p.geometry.wet_area(zi)
                for p, zi in zip(profiles, z)
            ]
        raise ValueError(f"Not a derived quantity: {name!r}")

    def _time_series(self, name, geometry, q, z):
        """Return derived quantity *name* for each time step of a profile.

        Args:
            name: One of ``_DERIVED``.
            geometry: Geometry of the profile.
            q: Q values of the profile, one per time step.
            z: Z values of the profile, one per time step.

        Raises:
            ValueError: If *name* is not a derived quantity.
        """
        if name == "velocity":
            return [geometry.speed(qi, zi) for qi, zi in zip(q, z)]
        if name == "depth":
            return [geometry.max_water_depth(zi) for zi in z]
        if name == "mean_depth":
            return [geometry.mean_water_depth(zi) for zi in z]
        if name == "froude":
            return [
                self._froude_number(geometry, qi, zi)
                for zi, qi in zip(z, q)
            ]
        if name == "wet_area":
            return [geometry.wet_area(zi) for zi in z]
        raise ValueError(f"Not a derived quantity: {name!r}")

    def _current_marker_x(self):
        """Return the x position of the current-selection marker.

        This is the ``rk`` of the current profile in "rk" mode and the
        current timestamp in "time" mode.

        Raises:
            ValueError: If ``self._x`` is neither ``"rk"`` nor ``"time"``.
        """
        if self._x == "rk":
            reach = self.data.river.reach(self._reach)
            return reach.profile(self._profile).rk
        if self._x == "time":
            return self._timestamp
        raise ValueError(f"Unsupported x axis: {self._x!r}")

    def _draw_rk(self):
        """Draw every selected quantity along the reach, then the legend.

        Values are taken at ``self._timestamp``. Water elevation,
        discharge, velocity and depth are drawn together with their
        min/max envelope over all stored values.
        """
        reach = self.data.river.reach(self._reach)
        profiles = list(reach.profiles)
        rk = reach.geometry.get_rk()
        z_min = reach.geometry.get_z_min()

        q = [p.get_ts_key(self._timestamp, "Q") for p in profiles]
        z = [p.get_ts_key(self._timestamp, "Z") for p in profiles]

        self._shift_axes_spines()
        self.lines = {}

        if "bed_elevation" in self._y:
            ax = self._axes[unit["bed_elevation"]]
            self.lines["bed_elevation"] = ax.plot(
                rk, z_min,
                color='grey', lw=1.,
            )

        if "water_elevation" in self._y:
            ax = self._axes[unit["water_elevation"]]
            self.lines["water_elevation"] = ax.plot(
                rk, z, lw=1.,
                color='blue',
            )

            if "bed_elevation" in self._y:
                self.fill = ax.fill_between(
                    rk, z_min, z,
                    color='blue', alpha=0.5, interpolate=True
                )

            # The envelope is not gated on "water_elevation_envelop": it
            # is always drawn with the water elevation.
            z_all = [p.get_key("Z") for p in profiles]
            self._plot_envelop(
                "water_elevation_envelop", rk,
                [max(values) for values in z_all],
                [min(values) for values in z_all],
                'blue',
            )

        if "discharge" in self._y:
            ax = self._axes[unit["discharge"]]
            self.lines["discharge"] = ax.plot(
                rk, q, lw=1.,
                color='r',
            )

            # Always drawn with the discharge (no "discharge_envelop" test).
            q_all = [p.get_key("Q") for p in profiles]
            self._plot_envelop(
                "discharge_envelop", rk,
                [max(values) for values in q_all],
                [min(values) for values in q_all],
                'r',
            )

        if "velocity" in self._y:
            self._plot_derived(
                "velocity", rk,
                self._rk_series("velocity", profiles, q, z)
            )

            # Always drawn with the velocity (no "velocity_envelop" test).
            velocities = [
                [
                    p.geometry.speed(qi, zi)
                    for qi, zi in zip(p.get_key("Q"), p.get_key("Z"))
                ]
                for p in profiles
            ]
            self._plot_envelop(
                "velocity_envelop", rk,
                [max(values) for values in velocities],
                [min(values) for values in velocities],
                'g',
            )

        if "depth" in self._y:
            self._plot_derived(
                "depth", rk,
                self._rk_series("depth", profiles, q, z)
            )

            # Always drawn with the depth (no "depth_envelop" test). The
            # envelope is the extreme Z minus the bed elevation.
            z_all = [p.get_key("Z") for p in profiles]
            self._plot_envelop(
                "depth_envelop", rk,
                [max(values) - zb for values, zb in zip(z_all, z_min)],
                [min(values) - zb for values, zb in zip(z_all, z_min)],
                'brown',
            )

        for name in ("mean_depth", "froude", "wet_area"):
            if name in self._y:
                self._plot_derived(
                    name, rk,
                    self._rk_series(name, profiles, q, z)
                )

        self._draw_custom_legend()

    def _redraw_rk(self):
        """Refresh the "rk" curves for the current timestamp.

        Only the y data of the existing lines are replaced (and the fill
        between bed and water is rebuilt); envelopes are left untouched.
        """
        reach = self.data.river.reach(self._reach)
        profiles = list(reach.profiles)
        rk = reach.geometry.get_rk()
        z_min = reach.geometry.get_z_min()

        q = [p.get_ts_key(self._timestamp, "Q") for p in profiles]
        z = [p.get_ts_key(self._timestamp, "Z") for p in profiles]

        if "bed_elevation" in self._y:
            self.lines["bed_elevation"][0].set_ydata(z_min)

        if "water_elevation" in self._y:
            self.lines["water_elevation"][0].set_ydata(z)

            if "bed_elevation" in self._y:
                # A fill cannot be updated in place: drop it and redraw.
                ax = self._axes[unit["water_elevation"]]
                self.fill.remove()
                self.fill = ax.fill_between(
                    rk, z_min, z,
                    color='blue', alpha=0.5, interpolate=True
                )

        if "discharge" in self._y:
            self.lines["discharge"][0].set_ydata(q)

        # Each derived series goes to the line registered under its own
        # name.
        for name in self._DERIVED:
            if name in self._y:
                self.lines[name][0].set_ydata(
                    self._rk_series(name, profiles, q, z)
                )

    def _customize_x_axes_time(self, ts, mode="time"):
        """Put readable time labels on a subset of the x ticks.

        One timestamp out of ``len(ts) // 5`` is kept as a tick (all of
        them when there are fewer than five).

        Args:
            ts: Sized iterable of timestamps, as accepted by
                ``datetime.fromtimestamp``.
            mode: With ``"time"`` each tick shows the duration elapsed
                since timestamp 0, reduced to the day count once it
                reaches one day. With any other value it shows the
                calendar date of the timestamp.
        """
        nb = len(ts)
        step = nb // 5 or nb
        fx = [t for index, t in enumerate(ts) if index % step == 0]

        if mode == "time":
            t0 = datetime.fromtimestamp(0)
            xt = [
                str(datetime.fromtimestamp(v) - t0)
                .split(",")[0]
                .replace("days", self._trad["days"])
                .replace("day", self._trad["day"])
                for v in fx
            ]
        else:
            xt = [str(datetime.fromtimestamp(v).date()) for v in fx]

        self.canvas.axes.set_xticks(ticks=fx, labels=xt, rotation=45)

    def _draw_time(self):
        """Draw every selected quantity over time for the current profile.

        Also customizes the time ticks and builds the legend.
        """
        results = self.data
        reach = results.river.reach(self._reach)
        profile = reach.profile(self._profile)

        self._shift_axes_spines()

        ts = sorted(results.get("timestamps"))
        q = profile.get_key("Q")
        z = profile.get_key("Z")
        geometry = profile.geometry
        # Z min is constant in time
        z_min = geometry.z_min()
        ts_z_min = [z_min] * len(ts)

        self.lines = {}

        if "bed_elevation" in self._y:
            ax = self._axes[unit["bed_elevation"]]
            self.lines["bed_elevation"] = ax.plot(
                ts, ts_z_min,
                color='grey', lw=1.
            )

        if "water_elevation" in self._y:
            ax = self._axes[unit["water_elevation"]]
            self.lines["water_elevation"] = ax.plot(
                ts, z, lw=1.,
                color='b',
            )

            if "bed_elevation" in self._y:
                self.fill = ax.fill_between(
                    ts, ts_z_min, z,
                    color='blue', alpha=0.5, interpolate=True
                )

        if "discharge" in self._y:
            ax = self._axes[unit["discharge"]]
            self.lines["discharge"] = ax.plot(
                ts, q, lw=1.,
                color='r',
            )

        for name in self._DERIVED:
            if name in self._y:
                self._plot_derived(
                    name, ts,
                    self._time_series(name, geometry, q, z)
                )

        self._customize_x_axes_time(ts)
        self._draw_custom_legend()

    def _redraw_time(self):
        """Refresh the "time" curves for the current profile.

        Only the y data of the existing lines are replaced (and the fill
        between bed and water is rebuilt).
        """
        results = self.data
        reach = results.river.reach(self._reach)
        profile = reach.profile(self._profile)

        ts = sorted(results.get("timestamps"))
        q = profile.get_key("Q")
        z = profile.get_key("Z")
        geometry = profile.geometry
        z_min = geometry.z_min()
        ts_z_min = [z_min] * len(ts)

        if "bed_elevation" in self._y:
            # z_min depends on the profile, so the bed line has to follow
            # a profile change just like the fill below does.
            self.lines["bed_elevation"][0].set_ydata(ts_z_min)

        if "water_elevation" in self._y:
            self.lines["water_elevation"][0].set_ydata(z)

            if "bed_elevation" in self._y:
                # A fill cannot be updated in place: drop it and redraw.
                ax = self._axes[unit["bed_elevation"]]
                self.fill.remove()
                self.fill = ax.fill_between(
                    ts, ts_z_min, z,
                    color='blue', alpha=0.5, interpolate=True
                )

        if "discharge" in self._y:
            self.lines["discharge"][0].set_ydata(q)

        for name in self._DERIVED:
            if name in self._y:
                self.lines[name][0].set_ydata(
                    self._time_series(name, geometry, q, z)
                )

    def draw(self):
        """Draw the whole plot (alias of ``draw_static()``)."""
        self.draw_static()

    def draw_update(self):
        """Refresh the curve data for the current x mode.

        Does nothing when ``self._x`` is neither ``"rk"`` nor ``"time"``.
        """
        if self._x == "rk":
            self._redraw_rk()
        elif self._x == "time":
            self._redraw_time()

    @timer
    def draw_static(self):
        """Clear the canvas and draw axes, curves, legend and marker.

        When ``self.data`` is ``None`` only the cleared main axes and their
        grid are left.

        NOTE(review): a secondary axis that already exists is reused after
        ``clear()`` without setting its y label again; confirm that this is
        intended when the method runs more than once.
        """
        self.canvas.axes.cla()
        self.canvas.axes.grid(color='grey', linestyle='--', linewidth=0.5)

        if self.data is None:
            return

        self.canvas.axes.set_xlabel(
            self._trad[self._x],
            color='black', fontsize=10
        )
        self.canvas.axes.set_ylabel(
            self._trad[self._y_axes[0]],
            color='black', fontsize=10
        )

        # The first axis key is served by the canvas' own axes; every
        # other key gets (or reuses) a twin sharing the same x axis.
        self._axes[self._y_axes[0]] = self.canvas.axes
        for axes in self._y_axes[1:]:
            if axes in self._axes:
                self._axes[axes].clear()
                continue

            ax_new = self.canvas.axes.twinx()
            ax_new.set_ylabel(
                self._trad[axes],
                color='black', fontsize=10
            )
            self._axes[axes] = ax_new

        if self._x == "rk":
            self._draw_rk()
        elif self._x == "time":
            self._draw_time()

        # Vertical dashed marker at the current profile / timestamp.
        x = self._current_marker_x()
        self._current, = self.canvas.axes.plot(
            [x, x],
            self.canvas.axes.get_ylim(),
            # label=self.label_timestamp,
            color='grey',
            linestyle="dashed",
            lw=1.,
        )

        self.canvas.figure.canvas.draw_idle()
        if self.toolbar is not None:
            self.toolbar.update()

    @timer
    def update(self):
        """Refresh the plot after the selection changed.

        No full ``draw_static()`` is done here: while ``self._init`` is
        false the marker is moved, then the curve data are refreshed.

        NOTE(review): confirm that ``draw_update()`` is meant to run
        whatever the value of ``self._init``.
        """
        if not self._init:
            self.draw_current()

        self.draw_update()

    def set_reach(self, reach_id):
        """Select another reach, reset the profile to 0 and refresh."""
        self._reach = reach_id
        self._profile = 0

        self.update()

    def set_profile(self, profile_id):
        """Select another profile.

        In "rk" mode only the marker moves; in any other mode the curves
        are refreshed through ``update()``.
        """
        self._profile = profile_id

        if self._x != "rk":
            self.update()
        else:
            self.draw_current()

    def set_timestamp(self, timestamp):
        """Select another timestamp.

        In "time" mode only the marker moves; in any other mode the curves
        are refreshed through ``update()``.
        """
        self._timestamp = timestamp

        if self._x != "time":
            self.update()
        else:
            self.draw_current()

    def draw_current(self):
        """Move the dashed marker to the current profile or timestamp.

        Raises:
            ValueError: If ``self._x`` is neither ``"rk"`` nor ``"time"``.
        """
        x = self._current_marker_x()
        self._current.set_data([x, x], self.canvas.axes.get_ylim())
        self.canvas.figure.canvas.draw_idle()