Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
* [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210
* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217
* [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and mark block for deletion #7246
* [ENHANCEMENT] Distributor: Add dimension `nhcb` to keep track of nhcb samples in `cortex_distributor_received_samples_total` and `cortex_distributor_samples_in_total` metrics.
* [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
Expand Down
42 changes: 33 additions & 9 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/scrape"
Expand Down Expand Up @@ -73,8 +74,9 @@ const (
// it was based on empirical observation: See BenchmarkMergeSlicesParallel
mergeSlicesParallelism = 8

sampleMetricTypeFloat = "float"
sampleMetricTypeHistogram = "histogram"
sampleMetricTypeFloat = "float"
sampleMetricTypeHistogram = "histogram"
sampleMetricTypeHistogramNHCB = "nhcb" // Native histogram with custom buckets schema
)

// Distributor is a storage.SampleAppender and a client.Querier which
Expand Down Expand Up @@ -523,10 +525,12 @@ func (d *Distributor) cleanupInactiveUser(userID string) {

d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeFloat)
d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeHistogram)
d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeHistogramNHCB)
d.receivedExemplars.DeleteLabelValues(userID)
d.receivedMetadata.DeleteLabelValues(userID)
d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeFloat)
d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeHistogram)
d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeHistogramNHCB)
d.incomingExemplars.DeleteLabelValues(userID)
d.incomingMetadata.DeleteLabelValues(userID)
d.nonHASamples.DeleteLabelValues(userID)
Expand Down Expand Up @@ -736,15 +740,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co

numFloatSamples := 0
numHistogramSamples := 0
numNHCBSamples := 0
numExemplars := 0
for _, ts := range req.Timeseries {
numFloatSamples += len(ts.Samples)
nhcb := countNHCB(ts.Histograms)
numNHCBSamples += nhcb
numHistogramSamples += len(ts.Histograms)
numExemplars += len(ts.Exemplars)
}
// Count the total samples, exemplars in, prior to validation or deduplication, for comparison with other metrics.
d.incomingSamples.WithLabelValues(userID, sampleMetricTypeFloat).Add(float64(numFloatSamples))
d.incomingSamples.WithLabelValues(userID, sampleMetricTypeHistogram).Add(float64(numHistogramSamples))
d.incomingSamples.WithLabelValues(userID, sampleMetricTypeHistogramNHCB).Add(float64(numNHCBSamples))
d.incomingExemplars.WithLabelValues(userID).Add(float64(numExemplars))
// Count the total number of metadata in.
d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))
Expand Down Expand Up @@ -799,14 +807,15 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}

// A WriteRequest can only contain series or metadata but not both. This might change in the future.
seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica)
seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedNHCBSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica)
if err != nil {
return nil, err
}
metadataKeys, validatedMetadata, firstPartialErr := d.prepareMetadataKeys(req, limits, userID, firstPartialErr)

d.receivedSamples.WithLabelValues(userID, sampleMetricTypeFloat).Add(float64(validatedFloatSamples))
d.receivedSamples.WithLabelValues(userID, sampleMetricTypeHistogram).Add(float64(validatedHistogramSamples))
d.receivedSamples.WithLabelValues(userID, sampleMetricTypeHistogramNHCB).Add(float64(validatedNHCBSamples))
d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars))
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))

Expand Down Expand Up @@ -1014,7 +1023,18 @@ type samplesLabelSetEntry struct {
labels labels.Labels
}

