From 51f70fce07152ee6a37424a2380c004056fdf963 Mon Sep 17 00:00:00 2001 From: John Wu <87321568+SjapFiWi@users.noreply.github.com> Date: Tue, 23 Apr 2024 16:08:22 -0400 Subject: [PATCH] Revert "log" This reverts commit 3172b7380dd4afa0ee0e4dda349d0313956e7785. --- .../common/BigQueryStorageReadRowsTracer.java | 3 --- .../connector/common/DurationTimer.java | 4 ---- .../LoggingBigQueryStorageReadRowsTracer.java | 17 ----------------- .../bigquery/direct/BigQueryRDDFactory.java | 3 --- .../bigquery/direct/Scala213BigQueryRDD.java | 4 +--- .../bigquery/direct/PreScala213BigQueryRDD.java | 4 +--- 6 files changed, 2 insertions(+), 33 deletions(-) diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryStorageReadRowsTracer.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryStorageReadRowsTracer.java index 5ccd30865..161950bc7 100644 --- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryStorageReadRowsTracer.java +++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryStorageReadRowsTracer.java @@ -28,9 +28,6 @@ *

For internal use only. */ public interface BigQueryStorageReadRowsTracer extends Serializable { - /** Record Query Submission time */ - void querySubmissionTime(long querySubmittedAt); - /** Record stream initialization time. */ void startStream(); /** Indicates a fully decoded element has been requested by spark (i.e. Arrow RecordBatch). */ diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/DurationTimer.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/DurationTimer.java index 2615a8310..c0dcde718 100644 --- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/DurationTimer.java +++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/DurationTimer.java @@ -34,10 +34,6 @@ public void start() { start = System.nanoTime(); } - public void start(long timestamp) { - start = timestamp; - } - public void finish() { long now = System.nanoTime(); if (start != Long.MIN_VALUE) { diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/LoggingBigQueryStorageReadRowsTracer.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/LoggingBigQueryStorageReadRowsTracer.java index 7f5276de1..2c3b52de2 100644 --- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/LoggingBigQueryStorageReadRowsTracer.java +++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/LoggingBigQueryStorageReadRowsTracer.java @@ -37,26 +37,17 @@ public class LoggingBigQueryStorageReadRowsTracer implements BigQueryStorageRead final DurationTimer parseTime = new DurationTimer(); final DurationTimer sparkTime = new DurationTimer(); final DurationTimer serviceTime = new DurationTimer(); - - final DurationTimer warehouseQueryLatency = new DurationTimer(); Instant endTime; long rows = 0; long bytes = 0; // For confirming data is logged. long linesLogged = 0; - long querySubmissionTime = 0; - LoggingBigQueryStorageReadRowsTracer(String streamName, int logIntervalPowerOf2) { this.streamName = streamName; this.logIntervalPowerOf2 = logIntervalPowerOf2; } - @Override - public void querySubmissionTime(long querySubmittedAt) { - this.querySubmissionTime = querySubmittedAt; - } - @Override public void startStream() { startTime = Instant.now(); @@ -64,9 +55,7 @@ public void startStream() { @Override public void rowsParseStarted() { - warehouseQueryLatency.start(querySubmissionTime); parseTime.start(); - warehouseQueryLatency.finish(); } @Override @@ -142,12 +131,6 @@ private void logData() { jsonObject.addProperty("Rows", rows); jsonObject.addProperty("I/O time", serviceTime.getAccumulatedTime().toMillis()); log.info("Tracer Logs:{}", new Gson().toJson(jsonObject)); - log.info( - "Statistics:" - + " warehouse_read_latency={} ms warehouse_query_latency={} ms" - + " data_source=bigquery", - parseTime.getAccumulatedTime().toMillis(), - warehouseQueryLatency.getAccumulatedTime().toMillis()); linesLogged++; } diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/BigQueryRDDFactory.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/BigQueryRDDFactory.java index a91104a4d..4769365d3 100644 --- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/BigQueryRDDFactory.java +++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/BigQueryRDDFactory.java @@ -86,9 +86,6 @@ public BigQueryRDDFactory( */ public RDD buildScanFromSQL(String sql) { log.info("Materializing the following sql query to a BigQuery table: {}", sql); - sqlContext - .sparkContext() - .setLocalProperty("querySubmissionTime", String.valueOf(System.nanoTime())); TableInfo actualTable = bigQueryClient.materializeQueryToTable( diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/Scala213BigQueryRDD.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/Scala213BigQueryRDD.java index d230f564a..67d3d18d5 100644 --- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/Scala213BigQueryRDD.java +++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/Scala213BigQueryRDD.java @@ -129,13 +129,11 @@ public scala.collection.Iterator compute(Partition split, TaskConte Optional.of(schema), Optional.of(tracer)); } - tracer.querySubmissionTime(Long.valueOf(context.getLocalProperty("querySubmissionTime"))); return new InterruptibleIterator( context, new ScalaIterator( - new InternalRowIterator(readRowsResponseIterator, converter, readRowsHelper, tracer)), - scala.Option.empty()); + new InternalRowIterator(readRowsResponseIterator, converter, readRowsHelper, tracer))); } @Override diff --git a/spark-bigquery-scala-212-support/src/main/java/com/google/cloud/spark/bigquery/direct/PreScala213BigQueryRDD.java b/spark-bigquery-scala-212-support/src/main/java/com/google/cloud/spark/bigquery/direct/PreScala213BigQueryRDD.java index 2bfa3abf6..24778e209 100644 --- a/spark-bigquery-scala-212-support/src/main/java/com/google/cloud/spark/bigquery/direct/PreScala213BigQueryRDD.java +++ b/spark-bigquery-scala-212-support/src/main/java/com/google/cloud/spark/bigquery/direct/PreScala213BigQueryRDD.java @@ -129,13 +129,11 @@ public scala.collection.Iterator compute(Partition split, TaskConte Optional.of(schema), Optional.of(tracer)); } - tracer.querySubmissionTime(Long.valueOf(context.getLocalProperty("querySubmissionTime"))); return new InterruptibleIterator( context, new ScalaIterator( - new InternalRowIterator(readRowsResponseIterator, converter, readRowsHelper, tracer)), - scala.Option.empty()); + new InternalRowIterator(readRowsResponseIterator, converter, readRowsHelper, tracer))); } @Override