diff --git a/build.sbt b/build.sbt index 454329b..37cac67 100644 --- a/build.sbt +++ b/build.sbt @@ -6,7 +6,7 @@ crossScalaVersions := Seq("2.11.12") scalaVersion := crossScalaVersions.value.head -sparkVersion := "2.4.3" +sparkVersion := sys.props.getOrElse("spark.version", "2.4.3") scalacOptions ++= Seq( "-Xlint", @@ -21,20 +21,20 @@ scalacOptions in (Compile, doc) ++= Seq( "-no-link-warnings" // Suppresses problems with Scaladoc @throws links ) -resolvers += "spark-packages" at "https://dl.bintray.com/spark-packages/maven/" +resolvers += "spark-packages" at sys.props.getOrElse("spark.repo", "https://dl.bintray.com/spark-packages/maven/") libraryDependencies ++= Seq( // Adding test classifier seems to break transitive resolution of the core dependencies - "org.apache.spark" %% "spark-hive" % "2.4.3" % "provided" excludeAll( + "org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided" excludeAll( ExclusionRule("org.apache", "hadoop-common"), ExclusionRule("org.apache", "hadoop-hdfs")), - "org.apache.spark" %% "spark-sql" % "2.4.3" % "provided" excludeAll( + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided" excludeAll( ExclusionRule("org.apache", "hadoop-common"), ExclusionRule("org.apache", "hadoop-hdfs")), - "org.apache.spark" %% "spark-core" % "2.4.3" % "provided" excludeAll( + "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided" excludeAll( ExclusionRule("org.apache", "hadoop-common"), ExclusionRule("org.apache", "hadoop-hdfs")), - "org.apache.spark" %% "spark-catalyst" % "2.4.3" % "provided" excludeAll( + "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided" excludeAll( ExclusionRule("org.apache", "hadoop-common"), ExclusionRule("org.apache", "hadoop-hdfs")) ) @@ -49,7 +49,7 @@ libraryDependencies ++= Seq( // Shaded jar dependency libraryDependencies ++= Seq( - "com.qubole" %% "spark-acid-shaded-dependencies" % "0.1" + "com.qubole" %% "spark-acid-shaded-dependencies" % sys.props.getOrElse("package.version", "0.1") ) diff --git a/docker/files/Dockerfile b/docker/files/Dockerfile index 1d46ae6..930a11f 100644 --- a/docker/files/Dockerfile +++ b/docker/files/Dockerfile @@ -31,14 +31,14 @@ RUN wget https://archive.apache.org/dist/hadoop/common/hadoop-3.1.1/hadoop-3.1.1 RUN tar -xvzf hadoop-3.1.1.tar.gz RUN ln -sf /hadoop-3.1.1 /hadoop -RUN wget http://mirrors.estointernet.in/apache/hive/hive-3.1.1/apache-hive-3.1.1-bin.tar.gz -RUN tar -xvzf apache-hive-3.1.1-bin.tar.gz -RUN ln -sf /apache-hive-3.1.1-bin /hive +RUN wget http://mirrors.estointernet.in/apache/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz +RUN tar -xvzf apache-hive-3.1.2-bin.tar.gz +RUN ln -sf /apache-hive-3.1.2-bin /hive RUN yum -y install \ mysql-server mysql-connector-java \ && yum -y clean all && rm -rf /tmp/* /var/tmp/* \ - && ln -s /usr/share/java/mysql-connector-java.jar apache-hive-3.1.1-bin/lib/mysql-connector-java.jar + && ln -s /usr/share/java/mysql-connector-java.jar apache-hive-3.1.2-bin/lib/mysql-connector-java.jar # Setup sock proxy RUN yum install -y openssh openssh-clients openssh-server diff --git a/shaded-dependencies/build.sbt b/shaded-dependencies/build.sbt index 425aa93..c6ad1f4 100644 --- a/shaded-dependencies/build.sbt +++ b/shaded-dependencies/build.sbt @@ -1,6 +1,6 @@ name := "spark-acid-shaded-dependencies" -version := "0.1" +version := sys.props.getOrElse("package.version", "0.1") organization:= "com.qubole" @@ -28,9 +28,11 @@ publishArtifact in (Compile, packageSrc) := false publishArtifact in (Compile, packageBin) := false -val hive_version = "3.1.1" +val hive_version = sys.props.getOrElse("hive.version", "3.1.2") -val orc_version = "1.5.6" +val orc_version = sys.props.getOrElse("orc.version", "1.5.6") + +resolvers += "Additional Maven Repository" at sys.props.getOrElse("hive.repo", "https://repo1.maven.org/maven2/") // Shaded dependency libraryDependencies ++= Seq( diff --git a/src/main/scala/com/qubole/spark/datasources/hiveacid/HiveAcidRelation.scala b/src/main/scala/com/qubole/spark/datasources/hiveacid/HiveAcidRelation.scala index fa7bd66..4991306 100644 --- a/src/main/scala/com/qubole/spark/datasources/hiveacid/HiveAcidRelation.scala +++ b/src/main/scala/com/qubole/spark/datasources/hiveacid/HiveAcidRelation.scala @@ -163,8 +163,9 @@ class HiveAcidRelation(var sqlContext: SQLContext, s"hive.io.file.readcolumn.names: ${hadoopConf.get("hive.io.file.readcolumn.names")}, " + s"hive.io.file.readcolumn.ids: ${hadoopConf.get("hive.io.file.readcolumn.ids")}") - val acidState = new HiveAcidState(sqlContext.sparkSession, hiveConf, hTable, + val acidState = new HiveAcidState(sqlContext.sparkSession, hiveConf, sqlContext.sparkSession.sessionState.conf.defaultSizeInBytes, partitionSchema, isFullAcidTable) + acidState.addTable(hTable) val hiveReader = new HiveTableReader( requiredAttributes, diff --git a/src/main/scala/com/qubole/spark/datasources/hiveacid/HiveAcidState.scala b/src/main/scala/com/qubole/spark/datasources/hiveacid/HiveAcidState.scala index 6169e00..ac37a79 100644 --- a/src/main/scala/com/qubole/spark/datasources/hiveacid/HiveAcidState.scala +++ b/src/main/scala/com/qubole/spark/datasources/hiveacid/HiveAcidState.scala @@ -19,52 +19,121 @@ package com.qubole.spark.datasources.hiveacid +import java.util.{ArrayList, List} + import com.qubole.shaded.hadoop.hive.common.{ValidTxnWriteIdList, ValidWriteIdList} import com.qubole.shaded.hadoop.hive.conf.HiveConf import com.qubole.shaded.hadoop.hive.metastore.HiveMetaStoreClient import com.qubole.shaded.hadoop.hive.metastore.txn.TxnUtils +import com.qubole.shaded.hadoop.hive.metastore.api import com.qubole.shaded.hadoop.hive.ql.metadata +import com.qubole.spark.datasources.hiveacid import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType +import com.qubole.shaded.hadoop.hive.ql.lockmgr.DbLockManager import scala.collection.JavaConversions._ +import java.util.concurrent._ + +import com.qubole.spark.datasources.hiveacid.rdd.{SparkAcidListener, SparkAcidQueryListener} +import com.qubole.shaded.hadoop.hive.metastore.{LockComponentBuilder, LockRequestBuilder} +import com.qubole.shaded.hadoop.hive.metastore.api.{DataOperationType, LockComponent} +import com.qubole.shaded.hadoop.hive.ql.lockmgr.HiveLock +import com.qubole.shaded.hadoop.hive.metastore.api.LockResponse class HiveAcidState(sparkSession: SparkSession, val hiveConf: HiveConf, - val table: metadata.Table, val sizeInBytes: Long, val pSchema: StructType, val isFullAcidTable: Boolean) extends Logging { + private var dbName: String = _ + private var tableName: String = _ + private var txnId: Long = -1 + private var lockId: Long = -1 + private var validWriteIds: ValidWriteIdList = _ + private var heartBeater: ScheduledFuture[_] = _ + sparkSession.sparkContext.addSparkListener(new SparkAcidListener(this)) + sparkSession.listenerManager.register(new SparkAcidQueryListener(this)) + private val sparkUser = sparkSession.sparkContext.sparkUser + + def startHeartbitThread(txnId : Long) : ScheduledFuture[_] = { + // Need to create a new client as multi thread is not supported for normal hms client. + val client = new HiveMetaStoreClient(hiveConf, null, false) + + val ex = new ScheduledThreadPoolExecutor(1) + val task = new Runnable { + def run(): Unit = client.heartbeat(txnId, lockId) + } + ex.scheduleAtFixedRate(task, 1, 1, TimeUnit.SECONDS) + } + + def addTable(table: metadata.Table) : Unit = { + dbName = table.getDbName + tableName = table.getTableName + } + + def lockTable(client : HiveMetaStoreClient): Unit = { + val compBuilder = new LockComponentBuilder + compBuilder.setShared + compBuilder.setOperationType(DataOperationType.SELECT) + compBuilder.setDbName(dbName) + compBuilder.setTableName(tableName) - val location: Path = table.getDataLocation - private val dbName: String = table.getDbName - private val tableName: String = table.getTableName - private val txnId: Long = -1 - private var validWriteIdsNoTxn: ValidWriteIdList = _ + val lockComponents: java.util.List[LockComponent] = new java.util.ArrayList[LockComponent] + lockComponents.add(compBuilder.build) + + val rqstBuilder = new LockRequestBuilder("spark-acid") + rqstBuilder.setTransactionId(txnId).setUser(sparkUser) + //rqstBuilder.addLockComponents(lockComponents) + lockId = client.lock(rqstBuilder.build).asInstanceOf[LockResponse].getLockid + } def beginRead(): Unit = { - // Get write ids to read. Currently, this data source does not open a transaction or take locks against - // it's read entities(partitions). This can be enhanced in the future val client = new HiveMetaStoreClient(hiveConf, null, false) - val validTxns = client.getValidTxns() - val txnWriteIds: ValidTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(txnId, - client.getValidWriteIds(Seq(dbName + "." + tableName), - validTxns.writeToString())) - validWriteIdsNoTxn = txnWriteIds.getTableValidWriteIdList(table.getDbName + "." + table.getTableName) - client.close() + + try { + txnId = client.openTxn("spark-acid") + heartBeater = startHeartbitThread(txnId) + lockTable(client) + val validTxns = client.getValidTxns(txnId) + val txnWriteIds: ValidTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(txnId, + client.getValidWriteIds(Seq(dbName + "." + tableName), + validTxns.writeToString())) + validWriteIds = txnWriteIds.getTableValidWriteIdList(dbName + "." + tableName) + } catch { + case ise: Throwable => endTxn(client) + } finally { + client.close() + } + } + + def endTxn(client : HiveMetaStoreClient): Unit = { + if (txnId != -1) { + heartBeater.cancel(true) + try { + client.commitTxn(txnId) + } finally { + // If heart bit thread is stopped, the transaction will be committed eventually. + txnId = -1 + } + } } def end(): Unit = { - // no op for now. If we start taking locks in the future, this can be implemented to release the locks and - // close the transaction + val client = new HiveMetaStoreClient(hiveConf, null, false) + try { + endTxn(client) + } finally { + client.close() + } } def getValidWriteIds: ValidWriteIdList = { - if (validWriteIdsNoTxn == null) { + if (validWriteIds == null) { throw HiveAcidErrors.validWriteIdsNotInitialized } - validWriteIdsNoTxn + validWriteIds } } diff --git a/src/main/scala/com/qubole/spark/datasources/hiveacid/rdd/Hive3Rdd.scala b/src/main/scala/com/qubole/spark/datasources/hiveacid/rdd/Hive3Rdd.scala index 7211c57..c2155c7 100644 --- a/src/main/scala/com/qubole/spark/datasources/hiveacid/rdd/Hive3Rdd.scala +++ b/src/main/scala/com/qubole/spark/datasources/hiveacid/rdd/Hive3Rdd.scala @@ -21,6 +21,7 @@ package com.qubole.spark.datasources.hiveacid.rdd import java.io.{FileNotFoundException, IOException} import java.text.SimpleDateFormat +import java.util.concurrent.ConcurrentHashMap import java.util.{Date, Locale} import com.qubole.shaded.hadoop.hive.common.ValidWriteIdList @@ -50,7 +51,7 @@ import scala.reflect.ClassTag object Cache { import com.google.common.collect.MapMaker - val jobConf = new MapMaker().softValues().makeMap[String, Any]() + val jobConf = new ConcurrentHashMap[String, Any]() } class Hive3Partition(rddId: Int, override val index: Int, s: InputSplit) diff --git a/src/main/scala/com/qubole/spark/datasources/hiveacid/rdd/SparkAcidListener.scala b/src/main/scala/com/qubole/spark/datasources/hiveacid/rdd/SparkAcidListener.scala new file mode 100644 index 0000000..0ed1e92 --- /dev/null +++ b/src/main/scala/com/qubole/spark/datasources/hiveacid/rdd/SparkAcidListener.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2019 Qubole, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qubole.spark.datasources.hiveacid.rdd + +import com.qubole.spark.datasources.hiveacid.HiveAcidState +import org.apache.spark.scheduler._ + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.util.QueryExecutionListener + +class SparkAcidQueryListener(acidState: HiveAcidState) extends QueryExecutionListener with Logging { + private val acidStateLocal: HiveAcidState = acidState + + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + acidStateLocal.end() + } + + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { + acidStateLocal.end() + } +} + +class SparkAcidListener(acidState: HiveAcidState) extends SparkListener { + private val acidStateLocal: HiveAcidState = acidState + + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + super.onApplicationEnd(applicationEnd) + acidStateLocal.end() + } +} + +