@@ -370,7 +370,7 @@ private void lazyListen() {
370370 State state = this .state .get ();
371371
372372 CompletableFuture <Void > futureToAwait = state .isPrepareListening () ? containerListenFuture
373- : lazyListen (this .backOff .start ());
373+ : lazyListen (new InitialBackoffExecution ( this .backOff .start () ));
374374
375375 try {
376376 futureToAwait .get (getMaxSubscriptionRegistrationWaitingTime (), TimeUnit .MILLISECONDS );
@@ -531,8 +531,7 @@ private void awaitRegistrationTime(CompletableFuture<Void> future) {
531531 future .get (getMaxSubscriptionRegistrationWaitingTime (), TimeUnit .MILLISECONDS );
532532 } catch (InterruptedException ex ) {
533533 Thread .currentThread ().interrupt ();
534- } catch (ExecutionException | TimeoutException ignore ) {
535- }
534+ } catch (ExecutionException | TimeoutException ignore ) {}
536535 }
537536
538537 @ Override
@@ -876,7 +875,7 @@ protected void handleSubscriptionException(CompletableFuture<Void> future, BackO
876875
877876 if (recoveryInterval != BackOffExecution .STOP ) {
878877 String message = String .format ("Connection failure occurred: %s; Restarting subscription task after %s ms" ,
879- cause , recoveryInterval );
878+ cause , recoveryInterval );
880879 logger .error (message , cause );
881880 }
882881
@@ -885,8 +884,13 @@ protected void handleSubscriptionException(CompletableFuture<Void> future, BackO
885884
886885 Runnable recoveryFunction = () -> {
887886
888- CompletableFuture <Void > lazyListen = lazyListen (backOffExecution );
889- lazyListen .whenComplete (propagate (future ));
887+ CompletableFuture <Void > lazyListen = lazyListen (new RecoveryBackoffExecution (backOffExecution ));
888+ lazyListen .whenComplete (propagate (future )).thenRun (() -> {
889+
890+ if (backOffExecution instanceof RecoveryAfterSubscriptionBackoffExecution ) {
891+ logger .info ("Subscription(s) recovered" );
892+ }
893+ });
890894 };
891895
892896 if (potentiallyRecover (loggingBackOffExecution , recoveryFunction )) {
@@ -980,7 +984,7 @@ private boolean hasTopics() {
980984 private Subscriber getRequiredSubscriber () {
981985
982986 Assert .state (this .subscriber != null ,
983- "Subscriber not created; Configure RedisConnectionFactory to create a Subscriber" );
987+ "Subscriber not created; Configure RedisConnectionFactory to create a Subscriber. Make sure that afterPropertiesSet() has been called " );
984988
985989 return this .subscriber ;
986990 }
@@ -1018,6 +1022,54 @@ private void logTrace(Supplier<String> message) {
10181022 }
10191023 }
10201024
1025+ BackOffExecution nextBackoffExecution (BackOffExecution backOffExecution , boolean subscribed ) {
1026+
1027+ if (subscribed ) {
1028+ return new RecoveryAfterSubscriptionBackoffExecution (backOff .start ());
1029+ }
1030+
1031+ return backOffExecution ;
1032+ }
1033+
1034+ /**
1035+ * Marker for an initial backoff.
1036+ *
1037+ * @param delegate
1038+ */
1039+ record InitialBackoffExecution (BackOffExecution delegate ) implements BackOffExecution {
1040+
1041+ @ Override
1042+ public long nextBackOff () {
1043+ return delegate .nextBackOff ();
1044+ }
1045+ }
1046+
1047+ /**
1048+ * Marker for a recovery after a subscription has been active previously.
1049+ *
1050+ * @param delegate
1051+ */
1052+ record RecoveryAfterSubscriptionBackoffExecution (BackOffExecution delegate ) implements BackOffExecution {
1053+
1054+ @ Override
1055+ public long nextBackOff () {
1056+ return delegate .nextBackOff ();
1057+ }
1058+ }
1059+
1060+ /**
1061+ * Marker for a recovery execution.
1062+ *
1063+ * @param delegate
1064+ */
1065+ record RecoveryBackoffExecution (BackOffExecution delegate ) implements BackOffExecution {
1066+
1067+ @ Override
1068+ public long nextBackOff () {
1069+ return delegate .nextBackOff ();
1070+ }
1071+ }
1072+
10211073 /**
10221074 * Represents an operation that accepts three input arguments {@link SubscriptionListener},
10231075 * {@code channel or pattern}, and {@code count} and returns no result.
@@ -1191,18 +1243,23 @@ public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Col
11911243 if (connection .isSubscribed ()) {
11921244
11931245 initFuture .completeExceptionally (
1194- new IllegalStateException ("Retrieved connection is already subscribed; aborting listening" ));
1246+ new IllegalStateException ("Retrieved connection is already subscribed; aborting listening" ));
11951247
11961248 return initFuture ;
11971249 }
11981250
11991251 try {
12001252 eventuallyPerformSubscription (connection , backOffExecution , initFuture , patterns , channels );
12011253 } catch (Throwable t ) {
1202- handleSubscriptionException (initFuture , backOffExecution , t );
1254+ handleSubscriptionException (initFuture , nextBackoffExecution (backOffExecution , connection .isSubscribed ()),
1255+ t );
12031256 }
12041257 } catch (RuntimeException ex ) {
1205- initFuture .completeExceptionally (ex );
1258+ if (backOffExecution instanceof InitialBackoffExecution ) {
1259+ initFuture .completeExceptionally (ex );
1260+ } else {
1261+ handleSubscriptionException (initFuture , backOffExecution , ex );
1262+ }
12061263 }
12071264
12081265 return initFuture ;
@@ -1215,8 +1272,9 @@ public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Col
12151272 void eventuallyPerformSubscription (RedisConnection connection , BackOffExecution backOffExecution ,
12161273 CompletableFuture <Void > subscriptionDone , Collection <byte []> patterns , Collection <byte []> channels ) {
12171274
1218- addSynchronization (new SynchronizingMessageListener .SubscriptionSynchronization (patterns , channels ,
1219- () -> subscriptionDone .complete (null )));
1275+ addSynchronization (new SynchronizingMessageListener .SubscriptionSynchronization (patterns , channels , () -> {
1276+ subscriptionDone .complete (null );
1277+ }));
12201278
12211279 doSubscribe (connection , patterns , channels );
12221280 }
@@ -1381,7 +1439,10 @@ private void doWithSubscription(byte[][] data, BiConsumer<Subscription, byte[][]
13811439 }
13821440
13831441 private void doInLock (Runnable runner ) {
1384- doInLock (() -> { runner .run (); return null ; });
1442+ doInLock (() -> {
1443+ runner .run ();
1444+ return null ;
1445+ });
13851446 }
13861447
13871448 private <T > T doInLock (Supplier <T > supplier ) {
@@ -1432,7 +1493,7 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
14321493 try {
14331494 subscribeChannel (channels .toArray (new byte [0 ][]));
14341495 } catch (Exception ex ) {
1435- handleSubscriptionException (subscriptionDone , backOffExecution , ex );
1496+ handleSubscriptionException (subscriptionDone , nextBackoffExecution ( backOffExecution , true ) , ex );
14361497 }
14371498 }));
14381499 } else {
@@ -1449,7 +1510,8 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
14491510 closeConnection ();
14501511 unsubscribeFuture .complete (null );
14511512 } catch (Throwable cause ) {
1452- handleSubscriptionException (subscriptionDone , backOffExecution , cause );
1513+ handleSubscriptionException (subscriptionDone ,
1514+ nextBackoffExecution (backOffExecution , connection .isSubscribed ()), cause );
14531515 }
14541516 });
14551517 }
0 commit comments