Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Generated by Django 5.2.11 on 2026-06-11 13:01

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("vulnerabilities", "0134_advisoryv2__all_impacts_unfurled_at_and_more"),
]

operations = [
migrations.AddField(
model_name="advisoryexploit",
name="record_id",
field=models.CharField(
blank=True,
help_text="The unique identifier for the exploit record in the original upstream data source, such as the CISA KEV ID or the exploitdb ID.",
max_length=255,
null=True,
),
),
migrations.AddConstraint(
model_name="advisoryexploit",
constraint=models.UniqueConstraint(
fields=("advisory", "data_source", "record_id"),
name="unique_advisory_exploit_source",
),
),
]
15 changes: 15 additions & 0 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3959,10 +3959,25 @@ class AdvisoryExploit(models.Model):
help_text="The URL to the exploit as provided in the original upstream data source.",
)

record_id = models.CharField(
null=True,
blank=True,
max_length=255,
help_text="The unique identifier for the exploit record in the original upstream data source, such as the CISA KEV ID or the exploitdb ID.",
)

@property
def get_known_ransomware_campaign_use_type(self):
return "Known" if self.known_ransomware_campaign_use else "Unknown"

class Meta:
constraints = [
models.UniqueConstraint(
fields=["advisory", "data_source", "record_id"],
name="unique_advisory_exploit_source",
)
]


class SSVC(models.Model):
vector = models.CharField(max_length=255, help_text="The vector string representing the SSVC.")
Expand Down
146 changes: 68 additions & 78 deletions vulnerabilities/pipelines/v2_improvers/enhance_with_exploitdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,10 @@
import requests
from aboutcode.pipeline import LoopProgress
from dateutil import parser as dateparser
from django.db import DataError

from vulnerabilities.models import AdvisoryAlias
from vulnerabilities.models import AdvisoryExploit
from vulnerabilities.models import AdvisoryReference
from vulnerabilities.models import AdvisoryV2
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.utils import relate_aliases_with_advisories
from vulnerabilities.utils import build_alias_to_advisory_map


class ExploitDBImproverPipeline(VulnerableCodePipeline):
Expand Down Expand Up @@ -66,86 +62,80 @@ def add_exploit(self):
raw_data = list(csvreader)
fetched_exploit_count = len(raw_data)

vulnerability_exploit_count = 0
self.log(f"Enhancing the vulnerability with {fetched_exploit_count:,d} exploit records")
progress = LoopProgress(total_iterations=fetched_exploit_count, logger=self.log)

for row in progress.iter(raw_data):
vulnerability_exploit_count += add_vulnerability_exploit(row, self.log)

self.log(f"Successfully added {vulnerability_exploit_count:,d} exploit-db advisory exploit")


def add_vulnerability_exploit(row, logger):
advisories = set()

aliases = row["codes"].split(";") if row["codes"] else []

if not aliases:
return 0
all_aliases = set()

advisories = relate_aliases_with_advisories(aliases)
for row in raw_data:
if row["codes"]:
all_aliases.update(row["codes"].split(";"))

if not advisories:
logger(f"No advisory found for aliases {aliases}")
return 0
alias_to_advisories = build_alias_to_advisory_map(all_aliases)

date_added = parse_date(row["date_added"])
source_date_published = parse_date(row["date_published"])
source_date_updated = parse_date(row["date_updated"])
exploits = []
seen = set()

for advisory in advisories:
add_exploit_references(row["codes"], row["source_url"], row["file"], advisory, logger)
try:
AdvisoryExploit.objects.update_or_create(
advisory=advisory,
data_source="Exploit-DB",
defaults={
"date_added": date_added,
"description": row["description"],
"known_ransomware_campaign_use": row["verified"],
"source_date_published": source_date_published,
"exploit_type": row["type"],
"platform": row["platform"],
"source_date_updated": source_date_updated,
"source_url": row["source_url"],
},
)
except DataError as e:
logger(
f"Failed to Create the Vulnerability Exploit-DB with error {e!r}:\n{traceback_format_exc()}",
level=logging.ERROR,
)
return 1


def add_exploit_references(ref_id, direct_url, path, adv, logger):
url_map = {
"file_url": f"https://gitlab.com/exploit-database/exploitdb/-/blob/main/{path}",
"direct_url": direct_url,
}

for key, url in url_map.items():
if url:
try:
ref, created = AdvisoryReference.objects.update_or_create(
url=url,
defaults={
"reference_id": ref_id,
"reference_type": AdvisoryReference.EXPLOIT,
},
)

if created:
ref.advisories.add(adv)
ref.save()
logger(f"Created {ref} for {adv} with {key}={url}")

