Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(processor): Add new data type for statically enforcing envelope and event #4444

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
453 changes: 235 additions & 218 deletions relay-server/src/services/processor.rs

Large diffs are not rendered by default.

24 changes: 15 additions & 9 deletions relay-server/src/services/processor/attachment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use relay_statsd::metric;
use crate::envelope::{AttachmentType, ContentType};
use crate::statsd::RelayTimers;

use crate::services::processor::payload;
use crate::services::projects::project::ProjectInfo;
use crate::utils::TypedEnvelope;
#[cfg(feature = "processing")]
use {
crate::services::processor::{ErrorGroup, EventFullyNormalized},
Expand All @@ -27,24 +27,25 @@ use {
///
/// If the event payload was empty before, it is created.
#[cfg(feature = "processing")]
pub fn create_placeholders(
managed_envelope: &mut TypedEnvelope<ErrorGroup>,
event: &mut Annotated<Event>,
pub fn create_placeholders<'a>(
payload: impl Into<payload::WithEventRefMut<'a, ErrorGroup>>,
metrics: &mut Metrics,
) -> Option<EventFullyNormalized> {
let envelope = managed_envelope.envelope();
let payload = payload.into();

let envelope = payload.managed_envelope.envelope();
let minidump_attachment =
envelope.get_item_by(|item| item.attachment_type() == Some(&AttachmentType::Minidump));
let apple_crash_report_attachment = envelope
.get_item_by(|item| item.attachment_type() == Some(&AttachmentType::AppleCrashReport));

if let Some(item) = minidump_attachment {
let event = event.get_or_insert_with(Event::default);
let event = payload.event.get_or_insert_with(Event::default);
metrics.bytes_ingested_event_minidump = Annotated::new(item.len() as u64);
utils::process_minidump(event, &item.payload());
return Some(EventFullyNormalized(false));
} else if let Some(item) = apple_crash_report_attachment {
let event = event.get_or_insert_with(Event::default);
let event = payload.event.get_or_insert_with(Event::default);
metrics.bytes_ingested_event_applecrashreport = Annotated::new(item.len() as u64);
utils::process_apple_crash_report(event, &item.payload());
return Some(EventFullyNormalized(false));
Expand All @@ -58,8 +59,13 @@ pub fn create_placeholders(
/// This only applies the new PII rules that explicitly select `ValueType::Binary` or one of the
/// attachment types. When special attachments are detected, these are scrubbed with custom
/// logic; otherwise the entire attachment is treated as a single binary blob.
pub fn scrub<Group>(managed_envelope: &mut TypedEnvelope<Group>, project_info: Arc<ProjectInfo>) {
let envelope = managed_envelope.envelope_mut();
pub fn scrub<'a, G>(
payload: impl Into<payload::MaybeEventRefMut<'a, G>>,
project_info: Arc<ProjectInfo>,
) {
let payload = payload.into();

let envelope = payload.managed_envelope.envelope_mut();
if let Some(ref config) = project_info.config.pii_config {
let minidump = envelope
.get_item_by_mut(|item| item.attachment_type() == Some(&AttachmentType::Minidump));
Expand Down
66 changes: 39 additions & 27 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use relay_sampling::{DynamicSamplingContext, SamplingConfig};

use crate::envelope::{CountFor, ItemType};
use crate::services::outcome::Outcome;
use crate::services::processor::{event_category, EventProcessing, Sampling, TransactionGroup};
use crate::services::processor::{
event_category, payload, EventProcessing, Sampling, TransactionGroup,
};
use crate::services::projects::project::ProjectInfo;
use crate::utils::{self, SamplingResult, TypedEnvelope};

Expand Down Expand Up @@ -41,46 +43,48 @@ use crate::utils::{self, SamplingResult, TypedEnvelope};
/// The function will return the sampling project information of the root project for the event. If
/// no sampling project information is specified, the project information of the event’s project
/// will be returned.
pub fn validate_and_set_dsc(
managed_envelope: &mut TypedEnvelope<TransactionGroup>,
event: &mut Annotated<Event>,
pub fn validate_and_set_dsc<'a>(
payload: impl Into<payload::WithEventRefMut<'a, TransactionGroup>>,
project_info: Arc<ProjectInfo>,
sampling_project_info: Option<Arc<ProjectInfo>>,
) -> Option<Arc<ProjectInfo>> {
if managed_envelope.envelope().dsc().is_some() && sampling_project_info.is_some() {
let payload = payload.into();

if payload.managed_envelope.envelope().dsc().is_some() && sampling_project_info.is_some() {
return sampling_project_info;
}

// The DSC can only be computed if there's a transaction event. Note that `dsc_from_event`
// below already checks for the event type.
let Some(event) = event.value() else {
let Some(event) = payload.event.value() else {
return sampling_project_info;
};
let Some(key_config) = project_info.get_public_key_config() else {
return sampling_project_info;
};

if let Some(dsc) = utils::dsc_from_event(key_config.public_key, event) {
managed_envelope.envelope_mut().set_dsc(dsc);
payload.managed_envelope.envelope_mut().set_dsc(dsc);
return Some(project_info.clone());
}

sampling_project_info
}

/// Computes the sampling decision on the incoming event
pub fn run<Group>(
managed_envelope: &mut TypedEnvelope<Group>,
event: &mut Annotated<Event>,
pub fn run<'a, G>(
payload: impl Into<payload::WithEventRefMut<'a, G>>,
config: Arc<Config>,
project_info: Arc<ProjectInfo>,
sampling_project_info: Option<Arc<ProjectInfo>>,
reservoir: &ReservoirEvaluator,
) -> SamplingResult
where
Group: Sampling,
G: Sampling,
{
if !Group::supports_sampling(&project_info) {
let payload = payload.into();

if !G::supports_sampling(&project_info) {
return SamplingResult::Pending;
}

Expand All @@ -95,26 +99,28 @@ where
_ => None,
};

let reservoir = Group::supports_reservoir_sampling().then_some(reservoir);
let reservoir = G::supports_reservoir_sampling().then_some(reservoir);

compute_sampling_decision(
config.processing_enabled(),
reservoir,
sampling_config,
event.value(),
payload.event.value(),
root_config,
managed_envelope.envelope().dsc(),
payload.managed_envelope.envelope().dsc(),
)
}

/// Apply the dynamic sampling decision from `compute_sampling_decision`.
pub fn drop_unsampled_items(
managed_envelope: &mut TypedEnvelope<TransactionGroup>,
event: Annotated<Event>,
pub fn drop_unsampled_items<'a>(
payload: impl Into<payload::WithEventRefMut<'a, TransactionGroup>>,
outcome: Outcome,
) {
let payload = payload.into();

// Remove all items from the envelope which need to be dropped due to dynamic sampling.
let dropped_items = managed_envelope
let dropped_items = payload
.managed_envelope
.envelope_mut()
// Profiles are not dropped by dynamic sampling, they are all forwarded to storage and
// later processed in Sentry and potentially dropped there.
Expand All @@ -127,19 +133,21 @@ pub fn drop_unsampled_items(
// but attachments are still emitted as attachments.
let category = category.index_category().unwrap_or(category);

managed_envelope.track_outcome(outcome.clone(), category, quantity);
payload
.managed_envelope
.track_outcome(outcome.clone(), category, quantity);
}
}

// Mark all remaining items in the envelope as un-sampled.
for item in managed_envelope.envelope_mut().items_mut() {
for item in payload.managed_envelope.envelope_mut().items_mut() {
item.set_sampled(false);
}

// All items have been dropped, now make sure the event is also handled and dropped.
if let Some(category) = event_category(&event) {
if let Some(category) = event_category(&payload.event) {
let category = category.index_category().unwrap_or(category);
managed_envelope.track_outcome(outcome, category, 1)
payload.managed_envelope.track_outcome(outcome, category, 1)
}
}

Expand Down Expand Up @@ -198,13 +206,17 @@ fn compute_sampling_decision(
///
/// This execution of dynamic sampling is technically a "simulation" since we will use the result
/// only for tagging errors and not for actually sampling incoming events.
pub fn tag_error_with_sampling_decision<Group: EventProcessing>(
managed_envelope: &mut TypedEnvelope<Group>,
event: &mut Annotated<Event>,
pub fn tag_error_with_sampling_decision<'a, G: EventProcessing>(
payload: impl Into<payload::WithEventRefMut<'a, G>>,
sampling_project_info: Option<Arc<ProjectInfo>>,
config: &Config,
) {
let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else {
let payload = payload.into();

let (Some(dsc), Some(event)) = (
payload.managed_envelope.envelope().dsc(),
payload.event.value_mut(),
) else {
return;
};

Expand Down
Loading
Loading