From 70026ab03405ec4a88050ff7b9ce5324ec247a58 Mon Sep 17 00:00:00 2001 From: Stephanie Wei Date: Wed, 15 Oct 2025 14:51:01 -0400 Subject: [PATCH] Add queue size multiplier config to BulkIndexer --- esutil/bulk_indexer.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/esutil/bulk_indexer.go b/esutil/bulk_indexer.go index 1d758b1c9b..c1e9663064 100644 --- a/esutil/bulk_indexer.go +++ b/esutil/bulk_indexer.go @@ -54,9 +54,10 @@ type BulkIndexer interface { // BulkIndexerConfig represents configuration of the indexer. type BulkIndexerConfig struct { - NumWorkers int // The number of workers. Defaults to runtime.NumCPU(). - FlushBytes int // The flush threshold in bytes. Defaults to 5MB. - FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec. + NumWorkers int // The number of workers. Defaults to runtime.NumCPU(). + FlushBytes int // The flush threshold in bytes. Defaults to 5MB. + FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec. + QueueSizeMultiplier int // The multiplier on the size of the worker queue. Defaults to 1. Client esapi.Transport // The Elasticsearch client. Decoder BulkResponseJSONDecoder // A custom JSON decoder. @@ -301,6 +302,10 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) { cfg.FlushInterval = 30 * time.Second } + if cfg.QueueSizeMultiplier == 0 { + cfg.QueueSizeMultiplier = 1 + } + bi := bulkIndexer{ config: cfg, stats: &bulkIndexerStats{}, @@ -371,7 +376,7 @@ func (bi *bulkIndexer) Stats() BulkIndexerStats { // init initializes the bulk indexer. func (bi *bulkIndexer) init() { - bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers) + bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers*bi.config.QueueSizeMultiplier) for i := 1; i <= bi.config.NumWorkers; i++ { bi.wg.Add(1)