3939)
4040from redis .asyncio .lock import Lock
4141from redis .asyncio .retry import Retry
42+ from redis .backoff import ExponentialWithJitterBackoff
4243from redis .client import (
4344 EMPTY_RESPONSE ,
4445 NEVER_DECODE ,
6566 PubSubError ,
6667 RedisError ,
6768 ResponseError ,
68- TimeoutError ,
6969 WatchError ,
7070)
7171from redis .typing import ChannelT , EncodableT , KeyT
7272from redis .utils import (
7373 HIREDIS_AVAILABLE ,
7474 SSL_AVAILABLE ,
7575 _set_info_logger ,
76+ deprecated_args ,
7677 deprecated_function ,
7778 get_lib_version ,
7879 safe_str ,
@@ -208,6 +209,11 @@ def from_pool(
208209 client .auto_close_connection_pool = True
209210 return client
210211
212+ @deprecated_args (
213+ args_to_warn = ["retry_on_timeout" ],
214+ reason = "TimeoutError is included by default." ,
215+ version = "6.0.0" ,
216+ )
211217 def __init__ (
212218 self ,
213219 * ,
@@ -225,6 +231,9 @@ def __init__(
225231 encoding_errors : str = "strict" ,
226232 decode_responses : bool = False ,
227233 retry_on_timeout : bool = False ,
234+ retry : Retry = Retry (
235+ backoff = ExponentialWithJitterBackoff (base = 1 , cap = 10 ), retries = 3
236+ ),
228237 retry_on_error : Optional [list ] = None ,
229238 ssl : bool = False ,
230239 ssl_keyfile : Optional [str ] = None ,
@@ -242,7 +251,6 @@ def __init__(
242251 lib_name : Optional [str ] = "redis-py" ,
243252 lib_version : Optional [str ] = get_lib_version (),
244253 username : Optional [str ] = None ,
245- retry : Optional [Retry ] = None ,
246254 auto_close_connection_pool : Optional [bool ] = None ,
247255 redis_connect_func = None ,
248256 credential_provider : Optional [CredentialProvider ] = None ,
@@ -251,10 +259,24 @@ def __init__(
251259 ):
252260 """
253261 Initialize a new Redis client.
254- To specify a retry policy for specific errors, first set
255- `retry_on_error` to a list of the error/s to retry on, then set
256- `retry` to a valid `Retry` object.
257- To retry on TimeoutError, `retry_on_timeout` can also be set to `True`.
262+
263+ To specify a retry policy for specific errors, you have two options:
264+
265+ 1. Set the `retry_on_error` to a list of the error/s to retry on, and
266+ you can also set `retry` to a valid `Retry` object(in case the default
267+ one is not appropriate) - with this approach the retries will be triggered
268+ on the default errors specified in the Retry object enriched with the
269+ errors specified in `retry_on_error`.
270+
271+ 2. Define a `Retry` object with configured 'supported_errors' and set
272+ it to the `retry` parameter - with this approach you completely redefine
273+ the errors on which retries will happen.
274+
275+ `retry_on_timeout` is deprecated - please include the TimeoutError
276+ either in the Retry object or in the `retry_on_error` list.
277+
278+ When 'connection_pool' is provided - the retry configuration of the
279+ provided pool will be used.
258280 """
259281 kwargs : Dict [str , Any ]
260282 if event_dispatcher is None :
@@ -280,8 +302,6 @@ def __init__(
280302 # Create internal connection pool, expected to be closed by Redis instance
281303 if not retry_on_error :
282304 retry_on_error = []
283- if retry_on_timeout is True :
284- retry_on_error .append (TimeoutError )
285305 kwargs = {
286306 "db" : db ,
287307 "username" : username ,
@@ -291,7 +311,6 @@ def __init__(
291311 "encoding" : encoding ,
292312 "encoding_errors" : encoding_errors ,
293313 "decode_responses" : decode_responses ,
294- "retry_on_timeout" : retry_on_timeout ,
295314 "retry_on_error" : retry_on_error ,
296315 "retry" : copy .deepcopy (retry ),
297316 "max_connections" : max_connections ,
@@ -403,10 +422,10 @@ def get_connection_kwargs(self):
403422 """Get the connection's key-word arguments"""
404423 return self .connection_pool .connection_kwargs
405424
406- def get_retry (self ) -> Optional [" Retry" ]:
425+ def get_retry (self ) -> Optional [Retry ]:
407426 return self .get_connection_kwargs ().get ("retry" )
408427
409- def set_retry (self , retry : " Retry" ) -> None :
428+ def set_retry (self , retry : Retry ) -> None :
410429 self .get_connection_kwargs ().update ({"retry" : retry })
411430 self .connection_pool .set_retry (retry )
412431
@@ -633,18 +652,17 @@ async def _send_command_parse_response(self, conn, command_name, *args, **option
633652 await conn .send_command (* args )
634653 return await self .parse_response (conn , command_name , ** options )
635654
636- async def _disconnect_raise (self , conn : Connection , error : Exception ):
655+ async def _close_connection (self , conn : Connection ):
637656 """
638- Close the connection and raise an exception
639- if retry_on_error is not set or the error
640- is not one of the specified error types
657+ Close the connection before retrying.
658+
659+ The supported exceptions are already checked in the
660+ retry object so we don't need to do it here.
661+
662+ After we disconnect the connection, it will try to reconnect and
663+ do a health check as part of the send_command logic(on connection level).
641664 """
642665 await conn .disconnect ()
643- if (
644- conn .retry_on_error is None
645- or isinstance (error , tuple (conn .retry_on_error )) is False
646- ):
647- raise error
648666
649667 # COMMAND EXECUTION AND PROTOCOL PARSING
650668 async def execute_command (self , * args , ** options ):
@@ -661,7 +679,7 @@ async def execute_command(self, *args, **options):
661679 lambda : self ._send_command_parse_response (
662680 conn , command_name , * args , ** options
663681 ),
664- lambda error : self ._disconnect_raise (conn , error ),
682+ lambda _ : self ._close_connection (conn ),
665683 )
666684 finally :
667685 if self .single_connection_client :
@@ -929,19 +947,11 @@ async def connect(self):
929947 )
930948 )
931949
932- async def _disconnect_raise_connect (self , conn , error ):
950+ async def _reconnect (self , conn ):
933951 """
934- Close the connection and raise an exception
935- if retry_on_error is not set or the error is not one
936- of the specified error types. Otherwise, try to
937- reconnect
952+ Try to reconnect
938953 """
939954 await conn .disconnect ()
940- if (
941- conn .retry_on_error is None
942- or isinstance (error , tuple (conn .retry_on_error )) is False
943- ):
944- raise error
945955 await conn .connect ()
946956
947957 async def _execute (self , conn , command , * args , ** kwargs ):
@@ -954,7 +964,7 @@ async def _execute(self, conn, command, *args, **kwargs):
954964 """
955965 return await conn .retry .call_with_retry (
956966 lambda : command (* args , ** kwargs ),
957- lambda error : self ._disconnect_raise_connect (conn , error ),
967+ lambda _ : self ._reconnect (conn ),
958968 )
959969
960970 async def parse_response (self , block : bool = True , timeout : float = 0 ):
@@ -1245,7 +1255,8 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
12451255 in one transmission. This is convenient for batch processing, such as
12461256 saving all the values in a list to Redis.
12471257
1248- All commands executed within a pipeline are wrapped with MULTI and EXEC
1258+ All commands executed within a pipeline(when running in transactional mode,
1259+ which is the default behavior) are wrapped with MULTI and EXEC
12491260 calls. This guarantees all commands executed in the pipeline will be
12501261 executed atomically.
12511262
@@ -1274,7 +1285,7 @@ def __init__(
12741285 self .shard_hint = shard_hint
12751286 self .watching = False
12761287 self .command_stack : CommandStackT = []
1277- self .scripts : Set [" Script" ] = set ()
1288+ self .scripts : Set [Script ] = set ()
12781289 self .explicit_transaction = False
12791290
12801291 async def __aenter__ (self : _RedisT ) -> _RedisT :
@@ -1346,36 +1357,36 @@ def execute_command(
13461357 return self .immediate_execute_command (* args , ** kwargs )
13471358 return self .pipeline_execute_command (* args , ** kwargs )
13481359
1349- async def _disconnect_reset_raise (self , conn , error ):
1360+ async def _disconnect_reset_raise_on_watching (
1361+ self ,
1362+ conn : Connection ,
1363+ error : Exception ,
1364+ ):
13501365 """
1351- Close the connection, reset watching state and
1352- raise an exception if we were watching,
1353- if retry_on_error is not set or the error is not one
1354- of the specified error types.
1366+ Close the connection reset watching state and
1367+ raise an exception if we were watching.
1368+
1369+ The supported exceptions are already checked in the
1370+ retry object so we don't need to do it here.
1371+
1372+ After we disconnect the connection, it will try to reconnect and
1373+ do a health check as part of the send_command logic(on connection level).
13551374 """
13561375 await conn .disconnect ()
13571376 # if we were already watching a variable, the watch is no longer
13581377 # valid since this connection has died. raise a WatchError, which
13591378 # indicates the user should retry this transaction.
13601379 if self .watching :
1361- await self .aclose ()
1380+ await self .reset ()
13621381 raise WatchError (
1363- "A ConnectionError occurred on while watching one or more keys"
1382+ f "A { type ( error ). __name__ } occurred while watching one or more keys"
13641383 )
1365- # if retry_on_error is not set or the error is not one
1366- # of the specified error types, raise it
1367- if (
1368- conn .retry_on_error is None
1369- or isinstance (error , tuple (conn .retry_on_error )) is False
1370- ):
1371- await self .aclose ()
1372- raise
13731384
13741385 async def immediate_execute_command (self , * args , ** options ):
13751386 """
1376- Execute a command immediately, but don't auto-retry on a
1377- ConnectionError if we're already WATCHing a variable. Used when
1378- issuing WATCH or subsequent commands retrieving their values but before
1387+ Execute a command immediately, but don't auto-retry on the supported
1388+ errors for retry if we're already WATCHing a variable.
1389+ Used when issuing WATCH or subsequent commands retrieving their values but before
13791390 MULTI is called.
13801391 """
13811392 command_name = args [0 ]
@@ -1389,7 +1400,7 @@ async def immediate_execute_command(self, *args, **options):
13891400 lambda : self ._send_command_parse_response (
13901401 conn , command_name , * args , ** options
13911402 ),
1392- lambda error : self ._disconnect_reset_raise (conn , error ),
1403+ lambda error : self ._disconnect_reset_raise_on_watching (conn , error ),
13931404 )
13941405
13951406 def pipeline_execute_command (self , * args , ** options ):
@@ -1544,28 +1555,24 @@ async def load_scripts(self):
15441555 if not exist :
15451556 s .sha = await immediate ("SCRIPT LOAD" , s .script )
15461557
1547- async def _disconnect_raise_reset (self , conn : Connection , error : Exception ):
1558+ async def _disconnect_raise_on_watching (self , conn : Connection , error : Exception ):
15481559 """
1549- Close the connection, raise an exception if we were watching,
1550- and raise an exception if retry_on_error is not set or the
1551- error is not one of the specified error types.
1560+ Close the connection, raise an exception if we were watching.
1561+
1562+ The supported exceptions are already checked in the
1563+ retry object so we don't need to do it here.
1564+
1565+ After we disconnect the connection, it will try to reconnect and
1566+ do a health check as part of the send_command logic(on connection level).
15521567 """
15531568 await conn .disconnect ()
15541569 # if we were watching a variable, the watch is no longer valid
15551570 # since this connection has died. raise a WatchError, which
15561571 # indicates the user should retry this transaction.
15571572 if self .watching :
15581573 raise WatchError (
1559- "A ConnectionError occurred on while watching one or more keys"
1574+ f "A { type ( error ). __name__ } occurred while watching one or more keys"
15601575 )
1561- # if retry_on_error is not set or the error is not one
1562- # of the specified error types, raise it
1563- if (
1564- conn .retry_on_error is None
1565- or isinstance (error , tuple (conn .retry_on_error )) is False
1566- ):
1567- await self .reset ()
1568- raise
15691576
15701577 async def execute (self , raise_on_error : bool = True ) -> List [Any ]:
15711578 """Execute all the commands in the current pipeline"""
@@ -1590,7 +1597,7 @@ async def execute(self, raise_on_error: bool = True) -> List[Any]:
15901597 try :
15911598 return await conn .retry .call_with_retry (
15921599 lambda : execute (conn , stack , raise_on_error ),
1593- lambda error : self ._disconnect_raise_reset (conn , error ),
1600+ lambda error : self ._disconnect_raise_on_watching (conn , error ),
15941601 )
15951602 finally :
15961603 await self .reset ()
0 commit comments