1+ import signal
2+ import time
3+ import paho .mqtt .client as mqtt
4+
5+ class Hub ():
6+ def __init__ (self , account_id , api_key , client_id = None ):
7+ self .account_id = account_id
8+ self .client = mqtt .Client (client_id = client_id , clean_session = True )
9+ self .client .username_pw_set (account_id , api_key )
10+ # self.client.tls_set(ca_certs='labstack.com/cert.pem')
11+ self .handlers = {}
12+ def handler (client , userdata , msg ):
13+ self .handlers [msg .topic ](msg .payload )
14+ self .client .on_message = handler
15+ self ._run = True
16+ signal .signal (signal .SIGINT , self ._stop )
17+ signal .signal (signal .SIGTERM , self ._stop )
18+
19+ def connect (self , handler = None ):
20+ self .client .connect ("hub.labstack.com" , 1883 )
21+ self .client .loop_start ()
22+ def on_connect (client , userdata , flags , rc ):
23+ handler ()
24+ self .client .on_connect = on_connect
25+
26+ def publish (self , topic , message ):
27+ self .client .publish ('{}/{}' .format (self .account_id , topic ), message )
28+
29+ # def subscribe(self, topic, handler, shared=False):
30+ def subscribe (self , topic , handler ):
31+ topic = '{}/{}' .format (self .account_id , topic )
32+ # if shared:
33+ # topic = '$queue/' + topic
34+ self .client .subscribe (topic )
35+ self .handlers [topic ] = handler
36+
37+ def unsubscribe (self , topic ):
38+ self .client .unsubscribe (topic )
39+
40+ def disconnect (self ):
41+ self .client .loop_stop ()
42+ self .client .disconnect ()
43+
44+ def run (self ):
45+ while self ._run :
46+ time .sleep (1 )
47+
48+ def _stop (self , signum , frame ):
49+ self ._run = False
50+ self .disconnect ()
0 commit comments