# Pamhyr2/src/View/Results/CustomPlot/Plot.py
#
# 1341 lines
# 41 KiB
# Python

# Plot.py -- Pamhyr
# Copyright (C) 2023-2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import logging
from functools import reduce
from datetime import datetime
from numpy import sqrt
from numpy import asarray
from tools import timer
from View.Tools.PamhyrPlot import PamhyrPlot
from View.Results.CustomPlot.Translate import CustomPlotTranslate
# Module-wide logger (the root logger is used here).
logger = logging.getLogger()

# Maps every plottable variable name to the key of the y axis it is drawn on.
# Variables sharing a key share one axis. The keys are sorted to decide which
# axis is the main one (the first key gets the canvas' own axes, the others
# get twin axes) and they are also looked up in the translation table to
# build the axis labels.
unit = {
    "bed_elevation": "0-meter",
    "bed_elevation_envelop": "0-meter",
    "water_elevation": "0-meter",
    "water_elevation_envelop": "0-meter",
    "discharge": "1-m3s",
    "discharge_envelop": "1-m3s",
    "velocity": "2-ms",
    "velocity_envelop": "2-ms",
    # Depths get their own axis, distinct from the elevation axis.
    "depth": "3-meter",
    "depth_envelop": "3-meter",
    "mean_depth": "3-meter",
    "froude": "4-dimensionless",
    "wet_area": "5-m2",
}
class CustomPlot(PamhyrPlot):
def __init__(self, x, y, envelop, reach, profile, timestamp,
             data=None, canvas=None, trad=None, res_id=0,
             toolbar=None, parent=None):
    """Create a custom result plot.

    Args:
        x: Name of the abscissa; the drawing code dispatches on "rk"
            and "time".
        y: Iterable of variable names to draw; each one must be a key of
            the module-level ``unit`` table.
        envelop: When truthy, min/max envelope curves are drawn as well.
        reach: Reach identifier handed to ``results.river.reach``.
        profile: Profile identifier handed to ``reach.profile``.
        timestamp: Currently selected timestamp.
        data: Collection of results, indexed by ``res_id``.
        canvas: Forwarded to ``PamhyrPlot``.
        trad: Accepted for interface compatibility only; a fresh
            ``CustomPlotTranslate`` is always handed to ``PamhyrPlot``.
        res_id: Index of the result set to show; index 2 is handled as a
            comparison of results 0 and 1 by the drawing methods.
        toolbar: Forwarded to ``PamhyrPlot``.
        parent: Forwarded to ``PamhyrPlot`` and kept in ``self._parent``.
    """
    super(CustomPlot, self).__init__(
        canvas=canvas,
        trad=CustomPlotTranslate(),
        data=data,
        toolbar=toolbar,
        parent=parent,
    )

    # Current selection.
    self._x, self._y = x, y
    self._envelop = envelop
    self._reach, self._profile = reach, profile
    self._timestamp = timestamp
    self._current_res_id = res_id
    self._parent = parent

    logger.debug(
        f"Create custom plot for: {x} -> {','.join(y)}: "
        f"reach={reach}, profile={profile}, timestamp={timestamp}"
    )

    # One y axis per distinct unit key, in sorted order.
    self._y_axes = sorted({unit[name] for name in self._y})

    self._axes = {}   # unit key -> matplotlib axes
    self.lines = {}   # variable name -> list of plotted lines
def draw_bottom_with_bedload(self, reach):
    """Return the current bed elevation of every profile of ``reach``.

    The bedrock elevation is recomputed with ``sl_compute_bedrock`` and
    cached in ``self._bedrock`` (``sl_compute_current_z`` reads it from
    there); the sediment layers of the current timestamp are then stacked
    on top of it.

    Args:
        reach: Result reach whose profiles carry sediment layer data.

    Returns:
        List with one elevation per profile.
    """
    self._bedrock = self.sl_compute_bedrock(reach)
    return self.sl_compute_current_z(reach)
