Skip to content

Commit

Permalink
feat: Report processor supports custom measurement policy. (#1974)
Browse files Browse the repository at this point in the history
  • Loading branch information
ple13 authored Dec 17, 2024
1 parent 9a9f30d commit 87c6608
Show file tree
Hide file tree
Showing 6 changed files with 7,304 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ def _get_corrected_measurements(self):
"""
Correct the report and returns the adjusted value for each measurement.
"""
children_metric = []
if "mrc" in self._cumulative_measurements:
children_metric.append("mrc")
if "custom" in self._cumulative_measurements:
children_metric.append("custom")

# Builds the report based on the extracted primitive measurements.
report = Report(
{
Expand All @@ -102,7 +108,7 @@ def _get_corrected_measurements(self):
for policy in self._cumulative_measurements
},
metric_subsets_by_parent={
ami: [mrc]} if "mrc" in self._cumulative_measurements else {},
ami: children_metric} if children_metric else {},
cumulative_inconsistency_allowed_edp_combinations={},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,94 @@ import org.wfanet.measurement.reporting.postprocessing.v2alpha.MeasurementDetail

@RunWith(JUnit4::class)
class ReportConversionTest {
@Test
fun `report without custom measurement policy is successfully converted to report summary proto`() {
val reportFile = TEST_DATA_RUNTIME_DIR.resolve("sample_report_with_custom_policy.json").toFile()
val reportAsJson = reportFile.readText()
val reportSummary = ReportConversion.convertJsontoReportSummaries(reportAsJson)

val unionCustomEdp1Edp2MeasurementDetail = measurementDetail {
measurementPolicy = "custom"
setOperation = "union"
dataProviders += "edp1"
dataProviders += "edp2"
leftHandSideTargets += "edp1"
leftHandSideTargets += "edp2"
measurementResults += measurementResult {
reach = 92459
standardDeviation = 145777.467021918
metric = "measurementConsumers/fLhOpt2Z4x8/metrics/adacfb57a-fe7b-44b5-9c29-022be610a407"
}
}

val cummulativeCustomEdp1Edp2MeasurementDetail = measurementDetail {
measurementPolicy = "custom"
setOperation = "cumulative"
isCumulative = true
dataProviders += "edp1"
dataProviders += "edp2"
leftHandSideTargets += "edp1"
leftHandSideTargets += "edp2"
measurementResults += measurementResult {
reach = 18000
standardDeviation = 185589.5021572231
metric = "measurementConsumers/fLhOpt2Z4x8/metrics/a400e54b3-95d1-4056-b92f-f978615a05c3"
}
measurementResults += measurementResult {
reach = 92700
standardDeviation = 191025.0129033726
metric = "measurementConsumers/fLhOpt2Z4x8/metrics/aebc1632a-3676-4fea-b22f-78486f0c48d7"
}
measurementResults += measurementResult {
reach = 163700
standardDeviation = 196286.4317309566
metric = "measurementConsumers/fLhOpt2Z4x8/metrics/a61e13352-b0ff-41eb-879b-dfa7a458d232"
}
measurementResults += measurementResult {
reach = 19100
standardDeviation = 185668.79847492787
metric = "measurementConsumers/fLhOpt2Z4x8/metrics/aa0605786-63a4-4fe1-bbc1-42717bf17ff6"
}
measurementResults += measurementResult {
reach = 127200
standardDeviation = 193570.0395894004
metric = "measurementConsumers/fLhOpt2Z4x8/metrics/accc56b11-d361-4291-a00b-cef502b50d74"
}
measurementResults += measurementResult {
reach = 224400
standardDeviation = 200858.04694133752
metric = "measurementConsumers/fLhOpt2Z4x8/metrics/affcf4c2b-d2ec-4083-9db8-8659c6bd5c67"
}
measurementResults += measurementResult {
reach = 100
standardDeviation = 184302.26284602462
metric = "measurementConsumers/fLhOpt2Z4x8/metrics/a6932cc4a-d367-43b8-be1c-9b8d7f9e4c93"
}
measurementResults += measurementResult {
reach = 100
standardDeviation = 184302.26284602462
metric = "measurementConsumers/fLhOpt2Z4x8/metrics/ae345574e-76ab-4da6-8b86-8e232453f413"
}
measurementResults += measurementResult {
reach = 100
standardDeviation = 184302.26284602462
metric = "measurementConsumers/fLhOpt2Z4x8/metrics/a1cfe162d-cfac-443e-b71f-c75cf569200c"
}
measurementResults += measurementResult {
reach = 100
standardDeviation = 184302.26284602462
metric = "measurementConsumers/fLhOpt2Z4x8/metrics/a7645a53f-960f-44d7-a13e-8e388ed53f6b"
}
}

// Verifies that reportSummary contains the above two protos for custom measurements.
assertThat(reportSummary[0].measurementDetailsList)
.containsAtLeast(
unionCustomEdp1Edp2MeasurementDetail,
cummulativeCustomEdp1Edp2MeasurementDetail,
)
}

@Test
fun `report without unique reach is successfully converted to report summary proto`() {
val reportFile = TEST_DATA_RUNTIME_DIR.resolve("sample_report_small.json").toFile()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ data class MetricReport(

@RunWith(JUnit4::class)
class ReportProcessorTest {
@Test
fun `run correct report with custom policy successfully`() {
val reportFile = TEST_DATA_RUNTIME_DIR.resolve("sample_report_with_custom_policy.json").toFile()
val reportAsJson = reportFile.readText()

val report = ReportConversion.getReportFromJsonString(reportAsJson)
assertThat(report.hasConsistentCumulativeMeasurements()).isFalse()

val updatedReportAsJson = ReportProcessor.processReportJson(reportAsJson)
val updatedReport = ReportConversion.getReportFromJsonString(updatedReportAsJson)
assertThat(updatedReport.hasConsistentCumulativeMeasurements()).isTrue()
}

@Test
fun `run correct report with unique reach and incremental reach successfully`() {
val reportFile =
Expand All @@ -38,11 +51,11 @@ class ReportProcessorTest {
val reportAsJson = reportFile.readText()

val report = ReportConversion.getReportFromJsonString(reportAsJson)
assertThat(report.hasConsistentMeasurements()).isEqualTo(false)
assertThat(report.hasConsistentCumulativeMeasurements()).isFalse()

val updatedReportAsJson = ReportProcessor.processReportJson(reportAsJson)
val updatedReport = ReportConversion.getReportFromJsonString(updatedReportAsJson)
assertThat(updatedReport.hasConsistentMeasurements()).isEqualTo(true)
assertThat(updatedReport.hasConsistentCumulativeMeasurements()).isTrue()
}

@Test
Expand All @@ -51,14 +64,15 @@ class ReportProcessorTest {
val reportAsJson = reportFile.readText()

val report = ReportConversion.getReportFromJsonString(reportAsJson)
assertThat(report.hasConsistentMeasurements()).isEqualTo(false)
assertThat(report.hasConsistentCumulativeMeasurements()).isFalse()

val updatedReportAsJson = ReportProcessor.processReportJson(reportAsJson)
val updatedReport = ReportConversion.getReportFromJsonString(updatedReportAsJson)
assertThat(updatedReport.hasConsistentMeasurements()).isEqualTo(true)
assertThat(updatedReport.hasConsistentCumulativeMeasurements()).isTrue()
}

companion object {
private val POLICIES = listOf("ami", "mrc", "custom")
private val TEST_DATA_RUNTIME_DIR: Path =
getRuntimePath(
Paths.get(
Expand Down Expand Up @@ -100,32 +114,14 @@ class ReportProcessorTest {
}
}

for (entry in measurementDetailsList) {
if (entry.setOperation == "difference" && entry.uniqueReachTarget != "") {
val subset =
entry.dataProvidersList.filter { it != entry.uniqueReachTarget }.toSortedSet()
val measurements = entry.measurementResultsList.map { result -> result.reach }

if (entry.measurementPolicy == measurementPolicy) {
val supersetMeasurement = totalMeasurements[entry.dataProvidersList.toSortedSet()]!!
if (subset !in totalMeasurements) {
totalMeasurements[subset] = supersetMeasurement - measurements[0]
}
}
}
}

return MetricReport(cumulativeMeasurements, totalMeasurements)
}

private fun ReportSummary.toReportByPolicy(): Map<String, MetricReport> {
val metricReportByPolicy: MutableMap<String, MetricReport> = mutableMapOf()
metricReportByPolicy["ami"] = this.toMetricReport("ami")
metricReportByPolicy["mrc"] = this.toMetricReport("mrc")
return metricReportByPolicy
return POLICIES.associateWith { policy -> this.toMetricReport(policy) }
}

private fun MetricReport.hasConsistentMeasurements(): Boolean {
private fun MetricReport.hasConsistentCumulativeMeasurements(): Boolean {
if (cumulativeMeasurements.isEmpty()) {
return true
}
Expand All @@ -142,17 +138,12 @@ class ReportProcessorTest {
return true
}

private fun Report.hasConsistentMeasurements(): Boolean {
this.toReportSummaries().forEach {
val metricReportByPolicy = it.toReportByPolicy()
if (
!metricReportByPolicy["ami"]!!.hasConsistentMeasurements() ||
!metricReportByPolicy["mrc"]!!.hasConsistentMeasurements()
) {
return false
private fun Report.hasConsistentCumulativeMeasurements(): Boolean {
return this.toReportSummaries().all {
it.toReportByPolicy().values.all { metricReport ->
metricReport.hasConsistentCumulativeMeasurements()
}
}
return true
}
}
}
Loading

0 comments on commit 87c6608

Please sign in to comment.