mirror of https://gitlab.com/pamhyr/pamhyr2
265 lines
5.7 KiB
Python
265 lines
5.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import time
|
|
import sqlite3
|
|
import traceback
|
|
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from colorama import Fore
|
|
from colorama import Back
|
|
from colorama import Style
|
|
|
|
from functools import (
|
|
reduce, partial, wraps
|
|
)
|
|
|
|
##########
|
|
# TIMERS #
|
|
##########
|
|
|
|
_timers = {}
|
|
_calls = {}
|
|
|
|
def reset_timers():
|
|
global _timers
|
|
global _calls
|
|
|
|
_timers = {}
|
|
_calls = {}
|
|
|
|
def display_timers():
|
|
global _timers
|
|
global _calls
|
|
|
|
fmax = max(
|
|
map(
|
|
lambda f: len(f.__qualname__) + len(f.__module__),
|
|
_timers
|
|
)
|
|
)
|
|
|
|
head = " +--"
|
|
head += f"{Style.BRIGHT}{Fore.BLUE}Timers{Style.RESET_ALL}"
|
|
for t in range(fmax + 26):
|
|
head += "-"
|
|
head += "+"
|
|
print(head)
|
|
|
|
lst = sorted(
|
|
map(
|
|
lambda f: (f, _timers[f], _calls[f]),
|
|
_timers
|
|
),
|
|
key = lambda f: f[1],
|
|
reverse = True
|
|
)
|
|
|
|
for func, time, calls in lst:
|
|
name = (f"{Fore.BLUE}{func.__module__}{Style.RESET_ALL}" +
|
|
f".{Style.BRIGHT}{Fore.GREEN}{func.__qualname__:<{fmax - len(func.__module__)}}{Style.RESET_ALL}")
|
|
print(f" | {name} | {time:>10.6f} sec | {calls:>5} calls |")
|
|
|
|
tail = " +--"
|
|
for t in range(fmax + 32):
|
|
tail += "-"
|
|
tail += "+"
|
|
print(tail)
|
|
|
|
def timer(func):
|
|
"""Function wrapper to register function runtime"""
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
start_time = time.perf_counter()
|
|
|
|
value = None
|
|
try:
|
|
value = func(*args, **kwargs)
|
|
except Exception as e:
|
|
print(f"[{Fore.RED}ERROR{Style.RESET_ALL}]" +
|
|
f"[{func.__module__}.{Fore.GREEN}{func.__qualname__}{Style.RESET_ALL}]: " +
|
|
f"{Fore.RED}{e}{Style.RESET_ALL}")
|
|
traceback.print_exc()
|
|
|
|
end_time = time.perf_counter()
|
|
run_time = end_time - start_time
|
|
|
|
_timers[func] += run_time
|
|
_calls[func] += 1
|
|
|
|
return value
|
|
|
|
_timers[func] = 0
|
|
_calls[func] = 0
|
|
|
|
return wrapper
|
|
|
|
#########
|
|
# DEBUG #
|
|
#########
|
|
|
|
def trace(func):
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
t = time.ctime()
|
|
head = f"[{Fore.BLUE}TRACE{Style.RESET_ALL}]"
|
|
c = f"{head}[{t}] Call {func.__module__}.{Fore.GREEN}{func.__qualname__}{Style.RESET_ALL}({args}, {kwargs})"
|
|
print(c)
|
|
|
|
value = func(*args, **kwargs)
|
|
|
|
t = time.ctime()
|
|
r = f"{head}[{t}] Return {func.__module__}.{Fore.GREEN}{func.__qualname__}{Style.RESET_ALL}: {value}"
|
|
print(r)
|
|
|
|
return value
|
|
|
|
return wrapper
|
|
|
|
################
|
|
# OTHERS TOOLS #
|
|
################
|
|
|
|
@timer
|
|
def flatten(lst):
|
|
"""Flatten list of list
|
|
|
|
Args:
|
|
lst: A list of list
|
|
|
|
Returns:
|
|
returns a list of element
|
|
"""
|
|
if not lst:
|
|
return []
|
|
|
|
return reduce(list.__add__, lst)
|
|
|
|
def timestamp(dt:datetime):
|
|
# Fix timestamp for some windows version.
|
|
# - Issue : (https://bugs.python.org/issue29097)
|
|
if os.name == 'nt':
|
|
return (dt - datetime(1970, 1, 1)).total_seconds()
|
|
return dt.timestamp()
|
|
|
|
def old_pamhyr_date_to_timestamp(date:str):
|
|
v = date.split(":")
|
|
if len(v) != 4:
|
|
return 0
|
|
|
|
m = [
|
|
(24 * 60 * 60), # Day to sec
|
|
(60 * 60), # Hour to sec
|
|
60, # Minute to sec
|
|
1 # Sec
|
|
]
|
|
|
|
ts = reduce(
|
|
lambda acc, x: acc + x,
|
|
map(
|
|
lambda v, m: int(v) * int(m),
|
|
v, m
|
|
)
|
|
)
|
|
|
|
return ts
|
|
|
|
#######
|
|
# SQL #
|
|
#######
|
|
|
|
# This class is an abstract class to make class with save and load
|
|
# from sqlite3.
|
|
|
|
class SQL(object):
|
|
def _init_db_file(self, db):
|
|
exists = Path(db).exists()
|
|
|
|
self._db = sqlite3.connect(db)
|
|
self._cur = self._db.cursor()
|
|
|
|
if not exists:
|
|
self._create() # Create db
|
|
self._save() # Save
|
|
else:
|
|
self._update() # Update db scheme if necessary
|
|
self._load() # Load data
|
|
|
|
|
|
def __init__(self, filename = None):
|
|
self._db = None
|
|
|
|
if filename is not None:
|
|
self._init_db_file(filename)
|
|
|
|
def commit(self):
|
|
self._db.commit()
|
|
|
|
def _fetch_string(self, s):
|
|
return s.replace("'", "'")
|
|
|
|
def _fetch_tuple(self, tup):
|
|
res = []
|
|
for v in tup:
|
|
if type(v) == str:
|
|
v = self._fetch_string(v)
|
|
res.append(v)
|
|
|
|
return res
|
|
|
|
def _fetch_list(self, lst):
|
|
res = []
|
|
for v in lst:
|
|
if type(v) == str:
|
|
v = self._fetch_string(v)
|
|
elif type(v) == tuple:
|
|
v = self._fetch_tuple(v)
|
|
res.append(v)
|
|
|
|
return res
|
|
|
|
def _fetch(self, res, one):
|
|
if one:
|
|
value = res.fetchone()
|
|
else:
|
|
value = res.fetchall()
|
|
res = value
|
|
|
|
if type(value) == list:
|
|
res = self._fetch_list(value)
|
|
elif type(value) == tuple:
|
|
res = self._fetch_tuple(value)
|
|
|
|
return res
|
|
|
|
def _sql_format(self, value):
|
|
# Replace ''' by ''' to preserve SQL injection
|
|
if type(value) == str:
|
|
value = value.replace("'", "'")
|
|
return value
|
|
|
|
@timer
|
|
def execute(self, cmd, fetch_one = True, commit = False):
|
|
# print(f"[SQL] {cmd}")
|
|
res = self._cur.execute(cmd)
|
|
|
|
if commit:
|
|
self._db.commit()
|
|
|
|
value = self._fetch(res, fetch_one)
|
|
return value
|
|
|
|
def _create(self):
|
|
print("TODO: Create")
|
|
|
|
def _update(self):
|
|
print("TODO: Update")
|
|
|
|
def _save(self):
|
|
print("TODO: Save")
|
|
|
|
def _load(self):
|
|
print("TODO: Load")
|