Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions include/pulsar/AutoClusterFailover.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PULSAR_AUTO_CLUSTER_FAILOVER_H_
#define PULSAR_AUTO_CLUSTER_FAILOVER_H_

#include <pulsar/ServiceUrlProvider.h>
#include <pulsar/defines.h>

#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <vector>

namespace pulsar {

class AutoClusterFailoverImpl;

/**
* Builder for creating an AutoClusterFailover instance.
*
* Example usage:
* @code
* auto provider = AutoClusterFailover::builder()
* .primary("pulsar://primary:6650")
* .secondary({"pulsar://secondary1:6650", "pulsar://secondary2:6650"})
* .failoverDelay(std::chrono::seconds(30))
* .switchBackDelay(std::chrono::seconds(60))
* .checkInterval(std::chrono::seconds(10))
* .build();
*
* ClientConfiguration conf;
* conf.setServiceUrlProvider(provider);
* Client client("pulsar://primary:6650", conf);
* @endcode
*/
class PULSAR_PUBLIC AutoClusterFailoverBuilder {
public:
AutoClusterFailoverBuilder();

/**
* Set the primary service URL. Required.
*/
AutoClusterFailoverBuilder& primary(const std::string& primaryUrl);

/**
* Set the list of secondary (backup) service URLs. Required. Must have at least one entry.
*/
AutoClusterFailoverBuilder& secondary(const std::vector<std::string>& secondaryUrls);

/**
* Set per-secondary authentication. The map key is the secondary service URL.
* Optional — if not set, no per-cluster auth override is applied during failover.
*/
AutoClusterFailoverBuilder& secondaryAuthentication(
const std::map<std::string, AuthenticationPtr>& authentication);

/**
* Set per-secondary TLS trust certificate file paths. The map key is the secondary service URL.
* Optional.
*/
AutoClusterFailoverBuilder& secondaryTlsTrustCertsFilePath(
const std::map<std::string, std::string>& tlsTrustCertsFilePaths);

/**
* Set the failover delay — the duration the current service URL must be unreachable
* before the client switches to a secondary cluster. Required (must be > 0).
*/
AutoClusterFailoverBuilder& failoverDelay(std::chrono::milliseconds delay);

/**
* Set the switch-back delay — the duration the primary must be available again
* before switching back from a secondary. Required (must be > 0).
*/
AutoClusterFailoverBuilder& switchBackDelay(std::chrono::milliseconds delay);

/**
* Set the interval between availability probes. Default: 30 seconds.
*/
AutoClusterFailoverBuilder& checkInterval(std::chrono::milliseconds interval);

/**
* Build the ServiceUrlProvider.
*
* @throw std::invalid_argument if required fields are missing or invalid
*/
ServiceUrlProviderPtr build();

private:
std::string primary_;
std::vector<std::string> secondary_;
std::map<std::string, AuthenticationPtr> secondaryAuthentications_;
std::map<std::string, std::string> secondaryTlsTrustCertsFilePaths_;
std::chrono::milliseconds failoverDelay_{0};
std::chrono::milliseconds switchBackDelay_{0};
std::chrono::milliseconds checkInterval_{30000};
};

/**
* AutoClusterFailover provides automatic failover between a primary Pulsar cluster
* and one or more secondary clusters.
*
* It periodically probes the current service URL for availability. If the current cluster
* is unreachable for longer than `failoverDelay`, it switches to the first available
* secondary. When on a secondary, if the primary recovers and stays up for `switchBackDelay`,
* it switches back.
*/
class PULSAR_PUBLIC AutoClusterFailover : public ServiceUrlProvider {
public:
static AutoClusterFailoverBuilder builder();

~AutoClusterFailover() override;

void initialize(ServiceUrlProviderClient* client, ServiceProber* prober) override;
const std::string& getServiceUrl() const override;
void close() override;

private:
friend class AutoClusterFailoverBuilder;
explicit AutoClusterFailover(std::shared_ptr<AutoClusterFailoverImpl> impl);

std::shared_ptr<AutoClusterFailoverImpl> impl_;
};

} // namespace pulsar

