mirror of https://gitlab.com/pamhyr/pamhyr2
484 lines
11 KiB
Python
484 lines
11 KiB
Python
# tools.py -- Pamhyr tools function collection
|
|
# Copyright (C) 2023 INRAE
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import time
|
|
import sqlite3
|
|
import logging
|
|
import traceback
|
|
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from colorama import Fore
|
|
from colorama import Back
|
|
from colorama import Style
|
|
|
|
from functools import (
|
|
reduce, partial, wraps
|
|
)
|
|
|
|
###########
|
|
# LOGGING #
|
|
###########
|
|
|
|
# Root logger (getLogger() is called without a name); used by every
# helper of this module.
logger = logging.getLogger()
|
|
|
|
# True when os.name is "posix"; the logger_color_* helpers only return
# colorama codes in that case and an empty string otherwise.
posix = os.name == "posix"
|
|
|
|
|
|
def logger_color_blue():
    """Return the colorama bright blue prefix on POSIX, "" elsewhere."""
    return f"{Style.BRIGHT}{Fore.BLUE}" if posix else ""
|
|
|
|
|
|
def logger_color_red():
    """Return the colorama bright red prefix on POSIX, "" elsewhere."""
    return f"{Style.BRIGHT}{Fore.RED}" if posix else ""
|
|
|
|
|
|
def logger_color_green():
    """Return the colorama bright green prefix on POSIX, "" elsewhere."""
    return f"{Style.BRIGHT}{Fore.GREEN}" if posix else ""
|
|
|
|
|
|
def logger_color_reset():
    """Return the colorama reset-all code on POSIX, "" elsewhere."""
    return f"{Style.RESET_ALL}" if posix else ""
|
|
|
|
|
|
def logger_exception(exception):
    """Log an exception: message at error level, traceback at debug level.

    The traceback comes from traceback.format_exc(), i.e. from the
    exception currently being handled, so this is meant to be called
    from within an ``except`` block.

    Args:
        exception: The exception (or any printable object) to report
    """
    red = logger_color_red()
    blue = logger_color_blue()
    reset = logger_color_reset()

    # Short, user oriented message
    logger.error(f"[{red}ERROR{reset}] {red}{exception}{reset}")

    # Full details for developers
    details = traceback.format_exc()
    logger.debug(f"{blue}{exception}{reset}\n{red}{details}{reset}")
|
|
|
|
##########
|
|
# TIMERS #
|
|
##########
|
|
|
|
|
|
# Cumulated run time (time.perf_counter() differences) of each function
# decorated with @timer, keyed by the undecorated function object.
_timers = {}
|
|
# Number of calls of each function decorated with @timer, keyed by the
# undecorated function object.
_calls = {}
|
|
|
|
|
|
def reset_timers():
    """Reset the run time and the call count of every timed function.

    The entries are set back to zero *in place* rather than dropped:
    they are created only once, when a function is decorated, so
    removing them would leave later timed calls and the timers display
    without the entry they expect. Zeroing them restores the state the
    registries had right after decoration.
    """
    for func in _timers:
        _timers[func] = 0

    for func in _calls:
        _calls[func] = 0
|
|
|
|
|
|
def display_timers():
    """Log (debug level) a table of the functions registered in _timers.

    One row is written per function with its module, its qualified
    name, its cumulated run time in seconds and its number of calls.
    Rows are sorted by decreasing cumulated run time. When no function
    is registered only the header and footer lines are written.
    """
    # Width of the longest "module + qualname"; default=0 because max()
    # raises ValueError on an empty registry.
    fmax = max(
        (len(f.__qualname__) + len(f.__module__) for f in _timers),
        default=0
    )

    blue = logger_color_blue()
    green = logger_color_green()
    reset = logger_color_reset()

    head = " +--" + f"{blue}Timers{reset}" + "-" * (fmax + 26) + "+"
    logger.debug(head)

    rows = sorted(
        ((f, _timers[f], _calls.get(f, 0)) for f in _timers),
        key=lambda row: row[1],
        reverse=True
    )

    # 'elapsed' (and not 'time') to avoid shadowing the time module
    for func, elapsed, calls in rows:
        pad = fmax - len(func.__module__)
        name = (f"{blue}{func.__module__}" +
                f"{reset}" +
                f".{green}" +
                f"{func.__qualname__:<{pad}}" +
                f"{reset}")
        logger.debug(f" | {name} | {elapsed:>10.6f} sec | {calls:>5} calls |")

    tail = " +--" + "-" * (fmax + 32) + "+"
    logger.debug(tail)
