mirror of https://gitlab.com/pamhyr/pamhyr2
454 lines
10 KiB
Python
454 lines
10 KiB
Python
# tools.py -- Pamhyr tools function collection
|
|
# Copyright (C) 2023-2024 INRAE
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import time
|
|
import logging
|
|
import traceback
|
|
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
from colorama import Fore
|
|
from colorama import Back
|
|
from colorama import Style
|
|
|
|
from functools import (
|
|
reduce, partial, wraps
|
|
)
|
|
|
|
# `pwd` only exists on Unix-like platforms; remember whether it could be
# imported so that get_user_name() can fall back to a default elsewhere.
try:
    import pwd
    with_pwd = True
except ImportError:
    print("Module 'pwd' is not available")
    with_pwd = False
|
|
|
|
###########
|
|
# LOGGING #
|
|
###########
|
|
|
|
# Logger shared by every helper of this module; getLogger() without a
# name returns the root logger.
logger = logging.getLogger()
|
|
|
|
# True on POSIX systems; the logger_color_*() helpers only emit colour
# escape sequences when this flag is set.
posix = os.name == "posix"
|
|
|
|
|
|
def logger_color_blue():
    """Return the bright blue colour prefix, or "" when not on POSIX."""
    return f"{Style.BRIGHT}{Fore.BLUE}" if posix else ""
|
|
|
|
|
|
def logger_color_red():
    """Return the bright red colour prefix, or "" when not on POSIX."""
    return f"{Style.BRIGHT}{Fore.RED}" if posix else ""
|
|
|
|
|
|
def logger_color_green():
    """Return the bright green colour prefix, or "" when not on POSIX."""
    return f"{Style.BRIGHT}{Fore.GREEN}" if posix else ""
|
|
|
|
|
|
def logger_color_reset():
    """Return the style reset sequence, or "" when not on POSIX."""
    return f"{Style.RESET_ALL}" if posix else ""
|
|
|
|
|
|
def logger_exception(exception):
    """Log an exception: short message as error, traceback as debug

    Args:
        exception: The exception (or any printable object) to report

    The traceback text comes from traceback.format_exc(), so it
    describes the exception currently being handled, if any.
    """
    red = logger_color_red()
    blue = logger_color_blue()
    reset = logger_color_reset()

    logger.error(f"[{red}ERROR{reset}] {red}{exception}{reset}")

    details = f"{red}{traceback.format_exc()}{reset}"
    logger.debug(f"{blue}{exception}{reset}\n{details}")
|
|
|
|
##########
|
|
# TIMERS #
|
|
##########
|
|
|
|
|
|
# Accumulated run time (seconds) of each function decorated with @timer,
# keyed by the undecorated function object.
_timers = {}
|
|
# Number of calls of each function decorated with @timer, keyed by the
# undecorated function object.
_calls = {}
|
|
|
|
|
|
def reset_timers():
    """Forget every recorded run time and call count

    Both module-level registries are rebound to fresh empty
    dictionaries.
    """
    global _timers, _calls

    _timers, _calls = {}, {}
|
|
|
|
|
|
def display_timers():
    """Log (debug level) a table of the recorded timers

    One row is written per registered function, sorted by decreasing
    accumulated run time, with the number of calls. When no function
    is registered (for instance right after reset_timers()), only the
    header and footer rules are written.
    """
    # Width of the widest "module" + "qualname" pair; `default` keeps
    # max() from raising ValueError when the registry is empty.
    fmax = max(
        (len(f.__qualname__) + len(f.__module__) for f in _timers),
        default=0
    )

    head = " +--"
    head += f"{logger_color_blue()}Timers{logger_color_reset()}"
    head += "-" * (fmax + 26)
    head += "+"
    logger.debug(head)

    lst = sorted(
        ((f, _timers[f], _calls.get(f, 0)) for f in _timers),
        key=lambda entry: entry[1],
        reverse=True
    )

    # `elapsed` rather than `time`, so the time module is not shadowed
    for func, elapsed, calls in lst:
        name = (f"{logger_color_blue()}{func.__module__}" +
                f"{logger_color_reset()}" +
                f".{logger_color_green()}" +
                f"{func.__qualname__:<{fmax - len(func.__module__)}}" +
                f"{logger_color_reset()}")
        logger.debug(f" | {name} | {elapsed:>10.6f} sec | {calls:>5} calls |")

    tail = " +--"
    tail += "-" * (fmax + 32)
    tail += "+"
    logger.debug(tail)