func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []uint32, []cortexpb.PreallocTimeseries, []cortexpb.PreallocTimeseries, int, int, int, error, error) {
// countNHCB reports how many entries in histograms use the custom buckets
// schema (NHCB), as determined by histogram.IsCustomBucketsSchema applied to
// each entry's schema. A nil or empty slice yields 0.
func countNHCB(histograms []cortexpb.Histogram) int {
	total := 0
	for i := range histograms {
		// Skip anything that is not a custom-buckets histogram.
		if !histogram.IsCustomBucketsSchema(histograms[i].GetSchema()) {
			continue
		}
		total++
	}
	return total
}

func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []uint32, []cortexpb.PreallocTimeseries, []cortexpb.PreallocTimeseries, int, int, int, int, error, error) {
pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeys")
defer pSpan.Finish()

Expand All @@ -1026,6 +1046,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
nhSeriesKeys := make([]uint32, 0, len(req.Timeseries))
validatedFloatSamples := 0
validatedHistogramSamples := 0
validatedNHCBSamples := 0
validatedExemplars := 0
limitsPerLabelSet := d.limits.LimitsPerLabelSet(userID)

Expand All @@ -1045,9 +1066,10 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
// For each timeseries, compute a hash to distribute across ingesters;
// check each sample and discard if outside limits.
skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
for _, ts := range req.Timeseries {
for i := range req.Timeseries {
ts := &req.Timeseries[i]
if len(ts.Labels) == 0 {
return nil, nil, nil, nil, 0, 0, 0, nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", "empty labels found")
return nil, nil, nil, nil, 0, 0, 0, 0, nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", "empty labels found")
}

if limits.AcceptHASamples && limits.AcceptMixedHASamples {
Expand Down Expand Up @@ -1138,9 +1160,9 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
// label and dropped labels (if any)
key, err := d.tokenForLabels(userID, ts.Labels)
if err != nil {
return nil, nil, nil, nil, 0, 0, 0, nil, err
return nil, nil, nil, nil, 0, 0, 0, 0, nil, err
}
validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation, limits)
validatedSeries, validationErr := d.validateSeries(*ts, userID, skipLabelNameValidation, limits)

// Errors in validation are considered non-fatal, as one series in a request may contain
// invalid data but all the remaining series could be perfectly valid.
Expand All @@ -1160,6 +1182,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
// TODO: use pool.
labelSetCounters = make(map[uint64]*samplesLabelSetEntry, len(matchedLabelSetLimits))
}
nhcb := countNHCB(ts.Histograms)
for _, l := range matchedLabelSetLimits {
if c, exists := labelSetCounters[l.Hash]; exists {
c.floatSamples += int64(len(ts.Samples))
Expand All @@ -1182,6 +1205,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
}
validatedFloatSamples += len(ts.Samples)
validatedHistogramSamples += len(ts.Histograms)
validatedNHCBSamples += nhcb
validatedExemplars += len(ts.Exemplars)
}
for h, counter := range labelSetCounters {
Expand All @@ -1195,7 +1219,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
}
}

return seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil
return seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedNHCBSamples, validatedExemplars, firstPartialErr, nil
}

func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func TestDistributor_Push(t *testing.T) {
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0
cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 5
cortex_distributor_received_samples_total{type="nhcb",user="userDistributorPush"} 0
`,
},
"A push to 2 happy ingesters should succeed, histograms": {
Expand All @@ -314,6 +315,7 @@ func TestDistributor_Push(t *testing.T) {
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0
cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 5
cortex_distributor_received_samples_total{type="nhcb",user="userDistributorPush"} 0
`,
},
"A push to 1 happy ingesters should fail, histograms": {
Expand All @@ -331,6 +333,7 @@ func TestDistributor_Push(t *testing.T) {
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0
cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 10
cortex_distributor_received_samples_total{type="nhcb",user="userDistributorPush"} 0
`,
},
"A push exceeding burst size should fail, histograms": {
Expand All @@ -349,6 +352,7 @@ func TestDistributor_Push(t *testing.T) {
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0
cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 25
cortex_distributor_received_samples_total{type="nhcb",user="userDistributorPush"} 0
`,
},
} {
Expand Down Expand Up @@ -434,6 +438,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
d.receivedSamples.WithLabelValues("userA", sampleMetricTypeFloat).Add(5)
d.receivedSamples.WithLabelValues("userB", sampleMetricTypeFloat).Add(10)
d.receivedSamples.WithLabelValues("userC", sampleMetricTypeHistogram).Add(15)
d.receivedSamples.WithLabelValues("userC", sampleMetricTypeHistogramNHCB).Add(0)
d.receivedExemplars.WithLabelValues("userA").Add(5)
d.receivedExemplars.WithLabelValues("userB").Add(10)
d.receivedMetadata.WithLabelValues("userA").Add(5)
Expand Down Expand Up @@ -492,6 +497,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
cortex_distributor_received_samples_total{type="float",user="userA"} 5
cortex_distributor_received_samples_total{type="float",user="userB"} 10
cortex_distributor_received_samples_total{type="histogram",user="userC"} 15
cortex_distributor_received_samples_total{type="nhcb",user="userC"} 0

# HELP cortex_distributor_received_exemplars_total The total number of received exemplars, excluding rejected and deduped exemplars.
# TYPE cortex_distributor_received_exemplars_total counter
Expand Down Expand Up @@ -550,6 +556,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{type="float",user="userB"} 10
cortex_distributor_received_samples_total{type="histogram",user="userC"} 15
cortex_distributor_received_samples_total{type="nhcb",user="userC"} 0

# HELP cortex_distributor_samples_in_total The total number of samples that have come in to the distributor, including rejected or deduped samples.
# TYPE cortex_distributor_samples_in_total counter
Expand Down
Loading