@@ -10,6 +10,7 @@ import (
1010 "strconv"
1111 "strings"
1212 "sync"
13+ "sync/atomic"
1314 "time"
1415
1516 . "github.com/bsm/ginkgo/v2"
@@ -644,6 +645,87 @@ var _ = Describe("ClusterClient", func() {
644645 }, 30 * time .Second ).ShouldNot (HaveOccurred ())
645646 })
646647
648+ It ("supports PubSub with ReadOnly option" , func () {
649+ opt = redisClusterOptions ()
650+ opt .ReadOnly = true
651+ client = cluster .newClusterClient (ctx , opt )
652+
653+ pubsub := client .Subscribe (ctx , "mychannel" )
654+ defer pubsub .Close ()
655+
656+ Eventually (func () error {
657+ var masterPubsubChannels atomic.Int64
658+ var slavePubsubChannels atomic.Int64
659+
660+ err := client .ForEachMaster (ctx , func (ctx context.Context , master * redis.Client ) error {
661+ info := master .InfoMap (ctx , "stats" )
662+ if info .Err () != nil {
663+ return info .Err ()
664+ }
665+
666+ pc , err := strconv .Atoi (info .Item ("Stats" , "pubsub_channels" ))
667+ if err != nil {
668+ return err
669+ }
670+
671+ masterPubsubChannels .Add (int64 (pc ))
672+
673+ return nil
674+ })
675+ if err != nil {
676+ return err
677+ }
678+
679+ err = client .ForEachSlave (ctx , func (ctx context.Context , slave * redis.Client ) error {
680+ info := slave .InfoMap (ctx , "stats" )
681+ if info .Err () != nil {
682+ return info .Err ()
683+ }
684+
685+ pc , err := strconv .Atoi (info .Item ("Stats" , "pubsub_channels" ))
686+ if err != nil {
687+ return err
688+ }
689+
690+ slavePubsubChannels .Add (int64 (pc ))
691+
692+ return nil
693+ })
694+ if err != nil {
695+ return err
696+ }
697+
698+ if c := masterPubsubChannels .Load (); c != int64 (0 ) {
699+ return fmt .Errorf ("total master pubsub_channels is %d; expected 0" , c )
700+ }
701+
702+ if c := slavePubsubChannels .Load (); c != int64 (1 ) {
703+ return fmt .Errorf ("total slave pubsub_channels is %d; expected 1" , c )
704+ }
705+
706+ return nil
707+ }, 30 * time .Second ).ShouldNot (HaveOccurred ())
708+
709+ Eventually (func () error {
710+ _ , err := client .Publish (ctx , "mychannel" , "hello" ).Result ()
711+ if err != nil {
712+ return err
713+ }
714+
715+ msg , err := pubsub .ReceiveTimeout (ctx , time .Second )
716+ if err != nil {
717+ return err
718+ }
719+
720+ _ , ok := msg .(* redis.Message )
721+ if ! ok {
722+ return fmt .Errorf ("got %T, wanted *redis.Message" , msg )
723+ }
724+
725+ return nil
726+ }, 30 * time .Second ).ShouldNot (HaveOccurred ())
727+ })
728+
647729 It ("supports sharded PubSub" , func () {
648730 pubsub := client .SSubscribe (ctx , "mychannel" )
649731 defer pubsub .Close ()
@@ -668,6 +750,87 @@ var _ = Describe("ClusterClient", func() {
668750 }, 30 * time .Second ).ShouldNot (HaveOccurred ())
669751 })
670752
753+ It ("supports sharded PubSub with ReadOnly option" , func () {
754+ opt = redisClusterOptions ()
755+ opt .ReadOnly = true
756+ client = cluster .newClusterClient (ctx , opt )
757+
758+ pubsub := client .SSubscribe (ctx , "mychannel" )
759+ defer pubsub .Close ()
760+
761+ Eventually (func () error {
762+ var masterPubsubShardChannels atomic.Int64
763+ var slavePubsubShardChannels atomic.Int64
764+
765+ err := client .ForEachMaster (ctx , func (ctx context.Context , master * redis.Client ) error {
766+ info := master .InfoMap (ctx , "stats" )
767+ if info .Err () != nil {
768+ return info .Err ()
769+ }
770+
771+ pc , err := strconv .Atoi (info .Item ("Stats" , "pubsubshard_channels" ))
772+ if err != nil {
773+ return err
774+ }
775+
776+ masterPubsubShardChannels .Add (int64 (pc ))
777+
778+ return nil
779+ })
780+ if err != nil {
781+ return err
782+ }
783+
784+ err = client .ForEachSlave (ctx , func (ctx context.Context , slave * redis.Client ) error {
785+ info := slave .InfoMap (ctx , "stats" )
786+ if info .Err () != nil {
787+ return info .Err ()
788+ }
789+
790+ pc , err := strconv .Atoi (info .Item ("Stats" , "pubsubshard_channels" ))
791+ if err != nil {
792+ return err
793+ }
794+
795+ slavePubsubShardChannels .Add (int64 (pc ))
796+
797+ return nil
798+ })
799+ if err != nil {
800+ return err
801+ }
802+
803+ if c := masterPubsubShardChannels .Load (); c != int64 (0 ) {
804+ return fmt .Errorf ("total master pubsubshard_channels is %d; expected 0" , c )
805+ }
806+
807+ if c := slavePubsubShardChannels .Load (); c != int64 (1 ) {
808+ return fmt .Errorf ("total slave pubsubshard_channels is %d; expected 1" , c )
809+ }
810+
811+ return nil
812+ }, 30 * time .Second ).ShouldNot (HaveOccurred ())
813+
814+ Eventually (func () error {
815+ _ , err := client .SPublish (ctx , "mychannel" , "hello" ).Result ()
816+ if err != nil {
817+ return err
818+ }
819+
820+ msg , err := pubsub .ReceiveTimeout (ctx , time .Second )
821+ if err != nil {
822+ return err
823+ }
824+
825+ _ , ok := msg .(* redis.Message )
826+ if ! ok {
827+ return fmt .Errorf ("got %T, wanted *redis.Message" , msg )
828+ }
829+
830+ return nil
831+ }, 30 * time .Second ).ShouldNot (HaveOccurred ())
832+ })
833+
671834 It ("supports PubSub.Ping without channels" , func () {
672835 pubsub := client .Subscribe (ctx )
673836 defer pubsub .Close ()
0 commit comments