except DataError as e:
logger(
f"Failed to Create the Vulnerability Reference For Exploit-DB with error {e!r}:\n{traceback_format_exc()}",
level=logging.ERROR,
)
for row in progress.iter(raw_data):
aliases = row["codes"].split(";") if row["codes"] else []

if not aliases:
continue

date_added = parse_date(row["date_added"])
source_date_published = parse_date(row["date_published"])
source_date_updated = parse_date(row["date_updated"])

for alias in aliases:
for advisory in alias_to_advisories.get(alias, ()):

key = (
advisory.id,
"Exploit-DB",
alias,
)

if key in seen:
continue

seen.add(key)

exploits.append(
AdvisoryExploit(
advisory=advisory,
record_id=alias,
data_source="Exploit-DB",
date_added=date_added,
description=row["description"],
known_ransomware_campaign_use=row["verified"],
source_date_published=source_date_published,
exploit_type=row["type"],
platform=row["platform"],
source_date_updated=source_date_updated,
source_url=row["source_url"],
)
)

AdvisoryExploit.objects.bulk_create(
exploits,
update_conflicts=True,
unique_fields=[
"advisory",
"data_source",
"record_id",
],
update_fields=[
"date_added",
"description",
"known_ransomware_campaign_use",
"source_date_published",
"exploit_type",
"platform",
"source_date_updated",
"source_url",
],
batch_size=1000,
)


def parse_date(date_string):
Expand Down
90 changes: 52 additions & 38 deletions vulnerabilities/pipelines/v2_improvers/enhance_with_kev.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@
import requests
from aboutcode.pipeline import LoopProgress

from vulnerabilities.models import AdvisoryAlias
from vulnerabilities.models import AdvisoryExploit
from vulnerabilities.models import AdvisoryV2
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.utils import relate_aliases_with_advisories
from vulnerabilities.utils import build_alias_to_advisory_map


class VulnerabilityKevPipeline(VulnerableCodePipeline):
Expand Down Expand Up @@ -54,40 +52,56 @@ def fetch_exploits(self):
def add_exploits(self):
fetched_exploit_count = self.kev_data.get("count")
self.log(f"Enhancing the vulnerability with {fetched_exploit_count:,d} exploit records")

vulnerability_exploit_count = 0
progress = LoopProgress(total_iterations=fetched_exploit_count, logger=self.log)

for record in progress.iter(self.kev_data.get("vulnerabilities", [])):
vulnerability_exploit_count += add_vulnerability_exploit(
kev_vul=record,
logger=self.log,
)

self.log(f"Successfully added {vulnerability_exploit_count:,d} kev exploit")


def add_vulnerability_exploit(kev_vul, logger):
cve_id = kev_vul.get("cveID")

if not cve_id:
return 0

advisories = relate_aliases_with_advisories([cve_id])

for advisory in advisories:
AdvisoryExploit.objects.update_or_create(
advisory=advisory,
data_source="KEV",
defaults={
"description": kev_vul["shortDescription"],
"date_added": kev_vul["dateAdded"],
"required_action": kev_vul["requiredAction"],
"due_date": kev_vul["dueDate"],
"notes": kev_vul["notes"],
"known_ransomware_campaign_use": (
True if kev_vul["knownRansomwareCampaignUse"] == "Known" else False
),
},
cve_ids = {
record["cveID"] for record in self.kev_data["vulnerabilities"] if record.get("cveID")
}

cve_to_advisories = build_alias_to_advisory_map(cve_ids)

exploits = []

advisories_seen_multiple_times = set()

for record in progress.iter(self.kev_data["vulnerabilities"]):
cve_id = record.get("cveID")

if not cve_id:
continue

for advisory in cve_to_advisories.get(cve_id, []):
if (advisory.avid, cve_id) in advisories_seen_multiple_times:
continue
advisories_seen_multiple_times.add((advisory.avid, cve_id))
exploits.append(
AdvisoryExploit(
advisory=advisory,
record_id=cve_id,
data_source="KEV",
description=record["shortDescription"],
date_added=record["dateAdded"],
required_action=record["requiredAction"],
due_date=record["dueDate"],
notes=record["notes"],
known_ransomware_campaign_use=(
record["knownRansomwareCampaignUse"] == "Known"
),
)
)
if not exploits:
return

AdvisoryExploit.objects.bulk_create(
exploits,
update_conflicts=True,
unique_fields=["advisory", "data_source", "record_id"],
update_fields=[
"description",
"date_added",
"required_action",
"due_date",
"notes",
"known_ransomware_campaign_use",
],
batch_size=1000,
)
return 1
Loading