Skip to content

Commit 71bb3ca

Browse files
ofekshenawandyakov
andauthored
Add support for XReadGroup CLAIM argument (#3578)
* Add support for XReadGroup CLAIM argument * modify tutorial tests --------- Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>
1 parent ce53464 commit 71bb3ca

File tree

4 files changed

+286
-19
lines changed

4 files changed

+286
-19
lines changed

command.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1585,6 +1585,12 @@ func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error {
15851585
type XMessage struct {
15861586
ID string
15871587
Values map[string]interface{}
1588+
// MillisElapsedFromDelivery is the number of milliseconds since the entry was last delivered.
1589+
// Only populated when using XREADGROUP with CLAIM argument for claimed entries.
1590+
MillisElapsedFromDelivery int64
1591+
// DeliveredCount is the number of times the entry was delivered.
1592+
// Only populated when using XREADGROUP with CLAIM argument for claimed entries.
1593+
DeliveredCount int64
15881594
}
15891595

15901596
type XMessageSliceCmd struct {
@@ -1641,10 +1647,16 @@ func readXMessageSlice(rd *proto.Reader) ([]XMessage, error) {
16411647
}
16421648

16431649
func readXMessage(rd *proto.Reader) (XMessage, error) {
1644-
if err := rd.ReadFixedArrayLen(2); err != nil {
1650+
// Read array length can be 2 or 4 (with CLAIM metadata)
1651+
n, err := rd.ReadArrayLen()
1652+
if err != nil {
16451653
return XMessage{}, err
16461654
}
16471655

1656+
if n != 2 && n != 4 {
1657+
return XMessage{}, fmt.Errorf("redis: got %d elements in the XMessage array, expected 2 or 4", n)
1658+
}
1659+
16481660
id, err := rd.ReadString()
16491661
if err != nil {
16501662
return XMessage{}, err
@@ -1657,10 +1669,24 @@ func readXMessage(rd *proto.Reader) (XMessage, error) {
16571669
}
16581670
}
16591671

1660-
return XMessage{
1672+
msg := XMessage{
16611673
ID: id,
16621674
Values: v,
1663-
}, nil
1675+
}
1676+
1677+
if n == 4 {
1678+
msg.MillisElapsedFromDelivery, err = rd.ReadInt()
1679+
if err != nil {
1680+
return XMessage{}, err
1681+
}
1682+
1683+
msg.DeliveredCount, err = rd.ReadInt()
1684+
if err != nil {
1685+
return XMessage{}, err
1686+
}
1687+
}
1688+
1689+
return msg, nil
16641690
}
16651691

16661692
func stringInterfaceMapParser(rd *proto.Reader) (map[string]interface{}, error) {

commands_test.go

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6749,6 +6749,242 @@ var _ = Describe("Commands", func() {
67496749
Expect(err).NotTo(HaveOccurred())
67506750
Expect(n).To(Equal(int64(2)))
67516751
})
6752+
6753+
It("should XReadGroup with CLAIM argument", func() {
6754+
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")
6755+
6756+
time.Sleep(100 * time.Millisecond)
6757+
6758+
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
6759+
Group: "group",
6760+
Consumer: "consumer2",
6761+
Streams: []string{"stream", ">"},
6762+
Claim: 50 * time.Millisecond,
6763+
}).Result()
6764+
Expect(err).NotTo(HaveOccurred())
6765+
Expect(res).To(HaveLen(1))
6766+
Expect(res[0].Stream).To(Equal("stream"))
6767+
6768+
messages := res[0].Messages
6769+
Expect(len(messages)).To(BeNumerically(">=", 1))
6770+
6771+
for _, msg := range messages {
6772+
if msg.MillisElapsedFromDelivery > 0 {
6773+
Expect(msg.MillisElapsedFromDelivery).To(BeNumerically(">=", 50))
6774+
Expect(msg.DeliveredCount).To(BeNumerically(">=", 1))
6775+
}
6776+
}
6777+
})
6778+
6779+
It("should XReadGroup with CLAIM and COUNT", func() {
6780+
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")
6781+
6782+
time.Sleep(100 * time.Millisecond)
6783+
6784+
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
6785+
Group: "group",
6786+
Consumer: "consumer3",
6787+
Streams: []string{"stream", ">"},
6788+
Claim: 50 * time.Millisecond,
6789+
Count: 2,
6790+
}).Result()
6791+
Expect(err).NotTo(HaveOccurred())
6792+
6793+
if len(res) > 0 && len(res[0].Messages) > 0 {
6794+
Expect(len(res[0].Messages)).To(BeNumerically("<=", 2))
6795+
}
6796+
})
6797+
6798+
It("should XReadGroup with CLAIM and NOACK", func() {
6799+
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")
6800+
6801+
time.Sleep(100 * time.Millisecond)
6802+
6803+
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
6804+
Group: "group",
6805+
Consumer: "consumer4",
6806+
Streams: []string{"stream", ">"},
6807+
Claim: 50 * time.Millisecond,
6808+
NoAck: true,
6809+
}).Result()
6810+
Expect(err).NotTo(HaveOccurred())
6811+
6812+
if len(res) > 0 {
6813+
Expect(res[0].Stream).To(Equal("stream"))
6814+
}
6815+
})
6816+
6817+
It("should XReadGroup CLAIM empties PEL after acknowledgment", func() {
6818+
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")
6819+
6820+
time.Sleep(100 * time.Millisecond)
6821+
6822+
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
6823+
Group: "group",
6824+
Consumer: "consumer5",
6825+
Streams: []string{"stream", ">"},
6826+
Claim: 50 * time.Millisecond,
6827+
}).Result()
6828+
Expect(err).NotTo(HaveOccurred())
6829+
6830+
if len(res) > 0 && len(res[0].Messages) > 0 {
6831+
ids := make([]string, len(res[0].Messages))
6832+
for i, msg := range res[0].Messages {
6833+
ids[i] = msg.ID
6834+
}
6835+
6836+
n, err := client.XAck(ctx, "stream", "group", ids...).Result()
6837+
Expect(err).NotTo(HaveOccurred())
6838+
Expect(n).To(BeNumerically(">=", 1))
6839+
6840+
pending, err := client.XPending(ctx, "stream", "group").Result()
6841+
Expect(err).NotTo(HaveOccurred())
6842+
Expect(pending.Count).To(BeNumerically("<", 3))
6843+
}
6844+
})
6845+
6846+
It("should XReadGroup backward compatibility without CLAIM", func() {
6847+
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
6848+
Group: "group",
6849+
Consumer: "consumer_compat",
6850+
Streams: []string{"stream", "0"},
6851+
}).Result()
6852+
Expect(err).NotTo(HaveOccurred())
6853+
Expect(res).To(HaveLen(1))
6854+
Expect(res[0].Stream).To(Equal("stream"))
6855+
6856+
for _, msg := range res[0].Messages {
6857+
Expect(msg.MillisElapsedFromDelivery).To(Equal(int64(0)))
6858+
Expect(msg.DeliveredCount).To(Equal(int64(0)))
6859+
}
6860+
})
6861+
6862+
It("should XReadGroup CLAIM with multiple streams", func() {
6863+
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")
6864+
6865+
id, err := client.XAdd(ctx, &redis.XAddArgs{
6866+
Stream: "stream2",
6867+
ID: "1-0",
6868+
Values: map[string]interface{}{"field1": "value1"},
6869+
}).Result()
6870+
Expect(err).NotTo(HaveOccurred())
6871+
Expect(id).To(Equal("1-0"))
6872+
6873+
id, err = client.XAdd(ctx, &redis.XAddArgs{
6874+
Stream: "stream2",
6875+
ID: "2-0",
6876+
Values: map[string]interface{}{"field2": "value2"},
6877+
}).Result()
6878+
Expect(err).NotTo(HaveOccurred())
6879+
Expect(id).To(Equal("2-0"))
6880+
6881+
err = client.XGroupCreate(ctx, "stream2", "group2", "0").Err()
6882+
Expect(err).NotTo(HaveOccurred())
6883+
6884+
_, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
6885+
Group: "group2",
6886+
Consumer: "consumer1",
6887+
Streams: []string{"stream2", ">"},
6888+
}).Result()
6889+
Expect(err).NotTo(HaveOccurred())
6890+
6891+
time.Sleep(100 * time.Millisecond)
6892+
6893+
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
6894+
Group: "group2",
6895+
Consumer: "consumer2",
6896+
Streams: []string{"stream2", ">"},
6897+
Claim: 50 * time.Millisecond,
6898+
}).Result()
6899+
Expect(err).NotTo(HaveOccurred())
6900+
6901+
if len(res) > 0 {
6902+
Expect(res[0].Stream).To(Equal("stream2"))
6903+
if len(res[0].Messages) > 0 {
6904+
for _, msg := range res[0].Messages {
6905+
if msg.MillisElapsedFromDelivery > 0 {
6906+
Expect(msg.DeliveredCount).To(BeNumerically(">=", 1))
6907+
}
6908+
}
6909+
}
6910+
}
6911+
})
6912+
6913+
It("should XReadGroup CLAIM work consistently on RESP2 and RESP3", func() {
6914+
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")
6915+
6916+
streamName := "stream-resp-test"
6917+
err := client.XAdd(ctx, &redis.XAddArgs{
6918+
Stream: streamName,
6919+
Values: map[string]interface{}{"field1": "value1"},
6920+
}).Err()
6921+
Expect(err).NotTo(HaveOccurred())
6922+
6923+
err = client.XAdd(ctx, &redis.XAddArgs{
6924+
Stream: streamName,
6925+
Values: map[string]interface{}{"field2": "value2"},
6926+
}).Err()
6927+
Expect(err).NotTo(HaveOccurred())
6928+
6929+
groupName := "resp-test-group"
6930+
err = client.XGroupCreate(ctx, streamName, groupName, "0").Err()
6931+
Expect(err).NotTo(HaveOccurred())
6932+
6933+
_, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
6934+
Group: groupName,
6935+
Consumer: "consumer1",
6936+
Streams: []string{streamName, ">"},
6937+
}).Result()
6938+
Expect(err).NotTo(HaveOccurred())
6939+
6940+
time.Sleep(100 * time.Millisecond)
6941+
6942+
// Test with RESP2 (protocol 2)
6943+
resp2Client := redis.NewClient(&redis.Options{
6944+
Addr: redisAddr,
6945+
Protocol: 2,
6946+
})
6947+
defer resp2Client.Close()
6948+
6949+
resp2Result, err := resp2Client.XReadGroup(ctx, &redis.XReadGroupArgs{
6950+
Group: groupName,
6951+
Consumer: "consumer2",
6952+
Streams: []string{streamName, "0"},
6953+
Claim: 50 * time.Millisecond,
6954+
}).Result()
6955+
Expect(err).NotTo(HaveOccurred())
6956+
Expect(resp2Result).To(HaveLen(1))
6957+
6958+
// Test with RESP3 (protocol 3)
6959+
resp3Client := redis.NewClient(&redis.Options{
6960+
Addr: redisAddr,
6961+
Protocol: 3,
6962+
})
6963+
defer resp3Client.Close()
6964+
6965+
resp3Result, err := resp3Client.XReadGroup(ctx, &redis.XReadGroupArgs{
6966+
Group: groupName,
6967+
Consumer: "consumer3",
6968+
Streams: []string{streamName, "0"},
6969+
Claim: 50 * time.Millisecond,
6970+
}).Result()
6971+
Expect(err).NotTo(HaveOccurred())
6972+
Expect(resp3Result).To(HaveLen(1))
6973+
6974+
Expect(len(resp2Result[0].Messages)).To(Equal(len(resp3Result[0].Messages)))
6975+
6976+
for i := range resp2Result[0].Messages {
6977+
msg2 := resp2Result[0].Messages[i]
6978+
msg3 := resp3Result[0].Messages[i]
6979+
6980+
Expect(msg2.ID).To(Equal(msg3.ID))
6981+
6982+
if msg2.MillisElapsedFromDelivery > 0 {
6983+
Expect(msg3.MillisElapsedFromDelivery).To(BeNumerically(">", 0))
6984+
Expect(msg2.DeliveredCount).To(Equal(msg3.DeliveredCount))
6985+
}
6986+
}
6987+
})
67526988
})
67536989

