diff --git a/CHANGELOG.md b/CHANGELOG.md index 4933309d93..3f0abc19fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ * [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210 * [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217 * [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and mark block for deletion #7246 +* [ENHANCEMENT] Distributor: Add dimension `nhcb` to keep track of nhcb samples in `cortex_distributor_received_samples_total` and `cortex_distributor_samples_in_total` metrics. * [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 * [BUGFIX] Ruler: Add XFunctions validation support. 
#7111 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 1e65920c2b..a22ef02a2f 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/scrape" @@ -73,8 +74,9 @@ const ( // it was based on empirical observation: See BenchmarkMergeSlicesParallel mergeSlicesParallelism = 8 - sampleMetricTypeFloat = "float" - sampleMetricTypeHistogram = "histogram" + sampleMetricTypeFloat = "float" + sampleMetricTypeHistogram = "histogram" + sampleMetricTypeHistogramNHCB = "nhcb" // Native histogram with custom buckets schema ) // Distributor is a storage.SampleAppender and a client.Querier which @@ -523,10 +525,12 @@ func (d *Distributor) cleanupInactiveUser(userID string) { d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeFloat) d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeHistogram) + d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeHistogramNHCB) d.receivedExemplars.DeleteLabelValues(userID) d.receivedMetadata.DeleteLabelValues(userID) d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeFloat) d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeHistogram) + d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeHistogramNHCB) d.incomingExemplars.DeleteLabelValues(userID) d.incomingMetadata.DeleteLabelValues(userID) d.nonHASamples.DeleteLabelValues(userID) @@ -736,15 +740,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co numFloatSamples := 0 numHistogramSamples := 0 + numNHCBSamples := 0 numExemplars := 0 for _, ts := range req.Timeseries { numFloatSamples += len(ts.Samples) + nhcb := 
countNHCB(ts.Histograms) + numNHCBSamples += nhcb numHistogramSamples += len(ts.Histograms) numExemplars += len(ts.Exemplars) } // Count the total samples, exemplars in, prior to validation or deduplication, for comparison with other metrics. d.incomingSamples.WithLabelValues(userID, sampleMetricTypeFloat).Add(float64(numFloatSamples)) d.incomingSamples.WithLabelValues(userID, sampleMetricTypeHistogram).Add(float64(numHistogramSamples)) + d.incomingSamples.WithLabelValues(userID, sampleMetricTypeHistogramNHCB).Add(float64(numNHCBSamples)) d.incomingExemplars.WithLabelValues(userID).Add(float64(numExemplars)) // Count the total number of metadata in. d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata))) @@ -799,7 +807,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co } // A WriteRequest can only contain series or metadata but not both. This might change in the future. - seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica) + seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedNHCBSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica) if err != nil { return nil, err } @@ -807,6 +815,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co d.receivedSamples.WithLabelValues(userID, sampleMetricTypeFloat).Add(float64(validatedFloatSamples)) d.receivedSamples.WithLabelValues(userID, sampleMetricTypeHistogram).Add(float64(validatedHistogramSamples)) + d.receivedSamples.WithLabelValues(userID, sampleMetricTypeHistogramNHCB).Add(float64(validatedNHCBSamples)) d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars)) 
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata))) @@ -1014,7 +1023,18 @@ type samplesLabelSetEntry struct { labels labels.Labels } -func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []uint32, []cortexpb.PreallocTimeseries, []cortexpb.PreallocTimeseries, int, int, int, error, error) { +// countNHCB returns the number of native histograms with custom buckets schema in the given slice. +func countNHCB(histograms []cortexpb.Histogram) int { + n := 0 + for _, h := range histograms { + if histogram.IsCustomBucketsSchema(h.GetSchema()) { + n++ + } + } + return n +} + +func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []uint32, []cortexpb.PreallocTimeseries, []cortexpb.PreallocTimeseries, int, int, int, int, error, error) { pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeys") defer pSpan.Finish() @@ -1026,6 +1046,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write nhSeriesKeys := make([]uint32, 0, len(req.Timeseries)) validatedFloatSamples := 0 validatedHistogramSamples := 0 + validatedNHCBSamples := 0 validatedExemplars := 0 limitsPerLabelSet := d.limits.LimitsPerLabelSet(userID) @@ -1045,9 +1066,10 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write // For each timeseries, compute a hash to distribute across ingesters; // check each sample and discard if outside limits. 
skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation() - for _, ts := range req.Timeseries { + for i := range req.Timeseries { + ts := &req.Timeseries[i] if len(ts.Labels) == 0 { - return nil, nil, nil, nil, 0, 0, 0, nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", "empty labels found") + return nil, nil, nil, nil, 0, 0, 0, 0, nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", "empty labels found") } if limits.AcceptHASamples && limits.AcceptMixedHASamples { @@ -1138,9 +1160,9 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write // label and dropped labels (if any) key, err := d.tokenForLabels(userID, ts.Labels) if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, 0, nil, err } - validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation, limits) + validatedSeries, validationErr := d.validateSeries(*ts, userID, skipLabelNameValidation, limits) // Errors in validation are considered non-fatal, as one series in a request may contain // invalid data but all the remaining series could be perfectly valid. @@ -1160,6 +1182,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write // TODO: use pool. 
labelSetCounters = make(map[uint64]*samplesLabelSetEntry, len(matchedLabelSetLimits)) } + nhcb := countNHCB(ts.Histograms) for _, l := range matchedLabelSetLimits { if c, exists := labelSetCounters[l.Hash]; exists { c.floatSamples += int64(len(ts.Samples)) @@ -1182,6 +1205,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write } validatedFloatSamples += len(ts.Samples) validatedHistogramSamples += len(ts.Histograms) + validatedNHCBSamples += nhcb validatedExemplars += len(ts.Exemplars) } for h, counter := range labelSetCounters { @@ -1195,7 +1219,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write } } - return seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil + return seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedNHCBSamples, validatedExemplars, firstPartialErr, nil } func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index cd2d742b5a..c467c7ed37 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -296,6 +296,7 @@ func TestDistributor_Push(t *testing.T) { # TYPE cortex_distributor_received_samples_total counter cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0 cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 5 + cortex_distributor_received_samples_total{type="nhcb",user="userDistributorPush"} 0 `, }, "A push to 2 happy ingesters should succeed, histograms": { @@ -314,6 +315,7 @@ func TestDistributor_Push(t *testing.T) { # TYPE cortex_distributor_received_samples_total counter cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0 
cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 5 + cortex_distributor_received_samples_total{type="nhcb",user="userDistributorPush"} 0 `, }, "A push to 1 happy ingesters should fail, histograms": { @@ -331,6 +333,7 @@ func TestDistributor_Push(t *testing.T) { # TYPE cortex_distributor_received_samples_total counter cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0 cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 10 + cortex_distributor_received_samples_total{type="nhcb",user="userDistributorPush"} 0 `, }, "A push exceeding burst size should fail, histograms": { @@ -349,6 +352,7 @@ func TestDistributor_Push(t *testing.T) { # TYPE cortex_distributor_received_samples_total counter cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0 cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 25 + cortex_distributor_received_samples_total{type="nhcb",user="userDistributorPush"} 0 `, }, } { @@ -434,6 +438,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) { d.receivedSamples.WithLabelValues("userA", sampleMetricTypeFloat).Add(5) d.receivedSamples.WithLabelValues("userB", sampleMetricTypeFloat).Add(10) d.receivedSamples.WithLabelValues("userC", sampleMetricTypeHistogram).Add(15) + d.receivedSamples.WithLabelValues("userC", sampleMetricTypeHistogramNHCB).Add(0) d.receivedExemplars.WithLabelValues("userA").Add(5) d.receivedExemplars.WithLabelValues("userB").Add(10) d.receivedMetadata.WithLabelValues("userA").Add(5) @@ -492,6 +497,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) { cortex_distributor_received_samples_total{type="float",user="userA"} 5 cortex_distributor_received_samples_total{type="float",user="userB"} 10 cortex_distributor_received_samples_total{type="histogram",user="userC"} 15 + cortex_distributor_received_samples_total{type="nhcb",user="userC"} 0 # HELP 
cortex_distributor_received_exemplars_total The total number of received exemplars, excluding rejected and deduped exemplars. # TYPE cortex_distributor_received_exemplars_total counter @@ -550,6 +556,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) { # TYPE cortex_distributor_received_samples_total counter cortex_distributor_received_samples_total{type="float",user="userB"} 10 cortex_distributor_received_samples_total{type="histogram",user="userC"} 15 + cortex_distributor_received_samples_total{type="nhcb",user="userC"} 0 # HELP cortex_distributor_samples_in_total The total number of samples that have come in to the distributor, including rejected or deduped samples. # TYPE cortex_distributor_samples_in_total counter