diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index ba5b2a9f..e4a924dd 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -21,9 +21,13 @@ #include #include +#include +#include #include +#include #include #include +#include #include "HttpHelper.h" #include "PulsarFriend.h" @@ -110,15 +114,27 @@ TEST_P(ReaderTest, testAsyncRead) { ASSERT_EQ(ResultOk, producer.send(msg)); } + // readNextAsync callbacks may complete in any order (e.g. with partitioned topic); collect all 10 then + // verify set + std::string received[10]; + std::atomic receivedCount{0}; for (int i = 0; i < 10; i++) { - reader.readNextAsync([i](Result result, const Message& msg) { + reader.readNextAsync([&](Result result, const Message& msg) { ASSERT_EQ(ResultOk, result); - std::string content = msg.getDataAsString(); - std::string expected = "my-message-" + std::to_string(i); - ASSERT_EQ(expected, content); + int idx = receivedCount.fetch_add(1); + if (idx < 10) received[idx] = msg.getDataAsString(); }); } + waitUntil( + std::chrono::seconds(5), [&]() { return receivedCount.load() == 10; }, 1000); + ASSERT_EQ(10, receivedCount.load()) << "Expected 10 messages"; + + std::set receivedSet(received, received + 10); + for (int i = 0; i < 10; i++) { + ASSERT_TRUE(receivedSet.count("my-message-" + std::to_string(i))) << "Missing my-message-" << i; + } + waitUntil( std::chrono::seconds(5), [&]() {