From a5a59af17195411ff54a327fb041c3f13a680fb7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 7 Apr 2026 18:19:31 +0000 Subject: [PATCH 1/5] Add get_secret_scanning_scan_history_progress.py script Implements the paradigm from Get-GHSecretScanningHistoryProgress.ps1 in Python. Adds: - Enterprise/org/repo resolution with GraphQL for enterprise orgs - Concurrent scan history API calls via ThreadPoolExecutor - Summary progress bars for backfill/incremental/pattern_update scans - Detailed markdown table output option - Error reporting for repos with disabled secret scanning Also adds list_org_repos, list_enterprise_orgs, and get_secret_scanning_scan_history methods to githubapi.py. Agent-Logs-Url: https://github.com/advanced-security/ghas-api-python-scripts/sessions/f6a777a1-48ca-4999-ba66-76934aa82818 Co-authored-by: felickz <1760475+felickz@users.noreply.github.com> --- README.md | 43 ++ get_secret_scanning_scan_history_progress.py | 439 +++++++++++++++++++ githubapi.py | 49 +++ 3 files changed, 531 insertions(+) create mode 100644 get_secret_scanning_scan_history_progress.py diff --git a/README.md b/README.md index e4bc1dc..a41ff9e 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,49 @@ This is a set of scripts that use these APIs to access and manage alerts. The sc A note on common arguments: generally, the date in `--since` can be specified as `YYYY-MM-DD` or as `Nd` where `N` is the number of days ago. Full ISO formats are also supported. If a timezone is not specified, the date is assumed to be in UTC (`Z` timezone). +### Get secret scanning scan history progress + +This script retrieves the secret scanning scan history for repositories across an Enterprise, organization, or single repo. 
It queries the `GET /repos/{owner}/{repo}/secret-scanning/scan-history` endpoint for each repo concurrently and displays a summary with progress bars showing backfill, incremental, pattern update, and custom pattern scan completion status. Optionally outputs a detailed markdown table. + +```text +usage: get_secret_scanning_scan_history_progress.py [-h] (--enterprise ENTERPRISE | --org ORG | --repo REPO) + [--detailed] [--concurrency CONCURRENCY] [--hostname HOSTNAME] + [--ca-cert-bundle CA_CERT_BUNDLE] [--no-verify-tls] [--quiet] + [--debug] + +options: + -h, --help show this help message and exit + --enterprise ENTERPRISE + GitHub Enterprise slug. Lists all orgs, then all repos per org. + --org ORG GitHub Organization name. Lists all repos in the org. + --repo REPO A single repository in owner/repo format. + --detailed Show full markdown table with per-repo scan details instead of summary progress bars. + --concurrency CONCURRENCY + Number of concurrent API requests (default: 10). + --hostname HOSTNAME GitHub Enterprise hostname (defaults to github.com) + --ca-cert-bundle CA_CERT_BUNDLE, -C CA_CERT_BUNDLE + Path to CA certificate bundle in PEM format (e.g. 
for self-signed server certificates) + --no-verify-tls Do not verify TLS connection certificates (warning: insecure) + --quiet, -q Suppress non-error log messages + --debug, -d Enable debug logging +``` + +Examples: + +```bash +# Single repo +GITHUB_TOKEN=$(gh auth token) python3 get_secret_scanning_scan_history_progress.py --repo octocat/Hello-World + +# Organization +GITHUB_TOKEN=$(gh auth token) python3 get_secret_scanning_scan_history_progress.py --org my-org + +# Enterprise (requires read:enterprise scope) +GITHUB_TOKEN=$(gh auth token) python3 get_secret_scanning_scan_history_progress.py --enterprise my-enterprise + +# With detailed markdown table +GITHUB_TOKEN=$(gh auth token) python3 get_secret_scanning_scan_history_progress.py --org my-org --detailed +``` + ### List secret scanning alerts This script retrieves secret scanning alerts from GitHub repositories, organizations, or Enterprises and outputs them in CSV or JSON format. It supports filtering by state, date, and push protection bypass status. Use this to audit, analyze, or export secret scanning data for compliance or security purposes. diff --git a/get_secret_scanning_scan_history_progress.py b/get_secret_scanning_scan_history_progress.py new file mode 100644 index 0000000..9a1581c --- /dev/null +++ b/get_secret_scanning_scan_history_progress.py @@ -0,0 +1,439 @@ +#!/usr/bin/env python3 + +"""Get secret scanning scan history progress for repos across an enterprise, org, or single repo. + +Calls GET /repos/{owner}/{repo}/secret-scanning/scan-history for each repo and outputs +a summary of backfill scan status with progress bars, or a detailed markdown table. 
+ +Supports cascading: + - Enterprise → Orgs → Repos + - Org → Repos + - Single repo via owner/repo + +https://docs.github.com/en/enterprise-cloud@latest/rest/secret-scanning/secret-scanning?apiVersion=2022-11-28#get-secret-scanning-scan-history-for-a-repository +""" + +import argparse +import logging +import math +import re +import sys +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass, field +from datetime import datetime + +from githubapi import GitHub + +LOG = logging.getLogger(__name__) + +BAR_WIDTH = 30 +DEFAULT_CONCURRENCY = 10 + + +@dataclass +class ScanRow: + """A single scan entry for a repo.""" + + repo: str + category: str + scan_type: str + status: str + started_at: str | None = None + completed_at: str | None = None + + +@dataclass +class RepoResult: + """Result of querying scan history for a single repo.""" + + repo: str + is_error: bool = False + error: str | None = None + rows: list[ScanRow] = field(default_factory=list) + + +def resolve_repos(g: GitHub, scope: str, name: str) -> list[str]: + """Resolve the list of repos to query based on scope.""" + if scope == "repo": + return [name] + + orgs: list[str] = [] + if scope == "ent": + print(f"Fetching orgs for enterprise '{name}'...", file=sys.stderr) + try: + orgs = g.list_enterprise_orgs(name) + except Exception as e: + LOG.error("Failed to list orgs for enterprise '%s': %s", name, e) + LOG.warning( + "Ensure your token has the read:enterprise scope " + "(e.g. 
gh auth refresh --scopes read:enterprise)" + ) + return [] + print(f"Found {len(orgs)} org(s)", file=sys.stderr) + elif scope == "org": + orgs = [name] + + repos: list[str] = [] + for org in orgs: + print(f"Fetching repos for org '{org}'...", file=sys.stderr) + try: + org_repos = list(g.list_org_repos(org)) + except Exception as e: + LOG.warning("Failed to list repos for org '%s': %s", org, e) + continue + print(f" Found {len(org_repos)} repo(s) in '{org}'", file=sys.stderr) + repos.extend(org_repos) + + return repos + + +def _extract_scan_rows(history: dict, repo_nwo: str) -> list[ScanRow]: + """Extract scan rows from a scan history response.""" + rows: list[ScanRow] = [] + + for category_key, category_label in [ + ("backfill_scans", "backfill"), + ("incremental_scans", "incremental"), + ("pattern_update_scans", "pattern_update"), + ]: + for scan in history.get(category_key, []): + rows.append( + ScanRow( + repo=repo_nwo, + category=category_label, + scan_type=scan.get("type", ""), + status=scan.get("status", ""), + started_at=scan.get("started_at"), + completed_at=scan.get("completed_at"), + ) + ) + + for scan in history.get("custom_pattern_backfill_scans", []): + slug = scan.get("pattern_slug", "unknown") + rows.append( + ScanRow( + repo=repo_nwo, + category=f"custom_pattern ({slug})", + scan_type=scan.get("type", ""), + status=scan.get("status", ""), + started_at=scan.get("started_at"), + completed_at=scan.get("completed_at"), + ) + ) + + return rows + + +def _fetch_scan_history(g: GitHub, repo_nwo: str) -> RepoResult: + """Fetch scan history for a single repo. 
Returns a RepoResult.""" + try: + history = g.get_secret_scanning_scan_history(repo_nwo) + except Exception as e: + error_msg = str(e) + # Try to extract the API error message + match = re.search(r'"message":\s*"([^"]+)"', error_msg) + short = match.group(1) if match else error_msg + return RepoResult(repo=repo_nwo, is_error=True, error=short) + + rows = _extract_scan_rows(history, repo_nwo) + if not rows: + rows.append( + ScanRow( + repo=repo_nwo, + category="-", + scan_type="-", + status="no scan data", + ) + ) + return RepoResult(repo=repo_nwo, rows=rows) + + +def query_all_repos( + g: GitHub, repos: list[str], concurrency: int = DEFAULT_CONCURRENCY +) -> tuple[list[ScanRow], list[RepoResult]]: + """Query scan history for all repos concurrently. + + Returns (all_rows, errors). + """ + all_rows: list[ScanRow] = [] + errors: list[RepoResult] = [] + + print( + f"\nQuerying scan history for {len(repos)} repo(s) " + f"({concurrency} concurrent)...\n", + file=sys.stderr, + ) + + with ThreadPoolExecutor(max_workers=concurrency) as executor: + futures = { + executor.submit(_fetch_scan_history, g, repo): repo for repo in repos + } + for future in as_completed(futures): + result = future.result() + if result.is_error: + errors.append(result) + else: + all_rows.extend(result.rows) + + return all_rows, errors + + +def _parse_completed_date(date_str: str | None) -> datetime | None: + """Parse a completed_at date string, returning None on failure.""" + if not date_str or date_str == "-": + return None + try: + return datetime.fromisoformat(date_str) + except (ValueError, TypeError): + return None + + +def print_progress_summary( + all_rows: list[ScanRow], + errors: list[RepoResult], + total_repos: int, +) -> None: + """Print the summary with progress bars.""" + success_repos = total_repos - len(errors) + + print() + print( + f"Secret Scanning History Progress ({success_repos}/{total_repos} repos reporting)" + ) + print("=" * 70) + + # Filter out placeholder rows + scan_rows = 
[r for r in all_rows if r.category != "-"] + + # Group by (category, scan_type) + groups: dict[tuple[str, str], list[ScanRow]] = {} + for row in scan_rows: + key = (row.category, row.scan_type) + groups.setdefault(key, []).append(row) + + # Sort categories in logical order + category_order = {"backfill": 0, "incremental": 1, "pattern_update": 2} + sorted_keys = sorted( + groups.keys(), + key=lambda k: (category_order.get(k[0], 3), k[1]), + ) + + last_category = "" + for cat, scan_type in sorted_keys: + if cat != last_category: + print(f"\n {cat.upper()}") + last_category = cat + + group = groups[(cat, scan_type)] + total = len(group) + completed_count = sum(1 for r in group if r.status == "completed") + in_progress_count = sum(1 for r in group if r.status == "in_progress") + pct = min(round((completed_count / total) * 100), 100) if total > 0 else 0 + missing_count = max(success_repos - total, 0) + + # Most recent completed timestamp + completed_dates = [ + _parse_completed_date(r.completed_at) + for r in group + if r.status == "completed" + ] + valid_dates = [d for d in completed_dates if d is not None] + last_completed_str = ( + max(valid_dates).strftime("%Y-%m-%d") if valid_dates else "-" + ) + + filled_len = min(math.floor(BAR_WIDTH * completed_count / total), BAR_WIDTH) + progress_len = min( + math.floor(BAR_WIDTH * in_progress_count / total), + BAR_WIDTH - filled_len, + ) + empty_len = BAR_WIDTH - filled_len - progress_len + + bar = "=" * filled_len + ">" * progress_len + " " * empty_len + + stats = f"{completed_count}/{total} done" + if in_progress_count > 0: + stats += f", {in_progress_count} in progress" + if missing_count > 0: + stats += f", {missing_count} n/a" + stats += f", last: {last_completed_str}" + + label = f"{scan_type:<16}" + print(f" {label} [{bar}] {pct:3d}% ({stats})") + + print() + + +def print_pending_repos(all_rows: list[ScanRow]) -> None: + """Print a markdown table of repos that haven't completed all scans.""" + scan_rows = [r for r in 
all_rows if r.category != "-"] + pending = [r for r in scan_rows if r.status != "completed"] + + if not pending: + return + + pending.sort(key=lambda r: (r.repo, r.category, r.scan_type)) + print(f"### Repos not yet completed ({len(pending)})") + print() + print("| Repo | Category | Type | Status | Started | Completed |") + print("| --- | --- | --- | --- | --- | --- |") + for row in pending: + started = row.started_at or "-" + completed = row.completed_at or "-" + print( + f"| {row.repo} | {row.category} | {row.scan_type} " + f"| {row.status} | {started} | {completed} |" + ) + print() + + +def print_detailed_table(all_rows: list[ScanRow]) -> None: + """Print a full markdown table with per-repo scan details.""" + print() + print("| Repo | Category | Type | Status | Started | Completed |") + print("| --- | --- | --- | --- | --- | --- |") + for row in all_rows: + started = row.started_at or "-" + completed = row.completed_at or "-" + print( + f"| {row.repo} | {row.category} | {row.scan_type} " + f"| {row.status} | {started} | {completed} |" + ) + + +def print_errors(errors: list[RepoResult]) -> None: + """Print repos that had errors.""" + if not errors: + return + + print(f"Repos with errors ({len(errors)}):", file=sys.stderr) + for e in errors: + print(f" {e.repo}: {e.error}", file=sys.stderr) + print(file=sys.stderr) + + +def add_args(parser: argparse.ArgumentParser) -> None: + """Add command-line arguments to the parser.""" + scope_group = parser.add_mutually_exclusive_group(required=True) + scope_group.add_argument( + "--enterprise", + type=str, + help="GitHub Enterprise slug. Lists all orgs, then all repos per org.", + ) + scope_group.add_argument( + "--org", + type=str, + help="GitHub Organization name. 
Lists all repos in the org.", + ) + scope_group.add_argument( + "--repo", + type=str, + help="A single repository in owner/repo format.", + ) + parser.add_argument( + "--detailed", + action="store_true", + help="Show full markdown table with per-repo scan details instead of summary progress bars.", + ) + parser.add_argument( + "--concurrency", + type=int, + default=DEFAULT_CONCURRENCY, + help=f"Number of concurrent API requests (default: {DEFAULT_CONCURRENCY}).", + ) + parser.add_argument( + "--hostname", + type=str, + default="github.com", + required=False, + help="GitHub Enterprise hostname (defaults to github.com)", + ) + parser.add_argument( + "--ca-cert-bundle", + "-C", + type=str, + required=False, + help="Path to CA certificate bundle in PEM format (e.g. for self-signed server certificates)", + ) + parser.add_argument( + "--no-verify-tls", + action="store_true", + help="Do not verify TLS connection certificates (warning: insecure)", + ) + parser.add_argument( + "--quiet", + "-q", + action="store_true", + help="Suppress non-error log messages", + ) + parser.add_argument( + "--debug", "-d", action="store_true", help="Enable debug logging" + ) + + +def main() -> None: + """CLI entrypoint.""" + parser = argparse.ArgumentParser(description=__doc__) + add_args(parser) + args = parser.parse_args() + + logging.basicConfig( + level=( + logging.DEBUG + if args.debug + else logging.INFO + if not args.quiet + else logging.ERROR + ), + format="%(asctime)s %(levelname)s %(message)s", + ) + + # Determine scope and name + if args.enterprise: + scope, name = "ent", args.enterprise + elif args.org: + scope, name = "org", args.org + else: + scope, name = "repo", args.repo + + if not GitHub.check_name(name, scope): + print(f"Error: Invalid name '{name}' for scope '{scope}'", file=sys.stderr) + sys.exit(1) + + verify: bool | str = True + if args.ca_cert_bundle: + verify = args.ca_cert_bundle + if args.no_verify_tls: + verify = False + LOG.warning( + "Disabling TLS verification. 
This is insecure and should not be used in production." + ) + import urllib3 + + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + g = GitHub(hostname=args.hostname, verify=verify) + + # Resolve repos + repo_list = resolve_repos(g, scope, name) + if not repo_list: + print( + "Error: No repos resolved. Provide --enterprise, --org, or --repo.", + file=sys.stderr, + ) + sys.exit(1) + + # Query scan history + all_rows, errors = query_all_repos(g, repo_list, concurrency=args.concurrency) + + # Output + if args.detailed: + print_detailed_table(all_rows) + + print_progress_summary(all_rows, errors, total_repos=len(repo_list)) + print_pending_repos(all_rows) + print_errors(errors) + + +if __name__ == "__main__": + main() diff --git a/githubapi.py b/githubapi.py index 3301333..95f9f9b 100644 --- a/githubapi.py +++ b/githubapi.py @@ -500,6 +500,55 @@ def list_dependabot_alerts( return results + def list_org_repos(self, org: str) -> Generator[str, None, None]: + """List all repository full names (owner/repo) in an organization.""" + url = self.construct_api_url("org", org, "/repos", {"type": "all"}, "cursor") + for repo in self.paginate(url, progress=False): + yield repo["full_name"] + + def list_enterprise_orgs(self, enterprise: str) -> list[str]: + """List all organization logins in an enterprise using GraphQL.""" + base = "https://api.github.com" if self.hostname == "github.com" else f"https://{self.hostname}/api" + graphql_url = f"{base}/graphql" + orgs: list[str] = [] + cursor = None + while True: + gql_query = ( + "query($slug: String!, $cursor: String) {" + " enterprise(slug: $slug) {" + " organizations(first: 100, after: $cursor) {" + " pageInfo { hasNextPage endCursor }" + " nodes { login }" + " }" + " }" + "}" + ) + variables = {"slug": enterprise, "cursor": cursor} + response = self.session.post( + graphql_url, + json={"query": gql_query, "variables": variables}, + ) + response.raise_for_status() + data = response.json() + if "errors" in data: 
+ raise RuntimeError(f"GraphQL errors: {data['errors']}") + org_data = data["data"]["enterprise"]["organizations"] + for node in org_data["nodes"]: + if node and node.get("login"): + orgs.append(node["login"]) + if not org_data["pageInfo"]["hasNextPage"]: + break + cursor = org_data["pageInfo"]["endCursor"] + return orgs + + def get_secret_scanning_scan_history(self, repo_nwo: str) -> dict: + """Get secret scanning scan history for a single repository. + + Returns the raw JSON response from GET /repos/{owner}/{repo}/secret-scanning/scan-history. + """ + result = self.query_once("repo", repo_nwo, "/secret-scanning/scan-history") + return result if result is not None else {} + def parse_date(date: str) -> datetime.datetime | None: """Parse a date string and return a datetime object. From 20515fa66790d3c9d17e2e146c2b4ee880e6d869 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 7 Apr 2026 18:20:49 +0000 Subject: [PATCH 2/5] Address review feedback: guard against zero-length groups Agent-Logs-Url: https://github.com/advanced-security/ghas-api-python-scripts/sessions/f6a777a1-48ca-4999-ba66-76934aa82818 Co-authored-by: felickz <1760475+felickz@users.noreply.github.com> --- get_secret_scanning_scan_history_progress.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/get_secret_scanning_scan_history_progress.py b/get_secret_scanning_scan_history_progress.py index 9a1581c..cf6dc94 100644 --- a/get_secret_scanning_scan_history_progress.py +++ b/get_secret_scanning_scan_history_progress.py @@ -226,9 +226,11 @@ def print_progress_summary( group = groups[(cat, scan_type)] total = len(group) + if total == 0: + continue completed_count = sum(1 for r in group if r.status == "completed") in_progress_count = sum(1 for r in group if r.status == "in_progress") - pct = min(round((completed_count / total) * 100), 100) if total > 0 else 0 + pct = round((completed_count / total) * 100) missing_count = 
max(success_repos - total, 0) # Most recent completed timestamp From e189e7c7eff9dbb3ed178cd144f84ea38f4c470a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 7 Apr 2026 21:22:13 +0000 Subject: [PATCH 3/5] Fix get_secret_scanning_scan_history to use direct API call The method was using query_once which defaults to cursor-based pagination. The scan-history endpoint returns a single JSON object (not a paginated list), so pagination params were incorrect and HTTP errors were silently swallowed. Now uses _get directly so: - No spurious per_page/before query params - HTTP errors (e.g. 403) properly propagate to callers Agent-Logs-Url: https://github.com/advanced-security/ghas-api-python-scripts/sessions/7fb48030-50b4-4f88-bce8-1e571af0e38b Co-authored-by: felickz <1760475+felickz@users.noreply.github.com> --- githubapi.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/githubapi.py b/githubapi.py index 95f9f9b..eb4762d 100644 --- a/githubapi.py +++ b/githubapi.py @@ -545,9 +545,11 @@ def get_secret_scanning_scan_history(self, repo_nwo: str) -> dict: """Get secret scanning scan history for a single repository. Returns the raw JSON response from GET /repos/{owner}/{repo}/secret-scanning/scan-history. + Raises on HTTP errors so callers can handle them. 
""" - result = self.query_once("repo", repo_nwo, "/secret-scanning/scan-history") - return result if result is not None else {} + url = self.construct_api_url("repo", repo_nwo, "/secret-scanning/scan-history", None, None) + response = self._get(url) + return response.json() def parse_date(date: str) -> datetime.datetime | None: From fb956e89b73c75538490b327e603ca0b298d59ab Mon Sep 17 00:00:00 2001 From: Chad Bentz <1760475+felickz@users.noreply.github.com> Date: Tue, 7 Apr 2026 17:35:33 -0400 Subject: [PATCH 4/5] Update .gitignore to include .venv and pass cursor=True to paginate in githubapi.py list_org_repos --- .gitignore | 3 ++- githubapi.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index abb2cc3..909a06b 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ *.pdf __pycache__/ .mypy_cache/ -*.pyc \ No newline at end of file +*.pyc +.venv/ \ No newline at end of file diff --git a/githubapi.py b/githubapi.py index eb4762d..7ff1e94 100644 --- a/githubapi.py +++ b/githubapi.py @@ -503,7 +503,7 @@ def list_dependabot_alerts( def list_org_repos(self, org: str) -> Generator[str, None, None]: """List all repository full names (owner/repo) in an organization.""" url = self.construct_api_url("org", org, "/repos", {"type": "all"}, "cursor") - for repo in self.paginate(url, progress=False): + for repo in self.paginate(url, progress=False, cursor=True): yield repo["full_name"] def list_enterprise_orgs(self, enterprise: str) -> list[str]: From 912186078e22acdc566aa1e5ef72f94848a5a19a Mon Sep 17 00:00:00 2001 From: Chad Bentz <1760475+felickz@users.noreply.github.com> Date: Tue, 7 Apr 2026 17:48:22 -0400 Subject: [PATCH 5/5] Improve concurrency handling in query_all_repos by using a dedicated GitHub client for each thread --- get_secret_scanning_scan_history_progress.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/get_secret_scanning_scan_history_progress.py b/get_secret_scanning_scan_history_progress.py 
index cf6dc94..4ff8cbb 100644 --- a/get_secret_scanning_scan_history_progress.py +++ b/get_secret_scanning_scan_history_progress.py @@ -164,9 +164,17 @@ def query_all_repos( file=sys.stderr, ) + def _worker(repo: str) -> RepoResult: + # Each thread gets its own GitHub client to avoid sharing + # the non-thread-safe requests.Session across threads. + thread_gh = GitHub( + hostname=g.hostname, verify=g.session.verify + ) + return _fetch_scan_history(thread_gh, repo) + with ThreadPoolExecutor(max_workers=concurrency) as executor: futures = { - executor.submit(_fetch_scan_history, g, repo): repo for repo in repos + executor.submit(_worker, repo): repo for repo in repos } for future in as_completed(futures): result = future.result() @@ -182,8 +190,9 @@ def _parse_completed_date(date_str: str | None) -> datetime | None: """Parse a completed_at date string, returning None on failure.""" if not date_str or date_str == "-": return None + normalized = date_str[:-1] + "+00:00" if date_str.endswith("Z") else date_str try: - return datetime.fromisoformat(date_str) + return datetime.fromisoformat(normalized) except (ValueError, TypeError): return None