Skip to content

Commit

Permalink
[DP-103] Add aiq_day_start() pushdown (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterDDT authored Sep 19, 2024
1 parent 805cf20 commit cdaca0e
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,29 @@ public void testAiqDateToStringPart2() {
"2019-09-01 02:50:50:52")));
}

/**
 * Verifies aiq_day_start() pushdown end to end against a pre-created BigQuery table
 * (selects do not keep insertion order, so the query sorts by tz):
 *
 * <p>create or replace table aiq-dev.connector_dev.dt4 (ts bigint, tz string, pd int)
 *
 * <p>insert into aiq-dev.connector_dev.dt4 values(1460080000000, 'America/New_York', 2) insert
 * into aiq-dev.connector_dev.dt4 values(1460080000000, 'Asia/Tokyo', -1)
 */
@Test
public void testAiqDayStart() {
  // Register the BigQuery-backed table so Spark SQL can reference it by name.
  Dataset<Row> table = readTestDataFromBigQuery("connector_dev", "connector_dev.dt4");
  table.createOrReplaceTempView("dt4");

  // Both rows share the same epoch-millis input; the timezone and plus-days
  // arguments differ, so the day-start results must differ per row.
  Dataset<Row> queried = spark.sql("select aiq_day_start(ts, tz, pd), tz from dt4 order by tz");
  List<Long> actual =
      queried.collectAsList().stream()
          .map(row -> row.getLong(0))
          .collect(Collectors.toList());

  // Expected values computed independently in BigQuery (see converter scaladoc).
  assert (actual.equals(Arrays.asList(1460174400000L, 1459954800000L)));
}