def sl_compute_current_z(self, reach):
    """Stack the current sediment layers on top of the cached bedrock.

    Reads ``self._bedrock`` (one elevation per profile) and the layers
    returned by ``sl_compute_current_rk``; for each profile the first
    item of every layer is added to the bedrock elevation.

    Returns:
        List with one elevation per profile.
    """
    bedrock = self._bedrock
    layers_by_profile = self.sl_compute_current_rk(reach)

    elevations = []
    for base, layers in zip(bedrock, layers_by_profile):
        top = base
        for layer in layers:
            top = top + layer[0]
        elevations.append(top)
    return elevations
def sl_compute_bedrock(self, reach):
    """Compute the bedrock elevation of every profile of ``reach``.

    Starting from the minimum elevation of the geometry, the first item
    of every initial sediment layer (see ``sl_compute_initial``) is
    subtracted.

    Returns:
        List with one elevation per profile.
    """
    lowest_points = reach.geometry.get_z_min()
    initial_layers = self.sl_compute_initial(reach)

    bedrock = []
    for bottom, layers in zip(lowest_points, initial_layers):
        level = bottom
        for layer in layers:
            level = level - layer[0]
        bedrock.append(level)
    return bedrock
def sl_compute_initial(self, reach):
    """Return the sediment layers of every profile at the initial time.

    The initial time is the smallest timestamp of the current results.
    The returned value is a lazy iterator (it can be consumed only once)
    yielding, for each profile of ``reach``, the first item of its "sl"
    value at that timestamp.
    """
    results = self.data[self._current_res_id]
    first_timestamp = min(list(results.get("timestamps")))

    def initial_layers(profile):
        return profile.get_ts_key(first_timestamp, "sl")[0]

    return map(initial_layers, reach.profiles)
def sl_compute_current_rk(self, reach):
    """Return the sediment layers of every profile at the current time.

    The returned value is a lazy iterator (it can be consumed only once)
    yielding, for each profile of ``reach``, the first item of its "sl"
    value at ``self._timestamp``.
    """
    def current_layers(profile):
        return profile.get_ts_key(self._timestamp, "sl")[0]

    return map(current_layers, reach.profiles)
def get_ts_zmin(self, profile):
    """Return the bed elevation of one profile for every timestamp.

    Args:
        profile: Index of the profile inside the current reach.

    Returns:
        List with one elevation per timestamp: the bedrock elevation of
        the profile plus the sum of the layer heights read from its "sl"
        time series.
    """
    results = self.data[self._current_res_id]
    timestamp_count = len(list(results.get("timestamps")))
    reach = results.river.reach(self._reach)

    bedrock = self.sl_compute_bedrock(reach)
    layers = reach.profile(profile).get_key("sl")

    base = bedrock[profile]
    # The "sl" series is indexed as [time, 0, layer, 0] to get the heights.
    heights = asarray(layers)[:, 0, :, 0]

    elevations = []
    for _, row in zip(range(timestamp_count), heights):
        level = base
        for height in row:
            level = level + height
        elevations.append(level)
    return elevations
