From cdaca0ea62e7d8ccb18c67f15e8fcb4a7d5ec904 Mon Sep 17 00:00:00 2001 From: Mitesh Patel Date: Thu, 19 Sep 2024 13:18:33 -0500 Subject: [PATCH] [DP-103] Add aiq_day_start() pushdown (#24) --- .../QueryPushdownIntegrationTestBase.java | 23 ++++++++++++ spark-bigquery-parent/pom.xml | 2 +- .../pushdowns/SparkExpressionConverter.scala | 37 +++++++++++++++++++ .../SparkExpressionConverterSuite.scala | 11 +++++- 4 files changed, 71 insertions(+), 2 deletions(-) diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/QueryPushdownIntegrationTestBase.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/QueryPushdownIntegrationTestBase.java index 7212b7d3d..12da551a5 100644 --- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/QueryPushdownIntegrationTestBase.java +++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/QueryPushdownIntegrationTestBase.java @@ -1282,6 +1282,29 @@ public void testAiqDateToStringPart2() { "2019-09-01 02:50:50:52"))); } + /** + * Reading from a BigQuery table created with (note selects will not keep the order, so sort by + * id): + * + *

create or replace table aiq-dev.connector_dev.dt4 (ts bigint, tz string, pd int) + * + *

insert into aiq-dev.connector_dev.dt4 values(1460080000000, 'America/New_York', 2) insert + * into aiq-dev.connector_dev.dt4 values(1460080000000, 'Asia/Tokyo', -1) + */ + @Test + public void testAiqDayStart() { + Dataset df = readTestDataFromBigQuery("connector_dev", "connector_dev.dt4"); + df.createOrReplaceTempView("dt4"); + + List results = + spark.sql("select aiq_day_start(ts, tz, pd), tz from dt4 order by tz").collectAsList() + .stream() + .map(r -> r.getLong(0)) + .collect(Collectors.toList()); + + assert (results.equals(Arrays.asList(1460174400000L, 1459954800000L))); + } + /** Test for AIQ EXE-2026 */ @Test public void testSourceQuery() { diff --git a/spark-bigquery-parent/pom.xml b/spark-bigquery-parent/pom.xml index 99d24093f..f29cff136 100644 --- a/spark-bigquery-parent/pom.xml +++ b/spark-bigquery-parent/pom.xml @@ -85,7 +85,7 @@ true - 0.30.0-aiq17 + 0.30.0-aiq18 1.11.1 11.0.0 diff --git a/spark-bigquery-pushdown/pushdown_common_src/main/scala/com/google/cloud/spark/bigquery/pushdowns/SparkExpressionConverter.scala b/spark-bigquery-pushdown/pushdown_common_src/main/scala/com/google/cloud/spark/bigquery/pushdowns/SparkExpressionConverter.scala index 089cf6680..dd332f455 100644 --- a/spark-bigquery-pushdown/pushdown_common_src/main/scala/com/google/cloud/spark/bigquery/pushdowns/SparkExpressionConverter.scala +++ b/spark-bigquery-pushdown/pushdown_common_src/main/scala/com/google/cloud/spark/bigquery/pushdowns/SparkExpressionConverter.scala @@ -221,6 +221,43 @@ abstract class SparkExpressionConverter { blockStatement( convertStatement(date, fields) + s", ${format.toString()}" ) + + /** + * --- spark.sql( + * --- "select aiq_day_start(1460080000000, 'America/New_York', 2)" + * --- ).as[Long].collect.head == 1460174400000L + * + * SELECT UNIX_MILLIS( + * TIMESTAMP( + * DATETIME_TRUNC( + * DATETIME_ADD( + * DATETIME(TIMESTAMP_MILLIS(1460080000000), 'America/New_York'), + * INTERVAL 2 DAY + * ), + * DAY + * ), + * 'America/New_York' + * ) + * ) + * 1460174400000 + */ + case AiqDayStart(timestamp, timezone, plusDays) => + val tzStmt = convertStatement(timezone, fields) + val innerTsm = ConstantString("TIMESTAMP_MILLIS") + blockStatement(convertStatement(timestamp, fields)) + val innerDt = ConstantString("DATETIME") + blockStatement( + innerTsm + "," + tzStmt + ) + val dtAdd = ConstantString("DATETIME_ADD") + blockStatement( + innerDt + ", INTERVAL " + convertStatement(plusDays, fields) + " DAY" + ) + val dtTrunc = ConstantString("DATETIME_TRUNC") + blockStatement( + dtAdd + ", DAY" + ) + val outerTs = ConstantString("TIMESTAMP") + blockStatement( + dtTrunc + "," + tzStmt + ) + ConstantString("UNIX_MILLIS") + blockStatement(outerTs) + /** * --- spark.sql( * --- """select aiq_day_diff(1693609200000, 1693616400000, 'UTC')""" diff --git a/spark-bigquery-pushdown/pushdown_common_src/test/scala/com/google/cloud/spark/bigquery/pushdowns/SparkExpressionConverterSuite.scala b/spark-bigquery-pushdown/pushdown_common_src/test/scala/com/google/cloud/spark/bigquery/pushdowns/SparkExpressionConverterSuite.scala index 6d86fc02e..34d0d9ce1 100644 --- a/spark-bigquery-pushdown/pushdown_common_src/test/scala/com/google/cloud/spark/bigquery/pushdowns/SparkExpressionConverterSuite.scala +++ b/spark-bigquery-pushdown/pushdown_common_src/test/scala/com/google/cloud/spark/bigquery/pushdowns/SparkExpressionConverterSuite.scala @@ -4,7 +4,7 @@ import com.google.cloud.bigquery.connector.common.BigQueryPushdownUnsupportedExc import com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation import com.google.cloud.spark.bigquery.pushdowns.TestConstants.expressionConverter import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.expressions.{Abs, Md5, Sha1, Sha2, Acos, AiqDateToString, AiqDayDiff, Alias, And, Ascending, Ascii, Asin, Atan, AttributeReference, Base64, BitwiseAnd, BitwiseNot, BitwiseOr, BitwiseXor, CaseWhen, Cast, Coalesce, Concat, Contains, Cos, Cosh, DateAdd, DateSub, DenseRank, Descending, EndsWith, EqualNullSafe, EqualTo, Exp, ExprId, Floor, FormatNumber, FormatString, GreaterThan, GreaterThanOrEqual, Greatest, If, In, InitCap, InSet, IsNaN, IsNotNull, IsNull, Least, Length, LessThan, LessThanOrEqual, Literal, Log10, Logarithm, Lower, Month, Not, Or, PercentRank, Pi, Pow, PromotePrecision, Quarter, Rand, Rank, RegExpExtract, RegExpReplace, Round, RowNumber, ShiftLeft, ShiftRight, Signum, Sin, Sinh, SortOrder, SoundEx, Sqrt, StartsWith, StringInstr, StringLPad, StringRPad, StringTranslate, StringTrim, StringTrimLeft, StringTrimRight, Substring, Tan, Tanh, TruncDate, UnBase64, UnscaledValue, Upper, Year} +import org.apache.spark.sql.catalyst.expressions.{Abs, Md5, Sha1, Sha2, Acos, AiqDateToString, AiqDayDiff, AiqDayStart, Alias, And, Ascending, Ascii, Asin, Atan, AttributeReference, Base64, BitwiseAnd, BitwiseNot, BitwiseOr, BitwiseXor, CaseWhen, Cast, Coalesce, Concat, Contains, Cos, Cosh, DateAdd, DateSub, DenseRank, Descending, EndsWith, EqualNullSafe, EqualTo, Exp, ExprId, Floor, FormatNumber, FormatString, GreaterThan, GreaterThanOrEqual, Greatest, If, In, InitCap, InSet, IsNaN, IsNotNull, IsNull, Least, Length, LessThan, LessThanOrEqual, Literal, Log10, Logarithm, Lower, Month, Not, Or, PercentRank, Pi, Pow, PromotePrecision, Quarter, Rand, Rank, RegExpExtract, RegExpReplace, Round, RowNumber, ShiftLeft, ShiftRight, Signum, Sin, Sinh, SortOrder, SoundEx, Sqrt, StartsWith, StringInstr, StringLPad, StringRPad, StringTranslate, StringTrim, StringTrimLeft, StringTrimRight, Substring, Tan, Tanh, TruncDate, UnBase64, UnscaledValue, Upper, Year} import org.apache.spark.sql.types._ import org.mockito.{Mock, MockitoAnnotations} import org.scalatest.BeforeAndAfter @@ -962,6 +962,15 @@ class SparkExpressionConverterSuite extends AnyFunSuite with BeforeAndAfter { assert(bigQuerySQLStatement.get.toString == "DATE_TRUNC ( '2016-07-30' , YEAR )") } + test("convertDateExpressions with AiqDayStart") { + val exp = AiqDayStart(startMsAttributeReference, Literal("America/New_York"), Literal(2)) + val bigQuerySQLStatement = expressionConverter.convertDateExpressions(exp, fields) + assert(bigQuerySQLStatement.get.toString == + "UNIX_MILLIS ( TIMESTAMP ( DATETIME_TRUNC ( DATETIME_ADD ( DATETIME ( TIMESTAMP_MILLIS ( " + + "STARTMS ) , 'America/New_York' ) , INTERVAL 2 DAY ) , DAY ) , 'America/New_York' ) )" + ) + } + test("convertDateExpressions with AiqDayDiff") { val exp = AiqDayDiff(startMsAttributeReference, Literal(1695945601000L), Literal("UTC")) val bigQuerySQLStatement = expressionConverter.convertDateExpressions(exp, fields)