[fix][broker] Return failed future instead of throwing exception in async methods #25289
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
codelipenghui left a comment
Thanks for the PR. The core intent is sound: methods returning CompletableFuture should not throw synchronous exceptions. I found a few issues, listed below.
pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
lhotari left a comment
> Some async validation paths in the broker currently throw exceptions directly instead of returning a failed CompletableFuture. This breaks the async contract of methods that return CompletableFuture, because the exception is thrown synchronously at invocation time rather than being propagated through the future.
> As a result, callers relying on async error handling (e.g. exceptionally, handle, or whenComplete) may not observe these failures correctly.
I don't think that this is so simple.
In some cases, the calling code might be currently relying on exceptions being thrown before the actual asynchronous job is created. That's why converting all cases without evaluating each case is not a good approach.
Since not all exception paths are tested, this change might cause more instability than benefit.
I agree that the contract of the asynchronous method should be clear about exceptions, whether the exception gets thrown directly or returned as a "result" in the CompletableFuture.
Please run Personal CI to validate such large changes on your side. You can run Pulsar CI in your personal fork by following these instructions: https://pulsar.apache.org/contribute/personal-ci/
The benefit is that you have full control of CI and we don't waste our quota for CI runs which could fail.
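The concern about callers that rely on exceptions being thrown before the asynchronous job is created can be illustrated with a minimal sketch. Everything here is hypothetical (`updateAsync`, `legacyCaller`, `robustCaller`, and the pending-operation counter are invented for illustration and are not Pulsar code), and it assumes Java 9+ for `CompletableFuture.failedFuture`. It shows a caller whose cleanup lives in a `catch` block silently losing that cleanup once the method reports validation failures through the future instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class CallerAssumptionDemo {
    // Hypothetical async API after the conversion: validation failures are
    // reported through the returned future instead of being thrown.
    static CompletableFuture<Void> updateAsync(String name) {
        if (name == null) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("name must not be null"));
        }
        return CompletableFuture.completedFuture(null);
    }

    // Caller written against the old contract: it expects validation errors
    // to arrive in the catch block and only releases the counter on success.
    static void legacyCaller(String name, AtomicInteger pending) {
        pending.incrementAndGet();
        try {
            updateAsync(name).thenRun(pending::decrementAndGet);
        } catch (IllegalArgumentException e) {
            pending.decrementAndGet(); // no longer reached after the conversion
        }
    }

    // Caller that handles both outcomes through the future.
    static void robustCaller(String name, AtomicInteger pending) {
        pending.incrementAndGet();
        updateAsync(name).whenComplete((ignored, error) -> pending.decrementAndGet());
    }

    public static void main(String[] args) {
        AtomicInteger legacy = new AtomicInteger();
        legacyCaller(null, legacy);
        AtomicInteger robust = new AtomicInteger();
        robustCaller(null, robust);
        // prints "legacy pending=1, robust pending=0"
        System.out.println("legacy pending=" + legacy.get() + ", robust pending=" + robust.get());
    }
}
```

Whether any in-repo caller actually has this shape is exactly what has to be checked per call site; the sketch only shows why the check matters.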
Thanks for raising this. I reviewed the call sites of the methods changed in this PR to check your concern specifically, and I didn't find that pattern in the current in-repo callers. Most call sites already handle failures via thenCompose / exceptionally or by awaiting the returned future (get/join), so the error is still observed through the async path. Could you point me to some examples?
Personal CI has passed, except for a few upload tasks.
Thanks for checking that. IIRC, I've seen error handling that relies on the async job not being created at all, at least for some method calls that return a CompletableFuture. However, it's possible that those methods have been eliminated over the years and that the error handling has since been modified to account for this change in the method contract.
I think it's necessary to remove LedgerOffloader and LoadManager interface changes from this PR since the changes would break third party implementations.
It's better to handle the interface changes for these "plugin interfaces" separately since according to our guidelines, we'd create a PIP for such changes. The other changes LGTM after checking them.
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
Pull request overview
This PR aligns multiple CompletableFuture-returning APIs with async error-propagation expectations by returning exceptionally-completed futures (rather than throwing synchronously) for validation/unsupported-operation paths across broker, client, and shared utility code.
Changes:
- Replace synchronous validation/`UnsupportedOperationException` throws in async APIs with `FutureUtil.failedFuture(...)` returns.
- Update shared Netty/CF adapters and `FutureUtil` helpers to fail via returned futures on null inputs.
- Adjust unit test coverage for `ChannelFutures` null-input behavior.
Reviewed changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/ChannelFuturesTest.java | Updates expectations to assert exceptionally-completed future for null input. |
| pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java | Returns failed CompletableFuture on null Netty future input. |
| pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java | Returns failed CompletableFuture on null ChannelFuture input. |
| pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java | Makes Sequencer.sequential / composeAsync return failed futures on null params and updates Javadoc. |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java | Changes initTls to return failed futures instead of throwing for invalid state/args. |
| pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java | Returns failed future for unsupported compact(String) overload. |
| pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java | Returns failed futures on null args for async factory methods. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java | Converts synchronous RestException from original-principal validation into failed future. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java | Default async delete now returns failed future instead of throwing. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java | getCompressedBuffer returns failed future when already released. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java | Unsupported getLastMessageId() now returns failed future. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java | Default getLastDispatchablePosition() now returns failed future. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java | Returns failed future for null namespace in async preparation path. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java | Converts null/validation failures to failed futures in async broker paths. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java | Converts null validation to failed futures for async bundle update methods. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java | Returns failed future on invalid channel state in async publish path. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java | Default async methods return failed futures instead of throwing. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java | Returns failed future on null bucket id in async sequencing helper. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | Converts multiple REST async validation throws into failed futures. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java | Converts namespace validation failures to failed futures in async flows. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java | Converts dynamic-config validation failure to failed future. |
| pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java | Converts namespace-check failure to failed future for async authorization APIs. |
| managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java | Default async methods return failed futures instead of throwing. |
Comments suppressed due to low confidence (2)
pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java:245
`Sequencer#sequential` still calls `newTask.get()` directly. If the supplier throws (or returns null), this method will throw synchronously and no `CompletableFuture` will be returned/completed, which breaks the async contract this PR is trying to enforce. Consider wrapping the supplier invocation in try/catch and returning/completing a failed future on error (and also fail fast if the supplier returns null).
public synchronized CompletableFuture<T> sequential(Supplier<CompletableFuture<T>> newTask) {
    if (newTask == null) {
        return failedFuture(new NullPointerException("Expected Supplier should not be null"));
    }
    if (sequencerFuture.isDone()) {
        if (sequencerFuture.isCompletedExceptionally() && allowExceptionBreakChain) {
            return sequencerFuture;
        }
        return sequencerFuture = newTask.get();
    }
    return sequencerFuture = allowExceptionBreakChain
            ? sequencerFuture.thenCompose(__ -> newTask.get())
            : sequencerFuture.exceptionally(ex -> null).thenCompose(__ -> newTask.get());
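One way to read the suggestion above is as a small guard around the direct supplier call. The following is a sketch only: `GuardedSupplier.safeGet` is a hypothetical helper that does not exist in `FutureUtil`, and it uses the JDK's `CompletableFuture.failedFuture` (Java 9+) in place of Pulsar's own helper. Only the direct `newTask.get()` call on the already-done branch needs such a guard, since an exception thrown inside a `thenCompose` callback is already captured by the stage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class GuardedSupplier {
    // Hypothetical helper: invokes the supplier and converts a null supplier,
    // a thrown runtime exception, or a null result into a failed future.
    static <T> CompletableFuture<T> safeGet(Supplier<CompletableFuture<T>> task) {
        if (task == null) {
            return CompletableFuture.failedFuture(
                    new NullPointerException("supplier must not be null"));
        }
        try {
            CompletableFuture<T> future = task.get();
            if (future == null) {
                return CompletableFuture.failedFuture(
                        new NullPointerException("supplier returned null"));
            }
            return future;
        } catch (RuntimeException e) {
            // The failure is reported through the returned future, not thrown.
            return CompletableFuture.failedFuture(e);
        }
    }
}
```

With such a helper, the direct call in the excerpt could become `sequencerFuture = safeGet(newTask)`, under the assumption that reporting the failure through the sequencer future is the desired behavior.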
pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java:307
`composeAsync` executes `futureSupplier.get()` inside the executor task without guarding against runtime exceptions (or a null return). If the supplier throws, the exception will escape the runnable and the returned `future` may never be completed, causing a hang. Wrap the supplier invocation in try/catch and complete the returned future exceptionally (and handle a null supplied future) to keep failures observable via the returned `CompletableFuture`.
final CompletableFuture<T> future = new CompletableFuture<>();
try {
    executor.execute(() -> futureSupplier.get().whenComplete((result, error) -> {
        if (error != null) {
            future.completeExceptionally(error);
            return;
        }
        future.complete(result);
    }));
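The guard described in this comment might look roughly like the sketch below. `ComposeAsyncSketch.composeAsyncGuarded` is a hypothetical stand-in written for illustration, not the actual `FutureUtil.composeAsync`; the outer `catch` for executor rejection is an assumption about what the truncated excerpt does after its `try`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

class ComposeAsyncSketch {
    // Hypothetical variant: every failure path completes the returned future.
    static <T> CompletableFuture<T> composeAsyncGuarded(
            Supplier<CompletableFuture<T>> futureSupplier, Executor executor) {
        final CompletableFuture<T> future = new CompletableFuture<>();
        try {
            executor.execute(() -> {
                try {
                    CompletableFuture<T> inner = futureSupplier.get();
                    if (inner == null) {
                        future.completeExceptionally(
                                new NullPointerException("supplier returned null"));
                        return;
                    }
                    inner.whenComplete((result, error) -> {
                        if (error != null) {
                            future.completeExceptionally(error);
                        } else {
                            future.complete(result);
                        }
                    });
                } catch (RuntimeException e) {
                    // Supplier threw inside the executor task: without this
                    // catch the returned future would never complete.
                    future.completeExceptionally(e);
                }
            });
        } catch (RuntimeException e) {
            // Assumption: the executor may reject the task,
            // e.g. with RejectedExecutionException.
            future.completeExceptionally(e);
        }
        return future;
    }
}
```

Passing `Runnable::run` as the executor runs the task on the calling thread, which is a convenient way to exercise the failure paths in a unit test.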
pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
Makes sense. Additionally, I can't guarantee that all problematic methods have been updated in this PR. I've done my best to address as many as possible. Would it be possible to add the main purpose of this PR to the AI review rules, to help prevent new errors from being introduced?
No worries. If CI doesn't catch the issues, there isn't much more we can do about it.
Do you mean the tuning of Copilot review guidance? I haven't checked how that is handled.
Penghui's code review comments were already addressed. Dismissing the review to get the PR running. (Pulsar CI won't run CI if the PR is in "changes requested" state)
We can add a copilot-instructions.md file in the .github directory of the repository. https://docs.github.com/en/copilot/how-tos/configure-custom-instructions/add-repository-instructions
@zhanghaou Would you like to create a separate PR to add that?
Sure, I'd be happy to.
Same as PR #25287.
Motivation
In Java's CompletableFuture, callbacks registered via stages such as thenApply, thenCompose, etc., are executed within internal try/catch blocks. If a callback throws an exception, the framework automatically captures it and completes the dependent future exceptionally.
Therefore, throwing an exception inside a stage callback is safe and will be translated into an exceptionally completed future.
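As a minimal illustration of this behavior (a sketch with a hypothetical `parseAsync` helper, not code from this PR), a callback that throws inside `thenApply` still leaves the caller with a future, which simply completes exceptionally:

```java
import java.util.concurrent.CompletableFuture;

class StageCallbackDemo {
    // Hypothetical helper: the callback may throw, but the method itself
    // always returns a future because the stage captures the exception.
    static CompletableFuture<Integer> parseAsync(CompletableFuture<String> input) {
        return input.thenApply(s -> {
            if (s.isEmpty()) {
                throw new IllegalArgumentException("empty input");
            }
            return Integer.parseInt(s);
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> result =
                parseAsync(CompletableFuture.completedFuture(""));
        // The exception did not escape parseAsync; it is stored in the future.
        System.out.println(result.isCompletedExceptionally());
        result.exceptionally(ex -> {
            // ex is a CompletionException wrapping the IllegalArgumentException.
            System.out.println(ex.getCause());
            return -1;
        });
    }
}
```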
However, if an async method itself throws an exception instead of returning a failed future, the exception is raised synchronously and no CompletableFuture is returned. This breaks the expected async error propagation model.
Some async validation paths in the broker currently throw exceptions directly instead of returning a failed CompletableFuture. This breaks the async contract of methods that return CompletableFuture, because the exception is thrown synchronously at invocation time rather than being propagated through the future.
As a result, callers relying on async error handling (e.g. exceptionally, handle, or whenComplete) may not observe these failures correctly.
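The difference between the two styles can be sketched as follows. The `lookupThrowing` and `lookupFailing` methods are hypothetical examples rather than broker code, and the sketch uses the JDK's `CompletableFuture.failedFuture` (Java 9+) where the PR uses `FutureUtil.failedFuture(...)`.

```java
import java.util.concurrent.CompletableFuture;

class AsyncContractDemo {
    // Throws at invocation time: no future is returned, so handlers chained
    // by the caller are never registered.
    static CompletableFuture<String> lookupThrowing(String topic) {
        if (topic == null) {
            throw new IllegalArgumentException("topic must not be null");
        }
        return CompletableFuture.completedFuture("owner-of-" + topic);
    }

    // Reports the same failure through the returned future.
    static CompletableFuture<String> lookupFailing(String topic) {
        if (topic == null) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("topic must not be null"));
        }
        return CompletableFuture.completedFuture("owner-of-" + topic);
    }

    public static void main(String[] args) {
        // The async handler observes the failure.
        String viaFuture = lookupFailing(null)
                .exceptionally(ex -> "recovered: " + ex.getMessage())
                .join();
        System.out.println(viaFuture);

        // The exception escapes before exceptionally(...) is even called.
        try {
            lookupThrowing(null).exceptionally(ex -> "recovered");
        } catch (IllegalArgumentException e) {
            System.out.println("thrown synchronously: " + e.getMessage());
        }
    }
}
```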