SQL: Move SQL class to separate file.

setup.py
Pierre-Antoine Rouby 2024-02-14 09:29:34 +01:00
parent b79fc28a5f
commit 41e14208e4
5 changed files with 153 additions and 123 deletions

View File

@ -23,7 +23,7 @@ import logging
from pathlib import Path
from functools import reduce
from tools import SQL
from SQL import SQL
from Model.Except import NotImplementedMethodeError
logger = logging.getLogger()

133
src/SQL.py Normal file
View File

@ -0,0 +1,133 @@
# SQL.py -- Pamhyr
# Copyright (C) 2023-2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import os
import logging
import sqlite3
from pathlib import Path
from tools import timer
logger = logging.getLogger()
class SQL(object):
def _init_db_file(self, db):
exists = Path(db).exists()
os.makedirs(
os.path.dirname(db),
exist_ok=True
)
self._db = sqlite3.connect(db)
self._cur = self._db.cursor()
if not exists:
self._create() # Create db
self._save() # Save
else:
self._update() # Update db scheme if necessary
self._load() # Load data
def __init__(self, filename=None):
self._db = None
if filename is not None:
self._init_db_file(filename)
def commit(self):
logger.debug("SQL - commit")
self._db.commit()
def _close(self):
self.commit()
self._db.close()
def _fetch_string(self, s):
return s.replace("&#39;", "'")
def _fetch_tuple(self, tup):
res = []
for v in tup:
if type(v) is str:
v = self._fetch_string(v)
res.append(v)
return res
def _fetch_list(self, lst):
res = []
for v in lst:
if type(v) is str:
v = self._fetch_string(v)
elif type(v) is tuple:
v = self._fetch_tuple(v)
res.append(v)
return res
def _fetch(self, res, one):
if one:
value = res.fetchone()
else:
value = res.fetchall()
res = value
if type(value) is list:
res = self._fetch_list(value)
elif type(value) is tuple:
res = self._fetch_tuple(value)
return res
def _db_format(self, value):
# Replace ''' by '&#39;' to preserve SQL injection
if type(value) is str:
value = value.replace("'", "&#39;")
return value
@timer
def execute(self, cmd, fetch_one=True, commit=False):
logger.debug(f"SQL - {cmd}")
value = None
try:
res = self._cur.execute(cmd)
if commit:
self._db.commit()
value = self._fetch(res, fetch_one)
except Exception as e:
logger_exception(e)
finally:
return value
def _create(self):
logger.warning("TODO: Create")
def _update(self):
logger.warning("TODO: Update")
def _save(self):
logger.warning("TODO: Save")
def _load(self):
logger.warning("TODO: LOAD")

View File

@ -29,6 +29,7 @@ from PyQt5.QtGui import (
from PyQt5.QtCore import (
Qt, QTranslator, QEvent, QUrl, QTimer,
QCoreApplication,
)
from PyQt5.QtWidgets import (
QMainWindow, QApplication, QAction,
@ -72,6 +73,8 @@ except Exception as e:
from Model.Study import Study
_translate = QCoreApplication.translate
logger = logging.getLogger()
no_model_action = [
@ -296,16 +299,24 @@ class ApplicationWindow(QMainWindow, ListedSubWindow, WindowToolKit):
def setup_debug_mode(self, init=False):
menu = self.findChild(QMenu, "menu_help")
menu.setToolTipsVisible(True)
self.set_title()
if init:
self.debug_action = QAction("Debug", self)
self.debug_action.setStatusTip("Debug")
self.debug_action.setToolTip(
_translate("MainWindow", "Open debug window")
)
self.debug_action.triggered.connect(self.open_debug)
self.debug_sqlite_action = QAction("Debug SQLite", self)
self.debug_sqlite_action.setStatusTip(
"Open SQLite debuging tool (sqlitebrowser)")
self.debug_sqlite_action.setToolTip(
_translate(
"MainWindow",
"Open SQLite debuging tool ('sqlitebrowser')"
)
)
self.debug_sqlite_action.triggered.connect(self.open_sqlite)
if self.conf.debug:
@ -599,6 +610,7 @@ class ApplicationWindow(QMainWindow, ListedSubWindow, WindowToolKit):
def _do_update_window_list(self):
menu = self.findChild(QMenu, "menu_windows")
menu.setToolTipsVisible(True)
# Remove all actions
menu.clear()
@ -622,9 +634,9 @@ class ApplicationWindow(QMainWindow, ListedSubWindow, WindowToolKit):
return lambda: self._activate_window(h)
action = QAction(window._title, self)
# action.setStatusTip(
# _translate("MainWindowdow", "Activate this windowdow")
# )
action.setToolTip(
_translate("MainWindow", "Activate this window")
)
h = window.hash()
fn = lambda_generator(h)
action.triggered.connect(fn)

View File

@ -20,7 +20,7 @@ import os
import pickle
import logging
from tools import SQL
from SQL import SQL
from Model.Stricklers.Stricklers import Stricklers
from Model.Stricklers.StricklersList import StricklersList

View File

@ -18,7 +18,6 @@
import os
import time
import sqlite3
import logging
import traceback
@ -251,120 +250,6 @@ def old_pamhyr_date_to_timestamp(date: str):
return ts
#######
# SQL #
#######
# This class is an abstract class to make class with save and load
# from sqlite3.
class SQL(object):
def _init_db_file(self, db):
exists = Path(db).exists()
os.makedirs(
os.path.dirname(db),
exist_ok=True
)
self._db = sqlite3.connect(db)
self._cur = self._db.cursor()
if not exists:
self._create() # Create db
self._save() # Save
else:
self._update() # Update db scheme if necessary
self._load() # Load data
def __init__(self, filename=None):
self._db = None
if filename is not None:
self._init_db_file(filename)
def commit(self):
logger.debug("SQL - commit")
self._db.commit()
def _close(self):
self.commit()
self._db.close()
def _fetch_string(self, s):
return s.replace("&#39;", "'")
def _fetch_tuple(self, tup):
res = []
for v in tup:
if type(v) is str:
v = self._fetch_string(v)
res.append(v)
return res
def _fetch_list(self, lst):
res = []
for v in lst:
if type(v) is str:
v = self._fetch_string(v)
elif type(v) is tuple:
v = self._fetch_tuple(v)
res.append(v)
return res
def _fetch(self, res, one):
if one:
value = res.fetchone()
else:
value = res.fetchall()
res = value
if type(value) is list:
res = self._fetch_list(value)
elif type(value) is tuple:
res = self._fetch_tuple(value)
return res
def _db_format(self, value):
# Replace ''' by '&#39;' to preserve SQL injection
if type(value) is str:
value = value.replace("'", "&#39;")
return value
@timer
def execute(self, cmd, fetch_one=True, commit=False):
logger.debug(f"SQL - {cmd}")
value = None
try:
res = self._cur.execute(cmd)
if commit:
self._db.commit()
value = self._fetch(res, fetch_one)
except Exception as e:
logger_exception(e)
finally:
return value
def _create(self):
logger.warning("TODO: Create")
def _update(self):
logger.warning("TODO: Update")
def _save(self):
logger.warning("TODO: Save")
def _load(self):
logger.warning("TODO: LOAD")
#######################
# COMMAND LINE PARSER #
#######################