diff --git a/Makefile b/Makefile index 8cf6fad82..a6a4a5894 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ BIN_DIR := $(PROJECT_DIR)/bin BOILERPLATE_DIR := $(PROJECT_DIR)/hack/boilerplate # Image URL to use all building/pushing image targets -TAG ?= $(shell git describe --tags --abbrev=0 --match '[0-9].*[0-9].*[0-9]*' 2>/dev/null ) +TAG ?= $(shell git describe --tags --dirty --always --abbrev=0 --match '[0-9].*[0-9].*[0-9]*' 2>/dev/null) IMG ?= ghcr.io/adobe/kafka-operator:$(TAG) # Produce CRDs that work back to Kubernetes 1.11 (no version conversion) @@ -171,7 +171,7 @@ test: generate fmt vet bin/setup-envtest cd third_party/github.com/banzaicloud/go-cruise-control && \ go test -v -parallel 2 -failfast ./... -cover -covermode=count -coverprofile cover.out -test.v -test.paniconexit0 -# Run e2e tests +# Run e2e tests. Set IMG_E2E to use a custom operator image; otherwise the chart default is used. test-e2e: cd tests/e2e && IMG_E2E=${IMG_E2E} go test . \ -v \ diff --git a/config/samples/simplekafkacluster_multidisk.yaml b/config/samples/simplekafkacluster_multidisk.yaml new file mode 100644 index 000000000..0b23f37d2 --- /dev/null +++ b/config/samples/simplekafkacluster_multidisk.yaml @@ -0,0 +1,281 @@ +apiVersion: kafka.banzaicloud.io/v1beta1 +kind: KafkaCluster +metadata: + labels: + controller-tools.k8s.io: "1.0" + name: kafka +spec: + kRaft: false + monitoringConfig: + jmxImage: "ghcr.io/adobe/koperator/jmx-javaagent:1.4.0" + headlessServiceEnabled: true + zkAddresses: + - "zookeeper-server-client.zookeeper:2181" + propagateLabels: false + oneBrokerPerNode: false + clusterImage: "ghcr.io/adobe/koperator/kafka:2.13-3.9.1" + readOnlyConfig: | + auto.create.topics.enable=false + cruise.control.metrics.topic.auto.create=true + cruise.control.metrics.topic.num.partitions=1 + cruise.control.metrics.topic.replication.factor=2 + brokerConfigGroups: + default: + # podSecurityContext: + # runAsNonRoot: false + # securityContext: + # privileged: true + storageConfigs: + - mountPath: "/kafka-logs" + pvcSpec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi + - mountPath: "/kafka-logs-extra" + pvcSpec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 5Gi + brokerAnnotations: + prometheus.io/scrape: "true" + prometheus.io/port: "9020" + # brokerLabels: + # kafka_broker_group: "default_group" + brokers: + - id: 0 + brokerConfigGroup: "default" + # brokerConfig: + # envs: + # - name: +CLASSPATH + # value: "/opt/kafka/libs/dev/*:" + # - name: CLASSPATH+ + # value: ":/opt/kafka/libs/extra-jars/*" + - id: 1 + brokerConfigGroup: "default" + - id: 2 + brokerConfigGroup: "default" + rollingUpgradeConfig: + failureThreshold: 1 + listenersConfig: + internalListeners: + - type: "plaintext" + name: "internal" + containerPort: 29092 + usedForInnerBrokerCommunication: true + - type: "plaintext" + name: "controller" + containerPort: 29093 + usedForInnerBrokerCommunication: false + usedForControllerCommunication: true + cruiseControlConfig: + # podSecurityContext: + # runAsNonRoot: false + # securityContext: + # privileged: true + cruiseControlTaskSpec: + RetryDurationMinutes: 5 + topicConfig: + partitions: 12 + replicationFactor: 3 +# resourceRequirements: +# requests: +# cpu: 500m +# memory: 1Gi +# limits: +# cpu: 500m +# memory: 1Gi + image: "adobe/cruise-control:3.0.3-adbe-20250804" + config: | + # Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + # + # This is an example property file for Kafka Cruise Control. See KafkaCruiseControlConfig for more details. + # Configuration for the metadata client. + # ======================================= + # The maximum interval in milliseconds between two metadata refreshes. + #metadata.max.age.ms=300000 + # Client id for the Cruise Control. It is used for the metadata client. + #client.id=kafka-cruise-control + # The size of TCP send buffer bytes for the metadata client. + #send.buffer.bytes=131072 + # The size of TCP receive buffer size for the metadata client. + #receive.buffer.bytes=131072 + # The time to wait before disconnect an idle TCP connection. + #connections.max.idle.ms=540000 + # The time to wait before reconnect to a given host. + #reconnect.backoff.ms=50 + # The time to wait for a response from a host after sending a request. + #request.timeout.ms=30000 + # Configurations for the load monitor + # ======================================= + # The number of metric fetcher thread to fetch metrics for the Kafka cluster + num.metric.fetchers=1 + # The metric sampler class + metric.sampler.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler + # Configurations for CruiseControlMetricsReporterSampler + metric.reporter.topic.pattern=__CruiseControlMetrics + # The sample store class name + sample.store.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.KafkaSampleStore + # The config for the Kafka sample store to save the partition metric samples + partition.metric.sample.store.topic=__KafkaCruiseControlPartitionMetricSamples + # The config for the Kafka sample store to save the model training samples + broker.metric.sample.store.topic=__KafkaCruiseControlModelTrainingSamples + # The replication factor of Kafka metric sample store topic + sample.store.topic.replication.factor=2 + # The config for the number of Kafka sample store consumer threads + num.sample.loading.threads=8 + # The partition assignor class for the metric samplers + metric.sampler.partition.assignor.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.DefaultMetricSamplerPartitionAssignor + # The metric sampling interval in milliseconds + metric.sampling.interval.ms=120000 + metric.anomaly.detection.interval.ms=180000 + # The partition metrics window size in milliseconds + partition.metrics.window.ms=300000 + # The number of partition metric windows to keep in memory + num.partition.metrics.windows=1 + # The minimum partition metric samples required for a partition in each window + min.samples.per.partition.metrics.window=1 + # The broker metrics window size in milliseconds + broker.metrics.window.ms=300000 + # The number of broker metric windows to keep in memory + num.broker.metrics.windows=20 + # The minimum broker metric samples required for a partition in each window + min.samples.per.broker.metrics.window=1 + # The configuration for the BrokerCapacityConfigFileResolver (supports JBOD and non-JBOD broker capacities) + capacity.config.file=config/capacity.json + #capacity.config.file=config/capacityJBOD.json + # Configurations for the analyzer + # ======================================= + # The list of goals to optimize the Kafka cluster for with pre-computed proposals + default.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal + # The list of supported goals + goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal + # The list of supported hard goals + hard.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal + # The minimum percentage of well monitored partitions out of all the partitions + min.monitored.partition.percentage=0.95 + # The balance threshold for CPU + cpu.balance.threshold=1.1 + # The balance threshold for disk + disk.balance.threshold=1.1 + # The balance threshold for network inbound utilization + network.inbound.balance.threshold=1.1 + # The balance threshold for network outbound utilization + network.outbound.balance.threshold=1.1 + # The balance threshold for the replica count + replica.count.balance.threshold=1.1 + # The capacity threshold for CPU in percentage + cpu.capacity.threshold=0.8 + # The capacity threshold for disk in percentage + disk.capacity.threshold=0.8 + # The capacity threshold for network inbound utilization in percentage + network.inbound.capacity.threshold=0.8 + # The capacity threshold for network outbound utilization in percentage + network.outbound.capacity.threshold=0.8 + # The threshold to define the cluster to be in a low CPU utilization state + cpu.low.utilization.threshold=0.0 + # The threshold to define the cluster to be in a low disk utilization state + disk.low.utilization.threshold=0.0 + # The threshold to define the cluster to be in a low network inbound utilization state + network.inbound.low.utilization.threshold=0.0 + # The threshold to define the cluster to be in a low disk utilization state + network.outbound.low.utilization.threshold=0.0 + # The metric anomaly percentile upper threshold + metric.anomaly.percentile.upper.threshold=90.0 + # The metric anomaly percentile lower threshold + metric.anomaly.percentile.lower.threshold=10.0 + # How often should the cached proposal be expired and recalculated if necessary + proposal.expiration.ms=60000 + # The maximum number of replicas that can reside on a broker at any given time. + max.replicas.per.broker=10000 + # The number of threads to use for proposal candidate precomputing. + num.proposal.precompute.threads=1 + # the topics that should be excluded from the partition movement. + #topics.excluded.from.partition.movement + # Configurations for the executor + # ======================================= + # The max number of partitions to move in/out on a given broker at a given time. + num.concurrent.partition.movements.per.broker=10 + # The interval between two execution progress checks. + execution.progress.check.interval.ms=10000 + # Configurations for anomaly detector + # ======================================= + # The goal violation notifier class + anomaly.notifier.class=com.linkedin.kafka.cruisecontrol.detector.notifier.SelfHealingNotifier + # The metric anomaly finder class + metric.anomaly.finder.class=com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomalyFinder + # The anomaly detection interval + anomaly.detection.interval.ms=10000 + # The goal violation to detect. + anomaly.detection.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal + # The interested metrics for metric anomaly analyzer. + metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_MAX,BROKER_PRODUCE_LOCAL_TIME_MS_MEAN,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MAX,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MAX,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_LOG_FLUSH_TIME_MS_MAX,BROKER_LOG_FLUSH_TIME_MS_MEAN + ## Adjust accordingly if your metrics reporter is an older version and does not produce these metrics. + #metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_50TH,BROKER_PRODUCE_LOCAL_TIME_MS_999TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_50TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_999TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_50TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_999TH,BROKER_LOG_FLUSH_TIME_MS_50TH,BROKER_LOG_FLUSH_TIME_MS_999TH + # The zk path to store failed broker information. + failed.brokers.zk.path=/CruiseControlBrokerList + # Topic config provider class + topic.config.provider.class=com.linkedin.kafka.cruisecontrol.config.KafkaTopicConfigProvider + # The cluster configurations for the KafkaTopicConfigProvider + cluster.configs.file=config/clusterConfigs.json + # The maximum time in milliseconds to store the response and access details of a completed user task. + completed.user.task.retention.time.ms=21600000 + # The maximum time in milliseconds to retain the demotion history of brokers. + demotion.history.retention.time.ms=86400000 + # The maximum number of completed user tasks for which the response and access details will be cached. + max.cached.completed.user.tasks=500 + # The maximum number of user tasks for concurrently running in async endpoints across all users. + max.active.user.tasks=25 + # Enable self healing for all anomaly detectors, unless the particular anomaly detector is explicitly disabled + self.healing.enabled=true + # Enable self healing for broker failure detector + #self.healing.broker.failure.enabled=true + # Enable self healing for goal violation detector + #self.healing.goal.violation.enabled=true + # Enable self healing for metric anomaly detector + #self.healing.metric.anomaly.enabled=true + # configurations for the webserver + # ================================ + # HTTP listen port + webserver.http.port=9090 + # HTTP listen address + webserver.http.address=0.0.0.0 + # Whether CORS support is enabled for API or not + webserver.http.cors.enabled=false + # Value for Access-Control-Allow-Origin + webserver.http.cors.origin=http://localhost:8080/ + # Value for Access-Control-Request-Method + webserver.http.cors.allowmethods=OPTIONS,GET,POST + # Headers that should be exposed to the Browser (Webapp) + # This is a special header that is used by the + # User Tasks subsystem and should be explicitly + # Enabled when CORS mode is used as part of the + # Admin Interface + webserver.http.cors.exposeheaders=User-Task-ID + # REST API default prefix + # (dont forget the ending *) + webserver.api.urlprefix=/kafkacruisecontrol/* + # Location where the Cruise Control frontend is deployed + webserver.ui.diskpath=./cruise-control-ui/dist/ + # URL path prefix for UI + # (dont forget the ending *) + webserver.ui.urlprefix=/* + # Time After which request is converted to Async + webserver.request.maxBlockTimeMs=10000 + # Default Session Expiry Period + webserver.session.maxExpiryTimeMs=60000 + # Session cookie path + webserver.session.path=/ + # Server Access Logs + webserver.accesslog.enabled=true + # Location of HTTP Request Logs + webserver.accesslog.path=access.log + # HTTP Request Log retention days + webserver.accesslog.retention.days=14 + clusterConfig: | + { + "min.insync.replicas": 3 + } diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 1d14a2c2f..0f0d8bad6 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -88,13 +88,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v } mountPathsNew := generateStorageConfig(bConfig.StorageConfigs) - mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew) - - if isMountPathRemoved { - log.Error(errors.New("removed storage is found in the KafkaCluster CR"), - "removing storage from broker is not supported", v1beta1.BrokerIdLabelKey, broker.Id, "mountPaths", - mountPathsOld, "mountPaths in kafkaCluster CR ", mountPathsNew) - } + mountPathsMerged := getEffectiveLogDirsMountPaths(mountPathsOld, mountPathsNew, fmt.Sprintf("%d", broker.Id), r.KafkaCluster) if len(mountPathsMerged) != 0 { if err := config.Set(kafkautils.KafkaConfigBrokerLogDirectory, strings.Join(mountPathsMerged, ",")); err != nil { @@ -291,29 +285,53 @@ func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, c } } -// mergeMountPaths is merges the new mountPaths with the old. -// It returns the merged []string and a bool which true or false depend on mountPathsNew contains or not all of the elements of the mountPathsOld -func mergeMountPaths(mountPathsOld, mountPathsNew []string) ([]string, bool) { - var mountPathsMerged []string - mountPathsMerged = append(mountPathsMerged, mountPathsNew...) - isMountPathRemoved := false - // Merging the new mountPaths with the old. If any of them is removed we can check the difference in the mountPathsOldLen - for i := range mountPathsOld { - found := false - for k := range mountPathsNew { - if mountPathsOld[i] == mountPathsNew[k] { - found = true - break - } +func getEffectiveLogDirsMountPaths(mountPathsOld, mountPathsNew []string, brokerID string, kafkaCluster *v1beta1.KafkaCluster) []string { + mountPathsEffective := append([]string{}, mountPathsNew...) + if len(mountPathsOld) == 0 { + return mountPathsEffective + } + + newMountPathsSet := make(map[string]struct{}, len(mountPathsNew)) + for _, path := range mountPathsNew { + newMountPathsSet[path] = struct{}{} + } + + for _, oldPath := range mountPathsOld { + if _, found := newMountPathsSet[oldPath]; found { + continue } - // if this is a new mountPath then add it to the current - if !found { - mountPathsMerged = append(mountPathsMerged, mountPathsOld[i]) - isMountPathRemoved = true + + if shouldKeepRemovedLogDirInConfig(oldPath, brokerID, kafkaCluster) { + mountPathsEffective = append(mountPathsEffective, oldPath) } } - return mountPathsMerged, isMountPathRemoved + return mountPathsEffective +} + +func shouldKeepRemovedLogDirInConfig(logDirPath, brokerID string, kafkaCluster *v1beta1.KafkaCluster) bool { + if kafkaCluster == nil { + return false + } + + brokerState, found := kafkaCluster.Status.BrokersState[brokerID] + if !found || brokerState.GracefulActionState.VolumeStates == nil { + return false + } + + volumePath := strings.TrimSuffix(logDirPath, "/kafka") + volumeState, found := brokerState.GracefulActionState.VolumeStates[volumePath] + if !found { + return false + } + + switch volumeState.CruiseControlVolumeState { + case v1beta1.GracefulDiskRemovalRequired, v1beta1.GracefulDiskRemovalScheduled, v1beta1.GracefulDiskRemovalRunning, + v1beta1.GracefulDiskRebalanceRequired, v1beta1.GracefulDiskRebalanceScheduled, v1beta1.GracefulDiskRebalanceRunning: + return true + default: + return false + } } func generateSuperUsers(users []string) (suStrings []string) { diff --git a/pkg/resources/kafka/configmap_test.go b/pkg/resources/kafka/configmap_test.go index 8cce33a23..14ff8b8a3 100644 --- a/pkg/resources/kafka/configmap_test.go +++ b/pkg/resources/kafka/configmap_test.go @@ -73,56 +73,155 @@ zookeeper.connect=zookeeper-server-client.zookeeper:2181/ } } -func TestMergeMountPaths(t *testing.T) { +func TestGetEffectiveLogDirsMountPaths(t *testing.T) { tests := []struct { - testName string - mountPathNew []string - mountPathOld []string - expectedMergedMountPath []string - expectedRemoved bool + testName string + mountPathsOld []string + mountPathsNew []string + brokerID string + kafkaCluster *v1beta1.KafkaCluster + expectedEffective []string }{ { - testName: "no old mountPath", - mountPathNew: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"}, - mountPathOld: []string{}, - expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"}, - expectedRemoved: false, + testName: "no broker state - effective is mountPathsNew only", + mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"}, + mountPathsNew: []string{"/kafka-logs/kafka"}, + brokerID: "0", + kafkaCluster: nil, + expectedEffective: []string{"/kafka-logs/kafka"}, }, { - testName: "same", - mountPathNew: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"}, - mountPathOld: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"}, - expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"}, - expectedRemoved: false, + testName: "nil VolumeStates - effective is mountPathsNew only", + mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"}, + mountPathsNew: []string{"/kafka-logs/kafka"}, + brokerID: "0", + kafkaCluster: &v1beta1.KafkaCluster{Status: v1beta1.KafkaClusterStatus{BrokersState: map[string]v1beta1.BrokerState{"0": {GracefulActionState: v1beta1.GracefulActionState{VolumeStates: nil}}}}}, + expectedEffective: []string{"/kafka-logs/kafka"}, }, { - testName: "changed order", - mountPathNew: []string{"/kafka-logs/kafka", "/kafka-logs3/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"}, - mountPathOld: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"}, - expectedMergedMountPath: []string{"/kafka-logs/kafka", "/kafka-logs3/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"}, - expectedRemoved: false, + testName: "removed path with VolumeState in progress (GracefulDiskRemovalRequired) - path kept in effective", + mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"}, + mountPathsNew: []string{"/kafka-logs/kafka"}, + brokerID: "0", + kafkaCluster: &v1beta1.KafkaCluster{ + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": { + GracefulActionState: v1beta1.GracefulActionState{ + VolumeStates: map[string]v1beta1.VolumeState{ + "/kafka-logs2": {CruiseControlVolumeState: v1beta1.GracefulDiskRemovalRequired}, + }, + }, + }, + }, + }, + }, + expectedEffective: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"}, + }, + { + testName: "removed path with state not found - path not in effective", + mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"}, + mountPathsNew: []string{"/kafka-logs/kafka"}, + brokerID: "0", + kafkaCluster: &v1beta1.KafkaCluster{ + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": { + GracefulActionState: v1beta1.GracefulActionState{ + VolumeStates: map[string]v1beta1.VolumeState{}, + }, + }, + }, + }, + }, + expectedEffective: []string{"/kafka-logs/kafka"}, + }, + { + testName: "removed path with IsDiskRemovalSucceeded - path not in effective", + mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"}, + mountPathsNew: []string{"/kafka-logs/kafka"}, + brokerID: "0", + kafkaCluster: &v1beta1.KafkaCluster{ + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": { + GracefulActionState: v1beta1.GracefulActionState{ + VolumeStates: map[string]v1beta1.VolumeState{ + "/kafka-logs2": {CruiseControlVolumeState: v1beta1.GracefulDiskRemovalSucceeded}, + }, + }, + }, + }, + }, + }, + expectedEffective: []string{"/kafka-logs/kafka"}, + }, + { + testName: "removed path with IsDiskRebalance - path kept in effective", + mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"}, + mountPathsNew: []string{"/kafka-logs/kafka"}, + brokerID: "0", + kafkaCluster: &v1beta1.KafkaCluster{ + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": { + GracefulActionState: v1beta1.GracefulActionState{ + VolumeStates: map[string]v1beta1.VolumeState{ + "/kafka-logs2": {CruiseControlVolumeState: v1beta1.GracefulDiskRebalanceRequired}, + }, + }, + }, + }, + }, + }, + expectedEffective: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"}, }, { - testName: "removed one", - mountPathNew: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"}, - mountPathOld: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"}, - expectedMergedMountPath: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka", "/kafka-logs3/kafka"}, - expectedRemoved: true, + testName: "removed path with disk removal completed with error - path not in effective", + mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"}, + mountPathsNew: []string{"/kafka-logs/kafka"}, + brokerID: "0", + kafkaCluster: &v1beta1.KafkaCluster{ + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": { + GracefulActionState: v1beta1.GracefulActionState{ + VolumeStates: map[string]v1beta1.VolumeState{ + "/kafka-logs2": {CruiseControlVolumeState: v1beta1.GracefulDiskRemovalCompletedWithError}, + }, + }, + }, + }, + }, + }, + expectedEffective: []string{"/kafka-logs/kafka"}, }, { - testName: "removed all", - mountPathNew: []string{}, - mountPathOld: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"}, - expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"}, - expectedRemoved: true, + testName: "removed path with disk rebalance paused - path not in effective", + mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"}, + mountPathsNew: []string{"/kafka-logs/kafka"}, + brokerID: "0", + kafkaCluster: &v1beta1.KafkaCluster{ + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": { + GracefulActionState: v1beta1.GracefulActionState{ + VolumeStates: map[string]v1beta1.VolumeState{ + "/kafka-logs2": {CruiseControlVolumeState: v1beta1.GracefulDiskRebalancePaused}, + }, + }, + }, + }, + }, + }, + expectedEffective: []string{"/kafka-logs/kafka"}, }, } for _, test := range tests { - mergedMountPaths, isRemoved := mergeMountPaths(test.mountPathOld, test.mountPathNew) - if !reflect.DeepEqual(mergedMountPaths, test.expectedMergedMountPath) { - t.Errorf("testName: %s, expected: %s, got: %s", test.testName, test.expectedMergedMountPath, mergedMountPaths) - } - require.Equal(t, test.expectedRemoved, isRemoved) + t.Run(test.testName, func(t *testing.T) { + got := getEffectiveLogDirsMountPaths(test.mountPathsOld, test.mountPathsNew, test.brokerID, test.kafkaCluster) + require.Equal(t, test.expectedEffective, got, "effective log dirs") + }) } } diff --git a/tests/e2e/koperator_suite_test.go b/tests/e2e/koperator_suite_test.go index a2c59bffe..86841c38c 100644 --- a/tests/e2e/koperator_suite_test.go +++ b/tests/e2e/koperator_suite_test.go @@ -69,6 +69,9 @@ var _ = ginkgo.When("Testing e2e test altogether", ginkgo.Ordered, func() { testProduceConsumeInternalSSL(defaultTLSSecretName) testJmxExporter() testUninstallKafkaCluster() + testInstallKafkaCluster("../../config/samples/simplekafkacluster_multidisk.yaml") + testMultiDiskRemoval() + testUninstallKafkaCluster() testUninstallZookeeperCluster() testInstallKafkaCluster("../../config/samples/kraft/simplekafkacluster_kraft.yaml") testProduceConsumeInternal() diff --git a/tests/e2e/test_multidisk_removal.go b/tests/e2e/test_multidisk_removal.go new file mode 100644 index 000000000..0284f19ee --- /dev/null +++ b/tests/e2e/test_multidisk_removal.go @@ -0,0 +1,134 @@ +// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates +// Copyright 2025 Adobe. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build e2e + +package e2e + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/gruntwork-io/terratest/modules/k8s" + ginkgo "github.com/onsi/ginkgo/v2" + gomega "github.com/onsi/gomega" + + "github.com/banzaicloud/koperator/api/v1beta1" + kafkautils "github.com/banzaicloud/koperator/pkg/util/kafka" +) + +const ( + multidiskRemovalTimeout = 900 * time.Second // CC disk removal can take long + multidiskRemovalPollInterval = 15 * time.Second + removedLogDirPath = "/kafka-logs-extra/kafka" + brokerConfigTemplateFormat = "%s-config-%d" +) + +// testMultiDiskRemoval applies the single-disk manifest to remove the second disk from the cluster, +// waits for Cruise Control and PVC cleanup, then asserts broker ConfigMaps' log.dirs no longer +// contain the removed path and brokers stay healthy. +func testMultiDiskRemoval() bool { + return ginkgo.When("Multi-disk removal: remove one disk and assert log.dirs is updated", func() { + var kubectlOptions k8s.KubectlOptions + var err error + + ginkgo.It("Acquiring K8s config and context", func() { + kubectlOptions, err = kubectlOptionsForCurrentContext() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace + }) + + ginkgo.It("Applying single-disk manifest to trigger disk removal", func() { + ginkgo.By("Patching KafkaCluster to remove second disk (storageConfigs -> single entry)") + applyK8sResourceManifest(kubectlOptions, "../../config/samples/simplekafkacluster.yaml") + }) + + ginkgo.It("Waiting for disk removal and PVC cleanup", func() { + ginkgo.By("Waiting until broker ConfigMaps' log.dirs no longer contain the removed path") + gomega.Eventually(context.Background(), func() (bool, error) { + return brokerConfigMapsLogDirsExcludePath(kubectlOptions, kafkaClusterName, removedLogDirPath) + }, multidiskRemovalTimeout, multidiskRemovalPollInterval).Should(gomega.BeTrue()) + }) + + ginkgo.It("Asserting broker ConfigMaps log.dirs do not contain removed path", func() { + exclude, err := brokerConfigMapsLogDirsExcludePath(kubectlOptions, kafkaClusterName, removedLogDirPath) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(exclude).To(gomega.BeTrue(), "broker log.dirs must not contain removed path %s", removedLogDirPath) + }) + + ginkgo.It("Asserting Kafka brokers remain healthy", func() { + err := waitK8sResourceCondition(kubectlOptions, "pod", "condition=Ready", defaultPodReadinessWaitTime, + v1beta1.KafkaCRLabelKey+"="+kafkaClusterName+",app=kafka", "") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) + }) +} + +// brokerConfigMapsLogDirsExcludePath returns true if all broker ConfigMaps (for the given cluster) +// have log.dirs that do not contain the given path. Returns error if any required ConfigMap is missing +// or broker-config data cannot be read. +func brokerConfigMapsLogDirsExcludePath(kubectlOptions k8s.KubectlOptions, clusterName string, path string) (bool, error) { + // Brokers 0, 1, 2 from default sample + for _, brokerID := range []int{0, 1, 2} { + configMapName := fmt.Sprintf(brokerConfigTemplateFormat, clusterName, brokerID) + logDirs, err := getBrokerConfigMapLogDirs(kubectlOptions, configMapName, kubectlOptions.Namespace) + if err != nil { + return false, err + } + for _, d := range logDirs { + if d == path { + return false, nil + } + } + } + return true, nil +} + +// getBrokerConfigMapLogDirs returns the log.dirs value from the broker ConfigMap's broker-config data, +// parsed as a slice of paths (comma-separated in the config). +func getBrokerConfigMapLogDirs(kubectlOptions k8s.KubectlOptions, configMapName string, namespace string) ([]string, error) { + args := []string{ + "get", "configmap", configMapName, + "-n", namespace, + "-o", fmt.Sprintf("jsonpath={.data.%s}", kafkautils.ConfigPropertyName), + } + output, err := k8s.RunKubectlAndGetOutputE(ginkgo.GinkgoT(), &kubectlOptions, args...) + if err != nil { + return nil, fmt.Errorf("getting configmap %s: %w", configMapName, err) + } + // Parse properties-style content for log.dirs=path1,path2 (broker-config is multi-line) + prefix := "log.dirs=" + lines := strings.Split(output, "\n") + for i := range lines { + line := strings.TrimSpace(lines[i]) + if strings.HasPrefix(line, prefix) { + value := strings.TrimPrefix(line, prefix) + value = strings.TrimSpace(value) + if value == "" { + return []string{}, nil + } + var paths []string + for _, p := range strings.Split(value, ",") { + if q := strings.TrimSpace(p); q != "" { + paths = append(paths, q) + } + } + return paths, nil + } + } + return nil, fmt.Errorf("log.dirs not found in configmap %s", configMapName) +}