diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index c5569e8654..8e100dc5ce 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -74,6 +74,10 @@ export default class RedisSocket extends EventEmitter { #isReady = false; get isReady() { + // If socket is closed or destroyed, we're not ready + if (!this.#isOpen || !this.#socket || this.#socket.destroyed) { + return false; + } return this.#isReady; } @@ -85,6 +89,8 @@ export default class RedisSocket extends EventEmitter { return this.#socketEpoch; } + #isReconnecting = false; + constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { super(); @@ -215,30 +221,75 @@ export default class RedisSocket extends EventEmitter { let retries = 0; do { try { - this.#socket = await this.#createSocket(); + // Ensure any existing socket is cleaned up before creating a new one + if (this.#socket) { + this.#socket.destroy(); + this.#socket = undefined; + } + + const socket = await this.#createSocket(); + + // Check if connection was cancelled or another attempt started + if (!this.#isOpen || (this.#socket && this.#socket !== socket)) { + socket.destroy(); + throw new Error('Connection cancelled or replaced'); + } + + this.#socket = socket; this.emit('connect'); try { await this.#initiator(); + + // Verify socket is still valid and current after initiator completes + if (!this.#isOpen || this.#socket !== socket || socket.destroyed) { + throw new Error('Socket invalidated during handshake'); + } } catch (err) { - this.#socket.destroy(); - this.#socket = undefined; + // Cleanup socket if it's still the current one + if (this.#socket === socket) { + this.#socket.destroy(); + this.#socket = undefined; + } else { + socket.destroy(); + } throw err; } + + // Final verification before marking ready - socket must be current, connection open, socket not destroyed, and writable + if (!this.#isOpen || this.#socket !== socket || socket.destroyed || !socket.writable) { + throw new Error('Socket invalidated before marking ready'); + } + this.#isReady = true; this.#socketEpoch++; + this.#isReconnecting = false; + + // Critical final check right before emitting ready - socket must be fully valid + // If socket was destroyed or closed between setting isReady and emitting ready, don't emit + if (this.#socket !== socket || socket.destroyed || !this.#isOpen || !socket.writable) { + this.#isReady = false; + throw new Error('Socket became invalid after marking ready'); + } + this.emit('ready'); } catch (err) { const retryIn = this.#shouldReconnect(retries++, err as Error); if (typeof retryIn !== 'number') { + this.#isReconnecting = false; throw retryIn; } this.emit('error', err); await setTimeout(retryIn); - this.emit('reconnecting'); + // Only emit 'reconnecting' if we're already in a reconnection cycle + // (i.e., called from #onSocketError). Initial connection retries don't emit this. + if (this.#isReconnecting) { + this.emit('reconnecting'); + } } } while (this.#isOpen && !this.#isReady); + this.#isReconnecting = false; } setMaintenanceTimeout(ms?: number) { @@ -304,11 +355,26 @@ export default class RedisSocket extends EventEmitter { this.#isReady = false; this.emit('error', err); - if (!wasReady || !this.#isOpen || typeof this.#shouldReconnect(0, err) !== 'number') return; + if (!wasReady || !this.#isOpen || typeof this.#shouldReconnect(0, err) !== 'number') { + return; + } + + // Prevent concurrent reconnection attempts - set flag atomically + if (this.#isReconnecting) { + return; + } + this.#isReconnecting = true; + + // Destroy existing socket before starting new reconnection + if (this.#socket) { + this.#socket.destroy(); + this.#socket = undefined; + } this.emit('reconnecting'); this.#connect().catch(() => { // the error was already emitted, silently ignore it + this.#isReconnecting = false; }); } @@ -356,6 +422,7 @@ export default class RedisSocket extends EventEmitter { destroySocket() { this.#isReady = false; + this.#isReconnecting = false; if (this.#socket) { this.#socket.destroy();