Recovery for lost executors 364 #389

Closed · wants to merge 3 commits

@@ -76,6 +76,21 @@ public static void setUpExecutor(
appId, driverUrl, cores, classPathEntries).remote();
}

/**
 * Check whether the executor is alive by calling its remote health-check method.
 * @param handler actor handle of the executor to probe
 * @return true if the executor responds within the timeout, false otherwise
 */
public static boolean isExecutorAlive(
ActorHandle<RayDPExecutor> handler) {
try {
// Wait at most 1 second for the remote health check; a timeout or failure means not alive.
return Ray.get(handler.task(RayDPExecutor::alive).remote(), 1000);
} catch (Exception e) {
return false;
}
}

public static String[] getBlockLocations(
ActorHandle<RayDPExecutor> handler,
int rddId,
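
A minimal usage sketch of the helper above from the Scala side. This is hypothetical, not part of the PR; imports for RayExecutorUtils and RayDPExecutor are omitted because their package paths are not shown in this diff. Given a map of executor ids to optional handles, it collects the ids that fail the one-second health check:

import io.ray.api.ActorHandle

// Hypothetical helper: an executor counts as lost when it has no handle at all,
// or when the remote health check does not succeed within the timeout.
def lostExecutorIds(
    handles: Map[String, Option[ActorHandle[RayDPExecutor]]]): Seq[String] = {
  handles.collect {
    case (id, None) => id
    case (id, Some(handle)) if !RayExecutorUtils.isExecutorAlive(handle) => id
  }.toSeq
}
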
@@ -99,6 +99,23 @@ private[spark] class ApplicationInfo(
addressToExecutorId(address) = executorId
}

/**
 * Remove a lost executor from the application context.
 * @param executorId id of the executor to remove
 */
def removeLostExecutor(executorId: String): Unit = {
if (executors.contains(executorId)) {
val exec = executors(executorId)
if (exec.registered) {
registeredExecutors -= 1
}
removedExecutors += exec
executors -= executorId
coresGranted -= exec.cores
executorIdToHandler -= executorId
}
}

def kill(address: RpcAddress): Boolean = {
if (addressToExecutorId.contains(address)) {
kill(addressToExecutorId(address))
@@ -134,6 +151,10 @@ private[spark] class ApplicationInfo(
executorIdToHandler.get(executorId)
}

def getAllExecutorIds(): Seq[String] = {
executorIdToHandler.keys.toSeq
}

def remainingUnRegisteredExecutors(): Int = {
desc.numExecutors - registeredExecutors
}
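
To make the bookkeeping in removeLostExecutor easier to follow, here is a simplified, standalone model of the same rollback. The names are illustrative only; the real ApplicationInfo tracks more state than this:

import scala.collection.mutable

final case class ExecInfo(id: String, cores: Int, registered: Boolean)

class ExecutorBook {
  private val executors = mutable.Map.empty[String, ExecInfo]
  private val removedExecutors = mutable.ArrayBuffer.empty[ExecInfo]
  private var registeredExecutors = 0
  private var coresGranted = 0

  def add(e: ExecInfo): Unit = {
    executors(e.id) = e
    coresGranted += e.cores
    if (e.registered) registeredExecutors += 1
  }

  // Mirrors removeLostExecutor: everything counted when the executor was granted
  // is rolled back, so the app master can request replacement capacity.
  def removeLost(id: String): Unit = executors.remove(id).foreach { e =>
    if (e.registered) registeredExecutors -= 1
    removedExecutors += e
    coresGranted -= e.cores
  }
}
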
@@ -34,6 +34,11 @@ case class ExecutorStarted(executorId: String) extends RayDPDeployMessage

case class RequestExecutors(appId: String, requestedTotal: Int) extends RayDPDeployMessage

/**
 * Message asking the app master to check all executors and mark any whose
 * handler is not reachable as lost.
 */
case class MarkLostExecutors(appDescription: ApplicationDescription) extends RayDPDeployMessage

case class KillExecutors(appId: String, executorIds: Seq[String]) extends RayDPDeployMessage

case class RequestAddPendingRestartedExecutor(executorId: String)
@@ -142,6 +142,37 @@ class RayAppMaster(host: String,
driver.send(RegisteredApplication(app.id, self))
schedule()

/**
 * On receiving this event, probe every known executor handle, mark unreachable
 * ones as lost in the appInfo, and request replacements.
 */
case MarkLostExecutors(appDescription: ApplicationDescription) =>
logDebug("Received an event for MarkLostExecutors")
val allExecutors = appInfo.getAllExecutorIds()
logDebug(s"Checking if any of the executor handlers are unreachable for ${allExecutors.mkString(", ")}")
allExecutors.foreach { executorId =>
val handlerOpt = appInfo.getExecutorHandler(executorId)
if (handlerOpt.isEmpty) {
Collaborator comment: This is not likely to happen, right?
logInfo(s"Executor ${executorId} has no handler. Removing it from the appInfo")
appInfo.removeLostExecutor(executorId)
} else {
val handler = handlerOpt.get.asInstanceOf[ActorHandle[RayDPExecutor]]
logDebug(s"Checking if executor ${executorId} is alive")
if (!RayExecutorUtils.isExecutorAlive(handler)) {
logInfo(s"Executor ${executorId} is not reachable. Removing it from the appInfo")
appInfo.removeLostExecutor(executorId)
requestNewExecutor()
} else {
logDebug(s"Executor ${executorId} is alive")
}
}
}
val targetExecutors = appDescription.numExecutors
logDebug(s"Current executor count is ${appInfo.currentExecutors()}, target is ${targetExecutors}")
if (targetExecutors > appInfo.currentExecutors()) {
logDebug("Recovering lost executors")
requestNewExecutor()
}

case UnregisterApplication(appId) =>
assert(appInfo != null && appInfo.id == appId)
appInfo.markFinished(ApplicationState.FINISHED)
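
As a reading aid, the handler's decision can be restated as a pure function. This is a schematic only, not how the handler is structured: the real handler removes each lost executor from appInfo and requests its replacement inside the loop, then tops up once more if it is still below target. Here isAlive stands in for RayExecutorUtils.isExecutorAlive:

// Schematic restatement of the recovery pass, under the assumptions above.
def recoveryPass(
    executorIds: Seq[String],
    isAlive: String => Boolean,
    targetExecutors: Int): (Seq[String], Int) = {
  val lost = executorIds.filterNot(isAlive)
  val surviving = executorIds.size - lost.size
  // One replacement per lost executor, plus one more request if the pass
  // still leaves the application below its configured target.
  val replacementsToRequest = lost.size + (if (surviving < targetExecutors) 1 else 0)
  (lost, replacementsToRequest)
}
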
@@ -136,6 +136,10 @@ class RayDPExecutor(
started.compareAndSet(false, true)
}

def alive(): Boolean = started.get()

def createWorkingDir(appId: String): Unit = {
// create the application dir
val app_dir = new File(RayConfig.create().sessionDir, appId)
@@ -38,6 +38,8 @@ import org.apache.spark.rpc.{RpcEndpointAddress, RpcEndpointRef, RpcEnv, ThreadS
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
import java.util.concurrent.{ScheduledFuture, TimeUnit}

/**
* A SchedulerBackend that requests executors from Ray.
@@ -246,16 +248,35 @@
appDesc: ApplicationDescription,
override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging {

private val executorRecoveryThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-executor-recovery-thread")

private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _

override def onStart(): Unit = {
try {
registerToAppMaster()
/*
* Periodically send events to the app master to check for executor loss
*/
checkForWorkerTimeOutTask = executorRecoveryThread.scheduleAtFixedRate(
() => appMasterRef.get.send(MarkLostExecutors(appDesc)),
30, 30, TimeUnit.SECONDS)
} catch {
case e: Exception =>
logWarning("Failed to connect to app master", e)
stop()
}
}

override def onStop(): Unit = {
// Stop the daemon threads
if (checkForWorkerTimeOutTask != null) {
checkForWorkerTimeOutTask.cancel(true)
}
executorRecoveryThread.shutdownNow()
}

override def receive: PartialFunction[Any, Unit] = {
case RegisteredApplication(id, ref) =>
appId.set(id)
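
The scheduling added to onStart and onStop can be shown standalone with plain java.util.concurrent in place of Spark's ThreadUtils. This is a sketch, not the PR's code: the 30-second initial delay and period match the diff, and the probe callback is a stand-in for sending MarkLostExecutors(appDesc) to the app master:

import java.util.concurrent.{Executors, ScheduledFuture, ThreadFactory, TimeUnit}

object PeriodicCheckSketch {
  // Single daemon thread, so the periodic check never keeps the driver JVM alive.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "driver-executor-recovery-thread")
      t.setDaemon(true)
      t
    }
  })
  private var task: ScheduledFuture[_] = _

  // Equivalent of onStart: fire the probe every 30 seconds after a 30-second delay.
  def start(probe: () => Unit): Unit = {
    task = scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = probe()
    }, 30, 30, TimeUnit.SECONDS)
  }

  // Equivalent of onStop: cancel any in-flight probe and release the thread.
  def stop(): Unit = {
    if (task != null) task.cancel(true)
    scheduler.shutdownNow()
  }
}

Calling start from onStart and stop from onStop reproduces the lifecycle above: cancel(true) interrupts a probe that is still running, and shutdownNow terminates the daemon thread.
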