-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathConnectionSample.py
More file actions
85 lines (60 loc) · 2.58 KB
/
ConnectionSample.py
File metadata and controls
85 lines (60 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import logging
from enos.core.MqttClient import MqttClient
from enos.core.internal.Profile import Profile
from enos.sample.SampleHelper import SampleHelper
# These files are generated by get_cert.py via the EnOS cert tool and are
# used for the TLS connection (passed to set_ssl_context() in secure_login()).
ca_file = 'edge_ca.pem'
key_file = 'edge.key'
cer_file = 'edge.pem'
# NOTE(review): this value looks like a placeholder -- presumably it must be
# replaced with the real password of the private key in key_file; confirm.
key_file_password = 'PRIVATE_KEY_PASSWORD'
def on_connect():
    """Connection callback: report that connecting to the server completed."""
    message = 'connect success'
    print(message)
def on_disconnect():
    """Disconnection callback: report that the client connection was lost."""
    message = 'connect lost'
    print(message)
def on_connect_failed():
    """Failure callback: report that the client failed to connect."""
    message = 'connect failed...'
    print(message)
def on_device_dynamic_activation(device_secret):
    """Report the device secret obtained through dynamic activation.

    This sample only prints the secret; an application that needs it for
    later connections has to store it itself.

    Args:
        device_secret: The secret handed to the callback. It is rendered with
            ``str.format`` so that a non-``str`` value (e.g. ``None`` or
            ``bytes``) is printed instead of raising ``TypeError`` inside the
            callback, which plain string concatenation would do.
    """
    print('device dynamic activate success, the device secret is:{}'.format(device_secret))
def static_device_login():
    """Connect with the static-login profile from SampleHelper, then close.

    Builds a Profile from SampleHelper.get_device_static_login(), enables
    auto-reconnect on the client's profile, registers the module-level
    connection callbacks, connects and finally closes the client.
    """
    login_config = SampleHelper.get_device_static_login()
    mqtt_client = MqttClient(profile=Profile.create_instance(login_config))
    mqtt_client.get_profile().set_auto_reconnect(True)

    # Hook up the connection lifecycle callbacks before connecting.
    mqtt_client.on_connect = on_connect
    mqtt_client.on_disconnect = on_disconnect
    mqtt_client.on_connected_failed = on_connect_failed

    mqtt_client.connect()  # connect in sync
    mqtt_client.close()
def dynamic_device_login():
    """Connect with the dynamic-login profile from SampleHelper, then close.

    Builds a Profile from SampleHelper.get_device_dynamic_login(), enables
    auto-reconnect on the client's profile, registers the dynamic-activation
    callback (which receives the device_secret) plus the module-level
    connection callbacks, connects and finally closes the client.
    """
    login_config = SampleHelper.get_device_dynamic_login()
    mqtt_client = MqttClient(profile=Profile.create_instance(login_config))
    mqtt_client.get_profile().set_auto_reconnect(True)

    # The activation callback is how the device_secret is acquired.
    mqtt_client.on_device_dynamic_active = on_device_dynamic_activation

    # Hook up the connection lifecycle callbacks before connecting.
    mqtt_client.on_connect = on_connect
    mqtt_client.on_disconnect = on_disconnect
    mqtt_client.on_connected_failed = on_connect_failed

    mqtt_client.connect()  # connect in sync
    mqtt_client.close()
def secure_login():
    """Connect with the static SSL login profile and certificates, then close.

    Builds a Profile from SampleHelper.get_device_static_ssl_login(), enables
    auto-reconnect, sets the SSL context from the module-level certificate
    settings (ca_file, cer_file, key_file, key_file_password), sets up the
    client's basic logger at INFO level, registers the module-level
    connection callbacks, connects and finally closes the client.
    """
    login_config = SampleHelper.get_device_static_ssl_login()
    mqtt_client = MqttClient(profile=Profile.create_instance(login_config))
    mqtt_client.get_profile().set_auto_reconnect(True)

    # Certificate files for bi-directional certification.
    mqtt_client.get_profile().set_ssl_context(
        ca_file, cer_file, key_file, key_file_password
    )
    mqtt_client.setup_basic_logger(logging.INFO)

    # Hook up the connection lifecycle callbacks before connecting.
    mqtt_client.on_connect = on_connect
    mqtt_client.on_disconnect = on_disconnect
    mqtt_client.on_connected_failed = on_connect_failed

    mqtt_client.connect()  # connect in sync
    mqtt_client.close()
if __name__ == "__main__":
    # Only the certificate-based login runs by default. To try one of the
    # other samples, call it here instead:
    #   static_device_login()
    #   dynamic_device_login()
    secure_login()