def _draw_rk(self):
    """Draw every selected variable along the reach (abscissa: rk).

    Values are taken at ``self._timestamp`` for each profile of the
    selected reach. When the current result id is 2 (comparison), depth,
    mean depth, Froude number and wet area are drawn as the difference
    between results 0 and 1. The created lines are stored in
    ``self.lines`` by variable name, the water fill in ``self.fill``, and
    a legend is attached to the main axes.
    """
    results = self.data[self._current_res_id]
    reach = results.river.reach(self._reach)
    compare = self._current_res_id == 2
    reach1 = reach2 = None
    if compare:
        reach1 = self.data[0].river.reach(self._reach)
        reach2 = self.data[1].river.reach(self._reach)

    rk = reach.geometry.get_rk()
    if reach.has_sediment():
        z_min = self.draw_bottom_with_bedload(reach)
    else:
        z_min = reach.geometry.get_z_min()

    def at_timestamp(key):
        """Value of ``key`` at the current timestamp, for each profile."""
        return [p.get_ts_key(self._timestamp, key) for p in reach.profiles]

    def series(compute):
        """Evaluate ``compute(profile)`` along the reach.

        In comparison mode the result is the element-wise difference
        between the values of results 0 and 1.
        """
        if not compare:
            return [compute(p) for p in reach.profiles]
        first = [compute(p) for p in reach1.profiles]
        second = [compute(p) for p in reach2.profiles]
        return [a - b for a, b in zip(first, second)]

    def level(p):
        return p.get_ts_key(self._timestamp, "Z")

    def froude(p):
        # NOTE(review): a zero wet width (dry section) would divide by
        # zero here -- confirm that the geometry never returns 0.
        velocity = p.get_ts_key(self._timestamp, "V")
        z_p = level(p)
        return velocity / sqrt(
            9.81 * (p.geometry.wet_area(z_p) / p.geometry.wet_width(z_p))
        )

    def draw_envelope(name, bound, color):
        """Draw the dotted max and min curves of one variable.

        ``bound(pick)`` must return the curve obtained with ``pick``
        (``max`` or ``min``). Only the max curve is registered in
        ``self.lines`` so that the legend gets a single envelope entry.
        """
        ax = self._axes[unit[name]]
        self.lines[name] = ax.plot(
            rk, bound(max),
            color=color, lw=1., linestyle='dotted',
        )
        ax.plot(
            rk, bound(min),
            color=color, lw=1., linestyle='dotted',
        )

    q = at_timestamp("Q")
    z = at_timestamp("Z")
    v = at_timestamp("V")

    # The first axis keeps its left spine; every other axis uses a right
    # spine, each one pushed 60 points further out than the previous one.
    shift = 0
    for index, key in enumerate(sorted(self._axes)):
        if index == 0:
            self._axes[key].spines['left'].set_position(('outward', shift))
        else:
            self._axes[key].spines['right'].set_position(('outward', shift))
            shift += 60

    if self.lines:
        self.lines = {}

    if "bed_elevation" in self._y:
        ax = self._axes[unit["bed_elevation"]]
        self.lines["bed_elevation"] = ax.plot(
            rk, z_min,
            color='grey', lw=1.,
        )

        if self._envelop and reach.has_sediment():
            draw_envelope(
                "bed_elevation_envelop",
                lambda pick: [
                    pick(self.get_ts_zmin(i)) for i in range(len(reach))
                ],
                'grey',
            )

    if "water_elevation" in self._y:
        ax = self._axes[unit["water_elevation"]]
        self.lines["water_elevation"] = ax.plot(
            rk, z, lw=1.,
            color='blue',
        )

        if "bed_elevation" in self._y and not compare:
            self.fill = ax.fill_between(
                rk, z_min, z,
                color='blue', alpha=0.5, interpolate=True
            )

        if self._envelop:
            draw_envelope(
                "water_elevation_envelop",
                lambda pick: [pick(p.get_key("Z")) for p in reach.profiles],
                'blue',
            )

    if "discharge" in self._y:
        ax = self._axes[unit["discharge"]]
        self.lines["discharge"] = ax.plot(
            rk, q, lw=1.,
            color='r',
        )

        if self._envelop:
            draw_envelope(
                "discharge_envelop",
                lambda pick: [pick(p.get_key("Q")) for p in reach.profiles],
                'r',
            )

    if "velocity" in self._y:
        ax = self._axes[unit["velocity"]]
        self.lines["velocity"] = ax.plot(
            rk, v, lw=1.,
            color='g',
        )

        if self._envelop:
            velocities = [p.get_key("V") for p in reach.profiles]
            draw_envelope(
                "velocity_envelop",
                lambda pick: [pick(values) for values in velocities],
                'g',
            )

    if "depth" in self._y:
        ax = self._axes[unit["depth"]]
        d = series(lambda p: p.geometry.max_water_depth(level(p)))
        self.lines["depth"] = ax.plot(
            rk, d,
            color='brown', lw=1.,
        )

        if self._envelop and not compare:
            # Envelope depth = extreme water elevation minus bed elevation.
            draw_envelope(
                "depth_envelop",
                lambda pick: [
                    pick(p.get_key("Z")) - bottom
                    for p, bottom in zip(reach.profiles, z_min)
                ],
                'brown',
            )

    if "mean_depth" in self._y:
        ax = self._axes[unit["mean_depth"]]
        d = series(lambda p: p.geometry.mean_water_depth(level(p)))
        self.lines["mean_depth"] = ax.plot(
            rk, d,
            color='orange', lw=1.,
        )

    if "froude" in self._y:
        ax = self._axes[unit["froude"]]
        fr = series(froude)
        self.lines["froude"] = ax.plot(
            rk, fr, color='black', linestyle='--', lw=1.,
        )

    if "wet_area" in self._y:
        ax = self._axes[unit["wet_area"]]
        d = series(lambda p: p.geometry.wet_area(level(p)))
        self.lines["wet_area"] = ax.plot(
            rk, d,
            color='blue', linestyle='--', lw=1.,
        )

    # Legend: one entry per registered variable, in drawing order.
    lns = []
    for name in self.lines:
        lns.extend(self.lines[name])
    labs = [self._trad[name] for name in self.lines]
    self.canvas.axes.legend(lns, labs, loc="best")
