
Commit 0e5bbf8

Issue#39 Fix the insert into Date and Timestamp columns of Hive ACID tables

The Hive writer expects Date and Timestamp values in Hive's own types, but it was
previously handed java.sql.* values. This fix converts them into the expected format.

(cherry picked from commit 8453fc1)
amoghmargoor committed Apr 8, 2020
1 parent c67e3c2 commit 0e5bbf8
Showing 1 changed file with 36 additions and 9 deletions.
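Background for the diff: Spark stores a DateType value internally as an Int counting days since the Unix epoch, and a TimestampType value as a Long counting microseconds since the epoch, while the Hive 3 writer wants Hive's own Date and Timestamp types rather than java.sql.* objects. The following is a minimal standalone sketch of the conversions the commit introduces, not part of the commit itself; the object and method names are made up for illustration, whereas the shaded Hive Date/Timestamp classes, Timestamp.ofEpochMilli, and Spark's DateTimeUtils are the ones actually used in the diff below.

import java.time.format.DateTimeFormatter

import com.qubole.shaded.hadoop.hive.common.`type`.{Date, Timestamp}
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Illustration only: a hypothetical helper mirroring the conversions added by this commit.
object HiveAcidDateTimeConversions {
  // Hive's Date.valueOf expects a "yyyy-MM-dd" string.
  private val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  // Spark DateType: days since the Unix epoch (Int) -> Hive Date.
  def toHiveDate(daysSinceEpoch: Int): Date =
    Date.valueOf(
      DateTimeUtils.toJavaDate(daysSinceEpoch).toLocalDate.format(dateFormatter))

  // Spark TimestampType: microseconds since the Unix epoch (Long) -> Hive Timestamp.
  def toHiveTimestamp(micros: Long): Timestamp = {
    val millis = micros / 1000          // whole milliseconds
    val nanos = (micros % 1000) * 1000  // leftover microseconds expressed as nanoseconds
    Timestamp.ofEpochMilli(millis, nanos.toInt)
  }
}

The same millis/nanos split appears in both Timestamp branches of the diff, and the print_formatter added there plays the role of dateFormatter in this sketch.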
src/main/scala/org/apache/spark/sql/hive/Hive3Inspectors.scala (36 additions, 9 deletions)

@@ -20,22 +20,21 @@
package org.apache.spark.sql.hive

import java.lang.reflect.{ParameterizedType, Type, WildcardType}
+import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle, SignStyle}
+import java.time.temporal.ChronoField

import scala.collection.JavaConverters._

-import com.qubole.shaded.hadoop.hive.common.`type`.{HiveChar, HiveDecimal, HiveVarchar}
+import com.qubole.shaded.hadoop.hive.common.`type`.{Date, HiveChar, HiveDecimal, HiveVarchar, Timestamp, TimestampUtils}
import com.qubole.shaded.hadoop.hive.serde2.{io => hiveIo}
import com.qubole.shaded.hadoop.hive.serde2.objectinspector.{StructField => HiveStructField, _}
import com.qubole.shaded.hadoop.hive.serde2.objectinspector.primitive._
import com.qubole.shaded.hadoop.hive.serde2.typeinfo.{DecimalTypeInfo, TypeInfoFactory}
import com.qubole.spark.hiveacid.AnalysisException
import org.apache.hadoop.{io => hadoopIo}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.types
-
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
-
@@ -45,6 +44,21 @@ import org.apache.spark.unsafe.types.UTF8String
 */
trait Hive3Inspectors {

+  val PRINT_FORMATTER = print_formatter
+  val PARSE_FORMATTER = parse_formatter
+
+  private def parse_formatter: DateTimeFormatter = {
+    var builder = new DateTimeFormatterBuilder
+    builder.appendValue(ChronoField.YEAR, 1, 10, SignStyle.NORMAL).appendLiteral('-').appendValue(ChronoField.MONTH_OF_YEAR, 1, 2, SignStyle.NORMAL).appendLiteral('-').appendValue(ChronoField.DAY_OF_MONTH, 1, 2, SignStyle.NORMAL)
+    builder.toFormatter.withResolverStyle(ResolverStyle.LENIENT)
+  }
+
+  private def print_formatter: DateTimeFormatter = {
+    val builder = new DateTimeFormatterBuilder()
+    builder.append(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
+    builder.toFormatter()
+  }
+
  def javaTypeToDataType(clz: Type): DataType = clz match {
    // writable
    case c: Class[_] if c == classOf[hadoopIo.DoubleWritable] => DoubleType
@@ -185,10 +199,16 @@
          HiveDecimal.create(o.asInstanceOf[Decimal].toJavaBigDecimal))
      case _: JavaDateObjectInspector =>
        withNullSafe(o =>
-          DateTimeUtils.toJavaDate(o.asInstanceOf[Int]))
+          Date.valueOf(DateTimeUtils
+            .toJavaDate(o.asInstanceOf[Int])
+            .toLocalDate.format(print_formatter)))
      case _: JavaTimestampObjectInspector =>
-        withNullSafe(o =>
-          DateTimeUtils.toJavaTimestamp(o.asInstanceOf[Long]))
+        withNullSafe(o => {
+          val micros = o.asInstanceOf[Long]
+          val millis: Long = micros / 1000
+          val nanos: Long = (micros % 1000) * 1000
+          Timestamp.ofEpochMilli(millis, nanos.toInt)
+        })
      case _: HiveDecimalObjectInspector if x.preferWritable() =>
        withNullSafe(o => getDecimalWritable(o.asInstanceOf[Decimal]))
      case _: HiveDecimalObjectInspector =>
@@ -201,11 +221,18 @@
      case _: DateObjectInspector if x.preferWritable() =>
        withNullSafe(o => getDateWritable(o))
      case _: DateObjectInspector =>
-        withNullSafe(o => DateTimeUtils.toJavaDate(o.asInstanceOf[Int]))
+        withNullSafe(o => Date.valueOf(DateTimeUtils
+          .toJavaDate(o.asInstanceOf[Int])
+          .toLocalDate.format(print_formatter)))
      case _: TimestampObjectInspector if x.preferWritable() =>
        withNullSafe(o => getTimestampWritable(o))
      case _: TimestampObjectInspector =>
-        withNullSafe(o => DateTimeUtils.toJavaTimestamp(o.asInstanceOf[Long]))
+        withNullSafe(o => {
+          val micros = o.asInstanceOf[Long]
+          val millis: Long = micros / 1000
+          val nanos: Long = (micros % 1000) * 1000
+          Timestamp.ofEpochMilli(millis, nanos.toInt)
+        })
      case _: VoidObjectInspector =>
        (_: Any) => null // always be null for void object inspector
    }
