From 1cac8a74d0cc0aa7d51626cd795ea41fbeb4413a Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Wed, 4 Mar 2026 16:07:10 +0800 Subject: [PATCH 1/3] fix --- tests/ReaderTest.cc | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index ba5b2a9f..a4dae354 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -21,9 +21,13 @@ #include #include +#include +#include #include +#include #include #include +#include #include "HttpHelper.h" #include "PulsarFriend.h" @@ -37,7 +41,7 @@ DECLARE_LOG_OBJECT() using namespace pulsar; static std::string serviceUrl = "pulsar://localhost:6650"; -static const std::string adminUrl = "http://localhost:8080/"; +static const std::string adminUrl = "http://localhost:8090/"; class ReaderTest : public ::testing::TestWithParam { public: @@ -110,15 +114,25 @@ TEST_P(ReaderTest, testAsyncRead) { ASSERT_EQ(ResultOk, producer.send(msg)); } + // readNextAsync callbacks may complete in any order (e.g. with partitioned topic); collect all 10 then verify set + std::string received[10]; + std::atomic receivedCount{0}; for (int i = 0; i < 10; i++) { - reader.readNextAsync([i](Result result, const Message& msg) { + reader.readNextAsync([&](Result result, const Message& msg) { ASSERT_EQ(ResultOk, result); - std::string content = msg.getDataAsString(); - std::string expected = "my-message-" + std::to_string(i); - ASSERT_EQ(expected, content); + int idx = receivedCount.fetch_add(1); + if (idx < 10) received[idx] = msg.getDataAsString(); }); } + waitUntil(std::chrono::seconds(5), [&]() { return receivedCount.load() == 10; }, 1000); + ASSERT_EQ(10, receivedCount.load()) << "Expected 10 messages"; + + std::set receivedSet(received, received + 10); + for (int i = 0; i < 10; i++) { + ASSERT_TRUE(receivedSet.count("my-message-" + std::to_string(i))) << "Missing my-message-" << i; + } + waitUntil( std::chrono::seconds(5), [&]() { From cd3f9e612442b9febd4efb00296d7f67bc061413 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Wed, 4 Mar 2026 16:13:23 +0800 Subject: [PATCH 2/3] format --- tests/ReaderTest.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index a4dae354..b0bda39e 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -114,7 +114,8 @@ TEST_P(ReaderTest, testAsyncRead) { ASSERT_EQ(ResultOk, producer.send(msg)); } - // readNextAsync callbacks may complete in any order (e.g. with partitioned topic); collect all 10 then verify set + // readNextAsync callbacks may complete in any order (e.g. with partitioned topic); collect all 10 then + // verify set std::string received[10]; std::atomic receivedCount{0}; for (int i = 0; i < 10; i++) { @@ -125,7 +126,8 @@ TEST_P(ReaderTest, testAsyncRead) { }); } - waitUntil(std::chrono::seconds(5), [&]() { return receivedCount.load() == 10; }, 1000); + waitUntil( + std::chrono::seconds(5), [&]() { return receivedCount.load() == 10; }, 1000); ASSERT_EQ(10, receivedCount.load()) << "Expected 10 messages"; std::set receivedSet(received, received + 10); From 8c98f1b62f7ccd936cfc3165db86edd3fecef917 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Wed, 4 Mar 2026 16:16:46 +0800 Subject: [PATCH 3/3] revert test --- tests/ReaderTest.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index b0bda39e..e4a924dd 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -41,7 +41,7 @@ DECLARE_LOG_OBJECT() using namespace pulsar; static std::string serviceUrl = "pulsar://localhost:6650"; -static const std::string adminUrl = "http://localhost:8090/"; +static const std::string adminUrl = "http://localhost:8080/"; class ReaderTest : public ::testing::TestWithParam { public: