Skip to content

Commit 54281d6

Browse files
committed
optimize push notif
1 parent 0752aec commit 54281d6

File tree

5 files changed

+107
-17
lines changed

5 files changed

+107
-17
lines changed

internal/pool/conn.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ import (
1818

1919
var noDeadline = time.Time{}
2020

21-
// Global time cache updated every 50ms by background goroutine.
21+
// Global time cache updated every 100ms by background goroutine.
2222
// This avoids expensive time.Now() syscalls in hot paths like getEffectiveReadTimeout.
23-
// Max staleness: 50ms, which is acceptable for timeout deadline checks (timeouts are typically 3-30 seconds).
23+
// Max staleness: 100ms, which is acceptable for timeout deadline checks (timeouts are typically 3-30 seconds).
2424
var globalTimeCache struct {
2525
nowNs atomic.Int64
2626
}
@@ -31,7 +31,7 @@ func init() {
3131

3232
// Start background updater
3333
go func() {
34-
ticker := time.NewTicker(50 * time.Millisecond)
34+
ticker := time.NewTicker(100 * time.Millisecond)
3535
defer ticker.Stop()
3636

3737
for range ticker.C {
@@ -41,12 +41,20 @@ func init() {
4141
}
4242

4343
// getCachedTimeNs returns the current time in nanoseconds from the global cache.
44-
// This is updated every 50ms by a background goroutine, avoiding expensive syscalls.
45-
// Max staleness: 50ms.
44+
// This is updated every 100ms by a background goroutine, avoiding expensive syscalls.
45+
// Max staleness: 100ms.
4646
func getCachedTimeNs() int64 {
4747
return globalTimeCache.nowNs.Load()
4848
}
4949

50+
// GetCachedTimeNs returns the current time in nanoseconds from the global cache.
51+
// This is updated every 100ms by a background goroutine, avoiding expensive syscalls.
52+
// Max staleness: 100ms.
53+
// Exported for use by other packages that need fast time access.
54+
func GetCachedTimeNs() int64 {
55+
return getCachedTimeNs()
56+
}
57+
5058
// Global atomic counter for connection IDs
5159
var connIDCounter uint64
5260

@@ -170,6 +178,9 @@ func (cn *Conn) UsedAt() time.Time {
170178
unixNano := atomic.LoadInt64(&cn.usedAt)
171179
return time.Unix(0, unixNano)
172180
}
181+
func (cn *Conn) UsedAtNs() int64 {
182+
return atomic.LoadInt64(&cn.usedAt)
183+
}
173184

174185
func (cn *Conn) SetUsedAt(tm time.Time) {
175186
atomic.StoreInt64(&cn.usedAt, tm.UnixNano())
@@ -488,7 +499,7 @@ func (cn *Conn) getEffectiveReadTimeout(normalTimeout time.Duration) time.Durati
488499
return time.Duration(readTimeoutNs)
489500
}
490501

491-
// Use cached time to avoid expensive syscall (max 50ms staleness is acceptable for timeout checks)
502+
// Use cached time to avoid expensive syscall (max 100ms staleness is acceptable for timeout checks)
492503
nowNs := getCachedTimeNs()
493504
// Check if deadline has passed
494505
if nowNs < deadlineNs {
@@ -522,7 +533,7 @@ func (cn *Conn) getEffectiveWriteTimeout(normalTimeout time.Duration) time.Durat
522533
return time.Duration(writeTimeoutNs)
523534
}
524535

525-
// Use cached time to avoid expensive syscall (max 50ms staleness is acceptable for timeout checks)
536+
// Use cached time to avoid expensive syscall (max 100ms staleness is acceptable for timeout checks)
526537
nowNs := getCachedTimeNs()
527538
// Check if deadline has passed
528539
if nowNs < deadlineNs {
@@ -879,7 +890,7 @@ func (cn *Conn) MaybeHasData() bool {
879890

880891
// deadline computes the effective deadline time based on context and timeout.
881892
// It updates the usedAt timestamp to now.
882-
// Uses cached time to avoid expensive syscall (max 50ms staleness is acceptable for deadline calculation).
893+
// Uses cached time to avoid expensive syscall (max 100ms staleness is acceptable for deadline calculation).
883894
func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
884895
// Use cached time for deadline calculation (called 2x per command: read + write)
885896
tm := time.Unix(0, getCachedTimeNs())

internal/pool/conn_check.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func connCheck(conn net.Conn) error {
3030

3131
var sysErr error
3232

33-
if err := rawConn.Read(func(fd uintptr) bool {
33+
if err := rawConn.Control(func(fd uintptr) {
3434
var buf [1]byte
3535
// Use MSG_PEEK to peek at data without consuming it
3636
n, _, err := syscall.Recvfrom(int(fd), buf[:], syscall.MSG_PEEK|syscall.MSG_DONTWAIT)
@@ -45,7 +45,6 @@ func connCheck(conn net.Conn) error {
4545
default:
4646
sysErr = err
4747
}
48-
return true
4948
}); err != nil {
5049
return err
5150
}

internal/pool/pool.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,18 @@ type ConnPool struct {
155155
var _ Pooler = (*ConnPool)(nil)
156156

157157
func NewConnPool(opt *Options) *ConnPool {
158-
p := &ConnPool{
159-
cfg: opt,
158+
semSize := opt.PoolSize
159+
if opt.MaxActiveConns > 0 && opt.MaxActiveConns < opt.PoolSize {
160+
if opt.MaxActiveConns < opt.PoolSize {
161+
opt.MaxActiveConns = opt.PoolSize
162+
}
163+
semSize = opt.MaxActiveConns
164+
}
165+
//semSize = opt.PoolSize
160166

161-
semaphore: internal.NewFastSemaphore(opt.PoolSize),
167+
p := &ConnPool{
168+
cfg: opt,
169+
semaphore: internal.NewFastSemaphore(semSize),
162170
conns: make(map[uint64]*Conn),
163171
idleConns: make([]*Conn, 0, opt.PoolSize),
164172
}

redis.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1351,13 +1351,39 @@ func (c *Conn) TxPipeline() Pipeliner {
13511351

13521352
// processPushNotifications processes all pending push notifications on a connection
13531353
// This ensures that cluster topology changes are handled immediately before the connection is used
1354-
// This method should be called by the client before using WithReader for command execution
1354+
// This method should be called by the client before using WithWriter for command execution
1355+
//
1356+
// Performance optimization: Skip the expensive MaybeHasData() syscall if a health check
1357+
// was performed recently (within 5 seconds). The health check already verified the connection
1358+
// is healthy and checked for unexpected data (push notifications).
13551359
func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error {
13561360
// Only process push notifications for RESP3 connections with a processor
1357-
// Also check if there is any data to read before processing
1358-
// Which is an optimization on UNIX systems where MaybeHasData is a syscall
1361+
if c.opt.Protocol != 3 || c.pushProcessor == nil {
1362+
return nil
1363+
}
1364+
1365+
// Performance optimization: Skip MaybeHasData() syscall if health check was recent
1366+
// If the connection was health-checked within the last 5 seconds, we can skip the
1367+
// expensive syscall since the health check already verified no unexpected data.
1368+
// This is safe because:
1369+
// 1. Health check (connCheck) uses the same syscall (Recvfrom with MSG_PEEK)
1370+
// 2. If push notifications arrived, they would have been detected by health check
1371+
// 3. 5 seconds is short enough that connection state is still fresh
1372+
// 4. Push notifications will be processed by the next WithReader call
1373+
lastHealthCheckNs := cn.UsedAtNs()
1374+
if lastHealthCheckNs > 0 {
1375+
// Use pool's cached time to avoid expensive time.Now() syscall
1376+
nowNs := pool.GetCachedTimeNs()
1377+
if nowNs-lastHealthCheckNs < int64(5*time.Second) {
1378+
// Recent health check confirmed no unexpected data, skip the syscall
1379+
return nil
1380+
}
1381+
}
1382+
1383+
// Check if there is any data to read before processing
1384+
// This is an optimization on UNIX systems where MaybeHasData is a syscall
13591385
// On Windows, MaybeHasData always returns true, so this check is a no-op
1360-
if c.opt.Protocol != 3 || c.pushProcessor == nil || !cn.MaybeHasData() {
1386+
if !cn.MaybeHasData() {
13611387
return nil
13621388
}
13631389

redis_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,52 @@ var _ = Describe("Client", func() {
245245
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
246246
})
247247

248+
It("should initialize idle connections created by MinIdleConns", func() {
249+
opt := redisOptions()
250+
opt.MinIdleConns = 5
251+
opt.Password = "asdf" // Set password to require AUTH
252+
opt.DB = 1 // Set DB to require SELECT
253+
254+
db := redis.NewClient(opt)
255+
defer func() {
256+
Expect(db.Close()).NotTo(HaveOccurred())
257+
}()
258+
259+
// Wait for minIdle connections to be created
260+
time.Sleep(100 * time.Millisecond)
261+
262+
// Verify that idle connections were created
263+
stats := db.PoolStats()
264+
Expect(stats.IdleConns).To(BeNumerically(">=", 5))
265+
266+
// Now use these connections - they should be properly initialized
267+
// If they're not initialized, we'll get NOAUTH or WRONGDB errors
268+
var wg sync.WaitGroup
269+
for i := 0; i < 10; i++ {
270+
wg.Add(1)
271+
go func(id int) {
272+
defer wg.Done()
273+
// Each goroutine performs multiple operations
274+
for j := 0; j < 5; j++ {
275+
key := fmt.Sprintf("test_key_%d_%d", id, j)
276+
err := db.Set(ctx, key, "value", 0).Err()
277+
Expect(err).NotTo(HaveOccurred())
278+
279+
val, err := db.Get(ctx, key).Result()
280+
Expect(err).NotTo(HaveOccurred())
281+
Expect(val).To(Equal("value"))
282+
283+
err = db.Del(ctx, key).Err()
284+
Expect(err).NotTo(HaveOccurred())
285+
}
286+
}(i)
287+
}
288+
wg.Wait()
289+
290+
// Verify no errors occurred
291+
Expect(db.Ping(ctx).Err()).NotTo(HaveOccurred())
292+
})
293+
248294
It("processes custom commands", func() {
249295
cmd := redis.NewCmd(ctx, "PING")
250296
_ = client.Process(ctx, cmd)

0 commit comments

Comments
 (0)