-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvulnerability_db.py
129 lines (116 loc) · 6.43 KB
/
vulnerability_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import aiohttp
import asyncio
from typing import List, Dict
import logging
from colorama import Fore, Style
import json
import re
from bs4 import BeautifulSoup
import traceback
# Module-level logger named after this module; log() below writes through it.
logger = logging.getLogger(__name__)
def log(message, color=Fore.WHITE):
    """Write *message* to the module logger at INFO level, wrapped in colour.

    The emitted text is *color*, then *message*, then ``Style.RESET_ALL`` so
    the colour does not carry over into whatever is printed next.

    Args:
        message: Text (or any object usable in an f-string) to log.
        color: Prefix placed before the message; defaults to ``Fore.WHITE``.
    """
    colored_text = f"{color}{message}{Style.RESET_ALL}"
    logger.info(colored_text)
class VulnerabilityDB:
    """Small async client that looks up CVE records through the NVD REST API.

    All public methods are best-effort: connection, HTTP and parsing problems
    are reported through ``log`` and surface to the caller as an empty result
    (``[]`` or ``{}``) instead of an exception.

    Every vulnerability is represented as a dict with the keys ``cve``,
    ``description``, ``severity``, ``cvss_score``, ``published_date`` and
    ``last_modified_date``.
    """

    def __init__(self):
        # NOTE(review): this is the NVD 1.0 endpoint, and the JSON parsing in
        # this class expects the 1.0 layout ("result" -> "CVE_Items"). NVD also
        # publishes a 2.0 API with different parameter names and a different
        # response layout -- confirm this endpoint is still served.
        self.base_url = "https://services.nvd.nist.gov/rest/json/cves/1.0"

    async def _fetch(self, params: Dict[str, str], subject: str) -> List[Dict]:
        """GET ``base_url`` with *params* and parse the body into records.

        Args:
            params: Query-string parameters for the request.
            subject: Noun used in the HTTP-error log line
                ("Error fetching <subject>: HTTP <status>").

        Returns:
            Parsed vulnerability dicts; ``[]`` for a non-200 response.

        Raises:
            aiohttp.ClientError: On connection or protocol failures. The
                public callers catch and log these.
        """
        async with aiohttp.ClientSession() as session:
            # Passing ``params`` lets aiohttp URL-encode the values, so input
            # containing spaces, '&', '#' or '=' cannot corrupt the query.
            async with session.get(self.base_url, params=params) as response:
                if response.status != 200:
                    log(f"Error fetching {subject}: HTTP {response.status}", Fore.RED)
                    return []

                content_type = response.headers.get('Content-Type', '')
                if 'application/json' in content_type:
                    return self.process_json_vulnerabilities(await response.json())

                log(f"Unexpected content type: {content_type}. Attempting to parse as HTML.", Fore.YELLOW)
                return self.parse_html_content(await response.text())

    async def search_vulnerabilities(self, keyword: str) -> List[Dict]:
        """Search the database for vulnerabilities matching *keyword*.

        Args:
            keyword: Free-text search term, sent as the ``keyword`` parameter.

        Returns:
            A list of vulnerability dicts; ``[]`` when nothing matched or the
            request failed (the failure is logged).
        """
        try:
            return await self._fetch({"keyword": keyword}, "vulnerabilities")
        except aiohttp.ClientError as e:
            log(f"Error connecting to vulnerability database: {str(e)}", Fore.RED)
        except Exception as e:
            # Boundary handler: this method promises never to raise.
            log(f"Unexpected error in search_vulnerabilities: {str(e)}", Fore.RED)
        return []

    def _parse_cve_item(self, item: Dict) -> Dict:
        """Convert one ``CVE_Items`` entry into the flat vulnerability dict."""
        cve = item.get("cve", {})
        impact = item.get("impact", {})
        # ``description_data`` may be missing or an empty list; fall back to a
        # blank entry so indexing [0] cannot raise IndexError.
        descriptions = cve.get("description", {}).get("description_data") or [{}]
        return {
            "cve": cve.get("CVE_data_meta", {}).get("ID"),
            "description": descriptions[0].get("value"),
            "severity": self.get_severity(impact),
            "cvss_score": self.get_cvss_score(impact),
            "published_date": item.get("publishedDate"),
            "last_modified_date": item.get("lastModifiedDate"),
        }

    def process_json_vulnerabilities(self, data: Dict) -> List[Dict]:
        """Extract vulnerability dicts from a decoded JSON response.

        Reads ``data["result"]["CVE_Items"]``. A single malformed item is
        logged and skipped so it cannot discard the rest of the response.

        Args:
            data: Decoded JSON body.

        Returns:
            One dict per successfully parsed item; ``[]`` if the envelope
            itself cannot be read.
        """
        vulnerabilities = []
        try:
            items = data.get("result", {}).get("CVE_Items", []) or []
            for item in items:
                try:
                    vulnerabilities.append(self._parse_cve_item(item))
                except Exception as e:
                    log(f"Error processing JSON vulnerabilities: {str(e)}\n{traceback.format_exc()}", Fore.RED)
        except Exception as e:
            log(f"Error processing JSON vulnerabilities: {str(e)}\n{traceback.format_exc()}", Fore.RED)
            return []
        return vulnerabilities

    def get_severity(self, impact: Dict) -> str:
        """Return ``baseMetricV3.cvssV3.baseSeverity`` or ``"Unknown"``.

        Args:
            impact: The ``impact`` object of a CVE item.
        """
        try:
            base_severity = impact.get("baseMetricV3", {}).get("cvssV3", {}).get("baseSeverity")
            return base_severity if base_severity else "Unknown"
        except Exception as e:
            log(f"Error getting severity: {str(e)}\n{traceback.format_exc()}", Fore.RED)
            return "Unknown"

    def get_cvss_score(self, impact: Dict) -> float:
        """Return ``baseMetricV3.cvssV3.baseScore`` as a float, or ``0.0``.

        Args:
            impact: The ``impact`` object of a CVE item.
        """
        try:
            base_score = impact.get("baseMetricV3", {}).get("cvssV3", {}).get("baseScore")
            return float(base_score) if base_score else 0.0
        except Exception as e:
            log(f"Error getting CVSS score: {str(e)}\n{traceback.format_exc()}", Fore.RED)
            return 0.0

    def parse_html_content(self, html_content: str) -> List[Dict]:
        """Scrape vulnerability entries out of an HTML document.

        Looks for ``<div class="vulnerability">`` elements and reads the
        ``cve-id``, ``description``, ``severity``, ``published-date`` and
        ``last-modified-date`` children, substituting a default for any that
        are absent.

        Args:
            html_content: Raw HTML text.

        Returns:
            A list of vulnerability dicts (``cvss_score`` is always ``0.0``);
            ``[]`` if parsing fails.
        """

        def text_of(node, tag, css_class, default):
            # Look the element up once instead of once to test and once to read.
            found = node.find(tag, class_=css_class)
            return found.text if found is not None else default

        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            vulnerabilities = []
            for vuln in soup.find_all('div', class_='vulnerability'):
                vulnerabilities.append({
                    'cve': text_of(vuln, 'span', 'cve-id', 'Unknown'),
                    'description': text_of(vuln, 'p', 'description', 'No description available'),
                    'severity': text_of(vuln, 'span', 'severity', 'Unknown'),
                    'cvss_score': 0.0,  # As HTML might not provide CVSS score, defaulting to 0.0
                    'published_date': text_of(vuln, 'span', 'published-date', ''),
                    'last_modified_date': text_of(vuln, 'span', 'last-modified-date', ''),
                })
            return vulnerabilities
        except Exception as e:
            log(f"Error parsing HTML content: {str(e)}\n{traceback.format_exc()}", Fore.RED)
            return []

    async def get_vulnerability_details(self, cve_id: str) -> Dict:
        """Fetch the record for a single CVE identifier.

        Args:
            cve_id: CVE identifier, sent as the ``cveId`` parameter.

        Returns:
            The first vulnerability dict in the response, or ``{}`` when none
            was returned or the request failed (the failure is logged).
        """
        try:
            matches = await self._fetch({"cveId": cve_id}, "vulnerability details")
        except aiohttp.ClientError as e:
            log(f"Error connecting to vulnerability database: {str(e)}\n{traceback.format_exc()}", Fore.RED)
            return {}
        except Exception as e:
            # Boundary handler: this method promises never to raise.
            log(f"Unexpected error in get_vulnerability_details: {str(e)}\n{traceback.format_exc()}", Fore.RED)
            return {}
        return matches[0] if matches else {}