From 5fb547faac29564ab2fe03622f2a0446fc4480c4 Mon Sep 17 00:00:00 2001 From: Kev Date: Wed, 15 Jan 2025 13:54:39 -0500 Subject: [PATCH 01/27] feat(ourlogs): Allow log ingestion behind a flag This adds log ingestion (currently only working for the OTel log format) behind a feature flag 'organizations:ourlogs-ingestion'. This PR aims to be the minimum possible to support local and test-org ingestion before we move to dogfooding. Other notes: - We need to add two DataCategories because we need to track quantity (for current discarded breadcrumb ClientOutcome tracking) and also bytes for total log bytes ingested, which is one of the quota recommendations. - Eventually we will convert Breadcrumbs into logs as well, very similar to span extraction for spans on the event. How exactly that will work is still being discussed with product and SDK folks. - The name 'ourlogs' is an internal name to disambiguate between 'our log product' logs and internally created logs. User-facing strings will be set to 'Log' to avoid exposing implementation details. 
--- Cargo.lock | 16 ++ Cargo.toml | 1 + relay-base-schema/src/data_category.rs | 13 + relay-cabi/include/relay.h | 17 +- relay-cogs/src/lib.rs | 3 + relay-config/src/config.rs | 8 + relay-dynamic-config/src/feature.rs | 6 +- relay-event-schema/src/processor/attrs.rs | 2 + relay-event-schema/src/processor/traits.rs | 1 + relay-event-schema/src/protocol/mod.rs | 2 + relay-event-schema/src/protocol/ourlog.rs | 138 ++++++++++ relay-kafka/src/config.rs | 6 +- relay-ourlogs/Cargo.toml | 29 +++ relay-ourlogs/src/lib.rs | 13 + relay-ourlogs/src/ourlog.rs | 235 ++++++++++++++++++ relay-pii/src/selector.rs | 1 + relay-quotas/src/quota.rs | 2 + relay-server/Cargo.toml | 63 ++--- relay-server/src/envelope.rs | 12 + relay-server/src/services/outcome.rs | 4 + relay-server/src/services/processor.rs | 38 +++ relay-server/src/services/processor/event.rs | 2 + relay-server/src/services/processor/ourlog.rs | 73 ++++++ relay-server/src/services/store.rs | 122 +++++++++ relay-server/src/utils/managed_envelope.rs | 15 ++ relay-server/src/utils/rate_limits.rs | 69 +++++ relay-server/src/utils/sizes.rs | 3 + 27 files changed, 859 insertions(+), 35 deletions(-) create mode 100644 relay-event-schema/src/protocol/ourlog.rs create mode 100644 relay-ourlogs/Cargo.toml create mode 100644 relay-ourlogs/src/lib.rs create mode 100644 relay-ourlogs/src/ourlog.rs create mode 100644 relay-server/src/services/processor/ourlog.rs diff --git a/Cargo.lock b/Cargo.lock index f7ce7631049..d977b916877 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2804,6 +2804,7 @@ dependencies = [ "opentelemetry", "percent-encoding", "rand", + "serde_json", "thiserror", ] @@ -3717,6 +3718,20 @@ dependencies = [ "uuid", ] +[[package]] +name = "relay-ourlogs" +version = "24.12.1" +dependencies = [ + "chrono", + "hex", + "insta", + "once_cell", + "opentelemetry-proto", + "relay-event-schema", + "relay-protocol", + "serde_json", +] + [[package]] name = "relay-pattern" version = "24.12.2" @@ -3934,6 +3949,7 @@ dependencies = [ 
"relay-log", "relay-metrics", "relay-monitors", + "relay-ourlogs", "relay-pii", "relay-profiling", "relay-protocol", diff --git a/Cargo.toml b/Cargo.toml index 0743f320316..60df1c23e1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ relay-kafka = { path = "relay-kafka" } relay-log = { path = "relay-log" } relay-metrics = { path = "relay-metrics" } relay-monitors = { path = "relay-monitors" } +relay-ourlogs = { path = "relay-ourlogs" } relay-pattern = { path = "relay-pattern" } relay-pii = { path = "relay-pii" } relay-profiling = { path = "relay-profiling" } diff --git a/relay-base-schema/src/data_category.rs b/relay-base-schema/src/data_category.rs index 37720d5cd9a..3704ef57a59 100644 --- a/relay-base-schema/src/data_category.rs +++ b/relay-base-schema/src/data_category.rs @@ -92,6 +92,15 @@ pub enum DataCategory { Uptime = 21, /// Counts the number of individual attachments, as opposed to the number of bytes in an attachment. AttachmentItem = 22, + /// LogCount + /// + /// This is the category for logs for which we store the count log events for users for measuring + /// missing breadcrumbs, and count of logs for rate limiting purposes. + LogCount = 23, + /// LogBytes + /// + /// This is the category for logs for which we store log event total bytes for users. + LogBytes = 24, // // IMPORTANT: After adding a new entry to DataCategory, go to the `relay-cabi` subfolder and run // `make header` to regenerate the C-binding. This allows using the data category from Python. 
@@ -120,6 +129,8 @@ impl DataCategory { "transaction_indexed" => Self::TransactionIndexed, "monitor" => Self::Monitor, "span" => Self::Span, + "log_count" => Self::LogCount, + "log_bytes" => Self::LogBytes, "monitor_seat" => Self::MonitorSeat, "feedback" => Self::UserReportV2, "user_report_v2" => Self::UserReportV2, @@ -152,6 +163,8 @@ impl DataCategory { Self::TransactionIndexed => "transaction_indexed", Self::Monitor => "monitor", Self::Span => "span", + Self::LogCount => "log_count", + Self::LogBytes => "log_bytes", Self::MonitorSeat => "monitor_seat", Self::UserReportV2 => "feedback", Self::MetricBucket => "metric_bucket", diff --git a/relay-cabi/include/relay.h b/relay-cabi/include/relay.h index bdeee11e2fd..38bc93d83af 100644 --- a/relay-cabi/include/relay.h +++ b/relay-cabi/include/relay.h @@ -3,7 +3,7 @@ #ifndef RELAY_H_INCLUDED #define RELAY_H_INCLUDED -/* Generated with cbindgen:0.26.0 */ +/* Generated with cbindgen:0.27.0 */ /* Warning, this file is autogenerated. Do not modify this manually. */ @@ -142,6 +142,19 @@ enum RelayDataCategory { * Counts the number of individual attachments, as opposed to the number of bytes in an attachment. */ RELAY_DATA_CATEGORY_ATTACHMENT_ITEM = 22, + /** + * LogCount + * + * This is the category for logs for which we store the count log events for users for measuring + * missing breadcrumbs, and count of logs for rate limiting purposes. + */ + RELAY_DATA_CATEGORY_LOG_COUNT = 23, + /** + * LogBytes + * + * This is the category for logs for which we store log event total bytes for users. + */ + RELAY_DATA_CATEGORY_LOG_BYTES = 24, /** * Any other data category not known by this Relay. 
*/ @@ -679,4 +692,4 @@ struct RelayStr normalize_cardinality_limit_config(const struct RelayStr *value) */ struct RelayStr relay_normalize_global_config(const struct RelayStr *value); -#endif /* RELAY_H_INCLUDED */ +#endif /* RELAY_H_INCLUDED */ diff --git a/relay-cogs/src/lib.rs b/relay-cogs/src/lib.rs index 46c13bf46b1..d29f8fa392b 100644 --- a/relay-cogs/src/lib.rs +++ b/relay-cogs/src/lib.rs @@ -117,6 +117,8 @@ pub enum AppFeature { Transactions, /// Errors. Errors, + /// Logs. + Logs, /// Spans. Spans, /// Sessions. @@ -159,6 +161,7 @@ impl AppFeature { Self::Transactions => "transactions", Self::Errors => "errors", Self::Spans => "spans", + Self::Logs => "our_logs", Self::Sessions => "sessions", Self::ClientReports => "client_reports", Self::CheckIns => "check_ins", diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 6007af5a9da..ae2fc83109e 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -617,6 +617,8 @@ pub struct Limits { /// The maximum payload size for a profile pub max_profile_size: ByteSize, /// The maximum payload size for a span. + pub max_log_size: ByteSize, + /// The maximum payload size for a span. pub max_span_size: ByteSize, /// The maximum payload size for a statsd metric. pub max_statsd_size: ByteSize, @@ -677,6 +679,7 @@ impl Default for Limits { max_api_file_upload_size: ByteSize::mebibytes(40), max_api_chunk_upload_size: ByteSize::mebibytes(100), max_profile_size: ByteSize::mebibytes(50), + max_log_size: ByteSize::mebibytes(1), max_span_size: ByteSize::mebibytes(1), max_statsd_size: ByteSize::mebibytes(1), max_metric_buckets_size: ByteSize::mebibytes(1), @@ -2206,6 +2209,11 @@ impl Config { self.values.limits.max_check_in_size.as_bytes() } + /// Returns the maximum payload size of a log in bytes. + pub fn max_log_size(&self) -> usize { + self.values.limits.max_log_size.as_bytes() + } + /// Returns the maximum payload size of a span in bytes. 
pub fn max_span_size(&self) -> usize { self.values.limits.max_span_size.as_bytes() diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs index 2609729e3d2..8144aa9fd6f 100644 --- a/relay-dynamic-config/src/feature.rs +++ b/relay-dynamic-config/src/feature.rs @@ -102,7 +102,11 @@ pub enum Feature { /// Serialized as `organizations:ingest-spans-in-eap` #[serde(rename = "organizations:ingest-spans-in-eap")] IngestSpansInEap, - + /// Enable log ingestion for our log product (this is not internal logging). + /// + /// Serialized as `organizations:ourlogs-ingestion`. + #[serde(rename = "organizations:ourlogs-ingestion")] + OurLogsIngestion, /// This feature has graduated and is hard-coded for external Relays. #[doc(hidden)] #[serde(rename = "projects:profiling-ingest-unsampled-profiles")] diff --git a/relay-event-schema/src/processor/attrs.rs b/relay-event-schema/src/processor/attrs.rs index b8772a09dcf..563b1e65ace 100644 --- a/relay-event-schema/src/processor/attrs.rs +++ b/relay-event-schema/src/processor/attrs.rs @@ -46,6 +46,7 @@ pub enum ValueType { Message, Thread, Breadcrumb, + OurLog, Span, ClientSdkInfo, @@ -84,6 +85,7 @@ relay_common::derive_fromstr_and_display!(ValueType, UnknownValueTypeError, { ValueType::Message => "message", ValueType::Thread => "thread", ValueType::Breadcrumb => "breadcrumb", + ValueType::OurLog => "ourlog", ValueType::Span => "span", ValueType::ClientSdkInfo => "sdk", ValueType::Minidump => "minidump", diff --git a/relay-event-schema/src/processor/traits.rs b/relay-event-schema/src/processor/traits.rs index ac1a78728d7..99f1fa55a7b 100644 --- a/relay-event-schema/src/processor/traits.rs +++ b/relay-event-schema/src/processor/traits.rs @@ -108,6 +108,7 @@ pub trait Processor: Sized { process_method!(process_breadcrumb, crate::protocol::Breadcrumb); process_method!(process_template_info, crate::protocol::TemplateInfo); process_method!(process_header_name, crate::protocol::HeaderName); + 
process_method!(process_ourlog, crate::protocol::OurLog); process_method!(process_span, crate::protocol::Span); process_method!(process_trace_context, crate::protocol::TraceContext); process_method!(process_native_image_path, crate::protocol::NativeImagePath); diff --git a/relay-event-schema/src/protocol/mod.rs b/relay-event-schema/src/protocol/mod.rs index 7827447ce58..fc1c9307787 100644 --- a/relay-event-schema/src/protocol/mod.rs +++ b/relay-event-schema/src/protocol/mod.rs @@ -18,6 +18,7 @@ mod mechanism; mod metrics; mod metrics_summary; mod nel; +mod ourlog; mod relay_info; mod replay; mod request; @@ -54,6 +55,7 @@ pub use self::mechanism::*; pub use self::metrics::*; pub use self::metrics_summary::*; pub use self::nel::*; +pub use self::ourlog::*; pub use self::relay_info::*; pub use self::replay::*; pub use self::request::*; diff --git a/relay-event-schema/src/protocol/ourlog.rs b/relay-event-schema/src/protocol/ourlog.rs new file mode 100644 index 00000000000..9194b26b03e --- /dev/null +++ b/relay-event-schema/src/protocol/ourlog.rs @@ -0,0 +1,138 @@ +use relay_protocol::{Annotated, Empty, FromValue, IntoValue, Object, Value}; + +use crate::processor::ProcessValue; +use crate::protocol::{SpanId, TraceId}; + +#[derive(Clone, Debug, Default, PartialEq, Empty, FromValue, IntoValue, ProcessValue)] +#[metastructure(process_func = "process_ourlog", value_type = "OurLog")] +pub struct OurLog { + /// Time when the event occurred. + #[metastructure(required = true, trim = false)] + pub timestamp_nanos: Annotated, + + /// Time when the event was observed. + #[metastructure(required = true, trim = false)] + pub observed_timestamp_nanos: Annotated, + + /// The ID of the trace the log belongs to. + #[metastructure(required = false, trim = false)] + pub trace_id: Annotated, + /// The Span id. + /// + #[metastructure(required = false, trim = false)] + pub span_id: Annotated, + + /// Trace flag bitfield. 
+ #[metastructure(required = false)] + pub trace_flags: Annotated, + + /// This is the original string representation of the severity as it is known at the source + #[metastructure(required = false, max_chars = 32, pii = "maybe", trim = false)] + pub severity_text: Annotated, + + /// Numerical representation of the severity level + #[metastructure(required = false)] + pub severity_number: Annotated, + + /// Log body. + #[metastructure(required = true, pii = "maybe", trim = false)] + pub body: Annotated, + + /// Arbitrary attributes on a log. + #[metastructure(pii = "maybe", trim = false)] + pub attributes: Annotated>, + + /// Additional arbitrary fields for forwards compatibility. + #[metastructure(additional_properties, retain = true, pii = "maybe", trim = false)] + pub other: Object, +} + +#[derive(Debug, Clone, Default, PartialEq, Empty, FromValue, IntoValue, ProcessValue)] +pub struct AttributeValue { + pub string_value: Annotated, + pub int_value: Annotated, + pub double_value: Annotated, + pub bool_value: Annotated, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ourlog_serialization() { + let json = r#"{ + "timestamp_nanos": 1544712660300000000, + "observed_timestamp_nanos": 1544712660300000000, + "severity_number": 10, + "severity_text": "Information", + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": "eee19b7ec3c1b174", + "body": "Example log record", + "attributes": { + "string.attribute": { + "string_value": "some string" + }, + "boolean.attribute": { + "bool_value": true + }, + "int.attribute": { + "int_value": 10 + }, + "double.attribute": { + "double_value": 637.704 + } + } +}"#; + + let mut attributes = Object::new(); + attributes.insert( + "string.attribute".into(), + Annotated::new(AttributeValue { + string_value: Annotated::new(Value::String("some string".into())), + ..Default::default() + }), + ); + attributes.insert( + "boolean.attribute".into(), + Annotated::new(AttributeValue { + bool_value: 
Annotated::new(Value::Bool(true)), + ..Default::default() + }), + ); + attributes.insert( + "int.attribute".into(), + Annotated::new(AttributeValue { + int_value: Annotated::new(Value::I64(10)), + ..Default::default() + }), + ); + attributes.insert( + "double.attribute".into(), + Annotated::new(AttributeValue { + double_value: Annotated::new(Value::F64(637.704)), + ..Default::default() + }), + ); + + let log = Annotated::new(OurLog { + timestamp_nanos: Annotated::new(1544712660300000000), + observed_timestamp_nanos: Annotated::new(1544712660300000000), + severity_number: Annotated::new(10), + severity_text: Annotated::new("Information".to_string()), + trace_id: Annotated::new(TraceId("5b8efff798038103d269b633813fc60c".into())), + span_id: Annotated::new(SpanId("eee19b7ec3c1b174".into())), + body: Annotated::new("Example log record".to_string()), + attributes: Annotated::new(attributes), + ..Default::default() + }); + + let expected: serde_json::Value = serde_json::from_str(json).unwrap(); + let actual: serde_json::Value = + serde_json::from_str(&log.to_json_pretty().unwrap()).unwrap(); + assert_eq!(expected, actual); + + let log_from_string = Annotated::::from_json(json).unwrap(); + assert_eq!(log, log_from_string); + } +} diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs index 7b9e8411241..98952699705 100644 --- a/relay-kafka/src/config.rs +++ b/relay-kafka/src/config.rs @@ -45,6 +45,8 @@ pub enum KafkaTopic { ReplayRecordings, /// Monitor check-ins. Monitors, + /// Logs (our log product). + Logs, /// Standalone spans without a transaction. Spans, /// Feedback events topic. @@ -56,7 +58,7 @@ impl KafkaTopic { /// It will have to be adjusted if the new variants are added. 
pub fn iter() -> std::slice::Iter<'static, Self> { use KafkaTopic::*; - static TOPICS: [KafkaTopic; 13] = [ + static TOPICS: [KafkaTopic; 14] = [ Events, Attachments, Transactions, @@ -68,6 +70,7 @@ impl KafkaTopic { ReplayEvents, ReplayRecordings, Monitors, + Logs, Spans, Feedback, ]; @@ -129,6 +132,7 @@ define_topic_assignments! { replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay Events topic name."), replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."), monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."), + logs: (KafkaTopic::Logs, "snuba-ourlogs", "Logs from our logs product."), spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."), feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."), } diff --git a/relay-ourlogs/Cargo.toml b/relay-ourlogs/Cargo.toml new file mode 100644 index 00000000000..c132967b924 --- /dev/null +++ b/relay-ourlogs/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "relay-ourlogs" +authors = ["Sentry "] +description = "Log normalization and processing" +homepage = "https://getsentry.github.io/relay/" +repository = "https://github.com/getsentry/relay" +version = "24.12.1" +edition = "2021" +license-file = "../LICENSE" +publish = false + +[lints] +workspace = true + +[dependencies] +chrono = { workspace = true } +hex = { workspace = true } +once_cell = { workspace = true } +opentelemetry-proto = { workspace = true, features = [ + "gen-tonic", + "with-serde", + "logs", +] } +relay-event-schema = { workspace = true } +relay-protocol = { workspace = true } +serde_json = { workspace = true } + +[dev-dependencies] +insta = { workspace = true } diff --git a/relay-ourlogs/src/lib.rs b/relay-ourlogs/src/lib.rs new file mode 100644 index 00000000000..eb91e5f1104 --- /dev/null +++ b/relay-ourlogs/src/lib.rs @@ -0,0 +1,13 @@ +//! Structs and functions needed to ingest OpenTelemetry spans. 
+ +#![warn(missing_docs)] +#![doc( + html_logo_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png", + html_favicon_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png" +)] + +pub use crate::ourlog::otel_to_sentry_log; + +pub use opentelemetry_proto::tonic::logs::v1::LogRecord as OtelLog; + +mod ourlog; diff --git a/relay-ourlogs/src/ourlog.rs b/relay-ourlogs/src/ourlog.rs new file mode 100644 index 00000000000..43d917b3ac0 --- /dev/null +++ b/relay-ourlogs/src/ourlog.rs @@ -0,0 +1,235 @@ +use opentelemetry_proto::tonic::common::v1::any_value::Value as OtelValue; + +use crate::OtelLog; +use relay_event_schema::protocol::{AttributeValue, OurLog, SpanId, TraceId}; +use relay_protocol::{Annotated, Object, Value}; + +/// Transform an OtelLog to a Sentry log. +pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { + let OtelLog { + severity_number, + severity_text, + body, + attributes, + trace_id, + span_id, + .. + } = otel_log; + + let span_id = hex::encode(span_id); + let trace_id = hex::encode(trace_id); + + let body = body + .and_then(|v| v.value) + .and_then(|v| match v { + OtelValue::StringValue(s) => Some(s), + _ => None, + }) + .unwrap_or_else(String::new); + let mut attribute_data = Object::new(); + + for attribute in attributes.into_iter() { + if let Some(value) = attribute.value.and_then(|v| v.value) { + let key = attribute.key; + match value { + OtelValue::ArrayValue(_) => {} + OtelValue::BoolValue(v) => { + attribute_data.insert( + key, + Annotated::new(AttributeValue { + bool_value: Annotated::new(Value::Bool(v)), + ..Default::default() + }), + ); + } + OtelValue::BytesValue(v) => { + if let Ok(v) = String::from_utf8(v) { + attribute_data.insert( + key, + Annotated::new(AttributeValue { + string_value: Annotated::new(Value::String(v)), + ..Default::default() + }), + ); + } + } + OtelValue::DoubleValue(v) => { + attribute_data.insert( + key, + Annotated::new(AttributeValue { + 
double_value: Annotated::new(Value::F64(v)), + ..Default::default() + }), + ); + } + OtelValue::IntValue(v) => { + attribute_data.insert( + key, + Annotated::new(AttributeValue { + int_value: Annotated::new(Value::I64(v)), + ..Default::default() + }), + ); + } + OtelValue::KvlistValue(_) => {} + OtelValue::StringValue(v) => { + attribute_data.insert( + key, + Annotated::new(AttributeValue { + string_value: Annotated::new(Value::String(v)), + ..Default::default() + }), + ); + } + } + } + } + + OurLog { + timestamp_nanos: Annotated::new(otel_log.time_unix_nano), + observed_timestamp_nanos: Annotated::new(otel_log.observed_time_unix_nano), + trace_id: TraceId(trace_id).into(), + span_id: Annotated::new(SpanId(span_id)), + trace_flags: Annotated::new(0.0), + severity_text: severity_text.into(), + severity_number: Annotated::new(severity_number as i64), + attributes: attribute_data.into(), + body: Annotated::new(body), + ..Default::default() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use relay_protocol::get_path; + + #[test] + fn parse_log() { + // https://github.com/open-telemetry/opentelemetry-proto/blob/c4214b8168d0ce2a5236185efb8a1c8950cccdd6/examples/logs.json + let json = r#"{ + "timeUnixNano": "1544712660300000000", + "observedTimeUnixNano": "1544712660300000000", + "severityNumber": 10, + "severityText": "Information", + "traceId": "5B8EFFF798038103D269B633813FC60C", + "spanId": "EEE19B7EC3C1B174", + "body": { + "stringValue": "Example log record" + }, + "attributes": [ + { + "key": "string.attribute", + "value": { + "stringValue": "some string" + } + }, + { + "key": "boolean.attribute", + "value": { + "boolValue": true + } + }, + { + "key": "int.attribute", + "value": { + "intValue": "10" + } + }, + { + "key": "double.attribute", + "value": { + "doubleValue": 637.704 + } + }, + { + "key": "array.attribute", + "value": { + "arrayValue": { + "values": [ + { + "stringValue": "many" + }, + { + "stringValue": "values" + } + ] + } + } + }, + { + "key": 
"map.attribute", + "value": { + "kvlistValue": { + "values": [ + { + "key": "some.map.key", + "value": { + "stringValue": "some value" + } + } + ] + } + } + } + ] + }"#; + + let otel_log: OtelLog = serde_json::from_str(json).unwrap(); + let our_log: OurLog = otel_to_sentry_log(otel_log); + let annotated_log: Annotated = Annotated::new(our_log); + assert_eq!( + get_path!(annotated_log.body), + Some(&Annotated::new("Example log record".into())) + ); + } + + #[test] + fn parse_log_with_db_attributes() { + let json = r#"{ + "timeUnixNano": "1544712660300000000", + "observedTimeUnixNano": "1544712660300000000", + "severityNumber": 10, + "severityText": "Information", + "traceId": "5B8EFFF798038103D269B633813FC60C", + "spanId": "EEE19B7EC3C1B174", + "body": { + "stringValue": "Database query executed" + }, + "attributes": [ + { + "key": "db.name", + "value": { + "stringValue": "database" + } + }, + { + "key": "db.type", + "value": { + "stringValue": "sql" + } + }, + { + "key": "db.statement", + "value": { + "stringValue": "SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s" + } + } + ] + }"#; + let otel_log: OtelLog = serde_json::from_str(json).unwrap(); + let our_log = otel_to_sentry_log(otel_log); + let annotated_log: Annotated = Annotated::new(our_log); + + assert_eq!( + get_path!(annotated_log.body), + Some(&Annotated::new("Database query executed".into())) + ); + assert_eq!( + get_path!(annotated_log.attributes["db.statement"].string_value), + Some(&Annotated::new( + "SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s".into() + )) + ); + } +} diff --git a/relay-pii/src/selector.rs b/relay-pii/src/selector.rs index c0481107bf2..880a71044c4 100644 --- a/relay-pii/src/selector.rs +++ b/relay-pii/src/selector.rs @@ -135,6 +135,7 @@ impl SelectorPathItem { | ValueType::Message | ValueType::Thread | ValueType::Breadcrumb + | ValueType::OurLog | ValueType::Span | ValueType::Minidump | ValueType::HeapMemory diff --git 
a/relay-quotas/src/quota.rs b/relay-quotas/src/quota.rs index 9a06ed1a729..888658a4736 100644 --- a/relay-quotas/src/quota.rs +++ b/relay-quotas/src/quota.rs @@ -192,6 +192,8 @@ impl CategoryUnit { | DataCategory::ProfileIndexed | DataCategory::TransactionProcessed | DataCategory::TransactionIndexed + | DataCategory::LogCount + | DataCategory::LogBytes | DataCategory::Span | DataCategory::SpanIndexed | DataCategory::MonitorSeat diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index daff9885c78..d70f197e2e4 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -13,16 +13,16 @@ publish = false [features] default = [] processing = [ - "dep:minidump", - "dep:symbolic-common", - "dep:symbolic-unreal", - "relay-cardinality/redis", - "relay-config/processing", - "relay-kafka/producer", - "relay-metrics/redis", - "relay-quotas/redis", - "relay-redis/impl", - "relay-sampling/redis", + "dep:minidump", + "dep:symbolic-common", + "dep:symbolic-unreal", + "relay-cardinality/redis", + "relay-config/processing", + "relay-kafka/producer", + "relay-metrics/redis", + "relay-quotas/redis", + "relay-redis/impl", + "relay-sampling/redis", ] [lints] @@ -75,6 +75,7 @@ relay-event-schema = { workspace = true } relay-filter = { workspace = true } relay-kafka = { workspace = true, optional = true } relay-log = { workspace = true, features = ["sentry"] } +relay-ourlogs = { workspace = true } relay-metrics = { workspace = true } relay-monitors = { workspace = true } relay-pii = { workspace = true } @@ -88,10 +89,10 @@ relay-spans = { workspace = true } relay-statsd = { workspace = true } relay-system = { workspace = true } reqwest = { workspace = true, features = [ - "gzip", - "hickory-dns", - "stream", - "native-tls-vendored", + "gzip", + "hickory-dns", + "stream", + "native-tls-vendored", ] } rmp-serde = { workspace = true } serde = { workspace = true } @@ -100,32 +101,32 @@ serde_json = { workspace = true } smallvec = { workspace = true, features = 
["drain_filter"] } socket2 = { workspace = true } sqlx = { workspace = true, features = [ - "macros", - "migrate", - "sqlite", - "runtime-tokio", + "macros", + "migrate", + "sqlite", + "runtime-tokio", ], default-features = false } symbolic-common = { workspace = true, optional = true, default-features = false } symbolic-unreal = { workspace = true, optional = true, default-features = false, features = [ - "serde", + "serde", ] } sysinfo = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["sync", "time"] } tower = { workspace = true, default-features = false } tower-http = { workspace = true, default-features = false, features = [ - "catch-panic", - "compression-br", - "compression-deflate", - "compression-gzip", - "compression-zstd", - "cors", - "decompression-br", - "decompression-deflate", - "decompression-gzip", - "decompression-zstd", - "set-header", - "trace", + "catch-panic", + "compression-br", + "compression-deflate", + "compression-gzip", + "compression-zstd", + "cors", + "decompression-br", + "decompression-deflate", + "decompression-gzip", + "decompression-zstd", + "set-header", + "trace", ] } url = { workspace = true, features = ["serde"] } uuid = { workspace = true, features = ["v5"] } diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index e29444199c5..91f1e99d9d3 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -117,6 +117,10 @@ pub enum ItemType { ReplayVideo, /// Monitor check-in encoded as JSON. CheckIn, + /// A log from the [OTEL Log format](https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition) + OtelLog, + /// A log for the log product, not internal logs. + Log, /// A standalone span. Span, /// A standalone OpenTelemetry span serialized as JSON. 
@@ -174,6 +178,8 @@ impl ItemType { Self::ReplayRecording => "replay_recording", Self::ReplayVideo => "replay_video", Self::CheckIn => "check_in", + Self::Log => "log", + Self::OtelLog => "otel_log", Self::Span => "span", Self::OtelSpan => "otel_span", Self::OtelTracesData => "otel_traces_data", @@ -227,6 +233,8 @@ impl std::str::FromStr for ItemType { "replay_recording" => Self::ReplayRecording, "replay_video" => Self::ReplayVideo, "check_in" => Self::CheckIn, + "log" => Self::Log, + "otel_log" => Self::OtelLog, "span" => Self::Span, "otel_span" => Self::OtelSpan, "otel_traces_data" => Self::OtelTracesData, @@ -696,6 +704,7 @@ impl Item { CountFor::Outcomes => smallvec![], }, ItemType::Statsd | ItemType::MetricBuckets => smallvec![], + ItemType::Log | ItemType::OtelLog => smallvec![], ItemType::FormData => smallvec![], ItemType::UserReport => smallvec![], ItemType::UserReportV2 => smallvec![(DataCategory::UserReportV2, 1)], @@ -952,6 +961,8 @@ impl Item { | ItemType::Profile | ItemType::CheckIn | ItemType::Span + | ItemType::Log + | ItemType::OtelLog | ItemType::OtelSpan | ItemType::OtelTracesData | ItemType::ProfileChunk => false, @@ -986,6 +997,7 @@ impl Item { ItemType::Profile => true, ItemType::CheckIn => false, ItemType::Span => false, + ItemType::Log | ItemType::OtelLog => false, ItemType::OtelSpan => false, ItemType::OtelTracesData => false, ItemType::ProfileChunk => false, diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index d11f1f9a97c..6b869fe8197 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -457,6 +457,9 @@ pub enum DiscardReason { /// (Relay) Profiling related discard reasons Profiling(&'static str), + /// (Relay) A log that is not valid after normalization. + InvalidLog, + /// (Relay) A span is not valid after normalization. 
InvalidSpan, @@ -506,6 +509,7 @@ impl DiscardReason { DiscardReason::InvalidReplayRecordingEvent => "invalid_replay_recording", DiscardReason::InvalidReplayVideoEvent => "invalid_replay_video", DiscardReason::Profiling(reason) => reason, + DiscardReason::InvalidLog => "invalid_log", DiscardReason::InvalidSpan => "invalid_span", DiscardReason::FeatureDisabled(_) => "feature_disabled", } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 3417f5ed08a..5cd56665008 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -86,6 +86,7 @@ mod attachment; mod dynamic_sampling; mod event; mod metrics; +mod ourlog; mod profile; mod profile_chunk; mod replay; @@ -193,6 +194,7 @@ processing_group!(StandaloneGroup, Standalone); processing_group!(ClientReportGroup, ClientReport); processing_group!(ReplayGroup, Replay); processing_group!(CheckInGroup, CheckIn); +processing_group!(LogGroup, Log); processing_group!(SpanGroup, Span); impl Sampling for SpanGroup { @@ -240,6 +242,8 @@ pub enum ProcessingGroup { Replay, /// Crons. CheckIn, + /// Logs. + Log, /// Spans. Span, /// Metrics. @@ -303,6 +307,7 @@ impl ProcessingGroup { &ItemType::Span | &ItemType::OtelSpan | &ItemType::OtelTracesData ) }); + if !span_items.is_empty() { grouped_envelopes.push(( ProcessingGroup::Span, @@ -310,6 +315,17 @@ impl ProcessingGroup { )) } + // Extract logs. + let logs_items = + envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Log | &ItemType::OtelLog)); + + if !logs_items.is_empty() { + grouped_envelopes.push(( + ProcessingGroup::Log, + Envelope::from_parts(headers.clone(), logs_items), + )) + } + // Extract all metric items. // // Note: Should only be relevant in proxy mode. 
In other modes we send metrics through @@ -411,6 +427,7 @@ impl ProcessingGroup { ProcessingGroup::ClientReport => "client_report", ProcessingGroup::Replay => "replay", ProcessingGroup::CheckIn => "check_in", + ProcessingGroup::Log => "log", ProcessingGroup::Span => "span", ProcessingGroup::Metrics => "metrics", ProcessingGroup::ProfileChunk => "profile_chunk", @@ -430,6 +447,7 @@ impl From for AppFeature { ProcessingGroup::ClientReport => AppFeature::ClientReports, ProcessingGroup::Replay => AppFeature::Replays, ProcessingGroup::CheckIn => AppFeature::CheckIns, + ProcessingGroup::Log => AppFeature::Logs, ProcessingGroup::Span => AppFeature::Spans, ProcessingGroup::Metrics => AppFeature::UnattributedMetrics, ProcessingGroup::ProfileChunk => AppFeature::Profiles, @@ -2000,6 +2018,25 @@ impl EnvelopeProcessorService { Ok(Some(extracted_metrics)) } + /// Process logs + /// + fn process_logs( + &self, + managed_envelope: &mut TypedEnvelope, + project_info: Arc, + ) -> Result, ProcessingError> { + ourlog::filter(managed_envelope); + if_processing!(self.inner.config, { + ourlog::process( + managed_envelope, + project_info, + &self.inner.global_config.current(), + &self.inner.config, + ); + }); + Ok(None) + } + /// Processes standalone spans. /// /// This function does *not* run for spans extracted from transactions. 
@@ -2155,6 +2192,7 @@ impl EnvelopeProcessorService { ProcessingGroup::CheckIn => { run!(process_checkins, project_id, project_info, rate_limits) } + ProcessingGroup::Log => run!(process_logs, project_info), ProcessingGroup::Span => run!( process_standalone_spans, self.inner.config.clone(), diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index d6032e3e98c..0769e5c8b8f 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -468,6 +468,8 @@ fn is_duplicate(item: &Item, processing_enabled: bool) -> bool { ItemType::ReplayRecording => false, ItemType::ReplayVideo => false, ItemType::CheckIn => false, + ItemType::Log => false, + ItemType::OtelLog => false, ItemType::Span => false, ItemType::OtelSpan => false, ItemType::OtelTracesData => false, diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs new file mode 100644 index 00000000000..cd7eda81947 --- /dev/null +++ b/relay-server/src/services/processor/ourlog.rs @@ -0,0 +1,73 @@ +//! Log processing code. + +use std::sync::Arc; + +use relay_config::Config; +use relay_dynamic_config::GlobalConfig; +use relay_event_schema::protocol::OurLog; + +use crate::envelope::ItemType; +use crate::services::projects::project::ProjectInfo; +use crate::utils::{ItemAction, TypedEnvelope}; + +use relay_ourlogs::OtelLog; +use relay_protocol::Annotated; + +#[cfg(feature = "processing")] +use { + crate::envelope::ContentType, + crate::services::outcome::{DiscardReason, Outcome}, + crate::services::processor::LogGroup, +}; + +/// Removes logs from the envelope if the feature is not enabled. +pub fn filter(managed_envelope: &mut TypedEnvelope) { + // All log types are currently kept + managed_envelope.retain_items(|_| ItemAction::Keep); +} + +/// Processes logs. 
+#[cfg(feature = "processing")] +pub fn process( + managed_envelope: &mut TypedEnvelope, + _project_info: Arc, + _global_config: &GlobalConfig, + _config: &Config, +) { + use crate::envelope::Item; + + managed_envelope.retain_items(|item| { + let annotated_log = match item.ty() { + ItemType::OtelLog => match serde_json::from_slice::(&item.payload()) { + Ok(otel_log) => Annotated::new(relay_ourlogs::otel_to_sentry_log(otel_log)), + Err(err) => { + relay_log::debug!("failed to parse OTel Log: {}", err); + return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidLog)); + } + }, + ItemType::Log => match Annotated::::from_json_bytes(&item.payload()) { + Ok(our_log) => our_log, + Err(err) => { + relay_log::debug!("failed to parse Sentry Log: {}", err); + return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidLog)); + } + }, + + _ => return ItemAction::Keep, + }; + + let mut new_item = Item::new(ItemType::Log); + let payload = match annotated_log.to_json() { + Ok(payload) => payload, + Err(err) => { + relay_log::debug!("failed to serialize log: {}", err); + return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); + } + }; + new_item.set_payload(ContentType::Json, payload); + + *item = new_item; + + ItemAction::Keep + }); +} diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 68bdee36106..266d76e0732 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -290,6 +290,7 @@ impl StoreService { ItemType::Span => { self.produce_span(scoping, received_at, event_id, retention, item)? 
} + ItemType::Log => self.produce_log(scoping, received_at, retention, item)?, ItemType::ProfileChunk => self.produce_profile_chunk( scoping.organization_id, scoping.project_id, @@ -940,6 +941,83 @@ impl StoreService { scoping, timestamp: received_at, }); + Ok(()) + } + + fn produce_log( + &self, + scoping: Scoping, + received_at: DateTime, + retention_days: u16, + item: &Item, + ) -> Result<(), StoreError> { + relay_log::trace!("Producing log"); + let payload = item.payload(); + let d = &mut Deserializer::from_slice(&payload); + + let mut log: LogKafkaMessage = match serde_path_to_error::deserialize(d) { + Ok(log) => log, + Err(error) => { + relay_log::error!( + error = &error as &dyn std::error::Error, + "failed to parse log" + ); + self.outcome_aggregator.send(TrackOutcome { + category: DataCategory::LogCount, + event_id: None, + outcome: Outcome::Invalid(DiscardReason::InvalidLog), + quantity: 1, + remote_addr: None, + scoping, + timestamp: received_at, + }); + self.outcome_aggregator.send(TrackOutcome { + category: DataCategory::LogBytes, + event_id: None, + outcome: Outcome::Invalid(DiscardReason::InvalidLog), + quantity: payload.len() as u32, + remote_addr: None, + scoping, + timestamp: received_at, + }); + return Ok(()); + } + }; + + log.organization_id = scoping.organization_id.value(); + log.project_id = scoping.project_id.value(); + log.retention_days = retention_days; + + self.produce( + KafkaTopic::Logs, + KafkaMessage::Log { + headers: BTreeMap::from([( + "project_id".to_string(), + scoping.project_id.to_string(), + )]), + message: log, + }, + )?; + + // We need to track the count and bytes separately for possible rate limits and quotas on both counts and bytes. 
+ self.outcome_aggregator.send(TrackOutcome { + category: DataCategory::LogCount, + event_id: None, + outcome: Outcome::Accepted, + quantity: 1, + remote_addr: None, + scoping, + timestamp: received_at, + }); + self.outcome_aggregator.send(TrackOutcome { + category: DataCategory::LogBytes, + event_id: None, + outcome: Outcome::Accepted, + quantity: payload.len() as u32, + remote_addr: None, + scoping, + timestamp: received_at, + }); Ok(()) } @@ -1306,6 +1384,33 @@ struct SpanKafkaMessage<'a> { platform: Cow<'a, str>, // We only use this for logging for now } +#[derive(Debug, Deserialize, Serialize)] +struct LogKafkaMessage<'a> { + #[serde(default)] + organization_id: u64, + #[serde(default)] + project_id: u64, + /// Number of days until these data should be deleted. + #[serde(default)] + retention_days: u16, + #[serde(default)] + timestamp_nanos: u64, + #[serde(default)] + observed_timestamp_nanos: u64, + #[serde(default, skip_serializing_if = "Option::is_none")] + severity_number: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + severity_text: Option<&'a str>, + body: &'a RawValue, + #[serde(default, skip_serializing_if = "none_or_empty_object")] + attributes: Option<&'a RawValue>, + #[serde(default, skip_serializing_if = "Option::is_none")] + span_id: Option<&'a str>, + trace_id: EventId, + #[serde(default, skip_serializing_if = "Option::is_none")] + flags: Option, +} + fn none_or_empty_object(value: &Option<&RawValue>) -> bool { match value { None => true, @@ -1347,6 +1452,12 @@ enum KafkaMessage<'a> { #[serde(flatten)] message: SpanKafkaMessage<'a>, }, + Log { + #[serde(skip)] + headers: BTreeMap, + #[serde(flatten)] + message: LogKafkaMessage<'a>, + }, ProfileChunk(ProfileChunkKafkaMessage), } @@ -1369,6 +1480,7 @@ impl Message for KafkaMessage<'_> { KafkaMessage::ReplayEvent(_) => "replay_event", KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked", KafkaMessage::CheckIn(_) => "check_in", + KafkaMessage::Log { .. 
} => "log", KafkaMessage::Span { .. } => "span", KafkaMessage::ProfileChunk(_) => "profile_chunk", } @@ -1392,6 +1504,7 @@ impl Message for KafkaMessage<'_> { // Random partitioning Self::Profile(_) | Self::Span { .. } + | Self::Log { .. } | Self::ReplayRecordingNotChunked(_) | Self::ProfileChunk(_) => Uuid::nil(), @@ -1420,6 +1533,12 @@ impl Message for KafkaMessage<'_> { } None } + KafkaMessage::Log { headers, .. } => { + if !headers.is_empty() { + return Some(headers); + } + None + } KafkaMessage::Span { headers, .. } => { if !headers.is_empty() { return Some(headers); @@ -1439,6 +1558,9 @@ impl Message for KafkaMessage<'_> { KafkaMessage::ReplayEvent(message) => serde_json::to_vec(message) .map(Cow::Owned) .map_err(ClientError::InvalidJson), + KafkaMessage::Log { message, .. } => serde_json::to_vec(message) + .map(Cow::Owned) + .map_err(ClientError::InvalidJson), KafkaMessage::Span { message, .. } => serde_json::to_vec(message) .map(Cow::Owned) .map_err(ClientError::InvalidJson), diff --git a/relay-server/src/utils/managed_envelope.rs b/relay-server/src/utils/managed_envelope.rs index ad9c6234643..c8899074d52 100644 --- a/relay-server/src/utils/managed_envelope.rs +++ b/relay-server/src/utils/managed_envelope.rs @@ -414,6 +414,21 @@ impl ManagedEnvelope { ); } + if self.context.summary.log_count_quantity > 0 { + self.track_outcome( + outcome.clone(), + DataCategory::LogCount, + self.context.summary.log_count_quantity, + ); + } + if self.context.summary.log_bytes_quantity > 0 { + self.track_outcome( + outcome.clone(), + DataCategory::LogBytes, + self.context.summary.log_bytes_quantity, + ); + } + // Track outcomes for attached secondary transactions, e.g. extracted from metrics. 
// // Primary transaction count is already tracked through the event category diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index c2d1ac98547..8bddc75d11e 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -128,6 +128,8 @@ fn infer_event_category(item: &Item) -> Option { ItemType::ReplayVideo => None, ItemType::ClientReport => None, ItemType::CheckIn => None, + ItemType::Log => None, + ItemType::OtelLog => None, ItemType::Span => None, ItemType::OtelSpan => None, ItemType::OtelTracesData => None, @@ -164,6 +166,12 @@ pub struct EnvelopeSummary { /// The number of monitor check-ins. pub monitor_quantity: usize, + /// The number of log for the log product sent. + pub log_count_quantity: usize, + + /// The number of log bytes for the log product sent, in bytes + pub log_bytes_quantity: usize, + /// Secondary number of transactions. /// /// This is 0 for envelopes which contain a transaction, @@ -239,6 +247,8 @@ impl EnvelopeSummary { DataCategory::ReplayVideo => &mut self.replay_quantity, DataCategory::Monitor => &mut self.monitor_quantity, DataCategory::Span => &mut self.span_quantity, + DataCategory::LogCount => &mut self.log_count_quantity, + DataCategory::LogBytes => &mut self.log_bytes_quantity, DataCategory::ProfileChunk => &mut self.profile_chunk_quantity, // TODO: This catch-all return looks dangerous _ => return, @@ -344,6 +354,8 @@ pub struct Enforcement { pub replays: CategoryLimit, /// The combined check-in item rate limit. pub check_ins: CategoryLimit, + /// The combined logs (our product logs) rate limit. + pub logs: CategoryLimit, /// The combined spans rate limit. pub spans: CategoryLimit, /// The rate limit for the indexed span category. 
@@ -385,6 +397,7 @@ impl Enforcement { profiles_indexed, replays, check_ins, + logs, spans, spans_indexed, user_reports_v2, @@ -400,6 +413,7 @@ impl Enforcement { profiles_indexed, replays, check_ins, + logs, spans, spans_indexed, user_reports_v2, @@ -488,6 +502,8 @@ impl Enforcement { ItemType::ReplayVideo => !self.replays.is_active(), ItemType::ReplayRecording => !self.replays.is_active(), ItemType::CheckIn => !self.check_ins.is_active(), + ItemType::OtelLog => !self.logs.is_active(), + ItemType::Log => !self.logs.is_active(), ItemType::Span | ItemType::OtelSpan | ItemType::OtelTracesData => { !self.spans_indexed.is_active() } @@ -700,6 +716,28 @@ where rate_limits.merge(session_limits); } + // Handle logs. + if summary.log_count_quantity > 0 { + let item_scoping = scoping.item(DataCategory::LogCount); + let log_limits = self.check.apply(item_scoping, summary.log_count_quantity)?; + enforcement.logs = CategoryLimit::new( + DataCategory::LogCount, + summary.log_count_quantity, + log_limits.longest(), + ); + rate_limits.merge(log_limits); + } + if summary.log_bytes_quantity > 0 { + let item_scoping = scoping.item(DataCategory::LogBytes); + let log_limits = self.check.apply(item_scoping, summary.log_bytes_quantity)?; + enforcement.logs = CategoryLimit::new( + DataCategory::LogBytes, + summary.log_bytes_quantity, + log_limits.longest(), + ); + rate_limits.merge(log_limits); + } + // Handle profiles. 
if enforcement.is_event_active() { enforcement.profiles = enforcement @@ -1612,4 +1650,35 @@ mod tests { assert_eq!(summary.profile_quantity, 2); assert_eq!(summary.secondary_transaction_quantity, 7); } + + #[test] + fn test_enforce_limit_logs_count() { + let mut envelope = envelope![Log, Log]; + + let mut mock = MockLimiter::default().deny(DataCategory::LogCount); + let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None); + + assert!(limits.is_limited()); + assert_eq!(envelope.envelope().len(), 0); + mock.assert_call(DataCategory::LogCount, 2); + + assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogCount, 2)]); + } + + #[test] + fn test_enforce_limit_logs_bytes() { + let mut envelope = envelope![Log, Log]; + + let mut mock = MockLimiter::default().deny(DataCategory::LogBytes); + let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None); + + assert!(limits.is_limited()); + assert_eq!(envelope.envelope().len(), 0); + mock.assert_call(DataCategory::LogBytes, 20); + + assert_eq!( + get_outcomes(enforcement), + vec![(DataCategory::LogBytes, 20)] + ); + } } diff --git a/relay-server/src/utils/sizes.rs b/relay-server/src/utils/sizes.rs index 6c03420c2df..96ec13e3de3 100644 --- a/relay-server/src/utils/sizes.rs +++ b/relay-server/src/utils/sizes.rs @@ -15,6 +15,7 @@ use crate::utils::{ItemAction, ManagedEnvelope}; /// - `max_attachments_size` /// - `max_check_in_size` /// - `max_event_size` +/// - `max_log_size` /// - `max_metric_buckets_size` /// - `max_profile_size` /// - `max_replay_compressed_size` @@ -61,6 +62,8 @@ pub fn check_envelope_size_limits(config: &Config, envelope: &Envelope) -> Resul ItemType::UserReport => NO_LIMIT, ItemType::Statsd => config.max_statsd_size(), ItemType::MetricBuckets => config.max_metric_buckets_size(), + ItemType::Log => config.max_log_size(), + ItemType::OtelLog => config.max_log_size(), ItemType::Span | ItemType::OtelSpan => config.max_span_size(), ItemType::OtelTracesData => 
config.max_event_size(), // a spans container similar to `Transaction` ItemType::ProfileChunk => config.max_profile_size(), From 97b75908a9ff14f17bba58b167d815ac8fd47a52 Mon Sep 17 00:00:00 2001 From: Kev Date: Wed, 15 Jan 2025 14:01:57 -0500 Subject: [PATCH 02/27] Update consts file --- py/sentry_relay/consts.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/py/sentry_relay/consts.py b/py/sentry_relay/consts.py index ceac094305b..e8ead3b1df4 100644 --- a/py/sentry_relay/consts.py +++ b/py/sentry_relay/consts.py @@ -8,7 +8,6 @@ class DataCategory(IntEnum): - # begin generated DEFAULT = 0 ERROR = 1 TRANSACTION = 2 @@ -32,6 +31,8 @@ class DataCategory(IntEnum): REPLAY_VIDEO = 20 UPTIME = 21 ATTACHMENT_ITEM = 22 + LOG_COUNT = 23 + LOG_BYTES = 24 UNKNOWN = -1 # end generated From 6dc83eff9a95392cd209e674ab06dd6911d0d073 Mon Sep 17 00:00:00 2001 From: Kev Date: Wed, 15 Jan 2025 14:04:06 -0500 Subject: [PATCH 03/27] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 166447a7787..9b6441c62b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ **Internal** - Updates performance score calculation on spans and events to also store cdf values as measurements. ([#4438](https://github.com/getsentry/relay/pull/4438)) +- Allow log ingestion behind a flag, only for internal use currently. 
([#4448](https://github.com/getsentry/relay/pull/4448)) ## 24.12.2 From 3f6f6f90dce72df008440e2b1f6a708492f83135 Mon Sep 17 00:00:00 2001 From: Kev Date: Wed, 15 Jan 2025 14:16:57 -0500 Subject: [PATCH 04/27] Re-add flag to processing and filter --- relay-server/src/services/processor.rs | 13 ++++---- relay-server/src/services/processor/ourlog.rs | 33 +++++++++++++------ 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 5cd56665008..f60cc13ac03 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2025,14 +2025,13 @@ impl EnvelopeProcessorService { managed_envelope: &mut TypedEnvelope, project_info: Arc, ) -> Result, ProcessingError> { - ourlog::filter(managed_envelope); + ourlog::filter( + managed_envelope, + self.inner.config.clone(), + project_info.clone(), + ); if_processing!(self.inner.config, { - ourlog::process( - managed_envelope, - project_info, - &self.inner.global_config.current(), - &self.inner.config, - ); + ourlog::process(managed_envelope, &self.inner.config, project_info.clone()); }); Ok(None) } diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs index cd7eda81947..a228bf814b1 100644 --- a/relay-server/src/services/processor/ourlog.rs +++ b/relay-server/src/services/processor/ourlog.rs @@ -1,12 +1,13 @@ //! Log processing code. - use std::sync::Arc; +use crate::envelope::Item; use relay_config::Config; -use relay_dynamic_config::GlobalConfig; +use relay_dynamic_config::Feature; use relay_event_schema::protocol::OurLog; use crate::envelope::ItemType; +use crate::services::processor::should_filter; use crate::services::projects::project::ProjectInfo; use crate::utils::{ItemAction, TypedEnvelope}; @@ -21,22 +22,34 @@ use { }; /// Removes logs from the envelope if the feature is not enabled. 
-pub fn filter(managed_envelope: &mut TypedEnvelope) { - // All log types are currently kept - managed_envelope.retain_items(|_| ItemAction::Keep); +pub fn filter( + managed_envelope: &mut TypedEnvelope, + config: Arc, + project_info: Arc, +) { + let logging_disabled = should_filter(&config, &project_info, Feature::OurLogsIngestion); + managed_envelope.retain_items(|_| { + if logging_disabled { + ItemAction::DropSilently + } else { + ItemAction::Keep + } + }); } /// Processes logs. #[cfg(feature = "processing")] pub fn process( managed_envelope: &mut TypedEnvelope, - _project_info: Arc, - _global_config: &GlobalConfig, - _config: &Config, + config: &Config, + project_info: Arc, ) { - use crate::envelope::Item; - + let logging_disabled = should_filter(config, &project_info, Feature::OurLogsIngestion); managed_envelope.retain_items(|item| { + if logging_disabled { + return ItemAction::DropSilently; + } + let annotated_log = match item.ty() { ItemType::OtelLog => match serde_json::from_slice::(&item.payload()) { Ok(otel_log) => Annotated::new(relay_ourlogs::otel_to_sentry_log(otel_log)), From 817a9fd88d1bb952165e5162b15f44436bdaa8ad Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 16 Jan 2025 12:17:22 -0500 Subject: [PATCH 05/27] Update data category names --- py/sentry_relay/consts.py | 4 +- relay-base-schema/src/data_category.rs | 12 +++--- relay-cabi/include/relay.h | 4 +- relay-quotas/src/quota.rs | 4 +- relay-server/src/services/store.rs | 8 ++-- relay-server/src/utils/managed_envelope.rs | 12 +++--- relay-server/src/utils/rate_limits.rs | 43 ++++++++++------------ 7 files changed, 42 insertions(+), 45 deletions(-) diff --git a/py/sentry_relay/consts.py b/py/sentry_relay/consts.py index e8ead3b1df4..a2fc4617b03 100644 --- a/py/sentry_relay/consts.py +++ b/py/sentry_relay/consts.py @@ -31,8 +31,8 @@ class DataCategory(IntEnum): REPLAY_VIDEO = 20 UPTIME = 21 ATTACHMENT_ITEM = 22 - LOG_COUNT = 23 - LOG_BYTES = 24 + LOG_ITEM = 23 + LOG_BYTE = 24 UNKNOWN = -1 # end 
generated diff --git a/relay-base-schema/src/data_category.rs b/relay-base-schema/src/data_category.rs index 3704ef57a59..1bfad11e739 100644 --- a/relay-base-schema/src/data_category.rs +++ b/relay-base-schema/src/data_category.rs @@ -96,11 +96,11 @@ pub enum DataCategory { /// /// This is the category for logs for which we store the count log events for users for measuring /// missing breadcrumbs, and count of logs for rate limiting purposes. - LogCount = 23, + LogItem = 23, /// LogBytes /// /// This is the category for logs for which we store log event total bytes for users. - LogBytes = 24, + LogByte = 24, // // IMPORTANT: After adding a new entry to DataCategory, go to the `relay-cabi` subfolder and run // `make header` to regenerate the C-binding. This allows using the data category from Python. @@ -129,8 +129,8 @@ impl DataCategory { "transaction_indexed" => Self::TransactionIndexed, "monitor" => Self::Monitor, "span" => Self::Span, - "log_count" => Self::LogCount, - "log_bytes" => Self::LogBytes, + "log_item" => Self::LogItem, + "log_byte" => Self::LogByte, "monitor_seat" => Self::MonitorSeat, "feedback" => Self::UserReportV2, "user_report_v2" => Self::UserReportV2, @@ -163,8 +163,8 @@ impl DataCategory { Self::TransactionIndexed => "transaction_indexed", Self::Monitor => "monitor", Self::Span => "span", - Self::LogCount => "log_count", - Self::LogBytes => "log_bytes", + Self::LogItem => "log_item", + Self::LogByte => "log_byte", Self::MonitorSeat => "monitor_seat", Self::UserReportV2 => "feedback", Self::MetricBucket => "metric_bucket", diff --git a/relay-cabi/include/relay.h b/relay-cabi/include/relay.h index 38bc93d83af..db50243b8f0 100644 --- a/relay-cabi/include/relay.h +++ b/relay-cabi/include/relay.h @@ -148,13 +148,13 @@ enum RelayDataCategory { * This is the category for logs for which we store the count log events for users for measuring * missing breadcrumbs, and count of logs for rate limiting purposes. 
*/ - RELAY_DATA_CATEGORY_LOG_COUNT = 23, + RELAY_DATA_CATEGORY_LOG_ITEM = 23, /** * LogBytes * * This is the category for logs for which we store log event total bytes for users. */ - RELAY_DATA_CATEGORY_LOG_BYTES = 24, + RELAY_DATA_CATEGORY_LOG_BYTE = 24, /** * Any other data category not known by this Relay. */ diff --git a/relay-quotas/src/quota.rs b/relay-quotas/src/quota.rs index 888658a4736..c8508bf8d9d 100644 --- a/relay-quotas/src/quota.rs +++ b/relay-quotas/src/quota.rs @@ -192,8 +192,8 @@ impl CategoryUnit { | DataCategory::ProfileIndexed | DataCategory::TransactionProcessed | DataCategory::TransactionIndexed - | DataCategory::LogCount - | DataCategory::LogBytes + | DataCategory::LogItem + | DataCategory::LogByte | DataCategory::Span | DataCategory::SpanIndexed | DataCategory::MonitorSeat diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 266d76e0732..c98ad7fc7d1 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -963,7 +963,7 @@ impl StoreService { "failed to parse log" ); self.outcome_aggregator.send(TrackOutcome { - category: DataCategory::LogCount, + category: DataCategory::LogItem, event_id: None, outcome: Outcome::Invalid(DiscardReason::InvalidLog), quantity: 1, @@ -972,7 +972,7 @@ impl StoreService { timestamp: received_at, }); self.outcome_aggregator.send(TrackOutcome { - category: DataCategory::LogBytes, + category: DataCategory::LogByte, event_id: None, outcome: Outcome::Invalid(DiscardReason::InvalidLog), quantity: payload.len() as u32, @@ -1001,7 +1001,7 @@ impl StoreService { // We need to track the count and bytes separately for possible rate limits and quotas on both counts and bytes. 
self.outcome_aggregator.send(TrackOutcome { - category: DataCategory::LogCount, + category: DataCategory::LogItem, event_id: None, outcome: Outcome::Accepted, quantity: 1, @@ -1010,7 +1010,7 @@ impl StoreService { timestamp: received_at, }); self.outcome_aggregator.send(TrackOutcome { - category: DataCategory::LogBytes, + category: DataCategory::LogByte, event_id: None, outcome: Outcome::Accepted, quantity: payload.len() as u32, diff --git a/relay-server/src/utils/managed_envelope.rs b/relay-server/src/utils/managed_envelope.rs index c8899074d52..0ca927b3d87 100644 --- a/relay-server/src/utils/managed_envelope.rs +++ b/relay-server/src/utils/managed_envelope.rs @@ -414,18 +414,18 @@ impl ManagedEnvelope { ); } - if self.context.summary.log_count_quantity > 0 { + if self.context.summary.log_item_quantity > 0 { self.track_outcome( outcome.clone(), - DataCategory::LogCount, - self.context.summary.log_count_quantity, + DataCategory::LogItem, + self.context.summary.log_item_quantity, ); } - if self.context.summary.log_bytes_quantity > 0 { + if self.context.summary.log_byte_quantity > 0 { self.track_outcome( outcome.clone(), - DataCategory::LogBytes, - self.context.summary.log_bytes_quantity, + DataCategory::LogByte, + self.context.summary.log_byte_quantity, ); } diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index 8bddc75d11e..5fa9036e41f 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -167,10 +167,10 @@ pub struct EnvelopeSummary { pub monitor_quantity: usize, /// The number of log for the log product sent. - pub log_count_quantity: usize, + pub log_item_quantity: usize, /// The number of log bytes for the log product sent, in bytes - pub log_bytes_quantity: usize, + pub log_byte_quantity: usize, /// Secondary number of transactions. 
/// @@ -247,8 +247,8 @@ impl EnvelopeSummary { DataCategory::ReplayVideo => &mut self.replay_quantity, DataCategory::Monitor => &mut self.monitor_quantity, DataCategory::Span => &mut self.span_quantity, - DataCategory::LogCount => &mut self.log_count_quantity, - DataCategory::LogBytes => &mut self.log_bytes_quantity, + DataCategory::LogItem => &mut self.log_item_quantity, + DataCategory::LogByte => &mut self.log_byte_quantity, DataCategory::ProfileChunk => &mut self.profile_chunk_quantity, // TODO: This catch-all return looks dangerous _ => return, @@ -717,22 +717,22 @@ where } // Handle logs. - if summary.log_count_quantity > 0 { - let item_scoping = scoping.item(DataCategory::LogCount); - let log_limits = self.check.apply(item_scoping, summary.log_count_quantity)?; + if summary.log_item_quantity > 0 { + let item_scoping = scoping.item(DataCategory::LogItem); + let log_limits = self.check.apply(item_scoping, summary.log_item_quantity)?; enforcement.logs = CategoryLimit::new( - DataCategory::LogCount, - summary.log_count_quantity, + DataCategory::LogItem, + summary.log_item_quantity, log_limits.longest(), ); rate_limits.merge(log_limits); } - if summary.log_bytes_quantity > 0 { - let item_scoping = scoping.item(DataCategory::LogBytes); - let log_limits = self.check.apply(item_scoping, summary.log_bytes_quantity)?; + if summary.log_byte_quantity > 0 { + let item_scoping = scoping.item(DataCategory::LogByte); + let log_limits = self.check.apply(item_scoping, summary.log_byte_quantity)?; enforcement.logs = CategoryLimit::new( - DataCategory::LogBytes, - summary.log_bytes_quantity, + DataCategory::LogByte, + summary.log_byte_quantity, log_limits.longest(), ); rate_limits.merge(log_limits); @@ -1655,30 +1655,27 @@ mod tests { fn test_enforce_limit_logs_count() { let mut envelope = envelope![Log, Log]; - let mut mock = MockLimiter::default().deny(DataCategory::LogCount); + let mut mock = MockLimiter::default().deny(DataCategory::LogItem); let (enforcement, limits) = 
enforce_and_apply(&mut mock, &mut envelope, None); assert!(limits.is_limited()); assert_eq!(envelope.envelope().len(), 0); - mock.assert_call(DataCategory::LogCount, 2); + mock.assert_call(DataCategory::LogItem, 2); - assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogCount, 2)]); + assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogItem, 2)]); } #[test] fn test_enforce_limit_logs_bytes() { let mut envelope = envelope![Log, Log]; - let mut mock = MockLimiter::default().deny(DataCategory::LogBytes); + let mut mock = MockLimiter::default().deny(DataCategory::LogByte); let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None); assert!(limits.is_limited()); assert_eq!(envelope.envelope().len(), 0); - mock.assert_call(DataCategory::LogBytes, 20); + mock.assert_call(DataCategory::LogByte, 20); - assert_eq!( - get_outcomes(enforcement), - vec![(DataCategory::LogBytes, 20)] - ); + assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogByte, 20)]); } } From 87f4882acd2980749e560dd797ce75c4456f55ce Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 16 Jan 2025 12:06:14 -0500 Subject: [PATCH 06/27] feat(ourlogs): Add data categories for log ingestion This is pulling out data categories from #4448 as their own PR. 
--- py/sentry_relay/consts.py | 2 ++ relay-base-schema/src/data_category.rs | 13 +++++++++++++ relay-cabi/include/relay.h | 17 +++++++++++++++-- relay-quotas/src/quota.rs | 2 ++ 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/py/sentry_relay/consts.py b/py/sentry_relay/consts.py index ceac094305b..825f7e0a119 100644 --- a/py/sentry_relay/consts.py +++ b/py/sentry_relay/consts.py @@ -32,6 +32,8 @@ class DataCategory(IntEnum): REPLAY_VIDEO = 20 UPTIME = 21 ATTACHMENT_ITEM = 22 + LOG_ITEM = 23 + LOG_BYTE = 24 UNKNOWN = -1 # end generated diff --git a/relay-base-schema/src/data_category.rs b/relay-base-schema/src/data_category.rs index 37720d5cd9a..6ab7da8a18b 100644 --- a/relay-base-schema/src/data_category.rs +++ b/relay-base-schema/src/data_category.rs @@ -92,6 +92,15 @@ pub enum DataCategory { Uptime = 21, /// Counts the number of individual attachments, as opposed to the number of bytes in an attachment. AttachmentItem = 22, + /// LogItem + /// + /// This is the category for logs for which we store the count log events for users for measuring + /// missing breadcrumbs, and count of logs for rate limiting purposes. + LogItem = 23, + /// LogByte + /// + /// This is the category for logs for which we store log event total bytes for users. + LogByte = 24, // // IMPORTANT: After adding a new entry to DataCategory, go to the `relay-cabi` subfolder and run // `make header` to regenerate the C-binding. This allows using the data category from Python. 
@@ -120,6 +129,8 @@ impl DataCategory { "transaction_indexed" => Self::TransactionIndexed, "monitor" => Self::Monitor, "span" => Self::Span, + "log_item" => Self::LogItem, + "log_byte" => Self::LogByte, "monitor_seat" => Self::MonitorSeat, "feedback" => Self::UserReportV2, "user_report_v2" => Self::UserReportV2, @@ -152,6 +163,8 @@ impl DataCategory { Self::TransactionIndexed => "transaction_indexed", Self::Monitor => "monitor", Self::Span => "span", + Self::LogItem => "log_item", + Self::LogByte => "log_byte", Self::MonitorSeat => "monitor_seat", Self::UserReportV2 => "feedback", Self::MetricBucket => "metric_bucket", diff --git a/relay-cabi/include/relay.h b/relay-cabi/include/relay.h index bdeee11e2fd..db50243b8f0 100644 --- a/relay-cabi/include/relay.h +++ b/relay-cabi/include/relay.h @@ -3,7 +3,7 @@ #ifndef RELAY_H_INCLUDED #define RELAY_H_INCLUDED -/* Generated with cbindgen:0.26.0 */ +/* Generated with cbindgen:0.27.0 */ /* Warning, this file is autogenerated. Do not modify this manually. */ @@ -142,6 +142,19 @@ enum RelayDataCategory { * Counts the number of individual attachments, as opposed to the number of bytes in an attachment. */ RELAY_DATA_CATEGORY_ATTACHMENT_ITEM = 22, + /** + * LogItem + * + * This is the category for logs for which we store the count log events for users for measuring + * missing breadcrumbs, and count of logs for rate limiting purposes. + */ + RELAY_DATA_CATEGORY_LOG_ITEM = 23, + /** + * LogByte + * + * This is the category for logs for which we store log event total bytes for users. + */ + RELAY_DATA_CATEGORY_LOG_BYTE = 24, /** * Any other data category not known by this Relay. 
*/ @@ -679,4 +692,4 @@ struct RelayStr normalize_cardinality_limit_config(const struct RelayStr *value) */ struct RelayStr relay_normalize_global_config(const struct RelayStr *value); -#endif /* RELAY_H_INCLUDED */ +#endif /* RELAY_H_INCLUDED */ diff --git a/relay-quotas/src/quota.rs b/relay-quotas/src/quota.rs index 9a06ed1a729..c8508bf8d9d 100644 --- a/relay-quotas/src/quota.rs +++ b/relay-quotas/src/quota.rs @@ -192,6 +192,8 @@ impl CategoryUnit { | DataCategory::ProfileIndexed | DataCategory::TransactionProcessed | DataCategory::TransactionIndexed + | DataCategory::LogItem + | DataCategory::LogByte | DataCategory::Span | DataCategory::SpanIndexed | DataCategory::MonitorSeat From 20e75e453ea9ce4482b6540d1b71d531486f3521 Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 16 Jan 2025 12:32:17 -0500 Subject: [PATCH 07/27] Add changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d94a898375..17637f564b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## Unreleased +- Add data categories for LogItem and LogByte. 
([#4455](https://github.com/getsentry/relay/pull/4455)) + ## 25.1.0 **Internal** From 7cbe5c92fa6ab7d1c9ceaaf715fcd2cc1edc3e54 Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 16 Jan 2025 22:02:53 -0500 Subject: [PATCH 08/27] Use enum --- relay-event-schema/src/processor/traits.rs | 1 + relay-event-schema/src/protocol/ourlog.rs | 164 +++++++++++++++++---- relay-ourlogs/src/ourlog.rs | 50 ++----- 3 files changed, 148 insertions(+), 67 deletions(-) diff --git a/relay-event-schema/src/processor/traits.rs b/relay-event-schema/src/processor/traits.rs index 99f1fa55a7b..9c5491872d4 100644 --- a/relay-event-schema/src/processor/traits.rs +++ b/relay-event-schema/src/processor/traits.rs @@ -113,6 +113,7 @@ pub trait Processor: Sized { process_method!(process_trace_context, crate::protocol::TraceContext); process_method!(process_native_image_path, crate::protocol::NativeImagePath); process_method!(process_contexts, crate::protocol::Contexts); + process_method!(process_attribute_value, crate::protocol::AttributeValue); fn process_other( &mut self, diff --git a/relay-event-schema/src/protocol/ourlog.rs b/relay-event-schema/src/protocol/ourlog.rs index 9194b26b03e..265c401922f 100644 --- a/relay-event-schema/src/protocol/ourlog.rs +++ b/relay-event-schema/src/protocol/ourlog.rs @@ -1,4 +1,6 @@ -use relay_protocol::{Annotated, Empty, FromValue, IntoValue, Object, Value}; +use relay_protocol::{ + Annotated, Empty, Error, FromValue, IntoValue, Object, SkipSerialization, Value, +}; use crate::processor::ProcessValue; use crate::protocol::{SpanId, TraceId}; @@ -47,12 +49,135 @@ pub struct OurLog { pub other: Object, } -#[derive(Debug, Clone, Default, PartialEq, Empty, FromValue, IntoValue, ProcessValue)] -pub struct AttributeValue { - pub string_value: Annotated, - pub int_value: Annotated, - pub double_value: Annotated, - pub bool_value: Annotated, +#[derive(Debug, Clone, PartialEq, ProcessValue)] +pub enum AttributeValue { + #[metastructure(field = "string_value", pii = "true")] + 
StringValue(String), + #[metastructure(field = "int_value", pii = "true")] + IntValue(i64), + #[metastructure(field = "double_value", pii = "true")] + DoubleValue(f64), + #[metastructure(field = "bool_value", pii = "true")] + BoolValue(bool), + /// Any other unknown attribute value. + /// + /// This exists to ensure other attribute values such as array and object can be added in the future. + Unknown(String), +} + +impl IntoValue for AttributeValue { + fn into_value(self) -> Value { + let mut map = Object::new(); + match self { + AttributeValue::StringValue(v) => { + map.insert("string_value".to_string(), Annotated::new(Value::String(v))); + } + AttributeValue::IntValue(v) => { + map.insert("int_value".to_string(), Annotated::new(Value::I64(v))); + } + AttributeValue::DoubleValue(v) => { + map.insert("double_value".to_string(), Annotated::new(Value::F64(v))); + } + AttributeValue::BoolValue(v) => { + map.insert("bool_value".to_string(), Annotated::new(Value::Bool(v))); + } + AttributeValue::Unknown(v) => { + map.insert("unknown".to_string(), Annotated::new(Value::String(v))); + } + } + Value::Object(map) + } + + fn serialize_payload(&self, s: S, _behavior: SkipSerialization) -> Result + where + Self: Sized, + S: serde::Serializer, + { + match self { + AttributeValue::StringValue(v) => s.serialize_str(v), + AttributeValue::IntValue(v) => s.serialize_i64(*v), + AttributeValue::DoubleValue(v) => s.serialize_f64(*v), + AttributeValue::BoolValue(v) => s.serialize_bool(*v), + AttributeValue::Unknown(v) => s.serialize_str(v), + } + } +} + +impl AttributeValue { + /// Returns the string representation of this attribute value. 
+ pub fn as_str(&self) -> String { + match self { + AttributeValue::StringValue(s) => s.clone(), + AttributeValue::IntValue(s) => s.to_string(), + AttributeValue::DoubleValue(s) => s.to_string(), + AttributeValue::BoolValue(s) => s.to_string(), + AttributeValue::Unknown(s) => s.clone(), + } + } + + /// Returns the string value if this is a StringValue variant + pub fn string_value(&self) -> Option<&String> { + match self { + AttributeValue::StringValue(s) => Some(s), + _ => None, + } + } + + /// Returns the int value if this is an IntValue variant + pub fn int_value(&self) -> Option { + match self { + AttributeValue::IntValue(i) => Some(*i), + _ => None, + } + } + + /// Returns the double value if this is a DoubleValue variant + pub fn double_value(&self) -> Option { + match self { + AttributeValue::DoubleValue(d) => Some(*d), + _ => None, + } + } + + /// Returns the bool value if this is a BoolValue variant + pub fn bool_value(&self) -> Option { + match self { + AttributeValue::BoolValue(b) => Some(*b), + _ => None, + } + } +} + +impl Empty for AttributeValue { + #[inline] + fn is_empty(&self) -> bool { + matches!(self, Self::Unknown(_)) + } +} + +impl FromValue for AttributeValue { + fn from_value(value: Annotated) -> Annotated { + match value { + Annotated(Some(Value::String(value)), meta) => { + Annotated(Some(AttributeValue::StringValue(value)), meta) + } + Annotated(Some(Value::I64(value)), meta) => { + Annotated(Some(AttributeValue::IntValue(value)), meta) + } + Annotated(Some(Value::F64(value)), meta) => { + Annotated(Some(AttributeValue::DoubleValue(value)), meta) + } + Annotated(Some(Value::Bool(value)), meta) => { + Annotated(Some(AttributeValue::BoolValue(value)), meta) + } + Annotated(Some(value), mut meta) => { + meta.add_error(Error::expected("a primitive value")); + meta.set_original_value(Some(value)); + Annotated(None, meta) + } + Annotated(None, meta) => Annotated(None, meta), + } + } } #[cfg(test)] @@ -88,31 +213,19 @@ mod tests { let mut 
attributes = Object::new(); attributes.insert( "string.attribute".into(), - Annotated::new(AttributeValue { - string_value: Annotated::new(Value::String("some string".into())), - ..Default::default() - }), + Annotated::new(AttributeValue::StringValue("some string".into())), ); attributes.insert( "boolean.attribute".into(), - Annotated::new(AttributeValue { - bool_value: Annotated::new(Value::Bool(true)), - ..Default::default() - }), + Annotated::new(AttributeValue::BoolValue(true)), ); attributes.insert( "int.attribute".into(), - Annotated::new(AttributeValue { - int_value: Annotated::new(Value::I64(10)), - ..Default::default() - }), + Annotated::new(AttributeValue::IntValue(10)), ); attributes.insert( "double.attribute".into(), - Annotated::new(AttributeValue { - double_value: Annotated::new(Value::F64(637.704)), - ..Default::default() - }), + Annotated::new(AttributeValue::DoubleValue(637.704)), ); let log = Annotated::new(OurLog { @@ -127,10 +240,7 @@ mod tests { ..Default::default() }); - let expected: serde_json::Value = serde_json::from_str(json).unwrap(); - let actual: serde_json::Value = - serde_json::from_str(&log.to_json_pretty().unwrap()).unwrap(); - assert_eq!(expected, actual); + assert_eq!(json, log.to_json_pretty().unwrap()); let log_from_string = Annotated::::from_json(json).unwrap(); assert_eq!(log, log_from_string); diff --git a/relay-ourlogs/src/ourlog.rs b/relay-ourlogs/src/ourlog.rs index 43d917b3ac0..4f0f41b62ce 100644 --- a/relay-ourlogs/src/ourlog.rs +++ b/relay-ourlogs/src/ourlog.rs @@ -2,7 +2,7 @@ use opentelemetry_proto::tonic::common::v1::any_value::Value as OtelValue; use crate::OtelLog; use relay_event_schema::protocol::{AttributeValue, OurLog, SpanId, TraceId}; -use relay_protocol::{Annotated, Object, Value}; +use relay_protocol::{Annotated, Object}; /// Transform an OtelLog to a Sentry log. 
pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { @@ -34,52 +34,22 @@ pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { match value { OtelValue::ArrayValue(_) => {} OtelValue::BoolValue(v) => { - attribute_data.insert( - key, - Annotated::new(AttributeValue { - bool_value: Annotated::new(Value::Bool(v)), - ..Default::default() - }), - ); + attribute_data.insert(key, Annotated::new(AttributeValue::BoolValue(v))); } OtelValue::BytesValue(v) => { if let Ok(v) = String::from_utf8(v) { - attribute_data.insert( - key, - Annotated::new(AttributeValue { - string_value: Annotated::new(Value::String(v)), - ..Default::default() - }), - ); + attribute_data.insert(key, Annotated::new(AttributeValue::StringValue(v))); } } OtelValue::DoubleValue(v) => { - attribute_data.insert( - key, - Annotated::new(AttributeValue { - double_value: Annotated::new(Value::F64(v)), - ..Default::default() - }), - ); + attribute_data.insert(key, Annotated::new(AttributeValue::DoubleValue(v))); } OtelValue::IntValue(v) => { - attribute_data.insert( - key, - Annotated::new(AttributeValue { - int_value: Annotated::new(Value::I64(v)), - ..Default::default() - }), - ); + attribute_data.insert(key, Annotated::new(AttributeValue::IntValue(v))); } OtelValue::KvlistValue(_) => {} OtelValue::StringValue(v) => { - attribute_data.insert( - key, - Annotated::new(AttributeValue { - string_value: Annotated::new(Value::String(v)), - ..Default::default() - }), - ); + attribute_data.insert(key, Annotated::new(AttributeValue::StringValue(v))); } } } @@ -226,10 +196,10 @@ mod tests { Some(&Annotated::new("Database query executed".into())) ); assert_eq!( - get_path!(annotated_log.attributes["db.statement"].string_value), - Some(&Annotated::new( - "SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s".into() - )) + get_path!(annotated_log.attributes["db.statement"]) + .and_then(|v| v.value()) + .and_then(|v| v.string_value()), + Some(&"SELECT \"table\".\"col\" FROM \"table\" WHERE 
\"table\".\"col\" = %s".into()) ); } } From 413338c4a12845510018f0834ecb2f5013789f58 Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 16 Jan 2025 15:56:22 -0500 Subject: [PATCH 09/27] Default to stricter with pii on any user provided field --- relay-event-schema/src/protocol/ourlog.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/relay-event-schema/src/protocol/ourlog.rs b/relay-event-schema/src/protocol/ourlog.rs index 265c401922f..68f5782987c 100644 --- a/relay-event-schema/src/protocol/ourlog.rs +++ b/relay-event-schema/src/protocol/ourlog.rs @@ -29,7 +29,7 @@ pub struct OurLog { pub trace_flags: Annotated, /// This is the original string representation of the severity as it is known at the source - #[metastructure(required = false, max_chars = 32, pii = "maybe", trim = false)] + #[metastructure(required = false, max_chars = 32, pii = "true", trim = false)] pub severity_text: Annotated, /// Numerical representation of the severity level @@ -37,11 +37,11 @@ pub struct OurLog { pub severity_number: Annotated, /// Log body. - #[metastructure(required = true, pii = "maybe", trim = false)] + #[metastructure(required = true, pii = "true", trim = false)] pub body: Annotated, /// Arbitrary attributes on a log. - #[metastructure(pii = "maybe", trim = false)] + #[metastructure(pii = "true", trim = false)] pub attributes: Annotated>, /// Additional arbitrary fields for forwards compatibility. 
From 99b1d601de8bfd7a4a361405b4ced38bd43e150e Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 16 Jan 2025 22:09:50 -0500 Subject: [PATCH 10/27] Remove extra drop --- relay-server/src/services/processor/ourlog.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs index a228bf814b1..1be2fdc2780 100644 --- a/relay-server/src/services/processor/ourlog.rs +++ b/relay-server/src/services/processor/ourlog.rs @@ -46,10 +46,6 @@ pub fn process( ) { let logging_disabled = should_filter(config, &project_info, Feature::OurLogsIngestion); managed_envelope.retain_items(|item| { - if logging_disabled { - return ItemAction::DropSilently; - } - let annotated_log = match item.ty() { ItemType::OtelLog => match serde_json::from_slice::(&item.payload()) { Ok(otel_log) => Annotated::new(relay_ourlogs::otel_to_sentry_log(otel_log)), From f92a7265b10288682fe736589fa7dcaaca716159 Mon Sep 17 00:00:00 2001 From: Kev <6111995+k-fish@users.noreply.github.com> Date: Thu, 16 Jan 2025 22:10:05 -0500 Subject: [PATCH 11/27] Update relay-ourlogs/src/lib.rs Co-authored-by: Joris Bayer --- relay-ourlogs/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-ourlogs/src/lib.rs b/relay-ourlogs/src/lib.rs index eb91e5f1104..38130a4a4c6 100644 --- a/relay-ourlogs/src/lib.rs +++ b/relay-ourlogs/src/lib.rs @@ -1,4 +1,4 @@ -//! Structs and functions needed to ingest OpenTelemetry spans. +//! Structs and functions needed to ingest OpenTelemetry logs. 
#![warn(missing_docs)] #![doc( From 568bde8f0ea89d5a0adcf277dd8238a2156e5145 Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 16 Jan 2025 22:14:15 -0500 Subject: [PATCH 12/27] Remove extra code for flag in process --- relay-server/src/services/processor.rs | 2 +- relay-server/src/services/processor/ourlog.rs | 7 +------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index f60cc13ac03..6838146f97b 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2031,7 +2031,7 @@ impl EnvelopeProcessorService { project_info.clone(), ); if_processing!(self.inner.config, { - ourlog::process(managed_envelope, &self.inner.config, project_info.clone()); + ourlog::process(managed_envelope); }); Ok(None) } diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs index 1be2fdc2780..336b313d225 100644 --- a/relay-server/src/services/processor/ourlog.rs +++ b/relay-server/src/services/processor/ourlog.rs @@ -39,12 +39,7 @@ pub fn filter( /// Processes logs. 
#[cfg(feature = "processing")] -pub fn process( - managed_envelope: &mut TypedEnvelope, - config: &Config, - project_info: Arc, -) { - let logging_disabled = should_filter(config, &project_info, Feature::OurLogsIngestion); +pub fn process(managed_envelope: &mut TypedEnvelope) { managed_envelope.retain_items(|item| { let annotated_log = match item.ty() { ItemType::OtelLog => match serde_json::from_slice::(&item.payload()) { From 077d50e8f5933362d920ac2a8c6a5d6fd8734dbd Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 16 Jan 2025 22:25:22 -0500 Subject: [PATCH 13/27] Add scrubbing --- relay-server/src/services/processor.rs | 2 +- relay-server/src/services/processor/ourlog.rs | 34 +++++++++++++++++-- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 6838146f97b..b0e9f51461e 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2031,7 +2031,7 @@ impl EnvelopeProcessorService { project_info.clone(), ); if_processing!(self.inner.config, { - ourlog::process(managed_envelope); + ourlog::process(managed_envelope, project_info.clone()); }); Ok(None) } diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs index 336b313d225..141afa9c5ec 100644 --- a/relay-server/src/services/processor/ourlog.rs +++ b/relay-server/src/services/processor/ourlog.rs @@ -3,7 +3,8 @@ use std::sync::Arc; use crate::envelope::Item; use relay_config::Config; -use relay_dynamic_config::Feature; +use relay_dynamic_config::{Feature, ProjectConfig}; +use relay_event_schema::processor::{process_value, ProcessingAction, ProcessingState}; use relay_event_schema::protocol::OurLog; use crate::envelope::ItemType; @@ -11,7 +12,9 @@ use crate::services::processor::should_filter; use crate::services::projects::project::ProjectInfo; use crate::utils::{ItemAction, TypedEnvelope}; +use 
crate::services::processor::ProcessingError; use relay_ourlogs::OtelLog; +use relay_pii::PiiProcessor; use relay_protocol::Annotated; #[cfg(feature = "processing")] @@ -39,9 +42,9 @@ pub fn filter( /// Processes logs. #[cfg(feature = "processing")] -pub fn process(managed_envelope: &mut TypedEnvelope) { +pub fn process(managed_envelope: &mut TypedEnvelope, project_info: Arc) { managed_envelope.retain_items(|item| { - let annotated_log = match item.ty() { + let mut annotated_log = match item.ty() { ItemType::OtelLog => match serde_json::from_slice::(&item.payload()) { Ok(otel_log) => Annotated::new(relay_ourlogs::otel_to_sentry_log(otel_log)), Err(err) => { @@ -60,6 +63,11 @@ pub fn process(managed_envelope: &mut TypedEnvelope) { _ => return ItemAction::Keep, }; + if let Err(e) = scrub(&mut annotated_log, &project_info.config) { + relay_log::debug!("failed to scrub pii from log: {}", e); + return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); + } + let mut new_item = Item::new(ItemType::Log); let payload = match annotated_log.to_json() { Ok(payload) => payload, @@ -75,3 +83,23 @@ pub fn process(managed_envelope: &mut TypedEnvelope) { ItemAction::Keep }); } + +fn scrub( + annotated_log: &mut Annotated, + project_config: &ProjectConfig, +) -> Result<(), ProcessingError> { + if let Some(ref config) = project_config.pii_config { + let mut processor = PiiProcessor::new(config.compiled()); + process_value(annotated_log, &mut processor, ProcessingState::root())?; + } + let pii_config = project_config + .datascrubbing_settings + .pii_config() + .map_err(|e| ProcessingError::PiiConfigError(e.clone()))?; + if let Some(config) = pii_config { + let mut processor = PiiProcessor::new(config.compiled()); + process_value(annotated_log, &mut processor, ProcessingState::root())?; + } + + Ok(()) +} From 240b92bead8aa70081c7b7c78350bf9ca07f8d89 Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 16 Jan 2025 22:26:52 -0500 Subject: [PATCH 14/27] Wrong type --- 
relay-server/src/services/processor/ourlog.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs index 141afa9c5ec..0ca91fcb3ac 100644 --- a/relay-server/src/services/processor/ourlog.rs +++ b/relay-server/src/services/processor/ourlog.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use crate::envelope::Item; use relay_config::Config; use relay_dynamic_config::{Feature, ProjectConfig}; -use relay_event_schema::processor::{process_value, ProcessingAction, ProcessingState}; +use relay_event_schema::processor::{process_value, ProcessingState}; use relay_event_schema::protocol::OurLog; use crate::envelope::ItemType; @@ -25,8 +25,8 @@ use { }; /// Removes logs from the envelope if the feature is not enabled. -pub fn filter( - managed_envelope: &mut TypedEnvelope, +pub fn filter( + managed_envelope: &mut TypedEnvelope, config: Arc, project_info: Arc, ) { From 153c0bcc25bcd7469f4c4ac3bde71068eed85c15 Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 16 Jan 2025 22:39:34 -0500 Subject: [PATCH 15/27] Pass payload through as raw bytes --- relay-server/src/services/store.rs | 83 +++++++----------------------- 1 file changed, 18 insertions(+), 65 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index c98ad7fc7d1..d0c6f1827b9 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -953,51 +953,18 @@ impl StoreService { ) -> Result<(), StoreError> { relay_log::trace!("Producing log"); let payload = item.payload(); - let d = &mut Deserializer::from_slice(&payload); - - let mut log: LogKafkaMessage = match serde_path_to_error::deserialize(d) { - Ok(log) => log, - Err(error) => { - relay_log::error!( - error = &error as &dyn std::error::Error, - "failed to parse log" - ); - self.outcome_aggregator.send(TrackOutcome { - category: DataCategory::LogItem, - event_id: None, - outcome: 
Outcome::Invalid(DiscardReason::InvalidLog), - quantity: 1, - remote_addr: None, - scoping, - timestamp: received_at, - }); - self.outcome_aggregator.send(TrackOutcome { - category: DataCategory::LogByte, - event_id: None, - outcome: Outcome::Invalid(DiscardReason::InvalidLog), - quantity: payload.len() as u32, - remote_addr: None, - scoping, - timestamp: received_at, - }); - return Ok(()); - } - }; - - log.organization_id = scoping.organization_id.value(); - log.project_id = scoping.project_id.value(); - log.retention_days = retention_days; + let payload_len = payload.len(); - self.produce( - KafkaTopic::Logs, - KafkaMessage::Log { - headers: BTreeMap::from([( - "project_id".to_string(), - scoping.project_id.to_string(), - )]), - message: log, + let message = KafkaMessage::Log { + headers: BTreeMap::from([("project_id".to_string(), scoping.project_id.to_string())]), + message: LogKafkaMessage { + payload, + organization_id: scoping.organization_id.value(), + project_id: scoping.project_id.value(), + retention_days, + received: safe_timestamp(received_at), }, - )?; + }; // We need to track the count and bytes separately for possible rate limits and quotas on both counts and bytes. self.outcome_aggregator.send(TrackOutcome { @@ -1013,12 +980,14 @@ impl StoreService { category: DataCategory::LogByte, event_id: None, outcome: Outcome::Accepted, - quantity: payload.len() as u32, + quantity: payload_len as u32, remote_addr: None, scoping, timestamp: received_at, }); + self.produce(KafkaTopic::Logs, message)?; + Ok(()) } @@ -1385,30 +1354,14 @@ struct SpanKafkaMessage<'a> { } #[derive(Debug, Deserialize, Serialize)] -struct LogKafkaMessage<'a> { - #[serde(default)] +struct LogKafkaMessage { + /// Raw log payload. + payload: Bytes, organization_id: u64, - #[serde(default)] project_id: u64, /// Number of days until these data should be deleted. 
- #[serde(default)] retention_days: u16, - #[serde(default)] - timestamp_nanos: u64, - #[serde(default)] - observed_timestamp_nanos: u64, - #[serde(default, skip_serializing_if = "Option::is_none")] - severity_number: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - severity_text: Option<&'a str>, - body: &'a RawValue, - #[serde(default, skip_serializing_if = "none_or_empty_object")] - attributes: Option<&'a RawValue>, - #[serde(default, skip_serializing_if = "Option::is_none")] - span_id: Option<&'a str>, - trace_id: EventId, - #[serde(default, skip_serializing_if = "Option::is_none")] - flags: Option, + received: u64, } fn none_or_empty_object(value: &Option<&RawValue>) -> bool { @@ -1456,7 +1409,7 @@ enum KafkaMessage<'a> { #[serde(skip)] headers: BTreeMap, #[serde(flatten)] - message: LogKafkaMessage<'a>, + message: LogKafkaMessage, }, ProfileChunk(ProfileChunkKafkaMessage), } From 12127e1d2ff9ccc836c8b151721ddea7a165937d Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 16 Jan 2025 23:02:31 -0500 Subject: [PATCH 16/27] Fix default topic test error --- Cargo.lock | 5 +++-- Cargo.toml | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4127cf2b1b9..fae96ee39e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4475,15 +4475,16 @@ dependencies = [ [[package]] name = "sentry-kafka-schemas" -version = "0.1.122" +version = "0.1.129" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6636bbc9fda2c104d326386bf8fdcc36d4031bca525a74a970ad8bbecb7570d2" +checksum = "790627715d4ea0e58e252dcb657a44146fde401b5520bbbc3b6500764ef71c86" dependencies = [ "jsonschema", "serde", "serde_json", "serde_yaml", "thiserror", + "url", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2872277a061..932c696b661 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,7 +151,7 @@ reqwest = "0.12.9" rmp-serde = "1.3.0" sentry = "0.34.0" sentry-core = "0.34.0" -sentry-kafka-schemas = { version = "0.1.122", 
default-features = false } +sentry-kafka-schemas = { version = "0.1.129", default-features = false } sentry-release-parser = { version = "1.3.2", default-features = false } sentry-types = "0.34.0" semver = "1.0.23" From 7c2bd4d2bdc90827b4f79cd180b339edb9590334 Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 16 Jan 2025 23:20:20 -0500 Subject: [PATCH 17/27] Fix serializing back out into AnyValue type format --- relay-event-schema/src/protocol/ourlog.rs | 53 +++++++++++++---------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/relay-event-schema/src/protocol/ourlog.rs b/relay-event-schema/src/protocol/ourlog.rs index 68f5782987c..9cb43b60e11 100644 --- a/relay-event-schema/src/protocol/ourlog.rs +++ b/relay-event-schema/src/protocol/ourlog.rs @@ -2,6 +2,8 @@ use relay_protocol::{ Annotated, Empty, Error, FromValue, IntoValue, Object, SkipSerialization, Value, }; +use serde::ser::SerializeMap; + use crate::processor::ProcessValue; use crate::protocol::{SpanId, TraceId}; @@ -93,13 +95,25 @@ impl IntoValue for AttributeValue { Self: Sized, S: serde::Serializer, { + let mut map = s.serialize_map(None)?; match self { - AttributeValue::StringValue(v) => s.serialize_str(v), - AttributeValue::IntValue(v) => s.serialize_i64(*v), - AttributeValue::DoubleValue(v) => s.serialize_f64(*v), - AttributeValue::BoolValue(v) => s.serialize_bool(*v), - AttributeValue::Unknown(v) => s.serialize_str(v), + AttributeValue::StringValue(v) => { + map.serialize_entry("string_value", v)?; + } + AttributeValue::IntValue(v) => { + map.serialize_entry("int_value", v)?; + } + AttributeValue::DoubleValue(v) => { + map.serialize_entry("double_value", v)?; + } + AttributeValue::BoolValue(v) => { + map.serialize_entry("bool_value", v)?; + } + AttributeValue::Unknown(v) => { + map.serialize_entry("unknown", v)?; + } } + map.end() } } @@ -114,32 +128,24 @@ impl AttributeValue { AttributeValue::Unknown(s) => s.clone(), } } - - /// Returns the string value if this is a StringValue variant 
pub fn string_value(&self) -> Option<&String> { match self { AttributeValue::StringValue(s) => Some(s), _ => None, } } - - /// Returns the int value if this is an IntValue variant pub fn int_value(&self) -> Option { match self { AttributeValue::IntValue(i) => Some(*i), _ => None, } } - - /// Returns the double value if this is a DoubleValue variant pub fn double_value(&self) -> Option { match self { AttributeValue::DoubleValue(d) => Some(*d), _ => None, } } - - /// Returns the bool value if this is a BoolValue variant pub fn bool_value(&self) -> Option { match self { AttributeValue::BoolValue(b) => Some(*b), @@ -171,7 +177,9 @@ impl FromValue for AttributeValue { Annotated(Some(AttributeValue::BoolValue(value)), meta) } Annotated(Some(value), mut meta) => { - meta.add_error(Error::expected("a primitive value")); + meta.add_error(Error::expected( + "a valid attribute value (string, int, double, bool)", + )); meta.set_original_value(Some(value)); Annotated(None, meta) } @@ -189,23 +197,23 @@ mod tests { let json = r#"{ "timestamp_nanos": 1544712660300000000, "observed_timestamp_nanos": 1544712660300000000, - "severity_number": 10, - "severity_text": "Information", "trace_id": "5b8efff798038103d269b633813fc60c", "span_id": "eee19b7ec3c1b174", + "severity_text": "Information", + "severity_number": 10, "body": "Example log record", "attributes": { - "string.attribute": { - "string_value": "some string" - }, "boolean.attribute": { "bool_value": true }, + "double.attribute": { + "double_value": 637.704 + }, "int.attribute": { "int_value": 10 }, - "double.attribute": { - "double_value": 637.704 + "string.attribute": { + "string_value": "some string" } } }"#; @@ -241,8 +249,5 @@ mod tests { }); assert_eq!(json, log.to_json_pretty().unwrap()); - - let log_from_string = Annotated::::from_json(json).unwrap(); - assert_eq!(log, log_from_string); } } From bbe569d9740baff047c6bd88a080368ad52f8c78 Mon Sep 17 00:00:00 2001 From: Kev Date: Thu, 16 Jan 2025 23:56:26 -0500 Subject: 
[PATCH 18/27] Fix enforcing rate limits --- relay-server/src/envelope.rs | 5 ++++- relay-server/src/services/processor.rs | 14 ++++++++++++-- relay-server/src/utils/rate_limits.rs | 22 +++++++++++++++------- 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 91f1e99d9d3..6a3d6101eea 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -704,7 +704,10 @@ impl Item { CountFor::Outcomes => smallvec![], }, ItemType::Statsd | ItemType::MetricBuckets => smallvec![], - ItemType::Log | ItemType::OtelLog => smallvec![], + ItemType::Log | ItemType::OtelLog => smallvec![ + (DataCategory::LogByte, self.len().max(1)), + (DataCategory::LogItem, 1) + ], ItemType::FormData => smallvec![], ItemType::UserReport => smallvec![], ItemType::UserReportV2 => smallvec![(DataCategory::UserReportV2, 1)], diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index b0e9f51461e..6f126df41c9 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2024,16 +2024,26 @@ impl EnvelopeProcessorService { &self, managed_envelope: &mut TypedEnvelope, project_info: Arc, + #[allow(unused_variables)] rate_limits: Arc, ) -> Result, ProcessingError> { + let mut extracted_metrics = ProcessingExtractedMetrics::new(); + ourlog::filter( managed_envelope, self.inner.config.clone(), project_info.clone(), ); if_processing!(self.inner.config, { + self.enforce_quotas( + managed_envelope, + Annotated::empty(), + &mut extracted_metrics, + project_info.clone(), + rate_limits, + )?; ourlog::process(managed_envelope, project_info.clone()); }); - Ok(None) + Ok(Some(extracted_metrics)) } /// Processes standalone spans. 
@@ -2191,7 +2201,7 @@ impl EnvelopeProcessorService { ProcessingGroup::CheckIn => { run!(process_checkins, project_id, project_info, rate_limits) } - ProcessingGroup::Log => run!(process_logs, project_info), + ProcessingGroup::Log => run!(process_logs, project_info, rate_limits), ProcessingGroup::Span => run!( process_standalone_spans, self.inner.config.clone(), diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index 5fa9036e41f..54347e7ccf5 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -229,6 +229,7 @@ impl EnvelopeSummary { } summary.payload_size += item.len(); + for (category, quantity) in item.quantities(CountFor::RateLimits) { summary.add_quantity(category, quantity); } @@ -355,7 +356,9 @@ pub struct Enforcement { /// The combined check-in item rate limit. pub check_ins: CategoryLimit, /// The combined logs (our product logs) rate limit. - pub logs: CategoryLimit, + pub log_items: CategoryLimit, + /// The combined logs (our product logs) rate limit. + pub log_bytes: CategoryLimit, /// The combined spans rate limit. pub spans: CategoryLimit, /// The rate limit for the indexed span category. 
@@ -397,7 +400,8 @@ impl Enforcement { profiles_indexed, replays, check_ins, - logs, + log_items, + log_bytes, spans, spans_indexed, user_reports_v2, @@ -413,7 +417,8 @@ impl Enforcement { profiles_indexed, replays, check_ins, - logs, + log_items, + log_bytes, spans, spans_indexed, user_reports_v2, @@ -502,8 +507,9 @@ impl Enforcement { ItemType::ReplayVideo => !self.replays.is_active(), ItemType::ReplayRecording => !self.replays.is_active(), ItemType::CheckIn => !self.check_ins.is_active(), - ItemType::OtelLog => !self.logs.is_active(), - ItemType::Log => !self.logs.is_active(), + ItemType::OtelLog | ItemType::Log => { + !(self.log_items.is_active() || self.log_bytes.is_active()) + } ItemType::Span | ItemType::OtelSpan | ItemType::OtelTracesData => { !self.spans_indexed.is_active() } @@ -720,7 +726,7 @@ where if summary.log_item_quantity > 0 { let item_scoping = scoping.item(DataCategory::LogItem); let log_limits = self.check.apply(item_scoping, summary.log_item_quantity)?; - enforcement.logs = CategoryLimit::new( + enforcement.log_items = CategoryLimit::new( DataCategory::LogItem, summary.log_item_quantity, log_limits.longest(), @@ -730,7 +736,7 @@ where if summary.log_byte_quantity > 0 { let item_scoping = scoping.item(DataCategory::LogByte); let log_limits = self.check.apply(item_scoping, summary.log_byte_quantity)?; - enforcement.logs = CategoryLimit::new( + enforcement.log_bytes = CategoryLimit::new( DataCategory::LogByte, summary.log_byte_quantity, log_limits.longest(), @@ -1661,6 +1667,7 @@ mod tests { assert!(limits.is_limited()); assert_eq!(envelope.envelope().len(), 0); mock.assert_call(DataCategory::LogItem, 2); + mock.assert_call(DataCategory::LogByte, 20); assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogItem, 2)]); } @@ -1674,6 +1681,7 @@ mod tests { assert!(limits.is_limited()); assert_eq!(envelope.envelope().len(), 0); + mock.assert_call(DataCategory::LogItem, 2); mock.assert_call(DataCategory::LogByte, 20); 
assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogByte, 20)]); From 1ab81d325d06caaa67a7702d83d95f01b380232f Mon Sep 17 00:00:00 2001 From: Kev <6111995+k-fish@users.noreply.github.com> Date: Fri, 17 Jan 2025 09:22:26 -0500 Subject: [PATCH 19/27] Update relay-server/src/services/processor/ourlog.rs Co-authored-by: Joris Bayer --- relay-server/src/services/processor/ourlog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs index 0ca91fcb3ac..cb33af82347 100644 --- a/relay-server/src/services/processor/ourlog.rs +++ b/relay-server/src/services/processor/ourlog.rs @@ -25,7 +25,7 @@ use { }; /// Removes logs from the envelope if the feature is not enabled. -pub fn filter( +pub fn filter( managed_envelope: &mut TypedEnvelope, config: Arc, project_info: Arc, From 06b30defb3f9ccb6f4a303d76e8103e73cb6561f Mon Sep 17 00:00:00 2001 From: Kev <6111995+k-fish@users.noreply.github.com> Date: Fri, 17 Jan 2025 09:22:34 -0500 Subject: [PATCH 20/27] Update relay-server/src/services/processor/ourlog.rs Co-authored-by: Joris Bayer --- relay-server/src/services/processor/ourlog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs index cb33af82347..d29af542c56 100644 --- a/relay-server/src/services/processor/ourlog.rs +++ b/relay-server/src/services/processor/ourlog.rs @@ -64,7 +64,7 @@ pub fn process(managed_envelope: &mut TypedEnvelope, project_info: Arc }; if let Err(e) = scrub(&mut annotated_log, &project_info.config) { - relay_log::debug!("failed to scrub pii from log: {}", e); + relay_log::error!("failed to scrub pii from log: {}", e); return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); } From 5c0198928b8138e23c92a0138a6efd5d8105a774 Mon Sep 17 00:00:00 2001 From: Kev <6111995+k-fish@users.noreply.github.com> Date: Fri, 17 Jan 2025 
09:23:56 -0500 Subject: [PATCH 21/27] Update relay-ourlogs/src/ourlog.rs Co-authored-by: Joris Bayer --- relay-ourlogs/src/ourlog.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/relay-ourlogs/src/ourlog.rs b/relay-ourlogs/src/ourlog.rs index 4f0f41b62ce..8195c78faef 100644 --- a/relay-ourlogs/src/ourlog.rs +++ b/relay-ourlogs/src/ourlog.rs @@ -196,9 +196,7 @@ mod tests { Some(&Annotated::new("Database query executed".into())) ); assert_eq!( - get_path!(annotated_log.attributes["db.statement"]) - .and_then(|v| v.value()) - .and_then(|v| v.string_value()), + get_value!(annotated_log.attributes["db.statement"]!).string_value() Some(&"SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s".into()) ); } From 0748d2c902adaa12704ccb4fb3ce61d8504b8763 Mon Sep 17 00:00:00 2001 From: Kev <6111995+k-fish@users.noreply.github.com> Date: Fri, 17 Jan 2025 09:24:43 -0500 Subject: [PATCH 22/27] Update relay-server/src/services/store.rs Co-authored-by: Joris Bayer --- relay-server/src/services/store.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index d0c6f1827b9..517996ceec7 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -1511,9 +1511,6 @@ impl Message for KafkaMessage<'_> { KafkaMessage::ReplayEvent(message) => serde_json::to_vec(message) .map(Cow::Owned) .map_err(ClientError::InvalidJson), - KafkaMessage::Log { message, .. } => serde_json::to_vec(message) - .map(Cow::Owned) - .map_err(ClientError::InvalidJson), KafkaMessage::Span { message, .. 
} => serde_json::to_vec(message) .map(Cow::Owned) .map_err(ClientError::InvalidJson), From 188f0eda50b71130322bdbae002cf2db1b535b8b Mon Sep 17 00:00:00 2001 From: Kev Date: Fri, 17 Jan 2025 09:57:03 -0500 Subject: [PATCH 23/27] Remove as_str --- relay-event-schema/src/protocol/ourlog.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/relay-event-schema/src/protocol/ourlog.rs b/relay-event-schema/src/protocol/ourlog.rs index 9cb43b60e11..e4eea9882ff 100644 --- a/relay-event-schema/src/protocol/ourlog.rs +++ b/relay-event-schema/src/protocol/ourlog.rs @@ -118,16 +118,6 @@ impl IntoValue for AttributeValue { } impl AttributeValue { - /// Returns the string representation of this attribute value. - pub fn as_str(&self) -> String { - match self { - AttributeValue::StringValue(s) => s.clone(), - AttributeValue::IntValue(s) => s.to_string(), - AttributeValue::DoubleValue(s) => s.to_string(), - AttributeValue::BoolValue(s) => s.to_string(), - AttributeValue::Unknown(s) => s.clone(), - } - } pub fn string_value(&self) -> Option<&String> { match self { AttributeValue::StringValue(s) => Some(s), From 535022c04c3de838b3dc721c5e5faece421db329 Mon Sep 17 00:00:00 2001 From: Kev Date: Fri, 17 Jan 2025 09:57:35 -0500 Subject: [PATCH 24/27] Add outcomes only after kafka produce --- relay-server/src/services/store.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 517996ceec7..1468fd67b4f 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -966,6 +966,8 @@ impl StoreService { }, }; + self.produce(KafkaTopic::Logs, message)?; + // We need to track the count and bytes separately for possible rate limits and quotas on both counts and bytes. 
self.outcome_aggregator.send(TrackOutcome { category: DataCategory::LogItem, @@ -986,8 +988,6 @@ impl StoreService { timestamp: received_at, }); - self.produce(KafkaTopic::Logs, message)?; - Ok(()) } From 57f578f34310f168c49bea3be7faf5f76e9965b4 Mon Sep 17 00:00:00 2001 From: Kev Date: Fri, 17 Jan 2025 11:37:47 -0500 Subject: [PATCH 25/27] Add integration test just for OTelLog for now --- py/sentry_relay/consts.py | 1 + relay-cabi/include/relay.h | 4 +- relay-kafka/src/config.rs | 6 +- relay-ourlogs/src/ourlog.rs | 5 +- relay-server/src/services/store.rs | 2 +- requirements-dev.txt | 4 +- tests/integration/conftest.py | 1 + tests/integration/fixtures/processing.py | 25 +++++ tests/integration/test_ourlogs.py | 118 +++++++++++++++++++++++ 9 files changed, 156 insertions(+), 10 deletions(-) create mode 100644 tests/integration/test_ourlogs.py diff --git a/py/sentry_relay/consts.py b/py/sentry_relay/consts.py index a2fc4617b03..b841567ac3d 100644 --- a/py/sentry_relay/consts.py +++ b/py/sentry_relay/consts.py @@ -8,6 +8,7 @@ class DataCategory(IntEnum): + # start generated DEFAULT = 0 ERROR = 1 TRANSACTION = 2 diff --git a/relay-cabi/include/relay.h b/relay-cabi/include/relay.h index db50243b8f0..a191829ed6a 100644 --- a/relay-cabi/include/relay.h +++ b/relay-cabi/include/relay.h @@ -143,14 +143,14 @@ enum RelayDataCategory { */ RELAY_DATA_CATEGORY_ATTACHMENT_ITEM = 22, /** - * LogCount + * LogItem * * This is the category for logs for which we store the count log events for users for measuring * missing breadcrumbs, and count of logs for rate limiting purposes. */ RELAY_DATA_CATEGORY_LOG_ITEM = 23, /** - * LogBytes + * LogByte * * This is the category for logs for which we store log event total bytes for users. */ diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs index 98952699705..faaa2b69a00 100644 --- a/relay-kafka/src/config.rs +++ b/relay-kafka/src/config.rs @@ -46,7 +46,7 @@ pub enum KafkaTopic { /// Monitor check-ins. 
Monitors, /// Logs (our log product). - Logs, + OurLogs, /// Standalone spans without a transaction. Spans, /// Feedback events topic. @@ -70,7 +70,7 @@ impl KafkaTopic { ReplayEvents, ReplayRecordings, Monitors, - Logs, + OurLogs, Spans, Feedback, ]; @@ -131,8 +131,8 @@ define_topic_assignments! { profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name"), replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay Events topic name."), replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."), + ourlogs: (KafkaTopic::OurLogs, "snuba-ourlogs", "Logs from our logs product."), monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."), - logs: (KafkaTopic::Logs, "snuba-ourlogs", "Logs from our logs product."), spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."), feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."), } diff --git a/relay-ourlogs/src/ourlog.rs b/relay-ourlogs/src/ourlog.rs index 8195c78faef..9662a822a11 100644 --- a/relay-ourlogs/src/ourlog.rs +++ b/relay-ourlogs/src/ourlog.rs @@ -26,6 +26,7 @@ pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { _ => None, }) .unwrap_or_else(String::new); + let mut attribute_data = Object::new(); for attribute in attributes.into_iter() { @@ -72,7 +73,7 @@ pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { #[cfg(test)] mod tests { use super::*; - use relay_protocol::get_path; + use relay_protocol::{get_path, get_value}; #[test] fn parse_log() { @@ -196,7 +197,7 @@ mod tests { Some(&Annotated::new("Database query executed".into())) ); assert_eq!( - get_value!(annotated_log.attributes["db.statement"]!).string_value() + get_value!(annotated_log.attributes["db.statement"]!).string_value(), Some(&"SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s".into()) ); } diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs 
index 1468fd67b4f..6f1ea14d680 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -966,7 +966,7 @@ impl StoreService { }, }; - self.produce(KafkaTopic::Logs, message)?; + self.produce(KafkaTopic::OurLogs, message)?; // We need to track the count and bytes separately for possible rate limits and quotas on both counts and bytes. self.outcome_aggregator.send(TrackOutcome { diff --git a/requirements-dev.txt b/requirements-dev.txt index a3f517ab67a..942dfd3ee4d 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -10,13 +10,13 @@ devservices==1.0.8 flake8==7.0.0 confluent-kafka==2.1.1 flask==3.0.3 -msgpack==1.0.7 +msgpack==1.1.0 opentelemetry-proto==1.22.0 pytest-localserver==0.8.1 pytest-sentry==0.3.0 pytest-xdist==3.5.0 pytest==7.4.3 -PyYAML==6.0.1 +PyYAML==6.0.2 redis==4.5.4 requests==2.32.2 sentry_sdk==2.10.0 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 97cad120413..a89f638780e 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -34,6 +34,7 @@ replay_events_consumer, monitors_consumer, spans_consumer, + ourlogs_consumer, profiles_consumer, feedback_consumer, ) diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py index 98efbefee38..2a1b19b849f 100644 --- a/tests/integration/fixtures/processing.py +++ b/tests/integration/fixtures/processing.py @@ -60,6 +60,7 @@ def inner(options=None): "metrics_generic": metrics_topic, "replay_events": get_topic_name("replay_events"), "replay_recordings": get_topic_name("replay_recordings"), + "ourlogs": get_topic_name("ourlogs"), "monitors": get_topic_name("monitors"), "spans": get_topic_name("spans"), "profiles": get_topic_name("profiles"), @@ -348,6 +349,11 @@ def spans_consumer(consumer_fixture): yield from consumer_fixture(SpansConsumer, "spans") +@pytest.fixture +def ourlogs_consumer(consumer_fixture): + yield from consumer_fixture(OurLogsConsumer, "ourlogs") + + 
@pytest.fixture def profiles_consumer(consumer_fixture): yield from consumer_fixture(ProfileConsumer, "profiles") @@ -508,6 +514,25 @@ def get_spans(self, timeout=None, n=None): return spans +class OurLogsConsumer(ConsumerBase): + def get_ourlog(self): + message = self.poll() + assert message is not None + assert message.error() is None + + message_dict = msgpack.unpackb(message.value(), raw=False, use_list=False) + return json.loads(message_dict["payload"].decode("utf8")), message_dict + + def get_ourlogs(self): + ourlogs = [] + for message in self.poll_many(): + assert message is not None + assert message.error() is None + message_dict = msgpack.unpackb(message.value(), raw=False, use_list=False) + ourlogs.append(json.loads(message_dict["payload"].decode("utf8"))) + return ourlogs + + class ProfileConsumer(ConsumerBase): def get_profile(self): message = self.poll() diff --git a/tests/integration/test_ourlogs.py b/tests/integration/test_ourlogs.py new file mode 100644 index 00000000000..6652f9bb486 --- /dev/null +++ b/tests/integration/test_ourlogs.py @@ -0,0 +1,118 @@ +import json +from datetime import datetime, timedelta, timezone + +from sentry_sdk.envelope import Envelope, Item, PayloadRef + + +TEST_CONFIG = { + "aggregator": { + "bucket_interval": 1, + "initial_delay": 0, + } +} + + +def envelope_with_ourlogs(start: datetime, end: datetime) -> Envelope: + envelope = Envelope() + envelope.add_item( + Item( + type="otel_log", + payload=PayloadRef( + bytes=json.dumps( + { + "timeUnixNano": str(int(start.timestamp() * 1e9)), + "observedTimeUnixNano": str(int(end.timestamp() * 1e9)), + "severityNumber": 10, + "severityText": "Information", + "traceId": "5B8EFFF798038103D269B633813FC60C", + "spanId": "EEE19B7EC3C1B174", + "body": {"stringValue": "Example log record"}, + "attributes": [ + { + "key": "string.attribute", + "value": {"stringValue": "some string"}, + }, + {"key": "boolean.attribute", "value": {"boolValue": True}}, + {"key": "int.attribute", "value": 
{"intValue": "10"}}, + { + "key": "double.attribute", + "value": {"doubleValue": 637.704}, + }, + ], + } + ).encode() + ), + ) + ) + return envelope + + +def test_ourlog_extraction( + mini_sentry, + relay_with_processing, + ourlogs_consumer, +): + ourlogs_consumer = ourlogs_consumer() + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:ourlogs-ingestion", + ] + + relay = relay_with_processing(options=TEST_CONFIG) + + duration = timedelta(milliseconds=500) + now = datetime.now(timezone.utc) + end = now - timedelta(seconds=1) + start = end - duration + + # Send OTel log and sentry log via envelope + envelope = envelope_with_ourlogs(start, end) + relay.send_envelope(project_id, envelope) + + ourlogs = ourlogs_consumer.get_ourlogs() + assert len(ourlogs) == 1 + expected_0 = { + "timestamp_nanos": int(start.timestamp() * 1e9), + "observed_timestamp_nanos": int(end.timestamp() * 1e9), + "trace_id": "5b8efff798038103d269b633813fc60c", + "body": "Example log record", + "trace_flags": 0.0, + "span_id": "eee19b7ec3c1b174", + "severity_text": "Information", + "severity_number": 10, + "attributes": { + "string.attribute": {"string_value": "some string"}, + "boolean.attribute": {"bool_value": True}, + "int.attribute": {"int_value": 10}, + "double.attribute": {"double_value": 637.704}, + }, + } + assert ourlogs[0] == expected_0 + + ourlogs_consumer.assert_empty() + + +def test_ourlog_extraction_is_disabled_without_feature( + mini_sentry, + relay_with_processing, + ourlogs_consumer, +): + ourlogs_consumer = ourlogs_consumer() + relay = relay_with_processing(options=TEST_CONFIG) + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [] + + duration = timedelta(milliseconds=500) + now = datetime.now(timezone.utc) + end = now - timedelta(seconds=1) + start = end - duration + + envelope = envelope_with_ourlogs(start, end) 
+ relay.send_envelope(project_id, envelope) + + ourlogs = ourlogs_consumer.get_ourlogs() + assert len(ourlogs) == 0 + + ourlogs_consumer.assert_empty() From 484e5408187040aaedd6a390606a4d426e98f674 Mon Sep 17 00:00:00 2001 From: Kev Date: Fri, 17 Jan 2025 11:49:24 -0500 Subject: [PATCH 26/27] Clean up optional imports --- relay-server/src/services/processor/ourlog.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs index d29af542c56..5edac8e592b 100644 --- a/relay-server/src/services/processor/ourlog.rs +++ b/relay-server/src/services/processor/ourlog.rs @@ -1,27 +1,26 @@ //! Log processing code. use std::sync::Arc; -use crate::envelope::Item; +use crate::services::processor::LogGroup; use relay_config::Config; use relay_dynamic_config::{Feature, ProjectConfig}; use relay_event_schema::processor::{process_value, ProcessingState}; use relay_event_schema::protocol::OurLog; -use crate::envelope::ItemType; use crate::services::processor::should_filter; +use crate::services::processor::ProcessingError; use crate::services::projects::project::ProjectInfo; use crate::utils::{ItemAction, TypedEnvelope}; -use crate::services::processor::ProcessingError; -use relay_ourlogs::OtelLog; use relay_pii::PiiProcessor; use relay_protocol::Annotated; #[cfg(feature = "processing")] use { crate::envelope::ContentType, + crate::envelope::{Item, ItemType}, crate::services::outcome::{DiscardReason, Outcome}, - crate::services::processor::LogGroup, + relay_ourlogs::OtelLog, }; /// Removes logs from the envelope if the feature is not enabled. 
From b2c16132f9d436689923e48d1733f9f1edcf8076 Mon Sep 17 00:00:00 2001 From: Kev Date: Fri, 17 Jan 2025 12:12:44 -0500 Subject: [PATCH 27/27] Lint --- relay-server/src/services/processor.rs | 1 + relay-server/src/services/processor/ourlog.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 6f126df41c9..9418e9065c6 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2026,6 +2026,7 @@ impl EnvelopeProcessorService { project_info: Arc, #[allow(unused_variables)] rate_limits: Arc, ) -> Result, ProcessingError> { + #[allow(unused_mut)] let mut extracted_metrics = ProcessingExtractedMetrics::new(); ourlog::filter( diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs index 5edac8e592b..b33f4d08d7f 100644 --- a/relay-server/src/services/processor/ourlog.rs +++ b/relay-server/src/services/processor/ourlog.rs @@ -83,6 +83,7 @@ pub fn process(managed_envelope: &mut TypedEnvelope, project_info: Arc }); } +#[cfg(feature = "processing")] fn scrub( annotated_log: &mut Annotated, project_config: &ProjectConfig,