Skip to content

[fix][broker] Return failed future instead of throwing exception in async methods#25289

Merged
lhotari merged 10 commits intoapache:masterfrom
zhanghaou:fix-broker-not-complete-future
Mar 9, 2026
Merged

[fix][broker] Return failed future instead of throwing exception in async methods#25289
lhotari merged 10 commits intoapache:masterfrom
zhanghaou:fix-broker-not-complete-future

Conversation

@zhanghaou
Copy link
Contributor

@zhanghaou zhanghaou commented Mar 5, 2026

same to this pr #25287

Motivation

In Java's CompletableFuture, callbacks registered via stages such as thenApply, thenCompose, etc., are executed within internal try/catch blocks. If a callback throws an exception, the framework automatically captures it and completes the dependent future exceptionally.

Therefore, throwing an exception inside a stage callback is safe and will be translated into an exceptionally completed future.

However, if an async method itself throws an exception instead of returning a failed future, the exception is raised synchronously and no CompletableFuture is returned. This breaks the expected async error propagation model.

Some async validation paths in the broker currently throw exceptions directly instead of returning a failed CompletableFuture. This breaks the async contract of methods that return CompletableFuture, because the exception is thrown synchronously at invocation time rather than being propagated through the future.

As a result, callers relying on async error handling (e.g. exceptionally, handle, or whenComplete) may not observe these failures correctly.

Modifications

  1. Convert validation synchronous throws to failed futures in broker async methods.
  2. Refactor original-principal validation to an async helper and preserve original auth/authorization semantics.
  3. Ensure default async UnsupportedOperationException paths return failed futures.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Mar 5, 2026
@zhanghaou zhanghaou marked this pull request as draft March 5, 2026 09:22
@zhanghaou zhanghaou marked this pull request as ready for review March 5, 2026 11:43
@codelipenghui codelipenghui added this to the 4.2.0 milestone Mar 5, 2026
Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. The core intent is sound — methods returning CompletableFuture should not throw synchronous exceptions. A few issues found below.

张浩 added 2 commits March 6, 2026 10:23
@zhanghaou zhanghaou requested a review from codelipenghui March 6, 2026 02:47
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some async validation paths in the broker currently throw exceptions directly instead of returning a failed CompletableFuture. This breaks the async contract of methods that return CompletableFuture, because the exception is thrown synchronously at invocation time rather than being propagated through the future.

As a result, callers relying on async error handling (e.g. exceptionally, handle, or whenComplete) may not observe these failures correctly.

I don't think that this is so simple.

In some cases, the calling code might be currently relying on exceptions being thrown before the actual asynchronous job is created. That's why converting all cases without evaluating each case is not a good approach.
Since all exception paths aren't tested, this change might cause more instability than bring benefits.

I agree that the contract of the asynchronous method should be clear about exceptions, whether the exception gets thrown directly or returned as a "result" in the CompletableFuture.

Please run Personal CI to validate such large changes on your side. You can run Pulsar CI in your personal fork by following these instructions: https://pulsar.apache.org/contribute/personal-ci/
The benefit is that you have full control of CI and we don't waste our quota for CI runs which could fail.

@zhanghaou
Copy link
Contributor Author

In some cases, the calling code might be currently relying on exceptions being thrown before the actual asynchronous job is created.

Thanks for raising this, I reviewed the call sites of the methods changed in this PR to check your concern specifically, in the current in-repo callers, I didn’t find that pattern. Most call sites are already handling failures via thenCompose / exceptionally or by awaiting the returned future (get/join), so the error is still observed through the async path.

Could you give me some cases?

zhanghao added 2 commits March 9, 2026 13:30
@zhanghaou
Copy link
Contributor Author

zhanghaou commented Mar 9, 2026

Please run Personal CI to validate such large changes on your side. You can run Pulsar CI in your personal fork by following these instructions: https://pulsar.apache.org/contribute/personal-ci/ The benefit is that you have full control of CI and we don't waste our quota for CI runs which could fail.

Personal CI have all passed, except for a few uploaded tasks.

zhanghaou#3

@zhanghaou zhanghaou requested a review from lhotari March 9, 2026 06:59
@lhotari
Copy link
Member

lhotari commented Mar 9, 2026

In some cases, the calling code might be currently relying on exceptions being thrown before the actual asynchronous job is created.

Thanks for raising this, I reviewed the call sites of the methods changed in this PR to check your concern specifically, in the current in-repo callers, I didn’t find that pattern. Most call sites are already handling failures via thenCompose / exceptionally or by awaiting the returned future (get/join), so the error is still observed through the async path.

Could you give me some cases?

