Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ BIN_DIR := $(PROJECT_DIR)/bin
BOILERPLATE_DIR := $(PROJECT_DIR)/hack/boilerplate

# Image URL to use all building/pushing image targets
TAG ?= $(shell git describe --tags --abbrev=0 --match '[0-9].*[0-9].*[0-9]*' 2>/dev/null )
TAG ?= $(shell git describe --tags --dirty --always --abbrev=0 --match '[0-9].*[0-9].*[0-9]*' 2>/dev/null)
IMG ?= ghcr.io/adobe/kafka-operator:$(TAG)

# Produce CRDs that work back to Kubernetes 1.11 (no version conversion)
Expand Down Expand Up @@ -171,7 +171,7 @@ test: generate fmt vet bin/setup-envtest
cd third_party/github.com/banzaicloud/go-cruise-control && \
go test -v -parallel 2 -failfast ./... -cover -covermode=count -coverprofile cover.out -test.v -test.paniconexit0

# Run e2e tests
# Run e2e tests. Set IMG_E2E to use a custom operator image; otherwise the chart default is used.
test-e2e:
cd tests/e2e && IMG_E2E=${IMG_E2E} go test . \
-v \
Expand Down
281 changes: 281 additions & 0 deletions config/samples/simplekafkacluster_multidisk.yaml

Large diffs are not rendered by default.

59 changes: 49 additions & 10 deletions pkg/resources/kafka/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v
configureBrokerZKMode(broker.Id, r.KafkaCluster, config, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses, log)
}

// This logic prevents the removal of the mountPath from the broker configmap
// log.dirs must reflect actual provisioned PVCs. During disk removal we keep removed paths
// in log.dirs only while CC is draining (VolumeState in progress); once removal succeeds
// or status is cleaned, we drop the path so config matches pod mounts.
brokerConfigMapName := fmt.Sprintf(brokerConfigTemplate+"-"+"%d", r.KafkaCluster.Name, broker.Id)
var brokerConfigMapOld corev1.ConfigMap
err := r.Get(context.Background(), client.ObjectKey{Name: brokerConfigMapName, Namespace: r.KafkaCluster.GetNamespace()}, &brokerConfigMapOld)
Expand All @@ -88,16 +90,10 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v
}

mountPathsNew := generateStorageConfig(bConfig.StorageConfigs)
mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew)
mountPathsEffective := getEffectiveLogDirsMountPaths(mountPathsOld, mountPathsNew, fmt.Sprintf("%d", broker.Id), r.KafkaCluster)

if isMountPathRemoved {
log.Error(errors.New("removed storage is found in the KafkaCluster CR"),
"removing storage from broker is not supported", v1beta1.BrokerIdLabelKey, broker.Id, "mountPaths",
mountPathsOld, "mountPaths in kafkaCluster CR ", mountPathsNew)
}

if len(mountPathsMerged) != 0 {
if err := config.Set(kafkautils.KafkaConfigBrokerLogDirectory, strings.Join(mountPathsMerged, ",")); err != nil {
if len(mountPathsEffective) != 0 {
if err := config.Set(kafkautils.KafkaConfigBrokerLogDirectory, strings.Join(mountPathsEffective, ",")); err != nil {
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigBrokerLogDirectory))
}
}
Expand Down Expand Up @@ -291,6 +287,49 @@ func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, c
}
}

// getEffectiveLogDirsMountPaths computes the slice of log dir paths to write into log.dirs.
// The desired paths from the CR (mountPathsNew) are always included. A path present only in
// mountPathsOld (a removed disk) is kept solely while the broker's VolumeState for it is
// still in progress (IsDiskRemoval or IsDiskRebalance); once removal succeeds or the status
// entry is gone, the path is dropped so the config matches the actually provisioned PVCs
// and pod mounts.
func getEffectiveLogDirsMountPaths(mountPathsOld, mountPathsNew []string, brokerID string, kafkaCluster *v1beta1.KafkaCluster) []string {
	result := make([]string, 0, len(mountPathsNew)+len(mountPathsOld))
	result = append(result, mountPathsNew...)

	if kafkaCluster == nil {
		return result
	}
	brokerState, ok := kafkaCluster.Status.BrokersState[brokerID]
	if !ok || brokerState.GracefulActionState.VolumeStates == nil {
		return result
	}
	volumeStates := brokerState.GracefulActionState.VolumeStates

	// Track what is already in result so a removed path is never appended twice.
	included := make(map[string]struct{}, len(result))
	for _, p := range result {
		included[p] = struct{}{}
	}

	for _, removedPath := range mountPathsOld {
		if _, already := included[removedPath]; already {
			continue
		}
		// VolumeStates is keyed by the PVC mount path (e.g. /kafka-logs),
		// while the log dir path is mountPath + "/kafka".
		volState, found := volumeStates[strings.TrimSuffix(removedPath, "/kafka")]
		if !found || volState.CruiseControlVolumeState.IsDiskRemovalSucceeded() {
			continue
		}
		if volState.CruiseControlVolumeState.IsDiskRemoval() || volState.CruiseControlVolumeState.IsDiskRebalance() {
			result = append(result, removedPath)
			included[removedPath] = struct{}{}
		}
	}
	return result
}

// containsPath reports whether path is an element of slice.
func containsPath(slice []string, path string) bool {
	for i := range slice {
		if slice[i] == path {
			return true
		}
	}
	return false
}

// mergeMountPaths merges the new mountPaths with the old ones.
// It returns the merged []string and a bool reporting whether mountPathsNew is missing any element of mountPathsOld (i.e. a mount path was removed).
func mergeMountPaths(mountPathsOld, mountPathsNew []string) ([]string, bool) {
Expand Down
112 changes: 112 additions & 0 deletions pkg/resources/kafka/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,118 @@ func TestMergeMountPaths(t *testing.T) {
}
}

// TestGetEffectiveLogDirsMountPaths covers getEffectiveLogDirsMountPaths: the desired
// mount paths from the CR are always returned, and a removed path is additionally kept
// only while the broker's VolumeState for it reports an in-progress disk removal or
// disk rebalance.
func TestGetEffectiveLogDirsMountPaths(t *testing.T) {
	tests := []struct {
		testName          string
		mountPathsOld     []string // log dir paths from the previous broker configmap
		mountPathsNew     []string // desired log dir paths generated from the CR storage configs
		brokerID          string
		kafkaCluster      *v1beta1.KafkaCluster // carries per-broker VolumeStates in status; may be nil
		expectedEffective []string
	}{
		{
			testName:          "no broker state - effective is mountPathsNew only",
			mountPathsOld:     []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
			mountPathsNew:     []string{"/kafka-logs/kafka"},
			brokerID:          "0",
			kafkaCluster:      nil,
			expectedEffective: []string{"/kafka-logs/kafka"},
		},
		{
			testName:          "nil VolumeStates - effective is mountPathsNew only",
			mountPathsOld:     []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
			mountPathsNew:     []string{"/kafka-logs/kafka"},
			brokerID:          "0",
			kafkaCluster:      &v1beta1.KafkaCluster{Status: v1beta1.KafkaClusterStatus{BrokersState: map[string]v1beta1.BrokerState{"0": {GracefulActionState: v1beta1.GracefulActionState{VolumeStates: nil}}}}},
			expectedEffective: []string{"/kafka-logs/kafka"},
		},
		{
			testName:      "removed path with VolumeState in progress (GracefulDiskRemovalRequired) - path kept in effective",
			mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
			mountPathsNew: []string{"/kafka-logs/kafka"},
			brokerID:      "0",
			kafkaCluster: &v1beta1.KafkaCluster{
				Status: v1beta1.KafkaClusterStatus{
					BrokersState: map[string]v1beta1.BrokerState{
						"0": {
							GracefulActionState: v1beta1.GracefulActionState{
								VolumeStates: map[string]v1beta1.VolumeState{
									// VolumeStates keys are PVC mount paths (no "/kafka" suffix)
									"/kafka-logs2": {CruiseControlVolumeState: v1beta1.GracefulDiskRemovalRequired},
								},
							},
						},
					},
				},
			},
			expectedEffective: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
		},
		{
			testName:      "removed path with state not found - path not in effective",
			mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
			mountPathsNew: []string{"/kafka-logs/kafka"},
			brokerID:      "0",
			kafkaCluster: &v1beta1.KafkaCluster{
				Status: v1beta1.KafkaClusterStatus{
					BrokersState: map[string]v1beta1.BrokerState{
						"0": {
							GracefulActionState: v1beta1.GracefulActionState{
								VolumeStates: map[string]v1beta1.VolumeState{},
							},
						},
					},
				},
			},
			expectedEffective: []string{"/kafka-logs/kafka"},
		},
		{
			testName:      "removed path with IsDiskRemovalSucceeded - path not in effective",
			mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
			mountPathsNew: []string{"/kafka-logs/kafka"},
			brokerID:      "0",
			kafkaCluster: &v1beta1.KafkaCluster{
				Status: v1beta1.KafkaClusterStatus{
					BrokersState: map[string]v1beta1.BrokerState{
						"0": {
							GracefulActionState: v1beta1.GracefulActionState{
								VolumeStates: map[string]v1beta1.VolumeState{
									"/kafka-logs2": {CruiseControlVolumeState: v1beta1.GracefulDiskRemovalSucceeded},
								},
							},
						},
					},
				},
			},
			expectedEffective: []string{"/kafka-logs/kafka"},
		},
		{
			testName:      "removed path with IsDiskRebalance - path kept in effective",
			mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
			mountPathsNew: []string{"/kafka-logs/kafka"},
			brokerID:      "0",
			kafkaCluster: &v1beta1.KafkaCluster{
				Status: v1beta1.KafkaClusterStatus{
					BrokersState: map[string]v1beta1.BrokerState{
						"0": {
							GracefulActionState: v1beta1.GracefulActionState{
								VolumeStates: map[string]v1beta1.VolumeState{
									"/kafka-logs2": {CruiseControlVolumeState: v1beta1.GracefulDiskRebalanceRequired},
								},
							},
						},
					},
				},
			},
			expectedEffective: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
		},
	}
	for _, test := range tests {
		t.Run(test.testName, func(t *testing.T) {
			got := getEffectiveLogDirsMountPaths(test.mountPathsOld, test.mountPathsNew, test.brokerID, test.kafkaCluster)
			require.Equal(t, test.expectedEffective, got, "effective log dirs")
		})
	}
}

func TestGenerateBrokerConfig(t *testing.T) { //nolint funlen
tests := []struct {
testName string
Expand Down
23 changes: 20 additions & 3 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,27 @@ func getCreatedPvcForBroker(
return nil, errors.NewWithDetails("broker mount paths missing persistent volume claim",
banzaiv1beta1.BrokerIdLabelKey, brokerID, "mount paths", missing)
}
sort.Slice(foundPvcList.Items, func(i, j int) bool {
return foundPvcList.Items[i].Name < foundPvcList.Items[j].Name

// Only return PVCs that match current (desired) storage configs so that when a volume
// is removed from the spec the pod template does not include it. Otherwise we would
// delete the PVC in handleDiskRemoval while the pod still mounts it, leaving the PVC
// stuck in Terminating.
desiredMountPaths := make(map[string]struct{})
for i := range storageConfigs {
if storageConfigs[i].PvcSpec != nil {
desiredMountPaths[storageConfigs[i].MountPath] = struct{}{}
}
}
var filtered []corev1.PersistentVolumeClaim
for i := range foundPvcList.Items {
if _, ok := desiredMountPaths[foundPvcList.Items[i].GetAnnotations()["mountPath"]]; ok {
filtered = append(filtered, foundPvcList.Items[i])
}
}
sort.Slice(filtered, func(i, j int) bool {
return filtered[i].Name < filtered[j].Name
})
return foundPvcList.Items, nil
return filtered, nil
}

func getLoadBalancerIP(foundLBService *corev1.Service) (string, error) {
Expand Down
5 changes: 4 additions & 1 deletion tests/e2e/koperator_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ var _ = ginkgo.When("Testing e2e test altogether", ginkgo.Ordered, func() {
testProduceConsumeInternalSSL(defaultTLSSecretName)
testJmxExporter()
testUninstallKafkaCluster()
testUninstallZookeeperCluster()
testInstallKafkaCluster("../../config/samples/kraft/simplekafkacluster_kraft.yaml")
testProduceConsumeInternal()
testJmxExporter()
testUninstallKafkaCluster()
testInstallKafkaClusterMultidisk()
testMultiDiskRemoval()
testUninstallKafkaCluster()
testUninstallZookeeperCluster()
testUninstall()
snapshotClusterAndCompare(snapshottedInfo)
})
17 changes: 17 additions & 0 deletions tests/e2e/test_install_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,20 @@ func testInstallKafkaCluster(kafkaClusterManifestPath string) bool { //nolint:un
requireCreatingKafkaCluster(kubectlOptions, kafkaClusterManifestPath)
})
}

// testInstallKafkaClusterMultidisk installs the multi-disk Kafka cluster (same as testInstallKafkaCluster
// but with a distinct description so it can be run alone via --ginkgo.focus "multi-disk").
func testInstallKafkaClusterMultidisk() bool {
	return ginkgo.When("Installing Kafka cluster (multi-disk)", func() {
		var kubectlOptions k8s.KubectlOptions
		var err error

		// Populated at spec run time from the current kubeconfig context.
		ginkgo.It("Acquiring K8s config and context", func() {
			kubectlOptions, err = kubectlOptionsForCurrentContext()
			gomega.Expect(err).NotTo(gomega.HaveOccurred())
		})

		// NOTE(review): the two statements below execute during ginkgo tree construction,
		// i.e. before the It above runs and populates kubectlOptions — presumably this
		// mirrors testInstallKafkaCluster's working pattern, but confirm the namespace
		// and options actually reach requireCreatingKafkaCluster as intended.
		kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace
		requireCreatingKafkaCluster(kubectlOptions, "../../config/samples/simplekafkacluster_multidisk.yaml")
	})
}
Loading
Loading