10 changes: 10 additions & 0 deletions resources/genai-pricing-example.json
@@ -0,0 +1,10 @@
{
  "my-custom-model": {
    "input": 0.000005,
    "output": 0.000015
  },
  "my-finetuned-gpt4o": {
    "input": 0.000003,
    "output": 0.000012
  }
}
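
Note: the file above simply maps a model name to per-token USD rates for input and output. A minimal sketch of reading such a file in Rust with serde_json is shown below; the ModelPricing struct and load_pricing_overrides helper are illustrative names and are not part of this PR.

use std::collections::HashMap;

use serde::Deserialize;

/// Per-token USD rates for one model (field names mirror the example JSON above).
#[derive(Debug, Deserialize)]
struct ModelPricing {
    input: f64,
    output: f64,
}

/// Illustrative helper: read a pricing-override file into a model -> rates map.
fn load_pricing_overrides(
    path: &str,
) -> Result<HashMap<String, ModelPricing>, Box<dyn std::error::Error>> {
    let raw = std::fs::read_to_string(path)?;
    Ok(serde_json::from_str(&raw)?)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pricing = load_pricing_overrides("resources/genai-pricing-example.json")?;
    if let Some(rates) = pricing.get("my-custom-model") {
        println!("input: {} USD/token, output: {} USD/token", rates.input, rates.output);
    }
    Ok(())
}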
44 changes: 44 additions & 0 deletions resources/parseable-genai-collector.yaml
@@ -0,0 +1,44 @@
# Parseable GenAI Agent Observability - OTel Collector Config
#
# This is the canonical OTel Collector configuration for sending
# GenAI/LLM agent traces to Parseable. Fill in the 3 variables below.
#
# Usage:
# 1. Replace ${PARSEABLE_URL}, ${PARSEABLE_AUTH}, and ${STREAM_NAME}
# with your values (or set them as environment variables).
# 2. Run: otelcol --config parseable-genai-collector.yaml
#
# Environment variable examples:
# export PARSEABLE_URL=http://localhost:8000
# export PARSEABLE_AUTH=$(echo -n 'admin:admin' | base64)
# export STREAM_NAME=genai-traces

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 5s
    send_batch_size: 256

exporters:
  otlphttp/parseable:
    endpoint: ${PARSEABLE_URL} # e.g., http://localhost:8000
    encoding: json
    headers:
      Authorization: "Basic ${PARSEABLE_AUTH}" # base64(username:password)
      X-P-Stream: "${STREAM_NAME}" # default: genai-traces
      X-P-Log-Source: "otel-traces"
      X-P-Dataset-Tag: "agent-observability"

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlphttp/parseable]
29 changes: 22 additions & 7 deletions src/handlers/http/ingest.rs
@@ -32,12 +32,13 @@ use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
 use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
 use crate::handlers::http::modal::utils::ingest_utils::validate_stream_for_ingestion;
 use crate::handlers::{
-    CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
-    STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType,
+    CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, DATASET_TAG_KEY, DatasetTag, EXTRACT_LOG_KEY,
+    LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType,
 };
 use crate::metadata::SchemaVersion;
 use crate::metastore::MetastoreError;
 use crate::option::Mode;
+use crate::otel::genai::GENAI_KNOWN_FIELD_LIST;
 use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
 use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
 use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
@@ -215,10 +216,24 @@ pub async fn setup_otel_stream(

     let stream_name = stream_name.to_str().unwrap().to_owned();

-    let log_source_entry = LogSourceEntry::new(
-        log_source.clone(),
-        known_fields.iter().map(|&s| s.to_string()).collect(),
-    );
+    // Parse dataset tag from X-P-Dataset-Tag header
+    let dataset_tag = req
+        .headers()
+        .get(DATASET_TAG_KEY)
+        .and_then(|h| h.to_str().ok())
+        .and_then(|s| DatasetTag::try_from(s).ok());
+
+    // Build known fields set — include GenAI fields when dataset tag is agent-observability
+    let mut all_known_fields: std::collections::HashSet<String> =
+        known_fields.iter().map(|&s| s.to_string()).collect();
+
+    if dataset_tag == Some(DatasetTag::AgentObservability) {
+        for &(field_name, _) in GENAI_KNOWN_FIELD_LIST {
+            all_known_fields.insert(field_name.to_string());
+        }
+    }
+
+    let log_source_entry = LogSourceEntry::new(log_source.clone(), all_known_fields);

     PARSEABLE
         .create_stream_if_not_exists(
@@ -227,7 +242,7 @@
             None,
             vec![log_source_entry.clone()],
             telemetry_type,
-            None,
+            dataset_tag,
         )
         .await?;
     let mut time_partition = None;
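The DatasetTag::try_from call above parses the X-P-Dataset-Tag header into an enum whose definition is not shown in this diff. A hypothetical sketch that is consistent with the usage here and with the "agent-observability" value in the collector config might look like:

// Hypothetical sketch only: the real DatasetTag lives elsewhere in src/handlers
// and may support more variants or a richer error type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatasetTag {
    AgentObservability,
}

impl TryFrom<&str> for DatasetTag {
    type Error = ();

    fn try_from(value: &str) -> Result<Self, Self::Error> {
        match value.trim().to_ascii_lowercase().as_str() {
            "agent-observability" => Ok(DatasetTag::AgentObservability),
            _ => Err(()),
        }
    }
}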
27 changes: 24 additions & 3 deletions src/handlers/http/modal/utils/ingest_utils.rs
@@ -32,13 +32,18 @@ use crate::{
         format::{EventFormat, LogSource, json},
     },
     handlers::{
-        EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TelemetryType,
+        DatasetTag, EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TelemetryType,
         http::{
             ingest::PostError,
             kinesis::{Message, flatten_kinesis_logs},
         },
     },
-    otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
+    otel::{
+        genai::{coerce_genai_field_types, enrich_genai_record},
+        logs::flatten_otel_logs,
+        metrics::flatten_otel_metrics,
+        traces::flatten_otel_traces,
+    },
     parseable::PARSEABLE,
     storage::StreamType,
     utils::json::{convert_array_to_object, flatten::convert_to_array},
@@ -93,7 +98,23 @@ pub async fn flatten_and_push_logs(
         LogSource::OtelTraces => {
             //custom flattening required for otel traces
             let traces: TracesData = serde_json::from_value(json)?;
-            for record in flatten_otel_traces(&traces) {
+
+            // Check if this stream has the agent-observability dataset tag
+            let is_genai = PARSEABLE
+                .get_stream(stream_name)
+                .ok()
+                .and_then(|s| s.get_dataset_tag())
+                == Some(DatasetTag::AgentObservability);
+
+            for mut record in flatten_otel_traces(&traces) {
+                // Apply GenAI type coercion and cost enrichment for agent-observability streams
+                if is_genai {
+                    if let Some(obj) = record.as_object_mut() {
+                        coerce_genai_field_types(obj);
+                        enrich_genai_record(obj);
+                    }
+                }
+
                 push_logs(
                     stream_name,
                     record,
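coerce_genai_field_types and enrich_genai_record come from the new src/otel/genai.rs module, whose body is not part of this diff. As an illustration only, and assuming the enrichment derives cost from the OTel GenAI token-usage attributes and per-token rates like those in genai-pricing-example.json, the core arithmetic could look like:

use serde_json::{Map, Value};

// Assumption: token counts use the OTel GenAI semantic-convention attribute
// names; the output field "gen_ai.usage.cost_usd" is purely illustrative.
fn enrich_with_cost(record: &mut Map<String, Value>, input_rate: f64, output_rate: f64) {
    let input_tokens = record
        .get("gen_ai.usage.input_tokens")
        .and_then(Value::as_f64)
        .unwrap_or(0.0);
    let output_tokens = record
        .get("gen_ai.usage.output_tokens")
        .and_then(Value::as_f64)
        .unwrap_or(0.0);

    // cost = tokens * per-token rate, summed over input and output
    let cost = input_tokens * input_rate + output_tokens * output_rate;
    record.insert("gen_ai.usage.cost_usd".to_string(), Value::from(cost));
}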
1 change: 1 addition & 0 deletions src/otel.rs
@@ -16,6 +16,7 @@
  *
  */

+pub mod genai;
 pub mod logs;
 pub mod metrics;
 pub mod otel_utils;