Skip to content
95 changes: 93 additions & 2 deletions backend/src/xfd_django/xfd_api/api_methods/cve.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,42 @@
"""Cve API."""
# Standard Python Libraries
import datetime
from typing import Optional
import logging
from typing import Any, Dict, Optional

# Third-Party Libraries
from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator
from django.db.models import Q
from fastapi import HTTPException, status
from xfd_mini_dl.models import Cve as CveModel
from xfd_mini_dl.models import User, UserType

from ..auth import is_global_write_admin
from ..auth import (
get_org_memberships,
is_global_view_admin,
is_global_write_admin,
is_regional_admin,
)
from ..tasks.es_client import ESClient

LOGGER = logging.getLogger(__name__)


def escape_wildcard_query(search_term: str) -> str:
    """Prepare a user-supplied term for use inside an Elasticsearch wildcard pattern.

    The term is upper-cased first, because stored CVE names use the
    ``CVE-2016-...`` form; this makes the lookup effectively case-insensitive.
    The three characters that are special in a wildcard query (backslash,
    asterisk and question mark) are then each prefixed with a backslash.
    Everything else, dashes included, is passed through literally.
    """
    # One pass over the string: every metacharacter maps straight to its
    # escaped form, so a freshly inserted backslash is never escaped again.
    escapes = str.maketrans({"\\": "\\\\", "*": "\\*", "?": "\\?"})
    return search_term.upper().translate(escapes)


def get_cves_by_id(cve_id):
Expand Down Expand Up @@ -87,3 +114,67 @@ async def get_all_cves(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"DB error: {e}",
)