|
|
|
|
|
|
def timer(func):
    """Function wrapper to register function runtime

    The decorated function is registered in the module level _timers
    and _calls dictionaries at decoration time; each call then adds its
    duration (measured with time.perf_counter) and increments its call
    count.

    Any Exception raised by the function is logged and NOT propagated:
    the wrapper returns None in that case.

    Args:
        func: The function to time

    Returns:
        The wrapped function
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()

        value = None
        try:
            value = func(*args, **kwargs)
        except Exception as e:
            logger.error(
                f"[{logger_color_red()}ERROR{logger_color_reset()}] " +
                f"{logger_color_red()}{e}{logger_color_reset()}"
            )
            logger.debug(
                f"[{func.__module__}.{logger_color_green()}" +
                f"{func.__qualname__}" +
                f"{logger_color_reset()}]: " +
                f"{logger_color_red()}{e}{logger_color_reset()}"
            )
            logger.debug(
                f"{logger_color_blue()}{e}{logger_color_reset()}\n" +
                f"{logger_color_red()}{traceback.format_exc()}" +
                f"{logger_color_reset()}"
            )

        run_time = time.perf_counter() - start_time

        # Use .get(): the registries may have been emptied or replaced
        # since decoration time, and a missing entry must not turn a
        # successful call into a KeyError.
        _timers[func] = _timers.get(func, 0) + run_time
        _calls[func] = _calls.get(func, 0) + 1

        return value

    # Register at decoration time so that functions never called still
    # appear in the timers
    _timers[func] = 0
    _calls[func] = 0

    return wrapper
|
|
|
|
#########
|
|
# DEBUG #
|
|
#########
|
|
|
|
|
|
def trace(func):
    """Function wrapper logging (debug level) each call and its result

    A "Call" line with the arguments is logged before the function is
    run and a "Return" line with the returned value after it. Both are
    prefixed with the current time.ctime(). Exceptions raised by the
    function are not caught, so no "Return" line is logged for them.

    Args:
        func: The function to trace

    Returns:
        The wrapped function
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        green = logger_color_green()
        reset = logger_color_reset()
        head = f"[{logger_color_blue()}TRACE{reset}]"
        target = f"{func.__module__}.{green}{func.__qualname__}{reset}"

        call_msg = f"{head}[{time.ctime()}] Call {target}({args}, {kwargs})"
        logger.debug(call_msg)

        value = func(*args, **kwargs)

        return_msg = f"{head}[{time.ctime()}] Return {target}: {value}"
        logger.debug(return_msg)

        return value

    return wrapper
|
|
|
|
################
|
|
# OTHERS TOOLS #
|
|
################
|
|
|
|
|
|
@timer
def flatten(lst):
    """Flatten list of list

    Only one level of nesting is removed. The result is always a new
    list; it is built in a single pass instead of by repeated list
    concatenation, which copies the accumulated result at every step.

    Args:
        lst: A list of list

    Returns:
        returns a list of element
    """
    return [element for sub_list in lst for element in sub_list]
|
|
|
|
|
|
def timestamp(dt: datetime):
    """Return the timestamp (in seconds) of a datetime

    Args:
        dt: The datetime to convert

    Returns:
        dt.timestamp(), except when os.name is 'nt' where the number of
        seconds between dt and datetime(1970, 1, 1) is returned
    """
    if os.name != 'nt':
        return dt.timestamp()

    # Fix timestamp for some windows version.
    # - Issue : (https://bugs.python.org/issue29097)
    epoch = datetime(1970, 1, 1)
    return (dt - epoch).total_seconds()
