diff --git a/PUSHDOWN.md b/PUSHDOWN.md
index 84a73d03d..63e31d062 100644
--- a/PUSHDOWN.md
+++ b/PUSHDOWN.md
@@ -1,4 +1,4 @@
-#Spark to BigQuery Query Pushdown Usage Instructions and Technical Details
+# Spark to BigQuery Query Pushdown Usage Instructions and Technical Details
Query Pushdown is an advanced optimization technique in which the Spark transformations/actions (hereafter referred to as “Spark operations”) used to read data into Spark from BigQuery are translated into SQL queries and pushed down to BigQuery by the open-source [spark-bigquery-connector](https://github.com/GoogleCloudDataproc/spark-bigquery-connector) (hereafter referred to as the “connector”), improving read performance. With BigQuery as the data source for Spark, the connector can push large and complex Spark SQL queries down to BigQuery for processing, bringing the computation next to the data and reducing I/O overhead. This capability combines the robust query processing of BigQuery with the computational capabilities of Spark and its ecosystem.
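For orientation (not part of this patch), a minimal sketch of the kind of Spark job whose operations the connector can push down once the feature is enabled; the table name is illustrative only, and the call that enables pushdown is not shown here:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PushdownReadExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("pushdown-read-example").getOrCreate();

    // Read a BigQuery table through the connector; the table name is only an example.
    Dataset<Row> words =
        spark.read().format("bigquery")
            .option("table", "bigquery-public-data.samples.shakespeare")
            .load();

    // With pushdown enabled, the projection, filter and aggregation below can be translated
    // into a single SQL query executed by BigQuery, so Spark reads only the reduced result.
    words.select("word", "word_count")
        .filter("word_count > 100")
        .groupBy("word")
        .sum("word_count")
        .show();

    spark.stop();
  }
}
```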
diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryStorageReadRowsTracer.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryStorageReadRowsTracer.java
index 81ace8196..161950bc7 100644
--- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryStorageReadRowsTracer.java
+++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryStorageReadRowsTracer.java
@@ -16,7 +16,6 @@
package com.google.cloud.bigquery.connector.common;
import java.io.Serializable;
-import org.apache.spark.TaskContext;
/**
* Interface to capture tracing information for the BigQuery connector. Modelled after {@link
@@ -29,8 +28,6 @@
*
* <p>For internal use only.
*/
public interface BigQueryStorageReadRowsTracer extends Serializable {
- /** Record Query Submission time */
- void querySubmissionTime(String querySubmissionTime);
/** Record stream initialization time. */
void startStream();
/** Indicates a fully decoded element has been requested by spark (i.e. Arrow RecordBatch). */
@@ -51,9 +48,6 @@ public interface BigQueryStorageReadRowsTracer extends Serializable {
/** Called when the next batch is needed from spark. */
void nextBatchNeeded();
- /** Log AIQ bigquery connector latency */
- void logWarehouseLatency(TaskContext context);
-
/**
* Must only be called before any calls are made to the tracer. This is intended for cases when
* multiple threads might be used for processing one stream. tracer that is distinguished between
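As context for the interface above (including the methods this patch removes), a rough sketch of the order in which a read path typically drives the tracer; the driver method and response iterator are illustrative, and only methods visible elsewhere in this patch are used:

```java
// Assumes the same package as BigQueryStorageReadRowsTracer; the driver itself is hypothetical.
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import java.util.Iterator;

class TracerDriverSketch {
  static void consume(BigQueryStorageReadRowsTracer tracer, Iterator<ReadRowsResponse> responses) {
    tracer.startStream();                 // stream initialization time
    while (responses.hasNext()) {
      tracer.readRowsResponseRequested(); // about to pull the next server response
      ReadRowsResponse response = responses.next();
      tracer.rowsParseStarted();          // decoding the response into rows
      tracer.rowsParseFinished(response.getRowCount());
    }
    tracer.finished();                    // closes out the stream and logs accumulated timings
  }
}
```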
diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/LoggingBigQueryStorageReadRowsTracer.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/LoggingBigQueryStorageReadRowsTracer.java
index d4fee1a36..1ed9ddaf4 100644
--- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/LoggingBigQueryStorageReadRowsTracer.java
+++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/LoggingBigQueryStorageReadRowsTracer.java
@@ -19,19 +19,16 @@
import com.google.gson.JsonObject;
import java.time.Duration;
import java.time.Instant;
-import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import org.apache.spark.TaskContext;
+import org.apache.spark.DataSourceTelemetryHelpers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Predef;
-import scala.collection.JavaConverters;
/**
* Implementation of {@link BigQueryStorageReadRowsTracer} that accumulates and logs times
* periodically.
*/
-public class LoggingBigQueryStorageReadRowsTracer implements BigQueryStorageReadRowsTracer {
+public class LoggingBigQueryStorageReadRowsTracer
+ implements BigQueryStorageReadRowsTracer, DataSourceTelemetryHelpers {
private static final Logger log =
LoggerFactory.getLogger(LoggingBigQueryStorageReadRowsTracer.class);
@@ -47,39 +44,12 @@ public class LoggingBigQueryStorageReadRowsTracer implements BigQueryStorageRead
long bytes = 0;
// For confirming data is logged.
long linesLogged = 0;
- Instant querySubmissionTime = Instant.now();
- Instant firstRowReadAt = Instant.MAX;
- Instant lastRowReadAt = Instant.MIN;
- long warehouseReadLatency = 0;
- long warehouseQueryLatency = 0;
- long warehouseLogged = 0;
LoggingBigQueryStorageReadRowsTracer(String streamName, int logIntervalPowerOf2) {
this.streamName = streamName;
this.logIntervalPowerOf2 = logIntervalPowerOf2;
}
- private void recordWarehouseQueryLatency() {
- firstRowReadAt = (firstRowReadAt.compareTo(Instant.now()) < 0) ? firstRowReadAt : Instant.now();
- warehouseQueryLatency =
- Math.max(
- Instant.now().toEpochMilli() - querySubmissionTime.toEpochMilli(),
- warehouseQueryLatency);
- }
-
- private void recordWarehouseReadLatency() {
- lastRowReadAt = (lastRowReadAt.compareTo(Instant.now()) > 0) ? lastRowReadAt : Instant.now();
- warehouseReadLatency =
- Math.max(
- Instant.now().toEpochMilli() - firstRowReadAt.toEpochMilli(), warehouseReadLatency);
- }
-
- @Override
- public void querySubmissionTime(String querySubmissionTime) {
- this.querySubmissionTime =
- Instant.from(DateTimeFormatter.ISO_INSTANT.parse(querySubmissionTime));
- }
-
@Override
public void startStream() {
startTime = Instant.now();
@@ -88,14 +58,12 @@ public void startStream() {
@Override
public void rowsParseStarted() {
parseTime.start();
- recordWarehouseQueryLatency();
}
@Override
public void rowsParseFinished(long rows) {
this.rows += rows;
parseTime.finish();
- recordWarehouseReadLatency();
}
@Override
@@ -115,25 +83,6 @@ public void finished() {
logData();
}
- public void logWarehouseLatency(TaskContext context) {
- if (warehouseLogged == 0) {
- HashMap tags = new HashMap<>();
- tags.put("warehouse_read_latency_millis", String.valueOf(warehouseReadLatency));
- tags.put("warehouse_query_latency_millis", String.valueOf(warehouseQueryLatency));
- tags.put("data_source", "bigquery");
- tags.put("number_of_samples", String.valueOf(parseTime.getSamples()));
- tags.put("stream_name", streamName);
- tags.put("query_submitted_at", querySubmissionTime.toString());
- tags.put("first_row_read_at", firstRowReadAt.toString());
- tags.put("last_row_read_at", lastRowReadAt.toString());
- tags.put("row_count", String.valueOf(rows));
-
- context.emitMetricsLog(
- JavaConverters.mapAsScalaMapConverter(tags).asScala().toMap(Predef.$conforms()));
- warehouseLogged = warehouseLogged + 1;
- }
- }
-
private static Duration average(DurationTimer durationTimer) {
long samples = durationTimer.getSamples();
if (samples == 0) {
@@ -183,7 +132,7 @@ private void logData() {
jsonObject.addProperty("Bytes", bytes);
jsonObject.addProperty("Rows", rows);
jsonObject.addProperty("I/O time", serviceTime.getAccumulatedTime().toMillis());
- log.info("Tracer Logs:{}", new Gson().toJson(jsonObject));
+ log.info(logEventNameTagger("Tracer Logs:{}"), new Gson().toJson(jsonObject));
linesLogged++;
}
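For reference, the Gson pattern retained in logData above, reduced to a self-contained sketch; only the property names visible in this hunk are used, and the wrapper class is illustrative:

```java
import com.google.gson.Gson;
import com.google.gson.JsonObject;

class TracerLogLineSketch {
  // Builds the JSON payload that the tracer hands to log.info(...), as in logData above.
  static String toJsonLine(long bytes, long rows, long ioTimeMillis) {
    JsonObject jsonObject = new JsonObject();
    jsonObject.addProperty("Bytes", bytes);
    jsonObject.addProperty("Rows", rows);
    jsonObject.addProperty("I/O time", ioTimeMillis);
    return new Gson().toJson(jsonObject);
  }
}
```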
diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java
index 97f6f769e..4e0465f78 100644
--- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java
+++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java
@@ -36,11 +36,12 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+import org.apache.spark.DataSourceTelemetryHelpers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// A helper class, also handles view materialization
-public class ReadSessionCreator {
+public class ReadSessionCreator implements DataSourceTelemetryHelpers {
public static final int DEFAULT_MAX_PARALLELISM = 20_000;
public static final int MINIMAL_PARALLELISM = 1;
@@ -139,7 +140,8 @@ public ReadSessionResponse create(
if (minStreamCount > maxStreamCount) {
minStreamCount = maxStreamCount;
log.warn(
- "preferred min parallelism is larger than the max parallelism, therefore setting it to max parallelism [{}]",
+ logEventNameTagger(
+ "preferred min parallelism is larger than the max parallelism, therefore setting it to max parallelism [{}]"),
minStreamCount);
}
Instant sessionPrepEndTime = Instant.now();
@@ -160,7 +162,10 @@ public ReadSessionResponse create(
if (config.isReadSessionCachingEnabled()
&& getReadSessionCache().asMap().containsKey(createReadSessionRequest)) {
ReadSession readSession = getReadSessionCache().asMap().get(createReadSessionRequest);
- log.info("Reusing read session: {}, for table: {}", readSession.getName(), table);
+ log.info(
+ logEventNameTagger("Reusing read session: {}, for table: {}"),
+ readSession.getName(),
+ table);
return new ReadSessionResponse(readSession, actualTable);
}
ReadSession readSession = bigQueryReadClient.createReadSession(createReadSessionRequest);
@@ -186,11 +191,12 @@ && getReadSessionCache().asMap().containsKey(createReadSessionRequest)) {
log.info("Read session:{}", new Gson().toJson(jsonObject));
if (readSession.getStreamsCount() != maxStreamCount) {
log.info(
- "Requested {} max partitions, but only received {} "
- + "from the BigQuery Storage API for session {}. Notice that the "
- + "number of streams in actual may be lower than the requested number, depending on "
- + "the amount parallelism that is reasonable for the table and the maximum amount of "
- + "parallelism allowed by the system.",
+            logEventNameTagger(
+                "Requested {} max partitions, but only received {} "
+                    + "from the BigQuery Storage API for session {}. Note that the actual "
+                    + "number of streams may be lower than the requested number, depending on "
+                    + "the amount of parallelism that is reasonable for the table and the "
+                    + "maximum amount of parallelism allowed by the system."),
maxStreamCount,
readSession.getStreamsCount(),
readSession.getName());
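A condensed sketch of the read-session reuse that the "Reusing read session" log line above reports; the Guava cache configuration and helper class are assumptions, not the connector's actual wiring:

```java
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class ReadSessionCacheSketch {
  // Keyed by the full request, so any change in table, columns or filter misses the cache.
  private final Cache<CreateReadSessionRequest, ReadSession> sessions =
      CacheBuilder.newBuilder().expireAfterWrite(30, TimeUnit.MINUTES).build();

  ReadSession getOrCreate(
      CreateReadSessionRequest request, Function<CreateReadSessionRequest, ReadSession> create) {
    ReadSession cached = sessions.getIfPresent(request);
    if (cached != null) {
      return cached; // the "Reusing read session" path
    }
    ReadSession fresh = create.apply(request);
    sessions.put(request, fresh);
    return fresh;
  }
}
```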
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/InternalRowIterator.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/InternalRowIterator.java
index cb6f40b22..fce392933 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/InternalRowIterator.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/InternalRowIterator.java
@@ -21,7 +21,6 @@
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.common.collect.ImmutableList;
import java.util.Iterator;
-import org.apache.spark.TaskContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +31,6 @@ public class InternalRowIterator implements Iterator {
private ReadRowsResponseToInternalRowIteratorConverter converter;
private ReadRowsHelper readRowsHelper;
private final BigQueryStorageReadRowsTracer bigQueryStorageReadRowsTracer;
- private TaskContext context;
private Iterator<InternalRow> rows = ImmutableList.<InternalRow>of().iterator();
private static final Logger log = LoggerFactory.getLogger(InternalRowIterator.class);
@@ -40,13 +38,11 @@ public InternalRowIterator(
Iterator<ReadRowsResponse> readRowsResponses,
ReadRowsResponseToInternalRowIteratorConverter converter,
ReadRowsHelper readRowsHelper,
- BigQueryStorageReadRowsTracer bigQueryStorageReadRowsTracer,
- TaskContext context) {
+ BigQueryStorageReadRowsTracer bigQueryStorageReadRowsTracer) {
this.readRowsResponses = readRowsResponses;
this.converter = converter;
this.readRowsHelper = readRowsHelper;
this.bigQueryStorageReadRowsTracer = bigQueryStorageReadRowsTracer;
- this.context = context;
}
@Override
@@ -55,7 +51,6 @@ public boolean hasNext() {
bigQueryStorageReadRowsTracer.readRowsResponseRequested();
if (!readRowsResponses.hasNext()) {
try {
- bigQueryStorageReadRowsTracer.logWarehouseLatency(context);
bigQueryStorageReadRowsTracer.finished();
} catch (Exception e) {
log.debug("Failure finishing tracer. stream:{} exception:{}", readRowsHelper, e);
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/BigQueryPartition.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/BigQueryPartition.java
index 8d95694fc..59c5c7234 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/BigQueryPartition.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/BigQueryPartition.java
@@ -35,4 +35,8 @@ public String getStream() {
public int index() {
return index;
}
+
+ public String toPartitionString() {
+ return stream + "_" + index;
+ }
}
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/BigQueryRDDFactory.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/BigQueryRDDFactory.java
index 7b41aae2f..558c1d49a 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/BigQueryRDDFactory.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/BigQueryRDDFactory.java
@@ -38,24 +38,27 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import java.lang.reflect.Constructor;
-import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.spark.DataSourceTelemetry;
+import org.apache.spark.DataSourceTelemetryHelpers;
import org.apache.spark.Partition;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.DataSourceTelemetryProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Option;
/**
* Wrapper class for generating BigQueryRDD. Extracted this logic out so that we can reuse it from
* 1) Dsv1 buildScan 2) Dsv1 pushdown functionality 3) Dsv2 pushdown functionality
*/
-public class BigQueryRDDFactory {
+public class BigQueryRDDFactory implements DataSourceTelemetryProvider, DataSourceTelemetryHelpers {
private static final Logger log = LoggerFactory.getLogger(BigQueryRDDFactory.class);
@@ -79,6 +82,7 @@ public BigQueryRDDFactory(
this.bigQueryReadClientFactory = bigQueryReadClientFactory;
this.bigQueryTracerFactory = bigQueryTracerFactory;
this.sqlContext = sqlContext;
+ initializeRelationTelemetry(sqlContext, scala.collection.immutable.Map$.MODULE$.empty());
}
/**
@@ -86,12 +90,19 @@ public BigQueryRDDFactory(
* pushdown module
*/
public RDD buildScanFromSQL(String sql) {
- log.info("Materializing the following sql query to a BigQuery table: {}", sql);
- // pass a date as string in DateTimeFormatter ISO_INSTANT format via sparkContext
- // the string will be decoded to Java Instant downstream in Executors
+ log.info(
+ logEventNameTagger("Materializing the following sql query to a BigQuery table: {}"), sql);
+
+ DataSourceTelemetry dataSourceTelemetryMetrics =
+ DataSourceTelemetryHelpers.createDataSourceTelemetry(
+ sqlContext.sparkContext(), Option.empty());
+
sqlContext
.sparkContext()
- .setLocalProperty("querySubmissionTime", String.valueOf(Instant.now()));
+ .emitMetricsLog(
+ dataSourceTelemetryMetrics.compileGlobalTelemetryTagsMap(Option.apply(sql)));
+
+ dataSourceTelemetryMetrics.setQuerySubmissionTime();
TableInfo actualTable =
bigQueryClient.materializeQueryToTable(
@@ -113,7 +124,7 @@ public RDD buildScanFromSQL(String sql) {
.collect(Collectors.toList());
log.info(
- "Querying table {}, requiredColumns=[{}]",
+ logEventNameTagger("Querying table {}, requiredColumns=[{}]"),
actualTable.getTableId().getProject()
+ ":"
+ actualTable.getTableId().getDataset()
@@ -130,7 +141,8 @@ public RDD buildScanFromSQL(String sql) {
actualTable.getTableId(),
readSessionCreator,
requiredColumns.toArray(new String[0]),
- "");
+ "",
+ dataSourceTelemetryMetrics);
}
// Creates BigQueryRDD from the BigQuery table that is passed in. Note that we return RDD>
@@ -140,7 +152,8 @@ public RDD> createRddFromTable(
TableId tableId,
ReadSessionCreator readSessionCreator,
String[] requiredColumns,
- String filter) {
+ String filter,
+ DataSourceTelemetry dataSourceTelemetryMetrics) {
ReadSessionResponse readSessionResponse =
readSessionCreator.create(
tableId, ImmutableList.copyOf(requiredColumns), BigQueryUtil.emptyIfNeeded(filter));
@@ -155,7 +168,7 @@ public RDD> createRddFromTable(
.collect(Collectors.toList());
log.info(
- "Created read session for table '{}': {}",
+ logEventNameTagger("Created read session for table '{}': {}"),
BigQueryUtil.friendlyTableName(tableId),
readSession.getName());
@@ -175,7 +188,8 @@ public RDD> createRddFromTable(
requiredColumns,
options,
bigQueryReadClientFactory,
- bigQueryTracerFactory);
+ bigQueryTracerFactory,
+ dataSourceTelemetryMetrics);
}
// Moved from BigQueryRDD.scanTable
@@ -188,7 +202,8 @@ RDD createRDD(
String[] columnsInOrder,
SparkBigQueryConfig options,
BigQueryClientFactory bigQueryClientFactory,
- BigQueryTracerFactory bigQueryTracerFactory) {
+ BigQueryTracerFactory bigQueryTracerFactory,
+ DataSourceTelemetry dataSourceTelemetryMetrics) {
// Unfortunately we need to use reflection here due to a cyclic dependency issue, and the fact
// that RDD constructor dependencies are different between Scala 2.12 and Scala 2.13. In Scala
// 2.13 `scala.collection.Seq` is mapped to `scala.collection.immutable.Seq` and this is why we
@@ -216,7 +231,8 @@ RDD createRDD(
String[].class,
SparkBigQueryConfig.class,
BigQueryClientFactory.class,
- BigQueryTracerFactory.class);
+ BigQueryTracerFactory.class,
+ DataSourceTelemetry.class);
RDD bigQueryRDD =
constructor.newInstance(
@@ -227,7 +243,8 @@ RDD createRDD(
columnsInOrder,
options,
bigQueryClientFactory,
- bigQueryTracerFactory);
+ bigQueryTracerFactory,
+ dataSourceTelemetryMetrics);
return bigQueryRDD;
} catch (Exception e) {
@@ -250,4 +267,19 @@ public long getNumBytes(TableDefinition tableDefinition) {
return standardTableDefinition.getNumBytes();
}
}
+
+ @Override
+ public String shortName() {
+ return "bigquery";
+ }
+
+ @Override
+ public String dataSourceType() {
+ return "spark_connector";
+ }
+
+ @Override
+  public String dataWarehouseName(scala.collection.immutable.Map<String, String> parameters) {
+ return shortName();
+ }
}
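The reflection comment in the createRDD hunk above explains why the RDD is built via Constructor.newInstance; below is a minimal standalone sketch of that pattern. The class name and arguments are placeholders, and parameter types are derived from the arguments' runtime types rather than listed explicitly as the connector does:

```java
import java.lang.reflect.Constructor;

class ReflectiveConstructionSketch {
  // Loads a class by name and invokes the constructor whose parameter types match the
  // runtime types of the supplied arguments (exact matches only; no assignability checks).
  static Object construct(String className, Object... args) throws Exception {
    Class<?> clazz = Class.forName(className);
    Class<?>[] parameterTypes = new Class<?>[args.length];
    for (int i = 0; i < args.length; i++) {
      parameterTypes[i] = args[i].getClass();
    }
    Constructor<?> constructor = clazz.getDeclaredConstructor(parameterTypes);
    constructor.setAccessible(true);
    return constructor.newInstance(args);
  }
}
```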
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.java
index 60d34b1ef..8e026f9a5 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.java
@@ -36,6 +36,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
+import org.apache.spark.DataSourceTelemetryHelpers;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -50,10 +51,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
+import scala.Option;
import scala.runtime.AbstractFunction1;
public class DirectBigQueryRelation extends BigQueryRelation
- implements TableScan, PrunedScan, PrunedFilteredScan, InsertableRelation {
+ implements TableScan,
+ PrunedScan,
+ PrunedFilteredScan,
+ InsertableRelation,
+ DataSourceTelemetryHelpers {
private final SparkBigQueryConfig options;
private final TableInfo table;
@@ -111,10 +117,20 @@ public RDD buildScan(String[] requiredColumns) {
@Override
public RDD<Row> buildScan(String[] requiredColumns, Filter[] filters) {
+
+ if (sqlContext.sparkContext().dataSourceTelemetry().checkForPushDownFailures().get()) {
+ sqlContext
+ .sparkContext()
+ .dataSourceTelemetry()
+ .numOfFailedPushDownQueries()
+ .getAndDecrement();
+ }
+
log.info(
- "|Querying table {}, parameters sent from Spark:"
- + "|requiredColumns=[{}],"
- + "|filters=[{}]",
+ logEventNameTagger(
+ "|Querying table {}, parameters sent from Spark:"
+ + "|requiredColumns=[{}],"
+ + "|filters=[{}]"),
getTableName(),
String.join(",", requiredColumns),
Arrays.stream(filters).map(f -> f.toString()).collect(Collectors.joining(",")));
@@ -137,7 +153,12 @@ public RDD buildScan(String[] requiredColumns, Filter[] filters) {
return (RDD)
bigQueryRDDFactory.createRddFromTable(
- getTableId(), readSessionCreator, requiredColumns, compiledFilter);
+ getTableId(),
+ readSessionCreator,
+ requiredColumns,
+ compiledFilter,
+ DataSourceTelemetryHelpers.createDataSourceTelemetry(
+ sqlContext.sparkContext(), Option.empty()));
}
@Override
@@ -191,7 +212,7 @@ private RDD> generateEmptyRowRDD(TableInfo tableInfo, String filter) {
Function1