diff --git a/lib/Commands.cc b/lib/Commands.cc index 30f5bf1a..08dc7183 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -902,7 +902,9 @@ uint64_t Commands::serializeSingleMessagesToBatchPayload(SharedBuffer& batchPayl batchPayload.write(payload.data(), payload.readableBytes()); } - return messages.back().impl_->metadata.sequence_id(); + // Use the first message's sequence_id so that ackReceived can compute + // lastSequenceIdPublished_ = sequenceId + messagesCount - 1 correctly. + return messages.front().impl_->metadata.sequence_id(); } Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex, diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 360e1288..c9a16e8f 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -933,19 +933,24 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { return false; } - uint64_t expectedSequenceId = op.sendArgs->sequenceId; - if (sequenceId > expectedSequenceId) { - LOG_WARN(getName() << "Got ack for msg " << sequenceId // - << " expecting: " << expectedSequenceId << " queue size=" // - << pendingMessagesQueue_.size() << " producer: " << producerId_); + const uint64_t expectedFirstSequenceId = op.sendArgs->sequenceId; + const uint64_t expectedLastSequenceId = expectedFirstSequenceId + op.messagesCount - 1; + // Broker may ack with either the first or the last sequence id of the batch. + if (sequenceId > expectedLastSequenceId) { + LOG_WARN(getName() << "Got ack for msg " << sequenceId + << " expecting last: " << expectedLastSequenceId + << " queue size=" << pendingMessagesQueue_.size() << " producer: " << producerId_); return false; - } else if (sequenceId < expectedSequenceId) { + } + if (sequenceId < expectedFirstSequenceId) { // Ignoring the ack since it's referring to a message that has already timed out. - LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId // - << " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId - << " producer: " << producerId_); + LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId << " -- MessageId - " << messageId + << " first-seq: " << expectedFirstSequenceId << " producer: " << producerId_); return true; } + // sequenceId is in [expectedFirstSequenceId, expectedLastSequenceId]; accept as matching this op. + const bool brokerSentFirst = (sequenceId == expectedFirstSequenceId); + lastSequenceIdPublished_ = brokerSentFirst ? expectedLastSequenceId : sequenceId; // Message was persisted correctly LOG_DEBUG(getName() << "Received ack for msg " << sequenceId); @@ -960,7 +965,6 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { } releaseSemaphoreForSendOp(op); - lastSequenceIdPublished_ = sequenceId + op.messagesCount - 1; std::unique_ptr opSendMsg{pendingMessagesQueue_.front().release()}; pendingMessagesQueue_.pop_front(); diff --git a/tests/KeyBasedBatchingTest.cc b/tests/KeyBasedBatchingTest.cc index e5962669..d5c5ce7b 100644 --- a/tests/KeyBasedBatchingTest.cc +++ b/tests/KeyBasedBatchingTest.cc @@ -134,10 +134,11 @@ TEST_F(KeyBasedBatchingTest, testSequenceId) { sendAsync("B", "3"); sendAsync("C", "4"); sendAsync("A", "5"); - // sequence id: B < C < A, so there are 3 batches in order as following: + // Batches are sent in ascending order of the first message's sequence id (BatchMessageKeyBasedContainer + // sorts by sendArgs->sequenceId). Send order gives A=0, B=1, C=2 for first per key, so batches: A, B, C. + // A: 0, 5 // B: 1, 3 // C: 2, 4 - // A: 0, 5 latch.wait(); std::vector receivedKeys; @@ -149,8 +150,8 @@ TEST_F(KeyBasedBatchingTest, testSequenceId) { receivedValues.emplace_back(msg.getDataAsString()); } - decltype(receivedKeys) expectedKeys{"B", "B", "C", "C", "A", "A"}; - decltype(receivedValues) expectedValues{"1", "3", "2", "4", "0", "5"}; + decltype(receivedKeys) expectedKeys{"A", "A", "B", "B", "C", "C"}; + decltype(receivedValues) expectedValues{"0", "5", "1", "3", "2", "4"}; EXPECT_EQ(receivedKeys, expectedKeys); EXPECT_EQ(receivedValues, expectedValues); } diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index edb79e47..4220a2ed 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -441,6 +441,44 @@ TEST_P(ProducerTest, testFlushNoBatch) { client.close(); } +// Verifies that getLastSequenceId() is correct after sendAsync + flush when batching is enabled. +// Previously the batch used the last message's sequence_id, causing lastSequenceIdPublished_ to be +// doubled (e.g. 3 messages yielded 4 instead of 2). The batch must use the first message's +// sequence_id so that lastSequenceIdPublished_ = sequenceId + messagesCount - 1 is correct. +TEST(ProducerTest, testGetLastSequenceIdAfterBatchFlush) { + Client client(serviceUrl); + + const std::string topicName = + "persistent://public/default/testGetLastSequenceIdAfterBatchFlush-" + std::to_string(time(nullptr)); + + ProducerConfiguration producerConfiguration; + producerConfiguration.setBatchingEnabled(true); + producerConfiguration.setBatchingMaxMessages(10); + producerConfiguration.setBatchingMaxPublishDelayMs(60000); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer)); + + // Send 3 messages in a batch, then flush. Sequence ids are [0, 1, 2], so getLastSequenceId() must be 2. + for (int i = 0; i < 3; i++) { + Message msg = MessageBuilder().setContent("content").build(); + producer.sendAsync(msg, nullptr); + } + ASSERT_EQ(ResultOk, producer.flush()); + ASSERT_EQ(producer.getLastSequenceId(), 2) << "After 3 messages, last sequence id should be 2"; + + // Send 2 more (total 5), flush. Sequence ids for these are [3, 4], so getLastSequenceId() must be 4. + for (int i = 0; i < 2; i++) { + Message msg = MessageBuilder().setContent("content").build(); + producer.sendAsync(msg, nullptr); + } + ASSERT_EQ(ResultOk, producer.flush()); + ASSERT_EQ(producer.getLastSequenceId(), 4) << "After 5 messages total, last sequence id should be 4"; + + producer.close(); + client.close(); +} + TEST_P(ProducerTest, testFlushBatch) { Client client(serviceUrl); diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index e4a924dd..77719a19 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -863,7 +864,13 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) { } ASSERT_EQ(ResultOk, reader.seek(MessageId::latest())); - ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + // After seek-to-end the broker may close the consumer and trigger reconnect; allow a short + // delay for hasMessageAvailable to become false (avoids flakiness when reconnect completes). + for (int i = 0; i < 50; i++) { + ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + if (!hasMessageAvailable) break; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } ASSERT_FALSE(hasMessageAvailable); producer.send(MessageBuilder().setContent("msg-2").build()); @@ -876,7 +883,11 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) { // Test the 2nd seek ASSERT_EQ(ResultOk, reader.seek(MessageId::latest())); - ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + for (int i = 0; i < 50; i++) { + ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + if (!hasMessageAvailable) break; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } ASSERT_FALSE(hasMessageAvailable); }