|
|
|
|
|
|
def old_pamhyr_date_to_timestamp(date: str):
    """Convert a "day:hour:minute:second" string to a number of seconds

    Args:
        date: Four integer fields separated by ':'

    Returns:
        The total number of seconds, or 0 if the string does not have
        exactly four ':' separated fields

    Raises:
        ValueError: If a field is not a valid integer
    """
    fields = date.split(":")
    if len(fields) != 4:
        return 0

    # Number of seconds in a day, an hour, a minute and a second
    seconds_per_unit = (24 * 60 * 60, 60 * 60, 60, 1)

    return sum(
        int(field) * factor
        for field, factor in zip(fields, seconds_per_unit)
    )
|
|
|
|
#######
|
|
# SQL #
|
|
#######
|
|
|
|
# SQL is an abstract base class for objects that are saved to and
# loaded from an sqlite3 database.
|
|
|
|
|
|
class SQL(object):
    """Base class for objects saved to and loaded from an sqlite3 file

    The _create, _update, _save and _load hooks defined here only log a
    "TODO" warning; they are the extension points for subclasses.
    """

    def _init_db_file(self, db):
        """Open the database file, creating it if needed, and sync with it

        A file that does not exist yet goes through _create() then
        _save(); an existing file goes through _update() then _load().

        Args:
            db: Path of the sqlite3 database file
        """
        exists = Path(db).exists()

        # A bare file name has an empty dirname and os.makedirs("")
        # raises FileNotFoundError: only create a directory if there is
        # one in the path.
        directory = os.path.dirname(db)
        if directory:
            os.makedirs(directory, exist_ok=True)

        self._db = sqlite3.connect(db)
        self._cur = self._db.cursor()

        if not exists:
            self._create()      # Create db
            self._save()        # Save
        else:
            self._update()      # Update db scheme if necessary
            self._load()        # Load data

    def __init__(self, filename=None):
        """Create the object and, if a filename is given, open its database

        Args:
            filename: Path of the sqlite3 database file, or None to
                create the object without any database connection
        """
        self._db = None
        self._cur = None

        if filename is not None:
            self._init_db_file(filename)

    def commit(self):
        """Commit the current transaction of the database connection"""
        logger.debug("SQL - commit")
        self._db.commit()

    def _close(self):
        """Commit the pending changes then close the connection"""
        self.commit()
        self._db.close()

    def _fetch_string(self, s):
        """Decode a string read from the database (see _db_format)"""
        return s.replace("&#39;", "'")

    def _fetch_tuple(self, tup):
        """Decode every string of a row

        Returns:
            A list (not a tuple) with the values of the row
        """
        return [
            self._fetch_string(v) if isinstance(v, str) else v
            for v in tup
        ]

    def _fetch_list(self, lst):
        """Decode every string and every row (tuple) of a list of results"""
        res = []
        for v in lst:
            if isinstance(v, str):
                v = self._fetch_string(v)
            elif isinstance(v, tuple):
                v = self._fetch_tuple(v)
            res.append(v)

        return res

    def _fetch(self, res, one):
        """Fetch and decode the result of an executed request

        Args:
            res: The object returned by the cursor execute() method
            one: Fetch only the next row if True, all the rows otherwise

        Returns:
            The decoded row(s); any value that is neither a list nor a
            tuple (for instance None when there is no row) is returned
            unchanged
        """
        value = res.fetchone() if one else res.fetchall()

        if isinstance(value, list):
            return self._fetch_list(value)
        if isinstance(value, tuple):
            return self._fetch_tuple(value)

        return value

    def _db_format(self, value):
        """Encode a value before it is written in an SQL request

        Only strings are modified; _fetch_string applies the reverse
        transformation on the strings read back.
        """
        # Replace "'" by "&#39;" so that the value cannot contain a bare
        # single quote (presumably requests are built with values placed
        # between single quotes - verify against callers).
        if isinstance(value, str):
            value = value.replace("'", "&#39;")
        return value

    @timer
    def execute(self, cmd, fetch_one=True, commit=False):
        """Execute an SQL request and return its decoded result

        Exceptions raised by the request are logged with
        logger_exception and not propagated.

        Args:
            cmd: The SQL request
            fetch_one: Return only the next row if True, all the rows
                otherwise
            commit: Commit right after the execution if True

        Returns:
            The fetched value(s), or None if the request failed
        """
        logger.debug(f"SQL - {cmd}")

        value = None
        try:
            res = self._cur.execute(cmd)

            if commit:
                self._db.commit()

            value = self._fetch(res, fetch_one)
        except Exception as e:
            logger_exception(e)

        # Returned after the try statement, not from a 'finally' clause:
        # a return in 'finally' would also discard KeyboardInterrupt and
        # SystemExit.
        return value

    def _create(self):
        """Hook run for a new database file, before _save()"""
        logger.warning("TODO: Create")

    def _update(self):
        """Hook run for an existing database file, before _load()"""
        logger.warning("TODO: Update")

    def _save(self):
        """Hook run for a new database file, after _create()"""
        logger.warning("TODO: Save")

    def _load(self):
        """Hook run for an existing database file, after _update()"""
        logger.warning("TODO: LOAD")
