Skip to content

Commit a03eb92

Browse files
Fix topic lookup segmentation fault after client is closed (#521)
1 parent 268aa4e commit a03eb92

File tree

7 files changed

+113
-24
lines changed

7 files changed

+113
-24
lines changed

lib/ClientImpl.cc

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,25 @@ typedef std::unique_lock<std::mutex> Lock;
7878

7979
typedef std::vector<std::string> StringList;
8080

81+
static LookupServicePtr defaultLookupServiceFactory(const std::string& serviceUrl,
82+
const ClientConfiguration& clientConfiguration,
83+
ConnectionPool& pool, const AuthenticationPtr& auth) {
84+
if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
85+
LOG_DEBUG("Using HTTP Lookup");
86+
return std::make_shared<HTTPLookupService>(serviceUrl, std::cref(clientConfiguration),
87+
std::cref(auth));
88+
} else {
89+
LOG_DEBUG("Using Binary Lookup");
90+
return std::make_shared<BinaryProtoLookupService>(serviceUrl, std::ref(pool),
91+
std::cref(clientConfiguration));
92+
}
93+
}
94+
8195
ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
96+
: ClientImpl(serviceUrl, clientConfiguration, &defaultLookupServiceFactory) {}
97+
98+
ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
99+
LookupServiceFactory&& lookupServiceFactory)
82100
: mutex_(),
83101
state_(Open),
84102
clientConfiguration_(ClientConfiguration(clientConfiguration)
@@ -95,7 +113,8 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
95113
consumerIdGenerator_(0),
96114
closingError(ResultOk),
97115
useProxy_(false),
98-
lookupCount_(0L) {
116+
lookupCount_(0L),
117+
lookupServiceFactory_(std::move(lookupServiceFactory)) {
99118
std::unique_ptr<LoggerFactory> loggerFactory = clientConfiguration_.impl_->takeLogger();
100119
if (loggerFactory) {
101120
LogUtils::setLoggerFactory(std::move(loggerFactory));
@@ -106,19 +125,9 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
106125
ClientImpl::~ClientImpl() { shutdown(); }
107126

108127
LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) {
109-
LookupServicePtr underlyingLookupServicePtr;
110-
if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
111-
LOG_DEBUG("Using HTTP Lookup");
112-
underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
113-
serviceUrl, std::cref(clientConfiguration_), std::cref(clientConfiguration_.getAuthPtr()));
114-
} else {
115-
LOG_DEBUG("Using Binary Lookup");
116-
underlyingLookupServicePtr = std::make_shared<BinaryProtoLookupService>(
117-
serviceUrl, std::ref(pool_), std::cref(clientConfiguration_));
118-
}
119-
120128
auto lookupServicePtr = RetryableLookupService::create(
121-
underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
129+
lookupServiceFactory_(serviceUrl, clientConfiguration_, pool_, clientConfiguration_.getAuthPtr()),
130+
clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
122131
return lookupServicePtr;
123132
}
124133

@@ -767,6 +776,7 @@ void ClientImpl::shutdown() {
767776
<< " consumers have been shutdown.");
768777
}
769778

779+
lookupServicePtr_->close();
770780
if (!pool_.close()) {
771781
// pool_ has already been closed. It means shutdown() has been called before.
772782
return;

lib/ClientImpl.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
5454

5555
class LookupService;
5656
using LookupServicePtr = std::shared_ptr<LookupService>;
57+
using LookupServiceFactory = std::function<LookupServicePtr(const std::string&, const ClientConfiguration&,
58+
ConnectionPool& pool, const AuthenticationPtr&)>;
5759

5860
class ProducerImplBase;
5961
using ProducerImplBaseWeakPtr = std::weak_ptr<ProducerImplBase>;
@@ -70,6 +72,11 @@ std::string generateRandomName();
7072
class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
7173
public:
7274
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
75+
76+
// only for tests
77+
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
78+
LookupServiceFactory&& lookupServiceFactory);
79+
7380
virtual ~ClientImpl();
7481

7582
/**
@@ -205,6 +212,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
205212
std::atomic<Result> closingError;
206213
std::atomic<bool> useProxy_;
207214
std::atomic<uint64_t> lookupCount_;
215+
LookupServiceFactory lookupServiceFactory_;
208216

209217
friend class Client;
210218
};

lib/ResultUtils.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ inline bool isResultRetryable(Result result) {
4949
ResultLookupError,
5050
ResultTooManyLookupRequestException,
5151
ResultProducerBlockedQuotaExceededException,
52-
ResultProducerBlockedQuotaExceededError};
52+
ResultProducerBlockedQuotaExceededError,
53+
ResultAlreadyClosed};
5354
return fatalResults.find(static_cast<int>(result)) == fatalResults.cend();
5455
}
5556

lib/RetryableLookupService.h

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
#pragma once
2020

21-
#include <chrono>
22-
2321
#include "LookupDataResult.h"
2422
#include "LookupService.h"
2523
#include "NamespaceName.h"
@@ -41,10 +39,10 @@ class RetryableLookupService : public LookupService {
4139
: RetryableLookupService(std::forward<Args>(args)...) {}
4240

4341
void close() override {
44-
lookupCache_->clear();
45-
partitionLookupCache_->clear();
46-
namespaceLookupCache_->clear();
47-
getSchemaCache_->clear();
42+
lookupCache_->close();
43+
partitionLookupCache_->close();
44+
namespaceLookupCache_->close();
45+
getSchemaCache_->close();
4846
}
4947

5048
template <typename... Args>
@@ -89,7 +87,7 @@ class RetryableLookupService : public LookupService {
8987

9088
RetryableLookupService(std::shared_ptr<LookupService> lookupService, TimeDuration timeout,
9189
ExecutorServiceProviderPtr executorProvider)
92-
: lookupService_(lookupService),
90+
: lookupService_(std::move(lookupService)),
9391
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider, timeout)),
9492
partitionLookupCache_(
9593
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider, timeout)),

lib/RetryableOperationCache.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ class RetryableOperationCache : public std::enable_shared_from_this<RetryableOpe
5858

5959
Future<Result, T> run(const std::string& key, std::function<Future<Result, T>()>&& func) {
6060
std::unique_lock<std::mutex> lock{mutex_};
61+
if (closed_) {
62+
Promise<Result, T> promise;
63+
promise.setFailed(ResultAlreadyClosed);
64+
return promise.getFuture();
65+
}
6166
auto it = operations_.find(key);
6267
if (it == operations_.end()) {
6368
DeadlineTimerPtr timer;
@@ -92,11 +97,15 @@ class RetryableOperationCache : public std::enable_shared_from_this<RetryableOpe
9297
}
9398
}
9499

95-
void clear() {
100+
void close() {
96101
decltype(operations_) operations;
97102
{
98103
std::lock_guard<std::mutex> lock{mutex_};
104+
if (closed_) {
105+
return;
106+
}
99107
operations.swap(operations_);
108+
closed_ = true;
100109
}
101110
// cancel() could trigger the listener to erase the key from operations, so we should use a swap way
102111
// to release the lock here
@@ -110,6 +119,7 @@ class RetryableOperationCache : public std::enable_shared_from_this<RetryableOpe
110119
const TimeDuration timeout_;
111120

112121
std::unordered_map<std::string, std::shared_ptr<RetryableOperation<T>>> operations_;
122+
bool closed_{false};
113123
mutable std::mutex mutex_;
114124

115125
DECLARE_LOG_OBJECT()

tests/LookupServiceTest.cc

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,3 +500,65 @@ TEST(LookupServiceTest, testRedirectionLimit) {
500500
}
501501
}
502502
}
503+
504+
class MockLookupService : public BinaryProtoLookupService {
505+
public:
506+
using BinaryProtoLookupService::BinaryProtoLookupService;
507+
508+
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override {
509+
bool expected = true;
510+
if (firstTime_.compare_exchange_strong(expected, false)) {
511+
// Trigger the retry
512+
LOG_INFO("Fail the lookup for " << topicName->toString() << " intentionally");
513+
Promise<Result, LookupDataResultPtr> promise;
514+
promise.setFailed(ResultRetryable);
515+
return promise.getFuture();
516+
}
517+
return BinaryProtoLookupService::getPartitionMetadataAsync(topicName);
518+
}
519+
520+
private:
521+
std::atomic_bool firstTime_{true};
522+
};
523+
524+
TEST(LookupServiceTest, testAfterClientShutdown) {
525+
auto client = std::make_shared<ClientImpl>("pulsar://localhost:6650", ClientConfiguration{},
526+
[](const std::string& serviceUrl, const ClientConfiguration&,
527+
ConnectionPool& pool, const AuthenticationPtr&) {
528+
return std::make_shared<MockLookupService>(
529+
serviceUrl, pool, ClientConfiguration{});
530+
});
531+
std::promise<Result> promise;
532+
client->subscribeAsync("lookup-service-test-after-client-shutdown", "sub", ConsumerConfiguration{},
533+
[&promise](Result result, const Consumer&) { promise.set_value(result); });
534+
// When shutdown is called, there is a pending lookup request due to the 1st lookup is failed in
535+
// MockLookupService. Verify shutdown will cancel it and return ResultDisconnected.
536+
client->shutdown();
537+
EXPECT_EQ(ResultDisconnected, promise.get_future().get());
538+
539+
// A new subscribeAsync call will fail immediately in the current thread
540+
Result result = ResultOk;
541+
client->subscribeAsync("lookup-service-test-retry-after-destroyed", "sub", ConsumerConfiguration{},
542+
[&result](Result innerResult, const Consumer&) { result = innerResult; });
543+
EXPECT_EQ(ResultAlreadyClosed, result);
544+
}
545+
546+
TEST(LookupServiceTest, testRetryAfterDestroyed) {
547+
auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
548+
ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
549+
550+
auto internalLookupService =
551+
std::make_shared<MockLookupService>("pulsar://localhost:6650", pool, ClientConfiguration{});
552+
auto lookupService =
553+
RetryableLookupService::create(internalLookupService, std::chrono::seconds(30), executorProvider);
554+
555+
// Simulate the race condition that `getPartitionMetadataAsync` is called after `close` is called on the
556+
// lookup service. It's expected the request fails immediately with ResultAlreadyClosed.
557+
lookupService->close();
558+
Result result = ResultOk;
559+
lookupService->getPartitionMetadataAsync(TopicName::get("lookup-service-test-retry-after-destroyed"))
560+
.addListener([&result](Result innerResult, const LookupDataResultPtr&) { result = innerResult; });
561+
EXPECT_EQ(ResultAlreadyClosed, result);
562+
pool.close();
563+
executorProvider->close();
564+
}

tests/RetryableOperationCacheTest.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,13 @@ TEST_F(RetryableOperationCacheTest, testTimeout) {
118118
}
119119
}
120120

121-
TEST_F(RetryableOperationCacheTest, testClear) {
121+
TEST_F(RetryableOperationCacheTest, testClose) {
122122
auto cache = RetryableOperationCache<int>::create(provider_, std::chrono::seconds(30));
123123
for (int i = 0; i < 10; i++) {
124124
futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{100}));
125125
}
126126
ASSERT_EQ(getSize(*cache), 10);
127-
cache->clear();
127+
cache->close();
128128
for (auto&& future : futures_) {
129129
int value;
130130
// All cancelled futures complete with ResultDisconnected and the default int value

0 commit comments

Comments
 (0)