[DP-103] Add aiq_day_start() pushdown #24

Merged · 5 commits · Sep 19, 2024
Changes from 4 commits
@@ -1282,6 +1282,29 @@ public void testAiqDateToStringPart2() {
"2019-09-01 02:50:50:52")));
}

/**
 * Reading from a BigQuery table created with (note: SELECTs do not preserve row order, so the
 * test sorts by tz):
 *
 * <p>create or replace table aiq-dev.connector_dev.dt4 (ts bigint, tz string, pd int)
 *
 * <p>insert into aiq-dev.connector_dev.dt4 values(1460080000000, 'America/New_York', 2)
 *
 * <p>insert into aiq-dev.connector_dev.dt4 values(1460080000000, 'Asia/Tokyo', -1)
 */
@Test
public void testAiqDayStart() {
Dataset<Row> df = readTestDataFromBigQuery("connector_dev", "connector_dev.dt4");
df.createOrReplaceTempView("dt4");

List<Long> results =
spark.sql("select aiq_day_start(ts, tz, pd), tz from dt4 order by tz").collectAsList()
.stream()
.map(r -> r.getLong(0))
.collect(Collectors.toList());

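// Ordered by tz: America/New_York day start shifted +2 days, then Asia/Tokyo shifted -1 day.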
assert (results.equals(Arrays.asList(1460174400000L, 1459954800000L)));
}

/** Test for AIQ EXE-2026 */
@Test
public void testSourceQuery() {
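For context on the expected values in the test above, here is a minimal reference sketch of the aiq_day_start(ts, tz, pd) semantics the pushdown must reproduce, written against java.time. The helper name aiqDayStart is illustrative (not part of this PR), and it assumes java.time zone rules agree with BigQuery's DATETIME arithmetic for these inputs:

```scala
import java.time.{Instant, ZoneId}
import java.time.temporal.ChronoUnit

// Illustrative helper: shift the instant into the target zone, add pd days,
// truncate to the local start of day, and return epoch milliseconds.
def aiqDayStart(epochMs: Long, tz: String, plusDays: Long): Long =
  Instant
    .ofEpochMilli(epochMs)        // TIMESTAMP_MILLIS(ts)
    .atZone(ZoneId.of(tz))        // DATETIME(..., tz)
    .plusDays(plusDays)           // DATETIME_ADD(..., INTERVAL pd DAY)
    .truncatedTo(ChronoUnit.DAYS) // DATETIME_TRUNC(..., DAY)
    .toInstant                    // TIMESTAMP(..., tz)
    .toEpochMilli                 // UNIX_MILLIS(...)

// Matches the two rows inserted into connector_dev.dt4 above:
assert(aiqDayStart(1460080000000L, "America/New_York", 2) == 1460174400000L)
assert(aiqDayStart(1460080000000L, "Asia/Tokyo", -1) == 1459954800000L)
```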
spark-bigquery-parent/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -85,7 +85,7 @@

<properties>
<gpg.skip>true</gpg.skip>
-<revision>0.30.0-aiq17</revision>
+<revision>0.30.0-aiq18</revision>

<avro.version>1.11.1</avro.version>
<arrow.version>11.0.0</arrow.version>
@@ -221,6 +221,43 @@ abstract class SparkExpressionConverter {
blockStatement(
convertStatement(date, fields) + s", ${format.toString()}"
)

/**
* --- spark.sql(
* --- "select aiq_day_start(1460080000000, 'America/New_York', 2)"
* --- ).as[Long].collect.head == 1460174400000L
*
* SELECT UNIX_MILLIS(
* TIMESTAMP(
* DATETIME_TRUNC(
* DATETIME_ADD(
* DATETIME(TIMESTAMP_MILLIS(1460080000000), 'America/New_York'),
* INTERVAL 2 DAY
* ),
* DAY
* ),
* 'America/New_York'
* )
* )
* 1460174400000
*/
case AiqDayStart(timestamp, timezone, plusDays) =>
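// The timezone statement is spliced in twice: once to build the local DATETIME,
// and once to convert the truncated DATETIME back into a TIMESTAMP.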
val tzStmt = convertStatement(timezone, fields)
val innerTsm = ConstantString("TIMESTAMP_MILLIS") + blockStatement(convertStatement(timestamp, fields))
val innerDt = ConstantString("DATETIME") + blockStatement(
innerTsm + "," + tzStmt
)
val dtAdd = ConstantString("DATETIME_ADD") + blockStatement(
innerDt + ", INTERVAL " + convertStatement(plusDays, fields) + " DAY"
)
val dtTrunc = ConstantString("DATETIME_TRUNC") + blockStatement(
dtAdd + ", DAY"
)
val outerTs = ConstantString("TIMESTAMP") + blockStatement(
dtTrunc + "," + tzStmt
)
ConstantString("UNIX_MILLIS") + blockStatement(outerTs)
Comment on lines +246 to +259

Can you please file the tickets for the Spark functions refactoring that this needs, because things are piling up 🙏

Collaborator Author:

Yep, will do.


/**
* --- spark.sql(
* --- """select aiq_day_diff(1693609200000, 1693616400000, 'UTC')"""
@@ -4,7 +4,7 @@ import com.google.cloud.bigquery.connector.common.BigQueryPushdownUnsupportedExc
import com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation
import com.google.cloud.spark.bigquery.pushdowns.TestConstants.expressionConverter
import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.expressions.{Abs, Md5, Sha1, Sha2, Acos, AiqDateToString, AiqDayDiff, Alias, And, Ascending, Ascii, Asin, Atan, AttributeReference, Base64, BitwiseAnd, BitwiseNot, BitwiseOr, BitwiseXor, CaseWhen, Cast, Coalesce, Concat, Contains, Cos, Cosh, DateAdd, DateSub, DenseRank, Descending, EndsWith, EqualNullSafe, EqualTo, Exp, ExprId, Floor, FormatNumber, FormatString, GreaterThan, GreaterThanOrEqual, Greatest, If, In, InitCap, InSet, IsNaN, IsNotNull, IsNull, Least, Length, LessThan, LessThanOrEqual, Literal, Log10, Logarithm, Lower, Month, Not, Or, PercentRank, Pi, Pow, PromotePrecision, Quarter, Rand, Rank, RegExpExtract, RegExpReplace, Round, RowNumber, ShiftLeft, ShiftRight, Signum, Sin, Sinh, SortOrder, SoundEx, Sqrt, StartsWith, StringInstr, StringLPad, StringRPad, StringTranslate, StringTrim, StringTrimLeft, StringTrimRight, Substring, Tan, Tanh, TruncDate, UnBase64, UnscaledValue, Upper, Year}
+import org.apache.spark.sql.catalyst.expressions.{Abs, Md5, Sha1, Sha2, Acos, AiqDateToString, AiqDayDiff, AiqDayStart, Alias, And, Ascending, Ascii, Asin, Atan, AttributeReference, Base64, BitwiseAnd, BitwiseNot, BitwiseOr, BitwiseXor, CaseWhen, Cast, Coalesce, Concat, Contains, Cos, Cosh, DateAdd, DateSub, DenseRank, Descending, EndsWith, EqualNullSafe, EqualTo, Exp, ExprId, Floor, FormatNumber, FormatString, GreaterThan, GreaterThanOrEqual, Greatest, If, In, InitCap, InSet, IsNaN, IsNotNull, IsNull, Least, Length, LessThan, LessThanOrEqual, Literal, Log10, Logarithm, Lower, Month, Not, Or, PercentRank, Pi, Pow, PromotePrecision, Quarter, Rand, Rank, RegExpExtract, RegExpReplace, Round, RowNumber, ShiftLeft, ShiftRight, Signum, Sin, Sinh, SortOrder, SoundEx, Sqrt, StartsWith, StringInstr, StringLPad, StringRPad, StringTranslate, StringTrim, StringTrimLeft, StringTrimRight, Substring, Tan, Tanh, TruncDate, UnBase64, UnscaledValue, Upper, Year}
import org.apache.spark.sql.types._
import org.mockito.{Mock, MockitoAnnotations}
import org.scalatest.BeforeAndAfter
@@ -962,6 +962,15 @@ class SparkExpressionConverterSuite extends AnyFunSuite with BeforeAndAfter {
assert(bigQuerySQLStatement.get.toString == "DATE_TRUNC ( '2016-07-30' , YEAR )")
}

test("convertDateExpressions with AiqDayStart") {
val exp = AiqDayStart(startMsAttributeReference, Literal("America/New_York"), Literal(2))
val bigQuerySQLStatement = expressionConverter.convertDateExpressions(exp, fields)
assert(bigQuerySQLStatement.get.toString ==
"UNIX_MILLIS ( TIMESTAMP ( DATETIME_TRUNC ( DATETIME_ADD ( DATETIME ( TIMESTAMP_MILLIS ( " +
"STARTMS ) , 'America/New_York' ) , INTERVAL 2 DAY ) , DAY ) , 'America/New_York' ) )"
)
}

test("convertDateExpressions with AiqDayDiff") {
val exp = AiqDayDiff(startMsAttributeReference, Literal(1695945601000L), Literal("UTC"))
val bigQuerySQLStatement = expressionConverter.convertDateExpressions(exp, fields)