[DP-1][Epic] Hybrid Compute Observability Fixes 3
yannistze authored Jul 18, 2024
2 parents ffc92d1 + ac89fe4 commit c5d4a76
Showing 15 changed files with 135 additions and 118 deletions.
2 changes: 1 addition & 1 deletion PUSHDOWN.md
@@ -1,4 +1,4 @@
#Spark to BigQuery Query Pushdown Usage Instructions and Technical Details
# Spark to BigQuery Query Pushdown Usage Instructions and Technical Details

Query Pushdown is an advanced optimization technique in which Spark transformations/actions (hereby referred to as “Spark operations”) performed for reading data into Spark from BigQuery are translated into SQL queries and pushed down to BigQuery from the open-source [spark-bigquery-connector](https://github.com/GoogleCloudDataproc/spark-bigquery-connector) (hereby referred to as “connector”) enabling improved read performance. With BigQuery as the data source for Spark, the connector can push large and complex Spark SQL queries to be processed in BigQuery thus bringing the computation next to the data and reducing the I/O overhead. This capability combines the robust query-processing of BigQuery with the computational capabilities of Spark and its ecosystem.

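As background for the pushdown behaviour described above, a minimal read through the connector looks roughly like the sketch below. The table, column names, and filter are illustrative placeholders; whether a particular operation is actually pushed down depends on the connector version and the configuration covered in the rest of PUSHDOWN.md.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PushdownReadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("pushdown-sketch")
        .getOrCreate();

    // Read a BigQuery table through the connector; the table is a public sample dataset.
    Dataset<Row> df = spark.read()
        .format("bigquery")
        .option("table", "bigquery-public-data.samples.shakespeare")
        .load();

    // A filter plus aggregation of this shape is the kind of Spark operation that
    // pushdown can translate into SQL and execute inside BigQuery.
    df.filter("word_count > 10")
        .groupBy("corpus")
        .count()
        .show();

    spark.stop();
  }
}
```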
BigQueryStorageReadRowsTracer.java
@@ -16,7 +16,6 @@
package com.google.cloud.bigquery.connector.common;

import java.io.Serializable;
import org.apache.spark.TaskContext;

/**
* Interface to capture tracing in information for the BigQuery connector. Modelled after {@link
@@ -29,8 +28,6 @@
* <p>For internal use only.
*/
public interface BigQueryStorageReadRowsTracer extends Serializable {
/** Record Query Submission time */
void querySubmissionTime(String querySubmissionTime);
/** Record stream initialization time. */
void startStream();
/** Indicates a fully decoded element has been requested by spark (i.e. Arrow RecordBatch). */
@@ -51,9 +48,6 @@ public interface BigQueryStorageReadRowsTracer extends Serializable {
/** Called when the next batch is needed from spark. */
void nextBatchNeeded();

/** Log AIQ bigquery connector latency */
void logWarehouseLatency(TaskContext context);

/**
* Must only be called before any calls are made to the tracer. This is intended for cases when
* multiple threads might be used for processing one stream. tracer that is distinguished between
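With querySubmissionTime and the TaskContext-based logWarehouseLatency removed, the tracer is driven purely through the callbacks that remain. A minimal driver sketch follows; it is placed in the connector's package because the LoggingBigQueryStorageReadRowsTracer constructor (shown in the next file of this commit) is package-private, and it exercises only callbacks visible in this diff.

```java
package com.google.cloud.bigquery.connector.common;

/** Illustrative driver, not connector code: exercises the remaining tracer callbacks in order. */
public class TracerLifecycleSketch {
  public static void main(String[] args) {
    // Arguments mirror the constructor shown below: stream name and log-interval power of two.
    BigQueryStorageReadRowsTracer tracer =
        new LoggingBigQueryStorageReadRowsTracer("stream-0", 4);

    tracer.startStream();            // record stream initialization time
    tracer.rowsParseStarted();       // a batch begins decoding
    tracer.rowsParseFinished(1_024); // ...and finishes with its row count
    tracer.nextBatchNeeded();        // Spark asks for the next batch
    tracer.finished();               // end of stream; flushes the accumulated log line
  }
}
```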
LoggingBigQueryStorageReadRowsTracer.java
@@ -19,19 +19,16 @@
import com.google.gson.JsonObject;
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import org.apache.spark.TaskContext;
import org.apache.spark.DataSourceTelemetryHelpers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Predef;
import scala.collection.JavaConverters;

/**
* Implementation of {@link BigQueryStorageReadRowsTracer} that accumulates and logs times
* periodically.
*/
public class LoggingBigQueryStorageReadRowsTracer implements BigQueryStorageReadRowsTracer {
public class LoggingBigQueryStorageReadRowsTracer
implements BigQueryStorageReadRowsTracer, DataSourceTelemetryHelpers {
private static final Logger log =
LoggerFactory.getLogger(LoggingBigQueryStorageReadRowsTracer.class);

@@ -47,39 +44,12 @@ public class LoggingBigQueryStorageReadRowsTracer implements BigQueryStorageRead
long bytes = 0;
// For confirming data is logged.
long linesLogged = 0;
Instant querySubmissionTime = Instant.now();
Instant firstRowReadAt = Instant.MAX;
Instant lastRowReadAt = Instant.MIN;
long warehouseReadLatency = 0;
long warehouseQueryLatency = 0;
long warehouseLogged = 0;

LoggingBigQueryStorageReadRowsTracer(String streamName, int logIntervalPowerOf2) {
this.streamName = streamName;
this.logIntervalPowerOf2 = logIntervalPowerOf2;
}

private void recordWarehouseQueryLatency() {
firstRowReadAt = (firstRowReadAt.compareTo(Instant.now()) < 0) ? firstRowReadAt : Instant.now();
warehouseQueryLatency =
Math.max(
Instant.now().toEpochMilli() - querySubmissionTime.toEpochMilli(),
warehouseQueryLatency);
}

private void recordWarehouseReadLatency() {
lastRowReadAt = (lastRowReadAt.compareTo(Instant.now()) > 0) ? lastRowReadAt : Instant.now();
warehouseReadLatency =
Math.max(
Instant.now().toEpochMilli() - firstRowReadAt.toEpochMilli(), warehouseReadLatency);
}

@Override
public void querySubmissionTime(String querySubmissionTime) {
this.querySubmissionTime =
Instant.from(DateTimeFormatter.ISO_INSTANT.parse(querySubmissionTime));
}

@Override
public void startStream() {
startTime = Instant.now();
@@ -88,14 +58,12 @@ public void startStream() {
@Override
public void rowsParseStarted() {
parseTime.start();
recordWarehouseQueryLatency();
}

@Override
public void rowsParseFinished(long rows) {
this.rows += rows;
parseTime.finish();
recordWarehouseReadLatency();
}

@Override
@@ -115,25 +83,6 @@ public void finished() {
logData();
}

public void logWarehouseLatency(TaskContext context) {
if (warehouseLogged == 0) {
HashMap<String, String> tags = new HashMap<>();
tags.put("warehouse_read_latency_millis", String.valueOf(warehouseReadLatency));
tags.put("warehouse_query_latency_millis", String.valueOf(warehouseQueryLatency));
tags.put("data_source", "bigquery");
tags.put("number_of_samples", String.valueOf(parseTime.getSamples()));
tags.put("stream_name", streamName);
tags.put("query_submitted_at", querySubmissionTime.toString());
tags.put("first_row_read_at", firstRowReadAt.toString());
tags.put("last_row_read_at", lastRowReadAt.toString());
tags.put("row_count", String.valueOf(rows));

context.emitMetricsLog(
JavaConverters.mapAsScalaMapConverter(tags).asScala().toMap(Predef.$conforms()));
warehouseLogged = warehouseLogged + 1;
}
}

private static Duration average(DurationTimer durationTimer) {
long samples = durationTimer.getSamples();
if (samples == 0) {
@@ -183,7 +132,7 @@ private void logData() {
jsonObject.addProperty("Bytes", bytes);
jsonObject.addProperty("Rows", rows);
jsonObject.addProperty("I/O time", serviceTime.getAccumulatedTime().toMillis());
log.info("Tracer Logs:{}", new Gson().toJson(jsonObject));
log.info(logEventNameTagger("Tracer Logs:{}"), new Gson().toJson(jsonObject));
linesLogged++;
}

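The logData() path above builds a single JSON object per interval and emits it through the logEventNameTagger wrapper added in this commit. The JSON-building pattern in isolation, using only the field names visible in this hunk (the real method adds more fields before these, and the values here are illustrative), looks like:

```java
import com.google.gson.Gson;
import com.google.gson.JsonObject;

/** Standalone sketch of the tracer's structured log line; values are illustrative. */
public class TracerLogLineSketch {
  public static void main(String[] args) {
    JsonObject jsonObject = new JsonObject();
    jsonObject.addProperty("Bytes", 1_048_576L); // bytes read so far
    jsonObject.addProperty("Rows", 4_096L);      // rows parsed so far
    jsonObject.addProperty("I/O time", 250L);    // serviceTime.getAccumulatedTime().toMillis()
    // The connector logs this via SLF4J; println keeps the sketch self-contained.
    System.out.println("Tracer Logs:" + new Gson().toJson(jsonObject));
  }
}
```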
ReadSessionCreator.java
@@ -36,11 +36,12 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.spark.DataSourceTelemetryHelpers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// A helper class, also handles view materialization
public class ReadSessionCreator {
public class ReadSessionCreator implements DataSourceTelemetryHelpers {

public static final int DEFAULT_MAX_PARALLELISM = 20_000;
public static final int MINIMAL_PARALLELISM = 1;
@@ -139,7 +140,8 @@ public ReadSessionResponse create(
if (minStreamCount > maxStreamCount) {
minStreamCount = maxStreamCount;
log.warn(
"preferred min parallelism is larger than the max parallelism, therefore setting it to max parallelism [{}]",
logEventNameTagger(
"preferred min parallelism is larger than the max parallelism, therefore setting it to max parallelism [{}]"),
minStreamCount);
}
Instant sessionPrepEndTime = Instant.now();
@@ -160,7 +162,10 @@
if (config.isReadSessionCachingEnabled()
&& getReadSessionCache().asMap().containsKey(createReadSessionRequest)) {
ReadSession readSession = getReadSessionCache().asMap().get(createReadSessionRequest);
log.info("Reusing read session: {}, for table: {}", readSession.getName(), table);
log.info(
logEventNameTagger("Reusing read session: {}, for table: {}"),
readSession.getName(),
table);
return new ReadSessionResponse(readSession, actualTable);
}
ReadSession readSession = bigQueryReadClient.createReadSession(createReadSessionRequest);
@@ -186,11 +191,12 @@ && getReadSessionCache().asMap().containsKey(createReadSessionRequest)) {
log.info("Read session:{}", new Gson().toJson(jsonObject));
if (readSession.getStreamsCount() != maxStreamCount) {
log.info(
"Requested {} max partitions, but only received {} "
+ "from the BigQuery Storage API for session {}. Notice that the "
+ "number of streams in actual may be lower than the requested number, depending on "
+ "the amount parallelism that is reasonable for the table and the maximum amount of "
+ "parallelism allowed by the system.",
logEventNameTagger(
"Requested {} max partitions, but only received {} "
+ "from the BigQuery Storage API for session {}. Notice that the "
+ "number of streams in actual may be lower than the requested number, depending on "
+ "the amount parallelism that is reasonable for the table and the maximum amount of "
+ "parallelism allowed by the system."),
maxStreamCount,
readSession.getStreamsCount(),
readSession.getName());
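The min/max stream counts negotiated above surface to users as read options. A hedged sketch follows: the option names maxParallelism and preferredMinParallelism are assumptions based on the connector's public documentation, not on this diff, and the values are illustrative.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadSessionParallelismSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("read-session-sketch").getOrCreate();

    // Assumed option names (from the connector's documentation, not this diff):
    // maxParallelism caps the streams requested from the Storage API, and
    // preferredMinParallelism is the "preferred min parallelism" the warning above clamps.
    Dataset<Row> df = spark.read()
        .format("bigquery")
        .option("table", "bigquery-public-data.samples.shakespeare")
        .option("maxParallelism", "200")
        .option("preferredMinParallelism", "20")
        .load();

    // Each Storage API stream typically maps to one Spark input partition.
    System.out.println("partitions: " + df.javaRDD().getNumPartitions());

    spark.stop();
  }
}
```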
InternalRowIterator.java
@@ -21,7 +21,6 @@
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.common.collect.ImmutableList;
import java.util.Iterator;
import org.apache.spark.TaskContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,21 +31,18 @@ public class InternalRowIterator implements Iterator<InternalRow> {
private ReadRowsResponseToInternalRowIteratorConverter converter;
private ReadRowsHelper readRowsHelper;
private final BigQueryStorageReadRowsTracer bigQueryStorageReadRowsTracer;
private TaskContext context;
private Iterator<InternalRow> rows = ImmutableList.<InternalRow>of().iterator();
private static final Logger log = LoggerFactory.getLogger(InternalRowIterator.class);

public InternalRowIterator(
Iterator<ReadRowsResponse> readRowsResponses,
ReadRowsResponseToInternalRowIteratorConverter converter,
ReadRowsHelper readRowsHelper,
BigQueryStorageReadRowsTracer bigQueryStorageReadRowsTracer,
TaskContext context) {
BigQueryStorageReadRowsTracer bigQueryStorageReadRowsTracer) {
this.readRowsResponses = readRowsResponses;
this.converter = converter;
this.readRowsHelper = readRowsHelper;
this.bigQueryStorageReadRowsTracer = bigQueryStorageReadRowsTracer;
this.context = context;
}

@Override
@@ -55,7 +51,6 @@ public boolean hasNext() {
bigQueryStorageReadRowsTracer.readRowsResponseRequested();
if (!readRowsResponses.hasNext()) {
try {
bigQueryStorageReadRowsTracer.logWarehouseLatency(context);
bigQueryStorageReadRowsTracer.finished();
} catch (Exception e) {
log.debug("Failure finishing tracer. stream:{} exception:{}", readRowsHelper, e);
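After this change the iterator is built from four collaborators and no longer needs a TaskContext. A hypothetical caller sketch follows; the package and class name are assumptions chosen so the connector types above resolve, and only the constructor shape comes from this diff.

```java
package com.google.cloud.bigquery.connector.common; // assumed: same package as the tracer above

import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import java.util.Iterator;
import org.apache.spark.sql.catalyst.InternalRow;

/** Hypothetical caller, not connector code: shows the post-change constructor shape. */
public class InternalRowIteratorCallerSketch {
  static Iterator<InternalRow> open(
      Iterator<ReadRowsResponse> readRowsResponses,
      ReadRowsResponseToInternalRowIteratorConverter converter,
      ReadRowsHelper readRowsHelper,
      BigQueryStorageReadRowsTracer tracer) {
    // TaskContext is no longer threaded through; the tracer alone handles end-of-stream reporting.
    return new InternalRowIterator(readRowsResponses, converter, readRowsHelper, tracer);
  }
}
```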
@@ -35,4 +35,8 @@ public String getStream() {
public int index() {
return index;
}

public String toPartitionString() {
return stream + "_" + index;
}
}
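The new toPartitionString() joins the stream name and index with an underscore, giving each input partition a readable identifier. A tiny sketch with illustrative values:

```java
/** Illustrative only: mirrors the concatenation in toPartitionString(). */
public class PartitionStringSketch {
  public static void main(String[] args) {
    // Placeholder stream name; real names come from the BigQuery Storage read session.
    String stream = "projects/p/locations/us/sessions/s1/streams/0";
    int index = 3;
    System.out.println(stream + "_" + index);
    // -> projects/p/locations/us/sessions/s1/streams/0_3
  }
}
```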