-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathQueryConcurrencyController.py
More file actions
128 lines (119 loc) · 5.7 KB
/
QueryConcurrencyController.py
File metadata and controls
128 lines (119 loc) · 5.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import sys
sys.path.append('./Concurrency_Control_Manager')
from Concurrency_Control_Manager.ConcurrencyControlManager import ConcurrencyControlManager
from Concurrency_Control_Manager.models.Operation import Operation
from Concurrency_Control_Manager.models.CCManagerEnums import OperationType, ResponseType
from Failure_Recovery import FailureRecoveryManager
from utils.models import ExecutionResult, Rows
from datetime import datetime
class QueryConcurrencyController:
operations: list[ExecutionResult]
def __init__(self):
self.ccm = ConcurrencyControlManager()
self.frm = FailureRecoveryManager.FailureRecoveryManager("Failure_Recovery/wal.log")
self.is_transacting = False
self.operations = []
self.queries_operations = []
self.failed_operations = []
self.transact_id = 1
self.is_rollingback = False
def begin_transaction(self):
self.is_transacting = True
def check_for_response_select(self, table_names: list[str]) -> list | str:
self.transact_id = self.ccm.begin_transaction()
try:
for res_string in table_names:
ops = Operation(self.transact_id, OperationType.R, f"{res_string}")
response = self.ccm.validate_object(ops)
if response.responseType.name == "ALLOWED":
self.ccm.log_object(ops)
elif response.responseType.name == "ABORT":
rollback = self.frm.recover(self.transact_id)
self.is_rollingback = True
self.failed_operations.append(ops)
self.ccm.end_transaction(self.transact_id)
return rollback
else:
return "LE WAIT"
self.ccm.end_transaction(self.transact_id)
return "OK"
except Exception as e:
print(f"BLa bla bla: {e}")
def check_for_response_insert(self, table_names: list[str]) -> list | str:
self.transact_id = self.ccm.begin_transaction()
for res_string in table_names:
ops = Operation(self.transact_id, OperationType.W, f"{res_string}")
response = self.ccm.validate_object(ops)
if response.responseType.name == "ALLOWED":
self.ccm.log_object(ops)
elif response.responseType.name == "ABORT":
rollback = self.frm.recover(self.transact_id)
self.is_rollingback = True
self.failed_operations.append(ops)
self.ccm.end_transaction(self.transact_id)
return rollback
self.ccm.end_transaction(self.transact_id)
return "OK"
def check_for_response_update(self, table_names: list[str]) -> list | str:
self.transact_id = self.ccm.begin_transaction()
for res_string in table_names:
ops = Operation(self.transact_id, OperationType.R, f"{res_string}")
response = self.ccm.validate_object(ops)
if response.responseType.name == "ALLOWED":
self.ccm.log_object(ops)
elif response.responseType.name == "ABORT":
rollback = self.frm.recover(self.transact_id)
self.is_rollingback = True
self.failed_operations.append(ops)
self.ccm.end_transaction(self.transact_id)
return rollback
for res_string in table_names:
ops = Operation(self.transact_id, OperationType.W, f"{res_string}")
response = self.ccm.validate_object(ops)
if response.responseType.name == "ALLOWED":
self.ccm.log_object(ops)
elif response.responseType.name == "ABORT":
rollback = self.frm.recover(self.transact_id)
self.is_rollingback = True
self.failed_operations.append(ops)
self.ccm.end_transaction(self.transact_id)
return rollback
return "OK"
def check_for_response_delete(self, table_names: list[str]) -> list | str:
self.transact_id = self.ccm.begin_transaction()
for res_string in table_names:
ops = Operation(self.transact_id, OperationType.R, f"{res_string}")
response = self.ccm.validate_object(ops)
if response.responseType.name == "ALLOWED":
self.ccm.log_object(ops)
elif response.responseType.name == "ABORT":
rollback = self.frm.recover(self.transact_id)
self.is_rollingback = True
self.failed_operations.append(ops)
self.ccm.end_transaction(self.transact_id)
return rollback
for res_string in table_names:
ops = Operation(self.transact_id, OperationType.W, f"{res_string}")
response = self.ccm.validate_object(ops)
if response.responseType.name == "ALLOWED":
self.ccm.log_object(ops)
elif response.responseType.name == "ABORT":
rollback = self.frm.recover(self.transact_id)
self.is_rollingback = True
self.failed_operations.append(ops)
self.ccm.end_transaction(self.transact_id)
return rollback
return "OK"
def end_transaction(self):
res = ExecutionResult(
transaction_id=self.transact_id,
timestamp=datetime.now(),
type="COMMIT",
status="success",
query="COMMIT",
previous_data=Rows(data=[], rows_count=0, schema=[], columns=[]),
new_data=Rows(data=[], rows_count=0, schema=[], columns=[])
)
self.frm.write_log(res)
self.is_transacting = False
self.ccm.end_transaction(self.transact_id)