67546990
Describe("xinfo", func() {

doctests/stream_tutorial_test.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,8 @@ func ExampleClient_racefrance1() {
216216
// REMOVE_END
217217

218218
// Output:
219-
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}]
220-
// [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]}]}]
219+
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}]
220+
// [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0}]}]
221221
// 4
222222
}
223223

@@ -467,13 +467,13 @@ func ExampleClient_racefrance2() {
467467
// STEP_END
468468

469469
// Output:
470-
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}]
471-
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]}]
472-
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}]
473-
// [{1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}]
470+
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}]
471+
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0}]
472+
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}]
473+
// [{1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}]
474474
// []
475-
// [{1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}]
476-
// [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}]}]
475+
// [{1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}]
476+
// [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}]}]
477477
}
478478

479479
func ExampleClient_xgroupcreate() {
@@ -999,18 +999,18 @@ func ExampleClient_raceitaly() {
999999
// REMOVE_END
10001000

10011001
// Output:
1002-
// [{race:italy [{1692632639151-0 map[rider:Castilla]}]}]
1002+
// [{race:italy [{1692632639151-0 map[rider:Castilla] 0 0}]}]
10031003
// 1
10041004
// [{race:italy []}]
1005-
// [{race:italy [{1692632647899-0 map[rider:Royce]} {1692632662819-0 map[rider:Sam-Bodden]}]}]
1005+
// [{race:italy [{1692632647899-0 map[rider:Royce] 0 0} {1692632662819-0 map[rider:Sam-Bodden] 0 0}]}]
10061006
// &{2 1692632647899-0 1692632662819-0 map[Bob:2]}
1007-
// [{1692632647899-0 map[rider:Royce]}]
1008-
// [{1692632647899-0 map[rider:Royce]}]
1009-
// [{1692632647899-0 map[rider:Royce]}]
1007+
// [{1692632647899-0 map[rider:Royce] 0 0}]
1008+
// [{1692632647899-0 map[rider:Royce] 0 0}]
1009+
// [{1692632647899-0 map[rider:Royce] 0 0}]
10101010
// 1692632662819-0
10111011
// []
10121012
// 0-0
1013-
// &{5 1 2 1 1692632678249-0 0-0 5 {1692632639151-0 map[rider:Castilla]} {1692632678249-0 map[rider:Norem]} 1692632639151-0}
1013+
// &{5 1 2 1 1692632678249-0 0-0 5 {1692632639151-0 map[rider:Castilla] 0 0} {1692632678249-0 map[rider:Norem] 0 0} 1692632639151-0}
10141014
// [{italy_riders 3 2 1692632662819-0 3 2}]
10151015
// 2
10161016
// 0
@@ -1085,7 +1085,7 @@ func ExampleClient_xdel() {
10851085
// STEP_END
10861086

10871087
// Output:
1088-
// [{1692633198206-0 map[rider:Wood]} {1692633208557-0 map[rider:Henshaw]}]
1088+
// [{1692633198206-0 map[rider:Wood] 0 0} {1692633208557-0 map[rider:Henshaw] 0 0}]
10891089
// 1
1090-
// [{1692633198206-0 map[rider:Wood]}]
1090+
// [{1692633198206-0 map[rider:Wood] 0 0}]
10911091
}

stream_commands.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ type XReadGroupArgs struct {
263263
Count int64
264264
Block time.Duration
265265
NoAck bool
266+
Claim time.Duration // Claim idle pending entries older than this duration
266267
}
267268

268269
func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd {
@@ -282,6 +283,10 @@ func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSlic
282283
args = append(args, "noack")
283284
keyPos++
284285
}
286+
if a.Claim > 0 {
287+
args = append(args, "claim", int64(a.Claim/time.Millisecond))
288+
keyPos += 2
289+
}
285290
args = append(args, "streams")
286291
keyPos++
287292
for _, s := range a.Streams {

0 commit comments

Comments
 (0)