Skip to content

Commit ec7b5f6

Browse files
committed
Add delay to es bulk indexing
1 parent 1018933 commit ec7b5f6

File tree

1 file changed

+32
-3
lines changed

1 file changed

+32
-3
lines changed

scripts/es-sarif/index-sarif-results-in-elasticsearch.py

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
ES_LOCAL_API_KEY - API key for authentication (optional, enables API key auth)
2626
ES_LOCAL_USERNAME - Username for basic authentication (optional)
2727
ES_LOCAL_PASSWORD - Password for basic authentication (optional)
28+
ES_BULK_DELAY - Delay in seconds between bulk indexing chunks (default: 1)
2829
2930
Requirements:
3031
- Python 3.11+
@@ -35,6 +36,7 @@
3536
import sys
3637
import json
3738
import os
39+
import time
3840
from pathlib import Path
3941
from elasticsearch import Elasticsearch, helpers
4042
from elasticsearch.exceptions import ConnectionError, RequestError
@@ -399,9 +401,18 @@ def sarif_results_generator(sarif_files, index_name):
399401
)
400402

401403

402-
def index_sarif_files(sarif_files, index_name, host, api_key=None, username=None, password=None):
404+
def index_sarif_files(sarif_files, index_name, host, api_key=None, username=None, password=None, bulk_delay=1):
403405
"""
404406
Connect to Elasticsearch and bulk index all SARIF results with progress logging.
407+
408+
Args:
409+
sarif_files: List of SARIF file paths to index
410+
index_name: Name of the Elasticsearch index to create
411+
host: Elasticsearch host URL
412+
api_key: Optional API key for authentication
413+
username: Optional username for basic auth
414+
password: Optional password for basic auth
415+
bulk_delay: Delay in seconds between bulk indexing chunks (default: 1)
405416
"""
406417
es_client = create_elasticsearch_client(host, api_key, username, password)
407418

@@ -418,17 +429,20 @@ def index_sarif_files(sarif_files, index_name, host, api_key=None, username=None
418429
return False
419430

420431
print(f"Indexing results from {len(sarif_files)} SARIF files...")
432+
if bulk_delay > 0:
433+
print(f"Bulk delay: {bulk_delay} second(s) between chunks")
421434
print()
422435

423436
try:
424437
# Track progress during bulk indexing
425438
documents_indexed = 0
426439
last_progress_update = 0
427440
progress_interval = 100 # Update every 100 documents
441+
chunks_processed = 0
428442

429443
def progress_callback(success, info):
430444
"""Callback to track progress during bulk indexing."""
431-
nonlocal documents_indexed, last_progress_update
445+
nonlocal documents_indexed, last_progress_update, chunks_processed
432446
documents_indexed += 1
433447

434448
# Print progress updates periodically
@@ -449,12 +463,22 @@ def progress_callback(success, info):
449463
raise_on_error=False,
450464
):
451465
progress_callback(success, info)
466+
467+
# Check if we just completed a chunk and should sleep
468+
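# streaming_bulk yields one result per document, so chunk boundaries are inferred from the running document count (assumes the streaming_bulk call uses chunk_size=500 — TODO confirm)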
469+
if documents_indexed > 0 and documents_indexed % 500 == 0:
470+
chunks_processed += 1
471+
if bulk_delay > 0:
472+
print(f" → Sleeping {bulk_delay}s after chunk {chunks_processed}...")
473+
time.sleep(bulk_delay)
452474

453475
print(f" → Indexed {documents_indexed} documents (final)")
454476
print()
455477
print("-" * 50)
456478
print(f"✓ Bulk indexing complete")
457479
print(f"✓ Total documents indexed: {documents_indexed}")
480+
if chunks_processed > 0:
481+
print(f"✓ Total chunks processed: {chunks_processed}")
458482

459483
# Get final index stats to verify
460484
stats = es_client.indices.stats(index=index_name)
@@ -488,11 +512,13 @@ def main():
488512
print(" ES_LOCAL_API_KEY - API key for authentication (optional)")
489513
print(" ES_LOCAL_USERNAME - Username for basic authentication (optional)")
490514
print(" ES_LOCAL_PASSWORD - Password for basic authentication (optional)")
515+
print(" ES_BULK_DELAY - Delay in seconds between bulk chunks (default: 1)")
491516
print()
492517
print("Example:")
493518
print(f" python {sys.argv[0]} sarif-files.txt sarif_results_2024")
494519
print(" ES_LOCAL_URL=https://my-cluster.elastic.co:9243 \\")
495520
print(" ES_LOCAL_API_KEY=your_api_key \\")
521+
print(" ES_BULK_DELAY=1 \\")
496522
print(f" python {sys.argv[0]} sarif-files.txt sarif_results_2024")
497523
sys.exit(1)
498524

@@ -509,6 +535,7 @@ def main():
509535
elastic_api_key = os.getenv("ES_LOCAL_API_KEY")
510536
elastic_username = os.getenv("ES_LOCAL_USERNAME")
511537
elastic_password = os.getenv("ES_LOCAL_PASSWORD")
538+
bulk_delay = float(os.getenv("ES_BULK_DELAY", "1"))
512539

513540
# Handle variable substitution in ES_LOCAL_URL if needed
514541
if elastic_host and "${ES_LOCAL_PORT}" in elastic_host:
@@ -538,6 +565,8 @@ def main():
538565
print(f"Elasticsearch index: {index_name}")
539566
print(f"Elasticsearch host: {elastic_host}")
540567
print(f"Authentication: {auth_method}")
568+
if bulk_delay > 0:
569+
print(f"Bulk delay: {bulk_delay} second(s) between chunks")
541570
print()
542571

543572
# Read and validate SARIF files list
@@ -547,7 +576,7 @@ def main():
547576
sys.exit(1)
548577

549578
# Index the files
550-
if index_sarif_files(sarif_files, index_name, elastic_host, elastic_api_key, elastic_username, elastic_password):
579+
if index_sarif_files(sarif_files, index_name, elastic_host, elastic_api_key, elastic_username, elastic_password, bulk_delay):
551580
print(f"\n✓ Successfully created and populated index '{index_name}'")
552581
print(f"You can now query the index using Elasticsearch APIs or Kibana.")
553582
sys.exit(0)

0 commit comments

Comments
 (0)