Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,21 @@ type Conn struct {
// background operations that may execute commands, like re-authentication.
used atomic.Bool

// Inited flag to mark connection as initialized, this is almost the same as usable
// inited flag to mark connection as initialized, this is almost the same as usable
// but it is used to make sure we don't initialize a network connection twice
// On handoff, the network connection is replaced, but the Conn struct is reused
// this flag will be set to false when the network connection is replaced and
// set to true after the new network connection is initialized
Inited atomic.Bool
inited atomic.Bool

pooled bool
pubsub bool
closed atomic.Bool
createdAt time.Time
expiresAt time.Time
// Initializing flag to mark connection as initializing
// This is used to prevent concurrent initialization of the network connection
initializing atomic.Bool
pooled bool
pubsub bool
closed atomic.Bool
createdAt time.Time
expiresAt time.Time

// maintenanceNotifications upgrade support: relaxed timeouts during migrations/failovers
// Using atomic operations for lock-free access to avoid mutex contention
Expand Down Expand Up @@ -306,7 +309,27 @@ func (cn *Conn) IsPubSub() bool {
}

func (cn *Conn) IsInited() bool {
return cn.Inited.Load()
return cn.inited.Load()
}

func (cn *Conn) SetInited(inited bool) {
cn.inited.Store(inited)
}

func (cn *Conn) CompareAndSwapInited(old, new bool) bool {
return cn.inited.CompareAndSwap(old, new)
}

func (cn *Conn) IsInitializing() bool {
return cn.initializing.Load()
}

func (cn *Conn) SetInitializing(initializing bool) {
cn.initializing.Store(initializing)
}

func (cn *Conn) CompareAndSwapInitializing(old, new bool) bool {
return cn.initializing.CompareAndSwap(old, new)
}

// SetRelaxedTimeout sets relaxed timeouts for this connection during maintenanceNotifications upgrades.
Expand Down Expand Up @@ -478,8 +501,17 @@ func (cn *Conn) GetNetConn() net.Conn {

// SetNetConnAndInitConn replaces the underlying connection and executes the initialization.
func (cn *Conn) SetNetConnAndInitConn(ctx context.Context, netConn net.Conn) error {
// max retries of 100ms * 20 = 2 second
Copy link

Copilot AI Oct 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected 'second' to 'seconds' for grammatical accuracy.

Suggested change
// max retries of 100ms * 20 = 2 second
// max retries of 100ms * 20 = 2 seconds

Copilot uses AI. Check for mistakes.
maxRetries := 20
for cn.IsInitializing() || cn.IsUsed() {
time.Sleep(100 * time.Millisecond)
maxRetries--
if maxRetries <= 0 {
return fmt.Errorf("failed to set net conn after %d attempts due to high contention", maxRetries)
Copy link

Copilot AI Oct 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message uses maxRetries which will always be 0 or negative at this point. Use the original value (20) or calculate the actual number of attempts made instead.

Suggested change
return fmt.Errorf("failed to set net conn after %d attempts due to high contention", maxRetries)
return fmt.Errorf("failed to set net conn after %d attempts due to high contention", 20)

Copilot uses AI. Check for mistakes.
}
}
Comment on lines +504 to +512
Copy link

Copilot AI Oct 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retry loop blocks the goroutine with fixed 100ms sleeps, which could cause unnecessary delays. Consider using exponential backoff or a more efficient synchronization mechanism like a condition variable.

Copilot uses AI. Check for mistakes.
// New connection is not initialized yet
cn.Inited.Store(false)
cn.SetInited(false)
// Replace the underlying connection
cn.SetNetConn(netConn)
return cn.ExecuteInitConn(ctx)
Expand Down
21 changes: 16 additions & 5 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,20 @@ func (c *baseClient) wrappedOnClose(newOnClose func() error) func() error {
}

func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
if !cn.Inited.CompareAndSwap(false, true) {
if !cn.CompareAndSwapInited(false, true) {
return nil
}

defer func() {
// if the initialization did not complete successfully
// we need to mark the connection as not initialized
if cn.CompareAndSwapInitializing(true, false) {
internal.Logger.Printf(ctx, "redis: failed to initialize connection conn[%d]", cn.GetID())
cn.SetInited(false)
}
}()

cn.SetInitializing(true)
var err error
connPool := pool.NewSingleConnPool(c.connPool, cn)
conn := newConn(c.opt, connPool, &c.hooksMixin)
Expand Down Expand Up @@ -510,14 +521,14 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
}
}

// Set the connection initialization function for potential reconnections
cn.SetInitConnFunc(c.createInitConnFunc())

// mark the connection as usable and inited
// once returned to the pool as idle, this connection can be used by other clients
cn.SetUsable(true)
cn.SetUsed(false)
cn.Inited.Store(true)

// Set the connection initialization function for potential reconnections
cn.SetInitConnFunc(c.createInitConnFunc())
cn.SetInitializing(false)

if c.opt.OnConnect != nil {
return c.opt.OnConnect(ctx, conn)
Expand Down
Loading