-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcustom-ntfy
64 lines (49 loc) · 1.67 KB
/
custom-ntfy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#!/var/ossec/framework/python/bin/python3
import json
import os
import sys
import requests
# Seconds to wait for the notification server before giving up, so that a
# stalled connection cannot block this script indefinitely.
_REQUEST_TIMEOUT_SECONDS = 10


def _ntfy_priority(alert_level):
    """Map an alert rule level to an ntfy priority between 1 and 5.

    Levels 8-9 map to 2, 10-11 to 3, 12-13 to 4 and 14-15 to 5. Every other
    level (including 2-7 and anything outside 2-15) maps to 1.
    """
    if 8 <= alert_level <= 9:
        return 2
    if 10 <= alert_level <= 11:
        return 3
    if 12 <= alert_level <= 13:
        return 4
    if 14 <= alert_level <= 15:
        return 5
    return 1


def main(args):
    """Read an alert from a JSON file and POST it as an ntfy notification.

    Args:
        args: Command-line style argument list. ``args[1]`` is the path of
            the JSON alert file and ``args[3]`` is the URL the notification
            is POSTed to. ``args[2]`` is not used.

    The function does not return a value. On failure it prints an ``ERROR:``
    line and terminates the process through ``sys.exit`` with one of:

        2 -- fewer than four arguments were supplied.
        6 -- the alert file does not exist.
        7 -- the alert file is not valid UTF-8 JSON, or the decoded alert
             lacks the expected ``rule`` / ``agent`` / ``timestamp`` fields.
        1 -- the HTTP request failed, timed out, or returned an error status.
    """
    if len(args) < 4:
        print("ERROR: Wrong arguments")
        sys.exit(2)

    alert_file_location = args[1]
    hook_url = args[3]

    # Step 1: load the alert. Only the statements that can raise the handled
    # exceptions live inside this try block.
    try:
        with open(alert_file_location, encoding="utf-8") as alert_file:
            alert_json = json.load(alert_file)
    except FileNotFoundError:
        print(f"ERROR: Alert file {alert_file_location} not found")
        sys.exit(6)
    except (json.JSONDecodeError, UnicodeDecodeError):
        print(f"ERROR: Invalid JSON in {alert_file_location}")
        sys.exit(7)

    # Step 2: extract the fields. A structurally unexpected alert (missing
    # key, wrong container type, non-numeric level, non-string timestamp) is
    # reported like any other bad input instead of crashing with a traceback.
    try:
        rule = alert_json['rule']
        alert_level = rule['level']
        description = rule.get('description', 'N/A')
        agentname = alert_json['agent'].get('name', 'N/A')
        timestamp = alert_json['timestamp']
        ntfy_level = _ntfy_priority(alert_level)
        # Convert the date to a more readable format: 2024-03-27T15:45:27.388+0000
        timestamp = timestamp.replace('T', ' ').replace('+0000', '')
    except (KeyError, TypeError, AttributeError) as e:
        print(f"ERROR: Malformed alert in {alert_file_location}: {e!r}")
        sys.exit(7)

    # Header values may not contain line breaks or leading whitespace, so
    # collapse all whitespace runs in the title. It is sent as UTF-8 bytes
    # because str header values are limited to Latin-1 by the HTTP stack.
    title = ' '.join(f'{agentname}: {description}'.split())
    headers = {
        'X-Title': title.encode('utf-8'),
        'X-Priority': str(ntfy_level),
    }
    data = f'Date: {timestamp}\nAgent: {agentname}\nDescription:{description}'

    # Step 3: deliver. The body is encoded explicitly so non-Latin-1 text is
    # transmitted as UTF-8 regardless of the HTTP library's default for str
    # bodies; the timeout turns a hung server into a reported failure.
    try:
        response = requests.post(
            url=hook_url,
            headers=headers,
            data=data.encode('utf-8'),
            timeout=_REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"ERROR: Failed to send request to {hook_url}: {e}")
        sys.exit(1)
# Script entry point: when this file is executed directly (not imported),
# pass the raw command-line argument list to main().
if __name__ == "__main__":
    cli_args = sys.argv
    main(cli_args)