Merge branch 'master' of gitlab-ssh.irstea.fr:theophile.terraz/pamhyr

setup.py
Theophile Terraz 2023-11-02 16:44:02 +01:00
commit 90776f0af9
47 changed files with 2057 additions and 589 deletions

View File

@ -16,8 +16,9 @@
stages:
- downloads
- build
- configure
- test
- build
- package
- release
@ -59,12 +60,12 @@ dl-mage-windows:
- mage-windows/mage_extraire.exe
- mage-windows/mailleurPF.exe
#########
# BUILD #
#########
#############
# CONFIGURE #
#############
set-version:
stage: build
stage: configure
tags:
- linux
script:
@ -75,7 +76,7 @@ set-version:
- VERSION
build-lang:
stage: build
stage: configure
tags:
- linux
script:
@ -85,6 +86,49 @@ build-lang:
paths:
- src/lang/*.qm
#########
# TESTS #
#########
unittest:
stage: test
tags:
- linux
needs:
- job: set-version
artifacts: true
script:
- python3 -m venv venv
- . venv/bin/activate
- pip3 install -U pip
- pip3 install -r ./full-requirements.txt
- pip3 install -U -r ./full-requirements.txt
- cd src
- python3 -m unittest discover -t .
test-pep8:
stage: test
tags:
- linux
needs:
- job: set-version
artifacts: true
- job: unittest
script:
# Setup virtual env
- python3 -m venv venv
- . venv/bin/activate
- pip3 install -U pip
- pip3 install -r ./requirements.txt
- pip3 install -U -r ./requirements.txt
- pip3 install pycodestyle
- pycodestyle ./src
allow_failure: true
#########
# BUILD #
#########
build-users-doc:
stage: build
tags:
@ -242,56 +286,6 @@ build-windows:
paths:
- windows/pamhyr
#########
# TESTS #
#########
test-pep8:
stage: test
tags:
- linux
needs:
- job: set-version
artifacts: true
script:
- mkdir -p pep8
- cd pep8
# Setup virtual env
- python3 -m venv venv
- . venv/bin/activate
- pip3 install -U pip
- pip3 install -r ../requirements.txt
- pip3 install -U -r ../requirements.txt
- pip3 install pycodestyle
- pycodestyle ../src
allow_failure: true
# test-windows:
# stage: test
# tags:
# - wine
# needs:
# - job: set-version
# artifacts: true
# - job: build-windows
# artifacts: true
# script:
# - cd windows\pamhyr
# - pamhyr\pamhyr.exe hello
# test-linux:
# stage: test
# tags:
# - linux
# needs:
# - job: set-version
# artifacts: true
# - job: build-linux
# artifacts: true
# script:
# - cd linux/pamhyr
# - ./Pamhyr2 hello
############
# PACKAGES #
############

View File

@ -174,7 +174,7 @@ class Data(SQLSubModel):
def _update_from_kp(self):
min = self._update_get_min()
self._elevation = min - self._height
self._elevation = min + self._height
def _update_from_elevation(self):
min = self._update_get_min()

View File

@ -290,3 +290,11 @@ class Study(SQLModel):
self._save_submodel([self._river])
self.commit()
def close(self):
"""Close db connection
Returns:
Nothing.
"""
self._close()

17
src/Model/__init__.py Normal file
View File

@ -0,0 +1,17 @@
# __init__.py -- Pamhyr
# Copyright (C) 2023 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-

110
src/Model/test_Model.py Normal file
View File

@ -0,0 +1,110 @@
# test_Model.py -- Pamhyr
# Copyright (C) 2023 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import os
import unittest
import tempfile
from Model.Saved import SavedStatus
from Model.Study import Study
from Model.River import River
class StudyTestCase(unittest.TestCase):
def test_create_study(self):
study = Study.new("foo", "bar")
self.assertEqual(study.name, "foo")
self.assertEqual(study.description, "bar")
def test_open_study(self):
study = Study.open("../tests_cases/Enlargement/Enlargement.pamhyr")
self.assertNotEqual(study, None)
self.assertEqual(study.name, "Enlargement")
def test_save_open_study(self):
study = Study.new("foo", "bar")
dir = tempfile.mkdtemp()
f = os.path.join(dir, "foo.pamhyr")
# Save study
study.filename = f
study.save()
study.close()
# Reopen study
study = Study.open(f)
# Check
self.assertNotEqual(study, None)
self.assertEqual(study.name, "foo")
self.assertEqual(study.description, "bar")
def test_create_study_river(self):
study = Study.new("foo", "bar")
self.assertNotEqual(study.river, None)
class RiverTestCase(unittest.TestCase):
def test_create_river(self):
status = SavedStatus()
river = River(status=status)
self.assertNotEqual(river, None)
def test_create_river_nodes(self):
status = SavedStatus()
river = River(status=status)
self.assertNotEqual(river, None)
# Add nodes
n0 = river.add_node()
n1 = river.add_node(x=1.0, y=0.0)
n2 = river.add_node(x=0.0, y=1.0)
# Checks
self.assertEqual(river.nodes_counts(), 3)
nodes = river.nodes()
self.assertEqual(nodes[0], n0)
self.assertEqual(nodes[1], n1)
self.assertEqual(nodes[2], n2)
def test_create_river_edges(self):
status = SavedStatus()
river = River(status=status)
self.assertNotEqual(river, None)
# Add nodes
n0 = river.add_node()
n1 = river.add_node(x=1.0, y=0.0)
n2 = river.add_node(x=0.0, y=1.0)
self.assertEqual(river.nodes_counts(), 3)
# Add edges
e0 = river.add_edge(n0, n1)
e1 = river.add_edge(n1, n2)
# Checks
self.assertEqual(river.edges_counts(), 2)
edges = river.edges()
self.assertEqual(edges[0], e0)
self.assertEqual(edges[1], e1)

38
src/Scripts/ListSolver.py Normal file
View File

@ -0,0 +1,38 @@
# ListSolver.py -- Pamhyr
# Copyright (C) 2023 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import os
import logging
from Scripts.AScript import AScript
logger = logging.getLogger()
class ScriptListSolver(AScript):
name = "ListSolver"
description = "List configured solver(s) for Pamhyr2"
def usage(self):
logger.info(f"Usage : {self._args[0]} {self._args[1]}")
def run(self):
for solver in self._conf.solvers:
print(f"{solver.name:<16} ({solver.type}): {solver.description}")
return 0

113
src/Scripts/Run.py Normal file
View File

@ -0,0 +1,113 @@
# Run.py -- Pamhyr
# Copyright (C) 2023 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import os
import logging
from queue import Queue
from PyQt5.QtCore import QProcess
from Scripts.AScript import AScript
from Model.Study import Study
logger = logging.getLogger()
class ScriptRun(AScript):
name = "Run"
description = "Run solver on Pamhyr2 a study"
def usage(self):
logger.info(
f"Usage : {self._args[0]} {self._args[1]} " +
"<SOLVER> <STUDY>"
)
def run(self):
if len(self._args) < 4:
return 1
command = self._args[1]
solver_name = self._args[2]
study_file = os.path.abspath(
self._args[3]
)
try:
solver = next(
filter(
lambda solver: solver.name == solver_name,
self._conf.solvers
)
)
except Exception as e:
logger.error(f"No solver found: {e}")
return 2
study = Study.open(study_file)
self._solver = solver
self._study = study
logger.info(
f"Run {solver.name} ({solver.type}) " +
"on study '{study.name}' ({study_file})"
)
# Workdir
workdir = os.path.join(
os.path.dirname(study.filename),
"_PAMHYR_",
study.name.replace(" ", "_"),
solver.name.replace(" ", "_")
)
os.makedirs(workdir, exist_ok=True)
logger.info(f"Set working dir to {workdir}")
# Preparate process
p = QProcess(None)
p.setWorkingDirectory(workdir)
self._q = Queue()
# Export and Run
logger.info(f"~Export~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
solver.export(study, workdir, qlog=self._q)
while self._q.qsize() != 0:
s = self._q.get()
logger.info(s)
if command == "run":
logger.info(f"~Run~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
solver.run(
study,
process=p,
output_queue=self._q
)
p.waitForFinished()
logger.info(f"~End~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
return 0
class ScriptExport(ScriptRun):
name = "Export"
description = "Export Pamhyr2 study for solver"

View File

@ -59,17 +59,6 @@ class AbstractSolver(object):
self._name = name
self._description = ""
self._path_input = ""
self._path_solver = ""
self._path_output = ""
self._cmd_input = ""
self._cmd_solver = ""
self._cmd_output = ""
self._process = None
self._output = None
# Last study running
self._study = None
@ -92,7 +81,6 @@ class AbstractSolver(object):
("all_init_time", "000:00:00:00"),
("all_final_time", "999:99:00:00"),
("all_timestep", "300.0"),
("all_command_line_arguments", ""),
]
return lst
@ -141,18 +129,6 @@ class AbstractSolver(object):
def description(self, description):
self._description = description
def set_input(self, path, cmd):
self._path_input = path
self._cmd_input = cmd
def set_solver(self, path, cmd):
self._path_solver = path
self._cmd_solver = cmd
def set_output(self, path, cmd):
self._path_output = path
self._cmd_output = cmd
##########
# Export #
##########
@ -160,41 +136,6 @@ class AbstractSolver(object):
def export(self, study, repertory, qlog=None):
raise NotImplementedMethodeError(self, self.export)
def cmd_args(self, study):
"""Return solver command line arguments list
Returns:
Command line arguments list
"""
params = study.river.get_params(self.type)
args = params.get_by_key("all_command_line_arguments")
return args.split(" ")
def input_param(self):
"""Return input command line parameter(s)
Returns:
Returns input parameter(s) string
"""
raise NotImplementedMethodeError(self, self.input_param)
def output_param(self):
"""Return output command line parameter(s)
Returns:
Returns output parameter(s) string
"""
raise NotImplementedMethodeError(self, self.output_param)
def log_file(self):
"""Return log file name
Returns:
Returns log file name as string
"""
raise NotImplementedMethodeError(self, self.log_file)
###########
# RESULTS #
###########
@ -208,192 +149,17 @@ class AbstractSolver(object):
# Run #
#######
def _install_dir(self):
return os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"..", ".."
)
)
def _format_command(self, study, cmd, path=""):
"""Format command line
Args:
cmd: The command line
path: Optional path string (replace @path in cmd)
Returns:
The executable and list of arguments
"""
# HACK: Works in most case... Trust me i'm an engineer
cmd = cmd.replace("@install_dir", self._install_dir())
cmd = cmd.replace("@path", path.replace(" ", "%20"))
cmd = cmd.replace("@input", self.input_param())
cmd = cmd.replace("@output", self.output_param())
cmd = cmd.replace("@dir", self._process.workingDirectory())
cmd = cmd.replace("@args", " ".join(self.cmd_args(study)))
logger.debug(f"! {cmd}")
if cmd[0] == "\"":
# Command line executable path is between " char
cmd = cmd.split("\"")
exe = cmd[1].replace("%20", " ")
args = list(
filter(
lambda s: s != "",
"\"".join(cmd[2:]).split(" ")[1:]
)
)
else:
# We suppose the command line executable path as no space char
cmd = cmd.replace("\\ ", "&_&").split(" ")
exe = cmd[0].replace("&_&", " ")
args = list(
filter(
lambda s: s != "",
map(lambda s: s.replace("&_&", " "), cmd[1:])
)
)
logger.info(f"! {exe} {args}")
return exe, args
def run_input_data_fomater(self, study):
if self._cmd_input == "":
self._run_next(study)
return True
cmd = self._cmd_input
exe, args = self._format_command(study, cmd, self._path_input)
if not os.path.exists(exe):
error = f"[ERROR] Path {exe} do not exists"
logger.info(error)
return error
self._process.start(
exe, args,
)
return True
def run_solver(self, study):
if self._cmd_solver == "":
self._run_next(study)
return True
cmd = self._cmd_solver
exe, args = self._format_command(study, cmd, self._path_solver)
if not os.path.exists(exe):
error = f"[ERROR] Path {exe} do not exists"
logger.info(error)
return error
self._process.start(
exe, args,
)
self._status = STATUS.RUNNING
return True
def run_output_data_fomater(self, study):
if self._cmd_output == "":
self._run_next(study)
return True
cmd = self._cmd_output
exe, args = self._format_command(study, cmd, self._path_output)
if not os.path.exists(exe):
error = f"[ERROR] Path {exe} do not exists"
logger.info(error)
return error
self._process.start(
exe, args,
)
return True
def _data_ready(self):
s = self._process.readAll().data().decode()
if self._output is not None:
for x in s.split('\n'):
self._output.put(x)
def _run_next(self, study):
self._step += 1
if self._step < len(self._runs):
res = self._runs[self._step](study)
if res is not True:
self._output.put(res)
else:
self._status = STATUS.STOPED
def _finished(self, study, exit_code, exit_status):
if self._output is not None:
self._output.put(exit_code)
self._run_next(study)
def run(self, study, process=None, output_queue=None):
if process is not None:
self._process = process
if output_queue is not None:
self._output = output_queue
self._process.readyRead.connect(self._data_ready)
self._process.finished.connect(
lambda c, s: self._finished(study, c, s))
self._runs = [
self.run_input_data_fomater,
self.run_solver,
self.run_output_data_fomater,
]
self._step = 0
# Run first step
res = self._runs[0](study)
if res is not True:
self._output.put(res)
def run(self, study):
raise NotImplementedMethodeError(self, self.run)
def kill(self):
if self._process is None:
return True
self._process.kill()
self._status = STATUS.STOPED
return True
raise NotImplementedMethodeError(self, self.kill)
def start(self, study, process=None):
if _signal:
# Solver is PAUSED, so continue execution
if self._status == STATUS.PAUSED:
os.kill(self._process.pid(), SIGCONT)
self._status = STATUS.RUNNING
return True
self.run(study, process)
return True
raise NotImplementedMethodeError(self, self.start)
def pause(self):
if _signal:
if self._process is None:
return False
# Send SIGSTOP to PAUSED solver
os.kill(self._process.pid(), SIGSTOP)
self._status = STATUS.PAUSED
return True
return False
raise NotImplementedMethodeError(self, self.pause)
def stop(self):
if self._process is None:
return False
self._process.terminate()
self._status = STATUS.STOPED
return True
raise NotImplementedMethodeError(self, self.stop)

309
src/Solver/CommandLine.py Normal file
View File

@ -0,0 +1,309 @@
# CommandLine.py -- Pamhyr
# Copyright (C) 2023 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import os
import logging
from tools import timer, parse_command_line
try:
# Installation allow Unix-like signal
from signal import SIGTERM, SIGSTOP, SIGCONT
_signal = True
except Exception:
_signal = False
from enum import Enum
from Model.Except import NotImplementedMethodeError
from Model.Results.Results import Results
from Model.Results.River.River import River, Reach, Profile
from Solver.ASolver import AbstractSolver, STATUS
logger = logging.getLogger()
class CommandLineSolver(AbstractSolver):
_type = ""
def __init__(self, name):
super(CommandLineSolver, self).__init__(name)
self._current_process = None
self._status = STATUS.NOT_LAUNCHED
self._path_input = ""
self._path_solver = ""
self._path_output = ""
self._cmd_input = ""
self._cmd_solver = ""
self._cmd_output = ""
self._process = None
self._output = None
# Last study running
self._study = None
@classmethod
def default_parameters(cls):
lst = super(CommandLineSolver, cls).default_parameters()
lst += [
("all_command_line_arguments", ""),
]
return lst
def set_input(self, path, cmd):
self._path_input = path
self._cmd_input = cmd
def set_solver(self, path, cmd):
self._path_solver = path
self._cmd_solver = cmd
def set_output(self, path, cmd):
self._path_output = path
self._cmd_output = cmd
##########
# Export #
##########
def cmd_args(self, study):
"""Return solver command line arguments list
Returns:
Command line arguments list
"""
params = study.river.get_params(self.type)
args = params.get_by_key("all_command_line_arguments")
return args.split(" ")
def input_param(self):
"""Return input command line parameter(s)
Returns:
Returns input parameter(s) string
"""
raise NotImplementedMethodeError(self, self.input_param)
def output_param(self):
"""Return output command line parameter(s)
Returns:
Returns output parameter(s) string
"""
raise NotImplementedMethodeError(self, self.output_param)
def log_file(self):
"""Return log file name
Returns:
Returns log file name as string
"""
raise NotImplementedMethodeError(self, self.log_file)
#######
# Run #
#######
def _install_dir(self):
return os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"..", ".."
)
)
def _format_command(self, study, cmd, path=""):
"""Format command line
Args:
cmd: The command line
path: Optional path string (replace @path in cmd)
Returns:
The executable and list of arguments
"""
cmd = cmd.replace("@install_dir", self._install_dir())
cmd = cmd.replace("@path", "\"" + path + "\"")
cmd = cmd.replace("@input", self.input_param())
cmd = cmd.replace("@output", self.output_param())
cmd = cmd.replace("@dir", self._process.workingDirectory())
cmd = cmd.replace("@args", " ".join(self.cmd_args(study)))
logger.debug(f"! {cmd}")
words = parse_command_line(cmd)
exe = words[0]
args = words[1:]
logger.info(f"! {exe} {args}")
return exe, args
def run_input_data_fomater(self, study):
if self._cmd_input == "":
self._run_next(study)
return True
cmd = self._cmd_input
exe, args = self._format_command(study, cmd, self._path_input)
if not os.path.exists(exe):
error = f"[ERROR] Path {exe} do not exists"
logger.info(error)
return error
self._process.start(
exe, args,
)
return True
def run_solver(self, study):
if self._cmd_solver == "":
self._run_next(study)
return True
cmd = self._cmd_solver
exe, args = self._format_command(study, cmd, self._path_solver)
if not os.path.exists(exe):
error = f"[ERROR] Path {exe} do not exists"
logger.info(error)
return error
self._process.start(
exe, args,
)
self._process.waitForStarted()
self._status = STATUS.RUNNING
return True
def run_output_data_fomater(self, study):
if self._cmd_output == "":
self._run_next(study)
return True
cmd = self._cmd_output
exe, args = self._format_command(study, cmd, self._path_output)
if not os.path.exists(exe):
error = f"[ERROR] Path {exe} do not exists"
logger.info(error)
return error
self._process.start(
exe, args,
)
return True
def _data_ready(self):
# Read process output and put lines in queue
s = self._process.readAll().data().decode()
if self._output is not None:
for x in s.split('\n'):
self._output.put(x)
def _run_next(self, study):
self._step += 1
if self._step < len(self._runs):
res = self._runs[self._step](study)
if res is not True:
self._output.put(res)
else:
self._status = STATUS.STOPED
def _finished(self, study, exit_code, exit_status):
if self._output is not None:
self._output.put(exit_code)
self._run_next(study)
def run(self, study, process=None, output_queue=None):
self._study = study
# Replace old values if needed
if process is not None:
self._process = process
if output_queue is not None:
self._output = output_queue
# Connect / reconnect signal
self._process.readyRead.connect(self._data_ready)
self._process.finished.connect(
lambda c, s: self._finished(study, c, s))
# Prepare running step
self._runs = [
self.run_input_data_fomater,
self.run_solver,
self.run_output_data_fomater,
]
self._step = 0
# Run first step
res = self._runs[0](study)
if res is not True:
self._output.put(res)
def kill(self):
if self._process is None:
return True
self._process.kill()
self._status = STATUS.STOPED
return True
def start(self, study, process=None):
if _signal:
# Solver is PAUSED, so continue execution
if self._status == STATUS.PAUSED:
os.kill(self._process.pid(), SIGCONT)
self._status = STATUS.RUNNING
return True
self.run(study, process)
return True
def pause(self):
if _signal:
if self._process is None:
return False
# Send SIGSTOP to PAUSED solver
os.kill(self._process.pid(), SIGSTOP)
self._status = STATUS.PAUSED
return True
return False
def stop(self):
if self._process is None:
return False
self._process.terminate()
self._status = STATUS.STOPED
return True

View File

@ -16,12 +16,10 @@
# -*- coding: utf-8 -*-
from Solver.ASolver import (
AbstractSolver, STATUS
)
from Solver.CommandLine import CommandLineSolver
class GenericSolver(AbstractSolver):
class GenericSolver(CommandLineSolver):
_type = "generic"
def __init__(self, name):

View File

@ -22,7 +22,7 @@ import numpy as np
from tools import timer
from Solver.ASolver import AbstractSolver
from Solver.CommandLine import CommandLineSolver
from Checker.Mage import MageNetworkGraphChecker
from Model.Results.Results import Results
@ -41,7 +41,7 @@ def mage_file_open(filepath, mode):
return f
class Mage(AbstractSolver):
class Mage(CommandLineSolver):
_type = "mage"
def __init__(self, name):
@ -125,7 +125,6 @@ class Mage(AbstractSolver):
qlog.put("Export ST file")
os.makedirs(os.path.join(repertory, "net"), exist_ok=True)
gra_file = f"{name}.GRA"
# Write header
edges = study.river.edges()
@ -149,72 +148,83 @@ class Mage(AbstractSolver):
cnt_num = 1
for profile in edge.reach.profiles:
num = f"{cnt_num:>6}"
c1 = f"{profile.code1:>6}"
c2 = f"{profile.code2:>6}"
t = f"{len(profile.points):>6}"
kp = f"{profile.kp:>12f}"[0:12]
pname = profile.name
if profile.name == "":
# Generate name from profile id prefixed with
# 'p' (and replace space char with '0' char)
pname = f"p{profile.id:>3}".replace(" ", "0")
name = f"{pname:<19}"
# Generate sediment additional data if available
sediment = ""
if profile.sl is not None:
if not any(filter(lambda f: ".GRA" in f, files)):
files.append(gra_file)
# Number of layers
nl = len(profile.sl)
sediment = f" {nl:>3}"
# Layers data
for layer in profile.sl.layers:
sediment += (
f" {layer.height:>10} {layer.d50:>10} " +
f"{layer.sigma:>10} " +
f"{layer.critical_constraint:>10}"
)
# Profile header line
f.write(f"{num}{c1}{c2}{t} {kp} {name} {sediment}\n")
self._export_ST_profile_header(
f, files, profile, cnt_num
)
cnt_num += 1
# Points
for point in profile.points:
x = f"{point.x:<12f}"[0:12]
y = f"{point.y:<12f}"[0:12]
z = f"{point.z:<12f}"[0:12]
n = f"{point.name:<3}"
# Generate sediment additional data if available
sediment = ""
prev = point.z
if point.sl is not None:
# Number of layers
nl = len(point.sl)
sediment = f"{nl:>3}"
# Layers data
for layer in point.sl.layers:
prev = round(prev - layer.height, 5)
sediment += (
f" {prev:>10} {layer.d50:>10} " +
f"{layer.sigma:>10} " +
f"{layer.critical_constraint:>10}"
)
# Point line
f.write(f"{x} {y} {z}{n} {sediment}\n")
self._export_ST_point_line(
f, files, point
)
# Profile last line
f.write(f" 999.9990 999.9990 999.9990\n")
return files
def _export_ST_profile_header(self, wfile, files,
profile, cnt):
num = f"{cnt:>6}"
c1 = f"{profile.code1:>6}"
c2 = f"{profile.code2:>6}"
t = f"{len(profile.points):>6}"
kp = f"{profile.kp:>12f}"[0:12]
pname = profile.name
if profile.name == "":
# Generate name from profile id prefixed with
# 'p' (and replace space char with '0' char)
pname = f"p{profile.id:>3}".replace(" ", "0")
name = f"{pname:<19}"
# Generate sediment additional data if available
sediment = ""
if profile.sl is not None:
if not any(filter(lambda f: ".GRA" in f, files)):
files.append(self._gra_file)
# Number of layers
nl = len(profile.sl)
sediment = f" {nl:>3}"
# Layers data
for layer in profile.sl.layers:
sediment += (
f" {layer.height:>10} {layer.d50:>10} " +
f"{layer.sigma:>10} " +
f"{layer.critical_constraint:>10}"
)
# Profile header line
wfile.write(f"{num}{c1}{c2}{t} {kp} {name} {sediment}\n")
def _export_ST_point_line(self, wfile, files, point):
x = f"{point.x:<12f}"[0:12]
y = f"{point.y:<12f}"[0:12]
z = f"{point.z:<12f}"[0:12]
n = f"{point.name:<3}"
# Generate sediment additional data if available
sediment = ""
prev = point.z
if point.sl is not None:
# Number of layers
nl = len(point.sl)
sediment = f"{nl:>3}"
# Layers data
for layer in point.sl.layers:
prev = round(prev - layer.height, 5)
sediment += (
f" {prev:>10} {layer.d50:>10} " +
f"{layer.sigma:>10} " +
f"{layer.critical_constraint:>10}"
)
# Point line
wfile.write(f"{x} {y} {z} {n} {sediment}\n")
@timer
def _export_BC(self, t, bounds, repertory, qlog, name="0"):
files = []
@ -429,6 +439,10 @@ class Mage(AbstractSolver):
self._study = study
name = study.name.replace(" ", "_")
# Define GRA file name
self._gra_file = f"{name}.GRA"
self._bin_file = f"{name}.BIN"
self._export_ST(study, repertory, qlog, name=name)
return True
@ -622,6 +636,11 @@ class Mage8(Mage):
self._study = study
name = study.name.replace(" ", "_")
# Define GRA file name
self._gra_file = f"{name}.GRA"
self._bin_file = f"{name}.BIN"
# Generate files
files = []
files = self._export_ST(study, repertory, qlog, name=name)

View File

@ -124,6 +124,8 @@ class EditBoundaryConditionWindow(PamhyrWindow):
parent=parent
)
self._hash_data.append(data)
self.setup_table()
self.setup_plot()
self.setup_data()

View File

@ -214,17 +214,17 @@ class BoundaryConditionWindow(PamhyrWindow):
tab = self.current_tab()
rows = self.index_selected_rows()
for row in rows:
win = self.sub_win_filter_first(
"Edit boundary condition",
contain=[f"({self._bcs.get(tab, row).id})"]
)
data = self._bcs.get(tab, row)
if win is None:
win = EditBoundaryConditionWindow(
data=self._bcs.get(tab, row),
study=self._study,
parent=self
)
win.show()
else:
win.activateWindow()
if self.sub_window_exists(
EditBoundaryConditionWindow,
data=[self._study, None, data]
):
continue
win = EditBoundaryConditionWindow(
data=data,
study=self._study,
parent=self
)
win.show()

View File

@ -68,6 +68,9 @@ class CheckListWindow(PamhyrWindow):
parent=parent
)
# Add solver to hash computation data
self._hash_data.append(self._solver)
self._checker_list = (
self._study.checkers() +
self._solver.checkers()

View File

@ -83,6 +83,9 @@ class FrictionsWindow(PamhyrWindow):
parent=parent
)
# Add reach to hash computation data
self._hash_data.append(self._reach)
self.setup_table()
self.setup_graph()
self.setup_connections()
@ -237,17 +240,15 @@ class FrictionsWindow(PamhyrWindow):
self._table.redo()
def edit_stricklers(self):
strick = self.sub_win_filter_first(
"Stricklers",
contain=[]
)
if self.sub_window_exists(
StricklersWindow,
data=[self._study, self.parent.conf]
):
return
if strick is None:
strick = StricklersWindow(
study=self._study,
config=self.parent.conf,
parent=self
)
strick.show()
else:
strick.activateWindow()
strick = StricklersWindow(
study=self._study,
config=self.parent.conf,
parent=self
)
strick.show()

View File

@ -72,11 +72,11 @@ class PlotAC(PamhyrPlot):
self.canvas.axes.set_xlabel(
_translate("MainWindow_reach", "Transverse abscissa (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
self.canvas.axes.set_ylabel(
_translate("MainWindow_reach", "Height (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
self.canvas.figure.tight_layout()
@ -176,11 +176,11 @@ class PlotAC(PamhyrPlot):
self.canvas.axes.grid(color='grey', linestyle='--', linewidth=0.5)
self.canvas.axes.set_xlabel(
_translate("MainWindow_reach", "Abscisse en travers (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
self.canvas.axes.set_ylabel(
_translate("MainWindow_reach", "Cote (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
self.canvas.figure.tight_layout()

View File

@ -64,11 +64,11 @@ class PlotKPZ(PamhyrPlot):
self.canvas.axes.set_xlabel(
_translate("MainWindow_reach", "Kp (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
self.canvas.axes.set_ylabel(
_translate("MainWindow_reach", "Height (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
kp = self.data.get_kp()

View File

@ -65,11 +65,11 @@ class PlotXY(PamhyrPlot):
# Axes
self.canvas.axes.set_xlabel(
_translate("Geometry", "X (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
self.canvas.axes.set_ylabel(
_translate("Geometry", "Y (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
self.canvas.axes.axis("equal")

View File

@ -65,6 +65,8 @@ class ProfileWindow(PamhyrWindow):
parent=parent
)
self._hash_data.append(profile)
self.setup_table()
self.setup_plot()
self.setup_connections()

View File

@ -73,6 +73,9 @@ class GeometryWindow(PamhyrWindow):
parent=parent
)
# Add reach to hash computation data
self._hash_data.append(self._reach)
self._tablemodel = None
self._profile_window = []
@ -225,20 +228,18 @@ class GeometryWindow(PamhyrWindow):
for row in rows:
profile = self._reach.profile(row)
win = self.sub_win_filter_first(
"Profile",
contain=[self._reach.name, str(profile.kp)]
)
if self.sub_window_exists(
ProfileWindow,
data=[None, None, profile]
):
continue
if win is None:
win = ProfileWindow(
profile=profile,
parent=self,
)
self._profile_window.append(win)
win.show()
else:
win.activateWindow()
win = ProfileWindow(
profile=profile,
parent=self,
)
self._profile_window.append(win)
win.show()
self.tableView.model().blockSignals(False)

View File

@ -87,6 +87,9 @@ class InitialConditionsWindow(PamhyrWindow):
parent=parent
)
# Add reach to hash computation data
self._hash_data.append(self._reach)
self._ics = study.river.initial_conditions.get(self._reach)
self.setup_table()

View File

@ -77,6 +77,8 @@ class EditLateralContributionWindow(PamhyrWindow):
parent=parent
)
self._hash_data.append(data)
self.setup_table()
self.setup_plot()
self.setup_connections()

View File

@ -258,17 +258,17 @@ class LateralContributionWindow(PamhyrWindow):
tab = self.current_tab()
rows = self.index_selected_rows()
for row in rows:
win = self.sub_win_filter_first(
"Edit lateral contribution",
contain=[f"({self._lcs.get(tab, row).id})"]
)
data = self._lcs.get(tab, row)
if win is None:
win = EditLateralContributionWindow(
data=self._lcs.get(tab, row),
study=self._study,
parent=self
)
win.show()
else:
win.activateWindow()
if self.sub_window_exists(
EditLateralContributionWindow,
data=[self._study, None, data]
):
continue
win = EditLateralContributionWindow(
data=data,
study=self._study,
parent=self
)
win.show()

View File

@ -488,6 +488,12 @@ class ApplicationWindow(QMainWindow, ListedSubWindow, WindowToolKit):
Returns:
Nothing
"""
if self.sub_window_exists(
ConfigureWindow,
data=[None, self.conf]
):
return
self.config = ConfigureWindow(config=self.conf, parent=self)
self.config.show()
@ -499,6 +505,12 @@ class ApplicationWindow(QMainWindow, ListedSubWindow, WindowToolKit):
Returns:
Nothing
"""
if self.sub_window_exists(
AboutWindow,
data=[None, None]
):
return
self.about = AboutWindow(parent=self)
self.about.show()
@ -527,6 +539,12 @@ class ApplicationWindow(QMainWindow, ListedSubWindow, WindowToolKit):
Nothing
"""
if self._study is None:
if self.sub_window_exists(
NewStudyWindow,
data=[None, None]
):
return
self.new_study = NewStudyWindow(parent=self)
self.new_study.show()
@ -537,6 +555,12 @@ class ApplicationWindow(QMainWindow, ListedSubWindow, WindowToolKit):
Nothing
"""
if self._study is not None:
if self.sub_window_exists(
NewStudyWindow,
data=[self._study, None]
):
return
self.new_study = NewStudyWindow(study=self._study, parent=self)
self.new_study.show()
@ -547,11 +571,14 @@ class ApplicationWindow(QMainWindow, ListedSubWindow, WindowToolKit):
Nothing
"""
if self._study is not None:
if not self.sub_win_exists("River network"):
self.network = NetworkWindow(study=self._study, parent=self)
self.network.show()
else:
self.network.activateWindow()
if self.sub_window_exists(
NetworkWindow,
data=[self._study, None]
):
return
self.network = NetworkWindow(study=self._study, parent=self)
self.network.show()
def open_geometry(self):
"""Open geometry window
@ -560,115 +587,117 @@ class ApplicationWindow(QMainWindow, ListedSubWindow, WindowToolKit):
Nothing
"""
if (self._study is not None and self._study.river.has_current_reach()):
geometry = self.sub_win_filter_first(
"Geometry",
contain=[self._study.river.current_reach().name]
)
reach = self._study.river.current_reach().reach
if geometry is None:
geometry = GeometryWindow(
study=self._study, config=self.conf, parent=self)
geometry.show()
else:
geometry.activateWindow()
if self.sub_window_exists(
GeometryWindow,
data=[self._study, self.conf, reach]
):
return
geometry = GeometryWindow(
study=self._study,
config=self.conf,
reach=reach,
parent=self
)
geometry.show()
else:
self.msg_select_reach()
def open_boundary_cond(self):
bound = self.sub_win_filter_first(
"Boundary conditions",
contain=[]
)
if self.sub_window_exists(
BoundaryConditionWindow,
data=[self._study, None]
):
return
if bound is None:
bound = BoundaryConditionWindow(study=self._study, parent=self)
bound.show()
else:
bound.activateWindow()
bound = BoundaryConditionWindow(study=self._study, parent=self)
bound.show()
def open_lateral_contrib(self):
lateral = self.sub_win_filter_first(
"Lateral contribution",
contain=[]
)
if self.sub_window_exists(
LateralContributionWindow,
data=[self._study, None]
):
return
if lateral is None:
lateral = LateralContributionWindow(study=self._study, parent=self)
lateral.show()
else:
lateral.activateWindow()
lateral = LateralContributionWindow(study=self._study, parent=self)
lateral.show()
def open_stricklers(self):
strick = self.sub_win_filter_first(
"Stricklers",
contain=[]
)
if self.sub_window_exists(
StricklersWindow,
data=[self._study, self.conf]
):
return
if strick is None:
strick = StricklersWindow(
study=self._study,
config=self.conf,
parent=self
)
strick.show()
else:
strick.activateWindow()
strick = StricklersWindow(
study=self._study,
config=self.conf,
parent=self
)
strick.show()
def open_frictions(self):
if (self._study is not None and
self._study.river.has_current_reach()):
if self._study is not None:
if self._study.river.has_current_reach():
reach = self._study.river.current_reach()
frictions = self.sub_win_filter_first(
"Frictions",
contain=[self._study.river.current_reach().name]
)
if self.sub_window_exists(
FrictionsWindow,
data=[self._study, None, reach]
):
return
if frictions is None:
frictions = FrictionsWindow(
study=self._study,
parent=self
)
frictions.show()
else:
frictions.activateWindow()
else:
self.msg_select_reach()
self.msg_select_reach()
def open_initial_conditions(self):
if self._study.river.has_current_reach():
initial = self.sub_win_filter_first(
"Initial condition",
contain=[self._study.river.current_reach().name]
)
reach = self._study.river.current_reach()
if initial is None:
initial = InitialConditionsWindow(
study=self._study,
config=self.conf,
parent=self
)
initial.show()
else:
initial.activateWindow()
if self.sub_window_exists(
InitialConditionsWindow,
data=[self._study, self.conf, reach]
):
return
initial = InitialConditionsWindow(
study=self._study,
config=self.conf,
reach=reach,
parent=self
)
initial.show()
else:
self.msg_select_reach()
def open_solver_parameters(self):
params = self.sub_win_filter_first(
"Solver parameters",
contain=[]
)
if self.sub_window_exists(
SolverParametersWindow,
data=[self._study, None]
):
return
if params is None:
params = SolverParametersWindow(
study=self._study,
parent=self
)
params.show()
else:
params.activateWindow()
params = SolverParametersWindow(
study=self._study,
parent=self
)
params.show()
def open_sediment_layers(self):
if self.sub_window_exists(
SedimentLayersWindow,
data=[self._study, None]
):
return
sl = SedimentLayersWindow(
study=self._study,
parent=self
@ -676,8 +705,17 @@ class ApplicationWindow(QMainWindow, ListedSubWindow, WindowToolKit):
sl.show()
def open_reach_sediment_layers(self):
reach = self._study.river.current_reach().reach
if self.sub_window_exists(
ReachSedimentLayersWindow,
data=[self._study, None, reach]
):
return
sl = ReachSedimentLayersWindow(
study=self._study,
reach=reach,
parent=self
)
sl.show()
@ -693,6 +731,17 @@ class ApplicationWindow(QMainWindow, ListedSubWindow, WindowToolKit):
)
if run.exec():
solver = run.solver
if self.sub_window_exists(
CheckListWindow,
data=[
self._study,
self.conf,
solver
]
):
return
check = CheckListWindow(
study=self._study,
config=self.conf,
@ -720,21 +769,24 @@ class ApplicationWindow(QMainWindow, ListedSubWindow, WindowToolKit):
return
# Windows already opened
res = self.sub_win_filter_first(
"Results",
contain=[solver.name, results.date]
)
if self.sub_window_exists(
ResultsWindow,
data=[
self._study,
None, # No config
solver,
results
]
):
return
if res is None:
res = ResultsWindow(
study=self._study,
solver=solver,
results=results,
parent=self
)
res.show()
else:
res.activateWindow()
res = ResultsWindow(
study=self._study,
solver=solver,
results=results,
parent=self
)
res.show()
def open_last_results(self):
if self._last_solver is None or self._last_results is None:

View File

@ -442,7 +442,7 @@ class GraphWidget(QGraphicsView):
Nothing
"""
for i in self.texts:
if i is NodeItem:
if type(i) is NodeItem:
self.texts[i].rename()
def enable_edge(self, edge, prev):
@ -699,11 +699,11 @@ class GraphWidget(QGraphicsView):
self._selected_new_edge_src_node = None
items = self.items(event.pos())
if items and items[0] is EdgeItem:
if items and type(items[0]) is EdgeItem:
edge = items[0]
if edge:
self.set_current_edge(edge)
elif items and items[0] is NodeItem:
elif items and type(items[0]) is NodeItem:
self._mouse_origin_x = pos.x()
self._mouse_origin_y = pos.y()
self._current_moved_node = items[0]
@ -711,7 +711,7 @@ class GraphWidget(QGraphicsView):
# Add nodes and edges
elif self._state == "add":
items = self.items(event.pos())
nodes = list(filter(lambda i: i is NodeItem, items))
nodes = list(filter(lambda i: type(i) is NodeItem, items))
if not nodes:
self.add_node(pos)
else:
@ -725,15 +725,15 @@ class GraphWidget(QGraphicsView):
self._selected_new_edge_src_node = None
items = list(
filter(
lambda i: i is NodeItem or i is EdgeItem,
lambda i: type(i) is NodeItem or type(i) is EdgeItem,
self.items(event.pos())
)
)
if len(items) > 0:
item = items[0]
if item is NodeItem:
if type(item) is NodeItem:
self.del_node(item)
elif item is EdgeItem:
elif type(item) is EdgeItem:
self.del_edge(item)
self.update()
@ -767,7 +767,7 @@ class GraphWidget(QGraphicsView):
items = self.items(event.pos())
selectable_items = list(
filter(
lambda i: (i is NodeItem or i is EdgeItem),
lambda i: (type(i) is NodeItem or type(i) is EdgeItem),
items
)
)
@ -858,7 +858,7 @@ class GraphWidget(QGraphicsView):
menu = QMenu(self)
if len(items) == 0:
self._menu_default(event, pos, items, menu)
elif items[0] is NodeItem:
elif type(items[0]) is NodeItem:
self._menu_node(event, pos, items, menu)
elif items[0] is EdgeItem:
elif type(items[0]) is EdgeItem:
self._menu_edge(event, pos, items, menu)

View File

@ -0,0 +1,97 @@
# CustomPlotValuesSelectionDialog.py -- Pamhyr
# Copyright (C) 2023 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
from View.Tools.PamhyrWindow import PamhyrDialog
from PyQt5.QtWidgets import (
QRadioButton, QCheckBox, QVBoxLayout,
)
from View.Results.CustomPlot.Translate import CustomPlotTranslate
class CustomPlotValuesSelectionDialog(PamhyrDialog):
_pamhyr_ui = "CustomPlotValuesSelectionDialog"
_pamhyr_name = "Custom Plot Selection"
def __init__(self, parent=None):
super(CustomPlotValuesSelectionDialog, self).__init__(
title=self._pamhyr_name,
options=[],
trad=CustomPlotTranslate(),
parent=parent
)
self._available_values_x = self._trad.get_dict("values_x")
self._available_values_y = self._trad.get_dict("values_y")
self.setup_radio_buttons()
self.setup_check_boxs()
self.value = None
def setup_radio_buttons(self):
self._radio = []
layout = self.find(QVBoxLayout, "verticalLayout_x")
for value in self._available_values_x:
btn = QRadioButton(
self._available_values_x[value],
parent=self
)
self._radio.append((value, btn))
layout.addWidget(btn)
self._radio[0][1].setChecked(True)
layout.addStretch()
def setup_check_boxs(self):
self._check = []
layout = self.find(QVBoxLayout, "verticalLayout_y")
for value in self._available_values_y:
btn = QCheckBox(
self._available_values_y[value],
parent=self
)
self._check.append((value, btn))
layout.addWidget(btn)
self._check[0][1].setChecked(True)
layout.addStretch()
def accept(self):
x = next(
filter(
lambda r: r[1].isChecked(),
self._radio
)
)[0]
y = list(
map(
lambda b: b[0],
filter(
lambda b: b[1].isChecked(),
self._check
)
)
)
self.value = x, y
super().accept()

View File

@ -0,0 +1,346 @@
# Plot.py -- Pamhyr
# Copyright (C) 2023 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import logging
from functools import reduce
from datetime import datetime
from tools import timer
from View.Tools.PamhyrPlot import PamhyrPlot
from View.Results.CustomPlot.Translate import CustomPlotTranslate
logger = logging.getLogger()
unit = {
"elevation": "0-meter",
"water_elevation": "0-meter",
"discharge": "1-m3s",
}
class CustomPlot(PamhyrPlot):
def __init__(self, x, y, reach, profile, timestamp,
data=None, canvas=None, trad=None,
toolbar=None, parent=None):
super(CustomPlot, self).__init__(
canvas=canvas,
trad=CustomPlotTranslate(),
data=data,
toolbar=toolbar,
parent=parent
)
self._x = x
self._y = y
self._reach = reach
self._profile = profile
self._timestamp = timestamp
logger.debug(
"Create custom plot for: " +
f"{x} -> {','.join(y)}: " +
f"reach={reach}, profile={profile}, " +
f"timestamp={timestamp}"
)
self._y_axes = sorted(
set(
map(
lambda y: unit[y],
self._y
)
)
)
self._axes = {}
def _draw_kp(self):
results = self.data
reach = results.river.reach(self._reach)
kp = reach.geometry.get_kp()
z_min = reach.geometry.get_z_min()
self.canvas.axes.set_xlim(
left=min(kp), right=max(kp)
)
meter_axes = self.canvas.axes
m3S_axes = self.canvas.axes
if "0-meter" in self._y_axes and "1-m3s" in self._y_axes:
m3s_axes = self._axes["1-m3s"]
lines = {}
if "elevation" in self._y:
meter_axes.set_ylim(
bottom=min(0, min(z_min)),
top=max(z_min) + 1
)
line = meter_axes.plot(
kp, z_min,
color='grey', lw=1.,
)
lines["elevation"] = line
if "water_elevation" in self._y:
# Water elevation
water_z = list(
map(
lambda p: p.get_ts_key(self._timestamp, "Z"),
reach.profiles
)
)
meter_axes.set_ylim(
bottom=min(0, min(z_min)),
top=max(water_z) + 1
)
line = meter_axes.plot(
kp, water_z, lw=1.,
color='blue',
)
lines["water_elevation"] = line
if "elevation" in self._y:
meter_axes.fill_between(
kp, z_min, water_z,
color='blue', alpha=0.5, interpolate=True
)
if "discharge" in self._y:
q = list(
map(
lambda p: p.get_ts_key(self._timestamp, "Q"),
reach.profiles
)
)
m3s_axes.set_ylim(
bottom=min(0, min(q)),
top=max(q) + 1
)
line = m3s_axes.plot(
kp, q, lw=1.,
color='r',
)
lines["discharge"] = line
# Legend
lns = reduce(
lambda acc, line: acc + line,
map(lambda line: lines[line], lines),
[]
)
labs = list(map(lambda line: self._trad[line], lines))
self.canvas.axes.legend(lns, labs, loc="lower left")
def _customize_x_axes_time(self, ts, mode="time"):
# Custom time display
nb = len(ts)
mod = int(nb / 5)
mod = mod if mod > 0 else nb
fx = list(
map(
lambda x: x[1],
filter(
lambda x: x[0] % mod == 0,
enumerate(ts)
)
)
)
if mode == "time":
t0 = datetime.fromtimestamp(0)
xt = list(
map(
lambda v: (
str(
datetime.fromtimestamp(v) - t0
).split(",")[0]
.replace("days", self._trad["days"])
.replace("day", self._trad["day"])
),
fx
)
)
else:
xt = list(
map(
lambda v: str(datetime.fromtimestamp(v).date()),
fx
)
)
self.canvas.axes.set_xticks(ticks=fx, labels=xt, rotation=45)
def _draw_time(self):
results = self.data
reach = results.river.reach(self._reach)
profile = reach.profile(self._profile)
meter_axes = self.canvas.axes
m3S_axes = self.canvas.axes
if "0-meter" in self._y_axes and "1-m3s" in self._y_axes:
m3s_axes = self._axes["1-m3s"]
ts = list(results.get("timestamps"))
ts.sort()
self.canvas.axes.set_xlim(
left=min(ts), right=max(ts)
)
x = ts
lines = {}
if "elevation" in self._y:
# Z min is constant in time
z_min = profile.geometry.z_min()
ts_z_min = list(
map(
lambda ts: z_min,
ts
)
)
line = meter_axes.plot(
ts, ts_z_min,
color='grey', lw=1.
)
lines["elevation"] = line
if "water_elevation" in self._y:
# Water elevation
z = profile.get_key("Z")
meter_axes.set_ylim(
bottom=min(0, min(z)),
top=max(z) + 1
)
line = meter_axes.plot(
ts, z, lw=1.,
color='b',
)
lines["water_elevation"] = line
if "elevation" in self._y:
z_min = profile.geometry.z_min()
ts_z_min = list(
map(
lambda ts: z_min,
ts
)
)
meter_axes.fill_between(
ts, ts_z_min, z,
color='blue', alpha=0.5, interpolate=True
)
if "discharge" in self._y:
q = profile.get_key("Q")
m3s_axes.set_ylim(
bottom=min(0, min(q)),
top=max(q) + 1
)
line = m3s_axes.plot(
ts, q, lw=1.,
color='r',
)
lines["discharge"] = line
self._customize_x_axes_time(ts)
# Legend
lns = reduce(
lambda acc, line: acc + line,
map(lambda line: lines[line], lines),
[]
)
labs = list(map(lambda line: self._trad[line], lines))
self.canvas.axes.legend(lns, labs, loc="lower left")
@timer
def draw(self):
self.canvas.axes.cla()
self.canvas.axes.grid(color='grey', linestyle='--', linewidth=0.5)
if self.data is None:
return
self.canvas.axes.set_xlabel(
self._trad[self._x],
color='green', fontsize=10
)
self.canvas.axes.set_ylabel(
self._trad[self._y_axes[0]],
color='green', fontsize=10
)
for axes in self._y_axes[1:]:
if axes in self._axes:
continue
ax_new = self.canvas.axes.twinx()
ax_new.set_ylabel(
self._trad[axes],
color='green', fontsize=10
)
self._axes[axes] = ax_new
if self._x == "kp":
self._draw_kp()
elif self._x == "time":
self._draw_time()
self.canvas.figure.tight_layout()
self.canvas.figure.canvas.draw_idle()
if self.toolbar is not None:
self.toolbar.update()
@timer
def update(self):
if not self._init:
self.draw()
return
def set_reach(self, reach_id):
self._reach = reach_id
self._profile = 0
self.update()
def set_profile(self, profile_id):
self._profile = profile_id
if self._x != "kp":
self.update()
def set_timestamp(self, timestamp):
self._timestamp = timestamp
if self._x != "time":
self.update()

View File

@ -0,0 +1,67 @@
# Translate.py -- Pamhyr
# Copyright (C) 2023 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
from PyQt5.QtCore import QCoreApplication
from View.Results.translate import ResultsTranslate
_translate = QCoreApplication.translate
class CustomPlotTranslate(ResultsTranslate):
def __init__(self):
super(CustomPlotTranslate, self).__init__()
# Value type
self._dict['time'] = _translate(
"CustomPlot", "Time (sec)"
)
self._dict['kp'] = _translate(
"CustomPlot", "Kp (m)"
)
self._dict['elevation'] = _translate(
"CustomPlot", "Elevation (m)"
)
self._dict['water_elevation'] = _translate(
"CustomPlot", "Water elevation (m)"
)
self._dict['discharge'] = _translate(
"CustomPlot", "Discharge (m³/s)"
)
# Unit corresponding long name (plot axes display)
self._dict['0-meter'] = _translate(
"CustomPlot", "Elevation (m)"
)
self._dict['1-m3s'] = _translate(
"CustomPlot", "Discharge (m³/s)"
)
# SubDict
self._sub_dict["values_x"] = {
"kp": self._dict["kp"],
"time": self._dict["time"],
}
self._sub_dict["values_y"] = {
"elevation": self._dict["elevation"],
"water_elevation": self._dict["water_elevation"],
"discharge": self._dict["discharge"],
}

View File

@ -82,11 +82,11 @@ class PlotH(PamhyrPlot):
# Axes
self.canvas.axes.set_xlabel(
_translate("Results", "Time (s)"),
color='green', fontsize=12
color='green', fontsize=10
)
self.canvas.axes.set_ylabel(
_translate("Results", "Discharge (m³/s)"),
color='green', fontsize=12
color='green', fontsize=10
)
ts = list(self.results.get("timestamps"))

View File

@ -132,11 +132,11 @@ class PlotSedProfile(PamhyrPlot):
self.canvas.axes.set_xlabel(
_translate("MainWindow_reach", "X (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
self.canvas.axes.set_ylabel(
_translate("MainWindow_reach", "Height (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
x = profile.geometry.get_station()

View File

@ -213,11 +213,11 @@ class PlotSedReach(PamhyrPlot):
self.canvas.axes.set_xlabel(
_translate("MainWindow_reach", "Kp (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
self.canvas.axes.set_ylabel(
_translate("MainWindow_reach", "Height (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
kp = reach.geometry.get_kp()

View File

@ -83,11 +83,11 @@ class PlotXY(PamhyrPlot):
# Axes
self.canvas.axes.set_xlabel(
_translate("Results", "X (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
self.canvas.axes.set_ylabel(
_translate("Results", "Y (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
self.canvas.axes.axis("equal")

View File

@ -38,7 +38,7 @@ from PyQt5.QtWidgets import (
QFileDialog, QTableView, QAbstractItemView,
QUndoStack, QShortcut, QAction, QItemDelegate,
QComboBox, QVBoxLayout, QHeaderView, QTabWidget,
QSlider, QLabel,
QSlider, QLabel, QWidget, QGridLayout,
)
from View.Tools.Plot.PamhyrCanvas import MplCanvas
@ -51,6 +51,11 @@ from View.Results.PlotH import PlotH
from View.Results.PlotSedReach import PlotSedReach
from View.Results.PlotSedProfile import PlotSedProfile
from View.Results.CustomPlot.Plot import CustomPlot
from View.Results.CustomPlot.CustomPlotValuesSelectionDialog import (
CustomPlotValuesSelectionDialog,
)
from View.Results.Table import TableModel
from View.Results.translate import ResultsTranslate
from View.Stricklers.Window import StricklersWindow
@ -86,6 +91,11 @@ class ResultsWindow(PamhyrWindow):
parent=parent
)
self._hash_data.append(self._solver)
self._hash_data.append(self._results)
self._additional_plot = {}
self.setup_table()
self.setup_plot()
self.setup_slider()
@ -274,6 +284,7 @@ class ResultsWindow(PamhyrWindow):
# Action
actions = {
"action_reload": self._reload,
"action_add": self._add_custom_plot
}
for action in actions:
@ -281,6 +292,7 @@ class ResultsWindow(PamhyrWindow):
actions[action]
)
# Table and Plot
fun = {
"reach": self._set_current_reach,
"profile": self._set_current_profile,
@ -336,6 +348,9 @@ class ResultsWindow(PamhyrWindow):
self.plot_sed_reach.set_reach(reach_id)
self.plot_sed_profile.set_reach(reach_id)
for plot in self._additional_plot:
self._additional_plot[plot].set_reach(reach_id)
self.update_table_selection_reach(reach_id)
self.update_table_selection_profile(0)
@ -349,7 +364,11 @@ class ResultsWindow(PamhyrWindow):
self.plot_sed_reach.set_profile(profile_id)
self.plot_sed_profile.set_profile(profile_id)
for plot in self._additional_plot:
self._additional_plot[plot].set_profile(profile_id)
self.update_table_selection_profile(profile_id)
if timestamp is not None:
self.plot_xy.set_timestamp(timestamp)
self.plot_ac.set_timestamp(timestamp)
@ -360,17 +379,32 @@ class ResultsWindow(PamhyrWindow):
self.plot_sed_reach.set_timestamp(timestamp)
self.plot_sed_profile.set_timestamp(timestamp)
self.plot_xy.draw()
self.plot_ac.draw()
self.plot_kpc.draw()
self.plot_h.draw()
if self._study.river.has_sediment():
self.plot_sed_reach.draw()
self.plot_sed_profile.draw()
for plot in self._additional_plot:
self._additional_plot[plot].set_timestamp(timestamp)
self.update_statusbar()
def _get_current_reach(self):
table = self.find(QTableView, f"tableView_reach")
indexes = table.selectedIndexes()
if len(indexes) == 0:
return 0
return indexes[0].row()
def _get_current_profile(self):
table = self.find(QTableView, f"tableView_profile")
indexes = table.selectedIndexes()
if len(indexes) == 0:
return 0
return indexes[0].row()
def _get_current_timestamp(self):
return self._timestamps[
self._slider_time.value()
]
def _set_current_reach(self):
table = self.find(QTableView, f"tableView_reach")
indexes = table.selectedIndexes()
@ -430,6 +464,56 @@ class ResultsWindow(PamhyrWindow):
self._reload_plots()
self._reload_slider()
def _add_custom_plot(self):
dlg = CustomPlotValuesSelectionDialog(parent=self)
if dlg.exec():
x, y = dlg.value
self.create_new_tab_custom_plot(x, y)
def create_new_tab_custom_plot(self, x: str, y: list):
name = f"{x}: {','.join(y)}"
wname = f"tab_custom_{x}_{y}"
tab_widget = self.find(QTabWidget, f"tabWidget")
# This plot already exists
if name in self._additional_plot:
tab_widget.setCurrentWidget(
tab_widget.findChild(QWidget, wname)
)
return
widget = QWidget()
grid = QGridLayout()
widget.setObjectName(wname)
canvas = MplCanvas(width=5, height=4, dpi=100)
canvas.setObjectName(f"canvas_{x}_{y}")
toolbar = PamhyrPlotToolbar(
canvas, self
)
plot = CustomPlot(
x, y,
self._get_current_reach(),
self._get_current_profile(),
self._get_current_timestamp(),
data=self._results,
canvas=canvas,
toolbar=toolbar,
parent=self,
)
plot.draw()
# Add plot to additional plot
self._additional_plot[name] = plot
grid.addWidget(toolbar, 0, 0)
grid.addWidget(canvas, 1, 0)
widget.setLayout(grid)
tab_widget.addTab(widget, name)
def _copy(self):
logger.info("TODO: copy")

View File

@ -27,6 +27,13 @@ class ResultsTranslate(PamhyrTranslate):
def __init__(self):
super(ResultsTranslate, self).__init__()
self._dict['day'] = _translate(
"Results", "day"
)
self._dict['days'] = _translate(
"Results", "days"
)
self._sub_dict["table_headers_reach"] = {
"name": _translate("Results", "Reach name"),
}

View File

@ -35,7 +35,7 @@ class Plot(PamhyrPlot):
self.canvas.axes.axes.get_xaxis().set_visible(False)
self.canvas.axes.set_ylabel(
self._trad["height"],
color='green', fontsize=12
color='green', fontsize=10
)
if self.data is None:

View File

@ -46,11 +46,11 @@ class Plot(PamhyrPlot):
self.canvas.axes.set_xlabel(
self._trad["kp"],
color='green', fontsize=12
color='green', fontsize=10
)
self.canvas.axes.set_ylabel(
self._trad["height"],
color='green', fontsize=12
color='green', fontsize=10
)
kp = self.data.get_kp()

View File

@ -46,11 +46,11 @@ class Plot(PamhyrPlot):
self.canvas.axes.set_xlabel(
_translate("MainWindow_reach", "X (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
self.canvas.axes.set_ylabel(
_translate("MainWindow_reach", "Height (m)"),
color='green', fontsize=12
color='green', fontsize=10
)
x = self.data.get_station()

View File

@ -46,9 +46,12 @@ class ReachSedimentLayersWindow(PamhyrWindow):
_pamhyr_ui = "ReachSedimentLayers"
_pamhyr_name = "Reach sediment layers"
def __init__(self, study=None, config=None, parent=None):
def __init__(self, study=None, config=None, reach=None, parent=None):
self._sediment_layers = study.river.sediment_layers
self._reach = study.river.current_reach().reach
if reach is None:
self._reach = study.river.current_reach().reach
else:
self._reach = reach
name = (
self._pamhyr_name + " - " +
@ -64,6 +67,9 @@ class ReachSedimentLayersWindow(PamhyrWindow):
parent=parent
)
# Add reach to hash computation data
self._hash_data.append(self._reach)
self.setup_table()
self.setup_plot()
self.setup_connections()

View File

@ -483,7 +483,7 @@ class ASubMainWindow(QMainWindow, ASubWindowFeatures, WindowToolKit):
def closeEvent(self, event):
if self.parent is not None:
self.parent.sub_win_del(self.name)
self.parent.sub_win_del(self.hash())
def find(self, qtype, name):
"""Find an ui component
@ -520,7 +520,7 @@ class ASubWindow(QDialog, ASubWindowFeatures, WindowToolKit):
def closeEvent(self, event):
if self.parent is not None:
self.parent.sub_win_del(self.name)
self.parent.sub_win_del(self.hash())
def find(self, qtype, name):
"""Find an ui component

View File

@ -39,64 +39,62 @@ class ListedSubWindow(object):
self.sub_win_cnt += 1
try:
logger.info(
f"Open window: {name}: {self.sub_win_cnt}: {win.hash()}")
f"Open window: {name}: {self.sub_win_cnt}")
except Exception:
logger.info(f"Open window: {name}: {self.sub_win_cnt}: X")
logger.warning(f"Sub window without hash method !")
def sub_win_del(self, name):
def sub_win_del(self, h):
self.sub_win_list = list(
filter(
lambda x: x[0] != name,
lambda x: x[1].hash() != h,
self.sub_win_list
)
)
self.sub_win_cnt = len(self.sub_win_list)
logger.info(f"Close window: {name}: {self.sub_win_cnt}")
logger.info(f"Close window: ({h}) {self.sub_win_cnt}")
def _sub_win_exists(self, name):
def _sub_win_exists(self, h):
return reduce(
lambda acc, n: (acc or (n[0] == name)),
lambda acc, el: (acc or (h == (el[1].hash()))),
self.sub_win_list,
False
)
def _sub_win_exists_with_contain(self, name, contain):
return reduce(
lambda acc, n: (
acc or
(
(n[0] == name) and
reduce(
lambda acc, c: acc and (c in n[1]._title),
contain,
True
)
)
),
self.sub_win_list,
False
)
def sub_win_exists(self, h):
return self._sub_win_exists(h)
def sub_win_exists(self, name, contain=[]):
if contain == []:
return self._sub_win_exists(name)
else:
return self._sub_win_exists_with_contain(name, contain)
def sub_win_filter_first(self, name, contain):
def get_sub_win(self, h):
try:
return next(
filter(
lambda n: (
(name in n[0]) and
reduce(
lambda acc, c: acc and (c in n[1]._title),
contain,
True
)
),
lambda el: (h == el[1].hash()),
self.sub_win_list,
)
)[1]
except Exception:
return None
def sub_window_exists(self, cls,
data=None):
"""Check if window already exists
Check if window already exists, used to deni window open
duplication
Args:
cls: Window class, must inerit to PamhyrWindow or
PamhyrDialog
data: Data used for hash computation of cls
Returns:
The window if hash already exists on sub window dictionary,
otherelse None
"""
hash = cls._hash(data)
if self.sub_win_exists(hash):
win = self.get_sub_win(hash)
win.activateWindow()
return True
else:
return False

View File

@ -122,7 +122,6 @@ class PamhyrWindowTools(object):
hash_str += repr(el)
h = hash(hash_str)
logger.debug(f"Compute hash = {h} for window {cls._pamhyr_name}")
return h

View File

@ -0,0 +1,93 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Dialog</class>
<widget class="QDialog" name="Dialog">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>414</width>
<height>482</height>
</rect>
</property>
<property name="windowTitle">
<string>Dialog</string>
</property>
<layout class="QGridLayout" name="gridLayout">
<item row="0" column="0">
<widget class="QSplitter" name="splitter">
<property name="orientation">
<enum>Qt::Horizontal</enum>
</property>
<widget class="QWidget" name="verticalLayoutWidget">
<layout class="QVBoxLayout" name="verticalLayout_x">
<item>
<widget class="QLabel" name="label">
<property name="text">
<string>X axis:</string>
</property>
</widget>
</item>
</layout>
</widget>
<widget class="QWidget" name="verticalLayoutWidget_2">
<layout class="QVBoxLayout" name="verticalLayout_y">
<item>
<widget class="QLabel" name="label_2">
<property name="text">
<string>Y axis:</string>
</property>
</widget>
</item>
</layout>
</widget>
</widget>
</item>
<item row="1" column="0">
<widget class="QDialogButtonBox" name="buttonBox">
<property name="orientation">
<enum>Qt::Horizontal</enum>
</property>
<property name="standardButtons">
<set>QDialogButtonBox::Cancel|QDialogButtonBox::Ok</set>
</property>
</widget>
</item>
</layout>
</widget>
<resources/>
<connections>
<connection>
<sender>buttonBox</sender>
<signal>accepted()</signal>
<receiver>Dialog</receiver>
<slot>accept()</slot>
<hints>
<hint type="sourcelabel">
<x>248</x>
<y>254</y>
</hint>
<hint type="destinationlabel">
<x>157</x>
<y>274</y>
</hint>
</hints>
</connection>
<connection>
<sender>buttonBox</sender>
<signal>rejected()</signal>
<receiver>Dialog</receiver>
<slot>reject()</slot>
<hints>
<hint type="sourcelabel">
<x>316</x>
<y>260</y>
</hint>
<hint type="destinationlabel">
<x>286</x>
<y>274</y>
</hint>
</hints>
</connection>
</connections>
</ui>

17
src/__init__.py Normal file
View File

@ -0,0 +1,17 @@
# __init__.py -- Pamhyr
# Copyright (C) 2023 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-

View File

@ -37,6 +37,8 @@ from Model.Study import Study
from Scripts.P3DST import Script3DST
from Scripts.Hello import ScriptHello
from Scripts.ListSolver import ScriptListSolver
from Scripts.Run import ScriptExport, ScriptRun
from init import legal_info, debug_info, setup_lang
@ -44,6 +46,9 @@ logger = logging.getLogger()
scripts = {
"hello": ScriptHello,
"solvers": ScriptListSolver,
"export": ScriptExport,
"run": ScriptRun,
"3DST": Script3DST,
}

171
src/test_pamhyr.py Normal file
View File

@ -0,0 +1,171 @@
# test_pamhyr.py -- Pamhyr
# Copyright (C) 2023 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import os
import unittest
import tempfile
from tools import flatten, parse_command_line
class FlattenTestCase(unittest.TestCase):
def test_flatten_0(self):
input = []
output = []
res = flatten(input)
self.assertEqual(len(res), len(output))
for i, o in enumerate(output):
self.assertEqual(res[i], o)
def test_flatten_1(self):
input = [['foo']]
output = ['foo']
res = flatten(input)
self.assertEqual(len(res), len(output))
for i, o in enumerate(output):
self.assertEqual(res[i], o)
def test_flatten_2(self):
input = [['foo', 'bar']]
output = ['foo', 'bar']
res = flatten(input)
self.assertEqual(len(res), len(output))
for i, o in enumerate(output):
self.assertEqual(res[i], o)
def test_flatten_3(self):
input = [['foo'], ['bar']]
output = ['foo', 'bar']
res = flatten(input)
self.assertEqual(len(res), len(output))
for i, o in enumerate(output):
self.assertEqual(res[i], o)
def test_flatten_4(self):
input = [['foo'], ['bar', 'baz'], ['bazz']]
output = ['foo', 'bar', 'baz', 'bazz']
res = flatten(input)
self.assertEqual(len(res), len(output))
for i, o in enumerate(output):
self.assertEqual(res[i], o)
def test_flatten_5(self):
input = [['foo'], ['bar', ['baz']], ['bazz']]
output = ['foo', 'bar', ['baz'], 'bazz']
res = flatten(input)
self.assertEqual(len(res), len(output))
for i, o in enumerate(output):
self.assertEqual(res[i], o)
class ToolsCMDParserTestCase(unittest.TestCase):
def test_trivial(self):
cmd = "foo"
expect = ["foo"]
res = parse_command_line(cmd)
for i, s in enumerate(expect):
self.assertEqual(res[i], s)
def test_unix_simple(self):
cmd = "/foo/bar a -b -c"
expect = ["/foo/bar", "a", '-b', "-c"]
res = parse_command_line(cmd)
for i, s in enumerate(expect):
self.assertEqual(res[i], s)
def test_unix_quoted(self):
cmd = "\"/foo/bar\" -a -b -c"
expect = ["/foo/bar", "-a", '-b', "-c"]
res = parse_command_line(cmd)
for i, s in enumerate(expect):
self.assertEqual(res[i], s)
def test_unix_quoted_with_space(self):
cmd = "\"/foo/bar baz\" -a -b -c"
expect = ["/foo/bar baz", "-a", '-b', "-c"]
res = parse_command_line(cmd)
for i, s in enumerate(expect):
self.assertEqual(res[i], s)
def test_unix_quoted_args(self):
cmd = "/foo/bar -a -b -c=\"baz\""
expect = ["/foo/bar", "-a", '-b', "-c=\"baz\""]
res = parse_command_line(cmd)
for i, s in enumerate(expect):
self.assertEqual(res[i], s)
def test_unix_quoted_args_with_space(self):
cmd = "/foo/bar -a -b -c=\"baz bazz\""
expect = ["/foo/bar", "-a", '-b', "-c=\"baz bazz\""]
res = parse_command_line(cmd)
for i, s in enumerate(expect):
self.assertEqual(res[i], s)
def test_unix_escape_space(self):
cmd = r"/foo/bar\ baz -a -b -c"
expect = [r"/foo/bar\ baz", "-a", '-b', "-c"]
res = parse_command_line(cmd)
for i, s in enumerate(expect):
self.assertEqual(res[i], s)
def test_windows_prog_files(self):
cmd = "\"C:\\Program Files (x86)\foo\bar\" a -b -c"
expect = ["C:\\Program Files (x86)\foo\bar", "a", '-b', "-c"]
res = parse_command_line(cmd)
for i, s in enumerate(expect):
self.assertEqual(res[i], s)
def test_windows_prog_files_args(self):
cmd = "\"C:\\Program Files (x86)\foo\bar\" a -b=\"baz bazz\" -c"
expect = [
"C:\\Program Files (x86)\foo\bar",
"a", '-b=\"baz bazz\"', "-c"
]
res = parse_command_line(cmd)
for i, s in enumerate(expect):
self.assertEqual(res[i], s)

View File

@ -272,6 +272,10 @@ class SQL(object):
logger.debug("SQL - commit")
self._db.commit()
def _close(self):
self.commit()
self._db.close()
def _fetch_string(self, s):
return s.replace("&#39;", "'")
@ -337,3 +341,113 @@ class SQL(object):
def _load(self):
logger.warning("TODO: LOAD")
#######################
# COMMAND LINE PARSER #
#######################
parser_special_char = ["\"", "\'"]
@timer
def parse_command_line(cmd):
"""Parse command line string and return list of string arguments
Parse command line string and returns the list of separate
arguments as string, this function take in consideration space
separator and quoted expression
Args:
cmd: The command line to parce
Returns:
List of arguments as string
"""
words = []
rest = cmd
while True:
if len(rest) == 0:
break
word, rest = _parse_next_word(rest)
words.append(word)
return words
def _parse_next_word(words):
"""Parse the next word in words string
Args:
words: The words string
Returns:
the next word and rests of words
"""
if len(words) == 1:
return words, ""
# Remove useless space
words = words.strip()
# Parse
if words[0] == "\"":
word, rest = _parse_word_up_to_next_sep(words, sep="\"")
elif words[0] == "\'":
word, rest = _parse_word_up_to_next_sep(words, sep="\'")
else:
word, rest = _parse_word_up_to_next_sep(words, sep=" ")
return word, rest
def _parse_word_up_to_next_sep(words, sep=" "):
word = ""
i = 0 if sep == " " else 1
cur = words[i]
skip_next = False
while True:
# Exit conditions
if cur == "":
break
if cur == sep:
if not skip_next:
break
# Take in consideration escape char in case of \<sep>
if cur == "\\":
# If previous char is a escape char, cancel next char
# skiping:
# \<sep> -> skip <sep> as separator
# \\<sep> -> do not skip <sep>
skip_next = not skip_next
else:
skip_next = False
word += cur
# Current word contain a word with different separator,
# typicaly, the string '-c="foo bar"' with ' ' seperator must
# be parse as one word.
#
# Correct: '-c="foo bar" baz' -> '-c="foo bar"', 'baz'
# Not correct: '-c="foo bar" baz' -> '-c="foo', 'bar" baz'
if cur in parser_special_char:
# Recursive call to parse this word
sub_word, rest = _parse_word_up_to_next_sep(words[i:], sep=cur)
i += len(sub_word) + 1
word += sub_word + cur
# Get next symbol
i += 1
if i == len(words):
cur = ""
else:
cur = words[i]
rest = words[i+1:]
return word, rest

26
tests.sh Executable file
View File

@ -0,0 +1,26 @@
#! /bin/sh
echo " Setup ENV"
python3 -m venv venv
. venv/bin/activate
pip3 install -U pip
pip3 install -r ./full-requirements.txt
pip3 install -U -r ./full-requirements.txt
echo " UNITTEST"
cd src/
python3 -m unittest discover -v -t .
cd ..
echo " PEP8"
pycodestyle ./src
if [ $? -eq 0 ]
then
echo "OK"
else
echo "WARNING"
fi