Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,25 @@ typedef std::unique_lock<std::mutex> Lock;

typedef std::vector<std::string> StringList;

static LookupServicePtr defaultLookupServiceFactory(const std::string& serviceUrl,
const ClientConfiguration& clientConfiguration,
ConnectionPool& pool, const AuthenticationPtr& auth) {
if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
LOG_DEBUG("Using HTTP Lookup");
return std::make_shared<HTTPLookupService>(serviceUrl, std::cref(clientConfiguration),
std::cref(auth));
} else {
LOG_DEBUG("Using Binary Lookup");
return std::make_shared<BinaryProtoLookupService>(serviceUrl, std::ref(pool),
std::cref(clientConfiguration));
}
}

ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
: ClientImpl(serviceUrl, clientConfiguration, &defaultLookupServiceFactory) {}

ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
LookupServiceFactory&& lookupServiceFactory)
: mutex_(),
state_(Open),
clientConfiguration_(ClientConfiguration(clientConfiguration)
Expand All @@ -95,7 +113,8 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
consumerIdGenerator_(0),
closingError(ResultOk),
useProxy_(false),
lookupCount_(0L) {
lookupCount_(0L),
lookupServiceFactory_(std::move(lookupServiceFactory)) {
std::unique_ptr<LoggerFactory> loggerFactory = clientConfiguration_.impl_->takeLogger();
if (loggerFactory) {
LogUtils::setLoggerFactory(std::move(loggerFactory));
Expand All @@ -106,19 +125,9 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
ClientImpl::~ClientImpl() { shutdown(); }

LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) {
LookupServicePtr underlyingLookupServicePtr;
if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
LOG_DEBUG("Using HTTP Lookup");
underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
serviceUrl, std::cref(clientConfiguration_), std::cref(clientConfiguration_.getAuthPtr()));
} else {
LOG_DEBUG("Using Binary Lookup");
underlyingLookupServicePtr = std::make_shared<BinaryProtoLookupService>(
serviceUrl, std::ref(pool_), std::cref(clientConfiguration_));
}

auto lookupServicePtr = RetryableLookupService::create(
underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
lookupServiceFactory_(serviceUrl, clientConfiguration_, pool_, clientConfiguration_.getAuthPtr()),
clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
return lookupServicePtr;
}

Expand Down Expand Up @@ -767,6 +776,7 @@ void ClientImpl::shutdown() {
<< " consumers have been shutdown.");
}

lookupServicePtr_->close();
if (!pool_.close()) {
// pool_ has already been closed. It means shutdown() has been called before.
return;
Expand Down
8 changes: 8 additions & 0 deletions lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ using ClientConnectionPtr = std::shared_ptr<ClientConnection>;

class LookupService;
using LookupServicePtr = std::shared_ptr<LookupService>;
using LookupServiceFactory = std::function<LookupServicePtr(const std::string&, const ClientConfiguration&,
ConnectionPool& pool, const AuthenticationPtr&)>;

class ProducerImplBase;
using ProducerImplBaseWeakPtr = std::weak_ptr<ProducerImplBase>;
Expand All @@ -70,6 +72,11 @@ std::string generateRandomName();
class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
public:
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);

// only for tests
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
LookupServiceFactory&& lookupServiceFactory);

virtual ~ClientImpl();

