Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,9 @@ uint64_t Commands::serializeSingleMessagesToBatchPayload(SharedBuffer& batchPayl
batchPayload.write(payload.data(), payload.readableBytes());
}

return messages.back().impl_->metadata.sequence_id();
// Use the first message's sequence_id so that ackReceived can compute
// lastSequenceIdPublished_ = sequenceId + messagesCount - 1 correctly.
return messages.front().impl_->metadata.sequence_id();
}

Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,
Expand Down
24 changes: 14 additions & 10 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -933,19 +933,24 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
return false;
}

uint64_t expectedSequenceId = op.sendArgs->sequenceId;
if (sequenceId > expectedSequenceId) {
LOG_WARN(getName() << "Got ack for msg " << sequenceId //
<< " expecting: " << expectedSequenceId << " queue size=" //
<< pendingMessagesQueue_.size() << " producer: " << producerId_);
const uint64_t expectedFirstSequenceId = op.sendArgs->sequenceId;
const uint64_t expectedLastSequenceId = expectedFirstSequenceId + op.messagesCount - 1;
// Broker may ack with either the first or the last sequence id of the batch.
if (sequenceId > expectedLastSequenceId) {
LOG_WARN(getName() << "Got ack for msg " << sequenceId
<< " expecting last: " << expectedLastSequenceId
<< " queue size=" << pendingMessagesQueue_.size() << " producer: " << producerId_);
return false;
} else if (sequenceId < expectedSequenceId) {
}
if (sequenceId < expectedFirstSequenceId) {
// Ignoring the ack since it's referring to a message that has already timed out.
LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId //
<< " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId
<< " producer: " << producerId_);
LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId << " -- MessageId - " << messageId
<< " first-seq: " << expectedFirstSequenceId << " producer: " << producerId_);
return true;
}
// sequenceId is in [expectedFirstSequenceId, expectedLastSequenceId]; accept as matching this op.
const bool brokerSentFirst = (sequenceId == expectedFirstSequenceId);
lastSequenceIdPublished_ = brokerSentFirst ? expectedLastSequenceId : sequenceId;

// Message was persisted correctly
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
Expand All @@ -960,7 +965,6 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
}

releaseSemaphoreForSendOp(op);
lastSequenceIdPublished_ = sequenceId + op.messagesCount - 1;

std::unique_ptr<OpSendMsg> opSendMsg{pendingMessagesQueue_.front().release()};
pendingMessagesQueue_.pop_front();
Expand Down
9 changes: 5 additions & 4 deletions tests/KeyBasedBatchingTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,11 @@ TEST_F(KeyBasedBatchingTest, testSequenceId) {
sendAsync("B", "3");
sendAsync("C", "4");
sendAsync("A", "5");
// sequence id: B < C < A, so there are 3 batches in order as following:
// Batches are sent in ascending order of the first message's sequence id (BatchMessageKeyBasedContainer
// sorts by sendArgs->sequenceId). Send order gives A=0, B=1, C=2 for first per key, so batches: A, B, C.
// A: 0, 5
// B: 1, 3
// C: 2, 4
// A: 0, 5
latch.wait();

std::vector<std::string> receivedKeys;
Expand All @@ -149,8 +150,8 @@ TEST_F(KeyBasedBatchingTest, testSequenceId) {
receivedValues.emplace_back(msg.getDataAsString());
}

decltype(receivedKeys) expectedKeys{"B", "B", "C", "C", "A", "A"};
decltype(receivedValues) expectedValues{"1", "3", "2", "4", "0", "5"};
decltype(receivedKeys) expectedKeys{"A", "A", "B", "B", "C", "C"};
decltype(receivedValues) expectedValues{"0", "5", "1", "3", "2", "4"};
EXPECT_EQ(receivedKeys, expectedKeys);
EXPECT_EQ(receivedValues, expectedValues);
}
Expand Down
38 changes: 38 additions & 0 deletions tests/ProducerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,44 @@ TEST_P(ProducerTest, testFlushNoBatch) {
client.close();
}

// Verifies that getLastSequenceId() is correct after sendAsync + flush when batching is enabled.
// Previously the batch used the last message's sequence_id, causing lastSequenceIdPublished_ to be
// doubled (e.g. 3 messages yielded 4 instead of 2). The batch must use the first message's
// sequence_id so that lastSequenceIdPublished_ = sequenceId + messagesCount - 1 is correct.
TEST(ProducerTest, testGetLastSequenceIdAfterBatchFlush) {
Client client(serviceUrl);

const std::string topicName =
"persistent://public/default/testGetLastSequenceIdAfterBatchFlush-" + std::to_string(time(nullptr));

ProducerConfiguration producerConfiguration;
producerConfiguration.setBatchingEnabled(true);
producerConfiguration.setBatchingMaxMessages(10);
producerConfiguration.setBatchingMaxPublishDelayMs(60000);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer));

// Send 3 messages in a batch, then flush. Sequence ids are [0, 1, 2], so getLastSequenceId() must be 2.
for (int i = 0; i < 3; i++) {
Message msg = MessageBuilder().setContent("content").build();
producer.sendAsync(msg, nullptr);
}
ASSERT_EQ(ResultOk, producer.flush());
ASSERT_EQ(producer.getLastSequenceId(), 2) << "After 3 messages, last sequence id should be 2";

// Send 2 more (total 5), flush. Sequence ids for these are [3, 4], so getLastSequenceId() must be 4.
for (int i = 0; i < 2; i++) {
Message msg = MessageBuilder().setContent("content").build();
producer.sendAsync(msg, nullptr);
}
ASSERT_EQ(ResultOk, producer.flush());
ASSERT_EQ(producer.getLastSequenceId(), 4) << "After 5 messages total, last sequence id should be 4";

producer.close();
client.close();
}

TEST_P(ProducerTest, testFlushBatch) {
Client client(serviceUrl);

Expand Down
15 changes: 13 additions & 2 deletions tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <time.h>

#include <atomic>
#include <chrono>
#include <functional>
#include <future>
#include <set>
Expand Down Expand Up @@ -863,7 +864,13 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
}

ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
// After seek-to-end the broker may close the consumer and trigger reconnect; allow a short
// delay for hasMessageAvailable to become false (avoids flakiness when reconnect completes).
for (int i = 0; i < 50; i++) {
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
if (!hasMessageAvailable) break;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
ASSERT_FALSE(hasMessageAvailable);

producer.send(MessageBuilder().setContent("msg-2").build());
Expand All @@ -876,7 +883,11 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {

// Test the 2nd seek
ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
for (int i = 0; i < 50; i++) {
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
if (!hasMessageAvailable) break;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
ASSERT_FALSE(hasMessageAvailable);
}

Expand Down
Loading