From ebf762e5e4decde9e893f0095da871ae5c8fd239 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 5 Feb 2026 23:20:39 -0800 Subject: [PATCH 1/2] improve user index metrics and logging Signed-off-by: yeya24 --- pkg/alertmanager/multitenant.go | 4 +++- pkg/compactor/compactor.go | 2 ++ pkg/ruler/ruler.go | 4 +++- pkg/util/users/index_updater.go | 18 +++++++++--------- pkg/util/users/scanner.go | 2 +- 5 files changed, 18 insertions(+), 12 deletions(-) diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index 8bb3f0d75d4..094f05a4f90 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -704,7 +704,7 @@ func (am *MultitenantAlertmanager) userIndexUpdateLoop(ctx context.Context) { // Hardcode ID to check which alertmanager owns updating user index. userID := users.UserIndexCompressedFilename // Align with clean up interval. - ticker := time.NewTicker(util.DurationWithJitter(am.userIndexUpdater.GetCleanUpInterval(), 0.1)) + ticker := time.NewTicker(util.DurationWithJitter(am.userIndexUpdater.GetUpdateInterval(), 0.1)) defer ticker.Stop() for { @@ -717,11 +717,13 @@ func (am *MultitenantAlertmanager) userIndexUpdateLoop(ctx context.Context) { if !owned { continue } + start := time.Now() if err := am.userIndexUpdater.UpdateUserIndex(ctx); err != nil { level.Error(am.logger).Log("msg", "failed to update user index", "err", err) // Wait for next interval. Worst case, the user index scanner will fallback to list strategy. continue } + level.Info(am.logger).Log("msg", "successfully updated user index", "duration_ms", time.Since(start).Milliseconds()) } } } diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 4251fe49174..85a847b0a96 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -1231,11 +1231,13 @@ func (c *Compactor) userIndexUpdateLoop(ctx context.Context) { if !owned { continue } + start := time.Now() if err := c.userIndexUpdater.UpdateUserIndex(ctx); err != nil { level.Error(c.logger).Log("msg", "failed to update user index", "err", err) // Wait for next interval. Worst case, the user index scanner will fallback to list strategy. continue } + level.Info(c.logger).Log("msg", "successfully updated user index", "duration_ms", time.Since(start).Milliseconds()) } } } diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index a965b33f39d..3f5730af937 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -738,7 +738,7 @@ func (r *Ruler) userIndexUpdateLoop(ctx context.Context) { // Hardcode ID to check which ruler owns updating user index. userID := users.UserIndexCompressedFilename // Align with clean up interval. - ticker := time.NewTicker(util.DurationWithJitter(r.store.GetUserIndexUpdater().GetCleanUpInterval(), 0.1)) + ticker := time.NewTicker(util.DurationWithJitter(r.store.GetUserIndexUpdater().GetUpdateInterval(), 0.1)) defer ticker.Stop() for { @@ -755,11 +755,13 @@ func (r *Ruler) userIndexUpdateLoop(ctx context.Context) { if !owned { continue } + start := time.Now() if err := r.userIndexUpdater.UpdateUserIndex(ctx); err != nil { level.Error(r.logger).Log("msg", "failed to update user index", "err", err) // Wait for next interval. Worst case, the user index scanner will fallback to list strategy. continue } + level.Info(r.logger).Log("msg", "successfully updated user index", "duration_ms", time.Since(start).Milliseconds()) } } } diff --git a/pkg/util/users/index_updater.go b/pkg/util/users/index_updater.go index bac9a89036a..2dc11935c7f 100644 --- a/pkg/util/users/index_updater.go +++ b/pkg/util/users/index_updater.go @@ -10,18 +10,18 @@ import ( ) type UserIndexUpdater struct { - bkt objstore.InstrumentedBucket - cleanupInterval time.Duration - scanner Scanner + bkt objstore.InstrumentedBucket + updateInterval time.Duration + scanner Scanner userIndexLastUpdated prometheus.Gauge } -func NewUserIndexUpdater(bkt objstore.InstrumentedBucket, cleanupInterval time.Duration, scanner Scanner, reg prometheus.Registerer) *UserIndexUpdater { +func NewUserIndexUpdater(bkt objstore.InstrumentedBucket, updateInterval time.Duration, scanner Scanner, reg prometheus.Registerer) *UserIndexUpdater { return &UserIndexUpdater{ - bkt: bkt, - cleanupInterval: cleanupInterval, - scanner: scanner, + bkt: bkt, + updateInterval: updateInterval, + scanner: scanner, userIndexLastUpdated: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ Name: "cortex_user_index_last_successful_update_timestamp_seconds", Help: "Timestamp of the last successful update of user index.", @@ -29,8 +29,8 @@ func NewUserIndexUpdater(bkt objstore.InstrumentedBucket, cleanupInterval time.D } } -func (u *UserIndexUpdater) GetCleanUpInterval() time.Duration { - return u.cleanupInterval +func (u *UserIndexUpdater) GetUpdateInterval() time.Duration { + return u.updateInterval } func (u *UserIndexUpdater) UpdateUserIndex(ctx context.Context) error { diff --git a/pkg/util/users/scanner.go b/pkg/util/users/scanner.go index 7ea45f8999c..3b5bf21ea03 100644 --- a/pkg/util/users/scanner.go +++ b/pkg/util/users/scanner.go @@ -155,7 +155,7 @@ func (s *userIndexScanner) ScanUsers(ctx context.Context) ([]string, []string, [ if errors.Is(err, ErrIndexNotFound) { level.Info(s.logger).Log("msg", "user index not found, fallback to base scanner") s.fallbackScans.WithLabelValues("not-found").Inc() - } else { + } else if !errors.Is(err, context.Canceled) { // Always fallback to the list scanner if failed to read the user index. level.Error(s.logger).Log("msg", "failed to read user index, fallback to base scanner", "error", err) s.fallbackScans.WithLabelValues("corrupted").Inc() From a42b70cae690553f4c8cceb0b3d5a966a8421fdd Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 6 Feb 2026 09:45:25 -0800 Subject: [PATCH 2/2] update ignored logs Signed-off-by: yeya24 --- pkg/compactor/compactor_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index ecb0b982a6a..57a4e064c02 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1505,6 +1505,7 @@ func removeIgnoredLogs(input []string) []string { `level=error component=compactor msg="failed to set state to LEAVING" ring=compactor err="changing instance state from LEAVING -> LEAVING is disallowed"`: {}, `level=error component=compactor msg="failed to set state to LEAVING" ring=compactor err="changing instance state from JOINING -> LEAVING is disallowed"`: {}, `level=info component=compactor msg="user index not found, fallback to base scanner"`: {}, + `level=info component=compactor msg="successfully updated user index"`: {}, `level=error component=compactor msg="context timeout, exit user index update loop" err="context canceled"`: {}, `level=debug component=compactor msg="unregistering instance from ring" ring=compactor`: {}, `level=info component=compactor msg="instance removed from the KV store" ring=compactor`: {},