Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bayesian/api/api_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,15 @@ def vulnerability_analysis_post():
"""
input_json: Dict = request.get_json()
ecosystem: str = input_json.get('ecosystem')
ignore_vulnerabilities: Dict = input_json.get('ignore', {})

try:
# Step1: Gather and clean Request
packages_list = validate_input(input_json, ecosystem)
# Step2: Get aggregated CA data from Query GraphDB,
graph_response = get_vulnerability_data(ecosystem, packages_list)
# Step3: Build Unknown packages and Generates Stack Recommendation.
stack_recommendation = get_known_pkgs(graph_response, packages_list)
stack_recommendation = get_known_pkgs(graph_response, packages_list, ignore_vulnerabilities)
except BadRequest as br:
logger.error(br)
raise HTTPError(400, str(br)) from br
Expand Down
21 changes: 20 additions & 1 deletion bayesian/utility/v2/component_analyses.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ def validate_input(input_json: Dict, ecosystem: str) -> List[Dict]:
error_msg: str = "package_versions is missing"
raise BadRequest(error_msg)

ignore_package_vulns = input_json.get('ignore', {})
if not isinstance(ignore_package_vulns, dict):
error_msg = "Expected a dictionary 'ignore' consisting of package names as key" \
", and list of vulnerabilities to ignore for that package as value"
raise BadRequest(error_msg)

packages_list = []
for pkg in input_json.get('package_versions'):
package = pkg.get("package")
Expand Down Expand Up @@ -171,6 +177,7 @@ def _fetcher_in_batches(func: Callable, packages: List,


def get_batch_ca_data(ecosystem: str, packages: List) -> dict:

"""Fetch package details for component analyses."""
logger.debug('Executing get_batch_ca_data')
started_at = time.time()
Expand Down Expand Up @@ -305,14 +312,26 @@ def clean_package_list(package_details_dict: Dict):
return packages_list


def get_known_pkgs(graph_response: Dict, packages_list: Dict) -> List[Dict]:
def get_known_pkgs(graph_response: Dict, packages_list: Dict, ignore_vulnerabilities: Dict) -> List[Dict]:
"""Analyse Known Packages."""
package_details_dict = {}

for temp in packages_list:
temp["vulnerabilities"] = []
package_details_dict[temp["name"]] = temp
for vulnerability in graph_response.get('result', {}).get('data'):
package_details = package_details_dict.get(vulnerability["package_name"][0])
vulns_to_ignore = ignore_vulnerabilities.get(vulnerability["package_name"][0], [])
if not isinstance(vulns_to_ignore, list):
error_msg = "Expected list of vulnerabilities to ignore in the 'ignore' dictionary as values, where the package name is the key"
raise BadRequest(error_msg)
# if package is in the 'ignore' dict and list of vulnerabilities to ignore is empty, ignore the vulnerability
if vulnerability["package_name"][0] in ignore_vulnerabilities and len(vulns_to_ignore) == 0:
continue
# if package is in the 'ignore' dict and list of vulnerabilities to ignore is not empty, ignore vulnerability
# if the vulnerability is in the list of vulnerabilities to ignore for that package
if vulnerability["package_name"][0] in ignore_vulnerabilities and vulnerability["snyk_vuln_id"][0] in vulns_to_ignore:
continue
if(check_vulnerable_package(package_details["version"],
vulnerability['vulnerable_versions'][0])):
package_details["vulnerabilities"].append(
Expand Down
46 changes: 30 additions & 16 deletions tests/utility/v2/test_component_analyses.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,34 +70,48 @@ def test_get_known_pkgs_no_cve(self):
"""Test Known Pkgs, No Cve."""
input_pkgs = [{"name": "markdown2", "version": "2.3.2"}]
gremlin_batch_data_no_cve = {"result": {"data": []}}

stack_recommendation = get_known_pkgs(gremlin_batch_data_no_cve, input_pkgs)
ignore = {}
stack_recommendation = get_known_pkgs(gremlin_batch_data_no_cve, input_pkgs, ignore)
ideal_output = [{'name': 'markdown2',
'version': '2.3.2',
'vulnerabilities': []}]
self.assertListEqual(stack_recommendation, ideal_output)

def test_get_known_pkgs_with_cve(self):
def test_get_known_pkgs_with_cve_with_ignore(self):
"""Test Known Pkgs with Cve(VA)."""
input_pkgs = [{"name": "st", "version": "0.2.5"}]
batch_data_cve = os.path.join('tests/data/gremlin/va.json')
with open(batch_data_cve) as f:
gremlin_batch_data_cve = json.load(f)

stack_recommendation = get_known_pkgs(gremlin_batch_data_cve, input_pkgs)
ignore = {"st": ["SNYK-JS-ST-10820"]}
stack_recommendation = get_known_pkgs(gremlin_batch_data_cve, input_pkgs, ignore)
ideal_output = [{'name': 'st',
'version': '0.2.5',
'vulnerabilities': [{
"fixed_in": [
"1.2.2",
"1.2.3",
"1.2.4"
],
"id": "SNYK-JS-ST-10820",
"severity": "medium",
"title": "Open Redirect",
"url": "https://snyk.io/vuln/SNYK-JS-ST-10820"
}]}]
'vulnerabilities': []
}]
self.assertListEqual(stack_recommendation, ideal_output)

def test_get_known_pkgs_with_cve(self):
"""Test Known Pkgs with Cve(VA)."""
input_pkgs = [{"name": "st", "version": "0.2.5"}]
batch_data_cve = os.path.join('tests/data/gremlin/va.json')
with open(batch_data_cve) as f:
gremlin_batch_data_cve = json.load(f)
ignore = {}
stack_recommendation = get_known_pkgs(gremlin_batch_data_cve, input_pkgs, ignore)
ideal_output = [{'name': 'st',
'version': '0.2.5',
'vulnerabilities': [{
"fixed_in": [
"1.2.2",
"1.2.3",
"1.2.4"
],
"id": "SNYK-JS-ST-10820",
"severity": "medium",
"title": "Open Redirect",
"url": "https://snyk.io/vuln/SNYK-JS-ST-10820"
}]}]
self.assertListEqual(stack_recommendation, ideal_output)

@patch('bayesian.utility.v2.ca_response_builder.g')
Expand Down