/** Test for AIQ EXE-2026 */
@Test
public void testSourceQuery() {
Expand Down
2 changes: 1 addition & 1 deletion spark-bigquery-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@

<properties>
<gpg.skip>true</gpg.skip>
<revision>0.30.0-aiq17</revision>
<revision>0.30.0-aiq18</revision>

<avro.version>1.11.1</avro.version>
<arrow.version>11.0.0</arrow.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,43 @@ abstract class SparkExpressionConverter {
blockStatement(
convertStatement(date, fields) + s", ${format.toString()}"
)

    /**
     * Pushdown translation for Spark's aiq_day_start(ts, tz, plusDays) into BigQuery SQL.
     *
     * --- spark.sql(
     * --- "select aiq_day_start(1460080000000, 'America/New_York', 2)"
     * --- ).as[Long].collect.head == 1460174400000L
     *
     * SELECT UNIX_MILLIS(
     * TIMESTAMP(
     * DATETIME_TRUNC(
     * DATETIME_ADD(
     * DATETIME(TIMESTAMP_MILLIS(1460080000000), 'America/New_York'),
     * INTERVAL 2 DAY
     * ),
     * DAY
     * ),
     * 'America/New_York'
     * )
     * )
     * 1460174400000
     */
    case AiqDayStart(timestamp, timezone, plusDays) =>
      // The timezone statement is used twice: once to localize the input instant
      // and once to convert the truncated local datetime back to an instant.
      val tzStmt = convertStatement(timezone, fields)
      // Epoch millis -> absolute TIMESTAMP.
      val innerTsm = ConstantString("TIMESTAMP_MILLIS") + blockStatement(convertStatement(timestamp, fields))
      // Absolute instant -> civil DATETIME in the requested timezone.
      val innerDt = ConstantString("DATETIME") + blockStatement(
        innerTsm + "," + tzStmt
      )
      // Shift by plusDays whole days in civil time (DST-safe by construction).
      val dtAdd = ConstantString("DATETIME_ADD") + blockStatement(
        innerDt + ", INTERVAL " + convertStatement(plusDays, fields) + " DAY"
      )
      // Truncate to local midnight of the shifted day.
      val dtTrunc = ConstantString("DATETIME_TRUNC") + blockStatement(
        dtAdd + ", DAY"
      )
      // Local midnight -> absolute TIMESTAMP in the same timezone.
      val outerTs = ConstantString("TIMESTAMP") + blockStatement(
        dtTrunc + "," + tzStmt
      )
      // Final result is epoch millis, matching Spark's aiq_day_start return type.
      ConstantString("UNIX_MILLIS") + blockStatement(outerTs)

/**
* --- spark.sql(
* --- """select aiq_day_diff(1693609200000, 1693616400000, 'UTC')"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.google.cloud.bigquery.connector.common.BigQueryPushdownUnsupportedExc
import com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation
import com.google.cloud.spark.bigquery.pushdowns.TestConstants.expressionConverter
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.{Abs, Md5, Sha1, Sha2, Acos, AiqDateToString, AiqDayDiff, Alias, And, Ascending, Ascii, Asin, Atan, AttributeReference, Base64, BitwiseAnd, BitwiseNot, BitwiseOr, BitwiseXor, CaseWhen, Cast, Coalesce, Concat, Contains, Cos, Cosh, DateAdd, DateSub, DenseRank, Descending, EndsWith, EqualNullSafe, EqualTo, Exp, ExprId, Floor, FormatNumber, FormatString, GreaterThan, GreaterThanOrEqual, Greatest, If, In, InitCap, InSet, IsNaN, IsNotNull, IsNull, Least, Length, LessThan, LessThanOrEqual, Literal, Log10, Logarithm, Lower, Month, Not, Or, PercentRank, Pi, Pow, PromotePrecision, Quarter, Rand, Rank, RegExpExtract, RegExpReplace, Round, RowNumber, ShiftLeft, ShiftRight, Signum, Sin, Sinh, SortOrder, SoundEx, Sqrt, StartsWith, StringInstr, StringLPad, StringRPad, StringTranslate, StringTrim, StringTrimLeft, StringTrimRight, Substring, Tan, Tanh, TruncDate, UnBase64, UnscaledValue, Upper, Year}
import org.apache.spark.sql.catalyst.expressions.{Abs, Md5, Sha1, Sha2, Acos, AiqDateToString, AiqDayDiff, AiqDayStart, Alias, And, Ascending, Ascii, Asin, Atan, AttributeReference, Base64, BitwiseAnd, BitwiseNot, BitwiseOr, BitwiseXor, CaseWhen, Cast, Coalesce, Concat, Contains, Cos, Cosh, DateAdd, DateSub, DenseRank, Descending, EndsWith, EqualNullSafe, EqualTo, Exp, ExprId, Floor, FormatNumber, FormatString, GreaterThan, GreaterThanOrEqual, Greatest, If, In, InitCap, InSet, IsNaN, IsNotNull, IsNull, Least, Length, LessThan, LessThanOrEqual, Literal, Log10, Logarithm, Lower, Month, Not, Or, PercentRank, Pi, Pow, PromotePrecision, Quarter, Rand, Rank, RegExpExtract, RegExpReplace, Round, RowNumber, ShiftLeft, ShiftRight, Signum, Sin, Sinh, SortOrder, SoundEx, Sqrt, StartsWith, StringInstr, StringLPad, StringRPad, StringTranslate, StringTrim, StringTrimLeft, StringTrimRight, Substring, Tan, Tanh, TruncDate, UnBase64, UnscaledValue, Upper, Year}
import org.apache.spark.sql.types._
import org.mockito.{Mock, MockitoAnnotations}
import org.scalatest.BeforeAndAfter
Expand Down Expand Up @@ -962,6 +962,15 @@ class SparkExpressionConverterSuite extends AnyFunSuite with BeforeAndAfter {
assert(bigQuerySQLStatement.get.toString == "DATE_TRUNC ( '2016-07-30' , YEAR )")
}

test("convertDateExpressions with AiqDayStart") {
val exp = AiqDayStart(startMsAttributeReference, Literal("America/New_York"), Literal(2))
val bigQuerySQLStatement = expressionConverter.convertDateExpressions(exp, fields)
assert(bigQuerySQLStatement.get.toString ==
"UNIX_MILLIS ( TIMESTAMP ( DATETIME_TRUNC ( DATETIME_ADD ( DATETIME ( TIMESTAMP_MILLIS ( " +
"STARTMS ) , 'America/New_York' ) , INTERVAL 2 DAY ) , DAY ) , 'America/New_York' ) )"
)
}

test("convertDateExpressions with AiqDayDiff") {
val exp = AiqDayDiff(startMsAttributeReference, Literal(1695945601000L), Literal("UTC"))
val bigQuerySQLStatement = expressionConverter.convertDateExpressions(exp, fields)
Expand Down

0 comments on commit cdaca0e

Please sign in to comment.