Skip to content

Commit 7181bcc

Browse files
Rebased to master, resolved comnflicts
1 parent 5315fb4 commit 7181bcc

File tree

2 files changed

+14
-55
lines changed

2 files changed

+14
-55
lines changed

command.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ var keylessCommands = map[string]struct{}{
6666
"unsubscribe": {},
6767
"unwatch": {},
6868
}
69-
type CmdType = routing.CmdType
7069

7170
// CmdTyper interface for getting command type
7271
type CmdTyper interface {
@@ -151,7 +150,6 @@ const (
151150
CmdTypeTSTimestampValue
152151
CmdTypeTSTimestampValueSlice
153152
)
154-
>>>>>>> b6633bf9 (centralize cluster command routing in osscluster_router.go and refactor osscluster.go (#6))
155153

156154
type (
157155
CmdTypeXAutoClaimValue struct {
@@ -6993,6 +6991,14 @@ func (cmd *VectorScoreSliceCmd) readReply(rd *proto.Reader) error {
69936991

69946992
return nil
69956993
}
6994+
6995+
func (cmd *VectorScoreSliceCmd) Clone() Cmder {
6996+
return &VectorScoreSliceCmd{
6997+
baseCmd: cmd.cloneBaseCmd(),
6998+
val: cmd.val,
6999+
}
7000+
}
7001+
69967002
func (cmd *MonitorCmd) Clone() Cmder {
69977003
// MonitorCmd cannot be safely cloned due to channels and goroutines
69987004
// Return a new MonitorCmd with the same channel

osscluster.go

Lines changed: 6 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1432,7 +1432,7 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
14321432
"redis: cannot pipeline command %q with request policy ReqAllNodes/ReqAllShards/ReqMultiShard; Note: This behavior is subject to change in the future", cmd.Name(),
14331433
)
14341434
}
1435-
slot := c.cmdSlot(ctx, cmd)
1435+
slot := c.cmdSlot(cmd, -1)
14361436
node, err := c.slotReadOnlyNode(state, slot)
14371437
if err != nil {
14381438
return err
@@ -1452,7 +1452,7 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
14521452
"redis: cannot pipeline command %q with request policy ReqAllNodes/ReqAllShards/ReqMultiShard; Note: This behavior is subject to change in the future", cmd.Name(),
14531453
)
14541454
}
1455-
slot := c.cmdSlot(ctx, cmd)
1455+
slot := c.cmdSlot(cmd, -1)
14561456
node, err := state.slotMasterNode(slot)
14571457
if err != nil {
14581458
return err
@@ -1557,53 +1557,6 @@ func (c *ClusterClient) pipelineReadCmds(
15571557
return nil
15581558
}
15591559

1560-
// Legacy functions needed for transaction pipeline processing
1561-
func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error {
1562-
state, err := c.state.Get(ctx)
1563-
if err != nil {
1564-
return err
1565-
}
1566-
1567-
preferredRandomSlot := -1
1568-
if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
1569-
for _, cmd := range cmds {
1570-
slot := c.cmdSlot(cmd, preferredRandomSlot)
1571-
if preferredRandomSlot == -1 {
1572-
preferredRandomSlot = slot
1573-
}
1574-
node, err := c.slotReadOnlyNode(state, slot)
1575-
if err != nil {
1576-
return err
1577-
}
1578-
cmdsMap.Add(node, cmd)
1579-
}
1580-
return nil
1581-
}
1582-
1583-
for _, cmd := range cmds {
1584-
slot := c.cmdSlot(cmd, preferredRandomSlot)
1585-
if preferredRandomSlot == -1 {
1586-
preferredRandomSlot = slot
1587-
}
1588-
node, err := state.slotMasterNode(slot)
1589-
if err != nil {
1590-
return err
1591-
}
1592-
cmdsMap.Add(node, cmd)
1593-
}
1594-
return nil
1595-
}
1596-
1597-
func (c *ClusterClient) cmdsAreReadOnly(ctx context.Context, cmds []Cmder) bool {
1598-
for _, cmd := range cmds {
1599-
cmdInfo := c.cmdInfo(ctx, cmd.Name())
1600-
if cmdInfo == nil || !cmdInfo.ReadOnly {
1601-
return false
1602-
}
1603-
}
1604-
return true
1605-
}
1606-
16071560
func (c *ClusterClient) checkMovedErr(
16081561
ctx context.Context, cmd Cmder, err error, failedCmds *cmdsMap,
16091562
) bool {
@@ -1661,7 +1614,7 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
16611614
return err
16621615
}
16631616

1664-
keyedCmdsBySlot := c.slottedKeyedCommands(cmds)
1617+
keyedCmdsBySlot := c.slottedKeyedCommands(ctx, cmds)
16651618
slot := -1
16661619
switch len(keyedCmdsBySlot) {
16671620
case 0:
@@ -1715,7 +1668,7 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
17151668

17161669
// slottedKeyedCommands returns a map of slot to commands taking into account
17171670
// only commands that have keys.
1718-
func (c *ClusterClient) slottedKeyedCommands(cmds []Cmder) map[int][]Cmder {
1671+
func (c *ClusterClient) slottedKeyedCommands(ctx context.Context, cmds []Cmder) map[int][]Cmder {
17191672
cmdsSlots := map[int][]Cmder{}
17201673

17211674
preferredRandomSlot := -1
@@ -2111,13 +2064,13 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
21112064
return info
21122065
}
21132066

2114-
func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int {
2067+
func (c *ClusterClient) cmdSlot(cmd Cmder, prefferedSlot int) int {
21152068
args := cmd.Args()
21162069
if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") {
21172070
return args[2].(int)
21182071
}
21192072

2120-
return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot)
2073+
return cmdSlot(cmd, cmdFirstKeyPos(cmd), prefferedSlot)
21212074
}
21222075

21232076
func cmdSlot(cmd Cmder, pos int, preferredRandomSlot int) int {

0 commit comments

Comments
 (0)