diff --git a/python/raydp/spark/__init__.py b/python/raydp/spark/__init__.py
index c41a94d5..48a8a2ee 100644
--- a/python/raydp/spark/__init__.py
+++ b/python/raydp/spark/__init__.py
@@ -15,7 +15,9 @@
 # limitations under the License.
 #
 
-from .dataset import spark_dataframe_to_ray_dataset, \
+from .dataset import PartitionObjectsOwner, \
+    get_raydp_master_owner, \
+    spark_dataframe_to_ray_dataset, \
     ray_dataset_to_spark_dataframe, \
     from_spark_recoverable
 from .interfaces import SparkEstimatorInterface
@@ -24,6 +26,8 @@
 __all__ = [
     "SparkCluster",
     "SparkEstimatorInterface",
+    "PartitionObjectsOwner",
+    "get_raydp_master_owner",
     "spark_dataframe_to_ray_dataset",
     "ray_dataset_to_spark_dataframe",
     "from_spark_recoverable"
diff --git a/python/raydp/spark/dataset.py b/python/raydp/spark/dataset.py
index 3b09b114..06982dbe 100644
--- a/python/raydp/spark/dataset.py
+++ b/python/raydp/spark/dataset.py
@@ -13,16 +13,17 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
 import uuid
 from typing import Callable, Dict, List, NoReturn, Optional, Iterable, Union
+from dataclasses import dataclass
 
 import numpy as np
 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
 import pyspark.sql as sql
+from pyspark.sql import SparkSession
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql.types import StructType
 from pyspark.sql.pandas.types import from_arrow_type
@@ -132,6 +133,30 @@ def with_row_ids(self, new_row_ids) -> "ParquetPiece":
         return ParquetPiece(self.piece, self.columns, self.partitions, new_row_ids, num_rows)
 
 
+@dataclass
+class PartitionObjectsOwner:
+    # Name of the owner actor
+    actor_name: str
+    # Function that sets the serialized partition objects on the owner actor's state
+    # and returns the result of the .remote() call
+    set_reference_as_state: Callable[[ray.actor.ActorHandle, List[ObjectRef]], ObjectRef]
+
+
+def get_raydp_master_owner(spark: Optional[SparkSession] = None) -> PartitionObjectsOwner:
+    if spark is None:
+        spark = SparkSession.getActiveSession()
+    obj_holder_name = spark.sparkContext.appName + RAYDP_SPARK_MASTER_SUFFIX
+
+    def raydp_master_set_reference_as_state(
+            raydp_master_actor: ray.actor.ActorHandle,
+            objects: List[ObjectRef]) -> ObjectRef:
+        return raydp_master_actor.add_objects.remote(uuid.uuid4(), objects)
+
+    return PartitionObjectsOwner(
+        obj_holder_name,
+        raydp_master_set_reference_as_state)
+
+
 @client_mode_wrap
 def _register_objects(records):
     worker = ray.worker.global_worker
@@ -147,36 +172,34 @@ def _register_objects(records):
     return blocks, block_sizes
 
 def _save_spark_df_to_object_store(df: sql.DataFrame, use_batch: bool = True,
-                                   _use_owner: bool = False):
+                                   owner: Union[PartitionObjectsOwner, None] = None):
     # call java function from python
     jvm = df.sql_ctx.sparkSession.sparkContext._jvm
     jdf = df._jdf
     object_store_writer = jvm.org.apache.spark.sql.raydp.ObjectStoreWriter(jdf)
-    obj_holder_name = df.sql_ctx.sparkSession.sparkContext.appName + RAYDP_SPARK_MASTER_SUFFIX
-    if _use_owner is True:
-        records = object_store_writer.save(use_batch, obj_holder_name)
-    else:
-        records = object_store_writer.save(use_batch, "")
+    actor_owner_name = ""
+    if owner is not None:
+        actor_owner_name = owner.actor_name
+    records = object_store_writer.save(use_batch, actor_owner_name)
     record_tuples = [(record.objectId(), record.ownerAddress(), record.numRecords())
-                      for record in records]
+                     for record in records]
     blocks, block_sizes = _register_objects(record_tuples)
-    if _use_owner is True:
-        holder = ray.get_actor(obj_holder_name)
-        df_id = uuid.uuid4()
-        ray.get(holder.add_objects.remote(df_id, blocks))
+    if owner is not None:
+        actor_owner = ray.get_actor(actor_owner_name)
+        ray.get(owner.set_reference_as_state(actor_owner, blocks))
     return blocks, block_sizes
 
 
 def spark_dataframe_to_ray_dataset(df: sql.DataFrame,
                                    parallelism: Optional[int] = None,
-                                   _use_owner: bool = False):
+                                   owner: Union[PartitionObjectsOwner, None] = None):
     num_part = df.rdd.getNumPartitions()
     if parallelism is not None:
         if parallelism != num_part:
             df = df.repartition(parallelism)
-    blocks, _ = _save_spark_df_to_object_store(df, False, _use_owner)
+    blocks, _ = _save_spark_df_to_object_store(df, False, owner)
     return from_arrow_refs(blocks)
 
 # This is an experimental API for now.
diff --git a/python/raydp/tests/test_data_owner_transfer.py b/python/raydp/tests/test_data_owner_transfer.py
index f34edaec..dd859fee 100644
--- a/python/raydp/tests/test_data_owner_transfer.py
+++ b/python/raydp/tests/test_data_owner_transfer.py
@@ -1,18 +1,22 @@
 import sys
 import time
+from typing import Any
 
 import pytest
 
 import ray
 from ray._private.client_mode_hook import client_mode_wrap
 from ray.exceptions import RayTaskError, OwnerDiedError
 
 import raydp
+from raydp.spark import PartitionObjectsOwner
+
+from raydp.spark import get_raydp_master_owner
 
 
 def gen_test_data():
     from pyspark.sql.session import SparkSession
     s = SparkSession.getActiveSession()
-    
+
     data = []
     tmp = [("ming", 20, 15552211521),
            ("hong", 19, 13287994007),
@@ -30,10 +34,10 @@
 @client_mode_wrap
 def ray_gc():
     ray._private.internal_api.global_gc()
-    
+
 def test_fail_without_data_ownership_transfer(ray_cluster):
     """
-    Test shutting down Spark worker after data been put 
+    Test shutting down Spark worker after data been put
     into Ray object store without data ownership transfer.
     This test should be throw error of data inaccessible
     after its owner (e.g. Spark JVM process) has terminated, which is expected.
@@ -83,7 +87,7 @@
 
 def test_data_ownership_transfer(ray_cluster):
     """
-    Test shutting down Spark worker after data been put 
+    Test shutting down Spark worker after data been put
     into Ray object store with data ownership transfer.
     This test should be able to execute till the end without crash as expected.
     """
@@ -93,7 +97,7 @@
 
     from raydp.spark.dataset import spark_dataframe_to_ray_dataset
    import numpy as np
-    
+
     num_executor = 1
 
     spark = raydp.init_spark(
@@ -110,7 +114,8 @@
 
     # convert data from spark dataframe to ray dataset,
     # and transfer data ownership to dedicated Object Holder (Singleton)
-    ds = spark_dataframe_to_ray_dataset(df_train, parallelism=4, _use_owner=True)
+    ds = spark_dataframe_to_ray_dataset(df_train, parallelism=4,
+                                        owner=get_raydp_master_owner(df_train.sql_ctx.sparkSession))
 
     # display data
     ds.show(5)
@@ -127,11 +132,77 @@
     # confirm that data is still available from object store!
     # sanity check the dataset is as functional as normal
     assert np.isnan(ds.mean('Age')) is not True
-    
+
     # final clean up
     raydp.stop_spark()
 
 
+def test_custom_ownership_transfer_custom_actor(ray_cluster):
+    """
+    Test shutting down Spark worker after data has been put
+    into Ray object store with data ownership transfer to a custom user actor.
+    This test should be able to execute till the end without crash as expected.
+    """
+
+    @ray.remote
+    class CustomActor:
+        objects: Any
+
+        def wake(self):
+            pass
+
+        def set_objects(self, objects):
+            self.objects = objects
+
+    if not ray.worker.global_worker.connected:
+        pytest.skip("Skip this test if using ray client")
+
+    from raydp.spark.dataset import spark_dataframe_to_ray_dataset
+    import numpy as np
+
+    num_executor = 1
+
+    spark = raydp.init_spark(
+        app_name="example",
+        num_executors=num_executor,
+        executor_cores=1,
+        executor_memory="500M"
+    )
+
+    df_train = gen_test_data()
+
+    resource_stats = ray.available_resources()
+    cpu_cnt = resource_stats['CPU']
+
+    # create the owner actor
+    owner_actor_name = 'owner_actor_name'
+    actor = CustomActor.options(name=owner_actor_name).remote()
+    # wait for the actor to be created
+    ray.get(actor.wake.remote())
+
+    # convert data from spark dataframe to ray dataset,
+    # and transfer data ownership to the custom owner actor
+    ds = spark_dataframe_to_ray_dataset(df_train, parallelism=4, owner=PartitionObjectsOwner(
+        owner_actor_name,
+        lambda actor, objects: actor.set_objects.remote(objects)))
+
+    # display data
+    ds.show(5)
+
+    # release resources by shutting down the spark Java process
+    raydp.stop_spark()
+    ray_gc()  # ensure GC kicked in
+    time.sleep(3)
+
+    # confirm that resources have been recycled
+    resource_stats = ray.available_resources()
+    assert resource_stats['CPU'] == cpu_cnt + num_executor
+
+    # confirm that data is still available from object store!
+    # sanity check the dataset is as functional as normal
+    assert np.isnan(ds.mean('Age')) is not True
+
+
 def test_api_compatibility(ray_cluster):
     """
     Test the changes been made are not to break public APIs.
@@ -159,13 +230,13 @@
     # confirm that resources is still being occupied
     resource_stats = ray.available_resources()
     assert resource_stats['CPU'] == cpu_cnt
-    
+
     # final clean up
     raydp.stop_spark()
 
 if __name__ == '__main__':
     sys.exit(pytest.main(["-v", __file__]))
-    
+
     # test_api_compatibility()
     # test_data_ownership_transfer()
     # test_fail_without_data_ownership_transfer()
diff --git a/python/raydp/tf/estimator.py b/python/raydp/tf/estimator.py
index fef6acee..88916da2 100644
--- a/python/raydp/tf/estimator.py
+++ b/python/raydp/tf/estimator.py
@@ -32,7 +32,7 @@
 from raydp.estimator import EstimatorInterface
 from raydp.spark.interfaces import SparkEstimatorInterface, DF, OPTIONAL_DF
 from raydp import stop_spark
-from raydp.spark import spark_dataframe_to_ray_dataset
+from raydp.spark import spark_dataframe_to_ray_dataset, get_raydp_master_owner
 
 class TFEstimator(EstimatorInterface, SparkEstimatorInterface):
     def __init__(self,
@@ -255,12 +255,15 @@ def fit_on_spark(self,
                 evaluate_df.write.parquet(path+"/test", compression=compression)
                 evaluate_ds = read_parquet(path+"/test")
         else:
+            owner = None
+            if stop_spark_after_conversion:
+                owner = get_raydp_master_owner(train_df.sql_ctx.sparkSession)
             train_ds = spark_dataframe_to_ray_dataset(train_df,
-                                                      _use_owner=stop_spark_after_conversion)
+                                                      owner=owner)
             if evaluate_df is not None:
                 evaluate_df = self._check_and_convert(evaluate_df)
                 evaluate_ds = spark_dataframe_to_ray_dataset(evaluate_df,
-                                                             _use_owner=stop_spark_after_conversion)
+                                                             owner=owner)
         if stop_spark_after_conversion:
             stop_spark(cleanup_data=False)
         return self.fit(
diff --git a/python/raydp/torch/estimator.py b/python/raydp/torch/estimator.py
index 7763cff2..20803e7d 100644
--- a/python/raydp/torch/estimator.py
+++ b/python/raydp/torch/estimator.py
@@ -25,7 +25,7 @@
 from raydp.spark.interfaces import SparkEstimatorInterface, DF, OPTIONAL_DF
 from raydp.torch.torch_metrics import TorchMetric
 from raydp import stop_spark
-from raydp.spark import spark_dataframe_to_ray_dataset
+from raydp.spark import spark_dataframe_to_ray_dataset, get_raydp_master_owner
 from raydp.torch.config import TorchConfig
 
 import ray
@@ -365,12 +365,15 @@ def fit_on_spark(self,
                 evaluate_df.write.parquet(path+"/test", compression=compression)
                 evaluate_ds = ray.data.read_parquet(path+"/test")
         else:
+            owner = None
+            if stop_spark_after_conversion:
+                owner = get_raydp_master_owner(train_df.sql_ctx.sparkSession)
             train_ds = spark_dataframe_to_ray_dataset(train_df,
-                                                      _use_owner=stop_spark_after_conversion)
+                                                      owner=owner)
             if evaluate_df is not None:
                 evaluate_df = self._check_and_convert(evaluate_df)
                 evaluate_ds = spark_dataframe_to_ray_dataset(evaluate_df,
-                                                             _use_owner=stop_spark_after_conversion)
+                                                             owner=owner)
         if stop_spark_after_conversion:
             stop_spark(cleanup_data=False)
         return self.fit(
diff --git a/python/raydp/xgboost/estimator.py b/python/raydp/xgboost/estimator.py
index 2d07dc28..df632fcb 100644
--- a/python/raydp/xgboost/estimator.py
+++ b/python/raydp/xgboost/estimator.py
@@ -20,7 +20,7 @@
 from raydp.estimator import EstimatorInterface
 from raydp.spark.interfaces import SparkEstimatorInterface, DF, OPTIONAL_DF
 from raydp import stop_spark
-from raydp.spark import spark_dataframe_to_ray_dataset
+from raydp.spark import spark_dataframe_to_ray_dataset, get_raydp_master_owner
 
 import ray
 from ray.air.config import ScalingConfig, RunConfig, FailureConfig
@@ -99,14 +99,17 @@ def fit_on_spark(self,
                 evaluate_df.write.parquet(path+"/test", compression=compression)
                 evaluate_ds = ray.data.read_parquet(path+"/test")
         else:
+            owner = None
+            if stop_spark_after_conversion:
+                owner = get_raydp_master_owner(train_df.sql_ctx.sparkSession)
             train_ds = spark_dataframe_to_ray_dataset(train_df,
-                                            parallelism=self._num_workers,
-                                            _use_owner=stop_spark_after_conversion)
+                                                      parallelism=self._num_workers,
+                                                      owner=owner)
             if evaluate_df is not None:
                 evaluate_df = self._check_and_convert(evaluate_df)
                 evaluate_ds = spark_dataframe_to_ray_dataset(evaluate_df,
-                                            parallelism=self._num_workers,
-                                            _use_owner=stop_spark_after_conversion)
+                                                             parallelism=self._num_workers,
+                                                             owner=owner)
         if stop_spark_after_conversion:
             stop_spark(cleanup_data=False)
         return self.fit(
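
Usage sketch (not part of the patch): the snippet below shows how the new owner parameter is intended to be used, mirroring the tests above. The app name, dataset, actor name, OwnerActor class and cluster sizing are illustrative placeholders, not part of this change.

import ray
import raydp
from raydp.spark import (PartitionObjectsOwner, get_raydp_master_owner,
                         spark_dataframe_to_ray_dataset)

ray.init()
spark = raydp.init_spark(app_name="owner_example", num_executors=1,
                         executor_cores=1, executor_memory="500M")
df = spark.range(0, 100)

# Option 1: let the RayDP master actor own the partition object refs,
# as the estimators now do before stopping Spark.
ds = spark_dataframe_to_ray_dataset(df, parallelism=2,
                                    owner=get_raydp_master_owner(spark))

# Option 2: transfer ownership to a user-defined named actor.
@ray.remote
class OwnerActor:
    def ready(self):
        pass

    def set_objects(self, objects):
        # keep the refs alive for as long as this actor lives
        self.objects = objects

owner_actor = OwnerActor.options(name="obj_owner").remote()
ray.get(owner_actor.ready.remote())  # make sure the named actor exists first
ds2 = spark_dataframe_to_ray_dataset(
    df, parallelism=2,
    owner=PartitionObjectsOwner(
        "obj_owner",
        lambda actor, objects: actor.set_objects.remote(objects)))

# Per the tests above, both datasets stay readable after the Spark JVM exits.
raydp.stop_spark()
print(ds.count(), ds2.count())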