diff --git a/model.go b/model.go
index 387a322..fbba85f 100644
--- a/model.go
+++ b/model.go
@@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"io"
+ "time"
)
const SchemaPayload string = `{
@@ -573,3 +574,328 @@ func createAlertResponse(id string, state string, created string, datasets []str
}
]`, created, id, state, string(datasetsJSON))
}
+
+// --- New payload models for expanded test coverage ---
+
+// Dashboard payloads
+func getDashboardCreateBody() string {
+ return `{
+ "title": "Quest Test Dashboard",
+ "description": "Dashboard created by quest integration test",
+ "tags": ["quest-test", "smoke"],
+ "tiles": []
+ }`
+}
+
+func getDashboardUpdateBody() string {
+ return `{
+ "title": "Quest Test Dashboard Updated",
+ "description": "Dashboard updated by quest integration test",
+ "tags": ["quest-test", "smoke", "updated"]
+ }`
+}
+
+func getDashboardAddTileBody(stream string) string {
+ return fmt.Sprintf(`{
+ "name": "Log Count Tile",
+ "description": "Shows total log count",
+ "query": "SELECT COUNT(*) as count FROM %s",
+ "visualization": "table",
+ "order": 1
+ }`, stream)
+}
+
+type DashboardResponse struct {
+ Id string `json:"id"`
+ Title string `json:"title"`
+ Description string `json:"description"`
+ Tags []string `json:"tags"`
+}
+
+func getIdFromDashboardResponse(body io.Reader) string {
+ var response DashboardResponse
+ if err := json.NewDecoder(body).Decode(&response); err != nil {
+ fmt.Printf("Error decoding dashboard: %v\n", err)
+ }
+ return response.Id
+}
+
+// Filter payloads
+func getFilterCreateBody(stream string) string {
+ return fmt.Sprintf(`{
+ "stream_name": "%s",
+ "filter_name": "Quest Test Filter",
+ "query": {
+ "filter_type": "sql",
+ "filter_query": "SELECT * FROM %s WHERE level = 'error'"
+ },
+ "tags": ["quest-test"]
+ }`, stream, stream)
+}
+
+func getFilterUpdateBody(stream string) string {
+ return fmt.Sprintf(`{
+ "stream_name": "%s",
+ "filter_name": "Quest Test Filter Updated",
+ "query": {
+ "filter_type": "sql",
+ "filter_query": "SELECT * FROM %s WHERE level = 'warn'"
+ },
+ "tags": ["quest-test", "updated"]
+ }`, stream, stream)
+}
+
+type FilterResponse struct {
+ FilterId string `json:"filter_id"`
+ FilterName string `json:"filter_name"`
+}
+
+func getIdFromFilterResponse(body io.Reader) string {
+ var response FilterResponse
+ if err := json.NewDecoder(body).Decode(&response); err != nil {
+ fmt.Printf("Error decoding filter: %v\n", err)
+ }
+ return response.FilterId
+}
+
+// OTel log payload
+func getOTelLogPayload() string {
+ now := time.Now().UTC().Format(time.RFC3339Nano)
+ return fmt.Sprintf(`{
+ "resourceLogs": [
+ {
+ "resource": {
+ "attributes": [
+ {"key": "service.name", "value": {"stringValue": "quest-test-service"}},
+ {"key": "host.name", "value": {"stringValue": "quest-test-host"}}
+ ]
+ },
+ "scopeLogs": [
+ {
+ "scope": {"name": "quest-test"},
+ "logRecords": [
+ {
+ "timeUnixNano": "%s",
+ "severityNumber": 9,
+ "severityText": "INFO",
+ "body": {"stringValue": "Quest OTel test log message"},
+ "attributes": [
+ {"key": "log.type", "value": {"stringValue": "quest-otel-test"}}
+ ],
+ "traceId": "abcdef1234567890abcdef1234567890",
+ "spanId": "abcdef1234567890"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }`, now)
+}
+
+// OTel trace payload
+func getOTelTracePayload() string {
+ now := time.Now().UTC().Format(time.RFC3339Nano)
+ return fmt.Sprintf(`{
+ "resourceSpans": [
+ {
+ "resource": {
+ "attributes": [
+ {"key": "service.name", "value": {"stringValue": "quest-test-service"}},
+ {"key": "host.name", "value": {"stringValue": "quest-test-host"}}
+ ]
+ },
+ "scopeSpans": [
+ {
+ "scope": {"name": "quest-test"},
+ "spans": [
+ {
+ "traceId": "abcdef1234567890abcdef1234567890",
+ "spanId": "abcdef1234567890",
+ "name": "quest-test-span",
+ "kind": 1,
+ "startTimeUnixNano": "%s",
+ "endTimeUnixNano": "%s",
+ "status": {"code": 1},
+ "attributes": [
+ {"key": "http.method", "value": {"stringValue": "GET"}},
+ {"key": "http.status_code", "value": {"intValue": "200"}}
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }`, now, now)
+}
+
+// OTel metric payload
+func getOTelMetricPayload() string {
+ now := time.Now().UTC().Format(time.RFC3339Nano)
+ return fmt.Sprintf(`{
+ "resourceMetrics": [
+ {
+ "resource": {
+ "attributes": [
+ {"key": "service.name", "value": {"stringValue": "quest-test-service"}},
+ {"key": "host.name", "value": {"stringValue": "quest-test-host"}}
+ ]
+ },
+ "scopeMetrics": [
+ {
+ "scope": {"name": "quest-test"},
+ "metrics": [
+ {
+ "name": "quest.test.counter",
+ "unit": "1",
+ "sum": {
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "%s",
+ "timeUnixNano": "%s",
+ "asInt": "42",
+ "attributes": [
+ {"key": "env", "value": {"stringValue": "test"}}
+ ]
+ }
+ ],
+ "aggregationTemporality": 2,
+ "isMonotonic": true
+ }
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }`, now, now)
+}
+
+// Alert modification payloads
+func getAlertModifyBody(stream string, targetId string) string {
+ return fmt.Sprintf(`{
+ "severity": "high",
+ "title": "AlertTitle Modified",
+ "query": "select count(level) from %s where level = 'error'",
+ "alertType": "threshold",
+ "thresholdConfig": {
+ "operator": ">=",
+ "value": 50
+ },
+ "evalConfig": {
+ "rollingWindow": {
+ "evalStart": "10m",
+ "evalEnd": "now",
+ "evalFrequency": 2
+ }
+ },
+ "notificationConfig": {
+ "interval": 5
+ },
+ "targets": ["%s"],
+ "tags": ["quest-test", "modified"]
+ }`, stream, targetId)
+}
+
+// Hot tier payloads
+func getHotTierBody() string {
+ return `{
+ "size": "10GiB"
+ }`
+}
+
+// Dataset stats payload
+func getDatasetStatsBody(streams []string) string {
+ streamsJSON, _ := json.Marshal(streams)
+ return fmt.Sprintf(`{
+ "streams": %s
+ }`, string(streamsJSON))
+}
+
+// Prism datasets query payload
+func getPrismDatasetsBody(stream string) string {
+ now := time.Now().UTC()
+ startTime := now.Add(-30 * time.Minute).Format(time.RFC3339Nano)
+ endTime := now.Add(time.Second).Format(time.RFC3339Nano)
+ return fmt.Sprintf(`{
+ "query": "SELECT * FROM %s LIMIT 10",
+ "startTime": "%s",
+ "endTime": "%s"
+ }`, stream, startTime, endTime)
+}
+
+// Target update payload
+func getTargetUpdateBody() string {
+ return `{
+ "name": "targetNameUpdated",
+ "type": "webhook",
+ "endpoint": "https://webhook.site/ec627445-d52b-44e9-948d-56671df3581e",
+ "headers": {"X-Custom": "quest-test"},
+ "skipTlsCheck": true
+ }`
+}
+
+// RBAC add/remove role payloads
+func getRoleAddBody(roleName string) string {
+ return fmt.Sprintf(`["%s"]`, roleName)
+}
+
// getMultiPrivilegeRoleBody returns a JSON role definition granting writer
// privilege on one stream and reader privilege on another.
func getMultiPrivilegeRoleBody(writerStream, readerStream string) string {
	const tmpl = `[{"privilege": "writer", "resource": {"stream": "%s"}}, {"privilege": "reader", "resource": {"stream": "%s"}}]`
	return fmt.Sprintf(tmpl, writerStream, readerStream)
}
+
+// NewSampleJsonWithFields merges base SampleJson fields with extra fields.
+func NewSampleJsonWithFields(extraFields map[string]interface{}) string {
+ var base map[string]interface{}
+ _ = json.Unmarshal([]byte(SampleJson), &base)
+ for k, v := range extraFields {
+ base[k] = v
+ }
+ out, _ := json.Marshal(base)
+ return string(out)
+}
+
+// NewSampleJsonBatch generates a JSON array of count events, each with a unique ID.
+func NewSampleJsonBatch(count int) string {
+ var base map[string]interface{}
+ _ = json.Unmarshal([]byte(SampleJson), &base)
+
+ events := make([]map[string]interface{}, count)
+ for i := 0; i < count; i++ {
+ evt := make(map[string]interface{}, len(base)+1)
+ for k, v := range base {
+ evt[k] = v
+ }
+ evt["batch_id"] = i
+ evt["p_timestamp"] = time.Now().UTC().Format(time.RFC3339Nano)
+ events[i] = evt
+ }
+ out, _ := json.Marshal(events)
+ return string(out)
+}
+
+func getDynamicSchemaEvent() string {
+ return `{
+ "source_time": "2024-10-27T05:13:26.742Z",
+ "level": "info",
+ "message": "Event with extra field",
+ "version": "1.2.0",
+ "user_id": 42,
+ "device_id": 100,
+ "session_id": "sess123",
+ "os": "Linux",
+ "host": "192.168.1.1",
+ "uuid": "test-uuid-001",
+ "location": "us-east-1",
+ "timezone": "UTC",
+ "user_agent": "TestAgent",
+ "runtime": "go",
+ "request_body": "test body",
+ "status_code": 200,
+ "response_time": 50,
+ "process_id": 999,
+ "app_meta": "test-meta",
+ "extra_field": "this is a new field not in original schema"
+ }`
+}
diff --git a/quest_test.go b/quest_test.go
index 8ebf253..0161896 100644
--- a/quest_test.go
+++ b/quest_test.go
@@ -36,7 +36,9 @@ const (
)
func TestSmokeListLogStream(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
+ rt.TrackStream(NewGlob.Stream)
req, err := NewGlob.QueryClient.NewRequest("GET", "logstream", nil)
require.NoErrorf(t, err, "Request failed: %s", err)
@@ -46,38 +48,34 @@ func TestSmokeListLogStream(t *testing.T) {
body := readAsString(response.Body)
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status)
res, err := readJsonBody[[]string](bytes.NewBufferString(body))
- if err != nil {
+ if err == nil {
for _, stream := range res {
if stream == NewGlob.Stream {
- DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
+ // Stream found in list, tracked for cleanup
+ _ = stream
}
}
}
- DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}
func TestSmokeCreateStream(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
- DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
+ rt.TrackStream(NewGlob.Stream)
}
func TestSmokeDetectSchema(t *testing.T) {
+ t.Parallel()
DetectSchema(t, NewGlob.QueryClient, SampleJson, SchemaBody)
}
func TestSmokeIngestEventsToStream(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
- if NewGlob.IngestorUrl.String() == "" {
- RunFlog(t, NewGlob.QueryClient, NewGlob.Stream)
- } else {
- RunFlog(t, NewGlob.IngestorClient, NewGlob.Stream)
- }
- // Calling Sleep method
- time.Sleep(120 * time.Second)
-
- QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 50)
+ rt.TrackStream(NewGlob.Stream)
+ RunFlogAuto(t, NewGlob.Stream)
+ WaitForIngest(t, NewGlob.QueryClient, NewGlob.Stream, 50, 180*time.Second)
AssertStreamSchema(t, NewGlob.QueryClient, NewGlob.Stream, FlogJsonSchema)
- DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}
// func TestTimePartition_TimeStampMismatch(t *testing.T) {
@@ -117,22 +115,25 @@ func TestSmokeIngestEventsToStream(t *testing.T) {
// }
func TestLoadStream_StaticSchema_EventWithSameFields(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
staticSchemaStream := NewGlob.Stream + "staticschema"
staticSchemaFlagHeader := map[string]string{"X-P-Static-Schema-Flag": "true"}
CreateStreamWithSchemaBody(t, NewGlob.QueryClient, staticSchemaStream, staticSchemaFlagHeader, SchemaPayload)
+ rt.TrackStream(staticSchemaStream)
if NewGlob.IngestorUrl.String() == "" {
IngestOneEventForStaticSchemaStream_SameFieldsInLog(t, NewGlob.QueryClient, staticSchemaStream)
} else {
IngestOneEventForStaticSchemaStream_SameFieldsInLog(t, NewGlob.IngestorClient, staticSchemaStream)
}
- DeleteStream(t, NewGlob.QueryClient, staticSchemaStream)
}
func TestLoadStreamBatchWithK6_StaticSchema(t *testing.T) {
if NewGlob.Mode == "load" {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
staticSchemaStream := NewGlob.Stream + "staticschema"
staticSchemaFlagHeader := map[string]string{"X-P-Static-Schema-Flag": "true"}
CreateStreamWithSchemaBody(t, NewGlob.QueryClient, staticSchemaStream, staticSchemaFlagHeader, SchemaPayload)
+ rt.TrackStream(staticSchemaStream)
if NewGlob.IngestorUrl.String() == "" {
cmd := exec.Command("k6",
"run",
@@ -172,28 +173,28 @@ func TestLoadStreamBatchWithK6_StaticSchema(t *testing.T) {
}
t.Log(string(op))
}
-
- DeleteStream(t, NewGlob.QueryClient, staticSchemaStream)
}
}
func TestLoadStream_StaticSchema_EventWithNewField(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
staticSchemaStream := NewGlob.Stream + "staticschema"
staticSchemaFlagHeader := map[string]string{"X-P-Static-Schema-Flag": "true"}
CreateStreamWithSchemaBody(t, NewGlob.QueryClient, staticSchemaStream, staticSchemaFlagHeader, SchemaPayload)
+ rt.TrackStream(staticSchemaStream)
if NewGlob.IngestorUrl.String() == "" {
IngestOneEventForStaticSchemaStream_NewFieldInLog(t, NewGlob.QueryClient, staticSchemaStream)
} else {
IngestOneEventForStaticSchemaStream_NewFieldInLog(t, NewGlob.IngestorClient, staticSchemaStream)
}
- DeleteStream(t, NewGlob.QueryClient, staticSchemaStream)
}
func TestCreateStream_WithCustomPartition_Success(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
customPartitionStream := NewGlob.Stream + "custompartition"
customHeader := map[string]string{"X-P-Custom-Partition": "level"}
CreateStreamWithHeader(t, NewGlob.QueryClient, customPartitionStream, customHeader)
- DeleteStream(t, NewGlob.QueryClient, customPartitionStream)
+ rt.TrackStream(customPartitionStream)
}
func TestCreateStream_WithCustomPartition_Error(t *testing.T) {
@@ -203,34 +204,26 @@ func TestCreateStream_WithCustomPartition_Error(t *testing.T) {
}
func TestSmokeQueryTwoStreams(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
stream1 := NewGlob.Stream + "1"
stream2 := NewGlob.Stream + "2"
CreateStream(t, NewGlob.QueryClient, stream1)
+ rt.TrackStream(stream1)
CreateStream(t, NewGlob.QueryClient, stream2)
- if NewGlob.IngestorUrl.String() == "" {
- RunFlog(t, NewGlob.QueryClient, stream1)
- RunFlog(t, NewGlob.QueryClient, stream2)
- } else {
- RunFlog(t, NewGlob.IngestorClient, stream1)
- RunFlog(t, NewGlob.IngestorClient, stream2)
-
- }
- time.Sleep(120 * time.Second)
+ rt.TrackStream(stream2)
+ RunFlogAuto(t, stream1)
+ RunFlogAuto(t, stream2)
+ WaitForIngest(t, NewGlob.QueryClient, stream1, 1, 180*time.Second)
+ WaitForIngest(t, NewGlob.QueryClient, stream2, 1, 180*time.Second)
QueryTwoLogStreamCount(t, NewGlob.QueryClient, stream1, stream2, 100)
- DeleteStream(t, NewGlob.QueryClient, stream1)
- DeleteStream(t, NewGlob.QueryClient, stream2)
}
func TestSmokeRunQueries(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
- if NewGlob.IngestorUrl.String() == "" {
- RunFlog(t, NewGlob.QueryClient, NewGlob.Stream)
- } else {
- RunFlog(t, NewGlob.IngestorClient, NewGlob.Stream)
- }
- time.Sleep(120 * time.Second)
- // test count
- QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 50)
+ rt.TrackStream(NewGlob.Stream)
+ RunFlogAuto(t, NewGlob.Stream)
+ WaitForIngest(t, NewGlob.QueryClient, NewGlob.Stream, 50, 180*time.Second)
// test yeild all values
AssertQueryOK(t, NewGlob.QueryClient, "SELECT * FROM %s", NewGlob.Stream)
AssertQueryOK(t, NewGlob.QueryClient, "SELECT * FROM %s OFFSET 25 LIMIT 25", NewGlob.Stream)
@@ -243,12 +236,12 @@ func TestSmokeRunQueries(t *testing.T) {
// test group by
AssertQueryOK(t, NewGlob.QueryClient, "SELECT method, COUNT(*) FROM %s GROUP BY method", NewGlob.Stream)
AssertQueryOK(t, NewGlob.QueryClient, `SELECT DATE_TRUNC('minute', p_timestamp) as minute, COUNT(*) FROM %s GROUP BY minute`, NewGlob.Stream)
-
- DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}
func TestSmokeLoadWithK6Stream(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
+ rt.TrackStream(NewGlob.Stream)
if NewGlob.IngestorUrl.String() == "" {
cmd := exec.Command("k6",
"run",
@@ -272,10 +265,8 @@ func TestSmokeLoadWithK6Stream(t *testing.T) {
cmd.Run()
cmd.Output()
}
- time.Sleep(150 * time.Second)
- QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 20000)
+ WaitForIngest(t, NewGlob.QueryClient, NewGlob.Stream, 20000, 180*time.Second)
AssertStreamSchema(t, NewGlob.QueryClient, NewGlob.Stream, SchemaBody)
- DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}
// func TestSmokeLoad_TimePartition_WithK6Stream(t *testing.T) {
@@ -311,9 +302,11 @@ func TestSmokeLoadWithK6Stream(t *testing.T) {
// }
func TestSmokeLoad_CustomPartition_WithK6Stream(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
custom_partition_stream := NewGlob.Stream + "custompartition"
customHeader := map[string]string{"X-P-Custom-Partition": "level"}
CreateStreamWithHeader(t, NewGlob.QueryClient, custom_partition_stream, customHeader)
+ rt.TrackStream(custom_partition_stream)
if NewGlob.IngestorUrl.String() == "" {
cmd := exec.Command("k6",
"run",
@@ -337,9 +330,7 @@ func TestSmokeLoad_CustomPartition_WithK6Stream(t *testing.T) {
cmd.Run()
cmd.Output()
}
- time.Sleep(120 * time.Second)
- QueryLogStreamCount(t, NewGlob.QueryClient, custom_partition_stream, 20000)
- DeleteStream(t, NewGlob.QueryClient, custom_partition_stream)
+ WaitForIngest(t, NewGlob.QueryClient, custom_partition_stream, 20000, 180*time.Second)
}
// func TestSmokeLoad_TimeAndCustomPartition_WithK6Stream(t *testing.T) {
@@ -375,138 +366,88 @@ func TestSmokeLoad_CustomPartition_WithK6Stream(t *testing.T) {
// }
func TestSmokeSetTarget(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
body := getTargetBody()
req, _ := NewGlob.QueryClient.NewRequest("POST", "/targets", strings.NewReader(body))
response, err := NewGlob.QueryClient.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, readAsString(response.Body))
+ targetsBody := ListTargets(t, NewGlob.QueryClient)
+ targetId := getIdFromTargetResponse(bytes.NewReader([]byte(targetsBody)))
+ rt.TrackTarget(targetId)
}
-func TestSmokeSetAlert(t *testing.T) {
- stream := NewGlob.Stream + "alert_testing"
- CreateStream(t, NewGlob.QueryClient, stream)
- if NewGlob.IngestorUrl.String() == "" {
- cmd := exec.Command("k6",
- "run",
- "-e", fmt.Sprintf("P_URL=%s", NewGlob.QueryUrl.String()),
- "-e", fmt.Sprintf("P_USERNAME=%s", NewGlob.QueryUsername),
- "-e", fmt.Sprintf("P_PASSWORD=%s", NewGlob.QueryPassword),
- "-e", fmt.Sprintf("P_STREAM=%s", stream),
- "./scripts/smoke.js")
-
- cmd.Run()
- cmd.Output()
- } else {
- cmd := exec.Command("k6",
- "run",
- "-e", fmt.Sprintf("P_URL=%s", NewGlob.IngestorUrl.String()),
- "-e", fmt.Sprintf("P_USERNAME=%s", NewGlob.IngestorUsername),
- "-e", fmt.Sprintf("P_PASSWORD=%s", NewGlob.IngestorPassword),
- "-e", fmt.Sprintf("P_STREAM=%s", stream),
- "./scripts/smoke.js")
-
- cmd.Run()
- cmd.Output()
- }
- time.Sleep(120 * time.Second)
- req, _ := NewGlob.QueryClient.NewRequest("GET", "/targets", nil)
- response, err := NewGlob.QueryClient.Do(req)
- require.NoErrorf(t, err, "Request failed: %s", err)
- bodyTargets, _ := io.ReadAll(response.Body)
- reader1 := bytes.NewReader(bodyTargets)
- targetId := getIdFromTargetResponse(reader1)
- body := getAlertBody(stream, targetId)
- req, _ = NewGlob.QueryClient.NewRequest("POST", "/alerts", strings.NewReader(body))
- response, err = NewGlob.QueryClient.Do(req)
- require.NoErrorf(t, err, "Request failed: %s", err)
- require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, readAsString(response.Body))
-}
-
-func TestSmokeGetAlert(t *testing.T) {
- stream := NewGlob.Stream + "alert_testing"
- req, _ := NewGlob.QueryClient.NewRequest("GET", "/targets", nil)
- response, err := NewGlob.QueryClient.Do(req)
- require.NoErrorf(t, err, "Request failed: %s", err)
- bodyTargets, _ := io.ReadAll(response.Body)
- reader1 := bytes.NewReader(bodyTargets)
- targetId := getIdFromTargetResponse(reader1)
- req, _ = NewGlob.QueryClient.NewRequest("GET", "/alerts", nil)
- response, err = NewGlob.QueryClient.Do(req)
- require.NoErrorf(t, err, "Request failed: %s", err)
- body, _ := io.ReadAll(response.Body)
- reader1 = bytes.NewReader(body)
- reader2 := bytes.NewReader(body)
- expected := readAsString(reader1)
- id, state, created, datasets := getMetadataFromAlertResponse(reader2)
- require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body)
- res := createAlertResponse(id, state, created, datasets)
- require.JSONEq(t, expected, res, "Get alert response doesn't match with Alert config returned")
- DeleteAlert(t, NewGlob.QueryClient, id)
- DeleteTarget(t, NewGlob.QueryClient, targetId)
- DeleteStream(t, NewGlob.QueryClient, stream)
-}
-
-func TestSmokeSetRetention(t *testing.T) {
+func TestSmokeRetentionLifecycle(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
+ rt.TrackStream(NewGlob.Stream)
+
+ // Set retention
req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/retention", strings.NewReader(RetentionBody))
response, err := NewGlob.QueryClient.Do(req)
- require.NoErrorf(t, err, "Request failed: %s", err)
- require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, readAsString(response.Body))
-}
+ require.NoErrorf(t, err, "Set retention failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Set retention failed with status: %s", response.Status)
-func TestSmokeGetRetention(t *testing.T) {
- req, _ := NewGlob.QueryClient.NewRequest("GET", "logstream/"+NewGlob.Stream+"/retention", nil)
- response, err := NewGlob.QueryClient.Do(req)
- require.NoErrorf(t, err, "Request failed: %s", err)
+ // Get retention
+ req, _ = NewGlob.QueryClient.NewRequest("GET", "logstream/"+NewGlob.Stream+"/retention", nil)
+ response, err = NewGlob.QueryClient.Do(req)
+ require.NoErrorf(t, err, "Get retention failed: %s", err)
body := readAsString(response.Body)
- require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body)
+ require.Equalf(t, 200, response.StatusCode, "Get retention failed with status: %s", response.Status)
require.JSONEq(t, RetentionBody, body, "Get retention response doesn't match with retention config returned")
- DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}
// This test calls all the User API endpoints
// in a sequence to check if they work as expected.
func TestSmoke_AllUsersAPI(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
CreateRole(t, NewGlob.QueryClient, "dummyrole", dummyRole)
+ rt.TrackRole("dummyrole")
AssertRole(t, NewGlob.QueryClient, "dummyrole", dummyRole)
CreateUser(t, NewGlob.QueryClient, "dummyuser")
+ rt.TrackUser("dummyuser")
CreateUserWithRole(t, NewGlob.QueryClient, "dummyanotheruser", []string{"dummyrole"})
+ rt.TrackUser("dummyanotheruser")
AssertUserRole(t, NewGlob.QueryClient, "dummyanotheruser", "dummyrole", dummyRole)
RegenPassword(t, NewGlob.QueryClient, "dummyuser")
- DeleteUser(t, NewGlob.QueryClient, "dummyuser")
- DeleteUser(t, NewGlob.QueryClient, "dummyanotheruser")
- DeleteRole(t, NewGlob.QueryClient, "dummyrole")
}
// This test checks that a new user doesn't get any role by default
// even if a default role is set.
func TestSmoke_NewUserNoRole(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
+ rt.TrackStream(NewGlob.Stream)
CreateRole(t, NewGlob.QueryClient, "dummyrole", dummyRole)
+ rt.TrackRole("dummyrole")
SetDefaultRole(t, NewGlob.QueryClient, "dummyrole")
AssertDefaultRole(t, NewGlob.QueryClient, "\"dummyrole\"")
CreateUser(t, NewGlob.QueryClient, "dummyuser")
- DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
+ rt.TrackUser("dummyuser")
}
func TestSmokeRbacBasic(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
+ rt.TrackStream(NewGlob.Stream)
CreateRole(t, NewGlob.QueryClient, "dummy", dummyRole)
+ rt.TrackRole("dummy")
AssertRole(t, NewGlob.QueryClient, "dummy", dummyRole)
CreateUserWithRole(t, NewGlob.QueryClient, "dummy", []string{"dummy"})
+ rt.TrackUser("dummy")
userClient := NewGlob.QueryClient
userClient.Username = "dummy"
userClient.Password = RegenPassword(t, NewGlob.QueryClient, "dummy")
checkAPIAccess(t, userClient, NewGlob.QueryClient, NewGlob.Stream, "editor")
- DeleteUser(t, NewGlob.QueryClient, "dummy")
- DeleteRole(t, NewGlob.QueryClient, "dummy")
}
func TestSmokeRoles(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
+ rt.TrackStream(NewGlob.Stream)
cases := []struct {
roleName string
body string
@@ -532,9 +473,11 @@ func TestSmokeRoles(t *testing.T) {
for _, tc := range cases {
t.Run(tc.roleName, func(t *testing.T) {
CreateRole(t, NewGlob.QueryClient, tc.roleName, tc.body)
+ rt.TrackRole(tc.roleName)
AssertRole(t, NewGlob.QueryClient, tc.roleName, tc.body)
username := tc.roleName + "_user"
password := CreateUserWithRole(t, NewGlob.QueryClient, username, []string{tc.roleName})
+ rt.TrackUser(username)
var ingestClient HTTPClient
queryClient := NewGlob.QueryClient
queryClient.Username = username
@@ -550,15 +493,15 @@ func TestSmokeRoles(t *testing.T) {
}
checkAPIAccess(t, queryClient, ingestClient, NewGlob.Stream, tc.roleName)
- DeleteUser(t, NewGlob.QueryClient, username)
- DeleteRole(t, NewGlob.QueryClient, tc.roleName)
})
}
}
func TestLoadStreamBatchWithK6(t *testing.T) {
if NewGlob.Mode == "load" {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
+ rt.TrackStream(NewGlob.Stream)
if NewGlob.IngestorUrl.String() == "" {
cmd := exec.Command("k6",
"run",
@@ -598,8 +541,6 @@ func TestLoadStreamBatchWithK6(t *testing.T) {
}
t.Log(string(op))
}
- DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
-
}
}
@@ -653,9 +594,11 @@ func TestLoadStreamBatchWithK6(t *testing.T) {
// }
func TestLoadStreamBatchWithCustomPartitionWithK6(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
customPartitionStream := NewGlob.Stream + "custompartition"
customHeader := map[string]string{"X-P-Custom-Partition": "level"}
CreateStreamWithHeader(t, NewGlob.QueryClient, customPartitionStream, customHeader)
+ rt.TrackStream(customPartitionStream)
if NewGlob.IngestorUrl.String() == "" {
cmd := exec.Command("k6",
"run",
@@ -695,13 +638,13 @@ func TestLoadStreamBatchWithCustomPartitionWithK6(t *testing.T) {
}
t.Log(string(op))
}
-
- DeleteStream(t, NewGlob.QueryClient, customPartitionStream)
}
func TestLoadStreamNoBatchWithK6(t *testing.T) {
if NewGlob.Mode == "load" {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
+ rt.TrackStream(NewGlob.Stream)
if NewGlob.IngestorUrl.String() == "" {
cmd := exec.Command("k6",
"run",
@@ -791,9 +734,11 @@ func TestLoadStreamNoBatchWithK6(t *testing.T) {
// }
func TestLoadStreamNoBatchWithCustomPartitionWithK6(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
customPartitionStream := NewGlob.Stream + "custompartition"
customHeader := map[string]string{"X-P-Custom-Partition": "level"}
CreateStreamWithHeader(t, NewGlob.QueryClient, customPartitionStream, customHeader)
+ rt.TrackStream(customPartitionStream)
if NewGlob.IngestorUrl.String() == "" {
cmd := exec.Command("k6",
"run",
@@ -831,10 +776,388 @@ func TestLoadStreamNoBatchWithCustomPartitionWithK6(t *testing.T) {
}
t.Log(string(op))
}
-
- DeleteStream(t, NewGlob.QueryClient, customPartitionStream)
}
func TestDeleteStream(t *testing.T) {
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}
+
+// ===== P0 — New Tests =====
+
+func TestSmokeHealthEndpoints(t *testing.T) {
+ t.Parallel()
+ AssertLiveness(t, NewGlob.QueryClient)
+ AssertLivenessHead(t, NewGlob.QueryClient)
+ AssertReadiness(t, NewGlob.QueryClient)
+ AssertReadinessHead(t, NewGlob.QueryClient)
+}
+
+func TestSmokeStreamInfoAndStats(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
+ rt.TrackStream(NewGlob.Stream)
+ RunFlogAuto(t, NewGlob.Stream)
+ WaitForIngest(t, NewGlob.QueryClient, NewGlob.Stream, 1, 180*time.Second)
+ AssertStreamInfo(t, NewGlob.QueryClient, NewGlob.Stream)
+ AssertStreamStats(t, NewGlob.QueryClient, NewGlob.Stream)
+}
+
+func TestSmokeOTelLogsIngestion(t *testing.T) {
+ IngestOTelLogs(t, NewGlob.QueryClient)
+}
+
+func TestSmokeOTelTracesIngestion(t *testing.T) {
+ IngestOTelTraces(t, NewGlob.QueryClient)
+}
+
+func TestSmokeOTelMetricsIngestion(t *testing.T) {
+ IngestOTelMetrics(t, NewGlob.QueryClient)
+}
+
+func TestSmokeAlertLifecycle(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ stream := NewGlob.Stream + "alertlifecycle"
+ CreateStream(t, NewGlob.QueryClient, stream)
+ rt.TrackStream(stream)
+ RunFlogAuto(t, stream)
+ WaitForIngest(t, NewGlob.QueryClient, stream, 1, 180*time.Second)
+
+ // Create target
+ targetBody := getTargetBody()
+ req, _ := NewGlob.QueryClient.NewRequest("POST", "targets", strings.NewReader(targetBody))
+ response, err := NewGlob.QueryClient.Do(req)
+ require.NoErrorf(t, err, "Create target failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Create target failed with status: %s", response.Status)
+
+ // Get target ID
+ req, _ = NewGlob.QueryClient.NewRequest("GET", "targets", nil)
+ response, err = NewGlob.QueryClient.Do(req)
+ require.NoErrorf(t, err, "List targets failed: %s", err)
+ bodyTargets, _ := io.ReadAll(response.Body)
+ targetId := getIdFromTargetResponse(bytes.NewReader(bodyTargets))
+ rt.TrackTarget(targetId)
+
+ // Create alert
+ alertBody := getAlertBody(stream, targetId)
+ req, _ = NewGlob.QueryClient.NewRequest("POST", "alerts", strings.NewReader(alertBody))
+ response, err = NewGlob.QueryClient.Do(req)
+ require.NoErrorf(t, err, "Create alert failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Create alert failed with status: %s", response.Status)
+
+ // Get alert ID from list
+ req, _ = NewGlob.QueryClient.NewRequest("GET", "alerts", nil)
+ response, err = NewGlob.QueryClient.Do(req)
+ require.NoErrorf(t, err, "List alerts failed: %s", err)
+ bodyAlerts, _ := io.ReadAll(response.Body)
+ alertId, _, _, _ := getMetadataFromAlertResponse(bytes.NewReader(bodyAlerts))
+ rt.TrackAlert(alertId)
+
+ // Get by ID
+ GetAlertById(t, NewGlob.QueryClient, alertId)
+
+ // Modify
+ modifyBody := getAlertModifyBody(stream, targetId)
+ ModifyAlert(t, NewGlob.QueryClient, alertId, modifyBody)
+
+ // Disable
+ DisableAlert(t, NewGlob.QueryClient, alertId)
+
+ // Enable
+ EnableAlert(t, NewGlob.QueryClient, alertId)
+
+ // List tags
+ ListAlertTags(t, NewGlob.QueryClient)
+}
+
+func TestNegative_IngestToNonExistentStream(t *testing.T) {
+ payload := `[{"level":"info","message":"test"}]`
+ req, _ := NewGlob.QueryClient.NewRequest("POST", "ingest", bytes.NewBufferString(payload))
+ req.Header.Add("X-P-Stream", "nonexistent_stream_xyz_12345")
+ response, err := NewGlob.QueryClient.Do(req)
+ require.NoErrorf(t, err, "Request failed: %s", err)
+ // Parseable may auto-create the stream (200) or reject (400/404)
+ t.Logf("Ingest to non-existent stream returned status: %d", response.StatusCode)
+}
+
+func TestNegative_QueryNonExistentStream(t *testing.T) {
+ t.Parallel()
+ AssertQueryError(t, NewGlob.QueryClient, "SELECT * FROM nonexistent_stream_xyz_99999", 400)
+}
+
+func TestNegative_InvalidQuerySyntax(t *testing.T) {
+ t.Parallel()
+ AssertQueryError(t, NewGlob.QueryClient, "SELEC * FORM invalid_syntax", 400)
+}
+
+// ===== P1 — New Tests =====
+
+func TestSmokeDashboardLifecycle(t *testing.T) {
+ t.Parallel()
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ // Create
+ dashboardId := CreateDashboard(t, NewGlob.QueryClient)
+ require.NotEmptyf(t, dashboardId, "Dashboard ID should not be empty")
+ rt.TrackDashboard(dashboardId)
+
+ // List
+ ListDashboards(t, NewGlob.QueryClient)
+
+ // Get by ID
+ GetDashboardById(t, NewGlob.QueryClient, dashboardId)
+
+ // Update
+ UpdateDashboard(t, NewGlob.QueryClient, dashboardId)
+
+ // Add tile (using a generic stream name)
+ AddDashboardTile(t, NewGlob.QueryClient, dashboardId, NewGlob.Stream)
+
+ // List tags
+ ListDashboardTags(t, NewGlob.QueryClient)
+}
+
+func TestSmokeFilterLifecycle(t *testing.T) {
+ t.Parallel()
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ stream := NewGlob.Stream + "filtertest"
+ CreateStream(t, NewGlob.QueryClient, stream)
+ rt.TrackStream(stream)
+
+ // Create
+ filterId := CreateFilter(t, NewGlob.QueryClient, stream)
+ require.NotEmptyf(t, filterId, "Filter ID should not be empty")
+ rt.TrackFilter(filterId)
+
+ // List
+ ListFilters(t, NewGlob.QueryClient)
+
+ // Get by ID
+ GetFilterById(t, NewGlob.QueryClient, filterId)
+
+ // Update
+ UpdateFilter(t, NewGlob.QueryClient, filterId, stream)
+}
+
+func TestSmokePrismHome(t *testing.T) {
+ t.Parallel()
+ AssertPrismHome(t, NewGlob.QueryClient)
+}
+
+func TestSmokePrismLogstreamInfo(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ stream := NewGlob.Stream + "prisminfo"
+ CreateStream(t, NewGlob.QueryClient, stream)
+ rt.TrackStream(stream)
+ RunFlogAuto(t, stream)
+ WaitForIngest(t, NewGlob.QueryClient, stream, 1, 180*time.Second)
+ AssertPrismLogstreamInfo(t, NewGlob.QueryClient, stream)
+}
+
+func TestSmokePrismDatasets(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ stream := NewGlob.Stream + "prismdatasets"
+ CreateStream(t, NewGlob.QueryClient, stream)
+ rt.TrackStream(stream)
+ RunFlogAuto(t, stream)
+ WaitForIngest(t, NewGlob.QueryClient, stream, 1, 180*time.Second)
+ AssertPrismDatasets(t, NewGlob.QueryClient, stream)
+}
+
+func TestSmokeRbacAddRemoveRoles(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ // Create roles
+ roleName1 := "testrole_add1"
+ roleName2 := "testrole_add2"
+ CreateRole(t, NewGlob.QueryClient, roleName1, RoleEditor)
+ rt.TrackRole(roleName1)
+ CreateRole(t, NewGlob.QueryClient, roleName2, RoleEditor)
+ rt.TrackRole(roleName2)
+
+ // Create user
+ CreateUser(t, NewGlob.QueryClient, "rbac_patch_user")
+ rt.TrackUser("rbac_patch_user")
+
+ // Add roles via PATCH
+ AddRolesToUser(t, NewGlob.QueryClient, "rbac_patch_user", []string{roleName1, roleName2})
+
+ // Remove one role via PATCH
+ RemoveRolesFromUser(t, NewGlob.QueryClient, "rbac_patch_user", []string{roleName2})
+
+ // List all roles
+ ListAllRoles(t, NewGlob.QueryClient)
+}
+
+func TestSmokeTargetLifecycle(t *testing.T) {
+ t.Parallel()
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ // Create
+ targetBody := getTargetBody()
+ req, _ := NewGlob.QueryClient.NewRequest("POST", "targets", strings.NewReader(targetBody))
+ response, err := NewGlob.QueryClient.Do(req)
+ require.NoErrorf(t, err, "Create target failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Create target failed with status: %s", response.Status)
+
+ // List and get ID
+ targetsBody := ListTargets(t, NewGlob.QueryClient)
+ targetId := getIdFromTargetResponse(bytes.NewReader([]byte(targetsBody)))
+ rt.TrackTarget(targetId)
+
+ // Get by ID
+ GetTargetById(t, NewGlob.QueryClient, targetId)
+
+ // Update
+ UpdateTarget(t, NewGlob.QueryClient, targetId)
+}
+
+// ===== P2 — New Tests =====
+
+func TestSmokeAboutEndpoint(t *testing.T) {
+ t.Parallel()
+ body := AssertAbout(t, NewGlob.QueryClient)
+ require.Containsf(t, body, "version", "About response should contain version field")
+}
+
+func TestSmokeMetricsEndpoint(t *testing.T) {
+ t.Parallel()
+ body := AssertMetrics(t, NewGlob.QueryClient)
+ require.NotEmptyf(t, body, "Metrics response should not be empty")
+}
+
+func TestSmokeDemoData(t *testing.T) {
+ AssertDemoData(t, NewGlob.QueryClient)
+}
+
+func TestSmokeStreamHotTier(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ stream := NewGlob.Stream + "hottier"
+ CreateStream(t, NewGlob.QueryClient, stream)
+ rt.TrackStream(stream)
+
+ SetStreamHotTier(t, NewGlob.QueryClient, stream)
+ GetStreamHotTier(t, NewGlob.QueryClient, stream)
+ DeleteStreamHotTier(t, NewGlob.QueryClient, stream)
+}
+
+func TestSmokeDatasetStats(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ stream := NewGlob.Stream + "datasetstats"
+ CreateStream(t, NewGlob.QueryClient, stream)
+ rt.TrackStream(stream)
+ RunFlogAuto(t, stream)
+ WaitForIngest(t, NewGlob.QueryClient, stream, 1, 180*time.Second)
+ AssertDatasetStats(t, NewGlob.QueryClient, []string{stream})
+}
+
+func TestSmokeRunQueriesExpanded(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ stream := NewGlob.Stream + "queryexpanded"
+ CreateStream(t, NewGlob.QueryClient, stream)
+ rt.TrackStream(stream)
+ RunFlogAuto(t, stream)
+ WaitForIngest(t, NewGlob.QueryClient, stream, 1, 180*time.Second)
+
+ // SELECT DISTINCT
+ AssertQueryOK(t, NewGlob.QueryClient, "SELECT DISTINCT method FROM %s", stream)
+ // ORDER BY ASC/DESC
+ AssertQueryOK(t, NewGlob.QueryClient, "SELECT * FROM %s ORDER BY p_timestamp ASC LIMIT 10", stream)
+ AssertQueryOK(t, NewGlob.QueryClient, "SELECT * FROM %s ORDER BY p_timestamp DESC LIMIT 10", stream)
+ // COUNT with GROUP BY and HAVING
+ AssertQueryOK(t, NewGlob.QueryClient, "SELECT method, COUNT(*) as cnt FROM %s GROUP BY method HAVING COUNT(*) > 0", stream)
+ // MIN/MAX/AVG/SUM
+ AssertQueryOK(t, NewGlob.QueryClient, "SELECT MIN(status) as min_s, MAX(status) as max_s, AVG(status) as avg_s FROM %s", stream)
+ // Large OFFSET/LIMIT
+ AssertQueryOK(t, NewGlob.QueryClient, "SELECT * FROM %s LIMIT 1 OFFSET 0", stream)
+}
+
+func TestSmokeConcurrentIngestQuery(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ stream := NewGlob.Stream + "concurrent"
+ CreateStream(t, NewGlob.QueryClient, stream)
+ rt.TrackStream(stream)
+
+ ingestClient := NewGlob.QueryClient
+ if NewGlob.IngestorUrl.String() != "" {
+ ingestClient = NewGlob.IngestorClient
+ }
+
+ ConcurrentIngestAndQuery(t, ingestClient, NewGlob.QueryClient, stream, 20, 3)
+}
+
+func TestNegative_InvalidJsonIngest(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ stream := NewGlob.Stream + "invalidjson"
+ CreateStream(t, NewGlob.QueryClient, stream)
+ rt.TrackStream(stream)
+ AssertIngestError(t, NewGlob.QueryClient, stream, `{invalid json}`, 400)
+}
+
+func TestNegative_EmptyPayloadIngest(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ stream := NewGlob.Stream + "emptypayload"
+ CreateStream(t, NewGlob.QueryClient, stream)
+ rt.TrackStream(stream)
+ AssertIngestError(t, NewGlob.QueryClient, stream, ``, 400)
+}
+
+func TestNegative_DuplicateStreamCreation(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ stream := NewGlob.Stream + "duplicate"
+ CreateStream(t, NewGlob.QueryClient, stream)
+ rt.TrackStream(stream)
+ // Second creation should return an error (or idempotent 200)
+ req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+stream, nil)
+ response, err := NewGlob.QueryClient.Do(req)
+ require.NoErrorf(t, err, "Request failed: %s", err)
+ t.Logf("Duplicate stream creation returned status: %d", response.StatusCode)
+}
+
+func TestNegative_DeleteNonExistentStream(t *testing.T) {
+ t.Parallel()
+ req, _ := NewGlob.QueryClient.NewRequest("DELETE", "logstream/nonexistent_stream_delete_test_xyz", nil)
+ response, err := NewGlob.QueryClient.Do(req)
+ require.NoErrorf(t, err, "Request failed: %s", err)
+ require.NotEqualf(t, 200, response.StatusCode, "Deleting non-existent stream should not return 200")
+}
+
+func TestNegative_InvalidRetentionBody(t *testing.T) {
+ rt := NewResourceTracker(t, NewGlob.QueryClient)
+ stream := NewGlob.Stream + "invalidretention"
+ CreateStream(t, NewGlob.QueryClient, stream)
+ rt.TrackStream(stream)
+ invalidBody := `{"invalid": "retention"}`
+ req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+stream+"/retention", strings.NewReader(invalidBody))
+ response, err := NewGlob.QueryClient.Do(req)
+ require.NoErrorf(t, err, "Request failed: %s", err)
+ require.NotEqualf(t, 200, response.StatusCode, "Invalid retention body should not return 200, got: %d", response.StatusCode)
+}
+
+func TestNegative_InvalidAlertBody(t *testing.T) {
+ t.Parallel()
+ invalidBody := `{"invalid": "alert"}`
+ req, _ := NewGlob.QueryClient.NewRequest("POST", "alerts", strings.NewReader(invalidBody))
+ response, err := NewGlob.QueryClient.Do(req)
+ require.NoErrorf(t, err, "Request failed: %s", err)
+ require.NotEqualf(t, 200, response.StatusCode, "Invalid alert body should not return 200, got: %d", response.StatusCode)
+}
+
+// ===== P3 — New Tests =====
+
+func TestDistributed_ClusterInfo(t *testing.T) {
+ t.Parallel()
+ if NewGlob.IngestorUrl.String() == "" {
+ t.Skip("Skipping cluster test: no ingestor URL configured (standalone mode)")
+ }
+ AssertClusterInfo(t, NewGlob.QueryClient)
+}
+
+func TestDistributed_ClusterMetrics(t *testing.T) {
+ t.Parallel()
+ if NewGlob.IngestorUrl.String() == "" {
+ t.Skip("Skipping cluster test: no ingestor URL configured (standalone mode)")
+ }
+ AssertClusterMetrics(t, NewGlob.QueryClient)
+}
+
+func TestSmokeLLMEndpoint_NoAPIKey(t *testing.T) {
+ t.Parallel()
+ AssertLLMEndpointNoAPIKey(t, NewGlob.QueryClient)
+}
diff --git a/quest_utils.go b/quest_utils.go
new file mode 100644
index 0000000..ffe2467
--- /dev/null
+++ b/quest_utils.go
@@ -0,0 +1,304 @@
+// Copyright (c) 2023 Cloudnatively Services Pvt Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+package main
+
+import (
+ "bytes"
+ "crypto/rand"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+)
+
+// ---------------------------------------------------------------------------
+// TestMain — Health gate: polls liveness + readiness before running any tests.
+// ---------------------------------------------------------------------------
+
+func TestMain(m *testing.M) {
+ const (
+ maxWait = 60 * time.Second
+ interval = 2 * time.Second
+ )
+
+ deadline := time.Now().Add(maxWait)
+ ready := false
+
+ for time.Now().Before(deadline) {
+ if checkEndpoint(NewGlob.QueryClient, "liveness") && checkEndpoint(NewGlob.QueryClient, "readiness") {
+ ready = true
+ break
+ }
+ time.Sleep(interval)
+ }
+
+ if !ready {
+ fmt.Fprintf(os.Stderr, "FATAL: Parseable not reachable at %s after %s (checked /liveness + /readiness)\n",
+ NewGlob.QueryUrl.String(), maxWait)
+ os.Exit(1)
+ }
+
+ os.Exit(m.Run())
+}
+
+func checkEndpoint(client HTTPClient, path string) bool {
+ req, err := client.NewRequest("GET", path, nil)
+ if err != nil {
+ return false
+ }
+ resp, err := client.Do(req)
+ if err != nil {
+ return false
+ }
+ defer resp.Body.Close()
+ return resp.StatusCode == http.StatusOK
+}
+
+// ---------------------------------------------------------------------------
+// WaitForIngest — Polls query count until >= minCount or timeout.
+// Replaces all hardcoded time.Sleep calls after ingestion.
+// ---------------------------------------------------------------------------
+
+func WaitForIngest(t *testing.T, client HTTPClient, stream string, minCount int, timeout time.Duration) int {
+ t.Helper()
+
+ const pollInterval = 2 * time.Second
+ deadline := time.Now().Add(timeout)
+
+ for {
+ count := queryCount(client, stream)
+ if count >= minCount {
+ return count
+ }
+ if time.Now().After(deadline) {
+ t.Fatalf("WaitForIngest: timed out after %s waiting for stream %q to reach %d events (last count: %d)",
+ timeout, stream, minCount, count)
+ }
+ time.Sleep(pollInterval)
+ }
+}
+
+// WaitForQueryable — Polls until stream returns count > 0.
+// Lighter variant of WaitForIngest for "data landed" checks.
+func WaitForQueryable(t *testing.T, client HTTPClient, stream string, timeout time.Duration) {
+ t.Helper()
+ WaitForIngest(t, client, stream, 1, timeout)
+}
+
+// queryCount returns the current count for a stream, or -1 on error.
+func queryCount(client HTTPClient, stream string) int {
+ endTime := time.Now().Add(time.Second).Format(time.RFC3339Nano)
+ startTime := time.Now().Add(-30 * time.Minute).Format(time.RFC3339Nano)
+
+ query := map[string]interface{}{
+ "query": "select count(*) as count from " + stream,
+ "startTime": startTime,
+ "endTime": endTime,
+ }
+ queryJSON, _ := json.Marshal(query)
+ req, err := client.NewRequest("POST", "query", bytes.NewBuffer(queryJSON))
+ if err != nil {
+ return -1
+ }
+ resp, err := client.Do(req)
+ if err != nil {
+ return -1
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != 200 {
+ return -1
+ }
+
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return -1
+ }
+
+ var results []map[string]float64
+ if err := json.Unmarshal(body, &results); err != nil {
+ return -1
+ }
+ if len(results) == 0 {
+ return 0
+ }
+ return int(results[0]["count"])
+}
+
+// ---------------------------------------------------------------------------
+// UniqueStream — Returns a unique stream name with the given prefix.
+// Uses crypto/rand for collision-free names in parallel/repeated runs.
+// ---------------------------------------------------------------------------
+
+func UniqueStream(prefix string) string {
+ b := make([]byte, 4)
+ _, _ = rand.Read(b)
+ return prefix + "_" + hex.EncodeToString(b)
+}
+
+// ---------------------------------------------------------------------------
+// ResourceTracker — Guarantees cleanup even on t.Fatal/panic.
+// Registers t.Cleanup in constructor; runs all cleanups in LIFO order.
+// ---------------------------------------------------------------------------
+
+type ResourceTracker struct {
+ t *testing.T
+ client HTTPClient
+ mu sync.Mutex
+ cleanups []func()
+}
+
+func NewResourceTracker(t *testing.T, client HTTPClient) *ResourceTracker {
+ t.Helper()
+ rt := &ResourceTracker{t: t, client: client}
+ t.Cleanup(rt.cleanup)
+ return rt
+}
+
+func (rt *ResourceTracker) cleanup() {
+ rt.mu.Lock()
+ defer rt.mu.Unlock()
+ // Run in LIFO order
+ for i := len(rt.cleanups) - 1; i >= 0; i-- {
+ rt.cleanups[i]()
+ }
+}
+
+func (rt *ResourceTracker) addCleanup(fn func()) {
+ rt.mu.Lock()
+ defer rt.mu.Unlock()
+ rt.cleanups = append(rt.cleanups, fn)
+}
+
+func (rt *ResourceTracker) TrackStream(stream string) {
+ rt.addCleanup(func() {
+ deleteIgnoring404(rt.client, "DELETE", "logstream/"+stream)
+ })
+}
+
+func (rt *ResourceTracker) TrackUser(username string) {
+ rt.addCleanup(func() {
+ deleteIgnoring404(rt.client, "DELETE", "user/"+username)
+ })
+}
+
+func (rt *ResourceTracker) TrackRole(roleName string) {
+ rt.addCleanup(func() {
+ deleteIgnoring404(rt.client, "DELETE", "role/"+roleName)
+ })
+}
+
+func (rt *ResourceTracker) TrackAlert(alertID string) {
+ rt.addCleanup(func() {
+ deleteIgnoring404(rt.client, "DELETE", "alerts/"+alertID)
+ })
+}
+
+func (rt *ResourceTracker) TrackTarget(targetID string) {
+ rt.addCleanup(func() {
+ deleteIgnoring404(rt.client, "DELETE", "targets/"+targetID)
+ })
+}
+
+func (rt *ResourceTracker) TrackDashboard(dashID string) {
+ rt.addCleanup(func() {
+ deleteIgnoring404(rt.client, "DELETE", "dashboards/"+dashID)
+ })
+}
+
+func (rt *ResourceTracker) TrackFilter(filterID string) {
+ rt.addCleanup(func() {
+ deleteIgnoring404(rt.client, "DELETE", "filters/"+filterID)
+ })
+}
+
+func (rt *ResourceTracker) AddCustomCleanup(fn func()) {
+ rt.addCleanup(fn)
+}
+
+// deleteIgnoring404 sends a DELETE (or other method) request and ignores 404.
+func deleteIgnoring404(client HTTPClient, method, path string) {
+ req, err := client.NewRequest(method, path, nil)
+ if err != nil {
+ return
+ }
+ resp, err := client.Do(req)
+ if err != nil {
+ return
+ }
+ resp.Body.Close()
+ // 200 or 404 are both fine during cleanup
+}
+
+// ---------------------------------------------------------------------------
+// RunFlogAuto — Wraps RunFlog using the appropriate client.
+// ---------------------------------------------------------------------------
+
+func RunFlogAuto(t *testing.T, stream string) {
+ t.Helper()
+ if NewGlob.IngestorUrl.String() != "" {
+ RunFlog(t, NewGlob.IngestorClient, stream)
+ } else {
+ RunFlog(t, NewGlob.QueryClient, stream)
+ }
+}
+
+// ---------------------------------------------------------------------------
+// Response Assertion Helpers
+// ---------------------------------------------------------------------------
+
+func AssertResponseCode(t *testing.T, resp *http.Response, expected int) {
+ t.Helper()
+ if resp.StatusCode != expected {
+ body, _ := io.ReadAll(resp.Body)
+ snippet := string(body)
+ if len(snippet) > 200 {
+ snippet = snippet[:200] + "..."
+ }
+ t.Fatalf("Expected status %d, got %d. Body: %s", expected, resp.StatusCode, snippet)
+ }
+}
+
+func AssertResponseBodyContains(t *testing.T, resp *http.Response, substring string) {
+ t.Helper()
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ t.Fatalf("Failed to read response body: %s", err)
+ }
+ if !strings.Contains(string(body), substring) {
+ snippet := string(body)
+ if len(snippet) > 200 {
+ snippet = snippet[:200] + "..."
+ }
+ t.Fatalf("Response body does not contain %q. Body: %s", substring, snippet)
+ }
+}
+
+func AssertResponseBodyNotEmpty(t *testing.T, resp *http.Response) {
+ t.Helper()
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ t.Fatalf("Failed to read response body: %s", err)
+ }
+ if len(body) == 0 {
+ t.Fatal("Response body is empty")
+ }
+}
diff --git a/test_utils.go b/test_utils.go
index caabfc9..480e6ba 100644
--- a/test_utils.go
+++ b/test_utils.go
@@ -23,6 +23,8 @@ import (
"io"
"os/exec"
"strings"
+ "sync"
+ "sync/atomic"
"testing"
"time"
@@ -64,13 +66,15 @@ func Sleep() {
}
func CreateStream(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
req, _ := client.NewRequest("PUT", "logstream/"+stream, nil)
response, err := client.Do(req)
- require.NoErrorf(t, err, "Request failed: %s", err)
- require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s", response.Status)
+ require.NoErrorf(t, err, "CreateStream(%s): request failed: %s", stream, err)
+ require.Equalf(t, 200, response.StatusCode, "CreateStream(%s): server returned http code: %s and response: %s", stream, response.Status, readAsString(response.Body))
}
func CreateStreamWithHeader(t *testing.T, client HTTPClient, stream string, header map[string]string) {
+ t.Helper()
req, _ := client.NewRequest("PUT", "logstream/"+stream, nil)
for k, v := range header {
req.Header.Add(k, v)
@@ -81,6 +85,7 @@ func CreateStreamWithHeader(t *testing.T, client HTTPClient, stream string, head
}
func CreateStreamWithCustompartitionError(t *testing.T, client HTTPClient, stream string, header map[string]string) {
+ t.Helper()
req, _ := client.NewRequest("PUT", "logstream/"+stream, nil)
for k, v := range header {
req.Header.Add(k, v)
@@ -90,7 +95,7 @@ func CreateStreamWithCustompartitionError(t *testing.T, client HTTPClient, strea
}
func CreateStreamWithSchemaBody(t *testing.T, client HTTPClient, stream string, header map[string]string, schema_payload string) {
-
+ t.Helper()
req, _ := client.NewRequest("PUT", "logstream/"+stream, bytes.NewBufferString(schema_payload))
for k, v := range header {
req.Header.Add(k, v)
@@ -101,6 +106,7 @@ func CreateStreamWithSchemaBody(t *testing.T, client HTTPClient, stream string,
}
func DetectSchema(t *testing.T, client HTTPClient, sampleJson string, schemaBody string) {
+ t.Helper()
req, _ := client.NewRequest("POST", "logstream/schema/detect", bytes.NewBufferString(sampleJson))
response, err := client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
@@ -110,13 +116,15 @@ func DetectSchema(t *testing.T, client HTTPClient, sampleJson string, schemaBody
}
func DeleteStream(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
req, _ := client.NewRequest("DELETE", "logstream/"+stream, nil)
response, err := client.Do(req)
- require.NoErrorf(t, err, "Request failed: %s", err)
- require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s", response.Status)
+ require.NoErrorf(t, err, "DeleteStream(%s): request failed: %s", stream, err)
+ require.Equalf(t, 200, response.StatusCode, "DeleteStream(%s): server returned http code: %s and response: %s", stream, response.Status, readAsString(response.Body))
}
func DeleteAlert(t *testing.T, client HTTPClient, alert_id string) {
+ t.Helper()
req, _ := client.NewRequest("DELETE", "alerts/"+alert_id, nil)
response, err := client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
@@ -124,6 +132,7 @@ func DeleteAlert(t *testing.T, client HTTPClient, alert_id string) {
}
func DeleteTarget(t *testing.T, client HTTPClient, target_id string) {
+ t.Helper()
req, _ := client.NewRequest("DELETE", "targets/"+target_id, nil)
response, err := client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
@@ -131,6 +140,7 @@ func DeleteTarget(t *testing.T, client HTTPClient, target_id string) {
}
func RunFlog(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
cmd := exec.Command("flog", "-f", "json", "-n", "50")
var out strings.Builder
cmd.Stdout = &out
@@ -152,6 +162,7 @@ func RunFlog(t *testing.T, client HTTPClient, stream string) {
}
func IngestOneEventWithTimePartition_TimeStampMismatch(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
var test_payload string = `{"source_time":"2024-03-26T18:08:00.434Z","level":"info","message":"Application is failing","version":"1.2.0","user_id":13912,"device_id":4138,"session_id":"abc","os":"Windows","host":"112.168.1.110","location":"ngeuprqhynuvpxgp","request_body":"rnkmffyawtdcindtrdqruyxbndbjpfsptzpwtujbmkwcqastmxwbvjwphmyvpnhordwljnodxhtvpjesjldtifswqbpyuhlcytmm","status_code":300,"app_meta":"ckgpibhmlusqqfunnpxbfxbc", "new_field_added_by":"ingestor 8020"}`
req, _ := client.NewRequest("POST", "ingest", bytes.NewBufferString(test_payload))
req.Header.Add("X-P-Stream", stream)
@@ -161,6 +172,7 @@ func IngestOneEventWithTimePartition_TimeStampMismatch(t *testing.T, client HTTP
}
func IngestOneEventWithTimePartition_NoTimePartitionInLog(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
var test_payload string = `{"level":"info","message":"Application is failing","version":"1.2.0","user_id":13912,"device_id":4138,"session_id":"abc","os":"Windows","host":"112.168.1.110","location":"ngeuprqhynuvpxgp","request_body":"rnkmffyawtdcindtrdqruyxbndbjpfsptzpwtujbmkwcqastmxwbvjwphmyvpnhordwljnodxhtvpjesjldtifswqbpyuhlcytmm","status_code":300,"app_meta":"ckgpibhmlusqqfunnpxbfxbc", "new_field_added_by":"ingestor 8020"}`
req, _ := client.NewRequest("POST", "ingest", bytes.NewBufferString(test_payload))
req.Header.Add("X-P-Stream", stream)
@@ -170,6 +182,7 @@ func IngestOneEventWithTimePartition_NoTimePartitionInLog(t *testing.T, client H
}
func IngestOneEventWithTimePartition_IncorrectDateTimeFormatTimePartitionInLog(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
var test_payload string = `{"source_time":"2024-03-26", "level":"info","message":"Application is failing","version":"1.2.0","user_id":13912,"device_id":4138,"session_id":"abc","os":"Windows","host":"112.168.1.110","location":"ngeuprqhynuvpxgp","request_body":"rnkmffyawtdcindtrdqruyxbndbjpfsptzpwtujbmkwcqastmxwbvjwphmyvpnhordwljnodxhtvpjesjldtifswqbpyuhlcytmm","status_code":300,"app_meta":"ckgpibhmlusqqfunnpxbfxbc", "new_field_added_by":"ingestor 8020"}`
req, _ := client.NewRequest("POST", "ingest", bytes.NewBufferString(test_payload))
req.Header.Add("X-P-Stream", stream)
@@ -179,6 +192,7 @@ func IngestOneEventWithTimePartition_IncorrectDateTimeFormatTimePartitionInLog(t
}
func IngestOneEventForStaticSchemaStream_NewFieldInLog(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
var test_payload string = `{"source_time":"2024-03-26", "level":"info","message":"Application is failing","version":"1.2.0","user_id":13912,"device_id":4138,"session_id":"abc","os":"Windows","host":"112.168.1.110","location":"ngeuprqhynuvpxgp","request_body":"rnkmffyawtdcindtrdqruyxbndbjpfsptzpwtujbmkwcqastmxwbvjwphmyvpnhordwljnodxhtvpjesjldtifswqbpyuhlcytmm","status_code":300,"app_meta":"ckgpibhmlusqqfunnpxbfxbc", "new_field_added_by":"ingestor 8020"}`
req, _ := client.NewRequest("POST", "ingest", bytes.NewBufferString(test_payload))
req.Header.Add("X-P-Stream", stream)
@@ -188,6 +202,7 @@ func IngestOneEventForStaticSchemaStream_NewFieldInLog(t *testing.T, client HTTP
}
func IngestOneEventForStaticSchemaStream_SameFieldsInLog(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
var test_payload string = `{"source_time":"2024-03-26", "level":"info","message":"Application is failing","version":"1.2.0","user_id":13912,"device_id":4138,"session_id":"abc","os":"Windows","host":"112.168.1.110","location":"ngeuprqhynuvpxgp","request_body":"rnkmffyawtdcindtrdqruyxbndbjpfsptzpwtujbmkwcqastmxwbvjwphmyvpnhordwljnodxhtvpjesjldtifswqbpyuhlcytmm","status_code":300,"app_meta":"ckgpibhmlusqqfunnpxbfxbc"}`
req, _ := client.NewRequest("POST", "ingest", bytes.NewBufferString(test_payload))
req.Header.Add("X-P-Stream", stream)
@@ -197,6 +212,7 @@ func IngestOneEventForStaticSchemaStream_SameFieldsInLog(t *testing.T, client HT
}
func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count uint64) {
+ t.Helper()
// Query last 30 minutes of data only
endTime := time.Now().Add(time.Second).Format(time.RFC3339Nano)
startTime := time.Now().Add(-30 * time.Minute).Format(time.RFC3339Nano)
@@ -217,7 +233,7 @@ func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count u
}
func QueryLogStreamCount_Historical(t *testing.T, client HTTPClient, stream string, count uint64) {
- // Query last 30 minutes of data only
+ t.Helper()
now := time.Now()
startTime := now.AddDate(0, 0, -33).Format(time.RFC3339Nano)
endTime := now.AddDate(0, 0, -27).Format(time.RFC3339Nano)
@@ -238,7 +254,7 @@ func QueryLogStreamCount_Historical(t *testing.T, client HTTPClient, stream stri
}
func QueryTwoLogStreamCount(t *testing.T, client HTTPClient, stream1 string, stream2 string, count uint64) {
- // Query last 30 minutes of data only
+ t.Helper()
endTime := time.Now().Add(time.Second).Format(time.RFC3339Nano)
startTime := time.Now().Add(-30 * time.Minute).Format(time.RFC3339Nano)
@@ -258,7 +274,7 @@ func QueryTwoLogStreamCount(t *testing.T, client HTTPClient, stream1 string, str
}
func AssertQueryOK(t *testing.T, client HTTPClient, query string, args ...any) {
- // Query last 30 minutes of data only
+ t.Helper()
endTime := time.Now().Add(time.Second).Format(time.RFC3339Nano)
startTime := time.Now().Add(-30 * time.Minute).Format(time.RFC3339Nano)
@@ -283,15 +299,17 @@ func AssertQueryOK(t *testing.T, client HTTPClient, query string, args ...any) {
}
func AssertStreamSchema(t *testing.T, client HTTPClient, stream string, schema string) {
+ t.Helper()
req, _ := client.NewRequest("GET", "logstream/"+stream+"/schema", nil)
response, err := client.Do(req)
- require.NoErrorf(t, err, "Request failed: %s", err)
+ require.NoErrorf(t, err, "AssertStreamSchema(%s): request failed: %s", stream, err)
body := readAsString(response.Body)
- require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body)
- require.JSONEq(t, schema, body, "Get schema response doesn't match with expected schema")
+ require.Equalf(t, 200, response.StatusCode, "AssertStreamSchema(%s): server returned http code: %s and response: %s", stream, response.Status, body)
+ require.JSONEq(t, schema, body, "AssertStreamSchema(%s): actual schema doesn't match expected.\nActual: %s", stream, body)
}
func CreateRole(t *testing.T, client HTTPClient, name string, role string) {
+ t.Helper()
req, _ := client.NewRequest("PUT", "role/"+name, strings.NewReader(role))
response, err := client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
@@ -299,6 +317,7 @@ func CreateRole(t *testing.T, client HTTPClient, name string, role string) {
}
func AssertRole(t *testing.T, client HTTPClient, name string, role string) {
+ t.Helper()
req, _ := client.NewRequest("GET", "role/"+name, nil)
response, err := client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
@@ -308,6 +327,7 @@ func AssertRole(t *testing.T, client HTTPClient, name string, role string) {
}
func CreateUser(t *testing.T, client HTTPClient, user string) string {
+ t.Helper()
req, _ := client.NewRequest("POST", "user/"+user, nil)
response, err := client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
@@ -317,6 +337,7 @@ func CreateUser(t *testing.T, client HTTPClient, user string) string {
}
func CreateUserWithRole(t *testing.T, client HTTPClient, user string, roles []string) string {
+ t.Helper()
payload, _ := json.Marshal(roles)
req, _ := client.NewRequest("POST", "user/"+user, bytes.NewBuffer(payload))
response, err := client.Do(req)
@@ -327,6 +348,7 @@ func CreateUserWithRole(t *testing.T, client HTTPClient, user string, roles []st
}
func AssignRolesToUser(t *testing.T, client HTTPClient, user string, roles []string) {
+ t.Helper()
payload, _ := json.Marshal(roles)
req, _ := client.NewRequest("PUT", "user/"+user+"/role", bytes.NewBuffer(payload))
response, err := client.Do(req)
@@ -335,6 +357,7 @@ func AssignRolesToUser(t *testing.T, client HTTPClient, user string, roles []str
}
func AssertUserRole(t *testing.T, client HTTPClient, user string, roleName, roleBody string) {
+ t.Helper()
req, _ := client.NewRequest("GET", "user/"+user+"/role", nil)
response, err := client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
@@ -345,6 +368,7 @@ func AssertUserRole(t *testing.T, client HTTPClient, user string, roleName, role
}
func RegenPassword(t *testing.T, client HTTPClient, user string) string {
+ t.Helper()
req, _ := client.NewRequest("POST", "user/"+user+"/generate-new-password", nil)
response, err := client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
@@ -354,6 +378,7 @@ func RegenPassword(t *testing.T, client HTTPClient, user string) string {
}
func SetUserRole(t *testing.T, client HTTPClient, user string, roles []string) {
+ t.Helper()
payload, _ := json.Marshal(roles)
req, _ := client.NewRequest("PUT", "user/"+user+"/role", bytes.NewBuffer(payload))
response, err := client.Do(req)
@@ -362,6 +387,7 @@ func SetUserRole(t *testing.T, client HTTPClient, user string, roles []string) {
}
func DeleteUser(t *testing.T, client HTTPClient, user string) {
+ t.Helper()
req, _ := client.NewRequest("DELETE", "user/"+user, nil)
response, err := client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
@@ -369,6 +395,7 @@ func DeleteUser(t *testing.T, client HTTPClient, user string) {
}
func DeleteRole(t *testing.T, client HTTPClient, roleName string) {
+ t.Helper()
req, _ := client.NewRequest("DELETE", "role/"+roleName, nil)
response, err := client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
@@ -376,6 +403,7 @@ func DeleteRole(t *testing.T, client HTTPClient, roleName string) {
}
func SetDefaultRole(t *testing.T, client HTTPClient, roleName string) {
+ t.Helper()
payload, _ := json.Marshal(roleName)
req, _ := client.NewRequest("PUT", "role/default", bytes.NewBuffer(payload))
response, err := client.Do(req)
@@ -384,6 +412,7 @@ func SetDefaultRole(t *testing.T, client HTTPClient, roleName string) {
}
func AssertDefaultRole(t *testing.T, client HTTPClient, roleName string) {
+ t.Helper()
req, _ := client.NewRequest("GET", "role/default", nil)
response, err := client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
@@ -393,6 +422,7 @@ func AssertDefaultRole(t *testing.T, client HTTPClient, roleName string) {
}
func PutSingleEventExpectErr(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
payload := `{
"id": "id;objectId",
"maxRunDistance": "float;1;20;1",
@@ -415,6 +445,7 @@ func PutSingleEventExpectErr(t *testing.T, client HTTPClient, stream string) {
}
func PutSingleEvent(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
payload := `{
"id": "id;objectId",
"maxRunDistance": "float;1;20;1",
@@ -437,6 +468,7 @@ func PutSingleEvent(t *testing.T, client HTTPClient, stream string) {
}
func checkAPIAccess(t *testing.T, queryClient HTTPClient, ingestClient HTTPClient, stream string, role string) {
+ t.Helper()
switch role {
case "editor":
// Check access to non-protected API
@@ -512,3 +544,655 @@ func checkAPIAccess(t *testing.T, queryClient HTTPClient, ingestClient HTTPClien
require.Equalf(t, 403, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, readAsString(response.Body))
}
}
+
+// --- New utility functions for expanded test coverage ---
+
+// Health check utilities
+func AssertLiveness(t *testing.T, client HTTPClient) {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "liveness", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Liveness request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Liveness check failed with status: %s", response.Status)
+}
+
+func AssertLivenessHead(t *testing.T, client HTTPClient) {
+ t.Helper()
+ req, _ := client.NewRequest("HEAD", "liveness", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "HEAD liveness request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "HEAD liveness check failed with status: %s", response.Status)
+}
+
+func AssertReadiness(t *testing.T, client HTTPClient) {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "readiness", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Readiness request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Readiness check failed with status: %s", response.Status)
+}
+
+func AssertReadinessHead(t *testing.T, client HTTPClient) {
+ t.Helper()
+ req, _ := client.NewRequest("HEAD", "readiness", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "HEAD readiness request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "HEAD readiness check failed with status: %s", response.Status)
+}
+
+// Stream info and stats utilities
+func AssertStreamInfo(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "logstream/"+stream+"/info", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Stream info request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Stream info failed with status: %s, body: %s", response.Status, body)
+ require.NotEmptyf(t, body, "Stream info response should not be empty")
+}
+
+func AssertStreamStats(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "logstream/"+stream+"/stats", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Stream stats request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Stream stats failed with status: %s, body: %s", response.Status, body)
+ require.NotEmptyf(t, body, "Stream stats response should not be empty")
+}
+
+// OTel ingestion utilities
+func IngestOTelLogs(t *testing.T, client HTTPClient) {
+ t.Helper()
+ payload := getOTelLogPayload()
+ req, _ := client.NewOTelRequest("POST", "logs", strings.NewReader(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "OTel logs ingestion request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "OTel logs ingestion failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+func IngestOTelTraces(t *testing.T, client HTTPClient) {
+ t.Helper()
+ payload := getOTelTracePayload()
+ req, _ := client.NewOTelRequest("POST", "traces", strings.NewReader(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "OTel traces ingestion request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "OTel traces ingestion failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+func IngestOTelMetrics(t *testing.T, client HTTPClient) {
+ t.Helper()
+ payload := getOTelMetricPayload()
+ req, _ := client.NewOTelRequest("POST", "metrics", strings.NewReader(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "OTel metrics ingestion request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "OTel metrics ingestion failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+// Alert lifecycle utilities
+func GetAlertById(t *testing.T, client HTTPClient, alertId string) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "alerts/"+alertId, nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Get alert by ID request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Get alert by ID failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+func ModifyAlert(t *testing.T, client HTTPClient, alertId string, payload string) {
+ t.Helper()
+ req, _ := client.NewRequest("PUT", "alerts/"+alertId, strings.NewReader(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Modify alert request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Modify alert failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+func DisableAlert(t *testing.T, client HTTPClient, alertId string) {
+ t.Helper()
+ req, _ := client.NewRequest("PUT", "alerts/"+alertId+"/disable", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Disable alert request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Disable alert failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+func EnableAlert(t *testing.T, client HTTPClient, alertId string) {
+ t.Helper()
+ req, _ := client.NewRequest("PUT", "alerts/"+alertId+"/enable", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Enable alert request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Enable alert failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+func ListAlertTags(t *testing.T, client HTTPClient) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "alerts/tags", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "List alert tags request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "List alert tags failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+func EvaluateAlert(t *testing.T, client HTTPClient, alertId string) {
+ t.Helper()
+ req, _ := client.NewRequest("POST", "alerts/"+alertId+"/evaluate", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Evaluate alert request failed: %s", err)
+ body := readAsString(response.Body)
+ // Evaluate may return 200 or other success codes depending on state
+ require.Containsf(t, []int{200, 202}, response.StatusCode, "Evaluate alert returned unexpected status: %s, body: %s", response.Status, body)
+}
+
+// Dashboard CRUD utilities
+func CreateDashboard(t *testing.T, client HTTPClient) string {
+ t.Helper()
+ payload := getDashboardCreateBody()
+ req, _ := client.NewRequest("POST", "dashboards", strings.NewReader(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Create dashboard request failed: %s", err)
+ body, _ := io.ReadAll(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Create dashboard failed with status: %s, body: %s", response.Status, string(body))
+ reader := bytes.NewReader(body)
+ return getIdFromDashboardResponse(reader)
+}
+
+func ListDashboards(t *testing.T, client HTTPClient) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "dashboards", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "List dashboards request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "List dashboards failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+func GetDashboardById(t *testing.T, client HTTPClient, dashboardId string) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "dashboards/"+dashboardId, nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Get dashboard by ID request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Get dashboard failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+func UpdateDashboard(t *testing.T, client HTTPClient, dashboardId string) {
+ t.Helper()
+ payload := getDashboardUpdateBody()
+ req, _ := client.NewRequest("PUT", "dashboards/"+dashboardId, strings.NewReader(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Update dashboard request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Update dashboard failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+func AddDashboardTile(t *testing.T, client HTTPClient, dashboardId string, stream string) {
+ t.Helper()
+ payload := getDashboardAddTileBody(stream)
+ req, _ := client.NewRequest("PUT", "dashboards/"+dashboardId+"/add_tile", strings.NewReader(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Add dashboard tile request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Add tile failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+func ListDashboardTags(t *testing.T, client HTTPClient) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "dashboards/list_tags", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "List dashboard tags request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "List dashboard tags failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+func DeleteDashboard(t *testing.T, client HTTPClient, dashboardId string) {
+ t.Helper()
+ req, _ := client.NewRequest("DELETE", "dashboards/"+dashboardId, nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Delete dashboard request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Delete dashboard failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+// Filter CRUD utilities
+func CreateFilter(t *testing.T, client HTTPClient, stream string) string {
+ t.Helper()
+ payload := getFilterCreateBody(stream)
+ req, _ := client.NewRequest("POST", "filters", strings.NewReader(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Create filter request failed: %s", err)
+ body, _ := io.ReadAll(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Create filter failed with status: %s, body: %s", response.Status, string(body))
+ reader := bytes.NewReader(body)
+ return getIdFromFilterResponse(reader)
+}
+
+func ListFilters(t *testing.T, client HTTPClient) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "filters", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "List filters request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "List filters failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+func GetFilterById(t *testing.T, client HTTPClient, filterId string) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "filters/"+filterId, nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Get filter by ID request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Get filter failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+func UpdateFilter(t *testing.T, client HTTPClient, filterId string, stream string) {
+ t.Helper()
+ payload := getFilterUpdateBody(stream)
+ req, _ := client.NewRequest("PUT", "filters/"+filterId, strings.NewReader(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Update filter request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Update filter failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+func DeleteFilter(t *testing.T, client HTTPClient, filterId string) {
+ t.Helper()
+ req, _ := client.NewRequest("DELETE", "filters/"+filterId, nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Delete filter request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Delete filter failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+// Prism API utilities
+func AssertPrismHome(t *testing.T, client HTTPClient) {
+ t.Helper()
+ req, _ := client.NewPrismRequest("GET", "home", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Prism home request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Prism home failed with status: %s, body: %s", response.Status, body)
+}
+
+func AssertPrismLogstreamInfo(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
+ req, _ := client.NewPrismRequest("GET", "logstream/"+stream+"/info", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Prism logstream info request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Prism logstream info failed with status: %s, body: %s", response.Status, body)
+}
+
+func AssertPrismDatasets(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
+ payload := getPrismDatasetsBody(stream)
+ req, _ := client.NewPrismRequest("POST", "datasets", strings.NewReader(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Prism datasets request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Prism datasets failed with status: %s, body: %s", response.Status, body)
+}
+
+// RBAC add/remove role utilities
+func AddRolesToUser(t *testing.T, client HTTPClient, user string, roles []string) {
+ t.Helper()
+ payload, _ := json.Marshal(roles)
+ req, _ := client.NewRequest("PATCH", "user/"+user+"/role/add", bytes.NewBuffer(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Add roles request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Add roles failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+func RemoveRolesFromUser(t *testing.T, client HTTPClient, user string, roles []string) {
+ t.Helper()
+ payload, _ := json.Marshal(roles)
+ req, _ := client.NewRequest("PATCH", "user/"+user+"/role/remove", bytes.NewBuffer(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Remove roles request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Remove roles failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+func ListAllRoles(t *testing.T, client HTTPClient) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "roles", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "List roles request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "List roles failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+// Target CRUD utilities
+func ListTargets(t *testing.T, client HTTPClient) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "targets", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "List targets request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "List targets failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+func GetTargetById(t *testing.T, client HTTPClient, targetId string) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "targets/"+targetId, nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Get target by ID request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Get target failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+func UpdateTarget(t *testing.T, client HTTPClient, targetId string) {
+ t.Helper()
+ payload := getTargetUpdateBody()
+ req, _ := client.NewRequest("PUT", "targets/"+targetId, strings.NewReader(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Update target request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Update target failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+// About, metrics, demo data utilities
+func AssertAbout(t *testing.T, client HTTPClient) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "about", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "About request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "About failed with status: %s, body: %s", response.Status, body)
+ require.NotEmptyf(t, body, "About response should not be empty")
+ return body
+}
+
+func AssertMetrics(t *testing.T, client HTTPClient) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "metrics", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Metrics request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Metrics failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+func AssertDemoData(t *testing.T, client HTTPClient) {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "demodata", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Demo data request failed: %s", err)
+ require.Containsf(t, []int{200, 404}, response.StatusCode, "Demo data returned unexpected status: %s", response.Status)
+}
+
+// Hot tier utilities
+func SetStreamHotTier(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
+ payload := getHotTierBody()
+ req, _ := client.NewRequest("PUT", "logstream/"+stream+"/hottier", strings.NewReader(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Set hot tier request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Set hot tier failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+func GetStreamHotTier(t *testing.T, client HTTPClient, stream string) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "logstream/"+stream+"/hottier", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Get hot tier request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Get hot tier failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+func DeleteStreamHotTier(t *testing.T, client HTTPClient, stream string) {
+ t.Helper()
+ req, _ := client.NewRequest("DELETE", "logstream/"+stream+"/hottier", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Delete hot tier request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Delete hot tier failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+// Dataset stats utility
+func AssertDatasetStats(t *testing.T, client HTTPClient, streams []string) string {
+ t.Helper()
+ payload := getDatasetStatsBody(streams)
+ req, _ := client.NewRequest("POST", "dataset_stats", strings.NewReader(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Dataset stats request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Dataset stats failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+// Negative test utilities
+func AssertQueryError(t *testing.T, client HTTPClient, query string, expectedStatus int) {
+ t.Helper()
+ endTime := time.Now().Add(time.Second).Format(time.RFC3339Nano)
+ startTime := time.Now().Add(-30 * time.Minute).Format(time.RFC3339Nano)
+
+ queryJSON, _ := json.Marshal(map[string]interface{}{
+ "query": query,
+ "startTime": startTime,
+ "endTime": endTime,
+ })
+
+ req, _ := client.NewRequest("POST", "query", bytes.NewBuffer(queryJSON))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Request failed: %s", err)
+ require.Equalf(t, expectedStatus, response.StatusCode, "Expected status %d but got %s for query: %s", expectedStatus, response.Status, query)
+}
+
+func AssertIngestError(t *testing.T, client HTTPClient, stream string, payload string, expectedStatus int) {
+ t.Helper()
+ req, _ := client.NewRequest("POST", "ingest", bytes.NewBufferString(payload))
+ req.Header.Add("X-P-Stream", stream)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Request failed: %s", err)
+ require.Equalf(t, expectedStatus, response.StatusCode, "Expected status %d but got %s for ingest to stream %s", expectedStatus, response.Status, stream)
+}
+
+// Concurrent ingest+query utility
+func ConcurrentIngestAndQuery(t *testing.T, ingestClient HTTPClient, queryClient HTTPClient, stream string, ingestCount int, queryConcurrency int) {
+ t.Helper()
+ var wg sync.WaitGroup
+ var errors int64
+
+ // Ingest goroutine
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for i := 0; i < ingestCount; i++ {
+ payload := fmt.Sprintf(`[{"level":"info","message":"concurrent test %d","host":"test-host"}]`, i)
+ req, _ := ingestClient.NewRequest("POST", "ingest", bytes.NewBufferString(payload))
+ req.Header.Add("X-P-Stream", stream)
+ response, err := ingestClient.Do(req)
+ if err != nil || response.StatusCode >= 500 {
+ atomic.AddInt64(&errors, 1)
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+ }()
+
+ // Query goroutines
+ for i := 0; i < queryConcurrency; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for j := 0; j < 5; j++ {
+ endTime := time.Now().Add(time.Second).Format(time.RFC3339Nano)
+ startTime := time.Now().Add(-30 * time.Minute).Format(time.RFC3339Nano)
+ queryJSON, _ := json.Marshal(map[string]interface{}{
+ "query": "SELECT COUNT(*) as count FROM " + stream,
+ "startTime": startTime,
+ "endTime": endTime,
+ })
+ req, _ := queryClient.NewRequest("POST", "query", bytes.NewBuffer(queryJSON))
+ response, err := queryClient.Do(req)
+ if err != nil || response.StatusCode >= 500 {
+ atomic.AddInt64(&errors, 1)
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ }()
+ }
+
+ wg.Wait()
+ require.Equalf(t, int64(0), errors, "Concurrent ingest+query had %d 500-level errors", errors)
+}
+
+// Cluster API utilities (for distributed mode)
+func AssertClusterInfo(t *testing.T, client HTTPClient) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "cluster/info", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Cluster info request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Cluster info failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+func AssertClusterMetrics(t *testing.T, client HTTPClient) string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "cluster/metrics", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Cluster metrics request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Cluster metrics failed with status: %s, body: %s", response.Status, body)
+ return body
+}
+
+// LLM endpoint utility
+func AssertLLMEndpointNoAPIKey(t *testing.T, client HTTPClient) {
+ t.Helper()
+ payload := `{"prompt": "test"}`
+ req, _ := client.NewRequest("POST", "llm", strings.NewReader(payload))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "LLM request failed: %s", err)
+ // Without API key, should return an error status (400 or 401 or 500)
+ require.NotEqualf(t, 200, response.StatusCode, "LLM endpoint should fail without API key, got 200")
+}
+
+// --- Use case test utilities ---
+
+func SetRetention(t *testing.T, client HTTPClient, stream string, body string) {
+ t.Helper()
+ req, _ := client.NewRequest("PUT", "logstream/"+stream+"/retention", strings.NewReader(body))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Set retention request failed: %s", err)
+ require.Equalf(t, 200, response.StatusCode, "Set retention failed with status: %s, body: %s", response.Status, readAsString(response.Body))
+}
+
+func AssertRetention(t *testing.T, client HTTPClient, stream string, expectedBody string) {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "logstream/"+stream+"/retention", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Get retention request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Get retention failed with status: %s, body: %s", response.Status, body)
+ require.JSONEq(t, expectedBody, body, "Retention config doesn't match expected")
+}
+
+func AssertForbidden(t *testing.T, client HTTPClient, method, path string, body io.Reader) {
+ t.Helper()
+ req, _ := client.NewRequest(method, path, body)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Request failed: %s", err)
+ respBody := readAsString(response.Body)
+ require.Equalf(t, 403, response.StatusCode, "Expected 403 Forbidden for %s %s, got status %d (%s), body: %s", method, path, response.StatusCode, response.Status, respBody)
+}
+
+func IngestCustomPayload(t *testing.T, client HTTPClient, stream string, payload string, expectedStatus int) {
+ t.Helper()
+ req, _ := client.NewRequest("POST", "ingest", bytes.NewBufferString(payload))
+ req.Header.Add("X-P-Stream", stream)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Ingest request failed: %s", err)
+ require.Equalf(t, expectedStatus, response.StatusCode, "Expected status %d for ingest to %s, got: %s, body: %s", expectedStatus, stream, response.Status, readAsString(response.Body))
+}
+
+func DiscoverOTelStreams(t *testing.T, client HTTPClient) []string {
+ t.Helper()
+ req, _ := client.NewRequest("GET", "logstream", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "List streams request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "List streams failed with status: %s", response.Status)
+
+ var otelStreams []string
+ var items []interface{}
+ if err := json.Unmarshal([]byte(body), &items); err != nil {
+ t.Logf("Could not parse stream list: %s", err)
+ return nil
+ }
+ for _, item := range items {
+ var name string
+ switch v := item.(type) {
+ case string:
+ name = v
+ case map[string]interface{}:
+ if n, ok := v["name"].(string); ok {
+ name = n
+ }
+ }
+ if name == "" {
+ continue
+ }
+ lower := strings.ToLower(name)
+ if strings.Contains(lower, "otel") || strings.Contains(lower, "opentelemetry") {
+ otelStreams = append(otelStreams, name)
+ }
+ }
+ return otelStreams
+}
+
+func ConcurrentMultiStreamIngest(t *testing.T, client HTTPClient, streams []string, eventsPerStream int) {
+ t.Helper()
+ var wg sync.WaitGroup
+ var errors int64
+
+ for _, stream := range streams {
+ wg.Add(1)
+ go func(s string) {
+ defer wg.Done()
+ for i := 0; i < eventsPerStream; i++ {
+ payload := fmt.Sprintf(`[{"level":"info","message":"concurrent multi-stream test %d","host":"test-host-%s"}]`, i, s)
+ req, _ := client.NewRequest("POST", "ingest", bytes.NewBufferString(payload))
+ req.Header.Add("X-P-Stream", s)
+ resp, err := client.Do(req)
+ if err != nil {
+ atomic.AddInt64(&errors, 1)
+ continue
+ }
+ resp.Body.Close()
+ if resp.StatusCode >= 500 {
+ atomic.AddInt64(&errors, 1)
+ }
+ }
+ }(stream)
+ }
+
+ wg.Wait()
+ require.Equalf(t, int64(0), errors, "Concurrent multi-stream ingest had %d 500-level errors", errors)
+}
+
+func QueryLogStreamCountMinimum(t *testing.T, client HTTPClient, stream string, minCount int) {
+ t.Helper()
+ endTime := time.Now().Add(time.Second).Format(time.RFC3339Nano)
+ startTime := time.Now().Add(-30 * time.Minute).Format(time.RFC3339Nano)
+
+ query := map[string]interface{}{
+ "query": "select count(*) as count from " + stream,
+ "startTime": startTime,
+ "endTime": endTime,
+ }
+ queryJSON, _ := json.Marshal(query)
+ req, _ := client.NewRequest("POST", "query", bytes.NewBuffer(queryJSON))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Request failed: %s", err)
+ body := readAsString(response.Body)
+ require.Equalf(t, 200, response.StatusCode, "Query failed: %s, body: %s", response.Status, body)
+
+ var results []map[string]float64
+ err = json.Unmarshal([]byte(body), &results)
+ require.NoErrorf(t, err, "Failed to parse query response: %s", err)
+ require.NotEmptyf(t, results, "Query returned no results")
+ count := int(results[0]["count"])
+ require.GreaterOrEqualf(t, count, minCount, "Expected at least %d events in %s, got %d", minCount, stream, count)
+}
diff --git a/usecase_test.go b/usecase_test.go
new file mode 100644
index 0000000..cf02baa
--- /dev/null
+++ b/usecase_test.go
@@ -0,0 +1,659 @@
+// Copyright (c) 2023 Cloudnatively Services Pvt Ltd
+//
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+package main
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+)
+
+// createTargetGetID creates a notification target and returns the target ID
+// extracted from a subsequent listing of all targets.
+func createTargetGetID(t *testing.T, client HTTPClient) string {
+ t.Helper()
+ req, _ := client.NewRequest("POST", "targets", strings.NewReader(getTargetBody()))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Create target failed: %s", err)
+ // Fix: close the create-response body; only the status code is inspected.
+ defer response.Body.Close()
+ require.Equalf(t, 200, response.StatusCode, "Create target failed: %s", response.Status)
+
+ targetsResp := ListTargets(t, client)
+ return getIdFromTargetResponse(bytes.NewReader([]byte(targetsResp)))
+}
+
+// createAlertGetID creates an alert for the stream/target pair and returns
+// the ID of the first alert found in the alerts listing.
+func createAlertGetID(t *testing.T, client HTTPClient, stream, targetId string) string {
+ t.Helper()
+ alertBody := getAlertBody(stream, targetId)
+ req, _ := client.NewRequest("POST", "alerts", strings.NewReader(alertBody))
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Create alert failed: %s", err)
+ // Fix: close the create-response body; only the status code is inspected.
+ response.Body.Close()
+ require.Equalf(t, 200, response.StatusCode, "Create alert failed: %s", response.Status)
+
+ req, _ = client.NewRequest("GET", "alerts", nil)
+ response, err = client.Do(req)
+ require.NoErrorf(t, err, "List alerts failed: %s", err)
+ // Fix: close the listing body after it has been fully read.
+ defer response.Body.Close()
+ bodyAlerts, _ := io.ReadAll(response.Body)
+ alertId, _, _, _ := getMetadataFromAlertResponse(bytes.NewReader(bodyAlerts))
+ return alertId
+}
+
+// userQueryClient builds a per-user client for query-path requests by copying
+// the global query client and swapping in the given credentials.
+// NOTE(review): this assumes HTTPClient has value semantics, so the local
+// assignment copies it and NewGlob.QueryClient is left untouched — confirm
+// against the HTTPClient definition.
+func userQueryClient(username, password string) HTTPClient {
+ c := NewGlob.QueryClient
+ c.Username = username
+ c.Password = password
+ return c
+}
+
+// userIngestClient builds a per-user client for ingest-path requests: the
+// dedicated ingestor client when an ingestor URL is configured (distributed
+// mode), otherwise the query client (standalone mode). The chosen client is
+// copied with the given credentials substituted in.
+func userIngestClient(username, password string) HTTPClient {
+ c := NewGlob.QueryClient
+ if NewGlob.IngestorUrl.String() != "" {
+ c = NewGlob.IngestorClient
+ }
+ c.Username = username
+ c.Password = password
+ return c
+}
+
+// UC1: DevOps engineer onboards a new microservice onto Parseable.
+// End-to-end flow: create stream -> ingest via flog -> verify schema ->
+// retention -> alerting (target + alert) -> dashboard + tile -> saved
+// filter -> Prism visibility. All created resources are registered with the
+// ResourceTracker so they are cleaned up when the test ends.
+func TestUseCase_DevOpsNewMicroserviceOnboarding(t *testing.T) {
+ stream := UniqueStream("uc1_microservice")
+ client := NewGlob.QueryClient
+ rt := NewResourceTracker(t, client)
+
+ // 1. Create stream
+ CreateStream(t, client, stream)
+ rt.TrackStream(stream)
+
+ // 2. Ingest 50 structured logs via flog
+ RunFlogAuto(t, stream)
+
+ // 3. Wait for ingest (generous timeout to allow object-store sync)
+ WaitForIngest(t, client, stream, 50, 180*time.Second)
+
+ // 4. Verify schema matches expected
+ AssertStreamSchema(t, client, stream, FlogJsonSchema)
+
+ // 5. Set retention policy (20-day delete)
+ SetRetention(t, client, stream, RetentionBody)
+
+ // 6. Verify retention applied
+ AssertRetention(t, client, stream, RetentionBody)
+
+ // 7. Create alert target + alert on error level
+ targetId := createTargetGetID(t, client)
+ rt.TrackTarget(targetId)
+ alertId := createAlertGetID(t, client, stream, targetId)
+ rt.TrackAlert(alertId)
+
+ // 8. Create dashboard + add tile for the stream
+ dashboardId := CreateDashboard(t, client)
+ rt.TrackDashboard(dashboardId)
+ AddDashboardTile(t, client, dashboardId, stream)
+
+ // 9. Create saved filter for error queries
+ filterId := CreateFilter(t, client, stream)
+ rt.TrackFilter(filterId)
+
+ // 10. Verify via Prism (logstream info + datasets)
+ AssertPrismLogstreamInfo(t, client, stream)
+ AssertPrismDatasets(t, client, stream)
+}
+
+// UC2: SRE investigating production incident spanning two services.
+// Ingests flog data into two streams, then verifies cross-stream (union)
+// counting, analytical SQL (GROUP BY / DISTINCT), and dataset statistics.
+func TestUseCase_SREIncidentInvestigation(t *testing.T) {
+ stream1 := UniqueStream("uc2_frontend")
+ stream2 := UniqueStream("uc2_backend")
+ client := NewGlob.QueryClient
+ rt := NewResourceTracker(t, client)
+
+ // 1. Create two streams
+ CreateStream(t, client, stream1)
+ rt.TrackStream(stream1)
+ CreateStream(t, client, stream2)
+ rt.TrackStream(stream2)
+
+ // 2. Ingest flog data into both
+ RunFlogAuto(t, stream1)
+ RunFlogAuto(t, stream2)
+
+ // 3. Wait for ingest
+ WaitForIngest(t, client, stream1, 50, 180*time.Second)
+ WaitForIngest(t, client, stream2, 50, 180*time.Second)
+
+ // 4. Cross-stream union query: expects the combined count (50 + 50)
+ QueryTwoLogStreamCount(t, client, stream1, stream2, 100)
+
+ // 5. Run analytical queries (GROUP BY, DISTINCT on flog fields)
+ AssertQueryOK(t, client, "SELECT method, COUNT(*) as cnt FROM %s GROUP BY method", stream1)
+ AssertQueryOK(t, client, "SELECT DISTINCT host FROM %s", stream2)
+
+ // 6. Get dataset stats for both streams
+ AssertDatasetStats(t, client, []string{stream1, stream2})
+}
+
+// UC3: Platform team manages per-team stream access with multi-privilege roles.
+// Each team gets writer access to its own stream and reader access to the
+// other team's stream; the test verifies both the allowed (200) and denied
+// (403) paths, then grants and exercises an additional writer role via PATCH.
+func TestUseCase_PlatformTeamMultiTenantRBAC(t *testing.T) {
+ streamA := UniqueStream("uc3_tenant_a")
+ streamB := UniqueStream("uc3_tenant_b")
+ client := NewGlob.QueryClient
+ rt := NewResourceTracker(t, client)
+
+ // 1. Create two streams
+ CreateStream(t, client, streamA)
+ rt.TrackStream(streamA)
+ CreateStream(t, client, streamB)
+ rt.TrackStream(streamB)
+
+ // 2. Create role "team_a_role": writer on tenant_a + reader on tenant_b
+ teamARoleName := UniqueStream("team_a_role")
+ teamARoleBody := getMultiPrivilegeRoleBody(streamA, streamB)
+ CreateRole(t, client, teamARoleName, teamARoleBody)
+ rt.TrackRole(teamARoleName)
+
+ // 3. Create role "team_b_role": writer on tenant_b + reader on tenant_a
+ teamBRoleName := UniqueStream("team_b_role")
+ teamBRoleBody := getMultiPrivilegeRoleBody(streamB, streamA)
+ CreateRole(t, client, teamBRoleName, teamBRoleBody)
+ rt.TrackRole(teamBRoleName)
+
+ // 4. Create user per role
+ teamAUserName := UniqueStream("team_a_user")
+ teamBUserName := UniqueStream("team_b_user")
+ teamAPassword := CreateUserWithRole(t, client, teamAUserName, []string{teamARoleName})
+ rt.TrackUser(teamAUserName)
+ teamBPassword := CreateUserWithRole(t, client, teamBUserName, []string{teamBRoleName})
+ rt.TrackUser(teamBUserName)
+
+ // 5. Build client for team_a user
+ teamAQuery := userQueryClient(teamAUserName, teamAPassword)
+ teamAIngest := userIngestClient(teamAUserName, teamAPassword)
+
+ // 6. Assert team_a can ingest to tenant_a (200)
+ IngestCustomPayload(t, teamAIngest, streamA, `[{"level":"info","message":"team_a log","host":"10.0.0.1"}]`, 200)
+
+ // 7. Assert team_a cannot delete tenant_a (403)
+ AssertForbidden(t, teamAQuery, "DELETE", "logstream/"+streamA, nil)
+
+ // 8. Assert team_a can read tenant_b but cannot ingest (403)
+ AssertQueryOK(t, teamAQuery, "SELECT COUNT(*) as count FROM %s", streamB)
+ IngestCustomPayload(t, teamAIngest, streamB, `[{"level":"info","message":"should fail","host":"10.0.0.2"}]`, 403)
+
+ // 9. Build client for team_b user
+ teamBIngest := userIngestClient(teamBUserName, teamBPassword)
+
+ // 10. Assert team_b can ingest to tenant_b (200)
+ IngestCustomPayload(t, teamBIngest, streamB, `[{"level":"info","message":"team_b log","host":"10.0.0.3"}]`, 200)
+
+ // 11. Assert team_b cannot ingest to tenant_a (403)
+ IngestCustomPayload(t, teamBIngest, streamA, `[{"level":"info","message":"should fail","host":"10.0.0.4"}]`, 403)
+
+ // 12. Add writer role for tenant_b to team_a via PATCH
+ writerBRoleName := UniqueStream("uc3_writer_b")
+ writerBRoleBody := RoleWriter(streamB)
+ CreateRole(t, client, writerBRoleName, writerBRoleBody)
+ rt.TrackRole(writerBRoleName)
+ AddRolesToUser(t, client, teamAUserName, []string{writerBRoleName})
+
+ // 13. Verify team_a can NOW ingest to tenant_b (200)
+ IngestCustomPayload(t, teamAIngest, streamB, `[{"level":"info","message":"team_a now can write to b","host":"10.0.0.5"}]`, 200)
+
+ // 14. Remove the added role to restore team_a's original privileges
+ RemoveRolesFromUser(t, client, teamAUserName, []string{writerBRoleName})
+}
+
+// UC4: Security auditor verifying RBAC enforcement per role type.
+// Creates one user per built-in role shape (editor, writer, reader,
+// ingestor) and asserts the exact allow/deny matrix for query, ingest,
+// and delete operations on a single stream.
+func TestUseCase_SecurityAuditAccessControls(t *testing.T) {
+ stream := UniqueStream("uc4_audit")
+ client := NewGlob.QueryClient
+ rt := NewResourceTracker(t, client)
+
+ // 1. Create stream, ingest data, wait for sync
+ CreateStream(t, client, stream)
+ rt.TrackStream(stream)
+ RunFlogAuto(t, stream)
+ WaitForIngest(t, client, stream, 50, 180*time.Second)
+
+ // 2. Create four roles
+ editorRole := UniqueStream("uc4_editor")
+ writerRole := UniqueStream("uc4_writer")
+ readerRole := UniqueStream("uc4_reader")
+ ingestorRole := UniqueStream("uc4_ingestor")
+ CreateRole(t, client, editorRole, RoleEditor)
+ rt.TrackRole(editorRole)
+ CreateRole(t, client, writerRole, RoleWriter(stream))
+ rt.TrackRole(writerRole)
+ CreateRole(t, client, readerRole, RoleReader(stream))
+ rt.TrackRole(readerRole)
+ CreateRole(t, client, ingestorRole, Roleingestor(stream))
+ rt.TrackRole(ingestorRole)
+
+ // 3. Create four users, one per role
+ editorUser := UniqueStream("uc4_editor_u")
+ writerUser := UniqueStream("uc4_writer_u")
+ readerUser := UniqueStream("uc4_reader_u")
+ ingestorUser := UniqueStream("uc4_ingestor_u")
+ editorPass := CreateUserWithRole(t, client, editorUser, []string{editorRole})
+ rt.TrackUser(editorUser)
+ writerPass := CreateUserWithRole(t, client, writerUser, []string{writerRole})
+ rt.TrackUser(writerUser)
+ readerPass := CreateUserWithRole(t, client, readerUser, []string{readerRole})
+ rt.TrackUser(readerUser)
+ ingestorPass := CreateUserWithRole(t, client, ingestorUser, []string{ingestorRole})
+ rt.TrackUser(ingestorUser)
+
+ // 4. Build clients (query-path and ingest-path per user)
+ readerQ := userQueryClient(readerUser, readerPass)
+ readerI := userIngestClient(readerUser, readerPass)
+ writerQ := userQueryClient(writerUser, writerPass)
+ writerI := userIngestClient(writerUser, writerPass)
+ ingestorQ := userQueryClient(ingestorUser, ingestorPass)
+ ingestorI := userIngestClient(ingestorUser, ingestorPass)
+ editorQ := userQueryClient(editorUser, editorPass)
+ editorI := userIngestClient(editorUser, editorPass)
+
+ // 5. Reader: can query (200), cannot ingest (403), cannot delete (403)
+ AssertQueryOK(t, readerQ, "SELECT COUNT(*) FROM %s", stream)
+ IngestCustomPayload(t, readerI, stream, `[{"host":"test"}]`, 403)
+ AssertForbidden(t, readerQ, "DELETE", "logstream/"+stream, nil)
+
+ // 6. Writer: can query (200), can ingest (200), cannot delete (403)
+ AssertQueryOK(t, writerQ, "SELECT COUNT(*) FROM %s", stream)
+ IngestCustomPayload(t, writerI, stream, `[{"host":"test"}]`, 200)
+ AssertForbidden(t, writerQ, "DELETE", "logstream/"+stream, nil)
+
+ // 7. Ingestor: can ingest (200), cannot query (403)
+ IngestCustomPayload(t, ingestorI, stream, `[{"host":"test"}]`, 200)
+ AssertQueryError(t, ingestorQ, "SELECT COUNT(*) FROM "+stream, 403)
+
+ // 8. Editor: can do all operations (200)
+ AssertQueryOK(t, editorQ, "SELECT COUNT(*) FROM %s", stream)
+ IngestCustomPayload(t, editorI, stream, `[{"host":"test"}]`, 200)
+ AssertStreamInfo(t, editorQ, stream)
+
+ // 9. Test default role assignment behavior
+ // NOTE(review): this changes the server-wide default role and never
+ // restores it, so tests running afterwards may observe readerRole as
+ // the default — confirm whether cleanup should reset it.
+ SetDefaultRole(t, client, readerRole)
+ AssertDefaultRole(t, client, fmt.Sprintf(`"%s"`, readerRole))
+
+ // 10. Verify listing all roles includes the four created
+ rolesBody := ListAllRoles(t, client)
+ require.Contains(t, rolesBody, editorRole)
+ require.Contains(t, rolesBody, writerRole)
+ require.Contains(t, rolesBody, readerRole)
+ require.Contains(t, rolesBody, ingestorRole)
+}
+
+// UC5: DevOps engineer sets up complete alerting pipeline with real data.
+// Exercises the full alert lifecycle: create target and alert, fetch by ID,
+// modify, disable/enable, evaluate, tag listing, and stream-stats checks.
+func TestUseCase_AlertLifecycleWithDataIngestion(t *testing.T) {
+ stream := UniqueStream("uc5_alertpipe")
+ client := NewGlob.QueryClient
+ rt := NewResourceTracker(t, client)
+
+ // 1. Create stream, ingest data with known patterns
+ CreateStream(t, client, stream)
+ rt.TrackStream(stream)
+ RunFlogAuto(t, stream)
+
+ // 2. Wait for ingest, verify data queryable
+ WaitForIngest(t, client, stream, 50, 180*time.Second)
+
+ // 3. Create webhook target
+ targetId := createTargetGetID(t, client)
+ rt.TrackTarget(targetId)
+
+ // 4. Create alert with threshold on the stream
+ alertId := createAlertGetID(t, client, stream, targetId)
+ rt.TrackAlert(alertId)
+
+ // 5. Get alert by ID, verify state field present
+ alertDetails := GetAlertById(t, client, alertId)
+ require.Contains(t, alertDetails, `"state"`, "Alert response should contain state field")
+
+ // 6. Modify alert (change severity, threshold)
+ modifyBody := getAlertModifyBody(stream, targetId)
+ ModifyAlert(t, client, alertId, modifyBody)
+
+ // 7. Get alert again, verify fields updated
+ alertDetails = GetAlertById(t, client, alertId)
+ require.Contains(t, alertDetails, "Modified", "Alert title should reflect modification")
+
+ // 8. Disable alert, verify state
+ DisableAlert(t, client, alertId)
+
+ // 9. Enable alert, verify state
+ EnableAlert(t, client, alertId)
+
+ // 10. Evaluate alert; the status is logged rather than asserted because
+ // evaluation outcome depends on server-side timing.
+ req, _ := client.NewRequest("PUT", "alerts/"+alertId+"/evaluate_alert", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Evaluate alert request failed: %s", err)
+ // Fix: close the response body so the connection can be reused.
+ response.Body.Close()
+ t.Logf("Evaluate alert returned status: %d", response.StatusCode)
+
+ // 11. List alert tags, verify "quest-test" tag exists
+ tagsBody := ListAlertTags(t, client)
+ require.Contains(t, tagsBody, "quest-test", "Alert tags should include quest-test")
+
+ // 12. Get stream info + stats to confirm ingestion metrics
+ AssertStreamInfo(t, client, stream)
+ AssertStreamStats(t, client, stream)
+}
+
+// UC6: Platform engineer sets up OTel-based observability for all signal types.
+// Ingests logs, traces, and metrics through the OTLP endpoints, then queries
+// whatever streams the server auto-created for them.
+func TestUseCase_OTelFullPipelineIngestion(t *testing.T) {
+ client := NewGlob.QueryClient
+
+ // 1. Health check (liveness + readiness)
+ AssertLiveness(t, client)
+ AssertReadiness(t, client)
+
+ // 2. Ingest OTel logs via /v1/logs
+ IngestOTelLogs(t, client)
+
+ // 3. Ingest OTel traces via /v1/traces
+ IngestOTelTraces(t, client)
+
+ // 4. Ingest OTel metrics via /v1/metrics
+ IngestOTelMetrics(t, client)
+
+ // 5. Wait for OTel data to land
+ // NOTE(review): fixed 5s sleep is a timing heuristic; a polling wait
+ // (like WaitForIngest) would be less flaky on slow backends.
+ time.Sleep(5 * time.Second)
+
+ // 6. Discover auto-created OTel streams
+ otelStreams := DiscoverOTelStreams(t, client)
+ t.Logf("Discovered %d OTel streams: %v", len(otelStreams), otelStreams)
+
+ // 7. Query each OTel stream to verify data exists; stream names are
+ // quoted because OTel stream names may need SQL identifier quoting.
+ for _, stream := range otelStreams {
+ AssertQueryOK(t, client, "SELECT COUNT(*) as count FROM \"%s\"", stream)
+ }
+
+ // 8. Get stream info for OTel streams
+ for _, stream := range otelStreams {
+ AssertStreamInfo(t, client, stream)
+ }
+
+ // 9. Check dataset stats for OTel streams (skipped when none discovered)
+ if len(otelStreams) > 0 {
+ AssertDatasetStats(t, client, otelStreams)
+ }
+}
+
+// UC7: Data engineer managing data lifecycle and tiering.
+// Applies a retention policy, then sets, verifies, and deletes a hot-tier
+// configuration, checking that queries keep working throughout.
+func TestUseCase_DataEngineerRetentionAndHotTier(t *testing.T) {
+ stream := UniqueStream("uc7_retention")
+ client := NewGlob.QueryClient
+ rt := NewResourceTracker(t, client)
+
+ // 1. Create stream, ingest data
+ CreateStream(t, client, stream)
+ rt.TrackStream(stream)
+ RunFlogAuto(t, stream)
+
+ // 2. Wait for ingest
+ WaitForIngest(t, client, stream, 50, 180*time.Second)
+
+ // 3. Set retention policy
+ SetRetention(t, client, stream, RetentionBody)
+
+ // 4. Verify retention applied
+ AssertRetention(t, client, stream, RetentionBody)
+
+ // 5. Get stream info and stats
+ AssertStreamInfo(t, client, stream)
+ AssertStreamStats(t, client, stream)
+
+ // 6. Set hot tier config
+ SetStreamHotTier(t, client, stream)
+
+ // 7. Verify hot tier config
+ GetStreamHotTier(t, client, stream)
+
+ // 8. Query again to ensure hot tier doesn't break queries
+ AssertQueryOK(t, client, "SELECT COUNT(*) FROM %s", stream)
+
+ // 9. Delete hot tier
+ DeleteStreamHotTier(t, client, stream)
+}
+
+// UC8: Team lead builds a multi-stream monitoring dashboard.
+// Creates three streams with data, assembles a dashboard with one tile per
+// stream, updates its metadata, and verifies tags, filters, and Prism views.
+func TestUseCase_DashboardDrivenMonitoringSetup(t *testing.T) {
+ streamWeb := UniqueStream("uc8_web")
+ streamAPI := UniqueStream("uc8_api")
+ streamDB := UniqueStream("uc8_db")
+ client := NewGlob.QueryClient
+ rt := NewResourceTracker(t, client)
+
+ // 1. Create three streams
+ CreateStream(t, client, streamWeb)
+ rt.TrackStream(streamWeb)
+ CreateStream(t, client, streamAPI)
+ rt.TrackStream(streamAPI)
+ CreateStream(t, client, streamDB)
+ rt.TrackStream(streamDB)
+
+ // 2. Ingest data into all three
+ RunFlogAuto(t, streamWeb)
+ RunFlogAuto(t, streamAPI)
+ RunFlogAuto(t, streamDB)
+
+ // 3. Wait for ingest
+ WaitForIngest(t, client, streamWeb, 50, 180*time.Second)
+ WaitForIngest(t, client, streamAPI, 50, 180*time.Second)
+ WaitForIngest(t, client, streamDB, 50, 180*time.Second)
+
+ // 4. Create dashboard
+ dashboardId := CreateDashboard(t, client)
+ rt.TrackDashboard(dashboardId)
+
+ // 5. Add tiles for each stream
+ AddDashboardTile(t, client, dashboardId, streamWeb)
+ AddDashboardTile(t, client, dashboardId, streamAPI)
+ AddDashboardTile(t, client, dashboardId, streamDB)
+
+ // 6. Update dashboard metadata
+ UpdateDashboard(t, client, dashboardId)
+
+ // 7. List dashboard tags, verify present
+ tagsBody := ListDashboardTags(t, client)
+ require.Contains(t, tagsBody, "quest-test", "Dashboard tags should include quest-test")
+
+ // 8. Create saved filter for web errors
+ filterId := CreateFilter(t, client, streamWeb)
+ rt.TrackFilter(filterId)
+
+ // 9. Verify Prism datasets for each stream
+ AssertPrismDatasets(t, client, streamWeb)
+ AssertPrismDatasets(t, client, streamAPI)
+ AssertPrismDatasets(t, client, streamDB)
+
+ // 10. Get dashboard by ID, verify structure
+ dashBody := GetDashboardById(t, client, dashboardId)
+ require.Contains(t, dashBody, "Quest Test Dashboard", "Dashboard should contain expected title")
+}
+
+// UC10: Developer's microservice starts sending logs with new fields.
+// Verifies a dynamic-schema stream evolves when an event with a new field
+// arrives, while a static-schema stream rejects the same event (400) and
+// keeps its schema unchanged.
+func TestUseCase_StreamSchemaEvolution(t *testing.T) {
+ dynamicStream := UniqueStream("uc10_dyn")
+ staticStream := UniqueStream("uc10_static")
+ client := NewGlob.QueryClient
+ rt := NewResourceTracker(t, client)
+ ic := NewGlob.QueryClient
+ if NewGlob.IngestorUrl.String() != "" {
+ ic = NewGlob.IngestorClient
+ }
+
+ // 1. Create dynamic stream + static schema stream
+ CreateStream(t, client, dynamicStream)
+ rt.TrackStream(dynamicStream)
+ staticHeader := map[string]string{"X-P-Static-Schema-Flag": "true"}
+ CreateStreamWithSchemaBody(t, client, staticStream, staticHeader, SchemaPayload)
+ rt.TrackStream(staticStream)
+
+ // 2. Ingest event matching schema to dynamic stream (200)
+ matchingEvent := `{"source_time":"2024-03-26T00:00:00Z","level":"info","message":"test","version":"1.0","user_id":1,"device_id":1,"session_id":"a","os":"Linux","host":"10.0.0.1","uuid":"u1","location":"us","timezone":"UTC","user_agent":"test","runtime":"go","request_body":"body","status_code":200,"response_time":10,"process_id":1,"app_meta":"meta"}`
+ IngestCustomPayload(t, ic, dynamicStream, matchingEvent, 200)
+
+ // 3. Ingest event WITH NEW FIELD to dynamic stream (200, schema evolves)
+ IngestCustomPayload(t, ic, dynamicStream, getDynamicSchemaEvent(), 200)
+
+ // 4. Verify dynamic stream schema now includes new field
+ WaitForQueryable(t, client, dynamicStream, 30*time.Second)
+ req, _ := client.NewRequest("GET", "logstream/"+dynamicStream+"/schema", nil)
+ response, err := client.Do(req)
+ require.NoErrorf(t, err, "Get schema failed: %s", err)
+ schemaBody := readAsString(response.Body)
+ // Fix: close the body after reading so the connection can be reused.
+ response.Body.Close()
+ require.Equalf(t, 200, response.StatusCode, "Get schema failed: %s", response.Status)
+ require.Contains(t, schemaBody, "extra_field", "Dynamic schema should include extra_field after evolution")
+
+ // 5. Ingest matching event to static stream (200)
+ IngestCustomPayload(t, ic, staticStream, matchingEvent, 200)
+
+ // 6. Ingest event with new field to static stream (400, rejected)
+ IngestCustomPayload(t, ic, staticStream, getDynamicSchemaEvent(), 400)
+
+ // 7. Verify static stream schema unchanged
+ req, _ = client.NewRequest("GET", "logstream/"+staticStream+"/schema", nil)
+ response, err = client.Do(req)
+ require.NoErrorf(t, err, "Get schema failed: %s", err)
+ staticSchemaBody := readAsString(response.Body)
+ // Fix: close the second schema response body as well.
+ response.Body.Close()
+ require.Equalf(t, 200, response.StatusCode, "Get schema failed: %s", response.Status)
+ require.NotContains(t, staticSchemaBody, "extra_field", "Static schema should NOT include extra_field")
+}
+
+// UC11: Multiple teams simultaneously ingesting and querying.
+// Fans out concurrent ingestion across three streams (20 events each) and
+// then verifies each stream received data and a cross-stream query works.
+func TestUseCase_ConcurrentMultiTeamWorkload(t *testing.T) {
+ streams := []string{UniqueStream("uc11_t1"), UniqueStream("uc11_t2"), UniqueStream("uc11_t3")}
+ client := NewGlob.QueryClient
+ rt := NewResourceTracker(t, client)
+ ic := NewGlob.QueryClient
+ if NewGlob.IngestorUrl.String() != "" {
+ ic = NewGlob.IngestorClient
+ }
+
+ // 1. Create three team streams
+ for _, s := range streams {
+ CreateStream(t, client, s)
+ rt.TrackStream(s)
+ }
+
+ // 2-4. Launch goroutines: ingest to all three simultaneously (20 events each)
+ ConcurrentMultiStreamIngest(t, ic, streams, 20)
+
+ // 5. Wait for ingest; only a lower bound of one event per stream is
+ // required here, keeping the check robust to ingestion timing.
+ for _, s := range streams {
+ WaitForIngest(t, client, s, 1, 30*time.Second)
+ }
+
+ // 6. Cross-stream query across two streams
+ AssertQueryOK(t, client, "SELECT COUNT(*) as total FROM (SELECT * FROM %s UNION ALL SELECT * FROM %s)", streams[0], streams[1])
+}
+
+// UC12: Verifying distributed mode end-to-end.
+// Skipped in standalone mode; otherwise ingests through the ingestor node
+// and queries through the querier, confirming the two are in sync.
+func TestUseCase_DistributedIngestQueryPipeline(t *testing.T) {
+ // 1. Skip if not distributed mode
+ if NewGlob.IngestorUrl.String() == "" {
+ t.Skip("Skipping distributed test: no ingestor URL configured (standalone mode)")
+ }
+
+ stream := UniqueStream("uc12_dist")
+ client := NewGlob.QueryClient
+ rt := NewResourceTracker(t, client)
+
+ // 2. Verify cluster health (info + metrics)
+ AssertClusterInfo(t, client)
+ AssertClusterMetrics(t, client)
+
+ // 3. Create stream via querier
+ CreateStream(t, client, stream)
+ rt.TrackStream(stream)
+
+ // 4. Ingest via ingestor (RunFlog)
+ RunFlog(t, NewGlob.IngestorClient, stream)
+
+ // 5. Wait for ingest
+ WaitForIngest(t, client, stream, 50, 180*time.Second)
+
+ // 6. Run analytical queries (GROUP BY, DISTINCT, ORDER BY) via querier
+ AssertQueryOK(t, client, "SELECT method, COUNT(*) as cnt FROM %s GROUP BY method", stream)
+ AssertQueryOK(t, client, "SELECT DISTINCT host FROM %s", stream)
+ AssertQueryOK(t, client, "SELECT * FROM %s ORDER BY p_timestamp DESC LIMIT 10", stream)
+
+ // 7. Verify stream info via querier
+ AssertStreamInfo(t, client, stream)
+
+ // 8. Verify Prism home via querier
+ AssertPrismHome(t, client)
+}
+
+// UC13: Security test verifying users cannot escalate privileges.
+// A reader-only user attempts privileged operations (role/user creation,
+// stream deletion, alert creation, retention changes) and every attempt
+// must be rejected with 403.
+func TestUseCase_RBACRoleEscalationPrevention(t *testing.T) {
+ stream := UniqueStream("uc13_escalation")
+ client := NewGlob.QueryClient
+ rt := NewResourceTracker(t, client)
+
+ // 1. Create stream
+ CreateStream(t, client, stream)
+ rt.TrackStream(stream)
+
+ // 2. Create reader role for the stream
+ roleName := UniqueStream("uc13_reader")
+ CreateRole(t, client, roleName, RoleReader(stream))
+ rt.TrackRole(roleName)
+
+ // 3. Create reader user, get credentials
+ userName := UniqueStream("uc13_reader_u")
+ readerPass := CreateUserWithRole(t, client, userName, []string{roleName})
+ rt.TrackUser(userName)
+
+ // 4. Build reader client
+ readerQ := userQueryClient(userName, readerPass)
+
+ // 5. Reader tries to create a role -> expect 403
+ AssertForbidden(t, readerQ, "PUT", "role/uc13_hack_role", strings.NewReader(RoleEditor))
+
+ // 6. Reader tries to create a user -> expect 403
+ AssertForbidden(t, readerQ, "POST", "user/uc13_hack_user", nil)
+
+ // 7. Reader tries to delete stream -> expect 403
+ AssertForbidden(t, readerQ, "DELETE", "logstream/"+stream, nil)
+
+ // 8. Reader tries to create alert -> expect 403
+ alertPayload := fmt.Sprintf(`{
+ "severity": "low",
+ "title": "HackAlert",
+ "query": "select count(*) from %s",
+ "alertType": "threshold",
+ "thresholdConfig": {"operator": "=", "value": 1},
+ "evalConfig": {"rollingWindow": {"evalStart": "5m", "evalEnd": "now", "evalFrequency": 1}},
+ "notificationConfig": {"interval": 1},
+ "targets": [],
+ "tags": ["hack"]
+ }`, stream)
+ AssertForbidden(t, readerQ, "POST", "alerts", strings.NewReader(alertPayload))
+
+ // 9. Reader tries to set retention -> expect 403
+ AssertForbidden(t, readerQ, "PUT", "logstream/"+stream+"/retention", strings.NewReader(RetentionBody))
+}