def search_cves_task(search_body, current_user: User):
    """
    Search CVEs in Elasticsearch.

    Args:
        search_body: Request model exposing ``search_term``. A blank or
            whitespace-only term matches every CVE; otherwise the term is
            trimmed, escaped and used as a ``*term*`` wildcard on
            ``name.keyword``.
        current_user: The user performing the search.

    Returns:
        An empty list when the user is neither a global view admin nor a
        regional admin and has no organization memberships (or is a standard
        user without memberships); otherwise ``{"body": <search response>}``.

    Raises:
        HTTPException: Re-raised unchanged if raised while searching; any
            other failure is logged and reported as a 500 error.
    """
    try:
        # Check if user is GlobalViewAdmin / regional admin or has memberships
        has_admin_view = is_global_view_admin(current_user) or is_regional_admin(
            current_user
        )
        if not has_admin_view and not get_org_memberships(current_user):
            return []

        # Construct the Elasticsearch query
        query_body: Dict[str, Any] = {"query": {"bool": {"must": [], "filter": []}}}
        bool_query = query_body["query"]["bool"]

        # Trim before building the pattern: the blank check already ignores
        # surrounding whitespace, and leaving it in the pattern would require
        # the stored CVE name to contain those spaces, so nothing would match.
        search_term = search_body.search_term.strip()
        if search_term:
            # Wildcard query on name.keyword (non-tokenized) preserves dashes
            # in CVE names; only wildcard metacharacters are escaped.
            sanitized_search_term = escape_wildcard_query(search_term)
            bool_query["must"].append(
                {"wildcard": {"name.keyword": "*{}*".format(sanitized_search_term)}}
            )
        else:
            # Use match_all if the search term is empty
            bool_query["must"].append({"match_all": {}})

        # For standard users, only show CVEs affecting their organization
        if current_user.user_type == UserType.STANDARD:
            org_ids = get_org_memberships(current_user)
            if not org_ids:
                return []
            bool_query["filter"].append({"terms": {"organization_ids": org_ids}})

        # Log the query for debugging
        LOGGER.info("CVE Search Query: %s", query_body)
        LOGGER.info("Search term: %s", search_body.search_term)

        # Create the client only once the access checks have passed
        client = ESClient()
        search_results = client.search_cves(query_body)
        LOGGER.info(
            "CVE Search Results: %d hits",
            len(search_results.get("hits", {}).get("hits", [])),
        )

        return {"body": search_results}

    except HTTPException:
        # Already carries the intended status; propagate with its traceback.
        raise
    except Exception as e:
        LOGGER.exception("Error occurred while searching CVEs: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
        ) from e
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
populate_sample_data,
populate_scan_results,
)
from xfd_api.tasks.helpers.syncdb_helpers.es_sync import (
from xfd_api.tasks.helpers.syncdb_helpers.es_sync import ( # sync_es_cves,
manage_elasticsearch_indices,
sync_es_cves,
sync_es_organizations,
)
from xfd_api.tasks.helpers.syncdb_helpers.fill_static_tables import (
Expand Down Expand Up @@ -144,7 +145,10 @@ def handle(self, *args, **options): # pylint: disable=R0915
# Step 4.1: Sync domains in ES
sync_es_domains({})

# Step 5: Sync organizations in ES
# Step 4.2: Sync CVEs in ES - moved here to ensure CVEs are synced inside the populate block to avoid erroring out when no Vulnerability Materialized Views exist yet.
sync_es_cves()

# Step 5: Sync organizations in ES - may not be needed since sync_es_domains() already syncs organizations, but keeping it for completeness
sync_es_organizations()

# Step 6: Populate Scan Results
Expand Down
6 changes: 6 additions & 0 deletions backend/src/xfd_django/xfd_api/schema_models/cve.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,9 @@ class GetAllCvesResponse(BaseModel):

status: str
payload: List[Cve]


class CveSearchBody(BaseModel):
    """Elastic search CVE request model.

    Request body accepted by the CVE search endpoint.
    """

    # Text to search for; defaults to an empty string when omitted.
    search_term: str = ""
48 changes: 48 additions & 0 deletions backend/src/xfd_django/xfd_api/tasks/es_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# Constants
DOMAINS_INDEX = "domains-5"
ORGANIZATIONS_INDEX = "organizations-1"
CVE_INDEX = "cves-1"

# Define mappings
organization_mapping = {
Expand All @@ -26,6 +27,16 @@
"suggest": {"type": "completion"},
}
}
# Mapping for the CVE index.
cve_mapping = {
    "properties": {
        "name": {
            "type": "keyword",
            # The CVE search issues its wildcard query against `name.keyword`.
            # With `name` mapped as a plain keyword that sub-field would not
            # exist and the query would match nothing, so expose it explicitly.
            # NOTE(review): documents indexed before this sub-field was added
            # must be re-indexed before they become searchable through it.
            "fields": {"keyword": {"type": "keyword"}},
        },
        "suggest": {"type": "completion"},
        "organization_ids": {"type": "keyword"},
    }
}

LOGGER = logging.getLogger(__name__)
# Raise log level for Elasticsearch client to WARNING to suppress request logs
logging.getLogger("elasticsearch").setLevel(logging.WARNING)
Expand Down Expand Up @@ -63,6 +74,25 @@ def sync_organizations_index(self):
LOGGER.error("Error syncing organizations index: %s", e)
raise e

def sync_cves_index(self):
    """Make sure the CVE index exists and carries the current mapping.

    An existing index only gets its mapping refreshed; a missing one is
    created with ``cve_mapping`` and two shards. Any failure is logged and
    re-raised to the caller.
    """
    try:
        index_api = self.client.indices
        if index_api.exists(index=CVE_INDEX):
            LOGGER.info("Updating index %s...", CVE_INDEX)
            index_api.put_mapping(index=CVE_INDEX, body=cve_mapping)
            return

        LOGGER.info("Creating index %s...", CVE_INDEX)
        creation_body = {
            "mappings": cve_mapping,
            "settings": {"number_of_shards": 2},
        }
        index_api.create(index=CVE_INDEX, body=creation_body)
    except Exception as e:
        LOGGER.error("Error syncing CVE index: %s", e)
        raise e

def sync_domains_index(self):
"""Create or updates the domains index with mappings."""
try:
Expand Down Expand Up @@ -102,6 +132,20 @@ def update_organizations(self, organizations):
]
self._bulk_update(actions)

def update_cves(self, cves):
    """Upsert the given CVE records into the CVE index.

    Every record becomes one bulk ``update`` action addressed by the
    record's ``id`` with ``doc_as_upsert`` enabled. The document is the
    record itself plus a ``suggest`` entry built from its ``name``.
    """
    bulk_actions = []
    for record in cves:
        record_id = record["id"]
        document = dict(record)
        document["suggest"] = [{"input": record["name"], "weight": 1}]
        bulk_actions.append(
            {
                "_op_type": "update",
                "_index": CVE_INDEX,
                "_id": record_id,
                "doc": document,
                "doc_as_upsert": True,
            }
        )
    self._bulk_update(bulk_actions)

