@@ -3,6 +3,9 @@ package redis
33import (
44 "context"
55 "fmt"
6+ "reflect"
7+ "sync"
8+ "sync/atomic"
69 "testing"
710 "time"
811
@@ -107,6 +110,7 @@ func TestRingSetAddrsAndRebalanceRace(t *testing.T) {
107110 }
108111 },
109112 })
113+ defer ring .Close ()
110114
111115 // Continuously update addresses by adding and removing one address
112116 updatesDone := make (chan struct {})
@@ -156,13 +160,127 @@ func BenchmarkRingShardingRebalanceLocked(b *testing.B) {
156160 }
157161
158162 ring := NewRing (opts )
163+ defer ring .Close ()
159164
160165 b .ResetTimer ()
161166 for i := 0 ; i < b .N ; i ++ {
162167 ring .sharding .rebalanceLocked ()
163168 }
164169}
165170
171+ type testCounter struct {
172+ mu sync.Mutex
173+ t * testing.T
174+ m map [string ]int
175+ }
176+
177+ func newTestCounter (t * testing.T ) * testCounter {
178+ return & testCounter {t : t , m : make (map [string ]int )}
179+ }
180+
181+ func (ct * testCounter ) increment (key string ) {
182+ ct .mu .Lock ()
183+ defer ct .mu .Unlock ()
184+ ct .m [key ]++
185+ }
186+
187+ func (ct * testCounter ) expect (values map [string ]int ) {
188+ ct .mu .Lock ()
189+ defer ct .mu .Unlock ()
190+ ct .t .Helper ()
191+ if ! reflect .DeepEqual (values , ct .m ) {
192+ ct .t .Errorf ("expected %v != actual %v" , values , ct .m )
193+ }
194+ }
195+
196+ func TestRingShardsCleanup (t * testing.T ) {
197+ const (
198+ ringShard1Name = "ringShardOne"
199+ ringShard2Name = "ringShardTwo"
200+
201+ ringShard1Addr = "shard1.test"
202+ ringShard2Addr = "shard2.test"
203+ )
204+
205+ t .Run ("closes unused shards" , func (t * testing.T ) {
206+ closeCounter := newTestCounter (t )
207+
208+ ring := NewRing (& RingOptions {
209+ Addrs : map [string ]string {
210+ ringShard1Name : ringShard1Addr ,
211+ ringShard2Name : ringShard2Addr ,
212+ },
213+ NewClient : func (opt * Options ) * Client {
214+ c := NewClient (opt )
215+ c .baseClient .onClose = func () error {
216+ closeCounter .increment (opt .Addr )
217+ return nil
218+ }
219+ return c
220+ },
221+ })
222+ closeCounter .expect (map [string ]int {})
223+
224+ // no change due to the same addresses
225+ ring .SetAddrs (map [string ]string {
226+ ringShard1Name : ringShard1Addr ,
227+ ringShard2Name : ringShard2Addr ,
228+ })
229+ closeCounter .expect (map [string ]int {})
230+
231+ ring .SetAddrs (map [string ]string {
232+ ringShard1Name : ringShard1Addr ,
233+ })
234+ closeCounter .expect (map [string ]int {ringShard2Addr : 1 })
235+
236+ ring .SetAddrs (map [string ]string {
237+ ringShard2Name : ringShard2Addr ,
238+ })
239+ closeCounter .expect (map [string ]int {ringShard1Addr : 1 , ringShard2Addr : 1 })
240+
241+ ring .Close ()
242+ closeCounter .expect (map [string ]int {ringShard1Addr : 1 , ringShard2Addr : 2 })
243+ })
244+
245+ t .Run ("closes created shards if ring was closed" , func (t * testing.T ) {
246+ createCounter := newTestCounter (t )
247+ closeCounter := newTestCounter (t )
248+
249+ var (
250+ ring * Ring
251+ shouldClose int32
252+ )
253+
254+ ring = NewRing (& RingOptions {
255+ Addrs : map [string ]string {
256+ ringShard1Name : ringShard1Addr ,
257+ },
258+ NewClient : func (opt * Options ) * Client {
259+ if atomic .LoadInt32 (& shouldClose ) != 0 {
260+ ring .Close ()
261+ }
262+ createCounter .increment (opt .Addr )
263+ c := NewClient (opt )
264+ c .baseClient .onClose = func () error {
265+ closeCounter .increment (opt .Addr )
266+ return nil
267+ }
268+ return c
269+ },
270+ })
271+ createCounter .expect (map [string ]int {ringShard1Addr : 1 })
272+ closeCounter .expect (map [string ]int {})
273+
274+ atomic .StoreInt32 (& shouldClose , 1 )
275+
276+ ring .SetAddrs (map [string ]string {
277+ ringShard2Name : ringShard2Addr ,
278+ })
279+ createCounter .expect (map [string ]int {ringShard1Addr : 1 , ringShard2Addr : 1 })
280+ closeCounter .expect (map [string ]int {ringShard1Addr : 1 , ringShard2Addr : 1 })
281+ })
282+ }
283+
166284//------------------------------------------------------------------------------
167285
168286type timeoutErr struct {
0 commit comments