Skip to content

Commit 648b48b

Browse files
Fix a null ACK grouping tracker can be accessed after consumer is closed (#517)
Fixes #516
1 parent 12471ec commit 648b48b

10 files changed

+311
-317
lines changed

lib/AckGroupingTracker.cc

Lines changed: 0 additions & 141 deletions
This file was deleted.

lib/AckGroupingTracker.h

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,17 @@
2222
#include <pulsar/MessageId.h>
2323
#include <pulsar/Result.h>
2424

25-
#include <cstdint>
2625
#include <functional>
27-
#include <set>
28-
29-
#include "ProtoApiEnums.h"
3026

3127
namespace pulsar {
3228

3329
class ClientConnection;
3430
using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
3531
using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
3632
using ResultCallback = std::function<void(Result)>;
33+
class ConsumerImpl;
34+
using ConsumerImplPtr = std::shared_ptr<ConsumerImpl>;
35+
using ConsumerImplWeakPtr = std::weak_ptr<ConsumerImpl>;
3736

3837
/**
3938
* @class AckGroupingTracker
@@ -42,19 +41,12 @@ using ResultCallback = std::function<void(Result)>;
4241
*/
4342
class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracker> {
4443
public:
45-
AckGroupingTracker(std::function<ClientConnectionPtr()> connectionSupplier,
46-
std::function<uint64_t()> requestIdSupplier, uint64_t consumerId, bool waitResponse)
47-
: connectionSupplier_(std::move(connectionSupplier)),
48-
requestIdSupplier_(std::move(requestIdSupplier)),
49-
consumerId_(consumerId),
50-
waitResponse_(waitResponse) {}
51-
5244
virtual ~AckGroupingTracker() = default;
5345

5446
/**
5547
* Start tracking the ACK requests.
5648
*/
57-
virtual void start() {}
49+
virtual void start(const ConsumerImplPtr& consumer) { consumer_ = consumer; }
5850

5951
/**
6052
* Since ACK requests are grouped and delayed, we need to do some best-effort duplicate check to
@@ -72,7 +64,9 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
7264
* @param[in] callback the callback that is triggered when the message is acknowledged
7365
*/
7466
virtual void addAcknowledge(const MessageId& msgId, const ResultCallback& callback) {
75-
callback(ResultOk);
67+
if (callback) {
68+
callback(ResultOk);
69+
}
7670
}
7771

7872
/**
@@ -81,7 +75,9 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
8175
* @param[in] callback the callback that is triggered when the messages are acknowledged
8276
*/
8377
virtual void addAcknowledgeList(const MessageIdList& msgIds, const ResultCallback& callback) {
84-
callback(ResultOk);
78+
if (callback) {
79+
callback(ResultOk);
80+
}
8581
}
8682

8783
/**
@@ -90,7 +86,9 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
9086
* @param[in] callback the callback that is triggered when the message is acknowledged
9187
*/
9288
virtual void addAcknowledgeCumulative(const MessageId& msgId, const ResultCallback& callback) {
93-
callback(ResultOk);
89+
if (callback) {
90+
callback(ResultOk);
91+
}
9492
}
9593

9694
/**
@@ -99,18 +97,10 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
9997
*/
10098
virtual void flushAndClean() {}
10199

102-
protected:
103-
void doImmediateAck(const MessageId& msgId, const ResultCallback& callback,
104-
CommandAck_AckType ackType) const;
105-
void doImmediateAck(const std::set<MessageId>& msgIds, const ResultCallback& callback) const;
106-
107-
private:
108-
const std::function<ClientConnectionPtr()> connectionSupplier_;
109-
const std::function<uint64_t()> requestIdSupplier_;
110-
const uint64_t consumerId_;
100+
virtual void close() {}
111101

112102
protected:
113-
const bool waitResponse_;
103+
ConsumerImplWeakPtr consumer_;
114104

115105
}; // class AckGroupingTracker
116106

lib/AckGroupingTrackerDisabled.cc

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,41 @@
1919

2020
#include "AckGroupingTrackerDisabled.h"
2121

22-
#include "ProtoApiEnums.h"
22+
#include "ConsumerImpl.h"
2323

2424
namespace pulsar {
2525

2626
void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId, const ResultCallback& callback) {
27-
doImmediateAck(msgId, callback, CommandAck_AckType_Individual);
27+
auto consumer = consumer_.lock();
28+
if (consumer && !consumer->isClosingOrClosed()) {
29+
consumer->doImmediateAck(msgId, callback, CommandAck_AckType_Individual);
30+
} else if (callback) {
31+
callback(ResultAlreadyClosed);
32+
}
2833
}
2934

3035
void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& msgIds,
3136
const ResultCallback& callback) {
32-
std::set<MessageId> msgIdSet;
33-
for (auto&& msgId : msgIds) {
34-
msgIdSet.emplace(msgId);
37+
auto consumer = consumer_.lock();
38+
if (consumer && !consumer->isClosingOrClosed()) {
39+
std::set<MessageId> uniqueMsgIds(msgIds.begin(), msgIds.end());
40+
for (auto&& msgId : msgIds) {
41+
uniqueMsgIds.insert(msgId);
42+
}
43+
consumer->doImmediateAck(uniqueMsgIds, callback);
44+
} else if (callback) {
45+
callback(ResultAlreadyClosed);
3546
}
36-
doImmediateAck(msgIdSet, callback);
3747
}
3848

3949
void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId,
4050
const ResultCallback& callback) {
41-
doImmediateAck(msgId, callback, CommandAck_AckType_Cumulative);
51+
auto consumer = consumer_.lock();
52+
if (consumer && !consumer->isClosingOrClosed()) {
53+
consumer->doImmediateAck(msgId, callback, CommandAck_AckType_Cumulative);
54+
} else if (callback) {
55+
callback(ResultAlreadyClosed);
56+
}
4257
}
4358

4459
} // namespace pulsar

0 commit comments

Comments
 (0)