Skip to content

Commit

Permalink
added CPE commands to the cli and starting working on copying functio…
Browse files Browse the repository at this point in the history
…nality from cve-search binaries
  • Loading branch information
P-T-I committed Oct 13, 2023
1 parent 63e8967 commit 9f58fed
Show file tree
Hide file tree
Showing 12 changed files with 278 additions and 36 deletions.
2 changes: 1 addition & 1 deletion CveXplore/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3.7.dev2
0.3.7.dev5
9 changes: 7 additions & 2 deletions CveXplore/cli.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import logging

import click
import click_completion.core

from CveXplore.cli_cmds.cpe_cmds import commands as group5
from CveXplore.cli_cmds.cve_cmds import commands as group2
from CveXplore.cli_cmds.db_cmds import commands as group4
from CveXplore.cli_cmds.search_cmds import commands as group1
from CveXplore.cli_cmds.find_cmds import commands as group1
from CveXplore.cli_cmds.stats_cmds import commands as group3
from CveXplore.main import CveXplore


click_completion.init()

logging.getLogger("dicttoxml").setLevel("ERROR")


@click.group(invoke_without_command=True)
@click.option("-v", "--version", is_flag=True, help="Show the current version and exit")
Expand All @@ -25,3 +29,4 @@ def main(ctx, version):
main.add_command(group2.cve_cmd)
main.add_command(group3.stats_cmd)
main.add_command(group4.db_cmd)
main.add_command(group5.cpe_cmd)
2 changes: 1 addition & 1 deletion CveXplore/cli_cmds/cli_utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def format_output(format_type, input_list):
"""

if format_type == "json":
output = json.dumps(input_list)
output = json.dumps(input_list, indent=4, sort_keys=True, default=str)
elif format_type == "csv":
df = pd.DataFrame(input_list)
output = df.to_csv(index=False)
Expand Down
File renamed without changes.
158 changes: 158 additions & 0 deletions CveXplore/cli_cmds/cpe_cmds/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
from pprint import pformat

import click
import pymongo

from CveXplore.cli_cmds.cli_utils.utils import printer
from CveXplore.cli_cmds.mutex_options.mutex import Mutex


@click.group("cpe", invoke_without_command=True, help="Query for cpe specific data")
@click.pass_context
def cpe_cmd(ctx):
if ctx.invoked_subcommand is None:
click.echo(cpe_cmd.get_help(ctx))


@cpe_cmd.group(
"search",
invoke_without_command=True,
help="Search for cpe entries by name or title. Results are sorted ASCENDING by default",
)
@click.option(
"-n",
"--name",
default=False,
is_flag=True,
help="Search by name",
cls=Mutex,
not_required_if=["title", "vendor"],
)
@click.option(
"-t",
"--title",
default=True,
is_flag=True,
help="Search by title (default)",
cls=Mutex,
not_required_if=["name", "vendor"],
)
@click.option(
"-v",
"--vendor",
default=False,
is_flag=True,
help="Search by vendor",
cls=Mutex,
not_required_if=["name", "title"],
)
@click.option(
"-m",
"--match",
help="Use match for searching",
cls=Mutex,
not_required_if=["regex"],
)
@click.option(
"-r",
"--regex",
help="Use regex for searching",
cls=Mutex,
not_required_if=["match"],
)
@click.option("-d", "--deprecated", is_flag=True, help="Filter deprecated cpe's")
@click.option("-c", "--cve", is_flag=True, help="Add related CVE's")
@click.option(
"-p",
"--product_search",
default=False,
is_flag=True,
help="If adding CVE's search for vulnerable products and not for vulnerable configurations",
)
@click.option("-l", "--limit", default=10, help="Search limit")
@click.option("-s", "--sort", is_flag=True, help="Sort DESCENDING")
@click.option(
"--pretty",
is_flag=True,
help="Pretty print the output",
cls=Mutex,
not_required_if=["output"],
)
@click.option(
"-o",
"--output",
default="json",
help="Set the desired output format (defaults to json)",
type=click.Choice(["json", "csv", "xml", "html"], case_sensitive=False),
cls=Mutex,
not_required_if=["pretty"],
)
@click.pass_context
def search_cmd(
ctx,
name,
title,
vendor,
match,
regex,
deprecated,
cve,
product_search,
limit,
sort,
pretty,
output,
):

if not name and not vendor and title:
search_by = "title"
elif name and not vendor and title:
search_by = "cpeName"
else:
search_by = "vendor"

if not sort:
sorting = pymongo.ASCENDING
else:
sorting = pymongo.DESCENDING

if regex:
if deprecated:
ret_list = getattr(ctx.obj["data_source"], "cpe").search_active_cpes(
search_by, regex, limit, sorting
)
else:
ret_list = (
getattr(getattr(ctx.obj["data_source"], "cpe"), search_by)
.search(regex)
.limit(limit)
.sort(search_by, sorting)
)
elif match:
if deprecated:
ret_list = getattr(ctx.obj["data_source"], "cpe").find_active_cpes(
search_by, match, limit, sorting
)
else:
ret_list = (
getattr(getattr(ctx.obj["data_source"], "cpe"), search_by)
.find(match)
.limit(limit)
.sort(search_by, sorting)
)
else:
click.echo(search_cmd.get_help(ctx))
return

if cve:
result = [result.to_cve_summary(product_search) for result in ret_list]
else:
result = [result.to_dict() for result in ret_list]

if ctx.invoked_subcommand is None:
printer(input_data=result, pretty=pretty, output=output)
else:
if pretty:
ctx.obj["RESULT"] = pformat(result, indent=4)
else:
ctx.obj["RESULT"] = result
15 changes: 6 additions & 9 deletions CveXplore/cli_cmds/cve_cmds/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
@click.group("cve", invoke_without_command=True, help="Query for cve specific data")
@click.pass_context
def cve_cmd(ctx):
pass
if ctx.invoked_subcommand is None:
click.echo(cve_cmd.get_help(ctx))


@cve_cmd.group(
"last", invoke_without_command=True, help="Query the last N amount of cve entries"
"last",
invoke_without_command=True,
help="Query the last L (-l) amount of cve entries",
)
@click.option("-l", "--limit", default=10, help="Query limit")
@click.option(
Expand All @@ -27,7 +30,7 @@ def cve_cmd(ctx):
"-o",
"--output",
default="json",
help="Set the desired output format",
help="Set the desired output format (defaults to json)",
type=click.Choice(["json", "csv", "xml", "html"], case_sensitive=False),
cls=Mutex,
not_required_if=["pretty"],
Expand All @@ -45,9 +48,3 @@ def last_cmd(ctx, limit, pretty, output):
ctx.obj["RESULT"] = pformat(result, indent=4)
else:
ctx.obj["RESULT"] = result


@last_cmd.command("less", help="Lets you scroll through the returned results")
@click.pass_context
def less_cmd(ctx):
click.echo_via_pager(ctx.obj["RESULT"])
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@


@click.group(
"search",
"find",
invoke_without_command=True,
help="Perform search queries on a single collection",
help="Perform find queries on a single collection",
)
@click.option(
"-c",
Expand All @@ -32,7 +32,7 @@
"-o",
"--output",
default="json",
help="Set the desired output format",
help="Set the desired output format (defaults to json)",
type=click.Choice(["json", "csv", "xml", "html"], case_sensitive=False),
cls=Mutex,
not_required_if=["pretty"],
Expand All @@ -54,9 +54,3 @@ def search_cmd(ctx, collection, field, value, limit, pretty, output):
ctx.obj["RESULT"] = pformat(result, indent=4)
else:
ctx.obj["RESULT"] = result


@search_cmd.command("less", help="Lets you scroll through the returned results")
@click.pass_context
def less_cmd(ctx):
click.echo_via_pager(ctx.obj["RESULT"])
24 changes: 24 additions & 0 deletions CveXplore/common/data_source_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,37 @@ class DatasourceConnection(CveXploreObject):
else MongoDBConnection(**json.loads(os.getenv("MONGODB_CON_DETAILS")))
)

def to_dict(self, *print_keys: str) -> dict:
"""
Method to convert the entire object to a dictionary
"""

if len(print_keys) != 0:
full_dict = {
k: v
for (k, v) in self.__dict__.items()
if not k.startswith("_") and k in print_keys
}
else:
full_dict = {
k: v for (k, v) in self.__dict__.items() if not k.startswith("_")
}

return full_dict

def __init__(self, collection: str):
"""
Create a DatasourceConnection object
"""
super().__init__()
self.__collection = collection

def __eq__(self, other):
return self.__dict__ == other.__dict__

def __ne__(self, other):
return self.__dict__ != other.__dict__

@property
def _datasource_connection(self):
return DatasourceConnection.__DATA_SOURCE_CONNECTION
Expand Down
64 changes: 57 additions & 7 deletions CveXplore/database/helpers/specific_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Specific database functions
===========================
"""
import re
from typing import List