def _redraw_rk(self):
results = self.data[self._current_res_id]
reach = results.river.reach(self._reach)
if self._current_res_id == 2: # compare results
reach1 = self.data[0].river.reach(self._reach)
reach2 = self.data[1].river.reach(self._reach)
rk = reach.geometry.get_rk()
z_min = reach.geometry.get_z_min()
if reach.has_sediment():
z_min = self.draw_bottom_with_bedload(reach)
else:
z_min = reach.geometry.get_z_min()
q = list(
map(
lambda p: p.get_ts_key(self._timestamp, "Q"),
reach.profiles
)
)
z = list(
map(
lambda p: p.get_ts_key(self._timestamp, "Z"),
reach.profiles
)
)
v = list(
map(
lambda p: p.get_ts_key(self._timestamp, "V"),
reach.profiles
)
)
if "bed_elevation" in self._y:
self.lines["bed_elevation"][0].set_ydata(z_min)
if "water_elevation" in self._y:
self.lines["water_elevation"][0].set_ydata(z)
if "bed_elevation" in self._y and self._current_res_id != 2:
ax = self._axes[unit["water_elevation"]]
self.fill.remove()
self.fill = ax.fill_between(
rk, z_min, z,
color='blue', alpha=0.5, interpolate=True
)
if "discharge" in self._y:
self.lines["discharge"][0].set_ydata(q)
if "velocity" in self._y:
self.lines["velocity"][0].set_ydata(v)
if "depth" in self._y:
if self._current_res_id != 2:
d = list(
map(
lambda p: p.geometry.max_water_depth(
p.get_ts_key(self._timestamp, "Z")),
reach.profiles
)
)
else:
d1 = list(
map(
lambda p: p.geometry.max_water_depth(
p.get_ts_key(self._timestamp, "Z")),
reach1.profiles
)
)
d2 = list(
map(
lambda p: p.geometry.max_water_depth(
p.get_ts_key(self._timestamp, "Z")),
reach2.profiles
)
)
d = list(
map(
lambda x, y: x - y,
d1, d2
)
)
self.lines["depth"][0].set_ydata(d)
if "mean_depth" in self._y:
if self._current_res_id != 2:
d = list(
map(
lambda p: p.geometry.mean_water_depth(
p.get_ts_key(self._timestamp, "Z")),
reach.profiles
)
)
else:
d1 = list(
map(
lambda p: p.geometry.mean_water_depth(
p.get_ts_key(self._timestamp, "Z")),
reach1.profiles
)
)
d2 = list(
map(
lambda p: p.geometry.mean_water_depth(
p.get_ts_key(self._timestamp, "Z")),
reach2.profiles
)
)
d = list(
map(
lambda x, y: x - y,
d1, d2
)
)
self.lines["mean_depth"][0].set_ydata(d)
if "froude" in self._y:
if self._current_res_id != 2:
fr = list(
map(
lambda p:
p.get_ts_key(self._timestamp, "V") /
sqrt(9.81 * (
p.geometry.wet_area(
p.get_ts_key(self._timestamp, "Z")) /
p.geometry.wet_width(
p.get_ts_key(self._timestamp, "Z"))
)),
reach.profiles
)
)
else:
fr1 = list(
map(
lambda p:
p.get_ts_key(self._timestamp, "V") /
sqrt(9.81 * (
p.geometry.wet_area(
p.get_ts_key(self._timestamp, "Z")) /
p.geometry.wet_width(
p.get_ts_key(self._timestamp, "Z"))
)),
reach1.profiles
)
)
fr2 = list(
map(
lambda p:
p.get_ts_key(self._timestamp, "V") /
sqrt(9.81 * (
p.geometry.wet_area(
p.get_ts_key(self._timestamp, "Z")) /
p.geometry.wet_width(
p.get_ts_key(self._timestamp, "Z"))
)),
reach2.profiles
)
)
fr = list(
map(
lambda x, y: x - y,
fr1, fr2
)
)
self.lines["froude"][0].set_ydata(fr)
if "wet_area" in self._y:
if self._current_res_id != 2:
d = list(
map(
lambda p: p.geometry.wet_area(
p.get_ts_key(self._timestamp, "Z")),
reach.profiles
)
)
else:
d1 = list(
map(
lambda p: p.geometry.wet_area(
p.get_ts_key(self._timestamp, "Z")),
reach1.profiles
)
)
d2 = list(
map(
lambda p: p.geometry.wet_area(
p.get_ts_key(self._timestamp, "Z")),
reach2.profiles
)
)
d = list(
map(
lambda x, y: x - y,
d1, d2
)
)
self.lines["wet_area"][0].set_ydata(d)
def _customize_x_axes_time(self, ts, mode="time"):
# Custom time display
nb = len(ts)
mod = int(nb / 5)
mod = mod if mod > 0 else nb
fx = list(
map(
lambda x: x[1],
filter(
lambda x: x[0] % mod == 0,
enumerate(ts)
)
)
)
if mode == "time":
t0 = datetime.fromtimestamp(0)
xt = list(
map(
lambda v: (
str(
datetime.fromtimestamp(v) - t0
).split(",")[0]
.replace("days", self._trad["days"])
.replace("day", self._trad["day"])
),
fx
)
)
else:
xt = list(
map(
lambda v: str(datetime.fromtimestamp(v).date()),
fx
)
)
self.canvas.axes.set_xticks(ticks=fx, labels=xt, rotation=45)
def _draw_time(self):
    """Draw every selected variable against time for the current profile.

    The abscissa is ``self._parent._timestamps``. When the current result
    id is 2 (comparison), depth, mean depth, Froude number and wet area
    are drawn as the difference between results 0 and 1, each one
    evaluated with the geometry of its own profile. The created lines are
    stored in ``self.lines`` by variable name, the water fill in
    ``self.fill``, and a legend is attached to the main axes.
    """
    results = self.data[self._current_res_id]
    reach = results.river.reach(self._reach)
    profile = reach.profile(self._profile)
    compare = self._current_res_id == 2

    # The first axis keeps its left spine; every other axis uses a right
    # spine, each one pushed 60 points further out than the previous one.
    shift = 0
    for index, key in enumerate(sorted(self._axes)):
        if index == 0:
            self._axes[key].spines['left'].set_position(('outward', shift))
        else:
            self._axes[key].spines['right'].set_position(('outward', shift))
            shift += 60

    ts = self._parent._timestamps

    profile1 = profile2 = None
    z1 = v1 = z2 = v2 = None
    if compare:
        profile1 = self.data[0].river.reach(self._reach).profile(
            self._profile
        )
        profile2 = self.data[1].river.reach(self._reach).profile(
            self._profile
        )
        z1 = profile1.get_key("Z")
        v1 = profile1.get_key("V")
        z2 = profile2.get_key("Z")
        v2 = profile2.get_key("V")

    q = profile.get_key("Q")
    z = profile.get_key("Z")
    v = profile.get_key("V")

    z_min = profile.geometry.z_min()
    if reach.has_sediment():
        ts_z_min = self.get_ts_zmin(self._profile)
    else:
        # Without sediment the bed elevation does not change over time.
        ts_z_min = [z_min for _ in ts]

    def per_level(measure):
        """Evaluate ``measure(geometry, z)`` for every water level.

        In comparison mode the result is the element-wise difference
        between results 0 and 1, each with its own profile geometry.
        """
        if not compare:
            return [measure(profile.geometry, zi) for zi in z]
        first = [measure(profile1.geometry, zi) for zi in z1]
        second = [measure(profile2.geometry, zi) for zi in z2]
        return [a - b for a, b in zip(first, second)]

    def froude(prof, levels, velocities):
        # NOTE(review): a zero wet width (dry section) would divide by
        # zero here -- confirm that the geometry never returns 0.
        geometry = prof.geometry
        return [
            vi / sqrt(9.81 * (geometry.wet_area(zi) / geometry.wet_width(zi)))
            for zi, vi in zip(levels, velocities)
        ]

    self.lines = {}

    if "bed_elevation" in self._y:
        ax = self._axes[unit["bed_elevation"]]
        self.lines["bed_elevation"] = ax.plot(
            ts, ts_z_min,
            color='grey', lw=1.
        )

    if "water_elevation" in self._y:
        ax = self._axes[unit["water_elevation"]]
        self.lines["water_elevation"] = ax.plot(
            ts, z, lw=1.,
            color='b',
        )

        if "bed_elevation" in self._y and not compare:
            self.fill = ax.fill_between(
                ts, ts_z_min, z,
                color='blue', alpha=0.5, interpolate=True
            )

    if "discharge" in self._y:
        ax = self._axes[unit["discharge"]]
        self.lines["discharge"] = ax.plot(
            ts, q, lw=1.,
            color='r',
        )

    if "velocity" in self._y:
        ax = self._axes[unit["velocity"]]
        self.lines["velocity"] = ax.plot(
            ts, v, lw=1.,
            color='g',
        )

    if "depth" in self._y:
        ax = self._axes[unit["depth"]]
        d = per_level(lambda g, zi: g.max_water_depth(zi))
        self.lines["depth"] = ax.plot(
            ts, d,
            color='brown', lw=1.,
        )

    if "mean_depth" in self._y:
        ax = self._axes[unit["mean_depth"]]
        d = per_level(lambda g, zi: g.mean_water_depth(zi))
        self.lines["mean_depth"] = ax.plot(
            ts, d,
            color='orange', lw=1.,
        )

    if "froude" in self._y:
        ax = self._axes[unit["froude"]]
        if not compare:
            d = froude(profile, z, v)
        else:
            d1 = froude(profile1, z1, v1)
            d2 = froude(profile2, z2, v2)
            d = [a - b for a, b in zip(d1, d2)]
        self.lines["froude"] = ax.plot(
            ts, d, color='black', linestyle='--', lw=1.,
        )

    if "wet_area" in self._y:
        ax = self._axes[unit["wet_area"]]
        d = per_level(lambda g, zi: g.wet_area(zi))
        self.lines["wet_area"] = ax.plot(
            ts, d, color='blue', linestyle='--', lw=1.,
        )

    self._customize_x_axes_time(ts)

    # Legend: one entry per registered variable, in drawing order.
    lns = []
    for name in self.lines:
        lns.extend(self.lines[name])
    labs = [self._trad[name] for name in self.lines]
    self.canvas.axes.legend(lns, labs, loc="best")
