Start Read Txn and Lock table for read operations #32

Open: wants to merge 3 commits into base: master
14 changes: 7 additions & 7 deletions build.sbt
@@ -6,7 +6,7 @@ crossScalaVersions := Seq("2.11.12")

scalaVersion := crossScalaVersions.value.head

-sparkVersion := "2.4.3"
+sparkVersion := sys.props.getOrElse("spark.version", "2.4.3")

scalacOptions ++= Seq(
"-Xlint",
@@ -21,20 +21,20 @@ scalacOptions in (Compile, doc) ++= Seq(
"-no-link-warnings" // Suppresses problems with Scaladoc @throws links
)

-resolvers += "spark-packages" at "https://dl.bintray.com/spark-packages/maven/"
+resolvers += "spark-packages" at sys.props.getOrElse("spark.repo", "https://dl.bintray.com/spark-packages/maven/")

libraryDependencies ++= Seq(
// Adding test classifier seems to break transitive resolution of the core dependencies
"org.apache.spark" %% "spark-hive" % "2.4.3" % "provided" excludeAll(
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided" excludeAll(
ExclusionRule("org.apache", "hadoop-common"),
ExclusionRule("org.apache", "hadoop-hdfs")),
"org.apache.spark" %% "spark-sql" % "2.4.3" % "provided" excludeAll(
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided" excludeAll(
ExclusionRule("org.apache", "hadoop-common"),
ExclusionRule("org.apache", "hadoop-hdfs")),
"org.apache.spark" %% "spark-core" % "2.4.3" % "provided" excludeAll(
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided" excludeAll(
ExclusionRule("org.apache", "hadoop-common"),
ExclusionRule("org.apache", "hadoop-hdfs")),
"org.apache.spark" %% "spark-catalyst" % "2.4.3" % "provided" excludeAll(
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided" excludeAll(
ExclusionRule("org.apache", "hadoop-common"),
ExclusionRule("org.apache", "hadoop-hdfs"))
)
@@ -49,7 +49,7 @@ libraryDependencies ++= Seq(

// Shaded jar dependency
libraryDependencies ++= Seq(
"com.qubole" %% "spark-acid-shaded-dependencies" % "0.1"
"com.qubole" %% "spark-acid-shaded-dependencies" % sys.props.getOrElse("package.version", "0.1")
)


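With these changes the Spark version, the spark-packages resolver URL, and the shaded-dependency version are no longer hard-coded: each is read from a JVM system property, with the old value kept as the default. A hypothetical invocation (the package task and the property values are only illustrative):

    sbt -Dspark.version=2.4.3 -Dspark.repo=https://dl.bintray.com/spark-packages/maven/ -Dpackage.version=0.1 package

When a property is not supplied, sys.props.getOrElse(key, default) falls back to the defaults shown in the diff.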
8 changes: 4 additions & 4 deletions docker/files/Dockerfile
@@ -31,14 +31,14 @@ RUN wget https://archive.apache.org/dist/hadoop/common/hadoop-3.1.1/hadoop-3.1.1
RUN tar -xvzf hadoop-3.1.1.tar.gz
RUN ln -sf /hadoop-3.1.1 /hadoop

-RUN wget http://mirrors.estointernet.in/apache/hive/hive-3.1.1/apache-hive-3.1.1-bin.tar.gz
-RUN tar -xvzf apache-hive-3.1.1-bin.tar.gz
-RUN ln -sf /apache-hive-3.1.1-bin /hive
+RUN wget http://mirrors.estointernet.in/apache/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
+RUN tar -xvzf apache-hive-3.1.2-bin.tar.gz
+RUN ln -sf /apache-hive-3.1.2-bin /hive

RUN yum -y install \
mysql-server mysql-connector-java \
&& yum -y clean all && rm -rf /tmp/* /var/tmp/* \
&& ln -s /usr/share/java/mysql-connector-java.jar apache-hive-3.1.1-bin/lib/mysql-connector-java.jar
&& ln -s /usr/share/java/mysql-connector-java.jar apache-hive-3.1.2-bin/lib/mysql-connector-java.jar

# Setup sock proxy
RUN yum install -y openssh openssh-clients openssh-server
8 changes: 5 additions & 3 deletions shaded-dependencies/build.sbt
@@ -1,6 +1,6 @@
name := "spark-acid-shaded-dependencies"

version := "0.1"
version := sys.props.getOrElse("package.version", "0.1")

organization:= "com.qubole"

@@ -28,9 +28,11 @@ publishArtifact in (Compile, packageSrc) := false

publishArtifact in (Compile, packageBin) := false

val hive_version = "3.1.1"
val hive_version = sys.props.getOrElse("hive.version", "3.1.2")

val orc_version = "1.5.6"
val orc_version = sys.props.getOrElse("orc.version", "1.5.6")

resolvers += "Additional Maven Repository" at sys.props.getOrElse("hive.repo", "https://repo1.maven.org/maven2/")

// Shaded dependency
libraryDependencies ++= Seq(
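The shaded-dependencies build follows the same pattern, so the Hive and ORC versions being shaded, the repository they are resolved from, and the published artifact version can all be chosen per build. A hypothetical invocation using the defaults from this diff (publishLocal is only an example task):

    sbt -Dhive.version=3.1.2 -Dorc.version=1.5.6 -Dhive.repo=https://repo1.maven.org/maven2/ -Dpackage.version=0.1 publishLocal

Both build files read the same package.version property, so the main build resolves exactly the shaded artifact version produced here.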
@@ -163,8 +163,9 @@ class HiveAcidRelation(var sqlContext: SQLContext,
s"hive.io.file.readcolumn.names: ${hadoopConf.get("hive.io.file.readcolumn.names")}, " +
s"hive.io.file.readcolumn.ids: ${hadoopConf.get("hive.io.file.readcolumn.ids")}")

-val acidState = new HiveAcidState(sqlContext.sparkSession, hiveConf, hTable,
+val acidState = new HiveAcidState(sqlContext.sparkSession, hiveConf,
sqlContext.sparkSession.sessionState.conf.defaultSizeInBytes, partitionSchema, isFullAcidTable)
+acidState.addTable(hTable)

val hiveReader = new HiveTableReader(
requiredAttributes,
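The relation now constructs HiveAcidState without the Hive table and registers the table afterwards through the new addTable hook. The excerpt does not show where beginRead is invoked, so the following is only a minimal sketch of how the pieces appear to fit together; the helper name and its parameters are assumptions, while the HiveAcidState calls are the ones shown in this diff:

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.types.StructType
    import com.qubole.shaded.hadoop.hive.conf.HiveConf
    import com.qubole.shaded.hadoop.hive.ql.metadata
    import com.qubole.spark.datasources.hiveacid.HiveAcidState

    // Hypothetical helper mirroring the call sequence suggested by this diff.
    def openReadState(sqlContext: SQLContext, hiveConf: HiveConf, hTable: metadata.Table,
                      partitionSchema: StructType, isFullAcidTable: Boolean): HiveAcidState = {
      val acidState = new HiveAcidState(sqlContext.sparkSession, hiveConf,
        sqlContext.sparkSession.sessionState.conf.defaultSizeInBytes, partitionSchema, isFullAcidTable)
      acidState.addTable(hTable)  // captures db and table name for locking and write-id lookup
      acidState.beginRead()       // opens the read transaction, starts heartbeats, takes the shared lock
      acidState                   // getValidWriteIds is consulted later, when the scan is planned
    }

end() is not called here; the listeners added in this PR invoke it when the query finishes or the application stops.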
@@ -19,52 +19,121 @@

package com.qubole.spark.datasources.hiveacid

import java.util.{ArrayList, List}

import com.qubole.shaded.hadoop.hive.common.{ValidTxnWriteIdList, ValidWriteIdList}
import com.qubole.shaded.hadoop.hive.conf.HiveConf
import com.qubole.shaded.hadoop.hive.metastore.HiveMetaStoreClient
import com.qubole.shaded.hadoop.hive.metastore.txn.TxnUtils
import com.qubole.shaded.hadoop.hive.metastore.api
import com.qubole.shaded.hadoop.hive.ql.metadata
import com.qubole.spark.datasources.hiveacid
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import com.qubole.shaded.hadoop.hive.ql.lockmgr.DbLockManager

import scala.collection.JavaConversions._
import java.util.concurrent._

import com.qubole.spark.datasources.hiveacid.rdd.{SparkAcidListener, SparkAcidQueryListener}
import com.qubole.shaded.hadoop.hive.metastore.{LockComponentBuilder, LockRequestBuilder}
import com.qubole.shaded.hadoop.hive.metastore.api.{DataOperationType, LockComponent}
import com.qubole.shaded.hadoop.hive.ql.lockmgr.HiveLock
import com.qubole.shaded.hadoop.hive.metastore.api.LockResponse

class HiveAcidState(sparkSession: SparkSession,
val hiveConf: HiveConf,
-val table: metadata.Table,
val sizeInBytes: Long,
val pSchema: StructType,
val isFullAcidTable: Boolean) extends Logging {
+private var dbName: String = _
+private var tableName: String = _
+private var txnId: Long = -1
+private var lockId: Long = -1
+private var validWriteIds: ValidWriteIdList = _
+private var heartBeater: ScheduledFuture[_] = _
+sparkSession.sparkContext.addSparkListener(new SparkAcidListener(this))
+sparkSession.listenerManager.register(new SparkAcidQueryListener(this))
+private val sparkUser = sparkSession.sparkContext.sparkUser

+def startHeartbitThread(txnId : Long) : ScheduledFuture[_] = {
+// Need to create a new metastore client: the normal HMS client does not support
+// multi-threaded use, so the heartbeat thread cannot share the caller's client.
+val client = new HiveMetaStoreClient(hiveConf, null, false)

+val ex = new ScheduledThreadPoolExecutor(1)
+val task = new Runnable {
+def run(): Unit = client.heartbeat(txnId, lockId)
+}
+ex.scheduleAtFixedRate(task, 1, 1, TimeUnit.SECONDS)
+}

+def addTable(table: metadata.Table) : Unit = {
+dbName = table.getDbName
+tableName = table.getTableName
+}

+def lockTable(client : HiveMetaStoreClient): Unit = {
+val compBuilder = new LockComponentBuilder
+compBuilder.setShared
+compBuilder.setOperationType(DataOperationType.SELECT)
+compBuilder.setDbName(dbName)
+compBuilder.setTableName(tableName)

-val location: Path = table.getDataLocation
-private val dbName: String = table.getDbName
-private val tableName: String = table.getTableName
-private val txnId: Long = -1
-private var validWriteIdsNoTxn: ValidWriteIdList = _
+val lockComponents: java.util.List[LockComponent] = new java.util.ArrayList[LockComponent]
+lockComponents.add(compBuilder.build)

+val rqstBuilder = new LockRequestBuilder("spark-acid")
+rqstBuilder.setTransactionId(txnId).setUser(sparkUser)
+//rqstBuilder.addLockComponents(lockComponents)
+lockId = client.lock(rqstBuilder.build).asInstanceOf[LockResponse].getLockid
+}

def beginRead(): Unit = {
-// Get write ids to read. Currently, this data source does not open a transaction or take locks against
-// it's read entities(partitions). This can be enhanced in the future
val client = new HiveMetaStoreClient(hiveConf, null, false)
-val validTxns = client.getValidTxns()
-val txnWriteIds: ValidTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(txnId,
-client.getValidWriteIds(Seq(dbName + "." + tableName),
-validTxns.writeToString()))
-validWriteIdsNoTxn = txnWriteIds.getTableValidWriteIdList(table.getDbName + "." + table.getTableName)
-client.close()

+try {
+txnId = client.openTxn("spark-acid")
+heartBeater = startHeartbitThread(txnId)
+lockTable(client)
+val validTxns = client.getValidTxns(txnId)
+val txnWriteIds: ValidTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(txnId,
+client.getValidWriteIds(Seq(dbName + "." + tableName),
+validTxns.writeToString()))
+validWriteIds = txnWriteIds.getTableValidWriteIdList(dbName + "." + tableName)
+} catch {
+case ise: Throwable => endTxn(client)
+} finally {
+client.close()
+}
}

+def endTxn(client : HiveMetaStoreClient): Unit = {
+if (txnId != -1) {
+heartBeater.cancel(true)
+try {
+client.commitTxn(txnId)
+} finally {
+// The heartbeat thread has been cancelled above; even if this commit fails,
+// the metastore will eventually clean the transaction up once heartbeats stop.
+txnId = -1
+}
+}
+}

def end(): Unit = {
-// no op for now. If we start taking locks in the future, this can be implemented to release the locks and
-// close the transaction
+val client = new HiveMetaStoreClient(hiveConf, null, false)
+try {
+endTxn(client)
+} finally {
+client.close()
+}
}

def getValidWriteIds: ValidWriteIdList = {
-if (validWriteIdsNoTxn == null) {
+if (validWriteIds == null) {
throw HiveAcidErrors.validWriteIdsNotInitialized
}
-validWriteIdsNoTxn
+validWriteIds
}
}
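Taken together, beginRead and endTxn implement a short metastore conversation: open a transaction, keep it alive with heartbeats, take a shared lock on the table, resolve the write-id snapshot, and commit. A condensed sketch of that protocol using the same shaded client calls as above; heartbeating and locking are omitted for brevity, the table name is a placeholder, and unlike the PR (which commits only after the query finishes) the sketch commits immediately:

    import com.qubole.shaded.hadoop.hive.common.{ValidTxnWriteIdList, ValidWriteIdList}
    import com.qubole.shaded.hadoop.hive.conf.HiveConf
    import com.qubole.shaded.hadoop.hive.metastore.HiveMetaStoreClient
    import com.qubole.shaded.hadoop.hive.metastore.txn.TxnUtils
    import scala.collection.JavaConversions._

    // Minimal sketch of the read-transaction protocol; "default.acid_tbl" is a placeholder.
    def snapshotForRead(hiveConf: HiveConf): ValidWriteIdList = {
      val client = new HiveMetaStoreClient(hiveConf, null, false)
      try {
        val txnId = client.openTxn("spark-acid")                 // open a read transaction
        val validTxns = client.getValidTxns(txnId)               // transactions visible to it
        val txnWriteIds: ValidTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(txnId,
          client.getValidWriteIds(Seq("default.acid_tbl"), validTxns.writeToString()))
        client.commitTxn(txnId)                                  // a read-only transaction simply commits
        txnWriteIds.getTableValidWriteIdList("default.acid_tbl") // per-table snapshot for the scan
      } finally {
        client.close()
      }
    }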
@@ -21,6 +21,7 @@ package com.qubole.spark.datasources.hiveacid.rdd

import java.io.{FileNotFoundException, IOException}
import java.text.SimpleDateFormat
+import java.util.concurrent.ConcurrentHashMap
import java.util.{Date, Locale}

import com.qubole.shaded.hadoop.hive.common.ValidWriteIdList
@@ -50,7 +51,7 @@ import scala.reflect.ClassTag

object Cache {
import com.google.common.collect.MapMaker
-val jobConf = new MapMaker().softValues().makeMap[String, Any]()
+val jobConf = new ConcurrentHashMap[String, Any]()
}

class Hive3Partition(rddId: Int, override val index: Int, s: InputSplit)
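The jobConf cache moves from Guava's MapMaker with soft values, whose entries could be reclaimed by the garbage collector under memory pressure, to a plain ConcurrentHashMap, which keeps strong references for the lifetime of the object but remains safe for concurrent access. A hypothetical access pattern (the key and the cached value are placeholders; the actual lookup code is not part of this excerpt):

    import java.util.concurrent.ConcurrentHashMap
    import java.util.function.{Function => JFunction}

    val jobConfCache = new ConcurrentHashMap[String, Any]()

    // computeIfAbsent builds the value at most once per key, even with concurrent callers.
    // The explicit Function instance keeps this compatible with Scala 2.11 (no SAM conversion).
    def cached(key: String)(build: => Any): Any =
      jobConfCache.computeIfAbsent(key, new JFunction[String, Any] {
        override def apply(k: String): Any = build
      })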
@@ -0,0 +1,50 @@
/*
* Copyright 2019 Qubole, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qubole.spark.datasources.hiveacid.rdd

import com.qubole.spark.datasources.hiveacid.HiveAcidState
import org.apache.spark.scheduler._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution._
import org.apache.spark.sql.util.QueryExecutionListener

class SparkAcidQueryListener(acidState: HiveAcidState) extends QueryExecutionListener with Logging {
private val acidStateLocal: HiveAcidState = acidState

override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
acidStateLocal.end()
}

override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
acidStateLocal.end()
}
}

class SparkAcidListener(acidState: HiveAcidState) extends SparkListener {
private val acidStateLocal: HiveAcidState = acidState

override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
super.onApplicationEnd(applicationEnd)
acidStateLocal.end()
}
}
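Both listeners delegate to HiveAcidState.end(), so the read transaction opened in beginRead is committed and its heartbeat cancelled once a query completes, successfully or not, and, as a backstop, when the application shuts down; the txnId != -1 guard in endTxn makes repeated calls harmless. From the reader's point of view nothing changes. The sketch below assumes a SparkSession named spark and the data source name and option this library uses elsewhere ("HiveAcid" and "table"), neither of which appears in this diff:

    // Assumed usage; the transaction, heartbeat and shared lock are handled by HiveAcidState.
    val df = spark.read.format("HiveAcid").option("table", "default.acid_tbl").load()
    df.count()  // beginRead() runs before the scan; the query listener then calls end()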