
Commit 51f70fc
Revert "log"
This reverts commit 3172b73.
ShaoFuWu committed Apr 23, 2024
1 parent 3172b73 commit 51f70fc
Showing 6 changed files with 2 additions and 33 deletions.
@@ -28,9 +28,6 @@
  * <p>For internal use only.
  */
 public interface BigQueryStorageReadRowsTracer extends Serializable {
-  /** Record Query Submission time */
-  void querySubmissionTime(long querySubmittedAt);
-
   /** Record stream initialization time. */
   void startStream();
   /** Indicates a fully decoded element has been requested by spark (i.e. Arrow RecordBatch). */
@@ -34,10 +34,6 @@ public void start() {
     start = System.nanoTime();
   }
 
-  public void start(long timestamp) {
-    start = timestamp;
-  }
-
   public void finish() {
     long now = System.nanoTime();
     if (start != Long.MIN_VALUE) {
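For context, DurationTimer (above) accumulates elapsed time across repeated start()/finish() pairs, using Long.MIN_VALUE as a no-measurement-in-flight sentinel. Below is a minimal sketch of that pattern, including the start(long) overload this commit removes; the accumulated field and getAccumulatedTime() are assumptions inferred from the calls to getAccumulatedTime() in the logData() hunk further down, not code shown in this diff.

import java.time.Duration;

// Sketch of the DurationTimer pattern; not the connector's actual source.
class DurationTimerSketch {
  private long start = Long.MIN_VALUE; // sentinel: no measurement in flight
  private Duration accumulated = Duration.ZERO; // assumed accumulator field

  public void start() {
    start = System.nanoTime();
  }

  // The overload removed by this commit: seed the timer with a caller-supplied
  // timestamp (expected to be a System.nanoTime() value) instead of "now".
  public void start(long timestamp) {
    start = timestamp;
  }

  public void finish() {
    long now = System.nanoTime();
    if (start != Long.MIN_VALUE) {
      accumulated = accumulated.plusNanos(now - start);
    }
    start = Long.MIN_VALUE; // reset the sentinel
  }

  public Duration getAccumulatedTime() { // queried by logData() below
    return accumulated;
  }
}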
@@ -37,36 +37,25 @@ public class LoggingBigQueryStorageReadRowsTracer implements BigQueryStorageReadRowsTracer {
   final DurationTimer parseTime = new DurationTimer();
   final DurationTimer sparkTime = new DurationTimer();
   final DurationTimer serviceTime = new DurationTimer();
-
-  final DurationTimer warehouseQueryLatency = new DurationTimer();
   Instant endTime;
   long rows = 0;
   long bytes = 0;
   // For confirming data is logged.
   long linesLogged = 0;
 
-  long querySubmissionTime = 0;
-
   LoggingBigQueryStorageReadRowsTracer(String streamName, int logIntervalPowerOf2) {
     this.streamName = streamName;
     this.logIntervalPowerOf2 = logIntervalPowerOf2;
   }
 
-  @Override
-  public void querySubmissionTime(long querySubmittedAt) {
-    this.querySubmissionTime = querySubmittedAt;
-  }
-
   @Override
   public void startStream() {
     startTime = Instant.now();
   }
 
   @Override
   public void rowsParseStarted() {
-    warehouseQueryLatency.start(querySubmissionTime);
     parseTime.start();
-    warehouseQueryLatency.finish();
   }
 
   @Override
@@ -142,12 +131,6 @@ private void logData() {
     jsonObject.addProperty("Rows", rows);
     jsonObject.addProperty("I/O time", serviceTime.getAccumulatedTime().toMillis());
     log.info("Tracer Logs:{}", new Gson().toJson(jsonObject));
-    log.info(
-        "Statistics:"
-            + " warehouse_read_latency={} ms warehouse_query_latency={} ms"
-            + " data_source=bigquery",
-        parseTime.getAccumulatedTime().toMillis(),
-        warehouseQueryLatency.getAccumulatedTime().toMillis());
     linesLogged++;
   }
 
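For context, the reverted measurement seeded warehouseQueryLatency with a driver-captured submission timestamp and finished it when the executor began parsing rows, so the accumulated duration approximated submit-to-first-parse latency. Below is a standalone sketch of that computation; all names are illustrative, not connector code. Note that System.nanoTime() values are only comparable within a single JVM, so differencing a driver-side timestamp on an executor is not generally reliable.

// Standalone sketch of the reverted warehouse-latency computation.
public class WarehouseLatencySketch {
  public static void main(String[] args) throws InterruptedException {
    long querySubmittedAt = System.nanoTime(); // driver side, at query submission
    Thread.sleep(25);                          // stand-in for queueing and execution
    long firstParseAt = System.nanoTime();     // executor side, first rows parsed
    long latencyMs = (firstParseAt - querySubmittedAt) / 1_000_000L;
    System.out.println("warehouse_query_latency=" + latencyMs + " ms");
  }
}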
@@ -86,9 +86,6 @@ public BigQueryRDDFactory(
    */
   public RDD<InternalRow> buildScanFromSQL(String sql) {
     log.info("Materializing the following sql query to a BigQuery table: {}", sql);
-    sqlContext
-        .sparkContext()
-        .setLocalProperty("querySubmissionTime", String.valueOf(System.nanoTime()));
 
     TableInfo actualTable =
         bigQueryClient.materializeQueryToTable(
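The removed lines used Spark's local-property mechanism, which propagates a driver-side string to the tasks of jobs submitted from that thread; tasks read it back through TaskContext. A minimal sketch of that mechanism follows (a standalone illustration using stock Spark APIs, not connector code):

import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;

// Sketch of Spark local-property propagation, the mechanism the removed
// lines relied on to ship the submission timestamp to executors.
class LocalPropertySketch {
  static void driverSide(SparkContext sc) {
    // Set on the driver thread before submitting the job.
    sc.setLocalProperty("querySubmissionTime", String.valueOf(System.nanoTime()));
  }

  static long taskSide() {
    // Read inside a running task; returns null if the property was never set.
    String value = TaskContext.get().getLocalProperty("querySubmissionTime");
    return value == null ? 0L : Long.parseLong(value);
  }
}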
@@ -129,13 +129,11 @@ public scala.collection.Iterator<InternalRow> compute(Partition split, TaskContext context) {
           Optional.of(schema),
           Optional.of(tracer));
     }
-    tracer.querySubmissionTime(Long.valueOf(context.getLocalProperty("querySubmissionTime")));
 
     return new InterruptibleIterator<InternalRow>(
         context,
         new ScalaIterator<InternalRow>(
-            new InternalRowIterator(readRowsResponseIterator, converter, readRowsHelper, tracer)),
-        scala.Option.empty());
+            new InternalRowIterator(readRowsResponseIterator, converter, readRowsHelper, tracer)));
   }
 
   @Override
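Two notes on the removed lines. First, Long.valueOf(context.getLocalProperty("querySubmissionTime")) throws NumberFormatException whenever the property was never set (getLocalProperty returns null), for example on scan paths that do not go through buildScanFromSQL. Second, for reference, here is a sketch of the iterator wrapping using Apache Spark's stock two-argument InterruptibleIterator and the standard JavaConverters adapter in place of the connector's ScalaIterator helper; String stands in for InternalRow so the snippet compiles without Spark SQL internals.

import org.apache.spark.InterruptibleIterator;
import org.apache.spark.TaskContext;
import scala.collection.JavaConverters;

// Sketch of the wrapping done in compute() above; illustrative only.
class ComputeSketch {
  static scala.collection.Iterator<String> wrap(
      TaskContext context, java.util.Iterator<String> rows) {
    scala.collection.Iterator<String> scalaRows =
        JavaConverters.asScalaIteratorConverter(rows).asScala();
    // InterruptibleIterator checks for task kill/interruption on each hasNext().
    return new InterruptibleIterator<>(context, scalaRows);
  }
}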
@@ -129,13 +129,11 @@ public scala.collection.Iterator<InternalRow> compute(Partition split, TaskContext context) {
           Optional.of(schema),
           Optional.of(tracer));
     }
-    tracer.querySubmissionTime(Long.valueOf(context.getLocalProperty("querySubmissionTime")));
 
     return new InterruptibleIterator<InternalRow>(
         context,
         new ScalaIterator<InternalRow>(
-            new InternalRowIterator(readRowsResponseIterator, converter, readRowsHelper, tracer)),
-        scala.Option.empty());
+            new InternalRowIterator(readRowsResponseIterator, converter, readRowsHelper, tracer)));
   }
 
   @Override
