From b6b071a793c0518901a83f0d8705a4a322d07b06 Mon Sep 17 00:00:00 2001 From: Nathan Randall Date: Sun, 26 Oct 2025 15:34:31 -0600 Subject: [PATCH 1/8] scripts/es-sarif/ for MRVA->SARIF->Elasticsearch --- scripts/es-sarif/.gitignore | 181 +++++ scripts/es-sarif/activate.sh | 18 + .../index-sarif-results-in-elasticsearch.md | 88 +++ .../index-sarif-results-in-elasticsearch.py | 544 ++++++++++++++ scripts/es-sarif/requirements.txt | 5 + .../es-sarif/run-gh-mrva-for-query-suite.md | 191 +++++ .../es-sarif/run-gh-mrva-for-query-suite.py | 671 ++++++++++++++++++ scripts/es-sarif/setup-venv.sh | 58 ++ 8 files changed, 1756 insertions(+) create mode 100644 scripts/es-sarif/.gitignore create mode 100755 scripts/es-sarif/activate.sh create mode 100644 scripts/es-sarif/index-sarif-results-in-elasticsearch.md create mode 100644 scripts/es-sarif/index-sarif-results-in-elasticsearch.py create mode 100644 scripts/es-sarif/requirements.txt create mode 100644 scripts/es-sarif/run-gh-mrva-for-query-suite.md create mode 100644 scripts/es-sarif/run-gh-mrva-for-query-suite.py create mode 100755 scripts/es-sarif/setup-venv.sh diff --git a/scripts/es-sarif/.gitignore b/scripts/es-sarif/.gitignore new file mode 100644 index 000000000..d65a23223 --- /dev/null +++ b/scripts/es-sarif/.gitignore @@ -0,0 +1,181 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. 
+# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.env.local +.env.*.local +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be added to the global gitignore or merged into this project gitignore. For a PyCharm +# project, it is recommended to include/store project specific gitignore file(s) within +# the project root. +# https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +.idea/ + +# IDE specific files +.vscode/ +*.swp +*.swo +*~ + +# OS specific files +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +elastic-start-local/ +mrva/ diff --git a/scripts/es-sarif/activate.sh b/scripts/es-sarif/activate.sh new file mode 100755 index 000000000..70eb02ce2 --- /dev/null +++ b/scripts/es-sarif/activate.sh @@ -0,0 +1,18 @@ +#!/bin/bash +# Convenience script to activate the SARIF Elasticsearch Indexer environment + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +VENV_DIR="$SCRIPT_DIR/.venv" + +if [ ! -d "$VENV_DIR" ]; then + echo "Virtual environment not found. Run setup.sh first." + exit 1 +fi + +echo "Activating SARIF Elasticsearch Indexer environment..." +echo "Python version: $($VENV_DIR/bin/python --version)" +echo "To deactivate, run: deactivate" +echo + +# Start a new shell with the virtual environment activated +exec bash --rcfile <(echo "source $VENV_DIR/bin/activate; PS1='(es-sarif) \u@\h:\w\$ '") diff --git a/scripts/es-sarif/index-sarif-results-in-elasticsearch.md b/scripts/es-sarif/index-sarif-results-in-elasticsearch.md new file mode 100644 index 000000000..5984c6b4c --- /dev/null +++ b/scripts/es-sarif/index-sarif-results-in-elasticsearch.md @@ -0,0 +1,88 @@ +# SARIF Files Elasticsearch Indexer + +This script creates a fresh Elasticsearch index and indexes SARIF 2.1.0 results from multiple SARIF files into it. + +## Requirements + +- Python 3.11+ +- SARIF files conforming to version 2.1.0 specification (such as those produced by `gh mrva`) +- Accessible URLs for running instances of Elasticsearch (aka "es") and Kibana (e.g. via `Quick Setup` below) + +## Usage + +```bash +python index-sarif-results-in-elasticsearch.py +``` + +## Input File Format + +The SARIF files list should be a plain text file with one relative file path per line: + +```text +output_misra-c-and-cpp-default_top-1000/solvespace/solvespace/solvespace_solvespace_18606.sarif +output_misra-c-and-cpp-default_top-1000/solvespace/solvespace/solvespace_solvespace_18607.sarif +# Comments starting with # are ignored +``` + +**Note**: Paths are resolved relative to the directory containing the list file. + +## Quick Setup + +1. **Set up Python environment:** + +```bash +## Change to the directory that contains this document +cd scripts/es-sarif +bash setup-venv.sh +source .venv/bin/activate +``` + +1. **Set up Elasticsearch and Kibana with Docker:** + +```bash +curl -fsSL https://elastic.co/start-local | sh +``` + +1. 
**Run the indexer:** + +```bash +## from the `scripts/es-sarif` directory +python index-sarif-results-in-elasticsearch.py mrva/sessions/sarif-files.txt codeql-coding-standards-misra-sarif +``` + +The `elastic-start-local` setup provides: + +- Elasticsearch at `http://localhost:9200` +- Kibana at `http://localhost:5601` +- API key stored in `elastic-start-local/.env` as `ES_LOCAL_API_KEY` + +## Example Queries + +Search for high-severity results: + +```json +GET /codeql-coding-standards-misra-sarif/_search +{ + "query": { "term": { "level": "error" } } +} +``` + +Find results for a specific rule: + +```json +GET /codeql-coding-standards-misra-sarif/_search +{ + "query": { "term": { "ruleId": "CERT-C-MSC30-C" } } +} +``` + +## Managing Elasticsearch Services + +Control the Docker services: + +```bash +cd elastic-start-local +./start.sh # Start services +./stop.sh # Stop services +./uninstall.sh # Remove everything (deletes all data) +``` diff --git a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py new file mode 100644 index 000000000..0a130e307 --- /dev/null +++ b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py @@ -0,0 +1,544 @@ +#!/usr/bin/env python3 +""" +SARIF Results Elasticsearch Indexer + +This script creates a fresh Elasticsearch index and indexes individual SARIF results +from multiple SARIF files into it. Each result from runs[].results[] becomes a +separate document in the Elasticsearch index, allowing for granular querying and +analysis of code scanning findings. + +The script reads a list of SARIF file paths from a text file and bulk indexes all +results into a single Elasticsearch index. Each result document includes: +- All original SARIF result fields (ruleId, message, locations, etc.) 
+- Derived fields (ruleGroup, ruleLanguage) parsed from ruleId +- Run-level metadata (tool info, version control provenance) +- Source file tracking metadata + +Usage: + python index-sarif-results-in-elasticsearch.py + +Environment Variables: + ES_LOCAL_URL - Elasticsearch host URL (default: http://localhost:9200) + ES_LOCAL_API_KEY - API key for authentication (optional, enables API key auth) + +Requirements: + - Python 3.11+ + - elasticsearch (pip install elasticsearch) + - Elasticsearch instance accessible at configured host +""" + +import sys +import json +import os +from pathlib import Path +from elasticsearch import Elasticsearch, helpers +from elasticsearch.exceptions import ConnectionError, RequestError + + +def load_env_file(env_file_path): + """Load environment variables from a .env file.""" + if not os.path.exists(env_file_path): + return + + with open(env_file_path, 'r') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#') and '=' in line: + key, value = line.split('=', 1) + # Handle variable substitution + if '${' in value and '}' in value: + # Simple variable substitution for ${VAR} patterns + import re + def replace_var(match): + var_name = match.group(1) + return os.environ.get(var_name, match.group(0)) + value = re.sub(r'\$\{([^}]+)\}', replace_var, value) + os.environ[key] = value + +# --- Configuration --- +DEFAULT_ELASTIC_HOST = "http://localhost:9200" +SARIF_VERSION = "2.1.0" + +# Elasticsearch mapping optimized for SARIF result documents +SARIF_MAPPING = { + "mappings": { + "properties": { + # Core SARIF result fields + "ruleId": {"type": "keyword"}, + "ruleIndex": {"type": "integer"}, + "kind": {"type": "keyword"}, + "level": {"type": "keyword"}, + + # Derived fields from ruleId parsing + "ruleGroup": {"type": "keyword"}, + "ruleLanguage": {"type": "keyword"}, + + # Message object + "message": { + "properties": { + "text": {"type": "text"}, + "markdown": {"type": "text"}, + "id": {"type": "keyword"}, + "arguments": {"type": "keyword"} + } + }, + + # Locations array - each result can have multiple locations + "locations": { + "type": "nested", + "properties": { + "id": {"type": "integer"}, + "physicalLocation": { + "properties": { + "artifactLocation": { + "properties": { + "uri": {"type": "keyword"}, + "uriBaseId": {"type": "keyword"}, + "index": {"type": "integer"} + } + }, + "region": { + "properties": { + "startLine": {"type": "integer"}, + "startColumn": {"type": "integer"}, + "endLine": {"type": "integer"}, + "endColumn": {"type": "integer"}, + "charOffset": {"type": "integer"}, + "charLength": {"type": "integer"}, + "byteOffset": {"type": "integer"}, + "byteLength": {"type": "integer"} + } + }, + "contextRegion": { + "properties": { + "startLine": {"type": "integer"}, + "endLine": {"type": "integer"}, + "snippet": { + "properties": { + "text": {"type": "text"} + } + } + } + } + } + }, + "logicalLocations": { + "type": "nested", + "properties": { + "name": {"type": "keyword"}, + "fullyQualifiedName": {"type": "keyword"}, + "decoratedName": {"type": "keyword"}, + "kind": {"type": "keyword"} + } + } + } + }, + + # Rule reference + "rule": { + "properties": { + "id": {"type": "keyword"}, + "index": {"type": "integer"} + } + }, + + # Fingerprints for deduplication + "partialFingerprints": {"type": "object"}, + "fingerprints": {"type": "object"}, + + # Analysis and classification + "analysisTarget": { + "properties": { + "uri": {"type": "keyword"}, + "uriBaseId": {"type": "keyword"}, + "index": {"type": "integer"} + } + }, + "guid": 
{"type": "keyword"}, + "correlationGuid": {"type": "keyword"}, + "occurrenceCount": {"type": "integer"}, + "rank": {"type": "float"}, + "baselineState": {"type": "keyword"}, + + # Run-level metadata (tool, repo info, etc.) + "run": { + "properties": { + "tool": { + "properties": { + "driver": { + "properties": { + "name": {"type": "keyword"}, + "organization": {"type": "keyword"}, + "product": {"type": "keyword"}, + "version": {"type": "keyword"}, + "semanticVersion": {"type": "keyword"} + } + } + } + }, + "automationDetails": { + "properties": { + "id": {"type": "keyword"}, + "guid": {"type": "keyword"}, + "correlationGuid": {"type": "keyword"} + } + }, + "versionControlProvenance": { + "type": "nested", + "properties": { + "repositoryUri": {"type": "keyword"}, + "revisionId": {"type": "keyword"} + } + } + } + }, + + # Metadata for tracking source SARIF file + "_sarif_source": { + "properties": { + "file_path": {"type": "keyword"}, + "file_name": {"type": "keyword"}, + "indexed_at": {"type": "date"} + } + } + } + }, + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0, + "analysis": { + "analyzer": { + "sarif_text": { + "type": "standard", + "stopwords": "_none_" + } + } + } + } +} + + +def create_elasticsearch_client(host, api_key=None): + """Create Elasticsearch client with optional API key authentication.""" + if api_key and api_key.strip(): + return Elasticsearch( + hosts=[host], + api_key=api_key.strip(), + verify_certs=False, # For local development + ssl_show_warn=False + ) + else: + return Elasticsearch(hosts=[host]) + + +def validate_elasticsearch_connection(es_client, host): + """Test connection to Elasticsearch server.""" + try: + if not es_client.ping(): + print("Error: Could not connect to Elasticsearch. Please check your host and port.") + return False + print(f"✓ Connected to Elasticsearch at {host}") + return True + except ConnectionError: + print(f"Error: Could not connect to Elasticsearch at {host}") + print("Please ensure Elasticsearch is running and accessible.") + return False + + +def validate_index_name(es_client, index_name): + """Validate that the index doesn't already exist.""" + if es_client.indices.exists(index=index_name): + print(f"Error: Index '{index_name}' already exists.") + print("This script requires a fresh index. 
Please choose a different name or delete the existing index.") + return False + print(f"✓ Index name '{index_name}' is available") + return True + + +def create_index_with_mapping(es_client, index_name): + """Create the Elasticsearch index with SARIF 2.1.0 mapping.""" + try: + es_client.indices.create(index=index_name, body=SARIF_MAPPING) + print(f"✓ Created index '{index_name}' with SARIF 2.1.0 mapping") + return True + except RequestError as e: + print(f"Error creating index: {e}") + return False + + +def read_sarif_files_list(list_file_path): + """Read the list of SARIF files from the specified text file.""" + list_file = Path(list_file_path) + if not list_file.exists(): + print(f"Error: SARIF files list not found at {list_file_path}") + return None + + base_dir = list_file.parent + sarif_files = [] + + with open(list_file, 'r') as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + if not line or line.startswith('#'): + continue + + # Resolve relative paths relative to the list file + sarif_path = base_dir / line + if not sarif_path.exists(): + print(f"Warning: SARIF file not found: {sarif_path} (line {line_num})") + continue + + if not sarif_path.suffix.lower() == '.sarif': + print(f"Warning: File does not have .sarif extension: {sarif_path} (line {line_num})") + continue + + sarif_files.append(sarif_path) + + print(f"✓ Found {len(sarif_files)} valid SARIF files to process") + return sarif_files + + +def parse_rule_id(rule_id): + """ + Parse a ruleId to extract ruleGroup and ruleLanguage. + + Examples: + - "cpp/misra/function-like-macros-defined" -> ruleGroup="cpp/misra", ruleLanguage="cpp" + - "c/misra/cast-between-pointer-to-object-and-non-int-arithmetic-type" -> ruleGroup="c/misra", ruleLanguage="c" + - "py/baseline/expected-extracted-files" -> ruleGroup="py/baseline", ruleLanguage="py" + """ + if not rule_id: + return None, None + + parts = rule_id.split('/') + if len(parts) < 2: + return None, None + + # First part is the language + rule_language = parts[0] + + # Rule group is language + first category (e.g., "cpp/misra", "py/baseline") + if len(parts) >= 2: + rule_group = f"{parts[0]}/{parts[1]}" + else: + rule_group = rule_language + + return rule_group, rule_language + + +def sarif_results_generator(sarif_files, index_name): + """ + Generator that yields Elasticsearch bulk actions for individual SARIF results. + + For each SARIF file: + 1. Processes each run in runs[] + 2. Extracts each result from runs[].results[] + 3. Creates a separate Elasticsearch document per result + 4. Adds derived fields (ruleGroup, ruleLanguage) from ruleId parsing + 5. Includes run-level metadata and source file tracking + + This approach allows for granular querying of individual code scanning findings + rather than treating entire SARIF files as single documents. 
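+
+    Illustrative shape of one yielded bulk action (the values below are an
+    example only; real documents carry the complete SARIF result fields):
+
+        {
+            "_index": index_name,
+            "_source": {
+                "ruleId": "cpp/misra/function-like-macros-defined",
+                "ruleGroup": "cpp/misra",
+                "ruleLanguage": "cpp",
+                "message": {"text": "..."},
+                "run": {...},
+                "_sarif_source": {"file_name": "example.sarif", ...}
+            }
+        }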
+ """ + from datetime import datetime + indexed_at = datetime.utcnow().isoformat() + + total_results = 0 + + for sarif_file in sarif_files: + print(f"Processing {sarif_file.name}...") + + try: + with open(sarif_file, 'r', encoding='utf-8') as f: + sarif_data = json.load(f) + + # Validate SARIF version + if sarif_data.get('version') != SARIF_VERSION: + print(f"Warning: {sarif_file.name} has version {sarif_data.get('version')}, expected {SARIF_VERSION}") + + # Extract metadata from the SARIF file + runs = sarif_data.get('runs', []) + if not runs: + print(f"Warning: No runs found in {sarif_file.name}") + continue + + file_results_count = 0 + for run_index, run in enumerate(runs): + results = run.get('results', []) + if not results: + print(f"Warning: No results found in run {run_index} of {sarif_file.name}") + continue + + file_results_count += len(results) + + # Extract run-level metadata + run_metadata = { + 'tool': run.get('tool', {}), + 'automationDetails': run.get('automationDetails', {}), + 'versionControlProvenance': run.get('versionControlProvenance', []) + } + + for result_index, result in enumerate(results): + # Create a document that includes both the result and metadata + document = dict(result) # Copy all result fields + + # Add derived fields from ruleId parsing + rule_id = document.get('ruleId') + if rule_id: + rule_group, rule_language = parse_rule_id(rule_id) + if rule_group: + document['ruleGroup'] = rule_group + if rule_language: + document['ruleLanguage'] = rule_language + + # Add run-level metadata + document['run'] = run_metadata + + # Add source file metadata + document['_sarif_source'] = { + 'file_path': str(sarif_file), + 'file_name': sarif_file.name, + 'indexed_at': indexed_at + } + + yield { + "_index": index_name, + "_source": document, + } + + total_results += 1 + + print(f" → Found {file_results_count} results in {sarif_file.name}") + + except FileNotFoundError: + print(f"Error: SARIF file not found: {sarif_file}") + continue + except json.JSONDecodeError as e: + print(f"Error: Could not decode JSON from {sarif_file}: {e}") + continue + except Exception as e: + print(f"Error processing {sarif_file}: {e}") + continue + + print(f"✓ Prepared {total_results} individual results for indexing from {len(sarif_files)} SARIF files") + + +def index_sarif_files(sarif_files, index_name, host, api_key=None): + """ + Connect to Elasticsearch and bulk index all SARIF results. + """ + es_client = create_elasticsearch_client(host, api_key) + + # Validate connection + if not validate_elasticsearch_connection(es_client, host): + return False + + # Validate index name + if not validate_index_name(es_client, index_name): + return False + + # Create index with mapping + if not create_index_with_mapping(es_client, index_name): + return False + + print(f"Indexing results from {len(sarif_files)} SARIF files...") + + try: + # Use bulk helper to index all documents + success_count, failed_docs = helpers.bulk( + es_client, + sarif_results_generator(sarif_files, index_name), + chunk_size=500, + request_timeout=60 + ) + + print("-" * 50) + print(f"✓ Bulk indexing complete") + print(f"✓ Successfully indexed: {success_count} documents") + print(f"✗ Failed to index: {len(failed_docs)} documents") + + if failed_docs: + print("\nFailed documents:") + for doc in failed_docs[:5]: # Show first 5 failures + print(f" - {doc}") + if len(failed_docs) > 5: + print(f" ... 
and {len(failed_docs) - 5} more") + + # Get final index stats + stats = es_client.indices.stats(index=index_name) + doc_count = stats['indices'][index_name]['total']['docs']['count'] + print(f"✓ Final document count in index: {doc_count}") + + return True + + except Exception as e: + print(f"Error during bulk indexing: {e}") + return False + + +def main(): + if len(sys.argv) != 3: + print(f"Usage: python {sys.argv[0]} ") + print() + print("Arguments:") + print(" sarif_files_list.txt - Text file containing relative paths to SARIF files (one per line)") + print(" elasticsearch_index_name - Name for the new Elasticsearch index to create") + print() + print("Environment Variables:") + print(" ES_LOCAL_URL - Elasticsearch host URL (default: http://localhost:9200)") + print(" ES_LOCAL_API_KEY - API key for authentication (optional)") + print() + print("Example:") + print(f" python {sys.argv[0]} sarif-files.txt sarif_results_2024") + print(" ES_LOCAL_URL=https://my-cluster.elastic.co:9243 \\") + print(" ES_LOCAL_API_KEY=your_api_key \\") + print(f" python {sys.argv[0]} sarif-files.txt sarif_results_2024") + sys.exit(1) + + sarif_files_list = sys.argv[1] + index_name = sys.argv[2] + + # Load environment variables from .env file if it exists + script_dir = Path(__file__).parent + env_file = script_dir / 'elastic-start-local' / '.env' + load_env_file(env_file) + + # Get configuration from environment variables + elastic_host = os.getenv('ES_LOCAL_URL', DEFAULT_ELASTIC_HOST) + elastic_api_key = os.getenv('ES_LOCAL_API_KEY') + + # Handle variable substitution in ES_LOCAL_URL if needed + if elastic_host and '${ES_LOCAL_PORT}' in elastic_host: + es_local_port = os.getenv('ES_LOCAL_PORT', '9200') + elastic_host = elastic_host.replace('${ES_LOCAL_PORT}', es_local_port) + + # Treat empty string or literal "None" as None for API key + if elastic_api_key == '' or elastic_api_key == 'None': + elastic_api_key = None + + print(f"SARIF Files Elasticsearch Indexer") + print(f"==================================") + print(f"SARIF files list: {sarif_files_list}") + print(f"Elasticsearch index: {index_name}") + print(f"Elasticsearch host: {elastic_host}") + print(f"Authentication: {'API Key' if elastic_api_key else 'None (HTTP Basic)'}") + print() + + # Read and validate SARIF files list + sarif_files = read_sarif_files_list(sarif_files_list) + if not sarif_files: + print("No valid SARIF files found. 
Exiting.") + sys.exit(1) + + # Index the files + if index_sarif_files(sarif_files, index_name, elastic_host, elastic_api_key): + print(f"\n✓ Successfully created and populated index '{index_name}'") + print(f"You can now query the index using Elasticsearch APIs or Kibana.") + sys.exit(0) + else: + print(f"\n✗ Failed to create or populate index '{index_name}'") + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/es-sarif/requirements.txt b/scripts/es-sarif/requirements.txt new file mode 100644 index 000000000..938ab14ff --- /dev/null +++ b/scripts/es-sarif/requirements.txt @@ -0,0 +1,5 @@ +# Required for index-sarif-results-in-elasticsearch.py +elasticsearch>=7.0.0,<9.0.0 + +# Note: run-gh-mrva-for-query-suite.py only uses Python standard library modules +# and does not require any external dependencies \ No newline at end of file diff --git a/scripts/es-sarif/run-gh-mrva-for-query-suite.md b/scripts/es-sarif/run-gh-mrva-for-query-suite.md new file mode 100644 index 000000000..3401f6956 --- /dev/null +++ b/scripts/es-sarif/run-gh-mrva-for-query-suite.md @@ -0,0 +1,191 @@ +# MRVA Query Suite Runner + +This script automates the execution of Multi-Repository Vulnerability Analysis (MRVA) sessions for all queries in a CodeQL query suite. It manages the complete lifecycle: submission, monitoring, and downloading results. + +## Features + +- **Automated Query Resolution**: Uses CodeQL CLI to resolve all queries in a suite +- **Concurrent Session Management**: Submits and monitors multiple MRVA sessions simultaneously +- **State Persistence**: Saves progress to resume interrupted runs +- **Status Monitoring**: Periodically checks session status and downloads completed results +- **Error Handling**: Robust error handling with detailed logging + +## Prerequisites + +1. **GitHub CLI with MRVA extension**: + + ```bash + gh extension install github/gh-mrva + ``` + +2. **CodeQL CLI**: Must be available in your PATH + + ```bash + codeql --version + ``` + +3. 
**Authentication**: GitHub CLI must be authenticated with appropriate permissions + +## Usage + +### Basic Usage + +```bash +python run-gh-mrva-for-query-suite.py \ + --query-suite codeql/misra-cpp-coding-standards@2.50.0 \ + --output-base-dir ./mrva/sessions \ + --session-prefix t1-misra-cpp-default \ + --language cpp \ + --repository-list cpp_top_1000 +``` + +### With Custom Concurrency + +```bash +python run-gh-mrva-for-query-suite.py \ + --query-suite codeql/misra-c-coding-standards@2.50.0 \ + --output-base-dir ./mrva/sessions \ + --session-prefix t1-misra-c-default \ + --language c \ + --repository-list cpp_top_1000 \ + --max-concurrent 10 \ + --check-interval 600 +``` + +## Arguments + +### Required Arguments + +- `--query-suite`: CodeQL query suite to analyze (e.g., `codeql/misra-cpp-coding-standards@2.50.0`) +- `--output-base-dir`: Base directory for output files and session state +- `--session-prefix`: Prefix for MRVA session names (e.g., `t1-misra-cpp-default`) +- `--language`: Programming language (`cpp`, `c`, `java`, `javascript`, `python`, `go`, `csharp`) +- `--repository-list`: Repository list for MRVA analysis (e.g., `top_1000`) + +### Optional Arguments + +- `--max-concurrent`: Maximum concurrent MRVA sessions (default: 20) +- `--check-interval`: Status check interval in seconds (default: 300) + +## Output Structure + +The script creates the following output structure: + +```text +/ +├── _state.json # Session state and progress +└── sessions/ # Downloaded SARIF results + ├── -query1/ + │ ├── owner1_repo1_12345.sarif + │ ├── owner2_repo2_12346.sarif + │ └── ... + ├── -query2/ + │ ├── owner1_repo1_12347.sarif + │ └── ... + └── ... +``` + +## State Management + +The script maintains a state file (`_state.json`) that tracks: + +- Query resolution and session mapping +- Submission timestamps +- Current status of each session +- Completion timestamps +- Download locations +- Error messages + +This allows the script to be safely interrupted and resumed. + +## Session Naming Convention + +MRVA sessions are named using the pattern: + +```text +- +``` + +For example: + +- Query: `cpp/misra/function-like-macros-defined.ql` +- Session: `t1-misra-cpp-default-function-like-macros-defined` + +## Workflow + +1. **Query Resolution**: Resolves all `.ql` files in the specified query suite +2. **Initial Submission**: Submits up to `max-concurrent` sessions +3. **Monitoring Loop**: + - Check status of active sessions every `check-interval` seconds + - Download results for completed sessions + - Submit new sessions when capacity is available + - Continue until all queries are processed +4. 
**Completion**: Provides final summary of results + +## Status Tracking + +Sessions progress through these states: + +- `not_started`: Session not yet submitted +- `submitted`: Session submitted to MRVA +- `in_progress`: Session is running +- `completed`: Session finished successfully +- `failed`: Session encountered an error + +## Integration with Elasticsearch + +After running this script, you can index the SARIF results into Elasticsearch using the companion script: + +```bash +# Create a file list of all SARIF files +find ./mrva/sessions -name "*.sarif" > results/misra/sarif-files.txt + +# Index results into Elasticsearch +python index-sarif-results-in-elasticsearch.py results/misra/sarif-files.txt misra-mrva-results-top-1000-repos +``` + +## Error Handling + +- **Network Issues**: Retries status checks and downloads +- **API Rate Limits**: Respects GitHub API limits with appropriate delays +- **Partial Failures**: Continues processing other sessions if individual sessions fail +- **Interruption**: Saves state and allows for clean resumption + +## Monitoring + +The script provides real-time status updates: + +```text +Status: {'submitted': 5, 'in_progress': 10, 'completed': 25, 'failed': 1} | Active: 15 | Remaining: 12 +Waiting 300 seconds before next check... +``` + +## Troubleshooting + +### Common Issues + +1. **MRVA Extension Not Found**: + + ```bash + gh extension install github/gh-mrva + ``` + +2. **CodeQL Not in PATH**: + + ```bash + export PATH="/path/to/codeql:$PATH" + ``` + +3. **Authentication Issues**: + + ```bash + gh auth login + ``` + +4. **Session Limits**: Reduce `--max-concurrent` if hitting API limits + +### Logs and Debugging + +- Session state is saved in `_state.json` +- Check GitHub CLI logs: `gh mrva list --json` +- Verify query resolution: `codeql resolve queries -- ` diff --git a/scripts/es-sarif/run-gh-mrva-for-query-suite.py b/scripts/es-sarif/run-gh-mrva-for-query-suite.py new file mode 100644 index 000000000..f79286d28 --- /dev/null +++ b/scripts/es-sarif/run-gh-mrva-for-query-suite.py @@ -0,0 +1,671 @@ +#!/usr/bin/env python3 +""" +github/codeql-coding-standards:scripts/es-sarif/run-gh-mrva-for-query-suite.py + +MRVA = Multi-Repository Vulnerability Analysis => scale one CodeQL query to many (1000s of) repos. + +This script creates a MRVA session for each query in a given query suite and manages +the execution lifecycle including submission, monitoring, and downloading of results. +The script can optionally load SARIF results into an Elasticsearch index for analysis. + +This script expects that the `mrva` extension has already been installed for the +`gh` CLI tool, such that `gh mrva` commands can be executed. 
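+The extension can be installed with `gh extension install github/gh-mrva`; see
+run-gh-mrva-for-query-suite.md in this directory for full setup and usage notes.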
+ +Usage: + python run-gh-mrva-for-query-suite.py --query-suite --output-base-dir [options] + +Requirements: + - GitHub CLI with mrva extension installed + - CodeQL CLI available in PATH + - Python 3.11+ + - Optional: Elasticsearch for result indexing +""" + +import sys +import json +import os +import argparse +import subprocess +import time +from pathlib import Path +from typing import Dict, List, Optional +from datetime import datetime +import re + + +class MRVASession: + """Represents a single MRVA session for a CodeQL query.""" + + def __init__(self, query_path: str, session_name: str, language: str): + self.query_path = query_path + self.session_name = session_name + self.language = language + self.status = "not_started" # not_started, submitted, in_progress, completed, download_in_progress, downloaded, failed + self.submitted_at: Optional[datetime] = None + self.completed_at: Optional[datetime] = None + self.downloaded_at: Optional[datetime] = None + self.output_dir: Optional[Path] = None + self.error_message: Optional[str] = None + self.run_stats: Optional[Dict] = None # Track run-level statistics + + def to_dict(self) -> Dict: + """Convert session to dictionary for JSON serialization.""" + return { + "query_path": self.query_path, + "session_name": self.session_name, + "language": self.language, + "status": self.status, + "submitted_at": self.submitted_at.isoformat() if self.submitted_at else None, + "completed_at": self.completed_at.isoformat() if self.completed_at else None, + "downloaded_at": self.downloaded_at.isoformat() if self.downloaded_at else None, + "output_dir": str(self.output_dir) if self.output_dir else None, + "error_message": self.error_message, + "run_stats": self.run_stats + } + + @classmethod + def from_dict(cls, data: Dict) -> 'MRVASession': + """Create session from dictionary.""" + session = cls(data["query_path"], data["session_name"], data["language"]) + session.status = data["status"] + session.submitted_at = datetime.fromisoformat(data["submitted_at"]) if data["submitted_at"] else None + session.completed_at = datetime.fromisoformat(data["completed_at"]) if data["completed_at"] else None + session.downloaded_at = datetime.fromisoformat(data["downloaded_at"]) if data.get("downloaded_at") else None + session.output_dir = Path(data["output_dir"]) if data["output_dir"] else None + session.error_message = data.get("error_message") + session.run_stats = data.get("run_stats") + return session + + +class MRVAManager: + """Manages multiple MRVA sessions for a query suite.""" + + def __init__(self, query_suite: str, output_base_dir: Path, session_prefix: str, + language: str, repository_list: str, max_concurrent: int = 20, dry_run: bool = False): + self.query_suite = query_suite + self.output_base_dir = output_base_dir + self.session_prefix = session_prefix + self.language = language + self.repository_list = repository_list + self.max_concurrent = max_concurrent + self.dry_run = dry_run + self.sessions: Dict[str, MRVASession] = {} + self.state_file = output_base_dir / f"{session_prefix}_state.json" + + # Create output directory + if not self.dry_run: + self.output_base_dir.mkdir(parents=True, exist_ok=True) + + # Load existing state if available + if not self.dry_run: + self._load_state() + + def _load_state(self): + """Load existing session state from file.""" + if self.state_file.exists(): + try: + with open(self.state_file, 'r') as f: + state_data = json.load(f) + for session_data in state_data.get("sessions", []): + session = MRVASession.from_dict(session_data) 
+ self.sessions[session.session_name] = session + print(f"✓ Loaded {len(self.sessions)} existing sessions from state file") + except Exception as e: + print(f"Warning: Could not load state file: {e}") + + def _save_state(self): + """Save current session state to file.""" + state_data = { + "query_suite": self.query_suite, + "session_prefix": self.session_prefix, + "language": self.language, + "repository_list": self.repository_list, + "updated_at": datetime.utcnow().isoformat(), + "sessions": [session.to_dict() for session in self.sessions.values()] + } + + with open(self.state_file, 'w') as f: + json.dump(state_data, f, indent=2) + + def resolve_queries(self) -> List[str]: + """Resolve query files from the query suite using CodeQL CLI.""" + print(f"Resolving queries from suite: {self.query_suite}") + + try: + result = subprocess.run( + ["codeql", "resolve", "queries", "--", self.query_suite], + capture_output=True, text=True, check=True + ) + + query_paths = [] + for line in result.stdout.strip().split('\n'): + line = line.strip() + if line and line.endswith('.ql'): + query_paths.append(line) + + print(f"✓ Found {len(query_paths)} queries in suite") + return query_paths + + except subprocess.CalledProcessError as e: + print(f"Error resolving queries: {e}") + print(f"STDOUT: {e.stdout}") + print(f"STDERR: {e.stderr}") + sys.exit(1) + + def _generate_session_name(self, query_path: str) -> str: + """Generate a session name from query path.""" + # Extract query name from path + query_name = Path(query_path).stem + # Sanitize for session name + sanitized = re.sub(r'[^a-zA-Z0-9\-_]', '-', query_name) + return f"{self.session_prefix}-{sanitized}" + + def _get_active_sessions_count(self) -> int: + """Count sessions that are currently running.""" + return len([s for s in self.sessions.values() + if s.status in ["submitted", "in_progress"]]) + + def _can_submit_new_session(self) -> bool: + """Check if we can submit a new session based on concurrency limits.""" + return self._get_active_sessions_count() < self.max_concurrent + + def submit_session(self, query_path: str) -> bool: + """Submit a new MRVA session for a query.""" + session_name = self._generate_session_name(query_path) + + if session_name in self.sessions: + session = self.sessions[session_name] + if session.status in ["completed", "failed"]: + print(f"Session {session_name} already processed, skipping") + return True + elif session.status in ["submitted", "in_progress"]: + print(f"Session {session_name} already active, skipping") + return True + + # Create new session + session = MRVASession(query_path, session_name, self.language) + + if not self.dry_run: + print(f" Submitting: {session_name}") + else: + print(f" [DRY RUN] Would submit: {session_name}") + + try: + # Submit the MRVA session - use -q/--query flag as per CLI help + cmd = [ + "gh", "mrva", "submit", + "--language", self.language, + "--session", session_name, + "--list", self.repository_list, + "--query", query_path + ] + + if self.dry_run: + print(f" Command: {' '.join(cmd)}") + session.status = "submitted" + session.submitted_at = datetime.utcnow() + self.sessions[session_name] = session + return True + + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + + session.status = "submitted" + session.submitted_at = datetime.utcnow() + self.sessions[session_name] = session + + self._save_state() + return True + + except subprocess.CalledProcessError as e: + print(f" ✗ Failed to submit: {e}") + if e.stderr: + print(f" STDERR: {e.stderr}") + + 
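+            # Record the failure in the saved state; on a rerun this session is
+            # treated as already processed and is not resubmitted automatically.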
session.status = "failed" + session.error_message = str(e) + self.sessions[session_name] = session + if not self.dry_run: + self._save_state() + return False + + def check_session_status(self, session_name: str) -> Optional[str]: + """Check the status of a specific session.""" + if self.dry_run: + # In dry run mode, simulate status progression + return "in_progress" + + try: + result = subprocess.run( + ["gh", "mrva", "status", "--json", "--session", session_name], + capture_output=True, text=True, check=True + ) + + status_data = json.loads(result.stdout) + # The status JSON is a list with one element containing session info + if status_data and len(status_data) > 0: + session_info = status_data[0] + return session_info.get("status", "unknown") + return None + + except subprocess.CalledProcessError: + return None + except json.JSONDecodeError: + return None + + def get_session_details(self, session_name: str) -> Optional[Dict]: + """Get detailed information about a session including run statuses.""" + if self.dry_run: + # In dry run mode, simulate session details + return { + "name": session_name, + "status": "in_progress", + "runs": [ + {"id": 12345, "status": "succeeded", "query": "sample.ql"}, + {"id": 12346, "status": "in_progress", "query": "sample2.ql"} + ] + } + + try: + result = subprocess.run( + ["gh", "mrva", "status", "--json", "--session", session_name], + capture_output=True, text=True, check=True + ) + + status_data = json.loads(result.stdout) + if status_data and len(status_data) > 0: + return status_data[0] + return None + + except subprocess.CalledProcessError: + return None + except json.JSONDecodeError: + return None + + def update_session_statuses(self): + """Update the status of all active sessions.""" + active_sessions = [s for s in self.sessions.values() + if s.status in ["submitted", "in_progress"]] + + if not active_sessions: + return + + print(f"\nChecking status of {len(active_sessions)} active sessions...") + + for session in active_sessions: + session_details = self.get_session_details(session.session_name) + if session_details: + old_status = session.status + new_status = session_details.get("status", "unknown") + + # Update session status + session.status = new_status + + # Extract and store run statistics + runs = session_details.get("runs", []) + if runs: + run_stats = { + "total": len(runs), + "succeeded": len([r for r in runs if r.get("status") == "succeeded"]), + "failed": len([r for r in runs if r.get("status") == "failed"]), + "in_progress": len([r for r in runs if r.get("status") == "in_progress"]), + "pending": len([r for r in runs if r.get("status") == "pending"]) + } + session.run_stats = run_stats + + # Check if session is completed + if new_status == "completed" and old_status != "completed": + session.completed_at = datetime.utcnow() + print(f" ✓ Session completed: {session.session_name}") + if session.run_stats: + print(f" Runs: {session.run_stats['succeeded']}/{session.run_stats['total']} succeeded, {session.run_stats['failed']} failed") + elif new_status == "failed" and old_status != "failed": + session.error_message = "Session failed" + print(f" ✗ Session failed: {session.session_name}") + elif new_status != old_status: + print(f" Status update: {session.session_name} -> {new_status}") + + # For in_progress sessions, show run details + if new_status == "in_progress" and session.run_stats: + print(f" {session.session_name}: {session.run_stats['succeeded']}/{session.run_stats['total']} runs completed, {session.run_stats['in_progress']} in 
progress") + + if not self.dry_run: + self._save_state() + + def download_completed_sessions(self): + """Download results for completed sessions.""" + completed_sessions = [s for s in self.sessions.values() + if s.status == "completed" and not s.output_dir] + + if not completed_sessions: + return + + print(f"\n{'[DRY RUN] ' if self.dry_run else ''}Downloading results for {len(completed_sessions)} completed sessions...") + + for session in completed_sessions: + output_dir = self.output_base_dir / "sessions" / session.session_name + + try: + # Mark as download in progress + session.status = "download_in_progress" + if not self.dry_run: + self._save_state() + + cmd = [ + "gh", "mrva", "download", + "--session", session.session_name, + "--output-dir", str(output_dir) + ] + + if self.dry_run: + print(f" Command: {' '.join(cmd)}") + session.output_dir = output_dir + session.status = "downloaded" + session.downloaded_at = datetime.utcnow() + print(f" ✓ [DRY RUN] Would download results: {session.session_name} -> {output_dir}") + continue + + print(f" Downloading: {session.session_name}...") + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + + session.output_dir = output_dir + session.status = "downloaded" + session.downloaded_at = datetime.utcnow() + print(f" ✓ Downloaded results: {session.session_name} -> {output_dir}") + + except subprocess.CalledProcessError as e: + print(f" ✗ Failed to download {session.session_name}: {e}") + print(f" STDERR: {e.stderr}") + session.status = "completed" # Revert to completed so we can retry + session.error_message = f"Download failed: {e}" + + if not self.dry_run: + self._save_state() + + def get_status_summary(self) -> Dict[str, int]: + """Get a summary of session statuses.""" + summary = {} + for session in self.sessions.values(): + summary[session.status] = summary.get(session.status, 0) + 1 + return summary + + def print_detailed_progress(self, remaining_queries: List[str]): + """Print detailed progress information about all sessions.""" + summary = self.get_status_summary() + + # Calculate various counts + not_started = len(remaining_queries) + submitted = summary.get("submitted", 0) + in_progress = summary.get("in_progress", 0) + completed_not_downloaded = summary.get("completed", 0) + download_in_progress = summary.get("download_in_progress", 0) + downloaded = summary.get("downloaded", 0) + failed = summary.get("failed", 0) + + total_queries = len(self.sessions) + not_started + total_processed = submitted + in_progress + completed_not_downloaded + download_in_progress + downloaded + failed + + print("\n" + "=" * 70) + print(f"PROGRESS REPORT - {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}") + print("=" * 70) + print(f"Total queries in suite: {total_queries}") + print(f" Not yet submitted: {not_started}") + print(f" Submitted (pending): {submitted}") + print(f" MRVA runs in progress: {in_progress}") + print(f" MRVA completed (not downloaded):{completed_not_downloaded}") + print(f" Download in progress: {download_in_progress}") + print(f" Downloaded (complete): {downloaded}") + print(f" Failed/Errored: {failed}") + print("-" * 70) + print(f"Total processed: {total_processed}/{total_queries}") + print(f"Completion rate: {(downloaded / total_queries * 100):.1f}%") + + # Show detailed run statistics for in-progress sessions + in_progress_sessions = [s for s in self.sessions.values() if s.status == "in_progress"] + if in_progress_sessions: + print("\nIn-Progress Sessions:") + for session in in_progress_sessions[:10]: # Show 
first 10 + if session.run_stats: + stats = session.run_stats + print(f" {session.session_name}:") + print(f" Runs: {stats['succeeded']}/{stats['total']} succeeded, " + f"{stats['in_progress']} in progress, {stats['failed']} failed") + if len(in_progress_sessions) > 10: + print(f" ... and {len(in_progress_sessions) - 10} more in-progress sessions") + + # Show recently completed sessions + completed_sessions = [s for s in self.sessions.values() + if s.status == "completed" and s.completed_at] + if completed_sessions: + recent = sorted(completed_sessions, key=lambda s: s.completed_at, reverse=True)[:5] + print("\nRecently Completed (awaiting download):") + for session in recent: + elapsed = (datetime.utcnow() - session.completed_at).total_seconds() / 60 + print(f" {session.session_name} (completed {elapsed:.1f} minutes ago)") + + # Show failed sessions + failed_sessions = [s for s in self.sessions.values() if s.status == "failed"] + if failed_sessions: + print("\nFailed Sessions:") + for session in failed_sessions[:5]: # Show first 5 + print(f" {session.session_name}") + if session.error_message: + print(f" Error: {session.error_message}") + if len(failed_sessions) > 5: + print(f" ... and {len(failed_sessions) - 5} more failed sessions") + + print("=" * 70) + print() + + def run_until_complete(self, check_interval: int = 300) -> bool: + """Run the MRVA manager until all sessions are complete.""" + # Resolve queries and create sessions + query_paths = self.resolve_queries() + + print(f"{'[DRY RUN] ' if self.dry_run else ''}Planning to submit {len(query_paths)} MRVA sessions...") + print(f"Session prefix: {self.session_prefix}") + print(f"Max concurrent sessions: {self.max_concurrent}") + print(f"Check interval: {check_interval} seconds") + print() + + if self.dry_run: + print("DRY RUN MODE: Commands will be printed but not executed") + print() + + # Submit initial batch of sessions + print(f"Submitting initial batch of up to {self.max_concurrent} sessions...") + submitted_count = 0 + for query_path in query_paths: + if not self._can_submit_new_session(): + break + + if self.submit_session(query_path): + submitted_count += 1 + if not self.dry_run: + time.sleep(1) # Brief pause between submissions + + print(f"\nInitial submission complete: {submitted_count} sessions submitted") + print(f"Remaining to submit when capacity available: {len(query_paths) - submitted_count}") + print() + + if self.dry_run: + print("\nDRY RUN: Stopping here. 
In real execution, the script would continue monitoring sessions.") + return True + + # Main monitoring loop + remaining_queries = [q for q in query_paths + if self._generate_session_name(q) not in self.sessions] + + iteration = 0 + while True: + iteration += 1 + + # Print detailed progress report + self.print_detailed_progress(remaining_queries) + + # Update status of active sessions + self.update_session_statuses() + + # Download completed sessions + self.download_completed_sessions() + + # Submit new sessions if we have capacity + while remaining_queries and self._can_submit_new_session(): + query_path = remaining_queries.pop(0) + if self.submit_session(query_path): + time.sleep(1) + + # Check if we're done + active_count = self._get_active_sessions_count() + completed_not_downloaded = len([s for s in self.sessions.values() + if s.status == "completed"]) + download_in_progress = len([s for s in self.sessions.values() + if s.status == "download_in_progress"]) + + if not remaining_queries and active_count == 0 and completed_not_downloaded == 0 and download_in_progress == 0: + print("\n✓ All sessions completed and downloaded!") + break + + # Wait before next check + print(f"Iteration {iteration} complete. Waiting {check_interval} seconds before next check...") + time.sleep(check_interval) + + # Final summary + final_summary = self.get_status_summary() + print(f"\nFinal summary: {final_summary}") + + return final_summary.get("completed", 0) > 0 + + +def main(): + parser = argparse.ArgumentParser( + description="Run MRVA sessions for all queries in a CodeQL query suite", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Basic usage + python run-gh-mrva-for-query-suite.py \\ + --query-suite codeql/misra-cpp-coding-standards@2.50.0 \\ + --output-base-dir ./mrva \\ + --session-prefix t1-misra-cpp-default \\ + --language cpp \\ + --repository-list cpp_top_1000 + + # With custom concurrency + python run-gh-mrva-for-query-suite.py \\ + --query-suite ../../cpp/misra/src/codeql-suites/misra-c-default.qls \\ + --output-base-dir ./mrva \\ + --session-prefix t1-misra-c-default \\ + --language c \\ + --repository-list cpp_top_1000 \\ + --max-concurrent 10 + + # Dry run to validate commands + python run-gh-mrva-for-query-suite.py \\ + --query-suite ../../cpp/misra/src/codeql-suites/misra-cpp-default.qls \\ + --output-base-dir ./mrva \\ + --session-prefix t1-misra-cpp-default \\ + --language cpp \\ + --repository-list cpp_top_1000 \\ + --dry-run + """ + ) + + parser.add_argument( + "--query-suite", required=True, + help="CodeQL query suite - either a pack reference (e.g., codeql/misra-cpp-coding-standards@2.50.0) or path to .qls file (e.g., ../../cpp/misra/src/codeql-suites/misra-cpp-default.qls)" + ) + parser.add_argument( + "--output-base-dir", required=True, type=Path, + help="Base directory for output files and session state" + ) + parser.add_argument( + "--session-prefix", required=True, + help="Prefix for MRVA session names (e.g., t1-misra-cpp-default)" + ) + parser.add_argument( + "--language", required=True, choices=["cpp", "c", "java", "javascript", "python", "go", "csharp"], + help="Programming language for analysis" + ) + parser.add_argument( + "--repository-list", required=True, + help="Repository list for MRVA (e.g., cpp_top_1000)" + ) + parser.add_argument( + "--max-concurrent", type=int, default=20, + help="Maximum number of concurrent MRVA sessions (default: 20)" + ) + parser.add_argument( + "--check-interval", type=int, default=300, + help="Interval in 
seconds between status checks (default: 300)" + ) + parser.add_argument( + "--dry-run", action="store_true", + help="Print commands that would be executed without actually running them" + ) + + args = parser.parse_args() + + print("MRVA Query Suite Runner") + print("=" * 50) + print(f"Query suite: {args.query_suite}") + print(f"Output directory: {args.output_base_dir}") + print(f"Session prefix: {args.session_prefix}") + print(f"Language: {args.language}") + print(f"Repository list: {args.repository_list}") + print(f"Max concurrent: {args.max_concurrent}") + if args.dry_run: + print("DRY RUN MODE: Commands will be printed but not executed") + print() + + # Validate dependencies + if not args.dry_run: + try: + subprocess.run(["gh", "mrva", "--help"], capture_output=True, check=True) + print("✓ GitHub CLI with mrva extension is available") + except (subprocess.CalledProcessError, FileNotFoundError): + print("✗ GitHub CLI with mrva extension is required") + sys.exit(1) + + try: + subprocess.run(["codeql", "--version"], capture_output=True, check=True) + print("✓ CodeQL CLI is available") + except (subprocess.CalledProcessError, FileNotFoundError): + print("✗ CodeQL CLI is required") + sys.exit(1) + else: + print("✓ [DRY RUN] Skipping dependency validation") + + print() + + # Create and run MRVA manager + manager = MRVAManager( + query_suite=args.query_suite, + output_base_dir=args.output_base_dir, + session_prefix=args.session_prefix, + language=args.language, + repository_list=args.repository_list, + max_concurrent=args.max_concurrent, + dry_run=args.dry_run + ) + + try: + success = manager.run_until_complete(check_interval=args.check_interval) + + if success: + print(f"\n✓ MRVA sessions completed successfully") + print(f"Results available in: {args.output_base_dir}") + print(f"State file: {manager.state_file}") + sys.exit(0) + else: + print(f"\n✗ No sessions completed successfully") + sys.exit(1) + + except KeyboardInterrupt: + print(f"\n\nInterrupted by user") + print(f"Session state saved to: {manager.state_file}") + print(f"You can resume by running the script again with the same parameters") + sys.exit(130) + + +if __name__ == "__main__": + main() diff --git a/scripts/es-sarif/setup-venv.sh b/scripts/es-sarif/setup-venv.sh new file mode 100755 index 000000000..0545ca9bf --- /dev/null +++ b/scripts/es-sarif/setup-venv.sh @@ -0,0 +1,58 @@ +#!/bin/bash +# Setup script for SARIF analysis environment (MRVA + Elasticsearch indexing) + +set -e # Exit on any error + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +VENV_DIR="$SCRIPT_DIR/.venv" + +echo "SARIF Analysis Environment Setup" +echo "================================" +echo "Setting up Python virtual environment in: $VENV_DIR" +echo + +# Check Python 3.11 availability +if ! command -v python3.11 &> /dev/null; then + echo "Error: Python 3.11 is required but not found" + echo "Please install Python 3.11 first" + exit 1 +fi + +python_version=$(python3.11 --version 2>&1 | grep -oE '[0-9]+\.[0-9]+' | head -1) +echo "Python version: $python_version" + +# Create virtual environment +echo "Creating virtual environment with Python 3.11..." +python3.11 -m venv "$VENV_DIR" + +# Activate virtual environment +echo "Activating virtual environment..." +source "$VENV_DIR/bin/activate" + +# Upgrade pip +echo "Upgrading pip..." +pip install --upgrade pip + +# Install dependencies +echo "Installing dependencies from requirements.txt..." 
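+# requirements.txt pins the elasticsearch client used by
+# index-sarif-results-in-elasticsearch.py; the MRVA runner script needs only the
+# Python standard library.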
+pip install -r "$SCRIPT_DIR/requirements.txt" + +# Verify installation +echo +echo "Verifying installation..." +python -c "import elasticsearch; print(f'✓ Elasticsearch client version: {elasticsearch.__version__}')" + +echo +echo "✓ Setup complete!" +echo +echo "Source the python virtual environment within scripts/es-sarif:" +echo " source .venv/bin/activate" +echo +echo "To run MRVA for a query suite:" +echo " python run-gh-mrva-for-query-suite.py --query-suite codeql/misra-cpp-coding-standards@2.50.0 --output-base-dir ./mrva --session-prefix t1-misra-cpp-default --language cpp --repository-list top_1000" +echo +echo "To run the indexer:" +echo " python index-sarif-results-in-elasticsearch.py ../../mrva/sessions/sarif-files.txt sarif_results_2024" +echo +echo "To deactivate the virtual environment:" +echo " deactivate" \ No newline at end of file From 8806b101cdcbbd23c0167625047dc69cd4c560b8 Mon Sep 17 00:00:00 2001 From: Nathan Randall Date: Wed, 29 Oct 2025 07:05:01 -0600 Subject: [PATCH 2/8] Add sarif-infer-versionControlProvenance.py --- scripts/es-sarif/Makefile | 20 + .../index-sarif-results-in-elasticsearch.py | 264 ++-- scripts/es-sarif/pyproject.toml | 19 + scripts/es-sarif/requirements.txt | 3 + .../es-sarif/run-gh-mrva-for-query-suite.py | 1142 ++++++++++------- .../sarif-infer-versionControlProvenance.md | 43 + .../sarif-infer-versionControlProvenance.py | 293 +++++ 7 files changed, 1185 insertions(+), 599 deletions(-) create mode 100644 scripts/es-sarif/Makefile create mode 100644 scripts/es-sarif/pyproject.toml create mode 100644 scripts/es-sarif/sarif-infer-versionControlProvenance.md create mode 100755 scripts/es-sarif/sarif-infer-versionControlProvenance.py diff --git a/scripts/es-sarif/Makefile b/scripts/es-sarif/Makefile new file mode 100644 index 000000000..231691ec4 --- /dev/null +++ b/scripts/es-sarif/Makefile @@ -0,0 +1,20 @@ +.PHONY: format check install help + +check: + black --check --diff *.py + +compile: + python3 -m py_compile *.py + +format: + black *.py + +help: + @echo "Available targets:" + @echo " install - Install black formatter" + @echo " format - Format all Python files in this directory" + @echo " check - Check formatting without making changes" + +install: + pip install -r requirements.txt + diff --git a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py index 0a130e307..c6ca7eece 100644 --- a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py +++ b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py @@ -3,14 +3,14 @@ SARIF Results Elasticsearch Indexer This script creates a fresh Elasticsearch index and indexes individual SARIF results -from multiple SARIF files into it. Each result from runs[].results[] becomes a -separate document in the Elasticsearch index, allowing for granular querying and +from multiple SARIF files into it. Each result from runs[].results[] becomes a +separate document in the Elasticsearch index, allowing for granular querying and analysis of code scanning findings. The script reads a list of SARIF file paths from a text file and bulk indexes all results into a single Elasticsearch index. Each result document includes: - All original SARIF result fields (ruleId, message, locations, etc.) 
-- Derived fields (ruleGroup, ruleLanguage) parsed from ruleId +- Derived fields (ruleGroup, ruleLanguage) parsed from ruleId - Run-level metadata (tool info, version control provenance) - Source file tracking metadata @@ -39,22 +39,25 @@ def load_env_file(env_file_path): """Load environment variables from a .env file.""" if not os.path.exists(env_file_path): return - - with open(env_file_path, 'r') as f: + + with open(env_file_path, "r") as f: for line in f: line = line.strip() - if line and not line.startswith('#') and '=' in line: - key, value = line.split('=', 1) + if line and not line.startswith("#") and "=" in line: + key, value = line.split("=", 1) # Handle variable substitution - if '${' in value and '}' in value: + if "${" in value and "}" in value: # Simple variable substitution for ${VAR} patterns import re + def replace_var(match): var_name = match.group(1) return os.environ.get(var_name, match.group(0)) - value = re.sub(r'\$\{([^}]+)\}', replace_var, value) + + value = re.sub(r"\$\{([^}]+)\}", replace_var, value) os.environ[key] = value + # --- Configuration --- DEFAULT_ELASTIC_HOST = "http://localhost:9200" SARIF_VERSION = "2.1.0" @@ -68,21 +71,18 @@ def replace_var(match): "ruleIndex": {"type": "integer"}, "kind": {"type": "keyword"}, "level": {"type": "keyword"}, - # Derived fields from ruleId parsing "ruleGroup": {"type": "keyword"}, "ruleLanguage": {"type": "keyword"}, - # Message object "message": { "properties": { "text": {"type": "text"}, "markdown": {"type": "text"}, "id": {"type": "keyword"}, - "arguments": {"type": "keyword"} + "arguments": {"type": "keyword"}, } }, - # Locations array - each result can have multiple locations "locations": { "type": "nested", @@ -94,7 +94,7 @@ def replace_var(match): "properties": { "uri": {"type": "keyword"}, "uriBaseId": {"type": "keyword"}, - "index": {"type": "integer"} + "index": {"type": "integer"}, } }, "region": { @@ -106,20 +106,16 @@ def replace_var(match): "charOffset": {"type": "integer"}, "charLength": {"type": "integer"}, "byteOffset": {"type": "integer"}, - "byteLength": {"type": "integer"} + "byteLength": {"type": "integer"}, } }, "contextRegion": { "properties": { "startLine": {"type": "integer"}, "endLine": {"type": "integer"}, - "snippet": { - "properties": { - "text": {"type": "text"} - } - } + "snippet": {"properties": {"text": {"type": "text"}}}, } - } + }, } }, "logicalLocations": { @@ -128,30 +124,22 @@ def replace_var(match): "name": {"type": "keyword"}, "fullyQualifiedName": {"type": "keyword"}, "decoratedName": {"type": "keyword"}, - "kind": {"type": "keyword"} - } - } - } + "kind": {"type": "keyword"}, + }, + }, + }, }, - # Rule reference - "rule": { - "properties": { - "id": {"type": "keyword"}, - "index": {"type": "integer"} - } - }, - + "rule": {"properties": {"id": {"type": "keyword"}, "index": {"type": "integer"}}}, # Fingerprints for deduplication "partialFingerprints": {"type": "object"}, "fingerprints": {"type": "object"}, - # Analysis and classification "analysisTarget": { "properties": { "uri": {"type": "keyword"}, "uriBaseId": {"type": "keyword"}, - "index": {"type": "integer"} + "index": {"type": "integer"}, } }, "guid": {"type": "keyword"}, @@ -159,7 +147,6 @@ def replace_var(match): "occurrenceCount": {"type": "integer"}, "rank": {"type": "float"}, "baselineState": {"type": "keyword"}, - # Run-level metadata (tool, repo info, etc.) 
"run": { "properties": { @@ -171,7 +158,7 @@ def replace_var(match): "organization": {"type": "keyword"}, "product": {"type": "keyword"}, "version": {"type": "keyword"}, - "semanticVersion": {"type": "keyword"} + "semanticVersion": {"type": "keyword"}, } } } @@ -180,41 +167,33 @@ def replace_var(match): "properties": { "id": {"type": "keyword"}, "guid": {"type": "keyword"}, - "correlationGuid": {"type": "keyword"} + "correlationGuid": {"type": "keyword"}, } }, "versionControlProvenance": { "type": "nested", "properties": { "repositoryUri": {"type": "keyword"}, - "revisionId": {"type": "keyword"} - } - } + "revisionId": {"type": "keyword"}, + }, + }, } }, - # Metadata for tracking source SARIF file "_sarif_source": { "properties": { "file_path": {"type": "keyword"}, "file_name": {"type": "keyword"}, - "indexed_at": {"type": "date"} + "indexed_at": {"type": "date"}, } - } + }, } }, "settings": { "number_of_shards": 1, "number_of_replicas": 0, - "analysis": { - "analyzer": { - "sarif_text": { - "type": "standard", - "stopwords": "_none_" - } - } - } - } + "analysis": {"analyzer": {"sarif_text": {"type": "standard", "stopwords": "_none_"}}}, + }, } @@ -225,7 +204,7 @@ def create_elasticsearch_client(host, api_key=None): hosts=[host], api_key=api_key.strip(), verify_certs=False, # For local development - ssl_show_warn=False + ssl_show_warn=False, ) else: return Elasticsearch(hosts=[host]) @@ -249,7 +228,9 @@ def validate_index_name(es_client, index_name): """Validate that the index doesn't already exist.""" if es_client.indices.exists(index=index_name): print(f"Error: Index '{index_name}' already exists.") - print("This script requires a fresh index. Please choose a different name or delete the existing index.") + print( + "This script requires a fresh index. Please choose a different name or delete the existing index." + ) return False print(f"✓ Index name '{index_name}' is available") return True @@ -272,28 +253,30 @@ def read_sarif_files_list(list_file_path): if not list_file.exists(): print(f"Error: SARIF files list not found at {list_file_path}") return None - + base_dir = list_file.parent sarif_files = [] - - with open(list_file, 'r') as f: + + with open(list_file, "r") as f: for line_num, line in enumerate(f, 1): line = line.strip() - if not line or line.startswith('#'): + if not line or line.startswith("#"): continue - + # Resolve relative paths relative to the list file sarif_path = base_dir / line if not sarif_path.exists(): print(f"Warning: SARIF file not found: {sarif_path} (line {line_num})") continue - - if not sarif_path.suffix.lower() == '.sarif': - print(f"Warning: File does not have .sarif extension: {sarif_path} (line {line_num})") + + if not sarif_path.suffix.lower() == ".sarif": + print( + f"Warning: File does not have .sarif extension: {sarif_path} (line {line_num})" + ) continue - + sarif_files.append(sarif_path) - + print(f"✓ Found {len(sarif_files)} valid SARIF files to process") return sarif_files @@ -301,7 +284,7 @@ def read_sarif_files_list(list_file_path): def parse_rule_id(rule_id): """ Parse a ruleId to extract ruleGroup and ruleLanguage. 
- + Examples: - "cpp/misra/function-like-macros-defined" -> ruleGroup="cpp/misra", ruleLanguage="cpp" - "c/misra/cast-between-pointer-to-object-and-non-int-arithmetic-type" -> ruleGroup="c/misra", ruleLanguage="c" @@ -309,107 +292,110 @@ def parse_rule_id(rule_id): """ if not rule_id: return None, None - - parts = rule_id.split('/') + + parts = rule_id.split("/") if len(parts) < 2: return None, None - + # First part is the language rule_language = parts[0] - + # Rule group is language + first category (e.g., "cpp/misra", "py/baseline") if len(parts) >= 2: rule_group = f"{parts[0]}/{parts[1]}" else: rule_group = rule_language - + return rule_group, rule_language def sarif_results_generator(sarif_files, index_name): """ Generator that yields Elasticsearch bulk actions for individual SARIF results. - + For each SARIF file: 1. Processes each run in runs[] 2. Extracts each result from runs[].results[] 3. Creates a separate Elasticsearch document per result 4. Adds derived fields (ruleGroup, ruleLanguage) from ruleId parsing 5. Includes run-level metadata and source file tracking - + This approach allows for granular querying of individual code scanning findings rather than treating entire SARIF files as single documents. """ from datetime import datetime + indexed_at = datetime.utcnow().isoformat() - + total_results = 0 - + for sarif_file in sarif_files: print(f"Processing {sarif_file.name}...") - + try: - with open(sarif_file, 'r', encoding='utf-8') as f: + with open(sarif_file, "r", encoding="utf-8") as f: sarif_data = json.load(f) - + # Validate SARIF version - if sarif_data.get('version') != SARIF_VERSION: - print(f"Warning: {sarif_file.name} has version {sarif_data.get('version')}, expected {SARIF_VERSION}") - + if sarif_data.get("version") != SARIF_VERSION: + print( + f"Warning: {sarif_file.name} has version {sarif_data.get('version')}, expected {SARIF_VERSION}" + ) + # Extract metadata from the SARIF file - runs = sarif_data.get('runs', []) + runs = sarif_data.get("runs", []) if not runs: print(f"Warning: No runs found in {sarif_file.name}") continue - + file_results_count = 0 for run_index, run in enumerate(runs): - results = run.get('results', []) + results = run.get("results", []) if not results: print(f"Warning: No results found in run {run_index} of {sarif_file.name}") continue - + file_results_count += len(results) - + # Extract run-level metadata run_metadata = { - 'tool': run.get('tool', {}), - 'automationDetails': run.get('automationDetails', {}), - 'versionControlProvenance': run.get('versionControlProvenance', []) + "tool": run.get("tool", {}), + "automationDetails": run.get("automationDetails", {}), + "versionControlProvenance": run.get("versionControlProvenance", []), } - + for result_index, result in enumerate(results): # Create a document that includes both the result and metadata document = dict(result) # Copy all result fields - + # Add derived fields from ruleId parsing - rule_id = document.get('ruleId') + rule_id = document.get("ruleId") if rule_id: rule_group, rule_language = parse_rule_id(rule_id) if rule_group: - document['ruleGroup'] = rule_group + document["ruleGroup"] = rule_group if rule_language: - document['ruleLanguage'] = rule_language - + document["ruleLanguage"] = rule_language + # Add run-level metadata - document['run'] = run_metadata - + document["run"] = run_metadata + # Add source file metadata - document['_sarif_source'] = { - 'file_path': str(sarif_file), - 'file_name': sarif_file.name, - 'indexed_at': indexed_at + document["_sarif_source"] = { + 
"file_path": str(sarif_file), + "file_name": sarif_file.name, + "indexed_at": indexed_at, } - + yield { "_index": index_name, "_source": document, } - + total_results += 1 - + print(f" → Found {file_results_count} results in {sarif_file.name}") - + except FileNotFoundError: print(f"Error: SARIF file not found: {sarif_file}") continue @@ -419,8 +405,10 @@ def sarif_results_generator(sarif_files, index_name): except Exception as e: print(f"Error processing {sarif_file}: {e}") continue - - print(f"✓ Prepared {total_results} individual results for indexing from {len(sarif_files)} SARIF files") + + print( + f"✓ Prepared {total_results} individual results for indexing from {len(sarif_files)} SARIF files" + ) def index_sarif_files(sarif_files, index_name, host, api_key=None): @@ -428,49 +416,49 @@ def index_sarif_files(sarif_files, index_name, host, api_key=None): Connect to Elasticsearch and bulk index all SARIF results. """ es_client = create_elasticsearch_client(host, api_key) - + # Validate connection if not validate_elasticsearch_connection(es_client, host): return False - + # Validate index name if not validate_index_name(es_client, index_name): return False - + # Create index with mapping if not create_index_with_mapping(es_client, index_name): return False - + print(f"Indexing results from {len(sarif_files)} SARIF files...") - + try: # Use bulk helper to index all documents success_count, failed_docs = helpers.bulk( - es_client, + es_client, sarif_results_generator(sarif_files, index_name), chunk_size=500, - request_timeout=60 + request_timeout=60, ) - + print("-" * 50) print(f"✓ Bulk indexing complete") print(f"✓ Successfully indexed: {success_count} documents") print(f"✗ Failed to index: {len(failed_docs)} documents") - + if failed_docs: print("\nFailed documents:") for doc in failed_docs[:5]: # Show first 5 failures print(f" - {doc}") if len(failed_docs) > 5: print(f" ... 
and {len(failed_docs) - 5} more") - + # Get final index stats stats = es_client.indices.stats(index=index_name) - doc_count = stats['indices'][index_name]['total']['docs']['count'] + doc_count = stats["indices"][index_name]["total"]["docs"]["count"] print(f"✓ Final document count in index: {doc_count}") - + return True - + except Exception as e: print(f"Error during bulk indexing: {e}") return False @@ -481,7 +469,9 @@ def main(): print(f"Usage: python {sys.argv[0]} ") print() print("Arguments:") - print(" sarif_files_list.txt - Text file containing relative paths to SARIF files (one per line)") + print( + " sarif_files_list.txt - Text file containing relative paths to SARIF files (one per line)" + ) print(" elasticsearch_index_name - Name for the new Elasticsearch index to create") print() print("Environment Variables:") @@ -494,28 +484,28 @@ def main(): print(" ES_LOCAL_API_KEY=your_api_key \\") print(f" python {sys.argv[0]} sarif-files.txt sarif_results_2024") sys.exit(1) - + sarif_files_list = sys.argv[1] index_name = sys.argv[2] - + # Load environment variables from .env file if it exists script_dir = Path(__file__).parent - env_file = script_dir / 'elastic-start-local' / '.env' + env_file = script_dir / "elastic-start-local" / ".env" load_env_file(env_file) - + # Get configuration from environment variables - elastic_host = os.getenv('ES_LOCAL_URL', DEFAULT_ELASTIC_HOST) - elastic_api_key = os.getenv('ES_LOCAL_API_KEY') - + elastic_host = os.getenv("ES_LOCAL_URL", DEFAULT_ELASTIC_HOST) + elastic_api_key = os.getenv("ES_LOCAL_API_KEY") + # Handle variable substitution in ES_LOCAL_URL if needed - if elastic_host and '${ES_LOCAL_PORT}' in elastic_host: - es_local_port = os.getenv('ES_LOCAL_PORT', '9200') - elastic_host = elastic_host.replace('${ES_LOCAL_PORT}', es_local_port) - + if elastic_host and "${ES_LOCAL_PORT}" in elastic_host: + es_local_port = os.getenv("ES_LOCAL_PORT", "9200") + elastic_host = elastic_host.replace("${ES_LOCAL_PORT}", es_local_port) + # Treat empty string or literal "None" as None for API key - if elastic_api_key == '' or elastic_api_key == 'None': + if elastic_api_key == "" or elastic_api_key == "None": elastic_api_key = None - + print(f"SARIF Files Elasticsearch Indexer") print(f"==================================") print(f"SARIF files list: {sarif_files_list}") @@ -523,13 +513,13 @@ def main(): print(f"Elasticsearch host: {elastic_host}") print(f"Authentication: {'API Key' if elastic_api_key else 'None (HTTP Basic)'}") print() - + # Read and validate SARIF files list sarif_files = read_sarif_files_list(sarif_files_list) if not sarif_files: print("No valid SARIF files found. 
Exiting.") sys.exit(1) - + # Index the files if index_sarif_files(sarif_files, index_name, elastic_host, elastic_api_key): print(f"\n✓ Successfully created and populated index '{index_name}'") @@ -541,4 +531,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/scripts/es-sarif/pyproject.toml b/scripts/es-sarif/pyproject.toml new file mode 100644 index 000000000..eea3d934c --- /dev/null +++ b/scripts/es-sarif/pyproject.toml @@ -0,0 +1,19 @@ +[tool.black] +line-length = 100 +target-version = ['py311'] +include = '\.pyi?$' +extend-exclude = ''' +/( + # directories + \.eggs + | \.git + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist +)/ +''' diff --git a/scripts/es-sarif/requirements.txt b/scripts/es-sarif/requirements.txt index 938ab14ff..241829792 100644 --- a/scripts/es-sarif/requirements.txt +++ b/scripts/es-sarif/requirements.txt @@ -1,5 +1,8 @@ # Required for index-sarif-results-in-elasticsearch.py elasticsearch>=7.0.0,<9.0.0 +# Code formatter +black>=24.0.0 + # Note: run-gh-mrva-for-query-suite.py only uses Python standard library modules # and does not require any external dependencies \ No newline at end of file diff --git a/scripts/es-sarif/run-gh-mrva-for-query-suite.py b/scripts/es-sarif/run-gh-mrva-for-query-suite.py index f79286d28..ff23e4334 100644 --- a/scripts/es-sarif/run-gh-mrva-for-query-suite.py +++ b/scripts/es-sarif/run-gh-mrva-for-query-suite.py @@ -6,7 +6,8 @@ This script creates a MRVA session for each query in a given query suite and manages the execution lifecycle including submission, monitoring, and downloading of results. -The script can optionally load SARIF results into an Elasticsearch index for analysis. +The script uses a pipeline-based approach where each query is processed asynchronously +through its complete lifecycle: submit -> monitor -> download -> copy results. This script expects that the `mrva` extension has already been installed for the `gh` CLI tool, such that `gh mrva` commands can be executed. 
@@ -18,7 +19,6 @@ - GitHub CLI with mrva extension installed - CodeQL CLI available in PATH - Python 3.11+ - - Optional: Elasticsearch for result indexing """ import sys @@ -27,518 +27,731 @@ import argparse import subprocess import time +import re +import threading from pathlib import Path -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Set, Tuple from datetime import datetime -import re +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from enum import Enum -class MRVASession: - """Represents a single MRVA session for a CodeQL query.""" - - def __init__(self, query_path: str, session_name: str, language: str): - self.query_path = query_path - self.session_name = session_name - self.language = language - self.status = "not_started" # not_started, submitted, in_progress, completed, download_in_progress, downloaded, failed - self.submitted_at: Optional[datetime] = None - self.completed_at: Optional[datetime] = None - self.downloaded_at: Optional[datetime] = None - self.output_dir: Optional[Path] = None - self.error_message: Optional[str] = None - self.run_stats: Optional[Dict] = None # Track run-level statistics - +class PipelineState(Enum): + """States in the MRVA processing pipeline.""" + + NOT_STARTED = "not_started" + SUBMITTING = "submitting" + SUBMITTED = "submitted" + MONITORING = "monitoring" + COMPLETED = "completed" + DOWNLOADING = "downloading" + DOWNLOADED = "downloaded" + COPYING = "copying" + FINISHED = "finished" + FAILED = "failed" + + +@dataclass +class MRVAPipeline: + """Represents the processing pipeline for a single query.""" + + query_path: str + session_name: str + language: str + state: PipelineState = PipelineState.NOT_STARTED + submitted_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + downloaded_at: Optional[datetime] = None + finished_at: Optional[datetime] = None + error_message: Optional[str] = None + run_count: int = 0 + succeeded_runs: int = 0 + failed_runs: int = 0 + def to_dict(self) -> Dict: - """Convert session to dictionary for JSON serialization.""" + """Convert pipeline to dictionary for JSON serialization.""" return { "query_path": self.query_path, "session_name": self.session_name, "language": self.language, - "status": self.status, + "state": self.state.value, "submitted_at": self.submitted_at.isoformat() if self.submitted_at else None, "completed_at": self.completed_at.isoformat() if self.completed_at else None, "downloaded_at": self.downloaded_at.isoformat() if self.downloaded_at else None, - "output_dir": str(self.output_dir) if self.output_dir else None, + "finished_at": self.finished_at.isoformat() if self.finished_at else None, "error_message": self.error_message, - "run_stats": self.run_stats + "run_count": self.run_count, + "succeeded_runs": self.succeeded_runs, + "failed_runs": self.failed_runs, } - + @classmethod - def from_dict(cls, data: Dict) -> 'MRVASession': - """Create session from dictionary.""" - session = cls(data["query_path"], data["session_name"], data["language"]) - session.status = data["status"] - session.submitted_at = datetime.fromisoformat(data["submitted_at"]) if data["submitted_at"] else None - session.completed_at = datetime.fromisoformat(data["completed_at"]) if data["completed_at"] else None - session.downloaded_at = datetime.fromisoformat(data["downloaded_at"]) if data.get("downloaded_at") else None - session.output_dir = Path(data["output_dir"]) if data["output_dir"] else None - session.error_message = 
data.get("error_message") - session.run_stats = data.get("run_stats") - return session + def from_dict(cls, data: Dict) -> "MRVAPipeline": + """Create pipeline from dictionary.""" + pipeline = cls( + query_path=data["query_path"], + session_name=data["session_name"], + language=data["language"], + state=PipelineState(data["state"]), + ) + pipeline.submitted_at = ( + datetime.fromisoformat(data["submitted_at"]) if data.get("submitted_at") else None + ) + pipeline.completed_at = ( + datetime.fromisoformat(data["completed_at"]) if data.get("completed_at") else None + ) + pipeline.downloaded_at = ( + datetime.fromisoformat(data["downloaded_at"]) if data.get("downloaded_at") else None + ) + pipeline.finished_at = ( + datetime.fromisoformat(data["finished_at"]) if data.get("finished_at") else None + ) + pipeline.error_message = data.get("error_message") + pipeline.run_count = data.get("run_count", 0) + pipeline.succeeded_runs = data.get("succeeded_runs", 0) + pipeline.failed_runs = data.get("failed_runs", 0) + return pipeline class MRVAManager: - """Manages multiple MRVA sessions for a query suite.""" - - def __init__(self, query_suite: str, output_base_dir: Path, session_prefix: str, - language: str, repository_list: str, max_concurrent: int = 20, dry_run: bool = False): + """Manages MRVA pipelines for a query suite using concurrent processing.""" + + def __init__( + self, + query_suite: str, + output_base_dir: Path, + session_prefix: str, + language: str, + repository_list: str, + max_concurrent: int = 20, + check_interval: int = 300, + dry_run: bool = False, + ): self.query_suite = query_suite self.output_base_dir = output_base_dir self.session_prefix = session_prefix self.language = language self.repository_list = repository_list self.max_concurrent = max_concurrent + self.check_interval = check_interval self.dry_run = dry_run - self.sessions: Dict[str, MRVASession] = {} + + # Directories + self.sessions_dir = output_base_dir / "sessions" + self.status_dir = output_base_dir / "sessions" / "status" + self.results_dir = output_base_dir / "results" / session_prefix self.state_file = output_base_dir / f"{session_prefix}_state.json" - - # Create output directory - if not self.dry_run: - self.output_base_dir.mkdir(parents=True, exist_ok=True) - - # Load existing state if available + self.sarif_list_file = self.results_dir / "sarif-files.txt" + + # Pipeline tracking + self.pipelines: Dict[str, MRVAPipeline] = {} + self.pipelines_lock = threading.Lock() + + # Create directories (only in non-dry-run mode) if not self.dry_run: - self._load_state() - + self.sessions_dir.mkdir(parents=True, exist_ok=True) + self.status_dir.mkdir(parents=True, exist_ok=True) + self.results_dir.mkdir(parents=True, exist_ok=True) + + # Load existing state if available (both modes) + self._load_state() + def _load_state(self): - """Load existing session state from file.""" + """Load existing pipeline state from file.""" if self.state_file.exists(): try: - with open(self.state_file, 'r') as f: + with open(self.state_file, "r") as f: state_data = json.load(f) - for session_data in state_data.get("sessions", []): - session = MRVASession.from_dict(session_data) - self.sessions[session.session_name] = session - print(f"✓ Loaded {len(self.sessions)} existing sessions from state file") + + for pipeline_data in state_data.get("pipelines", []): + pipeline = MRVAPipeline.from_dict(pipeline_data) + self.pipelines[pipeline.session_name] = pipeline + + print(f"✓ Loaded state for {len(self.pipelines)} pipelines from {self.state_file}") 
except Exception as e: print(f"Warning: Could not load state file: {e}") - + def _save_state(self): - """Save current session state to file.""" + """Save current pipeline state to file.""" + # Don't save state in dry-run mode to avoid creating directories + if self.dry_run: + return + state_data = { "query_suite": self.query_suite, "session_prefix": self.session_prefix, "language": self.language, "repository_list": self.repository_list, "updated_at": datetime.utcnow().isoformat(), - "sessions": [session.to_dict() for session in self.sessions.values()] + "pipelines": [p.to_dict() for p in self.pipelines.values()], } - - with open(self.state_file, 'w') as f: + + # Ensure parent directory exists + self.state_file.parent.mkdir(parents=True, exist_ok=True) + + with open(self.state_file, "w") as f: json.dump(state_data, f, indent=2) - + def resolve_queries(self) -> List[str]: """Resolve query files from the query suite using CodeQL CLI.""" print(f"Resolving queries from suite: {self.query_suite}") - + try: result = subprocess.run( ["codeql", "resolve", "queries", "--", self.query_suite], - capture_output=True, text=True, check=True + capture_output=True, + text=True, + check=True, ) - + query_paths = [] - for line in result.stdout.strip().split('\n'): + for line in result.stdout.strip().split("\n"): line = line.strip() - if line and line.endswith('.ql'): + if line and line.endswith(".ql"): query_paths.append(line) - + print(f"✓ Found {len(query_paths)} queries in suite") return query_paths - + except subprocess.CalledProcessError as e: print(f"Error resolving queries: {e}") print(f"STDOUT: {e.stdout}") print(f"STDERR: {e.stderr}") sys.exit(1) - + def _generate_session_name(self, query_path: str) -> str: """Generate a session name from query path.""" - # Extract query name from path query_name = Path(query_path).stem - # Sanitize for session name - sanitized = re.sub(r'[^a-zA-Z0-9\-_]', '-', query_name) + sanitized = re.sub(r"[^a-zA-Z0-9\-_]", "-", query_name) return f"{self.session_prefix}-{sanitized}" - - def _get_active_sessions_count(self) -> int: - """Count sessions that are currently running.""" - return len([s for s in self.sessions.values() - if s.status in ["submitted", "in_progress"]]) - - def _can_submit_new_session(self) -> bool: - """Check if we can submit a new session based on concurrency limits.""" - return self._get_active_sessions_count() < self.max_concurrent - - def submit_session(self, query_path: str) -> bool: - """Submit a new MRVA session for a query.""" - session_name = self._generate_session_name(query_path) - - if session_name in self.sessions: - session = self.sessions[session_name] - if session.status in ["completed", "failed"]: - print(f"Session {session_name} already processed, skipping") - return True - elif session.status in ["submitted", "in_progress"]: - print(f"Session {session_name} already active, skipping") - return True - - # Create new session - session = MRVASession(query_path, session_name, self.language) - - if not self.dry_run: - print(f" Submitting: {session_name}") - else: - print(f" [DRY RUN] Would submit: {session_name}") - - try: - # Submit the MRVA session - use -q/--query flag as per CLI help - cmd = [ - "gh", "mrva", "submit", - "--language", self.language, - "--session", session_name, - "--list", self.repository_list, - "--query", query_path - ] - - if self.dry_run: - print(f" Command: {' '.join(cmd)}") - session.status = "submitted" - session.submitted_at = datetime.utcnow() - self.sessions[session_name] = session - return True - - result = 
subprocess.run(cmd, capture_output=True, text=True, check=True) - - session.status = "submitted" - session.submitted_at = datetime.utcnow() - self.sessions[session_name] = session - - self._save_state() - return True - - except subprocess.CalledProcessError as e: - print(f" ✗ Failed to submit: {e}") - if e.stderr: - print(f" STDERR: {e.stderr}") - - session.status = "failed" - session.error_message = str(e) - self.sessions[session_name] = session - if not self.dry_run: - self._save_state() - return False - - def check_session_status(self, session_name: str) -> Optional[str]: - """Check the status of a specific session.""" - if self.dry_run: - # In dry run mode, simulate status progression - return "in_progress" - + + def get_existing_sessions(self) -> Dict[str, Dict]: + """Get existing MRVA sessions from gh mrva list.""" + # Save list output to file for performance and debugging + list_file = self.sessions_dir / "mrva-sessions-list.json" + try: result = subprocess.run( - ["gh", "mrva", "status", "--json", "--session", session_name], - capture_output=True, text=True, check=True + ["gh", "mrva", "list", "--json"], capture_output=True, text=True, check=True ) - - status_data = json.loads(result.stdout) - # The status JSON is a list with one element containing session info - if status_data and len(status_data) > 0: - session_info = status_data[0] - return session_info.get("status", "unknown") - return None - - except subprocess.CalledProcessError: - return None - except json.JSONDecodeError: - return None - - def get_session_details(self, session_name: str) -> Optional[Dict]: - """Get detailed information about a session including run statuses.""" - if self.dry_run: - # In dry run mode, simulate session details - return { - "name": session_name, - "status": "in_progress", - "runs": [ - {"id": 12345, "status": "succeeded", "query": "sample.ql"}, - {"id": 12346, "status": "in_progress", "query": "sample2.ql"} - ] - } - + + # Save output to file (only if not in dry-run to avoid creating directories) + if not self.dry_run: + list_file.write_text(result.stdout) + + sessions_data = json.loads(result.stdout) + sessions_by_name = {} + + for session in sessions_data: + session_name = session.get("name", "") + if session_name.startswith(self.session_prefix): + sessions_by_name[session_name] = session + + return sessions_by_name + + except subprocess.CalledProcessError as e: + print(f"Warning: Could not list existing sessions: {e}") + return {} + except json.JSONDecodeError as e: + print(f"Warning: Could not parse session list: {e}") + return {} + + def get_session_status(self, session_name: str) -> Optional[Dict]: + """Get detailed status for a specific session.""" + # Save status output to file for performance and debugging (only in non-dry-run) + status_file = self.status_dir / f"{session_name}_status.json" + try: result = subprocess.run( ["gh", "mrva", "status", "--json", "--session", session_name], - capture_output=True, text=True, check=True + capture_output=True, + text=True, + check=True, ) - + + # Save output to file (only if not in dry-run to avoid creating directories) + if not self.dry_run: + status_file.write_text(result.stdout) + status_data = json.loads(result.stdout) if status_data and len(status_data) > 0: return status_data[0] return None - + except subprocess.CalledProcessError: return None except json.JSONDecodeError: return None - - def update_session_statuses(self): - """Update the status of all active sessions.""" - active_sessions = [s for s in self.sessions.values() - if s.status 
in ["submitted", "in_progress"]] - - if not active_sessions: - return - - print(f"\nChecking status of {len(active_sessions)} active sessions...") - - for session in active_sessions: - session_details = self.get_session_details(session.session_name) - if session_details: - old_status = session.status - new_status = session_details.get("status", "unknown") - - # Update session status - session.status = new_status - - # Extract and store run statistics - runs = session_details.get("runs", []) - if runs: - run_stats = { - "total": len(runs), - "succeeded": len([r for r in runs if r.get("status") == "succeeded"]), - "failed": len([r for r in runs if r.get("status") == "failed"]), - "in_progress": len([r for r in runs if r.get("status") == "in_progress"]), - "pending": len([r for r in runs if r.get("status") == "pending"]) - } - session.run_stats = run_stats - - # Check if session is completed - if new_status == "completed" and old_status != "completed": - session.completed_at = datetime.utcnow() - print(f" ✓ Session completed: {session.session_name}") - if session.run_stats: - print(f" Runs: {session.run_stats['succeeded']}/{session.run_stats['total']} succeeded, {session.run_stats['failed']} failed") - elif new_status == "failed" and old_status != "failed": - session.error_message = "Session failed" - print(f" ✗ Session failed: {session.session_name}") - elif new_status != old_status: - print(f" Status update: {session.session_name} -> {new_status}") - - # For in_progress sessions, show run details - if new_status == "in_progress" and session.run_stats: - print(f" {session.session_name}: {session.run_stats['succeeded']}/{session.run_stats['total']} runs completed, {session.run_stats['in_progress']} in progress") - + + def check_downloads_exist(self, session_name: str) -> Tuple[bool, List[str]]: + """Check if downloads already exist for a session.""" + session_dir = self.sessions_dir / session_name + + if not session_dir.exists(): + return False, [] + + # Look for SARIF files + sarif_files = list(session_dir.glob("*.sarif")) + return len(sarif_files) > 0, [str(f) for f in sarif_files] + + def submit_session(self, pipeline: MRVAPipeline) -> bool: + """Submit a new MRVA session.""" + try: + cmd = [ + "gh", + "mrva", + "submit", + "--language", + self.language, + "--session", + pipeline.session_name, + "--list", + self.repository_list, + "--query", + pipeline.query_path, + ] + + if self.dry_run: + # In dry run, just mark as submitted without printing per-session details + pipeline.state = PipelineState.SUBMITTED + pipeline.submitted_at = datetime.utcnow() + return True + + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + + pipeline.state = PipelineState.SUBMITTED + pipeline.submitted_at = datetime.utcnow() + return True + + except subprocess.CalledProcessError as e: + pipeline.state = PipelineState.FAILED + pipeline.error_message = f"Submit failed: {e.stderr if e.stderr else str(e)}" + return False + + def monitor_session(self, pipeline: MRVAPipeline) -> bool: + """Monitor a session until it completes.""" + pipeline.state = PipelineState.MONITORING + + # In dry-run, just check status once and return without waiting + if self.dry_run: + session_status = self.get_session_status(pipeline.session_name) + + if not session_status: + # Session doesn't exist yet, would need to be submitted + pipeline.state = PipelineState.NOT_STARTED + pipeline.error_message = "Session does not exist (would need to be submitted)" + return False + + status = session_status.get("status", 
"").lower() + runs = session_status.get("runs", []) + + pipeline.run_count = len(runs) + pipeline.succeeded_runs = len([r for r in runs if r.get("status") == "succeeded"]) + pipeline.failed_runs = len([r for r in runs if r.get("status") == "failed"]) + in_progress = len([r for r in runs if r.get("status") in ["pending", "in_progress"]]) + + if status in ["completed", "succeeded"] and in_progress == 0: + pipeline.state = PipelineState.COMPLETED + pipeline.completed_at = datetime.utcnow() + return True + elif status == "failed": + pipeline.state = PipelineState.FAILED + pipeline.error_message = f"MRVA session failed (status: {status})" + return False + else: + # Still in progress + pipeline.state = PipelineState.SUBMITTED + pipeline.error_message = ( + f"Session still in progress (status: {status}, runs: {pipeline.run_count})" + ) + return False + + # Non-dry-run mode: actually monitor until completion + while True: + session_status = self.get_session_status(pipeline.session_name) + + if not session_status: + time.sleep(self.check_interval) + continue + + status = session_status.get("status", "").lower() + runs = session_status.get("runs", []) + + # Count run statuses + pipeline.run_count = len(runs) + pipeline.succeeded_runs = len([r for r in runs if r.get("status") == "succeeded"]) + pipeline.failed_runs = len([r for r in runs if r.get("status") == "failed"]) + in_progress = len([r for r in runs if r.get("status") in ["pending", "in_progress"]]) + + if status in ["completed", "succeeded"] and in_progress == 0: + pipeline.state = PipelineState.COMPLETED + pipeline.completed_at = datetime.utcnow() + return True + elif status == "failed": + pipeline.state = PipelineState.FAILED + pipeline.error_message = "Session failed" + return False + + time.sleep(self.check_interval) + + def download_session(self, pipeline: MRVAPipeline) -> bool: + """Download results for a completed session.""" + pipeline.state = PipelineState.DOWNLOADING + + session_dir = self.sessions_dir / pipeline.session_name + + # Only create directory in non-dry-run mode if not self.dry_run: - self._save_state() - - def download_completed_sessions(self): - """Download results for completed sessions.""" - completed_sessions = [s for s in self.sessions.values() - if s.status == "completed" and not s.output_dir] - - if not completed_sessions: - return - - print(f"\n{'[DRY RUN] ' if self.dry_run else ''}Downloading results for {len(completed_sessions)} completed sessions...") - - for session in completed_sessions: - output_dir = self.output_base_dir / "sessions" / session.session_name - - try: - # Mark as download in progress - session.status = "download_in_progress" - if not self.dry_run: + session_dir.mkdir(parents=True, exist_ok=True) + + # In dry-run, just mark as would-be-downloaded + if self.dry_run: + pipeline.state = PipelineState.DOWNLOADED + pipeline.downloaded_at = datetime.utcnow() + return True + + try: + # Download all results for the session at once + cmd = [ + "gh", + "mrva", + "download", + "--session", + pipeline.session_name, + "--output-dir", + str(session_dir), + ] + + subprocess.run(cmd, capture_output=True, text=True, check=True) + + pipeline.state = PipelineState.DOWNLOADED + pipeline.downloaded_at = datetime.utcnow() + return True + + except subprocess.CalledProcessError as e: + pipeline.state = PipelineState.FAILED + pipeline.error_message = f"Download failed: {e.stderr if e.stderr else str(e)}" + return False + + def copy_results(self, pipeline: MRVAPipeline) -> bool: + """Copy downloaded SARIF files to 
results directory and update sarif-files.txt.""" + pipeline.state = PipelineState.COPYING + + session_dir = self.sessions_dir / pipeline.session_name + + # In dry-run, skip the actual file operations + if self.dry_run: + pipeline.state = PipelineState.FINISHED + pipeline.finished_at = datetime.utcnow() + return True + + if not session_dir.exists(): + pipeline.state = PipelineState.FAILED + pipeline.error_message = "Session directory does not exist" + return False + + # Find all SARIF files in session directory + sarif_files = list(session_dir.glob("*.sarif")) + + if not sarif_files: + # No SARIF files to copy (could be zero results) + pipeline.state = PipelineState.FINISHED + pipeline.finished_at = datetime.utcnow() + return True + + try: + copied_files = [] + + for sarif_file in sarif_files: + # Parse filename to extract org/repo info + # Expected format: __.sarif + filename = sarif_file.name + parts = filename.replace(".sarif", "").split("_") + + if len(parts) >= 3: + org = parts[0] + repo = "_".join(parts[1:-1]) # Handle repo names with underscores + + # Create destination directory + dest_dir = self.results_dir / org / repo + dest_dir.mkdir(parents=True, exist_ok=True) + + # Copy file + dest_file = dest_dir / filename + import shutil + + shutil.copy2(sarif_file, dest_file) + + # Record relative path for sarif-files.txt + relative_path = f"{org}/{repo}/{filename}" + copied_files.append(relative_path) + + # Append to sarif-files.txt + if copied_files: + with open(self.sarif_list_file, "a") as f: + for relative_path in copied_files: + f.write(f"{relative_path}\n") + + pipeline.state = PipelineState.FINISHED + pipeline.finished_at = datetime.utcnow() + return True + + except Exception as e: + pipeline.state = PipelineState.FAILED + pipeline.error_message = f"Copy failed: {str(e)}" + return False + + def process_pipeline(self, pipeline: MRVAPipeline) -> bool: + """Process a single pipeline through all stages.""" + try: + # Submit + if pipeline.state == PipelineState.NOT_STARTED: + if not self.submit_session(pipeline): + return False + with self.pipelines_lock: self._save_state() - - cmd = [ - "gh", "mrva", "download", - "--session", session.session_name, - "--output-dir", str(output_dir) - ] - - if self.dry_run: - print(f" Command: {' '.join(cmd)}") - session.output_dir = output_dir - session.status = "downloaded" - session.downloaded_at = datetime.utcnow() - print(f" ✓ [DRY RUN] Would download results: {session.session_name} -> {output_dir}") - continue - - print(f" Downloading: {session.session_name}...") - result = subprocess.run(cmd, capture_output=True, text=True, check=True) - - session.output_dir = output_dir - session.status = "downloaded" - session.downloaded_at = datetime.utcnow() - print(f" ✓ Downloaded results: {session.session_name} -> {output_dir}") - - except subprocess.CalledProcessError as e: - print(f" ✗ Failed to download {session.session_name}: {e}") - print(f" STDERR: {e.stderr}") - session.status = "completed" # Revert to completed so we can retry - session.error_message = f"Download failed: {e}" - - if not self.dry_run: + + # Monitor + if pipeline.state in [PipelineState.SUBMITTED, PipelineState.MONITORING]: + if not self.monitor_session(pipeline): + return False + with self.pipelines_lock: + self._save_state() + + # Download + if pipeline.state == PipelineState.COMPLETED: + if not self.download_session(pipeline): + return False + with self.pipelines_lock: + self._save_state() + + # Copy results + if pipeline.state == PipelineState.DOWNLOADED: + if not 
self.copy_results(pipeline): + return False + with self.pipelines_lock: + self._save_state() + + return pipeline.state == PipelineState.FINISHED + + except Exception as e: + pipeline.state = PipelineState.FAILED + pipeline.error_message = f"Pipeline error: {str(e)}" + with self.pipelines_lock: + self._save_state() + return False + + def restore_existing_pipelines(self, query_paths: List[str]): + """Restore pipelines from existing MRVA sessions.""" + print("\nChecking for existing MRVA sessions...") + existing_sessions = self.get_existing_sessions() + + if existing_sessions: + print( + f"Found {len(existing_sessions)} existing sessions with prefix '{self.session_prefix}'" + ) + + restored_count = 0 + + for query_path in query_paths: + session_name = self._generate_session_name(query_path) + + # Skip if already in state + if session_name in self.pipelines: + continue + + # Check if session exists in MRVA + if session_name in existing_sessions: + session_data = existing_sessions[session_name] + + # Create pipeline + pipeline = MRVAPipeline( + query_path=query_path, session_name=session_name, language=self.language + ) + + # Determine state based on session status and downloads + session_status = self.get_session_status(session_name) + downloads_exist, sarif_files = self.check_downloads_exist(session_name) + + if session_status: + status = session_status.get("status", "").lower() + runs = session_status.get("runs", []) + + pipeline.run_count = len(runs) + pipeline.succeeded_runs = len( + [r for r in runs if r.get("status") == "succeeded"] + ) + pipeline.failed_runs = len([r for r in runs if r.get("status") == "failed"]) + + if status in ["completed", "succeeded"]: + if downloads_exist: + # Check if results have been copied + # For simplicity, assume if downloaded, we need to copy + pipeline.state = PipelineState.DOWNLOADED + pipeline.submitted_at = datetime.utcnow() # Unknown actual time + pipeline.completed_at = datetime.utcnow() + pipeline.downloaded_at = datetime.utcnow() + else: + pipeline.state = PipelineState.COMPLETED + pipeline.submitted_at = datetime.utcnow() + pipeline.completed_at = datetime.utcnow() + elif status == "failed": + pipeline.state = PipelineState.FAILED + pipeline.error_message = "Session previously failed" + else: + # In progress + pipeline.state = PipelineState.SUBMITTED + pipeline.submitted_at = datetime.utcnow() + + self.pipelines[session_name] = pipeline + restored_count += 1 + + if restored_count > 0: + print(f"✓ Restored {restored_count} pipelines from existing sessions") self._save_state() - + def get_status_summary(self) -> Dict[str, int]: - """Get a summary of session statuses.""" + """Get summary of pipeline states.""" summary = {} - for session in self.sessions.values(): - summary[session.status] = summary.get(session.status, 0) + 1 + for pipeline in self.pipelines.values(): + state = pipeline.state.value + summary[state] = summary.get(state, 0) + 1 return summary - - def print_detailed_progress(self, remaining_queries: List[str]): - """Print detailed progress information about all sessions.""" + + def print_progress(self, total_queries: int): + """Print detailed progress information.""" summary = self.get_status_summary() - - # Calculate various counts - not_started = len(remaining_queries) - submitted = summary.get("submitted", 0) - in_progress = summary.get("in_progress", 0) - completed_not_downloaded = summary.get("completed", 0) - download_in_progress = summary.get("download_in_progress", 0) - downloaded = summary.get("downloaded", 0) - failed = 
summary.get("failed", 0) - - total_queries = len(self.sessions) + not_started - total_processed = submitted + in_progress + completed_not_downloaded + download_in_progress + downloaded + failed - + + not_started = total_queries - len(self.pipelines) + submitting = summary.get(PipelineState.SUBMITTING.value, 0) + submitted = summary.get(PipelineState.SUBMITTED.value, 0) + monitoring = summary.get(PipelineState.MONITORING.value, 0) + completed = summary.get(PipelineState.COMPLETED.value, 0) + downloading = summary.get(PipelineState.DOWNLOADING.value, 0) + downloaded = summary.get(PipelineState.DOWNLOADED.value, 0) + copying = summary.get(PipelineState.COPYING.value, 0) + finished = summary.get(PipelineState.FINISHED.value, 0) + failed = summary.get(PipelineState.FAILED.value, 0) + print("\n" + "=" * 70) print(f"PROGRESS REPORT - {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}") print("=" * 70) print(f"Total queries in suite: {total_queries}") - print(f" Not yet submitted: {not_started}") - print(f" Submitted (pending): {submitted}") - print(f" MRVA runs in progress: {in_progress}") - print(f" MRVA completed (not downloaded):{completed_not_downloaded}") - print(f" Download in progress: {download_in_progress}") - print(f" Downloaded (complete): {downloaded}") + print(f" Not yet started: {not_started}") + print(f" Submitting: {submitting}") + print(f" Submitted (monitoring): {submitted + monitoring}") + print(f" Completed (awaiting download): {completed}") + print(f" Downloading: {downloading}") + print(f" Downloaded (copying results): {downloaded + copying}") + print(f" Finished: {finished}") print(f" Failed/Errored: {failed}") print("-" * 70) - print(f"Total processed: {total_processed}/{total_queries}") - print(f"Completion rate: {(downloaded / total_queries * 100):.1f}%") - - # Show detailed run statistics for in-progress sessions - in_progress_sessions = [s for s in self.sessions.values() if s.status == "in_progress"] - if in_progress_sessions: - print("\nIn-Progress Sessions:") - for session in in_progress_sessions[:10]: # Show first 10 - if session.run_stats: - stats = session.run_stats - print(f" {session.session_name}:") - print(f" Runs: {stats['succeeded']}/{stats['total']} succeeded, " - f"{stats['in_progress']} in progress, {stats['failed']} failed") - if len(in_progress_sessions) > 10: - print(f" ... and {len(in_progress_sessions) - 10} more in-progress sessions") - - # Show recently completed sessions - completed_sessions = [s for s in self.sessions.values() - if s.status == "completed" and s.completed_at] - if completed_sessions: - recent = sorted(completed_sessions, key=lambda s: s.completed_at, reverse=True)[:5] - print("\nRecently Completed (awaiting download):") - for session in recent: - elapsed = (datetime.utcnow() - session.completed_at).total_seconds() / 60 - print(f" {session.session_name} (completed {elapsed:.1f} minutes ago)") - - # Show failed sessions - failed_sessions = [s for s in self.sessions.values() if s.status == "failed"] - if failed_sessions: - print("\nFailed Sessions:") - for session in failed_sessions[:5]: # Show first 5 - print(f" {session.session_name}") - if session.error_message: - print(f" Error: {session.error_message}") - if len(failed_sessions) > 5: - print(f" ... 
and {len(failed_sessions) - 5} more failed sessions") - + print( + f"Completion rate: {(finished / total_queries * 100) if total_queries > 0 else 0:.1f}%" + ) print("=" * 70) print() - - def run_until_complete(self, check_interval: int = 300) -> bool: - """Run the MRVA manager until all sessions are complete.""" - # Resolve queries and create sessions + + def run_until_complete(self) -> bool: + """Run all pipelines until complete using concurrent processing.""" + # Resolve queries query_paths = self.resolve_queries() - - print(f"{'[DRY RUN] ' if self.dry_run else ''}Planning to submit {len(query_paths)} MRVA sessions...") + total_queries = len(query_paths) + + print( + f"\n{'[DRY RUN] ' if self.dry_run else ''}Planning to process {total_queries} queries..." + ) print(f"Session prefix: {self.session_prefix}") - print(f"Max concurrent sessions: {self.max_concurrent}") - print(f"Check interval: {check_interval} seconds") + print(f"Max concurrent pipelines: {self.max_concurrent}") + print(f"Monitor interval: {self.check_interval} seconds") print() - - if self.dry_run: - print("DRY RUN MODE: Commands will be printed but not executed") - print() - - # Submit initial batch of sessions - print(f"Submitting initial batch of up to {self.max_concurrent} sessions...") - submitted_count = 0 + + # Restore existing pipelines + self.restore_existing_pipelines(query_paths) + + # Create pipelines for new queries for query_path in query_paths: - if not self._can_submit_new_session(): - break - - if self.submit_session(query_path): - submitted_count += 1 - if not self.dry_run: - time.sleep(1) # Brief pause between submissions - - print(f"\nInitial submission complete: {submitted_count} sessions submitted") - print(f"Remaining to submit when capacity available: {len(query_paths) - submitted_count}") - print() - - if self.dry_run: - print("\nDRY RUN: Stopping here. 
In real execution, the script would continue monitoring sessions.") + session_name = self._generate_session_name(query_path) + if session_name not in self.pipelines: + pipeline = MRVAPipeline( + query_path=query_path, session_name=session_name, language=self.language + ) + self.pipelines[session_name] = pipeline + + # Get pipelines that need processing + pipelines_to_process = [ + p for p in self.pipelines.values() if p.state != PipelineState.FINISHED + ] + + if not pipelines_to_process: + print("\n✓ All pipelines already finished!") return True - - # Main monitoring loop - remaining_queries = [q for q in query_paths - if self._generate_session_name(q) not in self.sessions] - - iteration = 0 - while True: - iteration += 1 - - # Print detailed progress report - self.print_detailed_progress(remaining_queries) - - # Update status of active sessions - self.update_session_statuses() - - # Download completed sessions - self.download_completed_sessions() - - # Submit new sessions if we have capacity - while remaining_queries and self._can_submit_new_session(): - query_path = remaining_queries.pop(0) - if self.submit_session(query_path): - time.sleep(1) - - # Check if we're done - active_count = self._get_active_sessions_count() - completed_not_downloaded = len([s for s in self.sessions.values() - if s.status == "completed"]) - download_in_progress = len([s for s in self.sessions.values() - if s.status == "download_in_progress"]) - - if not remaining_queries and active_count == 0 and completed_not_downloaded == 0 and download_in_progress == 0: - print("\n✓ All sessions completed and downloaded!") - break - - # Wait before next check - print(f"Iteration {iteration} complete. Waiting {check_interval} seconds before next check...") - time.sleep(check_interval) - - # Final summary - final_summary = self.get_status_summary() - print(f"\nFinal summary: {final_summary}") - - return final_summary.get("completed", 0) > 0 + + print(f"\nProcessing {len(pipelines_to_process)} pipelines...") + + if self.dry_run: + print("\nDRY RUN: Simulating all pipeline operations (no actual API calls)...") + + # Process pipelines concurrently + completed = 0 + failed = 0 + + with ThreadPoolExecutor(max_workers=self.max_concurrent) as executor: + # Submit all pipelines + future_to_pipeline = { + executor.submit(self.process_pipeline, pipeline): pipeline + for pipeline in pipelines_to_process + } + + # Process completions + for future in as_completed(future_to_pipeline): + pipeline = future_to_pipeline[future] + try: + success = future.result() + if success: + completed += 1 + if not self.dry_run: + print( + f"✓ Finished: {pipeline.session_name} ({completed + failed}/{len(pipelines_to_process)})" + ) + else: + failed += 1 + error_msg = pipeline.error_message or "Unknown error" + print( + f"✗ Failed: {pipeline.session_name} - {error_msg} ({completed + failed}/{len(pipelines_to_process)})" + ) + except Exception as e: + failed += 1 + print( + f"✗ Exception in {pipeline.session_name}: {e} ({completed + failed}/{len(pipelines_to_process)})" + ) + + # Print progress every 10 completions (or every 50 in dry-run) + progress_interval = 50 if self.dry_run else 10 + if (completed + failed) % progress_interval == 0: + self.print_progress(total_queries) + + # Final progress report + self.print_progress(total_queries) + + print(f"\n{'=' * 70}") + print(f"Processing complete!") + print(f" Finished: {completed}") + print(f" Failed: {failed}") + print(f"{'=' * 70}") + + if not self.dry_run: + print(f"\nResults directory: {self.results_dir}") 
+ print(f"SARIF file list: {self.sarif_list_file}") + print(f"State file: {self.state_file}") + + return failed == 0 def main(): parser = argparse.ArgumentParser( - description="Run MRVA sessions for all queries in a CodeQL query suite", + description="Run MRVA sessions for all queries in a CodeQL query suite (Pipeline-based v2)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: @@ -550,94 +763,106 @@ def main(): --language cpp \\ --repository-list cpp_top_1000 - # With custom concurrency + # With local query suite file python run-gh-mrva-for-query-suite.py \\ - --query-suite ../../cpp/misra/src/codeql-suites/misra-c-default.qls \\ + --query-suite ../../cpp/misra/src/codeql-suites/misra-cpp-default.qls \\ --output-base-dir ./mrva \\ - --session-prefix t1-misra-c-default \\ - --language c \\ + --session-prefix t1-misra-cpp-default \\ + --language cpp \\ --repository-list cpp_top_1000 \\ --max-concurrent 10 - # Dry run to validate commands + # Dry run python run-gh-mrva-for-query-suite.py \\ - --query-suite ../../cpp/misra/src/codeql-suites/misra-cpp-default.qls \\ + --query-suite ../../cpp/autosar/src/codeql-suites/autosar-default.qls \\ --output-base-dir ./mrva \\ - --session-prefix t1-misra-cpp-default \\ + --session-prefix t1-autosar-cpp-default \\ --language cpp \\ --repository-list cpp_top_1000 \\ --dry-run - """ + """, ) - + parser.add_argument( - "--query-suite", required=True, - help="CodeQL query suite - either a pack reference (e.g., codeql/misra-cpp-coding-standards@2.50.0) or path to .qls file (e.g., ../../cpp/misra/src/codeql-suites/misra-cpp-default.qls)" + "--query-suite", + required=True, + help="CodeQL query suite - either a pack reference or path to .qls file", ) parser.add_argument( - "--output-base-dir", required=True, type=Path, - help="Base directory for output files and session state" + "--output-base-dir", + required=True, + type=Path, + help="Base directory for output files and session state", ) parser.add_argument( - "--session-prefix", required=True, - help="Prefix for MRVA session names (e.g., t1-misra-cpp-default)" + "--session-prefix", + required=True, + help="Prefix for MRVA session names (e.g., t1-misra-cpp-default)", ) parser.add_argument( - "--language", required=True, choices=["cpp", "c", "java", "javascript", "python", "go", "csharp"], - help="Programming language for analysis" + "--language", + required=True, + choices=["cpp", "c", "java", "javascript", "python", "go", "csharp"], + help="Programming language for analysis", ) parser.add_argument( - "--repository-list", required=True, - help="Repository list for MRVA (e.g., cpp_top_1000)" + "--repository-list", required=True, help="Repository list for MRVA (e.g., cpp_top_1000)" ) parser.add_argument( - "--max-concurrent", type=int, default=20, - help="Maximum number of concurrent MRVA sessions (default: 20)" + "--max-concurrent", + type=int, + default=20, + help="Maximum number of concurrent pipeline workers (default: 20)", ) parser.add_argument( - "--check-interval", type=int, default=300, - help="Interval in seconds between status checks (default: 300)" + "--check-interval", + type=int, + default=300, + help="Interval in seconds between status checks (default: 300)", ) parser.add_argument( - "--dry-run", action="store_true", - help="Print commands that would be executed without actually running them" + "--dry-run", + action="store_true", + help="Print commands that would be executed without actually running them", ) - + args = parser.parse_args() - - print("MRVA Query Suite 
Runner") - print("=" * 50) + + print("MRVA Query Suite Runner (Pipeline-based v2)") + print("=" * 70) + print(f"Run started: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}") print(f"Query suite: {args.query_suite}") print(f"Output directory: {args.output_base_dir}") print(f"Session prefix: {args.session_prefix}") print(f"Language: {args.language}") print(f"Repository list: {args.repository_list}") print(f"Max concurrent: {args.max_concurrent}") + print(f"Monitor interval: {args.check_interval} seconds") if args.dry_run: - print("DRY RUN MODE: Commands will be printed but not executed") + print("DRY RUN MODE: No submissions or downloads will occur") print() - + # Validate dependencies if not args.dry_run: try: subprocess.run(["gh", "mrva", "--help"], capture_output=True, check=True) print("✓ GitHub CLI with mrva extension is available") except (subprocess.CalledProcessError, FileNotFoundError): - print("✗ GitHub CLI with mrva extension is required") + print("✗ Error: GitHub CLI with mrva extension not found") sys.exit(1) - + try: subprocess.run(["codeql", "--version"], capture_output=True, check=True) print("✓ CodeQL CLI is available") except (subprocess.CalledProcessError, FileNotFoundError): - print("✗ CodeQL CLI is required") + print("✗ Error: CodeQL CLI not found") sys.exit(1) else: print("✓ [DRY RUN] Skipping dependency validation") - + print() - - # Create and run MRVA manager + + # Create and run manager manager = MRVAManager( query_suite=args.query_suite, output_base_dir=args.output_base_dir, @@ -645,21 +870,14 @@ def main(): language=args.language, repository_list=args.repository_list, max_concurrent=args.max_concurrent, - dry_run=args.dry_run + check_interval=args.check_interval, + dry_run=args.dry_run, ) - + try: - success = manager.run_until_complete(check_interval=args.check_interval) - - if success: - print(f"\n✓ MRVA sessions completed successfully") - print(f"Results available in: {args.output_base_dir}") - print(f"State file: {manager.state_file}") - sys.exit(0) - else: - print(f"\n✗ No sessions completed successfully") - sys.exit(1) - + success = manager.run_until_complete() + sys.exit(0 if success else 1) + except KeyboardInterrupt: print(f"\n\nInterrupted by user") print(f"Session state saved to: {manager.state_file}") diff --git a/scripts/es-sarif/sarif-infer-versionControlProvenance.md b/scripts/es-sarif/sarif-infer-versionControlProvenance.md new file mode 100644 index 000000000..95fdb8828 --- /dev/null +++ b/scripts/es-sarif/sarif-infer-versionControlProvenance.md @@ -0,0 +1,43 @@ +# sarif-infer-versionControlProvenance.py + +Pre-indexing script that infers `versionControlProvenance.repositoryUri` from SARIF filenames. + +## Usage + +```bash +python sarif-infer-versionControlProvenance.py +``` + +## Purpose + +Ensures SARIF files have `versionControlProvenance.repositoryUri` set before indexing into Elasticsearch. This is required for the indexing script to properly enrich results with repository information. + +## File Naming Convention + +Files must follow the pattern: `-__.sarif` + +Examples: + +- `c-cert_nasa_fprime.sarif` → `https://github.com/nasa/fprime` +- `cpp-misra_bitcoin_bitcoin.sarif` → `https://github.com/bitcoin/bitcoin` +- `c-misra_zeromq_libzmq.sarif` → `https://github.com/zeromq/libzmq` + +Files with exactly 2 underscores are processed; others are skipped with an error. 
+ +## Behavior + +- **Modifies files in-place** - consider backing up first +- Skips files that already have `repositoryUri` set +- Only infers `repositoryUri` (not `revisionId` or `branch`) +- Adds to first run in each SARIF file + +## Example + +```bash +# Process all files in the list +python sarif-infer-versionControlProvenance.py mrva/sarif-files_results-1.txt + +# Output: +# [OK] c-cert_nasa_fprime.sarif: Added repositoryUri: https://github.com/nasa/fprime +# [SKIP] cpp-cert_nasa_fprime.sarif: Already has repositoryUri (skipped) +``` diff --git a/scripts/es-sarif/sarif-infer-versionControlProvenance.py b/scripts/es-sarif/sarif-infer-versionControlProvenance.py new file mode 100755 index 000000000..3b28121fb --- /dev/null +++ b/scripts/es-sarif/sarif-infer-versionControlProvenance.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python3 +""" +SARIF Version Control Provenance Inference Script + +This script processes SARIF files and infers the versionControlProvenance.repositoryUri +field for each run based on the file naming convention: -__.sarif + +The script reads a list of SARIF file paths from a text file (similar to the indexing script) +and modifies each SARIF file in-place to add the inferred versionControlProvenance information. + +Usage: + python sarif-infer-versionControlProvenance.py + +File Naming Convention: + Files should follow the pattern: -__.sarif + Examples: + - c-cert_nasa_fprime.sarif -> https://github.com/nasa/fprime + - cpp-misra_bitcoin_bitcoin.sarif -> https://github.com/bitcoin/bitcoin + - c-misra_curl_curl.sarif -> https://github.com/curl/curl + +Notes: + - This is a pre-indexing step to ensure versionControlProvenance is present + - Only repositoryUri is inferred; revisionId and branch cannot be derived from filename + - Files are modified in-place; consider backing up before running + - Files already containing versionControlProvenance are skipped unless they lack repositoryUri +""" + +import sys +import os +import json +import re +from pathlib import Path + + +def parse_filename_for_repo_info(filename): + """ + Parse a SARIF filename to extract organization and repository information. + + Expected filename pattern: -__.sarif + Files conforming to our naming scheme have exactly 2 underscores. + + Examples: + c-cert_nasa_fprime.sarif -> ('nasa', 'fprime') + cpp-misra_bitcoin_bitcoin.sarif -> ('bitcoin', 'bitcoin') + c-misra_curl_curl.sarif -> ('curl', 'curl') + + Args: + filename: The SARIF filename (without path) + + Returns: + Tuple of (org, repo) if successful, None otherwise + """ + # Remove .sarif extension + name_without_ext = filename + if name_without_ext.endswith('.sarif'): + name_without_ext = name_without_ext[:-6] + + # Count underscores - should be exactly 2 for conforming files + underscore_count = name_without_ext.count('_') + if underscore_count != 2: + return None + + # Split by underscore: __ + # The value between underscores is org, value after last underscore is repo + first_underscore = name_without_ext.index('_') + last_underscore = name_without_ext.rindex('_') + + org = name_without_ext[first_underscore + 1:last_underscore] + repo = name_without_ext[last_underscore + 1:] + + if org and repo: + return (org, repo) + + return None + + +def infer_repository_uri(org, repo): + """ + Construct a GitHub repository URI from organization and repository names. 
+ + Args: + org: GitHub organization name + repo: GitHub repository name + + Returns: + Full GitHub repository URI + """ + return f"https://github.com/{org}/{repo}" + + +def process_sarif_file(sarif_path): + """ + Process a single SARIF file to add inferred versionControlProvenance. + + Args: + sarif_path: Path to the SARIF file + + Returns: + Tuple of (success: bool, message: str) + """ + try: + # Read the SARIF file + with open(sarif_path, 'r', encoding='utf-8') as f: + sarif_data = json.load(f) + + # Extract filename for parsing + filename = Path(sarif_path).name + + # Parse filename to get org and repo + repo_info = parse_filename_for_repo_info(filename) + if not repo_info: + return (False, f"Could not parse repository info from filename: {filename}") + + org, repo = repo_info + repository_uri = infer_repository_uri(org, repo) + + # Track whether we modified anything + modified = False + + # Process each run in the SARIF file + if 'runs' not in sarif_data or not isinstance(sarif_data['runs'], list): + return (False, "SARIF file does not contain valid 'runs' array") + + for run_index, run in enumerate(sarif_data['runs']): + # Check if versionControlProvenance already exists + if 'versionControlProvenance' not in run: + # Create new versionControlProvenance array + run['versionControlProvenance'] = [ + { + "repositoryUri": repository_uri + } + ] + modified = True + else: + # Check if it's an array + if not isinstance(run['versionControlProvenance'], list): + return (False, f"Run {run_index} has invalid versionControlProvenance (not an array)") + + # Check if any entry already has repositoryUri set + has_repository_uri = False + for vcp in run['versionControlProvenance']: + if 'repositoryUri' in vcp and vcp['repositoryUri']: + has_repository_uri = True + break + + # If no repositoryUri found, add one + if not has_repository_uri: + if len(run['versionControlProvenance']) == 0: + # Empty array, add new entry + run['versionControlProvenance'].append({ + "repositoryUri": repository_uri + }) + else: + # Array has entries but no repositoryUri, add to first entry + run['versionControlProvenance'][0]['repositoryUri'] = repository_uri + modified = True + + # Write back to file if modified + if modified: + with open(sarif_path, 'w', encoding='utf-8') as f: + json.dump(sarif_data, f, indent=2) + return (True, f"Added repositoryUri: {repository_uri}") + else: + return (True, f"Already has repositoryUri (skipped)") + + except json.JSONDecodeError as e: + return (False, f"JSON decode error: {e}") + except Exception as e: + return (False, f"Error processing file: {e}") + + +def read_sarif_files_list(list_file_path): + """ + Read the list of SARIF files from the specified text file. 
+ + Args: + list_file_path: Path to the text file containing SARIF file paths + + Returns: + List of absolute paths to SARIF files + """ + list_file = Path(list_file_path) + if not list_file.exists(): + print(f"Error: File list '{list_file_path}' does not exist.") + return [] + + base_dir = list_file.parent + sarif_files = [] + + with open(list_file, 'r') as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + + # Skip empty lines and comments + if not line or line.startswith('#'): + continue + + # Resolve relative paths + if not os.path.isabs(line): + sarif_path = base_dir / line + else: + sarif_path = Path(line) + + # Check if file exists + if not sarif_path.exists(): + print(f"Warning: File not found (line {line_num}): {line}") + continue + + # Check if it's a .sarif file + if not sarif_path.suffix == '.sarif': + print(f"Warning: Not a .sarif file (line {line_num}): {line}") + continue + + sarif_files.append(sarif_path) + + return sarif_files + + +def main(): + if len(sys.argv) != 2: + print("SARIF Version Control Provenance Inference Script") + print("=" * 50) + print() + print("Usage:") + print(f" python {sys.argv[0]} ") + print() + print("Description:") + print(" Infers and adds versionControlProvenance.repositoryUri to SARIF files") + print(" based on the filename pattern: -__.sarif") + print() + print("Example:") + print(f" python {sys.argv[0]} sarif-files_results-1.txt") + print() + print("Note:") + print(" Files are modified in-place. Consider backing up before running.") + sys.exit(1) + + sarif_files_list = sys.argv[1] + + print("SARIF Version Control Provenance Inference Script") + print("=" * 50) + print(f"SARIF files list: {sarif_files_list}") + print() + + # Read and validate SARIF files list + sarif_files = read_sarif_files_list(sarif_files_list) + if not sarif_files: + print("Error: No valid SARIF files found in the list.") + sys.exit(1) + + print(f"Found {len(sarif_files)} SARIF files to process") + print() + + # Process each file + success_count = 0 + skip_count = 0 + error_count = 0 + + for sarif_file in sarif_files: + filename = sarif_file.name + success, message = process_sarif_file(sarif_file) + + if success: + if "skipped" in message.lower(): + skip_count += 1 + print(f" [SKIP] {filename}: {message}") + else: + success_count += 1 + print(f" [OK] {filename}: {message}") + else: + error_count += 1 + print(f" [ERROR] {filename}: {message}") + + # Summary + print() + print("=" * 50) + print("Processing Summary:") + print(f" Total files: {len(sarif_files)}") + print(f" Modified: {success_count}") + print(f" Skipped: {skip_count}") + print(f" Errors: {error_count}") + print() + + if error_count > 0: + print(f"⚠️ {error_count} file(s) had errors. 
Please review the output above.") + sys.exit(1) + else: + print(f"✓ Successfully processed all files!") + sys.exit(0) + + +if __name__ == "__main__": + main() From c182f91d9744add16f6e9c6e42254b9ce0293a0d Mon Sep 17 00:00:00 2001 From: Nathan Randall Date: Wed, 29 Oct 2025 08:23:22 -0600 Subject: [PATCH 3/8] Update index-sarif-results-in-elasticsearch.py --- .../index-sarif-results-in-elasticsearch.py | 40 +++++++++++++++---- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py index c6ca7eece..157991c49 100644 --- a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py +++ b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py @@ -20,6 +20,8 @@ Environment Variables: ES_LOCAL_URL - Elasticsearch host URL (default: http://localhost:9200) ES_LOCAL_API_KEY - API key for authentication (optional, enables API key auth) + ES_LOCAL_USERNAME - Username for basic authentication (optional) + ES_LOCAL_PASSWORD - Password for basic authentication (optional) Requirements: - Python 3.11+ @@ -197,8 +199,8 @@ def replace_var(match): } -def create_elasticsearch_client(host, api_key=None): - """Create Elasticsearch client with optional API key authentication.""" +def create_elasticsearch_client(host, api_key=None, username=None, password=None): + """Create Elasticsearch client with optional API key or basic authentication.""" if api_key and api_key.strip(): return Elasticsearch( hosts=[host], @@ -206,6 +208,13 @@ def create_elasticsearch_client(host, api_key=None): verify_certs=False, # For local development ssl_show_warn=False, ) + elif username and password: + return Elasticsearch( + hosts=[host], + basic_auth=(username, password), + verify_certs=False, # For local development + ssl_show_warn=False, + ) else: return Elasticsearch(hosts=[host]) @@ -411,11 +420,11 @@ def sarif_results_generator(sarif_files, index_name): ) -def index_sarif_files(sarif_files, index_name, host, api_key=None): +def index_sarif_files(sarif_files, index_name, host, api_key=None, username=None, password=None): """ Connect to Elasticsearch and bulk index all SARIF results. 
""" - es_client = create_elasticsearch_client(host, api_key) + es_client = create_elasticsearch_client(host, api_key, username, password) # Validate connection if not validate_elasticsearch_connection(es_client, host): @@ -477,6 +486,8 @@ def main(): print("Environment Variables:") print(" ES_LOCAL_URL - Elasticsearch host URL (default: http://localhost:9200)") print(" ES_LOCAL_API_KEY - API key for authentication (optional)") + print(" ES_LOCAL_USERNAME - Username for basic authentication (optional)") + print(" ES_LOCAL_PASSWORD - Password for basic authentication (optional)") print() print("Example:") print(f" python {sys.argv[0]} sarif-files.txt sarif_results_2024") @@ -496,6 +507,8 @@ def main(): # Get configuration from environment variables elastic_host = os.getenv("ES_LOCAL_URL", DEFAULT_ELASTIC_HOST) elastic_api_key = os.getenv("ES_LOCAL_API_KEY") + elastic_username = os.getenv("ES_LOCAL_USERNAME") + elastic_password = os.getenv("ES_LOCAL_PASSWORD") # Handle variable substitution in ES_LOCAL_URL if needed if elastic_host and "${ES_LOCAL_PORT}" in elastic_host: @@ -505,13 +518,26 @@ def main(): # Treat empty string or literal "None" as None for API key if elastic_api_key == "" or elastic_api_key == "None": elastic_api_key = None - + + # Treat empty strings as None for username/password + if elastic_username == "" or elastic_username == "None": + elastic_username = None + if elastic_password == "" or elastic_password == "None": + elastic_password = None + + # Determine authentication method + auth_method = "None" + if elastic_api_key: + auth_method = "API Key" + elif elastic_username and elastic_password: + auth_method = "Basic Auth (Username/Password)" + print(f"SARIF Files Elasticsearch Indexer") print(f"==================================") print(f"SARIF files list: {sarif_files_list}") print(f"Elasticsearch index: {index_name}") print(f"Elasticsearch host: {elastic_host}") - print(f"Authentication: {'API Key' if elastic_api_key else 'None (HTTP Basic)'}") + print(f"Authentication: {auth_method}") print() # Read and validate SARIF files list @@ -521,7 +547,7 @@ def main(): sys.exit(1) # Index the files - if index_sarif_files(sarif_files, index_name, elastic_host, elastic_api_key): + if index_sarif_files(sarif_files, index_name, elastic_host, elastic_api_key, elastic_username, elastic_password): print(f"\n✓ Successfully created and populated index '{index_name}'") print(f"You can now query the index using Elasticsearch APIs or Kibana.") sys.exit(0) From 42e6b4a358ab316f247451c4bb2d09c5915ed6b5 Mon Sep 17 00:00:00 2001 From: Nathan Randall Date: Wed, 29 Oct 2025 08:52:30 -0600 Subject: [PATCH 4/8] ES indexing should only use SARIF results --- .../index-sarif-results-in-elasticsearch.py | 66 ++++++------------- 1 file changed, 21 insertions(+), 45 deletions(-) diff --git a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py index 157991c49..d7a0aa5fa 100644 --- a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py +++ b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py @@ -64,7 +64,8 @@ def replace_var(match): DEFAULT_ELASTIC_HOST = "http://localhost:9200" SARIF_VERSION = "2.1.0" -# Elasticsearch mapping optimized for SARIF result documents + # Elasticsearch mapping optimized for SARIF result documents +# Minimal mapping - only results with versionControlProvenance enrichment SARIF_MAPPING = { "mappings": { "properties": { @@ -149,37 +150,15 @@ def replace_var(match): "occurrenceCount": {"type": "integer"}, 
"rank": {"type": "float"}, "baselineState": {"type": "keyword"}, - # Run-level metadata (tool, repo info, etc.) - "run": { + # ONLY versionControlProvenance from run-level (minimal enrichment) + "versionControlProvenance": { + "type": "nested", "properties": { - "tool": { - "properties": { - "driver": { - "properties": { - "name": {"type": "keyword"}, - "organization": {"type": "keyword"}, - "product": {"type": "keyword"}, - "version": {"type": "keyword"}, - "semanticVersion": {"type": "keyword"}, - } - } - } - }, - "automationDetails": { - "properties": { - "id": {"type": "keyword"}, - "guid": {"type": "keyword"}, - "correlationGuid": {"type": "keyword"}, - } - }, - "versionControlProvenance": { - "type": "nested", - "properties": { - "repositoryUri": {"type": "keyword"}, - "revisionId": {"type": "keyword"}, - }, - }, - } + "repositoryUri": {"type": "keyword"}, + "revisionId": {"type": "keyword"}, + "branch": {"type": "keyword"}, + "revisionTag": {"type": "keyword"}, + }, }, # Metadata for tracking source SARIF file "_sarif_source": { @@ -197,8 +176,6 @@ def replace_var(match): "analysis": {"analyzer": {"sarif_text": {"type": "standard", "stopwords": "_none_"}}}, }, } - - def create_elasticsearch_client(host, api_key=None, username=None, password=None): """Create Elasticsearch client with optional API key or basic authentication.""" if api_key and api_key.strip(): @@ -327,10 +304,12 @@ def sarif_results_generator(sarif_files, index_name): 2. Extracts each result from runs[].results[] 3. Creates a separate Elasticsearch document per result 4. Adds derived fields (ruleGroup, ruleLanguage) from ruleId parsing - 5. Includes run-level metadata and source file tracking + 5. ONLY enriches with versionControlProvenance from run (minimal overhead) + 6. Adds source file tracking metadata - This approach allows for granular querying of individual code scanning findings - rather than treating entire SARIF files as single documents. + This approach keeps document sizes minimal by ONLY indexing the result objects + themselves plus minimal enrichment data, avoiding the overhead of tool info, + automation details, and other run-level data. """ from datetime import datetime @@ -366,15 +345,11 @@ def sarif_results_generator(sarif_files, index_name): file_results_count += len(results) - # Extract run-level metadata - run_metadata = { - "tool": run.get("tool", {}), - "automationDetails": run.get("automationDetails", {}), - "versionControlProvenance": run.get("versionControlProvenance", []), - } + # Extract ONLY versionControlProvenance from run (minimal enrichment) + version_control_provenance = run.get("versionControlProvenance", []) for result_index, result in enumerate(results): - # Create a document that includes both the result and metadata + # Create a document that includes ONLY the result fields document = dict(result) # Copy all result fields # Add derived fields from ruleId parsing @@ -386,8 +361,9 @@ def sarif_results_generator(sarif_files, index_name): if rule_language: document["ruleLanguage"] = rule_language - # Add run-level metadata - document["run"] = run_metadata + # Add ONLY versionControlProvenance (not tool, automationDetails, etc.) 
+ if version_control_provenance: + document["versionControlProvenance"] = version_control_provenance # Add source file metadata document["_sarif_source"] = { From 1018933b7a6a1401fdcfabce955e9ea8f46c2413 Mon Sep 17 00:00:00 2001 From: Nathan Randall Date: Wed, 29 Oct 2025 08:57:36 -0600 Subject: [PATCH 5/8] Improve logging in scripts/es-sarif/index-sarif-* --- .../index-sarif-results-in-elasticsearch.py | 54 +++++++++++++------ 1 file changed, 39 insertions(+), 15 deletions(-) diff --git a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py index d7a0aa5fa..94d39d76f 100644 --- a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py +++ b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py @@ -11,9 +11,12 @@ results into a single Elasticsearch index. Each result document includes: - All original SARIF result fields (ruleId, message, locations, etc.) - Derived fields (ruleGroup, ruleLanguage) parsed from ruleId -- Run-level metadata (tool info, version control provenance) +- ONLY versionControlProvenance from run (minimal enrichment) - Source file tracking metadata +This approach keeps documents minimal by indexing ONLY the result objects to avoid +Elasticsearch size limits. Tool info and automation details are NOT included. + Usage: python index-sarif-results-in-elasticsearch.py @@ -398,7 +401,7 @@ def sarif_results_generator(sarif_files, index_name): def index_sarif_files(sarif_files, index_name, host, api_key=None, username=None, password=None): """ - Connect to Elasticsearch and bulk index all SARIF results. + Connect to Elasticsearch and bulk index all SARIF results with progress logging. """ es_client = create_elasticsearch_client(host, api_key, username, password) @@ -415,37 +418,58 @@ def index_sarif_files(sarif_files, index_name, host, api_key=None, username=None return False print(f"Indexing results from {len(sarif_files)} SARIF files...") + print() try: - # Use bulk helper to index all documents - success_count, failed_docs = helpers.bulk( + # Track progress during bulk indexing + documents_indexed = 0 + last_progress_update = 0 + progress_interval = 100 # Update every 100 documents + + def progress_callback(success, info): + """Callback to track progress during bulk indexing.""" + nonlocal documents_indexed, last_progress_update + documents_indexed += 1 + + # Print progress updates periodically + if documents_indexed - last_progress_update >= progress_interval: + print(f" → Indexed {documents_indexed} documents so far...") + last_progress_update = documents_indexed + + if not success: + print(f" ✗ Failed to index document: {info}") + + # Use bulk helper to index all documents with progress tracking + print("Starting bulk indexing...") + for success, info in helpers.streaming_bulk( es_client, sarif_results_generator(sarif_files, index_name), chunk_size=500, request_timeout=60, - ) + raise_on_error=False, + ): + progress_callback(success, info) + print(f" → Indexed {documents_indexed} documents (final)") + print() print("-" * 50) print(f"✓ Bulk indexing complete") - print(f"✓ Successfully indexed: {success_count} documents") - print(f"✗ Failed to index: {len(failed_docs)} documents") - - if failed_docs: - print("\nFailed documents:") - for doc in failed_docs[:5]: # Show first 5 failures - print(f" - {doc}") - if len(failed_docs) > 5: - print(f" ... 
and {len(failed_docs) - 5} more") + print(f"✓ Total documents indexed: {documents_indexed}") - # Get final index stats + # Get final index stats to verify stats = es_client.indices.stats(index=index_name) doc_count = stats["indices"][index_name]["total"]["docs"]["count"] print(f"✓ Final document count in index: {doc_count}") + + if doc_count != documents_indexed: + print(f"⚠ Warning: Document count mismatch (indexed: {documents_indexed}, in index: {doc_count})") return True except Exception as e: print(f"Error during bulk indexing: {e}") + import traceback + traceback.print_exc() return False From ec7b5f68ba1b9999fb83087d146642d13769a110 Mon Sep 17 00:00:00 2001 From: Nathan Randall Date: Wed, 29 Oct 2025 09:43:53 -0600 Subject: [PATCH 6/8] Add delay to es bulk indexing --- .../index-sarif-results-in-elasticsearch.py | 35 +++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py index 94d39d76f..c8e09887e 100644 --- a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py +++ b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py @@ -25,6 +25,7 @@ ES_LOCAL_API_KEY - API key for authentication (optional, enables API key auth) ES_LOCAL_USERNAME - Username for basic authentication (optional) ES_LOCAL_PASSWORD - Password for basic authentication (optional) + ES_BULK_DELAY - Delay in seconds between bulk indexing chunks (default: 1) Requirements: - Python 3.11+ @@ -35,6 +36,7 @@ import sys import json import os +import time from pathlib import Path from elasticsearch import Elasticsearch, helpers from elasticsearch.exceptions import ConnectionError, RequestError @@ -399,9 +401,18 @@ def sarif_results_generator(sarif_files, index_name): ) -def index_sarif_files(sarif_files, index_name, host, api_key=None, username=None, password=None): +def index_sarif_files(sarif_files, index_name, host, api_key=None, username=None, password=None, bulk_delay=1): """ Connect to Elasticsearch and bulk index all SARIF results with progress logging. 
+ + Args: + sarif_files: List of SARIF file paths to index + index_name: Name of the Elasticsearch index to create + host: Elasticsearch host URL + api_key: Optional API key for authentication + username: Optional username for basic auth + password: Optional password for basic auth + bulk_delay: Delay in seconds between bulk indexing chunks (default: 1) """ es_client = create_elasticsearch_client(host, api_key, username, password) @@ -418,6 +429,8 @@ def index_sarif_files(sarif_files, index_name, host, api_key=None, username=None return False print(f"Indexing results from {len(sarif_files)} SARIF files...") + if bulk_delay > 0: + print(f"Bulk delay: {bulk_delay} second(s) between chunks") print() try: @@ -425,10 +438,11 @@ def index_sarif_files(sarif_files, index_name, host, api_key=None, username=None documents_indexed = 0 last_progress_update = 0 progress_interval = 100 # Update every 100 documents + chunks_processed = 0 def progress_callback(success, info): """Callback to track progress during bulk indexing.""" - nonlocal documents_indexed, last_progress_update + nonlocal documents_indexed, last_progress_update, chunks_processed documents_indexed += 1 # Print progress updates periodically @@ -449,12 +463,22 @@ def progress_callback(success, info): raise_on_error=False, ): progress_callback(success, info) + + # Check if we just completed a chunk and should sleep + # streaming_bulk yields one result per document, so we track chunks + if documents_indexed > 0 and documents_indexed % 500 == 0: + chunks_processed += 1 + if bulk_delay > 0: + print(f" → Sleeping {bulk_delay}s after chunk {chunks_processed}...") + time.sleep(bulk_delay) print(f" → Indexed {documents_indexed} documents (final)") print() print("-" * 50) print(f"✓ Bulk indexing complete") print(f"✓ Total documents indexed: {documents_indexed}") + if chunks_processed > 0: + print(f"✓ Total chunks processed: {chunks_processed}") # Get final index stats to verify stats = es_client.indices.stats(index=index_name) @@ -488,11 +512,13 @@ def main(): print(" ES_LOCAL_API_KEY - API key for authentication (optional)") print(" ES_LOCAL_USERNAME - Username for basic authentication (optional)") print(" ES_LOCAL_PASSWORD - Password for basic authentication (optional)") + print(" ES_BULK_DELAY - Delay in seconds between bulk chunks (default: 1)") print() print("Example:") print(f" python {sys.argv[0]} sarif-files.txt sarif_results_2024") print(" ES_LOCAL_URL=https://my-cluster.elastic.co:9243 \\") print(" ES_LOCAL_API_KEY=your_api_key \\") + print(" ES_BULK_DELAY=1 \\") print(f" python {sys.argv[0]} sarif-files.txt sarif_results_2024") sys.exit(1) @@ -509,6 +535,7 @@ def main(): elastic_api_key = os.getenv("ES_LOCAL_API_KEY") elastic_username = os.getenv("ES_LOCAL_USERNAME") elastic_password = os.getenv("ES_LOCAL_PASSWORD") + bulk_delay = float(os.getenv("ES_BULK_DELAY", "1")) # Handle variable substitution in ES_LOCAL_URL if needed if elastic_host and "${ES_LOCAL_PORT}" in elastic_host: @@ -538,6 +565,8 @@ def main(): print(f"Elasticsearch index: {index_name}") print(f"Elasticsearch host: {elastic_host}") print(f"Authentication: {auth_method}") + if bulk_delay > 0: + print(f"Bulk delay: {bulk_delay} second(s) between chunks") print() # Read and validate SARIF files list @@ -547,7 +576,7 @@ def main(): sys.exit(1) # Index the files - if index_sarif_files(sarif_files, index_name, elastic_host, elastic_api_key, elastic_username, elastic_password): + if index_sarif_files(sarif_files, index_name, elastic_host, elastic_api_key, elastic_username, 
elastic_password, bulk_delay): print(f"\n✓ Successfully created and populated index '{index_name}'") print(f"You can now query the index using Elasticsearch APIs or Kibana.") sys.exit(0) From bb72b34f843aa725200d0ed291e0f92a101236e0 Mon Sep 17 00:00:00 2001 From: Nathan Randall Date: Wed, 29 Oct 2025 11:19:54 -0600 Subject: [PATCH 7/8] Update index-sarif-results-in-elasticsearch.py --- .../index-sarif-results-in-elasticsearch.py | 72 ++++++++++++++++++- 1 file changed, 69 insertions(+), 3 deletions(-) diff --git a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py index c8e09887e..caf13921f 100644 --- a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py +++ b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py @@ -11,9 +11,14 @@ results into a single Elasticsearch index. Each result document includes: - All original SARIF result fields (ruleId, message, locations, etc.) - Derived fields (ruleGroup, ruleLanguage) parsed from ruleId -- ONLY versionControlProvenance from run (minimal enrichment) +- versionControlProvenance from run, OR derived from filename pattern - Source file tracking metadata +Repository URI Derivation from Filename: +If versionControlProvenance is missing or lacks repositoryUri, it will be derived +from SARIF filenames matching: [-_]_[_].sarif +Example: "cpp-misra_nasa_fprime_18795.sarif" -> "https://github.com/nasa/fprime" + This approach keeps documents minimal by indexing ONLY the result objects to avoid Elasticsearch size limits. Tool info and automation details are NOT included. @@ -69,10 +74,11 @@ def replace_var(match): DEFAULT_ELASTIC_HOST = "http://localhost:9200" SARIF_VERSION = "2.1.0" - # Elasticsearch mapping optimized for SARIF result documents +# Elasticsearch mapping optimized for SARIF result documents # Minimal mapping - only results with versionControlProvenance enrichment SARIF_MAPPING = { "mappings": { + "dynamic": True, # Allow dynamic field mapping for any unmapped fields "properties": { # Core SARIF result fields "ruleId": {"type": "keyword"}, @@ -300,6 +306,45 @@ def parse_rule_id(rule_id): return rule_group, rule_language +def parse_repository_uri_from_filename(filename): + """ + Parse repository URI from SARIF filename following the pattern: + [-_]_[_].sarif + + Examples: + - "nasa_fprime_18795.sarif" -> "https://github.com/nasa/fprime" + - "cpp-misra_nasa_fprime_18795.sarif" -> "https://github.com/nasa/fprime" + - "tmux_tmux.sarif" -> "https://github.com/tmux/tmux" + + Returns: + str or None: The repository URI if parsing succeeds, None otherwise + """ + # Remove .sarif extension + name = filename.replace('.sarif', '') + + # Split by underscore + parts = name.split('_') + + # Need at least org_repo (2 parts) + if len(parts) < 2: + return None + + # Check if first part contains a hyphen (lang-framework pattern) + if '-' in parts[0]: + # Pattern: lang-framework_org_repo[_id] + # Skip the lang-framework prefix + if len(parts) < 3: + return None + org = parts[1] + repo = parts[2] + else: + # Pattern: org_repo[_id] + org = parts[0] + repo = parts[1] + + return f"https://github.com/{org}/{repo}" + + def sarif_results_generator(sarif_files, index_name): """ Generator that yields Elasticsearch bulk actions for individual SARIF results. @@ -309,9 +354,14 @@ def sarif_results_generator(sarif_files, index_name): 2. Extracts each result from runs[].results[] 3. Creates a separate Elasticsearch document per result 4. 
Adds derived fields (ruleGroup, ruleLanguage) from ruleId parsing - 5. ONLY enriches with versionControlProvenance from run (minimal overhead) + 5. Enriches with versionControlProvenance from run, or derives repositoryUri from filename 6. Adds source file tracking metadata + Filename Pattern for Repository URI Derivation: + - [-_]_[_].sarif + - Examples: "nasa_fprime_18795.sarif" -> "https://github.com/nasa/fprime" + - Examples: "cpp-misra_tmux_tmux.sarif" -> "https://github.com/tmux/tmux" + This approach keeps document sizes minimal by ONLY indexing the result objects themselves plus minimal enrichment data, avoiding the overhead of tool info, automation details, and other run-level data. @@ -324,6 +374,11 @@ def sarif_results_generator(sarif_files, index_name): for sarif_file in sarif_files: print(f"Processing {sarif_file.name}...") + + # Parse repository URI from filename + repo_uri_from_filename = parse_repository_uri_from_filename(sarif_file.name) + if repo_uri_from_filename: + print(f" → Derived repository URI: {repo_uri_from_filename}") try: with open(sarif_file, "r", encoding="utf-8") as f: @@ -352,6 +407,17 @@ def sarif_results_generator(sarif_files, index_name): # Extract ONLY versionControlProvenance from run (minimal enrichment) version_control_provenance = run.get("versionControlProvenance", []) + + # If no versionControlProvenance in run, create from filename + if not version_control_provenance and repo_uri_from_filename: + version_control_provenance = [{ + "repositoryUri": repo_uri_from_filename + }] + # If versionControlProvenance exists but missing repositoryUri, add it from filename + elif version_control_provenance and repo_uri_from_filename: + # Check if repositoryUri is missing from the first entry + if not version_control_provenance[0].get("repositoryUri"): + version_control_provenance[0]["repositoryUri"] = repo_uri_from_filename for result_index, result in enumerate(results): # Create a document that includes ONLY the result fields From 25763c503ec6a44fbffb1b05051a1bbe97b2c530 Mon Sep 17 00:00:00 2001 From: Nathan Randall Date: Wed, 29 Oct 2025 11:29:00 -0600 Subject: [PATCH 8/8] Another update to fix repositoryUri sarif --- scripts/es-sarif/index-sarif-results-in-elasticsearch.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py index caf13921f..39d97f05d 100644 --- a/scripts/es-sarif/index-sarif-results-in-elasticsearch.py +++ b/scripts/es-sarif/index-sarif-results-in-elasticsearch.py @@ -12,6 +12,7 @@ - All original SARIF result fields (ruleId, message, locations, etc.) - Derived fields (ruleGroup, ruleLanguage) parsed from ruleId - versionControlProvenance from run, OR derived from filename pattern +- repositoryUri (flattened from versionControlProvenance for easier querying) - Source file tracking metadata Repository URI Derivation from Filename: @@ -171,6 +172,8 @@ def replace_var(match): "revisionTag": {"type": "keyword"}, }, }, + # Flattened repositoryUri for easier querying (extracted from versionControlProvenance) + "repositoryUri": {"type": "keyword"}, # Metadata for tracking source SARIF file "_sarif_source": { "properties": { @@ -435,6 +438,9 @@ def sarif_results_generator(sarif_files, index_name): # Add ONLY versionControlProvenance (not tool, automationDetails, etc.) 
                     if version_control_provenance:
                         document["versionControlProvenance"] = version_control_provenance
+                        # Also add flattened repositoryUri for easier querying
+                        if version_control_provenance[0].get("repositoryUri"):
+                            document["repositoryUri"] = version_control_provenance[0]["repositoryUri"]
 
                     # Add source file metadata
                     document["_sarif_source"] = {