-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmonitor.py
74 lines (66 loc) · 3.05 KB
/
monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import asyncio
import aiohttp
from typing import List, Dict
import time
import logging
# Module-level logger named after this module; handler and level
# configuration is left to the application.
logger = logging.getLogger(__name__)


class Monitor:
    """Poll a list of HTTP targets and log status, latency and body size.

    Attributes:
        targets: URLs that are requested on every monitoring pass.
        check_interval: Seconds to sleep between passes when looping.
        mode: ``'regular'`` performs a single pass and stops; any other
            value keeps looping until the task is cancelled or fails.
        session: The HTTP client session, created by ``start_monitoring``
            and reset to ``None`` by ``stop_monitoring``.
    """

    def __init__(self, targets: List[str], check_interval: int = 3600, mode: str = 'regular'):
        """Store the configuration; no network resources are opened here."""
        logger.debug(
            "Initializing Monitor with targets: %s, check_interval: %s, mode: %s",
            targets, check_interval, mode,
        )
        self.targets = targets
        self.check_interval = check_interval
        self.mode = mode
        self.session = None
        logger.debug("Monitor initialized")

    async def start_monitoring(self):
        """Open a client session and check every target concurrently.

        One pass is run in ``'regular'`` mode; otherwise passes repeat
        every ``check_interval`` seconds. The session is always closed
        on the way out, including on cancellation or error.
        """
        logger.debug("Starting monitoring loop")
        self.session = aiohttp.ClientSession()
        try:
            while True:
                logger.debug("Creating tasks for targets: %s", self.targets)
                tasks = [self.check_target(target) for target in self.targets]
                logger.debug("Gathering results from tasks")
                results = await asyncio.gather(*tasks)
                logger.debug("Processing results")
                self.process_results(results)
                if self.mode == 'regular':
                    logger.debug("Regular mode: stopping after one iteration")
                    break
                logger.debug("Sleeping for %s seconds", self.check_interval)
                await asyncio.sleep(self.check_interval)
        finally:
            await self.stop_monitoring()

    async def check_target(self, target: str) -> Dict:
        """Issue a GET request to ``target`` and summarise the response.

        Args:
            target: URL to request through ``self.session``.

        Returns:
            On success, a dict with ``target``, ``status``,
            ``response_time`` (seconds) and ``content_length`` (bytes of
            body received). On any failure, a dict with ``target`` and a
            non-empty ``error`` description. Exceptions are deliberately
            not propagated so one bad target cannot abort the whole pass.
        """
        logger.debug("Checking target: %s", target)
        try:
            # perf_counter is monotonic, so the measured duration cannot
            # be skewed by wall-clock adjustments.
            started = time.perf_counter()
            logger.debug("Sending GET request to %s", target)
            async with self.session.get(target) as response:
                logger.debug("Received response from %s", target)
                # Read raw bytes: the length is then a true byte count,
                # and a binary or mis-encoded body cannot fail decoding
                # and be misreported as an unreachable target.
                body = await response.read()
                response_time = time.perf_counter() - started
                logger.debug("Response time for %s: %.2fs", target, response_time)
                return {
                    "target": target,
                    "status": response.status,
                    "response_time": response_time,
                    "content_length": len(body),
                }
        except Exception as exc:
            # Some exceptions (e.g. asyncio.TimeoutError) carry no
            # message; fall back to the class name so the report is
            # never blank.
            reason = str(exc) or type(exc).__name__
            logger.error("Error checking %s: %s", target, reason)
            return {"target": target, "error": reason}

    def process_results(self, results: List[Dict]):
        """Log one line per result: a warning for failures, info otherwise.

        Args:
            results: Dicts in the shape returned by ``check_target``.
        """
        logger.debug("Processing %s results", len(results))
        for result in results:
            if "error" in result:
                logger.warning(
                    "Target %s is unreachable: %s",
                    result['target'], result['error'],
                )
            else:
                logger.info(
                    "Target %s: Status %s, Response Time %.2fs, "
                    "Content Length %s bytes",
                    result['target'], result['status'],
                    result['response_time'], result['content_length'],
                )
        logger.debug("Finished processing results")

    async def stop_monitoring(self):
        """Close the client session, if one is open, and forget it.

        Safe to call repeatedly or before ``start_monitoring``.
        """
        logger.debug("Stopping monitoring")
        # Detach first so the attribute never points at a closed session.
        session, self.session = self.session, None
        if session is not None:
            await session.close()
        logger.debug("Monitoring stopped")