def _redraw_time(self):
results = self.data[self._current_res_id]
reach = results.river.reach(self._reach)
profile = reach.profile(self._profile)
ts = list(results.get("timestamps"))
ts.sort()
if self._current_res_id == 2: # compare results
reach1 = self.data[0].river.reach(self._reach)
reach2 = self.data[1].river.reach(self._reach)
profile1 = reach1.profile(self._profile)
profile2 = reach2.profile(self._profile)
q1 = profile1.get_key("Q")
z1 = profile1.get_key("Z")
v1 = profile1.get_key("V")
q2 = profile2.get_key("Q")
z2 = profile2.get_key("Z")
v2 = profile2.get_key("V")
q = profile.get_key("Q")
z = profile.get_key("Z")
v = profile.get_key("V")
if reach.has_sediment():
ts_z_min = self.get_ts_zmin(self._profile)
else:
z_min = profile.geometry.z_min()
ts_z_min = list(
map(
lambda ts: z_min,
ts
)
)
if "bed_elevation" in self._y:
self.lines["bed_elevation"][0].set_ydata(ts_z_min)
if "water_elevation" in self._y:
self.lines["water_elevation"][0].set_ydata(z)
if "bed_elevation" in self._y and self._current_res_id != 2:
ax = self._axes[unit["bed_elevation"]]
self.fill.remove()
self.fill = ax.fill_between(
ts, ts_z_min, z,
color='blue', alpha=0.5, interpolate=True
)
if "discharge" in self._y:
self.lines["discharge"][0].set_ydata(q)
if "velocity" in self._y:
self.lines["velocity"][0].set_ydata(v)
if "depth" in self._y:
if self._current_res_id != 2:
d = list(
map(lambda z: profile.geometry.max_water_depth(z), z)
)
else:
d1 = list(
map(lambda z: profile1.geometry.max_water_depth(z), z1)
)
d2 = list(
map(lambda z: profile2.geometry.max_water_depth(z), z2)
)
d = list(
map(
lambda x, y: x - y,
d1, d2
)
)
self.lines["depth"][0].set_ydata(d)
if "mean_depth" in self._y:
if self._current_res_id != 2:
d = list(
map(lambda z: profile.geometry.mean_water_depth(z), z)
)
else:
d1 = list(
map(lambda z: profile1.geometry.mean_water_depth(z), z1)
)
d2 = list(
map(lambda z: profile2.geometry.mean_water_depth(z), z2)
)
d = list(
map(
lambda x, y: x - y,
d1, d2
)
)
self.lines["mean_depth"][0].set_ydata(d)
if "froude" in self._y:
if self._current_res_id != 2:
d = list(
map(lambda z, v:
v /
sqrt(9.81 * (
profile.geometry.wet_area(z) /
profile.geometry.wet_width(z))
), z, v)
)
else:
d1 = list(
map(lambda z, v:
v /
sqrt(9.81 * (
profile.geometry.wet_area(z) /
profile.geometry.wet_width(z))
), z1, v1)
)
d2 = list(
map(lambda z, v:
v /
sqrt(9.81 * (
profile.geometry.wet_area(z) /
profile.geometry.wet_width(z))
), z2, v2)
)
d = list(
map(
lambda x, y: x - y,
d1, d2
)
)
self.lines["froude"][0].set_ydata(d)
if "wet_area" in self._y:
if self._current_res_id != 2:
d = list(
map(lambda z: profile.geometry.wet_area(z), z)
)
else:
d1 = list(
map(lambda z: profile.geometry.wet_area(z), z1)
)
d2 = list(
map(lambda z: profile.geometry.wet_area(z), z2)
)
d = list(
map(
lambda x, y: x - y,
d1, d2
)
)
self.lines["wet_area"][0].set_ydata(d)
self.canvas.axes.relim(visible_only=True)
self.canvas.axes.autoscale_view()
def draw(self):
    """Draw the plot from scratch (delegates to ``draw_static``)."""
    self.draw_static()
