-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbot.py
328 lines (266 loc) · 8.87 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
import asyncio
from time import time
import signal
from tinydb import TinyDB
import websockets
from core import Packet, BotMiddleware, LoggerMiddleware, Log
from actions import (
PingAction,
SetNickAction
)
class Bot(object):
TAG_RAW = 'tag_raw'
TAG_DO_ACTION = 'tag_do_action'
RECONNECT_TIMEOUT = 5
def __init__(self, room_address, db_name='./MusicBotDB.json'):
self.loop = asyncio.get_event_loop()
self.action_queue = asyncio.JoinableQueue()
self.packet_queue = asyncio.JoinableQueue()
self.ws_queue = asyncio.JoinableQueue()
self.action_task = None
self.recv_task = None
self.running_actions = []
self.room_address = room_address
self.ws = None
self.db = TinyDB(db_name)
self.mid = 0
self.next_ping_time = int(time())
self.last_latency = None
self.last_ping_log = int(time())
self.reset_mid()
### middleware
self.middleware = {}
self.queues = {}
self.packet_queues = []
self.add_middleware(LoggerMiddleware())
self.log_queue = self.get_input_queue(LoggerMiddleware.TAG)
signal.signal(signal.SIGINT, self.sigint_handler())
self.closing = False
def sigint_handler(self):
def handler(signum, frame):
self.closing = True
return handler
def add_middleware(self, middleware):
if not middleware.TAG in self.middleware:
#print('new middleware: ', middleware)
for required_middleware in middleware.get_middleware_required():
#print('adding required middleware: ', required_middleware)
self.add_middleware(required_middleware())
#print('indexing middleware: ', middleware)
self.middleware[middleware.TAG] = middleware
middleware.register_queues(self)
middleware.load_state_from_db(self.db)
for tag, request in middleware.MIDDLEWARE_SUPPORT_REQUESTS.items():
result = self.middleware[tag].request_support(request)
if result != True:
middleware.support_request_failed(tag, result)
self.error('middleware request for support failed: {} -> {}'.format(middleware, tag))
return
middleware.create_task(self.loop, self.db)
def recieve_messages_for_tag(self, tag, queue):
if tag == self.TAG_RAW:
self.packet_queues.append(queue)
else:
if self.middleware[tag].TYPE == BotMiddleware.OUTPUT:
self.middleware[tag].add_output_queue(tag, queue)
def get_input_queue(self, tag):
if tag == self.TAG_DO_ACTION:
#print('get_input_queue({}) -> {}'.format(tag, self.action_queue))
return self.action_queue
else:
#print('get_input_queue({}) -> {}'.format(tag, self.middleware[tag].input))
if self.middleware[tag].TYPE == BotMiddleware.INPUT:
return self.middleware[tag].input
def reset_mid(self):
def mid_itr():
i = 0
while True:
yield i
i += 1
self.mid = mid_itr()
#do doo do, poor engineering practices
# copied and pasted from logging middleware because I am a bad person
@asyncio.coroutine
def log(self, level, *args):
l = Log(level, 'BOT', *args)
yield from self.log_queue.put((LoggerMiddleware.TAG ,l))
def exception(self, *args):
asyncio.async(self.log(Log.EXCEPTION, *args))
def error(self, *args):
asyncio.async(self.log(Log.ERROR, *args))
def debug(self, *args):
asyncio.async(self.log(Log.DEBUG, *args))
def verbose(self, *args):
asyncio.async(self.log(Log.VERBOSE, *args))
@asyncio.coroutine
def connect_ws(self, max_attempts = -1):
attempts = 0
# if we've ever had a web socket, empty ws_queue
if self.ws:
yield from self.ws_queue.get()
self.ws_queue.task_done()
while True:
try:
self.ws = yield from websockets.connect(self.room_address)
break
except:
if self.closing:
return True
self.debug('connection attempt {} failed', attempts)
yield from asyncio.sleep(5)
attempts += 1
if max_attempts > 0 and attempts >= max_attempts:
self.debug('max connection attempts exceeded, closing.')
return False
self.debug('connection succeeded')
if self.closing:
yield from self.ws.close()
else:
yield from self.setup()
yield from self.ws_queue.put(self.ws)
new_recv_task = self.loop.create_task(self.recv_loop())
if self.recv_task:
self.recv_task.cancel()
self.recv_task = new_recv_task
return True
@asyncio.coroutine
def close_bot(self):
for m in self.middleware.values():
yield from m.start_close()
if self.ws:
yield from self.ws.close()
while True:
done = True
for m in self.middleware.values():
done = done and m.done()
if done:
break
else:
yield from asyncio.sleep(0.1)
# empty queue
yield from self.action_queue.join()
self.action_task.cancel()
@asyncio.coroutine
def connection_monitor(self):
# create connection
connect_succeeded = yield from self.connect_ws(max_attempts = 1)
# sleep to allow first ping
yield from asyncio.sleep(1)
while True:
now = int(time())
if not connect_succeeded:
self.debug('Max connection attempts exceeded, closing bot.')
self.closing = True
if self.closing:
yield from self.close_bot()
break
yield from self.check_tasks()
if self.next_ping_timelimit <= now:
self.debug('Ping timeout has been missed, re-connecting')
connect_succeeded = yield from self.connect_ws(max_attempts = 600)
else:
yield from asyncio.sleep(1)
def anticipate_ping(self, ping_packet):
now = int(time())
latency = now - ping_packet.data['time']
if latency != self.last_latency:
self.debug('Current latency from server: {}', latency)
self.last_latency = latency
# delay before reconnect is equal to next - now
# plus timeout and travel time
self.next_ping_timelimit = ping_packet.data['next'] + self.RECONNECT_TIMEOUT + latency
@asyncio.coroutine
def setup(self):
return
@asyncio.coroutine
def recv_loop(self):
while True:
packet = yield from self.ws.recv()
if packet:
try:
packet = Packet(packet)
except:
self.error('Packet {} did not meet expectations! Please investigate!'.format(packet))
continue
#self.verbose('Packet type: {}', packet.type)
if packet.type == 'ping-event':
self.anticipate_ping(packet)
yield from self.action_queue.put((self.TAG_DO_ACTION, PingAction()))
else:
for queue in self.packet_queues:
yield from queue.put((self.TAG_RAW, packet))
else:
if self.closing:
break
else:
self.debug("websocket is closed, reconnecting")
yield from self.connect_ws(max_attempts = 600)
@asyncio.coroutine
def ws_queue_alarm(self):
yield from asyncio.sleep(2)
self.debug('ws queue has blocked for 2 seconds')
yield from asyncio.sleep(8)
self.error('ws queue has blocked for 10 seconds')
yield from asyncio.sleep(30)
self.exception('ws queue has blocked for 30 seconds')
@asyncio.coroutine
def action_alarm(self, action):
sleeps = [3, 7, 20]
total_time = 0
for sleep in sleeps:
yield from asyncio.sleep(sleep)
if action.done:
break
self.debug('Action {} has been running for {} seconds!', action, sleep)
total_time += sleep
@asyncio.coroutine
def execute_actions_task(self):
while True:
tag, action = yield from self.action_queue.get()
# set websocket
queue_alarm = asyncio.async(self.ws_queue_alarm())
ws = yield from self.ws_queue.get()
action.ws = ws
action.set_log_queue(self.log_queue)
yield from self.ws_queue.put(ws)
self.ws_queue.task_done()
queue_alarm.cancel()
#print('processing action: {}'.format(action))
action_alarm = asyncio.async(self.action_alarm(action))
task = action.get_coroutine(self.db, self.mid, self.action_queue)
action_task = self.loop.create_task(task())
self.running_actions.append(action_task)
self.action_queue.task_done()
@asyncio.coroutine
def check_tasks(self):
tasks = [(tag, middleware.task) for tag, middleware in self.middleware.items()]
tasks.append(('tag_do_action', self.action_task))
tasks.append(('tag_raw', self.recv_task))
finished_actions = [running_action for running_action in self.running_actions if running_action.done()]
for tag, task in tasks:
# should not happen, really
if task.done():
try:
result = task.result()
self.error('{} middleware done early, returned: {}', tag, result)
# catch all the things!
except Exception as e:
self.exception('{} middleware threw exception: {}', tag, e)
# again!
if tag in self.middleware:
middleware = self.middleware[tag]
middleware.create_task(self.loop, self.db)
elif tag == 'tag_do_action':
self.action_task = self.loop.create_task(self.execute_actions_task())
elif tag == 'tag_raw':
seld.debug('Recv loop will be reconnected by connection monitor.')
for done_action in finished_actions:
try:
e = done_action.exception()
if e:
self.exception('Action {} encountered an exception while executing: {}', done_action, e)
except CancelledError:
self.exception('Action {} was cancled?!', done_action)
def connect(self):
self.action_task = self.loop.create_task(self.execute_actions_task())
self.loop.run_until_complete(self.connection_monitor())