diff --git a/core/raydp-main/src/main/java/org/apache/spark/raydp/RayExecutorUtils.java b/core/raydp-main/src/main/java/org/apache/spark/raydp/RayExecutorUtils.java index 0af7c8e5..5c09a98e 100644 --- a/core/raydp-main/src/main/java/org/apache/spark/raydp/RayExecutorUtils.java +++ b/core/raydp-main/src/main/java/org/apache/spark/raydp/RayExecutorUtils.java @@ -60,8 +60,8 @@ public static ActorHandle createExecutorActor( if (placementGroup != null) { creator.setPlacementGroup(placementGroup, bundleIndex); } - creator.setMaxRestarts(3); - creator.setMaxTaskRetries(3); + creator.setMaxRestarts(-1); + creator.setMaxTaskRetries(-1); creator.setMaxConcurrency(2); return creator.remote(); } diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/ApplicationInfo.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/ApplicationInfo.scala index c5b7c114..4091ccdd 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/ApplicationInfo.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/ApplicationInfo.scala @@ -99,15 +99,15 @@ private[spark] class ApplicationInfo( addressToExecutorId(address) = executorId } - def kill(address: RpcAddress): Boolean = { + def kill(address: RpcAddress, shutdownActor: Boolean): Boolean = { if (addressToExecutorId.contains(address)) { - kill(addressToExecutorId(address)) + kill(addressToExecutorId(address), shutdownActor) } else { false } } - def kill(executorId: String): Boolean = { + def kill(executorId: String, shutdownActor: Boolean): Boolean = { if (executors.contains(executorId)) { val exec = executors(executorId) if (exec.registered) { @@ -116,12 +116,18 @@ private[spark] class ApplicationInfo( removedExecutors += executors(executorId) executors -= executorId coresGranted -= exec.cores - // Previously we use Ray.kill(true) here, which prevents executors from restarting. - // But we want executors died accidentally to restart, so we use Ray.exitActor now. 
- // Because if ray actor is already dead, it probably died from node failure, - // and this method won't be executed, so it can restart. - // Otherwise, it exits intentionally here and won't restart. - RayExecutorUtils.exitExecutor(executorIdToHandler(executorId)) + if (shutdownActor) { + // Previously we called exitExecutor in all scenarios, but that causes + // the following issue when an executor is down because of an OOM issue: + // - Executor E1 dies at T0, let's say because of OOM + // - We try to kill it by firing a stop call on the E1 actor + // - Since the actor is not available, the stop task fails for E1 + // - In the meantime, Ray brings up the lost executor E1 + // - The failed task (stop task) gets retried as there are task retries configured. + // - The stop task gets fired on the new executor which got recovered + // - The recovered executor exits with its status reported as a user-intended exit. + RayExecutorUtils.exitExecutor(executorIdToHandler(executorId)) + } executorIdToHandler -= executorId true } else { diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala index c74a1120..08945732 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala @@ -183,7 +183,7 @@ class RayAppMaster(host: String, assert(appInfo != null && appInfo.id == appId) var success = true for (executorId <- executorIds) { - if (!appInfo.kill(executorId)) { + if (!appInfo.kill(executorId, shutdownActor = true)) { success = false } } @@ -210,7 +210,7 @@ class RayAppMaster(host: String, } override def onDisconnected(remoteAddress: RpcAddress): Unit = { - appInfo.kill(remoteAddress) + appInfo.kill(remoteAddress, shutdownActor = false) } override def onStop(): Unit = {