Skip to content

Commit

Permalink
Update variance calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
iverson52000 committed Feb 29, 2024
1 parent 3f71ad1 commit b1b418f
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,13 @@ class MeasurementConsumerSimulator(
val measurementComputationInfo: MeasurementComputationInfo =
buildMeasurementComputationInfo(protocol, result.impression.noiseMechanism)

val maxFrequencyPerUser =
if (result.impression.deterministicCount.customMaximumFrequencyPerUser != 0) {
result.impression.deterministicCount.customMaximumFrequencyPerUser
} else {
measurementSpec.impression.maximumFrequencyPerUser
}

return VariancesImpl.computeMeasurementVariance(
measurementComputationInfo.methodology,
ImpressionMeasurementVarianceParams(
Expand All @@ -523,7 +530,7 @@ class MeasurementConsumerSimulator(
ImpressionMeasurementParams(
vidSamplingInterval = measurementSpec.vidSamplingInterval.toStatsVidSamplingInterval(),
dpParams = measurementSpec.impression.privacyParams.toNoiserDpParams(),
maximumFrequencyPerUser = measurementSpec.impression.maximumFrequencyPerUser,
maximumFrequencyPerUser = maxFrequencyPerUser,
noiseMechanism = measurementComputationInfo.noiseMechanism.toStatsNoiseMechanism(),
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2094,6 +2094,13 @@ fun buildWeightedImpressionMeasurementVarianceParamsPerResult(
return@map null
}

val maxFrequencyPerUser =
if (impressionResult.deterministicCount.customMaximumFrequencyPerUser != 0) {
impressionResult.deterministicCount.customMaximumFrequencyPerUser
} else {
metricSpec.impressionCount.maximumFrequencyPerUser
}

WeightedImpressionMeasurementVarianceParams(
binaryRepresentation = weightedMeasurement.binaryRepresentation,
weight = weightedMeasurement.weight,
Expand All @@ -2104,7 +2111,7 @@ fun buildWeightedImpressionMeasurementVarianceParamsPerResult(
ImpressionMeasurementParams(
vidSamplingInterval = metricSpec.vidSamplingInterval.toStatsVidSamplingInterval(),
dpParams = metricSpec.impressionCount.privacyParams.toNoiserDpParams(),
maximumFrequencyPerUser = metricSpec.impressionCount.maximumFrequencyPerUser,
maximumFrequencyPerUser = maxFrequencyPerUser,
noiseMechanism = statsNoiseMechanism,
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ import org.wfanet.measurement.api.v2alpha.copy
import org.wfanet.measurement.api.v2alpha.createMeasurementRequest
import org.wfanet.measurement.api.v2alpha.customDirectMethodology
import org.wfanet.measurement.api.v2alpha.dataProvider
import org.wfanet.measurement.api.v2alpha.deterministicCount
import org.wfanet.measurement.api.v2alpha.differentialPrivacyParams
import org.wfanet.measurement.api.v2alpha.encryptionPublicKey
import org.wfanet.measurement.api.v2alpha.getCertificateRequest
Expand Down Expand Up @@ -189,6 +190,7 @@ import org.wfanet.measurement.internal.reporting.v2.batchSetMeasurementResultsRe
import org.wfanet.measurement.internal.reporting.v2.copy
import org.wfanet.measurement.internal.reporting.v2.createMetricRequest as internalCreateMetricRequest
import org.wfanet.measurement.internal.reporting.v2.customDirectMethodology as internalCustomDirectMethodology
import org.wfanet.measurement.internal.reporting.v2.deterministicCount as internalDeterministicCount
import org.wfanet.measurement.internal.reporting.v2.liquidLegionsDistribution as internalLiquidLegionsDistribution
import org.wfanet.measurement.internal.reporting.v2.measurement as internalMeasurement
import org.wfanet.measurement.internal.reporting.v2.metric as internalMetric
Expand Down Expand Up @@ -258,6 +260,7 @@ private const val IMPRESSION_VID_SAMPLING_WIDTH = 62.0f / NUMBER_VID_BUCKETS
private const val IMPRESSION_VID_SAMPLING_START = 143.0f / NUMBER_VID_BUCKETS
private const val IMPRESSION_EPSILON = 0.0011
private const val IMPRESSION_MAXIMUM_FREQUENCY_PER_USER = 60
private const val IMPRESSION_CUSTOM_MAXIMUM_FREQUENCY_PER_USER = 100

private const val WATCH_DURATION_VID_SAMPLING_WIDTH = 95.0f / NUMBER_VID_BUCKETS
private const val WATCH_DURATION_VID_SAMPLING_START = 205.0f / NUMBER_VID_BUCKETS
Expand Down Expand Up @@ -827,6 +830,25 @@ private val INTERNAL_SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_MEASUREMENT =
}
}

private val INTERNAL_SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_MEASUREMENT_CUSTOM_CAP =
INTERNAL_PENDING_SINGLE_PUBLISHER_IMPRESSION_MEASUREMENT.copy {
state = InternalMeasurement.State.SUCCEEDED
details =
InternalMeasurementKt.details {
results +=
InternalMeasurementKt.result {
impression =
InternalMeasurementKt.ResultKt.impression {
value = IMPRESSION_VALUE
noiseMechanism = NoiseMechanism.CONTINUOUS_LAPLACE
deterministicCount = internalDeterministicCount {
customMaximumFrequencyPerUser = IMPRESSION_CUSTOM_MAXIMUM_FREQUENCY_PER_USER
}
}
}
}
}

// Internal cross-publisher watch duration measurements
private val INTERNAL_REQUESTING_UNION_ALL_WATCH_DURATION_MEASUREMENT = internalMeasurement {
cmmsMeasurementConsumerId = MEASUREMENT_CONSUMERS.keys.first().measurementConsumerId
Expand Down Expand Up @@ -1195,6 +1217,27 @@ private val SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_MEASUREMENT =
}
}

private val SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_MEASUREMENT_CUSTOM_CAP =
PENDING_SINGLE_PUBLISHER_IMPRESSION_MEASUREMENT.copy {
state = Measurement.State.SUCCEEDED
results += resultOutput {
val result =
MeasurementKt.result {
impression =
MeasurementKt.ResultKt.impression {
value = IMPRESSION_VALUE
noiseMechanism = ProtocolConfig.NoiseMechanism.CONTINUOUS_LAPLACE
deterministicCount = deterministicCount {
customMaximumFrequencyPerUser = IMPRESSION_CUSTOM_MAXIMUM_FREQUENCY_PER_USER
}
}
}
encryptedResult =
encryptResult(signResult(result, AGGREGATOR_SIGNING_KEY), MEASUREMENT_CONSUMER_PUBLIC_KEY)
certificate = AGGREGATOR_CERTIFICATE.name
}
}

// CMMS cross publisher watch duration measurements
private val UNION_ALL_WATCH_DURATION_MEASUREMENT_SPEC = measurementSpec {
measurementPublicKey = MEASUREMENT_CONSUMER_PUBLIC_KEY.pack()
Expand Down Expand Up @@ -1606,6 +1649,16 @@ private val INTERNAL_SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_METRIC =
}
}

private val INTERNAL_SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_METRIC_CUSTOM_CAP =
INTERNAL_PENDING_SINGLE_PUBLISHER_IMPRESSION_METRIC.copy {
weightedMeasurements.clear()
weightedMeasurements += weightedMeasurement {
weight = 1
binaryRepresentation = 1
measurement = INTERNAL_SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_MEASUREMENT_CUSTOM_CAP
}
}

// Internal Cross Publisher Watch Duration Metrics
private val INTERNAL_REQUESTING_CROSS_PUBLISHER_WATCH_DURATION_METRIC = internalMetric {
cmmsMeasurementConsumerId = MEASUREMENT_CONSUMERS.keys.first().measurementConsumerId
Expand Down Expand Up @@ -6810,6 +6863,91 @@ class MetricsServiceTest {
assertThat(result).isEqualTo(SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_METRIC)
}

@Test
fun `getMetric returns impression metric with SUCCEEDED when measurements have custom frequency cap and are updated to SUCCEEDED`() =
runBlocking {
whenever(
internalMetricsMock.batchGetMetrics(
eq(
internalBatchGetMetricsRequest {
cmmsMeasurementConsumerId =
INTERNAL_PENDING_SINGLE_PUBLISHER_IMPRESSION_METRIC.cmmsMeasurementConsumerId
externalMetricIds +=
INTERNAL_PENDING_SINGLE_PUBLISHER_IMPRESSION_METRIC.externalMetricId
}
)
)
)
.thenReturn(
internalBatchGetMetricsResponse {
metrics += INTERNAL_PENDING_SINGLE_PUBLISHER_IMPRESSION_METRIC
},
internalBatchGetMetricsResponse {
metrics += INTERNAL_SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_METRIC_CUSTOM_CAP
},
)

whenever(measurementsMock.batchGetMeasurements(any())).thenAnswer {
val batchGetMeasurementsRequest = it.arguments[0] as BatchGetMeasurementsRequest
val measurementsMap =
mapOf(
PENDING_SINGLE_PUBLISHER_IMPRESSION_MEASUREMENT.name to
SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_MEASUREMENT_CUSTOM_CAP
)
batchGetMeasurementsResponse {
measurements +=
batchGetMeasurementsRequest.namesList.map { name -> measurementsMap.getValue(name) }
}
}

whenever(internalMeasurementsMock.batchSetMeasurementResults(any()))
.thenReturn(
batchSetCmmsMeasurementResultsResponse {
measurements += INTERNAL_SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_MEASUREMENT_CUSTOM_CAP
}
)

val request = getMetricRequest { name = PENDING_SINGLE_PUBLISHER_IMPRESSION_METRIC.name }

val result =
withMeasurementConsumerPrincipal(MEASUREMENT_CONSUMERS.values.first().name, CONFIG) {
runBlocking { service.getMetric(request) }
}

// Verify proto argument of internal MeasurementsCoroutineImplBase::batchSetMeasurementResults
val batchSetMeasurementResultsCaptor: KArgumentCaptor<BatchSetMeasurementResultsRequest> =
argumentCaptor()
verifyBlocking(internalMeasurementsMock, times(1)) {
batchSetMeasurementResults(batchSetMeasurementResultsCaptor.capture())
}
assertThat(batchSetMeasurementResultsCaptor.allValues)
.containsExactly(
batchSetMeasurementResultsRequest {
cmmsMeasurementConsumerId =
INTERNAL_SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_MEASUREMENT_CUSTOM_CAP
.cmmsMeasurementConsumerId
measurementResults += measurementResult {
cmmsMeasurementId =
INTERNAL_SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_MEASUREMENT_CUSTOM_CAP
.cmmsMeasurementId
this.results +=
INTERNAL_SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_MEASUREMENT_CUSTOM_CAP.details
.resultsList
}
}
)

// Verify proto argument of internal
// MeasurementsCoroutineImplBase::batchSetMeasurementFailures
val batchSetMeasurementFailuresCaptor: KArgumentCaptor<BatchSetMeasurementFailuresRequest> =
argumentCaptor()
verifyBlocking(internalMeasurementsMock, never()) {
batchSetMeasurementFailures(batchSetMeasurementFailuresCaptor.capture())
}

assertThat(result).isEqualTo(SUCCEEDED_SINGLE_PUBLISHER_IMPRESSION_METRIC)
}

@Test
fun `getMetric returns the metric with FAILED when measurements are updated to FAILED`() =
runBlocking {
Expand Down

0 comments on commit b1b418f

Please sign in to comment.