From 0e5bbf80748914117c0b78e777305e5b7ec8d5a1 Mon Sep 17 00:00:00 2001 From: Amogh Margoor Date: Tue, 7 Apr 2020 16:56:23 -0700 Subject: [PATCH] Issue#39 Fix the insert into Date and Timestamp column into Hive ACID tables Hive Writer expected the Date and Timestamp in Hive format and it was being provided as java.sql.* format earlier. Fix converts it into expected format (cherry picked from commit 8453fc1ddc29527b2b7294bf21d75278756629af) --- .../spark/sql/hive/Hive3Inspectors.scala | 45 +++++++++++++++---- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/src/main/scala/org/apache/spark/sql/hive/Hive3Inspectors.scala b/src/main/scala/org/apache/spark/sql/hive/Hive3Inspectors.scala index 968c393..2e55fbd 100644 --- a/src/main/scala/org/apache/spark/sql/hive/Hive3Inspectors.scala +++ b/src/main/scala/org/apache/spark/sql/hive/Hive3Inspectors.scala @@ -20,22 +20,21 @@ package org.apache.spark.sql.hive import java.lang.reflect.{ParameterizedType, Type, WildcardType} +import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle, SignStyle} +import java.time.temporal.ChronoField import scala.collection.JavaConverters._ - -import com.qubole.shaded.hadoop.hive.common.`type`.{HiveChar, HiveDecimal, HiveVarchar} +import com.qubole.shaded.hadoop.hive.common.`type`.{Date, HiveChar, HiveDecimal, HiveVarchar, Timestamp, TimestampUtils} import com.qubole.shaded.hadoop.hive.serde2.{io => hiveIo} import com.qubole.shaded.hadoop.hive.serde2.objectinspector.{StructField => HiveStructField, _} import com.qubole.shaded.hadoop.hive.serde2.objectinspector.primitive._ import com.qubole.shaded.hadoop.hive.serde2.typeinfo.{DecimalTypeInfo, TypeInfoFactory} import com.qubole.spark.hiveacid.AnalysisException import org.apache.hadoop.{io => hadoopIo} - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.types - import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -45,6 +44,21 @@ import org.apache.spark.unsafe.types.UTF8String */ trait Hive3Inspectors { + val PRINT_FORMATTER = print_formatter + val PARSE_FORMATTER = parse_formatter + + private def parse_formatter:DateTimeFormatter = { + var builder = new DateTimeFormatterBuilder + builder.appendValue(ChronoField.YEAR, 1, 10, SignStyle.NORMAL).appendLiteral('-').appendValue(ChronoField.MONTH_OF_YEAR, 1, 2, SignStyle.NORMAL).appendLiteral('-').appendValue(ChronoField.DAY_OF_MONTH, 1, 2, SignStyle.NORMAL) + builder.toFormatter.withResolverStyle(ResolverStyle.LENIENT) + } + + private def print_formatter: DateTimeFormatter = { + val builder = new DateTimeFormatterBuilder() + builder.append(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + builder.toFormatter() + } + def javaTypeToDataType(clz: Type): DataType = clz match { // writable case c: Class[_] if c == classOf[hadoopIo.DoubleWritable] => DoubleType @@ -185,10 +199,16 @@ trait Hive3Inspectors { HiveDecimal.create(o.asInstanceOf[Decimal].toJavaBigDecimal)) case _: JavaDateObjectInspector => withNullSafe(o => - DateTimeUtils.toJavaDate(o.asInstanceOf[Int])) + Date.valueOf(DateTimeUtils + .toJavaDate(o.asInstanceOf[Int]) + .toLocalDate.format(print_formatter))) case _: JavaTimestampObjectInspector => - withNullSafe(o => - DateTimeUtils.toJavaTimestamp(o.asInstanceOf[Long])) + withNullSafe(o => { + val micros = o.asInstanceOf[Long] + val millis: Long = micros / 1000 + val nanos: Long = (micros % 1000) * 1000 + Timestamp.ofEpochMilli(millis, nanos.toInt) + }) case _: HiveDecimalObjectInspector if x.preferWritable() => withNullSafe(o => getDecimalWritable(o.asInstanceOf[Decimal])) case _: HiveDecimalObjectInspector => @@ -201,11 +221,18 @@ trait Hive3Inspectors { case _: DateObjectInspector if x.preferWritable() => withNullSafe(o => getDateWritable(o)) case _: DateObjectInspector => - withNullSafe(o => DateTimeUtils.toJavaDate(o.asInstanceOf[Int])) + withNullSafe(o => Date.valueOf(DateTimeUtils + .toJavaDate(o.asInstanceOf[Int]) + .toLocalDate.format(print_formatter))) case _: TimestampObjectInspector if x.preferWritable() => withNullSafe(o => getTimestampWritable(o)) case _: TimestampObjectInspector => - withNullSafe(o => DateTimeUtils.toJavaTimestamp(o.asInstanceOf[Long])) + withNullSafe(o => { + val micros = o.asInstanceOf[Long] + val millis: Long = micros / 1000 + val nanos: Long = (micros % 1000) * 1000 + Timestamp.ofEpochMilli(millis, nanos.toInt) + }) case _: VoidObjectInspector => (_: Any) => null // always be null for void object inspector }