#endif /* PULSAR_AUTO_CLUSTER_FAILOVER_H_ */
15 changes: 15 additions & 0 deletions include/pulsar/ClientConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <pulsar/Authentication.h>
#include <pulsar/Logger.h>
#include <pulsar/ServiceUrlProvider.h>
#include <pulsar/defines.h>

#include <cstdint>
Expand Down Expand Up @@ -371,6 +372,20 @@ class PULSAR_PUBLIC ClientConfiguration {
*/
unsigned int getKeepAliveIntervalInSeconds() const;

/**
* Set a ServiceUrlProvider that dynamically provides the service URL.
* When set, the provider's getServiceUrl() is used as the initial service URL,
* and the provider is initialized after the client is constructed.
*
* @param provider the ServiceUrlProvider
*/
ClientConfiguration& setServiceUrlProvider(const ServiceUrlProviderPtr& provider);

/**
* @return the ServiceUrlProvider, or nullptr if not set
*/
ServiceUrlProviderPtr getServiceUrlProvider() const;

friend class ClientImpl;
friend class PulsarWrapper;

Expand Down
120 changes: 120 additions & 0 deletions include/pulsar/ServiceUrlProvider.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PULSAR_SERVICE_URL_PROVIDER_H_
#define PULSAR_SERVICE_URL_PROVIDER_H_

#include <pulsar/Authentication.h>
#include <pulsar/defines.h>

#include <functional>
#include <memory>
#include <string>

namespace pulsar {

/**
* Restricted interface exposing only the client methods needed by a ServiceUrlProvider.
* Implemented internally by ClientImpl without exposing its full definition.
*/
class PULSAR_PUBLIC ServiceUrlProviderClient {
public:
virtual ~ServiceUrlProviderClient() {}

/**
* Update the service URL of the client.
* This forces the client to re-create the lookup service and reconnect.
*/
virtual void updateServiceUrl(const std::string& serviceUrl) = 0;

/**
* Update the authentication credentials used by the client.
*/
virtual void updateAuthentication(AuthenticationPtr authentication) = 0;

/**
* Update the TLS trust certificate file path.
*/
virtual void updateTlsTrustCertsFilePath(const std::string& tlsTrustCertsFilePath) = 0;

/**
* Update the TLS trust store path and password.
*/
virtual void updateTlsTrustStorePathAndPassword(const std::string& tlsTrustStorePath,
const std::string& tlsTrustStorePassword) = 0;
};

/**
* Interface for probing the availability of a Pulsar service URL.
* Implemented internally by ClientImpl using its networking stack.
*
* The interface is asynchronous to avoid blocking the IO thread.
*/
class PULSAR_PUBLIC ServiceProber {
public:
virtual ~ServiceProber() {}

/**
* Callback type for the probe operation.
* @param success True if the service is reachable, false otherwise.
*/
using ProbeCallback = std::function<void(bool success)>;

/**
* Asynchronously probe the availability of the given service URL.
*
* @param serviceUrl The URL to probe (e.g. "pulsar://host:6650")
* @param timeoutMs The timeout in milliseconds
* @param callback The callback to be invoked when the probe completes
*/
virtual void probeAsync(const std::string& serviceUrl, int timeoutMs, ProbeCallback callback) = 0;
};

/**
* Interface that allows dynamically providing the Pulsar service URL.
* Implementations can provide failover/discovery logic.
*/
class PULSAR_PUBLIC ServiceUrlProvider {
public:
virtual ~ServiceUrlProvider() {}

/**
* Initialize the provider with the client and prober interfaces.
* Called automatically by the client after construction.
*
* @param client Pointer to the ServiceUrlProviderClient interface
* @param prober Pointer to the ServiceProber interface
*/
virtual void initialize(ServiceUrlProviderClient* client, ServiceProber* prober) = 0;

/**
* Get the current service URL.
*/
virtual const std::string& getServiceUrl() const = 0;

/**
* Close the provider and release resources.
*/
virtual void close() = 0;
};

typedef std::shared_ptr<ServiceUrlProvider> ServiceUrlProviderPtr;

} // namespace pulsar

#endif /* PULSAR_SERVICE_URL_PROVIDER_H_ */
Loading
Loading