Skip to content

Commit

Permalink
[DP-1106] Add pushdown for md5 and sha functions (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterDDT authored Sep 18, 2024
1 parent ccf25cd commit 805cf20
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ public void testMathematicalFunctionExpressions() {
"TAN(0) as Tan",
"TANH(0) as Tanh",
"ISNAN(word_count) as IsNan",
"SIGNUM(word_count) as Signum")
"SIGNUM(word_count) as Signum",
"MD5(word)",
"SHA1(word)",
"SHA2(word, 256)")
.where("word_count = 10 and word = 'glass'");
List<Row> result = df.collectAsList();
Row r1 = result.get(0);
Expand All @@ -253,6 +256,10 @@ public void testMathematicalFunctionExpressions() {
assertThat(r1.get(20)).isEqualTo(0.0); // TANH(0)
assertThat(r1.get(21)).isEqualTo(false); // ISNAN(word_count)
assertThat(r1.get(22)).isEqualTo(1.0); // SIGNUM(word_count)
// The hash columns are pushed down wrapped in TO_HEX, so each value is compared as a lowercase
// hex string. The previous form, assertThat(a == b), never verified anything: it only built an
// assertion object on a boolean, and == compares String references rather than contents.
// NOTE(review): the literals below are the hex encoding of the Base64 digests this test used to
// list for word = 'glass'; confirm them against a live run.
assertThat(r1.getString(23)).isEqualTo("2570c919f5ef1d7091f0f66d54dac974"); // MD5(word)
assertThat(r1.getString(24))
    .isEqualTo("170fd7e84a1b2f9723730102d0e90517a9175882"); // SHA1(word)
assertThat(r1.getString(25))
    .isEqualTo(
        "132a1a391cce181c490a2c4350597221fad9ae207714d1f9a10f67bcb1c8288d"); // SHA2(word, 256)
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion spark-bigquery-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@

<properties>
<gpg.skip>true</gpg.skip>
<revision>0.30.0-aiq16</revision>
<revision>0.30.0-aiq17</revision>

<avro.version>1.11.1</avro.version>
<arrow.version>11.0.0</arrow.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,29 @@ abstract class SparkExpressionConverter {
_: Sqrt | _: Tan | _: Tanh =>
ConstantString(expression.prettyName.toUpperCase) + blockStatement(convertStatements(fields, expression.children: _*))

// These hash functions all return bytes, so must convert to hex string

// SELECT TO_HEX(MD5("Spark"))
// 8cde774d6f7333752ed72cacddb05126
case Md5(child) =>
  // Emit TO_HEX ( MD5 ( <child> ) ): the inner MD5 call is hex-encoded by the outer TO_HEX.
  ConstantString("TO_HEX") + blockStatement(
    ConstantString("MD5") + blockStatement(convertStatement(child, fields)))

// SELECT sha1('Spark')
// 85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c
case Sha1(child) =>
  // Emit TO_HEX ( SHA1 ( <child> ) ): the inner SHA1 call is hex-encoded by the outer TO_HEX.
  ConstantString("TO_HEX") + blockStatement(
    ConstantString("SHA1") + blockStatement(convertStatement(child, fields)))

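// Only a bit length of 256 is pushed down by this case. (BigQuery also provides SHA512, but it
// has no SHA-224 or SHA-384 functions.)
// https://cloud.google.com/bigquery/docs/reference/standard-sql/hash_functions#sha256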
//
// SELECT sha2('Spark', 256)
// 529bc3b07127ecb7e53a4dcf1991d9152c24537d919178022b2c42657f79a26b
case Sha2(child, bitLen) if bitLen.foldable && bitLen.toString == "256" =>
  // Matches only when the bit-length argument is foldable and renders as "256".
  // Emit TO_HEX ( SHA256 ( <child> ) ): the inner SHA256 call is hex-encoded by the outer TO_HEX.
  ConstantString("TO_HEX") + blockStatement(
    ConstantString("SHA256") + blockStatement(convertStatement(child, fields)))

case IsNaN(child) =>
ConstantString("IS_NAN") + blockStatement(convertStatement(child, fields))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.google.cloud.bigquery.connector.common.BigQueryPushdownUnsupportedExc
import com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation
import com.google.cloud.spark.bigquery.pushdowns.TestConstants.expressionConverter
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.{Abs, Acos, AiqDateToString, AiqDayDiff, Alias, And, Ascending, Ascii, Asin, Atan, AttributeReference, Base64, BitwiseAnd, BitwiseNot, BitwiseOr, BitwiseXor, CaseWhen, Cast, Coalesce, Concat, Contains, Cos, Cosh, DateAdd, DateSub, DenseRank, Descending, EndsWith, EqualNullSafe, EqualTo, Exp, ExprId, Floor, FormatNumber, FormatString, GreaterThan, GreaterThanOrEqual, Greatest, If, In, InitCap, InSet, IsNaN, IsNotNull, IsNull, Least, Length, LessThan, LessThanOrEqual, Literal, Log10, Logarithm, Lower, Month, Not, Or, PercentRank, Pi, Pow, PromotePrecision, Quarter, Rand, Rank, RegExpExtract, RegExpReplace, Round, RowNumber, ShiftLeft, ShiftRight, Signum, Sin, Sinh, SortOrder, SoundEx, Sqrt, StartsWith, StringInstr, StringLPad, StringRPad, StringTranslate, StringTrim, StringTrimLeft, StringTrimRight, Substring, Tan, Tanh, TruncDate, UnBase64, UnscaledValue, Upper, Year}
import org.apache.spark.sql.catalyst.expressions.{Abs, Md5, Sha1, Sha2, Acos, AiqDateToString, AiqDayDiff, Alias, And, Ascending, Ascii, Asin, Atan, AttributeReference, Base64, BitwiseAnd, BitwiseNot, BitwiseOr, BitwiseXor, CaseWhen, Cast, Coalesce, Concat, Contains, Cos, Cosh, DateAdd, DateSub, DenseRank, Descending, EndsWith, EqualNullSafe, EqualTo, Exp, ExprId, Floor, FormatNumber, FormatString, GreaterThan, GreaterThanOrEqual, Greatest, If, In, InitCap, InSet, IsNaN, IsNotNull, IsNull, Least, Length, LessThan, LessThanOrEqual, Literal, Log10, Logarithm, Lower, Month, Not, Or, PercentRank, Pi, Pow, PromotePrecision, Quarter, Rand, Rank, RegExpExtract, RegExpReplace, Round, RowNumber, ShiftLeft, ShiftRight, Signum, Sin, Sinh, SortOrder, SoundEx, Sqrt, StartsWith, StringInstr, StringLPad, StringRPad, StringTranslate, StringTrim, StringTrimLeft, StringTrimRight, Substring, Tan, Tanh, TruncDate, UnBase64, UnscaledValue, Upper, Year}
import org.apache.spark.sql.types._
import org.mockito.{Mock, MockitoAnnotations}
import org.scalatest.BeforeAndAfter
Expand Down Expand Up @@ -504,6 +504,24 @@ class SparkExpressionConverterSuite extends AnyFunSuite with BeforeAndAfter {
assert(bigQuerySQLStatement.get.toString == "FORMAT ( 12.3456 , 2 )")
}

test("convertMathematicalExpressions with Md5") {
  // An Md5 over a string literal is expected to convert to an MD5 call wrapped in TO_HEX.
  val converted =
    expressionConverter.convertMathematicalExpressions(Md5(Literal("foo")), fields)
  assert(converted.get.toString == "TO_HEX ( MD5 ( 'foo' ) )")
}

test("convertMathematicalExpressions with Sha1") {
  // A Sha1 over a string literal is expected to convert to a SHA1 call wrapped in TO_HEX.
  val converted =
    expressionConverter.convertMathematicalExpressions(Sha1(Literal("foo")), fields)
  assert(converted.get.toString == "TO_HEX ( SHA1 ( 'foo' ) )")
}

test("convertMathematicalExpressions with Sha2") {
  // A Sha2 with a literal bit length of 256 is expected to convert to SHA256 wrapped in TO_HEX.
  val converted =
    expressionConverter.convertMathematicalExpressions(Sha2(Literal("foo"), Literal(256)), fields)
  assert(converted.get.toString == "TO_HEX ( SHA256 ( 'foo' ) )")
}

test("convertMathematicalExpressions with Abs") {
val absExpression = Abs.apply(Literal(-12))
val bigQuerySQLStatement = expressionConverter.convertMathematicalExpressions(absExpression, fields)
Expand Down

0 comments on commit 805cf20

Please sign in to comment.