/**
Expand Down Expand Up @@ -205,6 +212,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
std::atomic<Result> closingError;
std::atomic<bool> useProxy_;
std::atomic<uint64_t> lookupCount_;
LookupServiceFactory lookupServiceFactory_;

friend class Client;
};
Expand Down
3 changes: 2 additions & 1 deletion lib/ResultUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ inline bool isResultRetryable(Result result) {
ResultLookupError,
ResultTooManyLookupRequestException,
ResultProducerBlockedQuotaExceededException,
ResultProducerBlockedQuotaExceededError};
ResultProducerBlockedQuotaExceededError,
ResultAlreadyClosed};
return fatalResults.find(static_cast<int>(result)) == fatalResults.cend();
}

Expand Down
12 changes: 5 additions & 7 deletions lib/RetryableLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
#pragma once

#include <chrono>

#include "LookupDataResult.h"
#include "LookupService.h"
#include "NamespaceName.h"
Expand All @@ -41,10 +39,10 @@ class RetryableLookupService : public LookupService {
: RetryableLookupService(std::forward<Args>(args)...) {}

void close() override {
lookupCache_->clear();
partitionLookupCache_->clear();
namespaceLookupCache_->clear();
getSchemaCache_->clear();
lookupCache_->close();
partitionLookupCache_->close();
namespaceLookupCache_->close();
getSchemaCache_->close();
}

template <typename... Args>
Expand Down Expand Up @@ -89,7 +87,7 @@ class RetryableLookupService : public LookupService {

RetryableLookupService(std::shared_ptr<LookupService> lookupService, TimeDuration timeout,
ExecutorServiceProviderPtr executorProvider)
: lookupService_(lookupService),
: lookupService_(std::move(lookupService)),
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider, timeout)),
partitionLookupCache_(
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider, timeout)),
Expand Down
12 changes: 11 additions & 1 deletion lib/RetryableOperationCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ class RetryableOperationCache : public std::enable_shared_from_this<RetryableOpe

Future<Result, T> run(const std::string& key, std::function<Future<Result, T>()>&& func) {
std::unique_lock<std::mutex> lock{mutex_};
if (closed_) {
Promise<Result, T> promise;
promise.setFailed(ResultAlreadyClosed);
return promise.getFuture();
}
auto it = operations_.find(key);
if (it == operations_.end()) {
DeadlineTimerPtr timer;
Expand Down Expand Up @@ -92,11 +97,15 @@ class RetryableOperationCache : public std::enable_shared_from_this<RetryableOpe
}
}

void clear() {
void close() {
decltype(operations_) operations;
{
std::lock_guard<std::mutex> lock{mutex_};
if (closed_) {
return;
}
operations.swap(operations_);
closed_ = true;
}
// cancel() could trigger the listener to erase the key from operations, so we should use a swap way
// to release the lock here
Expand All @@ -110,6 +119,7 @@ class RetryableOperationCache : public std::enable_shared_from_this<RetryableOpe
const TimeDuration timeout_;

std::unordered_map<std::string, std::shared_ptr<RetryableOperation<T>>> operations_;
bool closed_{false};
mutable std::mutex mutex_;

DECLARE_LOG_OBJECT()
Expand Down
62 changes: 62 additions & 0 deletions tests/LookupServiceTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -500,3 +500,65 @@ TEST(LookupServiceTest, testRedirectionLimit) {
}
}
}

class MockLookupService : public BinaryProtoLookupService {
public:
using BinaryProtoLookupService::BinaryProtoLookupService;

Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override {
bool expected = true;
if (firstTime_.compare_exchange_strong(expected, false)) {
// Trigger the retry
LOG_INFO("Fail the lookup for " << topicName->toString() << " intentionally");
Promise<Result, LookupDataResultPtr> promise;
promise.setFailed(ResultRetryable);
return promise.getFuture();
}
return BinaryProtoLookupService::getPartitionMetadataAsync(topicName);
}

private:
std::atomic_bool firstTime_{true};
};

TEST(LookupServiceTest, testAfterClientShutdown) {
auto client = std::make_shared<ClientImpl>("pulsar://localhost:6650", ClientConfiguration{},
[](const std::string& serviceUrl, const ClientConfiguration&,
ConnectionPool& pool, const AuthenticationPtr&) {
return std::make_shared<MockLookupService>(
serviceUrl, pool, ClientConfiguration{});
});
std::promise<Result> promise;
client->subscribeAsync("lookup-service-test-after-client-shutdown", "sub", ConsumerConfiguration{},
[&promise](Result result, const Consumer&) { promise.set_value(result); });
// When shutdown is called, there is a pending lookup request due to the 1st lookup is failed in
// MockLookupService. Verify shutdown will cancel it and return ResultDisconnected.
client->shutdown();
EXPECT_EQ(ResultDisconnected, promise.get_future().get());

// A new subscribeAsync call will fail immediately in the current thread
Result result = ResultOk;
client->subscribeAsync("lookup-service-test-retry-after-destroyed", "sub", ConsumerConfiguration{},
[&result](Result innerResult, const Consumer&) { result = innerResult; });
EXPECT_EQ(ResultAlreadyClosed, result);
}

TEST(LookupServiceTest, testRetryAfterDestroyed) {
auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");

auto internalLookupService =
std::make_shared<MockLookupService>("pulsar://localhost:6650", pool, ClientConfiguration{});
auto lookupService =
RetryableLookupService::create(internalLookupService, std::chrono::seconds(30), executorProvider);

// Simulate the race condition that `getPartitionMetadataAsync` is called after `close` is called on the
// lookup service. It's expected the request fails immediately with ResultAlreadyClosed.
lookupService->close();
Result result = ResultOk;
lookupService->getPartitionMetadataAsync(TopicName::get("lookup-service-test-retry-after-destroyed"))
.addListener([&result](Result innerResult, const LookupDataResultPtr&) { result = innerResult; });
EXPECT_EQ(ResultAlreadyClosed, result);
pool.close();
executorProvider->close();
}
4 changes: 2 additions & 2 deletions tests/RetryableOperationCacheTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ TEST_F(RetryableOperationCacheTest, testTimeout) {
}
}

TEST_F(RetryableOperationCacheTest, testClear) {
TEST_F(RetryableOperationCacheTest, testClose) {
auto cache = RetryableOperationCache<int>::create(provider_, std::chrono::seconds(30));
for (int i = 0; i < 10; i++) {
futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{100}));
}
ASSERT_EQ(getSize(*cache), 10);
cache->clear();
cache->close();
for (auto&& future : futures_) {
int value;
// All cancelled futures complete with ResultDisconnected and the default int value
Expand Down