18 KiB
Scenario implementation in Pamhyr2: \textbf{Preparatory work}
#
#
#
COMMENT tools
(add-to-list 'org-structure-template-alist
'("p" "#+begin_src python :python python3 :results output\n\n#+end_src"))
((p #+begin_src python :python python3 :results output
#+end_src) (d . #+name: graph-XXX
#+header: :results drawer
#+header: :exports results
#+header: :post attr_wrap(width="12cm", data=*this*, name="graph-XXX", caption="Graph XXX", float="t")
#+begin_src dot :file "images/graph-XXX.png" :cache no
digraph {
bgcolor="transparent";
node[colorscheme=set19,shape=box,style="filled",fillcolor=9];
}
#+end_src) (p . src python :python python3 :results output :noweb yes
src) (t . EXPORT latex
\begin{table}
\centering
\resizebox{0.6\linewidth}{!}{
}
%\vspace{0.5pt}
%\caption{Number of tests by errors types}
\end{table}
#+E) (B . src shell :session *shell* :results output :exports both) (a . export ascii) (c . center) (C . comment) (e . example) (E . export) (h . export html) (l . export latex) (q . quote) (s . src) (v . verse))
def execute(db, query):
print(query)
return db.execute(query)
def execute(db, query):
return db.execute(query)
Principe
Scenario
A scenario is defined by a name, a description and a parent
scenario. If a scenario as no parent, parent value is None. So, in
Pamhyr2 the list of scenario must by a tree with a root defined by a
default scenarios with no parent.
CREATE TABLE scenario(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL, -- Name of the scenario
description TEXT NOT NULL, -- Rich text description
parent_id INTEGER REFERENCES scenario(id) -- Recursive references
-- for parent scenario
)
Data
Each data is associated with one and only one scenario. This senario
is defined by the id. So, each data structure who can saved in DB
must add the following lines in table définition:
scenario_id INTEGER NOT NULL,
FOREIGN KEY(scenario_id) REFERENCES scenario(id)
To avoid, at least, memory usage exploitation, by default, a scenario must saved only the modified data from its parent. This contrains require to wrap any data modification to update the data scenario id to current scenario id in source code. In addition, we need to modify the data loading méthode to take in considération this recursive dependencies.
def rec_load(cls, db, current_scenario):
# Default case, end of recursion
if current_scenario is None:
return [] # Default value
table = db.execute(f"<MY SELECT> WHERE scenario_id = {current_scenario}")
# If no data for this scenario, recursion
if len(table) == 0:
parent_id = db.get_parent_id(current_scenario)
return cls.rec_load(db, parent_id)
# Otherelse, parse data and return...
With this method, modify a data in parent scenario, modify this data in child scenario too. Unless is value has been previously modified in the child scenario. Deny tree node modification, except for leaf.
This method work, but it dont allow to delete data un child scenarios,
because if we dont found data for this scenario we search for parent
scenario. So, we need a condition to stop recursion in case of data
deletion in child scenario. To stop the recurtion, we can use a dummy
data flag. By default, this flags is false, but if this flags is
true the data is ignored at loading.
So, we can use this flags for each data structure who can saved in DB:
is_dummy BOOLEAN, -- True if this data as been deleted
Modify the loading method:
def rec_load_2(cls, db, current_scenario):
# Default case, end of recursion
if current_scenario is None:
return [] # Default value
table = db.execute(f"<MY SELECT> WHERE scenario_id = {current_scenario}")
# If no data for this scenario, recursion
if len(table) == 0:
parent_id = db.get_parent_id(current_scenario)
return cls.rec_load(db, parent_id)
for data in table:
if is_dummy(data):
continue
# Otherelse, parse data ...
# return ...
And add a data reduction method:
def reduce_change(self):
# Get the greater scenario
scenar = reduce(
lambda acc, x: max(acc, x._scenario),
self._values,
self._scenario
)
# Apply change on this subtree
self._scenario = scenar
if is_dummy(self): # There are no more subtree
self.values = []
else: # Update data subtree scenario id
for value in self._values:
value.set_scenario(self._scenario)
# Dummy data is no more necessary in this subtree
self._values = list(
filter(
lambda v: not is_dummy(v),
self._values
)
)
# Keep data id unique for all subtree
self.get_new_db_id()
Prototype
Scenario
class Scenario():
def __init__(self, id, name, parent):
self._id = id
self._name = name
self._parent = parent
@classmethod
def create_db(cls, db):
execute(
db, """
CREATE TABLE scenario(
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
parent_id INTEGER REFERENCES scenario(id)
)
"""
)
@classmethod
def load_db(cls, db):
cur = execute(
db, f"SELECT * FROM scenario"
)
table = cur.fetchall()
new = []
for values in table:
new.append(
cls(
values[0], values[1], values[2],
)
)
return new
def save(self, db):
sid = 'NULL' if self._parent is None else self._parent
execute(
db,
"INSERT INTO scenario (id, name, parent_id) " +
f"VALUES ({self._id}, '{self._name}', {sid})"
)
def __repr__(self):
return f"Scenario {{ {self._id}, {self._name}, parent {self._parent} }}"
def __str__(self):
return f"{self._name}({self._id})"
Data
class B():
def __init__(self, value, scenario):
self._value = value
self._scenario = scenario
self._dummy = False
@classmethod
def create_db(cls, db):
execute(db, """
CREATE TABLE b(
id INTEGER PRIMARY KEY AUTOINCREMENT,
dummy_data BOOLEAN,
x INTEGER,
a_id INTEGER NOT NULL,
scenario_id INTEGER NOT NULL,
FOREIGN KEY(a_id) REFERENCES a(id),
FOREIGN KEY(scenario_id) REFERENCES scenario(id)
)
""")
def save(self, db, a_id):
execute(
db,
"INSERT INTO b (x, dummy_data, a_id, scenario_id) " +
f"VALUES ({self._value}, {self._dummy}, {a_id}, {self._scenario})"
)
@classmethod
def load_db(cls, db, a_id, senar):
if senar is None:
return []
cur = execute(
db,
"SELECT x, scenario_id, dummy_data FROM b " +
f"WHERE scenario_id = {senar} AND a_id = {a_id}"
)
table = cur.fetchall()
if len(table) == 0:
parent = execute(
db,
f"SELECT parent_id FROM scenario WHERE id = {senar}"
).fetchone()
return cls.load_db(db, parent[0])
new = []
for values in table:
if values[2]: # Is dummy
continue
new.append(cls(values[0], values[1]))
return new
def set_value(self, value, scenario):
self._value = value
self._scenario = scenario
def set_dummy(self, scenario):
self._scenario = scenario
self._dummy = True
def __repr__(self):
if self._dummy:
return ""
return f"{self._value}"
def __str__(self):
if self._dummy:
return ""
return f"{self._value}"
class A():
def __init__(self, id, values, scenario):
self._id = id
self._values = values
self._scenario = scenario
self._dummy = False
@classmethod
def create_db(cls, db):
execute(db, """
CREATE TABLE a(
id INTEGER PRIMARY KEY,
dummy_data BOOLEAN,
scenario_id INTEGER,
FOREIGN KEY(scenario_id) REFERENCES scenario(id)
)
""")
def save(self, db):
self.reduce_change()
execute(db, f"DELETE FROM a WHERE scenario_id = {self._scenario}")
execute(
db,
"INSERT INTO a (id, dummy_data, scenario_id) " +
f"VALUES ({self._id}, {self._dummy}, {self._scenario})"
)
execute(
db,
"DELETE FROM b " +
f"WHERE scenario_id = {self._scenario} " +
f"AND a_id = {self._id}"
)
for value in self._values:
value.save(db, self._id)
def reduce_change(self):
diff_scenar = reduce(
lambda acc, x: max(acc, x._scenario),
self._values,
self._scenario
)
print(f" ~> Reduce: {self._scenario} to {diff_scenar}")
self._scenario = diff_scenar
# Reduce new scenario value on each value
for value in self._values:
value._scenario = self._scenario
# Delete useless dummy data
self._values = list(
filter(
lambda v: not v._dummy,
self._values
)
)
# HACK: keep id unique
self._id = self._id * 100 + diff_scenar
@classmethod
def load_db(cls, db, senar):
if senar is None:
return []
cur = execute(
db,
f"SELECT id, scenario_id, dummy_data FROM a WHERE scenario_id = {senar}"
)
table = cur.fetchall()
if len(table) == 0:
parent = execute(
db,
f"SELECT parent_id FROM scenario WHERE id = {senar}"
).fetchone()
return cls.load_db(db, parent[0])
new = []
for values in table:
if values[2]:
continue
bb = B.load_db(db, values[0], senar)
new.append(cls(values[0], bb, values[1]))
return new
def set_dummy(self, scenario):
self._scenario = scenario
self._dummy = True
def add(self, b):
self._values.append(b)
def delete(self, index, scenario):
self._values[index].set_dummy(scenario)
self._scenario = scenario
def __repr__(self):
if self._dummy:
return "{ }"
return f"{{ {self._values}, ({self._scenario}) }}"
def __str__(self):
if self._dummy:
return "{ }"
return f"{{ {list(map(str, self._values))}, ({self._scenario}) }}"
Full prototype
from functools import reduce
def execute(db, query):
# print(query)
return db.execute(query)
def get_tree(db):
from treelib import Node, Tree
tree = Tree()
for senar in db.execute("SELECT * FROM scenario").fetchall():
if senar[1] is None:
tree.create_node(
str(Scenario(senar[0], senar[1], senar[2])) +
" : " + str(A.load_db(db, senar[0])[0]),
senar[0]
)
else:
tree.create_node(
str(Scenario(senar[0], senar[1], senar[2])) +
" : " + str(A.load_db(db, senar[0])[0]),
senar[0],
parent=senar[2]
)
return tree
### --- Scenario ---
<<scenario>>
### --- class B ---
<<data-b>>
### --- class A ---
<<data-a>>
### --- Script ---
import sqlite3
import pprint
pp = pprint.PrettyPrinter(width=79, compact=True, depth=6)
with sqlite3.connect(":memory:") as db:
cur = db.cursor()
Scenario.create_db(cur)
A.create_db(cur)
B.create_db(cur)
print("--- Default scenario (s0) ---")
# One scenario
s0 = Scenario(0, "s0", None)
a0 = A(
1,
[
B(0, s0._id),
B(1, s0._id),
B(2, s0._id),
],
s0._id
)
s0.save(cur)
a0.save(cur)
print("--- New scenario (s1) -- copy without modification ---")
# New scenario
s1 = Scenario(1, "s1", s0._id)
s1.save(cur)
print("--- New scenario (s2) -- data modification ---")
# New scenario 2 with data
s2 = Scenario(2, "s2", s0._id)
s2.save(cur)
a2 = a0
bb = list(map(
lambda b: b.set_value(b._value + 1, s2._id),
a2._values
))
a2.save(cur)
print("--- New scenario (s3) -- delete data ---")
# New scenario 3 with data
s3 = Scenario(3, "s3", s0._id)
s3.save(cur)
a3 = A.load_db(cur, s3._id)[0]
a3._values[0].set_dummy(s3._id)
a3.save(cur)
print("--- New scenario (s4) -- new data ---")
s4 = Scenario(4, "s4", s3._id)
s4.save(cur)
a4 = A.load_db(cur, s3._id)[0]
a4.add(B(666, s4._id))
a4.save(cur)
print("--- Scenario tree ---")
tree = get_tree(db)
print(tree)
print("--- Modify s0 data ---")
aa = A.load_db(db, s0._id)[0]
bb = list(map(
lambda b: b.set_value(b._value * 100 - 1, s0._id),
aa._values
))
aa.save(db)
print("--- Scenario tree ---")
tree = get_tree(db)
print(tree)
print("--- DB ---")
ss = db.execute("SELECT * FROM scenario").fetchall()
print(f"+ scenario ({len(ss)}):")
pp.pprint(ss)
aa = db.execute("SELECT * FROM a").fetchall()
print(f"+ a ({len(aa)}):")
pp.pprint(aa)
bb = db.execute("SELECT * FROM b").fetchall()
print(f"+ b ({len(bb)}):")
pp.pprint(bb)
--- Default scenario (s0) ---
~> Reduce: 0 to 0
--- New scenario (s1) -- copy without modification ---
--- New scenario (s2) -- data modification ---
~> Reduce: 0 to 2
--- New scenario (s3) -- delete data ---
~> Reduce: 0 to 3
--- New scenario (s4) -- new data ---
~> Reduce: 3 to 4
--- Scenario tree ---
s0(0) : { ['0', '1', '2'], (0) }
├── s1(1) : { ['0', '1', '2'], (0) }
├── s2(2) : { ['1', '2', '3'], (2) }
└── s3(3) : { ['1', '2'], (3) }
└── s4(4) : { ['1', '2', '666'], (4) }
--- Modify s0 data ---
~> Reduce: 0 to 0
--- Scenario tree ---
s0(0) : { ['-1', '99', '199'], (0) }
├── s1(1) : { ['-1', '99', '199'], (0) }
├── s2(2) : { ['1', '2', '3'], (2) }
└── s3(3) : { ['1', '2'], (3) }
└── s4(4) : { ['1', '2', '666'], (4) }
--- DB ---
+ scenario (5):
[(0, 's0', None), (1, 's1', 0), (2, 's2', 0), (3, 's3', 0), (4, 's4', 3)]
+ a (4):
[(10000, 0, 0), (10002, 0, 2), (10003, 0, 3), (1000304, 0, 4)]
+ b (14):
[(1, 0, 0, 100, 0), (2, 0, 1, 100, 0), (3, 0, 2, 100, 0), (4, 0, 1, 10002, 2),
(5, 0, 2, 10002, 2), (6, 0, 3, 10002, 2), (7, 0, 1, 10003, 3),
(8, 0, 2, 10003, 3), (9, 0, 1, 1000304, 4), (10, 0, 2, 1000304, 4),
(11, 0, 666, 1000304, 4), (12, 0, -1, 10000, 0), (13, 0, 99, 10000, 0),
(14, 0, 199, 10000, 0)]