@@ -500,3 +500,65 @@ TEST(LookupServiceTest, testRedirectionLimit) {
500500 }
501501 }
502502}
503+
504+ class MockLookupService : public BinaryProtoLookupService {
505+ public:
506+ using BinaryProtoLookupService::BinaryProtoLookupService;
507+
508+ Future<Result, LookupDataResultPtr> getPartitionMetadataAsync (const TopicNamePtr& topicName) override {
509+ bool expected = true ;
510+ if (firstTime_.compare_exchange_strong (expected, false )) {
511+ // Trigger the retry
512+ LOG_INFO (" Fail the lookup for " << topicName->toString () << " intentionally" );
513+ Promise<Result, LookupDataResultPtr> promise;
514+ promise.setFailed (ResultRetryable);
515+ return promise.getFuture ();
516+ }
517+ return BinaryProtoLookupService::getPartitionMetadataAsync (topicName);
518+ }
519+
520+ private:
521+ std::atomic_bool firstTime_{true };
522+ };
523+
524+ TEST (LookupServiceTest, testAfterClientShutdown) {
525+ auto client = std::make_shared<ClientImpl>(" pulsar://localhost:6650" , ClientConfiguration{},
526+ [](const std::string& serviceUrl, const ClientConfiguration&,
527+ ConnectionPool& pool, const AuthenticationPtr&) {
528+ return std::make_shared<MockLookupService>(
529+ serviceUrl, pool, ClientConfiguration{});
530+ });
531+ std::promise<Result> promise;
532+ client->subscribeAsync (" lookup-service-test-after-client-shutdown" , " sub" , ConsumerConfiguration{},
533+ [&promise](Result result, const Consumer&) { promise.set_value (result); });
534+ // When shutdown is called, there is a pending lookup request due to the 1st lookup is failed in
535+ // MockLookupService. Verify shutdown will cancel it and return ResultDisconnected.
536+ client->shutdown ();
537+ EXPECT_EQ (ResultDisconnected, promise.get_future ().get ());
538+
539+ // A new subscribeAsync call will fail immediately in the current thread
540+ Result result = ResultOk;
541+ client->subscribeAsync (" lookup-service-test-retry-after-destroyed" , " sub" , ConsumerConfiguration{},
542+ [&result](Result innerResult, const Consumer&) { result = innerResult; });
543+ EXPECT_EQ (ResultAlreadyClosed, result);
544+ }
545+
546+ TEST (LookupServiceTest, testRetryAfterDestroyed) {
547+ auto executorProvider = std::make_shared<ExecutorServiceProvider>(1 );
548+ ConnectionPool pool ({}, executorProvider, AuthFactory::Disabled (), " " );
549+
550+ auto internalLookupService =
551+ std::make_shared<MockLookupService>(" pulsar://localhost:6650" , pool, ClientConfiguration{});
552+ auto lookupService =
553+ RetryableLookupService::create (internalLookupService, std::chrono::seconds (30 ), executorProvider);
554+
555+ // Simulate the race condition that `getPartitionMetadataAsync` is called after `close` is called on the
556+ // lookup service. It's expected the request fails immediately with ResultAlreadyClosed.
557+ lookupService->close ();
558+ Result result = ResultOk;
559+ lookupService->getPartitionMetadataAsync (TopicName::get (" lookup-service-test-retry-after-destroyed" ))
560+ .addListener ([&result](Result innerResult, const LookupDataResultPtr&) { result = innerResult; });
561+ EXPECT_EQ (ResultAlreadyClosed, result);
562+ pool.close ();
563+ executorProvider->close ();
564+ }
0 commit comments