diff --git a/.github/workflows/doctests.yaml b/.github/workflows/doctests.yaml index 1afd0d8033..bd95c58d35 100644 --- a/.github/workflows/doctests.yaml +++ b/.github/workflows/doctests.yaml @@ -16,7 +16,7 @@ jobs: services: redis-stack: - image: redislabs/client-libs-test:8.0.2 + image: redislabs/client-libs-test:8.4-RC1-pre.2 env: TLS_ENABLED: no REDIS_CLUSTER: no diff --git a/acl_commands.go b/acl_commands.go index 9cb800bb3b..a7b5dd2483 100644 --- a/acl_commands.go +++ b/acl_commands.go @@ -8,8 +8,12 @@ type ACLCmdable interface { ACLLog(ctx context.Context, count int64) *ACLLogCmd ACLLogReset(ctx context.Context) *StatusCmd + ACLGenPass(ctx context.Context, bit int) *StringCmd + ACLSetUser(ctx context.Context, username string, rules ...string) *StatusCmd ACLDelUser(ctx context.Context, username string) *IntCmd + ACLUsers(ctx context.Context) *StringSliceCmd + ACLWhoAmI(ctx context.Context) *StringCmd ACLList(ctx context.Context) *StringSliceCmd ACLCat(ctx context.Context) *StringSliceCmd @@ -65,6 +69,24 @@ func (c cmdable) ACLSetUser(ctx context.Context, username string, rules ...strin return cmd } +func (c cmdable) ACLGenPass(ctx context.Context, bit int) *StringCmd { + cmd := NewStringCmd(ctx, "acl", "genpass") + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) ACLUsers(ctx context.Context) *StringSliceCmd { + cmd := NewStringSliceCmd(ctx, "acl", "users") + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) ACLWhoAmI(ctx context.Context) *StringCmd { + cmd := NewStringCmd(ctx, "acl", "whoami") + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) ACLList(ctx context.Context) *StringSliceCmd { cmd := NewStringSliceCmd(ctx, "acl", "list") _ = c(ctx, cmd) diff --git a/acl_commands_test.go b/acl_commands_test.go index a96621dbce..e6a99ec5e3 100644 --- a/acl_commands_test.go +++ b/acl_commands_test.go @@ -92,6 +92,21 @@ var _ = Describe("ACL user commands", Label("NonRedisEnterprise"), func() { Expect(err).NotTo(HaveOccurred()) Expect(res).To(HaveLen(1)) Expect(res[0]).To(ContainSubstring("default")) + + res, err = client.ACLUsers(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(HaveLen(1)) + Expect(res[0]).To(Equal("default")) + + res1, err := client.ACLWhoAmI(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res1).To(Equal("default")) + }) + + It("gen password", func() { + password, err := client.ACLGenPass(ctx, 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(password).NotTo(BeEmpty()) }) It("setuser and deluser", func() { diff --git a/command.go b/command.go index d3fb231b5e..b99f1312af 100644 --- a/command.go +++ b/command.go @@ -698,6 +698,68 @@ func (cmd *IntCmd) readReply(rd *proto.Reader) (err error) { //------------------------------------------------------------------------------ +// DigestCmd is a command that returns a uint64 xxh3 hash digest. +// +// This command is specifically designed for the Redis DIGEST command, +// which returns the xxh3 hash of a key's value as a hex string. +// The hex string is automatically parsed to a uint64 value. +// +// The digest can be used for optimistic locking with SetIFDEQ, SetIFDNE, +// and DelExArgs commands. +// +// For examples of client-side digest generation and usage patterns, see: +// example/digest-optimistic-locking/ +// +// Redis 8.4+. See https://redis.io/commands/digest/ +type DigestCmd struct { + baseCmd + + val uint64 +} + +var _ Cmder = (*DigestCmd)(nil) + +func NewDigestCmd(ctx context.Context, args ...interface{}) *DigestCmd { + return &DigestCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *DigestCmd) SetVal(val uint64) { + cmd.val = val +} + +func (cmd *DigestCmd) Val() uint64 { + return cmd.val +} + +func (cmd *DigestCmd) Result() (uint64, error) { + return cmd.val, cmd.err +} + +func (cmd *DigestCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *DigestCmd) readReply(rd *proto.Reader) (err error) { + // Redis DIGEST command returns a hex string (e.g., "a1b2c3d4e5f67890") + // We parse it as a uint64 xxh3 hash value + var hexStr string + hexStr, err = rd.ReadString() + if err != nil { + return err + } + + // Parse hex string to uint64 + cmd.val, err = strconv.ParseUint(hexStr, 16, 64) + return err +} + +//------------------------------------------------------------------------------ + type IntSliceCmd struct { baseCmd @@ -1585,6 +1647,12 @@ func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error { type XMessage struct { ID string Values map[string]interface{} + // MillisElapsedFromDelivery is the number of milliseconds since the entry was last delivered. + // Only populated when using XREADGROUP with CLAIM argument for claimed entries. + MillisElapsedFromDelivery int64 + // DeliveredCount is the number of times the entry was delivered. + // Only populated when using XREADGROUP with CLAIM argument for claimed entries. + DeliveredCount int64 } type XMessageSliceCmd struct { @@ -1641,10 +1709,16 @@ func readXMessageSlice(rd *proto.Reader) ([]XMessage, error) { } func readXMessage(rd *proto.Reader) (XMessage, error) { - if err := rd.ReadFixedArrayLen(2); err != nil { + // Read array length can be 2 or 4 (with CLAIM metadata) + n, err := rd.ReadArrayLen() + if err != nil { return XMessage{}, err } + if n != 2 && n != 4 { + return XMessage{}, fmt.Errorf("redis: got %d elements in the XMessage array, expected 2 or 4", n) + } + id, err := rd.ReadString() if err != nil { return XMessage{}, err @@ -1657,10 +1731,24 @@ func readXMessage(rd *proto.Reader) (XMessage, error) { } } - return XMessage{ + msg := XMessage{ ID: id, Values: v, - }, nil + } + + if n == 4 { + msg.MillisElapsedFromDelivery, err = rd.ReadInt() + if err != nil { + return XMessage{}, err + } + + msg.DeliveredCount, err = rd.ReadInt() + if err != nil { + return XMessage{}, err + } + } + + return msg, nil } func stringInterfaceMapParser(rd *proto.Reader) (map[string]interface{}, error) { @@ -3768,6 +3856,83 @@ func (cmd *SlowLogCmd) readReply(rd *proto.Reader) error { //----------------------------------------------------------------------- +type Latency struct { + Name string + Time time.Time + Latest time.Duration + Max time.Duration +} + +type LatencyCmd struct { + baseCmd + val []Latency +} + +var _ Cmder = (*LatencyCmd)(nil) + +func NewLatencyCmd(ctx context.Context, args ...interface{}) *LatencyCmd { + return &LatencyCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *LatencyCmd) SetVal(val []Latency) { + cmd.val = val +} + +func (cmd *LatencyCmd) Val() []Latency { + return cmd.val +} + +func (cmd *LatencyCmd) Result() ([]Latency, error) { + return cmd.val, cmd.err +} + +func (cmd *LatencyCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *LatencyCmd) readReply(rd *proto.Reader) error { + n, err := rd.ReadArrayLen() + if err != nil { + return err + } + cmd.val = make([]Latency, n) + for i := 0; i < len(cmd.val); i++ { + nn, err := rd.ReadArrayLen() + if err != nil { + return err + } + if nn < 3 { + return fmt.Errorf("redis: got %d elements in latency get, expected at least 3", nn) + } + if cmd.val[i].Name, err = rd.ReadString(); err != nil { + return err + } + createdAt, err := rd.ReadInt() + if err != nil { + return err + } + cmd.val[i].Time = time.Unix(createdAt, 0) + latest, err := rd.ReadInt() + if err != nil { + return err + } + cmd.val[i].Latest = time.Duration(latest) * time.Millisecond + maximum, err := rd.ReadInt() + if err != nil { + return err + } + cmd.val[i].Max = time.Duration(maximum) * time.Millisecond + } + return nil +} + +//----------------------------------------------------------------------- + type MapStringInterfaceCmd struct { baseCmd diff --git a/command_digest_test.go b/command_digest_test.go new file mode 100644 index 0000000000..6b65b3eb0d --- /dev/null +++ b/command_digest_test.go @@ -0,0 +1,118 @@ +package redis + +import ( + "context" + "fmt" + "testing" + + "github.com/redis/go-redis/v9/internal/proto" +) + +func TestDigestCmd(t *testing.T) { + tests := []struct { + name string + hexStr string + expected uint64 + wantErr bool + }{ + { + name: "zero value", + hexStr: "0", + expected: 0, + wantErr: false, + }, + { + name: "small value", + hexStr: "ff", + expected: 255, + wantErr: false, + }, + { + name: "medium value", + hexStr: "1234abcd", + expected: 0x1234abcd, + wantErr: false, + }, + { + name: "large value", + hexStr: "ffffffffffffffff", + expected: 0xffffffffffffffff, + wantErr: false, + }, + { + name: "uppercase hex", + hexStr: "DEADBEEF", + expected: 0xdeadbeef, + wantErr: false, + }, + { + name: "mixed case hex", + hexStr: "DeAdBeEf", + expected: 0xdeadbeef, + wantErr: false, + }, + { + name: "typical xxh3 hash", + hexStr: "a1b2c3d4e5f67890", + expected: 0xa1b2c3d4e5f67890, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a mock reader that returns the hex string in RESP format + // Format: $\r\n\r\n + respData := []byte(fmt.Sprintf("$%d\r\n%s\r\n", len(tt.hexStr), tt.hexStr)) + + rd := proto.NewReader(newMockConn(respData)) + + cmd := NewDigestCmd(context.Background(), "digest", "key") + err := cmd.readReply(rd) + + if (err != nil) != tt.wantErr { + t.Errorf("DigestCmd.readReply() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if !tt.wantErr && cmd.Val() != tt.expected { + t.Errorf("DigestCmd.Val() = %d (0x%x), want %d (0x%x)", cmd.Val(), cmd.Val(), tt.expected, tt.expected) + } + }) + } +} + +func TestDigestCmdResult(t *testing.T) { + cmd := NewDigestCmd(context.Background(), "digest", "key") + expected := uint64(0xdeadbeefcafebabe) + cmd.SetVal(expected) + + val, err := cmd.Result() + if err != nil { + t.Errorf("DigestCmd.Result() error = %v", err) + } + + if val != expected { + t.Errorf("DigestCmd.Result() = %d (0x%x), want %d (0x%x)", val, val, expected, expected) + } +} + +// mockConn is a simple mock connection for testing +type mockConn struct { + data []byte + pos int +} + +func newMockConn(data []byte) *mockConn { + return &mockConn{data: data} +} + +func (c *mockConn) Read(p []byte) (n int, err error) { + if c.pos >= len(c.data) { + return 0, nil + } + n = copy(p, c.data[c.pos:]) + c.pos += n + return n, nil +} + diff --git a/commands.go b/commands.go index f313728d8f..daee5505e1 100644 --- a/commands.go +++ b/commands.go @@ -211,9 +211,13 @@ type Cmdable interface { ShutdownNoSave(ctx context.Context) *StatusCmd SlaveOf(ctx context.Context, host, port string) *StatusCmd SlowLogGet(ctx context.Context, num int64) *SlowLogCmd + SlowLogLen(ctx context.Context) *IntCmd + SlowLogReset(ctx context.Context) *StatusCmd Time(ctx context.Context) *TimeCmd DebugObject(ctx context.Context, key string) *StringCmd MemoryUsage(ctx context.Context, key string, samples ...int) *IntCmd + Latency(ctx context.Context) *LatencyCmd + LatencyReset(ctx context.Context, events ...interface{}) *StatusCmd ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd @@ -673,6 +677,34 @@ func (c cmdable) SlowLogGet(ctx context.Context, num int64) *SlowLogCmd { return cmd } +func (c cmdable) SlowLogLen(ctx context.Context) *IntCmd { + cmd := NewIntCmd(ctx, "slowlog", "len") + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) SlowLogReset(ctx context.Context) *StatusCmd { + cmd := NewStatusCmd(ctx, "slowlog", "reset") + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) Latency(ctx context.Context) *LatencyCmd { + cmd := NewLatencyCmd(ctx, "latency", "latest") + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) LatencyReset(ctx context.Context, events ...interface{}) *StatusCmd { + args := make([]interface{}, 2+len(events)) + args[0] = "latency" + args[1] = "reset" + copy(args[2:], events) + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) Sync(_ context.Context) { panic("not implemented") } diff --git a/commands_test.go b/commands_test.go index 17b4dd0306..edbae4e7a3 100644 --- a/commands_test.go +++ b/commands_test.go @@ -199,6 +199,29 @@ var _ = Describe("Commands", func() { Expect(r.Val()).To(Equal(int64(0))) }) + It("should ClientKillByFilter with kill myself", func() { + opt := redisOptions() + opt.ClientName = "killmyid" + db := redis.NewClient(opt) + Expect(db.Ping(ctx).Err()).NotTo(HaveOccurred()) + + defer func() { + Expect(db.Close()).NotTo(HaveOccurred()) + }() + val, err := client.ClientList(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).Should(ContainSubstring("name=killmyid")) + + myid := db.ClientID(ctx).Val() + killed := client.ClientKillByFilter(ctx, "ID", strconv.FormatInt(myid, 10)) + Expect(killed.Err()).NotTo(HaveOccurred()) + Expect(killed.Val()).To(BeNumerically("==", 1)) + + val, err = client.ClientList(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).ShouldNot(ContainSubstring("name=killmyid")) + }) + It("should ClientKillByFilter with MAXAGE", Label("NonRedisEnterprise"), func() { SkipBeforeRedisVersion(7.4, "doesn't work with older redis stack images") var s []string @@ -1773,6 +1796,200 @@ var _ = Describe("Commands", func() { Expect(get.Err()).To(Equal(redis.Nil)) }) + It("should DelExArgs when value matches", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "lock", "token-123", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Delete only if value matches + deleted := client.DelExArgs(ctx, "lock", redis.DelExArgs{ + Mode: "IFEQ", + MatchValue: "token-123", + }) + Expect(deleted.Err()).NotTo(HaveOccurred()) + Expect(deleted.Val()).To(Equal(int64(1))) + + // Verify key was deleted + get := client.Get(ctx, "lock") + Expect(get.Err()).To(Equal(redis.Nil)) + }) + + It("should DelExArgs fail when value does not match", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "lock", "token-123", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Try to delete with wrong value + deleted := client.DelExArgs(ctx, "lock", redis.DelExArgs{ + Mode: "IFEQ", + MatchValue: "wrong-token", + }) + Expect(deleted.Err()).NotTo(HaveOccurred()) + Expect(deleted.Val()).To(Equal(int64(0))) + + // Verify key was NOT deleted + val, err := client.Get(ctx, "lock").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("token-123")) + }) + + It("should DelExArgs on non-existent key", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Try to delete non-existent key + deleted := client.DelExArgs(ctx, "nonexistent", redis.DelExArgs{ + Mode: "IFEQ", + MatchValue: "any-value", + }) + Expect(deleted.Err()).NotTo(HaveOccurred()) + Expect(deleted.Val()).To(Equal(int64(0))) + }) + + It("should DelExArgs with IFEQ", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "temp-key", "temp-value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Delete with IFEQ + args := redis.DelExArgs{ + Mode: "IFEQ", + MatchValue: "temp-value", + } + deleted := client.DelExArgs(ctx, "temp-key", args) + Expect(deleted.Err()).NotTo(HaveOccurred()) + Expect(deleted.Val()).To(Equal(int64(1))) + + // Verify key was deleted + get := client.Get(ctx, "temp-key") + Expect(get.Err()).To(Equal(redis.Nil)) + }) + + It("should DelExArgs with IFNE", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "temporary", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Delete only if value is NOT "permanent" + args := redis.DelExArgs{ + Mode: "IFNE", + MatchValue: "permanent", + } + deleted := client.DelExArgs(ctx, "key", args) + Expect(deleted.Err()).NotTo(HaveOccurred()) + Expect(deleted.Val()).To(Equal(int64(1))) + + // Verify key was deleted + get := client.Get(ctx, "key") + Expect(get.Err()).To(Equal(redis.Nil)) + }) + + It("should DelExArgs with IFNE fail when value matches", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "permanent", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Try to delete but value matches (should fail) + args := redis.DelExArgs{ + Mode: "IFNE", + MatchValue: "permanent", + } + deleted := client.DelExArgs(ctx, "key", args) + Expect(deleted.Err()).NotTo(HaveOccurred()) + Expect(deleted.Val()).To(Equal(int64(0))) + + // Verify key was NOT deleted + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("permanent")) + }) + + It("should Digest", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set a value + err := client.Set(ctx, "my-key", "my-value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest (returns uint64) + digest := client.Digest(ctx, "my-key") + Expect(digest.Err()).NotTo(HaveOccurred()) + Expect(digest.Val()).NotTo(BeZero()) + + // Digest should be consistent + digest2 := client.Digest(ctx, "my-key") + Expect(digest2.Err()).NotTo(HaveOccurred()) + Expect(digest2.Val()).To(Equal(digest.Val())) + }) + + It("should Digest on non-existent key", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Get digest of non-existent key + digest := client.Digest(ctx, "nonexistent") + Expect(digest.Err()).To(Equal(redis.Nil)) + }) + + It("should use Digest with SetArgs IFDEQ", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "value1", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest + digest := client.Digest(ctx, "key") + Expect(digest.Err()).NotTo(HaveOccurred()) + + // Update using digest + args := redis.SetArgs{ + Mode: "IFDEQ", + MatchDigest: digest.Val(), + } + result := client.SetArgs(ctx, "key", "value2", args) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value2")) + }) + + It("should use Digest with DelExArgs IFDEQ", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest + digest := client.Digest(ctx, "key") + Expect(digest.Err()).NotTo(HaveOccurred()) + + // Delete using digest + args := redis.DelExArgs{ + Mode: "IFDEQ", + MatchDigest: digest.Val(), + } + deleted := client.DelExArgs(ctx, "key", args) + Expect(deleted.Err()).NotTo(HaveOccurred()) + Expect(deleted.Val()).To(Equal(int64(1))) + + // Verify key was deleted + get := client.Get(ctx, "key") + Expect(get.Err()).To(Equal(redis.Nil)) + }) + It("should Incr", func() { set := client.Set(ctx, "key", "10", 0) Expect(set.Err()).NotTo(HaveOccurred()) @@ -1912,6 +2129,137 @@ var _ = Describe("Commands", func() { Expect(mSetNX.Val()).To(Equal(true)) }) + It("should MSetEX", func() { + SkipBeforeRedisVersion(8.3, "MSetEX is available since redis 8.4") + args := redis.MSetEXArgs{ + Expiration: &redis.ExpirationOption{ + Mode: redis.EX, + Value: 1, + }, + } + mSetEX := client.MSetEX(ctx, args, "key1", "hello1", "key2", "hello2") + Expect(mSetEX.Err()).NotTo(HaveOccurred()) + Expect(mSetEX.Val()).To(Equal(int64(1))) + + // Verify keys were set + val1 := client.Get(ctx, "key1") + Expect(val1.Err()).NotTo(HaveOccurred()) + Expect(val1.Val()).To(Equal("hello1")) + + val2 := client.Get(ctx, "key2") + Expect(val2.Err()).NotTo(HaveOccurred()) + Expect(val2.Val()).To(Equal("hello2")) + + // Verify TTL was set + ttl1 := client.TTL(ctx, "key1") + Expect(ttl1.Err()).NotTo(HaveOccurred()) + Expect(ttl1.Val()).To(BeNumerically(">", 0)) + Expect(ttl1.Val()).To(BeNumerically("<=", 1*time.Second)) + + ttl2 := client.TTL(ctx, "key2") + Expect(ttl2.Err()).NotTo(HaveOccurred()) + Expect(ttl2.Val()).To(BeNumerically(">", 0)) + Expect(ttl2.Val()).To(BeNumerically("<=", 1*time.Second)) + }) + + It("should MSetEX with NX mode", func() { + SkipBeforeRedisVersion(8.3, "MSetEX is available since redis 8.4") + + client.Set(ctx, "key1", "existing", 0) + + // Try to set with NX mode - should fail because key1 exists + args := redis.MSetEXArgs{ + Condition: redis.NX, + Expiration: &redis.ExpirationOption{ + Mode: redis.EX, + Value: 1, + }, + } + mSetEX := client.MSetEX(ctx, args, "key1", "new1", "key2", "new2") + Expect(mSetEX.Err()).NotTo(HaveOccurred()) + Expect(mSetEX.Val()).To(Equal(int64(0))) + + val1 := client.Get(ctx, "key1") + Expect(val1.Err()).NotTo(HaveOccurred()) + Expect(val1.Val()).To(Equal("existing")) + + val2 := client.Get(ctx, "key2") + Expect(val2.Err()).To(Equal(redis.Nil)) + + client.Del(ctx, "key1") + + // Now try with NX mode when keys don't exist - should succeed + mSetEX = client.MSetEX(ctx, args, "key1", "new1", "key2", "new2") + Expect(mSetEX.Err()).NotTo(HaveOccurred()) + Expect(mSetEX.Val()).To(Equal(int64(1))) + + val1 = client.Get(ctx, "key1") + Expect(val1.Err()).NotTo(HaveOccurred()) + Expect(val1.Val()).To(Equal("new1")) + + val2 = client.Get(ctx, "key2") + Expect(val2.Err()).NotTo(HaveOccurred()) + Expect(val2.Val()).To(Equal("new2")) + }) + + It("should MSetEX with XX mode", func() { + SkipBeforeRedisVersion(8.3, "MSetEX is available since redis 8.4") + + args := redis.MSetEXArgs{ + Condition: redis.XX, + Expiration: &redis.ExpirationOption{ + Mode: redis.EX, + Value: 1, + }, + } + mSetEX := client.MSetEX(ctx, args, "key1", "new1", "key2", "new2") + Expect(mSetEX.Err()).NotTo(HaveOccurred()) + Expect(mSetEX.Val()).To(Equal(int64(0))) + + client.Set(ctx, "key1", "existing1", 0) + client.Set(ctx, "key2", "existing2", 0) + + mSetEX = client.MSetEX(ctx, args, "key1", "new1", "key2", "new2") + Expect(mSetEX.Err()).NotTo(HaveOccurred()) + Expect(mSetEX.Val()).To(Equal(int64(1))) + + val1 := client.Get(ctx, "key1") + Expect(val1.Err()).NotTo(HaveOccurred()) + Expect(val1.Val()).To(Equal("new1")) + + val2 := client.Get(ctx, "key2") + Expect(val2.Err()).NotTo(HaveOccurred()) + Expect(val2.Val()).To(Equal("new2")) + + ttl1 := client.TTL(ctx, "key1") + Expect(ttl1.Err()).NotTo(HaveOccurred()) + Expect(ttl1.Val()).To(BeNumerically(">", 0)) + }) + + It("should MSetEX with map", func() { + SkipBeforeRedisVersion(8.3, "MSetEX is available since redis 8.4") + args := redis.MSetEXArgs{ + Expiration: &redis.ExpirationOption{ + Mode: redis.EX, + Value: 1, + }, + } + mSetEX := client.MSetEX(ctx, args, map[string]interface{}{ + "key1": "value1", + "key2": "value2", + }) + Expect(mSetEX.Err()).NotTo(HaveOccurred()) + Expect(mSetEX.Val()).To(Equal(int64(1))) + + val1 := client.Get(ctx, "key1") + Expect(val1.Err()).NotTo(HaveOccurred()) + Expect(val1.Val()).To(Equal("value1")) + + val2 := client.Get(ctx, "key2") + Expect(val2.Err()).NotTo(HaveOccurred()) + Expect(val2.Val()).To(Equal("value2")) + }) + It("should SetWithArgs with TTL", func() { args := redis.SetArgs{ TTL: 500 * time.Millisecond, @@ -2320,6 +2668,320 @@ var _ = Describe("Commands", func() { Expect(ttl).NotTo(Equal(-1)) }) + It("should SetIFEQ when value matches", func() { + if RedisVersion < 8.4 { + Skip("CAS/CAD commands require Redis >= 8.4") + } + + // Set initial value + err := client.Set(ctx, "key", "old-value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update only if current value is "old-value" + result := client.SetIFEQ(ctx, "key", "new-value", "old-value", 0) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("new-value")) + }) + + It("should SetIFEQ fail when value does not match", func() { + if RedisVersion < 8.4 { + Skip("CAS/CAD commands require Redis >= 8.4") + } + + // Set initial value + err := client.Set(ctx, "key", "current-value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Try to update with wrong match value + result := client.SetIFEQ(ctx, "key", "new-value", "wrong-value", 0) + Expect(result.Err()).To(Equal(redis.Nil)) + + // Verify value was NOT updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("current-value")) + }) + + It("should SetIFEQ with expiration", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "token-123", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update with expiration + result := client.SetIFEQ(ctx, "key", "token-456", "token-123", 500*time.Millisecond) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("token-456")) + + // Wait for expiration + Eventually(func() error { + return client.Get(ctx, "key").Err() + }, "1s", "100ms").Should(Equal(redis.Nil)) + }) + + It("should SetIFNE when value does not match", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "pending", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update only if current value is NOT "completed" + result := client.SetIFNE(ctx, "key", "processing", "completed", 0) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("processing")) + }) + + It("should SetIFNE fail when value matches", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "completed", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Try to update but value matches (should fail) + result := client.SetIFNE(ctx, "key", "processing", "completed", 0) + Expect(result.Err()).To(Equal(redis.Nil)) + + // Verify value was NOT updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("completed")) + }) + + It("should SetArgs with IFEQ", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "counter", "100", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update with IFEQ + args := redis.SetArgs{ + Mode: "IFEQ", + MatchValue: "100", + TTL: 1 * time.Hour, + } + result := client.SetArgs(ctx, "counter", "200", args) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "counter").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("200")) + }) + + It("should SetArgs with IFEQ and GET", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "old", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update with IFEQ and GET old value + args := redis.SetArgs{ + Mode: "IFEQ", + MatchValue: "old", + Get: true, + } + result := client.SetArgs(ctx, "key", "new", args) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("old")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("new")) + }) + + It("should SetArgs with IFNE", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "status", "pending", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update with IFNE + args := redis.SetArgs{ + Mode: "IFNE", + MatchValue: "completed", + TTL: 30 * time.Minute, + } + result := client.SetArgs(ctx, "status", "processing", args) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "status").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("processing")) + }) + + It("should SetIFEQGet return previous value", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "old-value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update and get previous value + result := client.SetIFEQGet(ctx, "key", "new-value", "old-value", 0) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("old-value")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("new-value")) + }) + + It("should SetIFNEGet return previous value", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "pending", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Update and get previous value + result := client.SetIFNEGet(ctx, "key", "processing", "completed", 0) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("pending")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("processing")) + }) + + It("should SetIFDEQ when digest matches", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "value1", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest + digest := client.Digest(ctx, "key") + Expect(digest.Err()).NotTo(HaveOccurred()) + + // Update using digest + result := client.SetIFDEQ(ctx, "key", "value2", digest.Val(), 0) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value2")) + }) + + It("should SetIFDEQ fail when digest does not match", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "value1", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest of a different value to use as wrong digest + err = client.Set(ctx, "temp-key", "different-value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + wrongDigest := client.Digest(ctx, "temp-key") + Expect(wrongDigest.Err()).NotTo(HaveOccurred()) + + // Try to update with wrong digest + result := client.SetIFDEQ(ctx, "key", "value2", wrongDigest.Val(), 0) + Expect(result.Err()).To(Equal(redis.Nil)) + + // Verify value was NOT updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value1")) + }) + + It("should SetIFDEQGet return previous value", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "value1", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest + digest := client.Digest(ctx, "key") + Expect(digest.Err()).NotTo(HaveOccurred()) + + // Update using digest and get previous value + result := client.SetIFDEQGet(ctx, "key", "value2", digest.Val(), 0) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("value1")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value2")) + }) + + It("should SetIFDNE when digest does not match", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "value1", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest of a different value + err = client.Set(ctx, "temp-key", "different-value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + differentDigest := client.Digest(ctx, "temp-key") + Expect(differentDigest.Err()).NotTo(HaveOccurred()) + + // Update with different digest (should succeed because digest doesn't match) + result := client.SetIFDNE(ctx, "key", "value2", differentDigest.Val(), 0) + Expect(result.Err()).NotTo(HaveOccurred()) + Expect(result.Val()).To(Equal("OK")) + + // Verify value was updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value2")) + }) + + It("should SetIFDNE fail when digest matches", func() { + SkipBeforeRedisVersion(8.4, "CAS/CAD commands require Redis >= 8.4") + + // Set initial value + err := client.Set(ctx, "key", "value1", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + // Get digest + digest := client.Digest(ctx, "key") + Expect(digest.Err()).NotTo(HaveOccurred()) + + // Try to update but digest matches (should fail) + result := client.SetIFDNE(ctx, "key", "value2", digest.Val(), 0) + Expect(result.Err()).To(Equal(redis.Nil)) + + // Verify value was NOT updated + val, err := client.Get(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value1")) + }) + It("should SetRange", func() { set := client.Set(ctx, "key", "Hello World", 0) Expect(set.Err()).NotTo(HaveOccurred()) @@ -6749,6 +7411,242 @@ var _ = Describe("Commands", func() { Expect(err).NotTo(HaveOccurred()) Expect(n).To(Equal(int64(2))) }) + + It("should XReadGroup with CLAIM argument", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer2", + Streams: []string{"stream", ">"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(HaveLen(1)) + Expect(res[0].Stream).To(Equal("stream")) + + messages := res[0].Messages + Expect(len(messages)).To(BeNumerically(">=", 1)) + + for _, msg := range messages { + if msg.MillisElapsedFromDelivery > 0 { + Expect(msg.MillisElapsedFromDelivery).To(BeNumerically(">=", 50)) + Expect(msg.DeliveredCount).To(BeNumerically(">=", 1)) + } + } + }) + + It("should XReadGroup with CLAIM and COUNT", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer3", + Streams: []string{"stream", ">"}, + Claim: 50 * time.Millisecond, + Count: 2, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + if len(res) > 0 && len(res[0].Messages) > 0 { + Expect(len(res[0].Messages)).To(BeNumerically("<=", 2)) + } + }) + + It("should XReadGroup with CLAIM and NOACK", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer4", + Streams: []string{"stream", ">"}, + Claim: 50 * time.Millisecond, + NoAck: true, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + if len(res) > 0 { + Expect(res[0].Stream).To(Equal("stream")) + } + }) + + It("should XReadGroup CLAIM empties PEL after acknowledgment", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer5", + Streams: []string{"stream", ">"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + if len(res) > 0 && len(res[0].Messages) > 0 { + ids := make([]string, len(res[0].Messages)) + for i, msg := range res[0].Messages { + ids[i] = msg.ID + } + + n, err := client.XAck(ctx, "stream", "group", ids...).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(BeNumerically(">=", 1)) + + pending, err := client.XPending(ctx, "stream", "group").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(pending.Count).To(BeNumerically("<", 3)) + } + }) + + It("should XReadGroup backward compatibility without CLAIM", func() { + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer_compat", + Streams: []string{"stream", "0"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(HaveLen(1)) + Expect(res[0].Stream).To(Equal("stream")) + + for _, msg := range res[0].Messages { + Expect(msg.MillisElapsedFromDelivery).To(Equal(int64(0))) + Expect(msg.DeliveredCount).To(Equal(int64(0))) + } + }) + + It("should XReadGroup CLAIM with multiple streams", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + id, err := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "stream2", + ID: "1-0", + Values: map[string]interface{}{"field1": "value1"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(Equal("1-0")) + + id, err = client.XAdd(ctx, &redis.XAddArgs{ + Stream: "stream2", + ID: "2-0", + Values: map[string]interface{}{"field2": "value2"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(Equal("2-0")) + + err = client.XGroupCreate(ctx, "stream2", "group2", "0").Err() + Expect(err).NotTo(HaveOccurred()) + + _, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group2", + Consumer: "consumer1", + Streams: []string{"stream2", ">"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group2", + Consumer: "consumer2", + Streams: []string{"stream2", ">"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + if len(res) > 0 { + Expect(res[0].Stream).To(Equal("stream2")) + if len(res[0].Messages) > 0 { + for _, msg := range res[0].Messages { + if msg.MillisElapsedFromDelivery > 0 { + Expect(msg.DeliveredCount).To(BeNumerically(">=", 1)) + } + } + } + } + }) + + It("should XReadGroup CLAIM work consistently on RESP2 and RESP3", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + streamName := "stream-resp-test" + err := client.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + Values: map[string]interface{}{"field1": "value1"}, + }).Err() + Expect(err).NotTo(HaveOccurred()) + + err = client.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + Values: map[string]interface{}{"field2": "value2"}, + }).Err() + Expect(err).NotTo(HaveOccurred()) + + groupName := "resp-test-group" + err = client.XGroupCreate(ctx, streamName, groupName, "0").Err() + Expect(err).NotTo(HaveOccurred()) + + _, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "consumer1", + Streams: []string{streamName, ">"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + time.Sleep(100 * time.Millisecond) + + // Test with RESP2 (protocol 2) + resp2Client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + Protocol: 2, + }) + defer resp2Client.Close() + + resp2Result, err := resp2Client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "consumer2", + Streams: []string{streamName, "0"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resp2Result).To(HaveLen(1)) + + // Test with RESP3 (protocol 3) + resp3Client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + Protocol: 3, + }) + defer resp3Client.Close() + + resp3Result, err := resp3Client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "consumer3", + Streams: []string{streamName, "0"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resp3Result).To(HaveLen(1)) + + Expect(len(resp2Result[0].Messages)).To(Equal(len(resp3Result[0].Messages))) + + for i := range resp2Result[0].Messages { + msg2 := resp2Result[0].Messages[i] + msg3 := resp3Result[0].Messages[i] + + Expect(msg2.ID).To(Equal(msg3.ID)) + + if msg2.MillisElapsedFromDelivery > 0 { + Expect(msg3.MillisElapsedFromDelivery).To(BeNumerically(">", 0)) + Expect(msg2.DeliveredCount).To(Equal(msg3.DeliveredCount)) + } + } + }) }) Describe("xinfo", func() { @@ -7904,7 +8802,7 @@ var _ = Describe("Commands", func() { }) }) - Describe("SlowLogGet", func() { + Describe("SlowLog", func() { It("returns slow query result", func() { const key = "slowlog-log-slower-than" @@ -7921,6 +8819,114 @@ var _ = Describe("Commands", func() { Expect(err).NotTo(HaveOccurred()) Expect(len(result)).NotTo(BeZero()) }) + + It("returns the number of slow queries", Label("NonRedisEnterprise"), func() { + // Reset slowlog + err := client.SlowLogReset(ctx).Err() + Expect(err).NotTo(HaveOccurred()) + + const key = "slowlog-log-slower-than" + + old := client.ConfigGet(ctx, key).Val() + // first slowlog entry is the config set command itself + client.ConfigSet(ctx, key, "0") + defer client.ConfigSet(ctx, key, old[key]) + + // Set a key to trigger a slow query, and this is the second slowlog entry + client.Set(ctx, "test", "true", 0) + result, err := client.SlowLogLen(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).Should(Equal(int64(2))) + + // Reset slowlog + err = client.SlowLogReset(ctx).Err() + Expect(err).NotTo(HaveOccurred()) + + // Check if slowlog is empty, this is the first slowlog entry after reset + result, err = client.SlowLogLen(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).Should(Equal(int64(1))) + }) + }) + + Describe("Latency", Label("NonRedisEnterprise"), func() { + It("returns latencies", func() { + const key = "latency-monitor-threshold" + + old := client.ConfigGet(ctx, key).Val() + client.ConfigSet(ctx, key, "1") + defer client.ConfigSet(ctx, key, old[key]) + + err := client.Do(ctx, "DEBUG", "SLEEP", 0.01).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err := client.Latency(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).NotTo(BeZero()) + }) + + It("reset all latencies", func() { + const key = "latency-monitor-threshold" + + result, err := client.Latency(ctx).Result() + // reset all latencies + err = client.LatencyReset(ctx).Err() + Expect(err).NotTo(HaveOccurred()) + + old := client.ConfigGet(ctx, key).Val() + client.ConfigSet(ctx, key, "1") + defer client.ConfigSet(ctx, key, old[key]) + + // get latency after reset + result, err = client.Latency(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).Should(Equal(0)) + + // create a new latency + err = client.Do(ctx, "DEBUG", "SLEEP", 0.01).Err() + Expect(err).NotTo(HaveOccurred()) + + // get latency after create a new latency + result, err = client.Latency(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).Should(Equal(1)) + + // reset all latencies again + err = client.LatencyReset(ctx).Err() + Expect(err).NotTo(HaveOccurred()) + + // get latency after reset again + result, err = client.Latency(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).Should(Equal(0)) + }) + + It("reset latencies by add event name args", func() { + const key = "latency-monitor-threshold" + + old := client.ConfigGet(ctx, key).Val() + client.ConfigSet(ctx, key, "1") + defer client.ConfigSet(ctx, key, old[key]) + + result, err := client.Latency(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).Should(Equal(0)) + + err = client.Do(ctx, "DEBUG", "SLEEP", 0.01).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err = client.Latency(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).Should(Equal(1)) + + // reset latency by event name + err = client.LatencyReset(ctx, result[0].Name).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err = client.Latency(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).Should(Equal(0)) + }) }) }) diff --git a/digest_test.go b/digest_test.go new file mode 100644 index 0000000000..b9d91979b1 --- /dev/null +++ b/digest_test.go @@ -0,0 +1,265 @@ +package redis_test + +import ( + "context" + "os" + "strconv" + "strings" + "testing" + + "github.com/redis/go-redis/v9" +) + +func init() { + // Initialize RedisVersion from environment variable for regular Go tests + // (Ginkgo tests initialize this in BeforeSuite) + if version := os.Getenv("REDIS_VERSION"); version != "" { + if v, err := strconv.ParseFloat(strings.Trim(version, "\""), 64); err == nil && v > 0 { + RedisVersion = v + } + } +} + +// skipIfRedisBelow84 checks if Redis version is below 8.4 and skips the test if so +func skipIfRedisBelow84(t *testing.T) { + if RedisVersion < 8.4 { + t.Skipf("Skipping test: Redis version %.1f < 8.4 (DIGEST command requires Redis 8.4+)", RedisVersion) + } +} + +// TestDigestBasic validates that the Digest command returns a uint64 value +func TestDigestBasic(t *testing.T) { + skipIfRedisBelow84(t) + + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client.Close() + + if err := client.Ping(ctx).Err(); err != nil { + t.Skipf("Redis not available: %v", err) + } + + client.Del(ctx, "digest-test-key") + + // Set a value + err := client.Set(ctx, "digest-test-key", "testvalue", 0).Err() + if err != nil { + t.Fatalf("Failed to set value: %v", err) + } + + // Get digest + digestCmd := client.Digest(ctx, "digest-test-key") + if err := digestCmd.Err(); err != nil { + t.Fatalf("Failed to get digest: %v", err) + } + + digest := digestCmd.Val() + if digest == 0 { + t.Error("Digest should not be zero for non-empty value") + } + + t.Logf("Digest for 'testvalue': %d (0x%016x)", digest, digest) + + // Verify same value produces same digest + digest2 := client.Digest(ctx, "digest-test-key").Val() + if digest != digest2 { + t.Errorf("Same value should produce same digest: %d != %d", digest, digest2) + } + + client.Del(ctx, "digest-test-key") +} + +// TestSetIFDEQWithDigest validates the SetIFDEQ command works with digests +func TestSetIFDEQWithDigest(t *testing.T) { + skipIfRedisBelow84(t) + + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client.Close() + + if err := client.Ping(ctx).Err(); err != nil { + t.Skipf("Redis not available: %v", err) + } + + client.Del(ctx, "cas-test-key") + + // Set initial value + initialValue := "initial-value" + err := client.Set(ctx, "cas-test-key", initialValue, 0).Err() + if err != nil { + t.Fatalf("Failed to set initial value: %v", err) + } + + // Get current digest + correctDigest := client.Digest(ctx, "cas-test-key").Val() + wrongDigest := uint64(12345) // arbitrary wrong digest + + // Test 1: SetIFDEQ with correct digest should succeed + result := client.SetIFDEQ(ctx, "cas-test-key", "new-value", correctDigest, 0) + if err := result.Err(); err != nil { + t.Errorf("SetIFDEQ with correct digest failed: %v", err) + } else { + t.Logf("✓ SetIFDEQ with correct digest succeeded") + } + + // Verify value was updated + val, err := client.Get(ctx, "cas-test-key").Result() + if err != nil { + t.Fatalf("Failed to get value: %v", err) + } + if val != "new-value" { + t.Errorf("Value not updated: got %q, want %q", val, "new-value") + } + + // Test 2: SetIFDEQ with wrong digest should fail + result = client.SetIFDEQ(ctx, "cas-test-key", "another-value", wrongDigest, 0) + if result.Err() != redis.Nil { + t.Errorf("SetIFDEQ with wrong digest should return redis.Nil, got: %v", result.Err()) + } else { + t.Logf("✓ SetIFDEQ with wrong digest correctly failed") + } + + // Verify value was NOT updated + val, err = client.Get(ctx, "cas-test-key").Result() + if err != nil { + t.Fatalf("Failed to get value: %v", err) + } + if val != "new-value" { + t.Errorf("Value should not have changed: got %q, want %q", val, "new-value") + } + + client.Del(ctx, "cas-test-key") +} + +// TestSetIFDNEWithDigest validates the SetIFDNE command works with digests +func TestSetIFDNEWithDigest(t *testing.T) { + skipIfRedisBelow84(t) + + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client.Close() + + if err := client.Ping(ctx).Err(); err != nil { + t.Skipf("Redis not available: %v", err) + } + + client.Del(ctx, "cad-test-key") + + // Set initial value + initialValue := "initial-value" + err := client.Set(ctx, "cad-test-key", initialValue, 0).Err() + if err != nil { + t.Fatalf("Failed to set initial value: %v", err) + } + + // Use an arbitrary different digest + wrongDigest := uint64(99999) // arbitrary different digest + + // Test 1: SetIFDNE with different digest should succeed + result := client.SetIFDNE(ctx, "cad-test-key", "new-value", wrongDigest, 0) + if err := result.Err(); err != nil { + t.Errorf("SetIFDNE with different digest failed: %v", err) + } else { + t.Logf("✓ SetIFDNE with different digest succeeded") + } + + // Verify value was updated + val, err := client.Get(ctx, "cad-test-key").Result() + if err != nil { + t.Fatalf("Failed to get value: %v", err) + } + if val != "new-value" { + t.Errorf("Value not updated: got %q, want %q", val, "new-value") + } + + // Test 2: SetIFDNE with matching digest should fail + newDigest := client.Digest(ctx, "cad-test-key").Val() + result = client.SetIFDNE(ctx, "cad-test-key", "another-value", newDigest, 0) + if result.Err() != redis.Nil { + t.Errorf("SetIFDNE with matching digest should return redis.Nil, got: %v", result.Err()) + } else { + t.Logf("✓ SetIFDNE with matching digest correctly failed") + } + + // Verify value was NOT updated + val, err = client.Get(ctx, "cad-test-key").Result() + if err != nil { + t.Fatalf("Failed to get value: %v", err) + } + if val != "new-value" { + t.Errorf("Value should not have changed: got %q, want %q", val, "new-value") + } + + client.Del(ctx, "cad-test-key") +} + +// TestDelExArgsWithDigest validates DelExArgs works with digest matching +func TestDelExArgsWithDigest(t *testing.T) { + skipIfRedisBelow84(t) + + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client.Close() + + if err := client.Ping(ctx).Err(); err != nil { + t.Skipf("Redis not available: %v", err) + } + + client.Del(ctx, "del-test-key") + + // Set a value + value := "delete-me" + err := client.Set(ctx, "del-test-key", value, 0).Err() + if err != nil { + t.Fatalf("Failed to set value: %v", err) + } + + // Get correct digest + correctDigest := client.Digest(ctx, "del-test-key").Val() + wrongDigest := uint64(54321) + + // Test 1: Delete with wrong digest should fail + deleted := client.DelExArgs(ctx, "del-test-key", redis.DelExArgs{ + Mode: "IFDEQ", + MatchDigest: wrongDigest, + }).Val() + + if deleted != 0 { + t.Errorf("Delete with wrong digest should not delete: got %d deletions", deleted) + } else { + t.Logf("✓ DelExArgs with wrong digest correctly refused to delete") + } + + // Verify key still exists + exists := client.Exists(ctx, "del-test-key").Val() + if exists != 1 { + t.Errorf("Key should still exist after failed delete") + } + + // Test 2: Delete with correct digest should succeed + deleted = client.DelExArgs(ctx, "del-test-key", redis.DelExArgs{ + Mode: "IFDEQ", + MatchDigest: correctDigest, + }).Val() + + if deleted != 1 { + t.Errorf("Delete with correct digest should delete: got %d deletions", deleted) + } else { + t.Logf("✓ DelExArgs with correct digest successfully deleted") + } + + // Verify key was deleted + exists = client.Exists(ctx, "del-test-key").Val() + if exists != 0 { + t.Errorf("Key should not exist after successful delete") + } +} + diff --git a/doctests/stream_tutorial_test.go b/doctests/stream_tutorial_test.go index e39919ea62..655021faaa 100644 --- a/doctests/stream_tutorial_test.go +++ b/doctests/stream_tutorial_test.go @@ -216,8 +216,8 @@ func ExampleClient_racefrance1() { // REMOVE_END // Output: - // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}] - // [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]}]}] + // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}] + // [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0}]}] // 4 } @@ -467,13 +467,13 @@ func ExampleClient_racefrance2() { // STEP_END // Output: - // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}] - // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]}] - // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}] - // [{1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}] + // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}] + // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0}] + // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}] + // [{1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}] // [] - // [{1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}] - // [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}]}] + // [{1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}] + // [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}]}] } func ExampleClient_xgroupcreate() { @@ -999,18 +999,18 @@ func ExampleClient_raceitaly() { // REMOVE_END // Output: - // [{race:italy [{1692632639151-0 map[rider:Castilla]}]}] + // [{race:italy [{1692632639151-0 map[rider:Castilla] 0 0}]}] // 1 // [{race:italy []}] - // [{race:italy [{1692632647899-0 map[rider:Royce]} {1692632662819-0 map[rider:Sam-Bodden]}]}] + // [{race:italy [{1692632647899-0 map[rider:Royce] 0 0} {1692632662819-0 map[rider:Sam-Bodden] 0 0}]}] // &{2 1692632647899-0 1692632662819-0 map[Bob:2]} - // [{1692632647899-0 map[rider:Royce]}] - // [{1692632647899-0 map[rider:Royce]}] - // [{1692632647899-0 map[rider:Royce]}] + // [{1692632647899-0 map[rider:Royce] 0 0}] + // [{1692632647899-0 map[rider:Royce] 0 0}] + // [{1692632647899-0 map[rider:Royce] 0 0}] // 1692632662819-0 // [] // 0-0 - // &{5 1 2 1 1692632678249-0 0-0 5 {1692632639151-0 map[rider:Castilla]} {1692632678249-0 map[rider:Norem]} 1692632639151-0} + // &{5 1 2 1 1692632678249-0 0-0 5 {1692632639151-0 map[rider:Castilla] 0 0} {1692632678249-0 map[rider:Norem] 0 0} 1692632639151-0} // [{italy_riders 3 2 1692632662819-0 3 2}] // 2 // 0 @@ -1085,7 +1085,7 @@ func ExampleClient_xdel() { // STEP_END // Output: - // [{1692633198206-0 map[rider:Wood]} {1692633208557-0 map[rider:Henshaw]}] + // [{1692633198206-0 map[rider:Wood] 0 0} {1692633208557-0 map[rider:Henshaw] 0 0}] // 1 - // [{1692633198206-0 map[rider:Wood]}] + // [{1692633198206-0 map[rider:Wood] 0 0}] } diff --git a/example/digest-optimistic-locking/README.md b/example/digest-optimistic-locking/README.md new file mode 100644 index 0000000000..37fa1e8e8f --- /dev/null +++ b/example/digest-optimistic-locking/README.md @@ -0,0 +1,200 @@ +# Redis Digest & Optimistic Locking Example + +This example demonstrates how to use Redis DIGEST command and digest-based optimistic locking with go-redis. + +## What is Redis DIGEST? + +The DIGEST command (Redis 8.4+) returns a 64-bit xxh3 hash of a key's value. This hash can be used for: + +- **Optimistic locking**: Update values only if they haven't changed +- **Change detection**: Detect if a value was modified +- **Conditional operations**: Delete or update based on expected content + +## Features Demonstrated + +1. **Basic Digest Usage**: Get digest from Redis and verify with client-side calculation +2. **Optimistic Locking with SetIFDEQ**: Update only if digest matches (value unchanged) +3. **Change Detection with SetIFDNE**: Update only if digest differs (value changed) +4. **Conditional Delete**: Delete only if digest matches expected value +5. **Client-Side Digest Generation**: Calculate digests without fetching from Redis + +## Requirements + +- Redis 8.4+ (for DIGEST command support) +- Go 1.18+ + +## Installation + +```bash +cd example/digest-optimistic-locking +go mod tidy +``` + +## Running the Example + +```bash +# Make sure Redis 8.4+ is running on localhost:6379 +redis-server + +# In another terminal, run the example +go run . +``` + +## Expected Output + +``` +=== Redis Digest & Optimistic Locking Example === + +1. Basic Digest Usage +--------------------- +Key: user:1000:name +Value: Alice +Digest: 7234567890123456789 (0x6478a1b2c3d4e5f6) +Client-calculated digest: 7234567890123456789 (0x6478a1b2c3d4e5f6) +✓ Digests match! + +2. Optimistic Locking with SetIFDEQ +------------------------------------ +Initial value: 100 +Current digest: 0x1234567890abcdef +✓ Update successful! New value: 150 +✓ Correctly rejected update with wrong digest + +3. Detecting Changes with SetIFDNE +----------------------------------- +Initial value: v1.0.0 +Old digest: 0xabcdef1234567890 +✓ Value changed! Updated to: v2.0.0 +✓ Correctly rejected: current value matches the digest + +4. Conditional Delete with DelExArgs +------------------------------------- +Created session: session:abc123 +Expected digest: 0x9876543210fedcba +✓ Correctly refused to delete (wrong digest) +✓ Successfully deleted with correct digest +✓ Session deleted + +5. Client-Side Digest Generation +--------------------------------- +Current price: $29.99 +Expected digest (calculated client-side): 0xfedcba0987654321 +✓ Price updated successfully to $24.99 + +Binary data example: +Binary data digest: 0x1122334455667788 +✓ Binary digest matches! + +=== All examples completed successfully! === +``` + +## How It Works + +### Digest Calculation + +Redis uses the **xxh3** hashing algorithm. To calculate digests client-side, use `github.com/zeebo/xxh3`: + +```go +import "github.com/zeebo/xxh3" + +// For strings +digest := xxh3.HashString("myvalue") + +// For binary data +digest := xxh3.Hash([]byte{0x01, 0x02, 0x03}) +``` + +### Optimistic Locking Pattern + +```go +// 1. Read current value and get its digest +currentValue := rdb.Get(ctx, "key").Val() +currentDigest := rdb.Digest(ctx, "key").Val() + +// 2. Perform business logic +newValue := processValue(currentValue) + +// 3. Update only if value hasn't changed +result := rdb.SetIFDEQ(ctx, "key", newValue, currentDigest, 0) +if result.Err() == redis.Nil { + // Value was modified by another client - retry or handle conflict +} +``` + +### Client-Side Digest (No Extra Round Trip) + +```go +// If you know the expected current value, calculate digest client-side +expectedValue := "100" +expectedDigest := xxh3.HashString(expectedValue) + +// Update without fetching digest from Redis first +result := rdb.SetIFDEQ(ctx, "counter", "150", expectedDigest, 0) +``` + +## Use Cases + +### 1. Distributed Counter with Conflict Detection + +```go +// Multiple clients can safely update a counter +currentValue := rdb.Get(ctx, "counter").Val() +currentDigest := rdb.Digest(ctx, "counter").Val() + +newValue := incrementCounter(currentValue) + +// Only succeeds if no other client modified it +if rdb.SetIFDEQ(ctx, "counter", newValue, currentDigest, 0).Err() == redis.Nil { + // Retry with new value +} +``` + +### 2. Session Management + +```go +// Delete session only if it contains expected data +sessionData := "user:1234:active" +expectedDigest := xxh3.HashString(sessionData) + +deleted := rdb.DelExArgs(ctx, "session:xyz", redis.DelExArgs{ + Mode: "IFDEQ", + MatchDigest: expectedDigest, +}).Val() +``` + +### 3. Configuration Updates + +```go +// Update config only if it changed +oldConfig := loadOldConfig() +oldDigest := xxh3.HashString(oldConfig) + +newConfig := loadNewConfig() + +// Only update if config actually changed +result := rdb.SetIFDNE(ctx, "config", newConfig, oldDigest, 0) +if result.Err() != redis.Nil { + fmt.Println("Config updated!") +} +``` + +## Advantages Over WATCH/MULTI/EXEC + +- **Simpler**: Single command instead of transaction +- **Faster**: No transaction overhead +- **Client-side digest**: Can calculate expected digest without fetching from Redis +- **Works with any command**: Not limited to transactions + +## Learn More + +- [Redis DIGEST command](https://redis.io/commands/digest/) +- [Redis SET command with IFDEQ/IFDNE](https://redis.io/commands/set/) +- [xxh3 hashing algorithm](https://github.com/Cyan4973/xxHash) +- [github.com/zeebo/xxh3](https://github.com/zeebo/xxh3) + +## Comparison: XXH3 vs XXH64 + +**Note**: Redis uses **XXH3**, not XXH64. If you have `github.com/cespare/xxhash/v2` in your project, it implements XXH64 which produces **different hash values**. You must use `github.com/zeebo/xxh3` for Redis DIGEST operations. + +See [XXHASH_LIBRARY_COMPARISON.md](../../XXHASH_LIBRARY_COMPARISON.md) for detailed comparison. + diff --git a/example/digest-optimistic-locking/go.mod b/example/digest-optimistic-locking/go.mod new file mode 100644 index 0000000000..d27d92020a --- /dev/null +++ b/example/digest-optimistic-locking/go.mod @@ -0,0 +1,16 @@ +module github.com/redis/go-redis/example/digest-optimistic-locking + +go 1.18 + +replace github.com/redis/go-redis/v9 => ../.. + +require ( + github.com/redis/go-redis/v9 v9.16.0 + github.com/zeebo/xxh3 v1.0.2 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/klauspost/cpuid/v2 v2.0.9 // indirect +) diff --git a/example/digest-optimistic-locking/go.sum b/example/digest-optimistic-locking/go.sum new file mode 100644 index 0000000000..1efe9a309b --- /dev/null +++ b/example/digest-optimistic-locking/go.sum @@ -0,0 +1,11 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= diff --git a/example/digest-optimistic-locking/main.go b/example/digest-optimistic-locking/main.go new file mode 100644 index 0000000000..2b380fc18a --- /dev/null +++ b/example/digest-optimistic-locking/main.go @@ -0,0 +1,245 @@ +package main + +import ( + "context" + "fmt" + "time" + + "github.com/redis/go-redis/v9" + "github.com/zeebo/xxh3" +) + +func main() { + ctx := context.Background() + + // Connect to Redis + rdb := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer rdb.Close() + + // Ping to verify connection + if err := rdb.Ping(ctx).Err(); err != nil { + fmt.Printf("Failed to connect to Redis: %v\n", err) + return + } + + fmt.Println("=== Redis Digest & Optimistic Locking Example ===") + fmt.Println() + + // Example 1: Basic Digest Usage + fmt.Println("1. Basic Digest Usage") + fmt.Println("---------------------") + basicDigestExample(ctx, rdb) + fmt.Println() + + // Example 2: Optimistic Locking with SetIFDEQ + fmt.Println("2. Optimistic Locking with SetIFDEQ") + fmt.Println("------------------------------------") + optimisticLockingExample(ctx, rdb) + fmt.Println() + + // Example 3: Detecting Changes with SetIFDNE + fmt.Println("3. Detecting Changes with SetIFDNE") + fmt.Println("-----------------------------------") + detectChangesExample(ctx, rdb) + fmt.Println() + + // Example 4: Conditional Delete with DelExArgs + fmt.Println("4. Conditional Delete with DelExArgs") + fmt.Println("-------------------------------------") + conditionalDeleteExample(ctx, rdb) + fmt.Println() + + // Example 5: Client-Side Digest Generation + fmt.Println("5. Client-Side Digest Generation") + fmt.Println("---------------------------------") + clientSideDigestExample(ctx, rdb) + fmt.Println() + + fmt.Println("=== All examples completed successfully! ===") +} + +// basicDigestExample demonstrates getting a digest from Redis +func basicDigestExample(ctx context.Context, rdb *redis.Client) { + // Set a value + key := "user:1000:name" + value := "Alice" + rdb.Set(ctx, key, value, 0) + + // Get the digest + digest := rdb.Digest(ctx, key).Val() + + fmt.Printf("Key: %s\n", key) + fmt.Printf("Value: %s\n", value) + fmt.Printf("Digest: %d (0x%016x)\n", digest, digest) + + // Verify with client-side calculation + clientDigest := xxh3.HashString(value) + fmt.Printf("Client-calculated digest: %d (0x%016x)\n", clientDigest, clientDigest) + + if digest == clientDigest { + fmt.Println("✓ Digests match!") + } +} + +// optimisticLockingExample demonstrates using SetIFDEQ for optimistic locking +func optimisticLockingExample(ctx context.Context, rdb *redis.Client) { + key := "counter" + + // Initial value + rdb.Set(ctx, key, "100", 0) + fmt.Printf("Initial value: %s\n", rdb.Get(ctx, key).Val()) + + // Get current digest + currentDigest := rdb.Digest(ctx, key).Val() + fmt.Printf("Current digest: 0x%016x\n", currentDigest) + + // Simulate some processing time + time.Sleep(100 * time.Millisecond) + + // Try to update only if value hasn't changed (digest matches) + newValue := "150" + result := rdb.SetIFDEQ(ctx, key, newValue, currentDigest, 0) + + if result.Err() == redis.Nil { + fmt.Println("✗ Update failed: value was modified by another client") + } else if result.Err() != nil { + fmt.Printf("✗ Error: %v\n", result.Err()) + } else { + fmt.Printf("✓ Update successful! New value: %s\n", rdb.Get(ctx, key).Val()) + } + + // Try again with wrong digest (simulating concurrent modification) + wrongDigest := uint64(12345) + result = rdb.SetIFDEQ(ctx, key, "200", wrongDigest, 0) + + if result.Err() == redis.Nil { + fmt.Println("✓ Correctly rejected update with wrong digest") + } +} + +// detectChangesExample demonstrates using SetIFDNE to detect if a value changed +func detectChangesExample(ctx context.Context, rdb *redis.Client) { + key := "config:version" + + // Set initial value + oldValue := "v1.0.0" + rdb.Set(ctx, key, oldValue, 0) + fmt.Printf("Initial value: %s\n", oldValue) + + // Calculate digest of a DIFFERENT value (what we expect it NOT to be) + unwantedValue := "v0.9.0" + unwantedDigest := xxh3.HashString(unwantedValue) + fmt.Printf("Unwanted value digest: 0x%016x\n", unwantedDigest) + + // Update to new value only if current value is NOT the unwanted value + // (i.e., only if digest does NOT match unwantedDigest) + newValue := "v2.0.0" + result := rdb.SetIFDNE(ctx, key, newValue, unwantedDigest, 0) + + if result.Err() == redis.Nil { + fmt.Println("✗ Current value matches unwanted value (digest matches)") + } else if result.Err() != nil { + fmt.Printf("✗ Error: %v\n", result.Err()) + } else { + fmt.Printf("✓ Current value is different from unwanted value! Updated to: %s\n", rdb.Get(ctx, key).Val()) + } + + // Try to update again, but this time the digest matches current value (should fail) + currentDigest := rdb.Digest(ctx, key).Val() + result = rdb.SetIFDNE(ctx, key, "v3.0.0", currentDigest, 0) + + if result.Err() == redis.Nil { + fmt.Println("✓ Correctly rejected: current value matches the digest (IFDNE failed)") + } +} + +// conditionalDeleteExample demonstrates using DelExArgs with digest +func conditionalDeleteExample(ctx context.Context, rdb *redis.Client) { + key := "session:abc123" + value := "user_data_here" + + // Set a value + rdb.Set(ctx, key, value, 0) + fmt.Printf("Created session: %s\n", key) + + // Calculate expected digest + expectedDigest := xxh3.HashString(value) + fmt.Printf("Expected digest: 0x%016x\n", expectedDigest) + + // Try to delete with wrong digest (should fail) + wrongDigest := uint64(99999) + deleted := rdb.DelExArgs(ctx, key, redis.DelExArgs{ + Mode: "IFDEQ", + MatchDigest: wrongDigest, + }).Val() + + if deleted == 0 { + fmt.Println("✓ Correctly refused to delete (wrong digest)") + } + + // Delete with correct digest (should succeed) + deleted = rdb.DelExArgs(ctx, key, redis.DelExArgs{ + Mode: "IFDEQ", + MatchDigest: expectedDigest, + }).Val() + + if deleted == 1 { + fmt.Println("✓ Successfully deleted with correct digest") + } + + // Verify deletion + exists := rdb.Exists(ctx, key).Val() + if exists == 0 { + fmt.Println("✓ Session deleted") + } +} + +// clientSideDigestExample demonstrates calculating digests without fetching from Redis +func clientSideDigestExample(ctx context.Context, rdb *redis.Client) { + key := "product:1001:price" + + // Scenario: We know the expected current value + expectedCurrentValue := "29.99" + newValue := "24.99" + + // Set initial value + rdb.Set(ctx, key, expectedCurrentValue, 0) + fmt.Printf("Current price: $%s\n", expectedCurrentValue) + + // Calculate digest client-side (no need to fetch from Redis!) + expectedDigest := xxh3.HashString(expectedCurrentValue) + fmt.Printf("Expected digest (calculated client-side): 0x%016x\n", expectedDigest) + + // Update price only if it matches our expectation + result := rdb.SetIFDEQ(ctx, key, newValue, expectedDigest, 0) + + if result.Err() == redis.Nil { + fmt.Println("✗ Price was already changed by someone else") + actualValue := rdb.Get(ctx, key).Val() + fmt.Printf(" Actual current price: $%s\n", actualValue) + } else if result.Err() != nil { + fmt.Printf("✗ Error: %v\n", result.Err()) + } else { + fmt.Printf("✓ Price updated successfully to $%s\n", newValue) + } + + // Demonstrate with binary data + fmt.Println("\nBinary data example:") + binaryKey := "image:thumbnail" + binaryData := []byte{0xFF, 0xD8, 0xFF, 0xE0} // JPEG header + + rdb.Set(ctx, binaryKey, binaryData, 0) + + // Calculate digest for binary data + binaryDigest := xxh3.Hash(binaryData) + fmt.Printf("Binary data digest: 0x%016x\n", binaryDigest) + + // Verify it matches Redis + redisDigest := rdb.Digest(ctx, binaryKey).Val() + if binaryDigest == redisDigest { + fmt.Println("✓ Binary digest matches!") + } +} + diff --git a/export_test.go b/export_test.go index c1b77683f3..97b6179a44 100644 --- a/export_test.go +++ b/export_test.go @@ -106,3 +106,7 @@ func (c *ModuleLoadexConfig) ToArgs() []interface{} { func ShouldRetry(err error, retryTimeout bool) bool { return shouldRetry(err, retryTimeout) } + +func JoinErrors(errs []error) string { + return joinErrors(errs) +} diff --git a/extra/rediscensus/go.mod b/extra/rediscensus/go.mod index d4272c977b..21271aedf9 100644 --- a/extra/rediscensus/go.mod +++ b/extra/rediscensus/go.mod @@ -22,4 +22,3 @@ retract ( v9.7.2 // This version was accidentally released. Please use version 9.7.3 instead. v9.5.3 // This version was accidentally released. Please use version 9.6.0 instead. ) - diff --git a/extra/rediscmd/go.mod b/extra/rediscmd/go.mod index d8c03b6af4..56d6408016 100644 --- a/extra/rediscmd/go.mod +++ b/extra/rediscmd/go.mod @@ -19,4 +19,3 @@ retract ( v9.7.2 // This version was accidentally released. Please use version 9.7.3 instead. v9.5.3 // This version was accidentally released. Please use version 9.6.0 instead. ) - diff --git a/extra/redisotel/go.mod b/extra/redisotel/go.mod index 23cec11a2d..ade870c568 100644 --- a/extra/redisotel/go.mod +++ b/extra/redisotel/go.mod @@ -27,4 +27,3 @@ retract ( v9.7.2 // This version was accidentally released. Please use version 9.7.3 instead. v9.5.3 // This version was accidentally released. Please use version 9.6.0 instead. ) - diff --git a/extra/redisprometheus/go.mod b/extra/redisprometheus/go.mod index fd4e2d93ee..d704f9a7c4 100644 --- a/extra/redisprometheus/go.mod +++ b/extra/redisprometheus/go.mod @@ -26,4 +26,3 @@ retract ( v9.7.2 // This version was accidentally released. Please use version 9.7.3 instead. v9.5.3 // This version was accidentally released. Please use version 9.6.0 instead. ) - diff --git a/search_commands.go b/search_commands.go index f0ca1bfede..3a3179f4ed 100644 --- a/search_commands.go +++ b/search_commands.go @@ -29,6 +29,8 @@ type SearchCmdable interface { FTDropIndexWithArgs(ctx context.Context, index string, options *FTDropIndexOptions) *StatusCmd FTExplain(ctx context.Context, index string, query string) *StringCmd FTExplainWithArgs(ctx context.Context, index string, query string, options *FTExplainOptions) *StringCmd + FTHybrid(ctx context.Context, index string, searchExpr string, vectorField string, vectorData Vector) *FTHybridCmd + FTHybridWithArgs(ctx context.Context, index string, options *FTHybridOptions) *FTHybridCmd FTInfo(ctx context.Context, index string) *FTInfoCmd FTSpellCheck(ctx context.Context, index string, query string) *FTSpellCheckCmd FTSpellCheckWithArgs(ctx context.Context, index string, query string, options *FTSpellCheckOptions) *FTSpellCheckCmd @@ -344,6 +346,85 @@ type FTSearchOptions struct { DialectVersion int } +// FTHybridCombineMethod represents the fusion method for combining search and vector results +type FTHybridCombineMethod string + +const ( + FTHybridCombineRRF FTHybridCombineMethod = "RRF" + FTHybridCombineLinear FTHybridCombineMethod = "LINEAR" + FTHybridCombineFunction FTHybridCombineMethod = "FUNCTION" +) + +// FTHybridSearchExpression represents a search expression in hybrid search +type FTHybridSearchExpression struct { + Query string + Scorer string + ScorerParams []interface{} + YieldScoreAs string +} + +// FTHybridVectorExpression represents a vector expression in hybrid search +type FTHybridVectorExpression struct { + VectorField string + VectorData Vector + Method string // KNN or RANGE + MethodParams []interface{} + Filter string + YieldScoreAs string +} + +// FTHybridCombineOptions represents options for result fusion +type FTHybridCombineOptions struct { + Method FTHybridCombineMethod + Count int + Window int // For RRF + Constant float64 // For RRF + Alpha float64 // For LINEAR + Beta float64 // For LINEAR + YieldScoreAs string +} + +// FTHybridGroupBy represents GROUP BY functionality +type FTHybridGroupBy struct { + Count int + Fields []string + ReduceFunc string + ReduceCount int + ReduceParams []interface{} +} + +// FTHybridApply represents APPLY functionality +type FTHybridApply struct { + Expression string + AsField string +} + +// FTHybridWithCursor represents cursor configuration for hybrid search +type FTHybridWithCursor struct { + Count int // Number of results to return per cursor read + MaxIdle int // Maximum idle time in milliseconds before cursor is automatically deleted +} + +// FTHybridOptions hold options that can be passed to the FT.HYBRID command +type FTHybridOptions struct { + CountExpressions int // Number of search/vector expressions + SearchExpressions []FTHybridSearchExpression // Multiple search expressions + VectorExpressions []FTHybridVectorExpression // Multiple vector expressions + Combine *FTHybridCombineOptions // Fusion step options + Load []string // Projected fields + GroupBy *FTHybridGroupBy // Aggregation grouping + Apply []FTHybridApply // Field transformations + SortBy []FTSearchSortBy // Reuse from FTSearch + Filter string // Post-filter expression + LimitOffset int // Result limiting + Limit int + Params map[string]interface{} // Parameter substitution + ExplainScore bool // Include score explanations + Timeout int // Runtime timeout + WithCursor bool // Enable cursor support for large result sets + WithCursorOptions *FTHybridWithCursor // Cursor configuration options +} + type FTSynDumpResult struct { Term string Synonyms []string @@ -1819,6 +1900,66 @@ func (cmd *FTSearchCmd) readReply(rd *proto.Reader) (err error) { return nil } +// FTHybridResult represents the result of a hybrid search operation +type FTHybridResult = FTSearchResult + +type FTHybridCmd struct { + baseCmd + val FTHybridResult + options *FTHybridOptions +} + +func newFTHybridCmd(ctx context.Context, options *FTHybridOptions, args ...interface{}) *FTHybridCmd { + return &FTHybridCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + options: options, + } +} + +func (cmd *FTHybridCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *FTHybridCmd) SetVal(val FTHybridResult) { + cmd.val = val +} + +func (cmd *FTHybridCmd) Result() (FTHybridResult, error) { + return cmd.val, cmd.err +} + +func (cmd *FTHybridCmd) Val() FTHybridResult { + return cmd.val +} + +func (cmd *FTHybridCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *FTHybridCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} + +func (cmd *FTHybridCmd) readReply(rd *proto.Reader) (err error) { + data, err := rd.ReadSlice() + if err != nil { + return err + } + // Parse hybrid search results similarly to FT.SEARCH + // We can reuse the FTSearch parser since the result format should be similar + searchResult, err := parseFTSearch(data, false, true, false, false) + if err != nil { + return err + } + + // FTSearchResult and FTHybridResult are aliases + cmd.val = searchResult + return nil +} + // FTSearch - Executes a search query on an index. // The 'index' parameter specifies the index to search, and the 'query' parameter specifies the search query. // For more information, please refer to the Redis documentation about [FT.SEARCH]. @@ -2191,3 +2332,184 @@ func (c cmdable) FTTagVals(ctx context.Context, index string, field string) *Str _ = c(ctx, cmd) return cmd } + +// FTHybrid - Executes a hybrid search combining full-text search and vector similarity +// The 'index' parameter specifies the index to search, 'searchExpr' is the search query, +// 'vectorField' is the name of the vector field, and 'vectorData' is the vector to search with. +// FTHybrid is still experimental, the command behaviour and signature may change +func (c cmdable) FTHybrid(ctx context.Context, index string, searchExpr string, vectorField string, vectorData Vector) *FTHybridCmd { + options := &FTHybridOptions{ + CountExpressions: 2, + SearchExpressions: []FTHybridSearchExpression{ + {Query: searchExpr}, + }, + VectorExpressions: []FTHybridVectorExpression{ + {VectorField: vectorField, VectorData: vectorData}, + }, + } + return c.FTHybridWithArgs(ctx, index, options) +} + +// FTHybridWithArgs - Executes a hybrid search with advanced options +// FTHybridWithArgs is still experimental, the command behaviour and signature may change +func (c cmdable) FTHybridWithArgs(ctx context.Context, index string, options *FTHybridOptions) *FTHybridCmd { + args := []interface{}{"FT.HYBRID", index} + + if options != nil { + // Add search expressions + for _, searchExpr := range options.SearchExpressions { + args = append(args, "SEARCH", searchExpr.Query) + + if searchExpr.Scorer != "" { + args = append(args, "SCORER", searchExpr.Scorer) + if len(searchExpr.ScorerParams) > 0 { + args = append(args, searchExpr.ScorerParams...) + } + } + + if searchExpr.YieldScoreAs != "" { + args = append(args, "YIELD_SCORE_AS", searchExpr.YieldScoreAs) + } + } + + // Add vector expressions + for _, vectorExpr := range options.VectorExpressions { + args = append(args, "VSIM", "@"+vectorExpr.VectorField) + args = append(args, vectorExpr.VectorData.Value()...) + + if vectorExpr.Method != "" { + args = append(args, vectorExpr.Method) + if len(vectorExpr.MethodParams) > 0 { + args = append(args, vectorExpr.MethodParams...) + } + } + + if vectorExpr.Filter != "" { + args = append(args, "FILTER", vectorExpr.Filter) + } + + if vectorExpr.YieldScoreAs != "" { + args = append(args, "YIELD_SCORE_AS", vectorExpr.YieldScoreAs) + } + } + + // Add combine/fusion options + if options.Combine != nil { + args = append(args, "COMBINE", string(options.Combine.Method)) + + if options.Combine.Count > 0 { + args = append(args, options.Combine.Count) + } + + switch options.Combine.Method { + case FTHybridCombineRRF: + if options.Combine.Window > 0 { + args = append(args, "WINDOW", options.Combine.Window) + } + if options.Combine.Constant > 0 { + args = append(args, "CONSTANT", options.Combine.Constant) + } + case FTHybridCombineLinear: + if options.Combine.Alpha > 0 { + args = append(args, "ALPHA", options.Combine.Alpha) + } + if options.Combine.Beta > 0 { + args = append(args, "BETA", options.Combine.Beta) + } + } + + if options.Combine.YieldScoreAs != "" { + args = append(args, "YIELD_SCORE_AS", options.Combine.YieldScoreAs) + } + } + + // Add LOAD (projected fields) + if len(options.Load) > 0 { + args = append(args, "LOAD", len(options.Load)) + for _, field := range options.Load { + args = append(args, field) + } + } + + // Add GROUPBY + if options.GroupBy != nil { + args = append(args, "GROUPBY", options.GroupBy.Count) + for _, field := range options.GroupBy.Fields { + args = append(args, field) + } + if options.GroupBy.ReduceFunc != "" { + args = append(args, "REDUCE", options.GroupBy.ReduceFunc, options.GroupBy.ReduceCount) + args = append(args, options.GroupBy.ReduceParams...) + } + } + + // Add APPLY transformations + for _, apply := range options.Apply { + args = append(args, "APPLY", apply.Expression, "AS", apply.AsField) + } + + // Add SORTBY + if len(options.SortBy) > 0 { + args = append(args, "SORTBY", len(options.SortBy)) + for _, sortBy := range options.SortBy { + args = append(args, sortBy.FieldName) + if sortBy.Asc && sortBy.Desc { + cmd := newFTHybridCmd(ctx, options, args...) + cmd.SetErr(fmt.Errorf("FT.HYBRID: ASC and DESC are mutually exclusive")) + return cmd + } + if sortBy.Asc { + args = append(args, "ASC") + } + if sortBy.Desc { + args = append(args, "DESC") + } + } + } + + // Add FILTER (post-filter) + if options.Filter != "" { + args = append(args, "FILTER", options.Filter) + } + + // Add LIMIT + if options.LimitOffset >= 0 && options.Limit > 0 || options.LimitOffset > 0 && options.Limit == 0 { + args = append(args, "LIMIT", options.LimitOffset, options.Limit) + } + + // Add PARAMS + if len(options.Params) > 0 { + args = append(args, "PARAMS", len(options.Params)*2) + for key, value := range options.Params { + args = append(args, key, value) + } + } + + // Add EXPLAINSCORE + if options.ExplainScore { + args = append(args, "EXPLAINSCORE") + } + + // Add TIMEOUT + if options.Timeout > 0 { + args = append(args, "TIMEOUT", options.Timeout) + } + + // Add WITHCURSOR support + if options.WithCursor { + args = append(args, "WITHCURSOR") + if options.WithCursorOptions != nil { + if options.WithCursorOptions.Count > 0 { + args = append(args, "COUNT", options.WithCursorOptions.Count) + } + if options.WithCursorOptions.MaxIdle > 0 { + args = append(args, "MAXIDLE", options.WithCursorOptions.MaxIdle) + } + } + } + } + + cmd := newFTHybridCmd(ctx, options, args...) + _ = c(ctx, cmd) + return cmd +} diff --git a/search_test.go b/search_test.go index a939a585e6..f4afcc0896 100644 --- a/search_test.go +++ b/search_test.go @@ -3561,4 +3561,418 @@ var _ = Describe("RediSearch commands Resp 3", Label("search"), func() { Expect(res2).ToNot(BeEmpty()) }).ShouldNot(Panic()) }) + + // Hybrid Search Tests + Describe("FT.HYBRID Commands", func() { + BeforeEach(func() { + // Create index with text, numeric, tag fields and vector fields + err := client.FTCreate(ctx, "hybrid_idx", &redis.FTCreateOptions{}, + &redis.FieldSchema{FieldName: "description", FieldType: redis.SearchFieldTypeText}, + &redis.FieldSchema{FieldName: "price", FieldType: redis.SearchFieldTypeNumeric}, + &redis.FieldSchema{FieldName: "color", FieldType: redis.SearchFieldTypeTag}, + &redis.FieldSchema{FieldName: "item_type", FieldType: redis.SearchFieldTypeTag}, + &redis.FieldSchema{FieldName: "size", FieldType: redis.SearchFieldTypeNumeric}, + &redis.FieldSchema{ + FieldName: "embedding", + FieldType: redis.SearchFieldTypeVector, + VectorArgs: &redis.FTVectorArgs{ + FlatOptions: &redis.FTFlatOptions{ + Type: "FLOAT32", + Dim: 4, + DistanceMetric: "L2", + }, + }, + }, + &redis.FieldSchema{ + FieldName: "embedding_hnsw", + FieldType: redis.SearchFieldTypeVector, + VectorArgs: &redis.FTVectorArgs{ + HNSWOptions: &redis.FTHNSWOptions{ + Type: "FLOAT32", + Dim: 4, + DistanceMetric: "L2", + }, + }, + }).Err() + Expect(err).NotTo(HaveOccurred()) + WaitForIndexing(client, "hybrid_idx") + + // Add test data + items := []struct { + key string + description string + price int + color string + itemType string + size int + embedding []float32 + }{ + {"item:0", "red shoes", 15, "red", "shoes", 10, []float32{1.0, 2.0, 7.0, 8.0}}, + {"item:1", "green shoes with red laces", 16, "green", "shoes", 11, []float32{1.0, 4.0, 7.0, 8.0}}, + {"item:2", "red dress", 17, "red", "dress", 12, []float32{1.0, 2.0, 6.0, 5.0}}, + {"item:3", "orange dress", 18, "orange", "dress", 10, []float32{2.0, 3.0, 6.0, 5.0}}, + {"item:4", "black shoes", 19, "black", "shoes", 11, []float32{5.0, 6.0, 7.0, 8.0}}, + } + + for _, item := range items { + client.HSet(ctx, item.key, map[string]interface{}{ + "description": item.description, + "price": item.price, + "color": item.color, + "item_type": item.itemType, + "size": item.size, + "embedding": encodeFloat32Vector(item.embedding), + "embedding_hnsw": encodeFloat32Vector(item.embedding), + }) + } + }) + + It("should perform basic hybrid search", Label("search", "fthybrid"), func() { + SkipBeforeRedisVersion(8.4, "no support") + // Basic hybrid search combining text and vector search + searchQuery := "@color:{red} OR @color:{green}" + vectorData := encodeFloat32Vector([]float32{-100, -200, -200, -300}) + + res, err := client.FTHybrid(ctx, "hybrid_idx", searchQuery, "embedding", &redis.VectorFP32{Val: vectorData}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Total).To(BeNumerically(">", 0)) + Expect(len(res.Docs)).To(BeNumerically(">", 0)) + + // Check that results contain expected fields + for _, doc := range res.Docs { + Expect(doc.ID).ToNot(BeEmpty()) + Expect(doc.Fields).To(HaveKey("__score")) + Expect(doc.Fields).To(HaveKey("__key")) + } + }) + + It("should perform hybrid search with scorer", Label("search", "fthybrid", "scorer"), func() { + SkipBeforeRedisVersion(8.4, "no support") + // Test with TFIDF scorer + options := &redis.FTHybridOptions{ + CountExpressions: 2, + SearchExpressions: []redis.FTHybridSearchExpression{ + { + Query: "@color:{red}", + Scorer: "TFIDF", + }, + }, + VectorExpressions: []redis.FTHybridVectorExpression{ + { + VectorField: "embedding", + VectorData: &redis.VectorFP32{Val: encodeFloat32Vector([]float32{1, 2, 2, 3})}, + }, + }, + Load: []string{"description", "color", "price", "size", "__score"}, + LimitOffset: 0, + Limit: 3, + } + + res, err := client.FTHybridWithArgs(ctx, "hybrid_idx", options).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Total).To(BeNumerically(">", 0)) + Expect(len(res.Docs)).To(BeNumerically("<=", 3)) + + // Verify that we get red items + for _, doc := range res.Docs { + if color, exists := doc.Fields["color"]; exists { + Expect(color).To(Equal("red")) + } + } + }) + + It("should perform hybrid search with vector filter", Label("search", "fthybrid", "filter"), func() { + SkipBeforeRedisVersion(8.4, "no support") + // This query won't have results from search, so we can validate vector filter + options := &redis.FTHybridOptions{ + CountExpressions: 2, + SearchExpressions: []redis.FTHybridSearchExpression{ + {Query: "@color:{none}"}, // This won't match anything + }, + VectorExpressions: []redis.FTHybridVectorExpression{ + { + VectorField: "embedding", + VectorData: &redis.VectorFP32{Val: encodeFloat32Vector([]float32{1, 2, 2, 3})}, + Filter: "@price:[15 16] @size:[10 11]", + }, + }, + Load: []string{"description", "color", "price", "size", "__score"}, + } + + res, err := client.FTHybridWithArgs(ctx, "hybrid_idx", options).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Total).To(BeNumerically(">", 0)) + + // Verify that all results match the filter criteria + for _, doc := range res.Docs { + if price, exists := doc.Fields["price"]; exists { + priceStr := fmt.Sprintf("%v", price) + priceFloat, err := helper.ParseFloat(priceStr) + Expect(err).NotTo(HaveOccurred()) + Expect(priceFloat).To(BeNumerically(">=", 15)) + Expect(priceFloat).To(BeNumerically("<=", 16)) + } + if size, exists := doc.Fields["size"]; exists { + sizeStr := fmt.Sprintf("%v", size) + sizeFloat, err := helper.ParseFloat(sizeStr) + Expect(err).NotTo(HaveOccurred()) + Expect(sizeFloat).To(BeNumerically(">=", 10)) + Expect(sizeFloat).To(BeNumerically("<=", 11)) + } + } + }) + + It("should perform hybrid search with KNN method", Label("search", "fthybrid", "knn"), func() { + SkipBeforeRedisVersion(8.4, "no support") + options := &redis.FTHybridOptions{ + CountExpressions: 2, + SearchExpressions: []redis.FTHybridSearchExpression{ + {Query: "@color:{none}"}, // This won't match anything + }, + VectorExpressions: []redis.FTHybridVectorExpression{ + { + VectorField: "embedding", + VectorData: &redis.VectorFP32{Val: encodeFloat32Vector([]float32{1, 2, 2, 3})}, + Method: "KNN", + MethodParams: []interface{}{3}, // K=3 + }, + }, + } + + res, err := client.FTHybridWithArgs(ctx, "hybrid_idx", options).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Total).To(Equal(3)) // Should return exactly K=3 results + Expect(len(res.Docs)).To(Equal(3)) + }) + + It("should perform hybrid search with RANGE method", Label("search", "fthybrid", "range"), func() { + SkipBeforeRedisVersion(8.4, "no support") + options := &redis.FTHybridOptions{ + CountExpressions: 2, + SearchExpressions: []redis.FTHybridSearchExpression{ + {Query: "@color:{none}"}, // This won't match anything + }, + VectorExpressions: []redis.FTHybridVectorExpression{ + { + VectorField: "embedding", + VectorData: &redis.VectorFP32{Val: encodeFloat32Vector([]float32{1, 2, 7, 6})}, + Method: "RANGE", + MethodParams: []interface{}{2}, // RADIUS=2 + }, + }, + LimitOffset: 0, + Limit: 3, + } + + res, err := client.FTHybridWithArgs(ctx, "hybrid_idx", options).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Total).To(BeNumerically(">", 0)) + Expect(len(res.Docs)).To(BeNumerically("<=", 3)) + }) + + It("should perform hybrid search with LINEAR combine method", Label("search", "fthybrid", "combine"), func() { + SkipBeforeRedisVersion(8.4, "no support") + options := &redis.FTHybridOptions{ + CountExpressions: 2, + SearchExpressions: []redis.FTHybridSearchExpression{ + {Query: "@color:{red}"}, + }, + VectorExpressions: []redis.FTHybridVectorExpression{ + { + VectorField: "embedding", + VectorData: &redis.VectorFP32{Val: encodeFloat32Vector([]float32{1, 2, 7, 6})}, + }, + }, + Combine: &redis.FTHybridCombineOptions{ + Method: redis.FTHybridCombineLinear, + Alpha: 0.5, + Beta: 0.5, + }, + LimitOffset: 0, + Limit: 3, + } + + res, err := client.FTHybridWithArgs(ctx, "hybrid_idx", options).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Total).To(BeNumerically(">", 0)) + Expect(len(res.Docs)).To(BeNumerically("<=", 3)) + }) + + It("should perform hybrid search with RRF combine method", Label("search", "fthybrid", "rrf"), func() { + SkipBeforeRedisVersion(8.4, "no support") + options := &redis.FTHybridOptions{ + CountExpressions: 2, + SearchExpressions: []redis.FTHybridSearchExpression{ + {Query: "@color:{red}"}, + }, + VectorExpressions: []redis.FTHybridVectorExpression{ + { + VectorField: "embedding", + VectorData: &redis.VectorFP32{Val: encodeFloat32Vector([]float32{1, 2, 7, 6})}, + }, + }, + Combine: &redis.FTHybridCombineOptions{ + Method: redis.FTHybridCombineRRF, + Window: 3, + Constant: 0.5, + }, + LimitOffset: 0, + Limit: 3, + } + + res, err := client.FTHybridWithArgs(ctx, "hybrid_idx", options).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Total).To(BeNumerically(">", 0)) + Expect(len(res.Docs)).To(BeNumerically("<=", 3)) + }) + + It("should perform hybrid search with LOAD and APPLY", Label("search", "fthybrid", "load", "apply"), func() { + SkipBeforeRedisVersion(8.4, "no support") + options := &redis.FTHybridOptions{ + CountExpressions: 2, + SearchExpressions: []redis.FTHybridSearchExpression{ + {Query: "@color:{red}"}, + }, + VectorExpressions: []redis.FTHybridVectorExpression{ + { + VectorField: "embedding", + VectorData: &redis.VectorFP32{Val: encodeFloat32Vector([]float32{1, 2, 7, 6})}, + }, + }, + Load: []string{"description", "color", "price", "size", "__score"}, + Apply: []redis.FTHybridApply{ + { + Expression: "@price - (@price * 0.1)", + AsField: "price_discount", + }, + { + Expression: "@price_discount * 0.2", + AsField: "tax_discount", + }, + }, + LimitOffset: 0, + Limit: 3, + } + + res, err := client.FTHybridWithArgs(ctx, "hybrid_idx", options).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Total).To(BeNumerically(">", 0)) + Expect(len(res.Docs)).To(BeNumerically("<=", 3)) + + // Verify that applied fields exist + for _, doc := range res.Docs { + Expect(doc.Fields).To(HaveKey("price_discount")) + Expect(doc.Fields).To(HaveKey("tax_discount")) + } + }) + + It("should perform hybrid search with parameters", Label("search", "fthybrid", "params"), func() { + SkipBeforeRedisVersion(8.4, "no support") + params := map[string]interface{}{ + "$vector": encodeFloat32Vector([]float32{1, 2, 7, 6}), + "$discount": 0.1, + } + + options := &redis.FTHybridOptions{ + CountExpressions: 2, + SearchExpressions: []redis.FTHybridSearchExpression{ + {Query: "@color:{red}"}, + }, + VectorExpressions: []redis.FTHybridVectorExpression{ + { + VectorField: "embedding", + VectorData: &redis.VectorRef{Name: "$vector"}, + }, + }, + Load: []string{"description", "color", "price"}, + Apply: []redis.FTHybridApply{ + { + Expression: "@price - (@price * $discount)", + AsField: "price_discount", + }, + }, + Params: params, + LimitOffset: 0, + Limit: 3, + } + + res, err := client.FTHybridWithArgs(ctx, "hybrid_idx", options).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Total).To(BeNumerically(">", 0)) + Expect(len(res.Docs)).To(BeNumerically("<=", 3)) + + // Verify that parameter substitution worked + for _, doc := range res.Docs { + Expect(doc.Fields).To(HaveKey("price_discount")) + } + }) + + It("should perform hybrid search with LIMIT", Label("search", "fthybrid", "limit"), func() { + SkipBeforeRedisVersion(8.4, "no support") + options := &redis.FTHybridOptions{ + CountExpressions: 2, + SearchExpressions: []redis.FTHybridSearchExpression{ + {Query: "@color:{red}"}, + }, + VectorExpressions: []redis.FTHybridVectorExpression{ + { + VectorField: "embedding", + VectorData: &redis.VectorFP32{Val: encodeFloat32Vector([]float32{1, 2, 7, 6})}, + }, + }, + LimitOffset: 0, + Limit: 2, + } + + res, err := client.FTHybridWithArgs(ctx, "hybrid_idx", options).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(res.Docs)).To(BeNumerically("<=", 2)) + }) + + It("should perform hybrid search with SORTBY", Label("search", "fthybrid", "sortby"), func() { + SkipBeforeRedisVersion(8.4, "no support") + options := &redis.FTHybridOptions{ + CountExpressions: 2, + SearchExpressions: []redis.FTHybridSearchExpression{ + {Query: "@color:{red|green}"}, + }, + VectorExpressions: []redis.FTHybridVectorExpression{ + { + VectorField: "embedding", + VectorData: &redis.VectorFP32{Val: encodeFloat32Vector([]float32{1, 2, 7, 6})}, + }, + }, + Load: []string{"color", "price"}, + Apply: []redis.FTHybridApply{ + { + Expression: "@price - (@price * 0.1)", + AsField: "price_discount", + }, + }, + SortBy: []redis.FTSearchSortBy{ + {FieldName: "price_discount", Desc: true}, + {FieldName: "color", Asc: true}, + }, + LimitOffset: 0, + Limit: 5, + } + + res, err := client.FTHybridWithArgs(ctx, "hybrid_idx", options).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Total).To(BeNumerically(">", 0)) + Expect(len(res.Docs)).To(BeNumerically("<=", 5)) + + // Check that results are sorted - first result should have higher price_discount + if len(res.Docs) > 1 { + firstPriceStr := fmt.Sprintf("%v", res.Docs[0].Fields["price_discount"]) + secondPriceStr := fmt.Sprintf("%v", res.Docs[1].Fields["price_discount"]) + firstPrice, err1 := helper.ParseFloat(firstPriceStr) + secondPrice, err2 := helper.ParseFloat(secondPriceStr) + + if err1 == nil && err2 == nil && firstPrice != secondPrice { + Expect(firstPrice).To(BeNumerically(">=", secondPrice)) + } + } + }) + }) }) diff --git a/sentinel.go b/sentinel.go index f1222a340b..6481e1ee84 100644 --- a/sentinel.go +++ b/sentinel.go @@ -843,6 +843,11 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) { } } + // short circuit if no sentinels configured + if len(c.sentinelAddrs) == 0 { + return "", errors.New("redis: no sentinels configured") + } + var ( masterAddr string wg sync.WaitGroup @@ -890,10 +895,12 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) { } func joinErrors(errs []error) string { + if len(errs) == 0 { + return "" + } if len(errs) == 1 { return errs[0].Error() } - b := []byte(errs[0].Error()) for _, err := range errs[1:] { b = append(b, '\n') diff --git a/sentinel_test.go b/sentinel_test.go index f332822f5b..0f0f61ebf7 100644 --- a/sentinel_test.go +++ b/sentinel_test.go @@ -682,3 +682,99 @@ func compareSlices(t *testing.T, a, b []string, name string) { } } } + +type joinErrorsTest struct { + name string + errs []error + expected string +} + +func TestJoinErrors(t *testing.T) { + tests := []joinErrorsTest{ + { + name: "empty slice", + errs: []error{}, + expected: "", + }, + { + name: "single error", + errs: []error{errors.New("first error")}, + expected: "first error", + }, + { + name: "two errors", + errs: []error{errors.New("first error"), errors.New("second error")}, + expected: "first error\nsecond error", + }, + { + name: "multiple errors", + errs: []error{ + errors.New("first error"), + errors.New("second error"), + errors.New("third error"), + }, + expected: "first error\nsecond error\nthird error", + }, + { + name: "nil slice", + errs: nil, + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := redis.JoinErrors(tt.errs) + if result != tt.expected { + t.Errorf("joinErrors() = %q, want %q", result, tt.expected) + } + }) + } +} + +func BenchmarkJoinErrors(b *testing.B) { + benchmarks := []joinErrorsTest{ + { + name: "empty slice", + errs: []error{}, + expected: "", + }, + { + name: "single error", + errs: []error{errors.New("first error")}, + expected: "first error", + }, + { + name: "two errors", + errs: []error{errors.New("first error"), errors.New("second error")}, + expected: "first error\nsecond error", + }, + { + name: "multiple errors", + errs: []error{ + errors.New("first error"), + errors.New("second error"), + errors.New("third error"), + }, + expected: "first error\nsecond error\nthird error", + }, + { + name: "nil slice", + errs: nil, + expected: "", + }, + } + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + result := redis.JoinErrors(bm.errs) + if result != bm.expected { + b.Errorf("joinErrors() = %q, want %q", result, bm.expected) + } + } + }) + }) + } +} diff --git a/stream_commands.go b/stream_commands.go index 4b84e00fdd..5573e48b96 100644 --- a/stream_commands.go +++ b/stream_commands.go @@ -263,6 +263,7 @@ type XReadGroupArgs struct { Count int64 Block time.Duration NoAck bool + Claim time.Duration // Claim idle pending entries older than this duration } func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd { @@ -282,6 +283,10 @@ func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSlic args = append(args, "noack") keyPos++ } + if a.Claim > 0 { + args = append(args, "claim", int64(a.Claim/time.Millisecond)) + keyPos += 2 + } args = append(args, "streams") keyPos++ for _, s := range a.Streams { diff --git a/string_commands.go b/string_commands.go index eff5880dcd..1b37381e4e 100644 --- a/string_commands.go +++ b/string_commands.go @@ -2,6 +2,7 @@ package redis import ( "context" + "fmt" "time" ) @@ -9,6 +10,8 @@ type StringCmdable interface { Append(ctx context.Context, key, value string) *IntCmd Decr(ctx context.Context, key string) *IntCmd DecrBy(ctx context.Context, key string, decrement int64) *IntCmd + DelExArgs(ctx context.Context, key string, a DelExArgs) *IntCmd + Digest(ctx context.Context, key string) *DigestCmd Get(ctx context.Context, key string) *StringCmd GetRange(ctx context.Context, key string, start, end int64) *StringCmd GetSet(ctx context.Context, key string, value interface{}) *StringCmd @@ -21,9 +24,18 @@ type StringCmdable interface { MGet(ctx context.Context, keys ...string) *SliceCmd MSet(ctx context.Context, values ...interface{}) *StatusCmd MSetNX(ctx context.Context, values ...interface{}) *BoolCmd + MSetEX(ctx context.Context, args MSetEXArgs, values ...interface{}) *IntCmd Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd SetArgs(ctx context.Context, key string, value interface{}, a SetArgs) *StatusCmd SetEx(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd + SetIFEQ(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StatusCmd + SetIFEQGet(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StringCmd + SetIFNE(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StatusCmd + SetIFNEGet(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StringCmd + SetIFDEQ(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StatusCmd + SetIFDEQGet(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StringCmd + SetIFDNE(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StatusCmd + SetIFDNEGet(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StringCmd SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *BoolCmd SetXX(ctx context.Context, key string, value interface{}, expiration time.Duration) *BoolCmd SetRange(ctx context.Context, key string, offset int64, value string) *IntCmd @@ -48,6 +60,70 @@ func (c cmdable) DecrBy(ctx context.Context, key string, decrement int64) *IntCm return cmd } +// DelExArgs provides arguments for the DelExArgs function. +type DelExArgs struct { + // Mode can be `IFEQ`, `IFNE`, `IFDEQ`, or `IFDNE`. + Mode string + + // MatchValue is used with IFEQ/IFNE modes for compare-and-delete operations. + // - IFEQ: only delete if current value equals MatchValue + // - IFNE: only delete if current value does not equal MatchValue + MatchValue interface{} + + // MatchDigest is used with IFDEQ/IFDNE modes for digest-based compare-and-delete. + // - IFDEQ: only delete if current value's digest equals MatchDigest + // - IFDNE: only delete if current value's digest does not equal MatchDigest + // + // The digest is a uint64 xxh3 hash value. + // + // For examples of client-side digest generation, see: + // example/digest-optimistic-locking/ + MatchDigest uint64 +} + +// DelExArgs Redis `DELEX key [IFEQ|IFNE|IFDEQ|IFDNE] match-value` command. +// Compare-and-delete with flexible conditions. +// +// Returns the number of keys that were removed (0 or 1). +func (c cmdable) DelExArgs(ctx context.Context, key string, a DelExArgs) *IntCmd { + args := []interface{}{"delex", key} + + if a.Mode != "" { + args = append(args, a.Mode) + + // Add match value/digest based on mode + switch a.Mode { + case "ifeq", "IFEQ", "ifne", "IFNE": + if a.MatchValue != nil { + args = append(args, a.MatchValue) + } + case "ifdeq", "IFDEQ", "ifdne", "IFDNE": + if a.MatchDigest != 0 { + args = append(args, fmt.Sprintf("%016x", a.MatchDigest)) + } + } + } + + cmd := NewIntCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// Digest returns the xxh3 hash (uint64) of the specified key's value. +// +// The digest is a 64-bit xxh3 hash that can be used for optimistic locking +// with SetIFDEQ, SetIFDNE, and DelExArgs commands. +// +// For examples of client-side digest generation and usage patterns, see: +// example/digest-optimistic-locking/ +// +// Redis 8.4+. See https://redis.io/commands/digest/ +func (c cmdable) Digest(ctx context.Context, key string) *DigestCmd { + cmd := NewDigestCmd(ctx, "digest", key) + _ = c(ctx, cmd) + return cmd +} + // Get Redis `GET key` command. It returns redis.Nil error when key does not exist. func (c cmdable) Get(ctx context.Context, key string) *StringCmd { cmd := NewStringCmd(ctx, "get", key) @@ -112,6 +188,35 @@ func (c cmdable) IncrByFloat(ctx context.Context, key string, value float64) *Fl return cmd } +type SetCondition string + +const ( + // NX only set the keys and their expiration if none exist + NX SetCondition = "NX" + // XX only set the keys and their expiration if all already exist + XX SetCondition = "XX" +) + +type ExpirationMode string + +const ( + // EX sets expiration in seconds + EX ExpirationMode = "EX" + // PX sets expiration in milliseconds + PX ExpirationMode = "PX" + // EXAT sets expiration as Unix timestamp in seconds + EXAT ExpirationMode = "EXAT" + // PXAT sets expiration as Unix timestamp in milliseconds + PXAT ExpirationMode = "PXAT" + // KEEPTTL keeps the existing TTL + KEEPTTL ExpirationMode = "KEEPTTL" +) + +type ExpirationOption struct { + Mode ExpirationMode + Value int64 +} + func (c cmdable) LCS(ctx context.Context, q *LCSQuery) *LCSCmd { cmd := NewLCSCmd(ctx, q) _ = c(ctx, cmd) @@ -157,6 +262,49 @@ func (c cmdable) MSetNX(ctx context.Context, values ...interface{}) *BoolCmd { return cmd } +type MSetEXArgs struct { + Condition SetCondition + Expiration *ExpirationOption +} + +// MSetEX sets the given keys to their respective values. +// This command is an extension of the MSETNX that adds expiration and XX options. +// Available since Redis 8.4 +// Important: When this method is used with Cluster clients, all keys +// must be in the same hash slot, otherwise CROSSSLOT error will be returned. +// For more information, see https://redis.io/commands/msetex +func (c cmdable) MSetEX(ctx context.Context, args MSetEXArgs, values ...interface{}) *IntCmd { + expandedArgs := appendArgs([]interface{}{}, values) + numkeys := len(expandedArgs) / 2 + + cmdArgs := make([]interface{}, 0, 2+len(expandedArgs)+3) + cmdArgs = append(cmdArgs, "msetex", numkeys) + cmdArgs = append(cmdArgs, expandedArgs...) + + if args.Condition != "" { + cmdArgs = append(cmdArgs, string(args.Condition)) + } + + if args.Expiration != nil { + switch args.Expiration.Mode { + case EX: + cmdArgs = append(cmdArgs, "ex", args.Expiration.Value) + case PX: + cmdArgs = append(cmdArgs, "px", args.Expiration.Value) + case EXAT: + cmdArgs = append(cmdArgs, "exat", args.Expiration.Value) + case PXAT: + cmdArgs = append(cmdArgs, "pxat", args.Expiration.Value) + case KEEPTTL: + cmdArgs = append(cmdArgs, "keepttl") + } + } + + cmd := NewIntCmd(ctx, cmdArgs...) + _ = c(ctx, cmd) + return cmd +} + // Set Redis `SET key value [expiration]` command. // Use expiration for `SETEx`-like behavior. // @@ -185,9 +333,24 @@ func (c cmdable) Set(ctx context.Context, key string, value interface{}, expirat // SetArgs provides arguments for the SetArgs function. type SetArgs struct { - // Mode can be `NX` or `XX` or empty. + // Mode can be `NX`, `XX`, `IFEQ`, `IFNE`, `IFDEQ`, `IFDNE` or empty. Mode string + // MatchValue is used with IFEQ/IFNE modes for compare-and-set operations. + // - IFEQ: only set if current value equals MatchValue + // - IFNE: only set if current value does not equal MatchValue + MatchValue interface{} + + // MatchDigest is used with IFDEQ/IFDNE modes for digest-based compare-and-set. + // - IFDEQ: only set if current value's digest equals MatchDigest + // - IFDNE: only set if current value's digest does not equal MatchDigest + // + // The digest is a uint64 xxh3 hash value. + // + // For examples of client-side digest generation, see: + // example/digest-optimistic-locking/ + MatchDigest uint64 + // Zero `TTL` or `Expiration` means that the key has no expiration time. TTL time.Duration ExpireAt time.Time @@ -223,6 +386,18 @@ func (c cmdable) SetArgs(ctx context.Context, key string, value interface{}, a S if a.Mode != "" { args = append(args, a.Mode) + + // Add match value/digest for CAS modes + switch a.Mode { + case "ifeq", "IFEQ", "ifne", "IFNE": + if a.MatchValue != nil { + args = append(args, a.MatchValue) + } + case "ifdeq", "IFDEQ", "ifdne", "IFDNE": + if a.MatchDigest != 0 { + args = append(args, fmt.Sprintf("%016x", a.MatchDigest)) + } + } } if a.Get { @@ -290,6 +465,246 @@ func (c cmdable) SetXX(ctx context.Context, key string, value interface{}, expir return cmd } +// SetIFEQ Redis `SET key value [expiration] IFEQ match-value` command. +// Compare-and-set: only sets the value if the current value equals matchValue. +// +// Returns "OK" on success. +// Returns nil if the operation was aborted due to condition not matching. +// Zero expiration means the key has no expiration time. +func (c cmdable) SetIFEQ(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StatusCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifeq", matchValue) + + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// SetIFEQGet Redis `SET key value [expiration] IFEQ match-value GET` command. +// Compare-and-set with GET: only sets the value if the current value equals matchValue, +// and returns the previous value. +// +// Returns the previous value on success. +// Returns nil if the operation was aborted due to condition not matching. +// Zero expiration means the key has no expiration time. +func (c cmdable) SetIFEQGet(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StringCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifeq", matchValue, "get") + + cmd := NewStringCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// SetIFNE Redis `SET key value [expiration] IFNE match-value` command. +// Compare-and-set: only sets the value if the current value does not equal matchValue. +// +// Returns "OK" on success. +// Returns nil if the operation was aborted due to condition not matching. +// Zero expiration means the key has no expiration time. +func (c cmdable) SetIFNE(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StatusCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifne", matchValue) + + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// SetIFNEGet Redis `SET key value [expiration] IFNE match-value GET` command. +// Compare-and-set with GET: only sets the value if the current value does not equal matchValue, +// and returns the previous value. +// +// Returns the previous value on success. +// Returns nil if the operation was aborted due to condition not matching. +// Zero expiration means the key has no expiration time. +func (c cmdable) SetIFNEGet(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StringCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifne", matchValue, "get") + + cmd := NewStringCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// SetIFDEQ sets the value only if the current value's digest equals matchDigest. +// +// This is a compare-and-set operation using xxh3 digest for optimistic locking. +// The matchDigest parameter is a uint64 xxh3 hash value. +// +// Returns "OK" on success. +// Returns redis.Nil if the digest doesn't match (value was modified). +// Zero expiration means the key has no expiration time. +// +// For examples of client-side digest generation and usage patterns, see: +// example/digest-optimistic-locking/ +// +// Redis 8.4+. See https://redis.io/commands/set/ +func (c cmdable) SetIFDEQ(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StatusCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifdeq", fmt.Sprintf("%016x", matchDigest)) + + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// SetIFDEQGet sets the value only if the current value's digest equals matchDigest, +// and returns the previous value. +// +// This is a compare-and-set operation using xxh3 digest for optimistic locking. +// The matchDigest parameter is a uint64 xxh3 hash value. +// +// Returns the previous value on success. +// Returns redis.Nil if the digest doesn't match (value was modified). +// Zero expiration means the key has no expiration time. +// +// For examples of client-side digest generation and usage patterns, see: +// example/digest-optimistic-locking/ +// +// Redis 8.4+. See https://redis.io/commands/set/ +func (c cmdable) SetIFDEQGet(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StringCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifdeq", fmt.Sprintf("%016x", matchDigest), "get") + + cmd := NewStringCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// SetIFDNE sets the value only if the current value's digest does NOT equal matchDigest. +// +// This is a compare-and-set operation using xxh3 digest for optimistic locking. +// The matchDigest parameter is a uint64 xxh3 hash value. +// +// Returns "OK" on success (digest didn't match, value was set). +// Returns redis.Nil if the digest matches (value was not modified). +// Zero expiration means the key has no expiration time. +// +// For examples of client-side digest generation and usage patterns, see: +// example/digest-optimistic-locking/ +// +// Redis 8.4+. See https://redis.io/commands/set/ +func (c cmdable) SetIFDNE(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StatusCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifdne", fmt.Sprintf("%016x", matchDigest)) + + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// SetIFDNEGet sets the value only if the current value's digest does NOT equal matchDigest, +// and returns the previous value. +// +// This is a compare-and-set operation using xxh3 digest for optimistic locking. +// The matchDigest parameter is a uint64 xxh3 hash value. +// +// Returns the previous value on success (digest didn't match, value was set). +// Returns redis.Nil if the digest matches (value was not modified). +// Zero expiration means the key has no expiration time. +// +// For examples of client-side digest generation and usage patterns, see: +// example/digest-optimistic-locking/ +// +// Redis 8.4+. See https://redis.io/commands/set/ +func (c cmdable) SetIFDNEGet(ctx context.Context, key string, value interface{}, matchDigest uint64, expiration time.Duration) *StringCmd { + args := []interface{}{"set", key, value} + + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(ctx, expiration)) + } else { + args = append(args, "ex", formatSec(ctx, expiration)) + } + } else if expiration == KeepTTL { + args = append(args, "keepttl") + } + + args = append(args, "ifdne", fmt.Sprintf("%016x", matchDigest), "get") + + cmd := NewStringCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) SetRange(ctx context.Context, key string, offset int64, value string) *IntCmd { cmd := NewIntCmd(ctx, "setrange", key, offset, value) _ = c(ctx, cmd)