Skip to content

Commit 268aa4e

Browse files
Fix possible zombie consumer when closing after reconnection (#518)
1 parent 648b48b commit 268aa4e

File tree

3 files changed

+54
-2
lines changed

3 files changed

+54
-2
lines changed

lib/ConsumerImpl.cc

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,13 +293,38 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
293293
Result handleResult = ResultOk;
294294

295295
if (result == ResultOk) {
296-
LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
297296
{
298297
Lock mutexLock(mutex_);
298+
if (!changeToReadyState()) {
299+
auto client = client_.lock();
300+
if (client) {
301+
LOG_INFO(getName() << "Closing subscribed consumer since it was already closed");
302+
int requestId = client->newRequestId();
303+
auto name = getName();
304+
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId)
305+
.addListener([name](Result result, const ResponseData&) {
306+
if (result == ResultOk) {
307+
LOG_INFO(name << "Closed consumer successfully after subscribe completed");
308+
} else {
309+
LOG_WARN(name << "Failed to close consumer: " << strResult(result));
310+
}
311+
});
312+
} else {
313+
// This should not happen normally because if client is destroyed, the connection pool
314+
// should also be closed, which means all connections should be closed. Close the
315+
// connection to let broker know this registered consumer is inactive.
316+
LOG_WARN(getName()
317+
<< "Client already closed when subscribe completed, close the connection "
318+
<< cnx->cnxString());
319+
cnx->close(ResultNotConnected);
320+
}
321+
return ResultAlreadyClosed;
322+
}
323+
324+
LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
299325
setCnx(cnx);
300326
incomingMessages_.clear();
301327
possibleSendToDeadLetterTopicMessages_.clear();
302-
state_ = Ready;
303328
backoff_.reset();
304329
if (!messageListener_ && config_.getReceiverQueueSize() == 0) {
305330
// Complicated logic since we don't have a isLocked() function for mutex

lib/HandlerBase.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
148148
firstRequestIdAfterConnect_.store(requestId, std::memory_order_release);
149149
}
150150

151+
bool changeToReadyState() noexcept {
152+
State expected = Pending;
153+
return state_ == Ready || state_.compare_exchange_strong(expected, Ready);
154+
}
155+
151156
private:
152157
DeadlineTimerPtr timer_;
153158
DeadlineTimerPtr creationTimer_;

tests/ConsumerTest.cc

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1573,4 +1573,26 @@ TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) {
15731573
ASSERT_EQ(ResultOk, client.close());
15741574
}
15751575

1576+
TEST(ConsumerTest, testCloseAfterSeek) {
1577+
const auto topic = "test-close-after-seek-" + std::to_string(time(nullptr));
1578+
const auto subscription = "sub";
1579+
Client client(lookupUrl);
1580+
Consumer consumer;
1581+
ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer));
1582+
ASSERT_EQ(ResultOk, consumer.seek(TimeUtils::currentTimeMillis()));
1583+
consumer.closeAsync(nullptr);
1584+
1585+
// Test the previous consumer will be closed even after seek is done, at the moment the connection might
1586+
// not be established.
1587+
ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer));
1588+
1589+
// Test creating a consumer from a different client should also work for this case
1590+
Client anotherClient(lookupUrl);
1591+
consumer.closeAsync(nullptr);
1592+
ASSERT_EQ(ResultOk, anotherClient.subscribe(topic, subscription, consumer));
1593+
1594+
client.close();
1595+
anotherClient.close();
1596+
}
1597+
15761598
} // namespace pulsar

0 commit comments

Comments
 (0)