def update_domains(self, domains, max_retries=5, backoff_base=2):
"""Update or insert domains into Elasticsearch with retry and backoff."""
actions = [
Expand Down Expand Up @@ -189,6 +233,10 @@ def search_organizations(self, body):
"""Search organizations index with specified query body."""
return self.client.search(index=ORGANIZATIONS_INDEX, body=body)

def search_cves(self, body):
    """Search CVE index with specified query body.

    Args:
        body: Query body handed unchanged to the client's ``search`` call.

    Returns:
        The result of ``self.client.search`` against ``CVE_INDEX``.
    """
    return self.client.search(index=CVE_INDEX, body=body)

def _bulk_update(self, actions):
"""Update to Elasticsearch."""
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

# Third-Party Libraries
from xfd_api.tasks.es_client import ESClient
from xfd_mini_dl.models import Organization
from xfd_mini_dl.models import Cve as CveModel
from xfd_mini_dl.models import Organization, Vulnerability

# Elasticsearch client
es_client = ESClient()
Expand All @@ -23,6 +24,7 @@ def manage_elasticsearch_indices(dangerouslyforce):
es_client.delete_all()
es_client.sync_organizations_index()
es_client.sync_domains_index()
es_client.sync_cves_index()
LOGGER.info("Elasticsearch indices synchronized.")
except Exception as e:
LOGGER.error("Error managing Elasticsearch indices: %s", e)
Expand Down Expand Up @@ -73,3 +75,53 @@ def sync_es_organizations():
except Exception as e:
LOGGER.exception("Error syncing organizations: %s", e)
raise e


def sync_es_cves():
    """Push every CVE, tagged with the organizations it affects, to Elasticsearch.

    Organization IDs are collected from vulnerabilities that reference a CVE
    and attached to the CVE whose ``name`` equals that reference. CVEs with
    no such vulnerability get an empty ``organization_ids`` list. Errors are
    logged with their traceback and re-raised.
    """
    try:
        # Distinct (CVE, organization) pairs taken from vulnerabilities
        pairs = (
            Vulnerability.objects.filter(cve__isnull=False)
            .values("cve", "organization_id")
            .distinct()
        )

        orgs_by_cve = {}
        for pair in pairs:
            affected_orgs = orgs_by_cve.setdefault(pair["cve"], [])
            organization_id = pair["organization_id"]
            if organization_id:
                affected_orgs.append(str(organization_id))

        # Load all CVEs, restricted to the columns that get indexed
        cve_fields = (
            "id",
            "name",
            "published_at",
            "modified_at",
            "status",
            "description",
        )
        cves = list(CveModel.objects.all().values(*cve_fields))

        # Attach the affected organization IDs to each CVE
        for cve in cves:
            cve["organization_ids"] = orgs_by_cve.get(cve["name"], [])

        LOGGER.info("Found %d CVEs to sync.", len(cves))

        if not cves:
            LOGGER.info("No CVEs to sync.")
            return

        # Update Elasticsearch with CVEs
        es_client.update_cves(cves)
        LOGGER.info("CVE sync complete.")
    except Exception as e:
        LOGGER.exception("Error syncing CVEs: %s", e)
        raise e
6 changes: 5 additions & 1 deletion backend/src/xfd_django/xfd_api/tasks/searchSync.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from xfd_mini_dl.models import Domain, Ip, SubDomains

from .es_client import ESClient
from .helpers.syncdb_helpers.es_sync import sync_es_organizations
from .helpers.syncdb_helpers.es_sync import sync_es_cves, sync_es_organizations

# Set up logging
LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -186,3 +186,7 @@ def handler(command_options):
LOGGER.info("Syncing organizations..")
sync_es_organizations()
LOGGER.info("Organization sync complete.")

LOGGER.info("Syncing CVEs..")
sync_es_cves()
LOGGER.info("CVE sync complete.")
22 changes: 20 additions & 2 deletions backend/src/xfd_django/xfd_api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@
from .api_methods import organization, proxy, scan, scan_tasks, user
from .api_methods.blocklist import handle_bulk_check_ips
from .api_methods.cpe import get_cpes_by_id
from .api_methods.cve import get_all_cves, get_cves_by_id, get_cves_by_name
from .api_methods.cve import (
get_all_cves,
get_cves_by_id,
get_cves_by_name,
search_cves_task,
)
from .api_methods.dmz_sync import CybersixSyncParams
from .api_methods.dns_twist_sync import dns_twist_sync_post
from .api_methods.domain import (
Expand Down Expand Up @@ -123,7 +128,7 @@
)
from .schema_models.cpe import Cpe as CpeSchema
from .schema_models.cve import Cve as CveSchema
from .schema_models.cve import GetAllCvesResponse
from .schema_models.cve import CveSearchBody, GetAllCvesResponse
from .schema_models.dmz_sync import (
AsmSyncRequest,
AsmSyncResponse,
Expand Down Expand Up @@ -435,6 +440,19 @@ async def get_call_all_cves(
)


@api_router.post(
    "/search/cves",
    dependencies=[Depends(get_current_active_user)],
    tags=["CVEs"],
)
async def search_cves(
    search_body: CveSearchBody,
    current_user: User = Depends(get_current_active_user),
):
    """Search CVEs in Elasticsearch.

    Args:
        search_body: Parsed request body carrying the search term.
        current_user: The active user resolved by ``get_current_active_user``.

    Returns:
        The result of ``search_cves_task`` for this body and user, unchanged.
    """
    return search_cves_task(search_body, current_user)


# ========================================
# New Export Endpoint
# ========================================
Expand Down
Loading
Loading