|
|
|
|
|
|
def timer(func):
    """Function wrapper to register function runtime

    The wrapped function is registered in the module timers at
    decoration time, then each call adds its duration and increments
    its call counter.

    Any Exception raised by the wrapped function is logged and NOT
    propagated: the wrapper returns None in that case.

    Args:
        func: The function to measure

    Returns:
        The wrapping function
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()

        value = None
        try:
            value = func(*args, **kwargs)
        except Exception as e:
            logger.error(
                f"[{logger_color_red()}ERROR{logger_color_reset()}] " +
                f"{logger_color_red()}{e}{logger_color_reset()}"
            )
            logger.debug(
                f"[{func.__module__}.{logger_color_green()}" +
                f"{func.__qualname__}" +
                f"{logger_color_reset()}]: " +
                f"{logger_color_red()}{e}{logger_color_reset()}"
            )
            logger.debug(
                f"{logger_color_blue()}{e}{logger_color_reset()}\n" +
                f"{logger_color_red()}{traceback.format_exc()}" +
                f"{logger_color_reset()}"
            )

        end_time = time.perf_counter()
        run_time = end_time - start_time

        # reset_timers() replaces the registries by empty dictionaries,
        # so the entry of an already decorated function may be missing:
        # re-create it instead of failing with KeyError.
        _timers[func] = _timers.get(func, 0) + run_time
        _calls[func] = _calls.get(func, 0) + 1

        return value

    _timers[func] = 0
    _calls[func] = 0

    return wrapper
|
|
|
|
#########
|
|
# DEBUG #
|
|
#########
|
|
|
|
|
|
def trace(func):
    """Function wrapper logging (debug level) each call and its result

    Args:
        func: The function to trace

    Returns:
        The wrapping function
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        green = logger_color_green()
        reset = logger_color_reset()
        head = f"[{logger_color_blue()}TRACE{reset}]"
        target = f"{func.__module__}.{green}{func.__qualname__}{reset}"

        # Arguments are rendered before the call is made
        logger.debug(
            f"{head}[{time.ctime()}] Call {target}({args}, {kwargs})"
        )

        value = func(*args, **kwargs)

        logger.debug(f"{head}[{time.ctime()}] Return {target}: {value}")

        return value

    return wrapper
|
|
|
|
################
|
|
# OTHERS TOOLS #
|
|
################
|
|
|
|
|
|
@timer
def flatten(lst):
    """Flatten list of list

    Only one level of nesting is removed. The result is built in a
    single pass instead of by repeated list concatenation, which
    copies the accumulated result at every step.

    Args:
        lst: A list of list (any iterable of iterables is accepted)

    Returns:
        returns a new list with the elements of each sub list, in order
    """
    return [element for sub_list in lst for element in sub_list]
|
|
|
|
|
|
def timestamp(dt: datetime):
    """Return the timestamp (in seconds) of a datetime

    Args:
        dt: The datetime to convert

    Returns:
        dt.timestamp(), except on Windows where the number of seconds
        between 1970-01-01 and dt is computed by subtraction
    """
    if os.name != 'nt':
        return dt.timestamp()

    # Fix timestamp for some windows version.
    # - Issue : (https://bugs.python.org/issue29097)
    epoch = datetime(1970, 1, 1)
    return (dt - epoch).total_seconds()
|
|
|
|
|
|
def date_iso_to_timestamp(date: str):
    """Convert an ISO 8601 date string to a timestamp

    Args:
        date: The date as ISO formatted string

    Returns:
        The timestamp of the date when date is a string (or an
        instance of a str subclass). Any other value is passed to
        datetime.isoformat() and the resulting string is returned.
    """
    if isinstance(date, str):
        return timestamp(datetime.fromisoformat(date))

    # NOTE(review): this branch returns an ISO string, not a timestamp;
    # kept as is because callers may rely on it -- confirm intent.
    return datetime.isoformat(date)
