
Recovery for lost executors 364 #389

Closed · wants to merge 3 commits

Conversation

KiranP-d11 (Contributor)

Pull request for the bug described in issue #364.

@kira-lin (Collaborator) left a comment

Thanks for your PR!
I have a concern, though: how will this interact with fault tolerance mode? In fault tolerance mode, if an executor (actor) fails, it gets restarted and re-registers. I think this PR does pretty much the same thing, but on the Spark side. If users turn on fault tolerance mode, will executors be requested multiple times?

Maybe I forgot the context, but why does fault tolerance mode fail to meet your need?

logDebug(s"Checking if any of the executor handlers are unreachable for ${allExecutors.mkString(", ")}")
allExecutors.foreach { executorId =>
val handlerOpt = appInfo.getExecutorHandler(executorId)
if (handlerOpt.isEmpty) {
Collaborator

This is not likely to happen, right?

@kira-lin (Collaborator) commented Nov 9, 2023

Please also fix the style lint errors, thanks.

@KiranP-d11 (Contributor, Author)

Correct me if I am wrong, but fault tolerance mode only applies when converting a Spark DataFrame to a Ray dataset. If the use case is just to run a Spark job and write the output to a destination (like S3), then enabling fault tolerance mode has no effect.
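
For illustration, a minimal sketch of the conversion path this mode protects, assuming RayDP's documented fault_tolerance_mode flag on init_spark (the app name and resource values are placeholders, not taken from this thread):

import ray
import raydp

ray.init(address="auto")

# Assumption: fault_tolerance_mode is RayDP's flag for making executor
# actors restartable so that converted datasets can be recovered.
spark = raydp.init_spark(
    app_name="fault_tolerance_demo",  # placeholder app name
    num_executors=2,
    executor_cores=2,
    executor_memory="2GB",
    fault_tolerance_mode=True,
)

df = spark.range(0, 1000)
# The Spark-to-Ray dataset conversion is the path this mode covers.
ds = ray.data.from_spark(df)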

That being said, I understand that when the executor actors die, whether from OOM issues or node failures, Ray restarts the actors, and the restarted executors should be added back to Spark. However, currently this is not happening. When an executor dies, it gets restarted, but it immediately dies again before adding itself back to the Spark application. This leaves the Spark application stuck, waiting indefinitely for the executors to come up without doing anything.

I will try to debug why this issue is happening and will attempt to raise a PR for it (in which case this current PR, with its changes on the Spark side, is not needed).

@kira-lin (Collaborator)

However, currently this is not happening. When an executor dies, it gets restarted, but it immediately dies again before adding itself back to the Spark application. This leaves the Spark application stuck, waiting indefinitely for the executors to come up without doing anything.

Do you mean that even when fault tolerance mode is on, executors are not added back as expected? I see; this might be a bug.

Indeed, we added fault tolerance mode to recover DataFrames, and it introduces some behavior that might not be wanted. We are happy to have this feature, but we should make sure it can be turned off when fault tolerance mode is on.

@KiranP-d11 (Contributor, Author)

Do you mean that even when fault tolerance mode is on, executors are not added back as expected? I see; this might be a bug.

Yes, even when fault tolerance mode is on, executors are not added back.

@kira-lin (Collaborator)

I see.

We are happy to have this feature, but we should make sure it can be turned off when fault tolerance mode is on.

Is it possible to check whether fault tolerance mode is on and, if so, disable the periodically sent RPC?

@pang-wu, does this solve your issue?
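
One possible shape for that gating, as a hypothetical sketch (the class, flag, and method names here are invented for illustration and do not come from this PR):

import threading

class ExecutorLivenessMonitor:
    """Hypothetical: periodically re-requests lost executors from Spark,
    unless fault tolerance mode already handles recovery on the Ray side."""

    def __init__(self, fault_tolerance_enabled: bool, interval_s: float = 10.0):
        self.fault_tolerance_enabled = fault_tolerance_enabled
        self.interval_s = interval_s

    def start(self) -> None:
        # With fault tolerance mode on, Ray restarts dead executor actors
        # itself, so the periodic Spark-side RPC would duplicate that work.
        if self.fault_tolerance_enabled:
            return
        self._schedule()

    def _schedule(self) -> None:
        threading.Timer(self.interval_s, self._check).start()

    def _check(self) -> None:
        # Placeholder for the "are any executor handlers unreachable" check
        # from this PR; re-request replacements here, then re-arm the timer.
        self._schedule()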

@KiranP-d11 (Contributor, Author)

Is it possible to check whether fault tolerance mode is on and, if so, disable the periodically sent RPC?

I will check and get back on this.

@pang-wu (Contributor) commented Nov 10, 2023

@kira-lin I need to check, but I am very excited about this feature; it solves a big problem in our production. However, I have some concerns about turning this feature off automatically. If possible, I would rather have an option for the user to turn it off. The reasons are below:

Is it possible to check whether fault tolerance mode is on and, if so, disable the periodically sent RPC?

1. Let's say fault tolerance mode is on, but I am not converting a Spark DataFrame to a Ray dataset. Will RayDP still recover failed executors?
2. Also, we have other use cases where the Spark context is not initialized from Python code but via spark-submit/raydp submit. In this case, we don't have a way to turn fault tolerance on even if it could solve the problem.

@kira-lin (Collaborator)

Will RayDP still recover failed executors?

With fault tolerance mode on, we basically turn on Ray's failed-actor restart, so yes, it should recover the executors by design.
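
For reference, a minimal sketch of that Ray mechanism using Ray's public actor API (the Executor class here is a stand-in for illustration, not RayDP's actual executor actor):

import ray

ray.init()

# max_restarts=-1 tells Ray to restart this actor indefinitely after it
# crashes; this is the failed-actor restart that fault tolerance mode enables.
@ray.remote(max_restarts=-1)
class Executor:  # stand-in for RayDP's executor actor
    def ping(self) -> str:
        return "alive"

executor = Executor.remote()
print(ray.get(executor.ping.remote()))  # prints "alive", even after restarts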

Also, we have other use cases where the Spark context is not initialized from Python code but via spark-submit/raydp submit. In this case, we don't have a way to turn fault tolerance on even if it could solve the problem.

That's right.

@KiranP-d11 (Contributor, Author)

@kira-lin I was able to debug and fix the issue with Ray's fault tolerance.
I have raised another PR for this.

Closing this PR as it is not required.
