From 9ae69fd13e69a3d089d033a3126ca992f1e8edf8 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Wed, 4 Mar 2026 16:52:31 +0800 Subject: [PATCH 1/4] fix https://github.com/apache/pulsar-client-cpp/issues/531 --- lib/Commands.cc | 4 +++- lib/ProducerImpl.cc | 23 +++++++++++++---------- tests/ProducerTest.cc | 38 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 11 deletions(-) diff --git a/lib/Commands.cc b/lib/Commands.cc index 30f5bf1a..08dc7183 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -902,7 +902,9 @@ uint64_t Commands::serializeSingleMessagesToBatchPayload(SharedBuffer& batchPayl batchPayload.write(payload.data(), payload.readableBytes()); } - return messages.back().impl_->metadata.sequence_id(); + // Use the first message's sequence_id so that ackReceived can compute + // lastSequenceIdPublished_ = sequenceId + messagesCount - 1 correctly. + return messages.front().impl_->metadata.sequence_id(); } Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex, diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 360e1288..b0d52b34 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -933,19 +933,23 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { return false; } - uint64_t expectedSequenceId = op.sendArgs->sequenceId; - if (sequenceId > expectedSequenceId) { - LOG_WARN(getName() << "Got ack for msg " << sequenceId // - << " expecting: " << expectedSequenceId << " queue size=" // - << pendingMessagesQueue_.size() << " producer: " << producerId_); + const uint64_t expectedFirstSequenceId = op.sendArgs->sequenceId; + const uint64_t expectedLastSequenceId = expectedFirstSequenceId + op.messagesCount - 1; + // Broker may ack with either the first or the last sequence id of the batch. + if (sequenceId > expectedLastSequenceId) { + LOG_WARN(getName() << "Got ack for msg " << sequenceId << " expecting last: " << expectedLastSequenceId + << " queue size=" << pendingMessagesQueue_.size() << " producer: " << producerId_); return false; - } else if (sequenceId < expectedSequenceId) { + } + if (sequenceId < expectedFirstSequenceId) { // Ignoring the ack since it's referring to a message that has already timed out. - LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId // - << " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId - << " producer: " << producerId_); + LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId << " -- MessageId - " << messageId + << " first-seq: " << expectedFirstSequenceId << " producer: " << producerId_); return true; } + // sequenceId is in [expectedFirstSequenceId, expectedLastSequenceId]; accept as matching this op. + const bool brokerSentFirst = (sequenceId == expectedFirstSequenceId); + lastSequenceIdPublished_ = brokerSentFirst ? expectedLastSequenceId : sequenceId; // Message was persisted correctly LOG_DEBUG(getName() << "Received ack for msg " << sequenceId); @@ -960,7 +964,6 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { } releaseSemaphoreForSendOp(op); - lastSequenceIdPublished_ = sequenceId + op.messagesCount - 1; std::unique_ptr opSendMsg{pendingMessagesQueue_.front().release()}; pendingMessagesQueue_.pop_front(); diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index edb79e47..4220a2ed 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -441,6 +441,44 @@ TEST_P(ProducerTest, testFlushNoBatch) { client.close(); } +// Verifies that getLastSequenceId() is correct after sendAsync + flush when batching is enabled. +// Previously the batch used the last message's sequence_id, causing lastSequenceIdPublished_ to be +// doubled (e.g. 3 messages yielded 4 instead of 2). The batch must use the first message's +// sequence_id so that lastSequenceIdPublished_ = sequenceId + messagesCount - 1 is correct. +TEST(ProducerTest, testGetLastSequenceIdAfterBatchFlush) { + Client client(serviceUrl); + + const std::string topicName = + "persistent://public/default/testGetLastSequenceIdAfterBatchFlush-" + std::to_string(time(nullptr)); + + ProducerConfiguration producerConfiguration; + producerConfiguration.setBatchingEnabled(true); + producerConfiguration.setBatchingMaxMessages(10); + producerConfiguration.setBatchingMaxPublishDelayMs(60000); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer)); + + // Send 3 messages in a batch, then flush. Sequence ids are [0, 1, 2], so getLastSequenceId() must be 2. + for (int i = 0; i < 3; i++) { + Message msg = MessageBuilder().setContent("content").build(); + producer.sendAsync(msg, nullptr); + } + ASSERT_EQ(ResultOk, producer.flush()); + ASSERT_EQ(producer.getLastSequenceId(), 2) << "After 3 messages, last sequence id should be 2"; + + // Send 2 more (total 5), flush. Sequence ids for these are [3, 4], so getLastSequenceId() must be 4. + for (int i = 0; i < 2; i++) { + Message msg = MessageBuilder().setContent("content").build(); + producer.sendAsync(msg, nullptr); + } + ASSERT_EQ(ResultOk, producer.flush()); + ASSERT_EQ(producer.getLastSequenceId(), 4) << "After 5 messages total, last sequence id should be 4"; + + producer.close(); + client.close(); +} + TEST_P(ProducerTest, testFlushBatch) { Client client(serviceUrl); From 8c9ce21c6d53c50feaae69ae5311b4cfc10af6d6 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Wed, 4 Mar 2026 17:07:28 +0800 Subject: [PATCH 2/4] format --- lib/ProducerImpl.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index b0d52b34..c9a16e8f 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -937,7 +937,8 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { const uint64_t expectedLastSequenceId = expectedFirstSequenceId + op.messagesCount - 1; // Broker may ack with either the first or the last sequence id of the batch. if (sequenceId > expectedLastSequenceId) { - LOG_WARN(getName() << "Got ack for msg " << sequenceId << " expecting last: " << expectedLastSequenceId + LOG_WARN(getName() << "Got ack for msg " << sequenceId + << " expecting last: " << expectedLastSequenceId << " queue size=" << pendingMessagesQueue_.size() << " producer: " << producerId_); return false; } From bbc064436a09b3dbdae0725dcfd1c5dd4e520b03 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Thu, 5 Mar 2026 18:52:09 +0800 Subject: [PATCH 3/4] fix --- tests/ReaderTest.cc | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index e4a924dd..77719a19 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -863,7 +864,13 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) { } ASSERT_EQ(ResultOk, reader.seek(MessageId::latest())); - ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + // After seek-to-end the broker may close the consumer and trigger reconnect; allow a short + // delay for hasMessageAvailable to become false (avoids flakiness when reconnect completes). + for (int i = 0; i < 50; i++) { + ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + if (!hasMessageAvailable) break; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } ASSERT_FALSE(hasMessageAvailable); producer.send(MessageBuilder().setContent("msg-2").build()); @@ -876,7 +883,11 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) { // Test the 2nd seek ASSERT_EQ(ResultOk, reader.seek(MessageId::latest())); - ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + for (int i = 0; i < 50; i++) { + ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + if (!hasMessageAvailable) break; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } ASSERT_FALSE(hasMessageAvailable); } From 9da1fa7ddbd0c5e0d1b6ddc249fcbe437bf8af09 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Thu, 5 Mar 2026 20:02:14 +0800 Subject: [PATCH 4/4] fix --- tests/KeyBasedBatchingTest.cc | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/KeyBasedBatchingTest.cc b/tests/KeyBasedBatchingTest.cc index e5962669..d5c5ce7b 100644 --- a/tests/KeyBasedBatchingTest.cc +++ b/tests/KeyBasedBatchingTest.cc @@ -134,10 +134,11 @@ TEST_F(KeyBasedBatchingTest, testSequenceId) { sendAsync("B", "3"); sendAsync("C", "4"); sendAsync("A", "5"); - // sequence id: B < C < A, so there are 3 batches in order as following: + // Batches are sent in ascending order of the first message's sequence id (BatchMessageKeyBasedContainer + // sorts by sendArgs->sequenceId). Send order gives A=0, B=1, C=2 for first per key, so batches: A, B, C. + // A: 0, 5 // B: 1, 3 // C: 2, 4 - // A: 0, 5 latch.wait(); std::vector receivedKeys; @@ -149,8 +150,8 @@ TEST_F(KeyBasedBatchingTest, testSequenceId) { receivedValues.emplace_back(msg.getDataAsString()); } - decltype(receivedKeys) expectedKeys{"B", "B", "C", "C", "A", "A"}; - decltype(receivedValues) expectedValues{"1", "3", "2", "4", "0", "5"}; + decltype(receivedKeys) expectedKeys{"A", "A", "B", "B", "C", "C"}; + decltype(receivedValues) expectedValues{"0", "5", "1", "3", "2", "4"}; EXPECT_EQ(receivedKeys, expectedKeys); EXPECT_EQ(receivedValues, expectedValues); }