From 0466c72097cba1883f702c33fe07fbd7e2998bb8 Mon Sep 17 00:00:00 2001 From: Pang Wu <104795337+pang-wu@users.noreply.github.com> Date: Wed, 25 Dec 2024 17:43:32 -0800 Subject: [PATCH] fix executor custom resource request+support pyarrow >=15 & spark 3.5.2/3/4 (#419) * fix executor custom resource request * support spark 3.5.2/3 * migrate to latest API * test against multiple ray version * unpin pyarrow * use iter_internal_ref_bundles in test, require ray >= 2.33.0 * fix test, init ray cluster properly * worksaorund * worksaorund * work around a bug in spark * Add spark 3.4.4 and 3.5.4 support --- .github/workflows/ray_nightly_test.yml | 13 +- .github/workflows/raydp.yml | 5 +- .gitignore | 3 + .../RayCoarseGrainedSchedulerBackend.scala | 20 +- .../intel/raydp/shims/SparkShimProvider.scala | 3 +- .../intel/raydp/shims/SparkShimProvider.scala | 6 +- python/raydp/spark/__init__.py | 8 - python/raydp/spark/dataset.py | 356 ------------------ python/raydp/tests/conftest.py | 2 + python/raydp/tests/test_spark_cluster.py | 76 +++- python/raydp/tf/estimator.py | 17 +- python/setup.py | 8 +- 12 files changed, 103 insertions(+), 414 deletions(-) diff --git a/.github/workflows/ray_nightly_test.yml b/.github/workflows/ray_nightly_test.yml index e49c8c69..9483276b 100644 --- a/.github/workflows/ray_nightly_test.yml +++ b/.github/workflows/ray_nightly_test.yml @@ -31,7 +31,7 @@ jobs: strategy: matrix: os: [ ubuntu-latest ] - python-version: [3.8, 3.9, 3.10.14] + python-version: [3.9, 3.10.14] spark-version: [3.2.4, 3.3.2, 3.4.0, 3.5.0] runs-on: ${{ matrix.os }} @@ -82,17 +82,14 @@ jobs: pip install torch fi case $PYTHON_VERSION in - 3.7) - pip install "ray[train] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl" - ;; - 3.8) - pip install "ray[train] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp38-cp38-manylinux2014_x86_64.whl" - ;; 3.9) pip install "ray[train] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp39-cp39-manylinux2014_x86_64.whl" ;; + 3.10.14) + pip install "ray[train] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp310-cp310-manylinux2014_x86_64.whl" + ;; esac - pip install pyarrow==6.0.1 pytest koalas tensorflow==2.13.1 tabulate grpcio-tools wget + pip install pyarrow tqdm pytest tensorflow==2.13.1 tabulate grpcio-tools wget pip install "xgboost_ray[default]<=0.1.13" pip install torchmetrics HOROVOD_WITH_GLOO=1 diff --git a/.github/workflows/raydp.yml b/.github/workflows/raydp.yml index d4b8f6a2..5d11ebbf 100644 --- a/.github/workflows/raydp.yml +++ b/.github/workflows/raydp.yml @@ -33,8 +33,9 @@ jobs: strategy: matrix: os: [ ubuntu-latest ] - python-version: [3.8, 3.9, 3.10.14] + python-version: [3.9, 3.10.14] spark-version: [3.2.4, 3.3.2, 3.4.0, 3.5.0] + ray-version: [2.34.0, 2.40.0] runs-on: ${{ matrix.os }} @@ -82,7 +83,7 @@ jobs: else pip install torch fi - pip install pyarrow==6.0.1 ray[train] pytest tensorflow==2.13.1 tabulate grpcio-tools wget + pip install pyarrow "ray[train]==${{ matrix.ray-version }}" tqdm pytest tensorflow==2.13.1 tabulate grpcio-tools wget pip install "xgboost_ray[default]<=0.1.13" pip install "xgboost<=2.0.3" pip install torchmetrics diff --git a/.gitignore b/.gitignore index 4642a059..571df90a 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,6 @@ examples/.ipynb_checkpoints/ *.parquet *.crc _SUCCESS + +.metals/ +.bloop/ diff --git a/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala b/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala index b5ecb7f9..3c0d0f12 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala @@ -201,17 +201,15 @@ class RayCoarseGrainedSchedulerBackend( } def parseRayDPResourceRequirements(sparkConf: SparkConf): Map[String, Double] = { - sparkConf.getAllWithPrefix( - s"${SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_CPU_RESOURCE}.") - .filter{ case (key, _) => key.toLowerCase() != "cpu" } - .map{ case (key, _) => key } - .distinct - .map(name => { - val amountDouble = sparkConf.get( - s"${SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_CPU_RESOURCE}.${name}", - 0d.toString).toDouble - name->amountDouble - }) + sparkConf.getAll + .filter { case (key, _) => + key.startsWith(SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX) + } + .map { case (key, value) => + key.stripPrefix( + s"${SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX}.") -> value.toDouble + } + .filter { case (key, _) => key.toLowerCase() != "cpu" } .toMap } diff --git a/core/shims/spark340/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala b/core/shims/spark340/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala index a878c6a3..229263e2 100644 --- a/core/shims/spark340/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala +++ b/core/shims/spark340/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala @@ -24,8 +24,9 @@ object SparkShimProvider { val SPARK341_DESCRIPTOR = SparkShimDescriptor(3, 4, 1) val SPARK342_DESCRIPTOR = SparkShimDescriptor(3, 4, 2) val SPARK343_DESCRIPTOR = SparkShimDescriptor(3, 4, 3) + val SPARK344_DESCRIPTOR = SparkShimDescriptor(3, 4, 4) val DESCRIPTOR_STRINGS = Seq(s"$SPARK340_DESCRIPTOR", s"$SPARK341_DESCRIPTOR", s"$SPARK342_DESCRIPTOR", - s"$SPARK343_DESCRIPTOR") + s"$SPARK343_DESCRIPTOR", s"$SPARK344_DESCRIPTOR") val DESCRIPTOR = SPARK341_DESCRIPTOR } diff --git a/core/shims/spark350/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala b/core/shims/spark350/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala index 0a2ba58a..72e0aae0 100644 --- a/core/shims/spark350/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala +++ b/core/shims/spark350/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala @@ -22,7 +22,11 @@ import com.intel.raydp.shims.{SparkShims, SparkShimDescriptor} object SparkShimProvider { val SPARK350_DESCRIPTOR = SparkShimDescriptor(3, 5, 0) val SPARK351_DESCRIPTOR = SparkShimDescriptor(3, 5, 1) - val DESCRIPTOR_STRINGS = Seq(s"$SPARK350_DESCRIPTOR", s"$SPARK351_DESCRIPTOR") + val SPARK352_DESCRIPTOR = SparkShimDescriptor(3, 5, 2) + val SPARK353_DESCRIPTOR = SparkShimDescriptor(3, 5, 3) + val SPARK354_DESCRIPTOR = SparkShimDescriptor(3, 5, 4) + val DESCRIPTOR_STRINGS = Seq(s"$SPARK350_DESCRIPTOR", s"$SPARK351_DESCRIPTOR", s"$SPARK352_DESCRIPTOR", + s"$SPARK353_DESCRIPTOR", s"$SPARK354_DESCRIPTOR") val DESCRIPTOR = SPARK350_DESCRIPTOR } diff --git a/python/raydp/spark/__init__.py b/python/raydp/spark/__init__.py index 48a8a2ee..8ba4a7c5 100644 --- a/python/raydp/spark/__init__.py +++ b/python/raydp/spark/__init__.py @@ -32,11 +32,3 @@ "ray_dataset_to_spark_dataframe", "from_spark_recoverable" ] - -try: - import ray.util.data - from .dataset import RayMLDataset - __all__.append("RayMLDataset") -except ImportError: - # Ray MLDataset is removed in Ray 2.0 - pass diff --git a/python/raydp/spark/dataset.py b/python/raydp/spark/dataset.py index 61ca945b..aadf5e30 100644 --- a/python/raydp/spark/dataset.py +++ b/python/raydp/spark/dataset.py @@ -18,10 +18,8 @@ from typing import Callable, Dict, List, NoReturn, Optional, Iterable, Union from dataclasses import dataclass -import numpy as np import pandas as pd import pyarrow as pa -import pyarrow.parquet as pq import pyspark.sql as sql from pyspark.sql import SparkSession from pyspark.sql.dataframe import DataFrame @@ -31,18 +29,7 @@ import ray from ray.data import Dataset, from_arrow_refs from ray.types import ObjectRef -import ray.util.iter as parallel_it from ray._private.client_mode_hook import client_mode_wrap -try: - import ray.util.data as ml_dataset - from ray.util.data import MLDataset - from ray.util.data.interface import _SourceShard - HAS_MLDATASET = True -except ImportError: - # Ray MLDataset is removed in Ray 2.0 - HAS_MLDATASET = False -from raydp.spark.parallel_iterator_worker import ParallelIteratorWorkerWithLen -from raydp.utils import divide_blocks from raydp.spark.ray_cluster_master import RAYDP_SPARK_MASTER_SUFFIX @@ -98,40 +85,6 @@ def with_row_ids(self, new_row_ids) -> "RayObjectPiece": return RayObjectPiece(self.obj_id, new_row_ids, num_rows) -class ParquetPiece(RecordPiece): - def __init__(self, - piece: pq.ParquetDatasetPiece, - columns: List[str], - partitions: pq.ParquetPartitions, - row_ids: Optional[List[int]], - num_rows: int): - super().__init__(row_ids, num_rows) - self.piece = piece - self.columns = columns - self.partitions = partitions - - def read(self, shuffle: bool) -> pd.DataFrame: - pdf = self.piece.read(columns=self.columns, - use_threads=False, - partitions=self.partitions).to_pandas() - if self.row_ids: - pdf = pdf.loc[self.row_ids] - - if shuffle: - pdf = pdf.sample(frac=1.0) - return pdf - - def with_row_ids(self, new_row_ids) -> "ParquetPiece": - """ - chang the num_rows to the length of new_row_ids. Keep the original size if - the new_row_ids is None. - """ - if new_row_ids: - num_rows = len(new_row_ids) - else: - num_rows = self.num_rows - return ParquetPiece(self.piece, self.columns, self.partitions, new_row_ids, num_rows) - @dataclass class PartitionObjectsOwner: @@ -302,312 +255,3 @@ def ray_dataset_to_spark_dataframe(spark: sql.SparkSession, return _convert_by_udf(spark, blocks, locations, schema) else: raise RuntimeError("ray.to_spark only supports arrow type blocks") - -if HAS_MLDATASET: - class RecordBatch(_SourceShard): - def __init__(self, - shard_id: int, - prefix: str, - record_pieces: List[RecordPiece], - shuffle: bool, - shuffle_seed: int): - self._shard_id = shard_id - self._prefix = prefix - self.record_pieces = record_pieces - self.shuffle = shuffle - self.shuffle_seed = shuffle_seed - - def prefix(self) -> str: - return self._prefix - - @property - def shard_id(self) -> int: - return self._shard_id - - def __iter__(self) -> Iterable[pd.DataFrame]: - if self.shuffle: - np.random.seed(self.shuffle_seed) - np.random.shuffle(self.record_pieces) - - for piece in self.record_pieces: - yield piece.read(self.shuffle) - - def __len__(self): - return sum([len(piece) for piece in self.record_pieces]) - - - class RayRecordBatch(RecordBatch): - def __init__(self, - shard_id: int, - prefix: str, - record_pieces: List[RecordPiece], - shuffle: bool, - shuffle_seed: int): - super().__init__(shard_id, prefix, record_pieces, shuffle, shuffle_seed) - self.resolved: bool = False - - def resolve(self, timeout: Optional[float] = None) -> NoReturn: - """ - This is just fetch object from remote object store to local and without deserialization. - :param timeout: The maximum amount of time in seconds to wait before returning. - """ - if self.resolved: - return - - worker = ray.worker.global_worker - worker.check_connected() - timeout_ms = int(timeout * 1000) if timeout else -1 - object_ids = [record.obj_id for record in self.record_pieces] - worker.core_worker.get_objects(object_ids, worker.current_task_id, timeout_ms) - self.resolved = True - - - def _create_ml_dataset(name: str, - record_pieces: List[RecordPiece], - record_sizes: List[int], - num_shards: int, - shuffle: bool, - shuffle_seed: int, - RecordBatchCls, - node_hints: List[str] = None) -> MLDataset: - if node_hints is not None: - assert num_shards % len(node_hints) == 0,\ - (f"num_shards: {num_shards} should be a multiple" - f" of length of node_hints: {node_hints}") - if shuffle_seed: - np.random.seed(shuffle_seed) - else: - np.random.seed(0) - - # split the piece into num_shards partitions - divided_blocks = divide_blocks(blocks=record_sizes, - world_size=num_shards, - shuffle=shuffle, - shuffle_seed=shuffle_seed) - - record_batches = [] - - for rank, blocks in divided_blocks.items(): - pieces = [] - for index, num_samples in blocks: - record_size = record_sizes[index] - piece = record_pieces[index] - if num_samples != record_size: - assert num_samples < record_size - new_row_ids = np.random.choice( - record_size, size=num_samples).tolist() - piece = piece.with_row_ids(new_row_ids) - pieces.append(piece) - - if shuffle: - np.random.shuffle(pieces) - record_batches.append(RecordBatchCls(shard_id=rank, - prefix=name, - record_pieces=pieces, - shuffle=shuffle, - shuffle_seed=shuffle_seed)) - - worker_cls = ray.remote(ParallelIteratorWorkerWithLen) - if node_hints is not None: - actors = [] - multiplier = num_shards // len(node_hints) - resource_keys = [f"node:{node_hints[i // multiplier]}" for i in range(num_shards)] - for g, resource_key in zip(record_batches, resource_keys): - actor = worker_cls.options(resources={resource_key: 0.01}).remote(g, False, len(g)) - actors.append(actor) - else: - worker_cls = ray.remote(ParallelIteratorWorkerWithLen) - actors = [worker_cls.remote(g, False, len(g)) for g in record_batches] - - it = parallel_it.from_actors(actors, name) - ds = ml_dataset.from_parallel_iter( - it, need_convert=False, batch_size=0, repeated=False) - return ds - - - class RayMLDataset: - @staticmethod - def from_spark(df: sql.DataFrame, - num_shards: int, - shuffle: bool = True, - shuffle_seed: int = None, - fs_directory: Optional[str] = None, - compression: Optional[str] = None, - node_hints: List[str] = None) -> MLDataset: - """ Create a MLDataset from Spark DataFrame - - This method will create a MLDataset from Spark DataFrame. - - :param df: the pyspark.sql.DataFrame - :param num_shards: the number of shards will be created for the MLDataset - :param shuffle: whether need to shuffle the blocks when create the MLDataset - :param shuffle_seed: the shuffle seed, default is 0 - :param fs_directory: an optional distributed file system directory for cache the - DataFrame. We will write the DataFrame to the given directory with parquet - format if this is provided. Otherwise, we will write the DataFrame to ray - object store. - :param compression: the optional compression for write the DataFrame as parquet - file. This is only useful when the fs_directory set. - :param node_hints: the node hints to create MLDataset actors - :return: a MLDataset - """ - df = df.repartition(num_shards) - if fs_directory is None: - # fs_directory has not provided, we save the Spark DataFrame to ray object store - blocks, block_sizes = _save_spark_df_to_object_store(df) - record_pieces = [RayObjectPiece(obj, None, num_rows) - for obj, num_rows in zip(blocks, block_sizes)] - - return _create_ml_dataset("from_spark", record_pieces, block_sizes, num_shards, - shuffle, shuffle_seed, RayRecordBatch, - node_hints) - else: - # fs_directory has provided, we write the Spark DataFrame as Parquet files - df.write.parquet(fs_directory, compression=compression) - # create the MLDataset from the parquet file - ds = RayMLDataset.from_parquet( - fs_directory, num_shards, shuffle, shuffle_seed, node_hints) - return ds - - @staticmethod - def from_parquet(paths: Union[str, List[str]], - num_shards: int, - shuffle: bool = True, - shuffle_seed: int = None, - columns: Optional[List[str]] = None, - node_hints: List[str] = None, - extra_parquet_arguments: Dict = None) -> MLDataset: - """ Create a MLDataset from Parquet files. - - :param paths: the parquet files path - :param num_shards: the number of shards will be created for the MLDataset - :param shuffle: whether need to shuffle the blocks when create the MLDataset - :param shuffle_seed: the shuffle seed, default is 0 - :param columns: the columns that need to read - :param node_hints: the node hints to create MLDataset actors - :param extra_parquet_arguments: the extra arguments need to pass into the parquet file - reading - :return: a MLDataset - """ - if not extra_parquet_arguments: - extra_parquet_arguments = {} - ds = pq.ParquetDataset(paths, **extra_parquet_arguments) - pieces = ds.pieces - record_pieces = [] - record_sizes = [] - - for piece in pieces: - meta_data = piece.get_metadata().to_dict() - num_row_groups = meta_data["num_row_groups"] - row_groups = meta_data["row_groups"] - for i in range(num_row_groups): - num_rows = row_groups[i]["num_rows"] - parquet_ds_piece = pq.ParquetDatasetPiece(piece.path, piece.open_file_func, - piece.file_options, i, - piece.partition_keys) - # row_ids will be set later - record_pieces.append(ParquetPiece(piece=parquet_ds_piece, - columns=columns, - partitions=ds.partitions, - row_ids=None, - num_rows=num_rows)) - record_sizes.append(num_rows) - - return _create_ml_dataset("from_parquet", record_pieces, record_sizes, num_shards, - shuffle, shuffle_seed, RecordBatch, node_hints) - - @staticmethod - def to_torch( - ds: MLDataset, - world_size: int, - world_rank: int, - batch_size: int, - collate_fn: Callable, - shuffle: bool = False, - shuffle_seed: int = None, - local_rank: int = -1, - prefer_node: str = None, - prefetch: bool = False): - """ - Create DataLoader from a MLDataset - :param ds: the MLDataset - :param world_size: the world_size of distributed model training - :param world_rank: create the DataLoader for the given world_rank - :param batch_size: the batch_size of the DtaLoader - :param collate_fn: the collate_fn that create tensors from a pandas DataFrame - :param shuffle: whether shuffle each batch of data - :param shuffle_seed: the shuffle seed - :param local_rank: the node local rank. It must be provided if prefer_node is - not None. - :param prefer_node: the prefer node for create the MLDataset actor - :param prefetch: prefetch the data of DataLoader with one thread - :return: a pytorch DataLoader - """ - # pylint: disable=C0415 - import torch - from raydp.torch.torch_ml_dataset import PrefetchedDataLoader, TorchMLDataset - - num_shards = ds.num_shards() - assert num_shards % world_size == 0, \ - (f"The number shards of MLDataset({ds}) should be a multiple of " - f"world_size({world_size})") - multiplier = num_shards // world_size - - selected_ds = None - if prefer_node is not None: - assert 0 <= local_rank < world_size - - # get all actors - # there should be only one actor_set because of select_shards() is not allowed - # after union() - - def location_check(actor): - address = ray.actors(actor._actor_id.hex())["Address"]["IPAddress"] - return address == prefer_node - - actors = ds.actor_sets[0].actors - actor_indexes = [i for i, actor in enumerate(actors) if location_check(actor)] - if len(actor_indexes) % multiplier != 0: - selected_ds = None - logger.warning(f"We could not find enough shard actor in prefer " - f"node({prefer_node}), fail back to normal select_shards(). " - f"Found: ({actor_indexes}) which length is not multiple of " - f"num_shards({num_shards}) // world_size({world_size}).") - else: - shard_ids = actor_indexes[local_rank: local_rank + multiplier] - selected_ds = ds.select_shards(shard_ids) - - if selected_ds is None: - shard_ids = [] - i = world_rank - step = world_size - while i < num_shards: - shard_ids.append(i) - i += step - selected_ds = ds.select_shards(shard_ids) - - selected_ds = selected_ds.batch(batch_size) - torch_ds = TorchMLDataset(selected_ds, collate_fn, shuffle, shuffle_seed) - data_loader = torch.utils.data.DataLoader(dataset=torch_ds, - batch_size=None, - batch_sampler=None, - shuffle=False, - num_workers=0, - collate_fn=None, - pin_memory=False, - drop_last=False, - sampler=None) - if prefetch: - data_loader = PrefetchedDataLoader(data_loader) - return data_loader - - - def create_ml_dataset_from_spark(df: sql.DataFrame, - num_shards: int, - shuffle: bool, - shuffle_seed: int, - fs_directory: Optional[str] = None, - compression: Optional[str] = None, - node_hints: List[str] = None) -> MLDataset: - return RayMLDataset.from_spark( - df, num_shards, shuffle, shuffle_seed, fs_directory, compression, node_hints) diff --git a/python/raydp/tests/conftest.py b/python/raydp/tests/conftest.py index c54fe587..50f7d8dd 100644 --- a/python/raydp/tests/conftest.py +++ b/python/raydp/tests/conftest.py @@ -66,6 +66,7 @@ def spark_on_ray_small(request): }) def stop_all(): + spark.stop() raydp.stop_spark() time.sleep(5) ray.shutdown() @@ -88,6 +89,7 @@ def spark_on_ray_2_executors(request): }) def stop_all(): + spark.stop() raydp.stop_spark() time.sleep(5) ray.shutdown() diff --git a/python/raydp/tests/test_spark_cluster.py b/python/raydp/tests/test_spark_cluster.py index 3e793aeb..f1fae5b7 100644 --- a/python/raydp/tests/test_spark_cluster.py +++ b/python/raydp/tests/test_spark_cluster.py @@ -42,11 +42,11 @@ def test_spark(spark_on_ray_small): def test_legacy_spark_on_fractional_cpu(): cluster = Cluster( initialize_head=True, - connect=True, head_node_args={ "num_cpus": 2 }) + ray.init(address=cluster.address, include_dashboard=False) spark = raydp.init_spark(app_name="test_cpu_fraction", num_executors=1, executor_cores=3, executor_memory="500M", configs={"spark.ray.actor.resource.cpu": "0.1"}) @@ -60,14 +60,14 @@ def test_legacy_spark_on_fractional_cpu(): cluster.shutdown() -def test_spark_on_fractional_cpu(): +def test_spark_executor_on_fractional_cpu(): cluster = Cluster( initialize_head=True, - connect=True, head_node_args={ "num_cpus": 2 }) + ray.init(address=cluster.address, include_dashboard=False) spark = raydp.init_spark(app_name="test_cpu_fraction", num_executors=1, executor_cores=3, executor_memory="500M", configs={"spark.ray.raydp_spark_executor.actor.resource.cpu": "0.1"}) @@ -84,18 +84,19 @@ def test_spark_on_fractional_cpu(): def test_spark_executor_node_affinity(): cluster = Cluster( initialize_head=True, - connect=True, head_node_args={ "num_cpus": 1, }) cluster.add_node(num_cpus=2, resources={"spark_executor": 10}) + ray.init(address=cluster.address, include_dashboard=False) spark = raydp.init_spark(app_name="test_executor_node_affinity", num_executors=1, executor_cores=2, executor_memory="500M", configs={"spark.ray.raydp_spark_executor.actor.resource.spark_executor": "1"}) result = spark.range(0, 10).count() assert result == 10 + spark.stop() raydp.stop_spark() time.sleep(5) ray.shutdown() @@ -137,22 +138,49 @@ def test_spark_driver_and_executor_hostname(spark_on_ray_small): assert node_ip_address == driver_bind_address -def test_ray_dataset_roundtrip(spark_on_ray_2_executors): +def test_ray_dataset_roundtrip(): + cluster = Cluster( + initialize_head=True, + head_node_args={ + "num_cpus": 6, + } + ) + ray.init(address=cluster.address, include_dashboard=False) + + configs = { + # This looks like a bug in Spark, where RayCoarseGrainedSchedulerBackend + # always get the same sparkContext between tests. + # So we need to re-set the resource explicitly here. + "spark.ray.raydp_spark_executor.actor.resource.spark_executor": "0" + } + spark = raydp.init_spark(app_name="test_ray_dataset_roundtrip", num_executors=2, + executor_cores=1, executor_memory="500M", + configs=configs) + # skipping this to be compatible with ray 2.4.0 # see issue #343 if not ray.worker.global_worker.connected: pytest.skip("Skip this test if using ray client") - spark = spark_on_ray_2_executors spark_df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["one", "two"]) rows = [(r.one, r.two) for r in spark_df.take(3)] ds = ray.data.from_spark(spark_df) values = [(r["one"], r["two"]) for r in ds.take(6)] assert values == rows + block_refs = [] + for ref_bundle in ds.iter_internal_ref_bundles(): + for block_ref, block_md in ref_bundle.blocks: + block_refs.append(block_ref) df = raydp.spark.dataset. \ - ray_dataset_to_spark_dataframe(spark, ds.schema(), ds.get_internal_block_refs()) + ray_dataset_to_spark_dataframe(spark, ds.schema(), block_refs) rows_2 = [(r.one, r.two) for r in df.take(3)] assert values == rows_2 + spark.stop() + raydp.stop_spark() + time.sleep(5) + ray.shutdown() + cluster.shutdown() + def test_ray_dataset_to_spark(spark_on_ray_2_executors): # skipping this to be compatible with ray 2.4.0 @@ -164,14 +192,22 @@ def test_ray_dataset_to_spark(spark_on_ray_2_executors): data = {"value": list(range(n))} ds = ray.data.from_arrow(pyarrow.Table.from_pydict(data)) values = [r["value"] for r in ds.take(n)] + block_refs = [] + for ref_bundle in ds.iter_internal_ref_bundles(): + for block_ref, block_md in ref_bundle.blocks: + block_refs.append(block_ref) df = raydp.spark.dataset. \ - ray_dataset_to_spark_dataframe(spark, ds.schema(), ds.get_internal_block_refs()) + ray_dataset_to_spark_dataframe(spark, ds.schema(), block_refs) rows = [r.value for r in df.take(n)] assert values == rows ds2 = ray.data.from_items([{"id": i} for i in range(n)]) ids = [r["id"] for r in ds2.take(n)] + block_refs2 = [] + for ref_bundle in ds2.iter_internal_ref_bundles(): + for block_ref, block_md in ref_bundle.blocks: + block_refs2.append(block_ref) df2 = raydp.spark.dataset. \ - ray_dataset_to_spark_dataframe(spark, ds2.schema(), ds2.get_internal_block_refs()) + ray_dataset_to_spark_dataframe(spark, ds2.schema(), block_refs2) rows2 = [r.id for r in df2.take(n)] assert ids == rows2 @@ -217,19 +253,20 @@ def test_placement_group(ray_cluster): def test_reconstruction(): - cluster = ray.cluster_utils.Cluster() - # Head node has 2 cores for necessray actors - head = cluster.add_node( - num_cpus=2, - include_dashboard=False, - enable_object_reconstruction=True + cluster = Cluster( + initialize_head=True, + head_node_args={ + "num_cpus": 2, + "enable_object_reconstruction": True + } ) + ray.init(address=cluster.address, include_dashboard=False) # init_spark before adding nodes to ensure drivers connect to the head node spark = raydp.init_spark('a', 2, 1, '500m', fault_tolerant_mode=True) # Add two nodes, 1 executor each - node_to_kill = cluster.add_node(num_cpus=1, include_dashboard=False, object_store_memory=10 ** 8) - second_node = cluster.add_node(num_cpus=1, include_dashboard=False, object_store_memory=10 ** 8) + node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10 ** 8) + second_node = cluster.add_node(num_cpus=1, object_store_memory=10 ** 8) # wait for executors to start time.sleep(5) # df should be large enough so that result will be put into plasma @@ -242,8 +279,9 @@ def test_reconstruction(): num_cpus=1, object_store_memory=10 ** 8 ) # verify that block is recovered - for block in ds.get_internal_block_refs(): - ray.get(block) + for ref_bundle in ds.iter_internal_ref_bundles(): + for block_ref, block_md in ref_bundle.blocks: + ray.get(block_ref) raydp.stop_spark() ray.shutdown() cluster.shutdown() diff --git a/python/raydp/tf/estimator.py b/python/raydp/tf/estimator.py index d185704a..9fa21f1d 100644 --- a/python/raydp/tf/estimator.py +++ b/python/raydp/tf/estimator.py @@ -15,8 +15,7 @@ # limitations under the License. # -import json -import os +from packaging import version import tempfile from typing import Any, List, NoReturn, Optional, Union, Dict @@ -25,6 +24,7 @@ from tensorflow import DType, TensorShape from tensorflow.keras.callbacks import Callback +import ray from ray.train import Checkpoint from ray.train.tensorflow import TensorflowTrainer, TensorflowCheckpoint, prepare_dataset_shard from ray.air import session @@ -226,8 +226,17 @@ def fit(self, label_cols = self._label_columns if not isinstance(label_cols, list): label_cols = [label_cols] - preprocessor = Concatenator(output_column_name="features", - exclude=label_cols) + # pylint: disable=E1123,E1120 + if version.parse(ray.__version__) >= version.parse("2.39.0"): + preprocessor = Concatenator( + columns=[col for col in self._feature_columns if col not in label_cols], + output_column_name="features", + ) + else: + preprocessor = Concatenator( + exclude=label_cols, + output_column_name="features", + ) train_loop_config["feature_columns"] = "features" train_ds = preprocessor.transform(train_ds) if evaluate_ds is not None: diff --git a/python/setup.py b/python/setup.py index 72a62b96..a48f5869 100644 --- a/python/setup.py +++ b/python/setup.py @@ -95,12 +95,12 @@ def run(self): copy2(SCRIPT_PATH, SCRIPT_TARGET) install_requires = [ - "numpy < 2.0.0", + "numpy", "pandas >= 1.1.4", "psutil", - "pyarrow >= 4.0.1, <15.0.0", - "ray >= 2.1.0, <= 2.38.0", - "pyspark >= 3.1.1, <=3.5.1", + "pyarrow >= 4.0.1", + "ray >= 2.1.0", + "pyspark >= 3.1.1, <=3.5.4", "netifaces", "protobuf > 3.19.5, <= 3.20.3" ]