def draw_update(self):
    """Refresh the existing curves according to the current abscissa.

    Nothing happens when ``self._x`` is neither "rk" nor "time".
    """
    if self._x == "rk":
        self._redraw_rk()
        return

    if self._x == "time":
        self._redraw_time()
@timer
def draw_static(self):
    """Rebuild the whole figure: axes, labels, curves and cursor.

    The main axes are cleared, one y axis is prepared per unit key of the
    selected variables (the first sorted key uses the canvas' own axes,
    the others use twin axes that are created once and reused), the
    curves are drawn with ``_draw_rk`` or ``_draw_time`` and a dashed
    vertical cursor is added at the current profile or timestamp. In
    comparison mode (result id 2) the y labels are prefixed with "Δ ".

    Nothing but the cleared, gridded axes is produced when ``self.data``
    is None.
    """
    self.canvas.axes.cla()
    self.canvas.axes.grid(color='grey', linestyle='--', linewidth=0.5)

    if self.data is None:
        return

    self._y_axes = sorted({unit[name] for name in self._y})
    compare = self._current_res_id == 2

    def axis_label(key):
        text = self._trad[key]
        return "Δ " + text if compare else text

    self.canvas.axes.set_xlabel(
        self._trad[self._x],
        color='black', fontsize=10
    )

    main_key = self._y_axes[0]
    self.canvas.axes.set_ylabel(
        axis_label(main_key),
        color='black', fontsize=10
    )
    self._axes[main_key] = self.canvas.axes

    self.lines.clear()

    for key in self._y_axes[1:]:
        logger.debug("axes: %s", key)
        if key in self._axes:
            # Reused twin axes are not cleared by cla() on the main axes.
            # Iterate over a copy: removing a line edits the axes' list.
            for line in list(self._axes[key].lines):
                line.remove()
        else:
            self._axes[key] = self.canvas.axes.twinx()

        self._axes[key].set_ylabel(
            axis_label(key),
            color='black', fontsize=10
        )

    if self._x == "rk":
        self._draw_rk()
        reach = self.data[self._current_res_id].river.reach(self._reach)
        x = reach.profile(self._profile).rk
    elif self._x == "time":
        self._draw_time()
        x = self._timestamp

    # Vertical cursor marking the current profile / timestamp.
    self._current, = self.canvas.axes.plot(
        [x, x],
        self.canvas.axes.get_ylim(),
        color='grey',
        linestyle="dashed",
        lw=1.,
    )

    if self.toolbar is not None:
        self.toolbar.update()

    self.canvas.draw_idle()