from pymongo import DESCENDING
Expand All @@ -24,13 +25,6 @@ def get_cves_for_vendor(
) -> List[CveXploreObject] | None:
"""
Function to return cves based on a given vendor. By default, to result is sorted descending on th cvss field.
:param vendor: A vendor to search for; e.g. microsoft
:type vendor: str
:param limit: Limit the amount of returned results
:type limit: int
:return: List with cves objects
:rtype: list
"""

the_result = list(
Expand All @@ -47,3 +41,59 @@ def get_cves_for_vendor(
def __repr__(self):
"""String representation of object"""
return "<< CvesDatabaseFunctions:{} >>".format(self._collection)


class CpeDatabaseFunctions(GenericDatabaseFactory):
"""
The CpeDatabaseFunctions is a specific class that provides the cpe attribute of a CveXplore instance additional
functions that only apply to the 'cpe' collection
"""

def __init__(self, collection: str):
super().__init__(collection)

def search_active_cpes(
self, field: str, value: str, limit: int = 0, sorting: int = 1
) -> List[CveXploreObject] | None:
"""
Function to regex search for cpe based on value in string. Only active (deprecated == false) cpe records are
returned.
"""
regex = re.compile(value, re.IGNORECASE)

query = {"$and": [{field: {"$regex": regex}}, {"deprecated": False}]}

the_result = list(
self._datasource_collection_connection.find(query)
.limit(limit)
.sort(field, sorting)
)

if len(the_result) != 0:
return the_result
else:
return None

def find_active_cpes(
self, field: str, value: str, limit: int = 0, sorting: int = 1
) -> List[CveXploreObject] | None:
"""
Function to find cpe based on value in string. Only active (deprecated == false) cpe records are
returned.
"""
query = {"$and": [{field: value}, {"deprecated": False}]}

the_result = list(
self._datasource_collection_connection.find(query)
.limit(limit)
.sort(field, sorting)
)

if len(the_result) != 0:
return the_result
else:
return None

def __repr__(self):
"""String representation of object"""
return "<< CpeDatabaseFunctions:{} >>".format(self._collection)
Loading

0 comments on commit 9f58fed

Please sign in to comment.