Skip to content

Commit ea795e3

Browse files
added read only policies
1 parent 17ac170 commit ea795e3

File tree

5 files changed

+93
-7
lines changed

5 files changed

+93
-7
lines changed

command.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4292,6 +4292,9 @@ func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
42924292
}
42934293

42944294
rawTips := make(map[string]string, tipsLen)
4295+
if cmdInfo.ReadOnly {
4296+
rawTips[routing.ReadOnlyCMD] = ""
4297+
}
42954298
for f := 0; f < tipsLen; f++ {
42964299
tip, err := rd.ReadString()
42974300
if err != nil {

command_policy_resolver.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,16 @@ var defaultPolicies = map[module]map[commandName]*routing.CommandPolicy{
2121
"search": {
2222
Request: routing.ReqDefault,
2323
Response: routing.RespDefaultKeyless,
24+
Tips: map[string]string{
25+
routing.ReadOnlyCMD: "",
26+
},
2427
},
2528
"aggregate": {
2629
Request: routing.ReqDefault,
2730
Response: routing.RespDefaultKeyless,
31+
Tips: map[string]string{
32+
routing.ReadOnlyCMD: "",
33+
},
2834
},
2935
"dictadd": {
3036
Request: routing.ReqDefault,
@@ -33,6 +39,9 @@ var defaultPolicies = map[module]map[commandName]*routing.CommandPolicy{
3339
"dictdump": {
3440
Request: routing.ReqDefault,
3541
Response: routing.RespDefaultKeyless,
42+
Tips: map[string]string{
43+
routing.ReadOnlyCMD: "",
44+
},
3645
},
3746
"dictdel": {
3847
Request: routing.ReqDefault,
@@ -41,10 +50,16 @@ var defaultPolicies = map[module]map[commandName]*routing.CommandPolicy{
4150
"suglen": {
4251
Request: routing.ReqDefault,
4352
Response: routing.RespDefaultHashSlot,
53+
Tips: map[string]string{
54+
routing.ReadOnlyCMD: "",
55+
},
4456
},
4557
"cursor": {
4658
Request: routing.ReqSpecial,
4759
Response: routing.RespDefaultKeyless,
60+
Tips: map[string]string{
61+
routing.ReadOnlyCMD: "",
62+
},
4863
},
4964
"sugadd": {
5065
Request: routing.ReqDefault,
@@ -53,6 +68,9 @@ var defaultPolicies = map[module]map[commandName]*routing.CommandPolicy{
5368
"sugget": {
5469
Request: routing.ReqDefault,
5570
Response: routing.RespDefaultHashSlot,
71+
Tips: map[string]string{
72+
routing.ReadOnlyCMD: "",
73+
},
5674
},
5775
"sugdel": {
5876
Request: routing.ReqDefault,
@@ -61,14 +79,23 @@ var defaultPolicies = map[module]map[commandName]*routing.CommandPolicy{
6179
"spellcheck": {
6280
Request: routing.ReqDefault,
6381
Response: routing.RespDefaultKeyless,
82+
Tips: map[string]string{
83+
routing.ReadOnlyCMD: "",
84+
},
6485
},
6586
"explain": {
6687
Request: routing.ReqDefault,
6788
Response: routing.RespDefaultKeyless,
89+
Tips: map[string]string{
90+
routing.ReadOnlyCMD: "",
91+
},
6892
},
6993
"explaincli": {
7094
Request: routing.ReqDefault,
7195
Response: routing.RespDefaultKeyless,
96+
Tips: map[string]string{
97+
routing.ReadOnlyCMD: "",
98+
},
7299
},
73100
"aliasadd": {
74101
Request: routing.ReqDefault,
@@ -85,14 +112,23 @@ var defaultPolicies = map[module]map[commandName]*routing.CommandPolicy{
85112
"info": {
86113
Request: routing.ReqDefault,
87114
Response: routing.RespDefaultKeyless,
115+
Tips: map[string]string{
116+
routing.ReadOnlyCMD: "",
117+
},
88118
},
89119
"tagvals": {
90120
Request: routing.ReqDefault,
91121
Response: routing.RespDefaultKeyless,
122+
Tips: map[string]string{
123+
routing.ReadOnlyCMD: "",
124+
},
92125
},
93126
"syndump": {
94127
Request: routing.ReqDefault,
95128
Response: routing.RespDefaultKeyless,
129+
Tips: map[string]string{
130+
routing.ReadOnlyCMD: "",
131+
},
96132
},
97133
"synupdate": {
98134
Request: routing.ReqDefault,
@@ -101,6 +137,9 @@ var defaultPolicies = map[module]map[commandName]*routing.CommandPolicy{
101137
"profile": {
102138
Request: routing.ReqDefault,
103139
Response: routing.RespDefaultKeyless,
140+
Tips: map[string]string{
141+
routing.ReadOnlyCMD: "",
142+
},
104143
},
105144
"alter": {
106145
Request: routing.ReqDefault,

internal/routing/policy.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ const (
1919
ReqSpecial
2020
)
2121

22+
const (
23+
ReadOnlyCMD string = "readonly"
24+
)
25+
2226
func (p RequestPolicy) String() string {
2327
switch p {
2428
case ReqDefault:
@@ -133,3 +137,9 @@ type CommandPolicy struct {
133137
func (p *CommandPolicy) CanBeUsedInPipeline() bool {
134138
return p.Request != ReqAllNodes && p.Request != ReqAllShards && p.Request != ReqMultiShard
135139
}
140+
141+
func (p *CommandPolicy) IsReadOnly() bool {
142+
_, readOnly := p.Tips[ReadOnlyCMD]
143+
fmt.Println(readOnly)
144+
return readOnly
145+
}

osscluster.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -944,9 +944,7 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
944944

945945
c.state = newClusterStateHolder(c.loadState)
946946

947-
dynamicResolver := c.NewDynamicResolver()
948-
dynamicResolver.SetFallbackResolver(NewDefaultCommandPolicyResolver())
949-
c.SetCommandInfoResolver(dynamicResolver)
947+
c.SetCommandInfoResolver(NewDefaultCommandPolicyResolver())
950948

951949
c.cmdable = c.Process
952950
c.initHooks(hooks{
@@ -1895,6 +1893,7 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
18951893
if info == nil {
18961894
internal.Logger.Printf(cmdInfoCtx, "info for cmd=%s not found", name)
18971895
}
1896+
18981897
return info
18991898
}
19001899

osscluster_router.go

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func (c *ClusterClient) routeAndRun(ctx context.Context, cmd Cmder, node *cluste
2727
}
2828

2929
if policy == nil {
30-
return c.executeDefault(ctx, cmd, node)
30+
return c.executeDefault(ctx, cmd, policy, node)
3131
}
3232
switch policy.Request {
3333
case routing.ReqAllNodes:
@@ -39,16 +39,25 @@ func (c *ClusterClient) routeAndRun(ctx context.Context, cmd Cmder, node *cluste
3939
case routing.ReqSpecial:
4040
return c.executeSpecialCommand(ctx, cmd, policy, node)
4141
default:
42-
return c.executeDefault(ctx, cmd, node)
42+
return c.executeDefault(ctx, cmd, policy, node)
4343
}
4444
}
4545

4646
// executeDefault handles standard command routing based on keys
47-
func (c *ClusterClient) executeDefault(ctx context.Context, cmd Cmder, node *clusterNode) error {
47+
func (c *ClusterClient) executeDefault(ctx context.Context, cmd Cmder, policy *routing.CommandPolicy, node *clusterNode) error {
4848
if c.hasKeys(cmd) {
4949
// execute on key based shard
5050
return node.Client.Process(ctx, cmd)
5151
}
52+
if policy != nil {
53+
54+
fmt.Println(policy.Tips)
55+
if c.readOnlyEnabled() && policy.IsReadOnly() {
56+
fmt.Println("will execute on arbitrary node")
57+
return c.executeOnArbitraryNode(ctx, cmd)
58+
}
59+
}
60+
5261
return c.executeOnArbitraryShard(ctx, cmd)
5362
}
5463

@@ -61,6 +70,15 @@ func (c *ClusterClient) executeOnArbitraryShard(ctx context.Context, cmd Cmder)
6170
return node.Client.Process(ctx, cmd)
6271
}
6372

73+
// executeOnArbitraryNode routes command to an arbitrary node
74+
func (c *ClusterClient) executeOnArbitraryNode(ctx context.Context, cmd Cmder) error {
75+
node := c.pickArbitraryNode(ctx)
76+
if node == nil {
77+
return errClusterNoNodes
78+
}
79+
return node.Client.Process(ctx, cmd)
80+
}
81+
6482
// executeOnAllNodes executes command on all nodes (masters and replicas)
6583
func (c *ClusterClient) executeOnAllNodes(ctx context.Context, cmd Cmder, policy *routing.CommandPolicy) error {
6684
state, err := c.state.Get(ctx)
@@ -252,7 +270,7 @@ func (c *ClusterClient) executeSpecialCommand(ctx context.Context, cmd Cmder, po
252270
case "ft.cursor":
253271
return c.executeCursorCommand(ctx, cmd)
254272
default:
255-
return c.executeDefault(ctx, cmd, node)
273+
return c.executeDefault(ctx, cmd, policy, node)
256274
}
257275
}
258276

@@ -479,12 +497,29 @@ func (c *ClusterClient) pickArbitraryShard(ctx context.Context) *clusterNode {
479497
return state.Masters[idx]
480498
}
481499

500+
// pickArbitraryNode selects a master or slave shard using the configured ShardPicker
501+
func (c *ClusterClient) pickArbitraryNode(ctx context.Context) *clusterNode {
502+
state, err := c.state.Get(ctx)
503+
if err != nil || len(state.Masters) == 0 {
504+
return nil
505+
}
506+
507+
allNodes := append(state.Masters, state.Slaves...)
508+
509+
idx := c.opt.ShardPicker.Next(len(allNodes))
510+
return allNodes[idx]
511+
}
512+
482513
// hasKeys checks if a command operates on keys
483514
func (c *ClusterClient) hasKeys(cmd Cmder) bool {
484515
firstKeyPos := cmdFirstKeyPos(cmd)
485516
return firstKeyPos > 0
486517
}
487518

519+
func (c *ClusterClient) readOnlyEnabled() bool {
520+
return c.opt.ReadOnly
521+
}
522+
488523
// setCommandValue sets the aggregated value on a command using the enum-based approach
489524
func (c *ClusterClient) setCommandValue(cmd Cmder, value interface{}) error {
490525
// If value is nil, it might mean ExtractCommandValue couldn't extract the value

0 commit comments

Comments
 (0)