|
|
|
|
|
|
def date_dmy_to_timestamp(date: str):
    """Convert a "day/month/year [time]" string to a timestamp

    The format is selected from the number of ':' and '.' found in
    the string: date only, date with "H:M", "H:M:S" or "H:M:S.f".
    The year is read with two digits.

    Args:
        date: The date string

    Returns:
        The timestamp of the parsed date, or the timestamp of the
        current time if no format matches
    """
    n_colons = date.count(":")

    if n_colons == 0:
        fmt = "%d/%m/%y"
    elif date.count(".") == 1:
        fmt = "%d/%m/%y %H:%M:%S.%f"
    else:
        fmt = {
            1: "%d/%m/%y %H:%M",
            2: "%d/%m/%y %H:%M:%S",
        }.get(n_colons)

    if fmt is None:
        parsed = datetime.now()
    else:
        parsed = datetime.strptime(date, fmt)

    return parsed.timestamp()
|
|
|
|
|
|
def old_pamhyr_date_to_timestamp(date: str):
    """Convert a "days:hours:minutes:seconds" string to seconds

    Args:
        date: The date string, four integer fields separated by ':'

    Returns:
        The total number of seconds, or 0 if the string does not
        contain exactly four fields
    """
    fields = date.split(":")
    if len(fields) != 4:
        return 0

    # Seconds in a day, an hour, a minute and a second
    factors = (24 * 60 * 60, 60 * 60, 60, 1)

    total = 0
    for field, factor in zip(fields, factors):
        total += int(field) * factor

    return total
|
|
|
|
|
|
def timestamp_to_old_pamhyr_date(time: int):
    """Convert a number of seconds to a "DDD:HH:MM:SS" string

    The conversion is pure arithmetic on the number of seconds: it
    does not go through local calendar dates, so the result does not
    depend on the time zone (or daylight saving time) of the machine
    and negative values are accepted on every platform.

    Args:
        time: The number of seconds (truncated to an integer)

    Returns:
        The zero padded "days:hours:minutes:seconds" string. For a
        negative value the days field is negative and the other
        fields are positive (e.g. -1 gives "-01:23:59:59"), which
        sums back to the input value.
    """
    # timedelta normalizes to 0 <= seconds < 86400, days carry the sign
    dt = timedelta(seconds=int(time))

    hours, remainder = divmod(dt.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)

    return f"{dt.days:03d}:{hours:02d}:{minutes:02d}:{seconds:02d}"
|
|
|
|
def timestamp_to_old_pamhyr_date_adists(time: int):
    """Convert a number of seconds to a "DDD:HH:MM" string

    Same conversion as the "DDD:HH:MM:SS" format, without the seconds
    field (remaining seconds are dropped, not rounded). The
    conversion is pure arithmetic on the number of seconds, so the
    result does not depend on the time zone of the machine and
    negative values are accepted on every platform.

    Args:
        time: The number of seconds (truncated to an integer)

    Returns:
        The zero padded "days:hours:minutes" string. For a negative
        value the days field is negative and the other fields are
        positive (e.g. -60 gives "-01:23:59").
    """
    # timedelta normalizes to 0 <= seconds < 86400, days carry the sign
    dt = timedelta(seconds=int(time))

    hours = dt.seconds // 3600
    minutes = (dt.seconds % 3600) // 60

    return f"{dt.days:03d}:{hours:02d}:{minutes:02d}"
|
|
|
|
def get_user_name():
    """Return the name of the current user

    Returns:
        The GECOS field of the password database entry of the current
        user id, or "Me" if the pwd module is not available
    """
    if not with_pwd:
        return "Me"

    entry = pwd.getpwuid(os.getuid())
    return entry.pw_gecos
|
|
|
|
|
|
def get_version():
    """Return the version string of the application

    Returns:
        The first line, stripped, of the "VERSION" file located in
        the directory of this module
    """
    module_dir = os.path.dirname(__file__)
    version_file = os.path.abspath(os.path.join(module_dir, "VERSION"))

    with open(version_file, "r") as fd:
        first_line = fd.readline()

    return first_line.strip()