|
|
|
|
|
|
#######################
|
|
# COMMAND LINE PARSER #
|
|
#######################
|
|
|
|
# Quote characters: when the command line parser meets one of them in a
# word, it parses up to the matching one as a nested quoted expression.
parser_special_char = ["\"", "\'"]
|
|
|
|
|
|
@timer
def parse_command_line(cmd):
    """Parse command line string and return list of string arguments

    Parse command line string and returns the list of separate
    arguments as string, this function take in consideration space
    separator and quoted expression

    Args:
        cmd: The command line to parse

    Returns:
        List of arguments as string. The list is empty if the command
        line is empty or blank, or if the parsing failed (the failure
        is logged at error level)
    """
    words = []

    try:
        # Strip first, so that a blank command line gives no argument
        # instead of a whitespace argument or a parsing failure
        rest = cmd.strip()

        while rest:
            word, rest = _parse_next_word(rest)
            words.append(word)
    except Exception as e:
        logger.error(
            "parse_command_line: " +
            f"Failed to parse command line '{cmd}'"
        )
        logger.error(f" exception raise {e}")
        return []

    return words
|
|
|
|
|
|
def _parse_next_word(words):
|
|
"""Parse the next word in words string
|
|
|
|
Args:
|
|
words: The words string
|
|
|
|
Returns:
|
|
the next word and rests of words
|
|
"""
|
|
if len(words) == 1:
|
|
return words, ""
|
|
|
|
# Remove useless space
|
|
words = words.strip()
|
|
|
|
# Parse
|
|
if words[0] == "\"":
|
|
word, rest = _parse_word_up_to_next_sep(words, sep="\"")
|
|
elif words[0] == "\'":
|
|
word, rest = _parse_word_up_to_next_sep(words, sep="\'")
|
|
else:
|
|
word, rest = _parse_word_up_to_next_sep(words, sep=" ")
|
|
|
|
return word, rest
|
|
|
|
|
|
def _parse_word_up_to_next_sep(words, sep=" "):
|
|
word = ""
|
|
|
|
i = 0 if sep == " " else 1
|
|
cur = words[i]
|
|
skip_next = False
|
|
while True:
|
|
# Exit conditions
|
|
if cur == "":
|
|
break
|
|
|
|
if cur == sep:
|
|
if not skip_next:
|
|
break
|
|
|
|
# Take in consideration escape char in case of \<sep>
|
|
if cur == "\\":
|
|
# If previous char is a escape char, cancel next char
|
|
# skiping:
|
|
# \<sep> -> skip <sep> as separator
|
|
# \\<sep> -> do not skip <sep>
|
|
skip_next = not skip_next
|
|
else:
|
|
skip_next = False
|
|
|
|
word += cur
|
|
|
|
# Current word contain a word with different separator,
|
|
# typicaly, the string '-c="foo bar"' with ' ' seperator must
|
|
# be parse as one word.
|
|
#
|
|
# Correct: '-c="foo bar" baz' -> '-c="foo bar"', 'baz'
|
|
# Not correct: '-c="foo bar" baz' -> '-c="foo', 'bar" baz'
|
|
if cur in parser_special_char:
|
|
# Recursive call to parse this word
|
|
sub_word, rest = _parse_word_up_to_next_sep(words[i:], sep=cur)
|
|
i += len(sub_word) + 1
|
|
word += sub_word + cur
|
|
|
|
# Get next token
|
|
i += 1
|
|
if i >= len(words):
|
|
cur = ""
|
|
else:
|
|
cur = words[i]
|
|
|
|
rest = words[i+1:]
|
|
return word, rest
|