forked from CSE-ICE-21/kobuki-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathQBot.py
More file actions
94 lines (71 loc) · 2.69 KB
/
QBot.py
File metadata and controls
94 lines (71 loc) · 2.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import serial
class SubPayloadDecoders:
    """Namespace of decoder callables, one per sub-payload type.

    Every decoder takes the raw sub-payload as its only argument. All of
    them are currently placeholders that ignore the argument and return
    ``None``.

    The decoders are static methods so they behave the same whether they
    are looked up on the class or on an instance.
    """

    @staticmethod
    def BasicSensorDataDecoder(subPayload):
        """Placeholder decoder for basic sensor data; returns ``None``."""

    @staticmethod
    def DockingIRSensorDataDecoder(subPayload):
        """Placeholder decoder for docking IR sensor data; returns ``None``."""

    @staticmethod
    def InertialSensorDataDecoder(subPayload):
        """Placeholder decoder for inertial sensor data; returns ``None``."""

    @staticmethod
    def CliffSensorDataDecoder(subPayload):
        """Placeholder decoder for cliff sensor data; returns ``None``."""

    @staticmethod
    def CurrentSensorDataDecoder(subPayload):
        """Placeholder decoder for current sensor data; returns ``None``."""

    @staticmethod
    def HardwareVersionDecoder(subPayload):
        """Placeholder decoder for the hardware version; returns ``None``."""

    @staticmethod
    def FirmwareVersionDecoder(subPayload):
        """Placeholder decoder for the firmware version; returns ``None``."""

    @staticmethod
    def RawGyroDataDecoder(subPayload):
        """Placeholder decoder for raw gyro data; returns ``None``."""

    @staticmethod
    def GPInputDataDecoder(subPayload):
        """Placeholder decoder for general-purpose input data; returns ``None``."""

    @staticmethod
    def UDIDDecoder(subPayload):
        """Placeholder decoder for the UDID; returns ``None``."""

    @staticmethod
    def PIDControllerDataDecoder(subPayload):
        """Placeholder decoder for PID controller data; returns ``None``."""
class QBot:
    """Serial-port client that reads framed payloads from a robot.

    A frame, as consumed by :meth:`readPayload`, is laid out as
    ``0xAA 0x55 <length> <payload ...> <checksum>`` where ``<length>`` and
    ``<checksum>`` are one byte each.
    """

    def __init__(self, serialPort):
        """Create the serial connection on *serialPort* at 115200 baud.

        Also initialises ``sensorData`` to an empty dict.
        """
        self.serial = serial.Serial(
            port=serialPort,
            baudrate=115200,
        )
        self.sensorData = {}

    def divideToSubPayloads(self, payload):
        """Split *payload* into sub-payloads keyed by their header byte.

        Each sub-payload is ``<header> <length> <length bytes of data>``.
        The returned dict maps the header (as a one-byte ``bytes`` object)
        to the whole sub-payload slice, header and length byte included.
        If a header occurs more than once, the last occurrence wins.

        Raises:
            IndexError: if *payload* ends directly after a header byte, so
                no length byte is available.
        """
        subPayloads = {}
        index = 0
        while index < len(payload):
            subPayloadLength = payload[index + 1]
            # +2 accounts for the header byte and the length byte.
            end = index + subPayloadLength + 2
            # bytes([x]) instead of int.to_bytes(x): the latter only has
            # default arguments on Python 3.11+.
            subPayloads[bytes([payload[index]])] = payload[index:end]
            index = end
        return subPayloads

    def readPayload(self):
        """Read one frame from the serial port and return its payload.

        Bytes are discarded until the two-byte header ``0xAA 0x55`` is
        seen; then the length byte, the payload and the checksum byte are
        read.

        Returns:
            The payload bytes when the checksum matches, otherwise
            ``None``. ``None`` is also returned when a read yields fewer
            bytes than requested (which can only happen if a read timeout
            has been configured on the port).
        """
        # TODO: check whether the header sequence can also occur randomly
        # inside the data.
        # Track the previously read byte so that a repeated 0xAA directly
        # before the real header (AA AA 55) does not make us lose sync.
        previous = b''
        while True:
            current = self.serial.read()
            if not current:
                # Nothing arrived: give up instead of spinning forever.
                return None
            if previous == b'\xAA' and current == b'\x55':
                break
            previous = current

        lengthByte = self.serial.read()
        if not lengthByte:
            return None
        payloadLength = lengthByte[0]
        payload = self.serial.read(payloadLength)
        if len(payload) != payloadLength:
            return None
        checksum = self.serial.read()
        return payload if self.calculateChecksum(payload) == checksum else None

    def calculateChecksum(self, payload):
        """Return the one-byte checksum of *payload*.

        The checksum is the XOR of the payload length and every payload
        byte, returned as a one-byte ``bytes`` object.

        Raises:
            OverflowError: if the result does not fit in one byte (only
                possible when *payload* is longer than 255 bytes).
        """
        checksum = len(payload)
        for byte in payload:
            checksum ^= byte
        # Explicit length/byteorder keeps this working before Python 3.11.
        return checksum.to_bytes(1, "big")

    def decodeSubPayload(self, subPayload):
        """Decode one sub-payload with the decoder registered for its header.

        NOTE(review): the original definition was an incomplete stub
        (``def decodeSubPayload(self, )``); this signature and behaviour
        are a proposed completion - confirm against the intended design.

        *subPayload* is expected in the form produced by
        :meth:`divideToSubPayloads`, i.e. starting with its header byte.
        The decoder is looked up in the module-level ``subPayloadDecoders``
        table.

        Returns:
            Whatever the matching decoder returns, or ``None`` when
            *subPayload* is empty or its header has no registered decoder.
        """
        if not subPayload:
            return None
        decoder = subPayloadDecoders.get(bytes(subPayload[:1]))
        if decoder is None:
            return None
        return decoder(subPayload)

    def close(self):
        """Close the underlying serial connection."""
        self.serial.close()
# Dispatch table: sub-payload header (a one-byte ``bytes`` key) -> decoder.
# The header IDs are written as integers and converted to one-byte keys.
subPayloadDecoders = {
    bytes([headerId]): decoder
    for headerId, decoder in (
        (0x01, SubPayloadDecoders.BasicSensorDataDecoder),
        (0x03, SubPayloadDecoders.DockingIRSensorDataDecoder),
        (0x04, SubPayloadDecoders.InertialSensorDataDecoder),
        (0x05, SubPayloadDecoders.CliffSensorDataDecoder),
        (0x06, SubPayloadDecoders.CurrentSensorDataDecoder),
        (0x0A, SubPayloadDecoders.HardwareVersionDecoder),
        (0x0B, SubPayloadDecoders.FirmwareVersionDecoder),
        (0x0D, SubPayloadDecoders.RawGyroDataDecoder),
        (0x10, SubPayloadDecoders.GPInputDataDecoder),
        (0x13, SubPayloadDecoders.UDIDDecoder),
        (0x15, SubPayloadDecoders.PIDControllerDataDecoder),
    )
}