|
|
|
|
#######################
|
|
# COMMAND LINE PARSER #
|
|
#######################
|
|
|
|
|
|
# Quote characters: while a word is being read, meeting one of them
# makes the parser read up to the matching quote as part of that word.
parser_special_char = ["\"", "\'"]
|
|
|
|
|
|
@timer
def parse_command_line(cmd):
    """Parse command line string and return list of string arguments

    Parse command line string and returns the list of separate
    arguments as string, this function take in consideration space
    separator and quoted expression. Leading, trailing and repeated
    spaces between arguments are ignored.

    Args:
        cmd: The command line to parse

    Returns:
        List of arguments as string, or an empty list if the command
        line cannot be parsed (the failure is logged)
    """
    words = []

    try:
        # Strip before each word: a rest made only of spaces must end
        # the parsing instead of producing a " " argument (one space)
        # or making the whole parsing fail (several spaces).
        rest = cmd.strip()

        while rest:
            word, rest = _parse_next_word(rest)
            words.append(word)
            rest = rest.strip()
    except Exception as e:
        # Best effort: report the failure and return no argument
        logger.error(
            "parse_command_line: " +
            f"Failed to parse command line '{cmd}'"
        )
        logger.error(f" exception raise {e}")
        return []

    return words
|
|
|
|
|
|
def _parse_next_word(words):
|
|
"""Parse the next word in words string
|
|
|
|
Args:
|
|
words: The words string
|
|
|
|
Returns:
|
|
the next word and rests of words
|
|
"""
|
|
if len(words) == 1:
|
|
return words, ""
|
|
|
|
# Remove useless space
|
|
words = words.strip()
|
|
|
|
# Parse
|
|
if words[0] == "\"":
|
|
word, rest = _parse_word_up_to_next_sep(words, sep="\"")
|
|
elif words[0] == "\'":
|
|
word, rest = _parse_word_up_to_next_sep(words, sep="\'")
|
|
else:
|
|
word, rest = _parse_word_up_to_next_sep(words, sep=" ")
|
|
|
|
return word, rest
|
|
|
|
|
|
def _parse_word_up_to_next_sep(words, sep=" "):
    """Read one word of words string, up to the next separator

    With a quote as separator, words is expected to start with the
    opening quote, which is skipped. Escape characters are kept in
    the returned word.

    Args:
        words: The words string, starting at the word to read
        sep: The character ending the word: a space, or the quote
            character that opened the word

    Returns:
        the word (without its surrounding separators) and the rests
        of words after the closing separator

    Raises:
        IndexError: if words is too short to hold a first character
            after the opening quote
    """
    word = ""

    # A quoted word starts after its opening quote
    i = 0 if sep == " " else 1
    cur = words[i]
    skip_next = False
    while True:
        # Exit conditions
        if cur == "":
            break

        if cur == sep and not skip_next:
            break

        # Remember if the current char is escaped before updating the
        # escape state for the next one
        escaped = skip_next

        # Take in consideration escape char in case of \<sep>
        if cur == "\\":
            # If previous char is a escape char, cancel next char
            # skipping:
            #   \<sep> -> skip <sep> as separator
            #   \\<sep> -> do not skip <sep>
            skip_next = not skip_next
        else:
            skip_next = False

        word += cur

        # Current word contain a word with different separator,
        # typically, the string '-c="foo bar"' with ' ' separator must
        # be parse as one word.
        #
        # Correct: '-c="foo bar" baz' -> '-c="foo bar"', 'baz'
        # Not correct: '-c="foo bar" baz' -> '-c="foo', 'bar" baz'
        #
        # An escaped quote is a plain character: it must not open a
        # nested quoted word.
        if cur in parser_special_char and not escaped:
            # Recursive call to parse this word
            sub_word, _ = _parse_word_up_to_next_sep(words[i:], sep=cur)
            # Move to the closing quote of the nested word
            i += len(sub_word) + 1
            word += sub_word + cur

        # Get next token
        i += 1
        cur = words[i] if i < len(words) else ""

    # Skip the separator that ended the word
    rest = words[i+1:]
    return word, rest
|