@timer
def update(self):
    """Refresh the curves in place, then move the cursor line."""
    self.draw_update()
    self.draw_current()
def set_reach(self, reach_id):
    """Select another reach, reset the profile to the first one, redraw.

    On the "rk" abscissa the x values (and possibly the number of points)
    of every curve depend on the reach, and the in-place update only
    replaces y data; the figure is therefore rebuilt from scratch in that
    case. On the other abscissas an in-place update is enough.

    Args:
        reach_id: Identifier of the reach to display.
    """
    self._reach = reach_id
    self._profile = 0

    if self._x == "rk":
        self.draw()
    else:
        self.update()
def set_profile(self, profile_id):
    """Select another profile.

    On the "rk" abscissa the curves do not depend on the profile, so only
    the cursor is moved; otherwise the curves are refreshed as well.

    Args:
        profile_id: Identifier of the profile to display.
    """
    self._profile = profile_id

    if self._x == "rk":
        self.draw_current()
    else:
        self.update()
def set_result(self, res_id):
    """Select another result set and redraw the figure from scratch.

    Args:
        res_id: Index of the result set inside ``self.data``.
    """
    self._current_res_id = res_id
    self.draw()
def set_timestamp(self, timestamp):
    """Select another timestamp.

    On the "time" abscissa the curves do not depend on the timestamp, so
    only the cursor is moved; otherwise the curves are refreshed as well.

    Args:
        timestamp: Timestamp to display.
    """
    self._timestamp = timestamp

    if self._x == "time":
        self.draw_current()
    else:
        self.update()
def draw_current(self):
    """Move the vertical cursor to the current profile or timestamp.

    The cursor line (``self._current``) is stretched over the current y
    limits of the main axes and a canvas redraw is requested.
    """
    if self._x == "rk":
        results = self.data[self._current_res_id]
        position = results.river.reach(self._reach).profile(
            self._profile
        ).rk
    elif self._x == "time":
        position = self._timestamp

    y_limits = self.canvas.axes.get_ylim()
    self._current.set_data([position, position], y_limits)
    self.canvas.draw_idle()