Thanks for checking that. IIRC, I've seen the pattern where the error handling relies on a pattern where the async job wouldn't be created at all at least for some method calls that return a CompletableFuture. However, it's possible that those methods have been eliminated over the years and the error handling has been modified to take this change in the method contract into account.

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's necessary to remove LedgerOffloader and LoadManager interface changes from this PR since the changes would break third party implementations.
It's better to handle the interface changes for these "plugin interfaces" separately since according to our guidelines, we'd create a PIP for such changes. The other changes LGTM after checking them.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR aligns multiple CompletableFuture-returning APIs with async error-propagation expectations by returning exceptionally-completed futures (rather than throwing synchronously) for validation/unsupported-operation paths across broker, client, and shared utility code.

Changes:

  • Replace synchronous validation/UnsupportedOperationException throws in async APIs with FutureUtil.failedFuture(...) returns.
  • Update shared Netty/CF adapters and FutureUtil helpers to fail via returned futures on null inputs.
  • Adjust unit test coverage for ChannelFutures null-input behavior.

Reviewed changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/ChannelFuturesTest.java Updates expectations to assert exceptionally-completed future for null input.
pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java Returns failed CompletableFuture on null Netty future input.
pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java Returns failed CompletableFuture on null ChannelFuture input.
pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java Makes Sequencer.sequential / composeAsync return failed futures on null params and updates Javadoc.
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java Changes initTls to return failed futures instead of throwing for invalid state/args.
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java Returns failed future for unsupported compact(String) overload.
pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java Returns failed futures on null args for async factory methods.
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java Converts synchronous RestException from original-principal validation into failed future.
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java Default async delete now returns failed future instead of throwing.
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java getCompressedBuffer returns failed future when already released.
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java Unsupported getLastMessageId() now returns failed future.
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java Default getLastDispatchablePosition() now returns failed future.
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java Returns failed future for null namespace in async preparation path.
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java Converts null/validation failures to failed futures in async broker paths.
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java Converts null validation to failed futures for async bundle update methods.
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java Returns failed future on invalid channel state in async publish path.
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java Default async methods return failed futures instead of throwing.
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java Returns failed future on null bucket id in async sequencing helper.
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java Converts multiple REST async validation throws into failed futures.
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java Converts namespace validation failures to failed futures in async flows.
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java Converts dynamic-config validation failure to failed future.
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java Converts namespace-check failure to failed future for async authorization APIs.
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java Default async methods return failed futures instead of throwing.
Comments suppressed due to low confidence (2)

pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java:245

  • Sequencer#sequential still calls newTask.get() directly. If the supplier throws (or returns null), this method will throw synchronously and no CompletableFuture will be returned/completed, which breaks the async contract this PR is trying to enforce. Consider wrapping the supplier invocation in try/catch and returning/completing a failed future on error (and also fail fast if the supplier returns null).
        public synchronized CompletableFuture<T> sequential(Supplier<CompletableFuture<T>> newTask) {
            if (newTask == null) {
                return failedFuture(new NullPointerException("Expected Supplier should not be null"));
            }
            if (sequencerFuture.isDone()) {
                if (sequencerFuture.isCompletedExceptionally() && allowExceptionBreakChain) {
                    return sequencerFuture;
                }
                return sequencerFuture = newTask.get();
            }
            return sequencerFuture = allowExceptionBreakChain
                    ? sequencerFuture.thenCompose(__ -> newTask.get())
                    : sequencerFuture.exceptionally(ex -> null).thenCompose(__ -> newTask.get());

pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java:307

  • composeAsync executes futureSupplier.get() inside the executor task without guarding against runtime exceptions (or a null return). If the supplier throws, the exception will escape the runnable and the returned future may never be completed, causing a hang. Wrap the supplier invocation in try/catch and complete the returned future exceptionally (and handle a null supplied future) to keep failures observable via the returned CompletableFuture.
        final CompletableFuture<T> future = new CompletableFuture<>();
        try {
            executor.execute(() -> futureSupplier.get().whenComplete((result, error) -> {
                if (error != null) {
                    future.completeExceptionally(error);
                    return;
                }
                future.complete(result);
            }));

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@zhanghaou
Copy link
Contributor Author

I think it's necessary to remove LedgerOffloader and LoadManager interface changes from this PR since the changes would break third party implementations. It's better to handle the interface changes for these "plugin interfaces" separately since according to our guidelines, we'd create a PIP for such changes. The other changes LGTM after checking them.

Make sense.

Additionally, I can’t guarantee that all problematic methods have been updated in this PR. I’ve done my best to address as many as possible.

Would it be possible to add the main purpose of this PR to the AI review rules, and prevent new errors from being introduced?

@lhotari
Copy link
Member

lhotari commented Mar 9, 2026

Additionally, I can’t guarantee that all problematic methods have been updated in this PR. I’ve done my best to address as many as possible.

No worries. Unless CI doesn't catch the issues, we cannot do much more about it.

Would it be possible to add the main purpose of this PR to the AI review rules, and prevent new errors from being introduced?

Do you mean the tuning of Copilot review guidance? I haven't checked how that is handled.
For Pulsar "plugin interface" changes, there should be an automated solution in place in the CI pipeline that prevents accidential changes that break binary compatibility of interfaces. There's https://github.com/siom79/japicmp for checking all public API changes, but I don't know if it's possible to limit it to check only specific interfaces, for example the interfaces annotated with org.apache.pulsar.common.classification.InterfaceStability.Stable. It would be useful to add an AGENTS.md by explaining this since I'd assume that AI tools would at least be able to check changes at some level.

@lhotari lhotari dismissed codelipenghui’s stale review March 9, 2026 09:04

Penghui's code review comments were already addressed. Dismissing the review to get the PR running. (Pulsar CI won't run CI if the PR is in "changes requested" state)

@zhanghaou
Copy link
Contributor Author

Do you mean the tuning of Copilot review guidance? I haven't checked how that is handled.

We can add a copilot-instructions.md file in the .github directory of the repository.

https://docs.github.com/en/copilot/how-tos/configure-custom-instructions/add-repository-instructions

@lhotari
Copy link
Member

lhotari commented Mar 9, 2026

Do you mean the tuning of Copilot review guidance? I haven't checked how that is handled.

We can add a copilot-instructions.md file in the .github directory of the repository.

https://docs.github.com/en/copilot/how-tos/configure-custom-instructions/add-repository-instructions

@zhanghaou Would you like to create a separate PR to add that?

@zhanghaou
Copy link
Contributor Author

@zhanghaou Would you like to create a separate PR to add that?

Sure, I'd be happy to do.

@codecov-commenter
Copy link

Codecov Report

❌ Patch coverage is 24.61538% with 49 lines in your changes missing coverage. Please review.
✅ Project coverage is 72.58%. Comparing base (4cbe124) to head (0718b88).
⚠️ Report is 9 commits behind head on master.

Files with missing lines Patch % Lines
...ache/pulsar/broker/namespace/NamespaceService.java 0.00% 4 Missing and 4 partials ⚠️
...pulsar/broker/admin/impl/PersistentTopicsBase.java 30.00% 7 Missing ⚠️
...java/org/apache/pulsar/common/util/FutureUtil.java 0.00% 3 Missing and 3 partials ⚠️
...e/pulsar/client/impl/PulsarChannelInitializer.java 0.00% 3 Missing and 2 partials ⚠️
...rg/apache/pulsar/broker/service/BrokerService.java 50.00% 2 Missing and 2 partials ⚠️
...sar/compaction/PulsarCompactionServiceFactory.java 0.00% 2 Missing and 2 partials ⚠️
...ache/pulsar/common/util/netty/NettyFutureUtil.java 0.00% 2 Missing and 2 partials ⚠️
...rg/apache/pulsar/broker/delayed/bucket/Bucket.java 0.00% 1 Missing and 1 partial ⚠️
.../service/SystemTopicBasedTopicPoliciesService.java 0.00% 1 Missing and 1 partial ⚠️
...ker/authorization/PulsarAuthorizationProvider.java 0.00% 1 Missing ⚠️
... and 6 more
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #25289      +/-   ##
============================================
- Coverage     72.81%   72.58%   -0.24%     
- Complexity    34134    34193      +59     
============================================
  Files          1959     1968       +9     
  Lines        155543   156143     +600     
  Branches      17741    17788      +47     
============================================
+ Hits         113263   113335      +72     
- Misses        33322    33733     +411     
- Partials       8958     9075     +117     
Flag Coverage Δ
inttests 25.62% <4.61%> (-0.37%) ⬇️
systests 22.31% <4.61%> (-0.23%) ⬇️
unittests 73.56% <24.61%> (-0.22%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...g/apache/pulsar/broker/admin/impl/BrokersBase.java 87.08% <100.00%> (-0.96%) ⬇️
...pache/pulsar/broker/admin/impl/NamespacesBase.java 78.12% <100.00%> (+0.53%) ⬆️
...rg/apache/pulsar/broker/web/PulsarWebResource.java 63.96% <100.00%> (-1.50%) ⬇️
...pache/pulsar/common/util/netty/ChannelFutures.java 86.66% <100.00%> (+0.95%) ⬆️
...ker/authorization/PulsarAuthorizationProvider.java 68.86% <0.00%> (ø)
...xtensions/channel/ServiceUnitStateChannelImpl.java 86.07% <0.00%> (-1.86%) ⬇️
...n/java/org/apache/pulsar/broker/service/Topic.java 40.00% <0.00%> (ø)
...oker/service/nonpersistent/NonPersistentTopic.java 71.73% <0.00%> (-1.33%) ⬇️
...r/stats/prometheus/PrometheusMetricsGenerator.java 79.23% <0.00%> (ø)
...ache/pulsar/broker/systopic/SystemTopicClient.java 0.00% <0.00%> (ø)
... and 10 more

... and 109 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@lhotari lhotari merged commit 35dae97 into apache:master Mar 9, 2026
102 of 104 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants