Commit 1d22dff

Extract the RegisterFailedScaleUp metric generation into a separate NodeGroupChangeObserver instance

1 parent 9517b7f · commit 1d22dff

File tree: 10 files changed, +212 −140 lines changed
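Note on the new dependency before the diff: instead of emitting the RegisterFailedScaleUp metric itself through a private metricObserver, the ClusterStateRegistry now collects ScaleUpFailure values and hands them to a *nodegroupchange.NodeGroupChangeObserversList (the new scaleStateNotifier field). The observers/nodegroupchange package is one of the other files in this commit and is not shown in this excerpt, so the sketch below is an assumption reconstructed from the diff's call site csr.scaleStateNotifier.RegisterFailedScaleUp(failure.NodeGroup, failure.ErrorInfo, failure.Time), not the committed code; the real interface may carry additional methods.

// Sketch of the assumed observer contract in observers/nodegroupchange.
// Only RegisterFailedScaleUp is derived from call sites in this diff; any
// other methods the real interface declares are not shown here.
package nodegroupchange

import (
	"sync"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// NodeGroupChangeObserver gets notified about scale-state changes.
type NodeGroupChangeObserver interface {
	// RegisterFailedScaleUp records a failed scale-up; a metrics-emitting
	// implementation would translate errorInfo into the failed_scale_ups
	// metric samples the registry used to emit itself.
	RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup,
		errorInfo cloudprovider.InstanceErrorInfo, currentTime time.Time)
}

// NodeGroupChangeObserversList fans each notification out to all
// registered observers.
type NodeGroupChangeObserversList struct {
	mutex     sync.Mutex
	observers []NodeGroupChangeObserver
}

// Register adds an observer to the list.
func (l *NodeGroupChangeObserversList) Register(o NodeGroupChangeObserver) {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	l.observers = append(l.observers, o)
}

// RegisterFailedScaleUp forwards the failure to every observer.
func (l *NodeGroupChangeObserversList) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup,
	errorInfo cloudprovider.InstanceErrorInfo, currentTime time.Time) {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	for _, o := range l.observers {
		o.RegisterFailedScaleUp(nodeGroup, errorInfo, currentTime)
	}
}

With this shape, the metric generation that used to live in registerFailedScaleUpNoLock can become one observer among several, which is what the commit message means by extracting it into a separate NodeGroupChangeObserver instance.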

cluster-autoscaler/clusterstate/clusterstate.go

Lines changed: 62 additions & 62 deletions
@@ -29,11 +29,11 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
+	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
-	"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
 	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
 
@@ -115,13 +115,10 @@ type UnregisteredNode struct {
 type ScaleUpFailure struct {
 	NodeGroup cloudprovider.NodeGroup
 	Reason    metrics.FailedScaleUpReason
+	ErrorInfo cloudprovider.InstanceErrorInfo
 	Time      time.Time
 }
 
-type metricObserver interface {
-	RegisterFailedScaleUp(reason metrics.FailedScaleUpReason, gpuResourceName, gpuType string)
-}
-
 // ClusterStateRegistry is a structure to keep track the current state of the cluster.
 type ClusterStateRegistry struct {
 	sync.Mutex
@@ -148,11 +145,11 @@ type ClusterStateRegistry struct {
 	interrupt                  chan struct{}
 	nodeGroupConfigProcessor   nodegroupconfig.NodeGroupConfigProcessor
 	asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker
-	metrics                    metricObserver
-
 	// scaleUpFailures contains information about scale-up failures for each node group. It should be
 	// cleared periodically to avoid unnecessary accumulation.
 	scaleUpFailures map[string][]ScaleUpFailure
+
+	scaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList
 }
 
 // NodeGroupScalingSafety contains information about the safety of the node group to scale up/down.
@@ -163,11 +160,7 @@ type NodeGroupScalingSafety struct {
 }
 
 // NewClusterStateRegistry creates new ClusterStateRegistry.
-func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff, nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor, asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker) *ClusterStateRegistry {
-	return newClusterStateRegistry(cloudProvider, config, logRecorder, backoff, nodeGroupConfigProcessor, asyncNodeGroupStateChecker, metrics.DefaultMetrics)
-}
-
-func newClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff, nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor, asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker, metrics metricObserver) *ClusterStateRegistry {
+func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff, nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor, asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker, scaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList) *ClusterStateRegistry {
 	return &ClusterStateRegistry{
 		scaleUpRequests:   make(map[string]*ScaleUpRequest),
 		scaleDownRequests: make([]*ScaleDownRequest, 0),
@@ -188,7 +181,7 @@ func newClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config C
 		scaleUpFailures:            make(map[string][]ScaleUpFailure),
 		nodeGroupConfigProcessor:   nodeGroupConfigProcessor,
 		asyncNodeGroupStateChecker: asyncNodeGroupStateChecker,
-		metrics:                    metrics,
+		scaleStateNotifier:         scaleStateNotifier,
 	}
 }
 
@@ -282,10 +275,11 @@ func (csr *ClusterStateRegistry) RegisterScaleDown(nodeGroup cloudprovider.NodeG
 }
 
 // To be executed under a lock.
-func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
+func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) []ScaleUpFailure {
 	// clean up stale backoff info
 	csr.backoff.RemoveStaleBackoffData(currentTime)
 
+	var failedScaleUps []ScaleUpFailure
 	for nodeGroupName, scaleUpRequest := range csr.scaleUpRequests {
 		if csr.asyncNodeGroupStateChecker.IsUpcoming(scaleUpRequest.NodeGroup) {
 			continue
@@ -304,19 +298,16 @@
 			csr.logRecorder.Eventf(apiv1.EventTypeWarning, "ScaleUpTimedOut",
 				"Nodes added to group %s failed to register within %v",
 				scaleUpRequest.NodeGroup.Id(), currentTime.Sub(scaleUpRequest.Time))
-			availableGPUTypes := csr.cloudProvider.GetAvailableGPUTypes()
-			gpuResource, gpuType := "", ""
-			nodeInfo, err := scaleUpRequest.NodeGroup.TemplateNodeInfo()
-			if err != nil {
-				klog.Warningf("Failed to get template node info for a node group: %s", err)
-			} else {
-				gpuResource, gpuType = gpu.GetGpuInfoForMetrics(csr.cloudProvider.GetNodeGpuConfig(nodeInfo.Node()), availableGPUTypes, nodeInfo.Node(), scaleUpRequest.NodeGroup)
-			}
-			csr.registerFailedScaleUpNoLock(scaleUpRequest.NodeGroup, metrics.Timeout, cloudprovider.InstanceErrorInfo{
-				ErrorClass:   cloudprovider.OtherErrorClass,
-				ErrorCode:    "timeout",
-				ErrorMessage: fmt.Sprintf("Scale-up timed out for node group %v after %v", nodeGroupName, currentTime.Sub(scaleUpRequest.Time)),
-			}, gpuResource, gpuType, currentTime)
+			failedScaleUps = append(failedScaleUps, ScaleUpFailure{
+				NodeGroup: scaleUpRequest.NodeGroup,
+				Reason:    metrics.Timeout,
+				ErrorInfo: cloudprovider.InstanceErrorInfo{
+					ErrorClass:   cloudprovider.OtherErrorClass,
+					ErrorCode:    string(metrics.Timeout),
+					ErrorMessage: fmt.Sprintf("Scale-up timed out for node group %v after %v", nodeGroupName, currentTime.Sub(scaleUpRequest.Time)),
+				},
+				Time: currentTime,
+			})
 			delete(csr.scaleUpRequests, nodeGroupName)
 		}
 	}
@@ -328,6 +319,7 @@ func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
 		}
 	}
 	csr.scaleDownRequests = newScaleDownRequests
+	return failedScaleUps
 }
 
 // To be executed under a lock.
@@ -340,27 +332,21 @@ func (csr *ClusterStateRegistry) backoffNodeGroup(nodeGroup cloudprovider.NodeGr
 // RegisterFailedScaleUp should be called after getting error from cloudprovider
 // when trying to scale-up node group. It will mark this group as not safe to autoscale
 // for some time.
-func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason string, errorMessage, gpuResourceName, gpuType string, currentTime time.Time) {
+func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, errorInfo cloudprovider.InstanceErrorInfo, currentTime time.Time) {
 	csr.Lock()
 	defer csr.Unlock()
-	csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(reason), cloudprovider.InstanceErrorInfo{
-		ErrorClass:   cloudprovider.OtherErrorClass,
-		ErrorCode:    string(reason),
-		ErrorMessage: errorMessage,
-	}, gpuResourceName, gpuType, currentTime)
+	csr.scaleUpFailures[nodeGroup.Id()] = append(csr.scaleUpFailures[nodeGroup.Id()], ScaleUpFailure{
+		NodeGroup: nodeGroup,
+		Reason:    metrics.FailedScaleUpReason(errorInfo.ErrorCode),
+		Time:      currentTime})
+	csr.backoffNodeGroup(nodeGroup, errorInfo, currentTime)
 }
 
 // RegisterFailedScaleDown records failed scale-down for a nodegroup.
 // We don't need to implement this function for cluster state registry
 func (csr *ClusterStateRegistry) RegisterFailedScaleDown(_ cloudprovider.NodeGroup, _ string, _ time.Time) {
 }
 
-func (csr *ClusterStateRegistry) registerFailedScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, errorInfo cloudprovider.InstanceErrorInfo, gpuResourceName, gpuType string, currentTime time.Time) {
-	csr.scaleUpFailures[nodeGroup.Id()] = append(csr.scaleUpFailures[nodeGroup.Id()], ScaleUpFailure{NodeGroup: nodeGroup, Reason: reason, Time: currentTime})
-	csr.metrics.RegisterFailedScaleUp(reason, gpuResourceName, gpuType)
-	csr.backoffNodeGroup(nodeGroup, errorInfo, currentTime)
-}
-
 // UpdateNodes updates the state of the nodes in the ClusterStateRegistry and recalculates the stats
 func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGroups map[string]*framework.NodeInfo, currentTime time.Time) error {
 	csr.updateNodeGroupMetrics()
@@ -374,12 +360,27 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGr
 	if err != nil {
 		return err
 	}
+	scaleUpFailures := csr.updateClusterStateRegistry(
+		nodes,
+		nodeInfosForGroups,
+		cloudProviderNodeInstances,
+		currentTime,
+		targetSizes,
+	)
+	for _, failure := range scaleUpFailures {
+		csr.scaleStateNotifier.RegisterFailedScaleUp(failure.NodeGroup, failure.ErrorInfo, failure.Time)
+	}
+	return nil
+}
+
+func (csr *ClusterStateRegistry) updateClusterStateRegistry(nodes []*apiv1.Node,
+	nodeInfosForGroups map[string]*framework.NodeInfo,
+	cloudProviderNodeInstances map[string][]cloudprovider.Instance, currentTime time.Time, targetSizes map[string]int) []ScaleUpFailure {
 	cloudProviderNodesRemoved := csr.getCloudProviderDeletedNodes(nodes)
 	notRegistered := getNotRegisteredNodes(nodes, cloudProviderNodeInstances, currentTime)
 
 	csr.Lock()
 	defer csr.Unlock()
-
 	csr.nodes = nodes
 	csr.nodeInfosForGroups = nodeInfosForGroups
 	csr.previousCloudProviderNodeInstances = csr.cloudProviderNodeInstances
@@ -392,12 +393,12 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGr
 	// update acceptable ranges based on requests from last loop and targetSizes
 	// updateScaleRequests relies on acceptableRanges being up to date
 	csr.updateAcceptableRanges(targetSizes)
-	csr.updateScaleRequests(currentTime)
-	csr.handleInstanceCreationErrors(currentTime)
+	scaleUpFailures := csr.updateScaleRequests(currentTime)
+	scaleUpFailures = append(scaleUpFailures, csr.handleInstanceCreationErrors(currentTime)...)
 	// recalculate acceptable ranges after removing timed out requests
 	csr.updateAcceptableRanges(targetSizes)
 	csr.updateIncorrectNodeGroupSizes(currentTime)
-	return nil
+	return scaleUpFailures
 }
 
 // Recalculate cluster state after scale-ups or scale-downs were registered.
@@ -1132,23 +1133,25 @@ func (csr *ClusterStateRegistry) GetAutoscaledNodesCount() (currentSize, targetS
 	return currentSize, targetSize
 }
 
-func (csr *ClusterStateRegistry) handleInstanceCreationErrors(currentTime time.Time) {
+func (csr *ClusterStateRegistry) handleInstanceCreationErrors(currentTime time.Time) []ScaleUpFailure {
 	nodeGroups := csr.getRunningNodeGroups()
 
+	var failedScaleUps []ScaleUpFailure
 	for _, nodeGroup := range nodeGroups {
-		csr.handleInstanceCreationErrorsForNodeGroup(
+		failedScaleUps = append(failedScaleUps, csr.handleInstanceCreationErrorsForNodeGroup(
 			nodeGroup,
 			csr.cloudProviderNodeInstances[nodeGroup.Id()],
 			csr.previousCloudProviderNodeInstances[nodeGroup.Id()],
-			currentTime)
+			currentTime)...)
 	}
+	return failedScaleUps
 }
 
 func (csr *ClusterStateRegistry) handleInstanceCreationErrorsForNodeGroup(
 	nodeGroup cloudprovider.NodeGroup,
 	currentInstances []cloudprovider.Instance,
 	previousInstances []cloudprovider.Instance,
-	currentTime time.Time) {
+	currentTime time.Time) []ScaleUpFailure {
 
 	_, currentUniqueErrorMessagesForErrorCode, currentErrorCodeToInstance := csr.buildInstanceToErrorCodeMappings(currentInstances)
 	previousInstanceToErrorCode, _, _ := csr.buildInstanceToErrorCodeMappings(previousInstances)
@@ -1159,6 +1162,7 @@ func (csr *ClusterStateRegistry) handleInstanceCreationErrorsForNodeGroup(
 		}
 	}
 
+	var failedScaleUps []ScaleUpFailure
 	// If node group is scaling up and there are new node-create requests which cannot be satisfied because of
 	// out-of-resources errors we:
 	// - emit event
@@ -1183,25 +1187,21 @@ func (csr *ClusterStateRegistry) handleInstanceCreationErrorsForNodeGroup(
 				nodeGroup.Id(),
 				errorCode,
 				csr.buildErrorMessageEventString(currentUniqueErrorMessagesForErrorCode[errorCode]))
-
-			availableGPUTypes := csr.cloudProvider.GetAvailableGPUTypes()
-			gpuResource, gpuType := "", ""
-			nodeInfo, err := nodeGroup.TemplateNodeInfo()
-			if err != nil {
-				klog.Warningf("Failed to get template node info for a node group: %s", err)
-			} else {
-				gpuResource, gpuType = gpu.GetGpuInfoForMetrics(csr.cloudProvider.GetNodeGpuConfig(nodeInfo.Node()), availableGPUTypes, nodeInfo.Node(), nodeGroup)
-			}
 			// Decrease the scale up request by the number of deleted nodes
 			csr.registerOrUpdateScaleUpNoLock(nodeGroup, -len(unseenInstanceIds), currentTime)
-
-			csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(errorCode.code), cloudprovider.InstanceErrorInfo{
-				ErrorClass:   errorCode.class,
-				ErrorCode:    errorCode.code,
-				ErrorMessage: csr.buildErrorMessageEventString(currentUniqueErrorMessagesForErrorCode[errorCode]),
-			}, gpuResource, gpuType, currentTime)
+			failedScaleUps = append(failedScaleUps, ScaleUpFailure{
+				NodeGroup: nodeGroup,
+				Reason:    metrics.FailedScaleUpReason(errorCode.code),
+				ErrorInfo: cloudprovider.InstanceErrorInfo{
+					ErrorClass:   errorCode.class,
+					ErrorCode:    errorCode.code,
+					ErrorMessage: csr.buildErrorMessageEventString(currentUniqueErrorMessagesForErrorCode[errorCode]),
+				},
+				Time: currentTime,
+			})
 		}
 	}
+	return failedScaleUps
 }
 
 func (csr *ClusterStateRegistry) buildErrorMessageEventString(uniqErrorMessages []string) string {
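For completeness, a minimal wiring sketch under the same assumptions as the observer sketch above: only the new NewClusterStateRegistry signature comes from this diff; the zero-value observers list, its Register method, and the buildRegistry helper are hypothetical illustrations of how a caller would plug a metrics-emitting observer back in.

package example

import (
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
	"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
)

// buildRegistry is a hypothetical helper: it registers a metrics-emitting
// observer (the metricsObserver parameter, assumed to come from elsewhere
// in this commit) on a fan-out list and hands the list to the registry,
// replacing the registry's old private metrics field.
func buildRegistry(
	provider cloudprovider.CloudProvider,
	config clusterstate.ClusterStateRegistryConfig,
	logRecorder *utils.LogEventRecorder,
	backoff backoff.Backoff,
	configProcessor nodegroupconfig.NodeGroupConfigProcessor,
	asyncChecker asyncnodegroups.AsyncNodeGroupStateChecker,
	metricsObserver nodegroupchange.NodeGroupChangeObserver,
) *clusterstate.ClusterStateRegistry {
	// Every observer registered here is notified of each failed scale-up
	// collected during UpdateNodes.
	scaleStateNotifier := &nodegroupchange.NodeGroupChangeObserversList{}
	scaleStateNotifier.Register(metricsObserver)
	return clusterstate.NewClusterStateRegistry(
		provider, config, logRecorder, backoff,
		configProcessor, asyncChecker, scaleStateNotifier)
}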
