diff --git a/lib/BatchMessageContainer.cc b/lib/BatchMessageContainer.cc index cd7ddc85..360303bc 100644 --- a/lib/BatchMessageContainer.cc +++ b/lib/BatchMessageContainer.cc @@ -30,11 +30,7 @@ namespace pulsar { BatchMessageContainer::BatchMessageContainer(const ProducerImpl& producer) : BatchMessageContainerBase(producer) {} -BatchMessageContainer::~BatchMessageContainer() { - LOG_DEBUG(*this << " destructed"); - LOG_DEBUG("[numberOfBatchesSent = " << numberOfBatchesSent_ - << "] [averageBatchSize_ = " << averageBatchSize_ << "]"); -} +BatchMessageContainer::~BatchMessageContainer() {} bool BatchMessageContainer::add(const Message& msg, const SendCallback& callback) { LOG_DEBUG("Before add: " << *this << " [message = " << msg << "]"); diff --git a/lib/BatchMessageKeyBasedContainer.cc b/lib/BatchMessageKeyBasedContainer.cc index 5b181843..dc035c2b 100644 --- a/lib/BatchMessageKeyBasedContainer.cc +++ b/lib/BatchMessageKeyBasedContainer.cc @@ -40,11 +40,7 @@ inline std::string getKey(const Message& msg) { BatchMessageKeyBasedContainer::BatchMessageKeyBasedContainer(const ProducerImpl& producer) : BatchMessageContainerBase(producer) {} -BatchMessageKeyBasedContainer::~BatchMessageKeyBasedContainer() { - LOG_DEBUG(*this << " destructed"); - LOG_INFO("[numberOfBatchesSent = " << numberOfBatchesSent_ - << "] [averageBatchSize_ = " << averageBatchSize_ << "]"); -} +BatchMessageKeyBasedContainer::~BatchMessageKeyBasedContainer() {} bool BatchMessageKeyBasedContainer::isFirstMessageToAdd(const Message& msg) const { auto it = batches_.find(getKey(msg)); diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 757b6e84..803bb4fa 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -180,13 +180,15 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& topic } ConsumerImpl::~ConsumerImpl() { - LOG_DEBUG(consumerStr_ << "~ConsumerImpl"); + auto client = client_.lock(); if (state_ == Ready) { // this could happen at least in this condition: // consumer seek, caused reconnection, if consumer close happened before connection ready, // then consumer will not send closeConsumer to Broker side, and caused a leak of consumer in // broker. - LOG_WARN(consumerStr_ << "Destroyed consumer which was not properly closed"); + if (client) { + LOG_WARN(consumerStr_ << "Destroyed consumer which was not properly closed"); + } ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { @@ -194,9 +196,6 @@ ConsumerImpl::~ConsumerImpl() { cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, "CLOSE_CONSUMER"); cnx->removeConsumer(consumerId_); - LOG_INFO(consumerStr_ << "Closed consumer for race condition: " << consumerId_); - } else { - LOG_WARN(consumerStr_ << "Client is destroyed and cannot send the CloseConsumer command"); } } internalShutdown(); diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index c9a16e8f..2bb5ed78 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -114,11 +114,12 @@ ProducerImpl::ProducerImpl(const ClientImplPtr& client, const TopicName& topicNa } ProducerImpl::~ProducerImpl() { - LOG_DEBUG(producerStr_ << "~ProducerImpl"); + auto client = client_.lock(); internalShutdown(); - printStats(); - if (state_ == Ready || state_ == Pending) { - LOG_WARN(producerStr_ << "Destroyed producer which was not properly closed"); + if (client) { + if (state_ == Ready || state_ == Pending) { + LOG_WARN(producerStr_ << "Destroyed producer which was not properly closed"); + } } } @@ -754,7 +755,7 @@ void ProducerImpl::sendMessage(std::unique_ptr opSendMsg) { void ProducerImpl::printStats() { if (batchMessageContainer_) { LOG_INFO("Producer - " << producerStr_ << ", [batchMessageContainer = " << *batchMessageContainer_ - << "]"); + << "]"); } else { LOG_INFO("Producer - " << producerStr_ << ", [batching = off]"); }