Skip to content

Commit 62442b5

Browse files
added batch aggregator operations
1 parent cfb290a commit 62442b5

File tree

2 files changed

+131
-24
lines changed

2 files changed

+131
-24
lines changed

internal/routing/aggregator.go

Lines changed: 122 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ type ResponseAggregator interface {
2525
// AddWithKey processes a single shard response for a specific key (used by keyed aggregators).
2626
AddWithKey(key string, result interface{}, err error) error
2727

28+
BatchAdd(map[string]interface{}, error) error
29+
2830
// Result returns the final aggregated result and any error.
2931
Result() (interface{}, error)
3032
}
@@ -93,6 +95,14 @@ func (a *AllSucceededAggregator) Add(result interface{}, err error) error {
9395
return nil
9496
}
9597

98+
func (a *AllSucceededAggregator) BatchAdd(results map[string]interface{}, err error) error {
99+
for _, res := range results {
100+
a.Add(res, err)
101+
}
102+
103+
return nil
104+
}
105+
96106
func (a *AllSucceededAggregator) Result() (interface{}, error) {
97107
var err error
98108
res, e := a.res.Load(), a.err.Load()
@@ -127,6 +137,14 @@ func (a *OneSucceededAggregator) Add(result interface{}, err error) error {
127137
return nil
128138
}
129139

140+
func (a *OneSucceededAggregator) BatchAdd(results map[string]interface{}, err error) error {
141+
for _, res := range results {
142+
a.Add(res, err)
143+
}
144+
145+
return nil
146+
}
147+
130148
func (a *OneSucceededAggregator) AddWithKey(key string, result interface{}, err error) error {
131149
return a.Add(result, err)
132150
}
@@ -162,6 +180,14 @@ func (a *AggSumAggregator) Add(result interface{}, err error) error {
162180
return nil
163181
}
164182

183+
func (a *AggSumAggregator) BatchAdd(results map[string]interface{}, err error) error {
184+
for _, res := range results {
185+
a.Add(res, err)
186+
}
187+
188+
return nil
189+
}
190+
165191
func (a *AggSumAggregator) AddWithKey(key string, result interface{}, err error) error {
166192
return a.Add(result, err)
167193
}
@@ -198,6 +224,14 @@ func (a *AggMinAggregator) Add(result interface{}, err error) error {
198224
return nil
199225
}
200226

227+
func (a *AggMinAggregator) BatchAdd(results map[string]interface{}, err error) error {
228+
for _, res := range results {
229+
a.Add(res, err)
230+
}
231+
232+
return nil
233+
}
234+
201235
func (a *AggMinAggregator) AddWithKey(key string, result interface{}, err error) error {
202236
return a.Add(result, err)
203237
}
@@ -238,6 +272,14 @@ func (a *AggMaxAggregator) Add(result interface{}, err error) error {
238272
return nil
239273
}
240274

275+
func (a *AggMaxAggregator) BatchAdd(results map[string]interface{}, err error) error {
276+
for _, res := range results {
277+
a.Add(res, err)
278+
}
279+
280+
return nil
281+
}
282+
241283
func (a *AggMaxAggregator) AddWithKey(key string, result interface{}, err error) error {
242284
return a.Add(result, err)
243285
}
@@ -285,6 +327,14 @@ func (a *AggLogicalAndAggregator) Add(result interface{}, err error) error {
285327
return nil
286328
}
287329

330+
func (a *AggLogicalAndAggregator) BatchAdd(results map[string]interface{}, err error) error {
331+
for _, res := range results {
332+
a.Add(res, err)
333+
}
334+
335+
return nil
336+
}
337+
288338
func (a *AggLogicalAndAggregator) AddWithKey(key string, result interface{}, err error) error {
289339
return a.Add(result, err)
290340
}
@@ -331,6 +381,14 @@ func (a *AggLogicalOrAggregator) Add(result interface{}, err error) error {
331381
return nil
332382
}
333383

384+
func (a *AggLogicalOrAggregator) BatchAdd(results map[string]interface{}, err error) error {
385+
for _, res := range results {
386+
a.Add(res, err)
387+
}
388+
389+
return nil
390+
}
391+
334392
func (a *AggLogicalOrAggregator) AddWithKey(key string, result interface{}, err error) error {
335393
return a.Add(result, err)
336394
}
@@ -391,10 +449,7 @@ type DefaultKeylessAggregator struct {
391449
firstErr error
392450
}
393451

394-
func (a *DefaultKeylessAggregator) Add(result interface{}, err error) error {
395-
a.mu.Lock()
396-
defer a.mu.Unlock()
397-
452+
func (a *DefaultKeylessAggregator) add(result interface{}, err error) error {
398453
if err != nil && a.firstErr == nil {
399454
a.firstErr = err
400455
return nil
@@ -405,6 +460,21 @@ func (a *DefaultKeylessAggregator) Add(result interface{}, err error) error {
405460
return nil
406461
}
407462

463+
func (a *DefaultKeylessAggregator) Add(result interface{}, err error) error {
464+
a.mu.Lock()
465+
defer a.mu.Unlock()
466+
467+
return a.add(result, err)
468+
}
469+
470+
func (a *DefaultKeylessAggregator) BatchAdd(results map[string]interface{}, err error) error {
471+
for _, res := range results {
472+
a.add(res, err)
473+
}
474+
475+
return nil
476+
}
477+
408478
func (a *DefaultKeylessAggregator) AddWithKey(key string, result interface{}, err error) error {
409479
return a.Add(result, err)
410480
}
@@ -434,10 +504,7 @@ func NewDefaultKeyedAggregator(keyOrder []string) *DefaultKeyedAggregator {
434504
}
435505
}
436506

437-
func (a *DefaultKeyedAggregator) Add(result interface{}, err error) error {
438-
a.mu.Lock()
439-
defer a.mu.Unlock()
440-
507+
func (a *DefaultKeyedAggregator) add(result interface{}, err error) error {
441508
if err != nil && a.firstErr == nil {
442509
a.firstErr = err
443510
return nil
@@ -449,10 +516,22 @@ func (a *DefaultKeyedAggregator) Add(result interface{}, err error) error {
449516
return nil
450517
}
451518

452-
func (a *DefaultKeyedAggregator) AddWithKey(key string, result interface{}, err error) error {
519+
func (a *DefaultKeyedAggregator) Add(result interface{}, err error) error {
453520
a.mu.Lock()
454521
defer a.mu.Unlock()
455522

523+
return a.add(result, err)
524+
}
525+
526+
func (a *DefaultKeyedAggregator) BatchAdd(results map[string]interface{}, err error) error {
527+
for _, res := range results {
528+
a.add(res, err)
529+
}
530+
531+
return nil
532+
}
533+
534+
func (a *DefaultKeyedAggregator) addWithKey(key string, result interface{}, err error) error {
456535
if err != nil && a.firstErr == nil {
457536
a.firstErr = err
458537
return nil
@@ -463,6 +542,26 @@ func (a *DefaultKeyedAggregator) AddWithKey(key string, result interface{}, err
463542
return nil
464543
}
465544

545+
func (a *DefaultKeyedAggregator) AddWithKey(key string, result interface{}, err error) error {
546+
a.mu.Lock()
547+
defer a.mu.Unlock()
548+
549+
a.addWithKey(key, result, err)
550+
return nil
551+
}
552+
553+
func (a *DefaultKeyedAggregator) BatchAddWithKeyOrder(results map[string]interface{}, keyOrder []string) error {
554+
a.mu.Lock()
555+
defer a.mu.Unlock()
556+
557+
a.keyOrder = keyOrder
558+
for key, val := range results {
559+
_ = a.addWithKey(key, val, nil)
560+
}
561+
562+
return nil
563+
}
564+
466565
func (a *DefaultKeyedAggregator) SetKeyOrder(keyOrder []string) {
467566
a.mu.Lock()
468567
defer a.mu.Unlock()
@@ -504,12 +603,24 @@ type SpecialAggregator struct {
504603
errors []error
505604
}
506605

606+
func (a *SpecialAggregator) add(result interface{}, err error) error {
607+
a.results = append(a.results, result)
608+
a.errors = append(a.errors, err)
609+
return nil
610+
}
611+
507612
func (a *SpecialAggregator) Add(result interface{}, err error) error {
508613
a.mu.Lock()
509614
defer a.mu.Unlock()
510615

511-
a.results = append(a.results, result)
512-
a.errors = append(a.errors, err)
616+
return a.add(result, err)
617+
}
618+
619+
func (a *SpecialAggregator) BatchAdd(results map[string]interface{}, err error) error {
620+
for _, res := range results {
621+
a.add(res, err)
622+
}
623+
513624
return nil
514625
}
515626

osscluster_router.go

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -393,21 +393,17 @@ func (c *ClusterClient) aggregateKeyedValues(cmd Cmder, keyedResults map[string]
393393
aggregator := c.createAggregator(policy, cmd, true)
394394

395395
// Set key order for keyed aggregators
396-
if keyedAgg, ok := aggregator.(*routing.DefaultKeyedAggregator); ok {
397-
keyedAgg.SetKeyOrder(keyOrder)
396+
var keyedAgg *routing.DefaultKeyedAggregator
397+
var isKeyedAgg bool
398+
var err error
399+
if keyedAgg, isKeyedAgg = aggregator.(*routing.DefaultKeyedAggregator); isKeyedAgg {
400+
err = keyedAgg.BatchAddWithKeyOrder(keyedResults, keyOrder)
401+
} else {
402+
err = aggregator.BatchAdd(keyedResults, nil)
398403
}
399404

400-
// Add results with keys
401-
for key, value := range keyedResults {
402-
if keyedAgg, ok := aggregator.(*routing.DefaultKeyedAggregator); ok {
403-
if err := keyedAgg.AddWithKey(key, value, nil); err != nil {
404-
return err
405-
}
406-
} else {
407-
if err := aggregator.Add(value, nil); err != nil {
408-
return err
409-
}
410-
}
405+
if err != nil {
406+
return err
411407
}
412408

413409
return c.finishAggregation(cmd, aggregator)

0 commit comments

Comments
 (0)