Skip to content

Commit

Permalink
[DP-1226] Add concat_ws pushdown (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterDDT authored Oct 31, 2024
1 parent a27ca00 commit a764326
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public void testStringFunctionExpressions() {
"LIKE(word, 'a_g_rs') as like_with_underscore",
"LIKE(word, 'b_g_rs') as like_with_underscore_return_false",
"FORMAT_NUMBER(CAST((word_count + 10000) AS FLOAT)/6, 3)",
"FORMAT_NUMBER(word_count + 10000, 0)")
"FORMAT_NUMBER(word_count + 10000, 0)",
"CONCAT_WS('-', word, word, word)")
.where("word = 'augurs'");
List<Row> result = df.collectAsList();
Row r1 = result.get(0);
Expand Down Expand Up @@ -122,8 +123,11 @@ public void testStringFunctionExpressions() {
assertThat(r1.get(20)).isEqualTo(true); // LIKE(word, '%aug%urs%')
assertThat(r1.get(21)).isEqualTo(true); // LIKE(word, 'a_g_rs')
assertThat(r1.get(22)).isEqualTo(false); // LIKE(word, 'b_g_rs')
assertThat(r1.getString(23).equals("1,666.833"));
assertThat(r1.getString(24).equals("10,001"));
assertThat(
r1.getString(23)
.equals("1,666.833")); // FORMAT_NUMBER(CAST((word_count + 10000) AS FLOAT)/6, 3)
assertThat(r1.getString(24).equals("10,001")); // FORMAT_NUMBER(word_count + 10000, 0)
assertThat(r1.getString(25).equals("augurs-augurs-augurs")); // CONCAT_WS('-', word, word, word)
}

@Test
Expand Down
4 changes: 2 additions & 2 deletions spark-bigquery-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@

<properties>
<gpg.skip>true</gpg.skip>
<revision>0.30.0-aiq25</revision>
<revision>0.30.0-aiq26</revision>

<avro.version>1.11.1</avro.version>
<arrow.version>11.0.0</arrow.version>
Expand Down Expand Up @@ -128,7 +128,7 @@
<!-- checkstyle
<checkstyle.header.file>${reactor.project.basedir}/java.header</checkstyle.header.file>
-->
<spark.version>3-3-2-aiq109</spark.version>
<spark.version>3-3-2-aiq116</spark.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.patch.version>15</scala.patch.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,12 @@ abstract class SparkExpressionConverter {
_: Upper | _: StringInstr | _: InitCap |
_: Substring =>
ConstantString(expression.prettyName.toUpperCase()) + blockStatement(convertStatements(fields, expression.children: _*))
case ConcatWs(children) if children.length > 1 && children.head.foldable =>
val separator = children.head.toString
val numExps = children.size - 1
val sepExpression = Seq.fill(numExps)("%s").mkString(separator)
val formatExps = Literal(sepExpression) +: children.tail
ConstantString("FORMAT") + blockStatement(convertStatements(fields, formatExps: _*))
case _: Like =>
convertLikeExpression(expression, fields)
case RegExpExtract(child, Literal(pattern: UTF8String, StringType), idx) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.google.cloud.bigquery.connector.common.BigQueryPushdownUnsupportedExc
import com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation
import com.google.cloud.spark.bigquery.pushdowns.TestConstants.expressionConverter
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.{Abs, Conv, Md5, Sha1, Sha2, Acos, AiqDateToString, AiqDayDiff, AiqDayStart, AiqStringToDate, Alias, And, Ascending, Ascii, Asin, Atan, AttributeReference, Base64, BitwiseAnd, BitwiseNot, BitwiseOr, BitwiseXor, CaseWhen, Cast, Coalesce, Concat, Contains, Cos, Cosh, DateAdd, DateSub, DenseRank, Descending, EndsWith, EqualNullSafe, EqualTo, Exp, ExprId, Floor, FormatNumber, FormatString, GreaterThan, GreaterThanOrEqual, Greatest, If, In, InitCap, InSet, IsNaN, IsNotNull, IsNull, Least, Length, LessThan, LessThanOrEqual, Literal, Log10, Logarithm, Lower, Month, Not, Or, PercentRank, Pi, Pow, PromotePrecision, Quarter, Rand, Rank, RegExpExtract, RegExpReplace, Remainder, Round, RowNumber, ShiftLeft, ShiftRight, Signum, Sin, Sinh, SortOrder, SoundEx, Sqrt, StartsWith, StringInstr, StringLPad, StringRPad, StringTranslate, StringTrim, StringTrimLeft, StringTrimRight, Substring, Tan, Tanh, TruncDate, UnBase64, UnscaledValue, Upper, Year}
import org.apache.spark.sql.catalyst.expressions.{Abs, Conv, Md5, Sha1, Sha2, Acos, AiqDateToString, AiqDayDiff, AiqDayStart, AiqStringToDate, Alias, And, Ascending, Ascii, Asin, Atan, AttributeReference, Base64, BitwiseAnd, BitwiseNot, BitwiseOr, BitwiseXor, CaseWhen, Cast, Coalesce, Concat, ConcatWs, Contains, Cos, Cosh, DateAdd, DateSub, DenseRank, Descending, EndsWith, EqualNullSafe, EqualTo, Exp, ExprId, Floor, FormatNumber, FormatString, GreaterThan, GreaterThanOrEqual, Greatest, If, In, InitCap, InSet, IsNaN, IsNotNull, IsNull, Least, Length, LessThan, LessThanOrEqual, Literal, Log10, Logarithm, Lower, Month, Not, Or, PercentRank, Pi, Pow, PromotePrecision, Quarter, Rand, Rank, RegExpExtract, RegExpReplace, Remainder, Round, RowNumber, ShiftLeft, ShiftRight, Signum, Sin, Sinh, SortOrder, SoundEx, Sqrt, StartsWith, StringInstr, StringLPad, StringRPad, StringTranslate, StringTrim, StringTrimLeft, StringTrimRight, Substring, Tan, Tanh, TruncDate, UnBase64, UnscaledValue, Upper, Year}
import org.apache.spark.sql.types._
import org.mockito.{Mock, MockitoAnnotations}
import org.scalatest.BeforeAndAfter
Expand Down Expand Up @@ -379,6 +379,13 @@ class SparkExpressionConverterSuite extends AnyFunSuite with BeforeAndAfter {
assert(bigQuerySQLStatement.get.toString == "CONCAT ( SUBQUERY_2.SCHOOLID , '**' )")
}

test("convertStringExpressions with ConcatWs") {
val concatExpression = ConcatWs.apply(List(Literal("-"), schoolIdAttributeReference, Literal("**")))
val bigQuerySQLStatement = expressionConverter.convertStringExpressions(concatExpression, fields)
assert(bigQuerySQLStatement.isDefined)
assert(bigQuerySQLStatement.get.toString == "FORMAT ( '%s-%s' , SUBQUERY_2.SCHOOLID , '**' )")
}

test("convertStringExpressions with Length") {
val lengthExpression = Length.apply(schoolIdAttributeReference)
val bigQuerySQLStatement = expressionConverter.convertStringExpressions(lengthExpression, fields)
Expand Down

0 comments on commit a764326

Please sign in to comment.