Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [ENHANCEMENT] Ingester: Instrument Ingester CPU profile with userID for read APIs. #7184
* [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185
* [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210
* [ENHANCEMENT] Querier: Add active API requests tracker logging to help with OOMKill troubleshooting. #7216
* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217
* [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and mark block for deletion #7246
* [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ require (
github.com/axiomhq/hyperloglog v0.2.6
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3
github.com/cespare/xxhash/v2 v2.3.0
github.com/edsrzf/mmap-go v1.2.0
github.com/go-openapi/swag/jsonutils v0.25.1
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
Expand Down Expand Up @@ -148,7 +149,6 @@ require (
github.com/dgryski/go-metro v0.0.0-20250106013310-edb8663e5e33 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/edsrzf/mmap-go v1.2.0 // indirect
github.com/efficientgo/tools/extkingpin v0.0.0-20230505153745-6b7392939a60 // indirect
github.com/envoyproxy/go-control-plane/envoy v1.35.0 // indirect
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
Expand Down
51 changes: 39 additions & 12 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/request_tracker"
)

const (
Expand Down Expand Up @@ -285,35 +286,61 @@ func NewQuerierHandler(

queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)

requestTracker := request_tracker.NewRequestTracker(querierCfg.ActiveQueryTrackerDir, "apis.active", querierCfg.MaxConcurrent, util_log.GoKitLogToSlog(logger))
var apiHandler http.Handler
var instantQueryHandler http.Handler
var rangedQueryHandler http.Handler
var legacyAPIHandler http.Handler
if requestTracker != nil {
apiHandler = request_tracker.NewRequestWrapper(promRouter, requestTracker, &request_tracker.ApiExtractor{})
legacyAPIHandler = request_tracker.NewRequestWrapper(legacyPromRouter, requestTracker, &request_tracker.ApiExtractor{})
instantQueryHandler = request_tracker.NewRequestWrapper(queryAPI.Wrap(queryAPI.InstantQueryHandler), requestTracker, &request_tracker.InstantQueryExtractor{})
rangedQueryHandler = request_tracker.NewRequestWrapper(queryAPI.Wrap(queryAPI.RangeQueryHandler), requestTracker, &request_tracker.RangedQueryExtractor{})

httpHeaderMiddleware := &HTTPHeaderMiddleware{
TargetHeaders: cfg.HTTPRequestHeadersToLog,
RequestIdHeader: cfg.RequestIdHeader,
}
apiHandler = httpHeaderMiddleware.Wrap(apiHandler)
legacyAPIHandler = httpHeaderMiddleware.Wrap(legacyAPIHandler)
instantQueryHandler = httpHeaderMiddleware.Wrap(instantQueryHandler)
rangedQueryHandler = httpHeaderMiddleware.Wrap(rangedQueryHandler)
} else {
apiHandler = promRouter
legacyAPIHandler = legacyPromRouter
instantQueryHandler = queryAPI.Wrap(queryAPI.InstantQueryHandler)
rangedQueryHandler = queryAPI.Wrap(queryAPI.RangeQueryHandler)
}

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(instantQueryHandler)
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(rangedQueryHandler)
router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/format_query")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/parse_query")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/metadata")).Methods("GET").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(apiHandler)
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(apiHandler)
router.Path(path.Join(prefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(apiHandler)
router.Path(path.Join(prefix, "/api/v1/metadata")).Methods("GET").Handler(apiHandler)

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(instantQueryHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(rangedQueryHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/format_query")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/parse_query")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Methods("GET").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyAPIHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyAPIHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(legacyAPIHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Methods("GET").Handler(legacyAPIHandler)

if cfg.buildInfoEnabled {
router.Path(path.Join(prefix, "/api/v1/status/buildinfo")).Methods("GET").Handler(promRouter)
Expand Down
115 changes: 115 additions & 0 deletions pkg/util/request_tracker/request_extractor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package request_tracker

import (
"encoding/json"
"net/http"
"strings"
"time"
"unicode/utf8"

"github.com/cortexproject/cortex/pkg/util/requestmeta"
"github.com/cortexproject/cortex/pkg/util/users"
)

type Extractor interface {
Extract(r *http.Request) []byte
}

type ApiExtractor struct{}

type InstantQueryExtractor struct{}

type RangedQueryExtractor struct{}

func generateCommonMap(r *http.Request) map[string]any {
ctx := r.Context()
entryMap := make(map[string]any)
entryMap["timestampSec"] = time.Now().Unix()
entryMap["Path"] = r.URL.Path
entryMap["Method"] = r.Method
entryMap["TenantID"], _ = users.TenantID(ctx)
entryMap["RequestID"] = requestmeta.RequestIdFromContext(ctx)
entryMap["UserAgent"] = r.Header.Get("User-Agent")
entryMap["DashboardUID"] = r.Header.Get("X-Dashboard-UID")
entryMap["PanelId"] = r.Header.Get("X-Panel-Id")

return entryMap
}

func (e *ApiExtractor) Extract(r *http.Request) []byte {
entryMap := generateCommonMap(r)
entryMap["limit"] = r.URL.Query().Get("limit")
entryMap["start"] = r.URL.Query().Get("start")
entryMap["end"] = r.URL.Query().Get("end")

matches := r.URL.Query()["match[]"]
entryMap["numberOfMatches"] = len(matches)
matchesStr := strings.Join(matches, ",")

return generateJSONEntryWithTruncatedField(entryMap, "matches", matchesStr)
}

func (e *InstantQueryExtractor) Extract(r *http.Request) []byte {
entryMap := generateCommonMap(r)
entryMap["time"] = r.URL.Query().Get("time")
return generateJSONEntryWithTruncatedField(entryMap, "query", r.URL.Query().Get("query"))
}

func (e *RangedQueryExtractor) Extract(r *http.Request) []byte {
entryMap := generateCommonMap(r)
entryMap["start"] = r.URL.Query().Get("start")
entryMap["end"] = r.URL.Query().Get("end")
entryMap["step"] = r.URL.Query().Get("step")
return generateJSONEntryWithTruncatedField(entryMap, "query", r.URL.Query().Get("query"))
}

func generateJSONEntry(entryMap map[string]any) []byte {
jsonEntry, err := json.Marshal(entryMap)
if err != nil {
return []byte{}
}

return jsonEntry
}

func generateJSONEntryWithTruncatedField(entryMap map[string]any, fieldName, fieldValue string) []byte {
entryMap[fieldName] = ""
minEntryJSON := generateJSONEntry(entryMap)
entryMap[fieldName] = trimForJsonMarshal(fieldValue, maxEntrySize-(len(minEntryJSON)+1))
return generateJSONEntry(entryMap)
}

func trimStringByBytes(str string, size int) string {
bytesStr := []byte(str)
trimIndex := len(bytesStr)
if size < len(bytesStr) {
for !utf8.RuneStart(bytesStr[size]) {
size--
}
trimIndex = size
}

return string(bytesStr[:trimIndex])
}

func trimForJsonMarshal(field string, size int) string {
return trimForJsonMarshalRecursive(field, size, 0, size)
}

func trimForJsonMarshalRecursive(field string, size int, repeatCount int, repeatSize int) string {
//Should only repeat once since were over slightly over cutting based on the encoded size if we miss once
if repeatCount > 1 {
return ""
}

fieldTrimmed := trimStringByBytes(field, repeatSize)
fieldEncoded, err := json.Marshal(fieldTrimmed)
if err != nil {
return ""
}
if len(fieldEncoded) > size {
repeatSize = repeatSize - (len(fieldEncoded) - repeatSize)
return trimForJsonMarshalRecursive(fieldTrimmed, size, repeatCount+1, repeatSize)
}
return fieldTrimmed
}
90 changes: 90 additions & 0 deletions pkg/util/request_tracker/request_extractor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package request_tracker

import (
"encoding/json"
"net/http/httptest"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestGetSeriesExtractor(t *testing.T) {
extractor := &ApiExtractor{}
req := httptest.NewRequest("GET", "/api/v1/series", nil)
q := req.URL.Query()
q.Add("limit", "100")
q.Add("match[]", "up")
q.Add("match[]", "down")
req.URL.RawQuery = q.Encode()

result := extractor.Extract(req)
require.NotEmpty(t, result)

var data map[string]any
require.NoError(t, json.Unmarshal(result, &data))

assert.Equal(t, "100", data["limit"])
assert.Equal(t, float64(2), data["numberOfMatches"])
assert.Contains(t, data["matches"], "up")
}

func TestInstantQueryExtractor(t *testing.T) {
extractor := &InstantQueryExtractor{}
req := httptest.NewRequest("GET", "/api/v1/query", nil)
q := req.URL.Query()
q.Add("query", "up{job=\"prometheus\"}")
q.Add("time", "1234567890")
req.URL.RawQuery = q.Encode()

result := extractor.Extract(req)
require.NotEmpty(t, result)

var data map[string]any
require.NoError(t, json.Unmarshal(result, &data))

assert.Equal(t, "1234567890", data["time"])
assert.Equal(t, "up{job=\"prometheus\"}", data["query"])
}

func TestRangedQueryExtractor(t *testing.T) {
extractor := &RangedQueryExtractor{}
req := httptest.NewRequest("GET", "/api/v1/query_range", nil)
q := req.URL.Query()
q.Add("query", "rate(http_requests_total[5m])")
q.Add("start", "1000")
q.Add("end", "2000")
q.Add("step", "15")
req.URL.RawQuery = q.Encode()

result := extractor.Extract(req)
require.NotEmpty(t, result)

var data map[string]any
require.NoError(t, json.Unmarshal(result, &data))

assert.Equal(t, "1000", data["start"])
assert.Equal(t, "2000", data["end"])
assert.Equal(t, "15", data["step"])
assert.Equal(t, "rate(http_requests_total[5m])", data["query"])
}

func TestLongQueryTruncate(t *testing.T) {
longQuery := strings.Repeat("metric_name{label=\"value\"} or ", maxEntrySize*2) + "final_metric"
req := httptest.NewRequest("GET", "/api/v1/query", nil)
q := req.URL.Query()
q.Add("query", longQuery)
q.Add("time", "1234567890")
req.URL.RawQuery = q.Encode()

extractor := &InstantQueryExtractor{}
extractedData := extractor.Extract(req)

require.NotEmpty(t, extractedData)
assert.True(t, len(extractedData) > 0)
assert.LessOrEqual(t, len(extractedData), maxEntrySize)
assert.Contains(t, string(extractedData), "metric_name")
assert.Contains(t, string(extractedData), "1234567890")
assert.NotContains(t, string(extractedData), "final_metric")
}
Loading
Loading